|
1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """this module contains base classes and utilities for cubicweb tests""" |
|
19 from __future__ import print_function |
|
20 |
|
21 import sys |
|
22 import re |
|
23 from os.path import dirname, join, abspath |
|
24 from math import log |
|
25 from contextlib import contextmanager |
|
26 from itertools import chain |
|
27 |
|
28 from six import text_type, string_types |
|
29 from six.moves import range |
|
30 from six.moves.urllib.parse import urlparse, parse_qs, unquote as urlunquote |
|
31 |
|
32 import yams.schema |
|
33 |
|
34 from logilab.common.testlib import TestCase, InnerTest, Tags |
|
35 from logilab.common.pytest import nocoverage, pause_trace |
|
36 from logilab.common.debugger import Debugger |
|
37 from logilab.common.umessage import message_from_string |
|
38 from logilab.common.decorators import cached, classproperty, clear_cache, iclassmethod |
|
39 from logilab.common.deprecation import deprecated, class_deprecated |
|
40 from logilab.common.shellutils import getlogin |
|
41 |
|
42 from cubicweb import (ValidationError, NoSelectableObject, AuthenticationError, |
|
43 BadConnectionId) |
|
44 from cubicweb import cwconfig, devtools, web, server, repoapi |
|
45 from cubicweb.utils import json |
|
46 from cubicweb.sobjects import notification |
|
47 from cubicweb.web import Redirect, application, eid_param |
|
48 from cubicweb.server.hook import SendMailOp |
|
49 from cubicweb.server.session import Session |
|
50 from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS |
|
51 from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID |
|
52 |
|
53 |
|
54 # low-level utilities ########################################################## |
|
55 |
|
56 class CubicWebDebugger(Debugger): |
|
57 """special debugger class providing a 'view' function which saves some |
|
58 html into a temporary file and open a web browser to examinate it. |
|
59 """ |
|
60 def do_view(self, arg): |
|
61 import webbrowser |
|
62 data = self._getval(arg) |
|
63 with open('/tmp/toto.html', 'w') as toto: |
|
64 toto.write(data) |
|
65 webbrowser.open('file:///tmp/toto.html') |
|
66 |
|
67 |
|
68 def line_context_filter(line_no, center, before=3, after=None): |
|
69 """return true if line are in context |
|
70 |
|
71 if after is None: after = before |
|
72 """ |
|
73 if after is None: |
|
74 after = before |
|
75 return center - before <= line_no <= center + after |
|
76 |
|
77 |
|
78 def unprotected_entities(schema, strict=False): |
|
79 """returned a set of each non final entity type, excluding "system" entities |
|
80 (eg CWGroup, CWUser...) |
|
81 """ |
|
82 if strict: |
|
83 protected_entities = yams.schema.BASE_TYPES |
|
84 else: |
|
85 protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES) |
|
86 return set(schema.entities()) - protected_entities |
|
87 |
|
88 |
|
89 class JsonValidator(object): |
|
90 def parse_string(self, data): |
|
91 return json.loads(data.decode('ascii')) |
|
92 |
|
93 |
|
94 @contextmanager |
|
95 def real_error_handling(app): |
|
96 """By default, CubicWebTC `app` attribute (ie the publisher) is monkey |
|
97 patched so that unexpected error are raised rather than going through the |
|
98 `error_handler` method. |
|
99 |
|
100 By using this context manager you disable this monkey-patching temporarily. |
|
101 Hence when publishihng a request no error will be raised, you'll get |
|
102 req.status_out set to an HTTP error status code and the generated page will |
|
103 usually hold a traceback as HTML. |
|
104 |
|
105 >>> with real_error_handling(app): |
|
106 >>> page = app.handle_request(req) |
|
107 """ |
|
108 # remove the monkey patched error handler |
|
109 fake_error_handler = app.error_handler |
|
110 del app.error_handler |
|
111 # return the app |
|
112 yield app |
|
113 # restore |
|
114 app.error_handler = fake_error_handler |
|
115 |
|
116 |
|
117 # email handling, to test emails sent by an application ######################## |
|
118 |
|
119 MAILBOX = [] |
|
120 |
|
121 |
|
122 class Email(object): |
|
123 """you'll get instances of Email into MAILBOX during tests that trigger |
|
124 some notification. |
|
125 |
|
126 * `msg` is the original message object |
|
127 |
|
128 * `recipients` is a list of email address which are the recipients of this |
|
129 message |
|
130 """ |
|
131 def __init__(self, fromaddr, recipients, msg): |
|
132 self.fromaddr = fromaddr |
|
133 self.recipients = recipients |
|
134 self.msg = msg |
|
135 |
|
136 @property |
|
137 def message(self): |
|
138 return message_from_string(self.msg) |
|
139 |
|
140 @property |
|
141 def subject(self): |
|
142 return self.message.get('Subject') |
|
143 |
|
144 @property |
|
145 def content(self): |
|
146 return self.message.get_payload(decode=True) |
|
147 |
|
148 def __repr__(self): |
|
149 return '<Email to %s with subject %s>' % (','.join(self.recipients), |
|
150 self.message.get('Subject')) |
|
151 |
|
152 |
|
153 # the trick to get email into MAILBOX instead of actually sent: monkey patch |
|
154 # cwconfig.SMTP object |
|
155 class MockSMTP: |
|
156 |
|
157 def __init__(self, server, port): |
|
158 pass |
|
159 |
|
160 def close(self): |
|
161 pass |
|
162 |
|
163 def sendmail(self, fromaddr, recipients, msg): |
|
164 MAILBOX.append(Email(fromaddr, recipients, msg)) |
|
165 |
|
166 cwconfig.SMTP = MockSMTP |
|
167 |
|
168 |
|
169 # Repoaccess utility ###############################################3########### |
|
170 |
|
171 class RepoAccess(object): |
|
172 """An helper to easily create object to access the repo as a specific user |
|
173 |
|
174 Each RepoAccess have it own session. |
|
175 |
|
176 A repo access can create three type of object: |
|
177 |
|
178 .. automethod:: cubicweb.testlib.RepoAccess.cnx |
|
179 .. automethod:: cubicweb.testlib.RepoAccess.web_request |
|
180 |
|
181 The RepoAccess need to be closed to destroy the associated Session. |
|
182 TestCase usually take care of this aspect for the user. |
|
183 |
|
184 .. automethod:: cubicweb.testlib.RepoAccess.close |
|
185 """ |
|
186 |
|
187 def __init__(self, repo, login, requestcls): |
|
188 self._repo = repo |
|
189 self._login = login |
|
190 self.requestcls = requestcls |
|
191 self._session = self._unsafe_connect(login) |
|
192 |
|
193 def _unsafe_connect(self, login, **kwargs): |
|
194 """ a completely unsafe connect method for the tests """ |
|
195 # use an internal connection |
|
196 with self._repo.internal_cnx() as cnx: |
|
197 # try to get a user object |
|
198 user = cnx.find('CWUser', login=login).one() |
|
199 user.groups |
|
200 user.properties |
|
201 user.login |
|
202 session = Session(user, self._repo) |
|
203 self._repo._sessions[session.sessionid] = session |
|
204 user._cw = user.cw_rset.req = session |
|
205 with session.new_cnx() as cnx: |
|
206 self._repo.hm.call_hooks('session_open', cnx) |
|
207 # commit connection at this point in case write operation has been |
|
208 # done during `session_open` hooks |
|
209 cnx.commit() |
|
210 return session |
|
211 |
|
212 @contextmanager |
|
213 def cnx(self): |
|
214 """Context manager returning a server side connection for the user""" |
|
215 with self._session.new_cnx() as cnx: |
|
216 yield cnx |
|
217 |
|
218 # aliases for bw compat |
|
219 client_cnx = repo_cnx = cnx |
|
220 |
|
221 @contextmanager |
|
222 def web_request(self, url=None, headers={}, method='GET', **kwargs): |
|
223 """Context manager returning a web request pre-linked to a client cnx |
|
224 |
|
225 To commit and rollback use:: |
|
226 |
|
227 req.cnx.commit() |
|
228 req.cnx.rolback() |
|
229 """ |
|
230 req = self.requestcls(self._repo.vreg, url=url, headers=headers, |
|
231 method=method, form=kwargs) |
|
232 with self._session.new_cnx() as cnx: |
|
233 req.set_cnx(cnx) |
|
234 yield req |
|
235 |
|
236 def close(self): |
|
237 """Close the session associated to the RepoAccess""" |
|
238 if self._session is not None: |
|
239 self._repo.close(self._session.sessionid) |
|
240 self._session = None |
|
241 |
|
242 @contextmanager |
|
243 def shell(self): |
|
244 from cubicweb.server.migractions import ServerMigrationHelper |
|
245 with self._session.new_cnx() as cnx: |
|
246 mih = ServerMigrationHelper(None, repo=self._repo, cnx=cnx, |
|
247 interactive=False, |
|
248 # hack so it don't try to load fs schema |
|
249 schema=1) |
|
250 yield mih |
|
251 cnx.commit() |
|
252 |
|
253 |
|
254 # base class for cubicweb tests requiring a full cw environments ############### |
|
255 |
|
256 class CubicWebTC(TestCase): |
|
257 """abstract class for test using an apptest environment |
|
258 |
|
259 attributes: |
|
260 |
|
261 * `vreg`, the vregistry |
|
262 * `schema`, self.vreg.schema |
|
263 * `config`, cubicweb configuration |
|
264 * `cnx`, repoapi connection to the repository using an admin user |
|
265 * `session`, server side session associated to `cnx` |
|
266 * `app`, the cubicweb publisher (for web testing) |
|
267 * `repo`, the repository object |
|
268 * `admlogin`, login of the admin user |
|
269 * `admpassword`, password of the admin user |
|
270 * `shell`, create and use shell environment |
|
271 * `anonymous_allowed`: flag telling if anonymous browsing should be allowed |
|
272 """ |
|
273 appid = 'data' |
|
274 configcls = devtools.ApptestConfiguration |
|
275 requestcls = fake.FakeRequest |
|
276 tags = TestCase.tags | Tags('cubicweb', 'cw_repo') |
|
277 test_db_id = DEFAULT_EMPTY_DB_ID |
|
278 |
|
279 # anonymous is logged by default in cubicweb test cases |
|
280 anonymous_allowed = True |
|
281 |
|
282 def __init__(self, *args, **kwargs): |
|
283 self._admin_session = None |
|
284 self.repo = None |
|
285 self._open_access = set() |
|
286 super(CubicWebTC, self).__init__(*args, **kwargs) |
|
287 |
|
288 # repository connection handling ########################################### |
|
289 |
|
290 def new_access(self, login): |
|
291 """provide a new RepoAccess object for a given user |
|
292 |
|
293 The access is automatically closed at the end of the test.""" |
|
294 login = text_type(login) |
|
295 access = RepoAccess(self.repo, login, self.requestcls) |
|
296 self._open_access.add(access) |
|
297 return access |
|
298 |
|
299 def _close_access(self): |
|
300 while self._open_access: |
|
301 try: |
|
302 self._open_access.pop().close() |
|
303 except BadConnectionId: |
|
304 continue # already closed |
|
305 |
|
306 @property |
|
307 def session(self): |
|
308 """return admin session""" |
|
309 return self._admin_session |
|
310 |
|
311 # XXX this doesn't need to a be classmethod anymore |
|
312 def _init_repo(self): |
|
313 """init the repository and connection to it. |
|
314 """ |
|
315 # get or restore and working db. |
|
316 db_handler = devtools.get_test_db_handler(self.config, self.init_config) |
|
317 db_handler.build_db_cache(self.test_db_id, self.pre_setup_database) |
|
318 db_handler.restore_database(self.test_db_id) |
|
319 self.repo = db_handler.get_repo(startup=True) |
|
320 # get an admin session (without actual login) |
|
321 login = text_type(db_handler.config.default_admin_config['login']) |
|
322 self.admin_access = self.new_access(login) |
|
323 self._admin_session = self.admin_access._session |
|
324 |
|
325 # config management ######################################################## |
|
326 |
|
327 @classproperty |
|
328 def config(cls): |
|
329 """return the configuration object |
|
330 |
|
331 Configuration is cached on the test class. |
|
332 """ |
|
333 if cls is CubicWebTC: |
|
334 # Prevent direct use of CubicWebTC directly to avoid database |
|
335 # caching issues |
|
336 return None |
|
337 try: |
|
338 return cls.__dict__['_config'] |
|
339 except KeyError: |
|
340 home = abspath(join(dirname(sys.modules[cls.__module__].__file__), cls.appid)) |
|
341 config = cls._config = cls.configcls(cls.appid, apphome=home) |
|
342 config.mode = 'test' |
|
343 return config |
|
344 |
|
345 @classmethod # XXX could be turned into a regular method |
|
346 def init_config(cls, config): |
|
347 """configuration initialization hooks. |
|
348 |
|
349 You may only want to override here the configuraton logic. |
|
350 |
|
351 Otherwise, consider to use a different :class:`ApptestConfiguration` |
|
352 defined in the `configcls` class attribute. |
|
353 |
|
354 This method will be called by the database handler once the config has |
|
355 been properly bootstrapped. |
|
356 """ |
|
357 admincfg = config.default_admin_config |
|
358 cls.admlogin = text_type(admincfg['login']) |
|
359 cls.admpassword = admincfg['password'] |
|
360 # uncomment the line below if you want rql queries to be logged |
|
361 # config.global_set_option('query-log-file', |
|
362 # '/tmp/test_rql_log.' + `os.getpid()`) |
|
363 config.global_set_option('log-file', None) |
|
364 # set default-dest-addrs to a dumb email address to avoid mailbox or |
|
365 # mail queue pollution |
|
366 config.global_set_option('default-dest-addrs', ['whatever']) |
|
367 send_to = '%s@logilab.fr' % getlogin() |
|
368 config.global_set_option('sender-addr', send_to) |
|
369 config.global_set_option('default-dest-addrs', send_to) |
|
370 config.global_set_option('sender-name', 'cubicweb-test') |
|
371 config.global_set_option('sender-addr', 'cubicweb-test@logilab.fr') |
|
372 # default_base_url on config class isn't enough for TestServerConfiguration |
|
373 config.global_set_option('base-url', config.default_base_url()) |
|
374 # web resources |
|
375 try: |
|
376 config.global_set_option('embed-allowed', re.compile('.*')) |
|
377 except Exception: # not in server only configuration |
|
378 pass |
|
379 |
|
380 @property |
|
381 def vreg(self): |
|
382 return self.repo.vreg |
|
383 |
|
384 # global resources accessors ############################################### |
|
385 |
|
386 @property |
|
387 def schema(self): |
|
388 """return the application schema""" |
|
389 return self.vreg.schema |
|
390 |
|
391 def set_option(self, optname, value): |
|
392 self.config.global_set_option(optname, value) |
|
393 |
|
394 def set_debug(self, debugmode): |
|
395 server.set_debug(debugmode) |
|
396 |
|
397 def debugged(self, debugmode): |
|
398 return server.debugged(debugmode) |
|
399 |
|
400 # default test setup and teardown ######################################### |
|
401 |
|
402 def setUp(self): |
|
403 # monkey patch send mail operation so emails are sent synchronously |
|
404 self._patch_SendMailOp() |
|
405 with pause_trace(): |
|
406 previous_failure = self.__class__.__dict__.get('_repo_init_failed') |
|
407 if previous_failure is not None: |
|
408 self.skipTest('repository is not initialised: %r' % previous_failure) |
|
409 try: |
|
410 self._init_repo() |
|
411 except Exception as ex: |
|
412 self.__class__._repo_init_failed = ex |
|
413 raise |
|
414 self.addCleanup(self._close_access) |
|
415 self.config.set_anonymous_allowed(self.anonymous_allowed) |
|
416 self.setup_database() |
|
417 MAILBOX[:] = [] # reset mailbox |
|
418 |
|
419 def tearDown(self): |
|
420 # XXX hack until logilab.common.testlib is fixed |
|
421 if self._admin_session is not None: |
|
422 self.repo.close(self._admin_session.sessionid) |
|
423 self._admin_session = None |
|
424 while self._cleanups: |
|
425 cleanup, args, kwargs = self._cleanups.pop(-1) |
|
426 cleanup(*args, **kwargs) |
|
427 self.repo.turn_repo_off() |
|
428 |
|
429 def _patch_SendMailOp(self): |
|
430 # monkey patch send mail operation so emails are sent synchronously |
|
431 _old_mail_postcommit_event = SendMailOp.postcommit_event |
|
432 SendMailOp.postcommit_event = SendMailOp.sendmails |
|
433 |
|
434 def reverse_SendMailOp_monkey_patch(): |
|
435 SendMailOp.postcommit_event = _old_mail_postcommit_event |
|
436 |
|
437 self.addCleanup(reverse_SendMailOp_monkey_patch) |
|
438 |
|
439 def setup_database(self): |
|
440 """add your database setup code by overriding this method""" |
|
441 |
|
442 @classmethod |
|
443 def pre_setup_database(cls, cnx, config): |
|
444 """add your pre database setup code by overriding this method |
|
445 |
|
446 Do not forget to set the cls.test_db_id value to enable caching of the |
|
447 result. |
|
448 """ |
|
449 |
|
450 # user / session management ############################################### |
|
451 |
|
452 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
|
453 def user(self, req=None): |
|
454 """return the application schema""" |
|
455 if req is None: |
|
456 return self.request().user |
|
457 else: |
|
458 return req.user |
|
459 |
|
460 @iclassmethod # XXX turn into a class method |
|
461 def create_user(self, req, login=None, groups=('users',), password=None, |
|
462 email=None, commit=True, **kwargs): |
|
463 """create and return a new user entity""" |
|
464 if password is None: |
|
465 password = login |
|
466 if login is not None: |
|
467 login = text_type(login) |
|
468 user = req.create_entity('CWUser', login=login, |
|
469 upassword=password, **kwargs) |
|
470 req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)' |
|
471 % ','.join(repr(str(g)) for g in groups), |
|
472 {'x': user.eid}) |
|
473 if email is not None: |
|
474 req.create_entity('EmailAddress', address=text_type(email), |
|
475 reverse_primary_email=user) |
|
476 user.cw_clear_relation_cache('in_group', 'subject') |
|
477 if commit: |
|
478 try: |
|
479 req.commit() # req is a session |
|
480 except AttributeError: |
|
481 req.cnx.commit() |
|
482 return user |
|
483 |
|
484 # other utilities ######################################################### |
|
485 |
|
486 @contextmanager |
|
487 def temporary_appobjects(self, *appobjects): |
|
488 self.vreg._loadedmods.setdefault(self.__module__, {}) |
|
489 for obj in appobjects: |
|
490 self.vreg.register(obj) |
|
491 registered = getattr(obj, '__registered__', None) |
|
492 if registered: |
|
493 for registry in obj.__registries__: |
|
494 registered(self.vreg[registry]) |
|
495 try: |
|
496 yield |
|
497 finally: |
|
498 for obj in appobjects: |
|
499 self.vreg.unregister(obj) |
|
500 |
|
501 @contextmanager |
|
502 def temporary_permissions(self, *perm_overrides, **perm_kwoverrides): |
|
503 """Set custom schema permissions within context. |
|
504 |
|
505 There are two ways to call this method, which may be used together : |
|
506 |
|
507 * using positional argument(s): |
|
508 |
|
509 .. sourcecode:: python |
|
510 |
|
511 rdef = self.schema['CWUser'].rdef('login') |
|
512 with self.temporary_permissions((rdef, {'read': ()})): |
|
513 ... |
|
514 |
|
515 |
|
516 * using named argument(s): |
|
517 |
|
518 .. sourcecode:: python |
|
519 |
|
520 with self.temporary_permissions(CWUser={'read': ()}): |
|
521 ... |
|
522 |
|
523 Usually the former will be preferred to override permissions on a |
|
524 relation definition, while the latter is well suited for entity types. |
|
525 |
|
526 The allowed keys in the permission dictionary depend on the schema type |
|
527 (entity type / relation definition). Resulting permissions will be |
|
528 similar to `orig_permissions.update(partial_perms)`. |
|
529 """ |
|
530 torestore = [] |
|
531 for erschema, etypeperms in chain(perm_overrides, perm_kwoverrides.items()): |
|
532 if isinstance(erschema, string_types): |
|
533 erschema = self.schema[erschema] |
|
534 for action, actionperms in etypeperms.items(): |
|
535 origperms = erschema.permissions[action] |
|
536 erschema.set_action_permissions(action, actionperms) |
|
537 torestore.append([erschema, action, origperms]) |
|
538 try: |
|
539 yield |
|
540 finally: |
|
541 for erschema, action, permissions in torestore: |
|
542 if action is None: |
|
543 erschema.permissions = permissions |
|
544 else: |
|
545 erschema.set_action_permissions(action, permissions) |
|
546 |
|
547 def assertModificationDateGreater(self, entity, olddate): |
|
548 entity.cw_attr_cache.pop('modification_date', None) |
|
549 self.assertGreater(entity.modification_date, olddate) |
|
550 |
|
551 def assertMessageEqual(self, req, params, expected_msg): |
|
552 msg = req.session.data[params['_cwmsgid']] |
|
553 self.assertEqual(expected_msg, msg) |
|
554 |
|
555 # workflow utilities ####################################################### |
|
556 |
|
557 def assertPossibleTransitions(self, entity, expected): |
|
558 transitions = entity.cw_adapt_to('IWorkflowable').possible_transitions() |
|
559 self.assertListEqual(sorted(tr.name for tr in transitions), |
|
560 sorted(expected)) |
|
561 |
|
562 # views and actions registries inspection ################################## |
|
563 |
|
564 def pviews(self, req, rset): |
|
565 return sorted((a.__regid__, a.__class__) |
|
566 for a in self.vreg['views'].possible_views(req, rset=rset)) |
|
567 |
|
568 def pactions(self, req, rset, |
|
569 skipcategories=('addrelated', 'siteactions', 'useractions', |
|
570 'footer', 'manage')): |
|
571 return [(a.__regid__, a.__class__) |
|
572 for a in self.vreg['actions'].poss_visible_objects(req, rset=rset) |
|
573 if a.category not in skipcategories] |
|
574 |
|
575 def pactions_by_cats(self, req, rset, categories=('addrelated',)): |
|
576 return [(a.__regid__, a.__class__) |
|
577 for a in self.vreg['actions'].poss_visible_objects(req, rset=rset) |
|
578 if a.category in categories] |
|
579 |
|
580 def pactionsdict(self, req, rset, |
|
581 skipcategories=('addrelated', 'siteactions', 'useractions', |
|
582 'footer', 'manage')): |
|
583 res = {} |
|
584 for a in self.vreg['actions'].poss_visible_objects(req, rset=rset): |
|
585 if a.category not in skipcategories: |
|
586 res.setdefault(a.category, []).append(a.__class__) |
|
587 return res |
|
588 |
|
589 def action_submenu(self, req, rset, id): |
|
590 return self._test_action(self.vreg['actions'].select(id, req, rset=rset)) |
|
591 |
|
592 def _test_action(self, action): |
|
593 class fake_menu(list): |
|
594 @property |
|
595 def items(self): |
|
596 return self |
|
597 |
|
598 class fake_box(object): |
|
599 def action_link(self, action, **kwargs): |
|
600 return (action.title, action.url()) |
|
601 submenu = fake_menu() |
|
602 action.fill_menu(fake_box(), submenu) |
|
603 return submenu |
|
604 |
|
605 def list_views_for(self, rset): |
|
606 """returns the list of views that can be applied on `rset`""" |
|
607 req = rset.req |
|
608 only_once_vids = ('primary', 'secondary', 'text') |
|
609 req.data['ex'] = ValueError("whatever") |
|
610 viewsvreg = self.vreg['views'] |
|
611 for vid, views in viewsvreg.items(): |
|
612 if vid[0] == '_': |
|
613 continue |
|
614 if rset.rowcount > 1 and vid in only_once_vids: |
|
615 continue |
|
616 views = [view for view in views |
|
617 if view.category != 'startupview' |
|
618 and not issubclass(view, notification.NotificationView) |
|
619 and not isinstance(view, class_deprecated)] |
|
620 if views: |
|
621 try: |
|
622 view = viewsvreg._select_best(views, req, rset=rset) |
|
623 if view is None: |
|
624 raise NoSelectableObject((req,), {'rset': rset}, views) |
|
625 if view.linkable(): |
|
626 yield view |
|
627 else: |
|
628 not_selected(self.vreg, view) |
|
629 # else the view is expected to be used as subview and should |
|
630 # not be tested directly |
|
631 except NoSelectableObject: |
|
632 continue |
|
633 |
|
634 def list_actions_for(self, rset): |
|
635 """returns the list of actions that can be applied on `rset`""" |
|
636 req = rset.req |
|
637 for action in self.vreg['actions'].possible_objects(req, rset=rset): |
|
638 yield action |
|
639 |
|
640 def list_boxes_for(self, rset): |
|
641 """returns the list of boxes that can be applied on `rset`""" |
|
642 req = rset.req |
|
643 for box in self.vreg['ctxcomponents'].possible_objects(req, rset=rset): |
|
644 yield box |
|
645 |
|
646 def list_startup_views(self): |
|
647 """returns the list of startup views""" |
|
648 with self.admin_access.web_request() as req: |
|
649 for view in self.vreg['views'].possible_views(req, None): |
|
650 if view.category == 'startupview': |
|
651 yield view.__regid__ |
|
652 else: |
|
653 not_selected(self.vreg, view) |
|
654 |
|
655 # web ui testing utilities ################################################# |
|
656 |
|
657 @property |
|
658 @cached |
|
659 def app(self): |
|
660 """return a cubicweb publisher""" |
|
661 publisher = application.CubicWebPublisher(self.repo, self.config) |
|
662 |
|
663 def raise_error_handler(*args, **kwargs): |
|
664 raise |
|
665 |
|
666 publisher.error_handler = raise_error_handler |
|
667 return publisher |
|
668 |
|
669 @deprecated('[3.19] use the .remote_calling method') |
|
670 def remote_call(self, fname, *args): |
|
671 """remote json call simulation""" |
|
672 dump = json.dumps |
|
673 args = [dump(arg) for arg in args] |
|
674 req = self.request(fname=fname, pageid='123', arg=args) |
|
675 ctrl = self.vreg['controllers'].select('ajax', req) |
|
676 return ctrl.publish(), req |
|
677 |
|
678 @contextmanager |
|
679 def remote_calling(self, fname, *args): |
|
680 """remote json call simulation""" |
|
681 args = [json.dumps(arg) for arg in args] |
|
682 with self.admin_access.web_request(fname=fname, pageid='123', arg=args) as req: |
|
683 ctrl = self.vreg['controllers'].select('ajax', req) |
|
684 yield ctrl.publish(), req |
|
685 |
|
686 def app_handle_request(self, req, path='view'): |
|
687 return self.app.core_handle(req, path) |
|
688 |
|
689 @deprecated("[3.15] app_handle_request is the new and better way" |
|
690 " (beware of small semantic changes)") |
|
691 def app_publish(self, *args, **kwargs): |
|
692 return self.app_handle_request(*args, **kwargs) |
|
693 |
|
694 def ctrl_publish(self, req, ctrl='edit', rset=None): |
|
695 """call the publish method of the edit controller""" |
|
696 ctrl = self.vreg['controllers'].select(ctrl, req, appli=self.app) |
|
697 try: |
|
698 result = ctrl.publish(rset) |
|
699 req.cnx.commit() |
|
700 except web.Redirect: |
|
701 req.cnx.commit() |
|
702 raise |
|
703 return result |
|
704 |
|
705 @staticmethod |
|
706 def fake_form(formid, field_dict=None, entity_field_dicts=()): |
|
707 """Build _cw.form dictionnary to fake posting of some standard cubicweb form |
|
708 |
|
709 * `formid`, the form id, usually form's __regid__ |
|
710 |
|
711 * `field_dict`, dictionary of name:value for fields that are not tied to an entity |
|
712 |
|
713 * `entity_field_dicts`, list of (entity, dictionary) where dictionary contains name:value |
|
714 for fields that are not tied to the given entity |
|
715 """ |
|
716 assert field_dict or entity_field_dicts, \ |
|
717 'field_dict and entity_field_dicts arguments must not be both unspecified' |
|
718 if field_dict is None: |
|
719 field_dict = {} |
|
720 form = {'__form_id': formid} |
|
721 fields = [] |
|
722 for field, value in field_dict.items(): |
|
723 fields.append(field) |
|
724 form[field] = value |
|
725 |
|
726 def _add_entity_field(entity, field, value): |
|
727 entity_fields.append(field) |
|
728 form[eid_param(field, entity.eid)] = value |
|
729 |
|
730 for entity, field_dict in entity_field_dicts: |
|
731 if '__maineid' not in form: |
|
732 form['__maineid'] = entity.eid |
|
733 entity_fields = [] |
|
734 form.setdefault('eid', []).append(entity.eid) |
|
735 _add_entity_field(entity, '__type', entity.cw_etype) |
|
736 for field, value in field_dict.items(): |
|
737 _add_entity_field(entity, field, value) |
|
738 if entity_fields: |
|
739 form[eid_param('_cw_entity_fields', entity.eid)] = ','.join(entity_fields) |
|
740 if fields: |
|
741 form['_cw_fields'] = ','.join(sorted(fields)) |
|
742 return form |
|
743 |
|
744 @deprecated('[3.19] use .admin_request_from_url instead') |
|
745 def req_from_url(self, url): |
|
746 """parses `url` and builds the corresponding CW-web request |
|
747 |
|
748 req.form will be setup using the url's query string |
|
749 """ |
|
750 req = self.request(url=url) |
|
751 if isinstance(url, unicode): |
|
752 url = url.encode(req.encoding) # req.setup_params() expects encoded strings |
|
753 querystring = urlparse(url)[-2] |
|
754 params = parse_qs(querystring) |
|
755 req.setup_params(params) |
|
756 return req |
|
757 |
|
758 @contextmanager |
|
759 def admin_request_from_url(self, url): |
|
760 """parses `url` and builds the corresponding CW-web request |
|
761 |
|
762 req.form will be setup using the url's query string |
|
763 """ |
|
764 with self.admin_access.web_request(url=url) as req: |
|
765 if isinstance(url, unicode): |
|
766 url = url.encode(req.encoding) # req.setup_params() expects encoded strings |
|
767 querystring = urlparse(url)[-2] |
|
768 params = parse_qs(querystring) |
|
769 req.setup_params(params) |
|
770 yield req |
|
771 |
|
772 def url_publish(self, url, data=None): |
|
773 """takes `url`, uses application's app_resolver to find the appropriate |
|
774 controller and result set, then publishes the result. |
|
775 |
|
776 To simulate post of www-form-encoded data, give a `data` dictionary |
|
777 containing desired key/value associations. |
|
778 |
|
779 This should pretty much correspond to what occurs in a real CW server |
|
780 except the apache-rewriter component is not called. |
|
781 """ |
|
782 with self.admin_request_from_url(url) as req: |
|
783 if data is not None: |
|
784 req.form.update(data) |
|
785 ctrlid, rset = self.app.url_resolver.process(req, req.relative_path(False)) |
|
786 return self.ctrl_publish(req, ctrlid, rset) |
|
787 |
|
788 def http_publish(self, url, data=None): |
|
789 """like `url_publish`, except this returns a http response, even in case |
|
790 of errors. You may give form parameters using the `data` argument. |
|
791 """ |
|
792 with self.admin_request_from_url(url) as req: |
|
793 if data is not None: |
|
794 req.form.update(data) |
|
795 with real_error_handling(self.app): |
|
796 result = self.app_handle_request(req, req.relative_path(False)) |
|
797 return result, req |
|
798 |
|
799 @staticmethod |
|
800 def _parse_location(req, location): |
|
801 try: |
|
802 path, params = location.split('?', 1) |
|
803 except ValueError: |
|
804 path = location |
|
805 params = {} |
|
806 else: |
|
807 cleanup = lambda p: (p[0], urlunquote(p[1])) |
|
808 params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p) |
|
809 if path.startswith(req.base_url()): # may be relative |
|
810 path = path[len(req.base_url()):] |
|
811 return path, params |
|
812 |
|
813 def expect_redirect(self, callback, req): |
|
814 """call the given callback with req as argument, expecting to get a |
|
815 Redirect exception |
|
816 """ |
|
817 try: |
|
818 callback(req) |
|
819 except Redirect as ex: |
|
820 return self._parse_location(req, ex.location) |
|
821 else: |
|
822 self.fail('expected a Redirect exception') |
|
823 |
|
824 def expect_redirect_handle_request(self, req, path='edit'): |
|
825 """call the publish method of the application publisher, expecting to |
|
826 get a Redirect exception |
|
827 """ |
|
828 self.app_handle_request(req, path) |
|
829 self.assertTrue(300 <= req.status_out < 400, req.status_out) |
|
830 location = req.get_response_header('location') |
|
831 return self._parse_location(req, location) |
|
832 |
|
833 @deprecated("[3.15] expect_redirect_handle_request is the new and better way" |
|
834 " (beware of small semantic changes)") |
|
835 def expect_redirect_publish(self, *args, **kwargs): |
|
836 return self.expect_redirect_handle_request(*args, **kwargs) |
|
837 |
|
838 def set_auth_mode(self, authmode, anonuser=None): |
|
839 self.set_option('auth-mode', authmode) |
|
840 self.set_option('anonymous-user', anonuser) |
|
841 if anonuser is None: |
|
842 self.config.anonymous_credential = None |
|
843 else: |
|
844 self.config.anonymous_credential = (anonuser, anonuser) |
|
845 |
|
846 def init_authentication(self, authmode, anonuser=None): |
|
847 self.set_auth_mode(authmode, anonuser) |
|
848 req = self.requestcls(self.vreg, url='login') |
|
849 sh = self.app.session_handler |
|
850 authm = sh.session_manager.authmanager |
|
851 authm.anoninfo = self.vreg.config.anonymous_user() |
|
852 authm.anoninfo = authm.anoninfo[0], {'password': authm.anoninfo[1]} |
|
853 # not properly cleaned between tests |
|
854 self.open_sessions = sh.session_manager._sessions = {} |
|
855 return req, self.session |
|
856 |
|
857 def assertAuthSuccess(self, req, origsession, nbsessions=1): |
|
858 sh = self.app.session_handler |
|
859 session = self.app.get_session(req) |
|
860 cnx = repoapi.Connection(session) |
|
861 req.set_cnx(cnx) |
|
862 self.assertEqual(len(self.open_sessions), nbsessions, self.open_sessions) |
|
863 self.assertEqual(session.login, origsession.login) |
|
864 self.assertEqual(session.anonymous_session, False) |
|
865 |
|
866 def assertAuthFailure(self, req, nbsessions=0): |
|
867 with self.assertRaises(AuthenticationError): |
|
868 self.app.get_session(req) |
|
869 # +0 since we do not track the opened session |
|
870 self.assertEqual(len(self.open_sessions), nbsessions) |
|
871 clear_cache(req, 'get_authorization') |
|
872 |
|
873 # content validation ####################################################### |
|
874 |
|
875 # validators are used to validate (XML, DTD, whatever) view's content |
|
876 # validators availables are : |
|
877 # DTDValidator : validates XML + declared DTD |
|
878 # SaxOnlyValidator : guarantees XML is well formed |
|
879 # None : do not try to validate anything |
|
880 # validators used must be imported from from.devtools.htmlparser |
|
881 content_type_validators = { |
|
882 # maps MIME type : validator name |
|
883 # |
|
884 # do not set html validators here, we need HTMLValidator for html |
|
885 # snippets |
|
886 # 'text/html': DTDValidator, |
|
887 # 'application/xhtml+xml': DTDValidator, |
|
888 'application/xml': htmlparser.XMLValidator, |
|
889 'text/xml': htmlparser.XMLValidator, |
|
890 'application/json': JsonValidator, |
|
891 'text/plain': None, |
|
892 'text/comma-separated-values': None, |
|
893 'text/x-vcard': None, |
|
894 'text/calendar': None, |
|
895 'image/png': None, |
|
896 } |
|
897 # maps vid : validator name (override content_type_validators) |
|
898 vid_validators = dict((vid, htmlparser.VALMAP[valkey]) |
|
899 for vid, valkey in VIEW_VALIDATORS.items()) |
|
900 |
|
901 def view(self, vid, rset=None, req=None, template='main-template', |
|
902 **kwargs): |
|
903 """This method tests the view `vid` on `rset` using `template` |
|
904 |
|
905 If no error occurred while rendering the view, the HTML is analyzed |
|
906 and parsed. |
|
907 |
|
908 :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo` |
|
909 encapsulation the generated HTML |
|
910 """ |
|
911 if req is None: |
|
912 if rset is None: |
|
913 req = self.request() |
|
914 else: |
|
915 req = rset.req |
|
916 req.form['vid'] = vid |
|
917 viewsreg = self.vreg['views'] |
|
918 view = viewsreg.select(vid, req, rset=rset, **kwargs) |
|
919 # set explicit test description |
|
920 if rset is not None: |
|
921 # coerce to "bytes" on py2 because the description will be sent to |
|
922 # sys.stdout/stderr which takes "bytes" on py2 and "unicode" on py3 |
|
923 rql = str(rset.printable_rql()) |
|
924 self.set_description("testing vid=%s defined in %s with (%s)" % ( |
|
925 vid, view.__module__, rql)) |
|
926 else: |
|
927 self.set_description("testing vid=%s defined in %s without rset" % ( |
|
928 vid, view.__module__)) |
|
929 if template is None: # raw view testing, no template |
|
930 viewfunc = view.render |
|
931 else: |
|
932 kwargs['view'] = view |
|
933 viewfunc = lambda **k: viewsreg.main_template(req, template, |
|
934 rset=rset, **kwargs) |
|
935 return self._test_view(viewfunc, view, template, kwargs) |
|
936 |
|
937 def _test_view(self, viewfunc, view, template='main-template', kwargs={}): |
|
938 """this method does the actual call to the view |
|
939 |
|
940 If no error occurred while rendering the view, the HTML is analyzed |
|
941 and parsed. |
|
942 |
|
943 :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo` |
|
944 encapsulation the generated HTML |
|
945 """ |
|
946 try: |
|
947 output = viewfunc(**kwargs) |
|
948 except Exception: |
|
949 # hijack exception: generative tests stop when the exception |
|
950 # is not an AssertionError |
|
951 klass, exc, tcbk = sys.exc_info() |
|
952 try: |
|
953 msg = '[%s in %s] %s' % (klass, view.__regid__, exc) |
|
954 except Exception: |
|
955 msg = '[%s in %s] undisplayable exception' % (klass, view.__regid__) |
|
956 exc = AssertionError(msg) |
|
957 exc.__traceback__ = tcbk |
|
958 raise exc |
|
959 return self._check_html(output, view, template) |
|
960 |
|
961 def get_validator(self, view=None, content_type=None, output=None): |
|
962 if view is not None: |
|
963 try: |
|
964 return self.vid_validators[view.__regid__]() |
|
965 except KeyError: |
|
966 if content_type is None: |
|
967 content_type = view.content_type |
|
968 if content_type is None: |
|
969 content_type = 'text/html' |
|
970 if content_type in ('text/html', 'application/xhtml+xml') and output: |
|
971 if output.startswith(b'<!DOCTYPE html>'): |
|
972 # only check XML well-formness since HTMLValidator isn't html5 |
|
973 # compatible and won't like various other extensions |
|
974 default_validator = htmlparser.XMLSyntaxValidator |
|
975 elif output.startswith(b'<?xml'): |
|
976 default_validator = htmlparser.DTDValidator |
|
977 else: |
|
978 default_validator = htmlparser.HTMLValidator |
|
979 else: |
|
980 default_validator = None |
|
981 validatorclass = self.content_type_validators.get(content_type, |
|
982 default_validator) |
|
983 if validatorclass is None: |
|
984 return |
|
985 return validatorclass() |
|
986 |
|
987 @nocoverage |
|
988 def _check_html(self, output, view, template='main-template'): |
|
989 """raises an exception if the HTML is invalid""" |
|
990 output = output.strip() |
|
991 if isinstance(output, text_type): |
|
992 # XXX |
|
993 output = output.encode('utf-8') |
|
994 validator = self.get_validator(view, output=output) |
|
995 if validator is None: |
|
996 return output # return raw output if no validator is defined |
|
997 if isinstance(validator, htmlparser.DTDValidator): |
|
998 # XXX remove <canvas> used in progress widget, unknown in html dtd |
|
999 output = re.sub('<canvas.*?></canvas>', '', output) |
|
1000 return self.assertWellFormed(validator, output.strip(), context=view.__regid__) |
|
1001 |
|
1002 def assertWellFormed(self, validator, content, context=None): |
|
1003 try: |
|
1004 return validator.parse_string(content) |
|
1005 except Exception: |
|
1006 # hijack exception: generative tests stop when the exception |
|
1007 # is not an AssertionError |
|
1008 klass, exc, tcbk = sys.exc_info() |
|
1009 if context is None: |
|
1010 msg = u'[%s]' % (klass,) |
|
1011 else: |
|
1012 msg = u'[%s in %s]' % (klass, context) |
|
1013 msg = msg.encode(sys.getdefaultencoding(), 'replace') |
|
1014 |
|
1015 try: |
|
1016 str_exc = str(exc) |
|
1017 except Exception: |
|
1018 str_exc = 'undisplayable exception' |
|
1019 msg += str_exc.encode(sys.getdefaultencoding(), 'replace') |
|
1020 if content is not None: |
|
1021 position = getattr(exc, "position", (0,))[0] |
|
1022 if position: |
|
1023 # define filter |
|
1024 if isinstance(content, str): |
|
1025 content = unicode(content, sys.getdefaultencoding(), 'replace') |
|
1026 content = validator.preprocess_data(content) |
|
1027 content = content.splitlines() |
|
1028 width = int(log(len(content), 10)) + 1 |
|
1029 line_template = " %" + ("%i" % width) + "i: %s" |
|
1030 # XXX no need to iterate the whole file except to get |
|
1031 # the line number |
|
1032 content = u'\n'.join(line_template % (idx + 1, line) |
|
1033 for idx, line in enumerate(content) |
|
1034 if line_context_filter(idx+1, position)) |
|
1035 msg += u'\nfor content:\n%s' % content |
|
1036 exc = AssertionError(msg) |
|
1037 exc.__traceback__ = tcbk |
|
1038 raise exc |
|
1039 |
|
1040 def assertDocTestFile(self, testfile): |
|
1041 # doctest returns tuple (failure_count, test_count) |
|
1042 with self.admin_access.shell() as mih: |
|
1043 result = mih.process_script(testfile) |
|
1044 if result[0] and result[1]: |
|
1045 raise self.failureException("doctest file '%s' failed" |
|
1046 % testfile) |
|
1047 |
|
1048 # notifications ############################################################ |
|
1049 |
|
1050 def assertSentEmail(self, subject, recipients=None, nb_msgs=None): |
|
1051 """test recipients in system mailbox for given email subject |
|
1052 |
|
1053 :param subject: email subject to find in mailbox |
|
1054 :param recipients: list of email recipients |
|
1055 :param nb_msgs: expected number of entries |
|
1056 :returns: list of matched emails |
|
1057 """ |
|
1058 messages = [email for email in MAILBOX |
|
1059 if email.message.get('Subject') == subject] |
|
1060 if recipients is not None: |
|
1061 sent_to = set() |
|
1062 for msg in messages: |
|
1063 sent_to.update(msg.recipients) |
|
1064 self.assertSetEqual(set(recipients), sent_to) |
|
1065 if nb_msgs is not None: |
|
1066 self.assertEqual(len(MAILBOX), nb_msgs) |
|
1067 return messages |
|
1068 |
|
1069 |
|
1070 # auto-populating test classes and utilities ################################### |
|
1071 |
|
1072 from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries |
|
1073 |
|
1074 # XXX cleanup unprotected_entities & all mess |
|
1075 |
|
1076 |
|
1077 def how_many_dict(schema, cnx, how_many, skip): |
|
1078 """given a schema, compute how many entities by type we need to be able to |
|
1079 satisfy relations cardinality. |
|
1080 |
|
1081 The `how_many` argument tells how many entities of which type we want at |
|
1082 least. |
|
1083 |
|
1084 Return a dictionary with entity types as key, and the number of entities for |
|
1085 this type as value. |
|
1086 """ |
|
1087 relmap = {} |
|
1088 for rschema in schema.relations(): |
|
1089 if rschema.final: |
|
1090 continue |
|
1091 for subj, obj in rschema.rdefs: |
|
1092 card = rschema.rdef(subj, obj).cardinality |
|
1093 # if the relation is mandatory, we'll need at least as many subj and |
|
1094 # obj to satisfy it |
|
1095 if card[0] in '1+' and card[1] in '1?': |
|
1096 # subj has to be linked to at least one obj, |
|
1097 # but obj can be linked to only one subj |
|
1098 # -> we need at least as many subj as obj to satisfy |
|
1099 # cardinalities for this relation |
|
1100 relmap.setdefault((rschema, subj), []).append(str(obj)) |
|
1101 if card[1] in '1+' and card[0] in '1?': |
|
1102 # reverse subj and obj in the above explanation |
|
1103 relmap.setdefault((rschema, obj), []).append(str(subj)) |
|
1104 unprotected = unprotected_entities(schema) |
|
1105 for etype in skip: # XXX (syt) duh? explain or kill |
|
1106 unprotected.add(etype) |
|
1107 howmanydict = {} |
|
1108 # step 1, compute a base number of each entity types: number of already |
|
1109 # existing entities of this type + `how_many` |
|
1110 for etype in unprotected_entities(schema, strict=True): |
|
1111 howmanydict[str(etype)] = cnx.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0] |
|
1112 if etype in unprotected: |
|
1113 howmanydict[str(etype)] += how_many |
|
1114 # step 2, augment nb entity per types to satisfy cardinality constraints, |
|
1115 # by recomputing for each relation that constrained an entity type: |
|
1116 # |
|
1117 # new num for etype = max(current num, sum(num for possible target etypes)) |
|
1118 # |
|
1119 # XXX we should first check there is no cycle then propagate changes |
|
1120 for (rschema, etype), targets in relmap.items(): |
|
1121 relfactor = sum(howmanydict[e] for e in targets) |
|
1122 howmanydict[str(etype)] = max(relfactor, howmanydict[etype]) |
|
1123 return howmanydict |
|
1124 |
|
1125 |
|
1126 class AutoPopulateTest(CubicWebTC): |
|
1127 """base class for test with auto-populating of the database""" |
|
1128 __abstract__ = True |
|
1129 |
|
1130 test_db_id = 'autopopulate' |
|
1131 |
|
1132 tags = CubicWebTC.tags | Tags('autopopulated') |
|
1133 |
|
1134 pdbclass = CubicWebDebugger |
|
1135 # this is a hook to be able to define a list of rql queries |
|
1136 # that are application dependent and cannot be guessed automatically |
|
1137 application_rql = [] |
|
1138 |
|
1139 no_auto_populate = () |
|
1140 ignored_relations = set() |
|
1141 |
|
1142 def to_test_etypes(self): |
|
1143 return unprotected_entities(self.schema, strict=True) |
|
1144 |
|
1145 def custom_populate(self, how_many, cnx): |
|
1146 pass |
|
1147 |
|
1148 def post_populate(self, cnx): |
|
1149 pass |
|
1150 |
|
1151 @nocoverage |
|
1152 def auto_populate(self, how_many): |
|
1153 """this method populates the database with `how_many` entities |
|
1154 of each possible type. It also inserts random relations between them |
|
1155 """ |
|
1156 with self.admin_access.cnx() as cnx: |
|
1157 with cnx.security_enabled(read=False, write=False): |
|
1158 self._auto_populate(cnx, how_many) |
|
1159 cnx.commit() |
|
1160 |
|
1161 def _auto_populate(self, cnx, how_many): |
|
1162 self.custom_populate(how_many, cnx) |
|
1163 vreg = self.vreg |
|
1164 howmanydict = how_many_dict(self.schema, cnx, how_many, self.no_auto_populate) |
|
1165 for etype in unprotected_entities(self.schema): |
|
1166 if etype in self.no_auto_populate: |
|
1167 continue |
|
1168 nb = howmanydict.get(etype, how_many) |
|
1169 for rql, args in insert_entity_queries(etype, self.schema, vreg, nb): |
|
1170 cnx.execute(rql, args) |
|
1171 edict = {} |
|
1172 for etype in unprotected_entities(self.schema, strict=True): |
|
1173 rset = cnx.execute('%s X' % etype) |
|
1174 edict[str(etype)] = set(row[0] for row in rset.rows) |
|
1175 existingrels = {} |
|
1176 ignored_relations = SYSTEM_RELATIONS | self.ignored_relations |
|
1177 for rschema in self.schema.relations(): |
|
1178 if rschema.final or rschema in ignored_relations: |
|
1179 continue |
|
1180 rset = cnx.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema) |
|
1181 existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset) |
|
1182 q = make_relations_queries(self.schema, edict, cnx, ignored_relations, |
|
1183 existingrels=existingrels) |
|
1184 for rql, args in q: |
|
1185 try: |
|
1186 cnx.execute(rql, args) |
|
1187 except ValidationError as ex: |
|
1188 # failed to satisfy some constraint |
|
1189 print('error in automatic db population', ex) |
|
1190 cnx.commit_state = None # reset uncommitable flag |
|
1191 self.post_populate(cnx) |
|
1192 |
|
1193 def iter_individual_rsets(self, etypes=None, limit=None): |
|
1194 etypes = etypes or self.to_test_etypes() |
|
1195 with self.admin_access.web_request() as req: |
|
1196 for etype in etypes: |
|
1197 if limit: |
|
1198 rql = 'Any X LIMIT %s WHERE X is %s' % (limit, etype) |
|
1199 else: |
|
1200 rql = 'Any X WHERE X is %s' % etype |
|
1201 rset = req.execute(rql) |
|
1202 for row in range(len(rset)): |
|
1203 if limit and row > limit: |
|
1204 break |
|
1205 # XXX iirk |
|
1206 rset2 = rset.limit(limit=1, offset=row) |
|
1207 yield rset2 |
|
1208 |
|
1209 def iter_automatic_rsets(self, limit=10): |
|
1210 """generates basic resultsets for each entity type""" |
|
1211 etypes = self.to_test_etypes() |
|
1212 if not etypes: |
|
1213 return |
|
1214 with self.admin_access.web_request() as req: |
|
1215 for etype in etypes: |
|
1216 yield req.execute('Any X LIMIT %s WHERE X is %s' % (limit, etype)) |
|
1217 etype1 = etypes.pop() |
|
1218 try: |
|
1219 etype2 = etypes.pop() |
|
1220 except KeyError: |
|
1221 etype2 = etype1 |
|
1222 # test a mixed query (DISTINCT/GROUP to avoid getting duplicate |
|
1223 # X which make muledit view failing for instance (html validation fails |
|
1224 # because of some duplicate "id" attributes) |
|
1225 yield req.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s' % |
|
1226 (etype1, etype2)) |
|
1227 # test some application-specific queries if defined |
|
1228 for rql in self.application_rql: |
|
1229 yield req.execute(rql) |
|
1230 |
|
1231 def _test_everything_for(self, rset): |
|
1232 """this method tries to find everything that can be tested |
|
1233 for `rset` and yields a callable test (as needed in generative tests) |
|
1234 """ |
|
1235 propdefs = self.vreg['propertydefs'] |
|
1236 # make all components visible |
|
1237 for k, v in propdefs.items(): |
|
1238 if k.endswith('visible') and not v['default']: |
|
1239 propdefs[k]['default'] = True |
|
1240 for view in self.list_views_for(rset): |
|
1241 backup_rset = rset.copy(rset.rows, rset.description) |
|
1242 yield InnerTest(self._testname(rset, view.__regid__, 'view'), |
|
1243 self.view, view.__regid__, rset, |
|
1244 rset.req.reset_headers(), 'main-template') |
|
1245 # We have to do this because some views modify the |
|
1246 # resultset's syntax tree |
|
1247 rset = backup_rset |
|
1248 for action in self.list_actions_for(rset): |
|
1249 yield InnerTest(self._testname(rset, action.__regid__, 'action'), |
|
1250 self._test_action, action) |
|
1251 for box in self.list_boxes_for(rset): |
|
1252 w = [].append |
|
1253 yield InnerTest(self._testname(rset, box.__regid__, 'box'), box.render, w) |
|
1254 |
|
1255 @staticmethod |
|
1256 def _testname(rset, objid, objtype): |
|
1257 return '%s_%s_%s' % ('_'.join(rset.column_types(0)), objid, objtype) |
|
1258 |
|
1259 |
|
1260 # concrete class for automated application testing ############################ |
|
1261 |
|
1262 class AutomaticWebTest(AutoPopulateTest): |
|
1263 """import this if you wan automatic tests to be ran""" |
|
1264 |
|
1265 tags = AutoPopulateTest.tags | Tags('web', 'generated') |
|
1266 |
|
1267 def setUp(self): |
|
1268 if self.__class__ is AutomaticWebTest: |
|
1269 # Prevent direct use of AutomaticWebTest to avoid database caching |
|
1270 # issues. |
|
1271 return |
|
1272 super(AutomaticWebTest, self).setUp() |
|
1273 |
|
1274 # access to self.app for proper initialization of the authentication |
|
1275 # machinery (else some views may fail) |
|
1276 self.app |
|
1277 |
|
1278 def test_one_each_config(self): |
|
1279 self.auto_populate(1) |
|
1280 for rset in self.iter_automatic_rsets(limit=1): |
|
1281 for testargs in self._test_everything_for(rset): |
|
1282 yield testargs |
|
1283 |
|
1284 def test_ten_each_config(self): |
|
1285 self.auto_populate(10) |
|
1286 for rset in self.iter_automatic_rsets(limit=10): |
|
1287 for testargs in self._test_everything_for(rset): |
|
1288 yield testargs |
|
1289 |
|
1290 def test_startup_views(self): |
|
1291 for vid in self.list_startup_views(): |
|
1292 with self.admin_access.web_request() as req: |
|
1293 yield self.view, vid, None, req |
|
1294 |
|
1295 |
|
1296 # registry instrumentization ################################################### |
|
1297 |
|
1298 def not_selected(vreg, appobject): |
|
1299 try: |
|
1300 vreg._selected[appobject.__class__] -= 1 |
|
1301 except (KeyError, AttributeError): |
|
1302 pass |
|
1303 |
|
1304 |
|
1305 # def vreg_instrumentize(testclass): |
|
1306 # # XXX broken |
|
1307 # from cubicweb.devtools.apptest import TestEnvironment |
|
1308 # env = testclass._env = TestEnvironment('data', configcls=testclass.configcls) |
|
1309 # for reg in env.vreg.values(): |
|
1310 # reg._selected = {} |
|
1311 # try: |
|
1312 # orig_select_best = reg.__class__.__orig_select_best |
|
1313 # except Exception: |
|
1314 # orig_select_best = reg.__class__._select_best |
|
1315 # def instr_select_best(self, *args, **kwargs): |
|
1316 # selected = orig_select_best(self, *args, **kwargs) |
|
1317 # try: |
|
1318 # self._selected[selected.__class__] += 1 |
|
1319 # except KeyError: |
|
1320 # self._selected[selected.__class__] = 1 |
|
1321 # except AttributeError: |
|
1322 # pass # occurs on reg used to restore database |
|
1323 # return selected |
|
1324 # reg.__class__._select_best = instr_select_best |
|
1325 # reg.__class__.__orig_select_best = orig_select_best |
|
1326 |
|
1327 |
|
1328 # def print_untested_objects(testclass, skipregs=('hooks', 'etypes')): |
|
1329 # for regname, reg in testclass._env.vreg.items(): |
|
1330 # if regname in skipregs: |
|
1331 # continue |
|
1332 # for appobjects in reg.values(): |
|
1333 # for appobject in appobjects: |
|
1334 # if not reg._selected.get(appobject): |
|
1335 # print 'not tested', regname, appobject |