--- a/server/msplanner.py Wed May 13 15:06:02 2009 +0200
+++ b/server/msplanner.py Wed May 13 15:46:47 2009 +0200
@@ -16,7 +16,7 @@
"cross_relations" set in the source's mapping file and it that case, we'll
consider that we can also find in the system source some relation between
X and Y coming from different sources.
-
+
* if "relation" isn't supported by the external source but X or Y
types (or both) are, we suppose by default that can find in the system
source some relation where X and/or Y come from the external source. You
@@ -49,7 +49,7 @@
1. return the result of CWUser X WHERE X in_group G, G name 'users' from system
source, that's enough (optimization of the sql querier will avoid join on
CWUser, so we will directly get local eids)
-
+
:CWUser X,L WHERE X in_group G, X login L, G name 'users':
1. fetch Any X,L WHERE X is CWUser, X login L from both sources, store
concatenation of results into a temporary table
@@ -98,7 +98,7 @@
AbstractSource.dont_cross_relations = ()
AbstractSource.cross_relations = ()
-
+
def need_aggr_step(select, sources, stepdefs=None):
"""return True if a temporary table is necessary to store some partial
results to execute the given query
@@ -169,7 +169,7 @@
for part in subparts:
newnode.append(part)
return newnode
-
+
def same_scope(var):
"""return true if the variable is always used in the same scope"""
try:
@@ -181,7 +181,7 @@
return False
var.stinfo['samescope'] = True
return True
-
+
################################################################################
class PartPlanInformation(object):
@@ -198,19 +198,19 @@
the execution plan
:attr rqlst:
the original rql syntax tree handled by this part
-
+
:attr needsplit:
bool telling if the query has to be split into multiple steps for
execution or if it can be executed at once
-
+
:attr temptable:
a SQL temporary table name or None, if necessary to handle aggregate /
sorting for this part of the query
-
+
:attr finaltable:
a SQL table name or None, if results for this part of the query should be
written into a temporary table (usually shared by multiple PPI)
-
+
:attr sourcesterms:
a dictionary {source : {term: set([solution index, ])}} telling for each
source which terms are supported for which solutions. A "term" may be
@@ -262,23 +262,23 @@
print 'sourcesterms:'
for source, terms in self.sourcesterms.items():
print source, terms
-
+
def copy_solutions(self, solindices):
return [self._solutions[solidx].copy() for solidx in solindices]
-
+
@property
@cached
def part_sources(self):
if self._sourcesterms:
return tuple(sorted(self._sourcesterms))
return (self.system_source,)
-
+
@property
@cached
def _sys_source_set(self):
return frozenset((self.system_source, solindex)
- for solindex in self._solindices)
-
+ for solindex in self._solindices)
+
@cached
def _norel_support_set(self, relation):
"""return a set of (source, solindex) where source doesn't support the
@@ -340,7 +340,7 @@
# query
if not varobj._q_invariant and any(ifilterfalse(
source.support_relation, (r.r_type for r in rels))):
- self.needsplit = True
+ self.needsplit = True
# add source for rewritten constants to sourcesterms
for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
const = vconsts[0]
@@ -397,7 +397,7 @@
self._linkedterms.setdefault(lhsv, set()).add((rhsv, rel))
self._linkedterms.setdefault(rhsv, set()).add((lhsv, rel))
return termssources
-
+
def _handle_cross_relation(self, rel, relsources, termssources):
for source in relsources:
if rel.r_type in source.cross_relations:
@@ -422,7 +422,7 @@
break
else:
self._sourcesterms.setdefault(source, {})[rel] = set(self._solindices)
-
+
def _remove_invalid_sources(self, termssources):
"""removes invalid sources from `sourcesterms` member according to
traversed relations and their properties (which sources support them,
@@ -455,7 +455,7 @@
continue
self._remove_term_sources(lhsv, rel, rhsv, termssources)
self._remove_term_sources(rhsv, rel, lhsv, termssources)
-
+
def _extern_term(self, term, termssources, inserted):
var = term.variable
if var.stinfo['constnode']:
@@ -471,7 +471,7 @@
if not termv in termssources:
termssources[termv] = self._term_sources(termv)
return termv
-
+
def _remove_sources_until_stable(self, term, termssources):
sourcesterms = self._sourcesterms
for oterm, rel in self._linkedterms.get(term, ()):
@@ -506,10 +506,10 @@
self._remove_term_sources(term, rel, oterm, termssources)
if not need_ancestor_scope or is_ancestor(oterm.scope, term.scope):
self._remove_term_sources(oterm, rel, term, termssources)
-
+
def _remove_term_sources(self, term, rel, oterm, termssources):
"""remove invalid sources for term according to oterm's sources and the
- relation between those two terms.
+ relation between those two terms.
"""
norelsup = self._norel_support_set(rel)
termsources = termssources[term]
@@ -528,21 +528,23 @@
self._remove_sources(term, invalid_sources)
termsources -= invalid_sources
self._remove_sources_until_stable(term, termssources)
-
+ if isinstance(oterm, Constant):
+ self._remove_sources(oterm, invalid_sources)
+
def _compute_needsplit(self):
"""tell according to sourcesterms if the rqlst has to be splitted for
execution among multiple sources
-
+
the execution has to be split if
* a source support an entity (non invariant) but doesn't support a
relation on it
* a source support an entity which is accessed by an optional relation
- * there is more than one source and either all sources'supported
+ * there is more than one source and either all sources'supported
variable/solutions are not equivalent or multiple variables have to
be fetched from some source
"""
# NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2
- if len(self._sourcesterms) < 2:
+ if len(self._sourcesterms) < 2:
self.needsplit = False
elif not self.needsplit:
if not allequals(self._sourcesterms.itervalues()):
@@ -576,7 +578,7 @@
if not r is rel and self._repo.is_multi_sources_relation(r.r_type)):
return True
return False
-
+
def _set_source_for_term(self, source, term):
self._sourcesterms.setdefault(source, {})[term] = set(self._solindices)
@@ -603,7 +605,7 @@
try:
sourcesterms[source][term].remove(solindex)
except KeyError:
- return # may occur with subquery column alias
+ continue # may occur with subquery column alias
if not sourcesterms[source][term]:
del sourcesterms[source][term]
if not sourcesterms[source]:
@@ -611,7 +613,7 @@
def crossed_relation(self, source, relation):
return relation in self._crossrelations.get(source, ())
-
+
def part_steps(self):
"""precompute necessary part steps before generating actual rql for
each step. This is necessary to know if an aggregate step will be
@@ -758,10 +760,10 @@
# ensure relation is using '=' operator, else we rely on a
# sqlgenerator side effect (it won't insert an inequality operator
# in this case)
- relation.children[1].operator = '='
+ relation.children[1].operator = '='
terms.append(newvar)
needsel.add(newvar.name)
-
+
def _choose_term(self, sourceterms):
"""pick one term among terms supported by a source, which will be used
as a base to generate an execution step
@@ -798,7 +800,7 @@
# whatever (relation)
term = iter(sourceterms).next()
return term, sourceterms.pop(term)
-
+
def _expand_sources(self, selected_source, term, solindices):
"""return all sources supporting given term / solindices"""
sources = [selected_source]
@@ -806,7 +808,7 @@
for source in sourcesterms.keys():
if source is selected_source:
continue
- if not (term in sourcesterms[source] and
+ if not (term in sourcesterms[source] and
solindices.issubset(sourcesterms[source][term])):
continue
sources.append(source)
@@ -818,7 +820,7 @@
if not sourcesterms[source]:
del sourcesterms[source]
return sources
-
+
def _expand_terms(self, term, sources, sourceterms, scope, solindices):
terms = [term]
sources = sorted(sources)
@@ -876,7 +878,7 @@
modified = True
self._cleanup_sourcesterms(sources, solindices, term)
return terms
-
+
def _cleanup_sourcesterms(self, sources, solindices, term=None):
"""remove solutions so we know they are already processed"""
for source in sources:
@@ -901,7 +903,7 @@
#assert term in cross_terms
if not sourceterms:
del self._sourcesterms[source]
-
+
def merge_input_maps(self, allsolindices):
"""inputmaps is a dictionary with tuple of solution indices as key with
an associated input map as value. This function compute for each
@@ -911,7 +913,7 @@
inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'},
(1,): {'X': 't2.C0', 'T': 't2.C1'}}
return : [([1], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1',
- 'X': 't2.C0', 'T': 't2.C1'}),
+ 'X': 't2.C0', 'T': 't2.C1'}),
([0,2], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'})]
"""
if not self._inputmaps:
@@ -980,10 +982,10 @@
decompose the RQL query according to sources'schema
"""
-
+
def build_select_plan(self, plan, rqlst):
"""build execution plan for a SELECT RQL query
-
+
the rqlst should not be tagged at this point
"""
if server.DEBUG:
@@ -1030,7 +1032,7 @@
inputmap[colalias.name] = '%s.C%s' % (temptable, i)
ppi.plan.add_step(sstep)
return inputmap
-
+
def _union_plan(self, plan, union, ppis, temptable=None):
tosplit, cango, allsources = [], {}, set()
for planinfo in ppis:
@@ -1088,7 +1090,7 @@
return steps
# internal methods for multisources decomposition #########################
-
+
def split_part(self, ppi, temptable):
ppi.finaltable = temptable
plan = ppi.plan
@@ -1172,7 +1174,7 @@
step.set_limit_offset(select.limit, select.offset)
return step
-
+
class UnsupportedBranch(Exception):
pass
@@ -1185,7 +1187,7 @@
self.hasaggrstep = self.ppi.temptable
self.extneedsel = frozenset(vref.name for sortterm in ppi.rqlst.orderby
for vref in sortterm.iget_nodes(VariableRef))
-
+
def _rqlst_accept(self, rqlst, node, newroot, terms, setfunc=None):
try:
newrestr, node_ = node.accept(self, newroot, terms[:])
@@ -1293,7 +1295,7 @@
if server.DEBUG:
print '--->', newroot
return newroot, self.insertedvars
-
+
def visit_and(self, node, newroot, terms):
subparts = []
for i in xrange(len(node.children)):
@@ -1330,7 +1332,7 @@
if termsources and termsources != self.sources:
return False
return True
-
+
def visit_relation(self, node, newroot, terms):
if not node.is_types_restriction():
if node in self.skip and self.solindices.issubset(self.skip[node]):
@@ -1368,7 +1370,7 @@
if not ored:
self.skip.setdefault(node, set()).update(self.solindices)
else:
- self.mayneedvar.setdefault((node.children[0].name, rschema), []).append( (res, ored) )
+ self.mayneedvar.setdefault((node.children[0].name, rschema), []).append( (res, ored) )
else:
assert len(vrefs) == 1
vref = vrefs[0]
@@ -1391,7 +1393,7 @@
if any(v for v, _ in var.stinfo['attrvars'] if not v in terms):
return False
return True
-
+
def visit_exists(self, node, newroot, terms):
newexists = node.__class__()
self.scopes = {node: newexists}
@@ -1400,18 +1402,18 @@
return None, node
newexists.set_where(subparts[0])
return newexists, node
-
+
def visit_not(self, node, newroot, terms):
subparts, node = self._visit_children(node, newroot, terms)
if not subparts:
return None, node
return copy_node(newroot, node, subparts), node
-
+
def visit_group(self, node, newroot, terms):
if not self.final:
return None, node
return self.visit_default(node, newroot, terms)
-
+
def visit_variableref(self, node, newroot, terms):
if self.use_only_defined:
if not node.variable.name in newroot.defined_vars:
@@ -1426,14 +1428,14 @@
def visit_constant(self, node, newroot, terms):
return copy_node(newroot, node), node
-
+
def visit_default(self, node, newroot, terms):
subparts, node = self._visit_children(node, newroot, terms)
return copy_node(newroot, node, subparts), node
-
+
visit_comparison = visit_mathexpression = visit_constant = visit_function = visit_default
visit_sort = visit_sortterm = visit_default
-
+
def _visit_children(self, node, newroot, terms):
subparts = []
for i in xrange(len(node.children)):
@@ -1444,14 +1446,14 @@
if newchild is not None:
subparts.append(newchild)
return subparts, node
-
+
def process_selection(self, newroot, terms, rqlst):
if self.final:
for term in rqlst.selection:
newroot.append_selected(term.copy(newroot))
for vref in term.get_nodes(VariableRef):
self.needsel.add(vref.name)
- return
+ return
for term in rqlst.selection:
vrefs = term.get_nodes(VariableRef)
if vrefs:
@@ -1471,7 +1473,7 @@
for vref in supportedvars:
if not vref in newroot.get_selected_variables():
newroot.append_selected(VariableRef(newroot.get_variable(vref.name)))
-
+
def add_necessary_selection(self, newroot, terms):
selected = tuple(newroot.get_selected_variables())
for varname in terms: