server/repository.py
changeset 8322 cb838b126b07
parent 8321 b5d5a5630649
child 8351 02f4f01375e8
equal deleted inserted replaced
8321:b5d5a5630649 8322:cb838b126b07
   140     entities and relations
   140     entities and relations
   141 
   141 
   142     XXX protect pyro access
   142     XXX protect pyro access
   143     """
   143     """
   144 
   144 
   145     def __init__(self, config, vreg=None):
   145     def __init__(self, config, tasks_manager=None, vreg=None):
   146         self.config = config
   146         self.config = config
   147         if vreg is None:
   147         if vreg is None:
   148             vreg = cwvreg.CWRegistryStore(config)
   148             vreg = cwvreg.CWRegistryStore(config)
   149         self.vreg = vreg
   149         self.vreg = vreg
   150         self._tasks_manager = utils.TasksManager()
   150         self._tasks_manager = tasks_manager
   151 
   151 
   152         self.pyro_registered = False
   152         self.pyro_registered = False
   153         self.pyro_uri = None
   153         self.pyro_uri = None
   154         self.app_instances_bus = NullEventBus()
   154         self.app_instances_bus = NullEventBus()
   155         self.info('starting repository from %s', self.config.apphome)
   155         self.info('starting repository from %s', self.config.apphome)
   366             self.hm.call_hooks('server_startup', repo=self)
   366             self.hm.call_hooks('server_startup', repo=self)
   367             # register a task to cleanup expired session
   367             # register a task to cleanup expired session
   368             self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
   368             self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
   369             assert self.cleanup_session_time > 0
   369             assert self.cleanup_session_time > 0
   370             cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
   370             cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
       
   371             assert self._tasks_manager is not None, "This Repository is not intended to be used as a server"
   371             self._tasks_manager.add_looping_task(cleanup_session_interval,
   372             self._tasks_manager.add_looping_task(cleanup_session_interval,
   372                                                  self.clean_sessions)
   373                                                  self.clean_sessions)
   373 
   374 
   374     def start_looping_tasks(self):
   375     def start_looping_tasks(self):
   375         """Actual "Repository as a server" startup.
   376         """Actual "Repository as a server" startup.
   380 
   381 
   381         XXX Other startup related stuffs are done elsewhere. In Repository
   382         XXX Other startup related stuffs are done elsewhere. In Repository
   382         XXX __init__ or in external codes (various server managers).
   383         XXX __init__ or in external codes (various server managers).
   383         """
   384         """
   384         self._prepare_startup()
   385         self._prepare_startup()
       
   386         assert self._tasks_manager is not None, "This Repository is not intended to be used as a server"
   385         self._tasks_manager.start()
   387         self._tasks_manager.start()
   386 
   388 
   387     def looping_task(self, interval, func, *args):
   389     def looping_task(self, interval, func, *args):
   388         """register a function to be called every `interval` seconds.
   390         """register a function to be called every `interval` seconds.
   389 
   391 
   390         looping tasks can only be registered during repository initialization,
   392         looping tasks can only be registered during repository initialization,
   391         once done this method will fail.
   393         once done this method will fail.
   392         """
   394         """
       
   395         assert self._tasks_manager is not None, "This Repository is not intended to be used as a server"
   393         self._tasks_manager.add_looping_task(interval, func, *args)
   396         self._tasks_manager.add_looping_task(interval, func, *args)
   394 
   397 
   395     def threaded_task(self, func):
   398     def threaded_task(self, func):
   396         """start function in a separated thread"""
   399         """start function in a separated thread"""
   397         utils.RepoThread(func, self._running_threads).start()
   400         utils.RepoThread(func, self._running_threads).start()
   422         connections
   425         connections
   423         """
   426         """
   424         assert not self.shutting_down, 'already shutting down'
   427         assert not self.shutting_down, 'already shutting down'
   425         self.shutting_down = True
   428         self.shutting_down = True
   426         self.system_source.shutdown()
   429         self.system_source.shutdown()
   427         self._tasks_manager.stop()
   430         if self._tasks_manager is not None:
       
   431             self._tasks_manager.stop()
   428         for thread in self._running_threads:
   432         for thread in self._running_threads:
   429             self.info('waiting thread %s...', thread.getName())
   433             self.info('waiting thread %s...', thread.getName())
   430             thread.join()
   434             thread.join()
   431             self.info('thread %s finished', thread.getName())
   435             self.info('thread %s finished', thread.getName())
   432         if not (self.config.creating or self.config.repairing
   436         if not (self.config.creating or self.config.repairing