server/mssteps.py
author Julien Jehannet <Julien Jehannet <julien.jehannet@logilab.fr>>
Tue, 02 Mar 2010 21:48:36 +0100
branchstable
changeset 4783 6dc34d4cf892
parent 4212 ab6573088b4a
child 5004 4cc020ee70e2
child 5421 8167de96c523
permissions -rw-r--r--
[F] views: fix 2 unicode errors 1. You can now use valid unicode strings in ValidationError exception. Previously, if 'err' contains unicode, UnicodeDecodeError was raised by format_errors() >>> templstr = '<li>%s</li>\n' >>> e = ValidationError(None, {None: u'oué, une exception en unicode!'}) >>> templstr % e '<li>None (None): ou\xc3\xa9, une exception en unicode!</li>\n' >>> templstr = u'<li>%s</li>\n' >>> templstr % e u'<li>None (None): ou\xe9, une exception en unicode!</li>\n' 2. The message of an Exception can contains unicode. But it now properly managed by “informal” string representation. We can easily fix the problem by using the Exception.message attribute that still contains the original message. >>> a = AssertionError(u'séfdsdf') >>> a.message u's\xe9fdsdf' >>> str(a) Traceback (most recent call last): File "<stdin>", line 1, in <module> UnicodeEncodeError: 'ascii' codec can't encode character u'\xe9' in position 1: ordinal not in range(128) >>> a = ValueError(u'fsdfsdéfsdfs') >>> str(a) Traceback (most recent call last): File "<stdin>", line 1, in <module> UnicodeEncodeError: 'ascii' codec can't encode character u'\xe9' in position 6: ordinal not in range(128) >>> a ValueError(u'fsdfsd\xe9fsdfs',) >>> unicode(a) Traceback (most recent call last): File "<stdin>", line 1, in <module> UnicodeEncodeError: 'ascii' codec can't encode character u'\xe9' in position 6: ordinal not in range(128) >>> a.message u'fsdfsd\xe9fsdfs'

