diff -r 0939ad2edf63 -r 6557833657d6 devtools/testlib.py --- a/devtools/testlib.py Tue Dec 15 08:55:58 2015 +0100 +++ b/devtools/testlib.py Wed Dec 09 16:32:57 2015 +0100 @@ -18,14 +18,11 @@ """this module contains base classes and utilities for cubicweb tests""" from __future__ import print_function -__docformat__ = "restructuredtext en" - import sys import re from os.path import dirname, join, abspath from math import log from contextlib import contextmanager -from warnings import warn from itertools import chain from six import text_type, string_types @@ -43,7 +40,7 @@ from logilab.common.shellutils import getlogin from cubicweb import (ValidationError, NoSelectableObject, AuthenticationError, - ProgrammingError, BadConnectionId) + BadConnectionId) from cubicweb import cwconfig, devtools, web, server, repoapi from cubicweb.utils import json from cubicweb.sobjects import notification @@ -52,7 +49,7 @@ from cubicweb.server.session import Session from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID -from cubicweb.utils import json + # low-level utilities ########################################################## @@ -67,6 +64,7 @@ toto.write(data) webbrowser.open('file:///tmp/toto.html') + def line_context_filter(line_no, center, before=3, after=None): """return true if line are in context @@ -76,6 +74,7 @@ after = before return center - before <= line_no <= center + after + def unprotected_entities(schema, strict=False): """returned a set of each non final entity type, excluding "system" entities (eg CWGroup, CWUser...) @@ -86,10 +85,12 @@ protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES) return set(schema.entities()) - protected_entities + class JsonValidator(object): def parse_string(self, data): return json.loads(data.decode('ascii')) + @contextmanager def real_error_handling(app): """By default, CubicWebTC `app` attribute (ie the publisher) is monkey @@ -112,10 +113,12 @@ # restore app.error_handler = fake_error_handler + # email handling, to test emails sent by an application ######################## MAILBOX = [] + class Email(object): """you'll get instances of Email into MAILBOX during tests that trigger some notification. @@ -146,13 +149,17 @@ return '' % (','.join(self.recipients), self.message.get('Subject')) + # the trick to get email into MAILBOX instead of actually sent: monkey patch # cwconfig.SMTP object class MockSMTP: + def __init__(self, server, port): pass + def close(self): pass + def sendmail(self, fromaddr, recipients, msg): MAILBOX.append(Email(fromaddr, recipients, msg)) @@ -246,7 +253,6 @@ cnx.commit() - # base class for cubicweb tests requiring a full cw environments ############### class CubicWebTC(TestCase): @@ -296,14 +302,14 @@ try: self._open_access.pop().close() except BadConnectionId: - continue # already closed + continue # already closed @property def session(self): """return admin session""" return self._admin_session - #XXX this doesn't need to a be classmethod anymore + # XXX this doesn't need to a be classmethod anymore def _init_repo(self): """init the repository and connection to it. """ @@ -317,7 +323,6 @@ self.admin_access = self.new_access(login) self._admin_session = self.admin_access._session - # config management ######################################################## @classproperty @@ -338,7 +343,7 @@ config.mode = 'test' return config - @classmethod # XXX could be turned into a regular method + @classmethod # XXX could be turned into a regular method def init_config(cls, config): """configuration initialization hooks. @@ -354,13 +359,13 @@ cls.admlogin = text_type(admincfg['login']) cls.admpassword = admincfg['password'] # uncomment the line below if you want rql queries to be logged - #config.global_set_option('query-log-file', - # '/tmp/test_rql_log.' + `os.getpid()`) + # config.global_set_option('query-log-file', + # '/tmp/test_rql_log.' + `os.getpid()`) config.global_set_option('log-file', None) # set default-dest-addrs to a dumb email address to avoid mailbox or # mail queue pollution config.global_set_option('default-dest-addrs', ['whatever']) - send_to = '%s@logilab.fr' % getlogin() + send_to = '%s@logilab.fr' % getlogin() config.global_set_option('sender-addr', send_to) config.global_set_option('default-dest-addrs', send_to) config.global_set_option('sender-name', 'cubicweb-test') @@ -370,14 +375,13 @@ # web resources try: config.global_set_option('embed-allowed', re.compile('.*')) - except Exception: # not in server only configuration + except Exception: # not in server only configuration pass @property def vreg(self): return self.repo.vreg - # global resources accessors ############################################### @property @@ -411,7 +415,7 @@ self.addCleanup(self._close_access) self.config.set_anonymous_allowed(self.anonymous_allowed) self.setup_database() - MAILBOX[:] = [] # reset mailbox + MAILBOX[:] = [] # reset mailbox def tearDown(self): # XXX hack until logilab.common.testlib is fixed @@ -427,8 +431,10 @@ # monkey patch send mail operation so emails are sent synchronously _old_mail_postcommit_event = SendMailOp.postcommit_event SendMailOp.postcommit_event = SendMailOp.sendmails + def reverse_SendMailOp_monkey_patch(): SendMailOp.postcommit_event = _old_mail_postcommit_event + self.addCleanup(reverse_SendMailOp_monkey_patch) def setup_database(self): @@ -452,7 +458,7 @@ else: return req.user - @iclassmethod # XXX turn into a class method + @iclassmethod # XXX turn into a class method def create_user(self, req, login=None, groups=('users',), password=None, email=None, commit=True, **kwargs): """create and return a new user entity""" @@ -471,12 +477,11 @@ user.cw_clear_relation_cache('in_group', 'subject') if commit: try: - req.commit() # req is a session + req.commit() # req is a session except AttributeError: req.cnx.commit() return user - # other utilities ######################################################### @contextmanager @@ -555,7 +560,6 @@ self.assertListEqual(sorted(tr.name for tr in transitions), sorted(expected)) - # views and actions registries inspection ################################## def pviews(self, req, rset): @@ -591,6 +595,7 @@ @property def items(self): return self + class fake_box(object): def action_link(self, action, **kwargs): return (action.title, action.url()) @@ -617,7 +622,7 @@ try: view = viewsvreg._select_best(views, req, rset=rset) if view is None: - raise NoSelectableObject((req,), {'rset':rset}, views) + raise NoSelectableObject((req,), {'rset': rset}, views) if view.linkable(): yield view else: @@ -648,7 +653,6 @@ else: not_selected(self.vreg, view) - # web ui testing utilities ################################################# @property @@ -656,8 +660,10 @@ def app(self): """return a cubicweb publisher""" publisher = application.CubicWebPublisher(self.repo, self.config) + def raise_error_handler(*args, **kwargs): raise + publisher.error_handler = raise_error_handler return publisher @@ -709,7 +715,7 @@ for fields that are not tied to the given entity """ assert field_dict or entity_field_dicts, \ - 'field_dict and entity_field_dicts arguments must not be both unspecified' + 'field_dict and entity_field_dicts arguments must not be both unspecified' if field_dict is None: field_dict = {} form = {'__form_id': formid} @@ -717,9 +723,11 @@ for field, value in field_dict.items(): fields.append(field) form[field] = value + def _add_entity_field(entity, field, value): entity_fields.append(field) form[eid_param(field, entity.eid)] = value + for entity, field_dict in entity_field_dicts: if '__maineid' not in form: form['__maineid'] = entity.eid @@ -742,7 +750,7 @@ """ req = self.request(url=url) if isinstance(url, unicode): - url = url.encode(req.encoding) # req.setup_params() expects encoded strings + url = url.encode(req.encoding) # req.setup_params() expects encoded strings querystring = urlparse(url)[-2] params = parse_qs(querystring) req.setup_params(params) @@ -756,7 +764,7 @@ """ with self.admin_access.web_request(url=url) as req: if isinstance(url, unicode): - url = url.encode(req.encoding) # req.setup_params() expects encoded strings + url = url.encode(req.encoding) # req.setup_params() expects encoded strings querystring = urlparse(url)[-2] params = parse_qs(querystring) req.setup_params(params) @@ -799,7 +807,7 @@ else: cleanup = lambda p: (p[0], urlunquote(p[1])) params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p) - if path.startswith(req.base_url()): # may be relative + if path.startswith(req.base_url()): # may be relative path = path[len(req.base_url()):] return path, params @@ -818,8 +826,8 @@ """call the publish method of the application publisher, expecting to get a Redirect exception """ - result = self.app_handle_request(req, path) - self.assertTrue(300 <= req.status_out <400, req.status_out) + self.app_handle_request(req, path) + self.assertTrue(300 <= req.status_out < 400, req.status_out) location = req.get_response_header('location') return self._parse_location(req, location) @@ -828,7 +836,6 @@ def expect_redirect_publish(self, *args, **kwargs): return self.expect_redirect_handle_request(*args, **kwargs) - def set_auth_mode(self, authmode, anonuser=None): self.set_option('auth-mode', authmode) self.set_option('anonymous-user', anonuser) @@ -877,8 +884,8 @@ # # do not set html validators here, we need HTMLValidator for html # snippets - #'text/html': DTDValidator, - #'application/xhtml+xml': DTDValidator, + # 'text/html': DTDValidator, + # 'application/xhtml+xml': DTDValidator, 'application/xml': htmlparser.XMLValidator, 'text/xml': htmlparser.XMLValidator, 'application/json': JsonValidator, @@ -892,7 +899,6 @@ vid_validators = dict((vid, htmlparser.VALMAP[valkey]) for vid, valkey in VIEW_VALIDATORS.items()) - def view(self, vid, rset=None, req=None, template='main-template', **kwargs): """This method tests the view `vid` on `rset` using `template` @@ -921,7 +927,7 @@ else: self.set_description("testing vid=%s defined in %s without rset" % ( vid, view.__module__)) - if template is None: # raw view testing, no template + if template is None: # raw view testing, no template viewfunc = view.render else: kwargs['view'] = view @@ -929,7 +935,6 @@ rset=rset, **kwargs) return self._test_view(viewfunc, view, template, kwargs) - def _test_view(self, viewfunc, view, template='main-template', kwargs={}): """this method does the actual call to the view @@ -989,11 +994,11 @@ output = output.encode('utf-8') validator = self.get_validator(view, output=output) if validator is None: - return output # return raw output if no validator is defined + return output # return raw output if no validator is defined if isinstance(validator, htmlparser.DTDValidator): # XXX remove used in progress widget, unknown in html dtd output = re.sub('', '', output) - return self.assertWellFormed(validator, output.strip(), context= view.__regid__) + return self.assertWellFormed(validator, output.strip(), context=view.__regid__) def assertWellFormed(self, validator, content, context=None): try: @@ -1069,6 +1074,7 @@ # XXX cleanup unprotected_entities & all mess + def how_many_dict(schema, cnx, how_many, skip): """given a schema, compute how many entities by type we need to be able to satisfy relations cardinality. @@ -1097,7 +1103,7 @@ # reverse subj and obj in the above explanation relmap.setdefault((rschema, obj), []).append(str(subj)) unprotected = unprotected_entities(schema) - for etype in skip: # XXX (syt) duh? explain or kill + for etype in skip: # XXX (syt) duh? explain or kill unprotected.add(etype) howmanydict = {} # step 1, compute a base number of each entity types: number of already @@ -1143,7 +1149,6 @@ def post_populate(self, cnx): pass - @nocoverage def auto_populate(self, how_many): """this method populates the database with `how_many` entities @@ -1183,7 +1188,7 @@ except ValidationError as ex: # failed to satisfy some constraint print('error in automatic db population', ex) - cnx.commit_state = None # reset uncommitable flag + cnx.commit_state = None # reset uncommitable flag self.post_populate(cnx) def iter_individual_rsets(self, etypes=None, limit=None): @@ -1218,7 +1223,8 @@ # test a mixed query (DISTINCT/GROUP to avoid getting duplicate # X which make muledit view failing for instance (html validation fails # because of some duplicate "id" attributes) - yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' % (etype1, etype2)) + yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' % + (etype1, etype2)) # test some application-specific queries if defined for rql in self.application_rql: yield req.execute(rql) @@ -1241,7 +1247,8 @@ # resultset's syntax tree rset = backup_rset for action in self.list_actions_for(rset): - yield InnerTest(self._testname(rset, action.__regid__, 'action'), self._test_action, action) + yield InnerTest(self._testname(rset, action.__regid__, 'action'), + self._test_action, action) for box in self.list_boxes_for(rset): w = [].append yield InnerTest(self._testname(rset, box.__regid__, 'box'), box.render, w) @@ -1269,21 +1276,18 @@ # machinery (else some views may fail) self.app - ## one each def test_one_each_config(self): self.auto_populate(1) for rset in self.iter_automatic_rsets(limit=1): for testargs in self._test_everything_for(rset): yield testargs - ## ten each def test_ten_each_config(self): self.auto_populate(10) for rset in self.iter_automatic_rsets(limit=10): for testargs in self._test_everything_for(rset): yield testargs - ## startup views def test_startup_views(self): for vid in self.list_startup_views(): with self.admin_access.web_request() as req: