1 # copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """plan execution of rql queries on a single source""" |
|
19 |
|
20 __docformat__ = "restructuredtext en" |
|
21 |
|
22 from six import text_type |
|
23 |
|
24 from rql.stmts import Union, Select |
|
25 from rql.nodes import Constant, Relation |
|
26 |
|
27 from cubicweb import QueryError |
|
28 from cubicweb.schema import VIRTUAL_RTYPES |
|
29 from cubicweb.rqlrewrite import add_types_restriction |
|
30 from cubicweb.server.edition import EditedEntity |
|
31 |
|
# relation types that an INSERT query is not allowed to assign to (checked by
# `_extract_const_attributes`, which raises QueryError for them)
READ_ONLY_RTYPES = {'eid', 'has_text', 'is', 'is_instance_of', 'identity'}

# sentinels tagging where the value of a relation term comes from: either it
# is already known when the plan is built, or it has to be read from a row
# returned by the select substep
_CONSTANT = object()
_FROM_SUBSTEP = object()
|
36 |
|
def _extract_const_attributes(plan, rqlst, to_build):
    """Store constant values on entity definitions, queue the other terms.

    `to_build` maps variable names to the entity definitions being created.
    For each main relation of `rqlst`:

    * a constant right-hand side (which is not a uid) is recorded as an
      edited attribute of the subject's entity definition;
    * a relation between two entities to build is registered on `plan`;
    * any other term is queued so that it gets selected later.

    Return a dict mapping entity definitions to lists of
    ``(rtype, term, reverse)`` tuples, `reverse` being 1 when the entity to
    build is the object of the relation and 0 when it is the subject.

    Raise `QueryError` when a relation type is listed in `READ_ONLY_RTYPES`.
    """
    pending = {}
    for rel in rqlst.main_relations:
        subj, obj = rel.get_variable_parts()
        rtype = rel.r_type
        if rtype in READ_ONLY_RTYPES:
            raise QueryError("can't assign to %s" % rtype)
        subjkey = str(subj)
        if subjkey not in to_build:
            # the subject is not an entity to build: it has to be selected
            # and the relation added with the new entity as its object
            edef = to_build[str(obj)]
            pending.setdefault(edef, []).append((rtype, subj, 1))
            continue
        edef = to_build[subjkey]
        if isinstance(obj, Constant) and not obj.uid:
            # constant value: store it directly on the entity definition
            value = obj.eval(plan.args)
            eschema = edef.entity.e_schema
            attrtype = eschema.subjrels[rtype].objects(eschema)[0]
            # text given for a Password attribute is stored UTF-8 encoded
            if attrtype == 'Password' and isinstance(value, text_type):
                value = value.encode('UTF8')
            edef.edited_attribute(rtype, value)
        elif str(obj) in to_build:
            # relation between two newly created entities
            plan.add_relation_def((edef, rtype, to_build[obj.name]))
        else:
            pending.setdefault(edef, []).append((rtype, obj, 0))
    return pending
|
68 |
|
def _extract_eid_consts(plan, rqlst):
    """Map variables of `rqlst` to the eid its WHERE clause pins them to.

    Only ``eid`` relations which are not negated, use the '=' operator and
    have a constant right-hand side are considered. When read security is
    enabled on the connection, read permission is checked for every such
    eid that is not listed in the transaction's 'neweids'.

    Return a dict whose keys are variable objects and whose values are
    integer eids; it is empty when the query has no WHERE clause.
    """
    cnx = plan.cnx
    where = rqlst.where
    if where is None:
        return {}
    known_eids = {}
    created = cnx.transaction_data.get('neweids', ())
    must_check_read = cnx.read_security
    get_eschema = cnx.vreg.schema.eschema
    for rel in where.get_nodes(Relation):
        if rel.r_type != 'eid':
            continue
        # relations under a NOT clause do not pin the variable
        if rel.neged(strict=True):
            continue
        # neither do comparisons other than plain equality
        if rel.children[1].operator != '=':
            continue
        var, term = rel.get_variable_parts()
        if not isinstance(term, Constant):
            continue
        eid = int(term.eval(plan.args))
        # check read permission here since it may not be done by the
        # generated select substep if it is not emitted (e.g. nothing to be
        # selected)
        if must_check_read and eid not in created:
            with cnx.security_enabled(read=False):
                etype = cnx.entity_metas(eid)['type']
                get_eschema(etype).check_perm(cnx, 'read', eid=eid)
        known_eids[var.variable] = eid
    return known_eids
