# HG changeset patch # User Pierre-Yves David # Date 1332264275 -3600 # Node ID cd2d332b3063aec12b00e745c4170d4fc49940c3 # Parent e8a2fd7d9606545f8588ab14e8d8c6d9f1431830 [repo looping task] move looping task logic in a dedicated object (progress #2204047) diff -r e8a2fd7d9606 -r cd2d332b3063 server/repository.py --- a/server/repository.py Fri Mar 16 14:23:58 2012 +0100 +++ b/server/repository.py Tue Mar 20 18:24:35 2012 +0100 @@ -120,6 +120,7 @@ {'x': eidfrom, 'y': eidto}) + class NullEventBus(object): def publish(self, msg): pass @@ -146,14 +147,17 @@ if vreg is None: vreg = cwvreg.CWRegistryStore(config) self.vreg = vreg + self._tasks_manager = utils.TasksManager() + self.pyro_registered = False self.pyro_uri = None self.app_instances_bus = NullEventBus() self.info('starting repository from %s', self.config.apphome) # dictionary of opened sessions self._sessions = {} + + # list of functions to be called at regular interval - self._looping_tasks = [] # list of running threads self._running_threads = [] # initial schema, should be build or replaced latter @@ -359,14 +363,7 @@ assert self.cleanup_session_time > 0 cleanup_session_interval = min(60*60, self.cleanup_session_time / 3) self.looping_task(cleanup_session_interval, self.clean_sessions) - assert isinstance(self._looping_tasks, list), 'already started' - for i, (interval, func, args) in enumerate(self._looping_tasks): - self._looping_tasks[i] = task = utils.LoopTask(self, interval, func, args) - self.info('starting task %s with interval %.2fs', task.name, - interval) - task.start() - # ensure no tasks will be further added - self._looping_tasks = tuple(self._looping_tasks) + self._tasks_manager.start() def looping_task(self, interval, func, *args): """register a function to be called every `interval` seconds. @@ -374,15 +371,11 @@ looping tasks can only be registered during repository initialization, once done this method will fail. """ - try: - self._looping_tasks.append( (interval, func, args) ) - except AttributeError: - raise RuntimeError("can't add looping task once the repository is started") + self._tasks_manager.add_looping_task(interval, func, *args) def threaded_task(self, func): """start function in a separated thread""" - t = utils.RepoThread(func, self._running_threads) - t.start() + utils.RepoThread(func, self._running_threads).start() #@locked def _get_cnxset(self): @@ -412,12 +405,7 @@ assert not self.shutting_down, 'already shutting down' self.shutting_down = True self.system_source.shutdown() - if isinstance(self._looping_tasks, tuple): # if tasks have been started - for looptask in self._looping_tasks: - self.info('canceling task %s...', looptask.name) - looptask.cancel() - looptask.join() - self.info('task %s finished', looptask.name) + self._tasks_manager.stop() for thread in self._running_threads: self.info('waiting thread %s...', thread.getName()) thread.join() @@ -517,7 +505,8 @@ results['sql_no_cache'] = self.system_source.no_cache results['nb_open_sessions'] = len(self._sessions) results['nb_active_threads'] = threading.activeCount() - results['looping_tasks'] = ', '.join(str(t) for t in self._looping_tasks) + looping_tasks = self._tasks_manager._looping_tasks + results['looping_tasks'] = ', '.join(str(t) for t in looping_tasks) results['available_cnxsets'] = self._cnxsets_pool.qsize() results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate())) return results diff -r e8a2fd7d9606 -r cd2d332b3063 server/utils.py --- a/server/utils.py Fri Mar 16 14:23:58 2012 +0100 +++ b/server/utils.py Tue Mar 20 18:24:35 2012 +0100 @@ -122,12 +122,12 @@ class LoopTask(object): """threaded task restarting itself once executed""" - def __init__(self, repo, interval, func, args): + def __init__(self, tasks_manager, interval, func, args): if interval <= 0: raise ValueError('Loop task interval must be > 0 ' '(current value: %f for %s)' % \ (interval, func_name(func))) - self.repo = repo + self._tasks_manager = tasks_manager self.interval = interval def auto_restart_func(self=self, func=func, args=args): restart = True @@ -140,7 +140,7 @@ except BaseException: restart = False finally: - if restart and not self.repo.shutting_down: + if restart and tasks_manager.running: self.start() self.func = auto_restart_func self.name = func_name(func) @@ -186,3 +186,52 @@ def getName(self): return '%s(%s)' % (self._name, Thread.getName(self)) + +class TasksManager(object): + """Object dedicated manage background task""" + + def __init__(self): + self.running = False + self._tasks = [] + self._looping_tasks = [] + + def add_looping_task(self, interval, func, *args): + """register a function to be called every `interval` seconds. + + looping tasks can only be registered during repository initialization, + once done this method will fail. + """ + if self.running: + raise RuntimeError("can't add looping task once the repository is started") + self._tasks.append( (interval, func, args) ) + + def start(self): + """Start running looping task""" + assert self.running == False # bw compat purpose maintly + + while self._tasks: + interval, func, args = self._tasks.pop() + task = LoopTask(self, interval, func, args) + self._looping_tasks.append(task) + self.info('starting task %s with interval %.2fs', task.name, + interval) + task.start() + + self.running = True + + def stop(self): + """Stop all running task. + + returns when all task have been cancel and none are running anymore""" + if self.running: + while self._looping_tasks: + looptask = self._looping_tasks.pop() + self.info('canceling task %s...', looptask.name) + looptask.cancel() + looptask.join() + self.info('task %s finished', looptask.name) + +from logging import getLogger +from cubicweb import set_log_methods +set_log_methods(TasksManager, getLogger('cubicweb.repository')) +