# HG changeset patch # User Sylvain Thénault # Date 1454001407 -3600 # Node ID 21316020eae35ac5d1c250625c069cb9e0e0af0a # Parent fab543f542acb56702b4e8b09f9a35d533174542 [dataimport] move cwmassive_constraint temporary table handling to the PGHelper class with consistent renaming and some rationalization to avoid doing things several times and have code slightly easier to grasp. diff -r fab543f542ac -r 21316020eae3 cubicweb/dataimport/massive_store.py --- a/cubicweb/dataimport/massive_store.py Thu Jan 28 18:17:08 2016 +0100 +++ b/cubicweb/dataimport/massive_store.py Thu Jan 28 18:16:47 2016 +0100 @@ -245,69 +245,22 @@ except Exception as ex: self.logger.error("Can't insert relation %s: %s", rtype, ex) - # SQL UTILITIES ######################################################### + # SQL utilities ######################################################### def _drop_all_constraints(self): etypes_tables = ('cw_%s' % eschema.type.lower() for eschema in self.schema.entities() if not eschema.final) rtypes_tables = ('%s_relation' % rschema.type.lower() for rschema in self.schema.relations() if rschema_has_table(rschema, skip_relations=PURE_VIRTUAL_RTYPES)) - # Create a table to save the constraints, it allows reloading even after crash - self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints' - '(origtable text, query text, type varchar(256))') for tablename in chain(etypes_tables, rtypes_tables, ('entities',)): - constraints = self._dbh.table_constraints(tablename) - for name, query in constraints.items(): - self.sql('INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)', - {'e': tablename, 'c': query, 't': 'constraint'}) - self.sql('ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name)) - - def _reapply_all_constraints(self): - if not self._dbh.table_exists('cwmassive_constraints'): - self.logger.info('The table cwmassive_constraints does not exist') - return - cu = self.sql("SELECT query FROM cwmassive_constraints WHERE type='constraint'") - for query, in cu.fetchall(): - self.sql(query) - self.sql("DELETE FROM cwmassive_constraints WHERE type='constraint' AND query=%(q)s", - {'q': query}) - - def drop_and_store_indexes(self, tablename): - """Drop indexes and constraints""" - # Create a table to save the constraints, it allows reloading even after crash - self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints' - '(origtable text, query text, type varchar(256))') - indexes = self._dbh.table_indexes(tablename) - for name, query in indexes.items(): - self.sql('INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)', - {'e': tablename, 'c': query, 't': 'index'}) - self.sql('DROP INDEX %s' % name) - - def reapply_constraint_index(self, tablename): - if not self._dbh.table_exists('cwmassive_constraints'): - self.logger.info('The table cwmassive_constraints does not exist') - return - cu = self.sql('SELECT query FROM cwmassive_constraints WHERE origtable = %(e)s', - {'e': tablename}) - for query, in cu.fetchall(): - self.sql(query) - self.sql('DELETE FROM cwmassive_constraints WHERE origtable = %(e)s ' - 'AND query = %(q)s', {'e': tablename, 'q': query}) + self._dbh.drop_constraints(tablename) def _drop_metatables_constraints(self): """ Drop all the constraints for the meta data""" for tablename in ('created_by_relation', 'owned_by_relation', 'is_instance_of_relation', 'is_relation', 'entities'): - self.drop_and_store_indexes(tablename) - - def _create_metatables_constraints(self): - """ Create all the constraints for the meta data""" - for tablename in ('entities', - 'created_by_relation', 'owned_by_relation', - 'is_instance_of_relation', 'is_relation'): - # Indexes and constraints - self.reapply_constraint_index(tablename) + self._dbh.drop_indexes(tablename) def restart_eid_sequence(self, start_eid): self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange( @@ -339,7 +292,7 @@ """ if not self.slave_mode and etype not in self._initialized: self._initialized.add(etype) - self.drop_and_store_indexes('cw_%s' % etype.lower()) + self._dbh.drop_indexes('cw_%s' % etype.lower()) self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' '(retype text, type varchar(128))') self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype}) @@ -365,7 +318,7 @@ """ if not self.slave_mode and rtype not in self._initialized: self._initialized.add(rtype) - self.drop_and_store_indexes('%s_relation' % rtype.lower()) + self._dbh.drop_indexes('%s_relation' % rtype.lower()) self.sql('CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)' % rtype.lower()) self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' @@ -396,24 +349,19 @@ # Remove relations tables for rtype in self._uri_rtypes: self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype}) - # Create meta constraints (entities, is_instance_of, ...) - self._create_metatables_constraints() # Get all the initialized etypes/rtypes if self._dbh.table_exists('cwmassive_initialized'): cu = self.sql('SELECT retype, type FROM cwmassive_initialized') for retype, _type in cu.fetchall(): self.logger.info('Cleanup for %s' % retype) - if _type == 'etype': - # Cleanup entities tables - Recreate indexes - self.reapply_constraint_index('cw_%s' % etype.lower()) - elif _type == 'rtype': + if _type == 'rtype': # Cleanup relations tables self._cleanup_relations(retype) self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s', {'e': retype}) - self._reapply_all_constraints() + self._dbh.restore_indexes_and_constraints() # Delete the meta data table - for table_name in ('cwmassive_initialized', 'cwmassive_constraints', 'cwmassive_metadata'): + for table_name in ('cwmassive_initialized', 'cwmassive_metadata'): self.sql('DROP TABLE IF EXISTS %s' % table_name) self.commit() @@ -514,9 +462,6 @@ 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'r': rtype}) # Drop temporary relation table self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()}) - # Create indexes and constraints - tablename = '%s_relation' % rtype.lower() - self.reapply_constraint_index(tablename) def insert_massive_meta_data(self, etype): """ Massive insertion of meta data for a given etype, based on SQL statements. @@ -595,6 +540,35 @@ pg_schema = cnx.repo.config.system_source_config.get('db-namespace') or 'public' self.pg_schema = pg_schema + def drop_indexes(self, tablename): + """Drop indexes and constraints, storing them in a table for later restore.""" + # Create a table to save the constraints, it allows reloading even after crash + self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints(sql TEXT, insert_order SERIAL)') + indexes = self.table_indexes(tablename) + for name, query in indexes.items(): + self.sql('INSERT INTO cwmassive_constraints(sql) VALUES (%(sql)s)', {'sql': query}) + self.sql('DROP INDEX %s' % name) + + def drop_constraints(self, tablename): + self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints(sql TEXT, insert_order SERIAL)') + constraints = self.table_constraints(tablename) + for name, query in constraints.items(): + self.sql('INSERT INTO cwmassive_constraints(sql) VALUES (%(sql)s)', {'sql': query}) + self.sql('ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name)) + + def restore_indexes_and_constraints(self): + """Restore indexes and constraints.""" + if not self.table_exists('cwmassive_constraints'): + self.logger.info('The table cwmassive_constraints does not exist') + return + cu = self.sql('SELECT sql, insert_order FROM cwmassive_constraints ' + 'ORDER BY insert_order DESC') + for query, order in cu.fetchall(): + self.sql(query) + self.sql('DELETE FROM cwmassive_constraints WHERE insert_order=%(order)s', + {'order': order}) + self.sql('DROP TABLE cwmassive_constraints') + def table_exists(self, tablename): """Return True if the given table already exists in the database.""" cu = self.sql('SELECT 1 from information_schema.tables '