# HG changeset patch # User Sylvain Thénault # Date 1255442387 -7200 # Node ID 706d7bf0ae3d2e84149d183a2be8ad0bbd0229b9 # Parent 9ba2e3253a8868b37b5a365f3aece1f1a729876a factor out code reusable for authentication tests diff -r 9ba2e3253a88 -r 706d7bf0ae3d devtools/testlib.py --- a/devtools/testlib.py Tue Oct 13 15:59:18 2009 +0200 +++ b/devtools/testlib.py Tue Oct 13 15:59:47 2009 +0200 @@ -21,13 +21,14 @@ from logilab.common.pytest import nocoverage, pause_tracing, resume_tracing from logilab.common.debugger import Debugger from logilab.common.umessage import message_from_string -from logilab.common.decorators import cached, classproperty +from logilab.common.decorators import cached, classproperty, clear_cache from logilab.common.deprecation import deprecated -from cubicweb import NoSelectableObject, cwconfig, devtools, web, server +from cubicweb import NoSelectableObject, AuthenticationError +from cubicweb import cwconfig, devtools, web, server from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError from cubicweb.sobjects import notification -from cubicweb.web import application +from cubicweb.web import Redirect, application from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS from cubicweb.devtools import fake, htmlparser @@ -226,7 +227,7 @@ cls.config.repository = lambda x=None: cls.repo # necessary for authentication tests cls.cnx.login = cls.admlogin - cls.cnx.password = cls.admpassword + cls.cnx.authinfo = {'password': cls.admpassword} @classmethod def _refresh_repo(cls): @@ -497,6 +498,9 @@ ctrl = self.vreg['controllers'].select('json', req) return ctrl.publish(), req + def app_publish(self, req, path='view'): + return self.app.publish(path, req) + def publish(self, req): """call the publish method of the edit controller""" ctrl = self.vreg['controllers'].select('edit', req) @@ -508,23 +512,61 @@ raise return result - def expect_redirect_publish(self, req): - """call the publish method of the edit controller, expecting to get a - Redirect exception.""" + def expect_redirect(self, callback, req): + """call the given callback with req as argument, expecting to get a + Redirect exception + """ try: - self.publish(req) - except web.Redirect, ex: + res = callback(req) + except Redirect, ex: try: path, params = ex.location.split('?', 1) - except: - path, params = ex.location, "" - req._url = path - cleanup = lambda p: (p[0], unquote(p[1])) - params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p) - return req.relative_path(False), params # path.rsplit('/', 1)[-1], params + except ValueError: + path = ex.location + params = {} + else: + cleanup = lambda p: (p[0], unquote(p[1])) + params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p) + path = path[len(req.base_url()):] + return path, params else: self.fail('expected a Redirect exception') + def expect_redirect_publish(self, req, path='view'): + """call the publish method of the application publisher, expecting to + get a Redirect exception + """ + return self.expect_redirect(lambda x: self.publish(x, path), req) + + def init_authentication(self, authmode, anonuser=None): + self.set_option('auth-mode', authmode) + self.set_option('anonymous-user', anonuser) + req = self.request() + origcnx = req.cnx + req.cnx = None + sh = self.app.session_handler + authm = sh.session_manager.authmanager + authm.authinforetreivers[-1].anoninfo = self.vreg.config.anonymous_user() + # not properly cleaned between tests + self.open_sessions = sh.session_manager._sessions = {} + return req, origcnx + + def assertAuthSuccess(self, req, origcnx, nbsessions=1): + sh = self.app.session_handler + path, params = self.expect_redirect(lambda x: self.app.connect(x), req) + cnx = req.cnx + self.assertEquals(len(self.open_sessions), nbsessions, self.open_sessions) + self.assertEquals(cnx.login, origcnx.login) + self.assertEquals(cnx.anonymous_connection, False) + self.assertEquals(path, 'view') + self.assertEquals(params, {'__message': 'welcome %s !' % cnx.user().login}) + + def assertAuthFailure(self, req): + self.assertRaises(AuthenticationError, self.app.connect, req) + self.assertEquals(req.cnx, None) + self.assertEquals(len(self.open_sessions), 0) + clear_cache(req, 'get_authorization') + # content validation ####################################################### # validators are used to validate (XML, DTD, whatever) view's content diff -r 9ba2e3253a88 -r 706d7bf0ae3d web/test/unittest_application.py --- a/web/test/unittest_application.py Tue Oct 13 15:59:18 2009 +0200 +++ b/web/test/unittest_application.py Tue Oct 13 15:59:47 2009 +0200 @@ -144,30 +144,6 @@ raise self.app.error_handler = raise_hdlr - def publish(self, req, path='view'): - return self.app.publish(path, req) - - def expect_redirect(self, callback, req): - try: - res = callback(req) - print res - except Redirect, ex: - try: - path, params = ex.location.split('?', 1) - except ValueError: - path = ex.location - params = {} - else: - cleanup = lambda p: (p[0], unquote(p[1])) - params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p) - path = path[len(req.base_url()):] - return path, params - else: - self.fail('expected a Redirect exception') - - def expect_redirect_publish(self, req, path='view'): - return self.expect_redirect(lambda x: self.publish(x, path), req) - def test_cnx_user_groups_sync(self): user = self.user() self.assertEquals(user.groups, set(('managers',))) @@ -201,7 +177,7 @@ # just a sample, missing some necessary information for real life '__errorurl': 'view?vid=edition...' } - path, params = self.expect_redirect_publish(req, 'edit') + path, params = self.expect_redirect(lambda x: self.app_publish(x, 'edit'), req) forminfo = req.get_session_data('view?vid=edition...') eidmap = forminfo['eidmap'] self.assertEquals(eidmap, {}) @@ -234,7 +210,7 @@ req.form = form # monkey patch edited_eid to ensure both entities are edited, not only X req.edited_eids = lambda : ('Y', 'X') - path, params = self.expect_redirect_publish(req, 'edit') + path, params = self.expect_redirect(lambda x: self.app_publish(x, 'edit'), req) forminfo = req.get_session_data('view?vid=edition...') self.assertUnorderedIterableEquals(forminfo['eidmap'].keys(), ['X', 'Y']) self.assertEquals(forminfo['errors'].entity, forminfo['eidmap']['X']) @@ -286,63 +262,37 @@ req = self.request() origcnx = req.cnx req.form['__fblogin'] = u'turlututu' - page = self.publish(req) + page = self.app_publish(req) self.failIf(req.cnx is origcnx) self.assertEquals(req.user.login, 'turlututu') self.failUnless('turlututu' in page, page) # authentication tests #################################################### - def _init_auth(self, authmode, anonuser=None): - self.set_option('auth-mode', authmode) - self.set_option('anonymous-user', anonuser) - req = self.request() - origcnx = req.cnx - req.cnx = None - sh = self.app.session_handler - # not properly cleaned between tests - self.open_sessions = sh.session_manager._sessions = {} - return req, origcnx - - def _test_auth_succeed(self, req, origcnx): - sh = self.app.session_handler - path, params = self.expect_redirect(lambda x: self.app.connect(x), req) - cnx = req.cnx - self.assertEquals(len(self.open_sessions), 1, self.open_sessions) - self.assertEquals(cnx.login, origcnx.login) - self.assertEquals(cnx.password, origcnx.password) - self.assertEquals(cnx.anonymous_connection, False) - self.assertEquals(path, 'view') - self.assertEquals(params, {'__message': 'welcome %s !' % cnx.user().login}) - - def _test_auth_fail(self, req): - self.assertRaises(AuthenticationError, self.app.connect, req) + def test_http_auth_no_anon(self): + req, origcnx = self.init_authentication('http') + self.assertAuthFailure(req) + self.assertRaises(ExplicitLogin, self.app_publish, req, 'login') self.assertEquals(req.cnx, None) - self.assertEquals(len(self.open_sessions), 0) - clear_cache(req, 'get_authorization') - - def test_http_auth_no_anon(self): - req, origcnx = self._init_auth('http') - self._test_auth_fail(req) - self.assertRaises(ExplicitLogin, self.publish, req, 'login') - self.assertEquals(req.cnx, None) - authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password)) + authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.authinfo['password'])) req._headers['Authorization'] = 'basic %s' % authstr - self._test_auth_succeed(req, origcnx) - self.assertRaises(AuthenticationError, self.publish, req, 'logout') + self.assertAuthSuccess(req, origcnx) + self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']}) + self.assertRaises(AuthenticationError, self.app_publish, req, 'logout') self.assertEquals(len(self.open_sessions), 0) def test_cookie_auth_no_anon(self): - req, origcnx = self._init_auth('cookie') - self._test_auth_fail(req) - form = self.publish(req, 'login') + req, origcnx = self.init_authentication('cookie') + self.assertAuthFailure(req) + form = self.app_publish(req, 'login') self.failUnless('__login' in form) self.failUnless('__password' in form) self.assertEquals(req.cnx, None) req.form['__login'] = origcnx.login - req.form['__password'] = origcnx.password - self._test_auth_succeed(req, origcnx) - self.assertRaises(AuthenticationError, self.publish, req, 'logout') + req.form['__password'] = origcnx.authinfo['password'] + self.assertAuthSuccess(req, origcnx) + self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']}) + self.assertRaises(AuthenticationError, self.app_publish, req, 'logout') self.assertEquals(len(self.open_sessions), 0) def test_login_by_email(self): @@ -352,28 +302,20 @@ 'WHERE U login %(login)s', {'address': address, 'login': login}) self.commit() # option allow-email-login not set - req, origcnx = self._init_auth('cookie') + req, origcnx = self.init_authentication('cookie') req.form['__login'] = address - req.form['__password'] = origcnx.password - self._test_auth_fail(req) + req.form['__password'] = origcnx.authinfo['password'] + self.assertAuthFailure(req) # option allow-email-login set origcnx.login = address self.set_option('allow-email-login', True) req.form['__login'] = address - req.form['__password'] = origcnx.password - self._test_auth_succeed(req, origcnx) - self.assertRaises(AuthenticationError, self.publish, req, 'logout') + req.form['__password'] = origcnx.authinfo['password'] + self.assertAuthSuccess(req, origcnx) + self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']}) + self.assertRaises(AuthenticationError, self.app_publish, req, 'logout') self.assertEquals(len(self.open_sessions), 0) - def _test_auth_anon(self, req): - self.app.connect(req) - acnx = req.cnx - self.assertEquals(len(self.open_sessions), 1) - self.assertEquals(acnx.login, 'anon') - self.assertEquals(acnx.password, 'anon') - self.failUnless(acnx.anonymous_connection) - self._reset_cookie(req) - def _reset_cookie(self, req): # preparing the suite of the test # set session id in cookie @@ -384,6 +326,15 @@ # reset cnx as if it was a new incoming request req.cnx = None + def _test_auth_anon(self, req): + self.app.connect(req) + acnx = req.cnx + self.assertEquals(len(self.open_sessions), 1) + self.assertEquals(acnx.login, 'anon') + self.assertEquals(acnx.authinfo['password'], 'anon') + self.failUnless(acnx.anonymous_connection) + self._reset_cookie(req) + def _test_anon_auth_fail(self, req): self.assertEquals(len(self.open_sessions), 1) self.app.connect(req) @@ -393,35 +344,36 @@ self._reset_cookie(req) def test_http_auth_anon_allowed(self): - req, origcnx = self._init_auth('http', 'anon') + req, origcnx = self.init_authentication('http', 'anon') self._test_auth_anon(req) authstr = base64.encodestring('toto:pouet') req._headers['Authorization'] = 'basic %s' % authstr self._test_anon_auth_fail(req) - authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password)) + authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.authinfo['password'])) req._headers['Authorization'] = 'basic %s' % authstr - self._test_auth_succeed(req, origcnx) - self.assertRaises(AuthenticationError, self.publish, req, 'logout') + self.assertAuthSuccess(req, origcnx) + self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']}) + self.assertRaises(AuthenticationError, self.app_publish, req, 'logout') self.assertEquals(len(self.open_sessions), 0) def test_cookie_auth_anon_allowed(self): - req, origcnx = self._init_auth('cookie', 'anon') + req, origcnx = self.init_authentication('cookie', 'anon') self._test_auth_anon(req) req.form['__login'] = 'toto' req.form['__password'] = 'pouet' self._test_anon_auth_fail(req) req.form['__login'] = origcnx.login - req.form['__password'] = origcnx.password - self._test_auth_succeed(req, origcnx) - self.assertRaises(AuthenticationError, self.publish, req, 'logout') + req.form['__password'] = origcnx.authinfo['password'] + self.assertAuthSuccess(req, origcnx) + self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']}) + self.assertRaises(AuthenticationError, self.app_publish, req, 'logout') self.assertEquals(len(self.open_sessions), 0) def test_non_regr_optional_first_var(self): req = self.request() # expect a rset with None in [0][0] req.form['rql'] = 'rql:Any OV1, X WHERE X custom_workflow OV1?' - self.publish(req) - print 'yuea' + self.app_publish(req) if __name__ == '__main__': unittest_main()