|
1 """plan execution of rql queries on multiple sources |
|
2 |
|
the best way to understand what we are trying to achieve here is to read
the unit-tests in unittest_querier_planner.py
|
5 |
|
6 |
|
7 |
|
8 Split and execution specifications |
|
9 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
For a system source and an LDAP user source (only EUser and its attributes
are supported, no group or such):
|
12 |
|
13 |
|
14 :EUser X: |
|
15 1. fetch EUser X from both sources and return concatenation of results |
|
16 |
|
17 |
|
18 :EUser X WHERE X in_group G, G name 'users': |
|
19 * catch 1 |
|
20 1. fetch EUser X from both sources, store concatenation of results |
|
21 into a temporary table |
|
22 2. return the result of TMP X WHERE X in_group G, G name 'users' from |
|
23 the system source |
|
24 |
|
25 * catch 2 |
|
26 1. return the result of EUser X WHERE X in_group G, G name 'users' |
|
27 from system source, that's enough (optimization of the sql querier |
|
28 will avoid join on EUser, so we will directly get local eids) |
|
29 |
|
30 |
|
31 :EUser X,L WHERE X in_group G, X login L, G name 'users': |
|
32 1. fetch Any X,L WHERE X is EUser, X login L from both sources, store |
|
33 concatenation of results into a temporary table |
|
  2. return the result of Any X, L WHERE X is TMP, X login L, X in_group G,
     G name 'users' from the system source
|
36 |
|
37 |
|
38 :Any X WHERE X owned_by Y: |
|
39 * catch 1 |
|
40 1. fetch EUser X from both sources, store concatenation of results |
|
41 into a temporary table |
|
42 2. return the result of Any X WHERE X owned_by Y, Y is TMP from |
|
43 the system source |
|
44 |
|
45 * catch 2 |
|
46 1. return the result of Any X WHERE X owned_by Y |
|
47 from system source, that's enough (optimization of the sql querier |
|
48 will avoid join on EUser, so we will directly get local eids) |
|
49 |
|
50 |
|
51 :organization: Logilab |
|
52 :copyright: 2003-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
53 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
54 """ |
|
55 __docformat__ = "restructuredtext en" |
|
56 |
|
57 from itertools import imap, ifilterfalse |
|
58 |
|
59 from logilab.common.compat import any |
|
60 from logilab.common.decorators import cached |
|
61 |
|
62 from rql.stmts import Union, Select |
|
63 from rql.nodes import VariableRef, Comparison, Relation, Constant, Exists, Variable |
|
64 |
|
65 from cubicweb import server |
|
66 from cubicweb.common.utils import make_uid |
|
67 from cubicweb.server.utils import cleanup_solutions |
|
68 from cubicweb.server.ssplanner import SSPlanner, OneFetchStep, add_types_restriction |
|
69 from cubicweb.server.mssteps import * |
|
70 from cubicweb.server.sources import AbstractSource |
|
71 |
|
# extra key accessor monkeypatched on rql node classes for the multi-sources
# machinery; presumably used to build temporary table names (see the note on
# Constant below) -- TODO confirm against mssteps
Variable._ms_table_key = lambda x: x.name
Relation._ms_table_key = lambda x: x.r_type
# str() Constant.value to ensure generated table name won't be unicode
Constant._ms_table_key = lambda x: str(x.value)

# default for every source: no relation type listed. The planner below treats
# a relation type listed in a source's dont_cross_relations as if that source
# supported it (see _norel_support_set and _remove_invalid_sources)
AbstractSource.dont_cross_relations = ()
|
78 |
|
def allequals(solutions):
    """return true if all items of `solutions` are identical (equal)

    :param solutions: any iterable (iterator, generator, list, ...)
    :return: True if every item compares equal to the first one (also when
      there is no item at all), else False
    """
    # iter() lets plain sequences be used as well as iterators
    items = iter(solutions)
    for first in items:
        break
    else:
        # empty iterable: vacuously true, instead of leaking StopIteration
        return True
    for other in items:
        if other != first:
            return False
    return True
|
86 |
|
def need_aggr_step(select, sources, stepdefs=None):
    """tell whether a temporary table is needed to store some partial results
    before executing the given query (i.e. whether a later aggregation step
    is required)

    :param select: the Select node; only its `orderby`, `groupby` and
      `has_aggregat` attributes are read
    :param sources: the sources the query will be executed on
    :param stepdefs: optional sequence of step definitions; for each of them
      only item 2 (solution indices) and the last item (final step flag) are
      read
    :return: True if a temporary table is necessary, else False
    """
    if len(sources) == 1:
        # a single source can do everything at once
        return False
    if not (select.orderby or select.groupby or select.has_aggregat):
        # nothing to sort, group or aggregate
        return False
    # with more than one source, sort / groups / aggregat need a temp table if:
    # * the rqlst won't be split (when it is, the last query using partial
    #   temporary tables can do sort/groups/aggregat without the need for a
    #   later AggrStep)
    if stepdefs is None:
        return True
    # * the rqlst is split in multiple steps and there is more than one final
    #   step, or a final step doesn't cover exactly the solutions of the non
    #   final steps seen before it
    seen_final = False
    nonfinal_solindices = set()
    for stepdef in stepdefs:
        if not stepdef[-1]:
            nonfinal_solindices.update(stepdef[2])
        elif seen_final or frozenset(stepdef[2]) != nonfinal_solindices:
            return True
        else:
            seen_final = True
    return False
|
114 |
|
def copy_node(newroot, node, subparts=()):
    """return a new node of the same class as `node`, built from the node's
    `initargs(newroot)`, with each item of `subparts` appended to it

    the original node's own children are not copied.
    """
    nodecls = node.__class__
    clone = nodecls(*node.initargs(newroot))
    for child in subparts:
        clone.append(child)
    return clone
|
120 |
|
def same_scope(var):
    """return true if the variable is always used in the same scope, i.e.
    every relation in `var.stinfo['relations']` has `var`'s scope

    the answer is cached in `var.stinfo['samescope']`
    """
    stinfo = var.stinfo
    try:
        return stinfo['samescope']
    except KeyError:
        pass
    for rel in stinfo['relations']:
        if rel.scope is not var.scope:
            samescope = False
            break
    else:
        samescope = True
    stinfo['samescope'] = samescope
    return samescope
