--- a/server/msplanner.py Thu Apr 02 18:52:08 2009 +0200
+++ b/server/msplanner.py Fri Apr 03 19:04:00 2009 +0200
@@ -1,32 +1,54 @@
"""plan execution of rql queries on multiple sources
-the best way to understand what are we trying to acheive here is to read
-the unit-tests in unittest_querier_planner.py
+the best way to understand what we are trying to achieve here is to read the
+unit-tests in unittest_msplanner.py
+What you need to know
+~~~~~~~~~~~~~~~~~~~~~
+1. The system source is expected to support every entity and relation type
-Split and execution specifications
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+2. Given "X relation Y":
+
+ * if relation, X and Y types are supported by the external source, we suppose
+ by default that X and Y should both come from the same source as the
+ relation. You can specify otherwise by adding relation into the
+ "cross_relations" set in the source's mapping file and in that case, we'll
+ consider that we can also find in the system source some relation between
+ X and Y coming from different sources.
+
+ * if "relation" isn't supported by the external source but X or Y
+ types (or both) are, we suppose by default that we can find in the system
+ source some relation where X and/or Y come from the external source. You
+ can specify otherwise by adding relation into the "dont_cross_relations"
+ set in the source's mapping file and in that case, we'll consider that we
+ can only find in the system source some relation between X and Y coming
+ from the system source.
+
+
+Implementation
+~~~~~~~~~~~~~~
+XXX explain algorithm
+
+
+Examples of multi-sources query execution
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
For a system source and a ldap user source (only EUser and its attributes
is supported, no group or such):
-
:EUser X:
1. fetch EUser X from both sources and return concatenation of results
-
:EUser X WHERE X in_group G, G name 'users':
* catch 1
- 1. fetch EUser X from both sources, store concatenation of results
- into a temporary table
- 2. return the result of TMP X WHERE X in_group G, G name 'users' from
- the system source
-
+ 1. fetch EUser X from both sources, store concatenation of results into a
+ temporary table
+ 2. return the result of TMP X WHERE X in_group G, G name 'users' from the
+ system source
* catch 2
- 1. return the result of EUser X WHERE X in_group G, G name 'users'
- from system source, that's enough (optimization of the sql querier
- will avoid join on EUser, so we will directly get local eids)
-
+ 1. return the result of EUser X WHERE X in_group G, G name 'users' from system
+ source, that's enough (optimization of the sql querier will avoid join on
+ EUser, so we will directly get local eids)
:EUser X,L WHERE X in_group G, X login L, G name 'users':
1. fetch Any X,L WHERE X is EUser, X login L from both sources, store
@@ -37,15 +59,14 @@
:Any X WHERE X owned_by Y:
* catch 1
- 1. fetch EUser X from both sources, store concatenation of results
- into a temporary table
- 2. return the result of Any X WHERE X owned_by Y, Y is TMP from
- the system source
-
+ 1. fetch EUser X from both sources, store concatenation of results into a
+ temporary table
+ 2. return the result of Any X WHERE X owned_by Y, Y is TMP from the system
+ source
* catch 2
- 1. return the result of Any X WHERE X owned_by Y
- from system source, that's enough (optimization of the sql querier
- will avoid join on EUser, so we will directly get local eids)
+ 1. return the result of Any X WHERE X owned_by Y from system source, that's
+ enough (optimization of the sql querier will avoid join on EUser, so we
+ will directly get local eids)
:organization: Logilab
@@ -66,7 +87,8 @@
from cubicweb import server
from cubicweb.common.utils import make_uid
from cubicweb.server.utils import cleanup_solutions
-from cubicweb.server.ssplanner import SSPlanner, OneFetchStep, add_types_restriction
+from cubicweb.server.ssplanner import (SSPlanner, OneFetchStep,
+ add_types_restriction)
from cubicweb.server.mssteps import *
from cubicweb.server.sources import AbstractSource
@@ -77,14 +99,6 @@
AbstractSource.dont_cross_relations = ()
AbstractSource.cross_relations = ()
-
-def allequals(solutions):
- """return true if all solutions are identical"""
- sol = solutions.next()
- for sol_ in solutions:
- if sol_ != sol:
- return False
- return True
def need_aggr_step(select, sources, stepdefs=None):
"""return True if a temporary table is necessary to store some partial
@@ -114,24 +128,6 @@
fstepsolindices.update(stepdef[2])
return False
-def copy_node(newroot, node, subparts=()):
- newnode = node.__class__(*node.initargs(newroot))
- for part in subparts:
- newnode.append(part)
- return newnode
-
-def same_scope(var):
- """return true if the variable is always used in the same scope"""
- try:
- return var.stinfo['samescope']
- except KeyError:
- for rel in var.stinfo['relations']:
- if not rel.scope is var.scope:
- var.stinfo['samescope'] = False
- return False
- var.stinfo['samescope'] = True
- return True
-
def select_group_sort(select): # XXX something similar done in rql2sql
# add variables used in groups and sort terms to the selection
# if necessary
@@ -150,7 +146,17 @@
if select.groupby and not vref in select.groupby:
select.add_group_var(vref.copy(select))
-# XXX move to rql
+def allequals(solutions):
+ """return true if all solutions are identical"""
+ sol = solutions.next()
+ noconstsol = None
+ for sol_ in solutions:
+ if sol_ != sol:
+ return False
+ return True
+
+# XXX move functions below to rql ##############################################
+
def is_ancestor(n1, n2):
p = n1.parent
while p is not None:
@@ -159,49 +165,102 @@
p = p.parent
return False
+def copy_node(newroot, node, subparts=()):
+ newnode = node.__class__(*node.initargs(newroot))
+ for part in subparts:
+ newnode.append(part)
+ return newnode
+
+def same_scope(var):
+ """return true if the variable is always used in the same scope"""
+ try:
+ return var.stinfo['samescope']
+ except KeyError:
+ for rel in var.stinfo['relations']:
+ if not rel.scope is var.scope:
+ var.stinfo['samescope'] = False
+ return False
+ var.stinfo['samescope'] = True
+ return True
+
+################################################################################
+
class PartPlanInformation(object):
"""regroups necessary information to execute some part of a "global" rql
query ("global" means as received by the querier, which may result in
- several internal queries, e.g. parts, due to security insertions)
+ several internal queries, e.g. parts, due to security insertions). Actually
+ a PPI is created for each subquery and for each query in a union.
- it exposes as well some methods helping in executing this part on a
+ It exposes as well some methods helping in executing this part on a
multi-sources repository, modifying its internal structure during the
- process
+ process.
- :attr solutions: a list of mappings (varname -> vartype)
- :attr sourcesvars:
- a dictionnary telling for each source which variable/solution are
- supported, of the form {source : {varname: [solution index, ]}}
+ :attr plan:
+ the execution plan
+ :attr rqlst:
+ the original rql syntax tree handled by this part
+
+ :attr needsplit:
+ bool telling if the query has to be split into multiple steps for
+ execution or if it can be executed at once
+
+ :attr temptable:
+ a SQL temporary table name or None, if necessary to handle aggregate /
+ sorting for this part of the query
+
+ :attr finaltable:
+ a SQL table name or None, if results for this part of the query should be
+ written into a temporary table (usually shared by multiple PPI)
+
+ :attr sourcesterms:
+ a dictionary {source : {term: set([solution index, ])}} telling for each
+ source which terms are supported for which solutions. A "term" may be
+ either a rql Variable, Constant or Relation node.
"""
def __init__(self, plan, rqlst, rqlhelper=None):
+ self.plan = plan
+ self.rqlst = rqlst
self.needsplit = False
self.temptable = None
self.finaltable = None
- self.plan = plan
- self.rqlst = rqlst
+ self._schema = plan.schema
self._session = plan.session
+ self._repo = self._session.repo
self._solutions = rqlst.solutions
self._solindices = range(len(self._solutions))
- # source : {var: [solution index, ]}
- self.sourcesvars = self._sourcesvars = {}
+ # source : {term: [solution index, ]}
+ self.sourcesterms = self._sourcesterms = {}
# source : {relation: set(child variable and constant)}
self._crossrelations = {}
- # dictionnary of variables which are linked to each other using a non
- # final relation which is supported by multiple sources
- self._linkedvars = {}
- self._crosslinkedvars = {}
+ # dictionary of variables and constants which are linked to each other
+ # using a non final relation supported by multiple sources (crossed or
+ # not).
+ self._linkedterms = {}
# processing
- self._compute_sourcesvars()
- self._remove_invalid_sources()
+ termssources = self._compute_sourcesterms()
+ self._remove_invalid_sources(termssources)
self._compute_needsplit()
- self.sourcesvars = {}
- for k, v in self._sourcesvars.iteritems():
- self.sourcesvars[k] = {}
+ # after initialisation, .sourcesterms contains the same thing as
+ # ._sourcesterms though during plan construction, ._sourcesterms will
+ # be modified while .sourcesterms will be kept unmodified
+ self.sourcesterms = {}
+ for k, v in self._sourcesterms.iteritems():
+ self.sourcesterms[k] = {}
for k2, v2 in v.iteritems():
- self.sourcesvars[k][k2] = v2.copy()
+ self.sourcesterms[k][k2] = v2.copy()
+ # cleanup linked var
+ for var, linkedrelsinfo in self._linkedterms.iteritems():
+ self._linkedterms[var] = frozenset(x[0] for x in linkedrelsinfo)
+ # map output of a step to input of a following step
self._inputmaps = {}
+ # record input map conflicts to resolve them on final step generation
+ self._conflicts = []
if rqlhelper is not None: # else test
self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional
+ if server.DEBUG:
+ print 'sourcesterms:'
+ for source, terms in self.sourcesterms.items():
+ print source, terms
def copy_solutions(self, solindices):
return [self._solutions[solidx].copy() for solidx in solindices]
@@ -209,14 +268,14 @@
@property
@cached
def part_sources(self):
- if self._sourcesvars:
- return tuple(sorted(self._sourcesvars))
- return (self._session.repo.system_source,)
+ if self._sourcesterms:
+ return tuple(sorted(self._sourcesterms))
+ return (self._repo.system_source,)
@property
@cached
def _sys_source_set(self):
- return frozenset((self._session.repo.system_source, solindex)
+ return frozenset((self._repo.system_source, solindex)
for solindex in self._solindices)
@cached
@@ -224,20 +283,18 @@
"""return a set of (source, solindex) where source doesn't support the
relation
"""
- return frozenset((source, solidx) for source in self._session.repo.sources
+ return frozenset((source, solidx) for source in self._repo.sources
for solidx in self._solindices
- if not ((source.support_relation(relation.r_type) and
- not self.crossed_relation(source, relation))
+ if not ((source.support_relation(relation.r_type))
or relation.r_type in source.dont_cross_relations))
-
- def _compute_sourcesvars(self):
- """compute for each variable/solution in the rqlst which sources support
- them
+ def _compute_sourcesterms(self):
+ """compute for each term (variable, rewritten constant, relation) and
+ for each solution in the rqlst which sources support them
"""
- repo = self._session.repo
- eschema = repo.schema.eschema
- sourcesvars = self._sourcesvars
+ repo = self._repo
+ eschema = self._schema.eschema
+ sourcesterms = self._sourcesterms
# find for each source which variable/solution are supported
for varname, varobj in self.rqlst.defined_vars.items():
# if variable has an eid specified, we can get its source directly
@@ -245,22 +302,22 @@
if varobj.stinfo['uidrels']:
vrels = varobj.stinfo['relations'] - varobj.stinfo['uidrels']
for rel in varobj.stinfo['uidrels']:
- if rel.neged(strict=True) or rel.operator() != '=':
+ if rel.neged(strict=True) or rel.operator() != '=':
continue
for const in rel.children[1].get_nodes(Constant):
eid = const.eval(self.plan.args)
source = self._session.source_from_eid(eid)
if vrels and not any(source.support_relation(r.r_type)
for r in vrels):
- self._set_source_for_var(repo.system_source, varobj)
+ self._set_source_for_term(repo.system_source, varobj)
else:
- self._set_source_for_var(source, varobj)
+ self._set_source_for_term(source, varobj)
continue
rels = varobj.stinfo['relations']
if not rels and not varobj.stinfo['typerels']:
# (rare) case where the variable has no type specified nor
# relation accessed ex. "Any MAX(X)"
- self._set_source_for_var(repo.system_source, varobj)
+ self._set_source_for_term(repo.system_source, varobj)
continue
for i, sol in enumerate(self._solutions):
vartype = sol[varname]
@@ -276,64 +333,28 @@
if not varobj._q_invariant or \
any(imap(source.support_relation,
(r.r_type for r in rels if r.r_type != 'eid'))):
- sourcesvars.setdefault(source, {}).setdefault(varobj, set()).add(i)
+ sourcesterms.setdefault(source, {}).setdefault(varobj, set()).add(i)
# if variable is not invariant and is used by a relation
# not supported by this source, we'll have to split the
# query
if not varobj._q_invariant and any(ifilterfalse(
source.support_relation, (r.r_type for r in rels))):
self.needsplit = True
-
- def _handle_cross_relation(self, rel, relsources, vsources):
- crossvars = None
- for source in relsources:
- if rel.r_type in source.cross_relations:
- crossvars = set(x.variable for x in rel.get_nodes(VariableRef))
- crossvars.update(frozenset(x for x in rel.get_nodes(Constant)))
- assert len(crossvars) == 2
- ssource = self._session.repo.system_source
- needsplit = True
- flag = 0
- for v in crossvars:
- if isinstance(v, Constant):
- allsols = set(self._solindices)
- try:
- self._sourcesvars[ssource][v] = allsols
- except KeyError:
- self._sourcesvars[ssource] = {v: allsols}
- if len(vsources[v]) == 1:
- if iter(vsources[v]).next()[0].uri == 'system':
- flag = 1
- for ov in crossvars:
- if ov is not v and (isinstance(ov, Constant) or ov._q_invariant):
- ssset = frozenset((ssource,))
- self._remove_sources(ov, vsources[ov] - ssset)
- else:
- for ov in crossvars:
- if ov is not v and (isinstance(ov, Constant) or ov._q_invariant):
- needsplit = False
- break
- else:
- continue
- if not rel.neged(strict=True):
- break
- else:
- self._crossrelations.setdefault(source, {})[rel] = crossvars
- if not flag:
- self._sourcesvars.setdefault(source, {})[rel] = set(self._solindices)
- self._sourcesvars.setdefault(ssource, {})[rel] = set(self._solindices)
- if needsplit:
- self.needsplit = True
- return crossvars is None
-
- def _remove_invalid_sources(self):
- """removes invalid sources from `sourcesvars` member according to
- traversed relations and their properties (which sources support them,
- can they cross sources, etc...)
- """
- repo = self._session.repo
- rschema = repo.schema.rschema
- vsources = {}
+ # add source for rewritten constants to sourcesterms
+ for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
+ const = vconsts[0]
+ source = self._session.source_from_eid(const.eval(self.plan.args))
+ if source is self._repo.system_source:
+ for const in vconsts:
+ self._set_source_for_term(source, const)
+ elif source in self._sourcesterms:
+ source_scopes = frozenset(t.scope for t in self._sourcesterms[source])
+ for const in vconsts:
+ if const.scope in source_scopes:
+ self._set_source_for_term(source, const)
+ # add source for relations
+ rschema = self._schema.rschema
+ termssources = {}
for rel in self.rqlst.iget_nodes(Relation):
# process non final relations only
# note: don't try to get schema for 'is' relation (not available
@@ -346,7 +367,6 @@
# XXX code below don't deal if some source allow relation
# crossing but not another one
relsources = repo.rel_type_sources(rel.r_type)
- crossvars = None
if len(relsources) < 2:
# filter out sources being there because they have this
# relation in their dont_cross_relations attribute
@@ -355,25 +375,78 @@
if relsources:
# this means the relation is using a variable inlined as
# a constant and another unsupported variable, in which
- # case we put the relation in sourcesvars
- self._sourcesvars.setdefault(relsources[0], {})[rel] = set(self._solindices)
+ # case we put the relation in sourcesterms
+ self._sourcesterms.setdefault(relsources[0], {})[rel] = set(self._solindices)
continue
lhs, rhs = rel.get_variable_parts()
lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs)
- # update dictionnary of sources supporting lhs and rhs vars
- if not lhsv in vsources:
- vsources[lhsv] = self._term_sources(lhs)
- if not rhsv in vsources:
- vsources[rhsv] = self._term_sources(rhs)
- if self._handle_cross_relation(rel, relsources, vsources):
- self._linkedvars.setdefault(lhsv, set()).add((rhsv, rel))
- self._linkedvars.setdefault(rhsv, set()).add((lhsv, rel))
- else:
- self._crosslinkedvars.setdefault(lhsv, set()).add((rhsv, rel))
- self._crosslinkedvars.setdefault(rhsv, set()).add((lhsv, rel))
- for term in self._linkedvars:
- self._remove_sources_until_stable(term, vsources)
- if len(self._sourcesvars) > 1 and hasattr(self.plan.rqlst, 'main_relations'):
+ # update dictionary of sources supporting lhs and rhs vars
+ if not lhsv in termssources:
+ termssources[lhsv] = self._term_sources(lhs)
+ if not rhsv in termssources:
+ termssources[rhsv] = self._term_sources(rhs)
+ self._handle_cross_relation(rel, relsources, termssources)
+ self._linkedterms.setdefault(lhsv, set()).add((rhsv, rel))
+ self._linkedterms.setdefault(rhsv, set()).add((lhsv, rel))
+ return termssources
+
+ def _handle_cross_relation(self, rel, relsources, termssources):
+ cross_rel = False
+ for source in relsources:
+ if rel.r_type in source.cross_relations:
+ ssource = self._repo.system_source
+ crossvars = set(x.variable for x in rel.get_nodes(VariableRef))
+ for const in rel.get_nodes(Constant):
+ if source.uri != 'system' and not const in self._sourcesterms.get(source, ()):
+ continue
+ crossvars.add(const)
+ # XXX this is counter intuitive, though this is currently a
+ # trick to add const to system source terms so we get a
+ # chance that solutions will compare to equals when
+ # computing need split
+ allsols = set(self._solindices)
+ try:
+ self._sourcesterms[ssource][const] = allsols
+ except KeyError:
+ self._sourcesterms[ssource] = {const: allsols}
+ self._crossrelations.setdefault(source, {})[rel] = crossvars
+ if len(crossvars) < 2:
+ # this means there is a constant in the relation which is
+ # not supported by the source, so we can stop here
+ continue
+ self._sourcesterms.setdefault(ssource, {})[rel] = set(self._solindices)
+ cross_rel = True
+ needsplit = True
+ flag = False
+ for term in crossvars:
+ if len(termssources[term]) == 1:
+ if iter(termssources[term]).next()[0].uri == 'system':
+ flag = True
+ for ov in crossvars:
+ if ov is not term and (isinstance(ov, Constant) or ov._q_invariant):
+ ssset = frozenset((ssource,))
+ self._remove_sources(ov, termssources[ov] - ssset)
+ else:
+ for ov in crossvars:
+ if ov is not term and (isinstance(ov, Constant) or ov._q_invariant):
+ needsplit = False
+ break
+ else:
+ continue
+ if not flag:
+ self._sourcesterms.setdefault(source, {})[rel] = set(self._solindices)
+ if needsplit:
+ self.needsplit = True
+ return cross_rel
+
+ def _remove_invalid_sources(self, termssources):
+ """removes invalid sources from `sourcesterms` member according to
+ traversed relations and their properties (which sources support them,
+ can they cross sources, etc...)
+ """
+ for term in self._linkedterms:
+ self._remove_sources_until_stable(term, termssources)
+ if len(self._sourcesterms) > 1 and hasattr(self.plan.rqlst, 'main_relations'):
# the querier doesn't annotate write queries, need to do it here
self.plan.annotate_rqlst()
# insert/update/delete queries, we may get extra information from
@@ -383,6 +456,8 @@
for etype, vref in self.plan.rqlst.main_variables)
else:
inserted = {}
+ repo = self._repo
+ rschema = self._schema.rschema
for rel in self.plan.rqlst.main_relations:
if not rschema(rel.r_type).is_final():
# nothing to do if relation is not supported by multiple sources
@@ -390,96 +465,87 @@
continue
lhs, rhs = rel.get_variable_parts()
try:
- lhsv = self._extern_term(lhs, vsources, inserted)
- rhsv = self._extern_term(rhs, vsources, inserted)
+ lhsv = self._extern_term(lhs, termssources, inserted)
+ rhsv = self._extern_term(rhs, termssources, inserted)
except KeyError, ex:
continue
- norelsup = self._norel_support_set(rel)
- self._remove_var_sources(lhsv, norelsup, rhsv, vsources)
- self._remove_var_sources(rhsv, norelsup, lhsv, vsources)
- # cleanup linked var
- for var, linkedrelsinfo in self._linkedvars.iteritems():
- self._linkedvars[var] = frozenset(x[0] for x in linkedrelsinfo)
- # if there are other sources than the system source, consider simplified
- # variables'source
- if self._sourcesvars and self._sourcesvars.keys() != [self._session.repo.system_source]:
- # add source for rewritten constants to sourcesvars
- for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
- const = vconsts[0]
- eid = const.eval(self.plan.args)
- source = self._session.source_from_eid(eid)
- if source is self._session.repo.system_source:
- for const in vconsts:
- self._set_source_for_var(source, const)
- elif source in self._sourcesvars:
- source_scopes = frozenset(v.scope for v in self._sourcesvars[source])
- for const in vconsts:
- if const.scope in source_scopes:
- self._set_source_for_var(source, const)
-
- def _extern_term(self, term, vsources, inserted):
+ self._remove_term_sources(lhsv, rel, rhsv, termssources)
+ self._remove_term_sources(rhsv, rel, lhsv, termssources)
+
+ def _extern_term(self, term, termssources, inserted):
var = term.variable
if var.stinfo['constnode']:
termv = var.stinfo['constnode']
- vsources[termv] = self._term_sources(termv)
+ termssources[termv] = self._term_sources(termv)
elif var in inserted:
termv = var
- source = self._session.repo.locate_etype_source(inserted[var])
- vsources[termv] = set((source, solindex) for solindex in self._solindices)
+ source = self._repo.locate_etype_source(inserted[var])
+ termssources[termv] = set((source, solindex)
+ for solindex in self._solindices)
else:
termv = self.rqlst.defined_vars[var.name]
- if not termv in vsources:
- vsources[termv] = self._term_sources(termv)
+ if not termv in termssources:
+ termssources[termv] = self._term_sources(termv)
return termv
- def _remove_sources_until_stable(self, var, vsources):
- sourcesvars = self._sourcesvars
- for ovar, rel in self._linkedvars.get(var, ()):
- if not var.scope is ovar.scope and rel.scope.neged(strict=True):
+ def _remove_sources_until_stable(self, term, termssources):
+ sourcesterms = self._sourcesterms
+ for oterm, rel in self._linkedterms.get(term, ()):
+ if not term.scope is oterm.scope and rel.scope.neged(strict=True):
# can't get information from relation inside a NOT exists
- # where variables don't belong to the same scope
+ # where terms don't belong to the same scope
continue
need_ancestor_scope = False
- if not (var.scope is rel.scope and ovar.scope is rel.scope):
+ if not (term.scope is rel.scope and oterm.scope is rel.scope):
if rel.ored():
continue
if rel.ored(traverse_scope=True):
# if relation has some OR as parent, constraints should only
# propagate from parent scope to child scope, nothing else
need_ancestor_scope = True
- relsources = self._session.repo.rel_type_sources(rel.r_type)
+ relsources = self._repo.rel_type_sources(rel.r_type)
if rel.neged(strict=True) and (
len(relsources) < 2
- or not isinstance(ovar, Variable)
- or ovar.valuable_references() != 1
- or any(sourcesvars[source][var] != sourcesvars[source][ovar]
+ or not isinstance(oterm, Variable)
+ or oterm.valuable_references() != 1
+ or any(sourcesterms[source][term] != sourcesterms[source][oterm]
for source in relsources
- if var in sourcesvars.get(source, ())
- and ovar in sourcesvars.get(source, ()))):
- # neged relation doesn't allow to infer variable sources unless we're
- # on a multisource relation for a variable only used by this relation
- # (eg "Any X WHERE NOT X multisource_rel Y" and over is Y), iif
+ if term in sourcesterms.get(source, ())
+ and oterm in sourcesterms.get(source, ()))):
+ # neged relation doesn't allow to infer term sources unless
+ # we're on a multisource relation for a term only used by this
+ # relation (eg "Any X WHERE NOT X multisource_rel Y" and oterm is
+ # Y)
continue
- norelsup = self._norel_support_set(rel)
- # compute invalid sources for variables and remove them
- if not need_ancestor_scope or is_ancestor(var.scope, ovar.scope):
- self._remove_var_sources(var, norelsup, ovar, vsources)
- if not need_ancestor_scope or is_ancestor(ovar.scope, var.scope):
- self._remove_var_sources(ovar, norelsup, var, vsources)
+ # compute invalid sources for terms and remove them
+ if not need_ancestor_scope or is_ancestor(term.scope, oterm.scope):
+ self._remove_term_sources(term, rel, oterm, termssources)
+ if not need_ancestor_scope or is_ancestor(oterm.scope, term.scope):
+ self._remove_term_sources(oterm, rel, term, termssources)
- def _remove_var_sources(self, var, norelsup, ovar, vsources):
- """remove invalid sources for var according to ovar's sources and the
- relation between those two variables.
+ def _remove_term_sources(self, term, rel, oterm, termssources):
+ """remove invalid sources for term according to oterm's sources and the
+ relation between those two terms.
"""
- varsources = vsources[var]
- invalid_sources = varsources - (vsources[ovar] | norelsup)
+ norelsup = self._norel_support_set(rel)
+ termsources = termssources[term]
+ invalid_sources = termsources - (termssources[oterm] | norelsup)
+ if invalid_sources and self._repo.can_cross_relation(rel.r_type):
+ invalid_sources -= self._sys_source_set
+ if invalid_sources and isinstance(term, Variable) and self._need_ext_source_access(term, rel):
+ # if the term is a not invariant variable, we should filter out
+ # source where the relation is a cross relation from invalid
+ # sources
+ invalid_sources = frozenset([(s, solidx) for s, solidx in invalid_sources
+ if not (s in self._crossrelations and
+ rel in self._crossrelations[s])])
if invalid_sources:
- self._remove_sources(var, invalid_sources)
- varsources -= invalid_sources
- self._remove_sources_until_stable(var, vsources)
+ self._remove_sources(term, invalid_sources)
+ termsources -= invalid_sources
+ self._remove_sources_until_stable(term, termssources)
def _compute_needsplit(self):
- """tell according to sourcesvars if the rqlst has to be splitted for
+ """tell according to sourcesterms if the rqlst has to be split for
execution among multiple sources
the execution has to be split if
@@ -491,20 +557,42 @@
be fetched from some source
"""
# NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2
- if len(self._sourcesvars) < 2:
+ if len(self._sourcesterms) < 2:
self.needsplit = False
elif not self.needsplit:
- if not allequals(self._sourcesvars.itervalues()):
+ if not allequals(self._sourcesterms.itervalues()):
self.needsplit = True
else:
- sample = self._sourcesvars.itervalues().next()
- if len(sample) > 1 and any(v for v in sample
- if not v in self._linkedvars
- and not v in self._crosslinkedvars):
- self.needsplit = True
-
- def _set_source_for_var(self, source, var):
- self._sourcesvars.setdefault(source, {})[var] = set(self._solindices)
+ sample = self._sourcesterms.itervalues().next()
+ if len(sample) > 1:
+ for term in sample:
+ # need split if unlinked variable
+ if isinstance(term, Variable) and not term in self._linkedterms:
+ self.needsplit = True
+ break
+ else:
+ # need split if there are some cross relation on non
+ # invariant variable or if the variable is used in
+ # multi-sources relation
+ if self._crossrelations:
+ for reldict in self._crossrelations.itervalues():
+ for rel, terms in reldict.iteritems():
+ for term in terms:
+ if isinstance(term, Variable) and self._need_ext_source_access(term, rel):
+ self.needsplit = True
+ return
+
+ @cached
+ def _need_ext_source_access(self, var, rel):
+ if not var._q_invariant:
+ return True
+ if any(r for x, r in self._linkedterms[var]
+ if not r is rel and self._repo.is_multi_sources_relation(r.r_type)):
+ return True
+ return False
+
+ def _set_source_for_term(self, source, term):
+ self._sourcesterms.setdefault(source, {})[term] = set(self._solindices)
def _term_sources(self, term):
"""returns possible sources for terms `term`"""
@@ -513,27 +601,27 @@
return set((source, solindex) for solindex in self._solindices)
else:
var = getattr(term, 'variable', term)
- sources = [source for source, varobjs in self.sourcesvars.iteritems()
+ sources = [source for source, varobjs in self.sourcesterms.iteritems()
if var in varobjs]
return set((source, solindex) for source in sources
- for solindex in self.sourcesvars[source][var])
+ for solindex in self.sourcesterms[source][var])
- def _remove_sources(self, var, sources):
- """removes invalid sources (`sources`) from `sourcesvars`
+ def _remove_sources(self, term, sources):
+ """removes invalid sources (`sources`) from `sourcesterms`
:param sources: the list of sources to remove
- :param var: the analyzed variable
+ :param term: the analyzed term
"""
- sourcesvars = self._sourcesvars
+ sourcesterms = self._sourcesterms
for source, solindex in sources:
try:
- sourcesvars[source][var].remove(solindex)
+ sourcesterms[source][term].remove(solindex)
except KeyError:
return # may occur with subquery column alias
- if not sourcesvars[source][var]:
- del sourcesvars[source][var]
- if not sourcesvars[source]:
- del sourcesvars[source]
+ if not sourcesterms[source][term]:
+ del sourcesterms[source][term]
+ if not sourcesterms[source]:
+ del sourcesterms[source]
def crossed_relation(self, source, relation):
return relation in self._crossrelations.get(source, ())
@@ -545,41 +633,49 @@
"""
steps = []
select = self.rqlst
- rschema = self.plan.schema.rschema
+ rschema = self._schema.rschema
for source in self.part_sources:
- sourcevars = self._sourcesvars[source]
- while sourcevars:
- # take a variable randomly, and all variables supporting the
+ sourceterms = self._sourcesterms[source]
+ while sourceterms:
+ # take a term randomly, and all terms supporting the
# same solutions
- var, solindices = self._choose_var(sourcevars)
+ term, solindices = self._choose_term(sourceterms)
if source.uri == 'system':
# ensure all variables are available for the latest step
# (missing one will be available from temporary tables
# of previous steps)
scope = select
- variables = scope.defined_vars.values() + scope.aliases.values()
- sourcevars.clear()
+ terms = scope.defined_vars.values() + scope.aliases.values()
+ sourceterms.clear()
+ sources = [source]
else:
- scope = var.scope
- variables = self._expand_vars(var, source, sourcevars, scope, solindices)
- if not sourcevars:
- del self._sourcesvars[source]
- # find which sources support the same variables/solutions
- sources = self._expand_sources(source, variables, solindices)
+ scope = term.scope
+ # find which sources support the same term and solutions
+ sources = self._expand_sources(source, term, solindices)
+ # now try to get as many terms as possible
+ terms = self._expand_terms(term, sources, sourceterms,
+ scope, solindices)
+ if len(terms) == 1 and isinstance(terms[0], Constant):
+ # we can't generate anything interesting with a single
+ # constant term (will generate an empty "Any" query),
+ # go to the next iteration directly!
+ continue
+ if not sourceterms:
+ del self._sourcesterms[source]
# suppose this is a final step until the contrary is proven
final = scope is select
- # set of variables which should be additionaly selected when
+ # set of terms which should be additionaly selected when
# possible
needsel = set()
# add attribute variables and mark variables which should be
# additionaly selected when possible
for var in select.defined_vars.itervalues():
- if not var in variables:
+ if not var in terms:
stinfo = var.stinfo
for ovar, rtype in stinfo['attrvars']:
- if ovar in variables:
+ if ovar in terms:
needsel.add(var.name)
- variables.append(var)
+ terms.append(var)
break
else:
needsel.add(var.name)
@@ -591,15 +687,14 @@
eid = const.eval(self.plan.args)
_source = self._session.source_from_eid(eid)
if len(sources) > 1 or not _source in sources:
- # if there is some rewriten constant used by a
- # not neged relation while there are some source
- # not supporting the associated entity, this step
- # can't be final (unless the relation is explicitly
- # in `variables`, eg cross relations)
+ # if some rewritten constant is used by a non-negated
+ # relation while some of the sources don't support
+ # the associated entity, this step can't be final
+ # (unless the relation is explicitly in `terms`,
+ # e.g. cross relations)
for c in vconsts:
rel = c.relation()
- if rel is None or not (rel in variables or rel.neged(strict=True)):
- #if rel is not None and rel.r_type == 'identity' and not rel.neged(strict=True):
+ if rel is None or not (rel in terms or rel.neged(strict=True)):
final = False
break
break
@@ -614,15 +709,15 @@
needsel.add(vref.name)
final = False
break
- elif self.crossed_relation(_source, rel) and not rel in variables:
+ elif self.crossed_relation(_source, rel) and not rel in terms:
final = False
break
else:
if not scope is select:
- self._exists_relation(rel, variables, needsel)
+ self._exists_relation(rel, terms, needsel)
# if relation is supported by all sources and some of
- # its lhs/rhs variable isn't in "variables", and the
- # other end *is* in "variables", mark it have to be
+ # its lhs/rhs variable isn't in "terms", and the
+ # other end *is* in "terms", mark that end as having to be
# selected
if source.uri != 'system' and not rschema(rel.r_type).is_final():
lhs, rhs = rel.get_variable_parts()
@@ -634,19 +729,18 @@
rhsvar = rhs.variable
except AttributeError:
rhsvar = rhs
- if lhsvar in variables and not rhsvar in variables:
+ if lhsvar in terms and not rhsvar in terms:
needsel.add(lhsvar.name)
- elif rhsvar in variables and not lhsvar in variables:
+ elif rhsvar in terms and not lhsvar in terms:
needsel.add(rhsvar.name)
if final:
- self._cleanup_sourcesvars(sources, solindices)
- # XXX rename: variables may contain Relation and Constant nodes...
- steps.append( (sources, variables, solindices, scope, needsel,
- final) )
+ self._cleanup_sourcesterms(sources, solindices)
+ steps.append((sources, terms, solindices, scope, needsel, final)
+ )
return steps
- def _exists_relation(self, rel, variables, needsel):
- rschema = self.plan.schema.rschema(rel.r_type)
+ def _exists_relation(self, rel, terms, needsel):
+ rschema = self._schema.rschema(rel.r_type)
lhs, rhs = rel.get_variable_parts()
try:
lhsvar, rhsvar = lhs.variable, rhs.variable
@@ -658,135 +752,145 @@
# variable is refed by an outer scope and should be substituted
# using an 'identity' relation (else we'll get a conflict of
# temporary tables)
- if rhsvar in variables and not lhsvar in variables:
- self._identity_substitute(rel, lhsvar, variables, needsel)
- elif lhsvar in variables and not rhsvar in variables:
- self._identity_substitute(rel, rhsvar, variables, needsel)
+ if rhsvar in terms and not lhsvar in terms:
+ self._identity_substitute(rel, lhsvar, terms, needsel)
+ elif lhsvar in terms and not rhsvar in terms:
+ self._identity_substitute(rel, rhsvar, terms, needsel)
- def _identity_substitute(self, relation, var, variables, needsel):
+ def _identity_substitute(self, relation, var, terms, needsel):
newvar = self._insert_identity_variable(relation.scope, var)
if newvar is not None:
# ensure relation is using '=' operator, else we rely on a
# sqlgenerator side effect (it won't insert an inequality operator
# in this case)
relation.children[1].operator = '='
- variables.append(newvar)
+ terms.append(newvar)
needsel.add(newvar.name)
- def _choose_var(self, sourcevars):
+ def _choose_term(self, sourceterms):
+ """pick one term among terms supported by a source, which will be used
+ as a base to generate an execution step
+ """
secondchoice = None
- if len(self._sourcesvars) > 1:
+ if len(self._sourcesterms) > 1:
# priority to variable from subscopes
- for var in sourcevars:
+ for var in sourceterms:
if not var.scope is self.rqlst:
if isinstance(var, Variable):
- return var, sourcevars.pop(var)
+ return var, sourceterms.pop(var)
secondchoice = var
else:
# priority to variable outer scope
- for var in sourcevars:
+ for var in sourceterms:
if var.scope is self.rqlst:
if isinstance(var, Variable):
- return var, sourcevars.pop(var)
+ return var, sourceterms.pop(var)
secondchoice = var
if secondchoice is not None:
- return secondchoice, sourcevars.pop(secondchoice)
+ return secondchoice, sourceterms.pop(secondchoice)
# priority to variable
- for var in sourcevars:
+ for var in sourceterms:
if isinstance(var, Variable):
- return var, sourcevars.pop(var)
+ return var, sourceterms.pop(var)
# whatever
- var = iter(sourcevars).next()
- return var, sourcevars.pop(var)
-
+ var = iter(sourceterms).next()
+ return var, sourceterms.pop(var)
- def _expand_vars(self, var, source, sourcevars, scope, solindices):
- variables = [var]
+ def _expand_sources(self, selected_source, term, solindices):
+ """return all sources supporting given term / solindices"""
+ sources = [selected_source]
+ sourcesterms = self._sourcesterms
+ for source in sourcesterms:
+ if source is selected_source:
+ continue
+ if not (term in sourcesterms[source] and
+ solindices.issubset(sourcesterms[source][term])):
+ continue
+ sources.append(source)
+ if source.uri != 'system':
+ termsolindices = sourcesterms[source][term]
+ termsolindices -= solindices
+ if not termsolindices:
+ del sourcesterms[source][term]
+ return sources
+
+ def _expand_terms(self, term, sources, sourceterms, scope, solindices):
+ terms = [term]
+ sources = sorted(sources)
nbunlinked = 1
- linkedvars = self._linkedvars
- # variable has to belong to the same scope if there is more
+ linkedterms = self._linkedterms
+ # term has to belong to the same scope if there is more
# than the system source remaining
- if len(self._sourcesvars) > 1 and not scope is self.rqlst:
- candidates = (v for v in sourcevars.keys() if scope is v.scope)
+ if len(self._sourcesterms) > 1 and not scope is self.rqlst:
+ candidates = (t for t in sourceterms.keys() if scope is t.scope)
else:
- candidates = sourcevars #.iterkeys()
- # we only want one unlinked variable in each generated query
- candidates = [v for v in candidates
- if isinstance(v, Constant) or
- (solindices.issubset(sourcevars[v]) and v in linkedvars)]
- accept_var = lambda x: (isinstance(x, Constant) or any(v for v in variables if v in linkedvars.get(x, ())))
- source_cross_rels = self._crossrelations.get(source, ())
- if isinstance(var, Relation) and var in source_cross_rels:
- cross_vars = source_cross_rels.pop(var)
- base_accept_var = accept_var
- accept_var = lambda x: (base_accept_var(x) or x in cross_vars)
- for refed in cross_vars:
+ candidates = sourceterms #.iterkeys()
+ # we only want one unlinked term in each generated query
+ candidates = [t for t in candidates
+ if isinstance(t, Constant) or
+ (solindices.issubset(sourceterms[t]) and t in linkedterms)]
+ accept_term = lambda x: (not any(s for s in sources if not x in self._sourcesterms[s])
+ and any(t for t in terms if t in linkedterms.get(x, ())))
+ source_cross_rels = {}
+ for source in sources:
+ source_cross_rels.update(self._crossrelations.get(source, {}))
+ if isinstance(term, Relation) and term in source_cross_rels:
+ cross_terms = source_cross_rels.pop(term)
+ base_accept_term = accept_term
+ accept_term = lambda x: (base_accept_term(x) or x in cross_terms)
+ for refed in cross_terms:
if not refed in candidates:
candidates.append(refed)
else:
- cross_vars = ()
- # repeat until no variable can't be added, since addition of a new
- # variable may permit to another one to be added
+ cross_terms = ()
+ # repeat until no more terms can be added, since the addition of a
+ # new term may allow another one to be added
modified = True
while modified and candidates:
modified = False
- for var in candidates[:]:
- if accept_var(var):
- variables.append(var)
- try:
- # constant nodes should be systematically deleted
- if isinstance(var, Constant):
- del sourcevars[var]
- else:
- # variable nodes should be deleted once all possible
- # solutions indices have been consumed
- sourcevars[var] -= solindices
- if not sourcevars[var]:
- del sourcevars[var]
- except KeyError:
- assert var in cross_vars
- candidates.remove(var)
+ for term in candidates[:]:
+ if isinstance(term, Constant):
+ relation = term.relation()
+ if sorted(set(x[0] for x in self._term_sources(term))) != sources:
+ continue
+ terms.append(term)
+ candidates.remove(term)
modified = True
- return variables
+ del sourceterms[term]
+ elif accept_term(term):
+ terms.append(term)
+ candidates.remove(term)
+ modified = True
+ for source in sources:
+ sourceterms = self._sourcesterms[source]
+ # terms should be deleted once all possible solutions
+ # indices have been consumed
+ try:
+ sourceterms[term] -= solindices
+ if not sourceterms[term]:
+ del sourceterms[term]
+ except KeyError:
+ assert term in cross_terms
+ return terms
- def _expand_sources(self, selected_source, vars, solindices):
- sources = [selected_source]
- sourcesvars = self._sourcesvars
- for source in sourcesvars:
- if source is selected_source:
- continue
- for var in vars:
- if not (var in sourcesvars[source] and
- solindices.issubset(sourcesvars[source][var])):
- break
- else:
- sources.append(source)
- if source.uri != 'system':
- for var in vars:
- varsolindices = sourcesvars[source][var]
- varsolindices -= solindices
- if not varsolindices:
- del sourcesvars[source][var]
- return sources
-
- def _cleanup_sourcesvars(self, sources, solindices):
+ def _cleanup_sourcesterms(self, sources, solindices):
"""on final parts, remove solutions so we know they are already processed"""
for source in sources:
try:
- sourcevars = self._sourcesvars[source]
+ sourceterms = self._sourcesterms[source]
except KeyError:
continue
- for var, varsolindices in sourcevars.items():
- if isinstance(var, Relation) and self.crossed_relation(source, var):
+ for term, termsolindices in sourceterms.items():
+ if isinstance(term, Relation) and self.crossed_relation(source, term):
continue
- varsolindices -= solindices
- if not varsolindices:
- del sourcevars[var]
+ termsolindices -= solindices
+ if not termsolindices:
+ del sourceterms[term]
def merge_input_maps(self, allsolindices):
- """inputmaps is a dictionary with tuple of solution indices as key with an
- associateed input map as value. This function compute for each solution
- its necessary input map and return them grouped
+ """inputmaps is a dictionary with a tuple of solution indices as key and
+ an associated input map as value. This function computes, for each
+ solution, its necessary input map and returns them grouped
ex:
inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'},
@@ -822,26 +926,38 @@
def build_final_part(self, select, solindices, inputmap, sources,
insertedvars):
- plan = self.plan
- rqlst = plan.finalize(select, [self._solutions[i] for i in solindices],
- insertedvars)
+ solutions = [self._solutions[i] for i in solindices]
+ if self._conflicts:
+ for varname, mappedto in self._conflicts:
+ var = select.defined_vars[varname]
+ newvar = select.make_variable()
+ # XXX should use var.scope but scope hasn't been computed yet
+ select.add_relation(var, 'identity', newvar)
+ for sol in solutions:
+ sol[newvar.name] = sol[varname]
+ inputmap[newvar.name] = mappedto
+ rqlst = self.plan.finalize(select, solutions, insertedvars)
if self.temptable is None and self.finaltable is None:
- return OneFetchStep(plan, rqlst, sources, inputmap=inputmap)
+ return OneFetchStep(self.plan, rqlst, sources, inputmap=inputmap)
table = self.temptable or self.finaltable
- return FetchStep(plan, rqlst, sources, table, True, inputmap)
+ return FetchStep(self.plan, rqlst, sources, table, True, inputmap)
def build_non_final_part(self, select, solindices, sources, insertedvars,
table):
"""non final step, will have to store results in a temporary table"""
- plan = self.plan
- rqlst = plan.finalize(select, [self._solutions[i] for i in solindices],
- insertedvars)
- step = FetchStep(plan, rqlst, sources, table, False)
+ solutions = [self._solutions[i] for i in solindices]
+ rqlst = self.plan.finalize(select, solutions, insertedvars)
+ step = FetchStep(self.plan, rqlst, sources, table, False)
# update input map for following steps, according to processed solutions
inputmapkey = tuple(sorted(solindices))
inputmap = self._inputmaps.setdefault(inputmapkey, {})
+ for varname, mapping in step.outputmap.iteritems():
+ if varname in inputmap and \
+ not (mapping == inputmap[varname] or
+ self._schema.eschema(solutions[0][varname]).is_final()):
+ self._conflicts.append((varname, inputmap[varname]))
inputmap.update(step.outputmap)
- plan.add_step(step)
+ self.plan.add_step(step)
class MSPlanner(SSPlanner):
@@ -972,10 +1088,10 @@
atemptable = None
selection = select.selection
ppi.temptable = atemptable
- vfilter = VariablesFiltererVisitor(self.schema, ppi)
+ vfilter = TermsFiltererVisitor(self.schema, ppi)
steps = []
- for sources, variables, solindices, scope, needsel, final in stepdefs:
- # extract an executable query using only the specified variables
+ for sources, terms, solindices, scope, needsel, final in stepdefs:
+ # extract an executable query using only the specified terms
if sources[0].uri == 'system':
# in this case we have to merge input maps before call to
# filter so already processed restriction are correctly
@@ -983,7 +1099,7 @@
solsinputmaps = ppi.merge_input_maps(solindices)
for solindices, inputmap in solsinputmaps:
minrqlst, insertedvars = vfilter.filter(
- sources, variables, scope, set(solindices), needsel, final)
+ sources, terms, scope, set(solindices), needsel, final)
if inputmap is None:
inputmap = subinputmap
else:
@@ -992,10 +1108,10 @@
sources, insertedvars))
else:
# this is a final part (i.e. retreiving results for the
- # original query part) if all variable / sources have been
+ # original query part) if all term / sources have been
# treated or if this is the last shot for used solutions
minrqlst, insertedvars = vfilter.filter(
- sources, variables, scope, solindices, needsel, final)
+ sources, terms, scope, solindices, needsel, final)
if final:
solsinputmaps = ppi.merge_input_maps(solindices)
for solindices, inputmap in solsinputmaps:
@@ -1004,9 +1120,9 @@
else:
inputmap.update(subinputmap)
steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
- sources, insertedvars))
+ sources, insertedvars))
else:
- table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in variables)),
+ table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in terms)),
''.join(sorted(str(i) for i in solindices)))
ppi.build_non_final_part(minrqlst, solindices, sources,
insertedvars, table)
@@ -1039,7 +1155,7 @@
pass
-class VariablesFiltererVisitor(object):
+class TermsFiltererVisitor(object):
def __init__(self, schema, ppi):
self.schema = schema
self.ppi = ppi
@@ -1048,9 +1164,9 @@
self.extneedsel = frozenset(vref.name for sortterm in ppi.rqlst.orderby
for vref in sortterm.iget_nodes(VariableRef))
- def _rqlst_accept(self, rqlst, node, newroot, variables, setfunc=None):
+ def _rqlst_accept(self, rqlst, node, newroot, terms, setfunc=None):
try:
- newrestr, node_ = node.accept(self, newroot, variables[:])
+ newrestr, node_ = node.accept(self, newroot, terms[:])
except UnsupportedBranch:
return rqlst
if setfunc is not None and newrestr is not None:
@@ -1059,18 +1175,18 @@
rqlst = node.parent
return rqlst
- def filter(self, sources, variables, rqlst, solindices, needsel, final):
+ def filter(self, sources, terms, rqlst, solindices, needsel, final):
if server.DEBUG:
- print 'filter', final and 'final' or '', sources, variables, rqlst, solindices, needsel
+ print 'filter', final and 'final' or '', sources, terms, rqlst, solindices, needsel
newroot = Select()
self.sources = sorted(sources)
- self.variables = variables
+ self.terms = terms
self.solindices = solindices
self.final = final
- # variables which appear in unsupported branches
+ # terms which appear in unsupported branches
needsel |= self.extneedsel
self.needsel = needsel
- # variables which appear in supported branches
+ # terms which appear in supported branches
self.mayneedsel = set()
# new inserted variables
self.insertedvars = []
@@ -1079,36 +1195,36 @@
self.use_only_defined = False
self.scopes = {rqlst: newroot}
if rqlst.where:
- rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, variables,
+ rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, terms,
newroot.set_where)
if isinstance(rqlst, Select):
self.use_only_defined = True
if rqlst.groupby:
groupby = []
for node in rqlst.groupby:
- rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
+ rqlst = self._rqlst_accept(rqlst, node, newroot, terms,
groupby.append)
if groupby:
newroot.set_groupby(groupby)
if rqlst.having:
having = []
for node in rqlst.having:
- rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
+ rqlst = self._rqlst_accept(rqlst, node, newroot, terms,
having.append)
if having:
newroot.set_having(having)
if final and rqlst.orderby and not self.hasaggrstep:
orderby = []
for node in rqlst.orderby:
- rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
+ rqlst = self._rqlst_accept(rqlst, node, newroot, terms,
orderby.append)
if orderby:
newroot.set_orderby(orderby)
- self.process_selection(newroot, variables, rqlst)
+ self.process_selection(newroot, terms, rqlst)
elif not newroot.where:
- # no restrictions have been copied, just select variables and add
+ # no restrictions have been copied, just select terms and add
# type restriction (done later by add_types_restriction)
- for v in variables:
+ for v in terms:
if not isinstance(v, Variable):
continue
newroot.append_selected(VariableRef(newroot.get_variable(v.name)))
@@ -1156,12 +1272,12 @@
print '--->', newroot
return newroot, self.insertedvars
- def visit_and(self, node, newroot, variables):
+ def visit_and(self, node, newroot, terms):
subparts = []
for i in xrange(len(node.children)):
child = node.children[i]
try:
- newchild, child_ = child.accept(self, newroot, variables)
+ newchild, child_ = child.accept(self, newroot, terms)
if not child_ is child:
node = child_.parent
if newchild is None:
@@ -1180,10 +1296,10 @@
def _relation_supported(self, relation):
rtype = relation.r_type
for source in self.sources:
- if not source.support_relation(rtype) \
- or (rtype in source.cross_relations and not relation in self.variables):#self.ppi.crossed_relation(source, relation):
+ if not source.support_relation(rtype) or (
+ rtype in source.cross_relations and not relation in self.terms):
return False
- if not self.final:
+ if not self.final and not relation in self.terms:
rschema = self.schema.rschema(relation.r_type)
if not rschema.is_final():
for term in relation.get_nodes((VariableRef, Constant)):
@@ -1193,7 +1309,7 @@
return False
return True
- def visit_relation(self, node, newroot, variables):
+ def visit_relation(self, node, newroot, terms):
if not node.is_types_restriction():
if node in self.skip and self.solindices.issubset(self.skip[node]):
if not self.schema.rschema(node.r_type).is_final():
@@ -1214,12 +1330,15 @@
# copy a type restriction while the variable is not actually used)
elif not any(self._relation_supported(rel)
for rel in node.children[0].variable.stinfo['relations']):
- rel, node = self.visit_default(node, newroot, variables)
+ rel, node = self.visit_default(node, newroot, terms)
return rel, node
else:
raise UnsupportedBranch()
rschema = self.schema.rschema(node.r_type)
- res = self.visit_default(node, newroot, variables)[0]
+ try:
+ res = self.visit_default(node, newroot, terms)[0]
+ except Exception, ex:
+ raise
ored = node.ored()
if rschema.is_final() or rschema.inlined:
vrefs = node.children[1].get_nodes(VariableRef)
@@ -1233,13 +1352,13 @@
vref = vrefs[0]
# XXX check operator ?
self.hasvar[(node.children[0].name, rschema)] = vref
- if self._may_skip_attr_rel(rschema, node, vref, ored, variables, res):
+ if self._may_skip_attr_rel(rschema, node, vref, ored, terms, res):
self.skip.setdefault(node, set()).update(self.solindices)
elif not ored:
self.skip.setdefault(node, set()).update(self.solindices)
return res, node
- def _may_skip_attr_rel(self, rschema, rel, vref, ored, variables, res):
+ def _may_skip_attr_rel(self, rschema, rel, vref, ored, terms, res):
var = vref.variable
if ored:
return False
@@ -1247,35 +1366,35 @@
return False
if not same_scope(var):
return False
- if any(v for v,_ in var.stinfo['attrvars'] if not v.name in variables):
+ if any(v for v,_ in var.stinfo['attrvars'] if not v.name in terms):
return False
return True
- def visit_exists(self, node, newroot, variables):
+ def visit_exists(self, node, newroot, terms):
newexists = node.__class__()
self.scopes = {node: newexists}
- subparts, node = self._visit_children(node, newroot, variables)
+ subparts, node = self._visit_children(node, newroot, terms)
if not subparts:
return None, node
newexists.set_where(subparts[0])
return newexists, node
- def visit_not(self, node, newroot, variables):
- subparts, node = self._visit_children(node, newroot, variables)
+ def visit_not(self, node, newroot, terms):
+ subparts, node = self._visit_children(node, newroot, terms)
if not subparts:
return None, node
return copy_node(newroot, node, subparts), node
- def visit_group(self, node, newroot, variables):
+ def visit_group(self, node, newroot, terms):
if not self.final:
return None, node
- return self.visit_default(node, newroot, variables)
+ return self.visit_default(node, newroot, terms)
- def visit_variableref(self, node, newroot, variables):
+ def visit_variableref(self, node, newroot, terms):
if self.use_only_defined:
if not node.variable.name in newroot.defined_vars:
raise UnsupportedBranch(node.name)
- elif not node.variable in variables:
+ elif not node.variable in terms:
raise UnsupportedBranch(node.name)
self.mayneedsel.add(node.name)
# set scope so we can insert types restriction properly
@@ -1283,28 +1402,28 @@
newvar.stinfo['scope'] = self.scopes.get(node.variable.scope, newroot)
return VariableRef(newvar), node
- def visit_constant(self, node, newroot, variables):
+ def visit_constant(self, node, newroot, terms):
return copy_node(newroot, node), node
- def visit_default(self, node, newroot, variables):
- subparts, node = self._visit_children(node, newroot, variables)
+ def visit_default(self, node, newroot, terms):
+ subparts, node = self._visit_children(node, newroot, terms)
return copy_node(newroot, node, subparts), node
visit_comparison = visit_mathexpression = visit_constant = visit_function = visit_default
visit_sort = visit_sortterm = visit_default
- def _visit_children(self, node, newroot, variables):
+ def _visit_children(self, node, newroot, terms):
subparts = []
for i in xrange(len(node.children)):
child = node.children[i]
- newchild, child_ = child.accept(self, newroot, variables)
+ newchild, child_ = child.accept(self, newroot, terms)
if not child is child_:
node = child_.parent
if newchild is not None:
subparts.append(newchild)
return subparts, node
- def process_selection(self, newroot, variables, rqlst):
+ def process_selection(self, newroot, terms, rqlst):
if self.final:
for term in rqlst.selection:
newroot.append_selected(term.copy(newroot))
@@ -1317,7 +1436,7 @@
supportedvars = []
for vref in vrefs:
var = vref.variable
- if var in variables:
+ if var in terms:
supportedvars.append(vref)
continue
else:
@@ -1331,9 +1450,9 @@
if not vref in newroot.get_selected_variables():
newroot.append_selected(VariableRef(newroot.get_variable(vref.name)))
- def add_necessary_selection(self, newroot, variables):
+ def add_necessary_selection(self, newroot, terms):
selected = tuple(newroot.get_selected_variables())
- for varname in variables:
+ for varname in terms:
var = newroot.defined_vars[varname]
for vref in var.references():
rel = vref.relation()
--- a/server/test/unittest_msplanner.py Thu Apr 02 18:52:08 2009 +0200
+++ b/server/test/unittest_msplanner.py Fri Apr 03 19:04:00 2009 +0200
@@ -118,19 +118,19 @@
def _test(self, rql, *args):
if len(args) == 3:
- kwargs, sourcesvars, needsplit = args
+ kwargs, sourcesterms, needsplit = args
else:
- sourcesvars, needsplit = args
+ sourcesterms, needsplit = args
kwargs = None
plan = self._prepare_plan(rql, kwargs)
union = plan.rqlst
plan.preprocess(union)
ppi = PartPlanInformation(plan, union.children[0])
- for sourcevars in ppi._sourcesvars.itervalues():
+ for sourcevars in ppi._sourcesterms.itervalues():
for var in sourcevars.keys():
solindices = sourcevars.pop(var)
sourcevars[var._ms_table_key()] = solindices
- self.assertEquals(ppi._sourcesvars, sourcesvars)
+ self.assertEquals(ppi._sourcesterms, sourcesterms)
self.assertEquals(ppi.needsplit, needsplit)
@@ -163,7 +163,7 @@
"""
ueid = self.session.user.eid
self._test('Any X WHERE X eid %(x)s', {'x': ueid},
- {}, False)
+ {self.system: {'x': s[0]}}, False)
def test_simple_invariant(self):
"""retrieve EUser X from system source only (X is invariant and in_group not supported by ldap source)
@@ -241,8 +241,11 @@
def test_complex_optional(self):
ueid = self.session.user.eid
self._test('Any U WHERE WF wf_info_for X, X eid %(x)s, WF owned_by U?, WF from_state FS', {'x': ueid},
- {self.system: {'WF': s[0], 'FS': s[0], 'U': s[0], 'from_state': s[0], 'owned_by': s[0], 'wf_info_for': s[0]}}, False)
-
+ {self.system: {'WF': s[0], 'FS': s[0], 'U': s[0],
+ 'from_state': s[0], 'owned_by': s[0], 'wf_info_for': s[0],
+ 'x': s[0]}},
+ False)
+
def test_exists4(self):
"""
State S could come from both rql source and system source,
@@ -269,13 +272,45 @@
repo._type_source_cache[999999] = ('Note', 'cards', 999999)
self._test('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s',
{'x': 999999, 'u': self.session.user.eid},
- {self.system: {'P': s[0], 'G': s[0], 'X': s[0], 'require_permission': s[0], 'in_group': s[0], 'P': s[0], 'require_group': s[0]}}, False)
+ {self.system: {'P': s[0], 'G': s[0], 'X': s[0],
+ 'require_permission': s[0], 'in_group': s[0], 'P': s[0], 'require_group': s[0],
+ 'u': s[0]}},
+ False)
def test_delete_relation1(self):
ueid = self.session.user.eid
self._test('Any X, Y WHERE X created_by Y, X eid %(x)s, NOT Y eid %(y)s',
{'x': ueid, 'y': ueid},
- {self.system: {'Y': s[0], 'created_by': s[0]}}, False)
+ {self.system: {'Y': s[0], 'created_by': s[0], 'x': s[0]}}, False)
+
+ def test_crossed_relation_eid_1_needattr(self):
+ repo._type_source_cache[999999] = ('Note', 'system', 999999)
+ ueid = self.session.user.eid
+ self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T',
+ {'x': 999999,},
+ {self.rql: {'Y': s[0]}, self.system: {'Y': s[0], 'x': s[0]}}, True)
+
+ def test_crossed_relation_eid_1_invariant(self):
+ repo._type_source_cache[999999] = ('Note', 'system', 999999)
+ self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y',
+ {'x': 999999},
+ {self.system: {'Y': s[0], 'x': s[0]}}, False)
+
+ def test_crossed_relation_eid_2_invariant(self):
+ repo._type_source_cache[999999] = ('Note', 'cards', 999999)
+ self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y',
+ {'x': 999999,},
+ {self.rql: {'Y': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]},
+ self.system: {'Y': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}},
+ False)
+
+ def test_version_crossed_depends_on_1(self):
+ repo._type_source_cache[999999] = ('Note', 'cards', 999999)
+ self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE',
+ {'x': 999999},
+ {self.rql: {'X': s[0], 'AD': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]},
+ self.system: {'X': s[0], 'AD': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}},
+ True)
@@ -1142,32 +1177,27 @@
# in the source where %(x)s is not coming from and will be removed during rql
# generation for the external source
self._test('Any SN WHERE NOT X in_state S, X eid %(x)s, S name SN',
- [('OneFetchStep', [('Any SN WHERE NOT 5 in_state S, S name SN, S is State', [{'S': 'State', 'SN': 'String'}])],
+ [('OneFetchStep', [('Any SN WHERE NOT 5 in_state S, S name SN, S is State',
+ [{'S': 'State', 'SN': 'String'}])],
None, None, [self.rql, self.system], {}, [])],
{'x': ueid})
def test_not_relation_no_split_external(self):
repo._type_source_cache[999999] = ('Note', 'cards', 999999)
- # similar to the above test but with an eid coming from the external source
+ # similar to the above test but with an eid coming from the external source.
+ # the same plan may be used, since we won't find any record in the system source
+ # linking 999999 to a state
self._test('Any SN WHERE NOT X in_state S, X eid %(x)s, S name SN',
- [('UnionStep', None, None,
- [('OneFetchStep',
- [('Any SN WHERE NOT 999999 in_state S, S name SN, S is State',
- [{'S': 'State', 'SN': 'String'}])],
- None, None, [self.rql], {},
- []),
- ('OneFetchStep',
- [('Any SN WHERE S name SN, S is State',
- [{'S': 'State', 'SN': 'String'}])],
- None, None, [self.system], {},
- [])]
- )],
+ [('OneFetchStep', [('Any SN WHERE NOT 999999 in_state S, S name SN, S is State',
+ [{'S': 'State', 'SN': 'String'}])],
+ None, None, [self.rql, self.system], {}, [])],
{'x': 999999})
def test_not_relation_need_split(self):
ueid = self.session.user.eid
self._test('Any SN WHERE NOT X in_state S, S name SN',
- [('FetchStep', [('Any SN,S WHERE S name SN, S is State', [{'S': 'State', 'SN': 'String'}])],
+ [('FetchStep', [('Any SN,S WHERE S name SN, S is State',
+ [{'S': 'State', 'SN': 'String'}])],
[self.rql, self.system], None, {'S': 'table0.C1', 'S.name': 'table0.C0', 'SN': 'table0.C0'},
[]),
('FetchStep', [('Any X WHERE X is Note', [{'X': 'Note'}])],
@@ -1372,7 +1402,6 @@
def test_crossed_relation_eid_1_invariant(self):
repo._type_source_cache[999999] = ('Note', 'system', 999999)
- ueid = self.session.user.eid
self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y',
[('OneFetchStep', [('Any Y WHERE 999999 multisource_crossed_rel Y', [{u'Y': 'Note'}])],
None, None, [self.system], {}, [])
@@ -1381,7 +1410,6 @@
def test_crossed_relation_eid_1_needattr(self):
repo._type_source_cache[999999] = ('Note', 'system', 999999)
- ueid = self.session.user.eid
self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T',
[('FetchStep', [('Any Y,T WHERE Y type T, Y is Note', [{'T': 'String', 'Y': 'Note'}])],
[self.rql, self.system], None,
@@ -1395,16 +1423,14 @@
def test_crossed_relation_eid_2_invariant(self):
repo._type_source_cache[999999] = ('Note', 'cards', 999999)
- ueid = self.session.user.eid
self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y',
[('OneFetchStep', [('Any Y WHERE 999999 multisource_crossed_rel Y, Y is Note', [{'Y': 'Note'}])],
None, None, [self.rql, self.system], {}, [])
],
{'x': 999999,})
- def test_crossed_relation_eid_2_needattr(self):
+ def test_crossed_relation_eid_2_needattr_XXXFIXME(self):
repo._type_source_cache[999999] = ('Note', 'cards', 999999)
- ueid = self.session.user.eid
self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T',
[('FetchStep', [('Any Y,T WHERE Y type T, Y is Note', [{'T': 'String', 'Y': 'Note'}])],
[self.rql, self.system], None,
@@ -1419,7 +1445,6 @@
def test_crossed_relation_eid_not_1(self):
repo._type_source_cache[999999] = ('Note', 'system', 999999)
- ueid = self.session.user.eid
self._test('Any Y WHERE X eid %(x)s, NOT X multisource_crossed_rel Y',
[('FetchStep', [('Any Y WHERE Y is Note', [{'Y': 'Note'}])],
[self.rql, self.system], None, {'Y': 'table0.C0'}, []),
@@ -1431,14 +1456,12 @@
# def test_crossed_relation_eid_not_2(self):
# repo._type_source_cache[999999] = ('Note', 'cards', 999999)
-# ueid = self.session.user.eid
# self._test('Any Y WHERE X eid %(x)s, NOT X multisource_crossed_rel Y',
# [],
# {'x': 999999,})
- def test_crossed_relation_base(self):
+ def test_crossed_relation_base_XXXFIXME(self):
repo._type_source_cache[999999] = ('Note', 'system', 999999)
- ueid = self.session.user.eid
self._test('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T',
[('FetchStep', [('Any X,T WHERE X type T, X is Note', [{'T': 'String', 'X': 'Note'}])],
[self.rql, self.system], None,
@@ -1768,8 +1791,8 @@
[('FetchStep',
[('Any X,AA,AB WHERE X login AA, X modification_date AB, X is EUser',
[{'AA': 'String', 'AB': 'Datetime', 'X': 'EUser'}])],
- [self.ldap], None, {'AA': 'table0.C1', 'AB': 'table0.C2',
- 'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2'},
+ [self.ldap, self.system], None, {'AA': 'table0.C1', 'AB': 'table0.C2',
+ 'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2'},
[]),
('OneFetchStep',
[('Any X,AA,AB ORDERBY AA WHERE 999999 owned_by X, X login AA, X modification_date AB, X is EUser',
@@ -1799,7 +1822,7 @@
self._test('Any X ORDERBY Z DESC WHERE X modification_date Z, E eid %(x)s, E see_also X',
[('FetchStep', [('Any X,Z WHERE X modification_date Z, X is Note',
[{'X': 'Note', 'Z': 'Datetime'}])],
- [self.rql], None, {'X': 'table0.C0', 'X.modification_date': 'table0.C1', 'Z': 'table0.C1'},
+ [self.rql, self.system], None, {'X': 'table0.C0', 'X.modification_date': 'table0.C1', 'Z': 'table0.C1'},
[]),
('AggrStep', 'Any X ORDERBY Z DESC',
None, None, 'table1', None,
@@ -1881,12 +1904,11 @@
class MSPlannerTwoSameExternalSourcesTC(BasePlannerTC):
"""test planner related feature on a 3-sources repository:
- * 2 rql source supporting Card
+ * 2 rql sources supporting Card
"""
repo = repo
def setUp(self):
- #_QuerierTC.setUp(self)
self.o = repo.querier
self.session = repo._sessions.values()[0]
self.pool = self.session.set_pool()
@@ -1903,7 +1925,10 @@
self.rql2 = self.sources[-1]
do_monkey_patch()
self.planner = MSPlanner(self.o.schema, self.o._rqlhelper)
-
+ assert repo.sources_by_uri['cards2'].support_relation('multisource_crossed_rel')
+ assert 'multisource_crossed_rel' in repo.sources_by_uri['cards2'].cross_relations
+ assert repo.sources_by_uri['cards'].support_relation('multisource_crossed_rel')
+ assert 'multisource_crossed_rel' in repo.sources_by_uri['cards'].cross_relations
_test = test_plan
def tearDown(self):
@@ -1928,7 +1953,55 @@
{'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'},
[])],
{'t': 999999})
-
+
+ def test_version_depends_on(self):
+ self.repo._type_source_cache[999999] = ('Note', 'cards', 999999)
+ self._test('Any X,AD,AE WHERE E eid %(x)s, E migrated_from X, X in_state AD, AD name AE',
+ [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note',
+ [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
+ [self.rql, self.rql2, self.system],
+ None, {'AD': 'table0.C1', 'AD.name': 'table0.C2',
+ 'AE': 'table0.C2', 'X': 'table0.C0'},
+ []),
+ ('OneFetchStep', [('Any X,AD,AE WHERE 999999 migrated_from X, AD name AE, AD is State, X is Note',
+ [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
+ None, None, [self.system],
+ {'AD': 'table0.C1', 'AD.name': 'table0.C2', 'AE': 'table0.C2', 'X': 'table0.C0'},
+ [])],
+ {'x': 999999})
+
+ def test_version_crossed_depends_on_1(self):
+ self.repo._type_source_cache[999999] = ('Note', 'cards', 999999)
+ self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE',
+ [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note',
+ [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
+ [self.rql, self.rql2, self.system],
+ None, {'AD': 'table0.C1', 'AD.name': 'table0.C2',
+ 'AE': 'table0.C2', 'X': 'table0.C0'},
+ []),
+ ('FetchStep', [('Any X WHERE 999999 multisource_crossed_rel X, X is Note',
+ [{'X': 'Note'}])],
+ [self.rql, self.system], None, {'X': 'table1.C0'},
+ []),
+ ('OneFetchStep', [('Any X,AD,AE WHERE AD name AE, AD is State, X is Note, X identity A',
+ [{'A': 'Note', 'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
+ None, None, [self.system],
+ {'A': 'table0.C0', 'AD': 'table0.C1', 'AD.name': 'table0.C2',
+ 'AE': 'table0.C2', 'X': 'table1.C0'},
+ [])],
+ {'x': 999999})
+
+ def test_version_crossed_depends_on_2_XXXFIXME(self):
+ self.repo._type_source_cache[999999] = ('Note', 'system', 999999)
+ self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE',
+ [],
+ {'x': 999999})
+
+ def test_version_crossed_depends_on_3_XXXFIXME(self):
+ self._test('Any X,AD,AE WHERE E multisource_crossed_rel X, X in_state AD, AD name AE, E is Note',
+ [])
+
+
if __name__ == '__main__':
from logilab.common.testlib import unittest_main
unittest_main()