--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/devtools/testlib.py Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,1335 @@
+# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+"""this module contains base classes and utilities for cubicweb tests"""
+from __future__ import print_function
+
+import sys
+import re
+from os.path import dirname, join, abspath
+from math import log
+from contextlib import contextmanager
+from itertools import chain
+
+from six import text_type, string_types
+from six.moves import range
+from six.moves.urllib.parse import urlparse, parse_qs, unquote as urlunquote
+
+import yams.schema
+
+from logilab.common.testlib import TestCase, InnerTest, Tags
+from logilab.common.pytest import nocoverage, pause_trace
+from logilab.common.debugger import Debugger
+from logilab.common.umessage import message_from_string
+from logilab.common.decorators import cached, classproperty, clear_cache, iclassmethod
+from logilab.common.deprecation import deprecated, class_deprecated
+from logilab.common.shellutils import getlogin
+
+from cubicweb import (ValidationError, NoSelectableObject, AuthenticationError,
+ BadConnectionId)
+from cubicweb import cwconfig, devtools, web, server, repoapi
+from cubicweb.utils import json
+from cubicweb.sobjects import notification
+from cubicweb.web import Redirect, application, eid_param
+from cubicweb.server.hook import SendMailOp
+from cubicweb.server.session import Session
+from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS
+from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID
+
+
+# low-level utilities ##########################################################
+
+class CubicWebDebugger(Debugger):
+ """special debugger class providing a 'view' function which saves some
+ html into a temporary file and open a web browser to examinate it.
+ """
+ def do_view(self, arg):
+ import webbrowser
+ data = self._getval(arg)
+ with open('/tmp/toto.html', 'w') as toto:
+ toto.write(data)
+ webbrowser.open('file:///tmp/toto.html')
+
+
+def line_context_filter(line_no, center, before=3, after=None):
+ """return true if line are in context
+
+ if after is None: after = before
+ """
+ if after is None:
+ after = before
+ return center - before <= line_no <= center + after
+
+
+def unprotected_entities(schema, strict=False):
+ """returned a set of each non final entity type, excluding "system" entities
+ (eg CWGroup, CWUser...)
+ """
+ if strict:
+ protected_entities = yams.schema.BASE_TYPES
+ else:
+ protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES)
+ return set(schema.entities()) - protected_entities
+
+
+class JsonValidator(object):
+ def parse_string(self, data):
+ return json.loads(data.decode('ascii'))
+
+
+@contextmanager
+def real_error_handling(app):
+ """By default, CubicWebTC `app` attribute (ie the publisher) is monkey
+ patched so that unexpected error are raised rather than going through the
+ `error_handler` method.
+
+ By using this context manager you disable this monkey-patching temporarily.
+ Hence when publishihng a request no error will be raised, you'll get
+ req.status_out set to an HTTP error status code and the generated page will
+ usually hold a traceback as HTML.
+
+ >>> with real_error_handling(app):
+ >>> page = app.handle_request(req)
+ """
+ # remove the monkey patched error handler
+ fake_error_handler = app.error_handler
+ del app.error_handler
+ # return the app
+ yield app
+ # restore
+ app.error_handler = fake_error_handler
+
+
+# email handling, to test emails sent by an application ########################
+
+MAILBOX = []
+
+
+class Email(object):
+ """you'll get instances of Email into MAILBOX during tests that trigger
+ some notification.
+
+ * `msg` is the original message object
+
+ * `recipients` is a list of email address which are the recipients of this
+ message
+ """
+ def __init__(self, fromaddr, recipients, msg):
+ self.fromaddr = fromaddr
+ self.recipients = recipients
+ self.msg = msg
+
+ @property
+ def message(self):
+ return message_from_string(self.msg)
+
+ @property
+ def subject(self):
+ return self.message.get('Subject')
+
+ @property
+ def content(self):
+ return self.message.get_payload(decode=True)
+
+ def __repr__(self):
+ return '<Email to %s with subject %s>' % (','.join(self.recipients),
+ self.message.get('Subject'))
+
+
+# the trick to get email into MAILBOX instead of actually sent: monkey patch
+# cwconfig.SMTP object
+class MockSMTP:
+
+ def __init__(self, server, port):
+ pass
+
+ def close(self):
+ pass
+
+ def sendmail(self, fromaddr, recipients, msg):
+ MAILBOX.append(Email(fromaddr, recipients, msg))
+
+cwconfig.SMTP = MockSMTP
+
+
+# Repoaccess utility ###############################################3###########
+
+class RepoAccess(object):
+ """An helper to easily create object to access the repo as a specific user
+
+ Each RepoAccess have it own session.
+
+ A repo access can create three type of object:
+
+ .. automethod:: cubicweb.testlib.RepoAccess.cnx
+ .. automethod:: cubicweb.testlib.RepoAccess.web_request
+
+ The RepoAccess need to be closed to destroy the associated Session.
+ TestCase usually take care of this aspect for the user.
+
+ .. automethod:: cubicweb.testlib.RepoAccess.close
+ """
+
+ def __init__(self, repo, login, requestcls):
+ self._repo = repo
+ self._login = login
+ self.requestcls = requestcls
+ self._session = self._unsafe_connect(login)
+
+ def _unsafe_connect(self, login, **kwargs):
+ """ a completely unsafe connect method for the tests """
+ # use an internal connection
+ with self._repo.internal_cnx() as cnx:
+ # try to get a user object
+ user = cnx.find('CWUser', login=login).one()
+ user.groups
+ user.properties
+ user.login
+ session = Session(user, self._repo)
+ self._repo._sessions[session.sessionid] = session
+ user._cw = user.cw_rset.req = session
+ with session.new_cnx() as cnx:
+ self._repo.hm.call_hooks('session_open', cnx)
+ # commit connection at this point in case write operation has been
+ # done during `session_open` hooks
+ cnx.commit()
+ return session
+
+ @contextmanager
+ def cnx(self):
+ """Context manager returning a server side connection for the user"""
+ with self._session.new_cnx() as cnx:
+ yield cnx
+
+ # aliases for bw compat
+ client_cnx = repo_cnx = cnx
+
+ @contextmanager
+ def web_request(self, url=None, headers={}, method='GET', **kwargs):
+ """Context manager returning a web request pre-linked to a client cnx
+
+ To commit and rollback use::
+
+ req.cnx.commit()
+ req.cnx.rolback()
+ """
+ req = self.requestcls(self._repo.vreg, url=url, headers=headers,
+ method=method, form=kwargs)
+ with self._session.new_cnx() as cnx:
+ req.set_cnx(cnx)
+ yield req
+
+ def close(self):
+ """Close the session associated to the RepoAccess"""
+ if self._session is not None:
+ self._repo.close(self._session.sessionid)
+ self._session = None
+
+ @contextmanager
+ def shell(self):
+ from cubicweb.server.migractions import ServerMigrationHelper
+ with self._session.new_cnx() as cnx:
+ mih = ServerMigrationHelper(None, repo=self._repo, cnx=cnx,
+ interactive=False,
+ # hack so it don't try to load fs schema
+ schema=1)
+ yield mih
+ cnx.commit()
+
+
+# base class for cubicweb tests requiring a full cw environments ###############
+
+class CubicWebTC(TestCase):
+ """abstract class for test using an apptest environment
+
+ attributes:
+
+ * `vreg`, the vregistry
+ * `schema`, self.vreg.schema
+ * `config`, cubicweb configuration
+ * `cnx`, repoapi connection to the repository using an admin user
+ * `session`, server side session associated to `cnx`
+ * `app`, the cubicweb publisher (for web testing)
+ * `repo`, the repository object
+ * `admlogin`, login of the admin user
+ * `admpassword`, password of the admin user
+ * `shell`, create and use shell environment
+ * `anonymous_allowed`: flag telling if anonymous browsing should be allowed
+ """
+ appid = 'data'
+ configcls = devtools.ApptestConfiguration
+ requestcls = fake.FakeRequest
+ tags = TestCase.tags | Tags('cubicweb', 'cw_repo')
+ test_db_id = DEFAULT_EMPTY_DB_ID
+
+ # anonymous is logged by default in cubicweb test cases
+ anonymous_allowed = True
+
+ def __init__(self, *args, **kwargs):
+ self._admin_session = None
+ self.repo = None
+ self._open_access = set()
+ super(CubicWebTC, self).__init__(*args, **kwargs)
+
+ # repository connection handling ###########################################
+
+ def new_access(self, login):
+ """provide a new RepoAccess object for a given user
+
+ The access is automatically closed at the end of the test."""
+ login = text_type(login)
+ access = RepoAccess(self.repo, login, self.requestcls)
+ self._open_access.add(access)
+ return access
+
+ def _close_access(self):
+ while self._open_access:
+ try:
+ self._open_access.pop().close()
+ except BadConnectionId:
+ continue # already closed
+
+ @property
+ def session(self):
+ """return admin session"""
+ return self._admin_session
+
+ # XXX this doesn't need to a be classmethod anymore
+ def _init_repo(self):
+ """init the repository and connection to it.
+ """
+ # get or restore and working db.
+ db_handler = devtools.get_test_db_handler(self.config, self.init_config)
+ db_handler.build_db_cache(self.test_db_id, self.pre_setup_database)
+ db_handler.restore_database(self.test_db_id)
+ self.repo = db_handler.get_repo(startup=True)
+ # get an admin session (without actual login)
+ login = text_type(db_handler.config.default_admin_config['login'])
+ self.admin_access = self.new_access(login)
+ self._admin_session = self.admin_access._session
+
+ # config management ########################################################
+
+ @classproperty
+ def config(cls):
+ """return the configuration object
+
+ Configuration is cached on the test class.
+ """
+ if cls is CubicWebTC:
+ # Prevent direct use of CubicWebTC directly to avoid database
+ # caching issues
+ return None
+ try:
+ return cls.__dict__['_config']
+ except KeyError:
+ home = abspath(join(dirname(sys.modules[cls.__module__].__file__), cls.appid))
+ config = cls._config = cls.configcls(cls.appid, apphome=home)
+ config.mode = 'test'
+ return config
+
+ @classmethod # XXX could be turned into a regular method
+ def init_config(cls, config):
+ """configuration initialization hooks.
+
+ You may only want to override here the configuraton logic.
+
+ Otherwise, consider to use a different :class:`ApptestConfiguration`
+ defined in the `configcls` class attribute.
+
+ This method will be called by the database handler once the config has
+ been properly bootstrapped.
+ """
+ admincfg = config.default_admin_config
+ cls.admlogin = text_type(admincfg['login'])
+ cls.admpassword = admincfg['password']
+ # uncomment the line below if you want rql queries to be logged
+ # config.global_set_option('query-log-file',
+ # '/tmp/test_rql_log.' + `os.getpid()`)
+ config.global_set_option('log-file', None)
+ # set default-dest-addrs to a dumb email address to avoid mailbox or
+ # mail queue pollution
+ config.global_set_option('default-dest-addrs', ['whatever'])
+ send_to = '%s@logilab.fr' % getlogin()
+ config.global_set_option('sender-addr', send_to)
+ config.global_set_option('default-dest-addrs', send_to)
+ config.global_set_option('sender-name', 'cubicweb-test')
+ config.global_set_option('sender-addr', 'cubicweb-test@logilab.fr')
+ # default_base_url on config class isn't enough for TestServerConfiguration
+ config.global_set_option('base-url', config.default_base_url())
+ # web resources
+ try:
+ config.global_set_option('embed-allowed', re.compile('.*'))
+ except Exception: # not in server only configuration
+ pass
+
+ @property
+ def vreg(self):
+ return self.repo.vreg
+
+ # global resources accessors ###############################################
+
+ @property
+ def schema(self):
+ """return the application schema"""
+ return self.vreg.schema
+
+ def set_option(self, optname, value):
+ self.config.global_set_option(optname, value)
+
+ def set_debug(self, debugmode):
+ server.set_debug(debugmode)
+
+ def debugged(self, debugmode):
+ return server.debugged(debugmode)
+
+ # default test setup and teardown #########################################
+
+ def setUp(self):
+ # monkey patch send mail operation so emails are sent synchronously
+ self._patch_SendMailOp()
+ with pause_trace():
+ previous_failure = self.__class__.__dict__.get('_repo_init_failed')
+ if previous_failure is not None:
+ self.skipTest('repository is not initialised: %r' % previous_failure)
+ try:
+ self._init_repo()
+ except Exception as ex:
+ self.__class__._repo_init_failed = ex
+ raise
+ self.addCleanup(self._close_access)
+ self.config.set_anonymous_allowed(self.anonymous_allowed)
+ self.setup_database()
+ MAILBOX[:] = [] # reset mailbox
+
+ def tearDown(self):
+ # XXX hack until logilab.common.testlib is fixed
+ if self._admin_session is not None:
+ self.repo.close(self._admin_session.sessionid)
+ self._admin_session = None
+ while self._cleanups:
+ cleanup, args, kwargs = self._cleanups.pop(-1)
+ cleanup(*args, **kwargs)
+ self.repo.turn_repo_off()
+
+ def _patch_SendMailOp(self):
+ # monkey patch send mail operation so emails are sent synchronously
+ _old_mail_postcommit_event = SendMailOp.postcommit_event
+ SendMailOp.postcommit_event = SendMailOp.sendmails
+
+ def reverse_SendMailOp_monkey_patch():
+ SendMailOp.postcommit_event = _old_mail_postcommit_event
+
+ self.addCleanup(reverse_SendMailOp_monkey_patch)
+
+ def setup_database(self):
+ """add your database setup code by overriding this method"""
+
+ @classmethod
+ def pre_setup_database(cls, cnx, config):
+ """add your pre database setup code by overriding this method
+
+ Do not forget to set the cls.test_db_id value to enable caching of the
+ result.
+ """
+
+ # user / session management ###############################################
+
+ @deprecated('[3.19] explicitly use RepoAccess object in test instead')
+ def user(self, req=None):
+ """return the application schema"""
+ if req is None:
+ return self.request().user
+ else:
+ return req.user
+
+ @iclassmethod # XXX turn into a class method
+ def create_user(self, req, login=None, groups=('users',), password=None,
+ email=None, commit=True, **kwargs):
+ """create and return a new user entity"""
+ if password is None:
+ password = login
+ if login is not None:
+ login = text_type(login)
+ user = req.create_entity('CWUser', login=login,
+ upassword=password, **kwargs)
+ req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
+ % ','.join(repr(str(g)) for g in groups),
+ {'x': user.eid})
+ if email is not None:
+ req.create_entity('EmailAddress', address=text_type(email),
+ reverse_primary_email=user)
+ user.cw_clear_relation_cache('in_group', 'subject')
+ if commit:
+ try:
+ req.commit() # req is a session
+ except AttributeError:
+ req.cnx.commit()
+ return user
+
+ # other utilities #########################################################
+
+ @contextmanager
+ def temporary_appobjects(self, *appobjects):
+ self.vreg._loadedmods.setdefault(self.__module__, {})
+ for obj in appobjects:
+ self.vreg.register(obj)
+ registered = getattr(obj, '__registered__', None)
+ if registered:
+ for registry in obj.__registries__:
+ registered(self.vreg[registry])
+ try:
+ yield
+ finally:
+ for obj in appobjects:
+ self.vreg.unregister(obj)
+
+ @contextmanager
+ def temporary_permissions(self, *perm_overrides, **perm_kwoverrides):
+ """Set custom schema permissions within context.
+
+ There are two ways to call this method, which may be used together :
+
+ * using positional argument(s):
+
+ .. sourcecode:: python
+
+ rdef = self.schema['CWUser'].rdef('login')
+ with self.temporary_permissions((rdef, {'read': ()})):
+ ...
+
+
+ * using named argument(s):
+
+ .. sourcecode:: python
+
+ with self.temporary_permissions(CWUser={'read': ()}):
+ ...
+
+ Usually the former will be preferred to override permissions on a
+ relation definition, while the latter is well suited for entity types.
+
+ The allowed keys in the permission dictionary depend on the schema type
+ (entity type / relation definition). Resulting permissions will be
+ similar to `orig_permissions.update(partial_perms)`.
+ """
+ torestore = []
+ for erschema, etypeperms in chain(perm_overrides, perm_kwoverrides.items()):
+ if isinstance(erschema, string_types):
+ erschema = self.schema[erschema]
+ for action, actionperms in etypeperms.items():
+ origperms = erschema.permissions[action]
+ erschema.set_action_permissions(action, actionperms)
+ torestore.append([erschema, action, origperms])
+ try:
+ yield
+ finally:
+ for erschema, action, permissions in torestore:
+ if action is None:
+ erschema.permissions = permissions
+ else:
+ erschema.set_action_permissions(action, permissions)
+
+ def assertModificationDateGreater(self, entity, olddate):
+ entity.cw_attr_cache.pop('modification_date', None)
+ self.assertGreater(entity.modification_date, olddate)
+
+ def assertMessageEqual(self, req, params, expected_msg):
+ msg = req.session.data[params['_cwmsgid']]
+ self.assertEqual(expected_msg, msg)
+
+ # workflow utilities #######################################################
+
+ def assertPossibleTransitions(self, entity, expected):
+ transitions = entity.cw_adapt_to('IWorkflowable').possible_transitions()
+ self.assertListEqual(sorted(tr.name for tr in transitions),
+ sorted(expected))
+
+ # views and actions registries inspection ##################################
+
+ def pviews(self, req, rset):
+ return sorted((a.__regid__, a.__class__)
+ for a in self.vreg['views'].possible_views(req, rset=rset))
+
+ def pactions(self, req, rset,
+ skipcategories=('addrelated', 'siteactions', 'useractions',
+ 'footer', 'manage')):
+ return [(a.__regid__, a.__class__)
+ for a in self.vreg['actions'].poss_visible_objects(req, rset=rset)
+ if a.category not in skipcategories]
+
+ def pactions_by_cats(self, req, rset, categories=('addrelated',)):
+ return [(a.__regid__, a.__class__)
+ for a in self.vreg['actions'].poss_visible_objects(req, rset=rset)
+ if a.category in categories]
+
+ def pactionsdict(self, req, rset,
+ skipcategories=('addrelated', 'siteactions', 'useractions',
+ 'footer', 'manage')):
+ res = {}
+ for a in self.vreg['actions'].poss_visible_objects(req, rset=rset):
+ if a.category not in skipcategories:
+ res.setdefault(a.category, []).append(a.__class__)
+ return res
+
+ def action_submenu(self, req, rset, id):
+ return self._test_action(self.vreg['actions'].select(id, req, rset=rset))
+
+ def _test_action(self, action):
+ class fake_menu(list):
+ @property
+ def items(self):
+ return self
+
+ class fake_box(object):
+ def action_link(self, action, **kwargs):
+ return (action.title, action.url())
+ submenu = fake_menu()
+ action.fill_menu(fake_box(), submenu)
+ return submenu
+
+ def list_views_for(self, rset):
+ """returns the list of views that can be applied on `rset`"""
+ req = rset.req
+ only_once_vids = ('primary', 'secondary', 'text')
+ req.data['ex'] = ValueError("whatever")
+ viewsvreg = self.vreg['views']
+ for vid, views in viewsvreg.items():
+ if vid[0] == '_':
+ continue
+ if rset.rowcount > 1 and vid in only_once_vids:
+ continue
+ views = [view for view in views
+ if view.category != 'startupview'
+ and not issubclass(view, notification.NotificationView)
+ and not isinstance(view, class_deprecated)]
+ if views:
+ try:
+ view = viewsvreg._select_best(views, req, rset=rset)
+ if view is None:
+ raise NoSelectableObject((req,), {'rset': rset}, views)
+ if view.linkable():
+ yield view
+ else:
+ not_selected(self.vreg, view)
+ # else the view is expected to be used as subview and should
+ # not be tested directly
+ except NoSelectableObject:
+ continue
+
+ def list_actions_for(self, rset):
+ """returns the list of actions that can be applied on `rset`"""
+ req = rset.req
+ for action in self.vreg['actions'].possible_objects(req, rset=rset):
+ yield action
+
+ def list_boxes_for(self, rset):
+ """returns the list of boxes that can be applied on `rset`"""
+ req = rset.req
+ for box in self.vreg['ctxcomponents'].possible_objects(req, rset=rset):
+ yield box
+
+ def list_startup_views(self):
+ """returns the list of startup views"""
+ with self.admin_access.web_request() as req:
+ for view in self.vreg['views'].possible_views(req, None):
+ if view.category == 'startupview':
+ yield view.__regid__
+ else:
+ not_selected(self.vreg, view)
+
+ # web ui testing utilities #################################################
+
+ @property
+ @cached
+ def app(self):
+ """return a cubicweb publisher"""
+ publisher = application.CubicWebPublisher(self.repo, self.config)
+
+ def raise_error_handler(*args, **kwargs):
+ raise
+
+ publisher.error_handler = raise_error_handler
+ return publisher
+
+ @deprecated('[3.19] use the .remote_calling method')
+ def remote_call(self, fname, *args):
+ """remote json call simulation"""
+ dump = json.dumps
+ args = [dump(arg) for arg in args]
+ req = self.request(fname=fname, pageid='123', arg=args)
+ ctrl = self.vreg['controllers'].select('ajax', req)
+ return ctrl.publish(), req
+
+ @contextmanager
+ def remote_calling(self, fname, *args):
+ """remote json call simulation"""
+ args = [json.dumps(arg) for arg in args]
+ with self.admin_access.web_request(fname=fname, pageid='123', arg=args) as req:
+ ctrl = self.vreg['controllers'].select('ajax', req)
+ yield ctrl.publish(), req
+
+ def app_handle_request(self, req, path='view'):
+ return self.app.core_handle(req, path)
+
+ @deprecated("[3.15] app_handle_request is the new and better way"
+ " (beware of small semantic changes)")
+ def app_publish(self, *args, **kwargs):
+ return self.app_handle_request(*args, **kwargs)
+
+ def ctrl_publish(self, req, ctrl='edit', rset=None):
+ """call the publish method of the edit controller"""
+ ctrl = self.vreg['controllers'].select(ctrl, req, appli=self.app)
+ try:
+ result = ctrl.publish(rset)
+ req.cnx.commit()
+ except web.Redirect:
+ req.cnx.commit()
+ raise
+ return result
+
+ @staticmethod
+ def fake_form(formid, field_dict=None, entity_field_dicts=()):
+ """Build _cw.form dictionnary to fake posting of some standard cubicweb form
+
+ * `formid`, the form id, usually form's __regid__
+
+ * `field_dict`, dictionary of name:value for fields that are not tied to an entity
+
+ * `entity_field_dicts`, list of (entity, dictionary) where dictionary contains name:value
+ for fields that are not tied to the given entity
+ """
+ assert field_dict or entity_field_dicts, \
+ 'field_dict and entity_field_dicts arguments must not be both unspecified'
+ if field_dict is None:
+ field_dict = {}
+ form = {'__form_id': formid}
+ fields = []
+ for field, value in field_dict.items():
+ fields.append(field)
+ form[field] = value
+
+ def _add_entity_field(entity, field, value):
+ entity_fields.append(field)
+ form[eid_param(field, entity.eid)] = value
+
+ for entity, field_dict in entity_field_dicts:
+ if '__maineid' not in form:
+ form['__maineid'] = entity.eid
+ entity_fields = []
+ form.setdefault('eid', []).append(entity.eid)
+ _add_entity_field(entity, '__type', entity.cw_etype)
+ for field, value in field_dict.items():
+ _add_entity_field(entity, field, value)
+ if entity_fields:
+ form[eid_param('_cw_entity_fields', entity.eid)] = ','.join(entity_fields)
+ if fields:
+ form['_cw_fields'] = ','.join(sorted(fields))
+ return form
+
+ @deprecated('[3.19] use .admin_request_from_url instead')
+ def req_from_url(self, url):
+ """parses `url` and builds the corresponding CW-web request
+
+ req.form will be setup using the url's query string
+ """
+ req = self.request(url=url)
+ if isinstance(url, unicode):
+ url = url.encode(req.encoding) # req.setup_params() expects encoded strings
+ querystring = urlparse(url)[-2]
+ params = parse_qs(querystring)
+ req.setup_params(params)
+ return req
+
+ @contextmanager
+ def admin_request_from_url(self, url):
+ """parses `url` and builds the corresponding CW-web request
+
+ req.form will be setup using the url's query string
+ """
+ with self.admin_access.web_request(url=url) as req:
+ if isinstance(url, unicode):
+ url = url.encode(req.encoding) # req.setup_params() expects encoded strings
+ querystring = urlparse(url)[-2]
+ params = parse_qs(querystring)
+ req.setup_params(params)
+ yield req
+
+ def url_publish(self, url, data=None):
+ """takes `url`, uses application's app_resolver to find the appropriate
+ controller and result set, then publishes the result.
+
+ To simulate post of www-form-encoded data, give a `data` dictionary
+ containing desired key/value associations.
+
+ This should pretty much correspond to what occurs in a real CW server
+ except the apache-rewriter component is not called.
+ """
+ with self.admin_request_from_url(url) as req:
+ if data is not None:
+ req.form.update(data)
+ ctrlid, rset = self.app.url_resolver.process(req, req.relative_path(False))
+ return self.ctrl_publish(req, ctrlid, rset)
+
+ def http_publish(self, url, data=None):
+ """like `url_publish`, except this returns a http response, even in case
+ of errors. You may give form parameters using the `data` argument.
+ """
+ with self.admin_request_from_url(url) as req:
+ if data is not None:
+ req.form.update(data)
+ with real_error_handling(self.app):
+ result = self.app_handle_request(req, req.relative_path(False))
+ return result, req
+
+ @staticmethod
+ def _parse_location(req, location):
+ try:
+ path, params = location.split('?', 1)
+ except ValueError:
+ path = location
+ params = {}
+ else:
+ cleanup = lambda p: (p[0], urlunquote(p[1]))
+ params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
+ if path.startswith(req.base_url()): # may be relative
+ path = path[len(req.base_url()):]
+ return path, params
+
+ def expect_redirect(self, callback, req):
+ """call the given callback with req as argument, expecting to get a
+ Redirect exception
+ """
+ try:
+ callback(req)
+ except Redirect as ex:
+ return self._parse_location(req, ex.location)
+ else:
+ self.fail('expected a Redirect exception')
+
+ def expect_redirect_handle_request(self, req, path='edit'):
+ """call the publish method of the application publisher, expecting to
+ get a Redirect exception
+ """
+ self.app_handle_request(req, path)
+ self.assertTrue(300 <= req.status_out < 400, req.status_out)
+ location = req.get_response_header('location')
+ return self._parse_location(req, location)
+
+ @deprecated("[3.15] expect_redirect_handle_request is the new and better way"
+ " (beware of small semantic changes)")
+ def expect_redirect_publish(self, *args, **kwargs):
+ return self.expect_redirect_handle_request(*args, **kwargs)
+
+ def set_auth_mode(self, authmode, anonuser=None):
+ self.set_option('auth-mode', authmode)
+ self.set_option('anonymous-user', anonuser)
+ if anonuser is None:
+ self.config.anonymous_credential = None
+ else:
+ self.config.anonymous_credential = (anonuser, anonuser)
+
+ def init_authentication(self, authmode, anonuser=None):
+ self.set_auth_mode(authmode, anonuser)
+ req = self.requestcls(self.vreg, url='login')
+ sh = self.app.session_handler
+ authm = sh.session_manager.authmanager
+ authm.anoninfo = self.vreg.config.anonymous_user()
+ authm.anoninfo = authm.anoninfo[0], {'password': authm.anoninfo[1]}
+ # not properly cleaned between tests
+ self.open_sessions = sh.session_manager._sessions = {}
+ return req, self.session
+
+ def assertAuthSuccess(self, req, origsession, nbsessions=1):
+ sh = self.app.session_handler
+ session = self.app.get_session(req)
+ cnx = repoapi.Connection(session)
+ req.set_cnx(cnx)
+ self.assertEqual(len(self.open_sessions), nbsessions, self.open_sessions)
+ self.assertEqual(session.login, origsession.login)
+ self.assertEqual(session.anonymous_session, False)
+
+ def assertAuthFailure(self, req, nbsessions=0):
+ with self.assertRaises(AuthenticationError):
+ self.app.get_session(req)
+ # +0 since we do not track the opened session
+ self.assertEqual(len(self.open_sessions), nbsessions)
+ clear_cache(req, 'get_authorization')
+
+ # content validation #######################################################
+
+ # validators are used to validate (XML, DTD, whatever) view's content
+ # validators availables are :
+ # DTDValidator : validates XML + declared DTD
+ # SaxOnlyValidator : guarantees XML is well formed
+ # None : do not try to validate anything
+ # validators used must be imported from from.devtools.htmlparser
+ content_type_validators = {
+ # maps MIME type : validator name
+ #
+ # do not set html validators here, we need HTMLValidator for html
+ # snippets
+ # 'text/html': DTDValidator,
+ # 'application/xhtml+xml': DTDValidator,
+ 'application/xml': htmlparser.XMLValidator,
+ 'text/xml': htmlparser.XMLValidator,
+ 'application/json': JsonValidator,
+ 'text/plain': None,
+ 'text/comma-separated-values': None,
+ 'text/x-vcard': None,
+ 'text/calendar': None,
+ 'image/png': None,
+ }
+ # maps vid : validator name (override content_type_validators)
+ vid_validators = dict((vid, htmlparser.VALMAP[valkey])
+ for vid, valkey in VIEW_VALIDATORS.items())
+
+ def view(self, vid, rset=None, req=None, template='main-template',
+ **kwargs):
+ """This method tests the view `vid` on `rset` using `template`
+
+ If no error occurred while rendering the view, the HTML is analyzed
+ and parsed.
+
+ :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo`
+ encapsulation the generated HTML
+ """
+ if req is None:
+ if rset is None:
+ req = self.request()
+ else:
+ req = rset.req
+ req.form['vid'] = vid
+ viewsreg = self.vreg['views']
+ view = viewsreg.select(vid, req, rset=rset, **kwargs)
+ # set explicit test description
+ if rset is not None:
+ # coerce to "bytes" on py2 because the description will be sent to
+ # sys.stdout/stderr which takes "bytes" on py2 and "unicode" on py3
+ rql = str(rset.printable_rql())
+ self.set_description("testing vid=%s defined in %s with (%s)" % (
+ vid, view.__module__, rql))
+ else:
+ self.set_description("testing vid=%s defined in %s without rset" % (
+ vid, view.__module__))
+ if template is None: # raw view testing, no template
+ viewfunc = view.render
+ else:
+ kwargs['view'] = view
+ viewfunc = lambda **k: viewsreg.main_template(req, template,
+ rset=rset, **kwargs)
+ return self._test_view(viewfunc, view, template, kwargs)
+
+ def _test_view(self, viewfunc, view, template='main-template', kwargs={}):
+ """this method does the actual call to the view
+
+ If no error occurred while rendering the view, the HTML is analyzed
+ and parsed.
+
+ :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo`
+ encapsulation the generated HTML
+ """
+ try:
+ output = viewfunc(**kwargs)
+ except Exception:
+ # hijack exception: generative tests stop when the exception
+ # is not an AssertionError
+ klass, exc, tcbk = sys.exc_info()
+ try:
+ msg = '[%s in %s] %s' % (klass, view.__regid__, exc)
+ except Exception:
+ msg = '[%s in %s] undisplayable exception' % (klass, view.__regid__)
+ exc = AssertionError(msg)
+ exc.__traceback__ = tcbk
+ raise exc
+ return self._check_html(output, view, template)
+
+ def get_validator(self, view=None, content_type=None, output=None):
+ if view is not None:
+ try:
+ return self.vid_validators[view.__regid__]()
+ except KeyError:
+ if content_type is None:
+ content_type = view.content_type
+ if content_type is None:
+ content_type = 'text/html'
+ if content_type in ('text/html', 'application/xhtml+xml') and output:
+ if output.startswith(b'<!DOCTYPE html>'):
+ # only check XML well-formness since HTMLValidator isn't html5
+ # compatible and won't like various other extensions
+ default_validator = htmlparser.XMLSyntaxValidator
+ elif output.startswith(b'<?xml'):
+ default_validator = htmlparser.DTDValidator
+ else:
+ default_validator = htmlparser.HTMLValidator
+ else:
+ default_validator = None
+ validatorclass = self.content_type_validators.get(content_type,
+ default_validator)
+ if validatorclass is None:
+ return
+ return validatorclass()
+
+ @nocoverage
+ def _check_html(self, output, view, template='main-template'):
+ """raises an exception if the HTML is invalid"""
+ output = output.strip()
+ if isinstance(output, text_type):
+ # XXX
+ output = output.encode('utf-8')
+ validator = self.get_validator(view, output=output)
+ if validator is None:
+ return output # return raw output if no validator is defined
+ if isinstance(validator, htmlparser.DTDValidator):
+ # XXX remove <canvas> used in progress widget, unknown in html dtd
+ output = re.sub('<canvas.*?></canvas>', '', output)
+ return self.assertWellFormed(validator, output.strip(), context=view.__regid__)
+
+ def assertWellFormed(self, validator, content, context=None):
+ try:
+ return validator.parse_string(content)
+ except Exception:
+ # hijack exception: generative tests stop when the exception
+ # is not an AssertionError
+ klass, exc, tcbk = sys.exc_info()
+ if context is None:
+ msg = u'[%s]' % (klass,)
+ else:
+ msg = u'[%s in %s]' % (klass, context)
+ msg = msg.encode(sys.getdefaultencoding(), 'replace')
+
+ try:
+ str_exc = str(exc)
+ except Exception:
+ str_exc = 'undisplayable exception'
+ msg += str_exc.encode(sys.getdefaultencoding(), 'replace')
+ if content is not None:
+ position = getattr(exc, "position", (0,))[0]
+ if position:
+ # define filter
+ if isinstance(content, str):
+ content = unicode(content, sys.getdefaultencoding(), 'replace')
+ content = validator.preprocess_data(content)
+ content = content.splitlines()
+ width = int(log(len(content), 10)) + 1
+ line_template = " %" + ("%i" % width) + "i: %s"
+ # XXX no need to iterate the whole file except to get
+ # the line number
+ content = u'\n'.join(line_template % (idx + 1, line)
+ for idx, line in enumerate(content)
+ if line_context_filter(idx+1, position))
+ msg += u'\nfor content:\n%s' % content
+ exc = AssertionError(msg)
+ exc.__traceback__ = tcbk
+ raise exc
+
+ def assertDocTestFile(self, testfile):
+ # doctest returns tuple (failure_count, test_count)
+ with self.admin_access.shell() as mih:
+ result = mih.process_script(testfile)
+ if result[0] and result[1]:
+ raise self.failureException("doctest file '%s' failed"
+ % testfile)
+
+ # notifications ############################################################
+
+ def assertSentEmail(self, subject, recipients=None, nb_msgs=None):
+ """test recipients in system mailbox for given email subject
+
+ :param subject: email subject to find in mailbox
+ :param recipients: list of email recipients
+ :param nb_msgs: expected number of entries
+ :returns: list of matched emails
+ """
+ messages = [email for email in MAILBOX
+ if email.message.get('Subject') == subject]
+ if recipients is not None:
+ sent_to = set()
+ for msg in messages:
+ sent_to.update(msg.recipients)
+ self.assertSetEqual(set(recipients), sent_to)
+ if nb_msgs is not None:
+ self.assertEqual(len(MAILBOX), nb_msgs)
+ return messages
+
+
+# auto-populating test classes and utilities ###################################
+
+from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries
+
+# XXX cleanup unprotected_entities & all mess
+
+
+def how_many_dict(schema, cnx, how_many, skip):
+ """given a schema, compute how many entities by type we need to be able to
+ satisfy relations cardinality.
+
+ The `how_many` argument tells how many entities of which type we want at
+ least.
+
+ Return a dictionary with entity types as key, and the number of entities for
+ this type as value.
+ """
+ relmap = {}
+ for rschema in schema.relations():
+ if rschema.final:
+ continue
+ for subj, obj in rschema.rdefs:
+ card = rschema.rdef(subj, obj).cardinality
+ # if the relation is mandatory, we'll need at least as many subj and
+ # obj to satisfy it
+ if card[0] in '1+' and card[1] in '1?':
+ # subj has to be linked to at least one obj,
+ # but obj can be linked to only one subj
+ # -> we need at least as many subj as obj to satisfy
+ # cardinalities for this relation
+ relmap.setdefault((rschema, subj), []).append(str(obj))
+ if card[1] in '1+' and card[0] in '1?':
+ # reverse subj and obj in the above explanation
+ relmap.setdefault((rschema, obj), []).append(str(subj))
+ unprotected = unprotected_entities(schema)
+ for etype in skip: # XXX (syt) duh? explain or kill
+ unprotected.add(etype)
+ howmanydict = {}
+ # step 1, compute a base number of each entity types: number of already
+ # existing entities of this type + `how_many`
+ for etype in unprotected_entities(schema, strict=True):
+ howmanydict[str(etype)] = cnx.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0]
+ if etype in unprotected:
+ howmanydict[str(etype)] += how_many
+ # step 2, augment nb entity per types to satisfy cardinality constraints,
+ # by recomputing for each relation that constrained an entity type:
+ #
+ # new num for etype = max(current num, sum(num for possible target etypes))
+ #
+ # XXX we should first check there is no cycle then propagate changes
+ for (rschema, etype), targets in relmap.items():
+ relfactor = sum(howmanydict[e] for e in targets)
+ howmanydict[str(etype)] = max(relfactor, howmanydict[etype])
+ return howmanydict
+
+
+class AutoPopulateTest(CubicWebTC):
+ """base class for test with auto-populating of the database"""
+ __abstract__ = True
+
+ test_db_id = 'autopopulate'
+
+ tags = CubicWebTC.tags | Tags('autopopulated')
+
+ pdbclass = CubicWebDebugger
+ # this is a hook to be able to define a list of rql queries
+ # that are application dependent and cannot be guessed automatically
+ application_rql = []
+
+ no_auto_populate = ()
+ ignored_relations = set()
+
+ def to_test_etypes(self):
+ return unprotected_entities(self.schema, strict=True)
+
+ def custom_populate(self, how_many, cnx):
+ pass
+
+ def post_populate(self, cnx):
+ pass
+
+ @nocoverage
+ def auto_populate(self, how_many):
+ """this method populates the database with `how_many` entities
+ of each possible type. It also inserts random relations between them
+ """
+ with self.admin_access.cnx() as cnx:
+ with cnx.security_enabled(read=False, write=False):
+ self._auto_populate(cnx, how_many)
+ cnx.commit()
+
+ def _auto_populate(self, cnx, how_many):
+ self.custom_populate(how_many, cnx)
+ vreg = self.vreg
+ howmanydict = how_many_dict(self.schema, cnx, how_many, self.no_auto_populate)
+ for etype in unprotected_entities(self.schema):
+ if etype in self.no_auto_populate:
+ continue
+ nb = howmanydict.get(etype, how_many)
+ for rql, args in insert_entity_queries(etype, self.schema, vreg, nb):
+ cnx.execute(rql, args)
+ edict = {}
+ for etype in unprotected_entities(self.schema, strict=True):
+ rset = cnx.execute('%s X' % etype)
+ edict[str(etype)] = set(row[0] for row in rset.rows)
+ existingrels = {}
+ ignored_relations = SYSTEM_RELATIONS | self.ignored_relations
+ for rschema in self.schema.relations():
+ if rschema.final or rschema in ignored_relations:
+ continue
+ rset = cnx.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema)
+ existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset)
+ q = make_relations_queries(self.schema, edict, cnx, ignored_relations,
+ existingrels=existingrels)
+ for rql, args in q:
+ try:
+ cnx.execute(rql, args)
+ except ValidationError as ex:
+ # failed to satisfy some constraint
+ print('error in automatic db population', ex)
+ cnx.commit_state = None # reset uncommitable flag
+ self.post_populate(cnx)
+
+ def iter_individual_rsets(self, etypes=None, limit=None):
+ etypes = etypes or self.to_test_etypes()
+ with self.admin_access.web_request() as req:
+ for etype in etypes:
+ if limit:
+ rql = 'Any X LIMIT %s WHERE X is %s' % (limit, etype)
+ else:
+ rql = 'Any X WHERE X is %s' % etype
+ rset = req.execute(rql)
+ for row in range(len(rset)):
+ if limit and row > limit:
+ break
+ # XXX iirk
+ rset2 = rset.limit(limit=1, offset=row)
+ yield rset2
+
+ def iter_automatic_rsets(self, limit=10):
+ """generates basic resultsets for each entity type"""
+ etypes = self.to_test_etypes()
+ if not etypes:
+ return
+ with self.admin_access.web_request() as req:
+ for etype in etypes:
+ yield req.execute('Any X LIMIT %s WHERE X is %s' % (limit, etype))
+ etype1 = etypes.pop()
+ try:
+ etype2 = etypes.pop()
+ except KeyError:
+ etype2 = etype1
+ # test a mixed query (DISTINCT/GROUP to avoid getting duplicate
+ # X which make muledit view failing for instance (html validation fails
+ # because of some duplicate "id" attributes)
+ yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' %
+ (etype1, etype2))
+ # test some application-specific queries if defined
+ for rql in self.application_rql:
+ yield req.execute(rql)
+
+ def _test_everything_for(self, rset):
+ """this method tries to find everything that can be tested
+ for `rset` and yields a callable test (as needed in generative tests)
+ """
+ propdefs = self.vreg['propertydefs']
+ # make all components visible
+ for k, v in propdefs.items():
+ if k.endswith('visible') and not v['default']:
+ propdefs[k]['default'] = True
+ for view in self.list_views_for(rset):
+ backup_rset = rset.copy(rset.rows, rset.description)
+ yield InnerTest(self._testname(rset, view.__regid__, 'view'),
+ self.view, view.__regid__, rset,
+ rset.req.reset_headers(), 'main-template')
+ # We have to do this because some views modify the
+ # resultset's syntax tree
+ rset = backup_rset
+ for action in self.list_actions_for(rset):
+ yield InnerTest(self._testname(rset, action.__regid__, 'action'),
+ self._test_action, action)
+ for box in self.list_boxes_for(rset):
+ w = [].append
+ yield InnerTest(self._testname(rset, box.__regid__, 'box'), box.render, w)
+
+ @staticmethod
+ def _testname(rset, objid, objtype):
+ return '%s_%s_%s' % ('_'.join(rset.column_types(0)), objid, objtype)
+
+
+# concrete class for automated application testing ############################
+
+class AutomaticWebTest(AutoPopulateTest):
+ """import this if you wan automatic tests to be ran"""
+
+ tags = AutoPopulateTest.tags | Tags('web', 'generated')
+
+ def setUp(self):
+ if self.__class__ is AutomaticWebTest:
+ # Prevent direct use of AutomaticWebTest to avoid database caching
+ # issues.
+ return
+ super(AutomaticWebTest, self).setUp()
+
+ # access to self.app for proper initialization of the authentication
+ # machinery (else some views may fail)
+ self.app
+
+ def test_one_each_config(self):
+ self.auto_populate(1)
+ for rset in self.iter_automatic_rsets(limit=1):
+ for testargs in self._test_everything_for(rset):
+ yield testargs
+
+ def test_ten_each_config(self):
+ self.auto_populate(10)
+ for rset in self.iter_automatic_rsets(limit=10):
+ for testargs in self._test_everything_for(rset):
+ yield testargs
+
+ def test_startup_views(self):
+ for vid in self.list_startup_views():
+ with self.admin_access.web_request() as req:
+ yield self.view, vid, None, req
+
+
+# registry instrumentization ###################################################
+
+def not_selected(vreg, appobject):
+ try:
+ vreg._selected[appobject.__class__] -= 1
+ except (KeyError, AttributeError):
+ pass
+
+
+# def vreg_instrumentize(testclass):
+# # XXX broken
+# from cubicweb.devtools.apptest import TestEnvironment
+# env = testclass._env = TestEnvironment('data', configcls=testclass.configcls)
+# for reg in env.vreg.values():
+# reg._selected = {}
+# try:
+# orig_select_best = reg.__class__.__orig_select_best
+# except Exception:
+# orig_select_best = reg.__class__._select_best
+# def instr_select_best(self, *args, **kwargs):
+# selected = orig_select_best(self, *args, **kwargs)
+# try:
+# self._selected[selected.__class__] += 1
+# except KeyError:
+# self._selected[selected.__class__] = 1
+# except AttributeError:
+# pass # occurs on reg used to restore database
+# return selected
+# reg.__class__._select_best = instr_select_best
+# reg.__class__.__orig_select_best = orig_select_best
+
+
+# def print_untested_objects(testclass, skipregs=('hooks', 'etypes')):
+# for regname, reg in testclass._env.vreg.items():
+# if regname in skipregs:
+# continue
+# for appobjects in reg.values():
+# for appobject in appobjects:
+# if not reg._selected.get(appobject):
+# print 'not tested', regname, appobject