|
99 |
|
def _build_substep_query(select, origrqlst):
    """Finalize the select substep fetching what has to be inserted/updated.

    When `select` has no selection yet but the original query has a WHERE
    clause, the left-hand side of the first relation that is neither negated
    nor a type restriction (is / is_instance_of) is selected.

    Return `select`, completed with copies of the WHERE and HAVING clauses
    of `origrqlst`, or None when no query is actually needed.
    """
    where = origrqlst.where
    if where is not None and not select.selection:
        # nothing selected so far: pick the first usable relation
        usable = (rel for rel in where.iget_nodes(Relation)
                  if not rel.neged(traverse_scope=True)
                  and not rel.is_types_restriction())
        rel = next(usable, None)
        if rel is None:
            return None
        select.append_selected(rel.children[0].copy(select))
    if not select.selection:
        return None
    if where is not None:
        select.set_where(where.copy(select))
    having = getattr(origrqlst, 'having', None)
    if having:
        select.set_having([sq.copy(select) for sq in having])
    return select
|
123 |
|
class SSPlanner(object):
    """SingleSourcePlanner: build execution plans for RQL queries.

    Optimized for single source repositories.
    """

    def __init__(self, schema, rqlhelper):
        """Keep a reference to the `schema` and to the `rqlhelper`, which is
        used to annotate the select queries generated as substeps.
        """
        self.schema = schema
        self.rqlhelper = rqlhelper

    def build_plan(self, plan):
        """Build an execution plan from a RQL query.

        Nothing is done here but dispatching to the ``build_<TYPE>_plan``
        method matching the statement type, then adding each returned step
        to `plan`.
        """
        build_plan = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE)
        for step in build_plan(plan, plan.rqlst):
            plan.add_step(step)

    def build_select_plan(self, plan, rqlst):
        """Build the execution plan for a SELECT RQL query.

        Only one source is supposed to be available, which avoids the work
        needed to decompose the query among sources. The rqlst should not be
        tagged at this point.

        Return a 1-uple holding a `OneFetchStep`.
        """
        plan.preprocess(rqlst)
        return (OneFetchStep(plan, rqlst),)

    def build_insert_plan(self, plan, rqlst):
        """Build the execution plan for an INSERT RQL query.

        Return a 1-uple holding an `InsertStep` whose children are the
        `InsertRelationsStep` completing each entity definition.
        """
        # each variable in main variables is a new entity to insert
        to_build = {}
        cnx = plan.cnx
        etype_class = cnx.vreg['etypes'].etype_class
        for etype, var in rqlst.main_variables:
            # need to do this since entity class is shared w. web client code !
            to_build[var.name] = EditedEntity(etype_class(etype)(cnx))
            plan.add_entity_def(to_build[var.name])
        # add constant values to entity def, mark variables to be selected
        to_select = _extract_const_attributes(plan, rqlst, to_build)
        # add necessary steps to add relations and update attributes
        step = InsertStep(plan)  # insert each entity and its relations
        step.children += self._compute_relation_steps(plan, rqlst, to_select)
        return (step,)

    def _compute_relation_steps(self, plan, rqlst, to_select):
        """Yield an `InsertRelationsStep` per entity definition of an insert
        query that has relations or attributes to fetch.

        `to_select` is the mapping returned by `_extract_const_attributes`;
        its lists are rewritten in place into ``(rtype, kind, value)``
        tuples where `kind` is one of the `InsertRelationsStep` constants
        and `value` is either an eid found in the WHERE clause or the
        `_FROM_SUBSTEP` sentinel.

        This is a generator: nothing is computed until it is consumed.
        """
        eidconsts = _extract_eid_consts(plan, rqlst)
        for edef, rdefs in to_select.items():
            # create a select rql st to fetch needed data
            select = Select()
            eschema = edef.entity.e_schema
            for i, (rtype, term, reverse) in enumerate(rdefs):
                if getattr(term, 'variable', None) in eidconsts:
                    # eid known from the WHERE clause, no need to select it
                    value = eidconsts[term.variable]
                else:
                    select.append_selected(term.copy(select))
                    value = _FROM_SUBSTEP
                if reverse:
                    rdefs[i] = (rtype, InsertRelationsStep.REVERSE_RELATION, value)
                else:
                    rschema = eschema.subjrels[rtype]
                    if rschema.final or rschema.inlined:
                        rdefs[i] = (rtype, InsertRelationsStep.FINAL, value)
                    else:
                        rdefs[i] = (rtype, InsertRelationsStep.RELATION, value)
            step = InsertRelationsStep(plan, edef, rdefs)
            select = _build_substep_query(select, rqlst)
            if select is not None:
                step.children += self._select_plan(plan, select, rqlst.solutions)
            yield step

    def build_delete_plan(self, plan, rqlst):
        """Build the execution plan for a DELETE RQL query.

        Return a list with one `DeleteEntitiesStep` per main variable
        followed by one `DeleteRelationsStep` per main relation, each having
        the select substep fetching what has to be deleted as child.
        """
        # build a select query to fetch entities to delete
        steps = []
        for etype, var in rqlst.main_variables:
            step = DeleteEntitiesStep(plan)
            step.children += self._sel_variable_step(plan, rqlst, etype, var)
            steps.append(step)
        for relation in rqlst.main_relations:
            step = DeleteRelationsStep(plan, relation.r_type)
            step.children += self._sel_relation_steps(plan, rqlst, relation)
            steps.append(step)
        return steps

    def _sel_variable_step(self, plan, rqlst, etype, varref):
        """Return the select steps fetching the entities bound to `varref`
        for a delete query.

        The select reuses the WHERE / HAVING clauses of `rqlst` and is
        restricted to `etype` unless it is 'Any'.
        """
        select = Select()
        varref = varref.copy(select)
        select.defined_vars = {varref.name: varref.variable}
        select.append_selected(varref)
        if rqlst.where is not None:
            select.set_where(rqlst.where.copy(select))
        if getattr(rqlst, 'having', None):
            select.set_having([x.copy(select) for x in rqlst.having])
        if etype != 'Any':
            select.add_type_restriction(varref.variable, etype)
        return self._select_plan(plan, select, rqlst.solutions)

    def _sel_relation_steps(self, plan, rqlst, relation):
        """Return the select steps fetching the (subject, object) couples of
        `relation` for a delete query.

        The relation itself is used as restriction, combined with the WHERE
        / HAVING clauses of `rqlst` when there are some.
        """
        select = Select()
        lhs, rhs = relation.get_variable_parts()
        select.append_selected(lhs.copy(select))
        select.append_selected(rhs.copy(select))
        select.set_where(relation.copy(select))
        if rqlst.where is not None:
            select.add_restriction(rqlst.where.copy(select))
        if getattr(rqlst, 'having', None):
            select.set_having([x.copy(select) for x in rqlst.having])
        return self._select_plan(plan, select, rqlst.solutions)

    def build_set_plan(self, plan, rqlst):
        """Build the execution plan for a SET RQL query.

        Each main relation gives an ``(lhsinfo, rhsinfo, rschema)`` update
        definition. A term info is a 3-uple ``(origin, value, residx)``:

        * ``(_CONSTANT, value, residx)`` when the value is known at planning
          time (constant, or eid found in the WHERE clause);
        * ``(_FROM_SUBSTEP, selidx, residx)`` when it has to be read at index
          `selidx` of the rows returned by the select substep.

        When a term has already been seen in a previous relation, its last
        item is None so that its value is output only once in the result.

        Return a 1-uple holding an `UpdateStep`.

        Raise `QueryError` when a relation type is in `VIRTUAL_RTYPES`.
        """
        getrschema = self.schema.rschema
        select = Select()    # potential substep query
        selectedidx = {}     # term string -> term info, for terms seen so far
        updatedefs = []      # definition of update attributes/relations
        selidx = residx = 0  # substep selection / resulting rset indexes
        # search for eid const in the WHERE clause
        eidconsts = _extract_eid_consts(plan, rqlst)
        # build `updatedefs` describing things to update and add necessary
        # variables to the substep selection
        for relation in rqlst.main_relations:
            if relation.r_type in VIRTUAL_RTYPES:
                raise QueryError('can not assign to %r relation'
                                 % relation.r_type)
            lhs, rhs = relation.get_variable_parts()
            lhskey = lhs.as_string()
            if lhskey not in selectedidx:
                if lhs.variable in eidconsts:
                    eid = eidconsts[lhs.variable]
                    lhsinfo = (_CONSTANT, eid, residx)
                else:
                    select.append_selected(lhs.copy(select))
                    lhsinfo = (_FROM_SUBSTEP, selidx, residx)
                    selidx += 1
                residx += 1
                selectedidx[lhskey] = lhsinfo
            else:
                # already seen: reuse its origin but don't output it again
                lhsinfo = selectedidx[lhskey][:-1] + (None,)
            rhskey = rhs.as_string()
            if rhskey not in selectedidx:
                if isinstance(rhs, Constant):
                    rhsinfo = (_CONSTANT, rhs.eval(plan.args), residx)
                elif getattr(rhs, 'variable', None) in eidconsts:
                    eid = eidconsts[rhs.variable]
                    rhsinfo = (_CONSTANT, eid, residx)
                else:
                    select.append_selected(rhs.copy(select))
                    rhsinfo = (_FROM_SUBSTEP, selidx, residx)
                    selidx += 1
                residx += 1
                selectedidx[rhskey] = rhsinfo
            else:
                # already seen: reuse its origin but don't output it again
                rhsinfo = selectedidx[rhskey][:-1] + (None,)
            rschema = getrschema(relation.r_type)
            updatedefs.append((lhsinfo, rhsinfo, rschema))
        # the update step
        step = UpdateStep(plan, updatedefs)
        # when necessary add substep to fetch yet unknown values
        select = _build_substep_query(select, rqlst)
        if select is not None:
            # set distinct to avoid potential duplicate key error
            select.distinct = True
            step.children += self._select_plan(plan, select, rqlst.solutions)
        return (step,)

    # internal methods ########################################################

    def _select_plan(self, plan, select, solutions):
        """Wrap `select` into a Union, restrict it to `solutions`, add types
        restrictions, annotate it and return its select plan (see
        `build_select_plan`).
        """
        union = Union()
        union.append(select)
        select.clean_solutions(solutions)
        add_types_restriction(self.schema, select)
        self.rqlhelper.annotate(union)
        return self.build_select_plan(plan, union)
