changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
--- a/devtools/	Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1335 +0,0 @@
-# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact --
-# This file is part of CubicWeb.
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
-# details.
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb.  If not, see <>.
-"""this module contains base classes and utilities for cubicweb tests"""
-from __future__ import print_function
-import sys
-import re
-from os.path import dirname, join, abspath
-from math import log
-from contextlib import contextmanager
-from itertools import chain
-from six import text_type, string_types
-from six.moves import range
-from six.moves.urllib.parse import urlparse, parse_qs, unquote as urlunquote
-import yams.schema
-from logilab.common.testlib import TestCase, InnerTest, Tags
-from logilab.common.pytest import nocoverage, pause_trace
-from logilab.common.debugger import Debugger
-from logilab.common.umessage import message_from_string
-from logilab.common.decorators import cached, classproperty, clear_cache, iclassmethod
-from logilab.common.deprecation import deprecated, class_deprecated
-from logilab.common.shellutils import getlogin
-from cubicweb import (ValidationError, NoSelectableObject, AuthenticationError,
-                      BadConnectionId)
-from cubicweb import cwconfig, devtools, web, server, repoapi
-from cubicweb.utils import json
-from cubicweb.sobjects import notification
-from cubicweb.web import Redirect, application, eid_param
-from cubicweb.server.hook import SendMailOp
-from cubicweb.server.session import Session
-from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID
-# low-level utilities ##########################################################
-class CubicWebDebugger(Debugger):
-    """special debugger class providing a 'view' function which saves some
-    html into a temporary file and open a web browser to examinate it.
-    """
-    def do_view(self, arg):
-        import webbrowser
-        data = self._getval(arg)
-        with open('/tmp/toto.html', 'w') as toto:
-            toto.write(data)
-def line_context_filter(line_no, center, before=3, after=None):
-    """return true if line are in context
-    if after is None: after = before
-    """
-    if after is None:
-        after = before
-    return center - before <= line_no <= center + after
-def unprotected_entities(schema, strict=False):
-    """returned a set of each non final entity type, excluding "system" entities
-    (eg CWGroup, CWUser...)
-    """
-    if strict:
-        protected_entities = yams.schema.BASE_TYPES
-    else:
-        protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES)
-    return set(schema.entities()) - protected_entities
-class JsonValidator(object):
-    def parse_string(self, data):
-        return json.loads(data.decode('ascii'))
-def real_error_handling(app):
-    """By default, CubicWebTC `app` attribute (ie the publisher) is monkey
-    patched so that unexpected error are raised rather than going through the
-    `error_handler` method.
-    By using this context manager you disable this monkey-patching temporarily.
-    Hence when publishihng a request no error will be raised, you'll get
-    req.status_out set to an HTTP error status code and the generated page will
-    usually hold a traceback as HTML.
-    >>> with real_error_handling(app):
-    >>>     page = app.handle_request(req)
-    """
-    # remove the monkey patched error handler
-    fake_error_handler = app.error_handler
-    del app.error_handler
-    # return the app
-    yield app
-    # restore
-    app.error_handler = fake_error_handler
-# email handling, to test emails sent by an application ########################
-class Email(object):
-    """you'll get instances of Email into MAILBOX during tests that trigger
-    some notification.
-    * `msg` is the original message object
-    * `recipients` is a list of email address which are the recipients of this
-      message
-    """
-    def __init__(self, fromaddr, recipients, msg):
-        self.fromaddr = fromaddr
-        self.recipients = recipients
-        self.msg = msg
-    @property
-    def message(self):
-        return message_from_string(self.msg)
-    @property
-    def subject(self):
-        return self.message.get('Subject')
-    @property
-    def content(self):
-        return self.message.get_payload(decode=True)
-    def __repr__(self):
-        return '<Email to %s with subject %s>' % (','.join(self.recipients),
-                                                  self.message.get('Subject'))
-# the trick to get email into MAILBOX instead of actually sent: monkey patch
-# cwconfig.SMTP object
-class MockSMTP:
-    def __init__(self, server, port):
-        pass
-    def close(self):
-        pass
-    def sendmail(self, fromaddr, recipients, msg):
-        MAILBOX.append(Email(fromaddr, recipients, msg))
-cwconfig.SMTP = MockSMTP
-# Repoaccess utility ###############################################3###########
-class RepoAccess(object):
-    """An helper to easily create object to access the repo as a specific user
-    Each RepoAccess have it own session.
-    A repo access can create three type of object:
-    .. automethod:: cubicweb.testlib.RepoAccess.cnx
-    .. automethod:: cubicweb.testlib.RepoAccess.web_request
-    The RepoAccess need to be closed to destroy the associated Session.
-    TestCase usually take care of this aspect for the user.
-    .. automethod:: cubicweb.testlib.RepoAccess.close
-    """
-    def __init__(self, repo, login, requestcls):
-        self._repo = repo
-        self._login = login
-        self.requestcls = requestcls
-        self._session = self._unsafe_connect(login)
-    def _unsafe_connect(self, login, **kwargs):
-        """ a completely unsafe connect method for the tests """
-        # use an internal connection
-        with self._repo.internal_cnx() as cnx:
-            # try to get a user object
-            user = cnx.find('CWUser', login=login).one()
-            user.groups
-            user.login
-            session = Session(user, self._repo)
-            self._repo._sessions[session.sessionid] = session
-            user._cw = user.cw_rset.req = session
-        with session.new_cnx() as cnx:
-  'session_open', cnx)
-            # commit connection at this point in case write operation has been
-            # done during `session_open` hooks
-            cnx.commit()
-        return session
-    @contextmanager
-    def cnx(self):
-        """Context manager returning a server side connection for the user"""
-        with self._session.new_cnx() as cnx:
-            yield cnx
-    # aliases for bw compat
-    client_cnx = repo_cnx = cnx
-    @contextmanager
-    def web_request(self, url=None, headers={}, method='GET', **kwargs):
-        """Context manager returning a web request pre-linked to a client cnx
-        To commit and rollback use::
-            req.cnx.commit()
-            req.cnx.rolback()
-        """
-        req = self.requestcls(self._repo.vreg, url=url, headers=headers,
-                              method=method, form=kwargs)
-        with self._session.new_cnx() as cnx:
-            req.set_cnx(cnx)
-            yield req
-    def close(self):
-        """Close the session associated to the RepoAccess"""
-        if self._session is not None:
-            self._repo.close(self._session.sessionid)
-        self._session = None
-    @contextmanager
-    def shell(self):
-        from cubicweb.server.migractions import ServerMigrationHelper
-        with self._session.new_cnx() as cnx:
-            mih = ServerMigrationHelper(None, repo=self._repo, cnx=cnx,
-                                        interactive=False,
-                                        # hack so it don't try to load fs schema
-                                        schema=1)
-            yield mih
-            cnx.commit()
-# base class for cubicweb tests requiring a full cw environments ###############
-class CubicWebTC(TestCase):
-    """abstract class for test using an apptest environment
-    attributes:
-    * `vreg`, the vregistry
-    * `schema`, self.vreg.schema
-    * `config`, cubicweb configuration
-    * `cnx`, repoapi connection to the repository using an admin user
-    * `session`, server side session associated to `cnx`
-    * `app`, the cubicweb publisher (for web testing)
-    * `repo`, the repository object
-    * `admlogin`, login of the admin user
-    * `admpassword`, password of the admin user
-    * `shell`, create and use shell environment
-    * `anonymous_allowed`: flag telling if anonymous browsing should be allowed
-    """
-    appid = 'data'
-    configcls = devtools.ApptestConfiguration
-    requestcls = fake.FakeRequest
-    tags = TestCase.tags | Tags('cubicweb', 'cw_repo')
-    test_db_id = DEFAULT_EMPTY_DB_ID
-    # anonymous is logged by default in cubicweb test cases
-    anonymous_allowed = True
-    def __init__(self, *args, **kwargs):
-        self._admin_session = None
-        self.repo = None
-        self._open_access = set()
-        super(CubicWebTC, self).__init__(*args, **kwargs)
-    # repository connection handling ###########################################
-    def new_access(self, login):
-        """provide a new RepoAccess object for a given user
-        The access is automatically closed at the end of the test."""
-        login = text_type(login)
-        access = RepoAccess(self.repo, login, self.requestcls)
-        self._open_access.add(access)
-        return access
-    def _close_access(self):
-        while self._open_access:
-            try:
-                self._open_access.pop().close()
-            except BadConnectionId:
-                continue  # already closed
-    @property
-    def session(self):
-        """return admin session"""
-        return self._admin_session
-    # XXX this doesn't need to a be classmethod anymore
-    def _init_repo(self):
-        """init the repository and connection to it.
-        """
-        # get or restore and working db.
-        db_handler = devtools.get_test_db_handler(self.config, self.init_config)
-        db_handler.build_db_cache(self.test_db_id, self.pre_setup_database)
-        db_handler.restore_database(self.test_db_id)
-        self.repo = db_handler.get_repo(startup=True)
-        # get an admin session (without actual login)
-        login = text_type(db_handler.config.default_admin_config['login'])
-        self.admin_access = self.new_access(login)
-        self._admin_session = self.admin_access._session
-    # config management ########################################################
-    @classproperty
-    def config(cls):
-        """return the configuration object
-        Configuration is cached on the test class.
-        """
-        if cls is CubicWebTC:
-            # Prevent direct use of CubicWebTC directly to avoid database
-            # caching issues
-            return None
-        try:
-            return cls.__dict__['_config']
-        except KeyError:
-            home = abspath(join(dirname(sys.modules[cls.__module__].__file__), cls.appid))
-            config = cls._config = cls.configcls(cls.appid, apphome=home)
-            config.mode = 'test'
-            return config
-    @classmethod  # XXX could be turned into a regular method
-    def init_config(cls, config):
-        """configuration initialization hooks.
-        You may only want to override here the configuraton logic.
-        Otherwise, consider to use a different :class:`ApptestConfiguration`
-        defined in the `configcls` class attribute.
-        This method will be called by the database handler once the config has
-        been properly bootstrapped.
-        """
-        admincfg = config.default_admin_config
-        cls.admlogin = text_type(admincfg['login'])
-        cls.admpassword = admincfg['password']
-        # uncomment the line below if you want rql queries to be logged
-        # config.global_set_option('query-log-file',
-        #                          '/tmp/test_rql_log.' + `os.getpid()`)
-        config.global_set_option('log-file', None)
-        # set default-dest-addrs to a dumb email address to avoid mailbox or
-        # mail queue pollution
-        config.global_set_option('default-dest-addrs', ['whatever'])
-        send_to = '' % getlogin()
-        config.global_set_option('sender-addr', send_to)
-        config.global_set_option('default-dest-addrs', send_to)
-        config.global_set_option('sender-name', 'cubicweb-test')
-        config.global_set_option('sender-addr', '')
-        # default_base_url on config class isn't enough for TestServerConfiguration
-        config.global_set_option('base-url', config.default_base_url())
-        # web resources
-        try:
-            config.global_set_option('embed-allowed', re.compile('.*'))
-        except Exception:  # not in server only configuration
-            pass
-    @property
-    def vreg(self):
-        return self.repo.vreg
-    # global resources accessors ###############################################
-    @property
-    def schema(self):
-        """return the application schema"""
-        return self.vreg.schema
-    def set_option(self, optname, value):
-        self.config.global_set_option(optname, value)
-    def set_debug(self, debugmode):
-        server.set_debug(debugmode)
-    def debugged(self, debugmode):
-        return server.debugged(debugmode)
-    # default test setup and teardown #########################################
-    def setUp(self):
-        # monkey patch send mail operation so emails are sent synchronously
-        self._patch_SendMailOp()
-        with pause_trace():
-            previous_failure = self.__class__.__dict__.get('_repo_init_failed')
-            if previous_failure is not None:
-                self.skipTest('repository is not initialised: %r' % previous_failure)
-            try:
-                self._init_repo()
-            except Exception as ex:
-                self.__class__._repo_init_failed = ex
-                raise
-            self.addCleanup(self._close_access)
-        self.config.set_anonymous_allowed(self.anonymous_allowed)
-        self.setup_database()
-        MAILBOX[:] = []  # reset mailbox
-    def tearDown(self):
-        # XXX hack until logilab.common.testlib is fixed
-        if self._admin_session is not None:
-            self.repo.close(self._admin_session.sessionid)
-            self._admin_session = None
-        while self._cleanups:
-            cleanup, args, kwargs = self._cleanups.pop(-1)
-            cleanup(*args, **kwargs)
-        self.repo.turn_repo_off()
-    def _patch_SendMailOp(self):
-        # monkey patch send mail operation so emails are sent synchronously
-        _old_mail_postcommit_event = SendMailOp.postcommit_event
-        SendMailOp.postcommit_event = SendMailOp.sendmails
-        def reverse_SendMailOp_monkey_patch():
-            SendMailOp.postcommit_event = _old_mail_postcommit_event
-        self.addCleanup(reverse_SendMailOp_monkey_patch)
-    def setup_database(self):
-        """add your database setup code by overriding this method"""
-    @classmethod
-    def pre_setup_database(cls, cnx, config):
-        """add your pre database setup code by overriding this method
-        Do not forget to set the cls.test_db_id value to enable caching of the
-        result.
-        """
-    # user / session management ###############################################
-    @deprecated('[3.19] explicitly use RepoAccess object in test instead')
-    def user(self, req=None):
-        """return the application schema"""
-        if req is None:
-            return self.request().user
-        else:
-            return req.user
-    @iclassmethod  # XXX turn into a class method
-    def create_user(self, req, login=None, groups=('users',), password=None,
-                    email=None, commit=True, **kwargs):
-        """create and return a new user entity"""
-        if password is None:
-            password = login
-        if login is not None:
-            login = text_type(login)
-        user = req.create_entity('CWUser', login=login,
-                                 upassword=password, **kwargs)
-        req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
-                    % ','.join(repr(str(g)) for g in groups),
-                    {'x': user.eid})
-        if email is not None:
-            req.create_entity('EmailAddress', address=text_type(email),
-                              reverse_primary_email=user)
-        user.cw_clear_relation_cache('in_group', 'subject')
-        if commit:
-            try:
-                req.commit()  # req is a session
-            except AttributeError:
-                req.cnx.commit()
-        return user
-    # other utilities #########################################################
-    @contextmanager
-    def temporary_appobjects(self, *appobjects):
-        self.vreg._loadedmods.setdefault(self.__module__, {})
-        for obj in appobjects:
-            self.vreg.register(obj)
-            registered = getattr(obj, '__registered__', None)
-            if registered:
-                for registry in obj.__registries__:
-                    registered(self.vreg[registry])
-        try:
-            yield
-        finally:
-            for obj in appobjects:
-                self.vreg.unregister(obj)
-    @contextmanager
-    def temporary_permissions(self, *perm_overrides, **perm_kwoverrides):
-        """Set custom schema permissions within context.
-        There are two ways to call this method, which may be used together :
-        * using positional argument(s):
-          .. sourcecode:: python
-                rdef = self.schema['CWUser'].rdef('login')
-                with self.temporary_permissions((rdef, {'read': ()})):
-                    ...
-        * using named argument(s):
-          .. sourcecode:: python
-                with self.temporary_permissions(CWUser={'read': ()}):
-                    ...
-        Usually the former will be preferred to override permissions on a
-        relation definition, while the latter is well suited for entity types.
-        The allowed keys in the permission dictionary depend on the schema type
-        (entity type / relation definition). Resulting permissions will be
-        similar to `orig_permissions.update(partial_perms)`.
-        """
-        torestore = []
-        for erschema, etypeperms in chain(perm_overrides, perm_kwoverrides.items()):
-            if isinstance(erschema, string_types):
-                erschema = self.schema[erschema]
-            for action, actionperms in etypeperms.items():
-                origperms = erschema.permissions[action]
-                erschema.set_action_permissions(action, actionperms)
-                torestore.append([erschema, action, origperms])
-        try:
-            yield
-        finally:
-            for erschema, action, permissions in torestore:
-                if action is None:
-                    erschema.permissions = permissions
-                else:
-                    erschema.set_action_permissions(action, permissions)
-    def assertModificationDateGreater(self, entity, olddate):
-        entity.cw_attr_cache.pop('modification_date', None)
-        self.assertGreater(entity.modification_date, olddate)
-    def assertMessageEqual(self, req, params, expected_msg):
-        msg =[params['_cwmsgid']]
-        self.assertEqual(expected_msg, msg)
-    # workflow utilities #######################################################
-    def assertPossibleTransitions(self, entity, expected):
-        transitions = entity.cw_adapt_to('IWorkflowable').possible_transitions()
-        self.assertListEqual(sorted( for tr in transitions),
-                             sorted(expected))
-    # views and actions registries inspection ##################################
-    def pviews(self, req, rset):
-        return sorted((a.__regid__, a.__class__)
-                      for a in self.vreg['views'].possible_views(req, rset=rset))
-    def pactions(self, req, rset,
-                 skipcategories=('addrelated', 'siteactions', 'useractions',
-                                 'footer', 'manage')):
-        return [(a.__regid__, a.__class__)
-                for a in self.vreg['actions'].poss_visible_objects(req, rset=rset)
-                if a.category not in skipcategories]
-    def pactions_by_cats(self, req, rset, categories=('addrelated',)):
-        return [(a.__regid__, a.__class__)
-                for a in self.vreg['actions'].poss_visible_objects(req, rset=rset)
-                if a.category in categories]
-    def pactionsdict(self, req, rset,
-                     skipcategories=('addrelated', 'siteactions', 'useractions',
-                                     'footer', 'manage')):
-        res = {}
-        for a in self.vreg['actions'].poss_visible_objects(req, rset=rset):
-            if a.category not in skipcategories:
-                res.setdefault(a.category, []).append(a.__class__)
-        return res
-    def action_submenu(self, req, rset, id):
-        return self._test_action(self.vreg['actions'].select(id, req, rset=rset))
-    def _test_action(self, action):
-        class fake_menu(list):
-            @property
-            def items(self):
-                return self
-        class fake_box(object):
-            def action_link(self, action, **kwargs):
-                return (action.title, action.url())
-        submenu = fake_menu()
-        action.fill_menu(fake_box(), submenu)
-        return submenu
-    def list_views_for(self, rset):
-        """returns the list of views that can be applied on `rset`"""
-        req = rset.req
-        only_once_vids = ('primary', 'secondary', 'text')
-['ex'] = ValueError("whatever")
-        viewsvreg = self.vreg['views']
-        for vid, views in viewsvreg.items():
-            if vid[0] == '_':
-                continue
-            if rset.rowcount > 1 and vid in only_once_vids:
-                continue
-            views = [view for view in views
-                     if view.category != 'startupview'
-                     and not issubclass(view, notification.NotificationView)
-                     and not isinstance(view, class_deprecated)]
-            if views:
-                try:
-                    view = viewsvreg._select_best(views, req, rset=rset)
-                    if view is None:
-                        raise NoSelectableObject((req,), {'rset': rset}, views)
-                    if view.linkable():
-                        yield view
-                    else:
-                        not_selected(self.vreg, view)
-                    # else the view is expected to be used as subview and should
-                    # not be tested directly
-                except NoSelectableObject:
-                    continue
-    def list_actions_for(self, rset):
-        """returns the list of actions that can be applied on `rset`"""
-        req = rset.req
-        for action in self.vreg['actions'].possible_objects(req, rset=rset):
-            yield action
-    def list_boxes_for(self, rset):
-        """returns the list of boxes that can be applied on `rset`"""
-        req = rset.req
-        for box in self.vreg['ctxcomponents'].possible_objects(req, rset=rset):
-            yield box
-    def list_startup_views(self):
-        """returns the list of startup views"""
-        with self.admin_access.web_request() as req:
-            for view in self.vreg['views'].possible_views(req, None):
-                if view.category == 'startupview':
-                    yield view.__regid__
-                else:
-                    not_selected(self.vreg, view)
-    # web ui testing utilities #################################################
-    @property
-    @cached
-    def app(self):
-        """return a cubicweb publisher"""
-        publisher = application.CubicWebPublisher(self.repo, self.config)
-        def raise_error_handler(*args, **kwargs):
-            raise
-        publisher.error_handler = raise_error_handler
-        return publisher
-    @deprecated('[3.19] use the .remote_calling method')
-    def remote_call(self, fname, *args):
-        """remote json call simulation"""
-        dump = json.dumps
-        args = [dump(arg) for arg in args]
-        req = self.request(fname=fname, pageid='123', arg=args)
-        ctrl = self.vreg['controllers'].select('ajax', req)
-        return ctrl.publish(), req
-    @contextmanager
-    def remote_calling(self, fname, *args):
-        """remote json call simulation"""
-        args = [json.dumps(arg) for arg in args]
-        with self.admin_access.web_request(fname=fname, pageid='123', arg=args) as req:
-            ctrl = self.vreg['controllers'].select('ajax', req)
-            yield ctrl.publish(), req
-    def app_handle_request(self, req, path='view'):
-        return, path)
-    @deprecated("[3.15] app_handle_request is the new and better way"
-                " (beware of small semantic changes)")
-    def app_publish(self, *args, **kwargs):
-        return self.app_handle_request(*args, **kwargs)
-    def ctrl_publish(self, req, ctrl='edit', rset=None):
-        """call the publish method of the edit controller"""
-        ctrl = self.vreg['controllers'].select(ctrl, req,
-        try:
-            result = ctrl.publish(rset)
-            req.cnx.commit()
-        except web.Redirect:
-            req.cnx.commit()
-            raise
-        return result
-    @staticmethod
-    def fake_form(formid, field_dict=None, entity_field_dicts=()):
-        """Build _cw.form dictionnary to fake posting of some standard cubicweb form
-        * `formid`, the form id, usually form's __regid__
-        * `field_dict`, dictionary of name:value for fields that are not tied to an entity
-        * `entity_field_dicts`, list of (entity, dictionary) where dictionary contains name:value
-          for fields that are not tied to the given entity
-        """
-        assert field_dict or entity_field_dicts, \
-            'field_dict and entity_field_dicts arguments must not be both unspecified'
-        if field_dict is None:
-            field_dict = {}
-        form = {'__form_id': formid}
-        fields = []
-        for field, value in field_dict.items():
-            fields.append(field)
-            form[field] = value
-        def _add_entity_field(entity, field, value):
-            entity_fields.append(field)
-            form[eid_param(field, entity.eid)] = value
-        for entity, field_dict in entity_field_dicts:
-            if '__maineid' not in form:
-                form['__maineid'] = entity.eid
-            entity_fields = []
-            form.setdefault('eid', []).append(entity.eid)
-            _add_entity_field(entity, '__type', entity.cw_etype)
-            for field, value in field_dict.items():
-                _add_entity_field(entity, field, value)
-            if entity_fields:
-                form[eid_param('_cw_entity_fields', entity.eid)] = ','.join(entity_fields)
-        if fields:
-            form['_cw_fields'] = ','.join(sorted(fields))
-        return form
-    @deprecated('[3.19] use .admin_request_from_url instead')
-    def req_from_url(self, url):
-        """parses `url` and builds the corresponding CW-web request
-        req.form will be setup using the url's query string
-        """
-        req = self.request(url=url)
-        if isinstance(url, unicode):
-            url = url.encode(req.encoding)  # req.setup_params() expects encoded strings
-        querystring = urlparse(url)[-2]
-        params = parse_qs(querystring)
-        req.setup_params(params)
-        return req
-    @contextmanager
-    def admin_request_from_url(self, url):
-        """parses `url` and builds the corresponding CW-web request
-        req.form will be setup using the url's query string
-        """
-        with self.admin_access.web_request(url=url) as req:
-            if isinstance(url, unicode):
-                url = url.encode(req.encoding)  # req.setup_params() expects encoded strings
-            querystring = urlparse(url)[-2]
-            params = parse_qs(querystring)
-            req.setup_params(params)
-            yield req
-    def url_publish(self, url, data=None):
-        """takes `url`, uses application's app_resolver to find the appropriate
-        controller and result set, then publishes the result.
-        To simulate post of www-form-encoded data, give a `data` dictionary
-        containing desired key/value associations.
-        This should pretty much correspond to what occurs in a real CW server
-        except the apache-rewriter component is not called.
-        """
-        with self.admin_request_from_url(url) as req:
-            if data is not None:
-                req.form.update(data)
-            ctrlid, rset =, req.relative_path(False))
-            return self.ctrl_publish(req, ctrlid, rset)
-    def http_publish(self, url, data=None):
-        """like `url_publish`, except this returns a http response, even in case
-        of errors. You may give form parameters using the `data` argument.
-        """
-        with self.admin_request_from_url(url) as req:
-            if data is not None:
-                req.form.update(data)
-            with real_error_handling(
-                result = self.app_handle_request(req, req.relative_path(False))
-            return result, req
-    @staticmethod
-    def _parse_location(req, location):
-        try:
-            path, params = location.split('?', 1)
-        except ValueError:
-            path = location
-            params = {}
-        else:
-            cleanup = lambda p: (p[0], urlunquote(p[1]))
-            params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
-        if path.startswith(req.base_url()):  # may be relative
-            path = path[len(req.base_url()):]
-        return path, params
-    def expect_redirect(self, callback, req):
-        """call the given callback with req as argument, expecting to get a
-        Redirect exception
-        """
-        try:
-            callback(req)
-        except Redirect as ex:
-            return self._parse_location(req, ex.location)
-        else:
-  'expected a Redirect exception')
-    def expect_redirect_handle_request(self, req, path='edit'):
-        """call the publish method of the application publisher, expecting to
-        get a Redirect exception
-        """
-        self.app_handle_request(req, path)
-        self.assertTrue(300 <= req.status_out < 400, req.status_out)
-        location = req.get_response_header('location')
-        return self._parse_location(req, location)
-    @deprecated("[3.15] expect_redirect_handle_request is the new and better way"
-                " (beware of small semantic changes)")
-    def expect_redirect_publish(self, *args, **kwargs):
-        return self.expect_redirect_handle_request(*args, **kwargs)
-    def set_auth_mode(self, authmode, anonuser=None):
-        self.set_option('auth-mode', authmode)
-        self.set_option('anonymous-user', anonuser)
-        if anonuser is None:
-            self.config.anonymous_credential = None
-        else:
-            self.config.anonymous_credential = (anonuser, anonuser)
-    def init_authentication(self, authmode, anonuser=None):
-        self.set_auth_mode(authmode, anonuser)
-        req = self.requestcls(self.vreg, url='login')
-        sh =
-        authm = sh.session_manager.authmanager
-        authm.anoninfo = self.vreg.config.anonymous_user()
-        authm.anoninfo = authm.anoninfo[0], {'password': authm.anoninfo[1]}
-        # not properly cleaned between tests
-        self.open_sessions = sh.session_manager._sessions = {}
-        return req, self.session
-    def assertAuthSuccess(self, req, origsession, nbsessions=1):
-        sh =
-        session =
-        cnx = repoapi.Connection(session)
-        req.set_cnx(cnx)
-        self.assertEqual(len(self.open_sessions), nbsessions, self.open_sessions)
-        self.assertEqual(session.login, origsession.login)
-        self.assertEqual(session.anonymous_session, False)
-    def assertAuthFailure(self, req, nbsessions=0):
-        with self.assertRaises(AuthenticationError):
-        # +0 since we do not track the opened session
-        self.assertEqual(len(self.open_sessions), nbsessions)
-        clear_cache(req, 'get_authorization')
-    # content validation #######################################################
-    # validators are used to validate (XML, DTD, whatever) view's content
-    # validators availables are :
-    #  DTDValidator : validates XML + declared DTD
-    #  SaxOnlyValidator : guarantees XML is well formed
-    #  None : do not try to validate anything
-    # validators used must be imported from from.devtools.htmlparser
-    content_type_validators = {
-        # maps MIME type : validator name
-        #
-        # do not set html validators here, we need HTMLValidator for html
-        # snippets
-        # 'text/html': DTDValidator,
-        # 'application/xhtml+xml': DTDValidator,
-        'application/xml': htmlparser.XMLValidator,
-        'text/xml': htmlparser.XMLValidator,
-        'application/json': JsonValidator,
-        'text/plain': None,
-        'text/comma-separated-values': None,
-        'text/x-vcard': None,
-        'text/calendar': None,
-        'image/png': None,
-        }
-    # maps vid : validator name (override content_type_validators)
-    vid_validators = dict((vid, htmlparser.VALMAP[valkey])
-                          for vid, valkey in VIEW_VALIDATORS.items())
-    def view(self, vid, rset=None, req=None, template='main-template',
-             **kwargs):
-        """This method tests the view `vid` on `rset` using `template`
-        If no error occurred while rendering the view, the HTML is analyzed
-        and parsed.
-        :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo`
-                  encapsulation the generated HTML
-        """
-        if req is None:
-            if rset is None:
-                req = self.request()
-            else:
-                req = rset.req
-        req.form['vid'] = vid
-        viewsreg = self.vreg['views']
-        view =, req, rset=rset, **kwargs)
-        # set explicit test description
-        if rset is not None:
-            # coerce to "bytes" on py2 because the description will be sent to
-            # sys.stdout/stderr which takes "bytes" on py2 and "unicode" on py3
-            rql = str(rset.printable_rql())
-            self.set_description("testing vid=%s defined in %s with (%s)" % (
-                vid, view.__module__, rql))
-        else:
-            self.set_description("testing vid=%s defined in %s without rset" % (
-                vid, view.__module__))
-        if template is None:  # raw view testing, no template
-            viewfunc = view.render
-        else:
-            kwargs['view'] = view
-            viewfunc = lambda **k: viewsreg.main_template(req, template,
-                                                          rset=rset, **kwargs)
-        return self._test_view(viewfunc, view, template, kwargs)
-    def _test_view(self, viewfunc, view, template='main-template', kwargs={}):
-        """this method does the actual call to the view
-        If no error occurred while rendering the view, the HTML is analyzed
-        and parsed.
-        :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo`
-                  encapsulation the generated HTML
-        """
-        try:
-            output = viewfunc(**kwargs)
-        except Exception:
-            # hijack exception: generative tests stop when the exception
-            # is not an AssertionError
-            klass, exc, tcbk = sys.exc_info()
-            try:
-                msg = '[%s in %s] %s' % (klass, view.__regid__, exc)
-            except Exception:
-                msg = '[%s in %s] undisplayable exception' % (klass, view.__regid__)
-            exc = AssertionError(msg)
-            exc.__traceback__ = tcbk
-            raise exc
-        return self._check_html(output, view, template)
-    def get_validator(self, view=None, content_type=None, output=None):
-        if view is not None:
-            try:
-                return self.vid_validators[view.__regid__]()
-            except KeyError:
-                if content_type is None:
-                    content_type = view.content_type
-        if content_type is None:
-            content_type = 'text/html'
-        if content_type in ('text/html', 'application/xhtml+xml') and output:
-            if output.startswith(b'<!DOCTYPE html>'):
-                # only check XML well-formness since HTMLValidator isn't html5
-                # compatible and won't like various other extensions
-                default_validator = htmlparser.XMLSyntaxValidator
-            elif output.startswith(b'<?xml'):
-                default_validator = htmlparser.DTDValidator
-            else:
-                default_validator = htmlparser.HTMLValidator
-        else:
-            default_validator = None
-        validatorclass = self.content_type_validators.get(content_type,
-                                                          default_validator)
-        if validatorclass is None:
-            return
-        return validatorclass()
-    @nocoverage
-    def _check_html(self, output, view, template='main-template'):
-        """raises an exception if the HTML is invalid"""
-        output = output.strip()
-        if isinstance(output, text_type):
-            # XXX
-            output = output.encode('utf-8')
-        validator = self.get_validator(view, output=output)
-        if validator is None:
-            return output  # return raw output if no validator is defined
-        if isinstance(validator, htmlparser.DTDValidator):
-            # XXX remove <canvas> used in progress widget, unknown in html dtd
-            output = re.sub('<canvas.*?></canvas>', '', output)
-        return self.assertWellFormed(validator, output.strip(), context=view.__regid__)
-    def assertWellFormed(self, validator, content, context=None):
-        try:
-            return validator.parse_string(content)
-        except Exception:
-            # hijack exception: generative tests stop when the exception
-            # is not an AssertionError
-            klass, exc, tcbk = sys.exc_info()
-            if context is None:
-                msg = u'[%s]' % (klass,)
-            else:
-                msg = u'[%s in %s]' % (klass, context)
-            msg = msg.encode(sys.getdefaultencoding(), 'replace')
-            try:
-                str_exc = str(exc)
-            except Exception:
-                str_exc = 'undisplayable exception'
-            msg += str_exc.encode(sys.getdefaultencoding(), 'replace')
-            if content is not None:
-                position = getattr(exc, "position", (0,))[0]
-                if position:
-                    # define filter
-                    if isinstance(content, str):
-                        content = unicode(content, sys.getdefaultencoding(), 'replace')
-                    content = validator.preprocess_data(content)
-                    content = content.splitlines()
-                    width = int(log(len(content), 10)) + 1
-                    line_template = " %" + ("%i" % width) + "i: %s"
-                    # XXX no need to iterate the whole file except to get
-                    # the line number
-                    content = u'\n'.join(line_template % (idx + 1, line)
-                                         for idx, line in enumerate(content)
-                                         if line_context_filter(idx+1, position))
-                    msg += u'\nfor content:\n%s' % content
-            exc = AssertionError(msg)
-            exc.__traceback__ = tcbk
-            raise exc
-    def assertDocTestFile(self, testfile):
-        # doctest returns tuple (failure_count, test_count)
-        with as mih:
-            result = mih.process_script(testfile)
-        if result[0] and result[1]:
-            raise self.failureException("doctest file '%s' failed"
-                                        % testfile)
-    # notifications ############################################################
-    def assertSentEmail(self, subject, recipients=None, nb_msgs=None):
-        """test recipients in system mailbox for given email subject
-        :param subject: email subject to find in mailbox
-        :param recipients: list of email recipients
-        :param nb_msgs: expected number of entries
-        :returns: list of matched emails
-        """
-        messages = [email for email in MAILBOX
-                    if email.message.get('Subject') == subject]
-        if recipients is not None:
-            sent_to = set()
-            for msg in messages:
-                sent_to.update(msg.recipients)
-            self.assertSetEqual(set(recipients), sent_to)
-        if nb_msgs is not None:
-            self.assertEqual(len(MAILBOX), nb_msgs)
-        return messages
-# auto-populating test classes and utilities ###################################
-from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries
-# XXX cleanup unprotected_entities & all mess
-def how_many_dict(schema, cnx, how_many, skip):
-    """given a schema, compute how many entities by type we need to be able to
-    satisfy relations cardinality.
-    The `how_many` argument tells how many entities of which type we want at
-    least.
-    Return a dictionary with entity types as key, and the number of entities for
-    this type as value.
-    """
-    relmap = {}
-    for rschema in schema.relations():
-        if
-            continue
-        for subj, obj in rschema.rdefs:
-            card = rschema.rdef(subj, obj).cardinality
-            # if the relation is mandatory, we'll need at least as many subj and
-            # obj to satisfy it
-            if card[0] in '1+' and card[1] in '1?':
-                # subj has to be linked to at least one obj,
-                # but obj can be linked to only one subj
-                # -> we need at least as many subj as obj to satisfy
-                #    cardinalities for this relation
-                relmap.setdefault((rschema, subj), []).append(str(obj))
-            if card[1] in '1+' and card[0] in '1?':
-                # reverse subj and obj in the above explanation
-                relmap.setdefault((rschema, obj), []).append(str(subj))
-    unprotected = unprotected_entities(schema)
-    for etype in skip:  # XXX (syt) duh? explain or kill
-        unprotected.add(etype)
-    howmanydict = {}
-    # step 1, compute a base number of each entity types: number of already
-    # existing entities of this type + `how_many`
-    for etype in unprotected_entities(schema, strict=True):
-        howmanydict[str(etype)] = cnx.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0]
-        if etype in unprotected:
-            howmanydict[str(etype)] += how_many
-    # step 2, augment nb entity per types to satisfy cardinality constraints,
-    # by recomputing for each relation that constrained an entity type:
-    #
-    # new num for etype = max(current num, sum(num for possible target etypes))
-    #
-    # XXX we should first check there is no cycle then propagate changes
-    for (rschema, etype), targets in relmap.items():
-        relfactor = sum(howmanydict[e] for e in targets)
-        howmanydict[str(etype)] = max(relfactor, howmanydict[etype])
-    return howmanydict
-class AutoPopulateTest(CubicWebTC):
-    """base class for test with auto-populating of the database"""
-    __abstract__ = True
-    test_db_id = 'autopopulate'
-    tags = CubicWebTC.tags | Tags('autopopulated')
-    pdbclass = CubicWebDebugger
-    # this is a hook to be able to define a list of rql queries
-    # that are application dependent and cannot be guessed automatically
-    application_rql = []
-    no_auto_populate = ()
-    ignored_relations = set()
-    def to_test_etypes(self):
-        return unprotected_entities(self.schema, strict=True)
-    def custom_populate(self, how_many, cnx):
-        pass
-    def post_populate(self, cnx):
-        pass
-    @nocoverage
-    def auto_populate(self, how_many):
-        """this method populates the database with `how_many` entities
-        of each possible type. It also inserts random relations between them
-        """
-        with self.admin_access.cnx() as cnx:
-            with cnx.security_enabled(read=False, write=False):
-                self._auto_populate(cnx, how_many)
-                cnx.commit()
-    def _auto_populate(self, cnx, how_many):
-        self.custom_populate(how_many, cnx)
-        vreg = self.vreg
-        howmanydict = how_many_dict(self.schema, cnx, how_many, self.no_auto_populate)
-        for etype in unprotected_entities(self.schema):
-            if etype in self.no_auto_populate:
-                continue
-            nb = howmanydict.get(etype, how_many)
-            for rql, args in insert_entity_queries(etype, self.schema, vreg, nb):
-                cnx.execute(rql, args)
-        edict = {}
-        for etype in unprotected_entities(self.schema, strict=True):
-            rset = cnx.execute('%s X' % etype)
-            edict[str(etype)] = set(row[0] for row in rset.rows)
-        existingrels = {}
-        ignored_relations = SYSTEM_RELATIONS | self.ignored_relations
-        for rschema in self.schema.relations():
-            if or rschema in ignored_relations:
-                continue
-            rset = cnx.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema)
-            existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset)
-        q = make_relations_queries(self.schema, edict, cnx, ignored_relations,
-                                   existingrels=existingrels)
-        for rql, args in q:
-            try:
-                cnx.execute(rql, args)
-            except ValidationError as ex:
-                # failed to satisfy some constraint
-                print('error in automatic db population', ex)
-                cnx.commit_state = None  # reset uncommitable flag
-        self.post_populate(cnx)
-    def iter_individual_rsets(self, etypes=None, limit=None):
-        etypes = etypes or self.to_test_etypes()
-        with self.admin_access.web_request() as req:
-            for etype in etypes:
-                if limit:
-                    rql = 'Any X LIMIT %s WHERE X is %s' % (limit, etype)
-                else:
-                    rql = 'Any X WHERE X is %s' % etype
-                rset = req.execute(rql)
-                for row in range(len(rset)):
-                    if limit and row > limit:
-                        break
-                    # XXX iirk
-                    rset2 = rset.limit(limit=1, offset=row)
-                    yield rset2
-    def iter_automatic_rsets(self, limit=10):
-        """generates basic resultsets for each entity type"""
-        etypes = self.to_test_etypes()
-        if not etypes:
-            return
-        with self.admin_access.web_request() as req:
-            for etype in etypes:
-                yield req.execute('Any X LIMIT %s WHERE X is %s' % (limit, etype))
-            etype1 = etypes.pop()
-            try:
-                etype2 = etypes.pop()
-            except KeyError:
-                etype2 = etype1
-            # test a mixed query (DISTINCT/GROUP to avoid getting duplicate
-            # X which make muledit view failing for instance (html validation fails
-            # because of some duplicate "id" attributes)
-            yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' %
-                              (etype1, etype2))
-            # test some application-specific queries if defined
-            for rql in self.application_rql:
-                yield req.execute(rql)
-    def _test_everything_for(self, rset):
-        """this method tries to find everything that can be tested
-        for `rset` and yields a callable test (as needed in generative tests)
-        """
-        propdefs = self.vreg['propertydefs']
-        # make all components visible
-        for k, v in propdefs.items():
-            if k.endswith('visible') and not v['default']:
-                propdefs[k]['default'] = True
-        for view in self.list_views_for(rset):
-            backup_rset = rset.copy(rset.rows, rset.description)
-            yield InnerTest(self._testname(rset, view.__regid__, 'view'),
-                            self.view, view.__regid__, rset,
-                            rset.req.reset_headers(), 'main-template')
-            # We have to do this because some views modify the
-            # resultset's syntax tree
-            rset = backup_rset
-        for action in self.list_actions_for(rset):
-            yield InnerTest(self._testname(rset, action.__regid__, 'action'),
-                            self._test_action, action)
-        for box in self.list_boxes_for(rset):
-            w = [].append
-            yield InnerTest(self._testname(rset, box.__regid__, 'box'), box.render, w)
-    @staticmethod
-    def _testname(rset, objid, objtype):
-        return '%s_%s_%s' % ('_'.join(rset.column_types(0)), objid, objtype)
-# concrete class for automated application testing  ############################
-class AutomaticWebTest(AutoPopulateTest):
-    """import this if you wan automatic tests to be ran"""
-    tags = AutoPopulateTest.tags | Tags('web', 'generated')
-    def setUp(self):
-        if self.__class__ is AutomaticWebTest:
-            # Prevent direct use of AutomaticWebTest to avoid database caching
-            # issues.
-            return
-        super(AutomaticWebTest, self).setUp()
-        # access to for proper initialization of the authentication
-        # machinery (else some views may fail)
-    def test_one_each_config(self):
-        self.auto_populate(1)
-        for rset in self.iter_automatic_rsets(limit=1):
-            for testargs in self._test_everything_for(rset):
-                yield testargs
-    def test_ten_each_config(self):
-        self.auto_populate(10)
-        for rset in self.iter_automatic_rsets(limit=10):
-            for testargs in self._test_everything_for(rset):
-                yield testargs
-    def test_startup_views(self):
-        for vid in self.list_startup_views():
-            with self.admin_access.web_request() as req:
-                yield self.view, vid, None, req
-# registry instrumentization ###################################################
-def not_selected(vreg, appobject):
-    try:
-        vreg._selected[appobject.__class__] -= 1
-    except (KeyError, AttributeError):
-        pass
-# def vreg_instrumentize(testclass):
-#     # XXX broken
-#     from cubicweb.devtools.apptest import TestEnvironment
-#     env = testclass._env = TestEnvironment('data', configcls=testclass.configcls)
-#     for reg in env.vreg.values():
-#         reg._selected = {}
-#         try:
-#             orig_select_best = reg.__class__.__orig_select_best
-#         except Exception:
-#             orig_select_best = reg.__class__._select_best
-#         def instr_select_best(self, *args, **kwargs):
-#             selected = orig_select_best(self, *args, **kwargs)
-#             try:
-#                 self._selected[selected.__class__] += 1
-#             except KeyError:
-#                 self._selected[selected.__class__] = 1
-#             except AttributeError:
-#                 pass # occurs on reg used to restore database
-#             return selected
-#         reg.__class__._select_best = instr_select_best
-#         reg.__class__.__orig_select_best = orig_select_best
-# def print_untested_objects(testclass, skipregs=('hooks', 'etypes')):
-#     for regname, reg in testclass._env.vreg.items():
-#         if regname in skipregs:
-#             continue
-#         for appobjects in reg.values():
-#             for appobject in appobjects:
-#                 if not reg._selected.get(appobject):
-#                     print 'not tested', regname, appobject