--- a/dataimport/pgstore.py Thu Dec 03 14:55:10 2015 +0100
+++ b/dataimport/pgstore.py Mon Dec 14 14:53:02 2015 +0100
@@ -32,28 +32,6 @@
 from cubicweb.server.sqlutils import SQL_PREFIX
 from cubicweb.dataimport.stores import NoHookRQLObjectStore
 
-def _import_statements(sql_connect, statements, nb_threads=3,
-                       dump_output_dir=None,
-                       support_copy_from=True, encoding='utf-8'):
-    """
-    Import a bunch of sql statements, using different threads.
-    """
-    try:
-        chunksize = (len(statements) / nb_threads) + 1
-        threads = []
-        for i in xrange(nb_threads):
-            chunks = statements[i*chunksize:(i+1)*chunksize]
-            thread = threading.Thread(target=_execmany_thread,
-                                      args=(sql_connect, chunks,
-                                            dump_output_dir,
-                                            support_copy_from,
-                                            encoding))
-            thread.start()
-            threads.append(thread)
-        for t in threads:
-            t.join()
-    except Exception:
-        print 'Error in import statements'
 
 def _execmany_thread_not_copy_from(cu, statement, data, table=None,
                                    columns=None, encoding='utf-8'):
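
Note: the helper removed above split the statement list into nb_threads chunks and ran _execmany_thread over each chunk in a separate thread. Its except Exception merely printed a message, so any failure while starting or joining the threads was swallowed, and exceptions raised inside the worker threads never reached this code at all. A minimal sketch of the Python 2 chunking arithmetic it used (with // standing in for Py2's integer /):

    # With 10 statements and 3 threads: chunksize = (10 // 3) + 1 = 4,
    # yielding slices [0:4], [4:8], [8:12] -- the last chunk runs short.
    statements = list(range(10))
    nb_threads = 3
    chunksize = (len(statements) // nb_threads) + 1
    chunks = [statements[i * chunksize:(i + 1) * chunksize]
              for i in range(nb_threads)]
    assert sum(chunks, []) == statements
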
@@ -230,7 +208,7 @@
     >>> store.flush()
     """
 
-    def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=3):
+    def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=1):
         """
         Initialize a SQLGenObjectStore.
 
@@ -239,19 +217,18 @@
         - cnx: connection on the cubicweb instance
         - dump_output_dir: a directory to dump failed statements
           for easier recovery. Default is None (no dump).
-        - nb_threads_statement: number of threads used
-          for SQL insertion (default is 3).
         """
         super(SQLGenObjectStore, self).__init__(cnx)
         ### hijack default source
         self.source = SQLGenSourceWrapper(
             self.source, cnx.vreg.schema,
-            dump_output_dir=dump_output_dir,
-            nb_threads_statement=nb_threads_statement)
+            dump_output_dir=dump_output_dir)
         ### XXX This is done in super().__init__(), but should be
         ### redone here to link to the correct source
         self.add_relation = self.source.add_relation
         self.indexes_etypes = {}
+        if nb_threads_statement != 1:
+            warn('[3.21] SQLGenObjectStore is no longer threaded', DeprecationWarning)
 
     def flush(self):
         """Flush data to the database"""
@@ -296,7 +273,7 @@
 class SQLGenSourceWrapper(object):
 
     def __init__(self, system_source, schema,
-                 dump_output_dir=None, nb_threads_statement=3):
+                 dump_output_dir=None):
         self.system_source = system_source
         self._sql = threading.local()
         # Explicitely backport attributes from system source
@@ -313,7 +290,6 @@
         spcfrom = system_source.dbhelper.dbapi_module.support_copy_from
         self.support_copy_from = spcfrom
         self.dbencoding = system_source.dbhelper.dbencoding
-        self.nb_threads_statement = nb_threads_statement
         # initialize thread-local data for main thread
         self.init_thread_locals()
         self._inlined_rtypes_cache = {}
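
Note: with nb_threads_statement dropped from both the signature and the instance state in the two hunks above, the wrapper carries no threading configuration any more; self._sql = threading.local() survives, but only the calling thread populates it from here on. A hypothetical construction after the change (system_source and schema assumed in scope; the dump path is a made-up example):

    wrapper = SQLGenSourceWrapper(system_source, schema,
                                  dump_output_dir='/tmp/pgstore-dumps')
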
@@ -368,14 +344,13 @@
                         # UPDATE query
                         new_datalist.append(data)
                 _inlined_relations_sql[statement] = new_datalist
-            _import_statements(self.system_source.get_connection,
-                               _entities_sql.items()
-                               + _relations_sql.items()
-                               + _inlined_relations_sql.items(),
-                               dump_output_dir=self.dump_output_dir,
-                               nb_threads=self.nb_threads_statement,
-                               support_copy_from=self.support_copy_from,
-                               encoding=self.dbencoding)
+            _execmany_thread(self.system_source.get_connection,
+                             _entities_sql.items()
+                             + _relations_sql.items()
+                             + _inlined_relations_sql.items(),
+                             dump_output_dir=self.dump_output_dir,
+                             support_copy_from=self.support_copy_from,
+                             encoding=self.dbencoding)
         finally:
             _entities_sql.clear()
             _relations_sql.clear()
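
Note: _execmany_thread keeps its historical name but now runs inline on the calling thread, over the three statement dicts concatenated into a single list, so a flush is synchronous and exceptions propagate to the caller instead of being printed and discarded by the removed helper. The + concatenation works because dict.items() returns a list under Python 2; a Python 3 port would need explicit conversions, e.g. this sketch:

    statements = (list(_entities_sql.items())
                  + list(_relations_sql.items())
                  + list(_inlined_relations_sql.items()))
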