|
303 |
|
304 |
|
305 # execution steps and helper functions ######################################## |
|
306 |
|
def varmap_test_repr(varmap, tablesinorder):
    """Return a copy of `varmap` suitable for comparison in tests.

    Each value of `varmap` is a ``table.column`` string; the table part is
    replaced by the name it is mapped to in `tablesinorder`. None is
    returned unchanged.
    """
    if varmap is None:
        return None

    def _normalize(sql):
        table, col = sql.split('.')
        return '%s.%s' % (tablesinorder[table], col)

    return {var: _normalize(sql) for var, sql in varmap.items()}
|
315 |
|
class Step(object):
    """Abstract base class of the execution steps making up a plan."""

    def __init__(self, plan):
        # steps to run before this one; filled by the planner
        self.children = []
        self.plan = plan

    def execute_child(self):
        """Execute the one and only child step and return its result."""
        assert len(self.children) == 1
        only_child = self.children[0]
        return only_child.execute()

    def execute_children(self):
        """Execute every child step in order, discarding their results."""
        for child in self.children:
            child.execute()

    def execute(self):
        """Execute this step and store partial (e.g. this step) results.

        Must be implemented by concrete steps.
        """
        raise NotImplementedError()

    def mytest_repr(self):
        """Return a representation of this step alone, suitable for tests."""
        return (type(self).__name__,)

    def test_repr(self):
        """Return a representation of this step and, recursively, of its
        children, suitable for tests.
        """
        own_repr = self.mytest_repr()
        children_repr = [child.test_repr() for child in self.children]
        return own_repr + (children_repr,)
