--- a/devtools/testlib.py Tue Dec 15 08:55:58 2015 +0100
+++ b/devtools/testlib.py Wed Dec 09 16:32:57 2015 +0100
@@ -18,14 +18,11 @@
"""this module contains base classes and utilities for cubicweb tests"""
from __future__ import print_function
-__docformat__ = "restructuredtext en"
-
import sys
import re
from os.path import dirname, join, abspath
from math import log
from contextlib import contextmanager
-from warnings import warn
from itertools import chain
from six import text_type, string_types
@@ -43,7 +40,7 @@
from logilab.common.shellutils import getlogin
from cubicweb import (ValidationError, NoSelectableObject, AuthenticationError,
- ProgrammingError, BadConnectionId)
+ BadConnectionId)
from cubicweb import cwconfig, devtools, web, server, repoapi
from cubicweb.utils import json
from cubicweb.sobjects import notification
@@ -52,7 +49,7 @@
from cubicweb.server.session import Session
from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS
from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID
-from cubicweb.utils import json
+
# low-level utilities ##########################################################
@@ -67,6 +64,7 @@
toto.write(data)
webbrowser.open('file:///tmp/toto.html')
+
def line_context_filter(line_no, center, before=3, after=None):
"""return true if line are in context
@@ -76,6 +74,7 @@
after = before
return center - before <= line_no <= center + after
+
def unprotected_entities(schema, strict=False):
"""returned a set of each non final entity type, excluding "system" entities
(eg CWGroup, CWUser...)
@@ -86,10 +85,12 @@
protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES)
return set(schema.entities()) - protected_entities
+
class JsonValidator(object):
def parse_string(self, data):
return json.loads(data.decode('ascii'))
+
@contextmanager
def real_error_handling(app):
"""By default, CubicWebTC `app` attribute (ie the publisher) is monkey
@@ -112,10 +113,12 @@
# restore
app.error_handler = fake_error_handler
+
# email handling, to test emails sent by an application ########################
MAILBOX = []
+
class Email(object):
"""you'll get instances of Email into MAILBOX during tests that trigger
some notification.
@@ -146,13 +149,17 @@
return '<Email to %s with subject %s>' % (','.join(self.recipients),
self.message.get('Subject'))
+
# the trick to get email into MAILBOX instead of actually sent: monkey patch
# cwconfig.SMTP object
class MockSMTP:
+
def __init__(self, server, port):
pass
+
def close(self):
pass
+
def sendmail(self, fromaddr, recipients, msg):
MAILBOX.append(Email(fromaddr, recipients, msg))
@@ -246,7 +253,6 @@
cnx.commit()
-
# base class for cubicweb tests requiring a full cw environments ###############
class CubicWebTC(TestCase):
@@ -296,14 +302,14 @@
try:
self._open_access.pop().close()
except BadConnectionId:
- continue # already closed
+ continue # already closed
@property
def session(self):
"""return admin session"""
return self._admin_session
- #XXX this doesn't need to a be classmethod anymore
+ # XXX this doesn't need to a be classmethod anymore
def _init_repo(self):
"""init the repository and connection to it.
"""
@@ -317,7 +323,6 @@
self.admin_access = self.new_access(login)
self._admin_session = self.admin_access._session
-
# config management ########################################################
@classproperty
@@ -338,7 +343,7 @@
config.mode = 'test'
return config
- @classmethod # XXX could be turned into a regular method
+ @classmethod # XXX could be turned into a regular method
def init_config(cls, config):
"""configuration initialization hooks.
@@ -354,13 +359,13 @@
cls.admlogin = text_type(admincfg['login'])
cls.admpassword = admincfg['password']
# uncomment the line below if you want rql queries to be logged
- #config.global_set_option('query-log-file',
- # '/tmp/test_rql_log.' + `os.getpid()`)
+ # config.global_set_option('query-log-file',
+ # '/tmp/test_rql_log.' + `os.getpid()`)
config.global_set_option('log-file', None)
# set default-dest-addrs to a dumb email address to avoid mailbox or
# mail queue pollution
config.global_set_option('default-dest-addrs', ['whatever'])
- send_to = '%s@logilab.fr' % getlogin()
+ send_to = '%s@logilab.fr' % getlogin()
config.global_set_option('sender-addr', send_to)
config.global_set_option('default-dest-addrs', send_to)
config.global_set_option('sender-name', 'cubicweb-test')
@@ -370,14 +375,13 @@
# web resources
try:
config.global_set_option('embed-allowed', re.compile('.*'))
- except Exception: # not in server only configuration
+ except Exception: # not in server only configuration
pass
@property
def vreg(self):
return self.repo.vreg
-
# global resources accessors ###############################################
@property
@@ -411,7 +415,7 @@
self.addCleanup(self._close_access)
self.config.set_anonymous_allowed(self.anonymous_allowed)
self.setup_database()
- MAILBOX[:] = [] # reset mailbox
+ MAILBOX[:] = [] # reset mailbox
def tearDown(self):
# XXX hack until logilab.common.testlib is fixed
@@ -427,8 +431,10 @@
# monkey patch send mail operation so emails are sent synchronously
_old_mail_postcommit_event = SendMailOp.postcommit_event
SendMailOp.postcommit_event = SendMailOp.sendmails
+
def reverse_SendMailOp_monkey_patch():
SendMailOp.postcommit_event = _old_mail_postcommit_event
+
self.addCleanup(reverse_SendMailOp_monkey_patch)
def setup_database(self):
@@ -452,7 +458,7 @@
else:
return req.user
- @iclassmethod # XXX turn into a class method
+ @iclassmethod # XXX turn into a class method
def create_user(self, req, login=None, groups=('users',), password=None,
email=None, commit=True, **kwargs):
"""create and return a new user entity"""
@@ -471,12 +477,11 @@
user.cw_clear_relation_cache('in_group', 'subject')
if commit:
try:
- req.commit() # req is a session
+ req.commit() # req is a session
except AttributeError:
req.cnx.commit()
return user
-
# other utilities #########################################################
@contextmanager
@@ -555,7 +560,6 @@
self.assertListEqual(sorted(tr.name for tr in transitions),
sorted(expected))
-
# views and actions registries inspection ##################################
def pviews(self, req, rset):
@@ -591,6 +595,7 @@
@property
def items(self):
return self
+
class fake_box(object):
def action_link(self, action, **kwargs):
return (action.title, action.url())
@@ -617,7 +622,7 @@
try:
view = viewsvreg._select_best(views, req, rset=rset)
if view is None:
- raise NoSelectableObject((req,), {'rset':rset}, views)
+ raise NoSelectableObject((req,), {'rset': rset}, views)
if view.linkable():
yield view
else:
@@ -648,7 +653,6 @@
else:
not_selected(self.vreg, view)
-
# web ui testing utilities #################################################
@property
@@ -656,8 +660,10 @@
def app(self):
"""return a cubicweb publisher"""
publisher = application.CubicWebPublisher(self.repo, self.config)
+
def raise_error_handler(*args, **kwargs):
raise
+
publisher.error_handler = raise_error_handler
return publisher
@@ -709,7 +715,7 @@
for fields that are not tied to the given entity
"""
assert field_dict or entity_field_dicts, \
- 'field_dict and entity_field_dicts arguments must not be both unspecified'
+ 'field_dict and entity_field_dicts arguments must not be both unspecified'
if field_dict is None:
field_dict = {}
form = {'__form_id': formid}
@@ -717,9 +723,11 @@
for field, value in field_dict.items():
fields.append(field)
form[field] = value
+
def _add_entity_field(entity, field, value):
entity_fields.append(field)
form[eid_param(field, entity.eid)] = value
+
for entity, field_dict in entity_field_dicts:
if '__maineid' not in form:
form['__maineid'] = entity.eid
@@ -742,7 +750,7 @@
"""
req = self.request(url=url)
if isinstance(url, unicode):
- url = url.encode(req.encoding) # req.setup_params() expects encoded strings
+ url = url.encode(req.encoding) # req.setup_params() expects encoded strings
querystring = urlparse(url)[-2]
params = parse_qs(querystring)
req.setup_params(params)
@@ -756,7 +764,7 @@
"""
with self.admin_access.web_request(url=url) as req:
if isinstance(url, unicode):
- url = url.encode(req.encoding) # req.setup_params() expects encoded strings
+ url = url.encode(req.encoding) # req.setup_params() expects encoded strings
querystring = urlparse(url)[-2]
params = parse_qs(querystring)
req.setup_params(params)
@@ -799,7 +807,7 @@
else:
cleanup = lambda p: (p[0], urlunquote(p[1]))
params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
- if path.startswith(req.base_url()): # may be relative
+ if path.startswith(req.base_url()): # may be relative
path = path[len(req.base_url()):]
return path, params
@@ -818,8 +826,8 @@
"""call the publish method of the application publisher, expecting to
get a Redirect exception
"""
- result = self.app_handle_request(req, path)
- self.assertTrue(300 <= req.status_out <400, req.status_out)
+ self.app_handle_request(req, path)
+ self.assertTrue(300 <= req.status_out < 400, req.status_out)
location = req.get_response_header('location')
return self._parse_location(req, location)
@@ -828,7 +836,6 @@
def expect_redirect_publish(self, *args, **kwargs):
return self.expect_redirect_handle_request(*args, **kwargs)
-
def set_auth_mode(self, authmode, anonuser=None):
self.set_option('auth-mode', authmode)
self.set_option('anonymous-user', anonuser)
@@ -877,8 +884,8 @@
#
# do not set html validators here, we need HTMLValidator for html
# snippets
- #'text/html': DTDValidator,
- #'application/xhtml+xml': DTDValidator,
+ # 'text/html': DTDValidator,
+ # 'application/xhtml+xml': DTDValidator,
'application/xml': htmlparser.XMLValidator,
'text/xml': htmlparser.XMLValidator,
'application/json': JsonValidator,
@@ -892,7 +899,6 @@
vid_validators = dict((vid, htmlparser.VALMAP[valkey])
for vid, valkey in VIEW_VALIDATORS.items())
-
def view(self, vid, rset=None, req=None, template='main-template',
**kwargs):
"""This method tests the view `vid` on `rset` using `template`
@@ -921,7 +927,7 @@
else:
self.set_description("testing vid=%s defined in %s without rset" % (
vid, view.__module__))
- if template is None: # raw view testing, no template
+ if template is None: # raw view testing, no template
viewfunc = view.render
else:
kwargs['view'] = view
@@ -929,7 +935,6 @@
rset=rset, **kwargs)
return self._test_view(viewfunc, view, template, kwargs)
-
def _test_view(self, viewfunc, view, template='main-template', kwargs={}):
"""this method does the actual call to the view
@@ -989,11 +994,11 @@
output = output.encode('utf-8')
validator = self.get_validator(view, output=output)
if validator is None:
- return output # return raw output if no validator is defined
+ return output # return raw output if no validator is defined
if isinstance(validator, htmlparser.DTDValidator):
# XXX remove <canvas> used in progress widget, unknown in html dtd
output = re.sub('<canvas.*?></canvas>', '', output)
- return self.assertWellFormed(validator, output.strip(), context= view.__regid__)
+ return self.assertWellFormed(validator, output.strip(), context=view.__regid__)
def assertWellFormed(self, validator, content, context=None):
try:
@@ -1069,6 +1074,7 @@
# XXX cleanup unprotected_entities & all mess
+
def how_many_dict(schema, cnx, how_many, skip):
"""given a schema, compute how many entities by type we need to be able to
satisfy relations cardinality.
@@ -1097,7 +1103,7 @@
# reverse subj and obj in the above explanation
relmap.setdefault((rschema, obj), []).append(str(subj))
unprotected = unprotected_entities(schema)
- for etype in skip: # XXX (syt) duh? explain or kill
+ for etype in skip: # XXX (syt) duh? explain or kill
unprotected.add(etype)
howmanydict = {}
# step 1, compute a base number of each entity types: number of already
@@ -1143,7 +1149,6 @@
def post_populate(self, cnx):
pass
-
@nocoverage
def auto_populate(self, how_many):
"""this method populates the database with `how_many` entities
@@ -1183,7 +1188,7 @@
except ValidationError as ex:
# failed to satisfy some constraint
print('error in automatic db population', ex)
- cnx.commit_state = None # reset uncommitable flag
+ cnx.commit_state = None # reset uncommitable flag
self.post_populate(cnx)
def iter_individual_rsets(self, etypes=None, limit=None):
@@ -1218,7 +1223,8 @@
# test a mixed query (DISTINCT/GROUP to avoid getting duplicate
# X which make muledit view failing for instance (html validation fails
# because of some duplicate "id" attributes)
- yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' % (etype1, etype2))
+ yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' %
+ (etype1, etype2))
# test some application-specific queries if defined
for rql in self.application_rql:
yield req.execute(rql)
@@ -1241,7 +1247,8 @@
# resultset's syntax tree
rset = backup_rset
for action in self.list_actions_for(rset):
- yield InnerTest(self._testname(rset, action.__regid__, 'action'), self._test_action, action)
+ yield InnerTest(self._testname(rset, action.__regid__, 'action'),
+ self._test_action, action)
for box in self.list_boxes_for(rset):
w = [].append
yield InnerTest(self._testname(rset, box.__regid__, 'box'), box.render, w)
@@ -1269,21 +1276,18 @@
# machinery (else some views may fail)
self.app
- ## one each
def test_one_each_config(self):
self.auto_populate(1)
for rset in self.iter_automatic_rsets(limit=1):
for testargs in self._test_everything_for(rset):
yield testargs
- ## ten each
def test_ten_each_config(self):
self.auto_populate(10)
for rset in self.iter_automatic_rsets(limit=10):
for testargs in self._test_everything_for(rset):
yield testargs
- ## startup views
def test_startup_views(self):
for vid in self.list_startup_views():
with self.admin_access.web_request() as req: