154 MAILBOX.append(Email(fromaddr, recipients, msg)) |
154 MAILBOX.append(Email(fromaddr, recipients, msg)) |
155 |
155 |
156 cwconfig.SMTP = MockSMTP |
156 cwconfig.SMTP = MockSMTP |
157 |
157 |
158 |
158 |
159 class TestCaseConnectionProxy(object): |
|
160 """thin wrapper around `cubicweb.repoapi.Connection` context-manager |
|
161 used in CubicWebTC (cf. `cubicweb.devtools.testlib.CubicWebTC.login` method) |
|
162 |
|
163 It just proxies to the default connection context manager but |
|
164 restores the original connection on exit. |
|
165 """ |
|
166 def __init__(self, testcase, cnx): |
|
167 self.testcase = testcase |
|
168 self.cnx = cnx |
|
169 |
|
170 def __getattr__(self, attrname): |
|
171 return getattr(self.cnx, attrname) |
|
172 |
|
173 def __enter__(self): |
|
174 # already open |
|
175 return self.cnx |
|
176 |
|
177 def __exit__(self, exctype, exc, tb): |
|
178 try: |
|
179 return self.cnx.__exit__(exctype, exc, tb) |
|
180 finally: |
|
181 self.testcase.restore_connection() |
|
182 |
|
183 # Repoaccess utility ###############################################3########### |
159 # Repoaccess utility ###############################################3########### |
184 |
160 |
185 class RepoAccess(object): |
161 class RepoAccess(object): |
186 """An helper to easily create object to access the repo as a specific user |
162 """An helper to easily create object to access the repo as a specific user |
187 |
163 |
289 appid = 'data' |
265 appid = 'data' |
290 configcls = devtools.ApptestConfiguration |
266 configcls = devtools.ApptestConfiguration |
291 requestcls = fake.FakeRequest |
267 requestcls = fake.FakeRequest |
292 tags = TestCase.tags | Tags('cubicweb', 'cw_repo') |
268 tags = TestCase.tags | Tags('cubicweb', 'cw_repo') |
293 test_db_id = DEFAULT_EMPTY_DB_ID |
269 test_db_id = DEFAULT_EMPTY_DB_ID |
294 _cnxs = set() # establised connection |
|
295 # stay on connection for leak detection purpose |
|
296 |
270 |
297 # anonymous is logged by default in cubicweb test cases |
271 # anonymous is logged by default in cubicweb test cases |
298 anonymous_allowed = True |
272 anonymous_allowed = True |
299 |
273 |
300 def __init__(self, *args, **kwargs): |
274 def __init__(self, *args, **kwargs): |
301 self._admin_session = None |
275 self._admin_session = None |
302 self._admin_cnx = None |
|
303 self._current_session = None |
|
304 self._current_cnx = None |
|
305 self.repo = None |
276 self.repo = None |
306 self._open_access = set() |
277 self._open_access = set() |
307 super(CubicWebTC, self).__init__(*args, **kwargs) |
278 super(CubicWebTC, self).__init__(*args, **kwargs) |
308 |
279 |
309 # repository connection handling ########################################### |
280 # repository connection handling ########################################### |
322 try: |
293 try: |
323 self._open_access.pop().close() |
294 self._open_access.pop().close() |
324 except BadConnectionId: |
295 except BadConnectionId: |
325 continue # already closed |
296 continue # already closed |
326 |
297 |
327 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
|
328 def set_cnx(self, cnx): |
|
329 assert getattr(cnx, '_session', None) is not None |
|
330 if cnx is self._admin_cnx: |
|
331 self._pop_custom_cnx() |
|
332 else: |
|
333 self._cnxs.add(cnx) # register the cnx to make sure it is removed |
|
334 self._current_session = cnx.session |
|
335 self._current_cnx = cnx |
|
336 |
|
337 @property |
298 @property |
338 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
|
339 def cnx(self): |
|
340 # XXX we want to deprecate this |
|
341 cnx = self._current_cnx |
|
342 if cnx is None: |
|
343 cnx = self._admin_cnx |
|
344 return cnx |
|
345 |
|
346 def _close_cnx(self): |
|
347 """ensure that all cnx used by a test have been closed""" |
|
348 for cnx in list(self._cnxs): |
|
349 cnx.rollback() |
|
350 self._cnxs.remove(cnx) |
|
351 |
|
352 @property |
|
353 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
|
354 def session(self): |
299 def session(self): |
355 """return current server side session""" |
300 """return admin session""" |
356 session = self._current_session |
|
357 if session is None: |
|
358 session = self._admin_session |
|
359 return session |
|
360 |
|
361 @property |
|
362 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
|
363 def websession(self): |
|
364 return self.session |
|
365 |
|
366 @property |
|
367 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
|
368 def adminsession(self): |
|
369 """return current server side session (using default manager account)""" |
|
370 return self._admin_session |
301 return self._admin_session |
371 |
|
372 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
|
373 def login(self, login, **kwargs): |
|
374 """return a connection for the given login/password""" |
|
375 __ = kwargs.pop('autoclose', True) # not used anymore |
|
376 if login == self.admlogin: |
|
377 # undo any previous login, if we're not used as a context manager |
|
378 self.restore_connection() |
|
379 return self.cnx |
|
380 else: |
|
381 if not kwargs: |
|
382 kwargs['password'] = str(login) |
|
383 cnx = repoapi.connect(self.repo, login, **kwargs) |
|
384 self.set_cnx(cnx) |
|
385 cnx.__enter__() |
|
386 return TestCaseConnectionProxy(self, cnx) |
|
387 |
|
388 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
|
389 def restore_connection(self): |
|
390 self._pop_custom_cnx() |
|
391 |
|
392 def _pop_custom_cnx(self): |
|
393 if self._current_cnx is not None: |
|
394 if self._current_cnx._open: |
|
395 self._current_cnx.close() |
|
396 if not self._current_session.closed: |
|
397 self.repo.close(self._current_session.sessionid) |
|
398 self._current_cnx = None |
|
399 self._current_session = None |
|
400 |
302 |
401 #XXX this doesn't need to a be classmethod anymore |
303 #XXX this doesn't need to a be classmethod anymore |
402 def _init_repo(self): |
304 def _init_repo(self): |
403 """init the repository and connection to it. |
305 """init the repository and connection to it. |
404 """ |
306 """ |
409 self.repo = db_handler.get_repo(startup=True) |
311 self.repo = db_handler.get_repo(startup=True) |
410 # get an admin session (without actual login) |
312 # get an admin session (without actual login) |
411 login = unicode(db_handler.config.default_admin_config['login']) |
313 login = unicode(db_handler.config.default_admin_config['login']) |
412 self.admin_access = self.new_access(login) |
314 self.admin_access = self.new_access(login) |
413 self._admin_session = self.admin_access._session |
315 self._admin_session = self.admin_access._session |
414 self._admin_cnx = repoapi.Connection(self._admin_session) |
|
415 self._cnxs.add(self._admin_cnx) |
|
416 self._admin_cnx.__enter__() |
|
417 self.config.repository = lambda x=None: self.repo |
316 self.config.repository = lambda x=None: self.repo |
418 |
317 |
419 |
318 |
420 # config management ######################################################## |
319 # config management ######################################################## |
421 |
320 |
480 @property |
379 @property |
481 def schema(self): |
380 def schema(self): |
482 """return the application schema""" |
381 """return the application schema""" |
483 return self.vreg.schema |
382 return self.vreg.schema |
484 |
383 |
485 @deprecated('[3.19] explicitly use RepoAccess object in test instead') |
|
486 def shell(self): |
|
487 """return a shell session object""" |
|
488 from cubicweb.server.migractions import ServerMigrationHelper |
|
489 return ServerMigrationHelper(None, repo=self.repo, cnx=self.cnx, |
|
490 interactive=False, |
|
491 # hack so it don't try to load fs schema |
|
492 schema=1) |
|
493 |
|
494 def set_option(self, optname, value): |
384 def set_option(self, optname, value): |
495 self.config.global_set_option(optname, value) |
385 self.config.global_set_option(optname, value) |
496 |
386 |
497 def set_debug(self, debugmode): |
387 def set_debug(self, debugmode): |
498 server.set_debug(debugmode) |
388 server.set_debug(debugmode) |
509 previous_failure = self.__class__.__dict__.get('_repo_init_failed') |
399 previous_failure = self.__class__.__dict__.get('_repo_init_failed') |
510 if previous_failure is not None: |
400 if previous_failure is not None: |
511 self.skipTest('repository is not initialised: %r' % previous_failure) |
401 self.skipTest('repository is not initialised: %r' % previous_failure) |
512 try: |
402 try: |
513 self._init_repo() |
403 self._init_repo() |
514 self.addCleanup(self._close_cnx) |
|
515 except Exception as ex: |
404 except Exception as ex: |
516 self.__class__._repo_init_failed = ex |
405 self.__class__._repo_init_failed = ex |
517 raise |
406 raise |
518 self.addCleanup(self._close_access) |
407 self.addCleanup(self._close_access) |
519 self.setup_database() |
408 self.setup_database() |
520 self._admin_cnx.commit() |
|
521 MAILBOX[:] = [] # reset mailbox |
409 MAILBOX[:] = [] # reset mailbox |
522 |
410 |
523 def tearDown(self): |
411 def tearDown(self): |
524 # XXX hack until logilab.common.testlib is fixed |
412 # XXX hack until logilab.common.testlib is fixed |
525 if self._admin_cnx is not None: |
|
526 self._admin_cnx = None |
|
527 if self._admin_session is not None: |
413 if self._admin_session is not None: |
528 self.repo.close(self._admin_session.sessionid) |
414 self.repo.close(self._admin_session.sessionid) |
529 self._admin_session = None |
415 self._admin_session = None |
530 while self._cleanups: |
416 while self._cleanups: |
531 cleanup, args, kwargs = self._cleanups.pop(-1) |
417 cleanup, args, kwargs = self._cleanups.pop(-1) |
563 |
449 |
564 @iclassmethod # XXX turn into a class method |
450 @iclassmethod # XXX turn into a class method |
565 def create_user(self, req, login=None, groups=('users',), password=None, |
451 def create_user(self, req, login=None, groups=('users',), password=None, |
566 email=None, commit=True, **kwargs): |
452 email=None, commit=True, **kwargs): |
567 """create and return a new user entity""" |
453 """create and return a new user entity""" |
568 if isinstance(req, basestring): |
454 if password is None: |
569 warn('[3.12] create_user arguments are now (req, login[, groups, password, commit, **kwargs])', |
455 password = login |
570 DeprecationWarning, stacklevel=2) |
|
571 if not isinstance(groups, (tuple, list)): |
|
572 password = groups |
|
573 groups = login |
|
574 elif isinstance(login, tuple): |
|
575 groups = login |
|
576 req = self._admin_cnx |
|
577 if login is not None: |
456 if login is not None: |
578 login = unicode(login) |
457 login = unicode(login) |
579 if password is None: |
|
580 password = login |
|
581 user = req.create_entity('CWUser', login=login, |
458 user = req.create_entity('CWUser', login=login, |
582 upassword=password, **kwargs) |
459 upassword=password, **kwargs) |
583 req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)' |
460 req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)' |
584 % ','.join(repr(str(g)) for g in groups), |
461 % ','.join(repr(str(g)) for g in groups), |
585 {'x': user.eid}) |
462 {'x': user.eid}) |