cubicweb/server/ssplanner.py
changeset 11057 0b59724cb3f2
parent 10682 7e111b606005
child 11237 f32134dd0067
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
       
     1 # copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """plan execution of rql queries on a single source"""
       
    19 
       
    20 __docformat__ = "restructuredtext en"
       
    21 
       
    22 from six import text_type
       
    23 
       
    24 from rql.stmts import Union, Select
       
    25 from rql.nodes import Constant, Relation
       
    26 
       
    27 from cubicweb import QueryError
       
    28 from cubicweb.schema import VIRTUAL_RTYPES
       
    29 from cubicweb.rqlrewrite import add_types_restriction
       
    30 from cubicweb.server.edition import EditedEntity
       
    31 
       
    32 READ_ONLY_RTYPES = set(('eid', 'has_text', 'is', 'is_instance_of', 'identity'))
       
    33 
       
    34 _CONSTANT = object()
       
    35 _FROM_SUBSTEP = object()
       
    36 
       
    37 def _extract_const_attributes(plan, rqlst, to_build):
       
    38     """add constant values to entity def, mark variables to be selected
       
    39     """
       
    40     to_select = {}
       
    41     for relation in rqlst.main_relations:
       
    42         lhs, rhs = relation.get_variable_parts()
       
    43         rtype = relation.r_type
       
    44         if rtype in READ_ONLY_RTYPES:
       
    45             raise QueryError("can't assign to %s" % rtype)
       
    46         try:
       
    47             edef = to_build[str(lhs)]
       
    48         except KeyError:
       
    49             # lhs var is not to build, should be selected and added as an
       
    50             # object relation
       
    51             edef = to_build[str(rhs)]
       
    52             to_select.setdefault(edef, []).append((rtype, lhs, 1))
       
    53         else:
       
    54             if isinstance(rhs, Constant) and not rhs.uid:
       
    55                 # add constant values to entity def
       
    56                 value = rhs.eval(plan.args)
       
    57                 eschema = edef.entity.e_schema
       
    58                 attrtype = eschema.subjrels[rtype].objects(eschema)[0]
       
    59                 if attrtype == 'Password' and isinstance(value, text_type):
       
    60                     value = value.encode('UTF8')
       
    61                 edef.edited_attribute(rtype, value)
       
    62             elif str(rhs) in to_build:
       
    63                 # create a relation between two newly created variables
       
    64                 plan.add_relation_def((edef, rtype, to_build[rhs.name]))
       
    65             else:
       
    66                 to_select.setdefault(edef, []).append( (rtype, rhs, 0) )
       
    67     return to_select
       
    68 
       
    69 def _extract_eid_consts(plan, rqlst):
       
    70     """return a dict mapping rqlst variable object to their eid if specified in
       
    71     the syntax tree
       
    72     """
       
    73     cnx = plan.cnx
       
    74     if rqlst.where is None:
       
    75         return {}
       
    76     eidconsts = {}
       
    77     neweids = cnx.transaction_data.get('neweids', ())
       
    78     checkread = cnx.read_security
       
    79     eschema = cnx.vreg.schema.eschema
       
    80     for rel in rqlst.where.get_nodes(Relation):
       
    81         # only care for 'eid' relations ...
       
    82         if (rel.r_type == 'eid'
       
    83             # ... that are not part of a NOT clause ...
       
    84             and not rel.neged(strict=True)
       
    85             # ... and where eid is specified by '=' operator.
       
    86             and rel.children[1].operator == '='):
       
    87             lhs, rhs = rel.get_variable_parts()
       
    88             if isinstance(rhs, Constant):
       
    89                 eid = int(rhs.eval(plan.args))
       
    90                 # check read permission here since it may not be done by
       
    91                 # the generated select substep if not emited (eg nothing
       
    92                 # to be selected)
       
    93                 if checkread and eid not in neweids:
       
    94                     with cnx.security_enabled(read=False):
       
    95                         eschema(cnx.entity_metas(eid)['type']).check_perm(
       
    96                             cnx, 'read', eid=eid)
       
    97                 eidconsts[lhs.variable] = eid
       
    98     return eidconsts
       
    99 
       
   100 def _build_substep_query(select, origrqlst):
       
   101     """Finalize substep select query that should be executed to get proper
       
   102     selection of stuff to insert/update.
       
   103 
       
   104     Return None when no query actually needed, else the given select node that
       
   105     will be used as substep query.
       
   106     """
       
   107     if origrqlst.where is not None and not select.selection:
       
   108         # no selection, append one randomly by searching for a relation which is
       
   109         # not neged neither a type restriction (is/is_instance_of)
       
   110         for rel in origrqlst.where.iget_nodes(Relation):
       
   111             if not (rel.neged(traverse_scope=True) or rel.is_types_restriction()):
       
   112                 select.append_selected(rel.children[0].copy(select))
       
   113                 break
       
   114         else:
       
   115             return None
       
   116     if select.selection:
       
   117         if origrqlst.where is not None:
       
   118             select.set_where(origrqlst.where.copy(select))
       
   119         if getattr(origrqlst, 'having', None):
       
   120             select.set_having([sq.copy(select) for sq in origrqlst.having])
       
   121         return select
       
   122     return None
       
   123 
       
   124 class SSPlanner(object):
       
   125     """SingleSourcePlanner: build execution plan for rql queries
       
   126 
       
   127     optimized for single source repositories
       
   128     """
       
   129 
       
   130     def __init__(self, schema, rqlhelper):
       
   131         self.schema = schema
       
   132         self.rqlhelper = rqlhelper
       
   133 
       
   134     def build_plan(self, plan):
       
   135         """build an execution plan from a RQL query
       
   136 
       
   137         do nothing here, dispatch according to the statement type
       
   138         """
       
   139         build_plan = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE)
       
   140         for step in build_plan(plan, plan.rqlst):
       
   141             plan.add_step(step)
       
   142 
       
   143     def build_select_plan(self, plan, rqlst):
       
   144         """build execution plan for a SELECT RQL query. Suppose only one source
       
   145         is available and so avoid work need for query decomposition among sources
       
   146 
       
   147         the rqlst should not be tagged at this point.
       
   148         """
       
   149         plan.preprocess(rqlst)
       
   150         return (OneFetchStep(plan, rqlst),)
       
   151 
       
   152     def build_insert_plan(self, plan, rqlst):
       
   153         """get an execution plan from an INSERT RQL query"""
       
   154         # each variable in main variables is a new entity to insert
       
   155         to_build = {}
       
   156         cnx = plan.cnx
       
   157         etype_class = cnx.vreg['etypes'].etype_class
       
   158         for etype, var in rqlst.main_variables:
       
   159             # need to do this since entity class is shared w. web client code !
       
   160             to_build[var.name] = EditedEntity(etype_class(etype)(cnx))
       
   161             plan.add_entity_def(to_build[var.name])
       
   162         # add constant values to entity def, mark variables to be selected
       
   163         to_select = _extract_const_attributes(plan, rqlst, to_build)
       
   164         # add necessary steps to add relations and update attributes
       
   165         step = InsertStep(plan) # insert each entity and its relations
       
   166         step.children += self._compute_relation_steps(plan, rqlst, to_select)
       
   167         return (step,)
       
   168 
       
   169     def _compute_relation_steps(self, plan, rqlst, to_select):
       
   170         """handle the selection of relations for an insert query"""
       
   171         eidconsts = _extract_eid_consts(plan, rqlst)
       
   172         for edef, rdefs in to_select.items():
       
   173             # create a select rql st to fetch needed data
       
   174             select = Select()
       
   175             eschema = edef.entity.e_schema
       
   176             for i, (rtype, term, reverse) in enumerate(rdefs):
       
   177                 if getattr(term, 'variable', None) in eidconsts:
       
   178                     value = eidconsts[term.variable]
       
   179                 else:
       
   180                     select.append_selected(term.copy(select))
       
   181                     value = _FROM_SUBSTEP
       
   182                 if reverse:
       
   183                     rdefs[i] = (rtype, InsertRelationsStep.REVERSE_RELATION, value)
       
   184                 else:
       
   185                     rschema = eschema.subjrels[rtype]
       
   186                     if rschema.final or rschema.inlined:
       
   187                         rdefs[i] = (rtype, InsertRelationsStep.FINAL, value)
       
   188                     else:
       
   189                         rdefs[i] = (rtype, InsertRelationsStep.RELATION, value)
       
   190             step = InsertRelationsStep(plan, edef, rdefs)
       
   191             select = _build_substep_query(select, rqlst)
       
   192             if select is not None:
       
   193                 step.children += self._select_plan(plan, select, rqlst.solutions)
       
   194             yield step
       
   195 
       
   196     def build_delete_plan(self, plan, rqlst):
       
   197         """get an execution plan from a DELETE RQL query"""
       
   198         # build a select query to fetch entities to delete
       
   199         steps = []
       
   200         for etype, var in rqlst.main_variables:
       
   201             step = DeleteEntitiesStep(plan)
       
   202             step.children += self._sel_variable_step(plan, rqlst, etype, var)
       
   203             steps.append(step)
       
   204         for relation in rqlst.main_relations:
       
   205             step = DeleteRelationsStep(plan, relation.r_type)
       
   206             step.children += self._sel_relation_steps(plan, rqlst, relation)
       
   207             steps.append(step)
       
   208         return steps
       
   209 
       
   210     def _sel_variable_step(self, plan, rqlst, etype, varref):
       
   211         """handle the selection of variables for a delete query"""
       
   212         select = Select()
       
   213         varref = varref.copy(select)
       
   214         select.defined_vars = {varref.name: varref.variable}
       
   215         select.append_selected(varref)
       
   216         if rqlst.where is not None:
       
   217             select.set_where(rqlst.where.copy(select))
       
   218         if getattr(rqlst, 'having', None):
       
   219             select.set_having([x.copy(select) for x in rqlst.having])
       
   220         if etype != 'Any':
       
   221             select.add_type_restriction(varref.variable, etype)
       
   222         return self._select_plan(plan, select, rqlst.solutions)
       
   223 
       
   224     def _sel_relation_steps(self, plan, rqlst, relation):
       
   225         """handle the selection of relations for a delete query"""
       
   226         select = Select()
       
   227         lhs, rhs = relation.get_variable_parts()
       
   228         select.append_selected(lhs.copy(select))
       
   229         select.append_selected(rhs.copy(select))
       
   230         select.set_where(relation.copy(select))
       
   231         if rqlst.where is not None:
       
   232             select.add_restriction(rqlst.where.copy(select))
       
   233         if getattr(rqlst, 'having', None):
       
   234             select.set_having([x.copy(select) for x in rqlst.having])
       
   235         return self._select_plan(plan, select, rqlst.solutions)
       
   236 
       
   237     def build_set_plan(self, plan, rqlst):
       
   238         """get an execution plan from an SET RQL query"""
       
   239         getrschema = self.schema.rschema
       
   240         select = Select()   # potential substep query
       
   241         selectedidx = {}    # local state
       
   242         attributes = set()  # edited attributes
       
   243         updatedefs = []     # definition of update attributes/relations
       
   244         selidx = residx = 0 # substep selection / resulting rset indexes
       
   245         # search for eid const in the WHERE clause
       
   246         eidconsts = _extract_eid_consts(plan, rqlst)
       
   247         # build `updatedefs` describing things to update and add necessary
       
   248         # variables to the substep selection
       
   249         for i, relation in enumerate(rqlst.main_relations):
       
   250             if relation.r_type in VIRTUAL_RTYPES:
       
   251                 raise QueryError('can not assign to %r relation'
       
   252                                  % relation.r_type)
       
   253             lhs, rhs = relation.get_variable_parts()
       
   254             lhskey = lhs.as_string()
       
   255             if not lhskey in selectedidx:
       
   256                 if lhs.variable in eidconsts:
       
   257                     eid = eidconsts[lhs.variable]
       
   258                     lhsinfo = (_CONSTANT, eid, residx)
       
   259                 else:
       
   260                     select.append_selected(lhs.copy(select))
       
   261                     lhsinfo = (_FROM_SUBSTEP, selidx, residx)
       
   262                     selidx += 1
       
   263                 residx += 1
       
   264                 selectedidx[lhskey] = lhsinfo
       
   265             else:
       
   266                 lhsinfo = selectedidx[lhskey][:-1] + (None,)
       
   267             rhskey = rhs.as_string()
       
   268             if not rhskey in selectedidx:
       
   269                 if isinstance(rhs, Constant):
       
   270                     rhsinfo = (_CONSTANT, rhs.eval(plan.args), residx)
       
   271                 elif getattr(rhs, 'variable', None) in eidconsts:
       
   272                     eid = eidconsts[rhs.variable]
       
   273                     rhsinfo = (_CONSTANT, eid, residx)
       
   274                 else:
       
   275                     select.append_selected(rhs.copy(select))
       
   276                     rhsinfo = (_FROM_SUBSTEP, selidx, residx)
       
   277                     selidx += 1
       
   278                 residx += 1
       
   279                 selectedidx[rhskey] = rhsinfo
       
   280             else:
       
   281                 rhsinfo = selectedidx[rhskey][:-1] + (None,)
       
   282             rschema = getrschema(relation.r_type)
       
   283             updatedefs.append( (lhsinfo, rhsinfo, rschema) )
       
   284         # the update step
       
   285         step = UpdateStep(plan, updatedefs)
       
   286         # when necessary add substep to fetch yet unknown values
       
   287         select = _build_substep_query(select, rqlst)
       
   288         if select is not None:
       
   289             # set distinct to avoid potential duplicate key error
       
   290             select.distinct = True
       
   291             step.children += self._select_plan(plan, select, rqlst.solutions)
       
   292         return (step,)
       
   293 
       
   294     # internal methods ########################################################
       
   295 
       
   296     def _select_plan(self, plan, select, solutions):
       
   297         union = Union()
       
   298         union.append(select)
       
   299         select.clean_solutions(solutions)
       
   300         add_types_restriction(self.schema, select)
       
   301         self.rqlhelper.annotate(union)
       
   302         return self.build_select_plan(plan, union)
       
   303 
       
   304 
       
   305 # execution steps and helper functions ########################################
       
   306 
       
   307 def varmap_test_repr(varmap, tablesinorder):
       
   308     if varmap is None:
       
   309         return varmap
       
   310     maprepr = {}
       
   311     for var, sql in varmap.items():
       
   312         table, col = sql.split('.')
       
   313         maprepr[var] = '%s.%s' % (tablesinorder[table], col)
       
   314     return maprepr
       
   315 
       
   316 class Step(object):
       
   317     """base abstract class for execution step"""
       
   318     def __init__(self, plan):
       
   319         self.plan = plan
       
   320         self.children = []
       
   321 
       
   322     def execute_child(self):
       
   323         assert len(self.children) == 1
       
   324         return self.children[0].execute()
       
   325 
       
   326     def execute_children(self):
       
   327         for step in self.children:
       
   328             step.execute()
       
   329 
       
   330     def execute(self):
       
   331         """execute this step and store partial (eg this step) results"""
       
   332         raise NotImplementedError()
       
   333 
       
   334     def mytest_repr(self):
       
   335         """return a representation of this step suitable for test"""
       
   336         return (self.__class__.__name__,)
       
   337 
       
   338     def test_repr(self):
       
   339         """return a representation of this step suitable for test"""
       
   340         return self.mytest_repr() + (
       
   341             [step.test_repr() for step in self.children],)
       
   342 
       
   343 
       
   344 class OneFetchStep(Step):
       
   345     """step consisting in fetching data from sources and directly returning
       
   346     results
       
   347     """
       
   348     def __init__(self, plan, union, inputmap=None):
       
   349         Step.__init__(self, plan)
       
   350         self.union = union
       
   351         self.inputmap = inputmap
       
   352 
       
   353     def execute(self):
       
   354         """call .syntax_tree_search with the given syntax tree on each
       
   355         source for each solution
       
   356         """
       
   357         self.execute_children()
       
   358         cnx = self.plan.cnx
       
   359         args = self.plan.args
       
   360         inputmap = self.inputmap
       
   361         union = self.union
       
   362         # do we have to use a inputmap from a previous step ? If so disable
       
   363         # cachekey
       
   364         if inputmap or self.plan.cache_key is None:
       
   365             cachekey = None
       
   366         # union may have been splited into subqueries, in which case we can't
       
   367         # use plan.cache_key, rebuild a cache key
       
   368         elif isinstance(self.plan.cache_key, tuple):
       
   369             cachekey = list(self.plan.cache_key)
       
   370             cachekey[0] = union.as_string()
       
   371             cachekey = tuple(cachekey)
       
   372         else:
       
   373             cachekey = union.as_string()
       
   374         # get results for query
       
   375         source = cnx.repo.system_source
       
   376         result = source.syntax_tree_search(cnx, union, args, cachekey, inputmap)
       
   377         #print 'ONEFETCH RESULT %s' % (result)
       
   378         return result
       
   379 
       
   380     def mytest_repr(self):
       
   381         """return a representation of this step suitable for test"""
       
   382         try:
       
   383             inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
       
   384         except AttributeError:
       
   385             inputmap = self.inputmap
       
   386         return (self.__class__.__name__,
       
   387                 sorted((r.as_string(kwargs=self.plan.args), r.solutions)
       
   388                        for r in self.union.children),
       
   389                 inputmap)
       
   390 
       
   391 
       
   392 # UPDATE/INSERT/DELETE steps ##################################################
       
   393 
       
   394 class InsertRelationsStep(Step):
       
   395     """step consisting in adding attributes/relations to entity defs from a
       
   396     previous FetchStep
       
   397 
       
   398     relations values comes from the latest result, with one columns for
       
   399     each relation defined in self.rdefs
       
   400 
       
   401     for one entity definition, we'll construct N entity, where N is the
       
   402     number of the latest result
       
   403     """
       
   404 
       
   405     FINAL = 0
       
   406     RELATION = 1
       
   407     REVERSE_RELATION = 2
       
   408 
       
   409     def __init__(self, plan, edef, rdefs):
       
   410         Step.__init__(self, plan)
       
   411         # partial entity definition to expand
       
   412         self.edef = edef
       
   413         # definition of relations to complete
       
   414         self.rdefs = rdefs
       
   415 
       
   416     def execute(self):
       
   417         """execute this step"""
       
   418         base_edef = self.edef
       
   419         edefs = []
       
   420         if self.children:
       
   421             result = self.execute_child()
       
   422         else:
       
   423             result = [[]]
       
   424         for row in result:
       
   425             # get a new entity definition for this row
       
   426             edef = base_edef.clone()
       
   427             # complete this entity def using row values
       
   428             index = 0
       
   429             for rtype, rorder, value in self.rdefs:
       
   430                 if value is _FROM_SUBSTEP:
       
   431                     value = row[index]
       
   432                     index += 1
       
   433                 if rorder == InsertRelationsStep.FINAL:
       
   434                     edef.edited_attribute(rtype, value)
       
   435                 elif rorder == InsertRelationsStep.RELATION:
       
   436                     self.plan.add_relation_def( (edef, rtype, value) )
       
   437                     edef.querier_pending_relations[(rtype, 'subject')] = value
       
   438                 else:
       
   439                     self.plan.add_relation_def( (value, rtype, edef) )
       
   440                     edef.querier_pending_relations[(rtype, 'object')] = value
       
   441             edefs.append(edef)
       
   442         self.plan.substitute_entity_def(base_edef, edefs)
       
   443         return result
       
   444 
       
   445 
       
   446 class InsertStep(Step):
       
   447     """step consisting in inserting new entities / relations"""
       
   448 
       
   449     def execute(self):
       
   450         """execute this step"""
       
   451         for step in self.children:
       
   452             assert isinstance(step, InsertRelationsStep)
       
   453             step.plan = self.plan
       
   454             step.execute()
       
   455         # insert entities first
       
   456         result = self.plan.insert_entity_defs()
       
   457         # then relation
       
   458         self.plan.insert_relation_defs()
       
   459         # return eids of inserted entities
       
   460         return result
       
   461 
       
   462 
       
   463 class DeleteEntitiesStep(Step):
       
   464     """step consisting in deleting entities"""
       
   465 
       
   466     def execute(self):
       
   467         """execute this step"""
       
   468         results = self.execute_child()
       
   469         if results:
       
   470             todelete = frozenset(int(eid) for eid, in results)
       
   471             cnx = self.plan.cnx
       
   472             cnx.repo.glob_delete_entities(cnx, todelete)
       
   473         return results
       
   474 
       
   475 class DeleteRelationsStep(Step):
       
   476     """step consisting in deleting relations"""
       
   477 
       
   478     def __init__(self, plan, rtype):
       
   479         Step.__init__(self, plan)
       
   480         self.rtype = rtype
       
   481 
       
   482     def execute(self):
       
   483         """execute this step"""
       
   484         cnx = self.plan.cnx
       
   485         delete = cnx.repo.glob_delete_relation
       
   486         for subj, obj in self.execute_child():
       
   487             delete(cnx, subj, self.rtype, obj)
       
   488 
       
   489 
       
   490 class UpdateStep(Step):
       
   491     """step consisting in updating entities / adding relations from relations
       
   492     definitions and from results fetched in previous step
       
   493     """
       
   494 
       
   495     def __init__(self, plan, updatedefs):
       
   496         Step.__init__(self, plan)
       
   497         self.updatedefs = updatedefs
       
   498 
       
   499     def execute(self):
       
   500         """execute this step"""
       
   501         cnx = self.plan.cnx
       
   502         repo = cnx.repo
       
   503         edefs = {}
       
   504         relations = {}
       
   505         # insert relations
       
   506         if self.children:
       
   507             result = self.execute_child()
       
   508         else:
       
   509             result = [[]]
       
   510         for i, row in enumerate(result):
       
   511             newrow = []
       
   512             for (lhsinfo, rhsinfo, rschema) in self.updatedefs:
       
   513                 lhsval = _handle_relterm(lhsinfo, row, newrow)
       
   514                 rhsval = _handle_relterm(rhsinfo, row, newrow)
       
   515                 if rschema.final or rschema.inlined:
       
   516                     eid = int(lhsval)
       
   517                     try:
       
   518                         edited = edefs[eid]
       
   519                     except KeyError:
       
   520                         edef = cnx.entity_from_eid(eid)
       
   521                         edefs[eid] = edited = EditedEntity(edef)
       
   522                     edited.edited_attribute(str(rschema), rhsval)
       
   523                 else:
       
   524                     str_rschema = str(rschema)
       
   525                     if str_rschema in relations:
       
   526                         relations[str_rschema].append((lhsval, rhsval))
       
   527                     else:
       
   528                         relations[str_rschema] = [(lhsval, rhsval)]
       
   529             result[i] = newrow
       
   530         # update entities
       
   531         repo.glob_add_relations(cnx, relations)
       
   532         for eid, edited in edefs.items():
       
   533             repo.glob_update_entity(cnx, edited)
       
   534         return result
       
   535 
       
   536 def _handle_relterm(info, row, newrow):
       
   537     if info[0] is _CONSTANT:
       
   538         val = info[1]
       
   539     else: # _FROM_SUBSTEP
       
   540         val = row[info[1]]
       
   541     if info[-1] is not None:
       
   542         newrow.append(val)
       
   543     return val