server/sources/datafeed.py
changeset 7931 60068dc83457
parent 7910 e5d5609e3bf1
parent 7921 a93e2ed5877a
child 7934 2250a60a7653
--- a/server/sources/datafeed.py	Mon Oct 10 16:15:55 2011 +0200
+++ b/server/sources/datafeed.py	Tue Oct 11 11:00:24 2011 +0200
@@ -55,6 +55,15 @@
                    'external source (default to 5 minutes, must be >= 1 min).'),
           'group': 'datafeed-source', 'level': 2,
           }),
+        ('max-lock-lifetime',
+         {'type' : 'time',
+          'default': '1h',
+          'help': ('Maximum time allowed for a synchronization to be run. '
+                   'Exceeded that time, the synchronization will be considered '
+                   'as having failed and not properly released the lock, hence '
+                   'it won\'t be considered'),
+          'group': 'datafeed-source', 'level': 2,
+          }),
         ('delete-entities',
          {'type' : 'yn',
           'default': True,
@@ -92,6 +101,7 @@
         properly typed with defaults set
         """
         self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
+        self.max_lock_lifetime = timedelta(seconds=typedconfig['max-lock-lifetime'])
         if source_entity is not None:
             self._entity_update(source_entity)
         self.config = typedconfig
@@ -140,8 +150,11 @@
     def acquire_synchronization_lock(self, session):
         # XXX race condition until WHERE of SET queries is executed using
         # 'SELECT FOR UPDATE'
-        if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',
-                               {'x': self.eid}):
+        now = datetime.utcnow()
+        if not session.execute('SET X in_synchronizaton %(now)s WHERE X eid %(x)s, X synchronizing NULL OR X synchronizing < %(maxdt)s',
+                               {'x': self.eid,
+                                'now': now,
+                                'maxdt': now - self.max_lock_lifetime}):
             self.error('concurrent synchronization detected, skip pull')
             session.commit(free_cnxset=False)
             return False
@@ -150,7 +163,7 @@
 
     def release_synchronization_lock(self, session):
         session.set_cnxset()
-        session.execute('SET X synchronizing FALSE WHERE X eid %(x)s',
+        session.execute('SET X synchronizing None WHERE X eid %(x)s',
                         {'x': self.eid})
         session.commit()