[datafeed] attempt to acquire synchronization lock even when force is given
instead of the implementation in e717da3dc164, raise an error if the lock is
already grabbed and catch this error in the caller.
See discussion on https://www.cubicweb.org/revision/10790765
Closes #10451635
--- a/cubicweb/server/serverctl.py Mon May 30 17:41:12 2016 +0200
+++ b/cubicweb/server/serverctl.py Thu Feb 18 17:51:12 2016 +0100
@@ -35,7 +35,7 @@
from logilab.database import get_db_helper, get_connection
-from cubicweb import AuthenticationError, ExecutionError, ConfigurationError
+from cubicweb import AuthenticationError, ExecutionError, ConfigurationError, SourceException
from cubicweb.toolsutils import Command, CommandHandler, underline_title
from cubicweb.cwctl import CWCTL, check_options_consistency, ConfigureInstanceCommand
from cubicweb.server import SOURCE_TYPES
@@ -995,18 +995,25 @@
init_cmdline_log_threshold(config, self['loglevel'])
repo = repoapi.get_repository(config=config)
repo.hm.call_hooks('server_maintenance', repo=repo)
+ status = 0
try:
try:
source = repo.sources_by_uri[args[1]]
except KeyError:
raise ExecutionError('no source named %r' % args[1])
with repo.internal_cnx() as cnx:
- stats = source.pull_data(cnx, force=True, raise_on_error=True)
+ try:
+ stats = source.pull_data(cnx, force=True, raise_on_error=True)
+ except SourceException as exc:
+ print("can't synchronize the source:", exc)
+ status = 1
+ stats = {}
finally:
repo.shutdown()
for key, val in stats.items():
if val:
print(key, ':', val)
+ sys.exit(status)
def permissionshandler(relation, perms):
--- a/cubicweb/server/sources/datafeed.py Mon May 30 17:41:12 2016 +0200
+++ b/cubicweb/server/sources/datafeed.py Thu Feb 18 17:51:12 2016 +0100
@@ -35,7 +35,7 @@
from logilab.common.deprecation import deprecated
-from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
+from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid, SourceException
from cubicweb.server.repository import preprocess_inlined_relations
from cubicweb.server.sources import AbstractSource
from cubicweb.appobject import AppObject
@@ -172,23 +172,18 @@
{'x': self.eid, 'date': self.latest_retrieval})
cnx.commit()
- def acquire_synchronization_lock(self, cnx, force=False):
+ def acquire_synchronization_lock(self, cnx):
# XXX race condition until WHERE of SET queries is executed using
# 'SELECT FOR UPDATE'
now = datetime.now(tz=utc)
- if force:
- maxdt = now
- else:
- maxdt = now - self.max_lock_lifetime
+ maxdt = now - self.max_lock_lifetime
if not cnx.execute(
'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
{'x': self.eid, 'now': now, 'maxdt': maxdt}):
- self.error('concurrent synchronization detected, skip pull')
cnx.commit()
- return False
+ raise SourceException("a concurrent synchronization is already running")
cnx.commit()
- return True
def release_synchronization_lock(self, cnx):
cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
@@ -206,7 +201,12 @@
"""
if not force and self.fresh():
return {}
- if not self.acquire_synchronization_lock(cnx, force):
+ try:
+ self.acquire_synchronization_lock(cnx)
+ except SourceException as exc:
+ if force:
+ raise
+ self.error(str(exc))
return {}
try:
if async: