155 |
155 |
156 cwconfig.SMTP = MockSMTP |
156 cwconfig.SMTP = MockSMTP |
157 |
157 |
158 |
158 |
159 class TestCaseConnectionProxy(object): |
159 class TestCaseConnectionProxy(object): |
160 """thin wrapper around `cubicweb.repoapi.ClientConnection` context-manager |
160 """thin wrapper around `cubicweb.repoapi.Connection` context-manager |
161 used in CubicWebTC (cf. `cubicweb.devtools.testlib.CubicWebTC.login` method) |
161 used in CubicWebTC (cf. `cubicweb.devtools.testlib.CubicWebTC.login` method) |
162 |
162 |
163 It just proxies to the default connection context manager but |
163 It just proxies to the default connection context manager but |
164 restores the original connection on exit. |
164 restores the original connection on exit. |
165 """ |
165 """ |
297 # anonymous is logged by default in cubicweb test cases |
297 # anonymous is logged by default in cubicweb test cases |
298 anonymous_allowed = True |
298 anonymous_allowed = True |
299 |
299 |
300 def __init__(self, *args, **kwargs): |
300 def __init__(self, *args, **kwargs): |
301 self._admin_session = None |
301 self._admin_session = None |
302 self._admin_clt_cnx = None |
302 self._admin_cnx = None |
303 self._current_session = None |
303 self._current_session = None |
304 self._current_clt_cnx = None |
304 self._current_cnx = None |
305 self.repo = None |
305 self.repo = None |
306 self._open_access = set() |
306 self._open_access = set() |
307 super(CubicWebTC, self).__init__(*args, **kwargs) |
307 super(CubicWebTC, self).__init__(*args, **kwargs) |
308 |
308 |
309 # repository connection handling ########################################### |
309 # repository connection handling ########################################### |
325 continue # already closed |
325 continue # already closed |
326 |
326 |
327 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
327 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
328 def set_cnx(self, cnx): |
328 def set_cnx(self, cnx): |
329 assert getattr(cnx, '_session', None) is not None |
329 assert getattr(cnx, '_session', None) is not None |
330 if cnx is self._admin_clt_cnx: |
330 if cnx is self._admin_cnx: |
331 self._pop_custom_cnx() |
331 self._pop_custom_cnx() |
332 else: |
332 else: |
333 self._cnxs.add(cnx) # register the cnx to make sure it is removed |
333 self._cnxs.add(cnx) # register the cnx to make sure it is removed |
334 self._current_session = cnx.session |
334 self._current_session = cnx.session |
335 self._current_clt_cnx = cnx |
335 self._current_cnx = cnx |
336 |
336 |
337 @property |
337 @property |
338 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
338 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
339 def cnx(self): |
339 def cnx(self): |
340 # XXX we want to deprecate this |
340 # XXX we want to deprecate this |
341 clt_cnx = self._current_clt_cnx |
341 cnx = self._current_cnx |
342 if clt_cnx is None: |
342 if cnx is None: |
343 clt_cnx = self._admin_clt_cnx |
343 cnx = self._admin_cnx |
344 return clt_cnx |
344 return cnx |
345 |
345 |
346 def _close_cnx(self): |
346 def _close_cnx(self): |
347 """ensure that all cnx used by a test have been closed""" |
347 """ensure that all cnx used by a test have been closed""" |
348 for cnx in list(self._cnxs): |
348 for cnx in list(self._cnxs): |
349 cnx.rollback() |
349 cnx.rollback() |
378 self.restore_connection() |
378 self.restore_connection() |
379 return self.cnx |
379 return self.cnx |
380 else: |
380 else: |
381 if not kwargs: |
381 if not kwargs: |
382 kwargs['password'] = str(login) |
382 kwargs['password'] = str(login) |
383 clt_cnx = repoapi.connect(self.repo, login, **kwargs) |
383 cnx = repoapi.connect(self.repo, login, **kwargs) |
384 self.set_cnx(clt_cnx) |
384 self.set_cnx(cnx) |
385 clt_cnx.__enter__() |
385 cnx.__enter__() |
386 return TestCaseConnectionProxy(self, clt_cnx) |
386 return TestCaseConnectionProxy(self, cnx) |
387 |
387 |
388 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
388 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
389 def restore_connection(self): |
389 def restore_connection(self): |
390 self._pop_custom_cnx() |
390 self._pop_custom_cnx() |
391 |
391 |
392 def _pop_custom_cnx(self): |
392 def _pop_custom_cnx(self): |
393 if self._current_clt_cnx is not None: |
393 if self._current_cnx is not None: |
394 if self._current_clt_cnx._open: |
394 if self._current_cnx._open: |
395 self._current_clt_cnx.close() |
395 self._current_cnx.close() |
396 if not self._current_session.closed: |
396 if not self._current_session.closed: |
397 self.repo.close(self._current_session.sessionid) |
397 self.repo.close(self._current_session.sessionid) |
398 self._current_clt_cnx = None |
398 self._current_cnx = None |
399 self._current_session = None |
399 self._current_session = None |
400 |
400 |
401 #XXX this doesn't need to be a classmethod anymore |
401 #XXX this doesn't need to be a classmethod anymore |
402 def _init_repo(self): |
402 def _init_repo(self): |
403 """init the repository and connection to it. |
403 """init the repository and connection to it. |
409 self.repo = db_handler.get_repo(startup=True) |
409 self.repo = db_handler.get_repo(startup=True) |
410 # get an admin session (without actual login) |
410 # get an admin session (without actual login) |
411 login = unicode(db_handler.config.default_admin_config['login']) |
411 login = unicode(db_handler.config.default_admin_config['login']) |
412 self.admin_access = self.new_access(login) |
412 self.admin_access = self.new_access(login) |
413 self._admin_session = self.admin_access._session |
413 self._admin_session = self.admin_access._session |
414 self._admin_clt_cnx = repoapi.Connection(self._admin_session) |
414 self._admin_cnx = repoapi.Connection(self._admin_session) |
415 self._cnxs.add(self._admin_clt_cnx) |
415 self._cnxs.add(self._admin_cnx) |
416 self._admin_clt_cnx.__enter__() |
416 self._admin_cnx.__enter__() |
417 self.config.repository = lambda x=None: self.repo |
417 self.config.repository = lambda x=None: self.repo |
418 |
418 |
419 |
419 |
420 # config management ######################################################## |
420 # config management ######################################################## |
421 |
421 |
515 except Exception as ex: |
515 except Exception as ex: |
516 self.__class__._repo_init_failed = ex |
516 self.__class__._repo_init_failed = ex |
517 raise |
517 raise |
518 self.addCleanup(self._close_access) |
518 self.addCleanup(self._close_access) |
519 self.setup_database() |
519 self.setup_database() |
520 self._admin_clt_cnx.commit() |
520 self._admin_cnx.commit() |
521 MAILBOX[:] = [] # reset mailbox |
521 MAILBOX[:] = [] # reset mailbox |
522 |
522 |
523 def tearDown(self): |
523 def tearDown(self): |
524 # XXX hack until logilab.common.testlib is fixed |
524 # XXX hack until logilab.common.testlib is fixed |
525 if self._admin_clt_cnx is not None: |
525 if self._admin_cnx is not None: |
526 self._admin_clt_cnx = None |
526 self._admin_cnx = None |
527 if self._admin_session is not None: |
527 if self._admin_session is not None: |
528 self.repo.close(self._admin_session.sessionid) |
528 self.repo.close(self._admin_session.sessionid) |
529 self._admin_session = None |
529 self._admin_session = None |
530 while self._cleanups: |
530 while self._cleanups: |
531 cleanup, args, kwargs = self._cleanups.pop(-1) |
531 cleanup, args, kwargs = self._cleanups.pop(-1) |
571 if not isinstance(groups, (tuple, list)): |
571 if not isinstance(groups, (tuple, list)): |
572 password = groups |
572 password = groups |
573 groups = login |
573 groups = login |
574 elif isinstance(login, tuple): |
574 elif isinstance(login, tuple): |
575 groups = login |
575 groups = login |
576 login = req |
576 req = self._admin_cnx |
577 assert not isinstance(self, type) |
|
578 req = self._admin_clt_cnx |
|
579 if login is not None: |
577 if login is not None: |
580 login = unicode(login) |
578 login = unicode(login) |
581 if password is None: |
579 if password is None: |
582 password = login |
580 password = login |
583 user = req.create_entity('CWUser', login=login, |
581 user = req.create_entity('CWUser', login=login, |
967 return req, self.session |
965 return req, self.session |
968 |
966 |
969 def assertAuthSuccess(self, req, origsession, nbsessions=1): |
967 def assertAuthSuccess(self, req, origsession, nbsessions=1): |
970 sh = self.app.session_handler |
968 sh = self.app.session_handler |
971 session = self.app.get_session(req) |
969 session = self.app.get_session(req) |
972 clt_cnx = repoapi.Connection(session) |
970 cnx = repoapi.Connection(session) |
973 req.set_cnx(clt_cnx) |
971 req.set_cnx(cnx) |
974 self.assertEqual(len(self.open_sessions), nbsessions, self.open_sessions) |
972 self.assertEqual(len(self.open_sessions), nbsessions, self.open_sessions) |
975 self.assertEqual(session.login, origsession.login) |
973 self.assertEqual(session.login, origsession.login) |
976 self.assertEqual(session.anonymous_session, False) |
974 self.assertEqual(session.anonymous_session, False) |
977 |
975 |
978 def assertAuthFailure(self, req, nbsessions=0): |
976 def assertAuthFailure(self, req, nbsessions=0): |