15 # |
15 # |
16 # You should have received a copy of the GNU Lesser General Public License along |
16 # You should have received a copy of the GNU Lesser General Public License along |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
18 """Base classes and utilities for cubicweb tests""" |
18 """Base classes and utilities for cubicweb tests""" |
19 |
19 |
20 from __future__ import print_function |
|
21 |
|
22 import sys |
20 import sys |
23 import re |
21 import re |
24 import warnings |
|
25 from os.path import dirname, join, abspath |
22 from os.path import dirname, join, abspath |
26 from math import log |
23 from math import log |
27 from contextlib import contextmanager |
24 from contextlib import contextmanager |
28 from inspect import isgeneratorfunction |
25 from inspect import isgeneratorfunction |
29 from itertools import chain |
26 from itertools import chain |
30 |
27 from unittest import TestCase |
31 from six import binary_type, text_type, string_types, reraise |
28 from urllib.parse import urlparse, parse_qs, unquote as urlunquote |
32 from six.moves import range |
|
33 from six.moves.urllib.parse import urlparse, parse_qs, unquote as urlunquote |
|
34 |
29 |
35 import yams.schema |
30 import yams.schema |
36 |
31 |
37 from logilab.common.testlib import Tags, nocoverage |
32 from logilab.common.testlib import Tags, nocoverage |
38 from logilab.common.debugger import Debugger |
33 from logilab.common.debugger import Debugger |
50 from cubicweb.server.hook import SendMailOp |
45 from cubicweb.server.hook import SendMailOp |
51 from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS |
46 from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS |
52 from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID |
47 from cubicweb.devtools import fake, htmlparser, DEFAULT_EMPTY_DB_ID |
53 from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries |
48 from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries |
54 from cubicweb.web.views.authentication import Session |
49 from cubicweb.web.views.authentication import Session |
55 |
|
56 if sys.version_info[:2] < (3, 4): |
|
57 from unittest2 import TestCase |
|
58 if not hasattr(TestCase, 'subTest'): |
|
59 raise ImportError('no subTest support in available unittest2') |
|
60 try: |
|
61 from backports.tempfile import TemporaryDirectory # noqa |
|
62 except ImportError: |
|
63 # backports.tempfile not available |
|
64 TemporaryDirectory = None |
|
65 else: |
|
66 from unittest import TestCase |
|
67 from tempfile import TemporaryDirectory # noqa |
|
68 |
|
69 # in python 2.7, DeprecationWarning are not shown anymore by default |
|
70 warnings.filterwarnings('default', category=DeprecationWarning) |
|
71 |
50 |
72 |
51 |
73 # provide a data directory for the test class ################################## |
52 # provide a data directory for the test class ################################## |
74 |
53 |
75 class BaseTestCase(TestCase): |
54 class BaseTestCase(TestCase): |
324 |
303 |
325 def new_access(self, login): |
304 def new_access(self, login): |
326 """provide a new RepoAccess object for a given user |
305 """provide a new RepoAccess object for a given user |
327 |
306 |
328 The access is automatically closed at the end of the test.""" |
307 The access is automatically closed at the end of the test.""" |
329 login = text_type(login) |
|
330 access = RepoAccess(self.repo, login, self.requestcls) |
308 access = RepoAccess(self.repo, login, self.requestcls) |
331 self._open_access.add(access) |
309 self._open_access.add(access) |
332 return access |
310 return access |
333 |
311 |
334 def _close_access(self): |
312 def _close_access(self): |
345 db_handler = devtools.get_test_db_handler(self.config, self.init_config) |
323 db_handler = devtools.get_test_db_handler(self.config, self.init_config) |
346 db_handler.build_db_cache(self.test_db_id, self.pre_setup_database) |
324 db_handler.build_db_cache(self.test_db_id, self.pre_setup_database) |
347 db_handler.restore_database(self.test_db_id) |
325 db_handler.restore_database(self.test_db_id) |
348 self.repo = db_handler.get_repo(startup=True) |
326 self.repo = db_handler.get_repo(startup=True) |
349 # get an admin session (without actual login) |
327 # get an admin session (without actual login) |
350 login = text_type(db_handler.config.default_admin_config['login']) |
328 login = db_handler.config.default_admin_config['login'] |
351 self.admin_access = self.new_access(login) |
329 self.admin_access = self.new_access(login) |
352 |
330 |
353 # config management ######################################################## |
331 # config management ######################################################## |
354 |
332 |
355 @classmethod # XXX could be turned into a regular method |
333 @classmethod # XXX could be turned into a regular method |
363 |
341 |
364 This method will be called by the database handler once the config has |
342 This method will be called by the database handler once the config has |
365 been properly bootstrapped. |
343 been properly bootstrapped. |
366 """ |
344 """ |
367 admincfg = config.default_admin_config |
345 admincfg = config.default_admin_config |
368 cls.admlogin = text_type(admincfg['login']) |
346 cls.admlogin = admincfg['login'] |
369 cls.admpassword = admincfg['password'] |
347 cls.admpassword = admincfg['password'] |
370 # uncomment the line below if you want rql queries to be logged |
348 # uncomment the line below if you want rql queries to be logged |
371 # config.global_set_option('query-log-file', |
349 # config.global_set_option('query-log-file', |
372 # '/tmp/test_rql_log.' + `os.getpid()`) |
350 # '/tmp/test_rql_log.' + `os.getpid()`) |
373 config.global_set_option('log-file', None) |
351 config.global_set_option('log-file', None) |
456 def create_user(self, req, login=None, groups=('users',), password=None, |
434 def create_user(self, req, login=None, groups=('users',), password=None, |
457 email=None, commit=True, **kwargs): |
435 email=None, commit=True, **kwargs): |
458 """create and return a new user entity""" |
436 """create and return a new user entity""" |
459 if password is None: |
437 if password is None: |
460 password = login |
438 password = login |
461 if login is not None: |
|
462 login = text_type(login) |
|
463 user = req.create_entity('CWUser', login=login, |
439 user = req.create_entity('CWUser', login=login, |
464 upassword=password, **kwargs) |
440 upassword=password, **kwargs) |
465 req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)' |
441 req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)' |
466 % ','.join(repr(str(g)) for g in groups), |
442 % ','.join(repr(str(g)) for g in groups), |
467 {'x': user.eid}) |
443 {'x': user.eid}) |
468 if email is not None: |
444 if email is not None: |
469 req.create_entity('EmailAddress', address=text_type(email), |
445 req.create_entity('EmailAddress', address=email, |
470 reverse_primary_email=user) |
446 reverse_primary_email=user) |
471 user.cw_clear_relation_cache('in_group', 'subject') |
447 user.cw_clear_relation_cache('in_group', 'subject') |
472 if commit: |
448 if commit: |
473 try: |
449 try: |
474 req.commit() # req is a session |
450 req.commit() # req is a session |
522 (entity type / relation definition). Resulting permissions will be |
498 (entity type / relation definition). Resulting permissions will be |
523 similar to `orig_permissions.update(partial_perms)`. |
499 similar to `orig_permissions.update(partial_perms)`. |
524 """ |
500 """ |
525 torestore = [] |
501 torestore = [] |
526 for erschema, etypeperms in chain(perm_overrides, perm_kwoverrides.items()): |
502 for erschema, etypeperms in chain(perm_overrides, perm_kwoverrides.items()): |
527 if isinstance(erschema, string_types): |
503 if isinstance(erschema, str): |
528 erschema = self.schema[erschema] |
504 erschema = self.schema[erschema] |
529 for action, actionperms in etypeperms.items(): |
505 for action, actionperms in etypeperms.items(): |
530 origperms = erschema.permissions[action] |
506 origperms = erschema.permissions[action] |
531 erschema.set_action_permissions(action, actionperms) |
507 erschema.set_action_permissions(action, actionperms) |
532 torestore.append([erschema, action, origperms]) |
508 torestore.append([erschema, action, origperms]) |
728 """parses `url` and builds the corresponding CW-web request |
704 """parses `url` and builds the corresponding CW-web request |
729 |
705 |
730 req.form will be setup using the url's query string |
706 req.form will be setup using the url's query string |
731 """ |
707 """ |
732 with self.admin_access.web_request(url=url) as req: |
708 with self.admin_access.web_request(url=url) as req: |
733 if isinstance(url, text_type): |
709 if isinstance(url, str): |
734 url = url.encode(req.encoding) # req.setup_params() expects encoded strings |
710 url = url.encode(req.encoding) # req.setup_params() expects encoded strings |
735 querystring = urlparse(url)[-2] |
711 querystring = urlparse(url)[-2] |
736 params = parse_qs(querystring) |
712 params = parse_qs(querystring) |
737 req.setup_params(params) |
713 req.setup_params(params) |
738 yield req |
714 yield req |
909 klass, exc, tcbk = sys.exc_info() |
885 klass, exc, tcbk = sys.exc_info() |
910 try: |
886 try: |
911 msg = '[%s in %s] %s' % (klass, view.__regid__, exc) |
887 msg = '[%s in %s] %s' % (klass, view.__regid__, exc) |
912 except Exception: |
888 except Exception: |
913 msg = '[%s in %s] undisplayable exception' % (klass, view.__regid__) |
889 msg = '[%s in %s] undisplayable exception' % (klass, view.__regid__) |
914 reraise(AssertionError, AssertionError(msg), sys.exc_info()[-1]) |
890 raise AssertionError(msg).with_traceback(sys.exc_info()[-1]) |
915 return self._check_html(output, view, template) |
891 return self._check_html(output, view, template) |
916 |
892 |
917 def get_validator(self, view=None, content_type=None, output=None): |
893 def get_validator(self, view=None, content_type=None, output=None): |
918 if view is not None: |
894 if view is not None: |
919 try: |
895 try: |
942 |
918 |
943 @nocoverage |
919 @nocoverage |
944 def _check_html(self, output, view, template='main-template'): |
920 def _check_html(self, output, view, template='main-template'): |
945 """raises an exception if the HTML is invalid""" |
921 """raises an exception if the HTML is invalid""" |
946 output = output.strip() |
922 output = output.strip() |
947 if isinstance(output, text_type): |
923 if isinstance(output, str): |
948 # XXX |
924 # XXX |
949 output = output.encode('utf-8') |
925 output = output.encode('utf-8') |
950 validator = self.get_validator(view, output=output) |
926 validator = self.get_validator(view, output=output) |
951 if validator is None: |
927 if validator is None: |
952 return output # return raw output if no validator is defined |
928 return output # return raw output if no validator is defined |
975 msg += str_exc.encode(sys.getdefaultencoding(), 'replace') |
951 msg += str_exc.encode(sys.getdefaultencoding(), 'replace') |
976 if content is not None: |
952 if content is not None: |
977 position = getattr(exc, "position", (0,))[0] |
953 position = getattr(exc, "position", (0,))[0] |
978 if position: |
954 if position: |
979 # define filter |
955 # define filter |
980 if isinstance(content, binary_type): |
956 if isinstance(content, bytes): |
981 content = text_type(content, sys.getdefaultencoding(), 'replace') |
957 content = str(content, sys.getdefaultencoding(), 'replace') |
982 content = validator.preprocess_data(content) |
958 content = validator.preprocess_data(content) |
983 content = content.splitlines() |
959 content = content.splitlines() |
984 width = int(log(len(content), 10)) + 1 |
960 width = int(log(len(content), 10)) + 1 |
985 line_template = " %" + ("%i" % width) + "i: %s" |
961 line_template = " %" + ("%i" % width) + "i: %s" |
986 # XXX no need to iterate the whole file except to get |
962 # XXX no need to iterate the whole file except to get |