server/msplanner.py
changeset 257 4c7d3af7e94d
child 341 0a426be2f3a2
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/server/msplanner.py	Mon Dec 22 17:34:15 2008 +0100
@@ -0,0 +1,1212 @@
+"""plan execution of rql queries on multiple sources
+
+the best way to understand what are we trying to acheive here is to read
+the unit-tests in unittest_querier_planner.py
+
+
+
+Split and execution specifications
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+For a system source and a ldap user source (only EUser and its attributes
+is supported, no group or such):
+
+
+:EUser X:
+1. fetch EUser X from both sources and return concatenation of results
+
+
+:EUser X WHERE X in_group G, G name 'users':
+* catch 1
+  1. fetch EUser X from both sources, store concatenation of results
+     into a temporary table
+  2. return the result of TMP X WHERE X in_group G, G name 'users' from
+     the system source
+     
+* catch 2
+  1. return the result of EUser X WHERE X in_group G, G name 'users'
+     from system source, that's enough (optimization of the sql querier
+     will avoid join on EUser, so we will directly get local eids)
+
+    
+:EUser X,L WHERE X in_group G, X login L, G name 'users':
+1. fetch Any X,L WHERE X is EUser, X login L from both sources, store
+   concatenation of results into a temporary table
+2. return the result of Any X, L WHERE X is TMP, X login LX in_group G,
+   G name 'users' from the system source
+
+
+:Any X WHERE X owned_by Y:
+* catch 1
+  1. fetch EUser X from both sources, store concatenation of results
+     into a temporary table
+  2. return the result of Any X WHERE X owned_by Y, Y is TMP from
+     the system source
+     
+* catch 2
+  1. return the result of Any X WHERE X owned_by Y
+     from system source, that's enough (optimization of the sql querier
+     will avoid join on EUser, so we will directly get local eids)
+
+
+:organization: Logilab
+:copyright: 2003-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
+"""
+__docformat__ = "restructuredtext en"
+
+from itertools import imap, ifilterfalse
+
+from logilab.common.compat import any
+from logilab.common.decorators import cached
+
+from rql.stmts import Union, Select
+from rql.nodes import VariableRef, Comparison, Relation, Constant, Exists, Variable
+
+from cubicweb import server
+from cubicweb.common.utils import make_uid
+from cubicweb.server.utils import cleanup_solutions
+from cubicweb.server.ssplanner import SSPlanner, OneFetchStep, add_types_restriction
+from cubicweb.server.mssteps import *
+from cubicweb.server.sources import AbstractSource
+
+Variable._ms_table_key = lambda x: x.name
+Relation._ms_table_key = lambda x: x.r_type
+# str() Constant.value to ensure generated table name won't be unicode
+Constant._ms_table_key = lambda x: str(x.value)
+
+AbstractSource.dont_cross_relations = ()
+
+def allequals(solutions):
+    """return true if all solutions are identical"""
+    sol = solutions.next()
+    for sol_ in solutions:
+        if sol_ != sol:
+            return False
+    return True
+    
+def need_aggr_step(select, sources, stepdefs=None):
+    """return True if a temporary table is necessary to store some partial
+    results to execute the given query
+    """
+    if len(sources) == 1:
+        # can do everything at once with a single source
+        return False
+    if select.orderby or select.groupby or select.has_aggregat:
+        # if more than one source, we need a temp table to deal with sort /
+        # groups / aggregat if :
+        # * the rqlst won't be splitted (in the other case the last query
+        #   using partial temporary table can do sort/groups/aggregat without
+        #   the need for a later AggrStep)
+        # * the rqlst is splitted in multiple steps and there are more than one
+        #   final step
+        if stepdefs is None:
+            return True
+        has_one_final = False
+        fstepsolindices = set()
+        for stepdef in stepdefs:
+            if stepdef[-1]:
+                if has_one_final or frozenset(stepdef[2]) != fstepsolindices:
+                    return True
+                has_one_final = True
+            else:
+                fstepsolindices.update(stepdef[2])
+    return False
+
+def copy_node(newroot, node, subparts=()):
+    newnode = node.__class__(*node.initargs(newroot))
+    for part in subparts:
+        newnode.append(part)
+    return newnode
+        
+def same_scope(var):
+    """return true if the variable is always used in the same scope"""
+    try:
+        return var.stinfo['samescope']
+    except KeyError:
+        for rel in var.stinfo['relations']:
+            if not rel.scope is var.scope:
+                var.stinfo['samescope'] = False
+                return False
+        var.stinfo['samescope'] = True
+        return True
+
+def select_group_sort(select): # XXX something similar done in rql2sql
+    # add variables used in groups and sort terms to the selection
+    # if necessary
+    if select.groupby:
+        for vref in select.groupby:
+            if not vref in select.selection:
+                select.append_selected(vref.copy(select))
+    for sortterm in select.orderby:
+        for vref in sortterm.iget_nodes(VariableRef):
+            if not vref in select.get_selected_variables():
+                # we can't directly insert sortterm.term because it references
+                # a variable of the select before the copy.
+                # XXX if constant term are used to define sort, their value
+                # may necessite a decay
+                select.append_selected(vref.copy(select))
+                if select.groupby and not vref in select.groupby:
+                    select.add_group_var(vref.copy(select))
+            
+
+class PartPlanInformation(object):
+    """regroups necessary information to execute some part of a "global" rql
+    query ("global" means as received by the querier, which may result in
+    several internal queries, e.g. parts, due to security insertions)
+
+    it exposes as well some methods helping in executing this part on a
+    multi-sources repository, modifying its internal structure during the
+    process
+
+    :attr solutions: a list of mappings (varname -> vartype)
+    :attr sourcesvars:
+      a dictionnary telling for each source which variable/solution are
+      supported, of the form {source : {varname: [solution index, ]}}
+    """
+    def __init__(self, plan, rqlst, rqlhelper=None):
+        self.needsplit = False
+        self.temptable = None
+        self.finaltable = None
+        self.plan = plan
+        self.rqlst = rqlst
+        self._session = plan.session
+        self._solutions = rqlst.solutions
+        self._solindices = range(len(self._solutions))
+        # source : {varname: [solution index, ]}
+        self._sourcesvars = {}
+        # dictionnary of variables which are linked to each other using a non
+        # final relation which is supported by multiple sources
+        self._linkedvars = {}
+        # processing
+        self._compute_sourcesvars()
+        self._remove_invalid_sources()
+        #if server.DEBUG:
+        #    print 'planner sources vars', self._sourcesvars
+        self._compute_needsplit()
+        self._inputmaps = {}
+        if rqlhelper is not None: # else test
+            self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional
+
+    def copy_solutions(self, solindices):
+        return [self._solutions[solidx].copy() for solidx in solindices]
+    
+    @property
+    @cached
+    def part_sources(self):
+        if self._sourcesvars:
+            return tuple(sorted(self._sourcesvars))
+        return (self._session.repo.system_source,)
+    
+    @property
+    @cached
+    def _sys_source_set(self):
+        return frozenset((self._session.repo.system_source, solindex)
+                         for solindex in self._solindices)        
+       
+    @cached
+    def _norel_support_set(self, rtype):
+        """return a set of (source, solindex) where source doesn't support the
+        relation
+        """
+        return frozenset((source, solidx) for source in self._session.repo.sources
+                         for solidx in self._solindices
+                         if not (source.support_relation(rtype)
+                                 or rtype in source.dont_cross_relations))
+    
+    def _compute_sourcesvars(self):
+        """compute for each variable/solution in the rqlst which sources support
+        them
+        """
+        repo = self._session.repo
+        eschema = repo.schema.eschema
+        sourcesvars = self._sourcesvars
+        # find for each source which variable/solution are supported
+        for varname, varobj in self.rqlst.defined_vars.items():
+            # if variable has an eid specified, we can get its source directly
+            # NOTE: use uidrels and not constnode to deal with "X eid IN(1,2,3,4)"
+            if varobj.stinfo['uidrels']:
+                vrels = varobj.stinfo['relations'] - varobj.stinfo['uidrels']
+                for rel in varobj.stinfo['uidrels']:
+                    if  rel.neged(strict=True) or rel.operator() != '=':
+                        continue
+                    for const in rel.children[1].get_nodes(Constant):
+                        eid = const.eval(self.plan.args)
+                        source = self._session.source_from_eid(eid)
+                        if vrels and not any(source.support_relation(r.r_type)
+                                             for r in vrels):
+                            self._set_source_for_var(repo.system_source, varobj)
+                        else:
+                            self._set_source_for_var(source, varobj)
+                continue
+            rels = varobj.stinfo['relations']
+            if not rels and not varobj.stinfo['typerels']:
+                # (rare) case where the variable has no type specified nor
+                # relation accessed ex. "Any MAX(X)"
+                self._set_source_for_var(repo.system_source, varobj)
+                continue
+            for i, sol in enumerate(self._solutions):
+                vartype = sol[varname]
+                # skip final variable
+                if eschema(vartype).is_final():
+                    break
+                for source in repo.sources:
+                    if source.support_entity(vartype):
+                        # the source support the entity type, though we will
+                        # actually have to fetch from it only if
+                        # * the variable isn't invariant
+                        # * at least one supported relation specified
+                        if not varobj._q_invariant or \
+                               any(imap(source.support_relation,
+                                        (r.r_type for r in rels if r.r_type != 'eid'))):
+                            sourcesvars.setdefault(source, {}).setdefault(varobj, set()).add(i)
+                        # if variable is not invariant and is used by a relation
+                        # not supported by this source, we'll have to split the
+                        # query
+                        if not varobj._q_invariant and any(ifilterfalse(
+                            source.support_relation, (r.r_type for r in rels))):
+                            self.needsplit = True               
+            
+    def _remove_invalid_sources(self):
+        """removes invalid sources from `sourcesvars` member"""
+        repo = self._session.repo
+        rschema = repo.schema.rschema
+        vsources = {}
+        for rel in self.rqlst.iget_nodes(Relation):
+            # process non final relations only
+            # note: don't try to get schema for 'is' relation (not available
+            # during bootstrap)
+            if not rel.is_types_restriction() and not rschema(rel.r_type).is_final():
+                # nothing to do if relation is not supported by multiple sources
+                relsources = [source for source in repo.sources
+                              if source.support_relation(rel.r_type)
+                              or rel.r_type in source.dont_cross_relations]
+                if len(relsources) < 2:
+                    if relsources:# and not relsources[0] in self._sourcesvars:
+                        # this means the relation is using a variable inlined as
+                        # a constant and another unsupported variable, in which
+                        # case we put the relation in sourcesvars
+                        self._sourcesvars.setdefault(relsources[0], {})[rel] = set(self._solindices)
+                    continue
+                lhs, rhs = rel.get_variable_parts()
+                lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs)
+                # update dictionnary of sources supporting lhs and rhs vars
+                if not lhsv in vsources:
+                    vsources[lhsv] = self._term_sources(lhs)
+                if not rhsv in vsources:
+                    vsources[rhsv] = self._term_sources(rhs)
+                self._linkedvars.setdefault(lhsv, set()).add((rhsv, rel))
+                self._linkedvars.setdefault(rhsv, set()).add((lhsv, rel))
+        for term in self._linkedvars:
+            self._remove_sources_until_stable(term, vsources)
+        if len(self._sourcesvars) > 1 and hasattr(self.plan.rqlst, 'main_relations'):
+            # the querier doesn't annotate write queries, need to do it here
+            self.plan.annotate_rqlst()
+            # insert/update/delete queries, we may get extra information from
+            # the main relation (eg relations to the left of the WHERE
+            if self.plan.rqlst.TYPE == 'insert':
+                inserted = dict((vref.variable, etype)
+                                for etype, vref in self.plan.rqlst.main_variables)
+            else:
+                inserted = {}
+            for rel in self.plan.rqlst.main_relations:
+                if not rschema(rel.r_type).is_final():
+                    # nothing to do if relation is not supported by multiple sources
+                    relsources = [source for source in repo.sources
+                                  if source.support_relation(rel.r_type)
+                                  or rel.r_type in source.dont_cross_relations]
+                    if len(relsources) < 2:
+                        continue
+                    lhs, rhs = rel.get_variable_parts()
+                    try:
+                        lhsv = self._extern_term(lhs, vsources, inserted)
+                        rhsv = self._extern_term(rhs, vsources, inserted)
+                    except KeyError, ex:
+                        continue
+                    norelsup = self._norel_support_set(rel.r_type)
+                    self._remove_var_sources(lhsv, norelsup, rhsv, vsources)
+                    self._remove_var_sources(rhsv, norelsup, lhsv, vsources)
+        # cleanup linked var
+        for var, linkedrelsinfo in self._linkedvars.iteritems():
+            self._linkedvars[var] = frozenset(x[0] for x in linkedrelsinfo)
+        # if there are other sources than the system source, consider simplified
+        # variables'source
+        if self._sourcesvars and self._sourcesvars.keys() != [self._session.repo.system_source]:
+            # add source for rewritten constants to sourcesvars
+            for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
+                const = vconsts[0]
+                eid = const.eval(self.plan.args)
+                source = self._session.source_from_eid(eid)
+                if source is self._session.repo.system_source:
+                    for const in vconsts:
+                        self._set_source_for_var(source, const)
+                elif source in self._sourcesvars:
+                    source_scopes = frozenset(v.scope for v in self._sourcesvars[source])
+                    for const in vconsts:
+                        if const.scope in source_scopes:
+                            self._set_source_for_var(source, const)
+        
+    def _extern_term(self, term, vsources, inserted):
+        var = term.variable
+        if var.stinfo['constnode']:
+            termv = var.stinfo['constnode']
+            vsources[termv] = self._term_sources(termv)
+        elif var in inserted:
+            termv = var
+            source = self._session.repo.locate_etype_source(inserted[var])
+            vsources[termv] = set((source, solindex) for solindex in self._solindices)
+        else:
+            termv = self.rqlst.defined_vars[var.name]
+            if not termv in vsources:
+                vsources[termv] = self._term_sources(termv)
+        return termv
+        
+    def _remove_sources_until_stable(self, var, vsources):
+        for ovar, rel in self._linkedvars.get(var, ()):
+            if not var.scope is ovar.scope and rel.scope.neged(strict=True):
+                # can't get information from relation inside a NOT exists
+                # where variables don't belong to the same scope
+                continue
+            if rel.neged(strict=True):
+                # neged relation doesn't allow to infer variable sources
+                continue
+            norelsup = self._norel_support_set(rel.r_type)
+            # compute invalid sources for variables and remove them
+            self._remove_var_sources(var, norelsup, ovar, vsources)
+            self._remove_var_sources(ovar, norelsup, var, vsources)
+    
+    def _remove_var_sources(self, var, norelsup, ovar, vsources):
+        """remove invalid sources for var according to ovar's sources and the
+        relation between those two variables. 
+        """
+        varsources = vsources[var]
+        invalid_sources = varsources - (vsources[ovar] | norelsup)
+        if invalid_sources:
+            self._remove_sources(var, invalid_sources)
+            varsources -= invalid_sources
+            self._remove_sources_until_stable(var, vsources)
+        
+    def _compute_needsplit(self):
+        """tell according to sourcesvars if the rqlst has to be splitted for
+        execution among multiple sources
+        
+        the execution has to be split if
+        * a source support an entity (non invariant) but doesn't support a
+          relation on it
+        * a source support an entity which is accessed by an optional relation
+        * there is more than one sources and either all sources'supported        
+          variable/solutions are not equivalent or multiple variables have to
+          be fetched from some source
+        """
+        # NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2
+        if len(self._sourcesvars) < 2: 
+            self.needsplit = False
+        elif not self.needsplit:
+            if not allequals(self._sourcesvars.itervalues()):
+                self.needsplit = True
+            else:
+                sample = self._sourcesvars.itervalues().next()
+                if len(sample) > 1 and any(v for v in sample
+                                           if not v in self._linkedvars):
+                    self.needsplit = True
+
+    def _set_source_for_var(self, source, var):
+        self._sourcesvars.setdefault(source, {})[var] = set(self._solindices)
+
+    def _term_sources(self, term):
+        """returns possible sources for terms `term`"""
+        if isinstance(term, Constant):
+            source = self._session.source_from_eid(term.eval(self.plan.args))
+            return set((source, solindex) for solindex in self._solindices)
+        else:
+            var = getattr(term, 'variable', term)
+            sources = [source for source, varobjs in self._sourcesvars.iteritems()
+                       if var in varobjs]
+            return set((source, solindex) for source in sources
+                       for solindex in self._sourcesvars[source][var])
+
+    def _remove_sources(self, var, sources):
+        """removes invalid sources (`sources`) from `sourcesvars`
+
+        :param sources: the list of sources to remove
+        :param var: the analyzed variable
+        """
+        sourcesvars = self._sourcesvars
+        for source, solindex in sources:
+            try:
+                sourcesvars[source][var].remove(solindex)
+            except KeyError:
+                return # may occur with subquery column alias
+            if not sourcesvars[source][var]:
+                del sourcesvars[source][var]
+                if not sourcesvars[source]:
+                    del sourcesvars[source]
+
+    def part_steps(self):
+        """precompute necessary part steps before generating actual rql for
+        each step. This is necessary to know if an aggregate step will be
+        necessary or not.
+        """
+        steps = []
+        select = self.rqlst
+        rschema = self.plan.schema.rschema
+        for source in self.part_sources:
+            sourcevars = self._sourcesvars[source]
+            while sourcevars:
+                # take a variable randomly, and all variables supporting the
+                # same solutions
+                var, solindices = self._choose_var(sourcevars)
+                if source.uri == 'system':
+                    # ensure all variables are available for the latest step
+                    # (missing one will be available from temporary tables
+                    # of previous steps)
+                    scope = select
+                    variables = scope.defined_vars.values() + scope.aliases.values()
+                    sourcevars.clear()
+                else:
+                    scope = var.scope
+                    variables = self._expand_vars(var, sourcevars, scope, solindices)
+                    if not sourcevars:
+                        del self._sourcesvars[source]
+                # find which sources support the same variables/solutions
+                sources = self._expand_sources(source, variables, solindices)
+                # suppose this is a final step until the contrary is proven
+                final = scope is select
+                # set of variables which should be additionaly selected when
+                # possible
+                needsel = set()
+                # add attribute variables and mark variables which should be
+                # additionaly selected when possible
+                for var in select.defined_vars.itervalues():
+                    if not var in variables:
+                        stinfo = var.stinfo
+                        for ovar, rtype in stinfo['attrvars']:
+                            if ovar in variables:
+                                needsel.add(var.name)
+                                variables.append(var)
+                                break
+                        else:
+                            needsel.add(var.name)
+                            final = False
+                if final and source.uri != 'system':
+                    # check rewritten constants
+                    for vconsts in select.stinfo['rewritten'].itervalues():
+                        const = vconsts[0]
+                        eid = const.eval(self.plan.args)
+                        _source = self._session.source_from_eid(eid)
+                        if len(sources) > 1 or not _source in sources:
+                            # if constant is only used by an identity relation,
+                            # skip
+                            for c in vconsts:
+                                rel = c.relation()
+                                if rel is None or not rel.neged(strict=True):
+                                    final = False
+                                    break
+                            break
+                # check where all relations are supported by the sources
+                for rel in scope.iget_nodes(Relation):
+                    if rel.is_types_restriction():
+                        continue
+                    # take care not overwriting the existing "source" identifier
+                    for _source in sources:
+                        if not _source.support_relation(rel.r_type):
+                            for vref in rel.iget_nodes(VariableRef):
+                                needsel.add(vref.name)
+                            final = False
+                            break
+                    else:
+                        if not scope is select:
+                            self._exists_relation(rel, variables, needsel)
+                        # if relation is supported by all sources and some of
+                        # its lhs/rhs variable isn't in "variables", and the
+                        # other end *is* in "variables", mark it have to be
+                        # selected
+                        if source.uri != 'system' and not rschema(rel.r_type).is_final():
+                            lhs, rhs = rel.get_variable_parts()
+                            try:
+                                lhsvar = lhs.variable
+                            except AttributeError:
+                                lhsvar = lhs
+                            try:
+                                rhsvar = rhs.variable
+                            except AttributeError:
+                                rhsvar = rhs
+                            if lhsvar in variables and not rhsvar in variables:
+                                needsel.add(lhsvar.name)
+                            elif rhsvar in variables and not lhsvar in variables:
+                                needsel.add(rhsvar.name)
+                if final:
+                    self._cleanup_sourcesvars(sources, solindices)
+                # XXX rename: variables may contain Relation and Constant nodes...
+                steps.append( (sources, variables, solindices, scope, needsel,
+                               final) )
+        return steps
+
+    def _exists_relation(self, rel, variables, needsel):
+        rschema = self.plan.schema.rschema(rel.r_type)
+        lhs, rhs = rel.get_variable_parts()
+        try:
+            lhsvar, rhsvar = lhs.variable, rhs.variable
+        except AttributeError:
+            pass
+        else:
+            # supported relation with at least one end supported, check the
+            # other end is in as well. If not this usually means the
+            # variable is refed by an outer scope and should be substituted
+            # using an 'identity' relation (else we'll get a conflict of
+            # temporary tables)
+            if rhsvar in variables and not lhsvar in variables:
+                self._identity_substitute(rel, lhsvar, variables, needsel)
+            elif lhsvar in variables and not rhsvar in variables:
+                self._identity_substitute(rel, rhsvar, variables, needsel)
+
+    def _identity_substitute(self, relation, var, variables, needsel):
+        newvar = self._insert_identity_variable(relation.scope, var)
+        if newvar is not None:
+            # ensure relation is using '=' operator, else we rely on a
+            # sqlgenerator side effect (it won't insert an inequality operator
+            # in this case)
+            relation.children[1].operator = '=' 
+            variables.append(newvar)
+            needsel.add(newvar.name)
+            #self.insertedvars.append((var.name, self.schema['identity'],
+            #                          newvar.name))
+        
+    def _choose_var(self, sourcevars):
+        secondchoice = None
+        if len(self._sourcesvars) > 1:
+            # priority to variable from subscopes
+            for var in sourcevars:
+                if not var.scope is self.rqlst:
+                    if isinstance(var, Variable):
+                        return var, sourcevars.pop(var)
+                    secondchoice = var
+        else:
+            # priority to variable outer scope
+            for var in sourcevars:
+                if var.scope is self.rqlst:
+                    if isinstance(var, Variable):
+                        return var, sourcevars.pop(var)
+                    secondchoice = var
+        if secondchoice is not None:
+            return secondchoice, sourcevars.pop(secondchoice)
+        # priority to variable
+        for var in sourcevars:
+            if isinstance(var, Variable):
+                return var, sourcevars.pop(var)
+        # whatever
+        var = iter(sourcevars).next()
+        return var, sourcevars.pop(var)
+            
+    def _expand_vars(self, var, sourcevars, scope, solindices):
+        variables = [var]
+        nbunlinked = 1
+        linkedvars = self._linkedvars
+        # variable has to belong to the same scope if there is more
+        # than the system source remaining
+        if len(self._sourcesvars) > 1 and not scope is self.rqlst:
+            candidates = (v for v in sourcevars.keys() if scope is v.scope)
+        else:
+            candidates = sourcevars #.iterkeys()
+        candidates = [v for v in candidates
+                      if isinstance(v, Constant) or
+                      (solindices.issubset(sourcevars[v]) and v in linkedvars)]
+        # repeat until no variable can't be added, since addition of a new
+        # variable may permit to another one to be added
+        modified = True
+        while modified and candidates:
+            modified = False
+            for var in candidates[:]:
+                # we only want one unlinked variable in each generated query
+                if isinstance(var, Constant) or \
+                       any(v for v in variables if v in linkedvars[var]):
+                    variables.append(var)
+                    # constant nodes should be systematically deleted
+                    if isinstance(var, Constant):
+                        del sourcevars[var]
+                    # variable nodes should be deleted once all possible solution
+                    # indices have been consumed
+                    else:
+                        sourcevars[var] -= solindices
+                        if not sourcevars[var]:
+                            del sourcevars[var]
+                    candidates.remove(var)
+                    modified = True
+        return variables
+    
+    def _expand_sources(self, selected_source, vars, solindices):
+        sources = [selected_source]
+        sourcesvars = self._sourcesvars
+        for source in sourcesvars:
+            if source is selected_source:
+                continue
+            for var in vars:
+                if not (var in sourcesvars[source] and 
+                        solindices.issubset(sourcesvars[source][var])):
+                    break
+            else:
+                sources.append(source)
+                if source.uri != 'system':
+                    for var in vars:
+                        varsolindices = sourcesvars[source][var]
+                        varsolindices -= solindices
+                        if not varsolindices:
+                            del sourcesvars[source][var]
+                
+        return sources
+    
+    def _cleanup_sourcesvars(self, sources, solindices):
+        """on final parts, remove solutions so we know they are already processed"""
+        for source in sources:
+            try:
+                sourcevar = self._sourcesvars[source]
+            except KeyError:
+                continue
+            for var, varsolindices in sourcevar.items():
+                varsolindices -= solindices
+                if not varsolindices:
+                    del sourcevar[var]
+                    
+    def merge_input_maps(self, allsolindices):
+        """inputmaps is a dictionary with tuple of solution indices as key with an
+        associateed input map as value. This function compute for each solution 
+        its necessary input map and return them grouped
+
+        ex:
+        inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'},
+                     (1,): {'X': 't2.C0', 'T': 't2.C1'}}
+        return : [([1],  {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1',
+                           'X': 't2.C0', 'T': 't2.C1'}),                   
+                  ([0,2], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'})]
+        """
+        if not self._inputmaps:
+            return [(allsolindices, None)]
+        mapbysol = {}
+        # compute a single map for each solution
+        for solindices, basemap in self._inputmaps.iteritems():
+            for solindex in solindices:
+                solmap = mapbysol.setdefault(solindex, {})
+                solmap.update(basemap)
+                try:
+                    allsolindices.remove(solindex)
+                except KeyError:
+                    continue # already removed
+        # group results by identical input map
+        result = []
+        for solindex, solmap in mapbysol.iteritems():
+            for solindices, commonmap in result:
+                if commonmap == solmap:
+                    solindices.append(solindex)
+                    break
+            else:
+                result.append( ([solindex], solmap) )
+        if allsolindices:
+            result.append( (list(allsolindices), None) )
+        return result
+
+    def build_final_part(self, select, solindices, inputmap,  sources,
+                         insertedvars):
+        plan = self.plan
+        rqlst = plan.finalize(select, [self._solutions[i] for i in solindices],
+                              insertedvars)
+        if self.temptable is None and self.finaltable is None:
+            return OneFetchStep(plan, rqlst, sources, inputmap=inputmap)
+        table = self.temptable or self.finaltable
+        return FetchStep(plan, rqlst, sources, table, True, inputmap)
+
+    def build_non_final_part(self, select, solindices, sources, insertedvars,
+                             table):
+        """non final step, will have to store results in a temporary table"""
+        plan = self.plan
+        rqlst = plan.finalize(select, [self._solutions[i] for i in solindices],
+                              insertedvars)
+        step = FetchStep(plan, rqlst, sources, table, False)
+        # update input map for following steps, according to processed solutions
+        inputmapkey = tuple(sorted(solindices))
+        inputmap = self._inputmaps.setdefault(inputmapkey, {})
+        inputmap.update(step.outputmap)
+        plan.add_step(step)
+
+
+class MSPlanner(SSPlanner):
+    """MultiSourcesPlanner: build execution plan for rql queries
+
+    decompose the RQL query according to sources'schema
+    """
+        
+    def build_select_plan(self, plan, rqlst):
+        """build execution plan for a SELECT RQL query
+               
+        the rqlst should not be tagged at this point
+        """
+        if server.DEBUG:
+            print '-'*80
+            print 'PLANNING', rqlst
+        for select in rqlst.children:
+            if len(select.solutions) > 1:
+                hasmultiplesols = True
+                break
+        else:
+            hasmultiplesols = False
+        # preprocess deals with security insertion and returns a new syntax tree
+        # which have to be executed to fulfill the query: according
+        # to permissions for variable's type, different rql queries may have to
+        # be executed
+        plan.preprocess(rqlst)
+        ppis = [PartPlanInformation(plan, select, self.rqlhelper)
+                for select in rqlst.children]
+        steps = self._union_plan(plan, rqlst, ppis)
+        if server.DEBUG:
+            from pprint import pprint
+            for step in plan.steps:
+                pprint(step.test_repr())
+            pprint(steps[0].test_repr())
+        return steps
+
+    def _ppi_subqueries(self, ppi):
+        # part plan info for subqueries
+        plan = ppi.plan
+        inputmap = {}
+        for subquery in ppi.rqlst.with_[:]:
+            sppis = [PartPlanInformation(plan, select)
+                     for select in subquery.query.children]
+            for sppi in sppis:
+                if sppi.needsplit or sppi.part_sources != ppi.part_sources:
+                    temptable = 'T%s' % make_uid(id(subquery))
+                    sstep = self._union_plan(plan, subquery.query, sppis, temptable)[0]
+                    break
+            else:
+                sstep = None
+            if sstep is not None:
+                ppi.rqlst.with_.remove(subquery)
+                for i, colalias in enumerate(subquery.aliases):
+                    inputmap[colalias.name] = '%s.C%s' % (temptable, i)
+                ppi.plan.add_step(sstep)
+        return inputmap
+    
+    def _union_plan(self, plan, union, ppis, temptable=None):
+        tosplit, cango, allsources = [], {}, set()
+        for planinfo in ppis:
+            if planinfo.needsplit:
+                tosplit.append(planinfo)
+            else:
+                cango.setdefault(planinfo.part_sources, []).append(planinfo)
+            for source in planinfo.part_sources:
+                allsources.add(source)
+        # first add steps for query parts which doesn't need to splitted
+        steps = []
+        for sources, cppis in cango.iteritems():
+            byinputmap = {}
+            for ppi in cppis:
+                select = ppi.rqlst
+                if sources != (plan.session.repo.system_source,):
+                    add_types_restriction(self.schema, select)
+                # part plan info for subqueries
+                inputmap = self._ppi_subqueries(ppi)
+                aggrstep = need_aggr_step(select, sources)
+                if aggrstep:
+                    atemptable = 'T%s' % make_uid(id(select))
+                    sunion = Union()
+                    sunion.append(select)
+                    selected = select.selection[:]
+                    select_group_sort(select)
+                    step = AggrStep(plan, selected, select, atemptable, temptable)
+                    step.set_limit_offset(select.limit, select.offset)
+                    select.limit = None
+                    select.offset = 0
+                    fstep = FetchStep(plan, sunion, sources, atemptable, True, inputmap)
+                    step.children.append(fstep)
+                    steps.append(step)
+                else:
+                    byinputmap.setdefault(tuple(inputmap.iteritems()), []).append( (select) )
+            for inputmap, queries in byinputmap.iteritems():
+                inputmap = dict(inputmap)
+                sunion = Union()
+                for select in queries:
+                    sunion.append(select)
+                if temptable:
+                    steps.append(FetchStep(plan, sunion, sources, temptable, True, inputmap))
+                else:
+                    steps.append(OneFetchStep(plan, sunion, sources, inputmap))
+        # then add steps for splitted query parts
+        for planinfo in tosplit:
+            steps.append(self.split_part(planinfo, temptable))
+        if len(steps) > 1:
+            if temptable:
+                step = UnionFetchStep(plan)
+            else:
+                step = UnionStep(plan)
+            step.children = steps
+            return (step,)
+        return steps
+
+    # internal methods for multisources decomposition #########################
+    
+    def split_part(self, ppi, temptable):
+        ppi.finaltable = temptable
+        plan = ppi.plan
+        select = ppi.rqlst
+        subinputmap = self._ppi_subqueries(ppi)
+        stepdefs = ppi.part_steps()
+        if need_aggr_step(select, ppi.part_sources, stepdefs):
+            atemptable = 'T%s' % make_uid(id(select))
+            selection = select.selection[:]
+            select_group_sort(select)
+        else:
+            atemptable = None
+            selection = select.selection
+        ppi.temptable = atemptable
+        vfilter = VariablesFiltererVisitor(self.schema, ppi)
+        steps = []
+        for sources, variables, solindices, scope, needsel, final in stepdefs:
+            # extract an executable query using only the specified variables
+            if sources[0].uri == 'system':
+                # in this case we have to merge input maps before call to
+                # filter so already processed restriction are correctly
+                # removed
+                solsinputmaps = ppi.merge_input_maps(solindices)
+                for solindices, inputmap in solsinputmaps:
+                    minrqlst, insertedvars = vfilter.filter(
+                        sources, variables, scope, set(solindices), needsel, final)
+                    if inputmap is None:
+                        inputmap = subinputmap
+                    else:
+                        inputmap.update(subinputmap)
+                    steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
+                                                      sources, insertedvars))
+            else:
+                # this is a final part (i.e. retreiving results for the
+                # original query part) if all variable / sources have been
+                # treated or if this is the last shot for used solutions
+                minrqlst, insertedvars = vfilter.filter(
+                    sources, variables, scope, solindices, needsel, final)
+                if final:
+                    solsinputmaps = ppi.merge_input_maps(solindices)
+                    for solindices, inputmap in solsinputmaps:
+                        if inputmap is None:
+                            inputmap = subinputmap
+                        else:
+                            inputmap.update(subinputmap)
+                        steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
+                                                  sources, insertedvars))
+                else:
+                    table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in variables)),
+                                        ''.join(sorted(str(i) for i in solindices)))
+                    ppi.build_non_final_part(minrqlst, solindices, sources,
+                                             insertedvars, table)
+        # finally: join parts, deal with aggregat/group/sorts if necessary
+        if atemptable is not None:
+            step = AggrStep(plan, selection, select, atemptable, temptable)
+            step.children = steps
+        elif len(steps) > 1:
+            if temptable:
+                step = UnionFetchStep(plan)
+            else:
+                step = UnionStep(plan)
+            step.children = steps
+        else:
+            step = steps[0]
+        if select.limit is not None or select.offset:
+            step.set_limit_offset(select.limit, select.offset)
+        return step
+
+    
+class UnsupportedBranch(Exception):
+    pass
+
+
+class VariablesFiltererVisitor(object):
+    def __init__(self, schema, ppi):
+        self.schema = schema
+        self.ppi = ppi
+        self.skip = {}
+        self.hasaggrstep = self.ppi.temptable
+        self.extneedsel = frozenset(vref.name for sortterm in ppi.rqlst.orderby
+                                    for vref in sortterm.iget_nodes(VariableRef))
+        
+    def _rqlst_accept(self, rqlst, node, newroot, variables, setfunc=None):
+        try:
+            newrestr, node_ = node.accept(self, newroot, variables[:])
+        except UnsupportedBranch:
+            return rqlst
+        if setfunc is not None and newrestr is not None:
+            setfunc(newrestr)
+        if not node_ is node:
+            rqlst = node.parent
+        return rqlst
+
+    def filter(self, sources, variables, rqlst, solindices, needsel, final):
+        if server.DEBUG:
+            print 'filter', final and 'final' or '', sources, variables, rqlst, solindices, needsel
+        newroot = Select()
+        self.sources = sources
+        self.solindices = solindices
+        self.final = final
+        # variables which appear in unsupported branches
+        needsel |= self.extneedsel
+        self.needsel = needsel
+        # variables which appear in supported branches
+        self.mayneedsel = set()
+        # new inserted variables
+        self.insertedvars = []
+        # other structures (XXX document)
+        self.mayneedvar, self.hasvar = {}, {}
+        self.use_only_defined = False
+        self.scopes = {rqlst: newroot}
+        if rqlst.where:
+            rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, variables,
+                                       newroot.set_where)
+        if isinstance(rqlst, Select):
+            self.use_only_defined = True
+            if rqlst.groupby:
+                groupby = []
+                for node in rqlst.groupby:
+                    rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
+                                               groupby.append)
+                if groupby:
+                    newroot.set_groupby(groupby)
+            if rqlst.having:
+                having = []
+                for node in rqlst.having:
+                    rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
+                                               having.append)
+                if having:
+                    newroot.set_having(having)
+            if final and rqlst.orderby and not self.hasaggrstep:
+                orderby = []
+                for node in rqlst.orderby:
+                    rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
+                                               orderby.append)
+                if orderby:
+                    newroot.set_orderby(orderby)
+            self.process_selection(newroot, variables, rqlst)
+        elif not newroot.where:
+            # no restrictions have been copied, just select variables and add
+            # type restriction (done later by add_types_restriction)
+            for v in variables:
+                if not isinstance(v, Variable):
+                    continue
+                newroot.append_selected(VariableRef(newroot.get_variable(v.name)))
+        solutions = self.ppi.copy_solutions(solindices)
+        cleanup_solutions(newroot, solutions)
+        newroot.set_possible_types(solutions)
+        if final:
+            if self.hasaggrstep:
+                self.add_necessary_selection(newroot, self.mayneedsel & self.extneedsel)
+            newroot.distinct = rqlst.distinct
+        else:
+            self.add_necessary_selection(newroot, self.mayneedsel & self.needsel)
+            # insert vars to fetch constant values when needed
+            for (varname, rschema), reldefs in self.mayneedvar.iteritems():
+                for rel, ored in reldefs:
+                    if not (varname, rschema) in self.hasvar:
+                        self.hasvar[(varname, rschema)] = None # just to avoid further insertion
+                        cvar = newroot.make_variable()
+                        for sol in newroot.solutions:
+                            sol[cvar.name] = rschema.objects(sol[varname])[0]
+                        # if the current restriction is not used in a OR branch,
+                        # we can keep it, else we have to drop the constant
+                        # restriction (or we may miss some results)
+                        if not ored:
+                            rel = rel.copy(newroot)
+                            newroot.add_restriction(rel)
+                        # add a relation to link the variable
+                        newroot.remove_node(rel.children[1])
+                        cmp = Comparison('=')
+                        rel.append(cmp)
+                        cmp.append(VariableRef(cvar))
+                        self.insertedvars.append((varname, rschema, cvar.name))
+                        newroot.append_selected(VariableRef(newroot.get_variable(cvar.name)))
+                        # NOTE: even if the restriction is done by this query, we have
+                        # to let it in the original rqlst so that it appears anyway in
+                        # the "final" query, else we may change the meaning of the query
+                        # if there are NOT somewhere :
+                        # 'NOT X relation Y, Y name "toto"' means X WHERE X isn't related
+                        # to Y whose name is toto while
+                        # 'NOT X relation Y' means X WHERE X has no 'relation' (whatever Y)
+                    elif ored:
+                        newroot.remove_node(rel)
+        add_types_restriction(self.schema, rqlst, newroot, solutions)
+        if server.DEBUG:
+            print '--->', newroot
+        return newroot, self.insertedvars
+        
+    def visit_and(self, node, newroot, variables):
+        subparts = []
+        for i in xrange(len(node.children)):
+            child = node.children[i]
+            try:
+                newchild, child_ = child.accept(self, newroot, variables)
+                if not child_ is child:
+                    node = child_.parent
+                if newchild is None:
+                    continue
+                subparts.append(newchild)
+            except UnsupportedBranch:
+                continue
+        if not subparts:
+            return None, node
+        if len(subparts) == 1:
+            return subparts[0], node
+        return copy_node(newroot, node, subparts), node
+
+    visit_or = visit_and
+
+    def _relation_supported(self, rtype):
+        for source in self.sources:
+            if not source.support_relation(rtype):
+                return False
+        return True
+        
+    def visit_relation(self, node, newroot, variables):
+        if not node.is_types_restriction():
+            if node in self.skip and self.solindices.issubset(self.skip[node]):
+                if not self.schema.rschema(node.r_type).is_final():
+                    # can't really skip the relation if one variable is selected and only
+                    # referenced by this relation
+                    for vref in node.iget_nodes(VariableRef):
+                        stinfo = vref.variable.stinfo
+                        if stinfo['selected'] and len(stinfo['relations']) == 1:
+                            break
+                    else:
+                        return None, node
+                else:
+                    return None, node
+            if not self._relation_supported(node.r_type):
+                raise UnsupportedBranch()
+        # don't copy type restriction unless this is the only relation for the
+        # rhs variable, else they'll be reinserted later as needed (else we may
+        # copy a type restriction while the variable is not actually used)
+        elif not any(self._relation_supported(rel.r_type)
+                     for rel in node.children[0].variable.stinfo['relations']):
+            rel, node = self.visit_default(node, newroot, variables)
+            return rel, node
+        else:
+            raise UnsupportedBranch()
+        rschema = self.schema.rschema(node.r_type)
+        res = self.visit_default(node, newroot, variables)[0]
+        ored = node.ored()
+        if rschema.is_final() or rschema.inlined:
+            vrefs = node.children[1].get_nodes(VariableRef)
+            if not vrefs:
+                if not ored:
+                    self.skip.setdefault(node, set()).update(self.solindices)
+                else:
+                    self.mayneedvar.setdefault((node.children[0].name, rschema), []).append( (res, ored) )
+                    
+            else:
+                assert len(vrefs) == 1
+                vref = vrefs[0]
+                # XXX check operator ?
+                self.hasvar[(node.children[0].name, rschema)] = vref
+                if self._may_skip_attr_rel(rschema, node, vref, ored, variables, res):
+                    self.skip.setdefault(node, set()).update(self.solindices)
+        elif not ored:
+            self.skip.setdefault(node, set()).update(self.solindices)
+        return res, node
+
+    def _may_skip_attr_rel(self, rschema, rel, vref, ored, variables, res):
+        var = vref.variable
+        if ored:
+            return False
+        if var.name in self.extneedsel or var.stinfo['selected']:
+            return False
+        if not same_scope(var):
+            return False
+        if any(v for v,_ in var.stinfo['attrvars'] if not v.name in variables):
+            return False
+        return True
+        
+    def visit_exists(self, node, newroot, variables):
+        newexists = node.__class__()
+        self.scopes = {node: newexists}
+        subparts, node = self._visit_children(node, newroot, variables)
+        if not subparts:
+            return None, node
+        newexists.set_where(subparts[0])
+        return newexists, node
+    
+    def visit_not(self, node, newroot, variables):
+        subparts, node = self._visit_children(node, newroot, variables)
+        if not subparts:
+            return None, node
+        return copy_node(newroot, node, subparts), node
+    
+    def visit_group(self, node, newroot, variables):
+        if not self.final:
+            return None, node
+        return self.visit_default(node, newroot, variables)
+            
+    def visit_variableref(self, node, newroot, variables):
+        if self.use_only_defined:
+            if not node.variable.name in newroot.defined_vars:
+                raise UnsupportedBranch(node.name)
+        elif not node.variable in variables:
+            raise UnsupportedBranch(node.name)
+        self.mayneedsel.add(node.name)
+        # set scope so we can insert types restriction properly
+        newvar = newroot.get_variable(node.name)
+        newvar.stinfo['scope'] = self.scopes.get(node.variable.scope, newroot)
+        return VariableRef(newvar), node
+
+    def visit_constant(self, node, newroot, variables):
+        return copy_node(newroot, node), node
+    
+    def visit_default(self, node, newroot, variables):
+        subparts, node = self._visit_children(node, newroot, variables)
+        return copy_node(newroot, node, subparts), node
+        
+    visit_comparison = visit_mathexpression = visit_constant = visit_function = visit_default
+    visit_sort = visit_sortterm = visit_default
+    
+    def _visit_children(self, node, newroot, variables):
+        subparts = []
+        for i in xrange(len(node.children)):
+            child = node.children[i]
+            newchild, child_ = child.accept(self, newroot, variables)
+            if not child is child_:
+                node = child_.parent
+            if newchild is not None:
+                subparts.append(newchild)
+        return subparts, node
+    
+    def process_selection(self, newroot, variables, rqlst):
+        if self.final:
+            for term in rqlst.selection:
+                newroot.append_selected(term.copy(newroot))
+                for vref in term.get_nodes(VariableRef):
+                    self.needsel.add(vref.name)
+            return 
+        for term in rqlst.selection:
+            vrefs = term.get_nodes(VariableRef)
+            if vrefs:
+                supportedvars = []
+                for vref in vrefs:
+                    var = vref.variable
+                    if var in variables:
+                        supportedvars.append(vref)
+                        continue
+                    else:
+                        self.needsel.add(vref.name)
+                        break
+                else:
+                    for vref in vrefs:
+                        newroot.append_selected(vref.copy(newroot))
+                    supportedvars = []
+                for vref in supportedvars:
+                    if not vref in newroot.get_selected_variables():
+                        newroot.append_selected(VariableRef(newroot.get_variable(vref.name)))
+            
+    def add_necessary_selection(self, newroot, variables):
+        selected = tuple(newroot.get_selected_variables())
+        for varname in variables:
+            var = newroot.defined_vars[varname]
+            for vref in var.references():
+                rel = vref.relation()
+                if rel is None and vref in selected:
+                    # already selected
+                    break
+            else:
+                selvref = VariableRef(var)
+                newroot.append_selected(selvref)
+                if newroot.groupby:
+                    newroot.add_group_var(VariableRef(selvref.variable, noautoref=1))