209 class Repository(object): |
209 class Repository(object): |
210 """a repository provides access to a set of persistent storages for |
210 """a repository provides access to a set of persistent storages for |
211 entities and relations |
211 entities and relations |
212 """ |
212 """ |
213 |
213 |
214 def __init__(self, config, tasks_manager=None, vreg=None): |
214 def __init__(self, config, scheduler=None, vreg=None): |
215 self.config = config |
215 self.config = config |
216 self.sources_by_eid = {} |
216 self.sources_by_eid = {} |
217 if vreg is None: |
217 if vreg is None: |
218 vreg = cwvreg.CWRegistryStore(config) |
218 vreg = cwvreg.CWRegistryStore(config) |
219 self.vreg = vreg |
219 self.vreg = vreg |
220 self._tasks_manager = tasks_manager |
220 self._scheduler = scheduler |
221 |
221 |
222 self.app_instances_bus = NullEventBus() |
222 self.app_instances_bus = NullEventBus() |
223 # dictionary of opened sessions |
223 # dictionary of opened sessions |
224 self._sessions = {} |
224 self._sessions = {} |
225 |
225 |
407 * register session clean up task. |
407 * register session clean up task. |
408 """ |
408 """ |
409 if not (self.config.creating or self.config.repairing |
409 if not (self.config.creating or self.config.repairing |
410 or self.config.quick_start): |
410 or self.config.quick_start): |
411 # register a task to cleanup expired session |
411 # register a task to cleanup expired session |
412 if self._tasks_manager is not None: |
412 if self._scheduler is not None: |
413 self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24 |
413 self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24 |
414 assert self.cleanup_session_time > 0 |
414 assert self.cleanup_session_time > 0 |
415 cleanup_session_interval = min(60 * 60, self.cleanup_session_time / 3) |
415 cleanup_session_interval = min(60 * 60, self.cleanup_session_time / 3) |
416 self.looping_task(cleanup_session_interval, self.clean_sessions) |
416 self.looping_task(cleanup_session_interval, self.clean_sessions) |
417 |
417 |
418 def start_looping_tasks(self): |
418 def run_scheduler(self): |
419 """Actual "Repository as a server" startup. |
419 """Start repository scheduler after preparing the repository for that. |
420 |
420 |
421 * trigger server startup hook, |
421 * trigger server startup hook, |
422 * register session clean up task, |
422 * register session clean up task, |
423 * start all tasks. |
423 * start the scheduler *and block*. |
424 |
424 |
425 XXX Other startup related stuffs are done elsewhere. In Repository |
425 XXX Other startup related stuffs are done elsewhere. In Repository |
426 XXX __init__ or in external codes (various server managers). |
426 XXX __init__ or in external codes (various server managers). |
427 """ |
427 """ |
428 self._prepare_startup() |
428 self._prepare_startup() |
429 assert self._tasks_manager is not None,\ |
429 assert self._scheduler is not None, \ |
430 "This Repository is not intended to be used as a server" |
430 "This Repository is not intended to be used as a server" |
431 self._tasks_manager.start() |
431 self.info( |
|
432 'starting repository scheduler with tasks: %s', |
|
433 ', '.join(e.action.__name__ for e in self._scheduler.queue)) |
|
434 self._scheduler.run() |
432 |
435 |
433 def looping_task(self, interval, func, *args): |
436 def looping_task(self, interval, func, *args): |
434 """register a function to be called every `interval` seconds. |
437 """register a function to be called every `interval` seconds. |
435 |
438 |
436 looping tasks can only be registered during repository initialization, |
439 looping tasks can only be registered during repository initialization, |
437 once done this method will fail. |
440 once done this method will fail. |
438 """ |
441 """ |
439 assert self._tasks_manager is not None,\ |
442 assert self._scheduler is not None, \ |
440 "This Repository is not intended to be used as a server" |
443 "This Repository is not intended to be used as a server" |
441 self._tasks_manager.add_looping_task(interval, func, *args) |
444 event = utils.schedule_periodic_task( |
|
445 self._scheduler, interval, func, *args) |
|
446 self.info('scheduled periodic task %s (interval: %.2fs)', |
|
447 event.action.__name__, interval) |
442 |
448 |
443 def threaded_task(self, func): |
449 def threaded_task(self, func): |
444 """start function in a separated thread""" |
450 """start function in a separated thread""" |
445 utils.RepoThread(func, self._running_threads).start() |
451 utils.RepoThread(func, self._running_threads).start() |
446 |
452 |
453 or self.config.quick_start): |
459 or self.config.quick_start): |
454 # then, the system source is still available |
460 # then, the system source is still available |
455 self.hm.call_hooks('before_server_shutdown', repo=self) |
461 self.hm.call_hooks('before_server_shutdown', repo=self) |
456 self.shutting_down = True |
462 self.shutting_down = True |
457 self.system_source.shutdown() |
463 self.system_source.shutdown() |
458 if self._tasks_manager is not None: |
|
459 self._tasks_manager.stop() |
|
460 if not (self.config.creating or self.config.repairing |
464 if not (self.config.creating or self.config.repairing |
461 or self.config.quick_start): |
465 or self.config.quick_start): |
462 self.hm.call_hooks('server_shutdown', repo=self) |
466 self.hm.call_hooks('server_shutdown', repo=self) |
463 for thread in self._running_threads: |
467 for thread in self._running_threads: |
464 self.info('waiting thread %s...', thread.getName()) |
468 self.info('waiting thread %s...', thread.getName()) |