1 """plan execution of rql queries on multiple sources |
1 """plan execution of rql queries on multiple sources |
2 |
2 |
3 the best way to understand what we are trying to achieve here is to read |
3 the best way to understand what we are trying to achieve here is to read the |
4 the unit-tests in unittest_querier_planner.py |
4 unit-tests in unittest_msplanner.py |
5 |
5 |
6 |
6 |
7 |
7 What you need to know |
8 Split and execution specifications |
8 ~~~~~~~~~~~~~~~~~~~~~ |
9 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
9 1. The system source is expected to support every entity and relation types |
|
10 |
|
11 2. Given "X relation Y": |
|
12 |
|
13 * if relation, X and Y types are supported by the external source, we suppose |
|
14 by default that X and Y should both come from the same source as the |
|
15 relation. You can specify otherwise by adding relation into the |
|
16 "cross_relations" set in the source's mapping file and in that case, we'll |
|
17 consider that we can also find in the system source some relation between |
|
18 X and Y coming from different sources. |
|
19 |
|
20 * if "relation" isn't supported by the external source but X or Y |
|
21 types (or both) are, we suppose by default that we can find in the system |
|
22 source some relation where X and/or Y come from the external source. You |
|
23 can specify otherwise by adding relation into the "dont_cross_relations" |
|
24 set in the source's mapping file and in that case, we'll consider that we |
|
25 can only find in the system source some relation between X and Y coming |
|
26 from the system source. |
|
27 |
|
28 |
|
29 Implementation |
|
30 ~~~~~~~~~~~~~~ |
|
31 XXX explain algorithm |
|
32 |
|
33 |
|
34 Examples of multi-sources query execution |
|
35 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
10 For a system source and a ldap user source (only EUser and its attributes |
36 For a system source and a ldap user source (only EUser and its attributes |
11 are supported, no group or such): |
37 are supported, no group or such): |
12 |
38 |
13 |
|
14 :EUser X: |
39 :EUser X: |
15 1. fetch EUser X from both sources and return concatenation of results |
40 1. fetch EUser X from both sources and return concatenation of results |
16 |
41 |
17 |
|
18 :EUser X WHERE X in_group G, G name 'users': |
42 :EUser X WHERE X in_group G, G name 'users': |
19 * catch 1 |
43 * catch 1 |
20 1. fetch EUser X from both sources, store concatenation of results |
44 1. fetch EUser X from both sources, store concatenation of results into a |
21 into a temporary table |
45 temporary table |
22 2. return the result of TMP X WHERE X in_group G, G name 'users' from |
46 2. return the result of TMP X WHERE X in_group G, G name 'users' from the |
23 the system source |
47 system source |
24 |
|
25 * catch 2 |
48 * catch 2 |
26 1. return the result of EUser X WHERE X in_group G, G name 'users' |
49 1. return the result of EUser X WHERE X in_group G, G name 'users' from system |
27 from system source, that's enough (optimization of the sql querier |
50 source, that's enough (optimization of the sql querier will avoid join on |
28 will avoid join on EUser, so we will directly get local eids) |
51 EUser, so we will directly get local eids) |
29 |
|
30 |
52 |
31 :EUser X,L WHERE X in_group G, X login L, G name 'users': |
53 :EUser X,L WHERE X in_group G, X login L, G name 'users': |
32 1. fetch Any X,L WHERE X is EUser, X login L from both sources, store |
54 1. fetch Any X,L WHERE X is EUser, X login L from both sources, store |
33 concatenation of results into a temporary table |
55 concatenation of results into a temporary table |
34 2. return the result of Any X, L WHERE X is TMP, X login L, X in_group G, |
56 2. return the result of Any X, L WHERE X is TMP, X login L, X in_group G, |
35 G name 'users' from the system source |
57 G name 'users' from the system source |
36 |
58 |
37 |
59 |
38 :Any X WHERE X owned_by Y: |
60 :Any X WHERE X owned_by Y: |
39 * catch 1 |
61 * catch 1 |
40 1. fetch EUser X from both sources, store concatenation of results |
62 1. fetch EUser X from both sources, store concatenation of results into a |
41 into a temporary table |
63 temporary table |
42 2. return the result of Any X WHERE X owned_by Y, Y is TMP from |
64 2. return the result of Any X WHERE X owned_by Y, Y is TMP from the system |
43 the system source |
65 source |
44 |
|
45 * catch 2 |
66 * catch 2 |
46 1. return the result of Any X WHERE X owned_by Y |
67 1. return the result of Any X WHERE X owned_by Y from system source, that's |
47 from system source, that's enough (optimization of the sql querier |
68 enough (optimization of the sql querier will avoid join on EUser, so we |
48 will avoid join on EUser, so we will directly get local eids) |
69 will directly get local eids) |
49 |
70 |
50 |
71 |
51 :organization: Logilab |
72 :organization: Logilab |
52 :copyright: 2003-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
73 :copyright: 2003-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
53 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
74 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
129 if not rel.scope is var.scope: |
180 if not rel.scope is var.scope: |
130 var.stinfo['samescope'] = False |
181 var.stinfo['samescope'] = False |
131 return False |
182 return False |
132 var.stinfo['samescope'] = True |
183 var.stinfo['samescope'] = True |
133 return True |
184 return True |
134 |
185 |
135 def select_group_sort(select): # XXX something similar done in rql2sql |
186 ################################################################################ |
136 # add variables used in groups and sort terms to the selection |
|
137 # if necessary |
|
138 if select.groupby: |
|
139 for vref in select.groupby: |
|
140 if not vref in select.selection: |
|
141 select.append_selected(vref.copy(select)) |
|
142 for sortterm in select.orderby: |
|
143 for vref in sortterm.iget_nodes(VariableRef): |
|
144 if not vref in select.get_selected_variables(): |
|
145 # we can't directly insert sortterm.term because it references |
|
146 # a variable of the select before the copy. |
|
147 # XXX if constant term are used to define sort, their value |
|
148 # may necessite a decay |
|
149 select.append_selected(vref.copy(select)) |
|
150 if select.groupby and not vref in select.groupby: |
|
151 select.add_group_var(vref.copy(select)) |
|
152 |
|
153 # XXX move to rql |
|
154 def is_ancestor(n1, n2): |
|
155 p = n1.parent |
|
156 while p is not None: |
|
157 if p is n2: |
|
158 return True |
|
159 p = p.parent |
|
160 return False |
|
161 |
187 |
162 class PartPlanInformation(object): |
188 class PartPlanInformation(object): |
163 """regroups necessary information to execute some part of a "global" rql |
189 """regroups necessary information to execute some part of a "global" rql |
164 query ("global" means as received by the querier, which may result in |
190 query ("global" means as received by the querier, which may result in |
165 several internal queries, e.g. parts, due to security insertions) |
191 several internal queries, e.g. parts, due to security insertions). Actually |
166 |
192 a PPI is created for each subquery and for each query in a union. |
167 it exposes as well some methods helping in executing this part on a |
193 |
|
194 It exposes as well some methods helping in executing this part on a |
168 multi-sources repository, modifying its internal structure during the |
195 multi-sources repository, modifying its internal structure during the |
169 process |
196 process. |
170 |
197 |
171 :attr solutions: a list of mappings (varname -> vartype) |
198 :attr plan: |
172 :attr sourcesvars: |
199 the execution plan |
173 a dictionary telling for each source which variable/solution are |
200 :attr rqlst: |
174 supported, of the form {source : {varname: [solution index, ]}} |
201 the original rql syntax tree handled by this part |
|
202 |
|
203 :attr needsplit: |
|
204 bool telling if the query has to be split into multiple steps for |
|
205 execution or if it can be executed at once |
|
206 |
|
207 :attr temptable: |
|
208 a SQL temporary table name or None, if necessary to handle aggregate / |
|
209 sorting for this part of the query |
|
210 |
|
211 :attr finaltable: |
|
212 a SQL table name or None, if results for this part of the query should be |
|
213 written into a temporary table (usually shared by multiple PPI) |
|
214 |
|
215 :attr sourcesterms: |
|
216 a dictionary {source : {term: set([solution index, ])}} telling for each |
|
217 source which terms are supported for which solutions. A "term" may be |
|
218 either a rql Variable, Constant or Relation node. |
175 """ |
219 """ |
176 def __init__(self, plan, rqlst, rqlhelper=None): |
220 def __init__(self, plan, rqlst, rqlhelper=None): |
|
221 self.plan = plan |
|
222 self.rqlst = rqlst |
177 self.needsplit = False |
223 self.needsplit = False |
178 self.temptable = None |
224 self.temptable = None |
179 self.finaltable = None |
225 self.finaltable = None |
180 self.plan = plan |
226 self._schema = plan.schema |
181 self.rqlst = rqlst |
|
182 self._session = plan.session |
227 self._session = plan.session |
|
228 self._repo = self._session.repo |
183 self._solutions = rqlst.solutions |
229 self._solutions = rqlst.solutions |
184 self._solindices = range(len(self._solutions)) |
230 self._solindices = range(len(self._solutions)) |
185 # source : {var: [solution index, ]} |
231 # source : {term: [solution index, ]} |
186 self.sourcesvars = self._sourcesvars = {} |
232 self.sourcesterms = self._sourcesterms = {} |
187 # source : {relation: set(child variable and constant)} |
233 # source : {relation: set(child variable and constant)} |
188 self._crossrelations = {} |
234 self._crossrelations = {} |
189 # dictionnary of variables which are linked to each other using a non |
235 # dictionary of variables and constants which are linked to each other |
190 # final relation which is supported by multiple sources |
236 # using a non final relation supported by multiple sources (crossed or |
191 self._linkedvars = {} |
237 # not). |
192 self._crosslinkedvars = {} |
238 self._linkedterms = {} |
193 # processing |
239 # processing |
194 self._compute_sourcesvars() |
240 termssources = self._compute_sourcesterms() |
195 self._remove_invalid_sources() |
241 self._remove_invalid_sources(termssources) |
196 self._compute_needsplit() |
242 self._compute_needsplit() |
197 self.sourcesvars = {} |
243 # after initialisation, .sourcesterms contains the same thing as |
198 for k, v in self._sourcesvars.iteritems(): |
244 # ._sourcesterms though during plan construction, ._sourcesterms will |
199 self.sourcesvars[k] = {} |
245 # be modified while .sourcesterms will be kept unmodified |
|
246 self.sourcesterms = {} |
|
247 for k, v in self._sourcesterms.iteritems(): |
|
248 self.sourcesterms[k] = {} |
200 for k2, v2 in v.iteritems(): |
249 for k2, v2 in v.iteritems(): |
201 self.sourcesvars[k][k2] = v2.copy() |
250 self.sourcesterms[k][k2] = v2.copy() |
|
251 # cleanup linked var |
|
252 for var, linkedrelsinfo in self._linkedterms.iteritems(): |
|
253 self._linkedterms[var] = frozenset(x[0] for x in linkedrelsinfo) |
|
254 # map output of a step to input of a following step |
202 self._inputmaps = {} |
255 self._inputmaps = {} |
|
256 # record input map conflicts to resolve them on final step generation |
|
257 self._conflicts = [] |
203 if rqlhelper is not None: # else test |
258 if rqlhelper is not None: # else test |
204 self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional |
259 self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional |
|
260 if server.DEBUG: |
|
261 print 'sourcesterms:' |
|
262 for source, terms in self.sourcesterms.items(): |
|
263 print source, terms |
205 |
264 |
206 def copy_solutions(self, solindices): |
265 def copy_solutions(self, solindices): |
207 return [self._solutions[solidx].copy() for solidx in solindices] |
266 return [self._solutions[solidx].copy() for solidx in solindices] |
208 |
267 |
209 @property |
268 @property |
210 @cached |
269 @cached |
211 def part_sources(self): |
270 def part_sources(self): |
212 if self._sourcesvars: |
271 if self._sourcesterms: |
213 return tuple(sorted(self._sourcesvars)) |
272 return tuple(sorted(self._sourcesterms)) |
214 return (self._session.repo.system_source,) |
273 return (self._repo.system_source,) |
215 |
274 |
216 @property |
275 @property |
217 @cached |
276 @cached |
218 def _sys_source_set(self): |
277 def _sys_source_set(self): |
219 return frozenset((self._session.repo.system_source, solindex) |
278 return frozenset((self._repo.system_source, solindex) |
220 for solindex in self._solindices) |
279 for solindex in self._solindices) |
221 |
280 |
222 @cached |
281 @cached |
223 def _norel_support_set(self, relation): |
282 def _norel_support_set(self, relation): |
224 """return a set of (source, solindex) where source doesn't support the |
283 """return a set of (source, solindex) where source doesn't support the |
225 relation |
284 relation |
226 """ |
285 """ |
227 return frozenset((source, solidx) for source in self._session.repo.sources |
286 return frozenset((source, solidx) for source in self._repo.sources |
228 for solidx in self._solindices |
287 for solidx in self._solindices |
229 if not ((source.support_relation(relation.r_type) and |
288 if not ((source.support_relation(relation.r_type)) |
230 not self.crossed_relation(source, relation)) |
|
231 or relation.r_type in source.dont_cross_relations)) |
289 or relation.r_type in source.dont_cross_relations)) |
232 |
290 |
233 |
291 def _compute_sourcesterms(self): |
234 def _compute_sourcesvars(self): |
292 """compute for each term (variable, rewritten constant, relation) and |
235 """compute for each variable/solution in the rqlst which sources support |
293 for each solution in the rqlst which sources support them |
236 them |
|
237 """ |
294 """ |
238 repo = self._session.repo |
295 repo = self._repo |
239 eschema = repo.schema.eschema |
296 eschema = self._schema.eschema |
240 sourcesvars = self._sourcesvars |
297 sourcesterms = self._sourcesterms |
241 # find for each source which variable/solution are supported |
298 # find for each source which variable/solution are supported |
242 for varname, varobj in self.rqlst.defined_vars.items(): |
299 for varname, varobj in self.rqlst.defined_vars.items(): |
243 # if variable has an eid specified, we can get its source directly |
300 # if variable has an eid specified, we can get its source directly |
244 # NOTE: use uidrels and not constnode to deal with "X eid IN(1,2,3,4)" |
301 # NOTE: use uidrels and not constnode to deal with "X eid IN(1,2,3,4)" |
245 if varobj.stinfo['uidrels']: |
302 if varobj.stinfo['uidrels']: |
246 vrels = varobj.stinfo['relations'] - varobj.stinfo['uidrels'] |
303 vrels = varobj.stinfo['relations'] - varobj.stinfo['uidrels'] |
247 for rel in varobj.stinfo['uidrels']: |
304 for rel in varobj.stinfo['uidrels']: |
248 if rel.neged(strict=True) or rel.operator() != '=': |
305 if rel.neged(strict=True) or rel.operator() != '=': |
249 continue |
306 continue |
250 for const in rel.children[1].get_nodes(Constant): |
307 for const in rel.children[1].get_nodes(Constant): |
251 eid = const.eval(self.plan.args) |
308 eid = const.eval(self.plan.args) |
252 source = self._session.source_from_eid(eid) |
309 source = self._session.source_from_eid(eid) |
253 if vrels and not any(source.support_relation(r.r_type) |
310 if vrels and not any(source.support_relation(r.r_type) |
254 for r in vrels): |
311 for r in vrels): |
255 self._set_source_for_var(repo.system_source, varobj) |
312 self._set_source_for_term(repo.system_source, varobj) |
256 else: |
313 else: |
257 self._set_source_for_var(source, varobj) |
314 self._set_source_for_term(source, varobj) |
258 continue |
315 continue |
259 rels = varobj.stinfo['relations'] |
316 rels = varobj.stinfo['relations'] |
260 if not rels and not varobj.stinfo['typerels']: |
317 if not rels and not varobj.stinfo['typerels']: |
261 # (rare) case where the variable has no type specified nor |
318 # (rare) case where the variable has no type specified nor |
262 # relation accessed ex. "Any MAX(X)" |
319 # relation accessed ex. "Any MAX(X)" |
263 self._set_source_for_var(repo.system_source, varobj) |
320 self._set_source_for_term(repo.system_source, varobj) |
264 continue |
321 continue |
265 for i, sol in enumerate(self._solutions): |
322 for i, sol in enumerate(self._solutions): |
266 vartype = sol[varname] |
323 vartype = sol[varname] |
267 # skip final variable |
324 # skip final variable |
268 if eschema(vartype).is_final(): |
325 if eschema(vartype).is_final(): |
274 # * the variable isn't invariant |
331 # * the variable isn't invariant |
275 # * at least one supported relation specified |
332 # * at least one supported relation specified |
276 if not varobj._q_invariant or \ |
333 if not varobj._q_invariant or \ |
277 any(imap(source.support_relation, |
334 any(imap(source.support_relation, |
278 (r.r_type for r in rels if r.r_type != 'eid'))): |
335 (r.r_type for r in rels if r.r_type != 'eid'))): |
279 sourcesvars.setdefault(source, {}).setdefault(varobj, set()).add(i) |
336 sourcesterms.setdefault(source, {}).setdefault(varobj, set()).add(i) |
280 # if variable is not invariant and is used by a relation |
337 # if variable is not invariant and is used by a relation |
281 # not supported by this source, we'll have to split the |
338 # not supported by this source, we'll have to split the |
282 # query |
339 # query |
283 if not varobj._q_invariant and any(ifilterfalse( |
340 if not varobj._q_invariant and any(ifilterfalse( |
284 source.support_relation, (r.r_type for r in rels))): |
341 source.support_relation, (r.r_type for r in rels))): |
285 self.needsplit = True |
342 self.needsplit = True |
286 |
343 # add source for rewritten constants to sourcesterms |
287 def _handle_cross_relation(self, rel, relsources, vsources): |
344 for vconsts in self.rqlst.stinfo['rewritten'].itervalues(): |
288 crossvars = None |
345 const = vconsts[0] |
289 for source in relsources: |
346 source = self._session.source_from_eid(const.eval(self.plan.args)) |
290 if rel.r_type in source.cross_relations: |
347 if source is self._repo.system_source: |
291 crossvars = set(x.variable for x in rel.get_nodes(VariableRef)) |
348 for const in vconsts: |
292 crossvars.update(frozenset(x for x in rel.get_nodes(Constant))) |
349 self._set_source_for_term(source, const) |
293 assert len(crossvars) == 2 |
350 elif source in self._sourcesterms: |
294 ssource = self._session.repo.system_source |
351 source_scopes = frozenset(t.scope for t in self._sourcesterms[source]) |
295 needsplit = True |
352 for const in vconsts: |
296 flag = 0 |
353 if const.scope in source_scopes: |
297 for v in crossvars: |
354 self._set_source_for_term(source, const) |
298 if isinstance(v, Constant): |
355 # add source for relations |
299 allsols = set(self._solindices) |
356 rschema = self._schema.rschema |
300 try: |
357 termssources = {} |
301 self._sourcesvars[ssource][v] = allsols |
|
302 except KeyError: |
|
303 self._sourcesvars[ssource] = {v: allsols} |
|
304 if len(vsources[v]) == 1: |
|
305 if iter(vsources[v]).next()[0].uri == 'system': |
|
306 flag = 1 |
|
307 for ov in crossvars: |
|
308 if ov is not v and (isinstance(ov, Constant) or ov._q_invariant): |
|
309 ssset = frozenset((ssource,)) |
|
310 self._remove_sources(ov, vsources[ov] - ssset) |
|
311 else: |
|
312 for ov in crossvars: |
|
313 if ov is not v and (isinstance(ov, Constant) or ov._q_invariant): |
|
314 needsplit = False |
|
315 break |
|
316 else: |
|
317 continue |
|
318 if not rel.neged(strict=True): |
|
319 break |
|
320 else: |
|
321 self._crossrelations.setdefault(source, {})[rel] = crossvars |
|
322 if not flag: |
|
323 self._sourcesvars.setdefault(source, {})[rel] = set(self._solindices) |
|
324 self._sourcesvars.setdefault(ssource, {})[rel] = set(self._solindices) |
|
325 if needsplit: |
|
326 self.needsplit = True |
|
327 return crossvars is None |
|
328 |
|
329 def _remove_invalid_sources(self): |
|
330 """removes invalid sources from `sourcesvars` member according to |
|
331 traversed relations and their properties (which sources support them, |
|
332 can they cross sources, etc...) |
|
333 """ |
|
334 repo = self._session.repo |
|
335 rschema = repo.schema.rschema |
|
336 vsources = {} |
|
337 for rel in self.rqlst.iget_nodes(Relation): |
358 for rel in self.rqlst.iget_nodes(Relation): |
338 # process non final relations only |
359 # process non final relations only |
339 # note: don't try to get schema for 'is' relation (not available |
360 # note: don't try to get schema for 'is' relation (not available |
340 # during bootstrap) |
361 # during bootstrap) |
341 if not rel.is_types_restriction() and not rschema(rel.r_type).is_final(): |
362 if not rel.is_types_restriction() and not rschema(rel.r_type).is_final(): |
344 # attribute |
365 # attribute |
345 # |
366 # |
346 # XXX code below don't deal if some source allow relation |
367 # XXX code below don't deal if some source allow relation |
347 # crossing but not another one |
368 # crossing but not another one |
348 relsources = repo.rel_type_sources(rel.r_type) |
369 relsources = repo.rel_type_sources(rel.r_type) |
349 crossvars = None |
|
350 if len(relsources) < 2: |
370 if len(relsources) < 2: |
351 # filter out sources being there because they have this |
371 # filter out sources being there because they have this |
352 # relation in their dont_cross_relations attribute |
372 # relation in their dont_cross_relations attribute |
353 relsources = [source for source in relsources |
373 relsources = [source for source in relsources |
354 if source.support_relation(rel.r_type)] |
374 if source.support_relation(rel.r_type)] |
355 if relsources: |
375 if relsources: |
356 # this means the relation is using a variable inlined as |
376 # this means the relation is using a variable inlined as |
357 # a constant and another unsupported variable, in which |
377 # a constant and another unsupported variable, in which |
358 # case we put the relation in sourcesvars |
378 # case we put the relation in sourcesterms |
359 self._sourcesvars.setdefault(relsources[0], {})[rel] = set(self._solindices) |
379 self._sourcesterms.setdefault(relsources[0], {})[rel] = set(self._solindices) |
360 continue |
380 continue |
361 lhs, rhs = rel.get_variable_parts() |
381 lhs, rhs = rel.get_variable_parts() |
362 lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs) |
382 lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs) |
363 # update dictionnary of sources supporting lhs and rhs vars |
383 # update dictionary of sources supporting lhs and rhs vars |
364 if not lhsv in vsources: |
384 if not lhsv in termssources: |
365 vsources[lhsv] = self._term_sources(lhs) |
385 termssources[lhsv] = self._term_sources(lhs) |
366 if not rhsv in vsources: |
386 if not rhsv in termssources: |
367 vsources[rhsv] = self._term_sources(rhs) |
387 termssources[rhsv] = self._term_sources(rhs) |
368 if self._handle_cross_relation(rel, relsources, vsources): |
388 self._handle_cross_relation(rel, relsources, termssources) |
369 self._linkedvars.setdefault(lhsv, set()).add((rhsv, rel)) |
389 self._linkedterms.setdefault(lhsv, set()).add((rhsv, rel)) |
370 self._linkedvars.setdefault(rhsv, set()).add((lhsv, rel)) |
390 self._linkedterms.setdefault(rhsv, set()).add((lhsv, rel)) |
371 else: |
391 return termssources |
372 self._crosslinkedvars.setdefault(lhsv, set()).add((rhsv, rel)) |
392 |
373 self._crosslinkedvars.setdefault(rhsv, set()).add((lhsv, rel)) |
393 def _handle_cross_relation(self, rel, relsources, termssources): |
374 for term in self._linkedvars: |
394 cross_rel = False |
375 self._remove_sources_until_stable(term, vsources) |
395 for source in relsources: |
376 if len(self._sourcesvars) > 1 and hasattr(self.plan.rqlst, 'main_relations'): |
396 if rel.r_type in source.cross_relations: |
|
397 ssource = self._repo.system_source |
|
398 crossvars = set(x.variable for x in rel.get_nodes(VariableRef)) |
|
399 for const in rel.get_nodes(Constant): |
|
400 if source.uri != 'system' and not const in self._sourcesterms.get(source, ()): |
|
401 continue |
|
402 crossvars.add(const) |
|
403 # XXX this is counter intuitive, though this is currently a |
|
404 # trick to add const to system source terms so we get a |
|
405 # chance that solutions will compare to equals when |
|
406 # computing need split |
|
407 allsols = set(self._solindices) |
|
408 try: |
|
409 self._sourcesterms[ssource][const] = allsols |
|
410 except KeyError: |
|
411 self._sourcesterms[ssource] = {const: allsols} |
|
412 self._crossrelations.setdefault(source, {})[rel] = crossvars |
|
413 if len(crossvars) < 2: |
|
414 # this means there is a constant in the relation which is |
|
415 # not supported by the source, so we can stop here |
|
416 continue |
|
417 self._sourcesterms.setdefault(ssource, {})[rel] = set(self._solindices) |
|
418 cross_rel = True |
|
419 needsplit = True |
|
420 flag = False |
|
421 for term in crossvars: |
|
422 if len(termssources[term]) == 1: |
|
423 if iter(termssources[term]).next()[0].uri == 'system': |
|
424 flag = True |
|
425 for ov in crossvars: |
|
426 if ov is not term and (isinstance(ov, Constant) or ov._q_invariant): |
|
427 ssset = frozenset((ssource,)) |
|
428 self._remove_sources(ov, termssources[ov] - ssset) |
|
429 else: |
|
430 for ov in crossvars: |
|
431 if ov is not term and (isinstance(ov, Constant) or ov._q_invariant): |
|
432 needsplit = False |
|
433 break |
|
434 else: |
|
435 continue |
|
436 if not flag: |
|
437 self._sourcesterms.setdefault(source, {})[rel] = set(self._solindices) |
|
438 if needsplit: |
|
439 self.needsplit = True |
|
440 return cross_rel |
|
441 |
|
442 def _remove_invalid_sources(self, termssources): |
|
443 """removes invalid sources from `sourcesterms` member according to |
|
444 traversed relations and their properties (which sources support them, |
|
445 can they cross sources, etc...) |
|
446 """ |
|
447 for term in self._linkedterms: |
|
448 self._remove_sources_until_stable(term, termssources) |
|
449 if len(self._sourcesterms) > 1 and hasattr(self.plan.rqlst, 'main_relations'): |
377 # the querier doesn't annotate write queries, need to do it here |
450 # the querier doesn't annotate write queries, need to do it here |
378 self.plan.annotate_rqlst() |
451 self.plan.annotate_rqlst() |
379 # insert/update/delete queries, we may get extra information from |
452 # insert/update/delete queries, we may get extra information from |
380 # the main relation (eg relations to the left of the WHERE |
453 # the main relation (eg relations to the left of the WHERE |
381 if self.plan.rqlst.TYPE == 'insert': |
454 if self.plan.rqlst.TYPE == 'insert': |
382 inserted = dict((vref.variable, etype) |
455 inserted = dict((vref.variable, etype) |
383 for etype, vref in self.plan.rqlst.main_variables) |
456 for etype, vref in self.plan.rqlst.main_variables) |
384 else: |
457 else: |
385 inserted = {} |
458 inserted = {} |
|
459 repo = self._repo |
|
460 rschema = self._schema.rschema |
386 for rel in self.plan.rqlst.main_relations: |
461 for rel in self.plan.rqlst.main_relations: |
387 if not rschema(rel.r_type).is_final(): |
462 if not rschema(rel.r_type).is_final(): |
388 # nothing to do if relation is not supported by multiple sources |
463 # nothing to do if relation is not supported by multiple sources |
389 if len(repo.rel_type_sources(rel.r_type)) < 2: |
464 if len(repo.rel_type_sources(rel.r_type)) < 2: |
390 continue |
465 continue |
391 lhs, rhs = rel.get_variable_parts() |
466 lhs, rhs = rel.get_variable_parts() |
392 try: |
467 try: |
393 lhsv = self._extern_term(lhs, vsources, inserted) |
468 lhsv = self._extern_term(lhs, termssources, inserted) |
394 rhsv = self._extern_term(rhs, vsources, inserted) |
469 rhsv = self._extern_term(rhs, termssources, inserted) |
395 except KeyError, ex: |
470 except KeyError, ex: |
396 continue |
471 continue |
397 norelsup = self._norel_support_set(rel) |
472 self._remove_term_sources(lhsv, rel, rhsv, termssources) |
398 self._remove_var_sources(lhsv, norelsup, rhsv, vsources) |
473 self._remove_term_sources(rhsv, rel, lhsv, termssources) |
399 self._remove_var_sources(rhsv, norelsup, lhsv, vsources) |
474 |
400 # cleanup linked var |
475 def _extern_term(self, term, termssources, inserted): |
401 for var, linkedrelsinfo in self._linkedvars.iteritems(): |
|
402 self._linkedvars[var] = frozenset(x[0] for x in linkedrelsinfo) |
|
403 # if there are other sources than the system source, consider simplified |
|
404 # variables'source |
|
405 if self._sourcesvars and self._sourcesvars.keys() != [self._session.repo.system_source]: |
|
406 # add source for rewritten constants to sourcesvars |
|
407 for vconsts in self.rqlst.stinfo['rewritten'].itervalues(): |
|
408 const = vconsts[0] |
|
409 eid = const.eval(self.plan.args) |
|
410 source = self._session.source_from_eid(eid) |
|
411 if source is self._session.repo.system_source: |
|
412 for const in vconsts: |
|
413 self._set_source_for_var(source, const) |
|
414 elif source in self._sourcesvars: |
|
415 source_scopes = frozenset(v.scope for v in self._sourcesvars[source]) |
|
416 for const in vconsts: |
|
417 if const.scope in source_scopes: |
|
418 self._set_source_for_var(source, const) |
|
419 |
|
420 def _extern_term(self, term, vsources, inserted): |
|
421 var = term.variable |
476 var = term.variable |
422 if var.stinfo['constnode']: |
477 if var.stinfo['constnode']: |
423 termv = var.stinfo['constnode'] |
478 termv = var.stinfo['constnode'] |
424 vsources[termv] = self._term_sources(termv) |
479 termssources[termv] = self._term_sources(termv) |
425 elif var in inserted: |
480 elif var in inserted: |
426 termv = var |
481 termv = var |
427 source = self._session.repo.locate_etype_source(inserted[var]) |
482 source = self._repo.locate_etype_source(inserted[var]) |
428 vsources[termv] = set((source, solindex) for solindex in self._solindices) |
483 termssources[termv] = set((source, solindex) |
|
484 for solindex in self._solindices) |
429 else: |
485 else: |
430 termv = self.rqlst.defined_vars[var.name] |
486 termv = self.rqlst.defined_vars[var.name] |
431 if not termv in vsources: |
487 if not termv in termssources: |
432 vsources[termv] = self._term_sources(termv) |
488 termssources[termv] = self._term_sources(termv) |
433 return termv |
489 return termv |
434 |
490 |
435 def _remove_sources_until_stable(self, var, vsources): |
491 def _remove_sources_until_stable(self, term, termssources): |
436 sourcesvars = self._sourcesvars |
492 sourcesterms = self._sourcesterms |
437 for ovar, rel in self._linkedvars.get(var, ()): |
493 for oterm, rel in self._linkedterms.get(term, ()): |
438 if not var.scope is ovar.scope and rel.scope.neged(strict=True): |
494 if not term.scope is oterm.scope and rel.scope.neged(strict=True): |
439 # can't get information from relation inside a NOT exists |
495 # can't get information from relation inside a NOT exists |
440 # where variables don't belong to the same scope |
496 # where terms don't belong to the same scope |
441 continue |
497 continue |
442 need_ancestor_scope = False |
498 need_ancestor_scope = False |
443 if not (var.scope is rel.scope and ovar.scope is rel.scope): |
499 if not (term.scope is rel.scope and oterm.scope is rel.scope): |
444 if rel.ored(): |
500 if rel.ored(): |
445 continue |
501 continue |
446 if rel.ored(traverse_scope=True): |
502 if rel.ored(traverse_scope=True): |
447 # if relation has some OR as parent, constraints should only |
503 # if relation has some OR as parent, constraints should only |
448 # propagate from parent scope to child scope, nothing else |
504 # propagate from parent scope to child scope, nothing else |
449 need_ancestor_scope = True |
505 need_ancestor_scope = True |
450 relsources = self._session.repo.rel_type_sources(rel.r_type) |
506 relsources = self._repo.rel_type_sources(rel.r_type) |
451 if rel.neged(strict=True) and ( |
507 if rel.neged(strict=True) and ( |
452 len(relsources) < 2 |
508 len(relsources) < 2 |
453 or not isinstance(ovar, Variable) |
509 or not isinstance(oterm, Variable) |
454 or ovar.valuable_references() != 1 |
510 or oterm.valuable_references() != 1 |
455 or any(sourcesvars[source][var] != sourcesvars[source][ovar] |
511 or any(sourcesterms[source][term] != sourcesterms[source][oterm] |
456 for source in relsources |
512 for source in relsources |
457 if var in sourcesvars.get(source, ()) |
513 if term in sourcesterms.get(source, ()) |
458 and ovar in sourcesvars.get(source, ()))): |
514 and oterm in sourcesterms.get(source, ()))): |
459 # neged relation doesn't allow to infer variable sources unless we're |
515 # neged relation doesn't allow to infer term sources unless |
460 # on a multisource relation for a variable only used by this relation |
516 # we're on a multisource relation for a term only used by this |
461 # (eg "Any X WHERE NOT X multisource_rel Y" and over is Y), iif |
517 # relation (eg "Any X WHERE NOT X multisource_rel Y" and over is |
|
518 # Y) |
462 continue |
519 continue |
463 norelsup = self._norel_support_set(rel) |
520 # compute invalid sources for terms and remove them |
464 # compute invalid sources for variables and remove them |
521 if not need_ancestor_scope or is_ancestor(term.scope, oterm.scope): |
465 if not need_ancestor_scope or is_ancestor(var.scope, ovar.scope): |
522 self._remove_term_sources(term, rel, oterm, termssources) |
466 self._remove_var_sources(var, norelsup, ovar, vsources) |
523 if not need_ancestor_scope or is_ancestor(oterm.scope, term.scope): |
467 if not need_ancestor_scope or is_ancestor(ovar.scope, var.scope): |
524 self._remove_term_sources(oterm, rel, term, termssources) |
468 self._remove_var_sources(ovar, norelsup, var, vsources) |
525 |
469 |
526 def _remove_term_sources(self, term, rel, oterm, termssources): |
470 def _remove_var_sources(self, var, norelsup, ovar, vsources): |
527 """remove invalid sources for term according to oterm's sources and the |
471 """remove invalid sources for var according to ovar's sources and the |
528 relation between those two terms. |
472 relation between those two variables. |
|
473 """ |
529 """ |
474 varsources = vsources[var] |
530 norelsup = self._norel_support_set(rel) |
475 invalid_sources = varsources - (vsources[ovar] | norelsup) |
531 termsources = termssources[term] |
|
532 invalid_sources = termsources - (termssources[oterm] | norelsup) |
|
533 if invalid_sources and self._repo.can_cross_relation(rel.r_type): |
|
534 invalid_sources -= self._sys_source_set |
|
535 if invalid_sources and isinstance(term, Variable) and self._need_ext_source_access(term, rel): |
|
536 # if the term is a not invariant variable, we should filter out |
|
537 # source where the relation is a cross relation from invalid |
|
538 # sources |
|
539 invalid_sources = frozenset([(s, solidx) for s, solidx in invalid_sources |
|
540 if not (s in self._crossrelations and |
|
541 rel in self._crossrelations[s])]) |
476 if invalid_sources: |
542 if invalid_sources: |
477 self._remove_sources(var, invalid_sources) |
543 self._remove_sources(term, invalid_sources) |
478 varsources -= invalid_sources |
544 termsources -= invalid_sources |
479 self._remove_sources_until_stable(var, vsources) |
545 self._remove_sources_until_stable(term, termssources) |
480 |
546 |
481 def _compute_needsplit(self): |
547 def _compute_needsplit(self): |
482 """tell according to sourcesvars if the rqlst has to be splitted for |
548 """tell according to sourcesterms if the rqlst has to be splitted for |
483 execution among multiple sources |
549 execution among multiple sources |
484 |
550 |
485 the execution has to be split if |
551 the execution has to be split if |
486 * a source support an entity (non invariant) but doesn't support a |
552 * a source support an entity (non invariant) but doesn't support a |
487 relation on it |
553 relation on it |
489 * there is more than one source and either all sources'supported |
555 * there is more than one source and either all sources'supported |
490 variable/solutions are not equivalent or multiple variables have to |
556 variable/solutions are not equivalent or multiple variables have to |
491 be fetched from some source |
557 be fetched from some source |
492 """ |
558 """ |
493 # NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2 |
559 # NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2 |
494 if len(self._sourcesvars) < 2: |
560 if len(self._sourcesterms) < 2: |
495 self.needsplit = False |
561 self.needsplit = False |
496 elif not self.needsplit: |
562 elif not self.needsplit: |
497 if not allequals(self._sourcesvars.itervalues()): |
563 if not allequals(self._sourcesterms.itervalues()): |
498 self.needsplit = True |
564 self.needsplit = True |
499 else: |
565 else: |
500 sample = self._sourcesvars.itervalues().next() |
566 sample = self._sourcesterms.itervalues().next() |
501 if len(sample) > 1 and any(v for v in sample |
567 if len(sample) > 1: |
502 if not v in self._linkedvars |
568 for term in sample: |
503 and not v in self._crosslinkedvars): |
569 # need split if unlinked variable |
504 self.needsplit = True |
570 if isinstance(term, Variable) and not term in self._linkedterms: |
505 |
571 self.needsplit = True |
506 def _set_source_for_var(self, source, var): |
572 break |
507 self._sourcesvars.setdefault(source, {})[var] = set(self._solindices) |
573 else: |
|
574 # need split if there are some cross relation on non |
|
575 # invariant variable or if the variable is used in |
|
576 # multi-sources relation |
|
577 if self._crossrelations: |
|
578 for reldict in self._crossrelations.itervalues(): |
|
579 for rel, terms in reldict.iteritems(): |
|
580 for term in terms: |
|
581 if isinstance(term, Variable) and self._need_ext_source_access(term, rel): |
|
582 self.needsplit = True |
|
583 return |
|
584 |
|
585 @cached |
|
586 def _need_ext_source_access(self, var, rel): |
|
587 if not var._q_invariant: |
|
588 return True |
|
589 if any(r for x, r in self._linkedterms[var] |
|
590 if not r is rel and self._repo.is_multi_sources_relation(r.r_type)): |
|
591 return True |
|
592 return False |
|
593 |
|
594 def _set_source_for_term(self, source, term): |
|
595 self._sourcesterms.setdefault(source, {})[term] = set(self._solindices) |
508 |
596 |
509 def _term_sources(self, term): |
597 def _term_sources(self, term): |
510 """returns possible sources for terms `term`""" |
598 """returns possible sources for terms `term`""" |
511 if isinstance(term, Constant): |
599 if isinstance(term, Constant): |
512 source = self._session.source_from_eid(term.eval(self.plan.args)) |
600 source = self._session.source_from_eid(term.eval(self.plan.args)) |
513 return set((source, solindex) for solindex in self._solindices) |
601 return set((source, solindex) for solindex in self._solindices) |
514 else: |
602 else: |
515 var = getattr(term, 'variable', term) |
603 var = getattr(term, 'variable', term) |
516 sources = [source for source, varobjs in self.sourcesvars.iteritems() |
604 sources = [source for source, varobjs in self.sourcesterms.iteritems() |
517 if var in varobjs] |
605 if var in varobjs] |
518 return set((source, solindex) for source in sources |
606 return set((source, solindex) for source in sources |
519 for solindex in self.sourcesvars[source][var]) |
607 for solindex in self.sourcesterms[source][var]) |
520 |
608 |
521 def _remove_sources(self, var, sources): |
609 def _remove_sources(self, term, sources): |
522 """removes invalid sources (`sources`) from `sourcesvars` |
610 """removes invalid sources (`sources`) from `sourcesterms` |
523 |
611 |
524 :param sources: the list of sources to remove |
612 :param sources: the list of sources to remove |
525 :param var: the analyzed variable |
613 :param term: the analyzed term |
526 """ |
614 """ |
527 sourcesvars = self._sourcesvars |
615 sourcesterms = self._sourcesterms |
528 for source, solindex in sources: |
616 for source, solindex in sources: |
529 try: |
617 try: |
530 sourcesvars[source][var].remove(solindex) |
618 sourcesterms[source][term].remove(solindex) |
531 except KeyError: |
619 except KeyError: |
532 return # may occur with subquery column alias |
620 return # may occur with subquery column alias |
533 if not sourcesvars[source][var]: |
621 if not sourcesterms[source][term]: |
534 del sourcesvars[source][var] |
622 del sourcesterms[source][term] |
535 if not sourcesvars[source]: |
623 if not sourcesterms[source]: |
536 del sourcesvars[source] |
624 del sourcesterms[source] |
537 |
625 |
538 def crossed_relation(self, source, relation): |
626 def crossed_relation(self, source, relation): |
539 return relation in self._crossrelations.get(source, ()) |
627 return relation in self._crossrelations.get(source, ()) |
540 |
628 |
541 def part_steps(self): |
629 def part_steps(self): |
543 each step. This is necessary to know if an aggregate step will be |
631 each step. This is necessary to know if an aggregate step will be |
544 necessary or not. |
632 necessary or not. |
545 """ |
633 """ |
546 steps = [] |
634 steps = [] |
547 select = self.rqlst |
635 select = self.rqlst |
548 rschema = self.plan.schema.rschema |
636 rschema = self._schema.rschema |
549 for source in self.part_sources: |
637 for source in self.part_sources: |
550 sourcevars = self._sourcesvars[source] |
638 sourceterms = self._sourcesterms[source] |
551 while sourcevars: |
639 while sourceterms: |
552 # take a variable randomly, and all variables supporting the |
640 # take a term randomly, and all terms supporting the |
553 # same solutions |
641 # same solutions |
554 var, solindices = self._choose_var(sourcevars) |
642 term, solindices = self._choose_term(sourceterms) |
555 if source.uri == 'system': |
643 if source.uri == 'system': |
556 # ensure all variables are available for the latest step |
644 # ensure all variables are available for the latest step |
557 # (missing one will be available from temporary tables |
645 # (missing one will be available from temporary tables |
558 # of previous steps) |
646 # of previous steps) |
559 scope = select |
647 scope = select |
560 variables = scope.defined_vars.values() + scope.aliases.values() |
648 terms = scope.defined_vars.values() + scope.aliases.values() |
561 sourcevars.clear() |
649 sourceterms.clear() |
|
650 sources = [source] |
562 else: |
651 else: |
563 scope = var.scope |
652 scope = term.scope |
564 variables = self._expand_vars(var, source, sourcevars, scope, solindices) |
653 # find which sources support the same term and solutions |
565 if not sourcevars: |
654 sources = self._expand_sources(source, term, solindices) |
566 del self._sourcesvars[source] |
655 # no try to get as much terms as possible |
567 # find which sources support the same variables/solutions |
656 terms = self._expand_terms(term, sources, sourceterms, |
568 sources = self._expand_sources(source, variables, solindices) |
657 scope, solindices) |
|
658 if len(terms) == 1 and isinstance(terms[0], Constant): |
|
659 # we can't generate anything interesting with a single |
|
660 # constant term (will generate an empty "Any" query), |
|
661 # go to the next iteration directly! |
|
662 continue |
|
663 if not sourceterms: |
|
664 del self._sourcesterms[source] |
569 # suppose this is a final step until the contrary is proven |
665 # suppose this is a final step until the contrary is proven |
570 final = scope is select |
666 final = scope is select |
571 # set of variables which should be additionaly selected when |
667 # set of terms which should be additionaly selected when |
572 # possible |
668 # possible |
573 needsel = set() |
669 needsel = set() |
574 # add attribute variables and mark variables which should be |
670 # add attribute variables and mark variables which should be |
575 # additionaly selected when possible |
671 # additionaly selected when possible |
576 for var in select.defined_vars.itervalues(): |
672 for var in select.defined_vars.itervalues(): |
577 if not var in variables: |
673 if not var in terms: |
578 stinfo = var.stinfo |
674 stinfo = var.stinfo |
579 for ovar, rtype in stinfo['attrvars']: |
675 for ovar, rtype in stinfo['attrvars']: |
580 if ovar in variables: |
676 if ovar in terms: |
581 needsel.add(var.name) |
677 needsel.add(var.name) |
582 variables.append(var) |
678 terms.append(var) |
583 break |
679 break |
584 else: |
680 else: |
585 needsel.add(var.name) |
681 needsel.add(var.name) |
586 final = False |
682 final = False |
587 if final and source.uri != 'system': |
683 if final and source.uri != 'system': |
656 # supported relation with at least one end supported, check the |
750 # supported relation with at least one end supported, check the |
657 # other end is in as well. If not this usually means the |
751 # other end is in as well. If not this usually means the |
658 # variable is refed by an outer scope and should be substituted |
752 # variable is refed by an outer scope and should be substituted |
659 # using an 'identity' relation (else we'll get a conflict of |
753 # using an 'identity' relation (else we'll get a conflict of |
660 # temporary tables) |
754 # temporary tables) |
661 if rhsvar in variables and not lhsvar in variables: |
755 if rhsvar in terms and not lhsvar in terms: |
662 self._identity_substitute(rel, lhsvar, variables, needsel) |
756 self._identity_substitute(rel, lhsvar, terms, needsel) |
663 elif lhsvar in variables and not rhsvar in variables: |
757 elif lhsvar in terms and not rhsvar in terms: |
664 self._identity_substitute(rel, rhsvar, variables, needsel) |
758 self._identity_substitute(rel, rhsvar, terms, needsel) |
665 |
759 |
666 def _identity_substitute(self, relation, var, variables, needsel): |
760 def _identity_substitute(self, relation, var, terms, needsel): |
667 newvar = self._insert_identity_variable(relation.scope, var) |
761 newvar = self._insert_identity_variable(relation.scope, var) |
668 if newvar is not None: |
762 if newvar is not None: |
669 # ensure relation is using '=' operator, else we rely on a |
763 # ensure relation is using '=' operator, else we rely on a |
670 # sqlgenerator side effect (it won't insert an inequality operator |
764 # sqlgenerator side effect (it won't insert an inequality operator |
671 # in this case) |
765 # in this case) |
672 relation.children[1].operator = '=' |
766 relation.children[1].operator = '=' |
673 variables.append(newvar) |
767 terms.append(newvar) |
674 needsel.add(newvar.name) |
768 needsel.add(newvar.name) |
675 |
769 |
676 def _choose_var(self, sourcevars): |
770 def _choose_term(self, sourceterms): |
|
771 """pick one term among terms supported by a source, which will be used |
|
772 as a base to generate an execution step |
|
773 """ |
677 secondchoice = None |
774 secondchoice = None |
678 if len(self._sourcesvars) > 1: |
775 if len(self._sourcesterms) > 1: |
679 # priority to variable from subscopes |
776 # priority to variable from subscopes |
680 for var in sourcevars: |
777 for var in sourceterms: |
681 if not var.scope is self.rqlst: |
778 if not var.scope is self.rqlst: |
682 if isinstance(var, Variable): |
779 if isinstance(var, Variable): |
683 return var, sourcevars.pop(var) |
780 return var, sourceterms.pop(var) |
684 secondchoice = var |
781 secondchoice = var |
685 else: |
782 else: |
686 # priority to variable outer scope |
783 # priority to variable outer scope |
687 for var in sourcevars: |
784 for var in sourceterms: |
688 if var.scope is self.rqlst: |
785 if var.scope is self.rqlst: |
689 if isinstance(var, Variable): |
786 if isinstance(var, Variable): |
690 return var, sourcevars.pop(var) |
787 return var, sourceterms.pop(var) |
691 secondchoice = var |
788 secondchoice = var |
692 if secondchoice is not None: |
789 if secondchoice is not None: |
693 return secondchoice, sourcevars.pop(secondchoice) |
790 return secondchoice, sourceterms.pop(secondchoice) |
694 # priority to variable |
791 # priority to variable |
695 for var in sourcevars: |
792 for var in sourceterms: |
696 if isinstance(var, Variable): |
793 if isinstance(var, Variable): |
697 return var, sourcevars.pop(var) |
794 return var, sourceterms.pop(var) |
698 # whatever |
795 # whatever |
699 var = iter(sourcevars).next() |
796 var = iter(sourceterms).next() |
700 return var, sourcevars.pop(var) |
797 return var, sourceterms.pop(var) |
701 |
798 |
|
799 def _expand_sources(self, selected_source, term, solindices): |
|
800 """return all sources supporting given term / solindices""" |
|
801 sources = [selected_source] |
|
802 sourcesterms = self._sourcesterms |
|
803 for source in sourcesterms: |
|
804 if source is selected_source: |
|
805 continue |
|
806 if not (term in sourcesterms[source] and |
|
807 solindices.issubset(sourcesterms[source][term])): |
|
808 continue |
|
809 sources.append(source) |
|
810 if source.uri != 'system': |
|
811 termsolindices = sourcesterms[source][term] |
|
812 termsolindices -= solindices |
|
813 if not termsolindices: |
|
814 del sourcesterms[source][term] |
|
815 return sources |
702 |
816 |
703 def _expand_vars(self, var, source, sourcevars, scope, solindices): |
817 def _expand_terms(self, term, sources, sourceterms, scope, solindices): |
704 variables = [var] |
818 terms = [term] |
|
819 sources = sorted(sources) |
705 nbunlinked = 1 |
820 nbunlinked = 1 |
706 linkedvars = self._linkedvars |
821 linkedterms = self._linkedterms |
707 # variable has to belong to the same scope if there is more |
822 # term has to belong to the same scope if there is more |
708 # than the system source remaining |
823 # than the system source remaining |
709 if len(self._sourcesvars) > 1 and not scope is self.rqlst: |
824 if len(self._sourcesterms) > 1 and not scope is self.rqlst: |
710 candidates = (v for v in sourcevars.keys() if scope is v.scope) |
825 candidates = (t for t in sourceterms.keys() if scope is t.scope) |
711 else: |
826 else: |
712 candidates = sourcevars #.iterkeys() |
827 candidates = sourceterms #.iterkeys() |
713 # we only want one unlinked variable in each generated query |
828 # we only want one unlinked term in each generated query |
714 candidates = [v for v in candidates |
829 candidates = [t for t in candidates |
715 if isinstance(v, Constant) or |
830 if isinstance(t, Constant) or |
716 (solindices.issubset(sourcevars[v]) and v in linkedvars)] |
831 (solindices.issubset(sourceterms[t]) and t in linkedterms)] |
717 accept_var = lambda x: (isinstance(x, Constant) or any(v for v in variables if v in linkedvars.get(x, ()))) |
832 accept_term = lambda x: (not any(s for s in sources if not x in self._sourcesterms[s]) |
718 source_cross_rels = self._crossrelations.get(source, ()) |
833 and any(t for t in terms if t in linkedterms.get(x, ()))) |
719 if isinstance(var, Relation) and var in source_cross_rels: |
834 source_cross_rels = {} |
720 cross_vars = source_cross_rels.pop(var) |
835 for source in sources: |
721 base_accept_var = accept_var |
836 source_cross_rels.update(self._crossrelations.get(source, {})) |
722 accept_var = lambda x: (base_accept_var(x) or x in cross_vars) |
837 if isinstance(term, Relation) and term in source_cross_rels: |
723 for refed in cross_vars: |
838 cross_terms = source_cross_rels.pop(term) |
|
839 base_accept_term = accept_term |
|
840 accept_term = lambda x: (base_accept_term(x) or x in cross_terms) |
|
841 for refed in cross_terms: |
724 if not refed in candidates: |
842 if not refed in candidates: |
725 candidates.append(refed) |
843 candidates.append(refed) |
726 else: |
844 else: |
727 cross_vars = () |
845 cross_terms = () |
728 # repeat until no variable can't be added, since addition of a new |
846 # repeat until no term can't be added, since addition of a new |
729 # variable may permit to another one to be added |
847 # term may permit to another one to be added |
730 modified = True |
848 modified = True |
731 while modified and candidates: |
849 while modified and candidates: |
732 modified = False |
850 modified = False |
733 for var in candidates[:]: |
851 for term in candidates[:]: |
734 if accept_var(var): |
852 if isinstance(term, Constant): |
735 variables.append(var) |
853 relation = term.relation() |
736 try: |
854 if sorted(set(x[0] for x in self._term_sources(term))) != sources: |
737 # constant nodes should be systematically deleted |
855 continue |
738 if isinstance(var, Constant): |
856 terms.append(term) |
739 del sourcevars[var] |
857 candidates.remove(term) |
740 else: |
|
741 # variable nodes should be deleted once all possible |
|
742 # solutions indices have been consumed |
|
743 sourcevars[var] -= solindices |
|
744 if not sourcevars[var]: |
|
745 del sourcevars[var] |
|
746 except KeyError: |
|
747 assert var in cross_vars |
|
748 candidates.remove(var) |
|
749 modified = True |
858 modified = True |
750 return variables |
859 del sourceterms[term] |
751 |
860 elif accept_term(term): |
752 def _expand_sources(self, selected_source, vars, solindices): |
861 terms.append(term) |
753 sources = [selected_source] |
862 candidates.remove(term) |
754 sourcesvars = self._sourcesvars |
863 modified = True |
755 for source in sourcesvars: |
864 for source in sources: |
756 if source is selected_source: |
865 sourceterms = self._sourcesterms[source] |
757 continue |
866 # terms should be deleted once all possible solutions |
758 for var in vars: |
867 # indices have been consumed |
759 if not (var in sourcesvars[source] and |
868 try: |
760 solindices.issubset(sourcesvars[source][var])): |
869 sourceterms[term] -= solindices |
761 break |
870 if not sourceterms[term]: |
762 else: |
871 del sourceterms[term] |
763 sources.append(source) |
872 except KeyError: |
764 if source.uri != 'system': |
873 assert term in cross_terms |
765 for var in vars: |
874 return terms |
766 varsolindices = sourcesvars[source][var] |
875 |
767 varsolindices -= solindices |
876 def _cleanup_sourcesterms(self, sources, solindices): |
768 if not varsolindices: |
|
769 del sourcesvars[source][var] |
|
770 return sources |
|
771 |
|
772 def _cleanup_sourcesvars(self, sources, solindices): |
|
773 """on final parts, remove solutions so we know they are already processed""" |
877 """on final parts, remove solutions so we know they are already processed""" |
774 for source in sources: |
878 for source in sources: |
775 try: |
879 try: |
776 sourcevars = self._sourcesvars[source] |
880 sourceterms = self._sourcesterms[source] |
777 except KeyError: |
881 except KeyError: |
778 continue |
882 continue |
779 for var, varsolindices in sourcevars.items(): |
883 for term, termsolindices in sourceterms.items(): |
780 if isinstance(var, Relation) and self.crossed_relation(source, var): |
884 if isinstance(term, Relation) and self.crossed_relation(source, term): |
781 continue |
885 continue |
782 varsolindices -= solindices |
886 termsolindices -= solindices |
783 if not varsolindices: |
887 if not termsolindices: |
784 del sourcevars[var] |
888 del sourceterms[term] |
785 |
889 |
786 def merge_input_maps(self, allsolindices): |
890 def merge_input_maps(self, allsolindices): |
787 """inputmaps is a dictionary with tuple of solution indices as key with an |
891 """inputmaps is a dictionary with tuple of solution indices as key with |
788 associateed input map as value. This function compute for each solution |
892 an associated input map as value. This function compute for each |
789 its necessary input map and return them grouped |
893 solution its necessary input map and return them grouped |
790 |
894 |
791 ex: |
895 ex: |
792 inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'}, |
896 inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'}, |
793 (1,): {'X': 't2.C0', 'T': 't2.C1'}} |
897 (1,): {'X': 't2.C0', 'T': 't2.C1'}} |
794 return : [([1], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1', |
898 return : [([1], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1', |
820 result.append( (list(allsolindices), None) ) |
924 result.append( (list(allsolindices), None) ) |
821 return result |
925 return result |
822 |
926 |
823 def build_final_part(self, select, solindices, inputmap, sources, |
927 def build_final_part(self, select, solindices, inputmap, sources, |
824 insertedvars): |
928 insertedvars): |
825 plan = self.plan |
929 solutions = [self._solutions[i] for i in solindices] |
826 rqlst = plan.finalize(select, [self._solutions[i] for i in solindices], |
930 if self._conflicts: |
827 insertedvars) |
931 for varname, mappedto in self._conflicts: |
|
932 var = select.defined_vars[varname] |
|
933 newvar = select.make_variable() |
|
934 # XXX should use var.scope but scope hasn't been computed yet |
|
935 select.add_relation(var, 'identity', newvar) |
|
936 for sol in solutions: |
|
937 sol[newvar.name] = sol[varname] |
|
938 inputmap[newvar.name] = mappedto |
|
939 rqlst = self.plan.finalize(select, solutions, insertedvars) |
828 if self.temptable is None and self.finaltable is None: |
940 if self.temptable is None and self.finaltable is None: |
829 return OneFetchStep(plan, rqlst, sources, inputmap=inputmap) |
941 return OneFetchStep(self.plan, rqlst, sources, inputmap=inputmap) |
830 table = self.temptable or self.finaltable |
942 table = self.temptable or self.finaltable |
831 return FetchStep(plan, rqlst, sources, table, True, inputmap) |
943 return FetchStep(self.plan, rqlst, sources, table, True, inputmap) |
832 |
944 |
833 def build_non_final_part(self, select, solindices, sources, insertedvars, |
945 def build_non_final_part(self, select, solindices, sources, insertedvars, |
834 table): |
946 table): |
835 """non final step, will have to store results in a temporary table""" |
947 """non final step, will have to store results in a temporary table""" |
836 plan = self.plan |
948 solutions = [self._solutions[i] for i in solindices] |
837 rqlst = plan.finalize(select, [self._solutions[i] for i in solindices], |
949 rqlst = self.plan.finalize(select, solutions, insertedvars) |
838 insertedvars) |
950 step = FetchStep(self.plan, rqlst, sources, table, False) |
839 step = FetchStep(plan, rqlst, sources, table, False) |
|
840 # update input map for following steps, according to processed solutions |
951 # update input map for following steps, according to processed solutions |
841 inputmapkey = tuple(sorted(solindices)) |
952 inputmapkey = tuple(sorted(solindices)) |
842 inputmap = self._inputmaps.setdefault(inputmapkey, {}) |
953 inputmap = self._inputmaps.setdefault(inputmapkey, {}) |
|
954 for varname, mapping in step.outputmap.iteritems(): |
|
955 if varname in inputmap and \ |
|
956 not (mapping == inputmap[varname] or |
|
957 self._schema.eschema(solutions[0][varname]).is_final()): |
|
958 self._conflicts.append((varname, inputmap[varname])) |
843 inputmap.update(step.outputmap) |
959 inputmap.update(step.outputmap) |
844 plan.add_step(step) |
960 self.plan.add_step(step) |
845 |
961 |
846 |
962 |
847 class MSPlanner(SSPlanner): |
963 class MSPlanner(SSPlanner): |
848 """MultiSourcesPlanner: build execution plan for rql queries |
964 """MultiSourcesPlanner: build execution plan for rql queries |
849 |
965 |
970 select_group_sort(select) |
1086 select_group_sort(select) |
971 else: |
1087 else: |
972 atemptable = None |
1088 atemptable = None |
973 selection = select.selection |
1089 selection = select.selection |
974 ppi.temptable = atemptable |
1090 ppi.temptable = atemptable |
975 vfilter = VariablesFiltererVisitor(self.schema, ppi) |
1091 vfilter = TermsFiltererVisitor(self.schema, ppi) |
976 steps = [] |
1092 steps = [] |
977 for sources, variables, solindices, scope, needsel, final in stepdefs: |
1093 for sources, terms, solindices, scope, needsel, final in stepdefs: |
978 # extract an executable query using only the specified variables |
1094 # extract an executable query using only the specified terms |
979 if sources[0].uri == 'system': |
1095 if sources[0].uri == 'system': |
980 # in this case we have to merge input maps before call to |
1096 # in this case we have to merge input maps before call to |
981 # filter so already processed restriction are correctly |
1097 # filter so already processed restriction are correctly |
982 # removed |
1098 # removed |
983 solsinputmaps = ppi.merge_input_maps(solindices) |
1099 solsinputmaps = ppi.merge_input_maps(solindices) |
984 for solindices, inputmap in solsinputmaps: |
1100 for solindices, inputmap in solsinputmaps: |
985 minrqlst, insertedvars = vfilter.filter( |
1101 minrqlst, insertedvars = vfilter.filter( |
986 sources, variables, scope, set(solindices), needsel, final) |
1102 sources, terms, scope, set(solindices), needsel, final) |
987 if inputmap is None: |
1103 if inputmap is None: |
988 inputmap = subinputmap |
1104 inputmap = subinputmap |
989 else: |
1105 else: |
990 inputmap.update(subinputmap) |
1106 inputmap.update(subinputmap) |
991 steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, |
1107 steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, |
992 sources, insertedvars)) |
1108 sources, insertedvars)) |
993 else: |
1109 else: |
994 # this is a final part (i.e. retreiving results for the |
1110 # this is a final part (i.e. retreiving results for the |
995 # original query part) if all variable / sources have been |
1111 # original query part) if all term / sources have been |
996 # treated or if this is the last shot for used solutions |
1112 # treated or if this is the last shot for used solutions |
997 minrqlst, insertedvars = vfilter.filter( |
1113 minrqlst, insertedvars = vfilter.filter( |
998 sources, variables, scope, solindices, needsel, final) |
1114 sources, terms, scope, solindices, needsel, final) |
999 if final: |
1115 if final: |
1000 solsinputmaps = ppi.merge_input_maps(solindices) |
1116 solsinputmaps = ppi.merge_input_maps(solindices) |
1001 for solindices, inputmap in solsinputmaps: |
1117 for solindices, inputmap in solsinputmaps: |
1002 if inputmap is None: |
1118 if inputmap is None: |
1003 inputmap = subinputmap |
1119 inputmap = subinputmap |
1004 else: |
1120 else: |
1005 inputmap.update(subinputmap) |
1121 inputmap.update(subinputmap) |
1006 steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, |
1122 steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, |
1007 sources, insertedvars)) |
1123 sources, insertedvars)) |
1008 else: |
1124 else: |
1009 table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in variables)), |
1125 table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in terms)), |
1010 ''.join(sorted(str(i) for i in solindices))) |
1126 ''.join(sorted(str(i) for i in solindices))) |
1011 ppi.build_non_final_part(minrqlst, solindices, sources, |
1127 ppi.build_non_final_part(minrqlst, solindices, sources, |
1012 insertedvars, table) |
1128 insertedvars, table) |
1013 # finally: join parts, deal with aggregat/group/sorts if necessary |
1129 # finally: join parts, deal with aggregat/group/sorts if necessary |
1014 if atemptable is not None: |
1130 if atemptable is not None: |
1037 |
1153 |
1038 class UnsupportedBranch(Exception): |
1154 class UnsupportedBranch(Exception): |
1039 pass |
1155 pass |
1040 |
1156 |
1041 |
1157 |
class TermsFiltererVisitor(object):
    """Syntax tree visitor driven through its `filter` method."""

    def __init__(self, schema, ppi):
        """Remember the schema and the planning information object.

        :param schema: stored as-is on the instance
        :param ppi: object providing `temptable` and `rqlst.orderby`
        """
        self.schema = schema
        self.ppi = ppi
        # maps a node to the set of solution indices for which it is skipped
        self.skip = {}
        self.hasaggrstep = ppi.temptable
        # names of every variable referenced by a sort term of the query
        sortvarnames = set()
        for sortterm in ppi.rqlst.orderby:
            for vref in sortterm.iget_nodes(VariableRef):
                sortvarnames.add(vref.name)
        self.extneedsel = frozenset(sortvarnames)
