"""plan execution of rql queries on a single source

:organization: Logilab
:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
__docformat__ = "restructuredtext en"

from copy import copy

from rql.stmts import Union, Select
from rql.nodes import Constant

from cubicweb import QueryError, typed_eid
from cubicweb.schema import VIRTUAL_RTYPES


def add_types_restriction(schema, rqlst, newroot=None, solutions=None):
    """restrict variables of a syntax tree to the types found in its solutions

    For each variable appearing in the solutions which is defined in the
    target tree and whose type is not final, either narrow the types
    restriction already set on the variable, or add a new one.

    :param schema: object providing `eschema(etype)`
    :param rqlst: the statement to process; when `newroot` is given,
      `rqlst.stmt` is used instead
    :param newroot: node whose `defined_vars` are restricted; when None,
      `rqlst` itself is used with its own solutions, and the statement is
      flagged with `_types_restr_added` so that it is only processed once
    :param solutions: solutions to use, required if and only if `newroot` is
      given
    """
    if newroot is None:
        assert solutions is None
        if hasattr(rqlst, '_types_restr_added'):
            return
        solutions = rqlst.solutions
        newroot = rqlst
        rqlst._types_restr_added = True
    else:
        assert solutions is not None
        rqlst = rqlst.stmt
    eschema = schema.eschema
    # map each relevant variable name to the set of types found for it
    allpossibletypes = {}
    for solution in solutions:
        for varname, etype in solution.iteritems():
            if (varname not in newroot.defined_vars
                    or eschema(etype).is_final()):
                continue
            allpossibletypes.setdefault(varname, set()).add(etype)
    for varname in sorted(allpossibletypes):
        try:
            var = newroot.defined_vars[varname]
        except KeyError:
            continue
        stinfo = var.stinfo
        if stinfo.get('uidrels'):
            continue # eid specified, no need for additional type specification
        try:
            typerels = rqlst.defined_vars[varname].stinfo.get('typerels')
        except KeyError:
            assert varname in rqlst.aliases
            continue
        if newroot is rqlst and typerels:
            mytyperel = iter(typerels).next()
        else:
            # look for a types restriction among relations using the variable
            for vref in newroot.defined_vars[varname].references():
                rel = vref.relation()
                if rel and rel.is_types_restriction():
                    mytyperel = rel
                    break
            else:
                mytyperel = None
        possibletypes = allpossibletypes[varname]
        if mytyperel is not None:
            # variable has already some types restriction. new possible types
            # can only be a subset of existing ones, so only remove no more
            # possible types
            for cst in mytyperel.get_nodes(Constant):
                if cst.value not in possibletypes:
                    cst.parent.remove(cst)
                    try:
                        stinfo['possibletypes'].remove(cst.value)
                    except KeyError:
                        # restriction on a type not used by this query, may
                        # occurs with X is IN(...)
                        pass
        else:
            # we have to add types restriction
            if stinfo.get('scope') is not None:
                rel = var.scope.add_type_restriction(var, possibletypes)
            else:
                # tree is not annotated yet, no scope set so add the restriction
                # to the root
                rel = newroot.add_type_restriction(var, possibletypes)
            stinfo['typerels'] = frozenset((rel,))
            stinfo['possibletypes'] = possibletypes


class SSPlanner(object):
    """SingleSourcePlanner: build execution plan for rql queries

    optimized for single source repositories
    """

    def __init__(self, schema, rqlhelper):
        self.schema = schema
        self.rqlhelper = rqlhelper

    def build_plan(self, plan):
        """build an execution plan from a RQL query

        do nothing here, dispatch according to the statement type
        """
        build_plan = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE)
        for step in build_plan(plan, plan.rqlst):
            plan.add_step(step)

    def build_select_plan(self, plan, rqlst):
        """build execution plan for a SELECT RQL query. Suppose only one source
        is available and so avoid work need for query decomposition among
        sources

        the rqlst should not be tagged at this point.
        """
        plan.preprocess(rqlst)
        return (OneFetchStep(plan, rqlst, plan.session.repo.sources),)

    def build_insert_plan(self, plan, rqlst):
        """get an execution plan from an INSERT RQL query"""
        # each variable in main variables is a new entity to insert
        to_build = {}
        session = plan.session
        etype_class = session.vreg['etypes'].etype_class
        for etype, var in rqlst.main_variables:
            # need to do this since entity class is shared w. web client code !
            to_build[var.name] = etype_class(etype)(session)
            plan.add_entity_def(to_build[var.name])
        # add constant values to entity def, mark variables to be selected
        to_select = plan.relation_definitions(rqlst, to_build)
        # add necessary steps to add relations and update attributes
        step = InsertStep(plan) # insert each entity and its relations
        step.children += self._compute_relation_steps(plan, rqlst.solutions,
                                                      rqlst.where, to_select)
        return (step,)

    def _compute_relation_steps(self, plan, solutions, restriction, to_select):
        """handle the selection of relations for an insert query

        Yield one `RelationsStep` per entity definition in `to_select`. Each
        `(rtype, term, reverse)` entry of the relation definitions list is
        replaced in place by a `(rtype, kind)` pair, where kind is one of the
        `RelationsStep` constants.
        """
        for edef, rdefs in to_select.items():
            # create a select rql st to fetch needed data
            select = Select()
            eschema = edef.e_schema
            for i, (rtype, term, reverse) in enumerate(rdefs):
                select.append_selected(term.copy(select))
                if reverse:
                    rdefs[i] = rtype, RelationsStep.REVERSE_RELATION
                else:
                    rschema = eschema.subject_relation(rtype)
                    if rschema.is_final() or rschema.inlined:
                        rdefs[i] = rtype, RelationsStep.FINAL
                    else:
                        rdefs[i] = rtype, RelationsStep.RELATION
            if restriction is not None:
                select.set_where(restriction.copy(select))
            step = RelationsStep(plan, edef, rdefs)
            step.children += self._select_plan(plan, select, solutions)
            yield step

    def build_delete_plan(self, plan, rqlst):
        """get an execution plan from a DELETE RQL query"""
        # build a select query to fetch entities to delete
        steps = []
        for etype, var in rqlst.main_variables:
            step = DeleteEntitiesStep(plan)
            step.children += self._sel_variable_step(plan, rqlst.solutions,
                                                     rqlst.where, etype, var)
            steps.append(step)
        for relation in rqlst.main_relations:
            step = DeleteRelationsStep(plan, relation.r_type)
            step.children += self._sel_relation_steps(plan, rqlst.solutions,
                                                      rqlst.where, relation)
            steps.append(step)
        return steps

    def _sel_variable_step(self, plan, solutions, restriction, etype, varref):
        """handle the selection of variables for a delete query"""
        select = Select()
        varref = varref.copy(select)
        select.defined_vars = {varref.name: varref.variable}
        select.append_selected(varref)
        if restriction is not None:
            select.set_where(restriction.copy(select))
        if etype != 'Any':
            select.add_type_restriction(varref.variable, etype)
        return self._select_plan(plan, select, solutions)

    def _sel_relation_steps(self, plan, solutions, restriction, relation):
        """handle the selection of relations for a delete query"""
        select = Select()
        lhs, rhs = relation.get_variable_parts()
        select.append_selected(lhs.copy(select))
        select.append_selected(rhs.copy(select))
        select.set_where(relation.copy(select))
        if restriction is not None:
            select.add_restriction(restriction.copy(select))
        return self._select_plan(plan, select, solutions)

    def build_set_plan(self, plan, rqlst):
        """get an execution plan from an SET RQL query

        :raise QueryError: if one of the relations to set is a virtual
          relation type
        """
        select = Select()
        # extract variables to add to the selection
        selected_index = {}
        index = 0
        relations, attrrelations = [], []
        getrschema = self.schema.rschema
        for relation in rqlst.main_relations:
            if relation.r_type in VIRTUAL_RTYPES:
                raise QueryError('can not assign to %r relation'
                                 % relation.r_type)
            lhs, rhs = relation.get_variable_parts()
            if lhs.as_string('utf-8') not in selected_index:
                select.append_selected(lhs.copy(select))
                selected_index[lhs.as_string('utf-8')] = index
                index += 1
            if rhs.as_string('utf-8') not in selected_index:
                select.append_selected(rhs.copy(select))
                selected_index[rhs.as_string('utf-8')] = index
                index += 1
            rschema = getrschema(relation.r_type)
            if rschema.is_final() or rschema.inlined:
                attrrelations.append(relation)
            else:
                relations.append(relation)
        # add step necessary to fetch all selected variables values
        if rqlst.where is not None:
            select.set_where(rqlst.where.copy(select))
        # set distinct to avoid potential duplicate key error
        select.distinct = True
        step = UpdateStep(plan, attrrelations, relations, selected_index)
        step.children += self._select_plan(plan, select, rqlst.solutions)
        return (step,)

    # internal methods ########################################################

    def _select_plan(self, plan, select, solutions):
        """wrap `select` into a new Union, add types restrictions, annotate it
        and return the steps built by `build_select_plan` for that union
        """
        union = Union()
        union.append(select)
        select.clean_solutions(solutions)
        add_types_restriction(self.schema, select)
        self.rqlhelper.annotate(union)
        return self.build_select_plan(plan, union)


# execution steps and helper functions ########################################

def varmap_test_repr(varmap, tablesinorder):
    """return a representation of a variables map suitable for test

    Each 'table.column' value of `varmap` is rewritten using the name found
    for the table in `tablesinorder`. None is returned unchanged.
    """
    if varmap is None:
        return varmap
    maprepr = {}
    for var, sql in varmap.iteritems():
        table, col = sql.split('.')
        maprepr[var] = '%s.%s' % (tablesinorder[table], col)
    return maprepr


def offset_result(offset, result):
    """skip up to `offset` leading rows of `result`

    Return a `(remaining offset, remaining rows)` tuple, where the remaining
    offset is None once the offset has been fully consumed.

    When `result` holds fewer rows than `offset`, every row is skipped and an
    empty result is returned together with the number of rows still to skip
    (rows used to be wrongly kept in that case).
    """
    offset -= len(result)
    if offset < 0:
        # offset is consumed within this result: keep the trailing rows only
        return None, result[offset:]
    # the whole result is skipped; a zero remaining offset means we're done
    return (offset or None), ()


class LimitOffsetMixIn(object):
    """mix-in holding the `limit` and `offset` of a step (None when unset)"""
    limit = offset = None

    def set_limit_offset(self, limit, offset):
        """store limit and offset; a false offset (eg 0) is stored as None"""
        self.limit = limit
        self.offset = offset or None


class Step(object):
    """base abstract class for execution step"""

    def __init__(self, plan):
        self.plan = plan
        self.children = []

    def execute_child(self):
        """execute the only child of this step and return its result"""
        assert len(self.children) == 1
        return self.children[0].execute()

    def execute_children(self):
        """execute each child step in order, discarding results"""
        for step in self.children:
            step.execute()

    def execute(self):
        """execute this step and store partial (eg this step) results"""
        raise NotImplementedError()

    def mytest_repr(self):
        """return a representation of this step suitable for test"""
        return (self.__class__.__name__,)

    def test_repr(self):
        """return a representation of this step suitable for test"""
        return self.mytest_repr() + (
            [step.test_repr() for step in self.children],)


class OneFetchStep(LimitOffsetMixIn, Step):
    """step consisting in fetching data from sources and directly returning
    results
    """

    def __init__(self, plan, union, sources, inputmap=None):
        Step.__init__(self, plan)
        self.union = union
        self.sources = sources
        self.inputmap = inputmap
        self.set_limit_offset(union.children[-1].limit,
                              union.children[-1].offset)

    def set_limit_offset(self, limit, offset):
        """set limit / offset on this step and on each select of the union"""
        LimitOffsetMixIn.set_limit_offset(self, limit, offset)
        for select in self.union.children:
            select.limit = limit
            select.offset = offset

    def execute(self):
        """call .syntax_tree_search with the given syntax tree on each
        source for each solution
        """
        self.execute_children()
        session = self.plan.session
        args = self.plan.args
        inputmap = self.inputmap
        union = self.union
        # do we have to use a inputmap from a previous step ? If so disable
        # cachekey
        if inputmap or self.plan.cache_key is None:
            cachekey = None
        # union may have been splited into subqueries, rebuild a cache key
        elif isinstance(self.plan.cache_key, tuple):
            cachekey = list(self.plan.cache_key)
            cachekey[0] = union.as_string()
            cachekey = tuple(cachekey)
        else:
            cachekey = union.as_string()
        result = []
        # limit / offset processing
        limit = self.limit
        offset = self.offset
        if offset is not None:
            if len(self.sources) > 1:
                # we'll have to deal with limit/offset by ourself
                if union.children[-1].limit:
                    union.children[-1].limit = limit + offset
                union.children[-1].offset = None
            else:
                # single source: limit / offset are left to the source
                offset, limit = None, None
        for source in self.sources:
            if offset is None and limit is not None:
                # modifying the sample rqlst is enough since sql generation
                # will pick it here as well
                union.children[-1].limit = limit - len(result)
            result_ = source.syntax_tree_search(session, union, args, cachekey,
                                                inputmap)
            if offset is not None:
                offset, result_ = offset_result(offset, result_)
            result += result_
            if limit is not None:
                if len(result) >= limit:
                    return result[:limit]
        return result

    def mytest_repr(self):
        """return a representation of this step suitable for test"""
        try:
            inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
        except AttributeError:
            # plan has no `tablesinorder` attribute: use the raw input map
            inputmap = self.inputmap
        return (self.__class__.__name__,
                sorted((r.as_string(kwargs=self.plan.args), r.solutions)
                       for r in self.union.children),
                self.limit, self.offset,
                sorted(self.sources), inputmap)


# UPDATE/INSERT/DELETE steps ##################################################

class RelationsStep(Step):
    """step consisting in adding attributes/relations to entity defs from a
    previous FetchStep

    relations values comes from the latest result, with one columns for
    each relation defined in self.rdefs

    for one entity definition, we'll construct N entity, where N is the
    number of the latest result
    """

    # kinds of relation definition, stored as second item of each rdef
    FINAL = 0
    RELATION = 1
    REVERSE_RELATION = 2

    def __init__(self, plan, edef, rdefs):
        Step.__init__(self, plan)
        # partial entity definition to expand
        self.edef = edef
        # definition of relations to complete
        self.rdefs = rdefs

    def execute(self):
        """execute this step"""
        base_edef = self.edef
        edefs = []
        result = self.execute_child()
        for row in result:
            # get a new entity definition for this row
            edef = copy(base_edef)
            # complete this entity def using row values
            for i, (rtype, rorder) in enumerate(self.rdefs):
                if rorder == RelationsStep.FINAL:
                    edef[rtype] = row[i]
                elif rorder == RelationsStep.RELATION:
                    self.plan.add_relation_def((edef, rtype, row[i]))
                    edef.querier_pending_relations[(rtype, 'subject')] = row[i]
                else:
                    self.plan.add_relation_def((row[i], rtype, edef))
                    edef.querier_pending_relations[(rtype, 'object')] = row[i]
            edefs.append(edef)
        self.plan.substitute_entity_def(base_edef, edefs)
        return result


class InsertStep(Step):
    """step consisting in inserting new entities / relations"""

    def execute(self):
        """execute this step"""
        for step in self.children:
            assert isinstance(step, RelationsStep)
            step.plan = self.plan
            step.execute()
        # insert entities first
        result = self.plan.insert_entity_defs()
        # then relation
        self.plan.insert_relation_defs()
        # return eids of inserted entities
        return result


class DeleteEntitiesStep(Step):
    """step consisting in deleting entities"""

    def execute(self):
        """execute this step"""
        # child result is expected to hold one-column rows (the eid)
        todelete = frozenset(typed_eid(eid) for eid, in self.execute_child())
        session = self.plan.session
        delete = session.repo.glob_delete_entity
        # register pending eids first to avoid multiple deletion
        pending = session.transaction_data.setdefault('pendingeids', set())
        actual = todelete - pending
        pending |= actual
        for eid in actual:
            delete(session, eid)


class DeleteRelationsStep(Step):
    """step consisting in deleting relations"""

    def __init__(self, plan, rtype):
        Step.__init__(self, plan)
        self.rtype = rtype

    def execute(self):
        """execute this step"""
        session = self.plan.session
        delete = session.repo.glob_delete_relation
        for subj, obj in self.execute_child():
            delete(session, subj, self.rtype, obj)


class UpdateStep(Step):
    """step consisting in updating entities / adding relations from relations
    definitions and from results fetched in previous step
    """

    def __init__(self, plan, attribute_relations, relations, selected_index):
        Step.__init__(self, plan)
        self.attribute_relations = attribute_relations
        self.relations = relations
        # map the string form of a selected term to its column in result rows
        self.selected_index = selected_index

    def execute(self):
        """execute this step"""
        plan = self.plan
        session = self.plan.session
        repo = session.repo
        edefs = {}
        # insert relations
        attributes = set(relation.r_type
                         for relation in self.attribute_relations)
        result = self.execute_child()
        for row in result:
            for relation in self.attribute_relations:
                lhs, rhs = relation.get_variable_parts()
                eid = typed_eid(row[self.selected_index[str(lhs)]])
                try:
                    edef = edefs[eid]
                except KeyError:
                    edefs[eid] = edef = session.entity_from_eid(eid)
                if isinstance(rhs, Constant):
                    # add constant values to entity def
                    value = rhs.eval(plan.args)
                    edef[relation.r_type] = value
                else:
                    edef[relation.r_type] = row[self.selected_index[str(rhs)]]
            for relation in self.relations:
                subj = row[self.selected_index[str(relation.children[0])]]
                obj = row[self.selected_index[str(relation.children[1])]]
                repo.glob_add_relation(session, subj, relation.r_type, obj)
        # update entities
        for eid, edef in edefs.iteritems():
            repo.glob_update_entity(session, edef, attributes)
        return result