169 self.latest_retrieval = datetime.now(tz=utc) |
169 self.latest_retrieval = datetime.now(tz=utc) |
170 cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', |
170 cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', |
171 {'x': self.eid, 'date': self.latest_retrieval}) |
171 {'x': self.eid, 'date': self.latest_retrieval}) |
172 cnx.commit() |
172 cnx.commit() |
173 |
173 |
def acquire_synchronization_lock(self, cnx, force=False):
    """Try to take the synchronization lock for this source.

    The lock is the `in_synchronization` attribute of the entity whose
    eid is `self.eid`: it is set to the current UTC time when it is
    currently NULL or older than a cutoff date.

    :param cnx: connection used to run the RQL query; it is committed
      before returning, whether or not the lock was acquired.
    :param force: when true, the cutoff is the current time, so any lock
      stamped earlier than now is overridden; otherwise the cutoff is
      the current time minus `self.max_lock_lifetime`.
    :return: True if the SET query returned a non-empty result (lock
      taken), False otherwise (an error is logged in that case).
    """
    # XXX race condition until WHERE of SET queries is executed using
    # 'SELECT FOR UPDATE'
    now = datetime.now(tz=utc)
    # Locks older than `maxdt` are treated as stale and may be replaced.
    maxdt = now if force else now - self.max_lock_lifetime
    locked = cnx.execute(
        'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
        'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
        {'x': self.eid, 'now': now, 'maxdt': maxdt})
    if not locked:
        self.error('concurrent synchronization detected, skip pull')
    # Commit in both cases, as the original code does on each branch.
    cnx.commit()
    return bool(locked)
196 This method is responsible to handle commit/rollback on the given |
200 This method is responsible to handle commit/rollback on the given |
197 connection. |
201 connection. |
198 """ |
202 """ |
199 if not force and self.fresh(): |
203 if not force and self.fresh(): |
200 return {} |
204 return {} |
201 if not self.acquire_synchronization_lock(cnx): |
205 if not self.acquire_synchronization_lock(cnx, force): |
202 return {} |
206 return {} |
203 try: |
207 try: |
204 return self._pull_data(cnx, force, raise_on_error) |
208 return self._pull_data(cnx, force, raise_on_error) |
205 finally: |
209 finally: |
206 cnx.rollback() # rollback first in case there is some dirty |
210 cnx.rollback() # rollback first in case there is some dirty |