cubicweb/server/repository.py
changeset 12012 f7ff5217a02f
parent 11995 7d56ae4aa0ee
child 12016 88ed82a25f8a
equal deleted inserted replaced
12011:d2888fee6031 12012:f7ff5217a02f
   209 class Repository(object):
   209 class Repository(object):
   210     """a repository provides access to a set of persistent storages for
   210     """a repository provides access to a set of persistent storages for
   211     entities and relations
   211     entities and relations
   212     """
   212     """
   213 
   213 
   214     def __init__(self, config, tasks_manager=None, vreg=None):
   214     def __init__(self, config, scheduler=None, vreg=None):
   215         self.config = config
   215         self.config = config
   216         self.sources_by_eid = {}
   216         self.sources_by_eid = {}
   217         if vreg is None:
   217         if vreg is None:
   218             vreg = cwvreg.CWRegistryStore(config)
   218             vreg = cwvreg.CWRegistryStore(config)
   219         self.vreg = vreg
   219         self.vreg = vreg
   220         self._tasks_manager = tasks_manager
   220         self._scheduler = scheduler
   221 
   221 
   222         self.app_instances_bus = NullEventBus()
   222         self.app_instances_bus = NullEventBus()
   223         # dictionary of opened sessions
   223         # dictionary of opened sessions
   224         self._sessions = {}
   224         self._sessions = {}
   225 
   225 
   407         * register session clean up task.
   407         * register session clean up task.
   408         """
   408         """
   409         if not (self.config.creating or self.config.repairing
   409         if not (self.config.creating or self.config.repairing
   410                 or self.config.quick_start):
   410                 or self.config.quick_start):
   411             # register a task to cleanup expired session
   411             # register a task to cleanup expired session
   412             if self._tasks_manager is not None:
   412             if self._scheduler is not None:
   413                 self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
   413                 self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
   414                 assert self.cleanup_session_time > 0
   414                 assert self.cleanup_session_time > 0
   415                 cleanup_session_interval = min(60 * 60, self.cleanup_session_time / 3)
   415                 cleanup_session_interval = min(60 * 60, self.cleanup_session_time / 3)
   416                 self.looping_task(cleanup_session_interval, self.clean_sessions)
   416                 self.looping_task(cleanup_session_interval, self.clean_sessions)
   417 
   417 
   418     def start_looping_tasks(self):
   418     def run_scheduler(self):
   419         """Actual "Repository as a server" startup.
   419         """Start repository scheduler after preparing the repository for that.
   420 
   420 
   421         * trigger server startup hook,
   421         * trigger server startup hook,
   422         * register session clean up task,
   422         * register session clean up task,
   423         * start all tasks.
   423         * start the scheduler *and block*.
   424 
   424 
   425         XXX Other startup related stuffs are done elsewhere. In Repository
   425         XXX Other startup related stuffs are done elsewhere. In Repository
   426         XXX __init__ or in external codes (various server managers).
   426         XXX __init__ or in external codes (various server managers).
   427         """
   427         """
   428         self._prepare_startup()
   428         self._prepare_startup()
   429         assert self._tasks_manager is not None,\
   429         assert self._scheduler is not None, \
   430             "This Repository is not intended to be used as a server"
   430             "This Repository is not intended to be used as a server"
   431         self._tasks_manager.start()
   431         self.info(
       
   432             'starting repository scheduler with tasks: %s',
       
   433             ', '.join(e.action.__name__ for e in self._scheduler.queue))
       
   434         self._scheduler.run()
   432 
   435 
   433     def looping_task(self, interval, func, *args):
   436     def looping_task(self, interval, func, *args):
   434         """register a function to be called every `interval` seconds.
   437         """register a function to be called every `interval` seconds.
   435 
   438 
   436         looping tasks can only be registered during repository initialization,
   439         looping tasks can only be registered during repository initialization,
   437         once done this method will fail.
   440         once done this method will fail.
   438         """
   441         """
   439         assert self._tasks_manager is not None,\
   442         assert self._scheduler is not None, \
   440             "This Repository is not intended to be used as a server"
   443             "This Repository is not intended to be used as a server"
   441         self._tasks_manager.add_looping_task(interval, func, *args)
   444         event = utils.schedule_periodic_task(
       
   445             self._scheduler, interval, func, *args)
       
   446         self.info('scheduled periodic task %s (interval: %.2fs)',
       
   447                   event.action.__name__, interval)
   442 
   448 
   443     def threaded_task(self, func):
   449     def threaded_task(self, func):
   444         """start function in a separated thread"""
   450         """start function in a separated thread"""
   445         utils.RepoThread(func, self._running_threads).start()
   451         utils.RepoThread(func, self._running_threads).start()
   446 
   452 
   453                 or self.config.quick_start):
   459                 or self.config.quick_start):
   454             # then, the system source is still available
   460             # then, the system source is still available
   455             self.hm.call_hooks('before_server_shutdown', repo=self)
   461             self.hm.call_hooks('before_server_shutdown', repo=self)
   456         self.shutting_down = True
   462         self.shutting_down = True
   457         self.system_source.shutdown()
   463         self.system_source.shutdown()
   458         if self._tasks_manager is not None:
       
   459             self._tasks_manager.stop()
       
   460         if not (self.config.creating or self.config.repairing
   464         if not (self.config.creating or self.config.repairing
   461                 or self.config.quick_start):
   465                 or self.config.quick_start):
   462             self.hm.call_hooks('server_shutdown', repo=self)
   466             self.hm.call_hooks('server_shutdown', repo=self)
   463         for thread in self._running_threads:
   467         for thread in self._running_threads:
   464             self.info('waiting thread %s...', thread.getName())
   468             self.info('waiting thread %s...', thread.getName())