--- a/server/mssteps.py Thu Jun 27 08:30:46 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,309 +0,0 @@
-# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-"""Defines the diferent querier steps usable in plans.
-
-FIXME : this code needs refactoring. Some problems :
-* get data from the parent plan, the latest step, temporary table...
-* each step has is own members (this is not necessarily bad, but a bit messy
- for now)
-"""
-__docformat__ = "restructuredtext en"
-
-from rql.nodes import VariableRef, Variable, Function
-
-from cubicweb.server.ssplanner import (LimitOffsetMixIn, Step, OneFetchStep,
- varmap_test_repr, offset_result)
-
-AGGR_TRANSFORMS = {'COUNT':'SUM', 'MIN':'MIN', 'MAX':'MAX', 'SUM': 'SUM'}
-
-class remove_and_restore_clauses(object):
- def __init__(self, union, keepgroup):
- self.union = union
- self.keepgroup = keepgroup
- self.clauses = None
-
- def __enter__(self):
- self.clauses = clauses = []
- for select in self.union.children:
- if self.keepgroup:
- having, orderby = select.having, select.orderby
- select.having, select.orderby = (), ()
- clauses.append( (having, orderby) )
- else:
- groupby, having, orderby = select.groupby, select.having, select.orderby
- select.groupby, select.having, select.orderby = (), (), ()
- clauses.append( (groupby, having, orderby) )
-
- def __exit__(self, exctype, exc, traceback):
- for i, select in enumerate(self.union.children):
- if self.keepgroup:
- select.having, select.orderby = self.clauses[i]
- else:
- select.groupby, select.having, select.orderby = self.clauses[i]
-
-
-class FetchStep(OneFetchStep):
- """step consisting in fetching data from sources, and storing result in
- a temporary table
- """
- def __init__(self, plan, union, sources, table, keepgroup, inputmap=None):
- OneFetchStep.__init__(self, plan, union, sources)
- # temporary table to store step result
- self.table = table
- # should groupby clause be kept or not
- self.keepgroup = keepgroup
- # variables mapping to use as input
- self.inputmap = inputmap
- # output variable mapping
- srqlst = union.children[0] # sample select node
- # add additional information to the output mapping
- self.outputmap = plan.init_temp_table(table, srqlst.selection,
- srqlst.solutions[0])
- for vref in srqlst.selection:
- if not isinstance(vref, VariableRef):
- continue
- var = vref.variable
- if var.stinfo.get('attrvars'):
- for lhsvar, rtype in var.stinfo['attrvars']:
- if lhsvar.name in srqlst.defined_vars:
- key = '%s.%s' % (lhsvar.name, rtype)
- self.outputmap[key] = self.outputmap[var.name]
- else:
- rschema = self.plan.schema.rschema
- for rel in var.stinfo['rhsrelations']:
- if rschema(rel.r_type).inlined:
- lhsvar = rel.children[0]
- if lhsvar.name in srqlst.defined_vars:
- key = '%s.%s' % (lhsvar.name, rel.r_type)
- self.outputmap[key] = self.outputmap[var.name]
-
- def execute(self):
- """execute this step"""
- self.execute_children()
- plan = self.plan
- plan.create_temp_table(self.table)
- union = self.union
- with remove_and_restore_clauses(union, self.keepgroup):
- for source in self.sources:
- source.flying_insert(self.table, plan.session, union, plan.args,
- self.inputmap)
-
- def mytest_repr(self):
- """return a representation of this step suitable for test"""
- with remove_and_restore_clauses(self.union, self.keepgroup):
- try:
- inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder)
- outputmap = varmap_test_repr(self.outputmap, self.plan.tablesinorder)
- except AttributeError:
- inputmap = self.inputmap
- outputmap = self.outputmap
- return (self.__class__.__name__,
- sorted((r.as_string(kwargs=self.plan.args), r.solutions)
- for r in self.union.children),
- sorted(self.sources), inputmap, outputmap)
-
-
-class AggrStep(LimitOffsetMixIn, Step):
- """step consisting in making aggregat from temporary data in the system
- source
- """
- def __init__(self, plan, selection, select, table, outputtable=None):
- Step.__init__(self, plan)
- # original selection
- self.selection = selection
- # original Select RQL tree
- self.select = select
- # table where are located temporary results
- self.table = table
- # optional table where to write results
- self.outputtable = outputtable
- if outputtable is not None:
- plan.init_temp_table(outputtable, selection, select.solutions[0])
-
- #self.inputmap = inputmap
-
- def mytest_repr(self):
- """return a representation of this step suitable for test"""
- try:
- # rely on a monkey patch (cf unittest_querier)
- table = self.plan.tablesinorder[self.table]
- outputtable = self.outputtable and self.plan.tablesinorder[self.outputtable]
- except AttributeError:
- # not monkey patched
- table = self.table
- outputtable = self.outputtable
- sql = self.get_sql().replace(self.table, table)
- return (self.__class__.__name__, sql, outputtable)
-
- def execute(self):
- """execute this step"""
- self.execute_children()
- sql = self.get_sql()
- if self.outputtable:
- self.plan.create_temp_table(self.outputtable)
- sql = 'INSERT INTO %s %s' % (self.outputtable, sql)
- self.plan.syssource.doexec(self.plan.session, sql, self.plan.args)
- else:
- return self.plan.sqlexec(sql, self.plan.args)
-
- def get_sql(self):
- self.inputmap = inputmap = self.children[-1].outputmap
- dbhelper=self.plan.syssource.dbhelper
- # get the select clause
- clause = []
- for i, term in enumerate(self.selection):
- try:
- var_name = inputmap[term.as_string()]
- except KeyError:
- var_name = 'C%s' % i
- if isinstance(term, Function):
- # we have to translate some aggregat function
- # (for instance COUNT -> SUM)
- orig_name = term.name
- try:
- term.name = AGGR_TRANSFORMS[term.name]
- # backup and reduce children
- orig_children = term.children
- term.children = [VariableRef(Variable(var_name))]
- clause.append(term.accept(self))
- # restaure the tree XXX necessary?
- term.name = orig_name
- term.children = orig_children
- except KeyError:
- clause.append(var_name)
- else:
- clause.append(var_name)
- for vref in term.iget_nodes(VariableRef):
- inputmap[vref.name] = var_name
- # XXX handle distinct with non selected sort term
- if self.select.distinct:
- sql = ['SELECT DISTINCT %s' % ', '.join(clause)]
- else:
- sql = ['SELECT %s' % ', '.join(clause)]
- sql.append("FROM %s" % self.table)
- # get the group/having clauses
- if self.select.groupby:
- clause = [inputmap[var.name] for var in self.select.groupby]
- grouped = set(var.name for var in self.select.groupby)
- sql.append('GROUP BY %s' % ', '.join(clause))
- else:
- grouped = None
- if self.select.having:
- clause = [term.accept(self) for term in self.select.having]
- sql.append('HAVING %s' % ', '.join(clause))
- # get the orderby clause
- if self.select.orderby:
- clause = []
- for sortterm in self.select.orderby:
- sqlterm = sortterm.term.accept(self)
- if sortterm.asc:
- clause.append(sqlterm)
- else:
- clause.append('%s DESC' % sqlterm)
- if grouped is not None:
- for vref in sortterm.iget_nodes(VariableRef):
- if not vref.name in grouped:
- sql[-1] += ', ' + self.inputmap[vref.name]
- grouped.add(vref.name)
- sql = dbhelper.sql_add_order_by(' '.join(sql),
- clause,
- None, False,
- self.limit or self.offset)
- else:
- sql = ' '.join(sql)
- clause = None
-
- sql = dbhelper.sql_add_limit_offset(sql, self.limit, self.offset, clause)
- return sql
-
- def visit_function(self, function):
- """generate SQL name for a function"""
- try:
- return self.children[0].outputmap[str(function)]
- except KeyError:
- return '%s(%s)' % (function.name,
- ','.join(c.accept(self) for c in function.children))
-
- def visit_variableref(self, variableref):
- """get the sql name for a variable reference"""
- try:
- return self.inputmap[variableref.name]
- except KeyError: # XXX duh? explain
- return variableref.variable.name
-
- def visit_constant(self, constant):
- """generate SQL name for a constant"""
- assert constant.type == 'Int'
- return str(constant.value)
-
-
-class UnionStep(LimitOffsetMixIn, Step):
- """union results of child in-memory steps (e.g. OneFetchStep / AggrStep)"""
-
- def execute(self):
- """execute this step"""
- result = []
- limit = olimit = self.limit
- offset = self.offset
- assert offset != 0
- if offset is not None:
- limit = limit + offset
- for step in self.children:
- if limit is not None:
- if offset is None:
- limit = olimit - len(result)
- step.set_limit_offset(limit, None)
- result_ = step.execute()
- if offset is not None:
- offset, result_ = offset_result(offset, result_)
- result += result_
- if limit is not None:
- if len(result) >= olimit:
- return result[:olimit]
- return result
-
- def mytest_repr(self):
- """return a representation of this step suitable for test"""
- return (self.__class__.__name__, self.limit, self.offset)
-
-
-class IntersectStep(UnionStep):
- """return intersection of results of child in-memory steps (e.g. OneFetchStep / AggrStep)"""
-
- def execute(self):
- """execute this step"""
- result = set()
- for step in self.children:
- result &= frozenset(step.execute())
- result = list(result)
- if self.offset:
- result = result[self.offset:]
- if self.limit:
- result = result[:self.limit]
- return result
-
-
-class UnionFetchStep(Step):
- """union results of child steps using temporary tables (e.g. FetchStep)"""
-
- def execute(self):
- """execute this step"""
- self.execute_children()
-
-
-__all__ = ('FetchStep', 'AggrStep', 'UnionStep', 'UnionFetchStep', 'IntersectStep')