[datafeed] use a boolean flag on CWSource to ensure we don't have concurrent synchronizations. Closes #1725690
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Mon, 06 Jun 2011 15:17:55 +0200
changeset 7456 c54038622fc9
parent 7455 694b21f0fc62
child 7457 f7c97a3cd121
[datafeed] use a boolean flag on CWSource to ensure we don't have concurrent synchronizations. Closes #1725690
hooks/__init__.py
misc/migration/3.13.0_Any.py
schemas/base.py
server/sources/datafeed.py
--- a/hooks/__init__.py	Mon Jun 06 15:11:29 2011 +0200
+++ b/hooks/__init__.py	Mon Jun 06 15:17:55 2011 +0200
@@ -63,10 +63,8 @@
                             source.info('added %s entities', len(stats['created']))
                         if stats.get('updated'):
                             source.info('updated %s entities', len(stats['updated']))
-                        session.commit()
                     except Exception, exc:
                         session.exception('while trying to update feed %s', source)
-                        session.rollback()
                     session.set_cnxset()
             finally:
                 session.close()
--- a/misc/migration/3.13.0_Any.py	Mon Jun 06 15:11:29 2011 +0200
+++ b/misc/migration/3.13.0_Any.py	Mon Jun 06 15:17:55 2011 +0200
@@ -1,1 +1,2 @@
 sync_schema_props_perms('cw_source', syncprops=False)
+add_attribute('CWSource', 'synchronizing')
--- a/schemas/base.py	Mon Jun 06 15:11:29 2011 +0200
+++ b/schemas/base.py	Mon Jun 06 15:17:55 2011 +0200
@@ -21,7 +21,8 @@
 _ = unicode
 
 from yams.buildobjs import (EntityType, RelationType, RelationDefinition,
-                            SubjectRelation, String, Datetime, Password, Interval)
+                            SubjectRelation,
+                            String, Datetime, Password, Interval, Boolean)
 from cubicweb.schema import (
     RQLConstraint, WorkflowableEntityType, ERQLExpression, RRQLExpression,
     PUB_SYSTEM_ENTITY_PERMS, PUB_SYSTEM_REL_PERMS, PUB_SYSTEM_ATTR_PERMS)
@@ -265,7 +266,8 @@
     url = String(description=_('URLs from which content will be imported. You can put one url per line'))
     parser = String(description=_('parser to use to extract entities from content retrieved at given URLs.'))
     latest_retrieval = Datetime(description=_('latest synchronization time'))
-
+    synchronizing = Boolean(description=_('currently in synchronization'),
+                            default=False)
 
 ENTITY_MANAGERS_PERMISSIONS = {
     'read':   ('managers',),
--- a/server/sources/datafeed.py	Mon Jun 06 15:11:29 2011 +0200
+++ b/server/sources/datafeed.py	Mon Jun 06 15:17:55 2011 +0200
@@ -18,6 +18,7 @@
 """datafeed sources: copy data from an external data stream into the system
 database
 """
+from __future__ import with_statement
 
 import urllib2
 import StringIO
@@ -31,6 +32,7 @@
 from cubicweb.server.sources import AbstractSource
 from cubicweb.appobject import AppObject
 
+
 class DataFeedSource(AbstractSource):
     copy_based_source = True
 
@@ -131,9 +133,39 @@
         session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                         {'x': self.eid, 'date': self.latest_retrieval})
 
+    def acquire_synchronization_lock(self, session):
+        """Flag this source as synchronizing; return False if already flagged."""
+        # XXX race condition until WHERE of SET queries is executed using
+        # 'SELECT FOR UPDATE'
+        rset = session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',
+                               {'x': self.eid})
+        if not rset:  # nothing updated: flag already set (rset[0][0] would raise here)
+            self.error('concurrent synchronization detected, skip pull')
+        session.commit(free_cnxset=False)
+        return bool(rset)
+
+    def release_synchronization_lock(self, session):
+        """Reset the `synchronizing` flag of this source, then commit the session."""
+        session.execute('SET X synchronizing FALSE WHERE X eid %(x)s', {'x': self.eid})
+        session.commit()
+
     def pull_data(self, session, force=False, raise_on_error=False):
+        """Launch synchronization of the source if needed.
+
+        Return {} without pulling when the source is fresh (and not forced)
+        or already locked; handle commit/rollback on the given session.
+        """
         if not force and self.fresh():
             return {}
+        if not self.acquire_synchronization_lock(session):
+            return {}
+        try:
+            with session.transaction(free_cnxset=False):
+                return self._pull_data(session, force, raise_on_error)
+        finally:
+            self.release_synchronization_lock(session)
+
+    def _pull_data(self, session, force=False, raise_on_error=False):
         if self.config['delete-entities']:
             myuris = self.source_cwuris(session)
         else:
@@ -272,7 +304,7 @@
         try:
             parsed = self.parse(url)
         except Exception, ex:
-            self.source.error(ex)
+            self.source.error(str(ex))
             return True
         error = False
         for args in parsed: