server/msplanner.py
changeset 257 4c7d3af7e94d
child 341 0a426be2f3a2
equal deleted inserted replaced
256:3dbee583526c 257:4c7d3af7e94d
       
     1 """plan execution of rql queries on multiple sources
       
     2 
       
     3 the best way to understand what we are trying to achieve here is to read
       
     4 the unit-tests in unittest_querier_planner.py
       
     5 
       
     6 
       
     7 
       
     8 Split and execution specifications
       
     9 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
       
    10 For a system source and a ldap user source (only EUser and its attributes
       
    11 is supported, no group or such):
       
    12 
       
    13 
       
    14 :EUser X:
       
    15 1. fetch EUser X from both sources and return concatenation of results
       
    16 
       
    17 
       
    18 :EUser X WHERE X in_group G, G name 'users':
       
    19 * catch 1
       
    20   1. fetch EUser X from both sources, store concatenation of results
       
    21      into a temporary table
       
    22   2. return the result of TMP X WHERE X in_group G, G name 'users' from
       
    23      the system source
       
    24      
       
    25 * catch 2
       
    26   1. return the result of EUser X WHERE X in_group G, G name 'users'
       
    27      from system source, that's enough (optimization of the sql querier
       
    28      will avoid join on EUser, so we will directly get local eids)
       
    29 
       
    30     
       
    31 :EUser X,L WHERE X in_group G, X login L, G name 'users':
       
    32 1. fetch Any X,L WHERE X is EUser, X login L from both sources, store
       
    33    concatenation of results into a temporary table
       
    34 2. return the result of Any X, L WHERE X is TMP, X login L, X in_group G,
       
    35    G name 'users' from the system source
       
    36 
       
    37 
       
    38 :Any X WHERE X owned_by Y:
       
    39 * catch 1
       
    40   1. fetch EUser X from both sources, store concatenation of results
       
    41      into a temporary table
       
    42   2. return the result of Any X WHERE X owned_by Y, Y is TMP from
       
    43      the system source
       
    44      
       
    45 * catch 2
       
    46   1. return the result of Any X WHERE X owned_by Y
       
    47      from system source, that's enough (optimization of the sql querier
       
    48      will avoid join on EUser, so we will directly get local eids)
       
    49 
       
    50 
       
    51 :organization: Logilab
       
    52 :copyright: 2003-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
    53 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
    54 """
       
    55 __docformat__ = "restructuredtext en"
       
    56 
       
    57 from itertools import imap, ifilterfalse
       
    58 
       
    59 from logilab.common.compat import any
       
    60 from logilab.common.decorators import cached
       
    61 
       
    62 from rql.stmts import Union, Select
       
    63 from rql.nodes import VariableRef, Comparison, Relation, Constant, Exists, Variable
       
    64 
       
    65 from cubicweb import server
       
    66 from cubicweb.common.utils import make_uid
       
    67 from cubicweb.server.utils import cleanup_solutions
       
    68 from cubicweb.server.ssplanner import SSPlanner, OneFetchStep, add_types_restriction
       
    69 from cubicweb.server.mssteps import *
       
    70 from cubicweb.server.sources import AbstractSource
       
    71 
       
# monkeypatch rql nodes with a `_ms_table_key` method returning a short string
# identifying the node (variable name, relation type or constant value);
# judging from the note on Constant below, these keys end up in generated
# table names
Variable._ms_table_key = lambda x: x.name
Relation._ms_table_key = lambda x: x.r_type
# str() Constant.value to ensure generated table name won't be unicode
Constant._ms_table_key = lambda x: str(x.value)

