150 return False |
150 return False |
151 return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval) |
151 return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval) |
152 |
152 |
def update_latest_retrieval(self, session):
    """Record the current time as this source's latest retrieval date.

    The date is stored on ``self.latest_retrieval``. It is also persisted on
    the source entity identified by ``self.eid``, in a transaction that is
    committed here.

    :param session: repository session used to run the RQL update.
    :raises: whatever ``session.execute`` raises; the session is rolled back
        before the exception propagates.
    """
    # Naive UTC on purpose: the freshness check compares this value against
    # datetime.utcnow(), so it must stay naive.
    self.latest_retrieval = datetime.utcnow()
    session.set_cnxset()
    try:
        session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                        {'x': self.eid, 'date': self.latest_retrieval})
    except Exception:
        # Don't leave the session holding a connection set and a failed
        # transaction behind us (a later commit on it would fail too).
        # NOTE(review): assumes rollback() releases the cnxset like commit()
        # does -- confirm against the Session API.
        session.rollback()
        raise
    # commit() presumably releases the cnxset taken by set_cnxset() above
    # (the previous code had to pass free_cnxset=False to keep it).
    session.commit()
157 |
159 |
def acquire_synchronization_lock(self, session):
    """Try to take the synchronization lock of this source.

    The lock is the ``in_synchronization`` date of the source entity
    (``self.eid``). It is taken by setting that date to the current naive UTC
    time. The update only matches when the attribute is NULL or older than
    ``self.max_lock_lifetime``, so an expired lock is taken over.

    :param session: repository session used to run the RQL update.
    :return: True if the lock was acquired. False if the update matched
        nothing; in that case an error is logged.
    :raises: whatever ``session.execute`` raises; the session is rolled back
        before the exception propagates.
    """
    # XXX race condition until WHERE of SET queries is executed using
    # 'SELECT FOR UPDATE'
    now = datetime.utcnow()
    session.set_cnxset()
    try:
        rset = session.execute(
            'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
            'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
            {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime})
    except Exception:
        # Release the session instead of leaving a failed transaction open.
        # NOTE(review): assumes rollback() frees the cnxset like commit().
        session.rollback()
        raise
    # An empty result means no row matched: someone else holds a fresh lock.
    acquired = bool(rset)
    if not acquired:
        self.error('concurrent synchronization detected, skip pull')
    # Commit in both cases so the connection set taken above is released.
    session.commit()
    return acquired
171 |
174 |
172 def release_synchronization_lock(self, session): |
175 def release_synchronization_lock(self, session): |
173 session.set_cnxset() |
176 session.set_cnxset() |
174 session.execute('SET X in_synchronization NULL WHERE X eid %(x)s', |
177 session.execute('SET X in_synchronization NULL WHERE X eid %(x)s', |
203 stats = parser.stats |
206 stats = parser.stats |
204 if stats.get('created'): |
207 if stats.get('created'): |
205 importlog.record_info('added %s entities' % len(stats['created'])) |
208 importlog.record_info('added %s entities' % len(stats['created'])) |
206 if stats.get('updated'): |
209 if stats.get('updated'): |
207 importlog.record_info('updated %s entities' % len(stats['updated'])) |
210 importlog.record_info('updated %s entities' % len(stats['updated'])) |
|
211 session.set_cnxset() |
208 importlog.write_log(session, end_timestamp=self.latest_retrieval) |
212 importlog.write_log(session, end_timestamp=self.latest_retrieval) |
|
213 session.commit() |
209 return stats |
214 return stats |
210 |
215 |
211 def process_urls(self, parser, urls, raise_on_error=False): |
216 def process_urls(self, parser, urls, raise_on_error=False): |
212 error = False |
217 error = False |
213 for url in urls: |
218 for url in urls: |
374 for extid, (eid, etype) in myuris.iteritems(): |
379 for extid, (eid, etype) in myuris.iteritems(): |
375 if self.is_deleted(extid, etype, eid): |
380 if self.is_deleted(extid, etype, eid): |
376 byetype.setdefault(etype, []).append(str(eid)) |
381 byetype.setdefault(etype, []).append(str(eid)) |
377 for etype, eids in byetype.iteritems(): |
382 for etype, eids in byetype.iteritems(): |
378 self.warning('delete %s %s entities', len(eids), etype) |
383 self.warning('delete %s %s entities', len(eids), etype) |
|
384 session.set_cnxset() |
379 session.execute('DELETE %s X WHERE X eid IN (%s)' |
385 session.execute('DELETE %s X WHERE X eid IN (%s)' |
380 % (etype, ','.join(eids))) |
386 % (etype, ','.join(eids))) |
|
387 session.commit() |
381 |
388 |
382 def update_if_necessary(self, entity, attrs): |
389 def update_if_necessary(self, entity, attrs): |
383 entity.complete(tuple(attrs)) |
390 entity.complete(tuple(attrs)) |
384 # check modification date and compare attribute values to only update |
391 # check modification date and compare attribute values to only update |
385 # what's actually needed |
392 # what's actually needed |