[datafeed] update datafeed internals to use connection instead of session
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Fri, 04 Apr 2014 18:23:02 +0200
changeset 9746 81b56897a377
parent 9745 c013d5d76f66
child 9747 10108d9f502a
[datafeed] update datafeed internals to use connection instead of session
hooks/__init__.py
server/sources/datafeed.py
--- a/hooks/__init__.py	Fri Apr 04 17:57:58 2014 +0200
+++ b/hooks/__init__.py	Fri Apr 04 18:23:02 2014 +0200
@@ -60,13 +60,11 @@
                     or not repo.config.source_enabled(source)
                     or not source.config['synchronize']):
                     continue
-                session = repo.internal_session(safe=True)
-                try:
-                    source.pull_data(session)
-                except Exception as exc:
-                    session.exception('while trying to update feed %s', source)
-                finally:
-                    session.close()
+                with repo.internal_connection() as cnx:
+                    try:
+                        source.pull_data(cnx)
+                    except Exception as exc:
+                        cnx.exception('while trying to update feed %s', source)
         self.repo.looping_task(60, update_feeds, self.repo)
 
 
--- a/server/sources/datafeed.py	Fri Apr 04 17:57:58 2014 +0200
+++ b/server/sources/datafeed.py	Fri Apr 04 18:23:02 2014 +0200
@@ -1,4 +1,4 @@
-# copyright 2010-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -143,67 +143,64 @@
             return False
         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
 
-    def update_latest_retrieval(self, session):
+    def update_latest_retrieval(self, cnx):
         self.latest_retrieval = datetime.utcnow()
-        session.set_cnxset()
-        session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
+        cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                         {'x': self.eid, 'date': self.latest_retrieval})
-        session.commit()
+        cnx.commit()
 
-    def acquire_synchronization_lock(self, session):
+    def acquire_synchronization_lock(self, cnx):
         # XXX race condition until WHERE of SET queries is executed using
         # 'SELECT FOR UPDATE'
         now = datetime.utcnow()
-        session.set_cnxset()
-        if not session.execute(
+        if not cnx.execute(
             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
             self.error('concurrent synchronization detected, skip pull')
-            session.commit()
+            cnx.commit()
             return False
-        session.commit()
+        cnx.commit()
         return True
 
-    def release_synchronization_lock(self, session):
-        session.set_cnxset()
-        session.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
+    def release_synchronization_lock(self, cnx):
+        cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
                         {'x': self.eid})
-        session.commit()
+        cnx.commit()
 
-    def pull_data(self, session, force=False, raise_on_error=False):
+    def pull_data(self, cnx, force=False, raise_on_error=False):
         """Launch synchronization of the source if needed.
 
         This method is responsible to handle commit/rollback on the given
-        session.
+        connection.
         """
         if not force and self.fresh():
             return {}
-        if not self.acquire_synchronization_lock(session):
+        if not self.acquire_synchronization_lock(cnx):
             return {}
         try:
-            with session.transaction(free_cnxset=False):
-                return self._pull_data(session, force, raise_on_error)
+            return self._pull_data(cnx, force, raise_on_error)
         finally:
-            self.release_synchronization_lock(session)
+            cnx.rollback() # rollback first in case there is some dirty
+                           # transaction remaining
+            self.release_synchronization_lock(cnx)
 
-    def _pull_data(self, session, force=False, raise_on_error=False):
-        importlog = self.init_import_log(session)
-        myuris = self.source_cwuris(session)
-        parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
+    def _pull_data(self, cnx, force=False, raise_on_error=False):
+        importlog = self.init_import_log(cnx)
+        myuris = self.source_cwuris(cnx)
+        parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
         if self.process_urls(parser, self.urls, raise_on_error):
             self.warning("some error occurred, don't attempt to delete entities")
         else:
-            parser.handle_deletion(self.config, session, myuris)
-        self.update_latest_retrieval(session)
+            parser.handle_deletion(self.config, cnx, myuris)
+        self.update_latest_retrieval(cnx)
         stats = parser.stats
         if stats.get('created'):
             importlog.record_info('added %s entities' % len(stats['created']))
         if stats.get('updated'):
             importlog.record_info('updated %s entities' % len(stats['updated']))
-        session.set_cnxset()
-        importlog.write_log(session, end_timestamp=self.latest_retrieval)
-        session.commit()
+        importlog.write_log(cnx, end_timestamp=self.latest_retrieval)
+        cnx.commit()
         return stats
 
     def process_urls(self, parser, urls, raise_on_error=False):
@@ -416,11 +413,9 @@
         # Check whether self._cw is a session or a connection
         if getattr(self._cw, 'commit', None) is not None:
             commit = self._cw.commit
-            set_cnxset = self._cw.set_cnxset
             rollback = self._cw.rollback
         else:
             commit = self._cw.cnx.commit
-            set_cnxset = lambda: None
             rollback = self._cw.cnx.rollback
         for args in parsed:
             try:
@@ -428,14 +423,12 @@
                 # commit+set_cnxset instead of commit(free_cnxset=False) to let
                 # other a chance to get our connections set
                 commit()
-                set_cnxset()
             except ValidationError as exc:
                 if raise_on_error:
                     raise
                 self.source.error('Skipping %s because of validation error %s'
                                   % (args, exc))
                 rollback()
-                set_cnxset()
                 error = True
         return error