--- a/devtools/testlib.py Mon Apr 12 14:41:01 2010 +0200
+++ b/devtools/testlib.py Tue Apr 13 12:19:24 2010 +0200
@@ -30,7 +30,7 @@
from cubicweb import ValidationError, NoSelectableObject, AuthenticationError
from cubicweb import cwconfig, devtools, web, server
-from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
+from cubicweb.dbapi import ProgrammingError, DBAPISession, repo_connect
from cubicweb.sobjects import notification
from cubicweb.web import Redirect, application
from cubicweb.server.session import security_enabled
@@ -214,11 +214,10 @@
cls.init_config(cls.config)
cls.repo.hm.call_hooks('server_startup', repo=cls.repo)
cls.vreg = cls.repo.vreg
- cls._orig_cnx = cls.cnx
+ cls.websession = DBAPISession(cls.cnx, cls.admlogin,
+ {'password': cls.admpassword})
+ cls._orig_cnx = (cls.cnx, cls.websession)
cls.config.repository = lambda x=None: cls.repo
- # necessary for authentication tests
- cls.cnx.login = cls.admlogin
- cls.cnx.authinfo = {'password': cls.admpassword}
@classmethod
def _refresh_repo(cls):
@@ -241,7 +240,7 @@
@property
def adminsession(self):
"""return current server side session (using default manager account)"""
- return self.repo._sessions[self._orig_cnx.sessionid]
+ return self.repo._sessions[self._orig_cnx[0].sessionid]
def set_option(self, optname, value):
self.config.global_set_option(optname, value)
@@ -291,7 +290,7 @@
if password is None:
password = login.encode('utf8')
if req is None:
- req = self._orig_cnx.request()
+ req = self._orig_cnx[0].request()
user = req.create_entity('CWUser', login=unicode(login),
upassword=password, **kwargs)
req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
@@ -309,22 +308,21 @@
else:
if not kwargs:
kwargs['password'] = str(login)
- self.cnx = repo_connect(self.repo, unicode(login),
- cnxprops=ConnectionProperties('inmemory'),
- **kwargs)
+ self.cnx = repo_connect(self.repo, unicode(login), **kwargs)
+ self.websession = DBAPISession(self.cnx)
self._cnxs.append(self.cnx)
if login == self.vreg.config.anonymous_user()[0]:
self.cnx.anonymous_connection = True
return self.cnx
def restore_connection(self):
- if not self.cnx is self._orig_cnx:
+ if not self.cnx is self._orig_cnx[0]:
try:
self.cnx.close()
self._cnxs.remove(self.cnx)
except ProgrammingError:
pass # already closed
- self.cnx = self._orig_cnx
+ self.cnx, self.websession = self._orig_cnx
# db api ##################################################################
@@ -486,7 +484,7 @@
def request(self, *args, **kwargs):
"""return a web ui request"""
req = self.requestcls(self.vreg, form=kwargs)
- req.set_connection(self.cnx)
+ req.set_session(self.websession)
return req
def remote_call(self, fname, *args):
@@ -542,27 +540,31 @@
self.set_option('auth-mode', authmode)
self.set_option('anonymous-user', anonuser)
req = self.request()
- origcnx = req.cnx
- req.cnx = None
+ origsession = req.session
+ req.session = req.cnx = None
+ del req.execute # get back to class implementation
sh = self.app.session_handler
authm = sh.session_manager.authmanager
authm.anoninfo = self.vreg.config.anonymous_user()
+ authm.anoninfo = authm.anoninfo[0], {'password': authm.anoninfo[1]}
# not properly cleaned between tests
self.open_sessions = sh.session_manager._sessions = {}
- return req, origcnx
+ return req, origsession
- def assertAuthSuccess(self, req, origcnx, nbsessions=1):
+ def assertAuthSuccess(self, req, origsession, nbsessions=1):
sh = self.app.session_handler
path, params = self.expect_redirect(lambda x: self.app.connect(x), req)
- cnx = req.cnx
+ session = req.session
self.assertEquals(len(self.open_sessions), nbsessions, self.open_sessions)
- self.assertEquals(cnx.login, origcnx.login)
- self.assertEquals(cnx.anonymous_connection, False)
+ self.assertEquals(session.login, origsession.login)
+ self.assertEquals(session.anonymous_session, False)
self.assertEquals(path, 'view')
- self.assertEquals(params, {'__message': 'welcome %s !' % cnx.user().login})
+ self.assertEquals(params, {'__message': 'welcome %s !' % req.user.login})
def assertAuthFailure(self, req, nbsessions=0):
- self.assertRaises(AuthenticationError, self.app.connect, req)
+ self.app.connect(req)
+ self.assertIsInstance(req.session, DBAPISession)
+ self.assertEquals(req.session.cnx, None)
self.assertEquals(req.cnx, None)
self.assertEquals(len(self.open_sessions), nbsessions)
clear_cache(req, 'get_authorization')