diff -r c09feae04094 -r 268b6349baf3 server/sources/datafeed.py --- a/server/sources/datafeed.py Fri Aug 03 13:29:37 2012 +0200 +++ b/server/sources/datafeed.py Fri Sep 07 14:01:59 2012 +0200 @@ -152,21 +152,24 @@ def update_latest_retrieval(self, session): self.latest_retrieval = datetime.utcnow() + session.set_cnxset() session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', {'x': self.eid, 'date': self.latest_retrieval}) + session.commit() def acquire_synchronization_lock(self, session): # XXX race condition until WHERE of SET queries is executed using # 'SELECT FOR UPDATE' now = datetime.utcnow() + session.set_cnxset() if not session.execute( 'SET X in_synchronization %(now)s WHERE X eid %(x)s, ' 'X in_synchronization NULL OR X in_synchronization < %(maxdt)s', {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}): self.error('concurrent synchronization detected, skip pull') - session.commit(free_cnxset=False) + session.commit() return False - session.commit(free_cnxset=False) + session.commit() return True def release_synchronization_lock(self, session): @@ -205,7 +208,9 @@ importlog.record_info('added %s entities' % len(stats['created'])) if stats.get('updated'): importlog.record_info('updated %s entities' % len(stats['updated'])) + session.set_cnxset() importlog.write_log(session, end_timestamp=self.latest_retrieval) + session.commit() return stats def process_urls(self, parser, urls, raise_on_error=False): @@ -376,8 +381,10 @@ byetype.setdefault(etype, []).append(str(eid)) for etype, eids in byetype.iteritems(): self.warning('delete %s %s entities', len(eids), etype) + session.set_cnxset() session.execute('DELETE %s X WHERE X eid IN (%s)' % (etype, ','.join(eids))) + session.commit() def update_if_necessary(self, entity, attrs): entity.complete(tuple(attrs))