[datafeed] use a boolean flag on CWSource to ensure we don't have concurrent synchronizations. Closes #1725690
--- a/hooks/__init__.py Mon Jun 06 15:11:29 2011 +0200
+++ b/hooks/__init__.py Mon Jun 06 15:17:55 2011 +0200
@@ -63,10 +63,8 @@
source.info('added %s entities', len(stats['created']))
if stats.get('updated'):
source.info('updated %s entities', len(stats['updated']))
- session.commit()
except Exception, exc:
session.exception('while trying to update feed %s', source)
- session.rollback()
session.set_cnxset()
finally:
session.close()
--- a/misc/migration/3.13.0_Any.py Mon Jun 06 15:11:29 2011 +0200
+++ b/misc/migration/3.13.0_Any.py Mon Jun 06 15:17:55 2011 +0200
@@ -1,1 +1,2 @@
sync_schema_props_perms('cw_source', syncprops=False)
+add_attribute('CWSource', 'synchronizing')
--- a/schemas/base.py Mon Jun 06 15:11:29 2011 +0200
+++ b/schemas/base.py Mon Jun 06 15:17:55 2011 +0200
@@ -21,7 +21,8 @@
_ = unicode
from yams.buildobjs import (EntityType, RelationType, RelationDefinition,
- SubjectRelation, String, Datetime, Password, Interval)
+ SubjectRelation,
+ String, Datetime, Password, Interval, Boolean)
from cubicweb.schema import (
RQLConstraint, WorkflowableEntityType, ERQLExpression, RRQLExpression,
PUB_SYSTEM_ENTITY_PERMS, PUB_SYSTEM_REL_PERMS, PUB_SYSTEM_ATTR_PERMS)
@@ -265,7 +266,8 @@
url = String(description=_('URLs from which content will be imported. You can put one url per line'))
parser = String(description=_('parser to use to extract entities from content retrieved at given URLs.'))
latest_retrieval = Datetime(description=_('latest synchronization time'))
-
+ synchronizing = Boolean(description=_('currently in synchronization'),
+ default=False)
ENTITY_MANAGERS_PERMISSIONS = {
'read': ('managers',),
--- a/server/sources/datafeed.py Mon Jun 06 15:11:29 2011 +0200
+++ b/server/sources/datafeed.py Mon Jun 06 15:17:55 2011 +0200
@@ -18,6 +18,7 @@
"""datafeed sources: copy data from an external data stream into the system
database
"""
+from __future__ import with_statement
import urllib2
import StringIO
@@ -31,6 +32,7 @@
from cubicweb.server.sources import AbstractSource
from cubicweb.appobject import AppObject
+
class DataFeedSource(AbstractSource):
copy_based_source = True
@@ -131,9 +133,39 @@
session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
{'x': self.eid, 'date': self.latest_retrieval})
+ def acquire_synchronization_lock(self, session):
+ # XXX a race condition remains until the WHERE clause of SET queries
+ # is executed using 'SELECT ... FOR UPDATE'
+ if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',
+ {'x': self.eid})[0][0]:
+ self.error('concurrent synchronization detected, skip pull')
+ session.commit(free_cnxset=False)
+ return False
+ session.commit(free_cnxset=False)
+ return True
+
+ def release_synchronization_lock(self, session):
+ session.execute('SET X synchronizing FALSE WHERE X eid %(x)s',
+ {'x': self.eid})
+ session.commit()
+
def pull_data(self, session, force=False, raise_on_error=False):
+ """Launch synchronization of the source if needed.
+
+ This method is responsible for handling commit/rollback on the
+ given session.
+ """
if not force and self.fresh():
return {}
+ if not self.acquire_synchronization_lock(session):
+ return {}
+ try:
+ with session.transaction(free_cnxset=False):
+ return self._pull_data(session, force, raise_on_error)
+ finally:
+ self.release_synchronization_lock(session)
+
+ def _pull_data(self, session, force=False, raise_on_error=False):
if self.config['delete-entities']:
myuris = self.source_cwuris(session)
else:
@@ -272,7 +304,7 @@
try:
parsed = self.parse(url)
except Exception, ex:
- self.source.error(ex)
+ self.source.error(str(ex))
return True
error = False
for args in parsed: