diff -r fd45fc498c1b -r 3a83759854ee cubicweb/dataimport/massive_store.py --- a/cubicweb/dataimport/massive_store.py Thu Jan 28 14:14:57 2016 +0100 +++ b/cubicweb/dataimport/massive_store.py Thu Jan 28 14:02:31 2016 +0100 @@ -159,21 +159,19 @@ '(origtable text, query text, type varchar(256))') constraints = self._dbh.application_constraints(tablename) for name, query in constraints.items(): - sql = 'INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)' - self.sql(sql, {'e': tablename, 'c': query, 't': 'constraint'}) - sql = 'ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name) - self.sql(sql) + self.sql('INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)', + {'e': tablename, 'c': query, 't': 'constraint'}) + self.sql('ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name)) def reapply_all_constraints(self): if not self._dbh.table_exists('cwmassive_constraints'): self.logger.info('The table cwmassive_constraints does not exist') return - sql = 'SELECT query FROM cwmassive_constraints WHERE type = %(t)s' - crs = self.sql(sql, {'t': 'constraint'}) - for query, in crs.fetchall(): + cu = self.sql("SELECT query FROM cwmassive_constraints WHERE type='constraint'") + for query, in cu.fetchall(): self.sql(query) - self.sql('DELETE FROM cwmassive_constraints WHERE type = %(t)s ' - 'AND query = %(q)s', {'t': 'constraint', 'q': query}) + self.sql("DELETE FROM cwmassive_constraints WHERE type='constraint' AND query=%(q)s", + {'q': query}) def init_rtype_table(self, etype_from, rtype, etype_to): """ Build temporary table for standard rtype """ @@ -242,8 +240,8 @@ """ Fill the uri_eid table """ self.logger.info('Fill uri_eid for etype %s', etype) - sql = 'INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s' - self.sql(sql % {'l': uri_label, 'e': etype.lower()}) + self.sql('INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s' + % {'l': uri_label, 'e': etype.lower()}) # Add indexes self.sql('CREATE INDEX uri_eid_%(e)s_idx ON uri_eid_%(e)s' '(uri)' % {'e': etype.lower()}) # Set the etype as converted @@ -305,18 +303,17 @@ """ Drop and store table constraints and indexes """ indexes = self._dbh.application_indexes(tablename) for name, query in indexes.items(): - sql = 'INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)' - self.sql(sql, {'e': tablename, 'c': query, 't': 'index'}) - sql = 'DROP INDEX %s' % name - self.sql(sql) + self.sql('INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)', + {'e': tablename, 'c': query, 't': 'index'}) + self.sql('DROP INDEX %s' % name) def reapply_constraint_index(self, tablename): if not self._dbh.table_exists('cwmassive_constraints'): self.logger.info('The table cwmassive_constraints does not exist') return - sql = 'SELECT query FROM cwmassive_constraints WHERE origtable = %(e)s' - crs = self.sql(sql, {'e': tablename}) - for query, in crs.fetchall(): + cu = self.sql('SELECT query FROM cwmassive_constraints WHERE origtable = %(e)s', + {'e': tablename}) + for query, in cu.fetchall(): self.sql(query) self.sql('DELETE FROM cwmassive_constraints WHERE origtable = %(e)s ' 'AND query = %(q)s', {'e': tablename, 'q': query}) @@ -340,15 +337,15 @@ """ Get and remove all indexes for performance sake """ # Create temporary table if not self.slave_mode and rtype not in self._rtypes: - sql = "CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)" % rtype.lower() - self.sql(sql) + self.sql('CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)' + % rtype.lower()) # Drop indexes and constraints tablename = '%s_relation' % rtype.lower() self.drop_and_store_indexes(tablename) # Push the etype in the initialized table for easier restart self.init_create_initialized_table() - sql = 'INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)' - self.sql(sql, {'e': rtype, 't': 'rtype'}) + self.sql('INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)', + {'e': rtype, 't': 'rtype'}) # Mark rtype as "initialized" for faster check self._rtypes.add(rtype) @@ -369,8 +366,7 @@ self.drop_and_store_indexes(tablename) # Push the etype in the initialized table for easier restart self.init_create_initialized_table() - sql = 'INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)' - self.sql(sql, {'e': etype, 't': 'etype'}) + self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype}) # Mark etype as "initialized" for faster check self._entities.add(etype) @@ -458,8 +454,8 @@ self._create_metatables_constraints() # Get all the initialized etypes/rtypes if self._dbh.table_exists('cwmassive_initialized'): - crs = self.sql('SELECT retype, type FROM cwmassive_initialized') - for retype, _type in crs.fetchall(): + cu = self.sql('SELECT retype, type FROM cwmassive_initialized') + for retype, _type in cu.fetchall(): self.logger.info('Cleanup for %s' % retype) if _type == 'etype': # Cleanup entities tables - Recreate indexes @@ -551,18 +547,17 @@ return # Keep the correctly flush meta data in database self.sql('CREATE TABLE IF NOT EXISTS cwmassive_metadata (etype text)') - crs = self.sql('SELECT etype FROM cwmassive_metadata') - already_flushed = set(e for e, in crs.fetchall()) - crs = self.sql('SELECT retype FROM cwmassive_initialized WHERE type = %(t)s', - {'t': 'etype'}) - all_etypes = set(e for e, in crs.fetchall()) + cu = self.sql('SELECT etype FROM cwmassive_metadata') + already_flushed = set(e for e, in cu.fetchall()) + cu = self.sql('SELECT retype FROM cwmassive_initialized WHERE type = %(t)s', + {'t': 'etype'}) + all_etypes = set(e for e, in cu.fetchall()) for etype in all_etypes: if etype not in already_flushed: # Deals with meta data self.logger.info('Flushing meta data for %s' % etype) self.insert_massive_meta_data(etype) - sql = 'INSERT INTO cwmassive_metadata VALUES (%(e)s)' - self.sql(sql, {'e': etype}) + self.sql('INSERT INTO cwmassive_metadata VALUES (%(e)s)', {'e': etype}) def _cleanup_entities(self, etype): """ Cleanup etype table """ @@ -573,14 +568,12 @@ def _cleanup_relations(self, rtype): """ Cleanup rtype table """ # Push into relation table while removing duplicate - sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT - T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T - WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE - TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);''' % {'r': rtype} - self.sql(sql) + self.sql('INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT ' + 'T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T ' + 'WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE ' + 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'r': rtype}) # Drop temporary relation table - sql = ('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()}) - self.sql(sql) + self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()}) # Create indexes and constraints tablename = '%s_relation' % rtype.lower() self.reapply_constraint_index(tablename) @@ -596,17 +589,15 @@ self.metagen_push_relation(etype, self.source.eid, 'cw_source_relation') self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_relation') self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_instance_of_relation') - sql = ("INSERT INTO entities (eid, type, asource, extid) " - "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s " - "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" - % (etype, etype.lower())) - self.sql(sql) + self.sql("INSERT INTO entities (eid, type, asource, extid) " + "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s " + "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" + % (etype, etype.lower())) def metagen_push_relation(self, etype, eid_to, rtype): - sql = ("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s " - "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" - % (rtype, eid_to, etype.lower())) - self.sql(sql) + self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s " + "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" + % (rtype, eid_to, etype.lower())) ### CONSTRAINTS MANAGEMENT FUNCTIONS ########################################## @@ -672,21 +663,10 @@ return _indexes, constraints def table_exists(self, table_name): - sql = "SELECT * from information_schema.tables WHERE table_name=%(t)s AND table_schema=%(s)s" - crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema}) - res = crs.fetchall() - if res: - return True - return False - - # def check_if_primary_key_exists_for_table(self, table_name): - # sql = ("SELECT constraint_name FROM information_schema.table_constraints " - # "WHERE constraint_type = 'PRIMARY KEY' AND table_name=%(t)s AND table_schema=%(s)s") - # crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema}) - # res = crs.fetchall() - # if res: - # return True - # return False + cu = self.cnx.system_sql('SELECT 1 from information_schema.tables ' + 'WHERE table_name=%(t)s AND table_schema=%(s)s', + {'t': table_name, 's': self.pg_schema}) + return bool(cu.fetchone()) def index_query(self, name): """Get the request to be used to recreate the index"""