[server] introduce a scheduler class to run repository "looping tasks"
author Denis Laxalde <denis.laxalde@logilab.fr>
Mon, 06 Mar 2017 13:21:50 +0100
changeset 12011 d2888fee6031
parent 12010 c34590161082
child 12012 f7ff5217a02f
[server] introduce a scheduler class to run repository "looping tasks" We just use the sched module from the standard library and introduce a tiny Python2/3 compatibility layer (more for convenience actually). The "looping" aspect of tasks (previously in LoopTask class) is re-implemented as a `schedule_periodic_task` function. This is a reasonably thin layer as compared to LoopTask/TasksManager classes. Only the "restart" aspect of LoopTask is no longer present as I'm not sure it's worth keeping. The advantage of using this (in addition to eventually dropping our custom code) is that this scheduler class provides a `run` method that blocks the process while running tasks in its queue. So we can rely on this to have a 'scheduler' ctl command (see forthcoming patch) that would only run "looping tasks" without having to implement the "blocking" aspect ourselves. Related to #17057223.
cubicweb/server/test/unittest_utils.py
cubicweb/server/utils.py
--- a/cubicweb/server/test/unittest_utils.py	Thu Feb 16 11:15:23 2017 +0100
+++ b/cubicweb/server/test/unittest_utils.py	Mon Mar 06 13:21:50 2017 +0100
@@ -17,12 +17,11 @@
 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
 """Tests for cubicweb.server.utils module."""
 
-from unittest import TestCase
-
+from cubicweb.devtools import testlib
 from cubicweb.server import utils
 
 
-class UtilsTC(TestCase):
+class UtilsTC(testlib.BaseTestCase):
 
     def test_crypt(self):
         for hash in (
@@ -40,6 +39,29 @@
         self.assertEqual(utils.crypt_password('xxx', ''), '')
         self.assertEqual(utils.crypt_password('yyy', ''), '')
 
+    def test_schedule_periodic_task(self):
+        scheduler = utils.scheduler()
+        this = []
+
+        def fill_this(x):
+            this.append(x)
+            if len(this) > 2:
+                raise SystemExit()
+            elif len(this) > 1:
+                raise RuntimeError()
+
+        event = utils.schedule_periodic_task(scheduler, 0.01, fill_this, 1)
+        self.assertEqual(event.action.__name__, 'fill_this')
+        self.assertEqual(len(scheduler.queue), 1)
+
+        with self.assertLogs('cubicweb.scheduler', level='ERROR') as cm:
+            scheduler.run()
+        self.assertEqual(this, [1] * 3)
+        self.assertEqual(len(cm.output), 2)
+        self.assertIn('Unhandled exception in periodic task "fill_this"',
+                      cm.output[0])
+        self.assertIn('"fill_this" not re-scheduled', cm.output[1])
+
 
 if __name__ == '__main__':
     import unittest
--- a/cubicweb/server/utils.py	Thu Feb 16 11:15:23 2017 +0100
+++ b/cubicweb/server/utils.py	Mon Mar 06 13:21:50 2017 +0100
@@ -19,7 +19,8 @@
 from __future__ import print_function
 
 
-
+from functools import wraps
+import sched
 import sys
 import logging
 from threading import Timer, Thread
@@ -113,6 +114,48 @@
     return user, passwd
 
 
+if PY2:
+    import time  # noqa
+
+    class scheduler(sched.scheduler):
+        """Python2 version of sched.scheduler that matches Python3 API."""
+
+        def __init__(self, **kwargs):
+            kwargs.setdefault('timefunc', time.time)
+            kwargs.setdefault('delayfunc', time.sleep)
+            # sched.scheduler is an old-style class.
+            sched.scheduler.__init__(self, **kwargs)
+
+else:
+    scheduler = sched.scheduler
+
+
+def schedule_periodic_task(scheduler, interval, func, *args):
+    """Enter a task with `func(*args)` as a periodic event in `scheduler`
+    executing at `interval` seconds. Once executed, the task would re-schedule
+    itself unless a BaseException got raised.
+    """
+    @wraps(func)
+    def task(*args):
+        restart = True
+        try:
+            func(*args)
+        except Exception:
+            logger = logging.getLogger('cubicweb.scheduler')
+            logger.exception('Unhandled exception in periodic task "%s"',
+                             func.__name__)
+        except BaseException as exc:
+            logger = logging.getLogger('cubicweb.scheduler')
+            logger.error('periodic task "%s" not re-scheduled due to %r',
+                         func.__name__, exc)
+            restart = False
+        finally:
+            if restart:
+                scheduler.enter(interval, 1, task, argument=args)
+
+    return scheduler.enter(interval, 1, task, argument=args)
+
+
 _MARKER = object()
 def func_name(func):
     name = getattr(func, '__name__', _MARKER)