diff -r 8e1fb9445d75 -r 71df2811b422 cubicweb/dataimport/massive_store.py --- a/cubicweb/dataimport/massive_store.py Tue Oct 11 10:24:13 2016 +0200 +++ b/cubicweb/dataimport/massive_store.py Thu Oct 06 12:12:04 2016 +0200 @@ -28,6 +28,7 @@ from six.moves import range from cubicweb.dataimport import stores, pgstore +from cubicweb.server.schema2sql import eschema_sql_def class MassiveObjectStore(stores.RQLObjectStore): @@ -89,12 +90,12 @@ self.schema = cnx.vreg.schema self.default_values = get_default_values(self.schema) self.get_next_eid = lambda g=self._get_eid_gen(): next(g) + self._source_dbhelper = cnx.repo.system_source.dbhelper self._dbh = PGHelper(cnx) self._data_entities = defaultdict(list) self._data_relations = defaultdict(list) - self._initialized = set() - self._constraints_dropped = self.slave_mode + self._initialized = {} def _get_eid_gen(self): """ Function getting the next eid. This is done by preselecting @@ -116,53 +117,10 @@ if self not in self._initialized: self.sql('CREATE TABLE cwmassive_initialized' '(retype text, type varchar(128), uuid varchar(32))') - self._initialized.append(self) - - def master_init_etype(self, etype): - """Initialize database for insertion of entities of the given etype. - - This is expected to be called once, usually by the master store in master/slaves - configuration. - """ - self._drop_metadata_constraints_if_necessary() - tablename = 'cw_%s' % etype.lower() - self._dbh.drop_constraints(tablename) - self._dbh.drop_indexes(tablename) - self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' - '(retype text, type varchar(128), uuid varchar(32))') - self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)", - {'e': etype, 'uuid': self.uuid}) - - def master_insert_etype_metadata(self, etype): - """Massive insertion of meta data for a given etype, based on SQL statements. - - In master/slabes configuration, you'll usually want to call it from the master once all - slaves have finished (at least slaves won't call it automatically, so that's your - reponsability). - """ - # insert standard metadata relations - for rtype, eid in self.metagen.base_etype_rels(etype).items(): - self._insert_meta_relation(etype, eid, '%s_relation' % rtype) - # insert cw_source, is and is_instance_of relations (normally handled by the system source) - self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation') - eschema = self.schema[etype] - self._insert_meta_relation(etype, eschema.eid, 'is_relation') - for parent_eschema in chain(eschema.ancestors(), [eschema]): - self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation') - # finally insert records into the entities table - self.sql("INSERT INTO entities (eid, type, extid) " - "SELECT cw_eid, '%s', extid FROM cw_%s " - "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" - % (etype, etype.lower())) + self._initialized[self] = None # SQL utilities ######################################################### - def _drop_metadata_constraints_if_necessary(self): - """Drop constraints and indexes for the metadata tables if necessary.""" - if not self._constraints_dropped: - self._drop_metadata_constraints() - self._constraints_dropped = True - def _drop_metadata_constraints(self): """Drop constraints and indexes for the metadata tables. @@ -189,9 +147,19 @@ """Given an entity type, attributes and inlined relations, returns the inserted entity's eid. """ - if not self.slave_mode and etype not in self._initialized: - self._initialized.add(etype) - self.master_init_etype(etype) + if etype not in self._initialized: + if not self.slave_mode: + self.master_init() + tablename = 'cw_%s' % etype.lower() + tmp_tablename = '%s_%s' % (tablename, self.uuid) + self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)", + {'e': etype, 'uuid': self.uuid}) + attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype]) + self.sql('CREATE TABLE %s(%s);' % (tmp_tablename, + ', '.join('cw_%s %s' % (column, sqltype) + for column, sqltype in attr_defs))) + self._initialized[etype] = [attr for attr, _ in attr_defs] + if 'eid' not in data: # If eid is not given and the eids sequence is set, use the value from the sequence eid = self.get_next_eid() @@ -209,7 +177,7 @@ if not self.slave_mode: self.master_init() assert not self._cnx.vreg.schema.rschema(rtype).inlined - self._initialized.add(rtype) + self._initialized[rtype] = None tablename = '%s_relation' % rtype.lower() tmp_tablename = '%s_%s' % (tablename, self.uuid) self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)", @@ -234,10 +202,31 @@ # Get all the initialized etypes/rtypes if self._dbh.table_exists('cwmassive_initialized'): cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized') + entities = defaultdict(list) relations = defaultdict(list) for retype, _type, uuid in cu.fetchall(): if _type == 'rtype': relations[retype].append(uuid) + else: # _type = 'etype' + entities[retype].append(uuid) + # if there is some entities to insert, delete constraint on metadata tables once for all + if entities: + self._drop_metadata_constraints() + # get back entity data from the temporary tables + for etype, uuids in entities.items(): + tablename = 'cw_%s' % etype.lower() + attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype]) + columns = ','.join('cw_%s' % attr for attr, _ in attr_defs) + self._dbh.drop_constraints(tablename) + self._dbh.drop_indexes(tablename) + for uuid in uuids: + tmp_tablename = '%s_%s' % (tablename, uuid) + self.sql('INSERT INTO %(table)s(%(columns)s) ' + 'SELECT %(columns)s FROM %(tmp_table)s' + % {'table': tablename, 'tmp_table': tmp_tablename, + 'columns': columns}) + self._insert_etype_metadata(etype, tmp_tablename) + self._tmp_data_cleanup(tmp_tablename, etype, uuid) # get back relation data from the temporary tables for rtype, uuids in relations.items(): tablename = '%s_relation' % rtype.lower() @@ -251,17 +240,43 @@ 'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE ' 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'table': tablename, 'tmp_table': tmp_tablename}) - # Drop temporary relation table and record from cwmassive_initialized - self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename}) - self.sql('DELETE FROM cwmassive_initialized ' - 'WHERE retype = %(rtype)s AND uuid = %(uuid)s', - {'rtype': retype, 'uuid': uuid}) + self._tmp_data_cleanup(tmp_tablename, rtype, uuid) # restore all deleted indexes and constraints self._dbh.restore_indexes_and_constraints() # delete the meta data table self.sql('DROP TABLE IF EXISTS cwmassive_initialized') self.commit() + def _insert_etype_metadata(self, etype, tmp_tablename): + """Massive insertion of meta data for `etype`, with new entities in `tmp_tablename`. + """ + # insert standard metadata relations + for rtype, eid in self.metagen.base_etype_rels(etype).items(): + self._insert_meta_relation(tmp_tablename, rtype, eid) + # insert cw_source, is and is_instance_of relations (normally handled by the system source) + self._insert_meta_relation(tmp_tablename, 'cw_source', self.metagen.source.eid) + eschema = self.schema[etype] + self._insert_meta_relation(tmp_tablename, 'is', eschema.eid) + for parent_eschema in chain(eschema.ancestors(), [eschema]): + self._insert_meta_relation(tmp_tablename, 'is_instance_of', parent_eschema.eid) + # finally insert records into the entities table + self.sql("INSERT INTO entities(eid, type) " + "SELECT cw_eid, '%s' FROM %s " + "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" + % (etype, tmp_tablename)) + + def _insert_meta_relation(self, tmp_tablename, rtype, eid_to): + self.sql("INSERT INTO %s_relation(eid_from, eid_to) SELECT cw_eid, %s FROM %s " + "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" + % (rtype, eid_to, tmp_tablename)) + + def _tmp_data_cleanup(self, tmp_tablename, ertype, uuid): + """Drop temporary relation table and record from cwmassive_initialized.""" + self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename}) + self.sql('DELETE FROM cwmassive_initialized ' + 'WHERE retype = %(rtype)s AND uuid = %(uuid)s', + {'rtype': ertype, 'uuid': uuid}) + # FLUSH ################################################################# def on_commit(self): @@ -296,38 +311,30 @@ if not data: # There is no data for these etype for this flush round. continue - # XXX It may be interresting to directly infer the columns' names from the schema - # XXX For now, the _create_copyfrom_buffer does a "row[column]" - # which can lead to a key error. - # Thus we should create dictionary with all the keys. - columns = set() - for d in data: - columns.update(d) - _base_data = dict.fromkeys(columns) + attrs = self._initialized[etype] + _base_data = dict.fromkeys(attrs) _base_data.update(self.default_values[etype]) _base_data.update(metagen.base_etype_attrs(etype)) _data = [] for d in data: + # do this first on `d`, because it won't fill keys associated to None as provided by + # `_base_data` + metagen.init_entity_attrs(etype, d['eid'], d) + # XXX warn/raise if there is some key not in attrs? _d = _base_data.copy() _d.update(d) - metagen.init_entity_attrs(etype, _d['eid'], _d) _data.append(_d) - buf = pgstore._create_copyfrom_buffer(_data, columns) - columns = ['cw_%s' % attr for attr in columns] + buf = pgstore._create_copyfrom_buffer(_data, attrs) + tablename = 'cw_%s' % etype.lower() + tmp_tablename = '%s_%s' % (tablename, self.uuid) + columns = ['cw_%s' % attr for attr in attrs] cursor = self._cnx.cnxset.cu try: - cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns) + cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns) except Exception as exc: self.on_rollback(exc, etype, data) # Clear data cache self._data_entities[etype] = [] - if not self.slave_mode: - self.master_insert_etype_metadata(etype) - - def _insert_meta_relation(self, etype, eid_to, rtype): - self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s " - "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" - % (rtype, eid_to, etype.lower())) def get_default_values(schema):