dataimport/pgstore.py
changeset 10991 7ceb0971c694
parent 10981 45bc791275b4
parent 10986 ca8321b32392
child 11009 d5962fb5bb8e
equal deleted inserted replaced
10981:45bc791275b4 10991:7ceb0971c694
    16 # You should have received a copy of the GNU Lesser General Public License along
    16 # You should have received a copy of the GNU Lesser General Public License along
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    18 """Postgres specific store"""
    18 """Postgres specific store"""
    19 from __future__ import print_function
    19 from __future__ import print_function
    20 
    20 
    21 import threading
       
    22 import warnings
    21 import warnings
    23 import os.path as osp
    22 import os.path as osp
    24 from io import StringIO
    23 from io import StringIO
    25 from time import asctime
    24 from time import asctime
    26 from datetime import date, datetime, time
    25 from datetime import date, datetime, time
    33 from cubicweb.utils import make_uid
    32 from cubicweb.utils import make_uid
    34 from cubicweb.server.utils import eschema_eid
    33 from cubicweb.server.utils import eschema_eid
    35 from cubicweb.server.sqlutils import SQL_PREFIX
    34 from cubicweb.server.sqlutils import SQL_PREFIX
    36 from cubicweb.dataimport.stores import NoHookRQLObjectStore
    35 from cubicweb.dataimport.stores import NoHookRQLObjectStore
    37 
    36 
    38 def _import_statements(sql_connect, statements, nb_threads=3,
       
    39                        dump_output_dir=None,
       
    40                        support_copy_from=True, encoding='utf-8'):
       
    41     """
       
    42     Import a bunch of sql statements, using different threads.
       
    43     """
       
    44     try:
       
    45         chunksize = (len(statements) / nb_threads) + 1
       
    46         threads = []
       
    47         for i in range(nb_threads):
       
    48             chunks = statements[i*chunksize:(i+1)*chunksize]
       
    49             thread = threading.Thread(target=_execmany_thread,
       
    50                                       args=(sql_connect, chunks,
       
    51                                             dump_output_dir,
       
    52                                             support_copy_from,
       
    53                                             encoding))
       
    54             thread.start()
       
    55             threads.append(thread)
       
    56         for t in threads:
       
    57             t.join()
       
    58     except Exception:
       
    59         print('Error in import statements')
       
    60 
    37 
    61 def _execmany_thread_not_copy_from(cu, statement, data, table=None,
    38 def _execmany_thread_not_copy_from(cu, statement, data, table=None,
    62                                    columns=None, encoding='utf-8'):
    39                                    columns=None, encoding='utf-8'):
    63     """ Execute thread without copy from
    40     """ Execute thread without copy from
    64     """
    41     """
   225     >>> store = SQLGenObjectStore(cnx)
   202     >>> store = SQLGenObjectStore(cnx)
   226     >>> store.create_entity('Person', ...)
   203     >>> store.create_entity('Person', ...)
   227     >>> store.flush()
   204     >>> store.flush()
   228     """
   205     """
   229 
   206 
   230     def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=3):
   207     def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=1):
   231         """
   208         """
   232         Initialize a SQLGenObjectStore.
   209         Initialize a SQLGenObjectStore.
   233 
   210 
   234         Parameters:
   211         Parameters:
   235 
   212 
   236           - cnx: connection on the cubicweb instance
   213           - cnx: connection on the cubicweb instance
   237           - dump_output_dir: a directory to dump failed statements
   214           - dump_output_dir: a directory to dump failed statements
   238             for easier recovery. Default is None (no dump).
   215             for easier recovery. Default is None (no dump).
   239           - nb_threads_statement: number of threads used
       
   240             for SQL insertion (default is 3).
       
   241         """
   216         """
   242         super(SQLGenObjectStore, self).__init__(cnx)
   217         super(SQLGenObjectStore, self).__init__(cnx)
   243         ### hijack default source
   218         ### hijack default source
   244         self.source = SQLGenSourceWrapper(
   219         self.source = SQLGenSourceWrapper(
   245             self.source, cnx.vreg.schema,
   220             self.source, cnx.vreg.schema,
   246             dump_output_dir=dump_output_dir,
   221             dump_output_dir=dump_output_dir)
   247             nb_threads_statement=nb_threads_statement)
       
   248         ### XXX This is done in super().__init__(), but should be
   222         ### XXX This is done in super().__init__(), but should be
   249         ### redone here to link to the correct source
   223         ### redone here to link to the correct source
   250         self.add_relation = self.source.add_relation
   224         self.add_relation = self.source.add_relation
   251         self.indexes_etypes = {}
   225         self.indexes_etypes = {}
       
   226         if nb_threads_statement != 1:
       
   227             warn('[3.21] SQLGenObjectStore is no longer threaded', DeprecationWarning)
   252 
   228 
   253     def flush(self):
   229     def flush(self):
   254         """Flush data to the database"""
   230         """Flush data to the database"""
   255         self.source.flush()
   231         self.source.flush()
   256 
   232 
   291 ###########################################################################
   267 ###########################################################################
   292 
   268 
   293 class SQLGenSourceWrapper(object):
   269 class SQLGenSourceWrapper(object):
   294 
   270 
   295     def __init__(self, system_source, schema,
   271     def __init__(self, system_source, schema,
   296                  dump_output_dir=None, nb_threads_statement=3):
   272                  dump_output_dir=None):
   297         self.system_source = system_source
   273         self.system_source = system_source
   298         self._sql = threading.local()
       
    299         # Explicitly backport attributes from system source
    274         # Explicitly backport attributes from system source
   300         self._storage_handler = self.system_source._storage_handler
   275         self._storage_handler = self.system_source._storage_handler
   301         self.preprocess_entity = self.system_source.preprocess_entity
   276         self.preprocess_entity = self.system_source.preprocess_entity
   302         self.sqlgen = self.system_source.sqlgen
   277         self.sqlgen = self.system_source.sqlgen
   303         self.uri = self.system_source.uri
   278         self.uri = self.system_source.uri
   308         # not support (yet...) copy_from
   283         # not support (yet...) copy_from
   309         # XXX Should be dealt with in logilab.database
   284         # XXX Should be dealt with in logilab.database
   310         spcfrom = system_source.dbhelper.dbapi_module.support_copy_from
   285         spcfrom = system_source.dbhelper.dbapi_module.support_copy_from
   311         self.support_copy_from = spcfrom
   286         self.support_copy_from = spcfrom
   312         self.dbencoding = system_source.dbhelper.dbencoding
   287         self.dbencoding = system_source.dbhelper.dbencoding
   313         self.nb_threads_statement = nb_threads_statement
   288         self.init_statement_lists()
   314         # initialize thread-local data for main thread
       
   315         self.init_thread_locals()
       
   316         self._inlined_rtypes_cache = {}
   289         self._inlined_rtypes_cache = {}
   317         self._fill_inlined_rtypes_cache(schema)
   290         self._fill_inlined_rtypes_cache(schema)
   318         self.schema = schema
   291         self.schema = schema
   319         self.do_fti = False
   292         self.do_fti = False
   320 
   293 
   323         for eschema in schema.entities():
   296         for eschema in schema.entities():
   324             for rschema in eschema.ordered_relations():
   297             for rschema in eschema.ordered_relations():
   325                 if rschema.inlined:
   298                 if rschema.inlined:
   326                     cache[eschema.type] = SQL_PREFIX + rschema.type
   299                     cache[eschema.type] = SQL_PREFIX + rschema.type
   327 
   300 
   328     def init_thread_locals(self):
   301     def init_statement_lists(self):
   329         """initializes thread-local data"""
   302         self._sql_entities = defaultdict(list)
   330         self._sql.entities = defaultdict(list)
   303         self._sql_relations = {}
   331         self._sql.relations = {}
   304         self._sql_inlined_relations = {}
   332         self._sql.inlined_relations = {}
   305         self._sql_eids = defaultdict(list)
    333         # keep track, for each eid, of the corresponding data dict
    306         # keep track, for each eid, of the corresponding data dict
   334         self._sql.eid_insertdicts = {}
   307         self._sql_eid_insertdicts = {}
   335 
   308 
   336     def flush(self):
   309     def flush(self):
   337         print('starting flush')
   310         print('starting flush')
   338         _entities_sql = self._sql.entities
   311         _entities_sql = self._sql_entities
   339         _relations_sql = self._sql.relations
   312         _relations_sql = self._sql_relations
   340         _inlined_relations_sql = self._sql.inlined_relations
   313         _inlined_relations_sql = self._sql_inlined_relations
   341         _insertdicts = self._sql.eid_insertdicts
   314         _insertdicts = self._sql_eid_insertdicts
   342         try:
   315         try:
   343             # try, for each inlined_relation, to find if we're also creating
   316             # try, for each inlined_relation, to find if we're also creating
   344             # the host entity (i.e. the subject of the relation).
   317             # the host entity (i.e. the subject of the relation).
   345             # In that case, simply update the insert dict and remove
   318             # In that case, simply update the insert dict and remove
   346             # the need to make the
   319             # the need to make the
   363                     else:
   336                     else:
   364                         # could not find corresponding insert dict, keep the
   337                         # could not find corresponding insert dict, keep the
   365                         # UPDATE query
   338                         # UPDATE query
   366                         new_datalist.append(data)
   339                         new_datalist.append(data)
   367                 _inlined_relations_sql[statement] = new_datalist
   340                 _inlined_relations_sql[statement] = new_datalist
   368             _import_statements(self.system_source.get_connection,
   341             _execmany_thread(self.system_source.get_connection,
   369                                _entities_sql.items()
   342                              self._sql_eids.items()
   370                                + _relations_sql.items()
   343                              + _entities_sql.items()
   371                                + _inlined_relations_sql.items(),
   344                              + _relations_sql.items()
   372                                dump_output_dir=self.dump_output_dir,
   345                              + _inlined_relations_sql.items(),
   373                                nb_threads=self.nb_threads_statement,
   346                              dump_output_dir=self.dump_output_dir,
   374                                support_copy_from=self.support_copy_from,
   347                              support_copy_from=self.support_copy_from,
   375                                encoding=self.dbencoding)
   348                              encoding=self.dbencoding)
   376         finally:
   349         finally:
   377             _entities_sql.clear()
   350             _entities_sql.clear()
   378             _relations_sql.clear()
   351             _relations_sql.clear()
   379             _insertdicts.clear()
   352             _insertdicts.clear()
   380             _inlined_relations_sql.clear()
   353             _inlined_relations_sql.clear()
   381 
   354 
   382     def add_relation(self, cnx, subject, rtype, object,
   355     def add_relation(self, cnx, subject, rtype, object,
   383                      inlined=False, **kwargs):
   356                      inlined=False, **kwargs):
   384         if inlined:
   357         if inlined:
   385             _sql = self._sql.inlined_relations
   358             _sql = self._sql_inlined_relations
   386             data = {'cw_eid': subject, SQL_PREFIX + rtype: object}
   359             data = {'cw_eid': subject, SQL_PREFIX + rtype: object}
   387             subjtype = kwargs.get('subjtype')
   360             subjtype = kwargs.get('subjtype')
   388             if subjtype is None:
   361             if subjtype is None:
   389                 # Try to infer it
   362                 # Try to infer it
   390                 targets = [t.type for t in
   363                 targets = [t.type for t in
   398                                      'this type is given as keyword argument '
   371                                      'this type is given as keyword argument '
   399                                      '``subjtype``'% rtype)
   372                                      '``subjtype``'% rtype)
   400             statement = self.sqlgen.update(SQL_PREFIX + subjtype,
   373             statement = self.sqlgen.update(SQL_PREFIX + subjtype,
   401                                            data, ['cw_eid'])
   374                                            data, ['cw_eid'])
   402         else:
   375         else:
   403             _sql = self._sql.relations
   376             _sql = self._sql_relations
   404             data = {'eid_from': subject, 'eid_to': object}
   377             data = {'eid_from': subject, 'eid_to': object}
   405             statement = self.sqlgen.insert('%s_relation' % rtype, data)
   378             statement = self.sqlgen.insert('%s_relation' % rtype, data)
   406         if statement in _sql:
   379         if statement in _sql:
   407             _sql[statement].append(data)
   380             _sql[statement].append(data)
   408         else:
   381         else:
   416                 rtypes = (rtypes,)
   389                 rtypes = (rtypes,)
   417             for rtype in rtypes:
   390             for rtype in rtypes:
   418                 if rtype not in attrs:
   391                 if rtype not in attrs:
   419                     attrs[rtype] = None
   392                     attrs[rtype] = None
   420             sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
   393             sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
   421             self._sql.eid_insertdicts[entity.eid] = attrs
   394             self._sql_eid_insertdicts[entity.eid] = attrs
   422             self._append_to_entities(sql, attrs)
   395             self._append_to_entities(sql, attrs)
   423 
   396 
   424     def _append_to_entities(self, sql, attrs):
   397     def _append_to_entities(self, sql, attrs):
   425         self._sql.entities[sql].append(attrs)
   398         self._sql_entities[sql].append(attrs)
   426 
   399 
   427     def _handle_insert_entity_sql(self, cnx, sql, attrs):
   400     def _handle_insert_entity_sql(self, cnx, sql, attrs):
   428         # We have to overwrite the source given in parameters
   401         # We have to overwrite the source given in parameters
   429         # as here, we directly use the system source
   402         # as here, we directly use the system source
   430         attrs['asource'] = self.system_source.uri
   403         attrs['asource'] = self.system_source.uri
   431         self._append_to_entities(sql, attrs)
   404         self._sql_eids[sql].append(attrs)
   432 
   405 
   433     def _handle_is_relation_sql(self, cnx, sql, attrs):
   406     def _handle_is_relation_sql(self, cnx, sql, attrs):
   434         self._append_to_entities(sql, attrs)
   407         self._append_to_entities(sql, attrs)
   435 
   408 
   436     def _handle_is_instance_of_sql(self, cnx, sql, attrs):
   409     def _handle_is_instance_of_sql(self, cnx, sql, attrs):