# Added tag cubicweb-version-3.8.2 for changeset ef2e37d34013
# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""Adapters for native cubicweb sources.Notes:* extid (aka external id, the primary key of an entity in the external source from which it comes from) are stored in a varchar column encoded as a base64 string. 
This is because it should actually be Bytes but we want an index on it for fast querying."""from__future__importwith_statement__docformat__="restructuredtext en"frompickleimportloads,dumpsfromthreadingimportLockfromdatetimeimportdatetimefrombase64importb64decode,b64encodefromcontextlibimportcontextmanagerfromlogilab.common.compatimportanyfromlogilab.common.cacheimportCachefromlogilab.common.decoratorsimportcached,clear_cachefromlogilab.common.configurationimportMethodfromlogilab.common.shellutilsimportgetloginfromlogilab.databaseimportget_db_helperfromcubicwebimportUnknownEid,AuthenticationError,ValidationError,Binaryfromcubicwebimporttransactionastx,server,neg_rolefromcubicweb.schemaimportVIRTUAL_RTYPESfromcubicweb.cwconfigimportCubicWebNoAppConfigurationfromcubicweb.serverimporthookfromcubicweb.server.utilsimportcrypt_password,eschema_eidfromcubicweb.server.sqlutilsimportSQL_PREFIX,SQLAdapterMixInfromcubicweb.server.rqlannotationimportset_qdatafromcubicweb.server.hookimportCleanupDeletedEidsCacheOpfromcubicweb.server.sessionimporthooks_control,security_enabledfromcubicweb.server.sourcesimportAbstractSource,dbg_st_search,dbg_resultsfromcubicweb.server.sources.rql2sqlimportSQLGeneratorATTR_MAP={}NONSYSTEM_ETYPES=set()NONSYSTEM_RELATIONS=set()classLogCursor(object):def__init__(self,cursor):self.cu=cursordefexecute(self,query,args=None):"""Execute a query. 
it's a function just so that it shows up in profiling """ifserver.DEBUG&server.DBG_SQL:print'exec',query,argstry:self.cu.execute(str(query),args)exceptException,ex:print"sql: %r\n args: %s\ndbms message: %r"%(query,args,ex.args[0])raisedeffetchall(self):returnself.cu.fetchall()deffetchone(self):returnself.cu.fetchone()defmake_schema(selected,solution,table,typemap):"""return a sql schema to store RQL query result"""sql=[]varmap={}fori,terminenumerate(selected):name='C%s'%ikey=term.as_string()varmap[key]='%s.%s'%(table,name)ttype=term.get_type(solution)try:sql.append('%s%s'%(name,typemap[ttype]))exceptKeyError:# assert not schema(ttype).finalsql.append('%s%s'%(name,typemap['Int']))return','.join(sql),varmapdef_modified_sql(table,etypes):# XXX protect against sql injectioniflen(etypes)>1:restr='type IN (%s)'%','.join("'%s'"%etypeforetypeinetypes)else:restr="type='%s'"%etypes[0]iftable=='entities':attr='mtime'else:attr='dtime'return'SELECT type, eid FROM %s WHERE %s AND %s > %%(time)s'%(table,restr,attr)defsql_or_clauses(sql,clauses):select,restr=sql.split(' WHERE ',1)restrclauses=restr.split(' AND ')forclauseinclauses:restrclauses.remove(clause)ifrestrclauses:restr='%s AND (%s)'%(' AND '.join(restrclauses),' OR '.join(clauses))else:restr='(%s)'%' OR '.join(clauses)return'%s WHERE %s'%(select,restr)classUndoException(Exception):"""something went wrong during undoing"""def_undo_check_relation_target(tentity,rdef,role):"""check linked entity has not been redirected for this relation"""card=rdef.role_cardinality(role)ifcardin'?1'andtentity.related(rdef.rtype,role):raiseUndoException(tentity._cw._("Can't restore %(role)s relation %(rtype)s to entity %(eid)s which ""is already linked using this relation.")%{'role':neg_role(role),'rtype':rdef.rtype,'eid':tentity.eid})def_undo_rel_info(session,subj,rtype,obj):entities=[]forrole,eidin(('subject',subj),('object',obj)):try:entities.append(session.entity_from_eid(eid))exceptUnknownEid:raiseUndoException(session._("Can't restore 
relation %(rtype)s, %(role)s entity %(eid)s"" doesn't exist anymore.")%{'role':session._(role),'rtype':session._(rtype),'eid':eid})sentity,oentity=entitiestry:rschema=session.vreg.schema.rschema(rtype)rdef=rschema.rdefs[(sentity.__regid__,oentity.__regid__)]exceptKeyError:raiseUndoException(session._("Can't restore relation %(rtype)s between %(subj)s and ""%(obj)s, that relation does not exists anymore in the ""schema.")%{'rtype':session._(rtype),'subj':subj,'obj':obj})returnsentity,oentity,rdefdef_undo_has_later_transaction(session,eid):returnsession.system_sql('''\SELECT T.tx_uuid FROM transactions AS TREF, transactions AS TWHERE TREF.tx_uuid='%(txuuid)s' AND T.tx_uuid!='%(txuuid)s'AND T.tx_time>=TREF.tx_timeAND (EXISTS(SELECT 1 FROM tx_entity_actions AS TEA WHERE TEA.tx_uuid=T.tx_uuid AND TEA.eid=%(eid)s) OR EXISTS(SELECT 1 FROM tx_relation_actions as TRA WHERE TRA.tx_uuid=T.tx_uuid AND ( TRA.eid_from=%(eid)s OR TRA.eid_to=%(eid)s)) )'''%{'txuuid':session.transaction_data['undoing_uuid'],'eid':eid}).fetchone()classNativeSQLSource(SQLAdapterMixIn,AbstractSource):"""adapter for source using the native cubicweb schema (see below) """sqlgen_class=SQLGeneratoroptions=(('db-driver',{'type':'string','default':'postgres',# XXX use choice type'help':'database driver (postgres, mysql, sqlite, sqlserver2005)','group':'native-source','level':1,}),('db-host',{'type':'string','default':'','help':'database host','group':'native-source','level':1,}),('db-port',{'type':'string','default':'','help':'database port','group':'native-source','level':1,}),('db-name',{'type':'string','default':Method('default_instance_id'),'help':'database name','group':'native-source','level':0,}),('db-user',{'type':'string','default':CubicWebNoAppConfiguration.mode=='user'andgetlogin()or'cubicweb','help':'database user','group':'native-source','level':0,}),('db-password',{'type':'password','default':'','help':'database 
password','group':'native-source','level':0,}),('db-encoding',{'type':'string','default':'utf8','help':'database encoding','group':'native-source','level':1,}),('db-extra-arguments',{'type':'string','default':'','help':'set to "Trusted_Connection" if you are using SQLServer and ''want trusted authentication for the database connection','group':'native-source','level':2,}),)def__init__(self,repo,appschema,source_config,*args,**kwargs):SQLAdapterMixIn.__init__(self,source_config)self.authentifiers=[LoginPasswordAuthentifier(self)]AbstractSource.__init__(self,repo,appschema,source_config,*args,**kwargs)# sql generatorself._rql_sqlgen=self.sqlgen_class(appschema,self.dbhelper,ATTR_MAP.copy())# full text index helperself.do_fti=notrepo.config['delay-full-text-indexation']# sql queries cacheself._cache=Cache(repo.config['rql-cache-size'])self._temp_table_data={}# we need a lock to protect eid attribution function (XXX, really?# explain)self._eid_creation_lock=Lock()# (etype, attr) / storage mappingself._storages={}# entity types that may be used by other multi-sources instancesself.multisources_etypes=set(repo.config['multi-sources-etypes'])# XXX no_sqlite_wrap trick since we've a sqlite locking pb when# running unittest_multisources with the wrapping belowifself.dbdriver=='sqlite'and \notgetattr(repo.config,'no_sqlite_wrap',False):fromcubicweb.server.sources.extliteimportConnectionWrapperself.get_connection=lambda:ConnectionWrapper(self)self.check_connection=lambdacnx:cnxdefpool_reset(cnx):cnx.close()self.pool_reset=pool_reset@propertydef_sqlcnx(self):# XXX: sqlite connections can only be used in the same thread, so# create a new one each time necessary. 
If it appears to be time# consuming, find another wayreturnSQLAdapterMixIn.get_connection(self)defadd_authentifier(self,authentifier):self.authentifiers.append(authentifier)authentifier.source=selfauthentifier.set_schema(self.schema)defreset_caches(self):"""method called during test to reset potential source caches"""self._cache=Cache(self.repo.config['rql-cache-size'])defclear_eid_cache(self,eid,etype):"""clear potential caches for the given eid"""self._cache.pop('Any X WHERE X eid %s, X is %s'%(eid,etype),None)self._cache.pop('Any X WHERE X eid %s'%eid,None)self._cache.pop('Any %s'%eid,None)defsqlexec(self,session,sql,args=None):"""execute the query and return its result"""returnself.process_result(self.doexec(session,sql,args))definit_creating(self):pool=self.repo._get_pool()pool.pool_set()# check full text index availibilityifself.do_fti:ifnotself.dbhelper.has_fti_table(pool['system']):ifnotself.repo.config.creating:self.critical('no text index table')self.do_fti=Falsepool.pool_reset()self.repo._free_pool(pool)defbackup(self,backupfile,confirm):"""method called to create a backup of the source's data"""self.close_pool_connections()try:self.backup_to_file(backupfile,confirm)finally:self.open_pool_connections()defrestore(self,backupfile,confirm,drop):"""method called to restore a backup of source's data"""ifself.repo.config.open_connections_pools:self.close_pool_connections()try:self.restore_from_file(backupfile,confirm,drop=drop)finally:ifself.repo.config.open_connections_pools:self.open_pool_connections()definit(self):self.init_creating()# XXX deprecates [un]map_attribute 
?defmap_attribute(self,etype,attr,cb,sourcedb=True):self._rql_sqlgen.attr_map['%s.%s'%(etype,attr)]=(cb,sourcedb)defunmap_attribute(self,etype,attr):self._rql_sqlgen.attr_map.pop('%s.%s'%(etype,attr),None)defset_storage(self,etype,attr,storage):storage_dict=self._storages.setdefault(etype,{})storage_dict[attr]=storageself.map_attribute(etype,attr,storage.callback,storage.is_source_callback)defunset_storage(self,etype,attr):self._storages[etype].pop(attr)# if etype has no storage left, remove the entryifnotself._storages[etype]:delself._storages[etype]self.unmap_attribute(etype,attr)defstorage(self,etype,attr):"""return the storage for the given entity type / attribute """try:returnself._storages[etype][attr]exceptKeyError:raiseException('no custom storage set for %s.%s'%(etype,attr))# ISource interface #######################################################defcompile_rql(self,rql,sols):rqlst=self.repo.vreg.rqlhelper.parse(rql)rqlst.restricted_vars=()rqlst.children[0].solutions=solsself.repo.querier.sqlgen_annotate(rqlst)set_qdata(self.schema.rschema,rqlst,())returnrqlstdefset_schema(self,schema):"""set the instance'schema"""self._cache=Cache(self.repo.config['rql-cache-size'])self.cache_hit,self.cache_miss,self.no_cache=0,0,0self.schema=schematry:self._rql_sqlgen.schema=schemaexceptAttributeError:pass# __init__forauthentifierinself.authentifiers:authentifier.set_schema(self.schema)clear_cache(self,'need_fti_indexation')defsupport_entity(self,etype,write=False):"""return true if the given entity's type is handled by this adapter if write is true, return true only if it's a RW support """returnnotetypeinNONSYSTEM_ETYPESdefsupport_relation(self,rtype,write=False):"""return true if the given relation's type is handled by this adapter if write is true, return true only if it's a RW support """ifwrite:returnnotrtypeinNONSYSTEM_RELATIONS# due to current multi-sources implementation, the system source# can't claim not supporting a relationreturnTrue#not rtype == 
'content_for'defmay_cross_relation(self,rtype):returnTruedefauthenticate(self,session,login,**kwargs):"""return CWUser eid for the given login and other authentication information found in kwargs, else raise `AuthenticationError` """forauthentifierinself.authentifiers:try:returnauthentifier.authenticate(session,login,**kwargs)exceptAuthenticationError:continueraiseAuthenticationError()defsyntax_tree_search(self,session,union,args=None,cachekey=None,varmap=None):"""return result from this source for a rql query (actually from a rql syntax tree and a solution dictionary mapping each used variable to a possible type). If cachekey is given, the query necessary to fetch the results (but not the results themselves) may be cached using this key. """assertdbg_st_search(self.uri,union,varmap,args,cachekey)# remember number of actually selected term (sql generation may append some)ifcachekeyisNone:self.no_cache+=1# generate sql query if we are able to do so (not supported types...)sql,qargs,cbs=self._rql_sqlgen.generate(union,args,varmap)else:# sql may be cachedtry:sql,qargs,cbs=self._cache[cachekey]self.cache_hit+=1exceptKeyError:self.cache_miss+=1sql,qargs,cbs=self._rql_sqlgen.generate(union,args,varmap)self._cache[cachekey]=sql,qargs,cbsargs=self.merge_args(args,qargs)assertisinstance(sql,basestring),repr(sql)try:cursor=self.doexec(session,sql,args)except(self.OperationalError,self.InterfaceError):# FIXME: better detection of deconnection pbself.warning("trying to reconnect")session.pool.reconnect(self)cursor=self.doexec(session,sql,args)results=self.process_result(cursor,cbs)assertdbg_results(results)returnresultsdefflying_insert(self,table,session,union,args=None,varmap=None):"""similar as .syntax_tree_search, but inserts data in the temporary table (on-the-fly if possible, eg for the system source whose the given cursor come from). If not possible, inserts all data by calling .executemany(). 
"""assertdbg_st_search(self.uri,union,varmap,args,prefix='ON THE FLY temp data insertion into %s from'%table)# generate sql queries if we are able to do sosql,qargs,cbs=self._rql_sqlgen.generate(union,args,varmap)query='INSERT INTO %s%s'%(table,sql.encode(self._dbencoding))self.doexec(session,query,self.merge_args(args,qargs))defmanual_insert(self,results,table,session):"""insert given result into a temporary table on the system source"""ifserver.DEBUG&server.DBG_RQL:print' manual insertion of',results,'into',tableifnotresults:returnquery_args=['%%(%s)s'%iforiinxrange(len(results[0]))]query='INSERT INTO %s VALUES(%s)'%(table,','.join(query_args))kwargs_list=[]forrowinresults:kwargs={}row=tuple(row)forindex,cellinenumerate(row):ifisinstance(cell,Binary):cell=self._binary(cell.getvalue())kwargs[str(index)]=cellkwargs_list.append(kwargs)self.doexecmany(session,query,kwargs_list)defclean_temp_data(self,session,temptables):"""remove temporary data, usually associated to temporary tables"""iftemptables:fortableintemptables:try:self.doexec(session,'DROP TABLE %s'%table)except:passtry:delself._temp_table_data[table]exceptKeyError:continue@contextmanagerdef_storage_handler(self,entity,event):# 1/ memorize values as they are before the storage is called.# For instance, the BFSStorage will replace the `data`# binary value with a Binary containing the destination path# on the filesystem. 
To make the entity.data usage absolutely# transparent, we'll have to reset entity.data to its binary# value once the SQL query will be executedrestore_values={}etype=entity.__regid__forattr,storageinself._storages.get(etype,{}).items():try:edited=entity.edited_attributesexceptAttributeError:assertevent=='deleted'getattr(storage,'entity_deleted')(entity,attr)else:ifattrinedited:handler=getattr(storage,'entity_%s'%event)real_value=handler(entity,attr)restore_values[attr]=real_valuetry:yield# 2/ execute the source's instructionsfinally:# 3/ restore original valuesforattr,valueinrestore_values.items():entity[attr]=valuedefadd_entity(self,session,entity):"""add a new entity to the source"""withself._storage_handler(entity,'added'):attrs=self.preprocess_entity(entity)sql=self.sqlgen.insert(SQL_PREFIX+entity.__regid__,attrs)self.doexec(session,sql,attrs)ifsession.undoable_action('C',entity.__regid__):self._record_tx_action(session,'tx_entity_actions','C',etype=entity.__regid__,eid=entity.eid)defupdate_entity(self,session,entity):"""replace an entity in the source"""withself._storage_handler(entity,'updated'):attrs=self.preprocess_entity(entity)ifsession.undoable_action('U',entity.__regid__):changes=self._save_attrs(session,entity,attrs)self._record_tx_action(session,'tx_entity_actions','U',etype=entity.__regid__,eid=entity.eid,changes=self._binary(dumps(changes)))sql=self.sqlgen.update(SQL_PREFIX+entity.__regid__,attrs,['cw_eid'])self.doexec(session,sql,attrs)defdelete_entity(self,session,entity):"""delete an entity from the 
source"""withself._storage_handler(entity,'deleted'):ifsession.undoable_action('D',entity.__regid__):attrs=[SQL_PREFIX+r.typeforrinentity.e_schema.subject_relations()if(r.finalorr.inlined)andnotrinVIRTUAL_RTYPES]changes=self._save_attrs(session,entity,attrs)self._record_tx_action(session,'tx_entity_actions','D',etype=entity.__regid__,eid=entity.eid,changes=self._binary(dumps(changes)))attrs={'cw_eid':entity.eid}sql=self.sqlgen.delete(SQL_PREFIX+entity.__regid__,attrs)self.doexec(session,sql,attrs)defadd_relation(self,session,subject,rtype,object,inlined=False):"""add a relation to the source"""self._add_relation(session,subject,rtype,object,inlined)ifsession.undoable_action('A',rtype):self._record_tx_action(session,'tx_relation_actions','A',eid_from=subject,rtype=rtype,eid_to=object)def_add_relation(self,session,subject,rtype,object,inlined=False):"""add a relation to the source"""ifinlinedisFalse:attrs={'eid_from':subject,'eid_to':object}sql=self.sqlgen.insert('%s_relation'%rtype,attrs)else:# used by data importetype=session.describe(subject)[0]attrs={'cw_eid':subject,SQL_PREFIX+rtype:object}sql=self.sqlgen.update(SQL_PREFIX+etype,attrs,['cw_eid'])self.doexec(session,sql,attrs)defdelete_relation(self,session,subject,rtype,object):"""delete a relation from the source"""rschema=self.schema.rschema(rtype)self._delete_relation(session,subject,rtype,object,rschema.inlined)ifsession.undoable_action('R',rtype):self._record_tx_action(session,'tx_relation_actions','R',eid_from=subject,rtype=rtype,eid_to=object)def_delete_relation(self,session,subject,rtype,object,inlined=False):"""delete a relation from the source"""ifinlined:table=SQL_PREFIX+session.describe(subject)[0]column=SQL_PREFIX+rtypesql='UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s'%(table,column,SQL_PREFIX)attrs={'eid':subject}else:attrs={'eid_from':subject,'eid_to':object}sql=self.sqlgen.delete('%s_relation'%rtype,attrs)self.doexec(session,sql,attrs)defdoexec(self,session,query,args=None,rollback=True):"""Execute 
a query. it's a function just so that it shows up in profiling """cursor=session.pool[self.uri]ifserver.DEBUG&server.DBG_SQL:cnx=session.pool.connection(self.uri)# getattr to get the actual connection if cnx is a ConnectionWrapper# instanceprint'exec',query,args,getattr(cnx,'_cnx',cnx)try:# str(query) to avoid error if it's an unicode stringcursor.execute(str(query),args)exceptException,ex:ifself.repo.config.mode!='test':# during test we get those message when trying to alter sqlite# db schemaself.critical("sql: %r\n args: %s\ndbms message: %r",query,args,ex.args[0])ifrollback:try:session.pool.connection(self.uri).rollback()ifself.repo.config.mode!='test':self.critical('transaction has been rollbacked')except:passraisereturncursordefdoexecmany(self,session,query,args):"""Execute a query. it's a function just so that it shows up in profiling """ifserver.DEBUG&server.DBG_SQL:print'execmany',query,'with',len(args),'arguments'cursor=session.pool[self.uri]try:# str(query) to avoid error if it's an unicode stringcursor.executemany(str(query),args)exceptException,ex:ifself.repo.config.mode!='test':# during test we get those message when trying to alter sqlite# db schemaself.critical("sql many: %r\n args: %s\ndbms message: %r",query,args,ex.args[0])try:session.pool.connection(self.uri).rollback()ifself.repo.config.mode!='test':self.critical('transaction has been rollbacked')except:passraise# short cut to method requiring advanced db helper usage ##################defbinary_to_str(self,value):returnself.dbhelper.dbapi_module.binary_to_str(value)defcreate_index(self,session,table,column,unique=False):cursor=LogCursor(session.pool[self.uri])self.dbhelper.create_index(cursor,table,column,unique)defdrop_index(self,session,table,column,unique=False):cursor=LogCursor(session.pool[self.uri])self.dbhelper.drop_index(cursor,table,column,unique)# system source interface #################################################defeid_type_source(self,session,eid):"""return a tuple (type, 
source, extid) for the entity with id <eid>"""sql='SELECT type, source, extid FROM entities WHERE eid=%s'%eidtry:res=self.doexec(session,sql).fetchone()except:assertsession.pool,'session has no pool set'raiseUnknownEid(eid)ifresisNone:raiseUnknownEid(eid)ifres[-1]isnotNone:ifnotisinstance(res,list):res=list(res)res[-1]=b64decode(res[-1])returnresdefextid2eid(self,session,source,extid):"""get eid from an external id. Return None if no record found."""assertisinstance(extid,str)cursor=self.doexec(session,'SELECT eid FROM entities ''WHERE extid=%(x)s AND source=%(s)s',{'x':b64encode(extid),'s':source.uri})# XXX testing rowcount cause strange bug with sqlite, results are there# but rowcount is 0#if cursor.rowcount > 0:try:result=cursor.fetchone()ifresult:returnresult[0]except:passreturnNonedefmake_temp_table_name(self,table):try:# XXX remove this once returnself.dbhelper.temporary_table_name(table)exceptAttributeError:importwarningswarnings.warn('Please hg up logilab.database')returntabledeftemp_table_def(self,selected,sol,table):returnmake_schema(selected,sol,table,self.dbhelper.TYPE_MAPPING)defcreate_temp_table(self,session,table,schema):# we don't want on commit drop, this may cause problem when# running with an ldap source, and table will be deleted manually any way# on commitsql=self.dbhelper.sql_temporary_table(table,schema,False)self.doexec(session,sql)defcreate_eid(self,session):self._eid_creation_lock.acquire()try:forsqlinself.dbhelper.sqls_increment_sequence('entities_id_seq'):cursor=self.doexec(session,sql)returncursor.fetchone()[0]finally:self._eid_creation_lock.release()defadd_info(self,session,entity,source,extid,complete):"""add type and source info for an eid into the system table"""# begin by inserting eid/type/source/extid into the entities 
tableifextidisnotNone:assertisinstance(extid,str)extid=b64encode(extid)attrs={'type':entity.__regid__,'eid':entity.eid,'extid':extid,'source':source.uri,'mtime':datetime.now()}self.doexec(session,self.sqlgen.insert('entities',attrs),attrs)# now we can update the full text indexifself.do_ftiandself.need_fti_indexation(entity.__regid__):ifcomplete:entity.complete(entity.e_schema.indexable_attributes())self.index_entity(session,entity=entity)defupdate_info(self,session,entity,need_fti_update):"""mark entity as being modified, fulltext reindex if needed"""ifself.do_ftiandneed_fti_update:# reindex the entity only if this query is updating at least# one indexable attributeself.index_entity(session,entity=entity)# update entities.mtime.# XXX Only if entity.__regid__ in self.multisources_etypes?attrs={'eid':entity.eid,'mtime':datetime.now()}self.doexec(session,self.sqlgen.update('entities',attrs,['eid']),attrs)defdelete_info(self,session,entity,uri,extid):"""delete system information on deletion of an entity: * update the fti * remove record from the entities table * transfer it to the deleted_entities table if the entity's type is multi-sources """self.fti_unindex_entity(session,entity.eid)attrs={'eid':entity.eid}self.doexec(session,self.sqlgen.delete('entities',attrs),attrs)ifnotentity.__regid__inself.multisources_etypes:returnifextidisnotNone:assertisinstance(extid,str),type(extid)extid=b64encode(extid)attrs={'type':entity.__regid__,'eid':entity.eid,'extid':extid,'source':uri,'dtime':datetime.now()}self.doexec(session,self.sqlgen.insert('deleted_entities',attrs),attrs)defmodified_entities(self,session,etypes,mtime):"""return a 2-uple: * list of (etype, eid) of entities of the given types which have been modified since the given timestamp (actually entities whose full text index content has changed) * list of (etype, eid) of entities of the given types which have been deleted since the given timestamp 
"""foretypeinetypes:ifnotetypeinself.multisources_etypes:self.critical('%s not listed as a multi-sources entity types. ''Modify your configuration'%etype)self.multisources_etypes.add(etype)modsql=_modified_sql('entities',etypes)cursor=self.doexec(session,modsql,{'time':mtime})modentities=cursor.fetchall()delsql=_modified_sql('deleted_entities',etypes)cursor=self.doexec(session,delsql,{'time':mtime})delentities=cursor.fetchall()returnmodentities,delentities# undo support #############################################################defundoable_transactions(self,session,ueid=None,**actionfilters):"""See :class:`cubicweb.dbapi.Connection.undoable_transactions`"""# force filtering to session's user if not a managerifnotsession.user.is_in_group('managers'):ueid=session.user.eidrestr={}ifueidisnotNone:restr['tx_user']=ueidsql=self.sqlgen.select('transactions',restr,('tx_uuid','tx_time','tx_user'))ifactionfilters:# we will need subqueries to filter transactions according to# actions donetearestr={}# filters on the tx_entity_actions tabletrarestr={}# filters on the tx_relation_actions tablegenrestr={}# generic filters, appliyable to both table# unless public explicitly set to false, we only consider public# actionsifactionfilters.pop('public',True):genrestr['txa_public']=True# put additional filters in trarestr and/or tearestrforkey,valinactionfilters.iteritems():ifkey=='etype':# filtering on etype implies filtering on entity actions# only, and with no eid specifiedassertactionfilters.get('action','C')in'CUD'assertnot'eid'inactionfilterstearestr['etype']=valelifkey=='eid':# eid filter may apply to 'eid' of tx_entity_actions or to# 'eid_from' OR 'eid_to' of tx_relation_actionsifactionfilters.get('action','C')in'CUD':tearestr['eid']=valifactionfilters.get('action','A')in'AR':trarestr['eid_from']=valtrarestr['eid_to']=valelifkey=='action':ifvalin'CUD':tearestr['txa_action']=valelse:assertvalin'AR'trarestr['txa_action']=valelse:raiseAssertionError('unknow filter 
%s'%key)asserttrarestrortearestr,"can't only filter on 'public'"subqsqls=[]# append subqueries to the original query, using EXISTS()iftrarestror(genrestrandnottearestr):trarestr.update(genrestr)trasql=self.sqlgen.select('tx_relation_actions',trarestr,('1',))if'eid_from'intrarestr:# replace AND by OR between eid_from/eid_to restrictiontrasql=sql_or_clauses(trasql,['eid_from = %(eid_from)s','eid_to = %(eid_to)s'])trasql+=' AND transactions.tx_uuid=tx_relation_actions.tx_uuid'subqsqls.append('EXISTS(%s)'%trasql)iftearestror(genrestrandnottrarestr):tearestr.update(genrestr)teasql=self.sqlgen.select('tx_entity_actions',tearestr,('1',))teasql+=' AND transactions.tx_uuid=tx_entity_actions.tx_uuid'subqsqls.append('EXISTS(%s)'%teasql)ifrestr:sql+=' AND %s'%' OR '.join(subqsqls)else:sql+=' WHERE %s'%' OR '.join(subqsqls)restr.update(trarestr)restr.update(tearestr)# we want results ordered by transaction's time descendantsql+=' ORDER BY tx_time DESC'cu=self.doexec(session,sql,restr)# turn results into transaction objectsreturn[tx.Transaction(*args)forargsincu.fetchall()]deftx_info(self,session,txuuid):"""See :class:`cubicweb.dbapi.Connection.transaction_info`"""returntx.Transaction(txuuid,*self._tx_info(session,txuuid))deftx_actions(self,session,txuuid,public):"""See :class:`cubicweb.dbapi.Connection.transaction_actions`"""self._tx_info(session,txuuid)restr={'tx_uuid':txuuid}ifpublic:restr['txa_public']=True# XXX use generator to avoid loading everything in 
memory?sql=self.sqlgen.select('tx_entity_actions',restr,('txa_action','txa_public','txa_order','etype','eid','changes'))cu=self.doexec(session,sql,restr)actions=[tx.EntityAction(a,p,o,et,e,candloads(self.binary_to_str(c)))fora,p,o,et,e,cincu.fetchall()]sql=self.sqlgen.select('tx_relation_actions',restr,('txa_action','txa_public','txa_order','rtype','eid_from','eid_to'))cu=self.doexec(session,sql,restr)actions+=[tx.RelationAction(*args)forargsincu.fetchall()]returnsorted(actions,key=lambdax:x.order)defundo_transaction(self,session,txuuid):"""See :class:`cubicweb.dbapi.Connection.undo_transaction` important note: while undoing of a transaction, only hooks in the 'integrity', 'activeintegrity' and 'undo' categories are called. """# set mode so pool isn't released subsquently until commit/rollbacksession.mode='write'errors=[]session.transaction_data['undoing_uuid']=txuuidwithhooks_control(session,session.HOOKS_DENY_ALL,'integrity','activeintegrity','undo'):withsecurity_enabled(session,read=False):foractioninreversed(self.tx_actions(session,txuuid,False)):undomethod=getattr(self,'_undo_%s'%action.action.lower())errors+=undomethod(session,action)# remove the transactions recordself.doexec(session,"DELETE FROM transactions WHERE tx_uuid='%s'"%txuuid)returnerrorsdefstart_undoable_transaction(self,session,uuid):"""session callback to insert a transaction record in the transactions table when some undoable transaction is started """ueid=session.user.eidattrs={'tx_uuid':uuid,'tx_user':ueid,'tx_time':datetime.now()}self.doexec(session,self.sqlgen.insert('transactions',attrs),attrs)def_save_attrs(self,session,entity,attrs):"""return a pickleable dictionary containing current values for given attributes of the entity """restr={'cw_eid':entity.eid}sql=self.sqlgen.select(SQL_PREFIX+entity.__regid__,restr,attrs)cu=self.doexec(session,sql,restr)values=dict(zip(attrs,cu.fetchone()))# ensure backend specific binary are converted back to stringeschema=entity.e_schemaforcolumninattrs:# 
[3:] remove 'cw_' prefixattr=column[3:]ifnoteschema.subjrels[attr].final:continueifeschema.destination(attr)in('Password','Bytes'):value=values[column]ifvalueisnotNone:values[column]=self.binary_to_str(value)returnvaluesdef_record_tx_action(self,session,table,action,**kwargs):"""record a transaction action in the given table (either 'tx_entity_actions' or 'tx_relation_action') """kwargs['tx_uuid']=session.transaction_uuid()kwargs['txa_action']=actionkwargs['txa_order']=session.transaction_inc_action_counter()kwargs['txa_public']=session.running_dbapi_queryself.doexec(session,self.sqlgen.insert(table,kwargs),kwargs)def_tx_info(self,session,txuuid):"""return transaction's time and user of the transaction with the given uuid. raise `NoSuchTransaction` if there is no such transaction of if the session's user isn't allowed to see it. """restr={'tx_uuid':txuuid}sql=self.sqlgen.select('transactions',restr,('tx_time','tx_user'))cu=self.doexec(session,sql,restr)try:time,ueid=cu.fetchone()exceptTypeError:raisetx.NoSuchTransaction()ifnot(session.user.is_in_group('managers')orsession.user.eid==ueid):raisetx.NoSuchTransaction()returntime,ueiddef_undo_d(self,session,action):"""undo an entity deletion"""errors=[]err=errors.appendeid=action.eidetype=action.etype_=session._# get an entity instancetry:entity=self.repo.vreg['etypes'].etype_class(etype)(session)exceptException:err("can't restore entity %s of type %s, type no more supported"%(eid,etype))returnerrors# check for schema changes, entities linked through inlined relation# still exists, rewrap binary valueseschema=entity.e_schemagetrschema=eschema.subjrelsforcolumn,valueinaction.changes.items():rtype=column[3:]# remove cw_ prefixtry:rschema=getrschema[rtype]exceptKeyError:err(_("Can't restore relation %(rtype)s of entity %(eid)s, ""this relation does not exists anymore in the 
schema.")%{'rtype':rtype,'eid':eid})ifnotrschema.final:assertvalueisNoneelifeschema.destination(rtype)in('Bytes','Password'):action.changes[column]=self._binary(value)entity[rtype]=Binary(value)elifisinstance(value,str):entity[rtype]=unicode(value,session.encoding,'replace')else:entity[rtype]=valueentity.set_eid(eid)session.repo.init_entity_caches(session,entity,self)entity.edited_attributes=set(entity)entity.check()self.repo.hm.call_hooks('before_add_entity',session,entity=entity)# restore the entityaction.changes['cw_eid']=eidsql=self.sqlgen.insert(SQL_PREFIX+etype,action.changes)self.doexec(session,sql,action.changes)# add explicitly is / is_instance_of whose deletion is not recorded for# consistency with addition (done by sql in hooks)self.doexec(session,'INSERT INTO is_relation(eid_from, eid_to) ''VALUES(%s, %s)'%(eid,eschema_eid(session,eschema)))foreschemainentity.e_schema.ancestors()+[entity.e_schema]:self.doexec(session,'INSERT INTO is_instance_of_relation(eid_from,''eid_to) VALUES(%s, %s)'%(eid,eschema_eid(session,eschema)))# restore record in entities (will update fti if needed)self.add_info(session,entity,self,None,True)# remove record from deleted_entities if entity's type is multi-sourcesifentity.__regid__inself.multisources_etypes:self.doexec(session,'DELETE FROM deleted_entities WHERE eid=%s'%eid)self.repo.hm.call_hooks('after_add_entity',session,entity=entity)returnerrorsdef_undo_r(self,session,action):"""undo a relation removal"""errors=[]subj,rtype,obj=action.eid_from,action.rtype,action.eid_totry:sentity,oentity,rdef=_undo_rel_info(session,subj,rtype,obj)exceptUndoException,ex:errors.append(unicode(ex))else:forrole,entityin(('subject',sentity),('object',oentity)):try:_undo_check_relation_target(entity,rdef,role)exceptUndoException,ex:errors.append(unicode(ex))continueifnoterrors:self.repo.hm.call_hooks('before_add_relation',session,eidfrom=subj,rtype=rtype,eidto=obj)# add relation in the 
databaseself._add_relation(session,subj,rtype,obj,rdef.rtype.inlined)# set related cachesession.update_rel_cache_add(subj,rtype,obj,rdef.rtype.symmetric)self.repo.hm.call_hooks('after_add_relation',session,eidfrom=subj,rtype=rtype,eidto=obj)returnerrorsdef_undo_c(self,session,action):"""undo an entity creation"""eid=action.eid# XXX done to avoid fetching all remaining relation for the entity# we should find an efficient way to do this (keeping current veolidf# massive deletion performance)if_undo_has_later_transaction(session,eid):msg=session._('some later transaction(s) touch entity, undo them ''first')raiseValidationError(eid,{None:msg})etype=action.etype# get an entity instancetry:entity=self.repo.vreg['etypes'].etype_class(etype)(session)exceptException:return[session._("Can't undo creation of entity %(eid)s of type %(etype)s, type ""no more supported"%{'eid':eid,'etype':etype})]entity.set_eid(eid)# for proper eid/type cache updatehook.set_operation(session,'pendingeids',eid,CleanupDeletedEidsCacheOp)self.repo.hm.call_hooks('before_delete_entity',session,entity=entity)# remove is / is_instance_of which are added using sql by hooks, hence# unvisible as transaction actionself.doexec(session,'DELETE FROM is_relation WHERE eid_from=%s'%eid)self.doexec(session,'DELETE FROM is_instance_of_relation WHERE eid_from=%s'%eid)# XXX check removal of inlined relation?# delete the entityattrs={'cw_eid':eid}sql=self.sqlgen.delete(SQL_PREFIX+entity.__regid__,attrs)self.doexec(session,sql,attrs)# remove record from entities (will update fti if needed)self.delete_info(session,entity,self.uri,None)self.repo.hm.call_hooks('after_delete_entity',session,entity=entity)return()def_undo_u(self,session,action):"""undo an entity update"""return['undoing of entity updating not yet supported.']def_undo_a(self,session,action):"""undo a relation 
addition"""errors=[]subj,rtype,obj=action.eid_from,action.rtype,action.eid_totry:sentity,oentity,rdef=_undo_rel_info(session,subj,rtype,obj)exceptUndoException,ex:errors.append(unicode(ex))else:rschema=rdef.rtypeifrschema.inlined:sql='SELECT 1 FROM cw_%s WHERE cw_eid=%s and cw_%s=%s'\%(sentity.__regid__,subj,rtype,obj)else:sql='SELECT 1 FROM %s_relation WHERE eid_from=%s and eid_to=%s'\%(rtype,subj,obj)cu=self.doexec(session,sql)ifcu.fetchone()isNone:errors.append(session._("Can't undo addition of relation %(rtype)s from %(subj)s to"" %(obj)s, doesn't exist anymore"%locals()))ifnoterrors:self.repo.hm.call_hooks('before_delete_relation',session,eidfrom=subj,rtype=rtype,eidto=obj)# delete relation from the databaseself._delete_relation(session,subj,rtype,obj,rschema.inlined)# set related cachesession.update_rel_cache_del(subj,rtype,obj,rschema.symmetric)self.repo.hm.call_hooks('after_delete_relation',session,eidfrom=subj,rtype=rtype,eidto=obj)returnerrors# full text index handling #################################################@cacheddefneed_fti_indexation(self,etype):eschema=self.schema.eschema(etype)ifany(eschema.indexable_attributes()):returnTrueifany(eschema.fulltext_containers()):returnTruereturnFalsedefindex_entity(self,session,entity):"""create an operation to [re]index textual content of the given entity on commit """hook.set_operation(session,'ftindex',entity.eid,FTIndexEntityOp)deffti_unindex_entity(self,session,eid):"""remove text content for entity with the given eid from the full text index """try:self.dbhelper.cursor_unindex_object(eid,session.pool['system'])exceptException:# let KeyboardInterrupt / SystemExit propagateself.exception('error while unindexing %s',eid)deffti_index_entity(self,session,entity):"""add text content of a created/modified entity to the full text index """self.debug('reindexing %r',entity.eid)try:# use cursor_index_object, not cursor_reindex_object since# unindexing done in the 
FTIndexEntityOpself.dbhelper.cursor_index_object(entity.eid,entity,session.pool['system'])exceptException:# let KeyboardInterrupt / SystemExit propagateself.exception('error while reindexing %s',entity)classFTIndexEntityOp(hook.LateOperation):"""operation to delay entity full text indexation to commit since fti indexing may trigger discovery of other entities, it should be triggered on precommit, not commit, and this should be done after other precommit operation which may add relations to the entity """defprecommit_event(self):session=self.sessionsource=session.repo.system_sourcependingeids=session.transaction_data.get('pendingeids',())done=session.transaction_data.setdefault('indexedeids',set())foreidinsession.transaction_data.pop('ftindex',()):ifeidinpendingeidsoreidindone:# entity added and deleted in the same transaction or already# processedreturndone.add(eid)forcontainerinsession.entity_from_eid(eid).fti_containers():source.fti_unindex_entity(session,container.eid)source.fti_index_entity(session,container)defsql_schema(driver):helper=get_db_helper(driver)typemap=helper.TYPE_MAPPINGschema="""/* Create the repository's system database */%sCREATE TABLE entities ( eid INTEGER PRIMARY KEY NOT NULL, type VARCHAR(64) NOT NULL, source VARCHAR(64) NOT NULL, mtime %s NOT NULL, extid VARCHAR(256));;CREATE INDEX entities_type_idx ON entities(type);;CREATE INDEX entities_mtime_idx ON entities(mtime);;CREATE INDEX entities_extid_idx ON entities(extid);;CREATE TABLE deleted_entities ( eid INTEGER PRIMARY KEY NOT NULL, type VARCHAR(64) NOT NULL, source VARCHAR(64) NOT NULL, dtime %s NOT NULL, extid VARCHAR(256));;CREATE INDEX deleted_entities_type_idx ON deleted_entities(type);;CREATE INDEX deleted_entities_dtime_idx ON deleted_entities(dtime);;CREATE INDEX deleted_entities_extid_idx ON deleted_entities(extid);;CREATE TABLE transactions ( tx_uuid CHAR(32) PRIMARY KEY NOT NULL, tx_user INTEGER NOT NULL, tx_time %s NOT NULL);;CREATE INDEX transactions_tx_user_idx ON 
transactions(tx_user);;CREATE TABLE tx_entity_actions ( tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE, txa_action CHAR(1) NOT NULL, txa_public %s NOT NULL, txa_order INTEGER, eid INTEGER NOT NULL, etype VARCHAR(64) NOT NULL, changes %s);;CREATE INDEX tx_entity_actions_txa_action_idx ON tx_entity_actions(txa_action);;CREATE INDEX tx_entity_actions_txa_public_idx ON tx_entity_actions(txa_public);;CREATE INDEX tx_entity_actions_eid_idx ON tx_entity_actions(eid);;CREATE INDEX tx_entity_actions_etype_idx ON tx_entity_actions(etype);;CREATE TABLE tx_relation_actions ( tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE, txa_action CHAR(1) NOT NULL, txa_public %s NOT NULL, txa_order INTEGER, eid_from INTEGER NOT NULL, eid_to INTEGER NOT NULL, rtype VARCHAR(256) NOT NULL);;CREATE INDEX tx_relation_actions_txa_action_idx ON tx_relation_actions(txa_action);;CREATE INDEX tx_relation_actions_txa_public_idx ON tx_relation_actions(txa_public);;CREATE INDEX tx_relation_actions_eid_from_idx ON tx_relation_actions(eid_from);;CREATE INDEX tx_relation_actions_eid_to_idx ON tx_relation_actions(eid_to);;"""%(helper.sql_create_sequence('entities_id_seq').replace(';',';;'),typemap['Datetime'],typemap['Datetime'],typemap['Datetime'],typemap['Boolean'],typemap['Bytes'],typemap['Boolean'])ifhelper.backend_name=='sqlite':# sqlite support the ON DELETE CASCADE syntax but do nothingschema+='''CREATE TRIGGER fkd_transactionsBEFORE DELETE ON transactionsFOR EACH ROW BEGIN DELETE FROM tx_entity_actions WHERE tx_uuid=OLD.tx_uuid; DELETE FROM tx_relation_actions WHERE tx_uuid=OLD.tx_uuid;END;;'''returnschemadefsql_drop_schema(driver):helper=get_db_helper(driver)return"""%sDROP TABLE entities;DROP TABLE deleted_entities;DROP TABLE tx_entity_actions;DROP TABLE tx_relation_actions;DROP TABLE 
transactions;"""%helper.sql_drop_sequence('entities_id_seq')defgrant_schema(user,set_owner=True):result=''fortablein('entities','deleted_entities','entities_id_seq','transactions','tx_entity_actions','tx_relation_actions'):ifset_owner:result='ALTER TABLE %s OWNER TO %s;\n'%(table,user)result+='GRANT ALL ON %s TO %s;\n'%(table,user)returnresultclassBaseAuthentifier(object):def__init__(self,source=None):self.source=sourcedefset_schema(self,schema):"""set the instance'schema"""passclassLoginPasswordAuthentifier(BaseAuthentifier):passwd_rql="Any P WHERE X is CWUser, X login %(login)s, X upassword P"auth_rql="Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s"_sols=({'X':'CWUser','P':'Password'},)defset_schema(self,schema):"""set the instance'schema"""if'CWUser'inschema:# probably an empty schema if not true...# rql syntax trees used to authenticate usersself._passwd_rqlst=self.source.compile_rql(self.passwd_rql,self._sols)self._auth_rqlst=self.source.compile_rql(self.auth_rql,self._sols)defauthenticate(self,session,login,password=None,**kwargs):"""return CWUser eid for the given login/password if this account is defined in this source, else raise `AuthenticationError` two queries are needed since passwords are stored crypted, so we have to fetch the salt first """args={'login':login,'pwd':password}ifpasswordisnotNone:rset=self.source.syntax_tree_search(session,self._passwd_rqlst,args)try:pwd=rset[0][0]exceptIndexError:raiseAuthenticationError('bad login')# passwords are stored using the Bytes type, so we get a StringIOifpwdisnotNone:args['pwd']=Binary(crypt_password(password,pwd.getvalue()[:2]))# get eid from login and (crypted) passwordrset=self.source.syntax_tree_search(session,self._auth_rqlst,args)try:returnrset[0][0]exceptIndexError:raiseAuthenticationError('bad password')