server/sources/datafeed.py
changeset 7456 c54038622fc9
parent 7447 d5705c9bbe82
child 7461 cb25b62074cc
--- a/server/sources/datafeed.py	Mon Jun 06 15:11:29 2011 +0200
+++ b/server/sources/datafeed.py	Mon Jun 06 15:17:55 2011 +0200
@@ -18,6 +18,7 @@
 """datafeed sources: copy data from an external data stream into the system
 database
 """
+from __future__ import with_statement
 
 import urllib2
 import StringIO
@@ -31,6 +32,7 @@
 from cubicweb.server.sources import AbstractSource
 from cubicweb.appobject import AppObject
 
+
 class DataFeedSource(AbstractSource):
     copy_based_source = True
 
@@ -131,9 +133,39 @@
         session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                         {'x': self.eid, 'date': self.latest_retrieval})
 
+    def acquire_synchronization_lock(self, session):
+        # XXX race condition until WHERE of SET queries is executed using
+        # 'SELECT FOR UPDATE'
+        if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',
+                               {'x': self.eid})[0][0]:
+            self.error('concurrent synchronization detected, skip pull')
+            session.commit(free_cnxset=False)
+            return False
+        session.commit(free_cnxset=False)
+        return True
+
+    def release_synchronization_lock(self, session):
+        session.execute('SET X synchronizing FALSE WHERE X eid %(x)s',
+                        {'x': self.eid})
+        session.commit()
+
     def pull_data(self, session, force=False, raise_on_error=False):
+        """Launch synchronization of the source if needed.
+
+        This method is responsible to handle commit/rollback on the given
+        session.
+        """
         if not force and self.fresh():
             return {}
+        if not self.acquire_synchronization_lock(session):
+            return {}
+        try:
+            with session.transaction(free_cnxset=False):
+                return self._pull_data(session, force, raise_on_error)
+        finally:
+            self.release_synchronization_lock(session)
+
+    def _pull_data(self, session, force=False, raise_on_error=False):
         if self.config['delete-entities']:
             myuris = self.source_cwuris(session)
         else:
@@ -272,7 +304,7 @@
         try:
             parsed = self.parse(url)
         except Exception, ex:
-            self.source.error(ex)
+            self.source.error(str(ex))
             return True
         error = False
         for args in parsed: