dataimport.py
changeset 8832 26cdfc6dd6f8
parent 8807 d9aaad2c52e9
child 8833 39f81e2db2fc
equal deleted inserted replaced
8830:7fd6c52ef878 8832:26cdfc6dd6f8
    70 import sys
    70 import sys
    71 import threading
    71 import threading
    72 import traceback
    72 import traceback
    73 import cPickle
    73 import cPickle
    74 import os.path as osp
    74 import os.path as osp
       
    75 import inspect
    75 from collections import defaultdict
    76 from collections import defaultdict
    76 from contextlib import contextmanager
    77 from contextlib import contextmanager
    77 from copy import copy
    78 from copy import copy
    78 from datetime import date, datetime
    79 from datetime import date, datetime
    79 from time import asctime
    80 from time import asctime
   320 
   321 
   321 def check_doubles_not_none(buckets):
   322 def check_doubles_not_none(buckets):
   322     """Extract the keys that have more than one item in their bucket."""
   323     """Extract the keys that have more than one item in their bucket."""
   323     return [(k, len(v)) for k, v in buckets.items()
   324     return [(k, len(v)) for k, v in buckets.items()
   324             if k is not None and len(v) > 1]
   325             if k is not None and len(v) > 1]
   325 
       
   326 
   326 
   327 # sql generator utility functions #############################################
   327 # sql generator utility functions #############################################
   328 
   328 
   329 
   329 
   330 def _import_statements(sql_connect, statements, nb_threads=3,
   330 def _import_statements(sql_connect, statements, nb_threads=3,
   504         assert isinstance(item, dict), 'item is not a dict but a %s' % type(item)
   504         assert isinstance(item, dict), 'item is not a dict but a %s' % type(item)
   505         data = self.create_entity(etype, **item)
   505         data = self.create_entity(etype, **item)
   506         item['eid'] = data['eid']
   506         item['eid'] = data['eid']
   507         return item
   507         return item
   508 
   508 
   509     def relate(self, eid_from, rtype, eid_to, inlined=False):
   509     def relate(self, eid_from, rtype, eid_to, **kwargs):
   510         """Add new relation"""
   510         """Add new relation"""
   511         relation = eid_from, rtype, eid_to
   511         relation = eid_from, rtype, eid_to
   512         self.relations.add(relation)
   512         self.relations.add(relation)
   513         return relation
   513         return relation
   514 
   514 
   581         if item:
   581         if item:
   582             query += ': ' + ', '.join('X %s %%(%s)s' % (k, k)
   582             query += ': ' + ', '.join('X %s %%(%s)s' % (k, k)
   583                                       for k in item)
   583                                       for k in item)
   584         return self.rql(query, item)[0][0]
   584         return self.rql(query, item)[0][0]
   585 
   585 
   586     def relate(self, eid_from, rtype, eid_to, inlined=False):
   586     def relate(self, eid_from, rtype, eid_to, **kwargs):
   587         eid_from, rtype, eid_to = super(RQLObjectStore, self).relate(
   587         eid_from, rtype, eid_to = super(RQLObjectStore, self).relate(
   588             eid_from, rtype, eid_to)
   588             eid_from, rtype, eid_to, **kwargs)
   589         self.rql('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
   589         self.rql('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
   590                  {'x': int(eid_from), 'y': int(eid_to)})
   590                  {'x': int(eid_from), 'y': int(eid_to)})
   591 
   591 
   592     def find_entities(self, *args, **kwargs):
   592     def find_entities(self, *args, **kwargs):
   593         return self.session.find_entities(*args, **kwargs)
   593         return self.session.find_entities(*args, **kwargs)
   749         self.metagen.init_entity(entity)
   749         self.metagen.init_entity(entity)
   750         entity.cw_edited.update(kwargs, skipsec=False)
   750         entity.cw_edited.update(kwargs, skipsec=False)
   751         session = self.session
   751         session = self.session
   752         self.source.add_entity(session, entity)
   752         self.source.add_entity(session, entity)
   753         self.source.add_info(session, entity, self.source, None, complete=False)
   753         self.source.add_info(session, entity, self.source, None, complete=False)
       
   754         kwargs = dict()
       
   755         if inspect.getargspec(self.add_relation).keywords:
       
   756             kwargs['subjtype'] = entity.__regid__
   754         for rtype, targeteids in rels.iteritems():
   757         for rtype, targeteids in rels.iteritems():
   755             # targeteids may be a single eid or a list of eids
   758             # targeteids may be a single eid or a list of eids
   756             inlined = self.rschema(rtype).inlined
   759             inlined = self.rschema(rtype).inlined
   757             try:
   760             try:
   758                 for targeteid in targeteids:
   761                 for targeteid in targeteids:
   759                     self.add_relation(session, entity.eid, rtype, targeteid,
   762                     self.add_relation(session, entity.eid, rtype, targeteid,
   760                                       inlined)
   763                                       inlined, **kwargs)
   761             except TypeError:
   764             except TypeError:
   762                 self.add_relation(session, entity.eid, rtype, targeteids,
   765                 self.add_relation(session, entity.eid, rtype, targeteids,
   763                                   inlined)
   766                                   inlined, **kwargs)
   764         self._nb_inserted_entities += 1
   767         self._nb_inserted_entities += 1
   765         return entity
   768         return entity
   766 
   769 
   767     def relate(self, eid_from, rtype, eid_to):
   770     def relate(self, eid_from, rtype, eid_to, **kwargs):
   768         assert not rtype.startswith('reverse_')
   771         assert not rtype.startswith('reverse_')
   769         self.add_relation(self.session, eid_from, rtype, eid_to,
   772         self.add_relation(self.session, eid_from, rtype, eid_to,
   770                           self.rschema(rtype).inlined)
   773                           self.rschema(rtype).inlined)
   771         self._nb_inserted_relations += 1
   774         self._nb_inserted_relations += 1
   772 
   775 
   886 
   889 
   887     def flush(self):
   890     def flush(self):
   888         """Flush data to the database"""
   891         """Flush data to the database"""
   889         self.source.flush()
   892         self.source.flush()
   890 
   893 
   891     def relate(self, subj_eid, rtype, obj_eid, subjtype=None):
   894     def relate(self, subj_eid, rtype, obj_eid, **kwargs):
   892         if subj_eid is None or obj_eid is None:
   895         if subj_eid is None or obj_eid is None:
   893             return
   896             return
   894         # XXX Could subjtype be inferred ?
   897         # XXX Could subjtype be inferred ?
   895         self.source.add_relation(self.session, subj_eid, rtype, obj_eid,
   898         self.source.add_relation(self.session, subj_eid, rtype, obj_eid,
   896                                  self.rschema(rtype).inlined, subjtype)
   899                                  self.rschema(rtype).inlined, **kwargs)
   897 
   900 
   898     def drop_indexes(self, etype):
   901     def drop_indexes(self, etype):
   899         """Drop indexes for a given entity type"""
   902         """Drop indexes for a given entity type"""
   900         if etype not in self.indexes_etypes:
   903         if etype not in self.indexes_etypes:
   901             cu = self.session.cnxset['system']
   904             cu = self.session.cnxset['system']
  1010         finally:
  1013         finally:
  1011             _entities_sql.clear()
  1014             _entities_sql.clear()
  1012             _relations_sql.clear()
  1015             _relations_sql.clear()
  1013             _insertdicts.clear()
  1016             _insertdicts.clear()
  1014             _inlined_relations_sql.clear()
  1017             _inlined_relations_sql.clear()
  1015             print 'flush done'
       
  1016 
  1018 
  1017     def add_relation(self, session, subject, rtype, object,
  1019     def add_relation(self, session, subject, rtype, object,
  1018                      inlined=False, subjtype=None):
  1020                      inlined=False, **kwargs):
  1019         if inlined:
  1021         if inlined:
  1020             _sql = self._sql.inlined_relations
  1022             _sql = self._sql.inlined_relations
  1021             data = {'cw_eid': subject, SQL_PREFIX + rtype: object}
  1023             data = {'cw_eid': subject, SQL_PREFIX + rtype: object}
       
  1024             subjtype = kwargs.get('subjtype')
  1022             if subjtype is None:
  1025             if subjtype is None:
  1023                 # Try to infer it
  1026                 # Try to infer it
  1024                 targets = [t.type for t in
  1027                 targets = [t.type for t in
  1025                            self.schema.rschema(rtype).targets()]
  1028                            self.schema.rschema(rtype).targets()]
  1026                 if len(targets) == 1:
  1029                 if len(targets) == 1: