21 import os |
21 import os |
22 import shutil |
22 import shutil |
23 import time |
23 import time |
24 from os.path import join, exists |
24 from os.path import join, exists |
25 import subprocess |
25 import subprocess |
|
26 import tempfile |
26 |
27 |
27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags |
28 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags |
28 |
29 |
29 from cubicweb import AuthenticationError |
30 from cubicweb import AuthenticationError |
30 from cubicweb.devtools.testlib import CubicWebTC |
31 from cubicweb.devtools.testlib import CubicWebTC |
37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test' |
38 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test' |
38 URL = None |
39 URL = None |
39 |
40 |
40 def create_slapd_configuration(cls): |
41 def create_slapd_configuration(cls): |
41 global URL |
42 global URL |
|
43 slapddir = tempfile.mkdtemp('cw-unittest-ldap') |
42 config = cls.config |
44 config = cls.config |
43 basedir = join(config.apphome, "ldapdb") |
|
44 slapdconf = join(config.apphome, "slapd.conf") |
45 slapdconf = join(config.apphome, "slapd.conf") |
45 confin = file(join(config.apphome, "slapd.conf.in")).read() |
46 confin = file(join(config.apphome, "slapd.conf.in")).read() |
46 confstream = file(slapdconf, 'w') |
47 confstream = file(slapdconf, 'w') |
47 confstream.write(confin % {'apphome': config.apphome}) |
48 confstream.write(confin % {'apphome': config.apphome, 'testdir': slapddir}) |
48 confstream.close() |
49 confstream.close() |
49 if exists(basedir): |
|
50 shutil.rmtree(basedir) |
|
51 os.makedirs(basedir) |
|
52 # fill ldap server with some data |
50 # fill ldap server with some data |
53 ldiffile = join(config.apphome, "ldap_test.ldif") |
51 ldiffile = join(config.apphome, "ldap_test.ldif") |
54 config.info('Initing ldap database') |
52 config.info('Initing ldap database') |
55 cmdline = "/usr/sbin/slapadd -f %s -l %s -c" % (slapdconf, ldiffile) |
53 cmdline = "/usr/sbin/slapadd -f %s -l %s -c" % (slapdconf, ldiffile) |
56 subprocess.check_call(cmdline, shell=True) == 0 |
54 subprocess.check_call(cmdline, shell=True) == 0 |
67 config.info('slapd started with pid %s' % cls.slapd_process.pid) |
65 config.info('slapd started with pid %s' % cls.slapd_process.pid) |
68 else: |
66 else: |
69 raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' % |
67 raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' % |
70 (" ".join(cmdline), os.getcwd())) |
68 (" ".join(cmdline), os.getcwd())) |
71 URL = u'ldap://%s' % host |
69 URL = u'ldap://%s' % host |
|
70 return slapddir |
72 |
71 |
73 def terminate_slapd(cls): |
72 def terminate_slapd(cls): |
74 config = cls.config |
73 config = cls.config |
75 if cls.slapd_process and cls.slapd_process.returncode is None: |
74 if cls.slapd_process and cls.slapd_process.returncode is None: |
76 config.info('terminating slapd') |
75 config.info('terminating slapd') |
87 |
86 |
88 @classmethod |
87 @classmethod |
89 def setUpClass(cls): |
88 def setUpClass(cls): |
90 from cubicweb.cwctl import init_cmdline_log_threshold |
89 from cubicweb.cwctl import init_cmdline_log_threshold |
91 init_cmdline_log_threshold(cls.config, cls.loglevel) |
90 init_cmdline_log_threshold(cls.config, cls.loglevel) |
92 create_slapd_configuration(cls) |
91 cls._tmpdir = create_slapd_configuration(cls) |
93 |
92 |
94 @classmethod |
93 @classmethod |
95 def tearDownClass(cls): |
94 def tearDownClass(cls): |
96 terminate_slapd(cls) |
95 terminate_slapd(cls) |
|
96 try: |
|
97 shutil.rmtree(cls._tmpdir) |
|
98 except: |
|
99 pass |
|
100 |
|
101 class CheckWrongGroup(LDAPTestBase): |
|
102 |
|
103 def test_wrong_group(self): |
|
104 self.session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed', |
|
105 url=URL, config=CONFIG) |
|
106 self.commit() |
|
107 with self.session.repo.internal_session(safe=True) as session: |
|
108 source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) |
|
109 config = source.repo_source.check_config(source) |
|
110 # inject a bogus group here, along with at least a valid one |
|
111 config['user-default-group'] = ('thisgroupdoesnotexists','users') |
|
112 source.repo_source.update_config(source, config) |
|
113 session.commit(free_cnxset=False) |
|
114 # here we emitted an error log entry |
|
115 stats = source.repo_source.pull_data(session, force=True, raise_on_error=True) |
|
116 session.commit() |
97 |
117 |
98 def setUp(self): |
118 def setUp(self): |
99 super(LDAPTestBase, self).setUp() |
119 super(LDAPTestBase, self).setUp() |
100 # ldap source url in the database may use a different port as the one |
120 # ldap source url in the database may use a different port as the one |
101 # just attributed |
121 # just attributed |
242 self.assertTrue(e.creation_date) |
262 self.assertTrue(e.creation_date) |
243 self.assertTrue(e.modification_date) |
263 self.assertTrue(e.modification_date) |
244 source.pull_data(self.session) |
264 source.pull_data(self.session) |
245 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
265 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
246 self.assertEqual(len(rset), 1) |
266 self.assertEqual(len(rset), 1) |
247 # test some password has been set |
|
248 cu = self.session.system_sql('SELECT cw_upassword FROM cw_CWUser WHERE cw_eid=%s' % rset[0][0]) |
|
249 value = str(cu.fetchall()[0][0]) |
|
250 self.assertEqual(value, '{SSHA}v/8xJQP3uoaTBZz1T7Y0B3qOxRN1cj7D') |
|
251 self.assertTrue(self.repo.system_source.authenticate( |
267 self.assertTrue(self.repo.system_source.authenticate( |
252 self.session, 'syt', password='syt')) |
268 self.session, 'syt', password='syt')) |
253 |
269 |
254 |
270 |
255 class LDAPUserSourceTC(LDAPFeedSourceTC): |
271 class LDAPUserSourceTC(LDAPFeedSourceTC): |
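Review bot: The cleanup added to tearDownClass wraps `shutil.rmtree(cls._tmpdir)` in a bare `except: pass`, which hides every failure, including the AttributeError raised when `_tmpdir` was never set and a KeyboardInterrupt during teardown. Narrowing it to `except OSError:` (or calling `shutil.rmtree(cls._tmpdir, ignore_errors=True)`) keeps the best-effort intent without masking unrelated errors. In create_slapd_configuration the directory from `tempfile.mkdtemp('cw-unittest-ldap')` is returned only on the success path, so a failing slapadd `check_call` or the `raise EnvironmentError(...)` branch leaves it behind, since standard unittest does not run tearDownClass when setUpClass raises; removing the directory before re-raising would close that gap. The new test_wrong_group opens `internal_session(safe=True) as session` but queries through `self.session.execute(...)` and never asserts on `stats`, so it only checks that no exception is raised, and a comment or assertion would make that intent explicit. Indentation is not visible in this rendering, and part of create_slapd_configuration lies between the hunks shown.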