--- a/server/sources/datafeed.py Fri Apr 04 17:57:58 2014 +0200
+++ b/server/sources/datafeed.py Fri Apr 04 18:23:02 2014 +0200
@@ -1,4 +1,4 @@
-# copyright 2010-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -143,67 +143,64 @@
return False
return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
- def update_latest_retrieval(self, session):
+ def update_latest_retrieval(self, cnx):
self.latest_retrieval = datetime.utcnow()
- session.set_cnxset()
- session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
+ cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
{'x': self.eid, 'date': self.latest_retrieval})
- session.commit()
+ cnx.commit()
- def acquire_synchronization_lock(self, session):
+ def acquire_synchronization_lock(self, cnx):
# XXX race condition until WHERE of SET queries is executed using
# 'SELECT FOR UPDATE'
now = datetime.utcnow()
- session.set_cnxset()
- if not session.execute(
+ if not cnx.execute(
'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
{'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
self.error('concurrent synchronization detected, skip pull')
- session.commit()
+ cnx.commit()
return False
- session.commit()
+ cnx.commit()
return True
- def release_synchronization_lock(self, session):
- session.set_cnxset()
- session.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
+ def release_synchronization_lock(self, cnx):
+ cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
{'x': self.eid})
- session.commit()
+ cnx.commit()
- def pull_data(self, session, force=False, raise_on_error=False):
+ def pull_data(self, cnx, force=False, raise_on_error=False):
"""Launch synchronization of the source if needed.
This method is responsible to handle commit/rollback on the given
- session.
+ connection.
"""
if not force and self.fresh():
return {}
- if not self.acquire_synchronization_lock(session):
+ if not self.acquire_synchronization_lock(cnx):
return {}
try:
- with session.transaction(free_cnxset=False):
- return self._pull_data(session, force, raise_on_error)
+ return self._pull_data(cnx, force, raise_on_error)
finally:
- self.release_synchronization_lock(session)
+ cnx.rollback() # rollback first in case there is some dirty
+ # transaction remaining
+ self.release_synchronization_lock(cnx)
- def _pull_data(self, session, force=False, raise_on_error=False):
- importlog = self.init_import_log(session)
- myuris = self.source_cwuris(session)
- parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
+ def _pull_data(self, cnx, force=False, raise_on_error=False):
+ importlog = self.init_import_log(cnx)
+ myuris = self.source_cwuris(cnx)
+ parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
if self.process_urls(parser, self.urls, raise_on_error):
self.warning("some error occurred, don't attempt to delete entities")
else:
- parser.handle_deletion(self.config, session, myuris)
- self.update_latest_retrieval(session)
+ parser.handle_deletion(self.config, cnx, myuris)
+ self.update_latest_retrieval(cnx)
stats = parser.stats
if stats.get('created'):
importlog.record_info('added %s entities' % len(stats['created']))
if stats.get('updated'):
importlog.record_info('updated %s entities' % len(stats['updated']))
- session.set_cnxset()
- importlog.write_log(session, end_timestamp=self.latest_retrieval)
- session.commit()
+ importlog.write_log(cnx, end_timestamp=self.latest_retrieval)
+ cnx.commit()
return stats
def process_urls(self, parser, urls, raise_on_error=False):
@@ -416,11 +413,9 @@
# Check whether self._cw is a session or a connection
if getattr(self._cw, 'commit', None) is not None:
commit = self._cw.commit
- set_cnxset = self._cw.set_cnxset
rollback = self._cw.rollback
else:
commit = self._cw.cnx.commit
- set_cnxset = lambda: None
rollback = self._cw.cnx.rollback
for args in parsed:
try:
@@ -428,14 +423,12 @@
# commit+set_cnxset instead of commit(free_cnxset=False) to let
# other a chance to get our connections set
commit()
- set_cnxset()
except ValidationError as exc:
if raise_on_error:
raise
self.source.error('Skipping %s because of validation error %s'
% (args, exc))
rollback()
- set_cnxset()
error = True
return error