diff -r 058bb3dc685f -r 0b59724cb3f2 cubicweb/server/ssplanner.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cubicweb/server/ssplanner.py Sat Jan 16 13:48:51 2016 +0100 @@ -0,0 +1,543 @@ +# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . +"""plan execution of rql queries on a single source""" + +__docformat__ = "restructuredtext en" + +from six import text_type + +from rql.stmts import Union, Select +from rql.nodes import Constant, Relation + +from cubicweb import QueryError +from cubicweb.schema import VIRTUAL_RTYPES +from cubicweb.rqlrewrite import add_types_restriction +from cubicweb.server.edition import EditedEntity + +READ_ONLY_RTYPES = set(('eid', 'has_text', 'is', 'is_instance_of', 'identity')) + +_CONSTANT = object() +_FROM_SUBSTEP = object() + +def _extract_const_attributes(plan, rqlst, to_build): + """add constant values to entity def, mark variables to be selected + """ + to_select = {} + for relation in rqlst.main_relations: + lhs, rhs = relation.get_variable_parts() + rtype = relation.r_type + if rtype in READ_ONLY_RTYPES: + raise QueryError("can't assign to %s" % rtype) + try: + edef = to_build[str(lhs)] + except KeyError: + # lhs var is not to build, should be selected and added as an + # object relation + edef = to_build[str(rhs)] + to_select.setdefault(edef, []).append((rtype, lhs, 1)) + else: + if isinstance(rhs, Constant) and not rhs.uid: + # add constant values to entity def + value = rhs.eval(plan.args) + eschema = edef.entity.e_schema + attrtype = eschema.subjrels[rtype].objects(eschema)[0] + if attrtype == 'Password' and isinstance(value, text_type): + value = value.encode('UTF8') + edef.edited_attribute(rtype, value) + elif str(rhs) in to_build: + # create a relation between two newly created variables + plan.add_relation_def((edef, rtype, to_build[rhs.name])) + else: + to_select.setdefault(edef, []).append( (rtype, rhs, 0) ) + return to_select + +def _extract_eid_consts(plan, rqlst): + """return a dict mapping rqlst variable object to their eid if specified in + the syntax tree + """ + cnx = plan.cnx + if rqlst.where is None: + return {} + eidconsts = {} + neweids = cnx.transaction_data.get('neweids', ()) + checkread = cnx.read_security + eschema = cnx.vreg.schema.eschema + for rel in rqlst.where.get_nodes(Relation): + # only care for 'eid' relations ... + if (rel.r_type == 'eid' + # ... that are not part of a NOT clause ... + and not rel.neged(strict=True) + # ... and where eid is specified by '=' operator. + and rel.children[1].operator == '='): + lhs, rhs = rel.get_variable_parts() + if isinstance(rhs, Constant): + eid = int(rhs.eval(plan.args)) + # check read permission here since it may not be done by + # the generated select substep if not emited (eg nothing + # to be selected) + if checkread and eid not in neweids: + with cnx.security_enabled(read=False): + eschema(cnx.entity_metas(eid)['type']).check_perm( + cnx, 'read', eid=eid) + eidconsts[lhs.variable] = eid + return eidconsts + +def _build_substep_query(select, origrqlst): + """Finalize substep select query that should be executed to get proper + selection of stuff to insert/update. + + Return None when no query actually needed, else the given select node that + will be used as substep query. + """ + if origrqlst.where is not None and not select.selection: + # no selection, append one randomly by searching for a relation which is + # not neged neither a type restriction (is/is_instance_of) + for rel in origrqlst.where.iget_nodes(Relation): + if not (rel.neged(traverse_scope=True) or rel.is_types_restriction()): + select.append_selected(rel.children[0].copy(select)) + break + else: + return None + if select.selection: + if origrqlst.where is not None: + select.set_where(origrqlst.where.copy(select)) + if getattr(origrqlst, 'having', None): + select.set_having([sq.copy(select) for sq in origrqlst.having]) + return select + return None + +class SSPlanner(object): + """SingleSourcePlanner: build execution plan for rql queries + + optimized for single source repositories + """ + + def __init__(self, schema, rqlhelper): + self.schema = schema + self.rqlhelper = rqlhelper + + def build_plan(self, plan): + """build an execution plan from a RQL query + + do nothing here, dispatch according to the statement type + """ + build_plan = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE) + for step in build_plan(plan, plan.rqlst): + plan.add_step(step) + + def build_select_plan(self, plan, rqlst): + """build execution plan for a SELECT RQL query. Suppose only one source + is available and so avoid work need for query decomposition among sources + + the rqlst should not be tagged at this point. + """ + plan.preprocess(rqlst) + return (OneFetchStep(plan, rqlst),) + + def build_insert_plan(self, plan, rqlst): + """get an execution plan from an INSERT RQL query""" + # each variable in main variables is a new entity to insert + to_build = {} + cnx = plan.cnx + etype_class = cnx.vreg['etypes'].etype_class + for etype, var in rqlst.main_variables: + # need to do this since entity class is shared w. web client code ! + to_build[var.name] = EditedEntity(etype_class(etype)(cnx)) + plan.add_entity_def(to_build[var.name]) + # add constant values to entity def, mark variables to be selected + to_select = _extract_const_attributes(plan, rqlst, to_build) + # add necessary steps to add relations and update attributes + step = InsertStep(plan) # insert each entity and its relations + step.children += self._compute_relation_steps(plan, rqlst, to_select) + return (step,) + + def _compute_relation_steps(self, plan, rqlst, to_select): + """handle the selection of relations for an insert query""" + eidconsts = _extract_eid_consts(plan, rqlst) + for edef, rdefs in to_select.items(): + # create a select rql st to fetch needed data + select = Select() + eschema = edef.entity.e_schema + for i, (rtype, term, reverse) in enumerate(rdefs): + if getattr(term, 'variable', None) in eidconsts: + value = eidconsts[term.variable] + else: + select.append_selected(term.copy(select)) + value = _FROM_SUBSTEP + if reverse: + rdefs[i] = (rtype, InsertRelationsStep.REVERSE_RELATION, value) + else: + rschema = eschema.subjrels[rtype] + if rschema.final or rschema.inlined: + rdefs[i] = (rtype, InsertRelationsStep.FINAL, value) + else: + rdefs[i] = (rtype, InsertRelationsStep.RELATION, value) + step = InsertRelationsStep(plan, edef, rdefs) + select = _build_substep_query(select, rqlst) + if select is not None: + step.children += self._select_plan(plan, select, rqlst.solutions) + yield step + + def build_delete_plan(self, plan, rqlst): + """get an execution plan from a DELETE RQL query""" + # build a select query to fetch entities to delete + steps = [] + for etype, var in rqlst.main_variables: + step = DeleteEntitiesStep(plan) + step.children += self._sel_variable_step(plan, rqlst, etype, var) + steps.append(step) + for relation in rqlst.main_relations: + step = DeleteRelationsStep(plan, relation.r_type) + step.children += self._sel_relation_steps(plan, rqlst, relation) + steps.append(step) + return steps + + def _sel_variable_step(self, plan, rqlst, etype, varref): + """handle the selection of variables for a delete query""" + select = Select() + varref = varref.copy(select) + select.defined_vars = {varref.name: varref.variable} + select.append_selected(varref) + if rqlst.where is not None: + select.set_where(rqlst.where.copy(select)) + if getattr(rqlst, 'having', None): + select.set_having([x.copy(select) for x in rqlst.having]) + if etype != 'Any': + select.add_type_restriction(varref.variable, etype) + return self._select_plan(plan, select, rqlst.solutions) + + def _sel_relation_steps(self, plan, rqlst, relation): + """handle the selection of relations for a delete query""" + select = Select() + lhs, rhs = relation.get_variable_parts() + select.append_selected(lhs.copy(select)) + select.append_selected(rhs.copy(select)) + select.set_where(relation.copy(select)) + if rqlst.where is not None: + select.add_restriction(rqlst.where.copy(select)) + if getattr(rqlst, 'having', None): + select.set_having([x.copy(select) for x in rqlst.having]) + return self._select_plan(plan, select, rqlst.solutions) + + def build_set_plan(self, plan, rqlst): + """get an execution plan from an SET RQL query""" + getrschema = self.schema.rschema + select = Select() # potential substep query + selectedidx = {} # local state + attributes = set() # edited attributes + updatedefs = [] # definition of update attributes/relations + selidx = residx = 0 # substep selection / resulting rset indexes + # search for eid const in the WHERE clause + eidconsts = _extract_eid_consts(plan, rqlst) + # build `updatedefs` describing things to update and add necessary + # variables to the substep selection + for i, relation in enumerate(rqlst.main_relations): + if relation.r_type in VIRTUAL_RTYPES: + raise QueryError('can not assign to %r relation' + % relation.r_type) + lhs, rhs = relation.get_variable_parts() + lhskey = lhs.as_string() + if not lhskey in selectedidx: + if lhs.variable in eidconsts: + eid = eidconsts[lhs.variable] + lhsinfo = (_CONSTANT, eid, residx) + else: + select.append_selected(lhs.copy(select)) + lhsinfo = (_FROM_SUBSTEP, selidx, residx) + selidx += 1 + residx += 1 + selectedidx[lhskey] = lhsinfo + else: + lhsinfo = selectedidx[lhskey][:-1] + (None,) + rhskey = rhs.as_string() + if not rhskey in selectedidx: + if isinstance(rhs, Constant): + rhsinfo = (_CONSTANT, rhs.eval(plan.args), residx) + elif getattr(rhs, 'variable', None) in eidconsts: + eid = eidconsts[rhs.variable] + rhsinfo = (_CONSTANT, eid, residx) + else: + select.append_selected(rhs.copy(select)) + rhsinfo = (_FROM_SUBSTEP, selidx, residx) + selidx += 1 + residx += 1 + selectedidx[rhskey] = rhsinfo + else: + rhsinfo = selectedidx[rhskey][:-1] + (None,) + rschema = getrschema(relation.r_type) + updatedefs.append( (lhsinfo, rhsinfo, rschema) ) + # the update step + step = UpdateStep(plan, updatedefs) + # when necessary add substep to fetch yet unknown values + select = _build_substep_query(select, rqlst) + if select is not None: + # set distinct to avoid potential duplicate key error + select.distinct = True + step.children += self._select_plan(plan, select, rqlst.solutions) + return (step,) + + # internal methods ######################################################## + + def _select_plan(self, plan, select, solutions): + union = Union() + union.append(select) + select.clean_solutions(solutions) + add_types_restriction(self.schema, select) + self.rqlhelper.annotate(union) + return self.build_select_plan(plan, union) + + +# execution steps and helper functions ######################################## + +def varmap_test_repr(varmap, tablesinorder): + if varmap is None: + return varmap + maprepr = {} + for var, sql in varmap.items(): + table, col = sql.split('.') + maprepr[var] = '%s.%s' % (tablesinorder[table], col) + return maprepr + +class Step(object): + """base abstract class for execution step""" + def __init__(self, plan): + self.plan = plan + self.children = [] + + def execute_child(self): + assert len(self.children) == 1 + return self.children[0].execute() + + def execute_children(self): + for step in self.children: + step.execute() + + def execute(self): + """execute this step and store partial (eg this step) results""" + raise NotImplementedError() + + def mytest_repr(self): + """return a representation of this step suitable for test""" + return (self.__class__.__name__,) + + def test_repr(self): + """return a representation of this step suitable for test""" + return self.mytest_repr() + ( + [step.test_repr() for step in self.children],) + + +class OneFetchStep(Step): + """step consisting in fetching data from sources and directly returning + results + """ + def __init__(self, plan, union, inputmap=None): + Step.__init__(self, plan) + self.union = union + self.inputmap = inputmap + + def execute(self): + """call .syntax_tree_search with the given syntax tree on each + source for each solution + """ + self.execute_children() + cnx = self.plan.cnx + args = self.plan.args + inputmap = self.inputmap + union = self.union + # do we have to use a inputmap from a previous step ? If so disable + # cachekey + if inputmap or self.plan.cache_key is None: + cachekey = None + # union may have been splited into subqueries, in which case we can't + # use plan.cache_key, rebuild a cache key + elif isinstance(self.plan.cache_key, tuple): + cachekey = list(self.plan.cache_key) + cachekey[0] = union.as_string() + cachekey = tuple(cachekey) + else: + cachekey = union.as_string() + # get results for query + source = cnx.repo.system_source + result = source.syntax_tree_search(cnx, union, args, cachekey, inputmap) + #print 'ONEFETCH RESULT %s' % (result) + return result + + def mytest_repr(self): + """return a representation of this step suitable for test""" + try: + inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder) + except AttributeError: + inputmap = self.inputmap + return (self.__class__.__name__, + sorted((r.as_string(kwargs=self.plan.args), r.solutions) + for r in self.union.children), + inputmap) + + +# UPDATE/INSERT/DELETE steps ################################################## + +class InsertRelationsStep(Step): + """step consisting in adding attributes/relations to entity defs from a + previous FetchStep + + relations values comes from the latest result, with one columns for + each relation defined in self.rdefs + + for one entity definition, we'll construct N entity, where N is the + number of the latest result + """ + + FINAL = 0 + RELATION = 1 + REVERSE_RELATION = 2 + + def __init__(self, plan, edef, rdefs): + Step.__init__(self, plan) + # partial entity definition to expand + self.edef = edef + # definition of relations to complete + self.rdefs = rdefs + + def execute(self): + """execute this step""" + base_edef = self.edef + edefs = [] + if self.children: + result = self.execute_child() + else: + result = [[]] + for row in result: + # get a new entity definition for this row + edef = base_edef.clone() + # complete this entity def using row values + index = 0 + for rtype, rorder, value in self.rdefs: + if value is _FROM_SUBSTEP: + value = row[index] + index += 1 + if rorder == InsertRelationsStep.FINAL: + edef.edited_attribute(rtype, value) + elif rorder == InsertRelationsStep.RELATION: + self.plan.add_relation_def( (edef, rtype, value) ) + edef.querier_pending_relations[(rtype, 'subject')] = value + else: + self.plan.add_relation_def( (value, rtype, edef) ) + edef.querier_pending_relations[(rtype, 'object')] = value + edefs.append(edef) + self.plan.substitute_entity_def(base_edef, edefs) + return result + + +class InsertStep(Step): + """step consisting in inserting new entities / relations""" + + def execute(self): + """execute this step""" + for step in self.children: + assert isinstance(step, InsertRelationsStep) + step.plan = self.plan + step.execute() + # insert entities first + result = self.plan.insert_entity_defs() + # then relation + self.plan.insert_relation_defs() + # return eids of inserted entities + return result + + +class DeleteEntitiesStep(Step): + """step consisting in deleting entities""" + + def execute(self): + """execute this step""" + results = self.execute_child() + if results: + todelete = frozenset(int(eid) for eid, in results) + cnx = self.plan.cnx + cnx.repo.glob_delete_entities(cnx, todelete) + return results + +class DeleteRelationsStep(Step): + """step consisting in deleting relations""" + + def __init__(self, plan, rtype): + Step.__init__(self, plan) + self.rtype = rtype + + def execute(self): + """execute this step""" + cnx = self.plan.cnx + delete = cnx.repo.glob_delete_relation + for subj, obj in self.execute_child(): + delete(cnx, subj, self.rtype, obj) + + +class UpdateStep(Step): + """step consisting in updating entities / adding relations from relations + definitions and from results fetched in previous step + """ + + def __init__(self, plan, updatedefs): + Step.__init__(self, plan) + self.updatedefs = updatedefs + + def execute(self): + """execute this step""" + cnx = self.plan.cnx + repo = cnx.repo + edefs = {} + relations = {} + # insert relations + if self.children: + result = self.execute_child() + else: + result = [[]] + for i, row in enumerate(result): + newrow = [] + for (lhsinfo, rhsinfo, rschema) in self.updatedefs: + lhsval = _handle_relterm(lhsinfo, row, newrow) + rhsval = _handle_relterm(rhsinfo, row, newrow) + if rschema.final or rschema.inlined: + eid = int(lhsval) + try: + edited = edefs[eid] + except KeyError: + edef = cnx.entity_from_eid(eid) + edefs[eid] = edited = EditedEntity(edef) + edited.edited_attribute(str(rschema), rhsval) + else: + str_rschema = str(rschema) + if str_rschema in relations: + relations[str_rschema].append((lhsval, rhsval)) + else: + relations[str_rschema] = [(lhsval, rhsval)] + result[i] = newrow + # update entities + repo.glob_add_relations(cnx, relations) + for eid, edited in edefs.items(): + repo.glob_update_entity(cnx, edited) + return result + +def _handle_relterm(info, row, newrow): + if info[0] is _CONSTANT: + val = info[1] + else: # _FROM_SUBSTEP + val = row[info[1]] + if info[-1] is not None: + newrow.append(val) + return val