def fresh(self):
    """Return True if the source was synchronized recently enough.

    A source that has never been retrieved (``latest_retrieval`` is None)
    is never fresh.  Otherwise it is fresh as long as the current UTC time
    is still before ``latest_retrieval + synchro_interval``.
    """
    if self.latest_retrieval is None:
        return False
    return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)

def update_latest_retrieval(self, cnx):
    """Record the current UTC time as the latest retrieval date.

    The timestamp is stored on ``self.latest_retrieval``, written to the
    entity whose eid is ``self.eid`` through ``cnx``, and the connection
    is then committed.
    """
    self.latest_retrieval = datetime.utcnow()
    cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                {'x': self.eid, 'date': self.latest_retrieval})
    cnx.commit()

def acquire_synchronization_lock(self, cnx):
    """Try to flag the source as being synchronized.

    The flag is taken by setting ``in_synchronization`` to the current UTC
    time on the entity whose eid is ``self.eid``; the query only matches
    when the attribute is NULL or older than ``self.max_lock_lifetime``.
    The connection is committed in both outcomes.

    Return True when the query returned a non-empty result (lock taken),
    False otherwise, in which case an error is logged.
    """
    # XXX race condition until WHERE of SET queries is executed using
    # 'SELECT FOR UPDATE'
    now = datetime.utcnow()
    if not cnx.execute(
            'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
            'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
            {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
        self.error('concurrent synchronization detected, skip pull')
        cnx.commit()
        return False
    cnx.commit()
    return True

def release_synchronization_lock(self, cnx):
    """Clear the synchronization flag of the source.

    Sets ``in_synchronization`` back to NULL on the entity whose eid is
    ``self.eid`` through ``cnx`` and commits the connection.
    """
    cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
                {'x': self.eid})
    cnx.commit()

def pull_data(self, cnx, force=False, raise_on_error=False):
    """Launch synchronization of the source if needed.

    This method is responsible to handle commit/rollback on the given
    connection.

    Nothing is done, and an empty dict is returned, when the source is
    still fresh (unless ``force`` is true) or when the synchronization
    lock cannot be acquired.  Otherwise the value returned by
    ``_pull_data`` is returned, and the lock is released whether or not
    ``_pull_data`` raised.
    """
    if not force and self.fresh():
        return {}
    if not self.acquire_synchronization_lock(cnx):
        return {}
    try:
        return self._pull_data(cnx, force, raise_on_error)
    finally:
        cnx.rollback()  # rollback first in case there is some dirty
                        # transaction remaining
        self.release_synchronization_lock(cnx)

def _pull_data(self, cnx, force=False, raise_on_error=False):
    """Run one import pass over ``self.urls`` and return the parser stats.

    An import log and a parser are set up, every URL is processed, and
    deletion handling is only attempted when ``process_urls`` reported no
    error.  The latest retrieval date is then updated, the numbers of
    created/updated entities are recorded in the import log, the log is
    written and the connection committed.

    ``force`` is accepted for signature compatibility with ``pull_data``
    but is not used here.
    """
    importlog = self.init_import_log(cnx)
    myuris = self.source_cwuris(cnx)
    parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
    if self.process_urls(parser, self.urls, raise_on_error):
        self.warning("some error occurred, don't attempt to delete entities")
    else:
        parser.handle_deletion(self.config, cnx, myuris)
    # NOTE(review): nesting reconstructed from a flattened listing; the
    # retrieval date is assumed to be updated whether or not errors
    # occurred -- confirm against the upstream file.
    self.update_latest_retrieval(cnx)
    stats = parser.stats
    if stats.get('created'):
        importlog.record_info('added %s entities' % len(stats['created']))
    if stats.get('updated'):
        importlog.record_info('updated %s entities' % len(stats['updated']))
    importlog.write_log(cnx, end_timestamp=self.latest_retrieval)
    cnx.commit()
    return stats

209 def process_urls(self, parser, urls, raise_on_error=False): |
206 def process_urls(self, parser, urls, raise_on_error=False): |
210 error = False |
207 error = False |
211 for url in urls: |
208 for url in urls: |
414 return True |
411 return True |
415 error = False |
412 error = False |
416 # Check whether self._cw is a session or a connection |
413 # Check whether self._cw is a session or a connection |
417 if getattr(self._cw, 'commit', None) is not None: |
414 if getattr(self._cw, 'commit', None) is not None: |
418 commit = self._cw.commit |
415 commit = self._cw.commit |
419 set_cnxset = self._cw.set_cnxset |
|
420 rollback = self._cw.rollback |
416 rollback = self._cw.rollback |
421 else: |
417 else: |
422 commit = self._cw.cnx.commit |
418 commit = self._cw.cnx.commit |
423 set_cnxset = lambda: None |
|
424 rollback = self._cw.cnx.rollback |
419 rollback = self._cw.cnx.rollback |
425 for args in parsed: |
420 for args in parsed: |
426 try: |
421 try: |
427 self.process_item(*args) |
422 self.process_item(*args) |
428 # commit+set_cnxset instead of commit(free_cnxset=False) to let |
423 # commit+set_cnxset instead of commit(free_cnxset=False) to let |
429 # other a chance to get our connections set |
424 # other a chance to get our connections set |
430 commit() |
425 commit() |
431 set_cnxset() |
|
432 except ValidationError as exc: |
426 except ValidationError as exc: |
433 if raise_on_error: |
427 if raise_on_error: |
434 raise |
428 raise |
435 self.source.error('Skipping %s because of validation error %s' |
429 self.source.error('Skipping %s because of validation error %s' |
436 % (args, exc)) |
430 % (args, exc)) |
437 rollback() |
431 rollback() |
438 set_cnxset() |
|
439 error = True |
432 error = True |
440 return error |
433 return error |
441 |
434 |
442 def parse(self, url): |
435 def parse(self, url): |
443 if url.startswith('http'): |
436 if url.startswith('http'): |