[massive store] Store entities in temporary table as well
author	Sylvain Thénault <sylvain.thenault@logilab.fr>
Thu, 06 Oct 2016 12:12:04 +0200
changeset 11789 71df2811b422
parent 11788 8e1fb9445d75
child 11790 04607da552ac
[massive store] Store entities in temporary table as well

* once some entity type is encountered by a slave, create a dedicated table for
  insertion of entities of this type by this slave, similarly to what is done
  for relations - this should lower the chances of conflicts in master/slaves
  mode (see the sketch below);

* delay the drop of constraints and indexes to the `finish` method, where the
  copy from temporary tables to the regular tables is done;

* insertion of metadata is done by scanning the temporary tables, which may be
  way shorter than their associated regular tables;

* drop drop_metadata_constraints along with its _constraints_dropped friend
  attribute; they are no longer necessary since this is done once in `finish`.

Related to #15538303
cubicweb/dataimport/massive_store.py
cubicweb/dataimport/test/test_massive_store.py
cubicweb/server/schema2sql.py
cubicweb/server/test/unittest_schema2sql.py
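
For illustration, the per-slave temporary table scheme introduced here boils
down to the following sketch (the `sql` callable and the `init_etype_table` and
`copy_back` helpers are hypothetical stand-ins, not part of the actual
MassiveObjectStore API):

    from uuid import uuid4

    def init_etype_table(sql, etype, attr_defs):
        """Create a slave-private table cw_<etype>_<uuid> and record it in
        cwmassive_initialized, so the master can find it back in finish().

        `sql` is assumed to execute a SQL statement with optional parameters;
        `attr_defs` is a list of (column, sqltype) pairs as returned by
        eschema_sql_def().
        """
        uuid = uuid4().hex
        tmp_table = 'cw_%s_%s' % (etype.lower(), uuid)
        sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
            {'e': etype, 'uuid': uuid})
        sql('CREATE TABLE %s(%s)' % (tmp_table,
                                     ', '.join('cw_%s %s' % (col, sqltype)
                                               for col, sqltype in attr_defs)))
        return tmp_table

    def copy_back(sql, etype, uuids, columns):
        """Copy each slave's rows into the regular cw_<etype> table, then drop
        the temporary tables (what finish() does below, constraints aside)."""
        table = 'cw_%s' % etype.lower()
        for uuid in uuids:
            tmp_table = '%s_%s' % (table, uuid)
            sql('INSERT INTO %(t)s(%(c)s) SELECT %(c)s FROM %(tmp)s'
                % {'t': table, 'c': columns, 'tmp': tmp_table})
            sql('DROP TABLE %s' % tmp_table)
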
--- a/cubicweb/dataimport/massive_store.py	Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/dataimport/massive_store.py	Thu Oct 06 12:12:04 2016 +0200
@@ -28,6 +28,7 @@
 from six.moves import range
 
 from cubicweb.dataimport import stores, pgstore
+from cubicweb.server.schema2sql import eschema_sql_def
 
 
 class MassiveObjectStore(stores.RQLObjectStore):
@@ -89,12 +90,12 @@
         self.schema = cnx.vreg.schema
         self.default_values = get_default_values(self.schema)
         self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
+        self._source_dbhelper = cnx.repo.system_source.dbhelper
         self._dbh = PGHelper(cnx)
 
         self._data_entities = defaultdict(list)
         self._data_relations = defaultdict(list)
-        self._initialized = set()
-        self._constraints_dropped = self.slave_mode
+        self._initialized = {}
 
     def _get_eid_gen(self):
         """ Function getting the next eid. This is done by preselecting
@@ -116,53 +117,10 @@
         if self not in self._initialized:
             self.sql('CREATE TABLE cwmassive_initialized'
                      '(retype text, type varchar(128), uuid varchar(32))')
-            self._initialized.append(self)
-
-    def master_init_etype(self, etype):
-        """Initialize database for insertion of entities of the given etype.
-
-        This is expected to be called once, usually by the master store in master/slaves
-        configuration.
-        """
-        self._drop_metadata_constraints_if_necessary()
-        tablename = 'cw_%s' % etype.lower()
-        self._dbh.drop_constraints(tablename)
-        self._dbh.drop_indexes(tablename)
-        self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
-                 '(retype text, type varchar(128), uuid varchar(32))')
-        self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
-                 {'e': etype, 'uuid': self.uuid})
-
-    def master_insert_etype_metadata(self, etype):
-        """Massive insertion of meta data for a given etype, based on SQL statements.
-
-        In master/slabes configuration, you'll usually want to call it from the master once all
-        slaves have finished (at least slaves won't call it automatically, so that's your
-        reponsability).
-        """
-        # insert standard metadata relations
-        for rtype, eid in self.metagen.base_etype_rels(etype).items():
-            self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
-        # insert cw_source, is and is_instance_of relations (normally handled by the system source)
-        self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
-        eschema = self.schema[etype]
-        self._insert_meta_relation(etype, eschema.eid, 'is_relation')
-        for parent_eschema in chain(eschema.ancestors(), [eschema]):
-            self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
-        # finally insert records into the entities table
-        self.sql("INSERT INTO entities (eid, type, extid) "
-                 "SELECT cw_eid, '%s', extid FROM cw_%s "
-                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
-                 % (etype, etype.lower()))
+            self._initialized[self] = None
 
     # SQL utilities #########################################################
 
-    def _drop_metadata_constraints_if_necessary(self):
-        """Drop constraints and indexes for the metadata tables if necessary."""
-        if not self._constraints_dropped:
-            self._drop_metadata_constraints()
-            self._constraints_dropped = True
-
     def _drop_metadata_constraints(self):
         """Drop constraints and indexes for the metadata tables.
 
@@ -189,9 +147,19 @@
         """Given an entity type, attributes and inlined relations, returns the inserted entity's
         eid.
         """
-        if not self.slave_mode and etype not in self._initialized:
-            self._initialized.add(etype)
-            self.master_init_etype(etype)
+        if etype not in self._initialized:
+            if not self.slave_mode:
+                self.master_init()
+            tablename = 'cw_%s' % etype.lower()
+            tmp_tablename = '%s_%s' % (tablename, self.uuid)
+            self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
+                     {'e': etype, 'uuid': self.uuid})
+            attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype])
+            self.sql('CREATE TABLE %s(%s);' % (tmp_tablename,
+                                               ', '.join('cw_%s %s' % (column, sqltype)
+                                                         for column, sqltype in attr_defs)))
+            self._initialized[etype] = [attr for attr, _ in attr_defs]
+
         if 'eid' not in data:
             # If eid is not given and the eids sequence is set, use the value from the sequence
             eid = self.get_next_eid()
@@ -209,7 +177,7 @@
             if not self.slave_mode:
                 self.master_init()
             assert not self._cnx.vreg.schema.rschema(rtype).inlined
-            self._initialized.add(rtype)
+            self._initialized[rtype] = None
             tablename = '%s_relation' % rtype.lower()
             tmp_tablename = '%s_%s' % (tablename, self.uuid)
             self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)",
@@ -234,10 +202,31 @@
         # Get all the initialized etypes/rtypes
         if self._dbh.table_exists('cwmassive_initialized'):
             cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized')
+            entities = defaultdict(list)
             relations = defaultdict(list)
             for retype, _type, uuid in cu.fetchall():
                 if _type == 'rtype':
                     relations[retype].append(uuid)
+                else:  # _type == 'etype'
+                    entities[retype].append(uuid)
+            # if there are entities to insert, drop constraints on metadata tables once and for all
+            if entities:
+                self._drop_metadata_constraints()
+            # get back entity data from the temporary tables
+            for etype, uuids in entities.items():
+                tablename = 'cw_%s' % etype.lower()
+                attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype])
+                columns = ','.join('cw_%s' % attr for attr, _ in attr_defs)
+                self._dbh.drop_constraints(tablename)
+                self._dbh.drop_indexes(tablename)
+                for uuid in uuids:
+                    tmp_tablename = '%s_%s' % (tablename, uuid)
+                    self.sql('INSERT INTO %(table)s(%(columns)s) '
+                             'SELECT %(columns)s FROM %(tmp_table)s'
+                             % {'table': tablename, 'tmp_table': tmp_tablename,
+                                'columns': columns})
+                    self._insert_etype_metadata(etype, tmp_tablename)
+                    self._tmp_data_cleanup(tmp_tablename, etype, uuid)
             # get back relation data from the temporary tables
             for rtype, uuids in relations.items():
                 tablename = '%s_relation' % rtype.lower()
@@ -251,17 +240,43 @@
                              'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE '
                              'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);'
                              % {'table': tablename, 'tmp_table': tmp_tablename})
-                    # Drop temporary relation table and record from cwmassive_initialized
-                    self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename})
-                    self.sql('DELETE FROM cwmassive_initialized '
-                             'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
-                             {'rtype': retype, 'uuid': uuid})
+                    self._tmp_data_cleanup(tmp_tablename, rtype, uuid)
         # restore all deleted indexes and constraints
         self._dbh.restore_indexes_and_constraints()
         # delete the meta data table
         self.sql('DROP TABLE IF EXISTS cwmassive_initialized')
         self.commit()
 
+    def _insert_etype_metadata(self, etype, tmp_tablename):
+        """Massive insertion of meta data for `etype`, with new entities in `tmp_tablename`.
+        """
+        # insert standard metadata relations
+        for rtype, eid in self.metagen.base_etype_rels(etype).items():
+            self._insert_meta_relation(tmp_tablename, rtype, eid)
+        # insert cw_source, is and is_instance_of relations (normally handled by the system source)
+        self._insert_meta_relation(tmp_tablename, 'cw_source', self.metagen.source.eid)
+        eschema = self.schema[etype]
+        self._insert_meta_relation(tmp_tablename, 'is', eschema.eid)
+        for parent_eschema in chain(eschema.ancestors(), [eschema]):
+            self._insert_meta_relation(tmp_tablename, 'is_instance_of', parent_eschema.eid)
+        # finally insert records into the entities table
+        self.sql("INSERT INTO entities(eid, type) "
+                 "SELECT cw_eid, '%s' FROM %s "
+                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
+                 % (etype, tmp_tablename))
+
+    def _insert_meta_relation(self, tmp_tablename, rtype, eid_to):
+        self.sql("INSERT INTO %s_relation(eid_from, eid_to) SELECT cw_eid, %s FROM %s "
+                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
+                 % (rtype, eid_to, tmp_tablename))
+
+    def _tmp_data_cleanup(self, tmp_tablename, ertype, uuid):
+        """Drop temporary relation table and record from cwmassive_initialized."""
+        self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename})
+        self.sql('DELETE FROM cwmassive_initialized '
+                 'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
+                 {'rtype': ertype, 'uuid': uuid})
+
     # FLUSH #################################################################
 
     def on_commit(self):
@@ -296,38 +311,30 @@
             if not data:
                 # There is no data for these etype for this flush round.
                 continue
-            # XXX It may be interresting to directly infer the columns' names from the schema
-            # XXX For now, the _create_copyfrom_buffer does a "row[column]"
-            # which can lead to a key error.
-            # Thus we should create dictionary with all the keys.
-            columns = set()
-            for d in data:
-                columns.update(d)
-            _base_data = dict.fromkeys(columns)
+            attrs = self._initialized[etype]
+            _base_data = dict.fromkeys(attrs)
             _base_data.update(self.default_values[etype])
             _base_data.update(metagen.base_etype_attrs(etype))
             _data = []
             for d in data:
+                # call this on `d` first: the None values `_base_data` provides would
+                # otherwise prevent the generator from filling those keys
+                metagen.init_entity_attrs(etype, d['eid'], d)
+                # XXX warn/raise if some key is not in attrs?
                 _d = _base_data.copy()
                 _d.update(d)
-                metagen.init_entity_attrs(etype, _d['eid'], _d)
                 _data.append(_d)
-            buf = pgstore._create_copyfrom_buffer(_data, columns)
-            columns = ['cw_%s' % attr for attr in columns]
+            buf = pgstore._create_copyfrom_buffer(_data, attrs)
+            tablename = 'cw_%s' % etype.lower()
+            tmp_tablename = '%s_%s' % (tablename, self.uuid)
+            columns = ['cw_%s' % attr for attr in attrs]
             cursor = self._cnx.cnxset.cu
             try:
-                cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns)
+                cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns)
             except Exception as exc:
                 self.on_rollback(exc, etype, data)
             # Clear data cache
             self._data_entities[etype] = []
-            if not self.slave_mode:
-                self.master_insert_etype_metadata(etype)
-
-    def _insert_meta_relation(self, etype, eid_to, rtype):
-        self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
-                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
-                 % (rtype, eid_to, etype.lower()))
 
 
 def get_default_values(schema):
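
With this change, a master/slaves import round looks roughly as follows (a
sketch based on the store methods exercised by the tests below; the `Location`
entity type and its `nm` attribute come from the test schema):

    from cubicweb.dataimport.massive_store import MassiveObjectStore

    def slave_import(cnx, rows):
        # each slave fills its own cw_location_<uuid> temporary table
        store = MassiveObjectStore(cnx, slave_mode=True)
        for row in rows:
            store.prepare_insert_entity('Location', nm=row['nm'])
        store.flush()   # COPY FROM into this slave's temporary table
        store.commit()

    def master_finish(cnx):
        # once all slaves are done, the master copies the temporary tables back
        # into the regular ones, inserts metadata and restores
        # indexes/constraints (finish() commits by itself)
        MassiveObjectStore(cnx).finish()
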
--- a/cubicweb/dataimport/test/test_massive_store.py	Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/dataimport/test/test_massive_store.py	Thu Oct 06 12:12:04 2016 +0200
@@ -119,6 +119,7 @@
             store.prepare_insert_entity('Location', timezone=timezone_eid)
             store.flush()
             store.commit()
+            store.finish()
             eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
                                       'T name TN')[0]
             self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)
@@ -231,16 +232,11 @@
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
-            store.prepare_insert_entity('Location', nm='toto')
-            store.commit()  # commit modification to the database before flush
+            # oversized attribute
+            store.prepare_insert_entity('Location', feature_class='toto')
             store.flush()
         self.assertEqual(next(counter), 1)
 
-    def test_slave_mode_exception(self):
-        with self.admin_access.repo_cnx() as cnx:
-            slave_store = MassiveObjectStore(cnx, slave_mode=True)
-            self.assertRaises(RuntimeError, slave_store.finish)
-
     def test_simple_insert(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx)
@@ -263,10 +259,10 @@
             # Check index
             indexes = all_indexes(cnx)
             self.assertIn('entities_pkey', indexes)
-            self.assertNotIn(build_index_name('owned_by_relation', ['eid_from', 'eid_to'], 'key_'),
-                             indexes)
-            self.assertNotIn(build_index_name('owned_by_relation', ['eid_from'], 'idx_'),
-                             indexes)
+            self.assertIn(build_index_name('owned_by_relation', ['eid_from', 'eid_to'], 'key_'),
+                          indexes)
+            self.assertIn(build_index_name('owned_by_relation', ['eid_from'], 'idx_'),
+                          indexes)
 
             # Cleanup -> index
             store.finish()
--- a/cubicweb/server/schema2sql.py	Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/server/schema2sql.py	Thu Oct 06 12:12:04 2016 +0200
@@ -102,6 +102,28 @@
         yield attrs, unique_index_name(eschema, attrs)
 
 
+def eschema_sql_def(dbhelper, eschema, skip_relations=(), prefix=''):
+    """Return a list of (column names, sql type def) for the given entity schema.
+
+    Constraints and indexes are not considered - this function is mainly intended for massive import.
+    """
+    attrs = [attrdef for attrdef in eschema.attribute_definitions()
+             if attrdef[0].type not in skip_relations]
+    attrs += [(rschema, None)
+              for rschema in eschema.subject_relations()
+              if not rschema.final and rschema.inlined]
+    result = []
+    for rschema, attrschema in attrs:
+        if attrschema is not None:
+            # creating = False will avoid NOT NULL / REFERENCES constraints
+            sqltype = aschema2sql(dbhelper, eschema, rschema, attrschema, creating=False)
+        else:  # inline relation
+            sqltype = 'integer'
+        result.append(('%s%s' % (prefix, rschema.type), sqltype))
+    return result
+
+
 def eschema2sql(dbhelper, eschema, skip_relations=(), prefix=''):
     """Yield SQL statements to initialize database from an entity schema."""
     table = prefix + eschema.type
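
For instance, the store's temporary table creation relies on this function as
in the following sketch (`schema` is assumed to be the application schema, as
in the tests below):

    from logilab.database import get_db_helper
    from cubicweb.server.schema2sql import eschema_sql_def

    dbhelper = get_db_helper('postgres')
    attr_defs = eschema_sql_def(dbhelper, schema['Person'])
    # build the column definitions of a slave's temporary table
    statement = 'CREATE TABLE cw_person_1234(%s)' % ', '.join(
        'cw_%s %s' % (column, sqltype) for column, sqltype in attr_defs)
    # -> CREATE TABLE cw_person_1234(cw_nom varchar(64), cw_prenom varchar(64), ...)
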
--- a/cubicweb/server/test/unittest_schema2sql.py	Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/server/test/unittest_schema2sql.py	Thu Oct 06 12:12:04 2016 +0200
@@ -245,6 +245,32 @@
         output = list(schema2sql.schema2sql(dbhelper, schema, skip_relations=('works_for',)))
         self.assertEqual(output, EXPECTED_DATA_NO_DROP)
 
+    def test_eschema_sql_def_attributes(self):
+        dbhelper = get_db_helper('postgres')
+        attr_defs = schema2sql.eschema_sql_def(dbhelper, schema['Person'])
+        self.assertEqual(attr_defs,
+                         [('nom', 'varchar(64)'),
+                          ('prenom', 'varchar(64)'),
+                          ('sexe', "varchar(1) DEFAULT 'M'"),
+                          ('promo', 'varchar(22)'),
+                          ('titre', 'varchar(128)'),
+                          ('adel', 'varchar(128)'),
+                          ('ass', 'varchar(128)'),
+                          ('web', 'varchar(128)'),
+                          ('tel', 'integer'),
+                          ('fax', 'integer'),
+                          ('datenaiss', 'date'),
+                          ('test', 'boolean'),
+                          ('salary', 'float')])
+
+    def test_eschema_sql_def_inlined_rel(self):
+        dbhelper = get_db_helper('postgres')
+        attr_defs = schema2sql.eschema_sql_def(dbhelper, schema['Affaire'])
+        self.assertEqual(attr_defs,
+                         [('sujet', 'varchar(128)'),
+                          ('ref', 'varchar(12)'),
+                          ('inline_rel', 'integer')])
+
 
 if __name__ == '__main__':
     unittest_main()