req.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
--- a/req.py	Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,508 +0,0 @@
-# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
-"""Base class for request/session"""
-
-__docformat__ = "restructuredtext en"
-
-from warnings import warn
-from datetime import time, datetime, timedelta
-
-from six import PY2, PY3, text_type
-from six.moves.urllib.parse import parse_qs, parse_qsl, quote as urlquote, unquote as urlunquote, urlsplit, urlunsplit
-
-from logilab.common.decorators import cached
-from logilab.common.deprecation import deprecated
-from logilab.common.date import ustrftime, strptime, todate, todatetime
-
-from rql.utils import rqlvar_maker
-
-from cubicweb import (Unauthorized, NoSelectableObject, NoResultError,
-                      MultipleResultsError, uilib)
-from cubicweb.rset import ResultSet
-
-ONESECOND = timedelta(0, 1, 0)
-CACHE_REGISTRY = {}
-
-class FindEntityError(Exception):
-    """raised when find_one_entity() can not return one and only one entity"""
-
-class Cache(dict):
-    def __init__(self):
-        super(Cache, self).__init__()
-        _now = datetime.now()
-        self.cache_creation_date = _now
-        self.latest_cache_lookup = _now
-
-
-class RequestSessionBase(object):
-    """base class containing stuff shared by server session and web request
-
-    request/session is the main resources accessor, mainly through it's vreg
-    attribute:
-
-    :attribute vreg: the instance's registry
-    :attribute vreg.schema: the instance's schema
-    :attribute vreg.config: the instance's configuration
-    """
-    is_request = True # False for repository session
-
-    def __init__(self, vreg):
-        self.vreg = vreg
-        try:
-            encoding = vreg.property_value('ui.encoding')
-        except Exception: # no vreg or property not registered
-            encoding = 'utf-8'
-        self.encoding = encoding
-        # cache result of execution for (rql expr / eids),
-        # should be emptied on commit/rollback of the server session / web
-        # connection
-        self.user = None
-        self.local_perm_cache = {}
-        self._ = text_type
-
-    def _set_user(self, orig_user):
-        """set the user for this req_session_base
-
-        A special method is needed to ensure the linked user is linked to the
-        connection too.
-        """
-        rset = self.eid_rset(orig_user.eid, 'CWUser')
-        user_cls = self.vreg['etypes'].etype_class('CWUser')
-        user = user_cls(self, rset, row=0, groups=orig_user.groups,
-                        properties=orig_user.properties)
-        user.cw_attr_cache['login'] = orig_user.login # cache login
-        self.user = user
-        self.set_entity_cache(user)
-        self.set_language(user.prefered_language())
-
-
-    def set_language(self, lang):
-        """install i18n configuration for `lang` translation.
-
-        Raises :exc:`KeyError` if translation doesn't exist.
-        """
-        self.lang = lang
-        gettext, pgettext = self.vreg.config.translations[lang]
-        # use _cw.__ to translate a message without registering it to the catalog
-        self._ = self.__ = gettext
-        self.pgettext = pgettext
-
-    def get_option_value(self, option):
-        raise NotImplementedError
-
-    def property_value(self, key):
-        """return value of the property with the given key, giving priority to
-        user specific value if any, else using site value
-        """
-        if self.user:
-            val = self.user.property_value(key)
-            if val is not None:
-                return val
-        return self.vreg.property_value(key)
-
-    def etype_rset(self, etype, size=1):
-        """return a fake result set for a particular entity type"""
-        rset = ResultSet([('A',)]*size, '%s X' % etype,
-                         description=[(etype,)]*size)
-        def get_entity(row, col=0, etype=etype, req=self, rset=rset):
-            return req.vreg['etypes'].etype_class(etype)(req, rset, row, col)
-        rset.get_entity = get_entity
-        rset.req = self
-        return rset
-
-    def eid_rset(self, eid, etype=None):
-        """return a result set for the given eid without doing actual query
-        (we have the eid, we can suppose it exists and user has access to the
-        entity)
-        """
-        eid = int(eid)
-        if etype is None:
-            etype = self.entity_metas(eid)['type']
-        rset = ResultSet([(eid,)], 'Any X WHERE X eid %(x)s', {'x': eid},
-                         [(etype,)])
-        rset.req = self
-        return rset
-
-    def empty_rset(self):
-        """ return a guaranteed empty result """
-        rset = ResultSet([], 'Any X WHERE X eid -1')
-        rset.req = self
-        return rset
-
-    def entity_from_eid(self, eid, etype=None):
-        """return an entity instance for the given eid. No query is done"""
-        try:
-            return self.entity_cache(eid)
-        except KeyError:
-            rset = self.eid_rset(eid, etype)
-            entity = rset.get_entity(0, 0)
-            self.set_entity_cache(entity)
-            return entity
-
-    def entity_cache(self, eid):
-        raise KeyError
-
-    def set_entity_cache(self, entity):
-        pass
-
-    def create_entity(self, etype, **kwargs):
-        """add a new entity of the given type
-
-        Example (in a shell session):
-
-        >>> c = create_entity('Company', name=u'Logilab')
-        >>> create_entity('Person', firstname=u'John', surname=u'Doe',
-        ...               works_for=c)
-
-        """
-        cls = self.vreg['etypes'].etype_class(etype)
-        return cls.cw_instantiate(self.execute, **kwargs)
-
-    @deprecated('[3.18] use find(etype, **kwargs).entities()')
-    def find_entities(self, etype, **kwargs):
-        """find entities of the given type and attribute values.
-
-        >>> users = find_entities('CWGroup', name=u'users')
-        >>> groups = find_entities('CWGroup')
-        """
-        return self.find(etype, **kwargs).entities()
-
-    @deprecated('[3.18] use find(etype, **kwargs).one()')
-    def find_one_entity(self, etype, **kwargs):
-        """find one entity of the given type and attribute values.
-        raise :exc:`FindEntityError` if can not return one and only one entity.
-
-        >>> users = find_one_entity('CWGroup', name=u'users')
-        >>> groups = find_one_entity('CWGroup')
-        Exception()
-        """
-        try:
-            return self.find(etype, **kwargs).one()
-        except (NoResultError, MultipleResultsError) as e:
-            raise FindEntityError("%s: (%s, %s)" % (str(e), etype, kwargs))
-
-    def find(self, etype, **kwargs):
-        """find entities of the given type and attribute values.
-
-        :returns: A :class:`ResultSet`
-
-        >>> users = find('CWGroup', name=u"users").one()
-        >>> groups = find('CWGroup').entities()
-        """
-        parts = ['Any X WHERE X is %s' % etype]
-        varmaker = rqlvar_maker(defined='X')
-        eschema = self.vreg.schema.eschema(etype)
-        for attr, value in kwargs.items():
-            if isinstance(value, list) or isinstance(value, tuple):
-                raise NotImplementedError("List of values are not supported")
-            if hasattr(value, 'eid'):
-                kwargs[attr] = value.eid
-            if attr.startswith('reverse_'):
-                attr = attr[8:]
-                assert attr in eschema.objrels, \
-                    '%s not in %s object relations' % (attr, eschema)
-                parts.append(
-                    '%(varname)s %(attr)s X, '
-                    '%(varname)s eid %%(reverse_%(attr)s)s'
-                    % {'attr': attr, 'varname': next(varmaker)})
-            else:
-                assert attr in eschema.subjrels, \
-                    '%s not in %s subject relations' % (attr, eschema)
-                parts.append('X %(attr)s %%(%(attr)s)s' % {'attr': attr})
-
-        rql = ', '.join(parts)
-
-        return self.execute(rql, kwargs)
-
-    def ensure_ro_rql(self, rql):
-        """raise an exception if the given rql is not a select query"""
-        first = rql.split(None, 1)[0].lower()
-        if first in ('insert', 'set', 'delete'):
-            raise Unauthorized(self._('only select queries are authorized'))
-
-    def get_cache(self, cachename):
-        """cachename should be dotted names as in :
-
-        - cubicweb.mycache
-        - cubes.blog.mycache
-        - etc.
-        """
-        warn.warning('[3.19] .get_cache will disappear soon. '
-                     'Distributed caching mechanisms are being introduced instead.'
-                     'Other caching mechanism can be used more reliably '
-                     'to the same effect.',
-                     DeprecationWarning)
-        if cachename in CACHE_REGISTRY:
-            cache = CACHE_REGISTRY[cachename]
-        else:
-            cache = CACHE_REGISTRY[cachename] = Cache()
-        _now = datetime.now()
-        if _now > cache.latest_cache_lookup + ONESECOND:
-            ecache = self.execute(
-                'Any C,T WHERE C is CWCache, C name %(name)s, C timestamp T',
-                {'name':cachename}).get_entity(0,0)
-            cache.latest_cache_lookup = _now
-            if not ecache.valid(cache.cache_creation_date):
-                cache.clear()
-                cache.cache_creation_date = _now
-        return cache
-
-    # url generation methods ##################################################
-
-    def build_url(self, *args, **kwargs):
-        """return an absolute URL using params dictionary key/values as URL
-        parameters. Values are automatically URL quoted, and the
-        publishing method to use may be specified or will be guessed.
-
-        if ``__secure__`` argument is True, the request will try to build a
-        https url.
-
-        raises :exc:`ValueError` if None is found in arguments
-        """
-        # use *args since we don't want first argument to be "anonymous" to
-        # avoid potential clash with kwargs
-        method = None
-        if args:
-            assert len(args) == 1, 'only 0 or 1 non-named-argument expected'
-            method = args[0]
-        if method is None:
-            method = 'view'
-        # XXX I (adim) think that if method is passed explicitly, we should
-        #     not try to process it and directly call req.build_url()
-        base_url = kwargs.pop('base_url', None)
-        if base_url is None:
-            secure = kwargs.pop('__secure__', None)
-            base_url = self.base_url(secure=secure)
-        if '_restpath' in kwargs:
-            assert method == 'view', repr(method)
-            path = kwargs.pop('_restpath')
-        else:
-            path = method
-        if not kwargs:
-            return u'%s%s' % (base_url, path)
-        return u'%s%s?%s' % (base_url, path, self.build_url_params(**kwargs))
-
-    def build_url_params(self, **kwargs):
-        """return encoded params to incorporate them in a URL"""
-        args = []
-        for param, values in kwargs.items():
-            if not isinstance(values, (list, tuple)):
-                values = (values,)
-            for value in values:
-                assert value is not None
-                args.append(u'%s=%s' % (param, self.url_quote(value)))
-        return '&'.join(args)
-
-    def url_quote(self, value, safe=''):
-        """urllib.quote is not unicode safe, use this method to do the
-        necessary encoding / decoding. Also it's designed to quote each
-        part of a url path and so the '/' character will be encoded as well.
-        """
-        if PY2 and isinstance(value, unicode):
-            quoted = urlquote(value.encode(self.encoding), safe=safe)
-            return unicode(quoted, self.encoding)
-        return urlquote(str(value), safe=safe)
-
-    def url_unquote(self, quoted):
-        """returns a unicode unquoted string
-
-        decoding is based on `self.encoding` which is the encoding
-        used in `url_quote`
-        """
-        if PY3:
-            return urlunquote(quoted)
-        if isinstance(quoted, unicode):
-            quoted = quoted.encode(self.encoding)
-        try:
-            return unicode(urlunquote(quoted), self.encoding)
-        except UnicodeDecodeError: # might occurs on manually typed URLs
-            return unicode(urlunquote(quoted), 'iso-8859-1')
-
-    def url_parse_qsl(self, querystring):
-        """return a list of (key, val) found in the url quoted query string"""
-        if PY3:
-            for key, val in parse_qsl(querystring):
-                yield key, val
-            return
-        if isinstance(querystring, unicode):
-            querystring = querystring.encode(self.encoding)
-        for key, val in parse_qsl(querystring):
-            try:
-                yield unicode(key, self.encoding), unicode(val, self.encoding)
-            except UnicodeDecodeError: # might occurs on manually typed URLs
-                yield unicode(key, 'iso-8859-1'), unicode(val, 'iso-8859-1')
-
-
-    def rebuild_url(self, url, **newparams):
-        """return the given url with newparams inserted. If any new params
-        is already specified in the url, it's overriden by the new value
-
-        newparams may only be mono-valued.
-        """
-        if PY2 and isinstance(url, unicode):
-            url = url.encode(self.encoding)
-        schema, netloc, path, query, fragment = urlsplit(url)
-        query = parse_qs(query)
-        # sort for testing predictability
-        for key, val in sorted(newparams.items()):
-            query[key] = (self.url_quote(val),)
-        query = '&'.join(u'%s=%s' % (param, value)
-                         for param, values in sorted(query.items())
-                         for value in values)
-        return urlunsplit((schema, netloc, path, query, fragment))
-
-    # bound user related methods ###############################################
-
-    @cached
-    def user_data(self):
-        """returns a dictionary with this user's information.
-
-        The keys are :
-
-        login
-            The user login
-
-        name
-            The user name, returned by user.name()
-
-        email
-            The user principal email
-
-        """
-        userinfo = {}
-        user = self.user
-        userinfo['login'] = user.login
-        userinfo['name'] = user.name()
-        userinfo['email'] = user.cw_adapt_to('IEmailable').get_email()
-        return userinfo
-
-    # formating methods #######################################################
-
-    def view(self, __vid, rset=None, __fallback_oid=None, __registry='views',
-             initargs=None, w=None, **kwargs):
-        """Select object with the given id (`__oid`) then render it.  If the
-        object isn't selectable, try to select fallback object if
-        `__fallback_oid` is specified.
-
-        If specified `initargs` is expected to be a dictionary containing
-        arguments that should be given to selection (hence to object's __init__
-        as well), but not to render(). Other arbitrary keyword arguments will be
-        given to selection *and* to render(), and so should be handled by
-        object's call or cell_call method..
-        """
-        if initargs is None:
-            initargs = kwargs
-        else:
-            initargs.update(kwargs)
-        try:
-            view =  self.vreg[__registry].select(__vid, self, rset=rset, **initargs)
-        except NoSelectableObject:
-            if __fallback_oid is None:
-                raise
-            view =  self.vreg[__registry].select(__fallback_oid, self,
-                                                 rset=rset, **initargs)
-        return view.render(w=w, **kwargs)
-
-    def printable_value(self, attrtype, value, props=None, displaytime=True,
-                        formatters=uilib.PRINTERS):
-        """return a displayablye value (i.e. unicode string)"""
-        if value is None:
-            return u''
-        try:
-            as_string = formatters[attrtype]
-        except KeyError:
-            self.error('given bad attrtype %s', attrtype)
-            return unicode(value)
-        return as_string(value, self, props, displaytime)
-
-    def format_date(self, date, date_format=None, time=False):
-        """return a string for a date time according to instance's
-        configuration
-        """
-        if date is not None:
-            if date_format is None:
-                if time:
-                    date_format = self.property_value('ui.datetime-format')
-                else:
-                    date_format = self.property_value('ui.date-format')
-            return ustrftime(date, date_format)
-        return u''
-
-    def format_time(self, time):
-        """return a string for a time according to instance's
-        configuration
-        """
-        if time is not None:
-            return ustrftime(time, self.property_value('ui.time-format'))
-        return u''
-
-    def format_float(self, num):
-        """return a string for floating point number according to instance's
-        configuration
-        """
-        if num is not None:
-            return self.property_value('ui.float-format') % num
-        return u''
-
-    def parse_datetime(self, value, etype='Datetime'):
-        """get a datetime or time from a string (according to etype)
-        Datetime formatted as Date are accepted
-        """
-        assert etype in ('Datetime', 'Date', 'Time'), etype
-        # XXX raise proper validation error
-        if etype == 'Datetime':
-            format = self.property_value('ui.datetime-format')
-            try:
-                return todatetime(strptime(value, format))
-            except ValueError:
-                pass
-        elif etype == 'Time':
-            format = self.property_value('ui.time-format')
-            try:
-                # (adim) I can't find a way to parse a Time with a custom format
-                date = strptime(value, format) # this returns a DateTime
-                return time(date.hour, date.minute, date.second)
-            except ValueError:
-                raise ValueError(self._('can\'t parse %(value)r (expected %(format)s)')
-                                 % {'value': value, 'format': format})
-        try:
-            format = self.property_value('ui.date-format')
-            dt = strptime(value, format)
-            if etype == 'Datetime':
-                return todatetime(dt)
-            return todate(dt)
-        except ValueError:
-            raise ValueError(self._('can\'t parse %(value)r (expected %(format)s)')
-                             % {'value': value, 'format': format})
-
-    def _base_url(self, secure=None):
-        if secure:
-            return self.vreg.config.get('https-url') or self.vreg.config['base-url']
-        return self.vreg.config['base-url']
-
-    def base_url(self, secure=None):
-        """return the root url of the instance
-        """
-        url = self._base_url(secure=secure)
-        return url if url is None else url.rstrip('/') + '/'
-
-    # abstract methods to override according to the web front-end #############
-
-    def describe(self, eid, asdict=False):
-        """return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
-        raise NotImplementedError