--- a/devtools/testlib.py Tue Oct 13 15:59:18 2009 +0200
+++ b/devtools/testlib.py Tue Oct 13 15:59:47 2009 +0200
@@ -21,13 +21,14 @@
from logilab.common.pytest import nocoverage, pause_tracing, resume_tracing
from logilab.common.debugger import Debugger
from logilab.common.umessage import message_from_string
-from logilab.common.decorators import cached, classproperty
+from logilab.common.decorators import cached, classproperty, clear_cache
from logilab.common.deprecation import deprecated
-from cubicweb import NoSelectableObject, cwconfig, devtools, web, server
+from cubicweb import NoSelectableObject, AuthenticationError
+from cubicweb import cwconfig, devtools, web, server
from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
from cubicweb.sobjects import notification
-from cubicweb.web import application
+from cubicweb.web import Redirect, application
from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS
from cubicweb.devtools import fake, htmlparser
@@ -226,7 +227,7 @@
cls.config.repository = lambda x=None: cls.repo
# necessary for authentication tests
cls.cnx.login = cls.admlogin
- cls.cnx.password = cls.admpassword
+ cls.cnx.authinfo = {'password': cls.admpassword}
@classmethod
def _refresh_repo(cls):
@@ -497,6 +498,9 @@
ctrl = self.vreg['controllers'].select('json', req)
return ctrl.publish(), req
+ def app_publish(self, req, path='view'):
+ return self.app.publish(path, req)
+
def publish(self, req):
"""call the publish method of the edit controller"""
ctrl = self.vreg['controllers'].select('edit', req)
@@ -508,23 +512,61 @@
raise
return result
- def expect_redirect_publish(self, req):
- """call the publish method of the edit controller, expecting to get a
- Redirect exception."""
+ def expect_redirect(self, callback, req):
+ """call the given callback with req as argument, expecting to get a
+ Redirect exception
+ """
try:
- self.publish(req)
- except web.Redirect, ex:
+ res = callback(req)
+ except Redirect, ex:
try:
path, params = ex.location.split('?', 1)
- except:
- path, params = ex.location, ""
- req._url = path
- cleanup = lambda p: (p[0], unquote(p[1]))
- params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
- return req.relative_path(False), params # path.rsplit('/', 1)[-1], params
+ except ValueError:
+ path = ex.location
+ params = {}
+ else:
+ cleanup = lambda p: (p[0], unquote(p[1]))
+ params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
+ path = path[len(req.base_url()):]
+ return path, params
else:
self.fail('expected a Redirect exception')
+ def expect_redirect_publish(self, req, path='view'):
+ """call the publish method of the application publisher, expecting to
+ get a Redirect exception
+ """
+ return self.expect_redirect(lambda x: self.publish(x, path), req)
+
+ def init_authentication(self, authmode, anonuser=None):
+ self.set_option('auth-mode', authmode)
+ self.set_option('anonymous-user', anonuser)
+ req = self.request()
+ origcnx = req.cnx
+ req.cnx = None
+ sh = self.app.session_handler
+ authm = sh.session_manager.authmanager
+ authm.authinforetreivers[-1].anoninfo = self.vreg.config.anonymous_user()
+ # not properly cleaned between tests
+ self.open_sessions = sh.session_manager._sessions = {}
+ return req, origcnx
+
+ def assertAuthSuccess(self, req, origcnx, nbsessions=1):
+ sh = self.app.session_handler
+ path, params = self.expect_redirect(lambda x: self.app.connect(x), req)
+ cnx = req.cnx
+ self.assertEquals(len(self.open_sessions), nbsessions, self.open_sessions)
+ self.assertEquals(cnx.login, origcnx.login)
+ self.assertEquals(cnx.anonymous_connection, False)
+ self.assertEquals(path, 'view')
+ self.assertEquals(params, {'__message': 'welcome %s !' % cnx.user().login})
+
+ def assertAuthFailure(self, req):
+ self.assertRaises(AuthenticationError, self.app.connect, req)
+ self.assertEquals(req.cnx, None)
+ self.assertEquals(len(self.open_sessions), 0)
+ clear_cache(req, 'get_authorization')
+
# content validation #######################################################
# validators are used to validate (XML, DTD, whatever) view's content
--- a/web/test/unittest_application.py Tue Oct 13 15:59:18 2009 +0200
+++ b/web/test/unittest_application.py Tue Oct 13 15:59:47 2009 +0200
@@ -144,30 +144,6 @@
raise
self.app.error_handler = raise_hdlr
- def publish(self, req, path='view'):
- return self.app.publish(path, req)
-
- def expect_redirect(self, callback, req):
- try:
- res = callback(req)
- print res
- except Redirect, ex:
- try:
- path, params = ex.location.split('?', 1)
- except ValueError:
- path = ex.location
- params = {}
- else:
- cleanup = lambda p: (p[0], unquote(p[1]))
- params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
- path = path[len(req.base_url()):]
- return path, params
- else:
- self.fail('expected a Redirect exception')
-
- def expect_redirect_publish(self, req, path='view'):
- return self.expect_redirect(lambda x: self.publish(x, path), req)
-
def test_cnx_user_groups_sync(self):
user = self.user()
self.assertEquals(user.groups, set(('managers',)))
@@ -201,7 +177,7 @@
# just a sample, missing some necessary information for real life
'__errorurl': 'view?vid=edition...'
}
- path, params = self.expect_redirect_publish(req, 'edit')
+ path, params = self.expect_redirect(lambda x: self.app_publish(x, 'edit'), req)
forminfo = req.get_session_data('view?vid=edition...')
eidmap = forminfo['eidmap']
self.assertEquals(eidmap, {})
@@ -234,7 +210,7 @@
req.form = form
# monkey patch edited_eid to ensure both entities are edited, not only X
req.edited_eids = lambda : ('Y', 'X')
- path, params = self.expect_redirect_publish(req, 'edit')
+ path, params = self.expect_redirect(lambda x: self.app_publish(x, 'edit'), req)
forminfo = req.get_session_data('view?vid=edition...')
self.assertUnorderedIterableEquals(forminfo['eidmap'].keys(), ['X', 'Y'])
self.assertEquals(forminfo['errors'].entity, forminfo['eidmap']['X'])
@@ -286,63 +262,37 @@
req = self.request()
origcnx = req.cnx
req.form['__fblogin'] = u'turlututu'
- page = self.publish(req)
+ page = self.app_publish(req)
self.failIf(req.cnx is origcnx)
self.assertEquals(req.user.login, 'turlututu')
self.failUnless('turlututu' in page, page)
# authentication tests ####################################################
- def _init_auth(self, authmode, anonuser=None):
- self.set_option('auth-mode', authmode)
- self.set_option('anonymous-user', anonuser)
- req = self.request()
- origcnx = req.cnx
- req.cnx = None
- sh = self.app.session_handler
- # not properly cleaned between tests
- self.open_sessions = sh.session_manager._sessions = {}
- return req, origcnx
-
- def _test_auth_succeed(self, req, origcnx):
- sh = self.app.session_handler
- path, params = self.expect_redirect(lambda x: self.app.connect(x), req)
- cnx = req.cnx
- self.assertEquals(len(self.open_sessions), 1, self.open_sessions)
- self.assertEquals(cnx.login, origcnx.login)
- self.assertEquals(cnx.password, origcnx.password)
- self.assertEquals(cnx.anonymous_connection, False)
- self.assertEquals(path, 'view')
- self.assertEquals(params, {'__message': 'welcome %s !' % cnx.user().login})
-
- def _test_auth_fail(self, req):
- self.assertRaises(AuthenticationError, self.app.connect, req)
+ def test_http_auth_no_anon(self):
+ req, origcnx = self.init_authentication('http')
+ self.assertAuthFailure(req)
+ self.assertRaises(ExplicitLogin, self.app_publish, req, 'login')
self.assertEquals(req.cnx, None)
- self.assertEquals(len(self.open_sessions), 0)
- clear_cache(req, 'get_authorization')
-
- def test_http_auth_no_anon(self):
- req, origcnx = self._init_auth('http')
- self._test_auth_fail(req)
- self.assertRaises(ExplicitLogin, self.publish, req, 'login')
- self.assertEquals(req.cnx, None)
- authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password))
+ authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.authinfo['password']))
req._headers['Authorization'] = 'basic %s' % authstr
- self._test_auth_succeed(req, origcnx)
- self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+ self.assertAuthSuccess(req, origcnx)
+ self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']})
+ self.assertRaises(AuthenticationError, self.app_publish, req, 'logout')
self.assertEquals(len(self.open_sessions), 0)
def test_cookie_auth_no_anon(self):
- req, origcnx = self._init_auth('cookie')
- self._test_auth_fail(req)
- form = self.publish(req, 'login')
+ req, origcnx = self.init_authentication('cookie')
+ self.assertAuthFailure(req)
+ form = self.app_publish(req, 'login')
self.failUnless('__login' in form)
self.failUnless('__password' in form)
self.assertEquals(req.cnx, None)
req.form['__login'] = origcnx.login
- req.form['__password'] = origcnx.password
- self._test_auth_succeed(req, origcnx)
- self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+ req.form['__password'] = origcnx.authinfo['password']
+ self.assertAuthSuccess(req, origcnx)
+ self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']})
+ self.assertRaises(AuthenticationError, self.app_publish, req, 'logout')
self.assertEquals(len(self.open_sessions), 0)
def test_login_by_email(self):
@@ -352,28 +302,20 @@
'WHERE U login %(login)s', {'address': address, 'login': login})
self.commit()
# option allow-email-login not set
- req, origcnx = self._init_auth('cookie')
+ req, origcnx = self.init_authentication('cookie')
req.form['__login'] = address
- req.form['__password'] = origcnx.password
- self._test_auth_fail(req)
+ req.form['__password'] = origcnx.authinfo['password']
+ self.assertAuthFailure(req)
# option allow-email-login set
origcnx.login = address
self.set_option('allow-email-login', True)
req.form['__login'] = address
- req.form['__password'] = origcnx.password
- self._test_auth_succeed(req, origcnx)
- self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+ req.form['__password'] = origcnx.authinfo['password']
+ self.assertAuthSuccess(req, origcnx)
+ self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']})
+ self.assertRaises(AuthenticationError, self.app_publish, req, 'logout')
self.assertEquals(len(self.open_sessions), 0)
- def _test_auth_anon(self, req):
- self.app.connect(req)
- acnx = req.cnx
- self.assertEquals(len(self.open_sessions), 1)
- self.assertEquals(acnx.login, 'anon')
- self.assertEquals(acnx.password, 'anon')
- self.failUnless(acnx.anonymous_connection)
- self._reset_cookie(req)
-
def _reset_cookie(self, req):
# preparing the suite of the test
# set session id in cookie
@@ -384,6 +326,15 @@
# reset cnx as if it was a new incoming request
req.cnx = None
+ def _test_auth_anon(self, req):
+ self.app.connect(req)
+ acnx = req.cnx
+ self.assertEquals(len(self.open_sessions), 1)
+ self.assertEquals(acnx.login, 'anon')
+ self.assertEquals(acnx.authinfo['password'], 'anon')
+ self.failUnless(acnx.anonymous_connection)
+ self._reset_cookie(req)
+
def _test_anon_auth_fail(self, req):
self.assertEquals(len(self.open_sessions), 1)
self.app.connect(req)
@@ -393,35 +344,36 @@
self._reset_cookie(req)
def test_http_auth_anon_allowed(self):
- req, origcnx = self._init_auth('http', 'anon')
+ req, origcnx = self.init_authentication('http', 'anon')
self._test_auth_anon(req)
authstr = base64.encodestring('toto:pouet')
req._headers['Authorization'] = 'basic %s' % authstr
self._test_anon_auth_fail(req)
- authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password))
+ authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.authinfo['password']))
req._headers['Authorization'] = 'basic %s' % authstr
- self._test_auth_succeed(req, origcnx)
- self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+ self.assertAuthSuccess(req, origcnx)
+ self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']})
+ self.assertRaises(AuthenticationError, self.app_publish, req, 'logout')
self.assertEquals(len(self.open_sessions), 0)
def test_cookie_auth_anon_allowed(self):
- req, origcnx = self._init_auth('cookie', 'anon')
+ req, origcnx = self.init_authentication('cookie', 'anon')
self._test_auth_anon(req)
req.form['__login'] = 'toto'
req.form['__password'] = 'pouet'
self._test_anon_auth_fail(req)
req.form['__login'] = origcnx.login
- req.form['__password'] = origcnx.password
- self._test_auth_succeed(req, origcnx)
- self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+ req.form['__password'] = origcnx.authinfo['password']
+ self.assertAuthSuccess(req, origcnx)
+ self.assertEquals(req.cnx.authinfo, {'password': origcnx.authinfo['password']})
+ self.assertRaises(AuthenticationError, self.app_publish, req, 'logout')
self.assertEquals(len(self.open_sessions), 0)
def test_non_regr_optional_first_var(self):
req = self.request()
# expect a rset with None in [0][0]
req.form['rql'] = 'rql:Any OV1, X WHERE X custom_workflow OV1?'
- self.publish(req)
- print 'yuea'
+ self.app_publish(req)
if __name__ == '__main__':
unittest_main()