cubicweb/server/ssplanner.py
changeset 11057 0b59724cb3f2
parent 10682 7e111b606005
child 11237 f32134dd0067
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/server/ssplanner.py	Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,543 @@
+# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+"""plan execution of rql queries on a single source"""
+
+__docformat__ = "restructuredtext en"
+
+from six import text_type
+
+from rql.stmts import Union, Select
+from rql.nodes import Constant, Relation
+
+from cubicweb import QueryError
+from cubicweb.schema import VIRTUAL_RTYPES
+from cubicweb.rqlrewrite import add_types_restriction
+from cubicweb.server.edition import EditedEntity
+
+# relation types which can't be assigned by an INSERT query
+READ_ONLY_RTYPES = {'eid', 'has_text', 'is', 'is_instance_of', 'identity'}
+_CONSTANT = object()  # marks a value known when the plan is built
+_FROM_SUBSTEP = object()  # marks a value to read from a substep result row
+
+def _extract_const_attributes(plan, rqlst, to_build):
+    """Set constant attribute values on the entity definitions to build and
+    return the `(rtype, term, reverse)` triples still to select, per edef.
+    """
+    pending = {}
+    for relation in rqlst.main_relations:
+        subj, obj = relation.get_variable_parts()
+        rtype = relation.r_type
+        if rtype in READ_ONLY_RTYPES:
+            raise QueryError("can't assign to %s" % rtype)
+        edef = to_build.get(str(subj))
+        if edef is None:
+            # the subject is not an entity to build: it will be selected and
+            # the relation added with the built entity as object
+            edef = to_build[str(obj)]
+            pending.setdefault(edef, []).append((rtype, subj, 1))
+            continue
+        if isinstance(obj, Constant) and not obj.uid:
+            # plain constant: record it on the entity definition right away
+            value = obj.eval(plan.args)
+            eschema = edef.entity.e_schema
+            if (eschema.subjrels[rtype].objects(eschema)[0] == 'Password'
+                    and isinstance(value, text_type)):
+                value = value.encode('UTF8')  # text given for a Password
+            edef.edited_attribute(rtype, value)
+        elif str(obj) in to_build:
+            # relation between two entities built by this query
+            plan.add_relation_def((edef, rtype, to_build[obj.name]))
+        else:
+            pending.setdefault(edef, []).append((rtype, obj, 0))
+    return pending
+
+def _extract_eid_consts(plan, rqlst):
+    """Map variables of `rqlst` to the eid given for them in the WHERE clause.
+
+    Only non-negated `eid` relations using '=' and a constant are considered.
+    """
+    cnx = plan.cnx
+    restriction = rqlst.where
+    if restriction is None:
+        return {}
+    known_eids = {}
+    created = cnx.transaction_data.get('neweids', ())
+    must_check_read = cnx.read_security
+    get_eschema = cnx.vreg.schema.eschema
+    for relation in restriction.get_nodes(Relation):
+        if relation.r_type != 'eid' or relation.neged(strict=True):
+            continue
+        if relation.children[1].operator != '=':
+            continue
+        var, term = relation.get_variable_parts()
+        if not isinstance(term, Constant):
+            continue
+        eid = int(term.eval(plan.args))
+        # the select substep may not be emitted (eg nothing to select), so
+        # read permission has to be checked from here
+        if must_check_read and eid not in created:
+            with cnx.security_enabled(read=False):
+                etype = cnx.entity_metas(eid)['type']
+                get_eschema(etype).check_perm(cnx, 'read', eid=eid)
+        known_eids[var.variable] = eid
+    return known_eids
+
+def _build_substep_query(select, origrqlst):
+    """Complete the substep `select` query fetching values to insert/update.
+
+    Return `select`, or None when no query is actually needed.
+    """
+    where = origrqlst.where
+    if where is not None and not select.selection:
+        # nothing selected yet: select the first child of the first relation
+        # which is neither negated nor a type restriction (is/is_instance_of)
+        candidates = (rel for rel in where.iget_nodes(Relation)
+                      if not rel.neged(traverse_scope=True)
+                      and not rel.is_types_restriction())
+        picked = next(candidates, None)
+        if picked is None:
+            return None
+        select.append_selected(picked.children[0].copy(select))
+    if not select.selection:
+        return None
+    if where is not None:
+        select.set_where(where.copy(select))
+    if getattr(origrqlst, 'having', None):
+        select.set_having([sq.copy(select) for sq in origrqlst.having])
+    return select
+
+class SSPlanner(object):
+    """SingleSourcePlanner: build execution plans for RQL queries
+
+    optimized for single source repositories
+    """
+
+    def __init__(self, schema, rqlhelper):
+        self.schema = schema  # gives relation schemas, drives type restrictions
+        self.rqlhelper = rqlhelper  # annotates the substep queries
+
+    def build_plan(self, plan):
+        """build an execution plan from a RQL query
+
+        dispatch to `build_<TYPE>_plan` and add the returned steps to the plan
+        """
+        build_plan = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE)
+        for step in build_plan(plan, plan.rqlst):
+            plan.add_step(step)
+
+    def build_select_plan(self, plan, rqlst):
+        """build the execution plan of a SELECT RQL query: since a single
+        source is supposed, no query decomposition among sources is needed
+
+        the rqlst should not be tagged at this point.
+        """
+        plan.preprocess(rqlst)
+        return (OneFetchStep(plan, rqlst),)
+
+    def build_insert_plan(self, plan, rqlst):
+        """get an execution plan (a single `InsertStep`) from an INSERT RQL query"""
+        # each variable in main variables is a new entity to insert
+        to_build = {}
+        cnx = plan.cnx
+        etype_class = cnx.vreg['etypes'].etype_class
+        for etype, var in rqlst.main_variables:
+            # need to do this since entity class is shared w. web client code !
+            to_build[var.name] = EditedEntity(etype_class(etype)(cnx))
+            plan.add_entity_def(to_build[var.name])
+        # add constant values to entity def, mark variables to be selected
+        to_select = _extract_const_attributes(plan, rqlst, to_build)
+        # add necessary steps to add relations and update attributes
+        step = InsertStep(plan) # insert each entity and its relations
+        step.children += self._compute_relation_steps(plan, rqlst, to_select)
+        return (step,)
+
+    def _compute_relation_steps(self, plan, rqlst, to_select):
+        """yield one `InsertRelationsStep` per entity definition of `to_select`"""
+        eidconsts = _extract_eid_consts(plan, rqlst)
+        for edef, rdefs in to_select.items():
+            # create a select rql st to fetch needed data
+            select = Select()
+            eschema = edef.entity.e_schema
+            for i, (rtype, term, reverse) in enumerate(rdefs):  # rewritten in place
+                if getattr(term, 'variable', None) in eidconsts:
+                    value = eidconsts[term.variable]
+                else:
+                    select.append_selected(term.copy(select))
+                    value = _FROM_SUBSTEP
+                if reverse:
+                    rdefs[i] = (rtype, InsertRelationsStep.REVERSE_RELATION, value)
+                else:
+                    rschema = eschema.subjrels[rtype]
+                    if rschema.final or rschema.inlined:
+                        rdefs[i] = (rtype, InsertRelationsStep.FINAL, value)
+                    else:
+                        rdefs[i] = (rtype, InsertRelationsStep.RELATION, value)
+            step = InsertRelationsStep(plan, edef, rdefs)
+            select = _build_substep_query(select, rqlst)
+            if select is not None:
+                step.children += self._select_plan(plan, select, rqlst.solutions)
+            yield step
+
+    def build_delete_plan(self, plan, rqlst):
+        """get an execution plan from a DELETE RQL query"""
+        # one step per main variable, then one per main relation, each one
+        # being fed by its own select substep
+        steps = []
+        for etype, var in rqlst.main_variables:
+            step = DeleteEntitiesStep(plan)
+            step.children += self._sel_variable_step(plan, rqlst, etype, var)
+            steps.append(step)
+        for relation in rqlst.main_relations:
+            step = DeleteRelationsStep(plan, relation.r_type)
+            step.children += self._sel_relation_steps(plan, rqlst, relation)
+            steps.append(step)
+        return steps
+
+    def _sel_variable_step(self, plan, rqlst, etype, varref):
+        """return the steps selecting the entities to delete for `varref`"""
+        select = Select()
+        varref = varref.copy(select)
+        select.defined_vars = {varref.name: varref.variable}
+        select.append_selected(varref)
+        if rqlst.where is not None:
+            select.set_where(rqlst.where.copy(select))
+        if getattr(rqlst, 'having', None):
+            select.set_having([x.copy(select) for x in rqlst.having])
+        if etype != 'Any':  # no type restriction is added for 'Any'
+            select.add_type_restriction(varref.variable, etype)
+        return self._select_plan(plan, select, rqlst.solutions)
+
+    def _sel_relation_steps(self, plan, rqlst, relation):
+        """return the steps selecting both ends of the relations to delete"""
+        select = Select()
+        lhs, rhs = relation.get_variable_parts()
+        select.append_selected(lhs.copy(select))
+        select.append_selected(rhs.copy(select))
+        select.set_where(relation.copy(select))
+        if rqlst.where is not None:
+            select.add_restriction(rqlst.where.copy(select))
+        if getattr(rqlst, 'having', None):
+            select.set_having([x.copy(select) for x in rqlst.having])
+        return self._select_plan(plan, select, rqlst.solutions)
+
+    def build_set_plan(self, plan, rqlst):
+        """get an execution plan (a single `UpdateStep`) from a SET RQL query"""
+        getrschema = self.schema.rschema
+        select = Select()   # potential substep query
+        selectedidx = {}    # term string -> info tuple of terms already seen
+        updatedefs = []     # definition of update attributes/relations
+        selidx = residx = 0 # substep selection / resulting rset indexes
+        # search for eid const in the WHERE clause
+        eidconsts = _extract_eid_consts(plan, rqlst)
+        # build `updatedefs` describing things to update and add necessary
+        # variables to the substep selection
+        for relation in rqlst.main_relations:
+            if relation.r_type in VIRTUAL_RTYPES:
+                raise QueryError('can not assign to %r relation' % relation.r_type)
+            lhs, rhs = relation.get_variable_parts()
+            lhskey = lhs.as_string()
+            if lhskey not in selectedidx:
+                if lhs.variable in eidconsts:
+                    eid = eidconsts[lhs.variable]
+                    lhsinfo = (_CONSTANT, eid, residx)
+                else:
+                    select.append_selected(lhs.copy(select))
+                    lhsinfo = (_FROM_SUBSTEP, selidx, residx)
+                    selidx += 1
+                residx += 1
+                selectedidx[lhskey] = lhsinfo
+            else:
+                # already seen: same source, None so the value isn't repeated
+                lhsinfo = selectedidx[lhskey][:-1] + (None,)
+            rhskey = rhs.as_string()
+            if rhskey not in selectedidx:
+                if isinstance(rhs, Constant):
+                    rhsinfo = (_CONSTANT, rhs.eval(plan.args), residx)
+                elif getattr(rhs, 'variable', None) in eidconsts:
+                    eid = eidconsts[rhs.variable]
+                    rhsinfo = (_CONSTANT, eid, residx)
+                else:
+                    select.append_selected(rhs.copy(select))
+                    rhsinfo = (_FROM_SUBSTEP, selidx, residx)
+                    selidx += 1
+                residx += 1
+                selectedidx[rhskey] = rhsinfo
+            else:
+                rhsinfo = selectedidx[rhskey][:-1] + (None,)  # same as for lhs
+            rschema = getrschema(relation.r_type)
+            updatedefs.append( (lhsinfo, rhsinfo, rschema) )
+        # the update step
+        step = UpdateStep(plan, updatedefs)
+        # when necessary add substep to fetch yet unknown values
+        select = _build_substep_query(select, rqlst)
+        if select is not None:
+            # set distinct to avoid potential duplicate key error
+            select.distinct = True
+            step.children += self._select_plan(plan, select, rqlst.solutions)
+        return (step,)
+
+    # internal methods ########################################################
+
+    def _select_plan(self, plan, select, solutions):
+        """wrap `select` into a union and return the select plan built for it"""
+        union = Union()
+        union.append(select)
+        select.clean_solutions(solutions)
+        add_types_restriction(self.schema, select)
+        self.rqlhelper.annotate(union)
+        return self.build_select_plan(plan, union)
+
+
+# execution steps and helper functions ########################################
+
+def varmap_test_repr(varmap, tablesinorder):
+    """Return `varmap` with the table of each 'table.column' value replaced
+    by its `tablesinorder` entry (a None `varmap` is returned as is)."""
+    if varmap is None:
+        return None
+    split_items = ((var, sql.split('.')) for var, sql in varmap.items())
+    return {var: '%s.%s' % (tablesinorder[table], col)
+            for var, (table, col) in split_items}
+
+class Step(object):
+    """Abstract base class for the steps of an execution plan."""
+    def __init__(self, plan):
+        self.plan, self.children = plan, []
+
+    def execute_child(self):
+        """Execute the single child step and return its result."""
+        assert len(self.children) == 1
+        return self.children[0].execute()
+
+    def execute_children(self):
+        for child in self.children:  # results, if any, are dropped
+            child.execute()
+
+    def execute(self):
+        """Execute this step; to be implemented by concrete classes."""
+        raise NotImplementedError()
+
+    def mytest_repr(self):
+        """Return a tuple describing this step alone, for use in tests."""
+        return (type(self).__name__,)
+
+    def test_repr(self):
+        """Return the test representation of this step and its children."""
+        own_repr = self.mytest_repr()
+        return own_repr + ([child.test_repr() for child in self.children],)
+
+
+class OneFetchStep(Step):
+    """Step fetching the results of a union from the system source and
+    returning them directly.
+    """
+    def __init__(self, plan, union, inputmap=None):
+        Step.__init__(self, plan)
+        self.inputmap = inputmap
+        self.union = union
+
+    def execute(self):
+        """Execute the children steps, then call `syntax_tree_search` on the
+        system source with this step's union and return its result.
+        """
+        self.execute_children()
+        plan = self.plan
+        cnx = plan.cnx
+        args = plan.args
+        inputmap = self.inputmap
+        union = self.union
+        if inputmap or plan.cache_key is None:
+            # no cache key when an input map from a previous step is used
+            # (or when the plan has none)
+            cachekey = None
+        elif isinstance(plan.cache_key, tuple):
+            # the union may have been split into subqueries, in which case
+            # plan.cache_key can't be used as is: put this union's string
+            # in place of the key's first item
+            key_items = list(plan.cache_key)
+            key_items[0] = union.as_string()
+            cachekey = tuple(key_items)
+        else:
+            cachekey = union.as_string()
+        system_source = cnx.repo.system_source
+        return system_source.syntax_tree_search(cnx, union, args, cachekey,
+                                                inputmap)
+
+    def mytest_repr(self):
+        """Return a tuple describing this step, for use in tests."""
+        try:
+            inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
+        except AttributeError:
+            # presumably a plan without `tablesinorder`: keep the raw map
+            inputmap = self.inputmap
+        queries = sorted((select.as_string(kwargs=self.plan.args), select.solutions)
+                         for select in self.union.children)
+        return (self.__class__.__name__, queries, inputmap)
+
+
+# UPDATE/INSERT/DELETE steps ##################################################
+
+class InsertRelationsStep(Step):
+    """Step completing an entity definition with attributes/relations whose
+    values come from a previous fetch step.
+
+    The child step result is expected to hold one column per relation
+    definition of `self.rdefs` to read from the substep.
+
+    One entity definition is built per row of that result; together they
+    replace the initial definition in the plan.
+    """
+
+    FINAL = 0  # attribute (final or inlined relation) of the entity
+    RELATION = 1  # relation whose subject is the entity
+    REVERSE_RELATION = 2  # relation whose object is the entity
+
+    def __init__(self, plan, edef, rdefs):
+        Step.__init__(self, plan)
+        # (rtype, kind, value) triples used to complete the definition
+        self.rdefs = rdefs
+        # partial entity definition, cloned for each result row
+        self.edef = edef
+
+    def execute(self):
+        """Build one entity definition per fetched row, register the
+        relations to add on the plan and return the fetched rows.
+        """
+        template = self.edef
+        plan = self.plan
+        rows = self.execute_child() if self.children else [[]]
+        expanded = []
+        for row in rows:
+            # each row gets its own copy of the partial definition
+            edef = template.clone()
+            column = 0
+            for rtype, kind, value in self.rdefs:
+                if value is _FROM_SUBSTEP:
+                    value = row[column]
+                    column += 1
+                if kind == InsertRelationsStep.FINAL:
+                    edef.edited_attribute(rtype, value)
+                    continue
+                if kind == InsertRelationsStep.RELATION:
+                    relation, role = (edef, rtype, value), 'subject'
+                else:
+                    relation, role = (value, rtype, edef), 'object'
+                plan.add_relation_def(relation)
+                edef.querier_pending_relations[(rtype, role)] = value
+            expanded.append(edef)
+        plan.substitute_entity_def(template, expanded)
+        return rows
+
+
+class InsertStep(Step):
+    """Step inserting the new entities and relations registered on the plan."""
+
+    def execute(self):
+        """Run the children steps, then insert the entity definitions and the
+        relation definitions of the plan; return the entity insertion result."""
+        plan = self.plan
+        for child in self.children:
+            assert isinstance(child, InsertRelationsStep)
+            child.plan = plan
+            child.execute()
+        # entities first, relations next
+        inserted = plan.insert_entity_defs()
+        plan.insert_relation_defs()
+        return inserted
+
+
+class DeleteEntitiesStep(Step):
+    """Step deleting the entities whose eids are fetched by its child step."""
+
+    def execute(self):
+        """Delete the fetched entities, if any, and return the child result."""
+        rows = self.execute_child()
+        if rows:
+            cnx = self.plan.cnx
+            eids = frozenset(int(eid) for (eid,) in rows)  # one-column rows
+            cnx.repo.glob_delete_entities(cnx, eids)
+        return rows
+
+class DeleteRelationsStep(Step):
+    """Step deleting relations of a given type between fetched entities."""
+
+    def __init__(self, plan, rtype):
+        Step.__init__(self, plan)
+        self.rtype = rtype  # type of the relations to delete
+
+    def execute(self):
+        """Delete the relation for each (subject, object) row of the child."""
+        cnx, rtype = self.plan.cnx, self.rtype
+        delete_relation = cnx.repo.glob_delete_relation
+        for eidfrom, eidto in self.execute_child():
+            delete_relation(cnx, eidfrom, rtype, eidto)
+
+
+class UpdateStep(Step):
+    """Step updating entities / adding relations from update definitions
+    and from the values fetched by its child step, if any.
+    """
+
+    def __init__(self, plan, updatedefs):
+        Step.__init__(self, plan)
+        self.updatedefs = updatedefs  # (lhsinfo, rhsinfo, rschema) triples
+
+    def execute(self):
+        """Apply the update definitions to each row of the child step result
+        (to a single empty row when there is no child), then pass collected
+        relations and edited entities to the repository.
+
+        Rows of the result are replaced by the values kept while handling
+        the terms, and the result is returned.
+        """
+        cnx = self.plan.cnx
+        repo = cnx.repo
+        edited_by_eid = {}
+        pairs_by_rtype = {}
+        rows = self.execute_child() if self.children else [[]]
+        for rowidx, row in enumerate(rows):
+            # values to keep for this row, filled by `_handle_relterm`
+            newrow = []
+            for lhsinfo, rhsinfo, rschema in self.updatedefs:
+                lhsval = _handle_relterm(lhsinfo, row, newrow)
+                rhsval = _handle_relterm(rhsinfo, row, newrow)
+                rtype = str(rschema)
+                if not (rschema.final or rschema.inlined):
+                    # plain relation: group (subject, object) pairs by type
+                    pairs_by_rtype.setdefault(rtype, []).append((lhsval, rhsval))
+                    continue
+                # attribute or inlined relation: a single edited entity
+                # gathers every change made to a given eid
+                eid = int(lhsval)
+                if eid not in edited_by_eid:
+                    edited_by_eid[eid] = EditedEntity(cnx.entity_from_eid(eid))
+                edited_by_eid[eid].edited_attribute(rtype, rhsval)
+            rows[rowidx] = newrow
+        # relations are added first, entities updated next
+        repo.glob_add_relations(cnx, pairs_by_rtype)
+        for edited in edited_by_eid.values():
+            repo.glob_update_entity(cnx, edited)
+        return rows
+
+def _handle_relterm(info, row, newrow):
+    """Return the value of the term described by `info`, appending it to
+    `newrow` unless the last item of `info` is None."""
+    source, data = info[0], info[1]
+    value = data if source is _CONSTANT else row[data]  # else _FROM_SUBSTEP
+    if info[-1] is not None:
+        newrow.append(value)
+    return value