[pkg] Set version to 3.22.2.dev0
So that cubes used in test dependencies do not install a released CubicWeb.
# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""Adapters for native cubicweb sources.

Notes:
* extids (aka external ids, the primary key of an entity in the external
  source it comes from) are stored in a varchar column encoded as a base64
  string. This is because it should actually be Bytes but we want an index on
  it for fast querying.
"""
from __future__ import print_function

__docformat__ = "restructuredtext en"

from threading import Lock
from datetime import datetime
from base64 import b64encode
from contextlib import contextmanager
from os.path import basename
import re
import itertools
import zipfile
import logging
import sys

from six import PY2, text_type, binary_type, string_types
from six.moves import range, cPickle as pickle

from logilab.common.decorators import cached, clear_cache
from logilab.common.configuration import Method
from logilab.common.shellutils import getlogin, ASK
from logilab.database import get_db_helper, sqlgen

from yams.schema import role_name

from cubicweb import (UnknownEid, AuthenticationError, ValidationError, Binary,
                      UniqueTogetherError, UndoTransactionException, ViolatedConstraint)
from cubicweb import transaction as tx, server, neg_role
from cubicweb.utils import QueryCache
from cubicweb.schema import VIRTUAL_RTYPES
from cubicweb.cwconfig import CubicWebNoAppConfiguration
from cubicweb.server import hook
from cubicweb.server import schema2sql as y2sql
from cubicweb.server.utils import crypt_password, verify_and_update
from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn
from cubicweb.server.rqlannotation import set_qdata
from cubicweb.server.hook import CleanupDeletedEidsCacheOp
from cubicweb.server.edition import EditedEntity
from cubicweb.server.sources import AbstractSource, dbg_st_search, dbg_results
from cubicweb.server.sources.rql2sql import SQLGenerator
from cubicweb.statsd_logger import statsd_timeit


ATTR_MAP = {}
NONSYSTEM_ETYPES = set()
NONSYSTEM_RELATIONS = set()
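
# Illustrative sketch (not part of the original module): the round-trip used
# for extid storage, as described in the module docstring above. The raw
# bytes of an external id are base64-encoded before being stored in the
# varchar `extid` column, and decoded again when read back. The extid value
# below is hypothetical:
#
#     >>> from base64 import b64encode, b64decode
#     >>> extid = b'ldap://host/uid=jdoe'
#     >>> stored = b64encode(extid).decode('ascii')
#     >>> b64decode(stored) == extid
#     True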


class LogCursor(object):
    def __init__(self, cursor):
        self.cu = cursor

    def execute(self, query, args=None):
        """Execute a query.

        it's a function just so that it shows up in profiling
        """
        if server.DEBUG & server.DBG_SQL:
            print('exec', query, args)
        try:
            self.cu.execute(str(query), args)
        except Exception as ex:
            print("sql: %r\n args: %s\ndbms message: %r" % (
                query, args, ex.args[0]))
            raise

    def fetchall(self):
        return self.cu.fetchall()

    def fetchone(self):
        return self.cu.fetchone()


def sql_or_clauses(sql, clauses):
    select, restr = sql.split(' WHERE ', 1)
    restrclauses = restr.split(' AND ')
    for clause in clauses:
        restrclauses.remove(clause)
    if restrclauses:
        restr = '%s AND (%s)' % (' AND '.join(restrclauses),
                                 ' OR '.join(clauses))
    else:
        restr = '(%s)' % ' OR '.join(clauses)
    return '%s WHERE %s' % (select, restr)
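
# Illustrative sketch (not part of the original module): sql_or_clauses()
# extracts the given clauses from a query's AND-joined WHERE restriction and
# re-attaches them OR-joined, leaving the other restrictions ANDed:
#
#     >>> sql_or_clauses('SELECT 1 FROM t WHERE a = 1 AND b = 2 AND c = 3',
#     ...                ['b = 2', 'c = 3'])
#     'SELECT 1 FROM t WHERE a = 1 AND (b = 2 OR c = 3)'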


def rdef_table_column(rdef):
    """return table and column used to store the given relation definition in
    the database
    """
    return (SQL_PREFIX + str(rdef.subject),
            SQL_PREFIX + str(rdef.rtype))


def rdef_physical_info(dbhelper, rdef):
    """return backend type and a boolean flag if NULL values should be allowed
    for a given relation definition
    """
    if not rdef.object.final:
        return dbhelper.TYPE_MAPPING['Int']
    coltype = y2sql.type_from_rdef(dbhelper, rdef, creating=False)
    allownull = rdef.cardinality[0] != '1'
    return coltype, allownull


class _UndoException(Exception):
    """something went wrong during undoing"""

    def __unicode__(self):
        """Called by the unicode builtin; should return a Unicode object

        Type of _UndoException message must be `unicode` by design in CubicWeb.
        """
        assert isinstance(self.args[0], text_type)
        return self.args[0]


def _undo_check_relation_target(tentity, rdef, role):
    """check linked entity has not been redirected for this relation"""
    card = rdef.role_cardinality(role)
    if card in '?1' and tentity.related(rdef.rtype, role):
        raise _UndoException(tentity._cw._(
            "Can't restore %(role)s relation %(rtype)s to entity %(eid)s which "
            "is already linked using this relation.")
            % {'role': neg_role(role),
               'rtype': rdef.rtype,
               'eid': tentity.eid})


def _undo_rel_info(cnx, subj, rtype, obj):
    entities = []
    for role, eid in (('subject', subj), ('object', obj)):
        try:
            entities.append(cnx.entity_from_eid(eid))
        except UnknownEid:
            raise _UndoException(cnx._(
                "Can't restore relation %(rtype)s, %(role)s entity %(eid)s"
                " doesn't exist anymore.")
                % {'role': cnx._(role),
                   'rtype': cnx._(rtype),
                   'eid': eid})
    sentity, oentity = entities
    try:
        rschema = cnx.vreg.schema.rschema(rtype)
        rdef = rschema.rdefs[(sentity.cw_etype, oentity.cw_etype)]
    except KeyError:
        raise _UndoException(cnx._(
            "Can't restore relation %(rtype)s between %(subj)s and "
            "%(obj)s, that relation does not exist anymore in the "
            "schema.")
            % {'rtype': cnx._(rtype),
               'subj': subj,
               'obj': obj})
    return sentity, oentity, rdef


def _undo_has_later_transaction(cnx, eid):
    return cnx.system_sql('''\
SELECT T.tx_uuid FROM transactions AS TREF, transactions AS T
WHERE TREF.tx_uuid='%(txuuid)s' AND T.tx_uuid!='%(txuuid)s'
AND T.tx_time>=TREF.tx_time
AND (EXISTS(SELECT 1 FROM tx_entity_actions AS TEA
            WHERE TEA.tx_uuid=T.tx_uuid AND TEA.eid=%(eid)s)
     OR EXISTS(SELECT 1 FROM tx_relation_actions as TRA
               WHERE TRA.tx_uuid=T.tx_uuid AND (
                   TRA.eid_from=%(eid)s OR TRA.eid_to=%(eid)s))
     )''' % {'txuuid': cnx.transaction_data['undoing_uuid'],
             'eid': eid}).fetchone()


class DefaultEidGenerator(object):
    __slots__ = ('source', 'cnx', 'lock')

    def __init__(self, source):
        self.source = source
        self.cnx = None
        self.lock = Lock()

    def close(self):
        if self.cnx:
            self.cnx.close()
        self.cnx = None

    def create_eid(self, _cnx, count=1):
        # lock needed to prevent 'Connection is busy with results for another
        # command (0)' errors with SQLServer
        assert count > 0
        with self.lock:
            return self._create_eid(count)

    def _create_eid(self, count):
        # internal function doing the eid creation without locking.
        # needed for the recursive handling of disconnections (otherwise we
        # deadlock on self._eid_cnx_lock)
        source = self.source
        if self.cnx is None:
            self.cnx = source.get_connection()
        cnx = self.cnx
        try:
            cursor = cnx.cursor()
            for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
                cursor.execute(sql)
            eid = cursor.fetchone()[0]
        except (source.OperationalError, source.InterfaceError):
            # FIXME: better detection of disconnection problems
            source.warning("trying to reconnect create eid connection")
            self.cnx = None
            return self._create_eid(count)
        except source.DbapiError as exc:
            # We get this one with pyodbc and SQL Server when connection was reset
            if exc.args[0] == '08S01':
                source.warning("trying to reconnect create eid connection")
                self.cnx = None
                return self._create_eid(count)
            else:
                raise
        except Exception:  # WTF?
            cnx.rollback()
            self.cnx = None
            source.exception('create eid failed in an unforeseen way on SQL statement %s', sql)
            raise
        else:
            cnx.commit()
            return eid


class SQLITEEidGenerator(object):
    __slots__ = ('source', 'lock')

    def __init__(self, source):
        self.source = source
        self.lock = Lock()

    def close(self):
        pass

    def create_eid(self, cnx, count=1):
        assert count > 0
        source = self.source
        with self.lock:
            for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
                cursor = source.doexec(cnx, sql)
            return cursor.fetchone()[0]
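
# Illustrative sketch (not part of the original module): both generators
# reserve `count` consecutive eids in one shot by bumping the
# 'entities_id_seq' numrange under a thread lock, so a data import can grab
# a whole block at once. A hypothetical caller (assuming the backend returns
# the upper bound of the incremented range, as fetchone()[0] above suggests):
#
#     last = source.create_eid(cnx, count=100)   # reserve 100 eids
#     eids = range(last - 99, last + 1)          # the reserved block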


class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
    """adapter for source using the native cubicweb schema (see below)
    """
    sqlgen_class = SQLGenerator
    options = (
        ('db-driver',
         {'type': 'string',
          'default': 'postgres',
          # XXX use choice type
          'help': 'database driver (postgres, sqlite, sqlserver2005)',
          'group': 'native-source', 'level': 0,
          }),
        ('db-host',
         {'type': 'string',
          'default': '',
          'help': 'database host',
          'group': 'native-source', 'level': 1,
          }),
        ('db-port',
         {'type': 'string',
          'default': '',
          'help': 'database port',
          'group': 'native-source', 'level': 1,
          }),
        ('db-name',
         {'type': 'string',
          'default': Method('default_instance_id'),
          'help': 'database name',
          'group': 'native-source', 'level': 0,
          }),
        ('db-namespace',
         {'type': 'string',
          'default': '',
          'help': 'database namespace (schema) name',
          'group': 'native-source', 'level': 1,
          }),
        ('db-user',
         {'type': 'string',
          'default': CubicWebNoAppConfiguration.mode == 'user' and getlogin() or 'cubicweb',
          'help': 'database user',
          'group': 'native-source', 'level': 0,
          }),
        ('db-password',
         {'type': 'password',
          'default': '',
          'help': 'database password',
          'group': 'native-source', 'level': 0,
          }),
        ('db-encoding',
         {'type': 'string',
          'default': 'utf8',
          'help': 'database encoding',
          'group': 'native-source', 'level': 1,
          }),
        ('db-extra-arguments',
         {'type': 'string',
          'default': '',
          'help': 'set to "Trusted_Connection" if you are using SQLServer and '
                  'want trusted authentication for the database connection',
          'group': 'native-source', 'level': 2,
          }),
        ('db-statement-timeout',
         {'type': 'int',
          'default': 0,
          'help': 'sql statement timeout, in milliseconds (postgres only)',
          'group': 'native-source', 'level': 2,
          }),
    )

    def __init__(self, repo, source_config, *args, **kwargs):
        SQLAdapterMixIn.__init__(self, source_config, repairing=repo.config.repairing)
        self.authentifiers = [LoginPasswordAuthentifier(self)]
        if repo.config['allow-email-login']:
            self.authentifiers.insert(0, EmailPasswordAuthentifier(self))
        AbstractSource.__init__(self, repo, source_config, *args, **kwargs)
        # sql generator
        self._rql_sqlgen = self.sqlgen_class(self.schema, self.dbhelper,
                                             ATTR_MAP.copy())
        # full text index helper
        self.do_fti = not repo.config['delay-full-text-indexation']
        # sql queries cache
        self._cache = QueryCache(repo.config['rql-cache-size'])
        # (etype, attr) / storage mapping
        self._storages = {}
        self.binary_to_str = self.dbhelper.dbapi_module.binary_to_str
        if self.dbdriver == 'sqlite':
            self.eid_generator = SQLITEEidGenerator(self)
        else:
            self.eid_generator = DefaultEidGenerator(self)
        self.create_eid = self.eid_generator.create_eid

    def check_config(self, source_entity):
        """check configuration of source entity"""
        if source_entity.host_config:
            msg = source_entity._cw._('the system source has its configuration '
                                      'stored on the file-system')
            raise ValidationError(source_entity.eid, {role_name('config', 'subject'): msg})

    def add_authentifier(self, authentifier):
        self.authentifiers.append(authentifier)
        authentifier.source = self
        authentifier.set_schema(self.schema)

    def reset_caches(self):
        """method called during test to reset potential source caches"""
        self._cache = QueryCache(self.repo.config['rql-cache-size'])

    def clear_eid_cache(self, eid, etype):
        """clear potential caches for the given eid"""
        self._cache.pop('Any X WHERE X eid %s, X is %s' % (eid, etype), None)
        self._cache.pop('Any X WHERE X eid %s' % eid, None)
        self._cache.pop('Any %s' % eid, None)

    @statsd_timeit
    def sqlexec(self, cnx, sql, args=None):
        """execute the query and return its result"""
        return self.process_result(self.doexec(cnx, sql, args))

    def init_creating(self, cnxset=None):
        # check full text index availability
        if self.do_fti:
            if cnxset is None:
                _cnxset = self.repo._get_cnxset()
            else:
                _cnxset = cnxset
            if not self.dbhelper.has_fti_table(_cnxset.cu):
                if not self.repo.config.creating:
                    self.critical('no text index table')
                self.do_fti = False
            if cnxset is None:
                _cnxset.cnxset_freed()
                self.repo._free_cnxset(_cnxset)

    def backup(self, backupfile, confirm, format='native'):
        """method called to create a backup of the source's data"""
        if format == 'portable':
            # ensure the schema is the one stored in the database: if repository
            # started in quick_start mode, the file system's one has been loaded
            # so force reload
            if self.repo.config.quick_start:
                self.repo.set_schema(self.repo.deserialize_schema(), resetvreg=False)
            helper = DatabaseIndependentBackupRestore(self)
            self.close_source_connections()
            try:
                helper.backup(backupfile)
            finally:
                self.open_source_connections()
        elif format == 'native':
            self.close_source_connections()
            try:
                self.backup_to_file(backupfile, confirm)
            finally:
                self.open_source_connections()
        else:
            raise ValueError('Unknown format %r' % format)

    def restore(self, backupfile, confirm, drop, format='native'):
        """method called to restore a backup of source's data"""
        if self.repo.config.init_cnxset_pool:
            self.close_source_connections()
        try:
            if format == 'portable':
                helper = DatabaseIndependentBackupRestore(self)
                helper.restore(backupfile)
            elif format == 'native':
                self.restore_from_file(backupfile, confirm, drop=drop)
            else:
                raise ValueError('Unknown format %r' % format)
        finally:
            if self.repo.config.init_cnxset_pool:
                self.open_source_connections()

    def init(self, activated, source_entity):
        try:
            # test if 'asource' column exists
            query = self.dbhelper.sql_add_limit_offset('SELECT asource FROM entities', 1)
            source_entity._cw.system_sql(query)
        except Exception:
            self.eid_type_source = self.eid_type_source_pre_131
        super(NativeSQLSource, self).init(activated, source_entity)
        self.init_creating(source_entity._cw.cnxset)

    def shutdown(self):
        self.eid_generator.close()
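
    # Illustrative sketch (not part of the original module): the two backup
    # formats accepted above. 'native' delegates to the backend's own dump
    # tool, while 'portable' routes through DatabaseIndependentBackupRestore
    # (defined at the end of this module). Hypothetical calls, with a
    # `confirm` callback that accepts everything:
    #
    #     source.backup('/tmp/mydb.dump', confirm=lambda *a, **kw: True)
    #     source.backup('/tmp/mydb.zip', confirm=None, format='portable')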

    # XXX deprecates [un]map_attribute?
    def map_attribute(self, etype, attr, cb, sourcedb=True):
        self._rql_sqlgen.attr_map[u'%s.%s' % (etype, attr)] = (cb, sourcedb)

    def unmap_attribute(self, etype, attr):
        self._rql_sqlgen.attr_map.pop(u'%s.%s' % (etype, attr), None)

    def set_storage(self, etype, attr, storage):
        storage_dict = self._storages.setdefault(etype, {})
        storage_dict[attr] = storage
        self.map_attribute(etype, attr,
                           storage.callback, storage.is_source_callback)

    def unset_storage(self, etype, attr):
        self._storages[etype].pop(attr)
        # if etype has no storage left, remove the entry
        if not self._storages[etype]:
            del self._storages[etype]
        self.unmap_attribute(etype, attr)

    def storage(self, etype, attr):
        """return the storage for the given entity type / attribute
        """
        try:
            return self._storages[etype][attr]
        except KeyError:
            raise Exception('no custom storage set for %s.%s' % (etype, attr))
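
    # Illustrative sketch (not part of the original module): set_storage()
    # above is how e.g. the bytes file-system storage gets plugged in, so a
    # Bytes attribute is kept on disk instead of in the database. A
    # hypothetical server startup hook could do something like:
    #
    #     from cubicweb.server.sources.storages import BytesFileSystemStorage
    #     storage = BytesFileSystemStorage('/var/lib/myapp/data')
    #     repo.system_source.set_storage('File', 'data', storage)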
"""assertdbg_st_search(self.uri,union,varmap,args,cachekey)# remember number of actually selected term (sql generation may append some)ifcachekeyisNone:self.no_cache+=1# generate sql query if we are able to do so (not supported types...)sql,qargs,cbs=self._rql_sqlgen.generate(union,args,varmap)else:# sql may be cachedtry:sql,qargs,cbs=self._cache[cachekey]self.cache_hit+=1exceptKeyError:self.cache_miss+=1sql,qargs,cbs=self._rql_sqlgen.generate(union,args,varmap)self._cache[cachekey]=sql,qargs,cbsargs=self.merge_args(args,qargs)assertisinstance(sql,string_types),repr(sql)cursor=self.doexec(cnx,sql,args)results=self.process_result(cursor,cnx,cbs)assertdbg_results(results)returnresults@contextmanagerdef_fixup_cw(self,cnx,entity):_cw=entity._cwentity._cw=cnxtry:yieldfinally:entity._cw=_cw@contextmanagerdef_storage_handler(self,cnx,entity,event):# 1/ memorize values as they are before the storage is called.# For instance, the BFSStorage will replace the `data`# binary value with a Binary containing the destination path# on the filesystem. To make the entity.data usage absolutely# transparent, we'll have to reset entity.data to its binary# value once the SQL query will be executedrestore_values=[]ifisinstance(entity,list):entities=entityelse:entities=[entity]etype=entities[0].__regid__forattr,storageinself._storages.get(etype,{}).items():forentityinentities:withself._fixup_cw(cnx,entity):ifevent=='deleted':storage.entity_deleted(entity,attr)else:edited=entity.cw_editedifattrinedited:handler=getattr(storage,'entity_%s'%event)to_restore=handler(entity,attr)restore_values.append((entity,attr,to_restore))try:yield# 2/ execute the source's instructionsfinally:# 3/ restore original valuesforentity,attr,valueinrestore_values:entity.cw_edited.edited_attribute(attr,value)defadd_entity(self,cnx,entity):"""add a new entity to the source"""withself._storage_handler(cnx,entity,'added'):attrs=self.preprocess_entity(entity)sql=self.sqlgen.insert(SQL_PREFIX+entity.cw_etype,attrs)self.doexec(cnx,sql,attrs)ifcnx.ertype_supports_undo(entity.cw_etype):self._record_tx_action(cnx,'tx_entity_actions',u'C',etype=text_type(entity.cw_etype),eid=entity.eid)defupdate_entity(self,cnx,entity):"""replace an entity in the source"""withself._storage_handler(cnx,entity,'updated'):attrs=self.preprocess_entity(entity)ifcnx.ertype_supports_undo(entity.cw_etype):changes=self._save_attrs(cnx,entity,attrs)self._record_tx_action(cnx,'tx_entity_actions',u'U',etype=text_type(entity.cw_etype),eid=entity.eid,changes=self._binary(pickle.dumps(changes)))sql=self.sqlgen.update(SQL_PREFIX+entity.cw_etype,attrs,['cw_eid'])self.doexec(cnx,sql,attrs)defdelete_entity(self,cnx,entity):"""delete an entity from the source"""withself._storage_handler(cnx,entity,'deleted'):ifcnx.ertype_supports_undo(entity.cw_etype):attrs=[SQL_PREFIX+r.typeforrinentity.e_schema.subject_relations()if(r.finalorr.inlined)andnotrinVIRTUAL_RTYPES]changes=self._save_attrs(cnx,entity,attrs)self._record_tx_action(cnx,'tx_entity_actions',u'D',etype=text_type(entity.cw_etype),eid=entity.eid,changes=self._binary(pickle.dumps(changes)))attrs={'cw_eid':entity.eid}sql=self.sqlgen.delete(SQL_PREFIX+entity.cw_etype,attrs)self.doexec(cnx,sql,attrs)defadd_relation(self,cnx,subject,rtype,object,inlined=False):"""add a relation to the 
source"""self._add_relations(cnx,rtype,[(subject,object)],inlined)ifcnx.ertype_supports_undo(rtype):self._record_tx_action(cnx,'tx_relation_actions',u'A',eid_from=subject,rtype=text_type(rtype),eid_to=object)defadd_relations(self,cnx,rtype,subj_obj_list,inlined=False):"""add a relations to the source"""self._add_relations(cnx,rtype,subj_obj_list,inlined)ifcnx.ertype_supports_undo(rtype):forsubject,objectinsubj_obj_list:self._record_tx_action(cnx,'tx_relation_actions',u'A',eid_from=subject,rtype=text_type(rtype),eid_to=object)def_add_relations(self,cnx,rtype,subj_obj_list,inlined=False):"""add a relation to the source"""sql=[]ifinlinedisFalse:attrs=[{'eid_from':subject,'eid_to':object}forsubject,objectinsubj_obj_list]sql.append((self.sqlgen.insert('%s_relation'%rtype,attrs[0]),attrs))else:# used by data importetypes={}forsubject,objectinsubj_obj_list:etype=cnx.entity_metas(subject)['type']ifetypeinetypes:etypes[etype].append((subject,object))else:etypes[etype]=[(subject,object)]forsubj_etype,subj_obj_listinetypes.items():attrs=[{'cw_eid':subject,SQL_PREFIX+rtype:object}forsubject,objectinsubj_obj_list]sql.append((self.sqlgen.update(SQL_PREFIX+etype,attrs[0],['cw_eid']),attrs))forstatement,attrsinsql:self.doexecmany(cnx,statement,attrs)defdelete_relation(self,cnx,subject,rtype,object):"""delete a relation from the source"""rschema=self.schema.rschema(rtype)self._delete_relation(cnx,subject,rtype,object,rschema.inlined)ifcnx.ertype_supports_undo(rtype):self._record_tx_action(cnx,'tx_relation_actions',u'R',eid_from=subject,rtype=text_type(rtype),eid_to=object)def_delete_relation(self,cnx,subject,rtype,object,inlined=False):"""delete a relation from the source"""ifinlined:table=SQL_PREFIX+cnx.entity_metas(subject)['type']column=SQL_PREFIX+rtypesql='UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s'%(table,column,SQL_PREFIX)attrs={'eid':subject}else:attrs={'eid_from':subject,'eid_to':object}sql=self.sqlgen.delete('%s_relation'%rtype,attrs)self.doexec(cnx,sql,attrs)@statsd_timeitdefdoexec(self,cnx,query,args=None,rollback=True):"""Execute a query. it's a function just so that it shows up in profiling """cursor=cnx.cnxset.cuifserver.DEBUG&server.DBG_SQL:print('exec',query,args,cnx.cnxset.cnx)try:# str(query) to avoid error if it's a unicode stringcursor.execute(str(query),args)exceptExceptionasex:ifself.repo.config.mode!='test':# during test we get those message when trying to alter sqlite# db schemaself.info("sql: %r\n args: %s\ndbms message: %r",query,args,ex.args[0])ifrollback:try:cnx.cnxset.rollback()ifself.repo.config.mode!='test':self.debug('transaction has been rolled back')exceptExceptionasex:passifex.__class__.__name__=='IntegrityError':# need string comparison because of various backendsforarginex.args:# postgres, sqlservermo=re.search("unique_[a-z0-9]{32}",arg)ifmoisnotNone:raiseUniqueTogetherError(cnx,cstrname=mo.group(0))# old sqlitemo=re.search('columns? 

    @statsd_timeit
    def doexec(self, cnx, query, args=None, rollback=True):
        """Execute a query.

        it's a function just so that it shows up in profiling
        """
        cursor = cnx.cnxset.cu
        if server.DEBUG & server.DBG_SQL:
            print('exec', query, args, cnx.cnxset.cnx)
        try:
            # str(query) to avoid error if it's a unicode string
            cursor.execute(str(query), args)
        except Exception as ex:
            if self.repo.config.mode != 'test':
                # during test we get those messages when trying to alter sqlite
                # db schema
                self.info("sql: %r\n args: %s\ndbms message: %r",
                          query, args, ex.args[0])
            if rollback:
                try:
                    cnx.cnxset.rollback()
                    if self.repo.config.mode != 'test':
                        self.debug('transaction has been rolled back')
                except Exception:
                    pass
            if ex.__class__.__name__ == 'IntegrityError':
                # need string comparison because of various backends
                for arg in ex.args:
                    # postgres, sqlserver
                    mo = re.search("unique_[a-z0-9]{32}", arg)
                    if mo is not None:
                        raise UniqueTogetherError(cnx, cstrname=mo.group(0))
                    # old sqlite
                    mo = re.search('columns? (.*) (?:is|are) not unique', arg)
                    if mo is not None:  # sqlite in use
                        # we left chop the 'cw_' prefix of attribute names
                        rtypes = [c.strip()[3:]
                                  for c in mo.group(1).split(',')]
                        raise UniqueTogetherError(cnx, rtypes=rtypes)
                    # sqlite after http://www.sqlite.org/cgi/src/info/c80e229dd9c1230a
                    if arg.startswith('UNIQUE constraint failed:'):
                        # message looks like: "UNIQUE constraint failed: foo.cw_bar, foo.cw_baz"
                        # so drop the prefix, split on comma, drop the tablenames, and drop "cw_"
                        columns = arg.split(':', 1)[1].split(',')
                        rtypes = [c.split('.', 1)[1].strip()[3:] for c in columns]
                        raise UniqueTogetherError(cnx, rtypes=rtypes)
                    mo = re.search('"cstr[a-f0-9]{32}"', arg)
                    if mo is not None:
                        # postgresql
                        raise ViolatedConstraint(cnx, cstrname=mo.group(0)[1:-1])
                    if arg.startswith('CHECK constraint failed:'):
                        # sqlite3 (new)
                        raise ViolatedConstraint(cnx, cstrname=arg.split(':', 1)[1].strip())
                    mo = re.match('^constraint (cstr.*) failed$', arg)
                    if mo is not None:
                        # sqlite3 (old)
                        raise ViolatedConstraint(cnx, cstrname=mo.group(1))
            raise
        return cursor

    @statsd_timeit
    def doexecmany(self, cnx, query, args):
        """Execute a query.

        it's a function just so that it shows up in profiling
        """
        if server.DEBUG & server.DBG_SQL:
            print('execmany', query, 'with', len(args), 'arguments',
                  cnx.cnxset.cnx)
        cursor = cnx.cnxset.cu
        try:
            # str(query) to avoid error if it's a unicode string
            cursor.executemany(str(query), args)
        except Exception as ex:
            if self.repo.config.mode != 'test':
                # during test we get those messages when trying to alter sqlite
                # db schema
                self.critical("sql many: %r\n args: %s\ndbms message: %r",
                              query, args, ex.args[0])
            try:
                cnx.cnxset.rollback()
                if self.repo.config.mode != 'test':
                    self.critical('transaction has been rolled back')
            except Exception:
                pass
            raise
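
    # Illustrative sketch (not part of the original module): how the
    # 'UNIQUE constraint failed:' branch above recovers the violated rtypes
    # from a modern sqlite error message:
    #
    #     >>> arg = 'UNIQUE constraint failed: cw_Person.cw_name, cw_Person.cw_surname'
    #     >>> columns = arg.split(':', 1)[1].split(',')
    #     >>> [c.split('.', 1)[1].strip()[3:] for c in columns]
    #     ['name', 'surname']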

    # short cut to method requiring advanced db helper usage ##################

    def update_rdef_column(self, cnx, rdef):
        """update physical column for a relation definition (final or inlined)
        """
        table, column = rdef_table_column(rdef)
        coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
        if not self.dbhelper.alter_column_support:
            self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
                       not allownull and 'NOT NULL' or '')
            return
        self.dbhelper.change_col_type(LogCursor(cnx.cnxset.cu),
                                      table, column, coltype, allownull)
        self.info('altered %s.%s: now %s%s', table, column, coltype,
                  not allownull and 'NOT NULL' or '')

    def update_rdef_null_allowed(self, cnx, rdef):
        """update NULL / NOT NULL of physical column for a relation definition
        (final or inlined)
        """
        if not self.dbhelper.alter_column_support:
            # not supported (and NOT NULL not set by yams in that case, so no
            # worry)
            return
        table, column = rdef_table_column(rdef)
        coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
        self.dbhelper.set_null_allowed(LogCursor(cnx.cnxset.cu),
                                       table, column, coltype, allownull)

    def update_rdef_indexed(self, cnx, rdef):
        table, column = rdef_table_column(rdef)
        if rdef.indexed:
            self.create_index(cnx, table, column)
        else:
            self.drop_index(cnx, table, column)

    def update_rdef_unique(self, cnx, rdef):
        table, column = rdef_table_column(rdef)
        if rdef.constraint_by_type('UniqueConstraint'):
            self.create_index(cnx, table, column, unique=True)
        else:
            self.drop_index(cnx, table, column, unique=True)

    def create_index(self, cnx, table, column, unique=False):
        cursor = LogCursor(cnx.cnxset.cu)
        self.dbhelper.create_index(cursor, table, column, unique)

    def drop_index(self, cnx, table, column, unique=False):
        cursor = LogCursor(cnx.cnxset.cu)
        self.dbhelper.drop_index(cursor, table, column, unique)

    # system source interface #################################################

    def _eid_type_source(self, cnx, eid, sql):
        try:
            res = self.doexec(cnx, sql).fetchone()
            if res is not None:
                return res
        except Exception:
            self.exception('failed to query entities table for eid %s', eid)
        raise UnknownEid(eid)

    def eid_type_source(self, cnx, eid):  # pylint: disable=E0202
        """return a tuple (type, extid, source) for the entity with id <eid>"""
        sql = 'SELECT type, extid, asource FROM entities WHERE eid=%s' % eid
        res = self._eid_type_source(cnx, eid, sql)
        if not isinstance(res, list):
            res = list(res)
        res[-2] = self.decode_extid(res[-2])
        return res

    def eid_type_source_pre_131(self, cnx, eid):
        """return a tuple (type, extid, source) for the entity with id <eid>"""
        sql = 'SELECT type, extid FROM entities WHERE eid=%s' % eid
        res = self._eid_type_source(cnx, eid, sql)
        if not isinstance(res, list):
            res = list(res)
        res[-1] = self.decode_extid(res[-1])
        res.append("system")
        return res

    def extid2eid(self, cnx, extid):
        """get eid from an external id. Return None if no record found."""
        assert isinstance(extid, binary_type)
        args = {'x': b64encode(extid).decode('ascii')}
        cursor = self.doexec(cnx,
                             'SELECT eid FROM entities WHERE extid=%(x)s',
                             args)
        # XXX testing rowcount cause strange bug with sqlite, results are there
        #     but rowcount is 0
        # if cursor.rowcount > 0:
        try:
            result = cursor.fetchone()
            if result:
                return result[0]
        except Exception:
            pass
        cursor = self.doexec(cnx,
                             'SELECT eid FROM moved_entities WHERE extid=%(x)s',
                             args)
        try:
            result = cursor.fetchone()
            if result:
                # entity was moved to the system source, return negative
                # number to tell the external source to ignore it
                return -result[0]
        except Exception:
            pass
        return None

    def _handle_is_relation_sql(self, cnx, sql, attrs):
        """Handler for specific is_relation sql that may be
        overwritten in some stores"""
        self.doexec(cnx, sql % attrs)

    _handle_insert_entity_sql = doexec
    _handle_is_instance_of_sql = _handle_source_relation_sql = _handle_is_relation_sql

    def add_info(self, cnx, entity, source, extid):
        """add type and source info for an eid into the system table"""
        assert cnx.cnxset is not None
        # begin by inserting eid/type/source/extid into the entities table
        if extid is not None:
            assert isinstance(extid, binary_type)
            extid = b64encode(extid).decode('ascii')
        attrs = {'type': text_type(entity.cw_etype), 'eid': entity.eid,
                 'extid': extid, 'asource': text_type(source.uri)}
        self._handle_insert_entity_sql(cnx, self.sqlgen.insert('entities', attrs), attrs)
        # insert core relations: is, is_instance_of and cw_source
        if entity.e_schema.eid is not None:  # else schema has not yet been serialized
            self._handle_is_relation_sql(
                cnx, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
                (entity.eid, entity.e_schema.eid))
            for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
                self._handle_is_relation_sql(
                    cnx, 'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
                    (entity.eid, eschema.eid))
        if 'CWSource' in self.schema and source.eid is not None:  # else, cw < 3.10
            self._handle_is_relation_sql(
                cnx, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
                (entity.eid, source.eid))
        # now we can update the full text index
        if self.need_fti_indexation(entity.cw_etype):
            self.index_entity(cnx, entity=entity)

    def update_info(self, cnx, entity, need_fti_update):
        """mark entity as being modified, fulltext reindex if needed"""
        if need_fti_update:
            # reindex the entity only if this query is updating at least
            # one indexable attribute
            self.index_entity(cnx, entity=entity)

    def delete_info_multi(self, cnx, entities):
        """delete system information on deletion of a list of entities with the
        same etype and belonging to the same source

        * update the fti
        * remove record from the `entities` table
        """
        self.fti_unindex_entities(cnx, entities)
        attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])}
        self.doexec(cnx, self.sqlgen.delete_many('entities', attrs), attrs)

    # undo support #############################################################

    def undoable_transactions(self, cnx, ueid=None, **actionfilters):
        """See :class:`cubicweb.repoapi.Connection.undoable_transactions`"""
        # force filtering to connection's user if not a manager
        if not cnx.user.is_in_group('managers'):
            ueid = cnx.user.eid
        restr = {}
        if ueid is not None:
            restr['tx_user'] = ueid
        sql = self.sqlgen.select('transactions', restr, ('tx_uuid', 'tx_time', 'tx_user'))
        if actionfilters:
            # we will need subqueries to filter transactions according to
            # actions done
            tearestr = {}  # filters on the tx_entity_actions table
            trarestr = {}  # filters on the tx_relation_actions table
            genrestr = {}  # generic filters, applicable to both tables
            # unless public explicitly set to false, we only consider public
            # actions
            if actionfilters.pop('public', True):
                genrestr['txa_public'] = True
            # put additional filters in trarestr and/or tearestr
            for key, val in actionfilters.items():
                if key == 'etype':
                    # filtering on etype implies filtering on entity actions
                    # only, and with no eid specified
                    assert actionfilters.get('action', 'C') in 'CUD'
                    assert not 'eid' in actionfilters
                    tearestr['etype'] = text_type(val)
                elif key == 'eid':
                    # eid filter may apply to 'eid' of tx_entity_actions or to
                    # 'eid_from' OR 'eid_to' of tx_relation_actions
                    if actionfilters.get('action', 'C') in 'CUD':
                        tearestr['eid'] = val
                    if actionfilters.get('action', 'A') in 'AR':
                        trarestr['eid_from'] = val
                        trarestr['eid_to'] = val
                elif key == 'action':
                    if val in 'CUD':
                        tearestr['txa_action'] = text_type(val)
                    else:
                        assert val in 'AR'
                        trarestr['txa_action'] = text_type(val)
                else:
                    raise AssertionError('unknown filter %s' % key)
            assert trarestr or tearestr, "can't only filter on 'public'"
            subqsqls = []
            # append subqueries to the original query, using EXISTS()
            if trarestr or (genrestr and not tearestr):
                trarestr.update(genrestr)
                trasql = self.sqlgen.select('tx_relation_actions', trarestr, ('1',))
                if 'eid_from' in trarestr:
                    # replace AND by OR between eid_from/eid_to restriction
                    trasql = sql_or_clauses(trasql, ['eid_from = %(eid_from)s',
                                                     'eid_to = %(eid_to)s'])
                trasql += ' AND transactions.tx_uuid=tx_relation_actions.tx_uuid'
                subqsqls.append('EXISTS(%s)' % trasql)
            if tearestr or (genrestr and not trarestr):
                tearestr.update(genrestr)
                teasql = self.sqlgen.select('tx_entity_actions', tearestr, ('1',))
                teasql += ' AND transactions.tx_uuid=tx_entity_actions.tx_uuid'
                subqsqls.append('EXISTS(%s)' % teasql)
            if restr:
                sql += ' AND %s' % ' OR '.join(subqsqls)
            else:
                sql += ' WHERE %s' % ' OR '.join(subqsqls)
            restr.update(trarestr)
            restr.update(tearestr)
        # we want results ordered by transaction time, descending
        sql += ' ORDER BY tx_time DESC'
        cu = self.doexec(cnx, sql, restr)
        # turn results into transaction objects
        return [tx.Transaction(cnx, *args) for args in cu.fetchall()]

    def tx_info(self, cnx, txuuid):
        """See :class:`cubicweb.repoapi.Connection.transaction_info`"""
        return tx.Transaction(cnx, txuuid, *self._tx_info(cnx, text_type(txuuid)))
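
    # Illustrative sketch (not part of the original module): the action
    # filters accepted above, combined as EXISTS() subqueries. Hypothetical
    # calls:
    #
    #     # public creations/updates/deletions of CWUser entities
    #     txs = source.undoable_transactions(cnx, etype=u'CWUser')
    #     # any transaction that added ('A') a relation touching eid 1234
    #     txs = source.undoable_transactions(cnx, eid=1234, action='A')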

    def tx_actions(self, cnx, txuuid, public):
        """See :class:`cubicweb.repoapi.Connection.transaction_actions`"""
        txuuid = text_type(txuuid)
        self._tx_info(cnx, txuuid)
        restr = {'tx_uuid': txuuid}
        if public:
            restr['txa_public'] = True
        # XXX use generator to avoid loading everything in memory?
        sql = self.sqlgen.select('tx_entity_actions', restr,
                                 ('txa_action', 'txa_public', 'txa_order',
                                  'etype', 'eid', 'changes'))
        with cnx.ensure_cnx_set:
            cu = self.doexec(cnx, sql, restr)
            actions = [tx.EntityAction(a, p, o, et, e,
                                       c and pickle.loads(self.binary_to_str(c)))
                       for a, p, o, et, e, c in cu.fetchall()]
        sql = self.sqlgen.select('tx_relation_actions', restr,
                                 ('txa_action', 'txa_public', 'txa_order',
                                  'rtype', 'eid_from', 'eid_to'))
        with cnx.ensure_cnx_set:
            cu = self.doexec(cnx, sql, restr)
            actions += [tx.RelationAction(*args) for args in cu.fetchall()]
        return sorted(actions, key=lambda x: x.order)

    def undo_transaction(self, cnx, txuuid):
        """See :class:`cubicweb.repoapi.Connection.undo_transaction`

        important note: while undoing a transaction, only hooks in the
        'integrity', 'activeintegrity' and 'undo' categories are called.
        """
        errors = []
        cnx.transaction_data['undoing_uuid'] = txuuid
        with cnx.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'):
            with cnx.security_enabled(read=False):
                for action in reversed(self.tx_actions(cnx, txuuid, False)):
                    undomethod = getattr(self, '_undo_%s' % action.action.lower())
                    errors += undomethod(cnx, action)
        # remove the transactions record
        self.doexec(cnx,
                    "DELETE FROM transactions WHERE tx_uuid='%s'" % txuuid)
        if errors:
            raise UndoTransactionException(txuuid, errors)
        else:
            return

    def start_undoable_transaction(self, cnx, uuid):
        """connection callback to insert a transaction record in the transactions
        table when some undoable transaction is started
        """
        ueid = cnx.user.eid
        attrs = {'tx_uuid': uuid, 'tx_user': ueid, 'tx_time': datetime.utcnow()}
        self.doexec(cnx, self.sqlgen.insert('transactions', attrs), attrs)

    def _save_attrs(self, cnx, entity, attrs):
        """return a pickleable dictionary containing current values for given
        attributes of the entity
        """
        restr = {'cw_eid': entity.eid}
        sql = self.sqlgen.select(SQL_PREFIX + entity.cw_etype, restr, attrs)
        cu = self.doexec(cnx, sql, restr)
        values = dict(zip(attrs, cu.fetchone()))
        # ensure backend specific binary are converted back to string
        eschema = entity.e_schema
        for column in attrs:
            # [3:] remove 'cw_' prefix
            attr = column[3:]
            if not eschema.subjrels[attr].final:
                continue
            if eschema.destination(attr) in ('Password', 'Bytes'):
                value = values[column]
                if value is not None:
                    values[column] = self.binary_to_str(value)
        return values

    def _record_tx_action(self, cnx, table, action, **kwargs):
        """record a transaction action in the given table (either
        'tx_entity_actions' or 'tx_relation_actions')
        """
        kwargs['tx_uuid'] = cnx.transaction_uuid()
        kwargs['txa_action'] = action
        kwargs['txa_order'] = cnx.transaction_inc_action_counter()
        kwargs['txa_public'] = not cnx.hooks_in_progress
        self.doexec(cnx, self.sqlgen.insert(table, kwargs), kwargs)
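
    # Illustrative sketch (not part of the original module): a full undo
    # round-trip over the records written by _record_tx_action() above.
    # Hypothetical usage:
    #
    #     txuuid = cnx.transaction_uuid()
    #     # ... do some undoable work, commit ...
    #     try:
    #         source.undo_transaction(cnx, txuuid)
    #     except UndoTransactionException as exc:
    #         print('could not undo:', exc)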
"""restr={'tx_uuid':txuuid}sql=self.sqlgen.select('transactions',restr,('tx_time','tx_user'))cu=self.doexec(cnx,sql,restr)try:time,ueid=cu.fetchone()exceptTypeError:raisetx.NoSuchTransaction(txuuid)ifnot(cnx.user.is_in_group('managers')orcnx.user.eid==ueid):raisetx.NoSuchTransaction(txuuid)returntime,ueiddef_reedit_entity(self,entity,changes,err):cnx=entity._cweid=entity.eidentity.cw_edited=edited=EditedEntity(entity)# check for schema changes, entities linked through inlined relation# still exists, rewrap binary valueseschema=entity.e_schemagetrschema=eschema.subjrelsforcolumn,valueinchanges.items():rtype=column[len(SQL_PREFIX):]ifrtype=="eid":continue# XXX should even `eid` be stored in action changes?try:rschema=getrschema[rtype]exceptKeyError:err(cnx._("can't restore relation %(rtype)s of entity %(eid)s, ""this relation does not exist in the schema anymore.")%{'rtype':rtype,'eid':eid})ifnotrschema.final:ifnotrschema.inlined:assertvalueisNone# rschema is an inlined relationelifvalueisnotNone:# not a deletion: we must put something in editedtry:entity._cw.entity_from_eid(value)# check target existsedited[rtype]=valueexceptUnknownEid:err(cnx._("can't restore entity %(eid)s of type %(eschema)s, ""target of %(rtype)s (eid %(value)s) does not exist any longer")%locals())changes[column]=Noneelifeschema.destination(rtype)in('Bytes','Password'):changes[column]=self._binary(value)edited[rtype]=Binary(value)elifPY2andisinstance(value,str):edited[rtype]=text_type(value,cnx.encoding,'replace')else:edited[rtype]=value# This must only be done after init_entitiy_caches : defered in calling functions# edited.check()def_undo_d(self,cnx,action):"""undo an entity deletion"""errors=[]err=errors.appendeid=action.eidetype=action.etype_=cnx._# get an entity instancetry:entity=self.repo.vreg['etypes'].etype_class(etype)(cnx)exceptException:err("can't restore entity %s of type %s, type no more supported"%(eid,etype))returnerrorsself._reedit_entity(entity,action.changes,err)entity.eid=eidcnx.repo.init_entity_caches(cnx,entity,self)entity.cw_edited.check()self.repo.hm.call_hooks('before_add_entity',cnx,entity=entity)# restore the entityaction.changes['cw_eid']=eid# restore record in entities (will update fti if needed)self.add_info(cnx,entity,self,None)sql=self.sqlgen.insert(SQL_PREFIX+etype,action.changes)self.doexec(cnx,sql,action.changes)self.repo.hm.call_hooks('after_add_entity',cnx,entity=entity)returnerrorsdef_undo_r(self,cnx,action):"""undo a relation removal"""errors=[]subj,rtype,obj=action.eid_from,action.rtype,action.eid_totry:sentity,oentity,rdef=_undo_rel_info(cnx,subj,rtype,obj)except_UndoExceptionasex:errors.append(text_type(ex))else:forrole,entityin(('subject',sentity),('object',oentity)):try:_undo_check_relation_target(entity,rdef,role)except_UndoExceptionasex:errors.append(text_type(ex))continueifnoterrors:self.repo.hm.call_hooks('before_add_relation',cnx,eidfrom=subj,rtype=rtype,eidto=obj)# add relation in the databaseself._add_relations(cnx,rtype,[(subj,obj)],rdef.rtype.inlined)# set related cachecnx.update_rel_cache_add(subj,rtype,obj,rdef.rtype.symmetric)self.repo.hm.call_hooks('after_add_relation',cnx,eidfrom=subj,rtype=rtype,eidto=obj)returnerrorsdef_undo_c(self,cnx,action):"""undo an entity creation"""eid=action.eid# XXX done to avoid fetching all remaining relation for the entity# we should find an efficient way to do this (keeping current veolidf# massive deletion performance)if_undo_has_later_transaction(cnx,eid):msg=cnx._('some later transaction(s) touch entity, undo them 

    def _undo_c(self, cnx, action):
        """undo an entity creation"""
        eid = action.eid
        # XXX done to avoid fetching all remaining relations for the entity;
        # we should find an efficient way to do this (keeping current massive
        # deletion performance)
        if _undo_has_later_transaction(cnx, eid):
            msg = cnx._('some later transaction(s) touch entity, undo them '
                        'first')
            raise ValidationError(eid, {None: msg})
        etype = action.etype
        # get an entity instance
        try:
            entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
        except Exception:
            return [cnx._(
                "Can't undo creation of entity %(eid)s of type %(etype)s, type "
                "no more supported" % {'eid': eid, 'etype': etype})]
        entity.eid = eid
        # for proper eid/type cache update
        CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(eid)
        self.repo.hm.call_hooks('before_delete_entity', cnx, entity=entity)
        # remove is / is_instance_of which are added using sql by hooks, hence
        # invisible as transaction action
        self.doexec(cnx, 'DELETE FROM is_relation WHERE eid_from=%s' % eid)
        self.doexec(cnx, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid)
        self.doexec(cnx, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % eid)
        # XXX check removal of inlined relation?
        # delete the entity
        attrs = {'cw_eid': eid}
        sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
        self.doexec(cnx, sql, attrs)
        # remove record from entities (will update fti if needed)
        self.delete_info_multi(cnx, [entity])
        self.repo.hm.call_hooks('after_delete_entity', cnx, entity=entity)
        return ()

    def _undo_u(self, cnx, action):
        """undo an entity update"""
        errors = []
        err = errors.append
        try:
            entity = cnx.entity_from_eid(action.eid)
        except UnknownEid:
            err(cnx._("can't restore state of entity %s, it has been "
                      "deleted in the meantime") % action.eid)
            return errors
        self._reedit_entity(entity, action.changes, err)
        entity.cw_edited.check()
        self.repo.hm.call_hooks('before_update_entity', cnx, entity=entity)
        sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, action.changes,
                                 ['cw_eid'])
        self.doexec(cnx, sql, action.changes)
        self.repo.hm.call_hooks('after_update_entity', cnx, entity=entity)
        return errors

    def _undo_a(self, cnx, action):
        """undo a relation addition"""
        errors = []
        subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
        try:
            sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
        except _UndoException as ex:
            errors.append(text_type(ex))
        else:
            rschema = rdef.rtype
            if rschema.inlined:
                sql = 'SELECT 1 FROM cw_%s WHERE cw_eid=%s and cw_%s=%s' \
                      % (sentity.cw_etype, subj, rtype, obj)
            else:
                sql = 'SELECT 1 FROM %s_relation WHERE eid_from=%s and eid_to=%s' \
                      % (rtype, subj, obj)
            cu = self.doexec(cnx, sql)
            if cu.fetchone() is None:
                errors.append(cnx._(
                    "Can't undo addition of relation %(rtype)s from %(subj)s to"
                    " %(obj)s, doesn't exist anymore" % locals()))
        if not errors:
            self.repo.hm.call_hooks('before_delete_relation', cnx,
                                    eidfrom=subj, rtype=rtype, eidto=obj)
            # delete relation from the database
            self._delete_relation(cnx, subj, rtype, obj, rschema.inlined)
            # set related cache
            cnx.update_rel_cache_del(subj, rtype, obj, rschema.symmetric)
            self.repo.hm.call_hooks('after_delete_relation', cnx,
                                    eidfrom=subj, rtype=rtype, eidto=obj)
        return errors
text index """cursor_index_object=self.dbhelper.cursor_index_objectcursor=cnx.cnxset.cutry:# use cursor_index_object, not cursor_reindex_object since# unindexing done in the FTIndexEntityOpforentityinentities:cursor_index_object(entity.eid,entity.cw_adapt_to('IFTIndexable'),cursor)exceptException:# let KeyboardInterrupt / SystemExit propagateself.exception('error while indexing %s',entity)classFTIndexEntityOp(hook.DataOperationMixIn,hook.LateOperation):"""operation to delay entity full text indexation to commit since fti indexing may trigger discovery of other entities, it should be triggered on precommit, not commit, and this should be done after other precommit operation which may add relations to the entity """defprecommit_event(self):cnx=self.cnxsource=cnx.repo.system_sourcependingeids=cnx.transaction_data.get('pendingeids',())done=cnx.transaction_data.setdefault('indexedeids',set())to_reindex=set()foreidinself.get_data():ifeidinpendingeidsoreidindone:# entity added and deleted in the same transaction or already# processedcontinuedone.add(eid)iftindexable=cnx.entity_from_eid(eid).cw_adapt_to('IFTIndexable')to_reindex|=set(iftindexable.fti_containers())source.fti_unindex_entities(cnx,to_reindex)source.fti_index_entities(cnx,to_reindex)defsql_schema(driver):helper=get_db_helper(driver)typemap=helper.TYPE_MAPPINGschema="""/* Create the repository's system database */%sCREATE TABLE entities ( eid INTEGER PRIMARY KEY NOT NULL, type VARCHAR(64) NOT NULL, asource VARCHAR(128) NOT NULL, extid VARCHAR(256));;CREATE INDEX entities_type_idx ON entities(type);;CREATE TABLE moved_entities ( eid INTEGER PRIMARY KEY NOT NULL, extid VARCHAR(256) UNIQUE NOT NULL);;CREATE TABLE transactions ( tx_uuid CHAR(32) PRIMARY KEY NOT NULL, tx_user INTEGER NOT NULL, tx_time %s NOT NULL);;CREATE INDEX transactions_tx_user_idx ON transactions(tx_user);;CREATE INDEX transactions_tx_time_idx ON transactions(tx_time);;CREATE TABLE tx_entity_actions ( tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE, txa_action CHAR(1) NOT NULL, txa_public %s NOT NULL, txa_order INTEGER, eid INTEGER NOT NULL, etype VARCHAR(64) NOT NULL, changes %s);;CREATE INDEX tx_entity_actions_txa_action_idx ON tx_entity_actions(txa_action);;CREATE INDEX tx_entity_actions_txa_public_idx ON tx_entity_actions(txa_public);;CREATE INDEX tx_entity_actions_eid_idx ON tx_entity_actions(eid);;CREATE INDEX tx_entity_actions_etype_idx ON tx_entity_actions(etype);;CREATE INDEX tx_entity_actions_tx_uuid_idx ON tx_entity_actions(tx_uuid);;CREATE TABLE tx_relation_actions ( tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE, txa_action CHAR(1) NOT NULL, txa_public %s NOT NULL, txa_order INTEGER, eid_from INTEGER NOT NULL, eid_to INTEGER NOT NULL, rtype VARCHAR(256) NOT NULL);;CREATE INDEX tx_relation_actions_txa_action_idx ON tx_relation_actions(txa_action);;CREATE INDEX tx_relation_actions_txa_public_idx ON tx_relation_actions(txa_public);;CREATE INDEX tx_relation_actions_eid_from_idx ON tx_relation_actions(eid_from);;CREATE INDEX tx_relation_actions_eid_to_idx ON tx_relation_actions(eid_to);;CREATE INDEX tx_relation_actions_tx_uuid_idx ON tx_relation_actions(tx_uuid);;"""%(helper.sql_create_numrange('entities_id_seq').replace(';',';;'),typemap['Datetime'],typemap['Boolean'],typemap['Bytes'],typemap['Boolean'])ifhelper.backend_name=='sqlite':# sqlite support the ON DELETE CASCADE syntax but do nothingschema+='''CREATE TRIGGER fkd_transactionsBEFORE DELETE ON transactionsFOR EACH ROW BEGIN DELETE FROM tx_entity_actions WHERE 


def sql_schema(driver):
    helper = get_db_helper(driver)
    typemap = helper.TYPE_MAPPING
    schema = """
/* Create the repository's system database */

%s

CREATE TABLE entities (
  eid INTEGER PRIMARY KEY NOT NULL,
  type VARCHAR(64) NOT NULL,
  asource VARCHAR(128) NOT NULL,
  extid VARCHAR(256)
);;
CREATE INDEX entities_type_idx ON entities(type);;
CREATE TABLE moved_entities (
  eid INTEGER PRIMARY KEY NOT NULL,
  extid VARCHAR(256) UNIQUE NOT NULL
);;

CREATE TABLE transactions (
  tx_uuid CHAR(32) PRIMARY KEY NOT NULL,
  tx_user INTEGER NOT NULL,
  tx_time %s NOT NULL
);;
CREATE INDEX transactions_tx_user_idx ON transactions(tx_user);;
CREATE INDEX transactions_tx_time_idx ON transactions(tx_time);;

CREATE TABLE tx_entity_actions (
  tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE,
  txa_action CHAR(1) NOT NULL,
  txa_public %s NOT NULL,
  txa_order INTEGER,
  eid INTEGER NOT NULL,
  etype VARCHAR(64) NOT NULL,
  changes %s
);;
CREATE INDEX tx_entity_actions_txa_action_idx ON tx_entity_actions(txa_action);;
CREATE INDEX tx_entity_actions_txa_public_idx ON tx_entity_actions(txa_public);;
CREATE INDEX tx_entity_actions_eid_idx ON tx_entity_actions(eid);;
CREATE INDEX tx_entity_actions_etype_idx ON tx_entity_actions(etype);;
CREATE INDEX tx_entity_actions_tx_uuid_idx ON tx_entity_actions(tx_uuid);;

CREATE TABLE tx_relation_actions (
  tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE,
  txa_action CHAR(1) NOT NULL,
  txa_public %s NOT NULL,
  txa_order INTEGER,
  eid_from INTEGER NOT NULL,
  eid_to INTEGER NOT NULL,
  rtype VARCHAR(256) NOT NULL
);;
CREATE INDEX tx_relation_actions_txa_action_idx ON tx_relation_actions(txa_action);;
CREATE INDEX tx_relation_actions_txa_public_idx ON tx_relation_actions(txa_public);;
CREATE INDEX tx_relation_actions_eid_from_idx ON tx_relation_actions(eid_from);;
CREATE INDEX tx_relation_actions_eid_to_idx ON tx_relation_actions(eid_to);;
CREATE INDEX tx_relation_actions_tx_uuid_idx ON tx_relation_actions(tx_uuid);;
""" % (helper.sql_create_numrange('entities_id_seq').replace(';', ';;'),
       typemap['Datetime'],
       typemap['Boolean'], typemap['Bytes'], typemap['Boolean'])
    if helper.backend_name == 'sqlite':
        # sqlite supports the ON DELETE CASCADE syntax but does nothing
        schema += '''
CREATE TRIGGER fkd_transactions
BEFORE DELETE ON transactions
FOR EACH ROW BEGIN
    DELETE FROM tx_entity_actions WHERE tx_uuid=OLD.tx_uuid;
    DELETE FROM tx_relation_actions WHERE tx_uuid=OLD.tx_uuid;
END;;
'''
    schema += ';;'.join(helper.sqls_create_multicol_unique_index('entities', ['extid']))
    schema += ';;\n'
    return schema


def sql_drop_schema(driver):
    helper = get_db_helper(driver)
    return """
%s;
%s
DROP TABLE entities;
DROP TABLE tx_entity_actions;
DROP TABLE tx_relation_actions;
DROP TABLE transactions;
""" % (';'.join(helper.sqls_drop_multicol_unique_index('entities', ['extid'])),
       helper.sql_drop_numrange('entities_id_seq'))


def grant_schema(user, set_owner=True):
    result = ''
    for table in ('entities', 'entities_id_seq', 'transactions',
                  'tx_entity_actions', 'tx_relation_actions'):
        if set_owner:
            result += 'ALTER TABLE %s OWNER TO %s;\n' % (table, user)
        result += 'GRANT ALL ON %s TO %s;\n' % (table, user)
    return result
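
# Illustrative sketch (not part of the original module): the helpers above
# return plain SQL scripts with ';;' as statement separator. A hypothetical
# way to inspect the sqlite variant:
#
#     for statement in sql_schema('sqlite').split(';;'):
#         if statement.strip():
#             print(statement.strip() + ';')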


class BaseAuthentifier(object):

    def __init__(self, source=None):
        self.source = source

    def set_schema(self, schema):
        """set the instance's schema"""
        pass


class LoginPasswordAuthentifier(BaseAuthentifier):
    passwd_rql = 'Any P WHERE X is CWUser, X login %(login)s, X upassword P'
    auth_rql = (u'Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s, '
                'X cw_source S, S name "system"')
    _sols = ({'X': 'CWUser', 'P': 'Password', 'S': 'CWSource'},)

    def set_schema(self, schema):
        """set the instance's schema"""
        if 'CWUser' in schema:  # probably an empty schema if not true...
            # rql syntax trees used to authenticate users
            self._passwd_rqlst = self.source.compile_rql(self.passwd_rql, self._sols)
            self._auth_rqlst = self.source.compile_rql(self.auth_rql, self._sols)

    def authenticate(self, cnx, login, password=None, **kwargs):
        """return CWUser eid for the given login/password if this account is
        defined in this source, else raise `AuthenticationError`

        two queries are needed since passwords are stored crypted, so we have
        to fetch the salt first
        """
        args = {'login': login, 'pwd': None}
        if password is not None:
            rset = self.source.syntax_tree_search(cnx, self._passwd_rqlst, args)
            try:
                pwd = rset[0][0]
            except IndexError:
                raise AuthenticationError('bad login')
            if pwd is None:
                # if pwd is None but a password is provided, something is wrong
                raise AuthenticationError('bad password')
            # passwords are stored using the Bytes type, so we get a StringIO
            args['pwd'] = Binary(crypt_password(password, pwd.getvalue()))
        # get eid from login and (crypted) password
        rset = self.source.syntax_tree_search(cnx, self._auth_rqlst, args)
        pwd = args['pwd']
        try:
            user = rset[0][0]
            # If the stored hash uses a deprecated scheme (e.g. DES or MD5 used
            # before 3.14.7), update with a fresh one
            if pwd is not None and pwd.getvalue():
                verify, newhash = verify_and_update(password, pwd.getvalue())
                if not verify:  # should not happen, but...
                    raise AuthenticationError('bad password')
                if newhash:
                    cnx.system_sql("UPDATE %s SET %s=%%(newhash)s WHERE %s=%%(login)s"
                                   % (SQL_PREFIX + 'CWUser',
                                      SQL_PREFIX + 'upassword',
                                      SQL_PREFIX + 'login'),
                                   {'newhash': self.source._binary(newhash.encode('ascii')),
                                    'login': login})
                    cnx.commit()
            return user
        except IndexError:
            raise AuthenticationError('bad password')


class EmailPasswordAuthentifier(BaseAuthentifier):

    def authenticate(self, cnx, login, **authinfo):
        # email_auth flag prevents infinite recursion (call to
        # repo.check_auth_info at the end of this method may lead us here again)
        if not '@' in login or authinfo.pop('email_auth', None):
            raise AuthenticationError('not an email')
        rset = cnx.execute('Any L WHERE U login L, U primary_email M, '
                           'M address %(login)s', {'login': login},
                           build_descr=False)
        if rset.rowcount != 1:
            raise AuthenticationError('nonexistent email')
        login = rset.rows[0][0]
        authinfo['email_auth'] = True
        return self.source.repo.check_auth_info(cnx, login, authinfo)
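
# Illustrative sketch (not part of the original module): the two-step flow of
# LoginPasswordAuthentifier.authenticate() above -- fetch the stored hash to
# recover the salt, then compare a freshly crypted candidate. Hypothetical
# direct use through the source's authentifier chain:
#
#     source = repo.system_source
#     with repo.internal_cnx() as cnx:
#         try:
#             eid = source.authenticate(cnx, u'admin', password=u'secret')
#         except AuthenticationError:
#             eid = None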
"""blocksize=100def__init__(self,source):""" :param: source an instance of the system source """self._source=sourceself.logger=logging.getLogger('cubicweb.ctl')self.logger.setLevel(logging.INFO)self.logger.addHandler(logging.StreamHandler(sys.stdout))self.schema=self._source.schemaself.dbhelper=self._source.dbhelperself.cnx=Noneself.cursor=Noneself.sql_generator=sqlgen.SQLGenerator()defget_connection(self):returnself._source.get_connection()defbackup(self,backupfile):archive=zipfile.ZipFile(backupfile,'w',allowZip64=True)self.cnx=self.get_connection()try:self.cursor=self.cnx.cursor()self.cursor.arraysize=100self.logger.info('writing metadata')self.write_metadata(archive)forseqinself.get_sequences():self.logger.info('processing sequence %s',seq)self.write_sequence(archive,seq)fornumrangeinself.get_numranges():self.logger.info('processing numrange %s',numrange)self.write_numrange(archive,numrange)fortableinself.get_tables():self.logger.info('processing table %s',table)self.write_table(archive,table)finally:archive.close()self.cnx.close()self.logger.info('done')defget_tables(self):non_entity_tables=['entities','transactions','tx_entity_actions','tx_relation_actions',]etype_tables=[]relation_tables=[]prefix='cw_'foretypeinself.schema.entities():eschema=self.schema.eschema(etype)ifeschema.final:continueetype_tables.append('%s%s'%(prefix,etype))forrtypeinself.schema.relations():rschema=self.schema.rschema(rtype)ifrschema.finalorrschema.inlinedorrschemainVIRTUAL_RTYPES:continuerelation_tables.append('%s_relation'%rtype)returnnon_entity_tables+etype_tables+relation_tablesdefget_sequences(self):return[]defget_numranges(self):return['entities_id_seq']defwrite_metadata(self,archive):archive.writestr('format.txt','1.1')archive.writestr('tables.txt','\n'.join(self.get_tables()))archive.writestr('sequences.txt','\n'.join(self.get_sequences()))archive.writestr('numranges.txt','\n'.join(self.get_numranges()))versions=self._get_versions()versions_str='\n'.join('%s%s'%(k,v)fork,vinversions)archive.writestr('versions.txt',versions_str)defwrite_sequence(self,archive,seq):sql=self.dbhelper.sql_sequence_current_state(seq)columns,rows_iterator=self._get_cols_and_rows(sql)rows=list(rows_iterator)serialized=self._serialize(seq,columns,rows)archive.writestr('sequences/%s'%seq,serialized)defwrite_numrange(self,archive,numrange):sql=self.dbhelper.sql_numrange_current_state(numrange)columns,rows_iterator=self._get_cols_and_rows(sql)rows=list(rows_iterator)serialized=self._serialize(numrange,columns,rows)archive.writestr('numrange/%s'%numrange,serialized)defwrite_table(self,archive,table):nb_lines_sql='SELECT COUNT(*) FROM %s'%tableself.cursor.execute(nb_lines_sql)rowcount=self.cursor.fetchone()[0]sql='SELECT * FROM %s'%tablecolumns,rows_iterator=self._get_cols_and_rows(sql)self.logger.info('number of rows: %d',rowcount)blocksize=self.blocksizeifrowcount>0:fori,startinenumerate(range(0,rowcount,blocksize)):rows=list(itertools.islice(rows_iterator,blocksize))serialized=self._serialize(table,columns,rows)archive.writestr('tables/%s.%04d'%(table,i),serialized)self.logger.debug('wrote rows %d to %d (out of %d) to 

    def write_table(self, archive, table):
        nb_lines_sql = 'SELECT COUNT(*) FROM %s' % table
        self.cursor.execute(nb_lines_sql)
        rowcount = self.cursor.fetchone()[0]
        sql = 'SELECT * FROM %s' % table
        columns, rows_iterator = self._get_cols_and_rows(sql)
        self.logger.info('number of rows: %d', rowcount)
        blocksize = self.blocksize
        if rowcount > 0:
            for i, start in enumerate(range(0, rowcount, blocksize)):
                rows = list(itertools.islice(rows_iterator, blocksize))
                serialized = self._serialize(table, columns, rows)
                archive.writestr('tables/%s.%04d' % (table, i),
                                 serialized)
                self.logger.debug('wrote rows %d to %d (out of %d) to %s.%04d',
                                  start, start + len(rows) - 1,
                                  rowcount, table, i)
        else:
            rows = []
            serialized = self._serialize(table, columns, rows)
            archive.writestr('tables/%s.%04d' % (table, 0),
                             serialized)

    def _get_cols_and_rows(self, sql):
        process_result = self._source.iter_process_result
        self.cursor.execute(sql)
        columns = (d[0] for d in self.cursor.description)
        rows = process_result(self.cursor)
        return tuple(columns), rows

    def _serialize(self, name, columns, rows):
        return pickle.dumps((name, columns, rows), pickle.HIGHEST_PROTOCOL)

    def restore(self, backupfile):
        archive = zipfile.ZipFile(backupfile, 'r', allowZip64=True)
        self.cnx = self.get_connection()
        self.cursor = self.cnx.cursor()
        sequences, numranges, tables, table_chunks = self.read_metadata(archive, backupfile)
        for seq in sequences:
            self.logger.info('restoring sequence %s', seq)
            self.read_sequence(archive, seq)
        for numrange in numranges:
            self.logger.info('restoring numrange %s', numrange)
            self.read_numrange(archive, numrange)
        for table in tables:
            self.logger.info('restoring table %s', table)
            self.read_table(archive, table, sorted(table_chunks[table]))
        self.cnx.close()
        archive.close()
        self.logger.info('done')

    def read_metadata(self, archive, backupfile):
        formatinfo = archive.read('format.txt')
        self.logger.info('checking metadata')
        if formatinfo.strip() != "1.1":
            self.logger.critical('Unsupported format in archive: %s', formatinfo)
            raise ValueError('Unknown format in %s: %s' % (backupfile, formatinfo))
        tables = archive.read('tables.txt').splitlines()
        sequences = archive.read('sequences.txt').splitlines()
        numranges = archive.read('numranges.txt').splitlines()
        archive_versions = self._parse_versions(archive.read('versions.txt'))
        db_versions = set(self._get_versions())
        if archive_versions != db_versions:
            self.logger.critical('Restore warning: versions do not match')
            new_cubes = db_versions - archive_versions
            if new_cubes:
                self.logger.critical('In the db:\n%s',
                                     '\n'.join('%s: %s' % (cube, ver)
                                               for cube, ver in sorted(new_cubes)))
            old_cubes = archive_versions - db_versions
            if old_cubes:
                self.logger.critical('In the archive:\n%s',
                                     '\n'.join('%s: %s' % (cube, ver)
                                               for cube, ver in sorted(old_cubes)))
            if not ASK.confirm('Versions mismatch: continue anyway ?', False):
                raise ValueError('Unable to restore: versions do not match')
        table_chunks = {}
        for name in archive.namelist():
            if not name.startswith('tables/'):
                continue
            filename = basename(name)
            tablename, _ext = filename.rsplit('.', 1)
            table_chunks.setdefault(tablename, []).append(name)
        return sequences, numranges, tables, table_chunks

    def read_sequence(self, archive, seq):
        seqname, columns, rows = pickle.loads(archive.read('sequences/%s' % seq))
        assert seqname == seq
        assert len(rows) == 1
        assert len(rows[0]) == 1
        value = rows[0][0]
        sql = self.dbhelper.sql_restart_sequence(seq, value)
        self.cursor.execute(sql)
        self.cnx.commit()

    def read_numrange(self, archive, numrange):
        rangename, columns, rows = pickle.loads(archive.read('numrange/%s' % numrange))
        assert rangename == numrange
        assert len(rows) == 1
        assert len(rows[0]) == 1
        value = rows[0][0]
        sql = self.dbhelper.sql_restart_numrange(numrange, value)
        self.cursor.execute(sql)
        self.cnx.commit()

    def read_table(self, archive, table, filenames):
        merge_args = self._source.merge_args
        self.cursor.execute('DELETE FROM %s' % table)
        self.cnx.commit()
        row_count = 0
        for filename in filenames:
            tablename, columns, rows = pickle.loads(archive.read(filename))
            assert tablename == table
            if not rows:
                continue
            insert = self.sql_generator.insert(table,
                                               dict(zip(columns, rows[0])))
            for row in rows:
                self.cursor.execute(insert, merge_args(dict(zip(columns, row)), {}))
            row_count += len(rows)
            self.cnx.commit()
        self.logger.info('inserted %d rows', row_count)

    def _parse_versions(self, version_str):
        versions = set()
        for line in version_str.splitlines():
            versions.add(tuple(line.split()))
        return versions

    def _get_versions(self):
        version_sql = 'SELECT cw_pkey, cw_value FROM cw_CWProperty'
        versions = []
        self.cursor.execute(version_sql)
        for pkey, value in self.cursor.fetchall():
            if pkey.startswith(u'system.version'):
                versions.append((pkey, value))
        return versions