dataimport.py
branchstable
changeset 8743 27a83746aebd
parent 8724 1beab80aed23
child 8796 2884368b263c
equal deleted inserted replaced
8742:bd374bd906f3 8743:27a83746aebd
     1 # -*- coding: utf-8 -*-
     1 # -*- coding: utf-8 -*-
     2 # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     2 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     4 #
     4 #
     5 # This file is part of CubicWeb.
     5 # This file is part of CubicWeb.
     6 #
     6 #
     7 # CubicWeb is free software: you can redistribute it and/or modify it under the
     7 # CubicWeb is free software: you can redistribute it and/or modify it under the
    62   ctl.run()
    62   ctl.run()
    63 
    63 
    64 .. BUG file with one column are not parsable
    64 .. BUG file with one column are not parsable
    65 .. TODO rollback() invocation is not possible yet
    65 .. TODO rollback() invocation is not possible yet
    66 """
    66 """
    67 from __future__ import with_statement
       
    68 
       
    69 __docformat__ = "restructuredtext en"
    67 __docformat__ = "restructuredtext en"
    70 
    68 
       
    69 import csv
    71 import sys
    70 import sys
    72 import csv
    71 import threading
    73 import traceback
    72 import traceback
       
    73 import cPickle
    74 import os.path as osp
    74 import os.path as osp
       
    75 from collections import defaultdict
       
    76 from contextlib import contextmanager
       
    77 from copy import copy
       
    78 from datetime import date, datetime
       
    79 from time import asctime
    75 from StringIO import StringIO
    80 from StringIO import StringIO
    76 from copy import copy
       
    77 from datetime import datetime
       
    78 
    81 
    79 from logilab.common import shellutils, attrdict
    82 from logilab.common import shellutils, attrdict
    80 from logilab.common.date import strptime
    83 from logilab.common.date import strptime
    81 from logilab.common.decorators import cached
    84 from logilab.common.decorators import cached
    82 from logilab.common.deprecation import deprecated
    85 from logilab.common.deprecation import deprecated
    83 
    86 
    84 from cubicweb import QueryError
    87 from cubicweb import QueryError
       
    88 from cubicweb.utils import make_uid
    85 from cubicweb.schema import META_RTYPES, VIRTUAL_RTYPES
    89 from cubicweb.schema import META_RTYPES, VIRTUAL_RTYPES
       
    90 from cubicweb.server.edition import EditedEntity
       
    91 from cubicweb.server.sqlutils import SQL_PREFIX
    86 from cubicweb.server.utils import eschema_eid
    92 from cubicweb.server.utils import eschema_eid
    87 from cubicweb.server.edition import EditedEntity
       
    88 
    93 
    89 
    94 
    90 def count_lines(stream_or_filename):
    95 def count_lines(stream_or_filename):
    91     if isinstance(stream_or_filename, basestring):
    96     if isinstance(stream_or_filename, basestring):
    92         f = open(stream_or_filename)
    97         f = open(stream_or_filename)
   117         if withpb:
   122         if withpb:
   118             pb.update()
   123             pb.update()
   119     print ' %s rows imported' % rowcount
   124     print ' %s rows imported' % rowcount
   120 
   125 
   121 def ucsvreader(stream, encoding='utf-8', separator=',', quote='"',
   126 def ucsvreader(stream, encoding='utf-8', separator=',', quote='"',
   122                skipfirst=False):
   127                skipfirst=False, ignore_errors=False):
   123     """A csv reader that accepts files with any encoding and outputs unicode
   128     """A csv reader that accepts files with any encoding and outputs unicode
   124     strings
   129     strings
   125     """
   130     """
   126     it = iter(csv.reader(stream, delimiter=separator, quotechar=quote))
   131     it = iter(csv.reader(stream, delimiter=separator, quotechar=quote))
   127     if skipfirst:
   132     if not ignore_errors:
   128         it.next()
   133         if skipfirst:
   129     for row in it:
   134             it.next()
   130         yield [item.decode(encoding) for item in row]
   135         for row in it:
       
   136             yield [item.decode(encoding) for item in row]
       
   137     else:
       
   138         # Skip first line
       
   139         try:
       
   140             row = it.next()
       
   141         except csv.Error:
       
   142             pass
       
   143         # Safe version, that can cope with error in CSV file
       
   144         while True:
       
   145             try:
       
   146                 row = it.next()
       
   147             # End of CSV, break
       
   148             except StopIteration:
       
   149                 break
       
   150             # Error in CSV, ignore line and continue
       
   151             except csv.Error:
       
   152                 continue
       
   153             yield [item.decode(encoding) for item in row]
   131 
   154 
   132 def callfunc_every(func, number, iterable):
   155 def callfunc_every(func, number, iterable):
   133     """yield items of `iterable` one by one and call function `func`
   156     """yield items of `iterable` one by one and call function `func`
   134     every `number` iterations. Always call function `func` at the end.
   157     every `number` iterations. Always call function `func` at the end.
   135     """
   158     """
   191         try:
   214         try:
   192             for func in funcs:
   215             for func in funcs:
   193                 res[dest] = func(res[dest])
   216                 res[dest] = func(res[dest])
   194                 if res[dest] is None:
   217                 if res[dest] is None:
   195                     break
   218                     break
   196         except ValueError, err:
   219         except ValueError as err:
   197             raise ValueError('error with %r field: %s' % (src, err)), None, sys.exc_info()[-1]
   220             raise ValueError('error with %r field: %s' % (src, err)), None, sys.exc_info()[-1]
   198     return res
   221     return res
   199 
   222 
   200 # user interactions ############################################################
   223 # user interactions ############################################################
   201 
   224 
   297 
   320 
   298 def check_doubles_not_none(buckets):
   321 def check_doubles_not_none(buckets):
   299     """Extract the keys that have more than one item in their bucket."""
   322     """Extract the keys that have more than one item in their bucket."""
   300     return [(k, len(v)) for k, v in buckets.items()
   323     return [(k, len(v)) for k, v in buckets.items()
   301             if k is not None and len(v) > 1]
   324             if k is not None and len(v) > 1]
       
   325 
       
   326 
       
   327 # sql generator utility functions #############################################
       
   328 
       
   329 
       
   330 def _import_statements(sql_connect, statements, nb_threads=3,
       
   331                        dump_output_dir=None,
       
   332                        support_copy_from=True, encoding='utf-8'):
       
   333     """
       
   334     Import a bunch of sql statements, using different threads.
       
   335     """
       
   336     try:
       
   337         chunksize = (len(statements) / nb_threads) + 1
       
   338         threads = []
       
   339         for i in xrange(nb_threads):
       
   340             chunks = statements[i*chunksize:(i+1)*chunksize]
       
   341             thread = threading.Thread(target=_execmany_thread,
       
   342                                       args=(sql_connect, chunks,
       
   343                                             dump_output_dir,
       
   344                                             support_copy_from,
       
   345                                             encoding))
       
   346             thread.start()
       
   347             threads.append(thread)
       
   348         for t in threads:
       
   349             t.join()
       
   350     except Exception:
       
   351         print 'Error in import statements'
       
   352 
       
   353 def _execmany_thread_not_copy_from(cu, statement, data, table=None,
       
   354                                    columns=None, encoding='utf-8'):
       
   355     """ Execute thread without copy from
       
   356     """
       
   357     cu.executemany(statement, data)
       
   358 
       
   359 def _execmany_thread_copy_from(cu, statement, data, table,
       
   360                                columns, encoding='utf-8'):
       
   361     """ Execute thread with copy from
       
   362     """
       
   363     buf = _create_copyfrom_buffer(data, columns, encoding)
       
   364     if buf is None:
       
   365         _execmany_thread_not_copy_from(cu, statement, data)
       
   366     else:
       
   367         if columns is None:
       
   368             cu.copy_from(buf, table, null='NULL')
       
   369         else:
       
   370             cu.copy_from(buf, table, null='NULL', columns=columns)
       
   371 
       
   372 def _execmany_thread(sql_connect, statements, dump_output_dir=None,
       
   373                      support_copy_from=True, encoding='utf-8'):
       
   374     """
       
   375     Execute sql statement. If 'INSERT INTO', try to use 'COPY FROM' command,
       
   376     or fallback to execute_many.
       
   377     """
       
   378     if support_copy_from:
       
   379         execmany_func = _execmany_thread_copy_from
       
   380     else:
       
   381         execmany_func = _execmany_thread_not_copy_from
       
   382     cnx = sql_connect()
       
   383     cu = cnx.cursor()
       
   384     try:
       
   385         for statement, data in statements:
       
   386             table = None
       
   387             columns = None
       
   388             try:
       
   389                 if not statement.startswith('INSERT INTO'):
       
   390                     cu.executemany(statement, data)
       
   391                     continue
       
   392                 table = statement.split()[2]
       
   393                 if isinstance(data[0], (tuple, list)):
       
   394                     columns = None
       
   395                 else:
       
   396                     columns = list(data[0])
       
   397                 execmany_func(cu, statement, data, table, columns, encoding)
       
   398             except Exception:
       
   399                 print 'unable to copy data into table %s', table
       
   400                 # Error in import statement, save data in dump_output_dir
       
   401                 if dump_output_dir is not None:
       
   402                     pdata = {'data': data, 'statement': statement,
       
   403                              'time': asctime(), 'columns': columns}
       
   404                     filename = make_uid()
       
   405                     try:
       
   406                         with open(osp.join(dump_output_dir,
       
   407                                            '%s.pickle' % filename), 'w') as fobj:
       
   408                             fobj.write(cPickle.dumps(pdata))
       
   409                     except IOError:
       
   410                         print 'ERROR while pickling in', dump_output_dir, filename+'.pickle'
       
   411                         pass
       
   412                 cnx.rollback()
       
   413                 raise
       
   414     finally:
       
   415         cnx.commit()
       
   416         cu.close()
       
   417 
       
   418 def _create_copyfrom_buffer(data, columns, encoding='utf-8', replace_sep=None):
       
   419     """
       
   420     Create a StringIO buffer for 'COPY FROM' command.
       
   421     Deals with Unicode, Int, Float, Date...
       
   422     """
       
   423     # Create a list rather than directly create a StringIO
       
   424     # to correctly write lines separated by '\n' in a single step
       
   425     rows = []
       
   426     if isinstance(data[0], (tuple, list)):
       
   427         columns = range(len(data[0]))
       
   428     for row in data:
       
   429         # Iterate over the different columns and the different values
       
   430         # and try to convert them to a correct datatype.
       
   431         # If an error is raised, do not continue.
       
   432         formatted_row = []
       
   433         for col in columns:
       
   434             value = row[col]
       
   435             if value is None:
       
   436                 value = 'NULL'
       
   437             elif isinstance(value, (long, int, float)):
       
   438                 value = str(value)
       
   439             elif isinstance(value, (str, unicode)):
       
   440                 # Remove separators used in string formatting
       
   441                 for _char in (u'\t', u'\r', u'\n'):
       
   442                     if _char in value:
       
   443                         # If a replace_sep is given, replace
       
   444                         # the separator instead of returning None
       
   445                         # (and thus avoid empty buffer)
       
   446                         if replace_sep:
       
   447                             value = value.replace(_char, replace_sep)
       
   448                         else:
       
   449                             return
       
   450                 value = value.replace('\\', r'\\')
       
   451                 if value is None:
       
   452                     return
       
   453                 if isinstance(value, unicode):
       
   454                     value = value.encode(encoding)
       
   455             elif isinstance(value, (date, datetime)):
       
   456                 # Do not use strftime, as it yields issue
       
   457                 # with date < 1900
       
   458                 value = '%04d-%02d-%02d' % (value.year,
       
   459                                             value.month,
       
   460                                             value.day)
       
   461             else:
       
   462                 return None
       
   463             # We push the value to the new formatted row
       
   464             # if the value is not None and could be converted to a string.
       
   465             formatted_row.append(value)
       
   466         rows.append('\t'.join(formatted_row))
       
   467     return StringIO('\n'.join(rows))
   302 
   468 
   303 
   469 
   304 # object stores #################################################################
   470 # object stores #################################################################
   305 
   471 
   306 class ObjectStore(object):
   472 class ObjectStore(object):
   576                         self.errors[title] = (help, err)
   742                         self.errors[title] = (help, err)
   577         try:
   743         try:
   578             txuuid = self.store.commit()
   744             txuuid = self.store.commit()
   579             if txuuid is not None:
   745             if txuuid is not None:
   580                 self.tell('Transaction commited (txuuid: %s)' % txuuid)
   746                 self.tell('Transaction commited (txuuid: %s)' % txuuid)
   581         except QueryError, ex:
   747         except QueryError as ex:
   582             self.tell('Transaction aborted: %s' % ex)
   748             self.tell('Transaction aborted: %s' % ex)
   583         self._print_stats()
   749         self._print_stats()
   584         if self.errors:
   750         if self.errors:
   585             if self.askerror == 2 or (self.askerror and confirm('Display errors ?')):
   751             if self.askerror == 2 or (self.askerror and confirm('Display errors ?')):
   586                 from pprint import pformat
   752                 from pprint import pformat
   587                 for errkey, error in self.errors.items():
   753                 for errkey, error in self.errors.items():
   588                     self.tell("\n%s (%s): %d\n" % (error[0], errkey, len(error[1])))
   754                     self.tell("\n%s (%s): %d\n" % (error[0], errkey, len(error[1])))
   589                     self.tell(pformat(sorted(error[1])))
   755                     self.tell(pformat(sorted(error[1])))
   590 
   756 
   591     def _print_stats(self):
   757     def _print_stats(self):
   592         nberrors = sum(len(err) for err in self.errors.values())
   758         nberrors = sum(len(err) for err in self.errors.itervalues())
   593         self.tell('\nImport statistics: %i entities, %i types, %i relations and %i errors'
   759         self.tell('\nImport statistics: %i entities, %i types, %i relations and %i errors'
   594                   % (self.store.nb_inserted_entities,
   760                   % (self.store.nb_inserted_entities,
   595                      self.store.nb_inserted_types,
   761                      self.store.nb_inserted_types,
   596                      self.store.nb_inserted_relations,
   762                      self.store.nb_inserted_relations,
   597                      nberrors))
   763                      nberrors))
   753 
   919 
    def gen_created_by(self, entity):
        """relation callback: use the session's user eid as creator of `entity`"""
        return self.session.user.eid
    def gen_owned_by(self, entity):
        """relation callback: use the session's user eid as owner of `entity`"""
        return self.session.user.eid
       
   924 
       
   925 
       
   926 ###########################################################################
       
   927 ## SQL object store #######################################################
       
   928 ###########################################################################
       
class SQLGenObjectStore(NoHookRQLObjectStore):
    """Controller of the data import process. This version is based
    on direct insertions throught SQL command (COPY FROM or execute many).

    >>> store = SQLGenObjectStore(session)
    >>> store.create_entity('Person', ...)
    >>> store.flush()
    """

    def __init__(self, session, dump_output_dir=None, nb_threads_statement=3):
        """
        Initialize a SQLGenObjectStore.

        Parameters:

          - session: session on the cubicweb instance
          - dump_output_dir: a directory to dump failed statements
            for easier recovery. Default is None (no dump).
          - nb_threads_statement: number of threads used
            for SQL insertion (default is 3).
        """
        super(SQLGenObjectStore, self).__init__(session)
        ### hijack default source
        self.source = SQLGenSourceWrapper(
            self.source, session.vreg.schema,
            dump_output_dir=dump_output_dir,
            nb_threads_statement=nb_threads_statement)
        ### XXX This is done in super().__init__(), but should be
        ### redone here to link to the correct source
        self.add_relation = self.source.add_relation
        # cache of dropped indexes per entity type, filled by drop_indexes()
        # and consumed by create_indexes()
        self.indexes_etypes = {}

    def flush(self):
        """Flush data to the database"""
        self.source.flush()

    def relate(self, subj_eid, rtype, obj_eid, subjtype=None):
        """add a `rtype` relation between entities `subj_eid` and `obj_eid`;
        silently ignored when either end is None
        """
        if subj_eid is None or obj_eid is None:
            return
        # XXX Could subjtype be inferred ?
        self.source.add_relation(self.session, subj_eid, rtype, obj_eid,
                                 self.rschema(rtype).inlined, subjtype)

    def drop_indexes(self, etype):
        """Drop indexes for a given entity type"""
        if etype not in self.indexes_etypes:
            cu = self.session.cnxset['system']
            def index_to_attr(index):
                """turn an index name to (database) attribute name"""
                return index.replace(etype.lower(), '').replace('idx', '').strip('_')
            indices = [(index, index_to_attr(index))
                       for index in self.source.dbhelper.list_indices(cu, etype)
                       # Do not consider 'cw_etype_pkey' index
                       if not index.endswith('key')]
            self.indexes_etypes[etype] = indices
        for index, attr in self.indexes_etypes[etype]:
            self.session.system_sql('DROP INDEX %s' % index)

    def create_indexes(self, etype):
        """Recreate indexes for a given entity type"""
        for index, attr in self.indexes_etypes.get(etype, []):
            sql = 'CREATE INDEX %s ON cw_%s(%s)' % (index, etype, attr)
            self.session.system_sql(sql)
       
   992 
       
   993 
       
   994 ###########################################################################
       
   995 ## SQL Source #############################################################
       
   996 ###########################################################################
       
   997 
       
   998 class SQLGenSourceWrapper(object):
       
   999 
       
  1000     def __init__(self, system_source, schema,
       
  1001                  dump_output_dir=None, nb_threads_statement=3):
       
  1002         self.system_source = system_source
       
  1003         self._sql = threading.local()
       
  1004         # Explicitely backport attributes from system source
       
  1005         self._storage_handler = self.system_source._storage_handler
       
  1006         self.preprocess_entity = self.system_source.preprocess_entity
       
  1007         self.sqlgen = self.system_source.sqlgen
       
  1008         self.copy_based_source = self.system_source.copy_based_source
       
  1009         self.uri = self.system_source.uri
       
  1010         self.eid = self.system_source.eid
       
  1011         # Directory to write temporary files
       
  1012         self.dump_output_dir = dump_output_dir
       
  1013         # Allow to execute code with SQLite backend that does
       
  1014         # not support (yet...) copy_from
       
  1015         # XXX Should be dealt with in logilab.database
       
  1016         spcfrom = system_source.dbhelper.dbapi_module.support_copy_from
       
  1017         self.support_copy_from = spcfrom
       
  1018         self.dbencoding = system_source.dbhelper.dbencoding
       
  1019         self.nb_threads_statement = nb_threads_statement
       
  1020         # initialize thread-local data for main thread
       
  1021         self.init_thread_locals()
       
  1022         self._inlined_rtypes_cache = {}
       
  1023         self._fill_inlined_rtypes_cache(schema)
       
  1024         self.schema = schema
       
  1025         self.do_fti = False
       
  1026 
       
  1027     def _fill_inlined_rtypes_cache(self, schema):
       
  1028         cache = self._inlined_rtypes_cache
       
  1029         for eschema in schema.entities():
       
  1030             for rschema in eschema.ordered_relations():
       
  1031                 if rschema.inlined:
       
  1032                     cache[eschema.type] = SQL_PREFIX + rschema.type
       
  1033 
       
  1034     def init_thread_locals(self):
       
  1035         """initializes thread-local data"""
       
  1036         self._sql.entities = defaultdict(list)
       
  1037         self._sql.relations = {}
       
  1038         self._sql.inlined_relations = {}
       
  1039         # keep track, for each eid of the corresponding data dict
       
  1040         self._sql.eid_insertdicts = {}
       
  1041 
       
  1042     def flush(self):
       
  1043         print 'starting flush'
       
  1044         _entities_sql = self._sql.entities
       
  1045         _relations_sql = self._sql.relations
       
  1046         _inlined_relations_sql = self._sql.inlined_relations
       
  1047         _insertdicts = self._sql.eid_insertdicts
       
  1048         try:
       
  1049             # try, for each inlined_relation, to find if we're also creating
       
  1050             # the host entity (i.e. the subject of the relation).
       
  1051             # In that case, simply update the insert dict and remove
       
  1052             # the need to make the
       
  1053             # UPDATE statement
       
  1054             for statement, datalist in _inlined_relations_sql.iteritems():
       
  1055                 new_datalist = []
       
  1056                 # for a given inlined relation,
       
  1057                 # browse each couple to be inserted
       
  1058                 for data in datalist:
       
  1059                     keys = list(data)
       
  1060                     # For inlined relations, it exists only two case:
       
  1061                     # (rtype, cw_eid) or (cw_eid, rtype)
       
  1062                     if keys[0] == 'cw_eid':
       
  1063                         rtype = keys[1]
       
  1064                     else:
       
  1065                         rtype = keys[0]
       
  1066                     updated_eid = data['cw_eid']
       
  1067                     if updated_eid in _insertdicts:
       
  1068                         _insertdicts[updated_eid][rtype] = data[rtype]
       
  1069                     else:
       
  1070                         # could not find corresponding insert dict, keep the
       
  1071                         # UPDATE query
       
  1072                         new_datalist.append(data)
       
  1073                 _inlined_relations_sql[statement] = new_datalist
       
  1074             _import_statements(self.system_source.get_connection,
       
  1075                                _entities_sql.items()
       
  1076                                + _relations_sql.items()
       
  1077                                + _inlined_relations_sql.items(),
       
  1078                                dump_output_dir=self.dump_output_dir,
       
  1079                                nb_threads=self.nb_threads_statement,
       
  1080                                support_copy_from=self.support_copy_from,
       
  1081                                encoding=self.dbencoding)
       
  1082         except:
       
  1083             print 'failed to flush'
       
  1084         finally:
       
  1085             _entities_sql.clear()
       
  1086             _relations_sql.clear()
       
  1087             _insertdicts.clear()
       
  1088             _inlined_relations_sql.clear()
       
  1089             print 'flush done'
       
  1090 
       
  1091     def add_relation(self, session, subject, rtype, object,
       
  1092                      inlined=False, subjtype=None):
       
  1093         if inlined:
       
  1094             _sql = self._sql.inlined_relations
       
  1095             data = {'cw_eid': subject, SQL_PREFIX + rtype: object}
       
  1096             if subjtype is None:
       
  1097                 # Try to infer it
       
  1098                 targets = [t.type for t in
       
  1099                            self.schema.rschema(rtype).targets()]
       
  1100                 if len(targets) == 1:
       
  1101                     subjtype = targets[0]
       
  1102                 else:
       
  1103                     raise ValueError('You should give the subject etype for '
       
  1104                                      'inlined relation %s'
       
  1105                                      ', as it cannot be inferred' % rtype)
       
  1106             statement = self.sqlgen.update(SQL_PREFIX + subjtype,
       
  1107                                            data, ['cw_eid'])
       
  1108         else:
       
  1109             _sql = self._sql.relations
       
  1110             data = {'eid_from': subject, 'eid_to': object}
       
  1111             statement = self.sqlgen.insert('%s_relation' % rtype, data)
       
  1112         if statement in _sql:
       
  1113             _sql[statement].append(data)
       
  1114         else:
       
  1115             _sql[statement] = [data]
       
  1116 
       
  1117     def add_entity(self, session, entity):
       
  1118         with self._storage_handler(entity, 'added'):
       
  1119             attrs = self.preprocess_entity(entity)
       
  1120             rtypes = self._inlined_rtypes_cache.get(entity.__regid__, ())
       
  1121             if isinstance(rtypes, str):
       
  1122                 rtypes = (rtypes,)
       
  1123             for rtype in rtypes:
       
  1124                 if rtype not in attrs:
       
  1125                     attrs[rtype] = None
       
  1126             sql = self.sqlgen.insert(SQL_PREFIX + entity.__regid__, attrs)
       
  1127             self._sql.eid_insertdicts[entity.eid] = attrs
       
  1128             self._append_to_entities(sql, attrs)
       
  1129 
       
  1130     def _append_to_entities(self, sql, attrs):
       
  1131         self._sql.entities[sql].append(attrs)
       
  1132 
       
  1133     def _handle_insert_entity_sql(self, session, sql, attrs):
       
  1134         # We have to overwrite the source given in parameters
       
  1135         # as here, we directly use the system source
       
  1136         attrs['source'] = 'system'
       
  1137         attrs['asource'] = self.system_source.uri
       
  1138         self._append_to_entities(sql, attrs)
       
  1139 
       
  1140     def _handle_is_relation_sql(self, session, sql, attrs):
       
  1141         self._append_to_entities(sql, attrs)
       
  1142 
       
  1143     def _handle_is_instance_of_sql(self, session, sql, attrs):
       
  1144         self._append_to_entities(sql, attrs)
       
  1145 
       
  1146     def _handle_source_relation_sql(self, session, sql, attrs):
       
  1147         self._append_to_entities(sql, attrs)
       
  1148 
       
  1149     # XXX add_info is similar to the one in NativeSQLSource. It is rewritten
       
  1150     # here to correctly used the _handle_xxx of the SQLGenSourceWrapper. This
       
  1151     # part should be rewritten in a more clearly way.
       
  1152     def add_info(self, session, entity, source, extid, complete):
       
  1153         """add type and source info for an eid into the system table"""
       
  1154         # begin by inserting eid/type/source/extid into the entities table
       
  1155         if extid is not None:
       
  1156             assert isinstance(extid, str)
       
  1157             extid = b64encode(extid)
       
  1158         uri = 'system' if source.copy_based_source else source.uri
       
  1159         attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
       
  1160                  'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()}
       
  1161         self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)
       
  1162         # insert core relations: is, is_instance_of and cw_source
       
  1163         try:
       
  1164             self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
       
  1165                                          (entity.eid, eschema_eid(session, entity.e_schema)))
       
  1166         except IndexError:
       
  1167             # during schema serialization, skip
       
  1168             pass
       
  1169         else:
       
  1170             for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
       
  1171                 self._handle_is_relation_sql(session,
       
  1172                                              'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
       
  1173                                              (entity.eid, eschema_eid(session, eschema)))
       
  1174         if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
       
  1175             self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
       
  1176                                          (entity.eid, source.eid))
       
  1177         # now we can update the full text index
       
  1178         if self.do_fti and self.need_fti_indexation(entity.__regid__):
       
  1179             if complete:
       
  1180                 entity.complete(entity.e_schema.indexable_attributes())
       
  1181             self.index_entity(session, entity=entity)