--- a/server/repository.py Fri Mar 16 14:23:58 2012 +0100
+++ b/server/repository.py Tue Mar 20 18:24:35 2012 +0100
@@ -120,6 +120,7 @@
{'x': eidfrom, 'y': eidto})
+
class NullEventBus(object):
def publish(self, msg):
pass
@@ -146,14 +147,17 @@
if vreg is None:
vreg = cwvreg.CWRegistryStore(config)
self.vreg = vreg
+ self._tasks_manager = utils.TasksManager()
+
self.pyro_registered = False
self.pyro_uri = None
self.app_instances_bus = NullEventBus()
self.info('starting repository from %s', self.config.apphome)
# dictionary of opened sessions
self._sessions = {}
+
+
# list of functions to be called at regular interval
- self._looping_tasks = []
# list of running threads
self._running_threads = []
# initial schema, should be build or replaced latter
@@ -359,14 +363,7 @@
assert self.cleanup_session_time > 0
cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
self.looping_task(cleanup_session_interval, self.clean_sessions)
- assert isinstance(self._looping_tasks, list), 'already started'
- for i, (interval, func, args) in enumerate(self._looping_tasks):
- self._looping_tasks[i] = task = utils.LoopTask(self, interval, func, args)
- self.info('starting task %s with interval %.2fs', task.name,
- interval)
- task.start()
- # ensure no tasks will be further added
- self._looping_tasks = tuple(self._looping_tasks)
+ self._tasks_manager.start()
def looping_task(self, interval, func, *args):
"""register a function to be called every `interval` seconds.
@@ -374,15 +371,11 @@
looping tasks can only be registered during repository initialization,
once done this method will fail.
"""
- try:
- self._looping_tasks.append( (interval, func, args) )
- except AttributeError:
- raise RuntimeError("can't add looping task once the repository is started")
+ self._tasks_manager.add_looping_task(interval, func, *args)
def threaded_task(self, func):
"""start function in a separated thread"""
- t = utils.RepoThread(func, self._running_threads)
- t.start()
+ utils.RepoThread(func, self._running_threads).start()
#@locked
def _get_cnxset(self):
@@ -412,12 +405,7 @@
assert not self.shutting_down, 'already shutting down'
self.shutting_down = True
self.system_source.shutdown()
- if isinstance(self._looping_tasks, tuple): # if tasks have been started
- for looptask in self._looping_tasks:
- self.info('canceling task %s...', looptask.name)
- looptask.cancel()
- looptask.join()
- self.info('task %s finished', looptask.name)
+ self._tasks_manager.stop()
for thread in self._running_threads:
self.info('waiting thread %s...', thread.getName())
thread.join()
@@ -517,7 +505,8 @@
results['sql_no_cache'] = self.system_source.no_cache
results['nb_open_sessions'] = len(self._sessions)
results['nb_active_threads'] = threading.activeCount()
- results['looping_tasks'] = ', '.join(str(t) for t in self._looping_tasks)
+ looping_tasks = self._tasks_manager._looping_tasks
+ results['looping_tasks'] = ', '.join(str(t) for t in looping_tasks)
results['available_cnxsets'] = self._cnxsets_pool.qsize()
results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate()))
return results