[sources] synchronize source asynchronously when started from the UI
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Wed, 03 Feb 2016 11:12:09 +0100
changeset 11138 78c8e64f3cef
parent 11137 447a6f1e8def
child 11139 df928a3a94e3
[sources] synchronize source asynchronously when started from the UI and redirect to the forthcoming import log, for a better user experience (there is still a lot more to do there though). Closes #10468967
cubicweb/server/sources/datafeed.py
cubicweb/sobjects/services.py
cubicweb/web/test/unittest_views_cwsources.py
cubicweb/web/views/cwsources.py
--- a/cubicweb/server/sources/datafeed.py	Fri Feb 19 12:35:22 2016 +0100
+++ b/cubicweb/server/sources/datafeed.py	Wed Feb 03 11:12:09 2016 +0100
@@ -1,4 +1,4 @@
-# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2010-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -22,6 +22,7 @@
 from io import BytesIO
 from os.path import exists
 from datetime import datetime, timedelta
+from functools import partial
 
 from six import text_type
 from six.moves.urllib.parse import urlparse
@@ -194,25 +195,43 @@
                     {'x': self.eid})
         cnx.commit()
 
-    def pull_data(self, cnx, force=False, raise_on_error=False):
+    def pull_data(self, cnx, force=False, raise_on_error=False, async=False):
         """Launch synchronization of the source if needed.
 
-        This method is responsible to handle commit/rollback on the given
-        connection.
+        If `async` is true, the method return immediatly a dictionnary containing the import log's
+        eid, and the actual synchronization is done asynchronously. If `async` is false, return some
+        imports statistics (e.g. number of created and updated entities).
+
+        This method is responsible to handle commit/rollback on the given connection.
         """
         if not force and self.fresh():
             return {}
         if not self.acquire_synchronization_lock(cnx, force):
             return {}
         try:
-            return self._pull_data(cnx, force, raise_on_error)
+            if async:
+                return self._async_pull_data(cnx, force, raise_on_error)
+            else:
+                return self._pull_data(cnx, force, raise_on_error)
         finally:
-            cnx.rollback() # rollback first in case there is some dirty
-                           # transaction remaining
+            cnx.rollback()  # rollback first in case there is some dirty transaction remaining
             self.release_synchronization_lock(cnx)
 
-    def _pull_data(self, cnx, force=False, raise_on_error=False):
-        importlog = self.init_import_log(cnx)
+    def _async_pull_data(self, cnx, force, raise_on_error):
+        import_log = cnx.create_entity('CWDataImport', cw_import_of=self)
+        cnx.commit()  # commit the import log creation before starting the synchronize task
+
+        def _synchronize_source(repo, source_eid, import_log_eid):
+            with repo.internal_cnx() as cnx:
+                source = repo.sources_by_eid[source_eid]
+                source._pull_data(cnx, force, raise_on_error, import_log_eid=import_log_eid)
+
+        sync = partial(_synchronize_source, cnx.repo, self.eid, import_log.eid)
+        cnx.repo.threaded_task(sync)
+        return {'import_log_eid': import_log.eid}
+
+    def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None):
+        importlog = self.init_import_log(cnx, import_log_eid)
         myuris = self.source_cwuris(cnx)
         try:
             parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
@@ -297,12 +316,17 @@
         return dict((self.decode_extid(uri), (eid, type))
                     for uri, eid, type in cnx.system_sql(sql).fetchall())
 
-    def init_import_log(self, cnx, **kwargs):
-        dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
-                                       start_timestamp=datetime.now(tz=utc),
-                                       **kwargs)
-        dataimport.init()
-        return dataimport
+    def init_import_log(self, cnx, import_log_eid=None, **kwargs):
+        if import_log_eid is None:
+            import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
+                                           start_timestamp=datetime.now(tz=utc),
+                                           **kwargs)
+        else:
+            import_log = cnx.entity_from_eid(import_log_eid)
+            import_log.cw_set(start_timestamp=datetime.now(tz=utc), **kwargs)
+        cnx.commit()  # make changes visible
+        import_log.init()
+        return import_log
 
 
 class DataFeedParser(AppObject):
--- a/cubicweb/sobjects/services.py	Fri Feb 19 12:35:22 2016 +0100
+++ b/cubicweb/sobjects/services.py	Wed Feb 03 11:12:09 2016 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -27,6 +27,7 @@
 from cubicweb.server import Service
 from cubicweb.predicates import match_user_groups, match_kwargs
 
+
 class StatsService(Service):
     """Return a dictionary containing some statistics about the repository
     resources usage.
@@ -161,14 +162,14 @@
 
 
 class SourceSynchronizationService(Service):
-    """Force synchronization of a datafeed source"""
+    """Force synchronization of a datafeed source. Actual synchronization is done
+    asynchronously, this will simply create and return the entity which will hold the import
+    log.
+    """
     __regid__ = 'source-sync'
     __select__ = Service.__select__ & match_user_groups('managers')
 
     def call(self, source_eid):
-        source_entity = self._cw.entity_from_eid(source_eid)
-        repo = self._cw.repo # Service are repo side only.
-        with repo.internal_cnx() as cnx:
-            source = repo.sources_by_uri[source_entity.name]
-            source.pull_data(cnx)
-
+        source = self._cw.repo.sources_by_eid[source_eid]
+        result = source.pull_data(self._cw, force=True, async=True)
+        return result['import_log_eid']
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/web/test/unittest_views_cwsources.py	Wed Feb 03 11:12:09 2016 +0100
@@ -0,0 +1,26 @@
+from logilab.common import tempattr
+from cubicweb.devtools.testlib import CubicWebTC
+
+
+class SynchronizeSourceTC(CubicWebTC):
+    def test_synchronize_view(self):
+        with self.admin_access.web_request(vid='cw.source-sync') as req:
+            source = req.create_entity('CWSource', name=u'ext', type=u'datafeed',
+                                       parser=u'cw.entityxml')
+            req.cnx.commit()
+
+            self.threads = 0
+
+            def threaded_task(func):
+                self.threads += 1
+
+            with tempattr(req.cnx.repo, 'threaded_task', threaded_task):
+                path, args = self.expect_redirect_handle_request(
+                    req, path=source.rest_path())
+                self.assertEqual(self.threads, 1)
+                self.assertTrue(path.startswith('cwdataimport/'))
+
+
+if __name__ == '__main__':
+    import unittest
+    unittest.main()
--- a/cubicweb/web/views/cwsources.py	Fri Feb 19 12:35:22 2016 +0100
+++ b/cubicweb/web/views/cwsources.py	Wed Feb 03 11:12:09 2016 +0100
@@ -1,4 +1,4 @@
-# copyright 2010-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2010-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -204,7 +204,6 @@
         pass
 
 
-
 class CWSourceImportsTab(EntityView):
     __regid__ = 'cwsource-imports'
     __select__ = (is_instance('CWSource')
@@ -250,14 +249,14 @@
     title = _('synchronize')
 
     def entity_call(self, entity):
-        self._cw.call_service('source-sync', source_eid=entity.eid)
-        msg = self._cw._('Source has been synchronized')
-        url = entity.absolute_url(tab='cwsource-imports', __message=msg)
+        import_log_eid = self._cw.call_service('source-sync', source_eid=entity.eid)
+        msg = self._cw._('Synchronization has been requested, refresh this page in a few '
+                         'minutes.')
+        import_log = self._cw.entity_from_eid(import_log_eid)
+        url = import_log.absolute_url(__message=msg)
         raise Redirect(url)
 
 
-
-
 # sources management view ######################################################
 
 class ManageSourcesAction(actions.ManagersAction):