[server] replace repository's tasks manager by a scheduler
authorDenis Laxalde <denis.laxalde@logilab.fr>
Mon, 06 Mar 2017 13:23:33 +0100
changeset 12012 f7ff5217a02f
parent 12011 d2888fee6031
child 12013 7b975655d0ae
[server] replace repository's tasks manager by a scheduler Repository() does not accept anymore a 'tasks_manager' argument but rather a 'scheduler' argument which is expected to be an instance of sched.scheduler class. The drop the _tasks_manager attribute of the repository and adjust all internal usages of it. In particular, in the 'repo_stats' service we do not export 'looping_tasks' statistics anymore as there's no way to retrieve this anymore from a web instance. Closes #17057223.
cubicweb/cwconfig.py
cubicweb/hooks/__init__.py
cubicweb/server/repository.py
cubicweb/sobjects/services.py
cubicweb/web/views/debug.py
cubicweb/web/views/management.py
--- a/cubicweb/cwconfig.py	Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/cwconfig.py	Mon Mar 06 13:23:33 2017 +0100
@@ -1121,8 +1121,8 @@
     def repository(self, vreg=None):
         """Return a new bootstrapped repository."""
         from cubicweb.server.repository import Repository
-        from cubicweb.server.utils import TasksManager
-        repo = Repository(self, TasksManager(), vreg=vreg)
+        from cubicweb.server.utils import scheduler
+        repo = Repository(self, scheduler=scheduler(), vreg=vreg)
         repo.bootstrap()
         return repo
 
--- a/cubicweb/hooks/__init__.py	Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/hooks/__init__.py	Mon Mar 06 13:23:33 2017 +0100
@@ -29,7 +29,7 @@
     events = ('server_startup',)
 
     def __call__(self):
-        if self.repo._tasks_manager is None:
+        if self.repo._scheduler is None:
             return
         # XXX use named args and inner functions to avoid referencing globals
         # which may cause reloading pb
@@ -51,7 +51,7 @@
     events = ('server_startup',)
 
     def __call__(self):
-        if self.repo._tasks_manager is None:
+        if self.repo._scheduler is None:
             return
         def update_feeds(repo):
             # take a list to avoid iterating on a dictionary whose size may
@@ -75,7 +75,7 @@
     events = ('server_startup',)
 
     def __call__(self):
-        if self.repo._tasks_manager is None:
+        if self.repo._scheduler is None:
             return
         def expire_dataimports(repo=self.repo):
             for uri, source in repo.sources_by_uri.items():
--- a/cubicweb/server/repository.py	Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/server/repository.py	Mon Mar 06 13:23:33 2017 +0100
@@ -211,13 +211,13 @@
     entities and relations
     """
 
-    def __init__(self, config, tasks_manager=None, vreg=None):
+    def __init__(self, config, scheduler=None, vreg=None):
         self.config = config
         self.sources_by_eid = {}
         if vreg is None:
             vreg = cwvreg.CWRegistryStore(config)
         self.vreg = vreg
-        self._tasks_manager = tasks_manager
+        self._scheduler = scheduler
 
         self.app_instances_bus = NullEventBus()
         # dictionary of opened sessions
@@ -409,26 +409,29 @@
         if not (self.config.creating or self.config.repairing
                 or self.config.quick_start):
             # register a task to cleanup expired session
-            if self._tasks_manager is not None:
+            if self._scheduler is not None:
                 self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
                 assert self.cleanup_session_time > 0
                 cleanup_session_interval = min(60 * 60, self.cleanup_session_time / 3)
                 self.looping_task(cleanup_session_interval, self.clean_sessions)
 
-    def start_looping_tasks(self):
-        """Actual "Repository as a server" startup.
+    def run_scheduler(self):
+        """Start repository scheduler after preparing the repository for that.
 
         * trigger server startup hook,
         * register session clean up task,
-        * start all tasks.
+        * start the scheduler *and block*.
 
         XXX Other startup related stuffs are done elsewhere. In Repository
         XXX __init__ or in external codes (various server managers).
         """
         self._prepare_startup()
-        assert self._tasks_manager is not None,\
+        assert self._scheduler is not None, \
             "This Repository is not intended to be used as a server"
-        self._tasks_manager.start()
+        self.info(
+            'starting repository scheduler with tasks: %s',
+            ', '.join(e.action.__name__ for e in self._scheduler.queue))
+        self._scheduler.run()
 
     def looping_task(self, interval, func, *args):
         """register a function to be called every `interval` seconds.
@@ -436,9 +439,12 @@
         looping tasks can only be registered during repository initialization,
         once done this method will fail.
         """
-        assert self._tasks_manager is not None,\
+        assert self._scheduler is not None, \
             "This Repository is not intended to be used as a server"
-        self._tasks_manager.add_looping_task(interval, func, *args)
+        event = utils.schedule_periodic_task(
+            self._scheduler, interval, func, *args)
+        self.info('scheduled periodic task %s (interval: %.2fs)',
+                  event.action.__name__, interval)
 
     def threaded_task(self, func):
         """start function in a separated thread"""
@@ -455,8 +461,6 @@
             self.hm.call_hooks('before_server_shutdown', repo=self)
         self.shutting_down = True
         self.system_source.shutdown()
-        if self._tasks_manager is not None:
-            self._tasks_manager.stop()
         if not (self.config.creating or self.config.repairing
                 or self.config.quick_start):
             self.hm.call_hooks('server_shutdown', repo=self)
--- a/cubicweb/sobjects/services.py	Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/sobjects/services.py	Mon Mar 06 13:23:33 2017 +0100
@@ -52,8 +52,6 @@
         results['sql_no_cache'] = repo.system_source.no_cache
         results['nb_open_sessions'] = len(repo._sessions)
         results['nb_active_threads'] = threading.activeCount()
-        looping_tasks = repo._tasks_manager._looping_tasks
-        results['looping_tasks'] = [(t.name, t.interval) for t in looping_tasks]
         results['available_cnxsets'] = repo.cnxsets.qsize()
         results['threads'] = [t.name for t in threading.enumerate()]
         return results
--- a/cubicweb/web/views/debug.py	Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/web/views/debug.py	Mon Mar 06 13:23:33 2017 +0100
@@ -91,7 +91,6 @@
         w(u'<h2>%s</h2>' % _('Repository'))
         w(u'<h3>%s</h3>' % _('resources usage'))
         stats = self._cw.call_service('repo_stats')
-        stats['looping_tasks'] = ', '.join('%s (%s seconds)' % (n, i) for n, i in stats['looping_tasks'])
         stats['threads'] = ', '.join(sorted(stats['threads']))
         for k in stats:
             if k == 'type_cache_size':
--- a/cubicweb/web/views/management.py	Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/web/views/management.py	Mon Mar 06 13:23:33 2017 +0100
@@ -188,7 +188,6 @@
 
     def call(self):
         stats = self._cw.call_service('repo_stats')
-        stats['looping_tasks'] = ', '.join('%s (%s seconds)' % (n, i) for n, i in stats['looping_tasks'])
         stats['threads'] = ', '.join(sorted(stats['threads']))
         for k in stats:
             if k in ('extid_cache_size', 'type_source_cache_size'):