|
1 """plan execution of rql queries on a single source |
|
2 |
|
3 :organization: Logilab |
|
4 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
6 """ |
|
7 __docformat__ = "restructuredtext en" |
|
8 |
|
9 from copy import copy |
|
10 |
|
11 from rql.stmts import Union, Select |
|
12 from rql.nodes import Constant |
|
13 |
|
14 from cubicweb import QueryError, typed_eid |
|
15 |
|
def add_types_restriction(schema, rqlst, newroot=None, solutions=None):
    """add or narrow type restrictions on the non-final variables of a syntax
    tree so that they match the entity types found in `solutions`

    :param schema: object whose ``eschema(etype)`` result provides ``is_final()``
    :param rqlst: statement to process; when `newroot` is given, its ``stmt``
      attribute is used instead
    :param newroot: tree whose ``defined_vars`` receive the restrictions,
      defaults to `rqlst` itself
    :param solutions: list of ``{varname: etype}`` mappings; must be given if
      and only if `newroot` is given (``rqlst.solutions`` is used otherwise)

    When called without `newroot`, a tree already processed (marked by the
    ``_types_restr_added`` attribute) is left untouched.

    For each variable of `newroot` with non-final possible types:
    * variables having ``uidrels`` in their stinfo (eid specified) are skipped;
    * if the variable already has a types restriction, the type constants
      that are no more possible are removed from it;
    * else a new restriction is added (to the variable's scope when the tree
      is annotated, else to `newroot`) and ``stinfo['typerels']`` /
      ``stinfo['possibletypes']`` are set accordingly.
    """
    if newroot is None:
        assert solutions is None
        # already processed: nothing to do
        if hasattr(rqlst, '_types_restr_added'):
            return
        solutions = rqlst.solutions
        newroot = rqlst
        rqlst._types_restr_added = True
    else:
        assert solutions is not None
        rqlst = rqlst.stmt
    eschema = schema.eschema
    # collect, for each variable defined in newroot, its possible non-final
    # entity types across all solutions
    allpossibletypes = {}
    for solution in solutions:
        for varname, etype in solution.iteritems():
            if not varname in newroot.defined_vars or eschema(etype).is_final():
                continue
            allpossibletypes.setdefault(varname, set()).add(etype)
    # sorted for a deterministic processing order
    for varname in sorted(allpossibletypes):
        try:
            var = newroot.defined_vars[varname]
        except KeyError:
            continue
        stinfo = var.stinfo
        if stinfo.get('uidrels'):
            continue # eid specified, no need for additional type specification
        try:
            typerels = rqlst.defined_vars[varname].stinfo.get('typerels')
        except KeyError:
            # not a variable of rqlst: expected to be one of its aliases
            assert varname in rqlst.aliases
            continue
        if newroot is rqlst and typerels:
            # same tree: reuse a type relation already recorded in stinfo
            mytyperel = iter(typerels).next()
        else:
            # look in newroot for a relation restricting the variable's types
            for vref in newroot.defined_vars[varname].references():
                rel = vref.relation()
                if rel and rel.is_types_restriction():
                    mytyperel = rel
                    break
            else:
                mytyperel = None
        possibletypes = allpossibletypes[varname]
        if mytyperel is not None:
            # variable has already some types restriction. new possible types
            # can only be a subset of existing ones, so only remove no more
            # possible types
            for cst in mytyperel.get_nodes(Constant):
                if not cst.value in possibletypes:
                    cst.parent.remove(cst)
                    try:
                        stinfo['possibletypes'].remove(cst.value)
                    except KeyError:
                        # restriction on a type not used by this query, may
                        # occur with X is IN(...)
                        pass
        else:
            # we have to add types restriction
            if stinfo.get('scope') is not None:
                rel = var.scope.add_type_restriction(var, possibletypes)
            else:
                # tree is not annotated yet, no scope set so add the restriction
                # to the root
                rel = newroot.add_type_restriction(var, possibletypes)
            stinfo['typerels'] = frozenset((rel,))
            stinfo['possibletypes'] = possibletypes
