server/ssplanner.py
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Fri, 09 Oct 2009 16:31:06 +0200
branchstable
changeset 3628 440931181322
parent 3437 a30b5b5138a4
child 3648 665c37544060
child 3689 deb13e88e037
permissions -rw-r--r--
[wf engine] support for subwf exit point with no destination state: go back to state from which we entered into the subworkflow

"""plan execution of rql queries on a single source

:organization: Logilab
:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
__docformat__ = "restructuredtext en"

from copy import copy

from rql.stmts import Union, Select
from rql.nodes import Constant

from cubicweb import QueryError, typed_eid
from cubicweb.schema import VIRTUAL_RTYPES
from cubicweb.rqlrewrite import add_types_restriction


class SSPlanner(object):
    """SingleSourcePlanner: build execution plan for rql queries

    optimized for single source repositories
    """

    def __init__(self, schema, rqlhelper):
        self.schema = schema
        self.rqlhelper = rqlhelper

    def build_plan(self, plan):
        """build an execution plan from a RQL query

        do nothing here, dispatch according to the statement type
        """
        build_plan = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE)
        for step in build_plan(plan, plan.rqlst):
            plan.add_step(step)

    def build_select_plan(self, plan, rqlst):
        """build execution plan for a SELECT RQL query. Suppose only one source
        is available and so avoid work need for query decomposition among sources

        the rqlst should not be tagged at this point.
        """
        plan.preprocess(rqlst)
        return (OneFetchStep(plan, rqlst, plan.session.repo.sources),)

    def build_insert_plan(self, plan, rqlst):
        """get an execution plan from an INSERT RQL query"""
        # each variable in main variables is a new entity to insert
        to_build = {}
        session = plan.session
        etype_class = session.vreg['etypes'].etype_class
        for etype, var in rqlst.main_variables:
            # need to do this since entity class is shared w. web client code !
            to_build[var.name] = etype_class(etype)(session)
            plan.add_entity_def(to_build[var.name])
        # add constant values to entity def, mark variables to be selected
        to_select = plan.relation_definitions(rqlst, to_build)
        # add necessary steps to add relations and update attributes
        step = InsertStep(plan) # insert each entity and its relations
        step.children += self._compute_relation_steps(plan, rqlst.solutions,
                                                      rqlst.where, to_select)
        return (step,)

    def _compute_relation_steps(self, plan, solutions, restriction, to_select):
        """handle the selection of relations for an insert query"""
        for edef, rdefs in to_select.items():
            # create a select rql st to fetch needed data
            select = Select()
            eschema = edef.e_schema
            for i in range(len(rdefs)):
                rtype, term, reverse = rdefs[i]
                select.append_selected(term.copy(select))
                if reverse:
                    rdefs[i] = rtype, RelationsStep.REVERSE_RELATION
                else:
                    rschema = eschema.subject_relation(rtype)
                    if rschema.is_final() or rschema.inlined:
                        rdefs[i] = rtype, RelationsStep.FINAL
                    else:
                        rdefs[i] = rtype, RelationsStep.RELATION
            if restriction is not None:
                select.set_where(restriction.copy(select))
            step = RelationsStep(plan, edef, rdefs)
            step.children += self._select_plan(plan, select, solutions)
            yield step

    def build_delete_plan(self, plan, rqlst):
        """get an execution plan from a DELETE RQL query"""
        # build a select query to fetch entities to delete
        steps = []
        for etype, var in rqlst.main_variables:
            step = DeleteEntitiesStep(plan)
            step.children += self._sel_variable_step(plan, rqlst.solutions,
                                                     rqlst.where, etype, var)
            steps.append(step)
        for relation in rqlst.main_relations:
            step = DeleteRelationsStep(plan, relation.r_type)
            step.children += self._sel_relation_steps(plan, rqlst.solutions,
                                                      rqlst.where, relation)
            steps.append(step)
        return steps

    def _sel_variable_step(self, plan, solutions, restriction, etype, varref):
        """handle the selection of variables for a delete query"""
        select = Select()
        varref = varref.copy(select)
        select.defined_vars = {varref.name: varref.variable}
        select.append_selected(varref)
        if restriction is not None:
            select.set_where(restriction.copy(select))
        if etype != 'Any':
            select.add_type_restriction(varref.variable, etype)
        return self._select_plan(plan, select, solutions)

    def _sel_relation_steps(self, plan, solutions, restriction, relation):
        """handle the selection of relations for a delete query"""
        select = Select()
        lhs, rhs = relation.get_variable_parts()
        select.append_selected(lhs.copy(select))
        select.append_selected(rhs.copy(select))
        select.set_where(relation.copy(select))
        if restriction is not None:
            select.add_restriction(restriction.copy(select))
        return self._select_plan(plan, select, solutions)

    def build_set_plan(self, plan, rqlst):
        """get an execution plan from an SET RQL query"""
        select = Select()
        # extract variables to add to the selection
        selected_index = {}
        index = 0
        relations, attrrelations = [], []
        getrschema = self.schema.rschema
        for relation in rqlst.main_relations:
            if relation.r_type in VIRTUAL_RTYPES:
                raise QueryError('can not assign to %r relation'
                                 % relation.r_type)
            lhs, rhs = relation.get_variable_parts()
            if not lhs.as_string('utf-8') in selected_index:
                select.append_selected(lhs.copy(select))
                selected_index[lhs.as_string('utf-8')] = index
                index += 1
            if not rhs.as_string('utf-8') in selected_index:
                select.append_selected(rhs.copy(select))
                selected_index[rhs.as_string('utf-8')] = index
                index += 1
            rschema = getrschema(relation.r_type)
            if rschema.is_final() or rschema.inlined:
                attrrelations.append(relation)
            else:
                relations.append(relation)
        # add step necessary to fetch all selected variables values
        if rqlst.where is not None:
            select.set_where(rqlst.where.copy(select))
        # set distinct to avoid potential duplicate key error
        select.distinct = True
        step = UpdateStep(plan, attrrelations, relations, selected_index)
        step.children += self._select_plan(plan, select, rqlst.solutions)
        return (step,)

    # internal methods ########################################################

    def _select_plan(self, plan, select, solutions):
        union = Union()
        union.append(select)
        select.clean_solutions(solutions)
        add_types_restriction(self.schema, select)
        self.rqlhelper.annotate(union)
        return self.build_select_plan(plan, union)


# execution steps and helper functions ########################################

def varmap_test_repr(varmap, tablesinorder):
    if varmap is None:
        return varmap
    maprepr = {}
    for var, sql in varmap.iteritems():
        table, col = sql.split('.')
        maprepr[var] = '%s.%s' % (tablesinorder[table], col)
    return maprepr

def offset_result(offset, result):
    offset -= len(result)
    if offset < 0:
        result = result[offset:]
        offset = None
    elif offset == 0:
        offset = None
        result = ()
    return offset, result


class LimitOffsetMixIn(object):
    limit = offset = None
    def set_limit_offset(self, limit, offset):
        self.limit = limit
        self.offset = offset or None


class Step(object):
    """base abstract class for execution step"""
    def __init__(self, plan):
        self.plan = plan
        self.children = []

    def execute_child(self):
        assert len(self.children) == 1
        return self.children[0].execute()

    def execute_children(self):
        for step in self.children:
            step.execute()

    def execute(self):
        """execute this step and store partial (eg this step) results"""
        raise NotImplementedError()

    def mytest_repr(self):
        """return a representation of this step suitable for test"""
        return (self.__class__.__name__,)

    def test_repr(self):
        """return a representation of this step suitable for test"""
        return self.mytest_repr() + (
            [step.test_repr() for step in self.children],)


class OneFetchStep(LimitOffsetMixIn, Step):
    """step consisting in fetching data from sources and directly returning
    results
    """
    def __init__(self, plan, union, sources, inputmap=None):
        Step.__init__(self, plan)
        self.union = union
        self.sources = sources
        self.inputmap = inputmap
        self.set_limit_offset(union.children[-1].limit, union.children[-1].offset)

    def set_limit_offset(self, limit, offset):
        LimitOffsetMixIn.set_limit_offset(self, limit, offset)
        for select in self.union.children:
            select.limit = limit
            select.offset = offset

    def execute(self):
        """call .syntax_tree_search with the given syntax tree on each
        source for each solution
        """
        self.execute_children()
        session = self.plan.session
        args = self.plan.args
        inputmap = self.inputmap
        union = self.union
        # do we have to use a inputmap from a previous step ? If so disable
        # cachekey
        if inputmap or self.plan.cache_key is None:
            cachekey = None
        # union may have been splited into subqueries, rebuild a cache key
        elif isinstance(self.plan.cache_key, tuple):
            cachekey = list(self.plan.cache_key)
            cachekey[0] = union.as_string()
            cachekey = tuple(cachekey)
        else:
            cachekey = union.as_string()
        result = []
        # limit / offset processing
        limit = self.limit
        offset = self.offset
        if offset is not None:
            if len(self.sources) > 1:
                # we'll have to deal with limit/offset by ourself
                if union.children[-1].limit:
                    union.children[-1].limit = limit + offset
                union.children[-1].offset = None
            else:
                offset, limit = None, None
        for source in self.sources:
            if offset is None and limit is not None:
                # modifying the sample rqlst is enough since sql generation
                # will pick it here as well
                union.children[-1].limit = limit - len(result)
            result_ = source.syntax_tree_search(session, union, args, cachekey,
                                                inputmap)
            if offset is not None:
                offset, result_ = offset_result(offset, result_)
            result += result_
            if limit is not None:
                if len(result) >= limit:
                    return result[:limit]
        #print 'ONEFETCH RESULT %s' % (result)
        return result

    def mytest_repr(self):
        """return a representation of this step suitable for test"""
        try:
            inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
        except AttributeError:
            inputmap = self.inputmap
        return (self.__class__.__name__,
                sorted((r.as_string(kwargs=self.plan.args), r.solutions)
                       for r in self.union.children),
                self.limit, self.offset,
                sorted(self.sources), inputmap)


# UPDATE/INSERT/DELETE steps ##################################################

class RelationsStep(Step):
    """step consisting in adding attributes/relations to entity defs from a
    previous FetchStep

    relations values comes from the latest result, with one columns for
    each relation defined in self.rdefs

    for one entity definition, we'll construct N entity, where N is the
    number of the latest result
    """

    FINAL = 0
    RELATION = 1
    REVERSE_RELATION = 2

    def __init__(self, plan, edef, rdefs):
        Step.__init__(self, plan)
        # partial entity definition to expand
        self.edef = edef
        # definition of relations to complete
        self.rdefs = rdefs

    def execute(self):
        """execute this step"""
        base_edef = self.edef
        edefs = []
        result = self.execute_child()
        for row in result:
            # get a new entity definition for this row
            edef = copy(base_edef)
            # complete this entity def using row values
            for i in range(len(self.rdefs)):
                rtype, rorder = self.rdefs[i]
                if rorder == RelationsStep.FINAL:
                    edef[rtype] = row[i]
                elif rorder == RelationsStep.RELATION:
                    self.plan.add_relation_def( (edef, rtype, row[i]) )
                    edef.querier_pending_relations[(rtype, 'subject')] = row[i]
                else:
                    self.plan.add_relation_def( (row[i], rtype, edef) )
                    edef.querier_pending_relations[(rtype, 'object')] = row[i]
            edefs.append(edef)
        self.plan.substitute_entity_def(base_edef, edefs)
        return result


class InsertStep(Step):
    """step consisting in inserting new entities / relations"""

    def execute(self):
        """execute this step"""
        for step in self.children:
            assert isinstance(step, RelationsStep)
            step.plan = self.plan
            step.execute()
        # insert entities first
        result = self.plan.insert_entity_defs()
        # then relation
        self.plan.insert_relation_defs()
        # return eids of inserted entities
        return result


class DeleteEntitiesStep(Step):
    """step consisting in deleting entities"""

    def execute(self):
        """execute this step"""
        todelete = frozenset(typed_eid(eid) for eid, in self.execute_child())
        session = self.plan.session
        delete = session.repo.glob_delete_entity
        # register pending eids first to avoid multiple deletion
        pending = session.transaction_data.setdefault('pendingeids', set())
        actual = todelete - pending
        pending |= actual
        for eid in actual:
            delete(session, eid)


class DeleteRelationsStep(Step):
    """step consisting in deleting relations"""

    def __init__(self, plan, rtype):
        Step.__init__(self, plan)
        self.rtype = rtype

    def execute(self):
        """execute this step"""
        session = self.plan.session
        delete = session.repo.glob_delete_relation
        for subj, obj in self.execute_child():
            delete(session, subj, self.rtype, obj)


class UpdateStep(Step):
    """step consisting in updating entities / adding relations from relations
    definitions and from results fetched in previous step
    """

    def __init__(self, plan, attribute_relations, relations, selected_index):
        Step.__init__(self, plan)
        self.attribute_relations = attribute_relations
        self.relations = relations
        self.selected_index = selected_index

    def execute(self):
        """execute this step"""
        plan = self.plan
        session = self.plan.session
        repo = session.repo
        edefs = {}
        # insert relations
        attributes = set([relation.r_type for relation in self.attribute_relations])
        result = self.execute_child()
        for row in result:
            for relation in self.attribute_relations:
                lhs, rhs = relation.get_variable_parts()
                eid = typed_eid(row[self.selected_index[str(lhs)]])
                try:
                    edef = edefs[eid]
                except KeyError:
                    edefs[eid] = edef = session.entity_from_eid(eid)
                if isinstance(rhs, Constant):
                    # add constant values to entity def
                    value = rhs.eval(plan.args)
                    edef[relation.r_type] = value
                else:
                    edef[relation.r_type] = row[self.selected_index[str(rhs)]]
            for relation in self.relations:
                subj = row[self.selected_index[str(relation.children[0])]]
                obj = row[self.selected_index[str(relation.children[1])]]
                repo.glob_add_relation(session, subj, relation.r_type, obj)
        # update entities
        for eid, edef in edefs.iteritems():
            repo.glob_update_entity(session, edef, attributes)
        return result