|
342 |
|
343 |
|
class OneFetchStep(Step):
    """Step fetching data from the system source and directly returning the
    results.
    """

    def __init__(self, plan, union, inputmap=None):
        Step.__init__(self, plan)
        self.union = union
        self.inputmap = inputmap

    def execute(self):
        """Run the children steps, then return the result of the system
        source's `syntax_tree_search` for this step's union.
        """
        self.execute_children()
        plan = self.plan
        cnx = plan.cnx
        args = plan.args
        inputmap = self.inputmap
        union = self.union
        plan_key = plan.cache_key
        if inputmap or plan_key is None:
            # an inputmap from a previous step is used (or the plan has no
            # cache key): disable the cache key
            cachekey = None
        else:
            # union may have been split into subqueries, in which case
            # plan.cache_key can't be used as is: rebuild a cache key
            cachekey = union.as_string()
            if isinstance(plan_key, tuple):
                parts = list(plan_key)
                parts[0] = cachekey
                cachekey = tuple(parts)
        # get results for query
        source = cnx.repo.system_source
        return source.syntax_tree_search(cnx, union, args, cachekey, inputmap)

    def mytest_repr(self):
        """Return a representation of this step suitable for tests."""
        try:
            inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
        except AttributeError:
            # fall back on the raw input map
            inputmap = self.inputmap
        queries = sorted((select.as_string(kwargs=self.plan.args),
                          select.solutions)
                         for select in self.union.children)
        return (self.__class__.__name__, queries, inputmap)
