cubicweb/server/sources/datafeed.py
changeset 11345 27b98f3cceae
parent 11255 58be5fe4a232
child 11740 dabbb2a4a493
--- a/cubicweb/server/sources/datafeed.py	Mon May 30 17:41:12 2016 +0200
+++ b/cubicweb/server/sources/datafeed.py	Thu Feb 18 17:51:12 2016 +0100
@@ -35,7 +35,7 @@
 
 from logilab.common.deprecation import deprecated
 
-from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
+from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid, SourceException
 from cubicweb.server.repository import preprocess_inlined_relations
 from cubicweb.server.sources import AbstractSource
 from cubicweb.appobject import AppObject
@@ -172,23 +172,18 @@
                     {'x': self.eid, 'date': self.latest_retrieval})
         cnx.commit()
 
-    def acquire_synchronization_lock(self, cnx, force=False):
+    def acquire_synchronization_lock(self, cnx):
         # XXX race condition until WHERE of SET queries is executed using
         # 'SELECT FOR UPDATE'
         now = datetime.now(tz=utc)
-        if force:
-            maxdt = now
-        else:
-            maxdt = now - self.max_lock_lifetime
+        maxdt = now - self.max_lock_lifetime
         if not cnx.execute(
                 'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
                 'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
                 {'x': self.eid, 'now': now, 'maxdt': maxdt}):
-            self.error('concurrent synchronization detected, skip pull')
             cnx.commit()
-            return False
+            raise SourceException("a concurrent synchronization is already running")
         cnx.commit()
-        return True
 
     def release_synchronization_lock(self, cnx):
         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
@@ -206,7 +201,12 @@
         """
         if not force and self.fresh():
             return {}
-        if not self.acquire_synchronization_lock(cnx, force):
+        try:
+            self.acquire_synchronization_lock(cnx)
+        except SourceException as exc:
+            if force:
+                raise
+            self.error(str(exc))
             return {}
         try:
             if async: