cubicweb/dataimport/massive_store.py
changeset 11707 2c4518fea26f
parent 11705 54b1c8881f1e
child 11773 054a947b5415
--- a/cubicweb/dataimport/massive_store.py	Tue Sep 27 12:01:24 2016 +0200
+++ b/cubicweb/dataimport/massive_store.py	Tue Sep 27 12:23:19 2016 +0200
@@ -20,7 +20,6 @@
 import logging
 from copy import copy
 from collections import defaultdict
-from io import StringIO
 from itertools import chain
 from base64 import b64encode
 
@@ -43,41 +42,20 @@
       the indexes.
 
     - This store can only insert relations that are not inlined (i.e.,
-      which do *not* have inlined=True in their definition in the schema), unless they are
-      specified as entity attributes.
+      which do *not* have inlined=True in their definition in the schema),
+      unless they are specified as entity attributes.
 
     It should be used as follows:
 
        store = MassiveObjectStore(cnx)
-       store.init_rtype_table('Person', 'lives_in', 'Location')
-       ...
-
-       store.prepare_insert_entity('Person', subj_iid_attribute=person_iid, ...)
-       store.prepare_insert_entity('Location', obj_iid_attribute=location_iid, ...)
-       ...
-
-       # subj_iid_attribute and obj_iid_attribute are argument names
-       # chosen by the user (e.g. "cwuri"). These names can be identical.
-       # person_iid and location_iid are unique IDs and depend on the data
-       # (e.g URI).
+       eid_p = store.prepare_insert_entity('Person',
+                                           cwuri='http://dbpedia.org/toto',
+                                           name='Toto')
+       eid_loc = store.prepare_insert_entity('Location',
+                                             cwuri='http://geonames.org/11111',
+                                             name='Somewhere')
+       store.prepare_insert_relation(eid_p, 'lives_in', eid_loc)
        store.flush()
-       store.relate_by_iid(person_iid, 'lives_in', location_iid)
-       # For example:
-       store.prepare_insert_entity('Person',
-                                   cwuri='http://dbpedia.org/toto',
-                                   name='Toto')
-       store.prepare_insert_entity('Location',
-                                   uri='http://geonames.org/11111',
-                                   name='Somewhere')
-       store.flush()
-       store.relate_by_iid('http://dbpedia.org/toto',
-                           'lives_in',
-                           'http://geonames.org/11111')
-       # Finally
-       store.convert_relations('Person', 'lives_in', 'Location',
-                               'subj_iid_attribute', 'obj_iid_attribute')
-       # For the previous example:
-       store.convert_relations('Person', 'lives_in', 'Location', 'cwuri', 'uri')
        ...
        store.commit()
        store.finish()
@@ -85,8 +63,6 @@
     Full-text indexation is not handled, you'll have to reindex the proper entity types by yourself
     if desired.
     """
-    # max size of the iid, used to create the iid_eid conversion table
-    iid_maxsize = 1024
 
     def __init__(self, cnx,
                  on_commit_callback=None, on_rollback_callback=None,
@@ -120,14 +96,6 @@
         self._data_entities = defaultdict(list)
         self._data_relations = defaultdict(list)
         self._initialized = set()
-        # uri handling
-        self._data_uri_relations = defaultdict(list)
-        # etypes for which we have a uri_eid_%(etype)s table
-        self._init_uri_eid = set()
-        # etypes for which we have a uri_eid_%(e)s_idx index
-        self._uri_eid_inserted = set()
-        # set of rtypes for which we have a %(rtype)s_relation_iid_tmp table
-        self._uri_rtypes = set()
 
         if not self.slave_mode:
             # drop constraint and metadata table, they will be recreated when self.finish() is
@@ -144,112 +112,6 @@
             for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1):
                 yield eid
 
-    # URI related things #######################################################
-
-    def init_rtype_table(self, etype_from, rtype, etype_to):
-        """ Build temporary table for standard rtype """
-        # Create an uri_eid table for each etype for a better control of which etype is concerned by
-        # a particular possibly multivalued relation.
-        for etype in (etype_from, etype_to):
-            if etype and etype not in self._init_uri_eid:
-                self._init_uri_eid.add(etype)
-                self.sql('CREATE TABLE IF NOT EXISTS uri_eid_%(e)s'
-                         '(uri character varying(%(size)s), eid integer)'
-                         % {'e': etype.lower(), 'size': self.iid_maxsize})
-        if rtype not in self._uri_rtypes:
-            # Create the temporary table
-            if not self.schema.rschema(rtype).inlined:
-                self.sql('CREATE TABLE IF NOT EXISTS %(r)s_relation_iid_tmp'
-                         '(uri_from character varying(%(s)s), uri_to character varying(%(s)s))'
-                         % {'r': rtype, 's': self.iid_maxsize})
-                self._uri_rtypes.add(rtype)
-            else:
-                self.logger.warning("inlined relation %s: cannot insert it", rtype)
-
-    def relate_by_iid(self, iid_from, rtype, iid_to):
-        """Add new relation based on the internal id (iid)
-        of the entities (not the eid)"""
-        # Push data
-        if isinstance(iid_from, unicode):
-            iid_from = iid_from.encode('utf-8')
-        if isinstance(iid_to, unicode):
-            iid_to = iid_to.encode('utf-8')
-        self._data_uri_relations[rtype].append({'uri_from': iid_from, 'uri_to': iid_to})
-
-    def flush_relations(self):
-        """ Flush the relations data
-        """
-        for rtype, data in self._data_uri_relations.items():
-            if not data:
-                self.logger.info('No data for rtype %s', rtype)
-            buf = StringIO('\n'.join(['%(uri_from)s\t%(uri_to)s' % d for d in data]))
-            if not buf:
-                self.logger.info('Empty Buffer for rtype %s', rtype)
-                continue
-            cursor = self._cnx.cnxset.cu
-            if not self.schema.rschema(rtype).inlined:
-                cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(),
-                                 null='NULL', columns=('uri_from', 'uri_to'))
-            else:
-                self.logger.warning("inlined relation %s: cannot insert it", rtype)
-            buf.close()
-            # Clear data cache
-            self._data_uri_relations[rtype] = []
-
-    def fill_uri_eid_table(self, etype, uri_label):
-        """ Fill the uri_eid table
-        """
-        if etype not in self._uri_eid_inserted:
-            self._uri_eid_inserted.add(etype)
-            self.logger.info('Fill uri_eid for etype %s', etype)
-            self.sql('INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s'
-                     % {'l': uri_label, 'e': etype.lower()})
-            self.sql('CREATE INDEX uri_eid_%(e)s_idx ON uri_eid_%(e)s(uri)'
-                     % {'e': etype.lower()})
-
-    def convert_relations(self, etype_from, rtype, etype_to,
-                          uri_label_from='cwuri', uri_label_to='cwuri'):
-        """ Flush the converted relations
-        """
-        # Always flush relations to be sure
-        self.logger.info('Convert relations %s %s %s', etype_from, rtype, etype_to)
-        self.flush_relations()
-        if uri_label_from:
-            self.fill_uri_eid_table(etype_from, uri_label_from)
-        if uri_label_to:
-            self.fill_uri_eid_table(etype_to, uri_label_to)
-        if self.schema.rschema(rtype).inlined:
-            self.logger.warning("Can't insert inlined relation %s", rtype)
-            return
-        if uri_label_from and uri_label_to:
-            sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, O2.eid
-            FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1, uri_eid_%(et)s as O2
-            WHERE O1.uri=T.uri_from AND O2.uri=T.uri_to AND NOT EXISTS (
-            SELECT 1 FROM %(r)s_relation AS TT WHERE TT.eid_from=O1.eid AND TT.eid_to=O2.eid);
-            '''
-        elif uri_label_to:
-            sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
-            CAST(T.uri_from AS INTEGER), O1.eid
-            FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(et)s as O1
-            WHERE O1.uri=T.uri_to AND NOT EXISTS (
-            SELECT 1 FROM %(r)s_relation AS TT WHERE
-            TT.eid_from=CAST(T.uri_from AS INTEGER) AND TT.eid_to=O1.eid);
-            '''
-        elif uri_label_from:
-            sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, T.uri_to
-            O1.eid, CAST(T.uri_to AS INTEGER)
-            FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1
-            WHERE O1.uri=T.uri_from AND NOT EXISTS (
-            SELECT 1 FROM %(r)s_relation AS TT WHERE
-            TT.eid_from=O1.eid AND TT.eid_to=CAST(T.uri_to AS INTEGER));
-            '''
-        try:
-            self.sql(sql % {'r': rtype.lower(),
-                            'et': etype_to.lower() if etype_to else u'',
-                            'ef': etype_from.lower() if etype_from else u''})
-        except Exception as ex:
-            self.logger.error("Can't insert relation %s: %s", rtype, ex)
-
     # SQL utilities #########################################################
 
     def _drop_all_constraints(self):
@@ -324,7 +186,6 @@
     def flush(self):
         """Flush the data"""
         self.flush_entities()
-        self.flush_internal_relations()
         self.flush_relations()
 
     def commit(self):
@@ -337,12 +198,6 @@
         if self.slave_mode:
             raise RuntimeError('Store cleanup is not allowed in slave mode')
         self.logger.info("Start cleaning")
-        # Cleanup relations tables
-        for etype in self._init_uri_eid:
-            self.sql('DROP TABLE uri_eid_%s' % etype.lower())
-        # Remove relations tables
-        for rtype in self._uri_rtypes:
-            self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
         # Get all the initialized etypes/rtypes
         if self._dbh.table_exists('cwmassive_initialized'):
             cu = self.sql('SELECT retype, type FROM cwmassive_initialized')
@@ -373,9 +228,8 @@
         else:
             raise exc
 
-    def flush_internal_relations(self):
-        """ Flush the relations data
-        """
+    def flush_relations(self):
+        """Flush the relations data."""
         for rtype, data in self._data_relations.items():
             if not data:
                 # There is no data for these etype for this flush round.
@@ -392,8 +246,7 @@
             self._data_relations[rtype] = []
 
     def flush_entities(self):
-        """ Flush the entities data
-        """
+        """Flush the entities data."""
         for etype, data in self._data_entities.items():
             if not data:
                 # There is no data for these etype for this flush round.