server/ssplanner.py
changeset 0 b97547f5f1fa
child 1016 26387b836099
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 """plan execution of rql queries on a single source
       
     2 
       
     3 :organization: Logilab
       
     4 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     6 """
       
     7 __docformat__ = "restructuredtext en"
       
     8 
       
     9 from copy import copy
       
    10 
       
    11 from rql.stmts import Union, Select
       
    12 from rql.nodes import Constant
       
    13 
       
    14 from cubicweb import QueryError, typed_eid
       
    15 
       
    16 def add_types_restriction(schema, rqlst, newroot=None, solutions=None):
       
    17     if newroot is None:
       
    18         assert solutions is None
       
    19         if hasattr(rqlst, '_types_restr_added'):
       
    20             return
       
    21         solutions = rqlst.solutions
       
    22         newroot = rqlst
       
    23         rqlst._types_restr_added = True
       
    24     else:
       
    25         assert solutions is not None
       
    26         rqlst = rqlst.stmt
       
    27     eschema = schema.eschema
       
    28     allpossibletypes = {}
       
    29     for solution in solutions:
       
    30         for varname, etype in solution.iteritems():
       
    31             if not varname in newroot.defined_vars or eschema(etype).is_final():
       
    32                 continue
       
    33             allpossibletypes.setdefault(varname, set()).add(etype)
       
    34     for varname in sorted(allpossibletypes):
       
    35         try:
       
    36             var = newroot.defined_vars[varname]
       
    37         except KeyError:
       
    38             continue
       
    39         stinfo = var.stinfo
       
    40         if stinfo.get('uidrels'):
       
    41             continue # eid specified, no need for additional type specification
       
    42         try:
       
    43             typerels = rqlst.defined_vars[varname].stinfo.get('typerels')
       
    44         except KeyError:
       
    45             assert varname in rqlst.aliases
       
    46             continue
       
    47         if newroot is rqlst and typerels:
       
    48             mytyperel = iter(typerels).next()
       
    49         else:
       
    50             for vref in newroot.defined_vars[varname].references():
       
    51                 rel = vref.relation()
       
    52                 if rel and rel.is_types_restriction():
       
    53                     mytyperel = rel
       
    54                     break
       
    55             else:
       
    56                 mytyperel = None
       
    57         possibletypes = allpossibletypes[varname]
       
    58         if mytyperel is not None:
       
    59             # variable as already some types restriction. new possible types
       
    60             # can only be a subset of existing ones, so only remove no more
       
    61             # possible types
       
    62             for cst in mytyperel.get_nodes(Constant):
       
    63                 if not cst.value in possibletypes:
       
    64                     cst.parent.remove(cst)
       
    65                     try:
       
    66                         stinfo['possibletypes'].remove(cst.value)
       
    67                     except KeyError:
       
    68                         # restriction on a type not used by this query, may
       
    69                         # occurs with X is IN(...)
       
    70                         pass
       
    71         else:
       
    72             # we have to add types restriction
       
    73             if stinfo.get('scope') is not None:
       
    74                 rel = var.scope.add_type_restriction(var, possibletypes)
       
    75             else:
       
    76                 # tree is not annotated yet, no scope set so add the restriction
       
    77                 # to the root
       
    78                 rel = newroot.add_type_restriction(var, possibletypes)
       
    79             stinfo['typerels'] = frozenset((rel,))
       
    80             stinfo['possibletypes'] = possibletypes
       
    81         
       
    82 class SSPlanner(object):
       
    83     """SingleSourcePlanner: build execution plan for rql queries
       
    84 
       
    85     optimized for single source repositories
       
    86     """
       
    87     
       
    88     def __init__(self, schema, rqlhelper):
       
    89         self.schema = schema
       
    90         self.rqlhelper = rqlhelper
       
    91 
       
    92     def build_plan(self, plan):
       
    93         """build an execution plan from a RQL query
       
    94         
       
    95         do nothing here, dispatch according to the statement type
       
    96         """
       
    97         build_plan = getattr(self, 'build_%s_plan' % plan.rqlst.TYPE)
       
    98         for step in build_plan(plan, plan.rqlst):
       
    99             plan.add_step(step)
       
   100     
       
   101     def build_select_plan(self, plan, rqlst):
       
   102         """build execution plan for a SELECT RQL query. Suppose only one source
       
   103         is available and so avoid work need for query decomposition among sources
       
   104                
       
   105         the rqlst should not be tagged at this point.
       
   106         """
       
   107         plan.preprocess(rqlst)
       
   108         return (OneFetchStep(plan, rqlst, plan.session.repo.sources),)
       
   109             
       
   110     def build_insert_plan(self, plan, rqlst):
       
   111         """get an execution plan from an INSERT RQL query"""
       
   112         # each variable in main variables is a new entity to insert
       
   113         to_build = {}
       
   114         session = plan.session
       
   115         for etype, var in rqlst.main_variables:
       
   116             # need to do this since entity class is shared w. web client code !
       
   117             to_build[var.name] = session.etype_class(etype)(session, None, None)
       
   118             plan.add_entity_def(to_build[var.name])
       
   119         # add constant values to entity def, mark variables to be selected
       
   120         to_select = plan.relation_definitions(rqlst, to_build)
       
   121         # add necessary steps to add relations and update attributes
       
   122         step = InsertStep(plan) # insert each entity and its relations
       
   123         step.children += self._compute_relation_steps(plan, rqlst.solutions,
       
   124                                                       rqlst.where, to_select)
       
   125         return (step,)
       
   126         
       
   127     def _compute_relation_steps(self, plan, solutions, restriction, to_select):
       
   128         """handle the selection of relations for an insert query"""
       
   129         for edef, rdefs in to_select.items():
       
   130             # create a select rql st to fetch needed data
       
   131             select = Select()
       
   132             eschema = edef.e_schema
       
   133             for i in range(len(rdefs)):
       
   134                 rtype, term, reverse = rdefs[i]
       
   135                 select.append_selected(term.copy(select))
       
   136                 if reverse:
       
   137                     rdefs[i] = rtype, RelationsStep.REVERSE_RELATION
       
   138                 else:
       
   139                     rschema = eschema.subject_relation(rtype)
       
   140                     if rschema.is_final() or rschema.inlined:
       
   141                         rdefs[i] = rtype, RelationsStep.FINAL
       
   142                     else:
       
   143                         rdefs[i] = rtype, RelationsStep.RELATION
       
   144             if restriction is not None:
       
   145                 select.set_where(restriction.copy(select))
       
   146             step = RelationsStep(plan, edef, rdefs)
       
   147             step.children += self._select_plan(plan, select, solutions)
       
   148             yield step
       
   149     
       
   150     def build_delete_plan(self, plan, rqlst):
       
   151         """get an execution plan from a DELETE RQL query"""
       
   152         # build a select query to fetch entities to delete
       
   153         steps = []
       
   154         for etype, var in rqlst.main_variables:
       
   155             step = DeleteEntitiesStep(plan)
       
   156             step.children += self._sel_variable_step(plan, rqlst.solutions,
       
   157                                                      rqlst.where, etype, var)
       
   158             steps.append(step)
       
   159         for relation in rqlst.main_relations:
       
   160             step = DeleteRelationsStep(plan, relation.r_type)
       
   161             step.children += self._sel_relation_steps(plan, rqlst.solutions,
       
   162                                                       rqlst.where, relation)
       
   163             steps.append(step)
       
   164         return steps
       
   165 
       
   166     def _sel_variable_step(self, plan, solutions, restriction, etype, varref):
       
   167         """handle the selection of variables for a delete query"""
       
   168         select = Select()
       
   169         varref = varref.copy(select)
       
   170         select.defined_vars = {varref.name: varref.variable}
       
   171         select.append_selected(varref)
       
   172         if restriction is not None:
       
   173             select.set_where(restriction.copy(select))
       
   174         if etype != 'Any':
       
   175             select.add_type_restriction(varref.variable, etype)
       
   176         return self._select_plan(plan, select, solutions)
       
   177         
       
   178     def _sel_relation_steps(self, plan, solutions, restriction, relation):
       
   179         """handle the selection of relations for a delete query"""
       
   180         select = Select()
       
   181         lhs, rhs = relation.get_variable_parts()
       
   182         select.append_selected(lhs.copy(select))
       
   183         select.append_selected(rhs.copy(select))
       
   184         select.set_where(relation.copy(select))
       
   185         if restriction is not None:
       
   186             select.add_restriction(restriction.copy(select))
       
   187         return self._select_plan(plan, select, solutions)
       
   188     
       
   189     def build_set_plan(self, plan, rqlst):
       
   190         """get an execution plan from an SET RQL query"""
       
   191         select = Select()
       
   192         # extract variables to add to the selection
       
   193         selected_index = {}
       
   194         index = 0
       
   195         relations, attrrelations = [], []
       
   196         getrschema = self.schema.rschema
       
   197         for relation in rqlst.main_relations:
       
   198             if relation.r_type in ('eid', 'has_text', 'identity'):
       
   199                 raise QueryError('can not assign to %r relation'
       
   200                                  % relation.r_type)
       
   201             lhs, rhs = relation.get_variable_parts()
       
   202             if not lhs.as_string('utf-8') in selected_index:
       
   203                 select.append_selected(lhs.copy(select))
       
   204                 selected_index[lhs.as_string('utf-8')] = index
       
   205                 index += 1
       
   206             if not rhs.as_string('utf-8') in selected_index:
       
   207                 select.append_selected(rhs.copy(select))
       
   208                 selected_index[rhs.as_string('utf-8')] = index
       
   209                 index += 1
       
   210             rschema = getrschema(relation.r_type)
       
   211             if rschema.is_final() or rschema.inlined:
       
   212                 attrrelations.append(relation)
       
   213             else:
       
   214                 relations.append(relation)
       
   215         # add step necessary to fetch all selected variables values
       
   216         if rqlst.where is not None:
       
   217             select.set_where(rqlst.where.copy(select))
       
   218         # set distinct to avoid potential duplicate key error
       
   219         select.distinct = True
       
   220         step = UpdateStep(plan, attrrelations, relations, selected_index)
       
   221         step.children += self._select_plan(plan, select, rqlst.solutions)
       
   222         return (step,)
       
   223 
       
   224     # internal methods ########################################################
       
   225     
       
   226     def _select_plan(self, plan, select, solutions):
       
   227         union = Union()
       
   228         union.append(select)
       
   229         select.clean_solutions(solutions)
       
   230         add_types_restriction(self.schema, select)        
       
   231         self.rqlhelper.annotate(union)
       
   232         return self.build_select_plan(plan, union)
       
   233 
       
   234 
       
   235 # execution steps and helper functions ########################################
       
   236 
       
   237 def varmap_test_repr(varmap, tablesinorder):
       
   238     if varmap is None:
       
   239         return varmap
       
   240     maprepr = {}
       
   241     for var, sql in varmap.iteritems():
       
   242         table, col = sql.split('.')
       
   243         maprepr[var] = '%s.%s' % (tablesinorder[table], col)
       
   244     return maprepr
       
   245 
       
   246 def offset_result(offset, result):
       
   247     offset -= len(result)
       
   248     if offset < 0:
       
   249         result = result[offset:]
       
   250         offset = None
       
   251     elif offset == 0:
       
   252         offset = None
       
   253         result = ()
       
   254     return offset, result
       
   255 
       
   256 
       
   257 class LimitOffsetMixIn(object):
       
   258     limit = offset = None
       
   259     def set_limit_offset(self, limit, offset):
       
   260         self.limit = limit
       
   261         self.offset = offset or None
       
   262 
       
   263         
       
   264 class Step(object):
       
   265     """base abstract class for execution step"""
       
   266     def __init__(self, plan):
       
   267         self.plan = plan
       
   268         self.children = []
       
   269         
       
   270     def execute_child(self):
       
   271         assert len(self.children) == 1
       
   272         return self.children[0].execute()
       
   273     
       
   274     def execute_children(self):
       
   275         for step in self.children:
       
   276             step.execute()
       
   277         
       
   278     def execute(self):
       
   279         """execute this step and store partial (eg this step) results"""
       
   280         raise NotImplementedError()
       
   281     
       
   282     def mytest_repr(self):
       
   283         """return a representation of this step suitable for test"""
       
   284         return (self.__class__.__name__,)
       
   285     
       
   286     def test_repr(self):
       
   287         """return a representation of this step suitable for test"""
       
   288         return self.mytest_repr() + (
       
   289             [step.test_repr() for step in self.children],)
       
   290 
       
   291         
       