# default value for every source: no relation is declared in
# `dont_cross_relations`. The planner below treats a relation type listed
# there like a relation supported by the source when it computes which
# sources may hold a relation
AbstractSource.dont_cross_relations = ()
       
    78 
       
    79 def allequals(solutions):
       
    80     """return true if all solutions are identical"""
       
    81     sol = solutions.next()
       
    82     for sol_ in solutions:
       
    83         if sol_ != sol:
       
    84             return False
       
    85     return True
       
    86     
       
    87 def need_aggr_step(select, sources, stepdefs=None):
       
    88     """return True if a temporary table is necessary to store some partial
       
    89     results to execute the given query
       
    90     """
       
    91     if len(sources) == 1:
       
    92         # can do everything at once with a single source
       
    93         return False
       
    94     if select.orderby or select.groupby or select.has_aggregat:
       
    95         # if more than one source, we need a temp table to deal with sort /
       
    96         # groups / aggregat if :
       
    97         # * the rqlst won't be splitted (in the other case the last query
       
    98         #   using partial temporary table can do sort/groups/aggregat without
       
    99         #   the need for a later AggrStep)
       
   100         # * the rqlst is splitted in multiple steps and there are more than one
       
   101         #   final step
       
   102         if stepdefs is None:
       
   103             return True
       
   104         has_one_final = False
       
   105         fstepsolindices = set()
       
   106         for stepdef in stepdefs:
       
   107             if stepdef[-1]:
       
   108                 if has_one_final or frozenset(stepdef[2]) != fstepsolindices:
       
   109                     return True
       
   110                 has_one_final = True
       
   111             else:
       
   112                 fstepsolindices.update(stepdef[2])
       
   113     return False
       
   114 
       
   115 def copy_node(newroot, node, subparts=()):
       
   116     newnode = node.__class__(*node.initargs(newroot))
       
   117     for part in subparts:
       
   118         newnode.append(part)
       
   119     return newnode
       
   120         
       
   121 def same_scope(var):
       
   122     """return true if the variable is always used in the same scope"""
       
   123     try:
       
   124         return var.stinfo['samescope']
       
   125     except KeyError:
       
   126         for rel in var.stinfo['relations']:
       
   127             if not rel.scope is var.scope:
       
   128                 var.stinfo['samescope'] = False
       
   129                 return False
       
   130         var.stinfo['samescope'] = True
       
   131         return True
       
   132 
       
   133 def select_group_sort(select): # XXX something similar done in rql2sql
       
   134     # add variables used in groups and sort terms to the selection
       
   135     # if necessary
       
   136     if select.groupby:
       
   137         for vref in select.groupby:
       
   138             if not vref in select.selection:
       
   139                 select.append_selected(vref.copy(select))
       
   140     for sortterm in select.orderby:
       
   141         for vref in sortterm.iget_nodes(VariableRef):
       
   142             if not vref in select.get_selected_variables():
       
   143                 # we can't directly insert sortterm.term because it references
       
   144                 # a variable of the select before the copy.
       
   145                 # XXX if constant term are used to define sort, their value
       
   146                 # may necessite a decay
       
   147                 select.append_selected(vref.copy(select))
       
   148                 if select.groupby and not vref in select.groupby:
       
   149                     select.add_group_var(vref.copy(select))
       
   150             
       
   151 
       
   152 class PartPlanInformation(object):
       
   153     """regroups necessary information to execute some part of a "global" rql
       
   154     query ("global" means as received by the querier, which may result in
       
   155     several internal queries, e.g. parts, due to security insertions)
       
   156 
       
   157     it exposes as well some methods helping in executing this part on a
       
   158     multi-sources repository, modifying its internal structure during the
       
   159     process
       
   160 
       
   161     :attr solutions: a list of mappings (varname -> vartype)
       
   162     :attr sourcesvars:
       
   163       a dictionary telling for each source which variables/solutions are
       
   164       supported, of the form {source: {variable: set of solution indices}}
       
   165     """
       
   166     def __init__(self, plan, rqlst, rqlhelper=None):
       
   167         self.needsplit = False
       
   168         self.temptable = None
       
   169         self.finaltable = None
       
   170         self.plan = plan
       
   171         self.rqlst = rqlst
       
   172         self._session = plan.session
       
   173         self._solutions = rqlst.solutions
       
   174         self._solindices = range(len(self._solutions))
       
   175         # source : {varname: [solution index, ]}
       
   176         self._sourcesvars = {}
       
   177         # dictionnary of variables which are linked to each other using a non
       
   178         # final relation which is supported by multiple sources
       
   179         self._linkedvars = {}
       
   180         # processing
       
   181         self._compute_sourcesvars()
       
   182         self._remove_invalid_sources()
       
   183         #if server.DEBUG:
       
   184         #    print 'planner sources vars', self._sourcesvars
       
   185         self._compute_needsplit()
       
   186         self._inputmaps = {}
       
   187         if rqlhelper is not None: # else test
       
   188             self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional
       
   189 
       
   190     def copy_solutions(self, solindices):
       
   191         return [self._solutions[solidx].copy() for solidx in solindices]
       
   192     
       
   193     @property
       
   194     @cached
       
   195     def part_sources(self):
       
   196         if self._sourcesvars:
       
   197             return tuple(sorted(self._sourcesvars))
       
   198         return (self._session.repo.system_source,)
       
   199     
       
   200     @property
       
   201     @cached
       
   202     def _sys_source_set(self):
       
   203         return frozenset((self._session.repo.system_source, solindex)
       
   204                          for solindex in self._solindices)        
       
   205        
       
   206     @cached
       
   207     def _norel_support_set(self, rtype):
       
   208         """return a set of (source, solindex) where source doesn't support the
       
   209         relation
       
   210         """
       
   211         return frozenset((source, solidx) for source in self._session.repo.sources
       
   212                          for solidx in self._solindices
       
   213                          if not (source.support_relation(rtype)
       
   214                                  or rtype in source.dont_cross_relations))
       
   215     
       
   216     def _compute_sourcesvars(self):
       
   217         """compute for each variable/solution in the rqlst which sources support
       
   218         them
       
   219         """
       
   220         repo = self._session.repo
       
   221         eschema = repo.schema.eschema
       
   222         sourcesvars = self._sourcesvars
       
   223         # find for each source which variable/solution are supported
       
   224         for varname, varobj in self.rqlst.defined_vars.items():
       
   225             # if variable has an eid specified, we can get its source directly
       
   226             # NOTE: use uidrels and not constnode to deal with "X eid IN(1,2,3,4)"
       
   227             if varobj.stinfo['uidrels']:
       
   228                 vrels = varobj.stinfo['relations'] - varobj.stinfo['uidrels']
       
   229                 for rel in varobj.stinfo['uidrels']:
       
   230                     if  rel.neged(strict=True) or rel.operator() != '=':
       
   231                         continue
       
   232                     for const in rel.children[1].get_nodes(Constant):
       
   233                         eid = const.eval(self.plan.args)
       
   234                         source = self._session.source_from_eid(eid)
       
   235                         if vrels and not any(source.support_relation(r.r_type)
       
   236                                              for r in vrels):
       
   237                             self._set_source_for_var(repo.system_source, varobj)
       
   238                         else:
       
   239                             self._set_source_for_var(source, varobj)
       
   240                 continue
       
   241             rels = varobj.stinfo['relations']
       
   242             if not rels and not varobj.stinfo['typerels']:
       
   243                 # (rare) case where the variable has no type specified nor
       
   244                 # relation accessed ex. "Any MAX(X)"
       
   245                 self._set_source_for_var(repo.system_source, varobj)
       
   246                 continue
       
   247             for i, sol in enumerate(self._solutions):
       
   248                 vartype = sol[varname]
       
   249                 # skip final variable
       
   250                 if eschema(vartype).is_final():
       
   251                     break
       
   252                 for source in repo.sources:
       
   253                     if source.support_entity(vartype):
       
   254                         # the source support the entity type, though we will
       
   255                         # actually have to fetch from it only if
       
   256                         # * the variable isn't invariant
       
   257                         # * at least one supported relation specified
       
   258                         if not varobj._q_invariant or \
       
   259                                any(imap(source.support_relation,
       
   260                                         (r.r_type for r in rels if r.r_type != 'eid'))):
       
   261                             sourcesvars.setdefault(source, {}).setdefault(varobj, set()).add(i)
       
   262                         # if variable is not invariant and is used by a relation
       
   263                         # not supported by this source, we'll have to split the
       
   264                         # query
       
   265                         if not varobj._q_invariant and any(ifilterfalse(
       
   266                             source.support_relation, (r.r_type for r in rels))):
       
   267                             self.needsplit = True               
       
   268             
       
   269     def _remove_invalid_sources(self):
       
   270         """removes invalid sources from `sourcesvars` member"""
       
   271         repo = self._session.repo
       
   272         rschema = repo.schema.rschema
       
   273         vsources = {}
       
   274         for rel in self.rqlst.iget_nodes(Relation):
       
   275             # process non final relations only
       
   276             # note: don't try to get schema for 'is' relation (not available
       
   277             # during bootstrap)
       
   278             if not rel.is_types_restriction() and not rschema(rel.r_type).is_final():
       
   279                 # nothing to do if relation is not supported by multiple sources
       
   280                 relsources = [source for source in repo.sources
       
   281                               if source.support_relation(rel.r_type)
       
   282                               or rel.r_type in source.dont_cross_relations]
       
   283                 if len(relsources) < 2:
       
   284                     if relsources:# and not relsources[0] in self._sourcesvars:
       
   285                         # this means the relation is using a variable inlined as
       
   286                         # a constant and another unsupported variable, in which
       
   287                         # case we put the relation in sourcesvars
       
   288                         self._sourcesvars.setdefault(relsources[0], {})[rel] = set(self._solindices)
       
   289                     continue
       
   290                 lhs, rhs = rel.get_variable_parts()
       
   291                 lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs)
       
   292                 # update dictionnary of sources supporting lhs and rhs vars
       
   293                 if not lhsv in vsources:
       
   294                     vsources[lhsv] = self._term_sources(lhs)
       
   295                 if not rhsv in vsources:
       
   296                     vsources[rhsv] = self._term_sources(rhs)
       
   297                 self._linkedvars.setdefault(lhsv, set()).add((rhsv, rel))
       
   298                 self._linkedvars.setdefault(rhsv, set()).add((lhsv, rel))
       
   299         for term in self._linkedvars:
       
   300             self._remove_sources_until_stable(term, vsources)
       
   301         if len(self._sourcesvars) > 1 and hasattr(self.plan.rqlst, 'main_relations'):
       
   302             # the querier doesn't annotate write queries, need to do it here
       
   303             self.plan.annotate_rqlst()
       
   304             # insert/update/delete queries, we may get extra information from
       
   305             # the main relation (eg relations to the left of the WHERE
       
   306             if self.plan.rqlst.TYPE == 'insert':
       
   307                 inserted = dict((vref.variable, etype)
       
   308                                 for etype, vref in self.plan.rqlst.main_variables)
       
   309             else:
       
   310                 inserted = {}
       
   311             for rel in self.plan.rqlst.main_relations:
       
   312                 if not rschema(rel.r_type).is_final():
       
   313                     # nothing to do if relation is not supported by multiple sources
       
   314                     relsources = [source for source in repo.sources
       
   315                                   if source.support_relation(rel.r_type)
       
   316                                   or rel.r_type in source.dont_cross_relations]
       
   317                     if len(relsources) < 2:
       
   318                         continue
       
   319                     lhs, rhs = rel.get_variable_parts()
       
   320                     try:
       
   321                         lhsv = self._extern_term(lhs, vsources, inserted)
       
   322                         rhsv = self._extern_term(rhs, vsources, inserted)
       
   323                     except KeyError, ex:
       
   324                         continue
       
   325                     norelsup = self._norel_support_set(rel.r_type)
       
   326                     self._remove_var_sources(lhsv, norelsup, rhsv, vsources)
       
   327                     self._remove_var_sources(rhsv, norelsup, lhsv, vsources)
       
   328         # cleanup linked var
       
   329         for var, linkedrelsinfo in self._linkedvars.iteritems():
       
   330             self._linkedvars[var] = frozenset(x[0] for x in linkedrelsinfo)
       
   331         # if there are other sources than the system source, consider simplified
       
   332         # variables'source
       
   333         if self._sourcesvars and self._sourcesvars.keys() != [self._session.repo.system_source]:
       
   334             # add source for rewritten constants to sourcesvars
       
   335             for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
       
   336                 const = vconsts[0]
       
   337                 eid = const.eval(self.plan.args)
       
   338                 source = self._session.source_from_eid(eid)
       
   339                 if source is self._session.repo.system_source:
       
   340                     for const in vconsts:
       
   341                         self._set_source_for_var(source, const)
       
   342                 elif source in self._sourcesvars:
       
   343                     source_scopes = frozenset(v.scope for v in self._sourcesvars[source])
       
   344                     for const in vconsts:
       
   345                         if const.scope in source_scopes:
       
   346                             self._set_source_for_var(source, const)
       
   347         
       
   348     def _extern_term(self, term, vsources, inserted):
       
   349         var = term.variable
       
   350         if var.stinfo['constnode']:
       
   351             termv = var.stinfo['constnode']
       
   352             vsources[termv] = self._term_sources(termv)
       
   353         elif var in inserted:
       
   354             termv = var
       
   355             source = self._session.repo.locate_etype_source(inserted[var])
       
   356             vsources[termv] = set((source, solindex) for solindex in self._solindices)
       
   357         else:
       
   358             termv = self.rqlst.defined_vars[var.name]
       
   359             if not termv in vsources:
       
   360                 vsources[termv] = self._term_sources(termv)
       
   361         return termv
       
   362         
       
   363     def _remove_sources_until_stable(self, var, vsources):
       
   364         for ovar, rel in self._linkedvars.get(var, ()):
       
   365             if not var.scope is ovar.scope and rel.scope.neged(strict=True):
       
   366                 # can't get information from relation inside a NOT exists
       
   367                 # where variables don't belong to the same scope
       
   368                 continue
       
   369             if rel.neged(strict=True):
       
   370                 # neged relation doesn't allow to infer variable sources
       
   371                 continue
       
   372             norelsup = self._norel_support_set(rel.r_type)
       
   373             # compute invalid sources for variables and remove them
       
   374             self._remove_var_sources(var, norelsup, ovar, vsources)
       
   375             self._remove_var_sources(ovar, norelsup, var, vsources)
       
   376     
       
   377     def _remove_var_sources(self, var, norelsup, ovar, vsources):
       
   378         """remove invalid sources for var according to ovar's sources and the
       
   379         relation between those two variables. 
       
   380         """
       
   381         varsources = vsources[var]
       
   382         invalid_sources = varsources - (vsources[ovar] | norelsup)
       
   383         if invalid_sources:
       
   384             self._remove_sources(var, invalid_sources)
       
   385             varsources -= invalid_sources
       
   386             self._remove_sources_until_stable(var, vsources)
       
   387         
       
   388     def _compute_needsplit(self):
       
   389         """tell according to sourcesvars if the rqlst has to be splitted for
       
   390         execution among multiple sources
       
   391         
       
   392         the execution has to be split if
       
   393         * a source support an entity (non invariant) but doesn't support a
       
   394           relation on it
       
   395         * a source support an entity which is accessed by an optional relation
       
   396         * there is more than one sources and either all sources'supported        
       
   397           variable/solutions are not equivalent or multiple variables have to
       
   398           be fetched from some source
       
   399         """
       
   400         # NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2
       
   401         if len(self._sourcesvars) < 2: 
       
   402             self.needsplit = False
       
   403         elif not self.needsplit:
       
   404             if not allequals(self._sourcesvars.itervalues()):
       
   405                 self.needsplit = True
       
   406             else:
       
   407                 sample = self._sourcesvars.itervalues().next()
       
   408                 if len(sample) > 1 and any(v for v in sample
       
   409                                            if not v in self._linkedvars):
       
   410                     self.needsplit = True
       
   411 
       
   412     def _set_source_for_var(self, source, var):
       
   413         self._sourcesvars.setdefault(source, {})[var] = set(self._solindices)
       
   414 
       
   415     def _term_sources(self, term):
       
   416         """returns possible sources for terms `term`"""
       
   417         if isinstance(term, Constant):
       
   418             source = self._session.source_from_eid(term.eval(self.plan.args))
       
   419             return set((source, solindex) for solindex in self._solindices)
       
   420         else:
       
   421             var = getattr(term, 'variable', term)
       
   422             sources = [source for source, varobjs in self._sourcesvars.iteritems()
       
   423                        if var in varobjs]
       
   424             return set((source, solindex) for source in sources
       
   425                        for solindex in self._sourcesvars[source][var])
       
   426 
       
   427     def _remove_sources(self, var, sources):
       
   428         """removes invalid sources (`sources`) from `sourcesvars`
       
   429 
       
   430         :param sources: the list of sources to remove
       
   431         :param var: the analyzed variable
       
   432         """
       
   433         sourcesvars = self._sourcesvars
       
   434         for source, solindex in sources:
       
   435             try:
       
   436                 sourcesvars[source][var].remove(solindex)
       
   437             except KeyError:
       
   438                 return # may occur with subquery column alias
       
   439             if not sourcesvars[source][var]:
       
   440                 del sourcesvars[source][var]
       
   441                 if not sourcesvars[source]:
       
   442                     del sourcesvars[source]
       
   443 
       
   444     def part_steps(self):
       
   445         """precompute necessary part steps before generating actual rql for
       
   446         each step. This is necessary to know if an aggregate step will be
       
   447         necessary or not.
       
   448         """
       
   449         steps = []
       
   450         select = self.rqlst
       
   451         rschema = self.plan.schema.rschema
       
   452         for source in self.part_sources:
       
   453             sourcevars = self._sourcesvars[source]
       
   454             while sourcevars:
       
   455                 # take a variable randomly, and all variables supporting the
       
   456                 # same solutions
       
   457                 var, solindices = self._choose_var(sourcevars)
       
   458                 if source.uri == 'system':
       
   459                     # ensure all variables are available for the latest step
       
   460                     # (missing one will be available from temporary tables
       
   461                     # of previous steps)
       
   462                     scope = select
       
   463                     variables = scope.defined_vars.values() + scope.aliases.values()
       
   464                     sourcevars.clear()
       
   465                 else:
       
   466                     scope = var.scope
       
   467                     variables = self._expand_vars(var, sourcevars, scope, solindices)
       
   468                     if not sourcevars:
       
   469                         del self._sourcesvars[source]
       
   470                 # find which sources support the same variables/solutions
       
   471                 sources = self._expand_sources(source, variables, solindices)
       
   472                 # suppose this is a final step until the contrary is proven
       
   473                 final = scope is select
       
   474                 # set of variables which should be additionaly selected when
       
   475                 # possible
       
   476                 needsel = set()
       
   477                 # add attribute variables and mark variables which should be
       
   478                 # additionaly selected when possible
       
   479                 for var in select.defined_vars.itervalues():
       
   480                     if not var in variables:
       
   481                         stinfo = var.stinfo
       
   482                         for ovar, rtype in stinfo['attrvars']:
       
   483                             if ovar in variables:
       
   484                                 needsel.add(var.name)
       
   485                                 variables.append(var)
       
   486                                 break
       
   487                         else:
       
   488                             needsel.add(var.name)
       
   489                             final = False
       
   490                 if final and source.uri != 'system':
       
   491                     # check rewritten constants
       
   492                     for vconsts in select.stinfo['rewritten'].itervalues():
       
   493                         const = vconsts[0]
       
   494                         eid = const.eval(self.plan.args)
       
   495                         _source = self._session.source_from_eid(eid)
       
   496                         if len(sources) > 1 or not _source in sources:
       
   497                             # if constant is only used by an identity relation,
       
   498                             # skip
       
   499                             for c in vconsts:
       
   500                                 rel = c.relation()
       
   501                                 if rel is None or not rel.neged(strict=True):
       
   502                                     final = False
       
   503                                     break
       
   504                             break
       
   505                 # check where all relations are supported by the sources
       
   506                 for rel in scope.iget_nodes(Relation):
       
   507                     if rel.is_types_restriction():
       
   508                         continue
       
   509                     # take care not overwriting the existing "source" identifier
       
   510                     for _source in sources:
       
   511                         if not _source.support_relation(rel.r_type):
       
   512                             for vref in rel.iget_nodes(VariableRef):
       
   513                                 needsel.add(vref.name)
       
   514                             final = False
       
   515                             break
       
   516                     else:
       
   517                         if not scope is select:
       
   518                             self._exists_relation(rel, variables, needsel)
       
   519                         # if relation is supported by all sources and some of
       
   520                         # its lhs/rhs variable isn't in "variables", and the
       
   521                         # other end *is* in "variables", mark it have to be
       
   522                         # selected
       
   523                         if source.uri != 'system' and not rschema(rel.r_type).is_final():
       
   524                             lhs, rhs = rel.get_variable_parts()
       
   525                             try:
       
   526                                 lhsvar = lhs.variable
       
   527                             except AttributeError:
       
   528                                 lhsvar = lhs
       
   529                             try:
       
   530                                 rhsvar = rhs.variable
       
   531                             except AttributeError:
       
   532                                 rhsvar = rhs
       
   533                             if lhsvar in variables and not rhsvar in variables:
       
   534                                 needsel.add(lhsvar.name)
       
   535                             elif rhsvar in variables and not lhsvar in variables:
       
   536                                 needsel.add(rhsvar.name)
       
   537                 if final:
       
   538                     self._cleanup_sourcesvars(sources, solindices)
       
   539                 # XXX rename: variables may contain Relation and Constant nodes...
       
   540                 steps.append( (sources, variables, solindices, scope, needsel,
       
   541                                final) )
       
   542         return steps
       
   543 
       
   544     def _exists_relation(self, rel, variables, needsel):
       
   545         rschema = self.plan.schema.rschema(rel.r_type)
       
   546         lhs, rhs = rel.get_variable_parts()
       
   547         try:
       
   548             lhsvar, rhsvar = lhs.variable, rhs.variable
       
   549         except AttributeError:
       
   550             pass
       
   551         else:
       
   552             # supported relation with at least one end supported, check the
       
   553             # other end is in as well. If not this usually means the
       
   554             # variable is refed by an outer scope and should be substituted
       
   555             # using an 'identity' relation (else we'll get a conflict of
       
   556             # temporary tables)
       
   557             if rhsvar in variables and not lhsvar in variables:
       
   558                 self._identity_substitute(rel, lhsvar, variables, needsel)
       
   559             elif lhsvar in variables and not rhsvar in variables:
       
   560                 self._identity_substitute(rel, rhsvar, variables, needsel)
       
   561 
       
   562     def _identity_substitute(self, relation, var, variables, needsel):
       
   563         newvar = self._insert_identity_variable(relation.scope, var)
       
   564         if newvar is not None:
       
   565             # ensure relation is using '=' operator, else we rely on a
       
   566             # sqlgenerator side effect (it won't insert an inequality operator
       
   567             # in this case)
       
   568             relation.children[1].operator = '=' 
       
   569             variables.append(newvar)
       
   570             needsel.add(newvar.name)
       
   571             #self.insertedvars.append((var.name, self.schema['identity'],
       
   572             #                          newvar.name))
       
   573         
       
   574     def _choose_var(self, sourcevars):
       
   575         secondchoice = None
       
   576         if len(self._sourcesvars) > 1:
       
   577             # priority to variable from subscopes
       
   578             for var in sourcevars:
       
   579                 if not var.scope is self.rqlst:
       
   580                     if isinstance(var, Variable):
       
   581                         return var, sourcevars.pop(var)
       
   582                     secondchoice = var
       
   583         else:
       
   584             # priority to variable outer scope
       
   585             for var in sourcevars:
       
   586                 if var.scope is self.rqlst:
       
   587                     if isinstance(var, Variable):
       
   588                         return var, sourcevars.pop(var)
       
   589                     secondchoice = var
       
   590         if secondchoice is not None:
       
   591             return secondchoice, sourcevars.pop(secondchoice)
       
   592         # priority to variable
       
   593         for var in sourcevars:
       
   594             if isinstance(var, Variable):
       
   595                 return var, sourcevars.pop(var)
       
   596         # whatever
       
   597         var = iter(sourcevars).next()
       
   598         return var, sourcevars.pop(var)
       
   599             
       
   600     def _expand_vars(self, var, sourcevars, scope, solindices):
       
   601         variables = [var]
       
   602         nbunlinked = 1
       
   603         linkedvars = self._linkedvars
       
   604         # variable has to belong to the same scope if there is more
       
   605         # than the system source remaining
       
   606         if len(self._sourcesvars) > 1 and not scope is self.rqlst:
       
   607             candidates = (v for v in sourcevars.keys() if scope is v.scope)
       
   608         else:
       
   609             candidates = sourcevars #.iterkeys()
       
   610         candidates = [v for v in candidates
       
   611                       if isinstance(v, Constant) or
       
   612                       (solindices.issubset(sourcevars[v]) and v in linkedvars)]
       
   613         # repeat until no variable can't be added, since addition of a new
       
   614         # variable may permit to another one to be added
       
   615         modified = True
       
   616         while modified and candidates:
       
   617             modified = False
       
   618             for var in candidates[:]:
       
   619                 # we only want one unlinked variable in each generated query
       
   620                 if isinstance(var, Constant) or \
       
   621                        any(v for v in variables if v in linkedvars[var]):
       
   622                     variables.append(var)
       
   623                     # constant nodes should be systematically deleted
       
   624                     if isinstance(var, Constant):
       
   625                         del sourcevars[var]
       
   626                     # variable nodes should be deleted once all possible solution
       
   627                     # indices have been consumed
       
   628                     else:
       
   629                         sourcevars[var] -= solindices
       
   630                         if not sourcevars[var]:
       
   631                             del sourcevars[var]
       
   632                     candidates.remove(var)
       
   633                     modified = True
       
   634         return variables
       
   635     
       
   636     def _expand_sources(self, selected_source, vars, solindices):
       
   637         sources = [selected_source]
       
   638         sourcesvars = self._sourcesvars
       
   639         for source in sourcesvars:
       
   640             if source is selected_source:
       
   641                 continue
       
   642             for var in vars:
       
   643                 if not (var in sourcesvars[source] and 
       
   644                         solindices.issubset(sourcesvars[source][var])):
       
   645                     break
       
   646             else:
       
   647                 sources.append(source)
       
   648                 if source.uri != 'system':
       
   649                     for var in vars:
       
   650                         varsolindices = sourcesvars[source][var]
       
   651                         varsolindices -= solindices
       
   652                         if not varsolindices:
       
   653                             del sourcesvars[source][var]
       
   654                 
       
   655         return sources
       
   656     
       
   657     def _cleanup_sourcesvars(self, sources, solindices):
       
   658         """on final parts, remove solutions so we know they are already processed"""
       
   659         for source in sources:
       
   660             try:
       
   661                 sourcevar = self._sourcesvars[source]
       
   662             except KeyError:
       
   663                 continue
       
   664             for var, varsolindices in sourcevar.items():
       
   665                 varsolindices -= solindices
       
   666                 if not varsolindices:
       
   667                     del sourcevar[var]
       
   668                     
       
   669     def merge_input_maps(self, allsolindices):
       
   670         """inputmaps is a dictionary with tuple of solution indices as key with an
       
   671         associateed input map as value. This function compute for each solution 
       
   672         its necessary input map and return them grouped
       
   673 
       
   674         ex:
       
   675         inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'},
       
   676                      (1,): {'X': 't2.C0', 'T': 't2.C1'}}
       
   677         return : [([1],  {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1',
       
   678                            'X': 't2.C0', 'T': 't2.C1'}),                   
       
   679                   ([0,2], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'})]
       
   680         """
       
   681         if not self._inputmaps:
       
   682             return [(allsolindices, None)]
       
   683         mapbysol = {}
       
   684         # compute a single map for each solution
       
   685         for solindices, basemap in self._inputmaps.iteritems():
       
   686             for solindex in solindices:
       
   687                 solmap = mapbysol.setdefault(solindex, {})
       
   688                 solmap.update(basemap)
       
   689                 try:
       
   690                     allsolindices.remove(solindex)
       
   691                 except KeyError:
       
   692                     continue # already removed
       
   693         # group results by identical input map
       
   694         result = []
       
   695         for solindex, solmap in mapbysol.iteritems():
       
   696             for solindices, commonmap in result:
       
   697                 if commonmap == solmap:
       
   698                     solindices.append(solindex)
       
   699                     break
       
   700             else:
       
   701                 result.append( ([solindex], solmap) )
       
   702         if allsolindices:
       
   703             result.append( (list(allsolindices), None) )
       
   704         return result
       
   705 
       
   706     def build_final_part(self, select, solindices, inputmap,  sources,
       
   707                          insertedvars):
       
   708         plan = self.plan
       
   709         rqlst = plan.finalize(select, [self._solutions[i] for i in solindices],
       
   710                               insertedvars)
       
   711         if self.temptable is None and self.finaltable is None:
       
   712             return OneFetchStep(plan, rqlst, sources, inputmap=inputmap)
       
   713         table = self.temptable or self.finaltable
       
   714         return FetchStep(plan, rqlst, sources, table, True, inputmap)
       
   715 
       
   716     def build_non_final_part(self, select, solindices, sources, insertedvars,
       
   717                              table):
       
   718         """non final step, will have to store results in a temporary table"""
       
   719         plan = self.plan
       
   720         rqlst = plan.finalize(select, [self._solutions[i] for i in solindices],
       
   721                               insertedvars)
       
   722         step = FetchStep(plan, rqlst, sources, table, False)
       
   723         # update input map for following steps, according to processed solutions
       
   724         inputmapkey = tuple(sorted(solindices))
       
   725         inputmap = self._inputmaps.setdefault(inputmapkey, {})
       
   726         inputmap.update(step.outputmap)
       
   727         plan.add_step(step)
       
   728 
       
   729 
       
   730 class MSPlanner(SSPlanner):
       
   731     """MultiSourcesPlanner: build execution plan for rql queries
       
   732 
       
   733     decompose the RQL query according to sources'schema
       
   734     """
       
   735         
       
   736     def build_select_plan(self, plan, rqlst):
       
   737         """build execution plan for a SELECT RQL query
       
   738                
       
   739         the rqlst should not be tagged at this point
       
   740         """
       
   741         if server.DEBUG:
       
   742             print '-'*80
       
   743             print 'PLANNING', rqlst
       
   744         for select in rqlst.children:
       
   745             if len(select.solutions) > 1:
       
   746                 hasmultiplesols = True
       
   747                 break
       
   748         else:
       
   749             hasmultiplesols = False
       
   750         # preprocess deals with security insertion and returns a new syntax tree
       
   751         # which have to be executed to fulfill the query: according
       
   752         # to permissions for variable's type, different rql queries may have to
       
   753         # be executed
       
   754         plan.preprocess(rqlst)
       
   755         ppis = [PartPlanInformation(plan, select, self.rqlhelper)
       
   756                 for select in rqlst.children]
       
   757         steps = self._union_plan(plan, rqlst, ppis)
       
   758         if server.DEBUG:
       
   759             from pprint import pprint
       
   760             for step in plan.steps:
       
   761                 pprint(step.test_repr())
       
   762             pprint(steps[0].test_repr())
       
   763         return steps
       
   764 
       
   765     def _ppi_subqueries(self, ppi):
       
   766         # part plan info for subqueries
       
   767         plan = ppi.plan
       
   768         inputmap = {}
       
   769         for subquery in ppi.rqlst.with_[:]:
       
   770             sppis = [PartPlanInformation(plan, select)
       
   771                      for select in subquery.query.children]
       
   772             for sppi in sppis:
       
   773                 if sppi.needsplit or sppi.part_sources != ppi.part_sources:
       
   774                     temptable = 'T%s' % make_uid(id(subquery))
       
   775                     sstep = self._union_plan(plan, subquery.query, sppis, temptable)[0]
       
   776                     break
       
   777             else:
       
   778                 sstep = None
       
   779             if sstep is not None:
       
   780                 ppi.rqlst.with_.remove(subquery)
       
   781                 for i, colalias in enumerate(subquery.aliases):
       
   782                     inputmap[colalias.name] = '%s.C%s' % (temptable, i)
       
   783                 ppi.plan.add_step(sstep)
       
   784         return inputmap
       
   785     
       
   786     def _union_plan(self, plan, union, ppis, temptable=None):
       
   787         tosplit, cango, allsources = [], {}, set()
       
   788         for planinfo in ppis:
       
   789             if planinfo.needsplit:
       
   790                 tosplit.append(planinfo)
       
   791             else:
       
   792                 cango.setdefault(planinfo.part_sources, []).append(planinfo)
       
   793             for source in planinfo.part_sources:
       
   794                 allsources.add(source)
       
   795         # first add steps for query parts which doesn't need to splitted
       
   796         steps = []
       
   797         for sources, cppis in cango.iteritems():
       
   798             byinputmap = {}
       
   799             for ppi in cppis:
       
   800                 select = ppi.rqlst
       
   801                 if sources != (plan.session.repo.system_source,):
       
   802                     add_types_restriction(self.schema, select)
       
   803                 # part plan info for subqueries
       
   804                 inputmap = self._ppi_subqueries(ppi)
       
   805                 aggrstep = need_aggr_step(select, sources)
       
   806                 if aggrstep:
       
   807                     atemptable = 'T%s' % make_uid(id(select))
       
   808                     sunion = Union()
       
   809                     sunion.append(select)
       
   810                     selected = select.selection[:]
       
   811                     select_group_sort(select)
       
   812                     step = AggrStep(plan, selected, select, atemptable, temptable)
       
   813                     step.set_limit_offset(select.limit, select.offset)
       
   814                     select.limit = None
       
   815                     select.offset = 0
       
   816                     fstep = FetchStep(plan, sunion, sources, atemptable, True, inputmap)
       
   817                     step.children.append(fstep)
       
   818                     steps.append(step)
       
   819                 else:
       
   820                     byinputmap.setdefault(tuple(inputmap.iteritems()), []).append( (select) )
       
   821             for inputmap, queries in byinputmap.iteritems():
       
   822                 inputmap = dict(inputmap)
       
   823                 sunion = Union()
       
   824                 for select in queries:
       
   825                     sunion.append(select)
       
   826                 if temptable:
       
   827                     steps.append(FetchStep(plan, sunion, sources, temptable, True, inputmap))
       
   828                 else:
       
   829                     steps.append(OneFetchStep(plan, sunion, sources, inputmap))
       
   830         # then add steps for splitted query parts
       
   831         for planinfo in tosplit:
       
   832             steps.append(self.split_part(planinfo, temptable))
       
   833         if len(steps) > 1:
       
   834             if temptable:
       
   835                 step = UnionFetchStep(plan)
       
   836             else:
       
   837                 step = UnionStep(plan)
       
   838             step.children = steps
       
   839             return (step,)
       
   840         return steps
       
   841 
       
   842     # internal methods for multisources decomposition #########################
       
   843     
       
   844     def split_part(self, ppi, temptable):
       
   845         ppi.finaltable = temptable
       
   846         plan = ppi.plan
       
   847         select = ppi.rqlst
       
   848         subinputmap = self._ppi_subqueries(ppi)
       
   849         stepdefs = ppi.part_steps()
       
   850         if need_aggr_step(select, ppi.part_sources, stepdefs):
       
   851             atemptable = 'T%s' % make_uid(id(select))
       
   852             selection = select.selection[:]
       
   853             select_group_sort(select)
       
   854         else:
       
   855             atemptable = None
       
   856             selection = select.selection
       
   857         ppi.temptable = atemptable
       
   858         vfilter = VariablesFiltererVisitor(self.schema, ppi)
       
   859         steps = []
       
   860         for sources, variables, solindices, scope, needsel, final in stepdefs:
       
   861             # extract an executable query using only the specified variables
       
   862             if sources[0].uri == 'system':
       
   863                 # in this case we have to merge input maps before call to
       
   864                 # filter so already processed restriction are correctly
       
   865                 # removed
       
   866                 solsinputmaps = ppi.merge_input_maps(solindices)
       
   867                 for solindices, inputmap in solsinputmaps:
       
   868                     minrqlst, insertedvars = vfilter.filter(
       
   869                         sources, variables, scope, set(solindices), needsel, final)
       
   870                     if inputmap is None:
       
   871                         inputmap = subinputmap
       
   872                     else:
       
   873                         inputmap.update(subinputmap)
       
   874                     steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
       
   875                                                       sources, insertedvars))
       
   876             else:
       
   877                 # this is a final part (i.e. retreiving results for the
       
   878                 # original query part) if all variable / sources have been
       
   879                 # treated or if this is the last shot for used solutions
       
   880                 minrqlst, insertedvars = vfilter.filter(
       
   881                     sources, variables, scope, solindices, needsel, final)
       
   882                 if final:
       
   883                     solsinputmaps = ppi.merge_input_maps(solindices)
       
   884                     for solindices, inputmap in solsinputmaps:
       
   885                         if inputmap is None:
       
   886                             inputmap = subinputmap
       
   887                         else:
       
   888                             inputmap.update(subinputmap)
       
   889                         steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
       
   890                                                   sources, insertedvars))
       
   891                 else:
       
   892                     table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in variables)),
       
   893                                         ''.join(sorted(str(i) for i in solindices)))
       
   894                     ppi.build_non_final_part(minrqlst, solindices, sources,
       
   895                                              insertedvars, table)
       
   896         # finally: join parts, deal with aggregat/group/sorts if necessary
       
   897         if atemptable is not None:
       
   898             step = AggrStep(plan, selection, select, atemptable, temptable)
       
   899             step.children = steps
       
   900         elif len(steps) > 1:
       
   901             if temptable:
       
   902                 step = UnionFetchStep(plan)
       
   903             else:
       
   904                 step = UnionStep(plan)
       
   905             step.children = steps
       
   906         else:
       
   907             step = steps[0]
       
   908         if select.limit is not None or select.offset:
       
   909             step.set_limit_offset(select.limit, select.offset)
       
   910         return step
       
   911 
       
   912     
       
   913 class UnsupportedBranch(Exception):
       
   914     pass
       
   915 
       
   916 
       
   917 class VariablesFiltererVisitor(object):
       
   918     def __init__(self, schema, ppi):
       
   919         self.schema = schema
       
   920         self.ppi = ppi
       
   921         self.skip = {}
       
   922         self.hasaggrstep = self.ppi.temptable
       
   923         self.extneedsel = frozenset(vref.name for sortterm in ppi.rqlst.orderby
       
   924                                     for vref in sortterm.iget_nodes(VariableRef))
       
   925         
       
   926     def _rqlst_accept(self, rqlst, node, newroot, variables, setfunc=None):
       
   927         try:
       
   928             newrestr, node_ = node.accept(self, newroot, variables[:])
       
   929         except UnsupportedBranch:
       
   930             return rqlst
       
   931         if setfunc is not None and newrestr is not None:
       
   932             setfunc(newrestr)
       
   933         if not node_ is node:
       
   934             rqlst = node.parent
       
   935         return rqlst
       
   936 
       
    def filter(self, sources, variables, rqlst, solindices, needsel, final):
        """Build a new select holding what can be executed on `sources`.

        :param sources: sources the generated query is intended for
        :param variables: terms handled by this step (a copy of the list is
          given to each visited branch)
        :param rqlst: the scope to filter; it may be rebound while branches
          are visited
        :param solindices: indices of the solutions handled by this step
        :param needsel: set of variable names which have to be selected; it
          is updated in place with names used by the ORDERBY terms
        :param final: true if this step retrieves results for the original
          query part
        :return: a `(newroot, insertedvars)` tuple, where `newroot` is the
          generated select and `insertedvars` the list of
          `(varname, rschema, inserted variable name)` recorded for
          variables inserted to fetch constant values
        """
        if server.DEBUG:
            print 'filter', final and 'final' or '', sources, variables, rqlst, solindices, needsel
        newroot = Select()
        # per call state, read by the visit_* methods
        self.sources = sources
        self.solindices = solindices
        self.final = final
        # variables which appear in unsupported branches
        needsel |= self.extneedsel
        self.needsel = needsel
        # variables which appear in supported branches
        self.mayneedsel = set()
        # new inserted variables
        self.insertedvars = []
        # other structures (XXX document)
        self.mayneedvar, self.hasvar = {}, {}
        self.use_only_defined = False
        self.scopes = {rqlst: newroot}
        if rqlst.where:
            rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, variables,
                                       newroot.set_where)
        if isinstance(rqlst, Select):
            self.use_only_defined = True
            if rqlst.groupby:
                groupby = []
                for node in rqlst.groupby:
                    rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
                                               groupby.append)
                if groupby:
                    newroot.set_groupby(groupby)
            if rqlst.having:
                having = []
                for node in rqlst.having:
                    rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
                                               having.append)
                if having:
                    newroot.set_having(having)
            # sort terms are only copied on final steps, and not when an
            # aggregat step is used
            if final and rqlst.orderby and not self.hasaggrstep:
                orderby = []
                for node in rqlst.orderby:
                    rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
                                               orderby.append)
                if orderby:
                    newroot.set_orderby(orderby)
            self.process_selection(newroot, variables, rqlst)
        elif not newroot.where:
            # no restrictions have been copied, just select variables and add
            # type restriction (done later by add_types_restriction)
            for v in variables:
                if not isinstance(v, Variable):
                    continue
                newroot.append_selected(VariableRef(newroot.get_variable(v.name)))
        solutions = self.ppi.copy_solutions(solindices)
        cleanup_solutions(newroot, solutions)
        newroot.set_possible_types(solutions)
        if final:
            if self.hasaggrstep:
                self.add_necessary_selection(newroot, self.mayneedsel & self.extneedsel)
            newroot.distinct = rqlst.distinct
        else:
            self.add_necessary_selection(newroot, self.mayneedsel & self.needsel)
            # insert vars to fetch constant values when needed
            for (varname, rschema), reldefs in self.mayneedvar.iteritems():
                for rel, ored in reldefs:
                    if not (varname, rschema) in self.hasvar:
                        self.hasvar[(varname, rschema)] = None # just to avoid further insertion
                        cvar = newroot.make_variable()
                        # type the new variable in each solution, using the
                        # first object type of the relation for the subject
                        for sol in newroot.solutions:
                            sol[cvar.name] = rschema.objects(sol[varname])[0]
                        # if the current restriction is not used in a OR branch,
                        # we can keep it, else we have to drop the constant
                        # restriction (or we may miss some results)
                        if not ored:
                            rel = rel.copy(newroot)
                            newroot.add_restriction(rel)
                        # add a relation to link the variable
                        newroot.remove_node(rel.children[1])
                        cmp = Comparison('=')
                        rel.append(cmp)
                        cmp.append(VariableRef(cvar))
                        self.insertedvars.append((varname, rschema, cvar.name))
                        newroot.append_selected(VariableRef(newroot.get_variable(cvar.name)))
                        # NOTE: even if the restriction is done by this query, we have
                        # to let it in the original rqlst so that it appears anyway in
                        # the "final" query, else we may change the meaning of the query
                        # if there are NOT somewhere :
                        # 'NOT X relation Y, Y name "toto"' means X WHERE X isn't related
                        # to Y whose name is toto while
                        # 'NOT X relation Y' means X WHERE X has no 'relation' (whatever Y)
                    elif ored:
                        newroot.remove_node(rel)
        add_types_restriction(self.schema, rqlst, newroot, solutions)
        if server.DEBUG:
            print '--->', newroot
        return newroot, self.insertedvars
       
  1032         
       
  1033     def visit_and(self, node, newroot, variables):
       
  1034         subparts = []
       
  1035         for i in xrange(len(node.children)):
       
  1036             child = node.children[i]
       
  1037             try:
       
  1038                 newchild, child_ = child.accept(self, newroot, variables)
       
  1039                 if not child_ is child:
       
  1040                     node = child_.parent
       
  1041                 if newchild is None:
       
  1042                     continue
       
  1043                 subparts.append(newchild)
       
  1044             except UnsupportedBranch:
       
  1045                 continue
       
  1046         if not subparts:
       
  1047             return None, node
       
  1048         if len(subparts) == 1:
       
  1049             return subparts[0], node
       
  1050         return copy_node(newroot, node, subparts), node
       
  1051 
       
  1052     visit_or = visit_and
       
  1053 
       
  1054     def _relation_supported(self, rtype):
       
  1055         for source in self.sources:
       
  1056             if not source.support_relation(rtype):
       
  1057                 return False
       
  1058         return True
       
  1059         
       
  1060     def visit_relation(self, node, newroot, variables):
       
  1061         if not node.is_types_restriction():
       
  1062             if node in self.skip and self.solindices.issubset(self.skip[node]):
       
  1063                 if not self.schema.rschema(node.r_type).is_final():
       
  1064                     # can't really skip the relation if one variable is selected and only
       
  1065                     # referenced by this relation
       
  1066                     for vref in node.iget_nodes(VariableRef):
       
  1067                         stinfo = vref.variable.stinfo
       
  1068                         if stinfo['selected'] and len(stinfo['relations']) == 1:
       
  1069                             break
       
  1070                     else:
       
  1071                         return None, node
       
  1072                 else:
       
  1073                     return None, node
       
  1074             if not self._relation_supported(node.r_type):
       
  1075                 raise UnsupportedBranch()
       
  1076         # don't copy type restriction unless this is the only relation for the
       
  1077         # rhs variable, else they'll be reinserted later as needed (else we may
       
  1078         # copy a type restriction while the variable is not actually used)
       
  1079         elif not any(self._relation_supported(rel.r_type)
       
  1080                      for rel in node.children[0].variable.stinfo['relations']):
       
  1081             rel, node = self.visit_default(node, newroot, variables)
       
  1082             return rel, node
       
  1083         else:
       
  1084             raise UnsupportedBranch()
       
  1085         rschema = self.schema.rschema(node.r_type)
       
  1086         res = self.visit_default(node, newroot, variables)[0]
       
  1087         ored = node.ored()
       
  1088         if rschema.is_final() or rschema.inlined:
       
  1089             vrefs = node.children[1].get_nodes(VariableRef)
       
  1090             if not vrefs:
       
  1091                 if not ored:
       
  1092                     self.skip.setdefault(node, set()).update(self.solindices)
       
  1093                 else:
       
  1094                     self.mayneedvar.setdefault((node.children[0].name, rschema), []).append( (res, ored) )
       
  1095                     
       
  1096             else:
       
  1097                 assert len(vrefs) == 1
       
  1098                 vref = vrefs[0]
       
  1099                 # XXX check operator ?
       
  1100                 self.hasvar[(node.children[0].name, rschema)] = vref
       
  1101                 if self._may_skip_attr_rel(rschema, node, vref, ored, variables, res):
       
  1102                     self.skip.setdefault(node, set()).update(self.solindices)
       
  1103         elif not ored:
       
  1104             self.skip.setdefault(node, set()).update(self.solindices)
       
  1105         return res, node
       
  1106 
       
  1107     def _may_skip_attr_rel(self, rschema, rel, vref, ored, variables, res):
       
  1108         var = vref.variable
       
  1109         if ored:
       
  1110             return False
       
  1111         if var.name in self.extneedsel or var.stinfo['selected']:
       
  1112             return False
       
  1113         if not same_scope(var):
       
  1114             return False
       
  1115         if any(v for v,_ in var.stinfo['attrvars'] if not v.name in variables):
       
  1116             return False
       
  1117         return True
       
  1118         
       
  1119     def visit_exists(self, node, newroot, variables):
       
  1120         newexists = node.__class__()
       
  1121         self.scopes = {node: newexists}
       
  1122         subparts, node = self._visit_children(node, newroot, variables)
       
  1123         if not subparts:
       
  1124             return None, node
       
  1125         newexists.set_where(subparts[0])
       
  1126         return newexists, node
       
  1127     
       
  1128     def visit_not(self, node, newroot, variables):
       
  1129         subparts, node = self._visit_children(node, newroot, variables)
       
  1130         if not subparts:
       
  1131             return None, node
       
  1132         return copy_node(newroot, node, subparts), node
       
  1133     
       
  1134     def visit_group(self, node, newroot, variables):
       
  1135         if not self.final:
       
  1136             return None, node
       
  1137         return self.visit_default(node, newroot, variables)
       
  1138             
       
  1139     def visit_variableref(self, node, newroot, variables):
       
  1140         if self.use_only_defined:
       
  1141             if not node.variable.name in newroot.defined_vars:
       
  1142                 raise UnsupportedBranch(node.name)
       
  1143         elif not node.variable in variables:
       
  1144             raise UnsupportedBranch(node.name)
       
  1145         self.mayneedsel.add(node.name)
       
  1146         # set scope so we can insert types restriction properly
       
  1147         newvar = newroot.get_variable(node.name)
       
  1148         newvar.stinfo['scope'] = self.scopes.get(node.variable.scope, newroot)
       
  1149         return VariableRef(newvar), node
       
  1150 
       
  1151     def visit_constant(self, node, newroot, variables):
       
  1152         return copy_node(newroot, node), node
       
  1153     
       
  1154     def visit_default(self, node, newroot, variables):
       
  1155         subparts, node = self._visit_children(node, newroot, variables)
       
  1156         return copy_node(newroot, node, subparts), node
       
  1157         
       
  1158     visit_comparison = visit_mathexpression = visit_constant = visit_function = visit_default
       
  1159     visit_sort = visit_sortterm = visit_default
       
  1160     
       
  1161     def _visit_children(self, node, newroot, variables):
       
  1162         subparts = []
       
  1163         for i in xrange(len(node.children)):
       
  1164             child = node.children[i]
       
  1165             newchild, child_ = child.accept(self, newroot, variables)
       
  1166             if not child is child_:
       
  1167                 node = child_.parent
       
  1168             if newchild is not None:
       
  1169                 subparts.append(newchild)
       
  1170         return subparts, node
       
  1171     
       
  1172     def process_selection(self, newroot, variables, rqlst):
       
  1173         if self.final:
       
  1174             for term in rqlst.selection:
       
  1175                 newroot.append_selected(term.copy(newroot))
       
  1176                 for vref in term.get_nodes(VariableRef):
       
  1177                     self.needsel.add(vref.name)
       
  1178             return 
       
  1179         for term in rqlst.selection:
       
  1180             vrefs = term.get_nodes(VariableRef)
       
  1181             if vrefs:
       
  1182                 supportedvars = []
       
  1183                 for vref in vrefs:
       
  1184                     var = vref.variable
       
  1185                     if var in variables:
       
  1186                         supportedvars.append(vref)
       
  1187                         continue
       
  1188                     else:
       
  1189                         self.needsel.add(vref.name)
       
  1190                         break
       
  1191                 else:
       
  1192                     for vref in vrefs:
       
  1193                         newroot.append_selected(vref.copy(newroot))
       
  1194                     supportedvars = []
       
  1195                 for vref in supportedvars:
       
  1196                     if not vref in newroot.get_selected_variables():
       
  1197                         newroot.append_selected(VariableRef(newroot.get_variable(vref.name)))
       
  1198             
       
  1199     def add_necessary_selection(self, newroot, variables):
       
  1200         selected = tuple(newroot.get_selected_variables())
       
  1201         for varname in variables:
       
  1202             var = newroot.defined_vars[varname]
       
  1203             for vref in var.references():
       
  1204                 rel = vref.relation()
       
  1205                 if rel is None and vref in selected:
       
  1206                     # already selected
       
  1207                     break
       
  1208             else:
       
  1209                 selvref = VariableRef(var)
       
  1210                 newroot.append_selected(selvref)
       
  1211                 if newroot.groupby:
       
  1212                     newroot.add_group_var(VariableRef(selvref.variable, noautoref=1))