diff -r 058bb3dc685f -r 0b59724cb3f2 web/request.py --- a/web/request.py Mon Jan 04 18:40:30 2016 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1138 +0,0 @@ -# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . -"""abstract class for http request""" - -__docformat__ = "restructuredtext en" - -import time -import random -import base64 -from hashlib import sha1 # pylint: disable=E0611 -from calendar import timegm -from datetime import date, datetime -from warnings import warn -from io import BytesIO - -from six import PY2, binary_type, text_type, string_types -from six.moves import http_client -from six.moves.urllib.parse import urlsplit, quote as urlquote -from six.moves.http_cookies import SimpleCookie - -from rql.utils import rqlvar_maker - -from logilab.common.decorators import cached -from logilab.common.deprecation import deprecated -from logilab.mtconverter import xml_escape - -from cubicweb import AuthenticationError -from cubicweb.req import RequestSessionBase -from cubicweb.uilib import remove_html_tags, js -from cubicweb.utils import HTMLHead, make_uid -from cubicweb.view import TRANSITIONAL_DOCTYPE_NOEXT -from cubicweb.web import (INTERNAL_FIELD_VALUE, LOGGER, NothingToEdit, - RequestError, StatusResponse) -from cubicweb.web.httpcache import get_validators -from cubicweb.web.http_headers import Headers, Cookie, parseDateTime - -_MARKER = object() - -def build_cb_uid(seed): - sha = sha1(('%s%s%s' % (time.time(), seed, random.random())).encode('ascii')) - return 'cb_%s' % (sha.hexdigest()) - - -def list_form_param(form, param, pop=False): - """get param from form parameters and return its value as a list, - skipping internal markers if any - - * if the parameter isn't defined, return an empty list - * if the parameter is a single (unicode) value, return a list - containing that value - * if the parameter is already a list or tuple, just skip internal - markers - - if pop is True, the parameter is removed from the form dictionary - """ - if pop: - try: - value = form.pop(param) - except KeyError: - return [] - else: - value = form.get(param, ()) - if value is None: - value = () - elif not isinstance(value, (list, tuple)): - value = [value] - return [v for v in value if v != INTERNAL_FIELD_VALUE] - - -class Counter(object): - """A picklable counter object, usable for e.g. page tab index count""" - __slots__ = ('value',) - - def __init__(self, initialvalue=0): - self.value = initialvalue - - def __call__(self): - value = self.value - self.value += 1 - return value - - def __getstate__(self): - return {'value': self.value} - - def __setstate__(self, state): - self.value = state['value'] - - -class _CubicWebRequestBase(RequestSessionBase): - """abstract HTTP request, should be extended according to the HTTP backend - Immutable attributes that describe the received query and generic configuration - """ - ajax_request = False # to be set to True by ajax controllers - - def __init__(self, vreg, https=False, form=None, headers=None): - """ - :vreg: Vregistry, - :https: boolean, s this a https request - :form: Forms value - :headers: dict, request header - """ - super(_CubicWebRequestBase, self).__init__(vreg) - #: (Boolean) Is this an https request. - self.https = https - #: User interface property (vary with https) (see :ref:`uiprops`) - self.uiprops = None - #: url for serving datadir (vary with https) (see :ref:`resources`) - self.datadir_url = None - if https and vreg.config.https_uiprops is not None: - self.uiprops = vreg.config.https_uiprops - else: - self.uiprops = vreg.config.uiprops - if https and vreg.config.https_datadir_url is not None: - self.datadir_url = vreg.config.https_datadir_url - else: - self.datadir_url = vreg.config.datadir_url - #: enable UStringIO's write tracing - self.tracehtml = False - if vreg.config.debugmode: - self.tracehtml = bool(form.pop('_cwtracehtml', False)) - #: raw html headers that can be added from any view - self.html_headers = HTMLHead(self, tracewrites=self.tracehtml) - #: received headers - self._headers_in = Headers() - if headers is not None: - for k, v in headers.items(): - self._headers_in.addRawHeader(k, v) - #: form parameters - self.setup_params(form) - #: received body - self.content = BytesIO() - # prepare output header - #: Header used for the final response - self.headers_out = Headers() - #: HTTP status use by the final response - self.status_out = 200 - # set up language based on request headers or site default (we don't - # have a user yet, and might not get one) - self.set_user_language(None) - #: dictionary that may be used to store request data that has to be - #: shared among various components used to publish the request (views, - #: controller, application...) - self.data = {} - self._search_state = None - #: page id, set by htmlheader template - self.pageid = None - self._set_pageid() - - def _set_pageid(self): - """initialize self.pageid - if req.form provides a specific pageid, use it, otherwise build a - new one. - """ - pid = self.form.get('pageid') - if pid is None: - pid = make_uid(id(self)) - self.html_headers.define_var('pageid', pid, override=False) - self.pageid = pid - - def _get_json_request(self): - warn('[3.15] self._cw.json_request is deprecated, use self._cw.ajax_request instead', - DeprecationWarning, stacklevel=2) - return self.ajax_request - def _set_json_request(self, value): - warn('[3.15] self._cw.json_request is deprecated, use self._cw.ajax_request instead', - DeprecationWarning, stacklevel=2) - self.ajax_request = value - json_request = property(_get_json_request, _set_json_request) - - def _base_url(self, secure=None): - """return the root url of the instance - - secure = False -> base-url - secure = None -> https-url if req.https - secure = True -> https if it exist - """ - if secure is None: - secure = self.https - base_url = None - if secure: - base_url = self.vreg.config.get('https-url') - if base_url is None: - base_url = super(_CubicWebRequestBase, self)._base_url() - return base_url - - @property - def authmode(self): - """Authentification mode of the instance - (see :ref:`WebServerConfig`)""" - return self.vreg.config['auth-mode'] - - # Various variable generator. - - @property - def varmaker(self): - """the rql varmaker is exposed both as a property and as the - set_varmaker function since we've two use cases: - - * accessing the req.varmaker property to get a new variable name - - * calling req.set_varmaker() to ensure a varmaker is set for later ajax - calls sharing our .pageid - """ - return self.set_varmaker() - - def next_tabindex(self): - nextfunc = self.get_page_data('nexttabfunc') - if nextfunc is None: - nextfunc = Counter(1) - self.set_page_data('nexttabfunc', nextfunc) - return nextfunc() - - def set_varmaker(self): - varmaker = self.get_page_data('rql_varmaker') - if varmaker is None: - varmaker = rqlvar_maker() - self.set_page_data('rql_varmaker', varmaker) - return varmaker - - # input form parameters management ######################################## - - # common form parameters which should be protected against html values - # XXX can't add 'eid' for instance since it may be multivalued - # dont put rql as well, if query contains < and > it will be corrupted! - no_script_form_params = set(('vid', - 'etype', - 'vtitle', 'title', - '__redirectvid', '__redirectrql')) - - def setup_params(self, params): - """WARNING: we're intentionally leaving INTERNAL_FIELD_VALUE here - - subclasses should overrides to - """ - self.form = {} - if params is None: - return - encoding = self.encoding - for param, val in params.items(): - if isinstance(val, (tuple, list)): - if PY2: - val = [unicode(x, encoding) for x in val] - if len(val) == 1: - val = val[0] - elif PY2 and isinstance(val, str): - val = unicode(val, encoding) - if param in self.no_script_form_params and val: - val = self.no_script_form_param(param, val) - if param == '_cwmsgid': - self.set_message_id(val) - else: - self.form[param] = val - - def no_script_form_param(self, param, value): - """ensure there is no script in a user form param - - by default return a cleaned string instead of raising a security - exception - - this method should be called on every user input (form at least) fields - that are at some point inserted in a generated html page to protect - against script kiddies - """ - # safety belt for strange urls like http://...?vtitle=yo&vtitle=yo - if isinstance(value, (list, tuple)): - self.error('no_script_form_param got a list (%s). Who generated the URL ?', - repr(value)) - value = value[0] - return remove_html_tags(value) - - def list_form_param(self, param, form=None, pop=False): - """get param from form parameters and return its value as a list, - skipping internal markers if any - - * if the parameter isn't defined, return an empty list - * if the parameter is a single (unicode) value, return a list - containing that value - * if the parameter is already a list or tuple, just skip internal - markers - - if pop is True, the parameter is removed from the form dictionary - """ - if form is None: - form = self.form - return list_form_param(form, param, pop) - - def reset_headers(self): - """used by AutomaticWebTest to clear html headers between tests on - the same resultset - """ - self.html_headers = HTMLHead(self) - return self - - # web state helpers ####################################################### - - @property - def message(self): - try: - return self.session.data.pop(self._msgid, u'') - except AttributeError: - try: - return self._msg - except AttributeError: - return None - - def set_message(self, msg): - assert isinstance(msg, text_type) - self.reset_message() - self._msg = msg - - def set_message_id(self, msgid): - self._msgid = msgid - - @cached - def redirect_message_id(self): - return make_uid() - - def set_redirect_message(self, msg): - # TODO - this should probably be merged with append_to_redirect_message - assert isinstance(msg, text_type) - msgid = self.redirect_message_id() - self.session.data[msgid] = msg - return msgid - - def append_to_redirect_message(self, msg): - msgid = self.redirect_message_id() - currentmsg = self.session.data.get(msgid) - if currentmsg is not None: - currentmsg = u'%s %s' % (currentmsg, msg) - else: - currentmsg = msg - self.session.data[msgid] = currentmsg - return msgid - - def reset_message(self): - if hasattr(self, '_msg'): - del self._msg - if hasattr(self, '_msgid'): - self.session.data.pop(self._msgid, u'') - del self._msgid - - def _load_search_state(self, searchstate): - if searchstate is None or searchstate == 'normal': - self._search_state = ('normal',) - else: - self._search_state = ('linksearch', searchstate.split(':')) - assert len(self._search_state[-1]) == 4, 'invalid searchstate' - - @property - def search_state(self): - """search state: 'normal' or 'linksearch' (i.e. searching for an object - to create a relation with another)""" - if self._search_state is None: - searchstate = self.session.data.get('search_state', 'normal') - self._load_search_state(searchstate) - return self._search_state - - @search_state.setter - def search_state(self, searchstate): - self._search_state = searchstate - - def update_search_state(self): - """update the current search state if needed""" - searchstate = self.form.get('__mode') - if searchstate: - self.set_search_state(searchstate) - - def set_search_state(self, searchstate): - """set a new search state""" - self.session.data['search_state'] = searchstate - self._load_search_state(searchstate) - - def match_search_state(self, rset): - """when searching an entity to create a relation, return True if entities in - the given rset may be used as relation end - """ - try: - searchedtype = self.search_state[1][-1] - except IndexError: - return False # no searching for association - for etype in rset.column_types(0): - if etype != searchedtype: - return False - return True - - # web edition helpers ##################################################### - - @cached # so it's writed only once - def fckeditor_config(self): - fckeditor_url = self.build_url('fckeditor/fckeditor.js') - self.add_js(fckeditor_url, localfile=False) - self.html_headers.define_var('fcklang', self.lang) - self.html_headers.define_var('fckconfigpath', - self.data_url('cubicweb.fckcwconfig.js')) - def use_fckeditor(self): - return self.vreg.config.fckeditor_installed() and self.property_value('ui.fckeditor') - - def edited_eids(self, withtype=False): - """return a list of edited eids""" - yielded = False - # warning: use .keys since the caller may change `form` - form = self.form - try: - eids = form['eid'] - except KeyError: - raise NothingToEdit(self._('no selected entities')) - if isinstance(eids, string_types): - eids = (eids,) - for peid in eids: - if withtype: - typekey = '__type:%s' % peid - assert typekey in form, 'no entity type specified' - yield peid, form[typekey] - else: - yield peid - yielded = True - if not yielded: - raise NothingToEdit(self._('no selected entities')) - - # minparams=3 by default: at least eid, __type, and some params to change - def extract_entity_params(self, eid, minparams=3): - """extract form parameters relative to the given eid""" - params = {} - eid = str(eid) - form = self.form - for param in form: - try: - name, peid = param.split(':', 1) - except ValueError: - if not param.startswith('__') and param not in ('eid', '_cw_fields'): - self.warning('param %s mis-formatted', param) - continue - if peid == eid: - value = form[param] - if value == INTERNAL_FIELD_VALUE: - value = None - params[name] = value - params['eid'] = eid - if len(params) < minparams: - raise RequestError(self._('missing parameters for entity %s') % eid) - return params - - # XXX this should go to the GenericRelationsField. missing edition cancel protocol. - - def remove_pending_operations(self): - """shortcut to clear req's pending_{delete,insert} entries - - This is needed when the edition is completed (whether it's validated - or cancelled) - """ - self.session.data.pop('pending_insert', None) - self.session.data.pop('pending_delete', None) - - def cancel_edition(self, errorurl): - """remove pending operations and `errorurl`'s specific stored data - """ - self.session.data.pop(errorurl, None) - self.remove_pending_operations() - - # high level methods for HTTP headers management ########################## - - # must be cached since login/password are popped from the form dictionary - # and this method may be called multiple times during authentication - @cached - def get_authorization(self): - """Parse and return the Authorization header""" - if self.authmode == "cookie": - try: - user = self.form.pop("__login") - passwd = self.form.pop("__password", '') - return user, passwd.encode('UTF8') - except KeyError: - self.debug('no login/password in form params') - return None, None - else: - return self.header_authorization() - - def get_cookie(self): - """retrieve request cookies, returns an empty cookie if not found""" - # XXX use http_headers implementation - try: - return SimpleCookie(self.get_header('Cookie')) - except KeyError: - return SimpleCookie() - - def set_cookie(self, name, value, maxage=300, expires=None, secure=False, httponly=False): - """set / update a cookie - - by default, cookie will be available for the next 5 minutes. - Give maxage = None to have a "session" cookie expiring when the - client close its browser - """ - if isinstance(name, SimpleCookie): - warn('[3.13] set_cookie now takes name and value as two first ' - 'argument, not anymore cookie object and name', - DeprecationWarning, stacklevel=2) - secure = name[value]['secure'] - name, value = value, name[value].value - if maxage: # don't check is None, 0 may be specified - assert expires is None, 'both max age and expires cant be specified' - expires = maxage + time.time() - elif expires: - # we don't want to handle times before the EPOCH (cause bug on - # windows). Also use > and not >= else expires == 0 and Cookie think - # that means no expire... - assert expires > date(1970, 1, 1) - expires = timegm(expires.timetuple()) - else: - expires = None - # make sure cookie is set on the correct path - cookie = Cookie(str(name), str(value), self.base_url_path(), - expires=expires, secure=secure, httponly=httponly) - self.headers_out.addHeader('Set-cookie', cookie) - - def remove_cookie(self, name, bwcompat=None): - """remove a cookie by expiring it""" - if bwcompat is not None: - warn('[3.13] remove_cookie now take only a name as argument', - DeprecationWarning, stacklevel=2) - name = bwcompat - self.set_cookie(name, '', maxage=0, expires=date(2000, 1, 1)) - - def set_content_type(self, content_type, filename=None, encoding=None, - disposition='inline'): - """set output content type for this request. An optional filename - may be given. - - The disposition argument may be `attachement` or `inline` as specified - for the Content-disposition HTTP header. The disposition parameter have - no effect if no filename are specified. - """ - if content_type.startswith('text/') and ';charset=' not in content_type: - content_type += ';charset=' + (encoding or self.encoding) - self.set_header('content-type', content_type) - if filename: - header = [disposition] - unicode_filename = None - try: - ascii_filename = filename.encode('ascii').decode('ascii') - except UnicodeEncodeError: - # fallback filename for very old browser - unicode_filename = filename - ascii_filename = filename.encode('ascii', 'ignore').decode('ascii') - # escape " and \ - # see http://greenbytes.de/tech/tc2231/#attwithfilenameandextparamescaped - ascii_filename = ascii_filename.replace('\x5c', r'\\').replace('"', r'\"') - header.append('filename="%s"' % ascii_filename) - if unicode_filename is not None: - # encoded filename according RFC5987 - urlquoted_filename = urlquote(unicode_filename.encode('utf-8'), '') - header.append("filename*=utf-8''" + urlquoted_filename) - self.set_header('content-disposition', ';'.join(header)) - - # high level methods for HTML headers management ########################## - - def add_onload(self, jscode): - self.html_headers.add_onload(jscode) - - def add_js(self, jsfiles, localfile=True): - """specify a list of JS files to include in the HTML headers. - - :param jsfiles: a JS filename or a list of JS filenames - :param localfile: if True, the default data dir prefix is added to the - JS filename - """ - if isinstance(jsfiles, string_types): - jsfiles = (jsfiles,) - for jsfile in jsfiles: - if localfile: - jsfile = self.data_url(jsfile) - self.html_headers.add_js(jsfile) - - def add_css(self, cssfiles, media=u'all', localfile=True, ieonly=False, - iespec=u'[if lt IE 8]'): - """specify a CSS file to include in the HTML headers - - :param cssfiles: a CSS filename or a list of CSS filenames. - :param media: the CSS's media if necessary - :param localfile: if True, the default data dir prefix is added to the - CSS filename - :param ieonly: True if this css is specific to IE - :param iespec: conditional expression that will be used around - the css inclusion. cf: - http://msdn.microsoft.com/en-us/library/ms537512(VS.85).aspx - """ - if isinstance(cssfiles, string_types): - cssfiles = (cssfiles,) - if ieonly: - if self.ie_browser(): - extraargs = [iespec] - add_css = self.html_headers.add_ie_css - else: - return # no need to do anything on non IE browsers - else: - extraargs = [] - add_css = self.html_headers.add_css - for cssfile in cssfiles: - if localfile: - cssfile = self.data_url(cssfile) - add_css(cssfile, media, *extraargs) - - def ajax_replace_url(self, nodeid, replacemode='replace', **extraparams): - """builds an ajax url that will replace nodeid's content - - :param nodeid: the dom id of the node to replace - :param replacemode: defines how the replacement should be done. - - Possible values are : - - 'replace' to replace the node's content with the generated HTML - - 'swap' to replace the node itself with the generated HTML - - 'append' to append the generated HTML to the node's content - - Arbitrary extra named arguments may be given, they will be included as - parameters of the generated url. - """ - # define a function in headers and use it in the link to avoid url - # unescaping pb: browsers give the js expression to the interpreter - # after having url unescaping the content. This may make appear some - # quote or other special characters that will break the js expression. - extraparams.setdefault('fname', 'view') - # remove pageid from the generated URL as it's forced as a parameter - # to the loadxhtml call below. - extraparams.pop('pageid', None) - url = self.build_url('ajax', **extraparams) - cbname = build_cb_uid(url[:50]) - # think to propagate pageid. XXX see https://www.cubicweb.org/ticket/1753121 - jscode = u'function %s() { $("#%s").%s; }' % ( - cbname, nodeid, js.loadxhtml(url, {'pageid': self.pageid}, - 'get', replacemode)) - self.html_headers.add_post_inline_script(jscode) - return "javascript: %s()" % cbname - - # urls/path management #################################################### - - def build_url(self, *args, **kwargs): - """return an absolute URL using params dictionary key/values as URL - parameters. Values are automatically URL quoted, and the - publishing method to use may be specified or will be guessed. - """ - if '__message' in kwargs: - msg = kwargs.pop('__message') - kwargs['_cwmsgid'] = self.set_redirect_message(msg) - if not args: - method = 'view' - if (self.from_controller() == 'view' - and not '_restpath' in kwargs): - method = self.relative_path(includeparams=False) or 'view' - args = (method,) - return super(_CubicWebRequestBase, self).build_url(*args, **kwargs) - - def url(self, includeparams=True): - """return currently accessed url""" - return self.base_url() + self.relative_path(includeparams) - - def selected(self, url): - """return True if the url is equivalent to currently accessed url""" - reqpath = self.relative_path().lower() - baselen = len(self.base_url()) - return (reqpath == url[baselen:].lower()) - - def base_url_prepend_host(self, hostname): - protocol, roothost = urlsplit(self.base_url())[:2] - if roothost.startswith('www.'): - roothost = roothost[4:] - return '%s://%s.%s' % (protocol, hostname, roothost) - - def base_url_path(self): - """returns the absolute path of the base url""" - return urlsplit(self.base_url())[2] - - def data_url(self, relpath): - """returns the absolute path for a data resource""" - return self.datadir_url + relpath - - @cached - def from_controller(self): - """return the id (string) of the controller issuing the request""" - controller = self.relative_path(False).split('/', 1)[0] - if controller in self.vreg['controllers']: - return controller - return 'view' - - def is_client_cache_valid(self): - """check if a client cached page exists (as specified in request - headers) and is still usable. - - Return False if the page has to be calculated, else True. - - Some response cache headers may be set by this method. - """ - modified = True - # Here, we search for any invalid 'not modified' condition - # see http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html#sec13.3 - validators = get_validators(self._headers_in) - if validators: # if we have no - modified = any(func(val, self.headers_out) for func, val in validators) - # Forge expected response - if not modified: - # overwrite headers_out to forge a brand new not-modified response - self.headers_out = self._forge_cached_headers() - if self.http_method() in ('HEAD', 'GET'): - self.status_out = http_client.NOT_MODIFIED - else: - self.status_out = http_client.PRECONDITION_FAILED - # XXX replace by True once validate_cache bw compat method is dropped - return self.status_out - # XXX replace by False once validate_cache bw compat method is dropped - return None - - @deprecated('[3.18] use .is_client_cache_valid() method instead') - def validate_cache(self): - """raise a `StatusResponse` exception if a cached page along the way - exists and is still usable. - """ - status_code = self.is_client_cache_valid() - if status_code is not None: - raise StatusResponse(status_code) - - # abstract methods to override according to the web front-end ############# - - def http_method(self): - """returns 'POST', 'GET', 'HEAD', etc.""" - raise NotImplementedError() - - def _forge_cached_headers(self): - # overwrite headers_out to forge a brand new not-modified response - headers = Headers() - for header in ( - # Required from sec 10.3.5: - 'date', 'etag', 'content-location', 'expires', - 'cache-control', 'vary', - # Others: - 'server', 'proxy-authenticate', 'www-authenticate', 'warning'): - value = self._headers_in.getRawHeaders(header) - if value is not None: - headers.setRawHeaders(header, value) - return headers - - def relative_path(self, includeparams=True): - """return the normalized path of the request (ie at least relative - to the instance's root, but some other normalization may be needed - so that the returned path may be used to compare to generated urls - - :param includeparams: - boolean indicating if GET form parameters should be kept in the path - """ - raise NotImplementedError() - - # http headers ############################################################ - - ### incoming headers - - def get_header(self, header, default=None, raw=True): - """return the value associated with the given input header, raise - KeyError if the header is not set - """ - if raw: - return self._headers_in.getRawHeaders(header, [default])[0] - return self._headers_in.getHeader(header, default) - - def header_accept_language(self): - """returns an ordered list of preferred languages""" - acceptedlangs = self.get_header('Accept-Language', raw=False) or {} - for lang, _ in sorted(acceptedlangs.items(), key=lambda x: x[1], - reverse=True): - lang = lang.split('-')[0] - yield lang - - def header_if_modified_since(self): - """If the HTTP header If-modified-since is set, return the equivalent - date time value (GMT), else return None - """ - mtime = self.get_header('If-modified-since', raw=False) - if mtime: - return datetime.utcfromtimestamp(mtime) - return None - - ### outcoming headers - def set_header(self, header, value, raw=True): - """set an output HTTP header""" - if raw: - # adding encoded header is important, else page content - # will be reconverted back to unicode and apart unefficiency, this - # may cause decoding problem (e.g. when downloading a file) - self.headers_out.setRawHeaders(header, [str(value)]) - else: - self.headers_out.setHeader(header, value) - - def add_header(self, header, value): - """add an output HTTP header""" - # adding encoded header is important, else page content - # will be reconverted back to unicode and apart unefficiency, this - # may cause decoding problem (e.g. when downloading a file) - self.headers_out.addRawHeader(header, str(value)) - - def remove_header(self, header): - """remove an output HTTP header""" - self.headers_out.removeHeader(header) - - def header_authorization(self): - """returns a couple (auth-type, auth-value)""" - auth = self.get_header("Authorization", None) - if auth: - scheme, rest = auth.split(' ', 1) - scheme = scheme.lower() - try: - assert scheme == "basic" - user, passwd = base64.decodestring(rest.encode('ascii')).split(b":", 1) - # XXX HTTP header encoding: use email.Header? - return user.decode('UTF8'), passwd - except Exception as ex: - self.debug('bad authorization %s (%s: %s)', - auth, ex.__class__.__name__, ex) - return None, None - - def parse_accept_header(self, header): - """returns an ordered list of accepted values""" - try: - value_parser, value_sort_key = ACCEPT_HEADER_PARSER[header.lower()] - except KeyError: - value_parser = value_sort_key = None - accepteds = self.get_header(header, '') - values = _parse_accept_header(accepteds, value_parser, value_sort_key) - return (raw_value for (raw_value, parsed_value, score) in values) - - @deprecated('[3.17] demote_to_html is deprecated as we always serve html') - def demote_to_html(self): - """helper method to dynamically set request content type to text/html - - The global doctype and xmldec must also be changed otherwise the browser - will display '<[' at the beginning of the page - """ - pass - - - # xml doctype ############################################################# - - def set_doctype(self, doctype, reset_xmldecl=None): - """helper method to dynamically change page doctype - - :param doctype: the new doctype, e.g. '' - """ - if reset_xmldecl is not None: - warn('[3.17] reset_xmldecl is deprecated as we only serve html', - DeprecationWarning, stacklevel=2) - self.main_stream.set_doctype(doctype) - - # page data management #################################################### - - def get_page_data(self, key, default=None): - """return value associated to `key` in current page data""" - page_data = self.session.data.get(self.pageid) - if page_data is None: - return default - return page_data.get(key, default) - - def set_page_data(self, key, value): - """set value associated to `key` in current page data""" - self.html_headers.add_unload_pagedata() - page_data = self.session.data.setdefault(self.pageid, {}) - page_data[key] = value - self.session.data[self.pageid] = page_data - - def del_page_data(self, key=None): - """remove value associated to `key` in current page data - if `key` is None, all page data will be cleared - """ - if key is None: - self.session.data.pop(self.pageid, None) - else: - try: - del self.session.data[self.pageid][key] - except KeyError: - pass - - # user-agent detection #################################################### - - @cached - def useragent(self): - return self.get_header('User-Agent', None) - - def ie_browser(self): - useragent = self.useragent() - return useragent and 'MSIE' in useragent - - @deprecated('[3.17] xhtml_browser is deprecated (xhtml is no longer served)') - def xhtml_browser(self): - """return True if the browser is considered as xhtml compatible. - - If the instance is configured to always return text/html and not - application/xhtml+xml, this method will always return False, even though - this is semantically different - """ - return False - - def html_content_type(self): - return 'text/html' - - def set_user_language(self, user): - vreg = self.vreg - if user is not None: - try: - # 1. user-specified language - lang = vreg.typed_value('ui.language', user.properties['ui.language']) - self.set_language(lang) - return - except KeyError: - pass - if vreg.config.get('language-negociation', False): - # 2. http accept-language - self.headers_out.addHeader('Vary', 'Accept-Language') - for lang in self.header_accept_language(): - if lang in self.translations: - self.set_language(lang) - return - # 3. site's default language - self.set_default_language(vreg) - - -def _cnx_func(name): - def proxy(req, *args, **kwargs): - return getattr(req.cnx, name)(*args, **kwargs) - return proxy - -class _NeedAuthAccessMock(object): - - def __getattribute__(self, attr): - raise AuthenticationError() - - def __bool__(self): - return False - - __nonzero__ = __bool__ - -class _MockAnonymousSession(object): - sessionid = 'thisisnotarealsession' - - @property - def data(self): - return {} - - @property - def anonymous_session(self): - return True - -class ConnectionCubicWebRequestBase(_CubicWebRequestBase): - cnx = None - session = None - - def __init__(self, vreg, https=False, form=None, headers={}): - """""" - self.vreg = vreg - try: - # no vreg or config which doesn't handle translations - self.translations = vreg.config.translations - except AttributeError: - self.translations = {} - super(ConnectionCubicWebRequestBase, self).__init__(vreg, https=https, - form=form, headers=headers) - self.session = _MockAnonymousSession() - self.cnx = self.user = _NeedAuthAccessMock() - - @property - def transaction_data(self): - return self.cnx.transaction_data - - def set_cnx(self, cnx): - self.cnx = cnx - self.session = cnx.session - self._set_user(cnx.user) - self.set_user_language(cnx.user) - - def execute(self, *args, **kwargs): - rset = self.cnx.execute(*args, **kwargs) - rset.req = self - return rset - - def set_default_language(self, vreg): - try: - lang = vreg.property_value('ui.language') - except Exception: # property may not be registered - lang = 'en' - try: - self.set_language(lang) - except KeyError: - # this occurs usually during test execution - self._ = self.__ = text_type - self.pgettext = lambda x, y: text_type(y) - - entity_metas = _cnx_func('entity_metas') - source_defs = _cnx_func('source_defs') - get_shared_data = _cnx_func('get_shared_data') - set_shared_data = _cnx_func('set_shared_data') - describe = _cnx_func('describe') # deprecated XXX - - # security ################################################################# - - security_enabled = _cnx_func('security_enabled') - - # server-side service call ################################################# - - def call_service(self, regid, **kwargs): - return self.cnx.call_service(regid, **kwargs) - - # entities cache management ############################################### - - def entity_cache(self, eid): - return self.transaction_data['req_ecache'][eid] - - def set_entity_cache(self, entity): - ecache = self.transaction_data.setdefault('req_ecache', {}) - ecache.setdefault(entity.eid, entity) - - def cached_entities(self): - return self.transaction_data.get('req_ecache', {}).values() - - def drop_entity_cache(self, eid=None): - if eid is None: - self.transaction_data.pop('req_ecache', None) - else: - del self.transaction_data['req_ecache'][eid] - - -CubicWebRequestBase = ConnectionCubicWebRequestBase - - -## HTTP-accept parsers / utilies ############################################## -def _mimetype_sort_key(accept_info): - """accepted mimetypes must be sorted by : - - 1/ highest score first - 2/ most specific mimetype first, e.g. : - - 'text/html level=1' is more specific 'text/html' - - 'text/html' is more specific than 'text/*' - - 'text/*' itself more specific than '*/*' - - """ - raw_value, (media_type, media_subtype, media_type_params), score = accept_info - # FIXME: handle '+' in media_subtype ? (should xhtml+xml have a - # higher precedence than xml ?) - if media_subtype == '*': - score -= 0.0001 - if media_type == '*': - score -= 0.0001 - return 1./score, media_type, media_subtype, 1./(1+len(media_type_params)) - -def _charset_sort_key(accept_info): - """accepted mimetypes must be sorted by : - - 1/ highest score first - 2/ most specific charset first, e.g. : - - 'utf-8' is more specific than '*' - """ - raw_value, value, score = accept_info - if value == '*': - score -= 0.0001 - return 1./score, value - -def _parse_accept_header(raw_header, value_parser=None, value_sort_key=None): - """returns an ordered list accepted types - - :param value_parser: a function to parse a raw accept chunk. If None - is provided, the function defaults to identity. If a function is provided, - it must accept 2 parameters ``value`` and ``other_params``. ``value`` is - the value found before the first ';', `other_params` is a dictionary - built from all other chunks after this first ';' - - :param value_sort_key: a key function to sort values found in the accept - header. This function will be passed a 3-tuple - (raw_value, parsed_value, score). If None is provided, the default - sort_key is 1./score - - :return: a list of 3-tuple (raw_value, parsed_value, score), - ordered by score. ``parsed_value`` will be the return value of - ``value_parser(raw_value)`` - """ - if value_sort_key is None: - value_sort_key = lambda infos: 1./infos[-1] - values = [] - for info in raw_header.split(','): - score = 1.0 - other_params = {} - try: - value, infodef = info.split(';', 1) - except ValueError: - value = info - else: - for info in infodef.split(';'): - try: - infokey, infoval = info.split('=') - if infokey == 'q': # XXX 'level' - score = float(infoval) - continue - except ValueError: - continue - other_params[infokey] = infoval - parsed_value = value_parser(value, other_params) if value_parser else value - values.append( (value.strip(), parsed_value, score) ) - values.sort(key=value_sort_key) - return values - - -def _mimetype_parser(value, other_params): - """return a 3-tuple - (type, subtype, type_params) corresponding to the mimetype definition - e.g. : for 'text/*', `mimetypeinfo` will be ('text', '*', {}), for - 'text/html;level=1', `mimetypeinfo` will be ('text', '*', {'level': '1'}) - """ - try: - media_type, media_subtype = value.strip().split('/', 1) - except ValueError: # safety belt : '/' should always be present - media_type = value.strip() - media_subtype = '*' - return (media_type, media_subtype, other_params) - - -ACCEPT_HEADER_PARSER = { - 'accept': (_mimetype_parser, _mimetype_sort_key), - 'accept-charset': (None, _charset_sort_key), - } - -from cubicweb import set_log_methods -set_log_methods(_CubicWebRequestBase, LOGGER)