|
81 |
|
class SSPlanner(object):
    """SingleSourcePlanner: build execution plan for rql queries

    optimized for single source repositories
    """

    def __init__(self, schema, rqlhelper):
        self.schema = schema
        self.rqlhelper = rqlhelper

    def build_plan(self, plan):
        """build an execution plan from a RQL query

        do nothing here, dispatch according to the statement type: the
        ``build_<TYPE>_plan`` method matching ``plan.rqlst.TYPE`` computes the
        steps, which are then added to `plan` in order.
        """
        builder = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE)
        for step in builder(plan, plan.rqlst):
            plan.add_step(step)

    def build_select_plan(self, plan, rqlst):
        """build execution plan for a SELECT RQL query. Suppose only one source
        is available and so avoid the work needed for query decomposition
        among sources

        the rqlst should not be tagged at this point.
        """
        plan.preprocess(rqlst)
        return (OneFetchStep(plan, rqlst, plan.session.repo.sources),)

    def build_insert_plan(self, plan, rqlst):
        """get an execution plan from an INSERT RQL query"""
        # each variable in main variables is a new entity to insert
        to_build = {}
        session = plan.session
        for etype, var in rqlst.main_variables:
            # need to do this since entity class is shared w. web client code !
            edef = session.etype_class(etype)(session, None, None)
            to_build[var.name] = edef
            plan.add_entity_def(edef)
        # add constant values to entity def, mark variables to be selected
        to_select = plan.relation_definitions(rqlst, to_build)
        # add necessary steps to add relations and update attributes
        step = InsertStep(plan) # insert each entity and its relations
        step.children += self._compute_relation_steps(plan, rqlst.solutions,
                                                      rqlst.where, to_select)
        return (step,)

    def _compute_relation_steps(self, plan, solutions, restriction, to_select):
        """handle the selection of relations for an insert query

        For each entity definition of `to_select`, yield a RelationsStep whose
        child fetches the terms needed by its relations. Each item of the
        rdefs list is rewritten in place from ``(rtype, term, reverse)`` to
        ``(rtype, kind)``, kind being one of RelationsStep.FINAL, RELATION or
        REVERSE_RELATION; the column of a term in fetched rows is its index.
        """
        for edef, rdefs in to_select.items():
            # create a select rql st to fetch needed data
            select = Select()
            eschema = edef.e_schema
            for i, (rtype, term, reverse) in enumerate(rdefs):
                select.append_selected(term.copy(select))
                if reverse:
                    kind = RelationsStep.REVERSE_RELATION
                else:
                    rschema = eschema.subject_relation(rtype)
                    if rschema.is_final() or rschema.inlined:
                        kind = RelationsStep.FINAL
                    else:
                        kind = RelationsStep.RELATION
                # replacing an item (not resizing) is safe while enumerating
                rdefs[i] = rtype, kind
            if restriction is not None:
                select.set_where(restriction.copy(select))
            step = RelationsStep(plan, edef, rdefs)
            step.children += self._select_plan(plan, select, solutions)
            yield step

    def build_delete_plan(self, plan, rqlst):
        """get an execution plan from a DELETE RQL query"""
        # build a select query to fetch entities to delete
        steps = []
        for etype, var in rqlst.main_variables:
            step = DeleteEntitiesStep(plan)
            step.children += self._sel_variable_step(plan, rqlst.solutions,
                                                     rqlst.where, etype, var)
            steps.append(step)
        for relation in rqlst.main_relations:
            step = DeleteRelationsStep(plan, relation.r_type)
            step.children += self._sel_relation_steps(plan, rqlst.solutions,
                                                      rqlst.where, relation)
            steps.append(step)
        return steps

    def _sel_variable_step(self, plan, solutions, restriction, etype, varref):
        """handle the selection of variables for a delete query"""
        select = Select()
        varref = varref.copy(select)
        select.defined_vars = {varref.name: varref.variable}
        select.append_selected(varref)
        if restriction is not None:
            select.set_where(restriction.copy(select))
        if etype != 'Any':
            select.add_type_restriction(varref.variable, etype)
        return self._select_plan(plan, select, solutions)

    def _sel_relation_steps(self, plan, solutions, restriction, relation):
        """handle the selection of relations for a delete query"""
        select = Select()
        lhs, rhs = relation.get_variable_parts()
        select.append_selected(lhs.copy(select))
        select.append_selected(rhs.copy(select))
        select.set_where(relation.copy(select))
        if restriction is not None:
            select.add_restriction(restriction.copy(select))
        return self._select_plan(plan, select, solutions)

    def build_set_plan(self, plan, rqlst):
        """get an execution plan from a SET RQL query

        :raise QueryError: when trying to assign an 'eid', 'has_text' or
          'identity' relation
        """
        select = Select()
        # extract variables to add to the selection; selected_index maps the
        # string form of each selected term to its column in fetched rows
        selected_index = {}
        index = 0
        relations, attrrelations = [], []
        getrschema = self.schema.rschema
        for relation in rqlst.main_relations:
            if relation.r_type in ('eid', 'has_text', 'identity'):
                raise QueryError('can not assign to %r relation'
                                 % relation.r_type)
            lhs, rhs = relation.get_variable_parts()
            for term in (lhs, rhs):
                key = term.as_string('utf-8')
                if key not in selected_index:
                    select.append_selected(term.copy(select))
                    selected_index[key] = index
                    index += 1
            rschema = getrschema(relation.r_type)
            if rschema.is_final() or rschema.inlined:
                attrrelations.append(relation)
            else:
                relations.append(relation)
        # add step necessary to fetch all selected variables values
        if rqlst.where is not None:
            select.set_where(rqlst.where.copy(select))
        # set distinct to avoid potential duplicate key error
        select.distinct = True
        step = UpdateStep(plan, attrrelations, relations, selected_index)
        step.children += self._select_plan(plan, select, rqlst.solutions)
        return (step,)

    # internal methods ########################################################

    def _select_plan(self, plan, select, solutions):
        """wrap `select` in an annotated union restricted to `solutions` and
        return the select plan steps for it
        """
        union = Union()
        union.append(select)
        select.clean_solutions(solutions)
        add_types_restriction(self.schema, select)
        self.rqlhelper.annotate(union)
        return self.build_select_plan(plan, union)
