# HG changeset patch # User Sylvain Thénault # Date 1396628582 -7200 # Node ID 81b56897a3776183393c350a9b67974265b64407 # Parent c013d5d76f6660c799d83ac0d6c8cce12baa718a [datafeed] update datafeed internals to use connection instead of session diff -r c013d5d76f66 -r 81b56897a377 hooks/__init__.py --- a/hooks/__init__.py Fri Apr 04 17:57:58 2014 +0200 +++ b/hooks/__init__.py Fri Apr 04 18:23:02 2014 +0200 @@ -60,13 +60,11 @@ or not repo.config.source_enabled(source) or not source.config['synchronize']): continue - session = repo.internal_session(safe=True) - try: - source.pull_data(session) - except Exception as exc: - session.exception('while trying to update feed %s', source) - finally: - session.close() + with repo.internal_connection() as cnx: + try: + source.pull_data(cnx) + except Exception as exc: + cnx.exception('while trying to update feed %s', source) self.repo.looping_task(60, update_feeds, self.repo) diff -r c013d5d76f66 -r 81b56897a377 server/sources/datafeed.py --- a/server/sources/datafeed.py Fri Apr 04 17:57:58 2014 +0200 +++ b/server/sources/datafeed.py Fri Apr 04 18:23:02 2014 +0200 @@ -1,4 +1,4 @@ -# copyright 2010-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -143,67 +143,64 @@ return False return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval) - def update_latest_retrieval(self, session): + def update_latest_retrieval(self, cnx): self.latest_retrieval = datetime.utcnow() - session.set_cnxset() - session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', + cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', {'x': self.eid, 'date': self.latest_retrieval}) - session.commit() + cnx.commit() - def acquire_synchronization_lock(self, session): + def acquire_synchronization_lock(self, cnx): # XXX race condition until WHERE of SET queries is executed using # 'SELECT FOR UPDATE' now = datetime.utcnow() - session.set_cnxset() - if not session.execute( + if not cnx.execute( 'SET X in_synchronization %(now)s WHERE X eid %(x)s, ' 'X in_synchronization NULL OR X in_synchronization < %(maxdt)s', {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}): self.error('concurrent synchronization detected, skip pull') - session.commit() + cnx.commit() return False - session.commit() + cnx.commit() return True - def release_synchronization_lock(self, session): - session.set_cnxset() - session.execute('SET X in_synchronization NULL WHERE X eid %(x)s', + def release_synchronization_lock(self, cnx): + cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s', {'x': self.eid}) - session.commit() + cnx.commit() - def pull_data(self, session, force=False, raise_on_error=False): + def pull_data(self, cnx, force=False, raise_on_error=False): """Launch synchronization of the source if needed. This method is responsible to handle commit/rollback on the given - session. + connection. """ if not force and self.fresh(): return {} - if not self.acquire_synchronization_lock(session): + if not self.acquire_synchronization_lock(cnx): return {} try: - with session.transaction(free_cnxset=False): - return self._pull_data(session, force, raise_on_error) + return self._pull_data(cnx, force, raise_on_error) finally: - self.release_synchronization_lock(session) + cnx.rollback() # rollback first in case there is some dirty + # transaction remaining + self.release_synchronization_lock(cnx) - def _pull_data(self, session, force=False, raise_on_error=False): - importlog = self.init_import_log(session) - myuris = self.source_cwuris(session) - parser = self._get_parser(session, sourceuris=myuris, import_log=importlog) + def _pull_data(self, cnx, force=False, raise_on_error=False): + importlog = self.init_import_log(cnx) + myuris = self.source_cwuris(cnx) + parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog) if self.process_urls(parser, self.urls, raise_on_error): self.warning("some error occurred, don't attempt to delete entities") else: - parser.handle_deletion(self.config, session, myuris) - self.update_latest_retrieval(session) + parser.handle_deletion(self.config, cnx, myuris) + self.update_latest_retrieval(cnx) stats = parser.stats if stats.get('created'): importlog.record_info('added %s entities' % len(stats['created'])) if stats.get('updated'): importlog.record_info('updated %s entities' % len(stats['updated'])) - session.set_cnxset() - importlog.write_log(session, end_timestamp=self.latest_retrieval) - session.commit() + importlog.write_log(cnx, end_timestamp=self.latest_retrieval) + cnx.commit() return stats def process_urls(self, parser, urls, raise_on_error=False): @@ -416,11 +413,9 @@ # Check whether self._cw is a session or a connection if getattr(self._cw, 'commit', None) is not None: commit = self._cw.commit - set_cnxset = self._cw.set_cnxset rollback = self._cw.rollback else: commit = self._cw.cnx.commit - set_cnxset = lambda: None rollback = self._cw.cnx.rollback for args in parsed: try: @@ -428,14 +423,12 @@ # commit+set_cnxset instead of commit(free_cnxset=False) to let # other a chance to get our connections set commit() - set_cnxset() except ValidationError as exc: if raise_on_error: raise self.source.error('Skipping %s because of validation error %s' % (args, exc)) rollback() - set_cnxset() error = True return error