140 entities and relations |
140 entities and relations |
141 |
141 |
142 XXX protect pyro access |
142 XXX protect pyro access |
143 """ |
143 """ |
144 |
144 |
145 def __init__(self, config, vreg=None): |
145 def __init__(self, config, tasks_manager=None, vreg=None): |
146 self.config = config |
146 self.config = config |
147 if vreg is None: |
147 if vreg is None: |
148 vreg = cwvreg.CWRegistryStore(config) |
148 vreg = cwvreg.CWRegistryStore(config) |
149 self.vreg = vreg |
149 self.vreg = vreg |
150 self._tasks_manager = utils.TasksManager() |
150 self._tasks_manager = tasks_manager |
151 |
151 |
152 self.pyro_registered = False |
152 self.pyro_registered = False |
153 self.pyro_uri = None |
153 self.pyro_uri = None |
154 self.app_instances_bus = NullEventBus() |
154 self.app_instances_bus = NullEventBus() |
155 self.info('starting repository from %s', self.config.apphome) |
155 self.info('starting repository from %s', self.config.apphome) |
366 self.hm.call_hooks('server_startup', repo=self) |
366 self.hm.call_hooks('server_startup', repo=self) |
367 # register a task to cleanup expired session |
367 # register a task to cleanup expired session |
368 self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24 |
368 self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24 |
369 assert self.cleanup_session_time > 0 |
369 assert self.cleanup_session_time > 0 |
370 cleanup_session_interval = min(60*60, self.cleanup_session_time / 3) |
370 cleanup_session_interval = min(60*60, self.cleanup_session_time / 3) |
|
371 assert self._tasks_manager is not None, "This Repository is not intended to be used as a server" |
371 self._tasks_manager.add_looping_task(cleanup_session_interval, |
372 self._tasks_manager.add_looping_task(cleanup_session_interval, |
372 self.clean_sessions) |
373 self.clean_sessions) |
373 |
374 |
374 def start_looping_tasks(self): |
375 def start_looping_tasks(self): |
375 """Actual "Repository as a server" startup. |
376 """Actual "Repository as a server" startup. |
380 |
381 |
381 XXX Other startup related stuffs are done elsewhere. In Repository |
382 XXX Other startup related stuffs are done elsewhere. In Repository |
382 XXX __init__ or in external codes (various server managers). |
383 XXX __init__ or in external codes (various server managers). |
383 """ |
384 """ |
384 self._prepare_startup() |
385 self._prepare_startup() |
|
386 assert self._tasks_manager is not None, "This Repository is not intended to be used as a server" |
385 self._tasks_manager.start() |
387 self._tasks_manager.start() |
386 |
388 |
def looping_task(self, interval, func, *args):
    """Register a function to be called every `interval` seconds.

    Looping tasks can only be registered during repository initialization;
    once done this method will fail.

    :param interval: number of seconds between two calls of `func`
    :param func: callable handed over to the tasks manager
    :param args: extra positional arguments forwarded along with `func`
    :raises AssertionError: if this repository has no tasks manager
    """
    # Explicit check instead of an `assert` statement: the guard must still
    # be enforced when Python runs with -O. Exception type and message are
    # kept identical to the assert-based version.
    if self._tasks_manager is None:
        raise AssertionError(
            "This Repository is not intended to be used as a server")
    self._tasks_manager.add_looping_task(interval, func, *args)
394 |
397 |
def threaded_task(self, func):
    """Start `func` in a separate thread.

    The thread is built with `utils.RepoThread`, which is given both the
    callable and the repository's `_running_threads` collection, and is
    started immediately. Nothing is returned.

    :param func: callable to run in the new thread
    """
    worker = utils.RepoThread(func, self._running_threads)
    worker.start()
422 connections |
425 connections |
423 """ |
426 """ |
424 assert not self.shutting_down, 'already shutting down' |
427 assert not self.shutting_down, 'already shutting down' |
425 self.shutting_down = True |
428 self.shutting_down = True |
426 self.system_source.shutdown() |
429 self.system_source.shutdown() |
427 self._tasks_manager.stop() |
430 if self._tasks_manager is not None: |
|
431 self._tasks_manager.stop() |
428 for thread in self._running_threads: |
432 for thread in self._running_threads: |
429 self.info('waiting thread %s...', thread.getName()) |
433 self.info('waiting thread %s...', thread.getName()) |
430 thread.join() |
434 thread.join() |
431 self.info('thread %s finished', thread.getName()) |
435 self.info('thread %s finished', thread.getName()) |
432 if not (self.config.creating or self.config.repairing |
436 if not (self.config.creating or self.config.repairing |