# HG changeset patch # User Sylvain Thénault # Date 1475653085 -7200 # Node ID c1aa50a88de390c0af4793ceb226ca38b69db711 # Parent 8865c9e55575b13bca19a994c84e59acfe7c7672 [massive store] Use a slave specific table for relation insertion in the massive store * attribute a random id to the store * add a column containing this id in the cwmassive_initialized table * separate generic `master_init` which create the cwmassive_initialized table from rtype specific initialization which must be done in each slave now * delay removal of table indexes to `finish` * get back relations from each table on `finish` Similar work still has to be done for entities insertion - which will allow more cleanup to the constraints handling which is still rough for now. Related to #15538303 diff -r 8865c9e55575 -r c1aa50a88de3 cubicweb/dataimport/massive_store.py --- a/cubicweb/dataimport/massive_store.py Tue Oct 04 13:14:35 2016 +0200 +++ b/cubicweb/dataimport/massive_store.py Wed Oct 05 09:38:05 2016 +0200 @@ -17,12 +17,14 @@ # You should have received a copy of the GNU Lesser General Public License along # with CubicWeb. If not, see . -import logging +from base64 import b64encode from copy import copy from collections import defaultdict from itertools import chain -from base64 import b64encode +import logging +from uuid import uuid4 +from six import text_type from six.moves import range from cubicweb.dataimport import stores, pgstore @@ -72,6 +74,8 @@ - `eids_seq_range`: size of eid range reserved by the store for each batch """ super(MassiveObjectStore, self).__init__(cnx) + + self.uuid = text_type(uuid4()).replace('-', '') self.on_commit_callback = on_commit_callback self.on_rollback_callback = on_rollback_callback self.slave_mode = slave_mode @@ -103,6 +107,17 @@ # master/slaves specific API + def master_init(self): + """Initialize database for massive insertion. + + This is expected to be called once, by the master store in master/slaves configuration. + """ + assert not self.slave_mode + if self not in self._initialized: + self.sql('CREATE TABLE cwmassive_initialized' + '(retype text, type varchar(128), uuid varchar(32))') + self._initialized.append(self) + def master_init_etype(self, etype): """Initialize database for insertion of entities of the given etype. @@ -114,25 +129,9 @@ self._dbh.drop_constraints(tablename) self._dbh.drop_indexes(tablename) self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' - '(retype text, type varchar(128))') - self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype}) - self.sql('ALTER TABLE %s ADD COLUMN extid VARCHAR(256)' % tablename) - - def master_init_rtype(self, rtype): - """Initialize database for insertion of relation of the given rtype. - - This is expected to be called once, usually by the master store in master/slaves - configuration. - """ - assert not self._cnx.vreg.schema.rschema(rtype).inlined - self._drop_metadata_constraints_if_necessary() - tablename = '%s_relation' % rtype.lower() - self._dbh.drop_constraints(tablename) - self._dbh.drop_indexes(tablename) - self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' - '(retype text, type varchar(128))') - self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'rtype')", {'e': rtype}) - self.sql('CREATE TABLE %s_tmp (eid_from integer, eid_to integer)' % tablename) + '(retype text, type varchar(128), uuid varchar(32))') + self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)", + {'e': etype, 'uuid': self.uuid}) def master_insert_etype_metadata(self, etype): """Massive insertion of meta data for a given etype, based on SQL statements. @@ -214,9 +213,16 @@ Relation must not be inlined. """ - if not self.slave_mode and rtype not in self._initialized: + if rtype not in self._initialized: + if not self.slave_mode: + self.master_init() + assert not self._cnx.vreg.schema.rschema(rtype).inlined self._initialized.add(rtype) - self.master_init_rtype(rtype) + tablename = '%s_relation' % rtype.lower() + tmp_tablename = '%s_%s' % (tablename, self.uuid) + self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)", + {'r': rtype, 'uuid': self.uuid}) + self.sql('CREATE TABLE %s(eid_from integer, eid_to integer)' % tmp_tablename) self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to}) def flush(self): @@ -236,14 +242,29 @@ self.logger.info("Start cleaning") # Get all the initialized etypes/rtypes if self._dbh.table_exists('cwmassive_initialized'): - cu = self.sql('SELECT retype, type FROM cwmassive_initialized') - for retype, _type in cu.fetchall(): - self.logger.info('Cleanup for %s' % retype) + cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized') + relations = defaultdict(list) + for retype, _type, uuid in cu.fetchall(): if _type == 'rtype': - # Cleanup relations tables - self._cleanup_relations(retype) - self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s', - {'e': retype}) + relations[retype].append(uuid) + # get back relation data from the temporary tables + for rtype, uuids in relations.items(): + tablename = '%s_relation' % rtype.lower() + self._dbh.drop_constraints(tablename) + self._dbh.drop_indexes(tablename) + for uuid in uuids: + tmp_tablename = '%s_%s' % (tablename, uuid) + # XXX no index on the original relation table, EXISTS subquery may be sloooow + self.sql('INSERT INTO %(table)s(eid_from, eid_to) SELECT DISTINCT ' + 'T.eid_from, T.eid_to FROM %(tmp_table)s AS T ' + 'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE ' + 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' + % {'table': tablename, 'tmp_table': tmp_tablename}) + # Drop temporary relation table and record from cwmassive_initialized + self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename}) + self.sql('DELETE FROM cwmassive_initialized ' + 'WHERE retype = %(rtype)s AND uuid = %(uuid)s', + {'rtype': retype, 'uuid': uuid}) self._dbh.restore_indexes_and_constraints() # Delete the meta data table self.sql('DROP TABLE IF EXISTS cwmassive_initialized') @@ -274,8 +295,9 @@ raise ValueError cursor = self._cnx.cnxset.cu # Push into the tmp table - cursor.copy_from(buf, '%s_relation_tmp' % rtype.lower(), - null='NULL', columns=('eid_from', 'eid_to')) + tablename = '%s_relation' % rtype.lower() + tmp_tablename = '%s_%s' % (tablename, self.uuid) + cursor.copy_from(buf, tmp_tablename, null='NULL', columns=('eid_from', 'eid_to')) # Clear data cache self._data_relations[rtype] = [] @@ -313,16 +335,6 @@ if not self.slave_mode: self.master_insert_etype_metadata(etype) - def _cleanup_relations(self, rtype): - """ Cleanup rtype table """ - # Push into relation table while removing duplicate - self.sql('INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT ' - 'T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T ' - 'WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE ' - 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'r': rtype}) - # Drop temporary relation table - self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()}) - def _insert_meta_relation(self, etype, eid_to, rtype): self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s " "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"