devtools/testlib.py
changeset 10974 6557833657d6
parent 10937 eb05348b0e2d
child 10997 da712d3f0601
--- a/devtools/testlib.py	Tue Dec 15 08:55:58 2015 +0100
+++ b/devtools/testlib.py	Wed Dec 09 16:32:57 2015 +0100
@@ -18,14 +18,11 @@
 """this module contains base classes and utilities for cubicweb tests"""
 from __future__ import print_function
 
-__docformat__ = "restructuredtext en"
-
 import sys
 import re
 from os.path import dirname, join, abspath
 from math import log
 from contextlib import contextmanager
-from warnings import warn
 from itertools import chain
 
 from six import text_type, string_types
@@ -43,7 +40,7 @@
 from logilab.common.shellutils import getlogin
 
 from cubicweb import (ValidationError, NoSelectableObject, AuthenticationError,
-                      ProgrammingError, BadConnectionId)
+                      BadConnectionId)
 from cubicweb import cwconfig, devtools, web, server, repoapi
 from cubicweb.utils import json
 from cubicweb.sobjects import notification
@@ -52,7 +49,7 @@
 from cubicweb.server.session import Session
 from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS
 from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID
-from cubicweb.utils import json
+
 
 # low-level utilities ##########################################################
 
@@ -67,6 +64,7 @@
             toto.write(data)
         webbrowser.open('file:///tmp/toto.html')
 
+
 def line_context_filter(line_no, center, before=3, after=None):
     """return true if line are in context
 
@@ -76,6 +74,7 @@
         after = before
     return center - before <= line_no <= center + after
 
+
 def unprotected_entities(schema, strict=False):
     """returned a set of each non final entity type, excluding "system" entities
     (eg CWGroup, CWUser...)
@@ -86,10 +85,12 @@
         protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES)
     return set(schema.entities()) - protected_entities
 
+
 class JsonValidator(object):
     def parse_string(self, data):
         return json.loads(data.decode('ascii'))
 
+
 @contextmanager
 def real_error_handling(app):
     """By default, CubicWebTC `app` attribute (ie the publisher) is monkey
@@ -112,10 +113,12 @@
     # restore
     app.error_handler = fake_error_handler
 
+
 # email handling, to test emails sent by an application ########################
 
 MAILBOX = []
 
+
 class Email(object):
     """you'll get instances of Email into MAILBOX during tests that trigger
     some notification.
@@ -146,13 +149,17 @@
         return '<Email to %s with subject %s>' % (','.join(self.recipients),
                                                   self.message.get('Subject'))
 
+
 # the trick to get email into MAILBOX instead of actually sent: monkey patch
 # cwconfig.SMTP object
 class MockSMTP:
+
     def __init__(self, server, port):
         pass
+
     def close(self):
         pass
+
     def sendmail(self, fromaddr, recipients, msg):
         MAILBOX.append(Email(fromaddr, recipients, msg))
 
@@ -246,7 +253,6 @@
             cnx.commit()
 
 
-
 # base class for cubicweb tests requiring a full cw environments ###############
 
 class CubicWebTC(TestCase):
@@ -296,14 +302,14 @@
             try:
                 self._open_access.pop().close()
             except BadConnectionId:
-                continue # already closed
+                continue  # already closed
 
     @property
     def session(self):
         """return admin session"""
         return self._admin_session
 
-    #XXX this doesn't need to a be classmethod anymore
+    # XXX this doesn't need to a be classmethod anymore
     def _init_repo(self):
         """init the repository and connection to it.
         """
@@ -317,7 +323,6 @@
         self.admin_access = self.new_access(login)
         self._admin_session = self.admin_access._session
 
-
     # config management ########################################################
 
     @classproperty
@@ -338,7 +343,7 @@
             config.mode = 'test'
             return config
 
