diff -r a42f6e7cef35 -r 50ed87bc4cc6 dataimport/pgstore.py --- a/dataimport/pgstore.py Thu Dec 03 14:55:10 2015 +0100 +++ b/dataimport/pgstore.py Mon Dec 14 14:53:02 2015 +0100 @@ -32,28 +32,6 @@ from cubicweb.server.sqlutils import SQL_PREFIX from cubicweb.dataimport.stores import NoHookRQLObjectStore -def _import_statements(sql_connect, statements, nb_threads=3, - dump_output_dir=None, - support_copy_from=True, encoding='utf-8'): - """ - Import a bunch of sql statements, using different threads. - """ - try: - chunksize = (len(statements) / nb_threads) + 1 - threads = [] - for i in xrange(nb_threads): - chunks = statements[i*chunksize:(i+1)*chunksize] - thread = threading.Thread(target=_execmany_thread, - args=(sql_connect, chunks, - dump_output_dir, - support_copy_from, - encoding)) - thread.start() - threads.append(thread) - for t in threads: - t.join() - except Exception: - print 'Error in import statements' def _execmany_thread_not_copy_from(cu, statement, data, table=None, columns=None, encoding='utf-8'): @@ -230,7 +208,7 @@ >>> store.flush() """ - def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=3): + def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=1): """ Initialize a SQLGenObjectStore. @@ -239,19 +217,18 @@ - cnx: connection on the cubicweb instance - dump_output_dir: a directory to dump failed statements for easier recovery. Default is None (no dump). - - nb_threads_statement: number of threads used - for SQL insertion (default is 3). """ super(SQLGenObjectStore, self).__init__(cnx) ### hijack default source self.source = SQLGenSourceWrapper( self.source, cnx.vreg.schema, - dump_output_dir=dump_output_dir, - nb_threads_statement=nb_threads_statement) + dump_output_dir=dump_output_dir) ### XXX This is done in super().__init__(), but should be ### redone here to link to the correct source self.add_relation = self.source.add_relation self.indexes_etypes = {} + if nb_threads_statement != 1: + warn('[3.21] SQLGenObjectStore is no longer threaded', DeprecationWarning) def flush(self): """Flush data to the database""" @@ -296,7 +273,7 @@ class SQLGenSourceWrapper(object): def __init__(self, system_source, schema, - dump_output_dir=None, nb_threads_statement=3): + dump_output_dir=None): self.system_source = system_source self._sql = threading.local() # Explicitely backport attributes from system source @@ -313,7 +290,6 @@ spcfrom = system_source.dbhelper.dbapi_module.support_copy_from self.support_copy_from = spcfrom self.dbencoding = system_source.dbhelper.dbencoding - self.nb_threads_statement = nb_threads_statement # initialize thread-local data for main thread self.init_thread_locals() self._inlined_rtypes_cache = {} @@ -368,14 +344,13 @@ # UPDATE query new_datalist.append(data) _inlined_relations_sql[statement] = new_datalist - _import_statements(self.system_source.get_connection, - _entities_sql.items() - + _relations_sql.items() - + _inlined_relations_sql.items(), - dump_output_dir=self.dump_output_dir, - nb_threads=self.nb_threads_statement, - support_copy_from=self.support_copy_from, - encoding=self.dbencoding) + _execmany_thread(self.system_source.get_connection, + _entities_sql.items() + + _relations_sql.items() + + _inlined_relations_sql.items(), + dump_output_dir=self.dump_output_dir, + support_copy_from=self.support_copy_from, + encoding=self.dbencoding) finally: _entities_sql.clear() _relations_sql.clear()