web/test/unittest_application.py
branch tls-sprint
changeset 1802 d628defebc17
parent 1490 6b024694d493
child 1977 606923dff11b
equal deleted inserted replaced
1801:672acc730ce5 1802:d628defebc17
    42 
    42 
    43 
    43 
    44 class RequestBaseTC(TestCase):
    44 class RequestBaseTC(TestCase):
    45     def setUp(self):
    45     def setUp(self):
    46         self.req = FakeRequest()
    46         self.req = FakeRequest()
    47         
    47 
    48 
    48 
    49     def test_list_arg(self):
    49     def test_list_arg(self):
    50         """tests the list_arg() function"""
    50         """tests the list_arg() function"""
    51         list_arg = self.req.list_form_param
    51         list_arg = self.req.list_form_param
    52         self.assertEquals(list_arg('arg3', {}), [])
    52         self.assertEquals(list_arg('arg3', {}), [])
    71         # test on a valid non-view controller
    71         # test on a valid non-view controller
    72         req = FakeRequest(url='login?x=1&y=2')
    72         req = FakeRequest(url='login?x=1&y=2')
    73         self.assertEquals(req.relative_path(False), 'login')
    73         self.assertEquals(req.relative_path(False), 'login')
    74         self.assertEquals(req.from_controller(), 'login')
    74         self.assertEquals(req.from_controller(), 'login')
    75 
    75 
    76         
    76 
    77 class UtilsTC(TestCase):
    77 class UtilsTC(TestCase):
    78     """test suite for misc application utilities"""
    78     """test suite for misc application utilities"""
    79 
    79 
    80     def setUp(self):
    80     def setUp(self):
    81         self.ctrl = FakeController()
    81         self.ctrl = FakeController()
    82     
    82 
    83     #def test_which_mapping(self):
    83     #def test_which_mapping(self):
    84     #    """tests which mapping is used (application or core)"""
    84     #    """tests which mapping is used (application or core)"""
    85     #    init_mapping()
    85     #    init_mapping()
    86     #    from cubicweb.common import mapping
    86     #    from cubicweb.common import mapping
    87     #    self.assertEquals(mapping.MAPPING_USED, 'core')
    87     #    self.assertEquals(mapping.MAPPING_USED, 'core')
    93     def test_execute_linkto(self):
    93     def test_execute_linkto(self):
    94         """tests the execute_linkto() function"""
    94         """tests the execute_linkto() function"""
    95         self.assertEquals(self.ctrl.execute_linkto(), None)
    95         self.assertEquals(self.ctrl.execute_linkto(), None)
    96         self.assertEquals(self.ctrl._cursor.executed,
    96         self.assertEquals(self.ctrl._cursor.executed,
    97                           [])
    97                           [])
    98         
    98 
    99         self.ctrl.set_form({'__linkto' : 'works_for:12_13_14:object',
    99         self.ctrl.set_form({'__linkto' : 'works_for:12_13_14:object',
   100                               'eid': 8})
   100                               'eid': 8})
   101         self.ctrl.execute_linkto()
   101         self.ctrl.execute_linkto()
   102         self.assertEquals(self.ctrl._cursor.executed,
   102         self.assertEquals(self.ctrl._cursor.executed,
   103                           ['SET Y works_for X WHERE X eid 8, Y eid %s' % i
   103                           ['SET Y works_for X WHERE X eid 8, Y eid %s' % i
   108                               'eid': 8})
   108                               'eid': 8})
   109         self.ctrl.execute_linkto()
   109         self.ctrl.execute_linkto()
   110         self.assertEquals(self.ctrl._cursor.executed,
   110         self.assertEquals(self.ctrl._cursor.executed,
   111                           ['SET X works_for Y WHERE X eid 8, Y eid %s' % i
   111                           ['SET X works_for Y WHERE X eid 8, Y eid %s' % i
   112                            for i in (12, 13, 14)])
   112                            for i in (12, 13, 14)])
   113         
   113 
   114 
   114 
   115         self.ctrl.new_cursor()
   115         self.ctrl.new_cursor()
   116         self.ctrl.req.form = {'__linkto' : 'works_for:12_13_14:object'}
   116         self.ctrl.req.form = {'__linkto' : 'works_for:12_13_14:object'}
   117         self.ctrl.execute_linkto(eid=8)
   117         self.ctrl.execute_linkto(eid=8)
   118         self.assertEquals(self.ctrl._cursor.executed,
   118         self.assertEquals(self.ctrl._cursor.executed,
   150                 params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
   150                 params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
   151             path = path[len(req.base_url()):]
   151             path = path[len(req.base_url()):]
   152             return path, params
   152             return path, params
   153         else:
   153         else:
   154             self.fail('expected a Redirect exception')
   154             self.fail('expected a Redirect exception')
   155         
   155 
   156     def expect_redirect_publish(self, req, path='view'):
   156     def expect_redirect_publish(self, req, path='view'):
   157         return self.expect_redirect(lambda x: self.publish(x, path), req)
   157         return self.expect_redirect(lambda x: self.publish(x, path), req)
   158     
   158 
   159     def test_cnx_user_groups_sync(self):
   159     def test_cnx_user_groups_sync(self):
   160         user = self.user()
   160         user = self.user()
   161         self.assertEquals(user.groups, set(('managers',)))
   161         self.assertEquals(user.groups, set(('managers',)))
   162         self.execute('SET X in_group G WHERE X eid %s, G name "guests"' % user.eid)
   162         self.execute('SET X in_group G WHERE X eid %s, G name "guests"' % user.eid)
   163         user = self.user()
   163         user = self.user()
   166         user = self.user()
   166         user = self.user()
   167         self.assertEquals(user.groups, set(('managers', 'guests')))
   167         self.assertEquals(user.groups, set(('managers', 'guests')))
   168         # cleanup
   168         # cleanup
   169         self.execute('DELETE X in_group G WHERE X eid %s, G name "guests"' % user.eid)
   169         self.execute('DELETE X in_group G WHERE X eid %s, G name "guests"' % user.eid)
   170         self.commit()
   170         self.commit()
   171     
   171 
   172     def test_nonregr_publish1(self):
   172     def test_nonregr_publish1(self):
   173         req = self.request(u'CWEType X WHERE X final FALSE, X meta FALSE')
   173         req = self.request(u'CWEType X WHERE X final FALSE, X meta FALSE')
   174         self.app.publish('view', req)
   174         self.app.publish('view', req)
   175         
   175 
   176     def test_nonregr_publish2(self):
   176     def test_nonregr_publish2(self):
   177         req = self.request(u'Any count(N) WHERE N todo_by U, N is Note, U eid %s'
   177         req = self.request(u'Any count(N) WHERE N todo_by U, N is Note, U eid %s'
   178                            % self.user().eid)
   178                            % self.user().eid)
   179         self.app.publish('view', req)
   179         self.app.publish('view', req)
   180         
   180 
   181     def test_publish_validation_error(self):
   181     def test_publish_validation_error(self):
   182         req = self.request()
   182         req = self.request()
   183         user = self.user()
   183         user = self.user()
   184         req.form = {
   184         req.form = {
   185             'eid':       `user.eid`,
   185             'eid':       `user.eid`,
   202         self.assertEquals(errors.errors['login'], 'required attribute')
   202         self.assertEquals(errors.errors['login'], 'required attribute')
   203 
   203 
   204 
   204 
   205     def test_validation_error_dont_loose_subentity_data(self):
   205     def test_validation_error_dont_loose_subentity_data(self):
   206         """test creation of two linked entities
   206         """test creation of two linked entities
   207         """        
   207         """
   208         req = self.request()
   208         req = self.request()
   209         form = {'eid': ['X', 'Y'],
   209         form = {'eid': ['X', 'Y'],
   210                 '__type:X': 'CWUser',
   210                 '__type:X': 'CWUser',
   211                 # missing required field
   211                 # missing required field
   212                 'login:X': u'', 'edits-login:X': '', 
   212                 'login:X': u'', 'edits-login:X': '',
   213                 'surname:X': u'Mr Ouaoua', 'edits-surname:X': '',
   213                 'surname:X': u'Mr Ouaoua', 'edits-surname:X': '',
   214                 '__type:Y': 'EmailAddress',
   214                 '__type:Y': 'EmailAddress',
   215                 # but email address is set
   215                 # but email address is set
   216                 'address:Y': u'bougloup@logilab.fr', 'edits-address:Y': '',
   216                 'address:Y': u'bougloup@logilab.fr', 'edits-address:Y': '',
   217                 'alias:Y': u'', 'edits-alias:Y': '',
   217                 'alias:Y': u'', 'edits-alias:Y': '',
   227         self.assertUnorderedIterableEquals(forminfo['eidmap'].keys(), ['X', 'Y'])
   227         self.assertUnorderedIterableEquals(forminfo['eidmap'].keys(), ['X', 'Y'])
   228         self.assertEquals(forminfo['errors'].entity, forminfo['eidmap']['X'])
   228         self.assertEquals(forminfo['errors'].entity, forminfo['eidmap']['X'])
   229         self.assertEquals(forminfo['errors'].errors, {'login': 'required attribute',
   229         self.assertEquals(forminfo['errors'].errors, {'login': 'required attribute',
   230                                                       'upassword': 'required attribute'})
   230                                                       'upassword': 'required attribute'})
   231         self.assertEquals(forminfo['values'], form)
   231         self.assertEquals(forminfo['values'], form)
   232         
   232 
   233     def _test_cleaned(self, kwargs, injected, cleaned):
   233     def _test_cleaned(self, kwargs, injected, cleaned):
   234         req = self.request(**kwargs)
   234         req = self.request(**kwargs)
   235         page = self.app.publish('view', req)
   235         page = self.app.publish('view', req)
   236         self.failIf(injected in page, (kwargs, injected))
   236         self.failIf(injected in page, (kwargs, injected))
   237         self.failUnless(cleaned in page, (kwargs, cleaned))
   237         self.failUnless(cleaned in page, (kwargs, cleaned))
   238         
   238 
   239     def test_nonregr_script_kiddies(self):
   239     def test_nonregr_script_kiddies(self):
   240         """test against current script injection"""
   240         """test against current script injection"""
   241         injected = '<i>toto</i>'
   241         injected = '<i>toto</i>'
   242         cleaned = 'toto'
   242         cleaned = 'toto'
   243         for kwargs in ({'__message': injected},
   243         for kwargs in ({'__message': injected},
   244                        {'vid': injected},
   244                        {'vid': injected},
   245                        {'vtitle': injected},
   245                        {'vtitle': injected},
   246                        ):
   246                        ):
   247             yield self._test_cleaned, kwargs, injected, cleaned
   247             yield self._test_cleaned, kwargs, injected, cleaned
   248         
   248 
   249     def test_site_wide_eproperties_sync(self):
   249     def test_site_wide_eproperties_sync(self):
   250         # XXX work in all-in-one configuration but not in twisted for instance
   250         # XXX work in all-in-one configuration but not in twisted for instance
   251         # in which case we need a kindof repo -> http server notification
   251         # in which case we need a kindof repo -> http server notification
   252         # protocol
   252         # protocol
   253         vreg = self.app.vreg
   253         vreg = self.app.vreg
   276         req.form['__fblogin'] = u'turlututu'
   276         req.form['__fblogin'] = u'turlututu'
   277         page = self.publish(req)
   277         page = self.publish(req)
   278         self.failIf(req.cnx is origcnx)
   278         self.failIf(req.cnx is origcnx)
   279         self.assertEquals(req.user.login, 'turlututu')
   279         self.assertEquals(req.user.login, 'turlututu')
   280         self.failUnless('turlututu' in page, page)
   280         self.failUnless('turlututu' in page, page)
   281         
   281 
   282     # authentication tests ####################################################
   282     # authentication tests ####################################################
   283 
   283 
   284     def _init_auth(self, authmode, anonuser=None):
   284     def _init_auth(self, authmode, anonuser=None):
   285         self.set_option('auth-mode', authmode)
   285         self.set_option('auth-mode', authmode)
   286         self.set_option('anonymous-user', anonuser)
   286         self.set_option('anonymous-user', anonuser)
   287         req = self.request()
   287         req = self.request()
   288         origcnx = req.cnx
   288         origcnx = req.cnx
   289         req.cnx = None
   289         req.cnx = None
   290         sh = self.app.session_handler
   290         sh = self.app.session_handler
   291         # not properly cleaned between tests
   291         # not properly cleaned between tests
   292         self.open_sessions = sh.session_manager._sessions = {} 
   292         self.open_sessions = sh.session_manager._sessions = {}
   293         return req, origcnx
   293         return req, origcnx
   294 
   294 
   295     def _test_auth_succeed(self, req, origcnx):
   295     def _test_auth_succeed(self, req, origcnx):
   296         sh = self.app.session_handler
   296         sh = self.app.session_handler
   297         path, params = self.expect_redirect(lambda x: self.app.connect(x), req)
   297         path, params = self.expect_redirect(lambda x: self.app.connect(x), req)
   298         cnx = req.cnx
   298         cnx = req.cnx
   299         self.assertEquals(len(self.open_sessions), 1, self.open_sessions)
   299         self.assertEquals(len(self.open_sessions), 1, self.open_sessions)
   300         self.assertEquals(cnx.login, origcnx.login)
   300         self.assertEquals(cnx.login, origcnx.login)
   301         self.assertEquals(cnx.password, origcnx.password)
   301         self.assertEquals(cnx.password, origcnx.password)
   302         self.assertEquals(cnx.anonymous_connection, False) 
   302         self.assertEquals(cnx.anonymous_connection, False)
   303         self.assertEquals(path, 'view')
   303         self.assertEquals(path, 'view')
   304         self.assertEquals(params, {'__message': 'welcome %s !' % origcnx.login})
   304         self.assertEquals(params, {'__message': 'welcome %s !' % origcnx.login})
   305     
   305 
   306     def _test_auth_fail(self, req):
   306     def _test_auth_fail(self, req):
   307         self.assertRaises(AuthenticationError, self.app.connect, req)
   307         self.assertRaises(AuthenticationError, self.app.connect, req)
   308         self.assertEquals(req.cnx, None)
   308         self.assertEquals(req.cnx, None)
   309         self.assertEquals(len(self.open_sessions), 0) 
   309         self.assertEquals(len(self.open_sessions), 0)
   310         clear_cache(req, 'get_authorization')
   310         clear_cache(req, 'get_authorization')
   311         
   311 
   312     def test_http_auth_no_anon(self):
   312     def test_http_auth_no_anon(self):
   313         req, origcnx = self._init_auth('http')
   313         req, origcnx = self._init_auth('http')
   314         self._test_auth_fail(req)
   314         self._test_auth_fail(req)
   315         self.assertRaises(ExplicitLogin, self.publish, req, 'login')
   315         self.assertRaises(ExplicitLogin, self.publish, req, 'login')
   316         self.assertEquals(req.cnx, None)
   316         self.assertEquals(req.cnx, None)
   317         authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password))
   317         authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password))
   318         req._headers['Authorization'] = 'basic %s' % authstr
   318         req._headers['Authorization'] = 'basic %s' % authstr
   319         self._test_auth_succeed(req, origcnx)
   319         self._test_auth_succeed(req, origcnx)
   320         self.assertRaises(AuthenticationError, self.publish, req, 'logout')
   320         self.assertRaises(AuthenticationError, self.publish, req, 'logout')
   321         self.assertEquals(len(self.open_sessions), 0) 
   321         self.assertEquals(len(self.open_sessions), 0)
   322 
   322 
   323     def test_cookie_auth_no_anon(self):
   323     def test_cookie_auth_no_anon(self):
   324         req, origcnx = self._init_auth('cookie')
   324         req, origcnx = self._init_auth('cookie')
   325         self._test_auth_fail(req)
   325         self._test_auth_fail(req)
   326         form = self.publish(req, 'login')
   326         form = self.publish(req, 'login')
   329         self.assertEquals(req.cnx, None)
   329         self.assertEquals(req.cnx, None)
   330         req.form['__login'] = origcnx.login
   330         req.form['__login'] = origcnx.login
   331         req.form['__password'] = origcnx.password
   331         req.form['__password'] = origcnx.password
   332         self._test_auth_succeed(req, origcnx)
   332         self._test_auth_succeed(req, origcnx)
   333         self.assertRaises(AuthenticationError, self.publish, req, 'logout')
   333         self.assertRaises(AuthenticationError, self.publish, req, 'logout')
   334         self.assertEquals(len(self.open_sessions), 0) 
   334         self.assertEquals(len(self.open_sessions), 0)
   335 
   335 
   336     def test_login_by_email(self):
   336     def test_login_by_email(self):
   337         login = self.request().user.login
   337         login = self.request().user.login
   338         address = login + u'@localhost'
   338         address = login + u'@localhost'
   339         self.execute('INSERT EmailAddress X: X address %(address)s, U primary_email X '
   339         self.execute('INSERT EmailAddress X: X address %(address)s, U primary_email X '
   349         req, origcnx = self._init_auth('cookie')
   349         req, origcnx = self._init_auth('cookie')
   350         req.form['__login'] = address
   350         req.form['__login'] = address
   351         req.form['__password'] = origcnx.password
   351         req.form['__password'] = origcnx.password
   352         self._test_auth_succeed(req, origcnx)
   352         self._test_auth_succeed(req, origcnx)
   353         self.assertRaises(AuthenticationError, self.publish, req, 'logout')
   353         self.assertRaises(AuthenticationError, self.publish, req, 'logout')
   354         self.assertEquals(len(self.open_sessions), 0) 
   354         self.assertEquals(len(self.open_sessions), 0)
   355 
   355 
   356     def _test_auth_anon(self, req):
   356     def _test_auth_anon(self, req):
   357         self.app.connect(req)
   357         self.app.connect(req)
   358         acnx = req.cnx
   358         acnx = req.cnx
   359         self.assertEquals(len(self.open_sessions), 1)
   359         self.assertEquals(len(self.open_sessions), 1)
   360         self.assertEquals(acnx.login, 'anon') 
   360         self.assertEquals(acnx.login, 'anon')
   361         self.assertEquals(acnx.password, 'anon') 
   361         self.assertEquals(acnx.password, 'anon')
   362         self.failUnless(acnx.anonymous_connection)
   362         self.failUnless(acnx.anonymous_connection)
   363         self._reset_cookie(req)
   363         self._reset_cookie(req)
   364         
   364 
   365     def _reset_cookie(self, req):
   365     def _reset_cookie(self, req):
   366         # preparing the suite of the test
   366         # preparing the suite of the test
   367         # set session id in cookie
   367         # set session id in cookie
   368         cookie = Cookie.SimpleCookie()
   368         cookie = Cookie.SimpleCookie()
   369         cookie['__session'] = req.cnx.sessionid
   369         cookie['__session'] = req.cnx.sessionid
   370         req._headers['Cookie'] = cookie['__session'].OutputString()
   370         req._headers['Cookie'] = cookie['__session'].OutputString()
   371         clear_cache(req, 'get_authorization')
   371         clear_cache(req, 'get_authorization')
   372         # reset cnx as if it was a new incoming request
   372         # reset cnx as if it was a new incoming request
   373         req.cnx = None
   373         req.cnx = None
   374         
   374 
   375     def _test_anon_auth_fail(self, req):
   375     def _test_anon_auth_fail(self, req):
   376         self.assertEquals(len(self.open_sessions), 1) 
   376         self.assertEquals(len(self.open_sessions), 1)
   377         self.app.connect(req)
   377         self.app.connect(req)
   378         self.assertEquals(req.message, 'authentication failure')
   378         self.assertEquals(req.message, 'authentication failure')
   379         self.assertEquals(req.cnx.anonymous_connection, True)
   379         self.assertEquals(req.cnx.anonymous_connection, True)
   380         self.assertEquals(len(self.open_sessions), 1) 
   380         self.assertEquals(len(self.open_sessions), 1)
   381         self._reset_cookie(req)
   381         self._reset_cookie(req)
   382         
   382 
   383     def test_http_auth_anon_allowed(self):
   383     def test_http_auth_anon_allowed(self):
   384         req, origcnx = self._init_auth('http', 'anon')
   384         req, origcnx = self._init_auth('http', 'anon')
   385         self._test_auth_anon(req)
   385         self._test_auth_anon(req)
   386         authstr = base64.encodestring('toto:pouet')
   386         authstr = base64.encodestring('toto:pouet')
   387         req._headers['Authorization'] = 'basic %s' % authstr
   387         req._headers['Authorization'] = 'basic %s' % authstr
   388         self._test_anon_auth_fail(req)
   388         self._test_anon_auth_fail(req)
   389         authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password))
   389         authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password))
   390         req._headers['Authorization'] = 'basic %s' % authstr
   390         req._headers['Authorization'] = 'basic %s' % authstr
   391         self._test_auth_succeed(req, origcnx)
   391         self._test_auth_succeed(req, origcnx)
   392         self.assertRaises(AuthenticationError, self.publish, req, 'logout')
   392         self.assertRaises(AuthenticationError, self.publish, req, 'logout')
   393         self.assertEquals(len(self.open_sessions), 0) 
   393         self.assertEquals(len(self.open_sessions), 0)
   394         
   394 
   395     def test_cookie_auth_anon_allowed(self):
   395     def test_cookie_auth_anon_allowed(self):
   396         req, origcnx = self._init_auth('cookie', 'anon')
   396         req, origcnx = self._init_auth('cookie', 'anon')
   397         self._test_auth_anon(req)
   397         self._test_auth_anon(req)
   398         req.form['__login'] = 'toto'
   398         req.form['__login'] = 'toto'
   399         req.form['__password'] = 'pouet'
   399         req.form['__password'] = 'pouet'
   400         self._test_anon_auth_fail(req)
   400         self._test_anon_auth_fail(req)
   401         req.form['__login'] = origcnx.login
   401         req.form['__login'] = origcnx.login
   402         req.form['__password'] = origcnx.password
   402         req.form['__password'] = origcnx.password
   403         self._test_auth_succeed(req, origcnx)
   403         self._test_auth_succeed(req, origcnx)
   404         self.assertRaises(AuthenticationError, self.publish, req, 'logout')
   404         self.assertRaises(AuthenticationError, self.publish, req, 'logout')
   405         self.assertEquals(len(self.open_sessions), 0) 
   405         self.assertEquals(len(self.open_sessions), 0)
   406 
   406 
   407 
   407 
   408 if __name__ == '__main__':
   408 if __name__ == '__main__':
   409     unittest_main()
   409     unittest_main()