# HG changeset patch
# User Sylvain Thénault
# Date 1346731757 -7200
# Node ID 1daea1f433c9e32be27cee4aada9c16c018b91ba
# Parent  f32c50c6b7e0e5f0a3351c162afbbad5a5dd8d69
[datafeed] make cnxset handling of datafeed source more robust

currently we may run in some cases where the session has no more cnxset depending
on errors and parser's handling of the cnxset.

Also, free the cnxset and reacquire it later, letting a chance to other threads
to run.

diff -r f32c50c6b7e0 -r 1daea1f433c9 server/session.py
--- a/server/session.py	Thu Aug 30 17:38:43 2012 +0200
+++ b/server/session.py	Tue Sep 04 06:09:17 2012 +0200
@@ -106,7 +106,8 @@
         self.free_cnxset = free_cnxset
 
     def __enter__(self):
-        pass
+        # ensure session has a cnxset
+        self.session.set_cnxset()
 
     def __exit__(self, exctype, exc, traceback):
         if exctype:
diff -r f32c50c6b7e0 -r 1daea1f433c9 server/sources/datafeed.py
--- a/server/sources/datafeed.py	Thu Aug 30 17:38:43 2012 +0200
+++ b/server/sources/datafeed.py	Tue Sep 04 06:09:17 2012 +0200
@@ -152,21 +152,24 @@
 
     def update_latest_retrieval(self, session):
         self.latest_retrieval = datetime.utcnow()
+        session.set_cnxset()
         session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                         {'x': self.eid, 'date': self.latest_retrieval})
+        session.commit()
 
     def acquire_synchronization_lock(self, session):
         # XXX race condition until WHERE of SET queries is executed using
         # 'SELECT FOR UPDATE'
         now = datetime.utcnow()
+        session.set_cnxset()
         if not session.execute(
             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
             self.error('concurrent synchronization detected, skip pull')
-            session.commit(free_cnxset=False)
+            session.commit()
             return False
-        session.commit(free_cnxset=False)
+        session.commit()
         return True
 
     def release_synchronization_lock(self, session):
@@ -205,7 +208,9 @@
             importlog.record_info('added %s entities' % len(stats['created']))
         if stats.get('updated'):
             importlog.record_info('updated %s entities' % len(stats['updated']))
+        session.set_cnxset()
         importlog.write_log(session, end_timestamp=self.latest_retrieval)
+        session.commit()
         return stats
 
     def process_urls(self, parser, urls, raise_on_error=False):
@@ -376,8 +381,10 @@
                     byetype.setdefault(etype, []).append(str(eid))
             for etype, eids in byetype.iteritems():
                 self.warning('delete %s %s entities', len(eids), etype)
+                session.set_cnxset()
                 session.execute('DELETE %s X WHERE X eid IN (%s)'
                                 % (etype, ','.join(eids)))
+                session.commit()
 
     def update_if_necessary(self, entity, attrs):
         entity.complete(tuple(attrs))