server/msplanner.py
branchtls-sprint
changeset 1785 01245e2a777d
parent 1409 f4dee84a618f
child 1786 eccd1885d42e
--- a/server/msplanner.py	Wed May 13 15:06:02 2009 +0200
+++ b/server/msplanner.py	Wed May 13 15:46:47 2009 +0200
@@ -16,7 +16,7 @@
      "cross_relations" set in the source's mapping file and it that case, we'll
      consider that we can also find in the system source some relation between
      X and Y coming from different sources.
-     
+
    * if "relation" isn't supported by the external source but X or Y
      types (or both) are, we suppose by default that can find in the system
      source some relation where X and/or Y come from the external source. You
@@ -49,7 +49,7 @@
   1. return the result of CWUser X WHERE X in_group G, G name 'users' from system
      source, that's enough (optimization of the sql querier will avoid join on
      CWUser, so we will directly get local eids)
-    
+
 :CWUser X,L WHERE X in_group G, X login L, G name 'users':
 1. fetch Any X,L WHERE X is CWUser, X login L from both sources, store
    concatenation of results into a temporary table
@@ -98,7 +98,7 @@
 
 AbstractSource.dont_cross_relations = ()
 AbstractSource.cross_relations = ()
-    
+
 def need_aggr_step(select, sources, stepdefs=None):
     """return True if a temporary table is necessary to store some partial
     results to execute the given query
@@ -169,7 +169,7 @@
     for part in subparts:
         newnode.append(part)
     return newnode
-        
+
 def same_scope(var):
     """return true if the variable is always used in the same scope"""
     try:
@@ -181,7 +181,7 @@
                 return False
         var.stinfo['samescope'] = True
         return True
-    
+
 ################################################################################
 
 class PartPlanInformation(object):
@@ -198,19 +198,19 @@
       the execution plan
     :attr rqlst:
       the original rql syntax tree handled by this part
-      
+
     :attr needsplit:
       bool telling if the query has to be split into multiple steps for
       execution or if it can be executed at once
-      
+
     :attr temptable:
       a SQL temporary table name or None, if necessary to handle aggregate /
       sorting for this part of the query
-      
+
     :attr finaltable:
       a SQL table name or None, if results for this part of the query should be
       written into a temporary table (usually shared by multiple PPI)
-      
+
     :attr sourcesterms:
       a dictionary {source : {term: set([solution index, ])}} telling for each
       source which terms are supported for which solutions. A "term" may be
@@ -262,23 +262,23 @@
             print 'sourcesterms:'
             for source, terms in self.sourcesterms.items():
                 print source, terms
-            
+
     def copy_solutions(self, solindices):
         return [self._solutions[solidx].copy() for solidx in solindices]
-    
+
     @property
     @cached
     def part_sources(self):
         if self._sourcesterms:
             return tuple(sorted(self._sourcesterms))
         return (self.system_source,)
-    
+
     @property
     @cached
     def _sys_source_set(self):
         return frozenset((self.system_source, solindex)
-                         for solindex in self._solindices)        
-       
+                         for solindex in self._solindices)
+
     @cached
     def _norel_support_set(self, relation):
         """return a set of (source, solindex) where source doesn't support the
@@ -340,7 +340,7 @@
                         # query
                         if not varobj._q_invariant and any(ifilterfalse(
                             source.support_relation, (r.r_type for r in rels))):
-                            self.needsplit = True               
+                            self.needsplit = True
         # add source for rewritten constants to sourcesterms
         for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
             const = vconsts[0]
@@ -397,7 +397,7 @@
                 self._linkedterms.setdefault(lhsv, set()).add((rhsv, rel))
                 self._linkedterms.setdefault(rhsv, set()).add((lhsv, rel))
         return termssources
-            
+
     def _handle_cross_relation(self, rel, relsources, termssources):
         for source in relsources:
             if rel.r_type in source.cross_relations:
@@ -422,7 +422,7 @@
                         break
                 else:
                     self._sourcesterms.setdefault(source, {})[rel] = set(self._solindices)
-    
+
     def _remove_invalid_sources(self, termssources):
         """removes invalid sources from `sourcesterms` member according to
         traversed relations and their properties (which sources support them,
@@ -455,7 +455,7 @@
                         continue
                     self._remove_term_sources(lhsv, rel, rhsv, termssources)
                     self._remove_term_sources(rhsv, rel, lhsv, termssources)
-                                    
+
     def _extern_term(self, term, termssources, inserted):
         var = term.variable
         if var.stinfo['constnode']:
@@ -471,7 +471,7 @@
             if not termv in termssources:
                 termssources[termv] = self._term_sources(termv)
         return termv
-        
+
     def _remove_sources_until_stable(self, term, termssources):
         sourcesterms = self._sourcesterms
         for oterm, rel in self._linkedterms.get(term, ()):
@@ -506,10 +506,10 @@
                 self._remove_term_sources(term, rel, oterm, termssources)
             if not need_ancestor_scope or is_ancestor(oterm.scope, term.scope):
                 self._remove_term_sources(oterm, rel, term, termssources)
-    
+
     def _remove_term_sources(self, term, rel, oterm, termssources):
         """remove invalid sources for term according to oterm's sources and the
