server/test/unittest_ldapuser.py
branchstable
changeset 7088 76e0dba5f8f3
parent 6991 eb2ba251f093
parent 7078 bad26a22fe29
child 7121 c2badb6de3fe
--- a/server/test/unittest_ldapuser.py	Wed Mar 16 09:45:57 2011 +0100
+++ b/server/test/unittest_ldapuser.py	Wed Mar 16 18:12:57 2011 +0100
@@ -24,10 +24,11 @@
 import subprocess
 from socket import socket, error as socketerror
 
-from logilab.common.testlib import TestCase, unittest_main, mock_object
+from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
 from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb.devtools.repotest import RQLGeneratorTC
 from cubicweb.devtools.httptest import get_available_port
+from cubicweb.devtools import get_test_db_handler
 
 from cubicweb.server.sources.ldapuser import *
 
@@ -64,26 +65,10 @@
 
 def setUpModule(*args):
     create_slapd_configuration(LDAPUserSourceTC.config)
-    global repo
-    try:
-        LDAPUserSourceTC._init_repo()
-        repo = LDAPUserSourceTC.repo
-        add_ldap_source(LDAPUserSourceTC.cnx)
-    except:
-        terminate_slapd()
-        raise
 
 def tearDownModule(*args):
-    global repo
-    repo.shutdown()
-    del repo
     terminate_slapd()
 
-def add_ldap_source(cnx):
-    cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
-                                config=CONFIG)
-    cnx.commit()
-
 def create_slapd_configuration(config):
     global slapd_process, CONFIG
     basedir = join(config.apphome, "ldapdb")
@@ -127,10 +112,19 @@
             os.kill(slapd_process.pid, signal.SIGTERM)
         slapd_process.wait()
         print "DONE"
-
     del slapd_process
 
 class LDAPUserSourceTC(CubicWebTC):
+    test_db_id = 'ldap-user'
+    tags = CubicWebTC.tags | Tags(('ldap'))
+
+    @classmethod
+    def pre_setup_database(cls, session, config):
+        session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
+                                    config=CONFIG)
+        session.commit()
+        # XXX keep it there
+        session.execute('CWUser U')
 
     def patch_authenticate(self):
         self._orig_authenticate = LDAPUserSource.authenticate
@@ -275,14 +269,16 @@
         self.session.create_entity('CWGroup', name=u'bougloup2')
         self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
         self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT})
-        rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
+        rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, '
+                             'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
         self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']])
 
     def test_exists2(self):
         self.create_user('comme')
         self.create_user('cochon')
         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
-        rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
+        rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, '
+                             '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
         self.assertEqual(rset.rows, [['managers'], ['users']])
 
     def test_exists3(self):
@@ -292,7 +288,8 @@
         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})
         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT}))
-        rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))')
+        rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" '
+                             'OR EXISTS(X copain T, T login in ("comme", "cochon"))')
         self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]])
 
     def test_exists4(self):
@@ -397,8 +394,10 @@
 
     def test_nonregr5(self):
         # original jpl query:
-        # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
-        rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
+        # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser,
+        # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
+        rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, '
+               'U login "%s", P is X, X creation_date CD') % self.session.user.login
         self.sexecute(rql, )#{'x': })
 
     def test_nonregr6(self):
@@ -445,9 +444,20 @@
 
 class RQL2LDAPFilterTC(RQLGeneratorTC):
 
+    tags = RQLGeneratorTC.tags | Tags(('ldap'))
+
+    @property
+    def schema(self):
+        """return the application schema"""
+        return self._schema
+
     def setUp(self):
-        self.schema = repo.schema
-        RQLGeneratorTC.setUp(self)
+        self.handler = get_test_db_handler(LDAPUserSourceTC.config)
+        self.handler.build_db_cache('ldap-user', LDAPUserSourceTC.pre_setup_database)
+        self.handler.restore_database('ldap-user')
+        self._repo = repo = self.handler.get_repo()
+        self._schema = repo.schema
+        super(RQL2LDAPFilterTC, self).setUp()
         ldapsource = repo.sources[-1]
         self.pool = repo._get_pool()
         session = mock_object(pool=self.pool)
@@ -455,8 +465,8 @@
         self.ldapclasses = ''.join(ldapsource.base_filters)
 
     def tearDown(self):
-        repo._free_pool(self.pool)
-        RQLGeneratorTC.tearDown(self)
+        self._repo.turn_repo_off()
+        super(RQL2LDAPFilterTC, self).tearDown()
 
     def test_base(self):
         rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]