diff -r 058bb3dc685f -r 0b59724cb3f2 cubicweb/devtools/testlib.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cubicweb/devtools/testlib.py Sat Jan 16 13:48:51 2016 +0100 @@ -0,0 +1,1335 @@ +# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . +"""this module contains base classes and utilities for cubicweb tests""" +from __future__ import print_function + +import sys +import re +from os.path import dirname, join, abspath +from math import log +from contextlib import contextmanager +from itertools import chain + +from six import text_type, string_types +from six.moves import range +from six.moves.urllib.parse import urlparse, parse_qs, unquote as urlunquote + +import yams.schema + +from logilab.common.testlib import TestCase, InnerTest, Tags +from logilab.common.pytest import nocoverage, pause_trace +from logilab.common.debugger import Debugger +from logilab.common.umessage import message_from_string +from logilab.common.decorators import cached, classproperty, clear_cache, iclassmethod +from logilab.common.deprecation import deprecated, class_deprecated +from logilab.common.shellutils import getlogin + +from cubicweb import (ValidationError, NoSelectableObject, AuthenticationError, + BadConnectionId) +from cubicweb import cwconfig, devtools, web, server, repoapi +from cubicweb.utils import json +from cubicweb.sobjects import notification +from cubicweb.web import Redirect, application, eid_param +from cubicweb.server.hook import SendMailOp +from cubicweb.server.session import Session +from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS +from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID + + +# low-level utilities ########################################################## + +class CubicWebDebugger(Debugger): + """special debugger class providing a 'view' function which saves some + html into a temporary file and open a web browser to examinate it. + """ + def do_view(self, arg): + import webbrowser + data = self._getval(arg) + with open('/tmp/toto.html', 'w') as toto: + toto.write(data) + webbrowser.open('file:///tmp/toto.html') + + +def line_context_filter(line_no, center, before=3, after=None): + """return true if line are in context + + if after is None: after = before + """ + if after is None: + after = before + return center - before <= line_no <= center + after + + +def unprotected_entities(schema, strict=False): + """returned a set of each non final entity type, excluding "system" entities + (eg CWGroup, CWUser...) + """ + if strict: + protected_entities = yams.schema.BASE_TYPES + else: + protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES) + return set(schema.entities()) - protected_entities + + +class JsonValidator(object): + def parse_string(self, data): + return json.loads(data.decode('ascii')) + + +@contextmanager +def real_error_handling(app): + """By default, CubicWebTC `app` attribute (ie the publisher) is monkey + patched so that unexpected error are raised rather than going through the + `error_handler` method. + + By using this context manager you disable this monkey-patching temporarily. + Hence when publishihng a request no error will be raised, you'll get + req.status_out set to an HTTP error status code and the generated page will + usually hold a traceback as HTML. + + >>> with real_error_handling(app): + >>> page = app.handle_request(req) + """ + # remove the monkey patched error handler + fake_error_handler = app.error_handler + del app.error_handler + # return the app + yield app + # restore + app.error_handler = fake_error_handler + + +# email handling, to test emails sent by an application ######################## + +MAILBOX = [] + + +class Email(object): + """you'll get instances of Email into MAILBOX during tests that trigger + some notification. + + * `msg` is the original message object + + * `recipients` is a list of email address which are the recipients of this + message + """ + def __init__(self, fromaddr, recipients, msg): + self.fromaddr = fromaddr + self.recipients = recipients + self.msg = msg + + @property + def message(self): + return message_from_string(self.msg) + + @property + def subject(self): + return self.message.get('Subject') + + @property + def content(self): + return self.message.get_payload(decode=True) + + def __repr__(self): + return '' % (','.join(self.recipients), + self.message.get('Subject')) + + +# the trick to get email into MAILBOX instead of actually sent: monkey patch +# cwconfig.SMTP object +class MockSMTP: + + def __init__(self, server, port): + pass + + def close(self): + pass + + def sendmail(self, fromaddr, recipients, msg): + MAILBOX.append(Email(fromaddr, recipients, msg)) + +cwconfig.SMTP = MockSMTP + + +# Repoaccess utility ###############################################3########### + +class RepoAccess(object): + """An helper to easily create object to access the repo as a specific user + + Each RepoAccess have it own session. + + A repo access can create three type of object: + + .. automethod:: cubicweb.testlib.RepoAccess.cnx + .. automethod:: cubicweb.testlib.RepoAccess.web_request + + The RepoAccess need to be closed to destroy the associated Session. + TestCase usually take care of this aspect for the user. + + .. automethod:: cubicweb.testlib.RepoAccess.close + """ + + def __init__(self, repo, login, requestcls): + self._repo = repo + self._login = login + self.requestcls = requestcls + self._session = self._unsafe_connect(login) + + def _unsafe_connect(self, login, **kwargs): + """ a completely unsafe connect method for the tests """ + # use an internal connection + with self._repo.internal_cnx() as cnx: + # try to get a user object + user = cnx.find('CWUser', login=login).one() + user.groups + user.properties + user.login + session = Session(user, self._repo) + self._repo._sessions[session.sessionid] = session + user._cw = user.cw_rset.req = session + with session.new_cnx() as cnx: + self._repo.hm.call_hooks('session_open', cnx) + # commit connection at this point in case write operation has been + # done during `session_open` hooks + cnx.commit() + return session + + @contextmanager + def cnx(self): + """Context manager returning a server side connection for the user""" + with self._session.new_cnx() as cnx: + yield cnx + + # aliases for bw compat + client_cnx = repo_cnx = cnx + + @contextmanager + def web_request(self, url=None, headers={}, method='GET', **kwargs): + """Context manager returning a web request pre-linked to a client cnx + + To commit and rollback use:: + + req.cnx.commit() + req.cnx.rolback() + """ + req = self.requestcls(self._repo.vreg, url=url, headers=headers, + method=method, form=kwargs) + with self._session.new_cnx() as cnx: + req.set_cnx(cnx) + yield req + + def close(self): + """Close the session associated to the RepoAccess""" + if self._session is not None: + self._repo.close(self._session.sessionid) + self._session = None + + @contextmanager + def shell(self): + from cubicweb.server.migractions import ServerMigrationHelper + with self._session.new_cnx() as cnx: + mih = ServerMigrationHelper(None, repo=self._repo, cnx=cnx, + interactive=False, + # hack so it don't try to load fs schema + schema=1) + yield mih + cnx.commit() + + +# base class for cubicweb tests requiring a full cw environments ############### + +class CubicWebTC(TestCase): + """abstract class for test using an apptest environment + + attributes: + + * `vreg`, the vregistry + * `schema`, self.vreg.schema + * `config`, cubicweb configuration + * `cnx`, repoapi connection to the repository using an admin user + * `session`, server side session associated to `cnx` + * `app`, the cubicweb publisher (for web testing) + * `repo`, the repository object + * `admlogin`, login of the admin user + * `admpassword`, password of the admin user + * `shell`, create and use shell environment + * `anonymous_allowed`: flag telling if anonymous browsing should be allowed + """ + appid = 'data' + configcls = devtools.ApptestConfiguration + requestcls = fake.FakeRequest + tags = TestCase.tags | Tags('cubicweb', 'cw_repo') + test_db_id = DEFAULT_EMPTY_DB_ID + + # anonymous is logged by default in cubicweb test cases + anonymous_allowed = True + + def __init__(self, *args, **kwargs): + self._admin_session = None + self.repo = None + self._open_access = set() + super(CubicWebTC, self).__init__(*args, **kwargs) + + # repository connection handling ########################################### + + def new_access(self, login): + """provide a new RepoAccess object for a given user + + The access is automatically closed at the end of the test.""" + login = text_type(login) + access = RepoAccess(self.repo, login, self.requestcls) + self._open_access.add(access) + return access + + def _close_access(self): + while self._open_access: + try: + self._open_access.pop().close() + except BadConnectionId: + continue # already closed + + @property + def session(self): + """return admin session""" + return self._admin_session + + # XXX this doesn't need to a be classmethod anymore + def _init_repo(self): + """init the repository and connection to it. + """ + # get or restore and working db. + db_handler = devtools.get_test_db_handler(self.config, self.init_config) + db_handler.build_db_cache(self.test_db_id, self.pre_setup_database) + db_handler.restore_database(self.test_db_id) + self.repo = db_handler.get_repo(startup=True) + # get an admin session (without actual login) + login = text_type(db_handler.config.default_admin_config['login']) + self.admin_access = self.new_access(login) + self._admin_session = self.admin_access._session + + # config management ######################################################## + + @classproperty + def config(cls): + """return the configuration object + + Configuration is cached on the test class. + """ + if cls is CubicWebTC: + # Prevent direct use of CubicWebTC directly to avoid database + # caching issues + return None + try: + return cls.__dict__['_config'] + except KeyError: + home = abspath(join(dirname(sys.modules[cls.__module__].__file__), cls.appid)) + config = cls._config = cls.configcls(cls.appid, apphome=home) + config.mode = 'test' + return config + + @classmethod # XXX could be turned into a regular method + def init_config(cls, config): + """configuration initialization hooks. + + You may only want to override here the configuraton logic. + + Otherwise, consider to use a different :class:`ApptestConfiguration` + defined in the `configcls` class attribute. + + This method will be called by the database handler once the config has + been properly bootstrapped. + """ + admincfg = config.default_admin_config + cls.admlogin = text_type(admincfg['login']) + cls.admpassword = admincfg['password'] + # uncomment the line below if you want rql queries to be logged + # config.global_set_option('query-log-file', + # '/tmp/test_rql_log.' + `os.getpid()`) + config.global_set_option('log-file', None) + # set default-dest-addrs to a dumb email address to avoid mailbox or + # mail queue pollution + config.global_set_option('default-dest-addrs', ['whatever']) + send_to = '%s@logilab.fr' % getlogin() + config.global_set_option('sender-addr', send_to) + config.global_set_option('default-dest-addrs', send_to) + config.global_set_option('sender-name', 'cubicweb-test') + config.global_set_option('sender-addr', 'cubicweb-test@logilab.fr') + # default_base_url on config class isn't enough for TestServerConfiguration + config.global_set_option('base-url', config.default_base_url()) + # web resources + try: + config.global_set_option('embed-allowed', re.compile('.*')) + except Exception: # not in server only configuration + pass + + @property + def vreg(self): + return self.repo.vreg + + # global resources accessors ############################################### + + @property + def schema(self): + """return the application schema""" + return self.vreg.schema + + def set_option(self, optname, value): + self.config.global_set_option(optname, value) + + def set_debug(self, debugmode): + server.set_debug(debugmode) + + def debugged(self, debugmode): + return server.debugged(debugmode) + + # default test setup and teardown ######################################### + + def setUp(self): + # monkey patch send mail operation so emails are sent synchronously + self._patch_SendMailOp() + with pause_trace(): + previous_failure = self.__class__.__dict__.get('_repo_init_failed') + if previous_failure is not None: + self.skipTest('repository is not initialised: %r' % previous_failure) + try: + self._init_repo() + except Exception as ex: + self.__class__._repo_init_failed = ex + raise + self.addCleanup(self._close_access) + self.config.set_anonymous_allowed(self.anonymous_allowed) + self.setup_database() + MAILBOX[:] = [] # reset mailbox + + def tearDown(self): + # XXX hack until logilab.common.testlib is fixed + if self._admin_session is not None: + self.repo.close(self._admin_session.sessionid) + self._admin_session = None + while self._cleanups: + cleanup, args, kwargs = self._cleanups.pop(-1) + cleanup(*args, **kwargs) + self.repo.turn_repo_off() + + def _patch_SendMailOp(self): + # monkey patch send mail operation so emails are sent synchronously + _old_mail_postcommit_event = SendMailOp.postcommit_event + SendMailOp.postcommit_event = SendMailOp.sendmails + + def reverse_SendMailOp_monkey_patch(): + SendMailOp.postcommit_event = _old_mail_postcommit_event + + self.addCleanup(reverse_SendMailOp_monkey_patch) + + def setup_database(self): + """add your database setup code by overriding this method""" + + @classmethod + def pre_setup_database(cls, cnx, config): + """add your pre database setup code by overriding this method + + Do not forget to set the cls.test_db_id value to enable caching of the + result. + """ + + # user / session management ############################################### + + @deprecated('[3.19] explicitly use RepoAccess object in test instead') + def user(self, req=None): + """return the application schema""" + if req is None: + return self.request().user + else: + return req.user + + @iclassmethod # XXX turn into a class method + def create_user(self, req, login=None, groups=('users',), password=None, + email=None, commit=True, **kwargs): + """create and return a new user entity""" + if password is None: + password = login + if login is not None: + login = text_type(login) + user = req.create_entity('CWUser', login=login, + upassword=password, **kwargs) + req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)' + % ','.join(repr(str(g)) for g in groups), + {'x': user.eid}) + if email is not None: + req.create_entity('EmailAddress', address=text_type(email), + reverse_primary_email=user) + user.cw_clear_relation_cache('in_group', 'subject') + if commit: + try: + req.commit() # req is a session + except AttributeError: + req.cnx.commit() + return user + + # other utilities ######################################################### + + @contextmanager + def temporary_appobjects(self, *appobjects): + self.vreg._loadedmods.setdefault(self.__module__, {}) + for obj in appobjects: + self.vreg.register(obj) + registered = getattr(obj, '__registered__', None) + if registered: + for registry in obj.__registries__: + registered(self.vreg[registry]) + try: + yield + finally: + for obj in appobjects: + self.vreg.unregister(obj) + + @contextmanager + def temporary_permissions(self, *perm_overrides, **perm_kwoverrides): + """Set custom schema permissions within context. + + There are two ways to call this method, which may be used together : + + * using positional argument(s): + + .. sourcecode:: python + + rdef = self.schema['CWUser'].rdef('login') + with self.temporary_permissions((rdef, {'read': ()})): + ... + + + * using named argument(s): + + .. sourcecode:: python + + with self.temporary_permissions(CWUser={'read': ()}): + ... + + Usually the former will be preferred to override permissions on a + relation definition, while the latter is well suited for entity types. + + The allowed keys in the permission dictionary depend on the schema type + (entity type / relation definition). Resulting permissions will be + similar to `orig_permissions.update(partial_perms)`. + """ + torestore = [] + for erschema, etypeperms in chain(perm_overrides, perm_kwoverrides.items()): + if isinstance(erschema, string_types): + erschema = self.schema[erschema] + for action, actionperms in etypeperms.items(): + origperms = erschema.permissions[action] + erschema.set_action_permissions(action, actionperms) + torestore.append([erschema, action, origperms]) + try: + yield + finally: + for erschema, action, permissions in torestore: + if action is None: + erschema.permissions = permissions + else: + erschema.set_action_permissions(action, permissions) + + def assertModificationDateGreater(self, entity, olddate): + entity.cw_attr_cache.pop('modification_date', None) + self.assertGreater(entity.modification_date, olddate) + + def assertMessageEqual(self, req, params, expected_msg): + msg = req.session.data[params['_cwmsgid']] + self.assertEqual(expected_msg, msg) + + # workflow utilities ####################################################### + + def assertPossibleTransitions(self, entity, expected): + transitions = entity.cw_adapt_to('IWorkflowable').possible_transitions() + self.assertListEqual(sorted(tr.name for tr in transitions), + sorted(expected)) + + # views and actions registries inspection ################################## + + def pviews(self, req, rset): + return sorted((a.__regid__, a.__class__) + for a in self.vreg['views'].possible_views(req, rset=rset)) + + def pactions(self, req, rset, + skipcategories=('addrelated', 'siteactions', 'useractions', + 'footer', 'manage')): + return [(a.__regid__, a.__class__) + for a in self.vreg['actions'].poss_visible_objects(req, rset=rset) + if a.category not in skipcategories] + + def pactions_by_cats(self, req, rset, categories=('addrelated',)): + return [(a.__regid__, a.__class__) + for a in self.vreg['actions'].poss_visible_objects(req, rset=rset) + if a.category in categories] + + def pactionsdict(self, req, rset, + skipcategories=('addrelated', 'siteactions', 'useractions', + 'footer', 'manage')): + res = {} + for a in self.vreg['actions'].poss_visible_objects(req, rset=rset): + if a.category not in skipcategories: + res.setdefault(a.category, []).append(a.__class__) + return res + + def action_submenu(self, req, rset, id): + return self._test_action(self.vreg['actions'].select(id, req, rset=rset)) + + def _test_action(self, action): + class fake_menu(list): + @property + def items(self): + return self + + class fake_box(object): + def action_link(self, action, **kwargs): + return (action.title, action.url()) + submenu = fake_menu() + action.fill_menu(fake_box(), submenu) + return submenu + + def list_views_for(self, rset): + """returns the list of views that can be applied on `rset`""" + req = rset.req + only_once_vids = ('primary', 'secondary', 'text') + req.data['ex'] = ValueError("whatever") + viewsvreg = self.vreg['views'] + for vid, views in viewsvreg.items(): + if vid[0] == '_': + continue + if rset.rowcount > 1 and vid in only_once_vids: + continue + views = [view for view in views + if view.category != 'startupview' + and not issubclass(view, notification.NotificationView) + and not isinstance(view, class_deprecated)] + if views: + try: + view = viewsvreg._select_best(views, req, rset=rset) + if view is None: + raise NoSelectableObject((req,), {'rset': rset}, views) + if view.linkable(): + yield view + else: + not_selected(self.vreg, view) + # else the view is expected to be used as subview and should + # not be tested directly + except NoSelectableObject: + continue + + def list_actions_for(self, rset): + """returns the list of actions that can be applied on `rset`""" + req = rset.req + for action in self.vreg['actions'].possible_objects(req, rset=rset): + yield action + + def list_boxes_for(self, rset): + """returns the list of boxes that can be applied on `rset`""" + req = rset.req + for box in self.vreg['ctxcomponents'].possible_objects(req, rset=rset): + yield box + + def list_startup_views(self): + """returns the list of startup views""" + with self.admin_access.web_request() as req: + for view in self.vreg['views'].possible_views(req, None): + if view.category == 'startupview': + yield view.__regid__ + else: + not_selected(self.vreg, view) + + # web ui testing utilities ################################################# + + @property + @cached + def app(self): + """return a cubicweb publisher""" + publisher = application.CubicWebPublisher(self.repo, self.config) + + def raise_error_handler(*args, **kwargs): + raise + + publisher.error_handler = raise_error_handler + return publisher + + @deprecated('[3.19] use the .remote_calling method') + def remote_call(self, fname, *args): + """remote json call simulation""" + dump = json.dumps + args = [dump(arg) for arg in args] + req = self.request(fname=fname, pageid='123', arg=args) + ctrl = self.vreg['controllers'].select('ajax', req) + return ctrl.publish(), req + + @contextmanager + def remote_calling(self, fname, *args): + """remote json call simulation""" + args = [json.dumps(arg) for arg in args] + with self.admin_access.web_request(fname=fname, pageid='123', arg=args) as req: + ctrl = self.vreg['controllers'].select('ajax', req) + yield ctrl.publish(), req + + def app_handle_request(self, req, path='view'): + return self.app.core_handle(req, path) + + @deprecated("[3.15] app_handle_request is the new and better way" + " (beware of small semantic changes)") + def app_publish(self, *args, **kwargs): + return self.app_handle_request(*args, **kwargs) + + def ctrl_publish(self, req, ctrl='edit', rset=None): + """call the publish method of the edit controller""" + ctrl = self.vreg['controllers'].select(ctrl, req, appli=self.app) + try: + result = ctrl.publish(rset) + req.cnx.commit() + except web.Redirect: + req.cnx.commit() + raise + return result + + @staticmethod + def fake_form(formid, field_dict=None, entity_field_dicts=()): + """Build _cw.form dictionnary to fake posting of some standard cubicweb form + + * `formid`, the form id, usually form's __regid__ + + * `field_dict`, dictionary of name:value for fields that are not tied to an entity + + * `entity_field_dicts`, list of (entity, dictionary) where dictionary contains name:value + for fields that are not tied to the given entity + """ + assert field_dict or entity_field_dicts, \ + 'field_dict and entity_field_dicts arguments must not be both unspecified' + if field_dict is None: + field_dict = {} + form = {'__form_id': formid} + fields = [] + for field, value in field_dict.items(): + fields.append(field) + form[field] = value + + def _add_entity_field(entity, field, value): + entity_fields.append(field) + form[eid_param(field, entity.eid)] = value + + for entity, field_dict in entity_field_dicts: + if '__maineid' not in form: + form['__maineid'] = entity.eid + entity_fields = [] + form.setdefault('eid', []).append(entity.eid) + _add_entity_field(entity, '__type', entity.cw_etype) + for field, value in field_dict.items(): + _add_entity_field(entity, field, value) + if entity_fields: + form[eid_param('_cw_entity_fields', entity.eid)] = ','.join(entity_fields) + if fields: + form['_cw_fields'] = ','.join(sorted(fields)) + return form + + @deprecated('[3.19] use .admin_request_from_url instead') + def req_from_url(self, url): + """parses `url` and builds the corresponding CW-web request + + req.form will be setup using the url's query string + """ + req = self.request(url=url) + if isinstance(url, unicode): + url = url.encode(req.encoding) # req.setup_params() expects encoded strings + querystring = urlparse(url)[-2] + params = parse_qs(querystring) + req.setup_params(params) + return req + + @contextmanager + def admin_request_from_url(self, url): + """parses `url` and builds the corresponding CW-web request + + req.form will be setup using the url's query string + """ + with self.admin_access.web_request(url=url) as req: + if isinstance(url, unicode): + url = url.encode(req.encoding) # req.setup_params() expects encoded strings + querystring = urlparse(url)[-2] + params = parse_qs(querystring) + req.setup_params(params) + yield req + + def url_publish(self, url, data=None): + """takes `url`, uses application's app_resolver to find the appropriate + controller and result set, then publishes the result. + + To simulate post of www-form-encoded data, give a `data` dictionary + containing desired key/value associations. + + This should pretty much correspond to what occurs in a real CW server + except the apache-rewriter component is not called. + """ + with self.admin_request_from_url(url) as req: + if data is not None: + req.form.update(data) + ctrlid, rset = self.app.url_resolver.process(req, req.relative_path(False)) + return self.ctrl_publish(req, ctrlid, rset) + + def http_publish(self, url, data=None): + """like `url_publish`, except this returns a http response, even in case + of errors. You may give form parameters using the `data` argument. + """ + with self.admin_request_from_url(url) as req: + if data is not None: + req.form.update(data) + with real_error_handling(self.app): + result = self.app_handle_request(req, req.relative_path(False)) + return result, req + + @staticmethod + def _parse_location(req, location): + try: + path, params = location.split('?', 1) + except ValueError: + path = location + params = {} + else: + cleanup = lambda p: (p[0], urlunquote(p[1])) + params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p) + if path.startswith(req.base_url()): # may be relative + path = path[len(req.base_url()):] + return path, params + + def expect_redirect(self, callback, req): + """call the given callback with req as argument, expecting to get a + Redirect exception + """ + try: + callback(req) + except Redirect as ex: + return self._parse_location(req, ex.location) + else: + self.fail('expected a Redirect exception') + + def expect_redirect_handle_request(self, req, path='edit'): + """call the publish method of the application publisher, expecting to + get a Redirect exception + """ + self.app_handle_request(req, path) + self.assertTrue(300 <= req.status_out < 400, req.status_out) + location = req.get_response_header('location') + return self._parse_location(req, location) + + @deprecated("[3.15] expect_redirect_handle_request is the new and better way" + " (beware of small semantic changes)") + def expect_redirect_publish(self, *args, **kwargs): + return self.expect_redirect_handle_request(*args, **kwargs) + + def set_auth_mode(self, authmode, anonuser=None): + self.set_option('auth-mode', authmode) + self.set_option('anonymous-user', anonuser) + if anonuser is None: + self.config.anonymous_credential = None + else: + self.config.anonymous_credential = (anonuser, anonuser) + + def init_authentication(self, authmode, anonuser=None): + self.set_auth_mode(authmode, anonuser) + req = self.requestcls(self.vreg, url='login') + sh = self.app.session_handler + authm = sh.session_manager.authmanager + authm.anoninfo = self.vreg.config.anonymous_user() + authm.anoninfo = authm.anoninfo[0], {'password': authm.anoninfo[1]} + # not properly cleaned between tests + self.open_sessions = sh.session_manager._sessions = {} + return req, self.session + + def assertAuthSuccess(self, req, origsession, nbsessions=1): + sh = self.app.session_handler + session = self.app.get_session(req) + cnx = repoapi.Connection(session) + req.set_cnx(cnx) + self.assertEqual(len(self.open_sessions), nbsessions, self.open_sessions) + self.assertEqual(session.login, origsession.login) + self.assertEqual(session.anonymous_session, False) + + def assertAuthFailure(self, req, nbsessions=0): + with self.assertRaises(AuthenticationError): + self.app.get_session(req) + # +0 since we do not track the opened session + self.assertEqual(len(self.open_sessions), nbsessions) + clear_cache(req, 'get_authorization') + + # content validation ####################################################### + + # validators are used to validate (XML, DTD, whatever) view's content + # validators availables are : + # DTDValidator : validates XML + declared DTD + # SaxOnlyValidator : guarantees XML is well formed + # None : do not try to validate anything + # validators used must be imported from from.devtools.htmlparser + content_type_validators = { + # maps MIME type : validator name + # + # do not set html validators here, we need HTMLValidator for html + # snippets + # 'text/html': DTDValidator, + # 'application/xhtml+xml': DTDValidator, + 'application/xml': htmlparser.XMLValidator, + 'text/xml': htmlparser.XMLValidator, + 'application/json': JsonValidator, + 'text/plain': None, + 'text/comma-separated-values': None, + 'text/x-vcard': None, + 'text/calendar': None, + 'image/png': None, + } + # maps vid : validator name (override content_type_validators) + vid_validators = dict((vid, htmlparser.VALMAP[valkey]) + for vid, valkey in VIEW_VALIDATORS.items()) + + def view(self, vid, rset=None, req=None, template='main-template', + **kwargs): + """This method tests the view `vid` on `rset` using `template` + + If no error occurred while rendering the view, the HTML is analyzed + and parsed. + + :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo` + encapsulation the generated HTML + """ + if req is None: + if rset is None: + req = self.request() + else: + req = rset.req + req.form['vid'] = vid + viewsreg = self.vreg['views'] + view = viewsreg.select(vid, req, rset=rset, **kwargs) + # set explicit test description + if rset is not None: + # coerce to "bytes" on py2 because the description will be sent to + # sys.stdout/stderr which takes "bytes" on py2 and "unicode" on py3 + rql = str(rset.printable_rql()) + self.set_description("testing vid=%s defined in %s with (%s)" % ( + vid, view.__module__, rql)) + else: + self.set_description("testing vid=%s defined in %s without rset" % ( + vid, view.__module__)) + if template is None: # raw view testing, no template + viewfunc = view.render + else: + kwargs['view'] = view + viewfunc = lambda **k: viewsreg.main_template(req, template, + rset=rset, **kwargs) + return self._test_view(viewfunc, view, template, kwargs) + + def _test_view(self, viewfunc, view, template='main-template', kwargs={}): + """this method does the actual call to the view + + If no error occurred while rendering the view, the HTML is analyzed + and parsed. + + :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo` + encapsulation the generated HTML + """ + try: + output = viewfunc(**kwargs) + except Exception: + # hijack exception: generative tests stop when the exception + # is not an AssertionError + klass, exc, tcbk = sys.exc_info() + try: + msg = '[%s in %s] %s' % (klass, view.__regid__, exc) + except Exception: + msg = '[%s in %s] undisplayable exception' % (klass, view.__regid__) + exc = AssertionError(msg) + exc.__traceback__ = tcbk + raise exc + return self._check_html(output, view, template) + + def get_validator(self, view=None, content_type=None, output=None): + if view is not None: + try: + return self.vid_validators[view.__regid__]() + except KeyError: + if content_type is None: + content_type = view.content_type + if content_type is None: + content_type = 'text/html' + if content_type in ('text/html', 'application/xhtml+xml') and output: + if output.startswith(b''): + # only check XML well-formness since HTMLValidator isn't html5 + # compatible and won't like various other extensions + default_validator = htmlparser.XMLSyntaxValidator + elif output.startswith(b' used in progress widget, unknown in html dtd + output = re.sub('', '', output) + return self.assertWellFormed(validator, output.strip(), context=view.__regid__) + + def assertWellFormed(self, validator, content, context=None): + try: + return validator.parse_string(content) + except Exception: + # hijack exception: generative tests stop when the exception + # is not an AssertionError + klass, exc, tcbk = sys.exc_info() + if context is None: + msg = u'[%s]' % (klass,) + else: + msg = u'[%s in %s]' % (klass, context) + msg = msg.encode(sys.getdefaultencoding(), 'replace') + + try: + str_exc = str(exc) + except Exception: + str_exc = 'undisplayable exception' + msg += str_exc.encode(sys.getdefaultencoding(), 'replace') + if content is not None: + position = getattr(exc, "position", (0,))[0] + if position: + # define filter + if isinstance(content, str): + content = unicode(content, sys.getdefaultencoding(), 'replace') + content = validator.preprocess_data(content) + content = content.splitlines() + width = int(log(len(content), 10)) + 1 + line_template = " %" + ("%i" % width) + "i: %s" + # XXX no need to iterate the whole file except to get + # the line number + content = u'\n'.join(line_template % (idx + 1, line) + for idx, line in enumerate(content) + if line_context_filter(idx+1, position)) + msg += u'\nfor content:\n%s' % content + exc = AssertionError(msg) + exc.__traceback__ = tcbk + raise exc + + def assertDocTestFile(self, testfile): + # doctest returns tuple (failure_count, test_count) + with self.admin_access.shell() as mih: + result = mih.process_script(testfile) + if result[0] and result[1]: + raise self.failureException("doctest file '%s' failed" + % testfile) + + # notifications ############################################################ + + def assertSentEmail(self, subject, recipients=None, nb_msgs=None): + """test recipients in system mailbox for given email subject + + :param subject: email subject to find in mailbox + :param recipients: list of email recipients + :param nb_msgs: expected number of entries + :returns: list of matched emails + """ + messages = [email for email in MAILBOX + if email.message.get('Subject') == subject] + if recipients is not None: + sent_to = set() + for msg in messages: + sent_to.update(msg.recipients) + self.assertSetEqual(set(recipients), sent_to) + if nb_msgs is not None: + self.assertEqual(len(MAILBOX), nb_msgs) + return messages + + +# auto-populating test classes and utilities ################################### + +from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries + +# XXX cleanup unprotected_entities & all mess + + +def how_many_dict(schema, cnx, how_many, skip): + """given a schema, compute how many entities by type we need to be able to + satisfy relations cardinality. + + The `how_many` argument tells how many entities of which type we want at + least. + + Return a dictionary with entity types as key, and the number of entities for + this type as value. + """ + relmap = {} + for rschema in schema.relations(): + if rschema.final: + continue + for subj, obj in rschema.rdefs: + card = rschema.rdef(subj, obj).cardinality + # if the relation is mandatory, we'll need at least as many subj and + # obj to satisfy it + if card[0] in '1+' and card[1] in '1?': + # subj has to be linked to at least one obj, + # but obj can be linked to only one subj + # -> we need at least as many subj as obj to satisfy + # cardinalities for this relation + relmap.setdefault((rschema, subj), []).append(str(obj)) + if card[1] in '1+' and card[0] in '1?': + # reverse subj and obj in the above explanation + relmap.setdefault((rschema, obj), []).append(str(subj)) + unprotected = unprotected_entities(schema) + for etype in skip: # XXX (syt) duh? explain or kill + unprotected.add(etype) + howmanydict = {} + # step 1, compute a base number of each entity types: number of already + # existing entities of this type + `how_many` + for etype in unprotected_entities(schema, strict=True): + howmanydict[str(etype)] = cnx.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0] + if etype in unprotected: + howmanydict[str(etype)] += how_many + # step 2, augment nb entity per types to satisfy cardinality constraints, + # by recomputing for each relation that constrained an entity type: + # + # new num for etype = max(current num, sum(num for possible target etypes)) + # + # XXX we should first check there is no cycle then propagate changes + for (rschema, etype), targets in relmap.items(): + relfactor = sum(howmanydict[e] for e in targets) + howmanydict[str(etype)] = max(relfactor, howmanydict[etype]) + return howmanydict + + +class AutoPopulateTest(CubicWebTC): + """base class for test with auto-populating of the database""" + __abstract__ = True + + test_db_id = 'autopopulate' + + tags = CubicWebTC.tags | Tags('autopopulated') + + pdbclass = CubicWebDebugger + # this is a hook to be able to define a list of rql queries + # that are application dependent and cannot be guessed automatically + application_rql = [] + + no_auto_populate = () + ignored_relations = set() + + def to_test_etypes(self): + return unprotected_entities(self.schema, strict=True) + + def custom_populate(self, how_many, cnx): + pass + + def post_populate(self, cnx): + pass + + @nocoverage + def auto_populate(self, how_many): + """this method populates the database with `how_many` entities + of each possible type. It also inserts random relations between them + """ + with self.admin_access.cnx() as cnx: + with cnx.security_enabled(read=False, write=False): + self._auto_populate(cnx, how_many) + cnx.commit() + + def _auto_populate(self, cnx, how_many): + self.custom_populate(how_many, cnx) + vreg = self.vreg + howmanydict = how_many_dict(self.schema, cnx, how_many, self.no_auto_populate) + for etype in unprotected_entities(self.schema): + if etype in self.no_auto_populate: + continue + nb = howmanydict.get(etype, how_many) + for rql, args in insert_entity_queries(etype, self.schema, vreg, nb): + cnx.execute(rql, args) + edict = {} + for etype in unprotected_entities(self.schema, strict=True): + rset = cnx.execute('%s X' % etype) + edict[str(etype)] = set(row[0] for row in rset.rows) + existingrels = {} + ignored_relations = SYSTEM_RELATIONS | self.ignored_relations + for rschema in self.schema.relations(): + if rschema.final or rschema in ignored_relations: + continue + rset = cnx.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema) + existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset) + q = make_relations_queries(self.schema, edict, cnx, ignored_relations, + existingrels=existingrels) + for rql, args in q: + try: + cnx.execute(rql, args) + except ValidationError as ex: + # failed to satisfy some constraint + print('error in automatic db population', ex) + cnx.commit_state = None # reset uncommitable flag + self.post_populate(cnx) + + def iter_individual_rsets(self, etypes=None, limit=None): + etypes = etypes or self.to_test_etypes() + with self.admin_access.web_request() as req: + for etype in etypes: + if limit: + rql = 'Any X LIMIT %s WHERE X is %s' % (limit, etype) + else: + rql = 'Any X WHERE X is %s' % etype + rset = req.execute(rql) + for row in range(len(rset)): + if limit and row > limit: + break + # XXX iirk + rset2 = rset.limit(limit=1, offset=row) + yield rset2 + + def iter_automatic_rsets(self, limit=10): + """generates basic resultsets for each entity type""" + etypes = self.to_test_etypes() + if not etypes: + return + with self.admin_access.web_request() as req: + for etype in etypes: + yield req.execute('Any X LIMIT %s WHERE X is %s' % (limit, etype)) + etype1 = etypes.pop() + try: + etype2 = etypes.pop() + except KeyError: + etype2 = etype1 + # test a mixed query (DISTINCT/GROUP to avoid getting duplicate + # X which make muledit view failing for instance (html validation fails + # because of some duplicate "id" attributes) + yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' % + (etype1, etype2)) + # test some application-specific queries if defined + for rql in self.application_rql: + yield req.execute(rql) + + def _test_everything_for(self, rset): + """this method tries to find everything that can be tested + for `rset` and yields a callable test (as needed in generative tests) + """ + propdefs = self.vreg['propertydefs'] + # make all components visible + for k, v in propdefs.items(): + if k.endswith('visible') and not v['default']: + propdefs[k]['default'] = True + for view in self.list_views_for(rset): + backup_rset = rset.copy(rset.rows, rset.description) + yield InnerTest(self._testname(rset, view.__regid__, 'view'), + self.view, view.__regid__, rset, + rset.req.reset_headers(), 'main-template') + # We have to do this because some views modify the + # resultset's syntax tree + rset = backup_rset + for action in self.list_actions_for(rset): + yield InnerTest(self._testname(rset, action.__regid__, 'action'), + self._test_action, action) + for box in self.list_boxes_for(rset): + w = [].append + yield InnerTest(self._testname(rset, box.__regid__, 'box'), box.render, w) + + @staticmethod + def _testname(rset, objid, objtype): + return '%s_%s_%s' % ('_'.join(rset.column_types(0)), objid, objtype) + + +# concrete class for automated application testing ############################ + +class AutomaticWebTest(AutoPopulateTest): + """import this if you wan automatic tests to be ran""" + + tags = AutoPopulateTest.tags | Tags('web', 'generated') + + def setUp(self): + if self.__class__ is AutomaticWebTest: + # Prevent direct use of AutomaticWebTest to avoid database caching + # issues. + return + super(AutomaticWebTest, self).setUp() + + # access to self.app for proper initialization of the authentication + # machinery (else some views may fail) + self.app + + def test_one_each_config(self): + self.auto_populate(1) + for rset in self.iter_automatic_rsets(limit=1): + for testargs in self._test_everything_for(rset): + yield testargs + + def test_ten_each_config(self): + self.auto_populate(10) + for rset in self.iter_automatic_rsets(limit=10): + for testargs in self._test_everything_for(rset): + yield testargs + + def test_startup_views(self): + for vid in self.list_startup_views(): + with self.admin_access.web_request() as req: + yield self.view, vid, None, req + + +# registry instrumentization ################################################### + +def not_selected(vreg, appobject): + try: + vreg._selected[appobject.__class__] -= 1 + except (KeyError, AttributeError): + pass + + +# def vreg_instrumentize(testclass): +# # XXX broken +# from cubicweb.devtools.apptest import TestEnvironment +# env = testclass._env = TestEnvironment('data', configcls=testclass.configcls) +# for reg in env.vreg.values(): +# reg._selected = {} +# try: +# orig_select_best = reg.__class__.__orig_select_best +# except Exception: +# orig_select_best = reg.__class__._select_best +# def instr_select_best(self, *args, **kwargs): +# selected = orig_select_best(self, *args, **kwargs) +# try: +# self._selected[selected.__class__] += 1 +# except KeyError: +# self._selected[selected.__class__] = 1 +# except AttributeError: +# pass # occurs on reg used to restore database +# return selected +# reg.__class__._select_best = instr_select_best +# reg.__class__.__orig_select_best = orig_select_best + + +# def print_untested_objects(testclass, skipregs=('hooks', 'etypes')): +# for regname, reg in testclass._env.vreg.items(): +# if regname in skipregs: +# continue +# for appobjects in reg.values(): +# for appobject in appobjects: +# if not reg._selected.get(appobject): +# print 'not tested', regname, appobject