diff -r e4580e5f0703 -r bad26a22fe29 server/test/unittest_ldapuser.py --- a/server/test/unittest_ldapuser.py Fri Mar 11 09:46:45 2011 +0100 +++ b/server/test/unittest_ldapuser.py Tue Dec 07 12:18:20 2010 +0100 @@ -24,10 +24,11 @@ import subprocess from socket import socket, error as socketerror -from logilab.common.testlib import TestCase, unittest_main, mock_object +from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags from cubicweb.devtools.testlib import CubicWebTC from cubicweb.devtools.repotest import RQLGeneratorTC from cubicweb.devtools.httptest import get_available_port +from cubicweb.devtools import get_test_db_handler from cubicweb.server.sources.ldapuser import * @@ -65,26 +66,10 @@ def setUpModule(*args): create_slapd_configuration(LDAPUserSourceTC.config) - global repo - try: - LDAPUserSourceTC._init_repo() - repo = LDAPUserSourceTC.repo - add_ldap_source(LDAPUserSourceTC.cnx) - except: - terminate_slapd() - raise def tearDownModule(*args): - global repo - repo.shutdown() - del repo terminate_slapd() -def add_ldap_source(cnx): - cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', - config=CONFIG) - cnx.commit() - def create_slapd_configuration(config): global slapd_process, CONFIG basedir = join(config.apphome, "ldapdb") @@ -128,10 +113,19 @@ os.kill(slapd_process.pid, signal.SIGTERM) slapd_process.wait() print "DONE" - del slapd_process class LDAPUserSourceTC(CubicWebTC): + test_db_id = 'ldap-user' + tags = CubicWebTC.tags | Tags(('ldap')) + + @classmethod + def pre_setup_database(cls, session, config): + session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', + config=CONFIG) + session.commit() + # XXX keep it there + session.execute('CWUser U') def patch_authenticate(self): self._orig_authenticate = LDAPUserSource.authenticate @@ -276,14 +270,16 @@ self.session.create_entity('CWGroup', name=u'bougloup2') self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT}) - rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') + rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, ' + 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']]) def test_exists2(self): self.create_user('comme') self.create_user('cochon') self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') - rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') + rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, ' + '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') self.assertEqual(rset.rows, [['managers'], ['users']]) def test_exists3(self): @@ -293,7 +289,8 @@ self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT}) self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT})) - rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))') + rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" ' + 'OR EXISTS(X copain T, T login in ("comme", "cochon"))') self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]]) def test_exists4(self): @@ -398,8 +395,10 @@ def test_nonregr5(self): # original jpl query: - # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 - rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login + # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, + # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 + rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, ' + 'U login "%s", P is X, X creation_date CD') % self.session.user.login self.sexecute(rql, )#{'x': }) def test_nonregr6(self): @@ -446,9 +445,20 @@ class RQL2LDAPFilterTC(RQLGeneratorTC): + tags = RQLGeneratorTC.tags | Tags(('ldap')) + + @property + def schema(self): + """return the application schema""" + return self._schema + def setUp(self): - self.schema = repo.schema - RQLGeneratorTC.setUp(self) + self.handler = get_test_db_handler(LDAPUserSourceTC.config) + self.handler.build_db_cache('ldap-user', LDAPUserSourceTC.pre_setup_database) + self.handler.restore_database('ldap-user') + self._repo = repo = self.handler.get_repo() + self._schema = repo.schema + super(RQL2LDAPFilterTC, self).setUp() ldapsource = repo.sources[-1] self.pool = repo._get_pool() session = mock_object(pool=self.pool) @@ -457,8 +467,8 @@ for ldapcls in ldapsource.user_classes) def tearDown(self): - repo._free_pool(self.pool) - RQLGeneratorTC.tearDown(self) + self._repo.turn_repo_off() + super(RQL2LDAPFilterTC, self).tearDown() def test_base(self): rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]