server/sources/datafeed.py
changeset 9746 81b56897a377
parent 9665 887ad08e3a61
child 9822 4a118bfd6ab4
child 9860 e24bf60428d3
--- a/server/sources/datafeed.py	Fri Apr 04 17:57:58 2014 +0200
+++ b/server/sources/datafeed.py	Fri Apr 04 18:23:02 2014 +0200
@@ -1,4 +1,4 @@
-# copyright 2010-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -143,67 +143,64 @@
             return False
         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
 
-    def update_latest_retrieval(self, session):
+    def update_latest_retrieval(self, cnx):
         self.latest_retrieval = datetime.utcnow()
-        session.set_cnxset()
-        session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
+        cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                         {'x': self.eid, 'date': self.latest_retrieval})
-        session.commit()
+        cnx.commit()
 
-    def acquire_synchronization_lock(self, session):
+    def acquire_synchronization_lock(self, cnx):
         # XXX race condition until WHERE of SET queries is executed using
         # 'SELECT FOR UPDATE'
         now = datetime.utcnow()
-        session.set_cnxset()
-        if not session.execute(
+        if not cnx.execute(
             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
             self.error('concurrent synchronization detected, skip pull')
-            session.commit()
+            cnx.commit()
             return False
-        session.commit()
+        cnx.commit()
         return True
 
-    def release_synchronization_lock(self, session):
-        session.set_cnxset()
-        session.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
+    def release_synchronization_lock(self, cnx):
+        cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
                         {'x': self.eid})
-        session.commit()
+        cnx.commit()
 
-    def pull_data(self, session, force=False, raise_on_error=False):
+    def pull_data(self, cnx, force=False, raise_on_error=False):
         """Launch synchronization of the source if needed.
 
         This method is responsible to handle commit/rollback on the given
-        session.
+        connection.
         """
         if not force and self.fresh():
             return {}
-        if not self.acquire_synchronization_lock(session):
+        if not self.acquire_synchronization_lock(cnx):
             return {}
         try:
-            with session.transaction(free_cnxset=False):
-                return self._pull_data(session, force, raise_on_error)
+            return self._pull_data(cnx, force, raise_on_error)
         finally:
-            self.release_synchronization_lock(session)
+            cnx.rollback() # rollback first in case there is some dirty
+                           # transaction remaining
+            self.release_synchronization_lock(cnx)
 
-    def _pull_data(self, session, force=False, raise_on_error=False):
-        importlog = self.init_import_log(session)
-        myuris = self.source_cwuris(session)
-        parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
+    def _pull_data(self, cnx, force=False, raise_on_error=False):
+        importlog = self.init_import_log(cnx)
+        myuris = self.source_cwuris(cnx)
+        parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
         if self.process_urls(parser, self.urls, raise_on_error):
             self.warning("some error occurred, don't attempt to delete entities")
         else:
-            parser.handle_deletion(self.config, session, myuris)
-        self.update_latest_retrieval(session)
+            parser.handle_deletion(self.config, cnx, myuris)
+        self.update_latest_retrieval(cnx)
         stats = parser.stats
         if stats.get('created'):
             importlog.record_info('added %s entities' % len(stats['created']))
         if stats.get('updated'):
             importlog.record_info('updated %s entities' % len(stats['updated']))
-        session.set_cnxset()
-        importlog.write_log(session, end_timestamp=self.latest_retrieval)
-        session.commit()
+        importlog.write_log(cnx, end_timestamp=self.latest_retrieval)
+        cnx.commit()
         return stats
 
     def process_urls(self, parser, urls, raise_on_error=False):
@@ -416,11 +413,9 @@
         # Check whether self._cw is a session or a connection
         if getattr(self._cw, 'commit', None) is not None:
             commit = self._cw.commit
-            set_cnxset = self._cw.set_cnxset
             rollback = self._cw.rollback
         else:
             commit = self._cw.cnx.commit
-            set_cnxset = lambda: None
             rollback = self._cw.cnx.rollback
         for args in parsed:
             try:
@@ -428,14 +423,12 @@
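-                # commit+set_cnxset instead of commit(free_cnxset=False) to let
-                # other a chance to get our connections set
+                # commit() instead of commit(free_cnxset=False) to give others
+                # a chance to get our connections set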
                 commit()
-                set_cnxset()
             except ValidationError as exc:
                 if raise_on_error:
                     raise
                 self.source.error('Skipping %s because of validation error %s'
                                   % (args, exc))
                 rollback()
-                set_cnxset()
                 error = True
         return error