|
390 |
|
391 |
|
392 # UPDATE/INSERT/DELETE steps ################################################## |
|
393 |
|
class InsertRelationsStep(Step):
    """Step adding attributes / relations to entity definitions, using the
    rows returned by its child fetch step.

    Values flagged `_FROM_SUBSTEP` in `rdefs` are read from the row, one
    column each, in `rdefs` order.

    For one entity definition, N entities are built where N is the number
    of rows of the child's result (a single one when there is no child).
    """

    # kinds of relation definition
    FINAL = 0
    RELATION = 1
    REVERSE_RELATION = 2

    def __init__(self, plan, edef, rdefs):
        Step.__init__(self, plan)
        # partial entity definition to expand
        self.edef = edef
        # definition of relations to complete, as (rtype, kind, value)
        self.rdefs = rdefs

    def execute(self):
        """Clone the base entity definition once per row, complete each
        clone, substitute the clones for the base definition in the plan and
        return the rows.
        """
        template = self.edef
        rows = self.execute_child() if self.children else [[]]
        expanded = []
        for row in rows:
            # get a new entity definition for this row
            edef = template.clone()
            # complete this entity def using row values
            column = 0
            for rtype, rorder, value in self.rdefs:
                if value is _FROM_SUBSTEP:
                    value = row[column]
                    column += 1
                if rorder == InsertRelationsStep.FINAL:
                    edef.edited_attribute(rtype, value)
                    continue
                if rorder == InsertRelationsStep.RELATION:
                    reldef, role = (edef, rtype, value), 'subject'
                else:
                    reldef, role = (value, rtype, edef), 'object'
                self.plan.add_relation_def(reldef)
                edef.querier_pending_relations[(rtype, role)] = value
            expanded.append(edef)
        self.plan.substitute_entity_def(template, expanded)
        return rows
