action, view and service so managers can start source synchronization from the web ui
Closes #5474286
# coding: utf-8
# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.

import logging
from datetime import datetime
from collections import defaultdict
from io import StringIO

from six import text_type
from six.moves import range

from yams.constraints import SizeConstraint

from psycopg2 import ProgrammingError

from cubicweb.dataimport import stores, pgstore
from cubicweb.utils import make_uid
from cubicweb.server.sqlutils import SQL_PREFIX


class MassiveObjectStore(stores.RQLObjectStore):
    """Store for massive import of data, with delayed insertion of meta data.

    WARNINGS:

    - This store may only be used with PostgreSQL for now, as it relies
      on the COPY FROM method, and on specific PostgreSQL tables to get all
      the indexes.

    - This store can only insert relations that are not inlined (i.e.,
      which do *not* have inlined=True in their definition in the schema).

    It should be used as follows:

       store = MassiveObjectStore(cnx)
       store.init_rtype_table('Person', 'lives_in', 'Location')
       ...

       store.prepare_insert_entity('Person', subj_iid_attribute=person_iid, ...)
       store.prepare_insert_entity('Location', obj_iid_attribute=location_iid, ...)
       ...

       # subj_iid_attribute and obj_iid_attribute are argument names
       # chosen by the user (e.g. "cwuri"). These names can be identical.
       # person_iid and location_iid are unique IDs and depend on the data
       # (e.g. URI).
       store.flush()
       store.relate_by_iid(person_iid, 'lives_in', location_iid)
       # For example:
       store.prepare_insert_entity('Person',
                                   cwuri='http://dbpedia.org/toto',
                                   name='Toto')
       store.prepare_insert_entity('Location',
                                   uri='http://geonames.org/11111',
                                   name='Somewhere')
       store.flush()
       store.relate_by_iid('http://dbpedia.org/toto',
                           'lives_in',
                           'http://geonames.org/11111')
       # Finally
       store.convert_relations('Person', 'lives_in', 'Location',
                               'subj_iid_attribute', 'obj_iid_attribute')
       # For the previous example:
       store.convert_relations('Person', 'lives_in', 'Location', 'cwuri', 'uri')
       ...
       store.commit()
       store.finish()
    """
    # size of the eid range reserved by the store for each batch
    eids_seq_range = 10000
    # initial eid (None means use the value in the db)
    eids_seq_start = None
    # max size of the iid, used to create the iid_eid conversion table
    iid_maxsize = 1024

    def __init__(self, cnx, on_commit_callback=None, on_rollback_callback=None,
                 slave_mode=False, source=None):
        """ Create a MassiveObjectStore, with the following attributes:

        - cnx: CubicWeb connection
        """
        super(MassiveObjectStore, self).__init__(cnx)
        self.logger = logging.getLogger('dataimport.massive_store')
        self._cnx = cnx
        self.sql = cnx.system_sql
        self._data_uri_relations = defaultdict(list)
        # etypes for which we have a uri_eid_%(etype)s table
        self._init_uri_eid = set()
        # etypes for which we have a uri_eid_%(e)s_idx index
        self._uri_eid_inserted = set()
        # set of rtypes for which we have a %(rtype)s_relation_iid_tmp table
        self._uri_rtypes = set()
        # set of etypes whose tables are created
        self._entities = set()
        # set of rtypes for which we have a %(rtype)s_relation_tmp table
        self._rtypes = set()
        self.slave_mode = slave_mode
        self.default_values = get_default_values(cnx.vreg.schema)
        pg_schema = cnx.repo.config.system_source_config.get('db-namespace') or 'public'
        self._dbh = PGHelper(self._cnx, pg_schema)
        self._data_entities = defaultdict(list)
        self._data_relations = defaultdict(list)
        self._now = datetime.now()
        self._default_cwuri = make_uid('_auto_generated')
        self._count_cwuri = 0
        self.on_commit_callback = on_commit_callback
        self.on_rollback_callback = on_rollback_callback
        # Do our meta tables already exist?
        self._init_massive_metatables()
        # Internal markers of initialization
        if self.eids_seq_start is not None and not self.slave_mode:
            self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
                'entities_id_seq', initial_value=self.eids_seq_start + 1))
            cnx.commit()
        self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
        # dropped constraints are recreated when self.finish() is called
        if not self.slave_mode:
            self._drop_metatables_constraints()
        if source is None:
            source = cnx.repo.system_source
        self.source = source
        self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN'))
        cnx.read_security = False
        cnx.write_security = False

    ### INIT FUNCTIONS ########################################################

    def init_rtype_table(self, etype_from, rtype, etype_to):
        """ Build temporary table for standard rtype """
        # Create a uri_eid table for each etype for better control
        # of which etype is concerned by a particular,
        # possibly multivalued, relation.
        for etype in (etype_from, etype_to):
            if etype and etype not in self._init_uri_eid:
                self._init_uri_eid_table(etype)
        if rtype not in self._uri_rtypes:
            # Create the temporary table
            if not self._cnx.repo.schema.rschema(rtype).inlined:
                try:
                    sql = 'CREATE TABLE %(r)s_relation_iid_tmp (uri_from character ' \
                          'varying(%(s)s), uri_to character varying(%(s)s))'
                    self.sql(sql % {'r': rtype, 's': self.iid_maxsize})
                except ProgrammingError:
                    # XXX Already exists (probably due to multiple import)
                    pass
            else:
                self.logger.warning("inlined relation %s: cannot insert it", rtype)
            # Add it to the initialized set
            self._uri_rtypes.add(rtype)

    def _init_uri_eid_table(self, etype):
        """ Build a temporary table for iid/eid conversion """
        try:
            sql = "CREATE TABLE uri_eid_%(e)s (uri character varying(%(size)s), eid integer)"
            self.sql(sql % {'e': etype.lower(), 'size': self.iid_maxsize})
        except ProgrammingError:
            # XXX Already exists (probably due to multiple import)
            pass
        # Add it to the initialized set
        self._init_uri_eid.add(etype)

    def _init_massive_metatables(self):
        # Check if our tables are not already created (i.e. a restart)
        self._initialized_table_created = self._dbh.table_exists('cwmassive_initialized')
        self._constraint_table_created = self._dbh.table_exists('cwmassive_constraints')
        self._metadata_table_created = self._dbh.table_exists('cwmassive_metadata')

    ### RELATE FUNCTION #######################################################

    def relate_by_iid(self, iid_from, rtype, iid_to):
        """Add a new relation based on the internal ids (iid)
        of the entities (not their eid)"""
        # Push data
        if isinstance(iid_from, text_type):
            iid_from = iid_from.encode('utf-8')
        if isinstance(iid_to, text_type):
            iid_to = iid_to.encode('utf-8')
        self._data_uri_relations[rtype].append({'uri_from': iid_from, 'uri_to': iid_to})

    ### FLUSH FUNCTIONS #######################################################

    def flush_relations(self):
        """ Flush the relations data """
        for rtype, data in self._data_uri_relations.items():
            if not data:
                self.logger.info('No data for rtype %s', rtype)
                continue
            buf = StringIO('\n'.join('%(uri_from)s\t%(uri_to)s' % d for d in data))
            if not buf:
                self.logger.info('Empty buffer for rtype %s', rtype)
                continue
            cursor = self._cnx.cnxset.cu
            if not self._cnx.repo.schema.rschema(rtype).inlined:
                cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(),
                                 null='NULL', columns=('uri_from', 'uri_to'))
            else:
                self.logger.warning("inlined relation %s: cannot insert it", rtype)
            buf.close()
            # Clear data cache
            self._data_uri_relations[rtype] = []

    def fill_uri_eid_table(self, etype, uri_label):
        """ Fill the uri_eid table """
        self.logger.info('Fill uri_eid for etype %s', etype)
        sql = 'INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s'
        self.sql(sql % {'l': uri_label, 'e': etype.lower()})
        # Add indexes
        self.sql('CREATE INDEX uri_eid_%(e)s_idx ON uri_eid_%(e)s(uri)'
                 % {'e': etype.lower()})
        # Set the etype as converted
        self._uri_eid_inserted.add(etype)

    def convert_relations(self, etype_from, rtype, etype_to,
                          uri_label_from='cwuri', uri_label_to='cwuri'):
        """ Flush the converted relations """
        # Always flush relations to be sure
        self.logger.info('Convert relations %s %s %s', etype_from, rtype, etype_to)
        self.flush_relations()
        if uri_label_from and etype_from not in self._uri_eid_inserted:
            self.fill_uri_eid_table(etype_from, uri_label_from)
        if uri_label_to and etype_to not in self._uri_eid_inserted:
            self.fill_uri_eid_table(etype_to, uri_label_to)
        if self._cnx.repo.schema.rschema(rtype).inlined:
            self.logger.warning("Can't insert inlined relation %s", rtype)
            return
        if uri_label_from and uri_label_to:
            sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to)
                     SELECT DISTINCT O1.eid, O2.eid
                     FROM %(r)s_relation_iid_tmp AS T,
                          uri_eid_%(ef)s AS O1, uri_eid_%(et)s AS O2
                     WHERE O1.uri=T.uri_from AND O2.uri=T.uri_to AND NOT EXISTS (
                         SELECT 1 FROM %(r)s_relation AS TT
                         WHERE TT.eid_from=O1.eid AND TT.eid_to=O2.eid);
                  '''
        elif uri_label_to:
            sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to)
                     SELECT DISTINCT CAST(T.uri_from AS INTEGER), O1.eid
                     FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(et)s AS O1
                     WHERE O1.uri=T.uri_to AND NOT EXISTS (
                         SELECT 1 FROM %(r)s_relation AS TT
                         WHERE TT.eid_from=CAST(T.uri_from AS INTEGER)
                           AND TT.eid_to=O1.eid);
                  '''
        elif uri_label_from:
            sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to)
                     SELECT DISTINCT O1.eid, CAST(T.uri_to AS INTEGER)
                     FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s AS O1
                     WHERE O1.uri=T.uri_from AND NOT EXISTS (
                         SELECT 1 FROM %(r)s_relation AS TT
                         WHERE TT.eid_from=O1.eid
                           AND TT.eid_to=CAST(T.uri_to AS INTEGER));
                  '''
        try:
            self.sql(sql % {'r': rtype.lower(),
                            'et': etype_to.lower() if etype_to else u'',
                            'ef': etype_from.lower() if etype_from else u''})
        except Exception as ex:
            self.logger.error("Can't insert relation %s: %s", rtype, ex)
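    # For reference, a sketch (not executed anywhere) of the SQL shape the
    # iid-based workflow above produces, using the 'lives_in' example from the
    # class docstring; the table name patterns are the ones created by this
    # class, the etype names are illustrative:
    #
    #   CREATE TABLE lives_in_relation_iid_tmp (uri_from character varying(1024),
    #                                           uri_to character varying(1024));
    #   CREATE TABLE uri_eid_person (uri character varying(1024), eid integer);
    #   CREATE TABLE uri_eid_location (uri character varying(1024), eid integer);
    #   -- convert_relations() then resolves iids to eids with a join:
    #   INSERT INTO lives_in_relation (eid_from, eid_to)
    #   SELECT DISTINCT O1.eid, O2.eid
    #   FROM lives_in_relation_iid_tmp AS T,
    #        uri_eid_person AS O1, uri_eid_location AS O2
    #   WHERE O1.uri = T.uri_from AND O2.uri = T.uri_to;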
    ### SQL UTILITIES #########################################################

    def drop_and_store_indexes_constraints(self, tablename):
        # Drop indexes and constraints
        if not self._constraint_table_created:
            # Create a table to save the constraints,
            # allowing reload even after a crash
            sql = ("CREATE TABLE cwmassive_constraints "
                   "(origtable text, query text, type varchar(256))")
            self.sql(sql)
            self._constraint_table_created = True
        self._drop_table_constraints_indexes(tablename)

    def _drop_table_constraints_indexes(self, tablename):
        """ Drop and store table constraints and indexes """
        indexes, constraints = self._dbh.application_indexes_constraints(tablename)
        for name, query in constraints.items():
            sql = 'INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)'
            self.sql(sql, {'e': tablename, 'c': query, 't': 'constraint'})
            sql = 'ALTER TABLE %s DROP CONSTRAINT %s CASCADE' % (tablename, name)
            self.sql(sql)
        for name, query in indexes.items():
            sql = 'INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)'
            self.sql(sql, {'e': tablename, 'c': query, 't': 'index'})
            sql = 'DROP INDEX %s' % name
            self.sql(sql)

    def reapply_constraint_index(self, tablename):
        if not self._dbh.table_exists('cwmassive_constraints'):
            self.logger.info('The table cwmassive_constraints does not exist')
            return
        sql = 'SELECT query FROM cwmassive_constraints WHERE origtable = %(e)s'
        crs = self.sql(sql, {'e': tablename})
        for query, in crs.fetchall():
            self.sql(query)
            self.sql('DELETE FROM cwmassive_constraints WHERE origtable = %(e)s '
                     'AND query = %(q)s', {'e': tablename, 'q': query})

    def _drop_metatables_constraints(self):
        """ Drop all the constraints for the meta data """
        for tablename in ('created_by_relation', 'owned_by_relation',
                          'is_instance_of_relation', 'is_relation', 'entities'):
            self.drop_and_store_indexes_constraints(tablename)

    def _create_metatables_constraints(self):
        """ Create all the constraints for the meta data """
        for tablename in ('entities', 'created_by_relation', 'owned_by_relation',
                          'is_instance_of_relation', 'is_relation'):
            # Indexes and constraints
            self.reapply_constraint_index(tablename)

    def init_relation_table(self, rtype):
        """ Get and remove all indexes for performance sake """
        # Create the temporary table
        if not self.slave_mode and rtype not in self._rtypes:
            sql = ("CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)"
                   % rtype.lower())
            self.sql(sql)
            # Drop indexes and constraints
            tablename = '%s_relation' % rtype.lower()
            self.drop_and_store_indexes_constraints(tablename)
            # Push the rtype in the initialized table for easier restart
            self.init_create_initialized_table()
            sql = 'INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)'
            self.sql(sql, {'e': rtype, 't': 'rtype'})
            # Mark the rtype as "initialized" for faster checks
            self._rtypes.add(rtype)

    def init_create_initialized_table(self):
        """ Create the cwmassive initialized table """
        if not self._initialized_table_created:
            sql = "CREATE TABLE cwmassive_initialized (retype text, type varchar(128))"
            self.sql(sql)
            self._initialized_table_created = True

    def init_etype_table(self, etype):
        """ Add an eid sequence to a particular etype table and
        remove all its indexes for performance sake """
        if etype not in self._entities:
            # Only for a non-initialized etype and not in slave mode
            if not self.slave_mode:
                if self.eids_seq_range is None:
                    # Eids are directly set by the entities_id_seq.
                    # We attach this sequence to all the created etypes.
                    sql = ("ALTER TABLE cw_%s ALTER COLUMN cw_eid "
                           "SET DEFAULT nextval('entities_id_seq')" % etype.lower())
                    self.sql(sql)
                # Drop indexes and constraints
                tablename = 'cw_%s' % etype.lower()
                self.drop_and_store_indexes_constraints(tablename)
                # Push the etype in the initialized table for easier restart
                self.init_create_initialized_table()
                sql = 'INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)'
                self.sql(sql, {'e': etype, 't': 'etype'})
            # Mark the etype as "initialized" for faster checks
            self._entities.add(etype)

    ### ENTITIES CREATION #####################################################

    def _get_eid_gen(self):
        """ Function getting the next eid. This is done by preselecting
        a given number of eids from the 'entities_id_seq' sequence, and then
        storing them """
        while True:
            last_eid = self._cnx.repo.system_source.create_eid(self._cnx,
                                                               self.eids_seq_range)
            for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1):
                yield eid

    def _apply_default_values(self, etype, kwargs):
        """Apply the default values for a given etype, attribute and value."""
        default_values = self.default_values[etype]
        missing_keys = set(default_values) - set(kwargs)
        kwargs.update((key, default_values[key]) for key in missing_keys)

    # store api ###############################################################

    def prepare_insert_entity(self, etype, **kwargs):
        """Given an entity type, attributes and inlined relations, return the
        inserted entity's eid.
        """
        # Init the table if necessary
        self.init_etype_table(etype)
        # Add meta data if not given
        if 'modification_date' not in kwargs:
            kwargs['modification_date'] = self._now
        if 'creation_date' not in kwargs:
            kwargs['creation_date'] = self._now
        if 'cwuri' not in kwargs:
            kwargs['cwuri'] = self._default_cwuri + str(self._count_cwuri)
            self._count_cwuri += 1
        if 'eid' not in kwargs and self.eids_seq_range is not None:
            # If the eid is not given and the eids sequence is set,
            # use the value from the sequence
            kwargs['eid'] = self.get_next_eid()
        self._apply_default_values(etype, kwargs)
        self._data_entities[etype].append(kwargs)
        return kwargs.get('eid')

    def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
        """Insert into the database a relation ``rtype`` between entities with
        eids ``eid_from`` and ``eid_to``.
        """
        # Init the table if necessary
        self.init_relation_table(rtype)
        self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})

    def flush(self):
        """Flush the data"""
        self.flush_entities()
        self.flush_internal_relations()
        self.flush_relations()

    def commit(self):
        """Commit the database transaction."""
        self.on_commit()
        super(MassiveObjectStore, self).commit()

    def finish(self):
        """Remove temporary tables and columns."""
        if self.slave_mode:
            raise RuntimeError('Store cleanup is not allowed in slave mode')
        self.logger.info("Start cleaning")
        # Cleanup the uri_eid tables
        for etype in self._init_uri_eid:
            self.sql('DROP TABLE uri_eid_%s' % etype.lower())
        # Remove the temporary relation tables
        for rtype in self._uri_rtypes:
            if not self._cnx.repo.schema.rschema(rtype).inlined:
                self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
            else:
                self.logger.warning("inlined relation %s: no cleanup to be done for it",
                                    rtype)
        # Get all the initialized etypes/rtypes
        if self._dbh.table_exists('cwmassive_initialized'):
            crs = self.sql('SELECT retype, type FROM cwmassive_initialized')
            for retype, _type in crs.fetchall():
                self.logger.info('Cleanup for %s', retype)
                if _type == 'etype':
                    # Cleanup the entity tables - recreate the indexes
                    self._cleanup_entities(retype)
                elif _type == 'rtype':
                    # Cleanup the relation tables
                    self._cleanup_relations(retype)
                self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s',
                         {'e': retype})
        # Create the meta constraints (entities, is_instance_of, ...)
        self._create_metatables_constraints()
        # Delete the meta data tables
        for table_name in ('cwmassive_initialized', 'cwmassive_constraints',
                           'cwmassive_metadata'):
            if self._dbh.table_exists(table_name):
                self.sql('DROP TABLE %s' % table_name)
        self.commit()
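    # A minimal end-to-end sketch of the store API above (``cnx`` is assumed
    # to be an open CubicWeb connection; the 'Person'/'Location'/'lives_in'
    # names come from the class docstring example and are illustrative):
    #
    #   store = MassiveObjectStore(cnx)
    #   toto = store.prepare_insert_entity('Person', name=u'Toto')
    #   somewhere = store.prepare_insert_entity('Location', name=u'Somewhere')
    #   store.prepare_insert_relation(toto, 'lives_in', somewhere)
    #   store.flush()
    #   store.commit()
    #   store.finish()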
    ### FLUSH #################################################################

    def on_commit(self):
        if self.on_commit_callback:
            self.on_commit_callback()

    def on_rollback(self, exc, etype, data):
        if self.on_rollback_callback:
            self.on_rollback_callback(exc, etype, data)
            self._cnx.rollback()
        else:
            raise exc

    def flush_internal_relations(self):
        """ Flush the relations data """
        for rtype, data in self._data_relations.items():
            if not data:
                # There is no data for this rtype for this flush round.
                continue
            buf = pgstore._create_copyfrom_buffer(data, ('eid_from', 'eid_to'))
            if not buf:
                # The buffer is empty. This is probably due to an error in
                # _create_copyfrom_buffer.
                raise ValueError('Error in buffer creation for rtype %s' % rtype)
            cursor = self._cnx.cnxset.cu
            # Push into the tmp table
            cursor.copy_from(buf, '%s_relation_tmp' % rtype.lower(),
                             null='NULL', columns=('eid_from', 'eid_to'))
            # Clear data cache
            self._data_relations[rtype] = []

    def flush_entities(self):
        """ Flush the entities data """
        for etype, data in self._data_entities.items():
            if not data:
                # There is no data for this etype for this flush round.
                continue
            # XXX It may be interesting to directly infer the column names
            # from the schema instead of using .keys().
            # XXX For now, _create_copyfrom_buffer does a "row[column]"
            # which can lead to a KeyError, so we create dictionaries
            # holding all the keys.
            columns = set()
            for d in data:
                columns.update(d.keys())
            _data = []
            _base_data = dict.fromkeys(columns)
            for d in data:
                _d = _base_data.copy()
                _d.update(d)
                _data.append(_d)
            buf = pgstore._create_copyfrom_buffer(_data, columns)
            if not buf:
                # The buffer is empty. This is probably due to an error in
                # _create_copyfrom_buffer.
                raise ValueError('Error in buffer creation for etype %s' % etype)
            columns = ['cw_%s' % attr for attr in columns]
            cursor = self._cnx.cnxset.cu
            try:
                cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL',
                                 columns=columns)
            except Exception as exc:
                self.on_rollback(exc, etype, data)
            # Clear data cache
            self._data_entities[etype] = []
        self.flush_meta_data()

    def flush_meta_data(self):
        """ Flush the meta data (entities table, is_instance table, ...) """
        if self.slave_mode:
            raise RuntimeError('Flushing meta data is not allowed in slave mode')
        if not self._dbh.table_exists('cwmassive_initialized'):
            self.logger.info('No information available for initialized etypes/rtypes')
            return
        if not self._metadata_table_created:
            # Keep the correctly flushed meta data in database
            sql = "CREATE TABLE cwmassive_metadata (etype text)"
            self.sql(sql)
            self._metadata_table_created = True
        crs = self.sql('SELECT etype FROM cwmassive_metadata')
        already_flushed = set(e for e, in crs.fetchall())
        crs = self.sql('SELECT retype FROM cwmassive_initialized WHERE type = %(t)s',
                       {'t': 'etype'})
        all_etypes = set(e for e, in crs.fetchall())
        for etype in all_etypes:
            if etype not in already_flushed:
                # Deals with meta data
                self.logger.info('Flushing meta data for %s', etype)
                self.insert_massive_meta_data(etype)
                sql = 'INSERT INTO cwmassive_metadata VALUES (%(e)s)'
                self.sql(sql, {'e': etype})

    def _cleanup_entities(self, etype):
        """ Cleanup an etype table """
        if self.eids_seq_range is None:
            # Remove the DEFAULT eids sequence if added
            sql = 'ALTER TABLE cw_%s ALTER COLUMN cw_eid DROP DEFAULT;' % etype.lower()
            self.sql(sql)
        # Create indexes and constraints
        tablename = SQL_PREFIX + etype.lower()
        self.reapply_constraint_index(tablename)

    def _cleanup_relations(self, rtype):
        """ Cleanup an rtype table """
        # Push into the relation table while removing duplicates
        sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to)
                 SELECT DISTINCT T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T
                 WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE
                 TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);''' % {'r': rtype}
        self.sql(sql)
        # Drop the temporary relation table
        sql = 'DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()}
        self.sql(sql)
        # Create indexes and constraints
        tablename = '%s_relation' % rtype.lower()
        self.reapply_constraint_index(tablename)

    def insert_massive_meta_data(self, etype):
        """ Massive insertion of meta data for a given etype, based on SQL
        statements. """
        # Push data - use coalesce to avoid NULL (and get 0) if there is no
        # entity of this type in the entities table.
        # Meta data relations
        self.metagen_push_relation(etype, self._cnx.user.eid, 'created_by_relation')
        self.metagen_push_relation(etype, self._cnx.user.eid, 'owned_by_relation')
        self.metagen_push_relation(etype, self.source.eid, 'cw_source_relation')
        self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_relation')
        self.metagen_push_relation(etype, self._etype_eid_idx[etype],
                                   'is_instance_of_relation')
        sql = ("INSERT INTO entities (eid, type, asource, extid) "
               "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
               "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
               % (etype, etype.lower()))
        self.sql(sql)

    def metagen_push_relation(self, etype, eid_to, rtype):
        sql = ("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
               "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
               % (rtype, eid_to, etype.lower()))
        self.sql(sql)


### CONSTRAINTS MANAGEMENT FUNCTIONS ##########################################

def get_size_constraints(schema):
    """Analyzes the yams ``schema`` and returns its size constraints.

    The returned value is a dictionary mapping entity types to a
    sub-dictionary mapping attribute names -> max size.
    """
"""size_constraints={}# iterates on all entity typesforeschemainschema.entities():# for each entity type, iterates on attribute definitionssize_constraints[eschema.type]=eschema_constraints={}forrschema,aschemaineschema.attribute_definitions():# for each attribute, if a size constraint is found,# append it to the size constraint listmaxsize=Nonerdef=rschema.rdef(eschema,aschema)forconstraintinrdef.constraints:ifisinstance(constraint,SizeConstraint):maxsize=constraint.maxeschema_constraints[rschema.type]=maxsizereturnsize_constraintsdefget_default_values(schema):"""analyzes yams ``schema`` and returns the list of default values. The returned value is a dictionary mapping entity types to a sub-dictionnaries mapping attribute names -> default values. """default_values={}# iterates on all entity typesforeschemainschema.entities():# for each entity type, iterates on attribute definitionsdefault_values[eschema.type]=eschema_constraints={}forrschema,_ineschema.attribute_definitions():# for each attribute, if a size constraint is found,# append it to the size constraint listifeschema.default(rschema.type)isnotNone:eschema_constraints[rschema.type]=eschema.default(rschema.type)returndefault_valuesclassPGHelper(object):def__init__(self,cnx,pg_schema='public'):self.cnx=cnx# Deals with pg schema, see #3216686self.pg_schema=pg_schemadefapplication_indexes_constraints(self,tablename):""" Get all the indexes/constraints for a given tablename """indexes=self.application_indexes(tablename)constraints=self.application_constraints(tablename)_indexes={}forname,queryinindexes.items():# Remove pkey indexes (automatically created by constraints)# Specific cases of primary key, see #3224079ifnamenotinconstraints:_indexes[name]=queryreturn_indexes,constraintsdeftable_exists(self,table_name):sql="SELECT * from information_schema.tables WHERE table_name=%(t)s AND table_schema=%(s)s"crs=self.cnx.system_sql(sql,{'t':table_name,'s':self.pg_schema})res=crs.fetchall()ifres:returnTruereturnFalse# def check_if_primary_key_exists_for_table(self, table_name):# sql = ("SELECT constraint_name FROM information_schema.table_constraints "# "WHERE constraint_type = 'PRIMARY KEY' AND table_name=%(t)s AND table_schema=%(s)s")# crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})# res = crs.fetchall()# if res:# return True# return Falsedefindex_query(self,name):"""Get the request to be used to recreate the index"""returnself.cnx.system_sql("SELECT pg_get_indexdef(c.oid) ""from pg_catalog.pg_class c ""LEFT JOIN pg_catalog.pg_namespace n ""ON n.oid = c.relnamespace ""WHERE c.relname = %(r)s AND n.nspname=%(n)s",{'r':name,'n':self.pg_schema}).fetchone()[0]defconstraint_query(self,name):"""Get the request to be used to recreate the constraint"""returnself.cnx.system_sql("SELECT pg_get_constraintdef(c.oid) ""from pg_catalog.pg_constraint c ""LEFT JOIN pg_catalog.pg_namespace n ""ON n.oid = c.connamespace ""WHERE c.conname = %(r)s AND n.nspname=%(n)s",{'r':name,'n':self.pg_schema}).fetchone()[0]defapplication_indexes(self,tablename):""" Iterate over all the indexes """# This SQL query (cf http://www.postgresql.org/message-id/432F450F.4080700@squiz.net)# aims at getting all the indexes for each table.sql='''SELECT c.relname as "Name" FROM pg_catalog.pg_class c JOIN pg_catalog.pg_index i ON i.indexrelid = c.oid JOIN pg_catalog.pg_class c2 ON i.indrelid = c2.oid LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind IN ('i','') AND c2.relname = 
'%s' AND n.nspname NOT IN ('pg_catalog', 'pg_toast') AND pg_catalog.pg_table_is_visible(c.oid);'''%tablenameindexes_list=self.cnx.system_sql(sql).fetchall()indexes={}forname,inindexes_list:indexes[name]=self.index_query(name)returnindexesdefapplication_constraints(self,tablename):""" Iterate over all the constraints """sql='''SELECT i.conname as "Name" FROM pg_catalog.pg_class c JOIN pg_catalog.pg_constraint i ON i.conrelid = c.oid JOIN pg_catalog.pg_class c2 ON i.conrelid=c2.oid LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace WHERE c2.relname = '%s' AND n.nspname NOT IN ('pg_catalog', 'pg_toast') AND pg_catalog.pg_table_is_visible(c.oid);'''%tablenameindexes_list=self.cnx.system_sql(sql).fetchall()constraints={}forname,inindexes_list:query=self.constraint_query(name)constraints[name]='ALTER TABLE %s ADD CONSTRAINT %s%s'%(tablename,name,query)returnconstraints
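# A short sketch of how PGHelper is meant to be used (``cnx`` is assumed to be
# an open CubicWeb connection; 'cw_person' is an illustrative table name):
#
#   helper = PGHelper(cnx, pg_schema='public')
#   if helper.table_exists('cw_person'):
#       indexes, constraints = helper.application_indexes_constraints('cw_person')
#       # each value is the SQL statement needed to recreate that index or
#       # constraint after a massive import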