|
233 |
|
234 |
|
235 # execution steps and helper functions ######################################## |
|
236 |
|
def varmap_test_repr(varmap, tablesinorder):
    """return a copy of `varmap` where the table part of each 'table.column'
    value is replaced by its entry in `tablesinorder`

    None is returned unchanged. Each value must contain exactly one '.'.
    """
    if varmap is None:
        return None

    def _rename(sql):
        # rename the table, keep the column
        table, col = sql.split('.')
        return '%s.%s' % (tablesinorder[table], col)

    return dict((var, _rename(sql)) for var, sql in varmap.items())
|
245 |
|
def offset_result(offset, result):
    """apply (part of) an offset to the rows fetched from one source

    :param offset: number of rows still to skip
    :param result: sequence of rows fetched from a source
    :return: ``(offset, result)`` where `offset` is the number of rows still to
      skip on following results (None once the offset is fully consumed) and
      `result` holds the rows to keep
    """
    offset -= len(result)
    if offset < 0:
        # the offset ends inside this result: keep its trailing rows
        return None, result[offset:]
    # the whole result is consumed by the offset (previously, when some offset
    # remained, the rows were wrongly kept)
    if offset == 0:
        offset = None
    return offset, ()
|
255 |
|
256 |
|
class LimitOffsetMixIn(object):
    """mix-in recording the limit / offset to apply on a step's results"""
    limit = None
    offset = None

    def set_limit_offset(self, limit, offset):
        """store `limit` as is and `offset`, a false offset (0, None...) being
        normalized to None
        """
        if not offset:
            offset = None
        self.limit = limit
        self.offset = offset
|
262 |
|
263 |
|
class Step(object):
    """base abstract class for execution step

    A step knows the plan it belongs to and holds a list of child steps.
    """

    def __init__(self, plan):
        self.plan = plan
        # sub-steps, run through execute_child / execute_children
        self.children = []

    def execute_child(self):
        """execute the single child step and return its result"""
        assert len(self.children) == 1
        only_child = self.children[0]
        return only_child.execute()

    def execute_children(self):
        """execute every child step in order, discarding their results"""
        for child in self.children:
            child.execute()

    def execute(self):
        """execute this step and store partial (eg this step) results"""
        raise NotImplementedError()

    def mytest_repr(self):
        """return a representation of this step suitable for test"""
        return (self.__class__.__name__,)

    def test_repr(self):
        """return a representation of this step and of its children suitable
        for test
        """
        own = self.mytest_repr()
        return own + ([child.test_repr() for child in self.children],)
