goa/rqlinterpreter.py
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Fri, 21 Aug 2009 15:03:00 +0200
branchstable
changeset 2960 1c6eafc68586
parent 1977 606923dff11b
child 3689 deb13e88e037
permissions -rw-r--r--
[db-dump] don't create tarball on failed dump, properly remove temporary directory

"""provide a minimal RQL support for google appengine dbmodel

:organization: Logilab
:copyright: 2008-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
__docformat__ = "restructuredtext en"

from datetime import datetime

from rql import RQLHelper, nodes

from cubicweb import Binary
from cubicweb.rset import ResultSet
from cubicweb.server import SQL_CONNECT_HOOKS

from google.appengine.api.datastore import Key, Get, Query, Entity
from google.appengine.api.datastore_types import Text, Blob
from google.appengine.api.datastore_errors import EntityNotFoundError, BadKeyError


def etype_from_key(key):
    return Key(key).kind()

def poss_var_types(myvar, ovar, kind, solutions):
    return frozenset(etypes[myvar] for etypes in solutions
                     if etypes[ovar] == kind)

def expand_result(results, result, myvar, values, dsget=None):
    values = map(dsget, values)
    if values:
        result[myvar] = values.pop(0)
        for value in values:
            newresult = result.copy()
            newresult[myvar] = value
            results.append(newresult)
    else:
        results.remove(result)

def _resolve(restrictions, solutions, fixed):
    varname = restrictions[0].searched_var
    objs = []
    for etype in frozenset(etypes[varname] for etypes in solutions):
        gqlargs = {}
        query = Query(etype)
        for restriction in restrictions:
            restriction.fill_query(fixed, query)
        pobjs = query.Run()
        if varname in fixed:
            value = fixed[varname]
            objs += (x for x in pobjs if x == value)
        else:
            objs += pobjs
    if varname in fixed and not objs:
        raise EidMismatch(varname, value)
    return objs

def _resolve_not(restrictions, solutions, fixed):
    restr = restrictions[0]
    constrvarname = restr.constraint_var
    if len(restrictions) > 1 or not constrvarname in fixed:
        raise NotImplementedError()
    varname = restr.searched_var
    objs = []
    for etype in frozenset(etypes[varname] for etypes in solutions):
        gqlargs = {}
        for operator in ('<', '>'):
            query = Query(etype)
            restr.fill_query(fixed, query, operator)
            objs += query.Run()
    return objs

def _print_results(rlist):
    return '[%s]' % ', '.join(_print_result(r) for r in rlist)

def _print_result(rdict):
    string = []
    for k, v in rdict.iteritems():
        if isinstance(v, Entity):
            string.append('%s: %s' % (k, v.key()))#_print_result(v)))
        elif isinstance(v, list):
            string.append('%s: [%s]' % (k, ', '.join(str(i) for i in v)))
        else:
            string.append('%s: %s' % (k, v))
    return '{%s}' % ', '.join(string)


class EidMismatch(Exception):
    def __init__(self, varname, value):
        self.varname = varname
        self.value = value


class Restriction(object):
    supported_operators = ('=',)
    def __init__(self, rel):
        operator = rel.children[1].operator
        if not operator in self.supported_operators:
            raise NotImplementedError('unsupported operator')
        self.rel = rel
        self.operator = operator
        self.rtype = rel.r_type
        self.var = rel.children[0]

    def __repr__(self):
        return '<%s for %s>' % (self.__class__.__name__, self.rel)

    @property
    def rhs(self):
        return self.rel.children[1].children[0]


class MultipleRestriction(object):
    def __init__(self, restrictions):
        self.restrictions = restrictions

    def resolve(self, solutions, fixed):
        return _resolve(self.restrictions, solutions, fixed)


class VariableSelection(Restriction):
    def __init__(self, rel, dsget, prefix='s'):
        Restriction.__init__(self, rel)
        self._dsget = dsget
        self._not = self.rel.neged(strict=True)
        self._prefix = prefix + '_'

    def __repr__(self):
        return '<%s%s for %s>' % (self._prefix[0], self.__class__.__name__, self.rel)

    @property
    def searched_var(self):
        if self._prefix == 's_':
            return self.var.name
        return self.rhs.name

    @property
    def constraint_var(self):
        if self._prefix == 's_':
            return self.rhs.name
        return self.var.name

    def _possible_values(self, myvar, ovar, entity, solutions, dsprefix):
        if self.rtype == 'identity':
            return (entity.key(),)
        value = entity.get(dsprefix + self.rtype)
        if value is None:
            return []
        if not isinstance(value, list):
            value = [value]
        vartypes = poss_var_types(myvar, ovar, entity.kind(), solutions)
        return (v for v in value if v.kind() in vartypes)

    def complete_and_filter(self, solutions, results):
        myvar = self.rhs.name
        ovar = self.var.name
        rtype = self.rtype
        if self.schema.rschema(rtype).is_final():
            # should be detected by rql.stcheck: "Any C WHERE NOT X attr C" doesn't make sense
            #if self._not:
            #    raise NotImplementedError()
            for result in results:
                result[myvar] = result[ovar].get('s_'+rtype)
        elif self.var.name in results[0]:
            if self.rhs.name in results[0]:
                self.filter(solutions, results)
            else:
                if self._not:
                    raise NotImplementedError()
                for result in results[:]:
                    values = self._possible_values(myvar, ovar, result[ovar],
                                                   solutions, 's_')
                    expand_result(results, result, myvar, values, self._dsget)
        else:
            assert self.rhs.name in results[0]
            self.object_complete_and_filter(solutions, results)

    def filter(self, solutions, results):
        myvar = self.rhs.name
        ovar = self.var.name
        newsols = {}
        for result in results[:]:
            entity = result[ovar]
            key = entity.key()
            if not key in newsols:
                values = self._possible_values(myvar, ovar, entity, solutions, 's_')
                newsols[key] = frozenset(v for v in values)
            if self._not:
                if result[myvar].key() in newsols[key]:
                    results.remove(result)
            elif not result[myvar].key() in newsols[key]:
                results.remove(result)

    def object_complete_and_filter(self, solutions, results):
        if self._not:
            raise NotImplementedError()
        myvar = self.var.name
        ovar = self.rhs.name
        for result in results[:]:
            values = self._possible_values(myvar, ovar, result[ovar],
                                           solutions, 'o_')
            expand_result(results, result, myvar, values, self._dsget)


class EidRestriction(Restriction):
    def __init__(self, rel, dsget):
        Restriction.__init__(self, rel)
        self._dsget = dsget

    def resolve(self, kwargs):
        value = self.rel.children[1].children[0].eval(kwargs)
        return self._dsget(value)


class RelationRestriction(VariableSelection):

    def _get_value(self, fixed):
        return fixed[self.constraint_var].key()

    def fill_query(self, fixed, query, operator=None):
        restr = '%s%s %s' % (self._prefix, self.rtype, operator or self.operator)
        query[restr] = self._get_value(fixed)

    def resolve(self, solutions, fixed):
        if self.rtype == 'identity':
            if self._not:
                raise NotImplementedError()
            return [fixed[self.constraint_var]]
        if self._not:
            return _resolve_not([self], solutions, fixed)
        return _resolve([self], solutions, fixed)


class NotRelationRestriction(RelationRestriction):

    def _get_value(self, fixed):
        return None

    def resolve(self, solutions, fixed):
        if self.rtype == 'identity':
            raise NotImplementedError()
        return _resolve([self], solutions, fixed)


class AttributeRestriction(RelationRestriction):
    supported_operators = ('=', '>', '>=', '<', '<=', 'ILIKE')
    def __init__(self, rel, kwargs):
        RelationRestriction.__init__(self, rel, None)
        value = self.rhs.eval(kwargs)
        self.value = value
        if self.operator == 'ILIKE':
            if value.startswith('%'):
                raise NotImplementedError('LIKE is only supported for prefix search')
            if not value.endswith('%'):
                raise NotImplementedError('LIKE is only supported for prefix search')
            self.operator = '>'
            self.value = value[:-1]

    def complete_and_filter(self, solutions, results):
        # check lhs var first in case this is a restriction
        assert self._not
        myvar, rtype, value = self.var.name, self.rtype, self.value
        for result in results[:]:
            if result[myvar].get('s_'+rtype) == value:
                results.remove(result)

    def _get_value(self, fixed):
        return self.value


class DateAttributeRestriction(AttributeRestriction):
    """just a thin layer on top af `AttributeRestriction` that
    tries to convert date strings such as in :
    Any X WHERE X creation_date >= '2008-03-04'
    """
    def __init__(self, rel, kwargs):
        super(DateAttributeRestriction, self).__init__(rel, kwargs)
        if isinstance(self.value, basestring):
#             try:
            self.value = datetime.strptime(self.value, '%Y-%m-%d')
#             except Exception, exc:
#                 from logging import error
#                 error('unable to parse date %s with format %%Y-%%m-%%d (exc=%s)', value, exc)


class AttributeInRestriction(AttributeRestriction):
    def __init__(self, rel, kwargs):
        RelationRestriction.__init__(self, rel, None)
        values = []
        for c in self.rel.children[1].iget_nodes(nodes.Constant):
            values.append(c.eval(kwargs))
        self.value = values

    @property
    def operator(self):
        return 'in'


class TypeRestriction(AttributeRestriction):
    def __init__(self, var):
        self.var = var

    def __repr__(self):
        return '<%s for %s>' % (self.__class__.__name__, self.var)

    def resolve(self, solutions, fixed):
        objs = []
        for etype in frozenset(etypes[self.var.name] for etypes in solutions):
            objs += Query(etype).Run()
        return objs


def append_result(res, descr, i, j, value, etype):
    if value is not None:
        if isinstance(value, Text):
            value = unicode(value)
        elif isinstance(value, Blob):
            value = Binary(str(value))
    if j == 0:
        res.append([value])
        descr.append([etype])
    else:
        res[i].append(value)
        descr[i].append(etype)


class ValueResolver(object):
    def __init__(self, functions, args, term):
        self.functions = functions
        self.args = args
        self.term = term
        self._solution = self.term.stmt.solutions[0]

    def compute(self, result):
        """return (entity type, value) to which self.term is evaluated according
        to the given result dictionnary and to query arguments (self.args)
        """
        return self.term.accept(self, result)

    def visit_function(self, node, result):
        args = tuple(n.accept(self, result)[1] for n in node.children)
        value = self.functions[node.name](*args)
        return node.get_type(self._solution, self.args), value

    def visit_variableref(self, node, result):
        value = result[node.name]
        try:
            etype = value.kind()
            value = str(value.key())
        except AttributeError:
            etype = self._solution[node.name]
        return etype, value

    def visit_constant(self, node, result):
        return node.get_type(kwargs=self.args), node.eval(self.args)


class RQLInterpreter(object):
    """algorithm:
    1. visit the restriction clauses and collect restriction for each subject
       of a relation. Different restriction types are:
       * EidRestriction
       * AttributeRestriction
       * RelationRestriction
       * VariableSelection (not really a restriction)
       -> dictionary {<variable>: [restriction...], ...}
    2. resolve eid restrictions
    3. for each select in union:
           for each solution in select'solutions:
               1. resolve variables which have attribute restriction
               2. resolve relation restriction
               3. resolve selection and add to global results
    """
    def __init__(self, schema):
        self.schema = schema
        Restriction.schema = schema # yalta!
        self.rqlhelper = RQLHelper(schema, {'eid': etype_from_key})
        self._stored_proc = {'LOWER': lambda x: x.lower(),
                             'UPPER': lambda x: x.upper()}
        for cb in SQL_CONNECT_HOOKS.get('sqlite', []):
            cb(self)

    # emulate sqlite connection interface so we can reuse stored procedures
    def create_function(self, name, nbargs, func):
        self._stored_proc[name] = func

    def create_aggregate(self, name, nbargs, func):
        self._stored_proc[name] = func


    def execute(self, operation, parameters=None, eid_key=None, build_descr=True):
        rqlst = self.rqlhelper.parse(operation, annotate=True)
        try:
            self.rqlhelper.compute_solutions(rqlst, kwargs=parameters)
        except BadKeyError:
            results, description = [], []
        else:
            results, description = self.interpret(rqlst, parameters)
        return ResultSet(results, operation, parameters, description, rqlst=rqlst)

    def interpret(self, node, kwargs, dsget=None):
        if dsget is None:
            self._dsget = Get
        else:
            self._dsget = dsget
        try:
            return node.accept(self, kwargs)
        except NotImplementedError:
            self.critical('support for query not implemented: %s', node)
            raise

    def visit_union(self, node, kwargs):
        results, description = [], []
        extra = {'kwargs': kwargs}
        for child in node.children:
            pres, pdescr = self.visit_select(child, extra)
            results += pres
            description += pdescr
        return results, description

    def visit_select(self, node, extra):
        constraints = {}
        if node.where is not None:
            node.where.accept(self, constraints, extra)
        fixed, toresolve, postresolve, postfilters = {}, {}, {}, []
        # extract NOT filters
        for vname, restrictions in constraints.items():
            for restr in restrictions[:]:
                if isinstance(restr, AttributeRestriction) and restr._not:
                    postfilters.append(restr)
                    restrictions.remove(restr)
                    if not restrictions:
                        del constraints[vname]
        # add TypeRestriction for variable which have no restrictions at all
        for varname, var in node.defined_vars.iteritems():
            if not varname in constraints:
                constraints[varname] = [TypeRestriction(var)]
        #print node, constraints
        # compute eid restrictions
        kwargs = extra['kwargs']
        for varname, restrictions in constraints.iteritems():
            for restr in restrictions[:]:
                if isinstance(restr, EidRestriction):
                    assert not varname in fixed
                    try:
                        value = restr.resolve(kwargs)
                        fixed[varname] = value
                    except EntityNotFoundError:
                        return [], []
                    restrictions.remove(restr)
        #print 'fixed', fixed.keys()
        # combine remaining restrictions
        for varname, restrictions in constraints.iteritems():
            for restr in restrictions:
                if isinstance(restr, AttributeRestriction):
                    toresolve.setdefault(varname, []).append(restr)
                elif isinstance(restr, NotRelationRestriction) or (
                    isinstance(restr, RelationRestriction) and
                    not restr.searched_var in fixed and restr.constraint_var in fixed):
                    toresolve.setdefault(varname, []).append(restr)
                else:
                    postresolve.setdefault(varname, []).append(restr)
            try:
                if len(toresolve[varname]) > 1:
                    toresolve[varname] = MultipleRestriction(toresolve[varname])
                else:
                    toresolve[varname] = toresolve[varname][0]
            except KeyError:
                pass
        #print 'toresolve %s' % toresolve
        #print 'postresolve %s' % postresolve
        # resolve additional restrictions
        if fixed:
            partres = [fixed.copy()]
        else:
            partres = []
        for varname, restr in toresolve.iteritems():
            varpartres = partres[:]
            try:
                values = tuple(restr.resolve(node.solutions, fixed))
            except EidMismatch, ex:
                varname = ex.varname
                value = ex.value
                partres = [res for res in partres if res[varname] != value]
                if partres:
                    continue
                # some join failed, no possible results
                return [], []
            if not values:
                # some join failed, no possible results
                return [], []
            if not varpartres:
                # init results
                for value in values:
                    partres.append({varname: value})
            elif not varname in partres[0]:
                # cartesian product
                for res in partres:
                    res[varname] = values[0]
                for res in partres[:]:
                    for value in values[1:]:
                        res = res.copy()
                        res[varname] = value
                        partres.append(res)
            else:
                # union
                for res in varpartres:
                    for value in values:
                        res = res.copy()
                        res[varname] = value
                        partres.append(res)
        #print 'partres', len(partres)
        #print partres
        # Note: don't check for empty partres since constant selection may still
        # produce result at this point
        # sort to get RelationRestriction before AttributeSelection
        restrictions = sorted((restr for restrictions in postresolve.itervalues()
                               for restr in restrictions),
                              key=lambda x: not isinstance(x, RelationRestriction))
        # compute stuff not doable in the previous step using datastore queries
        for restr in restrictions + postfilters:
            restr.complete_and_filter(node.solutions, partres)
            if not partres:
                # some join failed, no possible results
                return [], []
        if extra.pop('has_exists', False):
            # remove potential duplicates introduced by exists
            toremovevars = [v.name for v in node.defined_vars.itervalues()
                            if not v.scope is node]
            if toremovevars:
                newpartres = []
                for result in partres:
                    for var in toremovevars:
                        del result[var]
                    if not result in newpartres:
                        newpartres.append(result)
                if not newpartres:
                    # some join failed, no possible results
                    return [], []
                partres = newpartres
        if node.orderby:
            for sortterm in reversed(node.orderby):
                resolver = ValueResolver(self._stored_proc, kwargs, sortterm.term)
                partres.sort(reverse=not sortterm.asc,
                             key=lambda x: resolver.compute(x)[1])
        if partres:
            if node.offset:
                partres = partres[node.offset:]
            if node.limit:
                partres = partres[:node.limit]
            if not partres:
                return [], []
        #print 'completed partres', _print_results(partres)
        # compute results
        res, descr = [], []
        for j, term in enumerate(node.selection):
            resolver = ValueResolver(self._stored_proc, kwargs, term)
            if not partres:
                etype, value = resolver.compute({})
                # only constant selected
                if not res:
                    res.append([])
                    descr.append([])
                    res[0].append(value)
                    descr[0].append(etype)
            else:
                for i, sol in enumerate(partres):
                    etype, value = resolver.compute(sol)
                    append_result(res, descr, i, j, value, etype)
        #print '--------->', res
        return res, descr

    def visit_and(self, node, constraints, extra):
        for child in node.children:
            child.accept(self, constraints, extra)
    def visit_exists(self, node, constraints, extra):
        extra['has_exists'] = True
        self.visit_and(node, constraints, extra)

    def visit_not(self, node, constraints, extra):
        for child in node.children:
            child.accept(self, constraints, extra)
        try:
            extra.pop(node)
        except KeyError:
            raise NotImplementedError()

    def visit_relation(self, node, constraints, extra):
        if node.is_types_restriction():
            return
        rschema = self.schema.rschema(node.r_type)
        neged = node.neged(strict=True)
        if neged:
            # ok, we *may* process this Not node (not implemented error will be
            # raised later if we can't)
            extra[node.parent] = True
        if rschema.is_final():
            self._visit_final_relation(rschema, node, constraints, extra)
        elif neged:
            self._visit_non_final_neged_relation(rschema, node, constraints)
        else:
            self._visit_non_final_relation(rschema, node, constraints)

    def _visit_non_final_relation(self, rschema, node, constraints, not_=False):
        lhs, rhs = node.get_variable_parts()
        for v1, v2, prefix in ((lhs, rhs, 's'), (rhs, lhs, 'o')):
            #if not_:
            nbrels = len(v2.variable.stinfo['relations'])
            #else:
            #    nbrels = len(v2.variable.stinfo['relations']) - len(v2.variable.stinfo['uidrels'])
            if nbrels > 1:
                constraints.setdefault(v1.name, []).append(
                    RelationRestriction(node, self._dsget, prefix))
                # just init an empty list for v2 variable to avoid a
                # TypeRestriction being added for it
                constraints.setdefault(v2.name, [])
                break
        else:
            constraints.setdefault(rhs.name, []).append(
                VariableSelection(node, self._dsget, 's'))

    def _visit_non_final_neged_relation(self, rschema, node, constraints):
        lhs, rhs = node.get_variable_parts()
        for v1, v2, prefix in ((lhs, rhs, 's'), (rhs, lhs, 'o')):
            stinfo = v2.variable.stinfo
            if not stinfo['selected'] and len(stinfo['relations']) == 1:
                constraints.setdefault(v1.name, []).append(
                    NotRelationRestriction(node, self._dsget, prefix))
                constraints.setdefault(v2.name, [])
                break
        else:
            self._visit_non_final_relation(rschema, node, constraints, True)

    def _visit_final_relation(self, rschema, node, constraints, extra):
        varname = node.children[0].name
        if rschema.type == 'eid':
            constraints.setdefault(varname, []).append(
                EidRestriction(node, self._dsget))
        else:
            rhs = node.children[1].children[0]
            if isinstance(rhs, nodes.VariableRef):
                constraints.setdefault(rhs.name, []).append(
                    VariableSelection(node, self._dsget))
            elif isinstance(rhs, nodes.Constant):
                if rschema.objects()[0] in ('Datetime', 'Date'): # XXX
                    constraints.setdefault(varname, []).append(
                        DateAttributeRestriction(node, extra['kwargs']))
                else:
                    constraints.setdefault(varname, []).append(
                        AttributeRestriction(node, extra['kwargs']))
            elif isinstance(rhs, nodes.Function) and rhs.name == 'IN':
                constraints.setdefault(varname, []).append(
                    AttributeInRestriction(node, extra['kwargs']))
            else:
                raise NotImplementedError()

    def _not_implemented(self, *args, **kwargs):
        raise NotImplementedError()

    visit_or = _not_implemented
    # shouldn't occurs
    visit_set = _not_implemented
    visit_insert = _not_implemented
    visit_delete = _not_implemented


from logging import getLogger
from cubicweb import set_log_methods
set_log_methods(RQLInterpreter, getLogger('cubicweb.goa.rqlinterpreter'))
set_log_methods(Restriction, getLogger('cubicweb.goa.rqlinterpreter'))