[datafeed] attempt to acquire synchronization lock even when force is given
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Thu, 18 Feb 2016 17:51:12 +0100
changeset 11345 27b98f3cceae
parent 11344 847ab4bdd985
child 11346 69c17d011f74
[datafeed] attempt to acquire synchronization lock even when force is given instead of the implementation in e717da3dc164, raise an error if the lock is already grabbed and catch this error in the caller. See discussion on https://www.cubicweb.org/revision/10790765 Closes #10451635
cubicweb/server/serverctl.py
cubicweb/server/sources/datafeed.py
--- a/cubicweb/server/serverctl.py	Mon May 30 17:41:12 2016 +0200
+++ b/cubicweb/server/serverctl.py	Thu Feb 18 17:51:12 2016 +0100
@@ -35,7 +35,7 @@
 
 from logilab.database import get_db_helper, get_connection
 
-from cubicweb import AuthenticationError, ExecutionError, ConfigurationError
+from cubicweb import AuthenticationError, ExecutionError, ConfigurationError, SourceException
 from cubicweb.toolsutils import Command, CommandHandler, underline_title
 from cubicweb.cwctl import CWCTL, check_options_consistency, ConfigureInstanceCommand
 from cubicweb.server import SOURCE_TYPES
@@ -995,18 +995,25 @@
         init_cmdline_log_threshold(config, self['loglevel'])
         repo = repoapi.get_repository(config=config)
         repo.hm.call_hooks('server_maintenance', repo=repo)
+        status = 0
         try:
             try:
                 source = repo.sources_by_uri[args[1]]
             except KeyError:
                 raise ExecutionError('no source named %r' % args[1])
             with repo.internal_cnx() as cnx:
-                stats = source.pull_data(cnx, force=True, raise_on_error=True)
+                try:
+                    stats = source.pull_data(cnx, force=True, raise_on_error=True)
+                except SourceException as exc:
+                    print("can't synchronize the source:", exc)
+                    status = 1
+                    stats = {}
         finally:
             repo.shutdown()
         for key, val in stats.items():
             if val:
                 print(key, ':', val)
+        sys.exit(status)
 
 
 def permissionshandler(relation, perms):
--- a/cubicweb/server/sources/datafeed.py	Mon May 30 17:41:12 2016 +0200
+++ b/cubicweb/server/sources/datafeed.py	Thu Feb 18 17:51:12 2016 +0100
@@ -35,7 +35,7 @@
 
 from logilab.common.deprecation import deprecated
 
-from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
+from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid, SourceException
 from cubicweb.server.repository import preprocess_inlined_relations
 from cubicweb.server.sources import AbstractSource
 from cubicweb.appobject import AppObject
@@ -172,23 +172,18 @@
                     {'x': self.eid, 'date': self.latest_retrieval})
         cnx.commit()
 
-    def acquire_synchronization_lock(self, cnx, force=False):
+    def acquire_synchronization_lock(self, cnx):
         # XXX race condition until WHERE of SET queries is executed using
         # 'SELECT FOR UPDATE'
         now = datetime.now(tz=utc)
-        if force:
-            maxdt = now
-        else:
-            maxdt = now - self.max_lock_lifetime
+        maxdt = now - self.max_lock_lifetime
         if not cnx.execute(
                 'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
                 'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
                 {'x': self.eid, 'now': now, 'maxdt': maxdt}):
-            self.error('concurrent synchronization detected, skip pull')
             cnx.commit()
-            return False
+            raise SourceException("a concurrent synchronization is already running")
         cnx.commit()
-        return True
 
     def release_synchronization_lock(self, cnx):
         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
@@ -206,7 +201,12 @@
         """
         if not force and self.fresh():
             return {}
-        if not self.acquire_synchronization_lock(cnx, force):
+        try:
+            self.acquire_synchronization_lock(cnx)
+        except SourceException as exc:
+            if force:
+                raise
+            self.error(str(exc))
             return {}
         try:
             if async: