[server] replace repository's tasks manager with a scheduler
Repository() no longer accepts a 'tasks_manager' argument; it now takes a
'scheduler' argument, which is expected to be an instance of the
sched.scheduler class. Drop the _tasks_manager attribute of the repository
and adjust all internal usages of it. In particular, the 'repo_stats'
service no longer exports 'looping_tasks' statistics, as there is no way to
retrieve this information from a web instance anymore.
Closes #17057223.
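
For context, a minimal sketch of the new construction pattern, assuming a
plain stdlib sched.scheduler built from time.time/time.sleep; the 'config'
placeholder stands for an already loaded instance configuration and is not
part of this diff:

    import sched
    import time

    from cubicweb.server.repository import Repository

    # a plain stdlib scheduler; cubicweb.server.utils.scheduler() used in
    # the diff below is expected to return something equivalent
    scheduler = sched.scheduler(time.time, time.sleep)

    # 'config' is a placeholder for an already loaded instance configuration
    repo = Repository(config, scheduler=scheduler, vreg=None)
    repo.bootstrap()

    # run_scheduler() fires the server startup hooks, registers the session
    # cleanup task, then blocks while running the scheduled periodic tasks
    repo.run_scheduler()
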
--- a/cubicweb/cwconfig.py Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/cwconfig.py Mon Mar 06 13:23:33 2017 +0100
@@ -1121,8 +1121,8 @@
def repository(self, vreg=None):
"""Return a new bootstrapped repository."""
from cubicweb.server.repository import Repository
- from cubicweb.server.utils import TasksManager
- repo = Repository(self, TasksManager(), vreg=vreg)
+ from cubicweb.server.utils import scheduler
+ repo = Repository(self, scheduler=scheduler(), vreg=vreg)
repo.bootstrap()
return repo
--- a/cubicweb/hooks/__init__.py Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/hooks/__init__.py Mon Mar 06 13:23:33 2017 +0100
@@ -29,7 +29,7 @@
events = ('server_startup',)
def __call__(self):
- if self.repo._tasks_manager is None:
+ if self.repo._scheduler is None:
return
# XXX use named args and inner functions to avoid referencing globals
# which may cause reloading pb
@@ -51,7 +51,7 @@
events = ('server_startup',)
def __call__(self):
- if self.repo._tasks_manager is None:
+ if self.repo._scheduler is None:
return
def update_feeds(repo):
# take a list to avoid iterating on a dictionary whose size may
@@ -75,7 +75,7 @@
events = ('server_startup',)
def __call__(self):
- if self.repo._tasks_manager is None:
+ if self.repo._scheduler is None:
return
def expire_dataimports(repo=self.repo):
for uri, source in repo.sources_by_uri.items():
--- a/cubicweb/server/repository.py Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/server/repository.py Mon Mar 06 13:23:33 2017 +0100
@@ -211,13 +211,13 @@
entities and relations
"""
- def __init__(self, config, tasks_manager=None, vreg=None):
+ def __init__(self, config, scheduler=None, vreg=None):
self.config = config
self.sources_by_eid = {}
if vreg is None:
vreg = cwvreg.CWRegistryStore(config)
self.vreg = vreg
- self._tasks_manager = tasks_manager
+ self._scheduler = scheduler
self.app_instances_bus = NullEventBus()
# dictionary of opened sessions
@@ -409,26 +409,29 @@
if not (self.config.creating or self.config.repairing
or self.config.quick_start):
# register a task to cleanup expired session
- if self._tasks_manager is not None:
+ if self._scheduler is not None:
self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
assert self.cleanup_session_time > 0
cleanup_session_interval = min(60 * 60, self.cleanup_session_time / 3)
self.looping_task(cleanup_session_interval, self.clean_sessions)
- def start_looping_tasks(self):
- """Actual "Repository as a server" startup.
+ def run_scheduler(self):
+ """Start repository scheduler after preparing the repository for that.
* trigger server startup hook,
* register session clean up task,
- * start all tasks.
+ * start the scheduler *and block*.
XXX Other startup related stuffs are done elsewhere. In Repository
XXX __init__ or in external codes (various server managers).
"""
self._prepare_startup()
- assert self._tasks_manager is not None,\
+ assert self._scheduler is not None, \
"This Repository is not intended to be used as a server"
- self._tasks_manager.start()
+ self.info(
+ 'starting repository scheduler with tasks: %s',
+ ', '.join(e.action.__name__ for e in self._scheduler.queue))
+ self._scheduler.run()
def looping_task(self, interval, func, *args):
"""register a function to be called every `interval` seconds.
@@ -436,9 +439,12 @@
looping tasks can only be registered during repository initialization,
once done this method will fail.
"""
- assert self._tasks_manager is not None,\
+ assert self._scheduler is not None, \
"This Repository is not intended to be used as a server"
- self._tasks_manager.add_looping_task(interval, func, *args)
+ event = utils.schedule_periodic_task(
+ self._scheduler, interval, func, *args)
+ self.info('scheduled periodic task %s (interval: %.2fs)',
+ event.action.__name__, interval)
def threaded_task(self, func):
"""start function in a separated thread"""
@@ -455,8 +461,6 @@
self.hm.call_hooks('before_server_shutdown', repo=self)
self.shutting_down = True
self.system_source.shutdown()
- if self._tasks_manager is not None:
- self._tasks_manager.stop()
if not (self.config.creating or self.config.repairing
or self.config.quick_start):
self.hm.call_hooks('server_shutdown', repo=self)
--- a/cubicweb/sobjects/services.py Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/sobjects/services.py Mon Mar 06 13:23:33 2017 +0100
@@ -52,8 +52,6 @@
results['sql_no_cache'] = repo.system_source.no_cache
results['nb_open_sessions'] = len(repo._sessions)
results['nb_active_threads'] = threading.activeCount()
- looping_tasks = repo._tasks_manager._looping_tasks
- results['looping_tasks'] = [(t.name, t.interval) for t in looping_tasks]
results['available_cnxsets'] = repo.cnxsets.qsize()
results['threads'] = [t.name for t in threading.enumerate()]
return results
--- a/cubicweb/web/views/debug.py Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/web/views/debug.py Mon Mar 06 13:23:33 2017 +0100
@@ -91,7 +91,6 @@
w(u'<h2>%s</h2>' % _('Repository'))
w(u'<h3>%s</h3>' % _('resources usage'))
stats = self._cw.call_service('repo_stats')
- stats['looping_tasks'] = ', '.join('%s (%s seconds)' % (n, i) for n, i in stats['looping_tasks'])
stats['threads'] = ', '.join(sorted(stats['threads']))
for k in stats:
if k == 'type_cache_size':
--- a/cubicweb/web/views/management.py Mon Mar 06 13:21:50 2017 +0100
+++ b/cubicweb/web/views/management.py Mon Mar 06 13:23:33 2017 +0100
@@ -188,7 +188,6 @@
def call(self):
stats = self._cw.call_service('repo_stats')
- stats['looping_tasks'] = ', '.join('%s (%s seconds)' % (n, i) for n, i in stats['looping_tasks'])
stats['threads'] = ', '.join(sorted(stats['threads']))
for k in stats:
if k in ('extid_cache_size', 'type_source_cache_size'):