server/sources/datafeed.py
branchstable
changeset 7921 a93e2ed5877a
parent 7731 48e78934a4e2
child 7931 60068dc83457
child 7933 b25dda2214a2
--- a/server/sources/datafeed.py	Fri Oct 07 11:47:42 2011 +0200
+++ b/server/sources/datafeed.py	Fri Oct 07 15:55:14 2011 +0200
@@ -55,6 +55,15 @@
                    'external source (default to 5 minutes, must be >= 1 min).'),
           'group': 'datafeed-source', 'level': 2,
           }),
+        ('max-lock-lifetime',
+         {'type' : 'time',
+          'default': '1h',
+          'help': ('Maximum time allowed for a synchronization to be run. '
+                   'Exceeded that time, the synchronization will be considered '
+                   'as having failed and not properly released the lock, hence '
+                   'it won\'t be considered'),
+          'group': 'datafeed-source', 'level': 2,
+          }),
         ('delete-entities',
          {'type' : 'yn',
           'default': True,
@@ -90,6 +99,7 @@
         properly typed with defaults set
         """
         self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
+        self.max_lock_lifetime = timedelta(seconds=typedconfig['max-lock-lifetime'])
         if source_entity is not None:
             self._entity_update(source_entity)
         self.config = typedconfig
@@ -138,8 +148,11 @@
     def acquire_synchronization_lock(self, session):
         # XXX race condition until WHERE of SET queries is executed using
         # 'SELECT FOR UPDATE'
-        if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',
-                               {'x': self.eid}):
+        now = datetime.utcnow()
+        if not session.execute('SET X in_synchronizaton %(now)s WHERE X eid %(x)s, X synchronizing NULL OR X synchronizing < %(maxdt)s',
+                               {'x': self.eid,
+                                'now': now,
+                                'maxdt': now - self.max_lock_lifetime}):
             self.error('concurrent synchronization detected, skip pull')
             session.commit(free_cnxset=False)
             return False
@@ -148,7 +161,7 @@
 
     def release_synchronization_lock(self, session):
         session.set_cnxset()
-        session.execute('SET X synchronizing FALSE WHERE X eid %(x)s',
+        session.execute('SET X synchronizing None WHERE X eid %(x)s',
                         {'x': self.eid})
         session.commit()