[form] simply skip unknown argument given to form constructor, don't fail
"""plan execution of rql queries on multiple sourcesthe best way to understand what are we trying to acheive here is to read theunit-tests in unittest_msplanner.pyWhat you need to know~~~~~~~~~~~~~~~~~~~~~1. The system source is expected to support every entity and relation types2. Given "X relation Y": * if relation, X and Y types are supported by the external source, we suppose by default that X and Y should both come from the same source as the relation. You can specify otherwise by adding relation into the "cross_relations" set in the source's mapping file and it that case, we'll consider that we can also find in the system source some relation between X and Y coming from different sources. * if "relation" isn't supported by the external source but X or Y types (or both) are, we suppose by default that can find in the system source some relation where X and/or Y come from the external source. You can specify otherwise by adding relation into the "dont_cross_relations" set in the source's mapping file and it that case, we'll consider that we can only find in the system source some relation between X and Y coming the system source.Implementation~~~~~~~~~~~~~~XXX explain algorithmExemples of multi-sources query execution~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~For a system source and a ldap user source (only CWUser and its attributesis supported, no group or such)::CWUser X:1. fetch CWUser X from both sources and return concatenation of results:CWUser X WHERE X in_group G, G name 'users':* catch 1 1. fetch CWUser X from both sources, store concatenation of results into a temporary table 2. return the result of TMP X WHERE X in_group G, G name 'users' from the system source* catch 2 1. return the result of CWUser X WHERE X in_group G, G name 'users' from system source, that's enough (optimization of the sql querier will avoid join on CWUser, so we will directly get local eids):CWUser X,L WHERE X in_group G, X login L, G name 'users':1. fetch Any X,L WHERE X is CWUser, X login L from both sources, store concatenation of results into a temporary table2. return the result of Any X, L WHERE X is TMP, X login LX in_group G, G name 'users' from the system source:Any X WHERE X owned_by Y:* catch 1 1. fetch CWUser X from both sources, store concatenation of results into a temporary table 2. return the result of Any X WHERE X owned_by Y, Y is TMP from the system source* catch 2 1. return the result of Any X WHERE X owned_by Y from system source, that's enough (optimization of the sql querier will avoid join on CWUser, so we will directly get local eids):organization: Logilab:copyright: 2003-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""__docformat__="restructuredtext en"fromitertoolsimportimap,ifilterfalsefromlogilab.common.compatimportanyfromlogilab.common.decoratorsimportcachedfromrql.stmtsimportUnion,Selectfromrql.nodesimportVariableRef,Comparison,Relation,Constant,Variablefromcubicwebimportserverfromcubicweb.utilsimportmake_uidfromcubicweb.server.utilsimportcleanup_solutionsfromcubicweb.server.ssplannerimport(SSPlanner,OneFetchStep,add_types_restriction)fromcubicweb.server.msstepsimport*fromcubicweb.server.sourcesimportAbstractSourceVariable._ms_table_key=lambdax:x.nameRelation._ms_table_key=lambdax:x.r_type# str() Constant.value to ensure generated table name won't be unicodeConstant._ms_table_key=lambdax:str(x.value)defneed_source_access_relation(vargraph):ifnotvargraph:returnFalse# check vargraph contains some other relation than the identity relation# test of key nature since it may be a variable name (don't care about that)# or a 2-uple (var1, var2) associated to the relation to traverse to go from# var1 to var2returnany(keyforkey,valinvargraph.iteritems()ifisinstance(key,tuple)andval!='identity')defneed_aggr_step(select,sources,stepdefs=None):"""return True if a temporary table is necessary to store some partial results to execute the given query """iflen(sources)==1:# can do everything at once with a single sourcereturnFalseifselect.orderbyorselect.groupbyorselect.has_aggregat:# if more than one source, we need a temp table to deal with sort /# groups / aggregat if :# * the rqlst won't be splitted (in the other case the last query# using partial temporary table can do sort/groups/aggregat without# the need for a later AggrStep)# * the rqlst is splitted in multiple steps and there are more than one# final stepifstepdefsisNone:returnTruehas_one_final=Falsefstepsolindices=set()forstepdefinstepdefs:ifstepdef[-1]:ifhas_one_finalorfrozenset(stepdef[2])!=fstepsolindices:returnTruehas_one_final=Trueelse:fstepsolindices.update(stepdef[2])returnFalsedefselect_group_sort(select):# XXX something similar done in rql2sql# add variables used in groups and sort terms to the selection# if necessaryifselect.groupby:forvrefinselect.groupby:ifnotvrefinselect.selection:select.append_selected(vref.copy(select))forsortterminselect.orderby:forvrefinsortterm.iget_nodes(VariableRef):ifnotvrefinselect.get_selected_variables():# we can't directly insert sortterm.term because it references# a variable of the select before the copy.# XXX if constant term are used to define sort, their value# may necessite a decayselect.append_selected(vref.copy(select))ifselect.groupbyandnotvrefinselect.groupby:select.add_group_var(vref.copy(select))defallequals(solutions):"""return true if all solutions are identical"""sol=solutions.next()noconstsol=Noneforsol_insolutions:ifsol_!=sol:returnFalsereturnTrue# XXX move functions below to rql ##############################################defis_ancestor(n1,n2):"""return True if n2 is a parent scope of n1"""p=n1.parentwhilepisnotNone:ifpisn2:returnTruep=p.parentreturnFalsedefcopy_node(newroot,node,subparts=()):newnode=node.__class__(*node.initargs(newroot))forpartinsubparts:newnode.append(part)returnnewnodedefused_in_outer_scope(var,scope):"""return true if the variable is used in an outer scope of the given scope """forrelinvar.stinfo['relations']:rscope=rel.scopeifnotrscopeisscopeandis_ancestor(scope,rscope):returnTruereturnFalse################################################################################classPartPlanInformation(object):"""regroups necessary information to execute some part of a "global" rql query ("global" means as received by the querier, which may result in several internal queries, e.g. parts, due to security insertions). Actually a PPI is created for each subquery and for each query in a union. It exposes as well some methods helping in executing this part on a multi-sources repository, modifying its internal structure during the process. :attr plan: the execution plan :attr rqlst: the original rql syntax tree handled by this part :attr needsplit: bool telling if the query has to be split into multiple steps for execution or if it can be executed at once :attr temptable: a SQL temporary table name or None, if necessary to handle aggregate / sorting for this part of the query :attr finaltable: a SQL table name or None, if results for this part of the query should be written into a temporary table (usually shared by multiple PPI) :attr sourcesterms: a dictionary {source : {term: set([solution index, ])}} telling for each source which terms are supported for which solutions. A "term" may be either a rql Variable, Constant or Relation node. """def__init__(self,plan,rqlst,rqlhelper=None):self.plan=planself.rqlst=rqlstself.needsplit=Falseself.temptable=Noneself.finaltable=None# shortcutsself._schema=plan.schemaself._session=plan.sessionself._repo=self._session.repoself._solutions=rqlst.solutionsself._solindices=range(len(self._solutions))self.system_source=self._repo.system_source# source : {term: [solution index, ]}self.sourcesterms=self._sourcesterms={}# source : {relation: set(child variable and constant)}self._crossrelations={}# dictionary of variables and constants which are linked to each other# using a non final relation supported by multiple sources (crossed or# not).self._linkedterms={}# processingtermssources=self._compute_sourcesterms()self._remove_invalid_sources(termssources)self._compute_needsplit()# after initialisation, .sourcesterms contains the same thing as# ._sourcesterms though during plan construction, ._sourcesterms will# be modified while .sourcesterms will be kept unmodifiedself.sourcesterms={}fork,vinself._sourcesterms.iteritems():self.sourcesterms[k]={}fork2,v2inv.iteritems():self.sourcesterms[k][k2]=v2.copy()# cleanup linked varforvar,linkedrelsinfoinself._linkedterms.iteritems():self._linkedterms[var]=frozenset(x[0]forxinlinkedrelsinfo)# map output of a step to input of a following stepself._inputmaps={}# record input map conflicts to resolve them on final step generationself._conflicts=[]ifrqlhelperisnotNone:# else testself._insert_identity_variable=rqlhelper._annotator.rewrite_shared_optionalifserver.DEBUG&server.DBG_MS:print'sourcesterms:'self._debug_sourcesterms()def_debug_sourcesterms(self):forsourceinself._sourcesterms:print'-',sourceforterm,solsinself._sourcesterms[source].items():print' -',term,id(term),':',solsdefcopy_solutions(self,solindices):return[self._solutions[solidx].copy()forsolidxinsolindices]@property@cacheddefpart_sources(self):ifself._sourcesterms:returntuple(sorted(self._sourcesterms))return(self.system_source,)@property@cacheddef_sys_source_set(self):returnfrozenset((self.system_source,solindex)forsolindexinself._solindices)@cacheddef_norel_support_set(self,relation):"""return a set of (source, solindex) where source doesn't support the relation """returnfrozenset((source,solidx)forsourceinself._repo.sourcesforsolidxinself._solindicesifnot((source.support_relation(relation.r_type))orrelation.r_typeinsource.dont_cross_relations))def_compute_sourcesterms(self):"""compute for each term (variable, rewritten constant, relation) and for each solution in the rqlst which sources support them """repo=self._repoeschema=self._schema.eschemasourcesterms=self._sourcesterms# find for each source which variable/solution are supportedforvarname,varobjinself.rqlst.defined_vars.items():# if variable has an eid specified, we can get its source directly# NOTE: use uidrels and not constnode to deal with "X eid IN(1,2,3,4)"ifvarobj.stinfo['uidrels']:vrels=varobj.stinfo['relations']-varobj.stinfo['uidrels']forrelinvarobj.stinfo['uidrels']:ifrel.neged(strict=True)orrel.operator()!='=':continueforconstinrel.children[1].get_nodes(Constant):eid=const.eval(self.plan.args)source=self._session.source_from_eid(eid)ifvrelsandnotany(source.support_relation(r.r_type)forrinvrels):self._set_source_for_term(self.system_source,varobj)else:self._set_source_for_term(source,varobj)continuerels=varobj.stinfo['relations']ifnotrelsandnotvarobj.stinfo['typerels']:# (rare) case where the variable has no type specified nor# relation accessed ex. "Any MAX(X)"self._set_source_for_term(self.system_source,varobj)continuefori,solinenumerate(self._solutions):vartype=sol[varname]# skip final variableifeschema(vartype).is_final():breakforsourceinrepo.sources:ifsource.support_entity(vartype):# the source support the entity type, though we will# actually have to fetch from it only if# * the variable isn't invariant# * at least one supported relation specifiedifnotvarobj._q_invariantor \any(imap(source.support_relation,(r.r_typeforrinrelsifr.r_typenotin('identity','eid')))):sourcesterms.setdefault(source,{}).setdefault(varobj,set()).add(i)# if variable is not invariant and is used by a relation# not supported by this source, we'll have to split the# queryifnotvarobj._q_invariantandany(ifilterfalse(source.support_relation,(r.r_typeforrinrels))):self.needsplit=True# add source for rewritten constants to sourcestermsforvconstsinself.rqlst.stinfo['rewritten'].itervalues():const=vconsts[0]source=self._session.source_from_eid(const.eval(self.plan.args))ifsourceisself.system_source:forconstinvconsts:self._set_source_for_term(source,const)elifnotself._sourcesterms:self._set_source_for_term(source,const)elifsourceinself._sourcesterms:source_scopes=frozenset(t.scopefortinself._sourcesterms[source])forconstinvconsts:ifconst.scopeinsource_scopes:self._set_source_for_term(source,const)# if system source is used, add every rewritten constant# to its supported terms even when associated entity# doesn't actually come from it so we get a changes# that allequals will return True as expected when# computing needsplit# check const is used in a relation restrictionifconst.relation()andself.system_sourceinsourcesterms:self._set_source_for_term(self.system_source,const)# add source for relationsrschema=self._schema.rschematermssources={}forrelinself.rqlst.iget_nodes(Relation):# process non final relations only# note: don't try to get schema for 'is' relation (not available# during bootstrap)ifnotrel.is_types_restriction()andnotrschema(rel.r_type).is_final():# nothing to do if relation is not supported by multiple sources# or if some source has it listed in its cross_relations# attribute## XXX code below don't deal if some source allow relation# crossing but not another onerelsources=repo.rel_type_sources(rel.r_type)iflen(relsources)<2:# filter out sources being there because they have this# relation in their dont_cross_relations attributerelsources=[sourceforsourceinrelsourcesifsource.support_relation(rel.r_type)]ifrelsources:# this means the relation is using a variable inlined as# a constant and another unsupported variable, in which# case we put the relation in sourcestermsself._sourcesterms.setdefault(relsources[0],{})[rel]=set(self._solindices)continuelhs,rhs=rel.get_variable_parts()lhsv,rhsv=getattr(lhs,'variable',lhs),getattr(rhs,'variable',rhs)# update dictionary of sources supporting lhs and rhs varsifnotlhsvintermssources:termssources[lhsv]=self._term_sources(lhs)ifnotrhsvintermssources:termssources[rhsv]=self._term_sources(rhs)self._handle_cross_relation(rel,relsources,termssources)self._linkedterms.setdefault(lhsv,set()).add((rhsv,rel))self._linkedterms.setdefault(rhsv,set()).add((lhsv,rel))returntermssourcesdef_handle_cross_relation(self,rel,relsources,termssources):forsourceinrelsources:ifrel.r_typeinsource.cross_relations:ssource=self.system_sourcecrossvars=set(x.variableforxinrel.get_nodes(VariableRef))forconstinrel.get_nodes(Constant):ifsource.uri!='system'andnotconstinself._sourcesterms.get(source,()):continuecrossvars.add(const)self._crossrelations.setdefault(source,{})[rel]=crossvarsiflen(crossvars)<2:# this means there is a constant in the relation which is# not supported by the source, so we can stop herecontinueself._sourcesterms.setdefault(ssource,{})[rel]=set(self._solindices)fortermincrossvars:iflen(termssources[term])==1anditer(termssources[term]).next()[0].uri=='system':forovincrossvars:ifovisnottermand(isinstance(ov,Constant)orov._q_invariant):ssset=frozenset((ssource,))self._remove_sources(ov,termssources[ov]-ssset)breakelse:self._sourcesterms.setdefault(source,{})[rel]=set(self._solindices)def_remove_invalid_sources(self,termssources):"""removes invalid sources from `sourcesterms` member according to traversed relations and their properties (which sources support them, can they cross sources, etc...) """forterminself._linkedterms:self._remove_sources_until_stable(term,termssources)iflen(self._sourcesterms)>1andhasattr(self.plan.rqlst,'main_relations'):# the querier doesn't annotate write queries, need to do it hereself.plan.annotate_rqlst()# insert/update/delete queries, we may get extra information from# the main relation (eg relations to the left of the WHEREifself.plan.rqlst.TYPE=='insert':inserted=dict((vref.variable,etype)foretype,vrefinself.plan.rqlst.main_variables)else:inserted={}repo=self._reporschema=self._schema.rschemaforrelinself.plan.rqlst.main_relations:ifnotrschema(rel.r_type).is_final():# nothing to do if relation is not supported by multiple sourcesiflen(repo.rel_type_sources(rel.r_type))<2:continuelhs,rhs=rel.get_variable_parts()try:lhsv=self._extern_term(lhs,termssources,inserted)rhsv=self._extern_term(rhs,termssources,inserted)exceptKeyError,ex:continueself._remove_term_sources(lhsv,rel,rhsv,termssources)self._remove_term_sources(rhsv,rel,lhsv,termssources)def_extern_term(self,term,termssources,inserted):var=term.variableifvar.stinfo['constnode']:termv=var.stinfo['constnode']termssources[termv]=self._term_sources(termv)elifvarininserted:termv=varsource=self._repo.locate_etype_source(inserted[var])termssources[termv]=set((source,solindex)forsolindexinself._solindices)else:termv=self.rqlst.defined_vars[var.name]ifnottermvintermssources:termssources[termv]=self._term_sources(termv)returntermvdef_remove_sources_until_stable(self,term,termssources):sourcesterms=self._sourcestermsforoterm,relinself._linkedterms.get(term,()):ifnotterm.scopeisoterm.scopeandrel.scope.neged(strict=True):# can't get information from relation inside a NOT exists# where terms don't belong to the same scopecontinueneed_ancestor_scope=Falseifnot(term.scopeisrel.scopeandoterm.scopeisrel.scope):ifrel.ored():continueifrel.ored(traverse_scope=True):# if relation has some OR as parent, constraints should only# propagate from parent scope to child scope, nothing elseneed_ancestor_scope=Truerelsources=self._repo.rel_type_sources(rel.r_type)ifrel.neged(strict=True)and(len(relsources)<2ornotisinstance(oterm,Variable)oroterm.valuable_references()!=1orany(sourcesterms[source][term]!=sourcesterms[source][oterm]forsourceinrelsourcesifterminsourcesterms.get(source,())andoterminsourcesterms.get(source,()))):# neged relation doesn't allow to infer term sources unless# we're on a multisource relation for a term only used by this# relation (eg "Any X WHERE NOT X multisource_rel Y" and over is# Y)continue# compute invalid sources for terms and remove themifnotneed_ancestor_scopeoris_ancestor(term.scope,oterm.scope):self._remove_term_sources(term,rel,oterm,termssources)ifnotneed_ancestor_scopeoris_ancestor(oterm.scope,term.scope):self._remove_term_sources(oterm,rel,term,termssources)def_remove_term_sources(self,term,rel,oterm,termssources):"""remove invalid sources for term according to oterm's sources and the relation between those two terms. """norelsup=self._norel_support_set(rel)termsources=termssources[term]invalid_sources=termsources-(termssources[oterm]|norelsup)ifinvalid_sourcesandself._repo.can_cross_relation(rel.r_type):invalid_sources-=self._sys_source_setifinvalid_sourcesandisinstance(term,Variable) \andself._need_ext_source_access(term,rel):# if the term is a not invariant variable, we should filter out# source where the relation is a cross relation from invalid# sourcesinvalid_sources=frozenset((s,solidx)fors,solidxininvalid_sourcesifnot(sinself._crossrelationsandrelinself._crossrelations[s]))ifinvalid_sources:self._remove_sources(term,invalid_sources)termsources-=invalid_sourcesself._remove_sources_until_stable(term,termssources)ifisinstance(oterm,Constant):self._remove_sources(oterm,invalid_sources)def_compute_needsplit(self):"""tell according to sourcesterms if the rqlst has to be splitted for execution among multiple sources the execution has to be split if * a source support an entity (non invariant) but doesn't support a relation on it * a source support an entity which is accessed by an optional relation * there is more than one source and either all sources'supported variable/solutions are not equivalent or multiple variables have to be fetched from some source """# NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2iflen(self._sourcesterms)<2:self.needsplit=False# if this is not the system source but we have only constant terms# and no relation (other than eid), apply query on the system source## testing for rqlst with nothing in vargraph nor defined_vars is the# simplest way the check the condition explained belowifnotself.system_sourceinself._sourcestermsand \notself.rqlst.defined_varsand \notneed_source_access_relation(self.rqlst.vargraph):self._sourcesterms={self.system_source:{}}elifnotself.needsplit:ifnotallequals(self._sourcesterms.itervalues()):forsource,termsinself._sourcesterms.iteritems():ifsourceisself.system_source:continueifany(xforxintermsifnotisinstance(x,Constant)):self.needsplit=Truereturnself._sourcesterms={self.system_source:{}}self.needsplit=Falseelse:sample=self._sourcesterms.itervalues().next()iflen(sample)>1:forterminsample:# need split if unlinked variableifisinstance(term,Variable)andnotterminself._linkedterms:self.needsplit=Truebreakelse:# need split if there are some cross relation on non# invariant variable or if the variable is used in# multi-sources relationifself._crossrelations:forreldictinself._crossrelations.itervalues():forrel,termsinreldict.iteritems():forterminterms:ifisinstance(term,Variable) \andself._need_ext_source_access(term,rel):self.needsplit=Truereturn@cacheddef_need_ext_source_access(self,var,rel):ifnotvar._q_invariant:returnTrueifany(rforx,rinself._linkedterms[var]ifnotrisrelandself._repo.is_multi_sources_relation(r.r_type)):returnTruereturnFalsedef_set_source_for_term(self,source,term):self._sourcesterms.setdefault(source,{})[term]=set(self._solindices)def_term_sources(self,term):"""returns possible sources for terms `term`"""ifisinstance(term,Constant):source=self._session.source_from_eid(term.eval(self.plan.args))returnset((source,solindex)forsolindexinself._solindices)else:var=getattr(term,'variable',term)sources=[sourceforsource,varobjsinself.sourcesterms.iteritems()ifvarinvarobjs]returnset((source,solindex)forsourceinsourcesforsolindexinself.sourcesterms[source][var])def_remove_sources(self,term,sources):"""removes invalid sources (`sources`) from `sourcesterms` :param sources: the list of sources to remove :param term: the analyzed term """sourcesterms=self._sourcestermsforsource,solindexinsources:try:sourcesterms[source][term].remove(solindex)exceptKeyError:importrql.baseasrqlbassertisinstance(term,rqlb.BaseNode),repr(term)continue# may occur with subquery column aliasifnotsourcesterms[source][term]:delsourcesterms[source][term]ifnotsourcesterms[source]:delsourcesterms[source]defcrossed_relation(self,source,relation):returnrelationinself._crossrelations.get(source,())defpart_steps(self):"""precompute necessary part steps before generating actual rql for each step. This is necessary to know if an aggregate step will be necessary or not. """steps=[]select=self.rqlstrschema=self._schema.rschemaforsourceinself.part_sources:try:sourceterms=self._sourcesterms[source]exceptKeyError:continue# already proceedwhilesourceterms:# take a term randomly, and all terms supporting the# same solutionsterm,solindices=self._choose_term(sourceterms)ifsource.uri=='system':# ensure all variables are available for the latest step# (missing one will be available from temporary tables# of previous steps)scope=selectterms=scope.defined_vars.values()+scope.aliases.values()sourceterms.clear()sources=[source]else:scope=term.scope# find which sources support the same term and solutionssources=self._expand_sources(source,term,solindices)# no try to get as much terms as possibleterms=self._expand_terms(term,sources,sourceterms,scope,solindices)iflen(terms)==1andisinstance(terms[0],Constant):# we can't generate anything interesting with a single# constant term (will generate an empty "Any" query),# go to the next iteration directly!continueifnotsourceterms:try:delself._sourcesterms[source]exceptKeyError:# XXX already cleanedpass# set of terms which should be additionaly selected when# possibleneedsel=set()ifnotself._sourcesterms:terms+=scope.defined_vars.values()+scope.aliases.values()final=Trueelse:# suppose this is a final step until the contrary is provenfinal=scopeisselect# add attribute variables and mark variables which should be# additionaly selected when possibleforvarinselect.defined_vars.itervalues():ifnotvarinterms:stinfo=var.stinfoforovar,rtypeinstinfo['attrvars']:ifovarinterms:needsel.add(var.name)terms.append(var)breakelse:needsel.add(var.name)final=False# check where all relations are supported by the sourcesforrelinscope.iget_nodes(Relation):ifrel.is_types_restriction():continue# take care not overwriting the existing "source" identifierfor_sourceinsources:ifnot_source.support_relation(rel.r_type)or(self.crossed_relation(_source,rel)andnotrelinterms):forvrefinrel.iget_nodes(VariableRef):needsel.add(vref.name)final=Falsebreakelse:ifnotscopeisselect:self._exists_relation(rel,terms,needsel)# if relation is supported by all sources and some of# its lhs/rhs variable isn't in "terms", and the# other end *is* in "terms", mark it have to be# selectedifsource.uri!='system'andnotrschema(rel.r_type).is_final():lhs,rhs=rel.get_variable_parts()try:lhsvar=lhs.variableexceptAttributeError:lhsvar=lhstry:rhsvar=rhs.variableexceptAttributeError:rhsvar=rhsiflhsvarintermsandnotrhsvarinterms:needsel.add(lhsvar.name)elifrhsvarintermsandnotlhsvarinterms:needsel.add(rhsvar.name)iffinalandsource.uri!='system':# check rewritten constantsforvconstsinselect.stinfo['rewritten'].itervalues():const=vconsts[0]eid=const.eval(self.plan.args)_source=self._session.source_from_eid(eid)iflen(sources)>1ornot_sourceinsources:# if there is some rewriten constant used by a not# neged relation while there are some source not# supporting the associated entity, this step can't# be final (unless the relation is explicitly in# `terms`, eg cross relations)forcinvconsts:rel=c.relation()ifrelisNoneornot(relintermsorrel.neged(strict=True)):final=Falsebreakbreakiffinal:self._cleanup_sourcesterms(sources,solindices)steps.append((sources,terms,solindices,scope,needsel,final))returnstepsdef_exists_relation(self,rel,terms,needsel):rschema=self._schema.rschema(rel.r_type)lhs,rhs=rel.get_variable_parts()try:lhsvar,rhsvar=lhs.variable,rhs.variableexceptAttributeError:passelse:# supported relation with at least one end supported, check the# other end is in as well. If not this usually means the# variable is refed by an outer scope and should be substituted# using an 'identity' relation (else we'll get a conflict of# temporary tables)ifrhsvarintermsandnotlhsvarinterms:self._identity_substitute(rel,lhsvar,terms,needsel)eliflhsvarintermsandnotrhsvarinterms:self._identity_substitute(rel,rhsvar,terms,needsel)def_identity_substitute(self,relation,var,terms,needsel):newvar=self._insert_identity_variable(relation.scope,var)ifnewvarisnotNone:# ensure relation is using '=' operator, else we rely on a# sqlgenerator side effect (it won't insert an inequality operator# in this case)relation.children[1].operator='='terms.append(newvar)needsel.add(newvar.name)def_choose_term(self,sourceterms):"""pick one term among terms supported by a source, which will be used as a base to generate an execution step """secondchoice=Noneiflen(self._sourcesterms)>1:# priority to variable from subscopesforterminsourceterms:ifnotterm.scopeisself.rqlst:ifisinstance(term,Variable):returnterm,sourceterms.pop(term)secondchoice=termelse:# priority to variable from outer scopeforterminsourceterms:ifterm.scopeisself.rqlst:ifisinstance(term,Variable):returnterm,sourceterms.pop(term)secondchoice=termifsecondchoiceisnotNone:returnsecondchoice,sourceterms.pop(secondchoice)# priority to variable with the less solutions supported and with the# most valuable refs. Add variable name for test predictabilityvariables=sorted([(var,sols)for(var,sols)insourceterms.items()ifisinstance(var,Variable)],key=lambda(v,s):(len(s),-v.valuable_references(),v.name))ifvariables:var=variables[0][0]returnvar,sourceterms.pop(var)# priority to constantforterminsourceterms:ifisinstance(term,Constant):returnterm,sourceterms.pop(term)# whatever (relation)term=iter(sourceterms).next()returnterm,sourceterms.pop(term)def_expand_sources(self,selected_source,term,solindices):"""return all sources supporting given term / solindices"""sources=[selected_source]sourcesterms=self._sourcestermsforsourceinsourcesterms.keys():ifsourceisselected_source:continueifnot(terminsourcesterms[source]andsolindices.issubset(sourcesterms[source][term])):continuesources.append(source)ifsource.uri!='system'ornot(isinstance(term,Variable)andnotterminself._linkedterms):termsolindices=sourcesterms[source][term]termsolindices-=solindicesifnottermsolindices:delsourcesterms[source][term]ifnotsourcesterms[source]:delsourcesterms[source]returnsourcesdef_expand_terms(self,term,sources,sourceterms,scope,solindices):terms=[term]sources=sorted(sources)sourcesterms=self._sourcestermsnbunlinked=1linkedterms=self._linkedterms# term has to belong to the same scope if there is more# than the system source remainingiflen(sourcesterms)>1andnotscopeisself.rqlst:candidates=(tfortinsourceterms.keys()ifscopeist.scope)else:candidates=sourceterms#.iterkeys()# we only want one unlinked term in each generated querycandidates=[tfortincandidatesifisinstance(t,(Constant,Relation))or(solindices.issubset(sourceterms[t])andtinlinkedterms)]cross_rels={}forsourceinsources:cross_rels.update(self._crossrelations.get(source,{}))exclude={}forrel,crossvarsincross_rels.iteritems():vars=[tfortincrossvarsifisinstance(t,Variable)]try:exclude[vars[0]]=vars[1]exclude[vars[1]]=vars[0]exceptIndexError:passaccept_term=lambdax:(notany(sforsinsourcesifnotxinsourcesterms.get(s,()))andany(tfortintermsiftinlinkedterms.get(x,()))andnotexclude.get(x)interms)ifisinstance(term,Relation)andtermincross_rels:cross_terms=cross_rels.pop(term)base_accept_term=accept_termaccept_term=lambdax:(base_accept_term(x)orxincross_terms)forrefedincross_terms:ifnotrefedincandidates:terms.append(refed)# repeat until no term can't be added, since addition of a new# term may permit to another one to be addedmodified=Truewhilemodifiedandcandidates:modified=Falsefortermincandidates[:]:ifisinstance(term,Constant):relation=term.relation()ifsorted(set(x[0]forxinself._term_sources(term)))!=sources:continueterms.append(term)candidates.remove(term)modified=Truedelsourceterms[term]elifaccept_term(term):terms.append(term)candidates.remove(term)modified=Trueself._cleanup_sourcesterms(sources,solindices,term)returntermsdef_cleanup_sourcesterms(self,sources,solindices,term=None):"""remove solutions so we know they are already processed"""forsourceinsources:try:sourceterms=self._sourcesterms[source]exceptKeyError:continueiftermisNone:forterm,termsolindicesinsourceterms.items():ifisinstance(term,Relation)andself.crossed_relation(source,term):continuetermsolindices-=solindicesifnottermsolindices:delsourceterms[term]else:try:sourceterms[term]-=solindicesifnotsourceterms[term]:delsourceterms[term]exceptKeyError:pass#assert term in cross_termsifnotsourceterms:delself._sourcesterms[source]defmerge_input_maps(self,allsolindices):"""inputmaps is a dictionary with tuple of solution indices as key with an associated input map as value. This function compute for each solution its necessary input map and return them grouped ex: inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'}, (1,): {'X': 't2.C0', 'T': 't2.C1'}} return : [([1], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1', 'X': 't2.C0', 'T': 't2.C1'}), ([0,2], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'})] """ifnotself._inputmaps:return[(allsolindices,None)]mapbysol={}# compute a single map for each solutionforsolindices,basemapinself._inputmaps.iteritems():forsolindexinsolindices:solmap=mapbysol.setdefault(solindex,{})solmap.update(basemap)try:allsolindices.remove(solindex)exceptKeyError:continue# already removed# group results by identical input mapresult=[]forsolindex,solmapinmapbysol.iteritems():forsolindices,commonmapinresult:ifcommonmap==solmap:solindices.append(solindex)breakelse:result.append(([solindex],solmap))ifallsolindices:result.append((list(allsolindices),None))returnresultdefbuild_final_part(self,select,solindices,inputmap,sources,insertedvars):solutions=[self._solutions[i]foriinsolindices]ifself._conflicts:forvarname,mappedtoinself._conflicts:var=select.defined_vars[varname]newvar=select.make_variable()# XXX should use var.scope but scope hasn't been computed yetselect.add_relation(var,'identity',newvar)forsolinsolutions:sol[newvar.name]=sol[varname]inputmap[newvar.name]=mappedtorqlst=self.plan.finalize(select,solutions,insertedvars)ifself.temptableisNoneandself.finaltableisNone:returnOneFetchStep(self.plan,rqlst,sources,inputmap=inputmap)table=self.temptableorself.finaltablereturnFetchStep(self.plan,rqlst,sources,table,True,inputmap)defbuild_non_final_part(self,select,solindices,sources,insertedvars,table):"""non final step, will have to store results in a temporary table"""solutions=[self._solutions[i]foriinsolindices]rqlst=self.plan.finalize(select,solutions,insertedvars)step=FetchStep(self.plan,rqlst,sources,table,False)# update input map for following steps, according to processed solutionsinputmapkey=tuple(sorted(solindices))inputmap=self._inputmaps.setdefault(inputmapkey,{})forvarname,mappinginstep.outputmap.iteritems():ifvarnameininputmapand \not(mapping==inputmap[varname]orself._schema.eschema(solutions[0][varname]).is_final()):self._conflicts.append((varname,inputmap[varname]))inputmap.update(step.outputmap)self.plan.add_step(step)classMSPlanner(SSPlanner):"""MultiSourcesPlanner: build execution plan for rql queries decompose the RQL query according to sources'schema """defbuild_select_plan(self,plan,rqlst):"""build execution plan for a SELECT RQL query the rqlst should not be tagged at this point """ifserver.DEBUG&server.DBG_MS:print'-'*80print'PLANNING',rqlstforselectinrqlst.children:iflen(select.solutions)>1:hasmultiplesols=Truebreakelse:hasmultiplesols=False# preprocess deals with security insertion and returns a new syntax tree# which have to be executed to fulfill the query: according# to permissions for variable's type, different rql queries may have to# be executedplan.preprocess(rqlst)ppis=[PartPlanInformation(plan,select,self.rqlhelper)forselectinrqlst.children]steps=self._union_plan(plan,rqlst,ppis)ifserver.DEBUG&server.DBG_MS:frompprintimportpprintforstepinplan.steps:pprint(step.test_repr())pprint(steps[0].test_repr())returnstepsdef_ppi_subqueries(self,ppi):# part plan info for subqueriesplan=ppi.planinputmap={}forsubqueryinppi.rqlst.with_[:]:sppis=[PartPlanInformation(plan,select)forselectinsubquery.query.children]forsppiinsppis:ifsppi.needsplitorsppi.part_sources!=ppi.part_sources:temptable='T%s'%make_uid(id(subquery))sstep=self._union_plan(plan,subquery.query,sppis,temptable)[0]breakelse:sstep=NoneifsstepisnotNone:ppi.rqlst.with_.remove(subquery)fori,colaliasinenumerate(subquery.aliases):inputmap[colalias.name]='%s.C%s'%(temptable,i)ppi.plan.add_step(sstep)returninputmapdef_union_plan(self,plan,union,ppis,temptable=None):tosplit,cango,allsources=[],{},set()forplaninfoinppis:ifplaninfo.needsplit:tosplit.append(planinfo)else:cango.setdefault(planinfo.part_sources,[]).append(planinfo)forsourceinplaninfo.part_sources:allsources.add(source)# first add steps for query parts which doesn't need to splittedsteps=[]forsources,cppisincango.iteritems():byinputmap={}forppiincppis:select=ppi.rqlstifsources!=(ppi.system_source,):add_types_restriction(self.schema,select)# part plan info for subqueriesinputmap=self._ppi_subqueries(ppi)aggrstep=need_aggr_step(select,sources)ifaggrstep:atemptable='T%s'%make_uid(id(select))sunion=Union()sunion.append(select)selected=select.selection[:]select_group_sort(select)step=AggrStep(plan,selected,select,atemptable,temptable)step.set_limit_offset(select.limit,select.offset)select.limit=Noneselect.offset=0fstep=FetchStep(plan,sunion,sources,atemptable,True,inputmap)step.children.append(fstep)steps.append(step)else:byinputmap.setdefault(tuple(inputmap.iteritems()),[]).append((select))forinputmap,queriesinbyinputmap.iteritems():inputmap=dict(inputmap)sunion=Union()forselectinqueries:sunion.append(select)iftemptable:steps.append(FetchStep(plan,sunion,sources,temptable,True,inputmap))else:steps.append(OneFetchStep(plan,sunion,sources,inputmap))# then add steps for splitted query partsforplaninfointosplit:steps.append(self.split_part(planinfo,temptable))iflen(steps)>1:iftemptable:step=UnionFetchStep(plan)else:step=UnionStep(plan)step.children=stepsreturn(step,)returnsteps# internal methods for multisources decomposition #########################defsplit_part(self,ppi,temptable):ppi.finaltable=temptableplan=ppi.planselect=ppi.rqlstsubinputmap=self._ppi_subqueries(ppi)stepdefs=ppi.part_steps()ifneed_aggr_step(select,ppi.part_sources,stepdefs):atemptable='T%s'%make_uid(id(select))selection=select.selection[:]select_group_sort(select)else:atemptable=Noneselection=select.selectionppi.temptable=atemptablevfilter=TermsFiltererVisitor(self.schema,ppi)steps=[]forsources,terms,solindices,scope,needsel,finalinstepdefs:# extract an executable query using only the specified termsifsources[0].uri=='system':# in this case we have to merge input maps before call to# filter so already processed restriction are correctly# removedsolsinputmaps=ppi.merge_input_maps(solindices)forsolindices,inputmapinsolsinputmaps:minrqlst,insertedvars=vfilter.filter(sources,terms,scope,set(solindices),needsel,final)ifinputmapisNone:inputmap=subinputmapelse:inputmap.update(subinputmap)steps.append(ppi.build_final_part(minrqlst,solindices,inputmap,sources,insertedvars))else:# this is a final part (i.e. retreiving results for the# original query part) if all term / sources have been# treated or if this is the last shot for used solutionsminrqlst,insertedvars=vfilter.filter(sources,terms,scope,solindices,needsel,final)iffinal:solsinputmaps=ppi.merge_input_maps(solindices)forsolindices,inputmapinsolsinputmaps:ifinputmapisNone:inputmap=subinputmapelse:inputmap.update(subinputmap)ifinputmapandlen(sources)>1:sources.remove(ppi.system_source)steps.append(ppi.build_final_part(minrqlst,solindices,None,sources,insertedvars))steps.append(ppi.build_final_part(minrqlst,solindices,inputmap,[ppi.system_source],insertedvars))else:steps.append(ppi.build_final_part(minrqlst,solindices,inputmap,sources,insertedvars))else:table='_T%s%s'%(''.join(sorted(v._ms_table_key()forvinterms)),''.join(sorted(str(i)foriinsolindices)))ppi.build_non_final_part(minrqlst,solindices,sources,insertedvars,table)# finally: join parts, deal with aggregat/group/sorts if necessaryifatemptableisnotNone:step=AggrStep(plan,selection,select,atemptable,temptable)step.children=stepseliflen(steps)>1:ifselect.need_intersectorany(select.need_intersectforstepinstepsforselectinstep.union.children):iftemptable:step=IntersectFetchStep(plan)else:step=IntersectStep(plan)else:iftemptable:step=UnionFetchStep(plan)else:step=UnionStep(plan)step.children=stepselse:step=steps[0]ifselect.limitisnotNoneorselect.offset:step.set_limit_offset(select.limit,select.offset)returnstepclassUnsupportedBranch(Exception):passclassTermsFiltererVisitor(object):def__init__(self,schema,ppi):self.schema=schemaself.ppi=ppiself.skip={}self.hasaggrstep=self.ppi.temptableself.extneedsel=frozenset(vref.nameforsortterminppi.rqlst.orderbyforvrefinsortterm.iget_nodes(VariableRef))def_rqlst_accept(self,rqlst,node,newroot,terms,setfunc=None):try:newrestr,node_=node.accept(self,newroot,terms[:])exceptUnsupportedBranch:returnrqlstifsetfuncisnotNoneandnewrestrisnotNone:setfunc(newrestr)ifnotnode_isnode:rqlst=node.parentreturnrqlstdeffilter(self,sources,terms,rqlst,solindices,needsel,final):ifserver.DEBUG&server.DBG_MS:print'filter',finaland'final'or'',sources,terms,rqlst,solindices,needselnewroot=Select()self.sources=sorted(sources)self.terms=termsself.solindices=solindicesself.final=finalself._pending_vrefs=[]# terms which appear in unsupported branchesneedsel|=self.extneedselself.needsel=needsel# terms which appear in supported branchesself.mayneedsel=set()# new inserted variablesself.insertedvars=[]# other structures (XXX document)self.mayneedvar,self.hasvar={},{}self.use_only_defined=Falseself.scopes={rqlst:newroot}self.current_scope=rqlstifrqlst.where:rqlst=self._rqlst_accept(rqlst,rqlst.where,newroot,terms,newroot.set_where)ifisinstance(rqlst,Select):self.use_only_defined=Trueifrqlst.groupby:groupby=[]fornodeinrqlst.groupby:rqlst=self._rqlst_accept(rqlst,node,newroot,terms,groupby.append)ifgroupby:newroot.set_groupby(groupby)ifrqlst.having:having=[]fornodeinrqlst.having:rqlst=self._rqlst_accept(rqlst,node,newroot,terms,having.append)ifhaving:newroot.set_having(having)iffinalandrqlst.orderbyandnotself.hasaggrstep:orderby=[]fornodeinrqlst.orderby:rqlst=self._rqlst_accept(rqlst,node,newroot,terms,orderby.append)iforderby:newroot.set_orderby(orderby)self.process_selection(newroot,terms,rqlst)elifnotnewroot.where:# no restrictions have been copied, just select terms and add# type restriction (done later by add_types_restriction)forvinterms:ifnotisinstance(v,Variable):continuenewroot.append_selected(VariableRef(newroot.get_variable(v.name)))solutions=self.ppi.copy_solutions(solindices)cleanup_solutions(newroot,solutions)newroot.set_possible_types(solutions)iffinal:ifself.hasaggrstep:self.add_necessary_selection(newroot,self.mayneedsel&self.extneedsel)newroot.distinct=rqlst.distinctelse:self.add_necessary_selection(newroot,self.mayneedsel&self.needsel)# insert vars to fetch constant values when neededfor(varname,rschema),reldefsinself.mayneedvar.iteritems():forrel,oredinreldefs:ifnot(varname,rschema)inself.hasvar:self.hasvar[(varname,rschema)]=None# just to avoid further insertioncvar=newroot.make_variable()forsolinnewroot.solutions:sol[cvar.name]=rschema.objects(sol[varname])[0]# if the current restriction is not used in a OR branch,# we can keep it, else we have to drop the constant# restriction (or we may miss some results)ifnotored:rel=rel.copy(newroot)newroot.add_restriction(rel)# add a relation to link the variablenewroot.remove_node(rel.children[1])cmp=Comparison('=')rel.append(cmp)cmp.append(VariableRef(cvar))self.insertedvars.append((varname,rschema,cvar.name))newroot.append_selected(VariableRef(newroot.get_variable(cvar.name)))# NOTE: even if the restriction is done by this query, we have# to let it in the original rqlst so that it appears anyway in# the "final" query, else we may change the meaning of the query# if there are NOT somewhere :# 'NOT X relation Y, Y name "toto"' means X WHERE X isn't related# to Y whose name is toto while# 'NOT X relation Y' means X WHERE X has no 'relation' (whatever Y)elifored:newroot.remove_node(rel)add_types_restriction(self.schema,rqlst,newroot,solutions)ifserver.DEBUG&server.DBG_MS:print'--->',newrootreturnnewroot,self.insertedvarsdefvisit_and(self,node,newroot,terms):subparts=[]foriinxrange(len(node.children)):child=node.children[i]try:newchild,child_=child.accept(self,newroot,terms)ifnotchild_ischild:node=child_.parentifnewchildisNone:continuesubparts.append(newchild)exceptUnsupportedBranch:continueifnotsubparts:returnNone,nodeiflen(subparts)==1:returnsubparts[0],nodereturncopy_node(newroot,node,subparts),nodevisit_or=visit_anddef_relation_supported(self,relation):rtype=relation.r_typeforsourceinself.sources:ifnotsource.support_relation(rtype)or(rtypeinsource.cross_relationsandnotrelationinself.terms):returnFalseifnotself.finalandnotrelationinself.terms:rschema=self.schema.rschema(relation.r_type)ifnotrschema.is_final():forterminrelation.get_nodes((VariableRef,Constant)):term=getattr(term,'variable',term)termsources=sorted(set(x[0]forxinself.ppi._term_sources(term)))iftermsourcesandtermsources!=self.sources:returnFalsereturnTruedefvisit_relation(self,node,newroot,terms):ifnotnode.is_types_restriction():ifnodeinself.skipandself.solindices.issubset(self.skip[node]):ifnotself.schema.rschema(node.r_type).is_final():# can't really skip the relation if one variable is selected and only# referenced by this relationforvrefinnode.iget_nodes(VariableRef):stinfo=vref.variable.stinfoifstinfo['selected']andlen(stinfo['relations'])==1:breakelse:returnNone,nodeelse:returnNone,nodeifnotself._relation_supported(node):raiseUnsupportedBranch()# don't copy type restriction unless this is the only relation for the# rhs variable, else they'll be reinserted later as needed (else we may# copy a type restriction while the variable is not actually used)elifnotany(self._relation_supported(rel)forrelinnode.children[0].variable.stinfo['relations']):rel,node=self.visit_default(node,newroot,terms)returnrel,nodeelse:raiseUnsupportedBranch()rschema=self.schema.rschema(node.r_type)self._pending_vrefs=[]try:res=self.visit_default(node,newroot,terms)[0]except:# when a relation isn't supported, we should dereference potentially# introduced variable refsforvrefinself._pending_vrefs:vref.unregister_reference()raiseored=node.ored()ifrschema.is_final()orrschema.inlined:vrefs=node.children[1].get_nodes(VariableRef)ifnotvrefs:ifnotored:self.skip.setdefault(node,set()).update(self.solindices)else:self.mayneedvar.setdefault((node.children[0].name,rschema),[]).append((res,ored))else:assertlen(vrefs)==1vref=vrefs[0]# XXX check operator ?self.hasvar[(node.children[0].name,rschema)]=vrefifself._may_skip_attr_rel(rschema,node,vref,ored,terms,res):self.skip.setdefault(node,set()).update(self.solindices)elifnotored:self.skip.setdefault(node,set()).update(self.solindices)returnres,nodedef_may_skip_attr_rel(self,rschema,rel,vref,ored,terms,res):var=vref.variableifored:returnFalseifvar.nameinself.extneedselorvar.stinfo['selected']:returnFalseifnotvarintermsorused_in_outer_scope(var,self.current_scope):returnFalseifany(vforv,_invar.stinfo['attrvars']ifnotvinterms):returnFalsereturnTruedefvisit_exists(self,node,newroot,terms):newexists=node.__class__()self.scopes={node:newexists}subparts,node=self._visit_children(node,newroot,terms)ifnotsubparts:returnNone,nodenewexists.set_where(subparts[0])returnnewexists,nodedefvisit_not(self,node,newroot,terms):subparts,node=self._visit_children(node,newroot,terms)ifnotsubparts:returnNone,nodereturncopy_node(newroot,node,subparts),nodedefvisit_group(self,node,newroot,terms):ifnotself.final:returnNone,nodereturnself.visit_default(node,newroot,terms)defvisit_variableref(self,node,newroot,terms):ifself.use_only_defined:ifnotnode.variable.nameinnewroot.defined_vars:raiseUnsupportedBranch(node.name)elifnotnode.variableinterms:raiseUnsupportedBranch(node.name)self.mayneedsel.add(node.name)# set scope so we can insert types restriction properlynewvar=newroot.get_variable(node.name)newvar.stinfo['scope']=self.scopes.get(node.variable.scope,newroot)vref=VariableRef(newvar)self._pending_vrefs.append(vref)returnvref,nodedefvisit_constant(self,node,newroot,terms):returncopy_node(newroot,node),nodedefvisit_default(self,node,newroot,terms):subparts,node=self._visit_children(node,newroot,terms)returncopy_node(newroot,node,subparts),nodevisit_comparison=visit_mathexpression=visit_constant=visit_function=visit_defaultvisit_sort=visit_sortterm=visit_defaultdef_visit_children(self,node,newroot,terms):subparts=[]foriinxrange(len(node.children)):child=node.children[i]newchild,child_=child.accept(self,newroot,terms)ifnotchildischild_:node=child_.parentifnewchildisnotNone:subparts.append(newchild)returnsubparts,nodedefprocess_selection(self,newroot,terms,rqlst):ifself.final:forterminrqlst.selection:newroot.append_selected(term.copy(newroot))forvrefinterm.get_nodes(VariableRef):self.needsel.add(vref.name)returnforterminrqlst.selection:vrefs=term.get_nodes(VariableRef)ifvrefs:supportedvars=[]forvrefinvrefs:var=vref.variableifvarinterms:supportedvars.append(vref)continueelse:self.needsel.add(vref.name)breakelse:forvrefinvrefs:newroot.append_selected(vref.copy(newroot))supportedvars=[]forvrefinsupportedvars:ifnotvrefinnewroot.get_selected_variables():newroot.append_selected(VariableRef(newroot.get_variable(vref.name)))defadd_necessary_selection(self,newroot,terms):selected=tuple(newroot.get_selected_variables())forvarnameinterms:var=newroot.defined_vars[varname]forvrefinvar.references():rel=vref.relation()ifrelisNoneandvrefinselected:# already selectedbreakelse:selvref=VariableRef(var)newroot.append_selected(selvref)ifnewroot.groupby:newroot.add_group_var(VariableRef(selvref.variable,noautoref=1))