-        relation between those two terms. 
+        relation between those two terms.
         """
         norelsup = self._norel_support_set(rel)
         termsources = termssources[term]
@@ -528,21 +528,23 @@
             self._remove_sources(term, invalid_sources)
             termsources -= invalid_sources
             self._remove_sources_until_stable(term, termssources)
-        
+            if isinstance(oterm, Constant):
+                self._remove_sources(oterm, invalid_sources)
+
     def _compute_needsplit(self):
         """tell according to sourcesterms if the rqlst has to be splitted for
         execution among multiple sources
-        
+
         the execution has to be split if
         * a source support an entity (non invariant) but doesn't support a
           relation on it
         * a source support an entity which is accessed by an optional relation
-        * there is more than one source and either all sources'supported        
+        * there is more than one source and either all sources'supported
           variable/solutions are not equivalent or multiple variables have to
           be fetched from some source
         """
         # NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2
-        if len(self._sourcesterms) < 2: 
+        if len(self._sourcesterms) < 2:
             self.needsplit = False
         elif not self.needsplit:
             if not allequals(self._sourcesterms.itervalues()):
@@ -576,7 +578,7 @@
                 if not r is rel and self._repo.is_multi_sources_relation(r.r_type)):
             return True
         return False
-        
+
     def _set_source_for_term(self, source, term):
         self._sourcesterms.setdefault(source, {})[term] = set(self._solindices)
 
@@ -603,7 +605,7 @@
             try:
                 sourcesterms[source][term].remove(solindex)
             except KeyError:
-                return # may occur with subquery column alias
+                continue # may occur with subquery column alias
             if not sourcesterms[source][term]:
                 del sourcesterms[source][term]
                 if not sourcesterms[source]:
@@ -611,7 +613,7 @@
 
     def crossed_relation(self, source, relation):
         return relation in self._crossrelations.get(source, ())
-    
+
     def part_steps(self):
         """precompute necessary part steps before generating actual rql for
         each step. This is necessary to know if an aggregate step will be
@@ -758,10 +760,10 @@
             # ensure relation is using '=' operator, else we rely on a
             # sqlgenerator side effect (it won't insert an inequality operator
             # in this case)
-            relation.children[1].operator = '=' 
+            relation.children[1].operator = '='
             terms.append(newvar)
             needsel.add(newvar.name)
-        
+
     def _choose_term(self, sourceterms):
         """pick one term among terms supported by a source, which will be used
         as a base to generate an execution step
@@ -798,7 +800,7 @@
         # whatever (relation)
         term = iter(sourceterms).next()
         return term, sourceterms.pop(term)
-            
+
     def _expand_sources(self, selected_source, term, solindices):
         """return all sources supporting given term / solindices"""
         sources = [selected_source]
@@ -806,7 +808,7 @@
         for source in sourcesterms.keys():
             if source is selected_source:
                 continue
-            if not (term in sourcesterms[source] and 
+            if not (term in sourcesterms[source] and
                     solindices.issubset(sourcesterms[source][term])):
                 continue
             sources.append(source)
@@ -818,7 +820,7 @@
                     if not sourcesterms[source]:
                         del sourcesterms[source]
         return sources
-            
+
     def _expand_terms(self, term, sources, sourceterms, scope, solindices):
         terms = [term]
         sources = sorted(sources)
@@ -876,7 +878,7 @@
                     modified = True
                     self._cleanup_sourcesterms(sources, solindices, term)
         return terms
-    
+
     def _cleanup_sourcesterms(self, sources, solindices, term=None):
         """remove solutions so we know they are already processed"""
         for source in sources:
@@ -901,7 +903,7 @@
                     #assert term in cross_terms
             if not sourceterms:
                 del self._sourcesterms[source]
-                
+
     def merge_input_maps(self, allsolindices):
         """inputmaps is a dictionary with tuple of solution indices as key with
         an associated input map as value. This function compute for each
