server/msplanner.py
changeset 1228 91ae10ffb611
parent 1124 5548b608b7d9
child 1229 dd9bdcfc03b6
1219:054bb575c013 1228:91ae10ffb611
     1 """plan execution of rql queries on multiple sources
     1 """plan execution of rql queries on multiple sources
     2 
     2 
     3 the best way to understand what we are trying to achieve here is to read
     3 the best way to understand what we are trying to achieve here is to read the
     4 the unit-tests in unittest_querier_planner.py
     4 unit-tests in unittest_msplanner.py
     5 
     5 
     6 
     6 
     7 
     7 What you need to know
     8 Split and execution specifications
     8 ~~~~~~~~~~~~~~~~~~~~~
     9 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     9 1. The system source is expected to support all entity and relation types
       
    10 
       
    11 2. Given "X relation Y":
       
    12 
       
    13    * if relation, X and Y types are supported by the external source, we suppose
       
    14      by default that X and Y should both come from the same source as the
       
    15      relation. You can specify otherwise by adding relation into the
       
    16      "cross_relations" set in the source's mapping file and in that case, we'll
       
    17      consider that we can also find in the system source some relation between
       
    18      X and Y coming from different sources.
       
    19      
       
    20    * if "relation" isn't supported by the external source but X or Y
       
    21      types (or both) are, we suppose by default that we can find in the system
       
    22      source some relation where X and/or Y come from the external source. You
       
    23      can specify otherwise by adding relation into the "dont_cross_relations"
       
    24      set in the source's mapping file and in that case, we'll consider that we
       
    25      can only find in the system source some relation between X and Y coming
       
    26      from the system source.
       
    27 
       
    28 
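The two splitting rules above can be condensed into a small decision function. This is a minimal sketch, not the planner's code: `ExternalSource` and `system_may_link_external` are hypothetical names, and a source is reduced to the entity types and relation types it supports plus its `cross_relations` / `dont_cross_relations` sets. The function answers one question: may the system source hold a relation between X and Y where X and/or Y comes from the external source?

```python
from dataclasses import dataclass, field


@dataclass
class ExternalSource:
    """Hypothetical stand-in for an external source and its mapping file."""
    supported_etypes: set
    supported_rtypes: set
    cross_relations: set = field(default_factory=set)
    dont_cross_relations: set = field(default_factory=set)


def system_may_link_external(source, rtype, xtype, ytype):
    """Return True if the system source may store `rtype` relations whose
    subject and/or object entity comes from `source`."""
    x_ext = xtype in source.supported_etypes
    y_ext = ytype in source.supported_etypes
    if rtype in source.supported_rtypes and x_ext and y_ext:
        # rule 1: by default X, Y and the relation all come from the external
        # source, unless the relation is declared in cross_relations
        return rtype in source.cross_relations
    if rtype not in source.supported_rtypes and (x_ext or y_ext):
        # rule 2: by default the system source may link external entities,
        # unless the relation is declared in dont_cross_relations
        return rtype not in source.dont_cross_relations
    # any other combination is not covered by the rules above; treating it
    # as False is an assumption of this sketch
    return False
```

For the ldap example below (only `EUser` supported, no relation), `in_group` between an ldap `EUser` and a system `EGroup` is allowed by default, and forbidden once `in_group` is listed in `dont_cross_relations`.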
       
    29 Implementation
       
    30 ~~~~~~~~~~~~~~
       
    31 XXX explain algorithm
       
    32 
       
    33 
       
    34 Examples of multi-sources query execution
       
    35 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    10 For a system source and an LDAP user source (only EUser and its attributes
    36 For a system source and an LDAP user source (only EUser and its attributes
    11 are supported, no group or such):
    37 are supported, no group or such):
    12 
    38 
    13 
       
    14 :EUser X:
    39 :EUser X:
    15 1. fetch EUser X from both sources and return concatenation of results
    40 1. fetch EUser X from both sources and return concatenation of results
    16 
    41 
    17 
       
    18 :EUser X WHERE X in_group G, G name 'users':
    42 :EUser X WHERE X in_group G, G name 'users':
    19 * catch 1
    43 * catch 1
    20   1. fetch EUser X from both sources, store concatenation of results
    44   1. fetch EUser X from both sources, store concatenation of results into a
    21      into a temporary table
    45      temporary table
    22   2. return the result of TMP X WHERE X in_group G, G name 'users' from
    46   2. return the result of TMP X WHERE X in_group G, G name 'users' from the
    23      the system source
    47      system source
    24      
       
    25 * catch 2
    48 * catch 2
    26   1. return the result of EUser X WHERE X in_group G, G name 'users'
    49   1. return the result of EUser X WHERE X in_group G, G name 'users' from system
    27      from system source, that's enough (optimization of the sql querier
    50      source, that's enough (optimization of the sql querier will avoid join on
    28      will avoid join on EUser, so we will directly get local eids)
    51      EUser, so we will directly get local eids)
    29 
       
    30     
    52     
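The "catch 1" strategy for `EUser X WHERE X in_group G, G name 'users'` can be illustrated with an in-memory SQLite database standing in for the system source. This is a hedged sketch: the function `fetch_users_in_group` and the table and column names (`tmp`, `in_group`, `egroup`, `eid_from`, `eid_to`) are hypothetical, not CubicWeb's actual SQL schema, and eids are assumed to be unique across sources.

```python
import sqlite3


def fetch_users_in_group(system_eids, ldap_eids, in_group_rows, group_rows,
                         group_name):
    """Sketch of "catch 1": concatenate EUser eids from both sources into a
    temporary table, then evaluate the in_group restriction on the system
    source. All table and column names are hypothetical."""
    cnx = sqlite3.connect(':memory:')
    cur = cnx.cursor()
    # system-source tables (hypothetical schema)
    cur.execute('CREATE TABLE in_group (eid_from INTEGER, eid_to INTEGER)')
    cur.execute('CREATE TABLE egroup (eid INTEGER, name TEXT)')
    cur.executemany('INSERT INTO in_group VALUES (?, ?)', in_group_rows)
    cur.executemany('INSERT INTO egroup VALUES (?, ?)', group_rows)
    # step 1: store the concatenation of results from both sources
    cur.execute('CREATE TEMPORARY TABLE tmp (x INTEGER)')
    cur.executemany('INSERT INTO tmp VALUES (?)',
                    [(eid,) for eid in list(system_eids) + list(ldap_eids)])
    # step 2: "TMP X WHERE X in_group G, G name 'users'" on the system source
    cur.execute('SELECT tmp.x FROM tmp '
                'JOIN in_group ON in_group.eid_from = tmp.x '
                'JOIN egroup ON egroup.eid = in_group.eid_to '
                'WHERE egroup.name = ? ORDER BY tmp.x', (group_name,))
    result = [row[0] for row in cur.fetchall()]
    cnx.close()
    return result
```

With system users 1 and 2, ldap users 10 and 11, and only users 1 and 10 linked to the `users` group, the call returns `[1, 10]`: the ldap user is found because the join happens on the system source after the temporary table has been filled.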
    31 :EUser X,L WHERE X in_group G, X login L, G name 'users':
    53 :EUser X,L WHERE X in_group G, X login L, G name 'users':
    32 1. fetch Any X,L WHERE X is EUser, X login L from both sources, store
    54 1. fetch Any X,L WHERE X is EUser, X login L from both sources, store
    33    concatenation of results into a temporary table
    55    concatenation of results into a temporary table
    34 2. return the result of Any X, L WHERE X is TMP, X login L, X in_group G,
    56 2. return the result of Any X, L WHERE X is TMP, X login L, X in_group G,
    35    G name 'users' from the system source
    57    G name 'users' from the system source
    36 
    58 
    37 
    59 
    38 :Any X WHERE X owned_by Y:
    60 :Any X WHERE X owned_by Y:
    39 * catch 1
    61 * catch 1
    40   1. fetch EUser X from both sources, store concatenation of results
    62   1. fetch EUser X from both sources, store concatenation of results into a
    41      into a temporary table
    63      temporary table
    42   2. return the result of Any X WHERE X owned_by Y, Y is TMP from
    64   2. return the result of Any X WHERE X owned_by Y, Y is TMP from the system
    43      the system source
    65      source
    44      
       
    45 * catch 2
    66 * catch 2
    46   1. return the result of Any X WHERE X owned_by Y
    67   1. return the result of Any X WHERE X owned_by Y from system source, that's
    47      from system source, that's enough (optimization of the sql querier
    68      enough (optimization of the sql querier will avoid join on EUser, so we
    48      will avoid join on EUser, so we will directly get local eids)
    69      will directly get local eids)
    49 
    70 
    50 
    71 
    51 :organization: Logilab
    72 :organization: Logilab
    52 :copyright: 2003-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
    73 :copyright: 2003-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
    53 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
    74 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
    64 from rql.nodes import VariableRef, Comparison, Relation, Constant, Exists, Variable
    85 from rql.nodes import VariableRef, Comparison, Relation, Constant, Exists, Variable
    65 
    86 
    66 from cubicweb import server
    87 from cubicweb import server
    67 from cubicweb.common.utils import make_uid
    88 from cubicweb.common.utils import make_uid
    68 from cubicweb.server.utils import cleanup_solutions
    89 from cubicweb.server.utils import cleanup_solutions
    69 from cubicweb.server.ssplanner import SSPlanner, OneFetchStep, add_types_restriction
    90 from cubicweb.server.ssplanner import (SSPlanner, OneFetchStep,
       
    91                                        add_types_restriction)
    70 from cubicweb.server.mssteps import *
    92 from cubicweb.server.mssteps import *
    71 from cubicweb.server.sources import AbstractSource
    93 from cubicweb.server.sources import AbstractSource
    72 
    94 
    73 Variable._ms_table_key = lambda x: x.name
    95 Variable._ms_table_key = lambda x: x.name
    74 Relation._ms_table_key = lambda x: x.r_type
    96 Relation._ms_table_key = lambda x: x.r_type
    75 # str() Constant.value to ensure generated table name won't be unicode
    97 # str() Constant.value to ensure generated table name won't be unicode
    76 Constant._ms_table_key = lambda x: str(x.value)
    98 Constant._ms_table_key = lambda x: str(x.value)
    77 
    99 
    78 AbstractSource.dont_cross_relations = ()
   100 AbstractSource.dont_cross_relations = ()
    79 AbstractSource.cross_relations = ()
   101 AbstractSource.cross_relations = ()
    80 
       
    81 def allequals(solutions):
       
    82     """return true if all solutions are identical"""
       
    83     sol = solutions.next()
       
    84     for sol_ in solutions:
       
    85         if sol_ != sol:
       
    86             return False
       
    87     return True
       
    88     
   102     
    89 def need_aggr_step(select, sources, stepdefs=None):
   103 def need_aggr_step(select, sources, stepdefs=None):
    90     """return True if a temporary table is necessary to store some partial
   104     """return True if a temporary table is necessary to store some partial
    91     results to execute the given query
   105     results to execute the given query
    92     """
   106     """
   112                 has_one_final = True
   126                 has_one_final = True
   113             else:
   127             else:
   114                 fstepsolindices.update(stepdef[2])
   128                 fstepsolindices.update(stepdef[2])
   115     return False
   129     return False
   116 
   130 
       
   131 def select_group_sort(select): # XXX something similar done in rql2sql
       
   132     # add variables used in groups and sort terms to the selection
       
   133     # if necessary
       
   134     if select.groupby:
       
   135         for vref in select.groupby:
       
   136             if not vref in select.selection:
       
   137                 select.append_selected(vref.copy(select))
       
   138     for sortterm in select.orderby:
       
   139         for vref in sortterm.iget_nodes(VariableRef):
       
   140             if not vref in select.get_selected_variables():
       
   141                 # we can't directly insert sortterm.term because it references
       
   142                 # a variable of the select before the copy.
       
   143                 # XXX if constant term are used to define sort, their value
       
   144                 # may need to be adjusted
       
   145                 select.append_selected(vref.copy(select))
       
   146                 if select.groupby and not vref in select.groupby:
       
   147                     select.add_group_var(vref.copy(select))
       
   148 
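The selection fix-up performed by `select_group_sort` can be shown on a simplified model. In this sketch, which is an assumption for illustration and not the rql node API, variables are plain strings and each sort term is given as the list of variable names it references; the function mutates and returns the selection and group lists.

```python
def select_group_sort(selection, groupby, orderby):
    """Simplified model of select_group_sort: add to the selection every
    variable used in GROUPBY or in a sort term (hypothetical representation:
    variables are strings, a sort term is a list of variable names)."""
    # every grouped variable must appear in the selection
    for var in groupby:
        if var not in selection:
            selection.append(var)
    # every variable referenced by a sort term must be selected, and must
    # also be grouped when the query is grouped
    for sortterm in orderby:
        for var in sortterm:
            if var not in selection:
                selection.append(var)
                if groupby and var not in groupby:
                    groupby.append(var)
    return selection, groupby
```

For instance, a query selecting `X` grouped by `X` and sorted on `N` ends up selecting and grouping on both `X` and `N`, which keeps the rewritten query valid SQL once it is run against a temporary table.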
       
   149 def allequals(solutions):
       
   150     """return true if all solutions are identical"""
       
   151     sol = solutions.next()
       
   152     noconstsol = None
       
   153     for sol_ in solutions:
       
   154         if sol_ != sol:
       
   155             return False
       
   156     return True
       
   157 
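The `allequals` helper above relies on the Python 2 iterator method `solutions.next()`. A minimal Python 3 sketch of the same check follows; treating an empty iterator as "all equal" is an assumption of this sketch (the original would raise `StopIteration`), and the unused `noconstsol` variable is simply dropped.

```python
def allequals(solutions):
    """Return True if all solutions (e.g. dicts mapping variable names to
    types) yielded by `solutions` are identical."""
    solutions = iter(solutions)
    try:
        first = next(solutions)
    except StopIteration:
        # assumption: no solution at all counts as "all equal"
        return True
    # all() stops at the first differing solution, like the original loop
    return all(sol == first for sol in solutions)
```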
       
   158 # XXX move functions below to rql ##############################################
       
   159 
       
   160 def is_ancestor(n1, n2):
       
   161     p = n1.parent
       
   162     while p is not None:
       
   163         if p is n2:
       
   164             return True
       
   165         p = p.parent
       
   166     return False
       
   167 
   117 def copy_node(newroot, node, subparts=()):
   168 def copy_node(newroot, node, subparts=()):
   118     newnode = node.__class__(*node.initargs(newroot))
   169     newnode = node.__class__(*node.initargs(newroot))
   119     for part in subparts:
   170     for part in subparts:
   120         newnode.append(part)
   171         newnode.append(part)
   121     return newnode
   172     return newnode
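`is_ancestor` walks up the `parent` links of a node, and `copy_node` rebuilds a node for another tree root from its `initargs()`. The sketch below exercises both on a hypothetical `Node` class that only provides the `parent` attribute, `initargs()` and `append()` used by these helpers; it is not the rql node implementation.

```python
class Node:
    """Hypothetical minimal syntax tree node providing only what
    is_ancestor() and copy_node() need."""

    def __init__(self, root, label):
        self.root = root
        self.label = label
        self.parent = None
        self.children = []

    def initargs(self, newroot):
        # arguments needed to rebuild an equivalent node under `newroot`
        return (newroot, self.label)

    def append(self, child):
        child.parent = self
        self.children.append(child)


def is_ancestor(n1, n2):
    """Return True if n2 is a strict ancestor of n1."""
    p = n1.parent
    while p is not None:
        if p is n2:
            return True
        p = p.parent
    return False


def copy_node(newroot, node, subparts=()):
    """Build a node of the same class as `node`, bound to `newroot`, with
    `subparts` as its children (the children of `node` are not copied)."""
    newnode = node.__class__(*node.initargs(newroot))
    for part in subparts:
        newnode.append(part)
    return newnode
```

Note that `copy_node` does not copy children recursively: the caller passes the already-copied sub-parts explicitly, which lets it decide which parts of the original node survive in the new tree.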
   129             if not rel.scope is var.scope:
   180             if not rel.scope is var.scope:
   130                 var.stinfo['samescope'] = False
   181                 var.stinfo['samescope'] = False
   131                 return False
   182                 return False
   132         var.stinfo['samescope'] = True
   183         var.stinfo['samescope'] = True
   133         return True
   184         return True
   134 
   185     
   135 def select_group_sort(select): # XXX something similar done in rql2sql
   186 ################################################################################
   136     # add variables used in groups and sort terms to the selection
       
   137     # if necessary
       
   138     if select.groupby:
       
   139         for vref in select.groupby:
       
   140             if not vref in select.selection:
       
   141                 select.append_selected(vref.copy(select))
       
   142     for sortterm in select.orderby:
       
   143         for vref in sortterm.iget_nodes(VariableRef):
       
   144             if not vref in select.get_selected_variables():
       
   145                 # we can't directly insert sortterm.term because it references
       
   146                 # a variable of the select before the copy.
       
   147                 # XXX if constant term are used to define sort, their value
       
   148                 # may need to be adjusted
       
   149                 select.append_selected(vref.copy(select))
       
   150                 if select.groupby and not vref in select.groupby:
       
   151                     select.add_group_var(vref.copy(select))
       
   152 
       
   153 # XXX move to rql
       
   154 def is_ancestor(n1, n2):
       
   155     p = n1.parent
       
   156     while p is not None:
       
   157         if p is n2:
       
   158             return True
       
   159         p = p.parent
       
   160     return False
       
   161 
   187 
   162 class PartPlanInformation(object):
   188 class PartPlanInformation(object):
   163     """groups the information needed to execute some part of a "global" rql
   189     """groups the information needed to execute some part of a "global" rql
   164     query ("global" means as received by the querier, which may result in
   190     query ("global" means as received by the querier, which may result in
   165     several internal queries, i.e. parts, due to security insertions)
   191     several internal queries, i.e. parts, due to security insertions). Actually
   166 
   192     a PPI is created for each subquery and for each query in a union.
   167     it exposes as well some methods helping in executing this part on a
   193 
       
   194     It exposes as well some methods helping in executing this part on a
   168     multi-sources repository, modifying its internal structure during the
   195     multi-sources repository, modifying its internal structure during the
   169     process
   196     process.
   170 
   197 
   171     :attr solutions: a list of mappings (varname -> vartype)
   198     :attr plan:
   172     :attr sourcesvars:
   199       the execution plan
   173       a dictionary telling for each source which variables/solutions are
   200     :attr rqlst:
   174       supported, of the form {source : {varname: [solution index, ]}}
   201       the original rql syntax tree handled by this part
       
   202       
       
   203     :attr needsplit:
       
   204       bool telling if the query has to be split into multiple steps for
       
   205       execution or if it can be executed at once
       
   206       
       
   207     :attr temptable:
       
   208       a SQL temporary table name or None, if necessary to handle aggregate /
       
   209       sorting for this part of the query
       
   210       
       
   211     :attr finaltable:
       
   212       a SQL table name or None, if results for this part of the query should be
       
   213       written into a temporary table (usually shared by multiple PPI)
       
   214       
       
   215     :attr sourcesterms:
       
   216       a dictionary {source : {term: set([solution index, ])}} telling for each
       
   217       source which terms are supported for which solutions. A "term" may be
       
   218       either a rql Variable, Constant or Relation node.
   175     """
   219     """
   176     def __init__(self, plan, rqlst, rqlhelper=None):
   220     def __init__(self, plan, rqlst, rqlhelper=None):
       
   221         self.plan = plan
       
   222         self.rqlst = rqlst
   177         self.needsplit = False
   223         self.needsplit = False
   178         self.temptable = None
   224         self.temptable = None
   179         self.finaltable = None
   225         self.finaltable = None
   180         self.plan = plan
   226         self._schema = plan.schema
   181         self.rqlst = rqlst
       
   182         self._session = plan.session
   227         self._session = plan.session
       
   228         self._repo = self._session.repo
   183         self._solutions = rqlst.solutions
   229         self._solutions = rqlst.solutions
   184         self._solindices = range(len(self._solutions))
   230         self._solindices = range(len(self._solutions))
   185         # source : {var: [solution index, ]}
   231         # source : {term: [solution index, ]}
   186         self.sourcesvars = self._sourcesvars = {}
   232         self.sourcesterms = self._sourcesterms = {}
   187         # source : {relation: set(child variable and constant)}
   233         # source : {relation: set(child variable and constant)}
   188         self._crossrelations = {}
   234         self._crossrelations = {}
   189         # dictionary of variables which are linked to each other using a non
   235         # dictionary of variables and constants which are linked to each other
   190         # final relation which is supported by multiple sources
   236         # using a non final relation supported by multiple sources (crossed or
   191         self._linkedvars = {}
   237         # not).
   192         self._crosslinkedvars = {}
   238         self._linkedterms = {}
   193         # processing
   239         # processing
   194         self._compute_sourcesvars()
   240         termssources = self._compute_sourcesterms()
   195         self._remove_invalid_sources()
   241         self._remove_invalid_sources(termssources)
   196         self._compute_needsplit()
   242         self._compute_needsplit()
   197         self.sourcesvars = {}
   243         # after initialisation, .sourcesterms contains the same thing as
   198         for k, v in self._sourcesvars.iteritems():
   244         # ._sourcesterms though during plan construction, ._sourcesterms will
   199             self.sourcesvars[k] = {}
   245         # be modified while .sourcesterms will be kept unmodified
       
   246         self.sourcesterms = {}
       
   247         for k, v in self._sourcesterms.iteritems():
       
   248             self.sourcesterms[k] = {}
   200             for k2, v2 in v.iteritems():
   249             for k2, v2 in v.iteritems():
   201                 self.sourcesvars[k][k2] = v2.copy()
   250                 self.sourcesterms[k][k2] = v2.copy()
       
   251         # cleanup linked var
       
   252         for var, linkedrelsinfo in self._linkedterms.iteritems():
       
   253             self._linkedterms[var] = frozenset(x[0] for x in linkedrelsinfo)
       
   254         # map output of a step to input of a following step
   202         self._inputmaps = {}
   255         self._inputmaps = {}
       
   256         # record input map conflicts to resolve them on final step generation
       
   257         self._conflicts = []
   203         if rqlhelper is not None: # else test
   258         if rqlhelper is not None: # else test
   204             self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional
   259             self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional
       
   260         if server.DEBUG:
       
   261             print 'sourcesterms:'
       
   262             for source, terms in self.sourcesterms.items():
       
   263                 print source, terms
   205             
   264             
   206     def copy_solutions(self, solindices):
   265     def copy_solutions(self, solindices):
   207         return [self._solutions[solidx].copy() for solidx in solindices]
   266         return [self._solutions[solidx].copy() for solidx in solindices]
   208     
   267     
   209     @property
   268     @property
   210     @cached
   269     @cached
   211     def part_sources(self):
   270     def part_sources(self):
   212         if self._sourcesvars:
   271         if self._sourcesterms:
   213             return tuple(sorted(self._sourcesvars))
   272             return tuple(sorted(self._sourcesterms))
   214         return (self._session.repo.system_source,)
   273         return (self._repo.system_source,)
   215     
   274     
   216     @property
   275     @property
   217     @cached
   276     @cached
   218     def _sys_source_set(self):
   277     def _sys_source_set(self):
   219         return frozenset((self._session.repo.system_source, solindex)
   278         return frozenset((self._repo.system_source, solindex)
   220                          for solindex in self._solindices)        
   279                          for solindex in self._solindices)        
   221        
   280        
   222     @cached
   281     @cached
   223     def _norel_support_set(self, relation):
   282     def _norel_support_set(self, relation):
   224         """return a set of (source, solindex) where source doesn't support the
   283         """return a set of (source, solindex) where source doesn't support the
   225         relation
   284         relation
   226         """
   285         """
   227         return frozenset((source, solidx) for source in self._session.repo.sources
   286         return frozenset((source, solidx) for source in self._repo.sources
   228                          for solidx in self._solindices
   287                          for solidx in self._solindices
   229                          if not ((source.support_relation(relation.r_type) and
   288                          if not ((source.support_relation(relation.r_type))
   230                                   not self.crossed_relation(source, relation))
       
   231                                  or relation.r_type in source.dont_cross_relations))
   289                                  or relation.r_type in source.dont_cross_relations))
   232 
   290 
   233 
   291     def _compute_sourcesterms(self):
   234     def _compute_sourcesvars(self):
   292         """compute for each term (variable, rewritten constant, relation) and
   235         """compute for each variable/solution in the rqlst which sources support
   293         for each solution in the rqlst which sources support them
   236         them
       
   237         """
   294         """
   238         repo = self._session.repo
   295         repo = self._repo
   239         eschema = repo.schema.eschema
   296         eschema = self._schema.eschema
   240         sourcesvars = self._sourcesvars
   297         sourcesterms = self._sourcesterms
   241         # find for each source which variable/solution are supported
   298         # find for each source which variable/solution are supported
   242         for varname, varobj in self.rqlst.defined_vars.items():
   299         for varname, varobj in self.rqlst.defined_vars.items():
   243             # if variable has an eid specified, we can get its source directly
   300             # if variable has an eid specified, we can get its source directly
   244             # NOTE: use uidrels and not constnode to deal with "X eid IN(1,2,3,4)"
   301             # NOTE: use uidrels and not constnode to deal with "X eid IN(1,2,3,4)"
   245             if varobj.stinfo['uidrels']:
   302             if varobj.stinfo['uidrels']:
   246                 vrels = varobj.stinfo['relations'] - varobj.stinfo['uidrels']
   303                 vrels = varobj.stinfo['relations'] - varobj.stinfo['uidrels']
   247                 for rel in varobj.stinfo['uidrels']:
   304                 for rel in varobj.stinfo['uidrels']:
   248                     if  rel.neged(strict=True) or rel.operator() != '=':
   305                     if rel.neged(strict=True) or rel.operator() != '=':
   249                         continue
   306                         continue
   250                     for const in rel.children[1].get_nodes(Constant):
   307                     for const in rel.children[1].get_nodes(Constant):
   251                         eid = const.eval(self.plan.args)
   308                         eid = const.eval(self.plan.args)
   252                         source = self._session.source_from_eid(eid)
   309                         source = self._session.source_from_eid(eid)
   253                         if vrels and not any(source.support_relation(r.r_type)
   310                         if vrels and not any(source.support_relation(r.r_type)
   254                                              for r in vrels):
   311                                              for r in vrels):
   255                             self._set_source_for_var(repo.system_source, varobj)
   312                             self._set_source_for_term(repo.system_source, varobj)
   256                         else:
   313                         else:
   257                             self._set_source_for_var(source, varobj)
   314                             self._set_source_for_term(source, varobj)
   258                 continue
   315                 continue
   259             rels = varobj.stinfo['relations']
   316             rels = varobj.stinfo['relations']
   260             if not rels and not varobj.stinfo['typerels']:
   317             if not rels and not varobj.stinfo['typerels']:
   261                 # (rare) case where the variable has neither a specified type
   318                 # (rare) case where the variable has neither a specified type
   262                 # nor an accessed relation, e.g. "Any MAX(X)"
   319                 # nor an accessed relation, e.g. "Any MAX(X)"
   263                 self._set_source_for_var(repo.system_source, varobj)
   320                 self._set_source_for_term(repo.system_source, varobj)
   264                 continue
   321                 continue
   265             for i, sol in enumerate(self._solutions):
   322             for i, sol in enumerate(self._solutions):
   266                 vartype = sol[varname]
   323                 vartype = sol[varname]
   267                 # skip final variable
   324                 # skip final variable
   268                 if eschema(vartype).is_final():
   325                 if eschema(vartype).is_final():
   274                         # * the variable isn't invariant
   331                         # * the variable isn't invariant
   275                         # * at least one supported relation specified
   332                         # * at least one supported relation specified
   276                         if not varobj._q_invariant or \
   333                         if not varobj._q_invariant or \
   277                                any(imap(source.support_relation,
   334                                any(imap(source.support_relation,
   278                                         (r.r_type for r in rels if r.r_type != 'eid'))):
   335                                         (r.r_type for r in rels if r.r_type != 'eid'))):
   279                             sourcesvars.setdefault(source, {}).setdefault(varobj, set()).add(i)
   336                             sourcesterms.setdefault(source, {}).setdefault(varobj, set()).add(i)
   280                         # if variable is not invariant and is used by a relation
   337                         # if variable is not invariant and is used by a relation
   281                         # not supported by this source, we'll have to split the
   338                         # not supported by this source, we'll have to split the
   282                         # query
   339                         # query
   283                         if not varobj._q_invariant and any(ifilterfalse(
   340                         if not varobj._q_invariant and any(ifilterfalse(
   284                             source.support_relation, (r.r_type for r in rels))):
   341                             source.support_relation, (r.r_type for r in rels))):
   285                             self.needsplit = True               
   342                             self.needsplit = True               
   286 
   343         # add source for rewritten constants to sourcesterms
   287     def _handle_cross_relation(self, rel, relsources, vsources):
   344         for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
   288         crossvars = None
   345             const = vconsts[0]
   289         for source in relsources:
   346             source = self._session.source_from_eid(const.eval(self.plan.args))
   290             if rel.r_type in source.cross_relations:
   347             if source is self._repo.system_source:
   291                 crossvars = set(x.variable for x in rel.get_nodes(VariableRef))
   348                 for const in vconsts:
   292                 crossvars.update(frozenset(x for x in rel.get_nodes(Constant)))
   349                     self._set_source_for_term(source, const)
   293                 assert len(crossvars) == 2
   350             elif source in self._sourcesterms:
   294                 ssource = self._session.repo.system_source
   351                 source_scopes = frozenset(t.scope for t in self._sourcesterms[source])
   295                 needsplit = True
   352                 for const in vconsts:
   296                 flag = 0
   353                     if const.scope in source_scopes:
   297                 for v in crossvars:
   354                         self._set_source_for_term(source, const)
   298                     if isinstance(v, Constant):
   355         # add source for relations
   299                         allsols = set(self._solindices)
   356         rschema = self._schema.rschema
   300                         try:
   357         termssources = {}
   301                             self._sourcesvars[ssource][v] = allsols
       
   302                         except KeyError:
       
   303                             self._sourcesvars[ssource] = {v: allsols}
       
   304                     if len(vsources[v]) == 1:
       
   305                         if iter(vsources[v]).next()[0].uri == 'system':
       
   306                             flag = 1
       
   307                             for ov in crossvars:
       
   308                                 if ov is not v and (isinstance(ov, Constant) or ov._q_invariant):
       
   309                                     ssset = frozenset((ssource,))
       
   310                                     self._remove_sources(ov, vsources[ov] - ssset)
       
   311                         else:
       
   312                             for ov in crossvars:
       
   313                                 if ov is not v and (isinstance(ov, Constant) or ov._q_invariant):
       
   314                                     needsplit = False
       
   315                                     break
       
   316                             else:
       
   317                                 continue
       
   318                         if not rel.neged(strict=True):
       
   319                             break
       
   320                 else:
       
   321                     self._crossrelations.setdefault(source, {})[rel] = crossvars
       
   322                     if not flag:
       
   323                         self._sourcesvars.setdefault(source, {})[rel] = set(self._solindices)
       
   324                     self._sourcesvars.setdefault(ssource, {})[rel] = set(self._solindices)
       
   325                     if needsplit:
       
   326                         self.needsplit = True
       
   327         return crossvars is None
       
   328         
       
   329     def _remove_invalid_sources(self):
       
   330         """removes invalid sources from `sourcesvars` member according to
       
   331         traversed relations and their properties (which sources support them,
       
   332         can they cross sources, etc...)
       
   333         """
       
   334         repo = self._session.repo
       
   335         rschema = repo.schema.rschema
       
   336         vsources = {}
       
   337         for rel in self.rqlst.iget_nodes(Relation):
   358         for rel in self.rqlst.iget_nodes(Relation):
   338             # process non final relations only
   359             # process non final relations only
   339             # note: don't try to get schema for 'is' relation (not available
   360             # note: don't try to get schema for 'is' relation (not available
   340             # during bootstrap)
   361             # during bootstrap)
   341             if not rel.is_types_restriction() and not rschema(rel.r_type).is_final():
   362             if not rel.is_types_restriction() and not rschema(rel.r_type).is_final():
   344                 # attribute
   365                 # attribute
   345                 #
   366                 #
   346                 # XXX code below doesn't handle the case where some source allows
   367                 # XXX code below doesn't handle the case where some source allows
   347                 #     relation crossing but another one doesn't
   368                 #     relation crossing but another one doesn't
   348                 relsources = repo.rel_type_sources(rel.r_type)
   369                 relsources = repo.rel_type_sources(rel.r_type)
   349                 crossvars = None
       
   350                 if len(relsources) < 2:
   370                 if len(relsources) < 2:
   351                     # filter out sources being there because they have this
   371                     # filter out sources being there because they have this
   352                     # relation in their dont_cross_relations attribute
   372                     # relation in their dont_cross_relations attribute
   353                     relsources = [source for source in relsources
   373                     relsources = [source for source in relsources
   354                                   if source.support_relation(rel.r_type)]
   374                                   if source.support_relation(rel.r_type)]
   355                     if relsources:
   375                     if relsources:
   356                         # this means the relation is using a variable inlined as
   376                         # this means the relation is using a variable inlined as
   357                         # a constant and another unsupported variable, in which
   377                         # a constant and another unsupported variable, in which
   358                         # case we put the relation in sourcesvars
   378                         # case we put the relation in sourcesterms
   359                         self._sourcesvars.setdefault(relsources[0], {})[rel] = set(self._solindices)
   379                         self._sourcesterms.setdefault(relsources[0], {})[rel] = set(self._solindices)
   360                     continue
   380                     continue
   361                 lhs, rhs = rel.get_variable_parts()
   381                 lhs, rhs = rel.get_variable_parts()
   362                 lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs)
   382                 lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs)
   363                 # update dictionnary of sources supporting lhs and rhs vars
   383                 # update dictionary of sources supporting lhs and rhs vars
   364                 if not lhsv in vsources:
   384                 if not lhsv in termssources:
   365                     vsources[lhsv] = self._term_sources(lhs)
   385                     termssources[lhsv] = self._term_sources(lhs)
   366                 if not rhsv in vsources:
   386                 if not rhsv in termssources:
   367                     vsources[rhsv] = self._term_sources(rhs)
   387                     termssources[rhsv] = self._term_sources(rhs)
   368                 if self._handle_cross_relation(rel, relsources, vsources):
   388                 self._handle_cross_relation(rel, relsources, termssources)
   369                     self._linkedvars.setdefault(lhsv, set()).add((rhsv, rel))
   389                 self._linkedterms.setdefault(lhsv, set()).add((rhsv, rel))
   370                     self._linkedvars.setdefault(rhsv, set()).add((lhsv, rel))
   390                 self._linkedterms.setdefault(rhsv, set()).add((lhsv, rel))
   371                 else:
   391         return termssources
   372                     self._crosslinkedvars.setdefault(lhsv, set()).add((rhsv, rel))
   392             
   373                     self._crosslinkedvars.setdefault(rhsv, set()).add((lhsv, rel))
   393     def _handle_cross_relation(self, rel, relsources, termssources):
   374         for term in self._linkedvars:
   394         cross_rel = False
   375             self._remove_sources_until_stable(term, vsources)
   395         for source in relsources:
   376         if len(self._sourcesvars) > 1 and hasattr(self.plan.rqlst, 'main_relations'):
   396             if rel.r_type in source.cross_relations:
       
   397                 ssource = self._repo.system_source
       
   398                 crossvars = set(x.variable for x in rel.get_nodes(VariableRef))
       
   399                 for const in rel.get_nodes(Constant):
       
   400                     if source.uri != 'system' and not const in self._sourcesterms.get(source, ()):
       
   401                         continue
       
   402                     crossvars.add(const)
       
   403                     # XXX this is counter intuitive, though this is currently a
       
   404                     # trick to add const to system source terms so we get a
       
   405                     # chance that solutions will compare to equals when
       
   406                     # computing need split
       
   407                     allsols = set(self._solindices)
       
   408                     try:
       
   409                         self._sourcesterms[ssource][const] = allsols
       
   410                     except KeyError:
       
   411                         self._sourcesterms[ssource] = {const: allsols}
       
   412                 self._crossrelations.setdefault(source, {})[rel] = crossvars
       
   413                 if len(crossvars) < 2:
       
   414                     # this means there is a constant in the relation which is
       
   415                     # not supported by the source, so we can stop here
       
   416                     continue
       
   417                 self._sourcesterms.setdefault(ssource, {})[rel] = set(self._solindices)
       
   418                 cross_rel = True
       
   419                 needsplit = True
       
   420                 flag = False
       
   421                 for term in crossvars:
       
   422                     if len(termssources[term]) == 1:
       
   423                         if iter(termssources[term]).next()[0].uri == 'system':
       
   424                             flag = True
       
   425                             for ov in crossvars:
       
   426                                 if ov is not term and (isinstance(ov, Constant) or ov._q_invariant):
       
   427                                     ssset = frozenset((ssource,))
       
   428                                     self._remove_sources(ov, termssources[ov] - ssset)
       
   429                         else:
       
   430                             for ov in crossvars:
       
   431                                 if ov is not term and (isinstance(ov, Constant) or ov._q_invariant):
       
   432                                     needsplit = False
       
   433                                     break
       
   434                             else:
       
   435                                 continue
       
   436                 if not flag:
       
   437                     self._sourcesterms.setdefault(source, {})[rel] = set(self._solindices)
       
   438                 if needsplit:
       
   439                     self.needsplit = True
       
   440         return cross_rel
       
   441     
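The `crossvars` loop in `_handle_cross_relation` decides two things: whether the relation may also be fetched from the external source (`flag`), and whether the query needs splitting. What follows is a simplified, non-authoritative model of that loop. Terms are plain strings and sources are identified by their uri. The hypothetical `invariant` set stands in for the `isinstance(ov, Constant) or ov._q_invariant` test. Instead of calling `_remove_sources`, the sketch only reports which terms would be restricted to the system source.

```python
def cross_relation_flags(crossvars, termssources, invariant, system='system'):
    """Simplified model of the crossvars loop of _handle_cross_relation.

    crossvars: list of term names (hypothetical stand-ins for rql nodes)
    termssources: dict mapping a term to a set of (source_uri, solindex)
    invariant: set of terms that are constants or invariant variables
    Returns (flag, needsplit, restricted) where `restricted` holds the
    terms that would be restricted to the system source.
    """
    flag = False
    needsplit = True
    restricted = set()
    for term in crossvars:
        if len(termssources[term]) != 1:
            # term may come from several sources: nothing to infer here
            continue
        (uri, _solindex), = termssources[term]
        if uri == system:
            # term only lives in the system source, so the other invariant
            # terms of the relation are restricted to it as well
            flag = True
            restricted.update(ov for ov in crossvars
                              if ov != term and ov in invariant)
        elif any(ov != term and ov in invariant for ov in crossvars):
            # term only lives in one external source and another term is
            # invariant: no split is needed for this relation
            needsplit = False
    return flag, needsplit, restricted
```

Take `termssources = {'X': {('system', 0)}, 'Y': {('system', 0), ('ldap', 0)}}` with `Y` invariant. `X` only lives in the system source, so `Y` is restricted to it, and the call returns `(True, True, {'Y'})`.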
       
   442     def _remove_invalid_sources(self, termssources):
       
   443         """removes invalid sources from `sourcesterms` member according to
       
   444         traversed relations and their properties (which sources support them,
       
   445         can they cross sources, etc...)
       
   446         """
       
   447         for term in self._linkedterms:
       
   448             self._remove_sources_until_stable(term, termssources)
       
   449         if len(self._sourcesterms) > 1 and hasattr(self.plan.rqlst, 'main_relations'):
   377             # the querier doesn't annotate write queries, need to do it here
   450             # the querier doesn't annotate write queries, need to do it here
   378             self.plan.annotate_rqlst()
   451             self.plan.annotate_rqlst()
   379             # on insert/update/delete queries, we may get extra information from
   452             # on insert/update/delete queries, we may get extra information from
   380             # the main relation (eg relations to the left of the WHERE)
   453             # the main relation (eg relations to the left of the WHERE)
   381             if self.plan.rqlst.TYPE == 'insert':
   454             if self.plan.rqlst.TYPE == 'insert':
   382                 inserted = dict((vref.variable, etype)
   455                 inserted = dict((vref.variable, etype)
   383                                 for etype, vref in self.plan.rqlst.main_variables)
   456                                 for etype, vref in self.plan.rqlst.main_variables)
   384             else:
   457             else:
   385                 inserted = {}
   458                 inserted = {}
       
   459             repo = self._repo
       
   460             rschema = self._schema.rschema
   386             for rel in self.plan.rqlst.main_relations:
   461             for rel in self.plan.rqlst.main_relations:
   387                 if not rschema(rel.r_type).is_final():
   462                 if not rschema(rel.r_type).is_final():
   388                     # nothing to do if relation is not supported by multiple sources
   463                     # nothing to do if relation is not supported by multiple sources
   389                     if len(repo.rel_type_sources(rel.r_type)) < 2:
   464                     if len(repo.rel_type_sources(rel.r_type)) < 2:
   390                         continue
   465                         continue
   391                     lhs, rhs = rel.get_variable_parts()
   466                     lhs, rhs = rel.get_variable_parts()
   392                     try:
   467                     try:
   393                         lhsv = self._extern_term(lhs, vsources, inserted)
   468                         lhsv = self._extern_term(lhs, termssources, inserted)
   394                         rhsv = self._extern_term(rhs, vsources, inserted)
   469                         rhsv = self._extern_term(rhs, termssources, inserted)
   395                     except KeyError, ex:
   470                     except KeyError, ex:
   396                         continue
   471                         continue
   397                     norelsup = self._norel_support_set(rel)
   472                     self._remove_term_sources(lhsv, rel, rhsv, termssources)
   398                     self._remove_var_sources(lhsv, norelsup, rhsv, vsources)
   473                     self._remove_term_sources(rhsv, rel, lhsv, termssources)
   399                     self._remove_var_sources(rhsv, norelsup, lhsv, vsources)
   474                                     
   400         # cleanup linked var
   475     def _extern_term(self, term, termssources, inserted):
   401         for var, linkedrelsinfo in self._linkedvars.iteritems():
       
   402             self._linkedvars[var] = frozenset(x[0] for x in linkedrelsinfo)
       
   403         # if there are other sources than the system source, consider simplified
       
   404             # variables' sources
       
   405         if self._sourcesvars and self._sourcesvars.keys() != [self._session.repo.system_source]:
       
   406             # add source for rewritten constants to sourcesvars
       
   407             for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
       
   408                 const = vconsts[0]
       
   409                 eid = const.eval(self.plan.args)
       
   410                 source = self._session.source_from_eid(eid)
       
   411                 if source is self._session.repo.system_source:
       
   412                     for const in vconsts:
       
   413                         self._set_source_for_var(source, const)
       
   414                 elif source in self._sourcesvars:
       
   415                     source_scopes = frozenset(v.scope for v in self._sourcesvars[source])
       
   416                     for const in vconsts:
       
   417                         if const.scope in source_scopes:
       
   418                             self._set_source_for_var(source, const)
       
   419                             
       
   420     def _extern_term(self, term, vsources, inserted):
       
   421         var = term.variable
   476         var = term.variable
   422         if var.stinfo['constnode']:
   477         if var.stinfo['constnode']:
   423             termv = var.stinfo['constnode']
   478             termv = var.stinfo['constnode']
   424             vsources[termv] = self._term_sources(termv)
   479             termssources[termv] = self._term_sources(termv)
   425         elif var in inserted:
   480         elif var in inserted:
   426             termv = var
   481             termv = var
   427             source = self._session.repo.locate_etype_source(inserted[var])
   482             source = self._repo.locate_etype_source(inserted[var])
   428             vsources[termv] = set((source, solindex) for solindex in self._solindices)
   483             termssources[termv] = set((source, solindex)
       
   484                                       for solindex in self._solindices)
   429         else:
   485         else:
   430             termv = self.rqlst.defined_vars[var.name]
   486             termv = self.rqlst.defined_vars[var.name]
   431             if not termv in vsources:
   487             if not termv in termssources:
   432                 vsources[termv] = self._term_sources(termv)
   488                 termssources[termv] = self._term_sources(termv)
   433         return termv
   489         return termv
   434         
   490         
   435     def _remove_sources_until_stable(self, var, vsources):
   491     def _remove_sources_until_stable(self, term, termssources):
   436         sourcesvars = self._sourcesvars
   492         sourcesterms = self._sourcesterms
   437         for ovar, rel in self._linkedvars.get(var, ()):
   493         for oterm, rel in self._linkedterms.get(term, ()):
   438             if not var.scope is ovar.scope and rel.scope.neged(strict=True):
   494             if not term.scope is oterm.scope and rel.scope.neged(strict=True):
   439                 # can't get information from relation inside a NOT exists
   495                 # can't get information from relation inside a NOT exists
   440                 # where variables don't belong to the same scope
   496                 # where terms don't belong to the same scope
   441                 continue
   497                 continue
   442             need_ancestor_scope = False
   498             need_ancestor_scope = False
   443             if not (var.scope is rel.scope and ovar.scope is rel.scope):
   499             if not (term.scope is rel.scope and oterm.scope is rel.scope):
   444                 if rel.ored():
   500                 if rel.ored():
   445                     continue
   501                     continue
   446                 if rel.ored(traverse_scope=True):
   502                 if rel.ored(traverse_scope=True):
   447                     # if relation has some OR as parent, constraints should only
   503                     # if relation has some OR as parent, constraints should only
   448                     # propagate from parent scope to child scope, nothing else
   504                     # propagate from parent scope to child scope, nothing else
   449                     need_ancestor_scope = True
   505                     need_ancestor_scope = True
   450             relsources = self._session.repo.rel_type_sources(rel.r_type)
   506             relsources = self._repo.rel_type_sources(rel.r_type)
   451             if rel.neged(strict=True) and (
   507             if rel.neged(strict=True) and (
   452                 len(relsources) < 2
   508                 len(relsources) < 2
   453                 or not isinstance(ovar, Variable)
   509                 or not isinstance(oterm, Variable)
   454                 or ovar.valuable_references() != 1
   510                 or oterm.valuable_references() != 1
   455                 or any(sourcesvars[source][var] != sourcesvars[source][ovar]
   511                 or any(sourcesterms[source][term] != sourcesterms[source][oterm]
   456                        for source in relsources
   512                        for source in relsources
   457                        if var in sourcesvars.get(source, ())
   513                        if term in sourcesterms.get(source, ())
   458                        and ovar in sourcesvars.get(source, ()))):
   514                        and oterm in sourcesterms.get(source, ()))):
   459                 # neged relation doesn't allow inferring variable sources unless we're
   515                 # neged relation doesn't allow inferring term sources unless
   460                 # on a multisource relation for a variable only used by this relation
   516                 # we're on a multisource relation for a term only used by this
   461                 # (eg "Any X WHERE NOT X multisource_rel Y" and ovar is Y), iif
   517                 # relation (eg "Any X WHERE NOT X multisource_rel Y" and oterm is
       
   518                 # Y)
   462                 continue
   519                 continue
   463             norelsup = self._norel_support_set(rel)
   520             # compute invalid sources for terms and remove them
   464             # compute invalid sources for variables and remove them
   521             if not need_ancestor_scope or is_ancestor(term.scope, oterm.scope):
   465             if not need_ancestor_scope or is_ancestor(var.scope, ovar.scope):
   522                 self._remove_term_sources(term, rel, oterm, termssources)
   466                 self._remove_var_sources(var, norelsup, ovar, vsources)
   523             if not need_ancestor_scope or is_ancestor(oterm.scope, term.scope):
   467             if not need_ancestor_scope or is_ancestor(ovar.scope, var.scope):
   524                 self._remove_term_sources(oterm, rel, term, termssources)
   468                 self._remove_var_sources(ovar, norelsup, var, vsources)
   525     
   469     
   526     def _remove_term_sources(self, term, rel, oterm, termssources):
   470     def _remove_var_sources(self, var, norelsup, ovar, vsources):
   527         """remove invalid sources for term according to oterm's sources and the
   471         """remove invalid sources for var according to ovar's sources and the
   528         relation between those two terms. 
   472         relation between those two variables. 
       
   473         """
   529         """
   474         varsources = vsources[var]
   530         norelsup = self._norel_support_set(rel)
   475         invalid_sources = varsources - (vsources[ovar] | norelsup)
   531         termsources = termssources[term]
       
   532         invalid_sources = termsources - (termssources[oterm] | norelsup)
       
   533         if invalid_sources and self._repo.can_cross_relation(rel.r_type):
       
   534             invalid_sources -= self._sys_source_set
       
   535             if invalid_sources and isinstance(term, Variable) and self._need_ext_source_access(term, rel):
       
   536                 # if the term is a not invariant variable, we should filter out
       
   537                 # source where the relation is a cross relation from invalid
       
   538                 # sources
       
   539                 invalid_sources = frozenset([(s, solidx) for s, solidx in invalid_sources
       
   540                                              if not (s in self._crossrelations and
       
   541                                                      rel in self._crossrelations[s])])
   476         if invalid_sources:
   542         if invalid_sources:
   477             self._remove_sources(var, invalid_sources)
   543             self._remove_sources(term, invalid_sources)
   478             varsources -= invalid_sources
   544             termsources -= invalid_sources
   479             self._remove_sources_until_stable(var, vsources)
   545             self._remove_sources_until_stable(term, termssources)
   480         
   546         
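`_remove_sources_until_stable` and `_remove_term_sources` together perform a constraint propagation. A term keeps a `(source, solindex)` pair only if one of two conditions holds: each linked term also has that pair, or that source doesn't support the relation (`norelsup`). Every removal is then propagated to the linked terms until nothing changes. Below is a minimal sketch of that fixed point, built on these assumptions:

* terms and sources are plain strings;
* a hypothetical `links` dict replaces `_linkedterms`;
* a hypothetical `norelsup` dict is keyed by relation;
* scopes, negation, OR and cross relations are ignored;
* an explicit worklist replaces the recursion.

```python
def prune_until_stable(termssources, links, norelsup):
    """Remove (source, solindex) pairs a term can't have given its links.

    termssources: dict term -> set of (source, solindex), modified in place
    links: dict term -> list of (other_term, rel) pairs, assumed symmetric
    norelsup: dict rel -> set of (source, solindex) pairs for sources that
              don't support rel (such pairs never invalidate anything)
    """
    pending = list(termssources)
    while pending:
        term = pending.pop()
        for oterm, rel in links.get(term, ()):
            # same set arithmetic as in _remove_term_sources
            invalid = termssources[term] - (termssources[oterm]
                                            | norelsup.get(rel, set()))
            if invalid:
                termssources[term] -= invalid
                # term shrank: the terms linked to it must be re-checked
                pending.extend(o for o, _rel in links.get(term, ()))
    return termssources
```

Source sets only ever shrink, so the loop terminates. Consider links `X-Y` and `X-Z` where `Y` only lives in the system source. The `ldap` pair is first removed from `X`, and that removal then propagates to `Z`.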
   481     def _compute_needsplit(self):
   547     def _compute_needsplit(self):
   482         """tell according to sourcesvars if the rqlst has to be split for
   548         """tell according to sourcesterms if the rqlst has to be split for
   483         execution among multiple sources
   549         execution among multiple sources
   484         
   550         
   485         the execution has to be split if
   551         the execution has to be split if
   486         * a source supports an entity (non-invariant) but doesn't support a
   552         * a source supports an entity (non-invariant) but doesn't support a
   487           relation on it
   553           relation on it
   489         * there is more than one source and either all sources' supported
   555         * there is more than one source and either all sources' supported
   490           variables/solutions are not equivalent or multiple variables have to
   556           variables/solutions are not equivalent or multiple variables have to
   491           be fetched from some source
   557           be fetched from some source
   492         """
   558         """
   493         # NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2
   559         # NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2
   494         if len(self._sourcesvars) < 2: 
   560         if len(self._sourcesterms) < 2: 
   495             self.needsplit = False
   561             self.needsplit = False
   496         elif not self.needsplit:
   562         elif not self.needsplit:
   497             if not allequals(self._sourcesvars.itervalues()):
   563             if not allequals(self._sourcesterms.itervalues()):
   498                 self.needsplit = True
   564                 self.needsplit = True
   499             else:
   565             else:
   500                 sample = self._sourcesvars.itervalues().next()
   566                 sample = self._sourcesterms.itervalues().next()
   501                 if len(sample) > 1 and any(v for v in sample
   567                 if len(sample) > 1:
   502                                            if not v in self._linkedvars
   568                     for term in sample:
   503                                            and not v in self._crosslinkedvars):
   569                         # need split if there is an unlinked variable
   504                     self.needsplit = True
   570                         if isinstance(term, Variable) and not term in self._linkedterms:
   505             
   571                             self.needsplit = True
   506     def _set_source_for_var(self, source, var):
   572                             break
   507         self._sourcesvars.setdefault(source, {})[var] = set(self._solindices)
   573                     else:
       
   574                         # need split if there are some cross relations on non
       
   575                         # invariant variables or if the variable is used in
       
   576                         # a multi-sources relation
       
   577                         if self._crossrelations:
       
   578                             for reldict in self._crossrelations.itervalues():
       
   579                                 for rel, terms in reldict.iteritems():
       
   580                                     for term in terms:
       
   581                                         if isinstance(term, Variable) and self._need_ext_source_access(term, rel):
       
   582                                             self.needsplit = True
       
   583                                             return
       
   584 
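Stripped of its final cross-relation check, the decision in `_compute_needsplit` can be modelled with plain dictionaries. The model works as follows:

* `sourcesterms` maps a source name to `{term: set(solindices)}`;
* variables are strings listed in a hypothetical `variables` set;
* `allequals` is a local helper written for this sketch. The module's own `allequals` is not shown in this excerpt, and is only assumed to return true when all items are equal.

```python
def allequals(iterable):
    """Return True if all items of `iterable` are equal (True if empty)."""
    items = list(iterable)
    return all(item == items[0] for item in items[1:])


def compute_needsplit(sourcesterms, linkedterms, variables, needsplit=False):
    """Simplified model of _compute_needsplit, without the cross relation check.

    sourcesterms: dict source -> {term: set(solindices)}
    linkedterms: set of terms linked to another term by some relation
    variables: set of terms which are variables (others are constants)
    needsplit: value possibly already set while handling cross relations
    """
    # fewer than two sources: never split (may be 0, eg "Any X WHERE X eid 2")
    if len(sourcesterms) < 2:
        return False
    if needsplit:
        return True
    if not allequals(sourcesterms.values()):
        # sources don't support the same terms and solutions
        return True
    sample = next(iter(sourcesterms.values()))
    if len(sample) > 1:
        # split if some variable isn't linked to any other term
        return any(term in variables and term not in linkedterms
                   for term in sample)
    return False
```

Note that, as in the original, a single source resets `needsplit` to false even if cross-relation handling had set it.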
       
   585     @cached
       
   586     def _need_ext_source_access(self, var, rel):
       
   587         if not var._q_invariant:
       
   588             return True
       
   589         if  any(r for x, r in self._linkedterms[var]
       
   590                 if not r is rel and self._repo.is_multi_sources_relation(r.r_type)):
       
   591             return True
       
   592         return False
       
   593         
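`_need_ext_source_access` answers whether a variable must be looked up in an external source for a given relation. The answer is always yes for a non-invariant variable. Otherwise it is yes only if another relation linked to the variable is multi-sources. The sketch below replaces the planner's objects with hypothetical stand-ins:

* relations are `(name, r_type)` tuples compared by equality rather than identity;
* `invariant` replaces `_q_invariant`;
* `multisource_rtypes` replaces `is_multi_sources_relation`;
* the memoization done by `@cached` in the original is left out.

```python
def need_ext_source_access(var, rel, invariant, linkedterms, multisource_rtypes):
    """Simplified model of _need_ext_source_access.

    invariant: set of invariant variables (stand-in for var._q_invariant)
    linkedterms: dict var -> set of (other_term, relation) pairs
    multisource_rtypes: set of relation types supported by several sources
    """
    if var not in invariant:
        return True
    # invariant variable: only needed if some *other* linked relation
    # is supported by multiple sources
    return any(r != rel and r[1] in multisource_rtypes
               for _other, r in linkedterms.get(var, ()))
```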
       
   594     def _set_source_for_term(self, source, term):
       
   595         self._sourcesterms.setdefault(source, {})[term] = set(self._solindices)
   508 
   596 
   509     def _term_sources(self, term):
   597     def _term_sources(self, term):
   510         """returns possible sources for term `term`"""
   598         """returns possible sources for term `term`"""
   511         if isinstance(term, Constant):
   599         if isinstance(term, Constant):
   512             source = self._session.source_from_eid(term.eval(self.plan.args))
   600             source = self._session.source_from_eid(term.eval(self.plan.args))
   513             return set((source, solindex) for solindex in self._solindices)
   601             return set((source, solindex) for solindex in self._solindices)
   514         else:
   602         else:
   515             var = getattr(term, 'variable', term)
   603             var = getattr(term, 'variable', term)
   516             sources = [source for source, varobjs in self.sourcesvars.iteritems()
   604             sources = [source for source, varobjs in self.sourcesterms.iteritems()
   517                        if var in varobjs]
   605                        if var in varobjs]
   518             return set((source, solindex) for source in sources
   606             return set((source, solindex) for source in sources
   519                        for solindex in self.sourcesvars[source][var])
   607                        for solindex in self.sourcesterms[source][var])
   520 
   608 
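`sourcesterms` maps each source to a `{term: set(solution indices)}` dict. `_set_source_for_term` fills it, and `_term_sources` inverts it into the set of `(source, solindex)` pairs used by the pruning code. The sketch below uses strings in place of source and term objects. It covers only the non-constant branch of `_term_sources`; constants are resolved through `source_from_eid` and are left out here.

```python
def set_source_for_term(sourcesterms, source, term, solindices):
    """Mark `term` as available from `source` for every given solution."""
    sourcesterms.setdefault(source, {})[term] = set(solindices)


def term_sources(sourcesterms, term):
    """Return the set of (source, solindex) pairs `term` may come from."""
    return set((source, solindex)
               for source, terms in sourcesterms.items()
               if term in terms
               for solindex in terms[term])
```

For example, after registering `X` in `system` for solutions `0` and `1` and in `ldap` for solution `1`, `term_sources(st, 'X')` yields three pairs.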
   521     def _remove_sources(self, var, sources):
   609     def _remove_sources(self, term, sources):
   522         """removes invalid sources (`sources`) from `sourcesvars`
   610         """removes invalid sources (`sources`) from `sourcesterms`
   523 
   611 
   524         :param sources: the list of sources to remove
   612         :param sources: the list of sources to remove
   525         :param var: the analyzed variable
   613         :param term: the analyzed term
   526         """
   614         """
   527         sourcesvars = self._sourcesvars
   615         sourcesterms = self._sourcesterms
   528         for source, solindex in sources:
   616         for source, solindex in sources:
   529             try:
   617             try:
   530                 sourcesvars[source][var].remove(solindex)
   618                 sourcesterms[source][term].remove(solindex)
   531             except KeyError:
   619             except KeyError:
   532                 return # may occur with subquery column alias
   620                 return # may occur with subquery column alias
   533             if not sourcesvars[source][var]:
   621             if not sourcesterms[source][term]:
   534                 del sourcesvars[source][var]
   622                 del sourcesterms[source][term]
   535                 if not sourcesvars[source]:
   623                 if not sourcesterms[source]:
   536                     del sourcesvars[source]
   624                     del sourcesterms[source]
   537 
   625 
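`_remove_sources` keeps `sourcesterms` free of empty entries. Once a term has no solution left for a source, the term is dropped from that source; a source with no term left disappears entirely. As a result, `len(self._sourcesterms)` in `_compute_needsplit` only counts sources that still provide some term. The sketch below reproduces this on plain dicts, with strings standing in for sources and terms:

```python
def remove_sources(sourcesterms, term, sources):
    """Remove (source, solindex) pairs of `term` from `sourcesterms`,
    dropping the entries that become empty."""
    for source, solindex in sources:
        try:
            sourcesterms[source][term].remove(solindex)
        except KeyError:
            # like the original, stop at the first unknown pair
            return
        if not sourcesterms[source][term]:
            del sourcesterms[source][term]
            if not sourcesterms[source]:
                del sourcesterms[source]
```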
   538     def crossed_relation(self, source, relation):
   626     def crossed_relation(self, source, relation):
   539         return relation in self._crossrelations.get(source, ())
   627         return relation in self._crossrelations.get(source, ())
   540     
   628     
   541     def part_steps(self):
   629     def part_steps(self):
   543         each step. This is necessary to know if an aggregate step will be
   631         each step. This is necessary to know if an aggregate step will be
   544         necessary or not.
   632         necessary or not.
   545         """
   633         """
   546         steps = []
   634         steps = []
   547         select = self.rqlst
   635         select = self.rqlst
   548         rschema = self.plan.schema.rschema
   636         rschema = self._schema.rschema
   549         for source in self.part_sources:
   637         for source in self.part_sources:
   550             sourcevars = self._sourcesvars[source]
   638             sourceterms = self._sourcesterms[source]
   551             while sourcevars:
   639             while sourceterms:
   552                 # take a variable randomly, and all variables supporting the
   640                 # take a term randomly, and all terms supporting the
   553                 # same solutions
   641                 # same solutions
   554                 var, solindices = self._choose_var(sourcevars)
   642                 term, solindices = self._choose_term(sourceterms)
   555                 if source.uri == 'system':
   643                 if source.uri == 'system':
   556                     # ensure all variables are available for the latest step
   644                     # ensure all variables are available for the latest step
   557                     # (missing ones will be available from temporary tables
   645                     # (missing ones will be available from temporary tables
   558                     # of previous steps)
   646                     # of previous steps)
   559                     scope = select
   647                     scope = select
   560                     variables = scope.defined_vars.values() + scope.aliases.values()
   648                     terms = scope.defined_vars.values() + scope.aliases.values()
   561                     sourcevars.clear()
   649                     sourceterms.clear()
       
   650                     sources = [source]
   562                 else:
   651                 else:
   563                     scope = var.scope
   652                     scope = term.scope
   564                     variables = self._expand_vars(var, source, sourcevars, scope, solindices)
   653                     # find which sources support the same term and solutions
   565                     if not sourcevars:
   654                     sources = self._expand_sources(source, term, solindices)
   566                         del self._sourcesvars[source]
   655                     # now try to get as many terms as possible
   567                 # find which sources support the same variables/solutions
   656                     terms = self._expand_terms(term, sources, sourceterms,
   568                 sources = self._expand_sources(source, variables, solindices)
   657                                                scope, solindices)
       
   658                     if len(terms) == 1 and isinstance(terms[0], Constant):
       
   659                         # we can't generate anything interesting with a single
       
   660                         # constant term (will generate an empty "Any" query),
       
   661                         # go to the next iteration directly!
       
   662                         continue
       
   663                     if not sourceterms:
       
   664                         del self._sourcesterms[source]
   569                 # suppose this is a final step until the contrary is proven
   665                 # suppose this is a final step until the contrary is proven
   570                 final = scope is select
   666                 final = scope is select
   571                 # set of variables which should be additionally selected when
   667                 # set of terms which should be additionally selected when
   572                 # possible
   668                 # possible
   573                 needsel = set()
   669                 needsel = set()
   574                 # add attribute variables and mark variables which should be
   670                 # add attribute variables and mark variables which should be
   575                 # additionally selected when possible
   671                 # additionally selected when possible
   576                 for var in select.defined_vars.itervalues():
   672                 for var in select.defined_vars.itervalues():
   577                     if not var in variables:
   673                     if not var in terms:
   578                         stinfo = var.stinfo
   674                         stinfo = var.stinfo
   579                         for ovar, rtype in stinfo['attrvars']:
   675                         for ovar, rtype in stinfo['attrvars']:
   580                             if ovar in variables:
   676                             if ovar in terms:
   581                                 needsel.add(var.name)
   677                                 needsel.add(var.name)
   582                                 variables.append(var)
   678                                 terms.append(var)
   583                                 break
   679                                 break
   584                         else:
   680                         else:
   585                             needsel.add(var.name)
   681                             needsel.add(var.name)
   586                             final = False
   682                             final = False
   587                 if final and source.uri != 'system':
   683                 if final and source.uri != 'system':
   589                     for vconsts in select.stinfo['rewritten'].itervalues():
   685                     for vconsts in select.stinfo['rewritten'].itervalues():
   590                         const = vconsts[0]
   686                         const = vconsts[0]
   591                         eid = const.eval(self.plan.args)
   687                         eid = const.eval(self.plan.args)
   592                         _source = self._session.source_from_eid(eid)
   688                         _source = self._session.source_from_eid(eid)
   593                         if len(sources) > 1 or not _source in sources:
   689                         if len(sources) > 1 or not _source in sources:
   594                             # if there is some rewritten constant used by a
   690                             # if there is some rewritten constant used by a
   595                             # non-negated relation while there are some sources
   691                             # non-negated relation while there are some sources not
   596                             # not supporting the associated entity, this step
   692                             # supporting the associated entity, this step can't
   597                             # can't be final (unless the relation is explicitly
   693                             # be final (unless the relation is explicitly in
   598                             # in `variables`, e.g. cross relations)
   694                             # `terms`, e.g. cross relations)
   599                             for c in vconsts:
   695                             for c in vconsts:
   600                                 rel = c.relation()
   696                                 rel = c.relation()
   601                                 if rel is None or not (rel in variables or rel.neged(strict=True)):
   697                                 if rel is None or not (rel in terms or rel.neged(strict=True)):
   602                                 #if rel is not None and rel.r_type == 'identity' and not rel.neged(strict=True):
       
   603                                     final = False
   698                                     final = False
   604                                     break
   699                                     break
   605                             break
   700                             break
   606                 # check whether all relations are supported by the sources
   701                 # check whether all relations are supported by the sources
   607                 for rel in scope.iget_nodes(Relation):
   702                 for rel in scope.iget_nodes(Relation):
   612                         if not _source.support_relation(rel.r_type):
   707                         if not _source.support_relation(rel.r_type):
   613                             for vref in rel.iget_nodes(VariableRef):
   708                             for vref in rel.iget_nodes(VariableRef):
   614                                 needsel.add(vref.name)
   709                                 needsel.add(vref.name)
   615                             final = False
   710                             final = False
   616                             break
   711                             break
   617                         elif self.crossed_relation(_source, rel) and not rel in variables:
   712                         elif self.crossed_relation(_source, rel) and not rel in terms:
   618                             final = False
   713                             final = False
   619                             break
   714                             break
   620                     else:
   715                     else:
   621                         if not scope is select:
   716                         if not scope is select:
   622                             self._exists_relation(rel, variables, needsel)
   717                             self._exists_relation(rel, terms, needsel)
   623                         # if relation is supported by all sources and some of
   718                         # if relation is supported by all sources and some of
   624                         # its lhs/rhs variable isn't in "variables", and the
   719                         # its lhs/rhs variable isn't in "terms", and the
   625                         # other end *is* in "variables", mark it as needing to be
   720                         # other end *is* in "terms", mark it as needing to be
   626                         # selected
   721                         # selected
   627                         if source.uri != 'system' and not rschema(rel.r_type).is_final():
   722                         if source.uri != 'system' and not rschema(rel.r_type).is_final():
   628                             lhs, rhs = rel.get_variable_parts()
   723                             lhs, rhs = rel.get_variable_parts()
   629                             try:
   724                             try:
   630                                 lhsvar = lhs.variable
   725                             lhsvar = lhs.variable
   631                             except AttributeError:
   726                             except AttributeError:
   632                             lhsvar = lhs
   727                                 lhsvar = lhs
   633                             try:
   728                             try:
   634                                 rhsvar = rhs.variable
   729                                 rhsvar = rhs.variable
   635                             except AttributeError:
   730                             except AttributeError:
   636                                 rhsvar = rhs
   731                                 rhsvar = rhs
   637                             if lhsvar in variables and not rhsvar in variables:
   732                             if lhsvar in terms and not rhsvar in terms:
   638                                 needsel.add(lhsvar.name)
   733                                 needsel.add(lhsvar.name)
   639                             elif rhsvar in variables and not lhsvar in variables:
   734                             elif rhsvar in terms and not lhsvar in terms:
   640                                 needsel.add(rhsvar.name)
   735                                 needsel.add(rhsvar.name)
   641                 if final:
   736                 if final:
   642                     self._cleanup_sourcesvars(sources, solindices)
   737                     self._cleanup_sourcesterms(sources, solindices)
   643                 # XXX rename: variables may contain Relation and Constant nodes...
   738                 steps.append((sources, terms, solindices, scope, needsel, final)
   644                 steps.append( (sources, variables, solindices, scope, needsel,
   739                              )
   645                                final) )
       
   646         return steps
   740         return steps
   647 
   741 
   648     def _exists_relation(self, rel, variables, needsel):
   742     def _exists_relation(self, rel, terms, needsel):
   649         rschema = self.plan.schema.rschema(rel.r_type)
   743         rschema = self._schema.rschema(rel.r_type)
   650         lhs, rhs = rel.get_variable_parts()
   744         lhs, rhs = rel.get_variable_parts()
   651         try:
   745         try:
   652             lhsvar, rhsvar = lhs.variable, rhs.variable
   746             lhsvar, rhsvar = lhs.variable, rhs.variable
   653         except AttributeError:
   747         except AttributeError:
   654             pass
   748             pass
   655         else:
   749         else:
   656             # supported relation with at least one end supported, check the
   750             # supported relation with at least one end supported, check the
   657             # other end is in as well. If not this usually means the
   751             # other end is in as well. If not this usually means the
   658             # variable is refed by an outer scope and should be substituted
   752             # variable is refed by an outer scope and should be substituted
   659             # using an 'identity' relation (else we'll get a conflict of
   753             # using an 'identity' relation (else we'll get a conflict of
   660             # temporary tables)
   754             # temporary tables)
   661             if rhsvar in variables and not lhsvar in variables:
   755             if rhsvar in terms and not lhsvar in terms:
   662                 self._identity_substitute(rel, lhsvar, variables, needsel)
   756                 self._identity_substitute(rel, lhsvar, terms, needsel)
   663             elif lhsvar in variables and not rhsvar in variables:
   757             elif lhsvar in terms and not rhsvar in terms:
   664                 self._identity_substitute(rel, rhsvar, variables, needsel)
   758                 self._identity_substitute(rel, rhsvar, terms, needsel)
   665 
   759 
   666     def _identity_substitute(self, relation, var, variables, needsel):
   760     def _identity_substitute(self, relation, var, terms, needsel):
   667         newvar = self._insert_identity_variable(relation.scope, var)
   761         newvar = self._insert_identity_variable(relation.scope, var)
   668         if newvar is not None:
   762         if newvar is not None:
   669             # ensure relation is using '=' operator, else we rely on a
   763             # ensure relation is using '=' operator, else we rely on a
   670             # sqlgenerator side effect (it won't insert an inequality operator
   764             # sqlgenerator side effect (it won't insert an inequality operator
   671             # in this case)
   765             # in this case)
   672             relation.children[1].operator = '=' 
   766             relation.children[1].operator = '=' 
   673             variables.append(newvar)
   767             terms.append(newvar)
   674             needsel.add(newvar.name)
   768             needsel.add(newvar.name)
   675         
   769         
   676     def _choose_var(self, sourcevars):
   770     def _choose_term(self, sourceterms):
       
   771         """pick one term among terms supported by a source, which will be used
       
   772         as a base to generate an execution step
       
   773         """
   677         secondchoice = None
   774         secondchoice = None
   678         if len(self._sourcesvars) > 1:
   775         if len(self._sourcesterms) > 1:
   679             # priority to variable from subscopes
   776             # priority to variable from subscopes
   680             for var in sourcevars:
   777             for var in sourceterms:
   681                 if not var.scope is self.rqlst:
   778                 if not var.scope is self.rqlst:
   682                     if isinstance(var, Variable):
   779                     if isinstance(var, Variable):
   683                         return var, sourcevars.pop(var)
   780                         return var, sourceterms.pop(var)
   684                     secondchoice = var
   781                     secondchoice = var
   685         else:
   782         else:
   686             # priority to variables from the outer scope
   783             # priority to variables from the outer scope
   687             for var in sourcevars:
   784             for var in sourceterms:
   688                 if var.scope is self.rqlst:
   785                 if var.scope is self.rqlst:
   689                     if isinstance(var, Variable):
   786                     if isinstance(var, Variable):
   690                         return var, sourcevars.pop(var)
   787                         return var, sourceterms.pop(var)
   691                     secondchoice = var
   788                     secondchoice = var
   692         if secondchoice is not None:
   789         if secondchoice is not None:
   693             return secondchoice, sourcevars.pop(secondchoice)
   790             return secondchoice, sourceterms.pop(secondchoice)
   694         # priority to variable
   791         # priority to variable
   695         for var in sourcevars:
   792         for var in sourceterms:
   696             if isinstance(var, Variable):
   793             if isinstance(var, Variable):
   697                 return var, sourcevars.pop(var)
   794                 return var, sourceterms.pop(var)
   698         # whatever
   795         # whatever
   699         var = iter(sourcevars).next()
   796         var = iter(sourceterms).next()
   700         return var, sourcevars.pop(var)
   797         return var, sourceterms.pop(var)
   701             
   798             
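# A minimal sketch of the priority order implemented by _choose_term above,
# using a hypothetical frozen `Term` stand-in instead of rql nodes: `scope`
# replaces the node's scope, `is_variable` replaces isinstance(var, Variable),
# `root` plays the role of self.rqlst and `nb_sources` the role of
# len(self._sourcesterms). Scopes are compared by equality, not identity.
from dataclasses import dataclass


@dataclass(frozen=True)
class Term:
    name: str
    scope: str
    is_variable: bool = True


def choose_term(sourceterms, root, nb_sources):
    """pop and return (term, solindices) from `sourceterms`"""
    def preferred(term):
        if nb_sources > 1:
            # other sources remain: favour terms from subscopes
            return term.scope != root
        # only one source left: favour terms from the outer scope
        return term.scope == root

    secondchoice = None
    for term in sourceterms:
        if preferred(term):
            if term.is_variable:
                return term, sourceterms.pop(term)
            secondchoice = term
    if secondchoice is not None:
        return secondchoice, sourceterms.pop(secondchoice)
    # then any variable, then whatever comes first
    for term in sourceterms:
        if term.is_variable:
            return term, sourceterms.pop(term)
    term = next(iter(sourceterms))
    return term, sourceterms.pop(term)


# e.g. with two sources left, the subscope variable Y wins over X (outer
# scope) and over R (a non-variable term in the subscope).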
       
   799     def _expand_sources(self, selected_source, term, solindices):
       
   800         """return all sources supporting given term / solindices"""
       
   801         sources = [selected_source]
       
   802         sourcesterms = self._sourcesterms
       
   803         for source in sourcesterms:
       
   804             if source is selected_source:
       
   805                 continue
       
   806             if not (term in sourcesterms[source] and 
       
   807                     solindices.issubset(sourcesterms[source][term])):
       
   808                 continue
       
   809             sources.append(source)
       
   810             if source.uri != 'system':
       
   811                 termsolindices = sourcesterms[source][term]
       
   812                 termsolindices -= solindices
       
   813                 if not termsolindices:
       
   814                     del sourcesterms[source][term]                
       
   815         return sources
   702             
   816             
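# A minimal sketch of _expand_sources on plain data, assuming `sourcesterms`
# maps a hypothetical source name to a {term: set of solution indices} dict
# and that the system source is simply named 'system' (standing in for the
# source.uri == 'system' test).
def expand_sources(sourcesterms, selected, term, solindices):
    """return all sources supporting `term` for every index in `solindices`"""
    sources = [selected]
    for source, terms in sourcesterms.items():
        if source == selected:
            continue
        if not (term in terms and solindices <= terms[term]):
            continue
        sources.append(source)
        if source != 'system':
            # external sources give up the solutions handled by this step;
            # the system source's entries are left untouched
            terms[term] -= solindices
            if not terms[term]:
                del terms[term]
    return sources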
   703     def _expand_vars(self, var, source, sourcevars, scope, solindices):
   817     def _expand_terms(self, term, sources, sourceterms, scope, solindices):
   704         variables = [var]
   818         terms = [term]
       
   819         sources = sorted(sources)
   705         nbunlinked = 1
   820         nbunlinked = 1
   706         linkedvars = self._linkedvars
   821         linkedterms = self._linkedterms
   707         # variable has to belong to the same scope if there is more
   822         # term has to belong to the same scope if there is more
   708         # than the system source remaining
   823         # than the system source remaining
   709         if len(self._sourcesvars) > 1 and not scope is self.rqlst:
   824         if len(self._sourcesterms) > 1 and not scope is self.rqlst:
   710             candidates = (v for v in sourcevars.keys() if scope is v.scope)
   825             candidates = (t for t in sourceterms.keys() if scope is t.scope)
   711         else:
   826         else:
   712             candidates = sourcevars #.iterkeys()
   827             candidates = sourceterms #.iterkeys()
   713         # we only want one unlinked variable in each generated query
   828         # we only want one unlinked term in each generated query
   714         candidates = [v for v in candidates
   829         candidates = [t for t in candidates
   715                       if isinstance(v, Constant) or
   830                       if isinstance(t, Constant) or
   716                       (solindices.issubset(sourcevars[v]) and v in linkedvars)]
   831                       (solindices.issubset(sourceterms[t]) and t in linkedterms)]
   717         accept_var = lambda x: (isinstance(x, Constant) or any(v for v in variables if v in linkedvars.get(x, ())))
   832         accept_term = lambda x: (not any(s for s in sources if not x in self._sourcesterms[s])
   718         source_cross_rels = self._crossrelations.get(source, ())
   833                                 and any(t for t in terms if t in linkedterms.get(x, ())))
   719         if isinstance(var, Relation) and var in source_cross_rels:
   834         source_cross_rels = {}
   720             cross_vars = source_cross_rels.pop(var)
   835         for source in sources:
   721             base_accept_var = accept_var
   836             source_cross_rels.update(self._crossrelations.get(source, {}))
   722             accept_var = lambda x: (base_accept_var(x) or x in cross_vars)
   837         if isinstance(term, Relation) and term in source_cross_rels:
   723             for refed in cross_vars:
   838             cross_terms = source_cross_rels.pop(term)
       
   839             base_accept_term = accept_term
       
   840             accept_term = lambda x: (base_accept_term(x) or x in cross_terms)
       
   841             for refed in cross_terms:
   724                 if not refed in candidates:
   842                 if not refed in candidates:
   725                     candidates.append(refed)
   843                     candidates.append(refed)
   726         else:
   844         else:
   727             cross_vars = ()
   845             cross_terms = ()
   728         # repeat until no more variables can be added, since addition of a new
   846         # repeat until no more terms can be added, since addition of a new
   729         # variable may allow another one to be added
   847         # term may allow another one to be added
   730         modified = True
   848         modified = True
   731         while modified and candidates:
   849         while modified and candidates:
   732             modified = False
   850             modified = False
   733             for var in candidates[:]:
   851             for term in candidates[:]:
   734                 if accept_var(var):
   852                 if isinstance(term, Constant):
   735                     variables.append(var)
   853                     relation = term.relation()
   736                     try:
   854                     if sorted(set(x[0] for x in self._term_sources(term))) != sources:
   737                         # constant nodes should be systematically deleted
   855                         continue
   738                         if isinstance(var, Constant):
   856                     terms.append(term)
   739                             del sourcevars[var]
   857                     candidates.remove(term)
   740                         else:
       
   741                             # variable nodes should be deleted once all possible
       
   742                             # solutions indices have been consumed
       
   743                             sourcevars[var] -= solindices
       
   744                             if not sourcevars[var]:
       
   745                                 del sourcevars[var]
       
   746                     except KeyError:
       
   747                         assert var in cross_vars
       
   748                     candidates.remove(var)
       
   749                     modified = True
   858                     modified = True
   750         return variables
   859                     del sourceterms[term]
   751     
   860                 elif accept_term(term):
   752     def _expand_sources(self, selected_source, vars, solindices):
   861                     terms.append(term)
   753         sources = [selected_source]
   862                     candidates.remove(term)
   754         sourcesvars = self._sourcesvars
   863                     modified = True
   755         for source in sourcesvars:
   864                     for source in sources:
   756             if source is selected_source:
   865                         sourceterms = self._sourcesterms[source]
   757                 continue
   866                         # terms should be deleted once all possible solutions
   758             for var in vars:
   867                         # indices have been consumed
   759                 if not (var in sourcesvars[source] and 
   868                         try:
   760                         solindices.issubset(sourcesvars[source][var])):
   869                             sourceterms[term] -= solindices
   761                     break
   870                             if not sourceterms[term]:
   762             else:
   871                                 del sourceterms[term]
   763                 sources.append(source)
   872                         except KeyError:
   764                 if source.uri != 'system':
   873                             assert term in cross_terms
   765                     for var in vars:
   874         return terms
   766                         varsolindices = sourcesvars[source][var]
   875     
   767                         varsolindices -= solindices
   876     def _cleanup_sourcesterms(self, sources, solindices):
   768                         if not varsolindices:
       
   769                             del sourcesvars[source][var]                
       
   770         return sources
       
   771     
       
   772     def _cleanup_sourcesvars(self, sources, solindices):
       
   773         """on final parts, remove solutions so we know they are already processed"""
   877         """on final parts, remove solutions so we know they are already processed"""
   774         for source in sources:
   878         for source in sources:
   775             try:
   879             try:
   776                 sourcevars = self._sourcesvars[source]
   880                 sourceterms = self._sourcesterms[source]
   777             except KeyError:
   881             except KeyError:
   778                 continue
   882                 continue
   779             for var, varsolindices in sourcevars.items():
   883             for term, termsolindices in sourceterms.items():
   780                 if isinstance(var, Relation) and self.crossed_relation(source, var):
   884                 if isinstance(term, Relation) and self.crossed_relation(source, term):
   781                     continue
   885                     continue
   782                 varsolindices -= solindices
   886                 termsolindices -= solindices
   783                 if not varsolindices:
   887                 if not termsolindices:
   784                     del sourcevars[var]
   888                     del sourceterms[term]
   785                     
   889                     
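# A minimal sketch of _cleanup_sourcesterms, assuming the same
# {source: {term: set of solution indices}} layout; `is_crossed` is a
# hypothetical predicate standing in for the
# `isinstance(term, Relation) and self.crossed_relation(source, term)` test.
def cleanup_sourcesterms(sourcesterms, sources, solindices,
                         is_crossed=lambda source, term: False):
    """on final parts, remove solutions so we know they are already processed"""
    for source in sources:
        terms = sourcesterms.get(source)
        if terms is None:
            continue
        # iterate over a copy since entries may be deleted (the original
        # relies on Python 2's dict.items() returning a list)
        for term, termsolindices in list(terms.items()):
            if is_crossed(source, term):
                continue  # crossed relations are left untouched
            termsolindices -= solindices  # in-place set difference
            if not termsolindices:
                del terms[term]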
   786     def merge_input_maps(self, allsolindices):
   890     def merge_input_maps(self, allsolindices):
   787         """inputmaps is a dictionary with tuples of solution indices as keys and
   891         """inputmaps is a dictionary with tuples of solution indices as keys and
   788         associated input maps as values. This function computes for each solution
   892         associated input maps as values. This function computes for each
   789         its necessary input map and returns them grouped
   893         solution its necessary input map and returns them grouped
   790 
   894 
   791         ex:
   895         ex:
   792         inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'},
   896         inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'},
   793                      (1,): {'X': 't2.C0', 'T': 't2.C1'}}
   897                      (1,): {'X': 't2.C0', 'T': 't2.C1'}}
   794         return : [([1],  {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1',
   898         return : [([1],  {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1',
   820             result.append( (list(allsolindices), None) )
   924             result.append( (list(allsolindices), None) )
   821         return result
   925         return result
   822 
   926 
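# A minimal sketch of the behaviour described by merge_input_maps' docstring.
# The function body is elided from this hunk, so this is a reconstruction
# under assumptions: the input maps are passed explicitly (instead of
# self._inputmaps), a solution's input map is the union of every map whose
# key contains its index, solutions with equal merged maps are grouped, and a
# group needing no input map at all gets None (cf. the
# `(list(allsolindices), None)` fallback above). Group order may differ from
# the docstring's example.
def merge_input_maps(inputmaps, allsolindices):
    groups = []  # list of ([solution indices], merged input map)
    for solindex in sorted(allsolindices):
        solmap = {}
        for key, inputmap in inputmaps.items():
            if solindex in key:
                solmap.update(inputmap)
        for solindices, groupmap in groups:
            if groupmap == solmap:
                solindices.append(solindex)
                break
        else:
            groups.append(([solindex], solmap))
    return [(solindices, solmap or None) for solindices, solmap in groups]


# with the docstring's inputmaps, solution 1 gets the union of both maps
# while solutions 0 and 2 share the map stored under (0, 1, 2)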
   823     def build_final_part(self, select, solindices, inputmap,  sources,
   927     def build_final_part(self, select, solindices, inputmap,  sources,
   824                          insertedvars):
   928                          insertedvars):
   825         plan = self.plan
   929         solutions = [self._solutions[i] for i in solindices]
   826         rqlst = plan.finalize(select, [self._solutions[i] for i in solindices],
   930         if self._conflicts:
   827                               insertedvars)
   931             for varname, mappedto in self._conflicts:
       
   932                 var = select.defined_vars[varname]
       
   933                 newvar = select.make_variable()
       
   934                 # XXX should use var.scope but scope hasn't been computed yet
       
   935                 select.add_relation(var, 'identity', newvar)
       
   936                 for sol in solutions:
       
   937                     sol[newvar.name] = sol[varname]
       
   938                 inputmap[newvar.name] = mappedto
       
   939         rqlst = self.plan.finalize(select, solutions, insertedvars)
   828         if self.temptable is None and self.finaltable is None:
   940         if self.temptable is None and self.finaltable is None:
   829             return OneFetchStep(plan, rqlst, sources, inputmap=inputmap)
   941             return OneFetchStep(self.plan, rqlst, sources, inputmap=inputmap)
   830         table = self.temptable or self.finaltable
   942         table = self.temptable or self.finaltable
   831         return FetchStep(plan, rqlst, sources, table, True, inputmap)
   943         return FetchStep(self.plan, rqlst, sources, table, True, inputmap)
   832 
   944 
   833     def build_non_final_part(self, select, solindices, sources, insertedvars,
   945     def build_non_final_part(self, select, solindices, sources, insertedvars,
   834                              table):
   946                              table):
   835         """non final step, will have to store results in a temporary table"""
   947         """non final step, will have to store results in a temporary table"""
   836         plan = self.plan
   948         solutions = [self._solutions[i] for i in solindices]
   837         rqlst = plan.finalize(select, [self._solutions[i] for i in solindices],
   949         rqlst = self.plan.finalize(select, solutions, insertedvars)
   838                               insertedvars)
   950         step = FetchStep(self.plan, rqlst, sources, table, False)
   839         step = FetchStep(plan, rqlst, sources, table, False)
       
   840         # update input map for following steps, according to processed solutions
   951         # update input map for following steps, according to processed solutions
   841         inputmapkey = tuple(sorted(solindices))
   952         inputmapkey = tuple(sorted(solindices))
   842         inputmap = self._inputmaps.setdefault(inputmapkey, {})
   953         inputmap = self._inputmaps.setdefault(inputmapkey, {})
       
   954         for varname, mapping in step.outputmap.iteritems():
       
   955             if varname in inputmap and \
       
   956                    not (mapping == inputmap[varname] or
       
   957                         self._schema.eschema(solutions[0][varname]).is_final()):
       
   958                 self._conflicts.append((varname, inputmap[varname]))
   843         inputmap.update(step.outputmap)
   959         inputmap.update(step.outputmap)
   844         plan.add_step(step)
   960         self.plan.add_step(step)
   845 
   961 
   846 
   962 
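# A minimal sketch of the conflict detection that build_non_final_part now
# performs, assuming plain dicts mapping variable names to temporary table
# columns; `is_final_type` is a hypothetical stand-in for
# self._schema.eschema(solutions[0][varname]).is_final().
def update_inputmap(inputmap, outputmap, conflicts, is_final_type):
    for varname, mapping in outputmap.items():
        if varname in inputmap and not (mapping == inputmap[varname]
                                        or is_final_type(varname)):
            # an entity variable is now provided by two temporary tables:
            # keep the previous column so that build_final_part can link
            # both through an 'identity' relation
            conflicts.append((varname, inputmap[varname]))
    inputmap.update(outputmap)


# e.g. if 'X' was mapped to 't1.C0' and a new step outputs it as 't2.C0',
# ('X', 't1.C0') is recorded as a conflict; variables of final (non-entity)
# types are never reported.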
   847 class MSPlanner(SSPlanner):
   963 class MSPlanner(SSPlanner):
   848     """MultiSourcesPlanner: build execution plan for rql queries
   964     """MultiSourcesPlanner: build execution plan for rql queries
   849 
   965 
   970             select_group_sort(select)
  1086             select_group_sort(select)
   971         else:
  1087         else:
   972             atemptable = None
  1088             atemptable = None
   973             selection = select.selection
  1089             selection = select.selection
   974         ppi.temptable = atemptable
  1090         ppi.temptable = atemptable
   975         vfilter = VariablesFiltererVisitor(self.schema, ppi)
  1091         vfilter = TermsFiltererVisitor(self.schema, ppi)
   976         steps = []
  1092         steps = []
   977         for sources, variables, solindices, scope, needsel, final in stepdefs:
  1093         for sources, terms, solindices, scope, needsel, final in stepdefs:
   978             # extract an executable query using only the specified variables
  1094             # extract an executable query using only the specified terms
   979             if sources[0].uri == 'system':
  1095             if sources[0].uri == 'system':
   980                 # in this case we have to merge input maps before call to
  1096                 # in this case we have to merge input maps before call to
   981                 # filter so already processed restrictions are correctly
  1097                 # filter so already processed restrictions are correctly
   982                 # removed
  1098                 # removed
   983                 solsinputmaps = ppi.merge_input_maps(solindices)
  1099                 solsinputmaps = ppi.merge_input_maps(solindices)
   984                 for solindices, inputmap in solsinputmaps:
  1100                 for solindices, inputmap in solsinputmaps:
   985                     minrqlst, insertedvars = vfilter.filter(
  1101                     minrqlst, insertedvars = vfilter.filter(
   986                         sources, variables, scope, set(solindices), needsel, final)
  1102                         sources, terms, scope, set(solindices), needsel, final)
   987                     if inputmap is None:
  1103                     if inputmap is None:
   988                         inputmap = subinputmap
  1104                         inputmap = subinputmap
   989                     else:
  1105                     else:
   990                         inputmap.update(subinputmap)
  1106                         inputmap.update(subinputmap)
   991                     steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
  1107                     steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
   992                                                       sources, insertedvars))
  1108                                                       sources, insertedvars))
   993             else:
  1109             else:
   994                 # this is a final part (i.e. retrieving results for the
  1110                 # this is a final part (i.e. retrieving results for the
   995                 # original query part) if all variables / sources have been
  1111                 # original query part) if all terms / sources have been
   996                 # treated or if this is the last shot for used solutions
  1112                 # treated or if this is the last shot for used solutions
   997                 minrqlst, insertedvars = vfilter.filter(
  1113                 minrqlst, insertedvars = vfilter.filter(
   998                     sources, variables, scope, solindices, needsel, final)
  1114                     sources, terms, scope, solindices, needsel, final)
   999                 if final:
  1115                 if final:
  1000                     solsinputmaps = ppi.merge_input_maps(solindices)
  1116                     solsinputmaps = ppi.merge_input_maps(solindices)
  1001                     for solindices, inputmap in solsinputmaps:
  1117                     for solindices, inputmap in solsinputmaps:
  1002                         if inputmap is None:
  1118                         if inputmap is None:
  1003                             inputmap = subinputmap
  1119                             inputmap = subinputmap
  1004                         else:
  1120                         else:
  1005                             inputmap.update(subinputmap)
  1121                             inputmap.update(subinputmap)
  1006                         steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
  1122                         steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
  1007                                                   sources, insertedvars))
  1123                                                           sources, insertedvars))
  1008                 else:
  1124                 else:
  1009                     table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in variables)),
  1125                     table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in terms)),
  1010                                         ''.join(sorted(str(i) for i in solindices)))
  1126                                         ''.join(sorted(str(i) for i in solindices)))
  1011                     ppi.build_non_final_part(minrqlst, solindices, sources,
  1127                     ppi.build_non_final_part(minrqlst, solindices, sources,
  1012                                              insertedvars, table)
  1128                                              insertedvars, table)
  1013         # finally: join parts, deal with aggregates/groups/sorts if necessary
  1129         # finally: join parts, deal with aggregates/groups/sorts if necessary
  1014         if atemptable is not None:
  1130         if atemptable is not None:
  1037     
  1153     
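For non-final steps, the planner names the temporary table by concatenating the sorted `_ms_table_key()` values of the step's terms with the sorted solution indices. The sketch below reproduces only that naming expression. `FakeTerm` is a hypothetical stand-in for the RQL nodes that provide `_ms_table_key()`, and `temp_table_name` is an illustrative helper, not a function of the planner.

```python
class FakeTerm(object):
    """Hypothetical stand-in for an RQL term exposing _ms_table_key()."""

    def __init__(self, key):
        self.key = key

    def _ms_table_key(self):
        return self.key


def temp_table_name(terms, solindices):
    """Sketch of the '_T%s%s' expression used for non-final steps."""
    termkeys = ''.join(sorted(t._ms_table_key() for t in terms))
    # indices are sorted as strings, i.e. lexicographically, not numerically
    solkeys = ''.join(sorted(str(i) for i in solindices))
    return '_T%s%s' % (termkeys, solkeys)


print(temp_table_name([FakeTerm('Y'), FakeTerm('X')], set([2, 0])))  # _TXY02
```

Because both parts are sorted, the name does not depend on the order in which terms or solution indices are given. The parts are joined without a separator, however, so distinct index sets such as {1, 2} and {12} yield the same suffix.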
  1038 class UnsupportedBranch(Exception):
  1154 class UnsupportedBranch(Exception):
  1039     pass
  1155     pass
  1040 
  1156 
  1041 
  1157 
  1042 class VariablesFiltererVisitor(object):
  1158 class TermsFiltererVisitor(object):
  1043     def __init__(self, schema, ppi):
  1159     def __init__(self, schema, ppi):
  1044         self.schema = schema
  1160         self.schema = schema
  1045         self.ppi = ppi
  1161         self.ppi = ppi
  1046         self.skip = {}
  1162         self.skip = {}
  1047         self.hasaggrstep = self.ppi.temptable
  1163         self.hasaggrstep = self.ppi.temptable
  1048         self.extneedsel = frozenset(vref.name for sortterm in ppi.rqlst.orderby
  1164         self.extneedsel = frozenset(vref.name for sortterm in ppi.rqlst.orderby
  1049                                     for vref in sortterm.iget_nodes(VariableRef))
  1165                                     for vref in sortterm.iget_nodes(VariableRef))
  1050         
  1166         
  1051     def _rqlst_accept(self, rqlst, node, newroot, variables, setfunc=None):
  1167     def _rqlst_accept(self, rqlst, node, newroot, terms, setfunc=None):
  1052         try:
  1168         try:
  1053             newrestr, node_ = node.accept(self, newroot, variables[:])
  1169             newrestr, node_ = node.accept(self, newroot, terms[:])
  1054         except UnsupportedBranch:
  1170         except UnsupportedBranch:
  1055             return rqlst
  1171             return rqlst
  1056         if setfunc is not None and newrestr is not None:
  1172         if setfunc is not None and newrestr is not None:
  1057             setfunc(newrestr)
  1173             setfunc(newrestr)
  1058         if not node_ is node:
  1174         if not node_ is node:
  1059             rqlst = node.parent
  1175             rqlst = node.parent
  1060         return rqlst
  1176         return rqlst
  1061 
  1177 
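`_rqlst_accept` copies a subtree into the new query only when the visitor does not raise `UnsupportedBranch`. Otherwise, the new root is left untouched. The following is a minimal sketch of that accept-or-skip pattern on a toy tree. `VarRef`, `Relation`, `MiniFilterer` and `accept_or_skip` are hypothetical simplifications, not the RQL node classes, and the relation names are purely illustrative.

```python
class UnsupportedBranch(Exception):
    """Raised when a subtree references a term the current step can't handle."""


class VarRef(object):
    def __init__(self, name):
        self.name = name

    def accept(self, visitor, terms):
        return visitor.visit_varref(self, terms)


class Relation(object):
    def __init__(self, rtype, lhs, rhs):
        self.rtype, self.children = rtype, [lhs, rhs]

    def accept(self, visitor, terms):
        return visitor.visit_relation(self, terms)


class MiniFilterer(object):
    def visit_varref(self, node, terms):
        if node.name not in terms:
            raise UnsupportedBranch(node.name)
        return VarRef(node.name)

    def visit_relation(self, node, terms):
        # any unsupported child aborts the copy of the whole relation
        lhs, rhs = [child.accept(self, terms) for child in node.children]
        return Relation(node.rtype, lhs, rhs)

    def accept_or_skip(self, node, terms, setfunc):
        # same shape as _rqlst_accept: on UnsupportedBranch, output is untouched
        try:
            newnode = node.accept(self, list(terms))
        except UnsupportedBranch:
            return False
        if newnode is not None:
            setfunc(newnode)
        return True


kept = []
visitor = MiniFilterer()
query = [Relation('owned_by', VarRef('X'), VarRef('U')),
         Relation('in_state', VarRef('X'), VarRef('S'))]
for restriction in query:
    visitor.accept_or_skip(restriction, ['X', 'U'], kept.append)
print([r.rtype for r in kept])  # ['owned_by']
```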
  1062     def filter(self, sources, variables, rqlst, solindices, needsel, final):
  1178     def filter(self, sources, terms, rqlst, solindices, needsel, final):
  1063         if server.DEBUG:
  1179         if server.DEBUG:
  1064             print 'filter', final and 'final' or '', sources, variables, rqlst, solindices, needsel
  1180             print 'filter', final and 'final' or '', sources, terms, rqlst, solindices, needsel
  1065         newroot = Select()
  1181         newroot = Select()
  1066         self.sources = sorted(sources)
  1182         self.sources = sorted(sources)
  1067         self.variables = variables
  1183         self.terms = terms
  1068         self.solindices = solindices
  1184         self.solindices = solindices
  1069         self.final = final
  1185         self.final = final
  1070         # variables which appear in unsupported branches
  1186         # terms which appear in unsupported branches
  1071         needsel |= self.extneedsel
  1187         needsel |= self.extneedsel
  1072         self.needsel = needsel
  1188         self.needsel = needsel
  1073         # variables which appear in supported branches
  1189         # terms which appear in supported branches
  1074         self.mayneedsel = set()
  1190         self.mayneedsel = set()
  1075         # newly inserted variables
  1191         # newly inserted variables
  1076         self.insertedvars = []
  1192         self.insertedvars = []
  1077         # other structures (XXX document)
  1193         # other structures (XXX document)
  1078         self.mayneedvar, self.hasvar = {}, {}
  1194         self.mayneedvar, self.hasvar = {}, {}
  1079         self.use_only_defined = False
  1195         self.use_only_defined = False
  1080         self.scopes = {rqlst: newroot}
  1196         self.scopes = {rqlst: newroot}
  1081         if rqlst.where:
  1197         if rqlst.where:
  1082             rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, variables,
  1198             rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, terms,
  1083                                        newroot.set_where)
  1199                                        newroot.set_where)
  1084         if isinstance(rqlst, Select):
  1200         if isinstance(rqlst, Select):
  1085             self.use_only_defined = True
  1201             self.use_only_defined = True
  1086             if rqlst.groupby:
  1202             if rqlst.groupby:
  1087                 groupby = []
  1203                 groupby = []
  1088                 for node in rqlst.groupby:
  1204                 for node in rqlst.groupby:
  1089                     rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
  1205                     rqlst = self._rqlst_accept(rqlst, node, newroot, terms,
  1090                                                groupby.append)
  1206                                                groupby.append)
  1091                 if groupby:
  1207                 if groupby:
  1092                     newroot.set_groupby(groupby)
  1208                     newroot.set_groupby(groupby)
  1093             if rqlst.having:
  1209             if rqlst.having:
  1094                 having = []
  1210                 having = []
  1095                 for node in rqlst.having:
  1211                 for node in rqlst.having:
  1096                     rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
  1212                     rqlst = self._rqlst_accept(rqlst, node, newroot, terms,
  1097                                                having.append)
  1213                                                having.append)
  1098                 if having:
  1214                 if having:
  1099                     newroot.set_having(having)
  1215                     newroot.set_having(having)
  1100             if final and rqlst.orderby and not self.hasaggrstep:
  1216             if final and rqlst.orderby and not self.hasaggrstep:
  1101                 orderby = []
  1217                 orderby = []
  1102                 for node in rqlst.orderby:
  1218                 for node in rqlst.orderby:
  1103                     rqlst = self._rqlst_accept(rqlst, node, newroot, variables,
  1219                     rqlst = self._rqlst_accept(rqlst, node, newroot, terms,
  1104                                                orderby.append)
  1220                                                orderby.append)
  1105                 if orderby:
  1221                 if orderby:
  1106                     newroot.set_orderby(orderby)
  1222                     newroot.set_orderby(orderby)
  1107             self.process_selection(newroot, variables, rqlst)
  1223             self.process_selection(newroot, terms, rqlst)
  1108         elif not newroot.where:
  1224         elif not newroot.where:
  1109             # no restrictions have been copied, just select variables and add
  1225             # no restrictions have been copied, just select terms and add
  1110             # type restriction (done later by add_types_restriction)
  1226             # type restriction (done later by add_types_restriction)
  1111             for v in variables:
  1227             for v in terms:
  1112                 if not isinstance(v, Variable):
  1228                 if not isinstance(v, Variable):
  1113                     continue
  1229                     continue
  1114                 newroot.append_selected(VariableRef(newroot.get_variable(v.name)))
  1230                 newroot.append_selected(VariableRef(newroot.get_variable(v.name)))
  1115         solutions = self.ppi.copy_solutions(solindices)
  1231         solutions = self.ppi.copy_solutions(solindices)
  1116         cleanup_solutions(newroot, solutions)
  1232         cleanup_solutions(newroot, solutions)
  1154         add_types_restriction(self.schema, rqlst, newroot, solutions)
  1270         add_types_restriction(self.schema, rqlst, newroot, solutions)
  1155         if server.DEBUG:
  1271         if server.DEBUG:
  1156             print '--->', newroot
  1272             print '--->', newroot
  1157         return newroot, self.insertedvars
  1273         return newroot, self.insertedvars
  1158         
  1274         
  1159     def visit_and(self, node, newroot, variables):
  1275     def visit_and(self, node, newroot, terms):
  1160         subparts = []
  1276         subparts = []
  1161         for i in xrange(len(node.children)):
  1277         for i in xrange(len(node.children)):
  1162             child = node.children[i]
  1278             child = node.children[i]
  1163             try:
  1279             try:
  1164                 newchild, child_ = child.accept(self, newroot, variables)
  1280                 newchild, child_ = child.accept(self, newroot, terms)
  1165                 if not child_ is child:
  1281                 if not child_ is child:
  1166                     node = child_.parent
  1282                     node = child_.parent
  1167                 if newchild is None:
  1283                 if newchild is None:
  1168                     continue
  1284                     continue
  1169                 subparts.append(newchild)
  1285                 subparts.append(newchild)
  1178     visit_or = visit_and
  1294     visit_or = visit_and
  1179 
  1295 
  1180     def _relation_supported(self, relation):
  1296     def _relation_supported(self, relation):
  1181         rtype = relation.r_type
  1297         rtype = relation.r_type
  1182         for source in self.sources:
  1298         for source in self.sources:
  1183             if not source.support_relation(rtype) \
  1299             if not source.support_relation(rtype) or (
  1184                    or (rtype in source.cross_relations and not relation in self.variables):#self.ppi.crossed_relation(source, relation):
  1300                 rtype in source.cross_relations and not relation in self.terms):
  1185                 return False
  1301                 return False
  1186         if not self.final:
  1302         if not self.final and not relation in self.terms:
  1187             rschema = self.schema.rschema(relation.r_type)
  1303             rschema = self.schema.rschema(relation.r_type)
  1188             if not rschema.is_final():
  1304             if not rschema.is_final():
  1189                 for term in relation.get_nodes((VariableRef, Constant)):
  1305                 for term in relation.get_nodes((VariableRef, Constant)):
  1190                     term = getattr(term, 'variable', term)
  1306                     term = getattr(term, 'variable', term)
  1191                     termsources = sorted(set(x[0] for x in self.ppi._term_sources(term)))
  1307                     termsources = sorted(set(x[0] for x in self.ppi._term_sources(term)))
  1192                     if termsources and termsources != self.sources:
  1308                     if termsources and termsources != self.sources:
  1193                         return False
  1309                         return False
  1194         return True
  1310         return True
  1195         
  1311         
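The first loop of `_relation_supported` rejects a relation if any of the step's sources does not support its type. It also rejects the relation when the type is listed in a source's `cross_relations` and the relation is not one of the step's own terms. The sketch below covers only that loop and omits the later non-final check on term sources. `FakeSource` and `relation_supported` are hypothetical, as are the source and relation names.

```python
class FakeSource(object):
    """Hypothetical source: the relation types it stores and its cross_relations."""

    def __init__(self, uri, relations, cross_relations=()):
        self.uri = uri
        self.relations = set(relations)
        self.cross_relations = set(cross_relations)

    def support_relation(self, rtype):
        return rtype in self.relations


def relation_supported(sources, rtype, relation_in_terms):
    """Sketch of the per-source test at the top of _relation_supported."""
    for source in sources:
        if not source.support_relation(rtype):
            return False
        # a crossed relation may also be found in the system source, so it is
        # only kept when the current step explicitly handles that relation
        if rtype in source.cross_relations and not relation_in_terms:
            return False
    return True


ext = FakeSource('extsource', ['in_group', 'owned_by'], cross_relations=['in_group'])
print(relation_supported([ext], 'owned_by', False))  # True
print(relation_supported([ext], 'in_group', False))  # False
print(relation_supported([ext], 'in_group', True))   # True
```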
  1196     def visit_relation(self, node, newroot, variables):
  1312     def visit_relation(self, node, newroot, terms):
  1197         if not node.is_types_restriction():
  1313         if not node.is_types_restriction():
  1198             if node in self.skip and self.solindices.issubset(self.skip[node]):
  1314             if node in self.skip and self.solindices.issubset(self.skip[node]):
  1199                 if not self.schema.rschema(node.r_type).is_final():
  1315                 if not self.schema.rschema(node.r_type).is_final():
  1200                     # can't really skip the relation if one variable is selected and only
  1316                     # can't really skip the relation if one variable is selected and only
  1201                     # referenced by this relation
  1317                     # referenced by this relation
  1212         # don't copy type restriction unless this is the only relation for the
  1328         # don't copy type restriction unless this is the only relation for the
  1213         # rhs variable, else they'll be reinserted later as needed (else we may
  1329         # rhs variable, else they'll be reinserted later as needed (else we may
  1214         # copy a type restriction while the variable is not actually used)
  1330         # copy a type restriction while the variable is not actually used)
  1215         elif not any(self._relation_supported(rel)
  1331         elif not any(self._relation_supported(rel)
  1216                      for rel in node.children[0].variable.stinfo['relations']):
  1332                      for rel in node.children[0].variable.stinfo['relations']):
  1217             rel, node = self.visit_default(node, newroot, variables)
  1333             rel, node = self.visit_default(node, newroot, terms)
  1218             return rel, node
  1334             return rel, node
  1219         else:
  1335         else:
  1220             raise UnsupportedBranch()
  1336             raise UnsupportedBranch()
  1221         rschema = self.schema.rschema(node.r_type)
  1337         rschema = self.schema.rschema(node.r_type)
  1222         res = self.visit_default(node, newroot, variables)[0]
  1338         try:
       
  1339             res = self.visit_default(node, newroot, terms)[0]
       
  1340         except Exception, ex:
       
  1341             raise
  1223         ored = node.ored()
  1342         ored = node.ored()
  1224         if rschema.is_final() or rschema.inlined:
  1343         if rschema.is_final() or rschema.inlined:
  1225             vrefs = node.children[1].get_nodes(VariableRef)
  1344             vrefs = node.children[1].get_nodes(VariableRef)
  1226             if not vrefs:
  1345             if not vrefs:
  1227                 if not ored:
  1346                 if not ored:
  1231             else:
  1350             else:
  1232                 assert len(vrefs) == 1
  1351                 assert len(vrefs) == 1
  1233                 vref = vrefs[0]
  1352                 vref = vrefs[0]
  1234                 # XXX check operator ?
  1353                 # XXX check operator ?
  1235                 self.hasvar[(node.children[0].name, rschema)] = vref
  1354                 self.hasvar[(node.children[0].name, rschema)] = vref
  1236                 if self._may_skip_attr_rel(rschema, node, vref, ored, variables, res):
  1355                 if self._may_skip_attr_rel(rschema, node, vref, ored, terms, res):
  1237                     self.skip.setdefault(node, set()).update(self.solindices)
  1356                     self.skip.setdefault(node, set()).update(self.solindices)
  1238         elif not ored:
  1357         elif not ored:
  1239             self.skip.setdefault(node, set()).update(self.solindices)
  1358             self.skip.setdefault(node, set()).update(self.solindices)
  1240         return res, node
  1359         return res, node
  1241 
  1360 
  1242     def _may_skip_attr_rel(self, rschema, rel, vref, ored, variables, res):
  1361     def _may_skip_attr_rel(self, rschema, rel, vref, ored, terms, res):
  1243         var = vref.variable
  1362         var = vref.variable
  1244         if ored:
  1363         if ored:
  1245             return False
  1364             return False
  1246         if var.name in self.extneedsel or var.stinfo['selected']:
  1365         if var.name in self.extneedsel or var.stinfo['selected']:
  1247             return False
  1366             return False
  1248         if not same_scope(var):
  1367         if not same_scope(var):
  1249             return False
  1368             return False
  1250         if any(v for v,_ in var.stinfo['attrvars'] if not v.name in variables):
  1369         if any(v for v,_ in var.stinfo['attrvars'] if not v.name in terms):
  1251             return False
  1370             return False
  1252         return True
  1371         return True
  1253         
  1372         
  1254     def visit_exists(self, node, newroot, variables):
  1373     def visit_exists(self, node, newroot, terms):
  1255         newexists = node.__class__()
  1374         newexists = node.__class__()
  1256         self.scopes = {node: newexists}
  1375         self.scopes = {node: newexists}
  1257         subparts, node = self._visit_children(node, newroot, variables)
  1376         subparts, node = self._visit_children(node, newroot, terms)
  1258         if not subparts:
  1377         if not subparts:
  1259             return None, node
  1378             return None, node
  1260         newexists.set_where(subparts[0])
  1379         newexists.set_where(subparts[0])
  1261         return newexists, node
  1380         return newexists, node
  1262     
  1381     
  1263     def visit_not(self, node, newroot, variables):
  1382     def visit_not(self, node, newroot, terms):
  1264         subparts, node = self._visit_children(node, newroot, variables)
  1383         subparts, node = self._visit_children(node, newroot, terms)
  1265         if not subparts:
  1384         if not subparts:
  1266             return None, node
  1385             return None, node
  1267         return copy_node(newroot, node, subparts), node
  1386         return copy_node(newroot, node, subparts), node
  1268     
  1387     
  1269     def visit_group(self, node, newroot, variables):
  1388     def visit_group(self, node, newroot, terms):
  1270         if not self.final:
  1389         if not self.final:
  1271             return None, node
  1390             return None, node
  1272         return self.visit_default(node, newroot, variables)
  1391         return self.visit_default(node, newroot, terms)
  1273             
  1392             
  1274     def visit_variableref(self, node, newroot, variables):
  1393     def visit_variableref(self, node, newroot, terms):
  1275         if self.use_only_defined:
  1394         if self.use_only_defined:
  1276             if not node.variable.name in newroot.defined_vars:
  1395             if not node.variable.name in newroot.defined_vars:
  1277                 raise UnsupportedBranch(node.name)
  1396                 raise UnsupportedBranch(node.name)
  1278         elif not node.variable in variables:
  1397         elif not node.variable in terms:
  1279             raise UnsupportedBranch(node.name)
  1398             raise UnsupportedBranch(node.name)
  1280         self.mayneedsel.add(node.name)
  1399         self.mayneedsel.add(node.name)
  1281         # set scope so we can insert types restriction properly
  1400         # set scope so we can insert types restriction properly
  1282         newvar = newroot.get_variable(node.name)
  1401         newvar = newroot.get_variable(node.name)
  1283         newvar.stinfo['scope'] = self.scopes.get(node.variable.scope, newroot)
  1402         newvar.stinfo['scope'] = self.scopes.get(node.variable.scope, newroot)
  1284         return VariableRef(newvar), node
  1403         return VariableRef(newvar), node
  1285 
  1404 
  1286     def visit_constant(self, node, newroot, variables):
  1405     def visit_constant(self, node, newroot, terms):
  1287         return copy_node(newroot, node), node
  1406         return copy_node(newroot, node), node
  1288     
  1407     
  1289     def visit_default(self, node, newroot, variables):
  1408     def visit_default(self, node, newroot, terms):
  1290         subparts, node = self._visit_children(node, newroot, variables)
  1409         subparts, node = self._visit_children(node, newroot, terms)
  1291         return copy_node(newroot, node, subparts), node
  1410         return copy_node(newroot, node, subparts), node
  1292         
  1411         
  1293     visit_comparison = visit_mathexpression = visit_constant = visit_function = visit_default
  1412     visit_comparison = visit_mathexpression = visit_constant = visit_function = visit_default
  1294     visit_sort = visit_sortterm = visit_default
  1413     visit_sort = visit_sortterm = visit_default
  1295     
  1414     
  1296     def _visit_children(self, node, newroot, variables):
  1415     def _visit_children(self, node, newroot, terms):
  1297         subparts = []
  1416         subparts = []
  1298         for i in xrange(len(node.children)):
  1417         for i in xrange(len(node.children)):
  1299             child = node.children[i]
  1418             child = node.children[i]
  1300             newchild, child_ = child.accept(self, newroot, variables)
  1419             newchild, child_ = child.accept(self, newroot, terms)
  1301             if not child is child_:
  1420             if not child is child_:
  1302                 node = child_.parent
  1421                 node = child_.parent
  1303             if newchild is not None:
  1422             if newchild is not None:
  1304                 subparts.append(newchild)
  1423                 subparts.append(newchild)
  1305         return subparts, node
  1424         return subparts, node
  1306     
  1425     
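`_visit_children` keeps only the children whose copy is not `None`. `visit_not` and `visit_exists` then return `None` when nothing survived, so an emptied `NOT` or `EXISTS` disappears instead of being copied with no content. Below is a minimal sketch of that behavior in which nodes are plain strings and tuples. `visit_children`, `visit_not` and `keep_supported` are hypothetical simplifications of the visitor methods.

```python
def visit_children(children, visit):
    """Collect the copies of the children that survived filtering (None = dropped)."""
    subparts = []
    for child in children:
        newchild = visit(child)
        if newchild is not None:
            subparts.append(newchild)
    return subparts


def visit_not(children, visit):
    """Copy a NOT node only if at least one child survived."""
    subparts = visit_children(children, visit)
    if not subparts:
        return None
    return ('NOT', subparts)


def keep_supported(restriction):
    # hypothetical filter: anything mentioning variable S is out of scope
    return None if 'S' in restriction.split() else restriction


print(visit_not(['X in_state S'], keep_supported))  # None
print(visit_not(['X owned_by U'], keep_supported))  # ('NOT', ['X owned_by U'])
```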
  1307     def process_selection(self, newroot, variables, rqlst):
  1426     def process_selection(self, newroot, terms, rqlst):
  1308         if self.final:
  1427         if self.final:
  1309             for term in rqlst.selection:
  1428             for term in rqlst.selection:
  1310                 newroot.append_selected(term.copy(newroot))
  1429                 newroot.append_selected(term.copy(newroot))
  1311                 for vref in term.get_nodes(VariableRef):
  1430                 for vref in term.get_nodes(VariableRef):
  1312                     self.needsel.add(vref.name)
  1431                     self.needsel.add(vref.name)
  1315             vrefs = term.get_nodes(VariableRef)
  1434             vrefs = term.get_nodes(VariableRef)
  1316             if vrefs:
  1435             if vrefs:
  1317                 supportedvars = []
  1436                 supportedvars = []
  1318                 for vref in vrefs:
  1437                 for vref in vrefs:
  1319                     var = vref.variable
  1438                     var = vref.variable
  1320                     if var in variables:
  1439                     if var in terms:
  1321                         supportedvars.append(vref)
  1440                         supportedvars.append(vref)
  1322                         continue
  1441                         continue
  1323                     else:
  1442                     else:
  1324                         self.needsel.add(vref.name)
  1443                         self.needsel.add(vref.name)
  1325                         break
  1444                         break
  1329                     supportedvars = []
  1448                     supportedvars = []
  1330                 for vref in supportedvars:
  1449                 for vref in supportedvars:
  1331                     if not vref in newroot.get_selected_variables():
  1450                     if not vref in newroot.get_selected_variables():
  1332                         newroot.append_selected(VariableRef(newroot.get_variable(vref.name)))
  1451                         newroot.append_selected(VariableRef(newroot.get_variable(vref.name)))
  1333             
  1452             
  1334     def add_necessary_selection(self, newroot, variables):
  1453     def add_necessary_selection(self, newroot, terms):
  1335         selected = tuple(newroot.get_selected_variables())
  1454         selected = tuple(newroot.get_selected_variables())
  1336         for varname in variables:
  1455         for varname in terms:
  1337             var = newroot.defined_vars[varname]
  1456             var = newroot.defined_vars[varname]
  1338             for vref in var.references():
  1457             for vref in var.references():
  1339                 rel = vref.relation()
  1458                 rel = vref.relation()
  1340                 if rel is None and vref in selected:
  1459                 if rel is None and vref in selected:
  1341                     # already selected
  1460                     # already selected