cubicweb/devtools/testlib.py
changeset 11057 0b59724cb3f2
parent 11014 9c9f5e913f9c
child 11069 020de2d09c0f
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
       
     1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """this module contains base classes and utilities for cubicweb tests"""
       
    19 from __future__ import print_function
       
    20 
       
    21 import sys
       
    22 import re
       
    23 from os.path import dirname, join, abspath
       
    24 from math import log
       
    25 from contextlib import contextmanager
       
    26 from itertools import chain
       
    27 
       
    28 from six import text_type, string_types
       
    29 from six.moves import range
       
    30 from six.moves.urllib.parse import urlparse, parse_qs, unquote as urlunquote
       
    31 
       
    32 import yams.schema
       
    33 
       
    34 from logilab.common.testlib import TestCase, InnerTest, Tags
       
    35 from logilab.common.pytest import nocoverage, pause_trace
       
    36 from logilab.common.debugger import Debugger
       
    37 from logilab.common.umessage import message_from_string
       
    38 from logilab.common.decorators import cached, classproperty, clear_cache, iclassmethod
       
    39 from logilab.common.deprecation import deprecated, class_deprecated
       
    40 from logilab.common.shellutils import getlogin
       
    41 
       
    42 from cubicweb import (ValidationError, NoSelectableObject, AuthenticationError,
       
    43                       BadConnectionId)
       
    44 from cubicweb import cwconfig, devtools, web, server, repoapi
       
    45 from cubicweb.utils import json
       
    46 from cubicweb.sobjects import notification
       
    47 from cubicweb.web import Redirect, application, eid_param
       
    48 from cubicweb.server.hook import SendMailOp
       
    49 from cubicweb.server.session import Session
       
    50 from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS
       
    51 from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID
       
    52 
       
    53 
       
    54 # low-level utilities ##########################################################
       
    55 
       
    56 class CubicWebDebugger(Debugger):
       
    57     """special debugger class providing a 'view' function which saves some
       
    58     html into a temporary file and open a web browser to examinate it.
       
    59     """
       
    60     def do_view(self, arg):
       
    61         import webbrowser
       
    62         data = self._getval(arg)
       
    63         with open('/tmp/toto.html', 'w') as toto:
       
    64             toto.write(data)
       
    65         webbrowser.open('file:///tmp/toto.html')
       
    66 
       
    67 
       
    68 def line_context_filter(line_no, center, before=3, after=None):
       
    69     """return true if line are in context
       
    70 
       
    71     if after is None: after = before
       
    72     """
       
    73     if after is None:
       
    74         after = before
       
    75     return center - before <= line_no <= center + after
       
    76 
       
    77 
       
    78 def unprotected_entities(schema, strict=False):
       
    79     """returned a set of each non final entity type, excluding "system" entities
       
    80     (eg CWGroup, CWUser...)
       
    81     """
       
    82     if strict:
       
    83         protected_entities = yams.schema.BASE_TYPES
       
    84     else:
       
    85         protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES)
       
    86     return set(schema.entities()) - protected_entities
       
    87 
       
    88 
       
    89 class JsonValidator(object):
       
    90     def parse_string(self, data):
       
    91         return json.loads(data.decode('ascii'))
       
    92 
       
    93 
       
    94 @contextmanager
       
    95 def real_error_handling(app):
       
    96     """By default, CubicWebTC `app` attribute (ie the publisher) is monkey
       
    97     patched so that unexpected error are raised rather than going through the
       
    98     `error_handler` method.
       
    99 
       
   100     By using this context manager you disable this monkey-patching temporarily.
       
   101     Hence when publishihng a request no error will be raised, you'll get
       
   102     req.status_out set to an HTTP error status code and the generated page will
       
   103     usually hold a traceback as HTML.
       
   104 
       
   105     >>> with real_error_handling(app):
       
   106     >>>     page = app.handle_request(req)
       
   107     """
       
   108     # remove the monkey patched error handler
       
   109     fake_error_handler = app.error_handler
       
   110     del app.error_handler
       
   111     # return the app
       
   112     yield app
       
   113     # restore
       
   114     app.error_handler = fake_error_handler
       
   115 
       
   116 
       
   117 # email handling, to test emails sent by an application ########################
       
   118 
       
   119 MAILBOX = []
       
   120 
       
   121 
       
   122 class Email(object):
       
   123     """you'll get instances of Email into MAILBOX during tests that trigger
       
   124     some notification.
       
   125 
       
   126     * `msg` is the original message object
       
   127 
       
   128     * `recipients` is a list of email address which are the recipients of this
       
   129       message
       
   130     """
       
   131     def __init__(self, fromaddr, recipients, msg):
       
   132         self.fromaddr = fromaddr
       
   133         self.recipients = recipients
       
   134         self.msg = msg
       
   135 
       
   136     @property
       
   137     def message(self):
       
   138         return message_from_string(self.msg)
       
   139 
       
   140     @property
       
   141     def subject(self):
       
   142         return self.message.get('Subject')
       
   143 
       
   144     @property
       
   145     def content(self):
       
   146         return self.message.get_payload(decode=True)
       
   147 
       
   148     def __repr__(self):
       
   149         return '<Email to %s with subject %s>' % (','.join(self.recipients),
       
   150                                                   self.message.get('Subject'))
       
   151 
       
   152 
       
   153 # the trick to get email into MAILBOX instead of actually sent: monkey patch
       
   154 # cwconfig.SMTP object
       
   155 class MockSMTP:
       
   156 
       
   157     def __init__(self, server, port):
       
   158         pass
       
   159 
       
   160     def close(self):
       
   161         pass
       
   162 
       
   163     def sendmail(self, fromaddr, recipients, msg):
       
   164         MAILBOX.append(Email(fromaddr, recipients, msg))
       
   165 
       
   166 cwconfig.SMTP = MockSMTP
       
   167 
       
   168 
       
   169 # Repoaccess utility ###############################################3###########
       
   170 
       
   171 class RepoAccess(object):
       
   172     """An helper to easily create object to access the repo as a specific user
       
   173 
       
   174     Each RepoAccess have it own session.
       
   175 
       
   176     A repo access can create three type of object:
       
   177 
       
   178     .. automethod:: cubicweb.testlib.RepoAccess.cnx
       
   179     .. automethod:: cubicweb.testlib.RepoAccess.web_request
       
   180 
       
   181     The RepoAccess need to be closed to destroy the associated Session.
       
   182     TestCase usually take care of this aspect for the user.
       
   183 
       
   184     .. automethod:: cubicweb.testlib.RepoAccess.close
       
   185     """
       
   186 
       
   187     def __init__(self, repo, login, requestcls):
       
   188         self._repo = repo
       
   189         self._login = login
       
   190         self.requestcls = requestcls
       
   191         self._session = self._unsafe_connect(login)
       
   192 
       
   193     def _unsafe_connect(self, login, **kwargs):
       
   194         """ a completely unsafe connect method for the tests """
       
   195         # use an internal connection
       
   196         with self._repo.internal_cnx() as cnx:
       
   197             # try to get a user object
       
   198             user = cnx.find('CWUser', login=login).one()
       
   199             user.groups
       
   200             user.properties
       
   201             user.login
       
   202             session = Session(user, self._repo)
       
   203             self._repo._sessions[session.sessionid] = session
       
   204             user._cw = user.cw_rset.req = session
       
   205         with session.new_cnx() as cnx:
       
   206             self._repo.hm.call_hooks('session_open', cnx)
       
   207             # commit connection at this point in case write operation has been
       
   208             # done during `session_open` hooks
       
   209             cnx.commit()
       
   210         return session
       
   211 
       
   212     @contextmanager
       
   213     def cnx(self):
       
   214         """Context manager returning a server side connection for the user"""
       
   215         with self._session.new_cnx() as cnx:
       
   216             yield cnx
       
   217 
       
   218     # aliases for bw compat
       
   219     client_cnx = repo_cnx = cnx
       
   220 
       
   221     @contextmanager
       
   222     def web_request(self, url=None, headers={}, method='GET', **kwargs):
       
   223         """Context manager returning a web request pre-linked to a client cnx
       
   224 
       
   225         To commit and rollback use::
       
   226 
       
   227             req.cnx.commit()
       
   228             req.cnx.rolback()
       
   229         """
       
   230         req = self.requestcls(self._repo.vreg, url=url, headers=headers,
       
   231                               method=method, form=kwargs)
       
   232         with self._session.new_cnx() as cnx:
       
   233             req.set_cnx(cnx)
       
   234             yield req
       
   235 
       
   236     def close(self):
       
   237         """Close the session associated to the RepoAccess"""
       
   238         if self._session is not None:
       
   239             self._repo.close(self._session.sessionid)
       
   240         self._session = None
       
   241 
       
   242     @contextmanager
       
   243     def shell(self):
       
   244         from cubicweb.server.migractions import ServerMigrationHelper
       
   245         with self._session.new_cnx() as cnx:
       
   246             mih = ServerMigrationHelper(None, repo=self._repo, cnx=cnx,
       
   247                                         interactive=False,
       
   248                                         # hack so it don't try to load fs schema
       
   249                                         schema=1)
       
   250             yield mih
       
   251             cnx.commit()
       
   252 
       
   253 
       
   254 # base class for cubicweb tests requiring a full cw environments ###############
       
   255 
       
   256 class CubicWebTC(TestCase):
       
   257     """abstract class for test using an apptest environment
       
   258 
       
   259     attributes:
       
   260 
       
   261     * `vreg`, the vregistry
       
   262     * `schema`, self.vreg.schema
       
   263     * `config`, cubicweb configuration
       
   264     * `cnx`, repoapi connection to the repository using an admin user
       
   265     * `session`, server side session associated to `cnx`
       
   266     * `app`, the cubicweb publisher (for web testing)
       
   267     * `repo`, the repository object
       
   268     * `admlogin`, login of the admin user
       
   269     * `admpassword`, password of the admin user
       
   270     * `shell`, create and use shell environment
       
   271     * `anonymous_allowed`: flag telling if anonymous browsing should be allowed
       
   272     """
       
   273     appid = 'data'
       
   274     configcls = devtools.ApptestConfiguration
       
   275     requestcls = fake.FakeRequest
       
   276     tags = TestCase.tags | Tags('cubicweb', 'cw_repo')
       
   277     test_db_id = DEFAULT_EMPTY_DB_ID
       
   278 
       
   279     # anonymous is logged by default in cubicweb test cases
       
   280     anonymous_allowed = True
       
   281 
       
   282     def __init__(self, *args, **kwargs):
       
   283         self._admin_session = None
       
   284         self.repo = None
       
   285         self._open_access = set()
       
   286         super(CubicWebTC, self).__init__(*args, **kwargs)
       
   287 
       
   288     # repository connection handling ###########################################
       
   289 
       
   290     def new_access(self, login):
       
   291         """provide a new RepoAccess object for a given user
       
   292 
       
   293         The access is automatically closed at the end of the test."""
       
   294         login = text_type(login)
       
   295         access = RepoAccess(self.repo, login, self.requestcls)
       
   296         self._open_access.add(access)
       
   297         return access
       
   298 
       
   299     def _close_access(self):
       
   300         while self._open_access:
       
   301             try:
       
   302                 self._open_access.pop().close()
       
   303             except BadConnectionId:
       
   304                 continue  # already closed
       
   305 
       
   306     @property
       
   307     def session(self):
       
   308         """return admin session"""
       
   309         return self._admin_session
       
   310 
       
   311     # XXX this doesn't need to a be classmethod anymore
       
   312     def _init_repo(self):
       
   313         """init the repository and connection to it.
       
   314         """
       
   315         # get or restore and working db.
       
   316         db_handler = devtools.get_test_db_handler(self.config, self.init_config)
       
   317         db_handler.build_db_cache(self.test_db_id, self.pre_setup_database)
       
   318         db_handler.restore_database(self.test_db_id)
       
   319         self.repo = db_handler.get_repo(startup=True)
       
   320         # get an admin session (without actual login)
       
   321         login = text_type(db_handler.config.default_admin_config['login'])
       
   322         self.admin_access = self.new_access(login)
       
   323         self._admin_session = self.admin_access._session
       
   324 
       
   325     # config management ########################################################
       
   326 
       
   327     @classproperty
       
   328     def config(cls):
       
   329         """return the configuration object
       
   330 
       
   331         Configuration is cached on the test class.
       
   332         """
       
   333         if cls is CubicWebTC:
       
   334             # Prevent direct use of CubicWebTC directly to avoid database
       
   335             # caching issues
       
   336             return None
       
   337         try:
       
   338             return cls.__dict__['_config']
       
   339         except KeyError:
       
   340             home = abspath(join(dirname(sys.modules[cls.__module__].__file__), cls.appid))
       
   341             config = cls._config = cls.configcls(cls.appid, apphome=home)
       
   342             config.mode = 'test'
       
   343             return config
       
   344 
       
   345     @classmethod  # XXX could be turned into a regular method
       
   346     def init_config(cls, config):
       
   347         """configuration initialization hooks.
       
   348 
       
   349         You may only want to override here the configuraton logic.
       
   350 
       
   351         Otherwise, consider to use a different :class:`ApptestConfiguration`
       
   352         defined in the `configcls` class attribute.
       
   353 
       
   354         This method will be called by the database handler once the config has
       
   355         been properly bootstrapped.
       
   356         """
       
   357         admincfg = config.default_admin_config
       
   358         cls.admlogin = text_type(admincfg['login'])
       
   359         cls.admpassword = admincfg['password']
       
   360         # uncomment the line below if you want rql queries to be logged
       
   361         # config.global_set_option('query-log-file',
       
   362         #                          '/tmp/test_rql_log.' + `os.getpid()`)
       
   363         config.global_set_option('log-file', None)
       
   364         # set default-dest-addrs to a dumb email address to avoid mailbox or
       
   365         # mail queue pollution
       
   366         config.global_set_option('default-dest-addrs', ['whatever'])
       
   367         send_to = '%s@logilab.fr' % getlogin()
       
   368         config.global_set_option('sender-addr', send_to)
       
   369         config.global_set_option('default-dest-addrs', send_to)
       
   370         config.global_set_option('sender-name', 'cubicweb-test')
       
   371         config.global_set_option('sender-addr', 'cubicweb-test@logilab.fr')
       
   372         # default_base_url on config class isn't enough for TestServerConfiguration
       
   373         config.global_set_option('base-url', config.default_base_url())
       
   374         # web resources
       
   375         try:
       
   376             config.global_set_option('embed-allowed', re.compile('.*'))
       
   377         except Exception:  # not in server only configuration
       
   378             pass
       
   379 
       
   380     @property
       
   381     def vreg(self):
       
   382         return self.repo.vreg
       
   383 
       
   384     # global resources accessors ###############################################
       
   385 
       
   386     @property
       
   387     def schema(self):
       
   388         """return the application schema"""
       
   389         return self.vreg.schema
       
   390 
       
   391     def set_option(self, optname, value):
       
   392         self.config.global_set_option(optname, value)
       
   393 
       
   394     def set_debug(self, debugmode):
       
   395         server.set_debug(debugmode)
       
   396 
       
   397     def debugged(self, debugmode):
       
   398         return server.debugged(debugmode)
       
   399 
       
   400     # default test setup and teardown #########################################
       
   401 
       
   402     def setUp(self):
       
   403         # monkey patch send mail operation so emails are sent synchronously
       
   404         self._patch_SendMailOp()
       
   405         with pause_trace():
       
   406             previous_failure = self.__class__.__dict__.get('_repo_init_failed')
       
   407             if previous_failure is not None:
       
   408                 self.skipTest('repository is not initialised: %r' % previous_failure)
       
   409             try:
       
   410                 self._init_repo()
       
   411             except Exception as ex:
       
   412                 self.__class__._repo_init_failed = ex
       
   413                 raise
       
   414             self.addCleanup(self._close_access)
       
   415         self.config.set_anonymous_allowed(self.anonymous_allowed)
       
   416         self.setup_database()
       
   417         MAILBOX[:] = []  # reset mailbox
       
   418 
       
   419     def tearDown(self):
       
   420         # XXX hack until logilab.common.testlib is fixed
       
   421         if self._admin_session is not None:
       
   422             self.repo.close(self._admin_session.sessionid)
       
   423             self._admin_session = None
       
   424         while self._cleanups:
       
   425             cleanup, args, kwargs = self._cleanups.pop(-1)
       
   426             cleanup(*args, **kwargs)
       
   427         self.repo.turn_repo_off()
       
   428 
       
   429     def _patch_SendMailOp(self):
       
   430         # monkey patch send mail operation so emails are sent synchronously
       
   431         _old_mail_postcommit_event = SendMailOp.postcommit_event
       
   432         SendMailOp.postcommit_event = SendMailOp.sendmails
       
   433 
       
   434         def reverse_SendMailOp_monkey_patch():
       
   435             SendMailOp.postcommit_event = _old_mail_postcommit_event
       
   436 
       
   437         self.addCleanup(reverse_SendMailOp_monkey_patch)
       
   438 
       
   439     def setup_database(self):
       
   440         """add your database setup code by overriding this method"""
       
   441 
       
   442     @classmethod
       
   443     def pre_setup_database(cls, cnx, config):
       
   444         """add your pre database setup code by overriding this method
       
   445 
       
   446         Do not forget to set the cls.test_db_id value to enable caching of the
       
   447         result.
       
   448         """
       
   449 
       
   450     # user / session management ###############################################
       
   451 
       
   452     @deprecated('[3.19] explicitly use RepoAccess object in test instead')
       
   453     def user(self, req=None):
       
   454         """return the application schema"""
       
   455         if req is None:
       
   456             return self.request().user
       
   457         else:
       
   458             return req.user
       
   459 
       
   460     @iclassmethod  # XXX turn into a class method
       
   461     def create_user(self, req, login=None, groups=('users',), password=None,
       
   462                     email=None, commit=True, **kwargs):
       
   463         """create and return a new user entity"""
       
   464         if password is None:
       
   465             password = login
       
   466         if login is not None:
       
   467             login = text_type(login)
       
   468         user = req.create_entity('CWUser', login=login,
       
   469                                  upassword=password, **kwargs)
       
   470         req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
       
   471                     % ','.join(repr(str(g)) for g in groups),
       
   472                     {'x': user.eid})
       
   473         if email is not None:
       
   474             req.create_entity('EmailAddress', address=text_type(email),
       
   475                               reverse_primary_email=user)
       
   476         user.cw_clear_relation_cache('in_group', 'subject')
       
   477         if commit:
       
   478             try:
       
   479                 req.commit()  # req is a session
       
   480             except AttributeError:
       
   481                 req.cnx.commit()
       
   482         return user
       
   483 
       
   484     # other utilities #########################################################
       
   485 
       
   486     @contextmanager
       
   487     def temporary_appobjects(self, *appobjects):
       
   488         self.vreg._loadedmods.setdefault(self.__module__, {})
       
   489         for obj in appobjects:
       
   490             self.vreg.register(obj)
       
   491             registered = getattr(obj, '__registered__', None)
       
   492             if registered:
       
   493                 for registry in obj.__registries__:
       
   494                     registered(self.vreg[registry])
       
   495         try:
       
   496             yield
       
   497         finally:
       
   498             for obj in appobjects:
       
   499                 self.vreg.unregister(obj)
       
   500 
       
   501     @contextmanager
       
   502     def temporary_permissions(self, *perm_overrides, **perm_kwoverrides):
       
   503         """Set custom schema permissions within context.
       
   504 
       
   505         There are two ways to call this method, which may be used together :
       
   506 
       
   507         * using positional argument(s):
       
   508 
       
   509           .. sourcecode:: python
       
   510 
       
   511                 rdef = self.schema['CWUser'].rdef('login')
       
   512                 with self.temporary_permissions((rdef, {'read': ()})):
       
   513                     ...
       
   514 
       
   515 
       
   516         * using named argument(s):
       
   517 
       
   518           .. sourcecode:: python
       
   519 
       
   520                 with self.temporary_permissions(CWUser={'read': ()}):
       
   521                     ...
       
   522 
       
   523         Usually the former will be preferred to override permissions on a
       
   524         relation definition, while the latter is well suited for entity types.
       
   525 
       
   526         The allowed keys in the permission dictionary depend on the schema type
       
   527         (entity type / relation definition). Resulting permissions will be
       
   528         similar to `orig_permissions.update(partial_perms)`.
       
   529         """
       
   530         torestore = []
       
   531         for erschema, etypeperms in chain(perm_overrides, perm_kwoverrides.items()):
       
   532             if isinstance(erschema, string_types):
       
   533                 erschema = self.schema[erschema]
       
   534             for action, actionperms in etypeperms.items():
       
   535                 origperms = erschema.permissions[action]
       
   536                 erschema.set_action_permissions(action, actionperms)
       
   537                 torestore.append([erschema, action, origperms])
       
   538         try:
       
   539             yield
       
   540         finally:
       
   541             for erschema, action, permissions in torestore:
       
   542                 if action is None:
       
   543                     erschema.permissions = permissions
       
   544                 else:
       
   545                     erschema.set_action_permissions(action, permissions)
       
   546 
       
   547     def assertModificationDateGreater(self, entity, olddate):
       
   548         entity.cw_attr_cache.pop('modification_date', None)
       
   549         self.assertGreater(entity.modification_date, olddate)
       
   550 
       
   551     def assertMessageEqual(self, req, params, expected_msg):
       
   552         msg = req.session.data[params['_cwmsgid']]
       
   553         self.assertEqual(expected_msg, msg)
       
   554 
       
   555     # workflow utilities #######################################################
       
   556 
       
   557     def assertPossibleTransitions(self, entity, expected):
       
   558         transitions = entity.cw_adapt_to('IWorkflowable').possible_transitions()
       
   559         self.assertListEqual(sorted(tr.name for tr in transitions),
       
   560                              sorted(expected))
       
   561 
       
   562     # views and actions registries inspection ##################################
       
   563 
       
   564     def pviews(self, req, rset):
       
   565         return sorted((a.__regid__, a.__class__)
       
   566                       for a in self.vreg['views'].possible_views(req, rset=rset))
       
   567 
       
   568     def pactions(self, req, rset,
       
   569                  skipcategories=('addrelated', 'siteactions', 'useractions',
       
   570                                  'footer', 'manage')):
       
   571         return [(a.__regid__, a.__class__)
       
   572                 for a in self.vreg['actions'].poss_visible_objects(req, rset=rset)
       
   573                 if a.category not in skipcategories]
       
   574 
       
   575     def pactions_by_cats(self, req, rset, categories=('addrelated',)):
       
   576         return [(a.__regid__, a.__class__)
       
   577                 for a in self.vreg['actions'].poss_visible_objects(req, rset=rset)
       
   578                 if a.category in categories]
       
   579 
       
   580     def pactionsdict(self, req, rset,
       
   581                      skipcategories=('addrelated', 'siteactions', 'useractions',
       
   582                                      'footer', 'manage')):
       
   583         res = {}
       
   584         for a in self.vreg['actions'].poss_visible_objects(req, rset=rset):
       
   585             if a.category not in skipcategories:
       
   586                 res.setdefault(a.category, []).append(a.__class__)
       
   587         return res
       
   588 
       
   589     def action_submenu(self, req, rset, id):
       
   590         return self._test_action(self.vreg['actions'].select(id, req, rset=rset))
       
   591 
       
   592     def _test_action(self, action):
       
   593         class fake_menu(list):
       
   594             @property
       
   595             def items(self):
       
   596                 return self
       
   597 
       
   598         class fake_box(object):
       
   599             def action_link(self, action, **kwargs):
       
   600                 return (action.title, action.url())
       
   601         submenu = fake_menu()
       
   602         action.fill_menu(fake_box(), submenu)
       
   603         return submenu
       
   604 
       
   605     def list_views_for(self, rset):
       
   606         """returns the list of views that can be applied on `rset`"""
       
   607         req = rset.req
       
   608         only_once_vids = ('primary', 'secondary', 'text')
       
   609         req.data['ex'] = ValueError("whatever")
       
   610         viewsvreg = self.vreg['views']
       
   611         for vid, views in viewsvreg.items():
       
   612             if vid[0] == '_':
       
   613                 continue
       
   614             if rset.rowcount > 1 and vid in only_once_vids:
       
   615                 continue
       
   616             views = [view for view in views
       
   617                      if view.category != 'startupview'
       
   618                      and not issubclass(view, notification.NotificationView)
       
   619                      and not isinstance(view, class_deprecated)]
       
   620             if views:
       
   621                 try:
       
   622                     view = viewsvreg._select_best(views, req, rset=rset)
       
   623                     if view is None:
       
   624                         raise NoSelectableObject((req,), {'rset': rset}, views)
       
   625                     if view.linkable():
       
   626                         yield view
       
   627                     else:
       
   628                         not_selected(self.vreg, view)
       
   629                     # else the view is expected to be used as subview and should
       
   630                     # not be tested directly
       
   631                 except NoSelectableObject:
       
   632                     continue
       
   633 
       
   634     def list_actions_for(self, rset):
       
   635         """returns the list of actions that can be applied on `rset`"""
       
   636         req = rset.req
       
   637         for action in self.vreg['actions'].possible_objects(req, rset=rset):
       
   638             yield action
       
   639 
       
   640     def list_boxes_for(self, rset):
       
   641         """returns the list of boxes that can be applied on `rset`"""
       
   642         req = rset.req
       
   643         for box in self.vreg['ctxcomponents'].possible_objects(req, rset=rset):
       
   644             yield box
       
   645 
       
   646     def list_startup_views(self):
       
   647         """returns the list of startup views"""
       
   648         with self.admin_access.web_request() as req:
       
   649             for view in self.vreg['views'].possible_views(req, None):
       
   650                 if view.category == 'startupview':
       
   651                     yield view.__regid__
       
   652                 else:
       
   653                     not_selected(self.vreg, view)
       
   654 
       
   655     # web ui testing utilities #################################################
       
   656 
       
   657     @property
       
   658     @cached
       
   659     def app(self):
       
   660         """return a cubicweb publisher"""
       
   661         publisher = application.CubicWebPublisher(self.repo, self.config)
       
   662 
       
   663         def raise_error_handler(*args, **kwargs):
       
   664             raise
       
   665 
       
   666         publisher.error_handler = raise_error_handler
       
   667         return publisher
       
   668 
       
   669     @deprecated('[3.19] use the .remote_calling method')
       
   670     def remote_call(self, fname, *args):
       
   671         """remote json call simulation"""
       
   672         dump = json.dumps
       
   673         args = [dump(arg) for arg in args]
       
   674         req = self.request(fname=fname, pageid='123', arg=args)
       
   675         ctrl = self.vreg['controllers'].select('ajax', req)
       
   676         return ctrl.publish(), req
       
   677 
       
   678     @contextmanager
       
   679     def remote_calling(self, fname, *args):
       
   680         """remote json call simulation"""
       
   681         args = [json.dumps(arg) for arg in args]
       
   682         with self.admin_access.web_request(fname=fname, pageid='123', arg=args) as req:
       
   683             ctrl = self.vreg['controllers'].select('ajax', req)
       
   684             yield ctrl.publish(), req
       
   685 
       
   686     def app_handle_request(self, req, path='view'):
       
   687         return self.app.core_handle(req, path)
       
   688 
       
   689     @deprecated("[3.15] app_handle_request is the new and better way"
       
   690                 " (beware of small semantic changes)")
       
   691     def app_publish(self, *args, **kwargs):
       
   692         return self.app_handle_request(*args, **kwargs)
       
   693 
       
   694     def ctrl_publish(self, req, ctrl='edit', rset=None):
       
   695         """call the publish method of the edit controller"""
       
   696         ctrl = self.vreg['controllers'].select(ctrl, req, appli=self.app)
       
   697         try:
       
   698             result = ctrl.publish(rset)
       
   699             req.cnx.commit()
       
   700         except web.Redirect:
       
   701             req.cnx.commit()
       
   702             raise
       
   703         return result
       
   704 
       
   705     @staticmethod
       
   706     def fake_form(formid, field_dict=None, entity_field_dicts=()):
       
   707         """Build _cw.form dictionnary to fake posting of some standard cubicweb form
       
   708 
       
   709         * `formid`, the form id, usually form's __regid__
       
   710 
       
   711         * `field_dict`, dictionary of name:value for fields that are not tied to an entity
       
   712 
       
   713         * `entity_field_dicts`, list of (entity, dictionary) where dictionary contains name:value
       
   714           for fields that are not tied to the given entity
       
   715         """
       
   716         assert field_dict or entity_field_dicts, \
       
   717             'field_dict and entity_field_dicts arguments must not be both unspecified'
       
   718         if field_dict is None:
       
   719             field_dict = {}
       
   720         form = {'__form_id': formid}
       
   721         fields = []
       
   722         for field, value in field_dict.items():
       
   723             fields.append(field)
       
   724             form[field] = value
       
   725 
       
   726         def _add_entity_field(entity, field, value):
       
   727             entity_fields.append(field)
       
   728             form[eid_param(field, entity.eid)] = value
       
   729 
       
   730         for entity, field_dict in entity_field_dicts:
       
   731             if '__maineid' not in form:
       
   732                 form['__maineid'] = entity.eid
       
   733             entity_fields = []
       
   734             form.setdefault('eid', []).append(entity.eid)
       
   735             _add_entity_field(entity, '__type', entity.cw_etype)
       
   736             for field, value in field_dict.items():
       
   737                 _add_entity_field(entity, field, value)
       
   738             if entity_fields:
       
   739                 form[eid_param('_cw_entity_fields', entity.eid)] = ','.join(entity_fields)
       
   740         if fields:
       
   741             form['_cw_fields'] = ','.join(sorted(fields))
       
   742         return form
       
   743 
       
   744     @deprecated('[3.19] use .admin_request_from_url instead')
       
   745     def req_from_url(self, url):
       
   746         """parses `url` and builds the corresponding CW-web request
       
   747 
       
   748         req.form will be setup using the url's query string
       
   749         """
       
   750         req = self.request(url=url)
       
   751         if isinstance(url, unicode):
       
   752             url = url.encode(req.encoding)  # req.setup_params() expects encoded strings
       
   753         querystring = urlparse(url)[-2]
       
   754         params = parse_qs(querystring)
       
   755         req.setup_params(params)
       
   756         return req
       
   757 
       
   758     @contextmanager
       
   759     def admin_request_from_url(self, url):
       
   760         """parses `url` and builds the corresponding CW-web request
       
   761 
       
   762         req.form will be setup using the url's query string
       
   763         """
       
   764         with self.admin_access.web_request(url=url) as req:
       
   765             if isinstance(url, unicode):
       
   766                 url = url.encode(req.encoding)  # req.setup_params() expects encoded strings
       
   767             querystring = urlparse(url)[-2]
       
   768             params = parse_qs(querystring)
       
   769             req.setup_params(params)
       
   770             yield req
       
   771 
       
   772     def url_publish(self, url, data=None):
       
   773         """takes `url`, uses application's app_resolver to find the appropriate
       
   774         controller and result set, then publishes the result.
       
   775 
       
   776         To simulate post of www-form-encoded data, give a `data` dictionary
       
   777         containing desired key/value associations.
       
   778 
       
   779         This should pretty much correspond to what occurs in a real CW server
       
   780         except the apache-rewriter component is not called.
       
   781         """
       
   782         with self.admin_request_from_url(url) as req:
       
   783             if data is not None:
       
   784                 req.form.update(data)
       
   785             ctrlid, rset = self.app.url_resolver.process(req, req.relative_path(False))
       
   786             return self.ctrl_publish(req, ctrlid, rset)
       
   787 
       
   788     def http_publish(self, url, data=None):
       
   789         """like `url_publish`, except this returns a http response, even in case
       
   790         of errors. You may give form parameters using the `data` argument.
       
   791         """
       
   792         with self.admin_request_from_url(url) as req:
       
   793             if data is not None:
       
   794                 req.form.update(data)
       
   795             with real_error_handling(self.app):
       
   796                 result = self.app_handle_request(req, req.relative_path(False))
       
   797             return result, req
       
   798 
       
   799     @staticmethod
       
   800     def _parse_location(req, location):
       
   801         try:
       
   802             path, params = location.split('?', 1)
       
   803         except ValueError:
       
   804             path = location
       
   805             params = {}
       
   806         else:
       
   807             cleanup = lambda p: (p[0], urlunquote(p[1]))
       
   808             params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
       
   809         if path.startswith(req.base_url()):  # may be relative
       
   810             path = path[len(req.base_url()):]
       
   811         return path, params
       
   812 
       
   813     def expect_redirect(self, callback, req):
       
   814         """call the given callback with req as argument, expecting to get a
       
   815         Redirect exception
       
   816         """
       
   817         try:
       
   818             callback(req)
       
   819         except Redirect as ex:
       
   820             return self._parse_location(req, ex.location)
       
   821         else:
       
   822             self.fail('expected a Redirect exception')
       
   823 
       
   824     def expect_redirect_handle_request(self, req, path='edit'):
       
   825         """call the publish method of the application publisher, expecting to
       
   826         get a Redirect exception
       
   827         """
       
   828         self.app_handle_request(req, path)
       
   829         self.assertTrue(300 <= req.status_out < 400, req.status_out)
       
   830         location = req.get_response_header('location')
       
   831         return self._parse_location(req, location)
       
   832 
       
   833     @deprecated("[3.15] expect_redirect_handle_request is the new and better way"
       
   834                 " (beware of small semantic changes)")
       
   835     def expect_redirect_publish(self, *args, **kwargs):
       
   836         return self.expect_redirect_handle_request(*args, **kwargs)
       
   837 
       
   838     def set_auth_mode(self, authmode, anonuser=None):
       
   839         self.set_option('auth-mode', authmode)
       
   840         self.set_option('anonymous-user', anonuser)
       
   841         if anonuser is None:
       
   842             self.config.anonymous_credential = None
       
   843         else:
       
   844             self.config.anonymous_credential = (anonuser, anonuser)
       
   845 
       
   846     def init_authentication(self, authmode, anonuser=None):
       
   847         self.set_auth_mode(authmode, anonuser)
       
   848         req = self.requestcls(self.vreg, url='login')
       
   849         sh = self.app.session_handler
       
   850         authm = sh.session_manager.authmanager
       
   851         authm.anoninfo = self.vreg.config.anonymous_user()
       
   852         authm.anoninfo = authm.anoninfo[0], {'password': authm.anoninfo[1]}
       
   853         # not properly cleaned between tests
       
   854         self.open_sessions = sh.session_manager._sessions = {}
       
   855         return req, self.session
       
   856 
       
   857     def assertAuthSuccess(self, req, origsession, nbsessions=1):
       
   858         sh = self.app.session_handler
       
   859         session = self.app.get_session(req)
       
   860         cnx = repoapi.Connection(session)
       
   861         req.set_cnx(cnx)
       
   862         self.assertEqual(len(self.open_sessions), nbsessions, self.open_sessions)
       
   863         self.assertEqual(session.login, origsession.login)
       
   864         self.assertEqual(session.anonymous_session, False)
       
   865 
       
   866     def assertAuthFailure(self, req, nbsessions=0):
       
   867         with self.assertRaises(AuthenticationError):
       
   868             self.app.get_session(req)
       
   869         # +0 since we do not track the opened session
       
   870         self.assertEqual(len(self.open_sessions), nbsessions)
       
   871         clear_cache(req, 'get_authorization')
       
   872 
       
   873     # content validation #######################################################
       
   874 
       
   875     # validators are used to validate (XML, DTD, whatever) view's content
       
   876     # validators availables are :
       
   877     #  DTDValidator : validates XML + declared DTD
       
   878     #  SaxOnlyValidator : guarantees XML is well formed
       
   879     #  None : do not try to validate anything
       
   880     # validators used must be imported from from.devtools.htmlparser
       
   881     content_type_validators = {
       
   882         # maps MIME type : validator name
       
   883         #
       
   884         # do not set html validators here, we need HTMLValidator for html
       
   885         # snippets
       
   886         # 'text/html': DTDValidator,
       
   887         # 'application/xhtml+xml': DTDValidator,
       
   888         'application/xml': htmlparser.XMLValidator,
       
   889         'text/xml': htmlparser.XMLValidator,
       
   890         'application/json': JsonValidator,
       
   891         'text/plain': None,
       
   892         'text/comma-separated-values': None,
       
   893         'text/x-vcard': None,
       
   894         'text/calendar': None,
       
   895         'image/png': None,
       
   896         }
       
   897     # maps vid : validator name (override content_type_validators)
       
   898     vid_validators = dict((vid, htmlparser.VALMAP[valkey])
       
   899                           for vid, valkey in VIEW_VALIDATORS.items())
       
   900 
       
   901     def view(self, vid, rset=None, req=None, template='main-template',
       
   902              **kwargs):
       
   903         """This method tests the view `vid` on `rset` using `template`
       
   904 
       
   905         If no error occurred while rendering the view, the HTML is analyzed
       
   906         and parsed.
       
   907 
       
   908         :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo`
       
   909                   encapsulation the generated HTML
       
   910         """
       
   911         if req is None:
       
   912             if rset is None:
       
   913                 req = self.request()
       
   914             else:
       
   915                 req = rset.req
       
   916         req.form['vid'] = vid
       
   917         viewsreg = self.vreg['views']
       
   918         view = viewsreg.select(vid, req, rset=rset, **kwargs)
       
   919         # set explicit test description
       
   920         if rset is not None:
       
   921             # coerce to "bytes" on py2 because the description will be sent to
       
   922             # sys.stdout/stderr which takes "bytes" on py2 and "unicode" on py3
       
   923             rql = str(rset.printable_rql())
       
   924             self.set_description("testing vid=%s defined in %s with (%s)" % (
       
   925                 vid, view.__module__, rql))
       
   926         else:
       
   927             self.set_description("testing vid=%s defined in %s without rset" % (
       
   928                 vid, view.__module__))
       
   929         if template is None:  # raw view testing, no template
       
   930             viewfunc = view.render
       
   931         else:
       
   932             kwargs['view'] = view
       
   933             viewfunc = lambda **k: viewsreg.main_template(req, template,
       
   934                                                           rset=rset, **kwargs)
       
   935         return self._test_view(viewfunc, view, template, kwargs)
       
   936 
       
   937     def _test_view(self, viewfunc, view, template='main-template', kwargs={}):
       
   938         """this method does the actual call to the view
       
   939 
       
   940         If no error occurred while rendering the view, the HTML is analyzed
       
   941         and parsed.
       
   942 
       
   943         :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo`
       
   944                   encapsulation the generated HTML
       
   945         """
       
   946         try:
       
   947             output = viewfunc(**kwargs)
       
   948         except Exception:
       
   949             # hijack exception: generative tests stop when the exception
       
   950             # is not an AssertionError
       
   951             klass, exc, tcbk = sys.exc_info()
       
   952             try:
       
   953                 msg = '[%s in %s] %s' % (klass, view.__regid__, exc)
       
   954             except Exception:
       
   955                 msg = '[%s in %s] undisplayable exception' % (klass, view.__regid__)
       
   956             exc = AssertionError(msg)
       
   957             exc.__traceback__ = tcbk
       
   958             raise exc
       
   959         return self._check_html(output, view, template)
       
   960 
       
   961     def get_validator(self, view=None, content_type=None, output=None):
       
   962         if view is not None:
       
   963             try:
       
   964                 return self.vid_validators[view.__regid__]()
       
   965             except KeyError:
       
   966                 if content_type is None:
       
   967                     content_type = view.content_type
       
   968         if content_type is None:
       
   969             content_type = 'text/html'
       
   970         if content_type in ('text/html', 'application/xhtml+xml') and output:
       
   971             if output.startswith(b'<!DOCTYPE html>'):
       
   972                 # only check XML well-formness since HTMLValidator isn't html5
       
   973                 # compatible and won't like various other extensions
       
   974                 default_validator = htmlparser.XMLSyntaxValidator
       
   975             elif output.startswith(b'<?xml'):
       
   976                 default_validator = htmlparser.DTDValidator
       
   977             else:
       
   978                 default_validator = htmlparser.HTMLValidator
       
   979         else:
       
   980             default_validator = None
       
   981         validatorclass = self.content_type_validators.get(content_type,
       
   982                                                           default_validator)
       
   983         if validatorclass is None:
       
   984             return
       
   985         return validatorclass()
       
   986 
       
   987     @nocoverage
       
   988     def _check_html(self, output, view, template='main-template'):
       
   989         """raises an exception if the HTML is invalid"""
       
   990         output = output.strip()
       
   991         if isinstance(output, text_type):
       
   992             # XXX
       
   993             output = output.encode('utf-8')
       
   994         validator = self.get_validator(view, output=output)
       
   995         if validator is None:
       
   996             return output  # return raw output if no validator is defined
       
   997         if isinstance(validator, htmlparser.DTDValidator):
       
   998             # XXX remove <canvas> used in progress widget, unknown in html dtd
       
   999             output = re.sub('<canvas.*?></canvas>', '', output)
       
  1000         return self.assertWellFormed(validator, output.strip(), context=view.__regid__)
       
  1001 
       
  1002     def assertWellFormed(self, validator, content, context=None):
       
  1003         try:
       
  1004             return validator.parse_string(content)
       
  1005         except Exception:
       
  1006             # hijack exception: generative tests stop when the exception
       
  1007             # is not an AssertionError
       
  1008             klass, exc, tcbk = sys.exc_info()
       
  1009             if context is None:
       
  1010                 msg = u'[%s]' % (klass,)
       
  1011             else:
       
  1012                 msg = u'[%s in %s]' % (klass, context)
       
  1013             msg = msg.encode(sys.getdefaultencoding(), 'replace')
       
  1014 
       
  1015             try:
       
  1016                 str_exc = str(exc)
       
  1017             except Exception:
       
  1018                 str_exc = 'undisplayable exception'
       
  1019             msg += str_exc.encode(sys.getdefaultencoding(), 'replace')
       
  1020             if content is not None:
       
  1021                 position = getattr(exc, "position", (0,))[0]
       
  1022                 if position:
       
  1023                     # define filter
       
  1024                     if isinstance(content, str):
       
  1025                         content = unicode(content, sys.getdefaultencoding(), 'replace')
       
  1026                     content = validator.preprocess_data(content)
       
  1027                     content = content.splitlines()
       
  1028                     width = int(log(len(content), 10)) + 1
       
  1029                     line_template = " %" + ("%i" % width) + "i: %s"
       
  1030                     # XXX no need to iterate the whole file except to get
       
  1031                     # the line number
       
  1032                     content = u'\n'.join(line_template % (idx + 1, line)
       
  1033                                          for idx, line in enumerate(content)
       
  1034                                          if line_context_filter(idx+1, position))
       
  1035                     msg += u'\nfor content:\n%s' % content
       
  1036             exc = AssertionError(msg)
       
  1037             exc.__traceback__ = tcbk
       
  1038             raise exc
       
  1039 
       
  1040     def assertDocTestFile(self, testfile):
       
  1041         # doctest returns tuple (failure_count, test_count)
       
  1042         with self.admin_access.shell() as mih:
       
  1043             result = mih.process_script(testfile)
       
  1044         if result[0] and result[1]:
       
  1045             raise self.failureException("doctest file '%s' failed"
       
  1046                                         % testfile)
       
  1047 
       
  1048     # notifications ############################################################
       
  1049 
       
  1050     def assertSentEmail(self, subject, recipients=None, nb_msgs=None):
       
  1051         """test recipients in system mailbox for given email subject
       
  1052 
       
  1053         :param subject: email subject to find in mailbox
       
  1054         :param recipients: list of email recipients
       
  1055         :param nb_msgs: expected number of entries
       
  1056         :returns: list of matched emails
       
  1057         """
       
  1058         messages = [email for email in MAILBOX
       
  1059                     if email.message.get('Subject') == subject]
       
  1060         if recipients is not None:
       
  1061             sent_to = set()
       
  1062             for msg in messages:
       
  1063                 sent_to.update(msg.recipients)
       
  1064             self.assertSetEqual(set(recipients), sent_to)
       
  1065         if nb_msgs is not None:
       
  1066             self.assertEqual(len(MAILBOX), nb_msgs)
       
  1067         return messages
       
  1068 
       
  1069 
       
  1070 # auto-populating test classes and utilities ###################################
       
  1071 
       
  1072 from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries
       
  1073 
       
  1074 # XXX cleanup unprotected_entities & all mess
       
  1075 
       
  1076 
       
  1077 def how_many_dict(schema, cnx, how_many, skip):
       
  1078     """given a schema, compute how many entities by type we need to be able to
       
  1079     satisfy relations cardinality.
       
  1080 
       
  1081     The `how_many` argument tells how many entities of which type we want at
       
  1082     least.
       
  1083 
       
  1084     Return a dictionary with entity types as key, and the number of entities for
       
  1085     this type as value.
       
  1086     """
       
  1087     relmap = {}
       
  1088     for rschema in schema.relations():
       
  1089         if rschema.final:
       
  1090             continue
       
  1091         for subj, obj in rschema.rdefs:
       
  1092             card = rschema.rdef(subj, obj).cardinality
       
  1093             # if the relation is mandatory, we'll need at least as many subj and
       
  1094             # obj to satisfy it
       
  1095             if card[0] in '1+' and card[1] in '1?':
       
  1096                 # subj has to be linked to at least one obj,
       
  1097                 # but obj can be linked to only one subj
       
  1098                 # -> we need at least as many subj as obj to satisfy
       
  1099                 #    cardinalities for this relation
       
  1100                 relmap.setdefault((rschema, subj), []).append(str(obj))
       
  1101             if card[1] in '1+' and card[0] in '1?':
       
  1102                 # reverse subj and obj in the above explanation
       
  1103                 relmap.setdefault((rschema, obj), []).append(str(subj))
       
  1104     unprotected = unprotected_entities(schema)
       
  1105     for etype in skip:  # XXX (syt) duh? explain or kill
       
  1106         unprotected.add(etype)
       
  1107     howmanydict = {}
       
  1108     # step 1, compute a base number of each entity types: number of already
       
  1109     # existing entities of this type + `how_many`
       
  1110     for etype in unprotected_entities(schema, strict=True):
       
  1111         howmanydict[str(etype)] = cnx.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0]
       
  1112         if etype in unprotected:
       
  1113             howmanydict[str(etype)] += how_many
       
  1114     # step 2, augment nb entity per types to satisfy cardinality constraints,
       
  1115     # by recomputing for each relation that constrained an entity type:
       
  1116     #
       
  1117     # new num for etype = max(current num, sum(num for possible target etypes))
       
  1118     #
       
  1119     # XXX we should first check there is no cycle then propagate changes
       
  1120     for (rschema, etype), targets in relmap.items():
       
  1121         relfactor = sum(howmanydict[e] for e in targets)
       
  1122         howmanydict[str(etype)] = max(relfactor, howmanydict[etype])
       
  1123     return howmanydict
       
  1124 
       
  1125 
       
  1126 class AutoPopulateTest(CubicWebTC):
       
  1127     """base class for test with auto-populating of the database"""
       
  1128     __abstract__ = True
       
  1129 
       
  1130     test_db_id = 'autopopulate'
       
  1131 
       
  1132     tags = CubicWebTC.tags | Tags('autopopulated')
       
  1133 
       
  1134     pdbclass = CubicWebDebugger
       
  1135     # this is a hook to be able to define a list of rql queries
       
  1136     # that are application dependent and cannot be guessed automatically
       
  1137     application_rql = []
       
  1138 
       
  1139     no_auto_populate = ()
       
  1140     ignored_relations = set()
       
  1141 
       
  1142     def to_test_etypes(self):
       
  1143         return unprotected_entities(self.schema, strict=True)
       
  1144 
       
  1145     def custom_populate(self, how_many, cnx):
       
  1146         pass
       
  1147 
       
  1148     def post_populate(self, cnx):
       
  1149         pass
       
  1150 
       
  1151     @nocoverage
       
  1152     def auto_populate(self, how_many):
       
  1153         """this method populates the database with `how_many` entities
       
  1154         of each possible type. It also inserts random relations between them
       
  1155         """
       
  1156         with self.admin_access.cnx() as cnx:
       
  1157             with cnx.security_enabled(read=False, write=False):
       
  1158                 self._auto_populate(cnx, how_many)
       
  1159                 cnx.commit()
       
  1160 
       
  1161     def _auto_populate(self, cnx, how_many):
       
  1162         self.custom_populate(how_many, cnx)
       
  1163         vreg = self.vreg
       
  1164         howmanydict = how_many_dict(self.schema, cnx, how_many, self.no_auto_populate)
       
  1165         for etype in unprotected_entities(self.schema):
       
  1166             if etype in self.no_auto_populate:
       
  1167                 continue
       
  1168             nb = howmanydict.get(etype, how_many)
       
  1169             for rql, args in insert_entity_queries(etype, self.schema, vreg, nb):
       
  1170                 cnx.execute(rql, args)
       
  1171         edict = {}
       
  1172         for etype in unprotected_entities(self.schema, strict=True):
       
  1173             rset = cnx.execute('%s X' % etype)
       
  1174             edict[str(etype)] = set(row[0] for row in rset.rows)
       
  1175         existingrels = {}
       
  1176         ignored_relations = SYSTEM_RELATIONS | self.ignored_relations
       
  1177         for rschema in self.schema.relations():
       
  1178             if rschema.final or rschema in ignored_relations:
       
  1179                 continue
       
  1180             rset = cnx.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema)
       
  1181             existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset)
       
  1182         q = make_relations_queries(self.schema, edict, cnx, ignored_relations,
       
  1183                                    existingrels=existingrels)
       
  1184         for rql, args in q:
       
  1185             try:
       
  1186                 cnx.execute(rql, args)
       
  1187             except ValidationError as ex:
       
  1188                 # failed to satisfy some constraint
       
  1189                 print('error in automatic db population', ex)
       
  1190                 cnx.commit_state = None  # reset uncommitable flag
       
  1191         self.post_populate(cnx)
       
  1192 
       
  1193     def iter_individual_rsets(self, etypes=None, limit=None):
       
  1194         etypes = etypes or self.to_test_etypes()
       
  1195         with self.admin_access.web_request() as req:
       
  1196             for etype in etypes:
       
  1197                 if limit:
       
  1198                     rql = 'Any X LIMIT %s WHERE X is %s' % (limit, etype)
       
  1199                 else:
       
  1200                     rql = 'Any X WHERE X is %s' % etype
       
  1201                 rset = req.execute(rql)
       
  1202                 for row in range(len(rset)):
       
  1203                     if limit and row > limit:
       
  1204                         break
       
  1205                     # XXX iirk
       
  1206                     rset2 = rset.limit(limit=1, offset=row)
       
  1207                     yield rset2
       
  1208 
       
  1209     def iter_automatic_rsets(self, limit=10):
       
  1210         """generates basic resultsets for each entity type"""
       
  1211         etypes = self.to_test_etypes()
       
  1212         if not etypes:
       
  1213             return
       
  1214         with self.admin_access.web_request() as req:
       
  1215             for etype in etypes:
       
  1216                 yield req.execute('Any X LIMIT %s WHERE X is %s' % (limit, etype))
       
  1217             etype1 = etypes.pop()
       
  1218             try:
       
  1219                 etype2 = etypes.pop()
       
  1220             except KeyError:
       
  1221                 etype2 = etype1
       
  1222             # test a mixed query (DISTINCT/GROUP to avoid getting duplicate
       
  1223             # X which make muledit view failing for instance (html validation fails
       
  1224             # because of some duplicate "id" attributes)
       
  1225             yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' %
       
  1226                               (etype1, etype2))
       
  1227             # test some application-specific queries if defined
       
  1228             for rql in self.application_rql:
       
  1229                 yield req.execute(rql)
       
  1230 
       
  1231     def _test_everything_for(self, rset):
       
  1232         """this method tries to find everything that can be tested
       
  1233         for `rset` and yields a callable test (as needed in generative tests)
       
  1234         """
       
  1235         propdefs = self.vreg['propertydefs']
       
  1236         # make all components visible
       
  1237         for k, v in propdefs.items():
       
  1238             if k.endswith('visible') and not v['default']:
       
  1239                 propdefs[k]['default'] = True
       
  1240         for view in self.list_views_for(rset):
       
  1241             backup_rset = rset.copy(rset.rows, rset.description)
       
  1242             yield InnerTest(self._testname(rset, view.__regid__, 'view'),
       
  1243                             self.view, view.__regid__, rset,
       
  1244                             rset.req.reset_headers(), 'main-template')
       
  1245             # We have to do this because some views modify the
       
  1246             # resultset's syntax tree
       
  1247             rset = backup_rset
       
  1248         for action in self.list_actions_for(rset):
       
  1249             yield InnerTest(self._testname(rset, action.__regid__, 'action'),
       
  1250                             self._test_action, action)
       
  1251         for box in self.list_boxes_for(rset):
       
  1252             w = [].append
       
  1253             yield InnerTest(self._testname(rset, box.__regid__, 'box'), box.render, w)
       
  1254 
       
  1255     @staticmethod
       
  1256     def _testname(rset, objid, objtype):
       
  1257         return '%s_%s_%s' % ('_'.join(rset.column_types(0)), objid, objtype)
       
  1258 
       
  1259 
       
  1260 # concrete class for automated application testing  ############################
       
  1261 
       
  1262 class AutomaticWebTest(AutoPopulateTest):
       
  1263     """import this if you wan automatic tests to be ran"""
       
  1264 
       
  1265     tags = AutoPopulateTest.tags | Tags('web', 'generated')
       
  1266 
       
  1267     def setUp(self):
       
  1268         if self.__class__ is AutomaticWebTest:
       
  1269             # Prevent direct use of AutomaticWebTest to avoid database caching
       
  1270             # issues.
       
  1271             return
       
  1272         super(AutomaticWebTest, self).setUp()
       
  1273 
       
  1274         # access to self.app for proper initialization of the authentication
       
  1275         # machinery (else some views may fail)
       
  1276         self.app
       
  1277 
       
  1278     def test_one_each_config(self):
       
  1279         self.auto_populate(1)
       
  1280         for rset in self.iter_automatic_rsets(limit=1):
       
  1281             for testargs in self._test_everything_for(rset):
       
  1282                 yield testargs
       
  1283 
       
  1284     def test_ten_each_config(self):
       
  1285         self.auto_populate(10)
       
  1286         for rset in self.iter_automatic_rsets(limit=10):
       
  1287             for testargs in self._test_everything_for(rset):
       
  1288                 yield testargs
       
  1289 
       
  1290     def test_startup_views(self):
       
  1291         for vid in self.list_startup_views():
       
  1292             with self.admin_access.web_request() as req:
       
  1293                 yield self.view, vid, None, req
       
  1294 
       
  1295 
       
  1296 # registry instrumentization ###################################################
       
  1297 
       
  1298 def not_selected(vreg, appobject):
       
  1299     try:
       
  1300         vreg._selected[appobject.__class__] -= 1
       
  1301     except (KeyError, AttributeError):
       
  1302         pass
       
  1303 
       
  1304 
       
  1305 # def vreg_instrumentize(testclass):
       
  1306 #     # XXX broken
       
  1307 #     from cubicweb.devtools.apptest import TestEnvironment
       
  1308 #     env = testclass._env = TestEnvironment('data', configcls=testclass.configcls)
       
  1309 #     for reg in env.vreg.values():
       
  1310 #         reg._selected = {}
       
  1311 #         try:
       
  1312 #             orig_select_best = reg.__class__.__orig_select_best
       
  1313 #         except Exception:
       
  1314 #             orig_select_best = reg.__class__._select_best
       
  1315 #         def instr_select_best(self, *args, **kwargs):
       
  1316 #             selected = orig_select_best(self, *args, **kwargs)
       
  1317 #             try:
       
  1318 #                 self._selected[selected.__class__] += 1
       
  1319 #             except KeyError:
       
  1320 #                 self._selected[selected.__class__] = 1
       
  1321 #             except AttributeError:
       
  1322 #                 pass # occurs on reg used to restore database
       
  1323 #             return selected
       
  1324 #         reg.__class__._select_best = instr_select_best
       
  1325 #         reg.__class__.__orig_select_best = orig_select_best
       
  1326 
       
  1327 
       
  1328 # def print_untested_objects(testclass, skipregs=('hooks', 'etypes')):
       
  1329 #     for regname, reg in testclass._env.vreg.items():
       
  1330 #         if regname in skipregs:
       
  1331 #             continue
       
  1332 #         for appobjects in reg.values():
       
  1333 #             for appobject in appobjects:
       
  1334 #                 if not reg._selected.get(appobject):
       
  1335 #                     print 'not tested', regname, appobject