[massive store] Store entities in temporary tables as well
* when some entity type is encountered by a slave, create a dedicated table for
insertion of entities of this type by this slave, similarly to what is done
for relations - this should lower the chances of conflicts in master/slaves
mode (see the sketch below);
* delay the drop of constraints and indexes to the `finish` method, where the
copy from temporary tables to regular tables is done;
* insertion of metadata is done by scanning the temporary tables, which may be
much shorter than their associated regular tables;
* drop `_drop_metadata_constraints_if_necessary` along with its
`_constraints_dropped` companion attribute; they are no longer necessary since
constraints are now dropped once in `finish`.
Related to #15538303
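
For reference, a minimal sketch of the per-slave flow this changeset sets up
(the `sql` helper and the `uuid`, `attr_defs` and `registered` names are
illustrative, not the exact store API):

    # illustrative sketch only, not the actual MassiveObjectStore code
    def slave_insert(sql, uuid, etype, attr_defs):
        # each slave gets its own table, e.g. cw_location_<uuid>, so
        # concurrent stores never write to the same target table
        tmp_table = 'cw_%s_%s' % (etype.lower(), uuid)
        sql('CREATE TABLE %s(%s)' % (
            tmp_table,
            ', '.join('cw_%s %s' % (col, sqltype)
                      for col, sqltype in attr_defs)))
        sql("INSERT INTO cwmassive_initialized VALUES ('%s', 'etype', '%s')"
            % (etype, uuid))
        # ... rows are then bulk-loaded into tmp_table with COPY FROM ...

    def master_finish(sql, registered):
        # constraints and indexes on the regular tables are dropped once
        # here, data is copied back, then everything is restored (the real
        # code enumerates the columns explicitly instead of SELECT *)
        for etype, uuid in registered:
            tmp_table = 'cw_%s_%s' % (etype.lower(), uuid)
            sql('INSERT INTO cw_%s SELECT * FROM %s'
                % (etype.lower(), tmp_table))
            sql('DROP TABLE %s' % tmp_table)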
--- a/cubicweb/dataimport/massive_store.py Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/dataimport/massive_store.py Thu Oct 06 12:12:04 2016 +0200
@@ -28,6 +28,7 @@
from six.moves import range
from cubicweb.dataimport import stores, pgstore
+from cubicweb.server.schema2sql import eschema_sql_def
class MassiveObjectStore(stores.RQLObjectStore):
@@ -89,12 +90,12 @@
self.schema = cnx.vreg.schema
self.default_values = get_default_values(self.schema)
self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
+ self._source_dbhelper = cnx.repo.system_source.dbhelper
self._dbh = PGHelper(cnx)
self._data_entities = defaultdict(list)
self._data_relations = defaultdict(list)
- self._initialized = set()
- self._constraints_dropped = self.slave_mode
+ self._initialized = {}
def _get_eid_gen(self):
""" Function getting the next eid. This is done by preselecting
@@ -116,53 +117,10 @@
if self not in self._initialized:
self.sql('CREATE TABLE cwmassive_initialized'
'(retype text, type varchar(128), uuid varchar(32))')
- self._initialized.append(self)
-
- def master_init_etype(self, etype):
- """Initialize database for insertion of entities of the given etype.
-
- This is expected to be called once, usually by the master store in master/slaves
- configuration.
- """
- self._drop_metadata_constraints_if_necessary()
- tablename = 'cw_%s' % etype.lower()
- self._dbh.drop_constraints(tablename)
- self._dbh.drop_indexes(tablename)
- self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
- '(retype text, type varchar(128), uuid varchar(32))')
- self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
- {'e': etype, 'uuid': self.uuid})
-
- def master_insert_etype_metadata(self, etype):
- """Massive insertion of meta data for a given etype, based on SQL statements.
-
- In master/slabes configuration, you'll usually want to call it from the master once all
- slaves have finished (at least slaves won't call it automatically, so that's your
- reponsability).
- """
- # insert standard metadata relations
- for rtype, eid in self.metagen.base_etype_rels(etype).items():
- self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
- # insert cw_source, is and is_instance_of relations (normally handled by the system source)
- self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
- eschema = self.schema[etype]
- self._insert_meta_relation(etype, eschema.eid, 'is_relation')
- for parent_eschema in chain(eschema.ancestors(), [eschema]):
- self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
- # finally insert records into the entities table
- self.sql("INSERT INTO entities (eid, type, extid) "
- "SELECT cw_eid, '%s', extid FROM cw_%s "
- "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
- % (etype, etype.lower()))
+ self._initialized[self] = None
# SQL utilities #########################################################
- def _drop_metadata_constraints_if_necessary(self):
- """Drop constraints and indexes for the metadata tables if necessary."""
- if not self._constraints_dropped:
- self._drop_metadata_constraints()
- self._constraints_dropped = True
-
def _drop_metadata_constraints(self):
"""Drop constraints and indexes for the metadata tables.
@@ -189,9 +147,19 @@
"""Given an entity type, attributes and inlined relations, returns the inserted entity's
eid.
"""
- if not self.slave_mode and etype not in self._initialized:
- self._initialized.add(etype)
- self.master_init_etype(etype)
+ if etype not in self._initialized:
+ if not self.slave_mode:
+ self.master_init()
+ tablename = 'cw_%s' % etype.lower()
+ tmp_tablename = '%s_%s' % (tablename, self.uuid)
+ self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
+ {'e': etype, 'uuid': self.uuid})
+ attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype])
+ self.sql('CREATE TABLE %s(%s);' % (tmp_tablename,
+ ', '.join('cw_%s %s' % (column, sqltype)
+ for column, sqltype in attr_defs)))
+ self._initialized[etype] = [attr for attr, _ in attr_defs]
+
if 'eid' not in data:
# If eid is not given and the eids sequence is set, use the value from the sequence
eid = self.get_next_eid()
@@ -209,7 +177,7 @@
if not self.slave_mode:
self.master_init()
assert not self._cnx.vreg.schema.rschema(rtype).inlined
- self._initialized.add(rtype)
+ self._initialized[rtype] = None
tablename = '%s_relation' % rtype.lower()
tmp_tablename = '%s_%s' % (tablename, self.uuid)
self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)",
@@ -234,10 +202,31 @@
# Get all the initialized etypes/rtypes
if self._dbh.table_exists('cwmassive_initialized'):
cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized')
+ entities = defaultdict(list)
relations = defaultdict(list)
for retype, _type, uuid in cu.fetchall():
if _type == 'rtype':
relations[retype].append(uuid)
+ else: # _type == 'etype'
+ entities[retype].append(uuid)
+ # if there are entities to insert, drop constraints on the metadata tables once and for all
+ if entities:
+ self._drop_metadata_constraints()
+ # get back entity data from the temporary tables
+ for etype, uuids in entities.items():
+ tablename = 'cw_%s' % etype.lower()
+ attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype])
+ columns = ','.join('cw_%s' % attr for attr, _ in attr_defs)
+ self._dbh.drop_constraints(tablename)
+ self._dbh.drop_indexes(tablename)
+ for uuid in uuids:
+ tmp_tablename = '%s_%s' % (tablename, uuid)
+ self.sql('INSERT INTO %(table)s(%(columns)s) '
+ 'SELECT %(columns)s FROM %(tmp_table)s'
+ % {'table': tablename, 'tmp_table': tmp_tablename,
+ 'columns': columns})
+ self._insert_etype_metadata(etype, tmp_tablename)
+ self._tmp_data_cleanup(tmp_tablename, etype, uuid)
# get back relation data from the temporary tables
for rtype, uuids in relations.items():
tablename = '%s_relation' % rtype.lower()
@@ -251,17 +240,43 @@
'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE '
'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);'
% {'table': tablename, 'tmp_table': tmp_tablename})
- # Drop temporary relation table and record from cwmassive_initialized
- self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename})
- self.sql('DELETE FROM cwmassive_initialized '
- 'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
- {'rtype': retype, 'uuid': uuid})
+ self._tmp_data_cleanup(tmp_tablename, rtype, uuid)
# restore all deleted indexes and constraints
self._dbh.restore_indexes_and_constraints()
# delete the meta data table
self.sql('DROP TABLE IF EXISTS cwmassive_initialized')
self.commit()
+ def _insert_etype_metadata(self, etype, tmp_tablename):
+ """Massive insertion of meta data for `etype`, with new entities in `tmp_tablename`.
+ """
+ # insert standard metadata relations
+ for rtype, eid in self.metagen.base_etype_rels(etype).items():
+ self._insert_meta_relation(tmp_tablename, rtype, eid)
+ # insert cw_source, is and is_instance_of relations (normally handled by the system source)
+ self._insert_meta_relation(tmp_tablename, 'cw_source', self.metagen.source.eid)
+ eschema = self.schema[etype]
+ self._insert_meta_relation(tmp_tablename, 'is', eschema.eid)
+ for parent_eschema in chain(eschema.ancestors(), [eschema]):
+ self._insert_meta_relation(tmp_tablename, 'is_instance_of', parent_eschema.eid)
+ # finally insert records into the entities table
+ self.sql("INSERT INTO entities(eid, type) "
+ "SELECT cw_eid, '%s' FROM %s "
+ "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
+ % (etype, tmp_tablename))
+
+ def _insert_meta_relation(self, tmp_tablename, rtype, eid_to):
+ self.sql("INSERT INTO %s_relation(eid_from, eid_to) SELECT cw_eid, %s FROM %s "
+ "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
+ % (rtype, eid_to, tmp_tablename))
+
+ def _tmp_data_cleanup(self, tmp_tablename, ertype, uuid):
+ """Drop temporary relation table and record from cwmassive_initialized."""
+ self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename})
+ self.sql('DELETE FROM cwmassive_initialized '
+ 'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
+ {'rtype': ertype, 'uuid': uuid})
+
# FLUSH #################################################################
def on_commit(self):
@@ -296,38 +311,30 @@
if not data:
# There is no data for these etype for this flush round.
continue
- # XXX It may be interresting to directly infer the columns' names from the schema
- # XXX For now, the _create_copyfrom_buffer does a "row[column]"
- # which can lead to a key error.
- # Thus we should create dictionary with all the keys.
- columns = set()
- for d in data:
- columns.update(d)
- _base_data = dict.fromkeys(columns)
+ attrs = self._initialized[etype]
+ _base_data = dict.fromkeys(attrs)
_base_data.update(self.default_values[etype])
_base_data.update(metagen.base_etype_attrs(etype))
_data = []
for d in data:
+ # call init_entity_attrs on `d` first: it does not fill keys that are
+ # already present, and `_base_data` pre-sets every attribute to None
+ metagen.init_entity_attrs(etype, d['eid'], d)
+ # XXX warn/raise if some key is not in attrs?
_d = _base_data.copy()
_d.update(d)
- metagen.init_entity_attrs(etype, _d['eid'], _d)
_data.append(_d)
- buf = pgstore._create_copyfrom_buffer(_data, columns)
- columns = ['cw_%s' % attr for attr in columns]
+ buf = pgstore._create_copyfrom_buffer(_data, attrs)
+ tablename = 'cw_%s' % etype.lower()
+ tmp_tablename = '%s_%s' % (tablename, self.uuid)
+ columns = ['cw_%s' % attr for attr in attrs]
cursor = self._cnx.cnxset.cu
try:
- cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns)
+ cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns)
except Exception as exc:
self.on_rollback(exc, etype, data)
# Clear data cache
self._data_entities[etype] = []
- if not self.slave_mode:
- self.master_insert_etype_metadata(etype)
-
- def _insert_meta_relation(self, etype, eid_to, rtype):
- self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
- "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
- % (rtype, eid_to, etype.lower()))
def get_default_values(schema):
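
As a side note on the flush path above: rows are now bulk-loaded into the
slave's temporary table via PostgreSQL COPY. A rough equivalent with a plain
psycopg2 cursor (buffer construction simplified compared to
pgstore._create_copyfrom_buffer) could look like:

    from io import StringIO

    def copy_entities(cursor, etype, uuid, attrs, rows):
        """Bulk-load `rows` (dicts) into the per-store temporary table."""
        buf = StringIO('\n'.join(
            '\t'.join('NULL' if row.get(attr) is None else str(row[attr])
                      for attr in attrs)
            for row in rows) + '\n')
        tmp_tablename = 'cw_%s_%s' % (etype.lower(), uuid)
        columns = ['cw_%s' % attr for attr in attrs]
        # psycopg2's COPY FROM, as in _flush_entities above
        cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns)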
--- a/cubicweb/dataimport/test/test_massive_store.py Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/dataimport/test/test_massive_store.py Thu Oct 06 12:12:04 2016 +0200
@@ -119,6 +119,7 @@
store.prepare_insert_entity('Location', timezone=timezone_eid)
store.flush()
store.commit()
+ store.finish()
eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
'T name TN')[0]
self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)
@@ -231,16 +232,11 @@
counter = itertools.count()
with self.admin_access.repo_cnx() as cnx:
store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
- store.prepare_insert_entity('Location', nm='toto')
- store.commit() # commit modification to the database before flush
+ # oversized attribute
+ store.prepare_insert_entity('Location', feature_class='toto')
store.flush()
self.assertEqual(next(counter), 1)
- def test_slave_mode_exception(self):
- with self.admin_access.repo_cnx() as cnx:
- slave_store = MassiveObjectStore(cnx, slave_mode=True)
- self.assertRaises(RuntimeError, slave_store.finish)
-
def test_simple_insert(self):
with self.admin_access.repo_cnx() as cnx:
store = MassiveObjectStore(cnx)
@@ -263,10 +259,10 @@
# Check index
indexes = all_indexes(cnx)
self.assertIn('entities_pkey', indexes)
- self.assertNotIn(build_index_name('owned_by_relation', ['eid_from', 'eid_to'], 'key_'),
- indexes)
- self.assertNotIn(build_index_name('owned_by_relation', ['eid_from'], 'idx_'),
- indexes)
+ self.assertIn(build_index_name('owned_by_relation', ['eid_from', 'eid_to'], 'key_'),
+ indexes)
+ self.assertIn(build_index_name('owned_by_relation', ['eid_from'], 'idx_'),
+ indexes)
# Cleanup -> index
store.finish()
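
For context, the expected master/slaves usage after this change looks roughly
like the following (entity type and attribute taken from the tests above):

    with self.admin_access.repo_cnx() as cnx:
        master = MassiveObjectStore(cnx)
        slave = MassiveObjectStore(cnx, slave_mode=True)
        # the slave writes into its own cw_location_<uuid> table
        slave.prepare_insert_entity('Location', nm=u'toto')
        slave.flush()
        slave.commit()
        # finish() copies every table registered in cwmassive_initialized
        # back into the regular tables, inserts metadata and restores
        # indexes and constraints
        master.finish()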
--- a/cubicweb/server/schema2sql.py Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/server/schema2sql.py Thu Oct 06 12:12:04 2016 +0200
@@ -102,6 +102,28 @@
yield attrs, unique_index_name(eschema, attrs)
+def eschema_sql_def(dbhelper, eschema, skip_relations=(), prefix=''):
+ """Return a list of (column names, sql type def) for the given entity schema.
+
+ No constraint nor index are considered - this function is usually for massive import purpose.
+ """
+ attrs = [attrdef for attrdef in eschema.attribute_definitions()
+ if attrdef[0].type not in skip_relations]
+ attrs += [(rschema, None)
+ for rschema in eschema.subject_relations()
+ if not rschema.final and rschema.inlined]
+ result = []
+ for rschema, attrschema in attrs:
+ if attrschema is not None:
+ # creating = False will avoid NOT NULL / REFERENCES constraints
+ sqltype = aschema2sql(dbhelper, eschema, rschema, attrschema, creating=False)
+ else: # inline relation
+ sqltype = 'integer'
+ result.append(('%s%s' % (prefix, rschema.type), sqltype))
+ return result
+
+
def eschema2sql(dbhelper, eschema, skip_relations=(), prefix=''):
"""Yield SQL statements to initialize database from an entity schema."""
table = prefix + eschema.type
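
To illustrate how the massive store consumes this helper (mirroring the
prepare_insert_entity hunk above; `dbhelper`, `schema` and a store `uuid` are
assumed to be available):

    attr_defs = eschema_sql_def(dbhelper, schema['Person'])
    tmp_tablename = 'cw_person_%s' % uuid
    create = 'CREATE TABLE %s(%s);' % (
        tmp_tablename,
        ', '.join('cw_%s %s' % (column, sqltype)
                  for column, sqltype in attr_defs))
    # -> CREATE TABLE cw_person_<uuid>(cw_nom varchar(64),
    #                                  cw_prenom varchar(64), ...)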
--- a/cubicweb/server/test/unittest_schema2sql.py Tue Oct 11 10:24:13 2016 +0200
+++ b/cubicweb/server/test/unittest_schema2sql.py Thu Oct 06 12:12:04 2016 +0200
@@ -245,6 +245,32 @@
output = list(schema2sql.schema2sql(dbhelper, schema, skip_relations=('works_for',)))
self.assertEqual(output, EXPECTED_DATA_NO_DROP)
+ def test_eschema_sql_def_attributes(self):
+ dbhelper = get_db_helper('postgres')
+ attr_defs = schema2sql.eschema_sql_def(dbhelper, schema['Person'])
+ self.assertEqual(attr_defs,
+ [('nom', 'varchar(64)'),
+ ('prenom', 'varchar(64)'),
+ ('sexe', "varchar(1) DEFAULT 'M'"),
+ ('promo', 'varchar(22)'),
+ ('titre', 'varchar(128)'),
+ ('adel', 'varchar(128)'),
+ ('ass', 'varchar(128)'),
+ ('web', 'varchar(128)'),
+ ('tel', 'integer'),
+ ('fax', 'integer'),
+ ('datenaiss', 'date'),
+ ('test', 'boolean'),
+ ('salary', 'float')])
+
+ def test_eschema_sql_def_inlined_rel(self):
+ dbhelper = get_db_helper('postgres')
+ attr_defs = schema2sql.eschema_sql_def(dbhelper, schema['Affaire'])
+ self.assertEqual(attr_defs,
+ [('sujet', 'varchar(128)'),
+ ('ref', 'varchar(12)'),
+ ('inline_rel', 'integer')])
+
if __name__ == '__main__':
unittest_main()