server/sources/datafeed.py
branchstable
changeset 7933 b25dda2214a2
parent 7921 a93e2ed5877a
child 7934 2250a60a7653
equal deleted inserted replaced
7932:2ad26cc3b5c6 7933:b25dda2214a2
   147 
   147 
   148     def acquire_synchronization_lock(self, session):
   148     def acquire_synchronization_lock(self, session):
   149         # XXX race condition until WHERE of SET queries is executed using
   149         # XXX race condition until WHERE of SET queries is executed using
   150         # 'SELECT FOR UPDATE'
   150         # 'SELECT FOR UPDATE'
   151         now = datetime.utcnow()
   151         now = datetime.utcnow()
   152         if not session.execute('SET X in_synchronizaton %(now)s WHERE X eid %(x)s, X synchronizing NULL OR X synchronizing < %(maxdt)s',
   152         if not session.execute(
   153                                {'x': self.eid,
   153             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
   154                                 'now': now,
   154             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
   155                                 'maxdt': now - self.max_lock_lifetime}):
   155             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
   156             self.error('concurrent synchronization detected, skip pull')
   156             self.error('concurrent synchronization detected, skip pull')
   157             session.commit(free_cnxset=False)
   157             session.commit(free_cnxset=False)
   158             return False
   158             return False
   159         session.commit(free_cnxset=False)
   159         session.commit(free_cnxset=False)
   160         return True
   160         return True
   161 
   161 
   162     def release_synchronization_lock(self, session):
   162     def release_synchronization_lock(self, session):
   163         session.set_cnxset()
   163         session.set_cnxset()
   164         session.execute('SET X synchronizing None WHERE X eid %(x)s',
   164         session.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   165                         {'x': self.eid})
   165                         {'x': self.eid})
   166         session.commit()
   166         session.commit()
   167 
   167 
   168     def pull_data(self, session, force=False, raise_on_error=False):
   168     def pull_data(self, session, force=False, raise_on_error=False):
   169         """Launch synchronization of the source if needed.
   169         """Launch synchronization of the source if needed.