# HG changeset patch # User Sylvain Thénault # Date 1455814272 -3600 # Node ID 27b98f3cceae0f7c86b2ac4e818971165a7a494b # Parent 847ab4bdd985b3ac1644033c974cfc06f9221022 [datafeed] attempt to acquire synchronization lock even when force is given instead of the implementation in e717da3dc164, raise an error if the lock is already grabbed and catch this error in the caller. See discussion on https://www.cubicweb.org/revision/10790765 Closes #10451635 diff -r 847ab4bdd985 -r 27b98f3cceae cubicweb/server/serverctl.py --- a/cubicweb/server/serverctl.py Mon May 30 17:41:12 2016 +0200 +++ b/cubicweb/server/serverctl.py Thu Feb 18 17:51:12 2016 +0100 @@ -35,7 +35,7 @@ from logilab.database import get_db_helper, get_connection -from cubicweb import AuthenticationError, ExecutionError, ConfigurationError +from cubicweb import AuthenticationError, ExecutionError, ConfigurationError, SourceException from cubicweb.toolsutils import Command, CommandHandler, underline_title from cubicweb.cwctl import CWCTL, check_options_consistency, ConfigureInstanceCommand from cubicweb.server import SOURCE_TYPES @@ -995,18 +995,25 @@ init_cmdline_log_threshold(config, self['loglevel']) repo = repoapi.get_repository(config=config) repo.hm.call_hooks('server_maintenance', repo=repo) + status = 0 try: try: source = repo.sources_by_uri[args[1]] except KeyError: raise ExecutionError('no source named %r' % args[1]) with repo.internal_cnx() as cnx: - stats = source.pull_data(cnx, force=True, raise_on_error=True) + try: + stats = source.pull_data(cnx, force=True, raise_on_error=True) + except SourceException as exc: + print("can't synchronize the source:", exc) + status = 1 + stats = {} finally: repo.shutdown() for key, val in stats.items(): if val: print(key, ':', val) + sys.exit(status) def permissionshandler(relation, perms): diff -r 847ab4bdd985 -r 27b98f3cceae cubicweb/server/sources/datafeed.py --- a/cubicweb/server/sources/datafeed.py Mon May 30 17:41:12 2016 +0200 +++ b/cubicweb/server/sources/datafeed.py Thu Feb 18 17:51:12 2016 +0100 @@ -35,7 +35,7 @@ from logilab.common.deprecation import deprecated -from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid +from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid, SourceException from cubicweb.server.repository import preprocess_inlined_relations from cubicweb.server.sources import AbstractSource from cubicweb.appobject import AppObject @@ -172,23 +172,18 @@ {'x': self.eid, 'date': self.latest_retrieval}) cnx.commit() - def acquire_synchronization_lock(self, cnx, force=False): + def acquire_synchronization_lock(self, cnx): # XXX race condition until WHERE of SET queries is executed using # 'SELECT FOR UPDATE' now = datetime.now(tz=utc) - if force: - maxdt = now - else: - maxdt = now - self.max_lock_lifetime + maxdt = now - self.max_lock_lifetime if not cnx.execute( 'SET X in_synchronization %(now)s WHERE X eid %(x)s, ' 'X in_synchronization NULL OR X in_synchronization < %(maxdt)s', {'x': self.eid, 'now': now, 'maxdt': maxdt}): - self.error('concurrent synchronization detected, skip pull') cnx.commit() - return False + raise SourceException("a concurrent synchronization is already running") cnx.commit() - return True def release_synchronization_lock(self, cnx): cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s', @@ -206,7 +201,12 @@ """ if not force and self.fresh(): return {} - if not self.acquire_synchronization_lock(cnx, force): + try: + self.acquire_synchronization_lock(cnx) + except SourceException as exc: + if force: + raise + self.error(str(exc)) return {} try: if async: