"""plan execution of rql queries on a single source:organization: Logilab:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr"""__docformat__="restructuredtext en"fromcopyimportcopyfromrql.stmtsimportUnion,Selectfromrql.nodesimportConstantfromcubicwebimportQueryError,typed_eiddefadd_types_restriction(schema,rqlst,newroot=None,solutions=None):ifnewrootisNone:assertsolutionsisNoneifhasattr(rqlst,'_types_restr_added'):returnsolutions=rqlst.solutionsnewroot=rqlstrqlst._types_restr_added=Trueelse:assertsolutionsisnotNonerqlst=rqlst.stmteschema=schema.eschemaallpossibletypes={}forsolutioninsolutions:forvarname,etypeinsolution.iteritems():ifnotvarnameinnewroot.defined_varsoreschema(etype).is_final():continueallpossibletypes.setdefault(varname,set()).add(etype)forvarnameinsorted(allpossibletypes):try:var=newroot.defined_vars[varname]exceptKeyError:continuestinfo=var.stinfoifstinfo.get('uidrels'):continue# eid specified, no need for additional type specificationtry:typerels=rqlst.defined_vars[varname].stinfo.get('typerels')exceptKeyError:assertvarnameinrqlst.aliasescontinueifnewrootisrqlstandtyperels:mytyperel=iter(typerels).next()else:forvrefinnewroot.defined_vars[varname].references():rel=vref.relation()ifrelandrel.is_types_restriction():mytyperel=relbreakelse:mytyperel=Nonepossibletypes=allpossibletypes[varname]ifmytyperelisnotNone:# variable as already some types restriction. new possible types# can only be a subset of existing ones, so only remove no more# possible typesforcstinmytyperel.get_nodes(Constant):ifnotcst.valueinpossibletypes:cst.parent.remove(cst)try:stinfo['possibletypes'].remove(cst.value)exceptKeyError:# restriction on a type not used by this query, may# occurs with X is IN(...)passelse:# we have to add types restrictionifstinfo.get('scope')isnotNone:rel=var.scope.add_type_restriction(var,possibletypes)else:# tree is not annotated yet, no scope set so add the restriction# to the rootrel=newroot.add_type_restriction(var,possibletypes)stinfo['typerels']=frozenset((rel,))stinfo['possibletypes']=possibletypesclassSSPlanner(object):"""SingleSourcePlanner: build execution plan for rql queries optimized for single source repositories """def__init__(self,schema,rqlhelper):self.schema=schemaself.rqlhelper=rqlhelperdefbuild_plan(self,plan):"""build an execution plan from a RQL query do nothing here, dispatch according to the statement type """build_plan=getattr(self,'build_%s_plan'%plan.rqlst.TYPE)forstepinbuild_plan(plan,plan.rqlst):plan.add_step(step)defbuild_select_plan(self,plan,rqlst):"""build execution plan for a SELECT RQL query. Suppose only one source is available and so avoid work need for query decomposition among sources the rqlst should not be tagged at this point. """plan.preprocess(rqlst)return(OneFetchStep(plan,rqlst,plan.session.repo.sources),)defbuild_insert_plan(self,plan,rqlst):"""get an execution plan from an INSERT RQL query"""# each variable in main variables is a new entity to insertto_build={}session=plan.sessionforetype,varinrqlst.main_variables:# need to do this since entity class is shared w. web client code !to_build[var.name]=session.etype_class(etype)(session,None,None)plan.add_entity_def(to_build[var.name])# add constant values to entity def, mark variables to be selectedto_select=plan.relation_definitions(rqlst,to_build)# add necessary steps to add relations and update attributesstep=InsertStep(plan)# insert each entity and its relationsstep.children+=self._compute_relation_steps(plan,rqlst.solutions,rqlst.where,to_select)return(step,)def_compute_relation_steps(self,plan,solutions,restriction,to_select):"""handle the selection of relations for an insert query"""foredef,rdefsinto_select.items():# create a select rql st to fetch needed dataselect=Select()eschema=edef.e_schemaforiinrange(len(rdefs)):rtype,term,reverse=rdefs[i]select.append_selected(term.copy(select))ifreverse:rdefs[i]=rtype,RelationsStep.REVERSE_RELATIONelse:rschema=eschema.subject_relation(rtype)ifrschema.is_final()orrschema.inlined:rdefs[i]=rtype,RelationsStep.FINALelse:rdefs[i]=rtype,RelationsStep.RELATIONifrestrictionisnotNone:select.set_where(restriction.copy(select))step=RelationsStep(plan,edef,rdefs)step.children+=self._select_plan(plan,select,solutions)yieldstepdefbuild_delete_plan(self,plan,rqlst):"""get an execution plan from a DELETE RQL query"""# build a select query to fetch entities to deletesteps=[]foretype,varinrqlst.main_variables:step=DeleteEntitiesStep(plan)step.children+=self._sel_variable_step(plan,rqlst.solutions,rqlst.where,etype,var)steps.append(step)forrelationinrqlst.main_relations:step=DeleteRelationsStep(plan,relation.r_type)step.children+=self._sel_relation_steps(plan,rqlst.solutions,rqlst.where,relation)steps.append(step)returnstepsdef_sel_variable_step(self,plan,solutions,restriction,etype,varref):"""handle the selection of variables for a delete query"""select=Select()varref=varref.copy(select)select.defined_vars={varref.name:varref.variable}select.append_selected(varref)ifrestrictionisnotNone:select.set_where(restriction.copy(select))ifetype!='Any':select.add_type_restriction(varref.variable,etype)returnself._select_plan(plan,select,solutions)def_sel_relation_steps(self,plan,solutions,restriction,relation):"""handle the selection of relations for a delete query"""select=Select()lhs,rhs=relation.get_variable_parts()select.append_selected(lhs.copy(select))select.append_selected(rhs.copy(select))select.set_where(relation.copy(select))ifrestrictionisnotNone:select.add_restriction(restriction.copy(select))returnself._select_plan(plan,select,solutions)defbuild_set_plan(self,plan,rqlst):"""get an execution plan from an SET RQL query"""select=Select()# extract variables to add to the selectionselected_index={}index=0relations,attrrelations=[],[]getrschema=self.schema.rschemaforrelationinrqlst.main_relations:ifrelation.r_typein('eid','has_text','identity'):raiseQueryError('can not assign to %r relation'%relation.r_type)lhs,rhs=relation.get_variable_parts()ifnotlhs.as_string('utf-8')inselected_index:select.append_selected(lhs.copy(select))selected_index[lhs.as_string('utf-8')]=indexindex+=1ifnotrhs.as_string('utf-8')inselected_index:select.append_selected(rhs.copy(select))selected_index[rhs.as_string('utf-8')]=indexindex+=1rschema=getrschema(relation.r_type)ifrschema.is_final()orrschema.inlined:attrrelations.append(relation)else:relations.append(relation)# add step necessary to fetch all selected variables valuesifrqlst.whereisnotNone:select.set_where(rqlst.where.copy(select))# set distinct to avoid potential duplicate key errorselect.distinct=Truestep=UpdateStep(plan,attrrelations,relations,selected_index)step.children+=self._select_plan(plan,select,rqlst.solutions)return(step,)# internal methods ########################################################def_select_plan(self,plan,select,solutions):union=Union()union.append(select)select.clean_solutions(solutions)add_types_restriction(self.schema,select)self.rqlhelper.annotate(union)returnself.build_select_plan(plan,union)# execution steps and helper functions ########################################defvarmap_test_repr(varmap,tablesinorder):ifvarmapisNone:returnvarmapmaprepr={}forvar,sqlinvarmap.iteritems():table,col=sql.split('.')maprepr[var]='%s.%s'%(tablesinorder[table],col)returnmapreprdefoffset_result(offset,result):offset-=len(result)ifoffset<0:result=result[offset:]offset=Noneelifoffset==0:offset=Noneresult=()returnoffset,resultclassLimitOffsetMixIn(object):limit=offset=Nonedefset_limit_offset(self,limit,offset):self.limit=limitself.offset=offsetorNoneclassStep(object):"""base abstract class for execution step"""def__init__(self,plan):self.plan=planself.children=[]defexecute_child(self):assertlen(self.children)==1returnself.children[0].execute()defexecute_children(self):forstepinself.children:step.execute()defexecute(self):"""execute this step and store partial (eg this step) results"""raiseNotImplementedError()defmytest_repr(self):"""return a representation of this step suitable for test"""return(self.__class__.__name__,)deftest_repr(self):"""return a representation of this step suitable for test"""returnself.mytest_repr()+([step.test_repr()forstepinself.children],)classOneFetchStep(LimitOffsetMixIn,Step):"""step consisting in fetching data from sources and directly returning results """def__init__(self,plan,union,sources,inputmap=None):Step.__init__(self,plan)self.union=unionself.sources=sourcesself.inputmap=inputmapself.set_limit_offset(union.children[-1].limit,union.children[-1].offset)defset_limit_offset(self,limit,offset):LimitOffsetMixIn.set_limit_offset(self,limit,offset)forselectinself.union.children:select.limit=limitselect.offset=offsetdefexecute(self):"""call .syntax_tree_search with the given syntax tree on each source for each solution """self.execute_children()session=self.plan.sessionargs=self.plan.argsinputmap=self.inputmapunion=self.union# do we have to use a inputmap from a previous step ? If so disable# cachekeyifinputmaporself.plan.cache_keyisNone:cachekey=None# union may have been splited into subqueries, rebuild a cache keyelifisinstance(self.plan.cache_key,tuple):cachekey=list(self.plan.cache_key)cachekey[0]=union.as_string()cachekey=tuple(cachekey)else:cachekey=union.as_string()result=[]# limit / offset processinglimit=self.limitoffset=self.offsetifoffsetisnotNone:iflen(self.sources)>1:# we'll have to deal with limit/offset by ourselfifunion.children[-1].limit:union.children[-1].limit=limit+offsetunion.children[-1].offset=Noneelse:offset,limit=None,Noneforsourceinself.sources:ifoffsetisNoneandlimitisnotNone:# modifying the sample rqlst is enough since sql generation# will pick it here as wellunion.children[-1].limit=limit-len(result)result_=source.syntax_tree_search(session,union,args,cachekey,inputmap)ifoffsetisnotNone:offset,result_=offset_result(offset,result_)result+=result_iflimitisnotNone:iflen(result)>=limit:returnresult[:limit]#print 'ONEFETCH RESULT %s' % (result)returnresultdefmytest_repr(self):"""return a representation of this step suitable for test"""try:inputmap=varmap_test_repr(self.inputmap,self.plan.tablesinorder)exceptAttributeError:inputmap=self.inputmapreturn(self.__class__.__name__,sorted((r.as_string(kwargs=self.plan.args),r.solutions)forrinself.union.children),self.limit,self.offset,sorted(self.sources),inputmap)# UPDATE/INSERT/DELETE steps ##################################################classRelationsStep(Step):"""step consisting in adding attributes/relations to entity defs from a previous FetchStep relations values comes from the latest result, with one columns for each relation defined in self.r_defs for one entity definition, we'll construct N entity, where N is the number of the latest result """FINAL=0RELATION=1REVERSE_RELATION=2def__init__(self,plan,e_def,r_defs):Step.__init__(self,plan)# partial entity definition to expandself.e_def=e_def# definition of relations to completeself.r_defs=r_defsdefexecute(self):"""execute this step"""base_e_def=self.e_defresult=[]forrowinself.execute_child():# get a new entity definition for this rowe_def=copy(base_e_def)# complete this entity def using row valuesforiinrange(len(self.r_defs)):rtype,rorder=self.r_defs[i]ifrorder==RelationsStep.FINAL:e_def[rtype]=row[i]elifrorder==RelationsStep.RELATION:self.plan.add_relation_def((e_def,rtype,row[i]))e_def.querier_pending_relations[(rtype,'subject')]=row[i]else:self.plan.add_relation_def((row[i],rtype,e_def))e_def.querier_pending_relations[(rtype,'object')]=row[i]result.append(e_def)self.plan.substitute_entity_def(base_e_def,result)classInsertStep(Step):"""step consisting in inserting new entities / relations"""defexecute(self):"""execute this step"""forstepinself.children:assertisinstance(step,RelationsStep)step.plan=self.planstep.execute()# insert entities firstresult=self.plan.insert_entity_defs()# then relationself.plan.insert_relation_defs()# return eids of inserted entitiesreturnresultclassDeleteEntitiesStep(Step):"""step consisting in deleting entities"""defexecute(self):"""execute this step"""todelete=frozenset(typed_eid(eid)foreid,inself.execute_child())session=self.plan.sessiondelete=session.repo.glob_delete_entity# register pending eids first to avoid multiple deletionpending=session.query_data('pendingeids',set(),setdefault=True)actual=todelete-pendingpending|=actualforeidinactual:delete(session,eid)classDeleteRelationsStep(Step):"""step consisting in deleting relations"""def__init__(self,plan,rtype):Step.__init__(self,plan)self.rtype=rtypedefexecute(self):"""execute this step"""session=self.plan.sessiondelete=session.repo.glob_delete_relationforsubj,objinself.execute_child():delete(session,subj,self.rtype,obj)classUpdateStep(Step):"""step consisting in updating entities / adding relations from relations definitions and from results fetched in previous step """def__init__(self,plan,attribute_relations,relations,selected_index):Step.__init__(self,plan)self.attribute_relations=attribute_relationsself.relations=relationsself.selected_index=selected_indexdefexecute(self):"""execute this step"""plan=self.plansession=self.plan.sessionrepo=session.repoedefs={}# insert relationsforrowinself.execute_child():forrelationinself.attribute_relations:lhs,rhs=relation.get_variable_parts()eid=typed_eid(row[self.selected_index[str(lhs)]])try:edef=edefs[eid]exceptKeyError:edefs[eid]=edef=session.eid_rset(eid).get_entity(0,0)ifisinstance(rhs,Constant):# add constant values to entity defvalue=rhs.eval(plan.args)edef[relation.r_type]=valueelse:edef[relation.r_type]=row[self.selected_index[str(rhs)]]forrelationinself.relations:subj=row[self.selected_index[str(relation.children[0])]]obj=row[self.selected_index[str(relation.children[1])]]repo.glob_add_relation(session,subj,relation.r_type,obj)# update entitiesresult=[]foreid,edefinedefs.iteritems():repo.glob_update_entity(session,edef)result.append((eid,))returnresult