--- a/cubicweb/server/sources/datafeed.py Fri Feb 19 12:35:22 2016 +0100
+++ b/cubicweb/server/sources/datafeed.py Wed Feb 03 11:12:09 2016 +0100
@@ -1,4 +1,4 @@
-# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2010-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -22,6 +22,7 @@
from io import BytesIO
from os.path import exists
from datetime import datetime, timedelta
+from functools import partial
from six import text_type
from six.moves.urllib.parse import urlparse
@@ -194,25 +195,43 @@
{'x': self.eid})
cnx.commit()
- def pull_data(self, cnx, force=False, raise_on_error=False):
+ def pull_data(self, cnx, force=False, raise_on_error=False, async=False):
"""Launch synchronization of the source if needed.
- This method is responsible to handle commit/rollback on the given
- connection.
+ If `async` is true, the method returns immediately a dictionary containing the import log's
+ eid, and the actual synchronization is done asynchronously. If `async` is false, return some
+ import statistics (e.g. number of created and updated entities).
+
+ This method is responsible for handling commit/rollback on the given connection.
"""
if not force and self.fresh():
return {}
if not self.acquire_synchronization_lock(cnx, force):
return {}
try:
- return self._pull_data(cnx, force, raise_on_error)
+ if async:
+ return self._async_pull_data(cnx, force, raise_on_error)
+ else:
+ return self._pull_data(cnx, force, raise_on_error)
finally:
- cnx.rollback() # rollback first in case there is some dirty
- # transaction remaining
+ cnx.rollback() # rollback first in case there is some dirty transaction remaining
self.release_synchronization_lock(cnx)
- def _pull_data(self, cnx, force=False, raise_on_error=False):
- importlog = self.init_import_log(cnx)
+ def _async_pull_data(self, cnx, force, raise_on_error):
+ import_log = cnx.create_entity('CWDataImport', cw_import_of=self)
+ cnx.commit() # commit the import log creation before starting the synchronize task
+
+ def _synchronize_source(repo, source_eid, import_log_eid):
+ with repo.internal_cnx() as cnx:
+ source = repo.sources_by_eid[source_eid]
+ source._pull_data(cnx, force, raise_on_error, import_log_eid=import_log_eid)
+
+ sync = partial(_synchronize_source, cnx.repo, self.eid, import_log.eid)
+ cnx.repo.threaded_task(sync)
+ return {'import_log_eid': import_log.eid}
+
+ def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None):
+ importlog = self.init_import_log(cnx, import_log_eid)
myuris = self.source_cwuris(cnx)
try:
parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
@@ -297,12 +316,17 @@
return dict((self.decode_extid(uri), (eid, type))
for uri, eid, type in cnx.system_sql(sql).fetchall())
- def init_import_log(self, cnx, **kwargs):
- dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
- start_timestamp=datetime.now(tz=utc),
- **kwargs)
- dataimport.init()
- return dataimport
+ def init_import_log(self, cnx, import_log_eid=None, **kwargs):
+ if import_log_eid is None:
+ import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
+ start_timestamp=datetime.now(tz=utc),
+ **kwargs)
+ else:
+ import_log = cnx.entity_from_eid(import_log_eid)
+ import_log.cw_set(start_timestamp=datetime.now(tz=utc), **kwargs)
+ cnx.commit() # make changes visible
+ import_log.init()
+ return import_log
class DataFeedParser(AppObject):