1050 |
1166 |
def _rqlst_accept(self, rqlst, node, newroot, terms, setfunc=None):
    """Visit `node` and hand its copy, if any, to `setfunc`.

    :param rqlst: syntax tree currently being processed
    :param node: node to visit
    :param newroot: root of the tree under construction
    :param terms: terms given to the visit (a shallow copy is passed on)
    :param setfunc: optional callable receiving the copied node
    :return: `rqlst` unchanged when the branch is unsupported or when the
      visit gave `node` back, `node.parent` otherwise
    """
    try:
        # slice copy: the visit gets its own list of terms
        copied, visited = node.accept(self, newroot, terms[:])
    except UnsupportedBranch:
        return rqlst
    if not (setfunc is None or copied is None):
        setfunc(copied)
    if visited is node:
        return rqlst
    # the visit returned another node: continue from the parent of `node`
    return node.parent
1061 |
1177 |
1062 def filter(self, sources, variables, rqlst, solindices, needsel, final): |
1178 def filter(self, sources, terms, rqlst, solindices, needsel, final): |
1063 if server.DEBUG: |
1179 if server.DEBUG: |
1064 print 'filter', final and 'final' or '', sources, variables, rqlst, solindices, needsel |
1180 print 'filter', final and 'final' or '', sources, terms, rqlst, solindices, needsel |
1065 newroot = Select() |
1181 newroot = Select() |
1066 self.sources = sorted(sources) |
1182 self.sources = sorted(sources) |
1067 self.variables = variables |
1183 self.terms = terms |
1068 self.solindices = solindices |
1184 self.solindices = solindices |
1069 self.final = final |
1185 self.final = final |
1070 # variables which appear in unsupported branches |
1186 # terms which appear in unsupported branches |
1071 needsel |= self.extneedsel |
1187 needsel |= self.extneedsel |
1072 self.needsel = needsel |
1188 self.needsel = needsel |
1073 # variables which appear in supported branches |
1189 # terms which appear in supported branches |
1074 self.mayneedsel = set() |
1190 self.mayneedsel = set() |
1075 # new inserted variables |
1191 # new inserted variables |
1076 self.insertedvars = [] |
1192 self.insertedvars = [] |
1077 # other structures (XXX document) |
1193 # other structures (XXX document) |
1078 self.mayneedvar, self.hasvar = {}, {} |
1194 self.mayneedvar, self.hasvar = {}, {} |
1079 self.use_only_defined = False |
1195 self.use_only_defined = False |
1080 self.scopes = {rqlst: newroot} |
1196 self.scopes = {rqlst: newroot} |
1081 if rqlst.where: |
1197 if rqlst.where: |
1082 rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, variables, |
1198 rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, terms, |
1083 newroot.set_where) |
1199 newroot.set_where) |
1084 if isinstance(rqlst, Select): |
1200 if isinstance(rqlst, Select): |
1085 self.use_only_defined = True |
1201 self.use_only_defined = True |
1086 if rqlst.groupby: |
1202 if rqlst.groupby: |
1087 groupby = [] |
1203 groupby = [] |
1088 for node in rqlst.groupby: |
1204 for node in rqlst.groupby: |
1089 rqlst = self._rqlst_accept(rqlst, node, newroot, variables, |
1205 rqlst = self._rqlst_accept(rqlst, node, newroot, terms, |
1090 groupby.append) |
1206 groupby.append) |
1091 if groupby: |
1207 if groupby: |
1092 newroot.set_groupby(groupby) |
1208 newroot.set_groupby(groupby) |
1093 if rqlst.having: |
1209 if rqlst.having: |
1094 having = [] |
1210 having = [] |
1095 for node in rqlst.having: |
1211 for node in rqlst.having: |
1096 rqlst = self._rqlst_accept(rqlst, node, newroot, variables, |
1212 rqlst = self._rqlst_accept(rqlst, node, newroot, terms, |
1097 having.append) |
1213 having.append) |
1098 if having: |
1214 if having: |
1099 newroot.set_having(having) |
1215 newroot.set_having(having) |
1100 if final and rqlst.orderby and not self.hasaggrstep: |
1216 if final and rqlst.orderby and not self.hasaggrstep: |
1101 orderby = [] |
1217 orderby = [] |
1102 for node in rqlst.orderby: |
1218 for node in rqlst.orderby: |
1103 rqlst = self._rqlst_accept(rqlst, node, newroot, variables, |
1219 rqlst = self._rqlst_accept(rqlst, node, newroot, terms, |
1104 orderby.append) |
1220 orderby.append) |
1105 if orderby: |
1221 if orderby: |
1106 newroot.set_orderby(orderby) |
1222 newroot.set_orderby(orderby) |
1107 self.process_selection(newroot, variables, rqlst) |
1223 self.process_selection(newroot, terms, rqlst) |
1108 elif not newroot.where: |
1224 elif not newroot.where: |
1109 # no restrictions have been copied, just select variables and add |
1225 # no restrictions have been copied, just select terms and add |
1110 # type restriction (done later by add_types_restriction) |
1226 # type restriction (done later by add_types_restriction) |
1111 for v in variables: |
1227 for v in terms: |
1112 if not isinstance(v, Variable): |
1228 if not isinstance(v, Variable): |
1113 continue |
1229 continue |
1114 newroot.append_selected(VariableRef(newroot.get_variable(v.name))) |
1230 newroot.append_selected(VariableRef(newroot.get_variable(v.name))) |
1115 solutions = self.ppi.copy_solutions(solindices) |
1231 solutions = self.ppi.copy_solutions(solindices) |
1116 cleanup_solutions(newroot, solutions) |
1232 cleanup_solutions(newroot, solutions) |
1231 else: |
1350 else: |
1232 assert len(vrefs) == 1 |
1351 assert len(vrefs) == 1 |
1233 vref = vrefs[0] |
1352 vref = vrefs[0] |
1234 # XXX check operator ? |
1353 # XXX check operator ? |
1235 self.hasvar[(node.children[0].name, rschema)] = vref |
1354 self.hasvar[(node.children[0].name, rschema)] = vref |
1236 if self._may_skip_attr_rel(rschema, node, vref, ored, variables, res): |
1355 if self._may_skip_attr_rel(rschema, node, vref, ored, terms, res): |
1237 self.skip.setdefault(node, set()).update(self.solindices) |
1356 self.skip.setdefault(node, set()).update(self.solindices) |
1238 elif not ored: |
1357 elif not ored: |
1239 self.skip.setdefault(node, set()).update(self.solindices) |
1358 self.skip.setdefault(node, set()).update(self.solindices) |
1240 return res, node |
1359 return res, node |
1241 |
1360 |
def _may_skip_attr_rel(self, rschema, rel, vref, ored, terms, res):
    """Tell whether the attribute relation on `vref` may be skipped.

    :param rschema: unused here
    :param rel: unused here
    :param vref: variable reference whose variable is examined
    :param ored: true value when the relation is part of an OR branch
    :param terms: collection of term objects currently handled
    :param res: unused here
    :return: False when the relation is or'ed, when the variable is needed
      by a sort term or is selected, when `same_scope` rejects it, or when
      one of its attribute variables isn't among `terms`; True otherwise
    """
    var = vref.variable
    if ored:
        return False
    if var.name in self.extneedsel or var.stinfo['selected']:
        return False
    if not same_scope(var):
        return False
    # `terms` holds term objects (see `visit_variableref`, which tests
    # `node.variable in terms`), so membership must be checked with the
    # variable itself: testing its name never matched
    if any(v for v, _ in var.stinfo['attrvars'] if v not in terms):
        return False
    return True