-    @classmethod # XXX could be turned into a regular method
+    @classmethod  # XXX could be turned into a regular method
     def init_config(cls, config):
         """configuration initialization hooks.
 
@@ -354,13 +359,13 @@
         cls.admlogin = text_type(admincfg['login'])
         cls.admpassword = admincfg['password']
         # uncomment the line below if you want rql queries to be logged
-        #config.global_set_option('query-log-file',
-        #                         '/tmp/test_rql_log.' + `os.getpid()`)
+        # config.global_set_option('query-log-file',
+        #                          '/tmp/test_rql_log.' + `os.getpid()`)
         config.global_set_option('log-file', None)
         # set default-dest-addrs to a dumb email address to avoid mailbox or
         # mail queue pollution
         config.global_set_option('default-dest-addrs', ['whatever'])
-        send_to =  '%s@logilab.fr' % getlogin()
+        send_to = '%s@logilab.fr' % getlogin()
         config.global_set_option('sender-addr', send_to)
         config.global_set_option('default-dest-addrs', send_to)
         config.global_set_option('sender-name', 'cubicweb-test')
@@ -370,14 +375,13 @@
         # web resources
         try:
             config.global_set_option('embed-allowed', re.compile('.*'))
-        except Exception: # not in server only configuration
+        except Exception:  # not in server only configuration
             pass
 
     @property
     def vreg(self):
         return self.repo.vreg
 
-
     # global resources accessors ###############################################
 
     @property
@@ -411,7 +415,7 @@
             self.addCleanup(self._close_access)
         self.config.set_anonymous_allowed(self.anonymous_allowed)
         self.setup_database()
-        MAILBOX[:] = [] # reset mailbox
+        MAILBOX[:] = []  # reset mailbox
 
     def tearDown(self):
         # XXX hack until logilab.common.testlib is fixed
@@ -427,8 +431,10 @@
         # monkey patch send mail operation so emails are sent synchronously
         _old_mail_postcommit_event = SendMailOp.postcommit_event
         SendMailOp.postcommit_event = SendMailOp.sendmails
+
         def reverse_SendMailOp_monkey_patch():
             SendMailOp.postcommit_event = _old_mail_postcommit_event
+
         self.addCleanup(reverse_SendMailOp_monkey_patch)
 
     def setup_database(self):
@@ -452,7 +458,7 @@
         else:
             return req.user
 
-    @iclassmethod # XXX turn into a class method
+    @iclassmethod  # XXX turn into a class method
     def create_user(self, req, login=None, groups=('users',), password=None,
                     email=None, commit=True, **kwargs):
         """create and return a new user entity"""
@@ -471,12 +477,11 @@
         user.cw_clear_relation_cache('in_group', 'subject')
         if commit:
             try:
-                req.commit() # req is a session
+                req.commit()  # req is a session
             except AttributeError:
                 req.cnx.commit()
         return user
 
-
     # other utilities #########################################################
 
     @contextmanager
@@ -555,7 +560,6 @@
         self.assertListEqual(sorted(tr.name for tr in transitions),
                              sorted(expected))
 
-
     # views and actions registries inspection ##################################
 
     def pviews(self, req, rset):
@@ -591,6 +595,7 @@
             @property
             def items(self):
                 return self
+
         class fake_box(object):
             def action_link(self, action, **kwargs):
                 return (action.title, action.url())
@@ -617,7 +622,7 @@
                 try:
                     view = viewsvreg._select_best(views, req, rset=rset)
                     if view is None:
-                        raise NoSelectableObject((req,), {'rset':rset}, views)
+                        raise NoSelectableObject((req,), {'rset': rset}, views)
                     if view.linkable():
                         yield view
                     else:
@@ -648,7 +653,6 @@
                 else:
                     not_selected(self.vreg, view)
 
-
     # web ui testing utilities #################################################
 
     @property
@@ -656,8 +660,10 @@
     def app(self):
         """return a cubicweb publisher"""
         publisher = application.CubicWebPublisher(self.repo, self.config)
+
         def raise_error_handler(*args, **kwargs):
             raise
+
         publisher.error_handler = raise_error_handler
         return publisher
 
@@ -709,7 +715,7 @@
           for fields that are not tied to the given entity
         """
         assert field_dict or entity_field_dicts, \
-                'field_dict and entity_field_dicts arguments must not be both unspecified'
+            'field_dict and entity_field_dicts arguments must not be both unspecified'
         if field_dict is None:
             field_dict = {}
         form = {'__form_id': formid}
@@ -717,9 +723,11 @@
         for field, value in field_dict.items():
             fields.append(field)
             form[field] = value
+
         def _add_entity_field(entity, field, value):
             entity_fields.append(field)
             form[eid_param(field, entity.eid)] = value
+
         for entity, field_dict in entity_field_dicts:
             if '__maineid' not in form:
                 form['__maineid'] = entity.eid
@@ -742,7 +750,7 @@
         """
         req = self.request(url=url)
         if isinstance(url, unicode):
-            url = url.encode(req.encoding) # req.setup_params() expects encoded strings
+            url = url.encode(req.encoding)  # req.setup_params() expects encoded strings
         querystring = urlparse(url)[-2]
         params = parse_qs(querystring)
         req.setup_params(params)
@@ -756,7 +764,7 @@
         """
         with self.admin_access.web_request(url=url) as req:
             if isinstance(url, unicode):
-                url = url.encode(req.encoding) # req.setup_params() expects encoded strings
+                url = url.encode(req.encoding)  # req.setup_params() expects encoded strings
             querystring = urlparse(url)[-2]
             params = parse_qs(querystring)
             req.setup_params(params)
@@ -799,7 +807,7 @@
         else:
             cleanup = lambda p: (p[0], urlunquote(p[1]))
             params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
-        if path.startswith(req.base_url()): # may be relative
+        if path.startswith(req.base_url()):  # may be relative
             path = path[len(req.base_url()):]
         return path, params
 
@@ -818,8 +826,8 @@
         """call the publish method of the application publisher, expecting to
         get a Redirect exception
         """
-        result = self.app_handle_request(req, path)
-        self.assertTrue(300 <= req.status_out <400, req.status_out)
+        self.app_handle_request(req, path)
+        self.assertTrue(300 <= req.status_out < 400, req.status_out)
         location = req.get_response_header('location')
         return self._parse_location(req, location)
 
