server/ssplanner.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
--- a/server/ssplanner.py	Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,543 +0,0 @@
-# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
-"""plan execution of rql queries on a single source"""
-
-__docformat__ = "restructuredtext en"
-
-from six import text_type
-
-from rql.stmts import Union, Select
-from rql.nodes import Constant, Relation
-
-from cubicweb import QueryError
-from cubicweb.schema import VIRTUAL_RTYPES
-from cubicweb.rqlrewrite import add_types_restriction
-from cubicweb.server.edition import EditedEntity
-
-READ_ONLY_RTYPES = set(('eid', 'has_text', 'is', 'is_instance_of', 'identity'))
-
-_CONSTANT = object()
-_FROM_SUBSTEP = object()
-
-def _extract_const_attributes(plan, rqlst, to_build):
-    """add constant values to entity def, mark variables to be selected
-    """
-    to_select = {}
-    for relation in rqlst.main_relations:
-        lhs, rhs = relation.get_variable_parts()
-        rtype = relation.r_type
-        if rtype in READ_ONLY_RTYPES:
-            raise QueryError("can't assign to %s" % rtype)
-        try:
-            edef = to_build[str(lhs)]
-        except KeyError:
-            # lhs var is not to build, should be selected and added as an
-            # object relation
-            edef = to_build[str(rhs)]
-            to_select.setdefault(edef, []).append((rtype, lhs, 1))
-        else:
-            if isinstance(rhs, Constant) and not rhs.uid:
-                # add constant values to entity def
-                value = rhs.eval(plan.args)
-                eschema = edef.entity.e_schema
-                attrtype = eschema.subjrels[rtype].objects(eschema)[0]
-                if attrtype == 'Password' and isinstance(value, text_type):
-                    value = value.encode('UTF8')
-                edef.edited_attribute(rtype, value)
-            elif str(rhs) in to_build:
-                # create a relation between two newly created variables
-                plan.add_relation_def((edef, rtype, to_build[rhs.name]))
-            else:
-                to_select.setdefault(edef, []).append( (rtype, rhs, 0) )
-    return to_select
-
-def _extract_eid_consts(plan, rqlst):
-    """return a dict mapping rqlst variable object to their eid if specified in
-    the syntax tree
-    """
-    cnx = plan.cnx
-    if rqlst.where is None:
-        return {}
-    eidconsts = {}
-    neweids = cnx.transaction_data.get('neweids', ())
-    checkread = cnx.read_security
-    eschema = cnx.vreg.schema.eschema
-    for rel in rqlst.where.get_nodes(Relation):
-        # only care for 'eid' relations ...
-        if (rel.r_type == 'eid'
-            # ... that are not part of a NOT clause ...
-            and not rel.neged(strict=True)
-            # ... and where eid is specified by '=' operator.
-            and rel.children[1].operator == '='):
-            lhs, rhs = rel.get_variable_parts()
-            if isinstance(rhs, Constant):
-                eid = int(rhs.eval(plan.args))
-                # check read permission here since it may not be done by
-                # the generated select substep if not emited (eg nothing
-                # to be selected)
-                if checkread and eid not in neweids:
-                    with cnx.security_enabled(read=False):
-                        eschema(cnx.entity_metas(eid)['type']).check_perm(
-                            cnx, 'read', eid=eid)
-                eidconsts[lhs.variable] = eid
-    return eidconsts
-
-def _build_substep_query(select, origrqlst):
-    """Finalize substep select query that should be executed to get proper
-    selection of stuff to insert/update.
-
-    Return None when no query actually needed, else the given select node that
-    will be used as substep query.
-    """
-    if origrqlst.where is not None and not select.selection:
-        # no selection, append one randomly by searching for a relation which is
-        # not neged neither a type restriction (is/is_instance_of)
-        for rel in origrqlst.where.iget_nodes(Relation):
-            if not (rel.neged(traverse_scope=True) or rel.is_types_restriction()):
-                select.append_selected(rel.children[0].copy(select))
-                break
-        else:
-            return None
-    if select.selection:
-        if origrqlst.where is not None:
-            select.set_where(origrqlst.where.copy(select))
-        if getattr(origrqlst, 'having', None):
-            select.set_having([sq.copy(select) for sq in origrqlst.having])
-        return select
-    return None
-
-class SSPlanner(object):
-    """SingleSourcePlanner: build execution plan for rql queries
-
-    optimized for single source repositories
-    """
-
-    def __init__(self, schema, rqlhelper):
-        self.schema = schema
-        self.rqlhelper = rqlhelper
-
-    def build_plan(self, plan):
-        """build an execution plan from a RQL query
-
-        do nothing here, dispatch according to the statement type
-        """
-        build_plan = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE)
-        for step in build_plan(plan, plan.rqlst):
-            plan.add_step(step)
-
-    def build_select_plan(self, plan, rqlst):
-        """build execution plan for a SELECT RQL query. Suppose only one source
-        is available and so avoid work need for query decomposition among sources
-
-        the rqlst should not be tagged at this point.
-        """
-        plan.preprocess(rqlst)
-        return (OneFetchStep(plan, rqlst),)
-
-    def build_insert_plan(self, plan, rqlst):
-        """get an execution plan from an INSERT RQL query"""
-        # each variable in main variables is a new entity to insert
-        to_build = {}
-        cnx = plan.cnx
-        etype_class = cnx.vreg['etypes'].etype_class
-        for etype, var in rqlst.main_variables:
-            # need to do this since entity class is shared w. web client code !
-            to_build[var.name] = EditedEntity(etype_class(etype)(cnx))
-            plan.add_entity_def(to_build[var.name])
-        # add constant values to entity def, mark variables to be selected
-        to_select = _extract_const_attributes(plan, rqlst, to_build)
-        # add necessary steps to add relations and update attributes
-        step = InsertStep(plan) # insert each entity and its relations
-        step.children += self._compute_relation_steps(plan, rqlst, to_select)
-        return (step,)
-
-    def _compute_relation_steps(self, plan, rqlst, to_select):
-        """handle the selection of relations for an insert query"""
-        eidconsts = _extract_eid_consts(plan, rqlst)
-        for edef, rdefs in to_select.items():
-            # create a select rql st to fetch needed data
-            select = Select()
-            eschema = edef.entity.e_schema
-            for i, (rtype, term, reverse) in enumerate(rdefs):
-                if getattr(term, 'variable', None) in eidconsts:
-                    value = eidconsts[term.variable]
-                else:
-                    select.append_selected(term.copy(select))
-                    value = _FROM_SUBSTEP
-                if reverse:
-                    rdefs[i] = (rtype, InsertRelationsStep.REVERSE_RELATION, value)
-                else:
-                    rschema = eschema.subjrels[rtype]
-                    if rschema.final or rschema.inlined:
-                        rdefs[i] = (rtype, InsertRelationsStep.FINAL, value)
-                    else:
-                        rdefs[i] = (rtype, InsertRelationsStep.RELATION, value)
-            step = InsertRelationsStep(plan, edef, rdefs)
-            select = _build_substep_query(select, rqlst)
-            if select is not None:
-                step.children += self._select_plan(plan, select, rqlst.solutions)
-            yield step
-
-    def build_delete_plan(self, plan, rqlst):
-        """get an execution plan from a DELETE RQL query"""
-        # build a select query to fetch entities to delete
-        steps = []
-        for etype, var in rqlst.main_variables:
-            step = DeleteEntitiesStep(plan)
-            step.children += self._sel_variable_step(plan, rqlst, etype, var)
-            steps.append(step)
-        for relation in rqlst.main_relations:
-            step = DeleteRelationsStep(plan, relation.r_type)
-            step.children += self._sel_relation_steps(plan, rqlst, relation)
-            steps.append(step)
-        return steps
-
-    def _sel_variable_step(self, plan, rqlst, etype, varref):
-        """handle the selection of variables for a delete query"""
-        select = Select()
-        varref = varref.copy(select)
-        select.defined_vars = {varref.name: varref.variable}
-        select.append_selected(varref)
-        if rqlst.where is not None:
-            select.set_where(rqlst.where.copy(select))
-        if getattr(rqlst, 'having', None):
-            select.set_having([x.copy(select) for x in rqlst.having])
-        if etype != 'Any':
-            select.add_type_restriction(varref.variable, etype)
-        return self._select_plan(plan, select, rqlst.solutions)
-
-    def _sel_relation_steps(self, plan, rqlst, relation):
-        """handle the selection of relations for a delete query"""
-        select = Select()
-        lhs, rhs = relation.get_variable_parts()
-        select.append_selected(lhs.copy(select))
-        select.append_selected(rhs.copy(select))
-        select.set_where(relation.copy(select))
-        if rqlst.where is not None:
-            select.add_restriction(rqlst.where.copy(select))
-        if getattr(rqlst, 'having', None):
-            select.set_having([x.copy(select) for x in rqlst.having])
-        return self._select_plan(plan, select, rqlst.solutions)
-
-    def build_set_plan(self, plan, rqlst):
-        """get an execution plan from an SET RQL query"""
-        getrschema = self.schema.rschema
-        select = Select()   # potential substep query
-        selectedidx = {}    # local state
-        attributes = set()  # edited attributes
-        updatedefs = []     # definition of update attributes/relations
-        selidx = residx = 0 # substep selection / resulting rset indexes
-        # search for eid const in the WHERE clause
-        eidconsts = _extract_eid_consts(plan, rqlst)
-        # build `updatedefs` describing things to update and add necessary
-        # variables to the substep selection
-        for i, relation in enumerate(rqlst.main_relations):
-            if relation.r_type in VIRTUAL_RTYPES:
-                raise QueryError('can not assign to %r relation'
-                                 % relation.r_type)
-            lhs, rhs = relation.get_variable_parts()
-            lhskey = lhs.as_string()
-            if not lhskey in selectedidx:
-                if lhs.variable in eidconsts:
-                    eid = eidconsts[lhs.variable]
-                    lhsinfo = (_CONSTANT, eid, residx)
-                else:
-                    select.append_selected(lhs.copy(select))
-                    lhsinfo = (_FROM_SUBSTEP, selidx, residx)
-                    selidx += 1
-                residx += 1
-                selectedidx[lhskey] = lhsinfo
-            else:
-                lhsinfo = selectedidx[lhskey][:-1] + (None,)
-            rhskey = rhs.as_string()
-            if not rhskey in selectedidx:
-                if isinstance(rhs, Constant):
-                    rhsinfo = (_CONSTANT, rhs.eval(plan.args), residx)
-                elif getattr(rhs, 'variable', None) in eidconsts:
-                    eid = eidconsts[rhs.variable]
-                    rhsinfo = (_CONSTANT, eid, residx)
-                else:
-                    select.append_selected(rhs.copy(select))
-                    rhsinfo = (_FROM_SUBSTEP, selidx, residx)
-                    selidx += 1
-                residx += 1
-                selectedidx[rhskey] = rhsinfo
-            else:
-                rhsinfo = selectedidx[rhskey][:-1] + (None,)
-            rschema = getrschema(relation.r_type)
-            updatedefs.append( (lhsinfo, rhsinfo, rschema) )
-        # the update step
-        step = UpdateStep(plan, updatedefs)
-        # when necessary add substep to fetch yet unknown values
-        select = _build_substep_query(select, rqlst)
-        if select is not None:
-            # set distinct to avoid potential duplicate key error
-            select.distinct = True
-            step.children += self._select_plan(plan, select, rqlst.solutions)
-        return (step,)
-
-    # internal methods ########################################################
-
-    def _select_plan(self, plan, select, solutions):
-        union = Union()
-        union.append(select)
-        select.clean_solutions(solutions)
-        add_types_restriction(self.schema, select)
-        self.rqlhelper.annotate(union)
-        return self.build_select_plan(plan, union)
-
-
-# execution steps and helper functions ########################################
-
-def varmap_test_repr(varmap, tablesinorder):
-    if varmap is None:
-        return varmap
-    maprepr = {}
-    for var, sql in varmap.items():
-        table, col = sql.split('.')
-        maprepr[var] = '%s.%s' % (tablesinorder[table], col)
-    return maprepr
-
-class Step(object):
-    """base abstract class for execution step"""
-    def __init__(self, plan):
-        self.plan = plan
-        self.children = []
-
-    def execute_child(self):
-        assert len(self.children) == 1
-        return self.children[0].execute()
-
-    def execute_children(self):
-        for step in self.children:
-            step.execute()
-
-    def execute(self):
-        """execute this step and store partial (eg this step) results"""
-        raise NotImplementedError()
-
-    def mytest_repr(self):
-        """return a representation of this step suitable for test"""
-        return (self.__class__.__name__,)
-
-    def test_repr(self):
-        """return a representation of this step suitable for test"""
-        return self.mytest_repr() + (
-            [step.test_repr() for step in self.children],)
-
-
-class OneFetchStep(Step):
-    """step consisting in fetching data from sources and directly returning
-    results
-    """
-    def __init__(self, plan, union, inputmap=None):
-        Step.__init__(self, plan)
-        self.union = union
-        self.inputmap = inputmap
-
-    def execute(self):
-        """call .syntax_tree_search with the given syntax tree on each
-        source for each solution
-        """
-        self.execute_children()
-        cnx = self.plan.cnx
-        args = self.plan.args
-        inputmap = self.inputmap
-        union = self.union
-        # do we have to use a inputmap from a previous step ? If so disable
-        # cachekey
-        if inputmap or self.plan.cache_key is None:
-            cachekey = None
-        # union may have been splited into subqueries, in which case we can't
-        # use plan.cache_key, rebuild a cache key
-        elif isinstance(self.plan.cache_key, tuple):
-            cachekey = list(self.plan.cache_key)
-            cachekey[0] = union.as_string()
-            cachekey = tuple(cachekey)
-        else:
-            cachekey = union.as_string()
-        # get results for query
-        source = cnx.repo.system_source
-        result = source.syntax_tree_search(cnx, union, args, cachekey, inputmap)
-        #print 'ONEFETCH RESULT %s' % (result)
-        return result
-
-    def mytest_repr(self):
-        """return a representation of this step suitable for test"""
-        try:
-            inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
-        except AttributeError:
-            inputmap = self.inputmap
-        return (self.__class__.__name__,
-                sorted((r.as_string(kwargs=self.plan.args), r.solutions)
-                       for r in self.union.children),
-                inputmap)
-
-
-# UPDATE/INSERT/DELETE steps ##################################################
-
-class InsertRelationsStep(Step):
-    """step consisting in adding attributes/relations to entity defs from a
-    previous FetchStep
-
-    relations values comes from the latest result, with one columns for
-    each relation defined in self.rdefs
-
-    for one entity definition, we'll construct N entity, where N is the
-    number of the latest result
-    """
-
-    FINAL = 0
-    RELATION = 1
-    REVERSE_RELATION = 2
-
-    def __init__(self, plan, edef, rdefs):
-        Step.__init__(self, plan)
-        # partial entity definition to expand
-        self.edef = edef
-        # definition of relations to complete
-        self.rdefs = rdefs
-
-    def execute(self):
-        """execute this step"""
-        base_edef = self.edef
-        edefs = []
-        if self.children:
-            result = self.execute_child()
-        else:
-            result = [[]]
-        for row in result:
-            # get a new entity definition for this row
-            edef = base_edef.clone()
-            # complete this entity def using row values
-            index = 0
-            for rtype, rorder, value in self.rdefs:
-                if value is _FROM_SUBSTEP:
-                    value = row[index]
-                    index += 1
-                if rorder == InsertRelationsStep.FINAL:
-                    edef.edited_attribute(rtype, value)
-                elif rorder == InsertRelationsStep.RELATION:
-                    self.plan.add_relation_def( (edef, rtype, value) )
-                    edef.querier_pending_relations[(rtype, 'subject')] = value
-                else:
-                    self.plan.add_relation_def( (value, rtype, edef) )
-                    edef.querier_pending_relations[(rtype, 'object')] = value
-            edefs.append(edef)
-        self.plan.substitute_entity_def(base_edef, edefs)
-        return result
-
-
-class InsertStep(Step):
-    """step consisting in inserting new entities / relations"""
-
-    def execute(self):
-        """execute this step"""
-        for step in self.children:
-            assert isinstance(step, InsertRelationsStep)
-            step.plan = self.plan
-            step.execute()
-        # insert entities first
-        result = self.plan.insert_entity_defs()
-        # then relation
-        self.plan.insert_relation_defs()
-        # return eids of inserted entities
-        return result
-
-
-class DeleteEntitiesStep(Step):
-    """step consisting in deleting entities"""
-
-    def execute(self):
-        """execute this step"""
-        results = self.execute_child()
-        if results:
-            todelete = frozenset(int(eid) for eid, in results)
-            cnx = self.plan.cnx
-            cnx.repo.glob_delete_entities(cnx, todelete)
-        return results
-
-class DeleteRelationsStep(Step):
-    """step consisting in deleting relations"""
-
-    def __init__(self, plan, rtype):
-        Step.__init__(self, plan)
-        self.rtype = rtype
-
-    def execute(self):
-        """execute this step"""
-        cnx = self.plan.cnx
-        delete = cnx.repo.glob_delete_relation
-        for subj, obj in self.execute_child():
-            delete(cnx, subj, self.rtype, obj)
-
-
-class UpdateStep(Step):
-    """step consisting in updating entities / adding relations from relations
-    definitions and from results fetched in previous step
-    """
-
-    def __init__(self, plan, updatedefs):
-        Step.__init__(self, plan)
-        self.updatedefs = updatedefs
-
-    def execute(self):
-        """execute this step"""
-        cnx = self.plan.cnx
-        repo = cnx.repo
-        edefs = {}
-        relations = {}
-        # insert relations
-        if self.children:
-            result = self.execute_child()
-        else:
-            result = [[]]
-        for i, row in enumerate(result):
-            newrow = []
-            for (lhsinfo, rhsinfo, rschema) in self.updatedefs:
-                lhsval = _handle_relterm(lhsinfo, row, newrow)
-                rhsval = _handle_relterm(rhsinfo, row, newrow)
-                if rschema.final or rschema.inlined:
-                    eid = int(lhsval)
-                    try:
-                        edited = edefs[eid]
-                    except KeyError:
-                        edef = cnx.entity_from_eid(eid)
-                        edefs[eid] = edited = EditedEntity(edef)
-                    edited.edited_attribute(str(rschema), rhsval)
-                else:
-                    str_rschema = str(rschema)
-                    if str_rschema in relations:
-                        relations[str_rschema].append((lhsval, rhsval))
-                    else:
-                        relations[str_rschema] = [(lhsval, rhsval)]
-            result[i] = newrow
-        # update entities
-        repo.glob_add_relations(cnx, relations)
-        for eid, edited in edefs.items():
-            repo.glob_update_entity(cnx, edited)
-        return result
-
-def _handle_relterm(info, row, newrow):
-    if info[0] is _CONSTANT:
-        val = info[1]
-    else: # _FROM_SUBSTEP
-        val = row[info[1]]
-    if info[-1] is not None:
-        newrow.append(val)
-    return val