1253 |
1372 |
def visit_exists(self, node, newroot, terms):
    """Copy an exists node, keeping only the first copied child as its
    restriction.

    :return: `(copy, node)`, where copy is None if no child was copied
    """
    copied = node.__class__()
    # note: this replaces the whole scopes mapping
    self.scopes = {node: copied}
    subparts, node = self._visit_children(node, newroot, terms)
    if subparts:
        copied.set_where(subparts[0])
        return copied, node
    return None, node
1262 |
1381 |
def visit_not(self, node, newroot, terms):
    """Copy a negation node unless none of its children could be copied.

    :return: `(copy, node)`, where copy is None if no child was copied
    """
    subparts, node = self._visit_children(node, newroot, terms)
    if subparts:
        return copy_node(newroot, node, subparts), node
    return None, node
1268 |
1387 |
def visit_group(self, node, newroot, terms):
    """Copy a group node on the final step only.

    :return: what `visit_default` returns when `self.final` is true,
      `(None, node)` otherwise
    """
    if self.final:
        return self.visit_default(node, newroot, terms)
    return None, node
1273 |
1392 |
def visit_variableref(self, node, newroot, terms):
    """Build a reference to the variable of `newroot` named like `node`.

    :raise UnsupportedBranch: when `self.use_only_defined` is set and the
      variable's name isn't in `newroot.defined_vars`, or when it is unset
      and the variable isn't among `terms`
    :return: `(new variable reference, node)`
    """
    variable = node.variable
    if self.use_only_defined:
        usable = variable.name in newroot.defined_vars
    else:
        usable = variable in terms
    if not usable:
        raise UnsupportedBranch(node.name)
    self.mayneedsel.add(node.name)
    newvar = newroot.get_variable(node.name)
    # set scope so we can insert types restriction properly
    newvar.stinfo['scope'] = self.scopes.get(variable.scope, newroot)
    return VariableRef(newvar), node
