--- a/dataimport/pgstore.py Tue Dec 15 14:12:59 2015 +0100
+++ b/dataimport/pgstore.py Wed Dec 16 11:23:48 2015 +0100
@@ -18,7 +18,6 @@
"""Postgres specific store"""
from __future__ import print_function
-import threading
import warnings
import os.path as osp
from io import StringIO
@@ -35,28 +34,6 @@
from cubicweb.server.sqlutils import SQL_PREFIX
from cubicweb.dataimport.stores import NoHookRQLObjectStore
-def _import_statements(sql_connect, statements, nb_threads=3,
- dump_output_dir=None,
- support_copy_from=True, encoding='utf-8'):
- """
- Import a bunch of sql statements, using different threads.
- """
- try:
- chunksize = (len(statements) / nb_threads) + 1
- threads = []
- for i in range(nb_threads):
- chunks = statements[i*chunksize:(i+1)*chunksize]
- thread = threading.Thread(target=_execmany_thread,
- args=(sql_connect, chunks,
- dump_output_dir,
- support_copy_from,
- encoding))
- thread.start()
- threads.append(thread)
- for t in threads:
- t.join()
- except Exception:
- print('Error in import statements')
def _execmany_thread_not_copy_from(cu, statement, data, table=None,
columns=None, encoding='utf-8'):
@@ -227,7 +204,7 @@
>>> store.flush()
"""
- def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=3):
+ def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=1):
"""
Initialize a SQLGenObjectStore.
@@ -236,19 +213,18 @@
- cnx: connection on the cubicweb instance
- dump_output_dir: a directory to dump failed statements
for easier recovery. Default is None (no dump).
- - nb_threads_statement: number of threads used
- for SQL insertion (default is 3).
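+        - nb_threads_statement: deprecated and ignored, statements are
+          now executed serially (kept for backward compatibility only).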
"""
super(SQLGenObjectStore, self).__init__(cnx)
### hijack default source
self.source = SQLGenSourceWrapper(
self.source, cnx.vreg.schema,
- dump_output_dir=dump_output_dir,
- nb_threads_statement=nb_threads_statement)
+ dump_output_dir=dump_output_dir)
### XXX This is done in super().__init__(), but should be
### redone here to link to the correct source
self.add_relation = self.source.add_relation
self.indexes_etypes = {}
+        if nb_threads_statement != 1:
+            warnings.warn('[3.21] SQLGenObjectStore is no longer threaded',
+                          DeprecationWarning)
def flush(self):
"""Flush data to the database"""
@@ -293,9 +269,8 @@
class SQLGenSourceWrapper(object):
def __init__(self, system_source, schema,
- dump_output_dir=None, nb_threads_statement=3):
+ dump_output_dir=None):
self.system_source = system_source
- self._sql = threading.local()
# Explicitely backport attributes from system source
self._storage_handler = self.system_source._storage_handler
self.preprocess_entity = self.system_source.preprocess_entity
@@ -310,9 +285,7 @@
spcfrom = system_source.dbhelper.dbapi_module.support_copy_from
self.support_copy_from = spcfrom
self.dbencoding = system_source.dbhelper.dbencoding
- self.nb_threads_statement = nb_threads_statement
- # initialize thread-local data for main thread
- self.init_thread_locals()
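+        # no more threads: statements are accumulated in plain lists
+        # and executed serially at flush() time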
+ self.init_statement_lists()
self._inlined_rtypes_cache = {}
self._fill_inlined_rtypes_cache(schema)
self.schema = schema
@@ -325,20 +298,20 @@
if rschema.inlined:
cache[eschema.type] = SQL_PREFIX + rschema.type
- def init_thread_locals(self):
- """initializes thread-local data"""
- self._sql.entities = defaultdict(list)
- self._sql.relations = {}
- self._sql.inlined_relations = {}
+ def init_statement_lists(self):
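+        """initialize the lists accumulating SQL statements until flush()"""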
+ self._sql_entities = defaultdict(list)
+ self._sql_relations = {}
+ self._sql_inlined_relations = {}
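+        # 'entities' table insertions, kept apart so they can be
+        # executed before any other statement at flush() time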
+ self._sql_eids = defaultdict(list)
# keep track, for each eid of the corresponding data dict
- self._sql.eid_insertdicts = {}
+ self._sql_eid_insertdicts = {}
def flush(self):
print('starting flush')
- _entities_sql = self._sql.entities
- _relations_sql = self._sql.relations
- _inlined_relations_sql = self._sql.inlined_relations
- _insertdicts = self._sql.eid_insertdicts
+ _entities_sql = self._sql_entities
+ _relations_sql = self._sql_relations
+ _inlined_relations_sql = self._sql_inlined_relations
+ _insertdicts = self._sql_eid_insertdicts
try:
# try, for each inlined_relation, to find if we're also creating
# the host entity (i.e. the subject of the relation).
@@ -365,14 +338,14 @@
# UPDATE query
new_datalist.append(data)
_inlined_relations_sql[statement] = new_datalist
- _import_statements(self.system_source.get_connection,
- _entities_sql.items()
- + _relations_sql.items()
- + _inlined_relations_sql.items(),
- dump_output_dir=self.dump_output_dir,
- nb_threads=self.nb_threads_statement,
- support_copy_from=self.support_copy_from,
- encoding=self.dbencoding)
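+            # execute all pending statements serially on a single
+            # connection, starting with 'entities' table insertions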
+ _execmany_thread(self.system_source.get_connection,
+ self._sql_eids.items()
+ + _entities_sql.items()
+ + _relations_sql.items()
+ + _inlined_relations_sql.items(),
+ dump_output_dir=self.dump_output_dir,
+ support_copy_from=self.support_copy_from,
+ encoding=self.dbencoding)
finally:
_entities_sql.clear()
_relations_sql.clear()
@@ -382,7 +355,7 @@
def add_relation(self, cnx, subject, rtype, object,
inlined=False, **kwargs):
if inlined:
- _sql = self._sql.inlined_relations
+ _sql = self._sql_inlined_relations
data = {'cw_eid': subject, SQL_PREFIX + rtype: object}
subjtype = kwargs.get('subjtype')
if subjtype is None:
@@ -400,7 +373,7 @@
statement = self.sqlgen.update(SQL_PREFIX + subjtype,
data, ['cw_eid'])
else:
- _sql = self._sql.relations
+ _sql = self._sql_relations
data = {'eid_from': subject, 'eid_to': object}
statement = self.sqlgen.insert('%s_relation' % rtype, data)
if statement in _sql:
@@ -418,17 +391,17 @@
if rtype not in attrs:
attrs[rtype] = None
sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
- self._sql.eid_insertdicts[entity.eid] = attrs
+ self._sql_eid_insertdicts[entity.eid] = attrs
self._append_to_entities(sql, attrs)
def _append_to_entities(self, sql, attrs):
- self._sql.entities[sql].append(attrs)
+ self._sql_entities[sql].append(attrs)
def _handle_insert_entity_sql(self, cnx, sql, attrs):
# We have to overwrite the source given in parameters
# as here, we directly use the system source
attrs['asource'] = self.system_source.uri
- self._append_to_entities(sql, attrs)
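+        # goes into the dedicated eid list so that 'entities' inserts
+        # are flushed before any other statement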
+ self._sql_eids[sql].append(attrs)
def _handle_is_relation_sql(self, cnx, sql, attrs):
self._append_to_entities(sql, attrs)