147 |
147 |
def acquire_synchronization_lock(self, session):
    """Try to flag this source as being synchronized.

    The flag is the ``in_synchronization`` attribute of the entity whose
    eid is ``self.eid``; it is set to the current UTC time, but only when
    it is currently unset or older than ``self.max_lock_lifetime`` (so a
    stale lock left behind by a previous run can be taken over).

    :param session: session used to run the RQL query and to commit.
    :return: True if the query reported a result (lock taken), False
      otherwise (an error is logged and the caller should skip the pull).
    """
    # XXX race condition until WHERE of SET queries is executed using
    # 'SELECT FOR UPDATE'
    now = datetime.utcnow()
    # NOTE(review): the same attribute name must be used in the SET and in
    # the WHERE clause, and it must match the one reset by
    # release_synchronization_lock.
    rset = session.execute(
        'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
        'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
        {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime})
    acquired = bool(rset)
    if not acquired:
        self.error('concurrent synchronization detected, skip pull')
    # commit in both cases, asking the session to keep its connections set
    session.commit(free_cnxset=False)
    return acquired
161 |
161 |
def release_synchronization_lock(self, session):
    """Clear the synchronization flag of this source and commit.

    Resets the ``in_synchronization`` attribute of the entity whose eid
    is ``self.eid``, i.e. the attribute set by
    ``acquire_synchronization_lock``.

    :param session: session used to run the RQL query and to commit.
    """
    # presumably the session may have released its connections set since
    # the lock was acquired, hence the explicit call before executing
    session.set_cnxset()
    # RQL spells the null value NULL (not Python's None)
    session.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
                    {'x': self.eid})
    session.commit()
167 |
167 |
168 def pull_data(self, session, force=False, raise_on_error=False): |
168 def pull_data(self, session, force=False, raise_on_error=False): |
169 """Launch synchronization of the source if needed. |
169 """Launch synchronization of the source if needed. |