1285 |
1404 |
def visit_constant(self, node, newroot, terms):
    """Copy a constant node.

    :return: `(copy, node)`
    """
    # NOTE(review): the chained alias assignment found after `visit_default`
    # rebinds `visit_constant` to `visit_default`, so this definition ends up
    # shadowed - confirm which one is intended
    copied = copy_node(newroot, node)
    return copied, node
1288 |
1407 |
def visit_default(self, node, newroot, terms):
    """Copy `node` with the copies of its children.

    :return: `(copy, node)`, where node is the one given back by
      `_visit_children`
    """
    subparts, node = self._visit_children(node, newroot, terms)
    copied = copy_node(newroot, node, subparts)
    return copied, node
1292 |
1411 |
# node types handled by the generic copy implemented in `visit_default`
# (this also rebinds `visit_constant`)
visit_function = visit_default
visit_constant = visit_default
visit_mathexpression = visit_default
visit_comparison = visit_default
visit_sortterm = visit_default
visit_sort = visit_default
1295 |
1414 |
1296 def _visit_children(self, node, newroot, variables): |
1415 def _visit_children(self, node, newroot, terms): |
1297 subparts = [] |
1416 subparts = [] |
1298 for i in xrange(len(node.children)): |
1417 for i in xrange(len(node.children)): |
1299 child = node.children[i] |
1418 child = node.children[i] |
1300 newchild, child_ = child.accept(self, newroot, variables) |
1419 newchild, child_ = child.accept(self, newroot, terms) |
1301 if not child is child_: |
1420 if not child is child_: |
1302 node = child_.parent |
1421 node = child_.parent |
1303 if newchild is not None: |
1422 if newchild is not None: |
1304 subparts.append(newchild) |
1423 subparts.append(newchild) |
1305 return subparts, node |
1424 return subparts, node |
1306 |
1425 |
1307 def process_selection(self, newroot, variables, rqlst): |
1426 def process_selection(self, newroot, terms, rqlst): |
1308 if self.final: |
1427 if self.final: |
1309 for term in rqlst.selection: |
1428 for term in rqlst.selection: |
1310 newroot.append_selected(term.copy(newroot)) |
1429 newroot.append_selected(term.copy(newroot)) |
1311 for vref in term.get_nodes(VariableRef): |
1430 for vref in term.get_nodes(VariableRef): |
1312 self.needsel.add(vref.name) |
1431 self.needsel.add(vref.name) |