server/msplanner.py
changeset 9448 3e7cad3967c5
parent 9447 0636c4960259
child 9449 287a05ec7ab1
equal deleted inserted replaced
9447:0636c4960259 9448:3e7cad3967c5
     1 # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """plan execution of rql queries on multiple sources
       
    19 
       
     20 the best way to understand what we are trying to achieve here is to read the
       
    21 unit-tests in unittest_msplanner.py
       
    22 
       
    23 
       
    24 What you need to know
       
    25 ~~~~~~~~~~~~~~~~~~~~~
       
    26 1. The system source is expected  to support every entity and relation types
       
    27 
       
    28 2. Given "X relation Y":
       
    29 
       
    30    * if relation, X and Y types are supported by the external source, we suppose
       
    31      by default that X and Y should both come from the same source as the
       
    32      relation. You can specify otherwise by adding relation into the
       
     33      "cross_relations" set in the source's mapping file and in that case, we'll
       
    34      consider that we can also find in the system source some relation between
       
    35      X and Y coming from different sources.
       
    36 
       
    37    * if "relation" isn't supported by the external source but X or Y
       
     38      types (or both) are, we suppose by default that we can find in the system
       
    39      source some relation where X and/or Y come from the external source. You
       
    40      can specify otherwise by adding relation into the "dont_cross_relations"
       
     41      set in the source's mapping file and in that case, we'll consider that we
       
    42      can only find in the system source some relation between X and Y coming
       
     43      from the system source.
       
    44 
       
    45 
       
    46 Implementation
       
    47 ~~~~~~~~~~~~~~
       
    48 XXX explain algorithm
       
    49 
       
    50 
       
     51 Examples of multi-sources query execution
       
    52 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
       
    53 For a system source and a ldap user source (only CWUser and its attributes
       
    54 is supported, no group or such):
       
    55 
       
    56 :CWUser X:
       
    57 1. fetch CWUser X from both sources and return concatenation of results
       
    58 
       
    59 :CWUser X WHERE X in_group G, G name 'users':
       
    60 * catch 1
       
    61   1. fetch CWUser X from both sources, store concatenation of results into a
       
    62      temporary table
       
    63   2. return the result of TMP X WHERE X in_group G, G name 'users' from the
       
    64      system source
       
    65 * catch 2
       
    66   1. return the result of CWUser X WHERE X in_group G, G name 'users' from system
       
    67      source, that's enough (optimization of the sql querier will avoid join on
       
    68      CWUser, so we will directly get local eids)
       
    69 
       
    70 :CWUser X,L WHERE X in_group G, X login L, G name 'users':
       
    71 1. fetch Any X,L WHERE X is CWUser, X login L from both sources, store
       
    72    concatenation of results into a temporary table
       
     73 2. return the result of Any X, L WHERE X is TMP, X login L, X in_group G,
       
    74    G name 'users' from the system source
       
    75 
       
    76 
       
    77 :Any X WHERE X owned_by Y:
       
    78 * catch 1
       
    79   1. fetch CWUser X from both sources, store concatenation of results into a
       
    80      temporary table
       
    81   2. return the result of Any X WHERE X owned_by Y, Y is TMP from the system
       
    82      source
       
    83 * catch 2
       
    84   1. return the result of Any X WHERE X owned_by Y from system source, that's
       
    85      enough (optimization of the sql querier will avoid join on CWUser, so we
       
    86      will directly get local eids)
       
    87 """
       
    88 
       
    89 __docformat__ = "restructuredtext en"
       
    90 
       
    91 from itertools import imap, ifilterfalse
       
    92 
       
    93 from logilab.common.compat import any
       
    94 from logilab.common.decorators import cached
       
    95 from logilab.common.deprecation import deprecated
       
    96 
       
    97 from rql import BadRQLQuery
       
    98 from rql.stmts import Union, Select
       
    99 from rql.nodes import (VariableRef, Comparison, Relation, Constant, Variable,
       
   100                        Not, Exists, SortTerm, Function)
       
   101 
       
   102 from cubicweb import server
       
   103 from cubicweb.utils import make_uid
       
   104 from cubicweb.rqlrewrite import add_types_restriction, cleanup_solutions
       
   105 from cubicweb.server.ssplanner import SSPlanner, OneFetchStep
       
   106 from cubicweb.server.mssteps import *
       
   107 
       
# Helpers attached to the rql node classes for the multi-sources planner.
#
# `_ms_table_key()` returns a per-node key: the variable name, the relation
# type or the stringified constant value.
# NOTE(review): presumably used to build temporary table names, as hinted by
# the comment on Constant below -- confirm against the callers.
Variable._ms_table_key = lambda x: x.name
Relation._ms_table_key = lambda x: x.r_type
# str() Constant.value to ensure generated table name won't be unicode
Constant._ms_table_key = lambda x: str(x.value)

# `_ms_may_be_processed(terms, linkedterms)`:
# * for a variable: true when at least one (truthy) element of `terms` belongs
#   to the terms registered for this variable in `linkedterms` (no entry in
#   `linkedterms` means no linked term at all)
Variable._ms_may_be_processed = lambda x, terms, linkedterms: any(
    t for t in terms if t in linkedterms.get(x, ()))
# * for a relation: true when every variable part of the relation (the
#   `.variable` of a part when it has one, else the part itself) is in `terms`
Relation._ms_may_be_processed = lambda x, terms, linkedterms: all(
    getattr(hs, 'variable', hs) in terms for hs in x.get_variable_parts())
       
   117 
       
   118 def ms_scope(term):
       
   119     rel = None
       
   120     scope = term.scope
       
   121     if isinstance(term, Variable) and len(term.stinfo['relations']) == 1:
       
   122         rel = iter(term.stinfo['relations']).next().relation()
       
   123     elif isinstance(term, Constant):
       
   124         rel = term.relation()
       
   125     elif isinstance(term, Relation):
       
   126         rel = term
       
   127     if rel is not None and (
       
   128         rel.r_type != 'identity' and rel.scope is scope
       
   129         and isinstance(rel.parent, Exists) and rel.parent.neged(strict=True)):
       
   130         return scope.parent.scope
       
   131     return scope
       
   132 
       
   133 def need_intersect(select, getrschema):
       
   134     for rel in select.iget_nodes(Relation):
       
   135         if isinstance(rel.parent, Exists) and rel.parent.neged(strict=True) and not rel.is_types_restriction():
       
   136             rschema = getrschema(rel.r_type)
       
   137             if not rschema.final:
       
   138                 # if one of the relation's variable is ambiguous but not
       
   139                 # invariant, an intersection will be necessary
       
   140                 for vref in rel.get_nodes(VariableRef):
       
   141                     var = vref.variable
       
   142                     if (var.valuable_references() == 1
       
   143                         and len(var.stinfo['possibletypes']) > 1):
       
   144                         return True
       
   145     return False
       
   146 
       
   147 def neged_relation(rel):
       
   148     parent = rel.parent
       
   149     return isinstance(parent, Not) or (isinstance(parent, Exists) and
       
   150                                        isinstance(parent.parent, Not))
       
   151 
       
   152 def need_source_access_relation(vargraph):
       
   153     if not vargraph:
       
   154         return False
       
   155     # check vargraph contains some other relation than the identity relation
       
   156     # test of key nature since it may be a variable name (don't care about that)
       
   157     # or a 2-uple (var1, var2) associated to the relation to traverse to go from
       
   158     # var1 to var2
       
   159     return any(key for key, val in vargraph.iteritems()
       
   160                if isinstance(key, tuple) and val != 'identity')
       
   161 
       
   162 def need_aggr_step(select, sources, stepdefs=None):
       
   163     """return True if a temporary table is necessary to store some partial
       
   164     results to execute the given query
       
   165     """
       
   166     if len(sources) == 1:
       
   167         # can do everything at once with a single source
       
   168         return False
       
   169     if select.orderby or select.groupby or select.has_aggregat:
       
   170         # if more than one source, we need a temp table to deal with sort /
       
   171         # groups / aggregat if :
       
   172         # * the rqlst won't be splitted (in the other case the last query
       
   173         #   using partial temporary table can do sort/groups/aggregat without
       
   174         #   the need for a later AggrStep)
       
   175         # * the rqlst is splitted in multiple steps and there are more than one
       
   176         #   final step
       
   177         if stepdefs is None:
       
   178             return True
       
   179         has_one_final = False
       
   180         fstepsolindices = set()
       
   181         for stepdef in stepdefs:
       
   182             if stepdef[-1]:
       
   183                 if has_one_final or frozenset(stepdef[2]) != fstepsolindices:
       
   184                     return True
       
   185                 has_one_final = True
       
   186             else:
       
   187                 fstepsolindices.update(stepdef[2])
       
   188     return False
       
   189 
       
   190 def select_group_sort(select): # XXX something similar done in rql2sql
       
   191     # add variables used in groups and sort terms to the selection
       
   192     # if necessary
       
   193     if select.groupby:
       
   194         for vref in select.groupby:
       
   195             if not vref in select.selection:
       
   196                 select.append_selected(vref.copy(select))
       
   197     for sortterm in select.orderby:
       
   198         for vref in sortterm.iget_nodes(VariableRef):
       
   199             if not vref in select.get_selected_variables():
       
   200                 # we can't directly insert sortterm.term because it references
       
   201                 # a variable of the select before the copy.
       
   202                 # XXX if constant term are used to define sort, their value
       
   203                 # may necessite a decay
       
   204                 select.append_selected(vref.copy(select))
       
   205                 if select.groupby and not vref in select.groupby:
       
   206                     select.add_group_var(vref.copy(select))
       
   207 
       
   208 def allequals(solutions):
       
   209     """return true if all solutions are identical"""
       
   210     sol = solutions.next()
       
   211     noconstsol = None
       
   212     for sol_ in solutions:
       
   213         if sol_ != sol:
       
   214             return False
       
   215     return True
       
   216 
       
   217 # XXX move functions below to rql ##############################################
       
   218 
       
   219 def is_ancestor(n1, n2):
       
   220     """return True if n2 is a parent scope of n1"""
       
   221     p = n1.parent
       
   222     while p is not None:
       
   223         if p is n2:
       
   224             return True
       
   225         p = p.parent
       
   226     return False
       
   227 
       
   228 def copy_node(newroot, node, subparts=()):
       
   229     newnode = node.__class__(*node.initargs(newroot))
       
   230     for part in subparts:
       
   231         newnode.append(part)
       
   232     return newnode
       
   233 
       
   234 def used_in_outer_scope(var, scope):
       
   235     """return true if the variable is used in an outer scope of the given scope
       
   236     """
       
   237     for rel in var.stinfo['relations']:
       
   238         rscope = ms_scope(rel)
       
   239         if not rscope is scope and is_ancestor(scope, rscope):
       
   240             return True
       
   241     return False
       
   242 
       
   243 ################################################################################
       
   244 
       
   245 class PartPlanInformation(object):
       
   246     """regroups necessary information to execute some part of a "global" rql
       
   247     query ("global" means as received by the querier, which may result in
       
   248     several internal queries, e.g. parts, due to security insertions). Actually
       
   249     a PPI is created for each subquery and for each query in a union.
       
   250 
       
   251     It exposes as well some methods helping in executing this part on a
       
   252     multi-sources repository, modifying its internal structure during the
       
   253     process.
       
   254 
       
   255     :attr plan:
       
   256       the execution plan
       
   257     :attr rqlst:
       
   258       the original rql syntax tree handled by this part
       
   259 
       
   260     :attr needsplit:
       
   261       bool telling if the query has to be split into multiple steps for
       
   262       execution or if it can be executed at once
       
   263 
       
   264     :attr temptable:
       
   265       a SQL temporary table name or None, if necessary to handle aggregate /
       
   266       sorting for this part of the query
       
   267 
       
   268     :attr finaltable:
       
   269       a SQL table name or None, if results for this part of the query should be
       
   270       written into a temporary table (usually shared by multiple PPI)
       
   271 
       
   272     :attr sourcesterms:
       
   273       a dictionary {source : {term: set([solution index, ])}} telling for each
       
   274       source which terms are supported for which solutions. A "term" may be
       
   275       either a rql Variable, Constant or Relation node.
       
   276     """
       
    def __init__(self, plan, rqlst, rqlhelper=None):
        """compute sources information for the given part of the query

        :param plan: the execution plan; its `schema`, `session` (and the
          session's `repo`) are kept as shortcuts
        :param rqlst: the rql syntax tree handled by this part; its
          `solutions` are used
        :param rqlhelper: optional helper whose
          `_annotator.rewrite_shared_optional` is stored as
          `_insert_identity_variable`; when None that attribute is not set
        """
        self.plan = plan
        self.rqlst = rqlst
        self.needsplit = False
        self.temptable = None
        self.finaltable = None
        # shortcuts
        self._schema = plan.schema
        self._session = plan.session
        self._repo = self._session.repo
        self._solutions = rqlst.solutions
        # indices of every solution of the syntax tree
        self._solindices = range(len(self._solutions))
        self.system_source = self._repo.system_source
        # source : {term: [solution index, ]}
        # (both names are bound to the same dictionary until the copy below)
        self.sourcesterms = self._sourcesterms = {}
        # source : {relation: set(child variable and constant)}
        self._crossrelations = {}
        # term : set(sources)
        self._discarded_sources = {}
        # dictionary of variables and constants which are linked to each other
        # using a non final relation supported by multiple sources (crossed or
        # not).
        self._linkedterms = {}
        # processing
        termssources = self._compute_sourcesterms()
        self._remove_invalid_sources(termssources)
        self._compute_needsplit()
        # after initialisation, .sourcesterms contains the same thing as
        # ._sourcesterms though during plan construction, ._sourcesterms will
        # be modified while .sourcesterms will be kept unmodified
        self.sourcesterms = {}
        for k, v in self._sourcesterms.iteritems():
            self.sourcesterms[k] = {}
            for k2, v2 in v.iteritems():
                # copy each set of solution indices so both dictionaries are
                # independent
                self.sourcesterms[k][k2] = v2.copy()
        # cleanup linked var
        # (keep only the first item of each recorded tuple, i.e. drop the
        # relation stored along the linked term)
        for var, linkedrelsinfo in self._linkedterms.iteritems():
            self._linkedterms[var] = frozenset(x[0] for x in linkedrelsinfo)
        # map output of a step to input of a following step
        self._inputmaps = {}
        # record input map conflicts to resolve them on final step generation
        self._conflicts = []
        if rqlhelper is not None: # else test
            self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional
        # dump computed information when multi-sources debugging is enabled
        if server.DEBUG & server.DBG_MS:
            print 'sourcesterms:'
            self._debug_sourcesterms()
       
   324 
       
   325     def _debug_sourcesterms(self):
       
   326         for source in self._sourcesterms:
       
   327             print '-', source
       
   328             for term, sols in self._sourcesterms[source].items():
       
   329                 print '  -', term, id(term), ':', sols
       
   330 
       
   331     def copy_solutions(self, solindices):
       
   332         return [self._solutions[solidx].copy() for solidx in solindices]
       
   333 
       
   334     @property
       
   335     @cached
       
   336     def part_sources(self):
       
   337         if self._sourcesterms:
       
   338             return tuple(sorted(self._sourcesterms))
       
   339         return (self.system_source,)
       
   340 
       
   341     @property
       
   342     @cached
       
   343     def _sys_source_set(self):
       
   344         return frozenset((self.system_source, solindex)
       
   345                          for solindex in self._solindices)
       
   346 
       
   347     @cached
       
   348     def _norel_support_set(self, relation):
       
   349         """return a set of (source, solindex) where source doesn't support the
       
   350         relation
       
   351         """
       
   352         return frozenset((source, solidx) for source in self._repo.sources
       
   353                          for solidx in self._solindices
       
   354                          if not ((source.support_relation(relation.r_type))
       
   355                                  or relation.r_type in source.dont_cross_relations))
       
   356 
       
   357     def _compute_sourcesterms(self):
       
   358         """compute for each term (variable, rewritten constant, relation) and
       
   359         for each solution in the rqlst which sources support them
       
   360         """
       
   361         repo = self._repo
       
   362         eschema = self._schema.eschema
       
   363         sourcesterms = self._sourcesterms
       
   364         # find for each source which variable/solution are supported
       
   365         for varname, varobj in self.rqlst.defined_vars.items():
       
   366             # if variable has an eid specified, we can get its source directly
       
   367             # NOTE: use uidrel and not constnode to deal with "X eid IN(1,2,3,4)"
       
   368             if varobj.stinfo['uidrel'] is not None:
       
   369                 rel = varobj.stinfo['uidrel']
       
   370                 hasrel = len(varobj.stinfo['relations']) > 1
       
   371                 for const in rel.children[1].get_nodes(Constant):
       
   372                     eid = const.eval(self.plan.args)
       
   373                     source = self._session.source_from_eid(eid)
       
   374                     if (source is self.system_source
       
   375                         or (hasrel and varobj._q_invariant and
       
   376                             not any(source.support_relation(r.r_type)
       
   377                                     for r in varobj.stinfo['relations']
       
   378                                     if not r is rel))):
       
   379                         self._set_source_for_term(self.system_source, varobj)
       
   380                     else:
       
   381                         self._set_source_for_term(source, varobj)
       
   382                 continue
       
   383             rels = varobj.stinfo['relations']
       
   384             if not rels and varobj.stinfo['typerel'] is None:
       
   385                 # (rare) case where the variable has no type specified nor
       
   386                 # relation accessed ex. "Any MAX(X)"
       
   387                 self._set_source_for_term(self.system_source, varobj)
       
   388                 continue
       
   389             for i, sol in enumerate(self._solutions):
       
   390                 vartype = sol[varname]
       
   391                 # skip final variable
       
   392                 if eschema(vartype).final:
       
   393                     break
       
   394                 for source in repo.sources:
       
   395                     if source.support_entity(vartype):
       
   396                         # the source support the entity type, though we will
       
   397                         # actually have to fetch from it only if
       
   398                         # * the variable isn't invariant
       
   399                         # * at least one supported relation specified
       
   400                         if not varobj._q_invariant or \
       
   401                                any(imap(source.support_relation,
       
   402                                         (r.r_type for r in rels if r.r_type not in ('identity', 'eid')))):
       
   403                             sourcesterms.setdefault(source, {}).setdefault(varobj, set()).add(i)
       
   404                         # if variable is not invariant and is used by a relation
       
   405                         # not supported by this source, we'll have to split the
       
   406                         # query
       
   407                         if not varobj._q_invariant and any(ifilterfalse(
       
   408                             source.support_relation, (r.r_type for r in rels))):
       
   409                             self.needsplit = True
       
   410         # add source for rewritten constants to sourcesterms
       
   411         self._const_vars = {}
       
   412         for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
       
   413             # remember those consts come from the same variable
       
   414             for const in vconsts:
       
   415                 self._const_vars[const] = vconsts
       
   416             source = self._session.source_from_eid(const.eval(self.plan.args))
       
   417             if source is self.system_source:
       
   418                 for const in vconsts:
       
   419                     self._set_source_for_term(source, const)
       
   420             elif not self._sourcesterms:
       
   421                 for const in vconsts:
       
   422                     self._set_source_for_term(source, const)
       
   423             elif source in self._sourcesterms:
       
   424                 source_scopes = frozenset(ms_scope(t) for t in self._sourcesterms[source])
       
   425                 for const in vconsts:
       
   426                     if ms_scope(const) in source_scopes:
       
   427                         self._set_source_for_term(source, const)
       
   428                         # if system source is used, add every rewritten constant
       
   429                         # to its supported terms even when associated entity
       
   430                         # doesn't actually come from it so we get a changes that
       
   431                         # allequals will return True as expected when computing
       
   432                         # needsplit
       
   433                         # check const is used in a relation restriction
       
   434                         if const.relation() and self.system_source in sourcesterms:
       
   435                             self._set_source_for_term(self.system_source, const)
       
   436         # add source for relations
       
   437         rschema = self._schema.rschema
       
   438         termssources = {}
       
   439         sourcerels = []
       
   440         for rel in self.rqlst.iget_nodes(Relation):
       
   441             # process non final relations only
       
   442             # note: don't try to get schema for 'is' relation (not available
       
   443             # during bootstrap)
       
   444             if rel.r_type == 'cw_source':
       
   445                 sourcerels.append(rel)
       
   446             if not (rel.is_types_restriction() or rschema(rel.r_type).final):
       
   447                 # nothing to do if relation is not supported by multiple sources
       
   448                 # or if some source has it listed in its cross_relations
       
   449                 # attribute
       
   450                 #
       
   451                 # XXX code below don't deal if some source allow relation
       
   452                 #     crossing but not another one
       
   453                 relsources = [s for s in repo.rel_type_sources(rel.r_type)
       
   454                                if s is self.system_source
       
   455                                or s in self._sourcesterms]
       
   456                 if len(relsources) < 2:
       
   457                     # filter out sources being there because they have this
       
   458                     # relation in their dont_cross_relations attribute
       
   459                     relsources = [source for source in relsources
       
   460                                   if source.support_relation(rel.r_type)]
       
   461                     if relsources:
       
   462                         # this means the relation is using a variable inlined as
       
   463                         # a constant and another unsupported variable, in which
       
   464                         # case we put the relation in sourcesterms
       
   465                         self._sourcesterms.setdefault(relsources[0], {})[rel] = set(self._solindices)
       
   466                     continue
       
   467                 lhs, rhs = rel.get_variable_parts()
       
   468                 lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs)
       
   469                 # update dictionary of sources supporting lhs and rhs vars
       
   470                 if not lhsv in termssources:
       
   471                     termssources[lhsv] = self._term_sources(lhs)
       
   472                 if not rhsv in termssources:
       
   473                     termssources[rhsv] = self._term_sources(rhs)
       
   474                 self._handle_cross_relation(rel, relsources, termssources)
       
   475                 self._linkedterms.setdefault(lhsv, set()).add((rhsv, rel))
       
   476                 self._linkedterms.setdefault(rhsv, set()).add((lhsv, rel))
       
   477         # extract information from cw_source relation
       
   478         for srel in sourcerels:
       
   479             vref = srel.children[1].children[0]
       
   480             sourceeids, sourcenames = [], []
       
   481             if isinstance(vref, Constant):
       
   482                 # simplified variable
       
   483                 sourceeids = None, (vref.eval(self.plan.args),)
       
   484                 var = vref
       
   485             else:
       
   486                 var = vref.variable
       
   487                 for rel in var.stinfo['relations'] - var.stinfo['rhsrelations']:
       
   488                     # skip neged eid relation since it's the kind of query
       
   489                     # generated when clearing old value of '?1" relation,
       
   490                     # cw_source included. See
       
   491                     # unittest_ldapuser.test_copy_to_system_source
       
   492                     if rel.r_type == 'name' or \
       
   493                        (rel.r_type == 'eid' and not rel.neged(strict=True)):
       
   494                         if rel.r_type == 'eid':
       
   495                             slist = sourceeids
       
   496                         else:
       
   497                             slist = sourcenames
       
   498                         sources = [cst.eval(self.plan.args)
       
   499                                    for cst in rel.children[1].get_nodes(Constant)]
       
   500                         if sources:
       
   501                             if slist:
       
   502                                 # don't attempt to do anything
       
   503                                 sourcenames = sourceeids = None
       
   504                                 break
       
   505                             slist[:] = (rel, sources)
       
   506             if sourceeids:
       
   507                 rel, values = sourceeids
       
   508                 sourcesdict = self._repo.sources_by_eid
       
   509             elif sourcenames:
       
   510                 rel, values = sourcenames
       
   511                 sourcesdict = self._repo.sources_by_uri
       
   512             else:
       
   513                 sourcesdict = None
       
   514             if sourcesdict is not None:
       
   515                 lhs = srel.children[0]
       
   516                 try:
       
   517                     sources = [sourcesdict[key] for key in values]
       
   518                 except KeyError:
       
   519                     raise BadRQLQuery('source conflict for term %s' % lhs.as_string())
       
   520                 if isinstance(lhs, Constant):
       
   521                     source = self._session.source_from_eid(lhs.eval(self.plan.args))
       
   522                     if not source in sources:
       
   523                         raise BadRQLQuery('source conflict for term %s' % lhs.as_string())
       
   524                 else:
       
   525                     lhs = getattr(lhs, 'variable', lhs)
       
   526                 invariant = getattr(lhs, '_q_invariant', False)
       
   527                 # XXX NOT NOT
       
   528                 neged = srel.neged(traverse_scope=True) or (rel and rel.neged(strict=True))
       
   529                 has_copy_based_source = False
       
   530                 sources_ = []
       
   531                 for source in sources:
       
   532                     if source.copy_based_source:
       
   533                         has_copy_based_source = True
       
   534                         if not self.system_source in sources_:
       
   535                             sources_.append(self.system_source)
       
   536                     else:
       
   537                         sources_.append(source)
       
   538                 sources = sources_
       
   539                 if neged:
       
   540                     for source in sources:
       
   541                         if invariant and source is self.system_source:
       
   542                             continue
       
   543                         self._remove_source_term(source, lhs)
       
   544                         self._discarded_sources.setdefault(lhs, set()).add(source)
       
   545                     usesys = self.system_source not in sources
       
   546                 else:
       
   547                     for source, terms in sourcesterms.items():
       
   548                         if lhs in terms and not source in sources:
       
   549                             if invariant and source is self.system_source:
       
   550                                 continue
       
   551                             self._remove_source_term(source, lhs)
       
   552                             self._discarded_sources.setdefault(lhs, set()).add(source)
       
   553                     usesys = self.system_source in sources
       
   554                 if rel is None or (len(var.stinfo['relations']) == 2 and
       
   555                                    not var.stinfo['selected']):
       
   556                     self._remove_source_term(self.system_source, var)
       
   557                     if not (has_copy_based_source or len(sources) > 1
       
   558                             or usesys or invariant):
       
   559                         if rel is None:
       
   560                             srel.parent.remove(srel)
       
   561                         else:
       
   562                             self.rqlst.undefine_variable(var)
       
   563                         self._remove_source_term(self.system_source, srel)
       
   564         return termssources
       
   565 
       
   566     def _handle_cross_relation(self, rel, relsources, termssources):
       
   567         for source in relsources:
       
   568             if rel.r_type in source.cross_relations:
       
   569                 ssource = self.system_source
       
   570                 crossvars = set(x.variable for x in rel.get_nodes(VariableRef))
       
   571                 for const in rel.get_nodes(Constant):
       
   572                     if source.uri != 'system' and not const in self._sourcesterms.get(source, ()):
       
   573                         continue
       
   574                     crossvars.add(const)
       
   575                 self._crossrelations.setdefault(source, {})[rel] = crossvars
       
   576                 if len(crossvars) < 2:
       
   577                     # this means there is a constant in the relation which is
       
   578                     # not supported by the source, so we can stop here
       
   579                     continue
       
   580                 self._sourcesterms.setdefault(ssource, {})[rel] = set(self._solindices)
       
   581                 solindices = None
       
   582                 for term in crossvars:
       
   583                     if len(termssources[term]) == 1 and iter(termssources[term]).next()[0].uri == 'system':
       
   584                         for ov in crossvars:
       
   585                             if ov is not term and (isinstance(ov, Constant) or ov._q_invariant):
       
   586                                 ssset = frozenset((ssource,))
       
   587                                 self._remove_sources(ov, termssources[ov] - ssset)
       
   588                         break
       
   589                     if solindices is None:
       
   590                         solindices = set(sol for s, sol in termssources[term]
       
   591                                          if s is source)
       
   592                     else:
       
   593                         solindices &= set(sol for s, sol in termssources[term]
       
   594                                           if s is source)
       
   595                 else:
       
   596                     self._sourcesterms.setdefault(source, {})[rel] = solindices
       
   597 
       
   598     def _remove_invalid_sources(self, termssources):
       
   599         """removes invalid sources from `sourcesterms` member according to
       
   600         traversed relations and their properties (which sources support them,
       
   601         can they cross sources, etc...)
       
   602         """
       
   603         for term in self._linkedterms:
       
   604             self._remove_sources_until_stable(term, termssources)
       
   605         if len(self._sourcesterms) > 1 and hasattr(self.plan.rqlst, 'main_relations'):
       
   606             # the querier doesn't annotate write queries, need to do it here
       
   607             self.plan.annotate_rqlst()
       
   608             # insert/update/delete queries, we may get extra information from
       
   609             # the main relation (eg relations to the left of the WHERE
       
   610             if self.plan.rqlst.TYPE == 'insert':
       
   611                 inserted = dict((vref.variable, etype)
       
   612                                 for etype, vref in self.plan.rqlst.main_variables)
       
   613             else:
       
   614                 inserted = {}
       
   615             repo = self._repo
       
   616             rschema = self._schema.rschema
       
   617             for rel in self.plan.rqlst.main_relations:
       
   618                 if not rschema(rel.r_type).final:
       
   619                     # nothing to do if relation is not supported by multiple sources
       
   620                     if len(repo.rel_type_sources(rel.r_type)) < 2:
       
   621                         continue
       
   622                     lhs, rhs = rel.get_variable_parts()
       
   623                     try:
       
   624                         lhsv = self._extern_term(lhs, termssources, inserted)
       
   625                         rhsv = self._extern_term(rhs, termssources, inserted)
       
   626                     except KeyError:
       
   627                         continue
       
   628                     self._remove_term_sources(lhsv, rel, rhsv, termssources)
       
   629                     self._remove_term_sources(rhsv, rel, lhsv, termssources)
       
   630 
       
   631     def _extern_term(self, term, termssources, inserted):
       
   632         var = term.variable
       
   633         if var.stinfo['constnode']:
       
   634             termv = var.stinfo['constnode']
       
   635             termssources[termv] = self._term_sources(termv)
       
   636         elif var in inserted:
       
   637             termv = var
       
   638             source = self._repo.locate_etype_source(inserted[var])
       
   639             termssources[termv] = set((source, solindex)
       
   640                                       for solindex in self._solindices)
       
   641         else:
       
   642             termv = self.rqlst.defined_vars[var.name]
       
   643             if not termv in termssources:
       
   644                 termssources[termv] = self._term_sources(termv)
       
   645         return termv
       
   646 
       
   647     def _remove_sources_until_stable(self, term, termssources):
       
   648         sourcesterms = self._sourcesterms
       
   649         for oterm, rel in self._linkedterms.get(term, ()):
       
   650             tscope = ms_scope(term)
       
   651             otscope = ms_scope(oterm)
       
   652             rscope = ms_scope(rel)
       
   653             if not tscope is otscope and rscope.neged(strict=True):
       
   654                 # can't get information from relation inside a NOT exists
       
   655                 # where terms don't belong to the same scope
       
   656                 continue
       
   657             need_ancestor_scope = False
       
   658             if not (tscope is rscope and otscope is rscope):
       
   659                 if rel.ored():
       
   660                     continue
       
   661                 if rel.ored(traverse_scope=True):
       
   662                     # if relation has some OR as parent, constraints should only
       
   663                     # propagate from parent scope to child scope, nothing else
       
   664                     need_ancestor_scope = True
       
   665             relsources = self._repo.rel_type_sources(rel.r_type)
       
   666             if neged_relation(rel) and (
       
   667                 len(relsources) < 2
       
   668                 or not isinstance(oterm, Variable)
       
   669                 or oterm.valuable_references() != 1
       
   670                 or any(sourcesterms[source][term] != sourcesterms[source][oterm]
       
   671                        for source in relsources
       
   672                        if term in sourcesterms.get(source, ())
       
   673                        and oterm in sourcesterms.get(source, ()))):
       
   674                 # neged relation doesn't allow to infer term sources unless
       
   675                 # we're on a multisource relation for a term only used by this
       
   676                 # relation (eg "Any X WHERE NOT X multisource_rel Y" and over is
       
   677                 # Y)
       
   678                 continue
       
   679             # compute invalid sources for terms and remove them
       
   680             if not need_ancestor_scope or is_ancestor(tscope, otscope):
       
   681                 self._remove_term_sources(term, rel, oterm, termssources)
       
   682             if not need_ancestor_scope or is_ancestor(otscope, tscope):
       
   683                 self._remove_term_sources(oterm, rel, term, termssources)
       
   684 
       
   685     def _remove_term_sources(self, term, rel, oterm, termssources):
       
   686         """remove invalid sources for term according to oterm's sources and the
       
   687         relation between those two terms.
       
   688         """
       
   689         norelsup = self._norel_support_set(rel)
       
   690         termsources = termssources[term]
       
   691         invalid_sources = termsources - (termssources[oterm] | norelsup)
       
   692         if invalid_sources and self._repo.can_cross_relation(rel.r_type):
       
   693             invalid_sources -= self._sys_source_set
       
   694             if invalid_sources and isinstance(term, Variable) \
       
   695                    and self._need_ext_source_access(term, rel):
       
   696                 # if the term is a not invariant variable, we should filter out
       
   697                 # source where the relation is a cross relation from invalid
       
   698                 # sources
       
   699                 invalid_sources = frozenset((s, solidx) for s, solidx in invalid_sources
       
   700                                             if not (s in self._crossrelations and
       
   701                                                     rel in self._crossrelations[s]))
       
   702         if invalid_sources:
       
   703             self._remove_sources(term, invalid_sources)
       
   704             discarded = self._discarded_sources.get(term)
       
   705             if discarded is not None and not any(x[0] for x in (termsources-invalid_sources)
       
   706                                                  if not x[0] in discarded):
       
   707                 raise BadRQLQuery('relation %s cant be crossed but %s and %s should '
       
   708                               'come from difference sources' %
       
   709                               (rel.r_type, term.as_string(), oterm.as_string()))
       
   710             # if term is a rewritten const, we can apply the same changes to
       
   711             # all other consts inserted from the same original variable
       
   712             for const in self._const_vars.get(term, ()):
       
   713                 if const is not term:
       
   714                     self._remove_sources(const, invalid_sources)
       
   715             termsources -= invalid_sources
       
   716             self._remove_sources_until_stable(term, termssources)
       
   717             if isinstance(oterm, Constant):
       
   718                 self._remove_sources(oterm, invalid_sources)
       
   719 
       
    def _compute_needsplit(self):
        """tell, according to `_sourcesterms`, whether the rqlst has to be
        split for execution among multiple sources

        the execution has to be split if
        * a source supports an entity (non invariant) but doesn't support a
          relation on it
        * a source supports an entity which is accessed by an optional relation
        * there is more than one source and either all sources' supported
          variables/solutions are not equivalent or multiple variables have to
          be fetched from some source

        The result is stored in `self.needsplit`; nothing is returned. As a
        side effect `self._sourcesterms` may be replaced by a mapping holding
        only the system source, or have sources deleted from it.
        """
        # NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2
        if len(self._sourcesterms) < 2:
            self.needsplit = False
            # if this is not the system source but we have only constant terms
            # and no relation (other than eid), apply query on the system source
            #
            # testing for rqlst with nothing in vargraph nor defined_vars is the
            # simplest way to check the condition explained above
            if not self.system_source in self._sourcesterms and \
                   not self.rqlst.defined_vars and \
                   not need_source_access_relation(self.rqlst.vargraph):
                self._sourcesterms = {self.system_source: {}}
        elif not self.needsplit:
            # several sources, and nothing asked for a split so far
            if not allequals(self._sourcesterms.itervalues()):
                # split as soon as a source other than the system one holds
                # something else than a constant
                for source, terms in self._sourcesterms.iteritems():
                    if source is self.system_source:
                        continue
                    if any(x for x in terms if not isinstance(x, Constant)):
                        self.needsplit = True
                        return
                # otherwise everything is handled by the system source
                self._sourcesterms = {self.system_source: {}}
                self.needsplit = False
            else:
                # every source holds the same terms: inspect any one of them
                sample = self._sourcesterms.itervalues().next()
                if len(sample) > 1:
                    for term in sample:
                        # need split if unlinked variable
                        if isinstance(term, Variable) and not term in self._linkedterms:
                            self.needsplit = True
                            break
                    else:
                        # need split if there are some cross relation on non
                        # invariant variable or if the variable is used in
                        # multi-sources relation
                        if self._crossrelations:
                            for reldict in self._crossrelations.itervalues():
                                for rel, terms in reldict.iteritems():
                                    for term in terms:
                                        if isinstance(term, Variable) \
                                               and self._need_ext_source_access(term, rel):
                                            self.needsplit = True
                                            return
        else:
            # a split was already requested: remove sources (other than the
            # system one) only accessing constant nodes
            # (`.items()` is iterated while entries are deleted below)
            for source, terms in self._sourcesterms.items():
                if source is self.system_source:
                    continue
                if not any(x for x in terms if not isinstance(x, Constant)):
                    del self._sourcesterms[source]
            # no need to split anymore if less than two sources are left
            if len(self._sourcesterms) < 2:
                self.needsplit = False
       
   783 
       
   784     @cached
       
   785     def _need_ext_source_access(self, var, rel):
       
   786         if not var._q_invariant:
       
   787             return True
       
   788         if  any(r for x, r in self._linkedterms[var]
       
   789                 if not r is rel and self._repo.is_multi_sources_relation(r.r_type)):
       
   790             return True
       
   791         return False
       
   792 
       
   793     def _set_source_for_term(self, source, term):
       
   794         self._sourcesterms.setdefault(source, {})[term] = set(self._solindices)
       
   795 
       
   796     def _term_sources(self, term):
       
   797         """returns possible sources for terms `term`"""
       
   798         if isinstance(term, Constant):
       
   799             source = self._session.source_from_eid(term.eval(self.plan.args))
       
   800             return set((source, solindex) for solindex in self._solindices)
       
   801         else:
       
   802             var = getattr(term, 'variable', term)
       
   803             sources = [source for source, varobjs in self.sourcesterms.iteritems()
       
   804                        if var in varobjs]
       
   805             return set((source, solindex) for source in sources
       
   806                        for solindex in self.sourcesterms[source][var])
       
   807 
       
   808     def _remove_sources(self, term, sources):
       
   809         """removes invalid sources (`sources`) from `sourcesterms`
       
   810 
       
   811         :param sources: the list of sources to remove
       
   812         :param term: the analyzed term
       
   813         """
       
   814         sourcesterms = self._sourcesterms
       
   815         for source, solindex in sources:
       
   816             try:
       
   817                 sourcesterms[source][term].remove(solindex)
       
   818             except KeyError:
       
   819                 import rql.base as rqlb
       
   820                 assert isinstance(term, (rqlb.BaseNode, Variable)), repr(term)
       
   821                 continue # may occur with subquery column alias
       
   822             if not sourcesterms[source][term]:
       
   823                 self._remove_source_term(source, term)
       
   824 
       
   825     def _remove_source_term(self, source, term):
       
   826         try:
       
   827             poped = self._sourcesterms[source].pop(term, None)
       
   828         except KeyError:
       
   829             pass
       
   830         else:
       
   831             if not self._sourcesterms[source]:
       
   832                 del self._sourcesterms[source]
       
   833 
       
   834     def crossed_relation(self, source, relation):
       
   835         return relation in self._crossrelations.get(source, ())
       
   836 
       
    def part_steps(self):
        """precompute necessary part steps before generating actual rql for
        each step. This is necessary to know if an aggregate step will be
        necessary or not.

        Return a list of tuples `(sources, terms, solindices, scope, needsel,
        final)`. When the last computed step is not final, a final step on the
        system source selecting all defined variables and aliases is appended.

        `_sourcesterms` is consumed along the way.
        """
        steps = []
        select = self.rqlst
        rschema = self._schema.rschema
        for source in self.part_sources:
            try:
                sourceterms = self._sourcesterms[source]
            except KeyError:
                continue # already processed
            while sourceterms:
                # take a term randomly, and all terms supporting the
                # same solutions
                term, solindices = self._choose_term(source, sourceterms)
                if source.uri == 'system':
                    # ensure all variables are available for the latest step
                    # (missing one will be available from temporary tables
                    # of previous steps)
                    scope = select
                    terms = scope.defined_vars.values() + scope.aliases.values()
                    sourceterms.clear()
                    sources = [source]
                else:
                    scope = ms_scope(term)
                    # find which sources support the same term and solutions
                    sources = self._expand_sources(source, term, solindices)
                    # now try to get as many terms as possible
                    terms = self._expand_terms(term, sources, sourceterms,
                                               scope, solindices)
                    if len(terms) == 1 and isinstance(terms[0], Constant):
                        # we can't generate anything interesting with a single
                        # constant term (will generate an empty "Any" query),
                        # go to the next iteration directly!
                        continue
                    if not sourceterms:
                        try:
                            del self._sourcesterms[source]
                        except KeyError:
                            # XXX already cleaned
                            pass
                # set of terms which should be additionally selected when
                # possible
                needsel = set()
                if not self._sourcesterms and scope is select:
                    # nothing left to process and we're at the select's scope:
                    # this is the final step, select everything
                    terms += scope.defined_vars.values() + scope.aliases.values()
                    if isinstance(term, Relation) and len(sources) > 1:
                        variants = set()
                        # NOTE(review): `partterms` is not read afterwards in
                        # this method
                        partterms = [term]
                        for vref in term.get_nodes(VariableRef):
                            if not vref.variable._q_invariant:
                                variants.add(vref.name)
                        if len(variants) == 2:
                            # we need an extra-step to fetch relations from each source
                            # before a join with prefetched inputs
                            # (see test_crossed_relation_noeid_needattr in
                            #  unittest_msplanner / unittest_multisources)
                            lhs, rhs = term.get_variable_parts()
                            steps.append( (sources, [term, getattr(lhs, 'variable', lhs),
                                                     getattr(rhs, 'variable', rhs)],
                                           solindices, scope, variants, False) )
                            sources = [self.system_source]
                    final = True
                else:
                    # suppose this is a final step until the contrary is proven
                    final = scope is select
                    # add attribute variables and mark variables which should be
                    # additionally selected when possible
                    for var in select.defined_vars.itervalues():
                        if not var in terms:
                            stinfo = var.stinfo
                            for ovar, rtype in stinfo.get('attrvars', ()):
                                if ovar in terms:
                                    needsel.add(var.name)
                                    terms.append(var)
                                    break
                            else:
                                # not an attribute of any handled term
                                needsel.add(var.name)
                                final = False
                    # check all relations are supported by the sources
                    for rel in scope.iget_nodes(Relation):
                        if rel.is_types_restriction():
                            continue
                        # take care not overwriting the existing "source" identifier
                        for _source in sources:
                            if not _source.support_relation(rel.r_type) or (
                                self.crossed_relation(_source, rel) and not rel in terms):
                                for vref in rel.iget_nodes(VariableRef):
                                    needsel.add(vref.name)
                                final = False
                                break
                        else:
                            if not scope is select:
                                self._exists_relation(rel, terms, needsel, source)
                            # if relation is supported by all sources and some of
                            # its lhs/rhs variable isn't in "terms", and the
                            # other end *is* in "terms", mark it have to be
                            # selected
                            if source.uri != 'system' and not rschema(rel.r_type).final:
                                lhs, rhs = rel.get_variable_parts()
                                try:
                                    lhsvar = lhs.variable
                                except AttributeError:
                                    lhsvar = lhs
                                try:
                                    rhsvar = rhs.variable
                                except AttributeError:
                                    rhsvar = rhs
                                try:
                                    if lhsvar in terms and not rhsvar in terms:
                                        needsel.add(lhsvar.name)
                                    elif rhsvar in terms and not lhsvar in terms:
                                        needsel.add(rhsvar.name)
                                except AttributeError:
                                    continue # not an attribute, no selection needed
                if final and source.uri != 'system':
                    # check rewritten constants (only the first entry of
                    # `rewritten` is examined, because of the trailing break)
                    for vconsts in select.stinfo['rewritten'].itervalues():
                        const = vconsts[0]
                        eid = const.eval(self.plan.args)
                        _source = self._session.source_from_eid(eid)
                        if len(sources) > 1 or not _source in sources:
                            # if there is some rewritten constant used by a not
                            # neged relation while there are some source not
                            # supporting the associated entity, this step can't
                            # be final (unless the relation is explicitly in
                            # `terms`, eg cross relations)
                            for c in vconsts:
                                rel = c.relation()
                                if rel is None or not (rel in terms or neged_relation(rel)):
                                    final = False
                                    break
                            break
                if final:
                    self._cleanup_sourcesterms(sources, solindices)
                steps.append((sources, terms, solindices, scope, needsel, final)
                             )
        # NOTE(review): `steps[-1]` raises IndexError if no step was produced
        # above -- presumably callers guarantee at least one, to be confirmed
        if not steps[-1][-1]:
            # add a final step
            terms = select.defined_vars.values() + select.aliases.values()
            steps.append( ([self.system_source], terms, set(self._solindices),
                           select, set(), True) )
        return steps
       
   982 
       
   def _exists_relation(self, rel, terms, needsel, source):
       """Substitute, through an 'identity' relation, the ends of `rel` that
       can't be used as is in the step being built.

       `rel` is a supported relation with at least one end supported. When
       the other end is not among `terms`, this usually means the variable is
       referenced by an outer scope: it is then replaced using an 'identity'
       relation, else we would get a conflict of temporary tables.

       `terms` and `needsel` are updated in place by `_identity_substitute`.
       Nothing is done when one side of the relation isn't a variable
       reference.
       """
       # same lookup as before; the returned relation schema isn't used below
       self._schema.rschema(rel.r_type)
       lhs, rhs = rel.get_variable_parts()
       try:
           lhsvar = lhs.variable
           rhsvar = rhs.variable
       except AttributeError:
           # at least one side has no `variable` attribute: nothing to do
           return
       relscope = ms_scope(rel)
       lhsscope = ms_scope(lhsvar)
       rhsscope = ms_scope(rhsvar)
       # only the right hand side is in: substitute the left hand side
       if rhsvar in terms and lhsvar not in terms and lhsscope is lhsvar.stmt:
           self._identity_substitute(rel, lhsvar, terms, needsel, relscope)
           return
       # only the left hand side is in: substitute the right hand side
       if lhsvar in terms and rhsvar not in terms and rhsscope is rhsvar.stmt:
           self._identity_substitute(rel, rhsvar, terms, needsel, relscope)
           return
       if not self.crossed_relation(source, rel):
           return
       # crossed relation: substitute each end living in another scope than
       # the relation itself (left hand side first)
       for var, varscope in ((lhsvar, lhsscope), (rhsvar, rhsscope)):
           if varscope is not relscope:
               self._identity_substitute(rel, var, terms, needsel,
                                         relscope, varscope)
       
  1010 
       
  1011     def _identity_substitute(self, relation, var, terms, needsel, exist,
       
  1012                              idrelscope=None):
       
  1013         newvar = self._insert_identity_variable(exist, var, idrelscope)
       
  1014         # ensure relation is using '=' operator, else we rely on a
       
  1015         # sqlgenerator side effect (it won't insert an inequality operator
       
  1016         # in this case)
       
  1017         relation.children[1].operator = '='
       
  1018         terms.append(newvar)
       
  1019         needsel.add(newvar.name)
       
  1020 
       
  def _choose_term(self, source, sourceterms):
      """Select the term of `sourceterms` used as a base to generate the
      next execution step.

      `sourceterms` maps the terms supported by `source` to their solution
      indices. The selected term is popped from it and returned together
      with the popped value, as a 2-uple.
      """
      rootscope = self.rqlst
      multisources = len(self._sourcesterms) > 1
      if multisources:
          # crossed relations which are not in the root scope come first:
          # return one of their variables if it is used by more than one
          # relation or selected (and supported by the source), else the
          # crossed relation itself
          for term in sourceterms:
              if not isinstance(term, Relation):
                  continue
              if not self.crossed_relation(source, term):
                  continue
              if ms_scope(term) is rootscope:
                  continue
              for vref in term.get_variable_parts():
                  try:
                      var = vref.variable
                  except AttributeError:
                      # Constant
                      continue
                  stinfo = var.stinfo
                  if ((len(stinfo['relations']) > 1 or stinfo['selected'])
                          and var in sourceterms):
                      return var, sourceterms.pop(var)
              return term, sourceterms.pop(term)
      # several sources left: priority to variables from subscopes;
      # otherwise: priority to variables from the outer scope. A non variable
      # term found in the wanted scope is kept as second choice.
      fallback = None
      for term in sourceterms:
          if (ms_scope(term) is rootscope) == multisources:
              # term isn't in the wanted kind of scope
              continue
          if isinstance(term, Variable):
              return term, sourceterms.pop(term)
          fallback = term
      if fallback is not None:
          return fallback, sourceterms.pop(fallback)
      # priority to the variable with the less solutions supported and with
      # the most valuable refs. Variable name is part of the key for test
      # predictability
      variables = [(var, sols) for var, sols in sourceterms.items()
                   if isinstance(var, Variable)]
      if variables:
          best = min(variables,
                     key=lambda varsols: (len(varsols[1]),
                                          -varsols[0].valuable_references(),
                                          varsols[0].name))[0]
          return best, sourceterms.pop(best)
      # then priority to constants
      for term in sourceterms:
          if isinstance(term, Constant):
              return term, sourceterms.pop(term)
      # whatever is left (relation)
      term = next(iter(sourceterms))
      return term, sourceterms.pop(term)
       
  1073 
       
  1074     def _expand_sources(self, selected_source, term, solindices):
       
  1075         """return all sources supporting given term / solindices"""
       
  1076         sources = [selected_source]
       
  1077         sourcesterms = self._sourcesterms
       
  1078         for source in list(sourcesterms):
       
  1079             if source is selected_source:
       
  1080                 continue
       
  1081             if not (term in sourcesterms[source] and
       
  1082                     solindices.issubset(sourcesterms[source][term])):
       
  1083                 continue
       
  1084             sources.append(source)
       
  1085             if source.uri != 'system' or not (isinstance(term, Variable) and not term in self._linkedterms):
       
  1086                 termsolindices = sourcesterms[source][term]
       
  1087                 termsolindices -= solindices
       
  1088                 if not termsolindices:
       
  1089                     del sourcesterms[source][term]
       
  1090                     if not sourcesterms[source]:
       
  1091                         del sourcesterms[source]
       
  1092         return sources
       
  1093 
       
  def _expand_terms(self, term, sources, sourceterms, scope, solindices):
      """Return the list of terms to process together with `term`.

      The returned list starts with `term`; other terms are picked among the
      keys of `sourceterms` (terms mapped to solution indices).

      Side effects: accepted constants are deleted from `sourceterms`, and
      `_cleanup_sourcesterms` is called for every other accepted term.
      """
      terms = [term]
      # sorted so it can be compared to `sorted(termsources)` below
      sources = sorted(sources)
      sourcesterms = self._sourcesterms
      linkedterms = self._linkedterms
      # term has to belong to the same scope if there is more
      # than the system source remaining
      if len(sourcesterms) > 1 and not scope is self.rqlst:
          candidates = (t for t in sourceterms if scope is ms_scope(t))
      else:
          candidates = sourceterms
      # we only want one unlinked term in each generated query: variables
      # are kept only if they support every solution and are linked
      candidates = [t for t in candidates
                    if isinstance(t, (Constant, Relation)) or
                    (solindices.issubset(sourceterms[t]) and t in linkedterms)]
      # merge the crossed relations recorded for each of the sources
      cross_rels = {}
      for source in sources:
          cross_rels.update(self._crossrelations.get(source, {}))
      # map each of the first two variables of a crossed relation to the
      # other one (nothing is recorded when there are less than two)
      exclude = {}
      for crossvars in cross_rels.itervalues():
          vars = [t for t in crossvars if isinstance(t, Variable)]
          try:
              exclude[vars[0]] = vars[1]
              exclude[vars[1]] = vars[0]
          except IndexError:
              pass
      # a term is accepted when it is known by each of the sources, when it
      # says it may be processed with already selected terms and when the
      # variable it is excluded with (see above) hasn't been selected.
      # NOTE: the closure reads `terms`, which grows in the loop below
      accept_term = lambda x: (not any(s for s in sources
                                       if not x in sourcesterms.get(s, ()))
                               and x._ms_may_be_processed(terms, linkedterms)
                               and not exclude.get(x) in terms)
      if isinstance(term, Relation) and term in cross_rels:
          # base term is a crossed relation: terms recorded for it are always
          # accepted, and the ones which are not candidates are added now
          cross_terms = cross_rels.pop(term)
          base_accept_term = accept_term
          accept_term = lambda x: (base_accept_term(x) or x in cross_terms)
          for refed in cross_terms:
              if not refed in candidates:
                  terms.append(refed)
      # repeat until no more term can be added, since adding a term may
      # allow another one to be added
      modified = True
      while modified and candidates:
          modified = False
          # iterate on a copy: accepted terms are removed from `candidates`
          for term in candidates[:]:
              if isinstance(term, Constant):
                  termsources = set(x[0] for x in self._term_sources(term))
                  # ensure system source is there for constant
                  if self.system_source in sources:
                      termsources.add(self.system_source)
                  # constant is only added if it comes from exactly the
                  # same sources as the step
                  if sorted(termsources) != sources:
                      continue
                  terms.append(term)
                  candidates.remove(term)
                  modified = True
                  del sourceterms[term]
              elif accept_term(term):
                  terms.append(term)
                  candidates.remove(term)
                  modified = True
                  self._cleanup_sourcesterms(sources, solindices, term)
      return terms
       
  1154 
       
  1155     def _cleanup_sourcesterms(self, sources, solindices, term=None):
       
  1156         """remove solutions so we know they are already processed"""
       
  1157         for source in sources:
       
  1158             try:
       
  1159                 sourceterms = self._sourcesterms[source]
       
  1160             except KeyError:
       
  1161                 continue
       
  1162             if term is None:
       
  1163                 for term, termsolindices in sourceterms.items():
       
  1164                     if isinstance(term, Relation) and self.crossed_relation(source, term):
       
  1165                         continue
       
  1166                     termsolindices -= solindices
       
  1167                     if not termsolindices:
       
  1168                         del sourceterms[term]
       
  1169             else:
       
  1170                 try:
       
  1171                     sourceterms[term] -= solindices
       
  1172                     if not sourceterms[term]:
       
  1173                         del sourceterms[term]
       
  1174                 except KeyError:
       
  1175                     pass
       
  1176                     #assert term in cross_terms
       
  1177             if not sourceterms:
       
  1178                 del self._sourcesterms[source]
       
  1179 
       
  def merge_input_maps(self, allsolindices, complete=True):
      """Compute the input map needed by each solution and return solutions
      grouped by identical input map.

      `self._inputmaps` is a dictionary with tuple of solution indices as
      key and the associated input map as value. When `complete` is false,
      solutions which are not in `allsolindices` are ignored. The result is
      a list of 2-uples (solution indices, input map); solutions of
      `allsolindices` without any input map come last, with None as map.
      When there is no input map at all, `[(allsolindices, None)]` is
      returned.

      ex:
      inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'},
                   (1,): {'X': 't2.C0', 'T': 't2.C1'}}
      return : [([1],  {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1',
                         'X': 't2.C0', 'T': 't2.C1'}),
                ([0,2], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'})]
      """
      if not self._inputmaps:
          return [(allsolindices, None)]
      unmapped = allsolindices.copy()
      mapbysol = {}
      # build one merged map per solution index
      for mapsolindices, basemap in self._inputmaps.items():
          for solindex in mapsolindices:
              if complete or solindex in allsolindices:
                  mapbysol.setdefault(solindex, {}).update(basemap)
                  # no-op if already removed
                  unmapped.discard(solindex)
      # put together solutions sharing the very same map
      grouped = []
      for solindex, solmap in mapbysol.items():
          group = next((indices for indices, commonmap in grouped
                        if commonmap == solmap), None)
          if group is None:
              grouped.append(([solindex], solmap))
          else:
              group.append(solindex)
      if unmapped:
          grouped.append((list(unmapped), None))
      return grouped
       
  1219 
       
  def build_final_part(self, select, solindices, inputmap,  sources,
                       insertedvars):
      """Return the step fetching results of `select` for solutions
      `solindices` from `sources`.

      When conflicts have been recorded and an input map is given, an
      'identity' relation to a new variable is added to `select` for each
      conflicting variable; the solutions and `inputmap` are completed in
      place accordingly.

      A OneFetchStep is returned when neither `self.temptable` nor
      `self.finaltable` is set, else a FetchStep on that table.
      """
      solutions = [self._solutions[index] for index in solindices]
      if self._conflicts and inputmap:
          for varname, mappedto in self._conflicts:
              var = select.defined_vars[varname]
              newvar = select.make_variable()
              # XXX should use var.scope but scope hasn't been computed yet
              select.add_relation(var, 'identity', newvar)
              newname = newvar.name
              for solution in solutions:
                  solution[newname] = solution[varname]
              inputmap[newname] = mappedto
      rqlst = self.plan.finalize(select, solutions, insertedvars)
      if not (self.temptable is None and self.finaltable is None):
          table = self.temptable or self.finaltable
          return FetchStep(self.plan, rqlst, sources, table, True, inputmap)
      return OneFetchStep(self.plan, rqlst, sources, inputmap=inputmap)
       
  1237 
       
  def build_non_final_part(self, select, solindices, sources, insertedvars,
                           table):
      """Handle a non final step, whose results have to be stored in the
      temporary table `table`.

      A FetchStep is created and added to the plan, unless a similar step
      is already recorded in `self.plan.ms_steps_idx`. The step's output
      map is then merged into the input map kept for `solindices`, and
      conflicting mappings are appended to `self._conflicts`.
      """
      inputmapkey = tuple(sorted(solindices))
      solutions = [self._solutions[index] for index in solindices]
      plan = self.plan
      # XXX be smarter vs rql comparison
      idx_key = (select.as_string(), inputmapkey,
                 tuple(sorted(sources)), tuple(sorted(insertedvars)))
      try:
          # a similar step has already been processed: only its output map
          # has to be backported
          step = plan.ms_steps_idx[idx_key]
      except KeyError:
          # processing needed
          rqlst = plan.finalize(select, solutions, insertedvars)
          step = FetchStep(plan, rqlst, sources, table, False)
          plan.ms_steps_idx[idx_key] = step
          plan.add_step(step)
      # update input map for following steps, according to processed solutions
      inputmap = self._inputmaps.setdefault(inputmapkey, {})
      for varname, mapping in step.outputmap.items():
          if varname not in inputmap or '.' in varname:
              continue
          previous = inputmap[varname]
          if mapping == previous:
              continue
          if self._schema.eschema(solutions[0][varname]).final:
              continue
          # variable already mapped to something else: remember the former
          # mapping
          self._conflicts.append((varname, previous))
      inputmap.update(step.outputmap)
       
  1264 
       
  1265 
       
  1266 @deprecated('[3.18] old multi-source system will go away in the next version')
       
  1267 class MSPlanner(SSPlanner):
       
  1268     """MultiSourcesPlanner: build execution plan for rql queries
       
  1269 
       
  1270     decompose the RQL query according to sources'schema
       
  1271     """
       
  1272 
       
  1273     def build_select_plan(self, plan, rqlst):
       
  1274         """build execution plan for a SELECT RQL query
       
  1275 
       
  1276         the rqlst should not be tagged at this point
       
  1277         """
       
  1278         # preprocess deals with security insertion and returns a new syntax tree
       
  1279         # which have to be executed to fulfill the query: according
       
  1280         # to permissions for variable's type, different rql queries may have to
       
  1281         # be executed
       
  1282         plan.preprocess(rqlst)
       
  1283         if server.DEBUG & server.DBG_MS:
       
  1284             print '-'*80
       
  1285             print 'PLANNING', rqlst
       
  1286         ppis = [PartPlanInformation(plan, select, self.rqlhelper)
       
  1287                 for select in rqlst.children]
       
  1288         plan.ms_steps_idx = {}
       
  1289         steps = self._union_plan(plan, ppis)
       
  1290         if server.DEBUG & server.DBG_MS:
       
  1291             from pprint import pprint
       
  1292             for step in plan.steps:
       
  1293                 pprint(step.test_repr())
       
  1294             pprint(steps[0].test_repr())
       
  1295         return steps
       
  1296 
       
  1297     def _ppi_subqueries(self, ppi):
       
  1298         # part plan info for subqueries
       
  1299         plan = ppi.plan
       
  1300         inputmap = {}
       
  1301         for subquery in ppi.rqlst.with_[:]:
       
  1302             sppis = [PartPlanInformation(plan, select)
       
  1303                      for select in subquery.query.children]
       
  1304             for sppi in sppis:
       
  1305                 if sppi.needsplit or sppi.part_sources != ppi.part_sources:
       
  1306                     temptable = plan.make_temp_table_name('T%s' % make_uid(id(subquery)))
       
  1307                     sstep = self._union_plan(plan, sppis, temptable)[0]
       
  1308                     break
       
  1309             else:
       
  1310                 sstep = None
       
  1311             if sstep is not None:
       
  1312                 ppi.rqlst.with_.remove(subquery)
       
  1313                 for i, colalias in enumerate(subquery.aliases):
       
  1314                     inputmap[colalias.name] = '%s.C%s' % (temptable, i)
       
  1315                 ppi.plan.add_step(sstep)
       
  1316         return inputmap
       
  1317 
       
  def _union_plan(self, plan, ppis, temptable=None):
      """Return the steps needed to execute the union of the query parts
      described by `ppis` (part plan information objects).

      When `temptable` is given, FetchStep / UnionFetchStep are built with
      it instead of OneFetchStep / UnionStep.

      Return a one element tuple holding a union step when several steps
      have been generated, else the list of generated steps.
      """
      # NOTE(review): `allsources` is filled below but not read afterwards in
      # this method
      tosplit, cango, allsources = [], {}, set()
      for planinfo in ppis:
          if planinfo.needsplit:
              tosplit.append(planinfo)
          else:
              # group parts which can be executed as is by their sources
              cango.setdefault(planinfo.part_sources, []).append(planinfo)
          for source in planinfo.part_sources:
              allsources.add(source)
      # first add steps for query parts which don't need to be split
      steps = []
      for sources, cppis in cango.iteritems():
          # selects which don't need an aggregation step, grouped by the
          # items of their input map so they can share a single step
          byinputmap = {}
          for ppi in cppis:
              select = ppi.rqlst
              if sources != (ppi.system_source,):
                  add_types_restriction(self.schema, select)
              # part plan info for subqueries
              inputmap = self._ppi_subqueries(ppi)
              aggrstep = need_aggr_step(select, sources)
              if aggrstep:
                  # results are fetched into `atemptable` by a child step
                  # of the aggregation step
                  atemptable = plan.make_temp_table_name('T%s' % make_uid(id(select)))
                  sunion = Union()
                  sunion.append(select)
                  # copy the selection before select_group_sort is applied
                  # to the select
                  selected = select.selection[:]
                  select_group_sort(select)
                  step = AggrStep(plan, selected, select, atemptable, temptable)
                  # limit / offset are given to the aggregation step and
                  # reset on the select
                  step.set_limit_offset(select.limit, select.offset)
                  select.limit = None
                  select.offset = 0
                  fstep = FetchStep(plan, sunion, sources, atemptable, True, inputmap)
                  step.children.append(fstep)
                  steps.append(step)
              else:
                  # NOTE(review): the key depends on the iteration order of
                  # `inputmap` items -- presumably equal maps always yield
                  # the same order, to be confirmed
                  byinputmap.setdefault(tuple(inputmap.iteritems()), []).append( (select) )
          # one step per distinct input map, gathering its selects in a union
          for inputmap, queries in byinputmap.iteritems():
              inputmap = dict(inputmap)
              sunion = Union()
              for select in queries:
                  sunion.append(select)
              if temptable:
                  steps.append(FetchStep(plan, sunion, sources, temptable, True, inputmap))
              else:
                  steps.append(OneFetchStep(plan, sunion, sources, inputmap))
      # then add steps for split query parts
      for planinfo in tosplit:
          steps.append(self.split_part(planinfo, temptable))
      if len(steps) > 1:
          # several steps: put them under a single union step
          if temptable:
              step = UnionFetchStep(plan)
          else:
              step = UnionStep(plan)
          step.children = steps
          return (step,)
      return steps
       
  1373 
       
  1374     # internal methods for multisources decomposition #########################
       
  1375 
       
  def split_part(self, ppi, temptable):
      """Return the step executing the query part described by `ppi`, which
      has to be split into several sub-steps.

      Step definitions are given by `ppi.part_steps()`. Non final ones are
      handled by `ppi.build_non_final_part`; final ones become children of
      the returned step (or the returned step itself when there is only
      one).

      `temptable` is recorded as `ppi.finaltable` and `ppi.temptable` is set
      to the aggregation table name, or to None when no aggregation step is
      needed.

      Raise NotImplementedError when an intersection of steps is needed
      while `temptable` is given.
      """
      ppi.finaltable = temptable
      plan = ppi.plan
      select = ppi.rqlst
      subinputmap = self._ppi_subqueries(ppi)
      stepdefs = ppi.part_steps()
      if need_aggr_step(select, ppi.part_sources, stepdefs):
          atemptable = plan.make_temp_table_name('T%s' % make_uid(id(select)))
          # copy the selection before select_group_sort is applied
          selection = select.selection[:]
          select_group_sort(select)
      else:
          atemptable = None
          selection = select.selection
      ppi.temptable = atemptable
      vfilter = TermsFiltererVisitor(self.schema, ppi)
      steps = []
      # true when at least two step definitions are flagged as final
      multifinal = len([x for x in stepdefs if x[-1]]) >= 2
      for sources, terms, solindices, scope, needsel, final in stepdefs:
          # extract an executable query using only the specified terms
          if sources[0].uri == 'system':
              # in this case we have to merge input maps before call to
              # filter so already processed restriction are correctly
              # removed
              solsinputmaps = ppi.merge_input_maps(
                  solindices, complete=not (final and multifinal))
              # NOTE: `solindices` is rebound by this loop
              for solindices, inputmap in solsinputmaps:
                  minrqlst, insertedvars = vfilter.filter(
                      sources, terms, scope, set(solindices), needsel, final)
                  # complete the input map with the subqueries' one
                  if inputmap is None:
                      inputmap = subinputmap
                  else:
                      inputmap.update(subinputmap)
                  steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
                                                    sources, insertedvars))
          else:
              # this is a final part (i.e. retrieving results for the
              # original query part) if all term / sources have been
              # treated or if this is the last shot for used solutions
              minrqlst, insertedvars = vfilter.filter(
                  sources, terms, scope, solindices, needsel, final)
              if final:
                  solsinputmaps = ppi.merge_input_maps(
                      solindices, complete=not (final and multifinal))
                  if len(solsinputmaps) > 1:
                      # keep a reference tree, copied for each input map
                      refrqlst = minrqlst
                  # NOTE: `solindices` is rebound by this loop
                  for solindices, inputmap in solsinputmaps:
                      if inputmap is None:
                          inputmap = subinputmap
                      else:
                          inputmap.update(subinputmap)
                      if len(solsinputmaps) > 1:
                          minrqlst = refrqlst.copy()
                          sources = sources[:]
                      if inputmap and len(sources) > 1:
                          # one step without input map for the other
                          # sources, and one step with the input map for the
                          # system source.
                          # NOTE(review): when there is a single input map,
                          # `sources` hasn't been copied above, so this
                          # removal modifies the list coming from `stepdefs`
                          # -- confirm this is intended
                          sources.remove(ppi.system_source)
                          steps.append(ppi.build_final_part(minrqlst, solindices, None,
                                                            sources, insertedvars))
                          steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
                                                            [ppi.system_source], insertedvars))
                      else:
                          steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
                                                            sources, insertedvars))
              else:
                  # non final step: results are stored in a temporary table
                  table = plan.make_temp_table_name('T%s' % make_uid(id(select)))
                  ppi.build_non_final_part(minrqlst, solindices, sources,
                                           insertedvars, table)
      # finally: join parts, deal with aggregate/group/sorts if necessary
      if atemptable is not None:
          step = AggrStep(plan, selection, select, atemptable, temptable)
          step.children = steps
      elif len(steps) > 1:
          getrschema = self.schema.rschema
          # intersection is needed if the original select or any select of
          # the generated steps requires it (`step` and `select` used in the
          # generator expression are local to it)
          if need_intersect(select, getrschema) or any(need_intersect(select, getrschema)
                                                       for step in steps
                                                       for select in step.union.children):
              if temptable:
                  raise NotImplementedError('oops') # IntersectFetchStep(plan)
              else:
                  step = IntersectStep(plan)
          else:
              if temptable:
                  step = UnionFetchStep(plan)
              else:
                  step = UnionStep(plan)
          step.children = steps
      else:
          # NOTE(review): assumes at least one final step has been
          # generated, else this raises IndexError
          step = steps[0]
      if select.limit is not None or select.offset:
          step.set_limit_offset(select.limit, select.offset)
      return step
       
  1466 
       
  1467 
       
  class UnsupportedBranch(Exception):
      """Exception caught by `TermsFiltererVisitor._rqlst_accept` while a
      node is visited; presumably raised when the visited branch can't be
      handled by the current step.
      """
       
  1470 
       
  1471 
       
  1472 class TermsFiltererVisitor(object):
       
  1473     def __init__(self, schema, ppi):
       
  1474         self.schema = schema
       
  1475         self.ppi = ppi
       
  1476         self.skip = {}
       
  1477         self.hasaggrstep = self.ppi.temptable
       
  1478         self.extneedsel = frozenset(vref.name for sortterm in ppi.rqlst.orderby
       
  1479                                     for vref in sortterm.iget_nodes(VariableRef))
       
  1480 
       
  1481     def _rqlst_accept(self, rqlst, node, newroot, terms, setfunc=None):
       
  1482         try:
       
  1483             newrestr, node_ = node.accept(self, newroot, terms[:])
       
  1484         except UnsupportedBranch:
       
  1485             return rqlst
       
  1486         if setfunc is not None and newrestr is not None:
       
  1487             setfunc(newrestr)
       
  1488         if not node_ is node:
       
  1489             rqlst = node.parent
       
  1490         return rqlst
       
  1491 
       
  def filter(self, sources, terms, rqlst, solindices, needsel, final):
      """Build a new Select holding the parts of `rqlst` usable for `sources`.

      Per-call visitor state is (re)initialised on `self`, then the where
      clause and -- when the tree is a Select -- the groupby, having, orderby
      and selection of `rqlst` are visited and copied into the new root.

      Returns a `(newroot, insertedvars)` tuple, `insertedvars` being the
      list of `(variable name, rschema, inserted variable name)` triples
      recorded for variables added to fetch constant values.
      """
      if server.DEBUG & server.DBG_MS:
          print 'filter', final and 'final' or '', sources, terms, rqlst, solindices, needsel
      newroot = Select()
      # kept as a sorted list: _relation_supported compares it against
      # another sorted list of sources
      self.sources = sorted(sources)
      self.terms = terms
      self.solindices = solindices
      self.final = final
      self._pending_vrefs = []
      # terms which appear in unsupported branches
      # NOTE(review): `|=` is an in-place update, so if `needsel` is a set
      # the caller's object is modified as well -- confirm this is intended
      needsel |= self.extneedsel
      self.needsel = needsel
      # terms which appear in supported branches
      self.mayneedsel = set()
      # new inserted variables
      self.insertedvars = []
      # other structures (XXX document)
      # both are keyed by (variable name, rschema), see visit_relation
      self.mayneedvar, self.hasvar = {}, {}
      self.use_only_defined = False
      # original scope node -> its copy, read back by visit_variableref
      self.scopes = {rqlst: newroot}
      self.current_scope = rqlst
      if rqlst.where:
          # _rqlst_accept returns the statement to go on with, hence the
          # rebinding of `rqlst` after each call
          rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, terms,
                                     newroot.set_where)
      if isinstance(rqlst, Select):
          # from now on a variable reference is only accepted if the
          # variable is already defined in newroot (see visit_variableref)
          self.use_only_defined = True
          if rqlst.groupby:
              groupby = []
              for node in rqlst.groupby:
                  rqlst = self._rqlst_accept(rqlst, node, newroot, terms,
                                             groupby.append)
              if groupby:
                  newroot.set_groupby(groupby)
          if rqlst.having:
              having = []
              for node in rqlst.having:
                  rqlst = self._rqlst_accept(rqlst, node, newroot, terms,
                                             having.append)
              if having:
                  newroot.set_having(having)
          # sort terms are all visited only for the final query when there
          # is no aggregation step...
          if final and rqlst.orderby and not self.hasaggrstep:
              orderby = []
              for node in rqlst.orderby:
                  rqlst = self._rqlst_accept(rqlst, node, newroot, terms,
                                             orderby.append)
              if orderby:
                  newroot.set_orderby(orderby)
          # ...else only the sort terms using the FTIRANK function are
          # visited
          elif rqlst.orderby:
              for sortterm in rqlst.orderby:
                  if any(f for f in sortterm.iget_nodes(Function) if f.name == 'FTIRANK'):
                      newnode, oldnode = sortterm.accept(self, newroot, terms)
                      if newnode is not None:
                          newroot.add_sort_term(newnode)
          self.process_selection(newroot, terms, rqlst)
      elif not newroot.where:
          # no restrictions have been copied, just select terms and add
          # type restriction (done later by add_types_restriction)
          for v in terms:
              if not isinstance(v, Variable):
                  continue
              newroot.append_selected(VariableRef(newroot.get_variable(v.name)))
      solutions = self.ppi.copy_solutions(solindices)
      cleanup_solutions(newroot, solutions)
      newroot.set_possible_types(solutions)
      if final:
          if self.hasaggrstep:
              self.add_necessary_selection(newroot, self.mayneedsel & self.extneedsel)
          newroot.distinct = rqlst.distinct
      else:
          self.add_necessary_selection(newroot, self.mayneedsel & self.needsel)
          # insert vars to fetch constant values when needed
          for (varname, rschema), reldefs in self.mayneedvar.iteritems():
              for rel, ored in reldefs:
                  if not (varname, rschema) in self.hasvar:
                      self.hasvar[(varname, rschema)] = None # just to avoid further insertion
                      cvar = newroot.make_variable()
                      # give the new variable a type in every solution
                      for sol in newroot.solutions:
                          sol[cvar.name] = rschema.objects(sol[varname])[0]
                      # if the current restriction is not used in a OR branch,
                      # we can keep it, else we have to drop the constant
                      # restriction (or we may miss some results)
                      if not ored:
                          rel = rel.copy(newroot)
                          newroot.add_restriction(rel)
                      # add a relation to link the variable
                      newroot.remove_node(rel.children[1])
                      cmp = Comparison('=')
                      rel.append(cmp)
                      cmp.append(VariableRef(cvar))
                      self.insertedvars.append((varname, rschema, cvar.name))
                      newroot.append_selected(VariableRef(newroot.get_variable(cvar.name)))
                      # NOTE: even if the restriction is done by this query, we have
                      # to let it in the original rqlst so that it appears anyway in
                      # the "final" query, else we may change the meaning of the query
                      # if there are NOT somewhere :
                      # 'NOT X relation Y, Y name "toto"' means X WHERE X isn't related
                      # to Y whose name is toto while
                      # 'NOT X relation Y' means X WHERE X has no 'relation' (whatever Y)
                  elif ored:
                      newroot.remove_node(rel)
      add_types_restriction(self.schema, rqlst, newroot, solutions)
      if server.DEBUG & server.DBG_MS:
          print '--->', newroot
      return newroot, self.insertedvars
       
  1596 
       
  def visit_and(self, node, newroot, terms):
      """Visit every child of `node` and combine the copies that were kept.

      A child is dropped when its visit raises UnsupportedBranch or yields
      no copy. Returns a `(copy, node)` tuple where `copy` is None when
      nothing was kept, the lone copy when a single child was kept, and
      otherwise a copy of `node` built by `copy_node` around the kept
      children. Also bound as `visit_or`.
      """
      kept = []
      # the number of iterations is fixed up front, but children are read
      # from the *current* `node`, which may be rebound below
      for index in range(len(node.children)):
          current = node.children[index]
          try:
              copied, visited = current.accept(self, newroot, terms)
              if visited is not current:
                  # the visit substituted the original child: go on with
                  # the parent of the substitute
                  node = visited.parent
              if copied is not None:
                  kept.append(copied)
          except UnsupportedBranch:
              pass
      if len(kept) > 1:
          return copy_node(newroot, node, kept), node
      if kept:
          return kept[0], node
      return None, node

  visit_or = visit_and
       
  1617 
       
  1618     def _relation_supported(self, relation):
       
  1619         rtype = relation.r_type
       
  1620         for source in self.sources:
       
  1621             if not source.support_relation(rtype) or (
       
  1622                 rtype in source.cross_relations and not relation in self.terms):
       
  1623                 return False
       
  1624         if not self.final and not relation in self.terms:
       
  1625             rschema = self.schema.rschema(relation.r_type)
       
  1626             if not rschema.final:
       
  1627                 for term in relation.get_nodes((VariableRef, Constant)):
       
  1628                     term = getattr(term, 'variable', term)
       
  1629                     termsources = sorted(set(x[0] for x in self.ppi._term_sources(term)))
       
  1630                     if termsources and termsources != self.sources:
       
  1631                         return False
       
  1632         return True
       
  1633 
       
  def visit_relation(self, node, newroot, terms):
      """Copy a relation node when it is supported, else raise.

      Returns `(copy, node)`; `copy` is None for a relation which is not
      in `terms` and is already recorded in `self.skip` for every current
      solution index. Raises UnsupportedBranch when the relation can't be
      handled here. After a successful copy, the method records in
      `self.skip`, `self.mayneedvar` and `self.hasvar` what later steps
      need to know about the relation.
      """
      if node.is_types_restriction():
          # don't copy type restriction unless this is the only supported
          # relation for the lhs variable, else they'll be reinserted later
          # as needed (in other cases we may copy a type restriction while
          # the variable is not actually used)
          if node.neged(strict=True) or any(
                  self._relation_supported(rel)
                  for rel in node.children[0].variable.stinfo['relations']):
              raise UnsupportedBranch()
          return self.visit_default(node, newroot, terms)
      if (node not in terms and node in self.skip
              and self.solindices.issubset(self.skip[node])):
          return None, node
      if not self._relation_supported(node):
          raise UnsupportedBranch()
      rschema = self.schema.rschema(node.r_type)
      self._pending_vrefs = []
      try:
          res = self.visit_default(node, newroot, terms)[0]
      except Exception:
          # when a relation isn't supported, we should dereference
          # potentially introduced variable refs
          for pending in self._pending_vrefs:
              pending.unregister_reference()
          raise
      ored = node.ored()
      mark_done = False
      if not (rschema.final or rschema.inlined):
          mark_done = not ored
      else:
          vrefs = node.children[1].get_nodes(VariableRef)
          if vrefs:
              assert len(vrefs) == 1
              vref = vrefs[0]
              # XXX check operator ?
              self.hasvar[(node.children[0].name, rschema)] = vref
              mark_done = self._may_skip_attr_rel(rschema, node, vref, ored,
                                                  terms, res)
          elif ored:
              # constant restriction in a OR branch: remember it, a
              # variable may have to be inserted to fetch its value
              key = (node.children[0].name, rschema)
              self.mayneedvar.setdefault(key, []).append( (res, ored) )
          else:
              mark_done = True
      if mark_done:
          self.skip.setdefault(node, set()).update(self.solindices)
      return res, node
       
  1678 
       
  1679     def _may_skip_attr_rel(self, rschema, rel, vref, ored, terms, res):
       
  1680         var = vref.variable
       
  1681         if ored:
       
  1682             return False
       
  1683         if var.name in self.extneedsel or var.stinfo['selected']:
       
  1684             return False
       
  1685         if not var in terms or used_in_outer_scope(var, self.current_scope):
       
  1686             return False
       
  1687         if any(v for v, _ in var.stinfo.get('attrvars', ()) if not v in terms):
       
  1688             return False
       
  1689         return True
       
  1690 
       
  def visit_exists(self, node, newroot, terms):
      """Copy an exists node around the first copied child.

      A fresh instance of the node's class is created and `self.scopes`
      is replaced by a mapping from `node` to that instance before the
      children are visited. Returns `(None, node)` when no child was
      copied, else `(new instance, node)`.
      """
      replica = node.__class__()
      self.scopes = {node: replica}
      copied, node = self._visit_children(node, newroot, terms)
      if copied:
          replica.set_where(copied[0])
          return replica, node
      return None, node
       
  1699 
       
  def visit_not(self, node, newroot, terms):
      """Copy a not node unless none of its children has been copied.

      Returns `(None, node)` when no child was copied, else a copy of
      `node` built by `copy_node` around the copied children, together
      with `node`.
      """
      copied, node = self._visit_children(node, newroot, terms)
      if copied:
          return copy_node(newroot, node, copied), node
      return None, node
       
  1705 
       
  def visit_group(self, node, newroot, terms):
      """Copy a group node for the final query only.

      When `self.final` is false nothing is copied and `(None, node)` is
      returned, else the work is delegated to `visit_default`.
      """
      if self.final:
          return self.visit_default(node, newroot, terms)
      return None, node
       
  1710 
       
  def visit_variableref(self, node, newroot, terms):
      """Return a reference to the matching variable of `newroot`.

      Raises UnsupportedBranch when the variable can't be used: with
      `self.use_only_defined` set, its name must already be among the
      variables defined in `newroot`, otherwise the variable itself must
      be in `terms`. The name is added to `self.mayneedsel` and the new
      reference is appended to `self._pending_vrefs`.
      """
      if self.use_only_defined:
          usable = node.variable.name in newroot.defined_vars
      else:
          usable = node.variable in terms
      if not usable:
          raise UnsupportedBranch(node.name)
      self.mayneedsel.add(node.name)
      copied_var = newroot.get_variable(node.name)
      # set scope so we can insert types restriction properly
      copied_var.stinfo['scope'] = self.scopes.get(node.variable.scope, newroot)
      new_ref = VariableRef(copied_var)
      self._pending_vrefs.append(new_ref)
      return new_ref, node
       
  1724 
       
  def visit_constant(self, node, newroot, terms):
      """Return a copy of `node` made by `copy_node`, together with `node`.

      NOTE: the name `visit_constant` is rebound to `visit_default` further
      down in the class body, so this definition is shadowed.
      """
      duplicate = copy_node(newroot, node)
      return duplicate, node
       
  1727 
       
  def visit_comparison(self, node, newroot, terms):
      """Copy a comparison node along with its copied children.

      For a non final query whose comparison starts with a variable
      reference, the operator of the copy is forced to '='.
      Returns `(copy, node)`.
      """
      copied, node = self._visit_children(node, newroot, terms)
      duplicate = copy_node(newroot, node, copied)
      if not self.final:
          # ignore comparison operator when fetching non final query
          if isinstance(node.children[0], VariableRef):
              duplicate.operator = '='
      return duplicate, node
       
  1735 
       
  def visit_function(self, node, newroot, terms):
      """Copy a function node, with a special treatment for FTIRANK.

      Any function but FTIRANK is handled by `visit_default`. For FTIRANK,
      `self.skip` maps an already selected rank function to the selection
      size recorded right after it was appended to the selection.
      Returns `(copy or None, node)`.
      """
      if node.name != 'FTIRANK':
          return self.visit_default(node, newroot, terms)
      # FTIRANK is somewhat special... Rank function should be included in
      # the same query as the has_text relation, potentially added to
      # selection for later usage
      plain_final = not self.hasaggrstep and self.final
      if plain_final and node not in self.skip:
          return self.visit_default(node, newroot, terms)
      if any(s for s in self.sources if s.uri != 'system'):
          return None, node
      if plain_final and node in self.skip:
          # stand in for the function using the value recorded in self.skip
          return Constant(self.skip[node], 'Int'), node
      # XXX only if not yet selected
      newroot.append_selected(node.copy(newroot))
      self.skip[node] = len(newroot.selection)
      return None, node
       
  1756 
       
  def visit_default(self, node, newroot, terms):
      """Copy `node` around the copies of its visited children.

      Returns `(copy, node)`, where `node` is the one handed back by
      `_visit_children`.
      """
      copied, node = self._visit_children(node, newroot, terms)
      duplicate = copy_node(newroot, node, copied)
      return duplicate, node

  # math expressions and constants go through the default visit; this also
  # replaces the `visit_constant` method defined earlier in the class
  visit_mathexpression = visit_constant = visit_default
       
  1762 
       
  def visit_sortterm(self, node, newroot, terms):
      """Copy a sort term unless none of its children has been copied.

      Returns `(None, node)` when no child was copied, else a copy of
      `node` built by `copy_node` around the copied children, together
      with `node`.
      """
      copied, node = self._visit_children(node, newroot, terms)
      if copied:
          return copy_node(newroot, node, copied), node
      return None, node
       
  1768 
       
  1769     def _visit_children(self, node, newroot, terms):
       
  1770         subparts = []
       
  1771         for i in xrange(len(node.children)):
       
  1772             child = node.children[i]
       
  1773             newchild, child_ = child.accept(self, newroot, terms)
       
  1774             if not child is child_:
       
  1775                 node = child_.parent
       
  1776             if newchild is not None:
       
  1777                 subparts.append(newchild)
       
  1778         return subparts, node
       
  1779 
       
  def process_selection(self, newroot, terms, rqlst):
      """Fill the selection of `newroot` from the selection of `rqlst`.

      For the final query, every selected term is copied and the name of
      each variable it references is added to `self.needsel`. Else, for
      each selected term:

      * when all its variable references are in `terms`, a copy of each
        reference is selected;
      * otherwise the name of the first reference whose variable is not
        in `terms` is added to `self.needsel`, and the references found
        before it are selected unless already there;
      * a term without any variable reference is copied only if it is in
        `self.terms`.
      """
      if self.final:
          for term in rqlst.selection:
              newroot.append_selected(term.copy(newroot))
              for vref in term.get_nodes(VariableRef):
                  self.needsel.add(vref.name)
          return
      for term in rqlst.selection:
          vrefs = term.get_nodes(VariableRef)
          if not vrefs:
              if term in self.terms:
                  newroot.append_selected(term.copy(newroot))
              continue
          leading = []
          unsupported = None
          for vref in vrefs:
              if vref.variable not in terms:
                  unsupported = vref
                  break
              leading.append(vref)
          if unsupported is None:
              # every reference is supported
              for vref in vrefs:
                  newroot.append_selected(vref.copy(newroot))
              continue
          self.needsel.add(unsupported.name)
          for vref in leading:
              if vref not in newroot.get_selected_variables():
                  newroot.append_selected(
                      VariableRef(newroot.get_variable(vref.name)))
       
  1808 
       
  def add_necessary_selection(self, newroot, terms):
      """Select in `newroot` each variable named in `terms` which is not
      selected yet.

      A variable is considered selected when one of its references has
      no relation and is among the selected variables of `newroot` as
      they were on entry. When `newroot` has a group by clause, a newly
      selected variable is added to it as well.
      """
      selected = tuple(newroot.get_selected_variables())
      for varname in terms:
          var = newroot.defined_vars[varname]
          already_selected = any(
              vref.relation() is None and vref in selected
              for vref in var.references())
          if already_selected:
              continue
          selvref = VariableRef(var)
          newroot.append_selected(selvref)
          if newroot.groupby:
              newroot.add_group_var(VariableRef(selvref.variable, noautoref=1))