# HG changeset patch
# User Sylvain Thénault
# Date 1475748724 -7200
# Node ID 71df2811b4225e1c4d6f5bd9b8129a5b1b15535e
# Parent  8e1fb9445d75dddd50896e8b06ac70780df48482
[massive store] Store entities in temporary table as well

* when some entity type is encountered by a slave, create a dedicated table for
  insertion of entities of this type by this slave, similarly to what is done
  for relations - this should lower the chance of conflicts in master/slaves
  mode;

* delay dropping of constraints and indexes to the `finish` method, where the
  copy from temporary tables to regular tables is done;

* insertion of metadata is done by scanning temporary tables, which may be way
  shorter than their associated regular tables;

* drop `_drop_metadata_constraints_if_necessary` along with its
  `_constraints_dropped` companion attribute; they are no longer necessary
  since constraints are now dropped once, in `finish`.

A sketch of the resulting master/slaves usage is given after the diff.

Related to #15538303

diff -r 8e1fb9445d75 -r 71df2811b422 cubicweb/dataimport/massive_store.py
--- a/cubicweb/dataimport/massive_store.py    Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/dataimport/massive_store.py    Thu Oct 06 12:12:04 2016 +0200
@@ -28,6 +28,7 @@
 from six.moves import range
 
 from cubicweb.dataimport import stores, pgstore
+from cubicweb.server.schema2sql import eschema_sql_def
 
 
 class MassiveObjectStore(stores.RQLObjectStore):
@@ -89,12 +90,12 @@
         self.schema = cnx.vreg.schema
         self.default_values = get_default_values(self.schema)
         self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
 
+        self._source_dbhelper = cnx.repo.system_source.dbhelper
         self._dbh = PGHelper(cnx)
         self._data_entities = defaultdict(list)
         self._data_relations = defaultdict(list)
-        self._initialized = set()
-        self._constraints_dropped = self.slave_mode
+        self._initialized = {}
 
     def _get_eid_gen(self):
         """ Function getting the next eid. This is done by preselecting
@@ -116,53 +117,10 @@
         if self not in self._initialized:
             self.sql('CREATE TABLE cwmassive_initialized'
                      '(retype text, type varchar(128), uuid varchar(32))')
-            self._initialized.add(self)
-
-    def master_init_etype(self, etype):
-        """Initialize database for insertion of entities of the given etype.
-
-        This is expected to be called once, usually by the master store in master/slaves
-        configuration.
-        """
-        self._drop_metadata_constraints_if_necessary()
-        tablename = 'cw_%s' % etype.lower()
-        self._dbh.drop_constraints(tablename)
-        self._dbh.drop_indexes(tablename)
-        self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
-                 '(retype text, type varchar(128), uuid varchar(32))')
-        self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
-                 {'e': etype, 'uuid': self.uuid})
-
-    def master_insert_etype_metadata(self, etype):
-        """Massive insertion of meta data for a given etype, based on SQL statements.
-
-        In master/slabes configuration, you'll usually want to call it from the master once all
-        slaves have finished (at least slaves won't call it automatically, so that's your
-        reponsability).
-        """
-        # insert standard metadata relations
-        for rtype, eid in self.metagen.base_etype_rels(etype).items():
-            self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
-        # insert cw_source, is and is_instance_of relations (normally handled by the system source)
-        self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
-        eschema = self.schema[etype]
-        self._insert_meta_relation(etype, eschema.eid, 'is_relation')
-        for parent_eschema in chain(eschema.ancestors(), [eschema]):
-            self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
-        # finally insert records into the entities table
-        self.sql("INSERT INTO entities (eid, type, extid) "
-                 "SELECT cw_eid, '%s', extid FROM cw_%s "
-                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
-                 % (etype, etype.lower()))
+            self._initialized[self] = None
 
     # SQL utilities #########################################################
 
-    def _drop_metadata_constraints_if_necessary(self):
-        """Drop constraints and indexes for the metadata tables if necessary."""
-        if not self._constraints_dropped:
-            self._drop_metadata_constraints()
-            self._constraints_dropped = True
-
     def _drop_metadata_constraints(self):
         """Drop constraints and indexes for the metadata tables.
 
@@ -189,9 +147,19 @@
         """Given an entity type, attributes and inlined relations, returns the inserted entity's
         eid.
         """
-        if not self.slave_mode and etype not in self._initialized:
-            self._initialized.add(etype)
-            self.master_init_etype(etype)
+        if etype not in self._initialized:
+            if not self.slave_mode:
+                self.master_init()
+            tablename = 'cw_%s' % etype.lower()
+            tmp_tablename = '%s_%s' % (tablename, self.uuid)
+            self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
+                     {'e': etype, 'uuid': self.uuid})
+            attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype])
+            self.sql('CREATE TABLE %s(%s);' % (tmp_tablename,
+                                               ', '.join('cw_%s %s' % (column, sqltype)
+                                                         for column, sqltype in attr_defs)))
+            self._initialized[etype] = [attr for attr, _ in attr_defs]
+
         if 'eid' not in data:
             # If eid is not given and the eids sequence is set, use the value from the sequence
             eid = self.get_next_eid()
@@ -209,7 +177,7 @@
             if not self.slave_mode:
                 self.master_init()
             assert not self._cnx.vreg.schema.rschema(rtype).inlined
-            self._initialized.add(rtype)
+            self._initialized[rtype] = None
             tablename = '%s_relation' % rtype.lower()
             tmp_tablename = '%s_%s' % (tablename, self.uuid)
             self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)",
@@ -234,10 +202,31 @@
         # Get all the initialized etypes/rtypes
         if self._dbh.table_exists('cwmassive_initialized'):
             cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized')
+            entities = defaultdict(list)
             relations = defaultdict(list)
             for retype, _type, uuid in cu.fetchall():
                 if _type == 'rtype':
                     relations[retype].append(uuid)
+                else:  # _type = 'etype'
+                    entities[retype].append(uuid)
+            # if there are entities to insert, drop constraints on metadata tables once and for all
+            if entities:
+                self._drop_metadata_constraints()
+            # get back entity data from the temporary tables
+            for etype, uuids in entities.items():
+                tablename = 'cw_%s' % etype.lower()
+                attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype])
+                columns = ','.join('cw_%s' % attr for attr, _ in attr_defs)
+                self._dbh.drop_constraints(tablename)
+                self._dbh.drop_indexes(tablename)
+                for uuid in uuids:
+                    tmp_tablename = '%s_%s' % (tablename, uuid)
+                    self.sql('INSERT INTO %(table)s(%(columns)s) '
+                             'SELECT %(columns)s FROM %(tmp_table)s'
+                             % {'table': tablename, 'tmp_table': tmp_tablename,
+                                'columns': columns})
+                    self._insert_etype_metadata(etype, tmp_tablename)
+                    self._tmp_data_cleanup(tmp_tablename, etype, uuid)
             # get back relation data from the temporary tables
             for rtype, uuids in relations.items():
                 tablename = '%s_relation' % rtype.lower()
@@ -251,17 +240,43 @@
                              'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE '
                              'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);'
                              % {'table': tablename, 'tmp_table': tmp_tablename})
-                    # Drop temporary relation table and record from cwmassive_initialized
-                    self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename})
-                    self.sql('DELETE FROM cwmassive_initialized '
-                             'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
-                             {'rtype': retype, 'uuid': uuid})
+                    self._tmp_data_cleanup(tmp_tablename, rtype, uuid)
         # restore all deleted indexes and constraints
         self._dbh.restore_indexes_and_constraints()
         # delete the meta data table
         self.sql('DROP TABLE IF EXISTS cwmassive_initialized')
         self.commit()
 
+    def _insert_etype_metadata(self, etype, tmp_tablename):
+        """Massive insertion of metadata for `etype`, using new entities from `tmp_tablename`.
+        """
+        # insert standard metadata relations
+        for rtype, eid in self.metagen.base_etype_rels(etype).items():
+            self._insert_meta_relation(tmp_tablename, rtype, eid)
+        # insert cw_source, is and is_instance_of relations (normally handled by the system source)
+        self._insert_meta_relation(tmp_tablename, 'cw_source', self.metagen.source.eid)
+        eschema = self.schema[etype]
+        self._insert_meta_relation(tmp_tablename, 'is', eschema.eid)
+        for parent_eschema in chain(eschema.ancestors(), [eschema]):
+            self._insert_meta_relation(tmp_tablename, 'is_instance_of', parent_eschema.eid)
+        # finally insert records into the entities table
+        self.sql("INSERT INTO entities(eid, type) "
+                 "SELECT cw_eid, '%s' FROM %s "
+                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
+                 % (etype, tmp_tablename))
+
+    def _insert_meta_relation(self, tmp_tablename, rtype, eid_to):
+        self.sql("INSERT INTO %s_relation(eid_from, eid_to) SELECT cw_eid, %s FROM %s "
+                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
+                 % (rtype, eid_to, tmp_tablename))
+
+    def _tmp_data_cleanup(self, tmp_tablename, ertype, uuid):
+        """Drop the temporary table and its record from cwmassive_initialized."""
+        self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename})
+        self.sql('DELETE FROM cwmassive_initialized '
+                 'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
+                 {'rtype': ertype, 'uuid': uuid})
+
     # FLUSH #################################################################
 
     def on_commit(self):
@@ -296,38 +311,30 @@
             if not data:
                 # There is no data for these etype for this flush round.
                 continue
-            # XXX It may be interresting to directly infer the columns' names from the schema
-            # XXX For now, the _create_copyfrom_buffer does a "row[column]"
-            # which can lead to a key error.
-            # Thus we should create dictionary with all the keys.
-            columns = set()
-            for d in data:
-                columns.update(d)
-            _base_data = dict.fromkeys(columns)
+            attrs = self._initialized[etype]
+            _base_data = dict.fromkeys(attrs)
             _base_data.update(self.default_values[etype])
             _base_data.update(metagen.base_etype_attrs(etype))
             _data = []
             for d in data:
+                # do this on `d` first: it will not fill in keys that are already present, even
+                # those set to None as provided by `_base_data`
+                metagen.init_entity_attrs(etype, d['eid'], d)
+                # XXX warn/raise if some key is not in attrs?
                 _d = _base_data.copy()
                 _d.update(d)
-                metagen.init_entity_attrs(etype, _d['eid'], _d)
                 _data.append(_d)
-            buf = pgstore._create_copyfrom_buffer(_data, columns)
-            columns = ['cw_%s' % attr for attr in columns]
+            buf = pgstore._create_copyfrom_buffer(_data, attrs)
+            tablename = 'cw_%s' % etype.lower()
+            tmp_tablename = '%s_%s' % (tablename, self.uuid)
+            columns = ['cw_%s' % attr for attr in attrs]
             cursor = self._cnx.cnxset.cu
             try:
-                cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns)
+                cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns)
             except Exception as exc:
                 self.on_rollback(exc, etype, data)
             # Clear data cache
             self._data_entities[etype] = []
-            if not self.slave_mode:
-                self.master_insert_etype_metadata(etype)
-
-    def _insert_meta_relation(self, etype, eid_to, rtype):
-        self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
-                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
-                 % (rtype, eid_to, etype.lower()))
 
 
 def get_default_values(schema):
diff -r 8e1fb9445d75 -r 71df2811b422 cubicweb/dataimport/test/test_massive_store.py
--- a/cubicweb/dataimport/test/test_massive_store.py    Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/dataimport/test/test_massive_store.py    Thu Oct 06 12:12:04 2016 +0200
@@ -119,6 +119,7 @@
             store.prepare_insert_entity('Location', timezone=timezone_eid)
             store.flush()
             store.commit()
+            store.finish()
             eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
                                       'T name TN')[0]
             self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)
@@ -231,16 +232,11 @@
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
-            store.prepare_insert_entity('Location', nm='toto')
-            store.commit()  # commit modification to the database before flush
+            # oversized attribute
+            store.prepare_insert_entity('Location', feature_class='toto')
             store.flush()
             self.assertEqual(next(counter), 1)
 
-    def test_slave_mode_exception(self):
-        with self.admin_access.repo_cnx() as cnx:
-            slave_store = MassiveObjectStore(cnx, slave_mode=True)
-            self.assertRaises(RuntimeError, slave_store.finish)
-
     def test_simple_insert(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx)
@@ -263,10 +259,10 @@
             # Check index
             indexes = all_indexes(cnx)
             self.assertIn('entities_pkey', indexes)
-            self.assertNotIn(build_index_name('owned_by_relation', ['eid_from', 'eid_to'], 'key_'),
-                             indexes)
-            self.assertNotIn(build_index_name('owned_by_relation', ['eid_from'], 'idx_'),
-                             indexes)
+            self.assertIn(build_index_name('owned_by_relation', ['eid_from', 'eid_to'], 'key_'),
+                          indexes)
+            self.assertIn(build_index_name('owned_by_relation', ['eid_from'], 'idx_'),
+                          indexes)
 
             # Cleanup -> index
             store.finish()
diff -r 8e1fb9445d75 -r 71df2811b422 cubicweb/server/schema2sql.py
--- a/cubicweb/server/schema2sql.py    Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/server/schema2sql.py    Thu Oct 06 12:12:04 2016 +0200
@@ -102,6 +102,28 @@
         yield attrs, unique_index_name(eschema, attrs)
 
 
+def eschema_sql_def(dbhelper, eschema, skip_relations=(), prefix=''):
+    """Return a list of (column name, sql type def) pairs for the given entity schema.
+
+    No constraints nor indexes are considered - this function is mainly for massive import purposes.
+    """
+    attrs = [attrdef for attrdef in eschema.attribute_definitions()
+             if attrdef[0].type not in skip_relations]
+    attrs += [(rschema, None)
+              for rschema in eschema.subject_relations()
+              if not rschema.final and rschema.inlined]
+    result = []
+    for rschema, attrschema in attrs:
+        # final attributes come with their attribute schema, inlined relations with None
+        if attrschema is not None:
+            # creating=False will avoid NOT NULL / REFERENCES constraints
+            sqltype = aschema2sql(dbhelper, eschema, rschema, attrschema, creating=False)
+        else:  # inlined relation
+            sqltype = 'integer'
+        result.append(('%s%s' % (prefix, rschema.type), sqltype))
+    return result
+
+
 def eschema2sql(dbhelper, eschema, skip_relations=(), prefix=''):
     """Yield SQL statements to initialize database from an entity schema."""
     table = prefix + eschema.type
diff -r 8e1fb9445d75 -r 71df2811b422 cubicweb/server/test/unittest_schema2sql.py
--- a/cubicweb/server/test/unittest_schema2sql.py    Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/server/test/unittest_schema2sql.py    Thu Oct 06 12:12:04 2016 +0200
@@ -245,6 +245,32 @@
         output = list(schema2sql.schema2sql(dbhelper, schema, skip_relations=('works_for',)))
         self.assertEqual(output, EXPECTED_DATA_NO_DROP)
 
+    def test_eschema_sql_def_attributes(self):
+        dbhelper = get_db_helper('postgres')
+        attr_defs = schema2sql.eschema_sql_def(dbhelper, schema['Person'])
+        self.assertEqual(attr_defs,
+                         [('nom', 'varchar(64)'),
+                          ('prenom', 'varchar(64)'),
+                          ('sexe', "varchar(1) DEFAULT 'M'"),
+                          ('promo', 'varchar(22)'),
+                          ('titre', 'varchar(128)'),
+                          ('adel', 'varchar(128)'),
+                          ('ass', 'varchar(128)'),
+                          ('web', 'varchar(128)'),
+                          ('tel', 'integer'),
+                          ('fax', 'integer'),
+                          ('datenaiss', 'date'),
+                          ('test', 'boolean'),
+                          ('salary', 'float')])
+
+    def test_eschema_sql_def_inlined_rel(self):
+        dbhelper = get_db_helper('postgres')
+        attr_defs = schema2sql.eschema_sql_def(dbhelper, schema['Affaire'])
+        self.assertEqual(attr_defs,
+                         [('sujet', 'varchar(128)'),
+                          ('ref', 'varchar(12)'),
+                          ('inline_rel', 'integer')])
+
 
 if __name__ == '__main__':
     unittest_main()
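
--

For reference (not part of the changeset itself), a minimal sketch of the
master/slaves flow this change enables. It relies only on the store API
exercised by the tests above (`prepare_insert_entity`, `flush`, `commit`,
`finish` and the `slave_mode` keyword); the connection handling and the
`rows` iterable are placeholders:

    from cubicweb.dataimport.massive_store import MassiveObjectStore

    def slave_import(cnx, rows):
        # each slave now COPYies entities into its own uuid-suffixed table
        # (cw_<etype>_<uuid>), as was already done for relations, instead of
        # writing directly to the regular cw_<etype> table
        store = MassiveObjectStore(cnx, slave_mode=True)
        for attributes in rows:
            store.prepare_insert_entity('Location', **attributes)
        store.flush()   # COPY the buffered rows into the temporary tables
        store.commit()

    def master_finish(cnx):
        # run once, after all slaves are done: constraints and indexes are
        # dropped here, each temporary table is copied into its regular
        # table, metadata is inserted by scanning the (short) temporary
        # tables, then indexes and constraints are restored
        MassiveObjectStore(cnx).finish()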
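Similarly, a quick sketch of how the new `eschema_sql_def` helper feeds the
temporary table DDL. Here `schema` stands for a loaded schema instance and
`uuid` for a slave's identifier (both placeholders); `get_db_helper` is the
logilab.database helper already used by the tests:

    from logilab.database import get_db_helper
    from cubicweb.server.schema2sql import eschema_sql_def

    dbhelper = get_db_helper('postgres')
    # plain (column, sql type) pairs, without NOT NULL / REFERENCES / indexes,
    # e.g. [('nom', 'varchar(64)'), ..., ('salary', 'float')] for 'Person'
    attr_defs = eschema_sql_def(dbhelper, schema['Person'])
    # this mirrors the statement prepare_insert_entity now runs to create a
    # slave's temporary table
    ddl = 'CREATE TABLE cw_person_%s(%s);' % (
        uuid, ', '.join('cw_%s %s' % (column, sqltype)
                        for column, sqltype in attr_defs))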