dataimport/__init__.py
       
     1 # -*- coding: utf-8 -*-
       
     2 # copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     4 #
       
     5 # This file is part of CubicWeb.
       
     6 #
       
     7 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     8 # terms of the GNU Lesser General Public License as published by the Free
       
     9 # Software Foundation, either version 2.1 of the License, or (at your option)
       
    10 # any later version.
       
    11 #
       
    12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    14 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    15 # details.
       
    16 #
       
    17 # You should have received a copy of the GNU Lesser General Public License along
       
    18 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    19 """This module provides tools to import tabular data.
       
    20 
       
    21 
       
    22 Example of use (run this with `cubicweb-ctl shell instance import-script.py`):
       
    23 
       
    24 .. sourcecode:: python
       
    25 
       
    26   from cubicweb.dataimport import *
       
    27   # define data generators
       
    28   GENERATORS = []
       
    29 
       
    30   USERS = [('Prenom', 'firstname', ()),
       
    31            ('Nom', 'surname', ()),
       
    32            ('Identifiant', 'login', ()),
       
    33            ]
       
    34 
       
    35   def gen_users(ctl):
       
    36       for row in ctl.iter_and_commit('utilisateurs'):
       
    37           entity = mk_entity(row, USERS)
       
    38           entity['upassword'] = 'motdepasse'
       
    39           ctl.check('login', entity['login'], None)
       
    40           entity = ctl.store.create_entity('CWUser', **entity)
       
    41           email = ctl.store.create_entity('EmailAddress', address=row['email'])
       
    42           ctl.store.relate(entity.eid, 'use_email', email.eid)
       
    43           ctl.store.rql('SET U in_group G WHERE G name "users", U eid %(x)s', {'x': entity.eid})
       
    44 
       
    45   CHK = [('login', check_doubles, 'Utilisateurs Login',
       
    46           'Deux utilisateurs ne devraient pas avoir le même login.'),
       
    47          ]
       
    48 
       
    49   GENERATORS.append( (gen_users, CHK) )
       
    50 
       
    51   # create controller
       
    52   ctl = CWImportController(RQLObjectStore(cnx))
       
    53   ctl.askerror = 1
       
    54   ctl.generators = GENERATORS
       
    55   ctl.data['utilisateurs'] = lazytable(ucsvreader(open('users.csv')))
       
    56   # run
       
    57   ctl.run()
       
    58 
       
    59 .. BUG files with only one column are not parsable
       
    60 .. TODO rollback() invocation is not possible yet
       
    61 """
       
    62 __docformat__ = "restructuredtext en"
       
    63 
       
    64 import csv
       
    65 import sys
       
    66 import threading
       
    67 import traceback
       
    68 import warnings
       
    69 import cPickle
       
    70 import os.path as osp
       
    71 import inspect
       
    72 from base64 import b64encode
       
    73 from collections import defaultdict
       
    74 from copy import copy
       
    75 from datetime import date, datetime, time
       
    76 from time import asctime
       
    77 from StringIO import StringIO
       
    78 
       
    79 from logilab.common import shellutils, attrdict
       
    80 from logilab.common.date import strptime
       
    81 from logilab.common.decorators import cached
       
    82 from logilab.common.deprecation import deprecated
       
    83 
       
    84 from cubicweb import QueryError
       
    85 from cubicweb.utils import make_uid
       
    86 from cubicweb.schema import META_RTYPES, VIRTUAL_RTYPES
       
    87 from cubicweb.server.edition import EditedEntity
       
    88 from cubicweb.server.sqlutils import SQL_PREFIX
       
    89 from cubicweb.server.utils import eschema_eid
       
    90 
       
    91 
       
    92 def count_lines(stream_or_filename):
       
    93     if isinstance(stream_or_filename, basestring):
       
    94         f = open(stream_or_filename)
       
    95     else:
       
    96         f = stream_or_filename
       
    97         f.seek(0)
       
    98     i = -1  # so that an empty stream yields 0 instead of a NameError

    99     for i, line in enumerate(f):

   100         pass

   101     f.seek(0)

   102     return i + 1
       
   103 def ucsvreader_pb(stream_or_path, encoding='utf-8', delimiter=',', quotechar='"',
       
   104                   skipfirst=False, withpb=True, skip_empty=True, separator=None,
       
   105                   quote=None):
       
   106     """same as :func:`ucsvreader` but a progress bar is displayed as we iter on rows"""
       
   107     if separator is not None:
       
   108         delimiter = separator
       
   109         warnings.warn("[3.20] 'separator' kwarg is deprecated, use 'delimiter' instead", DeprecationWarning)
       
   110     if quote is not None:
       
   111         quotechar = quote
       
   112         warnings.warn("[3.20] 'quote' kwarg is deprecated, use 'quotechar' instead", DeprecationWarning)
       
   113     if isinstance(stream_or_path, basestring):
       
   114         if not osp.exists(stream_or_path):
       
   115             raise Exception("file doesn't exist: %s" % stream_or_path)
       
   116         stream = open(stream_or_path)
       
   117     else:
       
   118         stream = stream_or_path
       
   119     rowcount = count_lines(stream)
       
   120     if skipfirst:
       
   121         rowcount -= 1
       
   122     if withpb:
       
   123         pb = shellutils.ProgressBar(rowcount, 50)
       
   124     for urow in ucsvreader(stream, encoding, delimiter, quotechar,
       
   125                            skipfirst=skipfirst, skip_empty=skip_empty):
       
   126         yield urow
       
   127         if withpb:
       
   128             pb.update()
       
   129     print ' %s rows imported' % rowcount
       
   130 
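# A minimal usage sketch for ucsvreader_pb (the file name, encoding and column
# layout below are hypothetical, not part of this module):
#
#     for row in ucsvreader_pb('users.csv', encoding='latin-1', delimiter=';',
#                              skipfirst=True):
#         print row[0]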
       
   131 def ucsvreader(stream, encoding='utf-8', delimiter=',', quotechar='"',
       
   132                skipfirst=False, ignore_errors=False, skip_empty=True,
       
   133                separator=None, quote=None):
       
   134     """A csv reader that accepts files with any encoding and outputs unicode
       
   135     strings
       
   136 
       
   137     if skip_empty (the default), lines without any values specified (only
       
   138     separators) will be skipped. This is useful for Excel exports which may be
       
   139     full of such lines.
       
   140     """
       
   141     if separator is not None:
       
   142         delimiter = separator
       
   143         warnings.warn("[3.20] 'separator' kwarg is deprecated, use 'delimiter' instead", DeprecationWarning)
       
   144     if quote is not None:
       
   145         quotechar = quote
       
   146         warnings.warn("[3.20] 'quote' kwarg is deprecated, use 'quotechar' instead", DeprecationWarning)
       
   147     it = iter(csv.reader(stream, delimiter=delimiter, quotechar=quotechar))
       
   148     if not ignore_errors:
       
   149         if skipfirst:
       
   150             it.next()
       
   151         for row in it:
       
   152             decoded = [item.decode(encoding) for item in row]
       
   153             if not skip_empty or any(decoded):
       
   154                 yield decoded
       
   155     else:
       
   156         if skipfirst:
       
   157             try:
       
   158                 it.next()
       
   159             except csv.Error:
       
   160                 pass
       
   161         # Safe version, that can cope with error in CSV file
       
   162         while True:
       
   163             try:
       
   164                 row = it.next()
       
   165             # End of CSV, break
       
   166             except StopIteration:
       
   167                 break
       
   168             # Error in CSV, ignore line and continue
       
   169             except csv.Error:
       
   170                 continue
       
   171             decoded = [item.decode(encoding) for item in row]
       
   172             if not skip_empty or any(decoded):
       
   173                 yield decoded
       
   174 
       
   175 
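# Sketch: combining ucsvreader with lazytable (defined below) to turn a
# header-prefixed CSV file into dicts; 'data.csv' and the 'login' column are
# placeholders:
#
#     reader = ucsvreader(open('data.csv'), delimiter=';', ignore_errors=True)
#     for record in lazytable(reader):
#         print record['login']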
       
   176 def callfunc_every(func, number, iterable):
       
   177     """yield items of `iterable` one by one and call function `func`
       
   178     every `number` iterations. Always call function `func` at the end.
       
   179     """
       
   180     for idx, item in enumerate(iterable):
       
   181         yield item
       
   182         if not idx % number:
       
   183             func()
       
   184     func()
       
   185 
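# Sketch: committing every 100 rows while iterating, which is how
# CWImportController.iter_and_commit (below) uses this helper; 'store' and
# 'rows' are illustrative names:
#
#     for row in callfunc_every(store.commit, 100, rows):
#         store.create_entity('Thing', name=row[0])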
       
   186 def lazytable(reader):
       
   187     """The first row is taken to be the header of the table and
       
   188     used to output a dict for each row of data.
       
   189 
       
   190     >>> data = lazytable(ucsvreader(open(filename)))
       
   191     """
       
   192     header = reader.next()
       
   193     for row in reader:
       
   194         yield dict(zip(header, row))
       
   195 
       
   196 def lazydbtable(cu, table, headers, orderby=None):
       
   197     """return an iterator on rows of a sql table. On each row, fetch columns
       
   198     defined in headers and return values as a dictionary.
       
   199 
       
   200     >>> data = lazydbtable(cu, 'experimentation', ('id', 'nickname', 'gps'))
       
   201     """
       
   202     sql = 'SELECT %s FROM %s' % (','.join(headers), table,)
       
   203     if orderby:
       
   204         sql += ' ORDER BY %s' % ','.join(orderby)
       
   205     cu.execute(sql)
       
   206     while True:
       
   207         row = cu.fetchone()
       
   208         if row is None:
       
   209             break
       
   210         yield dict(zip(headers, row))
       
   211 
       
   212 def mk_entity(row, map):
       
   213     """Return a dict made from sanitized mapped values.
       
   214 
       
   215     ValueError can be raised on unexpected values found in checkers
       
   216 
       
   217     >>> row = {'myname': u'dupont'}
       
   218     >>> map = [('myname', u'name', (call_transform_method('title'),))]
       
   219     >>> mk_entity(row, map)
       
   220     {'name': u'Dupont'}
       
   221     >>> row = {'myname': u'dupont', 'optname': u''}
       
   222     >>> map = [('myname', u'name', (call_transform_method('title'),)),
       
   223     ...        ('optname', u'MARKER', (optional,))]
       
   224     >>> mk_entity(row, map)
       
   225     {'name': u'Dupont', 'optname': None}
       
   226     """
       
   227     res = {}
       
   228     assert isinstance(row, dict)
       
   229     assert isinstance(map, list)
       
   230     for src, dest, funcs in map:
       
   231         try:
       
   232             res[dest] = row[src]
       
   233         except KeyError:
       
   234             continue
       
   235         try:
       
   236             for func in funcs:
       
   237                 res[dest] = func(res[dest])
       
   238                 if res[dest] is None:
       
   239                     break
       
   240         except ValueError as err:
       
   241             raise ValueError('error with %r field: %s' % (src, err)), None, sys.exc_info()[-1]
       
   242     return res
       
   243 
       
   244 # user interactions ############################################################
       
   245 
       
   246 def tell(msg):
       
   247     print msg
       
   248 
       
   249 def confirm(question):
       
   250     """A confirm function that asks for yes/no/abort and exits on abort."""
       
   251     answer = shellutils.ASK.ask(question, ('Y', 'n', 'abort'), 'Y')
       
   252     if answer == 'abort':
       
   253         sys.exit(1)
       
   254     return answer == 'Y'
       
   255 
       
   256 
       
   257 class catch_error(object):
       
   258     """Helper for @contextmanager decorator."""
       
   259 
       
   260     def __init__(self, ctl, key='unexpected error', msg=None):
       
   261         self.ctl = ctl
       
   262         self.key = key
       
   263         self.msg = msg
       
   264 
       
   265     def __enter__(self):
       
   266         return self
       
   267 
       
   268     def __exit__(self, type, value, traceback):
       
   269         if type is not None:
       
   270             if issubclass(type, (KeyboardInterrupt, SystemExit)):
       
   271                 return # re-raise
       
   272             if self.ctl.catcherrors:
       
   273                 self.ctl.record_error(self.key, None, type, value, traceback)
       
   274                 return True # silent
       
   275 
       
   276 
       
   277 # base sanitizing/coercing functions ###########################################
       
   278 
       
   279 def optional(value):
       
   280     """checker to filter optional field
       
   281 
       
   282     If value is undefined (ex: empty string), return None that will
       
   283     break the checkers validation chain
       
   284 
       
   285     General use is to add 'optional' check in first condition to avoid
       
   286     ValueError by further checkers
       
   287 
       
   288     >>> MAPPER = [(u'value', 'value', (optional, int))]
       
   289     >>> row = {'value': u'XXX'}
       
   290     >>> mk_entity(row, MAPPER)
       
   291     {'value': None}
       
   292     >>> row = {'value': u'100'}
       
   293     >>> mk_entity(row, MAPPER)
       
   294     {'value': 100}
       
   295     """
       
   296     if value:
       
   297         return value
       
   298     return None
       
   299 
       
   300 def required(value):
       
   301     """raise ValueError if value is empty
       
   302 
       
   303     This checker is typically placed last in the chain.
       
   304     """
       
   305     if value:
       
   306         return value
       
   307     raise ValueError("required")
       
   308 
       
   309 def todatetime(format='%d/%m/%Y'):
       
   310     """return a transformation function to turn string input value into a
       
   311     `datetime.datetime` instance, using given format.
       
   312 
       
   313     Follow it by `todate` or `totime` functions from `logilab.common.date` if
       
   314     you want a `date`/`time` instance instead of `datetime`.
       
   315     """
       
   316     def coerce(value):
       
   317         return strptime(value, format)
       
   318     return coerce
       
   319 
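# Example: coercing a French-style date string in a mapping chain, then
# reducing the resulting datetime to a date with logilab.common.date.todate
# (the column names are hypothetical):
#
#     from logilab.common.date import todate
#     MAPPER = [(u'naissance', 'birth_date',
#                (optional, todatetime('%d/%m/%Y'), todate))]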
       
   320 def call_transform_method(methodname, *args, **kwargs):
       
   321     """return value returned by calling the given method on input"""
       
   322     def coerce(value):
       
   323         return getattr(value, methodname)(*args, **kwargs)
       
   324     return coerce
       
   325 
       
   326 def call_check_method(methodname, *args, **kwargs):
       
   327     """check value returned by calling the given method on input is true,
       
   328     else raise ValueError
       
   329     """
       
   330     def check(value):
       
   331         if getattr(value, methodname)(*args, **kwargs):
       
   332             return value
       
   333         raise ValueError('%s not verified on %r' % (methodname, value))
       
   334     return check
       
   335 
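# Sketch of a full sanitizing chain combining the helpers above: accept an
# empty value, strip whitespace, check the result is numeric, then coerce it
# (the 'code' field is hypothetical):
#
#     MAPPER = [(u'code', 'code',
#                (optional,
#                 call_transform_method('strip'),
#                 call_check_method('isdigit'),
#                 int))]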
       
   336 # base integrity checking functions ############################################
       
   337 
       
   338 def check_doubles(buckets):
       
   339     """Extract the keys that have more than one item in their bucket."""
       
   340     return [(k, len(v)) for k, v in buckets.items() if len(v) > 1]
       
   341 
       
   342 def check_doubles_not_none(buckets):
       
   343     """Extract the keys that have more than one item in their bucket."""
       
   344     return [(k, len(v)) for k, v in buckets.items()
       
   345             if k is not None and len(v) > 1]
       
   346 
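# Example: buckets, as filled by CWImportController.check(), map a key to the
# list of values recorded for it; here 'jdoe' was seen twice:
#
#     >>> check_doubles({'jdoe': [None, None], 'alice': [None]})
#     [('jdoe', 2)]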
       
   347 # sql generator utility functions #############################################
       
   348 
       
   349 
       
   350 def _import_statements(sql_connect, statements, nb_threads=3,
       
   351                        dump_output_dir=None,
       
   352                        support_copy_from=True, encoding='utf-8'):
       
   353     """
       
   354     Import a bunch of sql statements, using different threads.
       
   355     """
       
   356     try:
       
   357         chunksize = (len(statements) / nb_threads) + 1
       
   358         threads = []
       
   359         for i in xrange(nb_threads):
       
   360             chunks = statements[i*chunksize:(i+1)*chunksize]
       
   361             thread = threading.Thread(target=_execmany_thread,
       
   362                                       args=(sql_connect, chunks,
       
   363                                             dump_output_dir,
       
   364                                             support_copy_from,
       
   365                                             encoding))
       
   366             thread.start()
       
   367             threads.append(thread)
       
   368         for t in threads:
       
   369             t.join()
       
   370     except Exception as ex:

   371         print 'Error in import statements: %s' % ex
       
   372 
       
   373 def _execmany_thread_not_copy_from(cu, statement, data, table=None,
       
   374                                    columns=None, encoding='utf-8'):
       
   375     """ Execute thread without copy from
       
   376     """
       
   377     cu.executemany(statement, data)
       
   378 
       
   379 def _execmany_thread_copy_from(cu, statement, data, table,
       
   380                                columns, encoding='utf-8'):
       
   381     """ Execute thread with copy from
       
   382     """
       
   383     buf = _create_copyfrom_buffer(data, columns, encoding=encoding)
       
   384     if buf is None:
       
   385         _execmany_thread_not_copy_from(cu, statement, data)
       
   386     else:
       
   387         if columns is None:
       
   388             cu.copy_from(buf, table, null='NULL')
       
   389         else:
       
   390             cu.copy_from(buf, table, null='NULL', columns=columns)
       
   391 
       
   392 def _execmany_thread(sql_connect, statements, dump_output_dir=None,
       
   393                      support_copy_from=True, encoding='utf-8'):
       
   394     """
       
   395     Execute sql statements. For 'INSERT INTO' statements, try to use the

   396     'COPY FROM' command, falling back to executemany.
       
   397     """
       
   398     if support_copy_from:
       
   399         execmany_func = _execmany_thread_copy_from
       
   400     else:
       
   401         execmany_func = _execmany_thread_not_copy_from
       
   402     cnx = sql_connect()
       
   403     cu = cnx.cursor()
       
   404     try:
       
   405         for statement, data in statements:
       
   406             table = None
       
   407             columns = None
       
   408             try:
       
   409                 if not statement.startswith('INSERT INTO'):
       
   410                     cu.executemany(statement, data)
       
   411                     continue
       
   412                 table = statement.split()[2]
       
   413                 if isinstance(data[0], (tuple, list)):
       
   414                     columns = None
       
   415                 else:
       
   416                     columns = list(data[0])
       
   417                 execmany_func(cu, statement, data, table, columns, encoding)
       
   418             except Exception:
       
   419                 print 'unable to copy data into table %s' % table
       
   420                 # Error in import statement, save data in dump_output_dir
       
   421                 if dump_output_dir is not None:
       
   422                     pdata = {'data': data, 'statement': statement,
       
   423                              'time': asctime(), 'columns': columns}
       
   424                     filename = make_uid()
       
   425                     try:
       
   426                         with open(osp.join(dump_output_dir,
       
   427                                            '%s.pickle' % filename), 'w') as fobj:
       
   428                             fobj.write(cPickle.dumps(pdata))
       
   429                     except IOError:
       
   430                         print 'ERROR while pickling in', dump_output_dir, filename+'.pickle'
       
   432                 cnx.rollback()
       
   433                 raise
       
   434     finally:
       
   435         cnx.commit()
       
   436         cu.close()
       
   437 
       
   438 
       
   439 def _copyfrom_buffer_convert_None(value, **opts):
       
   440     '''Convert None value to "NULL"'''
       
   441     return 'NULL'
       
   442 
       
   443 def _copyfrom_buffer_convert_number(value, **opts):
       
   444     '''Convert a number into its string representation'''
       
   445     return str(value)
       
   446 
       
   447 def _copyfrom_buffer_convert_string(value, **opts):
       
   448     '''Convert string value.
       
   449 
       
   450     Recognized keywords:
       
   451     :encoding: resulting string encoding (default: utf-8)
       
   452     '''
       
   453     encoding = opts.get('encoding','utf-8')
       
   454     escape_chars = ((u'\\', ur'\\'), (u'\t', u'\\t'), (u'\r', u'\\r'),
       
   455                     (u'\n', u'\\n'))
       
   456     for char, replace in escape_chars:
       
   457         value = value.replace(char, replace)
       
   458     if isinstance(value, unicode):
       
   459         value = value.encode(encoding)
       
   460     return value
       
   461 
       
   462 def _copyfrom_buffer_convert_date(value, **opts):
       
   463     '''Convert date into "YYYY-MM-DD"'''
       
   464     # Do not use strftime, as it yields issue with date < 1900
       
   465     # (http://bugs.python.org/issue1777412)
       
   466     return '%04d-%02d-%02d' % (value.year, value.month, value.day)
       
   467 
       
   468 def _copyfrom_buffer_convert_datetime(value, **opts):
       
   469     '''Convert date into "YYYY-MM-DD HH:MM:SS.UUUUUU"'''
       
   470     # Do not use strftime, as it yields issue with date < 1900
       
   471     # (http://bugs.python.org/issue1777412)
       
   472     return '%s %s' % (_copyfrom_buffer_convert_date(value, **opts),
       
   473                       _copyfrom_buffer_convert_time(value, **opts))
       
   474 
       
   475 def _copyfrom_buffer_convert_time(value, **opts):
       
   476     '''Convert time into "HH:MM:SS.UUUUUU"'''
       
   477     return '%02d:%02d:%02d.%06d' % (value.hour, value.minute,
       
   478                                     value.second, value.microsecond)
       
   479 
       
   480 # (types, converter) list.
       
   481 _COPYFROM_BUFFER_CONVERTERS = [
       
   482     (type(None), _copyfrom_buffer_convert_None),
       
   483     ((long, int, float), _copyfrom_buffer_convert_number),
       
   484     (basestring, _copyfrom_buffer_convert_string),
       
   485     (datetime, _copyfrom_buffer_convert_datetime),
       
   486     (date, _copyfrom_buffer_convert_date),
       
   487     (time, _copyfrom_buffer_convert_time),
       
   488 ]
       
   489 
       
   490 def _create_copyfrom_buffer(data, columns=None, **convert_opts):
       
   491     """
       
   492     Create a StringIO buffer for 'COPY FROM' command.
       
   493     Deals with Unicode, Int, Float, Date... (see ``converters``)
       
   494 
       
   495     :data: a sequence/dict of tuples
       
   496     :columns: list of columns to consider (default to all columns)
       
   497     :convert_opts: keyword arguments given to converters
       
   498     """
       
   499     # Create a list rather than directly create a StringIO
       
   500     # to correctly write lines separated by '\n' in a single step
       
   501     rows = []
       
   502     if columns is None:
       
   503         if isinstance(data[0], (tuple, list)):
       
   504             columns = range(len(data[0]))
       
   505         elif isinstance(data[0], dict):
       
   506             columns = data[0].keys()
       
   507         else:
       
   508             raise ValueError('Could not get columns: you must provide columns.')
       
   509     for row in data:
       
   510         # Iterate over the different columns and the different values
       
   511         # and try to convert them to a correct datatype.
       
   512         # If an error is raised, do not continue.
       
   513         formatted_row = []
       
   514         for col in columns:
       
   515             try:
       
   516                 value = row[col]
       
   517             except KeyError:
       
   518                 warnings.warn(u"Column %s is not accessible in row %s"
       
   519                               % (col, row), RuntimeWarning)
       
   520                 # XXX 'value' set to None so that the import does not end in
       
   521                 # error.
       
   522                 # Instead, the extra keys are set to NULL from the
       
   523                 # database point of view.
       
   524                 value = None
       
   525             for types, converter in _COPYFROM_BUFFER_CONVERTERS:
       
   526                 if isinstance(value, types):
       
   527                     value = converter(value, **convert_opts)
       
   528                     break
       
   529             else:
       
   530                 raise ValueError("Unsupported value type %s" % type(value))
       
   531             # We push the value to the new formatted row
       
   532             # if the value is not None and could be converted to a string.
       
   533             formatted_row.append(value)
       
   534         rows.append('\t'.join(formatted_row))
       
   535     return StringIO('\n'.join(rows))
       
   536 
       
   537 
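# Sketch of the buffer produced for a single row: columns are tab-separated,
# rows newline-separated, and None is rendered as NULL:
#
#     >>> buf = _create_copyfrom_buffer([(1, u'jdoe', None)])
#     >>> buf.getvalue()
#     '1\tjdoe\tNULL'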
       
   538 # object stores #################################################################
       
   539 
       
   540 class ObjectStore(object):
       
   541     """Store objects in memory for *faster* validation (development mode)
       
   542 
       
   543     It will not enforce schema constraints, however, and will thus miss some problems
       
   544 
       
   545     >>> store = ObjectStore()
       
   546     >>> user = store.create_entity('CWUser', login=u'johndoe')
       
   547     >>> group = store.create_entity('CWGroup', name=u'unknown')
       
   548     >>> store.relate(user.eid, 'in_group', group.eid)
       
   549     """
       
   550     def __init__(self):
       
   551         self.items = []
       
   552         self.eids = {}
       
   553         self.types = {}
       
   554         self.relations = set()
       
   555         self.indexes = {}
       
   556 
       
   557     def create_entity(self, etype, **data):
       
   558         data = attrdict(data)
       
   559         data['eid'] = eid = len(self.items)
       
   560         self.items.append(data)
       
   561         self.eids[eid] = data
       
   562         self.types.setdefault(etype, []).append(eid)
       
   563         return data
       
   564 
       
   565     def relate(self, eid_from, rtype, eid_to, **kwargs):
       
   566         """Add new relation"""
       
   567         relation = eid_from, rtype, eid_to
       
   568         self.relations.add(relation)
       
   569         return relation
       
   570 
       
   571     def commit(self):
       
   572         """this commit method does nothing by default"""
       
   573         return
       
   574 
       
   575     def flush(self):
       
   576         """The method is provided so that all stores share a common API"""
       
   577         pass
       
   578 
       
   579     @property
       
   580     def nb_inserted_entities(self):
       
   581         return len(self.eids)
       
   582     @property
       
   583     def nb_inserted_types(self):
       
   584         return len(self.types)
       
   585     @property
       
   586     def nb_inserted_relations(self):
       
   587         return len(self.relations)
       
   588 
       
   589 class RQLObjectStore(ObjectStore):
       
   590     """ObjectStore that works with an actual RQL repository (production mode)"""
       
   591 
       
   592     def __init__(self, cnx, commit=None):
       
   593         if commit is not None:
       
   594             warnings.warn('[3.19] commit argument should not be specified '
       
   595                           'as the cnx object already provides it.',
       
   596                           DeprecationWarning, stacklevel=2)
       
   597         super(RQLObjectStore, self).__init__()
       
   598         self._cnx = cnx
       
   599         self._commit = commit or cnx.commit
       
   600 
       
   601     def commit(self):
       
   602         return self._commit()
       
   603 
       
   604     def rql(self, *args):
       
   605         return self._cnx.execute(*args)
       
   606 
       
   607     @property
       
   608     def session(self):
       
   609         warnings.warn('[3.19] deprecated property.', DeprecationWarning,
       
   610                       stacklevel=2)
       
   611         return self._cnx.repo._get_session(self._cnx.sessionid)
       
   612 
       
   613     def create_entity(self, *args, **kwargs):
       
   614         entity = self._cnx.create_entity(*args, **kwargs)
       
   615         self.eids[entity.eid] = entity
       
   616         self.types.setdefault(args[0], []).append(entity.eid)
       
   617         return entity
       
   618 
       
   619     def relate(self, eid_from, rtype, eid_to, **kwargs):
       
   620         eid_from, rtype, eid_to = super(RQLObjectStore, self).relate(
       
   621             eid_from, rtype, eid_to, **kwargs)
       
   622         self.rql('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
       
   623                  {'x': int(eid_from), 'y': int(eid_to)})
       
   624 
       
   625     @deprecated("[3.19] use cnx.find(*args, **kwargs).entities() instead")
       
   626     def find_entities(self, *args, **kwargs):
       
   627         return self._cnx.find(*args, **kwargs).entities()
       
   628 
       
   629     @deprecated("[3.19] use cnx.find(*args, **kwargs).one() instead")
       
   630     def find_one_entity(self, *args, **kwargs):
       
   631         return self._cnx.find(*args, **kwargs).one()
       
   632 
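# Minimal use of RQLObjectStore from a cubicweb-ctl shell script, cnx being
# the connection provided by the shell (a sketch, not a complete script):
#
#     store = RQLObjectStore(cnx)
#     user = store.create_entity('CWUser', login=u'jdoe', upassword=u'secret')
#     group = store.rql('CWGroup G WHERE G name "users"').get_entity(0, 0)
#     store.relate(user.eid, 'in_group', group.eid)
#     store.commit()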
       
   633 # the import controller ########################################################
       
   634 
       
   635 class CWImportController(object):
       
   636     """Controller of the data import process.
       
   637 
       
   638     >>> ctl = CWImportController(store)
       
   639     >>> ctl.generators = list_of_data_generators
       
   640     >>> ctl.data = dict_of_data_tables
       
   641     >>> ctl.run()
       
   642     """
       
   643 
       
   644     def __init__(self, store, askerror=0, catcherrors=None, tell=tell,
       
   645                  commitevery=50):
       
   646         self.store = store
       
   647         self.generators = None
       
   648         self.data = {}
       
   649         self.errors = None
       
   650         self.askerror = askerror
       
   651         if catcherrors is None:
       
   652             catcherrors = askerror
       
   653         self.catcherrors = catcherrors
       
   654         self.commitevery = commitevery # set to None to do a single commit
       
   655         self._tell = tell
       
   656 
       
   657     def check(self, type, key, value):
       
   658         self._checks.setdefault(type, {}).setdefault(key, []).append(value)
       
   659 
       
   660     def check_map(self, entity, key, map, default):
       
   661         try:
       
   662             entity[key] = map[entity[key]]
       
   663         except KeyError:
       
   664             self.check(key, entity[key], None)
       
   665             entity[key] = default
       
   666 
       
   667     def record_error(self, key, msg=None, type=None, value=None, tb=None):
       
   668         tmp = StringIO()
       
   669         if type is None:
       
   670             traceback.print_exc(file=tmp)
       
   671         else:
       
   672             traceback.print_exception(type, value, tb, file=tmp)
       
   673         # use a list to avoid counting a <nb lines> errors instead of one
       
   674         errorlog = self.errors.setdefault(key, [])
       
   675         if msg is None:
       
   676             errorlog.append(tmp.getvalue().splitlines())
       
   677         else:
       
   678             errorlog.append( (msg, tmp.getvalue().splitlines()) )
       
   679 
       
   680     def run(self):
       
   681         self.errors = {}
       
   682         if self.commitevery is None:
       
   683             self.tell('Will commit all or nothing.')
       
   684         else:
       
   685             self.tell('Will commit every %s iterations' % self.commitevery)
       
   686         for func, checks in self.generators:
       
   687             self._checks = {}
       
   688             func_name = func.__name__
       
   689             self.tell("Run import function '%s'..." % func_name)
       
   690             try:
       
   691                 func(self)
       
   692             except Exception:
       
   693                 if self.catcherrors:
       
   694                     self.record_error(func_name, 'While calling %s' % func.__name__)
       
   695                 else:
       
   696                     self._print_stats()
       
   697                     raise
       
   698             for key, func, title, help in checks:
       
   699                 buckets = self._checks.get(key)
       
   700                 if buckets:
       
   701                     err = func(buckets)
       
   702                     if err:
       
   703                         self.errors[title] = (help, err)
       
   704         try:
       
   705             txuuid = self.store.commit()
       
   706             if txuuid is not None:
       
   707                 self.tell('Transaction committed (txuuid: %s)' % txuuid)
       
   708         except QueryError as ex:
       
   709             self.tell('Transaction aborted: %s' % ex)
       
   710         self._print_stats()
       
   711         if self.errors:
       
   712             if self.askerror == 2 or (self.askerror and confirm('Display errors?')):
       
   713                 from pprint import pformat
       
   714                 for errkey, error in self.errors.items():
       
   715                     self.tell("\n%s (%s): %d\n" % (error[0], errkey, len(error[1])))
       
   716                     self.tell(pformat(sorted(error[1])))
       
   717 
       
   718     def _print_stats(self):
       
   719         nberrors = sum(len(err) for err in self.errors.itervalues())
       
   720         self.tell('\nImport statistics: %i entities, %i types, %i relations and %i errors'
       
   721                   % (self.store.nb_inserted_entities,
       
   722                      self.store.nb_inserted_types,
       
   723                      self.store.nb_inserted_relations,
       
   724                      nberrors))
       
   725 
       
   726     def get_data(self, key):
       
   727         return self.data.get(key)
       
   728 
       
   729     def index(self, name, key, value, unique=False):
       
   730         """create a new index
       
   731 
       
   732         If unique is set to True, only the first occurrence is kept, not the following ones
       
   733         """
       
   734         if unique:
       
   735             try:
       
   736                 if value in self.store.indexes[name][key]:
       
   737                     return
       
   738             except KeyError:
       
   739                 # this is necessarily the first occurrence, so continue...
       
   740                 pass
       
   741         self.store.indexes.setdefault(name, {}).setdefault(key, []).append(value)
       
   742 
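    # Example: deduplicating while indexing; with unique=True only the first
    # value recorded for the key 'jdoe' is kept ('users', 'jdoe' and row are
    # illustrative names):
    #
    #     ctl.index('users', 'jdoe', row, unique=True)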
       
   743     def tell(self, msg):
       
   744         self._tell(msg)
       
   745 
       
   746     def iter_and_commit(self, datakey):
       
   747         """iter rows, triggering commit every self.commitevery iterations"""
       
   748         if self.commitevery is None:
       
   749             return self.get_data(datakey)
       
   750         else:
       
   751             return callfunc_every(self.store.commit,
       
   752                                   self.commitevery,
       
   753                                   self.get_data(datakey))
       
   754 
       
   755 
       
   756 class NoHookRQLObjectStore(RQLObjectStore):
       
   757     """ObjectStore that works with an actual RQL repository (production mode)"""
       
   758 
       
   759     def __init__(self, cnx, metagen=None, baseurl=None):
       
   760         super(NoHookRQLObjectStore, self).__init__(cnx)
       
   761         self.source = cnx.repo.system_source
       
   762         self.rschema = cnx.repo.schema.rschema
       
   763         self.add_relation = self.source.add_relation
       
   764         if metagen is None:
       
   765             metagen = MetaGenerator(cnx, baseurl)
       
   766         self.metagen = metagen
       
   767         self._nb_inserted_entities = 0
       
   768         self._nb_inserted_types = 0
       
   769         self._nb_inserted_relations = 0
       
   770         # deactivate security
       
   771         cnx.read_security = False
       
   772         cnx.write_security = False
       
   773 
       
   774     def create_entity(self, etype, **kwargs):
       
   775         for k, v in kwargs.iteritems():
       
   776             kwargs[k] = getattr(v, 'eid', v)
       
   777         entity, rels = self.metagen.base_etype_dicts(etype)
       
   778         # make a copy to keep cached entity pristine
       
   779         entity = copy(entity)
       
   780         entity.cw_edited = copy(entity.cw_edited)
       
   781         entity.cw_clear_relation_cache()
       
   782         entity.cw_edited.update(kwargs, skipsec=False)
       
   783         entity_source, extid = self.metagen.init_entity(entity)
       
   784         cnx = self._cnx
       
   785         self.source.add_entity(cnx, entity)
       
   786         self.source.add_info(cnx, entity, entity_source, extid)
       
   787         kwargs = dict()
       
   788         if inspect.getargspec(self.add_relation).keywords:
       
   789             kwargs['subjtype'] = entity.cw_etype
       
   790         for rtype, targeteids in rels.iteritems():
       
   791             # targeteids may be a single eid or a list of eids
       
   792             inlined = self.rschema(rtype).inlined
       
   793             try:
       
   794                 for targeteid in targeteids:
       
   795                     self.add_relation(cnx, entity.eid, rtype, targeteid,
       
   796                                       inlined, **kwargs)
       
   797             except TypeError:
       
   798                 self.add_relation(cnx, entity.eid, rtype, targeteids,
       
   799                                   inlined, **kwargs)
       
   800         self._nb_inserted_entities += 1
       
   801         return entity
       
   802 
       
   803     def relate(self, eid_from, rtype, eid_to, **kwargs):
       
   804         assert not rtype.startswith('reverse_')
       
   805         self.add_relation(self._cnx, eid_from, rtype, eid_to,
       
   806                           self.rschema(rtype).inlined)
       
   807         if self.rschema(rtype).symmetric:
       
   808             self.add_relation(self._cnx, eid_to, rtype, eid_from,
       
   809                               self.rschema(rtype).inlined)
       
   810         self._nb_inserted_relations += 1
       
   811 
       
   812     @property
       
   813     def nb_inserted_entities(self):
       
   814         return self._nb_inserted_entities
       
   815     @property
       
   816     def nb_inserted_types(self):
       
   817         return self._nb_inserted_types
       
   818     @property
       
   819     def nb_inserted_relations(self):
       
   820         return self._nb_inserted_relations
       
   821 
       
   822 
       
   823 class MetaGenerator(object):
       
   824     META_RELATIONS = (META_RTYPES
       
   825                       - VIRTUAL_RTYPES
       
   826                       - set(('eid', 'cwuri',
       
   827                              'is', 'is_instance_of', 'cw_source')))
       
   828 
       
   829     def __init__(self, cnx, baseurl=None, source=None):
       
   830         self._cnx = cnx
       
   831         if baseurl is None:
       
   832             config = cnx.vreg.config
       
   833             baseurl = config['base-url'] or config.default_base_url()
       
   834         if not baseurl[-1] == '/':
       
   835             baseurl += '/'
       
   836         self.baseurl = baseurl
       
   837         if source is None:
       
   838             source = cnx.repo.system_source
       
   839         self.source = source
       
   840         self.create_eid = cnx.repo.system_source.create_eid
       
   841         self.time = datetime.now()
       
   842         # attributes/relations shared by all entities of the same type
       
   843         self.etype_attrs = []
       
   844         self.etype_rels = []
       
   845         # attributes/relations specific to each entity
       
   846         self.entity_attrs = ['cwuri']
       
   847         #self.entity_rels = [] XXX not handled (YAGNI?)
       
   848         schema = cnx.vreg.schema
       
   849         rschema = schema.rschema
       
   850         for rtype in self.META_RELATIONS:
       
   851             # skip owned_by / created_by if user is the internal manager
       
   852             if cnx.user.eid == -1 and rtype in ('owned_by', 'created_by'):
       
   853                 continue
       
   854             if rschema(rtype).final:
       
   855                 self.etype_attrs.append(rtype)
       
   856             else:
       
   857                 self.etype_rels.append(rtype)
       
   858 
       
   859     @cached
       
   860     def base_etype_dicts(self, etype):
       
   861         entity = self._cnx.vreg['etypes'].etype_class(etype)(self._cnx)
       
   862         # entities are shallow-copied, so avoid sharing dicts between copies
       
   863         del entity.cw_extra_kwargs
       
   864         entity.cw_edited = EditedEntity(entity)
       
   865         for attr in self.etype_attrs:
       
   866             genfunc = self.generate(attr)
       
   867             if genfunc:
       
   868                 entity.cw_edited.edited_attribute(attr, genfunc(entity))
       
   869         rels = {}
       
   870         for rel in self.etype_rels:
       
   871             genfunc = self.generate(rel)
       
   872             if genfunc:
       
   873                 rels[rel] = genfunc(entity)
       
   874         return entity, rels
       
   875 
       
   876     def init_entity(self, entity):
       
   877         entity.eid = self.create_eid(self._cnx)
       
   878         extid = entity.cw_edited.get('cwuri')
       
   879         for attr in self.entity_attrs:
       
   880             if attr in entity.cw_edited:
       
   881                 # already set, skip this attribute
       
   882                 continue
       
   883             genfunc = self.generate(attr)
       
   884             if genfunc:
       
   885                 entity.cw_edited.edited_attribute(attr, genfunc(entity))
       
   886         if isinstance(extid, unicode):
       
   887             extid = extid.encode('utf-8')
       
   888         return self.source, extid
       
   889 
       
   890     def generate(self, rtype):
       
   891         return getattr(self, 'gen_%s' % rtype, None)
       
   892 
       
   893     def gen_cwuri(self, entity):
       
   894         assert self.baseurl, 'baseurl is None while generating cwuri'
       
   895         return u'%s%s' % (self.baseurl, entity.eid)
       
   896 
       
   897     def gen_creation_date(self, entity):
       
   898         return self.time
       
   899 
       
   900     def gen_modification_date(self, entity):
       
   901         return self.time
       
   902 
       
   903     def gen_created_by(self, entity):
       
   904         return self._cnx.user.eid
       
   905 
       
   906     def gen_owned_by(self, entity):
       
   907         return self._cnx.user.eid
       
   908 
       
   909 
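# Sketch: metadata generation can be customized by subclassing MetaGenerator
# and defining gen_<rtype> methods, which generate() looks up by name
# (FrozenDateMetaGenerator is a hypothetical example):
#
#     class FrozenDateMetaGenerator(MetaGenerator):
#         def gen_creation_date(self, entity):
#             return datetime(2000, 1, 1)
#
#     store = NoHookRQLObjectStore(cnx, metagen=FrozenDateMetaGenerator(cnx))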
       
   910 ###########################################################################
       
   911 ## SQL object store #######################################################
       
   912 ###########################################################################
       
   913 class SQLGenObjectStore(NoHookRQLObjectStore):
       
   914     """Controller of the data import process. This version is based
       
   915     on direct insertions throught SQL command (COPY FROM or execute many).
       
   916 
       
   917     >>> store = SQLGenObjectStore(cnx)
       
   918     >>> store.create_entity('Person', ...)
       
   919     >>> store.flush()
       
   920     """
       
   921 
       
   922     def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=3):
       
   923         """
       
   924         Initialize a SQLGenObjectStore.
       
   925 
       
   926         Parameters:
       
   927 
       
   928           - cnx: connection on the cubicweb instance
       
   929           - dump_output_dir: a directory to dump failed statements
       
   930             for easier recovery. Default is None (no dump).
       
   931           - nb_threads_statement: number of threads used
       
   932             for SQL insertion (default is 3).
       
   933         """
       
   934         super(SQLGenObjectStore, self).__init__(cnx)
       
   935         ### hijack default source
       
   936         self.source = SQLGenSourceWrapper(
       
   937             self.source, cnx.vreg.schema,
       
   938             dump_output_dir=dump_output_dir,
       
   939             nb_threads_statement=nb_threads_statement)
       
   940         ### XXX This is done in super().__init__(), but should be
       
   941         ### redone here to link to the correct source
       
   942         self.add_relation = self.source.add_relation
       
   943         self.indexes_etypes = {}
       
   944 
       
   945     def flush(self):
       
   946         """Flush data to the database"""
       
   947         self.source.flush()
       
   948 
       
   949     def relate(self, subj_eid, rtype, obj_eid, **kwargs):
       
   950         if subj_eid is None or obj_eid is None:
       
   951             return
       
   952         # XXX Could subjtype be inferred ?
       
   953         self.source.add_relation(self._cnx, subj_eid, rtype, obj_eid,
       
   954                                  self.rschema(rtype).inlined, **kwargs)
       
   955         if self.rschema(rtype).symmetric:
       
   956             self.source.add_relation(self._cnx, obj_eid, rtype, subj_eid,
       
   957                                      self.rschema(rtype).inlined, **kwargs)
       
   958 
       
   959     def drop_indexes(self, etype):
       
   960         """Drop indexes for a given entity type"""
       
   961         if etype not in self.indexes_etypes:
       
   962             cu = self._cnx.cnxset.cu
       
   963             def index_to_attr(index):
       
   964                 """turn an index name to (database) attribute name"""
       
   965                 return index.replace(etype.lower(), '').replace('idx', '').strip('_')
       
   966             indices = [(index, index_to_attr(index))
       
   967                        for index in self.source.dbhelper.list_indices(cu, etype)
       
   968                        # Do not consider 'cw_etype_pkey' index
       
   969                        if not index.endswith('key')]
       
   970             self.indexes_etypes[etype] = indices
       
   971         for index, attr in self.indexes_etypes[etype]:
       
   972             self._cnx.system_sql('DROP INDEX %s' % index)
       
   973 
       
   974     def create_indexes(self, etype):
       
   975         """Recreate indexes for a given entity type"""
       
   976         for index, attr in self.indexes_etypes.get(etype, []):
       
   977             sql = 'CREATE INDEX %s ON cw_%s(%s)' % (index, etype, attr)
       
   978             self._cnx.system_sql(sql)
       
   979 
       
   980 
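# Sketch: for massive imports it may pay off to drop indexes before inserting
# and recreate them afterwards ('Person' and rows are illustrative names):
#
#     store = SQLGenObjectStore(cnx)
#     store.drop_indexes('Person')
#     for name in rows:
#         store.create_entity('Person', name=name)
#     store.flush()
#     store.create_indexes('Person')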
       
   981 ###########################################################################
       
   982 ## SQL Source #############################################################
       
   983 ###########################################################################
       
   984 
       
   985 class SQLGenSourceWrapper(object):
       
   986 
       
   987     def __init__(self, system_source, schema,
       
   988                  dump_output_dir=None, nb_threads_statement=3):
       
   989         self.system_source = system_source
       
   990         self._sql = threading.local()
       
   991         # Explicitly backport attributes from the system source
       
   992         self._storage_handler = self.system_source._storage_handler
       
   993         self.preprocess_entity = self.system_source.preprocess_entity
       
   994         self.sqlgen = self.system_source.sqlgen
       
   995         self.uri = self.system_source.uri
       
   996         self.eid = self.system_source.eid
       
   997         # Directory to write temporary files
       
   998         self.dump_output_dir = dump_output_dir
       
   999         # Allow executing code with an SQLite backend, which does

  1000         # not support copy_from (yet...)
       
  1001         # XXX Should be dealt with in logilab.database
       
  1002         spcfrom = system_source.dbhelper.dbapi_module.support_copy_from
       
  1003         self.support_copy_from = spcfrom
       
  1004         self.dbencoding = system_source.dbhelper.dbencoding
       
  1005         self.nb_threads_statement = nb_threads_statement
       
  1006         # initialize thread-local data for main thread
       
  1007         self.init_thread_locals()
       
  1008         self._inlined_rtypes_cache = {}
       
  1009         self._fill_inlined_rtypes_cache(schema)
       
  1010         self.schema = schema
       
  1011         self.do_fti = False
       
  1012 
       
  1013     def _fill_inlined_rtypes_cache(self, schema):
       
  1014         cache = self._inlined_rtypes_cache
       
  1015         for eschema in schema.entities():
       
  1016             for rschema in eschema.ordered_relations():
       
  1017                 if rschema.inlined:
       
  1018                     cache.setdefault(eschema.type, []).append(SQL_PREFIX + rschema.type)  # keep all inlined rtypes, not only the last one
       
  1019 
       
  1020     def init_thread_locals(self):
       
  1021         """initializes thread-local data"""
       
  1022         self._sql.entities = defaultdict(list)
       
  1023         self._sql.relations = {}
       
  1024         self._sql.inlined_relations = {}
       
  1025         # keep track, for each eid of the corresponding data dict
       
  1026         self._sql.eid_insertdicts = {}
       
  1027 
       
  1028     def flush(self):
       
  1029         print 'starting flush'
       
  1030         _entities_sql = self._sql.entities
       
  1031         _relations_sql = self._sql.relations
       
  1032         _inlined_relations_sql = self._sql.inlined_relations
       
  1033         _insertdicts = self._sql.eid_insertdicts
       
  1034         try:
       
  1035         # For each inlined relation, check whether we are also creating

  1036         # the host entity (i.e. the subject of the relation).

  1037         # In that case, simply update the insert dict, which

  1038         # removes the need for a separate

  1039         # UPDATE statement
       
  1040             for statement, datalist in _inlined_relations_sql.iteritems():
       
  1041                 new_datalist = []
       
  1042                 # for a given inlined relation,
       
  1043         # browse each pair to be inserted
       
  1044                 for data in datalist:
       
  1045                     keys = list(data)
       
  1046             # For inlined relations, there are only two cases:
       
  1047                     # (rtype, cw_eid) or (cw_eid, rtype)
       
  1048                     if keys[0] == 'cw_eid':
       
  1049                         rtype = keys[1]
       
  1050                     else:
       
  1051                         rtype = keys[0]
       
  1052                     updated_eid = data['cw_eid']
       
  1053                     if updated_eid in _insertdicts:
       
  1054                         _insertdicts[updated_eid][rtype] = data[rtype]
       
  1055                     else:
       
  1056                         # could not find corresponding insert dict, keep the
       
  1057                         # UPDATE query
       
  1058                         new_datalist.append(data)
       
  1059                 _inlined_relations_sql[statement] = new_datalist
       
  1060             _import_statements(self.system_source.get_connection,
       
  1061                                _entities_sql.items()
       
  1062                                + _relations_sql.items()
       
  1063                                + _inlined_relations_sql.items(),
       
  1064                                dump_output_dir=self.dump_output_dir,
       
  1065                                nb_threads=self.nb_threads_statement,
       
  1066                                support_copy_from=self.support_copy_from,
       
  1067                                encoding=self.dbencoding)
       
  1068         finally:
       
  1069             _entities_sql.clear()
       
  1070             _relations_sql.clear()
       
  1071             _insertdicts.clear()
       
  1072             _inlined_relations_sql.clear()
       
  1073 
       
  1074     def add_relation(self, cnx, subject, rtype, object,
       
  1075                      inlined=False, **kwargs):
       
  1076         if inlined:
       
  1077             _sql = self._sql.inlined_relations
       
  1078             data = {'cw_eid': subject, SQL_PREFIX + rtype: object}
       
  1079             subjtype = kwargs.get('subjtype')
       
  1080             if subjtype is None:
       
  1081                 # Try to infer it
       
  1082                 targets = [t.type for t in
       
  1083                            self.schema.rschema(rtype).subjects()]
       
  1084                 if len(targets) == 1:
       
  1085                     subjtype = targets[0]
       
  1086                 else:
       
  1087                     raise ValueError('the subject etype of inlined '

  1088                                      'relation %s cannot be inferred, '

  1089                                      'give it explicitly with the '

  1090                                      '``subjtype`` keyword '

  1091                                      'argument' % rtype)
       
  1092             statement = self.sqlgen.update(SQL_PREFIX + subjtype,
       
  1093                                            data, ['cw_eid'])
       
  1094         else:
       
  1095             _sql = self._sql.relations
       
  1096             data = {'eid_from': subject, 'eid_to': object}
       
  1097             statement = self.sqlgen.insert('%s_relation' % rtype, data)
       
  1098         if statement in _sql:
       
  1099             _sql[statement].append(data)
       
  1100         else:
       
  1101             _sql[statement] = [data]
       
  1102 
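    # Rough shapes of the statements generated above (a sketch; the exact SQL
    # depends on the sqlgen helper): an inlined relation becomes an UPDATE on
    # the subject's table, a regular one an INSERT into its relation table:
    #
    #     UPDATE cw_Person SET cw_lives_in=%(cw_lives_in)s WHERE cw_eid=%(cw_eid)s
    #     INSERT INTO knows_relation (eid_from, eid_to) VALUES (%(eid_from)s, %(eid_to)s)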
       
  1103     def add_entity(self, cnx, entity):
       
  1104         with self._storage_handler(entity, 'added'):
       
  1105             attrs = self.preprocess_entity(entity)
       
  1106             rtypes = self._inlined_rtypes_cache.get(entity.cw_etype, ())
       
  1107             if isinstance(rtypes, str):
       
  1108                 rtypes = (rtypes,)
       
  1109             for rtype in rtypes:
       
  1110                 if rtype not in attrs:
       
  1111                     attrs[rtype] = None
       
  1112             sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
       
  1113             self._sql.eid_insertdicts[entity.eid] = attrs
       
  1114             self._append_to_entities(sql, attrs)
       
  1115 
       
  1116     def _append_to_entities(self, sql, attrs):
       
  1117         self._sql.entities[sql].append(attrs)
       
  1118 
       
  1119     def _handle_insert_entity_sql(self, cnx, sql, attrs):
       
  1120         # We have to overwrite the source given as parameter,

  1121         # as here we directly use the system source
       
  1122         attrs['asource'] = self.system_source.uri
       
  1123         self._append_to_entities(sql, attrs)
       
  1124 
       
  1125     def _handle_is_relation_sql(self, cnx, sql, attrs):
       
  1126         self._append_to_entities(sql, attrs)
       
  1127 
       
  1128     def _handle_is_instance_of_sql(self, cnx, sql, attrs):
       
  1129         self._append_to_entities(sql, attrs)
       
  1130 
       
  1131     def _handle_source_relation_sql(self, cnx, sql, attrs):
       
  1132         self._append_to_entities(sql, attrs)
       
  1133 
       
  1134     # add_info is _copypasted_ from the one in NativeSQLSource. We want it
       
  1135     # here because it will use the _handlers of the SQLGenSourceWrapper, which
       
  1136     # are not like the ones in the native source.
       
  1137     def add_info(self, cnx, entity, source, extid):
       
  1138         """add type and source info for an eid into the system table"""
       
  1139         # begin by inserting eid/type/source/extid into the entities table
       
  1140         if extid is not None:
       
  1141             assert isinstance(extid, str)
       
  1142             extid = b64encode(extid)
       
  1143         attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid,
       
  1144                  'asource': source.uri}
       
  1145         self._handle_insert_entity_sql(cnx, self.sqlgen.insert('entities', attrs), attrs)
       
  1146         # insert core relations: is, is_instance_of and cw_source
       
  1147         try:
       
  1148             self._handle_is_relation_sql(cnx, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
       
  1149                                          (entity.eid, eschema_eid(cnx, entity.e_schema)))
       
  1150         except IndexError:
       
  1151             # during schema serialization, skip
       
  1152             pass
       
  1153         else:
       
  1154             for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
       
  1155                 self._handle_is_relation_sql(cnx,
       
  1156                                              'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
       
  1157                                              (entity.eid, eschema_eid(cnx, eschema)))
       
  1158         if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
       
  1159             self._handle_is_relation_sql(cnx, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
       
  1160                                          (entity.eid, source.eid))
       
  1161         # now we can update the full text index
       
  1162         if self.do_fti and self.need_fti_indexation(entity.cw_etype):
       
  1163             self.index_entity(cnx, entity=entity)