server/test/unittest_ldapuser.py
changeset 8683 d537786e52b8
parent 8668 4fea61c636b2
parent 8681 48731a0d3df8
child 8684 6c7c2a02c9a0
--- a/server/test/unittest_ldapuser.py	Fri Jan 25 13:28:23 2013 +0100
+++ b/server/test/unittest_ldapuser.py	Fri Jan 25 14:33:40 2013 +0100
@@ -23,6 +23,7 @@
 import time
 from os.path import join, exists
 import subprocess
+import tempfile
 
 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
 
@@ -39,16 +40,13 @@
 
 def create_slapd_configuration(cls):
     global URL
+    slapddir = tempfile.mkdtemp('cw-unittest-ldap')
     config = cls.config
-    basedir = join(config.apphome, "ldapdb")
     slapdconf = join(config.apphome, "slapd.conf")
     confin = file(join(config.apphome, "slapd.conf.in")).read()
     confstream = file(slapdconf, 'w')
-    confstream.write(confin % {'apphome': config.apphome})
+    confstream.write(confin % {'apphome': config.apphome, 'testdir': slapddir})
     confstream.close()
-    if exists(basedir):
-        shutil.rmtree(basedir)
-    os.makedirs(basedir)
     # fill ldap server with some data
     ldiffile = join(config.apphome, "ldap_test.ldif")
     config.info('Initing ldap database')
@@ -69,6 +67,7 @@
         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
                                (" ".join(cmdline), os.getcwd()))
     URL = u'ldap://%s' % host
+    return slapddir
 
 def terminate_slapd(cls):
     config = cls.config
@@ -89,11 +88,32 @@
     def setUpClass(cls):
         from cubicweb.cwctl import init_cmdline_log_threshold
         init_cmdline_log_threshold(cls.config, cls.loglevel)
-        create_slapd_configuration(cls)
+        cls._tmpdir = create_slapd_configuration(cls)
 
     @classmethod
     def tearDownClass(cls):
         terminate_slapd(cls)
+        try:
+            shutil.rmtree(cls._tmpdir)
+        except:
+            pass
+
+class CheckWrongGroup(LDAPTestBase):
+
+    def test_wrong_group(self):
+        self.session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
+                                   url=URL, config=CONFIG)
+        self.commit()
+        with self.session.repo.internal_session(safe=True) as session:
+            source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+            config = source.repo_source.check_config(source)
+            # inject a bogus group here, along with at least a valid one
+            config['user-default-group'] = ('thisgroupdoesnotexists','users')
+            source.repo_source.update_config(source, config)
+            session.commit(free_cnxset=False)
+            # here we emitted an error log entry
+            stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
+            session.commit()
 
     def setUp(self):
         super(LDAPTestBase, self).setUp()
@@ -244,10 +264,6 @@
         source.pull_data(self.session)
         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
         self.assertEqual(len(rset), 1)
-        # test some password has been set
-        cu = self.session.system_sql('SELECT cw_upassword FROM cw_CWUser WHERE cw_eid=%s' % rset[0][0])
-        value = str(cu.fetchall()[0][0])
-        self.assertEqual(value, '{SSHA}v/8xJQP3uoaTBZz1T7Y0B3qOxRN1cj7D')
         self.assertTrue(self.repo.system_source.authenticate(
                 self.session, 'syt', password='syt'))