|
290 |
|
291 |
|
class OneFetchStep(LimitOffsetMixIn, Step):
    """step consisting in fetching data from sources and directly returning
    results
    """
    def __init__(self, plan, union, sources, inputmap=None):
        """
        :param union: the syntax tree to execute; its last select gives the
          limit / offset of the step
        :param sources: sources queried in order
        :param inputmap: optional input mapping given to each source; when
          set, the query cache key is disabled
        """
        Step.__init__(self, plan)
        self.union = union
        self.sources = sources
        self.inputmap = inputmap
        self.set_limit_offset(union.children[-1].limit, union.children[-1].offset)

    def set_limit_offset(self, limit, offset):
        """record limit / offset on the step (a false offset being stored as
        None) and copy the given values to every select of the union
        """
        LimitOffsetMixIn.set_limit_offset(self, limit, offset)
        for select in self.union.children:
            select.limit = limit
            select.offset = offset

    def execute(self):
        """call .syntax_tree_search with the given syntax tree on each
        source for each solution

        Child steps are executed first. Results of all sources are
        concatenated and returned as a list of rows; when there are several
        sources, limit and offset are applied here rather than by the sources.
        """
        self.execute_children()
        session = self.plan.session
        args = self.plan.args
        inputmap = self.inputmap
        union = self.union
        # do we have to use an inputmap from a previous step ? If so disable
        # cachekey
        if inputmap or self.plan.cache_key is None:
            cachekey = None
        # union may have been split into subqueries, rebuild a cache key
        elif isinstance(self.plan.cache_key, tuple):
            cachekey = list(self.plan.cache_key)
            cachekey[0] = union.as_string()
            cachekey = tuple(cachekey)
        else:
            cachekey = union.as_string()
        result = []
        # limit / offset processing
        limit = self.limit
        offset = self.offset
        if offset is not None:
            if len(self.sources) > 1:
                # we'll have to deal with limit/offset by ourselves: fetch
                # limit+offset rows without offset from sources
                if union.children[-1].limit:
                    union.children[-1].limit = limit + offset
                union.children[-1].offset = None
            else:
                # single source: limit/offset were set on the union's selects
                # by set_limit_offset, presumably honored by the source itself,
                # so no local processing
                offset, limit = None, None
        for source in self.sources:
            if offset is None and limit is not None:
                # only ask for the rows still missing.
                # modifying the sample rqlst is enough since sql generation
                # will pick it here as well
                union.children[-1].limit = limit - len(result)
            result_ = source.syntax_tree_search(session, union, args, cachekey,
                                                inputmap)
            if offset is not None:
                # skip rows still covered by the offset
                offset, result_ = offset_result(offset, result_)
            result += result_
            if limit is not None:
                if len(result) >= limit:
                    return result[:limit]
        #print 'ONEFETCH RESULT %s' % (result)
        return result

    def mytest_repr(self):
        """return a representation of this step suitable for test"""
        try:
            inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
        except AttributeError:
            # e.g. plan without tablesinorder: use the raw input map
            inputmap = self.inputmap
        return (self.__class__.__name__,
                sorted((r.as_string(kwargs=self.plan.args), r.solutions)
                       for r in self.union.children),
                self.limit, self.offset,
                sorted(self.sources), inputmap)
|
368 |
|
369 |
|
370 # UPDATE/INSERT/DELETE steps ################################################## |
|
371 |
|
class RelationsStep(Step):
    """step consisting in adding attributes/relations to entity defs from a
    previous FetchStep

    Relation values come from the rows returned by the single child step, with
    one column for each relation defined in self.r_defs. For one entity
    definition, N entity definitions are built, N being the number of rows.
    """

    # kinds of relation definition (second item of each r_defs entry)
    FINAL = 0
    RELATION = 1
    REVERSE_RELATION = 2

    def __init__(self, plan, e_def, r_defs):
        Step.__init__(self, plan)
        # partial entity definition to expand
        self.e_def = e_def
        # definition of relations to complete: list of (rtype, kind)
        self.r_defs = r_defs

    def execute(self):
        """complete one copy of the base entity definition per fetched row and
        substitute the base definition with those copies in the plan
        """
        template = self.e_def
        completed = []
        for row in self.execute_child():
            # new entity definition for this row.
            # NOTE(review): copy() is shallow; confirm querier_pending_relations
            # is not shared between the copies
            entity = copy(template)
            for col, (rtype, kind) in enumerate(self.r_defs):
                value = row[col]
                if kind == RelationsStep.FINAL:
                    entity[rtype] = value
                elif kind == RelationsStep.RELATION:
                    self.plan.add_relation_def((entity, rtype, value))
                    entity.querier_pending_relations[(rtype, 'subject')] = value
                else:
                    self.plan.add_relation_def((value, rtype, entity))
                    entity.querier_pending_relations[(rtype, 'object')] = value
            completed.append(entity)
        self.plan.substitute_entity_def(template, completed)
