dataimport.py
branch stable
changeset 9013 b4bcabf55e77
parent 8970 0a1bd0c590e2
child 9181 2eac0aa1d3f6
legend: equal | deleted | inserted | replaced
9012:2cf127d4f5fd 9013:b4bcabf55e77
    68 
    68 
    69 import csv
    69 import csv
    70 import sys
    70 import sys
    71 import threading
    71 import threading
    72 import traceback
    72 import traceback
       
    73 import warnings
    73 import cPickle
    74 import cPickle
    74 import os.path as osp
    75 import os.path as osp
       
    76 import inspect
    75 from collections import defaultdict
    77 from collections import defaultdict
    76 from contextlib import contextmanager
       
    77 from copy import copy
    78 from copy import copy
    78 from datetime import date, datetime
    79 from datetime import date, datetime
    79 from time import asctime
    80 from time import asctime
    80 from StringIO import StringIO
    81 from StringIO import StringIO
    81 
    82 
   321 def check_doubles_not_none(buckets):
   322 def check_doubles_not_none(buckets):
   322     """Extract the keys that have more than one item in their bucket."""
   323     """Extract the keys that have more than one item in their bucket."""
   323     return [(k, len(v)) for k, v in buckets.items()
   324     return [(k, len(v)) for k, v in buckets.items()
   324             if k is not None and len(v) > 1]
   325             if k is not None and len(v) > 1]
   325 
   326 
   326 
       
   327 # sql generator utility functions #############################################
   327 # sql generator utility functions #############################################
   328 
   328 
   329 
   329 
   330 def _import_statements(sql_connect, statements, nb_threads=3,
   330 def _import_statements(sql_connect, statements, nb_threads=3,
   331                        dump_output_dir=None,
   331                        dump_output_dir=None,
   394                     columns = None
   394                     columns = None
   395                 else:
   395                 else:
   396                     columns = list(data[0])
   396                     columns = list(data[0])
   397                 execmany_func(cu, statement, data, table, columns, encoding)
   397                 execmany_func(cu, statement, data, table, columns, encoding)
   398             except Exception:
   398             except Exception:
   399                 print 'unable to copy data into table %s', table
   399                 print 'unable to copy data into table %s' % table
   400                 # Error in import statement, save data in dump_output_dir
   400                 # Error in import statement, save data in dump_output_dir
   401                 if dump_output_dir is not None:
   401                 if dump_output_dir is not None:
   402                     pdata = {'data': data, 'statement': statement,
   402                     pdata = {'data': data, 'statement': statement,
   403                              'time': asctime(), 'columns': columns}
   403                              'time': asctime(), 'columns': columns}
   404                     filename = make_uid()
   404                     filename = make_uid()
   429         # Iterate over the different columns and the different values
   429         # Iterate over the different columns and the different values
   430         # and try to convert them to a correct datatype.
   430         # and try to convert them to a correct datatype.
   431         # If an error is raised, do not continue.
   431         # If an error is raised, do not continue.
   432         formatted_row = []
   432         formatted_row = []
   433         for col in columns:
   433         for col in columns:
   434             value = row[col]
   434             try:
       
   435                 value = row[col]
       
   436             except KeyError:
       
   437                 warnings.warn(u"Column %s is not accessible in row %s" 
       
   438                               % (col, row), RuntimeWarning)
       
   439                 # XXX 'value' set to None so that the import does not end in 
       
   440                 # error. 
       
   441                 # Instead, the extra keys are set to NULL from the 
       
   442                 # database point of view.
       
   443                 value = None
   435             if value is None:
   444             if value is None:
   436                 value = 'NULL'
   445                 value = 'NULL'
   437             elif isinstance(value, (long, int, float)):
   446             elif isinstance(value, (long, int, float)):
   438                 value = str(value)
   447                 value = str(value)
   439             elif isinstance(value, (str, unicode)):
   448             elif isinstance(value, (str, unicode)):
   504         assert isinstance(item, dict), 'item is not a dict but a %s' % type(item)
   513         assert isinstance(item, dict), 'item is not a dict but a %s' % type(item)
   505         data = self.create_entity(etype, **item)
   514         data = self.create_entity(etype, **item)
   506         item['eid'] = data['eid']
   515         item['eid'] = data['eid']
   507         return item
   516         return item
   508 
   517 
   509     def relate(self, eid_from, rtype, eid_to, inlined=False):
   518     def relate(self, eid_from, rtype, eid_to, **kwargs):
   510         """Add new relation"""
   519         """Add new relation"""
   511         relation = eid_from, rtype, eid_to
   520         relation = eid_from, rtype, eid_to
   512         self.relations.add(relation)
   521         self.relations.add(relation)
   513         return relation
   522         return relation
   514 
   523 
   520 
   529 
   521         If you want override commit method, please set it by the
   530         If you want override commit method, please set it by the
   522         constructor
   531         constructor
   523         """
   532         """
   524         pass
   533         pass
       
   534 
       
   535     def flush(self):
       
   536         """The method is provided so that all stores share a common API.
       
   537         It just tries to call the commit method.
       
   538         """
       
   539         print 'starting flush'
       
   540         try:
       
   541             self.commit()
       
   542         except:
       
   543             print 'failed to flush'
       
   544         else:
       
   545             print 'flush done'
   525 
   546 
   526     def rql(self, *args):
   547     def rql(self, *args):
   527         if self._rql is not None:
   548         if self._rql is not None:
   528             return self._rql(*args)
   549             return self._rql(*args)
   529         return []
   550         return []
   535     def nb_inserted_types(self):
   556     def nb_inserted_types(self):
   536         return len(self.types)
   557         return len(self.types)
   537     @property
   558     @property
   538     def nb_inserted_relations(self):
   559     def nb_inserted_relations(self):
   539         return len(self.relations)
   560         return len(self.relations)
   540 
       
   541     @deprecated("[3.7] index support will disappear")
       
   542     def build_index(self, name, type, func=None, can_be_empty=False):
       
   543         """build internal index for further search"""
       
   544         index = {}
       
   545         if func is None or not callable(func):
       
   546             func = lambda x: x['eid']
       
   547         for eid in self.types[type]:
       
   548             index.setdefault(func(self.eids[eid]), []).append(eid)
       
   549         if not can_be_empty:
       
   550             assert index, "new index '%s' cannot be empty" % name
       
   551         self.indexes[name] = index
       
   552 
       
   553     @deprecated("[3.7] index support will disappear")
       
   554     def build_rqlindex(self, name, type, key, rql, rql_params=False,
       
   555                        func=None, can_be_empty=False):
       
   556         """build an index by rql query
       
   557 
       
   558         rql should return eid in first column
       
   559         ctl.store.build_index('index_name', 'users', 'login', 'Any U WHERE U is CWUser')
       
   560         """
       
   561         self.types[type] = []
       
   562         rset = self.rql(rql, rql_params or {})
       
   563         if not can_be_empty:
       
   564             assert rset, "new index type '%s' cannot be empty (0 record found)" % type
       
   565         for entity in rset.entities():
       
   566             getattr(entity, key) # autopopulate entity with key attribute
       
   567             self.eids[entity.eid] = dict(entity)
       
   568             if entity.eid not in self.types[type]:
       
   569                 self.types[type].append(entity.eid)
       
   570 
       
   571         # Build index with specified key
       
   572         func = lambda x: x[key]
       
   573         self.build_index(name, type, func, can_be_empty=can_be_empty)
       
   574 
       
   575     @deprecated("[3.7] index support will disappear")
       
   576     def fetch(self, name, key, unique=False, decorator=None):
       
   577         """index fetcher method
       
   578 
       
   579         decorator is a callable method or an iterator of callable methods (usually a lambda function)
       
   580         decorator=lambda x: x[:1] (first value is returned)
       
   581         decorator=lambda x: x.lower (lowercased value is returned)
       
   582 
       
   583         decorator is handy when you want to improve index keys but without
       
   584         changing the original field
       
   585 
       
   586         Same check functions can be reused here.
       
   587         """
       
   588         eids = self.indexes[name].get(key, [])
       
   589         if decorator is not None:
       
   590             if not hasattr(decorator, '__iter__'):
       
   591                 decorator = (decorator,)
       
   592             for f in decorator:
       
   593                 eids = f(eids)
       
   594         if unique:
       
   595             assert len(eids) == 1, u'expected a single one value for key "%s" in index "%s". Got %i' % (key, name, len(eids))
       
   596             eids = eids[0]
       
   597         return eids
       
   598 
       
   599     @deprecated("[3.7] index support will disappear")
       
   600     def find(self, type, key, value):
       
   601         for idx in self.types[type]:
       
   602             item = self.items[idx]
       
   603             if item[key] == value:
       
   604                 yield item
       
   605 
       
   606     @deprecated("[3.7] checkpoint() deprecated. use commit() instead")
       
   607     def checkpoint(self):
       
   608         self.commit()
       
   609 
       
   610 
   561 
   611 class RQLObjectStore(ObjectStore):
   562 class RQLObjectStore(ObjectStore):
   612     """ObjectStore that works with an actual RQL repository (production mode)"""
   563     """ObjectStore that works with an actual RQL repository (production mode)"""
   613     _rql = None # bw compat
   564     _rql = None # bw compat
   614 
   565 
   628         else:
   579         else:
   629             session.set_cnxset()
   580             session.set_cnxset()
   630         self.session = session
   581         self.session = session
   631         self._commit = commit or session.commit
   582         self._commit = commit or session.commit
   632 
   583 
   633     @deprecated("[3.7] checkpoint() deprecated. use commit() instead")
       
   634     def checkpoint(self):
       
   635         self.commit()
       
   636 
       
   637     def commit(self):
   584     def commit(self):
   638         txuuid = self._commit()
   585         txuuid = self._commit()
   639         self.session.set_cnxset()
   586         self.session.set_cnxset()
   640         return txuuid
   587         return txuuid
   641 
   588 
   655         if item:
   602         if item:
   656             query += ': ' + ', '.join('X %s %%(%s)s' % (k, k)
   603             query += ': ' + ', '.join('X %s %%(%s)s' % (k, k)
   657                                       for k in item)
   604                                       for k in item)
   658         return self.rql(query, item)[0][0]
   605         return self.rql(query, item)[0][0]
   659 
   606 
   660     def relate(self, eid_from, rtype, eid_to, inlined=False):
   607     def relate(self, eid_from, rtype, eid_to, **kwargs):
   661         eid_from, rtype, eid_to = super(RQLObjectStore, self).relate(
   608         eid_from, rtype, eid_to = super(RQLObjectStore, self).relate(
   662             eid_from, rtype, eid_to)
   609             eid_from, rtype, eid_to, **kwargs)
   663         self.rql('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
   610         self.rql('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
   664                  {'x': int(eid_from), 'y': int(eid_to)})
   611                  {'x': int(eid_from), 'y': int(eid_to)})
   665 
   612 
   666     def find_entities(self, *args, **kwargs):
   613     def find_entities(self, *args, **kwargs):
   667         return self.session.find_entities(*args, **kwargs)
   614         return self.session.find_entities(*args, **kwargs)
   807         self._nb_inserted_entities = 0
   754         self._nb_inserted_entities = 0
   808         self._nb_inserted_types = 0
   755         self._nb_inserted_types = 0
   809         self._nb_inserted_relations = 0
   756         self._nb_inserted_relations = 0
   810         self.rql = session.execute
   757         self.rql = session.execute
   811         # deactivate security
   758         # deactivate security
   812         session.set_read_security(False)
   759         session.read_security = False
   813         session.set_write_security(False)
   760         session.write_security = False
   814 
   761 
   815     def create_entity(self, etype, **kwargs):
   762     def create_entity(self, etype, **kwargs):
   816         for k, v in kwargs.iteritems():
   763         for k, v in kwargs.iteritems():
   817             kwargs[k] = getattr(v, 'eid', v)
   764             kwargs[k] = getattr(v, 'eid', v)
   818         entity, rels = self.metagen.base_etype_dicts(etype)
   765         entity, rels = self.metagen.base_etype_dicts(etype)
   823         self.metagen.init_entity(entity)
   770         self.metagen.init_entity(entity)
   824         entity.cw_edited.update(kwargs, skipsec=False)
   771         entity.cw_edited.update(kwargs, skipsec=False)
   825         session = self.session
   772         session = self.session
   826         self.source.add_entity(session, entity)
   773         self.source.add_entity(session, entity)
   827         self.source.add_info(session, entity, self.source, None, complete=False)
   774         self.source.add_info(session, entity, self.source, None, complete=False)
       
   775         kwargs = dict()
       
   776         if inspect.getargspec(self.add_relation).keywords:
       
   777             kwargs['subjtype'] = entity.cw_etype
   828         for rtype, targeteids in rels.iteritems():
   778         for rtype, targeteids in rels.iteritems():
   829             # targeteids may be a single eid or a list of eids
   779             # targeteids may be a single eid or a list of eids
   830             inlined = self.rschema(rtype).inlined
   780             inlined = self.rschema(rtype).inlined
   831             try:
   781             try:
   832                 for targeteid in targeteids:
   782                 for targeteid in targeteids:
   833                     self.add_relation(session, entity.eid, rtype, targeteid,
   783                     self.add_relation(session, entity.eid, rtype, targeteid,
   834                                       inlined)
   784                                       inlined, **kwargs)
   835             except TypeError:
   785             except TypeError:
   836                 self.add_relation(session, entity.eid, rtype, targeteids,
   786                 self.add_relation(session, entity.eid, rtype, targeteids,
   837                                   inlined)
   787                                   inlined, **kwargs)
   838         self._nb_inserted_entities += 1
   788         self._nb_inserted_entities += 1
   839         return entity
   789         return entity
   840 
   790 
   841     def relate(self, eid_from, rtype, eid_to):
   791     def relate(self, eid_from, rtype, eid_to, **kwargs):
   842         assert not rtype.startswith('reverse_')
   792         assert not rtype.startswith('reverse_')
   843         self.add_relation(self.session, eid_from, rtype, eid_to,
   793         self.add_relation(self.session, eid_from, rtype, eid_to,
   844                           self.rschema(rtype).inlined)
   794                           self.rschema(rtype).inlined)
   845         self._nb_inserted_relations += 1
   795         self._nb_inserted_relations += 1
   846 
   796 
   960 
   910 
   961     def flush(self):
   911     def flush(self):
   962         """Flush data to the database"""
   912         """Flush data to the database"""
   963         self.source.flush()
   913         self.source.flush()
   964 
   914 
   965     def relate(self, subj_eid, rtype, obj_eid, subjtype=None):
   915     def relate(self, subj_eid, rtype, obj_eid, **kwargs):
   966         if subj_eid is None or obj_eid is None:
   916         if subj_eid is None or obj_eid is None:
   967             return
   917             return
   968         # XXX Could subjtype be inferred ?
   918         # XXX Could subjtype be inferred ?
   969         self.source.add_relation(self.session, subj_eid, rtype, obj_eid,
   919         self.source.add_relation(self.session, subj_eid, rtype, obj_eid,
   970                                  self.rschema(rtype).inlined, subjtype)
   920                                  self.rschema(rtype).inlined, **kwargs)
   971 
   921 
   972     def drop_indexes(self, etype):
   922     def drop_indexes(self, etype):
   973         """Drop indexes for a given entity type"""
   923         """Drop indexes for a given entity type"""
   974         if etype not in self.indexes_etypes:
   924         if etype not in self.indexes_etypes:
   975             cu = self.session.cnxset['system']
   925             cu = self.session.cnxset['system']
  1079                                nb_threads=self.nb_threads_statement,
  1029                                nb_threads=self.nb_threads_statement,
  1080                                support_copy_from=self.support_copy_from,
  1030                                support_copy_from=self.support_copy_from,
  1081                                encoding=self.dbencoding)
  1031                                encoding=self.dbencoding)
  1082         except:
  1032         except:
  1083             print 'failed to flush'
  1033             print 'failed to flush'
       
  1034         else:
       
  1035             print 'flush done'
  1084         finally:
  1036         finally:
  1085             _entities_sql.clear()
  1037             _entities_sql.clear()
  1086             _relations_sql.clear()
  1038             _relations_sql.clear()
  1087             _insertdicts.clear()
  1039             _insertdicts.clear()
  1088             _inlined_relations_sql.clear()
  1040             _inlined_relations_sql.clear()
  1089             print 'flush done'
       
  1090 
  1041 
  1091     def add_relation(self, session, subject, rtype, object,
  1042     def add_relation(self, session, subject, rtype, object,
  1092                      inlined=False, subjtype=None):
  1043                      inlined=False, **kwargs):
  1093         if inlined:
  1044         if inlined:
  1094             _sql = self._sql.inlined_relations
  1045             _sql = self._sql.inlined_relations
  1095             data = {'cw_eid': subject, SQL_PREFIX + rtype: object}
  1046             data = {'cw_eid': subject, SQL_PREFIX + rtype: object}
       
  1047             subjtype = kwargs.get('subjtype')
  1096             if subjtype is None:
  1048             if subjtype is None:
  1097                 # Try to infer it
  1049                 # Try to infer it
  1098                 targets = [t.type for t in
  1050                 targets = [t.type for t in
  1099                            self.schema.rschema(rtype).targets()]
  1051                            self.schema.rschema(rtype).targets()]
  1100                 if len(targets) == 1:
  1052                 if len(targets) == 1:
  1101                     subjtype = targets[0]
  1053                     subjtype = targets[0]
  1102                 else:
  1054                 else:
  1103                     raise ValueError('You should give the subject etype for '
  1055                     raise ValueError('You should give the subject etype for '
  1104                                      'inlined relation %s'
  1056                                      'inlined relation %s'
  1105                                      ', as it cannot be inferred' % rtype)
  1057                                      ', as it cannot be inferred: '
       
  1058                                      'this type is given as keyword argument '
       
  1059                                      '``subjtype``'% rtype)
  1106             statement = self.sqlgen.update(SQL_PREFIX + subjtype,
  1060             statement = self.sqlgen.update(SQL_PREFIX + subjtype,
  1107                                            data, ['cw_eid'])
  1061                                            data, ['cw_eid'])
  1108         else:
  1062         else:
  1109             _sql = self._sql.relations
  1063             _sql = self._sql.relations
  1110             data = {'eid_from': subject, 'eid_to': object}
  1064             data = {'eid_from': subject, 'eid_to': object}
  1115             _sql[statement] = [data]
  1069             _sql[statement] = [data]
  1116 
  1070 
  1117     def add_entity(self, session, entity):
  1071     def add_entity(self, session, entity):
  1118         with self._storage_handler(entity, 'added'):
  1072         with self._storage_handler(entity, 'added'):
  1119             attrs = self.preprocess_entity(entity)
  1073             attrs = self.preprocess_entity(entity)
  1120             rtypes = self._inlined_rtypes_cache.get(entity.__regid__, ())
  1074             rtypes = self._inlined_rtypes_cache.get(entity.cw_etype, ())
  1121             if isinstance(rtypes, str):
  1075             if isinstance(rtypes, str):
  1122                 rtypes = (rtypes,)
  1076                 rtypes = (rtypes,)
  1123             for rtype in rtypes:
  1077             for rtype in rtypes:
  1124                 if rtype not in attrs:
  1078                 if rtype not in attrs:
  1125                     attrs[rtype] = None
  1079                     attrs[rtype] = None
  1126             sql = self.sqlgen.insert(SQL_PREFIX + entity.__regid__, attrs)
  1080             sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
  1127             self._sql.eid_insertdicts[entity.eid] = attrs
  1081             self._sql.eid_insertdicts[entity.eid] = attrs
  1128             self._append_to_entities(sql, attrs)
  1082             self._append_to_entities(sql, attrs)
  1129 
  1083 
  1130     def _append_to_entities(self, sql, attrs):
  1084     def _append_to_entities(self, sql, attrs):
  1131         self._sql.entities[sql].append(attrs)
  1085         self._sql.entities[sql].append(attrs)
  1154         # begin by inserting eid/type/source/extid into the entities table
  1108         # begin by inserting eid/type/source/extid into the entities table
  1155         if extid is not None:
  1109         if extid is not None:
  1156             assert isinstance(extid, str)
  1110             assert isinstance(extid, str)
  1157             extid = b64encode(extid)
  1111             extid = b64encode(extid)
  1158         uri = 'system' if source.copy_based_source else source.uri
  1112         uri = 'system' if source.copy_based_source else source.uri
  1159         attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
  1113         attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid,
  1160                  'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()}
  1114                  'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()}
  1161         self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)
  1115         self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)
  1162         # insert core relations: is, is_instance_of and cw_source
  1116         # insert core relations: is, is_instance_of and cw_source
  1163         try:
  1117         try:
  1164             self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
  1118             self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
  1173                                              (entity.eid, eschema_eid(session, eschema)))
  1127                                              (entity.eid, eschema_eid(session, eschema)))
  1174         if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
  1128         if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
  1175             self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
  1129             self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
  1176                                          (entity.eid, source.eid))
  1130                                          (entity.eid, source.eid))
  1177         # now we can update the full text index
  1131         # now we can update the full text index
  1178         if self.do_fti and self.need_fti_indexation(entity.__regid__):
  1132         if self.do_fti and self.need_fti_indexation(entity.cw_etype):
  1179             if complete:
  1133             if complete:
  1180                 entity.complete(entity.e_schema.indexable_attributes())
  1134                 entity.complete(entity.e_schema.indexable_attributes())
  1181             self.index_entity(session, entity=entity)
  1135             self.index_entity(session, entity=entity)