--- a/server/sources/datafeed.py Fri Oct 07 11:47:42 2011 +0200
+++ b/server/sources/datafeed.py Fri Oct 07 15:55:14 2011 +0200
@@ -55,6 +55,15 @@
'external source (default to 5 minutes, must be >= 1 min).'),
'group': 'datafeed-source', 'level': 2,
}),
+ ('max-lock-lifetime',
+ {'type' : 'time',
+ 'default': '1h',
+ 'help': ('Maximum time allowed for a synchronization to be run. '
+ 'Exceeded that time, the synchronization will be considered '
+ 'as having failed and not properly released the lock, hence '
+ 'it won\'t be considered'),
+ 'group': 'datafeed-source', 'level': 2,
+ }),
('delete-entities',
{'type' : 'yn',
'default': True,
@@ -90,6 +99,7 @@
properly typed with defaults set
"""
self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
+ self.max_lock_lifetime = timedelta(seconds=typedconfig['max-lock-lifetime'])
if source_entity is not None:
self._entity_update(source_entity)
self.config = typedconfig
@@ -138,8 +148,11 @@
def acquire_synchronization_lock(self, session):
# XXX race condition until WHERE of SET queries is executed using
# 'SELECT FOR UPDATE'
- if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',
- {'x': self.eid}):
+ now = datetime.utcnow()
+ if not session.execute('SET X in_synchronizaton %(now)s WHERE X eid %(x)s, X synchronizing NULL OR X synchronizing < %(maxdt)s',
+ {'x': self.eid,
+ 'now': now,
+ 'maxdt': now - self.max_lock_lifetime}):
self.error('concurrent synchronization detected, skip pull')
session.commit(free_cnxset=False)
return False
@@ -148,7 +161,7 @@
def release_synchronization_lock(self, session):
session.set_cnxset()
- session.execute('SET X synchronizing FALSE WHERE X eid %(x)s',
+ session.execute('SET X synchronizing None WHERE X eid %(x)s',
{'x': self.eid})
session.commit()