--- a/server/sources/datafeed.py Fri Aug 03 13:29:37 2012 +0200
+++ b/server/sources/datafeed.py Fri Sep 07 14:01:59 2012 +0200
@@ -152,21 +152,24 @@
def update_latest_retrieval(self, session):
self.latest_retrieval = datetime.utcnow()
+ session.set_cnxset()
session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
{'x': self.eid, 'date': self.latest_retrieval})
+ session.commit()
def acquire_synchronization_lock(self, session):
# XXX race condition until WHERE of SET queries is executed using
# 'SELECT FOR UPDATE'
now = datetime.utcnow()
+ session.set_cnxset()
if not session.execute(
'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
{'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
self.error('concurrent synchronization detected, skip pull')
- session.commit(free_cnxset=False)
+ session.commit()
return False
- session.commit(free_cnxset=False)
+ session.commit()
return True
def release_synchronization_lock(self, session):
@@ -205,7 +208,9 @@
importlog.record_info('added %s entities' % len(stats['created']))
if stats.get('updated'):
importlog.record_info('updated %s entities' % len(stats['updated']))
+ session.set_cnxset()
importlog.write_log(session, end_timestamp=self.latest_retrieval)
+ session.commit()
return stats
def process_urls(self, parser, urls, raise_on_error=False):
@@ -376,8 +381,10 @@
byetype.setdefault(etype, []).append(str(eid))
for etype, eids in byetype.iteritems():
self.warning('delete %s %s entities', len(eids), etype)
+ session.set_cnxset()
session.execute('DELETE %s X WHERE X eid IN (%s)'
% (etype, ','.join(eids)))
+ session.commit()
def update_if_necessary(self, entity, attrs):
entity.complete(tuple(attrs))