[dataimport] remove threading support from SQLGenObjectStore
author Julien Cristau <julien.cristau@logilab.fr>
Mon, 14 Dec 2015 14:53:02 +0100
changeset 10985 50ed87bc4cc6
parent 10984 a42f6e7cef35
child 10986 ca8321b32392
[dataimport] remove threading support from SQLGenObjectStore. It's untested and does not actually work at the moment with nb_threads > 1. Related to #9177565.
dataimport/pgstore.py
--- a/dataimport/pgstore.py	Thu Dec 03 14:55:10 2015 +0100
+++ b/dataimport/pgstore.py	Mon Dec 14 14:53:02 2015 +0100
@@ -32,28 +32,6 @@
 from cubicweb.server.sqlutils import SQL_PREFIX
 from cubicweb.dataimport.stores import NoHookRQLObjectStore
 
-def _import_statements(sql_connect, statements, nb_threads=3,
-                       dump_output_dir=None,
-                       support_copy_from=True, encoding='utf-8'):
-    """
-    Import a bunch of sql statements, using different threads.
-    """
-    try:
-        chunksize = (len(statements) / nb_threads) + 1
-        threads = []
-        for i in xrange(nb_threads):
-            chunks = statements[i*chunksize:(i+1)*chunksize]
-            thread = threading.Thread(target=_execmany_thread,
-                                      args=(sql_connect, chunks,
-                                            dump_output_dir,
-                                            support_copy_from,
-                                            encoding))
-            thread.start()
-            threads.append(thread)
-        for t in threads:
-            t.join()
-    except Exception:
-        print 'Error in import statements'
 
 def _execmany_thread_not_copy_from(cu, statement, data, table=None,
                                    columns=None, encoding='utf-8'):
@@ -230,7 +208,7 @@
     >>> store.flush()
     """
 
-    def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=3):
+    def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=1):
         """
         Initialize a SQLGenObjectStore.
 
@@ -239,19 +217,18 @@
           - cnx: connection on the cubicweb instance
           - dump_output_dir: a directory to dump failed statements
             for easier recovery. Default is None (no dump).
-          - nb_threads_statement: number of threads used
-            for SQL insertion (default is 3).
         """
         super(SQLGenObjectStore, self).__init__(cnx)
         ### hijack default source
         self.source = SQLGenSourceWrapper(
             self.source, cnx.vreg.schema,
-            dump_output_dir=dump_output_dir,
-            nb_threads_statement=nb_threads_statement)
+            dump_output_dir=dump_output_dir)
         ### XXX This is done in super().__init__(), but should be
         ### redone here to link to the correct source
         self.add_relation = self.source.add_relation
         self.indexes_etypes = {}
+        if nb_threads_statement != 1:
+            warn('[3.21] SQLGenObjectStore is no longer threaded', DeprecationWarning)
 
     def flush(self):
         """Flush data to the database"""
@@ -296,7 +273,7 @@
 class SQLGenSourceWrapper(object):
 
     def __init__(self, system_source, schema,
-                 dump_output_dir=None, nb_threads_statement=3):
+                 dump_output_dir=None):
         self.system_source = system_source
         self._sql = threading.local()
         # Explicitely backport attributes from system source
@@ -313,7 +290,6 @@
         spcfrom = system_source.dbhelper.dbapi_module.support_copy_from
         self.support_copy_from = spcfrom
         self.dbencoding = system_source.dbhelper.dbencoding
-        self.nb_threads_statement = nb_threads_statement
         # initialize thread-local data for main thread
         self.init_thread_locals()
         self._inlined_rtypes_cache = {}
@@ -368,14 +344,13 @@
                         # UPDATE query
                         new_datalist.append(data)
                 _inlined_relations_sql[statement] = new_datalist
-            _import_statements(self.system_source.get_connection,
-                               _entities_sql.items()
-                               + _relations_sql.items()
-                               + _inlined_relations_sql.items(),
-                               dump_output_dir=self.dump_output_dir,
-                               nb_threads=self.nb_threads_statement,
-                               support_copy_from=self.support_copy_from,
-                               encoding=self.dbencoding)
+            _execmany_thread(self.system_source.get_connection,
+                             _entities_sql.items()
+                             + _relations_sql.items()
+                             + _inlined_relations_sql.items(),
+                             dump_output_dir=self.dump_output_dir,
+                             support_copy_from=self.support_copy_from,
+                             encoding=self.dbencoding)
         finally:
             _entities_sql.clear()
             _relations_sql.clear()