changeset 0 b97547f5f1fa
child 1016 26387b836099
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
     1 """plan execution of rql queries on a single source
     3 :organization: Logilab
     4 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     5 :contact: --
     6 """
     7 __docformat__ = "restructuredtext en"
     9 from copy import copy
    11 from rql.stmts import Union, Select
    12 from rql.nodes import Constant
    14 from cubicweb import QueryError, typed_eid
    16 def add_types_restriction(schema, rqlst, newroot=None, solutions=None):
    17     if newroot is None:
    18         assert solutions is None
    19         if hasattr(rqlst, '_types_restr_added'):
    20             return
    21         solutions =
    22         newroot = rqlst
    23         rqlst._types_restr_added = True
    24     else:
    25         assert solutions is not None
    26         rqlst = rqlst.stmt
    27     eschema = schema.eschema
    28     allpossibletypes = {}
    29     for solution in solutions:
    30         for varname, etype in solution.iteritems():
    31             if not varname in newroot.defined_vars or eschema(etype).is_final():
    32                 continue
    33             allpossibletypes.setdefault(varname, set()).add(etype)
    34     for varname in sorted(allpossibletypes):
    35         try:
    36             var = newroot.defined_vars[varname]
    37         except KeyError:
    38             continue
    39         stinfo = var.stinfo
    40         if stinfo.get('uidrels'):
    41             continue # eid specified, no need for additional type specification
    42         try:
    43             typerels = rqlst.defined_vars[varname].stinfo.get('typerels')
    44         except KeyError:
    45             assert varname in rqlst.aliases
    46             continue
    47         if newroot is rqlst and typerels:
    48             mytyperel = iter(typerels).next()
    49         else:
    50             for vref in newroot.defined_vars[varname].references():
    51                 rel = vref.relation()
    52                 if rel and rel.is_types_restriction():
    53                     mytyperel = rel
    54                     break
    55             else:
    56                 mytyperel = None
    57         possibletypes = allpossibletypes[varname]
    58         if mytyperel is not None:
    59             # variable as already some types restriction. new possible types
    60             # can only be a subset of existing ones, so only remove no more
    61             # possible types
    62             for cst in mytyperel.get_nodes(Constant):
    63                 if not cst.value in possibletypes:
    64                     cst.parent.remove(cst)
    65                     try:
    66                         stinfo['possibletypes'].remove(cst.value)
    67                     except KeyError:
    68                         # restriction on a type not used by this query, may
    69                         # occurs with X is IN(...)
    70                         pass
    71         else:
    72             # we have to add types restriction
    73             if stinfo.get('scope') is not None:
    74                 rel = var.scope.add_type_restriction(var, possibletypes)
    75             else:
    76                 # tree is not annotated yet, no scope set so add the restriction
    77                 # to the root
    78                 rel = newroot.add_type_restriction(var, possibletypes)
    79             stinfo['typerels'] = frozenset((rel,))
    80             stinfo['possibletypes'] = possibletypes
    82 class SSPlanner(object):
    83     """SingleSourcePlanner: build execution plan for rql queries
    85     optimized for single source repositories
    86     """
    88     def __init__(self, schema, rqlhelper):
    89         self.schema = schema
    90         self.rqlhelper = rqlhelper
    92     def build_plan(self, plan):
    93         """build an execution plan from a RQL query
    95         do nothing here, dispatch according to the statement type
    96         """
    97         build_plan = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE)
    98         for step in build_plan(plan, plan.rqlst):
    99             plan.add_step(step)
   101     def build_select_plan(self, plan, rqlst):
   102         """build execution plan for a SELECT RQL query. Suppose only one source
   103         is available and so avoid work need for query decomposition among sources
   105         the rqlst should not be tagged at this point.
   106         """
   107         plan.preprocess(rqlst)
   108         return (OneFetchStep(plan, rqlst, plan.session.repo.sources),)
   110     def build_insert_plan(self, plan, rqlst):
   111         """get an execution plan from an INSERT RQL query"""
   112         # each variable in main variables is a new entity to insert
   113         to_build = {}
   114         session = plan.session
   115         for etype, var in rqlst.main_variables:
   116             # need to do this since entity class is shared w. web client code !
   117             to_build[] = session.etype_class(etype)(session, None, None)
   118             plan.add_entity_def(to_build[])
   119         # add constant values to entity def, mark variables to be selected
   120         to_select = plan.relation_definitions(rqlst, to_build)
   121         # add necessary steps to add relations and update attributes
   122         step = InsertStep(plan) # insert each entity and its relations
   123         step.children += self._compute_relation_steps(plan,,
   124                                                       rqlst.where, to_select)
   125         return (step,)
   127     def _compute_relation_steps(self, plan, solutions, restriction, to_select):
   128         """handle the selection of relations for an insert query"""
   129         for edef, rdefs in to_select.items():
   130             # create a select rql st to fetch needed data
   131             select = Select()
   132             eschema = edef.e_schema
   133             for i in range(len(rdefs)):
   134                 rtype, term, reverse = rdefs[i]
   135                 select.append_selected(term.copy(select))
   136                 if reverse:
   137                     rdefs[i] = rtype, RelationsStep.REVERSE_RELATION
   138                 else:
   139                     rschema = eschema.subject_relation(rtype)
   140                     if rschema.is_final() or rschema.inlined:
   141                         rdefs[i] = rtype, RelationsStep.FINAL
   142                     else:
   143                         rdefs[i] = rtype, RelationsStep.RELATION
   144             if restriction is not None:
   145                 select.set_where(restriction.copy(select))
   146             step = RelationsStep(plan, edef, rdefs)
   147             step.children += self._select_plan(plan, select, solutions)
   148             yield step
   150     def build_delete_plan(self, plan, rqlst):
   151         """get an execution plan from a DELETE RQL query"""
   152         # build a select query to fetch entities to delete
   153         steps = []
   154         for etype, var in rqlst.main_variables:
   155             step = DeleteEntitiesStep(plan)
   156             step.children += self._sel_variable_step(plan,,
   157                                                      rqlst.where, etype, var)
   158             steps.append(step)
   159         for relation in rqlst.main_relations:
   160             step = DeleteRelationsStep(plan, relation.r_type)
   161             step.children += self._sel_relation_steps(plan,,
   162                                                       rqlst.where, relation)
   163             steps.append(step)
   164         return steps
   166     def _sel_variable_step(self, plan, solutions, restriction, etype, varref):
   167         """handle the selection of variables for a delete query"""
   168         select = Select()
   169         varref = varref.copy(select)
   170         select.defined_vars = { varref.variable}
   171         select.append_selected(varref)
   172         if restriction is not None:
   173             select.set_where(restriction.copy(select))
   174         if etype != 'Any':
   175             select.add_type_restriction(varref.variable, etype)
   176         return self._select_plan(plan, select, solutions)
   178     def _sel_relation_steps(self, plan, solutions, restriction, relation):
   179         """handle the selection of relations for a delete query"""
   180         select = Select()
   181         lhs, rhs = relation.get_variable_parts()
   182         select.append_selected(lhs.copy(select))
   183         select.append_selected(rhs.copy(select))
   184         select.set_where(relation.copy(select))
   185         if restriction is not None:
   186             select.add_restriction(restriction.copy(select))
   187         return self._select_plan(plan, select, solutions)
   189     def build_set_plan(self, plan, rqlst):
   190         """get an execution plan from an SET RQL query"""
   191         select = Select()
   192         # extract variables to add to the selection
   193         selected_index = {}
   194         index = 0
   195         relations, attrrelations = [], []
   196         getrschema = self.schema.rschema
   197         for relation in rqlst.main_relations:
   198             if relation.r_type in ('eid', 'has_text', 'identity'):
   199                 raise QueryError('can not assign to %r relation'
   200                                  % relation.r_type)
   201             lhs, rhs = relation.get_variable_parts()
   202             if not lhs.as_string('utf-8') in selected_index:
   203                 select.append_selected(lhs.copy(select))
   204                 selected_index[lhs.as_string('utf-8')] = index
   205                 index += 1
   206             if not rhs.as_string('utf-8') in selected_index:
   207                 select.append_selected(rhs.copy(select))
   208                 selected_index[rhs.as_string('utf-8')] = index
   209                 index += 1
   210             rschema = getrschema(relation.r_type)
   211             if rschema.is_final() or rschema.inlined:
   212                 attrrelations.append(relation)
   213             else:
   214                 relations.append(relation)
   215         # add step necessary to fetch all selected variables values
   216         if rqlst.where is not None:
   217             select.set_where(rqlst.where.copy(select))
   218         # set distinct to avoid potential duplicate key error
   219         select.distinct = True
   220         step = UpdateStep(plan, attrrelations, relations, selected_index)
   221         step.children += self._select_plan(plan, select,
   222         return (step,)
   224     # internal methods ########################################################
   226     def _select_plan(self, plan, select, solutions):
   227         union = Union()
   228         union.append(select)
   229         select.clean_solutions(solutions)
   230         add_types_restriction(self.schema, select)        
   231         self.rqlhelper.annotate(union)
   232         return self.build_select_plan(plan, union)
   235 # execution steps and helper functions ########################################
   237 def varmap_test_repr(varmap, tablesinorder):
   238     if varmap is None:
   239         return varmap
   240     maprepr = {}
   241     for var, sql in varmap.iteritems():
   242         table, col = sql.split('.')
   243         maprepr[var] = '%s.%s' % (tablesinorder[table], col)
   244     return maprepr
   246 def offset_result(offset, result):
   247     offset -= len(result)
   248     if offset < 0:
   249         result = result[offset:]
   250         offset = None
   251     elif offset == 0:
   252         offset = None
   253         result = ()
   254     return offset, result
   257 class LimitOffsetMixIn(object):
   258     limit = offset = None
   259     def set_limit_offset(self, limit, offset):
   260         self.limit = limit
   261         self.offset = offset or None
   264 class Step(object):
   265     """base abstract class for execution step"""
   266     def __init__(self, plan):
   267         self.plan = plan
   268         self.children = []
   270     def execute_child(self):
   271         assert len(self.children) == 1
   272         return self.children[0].execute()
   274     def execute_children(self):
   275         for step in self.children:
   276             step.execute()
   278     def execute(self):
   279         """execute this step and store partial (eg this step) results"""
   280         raise NotImplementedError()
   282     def mytest_repr(self):
   283         """return a representation of this step suitable for test"""
   284         return (self.__class__.__name__,)
   286     def test_repr(self):
   287         """return a representation of this step suitable for test"""
   288         return self.mytest_repr() + (
   289             [step.test_repr() for step in self.children],)
   292 class OneFetchStep(LimitOffsetMixIn, Step):
   293     """step consisting in fetching data from sources and directly returning
   294     results
   295     """
   296     def __init__(self, plan, union, sources, inputmap=None):
   297         Step.__init__(self, plan)
   298         self.union = union
   299         self.sources = sources
   300         self.inputmap = inputmap
   301         self.set_limit_offset(union.children[-1].limit, union.children[-1].offset)
   303     def set_limit_offset(self, limit, offset):
   304         LimitOffsetMixIn.set_limit_offset(self, limit, offset)
   305         for select in self.union.children:
   306             select.limit = limit
   307             select.offset = offset
   309     def execute(self):
   310         """call .syntax_tree_search with the given syntax tree on each
   311         source for each solution
   312         """
   313         self.execute_children()
   314         session = self.plan.session
   315         args = self.plan.args
   316         inputmap = self.inputmap
   317         union = self.union
   318         # do we have to use a inputmap from a previous step ? If so disable
   319         # cachekey
   320         if inputmap or self.plan.cache_key is None:
   321             cachekey = None
   322         # union may have been splited into subqueries, rebuild a cache key
   323         elif isinstance(self.plan.cache_key, tuple):
   324             cachekey = list(self.plan.cache_key)
   325             cachekey[0] = union.as_string()
   326             cachekey = tuple(cachekey)
   327         else:
   328             cachekey = union.as_string()
   329         result = []
   330         # limit / offset processing
   331         limit = self.limit
   332         offset = self.offset
   333         if offset is not None:
   334             if len(self.sources) > 1:
   335                 # we'll have to deal with limit/offset by ourself
   336                 if union.children[-1].limit:
   337                     union.children[-1].limit = limit + offset
   338                 union.children[-1].offset = None
   339             else:
   340                 offset, limit = None, None
   341         for source in self.sources:
   342             if offset is None and limit is not None:
   343                 # modifying the sample rqlst is enough since sql generation
   344                 # will pick it here as well
   345                 union.children[-1].limit = limit - len(result)
   346             result_ = source.syntax_tree_search(session, union, args, cachekey,
   347                                                 inputmap)
   348             if offset is not None:
   349                 offset, result_ = offset_result(offset, result_)
   350             result += result_
   351             if limit is not None:
   352                 if len(result) >= limit:
   353                     return result[:limit]
   354         #print 'ONEFETCH RESULT %s' % (result)
   355         return result
   357     def mytest_repr(self):
   358         """return a representation of this step suitable for test"""
   359         try:
   360             inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
   361         except AttributeError:
   362             inputmap = self.inputmap
   363         return (self.__class__.__name__,
   364                 sorted((r.as_string(kwargs=self.plan.args),
   365                        for r in self.union.children),
   366                 self.limit, self.offset,
   367                 sorted(self.sources), inputmap)
   370 # UPDATE/INSERT/DELETE steps ##################################################
   372 class RelationsStep(Step):
   373     """step consisting in adding attributes/relations to entity defs from a
   374     previous FetchStep
   376     relations values comes from the latest result, with one columns for
   377     each relation defined in self.r_defs
   379     for one entity definition, we'll construct N entity, where N is the
   380     number of the latest result
   381     """
   383     FINAL = 0
   384     RELATION = 1
   385     REVERSE_RELATION = 2
   387     def __init__(self, plan, e_def, r_defs):
   388         Step.__init__(self, plan)
   389         # partial entity definition to expand
   390         self.e_def = e_def
   391         # definition of relations to complete
   392         self.r_defs = r_defs
   394     def execute(self):
   395         """execute this step"""
   396         base_e_def = self.e_def
   397         result = []
   398         for row in self.execute_child():
   399             # get a new entity definition for this row
   400             e_def = copy(base_e_def)
   401             # complete this entity def using row values
   402             for i in range(len(self.r_defs)):
   403                 rtype, rorder = self.r_defs[i]
   404                 if rorder == RelationsStep.FINAL:
   405                     e_def[rtype] = row[i]
   406                 elif rorder == RelationsStep.RELATION:
   407                     self.plan.add_relation_def( (e_def, rtype, row[i]) )
   408                     e_def.querier_pending_relations[(rtype, 'subject')] = row[i]
   409                 else:
   410                     self.plan.add_relation_def( (row[i], rtype, e_def) )
   411                     e_def.querier_pending_relations[(rtype, 'object')] = row[i]
   412             result.append(e_def)
   413         self.plan.substitute_entity_def(base_e_def, result)
   416 class InsertStep(Step):
   417     """step consisting in inserting new entities / relations"""
   419     def execute(self):
   420         """execute this step"""
   421         for step in self.children:
   422             assert isinstance(step, RelationsStep)
   423             step.plan = self.plan
   424             step.execute()
   425         # insert entities first
   426         result = self.plan.insert_entity_defs()
   427         # then relation
   428         self.plan.insert_relation_defs()
   429         # return eids of inserted entities
   430         return result
   433 class DeleteEntitiesStep(Step):
   434     """step consisting in deleting entities"""
   436     def execute(self):
   437         """execute this step"""
   438         todelete = frozenset(typed_eid(eid) for eid, in self.execute_child())
   439         session = self.plan.session
   440         delete = session.repo.glob_delete_entity
   441         # register pending eids first to avoid multiple deletion
   442         pending = session.query_data('pendingeids', set(), setdefault=True)
   443         actual = todelete - pending
   444         pending |= actual
   445         for eid in actual:
   446             delete(session, eid)
   449 class DeleteRelationsStep(Step):
   450     """step consisting in deleting relations"""
   452     def __init__(self, plan, rtype):
   453         Step.__init__(self, plan)
   454         self.rtype = rtype
   456     def execute(self):
   457         """execute this step"""
   458         session = self.plan.session
   459         delete = session.repo.glob_delete_relation
   460         for subj, obj in self.execute_child():
   461             delete(session, subj, self.rtype, obj)
   464 class UpdateStep(Step):
   465     """step consisting in updating entities / adding relations from relations
   466     definitions and from results fetched in previous step
   467     """
   469     def __init__(self, plan, attribute_relations, relations, selected_index):
   470         Step.__init__(self, plan)
   471         self.attribute_relations = attribute_relations
   472         self.relations = relations
   473         self.selected_index = selected_index
   475     def execute(self):
   476         """execute this step"""
   477         plan = self.plan
   478         session = self.plan.session
   479         repo = session.repo
   480         edefs = {}
   481         # insert relations
   482         for row in self.execute_child():
   483             for relation in self.attribute_relations:
   484                 lhs, rhs = relation.get_variable_parts()
   485                 eid = typed_eid(row[self.selected_index[str(lhs)]])
   486                 try:
   487                     edef = edefs[eid]
   488                 except KeyError:
   489                     edefs[eid] = edef = session.eid_rset(eid).get_entity(0, 0)
   490                 if isinstance(rhs, Constant):
   491                     # add constant values to entity def
   492                     value = rhs.eval(plan.args)
   493                     edef[relation.r_type] = value
   494                 else:
   495                     edef[relation.r_type] = row[self.selected_index[str(rhs)]]
   496             for relation in self.relations:
   497                 subj = row[self.selected_index[str(relation.children[0])]]
   498                 obj = row[self.selected_index[str(relation.children[1])]]
   499                 repo.glob_add_relation(session, subj, relation.r_type, obj)
   500         # update entities
   501         result = []
   502         for eid, edef in edefs.iteritems():
   503             repo.glob_update_entity(session, edef)
   504             result.append( (eid,) )
   505         return result