1 # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """plan execution of rql queries on multiple sources |
|
19 |
|
20 the best way to understand what we are trying to achieve here is to read the |
|
21 unit-tests in unittest_msplanner.py |
|
22 |
|
23 |
|
24 What you need to know |
|
25 ~~~~~~~~~~~~~~~~~~~~~ |
|
26 1. The system source is expected to support every entity and relation type |
|
27 |
|
28 2. Given "X relation Y": |
|
29 |
|
30 * if relation, X and Y types are supported by the external source, we suppose |
|
31 by default that X and Y should both come from the same source as the |
|
32 relation. You can specify otherwise by adding relation into the |
|
33 "cross_relations" set in the source's mapping file and in that case, we'll |
|
34 consider that we can also find in the system source some relation between |
|
35 X and Y coming from different sources. |
|
36 |
|
37 * if "relation" isn't supported by the external source but X or Y |
|
38 types (or both) are, we suppose by default that we can find in the system |
|
39 source some relation where X and/or Y come from the external source. You |
|
40 can specify otherwise by adding relation into the "dont_cross_relations" |
|
41 set in the source's mapping file and in that case, we'll consider that we |
|
42 can only find in the system source some relation between X and Y coming |
|
43 from the system source. |
|
44 |
|
45 |
|
46 Implementation |
|
47 ~~~~~~~~~~~~~~ |
|
48 XXX explain algorithm |
|
49 |
|
50 |
|
51 Examples of multi-sources query execution |
|
52 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
53 For a system source and an ldap user source (only CWUser and its attributes |
|
54 are supported, no group or such): |
|
55 |
|
56 :CWUser X: |
|
57 1. fetch CWUser X from both sources and return concatenation of results |
|
58 |
|
59 :CWUser X WHERE X in_group G, G name 'users': |
|
60 * catch 1 |
|
61 1. fetch CWUser X from both sources, store concatenation of results into a |
|
62 temporary table |
|
63 2. return the result of TMP X WHERE X in_group G, G name 'users' from the |
|
64 system source |
|
65 * catch 2 |
|
66 1. return the result of CWUser X WHERE X in_group G, G name 'users' from system |
|
67 source, that's enough (optimization of the sql querier will avoid join on |
|
68 CWUser, so we will directly get local eids) |
|
69 |
|
70 :CWUser X,L WHERE X in_group G, X login L, G name 'users': |
|
71 1. fetch Any X,L WHERE X is CWUser, X login L from both sources, store |
|
72 concatenation of results into a temporary table |
|
73 2. return the result of Any X, L WHERE X is TMP, X login L, X in_group G, |
|
74 G name 'users' from the system source |
|
75 |
|
76 |
|
77 :Any X WHERE X owned_by Y: |
|
78 * catch 1 |
|
79 1. fetch CWUser Y from both sources, store concatenation of results into a |
|
80 temporary table |
|
81 2. return the result of Any X WHERE X owned_by Y, Y is TMP from the system |
|
82 source |
|
83 * catch 2 |
|
84 1. return the result of Any X WHERE X owned_by Y from system source, that's |
|
85 enough (optimization of the sql querier will avoid join on CWUser, so we |
|
86 will directly get local eids) |
|
87 """ |
|
88 |
|
89 __docformat__ = "restructuredtext en" |
|
90 |
|
91 from itertools import imap, ifilterfalse |
|
92 |
|
93 from logilab.common.compat import any |
|
94 from logilab.common.decorators import cached |
|
95 from logilab.common.deprecation import deprecated |
|
96 |
|
97 from rql import BadRQLQuery |
|
98 from rql.stmts import Union, Select |
|
99 from rql.nodes import (VariableRef, Comparison, Relation, Constant, Variable, |
|
100 Not, Exists, SortTerm, Function) |
|
101 |
|
102 from cubicweb import server |
|
103 from cubicweb.utils import make_uid |
|
104 from cubicweb.rqlrewrite import add_types_restriction, cleanup_solutions |
|
105 from cubicweb.server.ssplanner import SSPlanner, OneFetchStep |
|
106 from cubicweb.server.mssteps import * |
|
107 |
|
# Planner helpers monkey-patched onto rql nodes.
#
# * `_ms_table_key()` returns a string identifying the node; it is used when
#   generating temporary table names.
# * `_ms_may_be_processed(terms, linkedterms)` tells whether the node may be
#   handled given the `terms` at hand: for a variable, one of `terms` must be
#   linked to it in `linkedterms`; for a relation, every variable/constant
#   part of it must be in `terms`.
Variable._ms_table_key = lambda self: self.name
Variable._ms_may_be_processed = lambda self, terms, linkedterms: any(
    term for term in terms if term in linkedterms.get(self, ()))

Relation._ms_table_key = lambda self: self.r_type
Relation._ms_may_be_processed = lambda self, terms, linkedterms: all(
    getattr(part, 'variable', part) in terms
    for part in self.get_variable_parts())

# str() Constant.value to ensure generated table name won't be unicode
Constant._ms_table_key = lambda self: str(self.value)
|
117 |
|
def ms_scope(term):
    """return the scope the planner should consider for `term`

    The attached relation is:

    * the only relation of a variable;
    * the relation of a constant;
    * the term itself for a relation.

    This is `term.scope` unless that relation is not `identity`, lives in the
    same scope as the term, and has as parent an EXISTS node for which
    `neged(strict=True)` is true. In that case the enclosing scope
    (`scope.parent.scope`) is returned.
    """
    scope = term.scope
    if isinstance(term, Variable):
        if len(term.stinfo['relations']) != 1:
            return scope
        (single,) = term.stinfo['relations']
        rel = single.relation()
    elif isinstance(term, Constant):
        rel = term.relation()
    elif isinstance(term, Relation):
        rel = term
    else:
        return scope
    if rel is None or rel.r_type == 'identity' or rel.scope is not scope:
        return scope
    parent = rel.parent
    if isinstance(parent, Exists) and parent.neged(strict=True):
        return scope.parent.scope
    return scope
|
132 |
|
def need_intersect(select, getrschema):
    """return True if `select` will require an intersection

    That is the case when a non-final relation meets all of the following:

    * it is not a type restriction;
    * its parent is an EXISTS node for which `neged(strict=True)` is true;
    * it has a variable with a single valuable reference and several possible
      types, i.e. ambiguous but not invariant.

    :param select: the rql Select node to inspect
    :param getrschema: callable returning the relation schema for a type
    """
    for rel in select.iget_nodes(Relation):
        parent = rel.parent
        if not (isinstance(parent, Exists) and parent.neged(strict=True)):
            continue
        if rel.is_types_restriction():
            continue
        if getrschema(rel.r_type).final:
            continue
        for vref in rel.get_nodes(VariableRef):
            var = vref.variable
            if (var.valuable_references() == 1
                and len(var.stinfo['possibletypes']) > 1):
                return True
    return False
|
146 |
|
def neged_relation(rel):
    """return True if `rel` is negated, either directly (its parent is a Not
    node) or through a NOT EXISTS construct (its parent is an Exists node
    whose own parent is a Not node)
    """
    parent = rel.parent
    if isinstance(parent, Not):
        return True
    return isinstance(parent, Exists) and isinstance(parent.parent, Not)
|
151 |
|
def need_source_access_relation(vargraph):
    """return True if `vargraph` contains some relation other than `identity`

    Keys of `vargraph` are either variable names, which are ignored here, or
    2-tuples (var1, var2) mapped to the relation traversed to go from var1 to
    var2. Only tuple keys whose relation is not 'identity' count.
    """
    if not vargraph:
        return False
    for key, rtype in vargraph.items():
        if isinstance(key, tuple) and rtype != 'identity' and key:
            return True
    return False
|
161 |
|
def need_aggr_step(select, sources, stepdefs=None):
    """return True if a temporary table is necessary to store some partial
    results to execute the given query

    A single source handles everything at once. Otherwise an aggregation step
    is only needed when the query sorts, groups or aggregates, and either:

    * it won't be split (`stepdefs` is None); or
    * it is split and there is more than one final step, or the final step
      doesn't cover exactly the solutions of the preceding non-final steps.

    :param select: the rql Select node
    :param sources: sources involved in the query
    :param stepdefs: optional step definitions; index 2 holds solution
      indices and the last item tells whether the step is a final one
    """
    if len(sources) == 1:
        return False
    if not (select.orderby or select.groupby or select.has_aggregat):
        return False
    if stepdefs is None:
        return True
    seen_final = False
    nonfinal_solindices = set()
    for stepdef in stepdefs:
        if not stepdef[-1]:
            nonfinal_solindices.update(stepdef[2])
            continue
        if seen_final or frozenset(stepdef[2]) != nonfinal_solindices:
            return True
        seen_final = True
    return False
|
189 |
|
def select_group_sort(select): # XXX something similar done in rql2sql
    """add to the selection of `select` the variables used in its GROUPBY
    and ORDERBY terms which aren't selected yet

    Variables added because of a sort term are also added to the GROUPBY
    clause when the query is grouped and they aren't there yet.
    """
    for vref in select.groupby or ():
        if vref not in select.selection:
            select.append_selected(vref.copy(select))
    for sortterm in select.orderby:
        for vref in sortterm.iget_nodes(VariableRef):
            if vref in select.get_selected_variables():
                continue
            # sortterm.term can't be inserted directly since it references a
            # variable of the select before the copy.
            # XXX if constant term are used to define sort, their value
            # may necessite a decay
            select.append_selected(vref.copy(select))
            if select.groupby and vref not in select.groupby:
                select.add_group_var(vref.copy(select))
|
207 |
|
def allequals(solutions):
    """return True if all items of `solutions` are equal (``==``) to each other

    :param solutions: any iterable. The former implementation required an
      iterator providing a ``.next()`` method; plain iterables are accepted
      too.
    :return: True when every item equals the first one. An empty iterable is
      considered "all equal" (previously StopIteration was raised).
    """
    solutions = iter(solutions)
    for first in solutions:
        break
    else:
        # empty input: vacuously all equal
        return True
    for other in solutions:
        if other != first:
            return False
    return True
|
216 |
|
217 # XXX move functions below to rql ############################################## |
|
218 |
|
def is_ancestor(n1, n2):
    """return True if `n2` is a strict ancestor of `n1`, i.e. if it is met
    while walking up the `.parent` chain starting from `n1.parent`
    """
    node = n1.parent
    while node is not None and node is not n2:
        node = node.parent
    return node is not None
|
227 |
|
def copy_node(newroot, node, subparts=()):
    """return a new instance of `node`'s class built from
    `node.initargs(newroot)`, with each item of `subparts` appended to it
    """
    nodeclass = node.__class__
    initargs = node.initargs(newroot)
    result = nodeclass(*initargs)
    for child in subparts:
        result.append(child)
    return result
|
233 |
|
def used_in_outer_scope(var, scope):
    """return True if one of `var`'s relations lives (according to
    `ms_scope`) in a scope that is distinct from and an ancestor of `scope`
    """
    relscopes = (ms_scope(rel) for rel in var.stinfo['relations'])
    return any(rscope is not scope and is_ancestor(scope, rscope)
               for rscope in relscopes)
|
242 |
|
243 ################################################################################ |
|
244 |
|
245 class PartPlanInformation(object): |
|
246 """regroups necessary information to execute some part of a "global" rql |
|
247 query ("global" means as received by the querier, which may result in |
|
248 several internal queries, i.e. parts, due to security insertions). In practice |
|
249 a PPI is created for each subquery and for each query in a union. |
|
250 |
|
251 It exposes as well some methods helping in executing this part on a |
|
252 multi-sources repository, modifying its internal structure during the |
|
253 process. |
|
254 |
|
255 :attr plan: |
|
256 the execution plan |
|
257 :attr rqlst: |
|
258 the original rql syntax tree handled by this part |
|
259 |
|
260 :attr needsplit: |
|
261 bool telling if the query has to be split into multiple steps for |
|
262 execution or if it can be executed at once |
|
263 |
|
264 :attr temptable: |
|
265 a SQL temporary table name or None, if necessary to handle aggregate / |
|
266 sorting for this part of the query |
|
267 |
|
268 :attr finaltable: |
|
269 a SQL table name or None, if results for this part of the query should be |
|
270 written into a temporary table (usually shared by multiple PPI) |
|
271 |
|
272 :attr sourcesterms: |
|
273 a dictionary {source : {term: set([solution index, ])}} telling for each |
|
274 source which terms are supported for which solutions. A "term" may be |
|
275 either a rql Variable, Constant or Relation node. |
|
276 """ |
|
277 def __init__(self, plan, rqlst, rqlhelper=None): |
|
278 self.plan = plan |
|
279 self.rqlst = rqlst |
|
280 self.needsplit = False |
|
281 self.temptable = None |
|
282 self.finaltable = None |
|
283 # shortcuts |
|
284 self._schema = plan.schema |
|
285 self._session = plan.session |
|
286 self._repo = self._session.repo |
|
287 self._solutions = rqlst.solutions |
|
288 self._solindices = range(len(self._solutions)) |
|
289 self.system_source = self._repo.system_source |
|
290 # source : {term: [solution index, ]} |
|
291 self.sourcesterms = self._sourcesterms = {} |
|
292 # source : {relation: set(child variable and constant)} |
|
293 self._crossrelations = {} |
|
294 # term : set(sources) |
|
295 self._discarded_sources = {} |
|
296 # dictionary of variables and constants which are linked to each other |
|
297 # using a non final relation supported by multiple sources (crossed or |
|
298 # not). |
|
299 self._linkedterms = {} |
|
300 # processing |
|
301 termssources = self._compute_sourcesterms() |
|
302 self._remove_invalid_sources(termssources) |
|
303 self._compute_needsplit() |
|
304 # after initialisation, .sourcesterms contains the same thing as |
|
305 # ._sourcesterms though during plan construction, ._sourcesterms will |
|
306 # be modified while .sourcesterms will be kept unmodified |
|
307 self.sourcesterms = {} |
|
308 for k, v in self._sourcesterms.iteritems(): |
|
309 self.sourcesterms[k] = {} |
|
310 for k2, v2 in v.iteritems(): |
|
311 self.sourcesterms[k][k2] = v2.copy() |
|
312 # cleanup linked var |
|
313 for var, linkedrelsinfo in self._linkedterms.iteritems(): |
|
314 self._linkedterms[var] = frozenset(x[0] for x in linkedrelsinfo) |
|
315 # map output of a step to input of a following step |
|
316 self._inputmaps = {} |
|
317 # record input map conflicts to resolve them on final step generation |
|
318 self._conflicts = [] |
|
319 if rqlhelper is not None: # else test |
|
320 self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional |
|
321 if server.DEBUG & server.DBG_MS: |
|
322 print 'sourcesterms:' |
|
323 self._debug_sourcesterms() |
|
324 |
|
325 def _debug_sourcesterms(self): |
|
326 for source in self._sourcesterms: |
|
327 print '-', source |
|
328 for term, sols in self._sourcesterms[source].items(): |
|
329 print ' -', term, id(term), ':', sols |
|
330 |
|
def copy_solutions(self, solindices):
    """return a list of shallow copies of the solutions at `solindices`, in
    the given order"""
    solutions = self._solutions
    copies = []
    for index in solindices:
        copies.append(solutions[index].copy())
    return copies
|
333 |
|
@property
@cached
def part_sources(self):
    """sorted tuple of the sources involved in this part; only the system
    source when ._sourcesterms is empty"""
    sourcesterms = self._sourcesterms
    if not sourcesterms:
        return (self.system_source,)
    return tuple(sorted(sourcesterms))
|
340 |
|
@property
@cached
def _sys_source_set(self):
    """frozenset of (system source, solution index) pairs, one per solution"""
    system = self.system_source
    return frozenset([(system, index) for index in self._solindices])
|
346 |
|
@cached
def _norel_support_set(self, relation):
    """return a frozenset of (source, solution index) pairs for every source
    which neither supports `relation`'s type nor lists it in its
    `dont_cross_relations`
    """
    rtype = relation.r_type
    solindices = self._solindices
    return frozenset((source, solidx)
                     for source in self._repo.sources
                     for solidx in solindices
                     if not source.support_relation(rtype)
                     and rtype not in source.dont_cross_relations)
|
356 |
|
def _compute_sourcesterms(self):
    """compute for each term (variable, rewritten constant, relation) and
    for each solution in the rqlst which sources support them

    Fills `self._sourcesterms` ({source: {term: set(solution indices)}}),
    `self._const_vars`, `self._linkedterms` and (through
    `_handle_cross_relation`) `self._crossrelations`. May set
    `self.needsplit`. Information from `cw_source` relations is then used to
    discard sources, and a `cw_source` relation may be removed from the tree.

    :return: the `termssources` dictionary built for the left/right hand
      side terms of non-final relations supported by several sources,
      mapping each term to `self._term_sources(term)` (presumably a set of
      (source, solution index) pairs, as consumed by
      `_remove_term_sources` -- TODO confirm)
    :raise BadRQLQuery: on source conflicts detected from `cw_source`
      relations
    """
    repo = self._repo
    eschema = self._schema.eschema
    sourcesterms = self._sourcesterms
    # find for each source which variable/solution are supported
    for varname, varobj in self.rqlst.defined_vars.items():
        # if variable has an eid specified, we can get its source directly
        # NOTE: use uidrel and not constnode to deal with "X eid IN(1,2,3,4)"
        if varobj.stinfo['uidrel'] is not None:
            rel = varobj.stinfo['uidrel']
            hasrel = len(varobj.stinfo['relations']) > 1
            for const in rel.children[1].get_nodes(Constant):
                eid = const.eval(self.plan.args)
                source = self._session.source_from_eid(eid)
                # use the system source when the entity comes from it, or when
                # the variable is invariant and the entity's source supports
                # none of its other relations
                if (source is self.system_source
                    or (hasrel and varobj._q_invariant and
                        not any(source.support_relation(r.r_type)
                                for r in varobj.stinfo['relations']
                                if not r is rel))):
                    self._set_source_for_term(self.system_source, varobj)
                else:
                    self._set_source_for_term(source, varobj)
            continue
        rels = varobj.stinfo['relations']
        if not rels and varobj.stinfo['typerel'] is None:
            # (rare) case where the variable has no type specified nor
            # relation accessed ex. "Any MAX(X)"
            self._set_source_for_term(self.system_source, varobj)
            continue
        for i, sol in enumerate(self._solutions):
            vartype = sol[varname]
            # skip final variable (break rather than continue: presumably a
            # final variable is final in every solution -- TODO confirm)
            if eschema(vartype).final:
                break
            for source in repo.sources:
                if source.support_entity(vartype):
                    # the source support the entity type, though we will
                    # actually have to fetch from it only if
                    # * the variable isn't invariant
                    # * at least one supported relation specified
                    if not varobj._q_invariant or \
                           any(imap(source.support_relation,
                                    (r.r_type for r in rels if r.r_type not in ('identity', 'eid')))):
                        sourcesterms.setdefault(source, {}).setdefault(varobj, set()).add(i)
                    # if variable is not invariant and is used by a relation
                    # not supported by this source, we'll have to split the
                    # query
                    if not varobj._q_invariant and any(ifilterfalse(
                        source.support_relation, (r.r_type for r in rels))):
                        self.needsplit = True
    # add source for rewritten constants to sourcesterms
    self._const_vars = {}
    for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
        # remember those consts come from the same variable
        for const in vconsts:
            self._const_vars[const] = vconsts
        # NOTE: `const` is the last constant of the loop above; presumably all
        # constants rewritten from a same variable share the same eid/source
        # -- TODO confirm
        source = self._session.source_from_eid(const.eval(self.plan.args))
        if source is self.system_source:
            for const in vconsts:
                self._set_source_for_term(source, const)
        elif not self._sourcesterms:
            for const in vconsts:
                self._set_source_for_term(source, const)
        elif source in self._sourcesterms:
            # only attribute the constant to its source if that source already
            # has some term in the constant's scope
            source_scopes = frozenset(ms_scope(t) for t in self._sourcesterms[source])
            for const in vconsts:
                if ms_scope(const) in source_scopes:
                    self._set_source_for_term(source, const)
                    # if system source is used, add every rewritten constant
                    # to its supported terms even when associated entity
                    # doesn't actually come from it so we get a chance that
                    # allequals will return True as expected when computing
                    # needsplit
                    # check const is used in a relation restriction
                    if const.relation() and self.system_source in sourcesterms:
                        self._set_source_for_term(self.system_source, const)
    # add source for relations
    rschema = self._schema.rschema
    termssources = {}
    sourcerels = []
    for rel in self.rqlst.iget_nodes(Relation):
        # process non final relations only
        # note: don't try to get schema for 'is' relation (not available
        # during bootstrap)
        if rel.r_type == 'cw_source':
            # processed after this loop, once sources of terms are known
            sourcerels.append(rel)
        if not (rel.is_types_restriction() or rschema(rel.r_type).final):
            # nothing to do if relation is not supported by multiple sources
            # or if some source has it listed in its cross_relations
            # attribute
            #
            # XXX code below don't deal if some source allow relation
            #     crossing but not another one
            relsources = [s for s in repo.rel_type_sources(rel.r_type)
                          if s is self.system_source
                          or s in self._sourcesterms]
            if len(relsources) < 2:
                # filter out sources being there because they have this
                # relation in their dont_cross_relations attribute
                relsources = [source for source in relsources
                              if source.support_relation(rel.r_type)]
                if relsources:
                    # this means the relation is using a variable inlined as
                    # a constant and another unsupported variable, in which
                    # case we put the relation in sourcesterms
                    self._sourcesterms.setdefault(relsources[0], {})[rel] = set(self._solindices)
                continue
            lhs, rhs = rel.get_variable_parts()
            lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs)
            # update dictionary of sources supporting lhs and rhs vars
            if not lhsv in termssources:
                termssources[lhsv] = self._term_sources(lhs)
            if not rhsv in termssources:
                termssources[rhsv] = self._term_sources(rhs)
            self._handle_cross_relation(rel, relsources, termssources)
            # record the link both ways; pairs are reduced to linked terms
            # only at the end of __init__
            self._linkedterms.setdefault(lhsv, set()).add((rhsv, rel))
            self._linkedterms.setdefault(rhsv, set()).add((lhsv, rel))
    # extract information from cw_source relation
    for srel in sourcerels:
        vref = srel.children[1].children[0]
        # each of these becomes either a (relation, values) pair or is reset
        # to None when several restrictions of the same kind are found
        sourceeids, sourcenames = [], []
        if isinstance(vref, Constant):
            # simplified variable: (no relation, (source eid,))
            sourceeids = None, (vref.eval(self.plan.args),)
            var = vref
        else:
            var = vref.variable
            for rel in var.stinfo['relations'] - var.stinfo['rhsrelations']:
                # skip neged eid relation since it's the kind of query
                # generated when clearing old value of '?1" relation,
                # cw_source included. See
                # unittest_ldapuser.test_copy_to_system_source
                if rel.r_type == 'name' or \
                   (rel.r_type == 'eid' and not rel.neged(strict=True)):
                    if rel.r_type == 'eid':
                        slist = sourceeids
                    else:
                        slist = sourcenames
                    sources = [cst.eval(self.plan.args)
                               for cst in rel.children[1].get_nodes(Constant)]
                    if sources:
                        if slist:
                            # don't attempt to do anything
                            sourcenames = sourceeids = None
                            break
                        slist[:] = (rel, sources)
        if sourceeids:
            rel, values = sourceeids
            sourcesdict = self._repo.sources_by_eid
        elif sourcenames:
            rel, values = sourcenames
            sourcesdict = self._repo.sources_by_uri
        else:
            sourcesdict = None
        if sourcesdict is not None:
            lhs = srel.children[0]
            try:
                sources = [sourcesdict[key] for key in values]
            except KeyError:
                raise BadRQLQuery('source conflict for term %s' % lhs.as_string())
            if isinstance(lhs, Constant):
                source = self._session.source_from_eid(lhs.eval(self.plan.args))
                if not source in sources:
                    raise BadRQLQuery('source conflict for term %s' % lhs.as_string())
            else:
                lhs = getattr(lhs, 'variable', lhs)
            invariant = getattr(lhs, '_q_invariant', False)
            # XXX NOT NOT
            neged = srel.neged(traverse_scope=True) or (rel and rel.neged(strict=True))
            # copy based sources are replaced by the system source (once)
            has_copy_based_source = False
            sources_ = []
            for source in sources:
                if source.copy_based_source:
                    has_copy_based_source = True
                    if not self.system_source in sources_:
                        sources_.append(self.system_source)
                else:
                    sources_.append(source)
            sources = sources_
            if neged:
                # negated restriction: discard the listed sources
                for source in sources:
                    if invariant and source is self.system_source:
                        continue
                    self._remove_source_term(source, lhs)
                    self._discarded_sources.setdefault(lhs, set()).add(source)
                usesys = self.system_source not in sources
            else:
                # positive restriction: discard every other source
                for source, terms in sourcesterms.items():
                    if lhs in terms and not source in sources:
                        if invariant and source is self.system_source:
                            continue
                        self._remove_source_term(source, lhs)
                        self._discarded_sources.setdefault(lhs, set()).add(source)
                usesys = self.system_source in sources
            if rel is None or (len(var.stinfo['relations']) == 2 and
                               not var.stinfo['selected']):
                self._remove_source_term(self.system_source, var)
                # the cw_source restriction is fully handled by source
                # selection: drop it from the tree
                if not (has_copy_based_source or len(sources) > 1
                        or usesys or invariant):
                    if rel is None:
                        srel.parent.remove(srel)
                    else:
                        self.rqlst.undefine_variable(var)
                    self._remove_source_term(self.system_source, srel)
    return termssources
|
565 |
|
def _handle_cross_relation(self, rel, relsources, termssources):
    """record information about `rel` for each source of `relsources` listing
    it in its `cross_relations`

    For such a source, the relation's variables are recorded in
    `self._crossrelations[source][rel]`, together with its constants (all of
    them for the system source, only those supported by the source
    otherwise). When at least two terms are recorded, the relation is
    attributed to the system source for every solution. Then:

    * if some term has a single (source, solution) pair whose source is the
      system one, every other constant or invariant variable of the relation
      loses its non-system (source, solution) pairs;
    * otherwise the relation is attributed to `source` for the solutions
      shared by all terms in that source.

    :param rel: the relation node
    :param relsources: sources supporting the relation
    :param termssources: {term: set((source, solution index))}
    """
    ssource = self.system_source
    for source in relsources:
        if not rel.r_type in source.cross_relations:
            continue
        crossvars = set(x.variable for x in rel.get_nodes(VariableRef))
        for const in rel.get_nodes(Constant):
            if source.uri != 'system' and not const in self._sourcesterms.get(source, ()):
                continue
            crossvars.add(const)
        self._crossrelations.setdefault(source, {})[rel] = crossvars
        if len(crossvars) < 2:
            # this means there is a constant in the relation which is
            # not supported by the source, so we can stop here
            continue
        self._sourcesterms.setdefault(ssource, {})[rel] = set(self._solindices)
        solindices = None
        for term in crossvars:
            if len(termssources[term]) == 1 and iter(termssources[term]).next()[0].uri == 'system':
                # term may only come from the system source: other constant
                # or invariant terms of the relation should come from there
                # too. termssources values hold (source, solindex) pairs, so
                # the system pairs to keep must be subtracted as pairs
                # (subtracting a set of bare sources was a no-op which
                # removed every pair, system ones included)
                for ov in crossvars:
                    if ov is not term and (isinstance(ov, Constant) or ov._q_invariant):
                        self._remove_sources(ov, termssources[ov] - self._sys_source_set)
                break
            # solutions in which every term is supported by `source`
            if solindices is None:
                solindices = set(sol for s, sol in termssources[term]
                                 if s is source)
            else:
                solindices &= set(sol for s, sol in termssources[term]
                                  if s is source)
        else:
            # no term restricted to the system source
            self._sourcesterms.setdefault(source, {})[rel] = solindices
|
597 |
|
def _remove_invalid_sources(self, termssources):
    """removes invalid sources from `sourcesterms` member according to
    traversed relations and their properties (which sources support them,
    can they cross sources, etc...)

    For write queries involving several sources, the main relations (those
    on the left of the WHERE clause) are also used to restrict sources.
    """
    for term in self._linkedterms:
        self._remove_sources_until_stable(term, termssources)
    plan = self.plan
    if len(self._sourcesterms) <= 1 or not hasattr(plan.rqlst, 'main_relations'):
        return
    # the querier doesn't annotate write queries, need to do it here
    plan.annotate_rqlst()
    # insert/update/delete queries: extra information may be found in the
    # main relations (i.e. relations to the left of the WHERE clause)
    if plan.rqlst.TYPE == 'insert':
        inserted = dict((vref.variable, etype)
                        for etype, vref in plan.rqlst.main_variables)
    else:
        inserted = {}
    rschema = self._schema.rschema
    for rel in plan.rqlst.main_relations:
        if rschema(rel.r_type).final:
            continue
        # nothing to do if relation is not supported by multiple sources
        if len(self._repo.rel_type_sources(rel.r_type)) < 2:
            continue
        lhs, rhs = rel.get_variable_parts()
        try:
            lhsv = self._extern_term(lhs, termssources, inserted)
            rhsv = self._extern_term(rhs, termssources, inserted)
        except KeyError:
            continue
        self._remove_term_sources(lhsv, rel, rhsv, termssources)
        self._remove_term_sources(rhsv, rel, lhsv, termssources)
|
630 |
|
def _extern_term(self, term, termssources, inserted):
    """return the term to use for `term`, a variable reference found in the
    main relations of a write query, recording its sources in `termssources`

    * a variable rewritten as a constant gives its constant node;
    * an inserted variable gives itself, attributed to the source of its
      entity type for every solution;
    * otherwise the variable defined in the rqlst with the same name is
      used (KeyError is raised when there is none).
    """
    var = term.variable
    constnode = var.stinfo['constnode']
    if constnode:
        termssources[constnode] = self._term_sources(constnode)
        return constnode
    if var in inserted:
        source = self._repo.locate_etype_source(inserted[var])
        termssources[var] = set((source, solindex)
                                for solindex in self._solindices)
        return var
    defined = self.rqlst.defined_vars[var.name]
    if not defined in termssources:
        termssources[defined] = self._term_sources(defined)
    return defined
|
646 |
|
def _remove_sources_until_stable(self, term, termssources):
    """propagate source restrictions between `term` and each term linked to
    it (see `self._linkedterms`), in both directions

    `_remove_term_sources` calls this method back whenever it actually
    removes sources from a term, so restrictions keep propagating until
    nothing more can be removed.

    :param term: the term whose links are processed
    :param termssources: {term: set((source, solution index))}
    """
    sourcesterms = self._sourcesterms
    for oterm, rel in self._linkedterms.get(term, ()):
        tscope = ms_scope(term)
        otscope = ms_scope(oterm)
        rscope = ms_scope(rel)
        if not tscope is otscope and rscope.neged(strict=True):
            # can't get information from relation inside a NOT exists
            # where terms don't belong to the same scope
            continue
        need_ancestor_scope = False
        if not (tscope is rscope and otscope is rscope):
            if rel.ored():
                continue
            if rel.ored(traverse_scope=True):
                # if relation has some OR as parent, constraints should only
                # propagate from parent scope to child scope, nothing else
                need_ancestor_scope = True
        relsources = self._repo.rel_type_sources(rel.r_type)
        if neged_relation(rel) and (
            len(relsources) < 2
            or not isinstance(oterm, Variable)
            or oterm.valuable_references() != 1
            or any(sourcesterms[source][term] != sourcesterms[source][oterm]
                   for source in relsources
                   if term in sourcesterms.get(source, ())
                   and oterm in sourcesterms.get(source, ()))):
            # neged relation doesn't allow to infer term sources unless
            # we're on a multisource relation for a term only used by this
            # relation (eg "Any X WHERE NOT X multisource_rel Y" and other is
            # Y)
            continue
        # compute invalid sources for terms and remove them (only from the
        # ancestor scope down when the relation is below some OR)
        if not need_ancestor_scope or is_ancestor(tscope, otscope):
            self._remove_term_sources(term, rel, oterm, termssources)
        if not need_ancestor_scope or is_ancestor(otscope, tscope):
            self._remove_term_sources(oterm, rel, term, termssources)
|
684 |
|
def _remove_term_sources(self, term, rel, oterm, termssources):
    """remove invalid sources for term according to oterm's sources and the
    relation between those two terms.

    A (source, solution index) pair of `term` is invalid when it is found
    neither in `oterm`'s pairs nor in `self._norel_support_set(rel)`. When
    the relation may be crossed, system source pairs are kept. Additionally,
    for a variable for which `self._need_ext_source_access(term, rel)` is
    true, pairs of sources where `rel` is a cross relation are kept too.

    Removal is propagated to:

    * the other constants rewritten from the same variable;
    * linked terms, through `_remove_sources_until_stable`;
    * `oterm` when it's a constant.

    :param term: the term whose sources are filtered
    :param rel: the relation linking `term` and `oterm`
    :param oterm: the other term of the relation
    :param termssources: {term: set((source, solution index))};
      `termssources[term]` is updated in place
    :raise BadRQLQuery: if `term` has previously discarded sources (see
      `self._discarded_sources`) and none of its remaining pairs comes from
      a source which wasn't discarded
    """
    norelsup = self._norel_support_set(rel)
    termsources = termssources[term]
    invalid_sources = termsources - (termssources[oterm] | norelsup)
    if invalid_sources and self._repo.can_cross_relation(rel.r_type):
        # crossable relation: keep the system source pairs
        invalid_sources -= self._sys_source_set
        if invalid_sources and isinstance(term, Variable) \
               and self._need_ext_source_access(term, rel):
            # if the term is a not invariant variable, we should filter out
            # source where the relation is a cross relation from invalid
            # sources
            invalid_sources = frozenset((s, solidx) for s, solidx in invalid_sources
                                        if not (s in self._crossrelations and
                                                rel in self._crossrelations[s]))
    if invalid_sources:
        self._remove_sources(term, invalid_sources)
        discarded = self._discarded_sources.get(term)
        if discarded is not None and not any(x[0] for x in (termsources-invalid_sources)
                                             if not x[0] in discarded):
            raise BadRQLQuery('relation %s cant be crossed but %s and %s should '
                              'come from difference sources' %
                              (rel.r_type, term.as_string(), oterm.as_string()))
        # if term is a rewritten const, we can apply the same changes to
        # all other consts inserted from the same original variable
        for const in self._const_vars.get(term, ()):
            if const is not term:
                self._remove_sources(const, invalid_sources)
        # termsources is the very set stored in termssources[term]
        termsources -= invalid_sources
        # propagate to terms linked to this one
        self._remove_sources_until_stable(term, termssources)
        if isinstance(oterm, Constant):
            self._remove_sources(oterm, invalid_sources)
|
719 |
|
def _compute_needsplit(self):
    """tell according to sourcesterms if the rqlst has to be splitted for
    execution among multiple sources

    the execution has to be split if
    * a source support an entity (non invariant) but doesn't support a
      relation on it
    * a source support an entity which is accessed by an optional relation
    * there is more than one source and either all sources'supported
      variable/solutions are not equivalent or multiple variables have to
      be fetched from some source

    Sets ``self.needsplit`` and may rewrite ``self._sourcesterms``: it may
    collapse it to ``{system_source: {}}``, or drop sources that only
    provide constants. Returns None.
    """
    # NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2
    if len(self._sourcesterms) < 2:
        self.needsplit = False
        # if this is not the system source but we have only constant terms
        # and no relation (other than eid), apply query on the system source
        #
        # testing for rqlst with nothing in vargraph nor defined_vars is the
        # simplest way to check the condition explained above
        if not self.system_source in self._sourcesterms and \
               not self.rqlst.defined_vars and \
               not need_source_access_relation(self.rqlst.vargraph):
            self._sourcesterms = {self.system_source: {}}
    elif not self.needsplit:
        # several candidate sources and no split decided yet.
        # NOTE(review): allequals presumably tells whether every source maps
        # to equal terms/solutions -- TODO confirm
        if not allequals(self._sourcesterms.itervalues()):
            # sources disagree: split as soon as a non-system source has to
            # provide something other than a constant
            for source, terms in self._sourcesterms.iteritems():
                if source is self.system_source:
                    continue
                if any(x for x in terms if not isinstance(x, Constant)):
                    self.needsplit = True
                    return
            # external sources only provide constants: run on system source
            self._sourcesterms = {self.system_source: {}}
            self.needsplit = False
        else:
            # all sources are equivalent, inspecting one of them is enough
            sample = self._sourcesterms.itervalues().next()
            if len(sample) > 1:
                for term in sample:
                    # need split if unlinked variable
                    if isinstance(term, Variable) and not term in self._linkedterms:
                        self.needsplit = True
                        break
                else:
                    # need split if there are some cross relation on non
                    # invariant variable or if the variable is used in
                    # multi-sources relation
                    if self._crossrelations:
                        for reldict in self._crossrelations.itervalues():
                            for rel, terms in reldict.iteritems():
                                for term in terms:
                                    if isinstance(term, Variable) \
                                           and self._need_ext_source_access(term, rel):
                                        self.needsplit = True
                                        return
    else:
        # split already requested: remove sources only accessing to constant
        # nodes (items() returns a list on Python 2, so deleting from the
        # dict inside this loop is safe there)
        for source, terms in self._sourcesterms.items():
            if source is self.system_source:
                continue
            if not any(x for x in terms if not isinstance(x, Constant)):
                del self._sourcesterms[source]
        # a single remaining source does not need any split
        if len(self._sourcesterms) < 2:
            self.needsplit = False
|
783 |
|
@cached
def _need_ext_source_access(self, var, rel):
    """Tell whether `var` must be fetched from an external source.

    This is the case when the variable is not invariant. It is also the case
    when some linked relation other than `rel` (taken from
    ``self._linkedterms[var]``) is reported as a multi-sources relation by the
    repository. The result is memoized by the ``cached`` decorator.
    """
    invariant = var._q_invariant
    if invariant:
        linked = self._linkedterms[var]
        return any(other for _unused, other in linked
                   if other is not rel
                   and self._repo.is_multi_sources_relation(other.r_type))
    return True
|
792 |
|
def _set_source_for_term(self, source, term):
    """Record that `source` may provide `term` for every known solution.

    A fresh set built from ``self._solindices`` replaces any solution set
    previously recorded for this (source, term) pair.
    """
    solutions = set(self._solindices)
    termsmap = self._sourcesterms.setdefault(source, {})
    termsmap[term] = solutions
|
795 |
|
def _term_sources(self, term):
    """returns possible sources for terms `term`

    The result is a set of ``(source, solution index)`` pairs.
    - A constant is attributed to the source owning its evaluated eid, once
      for every index of ``self._solindices``.
    - Any other term (or the variable a reference points to) gets the pairs
      recorded for it in ``self.sourcesterms``.
    """
    if isinstance(term, Constant):
        eid = term.eval(self.plan.args)
        owner = self._session.source_from_eid(eid)
        return set((owner, idx) for idx in self._solindices)
    var = getattr(term, 'variable', term)
    providers = [src for src, varobjs in self.sourcesterms.iteritems()
                 if var in varobjs]
    pairs = set()
    for src in providers:
        for idx in self.sourcesterms[src][var]:
            pairs.add((src, idx))
    return pairs
|
807 |
|
def _remove_sources(self, term, sources):
    """removes invalid sources (`sources`) from `sourcesterms`

    :param term: the analyzed term
    :param sources: iterable of ``(source, solution index)`` pairs to remove

    A (source, term) entry left without any solution is dropped via
    ``self._remove_source_term``.
    """
    termsbysource = self._sourcesterms
    for source, solindex in sources:
        try:
            remaining = termsbysource[source][term]
            remaining.remove(solindex)
        except KeyError:
            # may occur with subquery column alias
            import rql.base as rqlb
            assert isinstance(term, (rqlb.BaseNode, Variable)), repr(term)
            continue
        if not remaining:
            self._remove_source_term(source, term)
|
824 |
|
def _remove_source_term(self, source, term):
    """Forget that `source` may provide `term`.

    Nothing happens if `source` is unknown or `term` is not recorded for it.
    The source itself is dropped from ``self._sourcesterms`` once it has no
    term left.
    """
    sourceterms = self._sourcesterms.get(source)
    if sourceterms is None:
        # source already removed
        return
    sourceterms.pop(term, None)
    if not sourceterms:
        del self._sourcesterms[source]
|
833 |
|
def crossed_relation(self, source, relation):
    """Tell whether `relation` is registered as crossed for `source`."""
    known = self._crossrelations.get(source, ())
    return relation in known
|
836 |
|
def part_steps(self):
    """precompute necessary part steps before generating actual rql for
    each step. This is necessary to know if an aggregate step will be
    necessary or not.

    Returns a list of ``(sources, terms, solindices, scope, needsel, final)``
    tuples, one per step:

    * `sources`: list of sources the step is run on
    * `terms`: terms (variables, relations, constants) handled by the step
    * `solindices`: indices of the solutions covered by the step
    * `scope`: the syntax tree scope the step applies to
    * `needsel`: names of variables which should additionally be selected
    * `final`: whether the step produces final results

    If the last computed step is not final, a final step on the system
    source covering all solutions is appended. ``self._sourcesterms`` is
    consumed along the way.
    """
    steps = []
    select = self.rqlst
    rschema = self._schema.rschema
    for source in self.part_sources:
        try:
            sourceterms = self._sourcesterms[source]
        except KeyError:
            continue # already proceed
        # sourceterms is consumed by _choose_term / _expand_terms (or cleared
        # for the system source) until nothing is left for this source
        while sourceterms:
            # take a term randomly, and all terms supporting the
            # same solutions
            term, solindices = self._choose_term(source, sourceterms)
            if source.uri == 'system':
                # ensure all variables are available for the latest step
                # (missing one will be available from temporary tables
                # of previous steps)
                scope = select
                terms = scope.defined_vars.values() + scope.aliases.values()
                sourceterms.clear()
                sources = [source]
            else:
                scope = ms_scope(term)
                # find which sources support the same term and solutions
                sources = self._expand_sources(source, term, solindices)
                # now try to get as much terms as possible
                terms = self._expand_terms(term, sources, sourceterms,
                                           scope, solindices)
                if len(terms) == 1 and isinstance(terms[0], Constant):
                    # we can't generate anything interesting with a single
                    # constant term (will generate an empty "Any" query),
                    # go to the next iteration directly!
                    continue
                if not sourceterms:
                    try:
                        del self._sourcesterms[source]
                    except KeyError:
                        # XXX already cleaned
                        pass
            # set of terms which should be additionaly selected when
            # possible
            needsel = set()
            if not self._sourcesterms and scope is select:
                # nothing left for any source: this step has to provide every
                # variable and alias of the query
                terms += scope.defined_vars.values() + scope.aliases.values()
            if isinstance(term, Relation) and len(sources) > 1:
                variants = set()
                partterms = [term]  # NOTE(review): never used afterwards
                for vref in term.get_nodes(VariableRef):
                    if not vref.variable._q_invariant:
                        variants.add(vref.name)
                if len(variants) == 2:
                    # we need an extra-step to fetch relations from each source
                    # before a join with prefetched inputs
                    # (see test_crossed_relation_noeid_needattr in
                    #  unittest_msplanner / unittest_multisources)
                    lhs, rhs = term.get_variable_parts()
                    steps.append( (sources, [term, getattr(lhs, 'variable', lhs),
                                             getattr(rhs, 'variable', rhs)],
                                   solindices, scope, variants, False) )
                    # the join itself is then done on the system source
                    sources = [self.system_source]
                final = True
            else:
                # suppose this is a final step until the contrary is proven
                final = scope is select
            # add attribute variables and mark variables which should be
            # additionaly selected when possible
            for var in select.defined_vars.itervalues():
                if not var in terms:
                    stinfo = var.stinfo
                    for ovar, rtype in stinfo.get('attrvars', ()):
                        if ovar in terms:
                            # attribute of a handled variable: fetch it here
                            needsel.add(var.name)
                            terms.append(var)
                            break
                    else:
                        # variable not handled by this step: it can't be final
                        needsel.add(var.name)
                        final = False
            # check all relations are supported by the sources
            for rel in scope.iget_nodes(Relation):
                if rel.is_types_restriction():
                    continue
                # take care not overwriting the existing "source" identifier
                for _source in sources:
                    if not _source.support_relation(rel.r_type) or (
                        self.crossed_relation(_source, rel) and not rel in terms):
                        for vref in rel.iget_nodes(VariableRef):
                            needsel.add(vref.name)
                        final = False
                        break
                else:
                    # relation supported by every source of the step
                    if not scope is select:
                        self._exists_relation(rel, terms, needsel, source)
                    # if relation is supported by all sources and some of
                    # its lhs/rhs variable isn't in "terms", and the
                    # other end *is* in "terms", mark it have to be
                    # selected
                    if source.uri != 'system' and not rschema(rel.r_type).final:
                        lhs, rhs = rel.get_variable_parts()
                        try:
                            lhsvar = lhs.variable
                        except AttributeError:
                            lhsvar = lhs
                        try:
                            rhsvar = rhs.variable
                        except AttributeError:
                            rhsvar = rhs
                        try:
                            if lhsvar in terms and not rhsvar in terms:
                                needsel.add(lhsvar.name)
                            elif rhsvar in terms and not lhsvar in terms:
                                needsel.add(rhsvar.name)
                        except AttributeError:
                            continue # not an attribute, no selection needed
            if final and source.uri != 'system':
                # check rewritten constants
                for vconsts in select.stinfo['rewritten'].itervalues():
                    const = vconsts[0]
                    eid = const.eval(self.plan.args)
                    _source = self._session.source_from_eid(eid)
                    if len(sources) > 1 or not _source in sources:
                        # if there is some rewriten constant used by a not
                        # neged relation while there are some source not
                        # supporting the associated entity, this step can't
                        # be final (unless the relation is explicitly in
                        # `terms`, eg cross relations)
                        for c in vconsts:
                            rel = c.relation()
                            if rel is None or not (rel in terms or neged_relation(rel)):
                                final = False
                                break
                        break
            if final:
                # mark the solutions as processed for these sources
                self._cleanup_sourcesterms(sources, solindices)
            steps.append((sources, terms, solindices, scope, needsel, final)
                         )
    # NOTE(review): steps[-1] raises IndexError if no step was generated at
    # all -- presumably impossible here, confirm
    if not steps[-1][-1]:
        # add a final step
        terms = select.defined_vars.values() + select.aliases.values()
        steps.append( ([self.system_source], terms, set(self._solindices),
                       select, set(), True) )
    return steps
|
982 |
|
def _exists_relation(self, rel, terms, needsel, source):
    """Insert 'identity' variables for ends of `rel` missing from the step.

    Only applies when both ends of `rel` are variables.
    - If one end is in `terms` and the other is not, the missing end is
      substituted. This requires the missing end's ``ms_scope`` to be its
      own statement.
    - Otherwise, if `rel` is a crossed relation of `source`, each end whose
      scope differs from the relation's scope is substituted.

    Each substitution appends the new variable to `terms` and its name to
    `needsel` (through ``self._identity_substitute``).
    """
    lhs, rhs = rel.get_variable_parts()
    try:
        lhsvar, rhsvar = lhs.variable, rhs.variable
    except AttributeError:
        # one end is not a variable reference (e.g. a constant): nothing to do
        return
    # supported relation with at least one end supported, check the
    # other end is in as well. If not this usually means the
    # variable is refed by an outer scope and should be substituted
    # using an 'identity' relation (else we'll get a conflict of
    # temporary tables)
    relscope = ms_scope(rel)
    lhsscope = ms_scope(lhsvar)
    rhsscope = ms_scope(rhsvar)
    if rhsvar in terms and lhsvar not in terms and lhsscope is lhsvar.stmt:
        self._identity_substitute(rel, lhsvar, terms, needsel, relscope)
    elif lhsvar in terms and rhsvar not in terms and rhsscope is rhsvar.stmt:
        self._identity_substitute(rel, rhsvar, terms, needsel, relscope)
    elif self.crossed_relation(source, rel):
        if lhsscope is not relscope:
            self._identity_substitute(rel, lhsvar, terms, needsel,
                                      relscope, lhsscope)
        if rhsscope is not relscope:
            self._identity_substitute(rel, rhsvar, terms, needsel,
                                      relscope, rhsscope)
|
1010 |
|
def _identity_substitute(self, relation, var, terms, needsel, exist,
                         idrelscope=None):
    """Insert an identity variable for `var` and register it for the step.

    The new variable comes from ``self._insert_identity_variable(exist, var,
    idrelscope)``. It is appended to `terms` and its name is added to
    `needsel`. The operator of `relation`'s second child is forced to '='.
    """
    substitute = self._insert_identity_variable(exist, var, idrelscope)
    # ensure relation is using '=' operator, else we rely on a
    # sqlgenerator side effect (it won't insert an inequality operator
    # in this case)
    comparison = relation.children[1]
    comparison.operator = '='
    terms.append(substitute)
    needsel.add(substitute.name)
|
1020 |
|
def _choose_term(self, source, sourceterms):
    """pick one term among terms supported by a source, which will be used
    as a base to generate an execution step

    The chosen term is popped from `sourceterms`. It is returned with the
    solution indices it was mapped to, as ``(term, solindices)``.

    Preference order:
    1. With several sources left: a variable of a crossed relation living in
       a sub-scope (if it has several relations or is selected), else that
       crossed relation itself; then variables from sub-scopes.
    2. With a single source left: variables from the outer scope.
    3. A non-variable term met during the scope scan of 1 or 2.
    4. The variable with the fewest solutions, most valuable references,
       then smallest name (for test predictability).
    5. A constant.
    6. Whatever is left (a relation).
    """
    outer = self.rqlst
    fallback = None
    if len(self._sourcesterms) > 1:
        # first, return non invariant variable of crossed relation, then the
        # crossed relation itself
        for candidate in sourceterms:
            if not (isinstance(candidate, Relation)
                    and self.crossed_relation(source, candidate)
                    and ms_scope(candidate) is not outer):
                continue
            for part in candidate.get_variable_parts():
                try:
                    var = part.variable
                except AttributeError:
                    continue  # Constant
                stinfo = var.stinfo
                if ((len(stinfo['relations']) > 1 or stinfo['selected'])
                        and var in sourceterms):
                    return var, sourceterms.pop(var)
            return candidate, sourceterms.pop(candidate)
        # priority to variable from subscopes
        for candidate in sourceterms:
            if ms_scope(candidate) is outer:
                continue
            if isinstance(candidate, Variable):
                return candidate, sourceterms.pop(candidate)
            fallback = candidate
    else:
        # priority to variable from outer scope
        for candidate in sourceterms:
            if ms_scope(candidate) is not outer:
                continue
            if isinstance(candidate, Variable):
                return candidate, sourceterms.pop(candidate)
            fallback = candidate
    if fallback is not None:
        return fallback, sourceterms.pop(fallback)
    variables = [(var, sols) for var, sols in sourceterms.items()
                 if isinstance(var, Variable)]
    if variables:
        # min() keeps the first of equal keys, exactly like sorted()[0]
        best = min(variables,
                   key=lambda pair: (len(pair[1]),
                                     -pair[0].valuable_references(),
                                     pair[0].name))[0]
        return best, sourceterms.pop(best)
    # priority to constant
    for candidate in sourceterms:
        if isinstance(candidate, Constant):
            return candidate, sourceterms.pop(candidate)
    # whatever (relation)
    leftover = next(iter(sourceterms))
    return leftover, sourceterms.pop(leftover)
|
1073 |
|
def _expand_sources(self, selected_source, term, solindices):
    """return all sources supporting given term / solindices

    `selected_source` always comes first in the returned list. Every other
    source recording `term` for a superset of `solindices` is appended.
    `solindices` is then subtracted from that source's solution set, unless
    the source is the system source and `term` is a variable missing from
    ``self._linkedterms``. Emptied entries and sources are dropped.
    """
    allterms = self._sourcesterms
    result = [selected_source]
    for candidate in list(allterms):
        if candidate is selected_source:
            continue
        candterms = allterms[candidate]
        if term not in candterms or not solindices.issubset(candterms[term]):
            continue
        result.append(candidate)
        # an unlinked variable stays available on the system source
        if (candidate.uri == 'system' and isinstance(term, Variable)
                and term not in self._linkedterms):
            continue
        remaining = candterms[term]
        remaining -= solindices
        if not remaining:
            del candterms[term]
            if not candterms:
                del allterms[candidate]
    return result
|
1093 |
|
def _expand_terms(self, term, sources, sourceterms, scope, solindices):
    """Return the list of terms to handle in the same step as `term`.

    Starting from ``[term]``, terms of `sourceterms` (the term -> solution
    indices mapping of the source being processed) are added greedily,
    repeating until no more term can be added.
    - A constant is accepted when its sources (plus the system source if it
      is part of the step) are exactly `sources`; it is then deleted from
      `sourceterms`.
    - Other terms are accepted through `accept_term`; `solindices` is then
      cleaned for them via ``self._cleanup_sourcesterms``.
    """
    terms = [term]
    # sorted list, compared below against the sorted sources of constants
    sources = sorted(sources)
    sourcesterms = self._sourcesterms
    linkedterms = self._linkedterms
    # term has to belong to the same scope if there is more
    # than the system source remaining
    if len(sourcesterms) > 1 and not scope is self.rqlst:
        candidates = (t for t in sourceterms if scope is ms_scope(t))
    else:
        candidates = sourceterms
    # we only want one unlinked term in each generated query
    candidates = [t for t in candidates
                  if isinstance(t, (Constant, Relation)) or
                  (solindices.issubset(sourceterms[t]) and t in linkedterms)]
    # crossed relations of every source of the step
    cross_rels = {}
    for source in sources:
        cross_rels.update(self._crossrelations.get(source, {}))
    # pair the two variables of each crossed relation so that they are
    # never put in the same step
    exclude = {}
    for crossvars in cross_rels.itervalues():
        vars = [t for t in crossvars if isinstance(t, Variable)]
        try:
            exclude[vars[0]] = vars[1]
            exclude[vars[1]] = vars[0]
        except IndexError:
            # less than two variables: nothing to exclude
            pass
    # a term is accepted when every source supports it, when it may be
    # processed along with the terms already gathered, and when its crossed
    # counterpart (if any) isn't already part of the step
    accept_term = lambda x: (not any(s for s in sources
                                     if not x in sourcesterms.get(s, ()))
                             and x._ms_may_be_processed(terms, linkedterms)
                             and not exclude.get(x) in terms)
    if isinstance(term, Relation) and term in cross_rels:
        # the base term is a crossed relation: terms it refers to are
        # accepted too, and those which aren't candidates are added directly
        cross_terms = cross_rels.pop(term)
        base_accept_term = accept_term
        accept_term = lambda x: (base_accept_term(x) or x in cross_terms)
        for refed in cross_terms:
            if not refed in candidates:
                terms.append(refed)
    # repeat until no term can't be added, since addition of a new
    # term may permit to another one to be added
    modified = True
    while modified and candidates:
        modified = False
        # NOTE: this loop rebinds the `term` parameter, which is no longer
        # needed at this point
        for term in candidates[:]:
            if isinstance(term, Constant):
                termsources = set(x[0] for x in self._term_sources(term))
                # ensure system source is there for constant
                if self.system_source in sources:
                    termsources.add(self.system_source)
                if sorted(termsources) != sources:
                    continue
                terms.append(term)
                candidates.remove(term)
                modified = True
                del sourceterms[term]
            elif accept_term(term):
                terms.append(term)
                candidates.remove(term)
                modified = True
                self._cleanup_sourcesterms(sources, solindices, term)
    return terms
|
1154 |
|
def _cleanup_sourcesterms(self, sources, solindices, term=None):
    """remove solutions so we know they are already processed

    For each source of `sources` known in ``self._sourcesterms``:
    - if `term` is given, `solindices` is subtracted from its solution set;
    - if `term` is None, it is subtracted from every term's solution set,
      except relations that are crossed for that source.
    Terms left without any solution are dropped, and so are sources left
    without any term.

    :param sources: sources whose term mappings should be cleaned
    :param solindices: set of solution indices now processed
    :param term: the single term to clean, or None to clean all terms
    """
    for source in sources:
        try:
            sourceterms = self._sourcesterms[source]
        except KeyError:
            continue
        if term is None:
            # Use a distinct loop variable. Rebinding the `term` parameter
            # here would make the next sources take the single-term branch
            # below. Iterate over a snapshot since entries are deleted.
            for sterm, termsolindices in list(sourceterms.items()):
                if isinstance(sterm, Relation) and self.crossed_relation(source, sterm):
                    continue
                termsolindices -= solindices
                if not termsolindices:
                    del sourceterms[sterm]
        else:
            try:
                sourceterms[term] -= solindices
                if not sourceterms[term]:
                    del sourceterms[term]
            except KeyError:
                # term not (or no longer) provided by this source
                pass
        if not sourceterms:
            del self._sourcesterms[source]
|
1179 |
|
def merge_input_maps(self, allsolindices, complete=True):
    """inputmaps is a dictionary with tuple of solution indices as key with
    an associated input map as value. This function compute for each
    solution its necessary input map and return them grouped

    ex:
    inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'},
                 (1,): {'X': 't2.C0', 'T': 't2.C1'}}
    return : [([1], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1',
                     'X': 't2.C0', 'T': 't2.C1'}),
              ([0,2], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'})]

    When there is no input map at all, ``[(allsolindices, None)]`` is
    returned. With `complete` false, only solutions of `allsolindices` are
    considered. Solutions of `allsolindices` without any input map are
    returned last, grouped with a None map. The order of the other groups
    follows dictionary iteration order.
    """
    if not self._inputmaps:
        return [(allsolindices, None)]
    unmapped = allsolindices.copy()
    mapbysol = {}
    # compute a single map for each solution
    for solindices, basemap in self._inputmaps.iteritems():
        for solindex in solindices:
            if complete or solindex in allsolindices:
                mapbysol.setdefault(solindex, {}).update(basemap)
                unmapped.discard(solindex)
    # group results by identical input map
    groups = []
    for solindex, solmap in mapbysol.items():
        for indices, commonmap in groups:
            if commonmap == solmap:
                indices.append(solindex)
                break
        else:
            groups.append(([solindex], solmap))
    if unmapped:
        groups.append((list(unmapped), None))
    return groups
|
1219 |
|
def build_final_part(self, select, solindices, inputmap, sources,
                     insertedvars):
    """Build the step producing final results for solutions `solindices`.

    When conflicts were recorded in ``self._conflicts`` and `inputmap` is not
    empty, each conflicting variable gets a fresh variable. The fresh variable
    is bound to it through an 'identity' relation, typed like it in every
    solution, and mapped in `inputmap` (mutated) to the conflicting input.

    Returns a OneFetchStep when neither ``self.temptable`` nor
    ``self.finaltable`` is set. Otherwise returns a FetchStep writing into
    ``self.temptable``, or ``self.finaltable`` when the former is falsy.
    """
    solutions = [self._solutions[index] for index in solindices]
    if self._conflicts and inputmap:
        for varname, mappedto in self._conflicts:
            origvar = select.defined_vars[varname]
            identvar = select.make_variable()
            # XXX should use var.scope but scope hasn't been computed yet
            select.add_relation(origvar, 'identity', identvar)
            for solution in solutions:
                solution[identvar.name] = solution[varname]
            inputmap[identvar.name] = mappedto
    rqlst = self.plan.finalize(select, solutions, insertedvars)
    if self.temptable is not None or self.finaltable is not None:
        target = self.temptable or self.finaltable
        return FetchStep(self.plan, rqlst, sources, target, True, inputmap)
    return OneFetchStep(self.plan, rqlst, sources, inputmap=inputmap)
|
1237 |
|
def build_non_final_part(self, select, solindices, sources, insertedvars,
                         table):
    """non final step, will have to store results in a temporary table

    Identical steps (same rql string, solutions, sources and inserted
    variables) are shared through ``self.plan.ms_steps_idx``. Only a newly
    built step is added to the plan. In every case the step's output map is
    merged into the input map of these solutions. A variable already mapped
    elsewhere (not an attribute path, not of a final entity type) is
    recorded in ``self._conflicts``.
    """
    solkey = tuple(sorted(solindices))
    solutions = [self._solutions[i] for i in solindices]
    # XXX be smarter vs rql comparison
    stepkey = (select.as_string(), solkey,
               tuple(sorted(sources)), tuple(sorted(insertedvars)))
    stepsidx = self.plan.ms_steps_idx
    step = stepsidx.get(stepkey)
    if step is None:
        # processing needed
        rqlst = self.plan.finalize(select, solutions, insertedvars)
        step = FetchStep(self.plan, rqlst, sources, table, False)
        stepsidx[stepkey] = step
        self.plan.add_step(step)
    # update input map for following steps, according to processed solutions
    inputmap = self._inputmaps.setdefault(solkey, {})
    for varname, mapping in step.outputmap.iteritems():
        if varname not in inputmap or '.' in varname:
            continue
        previous = inputmap[varname]
        if mapping == previous:
            continue
        if self._schema.eschema(solutions[0][varname]).final:
            continue
        self._conflicts.append((varname, previous))
    inputmap.update(step.outputmap)
|
1264 |
|
1265 |
|
@deprecated('[3.18] old multi-source system will go away in the next version')
class MSPlanner(SSPlanner):
    """MultiSourcesPlanner: build execution plan for rql queries

    decompose the RQL query according to sources'schema
    """

    def build_select_plan(self, plan, rqlst):
        """build execution plan for a SELECT RQL query

        the rqlst should not be tagged at this point

        Returns what ``_union_plan`` returns: a list of steps, or a 1-tuple
        holding a union step.
        """
        # preprocess deals with security insertion and returns a new syntax tree
        # which have to be executed to fulfill the query: according
        # to permissions for variable's type, different rql queries may have to
        # be executed
        plan.preprocess(rqlst)
        if server.DEBUG & server.DBG_MS:
            print '-'*80
            print 'PLANNING', rqlst
        # one part plan information per select of the union
        ppis = [PartPlanInformation(plan, select, self.rqlhelper)
                for select in rqlst.children]
        # index of already built non final steps, see build_non_final_part
        plan.ms_steps_idx = {}
        steps = self._union_plan(plan, ppis)
        if server.DEBUG & server.DBG_MS:
            from pprint import pprint
            for step in plan.steps:
                pprint(step.test_repr())
            pprint(steps[0].test_repr())
        return steps

    def _ppi_subqueries(self, ppi):
        """Plan subqueries of `ppi` that can't be run along with it.

        A subquery is planned separately into a temporary table when one of
        its selects needs a split or uses other sources than `ppi`. It is then
        removed from ``ppi.rqlst.with_`` and its step is added to the plan.

        Returns a dict mapping the column alias names of such subqueries to
        ``'<temptable>.C<index>'``.
        """
        # part plan info for subqueries
        plan = ppi.plan
        inputmap = {}
        # iterate over a copy since planned subqueries are removed
        for subquery in ppi.rqlst.with_[:]:
            # NOTE(review): unlike build_select_plan, no rqlhelper is given here
            sppis = [PartPlanInformation(plan, select)
                     for select in subquery.query.children]
            for sppi in sppis:
                if sppi.needsplit or sppi.part_sources != ppi.part_sources:
                    temptable = plan.make_temp_table_name('T%s' % make_uid(id(subquery)))
                    sstep = self._union_plan(plan, sppis, temptable)[0]
                    break
            else:
                # subquery can be executed along with the main query
                sstep = None
            if sstep is not None:
                ppi.rqlst.with_.remove(subquery)
                for i, colalias in enumerate(subquery.aliases):
                    inputmap[colalias.name] = '%s.C%s' % (temptable, i)
                ppi.plan.add_step(sstep)
        return inputmap

    def _union_plan(self, plan, ppis, temptable=None):
        """Build steps for the union of part plans `ppis`.

        Parts which don't need a split are grouped by sources, then by input
        map, into fetch steps over a single union; parts needing an aggregate
        step get their own AggrStep. Parts needing a split go through
        ``split_part``. When `temptable` is given, results are stored in it.

        Returns ``(union_step,)`` when several steps were built, else the list
        of steps.
        """
        # NOTE(review): allsources is filled but never used
        tosplit, cango, allsources = [], {}, set()
        for planinfo in ppis:
            if planinfo.needsplit:
                tosplit.append(planinfo)
            else:
                cango.setdefault(planinfo.part_sources, []).append(planinfo)
            for source in planinfo.part_sources:
                allsources.add(source)
        # first add steps for query parts which doesn't need to splitted
        steps = []
        for sources, cppis in cango.iteritems():
            byinputmap = {}
            for ppi in cppis:
                select = ppi.rqlst
                if sources != (ppi.system_source,):
                    add_types_restriction(self.schema, select)
                # part plan info for subqueries
                inputmap = self._ppi_subqueries(ppi)
                aggrstep = need_aggr_step(select, sources)
                if aggrstep:
                    # fetch into a temporary table, then aggregate/sort from it
                    atemptable = plan.make_temp_table_name('T%s' % make_uid(id(select)))
                    sunion = Union()
                    sunion.append(select)
                    selected = select.selection[:]
                    select_group_sort(select)
                    step = AggrStep(plan, selected, select, atemptable, temptable)
                    # limit/offset are moved from the select to the aggr step
                    step.set_limit_offset(select.limit, select.offset)
                    select.limit = None
                    select.offset = 0
                    fstep = FetchStep(plan, sunion, sources, atemptable, True, inputmap)
                    step.children.append(fstep)
                    steps.append(step)
                else:
                    # the (hashable) tuple of items stands for the input map
                    byinputmap.setdefault(tuple(inputmap.iteritems()), []).append( (select) )
            # one fetch step per distinct input map
            for inputmap, queries in byinputmap.iteritems():
                inputmap = dict(inputmap)
                sunion = Union()
                for select in queries:
                    sunion.append(select)
                if temptable:
                    steps.append(FetchStep(plan, sunion, sources, temptable, True, inputmap))
                else:
                    steps.append(OneFetchStep(plan, sunion, sources, inputmap))
        # then add steps for splitted query parts
        for planinfo in tosplit:
            steps.append(self.split_part(planinfo, temptable))
        if len(steps) > 1:
            if temptable:
                step = UnionFetchStep(plan)
            else:
                step = UnionStep(plan)
            step.children = steps
            return (step,)
        return steps

    # internal methods for multisources decomposition #########################

    def split_part(self, ppi, temptable):
        """Build the steps of a part plan `ppi` which has to be split among
        sources, and return the step joining them: an AggrStep,
        IntersectStep, union step, or the single built step.

        When `temptable` is given, final results are stored in it.
        """
        ppi.finaltable = temptable
        plan = ppi.plan
        select = ppi.rqlst
        subinputmap = self._ppi_subqueries(ppi)
        stepdefs = ppi.part_steps()
        if need_aggr_step(select, ppi.part_sources, stepdefs):
            atemptable = plan.make_temp_table_name('T%s' % make_uid(id(select)))
            selection = select.selection[:]
            select_group_sort(select)
        else:
            atemptable = None
            selection = select.selection
        ppi.temptable = atemptable
        vfilter = TermsFiltererVisitor(self.schema, ppi)
        steps = []
        # more than one final step: input maps must not be completed with
        # solutions from other steps (see merge_input_maps)
        multifinal = len([x for x in stepdefs if x[-1]]) >= 2
        for sources, terms, solindices, scope, needsel, final in stepdefs:
            # extract an executable query using only the specified terms
            if sources[0].uri == 'system':
                # in this case we have to merge input maps before call to
                # filter so already processed restriction are correctly
                # removed
                solsinputmaps = ppi.merge_input_maps(
                    solindices, complete=not (final and multifinal))
                # NOTE: this rebinds the step's solindices
                for solindices, inputmap in solsinputmaps:
                    minrqlst, insertedvars = vfilter.filter(
                        sources, terms, scope, set(solindices), needsel, final)
                    if inputmap is None:
                        inputmap = subinputmap
                    else:
                        inputmap.update(subinputmap)
                    steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
                                                      sources, insertedvars))
            else:
                # this is a final part (i.e. retreiving results for the
                # original query part) if all term / sources have been
                # treated or if this is the last shot for used solutions
                minrqlst, insertedvars = vfilter.filter(
                    sources, terms, scope, solindices, needsel, final)
                if final:
                    solsinputmaps = ppi.merge_input_maps(
                        solindices, complete=not (final and multifinal))
                    if len(solsinputmaps) > 1:
                        # each input map gets its own copy of the filtered tree
                        refrqlst = minrqlst
                    for solindices, inputmap in solsinputmaps:
                        if inputmap is None:
                            inputmap = subinputmap
                        else:
                            inputmap.update(subinputmap)
                        if len(solsinputmaps) > 1:
                            minrqlst = refrqlst.copy()
                            sources = sources[:]
                        if inputmap and len(sources) > 1:
                            # inputs live on the system source: fetch from the
                            # external sources without inputs, and from the
                            # system source with them.
                            # NOTE(review): with a single input map, `sources`
                            # is the stepdefs list and is mutated in place
                            sources.remove(ppi.system_source)
                            steps.append(ppi.build_final_part(minrqlst, solindices, None,
                                                              sources, insertedvars))
                            steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
                                                              [ppi.system_source], insertedvars))
                        else:
                            steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
                                                              sources, insertedvars))
                else:
                    # intermediate results go to a temporary table
                    table = plan.make_temp_table_name('T%s' % make_uid(id(select)))
                    ppi.build_non_final_part(minrqlst, solindices, sources,
                                             insertedvars, table)
        # finally: join parts, deal with aggregat/group/sorts if necessary
        if atemptable is not None:
            step = AggrStep(plan, selection, select, atemptable, temptable)
            step.children = steps
        elif len(steps) > 1:
            getrschema = self.schema.rschema
            if need_intersect(select, getrschema) or any(need_intersect(select, getrschema)
                                                         for step in steps
                                                         for select in step.union.children):
                if temptable:
                    raise NotImplementedError('oops') # IntersectFetchStep(plan)
                else:
                    step = IntersectStep(plan)
            else:
                if temptable:
                    step = UnionFetchStep(plan)
                else:
                    step = UnionStep(plan)
            step.children = steps
        else:
            step = steps[0]
        if select.limit is not None or select.offset:
            step.set_limit_offset(select.limit, select.offset)
        return step
|
1466 |
|
1467 |
|
class UnsupportedBranch(Exception):
    """Signal that a branch of the syntax tree being filtered can't be kept.

    ``TermsFiltererVisitor._rqlst_accept`` catches it and skips the branch.
    """
|
1470 |
|
1471 |
|
1472 class TermsFiltererVisitor(object): |
|
def __init__(self, schema, ppi):
    """Prepare a visitor filtering syntax trees for part plan `ppi`.

    :param schema: the schema, kept as ``self.schema``
    :param ppi: the part plan information; its ``temptable`` and the
      variables referenced in ``rqlst.orderby`` are read here
    """
    self.schema = schema
    self.ppi = ppi
    self.skip = {}
    # temporary table of the aggregate step, if any
    self.hasaggrstep = ppi.temptable
    # names of variables used in sort terms
    sortvars = set()
    for sortterm in ppi.rqlst.orderby:
        for vref in sortterm.iget_nodes(VariableRef):
            sortvars.add(vref.name)
    self.extneedsel = frozenset(sortvars)
|
1480 |
|
def _rqlst_accept(self, rqlst, node, newroot, terms, setfunc=None):
    """Visit `node` and return the syntax tree to go on with.

    `node` accepts this visitor with a copy of `terms`.
    - If the visit raises UnsupportedBranch, `rqlst` is returned as is.
    - Otherwise the restriction produced, if any, is handed to `setfunc`.
      The parent of `node` is returned when the visit returned a node other
      than `node`; `rqlst` is returned otherwise.
    """
    try:
        newrestr, visited = node.accept(self, newroot, terms[:])
    except UnsupportedBranch:
        return rqlst
    if newrestr is not None and setfunc is not None:
        setfunc(newrestr)
    return rqlst if visited is node else node.parent
|
1491 |
|
1492 def filter(self, sources, terms, rqlst, solindices, needsel, final): |
|
1493 if server.DEBUG & server.DBG_MS: |
|
1494 print 'filter', final and 'final' or '', sources, terms, rqlst, solindices, needsel |
|
1495 newroot = Select() |
|
1496 self.sources = sorted(sources) |
|
1497 self.terms = terms |
|
1498 self.solindices = solindices |
|
1499 self.final = final |
|
1500 self._pending_vrefs = [] |
|
1501 # terms which appear in unsupported branches |
|
1502 needsel |= self.extneedsel |
|
1503 self.needsel = needsel |
|
1504 # terms which appear in supported branches |
|
1505 self.mayneedsel = set() |
|
1506 # new inserted variables |
|
1507 self.insertedvars = [] |
|
1508 # other structures (XXX document) |
|
1509 self.mayneedvar, self.hasvar = {}, {} |
|
1510 self.use_only_defined = False |
|
1511 self.scopes = {rqlst: newroot} |
|
1512 self.current_scope = rqlst |
|
1513 if rqlst.where: |
|
1514 rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, terms, |
|
1515 newroot.set_where) |
|
1516 if isinstance(rqlst, Select): |
|
1517 self.use_only_defined = True |
|
1518 if rqlst.groupby: |
|
1519 groupby = [] |
|
1520 for node in rqlst.groupby: |
|
1521 rqlst = self._rqlst_accept(rqlst, node, newroot, terms, |
|
1522 groupby.append) |
|
1523 if groupby: |
|
1524 newroot.set_groupby(groupby) |
|
1525 if rqlst.having: |
|
1526 having = [] |
|
1527 for node in rqlst.having: |
|
1528 rqlst = self._rqlst_accept(rqlst, node, newroot, terms, |
|
1529 having.append) |
|
1530 if having: |
|
1531 newroot.set_having(having) |
|
1532 if final and rqlst.orderby and not self.hasaggrstep: |
|
1533 orderby = [] |
|
1534 for node in rqlst.orderby: |
|
1535 rqlst = self._rqlst_accept(rqlst, node, newroot, terms, |
|
1536 orderby.append) |
|
1537 if orderby: |
|
1538 newroot.set_orderby(orderby) |
|
1539 elif rqlst.orderby: |
|
1540 for sortterm in rqlst.orderby: |
|
1541 if any(f for f in sortterm.iget_nodes(Function) if f.name == 'FTIRANK'): |
|
1542 newnode, oldnode = sortterm.accept(self, newroot, terms) |
|
1543 if newnode is not None: |
|
1544 newroot.add_sort_term(newnode) |
|
1545 self.process_selection(newroot, terms, rqlst) |
|
1546 elif not newroot.where: |
|
1547 # no restrictions have been copied, just select terms and add |
|
1548 # type restriction (done later by add_types_restriction) |
|
1549 for v in terms: |
|
1550 if not isinstance(v, Variable): |
|
1551 continue |
|
1552 newroot.append_selected(VariableRef(newroot.get_variable(v.name))) |
|
1553 solutions = self.ppi.copy_solutions(solindices) |
|
1554 cleanup_solutions(newroot, solutions) |
|
1555 newroot.set_possible_types(solutions) |
|
1556 if final: |
|
1557 if self.hasaggrstep: |
|
1558 self.add_necessary_selection(newroot, self.mayneedsel & self.extneedsel) |
|
1559 newroot.distinct = rqlst.distinct |
|
1560 else: |
|
1561 self.add_necessary_selection(newroot, self.mayneedsel & self.needsel) |
|
1562 # insert vars to fetch constant values when needed |
|
1563 for (varname, rschema), reldefs in self.mayneedvar.iteritems(): |
|
1564 for rel, ored in reldefs: |
|
1565 if not (varname, rschema) in self.hasvar: |
|
1566 self.hasvar[(varname, rschema)] = None # just to avoid further insertion |
|
1567 cvar = newroot.make_variable() |
|
1568 for sol in newroot.solutions: |
|
1569 sol[cvar.name] = rschema.objects(sol[varname])[0] |
|
1570 # if the current restriction is not used in a OR branch, |
|
1571 # we can keep it, else we have to drop the constant |
|
1572 # restriction (or we may miss some results) |
|
1573 if not ored: |
|
1574 rel = rel.copy(newroot) |
|
1575 newroot.add_restriction(rel) |
|
1576 # add a relation to link the variable |
|
1577 newroot.remove_node(rel.children[1]) |
|
1578 cmp = Comparison('=') |
|
1579 rel.append(cmp) |
|
1580 cmp.append(VariableRef(cvar)) |
|
1581 self.insertedvars.append((varname, rschema, cvar.name)) |
|
1582 newroot.append_selected(VariableRef(newroot.get_variable(cvar.name))) |
|
1583 # NOTE: even if the restriction is done by this query, we have |
|
1584 # to let it in the original rqlst so that it appears anyway in |
|
1585 # the "final" query, else we may change the meaning of the query |
|
1586 # if there are NOT somewhere : |
|
1587 # 'NOT X relation Y, Y name "toto"' means X WHERE X isn't related |
|
1588 # to Y whose name is toto while |
|
1589 # 'NOT X relation Y' means X WHERE X has no 'relation' (whatever Y) |
|
1590 elif ored: |
|
1591 newroot.remove_node(rel) |
|
1592 add_types_restriction(self.schema, rqlst, newroot, solutions) |
|
1593 if server.DEBUG & server.DBG_MS: |
|
1594 print '--->', newroot |
|
1595 return newroot, self.insertedvars |
|
1596 |
|
1597 def visit_and(self, node, newroot, terms): |
|
1598 subparts = [] |
|
1599 for i in xrange(len(node.children)): |
|
1600 child = node.children[i] |
|
1601 try: |
|
1602 newchild, child_ = child.accept(self, newroot, terms) |
|
1603 if not child_ is child: |
|
1604 node = child_.parent |
|
1605 if newchild is None: |
|
1606 continue |
|
1607 subparts.append(newchild) |
|
1608 except UnsupportedBranch: |
|
1609 continue |
|
1610 if not subparts: |
|
1611 return None, node |
|
1612 if len(subparts) == 1: |
|
1613 return subparts[0], node |
|
1614 return copy_node(newroot, node, subparts), node |
|
1615 |
|
1616 visit_or = visit_and |
|
1617 |
|
1618 def _relation_supported(self, relation): |
|
1619 rtype = relation.r_type |
|
1620 for source in self.sources: |
|
1621 if not source.support_relation(rtype) or ( |
|
1622 rtype in source.cross_relations and not relation in self.terms): |
|
1623 return False |
|
1624 if not self.final and not relation in self.terms: |
|
1625 rschema = self.schema.rschema(relation.r_type) |
|
1626 if not rschema.final: |
|
1627 for term in relation.get_nodes((VariableRef, Constant)): |
|
1628 term = getattr(term, 'variable', term) |
|
1629 termsources = sorted(set(x[0] for x in self.ppi._term_sources(term))) |
|
1630 if termsources and termsources != self.sources: |
|
1631 return False |
|
1632 return True |
|
1633 |
|
1634 def visit_relation(self, node, newroot, terms): |
|
1635 if not node.is_types_restriction(): |
|
1636 if not node in terms and node in self.skip and self.solindices.issubset(self.skip[node]): |
|
1637 return None, node |
|
1638 if not self._relation_supported(node): |
|
1639 raise UnsupportedBranch() |
|
1640 # don't copy type restriction unless this is the only supported relation |
|
1641 # for the lhs variable, else they'll be reinserted later as needed (in |
|
1642 # other cases we may copy a type restriction while the variable is not |
|
1643 # actually used) |
|
1644 elif not (node.neged(strict=True) or |
|
1645 any(self._relation_supported(rel) |
|
1646 for rel in node.children[0].variable.stinfo['relations'])): |
|
1647 return self.visit_default(node, newroot, terms) |
|
1648 else: |
|
1649 raise UnsupportedBranch() |
|
1650 rschema = self.schema.rschema(node.r_type) |
|
1651 self._pending_vrefs = [] |
|
1652 try: |
|
1653 res = self.visit_default(node, newroot, terms)[0] |
|
1654 except Exception: |
|
1655 # when a relation isn't supported, we should dereference potentially |
|
1656 # introduced variable refs |
|
1657 for vref in self._pending_vrefs: |
|
1658 vref.unregister_reference() |
|
1659 raise |
|
1660 ored = node.ored() |
|
1661 if rschema.final or rschema.inlined: |
|
1662 vrefs = node.children[1].get_nodes(VariableRef) |
|
1663 if not vrefs: |
|
1664 if not ored: |
|
1665 self.skip.setdefault(node, set()).update(self.solindices) |
|
1666 else: |
|
1667 self.mayneedvar.setdefault((node.children[0].name, rschema), []).append( (res, ored) ) |
|
1668 else: |
|
1669 assert len(vrefs) == 1 |
|
1670 vref = vrefs[0] |
|
1671 # XXX check operator ? |
|
1672 self.hasvar[(node.children[0].name, rschema)] = vref |
|
1673 if self._may_skip_attr_rel(rschema, node, vref, ored, terms, res): |
|
1674 self.skip.setdefault(node, set()).update(self.solindices) |
|
1675 elif not ored: |
|
1676 self.skip.setdefault(node, set()).update(self.solindices) |
|
1677 return res, node |
|
1678 |
|
1679 def _may_skip_attr_rel(self, rschema, rel, vref, ored, terms, res): |
|
1680 var = vref.variable |
|
1681 if ored: |
|
1682 return False |
|
1683 if var.name in self.extneedsel or var.stinfo['selected']: |
|
1684 return False |
|
1685 if not var in terms or used_in_outer_scope(var, self.current_scope): |
|
1686 return False |
|
1687 if any(v for v, _ in var.stinfo.get('attrvars', ()) if not v in terms): |
|
1688 return False |
|
1689 return True |
|
1690 |
|
1691 def visit_exists(self, node, newroot, terms): |
|
1692 newexists = node.__class__() |
|
1693 self.scopes = {node: newexists} |
|
1694 subparts, node = self._visit_children(node, newroot, terms) |
|
1695 if not subparts: |
|
1696 return None, node |
|
1697 newexists.set_where(subparts[0]) |
|
1698 return newexists, node |
|
1699 |
|
1700 def visit_not(self, node, newroot, terms): |
|
1701 subparts, node = self._visit_children(node, newroot, terms) |
|
1702 if not subparts: |
|
1703 return None, node |
|
1704 return copy_node(newroot, node, subparts), node |
|
1705 |
|
1706 def visit_group(self, node, newroot, terms): |
|
1707 if not self.final: |
|
1708 return None, node |
|
1709 return self.visit_default(node, newroot, terms) |
|
1710 |
|
1711 def visit_variableref(self, node, newroot, terms): |
|
1712 if self.use_only_defined: |
|
1713 if not node.variable.name in newroot.defined_vars: |
|
1714 raise UnsupportedBranch(node.name) |
|
1715 elif not node.variable in terms: |
|
1716 raise UnsupportedBranch(node.name) |
|
1717 self.mayneedsel.add(node.name) |
|
1718 # set scope so we can insert types restriction properly |
|
1719 newvar = newroot.get_variable(node.name) |
|
1720 newvar.stinfo['scope'] = self.scopes.get(node.variable.scope, newroot) |
|
1721 vref = VariableRef(newvar) |
|
1722 self._pending_vrefs.append(vref) |
|
1723 return vref, node |
|
1724 |
|
1725 def visit_constant(self, node, newroot, terms): |
|
1726 return copy_node(newroot, node), node |
|
1727 |
|
1728 def visit_comparison(self, node, newroot, terms): |
|
1729 subparts, node = self._visit_children(node, newroot, terms) |
|
1730 copy = copy_node(newroot, node, subparts) |
|
1731 # ignore comparison operator when fetching non final query |
|
1732 if not self.final and isinstance(node.children[0], VariableRef): |
|
1733 copy.operator = '=' |
|
1734 return copy, node |
|
1735 |
|
1736 def visit_function(self, node, newroot, terms): |
|
1737 if node.name == 'FTIRANK': |
|
1738 # FTIRANK is somewhat special... Rank function should be included in |
|
1739 # the same query has the has_text relation, potentially added to |
|
1740 # selection for latter usage |
|
1741 if not self.hasaggrstep and self.final and node not in self.skip: |
|
1742 return self.visit_default(node, newroot, terms) |
|
1743 elif any(s for s in self.sources if s.uri != 'system'): |
|
1744 return None, node |
|
1745 # p = node.parent |
|
1746 # while p is not None and not isinstance(p, SortTerm): |
|
1747 # p = p.parent |
|
1748 # if isinstance(p, SortTerm): |
|
1749 if not self.hasaggrstep and self.final and node in self.skip: |
|
1750 return Constant(self.skip[node], 'Int'), node |
|
1751 # XXX only if not yet selected |
|
1752 newroot.append_selected(node.copy(newroot)) |
|
1753 self.skip[node] = len(newroot.selection) |
|
1754 return None, node |
|
1755 return self.visit_default(node, newroot, terms) |
|
1756 |
|
1757 def visit_default(self, node, newroot, terms): |
|
1758 subparts, node = self._visit_children(node, newroot, terms) |
|
1759 return copy_node(newroot, node, subparts), node |
|
1760 |
|
1761 visit_mathexpression = visit_constant = visit_default |
|
1762 |
|
1763 def visit_sortterm(self, node, newroot, terms): |
|
1764 subparts, node = self._visit_children(node, newroot, terms) |
|
1765 if not subparts: |
|
1766 return None, node |
|
1767 return copy_node(newroot, node, subparts), node |
|
1768 |
|
1769 def _visit_children(self, node, newroot, terms): |
|
1770 subparts = [] |
|
1771 for i in xrange(len(node.children)): |
|
1772 child = node.children[i] |
|
1773 newchild, child_ = child.accept(self, newroot, terms) |
|
1774 if not child is child_: |
|
1775 node = child_.parent |
|
1776 if newchild is not None: |
|
1777 subparts.append(newchild) |
|
1778 return subparts, node |
|
1779 |
|
1780 def process_selection(self, newroot, terms, rqlst): |
|
1781 if self.final: |
|
1782 for term in rqlst.selection: |
|
1783 newroot.append_selected(term.copy(newroot)) |
|
1784 for vref in term.get_nodes(VariableRef): |
|
1785 self.needsel.add(vref.name) |
|
1786 return |
|
1787 for term in rqlst.selection: |
|
1788 vrefs = term.get_nodes(VariableRef) |
|
1789 if vrefs: |
|
1790 supportedvars = [] |
|
1791 for vref in vrefs: |
|
1792 var = vref.variable |
|
1793 if var in terms: |
|
1794 supportedvars.append(vref) |
|
1795 continue |
|
1796 else: |
|
1797 self.needsel.add(vref.name) |
|
1798 break |
|
1799 else: |
|
1800 for vref in vrefs: |
|
1801 newroot.append_selected(vref.copy(newroot)) |
|
1802 supportedvars = [] |
|
1803 for vref in supportedvars: |
|
1804 if not vref in newroot.get_selected_variables(): |
|
1805 newroot.append_selected(VariableRef(newroot.get_variable(vref.name))) |
|
1806 elif term in self.terms: |
|
1807 newroot.append_selected(term.copy(newroot)) |
|
1808 |
|
1809 def add_necessary_selection(self, newroot, terms): |
|
1810 selected = tuple(newroot.get_selected_variables()) |
|
1811 for varname in terms: |
|
1812 var = newroot.defined_vars[varname] |
|
1813 for vref in var.references(): |
|
1814 rel = vref.relation() |
|
1815 if rel is None and vref in selected: |
|
1816 # already selected |
|
1817 break |
|
1818 else: |
|
1819 selvref = VariableRef(var) |
|
1820 newroot.append_selected(selvref) |
|
1821 if newroot.groupby: |
|
1822 newroot.add_group_var(VariableRef(selvref.variable, noautoref=1)) |
|