28 from logilab.common.decorators import cached, classproperty, clear_cache |
28 from logilab.common.decorators import cached, classproperty, clear_cache |
29 from logilab.common.deprecation import deprecated |
29 from logilab.common.deprecation import deprecated |
30 |
30 |
31 from cubicweb import ValidationError, NoSelectableObject, AuthenticationError |
31 from cubicweb import ValidationError, NoSelectableObject, AuthenticationError |
32 from cubicweb import cwconfig, devtools, web, server |
32 from cubicweb import cwconfig, devtools, web, server |
33 from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError |
33 from cubicweb.dbapi import ProgrammingError, DBAPISession, repo_connect |
34 from cubicweb.sobjects import notification |
34 from cubicweb.sobjects import notification |
35 from cubicweb.web import Redirect, application |
35 from cubicweb.web import Redirect, application |
36 from cubicweb.server.session import security_enabled |
36 from cubicweb.server.session import security_enabled |
37 from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS |
37 from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS |
38 from cubicweb.devtools import fake, htmlparser |
38 from cubicweb.devtools import fake, htmlparser |
212 def _build_repo(cls): |
212 def _build_repo(cls): |
213 cls.repo, cls.cnx = devtools.init_test_database(config=cls.config) |
213 cls.repo, cls.cnx = devtools.init_test_database(config=cls.config) |
214 cls.init_config(cls.config) |
214 cls.init_config(cls.config) |
215 cls.repo.hm.call_hooks('server_startup', repo=cls.repo) |
215 cls.repo.hm.call_hooks('server_startup', repo=cls.repo) |
216 cls.vreg = cls.repo.vreg |
216 cls.vreg = cls.repo.vreg |
217 cls._orig_cnx = cls.cnx |
217 cls.websession = DBAPISession(cls.cnx, cls.admlogin, |
|
218 {'password': cls.admpassword}) |
|
219 cls._orig_cnx = (cls.cnx, cls.websession) |
218 cls.config.repository = lambda x=None: cls.repo |
220 cls.config.repository = lambda x=None: cls.repo |
219 # necessary for authentication tests |
|
220 cls.cnx.login = cls.admlogin |
|
221 cls.cnx.authinfo = {'password': cls.admpassword} |
|
222 |
221 |
223 @classmethod |
222 @classmethod |
224 def _refresh_repo(cls): |
223 def _refresh_repo(cls): |
225 refresh_repo(cls.repo, cls.reset_schema, cls.reset_vreg) |
224 refresh_repo(cls.repo, cls.reset_schema, cls.reset_vreg) |
226 |
225 |
239 return session |
238 return session |
240 |
239 |
241 @property |
240 @property |
242 def adminsession(self): |
241 def adminsession(self): |
243 """return current server side session (using default manager account)""" |
242 """return current server side session (using default manager account)""" |
244 return self.repo._sessions[self._orig_cnx.sessionid] |
243 return self.repo._sessions[self._orig_cnx[0].sessionid] |
245 |
244 |
246 def set_option(self, optname, value): |
245 def set_option(self, optname, value): |
247 self.config.global_set_option(optname, value) |
246 self.config.global_set_option(optname, value) |
248 |
247 |
249 def set_debug(self, debugmode): |
248 def set_debug(self, debugmode): |
289 commit=True, **kwargs): |
288 commit=True, **kwargs): |
290 """create and return a new user entity""" |
289 """create and return a new user entity""" |
291 if password is None: |
290 if password is None: |
292 password = login.encode('utf8') |
291 password = login.encode('utf8') |
293 if req is None: |
292 if req is None: |
294 req = self._orig_cnx.request() |
293 req = self._orig_cnx[0].request() |
295 user = req.create_entity('CWUser', login=unicode(login), |
294 user = req.create_entity('CWUser', login=unicode(login), |
296 upassword=password, **kwargs) |
295 upassword=password, **kwargs) |
297 req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)' |
296 req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)' |
298 % ','.join(repr(g) for g in groups), |
297 % ','.join(repr(g) for g in groups), |
299 {'x': user.eid}) |
298 {'x': user.eid}) |
307 if login == self.admlogin: |
306 if login == self.admlogin: |
308 self.restore_connection() |
307 self.restore_connection() |
309 else: |
308 else: |
310 if not kwargs: |
309 if not kwargs: |
311 kwargs['password'] = str(login) |
310 kwargs['password'] = str(login) |
312 self.cnx = repo_connect(self.repo, unicode(login), |
311 self.cnx = repo_connect(self.repo, unicode(login), **kwargs) |
313 cnxprops=ConnectionProperties('inmemory'), |
312 self.websession = DBAPISession(self.cnx) |
314 **kwargs) |
|
315 self._cnxs.append(self.cnx) |
313 self._cnxs.append(self.cnx) |
316 if login == self.vreg.config.anonymous_user()[0]: |
314 if login == self.vreg.config.anonymous_user()[0]: |
317 self.cnx.anonymous_connection = True |
315 self.cnx.anonymous_connection = True |
318 return self.cnx |
316 return self.cnx |
319 |
317 |
320 def restore_connection(self): |
318 def restore_connection(self): |
321 if not self.cnx is self._orig_cnx: |
319 if not self.cnx is self._orig_cnx[0]: |
322 try: |
320 try: |
323 self.cnx.close() |
321 self.cnx.close() |
324 self._cnxs.remove(self.cnx) |
322 self._cnxs.remove(self.cnx) |
325 except ProgrammingError: |
323 except ProgrammingError: |
326 pass # already closed |
324 pass # already closed |
327 self.cnx = self._orig_cnx |
325 self.cnx, self.websession = self._orig_cnx |
328 |
326 |
329 # db api ################################################################## |
327 # db api ################################################################## |
330 |
328 |
331 @nocoverage |
329 @nocoverage |
332 def cursor(self, req=None): |
330 def cursor(self, req=None): |
484 |
482 |
485 requestcls = fake.FakeRequest |
483 requestcls = fake.FakeRequest |
486 def request(self, *args, **kwargs): |
484 def request(self, *args, **kwargs): |
487 """return a web ui request""" |
485 """return a web ui request""" |
488 req = self.requestcls(self.vreg, form=kwargs) |
486 req = self.requestcls(self.vreg, form=kwargs) |
489 req.set_connection(self.cnx) |
487 req.set_session(self.websession) |
490 return req |
488 return req |
491 |
489 |
492 def remote_call(self, fname, *args): |
490 def remote_call(self, fname, *args): |
493 """remote json call simulation""" |
491 """remote json call simulation""" |
494 dump = simplejson.dumps |
492 dump = simplejson.dumps |
540 |
538 |
541 def init_authentication(self, authmode, anonuser=None): |
539 def init_authentication(self, authmode, anonuser=None): |
542 self.set_option('auth-mode', authmode) |
540 self.set_option('auth-mode', authmode) |
543 self.set_option('anonymous-user', anonuser) |
541 self.set_option('anonymous-user', anonuser) |
544 req = self.request() |
542 req = self.request() |
545 origcnx = req.cnx |
543 origsession = req.session |
546 req.cnx = None |
544 req.session = req.cnx = None |
|
545 del req.execute # get back to class implementation |
547 sh = self.app.session_handler |
546 sh = self.app.session_handler |
548 authm = sh.session_manager.authmanager |
547 authm = sh.session_manager.authmanager |
549 authm.anoninfo = self.vreg.config.anonymous_user() |
548 authm.anoninfo = self.vreg.config.anonymous_user() |
|
549 authm.anoninfo = authm.anoninfo[0], {'password': authm.anoninfo[1]} |
550 # not properly cleaned between tests |
550 # not properly cleaned between tests |
551 self.open_sessions = sh.session_manager._sessions = {} |
551 self.open_sessions = sh.session_manager._sessions = {} |
552 return req, origcnx |
552 return req, origsession |
553 |
553 |
554 def assertAuthSuccess(self, req, origcnx, nbsessions=1): |
554 def assertAuthSuccess(self, req, origsession, nbsessions=1): |
555 sh = self.app.session_handler |
555 sh = self.app.session_handler |
556 path, params = self.expect_redirect(lambda x: self.app.connect(x), req) |
556 path, params = self.expect_redirect(lambda x: self.app.connect(x), req) |
557 cnx = req.cnx |
557 session = req.session |
558 self.assertEquals(len(self.open_sessions), nbsessions, self.open_sessions) |
558 self.assertEquals(len(self.open_sessions), nbsessions, self.open_sessions) |
559 self.assertEquals(cnx.login, origcnx.login) |
559 self.assertEquals(session.login, origsession.login) |
560 self.assertEquals(cnx.anonymous_connection, False) |
560 self.assertEquals(session.anonymous_session, False) |
561 self.assertEquals(path, 'view') |
561 self.assertEquals(path, 'view') |
562 self.assertEquals(params, {'__message': 'welcome %s !' % cnx.user().login}) |
562 self.assertEquals(params, {'__message': 'welcome %s !' % req.user.login}) |
563 |
563 |
564 def assertAuthFailure(self, req, nbsessions=0): |
564 def assertAuthFailure(self, req, nbsessions=0): |
565 self.assertRaises(AuthenticationError, self.app.connect, req) |
565 self.app.connect(req) |
|
566 self.assertIsInstance(req.session, DBAPISession) |
|
567 self.assertEquals(req.session.cnx, None) |
566 self.assertEquals(req.cnx, None) |
568 self.assertEquals(req.cnx, None) |
567 self.assertEquals(len(self.open_sessions), nbsessions) |
569 self.assertEquals(len(self.open_sessions), nbsessions) |
568 clear_cache(req, 'get_authorization') |
570 clear_cache(req, 'get_authorization') |
569 |
571 |
570 # content validation ####################################################### |
572 # content validation ####################################################### |