[datafeed] make cnxset handling of datafeed source more robust
currently we may run in some cases where the session has no more cnxset
depending on errors and parser's handling of the cnxset.
Also, free the cnxset and reacquire it later, letting a chance to other
threads to run.
--- a/server/session.py Thu Aug 30 17:38:43 2012 +0200
+++ b/server/session.py Tue Sep 04 06:09:17 2012 +0200
@@ -106,7 +106,8 @@
self.free_cnxset = free_cnxset
def __enter__(self):
- pass
+ # ensure session has a cnxset
+ self.session.set_cnxset()
def __exit__(self, exctype, exc, traceback):
if exctype:
--- a/server/sources/datafeed.py Thu Aug 30 17:38:43 2012 +0200
+++ b/server/sources/datafeed.py Tue Sep 04 06:09:17 2012 +0200
@@ -152,21 +152,24 @@
def update_latest_retrieval(self, session):
self.latest_retrieval = datetime.utcnow()
+ session.set_cnxset()
session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
{'x': self.eid, 'date': self.latest_retrieval})
+ session.commit()
def acquire_synchronization_lock(self, session):
# XXX race condition until WHERE of SET queries is executed using
# 'SELECT FOR UPDATE'
now = datetime.utcnow()
+ session.set_cnxset()
if not session.execute(
'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
{'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
self.error('concurrent synchronization detected, skip pull')
- session.commit(free_cnxset=False)
+ session.commit()
return False
- session.commit(free_cnxset=False)
+ session.commit()
return True
def release_synchronization_lock(self, session):
@@ -205,7 +208,9 @@
importlog.record_info('added %s entities' % len(stats['created']))
if stats.get('updated'):
importlog.record_info('updated %s entities' % len(stats['updated']))
+ session.set_cnxset()
importlog.write_log(session, end_timestamp=self.latest_retrieval)
+ session.commit()
return stats
def process_urls(self, parser, urls, raise_on_error=False):
@@ -376,8 +381,10 @@
byetype.setdefault(etype, []).append(str(eid))
for etype, eids in byetype.iteritems():
self.warning('delete %s %s entities', len(eids), etype)
+ session.set_cnxset()
session.execute('DELETE %s X WHERE X eid IN (%s)'
% (etype, ','.join(eids)))
+ session.commit()
def update_if_necessary(self, entity, attrs):
entity.complete(tuple(attrs))