server/mssteps.py
branch tls-sprint
changeset 1802 d628defebc17
parent 470 5d4a9db36738
child 1977 606923dff11b
equal deleted inserted replaced
1801:672acc730ce5 1802:d628defebc17
    14 from rql.nodes import VariableRef, Variable, Function
    14 from rql.nodes import VariableRef, Variable, Function
    15 
    15 
    16 from cubicweb.server.ssplanner import (LimitOffsetMixIn, Step, OneFetchStep,
    16 from cubicweb.server.ssplanner import (LimitOffsetMixIn, Step, OneFetchStep,
    17                                     varmap_test_repr, offset_result)
    17                                     varmap_test_repr, offset_result)
    18 
    18 
    19 AGGR_TRANSFORMS = {'COUNT':'SUM', 'MIN':'MIN', 'MAX':'MAX', 'SUM': 'SUM'} 
    19 AGGR_TRANSFORMS = {'COUNT':'SUM', 'MIN':'MIN', 'MAX':'MAX', 'SUM': 'SUM'}
    20 
    20 
    21 def remove_clauses(union, keepgroup):
    21 def remove_clauses(union, keepgroup):
    22     clauses = []
    22     clauses = []
    23     for select in union.children:
    23     for select in union.children:
    24         if keepgroup:
    24         if keepgroup:
    71                     if rschema(rel.r_type).inlined:
    71                     if rschema(rel.r_type).inlined:
    72                         lhsvar = rel.children[0]
    72                         lhsvar = rel.children[0]
    73                         if lhsvar.name in srqlst.defined_vars:
    73                         if lhsvar.name in srqlst.defined_vars:
    74                             key = '%s.%s' % (lhsvar.name, rel.r_type)
    74                             key = '%s.%s' % (lhsvar.name, rel.r_type)
    75                             self.outputmap[key] = self.outputmap[var.name]
    75                             self.outputmap[key] = self.outputmap[var.name]
    76                 
    76 
    77     def execute(self):
    77     def execute(self):
    78         """execute this step"""
    78         """execute this step"""
    79         self.execute_children()
    79         self.execute_children()
    80         plan = self.plan
    80         plan = self.plan
    81         plan.create_temp_table(self.table)
    81         plan.create_temp_table(self.table)
    84         clauses = remove_clauses(union, self.keepgroup)
    84         clauses = remove_clauses(union, self.keepgroup)
    85         for source in self.sources:
    85         for source in self.sources:
    86             source.flying_insert(self.table, plan.session, union, plan.args,
    86             source.flying_insert(self.table, plan.session, union, plan.args,
    87                                  self.inputmap)
    87                                  self.inputmap)
    88         restore_clauses(union, self.keepgroup, clauses)
    88         restore_clauses(union, self.keepgroup, clauses)
    89             
    89 
    90     def mytest_repr(self):
    90     def mytest_repr(self):
    91         """return a representation of this step suitable for test"""
    91         """return a representation of this step suitable for test"""
    92         clauses = remove_clauses(self.union, self.keepgroup)
    92         clauses = remove_clauses(self.union, self.keepgroup)
    93         try:
    93         try:
    94             inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
    94             inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
   102                        for r in self.union.children),
   102                        for r in self.union.children),
   103                 sorted(self.sources), inputmap, outputmap)
   103                 sorted(self.sources), inputmap, outputmap)
   104         finally:
   104         finally:
   105             restore_clauses(self.union, self.keepgroup, clauses)
   105             restore_clauses(self.union, self.keepgroup, clauses)
   106 
   106 
   107     
   107 
   108 class AggrStep(LimitOffsetMixIn, Step):
   108 class AggrStep(LimitOffsetMixIn, Step):
   109     """step consisting in making aggregat from temporary data in the system
   109     """step consisting in making aggregat from temporary data in the system
   110     source
   110     source
   111     """
   111     """
   112     def __init__(self, plan, selection, select, table, outputtable=None):
   112     def __init__(self, plan, selection, select, table, outputtable=None):
   121         self.outputtable = outputtable
   121         self.outputtable = outputtable
   122         if outputtable is not None:
   122         if outputtable is not None:
   123             plan.init_temp_table(outputtable, selection, select.solutions[0])
   123             plan.init_temp_table(outputtable, selection, select.solutions[0])
   124 
   124 
   125         #self.inputmap = inputmap
   125         #self.inputmap = inputmap
   126         
   126 
   127     def mytest_repr(self):
   127     def mytest_repr(self):
   128         """return a representation of this step suitable for test"""
   128         """return a representation of this step suitable for test"""
   129         sel = self.select.selection
   129         sel = self.select.selection
   130         restr = self.select.where
   130         restr = self.select.where
   131         self.select.selection = self.selection
   131         self.select.selection = self.selection
   165                     orig_children = term.children
   165                     orig_children = term.children
   166                     term.children = [VariableRef(Variable(var_name))]
   166                     term.children = [VariableRef(Variable(var_name))]
   167                     clause.append(term.accept(self))
   167                     clause.append(term.accept(self))
   168                     # restaure the tree XXX necessary?
   168                     # restaure the tree XXX necessary?
   169                     term.name = orig_name
   169                     term.name = orig_name
   170                     term.children = orig_children                
   170                     term.children = orig_children
   171                 except KeyError:
   171                 except KeyError:
   172                     clause.append(var_name)
   172                     clause.append(var_name)
   173             else:
   173             else:
   174                 clause.append(var_name)
   174                 clause.append(var_name)
   175                 for vref in term.iget_nodes(VariableRef):
   175                 for vref in term.iget_nodes(VariableRef):
   213         sql = ' '.join(sql)
   213         sql = ' '.join(sql)
   214         if self.outputtable:
   214         if self.outputtable:
   215             self.plan.create_temp_table(self.outputtable)
   215             self.plan.create_temp_table(self.outputtable)
   216             sql = 'INSERT INTO %s %s' % (self.outputtable, sql)
   216             sql = 'INSERT INTO %s %s' % (self.outputtable, sql)
   217         return self.plan.sqlexec(sql, self.plan.args)
   217         return self.plan.sqlexec(sql, self.plan.args)
   218     
   218 
   219     def visit_function(self, function):
   219     def visit_function(self, function):
   220         """generate SQL name for a function"""
   220         """generate SQL name for a function"""
   221         return '%s(%s)' % (function.name,
   221         return '%s(%s)' % (function.name,
   222                            ','.join(c.accept(self) for c in function.children))
   222                            ','.join(c.accept(self) for c in function.children))
   223         
   223 
   224     def visit_variableref(self, variableref):
   224     def visit_variableref(self, variableref):
   225         """get the sql name for a variable reference"""
   225         """get the sql name for a variable reference"""
   226         try:
   226         try:
   227             return self.inputmap[variableref.name]
   227             return self.inputmap[variableref.name]
   228         except KeyError: # XXX duh? explain
   228         except KeyError: # XXX duh? explain
   229             return variableref.variable.name
   229             return variableref.variable.name
   230         
   230 
   231     def visit_constant(self, constant):
   231     def visit_constant(self, constant):
   232         """generate SQL name for a constant"""
   232         """generate SQL name for a constant"""
   233         assert constant.type == 'Int'
   233         assert constant.type == 'Int'
   234         return str(constant.value)
   234         return str(constant.value)
   235     
   235 
   236 
   236 
   237 class UnionStep(LimitOffsetMixIn, Step):
   237 class UnionStep(LimitOffsetMixIn, Step):
   238     """union results of child in-memory steps (e.g. OneFetchStep / AggrStep)"""
   238     """union results of child in-memory steps (e.g. OneFetchStep / AggrStep)"""
   239         
   239 
   240     def execute(self):
   240     def execute(self):
   241         """execute this step"""
   241         """execute this step"""
   242         result = []
   242         result = []
   243         limit = olimit = self.limit
   243         limit = olimit = self.limit
   244         offset = self.offset
   244         offset = self.offset
   256             result += result_
   256             result += result_
   257             if limit is not None:
   257             if limit is not None:
   258                 if len(result) >= olimit:
   258                 if len(result) >= olimit:
   259                     return result[:olimit]
   259                     return result[:olimit]
   260         return result
   260         return result
   261         
   261 
   262     def mytest_repr(self):
   262     def mytest_repr(self):
   263         """return a representation of this step suitable for test"""
   263         """return a representation of this step suitable for test"""
   264         return (self.__class__.__name__, self.limit, self.offset)
   264         return (self.__class__.__name__, self.limit, self.offset)
   265 
   265 
   266 
   266 
   267 class IntersectStep(UnionStep):
   267 class IntersectStep(UnionStep):
   268     """return intersection of results of child in-memory steps (e.g. OneFetchStep / AggrStep)"""
   268     """return intersection of results of child in-memory steps (e.g. OneFetchStep / AggrStep)"""
   269         
   269 
   270     def execute(self):
   270     def execute(self):
   271         """execute this step"""
   271         """execute this step"""
   272         result = set()
   272         result = set()
   273         for step in self.children:
   273         for step in self.children:
   274             result &= frozenset(step.execute())
   274             result &= frozenset(step.execute())