diff -r 436400e7f807 -r a93e2ed5877a server/sources/datafeed.py --- a/server/sources/datafeed.py Fri Oct 07 11:47:42 2011 +0200 +++ b/server/sources/datafeed.py Fri Oct 07 15:55:14 2011 +0200 @@ -55,6 +55,15 @@ 'external source (default to 5 minutes, must be >= 1 min).'), 'group': 'datafeed-source', 'level': 2, }), + ('max-lock-lifetime', + {'type' : 'time', + 'default': '1h', + 'help': ('Maximum time allowed for a synchronization to be run. ' + 'Exceeded that time, the synchronization will be considered ' + 'as having failed and not properly released the lock, hence ' + 'it won\'t be considered'), + 'group': 'datafeed-source', 'level': 2, + }), ('delete-entities', {'type' : 'yn', 'default': True, @@ -90,6 +99,7 @@ properly typed with defaults set """ self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval']) + self.max_lock_lifetime = timedelta(seconds=typedconfig['max-lock-lifetime']) if source_entity is not None: self._entity_update(source_entity) self.config = typedconfig @@ -138,8 +148,11 @@ def acquire_synchronization_lock(self, session): # XXX race condition until WHERE of SET queries is executed using # 'SELECT FOR UPDATE' - if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE', - {'x': self.eid}): + now = datetime.utcnow() + if not session.execute('SET X in_synchronizaton %(now)s WHERE X eid %(x)s, X synchronizing NULL OR X synchronizing < %(maxdt)s', + {'x': self.eid, + 'now': now, + 'maxdt': now - self.max_lock_lifetime}): self.error('concurrent synchronization detected, skip pull') session.commit(free_cnxset=False) return False @@ -148,7 +161,7 @@ def release_synchronization_lock(self, session): session.set_cnxset() - session.execute('SET X synchronizing FALSE WHERE X eid %(x)s', + session.execute('SET X synchronizing None WHERE X eid %(x)s', {'x': self.eid}) session.commit()