[datafeed] make cnxset handling of datafeed source more robust (branch: stable)
author: Sylvain Thénault <sylvain.thenault@logilab.fr>
Tue, 04 Sep 2012 06:09:17 +0200
branch: stable
changeset 8529 1daea1f433c9
parent 8528 f32c50c6b7e0
child 8530 2bceea9dee95
[datafeed] make cnxset handling of datafeed source more robust. Currently, depending on which errors occur and on how the parser handles the cnxset, we may end up in situations where the session no longer has a cnxset. Also, free the cnxset and reacquire it later, giving other threads a chance to run.
server/session.py
server/sources/datafeed.py
--- a/server/session.py	Thu Aug 30 17:38:43 2012 +0200
+++ b/server/session.py	Tue Sep 04 06:09:17 2012 +0200
@@ -106,7 +106,8 @@
         self.free_cnxset = free_cnxset
 
     def __enter__(self):
-        pass
+        # ensure session has a cnxset
+        self.session.set_cnxset()
 
     def __exit__(self, exctype, exc, traceback):
         if exctype:
--- a/server/sources/datafeed.py	Thu Aug 30 17:38:43 2012 +0200
+++ b/server/sources/datafeed.py	Tue Sep 04 06:09:17 2012 +0200
@@ -152,21 +152,24 @@
 
     def update_latest_retrieval(self, session):
         self.latest_retrieval = datetime.utcnow()
+        session.set_cnxset()
         session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                         {'x': self.eid, 'date': self.latest_retrieval})
+        session.commit()
 
     def acquire_synchronization_lock(self, session):
         # XXX race condition until WHERE of SET queries is executed using
         # 'SELECT FOR UPDATE'
         now = datetime.utcnow()
+        session.set_cnxset()
         if not session.execute(
             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
             self.error('concurrent synchronization detected, skip pull')
-            session.commit(free_cnxset=False)
+            session.commit()
             return False
-        session.commit(free_cnxset=False)
+        session.commit()
         return True
 
     def release_synchronization_lock(self, session):
@@ -205,7 +208,9 @@
             importlog.record_info('added %s entities' % len(stats['created']))
         if stats.get('updated'):
             importlog.record_info('updated %s entities' % len(stats['updated']))
+        session.set_cnxset()
         importlog.write_log(session, end_timestamp=self.latest_retrieval)
+        session.commit()
         return stats
 
     def process_urls(self, parser, urls, raise_on_error=False):
@@ -376,8 +381,10 @@
                     byetype.setdefault(etype, []).append(str(eid))
             for etype, eids in byetype.iteritems():
                 self.warning('delete %s %s entities', len(eids), etype)
+                session.set_cnxset()
                 session.execute('DELETE %s X WHERE X eid IN (%s)'
                                 % (etype, ','.join(eids)))
+                session.commit()
 
     def update_if_necessary(self, entity, attrs):
         entity.complete(tuple(attrs))