|
414 |
|
415 |
|
class InsertStep(Step):
    """step consisting in inserting new entities / relations"""

    def execute(self):
        """run the child RelationsSteps, then insert entity definitions and
        relation definitions; return what the plan's insert_entity_defs()
        returned (eids of inserted entities)
        """
        for child in self.children:
            assert isinstance(child, RelationsStep)
            # children work on this step's plan
            child.plan = self.plan
            child.execute()
        # entities are inserted before relations
        inserted = self.plan.insert_entity_defs()
        self.plan.insert_relation_defs()
        return inserted
|
431 |
|
432 |
|
class DeleteEntitiesStep(Step):
    """step consisting in deleting entities"""

    def execute(self):
        """delete each entity whose eid is returned by the child step (one
        column rows), skipping eids already pending deletion in the session
        """
        doomed = frozenset(typed_eid(eid) for (eid,) in self.execute_child())
        session = self.plan.session
        delete_entity = session.repo.glob_delete_entity
        # register pending eids first to avoid multiple deletion; the set is
        # updated in place so the session's query data sees the change
        pending = session.query_data('pendingeids', set(), setdefault=True)
        to_process = doomed - pending
        pending |= to_process
        for eid in to_process:
            delete_entity(session, eid)
|
447 |
|
448 |
|
class DeleteRelationsStep(Step):
    """step consisting in deleting relations"""

    def __init__(self, plan, rtype):
        Step.__init__(self, plan)
        # type of the relations to delete
        self.rtype = rtype

    def execute(self):
        """delete a `rtype` relation for each (subject, object) row returned by
        the child step
        """
        session = self.plan.session
        delete_relation = session.repo.glob_delete_relation
        for subject_eid, object_eid in self.execute_child():
            delete_relation(session, subject_eid, self.rtype, object_eid)
|
462 |
|
463 |
|
class UpdateStep(Step):
    """step consisting in updating entities / adding relations from relations
    definitions and from results fetched in previous step
    """

    def __init__(self, plan, attribute_relations, relations, selected_index):
        Step.__init__(self, plan)
        # relations assigning a value on the subject entity
        self.attribute_relations = attribute_relations
        # relations to add between two fetched entities
        self.relations = relations
        # maps the string form of a selected term to its column in rows
        self.selected_index = selected_index

    def execute(self):
        """for each fetched row, set attribute values on the subject entities
        and add relations; then update modified entities and return a list of
        one-element tuples holding their eids
        """
        plan = self.plan
        session = self.plan.session
        repo = session.repo
        edefs = {}
        # insert relations
        for row in self.execute_child():
            for relation in self.attribute_relations:
                lhs, rhs = relation.get_variable_parts()
                eid = typed_eid(row[self.selected_index[str(lhs)]])
                # fetch each entity only once
                if eid in edefs:
                    edef = edefs[eid]
                else:
                    edef = edefs[eid] = session.eid_rset(eid).get_entity(0, 0)
                if isinstance(rhs, Constant):
                    # add constant values to entity def
                    edef[relation.r_type] = rhs.eval(plan.args)
                else:
                    edef[relation.r_type] = row[self.selected_index[str(rhs)]]
            for relation in self.relations:
                index = self.selected_index
                subj = row[index[str(relation.children[0])]]
                obj = row[index[str(relation.children[1])]]
                repo.glob_add_relation(session, subj, relation.r_type, obj)
        # update entities
        updated = []
        for eid, edef in edefs.items():
            repo.glob_update_entity(session, edef)
            updated.append((eid,))
        return updated
505 return result |