[sources] synchronize source asynchronously when started from the UI
and redirect to the forthcoming import log, for a better user experience (there
is still a lot more to do there though).
Closes #10468967
--- a/cubicweb/server/sources/datafeed.py Fri Feb 19 12:35:22 2016 +0100
+++ b/cubicweb/server/sources/datafeed.py Wed Feb 03 11:12:09 2016 +0100
@@ -1,4 +1,4 @@
-# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2010-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -22,6 +22,7 @@
from io import BytesIO
from os.path import exists
from datetime import datetime, timedelta
+from functools import partial
from six import text_type
from six.moves.urllib.parse import urlparse
@@ -194,25 +195,43 @@
{'x': self.eid})
cnx.commit()
- def pull_data(self, cnx, force=False, raise_on_error=False):
+ def pull_data(self, cnx, force=False, raise_on_error=False, async=False):
"""Launch synchronization of the source if needed.
- This method is responsible to handle commit/rollback on the given
- connection.
+ If `async` is true, the method immediately returns a dictionary containing the import log's
+ eid, and the actual synchronization is done asynchronously. If `async` is false, it returns some
+ import statistics (e.g. number of created and updated entities).
+
+ This method is responsible for handling commit/rollback on the given connection.
"""
if not force and self.fresh():
return {}
if not self.acquire_synchronization_lock(cnx, force):
return {}
try:
- return self._pull_data(cnx, force, raise_on_error)
+ if async:
+ return self._async_pull_data(cnx, force, raise_on_error)
+ else:
+ return self._pull_data(cnx, force, raise_on_error)
finally:
- cnx.rollback() # rollback first in case there is some dirty
- # transaction remaining
+ cnx.rollback() # rollback first in case there is some dirty transaction remaining
self.release_synchronization_lock(cnx)
- def _pull_data(self, cnx, force=False, raise_on_error=False):
- importlog = self.init_import_log(cnx)
+ def _async_pull_data(self, cnx, force, raise_on_error):
+ import_log = cnx.create_entity('CWDataImport', cw_import_of=self)
+ cnx.commit() # commit the import log creation before starting the synchronize task
+
+ def _synchronize_source(repo, source_eid, import_log_eid):
+ with repo.internal_cnx() as cnx:
+ source = repo.sources_by_eid[source_eid]
+ source._pull_data(cnx, force, raise_on_error, import_log_eid=import_log_eid)
+
+ sync = partial(_synchronize_source, cnx.repo, self.eid, import_log.eid)
+ cnx.repo.threaded_task(sync)
+ return {'import_log_eid': import_log.eid}
+
+ def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None):
+ importlog = self.init_import_log(cnx, import_log_eid)
myuris = self.source_cwuris(cnx)
try:
parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
@@ -297,12 +316,17 @@
return dict((self.decode_extid(uri), (eid, type))
for uri, eid, type in cnx.system_sql(sql).fetchall())
- def init_import_log(self, cnx, **kwargs):
- dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
- start_timestamp=datetime.now(tz=utc),
- **kwargs)
- dataimport.init()
- return dataimport
+ def init_import_log(self, cnx, import_log_eid=None, **kwargs):
+ if import_log_eid is None:
+ import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
+ start_timestamp=datetime.now(tz=utc),
+ **kwargs)
+ else:
+ import_log = cnx.entity_from_eid(import_log_eid)
+ import_log.cw_set(start_timestamp=datetime.now(tz=utc), **kwargs)
+ cnx.commit() # make changes visible
+ import_log.init()
+ return import_log
class DataFeedParser(AppObject):
--- a/cubicweb/sobjects/services.py Fri Feb 19 12:35:22 2016 +0100
+++ b/cubicweb/sobjects/services.py Wed Feb 03 11:12:09 2016 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -27,6 +27,7 @@
from cubicweb.server import Service
from cubicweb.predicates import match_user_groups, match_kwargs
+
class StatsService(Service):
"""Return a dictionary containing some statistics about the repository
resources usage.
@@ -161,14 +162,14 @@
class SourceSynchronizationService(Service):
- """Force synchronization of a datafeed source"""
+ """Force synchronization of a datafeed source. Actual synchronization is done
+ asynchronously; this simply creates the entity which will hold the import log and
+ returns its eid.
+ """
__regid__ = 'source-sync'
__select__ = Service.__select__ & match_user_groups('managers')
def call(self, source_eid):
- source_entity = self._cw.entity_from_eid(source_eid)
- repo = self._cw.repo # Service are repo side only.
- with repo.internal_cnx() as cnx:
- source = repo.sources_by_uri[source_entity.name]
- source.pull_data(cnx)
-
+ source = self._cw.repo.sources_by_eid[source_eid]
+ result = source.pull_data(self._cw, force=True, async=True)
+ return result['import_log_eid']
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/web/test/unittest_views_cwsources.py Wed Feb 03 11:12:09 2016 +0100
@@ -0,0 +1,26 @@
+from logilab.common import tempattr
+from cubicweb.devtools.testlib import CubicWebTC
+
+
+class SynchronizeSourceTC(CubicWebTC):
+ def test_synchronize_view(self):
+ with self.admin_access.web_request(vid='cw.source-sync') as req:
+ source = req.create_entity('CWSource', name=u'ext', type=u'datafeed',
+ parser=u'cw.entityxml')
+ req.cnx.commit()
+
+ self.threads = 0
+
+ def threaded_task(func):
+ self.threads += 1
+
+ with tempattr(req.cnx.repo, 'threaded_task', threaded_task):
+ path, args = self.expect_redirect_handle_request(
+ req, path=source.rest_path())
+ self.assertEqual(self.threads, 1)
+ self.assertTrue(path.startswith('cwdataimport/'))
+
+
+if __name__ == '__main__':
+ import unittest
+ unittest.main()
--- a/cubicweb/web/views/cwsources.py Fri Feb 19 12:35:22 2016 +0100
+++ b/cubicweb/web/views/cwsources.py Wed Feb 03 11:12:09 2016 +0100
@@ -1,4 +1,4 @@
-# copyright 2010-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2010-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -204,7 +204,6 @@
pass
-
class CWSourceImportsTab(EntityView):
__regid__ = 'cwsource-imports'
__select__ = (is_instance('CWSource')
@@ -250,14 +249,14 @@
title = _('synchronize')
def entity_call(self, entity):
- self._cw.call_service('source-sync', source_eid=entity.eid)
- msg = self._cw._('Source has been synchronized')
- url = entity.absolute_url(tab='cwsource-imports', __message=msg)
+ import_log_eid = self._cw.call_service('source-sync', source_eid=entity.eid)
+ msg = self._cw._('Synchronization has been requested, refresh this page in a few '
+ 'minutes.')
+ import_log = self._cw.entity_from_eid(import_log_eid)
+ url = import_log.absolute_url(__message=msg)
raise Redirect(url)
-
-
# sources management view ######################################################
class ManageSourcesAction(actions.ManagersAction):