"""Defines the diferent querier steps usable in plans.

FIXME : this code needs refactoring. Some problems :
* get data from the parent plan, the latest step, temporary table...
* each step has its own members (this is not necessarily bad, but a bit messy
  for now)

:organization: Logilab
:copyright: 2003-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
__docformat__ = "restructuredtext en"

from rql.nodes import VariableRef, Variable, Function

from cubicweb.server.ssplanner import (LimitOffsetMixIn, Step, OneFetchStep,
                                    varmap_test_repr, offset_result)

# Maps the name of an aggregate function found in the original selection to
# the function name AggrStep.execute substitutes for it when it builds the SQL
# query over the temporary table (for instance COUNT becomes SUM). Functions
# missing from this mapping are not rewritten.
AGGR_TRANSFORMS = {'COUNT':'SUM', 'MIN':'MIN', 'MAX':'MAX', 'SUM': 'SUM'}

def remove_clauses(union, keepgroup):
    """detach having / orderby (and groupby unless `keepgroup` is true) from
    every select of `union`, setting them to None on the select nodes

    :param union: node whose `children` are the select nodes to strip
    :param keepgroup: when true, the groupby clause is left untouched
    :return: list with one tuple of removed clauses per select, in children
      order: (having, orderby) if `keepgroup` is true, else
      (groupby, having, orderby). Suitable as input for `restore_clauses`
    """
    if keepgroup:
        names = ('having', 'orderby')
    else:
        names = ('groupby', 'having', 'orderby')
    saved = []
    for select in union.children:
        # read every clause before resetting any of them
        saved.append(tuple(getattr(select, name) for name in names))
        for name in names:
            setattr(select, name, None)
    return saved

def restore_clauses(union, keepgroup, clauses):
    """put back on the selects of `union` the clauses previously detached by
    `remove_clauses`

    :param union: node whose `children` are the select nodes to restore
    :param keepgroup: must be the value given to `remove_clauses`; it tells
      whether each saved tuple is (having, orderby) or
      (groupby, having, orderby)
    :param clauses: saved tuples, indexed like `union.children`
    """
    for index, select in enumerate(union.children):
        saved = clauses[index]
        if not keepgroup:
            groupby, having, orderby = saved
            select.groupby = groupby
        else:
            having, orderby = saved
        select.having = having
        select.orderby = orderby


class FetchStep(OneFetchStep):
    """step consisting in fetching data from sources, and storing result in
    a temporary table
    """
    def __init__(self, plan, union, sources, table, keepgroup, inputmap=None):
        """initialize the step and compute its output variable mapping

        :param plan: execution plan this step belongs to
        :param union: RQL union node to execute
        :param sources: sources on which the union is executed
        :param table: name of the temporary table receiving the results
        :param keepgroup: true if the groupby clause should be kept while
          fetching (having and orderby are always removed)
        :param inputmap: optional variables mapping to use as input
        """
        OneFetchStep.__init__(self, plan, union, sources)
        # temporary table to store step result
        self.table = table
        # should groupby clause be kept or not
        self.keepgroup = keepgroup
        # variables mapping to use as input
        self.inputmap = inputmap
        # output variable mapping
        srqlst = union.children[0] # sample select node
        self.outputmap = plan.init_temp_table(table, srqlst.selection,
                                              srqlst.solutions[0])
        # add additional information to the output mapping: a selected
        # variable is also made reachable under the '<variable>.<relation>'
        # key of each defined variable it is attached to
        for vref in srqlst.selection:
            if not isinstance(vref, VariableRef):
                continue
            var = vref.variable
            if var.stinfo['attrvars']:
                for lhsvar, rtype in var.stinfo['attrvars']:
                    if lhsvar.name in srqlst.defined_vars:
                        key = '%s.%s' % (lhsvar.name, rtype)
                        self.outputmap[key] = self.outputmap[var.name]
            else:
                rschema = self.plan.schema.rschema
                for rel in var.stinfo['rhsrelations']:
                    # only inlined relations get an extra key
                    if rschema(rel.r_type).inlined:
                        lhsvar = rel.children[0]
                        if lhsvar.name in srqlst.defined_vars:
                            key = '%s.%s' % (lhsvar.name, rel.r_type)
                            self.outputmap[key] = self.outputmap[var.name]

    def execute(self):
        """execute this step: run children, create the temporary table, then
        ask each source to insert the union's results into it

        The clauses removed from the union for the time of the fetch are
        restored even if a source fails.
        """
        self.execute_children()
        plan = self.plan
        plan.create_temp_table(self.table)
        union = self.union
        clauses = remove_clauses(union, self.keepgroup)
        try:
            for source in self.sources:
                source.flying_insert(self.table, plan.session, union, plan.args,
                                     self.inputmap)
        finally:
            # the union tree is shared: never leave it stripped of its
            # clauses, whatever happened during insertion
            restore_clauses(union, self.keepgroup, clauses)

    def mytest_repr(self):
        """return a representation of this step suitable for test"""
        clauses = remove_clauses(self.union, self.keepgroup)
        try:
            try:
                # plan.tablesinorder is not always available; when it is
                # missing the raw maps are used instead
                inputmap = varmap_test_repr(self.inputmap,
                                            self.plan.tablesinorder)
                outputmap = varmap_test_repr(self.outputmap,
                                             self.plan.tablesinorder)
            except AttributeError:
                inputmap = self.inputmap
                outputmap = self.outputmap
            return (self.__class__.__name__,
                sorted((r.as_string(kwargs=self.plan.args), r.solutions)
                       for r in self.union.children),
                sorted(self.sources), inputmap, outputmap)
        finally:
            restore_clauses(self.union, self.keepgroup, clauses)


class AggrStep(LimitOffsetMixIn, Step):
    """step consisting in making aggregat from temporary data in the system
    source
    """
    def __init__(self, plan, selection, select, table, outputtable=None):
        """initialize the step

        :param plan: execution plan this step belongs to
        :param selection: original selection (terms to output)
        :param select: original Select RQL tree
        :param table: name of the table holding the temporary results
        :param outputtable: optional name of a table where results should be
          written; when given it is declared on the plan right away
        """
        Step.__init__(self, plan)
        # original selection
        self.selection = selection
        # original Select RQL tree
        self.select = select
        # table where are located temporary results
        self.table = table
        # optional table where to write results
        self.outputtable = outputtable
        if outputtable is not None:
            plan.init_temp_table(outputtable, selection, select.solutions[0])

    def mytest_repr(self):
        """return a representation of this step suitable for test"""
        select = self.select
        sel = select.selection
        restr = select.where
        # temporarily show the original selection without restriction
        select.selection = self.selection
        select.where = None
        try:
            rql = select.as_string(kwargs=self.plan.args)
        finally:
            # always put the tree back in its initial state
            select.selection = sel
            select.where = restr
        try:
            # rely on a monkey patch (cf unittest_querier)
            table = self.plan.tablesinorder[self.table]
            outputtable = self.outputtable and self.plan.tablesinorder[self.outputtable]
        except AttributeError:
            # not monkey patched
            table = self.table
            outputtable = self.outputtable
        return (self.__class__.__name__, rql, self.limit, self.offset, table,
                outputtable)

    def execute(self):
        """execute this step: build a SQL query over the temporary table and
        run it through the plan

        The output map of the last child step is used as input map (and is
        completed in place). When an output table is set, it is created and
        the query is turned into an INSERT INTO that table.

        :return: whatever `plan.sqlexec` returns for the generated query
        """
        self.execute_children()
        self.inputmap = inputmap = self.children[-1].outputmap
        # get the select clause
        clause = []
        for i, term in enumerate(self.selection):
            try:
                var_name = inputmap[term.as_string()]
            except KeyError:
                var_name = 'C%s' % i
            if isinstance(term, Function):
                # we have to translate some aggregat function
                # (for instance COUNT -> SUM)
                try:
                    sqlfunc = AGGR_TRANSFORMS[term.name]
                except KeyError:
                    # no known translation: select the column as is
                    clause.append(var_name)
                else:
                    # backup the node, reduce it to the translated function
                    # applied to the column, then restore it whatever happens
                    orig_name, orig_children = term.name, term.children
                    term.name = sqlfunc
                    term.children = [VariableRef(Variable(var_name))]
                    try:
                        clause.append(term.accept(self))
                    finally:
                        term.name = orig_name
                        term.children = orig_children
            else:
                clause.append(var_name)
                for vref in term.iget_nodes(VariableRef):
                    inputmap[vref.name] = var_name
        # XXX handle distinct with non selected sort term
        if self.select.distinct:
            sql = ['SELECT DISTINCT %s' % ', '.join(clause)]
        else:
            sql = ['SELECT %s' % ', '.join(clause)]
        sql.append("FROM %s" % self.table)
        # get the group/having clauses
        groupby_index = None
        if self.select.groupby:
            clause = [inputmap[var.name] for var in self.select.groupby]
            grouped = set(var.name for var in self.select.groupby)
            sql.append('GROUP BY %s' % ', '.join(clause))
            # remember where the GROUP BY clause is: it may have to be
            # extended below, after a HAVING clause has been appended
            groupby_index = len(sql) - 1
        else:
            grouped = None
        if self.select.having:
            # NOTE(review): several having terms are joined using ', ',
            # confirm this is the SQL expected by the backend
            clause = [term.accept(self) for term in self.select.having]
            sql.append('HAVING %s' % ', '.join(clause))
        # get the orderby clause
        if self.select.orderby:
            clause = []
            for sortterm in self.select.orderby:
                sqlterm = sortterm.term.accept(self)
                if sortterm.asc:
                    clause.append(sqlterm)
                else:
                    clause.append('%s DESC' % sqlterm)
                if grouped is not None:
                    # variables used for sorting are added to the GROUP BY
                    # clause when they are not grouped yet
                    for vref in sortterm.iget_nodes(VariableRef):
                        if vref.name not in grouped:
                            sql[groupby_index] += ', ' + self.inputmap[vref.name]
                            grouped.add(vref.name)
            sql.append('ORDER BY %s' % ', '.join(clause))
        if self.limit:
            sql.append('LIMIT %s' % self.limit)
        if self.offset:
            sql.append('OFFSET %s' % self.offset)
        sql = ' '.join(sql)
        if self.outputtable:
            self.plan.create_temp_table(self.outputtable)
            sql = 'INSERT INTO %s %s' % (self.outputtable, sql)
        return self.plan.sqlexec(sql, self.plan.args)

    def visit_function(self, function):
        """generate SQL name for a function"""
        return '%s(%s)' % (function.name,
                           ','.join(c.accept(self) for c in function.children))

    def visit_variableref(self, variableref):
        """get the sql name for a variable reference"""
        try:
            return self.inputmap[variableref.name]
        except KeyError: # XXX duh? explain
            # not in the input map: fall back to the variable's own name
            return variableref.variable.name

    def visit_constant(self, constant):
        """generate SQL name for a constant"""
        # only integer constants are supported here
        assert constant.type == 'Int'
        return str(constant.value)


class UnionStep(LimitOffsetMixIn, Step):
    """union results of child in-memory steps (e.g. OneFetchStep / AggrStep)"""

    def execute(self):
        """execute this step: run children in order and concatenate their
        results, honoring this step's limit and offset

        An offset may be set without any limit; in that case children are
        executed without limit and only the offset is applied.

        :return: list of result rows
        """
        result = []
        limit = olimit = self.limit
        offset = self.offset
        # offset is expected to be either None or a non null integer
        assert offset != 0
        if offset is not None and limit is not None:
            # rows that will be skipped have to be fetched as well
            limit = limit + offset
        for step in self.children:
            if limit is not None:
                if offset is None:
                    # no pending offset: only ask for the missing rows
                    limit = olimit - len(result)
                step.set_limit_offset(limit, None)
            result_ = step.execute()
            if offset is not None:
                # NOTE(review): offset_result is defined in ssplanner;
                # presumably it drops skipped rows and returns the offset
                # still to apply (None once consumed) -- confirm
                offset, result_ = offset_result(offset, result_)
            result += result_
            if limit is not None:
                if len(result) >= olimit:
                    return result[:olimit]
        return result

    def mytest_repr(self):
        """return a representation of this step suitable for test"""
        return (self.__class__.__name__, self.limit, self.offset)


class IntersectStep(UnionStep):
    """return intersection of results of child in-memory steps (e.g. OneFetchStep / AggrStep)"""

    def execute(self):
        """execute this step: keep only the rows returned by every child,
        then apply this step's offset and limit

        :return: list of result rows (rows go through a set, hence their
          order is not guaranteed)
        """
        result = None
        for step in self.children:
            rows = frozenset(step.execute())
            if result is None:
                # seed with the first child's rows: intersecting with an
                # initially empty set would always give an empty result
                result = set(rows)
            else:
                result &= rows
        # no child at all means no result
        result = list(result or ())
        if self.offset:
            result = result[self.offset:]
        if self.limit:
            result = result[:self.limit]
        return result


class UnionFetchStep(Step):
    """union results of child steps using temporary tables (e.g. FetchStep)"""

    def execute(self):
        """execute this step

        Only the child steps are run; this method itself returns nothing.
        """
        # NOTE(review): presumably children write their rows into temporary
        # tables, which is why no result is gathered here -- confirm
        self.execute_children()


# names exported by `from ... import *`: the step classes defined above
__all__ = ('FetchStep', 'AggrStep', 'UnionStep', 'UnionFetchStep', 'IntersectStep')