server/test/unittest_ldapuser.py
changeset 8430 5bee87a14bb1
parent 8387 b59af20a868d
child 8434 39c5bb4dcc59
--- a/server/test/unittest_ldapuser.py	Fri May 25 17:18:00 2012 +0200
+++ b/server/test/unittest_ldapuser.py	Thu May 31 15:56:21 2012 +0200
@@ -37,12 +37,6 @@
 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test'
 URL = None
 
-def setUpModule(*args):
-    create_slapd_configuration(LDAPUserSourceTC.config)
-
-def tearDownModule(*args):
-    terminate_slapd()
-
 def create_slapd_configuration(config):
     global slapd_process, URL
     basedir = join(config.apphome, "ldapdb")
@@ -51,47 +45,89 @@
     confstream = file(slapdconf, 'w')
     confstream.write(confin % {'apphome': config.apphome})
     confstream.close()
-    if not exists(basedir):
-        os.makedirs(basedir)
-        # fill ldap server with some data
-        ldiffile = join(config.apphome, "ldap_test.ldif")
-        print "Initing ldap database"
-        cmdline = "/usr/sbin/slapadd -f %s -l %s -c" % (slapdconf, ldiffile)
-        subprocess.call(cmdline, shell=True)
-
+    if exists(basedir):
+        shutil.rmtree(basedir)
+    os.makedirs(basedir)
+    # fill ldap server with some data
+    ldiffile = join(config.apphome, "ldap_test.ldif")
+    config.info('Initing ldap database')
+    cmdline = "/usr/sbin/slapadd -f %s -l %s -c" % (slapdconf, ldiffile)
+    subprocess.call(cmdline, shell=True)
 
     #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
     port = get_available_port(xrange(9000, 9100))
     host = 'localhost:%s' % port
     ldapuri = 'ldap://%s' % host
     cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
-    print 'Starting slapd:', ' '.join(cmdline)
+    config.info('Starting slapd:', ' '.join(cmdline))
     slapd_process = subprocess.Popen(cmdline)
     time.sleep(0.2)
     if slapd_process.poll() is None:
-        print "slapd started with pid %s" % slapd_process.pid
+        config.info('slapd started with pid %s' % slapd_process.pid)
     else:
         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
                                (" ".join(cmdline), os.getcwd()))
     URL = u'ldap://%s' % host
 
-def terminate_slapd():
+def terminate_slapd(config):
     global slapd_process
     if slapd_process.returncode is None:
-        print "terminating slapd"
+        config.info('terminating slapd')
         if hasattr(slapd_process, 'terminate'):
             slapd_process.terminate()
         else:
             import os, signal
             os.kill(slapd_process.pid, signal.SIGTERM)
         slapd_process.wait()
-        print "DONE"
+        config.info('DONE')
     del slapd_process
 
 
+class LDAPTestBase(CubicWebTC):
+    loglevel = 'ERROR'
+
+    @classmethod
+    def setUpClass(cls):
+        from cubicweb.cwctl import init_cmdline_log_threshold
+        init_cmdline_log_threshold(cls.config, cls.loglevel)
+        create_slapd_configuration(cls.config)
+
+    @classmethod
+    def tearDownClass(cls):
+        terminate_slapd(cls.config)
+
+class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase):
+    test_db_id = 'ldap-feed'
+
+    @classmethod
+    def pre_setup_database(cls, session, config):
+        session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
+                              url=URL, config=CONFIG)
+        session.commit()
+        isession = session.repo.internal_session(safe=True)
+        lfsource = isession.repo.sources_by_uri['ldapuser']
+        stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
+
+    def test_delete(self):
+        uri = self.repo.sources_by_uri['ldapuser'].urls[0]
+        from subprocess import call
+        deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' "
+                     "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri)
+        os.system(deletecmd)
+        isession = self.session.repo.internal_session(safe=False)
+        from cubicweb.server.session import security_enabled
+        with security_enabled(isession, read=False, write=False):
+            lfsource = isession.repo.sources_by_uri['ldapuser']
+            stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
+            isession.commit()
+        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
+        self.assertEqual(self.execute('Any N WHERE U login "syt", '
+                                      'U in_state S, S name N').rows[0][0],
+                         'deactivated')
 
 
-class LDAPFeedSourceTC(CubicWebTC):
+
+class LDAPFeedSourceTC(LDAPTestBase):
     test_db_id = 'ldap-feed'
 
     @classmethod
@@ -150,7 +186,9 @@
         self.assertEqual(len(rset), 1)
         e = rset.get_entity(0, 0)
         self.assertEqual(e.eid, eid)
-        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False},
+        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
+                                                             'uri': u'system',
+                                                             'use-cwuri-as-url': False},
                                                   'type': 'CWUser',
                                                   'extid': None})
         self.assertEqual(e.cw_source[0].name, 'system')