cubicweb/server/sources/datafeed.py
changeset 11125 e717da3dc164
parent 11057 0b59724cb3f2
child 11129 97095348b3ee
equal deleted inserted replaced
11099:5fdbf6f2db88 11125:e717da3dc164
   169         self.latest_retrieval = datetime.now(tz=utc)
   169         self.latest_retrieval = datetime.now(tz=utc)
   170         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   170         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   171                     {'x': self.eid, 'date': self.latest_retrieval})
   171                     {'x': self.eid, 'date': self.latest_retrieval})
   172         cnx.commit()
   172         cnx.commit()
   173 
   173 
   174     def acquire_synchronization_lock(self, cnx):
   174     def acquire_synchronization_lock(self, cnx, force=False):
   175         # XXX race condition until WHERE of SET queries is executed using
   175         # XXX race condition until WHERE of SET queries is executed using
   176         # 'SELECT FOR UPDATE'
   176         # 'SELECT FOR UPDATE'
   177         now = datetime.now(tz=utc)
   177         now = datetime.now(tz=utc)
       
   178         if force:
       
   179             maxdt = now
       
   180         else:
       
   181             maxdt = now - self.max_lock_lifetime
   178         if not cnx.execute(
   182         if not cnx.execute(
   179             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
   183                 'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
   180             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
   184                 'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
   181             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
   185                 {'x': self.eid, 'now': now, 'maxdt': maxdt}):
   182             self.error('concurrent synchronization detected, skip pull')
   186             self.error('concurrent synchronization detected, skip pull')
   183             cnx.commit()
   187             cnx.commit()
   184             return False
   188             return False
   185         cnx.commit()
   189         cnx.commit()
   186         return True
   190         return True
   196         This method is responsible to handle commit/rollback on the given
   200         This method is responsible to handle commit/rollback on the given
   197         connection.
   201         connection.
   198         """
   202         """
   199         if not force and self.fresh():
   203         if not force and self.fresh():
   200             return {}
   204             return {}
   201         if not self.acquire_synchronization_lock(cnx):
   205         if not self.acquire_synchronization_lock(cnx, force):
   202             return {}
   206             return {}
   203         try:
   207         try:
   204             return self._pull_data(cnx, force, raise_on_error)
   208             return self._pull_data(cnx, force, raise_on_error)
   205         finally:
   209         finally:
   206             cnx.rollback() # rollback first in case there is some dirty
   210             cnx.rollback() # rollback first in case there is some dirty