devtools/testlib.py
changeset 5223 6abd6e3599f4
parent 5177 395e1ff018ae
child 5238 31c12863fd9d
--- a/devtools/testlib.py	Mon Apr 12 14:41:01 2010 +0200
+++ b/devtools/testlib.py	Tue Apr 13 12:19:24 2010 +0200
@@ -30,7 +30,7 @@
 
 from cubicweb import ValidationError, NoSelectableObject, AuthenticationError
 from cubicweb import cwconfig, devtools, web, server
-from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
+from cubicweb.dbapi import ProgrammingError, DBAPISession, repo_connect
 from cubicweb.sobjects import notification
 from cubicweb.web import Redirect, application
 from cubicweb.server.session import security_enabled
@@ -214,11 +214,10 @@
         cls.init_config(cls.config)
         cls.repo.hm.call_hooks('server_startup', repo=cls.repo)
         cls.vreg = cls.repo.vreg
-        cls._orig_cnx = cls.cnx
+        cls.websession = DBAPISession(cls.cnx, cls.admlogin,
+                                      {'password': cls.admpassword})
+        cls._orig_cnx = (cls.cnx, cls.websession)
         cls.config.repository = lambda x=None: cls.repo
-        # necessary for authentication tests
-        cls.cnx.login = cls.admlogin
-        cls.cnx.authinfo = {'password': cls.admpassword}
 
     @classmethod
     def _refresh_repo(cls):
@@ -241,7 +240,7 @@
     @property
     def adminsession(self):
         """return current server side session (using default manager account)"""
-        return self.repo._sessions[self._orig_cnx.sessionid]
+        return self.repo._sessions[self._orig_cnx[0].sessionid]
 
     def set_option(self, optname, value):
         self.config.global_set_option(optname, value)
@@ -291,7 +290,7 @@
         if password is None:
             password = login.encode('utf8')
         if req is None:
-            req = self._orig_cnx.request()
+            req = self._orig_cnx[0].request()
         user = req.create_entity('CWUser', login=unicode(login),
                                  upassword=password, **kwargs)
         req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
@@ -309,22 +308,21 @@
         else:
             if not kwargs:
                 kwargs['password'] = str(login)
-            self.cnx = repo_connect(self.repo, unicode(login),
-                                    cnxprops=ConnectionProperties('inmemory'),
-                                    **kwargs)
+            self.cnx = repo_connect(self.repo, unicode(login), **kwargs)
+            self.websession = DBAPISession(self.cnx)
             self._cnxs.append(self.cnx)
         if login == self.vreg.config.anonymous_user()[0]:
             self.cnx.anonymous_connection = True
         return self.cnx
 
     def restore_connection(self):
-        if not self.cnx is self._orig_cnx:
+        if not self.cnx is self._orig_cnx[0]:
             try:
                 self.cnx.close()
                 self._cnxs.remove(self.cnx)
             except ProgrammingError:
                 pass # already closed
-        self.cnx = self._orig_cnx
+        self.cnx, self.websession = self._orig_cnx
 
     # db api ##################################################################
 
@@ -486,7 +484,7 @@
     def request(self, *args, **kwargs):
         """return a web ui request"""
         req = self.requestcls(self.vreg, form=kwargs)
-        req.set_connection(self.cnx)
+        req.set_session(self.websession)
         return req
 
     def remote_call(self, fname, *args):
@@ -542,27 +540,31 @@
         self.set_option('auth-mode', authmode)
         self.set_option('anonymous-user', anonuser)
         req = self.request()
-        origcnx = req.cnx
-        req.cnx = None
+        origsession = req.session
+        req.session = req.cnx = None
+        del req.execute # get back to class implementation
         sh = self.app.session_handler
         authm = sh.session_manager.authmanager
         authm.anoninfo = self.vreg.config.anonymous_user()
+        authm.anoninfo = authm.anoninfo[0], {'password': authm.anoninfo[1]}
         # not properly cleaned between tests
         self.open_sessions = sh.session_manager._sessions = {}
-        return req, origcnx
+        return req, origsession
 
-    def assertAuthSuccess(self, req, origcnx, nbsessions=1):
+    def assertAuthSuccess(self, req, origsession, nbsessions=1):
         sh = self.app.session_handler
         path, params = self.expect_redirect(lambda x: self.app.connect(x), req)
-        cnx = req.cnx
+        session = req.session
         self.assertEquals(len(self.open_sessions), nbsessions, self.open_sessions)
-        self.assertEquals(cnx.login, origcnx.login)
-        self.assertEquals(cnx.anonymous_connection, False)
+        self.assertEquals(session.login, origsession.login)
+        self.assertEquals(session.anonymous_session, False)
         self.assertEquals(path, 'view')
-        self.assertEquals(params, {'__message': 'welcome %s !' % cnx.user().login})
+        self.assertEquals(params, {'__message': 'welcome %s !' % req.user.login})
 
     def assertAuthFailure(self, req, nbsessions=0):
-        self.assertRaises(AuthenticationError, self.app.connect, req)
+        self.app.connect(req)
+        self.assertIsInstance(req.session, DBAPISession)
+        self.assertEquals(req.session.cnx, None)
         self.assertEquals(req.cnx, None)
         self.assertEquals(len(self.open_sessions), nbsessions)
         clear_cache(req, 'get_authorization')