--- a/cubicweb/dataimport/massive_store.py Thu Jan 28 18:17:08 2016 +0100
+++ b/cubicweb/dataimport/massive_store.py Thu Jan 28 18:16:47 2016 +0100
@@ -245,69 +245,22 @@
except Exception as ex:
self.logger.error("Can't insert relation %s: %s", rtype, ex)
- # SQL UTILITIES #########################################################
+ # SQL utilities #########################################################
def _drop_all_constraints(self):
etypes_tables = ('cw_%s' % eschema.type.lower() for eschema in self.schema.entities()
if not eschema.final)
rtypes_tables = ('%s_relation' % rschema.type.lower() for rschema in self.schema.relations()
if rschema_has_table(rschema, skip_relations=PURE_VIRTUAL_RTYPES))
- # Create a table to save the constraints, it allows reloading even after crash
- self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints'
- '(origtable text, query text, type varchar(256))')
for tablename in chain(etypes_tables, rtypes_tables, ('entities',)):
- constraints = self._dbh.table_constraints(tablename)
- for name, query in constraints.items():
- self.sql('INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)',
- {'e': tablename, 'c': query, 't': 'constraint'})
- self.sql('ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name))
-
- def _reapply_all_constraints(self):
- if not self._dbh.table_exists('cwmassive_constraints'):
- self.logger.info('The table cwmassive_constraints does not exist')
- return
- cu = self.sql("SELECT query FROM cwmassive_constraints WHERE type='constraint'")
- for query, in cu.fetchall():
- self.sql(query)
- self.sql("DELETE FROM cwmassive_constraints WHERE type='constraint' AND query=%(q)s",
- {'q': query})
-
- def drop_and_store_indexes(self, tablename):
- """Drop indexes and constraints"""
- # Create a table to save the constraints, it allows reloading even after crash
- self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints'
- '(origtable text, query text, type varchar(256))')
- indexes = self._dbh.table_indexes(tablename)
- for name, query in indexes.items():
- self.sql('INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)',
- {'e': tablename, 'c': query, 't': 'index'})
- self.sql('DROP INDEX %s' % name)
-
- def reapply_constraint_index(self, tablename):
- if not self._dbh.table_exists('cwmassive_constraints'):
- self.logger.info('The table cwmassive_constraints does not exist')
- return
- cu = self.sql('SELECT query FROM cwmassive_constraints WHERE origtable = %(e)s',
- {'e': tablename})
- for query, in cu.fetchall():
- self.sql(query)
- self.sql('DELETE FROM cwmassive_constraints WHERE origtable = %(e)s '
- 'AND query = %(q)s', {'e': tablename, 'q': query})
+ self._dbh.drop_constraints(tablename)
def _drop_metatables_constraints(self):
""" Drop all the constraints for the meta data"""
for tablename in ('created_by_relation', 'owned_by_relation',
'is_instance_of_relation', 'is_relation',
'entities'):
- self.drop_and_store_indexes(tablename)
-
- def _create_metatables_constraints(self):
- """ Create all the constraints for the meta data"""
- for tablename in ('entities',
- 'created_by_relation', 'owned_by_relation',
- 'is_instance_of_relation', 'is_relation'):
- # Indexes and constraints
- self.reapply_constraint_index(tablename)
+ self._dbh.drop_indexes(tablename)
def restart_eid_sequence(self, start_eid):
self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
@@ -339,7 +292,7 @@
"""
if not self.slave_mode and etype not in self._initialized:
self._initialized.add(etype)
- self.drop_and_store_indexes('cw_%s' % etype.lower())
+ self._dbh.drop_indexes('cw_%s' % etype.lower())
self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
'(retype text, type varchar(128))')
self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
@@ -365,7 +318,7 @@
"""
if not self.slave_mode and rtype not in self._initialized:
self._initialized.add(rtype)
- self.drop_and_store_indexes('%s_relation' % rtype.lower())
+ self._dbh.drop_indexes('%s_relation' % rtype.lower())
self.sql('CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)'
% rtype.lower())
self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
@@ -396,24 +349,19 @@
# Remove relations tables
for rtype in self._uri_rtypes:
self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
- # Create meta constraints (entities, is_instance_of, ...)
- self._create_metatables_constraints()
# Get all the initialized etypes/rtypes
if self._dbh.table_exists('cwmassive_initialized'):
cu = self.sql('SELECT retype, type FROM cwmassive_initialized')
for retype, _type in cu.fetchall():
self.logger.info('Cleanup for %s' % retype)
- if _type == 'etype':
- # Cleanup entities tables - Recreate indexes
- self.reapply_constraint_index('cw_%s' % etype.lower())
- elif _type == 'rtype':
+ if _type == 'rtype':
# Cleanup relations tables
self._cleanup_relations(retype)
self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s',
{'e': retype})
- self._reapply_all_constraints()
+ self._dbh.restore_indexes_and_constraints()
# Delete the meta data table
- for table_name in ('cwmassive_initialized', 'cwmassive_constraints', 'cwmassive_metadata'):
+ for table_name in ('cwmassive_initialized', 'cwmassive_metadata'):
self.sql('DROP TABLE IF EXISTS %s' % table_name)
self.commit()
@@ -514,9 +462,6 @@
'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'r': rtype})
# Drop temporary relation table
self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
- # Create indexes and constraints
- tablename = '%s_relation' % rtype.lower()
- self.reapply_constraint_index(tablename)
def insert_massive_meta_data(self, etype):
""" Massive insertion of meta data for a given etype, based on SQL statements.
@@ -595,6 +540,35 @@
pg_schema = cnx.repo.config.system_source_config.get('db-namespace') or 'public'
self.pg_schema = pg_schema
+ def drop_indexes(self, tablename):
+ """Drop indexes and constraints, storing them in a table for later restore."""
+ # Create a table to save the constraints, it allows reloading even after crash
+ self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints(sql TEXT, insert_order SERIAL)')
+ indexes = self.table_indexes(tablename)
+ for name, query in indexes.items():
+ self.sql('INSERT INTO cwmassive_constraints(sql) VALUES (%(sql)s)', {'sql': query})
+ self.sql('DROP INDEX %s' % name)
+
+ def drop_constraints(self, tablename):
+ self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints(sql TEXT, insert_order SERIAL)')
+ constraints = self.table_constraints(tablename)
+ for name, query in constraints.items():
+ self.sql('INSERT INTO cwmassive_constraints(sql) VALUES (%(sql)s)', {'sql': query})
+ self.sql('ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name))
+
+ def restore_indexes_and_constraints(self):
+ """Restore indexes and constraints."""
+ if not self.table_exists('cwmassive_constraints'):
+ self.logger.info('The table cwmassive_constraints does not exist')
+ return
+ cu = self.sql('SELECT sql, insert_order FROM cwmassive_constraints '
+ 'ORDER BY insert_order DESC')
+ for query, order in cu.fetchall():
+ self.sql(query)
+ self.sql('DELETE FROM cwmassive_constraints WHERE insert_order=%(order)s',
+ {'order': order})
+ self.sql('DROP TABLE cwmassive_constraints')
+
def table_exists(self, tablename):
"""Return True if the given table already exists in the database."""
cu = self.sql('SELECT 1 from information_schema.tables '