|
132 |
|
def select_group_sort(select): # XXX something similar done in rql2sql
    """add to the selection of `select` the variables used in its groups and
    sort terms which aren't already selected

    variables coming from sort terms are also added to the groups when the
    query is grouped. `select` is modified in place, nothing is returned.
    """
    for groupref in (select.groupby or ()):
        if groupref not in select.selection:
            select.append_selected(groupref.copy(select))
    for sortterm in select.orderby:
        for sortref in sortterm.iget_nodes(VariableRef):
            if sortref in select.get_selected_variables():
                continue
            # we can't directly insert sortterm.term because it references
            # a variable of the select before the copy.
            # XXX if constant term are used to define sort, their value
            # may necessite a decay
            select.append_selected(sortref.copy(select))
            if select.groupby and sortref not in select.groupby:
                select.add_group_var(sortref.copy(select))
|
150 |
|
151 |
|
class PartPlanInformation(object):
    """regroups necessary information to execute some part of a "global" rql
    query ("global" means as received by the querier, which may result in
    several internal queries, e.g. parts, due to security insertions)

    it exposes as well some methods helping in executing this part on a
    multi-sources repository, modifying its internal structure during the
    process

    :attr needsplit:
      True if this part has to be split into several steps to be executed
    :attr solutions (stored as `_solutions`):
      a list of mappings (varname -> vartype)
    :attr sourcesvars (stored as `_sourcesvars`):
      a dictionary telling for each source which terms (variables, but also
      some constants and relations) / solutions are supported, of the form
      {source : {term: set([solution index, ])}}
    """
    def __init__(self, plan, rqlst, rqlhelper=None):
        self.needsplit = False
        self.temptable = None
        self.finaltable = None
        self.plan = plan
        self.rqlst = rqlst
        self._session = plan.session
        self._solutions = rqlst.solutions
        self._solindices = range(len(self._solutions))
        # source : {term: set([solution index, ])}
        self._sourcesvars = {}
        # dictionary of terms which are linked to each other using a non
        # final relation which is supported by multiple sources. Filled with
        # {term: set([(linked term, relation)])} while removing invalid
        # sources, then simplified to {term: frozenset([linked term])}
        self._linkedvars = {}
        # processing
        self._compute_sourcesvars()
        self._remove_invalid_sources()
        self._compute_needsplit()
        # {sorted tuple of solution indices: input map}, filled by
        # build_non_final_part and read by merge_input_maps
        self._inputmaps = {}
        if rqlhelper is not None: # else test
            self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional

    def copy_solutions(self, solindices):
        """return copies of the solutions whose index is in `solindices`"""
        return [self._solutions[solidx].copy() for solidx in solindices]

    @property
    @cached
    def part_sources(self):
        """sorted tuple of the sources involved in this part, or a tuple
        holding only the system source if no source was found for any term

        decorated with `cached`, so presumably computed only once
        """
        if self._sourcesvars:
            return tuple(sorted(self._sourcesvars))
        return (self._session.repo.system_source,)

    @property
    @cached
    def _sys_source_set(self):
        """frozenset of (system source, solution index) for every solution"""
        return frozenset((self._session.repo.system_source, solindex)
                         for solindex in self._solindices)

    @cached
    def _norel_support_set(self, rtype):
        """return a set of (source, solindex) where source doesn't support the
        relation

        a source listing `rtype` in its `dont_cross_relations` is considered
        as supporting it
        """
        return frozenset((source, solidx) for source in self._session.repo.sources
                         for solidx in self._solindices
                         if not (source.support_relation(rtype)
                                 or rtype in source.dont_cross_relations))

    def _compute_sourcesvars(self):
        """compute for each variable/solution in the rqlst which sources support
        them, filling `_sourcesvars`

        also set `needsplit` when a non invariant variable is used by a
        relation not supported by one of the sources supporting its type
        """
        repo = self._session.repo
        eschema = repo.schema.eschema
        sourcesvars = self._sourcesvars
        # find for each source which variable/solution are supported
        for varname, varobj in self.rqlst.defined_vars.items():
            # if variable has an eid specified, we can get its source directly
            # NOTE: use uidrels and not constnode to deal with "X eid IN(1,2,3,4)"
            if varobj.stinfo['uidrels']:
                vrels = varobj.stinfo['relations'] - varobj.stinfo['uidrels']
                for rel in varobj.stinfo['uidrels']:
                    # only a non negated equality on eid gives the source
                    if rel.neged(strict=True) or rel.operator() != '=':
                        continue
                    for const in rel.children[1].get_nodes(Constant):
                        eid = const.eval(self.plan.args)
                        source = self._session.source_from_eid(eid)
                        # if the entity's source supports none of the other
                        # relations of the variable, use the system source
                        if vrels and not any(source.support_relation(r.r_type)
                                             for r in vrels):
                            self._set_source_for_var(repo.system_source, varobj)
                        else:
                            self._set_source_for_var(source, varobj)
                continue
            rels = varobj.stinfo['relations']
            if not rels and not varobj.stinfo['typerels']:
                # (rare) case where the variable has no type specified nor
                # relation accessed ex. "Any MAX(X)"
                self._set_source_for_var(repo.system_source, varobj)
                continue
            for i, sol in enumerate(self._solutions):
                vartype = sol[varname]
                # skip final variable (stops processing of all solutions)
                if eschema(vartype).is_final():
                    break
                for source in repo.sources:
                    if source.support_entity(vartype):
                        # the source support the entity type, though we will
                        # actually have to fetch from it only if
                        # * the variable isn't invariant
                        # * at least one supported relation specified
                        if not varobj._q_invariant or \
                               any(imap(source.support_relation,
                                        (r.r_type for r in rels if r.r_type != 'eid'))):
                            sourcesvars.setdefault(source, {}).setdefault(varobj, set()).add(i)
                        # if variable is not invariant and is used by a relation
                        # not supported by this source, we'll have to split the
                        # query
                        if not varobj._q_invariant and any(ifilterfalse(
                            source.support_relation, (r.r_type for r in rels))):
                            self.needsplit = True

    def _remove_invalid_sources(self):
        """removes invalid sources from `sourcesvars` member, using non final
        relations supported by multiple sources; also compute `_linkedvars`
        and register sources of rewritten constants
        """
        repo = self._session.repo
        rschema = repo.schema.rschema
        vsources = {}
        for rel in self.rqlst.iget_nodes(Relation):
            # process non final relations only
            # note: don't try to get schema for 'is' relation (not available
            # during bootstrap)
            if not rel.is_types_restriction() and not rschema(rel.r_type).is_final():
                # nothing to do if relation is not supported by multiple sources
                relsources = [source for source in repo.sources
                              if source.support_relation(rel.r_type)
                              or rel.r_type in source.dont_cross_relations]
                if len(relsources) < 2:
                    if relsources:
                        # this means the relation is using a variable inlined as
                        # a constant and another unsupported variable, in which
                        # case we put the relation in sourcesvars
                        self._sourcesvars.setdefault(relsources[0], {})[rel] = set(self._solindices)
                    continue
                lhs, rhs = rel.get_variable_parts()
                lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs)
                # update dictionary of sources supporting lhs and rhs vars
                if not lhsv in vsources:
                    vsources[lhsv] = self._term_sources(lhs)
                if not rhsv in vsources:
                    vsources[rhsv] = self._term_sources(rhs)
                self._linkedvars.setdefault(lhsv, set()).add((rhsv, rel))
                self._linkedvars.setdefault(rhsv, set()).add((lhsv, rel))
        for term in self._linkedvars:
            self._remove_sources_until_stable(term, vsources)
        if len(self._sourcesvars) > 1 and hasattr(self.plan.rqlst, 'main_relations'):
            # the querier doesn't annotate write queries, need to do it here
            self.plan.annotate_rqlst()
            # insert/update/delete queries, we may get extra information from
            # the main relation (eg relations to the left of the WHERE)
            if self.plan.rqlst.TYPE == 'insert':
                inserted = dict((vref.variable, etype)
                                for etype, vref in self.plan.rqlst.main_variables)
            else:
                inserted = {}
            for rel in self.plan.rqlst.main_relations:
                if not rschema(rel.r_type).is_final():
                    # nothing to do if relation is not supported by multiple sources
                    relsources = [source for source in repo.sources
                                  if source.support_relation(rel.r_type)
                                  or rel.r_type in source.dont_cross_relations]
                    if len(relsources) < 2:
                        continue
                    lhs, rhs = rel.get_variable_parts()
                    try:
                        lhsv = self._extern_term(lhs, vsources, inserted)
                        rhsv = self._extern_term(rhs, vsources, inserted)
                    except KeyError:
                        # term not defined in this part of the query
                        continue
                    norelsup = self._norel_support_set(rel.r_type)
                    self._remove_var_sources(lhsv, norelsup, rhsv, vsources)
                    self._remove_var_sources(rhsv, norelsup, lhsv, vsources)
        # cleanup linked var: only keep linked terms, drop relations
        for var, linkedrelsinfo in self._linkedvars.iteritems():
            self._linkedvars[var] = frozenset(x[0] for x in linkedrelsinfo)
        # if there are other sources than the system source, consider simplified
        # variables'source
        if self._sourcesvars and self._sourcesvars.keys() != [self._session.repo.system_source]:
            # add source for rewritten constants to sourcesvars
            for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
                const = vconsts[0]
                eid = const.eval(self.plan.args)
                source = self._session.source_from_eid(eid)
                if source is self._session.repo.system_source:
                    for const in vconsts:
                        self._set_source_for_var(source, const)
                elif source in self._sourcesvars:
                    source_scopes = frozenset(v.scope for v in self._sourcesvars[source])
                    for const in vconsts:
                        if const.scope in source_scopes:
                            self._set_source_for_var(source, const)

    def _extern_term(self, term, vsources, inserted):
        """return the term to use for `term` (a main relation's lhs/rhs),
        ensuring its possible sources are recorded in `vsources`

        :raise KeyError: if the variable isn't defined in this part
        """
        var = term.variable
        if var.stinfo['constnode']:
            termv = var.stinfo['constnode']
            vsources[termv] = self._term_sources(termv)
        elif var in inserted:
            termv = var
            source = self._session.repo.locate_etype_source(inserted[var])
            vsources[termv] = set((source, solindex) for solindex in self._solindices)
        else:
            termv = self.rqlst.defined_vars[var.name]
            if not termv in vsources:
                vsources[termv] = self._term_sources(termv)
        return termv

    def _remove_sources_until_stable(self, var, vsources):
        """propagate source removal from `var` to the terms linked to it"""
        for ovar, rel in self._linkedvars.get(var, ()):
            if not var.scope is ovar.scope and rel.scope.neged(strict=True):
                # can't get information from relation inside a NOT exists
                # where variables don't belong to the same scope
                continue
            if rel.neged(strict=True):
                # neged relation doesn't allow to infer variable sources
                continue
            norelsup = self._norel_support_set(rel.r_type)
            # compute invalid sources for variables and remove them
            self._remove_var_sources(var, norelsup, ovar, vsources)
            self._remove_var_sources(ovar, norelsup, var, vsources)

    def _remove_var_sources(self, var, norelsup, ovar, vsources):
        """remove invalid sources for var according to ovar's sources and the
        relation between those two variables.
        """
        varsources = vsources[var]
        invalid_sources = varsources - (vsources[ovar] | norelsup)
        if invalid_sources:
            self._remove_sources(var, invalid_sources)
            varsources -= invalid_sources
            self._remove_sources_until_stable(var, vsources)

    def _compute_needsplit(self):
        """tell according to sourcesvars if the rqlst has to be split for
        execution among multiple sources

        the execution has to be split if
        * a source support an entity (non invariant) but doesn't support a
          relation on it
        * a source support an entity which is accessed by an optional relation
        * there is more than one sources and either all sources'supported
          variable/solutions are not equivalent or multiple variables have to
          be fetched from some source
        """
        # NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2
        if len(self._sourcesvars) < 2:
            self.needsplit = False
        elif not self.needsplit:
            if not allequals(self._sourcesvars.itervalues()):
                self.needsplit = True
            else:
                sample = self._sourcesvars.itervalues().next()
                if len(sample) > 1 and any(v for v in sample
                                           if not v in self._linkedvars):
                    self.needsplit = True

    def _set_source_for_var(self, source, var):
        """mark `var` as supported by `source` for every solution"""
        self._sourcesvars.setdefault(source, {})[var] = set(self._solindices)

    def _term_sources(self, term):
        """returns possible sources for terms `term`, as a set of
        (source, solution index)
        """
        if isinstance(term, Constant):
            source = self._session.source_from_eid(term.eval(self.plan.args))
            return set((source, solindex) for solindex in self._solindices)
        else:
            var = getattr(term, 'variable', term)
            sources = [source for source, varobjs in self._sourcesvars.iteritems()
                       if var in varobjs]
            return set((source, solindex) for source in sources
                       for solindex in self._sourcesvars[source][var])

    def _remove_sources(self, var, sources):
        """removes invalid sources (`sources`) from `sourcesvars`

        entries left without solution are deleted, as well as sources left
        without any term

        :param sources: the (source, solution index) pairs to remove
        :param var: the analyzed variable
        """
        sourcesvars = self._sourcesvars
        for source, solindex in sources:
            try:
                sourcesvars[source][var].remove(solindex)
            except KeyError:
                # may occur with subquery column alias, or with terms whose
                # possible sources weren't computed from sourcesvars (see
                # inserted variables in _extern_term). Skip this pair but go
                # on with the others: the caller removes all of `sources`
                # from its own view of the term's sources
                continue
            if not sourcesvars[source][var]:
                del sourcesvars[source][var]
                if not sourcesvars[source]:
                    del sourcesvars[source]

    def part_steps(self):
        """precompute necessary part steps before generating actual rql for
        each step. This is necessary to know if an aggregate step will be
        necessary or not.

        :return: a list of (sources, variables, solindices, scope, needsel,
          final) tuples; `variables` may contain Relation and Constant nodes
          as well, `needsel` is the set of names of variables which should be
          additionally selected when possible
        """
        steps = []
        select = self.rqlst
        rschema = self.plan.schema.rschema
        for source in self.part_sources:
            sourcevars = self._sourcesvars[source]
            while sourcevars:
                # take a variable randomly, and all variables supporting the
                # same solutions
                var, solindices = self._choose_var(sourcevars)
                if source.uri == 'system':
                    # ensure all variables are available for the latest step
                    # (missing one will be available from temporary tables
                    # of previous steps)
                    scope = select
                    variables = scope.defined_vars.values() + scope.aliases.values()
                    sourcevars.clear()
                else:
                    scope = var.scope
                    variables = self._expand_vars(var, sourcevars, scope, solindices)
                if not sourcevars:
                    del self._sourcesvars[source]
                # find which sources support the same variables/solutions
                sources = self._expand_sources(source, variables, solindices)
                # suppose this is a final step until the contrary is proven
                final = scope is select
                # set of variables which should be additionally selected when
                # possible
                needsel = set()
                # add attribute variables and mark variables which should be
                # additionally selected when possible
                for dvar in select.defined_vars.itervalues():
                    if not dvar in variables:
                        stinfo = dvar.stinfo
                        for ovar, rtype in stinfo['attrvars']:
                            if ovar in variables:
                                needsel.add(dvar.name)
                                variables.append(dvar)
                                break
                        else:
                            needsel.add(dvar.name)
                            final = False
                if final and source.uri != 'system':
                    # check rewritten constants
                    # NOTE(review): the step stays final only if every
                    # occurrence of the constant is in a negated relation, and
                    # only the first rewritten constant matching the condition
                    # below is checked (outer break) -- confirm this is intended
                    for vconsts in select.stinfo['rewritten'].itervalues():
                        const = vconsts[0]
                        eid = const.eval(self.plan.args)
                        _source = self._session.source_from_eid(eid)
                        if len(sources) > 1 or not _source in sources:
                            for c in vconsts:
                                rel = c.relation()
                                if rel is None or not rel.neged(strict=True):
                                    final = False
                                    break
                            break
                # check whether all relations are supported by the sources
                for rel in scope.iget_nodes(Relation):
                    if rel.is_types_restriction():
                        continue
                    # take care not overwriting the existing "source" identifier
                    for _source in sources:
                        if not _source.support_relation(rel.r_type):
                            for vref in rel.iget_nodes(VariableRef):
                                needsel.add(vref.name)
                            final = False
                            break
                    else:
                        if not scope is select:
                            self._exists_relation(rel, variables, needsel)
                        # if relation is supported by all sources and some of
                        # its lhs/rhs variable isn't in "variables", and the
                        # other end *is* in "variables", mark it have to be
                        # selected
                        if source.uri != 'system' and not rschema(rel.r_type).is_final():
                            lhs, rhs = rel.get_variable_parts()
                            lhsvar = getattr(lhs, 'variable', lhs)
                            rhsvar = getattr(rhs, 'variable', rhs)
                            if lhsvar in variables and not rhsvar in variables:
                                needsel.add(lhsvar.name)
                            elif rhsvar in variables and not lhsvar in variables:
                                needsel.add(rhsvar.name)
                if final:
                    self._cleanup_sourcesvars(sources, solindices)
                # XXX rename: variables may contain Relation and Constant nodes...
                steps.append( (sources, variables, solindices, scope, needsel,
                               final) )
        return steps

    def _exists_relation(self, rel, variables, needsel):
        """handle a relation supported by all sources in a sub-scope, when only
        one of its ends is in `variables`
        """
        lhs, rhs = rel.get_variable_parts()
        try:
            lhsvar, rhsvar = lhs.variable, rhs.variable
        except AttributeError:
            pass
        else:
            # supported relation with at least one end supported, check the
            # other end is in as well. If not this usually means the
            # variable is refed by an outer scope and should be substituted
            # using an 'identity' relation (else we'll get a conflict of
            # temporary tables)
            if rhsvar in variables and not lhsvar in variables:
                self._identity_substitute(rel, lhsvar, variables, needsel)
            elif lhsvar in variables and not rhsvar in variables:
                self._identity_substitute(rel, rhsvar, variables, needsel)

    def _identity_substitute(self, relation, var, variables, needsel):
        """substitute `var` in `relation`'s scope by a new variable (through the
        rql helper's hook), adding it to `variables` and `needsel`
        """
        newvar = self._insert_identity_variable(relation.scope, var)
        if newvar is not None:
            # ensure relation is using '=' operator, else we rely on a
            # sqlgenerator side effect (it won't insert an inequality operator
            # in this case)
            relation.children[1].operator = '='
            variables.append(newvar)
            needsel.add(newvar.name)

    def _choose_var(self, sourcevars):
        """pop a term from `sourcevars` and return it with its solution
        indices, giving priority to Variable nodes from sub-scopes when more
        than one source remains, else from the outer scope
        """
        secondchoice = None
        if len(self._sourcesvars) > 1:
            # priority to variable from subscopes
            for var in sourcevars:
                if not var.scope is self.rqlst:
                    if isinstance(var, Variable):
                        return var, sourcevars.pop(var)
                    secondchoice = var
        else:
            # priority to variable outer scope
            for var in sourcevars:
                if var.scope is self.rqlst:
                    if isinstance(var, Variable):
                        return var, sourcevars.pop(var)
                    secondchoice = var
        if secondchoice is not None:
            return secondchoice, sourcevars.pop(secondchoice)
        # priority to variable
        for var in sourcevars:
            if isinstance(var, Variable):
                return var, sourcevars.pop(var)
        # whatever
        var = iter(sourcevars).next()
        return var, sourcevars.pop(var)

    def _expand_vars(self, var, sourcevars, scope, solindices):
        """return the list of terms to fetch along with `var` in the same step,
        consuming `solindices` from their entries in `sourcevars`
        """
        variables = [var]
        linkedvars = self._linkedvars
        # variable has to belong to the same scope if there is more
        # than the system source remaining
        if len(self._sourcesvars) > 1 and not scope is self.rqlst:
            candidates = (v for v in sourcevars.keys() if scope is v.scope)
        else:
            candidates = sourcevars
        candidates = [v for v in candidates
                      if isinstance(v, Constant) or
                      (solindices.issubset(sourcevars[v]) and v in linkedvars)]
        # repeat until no variable can be added, since addition of a new
        # variable may permit to another one to be added
        modified = True
        while modified and candidates:
            modified = False
            for cand in candidates[:]:
                # we only want one unlinked variable in each generated query
                if isinstance(cand, Constant) or \
                       any(v for v in variables if v in linkedvars[cand]):
                    variables.append(cand)
                    # constant nodes should be systematically deleted
                    if isinstance(cand, Constant):
                        del sourcevars[cand]
                    # variable nodes should be deleted once all possible solution
                    # indices have been consumed
                    else:
                        sourcevars[cand] -= solindices
                        if not sourcevars[cand]:
                            del sourcevars[cand]
                    candidates.remove(cand)
                    modified = True
        return variables

    def _expand_sources(self, selected_source, vars, solindices):
        """return `selected_source` plus every other source supporting all
        `vars` for all `solindices`; for non system sources, the matching
        solution indices are consumed from their sourcesvars entries
        """
        sources = [selected_source]
        sourcesvars = self._sourcesvars
        for source in sourcesvars:
            if source is selected_source:
                continue
            for var in vars:
                if not (var in sourcesvars[source] and
                        solindices.issubset(sourcesvars[source][var])):
                    break
            else:
                sources.append(source)
                if source.uri != 'system':
                    for var in vars:
                        varsolindices = sourcesvars[source][var]
                        varsolindices -= solindices
                        if not varsolindices:
                            del sourcesvars[source][var]
        return sources

    def _cleanup_sourcesvars(self, sources, solindices):
        """on final parts, remove solutions so we know they are already processed"""
        for source in sources:
            try:
                sourcevar = self._sourcesvars[source]
            except KeyError:
                continue
            for var, varsolindices in sourcevar.items():
                varsolindices -= solindices
                if not varsolindices:
                    del sourcevar[var]

    def merge_input_maps(self, allsolindices):
        """inputmaps is a dictionary with tuple of solution indices as key with an
        associated input map as value. This function compute for each solution
        its necessary input map and return them grouped

        note: `allsolindices` (a set) is modified in place, solutions having an
        input map being removed from it

        ex:
        inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'},
                     (1,): {'X': 't2.C0', 'T': 't2.C1'}}
        return : [([1],  {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1',
                           'X': 't2.C0', 'T': 't2.C1'}),
                  ([0,2], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'})]
        """
        if not self._inputmaps:
            return [(allsolindices, None)]
        mapbysol = {}
        # compute a single map for each solution
        for solindices, basemap in self._inputmaps.iteritems():
            for solindex in solindices:
                solmap = mapbysol.setdefault(solindex, {})
                solmap.update(basemap)
                try:
                    allsolindices.remove(solindex)
                except KeyError:
                    pass # already removed
        # group results by identical input map
        result = []
        for solindex, solmap in mapbysol.iteritems():
            for solindices, commonmap in result:
                if commonmap == solmap:
                    solindices.append(solindex)
                    break
            else:
                result.append( ([solindex], solmap) )
        if allsolindices:
            result.append( (list(allsolindices), None) )
        return result

    def build_final_part(self, select, solindices, inputmap,  sources,
                         insertedvars):
        """return the step for a final part: a OneFetchStep when no table is
        set on this part, else a FetchStep storing results into the table
        """
        plan = self.plan
        rqlst = plan.finalize(select, [self._solutions[i] for i in solindices],
                              insertedvars)
        if self.temptable is None and self.finaltable is None:
            return OneFetchStep(plan, rqlst, sources, inputmap=inputmap)
        table = self.temptable or self.finaltable
        return FetchStep(plan, rqlst, sources, table, True, inputmap)

    def build_non_final_part(self, select, solindices, sources, insertedvars,
                             table):
        """non final step, will have to store results in a temporary table"""
        plan = self.plan
        rqlst = plan.finalize(select, [self._solutions[i] for i in solindices],
                              insertedvars)
        step = FetchStep(plan, rqlst, sources, table, False)
        # update input map for following steps, according to processed solutions
        inputmapkey = tuple(sorted(solindices))
        inputmap = self._inputmaps.setdefault(inputmapkey, {})
        inputmap.update(step.outputmap)
        plan.add_step(step)