class OneFetchStep(LimitOffsetMixIn, Step):
    """step consisting in fetching data from sources and directly returning
    results

    The limit and offset of the step are initialized from the last select of
    the union.
    """
    def __init__(self, plan, union, sources, inputmap=None):
        Step.__init__(self, plan)
        # syntax tree given to each source
        self.union = union
        # sources to query, in order
        self.sources = sources
        # given as is to the sources; when set, the cache key is disabled
        # (see `execute`)
        self.inputmap = inputmap
        self.set_limit_offset(union.children[-1].limit, union.children[-1].offset)

    def set_limit_offset(self, limit, offset):
        """set limit and offset on this step and on every select of the
        union

        The step stores a false offset as None (see `LimitOffsetMixIn`) while
        the selects receive the given value unchanged.
        """
        LimitOffsetMixIn.set_limit_offset(self, limit, offset)
        for select in self.union.children:
            select.limit = limit
            select.offset = offset

    def execute(self):
        """call .syntax_tree_search with the given syntax tree on each
        source for each solution

        Children steps are executed first. The rows returned by the sources
        are accumulated in source order and returned, truncated to the limit
        as soon as it is reached.

        When an offset is set and there are several sources, the offset is
        removed from the last select of the union, whose limit (if any) is
        raised to limit + offset, and the rows located before the offset are
        dropped here using `offset_result`. With a single source, limit and
        offset are not handled here: the values already stored on the
        selects are left untouched (presumably the source applies them
        itself -- to be confirmed against the source implementation).

        Note that the limit / offset of the last select are modified in
        place.
        """
        self.execute_children()
        session = self.plan.session
        args = self.plan.args
        inputmap = self.inputmap
        union = self.union
        # do we have to use an inputmap from a previous step? If so disable
        # cachekey
        if inputmap or self.plan.cache_key is None:
            cachekey = None
        # union may have been split into subqueries, rebuild a cache key
        # whose first item is the string form of this union
        elif isinstance(self.plan.cache_key, tuple):
            cachekey = list(self.plan.cache_key)
            cachekey[0] = union.as_string()
            cachekey = tuple(cachekey)
        else:
            cachekey = union.as_string()
        result = []
        # limit / offset processing
        limit = self.limit
        offset = self.offset
        if offset is not None:
            if len(self.sources) > 1:
                # we'll have to deal with limit/offset by ourselves: ask for
                # enough rows to cover the offset and skip them below
                if union.children[-1].limit:
                    union.children[-1].limit = limit + offset
                union.children[-1].offset = None
            else:
                # single source: nothing to do in the loop below
                offset, limit = None, None
        for source in self.sources:
            if offset is None and limit is not None:
                # only ask for the rows still missing; modifying the sample
                # rqlst is enough since sql generation will pick it here as
                # well
                union.children[-1].limit = limit - len(result)
            result_ = source.syntax_tree_search(session, union, args, cachekey,
                                                inputmap)
            if offset is not None:
                # offset becomes None once fully consumed
                offset, result_ = offset_result(offset, result_)
            result += result_
            if limit is not None:
                if len(result) >= limit:
                    # enough rows, do not query remaining sources
                    return result[:limit]
        return result

    def mytest_repr(self):
        """return a representation of this step suitable for test"""
        try:
            inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
        except AttributeError:
            # e.g. the plan has no `tablesinorder` attribute: use the input
            # map as is
            inputmap = self.inputmap
        return (self.__class__.__name__,
                sorted((r.as_string(kwargs=self.plan.args), r.solutions)
                       for r in self.union.children),
                self.limit, self.offset,
                sorted(self.sources), inputmap)
       
   368 
       
   369 
       
   370 # UPDATE/INSERT/DELETE steps ##################################################
       
   371 
       
   372 class RelationsStep(Step):
       
   373     """step consisting in adding attributes/relations to entity defs from a
       
   374     previous FetchStep
       
   375 
       
   376     relations values comes from the latest result, with one columns for
       
   377     each relation defined in self.r_defs
       
   378     
       
   379     for one entity definition, we'll construct N entity, where N is the
       
   380     number of the latest result
       
   381     """
       
   382     
       
   383     FINAL = 0
       
   384     RELATION = 1
       
   385     REVERSE_RELATION = 2
       
   386     
       
   387     def __init__(self, plan, e_def, r_defs):
       
   388         Step.__init__(self, plan)
       
   389         # partial entity definition to expand
       
   390         self.e_def = e_def
       
   391         # definition of relations to complete
       
   392         self.r_defs = r_defs
       
   393         
       
   394     def execute(self):
       
   395         """execute this step"""
       
   396         base_e_def = self.e_def
       
   397         result = []
       
   398         for row in self.execute_child():
       
   399             # get a new entity definition for this row
       
   400             e_def = copy(base_e_def)
       
   401             # complete this entity def using row values
       
   402             for i in range(len(self.r_defs)):
       
   403                 rtype, rorder = self.r_defs[i]
       
   404                 if rorder == RelationsStep.FINAL:
       
   405                     e_def[rtype] = row[i]
       
   406                 elif rorder == RelationsStep.RELATION:
       
   407                     self.plan.add_relation_def( (e_def, rtype, row[i]) )
       
   408                     e_def.querier_pending_relations[(rtype, 'subject')] = row[i]
       
   409                 else:
       
   410                     self.plan.add_relation_def( (row[i], rtype, e_def) )
       
   411                     e_def.querier_pending_relations[(rtype, 'object')] = row[i]
       
   412             result.append(e_def)
       
   413         self.plan.substitute_entity_def(base_e_def, result)
       
   414 
       
   415 
       
   416 class InsertStep(Step):
       
   417     """step consisting in inserting new entities / relations"""
       
   418     
       
   419     def execute(self):
       
   420         """execute this step"""
       
   421         for step in self.children:
       
   422             assert isinstance(step, RelationsStep)
       
   423             step.plan = self.plan
       
   424             step.execute()
       
   425         # insert entities first
       
   426         result = self.plan.insert_entity_defs()
       
   427         # then relation
       
   428         self.plan.insert_relation_defs()
       
   429         # return eids of inserted entities
       
   430         return result
       
   431 
       
   432 
       
   433 class DeleteEntitiesStep(Step):
       
   434     """step consisting in deleting entities"""
       
   435 
       
   436     def execute(self):
       
   437         """execute this step"""
       
   438         todelete = frozenset(typed_eid(eid) for eid, in self.execute_child())
       
   439         session = self.plan.session
       
   440         delete = session.repo.glob_delete_entity
       
   441         # register pending eids first to avoid multiple deletion
       
   442         pending = session.query_data('pendingeids', set(), setdefault=True)
       
   443         actual = todelete - pending
       
   444         pending |= actual
       
   445         for eid in actual:
       
   446             delete(session, eid)
       
   447             
       
   448     
       
   449 class DeleteRelationsStep(Step):
       
   450     """step consisting in deleting relations"""
       
   451 
       
   452     def __init__(self, plan, rtype):
       
   453         Step.__init__(self, plan)
       
   454         self.rtype = rtype
       
   455         
       
   456     def execute(self):
       
   457         """execute this step"""
       
   458         session = self.plan.session
       
   459         delete = session.repo.glob_delete_relation
       
   460         for subj, obj in self.execute_child():
       
   461             delete(session, subj, self.rtype, obj)
       
   462     
       
   463 
       
   464 class UpdateStep(Step):
       
   465     """step consisting in updating entities / adding relations from relations
       
   466     definitions and from results fetched in previous step
       
   467     """
       
   468     
       
   469     def __init__(self, plan, attribute_relations, relations, selected_index):
       
   470         Step.__init__(self, plan)
       
   471         self.attribute_relations = attribute_relations
       
   472         self.relations = relations
       
   473         self.selected_index = selected_index
       
   474         
       
   475     def execute(self):
       
   476         """execute this step"""
       
   477         plan = self.plan
       
   478         session = self.plan.session
       
   479         repo = session.repo
       
   480         edefs = {}
       
   481         # insert relations
       
   482         for row in self.execute_child():
       
   483             for relation in self.attribute_relations:
       
   484                 lhs, rhs = relation.get_variable_parts()
       
   485                 eid = typed_eid(row[self.selected_index[str(lhs)]])
       
   486                 try:
       
   487                     edef = edefs[eid]
       
   488                 except KeyError:
       
   489                     edefs[eid] = edef = session.eid_rset(eid).get_entity(0, 0)
       
   490                 if isinstance(rhs, Constant):
       
   491                     # add constant values to entity def
       
   492                     value = rhs.eval(plan.args)
       
   493                     edef[relation.r_type] = value
       
   494                 else:
       
   495                     edef[relation.r_type] = row[self.selected_index[str(rhs)]]
       
   496             for relation in self.relations:
       
   497                 subj = row[self.selected_index[str(relation.children[0])]]
       
   498                 obj = row[self.selected_index[str(relation.children[1])]]
       
   499                 repo.glob_add_relation(session, subj, relation.r_type, obj)
       
   500         # update entities
       
   501         result = []
       
   502         for eid, edef in edefs.iteritems():
       
   503             repo.glob_update_entity(session, edef)
       
   504             result.append( (eid,) )
       
   505         return result