cubicweb/server/sources/datafeed.py
changeset 11138 78c8e64f3cef
parent 11129 97095348b3ee
child 11151 4259c55df3e7
--- a/cubicweb/server/sources/datafeed.py	Fri Feb 19 12:35:22 2016 +0100
+++ b/cubicweb/server/sources/datafeed.py	Wed Feb 03 11:12:09 2016 +0100
@@ -1,4 +1,4 @@
-# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2010-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -22,6 +22,7 @@
 from io import BytesIO
 from os.path import exists
 from datetime import datetime, timedelta
+from functools import partial
 
 from six import text_type
 from six.moves.urllib.parse import urlparse
@@ -194,25 +195,43 @@
                     {'x': self.eid})
         cnx.commit()
 
-    def pull_data(self, cnx, force=False, raise_on_error=False):
+    def pull_data(self, cnx, force=False, raise_on_error=False, async=False):
         """Launch synchronization of the source if needed.
 
-        This method is responsible to handle commit/rollback on the given
-        connection.
+        If `async` is true, the method returns immediately a dictionary containing the import log's
+        eid, and the actual synchronization is done asynchronously. If `async` is false, return some
+        import statistics (e.g. number of created and updated entities).
+
+        This method is responsible for handling commit/rollback on the given connection.
         """
         if not force and self.fresh():
             return {}
         if not self.acquire_synchronization_lock(cnx, force):
             return {}
         try:
-            return self._pull_data(cnx, force, raise_on_error)
+            if async:
+                return self._async_pull_data(cnx, force, raise_on_error)
+            else:
+                return self._pull_data(cnx, force, raise_on_error)
         finally:
-            cnx.rollback() # rollback first in case there is some dirty
-                           # transaction remaining
+            cnx.rollback()  # rollback first in case there is some dirty transaction remaining
             self.release_synchronization_lock(cnx)
 
-    def _pull_data(self, cnx, force=False, raise_on_error=False):
-        importlog = self.init_import_log(cnx)
+    def _async_pull_data(self, cnx, force, raise_on_error):
+        import_log = cnx.create_entity('CWDataImport', cw_import_of=self)
+        cnx.commit()  # commit the import log creation before starting the synchronize task
+
+        def _synchronize_source(repo, source_eid, import_log_eid):
+            with repo.internal_cnx() as cnx:
+                source = repo.sources_by_eid[source_eid]
+                source._pull_data(cnx, force, raise_on_error, import_log_eid=import_log_eid)
+
+        sync = partial(_synchronize_source, cnx.repo, self.eid, import_log.eid)
+        cnx.repo.threaded_task(sync)
+        return {'import_log_eid': import_log.eid}
+
+    def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None):
+        importlog = self.init_import_log(cnx, import_log_eid)
         myuris = self.source_cwuris(cnx)
         try:
             parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
@@ -297,12 +316,17 @@
         return dict((self.decode_extid(uri), (eid, type))
                     for uri, eid, type in cnx.system_sql(sql).fetchall())
 
-    def init_import_log(self, cnx, **kwargs):
-        dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
-                                       start_timestamp=datetime.now(tz=utc),
-                                       **kwargs)
-        dataimport.init()
-        return dataimport
+    def init_import_log(self, cnx, import_log_eid=None, **kwargs):
+        if import_log_eid is None:
+            import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
+                                           start_timestamp=datetime.now(tz=utc),
+                                           **kwargs)
+        else:
+            import_log = cnx.entity_from_eid(import_log_eid)
+            import_log.cw_set(start_timestamp=datetime.now(tz=utc), **kwargs)
+        cnx.commit()  # make changes visible
+        import_log.init()
+        return import_log
 
 
 class DataFeedParser(AppObject):