# HG changeset patch # User Sylvain Thénault # Date 1454494329 -3600 # Node ID 78c8e64f3cef7a4a57ad789bf8e7efccf45e2a25 # Parent 447a6f1e8def0af51296b0fcae37d01a7dddb175 [sources] synchronize source asynchronously when started from the UI and redirect to the forthcoming import log, for a better user experience (there is still a lot more to do there though). Closes #10468967 diff -r 447a6f1e8def -r 78c8e64f3cef cubicweb/server/sources/datafeed.py --- a/cubicweb/server/sources/datafeed.py Fri Feb 19 12:35:22 2016 +0100 +++ b/cubicweb/server/sources/datafeed.py Wed Feb 03 11:12:09 2016 +0100 @@ -1,4 +1,4 @@ -# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2010-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -22,6 +22,7 @@ from io import BytesIO from os.path import exists from datetime import datetime, timedelta +from functools import partial from six import text_type from six.moves.urllib.parse import urlparse @@ -194,25 +195,43 @@ {'x': self.eid}) cnx.commit() - def pull_data(self, cnx, force=False, raise_on_error=False): + def pull_data(self, cnx, force=False, raise_on_error=False, async=False): """Launch synchronization of the source if needed. - This method is responsible to handle commit/rollback on the given - connection. + If `async` is true, the method return immediatly a dictionnary containing the import log's + eid, and the actual synchronization is done asynchronously. If `async` is false, return some + imports statistics (e.g. number of created and updated entities). + + This method is responsible to handle commit/rollback on the given connection. """ if not force and self.fresh(): return {} if not self.acquire_synchronization_lock(cnx, force): return {} try: - return self._pull_data(cnx, force, raise_on_error) + if async: + return self._async_pull_data(cnx, force, raise_on_error) + else: + return self._pull_data(cnx, force, raise_on_error) finally: - cnx.rollback() # rollback first in case there is some dirty - # transaction remaining + cnx.rollback() # rollback first in case there is some dirty transaction remaining self.release_synchronization_lock(cnx) - def _pull_data(self, cnx, force=False, raise_on_error=False): - importlog = self.init_import_log(cnx) + def _async_pull_data(self, cnx, force, raise_on_error): + import_log = cnx.create_entity('CWDataImport', cw_import_of=self) + cnx.commit() # commit the import log creation before starting the synchronize task + + def _synchronize_source(repo, source_eid, import_log_eid): + with repo.internal_cnx() as cnx: + source = repo.sources_by_eid[source_eid] + source._pull_data(cnx, force, raise_on_error, import_log_eid=import_log_eid) + + sync = partial(_synchronize_source, cnx.repo, self.eid, import_log.eid) + cnx.repo.threaded_task(sync) + return {'import_log_eid': import_log.eid} + + def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None): + importlog = self.init_import_log(cnx, import_log_eid) myuris = self.source_cwuris(cnx) try: parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog) @@ -297,12 +316,17 @@ return dict((self.decode_extid(uri), (eid, type)) for uri, eid, type in cnx.system_sql(sql).fetchall()) - def init_import_log(self, cnx, **kwargs): - dataimport = cnx.create_entity('CWDataImport', cw_import_of=self, - start_timestamp=datetime.now(tz=utc), - **kwargs) - dataimport.init() - return dataimport + def init_import_log(self, cnx, import_log_eid=None, **kwargs): + if import_log_eid is None: + import_log = cnx.create_entity('CWDataImport', cw_import_of=self, + start_timestamp=datetime.now(tz=utc), + **kwargs) + else: + import_log = cnx.entity_from_eid(import_log_eid) + import_log.cw_set(start_timestamp=datetime.now(tz=utc), **kwargs) + cnx.commit() # make changes visible + import_log.init() + return import_log class DataFeedParser(AppObject): diff -r 447a6f1e8def -r 78c8e64f3cef cubicweb/sobjects/services.py --- a/cubicweb/sobjects/services.py Fri Feb 19 12:35:22 2016 +0100 +++ b/cubicweb/sobjects/services.py Wed Feb 03 11:12:09 2016 +0100 @@ -1,4 +1,4 @@ -# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -27,6 +27,7 @@ from cubicweb.server import Service from cubicweb.predicates import match_user_groups, match_kwargs + class StatsService(Service): """Return a dictionary containing some statistics about the repository resources usage. @@ -161,14 +162,14 @@ class SourceSynchronizationService(Service): - """Force synchronization of a datafeed source""" + """Force synchronization of a datafeed source. Actual synchronization is done + asynchronously, this will simply create and return the entity which will hold the import + log. + """ __regid__ = 'source-sync' __select__ = Service.__select__ & match_user_groups('managers') def call(self, source_eid): - source_entity = self._cw.entity_from_eid(source_eid) - repo = self._cw.repo # Service are repo side only. - with repo.internal_cnx() as cnx: - source = repo.sources_by_uri[source_entity.name] - source.pull_data(cnx) - + source = self._cw.repo.sources_by_eid[source_eid] + result = source.pull_data(self._cw, force=True, async=True) + return result['import_log_eid'] diff -r 447a6f1e8def -r 78c8e64f3cef cubicweb/web/test/unittest_views_cwsources.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cubicweb/web/test/unittest_views_cwsources.py Wed Feb 03 11:12:09 2016 +0100 @@ -0,0 +1,26 @@ +from logilab.common import tempattr +from cubicweb.devtools.testlib import CubicWebTC + + +class SynchronizeSourceTC(CubicWebTC): + def test_synchronize_view(self): + with self.admin_access.web_request(vid='cw.source-sync') as req: + source = req.create_entity('CWSource', name=u'ext', type=u'datafeed', + parser=u'cw.entityxml') + req.cnx.commit() + + self.threads = 0 + + def threaded_task(func): + self.threads += 1 + + with tempattr(req.cnx.repo, 'threaded_task', threaded_task): + path, args = self.expect_redirect_handle_request( + req, path=source.rest_path()) + self.assertEqual(self.threads, 1) + self.assertTrue(path.startswith('cwdataimport/')) + + +if __name__ == '__main__': + import unittest + unittest.main() diff -r 447a6f1e8def -r 78c8e64f3cef cubicweb/web/views/cwsources.py --- a/cubicweb/web/views/cwsources.py Fri Feb 19 12:35:22 2016 +0100 +++ b/cubicweb/web/views/cwsources.py Wed Feb 03 11:12:09 2016 +0100 @@ -1,4 +1,4 @@ -# copyright 2010-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2010-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -204,7 +204,6 @@ pass - class CWSourceImportsTab(EntityView): __regid__ = 'cwsource-imports' __select__ = (is_instance('CWSource') @@ -250,14 +249,14 @@ title = _('synchronize') def entity_call(self, entity): - self._cw.call_service('source-sync', source_eid=entity.eid) - msg = self._cw._('Source has been synchronized') - url = entity.absolute_url(tab='cwsource-imports', __message=msg) + import_log_eid = self._cw.call_service('source-sync', source_eid=entity.eid) + msg = self._cw._('Synchronization has been requested, refresh this page in a few ' + 'minutes.') + import_log = self._cw.entity_from_eid(import_log_eid) + url = import_log.absolute_url(__message=msg) raise Redirect(url) - - # sources management view ###################################################### class ManageSourcesAction(actions.ManagersAction):