@@ -828,7 +836,6 @@
     def expect_redirect_publish(self, *args, **kwargs):
         return self.expect_redirect_handle_request(*args, **kwargs)
 
-
     def set_auth_mode(self, authmode, anonuser=None):
         self.set_option('auth-mode', authmode)
         self.set_option('anonymous-user', anonuser)
@@ -877,8 +884,8 @@
         #
         # do not set html validators here, we need HTMLValidator for html
         # snippets
-        #'text/html': DTDValidator,
-        #'application/xhtml+xml': DTDValidator,
+        # 'text/html': DTDValidator,
+        # 'application/xhtml+xml': DTDValidator,
         'application/xml': htmlparser.XMLValidator,
         'text/xml': htmlparser.XMLValidator,
         'application/json': JsonValidator,
@@ -892,7 +899,6 @@
     vid_validators = dict((vid, htmlparser.VALMAP[valkey])
                           for vid, valkey in VIEW_VALIDATORS.items())
 
-
     def view(self, vid, rset=None, req=None, template='main-template',
              **kwargs):
         """This method tests the view `vid` on `rset` using `template`
@@ -921,7 +927,7 @@
         else:
             self.set_description("testing vid=%s defined in %s without rset" % (
                 vid, view.__module__))
-        if template is None: # raw view testing, no template
+        if template is None:  # raw view testing, no template
             viewfunc = view.render
         else:
             kwargs['view'] = view
@@ -929,7 +935,6 @@
                                                           rset=rset, **kwargs)
         return self._test_view(viewfunc, view, template, kwargs)
 
-
     def _test_view(self, viewfunc, view, template='main-template', kwargs={}):
         """this method does the actual call to the view
 
@@ -989,11 +994,11 @@
             output = output.encode('utf-8')
         validator = self.get_validator(view, output=output)
         if validator is None:
-            return output # return raw output if no validator is defined
+            return output  # return raw output if no validator is defined
         if isinstance(validator, htmlparser.DTDValidator):
             # XXX remove <canvas> used in progress widget, unknown in html dtd
             output = re.sub('<canvas.*?></canvas>', '', output)
-        return self.assertWellFormed(validator, output.strip(), context= view.__regid__)
+        return self.assertWellFormed(validator, output.strip(), context=view.__regid__)
 
     def assertWellFormed(self, validator, content, context=None):
         try:
@@ -1069,6 +1074,7 @@
 
 # XXX cleanup unprotected_entities & all mess
 
+
 def how_many_dict(schema, cnx, how_many, skip):
     """given a schema, compute how many entities by type we need to be able to
     satisfy relations cardinality.
@@ -1097,7 +1103,7 @@
                 # reverse subj and obj in the above explanation
                 relmap.setdefault((rschema, obj), []).append(str(subj))
     unprotected = unprotected_entities(schema)
-    for etype in skip: # XXX (syt) duh? explain or kill
+    for etype in skip:  # XXX (syt) duh? explain or kill
         unprotected.add(etype)
     howmanydict = {}
     # step 1, compute a base number of each entity types: number of already
@@ -1143,7 +1149,6 @@
     def post_populate(self, cnx):
         pass
 
-
     @nocoverage
     def auto_populate(self, how_many):
         """this method populates the database with `how_many` entities
@@ -1183,7 +1188,7 @@
             except ValidationError as ex:
                 # failed to satisfy some constraint
                 print('error in automatic db population', ex)
-                cnx.commit_state = None # reset uncommitable flag
+                cnx.commit_state = None  # reset uncommitable flag
         self.post_populate(cnx)
 
     def iter_individual_rsets(self, etypes=None, limit=None):
@@ -1218,7 +1223,8 @@
             # test a mixed query (DISTINCT/GROUP to avoid getting duplicate
             # X which make muledit view failing for instance (html validation fails
             # because of some duplicate "id" attributes)
-            yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' % (etype1, etype2))
+            yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' %
+                              (etype1, etype2))
             # test some application-specific queries if defined
             for rql in self.application_rql:
                 yield req.execute(rql)
@@ -1241,7 +1247,8 @@
             # resultset's syntax tree
             rset = backup_rset
         for action in self.list_actions_for(rset):
-            yield InnerTest(self._testname(rset, action.__regid__, 'action'), self._test_action, action)
+            yield InnerTest(self._testname(rset, action.__regid__, 'action'),
+                            self._test_action, action)
         for box in self.list_boxes_for(rset):
             w = [].append
             yield InnerTest(self._testname(rset, box.__regid__, 'box'), box.render, w)
@@ -1269,21 +1276,18 @@
         # machinery (else some views may fail)
         self.app
 
-    ## one each
     def test_one_each_config(self):
         self.auto_populate(1)
         for rset in self.iter_automatic_rsets(limit=1):
             for testargs in self._test_everything_for(rset):
                 yield testargs
 
-    ## ten each
     def test_ten_each_config(self):
         self.auto_populate(10)
         for rset in self.iter_automatic_rsets(limit=10):
             for testargs in self._test_everything_for(rset):
                 yield testargs
 
-    ## startup views
     def test_startup_views(self):
         for vid in self.list_startup_views():
             with self.admin_access.web_request() as req: