factor out code reusable for authentication tests
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Tue, 13 Oct 2009 15:59:47 +0200
changeset 3657 706d7bf0ae3d
parent 3656 9ba2e3253a88
child 3658 d8f2ec7e91fa
factor out code reusable for authentication tests
devtools/testlib.py
web/test/unittest_application.py
--- a/devtools/testlib.py	Tue Oct 13 15:59:18 2009 +0200
+++ b/devtools/testlib.py	Tue Oct 13 15:59:47 2009 +0200
@@ -21,13 +21,14 @@
 from logilab.common.pytest import nocoverage, pause_tracing, resume_tracing
 from logilab.common.debugger import Debugger
 from logilab.common.umessage import message_from_string
-from logilab.common.decorators import cached, classproperty
+from logilab.common.decorators import cached, classproperty, clear_cache
 from logilab.common.deprecation import deprecated
 
-from cubicweb import NoSelectableObject, cwconfig, devtools, web, server
+from cubicweb import NoSelectableObject, AuthenticationError
+from cubicweb import cwconfig, devtools, web, server
 from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
 from cubicweb.sobjects import notification
-from cubicweb.web import application
+from cubicweb.web import Redirect, application
 from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS
 from cubicweb.devtools import fake, htmlparser
 
@@ -226,7 +227,7 @@
         cls.config.repository = lambda x=None: cls.repo
         # necessary for authentication tests
         cls.cnx.login = cls.admlogin
-        cls.cnx.password = cls.admpassword
+        cls.cnx.authinfo = {'password': cls.admpassword}
 
     @classmethod
     def _refresh_repo(cls):
@@ -497,6 +498,9 @@
         ctrl = self.vreg['controllers'].select('json', req)
         return ctrl.publish(), req
 
+    def app_publish(self, req, path='view'):
+        return self.app.publish(path, req)
+
     def publish(self, req):
         """call the publish method of the edit controller"""
         ctrl = self.vreg['controllers'].select('edit', req)
@@ -508,23 +512,61 @@
             raise
         return result
 
-    def expect_redirect_publish(self, req):
-        """call the publish method of the edit controller, expecting to get a
-        Redirect exception."""
+    def expect_redirect(self, callback, req):
+        """call the given callback with req as argument, expecting to get a
+        Redirect exception
+        """
         try:
-            self.publish(req)
-        except web.Redirect, ex:
+            res = callback(req)
+        except Redirect, ex:
             try:
                 path, params = ex.location.split('?', 1)
-            except:
-                path, params = ex.location, ""
-            req._url = path
-            cleanup = lambda p: (p[0], unquote(p[1]))
-            params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
-            return req.relative_path(False), params # path.rsplit('/', 1)[-1], params
+            except ValueError:
+                path = ex.location
+                params = {}
+            else:
+                cleanup = lambda p: (p[0], unquote(p[1]))
+                params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
+            path = path[len(req.base_url()):]
+            return path, params
         else:
             self.fail('expected a Redirect exception')
 
+    def expect_redirect_publish(self, req, path='view'):
+        """call the publish method of the application publisher, expecting to
+        get a Redirect exception
+        """
+        return self.expect_redirect(lambda x: self.publish(x, path), req)
+
+    def init_authentication(self, authmode, anonuser=None):
+        self.set_option('auth-mode', authmode)
+        self.set_option('anonymous-user', anonuser)
+        req = self.request()
+        origcnx = req.cnx
+        req.cnx = None
+        sh = self.app.session_handler
+        authm = sh.session_manager.authmanager
+        authm.authinforetreivers[-1].anoninfo = self.vreg.config.anonymous_user()
+        # not properly cleaned between tests
+        self.open_sessions = sh.session_manager._sessions = {}
+        return req, origcnx
+
+    def assertAuthSuccess(self, req, origcnx, nbsessions=1):
+        sh = self.app.session_handler
+        path, params = self.expect_redirect(lambda x: self.app.connect(x), req)
+        cnx = req.cnx
+        self.assertEquals(len(self.open_sessions), nbsessions, self.open_sessions)
+        self.assertEquals(cnx.login, origcnx.login)
+        self.assertEquals(cnx.anonymous_connection, False)
+        self.assertEquals(path, 'view')
+        self.assertEquals(params, {'__message': 'welcome %s !' % cnx.user().login})
+
+    def assertAuthFailure(self, req):
+        self.assertRaises(AuthenticationError, self.app.connect, req)
+        self.assertEquals(req.cnx, None)
+        self.assertEquals(len(self.open_sessions), 0)
+        clear_cache(req, 'get_authorization')
+
     # content validation #######################################################
 
     # validators are used to validate (XML, DTD, whatever) view's content
--- a/web/test/unittest_application.py	Tue Oct 13 15:59:18 2009 +0200
+++ b/web/test/unittest_application.py	Tue Oct 13 15:59:47 2009 +0200
@@ -144,30 +144,6 @@
             raise
         self.app.error_handler = raise_hdlr
 
-    def publish(self, req, path='view'):
-        return self.app.publish(path, req)
-
-    def expect_redirect(self, callback, req):
-        try:
-            res = callback(req)
-            print res
-        except Redirect, ex:
-            try:
-                path, params = ex.location.split('?', 1)
-            except ValueError:
-                path = ex.location
-                params = {}
-            else:
-                cleanup = lambda p: (p[0], unquote(p[1]))
-                params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
-            path = path[len(req.base_url()):]
-            return path, params
-        else:
-            self.fail('expected a Redirect exception')
-
-    def expect_redirect_publish(self, req, path='view'):
-        return self.expect_redirect(lambda x: self.publish(x, path), req)
-
     def test_cnx_user_groups_sync(self):
         user = self.user()
         self.assertEquals(user.groups, set(('managers',)))
@@ -201,7 +177,7 @@
              # just a sample, missing some necessary information for real life
             '__errorurl': 'view?vid=edition...'
             }
-        path, params = self.expect_redirect_publish(req, 'edit')
+        path, params = self.expect_redirect(lambda x: self.app_publish(x, 'edit'), req)
         forminfo = req.get_session_data('view?vid=edition...')
         eidmap = forminfo['eidmap']
         self.assertEquals(eidmap, {})
@@ -234,7 +210,7 @@
         req.form = form
         # monkey patch edited_eid to ensure both entities are edited, not only X
         req.edited_eids = lambda : ('Y', 'X')
-        path, params = self.expect_redirect_publish(req, 'edit')
+        path, params = self.expect_redirect(lambda x: self.app_publish(x, 'edit'), req)
         forminfo = req.get_session_data('view?vid=edition...')
         self.assertUnorderedIterableEquals(forminfo['eidmap'].keys(), ['X', 'Y'])
         self.assertEquals(forminfo['errors'].entity, forminfo['eidmap']['X'])
@@ -286,63 +262,37 @@
         req = self.request()
         origcnx = req.cnx
         req.form['__fblogin'] = u'turlututu'
-        page = self.publish(req)
+        page = self.app_publish(req)
         self.failIf(req.cnx is origcnx)
         self.assertEquals(req.user.login, 'turlututu')
         self.failUnless('turlututu' in page, page)
 
     # authentication tests ####################################################
 
-    def _init_auth(self, authmode, anonuser=None):
-        self.set_option('auth-mode', authmode)
-        self.set_option('anonymous-user', anonuser)
-        req = self.request()
-        origcnx = req.cnx
-        req.cnx = None
-        sh = self.app.session_handler
-        # not properly cleaned between tests
-        self.open_sessions = sh.session_manager._sessions = {}
-        return req, origcnx
-
-    def _test_auth_succeed(self, req, origcnx):
-        sh = self.app.session_handler
-        path, params = self.expect_redirect(lambda x: self.app.connect(x), req)
-        cnx = req.cnx
-        self.assertEquals(len(self.open_sessions), 1, self.open_sessions)
-        self.assertEquals(cnx.login, origcnx.login)
-        self.assertEquals(cnx.password, origcnx.password)
-        self.assertEquals(cnx.anonymous_connection, False)
-        self.assertEquals(path, 'view')
-        self.assertEquals(params, {'__message': 'welcome %s !' % cnx.user().login})
-
-    def _test_auth_fail(self, req):
-        self.assertRaises(AuthenticationError, self.app.connect, req)
+    def test_http_auth_no_anon(self):
+        req, origcnx = self.init_authentication('http')
+        self.assertAuthFailure(req)
+        self.assertRaises(ExplicitLogin, self.app_publish, req, 'login')
         self.assertEquals(req.cnx, None)
-        self.assertEquals(len(self.open_sessions), 0)
-        clear_cache(req, 'get_authorization')
-
-    def test_http_auth_no_anon(self):
-        req, origcnx = self._init_auth('http')
-        self._test_auth_fail(req)
-        self.assertRaises(ExplicitLogin, self.publish, req, 'login')
-        self.assertEquals(req.cnx, None)
-        authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password))
+        authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.authinfo['password']))
         req._headers['Authorization'] = 'basic %s' % authstr
-        self._test_auth_succeed(req, origcnx)
-        self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+        self.assertAuthSuccess(req, origcnx)
+        self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']})
+        self.assertRaises(AuthenticationError, self.app_publish, req, 'logout')
         self.assertEquals(len(self.open_sessions), 0)
 
     def test_cookie_auth_no_anon(self):
-        req, origcnx = self._init_auth('cookie')
-        self._test_auth_fail(req)
-        form = self.publish(req, 'login')
+        req, origcnx = self.init_authentication('cookie')
+        self.assertAuthFailure(req)
+        form = self.app_publish(req, 'login')
         self.failUnless('__login' in form)
         self.failUnless('__password' in form)
         self.assertEquals(req.cnx, None)
         req.form['__login'] = origcnx.login
-        req.form['__password'] = origcnx.password
-        self._test_auth_succeed(req, origcnx)
-        self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+        req.form['__password'] = origcnx.authinfo['password']
+        self.assertAuthSuccess(req, origcnx)
+        self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']})
+        self.assertRaises(AuthenticationError, self.app_publish, req, 'logout')
         self.assertEquals(len(self.open_sessions), 0)
 
     def test_login_by_email(self):
@@ -352,28 +302,20 @@
                      'WHERE U login %(login)s', {'address': address, 'login': login})
         self.commit()
         # option allow-email-login not set
-        req, origcnx = self._init_auth('cookie')
+        req, origcnx = self.init_authentication('cookie')
         req.form['__login'] = address
-        req.form['__password'] = origcnx.password
-        self._test_auth_fail(req)
+        req.form['__password'] = origcnx.authinfo['password']
+        self.assertAuthFailure(req)
         # option allow-email-login set
         origcnx.login = address
         self.set_option('allow-email-login', True)
         req.form['__login'] = address
-        req.form['__password'] = origcnx.password
-        self._test_auth_succeed(req, origcnx)
-        self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+        req.form['__password'] = origcnx.authinfo['password']
+        self.assertAuthSuccess(req, origcnx)
+        self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']})
+        self.assertRaises(AuthenticationError, self.app_publish, req, 'logout')
         self.assertEquals(len(self.open_sessions), 0)
 
-    def _test_auth_anon(self, req):
-        self.app.connect(req)
-        acnx = req.cnx
-        self.assertEquals(len(self.open_sessions), 1)
-        self.assertEquals(acnx.login, 'anon')
-        self.assertEquals(acnx.password, 'anon')
-        self.failUnless(acnx.anonymous_connection)
-        self._reset_cookie(req)
-
     def _reset_cookie(self, req):
         # preparing the suite of the test
         # set session id in cookie
@@ -384,6 +326,15 @@
         # reset cnx as if it was a new incoming request
         req.cnx = None
 
+    def _test_auth_anon(self, req):
+        self.app.connect(req)
+        acnx = req.cnx
+        self.assertEquals(len(self.open_sessions), 1)
+        self.assertEquals(acnx.login, 'anon')
+        self.assertEquals(acnx.authinfo['password'], 'anon')
+        self.failUnless(acnx.anonymous_connection)
+        self._reset_cookie(req)
+
     def _test_anon_auth_fail(self, req):
         self.assertEquals(len(self.open_sessions), 1)
         self.app.connect(req)
@@ -393,35 +344,36 @@
         self._reset_cookie(req)
 
     def test_http_auth_anon_allowed(self):
-        req, origcnx = self._init_auth('http', 'anon')
+        req, origcnx = self.init_authentication('http', 'anon')
         self._test_auth_anon(req)
         authstr = base64.encodestring('toto:pouet')
         req._headers['Authorization'] = 'basic %s' % authstr
         self._test_anon_auth_fail(req)
-        authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password))
+        authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.authinfo['password']))
         req._headers['Authorization'] = 'basic %s' % authstr
-        self._test_auth_succeed(req, origcnx)
-        self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+        self.assertAuthSuccess(req, origcnx)
+        self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']})
+        self.assertRaises(AuthenticationError, self.app_publish, req, 'logout')
         self.assertEquals(len(self.open_sessions), 0)
 
     def test_cookie_auth_anon_allowed(self):
-        req, origcnx = self._init_auth('cookie', 'anon')
+        req, origcnx = self.init_authentication('cookie', 'anon')
         self._test_auth_anon(req)
         req.form['__login'] = 'toto'
         req.form['__password'] = 'pouet'
         self._test_anon_auth_fail(req)
         req.form['__login'] = origcnx.login
-        req.form['__password'] = origcnx.password
-        self._test_auth_succeed(req, origcnx)
-        self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+        req.form['__password'] = origcnx.authinfo['password']
+        self.assertAuthSuccess(req, origcnx)
+        self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']})
+        self.assertRaises(AuthenticationError, self.app_publish, req, 'logout')
         self.assertEquals(len(self.open_sessions), 0)
 
     def test_non_regr_optional_first_var(self):
         req = self.request()
         # expect a rset with None in [0][0]
         req.form['rql'] = 'rql:Any OV1, X WHERE X custom_workflow OV1?'
-        self.publish(req)
-        print 'yuea'
+        self.app_publish(req)
 
 if __name__ == '__main__':
     unittest_main()