|
728 |
|
729 |
|
730 class MSPlanner(SSPlanner): |
|
731 """MultiSourcesPlanner: build execution plan for rql queries |
|
732 |
|
733 decompose the RQL query according to sources'schema |
|
734 """ |
|
735 |
|
736 def build_select_plan(self, plan, rqlst): |
|
737 """build execution plan for a SELECT RQL query |
|
738 |
|
739 the rqlst should not be tagged at this point |
|
740 """ |
|
741 if server.DEBUG: |
|
742 print '-'*80 |
|
743 print 'PLANNING', rqlst |
|
744 for select in rqlst.children: |
|
745 if len(select.solutions) > 1: |
|
746 hasmultiplesols = True |
|
747 break |
|
748 else: |
|
749 hasmultiplesols = False |
|
750 # preprocess deals with security insertion and returns a new syntax tree |
|
751 # which have to be executed to fulfill the query: according |
|
752 # to permissions for variable's type, different rql queries may have to |
|
753 # be executed |
|
754 plan.preprocess(rqlst) |
|
755 ppis = [PartPlanInformation(plan, select, self.rqlhelper) |
|
756 for select in rqlst.children] |
|
757 steps = self._union_plan(plan, rqlst, ppis) |
|
758 if server.DEBUG: |
|
759 from pprint import pprint |
|
760 for step in plan.steps: |
|
761 pprint(step.test_repr()) |
|
762 pprint(steps[0].test_repr()) |
|
763 return steps |
|
764 |
|
765 def _ppi_subqueries(self, ppi): |
|
766 # part plan info for subqueries |
|
767 plan = ppi.plan |
|
768 inputmap = {} |
|
769 for subquery in ppi.rqlst.with_[:]: |
|
770 sppis = [PartPlanInformation(plan, select) |
|
771 for select in subquery.query.children] |
|
772 for sppi in sppis: |
|
773 if sppi.needsplit or sppi.part_sources != ppi.part_sources: |
|
774 temptable = 'T%s' % make_uid(id(subquery)) |
|
775 sstep = self._union_plan(plan, subquery.query, sppis, temptable)[0] |
|
776 break |
|
777 else: |
|
778 sstep = None |
|
779 if sstep is not None: |
|
780 ppi.rqlst.with_.remove(subquery) |
|
781 for i, colalias in enumerate(subquery.aliases): |
|
782 inputmap[colalias.name] = '%s.C%s' % (temptable, i) |
|
783 ppi.plan.add_step(sstep) |
|
784 return inputmap |
|
785 |
|
786 def _union_plan(self, plan, union, ppis, temptable=None): |
|
787 tosplit, cango, allsources = [], {}, set() |
|
788 for planinfo in ppis: |
|
789 if planinfo.needsplit: |
|
790 tosplit.append(planinfo) |
|
791 else: |
|
792 cango.setdefault(planinfo.part_sources, []).append(planinfo) |
|
793 for source in planinfo.part_sources: |
|
794 allsources.add(source) |
|
795 # first add steps for query parts which doesn't need to splitted |
|
796 steps = [] |
|
797 for sources, cppis in cango.iteritems(): |
|
798 byinputmap = {} |
|
799 for ppi in cppis: |
|
800 select = ppi.rqlst |
|
801 if sources != (plan.session.repo.system_source,): |
|
802 add_types_restriction(self.schema, select) |
|
803 # part plan info for subqueries |
|
804 inputmap = self._ppi_subqueries(ppi) |
|
805 aggrstep = need_aggr_step(select, sources) |
|
806 if aggrstep: |
|
807 atemptable = 'T%s' % make_uid(id(select)) |
|
808 sunion = Union() |
|
809 sunion.append(select) |
|
810 selected = select.selection[:] |
|
811 select_group_sort(select) |
|
812 step = AggrStep(plan, selected, select, atemptable, temptable) |
|
813 step.set_limit_offset(select.limit, select.offset) |
|
814 select.limit = None |
|
815 select.offset = 0 |
|
816 fstep = FetchStep(plan, sunion, sources, atemptable, True, inputmap) |
|
817 step.children.append(fstep) |
|
818 steps.append(step) |
|
819 else: |
|
820 byinputmap.setdefault(tuple(inputmap.iteritems()), []).append( (select) ) |
|
821 for inputmap, queries in byinputmap.iteritems(): |
|
822 inputmap = dict(inputmap) |
|
823 sunion = Union() |
|
824 for select in queries: |
|
825 sunion.append(select) |
|
826 if temptable: |
|
827 steps.append(FetchStep(plan, sunion, sources, temptable, True, inputmap)) |
|
828 else: |
|
829 steps.append(OneFetchStep(plan, sunion, sources, inputmap)) |
|
830 # then add steps for splitted query parts |
|
831 for planinfo in tosplit: |
|
832 steps.append(self.split_part(planinfo, temptable)) |
|
833 if len(steps) > 1: |
|
834 if temptable: |
|
835 step = UnionFetchStep(plan) |
|
836 else: |
|
837 step = UnionStep(plan) |
|
838 step.children = steps |
|
839 return (step,) |
|
840 return steps |
|
841 |
|
842 # internal methods for multisources decomposition ######################### |
|
843 |
|
844 def split_part(self, ppi, temptable): |
|
845 ppi.finaltable = temptable |
|
846 plan = ppi.plan |
|
847 select = ppi.rqlst |
|
848 subinputmap = self._ppi_subqueries(ppi) |
|
849 stepdefs = ppi.part_steps() |
|
850 if need_aggr_step(select, ppi.part_sources, stepdefs): |
|
851 atemptable = 'T%s' % make_uid(id(select)) |
|
852 selection = select.selection[:] |
|
853 select_group_sort(select) |
|
854 else: |
|
855 atemptable = None |
|
856 selection = select.selection |
|
857 ppi.temptable = atemptable |
|
858 vfilter = VariablesFiltererVisitor(self.schema, ppi) |
|
859 steps = [] |
|
860 for sources, variables, solindices, scope, needsel, final in stepdefs: |
|
861 # extract an executable query using only the specified variables |
|
862 if sources[0].uri == 'system': |
|
863 # in this case we have to merge input maps before call to |
|
864 # filter so already processed restriction are correctly |
|
865 # removed |
|
866 solsinputmaps = ppi.merge_input_maps(solindices) |
|
867 for solindices, inputmap in solsinputmaps: |
|
868 minrqlst, insertedvars = vfilter.filter( |
|
869 sources, variables, scope, set(solindices), needsel, final) |
|
870 if inputmap is None: |
|
871 inputmap = subinputmap |
|
872 else: |
|
873 inputmap.update(subinputmap) |
|
874 steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, |
|
875 sources, insertedvars)) |
|
876 else: |
|
877 # this is a final part (i.e. retreiving results for the |
|
878 # original query part) if all variable / sources have been |
|
879 # treated or if this is the last shot for used solutions |
|
880 minrqlst, insertedvars = vfilter.filter( |
|
881 sources, variables, scope, solindices, needsel, final) |
|
882 if final: |
|
883 solsinputmaps = ppi.merge_input_maps(solindices) |
|
884 for solindices, inputmap in solsinputmaps: |
|
885 if inputmap is None: |
|
886 inputmap = subinputmap |
|
887 else: |
|
888 inputmap.update(subinputmap) |
|
889 steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, |
|
890 sources, insertedvars)) |
|
891 else: |
|
892 table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in variables)), |
|
893 ''.join(sorted(str(i) for i in solindices))) |
|
894 ppi.build_non_final_part(minrqlst, solindices, sources, |
|
895 insertedvars, table) |
|
896 # finally: join parts, deal with aggregat/group/sorts if necessary |
|
897 if atemptable is not None: |
|
898 step = AggrStep(plan, selection, select, atemptable, temptable) |
|
899 step.children = steps |
|
900 elif len(steps) > 1: |
|
901 if temptable: |
|
902 step = UnionFetchStep(plan) |
|
903 else: |
|
904 step = UnionStep(plan) |
|
905 step.children = steps |
|
906 else: |
|
907 step = steps[0] |
|
908 if select.limit is not None or select.offset: |
|
909 step.set_limit_offset(select.limit, select.offset) |
|
910 return step |
|
911 |
|
912 |
|
913 class UnsupportedBranch(Exception): |
|
914 pass |
|
915 |
|
916 |
|
917 class VariablesFiltererVisitor(object): |
|
918 def __init__(self, schema, ppi): |
|
919 self.schema = schema |
|
920 self.ppi = ppi |
|
921 self.skip = {} |
|
922 self.hasaggrstep = self.ppi.temptable |
|
923 self.extneedsel = frozenset(vref.name for sortterm in ppi.rqlst.orderby |
|
924 for vref in sortterm.iget_nodes(VariableRef)) |
|
925 |
|
926 def _rqlst_accept(self, rqlst, node, newroot, variables, setfunc=None): |
|
927 try: |
|
928 newrestr, node_ = node.accept(self, newroot, variables[:]) |
|
929 except UnsupportedBranch: |
|
930 return rqlst |
|
931 if setfunc is not None and newrestr is not None: |
|
932 setfunc(newrestr) |
|
933 if not node_ is node: |
|
934 rqlst = node.parent |
|
935 return rqlst |
|
936 |
|
937 def filter(self, sources, variables, rqlst, solindices, needsel, final): |
|
938 if server.DEBUG: |
|
939 print 'filter', final and 'final' or '', sources, variables, rqlst, solindices, needsel |
|
940 newroot = Select() |
|
941 self.sources = sources |
|
942 self.solindices = solindices |
|
943 self.final = final |
|
944 # variables which appear in unsupported branches |
|
945 needsel |= self.extneedsel |
|
946 self.needsel = needsel |
|
947 # variables which appear in supported branches |
|
948 self.mayneedsel = set() |
|
949 # new inserted variables |
|
950 self.insertedvars = [] |
|
951 # other structures (XXX document) |
|
952 self.mayneedvar, self.hasvar = {}, {} |
|
953 self.use_only_defined = False |
|
954 self.scopes = {rqlst: newroot} |
|
955 if rqlst.where: |
|
956 rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, variables, |
|
957 newroot.set_where) |
|
958 if isinstance(rqlst, Select): |
|
959 self.use_only_defined = True |
|
960 if rqlst.groupby: |
|
961 groupby = [] |
|
962 for node in rqlst.groupby: |
|
963 rqlst = self._rqlst_accept(rqlst, node, newroot, variables, |
|
964 groupby.append) |
|
965 if groupby: |
|
966 newroot.set_groupby(groupby) |
|
967 if rqlst.having: |
|
968 having = [] |
|
969 for node in rqlst.having: |
|
970 rqlst = self._rqlst_accept(rqlst, node, newroot, variables, |
|
971 having.append) |
|
972 if having: |
|
973 newroot.set_having(having) |
|
974 if final and rqlst.orderby and not self.hasaggrstep: |
|
975 orderby = [] |
|
976 for node in rqlst.orderby: |
|
977 rqlst = self._rqlst_accept(rqlst, node, newroot, variables, |
|
978 orderby.append) |
|
979 if orderby: |
|
980 newroot.set_orderby(orderby) |
|
981 self.process_selection(newroot, variables, rqlst) |
|
982 elif not newroot.where: |
|
983 # no restrictions have been copied, just select variables and add |
|
984 # type restriction (done later by add_types_restriction) |
|
985 for v in variables: |
|
986 if not isinstance(v, Variable): |
|
987 continue |
|
988 newroot.append_selected(VariableRef(newroot.get_variable(v.name))) |
|
989 solutions = self.ppi.copy_solutions(solindices) |
|
990 cleanup_solutions(newroot, solutions) |
|
991 newroot.set_possible_types(solutions) |
|
992 if final: |
|
993 if self.hasaggrstep: |
|
994 self.add_necessary_selection(newroot, self.mayneedsel & self.extneedsel) |
|
995 newroot.distinct = rqlst.distinct |
|
996 else: |
|
997 self.add_necessary_selection(newroot, self.mayneedsel & self.needsel) |
|
998 # insert vars to fetch constant values when needed |
|
999 for (varname, rschema), reldefs in self.mayneedvar.iteritems(): |
|
1000 for rel, ored in reldefs: |
|
1001 if not (varname, rschema) in self.hasvar: |
|
1002 self.hasvar[(varname, rschema)] = None # just to avoid further insertion |
|
1003 cvar = newroot.make_variable() |
|
1004 for sol in newroot.solutions: |
|
1005 sol[cvar.name] = rschema.objects(sol[varname])[0] |
|
1006 # if the current restriction is not used in a OR branch, |
|
1007 # we can keep it, else we have to drop the constant |
|
1008 # restriction (or we may miss some results) |
|
1009 if not ored: |
|
1010 rel = rel.copy(newroot) |
|
1011 newroot.add_restriction(rel) |
|
1012 # add a relation to link the variable |
|
1013 newroot.remove_node(rel.children[1]) |
|
1014 cmp = Comparison('=') |
|
1015 rel.append(cmp) |
|
1016 cmp.append(VariableRef(cvar)) |
|
1017 self.insertedvars.append((varname, rschema, cvar.name)) |
|
1018 newroot.append_selected(VariableRef(newroot.get_variable(cvar.name))) |
|
1019 # NOTE: even if the restriction is done by this query, we have |
|
1020 # to let it in the original rqlst so that it appears anyway in |
|
1021 # the "final" query, else we may change the meaning of the query |
|
1022 # if there are NOT somewhere : |
|
1023 # 'NOT X relation Y, Y name "toto"' means X WHERE X isn't related |
|
1024 # to Y whose name is toto while |
|
1025 # 'NOT X relation Y' means X WHERE X has no 'relation' (whatever Y) |
|
1026 elif ored: |
|
1027 newroot.remove_node(rel) |
|
1028 add_types_restriction(self.schema, rqlst, newroot, solutions) |
|
1029 if server.DEBUG: |
|
1030 print '--->', newroot |
|
1031 return newroot, self.insertedvars |
|
1032 |
|
1033 def visit_and(self, node, newroot, variables): |
|
1034 subparts = [] |
|
1035 for i in xrange(len(node.children)): |
|
1036 child = node.children[i] |
|
1037 try: |
|
1038 newchild, child_ = child.accept(self, newroot, variables) |
|
1039 if not child_ is child: |
|
1040 node = child_.parent |
|
1041 if newchild is None: |
|
1042 continue |
|
1043 subparts.append(newchild) |
|
1044 except UnsupportedBranch: |
|
1045 continue |
|
1046 if not subparts: |
|
1047 return None, node |
|
1048 if len(subparts) == 1: |
|
1049 return subparts[0], node |
|
1050 return copy_node(newroot, node, subparts), node |
|
1051 |
|
1052 visit_or = visit_and |
|
1053 |
|
1054 def _relation_supported(self, rtype): |
|
1055 for source in self.sources: |
|
1056 if not source.support_relation(rtype): |
|
1057 return False |
|
1058 return True |
|
1059 |
|
1060 def visit_relation(self, node, newroot, variables): |
|
1061 if not node.is_types_restriction(): |
|
1062 if node in self.skip and self.solindices.issubset(self.skip[node]): |
|
1063 if not self.schema.rschema(node.r_type).is_final(): |
|
1064 # can't really skip the relation if one variable is selected and only |
|
1065 # referenced by this relation |
|
1066 for vref in node.iget_nodes(VariableRef): |
|
1067 stinfo = vref.variable.stinfo |
|
1068 if stinfo['selected'] and len(stinfo['relations']) == 1: |
|
1069 break |
|
1070 else: |
|
1071 return None, node |
|
1072 else: |
|
1073 return None, node |
|
1074 if not self._relation_supported(node.r_type): |
|
1075 raise UnsupportedBranch() |
|
1076 # don't copy type restriction unless this is the only relation for the |
|
1077 # rhs variable, else they'll be reinserted later as needed (else we may |
|
1078 # copy a type restriction while the variable is not actually used) |
|
1079 elif not any(self._relation_supported(rel.r_type) |
|
1080 for rel in node.children[0].variable.stinfo['relations']): |
|
1081 rel, node = self.visit_default(node, newroot, variables) |
|
1082 return rel, node |
|
1083 else: |
|
1084 raise UnsupportedBranch() |
|
1085 rschema = self.schema.rschema(node.r_type) |
|
1086 res = self.visit_default(node, newroot, variables)[0] |
|
1087 ored = node.ored() |
|
1088 if rschema.is_final() or rschema.inlined: |
|
1089 vrefs = node.children[1].get_nodes(VariableRef) |
|
1090 if not vrefs: |
|
1091 if not ored: |
|
1092 self.skip.setdefault(node, set()).update(self.solindices) |
|
1093 else: |
|
1094 self.mayneedvar.setdefault((node.children[0].name, rschema), []).append( (res, ored) ) |
|
1095 |
|
1096 else: |
|
1097 assert len(vrefs) == 1 |
|
1098 vref = vrefs[0] |
|
1099 # XXX check operator ? |
|
1100 self.hasvar[(node.children[0].name, rschema)] = vref |
|
1101 if self._may_skip_attr_rel(rschema, node, vref, ored, variables, res): |
|
1102 self.skip.setdefault(node, set()).update(self.solindices) |
|
1103 elif not ored: |
|
1104 self.skip.setdefault(node, set()).update(self.solindices) |
|
1105 return res, node |
|
1106 |
|
1107 def _may_skip_attr_rel(self, rschema, rel, vref, ored, variables, res): |
|
1108 var = vref.variable |
|
1109 if ored: |
|
1110 return False |
|
1111 if var.name in self.extneedsel or var.stinfo['selected']: |
|
1112 return False |
|
1113 if not same_scope(var): |
|
1114 return False |
|
1115 if any(v for v,_ in var.stinfo['attrvars'] if not v.name in variables): |
|
1116 return False |
|
1117 return True |
|
1118 |
|
1119 def visit_exists(self, node, newroot, variables): |
|
1120 newexists = node.__class__() |
|
1121 self.scopes = {node: newexists} |
|
1122 subparts, node = self._visit_children(node, newroot, variables) |
|
1123 if not subparts: |
|
1124 return None, node |
|
1125 newexists.set_where(subparts[0]) |
|
1126 return newexists, node |
|
1127 |
|
1128 def visit_not(self, node, newroot, variables): |
|
1129 subparts, node = self._visit_children(node, newroot, variables) |
|
1130 if not subparts: |
|
1131 return None, node |
|
1132 return copy_node(newroot, node, subparts), node |
|
1133 |
|
1134 def visit_group(self, node, newroot, variables): |
|
1135 if not self.final: |
|
1136 return None, node |
|
1137 return self.visit_default(node, newroot, variables) |
|
1138 |
|
1139 def visit_variableref(self, node, newroot, variables): |
|
1140 if self.use_only_defined: |
|
1141 if not node.variable.name in newroot.defined_vars: |
|
1142 raise UnsupportedBranch(node.name) |
|
1143 elif not node.variable in variables: |
|
1144 raise UnsupportedBranch(node.name) |
|
1145 self.mayneedsel.add(node.name) |
|
1146 # set scope so we can insert types restriction properly |
|
1147 newvar = newroot.get_variable(node.name) |
|
1148 newvar.stinfo['scope'] = self.scopes.get(node.variable.scope, newroot) |
|
1149 return VariableRef(newvar), node |
|
1150 |
|
1151 def visit_constant(self, node, newroot, variables): |
|
1152 return copy_node(newroot, node), node |
|
1153 |
|
1154 def visit_default(self, node, newroot, variables): |
|
1155 subparts, node = self._visit_children(node, newroot, variables) |
|
1156 return copy_node(newroot, node, subparts), node |
|
1157 |
|
1158 visit_comparison = visit_mathexpression = visit_constant = visit_function = visit_default |
|
1159 visit_sort = visit_sortterm = visit_default |
|
1160 |
|
1161 def _visit_children(self, node, newroot, variables): |
|
1162 subparts = [] |
|
1163 for i in xrange(len(node.children)): |
|
1164 child = node.children[i] |
|
1165 newchild, child_ = child.accept(self, newroot, variables) |
|
1166 if not child is child_: |
|
1167 node = child_.parent |
|
1168 if newchild is not None: |
|
1169 subparts.append(newchild) |
|
1170 return subparts, node |
|
1171 |
|
1172 def process_selection(self, newroot, variables, rqlst): |
|
1173 if self.final: |
|
1174 for term in rqlst.selection: |
|
1175 newroot.append_selected(term.copy(newroot)) |
|
1176 for vref in term.get_nodes(VariableRef): |
|
1177 self.needsel.add(vref.name) |
|
1178 return |
|
1179 for term in rqlst.selection: |
|
1180 vrefs = term.get_nodes(VariableRef) |
|
1181 if vrefs: |
|
1182 supportedvars = [] |
|
1183 for vref in vrefs: |
|
1184 var = vref.variable |
|
1185 if var in variables: |
|
1186 supportedvars.append(vref) |
|
1187 continue |
|
1188 else: |
|
1189 self.needsel.add(vref.name) |
|
1190 break |
|
1191 else: |
|
1192 for vref in vrefs: |
|
1193 newroot.append_selected(vref.copy(newroot)) |
|
1194 supportedvars = [] |
|
1195 for vref in supportedvars: |
|
1196 if not vref in newroot.get_selected_variables(): |
|
1197 newroot.append_selected(VariableRef(newroot.get_variable(vref.name))) |
|
1198 |
|
1199 def add_necessary_selection(self, newroot, variables): |
|
1200 selected = tuple(newroot.get_selected_variables()) |
|
1201 for varname in variables: |
|
1202 var = newroot.defined_vars[varname] |
|
1203 for vref in var.references(): |
|
1204 rel = vref.relation() |
|
1205 if rel is None and vref in selected: |
|
1206 # already selected |
|
1207 break |
|
1208 else: |
|
1209 selvref = VariableRef(var) |
|
1210 newroot.append_selected(selvref) |
|
1211 if newroot.groupby: |
|
1212 newroot.add_group_var(VariableRef(selvref.variable, noautoref=1)) |