118 session.execute('DELETE X %s Y WHERE Y eid %%(y)s, ' |
118 session.execute('DELETE X %s Y WHERE Y eid %%(y)s, ' |
119 'NOT X eid %%(x)s' % rtype, |
119 'NOT X eid %%(x)s' % rtype, |
120 {'x': eidfrom, 'y': eidto}) |
120 {'x': eidfrom, 'y': eidto}) |
121 |
121 |
122 |
122 |
|
123 |
123 class NullEventBus(object): |
124 class NullEventBus(object): |
124 def publish(self, msg): |
125 def publish(self, msg): |
125 pass |
126 pass |
126 |
127 |
127 def add_subscription(self, topic, callback): |
128 def add_subscription(self, topic, callback): |
144 def __init__(self, config, vreg=None): |
145 def __init__(self, config, vreg=None): |
145 self.config = config |
146 self.config = config |
146 if vreg is None: |
147 if vreg is None: |
147 vreg = cwvreg.CWRegistryStore(config) |
148 vreg = cwvreg.CWRegistryStore(config) |
148 self.vreg = vreg |
149 self.vreg = vreg |
|
150 self._tasks_manager = utils.TasksManager() |
|
151 |
149 self.pyro_registered = False |
152 self.pyro_registered = False |
150 self.pyro_uri = None |
153 self.pyro_uri = None |
151 self.app_instances_bus = NullEventBus() |
154 self.app_instances_bus = NullEventBus() |
152 self.info('starting repository from %s', self.config.apphome) |
155 self.info('starting repository from %s', self.config.apphome) |
153 # dictionary of opened sessions |
156 # dictionary of opened sessions |
154 self._sessions = {} |
157 self._sessions = {} |
|
158 |
|
159 |
155 # list of functions to be called at regular interval |
160 # list of functions to be called at regular interval |
156 self._looping_tasks = [] |
|
157 # list of running threads |
161 # list of running threads |
158 self._running_threads = [] |
162 self._running_threads = [] |
159 # initial schema, should be build or replaced latter |
163 # initial schema, should be build or replaced latter |
160 self.schema = schema.CubicWebSchema(config.appid) |
164 self.schema = schema.CubicWebSchema(config.appid) |
161 self.vreg.schema = self.schema # until actual schema is loaded... |
165 self.vreg.schema = self.schema # until actual schema is loaded... |
357 # register a task to cleanup expired session |
361 # register a task to cleanup expired session |
358 self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24 |
362 self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24 |
359 assert self.cleanup_session_time > 0 |
363 assert self.cleanup_session_time > 0 |
360 cleanup_session_interval = min(60*60, self.cleanup_session_time / 3) |
364 cleanup_session_interval = min(60*60, self.cleanup_session_time / 3) |
361 self.looping_task(cleanup_session_interval, self.clean_sessions) |
365 self.looping_task(cleanup_session_interval, self.clean_sessions) |
362 assert isinstance(self._looping_tasks, list), 'already started' |
366 self._tasks_manager.start() |
363 for i, (interval, func, args) in enumerate(self._looping_tasks): |
|
364 self._looping_tasks[i] = task = utils.LoopTask(self, interval, func, args) |
|
365 self.info('starting task %s with interval %.2fs', task.name, |
|
366 interval) |
|
367 task.start() |
|
368 # ensure no tasks will be further added |
|
369 self._looping_tasks = tuple(self._looping_tasks) |
|
370 |
367 |
def looping_task(self, interval, func, *args):
    """Register a function to be called every `interval` seconds.

    The registration is delegated to the repository's tasks manager
    (`self._tasks_manager`), which receives `interval`, `func` and the
    extra positional `args` unchanged.

    :param interval: delay between two calls, in seconds
    :param func: callable to run at each interval
    :param args: positional arguments forwarded along with `func`

    NOTE(review): historically looping tasks could only be registered
    during repository initialization and this method raised RuntimeError
    afterwards. That check is no longer done here; whether late
    registration is rejected now depends on the tasks manager -- confirm.
    """
    self._tasks_manager.add_looping_task(interval, func, *args)
|
381 |
375 |
def threaded_task(self, func):
    """Start `func` in a separate thread.

    The thread is created through `utils.RepoThread`, which is handed the
    repository's `_running_threads` list, and is started immediately.
    Nothing is returned.

    :param func: callable to run in the new thread

    NOTE(review): presumably RepoThread uses the list to track itself so
    that repository shutdown can join it -- confirm in `utils`.
    """
    utils.RepoThread(func, self._running_threads).start()
|
386 |
379 |
387 #@locked |
380 #@locked |
388 def _get_cnxset(self): |
381 def _get_cnxset(self): |
389 try: |
382 try: |
390 return self._cnxsets_pool.get(True, timeout=5) |
383 return self._cnxsets_pool.get(True, timeout=5) |
410 connections |
403 connections |
411 """ |
404 """ |
412 assert not self.shutting_down, 'already shutting down' |
405 assert not self.shutting_down, 'already shutting down' |
413 self.shutting_down = True |
406 self.shutting_down = True |
414 self.system_source.shutdown() |
407 self.system_source.shutdown() |
415 if isinstance(self._looping_tasks, tuple): # if tasks have been started |
408 self._tasks_manager.stop() |
416 for looptask in self._looping_tasks: |
|
417 self.info('canceling task %s...', looptask.name) |
|
418 looptask.cancel() |
|
419 looptask.join() |
|
420 self.info('task %s finished', looptask.name) |
|
421 for thread in self._running_threads: |
409 for thread in self._running_threads: |
422 self.info('waiting thread %s...', thread.getName()) |
410 self.info('waiting thread %s...', thread.getName()) |
423 thread.join() |
411 thread.join() |
424 self.info('thread %s finished', thread.getName()) |
412 self.info('thread %s finished', thread.getName()) |
425 if not (self.config.creating or self.config.repairing |
413 if not (self.config.creating or self.config.repairing |
515 results['type_source_cache_size'] = len(self._type_source_cache) |
503 results['type_source_cache_size'] = len(self._type_source_cache) |
516 results['extid_cache_size'] = len(self._extid_cache) |
504 results['extid_cache_size'] = len(self._extid_cache) |
517 results['sql_no_cache'] = self.system_source.no_cache |
505 results['sql_no_cache'] = self.system_source.no_cache |
518 results['nb_open_sessions'] = len(self._sessions) |
506 results['nb_open_sessions'] = len(self._sessions) |
519 results['nb_active_threads'] = threading.activeCount() |
507 results['nb_active_threads'] = threading.activeCount() |
520 results['looping_tasks'] = ', '.join(str(t) for t in self._looping_tasks) |
508 looping_tasks = self._tasks_manager._looping_tasks |
|
509 results['looping_tasks'] = ', '.join(str(t) for t in looping_tasks) |
521 results['available_cnxsets'] = self._cnxsets_pool.qsize() |
510 results['available_cnxsets'] = self._cnxsets_pool.qsize() |
522 results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate())) |
511 results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate())) |
523 return results |
512 return results |
524 |
513 |
525 def gc_stats(self, nmax=20): |
514 def gc_stats(self, nmax=20): |