--- a/server/sources/datafeed.py Mon Jun 06 15:11:29 2011 +0200
+++ b/server/sources/datafeed.py Mon Jun 06 15:17:55 2011 +0200
@@ -18,6 +18,7 @@
"""datafeed sources: copy data from an external data stream into the system
database
"""
+from __future__ import with_statement
import urllib2
import StringIO
@@ -31,6 +32,7 @@
from cubicweb.server.sources import AbstractSource
from cubicweb.appobject import AppObject
+
class DataFeedSource(AbstractSource):
copy_based_source = True
@@ -131,9 +133,39 @@
session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
{'x': self.eid, 'date': self.latest_retrieval})
+    def acquire_synchronization_lock(self, session):
+        """Try to flag this source as currently being synchronized.
+
+        Set `synchronizing` to TRUE on the source entity, provided it is
+        currently FALSE, then commit the session with `free_cnxset=False`.
+
+        Return True if this call set the flag. Return False (after logging
+        an error) if the query matched no entity, which is taken as the sign
+        that another synchronization is already running.
+        """
+        # XXX race condition until WHERE of SET queries is executed using
+        # 'SELECT FOR UPDATE'
+        rset = session.execute(
+            'SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',
+            {'x': self.eid})
+        # Test the result set itself rather than its first cell: when the
+        # flag is already set nothing matches, the result set is empty and
+        # indexing row 0 would raise IndexError instead of reporting the
+        # concurrent synchronization.
+        acquired = bool(rset)
+        if not acquired:
+            self.error('concurrent synchronization detected, skip pull')
+        # commit in both cases, as the original did on each branch
+        session.commit(free_cnxset=False)
+        return acquired
+
+    def release_synchronization_lock(self, session):
+        """Reset the `synchronizing` flag of this source, then commit.
+
+        The flag is set back to FALSE on the entity whose eid is `self.eid`,
+        whatever its current value.
+        """
+        rql = 'SET X synchronizing FALSE WHERE X eid %(x)s'
+        args = {'x': self.eid}
+        session.execute(rql, args)
+        session.commit()
+
def pull_data(self, session, force=False, raise_on_error=False):
+ """Launch synchronization of the source if needed.
+
+ This method is responsible for handling commit/rollback on the given
+ session.
+
+ Return an empty dict, without pulling anything, when the source is
+ still fresh (unless `force` is true) or when the synchronization lock
+ could not be acquired. Otherwise return whatever `_pull_data` returns.
+ """
if not force and self.fresh():
return {}
+ # skip the pull when acquire_synchronization_lock reports failure
+ if not self.acquire_synchronization_lock(session):
+ return {}
+ # the lock is released in the `finally` clause, hence also when
+ # `_pull_data` or the transaction raises
+ try:
+ with session.transaction(free_cnxset=False):
+ return self._pull_data(session, force, raise_on_error)
+ finally:
+ self.release_synchronization_lock(session)
+
+ def _pull_data(self, session, force=False, raise_on_error=False):
if self.config['delete-entities']:
myuris = self.source_cwuris(session)
else:
@@ -272,7 +304,7 @@
try:
parsed = self.parse(url)
except Exception, ex:
- self.source.error(ex)
+ self.source.error(str(ex))
return True
error = False
for args in parsed: