cubicweb/web/test/unittest_application.py
changeset 11057 0b59724cb3f2
parent 10890 504a67206fdc
child 11129 97095348b3ee
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
       
     1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """unit tests for cubicweb.web.application"""
       
    19 
       
    20 import base64
       
    21 
       
    22 from six import text_type
       
    23 from six.moves import http_client
       
    24 from six.moves.http_cookies import SimpleCookie
       
    25 
       
    26 from logilab.common.testlib import TestCase, unittest_main
       
    27 from logilab.common.decorators import clear_cache, classproperty
       
    28 
       
    29 from cubicweb import AuthenticationError
       
    30 from cubicweb import view
       
    31 from cubicweb.devtools.testlib import CubicWebTC, real_error_handling
       
    32 from cubicweb.devtools.fake import FakeRequest
       
    33 from cubicweb.web import LogOut, Redirect, INTERNAL_FIELD_VALUE
       
    34 from cubicweb.web.views.basecontrollers import ViewController
       
    35 from cubicweb.web.application import anonymized_request
       
    36 from cubicweb import repoapi
       
    37 
       
    38 class FakeMapping:
       
    39     """emulates a mapping module"""
       
    40     def __init__(self):
       
    41         self.ENTITIES_MAP = {}
       
    42         self.ATTRIBUTES_MAP = {}
       
    43         self.RELATIONS_MAP = {}
       
    44 
       
    45 class MockCursor:
       
    46     def __init__(self):
       
    47         self.executed = []
       
    48     def execute(self, rql, args=None, build_descr=False):
       
    49         args = args or {}
       
    50         self.executed.append(rql % args)
       
    51 
       
    52 
       
    53 class FakeController(ViewController):
       
    54 
       
    55     def __init__(self, form=None):
       
    56         self._cw = FakeRequest()
       
    57         self._cw.form = form or {}
       
    58         self._cursor = MockCursor()
       
    59         self._cw.execute = self._cursor.execute
       
    60 
       
    61     def new_cursor(self):
       
    62         self._cursor = MockCursor()
       
    63         self._cw.execute = self._cursor.execute
       
    64 
       
    65     def set_form(self, form):
       
    66         self._cw.form = form
       
    67 
       
    68 
       
    69 class RequestBaseTC(TestCase):
       
    70     def setUp(self):
       
    71         self._cw = FakeRequest()
       
    72 
       
    73 
       
    74     def test_list_arg(self):
       
    75         """tests the list_arg() function"""
       
    76         list_arg = self._cw.list_form_param
       
    77         self.assertEqual(list_arg('arg3', {}), [])
       
    78         d = {'arg1' : "value1",
       
    79              'arg2' : ('foo', INTERNAL_FIELD_VALUE,),
       
    80              'arg3' : ['bar']}
       
    81         self.assertEqual(list_arg('arg1', d, True), ['value1'])
       
    82         self.assertEqual(d, {'arg2' : ('foo', INTERNAL_FIELD_VALUE), 'arg3' : ['bar'],})
       
    83         self.assertEqual(list_arg('arg2', d, True), ['foo'])
       
    84         self.assertEqual({'arg3' : ['bar'],}, d)
       
    85         self.assertEqual(list_arg('arg3', d), ['bar',])
       
    86         self.assertEqual({'arg3' : ['bar'],}, d)
       
    87 
       
    88 
       
    89     def test_from_controller(self):
       
    90         self._cw.vreg['controllers'] = {'view': 1, 'login': 1}
       
    91         self.assertEqual(self._cw.from_controller(), 'view')
       
    92         req = FakeRequest(url='project?vid=list')
       
    93         req.vreg['controllers'] = {'view': 1, 'login': 1}
       
    94         # this assertion is just to make sure that relative_path can be
       
    95         # correctly computed as it is used in from_controller()
       
    96         self.assertEqual(req.relative_path(False), 'project')
       
    97         self.assertEqual(req.from_controller(), 'view')
       
    98         # test on a valid non-view controller
       
    99         req = FakeRequest(url='login?x=1&y=2')
       
   100         req.vreg['controllers'] = {'view': 1, 'login': 1}
       
   101         self.assertEqual(req.relative_path(False), 'login')
       
   102         self.assertEqual(req.from_controller(), 'login')
       
   103 
       
   104 
       
   105 class UtilsTC(TestCase):
       
   106     """test suite for misc application utilities"""
       
   107 
       
   108     def setUp(self):
       
   109         self.ctrl = FakeController()
       
   110 
       
   111     #def test_which_mapping(self):
       
   112     #    """tests which mapping is used (application or core)"""
       
   113     #    init_mapping()
       
   114     #    from cubicweb.common import mapping
       
   115     #    self.assertEqual(mapping.MAPPING_USED, 'core')
       
   116     #    sys.modules['mapping'] = FakeMapping()
       
   117     #    init_mapping()
       
   118     #    self.assertEqual(mapping.MAPPING_USED, 'application')
       
   119     #    del sys.modules['mapping']
       
   120 
       
   121     def test_execute_linkto(self):
       
   122         """tests the execute_linkto() function"""
       
   123         self.assertEqual(self.ctrl.execute_linkto(), None)
       
   124         self.assertEqual(self.ctrl._cursor.executed,
       
   125                           [])
       
   126 
       
   127         self.ctrl.set_form({'__linkto' : 'works_for:12_13_14:object',
       
   128                               'eid': 8})
       
   129         self.ctrl.execute_linkto()
       
   130         self.assertEqual(self.ctrl._cursor.executed,
       
   131                           ['SET Y works_for X WHERE X eid 8, Y eid %s' % i
       
   132                            for i in (12, 13, 14)])
       
   133 
       
   134         self.ctrl.new_cursor()
       
   135         self.ctrl.set_form({'__linkto' : 'works_for:12_13_14:subject',
       
   136                               'eid': 8})
       
   137         self.ctrl.execute_linkto()
       
   138         self.assertEqual(self.ctrl._cursor.executed,
       
   139                           ['SET X works_for Y WHERE X eid 8, Y eid %s' % i
       
   140                            for i in (12, 13, 14)])
       
   141 
       
   142 
       
   143         self.ctrl.new_cursor()
       
   144         self.ctrl._cw.form = {'__linkto' : 'works_for:12_13_14:object'}
       
   145         self.ctrl.execute_linkto(eid=8)
       
   146         self.assertEqual(self.ctrl._cursor.executed,
       
   147                           ['SET Y works_for X WHERE X eid 8, Y eid %s' % i
       
   148                            for i in (12, 13, 14)])
       
   149 
       
   150         self.ctrl.new_cursor()
       
   151         self.ctrl.set_form({'__linkto' : 'works_for:12_13_14:subject'})
       
   152         self.ctrl.execute_linkto(eid=8)
       
   153         self.assertEqual(self.ctrl._cursor.executed,
       
   154                           ['SET X works_for Y WHERE X eid 8, Y eid %s' % i
       
   155                            for i in (12, 13, 14)])
       
   156 
       
   157 
       
   158 class ApplicationTC(CubicWebTC):
       
   159 
       
   160     @classproperty
       
   161     def config(cls):
       
   162         try:
       
   163             return cls.__dict__['_config']
       
   164         except KeyError:
       
   165             config = super(ApplicationTC, cls).config
       
   166             config.global_set_option('allow-email-login', True)
       
   167             return config
       
   168 
       
   169     def test_cnx_user_groups_sync(self):
       
   170         with self.admin_access.client_cnx() as cnx:
       
   171             user = cnx.user
       
   172             self.assertEqual(user.groups, set(('managers',)))
       
   173             cnx.execute('SET X in_group G WHERE X eid %s, G name "guests"' % user.eid)
       
   174             user = cnx.user
       
   175             self.assertEqual(user.groups, set(('managers',)))
       
   176             cnx.commit()
       
   177             user = cnx.user
       
   178             self.assertEqual(user.groups, set(('managers', 'guests')))
       
   179             # cleanup
       
   180             cnx.execute('DELETE X in_group G WHERE X eid %s, G name "guests"' % user.eid)
       
   181             cnx.commit()
       
   182 
       
   183     def test_publish_validation_error(self):
       
   184         with self.admin_access.web_request() as req:
       
   185             user = req.user
       
   186             eid = text_type(user.eid)
       
   187             req.form = {
       
   188                 'eid':       eid,
       
   189                 '__type:'+eid:    'CWUser', '_cw_entity_fields:'+eid: 'login-subject',
       
   190                 'login-subject:'+eid:     '', # ERROR: no login specified
       
   191                  # just a sample, missing some necessary information for real life
       
   192                 '__errorurl': 'view?vid=edition...'
       
   193                 }
       
   194             path, params = self.expect_redirect_handle_request(req, 'edit')
       
   195             forminfo = req.session.data['view?vid=edition...']
       
   196             eidmap = forminfo['eidmap']
       
   197             self.assertEqual(eidmap, {})
       
   198             values = forminfo['values']
       
   199             self.assertEqual(values['login-subject:'+eid], '')
       
   200             self.assertEqual(values['eid'], eid)
       
   201             error = forminfo['error']
       
   202             self.assertEqual(error.entity, user.eid)
       
   203             self.assertEqual(error.errors['login-subject'], 'required field')
       
   204 
       
   205 
       
   206     def test_validation_error_dont_loose_subentity_data_ctrl(self):
       
   207         """test creation of two linked entities
       
   208 
       
   209         error occurs on the web controller
       
   210         """
       
   211         with self.admin_access.web_request() as req:
       
   212             # set Y before X to ensure both entities are edited, not only X
       
   213             req.form = {'eid': ['Y', 'X'], '__maineid': 'X',
       
   214                         '__type:X': 'CWUser', '_cw_entity_fields:X': 'login-subject',
       
   215                         # missing required field
       
   216                         'login-subject:X': u'',
       
   217                         # but email address is set
       
   218                         '__type:Y': 'EmailAddress', '_cw_entity_fields:Y': 'address-subject',
       
   219                         'address-subject:Y': u'bougloup@logilab.fr',
       
   220                         'use_email-object:Y': 'X',
       
   221                         # necessary to get validation error handling
       
   222                         '__errorurl': 'view?vid=edition...',
       
   223                         }
       
   224             path, params = self.expect_redirect_handle_request(req, 'edit')
       
   225             forminfo = req.session.data['view?vid=edition...']
       
   226             self.assertEqual(set(forminfo['eidmap']), set('XY'))
       
   227             self.assertEqual(forminfo['eidmap']['X'], None)
       
   228             self.assertIsInstance(forminfo['eidmap']['Y'], int)
       
   229             self.assertEqual(forminfo['error'].entity, 'X')
       
   230             self.assertEqual(forminfo['error'].errors,
       
   231                               {'login-subject': 'required field'})
       
   232             self.assertEqual(forminfo['values'], req.form)
       
   233 
       
   234 
       
   235     def test_validation_error_dont_loose_subentity_data_repo(self):
       
   236         """test creation of two linked entities
       
   237 
       
   238         error occurs on the repository
       
   239         """
       
   240         with self.admin_access.web_request() as req:
       
   241             # set Y before X to ensure both entities are edited, not only X
       
   242             req.form = {'eid': ['Y', 'X'], '__maineid': 'X',
       
   243                         '__type:X': 'CWUser', '_cw_entity_fields:X': 'login-subject,upassword-subject',
       
   244                         # already existent user
       
   245                         'login-subject:X': u'admin',
       
   246                         'upassword-subject:X': u'admin', 'upassword-subject-confirm:X': u'admin',
       
   247                         '__type:Y': 'EmailAddress', '_cw_entity_fields:Y': 'address-subject',
       
   248                         'address-subject:Y': u'bougloup@logilab.fr',
       
   249                         'use_email-object:Y': 'X',
       
   250                         # necessary to get validation error handling
       
   251                         '__errorurl': 'view?vid=edition...',
       
   252                         }
       
   253             path, params = self.expect_redirect_handle_request(req, 'edit')
       
   254             forminfo = req.session.data['view?vid=edition...']
       
   255             self.assertEqual(set(forminfo['eidmap']), set('XY'))
       
   256             self.assertIsInstance(forminfo['eidmap']['X'], int)
       
   257             self.assertIsInstance(forminfo['eidmap']['Y'], int)
       
   258             self.assertEqual(forminfo['error'].entity, forminfo['eidmap']['X'])
       
   259             self.assertEqual(forminfo['error'].errors,
       
   260                               {'login-subject': u'the value "admin" is already used, use another one'})
       
   261             self.assertEqual(forminfo['values'], req.form)
       
   262 
       
   263     def test_ajax_view_raise_arbitrary_error(self):
       
   264         class ErrorAjaxView(view.View):
       
   265             __regid__ = 'test.ajax.error'
       
   266             def call(self):
       
   267                 raise Exception('whatever')
       
   268         with self.temporary_appobjects(ErrorAjaxView):
       
   269             with real_error_handling(self.app) as app:
       
   270                 with self.admin_access.web_request(vid='test.ajax.error') as req:
       
   271                     req.ajax_request = True
       
   272                     page = app.handle_request(req, '')
       
   273         self.assertEqual(http_client.INTERNAL_SERVER_ERROR,
       
   274                          req.status_out)
       
   275 
       
   276     def _test_cleaned(self, kwargs, injected, cleaned):
       
   277         with self.admin_access.web_request(**kwargs) as req:
       
   278             page = self.app_handle_request(req, 'view')
       
   279             self.assertNotIn(injected.encode('ascii'), page)
       
   280             self.assertIn(cleaned.encode('ascii'), page)
       
   281 
       
   282     def test_nonregr_script_kiddies(self):
       
   283         """test against current script injection"""
       
   284         injected = '<i>toto</i>'
       
   285         cleaned = 'toto'
       
   286         for kwargs in ({'vid': injected},
       
   287                        {'vtitle': injected},
       
   288                        ):
       
   289             yield self._test_cleaned, kwargs, injected, cleaned
       
   290 
       
   291     def test_site_wide_eproperties_sync(self):
       
   292         # XXX work in all-in-one configuration but not in twisted for instance
       
   293         # in which case we need a kindof repo -> http server notification
       
   294         # protocol
       
   295         vreg = self.app.vreg
       
   296         # default value
       
   297         self.assertEqual(vreg.property_value('ui.language'), 'en')
       
   298         with self.admin_access.client_cnx() as cnx:
       
   299             cnx.execute('INSERT CWProperty X: X value "fr", X pkey "ui.language"')
       
   300             self.assertEqual(vreg.property_value('ui.language'), 'en')
       
   301             cnx.commit()
       
   302             self.assertEqual(vreg.property_value('ui.language'), 'fr')
       
   303             cnx.execute('SET X value "de" WHERE X pkey "ui.language"')
       
   304             self.assertEqual(vreg.property_value('ui.language'), 'fr')
       
   305             cnx.commit()
       
   306             self.assertEqual(vreg.property_value('ui.language'), 'de')
       
   307             cnx.execute('DELETE CWProperty X WHERE X pkey "ui.language"')
       
   308             self.assertEqual(vreg.property_value('ui.language'), 'de')
       
   309             cnx.commit()
       
   310             self.assertEqual(vreg.property_value('ui.language'), 'en')
       
   311 
       
   312     # authentication tests ####################################################
       
   313 
       
   314     def test_http_auth_no_anon(self):
       
   315         req, origsession = self.init_authentication('http')
       
   316         self.assertAuthFailure(req)
       
   317         self.app.handle_request(req, 'login')
       
   318         self.assertEqual(401, req.status_out)
       
   319         clear_cache(req, 'get_authorization')
       
   320         authstr = base64.encodestring(('%s:%s' % (self.admlogin, self.admpassword)).encode('ascii'))
       
   321         req.set_request_header('Authorization', 'basic %s' % authstr.decode('ascii'))
       
   322         self.assertAuthSuccess(req, origsession)
       
   323         self.assertRaises(LogOut, self.app_handle_request, req, 'logout')
       
   324         self.assertEqual(len(self.open_sessions), 0)
       
   325 
       
   326     def test_cookie_auth_no_anon(self):
       
   327         req, origsession = self.init_authentication('cookie')
       
   328         self.assertAuthFailure(req)
       
   329         try:
       
   330             form = self.app.handle_request(req, 'login')
       
   331         except Redirect as redir:
       
   332             self.fail('anonymous user should get login form')
       
   333         clear_cache(req, 'get_authorization')
       
   334         self.assertIn(b'__login', form)
       
   335         self.assertIn(b'__password', form)
       
   336         self.assertFalse(req.cnx) # Mock cnx are False
       
   337         req.form['__login'] = self.admlogin
       
   338         req.form['__password'] = self.admpassword
       
   339         self.assertAuthSuccess(req, origsession)
       
   340         self.assertRaises(LogOut, self.app_handle_request, req, 'logout')
       
   341         self.assertEqual(len(self.open_sessions), 0)
       
   342 
       
   343     def test_login_by_email(self):
       
   344         with self.admin_access.client_cnx() as cnx:
       
   345             login = cnx.user.login
       
   346             address = login + u'@localhost'
       
   347             cnx.execute('INSERT EmailAddress X: X address %(address)s, U primary_email X '
       
   348                         'WHERE U login %(login)s', {'address': address, 'login': login})
       
   349             cnx.commit()
       
   350         # # option allow-email-login not set
       
   351         req, origsession = self.init_authentication('cookie')
       
   352         # req.form['__login'] = address
       
   353         # req.form['__password'] = self.admpassword
       
   354         # self.assertAuthFailure(req)
       
   355         # option allow-email-login set
       
   356         #origsession.login = address
       
   357         self.set_option('allow-email-login', True)
       
   358         req.form['__login'] = address
       
   359         req.form['__password'] = self.admpassword
       
   360         self.assertAuthSuccess(req, origsession)
       
   361         self.assertRaises(LogOut, self.app_handle_request, req, 'logout')
       
   362         self.assertEqual(len(self.open_sessions), 0)
       
   363 
       
   364     def _reset_cookie(self, req):
       
   365         # preparing the suite of the test
       
   366         # set session id in cookie
       
   367         cookie = SimpleCookie()
       
   368         sessioncookie = self.app.session_handler.session_cookie(req)
       
   369         cookie[sessioncookie] = req.session.sessionid
       
   370         req.set_request_header('Cookie', cookie[sessioncookie].OutputString(),
       
   371                                raw=True)
       
   372         clear_cache(req, 'get_authorization')
       
   373 
       
   374     def _test_auth_anon(self, req):
       
   375         asession = self.app.get_session(req)
       
   376         # important otherwise _reset_cookie will not use the right session
       
   377         req.set_cnx(repoapi.Connection(asession))
       
   378         self.assertEqual(len(self.open_sessions), 1)
       
   379         self.assertEqual(asession.login, 'anon')
       
   380         self.assertTrue(asession.anonymous_session)
       
   381         self._reset_cookie(req)
       
   382 
       
   383     def _test_anon_auth_fail(self, req):
       
   384         self.assertEqual(1, len(self.open_sessions))
       
   385         session = self.app.get_session(req)
       
   386         # important otherwise _reset_cookie will not use the right session
       
   387         req.set_cnx(repoapi.Connection(session))
       
   388         self.assertEqual(req.message, 'authentication failure')
       
   389         self.assertEqual(req.session.anonymous_session, True)
       
   390         self.assertEqual(1, len(self.open_sessions))
       
   391         self._reset_cookie(req)
       
   392 
       
   393     def test_http_auth_anon_allowed(self):
       
   394         req, origsession = self.init_authentication('http', 'anon')
       
   395         self._test_auth_anon(req)
       
   396         authstr = base64.encodestring(b'toto:pouet')
       
   397         req.set_request_header('Authorization', 'basic %s' % authstr.decode('ascii'))
       
   398         self._test_anon_auth_fail(req)
       
   399         authstr = base64.encodestring(('%s:%s' % (self.admlogin, self.admpassword)).encode('ascii'))
       
   400         req.set_request_header('Authorization', 'basic %s' % authstr.decode('ascii'))
       
   401         self.assertAuthSuccess(req, origsession)
       
   402         self.assertRaises(LogOut, self.app_handle_request, req, 'logout')
       
   403         self.assertEqual(len(self.open_sessions), 0)
       
   404 
       
   405     def test_cookie_auth_anon_allowed(self):
       
   406         req, origsession = self.init_authentication('cookie', 'anon')
       
   407         self._test_auth_anon(req)
       
   408         req.form['__login'] = 'toto'
       
   409         req.form['__password'] = 'pouet'
       
   410         self._test_anon_auth_fail(req)
       
   411         req.form['__login'] = self.admlogin
       
   412         req.form['__password'] = self.admpassword
       
   413         self.assertAuthSuccess(req, origsession)
       
   414         self.assertRaises(LogOut, self.app_handle_request, req, 'logout')
       
   415         self.assertEqual(0, len(self.open_sessions))
       
   416 
       
   417     def test_anonymized_request(self):
       
   418         with self.admin_access.web_request() as req:
       
   419             self.assertEqual(self.admlogin, req.session.user.login)
       
   420             # admin should see anon + admin
       
   421             self.assertEqual(2, len(list(req.find('CWUser'))))
       
   422             with anonymized_request(req):
       
   423                 self.assertEqual('anon', req.session.login, 'anon')
       
   424                 # anon should only see anon user
       
   425                 self.assertEqual(1, len(list(req.find('CWUser'))))
       
   426             self.assertEqual(self.admlogin, req.session.login)
       
   427             self.assertEqual(2, len(list(req.find('CWUser'))))
       
   428 
       
   429     def test_non_regr_optional_first_var(self):
       
   430         with self.admin_access.web_request() as req:
       
   431             # expect a rset with None in [0][0]
       
   432             req.form['rql'] = 'rql:Any OV1, X WHERE X custom_workflow OV1?'
       
   433             self.app_handle_request(req)
       
   434 
       
   435 
       
   436 if __name__ == '__main__':
       
   437     unittest_main()