changeset 11057 0b59724cb3f2
parent 10682 7e111b606005
child 11237 f32134dd0067
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     2 # contact --
     3 #
     4 # This file is part of CubicWeb.
     5 #
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
     7 # terms of the GNU Lesser General Public License as published by the Free
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
     9 # any later version.
    10 #
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
    14 # details.
    15 #
    16 # You should have received a copy of the GNU Lesser General Public License along
    17 # with CubicWeb.  If not, see <>.
    18 """plan execution of rql queries on a single source"""
    20 __docformat__ = "restructuredtext en"
    22 from six import text_type
    24 from rql.stmts import Union, Select
    25 from rql.nodes import Constant, Relation
    27 from cubicweb import QueryError
    28 from cubicweb.schema import VIRTUAL_RTYPES
    29 from cubicweb.rqlrewrite import add_types_restriction
    30 from cubicweb.server.edition import EditedEntity
    32 READ_ONLY_RTYPES = set(('eid', 'has_text', 'is', 'is_instance_of', 'identity'))
    34 _CONSTANT = object()
    35 _FROM_SUBSTEP = object()
    37 def _extract_const_attributes(plan, rqlst, to_build):
    38     """add constant values to entity def, mark variables to be selected
    39     """
    40     to_select = {}
    41     for relation in rqlst.main_relations:
    42         lhs, rhs = relation.get_variable_parts()
    43         rtype = relation.r_type
    44         if rtype in READ_ONLY_RTYPES:
    45             raise QueryError("can't assign to %s" % rtype)
    46         try:
    47             edef = to_build[str(lhs)]
    48         except KeyError:
    49             # lhs var is not to build, should be selected and added as an
    50             # object relation
    51             edef = to_build[str(rhs)]
    52             to_select.setdefault(edef, []).append((rtype, lhs, 1))
    53         else:
    54             if isinstance(rhs, Constant) and not rhs.uid:
    55                 # add constant values to entity def
    56                 value = rhs.eval(plan.args)
    57                 eschema = edef.entity.e_schema
    58                 attrtype = eschema.subjrels[rtype].objects(eschema)[0]
    59                 if attrtype == 'Password' and isinstance(value, text_type):
    60                     value = value.encode('UTF8')
    61                 edef.edited_attribute(rtype, value)
    62             elif str(rhs) in to_build:
    63                 # create a relation between two newly created variables
    64                 plan.add_relation_def((edef, rtype, to_build[]))
    65             else:
    66                 to_select.setdefault(edef, []).append( (rtype, rhs, 0) )
    67     return to_select
    69 def _extract_eid_consts(plan, rqlst):
    70     """return a dict mapping rqlst variable object to their eid if specified in
    71     the syntax tree
    72     """
    73     cnx = plan.cnx
    74     if rqlst.where is None:
    75         return {}
    76     eidconsts = {}
    77     neweids = cnx.transaction_data.get('neweids', ())
    78     checkread = cnx.read_security
    79     eschema = cnx.vreg.schema.eschema
    80     for rel in rqlst.where.get_nodes(Relation):
    81         # only care for 'eid' relations ...
    82         if (rel.r_type == 'eid'
    83             # ... that are not part of a NOT clause ...
    84             and not rel.neged(strict=True)
    85             # ... and where eid is specified by '=' operator.
    86             and rel.children[1].operator == '='):
    87             lhs, rhs = rel.get_variable_parts()
    88             if isinstance(rhs, Constant):
    89                 eid = int(rhs.eval(plan.args))
    90                 # check read permission here since it may not be done by
    91                 # the generated select substep if not emited (eg nothing
    92                 # to be selected)
    93                 if checkread and eid not in neweids:
    94                     with cnx.security_enabled(read=False):
    95                         eschema(cnx.entity_metas(eid)['type']).check_perm(
    96                             cnx, 'read', eid=eid)
    97                 eidconsts[lhs.variable] = eid
    98     return eidconsts
   100 def _build_substep_query(select, origrqlst):
   101     """Finalize substep select query that should be executed to get proper
   102     selection of stuff to insert/update.
   104     Return None when no query actually needed, else the given select node that
   105     will be used as substep query.
   106     """
   107     if origrqlst.where is not None and not select.selection:
   108         # no selection, append one randomly by searching for a relation which is
   109         # not neged neither a type restriction (is/is_instance_of)
   110         for rel in origrqlst.where.iget_nodes(Relation):
   111             if not (rel.neged(traverse_scope=True) or rel.is_types_restriction()):
   112                 select.append_selected(rel.children[0].copy(select))
   113                 break
   114         else:
   115             return None
   116     if select.selection:
   117         if origrqlst.where is not None:
   118             select.set_where(origrqlst.where.copy(select))
   119         if getattr(origrqlst, 'having', None):
   120             select.set_having([sq.copy(select) for sq in origrqlst.having])
   121         return select
   122     return None
   124 class SSPlanner(object):
   125     """SingleSourcePlanner: build execution plan for rql queries
   127     optimized for single source repositories
   128     """
   130     def __init__(self, schema, rqlhelper):
   131         self.schema = schema
   132         self.rqlhelper = rqlhelper
   134     def build_plan(self, plan):
   135         """build an execution plan from a RQL query
   137         do nothing here, dispatch according to the statement type
   138         """
   139         build_plan = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE)
   140         for step in build_plan(plan, plan.rqlst):
   141             plan.add_step(step)
   143     def build_select_plan(self, plan, rqlst):
   144         """build execution plan for a SELECT RQL query. Suppose only one source
   145         is available and so avoid work need for query decomposition among sources
   147         the rqlst should not be tagged at this point.
   148         """
   149         plan.preprocess(rqlst)
   150         return (OneFetchStep(plan, rqlst),)
   152     def build_insert_plan(self, plan, rqlst):
   153         """get an execution plan from an INSERT RQL query"""
   154         # each variable in main variables is a new entity to insert
   155         to_build = {}
   156         cnx = plan.cnx
   157         etype_class = cnx.vreg['etypes'].etype_class
   158         for etype, var in rqlst.main_variables:
   159             # need to do this since entity class is shared w. web client code !
   160             to_build[] = EditedEntity(etype_class(etype)(cnx))
   161             plan.add_entity_def(to_build[])
   162         # add constant values to entity def, mark variables to be selected
   163         to_select = _extract_const_attributes(plan, rqlst, to_build)
   164         # add necessary steps to add relations and update attributes
   165         step = InsertStep(plan) # insert each entity and its relations
   166         step.children += self._compute_relation_steps(plan, rqlst, to_select)
   167         return (step,)
   169     def _compute_relation_steps(self, plan, rqlst, to_select):
   170         """handle the selection of relations for an insert query"""
   171         eidconsts = _extract_eid_consts(plan, rqlst)
   172         for edef, rdefs in to_select.items():
   173             # create a select rql st to fetch needed data
   174             select = Select()
   175             eschema = edef.entity.e_schema
   176             for i, (rtype, term, reverse) in enumerate(rdefs):
   177                 if getattr(term, 'variable', None) in eidconsts:
   178                     value = eidconsts[term.variable]
   179                 else:
   180                     select.append_selected(term.copy(select))
   181                     value = _FROM_SUBSTEP
   182                 if reverse:
   183                     rdefs[i] = (rtype, InsertRelationsStep.REVERSE_RELATION, value)
   184                 else:
   185                     rschema = eschema.subjrels[rtype]
   186                     if or rschema.inlined:
   187                         rdefs[i] = (rtype, InsertRelationsStep.FINAL, value)
   188                     else:
   189                         rdefs[i] = (rtype, InsertRelationsStep.RELATION, value)
   190             step = InsertRelationsStep(plan, edef, rdefs)
   191             select = _build_substep_query(select, rqlst)
   192             if select is not None:
   193                 step.children += self._select_plan(plan, select,
   194             yield step
   196     def build_delete_plan(self, plan, rqlst):
   197         """get an execution plan from a DELETE RQL query"""
   198         # build a select query to fetch entities to delete
   199         steps = []
   200         for etype, var in rqlst.main_variables:
   201             step = DeleteEntitiesStep(plan)
   202             step.children += self._sel_variable_step(plan, rqlst, etype, var)
   203             steps.append(step)
   204         for relation in rqlst.main_relations:
   205             step = DeleteRelationsStep(plan, relation.r_type)
   206             step.children += self._sel_relation_steps(plan, rqlst, relation)
   207             steps.append(step)
   208         return steps
   210     def _sel_variable_step(self, plan, rqlst, etype, varref):
   211         """handle the selection of variables for a delete query"""
   212         select = Select()
   213         varref = varref.copy(select)
   214         select.defined_vars = { varref.variable}
   215         select.append_selected(varref)
   216         if rqlst.where is not None:
   217             select.set_where(rqlst.where.copy(select))
   218         if getattr(rqlst, 'having', None):
   219             select.set_having([x.copy(select) for x in rqlst.having])
   220         if etype != 'Any':
   221             select.add_type_restriction(varref.variable, etype)
   222         return self._select_plan(plan, select,
   224     def _sel_relation_steps(self, plan, rqlst, relation):
   225         """handle the selection of relations for a delete query"""
   226         select = Select()
   227         lhs, rhs = relation.get_variable_parts()
   228         select.append_selected(lhs.copy(select))
   229         select.append_selected(rhs.copy(select))
   230         select.set_where(relation.copy(select))
   231         if rqlst.where is not None:
   232             select.add_restriction(rqlst.where.copy(select))
   233         if getattr(rqlst, 'having', None):
   234             select.set_having([x.copy(select) for x in rqlst.having])
   235         return self._select_plan(plan, select,
   237     def build_set_plan(self, plan, rqlst):
   238         """get an execution plan from an SET RQL query"""
   239         getrschema = self.schema.rschema
   240         select = Select()   # potential substep query
   241         selectedidx = {}    # local state
   242         attributes = set()  # edited attributes
   243         updatedefs = []     # definition of update attributes/relations
   244         selidx = residx = 0 # substep selection / resulting rset indexes
   245         # search for eid const in the WHERE clause
   246         eidconsts = _extract_eid_consts(plan, rqlst)
   247         # build `updatedefs` describing things to update and add necessary
   248         # variables to the substep selection
   249         for i, relation in enumerate(rqlst.main_relations):
   250             if relation.r_type in VIRTUAL_RTYPES:
   251                 raise QueryError('can not assign to %r relation'
   252                                  % relation.r_type)
   253             lhs, rhs = relation.get_variable_parts()
   254             lhskey = lhs.as_string()
   255             if not lhskey in selectedidx:
   256                 if lhs.variable in eidconsts:
   257                     eid = eidconsts[lhs.variable]
   258                     lhsinfo = (_CONSTANT, eid, residx)
   259                 else:
   260                     select.append_selected(lhs.copy(select))
   261                     lhsinfo = (_FROM_SUBSTEP, selidx, residx)
   262                     selidx += 1
   263                 residx += 1
   264                 selectedidx[lhskey] = lhsinfo
   265             else:
   266                 lhsinfo = selectedidx[lhskey][:-1] + (None,)
   267             rhskey = rhs.as_string()
   268             if not rhskey in selectedidx:
   269                 if isinstance(rhs, Constant):
   270                     rhsinfo = (_CONSTANT, rhs.eval(plan.args), residx)
   271                 elif getattr(rhs, 'variable', None) in eidconsts:
   272                     eid = eidconsts[rhs.variable]
   273                     rhsinfo = (_CONSTANT, eid, residx)
   274                 else:
   275                     select.append_selected(rhs.copy(select))
   276                     rhsinfo = (_FROM_SUBSTEP, selidx, residx)
   277                     selidx += 1
   278                 residx += 1
   279                 selectedidx[rhskey] = rhsinfo
   280             else:
   281                 rhsinfo = selectedidx[rhskey][:-1] + (None,)
   282             rschema = getrschema(relation.r_type)
   283             updatedefs.append( (lhsinfo, rhsinfo, rschema) )
   284         # the update step
   285         step = UpdateStep(plan, updatedefs)
   286         # when necessary add substep to fetch yet unknown values
   287         select = _build_substep_query(select, rqlst)
   288         if select is not None:
   289             # set distinct to avoid potential duplicate key error
   290             select.distinct = True
   291             step.children += self._select_plan(plan, select,
   292         return (step,)
   294     # internal methods ########################################################
   296     def _select_plan(self, plan, select, solutions):
   297         union = Union()
   298         union.append(select)
   299         select.clean_solutions(solutions)
   300         add_types_restriction(self.schema, select)
   301         self.rqlhelper.annotate(union)
   302         return self.build_select_plan(plan, union)
   305 # execution steps and helper functions ########################################
   307 def varmap_test_repr(varmap, tablesinorder):
   308     if varmap is None:
   309         return varmap
   310     maprepr = {}
   311     for var, sql in varmap.items():
   312         table, col = sql.split('.')
   313         maprepr[var] = '%s.%s' % (tablesinorder[table], col)
   314     return maprepr
   316 class Step(object):
   317     """base abstract class for execution step"""
   318     def __init__(self, plan):
   319         self.plan = plan
   320         self.children = []
   322     def execute_child(self):
   323         assert len(self.children) == 1
   324         return self.children[0].execute()
   326     def execute_children(self):
   327         for step in self.children:
   328             step.execute()
   330     def execute(self):
   331         """execute this step and store partial (eg this step) results"""
   332         raise NotImplementedError()
   334     def mytest_repr(self):
   335         """return a representation of this step suitable for test"""
   336         return (self.__class__.__name__,)
   338     def test_repr(self):
   339         """return a representation of this step suitable for test"""
   340         return self.mytest_repr() + (
   341             [step.test_repr() for step in self.children],)
   344 class OneFetchStep(Step):
   345     """step consisting in fetching data from sources and directly returning
   346     results
   347     """
   348     def __init__(self, plan, union, inputmap=None):
   349         Step.__init__(self, plan)
   350         self.union = union
   351         self.inputmap = inputmap
   353     def execute(self):
   354         """call .syntax_tree_search with the given syntax tree on each
   355         source for each solution
   356         """
   357         self.execute_children()
   358         cnx = self.plan.cnx
   359         args = self.plan.args
   360         inputmap = self.inputmap
   361         union = self.union
   362         # do we have to use a inputmap from a previous step ? If so disable
   363         # cachekey
   364         if inputmap or self.plan.cache_key is None:
   365             cachekey = None
   366         # union may have been splited into subqueries, in which case we can't
   367         # use plan.cache_key, rebuild a cache key
   368         elif isinstance(self.plan.cache_key, tuple):
   369             cachekey = list(self.plan.cache_key)
   370             cachekey[0] = union.as_string()
   371             cachekey = tuple(cachekey)
   372         else:
   373             cachekey = union.as_string()
   374         # get results for query
   375         source = cnx.repo.system_source
   376         result = source.syntax_tree_search(cnx, union, args, cachekey, inputmap)
   377         #print 'ONEFETCH RESULT %s' % (result)
   378         return result
   380     def mytest_repr(self):
   381         """return a representation of this step suitable for test"""
   382         try:
   383             inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
   384         except AttributeError:
   385             inputmap = self.inputmap
   386         return (self.__class__.__name__,
   387                 sorted((r.as_string(kwargs=self.plan.args),
   388                        for r in self.union.children),
   389                 inputmap)
   392 # UPDATE/INSERT/DELETE steps ##################################################
   394 class InsertRelationsStep(Step):
   395     """step consisting in adding attributes/relations to entity defs from a
   396     previous FetchStep
   398     relations values comes from the latest result, with one columns for
   399     each relation defined in self.rdefs
   401     for one entity definition, we'll construct N entity, where N is the
   402     number of the latest result
   403     """
   405     FINAL = 0
   406     RELATION = 1
   407     REVERSE_RELATION = 2
   409     def __init__(self, plan, edef, rdefs):
   410         Step.__init__(self, plan)
   411         # partial entity definition to expand
   412         self.edef = edef
   413         # definition of relations to complete
   414         self.rdefs = rdefs
   416     def execute(self):
   417         """execute this step"""
   418         base_edef = self.edef
   419         edefs = []
   420         if self.children:
   421             result = self.execute_child()
   422         else:
   423             result = [[]]
   424         for row in result:
   425             # get a new entity definition for this row
   426             edef = base_edef.clone()
   427             # complete this entity def using row values
   428             index = 0
   429             for rtype, rorder, value in self.rdefs:
   430                 if value is _FROM_SUBSTEP:
   431                     value = row[index]
   432                     index += 1
   433                 if rorder == InsertRelationsStep.FINAL:
   434                     edef.edited_attribute(rtype, value)
   435                 elif rorder == InsertRelationsStep.RELATION:
   436                     self.plan.add_relation_def( (edef, rtype, value) )
   437                     edef.querier_pending_relations[(rtype, 'subject')] = value
   438                 else:
   439                     self.plan.add_relation_def( (value, rtype, edef) )
   440                     edef.querier_pending_relations[(rtype, 'object')] = value
   441             edefs.append(edef)
   442         self.plan.substitute_entity_def(base_edef, edefs)
   443         return result
   446 class InsertStep(Step):
   447     """step consisting in inserting new entities / relations"""
   449     def execute(self):
   450         """execute this step"""
   451         for step in self.children:
   452             assert isinstance(step, InsertRelationsStep)
   453             step.plan = self.plan
   454             step.execute()
   455         # insert entities first
   456         result = self.plan.insert_entity_defs()
   457         # then relation
   458         self.plan.insert_relation_defs()
   459         # return eids of inserted entities
   460         return result
   463 class DeleteEntitiesStep(Step):
   464     """step consisting in deleting entities"""
   466     def execute(self):
   467         """execute this step"""
   468         results = self.execute_child()
   469         if results:
   470             todelete = frozenset(int(eid) for eid, in results)
   471             cnx = self.plan.cnx
   472             cnx.repo.glob_delete_entities(cnx, todelete)
   473         return results
   475 class DeleteRelationsStep(Step):
   476     """step consisting in deleting relations"""
   478     def __init__(self, plan, rtype):
   479         Step.__init__(self, plan)
   480         self.rtype = rtype
   482     def execute(self):
   483         """execute this step"""
   484         cnx = self.plan.cnx
   485         delete = cnx.repo.glob_delete_relation
   486         for subj, obj in self.execute_child():
   487             delete(cnx, subj, self.rtype, obj)
   490 class UpdateStep(Step):
   491     """step consisting in updating entities / adding relations from relations
   492     definitions and from results fetched in previous step
   493     """
   495     def __init__(self, plan, updatedefs):
   496         Step.__init__(self, plan)
   497         self.updatedefs = updatedefs
   499     def execute(self):
   500         """execute this step"""
   501         cnx = self.plan.cnx
   502         repo = cnx.repo
   503         edefs = {}
   504         relations = {}
   505         # insert relations
   506         if self.children:
   507             result = self.execute_child()
   508         else:
   509             result = [[]]
   510         for i, row in enumerate(result):
   511             newrow = []
   512             for (lhsinfo, rhsinfo, rschema) in self.updatedefs:
   513                 lhsval = _handle_relterm(lhsinfo, row, newrow)
   514                 rhsval = _handle_relterm(rhsinfo, row, newrow)
   515                 if or rschema.inlined:
   516                     eid = int(lhsval)
   517                     try:
   518                         edited = edefs[eid]
   519                     except KeyError:
   520                         edef = cnx.entity_from_eid(eid)
   521                         edefs[eid] = edited = EditedEntity(edef)
   522                     edited.edited_attribute(str(rschema), rhsval)
   523                 else:
   524                     str_rschema = str(rschema)
   525                     if str_rschema in relations:
   526                         relations[str_rschema].append((lhsval, rhsval))
   527                     else:
   528                         relations[str_rschema] = [(lhsval, rhsval)]
   529             result[i] = newrow
   530         # update entities
   531         repo.glob_add_relations(cnx, relations)
   532         for eid, edited in edefs.items():
   533             repo.glob_update_entity(cnx, edited)
   534         return result
   536 def _handle_relterm(info, row, newrow):
   537     if info[0] is _CONSTANT:
   538         val = info[1]
   539     else: # _FROM_SUBSTEP
   540         val = row[info[1]]
   541     if info[-1] is not None:
   542         newrow.append(val)
   543     return val