--- a/cubicweb/dataimport/massive_store.py Thu Jan 28 14:14:57 2016 +0100
+++ b/cubicweb/dataimport/massive_store.py Thu Jan 28 14:02:31 2016 +0100
@@ -159,21 +159,19 @@
'(origtable text, query text, type varchar(256))')
constraints = self._dbh.application_constraints(tablename)
for name, query in constraints.items():
- sql = 'INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)'
- self.sql(sql, {'e': tablename, 'c': query, 't': 'constraint'})
- sql = 'ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name)
- self.sql(sql)
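+            # Store the constraint definition for later replay, then drop it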
+ self.sql('INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)',
+ {'e': tablename, 'c': query, 't': 'constraint'})
+ self.sql('ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name))
def reapply_all_constraints(self):
if not self._dbh.table_exists('cwmassive_constraints'):
self.logger.info('The table cwmassive_constraints does not exist')
return
- sql = 'SELECT query FROM cwmassive_constraints WHERE type = %(t)s'
- crs = self.sql(sql, {'t': 'constraint'})
- for query, in crs.fetchall():
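+        # Replay each stored constraint, then discard its bookkeeping row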
+ cu = self.sql("SELECT query FROM cwmassive_constraints WHERE type='constraint'")
+ for query, in cu.fetchall():
self.sql(query)
- self.sql('DELETE FROM cwmassive_constraints WHERE type = %(t)s '
- 'AND query = %(q)s', {'t': 'constraint', 'q': query})
+ self.sql("DELETE FROM cwmassive_constraints WHERE type='constraint' AND query=%(q)s",
+ {'q': query})
def init_rtype_table(self, etype_from, rtype, etype_to):
""" Build temporary table for standard rtype """
@@ -242,8 +240,8 @@
""" Fill the uri_eid table
"""
self.logger.info('Fill uri_eid for etype %s', etype)
- sql = 'INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s'
- self.sql(sql % {'l': uri_label, 'e': etype.lower()})
+ self.sql('INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s'
+ % {'l': uri_label, 'e': etype.lower()})
# Add indexes
self.sql('CREATE INDEX uri_eid_%(e)s_idx ON uri_eid_%(e)s' '(uri)' % {'e': etype.lower()})
# Set the etype as converted
@@ -305,18 +303,17 @@
""" Drop and store table constraints and indexes """
indexes = self._dbh.application_indexes(tablename)
for name, query in indexes.items():
- sql = 'INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)'
- self.sql(sql, {'e': tablename, 'c': query, 't': 'index'})
- sql = 'DROP INDEX %s' % name
- self.sql(sql)
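+            # Store the index definition for later recreation, then drop it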
+ self.sql('INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)',
+ {'e': tablename, 'c': query, 't': 'index'})
+ self.sql('DROP INDEX %s' % name)
def reapply_constraint_index(self, tablename):
if not self._dbh.table_exists('cwmassive_constraints'):
self.logger.info('The table cwmassive_constraints does not exist')
return
- sql = 'SELECT query FROM cwmassive_constraints WHERE origtable = %(e)s'
- crs = self.sql(sql, {'e': tablename})
- for query, in crs.fetchall():
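+        # Replay every index/constraint recorded for this table, then discard its row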
+ cu = self.sql('SELECT query FROM cwmassive_constraints WHERE origtable = %(e)s',
+ {'e': tablename})
+ for query, in cu.fetchall():
self.sql(query)
self.sql('DELETE FROM cwmassive_constraints WHERE origtable = %(e)s '
'AND query = %(q)s', {'e': tablename, 'q': query})
@@ -340,15 +337,15 @@
""" Get and remove all indexes for performance sake """
# Create temporary table
if not self.slave_mode and rtype not in self._rtypes:
- sql = "CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)" % rtype.lower()
- self.sql(sql)
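+            # Staging table collecting relation rows until the final flush merges them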
+ self.sql('CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)'
+ % rtype.lower())
# Drop indexes and constraints
tablename = '%s_relation' % rtype.lower()
self.drop_and_store_indexes(tablename)
# Push the etype in the initialized table for easier restart
self.init_create_initialized_table()
- sql = 'INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)'
- self.sql(sql, {'e': rtype, 't': 'rtype'})
+ self.sql('INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)',
+ {'e': rtype, 't': 'rtype'})
# Mark rtype as "initialized" for faster check
self._rtypes.add(rtype)
@@ -369,8 +366,7 @@
self.drop_and_store_indexes(tablename)
# Push the etype in the initialized table for easier restart
self.init_create_initialized_table()
- sql = 'INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)'
- self.sql(sql, {'e': etype, 't': 'etype'})
+        self.sql('INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)',
+                 {'e': etype, 't': 'etype'})
# Mark etype as "initialized" for faster check
self._entities.add(etype)
@@ -458,8 +454,8 @@
self._create_metatables_constraints()
# Get all the initialized etypes/rtypes
if self._dbh.table_exists('cwmassive_initialized'):
- crs = self.sql('SELECT retype, type FROM cwmassive_initialized')
- for retype, _type in crs.fetchall():
+ cu = self.sql('SELECT retype, type FROM cwmassive_initialized')
+ for retype, _type in cu.fetchall():
self.logger.info('Cleanup for %s' % retype)
if _type == 'etype':
# Cleanup entities tables - Recreate indexes
@@ -551,18 +547,17 @@
return
# Keep the correctly flush meta data in database
self.sql('CREATE TABLE IF NOT EXISTS cwmassive_metadata (etype text)')
- crs = self.sql('SELECT etype FROM cwmassive_metadata')
- already_flushed = set(e for e, in crs.fetchall())
- crs = self.sql('SELECT retype FROM cwmassive_initialized WHERE type = %(t)s',
- {'t': 'etype'})
- all_etypes = set(e for e, in crs.fetchall())
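+        # Compare the already flushed etypes against all initialized ones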
+ cu = self.sql('SELECT etype FROM cwmassive_metadata')
+ already_flushed = set(e for e, in cu.fetchall())
+ cu = self.sql('SELECT retype FROM cwmassive_initialized WHERE type = %(t)s',
+ {'t': 'etype'})
+ all_etypes = set(e for e, in cu.fetchall())
for etype in all_etypes:
if etype not in already_flushed:
# Deals with meta data
self.logger.info('Flushing meta data for %s' % etype)
self.insert_massive_meta_data(etype)
- sql = 'INSERT INTO cwmassive_metadata VALUES (%(e)s)'
- self.sql(sql, {'e': etype})
+ self.sql('INSERT INTO cwmassive_metadata VALUES (%(e)s)', {'e': etype})
def _cleanup_entities(self, etype):
""" Cleanup etype table """
@@ -573,14 +568,12 @@
def _cleanup_relations(self, rtype):
""" Cleanup rtype table """
# Push into relation table while removing duplicate
- sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
- T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T
- WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE
- TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);''' % {'r': rtype}
- self.sql(sql)
+ self.sql('INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT '
+ 'T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T '
+ 'WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE '
+ 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'r': rtype})
# Drop temporary relation table
- sql = ('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
- self.sql(sql)
+ self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
# Create indexes and constraints
tablename = '%s_relation' % rtype.lower()
self.reapply_constraint_index(tablename)
@@ -596,17 +589,15 @@
self.metagen_push_relation(etype, self.source.eid, 'cw_source_relation')
self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_relation')
self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_instance_of_relation')
- sql = ("INSERT INTO entities (eid, type, asource, extid) "
- "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
- "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
- % (etype, etype.lower()))
- self.sql(sql)
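+        # Register entities not yet known to the 'entities' system table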
+ self.sql("INSERT INTO entities (eid, type, asource, extid) "
+ "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
+ "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
+ % (etype, etype.lower()))
def metagen_push_relation(self, etype, eid_to, rtype):
- sql = ("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
- "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
- % (rtype, eid_to, etype.lower()))
- self.sql(sql)
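+        # Link each not-yet-registered entity of this etype to the given eid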
+ self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
+ "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
+ % (rtype, eid_to, etype.lower()))
### CONSTRAINTS MANAGEMENT FUNCTIONS ##########################################
@@ -672,21 +663,10 @@
return _indexes, constraints
def table_exists(self, table_name):
- sql = "SELECT * from information_schema.tables WHERE table_name=%(t)s AND table_schema=%(s)s"
- crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})
- res = crs.fetchall()
- if res:
- return True
- return False
-
- # def check_if_primary_key_exists_for_table(self, table_name):
- # sql = ("SELECT constraint_name FROM information_schema.table_constraints "
- # "WHERE constraint_type = 'PRIMARY KEY' AND table_name=%(t)s AND table_schema=%(s)s")
- # crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})
- # res = crs.fetchall()
- # if res:
- # return True
- # return False
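+        # Fetching a single row is enough to know whether the table exists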
+        cu = self.cnx.system_sql('SELECT 1 FROM information_schema.tables '
+ 'WHERE table_name=%(t)s AND table_schema=%(s)s',
+ {'t': table_name, 's': self.pg_schema})
+ return bool(cu.fetchone())
def index_query(self, name):
"""Get the request to be used to recreate the index"""