server/sources/datafeed.py
changeset 8535 268b6349baf3
parent 8483 4ba11607d84a
parent 8529 1daea1f433c9
child 8573 ae0a567dff30
--- a/server/sources/datafeed.py	Fri Aug 03 13:29:37 2012 +0200
+++ b/server/sources/datafeed.py	Fri Sep 07 14:01:59 2012 +0200
@@ -152,21 +152,24 @@
 
     def update_latest_retrieval(self, session):
         self.latest_retrieval = datetime.utcnow()
+        session.set_cnxset()
         session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                         {'x': self.eid, 'date': self.latest_retrieval})
+        session.commit()
 
     def acquire_synchronization_lock(self, session):
         # XXX race condition until WHERE of SET queries is executed using
         # 'SELECT FOR UPDATE'
         now = datetime.utcnow()
+        session.set_cnxset()
         if not session.execute(
             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
             self.error('concurrent synchronization detected, skip pull')
-            session.commit(free_cnxset=False)
+            session.commit()
             return False
-        session.commit(free_cnxset=False)
+        session.commit()
         return True
 
     def release_synchronization_lock(self, session):
@@ -205,7 +208,9 @@
             importlog.record_info('added %s entities' % len(stats['created']))
         if stats.get('updated'):
             importlog.record_info('updated %s entities' % len(stats['updated']))
+        session.set_cnxset()
         importlog.write_log(session, end_timestamp=self.latest_retrieval)
+        session.commit()
         return stats
 
     def process_urls(self, parser, urls, raise_on_error=False):
@@ -376,8 +381,10 @@
                     byetype.setdefault(etype, []).append(str(eid))
             for etype, eids in byetype.iteritems():
                 self.warning('delete %s %s entities', len(eids), etype)
+                session.set_cnxset()
                 session.execute('DELETE %s X WHERE X eid IN (%s)'
                                 % (etype, ','.join(eids)))
+                session.commit()
 
     def update_if_necessary(self, entity, attrs):
         entity.complete(tuple(attrs))