diff -r 058bb3dc685f -r 0b59724cb3f2 devtools/testlib.py --- a/devtools/testlib.py Mon Jan 04 18:40:30 2016 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1335 +0,0 @@ -# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . -"""this module contains base classes and utilities for cubicweb tests""" -from __future__ import print_function - -import sys -import re -from os.path import dirname, join, abspath -from math import log -from contextlib import contextmanager -from itertools import chain - -from six import text_type, string_types -from six.moves import range -from six.moves.urllib.parse import urlparse, parse_qs, unquote as urlunquote - -import yams.schema - -from logilab.common.testlib import TestCase, InnerTest, Tags -from logilab.common.pytest import nocoverage, pause_trace -from logilab.common.debugger import Debugger -from logilab.common.umessage import message_from_string -from logilab.common.decorators import cached, classproperty, clear_cache, iclassmethod -from logilab.common.deprecation import deprecated, class_deprecated -from logilab.common.shellutils import getlogin - -from cubicweb import (ValidationError, NoSelectableObject, AuthenticationError, - BadConnectionId) -from cubicweb import cwconfig, devtools, web, server, repoapi -from cubicweb.utils import json -from cubicweb.sobjects import notification -from cubicweb.web import Redirect, application, eid_param -from cubicweb.server.hook import SendMailOp -from cubicweb.server.session import Session -from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS -from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID - - -# low-level utilities ########################################################## - -class CubicWebDebugger(Debugger): - """special debugger class providing a 'view' function which saves some - html into a temporary file and open a web browser to examinate it. - """ - def do_view(self, arg): - import webbrowser - data = self._getval(arg) - with open('/tmp/toto.html', 'w') as toto: - toto.write(data) - webbrowser.open('file:///tmp/toto.html') - - -def line_context_filter(line_no, center, before=3, after=None): - """return true if line are in context - - if after is None: after = before - """ - if after is None: - after = before - return center - before <= line_no <= center + after - - -def unprotected_entities(schema, strict=False): - """returned a set of each non final entity type, excluding "system" entities - (eg CWGroup, CWUser...) - """ - if strict: - protected_entities = yams.schema.BASE_TYPES - else: - protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES) - return set(schema.entities()) - protected_entities - - -class JsonValidator(object): - def parse_string(self, data): - return json.loads(data.decode('ascii')) - - -@contextmanager -def real_error_handling(app): - """By default, CubicWebTC `app` attribute (ie the publisher) is monkey - patched so that unexpected error are raised rather than going through the - `error_handler` method. - - By using this context manager you disable this monkey-patching temporarily. - Hence when publishihng a request no error will be raised, you'll get - req.status_out set to an HTTP error status code and the generated page will - usually hold a traceback as HTML. - - >>> with real_error_handling(app): - >>> page = app.handle_request(req) - """ - # remove the monkey patched error handler - fake_error_handler = app.error_handler - del app.error_handler - # return the app - yield app - # restore - app.error_handler = fake_error_handler - - -# email handling, to test emails sent by an application ######################## - -MAILBOX = [] - - -class Email(object): - """you'll get instances of Email into MAILBOX during tests that trigger - some notification. - - * `msg` is the original message object - - * `recipients` is a list of email address which are the recipients of this - message - """ - def __init__(self, fromaddr, recipients, msg): - self.fromaddr = fromaddr - self.recipients = recipients - self.msg = msg - - @property - def message(self): - return message_from_string(self.msg) - - @property - def subject(self): - return self.message.get('Subject') - - @property - def content(self): - return self.message.get_payload(decode=True) - - def __repr__(self): - return '' % (','.join(self.recipients), - self.message.get('Subject')) - - -# the trick to get email into MAILBOX instead of actually sent: monkey patch -# cwconfig.SMTP object -class MockSMTP: - - def __init__(self, server, port): - pass - - def close(self): - pass - - def sendmail(self, fromaddr, recipients, msg): - MAILBOX.append(Email(fromaddr, recipients, msg)) - -cwconfig.SMTP = MockSMTP - - -# Repoaccess utility ###############################################3########### - -class RepoAccess(object): - """An helper to easily create object to access the repo as a specific user - - Each RepoAccess have it own session. - - A repo access can create three type of object: - - .. automethod:: cubicweb.testlib.RepoAccess.cnx - .. automethod:: cubicweb.testlib.RepoAccess.web_request - - The RepoAccess need to be closed to destroy the associated Session. - TestCase usually take care of this aspect for the user. - - .. automethod:: cubicweb.testlib.RepoAccess.close - """ - - def __init__(self, repo, login, requestcls): - self._repo = repo - self._login = login - self.requestcls = requestcls - self._session = self._unsafe_connect(login) - - def _unsafe_connect(self, login, **kwargs): - """ a completely unsafe connect method for the tests """ - # use an internal connection - with self._repo.internal_cnx() as cnx: - # try to get a user object - user = cnx.find('CWUser', login=login).one() - user.groups - user.properties - user.login - session = Session(user, self._repo) - self._repo._sessions[session.sessionid] = session - user._cw = user.cw_rset.req = session - with session.new_cnx() as cnx: - self._repo.hm.call_hooks('session_open', cnx) - # commit connection at this point in case write operation has been - # done during `session_open` hooks - cnx.commit() - return session - - @contextmanager - def cnx(self): - """Context manager returning a server side connection for the user""" - with self._session.new_cnx() as cnx: - yield cnx - - # aliases for bw compat - client_cnx = repo_cnx = cnx - - @contextmanager - def web_request(self, url=None, headers={}, method='GET', **kwargs): - """Context manager returning a web request pre-linked to a client cnx - - To commit and rollback use:: - - req.cnx.commit() - req.cnx.rolback() - """ - req = self.requestcls(self._repo.vreg, url=url, headers=headers, - method=method, form=kwargs) - with self._session.new_cnx() as cnx: - req.set_cnx(cnx) - yield req - - def close(self): - """Close the session associated to the RepoAccess""" - if self._session is not None: - self._repo.close(self._session.sessionid) - self._session = None - - @contextmanager - def shell(self): - from cubicweb.server.migractions import ServerMigrationHelper - with self._session.new_cnx() as cnx: - mih = ServerMigrationHelper(None, repo=self._repo, cnx=cnx, - interactive=False, - # hack so it don't try to load fs schema - schema=1) - yield mih - cnx.commit() - - -# base class for cubicweb tests requiring a full cw environments ############### - -class CubicWebTC(TestCase): - """abstract class for test using an apptest environment - - attributes: - - * `vreg`, the vregistry - * `schema`, self.vreg.schema - * `config`, cubicweb configuration - * `cnx`, repoapi connection to the repository using an admin user - * `session`, server side session associated to `cnx` - * `app`, the cubicweb publisher (for web testing) - * `repo`, the repository object - * `admlogin`, login of the admin user - * `admpassword`, password of the admin user - * `shell`, create and use shell environment - * `anonymous_allowed`: flag telling if anonymous browsing should be allowed - """ - appid = 'data' - configcls = devtools.ApptestConfiguration - requestcls = fake.FakeRequest - tags = TestCase.tags | Tags('cubicweb', 'cw_repo') - test_db_id = DEFAULT_EMPTY_DB_ID - - # anonymous is logged by default in cubicweb test cases - anonymous_allowed = True - - def __init__(self, *args, **kwargs): - self._admin_session = None - self.repo = None - self._open_access = set() - super(CubicWebTC, self).__init__(*args, **kwargs) - - # repository connection handling ########################################### - - def new_access(self, login): - """provide a new RepoAccess object for a given user - - The access is automatically closed at the end of the test.""" - login = text_type(login) - access = RepoAccess(self.repo, login, self.requestcls) - self._open_access.add(access) - return access - - def _close_access(self): - while self._open_access: - try: - self._open_access.pop().close() - except BadConnectionId: - continue # already closed - - @property - def session(self): - """return admin session""" - return self._admin_session - - # XXX this doesn't need to a be classmethod anymore - def _init_repo(self): - """init the repository and connection to it. - """ - # get or restore and working db. - db_handler = devtools.get_test_db_handler(self.config, self.init_config) - db_handler.build_db_cache(self.test_db_id, self.pre_setup_database) - db_handler.restore_database(self.test_db_id) - self.repo = db_handler.get_repo(startup=True) - # get an admin session (without actual login) - login = text_type(db_handler.config.default_admin_config['login']) - self.admin_access = self.new_access(login) - self._admin_session = self.admin_access._session - - # config management ######################################################## - - @classproperty - def config(cls): - """return the configuration object - - Configuration is cached on the test class. - """ - if cls is CubicWebTC: - # Prevent direct use of CubicWebTC directly to avoid database - # caching issues - return None - try: - return cls.__dict__['_config'] - except KeyError: - home = abspath(join(dirname(sys.modules[cls.__module__].__file__), cls.appid)) - config = cls._config = cls.configcls(cls.appid, apphome=home) - config.mode = 'test' - return config - - @classmethod # XXX could be turned into a regular method - def init_config(cls, config): - """configuration initialization hooks. - - You may only want to override here the configuraton logic. - - Otherwise, consider to use a different :class:`ApptestConfiguration` - defined in the `configcls` class attribute. - - This method will be called by the database handler once the config has - been properly bootstrapped. - """ - admincfg = config.default_admin_config - cls.admlogin = text_type(admincfg['login']) - cls.admpassword = admincfg['password'] - # uncomment the line below if you want rql queries to be logged - # config.global_set_option('query-log-file', - # '/tmp/test_rql_log.' + `os.getpid()`) - config.global_set_option('log-file', None) - # set default-dest-addrs to a dumb email address to avoid mailbox or - # mail queue pollution - config.global_set_option('default-dest-addrs', ['whatever']) - send_to = '%s@logilab.fr' % getlogin() - config.global_set_option('sender-addr', send_to) - config.global_set_option('default-dest-addrs', send_to) - config.global_set_option('sender-name', 'cubicweb-test') - config.global_set_option('sender-addr', 'cubicweb-test@logilab.fr') - # default_base_url on config class isn't enough for TestServerConfiguration - config.global_set_option('base-url', config.default_base_url()) - # web resources - try: - config.global_set_option('embed-allowed', re.compile('.*')) - except Exception: # not in server only configuration - pass - - @property - def vreg(self): - return self.repo.vreg - - # global resources accessors ############################################### - - @property - def schema(self): - """return the application schema""" - return self.vreg.schema - - def set_option(self, optname, value): - self.config.global_set_option(optname, value) - - def set_debug(self, debugmode): - server.set_debug(debugmode) - - def debugged(self, debugmode): - return server.debugged(debugmode) - - # default test setup and teardown ######################################### - - def setUp(self): - # monkey patch send mail operation so emails are sent synchronously - self._patch_SendMailOp() - with pause_trace(): - previous_failure = self.__class__.__dict__.get('_repo_init_failed') - if previous_failure is not None: - self.skipTest('repository is not initialised: %r' % previous_failure) - try: - self._init_repo() - except Exception as ex: - self.__class__._repo_init_failed = ex - raise - self.addCleanup(self._close_access) - self.config.set_anonymous_allowed(self.anonymous_allowed) - self.setup_database() - MAILBOX[:] = [] # reset mailbox - - def tearDown(self): - # XXX hack until logilab.common.testlib is fixed - if self._admin_session is not None: - self.repo.close(self._admin_session.sessionid) - self._admin_session = None - while self._cleanups: - cleanup, args, kwargs = self._cleanups.pop(-1) - cleanup(*args, **kwargs) - self.repo.turn_repo_off() - - def _patch_SendMailOp(self): - # monkey patch send mail operation so emails are sent synchronously - _old_mail_postcommit_event = SendMailOp.postcommit_event - SendMailOp.postcommit_event = SendMailOp.sendmails - - def reverse_SendMailOp_monkey_patch(): - SendMailOp.postcommit_event = _old_mail_postcommit_event - - self.addCleanup(reverse_SendMailOp_monkey_patch) - - def setup_database(self): - """add your database setup code by overriding this method""" - - @classmethod - def pre_setup_database(cls, cnx, config): - """add your pre database setup code by overriding this method - - Do not forget to set the cls.test_db_id value to enable caching of the - result. - """ - - # user / session management ############################################### - - @deprecated('[3.19] explicitly use RepoAccess object in test instead') - def user(self, req=None): - """return the application schema""" - if req is None: - return self.request().user - else: - return req.user - - @iclassmethod # XXX turn into a class method - def create_user(self, req, login=None, groups=('users',), password=None, - email=None, commit=True, **kwargs): - """create and return a new user entity""" - if password is None: - password = login - if login is not None: - login = text_type(login) - user = req.create_entity('CWUser', login=login, - upassword=password, **kwargs) - req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)' - % ','.join(repr(str(g)) for g in groups), - {'x': user.eid}) - if email is not None: - req.create_entity('EmailAddress', address=text_type(email), - reverse_primary_email=user) - user.cw_clear_relation_cache('in_group', 'subject') - if commit: - try: - req.commit() # req is a session - except AttributeError: - req.cnx.commit() - return user - - # other utilities ######################################################### - - @contextmanager - def temporary_appobjects(self, *appobjects): - self.vreg._loadedmods.setdefault(self.__module__, {}) - for obj in appobjects: - self.vreg.register(obj) - registered = getattr(obj, '__registered__', None) - if registered: - for registry in obj.__registries__: - registered(self.vreg[registry]) - try: - yield - finally: - for obj in appobjects: - self.vreg.unregister(obj) - - @contextmanager - def temporary_permissions(self, *perm_overrides, **perm_kwoverrides): - """Set custom schema permissions within context. - - There are two ways to call this method, which may be used together : - - * using positional argument(s): - - .. sourcecode:: python - - rdef = self.schema['CWUser'].rdef('login') - with self.temporary_permissions((rdef, {'read': ()})): - ... - - - * using named argument(s): - - .. sourcecode:: python - - with self.temporary_permissions(CWUser={'read': ()}): - ... - - Usually the former will be preferred to override permissions on a - relation definition, while the latter is well suited for entity types. - - The allowed keys in the permission dictionary depend on the schema type - (entity type / relation definition). Resulting permissions will be - similar to `orig_permissions.update(partial_perms)`. - """ - torestore = [] - for erschema, etypeperms in chain(perm_overrides, perm_kwoverrides.items()): - if isinstance(erschema, string_types): - erschema = self.schema[erschema] - for action, actionperms in etypeperms.items(): - origperms = erschema.permissions[action] - erschema.set_action_permissions(action, actionperms) - torestore.append([erschema, action, origperms]) - try: - yield - finally: - for erschema, action, permissions in torestore: - if action is None: - erschema.permissions = permissions - else: - erschema.set_action_permissions(action, permissions) - - def assertModificationDateGreater(self, entity, olddate): - entity.cw_attr_cache.pop('modification_date', None) - self.assertGreater(entity.modification_date, olddate) - - def assertMessageEqual(self, req, params, expected_msg): - msg = req.session.data[params['_cwmsgid']] - self.assertEqual(expected_msg, msg) - - # workflow utilities ####################################################### - - def assertPossibleTransitions(self, entity, expected): - transitions = entity.cw_adapt_to('IWorkflowable').possible_transitions() - self.assertListEqual(sorted(tr.name for tr in transitions), - sorted(expected)) - - # views and actions registries inspection ################################## - - def pviews(self, req, rset): - return sorted((a.__regid__, a.__class__) - for a in self.vreg['views'].possible_views(req, rset=rset)) - - def pactions(self, req, rset, - skipcategories=('addrelated', 'siteactions', 'useractions', - 'footer', 'manage')): - return [(a.__regid__, a.__class__) - for a in self.vreg['actions'].poss_visible_objects(req, rset=rset) - if a.category not in skipcategories] - - def pactions_by_cats(self, req, rset, categories=('addrelated',)): - return [(a.__regid__, a.__class__) - for a in self.vreg['actions'].poss_visible_objects(req, rset=rset) - if a.category in categories] - - def pactionsdict(self, req, rset, - skipcategories=('addrelated', 'siteactions', 'useractions', - 'footer', 'manage')): - res = {} - for a in self.vreg['actions'].poss_visible_objects(req, rset=rset): - if a.category not in skipcategories: - res.setdefault(a.category, []).append(a.__class__) - return res - - def action_submenu(self, req, rset, id): - return self._test_action(self.vreg['actions'].select(id, req, rset=rset)) - - def _test_action(self, action): - class fake_menu(list): - @property - def items(self): - return self - - class fake_box(object): - def action_link(self, action, **kwargs): - return (action.title, action.url()) - submenu = fake_menu() - action.fill_menu(fake_box(), submenu) - return submenu - - def list_views_for(self, rset): - """returns the list of views that can be applied on `rset`""" - req = rset.req - only_once_vids = ('primary', 'secondary', 'text') - req.data['ex'] = ValueError("whatever") - viewsvreg = self.vreg['views'] - for vid, views in viewsvreg.items(): - if vid[0] == '_': - continue - if rset.rowcount > 1 and vid in only_once_vids: - continue - views = [view for view in views - if view.category != 'startupview' - and not issubclass(view, notification.NotificationView) - and not isinstance(view, class_deprecated)] - if views: - try: - view = viewsvreg._select_best(views, req, rset=rset) - if view is None: - raise NoSelectableObject((req,), {'rset': rset}, views) - if view.linkable(): - yield view - else: - not_selected(self.vreg, view) - # else the view is expected to be used as subview and should - # not be tested directly - except NoSelectableObject: - continue - - def list_actions_for(self, rset): - """returns the list of actions that can be applied on `rset`""" - req = rset.req - for action in self.vreg['actions'].possible_objects(req, rset=rset): - yield action - - def list_boxes_for(self, rset): - """returns the list of boxes that can be applied on `rset`""" - req = rset.req - for box in self.vreg['ctxcomponents'].possible_objects(req, rset=rset): - yield box - - def list_startup_views(self): - """returns the list of startup views""" - with self.admin_access.web_request() as req: - for view in self.vreg['views'].possible_views(req, None): - if view.category == 'startupview': - yield view.__regid__ - else: - not_selected(self.vreg, view) - - # web ui testing utilities ################################################# - - @property - @cached - def app(self): - """return a cubicweb publisher""" - publisher = application.CubicWebPublisher(self.repo, self.config) - - def raise_error_handler(*args, **kwargs): - raise - - publisher.error_handler = raise_error_handler - return publisher - - @deprecated('[3.19] use the .remote_calling method') - def remote_call(self, fname, *args): - """remote json call simulation""" - dump = json.dumps - args = [dump(arg) for arg in args] - req = self.request(fname=fname, pageid='123', arg=args) - ctrl = self.vreg['controllers'].select('ajax', req) - return ctrl.publish(), req - - @contextmanager - def remote_calling(self, fname, *args): - """remote json call simulation""" - args = [json.dumps(arg) for arg in args] - with self.admin_access.web_request(fname=fname, pageid='123', arg=args) as req: - ctrl = self.vreg['controllers'].select('ajax', req) - yield ctrl.publish(), req - - def app_handle_request(self, req, path='view'): - return self.app.core_handle(req, path) - - @deprecated("[3.15] app_handle_request is the new and better way" - " (beware of small semantic changes)") - def app_publish(self, *args, **kwargs): - return self.app_handle_request(*args, **kwargs) - - def ctrl_publish(self, req, ctrl='edit', rset=None): - """call the publish method of the edit controller""" - ctrl = self.vreg['controllers'].select(ctrl, req, appli=self.app) - try: - result = ctrl.publish(rset) - req.cnx.commit() - except web.Redirect: - req.cnx.commit() - raise - return result - - @staticmethod - def fake_form(formid, field_dict=None, entity_field_dicts=()): - """Build _cw.form dictionnary to fake posting of some standard cubicweb form - - * `formid`, the form id, usually form's __regid__ - - * `field_dict`, dictionary of name:value for fields that are not tied to an entity - - * `entity_field_dicts`, list of (entity, dictionary) where dictionary contains name:value - for fields that are not tied to the given entity - """ - assert field_dict or entity_field_dicts, \ - 'field_dict and entity_field_dicts arguments must not be both unspecified' - if field_dict is None: - field_dict = {} - form = {'__form_id': formid} - fields = [] - for field, value in field_dict.items(): - fields.append(field) - form[field] = value - - def _add_entity_field(entity, field, value): - entity_fields.append(field) - form[eid_param(field, entity.eid)] = value - - for entity, field_dict in entity_field_dicts: - if '__maineid' not in form: - form['__maineid'] = entity.eid - entity_fields = [] - form.setdefault('eid', []).append(entity.eid) - _add_entity_field(entity, '__type', entity.cw_etype) - for field, value in field_dict.items(): - _add_entity_field(entity, field, value) - if entity_fields: - form[eid_param('_cw_entity_fields', entity.eid)] = ','.join(entity_fields) - if fields: - form['_cw_fields'] = ','.join(sorted(fields)) - return form - - @deprecated('[3.19] use .admin_request_from_url instead') - def req_from_url(self, url): - """parses `url` and builds the corresponding CW-web request - - req.form will be setup using the url's query string - """ - req = self.request(url=url) - if isinstance(url, unicode): - url = url.encode(req.encoding) # req.setup_params() expects encoded strings - querystring = urlparse(url)[-2] - params = parse_qs(querystring) - req.setup_params(params) - return req - - @contextmanager - def admin_request_from_url(self, url): - """parses `url` and builds the corresponding CW-web request - - req.form will be setup using the url's query string - """ - with self.admin_access.web_request(url=url) as req: - if isinstance(url, unicode): - url = url.encode(req.encoding) # req.setup_params() expects encoded strings - querystring = urlparse(url)[-2] - params = parse_qs(querystring) - req.setup_params(params) - yield req - - def url_publish(self, url, data=None): - """takes `url`, uses application's app_resolver to find the appropriate - controller and result set, then publishes the result. - - To simulate post of www-form-encoded data, give a `data` dictionary - containing desired key/value associations. - - This should pretty much correspond to what occurs in a real CW server - except the apache-rewriter component is not called. - """ - with self.admin_request_from_url(url) as req: - if data is not None: - req.form.update(data) - ctrlid, rset = self.app.url_resolver.process(req, req.relative_path(False)) - return self.ctrl_publish(req, ctrlid, rset) - - def http_publish(self, url, data=None): - """like `url_publish`, except this returns a http response, even in case - of errors. You may give form parameters using the `data` argument. - """ - with self.admin_request_from_url(url) as req: - if data is not None: - req.form.update(data) - with real_error_handling(self.app): - result = self.app_handle_request(req, req.relative_path(False)) - return result, req - - @staticmethod - def _parse_location(req, location): - try: - path, params = location.split('?', 1) - except ValueError: - path = location - params = {} - else: - cleanup = lambda p: (p[0], urlunquote(p[1])) - params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p) - if path.startswith(req.base_url()): # may be relative - path = path[len(req.base_url()):] - return path, params - - def expect_redirect(self, callback, req): - """call the given callback with req as argument, expecting to get a - Redirect exception - """ - try: - callback(req) - except Redirect as ex: - return self._parse_location(req, ex.location) - else: - self.fail('expected a Redirect exception') - - def expect_redirect_handle_request(self, req, path='edit'): - """call the publish method of the application publisher, expecting to - get a Redirect exception - """ - self.app_handle_request(req, path) - self.assertTrue(300 <= req.status_out < 400, req.status_out) - location = req.get_response_header('location') - return self._parse_location(req, location) - - @deprecated("[3.15] expect_redirect_handle_request is the new and better way" - " (beware of small semantic changes)") - def expect_redirect_publish(self, *args, **kwargs): - return self.expect_redirect_handle_request(*args, **kwargs) - - def set_auth_mode(self, authmode, anonuser=None): - self.set_option('auth-mode', authmode) - self.set_option('anonymous-user', anonuser) - if anonuser is None: - self.config.anonymous_credential = None - else: - self.config.anonymous_credential = (anonuser, anonuser) - - def init_authentication(self, authmode, anonuser=None): - self.set_auth_mode(authmode, anonuser) - req = self.requestcls(self.vreg, url='login') - sh = self.app.session_handler - authm = sh.session_manager.authmanager - authm.anoninfo = self.vreg.config.anonymous_user() - authm.anoninfo = authm.anoninfo[0], {'password': authm.anoninfo[1]} - # not properly cleaned between tests - self.open_sessions = sh.session_manager._sessions = {} - return req, self.session - - def assertAuthSuccess(self, req, origsession, nbsessions=1): - sh = self.app.session_handler - session = self.app.get_session(req) - cnx = repoapi.Connection(session) - req.set_cnx(cnx) - self.assertEqual(len(self.open_sessions), nbsessions, self.open_sessions) - self.assertEqual(session.login, origsession.login) - self.assertEqual(session.anonymous_session, False) - - def assertAuthFailure(self, req, nbsessions=0): - with self.assertRaises(AuthenticationError): - self.app.get_session(req) - # +0 since we do not track the opened session - self.assertEqual(len(self.open_sessions), nbsessions) - clear_cache(req, 'get_authorization') - - # content validation ####################################################### - - # validators are used to validate (XML, DTD, whatever) view's content - # validators availables are : - # DTDValidator : validates XML + declared DTD - # SaxOnlyValidator : guarantees XML is well formed - # None : do not try to validate anything - # validators used must be imported from from.devtools.htmlparser - content_type_validators = { - # maps MIME type : validator name - # - # do not set html validators here, we need HTMLValidator for html - # snippets - # 'text/html': DTDValidator, - # 'application/xhtml+xml': DTDValidator, - 'application/xml': htmlparser.XMLValidator, - 'text/xml': htmlparser.XMLValidator, - 'application/json': JsonValidator, - 'text/plain': None, - 'text/comma-separated-values': None, - 'text/x-vcard': None, - 'text/calendar': None, - 'image/png': None, - } - # maps vid : validator name (override content_type_validators) - vid_validators = dict((vid, htmlparser.VALMAP[valkey]) - for vid, valkey in VIEW_VALIDATORS.items()) - - def view(self, vid, rset=None, req=None, template='main-template', - **kwargs): - """This method tests the view `vid` on `rset` using `template` - - If no error occurred while rendering the view, the HTML is analyzed - and parsed. - - :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo` - encapsulation the generated HTML - """ - if req is None: - if rset is None: - req = self.request() - else: - req = rset.req - req.form['vid'] = vid - viewsreg = self.vreg['views'] - view = viewsreg.select(vid, req, rset=rset, **kwargs) - # set explicit test description - if rset is not None: - # coerce to "bytes" on py2 because the description will be sent to - # sys.stdout/stderr which takes "bytes" on py2 and "unicode" on py3 - rql = str(rset.printable_rql()) - self.set_description("testing vid=%s defined in %s with (%s)" % ( - vid, view.__module__, rql)) - else: - self.set_description("testing vid=%s defined in %s without rset" % ( - vid, view.__module__)) - if template is None: # raw view testing, no template - viewfunc = view.render - else: - kwargs['view'] = view - viewfunc = lambda **k: viewsreg.main_template(req, template, - rset=rset, **kwargs) - return self._test_view(viewfunc, view, template, kwargs) - - def _test_view(self, viewfunc, view, template='main-template', kwargs={}): - """this method does the actual call to the view - - If no error occurred while rendering the view, the HTML is analyzed - and parsed. - - :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo` - encapsulation the generated HTML - """ - try: - output = viewfunc(**kwargs) - except Exception: - # hijack exception: generative tests stop when the exception - # is not an AssertionError - klass, exc, tcbk = sys.exc_info() - try: - msg = '[%s in %s] %s' % (klass, view.__regid__, exc) - except Exception: - msg = '[%s in %s] undisplayable exception' % (klass, view.__regid__) - exc = AssertionError(msg) - exc.__traceback__ = tcbk - raise exc - return self._check_html(output, view, template) - - def get_validator(self, view=None, content_type=None, output=None): - if view is not None: - try: - return self.vid_validators[view.__regid__]() - except KeyError: - if content_type is None: - content_type = view.content_type - if content_type is None: - content_type = 'text/html' - if content_type in ('text/html', 'application/xhtml+xml') and output: - if output.startswith(b''): - # only check XML well-formness since HTMLValidator isn't html5 - # compatible and won't like various other extensions - default_validator = htmlparser.XMLSyntaxValidator - elif output.startswith(b' used in progress widget, unknown in html dtd - output = re.sub('', '', output) - return self.assertWellFormed(validator, output.strip(), context=view.__regid__) - - def assertWellFormed(self, validator, content, context=None): - try: - return validator.parse_string(content) - except Exception: - # hijack exception: generative tests stop when the exception - # is not an AssertionError - klass, exc, tcbk = sys.exc_info() - if context is None: - msg = u'[%s]' % (klass,) - else: - msg = u'[%s in %s]' % (klass, context) - msg = msg.encode(sys.getdefaultencoding(), 'replace') - - try: - str_exc = str(exc) - except Exception: - str_exc = 'undisplayable exception' - msg += str_exc.encode(sys.getdefaultencoding(), 'replace') - if content is not None: - position = getattr(exc, "position", (0,))[0] - if position: - # define filter - if isinstance(content, str): - content = unicode(content, sys.getdefaultencoding(), 'replace') - content = validator.preprocess_data(content) - content = content.splitlines() - width = int(log(len(content), 10)) + 1 - line_template = " %" + ("%i" % width) + "i: %s" - # XXX no need to iterate the whole file except to get - # the line number - content = u'\n'.join(line_template % (idx + 1, line) - for idx, line in enumerate(content) - if line_context_filter(idx+1, position)) - msg += u'\nfor content:\n%s' % content - exc = AssertionError(msg) - exc.__traceback__ = tcbk - raise exc - - def assertDocTestFile(self, testfile): - # doctest returns tuple (failure_count, test_count) - with self.admin_access.shell() as mih: - result = mih.process_script(testfile) - if result[0] and result[1]: - raise self.failureException("doctest file '%s' failed" - % testfile) - - # notifications ############################################################ - - def assertSentEmail(self, subject, recipients=None, nb_msgs=None): - """test recipients in system mailbox for given email subject - - :param subject: email subject to find in mailbox - :param recipients: list of email recipients - :param nb_msgs: expected number of entries - :returns: list of matched emails - """ - messages = [email for email in MAILBOX - if email.message.get('Subject') == subject] - if recipients is not None: - sent_to = set() - for msg in messages: - sent_to.update(msg.recipients) - self.assertSetEqual(set(recipients), sent_to) - if nb_msgs is not None: - self.assertEqual(len(MAILBOX), nb_msgs) - return messages - - -# auto-populating test classes and utilities ################################### - -from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries - -# XXX cleanup unprotected_entities & all mess - - -def how_many_dict(schema, cnx, how_many, skip): - """given a schema, compute how many entities by type we need to be able to - satisfy relations cardinality. - - The `how_many` argument tells how many entities of which type we want at - least. - - Return a dictionary with entity types as key, and the number of entities for - this type as value. - """ - relmap = {} - for rschema in schema.relations(): - if rschema.final: - continue - for subj, obj in rschema.rdefs: - card = rschema.rdef(subj, obj).cardinality - # if the relation is mandatory, we'll need at least as many subj and - # obj to satisfy it - if card[0] in '1+' and card[1] in '1?': - # subj has to be linked to at least one obj, - # but obj can be linked to only one subj - # -> we need at least as many subj as obj to satisfy - # cardinalities for this relation - relmap.setdefault((rschema, subj), []).append(str(obj)) - if card[1] in '1+' and card[0] in '1?': - # reverse subj and obj in the above explanation - relmap.setdefault((rschema, obj), []).append(str(subj)) - unprotected = unprotected_entities(schema) - for etype in skip: # XXX (syt) duh? explain or kill - unprotected.add(etype) - howmanydict = {} - # step 1, compute a base number of each entity types: number of already - # existing entities of this type + `how_many` - for etype in unprotected_entities(schema, strict=True): - howmanydict[str(etype)] = cnx.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0] - if etype in unprotected: - howmanydict[str(etype)] += how_many - # step 2, augment nb entity per types to satisfy cardinality constraints, - # by recomputing for each relation that constrained an entity type: - # - # new num for etype = max(current num, sum(num for possible target etypes)) - # - # XXX we should first check there is no cycle then propagate changes - for (rschema, etype), targets in relmap.items(): - relfactor = sum(howmanydict[e] for e in targets) - howmanydict[str(etype)] = max(relfactor, howmanydict[etype]) - return howmanydict - - -class AutoPopulateTest(CubicWebTC): - """base class for test with auto-populating of the database""" - __abstract__ = True - - test_db_id = 'autopopulate' - - tags = CubicWebTC.tags | Tags('autopopulated') - - pdbclass = CubicWebDebugger - # this is a hook to be able to define a list of rql queries - # that are application dependent and cannot be guessed automatically - application_rql = [] - - no_auto_populate = () - ignored_relations = set() - - def to_test_etypes(self): - return unprotected_entities(self.schema, strict=True) - - def custom_populate(self, how_many, cnx): - pass - - def post_populate(self, cnx): - pass - - @nocoverage - def auto_populate(self, how_many): - """this method populates the database with `how_many` entities - of each possible type. It also inserts random relations between them - """ - with self.admin_access.cnx() as cnx: - with cnx.security_enabled(read=False, write=False): - self._auto_populate(cnx, how_many) - cnx.commit() - - def _auto_populate(self, cnx, how_many): - self.custom_populate(how_many, cnx) - vreg = self.vreg - howmanydict = how_many_dict(self.schema, cnx, how_many, self.no_auto_populate) - for etype in unprotected_entities(self.schema): - if etype in self.no_auto_populate: - continue - nb = howmanydict.get(etype, how_many) - for rql, args in insert_entity_queries(etype, self.schema, vreg, nb): - cnx.execute(rql, args) - edict = {} - for etype in unprotected_entities(self.schema, strict=True): - rset = cnx.execute('%s X' % etype) - edict[str(etype)] = set(row[0] for row in rset.rows) - existingrels = {} - ignored_relations = SYSTEM_RELATIONS | self.ignored_relations - for rschema in self.schema.relations(): - if rschema.final or rschema in ignored_relations: - continue - rset = cnx.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema) - existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset) - q = make_relations_queries(self.schema, edict, cnx, ignored_relations, - existingrels=existingrels) - for rql, args in q: - try: - cnx.execute(rql, args) - except ValidationError as ex: - # failed to satisfy some constraint - print('error in automatic db population', ex) - cnx.commit_state = None # reset uncommitable flag - self.post_populate(cnx) - - def iter_individual_rsets(self, etypes=None, limit=None): - etypes = etypes or self.to_test_etypes() - with self.admin_access.web_request() as req: - for etype in etypes: - if limit: - rql = 'Any X LIMIT %s WHERE X is %s' % (limit, etype) - else: - rql = 'Any X WHERE X is %s' % etype - rset = req.execute(rql) - for row in range(len(rset)): - if limit and row > limit: - break - # XXX iirk - rset2 = rset.limit(limit=1, offset=row) - yield rset2 - - def iter_automatic_rsets(self, limit=10): - """generates basic resultsets for each entity type""" - etypes = self.to_test_etypes() - if not etypes: - return - with self.admin_access.web_request() as req: - for etype in etypes: - yield req.execute('Any X LIMIT %s WHERE X is %s' % (limit, etype)) - etype1 = etypes.pop() - try: - etype2 = etypes.pop() - except KeyError: - etype2 = etype1 - # test a mixed query (DISTINCT/GROUP to avoid getting duplicate - # X which make muledit view failing for instance (html validation fails - # because of some duplicate "id" attributes) - yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' % - (etype1, etype2)) - # test some application-specific queries if defined - for rql in self.application_rql: - yield req.execute(rql) - - def _test_everything_for(self, rset): - """this method tries to find everything that can be tested - for `rset` and yields a callable test (as needed in generative tests) - """ - propdefs = self.vreg['propertydefs'] - # make all components visible - for k, v in propdefs.items(): - if k.endswith('visible') and not v['default']: - propdefs[k]['default'] = True - for view in self.list_views_for(rset): - backup_rset = rset.copy(rset.rows, rset.description) - yield InnerTest(self._testname(rset, view.__regid__, 'view'), - self.view, view.__regid__, rset, - rset.req.reset_headers(), 'main-template') - # We have to do this because some views modify the - # resultset's syntax tree - rset = backup_rset - for action in self.list_actions_for(rset): - yield InnerTest(self._testname(rset, action.__regid__, 'action'), - self._test_action, action) - for box in self.list_boxes_for(rset): - w = [].append - yield InnerTest(self._testname(rset, box.__regid__, 'box'), box.render, w) - - @staticmethod - def _testname(rset, objid, objtype): - return '%s_%s_%s' % ('_'.join(rset.column_types(0)), objid, objtype) - - -# concrete class for automated application testing ############################ - -class AutomaticWebTest(AutoPopulateTest): - """import this if you wan automatic tests to be ran""" - - tags = AutoPopulateTest.tags | Tags('web', 'generated') - - def setUp(self): - if self.__class__ is AutomaticWebTest: - # Prevent direct use of AutomaticWebTest to avoid database caching - # issues. - return - super(AutomaticWebTest, self).setUp() - - # access to self.app for proper initialization of the authentication - # machinery (else some views may fail) - self.app - - def test_one_each_config(self): - self.auto_populate(1) - for rset in self.iter_automatic_rsets(limit=1): - for testargs in self._test_everything_for(rset): - yield testargs - - def test_ten_each_config(self): - self.auto_populate(10) - for rset in self.iter_automatic_rsets(limit=10): - for testargs in self._test_everything_for(rset): - yield testargs - - def test_startup_views(self): - for vid in self.list_startup_views(): - with self.admin_access.web_request() as req: - yield self.view, vid, None, req - - -# registry instrumentization ################################################### - -def not_selected(vreg, appobject): - try: - vreg._selected[appobject.__class__] -= 1 - except (KeyError, AttributeError): - pass - - -# def vreg_instrumentize(testclass): -# # XXX broken -# from cubicweb.devtools.apptest import TestEnvironment -# env = testclass._env = TestEnvironment('data', configcls=testclass.configcls) -# for reg in env.vreg.values(): -# reg._selected = {} -# try: -# orig_select_best = reg.__class__.__orig_select_best -# except Exception: -# orig_select_best = reg.__class__._select_best -# def instr_select_best(self, *args, **kwargs): -# selected = orig_select_best(self, *args, **kwargs) -# try: -# self._selected[selected.__class__] += 1 -# except KeyError: -# self._selected[selected.__class__] = 1 -# except AttributeError: -# pass # occurs on reg used to restore database -# return selected -# reg.__class__._select_best = instr_select_best -# reg.__class__.__orig_select_best = orig_select_best - - -# def print_untested_objects(testclass, skipregs=('hooks', 'etypes')): -# for regname, reg in testclass._env.vreg.items(): -# if regname in skipregs: -# continue -# for appobjects in reg.values(): -# for appobject in appobjects: -# if not reg._selected.get(appobject): -# print 'not tested', regname, appobject