--- a/cubicweb/dataimport/massive_store.py Wed Sep 28 09:02:14 2016 +0200
+++ b/cubicweb/dataimport/massive_store.py Fri Sep 30 17:34:59 2016 +0200
@@ -106,6 +106,61 @@
for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1):
yield eid
+ # master/slaves specific API
+
+ def master_init_etype(self, etype):
+ """Initialize database for insertion of entities of the given etype.
+
+ This is expected to be called once, usually by the master store in master/slaves
+ configuration.
+ """
+ self._drop_metadata_constraints_if_necessary()
+ tablename = 'cw_%s' % etype.lower()
+ self._dbh.drop_constraints(tablename)
+ self._dbh.drop_indexes(tablename)
+ self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
+ '(retype text, type varchar(128))')
+ self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
+ self.sql('ALTER TABLE %s ADD COLUMN extid VARCHAR(256)' % tablename)
+
+ def master_init_rtype(self, rtype):
+ """Initialize database for insertion of relation of the given rtype.
+
+ This is expected to be called once, usually by the master store in master/slaves
+ configuration.
+ """
+ assert not self._cnx.vreg.schema.rschema(rtype).inlined
+ self._drop_metadata_constraints_if_necessary()
+ tablename = '%s_relation' % rtype.lower()
+ self._dbh.drop_constraints(tablename)
+ self._dbh.drop_indexes(tablename)
+ self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
+ '(retype text, type varchar(128))')
+ self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'rtype')", {'e': rtype})
+ self.sql('CREATE TABLE %s_tmp (eid_from integer, eid_to integer)' % tablename)
+
+ def master_insert_etype_metadata(self, etype):
+ """Massive insertion of meta data for a given etype, based on SQL statements.
+
+ In master/slabes configuration, you'll usually want to call it from the master once all
+ slaves have finished (at least slaves won't call it automatically, so that's your
+ reponsability).
+ """
+ # insert standard metadata relations
+ for rtype, eid in self.metagen.base_etype_rels(etype).items():
+ self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
+ # insert cw_source, is and is_instance_of relations (normally handled by the system source)
+ self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
+ eschema = self.schema[etype]
+ self._insert_meta_relation(etype, eschema.eid, 'is_relation')
+ for parent_eschema in chain(eschema.ancestors(), [eschema]):
+ self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
+ # finally insert records into the entities table
+ self.sql("INSERT INTO entities (eid, type, extid) "
+ "SELECT cw_eid, '%s', extid FROM cw_%s "
+ "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
+ % (etype, etype.lower()))
+
# SQL utilities #########################################################
def _drop_metadata_constraints_if_necessary(self):
@@ -142,13 +197,7 @@
"""
if not self.slave_mode and etype not in self._initialized:
self._initialized.add(etype)
- self._drop_metadata_constraints_if_necessary()
- tablename = 'cw_%s' % etype.lower()
- self._dbh.drop_constraints(tablename)
- self._dbh.drop_indexes(tablename)
- self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
- '(retype text, type varchar(128))')
- self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
+ self.master_init_etype(etype)
attrs = self.metagen.base_etype_attrs(etype)
data = copy(attrs) # base_etype_attrs is @cached, a copy is necessary
data.update(kwargs)
@@ -171,17 +220,8 @@
Relation must not be inlined.
"""
if not self.slave_mode and rtype not in self._initialized:
- assert not self._cnx.vreg.schema.rschema(rtype).inlined
self._initialized.add(rtype)
- self._drop_metadata_constraints_if_necessary()
- tablename = '%s_relation' % rtype.lower()
- self._dbh.drop_constraints(tablename)
- self._dbh.drop_indexes(tablename)
- self.sql('CREATE TABLE %s_tmp (eid_from integer, eid_to integer)'
- % tablename)
- self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
- '(retype text, type varchar(128))')
- self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'rtype')", {'e': rtype})
+ self.master_init_rtype(rtype)
self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})
def flush(self):
@@ -276,7 +316,7 @@
# Clear data cache
self._data_entities[etype] = []
if not self.slave_mode:
- self._insert_etype_metadata(etype)
+ self.master_insert_etype_metadata(etype)
def _cleanup_relations(self, rtype):
""" Cleanup rtype table """
@@ -288,24 +328,6 @@
# Drop temporary relation table
self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
- def _insert_etype_metadata(self, etype):
- """Massive insertion of meta data for a given etype, based on SQL statements.
- """
- # insert standard metadata relations
- for rtype, eid in self.metagen.base_etype_rels(etype).items():
- self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
- # insert cw_source, is and is_instance_of relations (normally handled by the system source)
- self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
- eschema = self.schema[etype]
- self._insert_meta_relation(etype, eschema.eid, 'is_relation')
- for parent_eschema in chain(eschema.ancestors(), [eschema]):
- self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
- # finally insert records into the entities table
- self.sql("INSERT INTO entities (eid, type) "
- "SELECT cw_eid, '%s' FROM cw_%s "
- "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
- % (etype, etype.lower()))
-
def _insert_meta_relation(self, etype, eid_to, rtype):
self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
"WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
--- a/cubicweb/dataimport/test/test_massive_store.py Wed Sep 28 09:02:14 2016 +0200
+++ b/cubicweb/dataimport/test/test_massive_store.py Fri Sep 30 17:34:59 2016 +0200
@@ -138,7 +138,7 @@
with self.admin_access.repo_cnx() as cnx:
store = MassiveObjectStore(cnx)
- store._drop_constraints()
+ store._drop_metadata_constraints()
indexes = all_indexes(cnx)
self.assertIn('entities_pkey', indexes)
self.assertNotIn(build_index_name('owned_by_relation', ['eid_from', 'eid_to'], 'key_'),
@@ -159,8 +159,8 @@
with self.admin_access.repo_cnx() as cnx:
metagen = stores.MetadataGenerator(cnx, meta_skipped=('owned_by',))
store = MassiveObjectStore(cnx, metagen=metagen)
+ store._drop_metadata_constraints()
- store._drop_constraints()
indexes = all_indexes(cnx)
self.assertIn(build_index_name('owned_by_relation', ['eid_from', 'eid_to'], 'key_'),
indexes)