|
444 |
|
445 |
|
class InsertStep(Step):
    """Step inserting new entities / relations."""

    def execute(self):
        """Run the children `InsertRelationsStep`, then ask the plan to
        insert entity definitions first and relation definitions next.

        Return the result of `plan.insert_entity_defs()`.
        """
        plan = self.plan
        for child in self.children:
            assert isinstance(child, InsertRelationsStep)
            child.plan = plan
            child.execute()
        # entities have to be inserted before their relations
        inserted = plan.insert_entity_defs()
        plan.insert_relation_defs()
        return inserted
|
461 |
|
462 |
|
class DeleteEntitiesStep(Step):
    """Step deleting entities."""

    def execute(self):
        """Delete the entities whose eids are returned, one per row, by the
        child step and return the child's result.
        """
        rows = self.execute_child()
        if not rows:
            # nothing matched, nothing to delete
            return rows
        eids = frozenset(int(eid) for (eid,) in rows)
        cnx = self.plan.cnx
        cnx.repo.glob_delete_entities(cnx, eids)
        return rows
|
474 |
|
class DeleteRelationsStep(Step):
    """Step deleting relations of a given type."""

    def __init__(self, plan, rtype):
        Step.__init__(self, plan)
        # type of the relations to delete
        self.rtype = rtype

    def execute(self):
        """Delete the relation for each (subject, object) couple returned by
        the child step.
        """
        cnx = self.plan.cnx
        delete_relation = cnx.repo.glob_delete_relation
        rtype = self.rtype
        for subject, object_ in self.execute_child():
            delete_relation(cnx, subject, rtype, object_)
|
488 |
|
489 |
|
class UpdateStep(Step):
    """Step updating entities / adding relations from update definitions
    and from the results fetched by the child step.
    """

    def __init__(self, plan, updatedefs):
        Step.__init__(self, plan)
        # list of (lhsinfo, rhsinfo, rschema) tuples
        self.updatedefs = updatedefs

    def execute(self):
        """Apply every update definition to every row.

        Final / inlined relations are gathered per entity as edited
        attributes, the other ones per relation type as (subject, object)
        couples; relations are then added and entities updated through the
        repository.

        Return the rows, each replaced in place by the values handled for
        it.
        """
        cnx = self.plan.cnx
        repo = cnx.repo
        edited_by_eid = {}
        relations = {}
        rows = self.execute_child() if self.children else [[]]
        for rownum, row in enumerate(rows):
            outrow = []
            for lhsinfo, rhsinfo, rschema in self.updatedefs:
                lhsval = _handle_relterm(lhsinfo, row, outrow)
                rhsval = _handle_relterm(rhsinfo, row, outrow)
                if not (rschema.final or rschema.inlined):
                    relations.setdefault(str(rschema), []).append(
                        (lhsval, rhsval))
                    continue
                eid = int(lhsval)
                if eid not in edited_by_eid:
                    entity = cnx.entity_from_eid(eid)
                    edited_by_eid[eid] = EditedEntity(entity)
                edited_by_eid[eid].edited_attribute(str(rschema), rhsval)
            rows[rownum] = outrow
        # insert relations, then update entities
        repo.glob_add_relations(cnx, relations)
        for edited in edited_by_eid.values():
            repo.glob_update_entity(cnx, edited)
        return rows
|
535 |
|
def _handle_relterm(info, row, newrow):
    """Return the value of the relation term described by `info`.

    `info[0]` tells where the value comes from: when it is `_CONSTANT` the
    value is `info[1]`, else `info[1]` is an index into `row`. The value is
    also appended to `newrow` unless the last item of `info` is None.
    """
    origin, payload = info[0], info[1]
    val = payload if origin is _CONSTANT else row[payload]
    if info[-1] is not None:
        newrow.append(val)
    return val
|