[repo looping task] move looping task logic in a dedicated object (progress #2204047)
authorPierre-Yves David <pierre-yves.david@logilab.fr>
Tue, 20 Mar 2012 18:24:35 +0100
changeset 8320 cd2d332b3063
parent 8318 e8a2fd7d9606
child 8321 b5d5a5630649
[repo looping task] move looping task logic in a dedicated object (progress #2204047)
server/repository.py
server/utils.py
--- a/server/repository.py	Fri Mar 16 14:23:58 2012 +0100
+++ b/server/repository.py	Tue Mar 20 18:24:35 2012 +0100
@@ -120,6 +120,7 @@
                             {'x': eidfrom, 'y': eidto})
 
 
+
 class NullEventBus(object):
     def publish(self, msg):
         pass
@@ -146,14 +147,17 @@
         if vreg is None:
             vreg = cwvreg.CWRegistryStore(config)
         self.vreg = vreg
+        self._tasks_manager = utils.TasksManager()
+
         self.pyro_registered = False
         self.pyro_uri = None
         self.app_instances_bus = NullEventBus()
         self.info('starting repository from %s', self.config.apphome)
         # dictionary of opened sessions
         self._sessions = {}
+
+
         # list of functions to be called at regular interval
-        self._looping_tasks = []
         # list of running threads
         self._running_threads = []
         # initial schema, should be build or replaced latter
@@ -359,14 +363,7 @@
             assert self.cleanup_session_time > 0
             cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
             self.looping_task(cleanup_session_interval, self.clean_sessions)
-        assert isinstance(self._looping_tasks, list), 'already started'
-        for i, (interval, func, args) in enumerate(self._looping_tasks):
-            self._looping_tasks[i] = task = utils.LoopTask(self, interval, func, args)
-            self.info('starting task %s with interval %.2fs', task.name,
-                      interval)
-            task.start()
-        # ensure no tasks will be further added
-        self._looping_tasks = tuple(self._looping_tasks)
+        self._tasks_manager.start()
 
     def looping_task(self, interval, func, *args):
         """register a function to be called every `interval` seconds.
@@ -374,15 +371,11 @@
         looping tasks can only be registered during repository initialization,
         once done this method will fail.
         """
-        try:
-            self._looping_tasks.append( (interval, func, args) )
-        except AttributeError:
-            raise RuntimeError("can't add looping task once the repository is started")
+        self._tasks_manager.add_looping_task(interval, func, *args)
 
     def threaded_task(self, func):
         """start function in a separated thread"""
-        t = utils.RepoThread(func, self._running_threads)
-        t.start()
+        utils.RepoThread(func, self._running_threads).start()
 
     #@locked
     def _get_cnxset(self):
@@ -412,12 +405,7 @@
         assert not self.shutting_down, 'already shutting down'
         self.shutting_down = True
         self.system_source.shutdown()
-        if isinstance(self._looping_tasks, tuple): # if tasks have been started
-            for looptask in self._looping_tasks:
-                self.info('canceling task %s...', looptask.name)
-                looptask.cancel()
-                looptask.join()
-                self.info('task %s finished', looptask.name)
+        self._tasks_manager.stop()
         for thread in self._running_threads:
             self.info('waiting thread %s...', thread.getName())
             thread.join()
@@ -517,7 +505,8 @@
         results['sql_no_cache'] = self.system_source.no_cache
         results['nb_open_sessions'] = len(self._sessions)
         results['nb_active_threads'] = threading.activeCount()
-        results['looping_tasks'] = ', '.join(str(t) for t in self._looping_tasks)
+        looping_tasks = self._tasks_manager._looping_tasks
+        results['looping_tasks'] = ', '.join(str(t) for t in looping_tasks)
         results['available_cnxsets'] = self._cnxsets_pool.qsize()
         results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate()))
         return results
--- a/server/utils.py	Fri Mar 16 14:23:58 2012 +0100
+++ b/server/utils.py	Tue Mar 20 18:24:35 2012 +0100
@@ -122,12 +122,12 @@
 
 class LoopTask(object):
     """threaded task restarting itself once executed"""
-    def __init__(self, repo, interval, func, args):
+    def __init__(self, tasks_manager, interval, func, args):
         if interval <= 0:
             raise ValueError('Loop task interval must be > 0 '
                              '(current value: %f for %s)' % \
                              (interval, func_name(func)))
-        self.repo = repo
+        self._tasks_manager = tasks_manager
         self.interval = interval
         def auto_restart_func(self=self, func=func, args=args):
             restart = True
@@ -140,7 +140,7 @@
             except BaseException:
                 restart = False
             finally:
-                if restart and not self.repo.shutting_down:
+                if restart and tasks_manager.running:
                     self.start()
         self.func = auto_restart_func
         self.name = func_name(func)
@@ -186,3 +186,52 @@
 
     def getName(self):
         return '%s(%s)' % (self._name, Thread.getName(self))
+
+class TasksManager(object):
+    """Object dedicated manage background task"""
+
+    def __init__(self):
+        self.running = False
+        self._tasks = []
+        self._looping_tasks = []
+
+    def add_looping_task(self, interval, func, *args):
+        """register a function to be called every `interval` seconds.
+
+        looping tasks can only be registered during repository initialization,
+        once done this method will fail.
+        """
+        if self.running:
+            raise RuntimeError("can't add looping task once the repository is started")
+        self._tasks.append( (interval, func, args) )
+
+    def start(self):
+        """Start running looping task"""
+        assert self.running == False # bw compat purpose maintly
+
+        while self._tasks:
+            interval, func, args = self._tasks.pop()
+            task = LoopTask(self, interval, func, args)
+            self._looping_tasks.append(task)
+            self.info('starting task %s with interval %.2fs', task.name,
+                      interval)
+            task.start()
+
+        self.running = True
+
+    def stop(self):
+        """Stop all running task.
+
+        returns when all task have been cancel and none are running anymore"""
+        if self.running:
+            while self._looping_tasks:
+                looptask = self._looping_tasks.pop()
+                self.info('canceling task %s...', looptask.name)
+                looptask.cancel()
+                looptask.join()
+                self.info('task %s finished', looptask.name)
+
+from logging import getLogger
+from cubicweb import set_log_methods
+set_log_methods(TasksManager, getLogger('cubicweb.repository'))
+