"""Helper classes to execute RQL queries on a set of sources, performingsecurity checking and data aggregation.:organization: Logilab:copyright: 2001-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""__docformat__="restructuredtext en"fromitertoolsimportrepeatfromlogilab.common.cacheimportCachefromlogilab.common.compatimportanyfromrqlimportRQLHelper,RQLSyntaxErrorfromrql.stmtsimportUnion,Selectfromrql.nodesimport(Relation,VariableRef,Constant,SubQuery)fromcubicwebimportUnauthorized,QueryError,UnknownEid,typed_eidfromcubicwebimportserverfromcubicweb.rsetimportResultSetfromcubicweb.server.utilsimportcleanup_solutionsfromcubicweb.server.rqlannotationimportSQLGenAnnotator,set_qdatafromcubicweb.server.ssplannerimportadd_types_restrictionREAD_ONLY_RTYPES=set(('eid','has_text','is','is_instance_of','identity'))defempty_rset(session,rql,args,rqlst=None):"""build an empty result set object"""returnResultSet([],rql,args,rqlst=rqlst)defupdate_varmap(varmap,selected,table):"""return a sql schema to store RQL query result"""fori,terminenumerate(selected):key=term.as_string()value='%s.C%s'%(table,i)ifvarmap.get(key,value)!=value:raiseException('variable name conflict on %s'%key)varmap[key]=value# permission utilities ########################################################defvar_kwargs(restriction,args):varkwargs={}forrelinrestriction.iget_nodes(Relation):cmp=rel.children[1]ifrel.r_type=='eid'andcmp.operator=='='and \notrel.neged(strict=True)and \isinstance(cmp.children[0],Constant)and \cmp.children[0].type=='Substitute':varkwargs[rel.children[0].name]=typed_eid(cmp.children[0].eval(args))returnvarkwargsdefcheck_no_password_selected(rqlst):"""check that Password entities are not selected"""forsolutioninrqlst.solutions:if'Password'insolution.itervalues():raiseUnauthorized('Password selection is not allowed')defcheck_read_access(schema,user,rqlst,solution):"""check that the given user has credentials to access data read the query return a dict defining necessary local checks (due to use of rql expression in the schema), keys are variable names and values associated rql expression for the associated variable with the given solution """ifrqlst.whereisnotNone:forrelinrqlst.where.iget_nodes(Relation):# XXX has_text may have specific perm ?ifrel.r_typeinREAD_ONLY_RTYPES:continueifnotschema.rschema(rel.r_type).has_access(user,'read'):raiseUnauthorized('read',rel.r_type)localchecks={}# iterate on defined_vars and not on solutions to ignore column aliasesforvarnameinrqlst.defined_vars:etype=solution[varname]eschema=schema.eschema(etype)ifnoteschema.has_access(user,'read'):erqlexprs=eschema.get_rqlexprs('read')ifnoterqlexprs:ex=Unauthorized('read',etype)ex.var=varnameraiseex#assert len(erqlexprs) == 1localchecks[varname]=tuple(erqlexprs)returnlocalchecksdefnoinvariant_vars(restricted,select,nbtrees):# a variable can actually be invariant if it has not been restricted for# security reason or if security assertion hasn't modified the possible# solutions for the queryifnbtrees!=1:forvnameinrestricted:try:yieldselect.defined_vars[vname]exceptKeyError:# this is an aliascontinueelse:forvnameinrestricted:try:var=select.defined_vars[vname]exceptKeyError:# this is an aliascontinueiflen(var.stinfo['possibletypes'])!=1:yieldvardef_expand_selection(terms,selected,aliases,select,newselect):forterminterms:forvrefinterm.iget_nodes(VariableRef):ifnotvref.nameinselected:select.append_selected(vref)colalias=newselect.get_variable(vref.name,len(aliases))aliases.append(VariableRef(colalias))selected.add(vref.name)# Plans #######################################################################classExecutionPlan(object):"""the execution model of a rql query, composed of querier steps"""def__init__(self,querier,rqlst,args,session):# original rql syntax treeself.rqlst=rqlstself.args=argsor{}# session executing the queryself.session=session# quick reference to the system sourceself.syssource=session.pool.source('system')# execution stepsself.steps=[]# index of temporary tables created during executionself.temp_tables={}# various resource accesorsself.querier=querierself.schema=querier.schemaself.sqlannotate=querier.sqlgen_annotateself.rqlhelper=session.vreg.rqlhelperdefannotate_rqlst(self):ifnotself.rqlst.annotated:self.rqlhelper.annotate(self.rqlst)defadd_step(self,step):"""add a step to the plan"""self.steps.append(step)defclean(self):"""remove temporary tables"""self.syssource.clean_temp_data(self.session,self.temp_tables)defsqlexec(self,sql,args=None):returnself.syssource.sqlexec(self.session,sql,args)defexecute(self):"""execute a plan and return resulting rows"""try:forstepinself.steps:result=step.execute()# the latest executed step contains the full query resultreturnresultfinally:self.clean()definit_temp_table(self,table,selected,sol):"""initialize sql schema and variable map for a temporary table which will be used to store result for the given rqlst """try:outputmap,sqlschema,_=self.temp_tables[table]update_varmap(outputmap,selected,table)exceptKeyError:sqlschema,outputmap=self.syssource.temp_table_def(selected,sol,table)self.temp_tables[table]=[outputmap,sqlschema,False]returnoutputmapdefcreate_temp_table(self,table):"""create a temporary table to store result for the given rqlst"""ifnotself.temp_tables[table][-1]:sqlschema=self.temp_tables[table][1]self.syssource.create_temp_table(self.session,table,sqlschema)self.temp_tables[table][-1]=Truedefpreprocess(self,union,security=True):"""insert security when necessary then annotate rql st for sql generation return rqlst to actually execute """noinvariant=set()ifsecurityandnotself.session.is_super_session:self._insert_security(union,noinvariant)self.rqlhelper.simplify(union)self.sqlannotate(union)set_qdata(self.schema.rschema,union,noinvariant)ifunion.has_text_query:self.cache_key=Nonedef_insert_security(self,union,noinvariant):rh=self.rqlhelperforselectinunion.children[:]:forsubqueryinselect.with_:self._insert_security(subquery.query,noinvariant)localchecks,restricted=self._check_permissions(select)ifany(localchecks):rewrite=self.session.rql_rewriter.rewritenbtrees=len(localchecks)myunion=union# transform in subquery when len(localchecks)>1 and groupsifnbtrees>1and(select.orderbyorselect.groupbyorselect.havingorselect.has_aggregatorselect.limitorselect.offset):newselect=Select()# only select variables in subqueriesorigselection=select.selectionselect.select_only_variables()select.has_aggregat=False# create subquery first so correct node are used on copy# (eg ColumnAlias instead of Variable)aliases=[VariableRef(newselect.get_variable(vref.name,i))fori,vrefinenumerate(select.selection)]selected=set(vref.nameforvrefinaliases)# now copy original selection and groupsforterminorigselection:newselect.append_selected(term.copy(newselect))ifselect.orderby:newselect.set_orderby([s.copy(newselect)forsinselect.orderby])_expand_selection(select.orderby,selected,aliases,select,newselect)select.orderby=()# XXX dereference?ifselect.groupby:newselect.set_groupby([g.copy(newselect)forginselect.groupby])_expand_selection(select.groupby,selected,aliases,select,newselect)select.groupby=()# XXX dereference?ifselect.having:newselect.set_having([g.copy(newselect)forginselect.having])_expand_selection(select.having,selected,aliases,select,newselect)select.having=()# XXX dereference?ifselect.limit:newselect.limit=select.limitselect.limit=Noneifselect.offset:newselect.offset=select.offsetselect.offset=0myunion=Union()newselect.set_with([SubQuery(aliases,myunion)],check=False)solutions=[sol.copy()forsolinselect.solutions]cleanup_solutions(newselect,solutions)newselect.set_possible_types(solutions)# if some solutions doesn't need rewriting, insert original# select as first union subqueryif()inlocalchecks:myunion.append(select)# we're done, replace original select by the new select with# subqueries (more added in the loop below)union.replace(select,newselect)elifnot()inlocalchecks:union.remove(select)forlcheckdef,lchecksolutionsinlocalchecks.iteritems():ifnotlcheckdef:continuemyrqlst=select.copy(solutions=lchecksolutions)myunion.append(myrqlst)# in-place rewrite + annotation / simplificationlcheckdef=[((varmap,'X'),rqlexprs)forvarmap,rqlexprsinlcheckdef]rewrite(myrqlst,lcheckdef,lchecksolutions,self.args)noinvariant.update(noinvariant_vars(restricted,myrqlst,nbtrees))if()inlocalchecks:select.set_possible_types(localchecks[()])add_types_restriction(self.schema,select)noinvariant.update(noinvariant_vars(restricted,select,nbtrees))def_check_permissions(self,rqlst):"""return a dict defining "local checks", e.g. RQLExpression defined in the schema that should be inserted in the original query solutions where a variable has a type which the user can't definitly read are removed, else if the user may read it (eg if an rql expression is defined for the "read" permission of the related type), the local checks dict for the solution is updated return a dict with entries for each different local check necessary, with associated solutions as value. A local check is defined by a list of 2-uple, with variable name as first item and the necessary rql expression as second item for each variable which has to be checked. So solutions which don't require local checks will be associated to the empty tuple key. note: rqlst should not have been simplified at this point """assertnotself.session.is_super_sessionuser=self.session.userschema=self.schemamsgs=[]# dictionnary of variables restricted for security reasonlocalchecks={}ifrqlst.whereisnotNone:varkwargs=var_kwargs(rqlst.where,self.args)neweids=self.session.transaction_data.get('neweids',())else:varkwargs=Nonerestricted_vars=set()newsolutions=[]forsolutioninrqlst.solutions:try:localcheck=check_read_access(schema,user,rqlst,solution)exceptUnauthorized,ex:msg='remove %s from solutions since %s has no %s access to %s'msg%=(solution,user.login,ex.args[0],ex.args[1])msgs.append(msg)LOGGER.info(msg)else:newsolutions.append(solution)ifvarkwargs:# try to benefit of rqlexpr.check cache for entities which# are specified by eid in query'argsforvarname,eidinvarkwargs.iteritems():try:rqlexprs=localcheck.pop(varname)exceptKeyError:continueifeidinneweids:continueforrqlexprinrqlexprs:ifrqlexpr.check(self.session,eid):breakelse:raiseUnauthorized()restricted_vars.update(localcheck)localchecks.setdefault(tuple(localcheck.iteritems()),[]).append(solution)# raise Unautorized exception if the user can't access to any solutionifnotnewsolutions:raiseUnauthorized('\n'.join(msgs))rqlst.set_possible_types(newsolutions)returnlocalchecks,restricted_varsdeffinalize(self,select,solutions,insertedvars):rqlst=Union()rqlst.append(select)formainvarname,rschema,newvarnameininsertedvars:nvartype=str(rschema.objects(solutions[0][mainvarname])[0])forsolinsolutions:sol[newvarname]=nvartypeselect.clean_solutions(solutions)self.rqlhelper.annotate(rqlst)self.preprocess(rqlst,security=False)returnrqlstclassInsertPlan(ExecutionPlan):"""an execution model specific to the INSERT rql query """def__init__(self,querier,rqlst,args,session):ExecutionPlan.__init__(self,querier,rqlst,args,session)# save originaly selected variable, we may modify this# dictionary for substitution (query parameters)self.selected=rqlst.selection# list of new or updated entities definition (utils.Entity)self.e_defs=[[]]# list of new relation definition (3-uple (from_eid, r_type, to_eid)self.r_defs=[]# indexes to track entity definitions bound to relation definitionsself._r_subj_index={}self._r_obj_index={}self._expanded_r_defs={}defrelation_definitions(self,rqlst,to_build):"""add constant values to entity def, mark variables to be selected """to_select={}forrelationinrqlst.main_relations:lhs,rhs=relation.get_variable_parts()rtype=relation.r_typeifrtypeinREAD_ONLY_RTYPES:raiseQueryError("can't assign to %s"%rtype)try:edef=to_build[str(lhs)]exceptKeyError:# lhs var is not to build, should be selected and added as an# object relationedef=to_build[str(rhs)]to_select.setdefault(edef,[]).append((rtype,lhs,1))else:ifisinstance(rhs,Constant)andnotrhs.uid:# add constant values to entity defvalue=rhs.eval(self.args)eschema=edef.e_schemaattrtype=eschema.subjrels[rtype].objects(eschema)[0]ifattrtype=='Password'andisinstance(value,unicode):value=value.encode('UTF8')edef[rtype]=valueelifto_build.has_key(str(rhs)):# create a relation between two newly created variablesself.add_relation_def((edef,rtype,to_build[rhs.name]))else:to_select.setdefault(edef,[]).append((rtype,rhs,0))returnto_selectdefadd_entity_def(self,edef):"""add an entity definition to build"""edef.querier_pending_relations={}self.e_defs[-1].append(edef)defadd_relation_def(self,rdef):"""add an relation definition to build"""self.r_defs.append(rdef)ifnotisinstance(rdef[0],int):self._r_subj_index.setdefault(rdef[0],[]).append(rdef)ifnotisinstance(rdef[2],int):self._r_obj_index.setdefault(rdef[2],[]).append(rdef)defsubstitute_entity_def(self,edef,edefs):"""substitute an incomplete entity definition by a list of complete equivalents e.g. on queries such as :: INSERT Personne X, Societe Y: X nom N, Y nom 'toto', X travaille Y WHERE U login 'admin', U login N X will be inserted as many times as U exists, and so the X travaille Y relations as to be added as many time as X is inserted """ifnotedefsornotself.e_defs:# no result, no entity will be createdself.e_defs=()return# first remove the incomplete entity definitioncolidx=self.e_defs[0].index(edef)fori,rowinenumerate(self.e_defs[:]):self.e_defs[i][colidx]=edefs[0]samplerow=self.e_defs[i]foredefinedefs[1:]:row=samplerow[:]row[colidx]=edefself.e_defs.append(row)# now, see if this entity def is referenced as subject in some relation# definitionifself._r_subj_index.has_key(edef):forrdefinself._r_subj_index[edef]:expanded=self._expanded(rdef)result=[]forexp_rdefinexpanded:foredefinedefs:result.append((edef,exp_rdef[1],exp_rdef[2]))self._expanded_r_defs[rdef]=result# and finally, see if this entity def is referenced as object in some# relation definitionifself._r_obj_index.has_key(edef):forrdefinself._r_obj_index[edef]:expanded=self._expanded(rdef)result=[]forexp_rdefinexpanded:foredefinedefs:result.append((exp_rdef[0],exp_rdef[1],edef))self._expanded_r_defs[rdef]=resultdef_expanded(self,rdef):"""return expanded value for the given relation definition"""try:returnself._expanded_r_defs[rdef]exceptKeyError:self.r_defs.remove(rdef)return[rdef]defrelation_defs(self):"""return the list for relation definitions to insert"""forrdefsinself._expanded_r_defs.values():forrdefinrdefs:yieldrdefforrdefinself.r_defs:yieldrdefdefinsert_entity_defs(self):"""return eids of inserted entities in a suitable form for the resulting result set, e.g.: e.g. on queries such as :: INSERT Personne X, Societe Y: X nom N, Y nom 'toto', X travaille Y WHERE U login 'admin', U login N if there is two entities matching U, the result set will look like [(eidX1, eidY1), (eidX2, eidY2)] """session=self.sessionrepo=session.reporesults=[]forrowinself.e_defs:results.append([repo.glob_add_entity(session,edef)foredefinrow])returnresultsdefinsert_relation_defs(self):session=self.sessionrepo=session.repoforsubj,rtype,objinself.relation_defs():# if a string is given into args instead of an int, we get it hereifisinstance(subj,basestring):subj=typed_eid(subj)elifnotisinstance(subj,(int,long)):subj=subj.eidifisinstance(obj,basestring):obj=typed_eid(obj)elifnotisinstance(obj,(int,long)):obj=obj.eidifrepo.schema.rschema(rtype).inlined:entity=session.entity_from_eid(subj)entity[rtype]=objrepo.glob_update_entity(session,entity,set((rtype,)))else:repo.glob_add_relation(session,subj,rtype,obj)classQuerierHelper(object):"""helper class to execute rql queries, putting all things together"""def__init__(self,repo,schema):# system info helperself._repo=repo# instance schemaself.set_schema(schema)defset_schema(self,schema):self.schema=schemarepo=self._repo# rql parsing / analysing helperself.solutions=repo.vreg.solutionsself._rql_cache=Cache(repo.config['rql-cache-size'])self.cache_hit,self.cache_miss=0,0# rql planner# note: don't use repo.sources, may not be built yet, and also "admin"# isn't an actual sourcerqlhelper=repo.vreg.rqlhelperself._parse=rqlhelper.parseself._annotate=rqlhelper.annotateiflen([uriforuriinrepo.config.sources()ifuri!='admin'])<2:fromcubicweb.server.ssplannerimportSSPlannerself._planner=SSPlanner(schema,rqlhelper)else:fromcubicweb.server.msplannerimportMSPlannerself._planner=MSPlanner(schema,rqlhelper)# sql generation annotatorself.sqlgen_annotate=SQLGenAnnotator(schema).annotatedefparse(self,rql,annotate=False):"""return a rql syntax tree for the given rql"""try:returnself._parse(unicode(rql),annotate=annotate)exceptUnicodeError:raiseRQLSyntaxError(rql)defplan_factory(self,rqlst,args,session):"""create an execution plan for an INSERT RQL query"""ifrqlst.TYPE=='insert':returnInsertPlan(self,rqlst,args,session)returnExecutionPlan(self,rqlst,args,session)defexecute(self,session,rql,args=None,eid_key=None,build_descr=True):"""execute a rql query, return resulting rows and their description in a `ResultSet` object * `rql` should be an unicode string or a plain ascii string * `args` the optional parameters dictionary associated to the query * `build_descr` is a boolean flag indicating if the description should be built on select queries (if false, the description will be en empty list) * `eid_key` must be both a key in args and a substitution in the rql query. It should be used to enhance cacheability of rql queries. It may be a tuple for keys in args. eid_key must be providen in case where a eid substitution is providen and resolve some ambiguity in the possible solutions infered for each variable in the query. on INSERT queries, there will be on row with the eid of each inserted entity result for DELETE and SET queries is undefined yet to maximize the rql parsing/analyzing cache performance, you should always use substitute arguments in queries (eg avoid query such as 'Any X WHERE X eid 123'!) """ifserver.DEBUG&(server.DBG_RQL|server.DBG_SQL):ifserver.DEBUG&(server.DBG_MORE|server.DBG_SQL):print'*'*80print'querier input',rql,args# parse the query and binds variablesifeid_keyisnotNone:ifnotisinstance(eid_key,(tuple,list)):eid_key=(eid_key,)cachekey=[rql]forkeyineid_key:try:etype=self._repo.type_from_eid(args[key],session)exceptKeyError:raiseQueryError('bad cache key %s (no value)'%key)exceptTypeError:raiseQueryError('bad cache key %s (value: %r)'%(key,args[key]))exceptUnknownEid:# we want queries such as "Any X WHERE X eid 9999"# return an empty result instead of raising UnknownEidreturnempty_rset(session,rql,args)cachekey.append(etype)# ensure eid is correctly typed in argsargs[key]=typed_eid(args[key])cachekey=tuple(cachekey)else:cachekey=rqltry:rqlst=self._rql_cache[cachekey]self.cache_hit+=1exceptKeyError:self.cache_miss+=1rqlst=self.parse(rql)try:self.solutions(session,rqlst,args)exceptUnknownEid:# we want queries such as "Any X WHERE X eid 9999"# return an empty result instead of raising UnknownEidreturnempty_rset(session,rql,args,rqlst)self._rql_cache[cachekey]=rqlstorig_rqlst=rqlstifnotrqlst.TYPE=='select':ifnotsession.is_super_session:check_no_password_selected(rqlst)# write query, ensure session's mode is 'write' so connections# won't be released until commit/rollbacksession.mode='write'cachekey=Noneelse:ifnotsession.is_super_session:forselectinrqlst.children:check_no_password_selected(select)# on select query, always copy the cached rqlst so we don't have to# bother modifying it. This is not necessary on write queries since# a new syntax tree is built from them.rqlst=rqlst.copy()self._annotate(rqlst)# make an execution planplan=self.plan_factory(rqlst,args,session)plan.cache_key=cachekeyself._planner.build_plan(plan)# execute the plantry:results=plan.execute()exceptUnauthorized:# XXX this could be done in security's after_add_relation hooks# since it's actually realy only needed there (other relations# security is done *before* actual changes, and add/update entity# security is done after changes but in an operation, and exception# generated in operation's events properly generate a rollback on# the session). Even though, this is done here for a better# consistency: getting an Unauthorized exception means the# transaction has been rollbackedsession.rollback()raise# build a description for the results if necessarydescr=()ifbuild_descr:ifrqlst.TYPE=='select':# sample selectiondescr=session.build_description(orig_rqlst,args,results)elifrqlst.TYPE=='insert':# on insert plan, some entities may have been auto-casted,# so compute description manually even if there is only# one solutionbasedescr=[None]*len(plan.selected)todetermine=zip(xrange(len(plan.selected)),repeat(False))descr=session._build_descr(results,basedescr,todetermine)# FIXME: get number of affected entities / relations on non# selection queries ?# return a result set objectreturnResultSet(results,rql,args,descr,eid_key,orig_rqlst)fromloggingimportgetLoggerfromcubicwebimportset_log_methodsLOGGER=getLogger('cubicweb.querier')set_log_methods(QuerierHelper,LOGGER)