@@ -911,7 +913,7 @@
         inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'},
                      (1,): {'X': 't2.C0', 'T': 't2.C1'}}
         return : [([1],  {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1',
-                           'X': 't2.C0', 'T': 't2.C1'}),                   
+                           'X': 't2.C0', 'T': 't2.C1'}),
                   ([0,2], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'})]
         """
         if not self._inputmaps:
@@ -980,10 +982,10 @@
 
     decompose the RQL query according to sources'schema
     """
-        
+
     def build_select_plan(self, plan, rqlst):
         """build execution plan for a SELECT RQL query
-               
+
         the rqlst should not be tagged at this point
         """
         if server.DEBUG:
@@ -1030,7 +1032,7 @@
                     inputmap[colalias.name] = '%s.C%s' % (temptable, i)
                 ppi.plan.add_step(sstep)
         return inputmap
-    
+
     def _union_plan(self, plan, union, ppis, temptable=None):
         tosplit, cango, allsources = [], {}, set()
         for planinfo in ppis:
@@ -1088,7 +1090,7 @@
         return steps
 
     # internal methods for multisources decomposition #########################
-    
+
     def split_part(self, ppi, temptable):
         ppi.finaltable = temptable
         plan = ppi.plan
@@ -1172,7 +1174,7 @@
             step.set_limit_offset(select.limit, select.offset)
         return step
 
-    
+
 class UnsupportedBranch(Exception):
     pass
 
@@ -1185,7 +1187,7 @@
         self.hasaggrstep = self.ppi.temptable
         self.extneedsel = frozenset(vref.name for sortterm in ppi.rqlst.orderby
                                     for vref in sortterm.iget_nodes(VariableRef))
-        
+
     def _rqlst_accept(self, rqlst, node, newroot, terms, setfunc=None):
         try:
             newrestr, node_ = node.accept(self, newroot, terms[:])
@@ -1293,7 +1295,7 @@
         if server.DEBUG:
             print '--->', newroot
         return newroot, self.insertedvars
-        
+
     def visit_and(self, node, newroot, terms):
         subparts = []
         for i in xrange(len(node.children)):
@@ -1330,7 +1332,7 @@
                     if termsources and termsources != self.sources:
                         return False
         return True
-        
+
     def visit_relation(self, node, newroot, terms):
         if not node.is_types_restriction():
             if node in self.skip and self.solindices.issubset(self.skip[node]):
@@ -1368,7 +1370,7 @@
                 if not ored:
                     self.skip.setdefault(node, set()).update(self.solindices)
                 else:
-                    self.mayneedvar.setdefault((node.children[0].name, rschema), []).append( (res, ored) )                    
+                    self.mayneedvar.setdefault((node.children[0].name, rschema), []).append( (res, ored) )
             else:
                 assert len(vrefs) == 1
                 vref = vrefs[0]
@@ -1391,7 +1393,7 @@
         if any(v for v, _ in var.stinfo['attrvars'] if not v in terms):
             return False
         return True
-        
+
     def visit_exists(self, node, newroot, terms):
         newexists = node.__class__()
         self.scopes = {node: newexists}
@@ -1400,18 +1402,18 @@
             return None, node
         newexists.set_where(subparts[0])
         return newexists, node
-    
+
     def visit_not(self, node, newroot, terms):
         subparts, node = self._visit_children(node, newroot, terms)
         if not subparts:
             return None, node
         return copy_node(newroot, node, subparts), node
-    
+
     def visit_group(self, node, newroot, terms):
         if not self.final:
             return None, node
         return self.visit_default(node, newroot, terms)
-            
+
     def visit_variableref(self, node, newroot, terms):
         if self.use_only_defined:
             if not node.variable.name in newroot.defined_vars:
@@ -1426,14 +1428,14 @@
 
     def visit_constant(self, node, newroot, terms):
         return copy_node(newroot, node), node
-    
+
     def visit_default(self, node, newroot, terms):
         subparts, node = self._visit_children(node, newroot, terms)
         return copy_node(newroot, node, subparts), node
-        
+
     visit_comparison = visit_mathexpression = visit_constant = visit_function = visit_default
     visit_sort = visit_sortterm = visit_default
-    
+
     def _visit_children(self, node, newroot, terms):
         subparts = []
         for i in xrange(len(node.children)):
@@ -1444,14 +1446,14 @@
             if newchild is not None:
                 subparts.append(newchild)
         return subparts, node
-    
+
     def process_selection(self, newroot, terms, rqlst):
         if self.final:
             for term in rqlst.selection:
                 newroot.append_selected(term.copy(newroot))
                 for vref in term.get_nodes(VariableRef):
                     self.needsel.add(vref.name)
-            return 
+            return
         for term in rqlst.selection:
             vrefs = term.get_nodes(VariableRef)
             if vrefs:
@@ -1471,7 +1473,7 @@
                 for vref in supportedvars:
                     if not vref in newroot.get_selected_variables():
                         newroot.append_selected(VariableRef(newroot.get_variable(vref.name)))
-            
+
     def add_necessary_selection(self, newroot, terms):
         selected = tuple(newroot.get_selected_variables())
         for varname in terms: