--- a/server/ssplanner.py Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,543 +0,0 @@
-# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-"""plan execution of rql queries on a single source"""
-
-__docformat__ = "restructuredtext en"
-
-from six import text_type
-
-from rql.stmts import Union, Select
-from rql.nodes import Constant, Relation
-
-from cubicweb import QueryError
-from cubicweb.schema import VIRTUAL_RTYPES
-from cubicweb.rqlrewrite import add_types_restriction
-from cubicweb.server.edition import EditedEntity
-
-READ_ONLY_RTYPES = set(('eid', 'has_text', 'is', 'is_instance_of', 'identity'))
-
-_CONSTANT = object()
-_FROM_SUBSTEP = object()
-
-def _extract_const_attributes(plan, rqlst, to_build):
- """add constant values to entity def, mark variables to be selected
- """
- to_select = {}
- for relation in rqlst.main_relations:
- lhs, rhs = relation.get_variable_parts()
- rtype = relation.r_type
- if rtype in READ_ONLY_RTYPES:
- raise QueryError("can't assign to %s" % rtype)
- try:
- edef = to_build[str(lhs)]
- except KeyError:
- # lhs var is not to build, should be selected and added as an
- # object relation
- edef = to_build[str(rhs)]
- to_select.setdefault(edef, []).append((rtype, lhs, 1))
- else:
- if isinstance(rhs, Constant) and not rhs.uid:
- # add constant values to entity def
- value = rhs.eval(plan.args)
- eschema = edef.entity.e_schema
- attrtype = eschema.subjrels[rtype].objects(eschema)[0]
- if attrtype == 'Password' and isinstance(value, text_type):
- value = value.encode('UTF8')
- edef.edited_attribute(rtype, value)
- elif str(rhs) in to_build:
- # create a relation between two newly created variables
- plan.add_relation_def((edef, rtype, to_build[rhs.name]))
- else:
- to_select.setdefault(edef, []).append( (rtype, rhs, 0) )
- return to_select
-
-def _extract_eid_consts(plan, rqlst):
- """return a dict mapping rqlst variable object to their eid if specified in
- the syntax tree
- """
- cnx = plan.cnx
- if rqlst.where is None:
- return {}
- eidconsts = {}
- neweids = cnx.transaction_data.get('neweids', ())
- checkread = cnx.read_security
- eschema = cnx.vreg.schema.eschema
- for rel in rqlst.where.get_nodes(Relation):
- # only care for 'eid' relations ...
- if (rel.r_type == 'eid'
- # ... that are not part of a NOT clause ...
- and not rel.neged(strict=True)
- # ... and where eid is specified by '=' operator.
- and rel.children[1].operator == '='):
- lhs, rhs = rel.get_variable_parts()
- if isinstance(rhs, Constant):
- eid = int(rhs.eval(plan.args))
- # check read permission here since it may not be done by
- # the generated select substep if not emited (eg nothing
- # to be selected)
- if checkread and eid not in neweids:
- with cnx.security_enabled(read=False):
- eschema(cnx.entity_metas(eid)['type']).check_perm(
- cnx, 'read', eid=eid)
- eidconsts[lhs.variable] = eid
- return eidconsts
-
-def _build_substep_query(select, origrqlst):
- """Finalize substep select query that should be executed to get proper
- selection of stuff to insert/update.
-
- Return None when no query actually needed, else the given select node that
- will be used as substep query.
- """
- if origrqlst.where is not None and not select.selection:
- # no selection, append one randomly by searching for a relation which is
- # not neged neither a type restriction (is/is_instance_of)
- for rel in origrqlst.where.iget_nodes(Relation):
- if not (rel.neged(traverse_scope=True) or rel.is_types_restriction()):
- select.append_selected(rel.children[0].copy(select))
- break
- else:
- return None
- if select.selection:
- if origrqlst.where is not None:
- select.set_where(origrqlst.where.copy(select))
- if getattr(origrqlst, 'having', None):
- select.set_having([sq.copy(select) for sq in origrqlst.having])
- return select
- return None
-
-class SSPlanner(object):
- """SingleSourcePlanner: build execution plan for rql queries
-
- optimized for single source repositories
- """
-
- def __init__(self, schema, rqlhelper):
- self.schema = schema
- self.rqlhelper = rqlhelper
-
- def build_plan(self, plan):
- """build an execution plan from a RQL query
-
- do nothing here, dispatch according to the statement type
- """
- build_plan = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE)
- for step in build_plan(plan, plan.rqlst):
- plan.add_step(step)
-
- def build_select_plan(self, plan, rqlst):
- """build execution plan for a SELECT RQL query. Suppose only one source
- is available and so avoid work need for query decomposition among sources
-
- the rqlst should not be tagged at this point.
- """
- plan.preprocess(rqlst)
- return (OneFetchStep(plan, rqlst),)
-
- def build_insert_plan(self, plan, rqlst):
- """get an execution plan from an INSERT RQL query"""
- # each variable in main variables is a new entity to insert
- to_build = {}
- cnx = plan.cnx
- etype_class = cnx.vreg['etypes'].etype_class
- for etype, var in rqlst.main_variables:
- # need to do this since entity class is shared w. web client code !
- to_build[var.name] = EditedEntity(etype_class(etype)(cnx))
- plan.add_entity_def(to_build[var.name])
- # add constant values to entity def, mark variables to be selected
- to_select = _extract_const_attributes(plan, rqlst, to_build)
- # add necessary steps to add relations and update attributes
- step = InsertStep(plan) # insert each entity and its relations
- step.children += self._compute_relation_steps(plan, rqlst, to_select)
- return (step,)
-
- def _compute_relation_steps(self, plan, rqlst, to_select):
- """handle the selection of relations for an insert query"""
- eidconsts = _extract_eid_consts(plan, rqlst)
- for edef, rdefs in to_select.items():
- # create a select rql st to fetch needed data
- select = Select()
- eschema = edef.entity.e_schema
- for i, (rtype, term, reverse) in enumerate(rdefs):
- if getattr(term, 'variable', None) in eidconsts:
- value = eidconsts[term.variable]
- else:
- select.append_selected(term.copy(select))
- value = _FROM_SUBSTEP
- if reverse:
- rdefs[i] = (rtype, InsertRelationsStep.REVERSE_RELATION, value)
- else:
- rschema = eschema.subjrels[rtype]
- if rschema.final or rschema.inlined:
- rdefs[i] = (rtype, InsertRelationsStep.FINAL, value)
- else:
- rdefs[i] = (rtype, InsertRelationsStep.RELATION, value)
- step = InsertRelationsStep(plan, edef, rdefs)
- select = _build_substep_query(select, rqlst)
- if select is not None:
- step.children += self._select_plan(plan, select, rqlst.solutions)
- yield step
-
- def build_delete_plan(self, plan, rqlst):
- """get an execution plan from a DELETE RQL query"""
- # build a select query to fetch entities to delete
- steps = []
- for etype, var in rqlst.main_variables:
- step = DeleteEntitiesStep(plan)
- step.children += self._sel_variable_step(plan, rqlst, etype, var)
- steps.append(step)
- for relation in rqlst.main_relations:
- step = DeleteRelationsStep(plan, relation.r_type)
- step.children += self._sel_relation_steps(plan, rqlst, relation)
- steps.append(step)
- return steps
-
- def _sel_variable_step(self, plan, rqlst, etype, varref):
- """handle the selection of variables for a delete query"""
- select = Select()
- varref = varref.copy(select)
- select.defined_vars = {varref.name: varref.variable}
- select.append_selected(varref)
- if rqlst.where is not None:
- select.set_where(rqlst.where.copy(select))
- if getattr(rqlst, 'having', None):
- select.set_having([x.copy(select) for x in rqlst.having])
- if etype != 'Any':
- select.add_type_restriction(varref.variable, etype)
- return self._select_plan(plan, select, rqlst.solutions)
-
- def _sel_relation_steps(self, plan, rqlst, relation):
- """handle the selection of relations for a delete query"""
- select = Select()
- lhs, rhs = relation.get_variable_parts()
- select.append_selected(lhs.copy(select))
- select.append_selected(rhs.copy(select))
- select.set_where(relation.copy(select))
- if rqlst.where is not None:
- select.add_restriction(rqlst.where.copy(select))
- if getattr(rqlst, 'having', None):
- select.set_having([x.copy(select) for x in rqlst.having])
- return self._select_plan(plan, select, rqlst.solutions)
-
- def build_set_plan(self, plan, rqlst):
- """get an execution plan from an SET RQL query"""
- getrschema = self.schema.rschema
- select = Select() # potential substep query
- selectedidx = {} # local state
- attributes = set() # edited attributes
- updatedefs = [] # definition of update attributes/relations
- selidx = residx = 0 # substep selection / resulting rset indexes
- # search for eid const in the WHERE clause
- eidconsts = _extract_eid_consts(plan, rqlst)
- # build `updatedefs` describing things to update and add necessary
- # variables to the substep selection
- for i, relation in enumerate(rqlst.main_relations):
- if relation.r_type in VIRTUAL_RTYPES:
- raise QueryError('can not assign to %r relation'
- % relation.r_type)
- lhs, rhs = relation.get_variable_parts()
- lhskey = lhs.as_string()
- if not lhskey in selectedidx:
- if lhs.variable in eidconsts:
- eid = eidconsts[lhs.variable]
- lhsinfo = (_CONSTANT, eid, residx)
- else:
- select.append_selected(lhs.copy(select))
- lhsinfo = (_FROM_SUBSTEP, selidx, residx)
- selidx += 1
- residx += 1
- selectedidx[lhskey] = lhsinfo
- else:
- lhsinfo = selectedidx[lhskey][:-1] + (None,)
- rhskey = rhs.as_string()
- if not rhskey in selectedidx:
- if isinstance(rhs, Constant):
- rhsinfo = (_CONSTANT, rhs.eval(plan.args), residx)
- elif getattr(rhs, 'variable', None) in eidconsts:
- eid = eidconsts[rhs.variable]
- rhsinfo = (_CONSTANT, eid, residx)
- else:
- select.append_selected(rhs.copy(select))
- rhsinfo = (_FROM_SUBSTEP, selidx, residx)
- selidx += 1
- residx += 1
- selectedidx[rhskey] = rhsinfo
- else:
- rhsinfo = selectedidx[rhskey][:-1] + (None,)
- rschema = getrschema(relation.r_type)
- updatedefs.append( (lhsinfo, rhsinfo, rschema) )
- # the update step
- step = UpdateStep(plan, updatedefs)
- # when necessary add substep to fetch yet unknown values
- select = _build_substep_query(select, rqlst)
- if select is not None:
- # set distinct to avoid potential duplicate key error
- select.distinct = True
- step.children += self._select_plan(plan, select, rqlst.solutions)
- return (step,)
-
- # internal methods ########################################################
-
- def _select_plan(self, plan, select, solutions):
- union = Union()
- union.append(select)
- select.clean_solutions(solutions)
- add_types_restriction(self.schema, select)
- self.rqlhelper.annotate(union)
- return self.build_select_plan(plan, union)
-
-
-# execution steps and helper functions ########################################
-
-def varmap_test_repr(varmap, tablesinorder):
- if varmap is None:
- return varmap
- maprepr = {}
- for var, sql in varmap.items():
- table, col = sql.split('.')
- maprepr[var] = '%s.%s' % (tablesinorder[table], col)
- return maprepr
-
-class Step(object):
- """base abstract class for execution step"""
- def __init__(self, plan):
- self.plan = plan
- self.children = []
-
- def execute_child(self):
- assert len(self.children) == 1
- return self.children[0].execute()
-
- def execute_children(self):
- for step in self.children:
- step.execute()
-
- def execute(self):
- """execute this step and store partial (eg this step) results"""
- raise NotImplementedError()
-
- def mytest_repr(self):
- """return a representation of this step suitable for test"""
- return (self.__class__.__name__,)
-
- def test_repr(self):
- """return a representation of this step suitable for test"""
- return self.mytest_repr() + (
- [step.test_repr() for step in self.children],)
-
-
-class OneFetchStep(Step):
- """step consisting in fetching data from sources and directly returning
- results
- """
- def __init__(self, plan, union, inputmap=None):
- Step.__init__(self, plan)
- self.union = union
- self.inputmap = inputmap
-
- def execute(self):
- """call .syntax_tree_search with the given syntax tree on each
- source for each solution
- """
- self.execute_children()
- cnx = self.plan.cnx
- args = self.plan.args
- inputmap = self.inputmap
- union = self.union
- # do we have to use a inputmap from a previous step ? If so disable
- # cachekey
- if inputmap or self.plan.cache_key is None:
- cachekey = None
- # union may have been splited into subqueries, in which case we can't
- # use plan.cache_key, rebuild a cache key
- elif isinstance(self.plan.cache_key, tuple):
- cachekey = list(self.plan.cache_key)
- cachekey[0] = union.as_string()
- cachekey = tuple(cachekey)
- else:
- cachekey = union.as_string()
- # get results for query
- source = cnx.repo.system_source
- result = source.syntax_tree_search(cnx, union, args, cachekey, inputmap)
- #print 'ONEFETCH RESULT %s' % (result)
- return result
-
- def mytest_repr(self):
- """return a representation of this step suitable for test"""
- try:
- inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
- except AttributeError:
- inputmap = self.inputmap
- return (self.__class__.__name__,
- sorted((r.as_string(kwargs=self.plan.args), r.solutions)
- for r in self.union.children),
- inputmap)
-
-
-# UPDATE/INSERT/DELETE steps ##################################################
-
-class InsertRelationsStep(Step):
- """step consisting in adding attributes/relations to entity defs from a
- previous FetchStep
-
- relations values comes from the latest result, with one columns for
- each relation defined in self.rdefs
-
- for one entity definition, we'll construct N entity, where N is the
- number of the latest result
- """
-
- FINAL = 0
- RELATION = 1
- REVERSE_RELATION = 2
-
- def __init__(self, plan, edef, rdefs):
- Step.__init__(self, plan)
- # partial entity definition to expand
- self.edef = edef
- # definition of relations to complete
- self.rdefs = rdefs
-
- def execute(self):
- """execute this step"""
- base_edef = self.edef
- edefs = []
- if self.children:
- result = self.execute_child()
- else:
- result = [[]]
- for row in result:
- # get a new entity definition for this row
- edef = base_edef.clone()
- # complete this entity def using row values
- index = 0
- for rtype, rorder, value in self.rdefs:
- if value is _FROM_SUBSTEP:
- value = row[index]
- index += 1
- if rorder == InsertRelationsStep.FINAL:
- edef.edited_attribute(rtype, value)
- elif rorder == InsertRelationsStep.RELATION:
- self.plan.add_relation_def( (edef, rtype, value) )
- edef.querier_pending_relations[(rtype, 'subject')] = value
- else:
- self.plan.add_relation_def( (value, rtype, edef) )
- edef.querier_pending_relations[(rtype, 'object')] = value
- edefs.append(edef)
- self.plan.substitute_entity_def(base_edef, edefs)
- return result
-
-
-class InsertStep(Step):
- """step consisting in inserting new entities / relations"""
-
- def execute(self):
- """execute this step"""
- for step in self.children:
- assert isinstance(step, InsertRelationsStep)
- step.plan = self.plan
- step.execute()
- # insert entities first
- result = self.plan.insert_entity_defs()
- # then relation
- self.plan.insert_relation_defs()
- # return eids of inserted entities
- return result
-
-
-class DeleteEntitiesStep(Step):
- """step consisting in deleting entities"""
-
- def execute(self):
- """execute this step"""
- results = self.execute_child()
- if results:
- todelete = frozenset(int(eid) for eid, in results)
- cnx = self.plan.cnx
- cnx.repo.glob_delete_entities(cnx, todelete)
- return results
-
-class DeleteRelationsStep(Step):
- """step consisting in deleting relations"""
-
- def __init__(self, plan, rtype):
- Step.__init__(self, plan)
- self.rtype = rtype
-
- def execute(self):
- """execute this step"""
- cnx = self.plan.cnx
- delete = cnx.repo.glob_delete_relation
- for subj, obj in self.execute_child():
- delete(cnx, subj, self.rtype, obj)
-
-
-class UpdateStep(Step):
- """step consisting in updating entities / adding relations from relations
- definitions and from results fetched in previous step
- """
-
- def __init__(self, plan, updatedefs):
- Step.__init__(self, plan)
- self.updatedefs = updatedefs
-
- def execute(self):
- """execute this step"""
- cnx = self.plan.cnx
- repo = cnx.repo
- edefs = {}
- relations = {}
- # insert relations
- if self.children:
- result = self.execute_child()
- else:
- result = [[]]
- for i, row in enumerate(result):
- newrow = []
- for (lhsinfo, rhsinfo, rschema) in self.updatedefs:
- lhsval = _handle_relterm(lhsinfo, row, newrow)
- rhsval = _handle_relterm(rhsinfo, row, newrow)
- if rschema.final or rschema.inlined:
- eid = int(lhsval)
- try:
- edited = edefs[eid]
- except KeyError:
- edef = cnx.entity_from_eid(eid)
- edefs[eid] = edited = EditedEntity(edef)
- edited.edited_attribute(str(rschema), rhsval)
- else:
- str_rschema = str(rschema)
- if str_rschema in relations:
- relations[str_rschema].append((lhsval, rhsval))
- else:
- relations[str_rschema] = [(lhsval, rhsval)]
- result[i] = newrow
- # update entities
- repo.glob_add_relations(cnx, relations)
- for eid, edited in edefs.items():
- repo.glob_update_entity(cnx, edited)
- return result
-
-def _handle_relterm(info, row, newrow):
- if info[0] is _CONSTANT:
- val = info[1]
- else: # _FROM_SUBSTEP
- val = row[info[1]]
- if info[-1] is not None:
- newrow.append(val)
- return val