[server] introduce a scheduler class to run repository "looping tasks"
We just use the sched module from the standard library and introduce a tiny
Python2/3 compatibility layer (more for convenience actually). The "looping"
aspect of tasks (previously in LoopTask class) is re-implemeted as a
`schedule_periodic_task` function. This is a reasonably thin layer as compared
to LoopTask/TasksManager classes. Only the "restart" aspect of LoopTask is no
longer present as I'm not sure it's worth keeping.
The advantage of using this (in addition to eventually dropping our custom
code) is that this scheduler class provides a `run` method that blocks the
process while running tasks in its queue. So we can rely on this to have a
'scheduler' ctl command (see forthcoming patch) that would only run "looping
tasks" without having to implement the "blocking" aspect ourself.
Related to #17057223.
--- a/cubicweb/server/test/unittest_utils.py Thu Feb 16 11:15:23 2017 +0100
+++ b/cubicweb/server/test/unittest_utils.py Mon Mar 06 13:21:50 2017 +0100
@@ -17,12 +17,11 @@
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""Tests for cubicweb.server.utils module."""
-from unittest import TestCase
-
+from cubicweb.devtools import testlib
from cubicweb.server import utils
-class UtilsTC(TestCase):
+class UtilsTC(testlib.BaseTestCase):
def test_crypt(self):
for hash in (
@@ -40,6 +39,29 @@
self.assertEqual(utils.crypt_password('xxx', ''), '')
self.assertEqual(utils.crypt_password('yyy', ''), '')
+ def test_schedule_periodic_task(self):
+ scheduler = utils.scheduler()
+ this = []
+
+ def fill_this(x):
+ this.append(x)
+ if len(this) > 2:
+ raise SystemExit()
+ elif len(this) > 1:
+ raise RuntimeError()
+
+ event = utils.schedule_periodic_task(scheduler, 0.01, fill_this, 1)
+ self.assertEqual(event.action.__name__, 'fill_this')
+ self.assertEqual(len(scheduler.queue), 1)
+
+ with self.assertLogs('cubicweb.scheduler', level='ERROR') as cm:
+ scheduler.run()
+ self.assertEqual(this, [1] * 3)
+ self.assertEqual(len(cm.output), 2)
+ self.assertIn('Unhandled exception in periodic task "fill_this"',
+ cm.output[0])
+ self.assertIn('"fill_this" not re-scheduled', cm.output[1])
+
if __name__ == '__main__':
import unittest
--- a/cubicweb/server/utils.py Thu Feb 16 11:15:23 2017 +0100
+++ b/cubicweb/server/utils.py Mon Mar 06 13:21:50 2017 +0100
@@ -19,7 +19,8 @@
from __future__ import print_function
-
+from functools import wraps
+import sched
import sys
import logging
from threading import Timer, Thread
@@ -113,6 +114,48 @@
return user, passwd
+if PY2:
+ import time # noqa
+
+ class scheduler(sched.scheduler):
+ """Python2 version of sched.scheduler that matches Python3 API."""
+
+ def __init__(self, **kwargs):
+ kwargs.setdefault('timefunc', time.time)
+ kwargs.setdefault('delayfunc', time.sleep)
+ # sched.scheduler is an old-style class.
+ sched.scheduler.__init__(self, **kwargs)
+
+else:
+ scheduler = sched.scheduler
+
+
+def schedule_periodic_task(scheduler, interval, func, *args):
+ """Enter a task with `func(*args)` as a periodic event in `scheduler`
+ executing at `interval` seconds. Once executed, the task would re-schedule
+ itself unless a BaseException got raised.
+ """
+ @wraps(func)
+ def task(*args):
+ restart = True
+ try:
+ func(*args)
+ except Exception:
+ logger = logging.getLogger('cubicweb.scheduler')
+ logger.exception('Unhandled exception in periodic task "%s"',
+ func.__name__)
+ except BaseException as exc:
+ logger = logging.getLogger('cubicweb.scheduler')
+ logger.error('periodic task "%s" not re-scheduled due to %r',
+ func.__name__, exc)
+ restart = False
+ finally:
+ if restart:
+ scheduler.enter(interval, 1, task, argument=args)
+
+ return scheduler.enter(interval, 1, task, argument=args)
+
+
_MARKER = object()
def func_name(func):
name = getattr(func, '__name__', _MARKER)