# HG changeset patch # User Sylvain Thénault # Date 1307366275 -7200 # Node ID c54038622fc96bb0adc5ca2c5ece414809e39db1 # Parent 694b21f0fc620614303fe448ebe836fe3b7a3c40 [datafeed] use a boolean flag on CWSource to ensure we don't have concurrent synchronizations. Closes #1725690 diff -r 694b21f0fc62 -r c54038622fc9 hooks/__init__.py --- a/hooks/__init__.py Mon Jun 06 15:11:29 2011 +0200 +++ b/hooks/__init__.py Mon Jun 06 15:17:55 2011 +0200 @@ -63,10 +63,8 @@ source.info('added %s entities', len(stats['created'])) if stats.get('updated'): source.info('updated %s entities', len(stats['updated'])) - session.commit() except Exception, exc: session.exception('while trying to update feed %s', source) - session.rollback() session.set_cnxset() finally: session.close() diff -r 694b21f0fc62 -r c54038622fc9 misc/migration/3.13.0_Any.py --- a/misc/migration/3.13.0_Any.py Mon Jun 06 15:11:29 2011 +0200 +++ b/misc/migration/3.13.0_Any.py Mon Jun 06 15:17:55 2011 +0200 @@ -1,1 +1,2 @@ sync_schema_props_perms('cw_source', syncprops=False) +add_attribute('CWSource', 'synchronizing') diff -r 694b21f0fc62 -r c54038622fc9 schemas/base.py --- a/schemas/base.py Mon Jun 06 15:11:29 2011 +0200 +++ b/schemas/base.py Mon Jun 06 15:17:55 2011 +0200 @@ -21,7 +21,8 @@ _ = unicode from yams.buildobjs import (EntityType, RelationType, RelationDefinition, - SubjectRelation, String, Datetime, Password, Interval) + SubjectRelation, + String, Datetime, Password, Interval, Boolean) from cubicweb.schema import ( RQLConstraint, WorkflowableEntityType, ERQLExpression, RRQLExpression, PUB_SYSTEM_ENTITY_PERMS, PUB_SYSTEM_REL_PERMS, PUB_SYSTEM_ATTR_PERMS) @@ -265,7 +266,8 @@ url = String(description=_('URLs from which content will be imported. You can put one url per line')) parser = String(description=_('parser to use to extract entities from content retrieved at given URLs.')) latest_retrieval = Datetime(description=_('latest synchronization time')) - + synchronizing = Boolean(description=_('currently in synchronization'), + default=False) ENTITY_MANAGERS_PERMISSIONS = { 'read': ('managers',), diff -r 694b21f0fc62 -r c54038622fc9 server/sources/datafeed.py --- a/server/sources/datafeed.py Mon Jun 06 15:11:29 2011 +0200 +++ b/server/sources/datafeed.py Mon Jun 06 15:17:55 2011 +0200 @@ -18,6 +18,7 @@ """datafeed sources: copy data from an external data stream into the system database """ +from __future__ import with_statement import urllib2 import StringIO @@ -31,6 +32,7 @@ from cubicweb.server.sources import AbstractSource from cubicweb.appobject import AppObject + class DataFeedSource(AbstractSource): copy_based_source = True @@ -131,9 +133,39 @@ session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', {'x': self.eid, 'date': self.latest_retrieval}) + def acquire_synchronization_lock(self, session): + # XXX race condition until WHERE of SET queries is executed using + # 'SELECT FOR UPDATE' + if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE', + {'x': self.eid})[0][0]: + self.error('concurrent synchronization detected, skip pull') + session.commit(free_cnxset=False) + return False + session.commit(free_cnxset=False) + return True + + def release_synchronization_lock(self, session): + session.execute('SET X synchronizing FALSE WHERE X eid %(x)s', + {'x': self.eid}) + session.commit() + def pull_data(self, session, force=False, raise_on_error=False): + """Launch synchronization of the source if needed. + + This method is responsible to handle commit/rollback on the given + session. + """ if not force and self.fresh(): return {} + if not self.acquire_synchronization_lock(session): + return {} + try: + with session.transaction(free_cnxset=False): + return self._pull_data(session, force, raise_on_error) + finally: + self.release_synchronization_lock(session) + + def _pull_data(self, session, force=False, raise_on_error=False): if self.config['delete-entities']: myuris = self.source_cwuris(session) else: @@ -272,7 +304,7 @@ try: parsed = self.parse(url) except Exception, ex: - self.source.error(ex) + self.source.error(str(ex)) return True error = False for args in parsed: