18 """cubicweb.server.sources.ldapusers unit and functional tests""" |
18 """cubicweb.server.sources.ldapusers unit and functional tests""" |
19 |
19 |
20 import os |
20 import os |
21 import shutil |
21 import shutil |
22 import time |
22 import time |
23 from os.path import abspath, join, exists |
23 from os.path import join, exists |
24 import subprocess |
24 import subprocess |
25 from socket import socket, error as socketerror |
|
26 |
25 |
27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags |
26 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags |
28 |
27 |
29 from cubicweb import AuthenticationError |
28 from cubicweb import AuthenticationError |
30 from cubicweb.devtools.testlib import CubicWebTC |
29 from cubicweb.devtools.testlib import CubicWebTC |
31 from cubicweb.devtools.repotest import RQLGeneratorTC |
30 from cubicweb.devtools.repotest import RQLGeneratorTC |
32 from cubicweb.devtools.httptest import get_available_port |
31 from cubicweb.devtools.httptest import get_available_port |
33 from cubicweb.devtools import get_test_db_handler |
32 from cubicweb.devtools import get_test_db_handler |
34 |
33 |
35 from cubicweb.server.sources.ldapuser import * |
34 from cubicweb.server.session import security_enabled |
|
35 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter |
36 |
36 |
37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test' |
37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test' |
38 URL = None |
38 URL = None |
39 |
39 |
40 def create_slapd_configuration(config): |
40 def create_slapd_configuration(cls): |
41 global slapd_process, URL |
41 global URL |
|
42 config = cls.config |
42 basedir = join(config.apphome, "ldapdb") |
43 basedir = join(config.apphome, "ldapdb") |
43 slapdconf = join(config.apphome, "slapd.conf") |
44 slapdconf = join(config.apphome, "slapd.conf") |
44 confin = file(join(config.apphome, "slapd.conf.in")).read() |
45 confin = file(join(config.apphome, "slapd.conf.in")).read() |
45 confstream = file(slapdconf, 'w') |
46 confstream = file(slapdconf, 'w') |
46 confstream.write(confin % {'apphome': config.apphome}) |
47 confstream.write(confin % {'apphome': config.apphome}) |
58 port = get_available_port(xrange(9000, 9100)) |
59 port = get_available_port(xrange(9000, 9100)) |
59 host = 'localhost:%s' % port |
60 host = 'localhost:%s' % port |
60 ldapuri = 'ldap://%s' % host |
61 ldapuri = 'ldap://%s' % host |
61 cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"] |
62 cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"] |
62 config.info('Starting slapd:', ' '.join(cmdline)) |
63 config.info('Starting slapd:', ' '.join(cmdline)) |
63 slapd_process = subprocess.Popen(cmdline) |
64 cls.slapd_process = subprocess.Popen(cmdline) |
64 time.sleep(0.2) |
65 time.sleep(0.2) |
65 if slapd_process.poll() is None: |
66 if cls.slapd_process.poll() is None: |
66 config.info('slapd started with pid %s' % slapd_process.pid) |
67 config.info('slapd started with pid %s' % cls.slapd_process.pid) |
67 else: |
68 else: |
68 raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' % |
69 raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' % |
69 (" ".join(cmdline), os.getcwd())) |
70 (" ".join(cmdline), os.getcwd())) |
70 URL = u'ldap://%s' % host |
71 URL = u'ldap://%s' % host |
71 |
72 |
def terminate_slapd(cls):
    """Stop the slapd server attached to the test class ``cls``.

    The process handle is read from ``cls.slapd_process`` and progress is
    logged through ``cls.config.info``.  Nothing happens when no handle is
    set or when the process already has a return code.  Once the process
    has been waited for, ``cls.slapd_process`` is reset to ``None`` so that
    a later call is a no-op.

    :param cls: test class exposing ``config`` and, optionally,
                ``slapd_process`` (a ``subprocess.Popen``-like object).
    """
    config = cls.config
    # getattr with a default: the attribute is absent when slapd was never
    # launched for this class, and teardown must not fail in that case.
    process = getattr(cls, 'slapd_process', None)
    if process is None or process.returncode is not None:
        return
    config.info('terminating slapd')
    if hasattr(process, 'terminate'):
        process.terminate()
    else:
        # Popen.terminate() only exists from Python 2.6 on; send SIGTERM by
        # hand on older interpreters.
        import os
        import signal
        os.kill(process.pid, signal.SIGTERM)
    # Reap the child so it does not linger as a zombie.
    process.wait()
    config.info('DONE')
    # Forget the dead process handle.
    cls.slapd_process = None
|
84 |
|
85 |
84 |
class LDAPTestBase(CubicWebTC):
    """Base class for test cases that need a slapd server.

    ``setUpClass`` applies ``loglevel`` through
    ``init_cmdline_log_threshold`` and then calls
    ``create_slapd_configuration`` with the class; ``tearDownClass`` hands
    the class to ``terminate_slapd``.
    """
    loglevel = 'ERROR'
    # Handle on the slapd process; stays None until one has been started, so
    # teardown can safely test it even when startup never happened.
    slapd_process = None

    @classmethod
    def setUpClass(cls):
        """Set the log threshold and start slapd for this test class."""
        from cubicweb.cwctl import init_cmdline_log_threshold
        init_cmdline_log_threshold(cls.config, cls.loglevel)
        create_slapd_configuration(cls)

    @classmethod
    def tearDownClass(cls):
        """Stop the slapd server started by ``setUpClass``."""
        terminate_slapd(cls)
98 |
97 |
99 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase): |
98 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase): |
100 test_db_id = 'ldap-feed' |
99 test_db_id = 'ldap-feed' |
101 |
100 |
102 @classmethod |
101 @classmethod |
106 session.commit() |
105 session.commit() |
107 isession = session.repo.internal_session(safe=True) |
106 isession = session.repo.internal_session(safe=True) |
108 lfsource = isession.repo.sources_by_uri['ldapuser'] |
107 lfsource = isession.repo.sources_by_uri['ldapuser'] |
109 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
108 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
110 |
109 |
|
def _pull(self):
    """Force a pull of the 'ldapuser' source and commit the result.

    The pull runs in an internal session with read and write security
    disabled, with ``force=True`` and ``raise_on_error=True``.

    :return: whatever ``pull_data`` returned for this run.
    """
    with self.session.repo.internal_session() as isession:
        with security_enabled(isession, read=False, write=False):
            lfsource = isession.repo.sources_by_uri['ldapuser']
            stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
            isession.commit()
    return stats
|
116 |
def test_delete(self):
    """Delete syt from LDAP, pull and check the user is deactivated.

    Then pull a second time, restart slapd through
    tearDownClass/setUpClass, pull again and check that the user is
    still reported as deactivated.
    """
    state_rql = ('Any N WHERE U login "syt", '
                 'U in_state S, S name N')
    uri = self.repo.sources_by_uri['ldapuser'].urls[0]
    # Argument list + check_call: no shell quoting involved, and a failing
    # ldapdelete aborts the test right here instead of going unnoticed.
    subprocess.check_call(['ldapdelete', '-H', uri, '-v', '-x',
                           '-D', 'cn=admin,dc=cubicweb,dc=test',
                           '-w', 'cw',
                           'uid=syt,ou=People,dc=cubicweb,dc=test'])
    self._pull()
    self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
    self.assertEqual(self.execute(state_rql).rows[0][0], 'deactivated')
    # check that a second pull doesn't choke
    self._pull()
    # restart the LDAP server from scratch
    self.tearDownClass()
    self.setUpClass()
    self._pull()
    # still deactivated, but a warning has been emitted ...
    self.assertEqual(self.execute(state_rql).rows[0][0], 'deactivated')
129 |
140 |
130 class LDAPFeedSourceTC(LDAPTestBase): |
141 class LDAPFeedSourceTC(LDAPTestBase): |
131 test_db_id = 'ldap-feed' |
142 test_db_id = 'ldap-feed' |
132 |
143 |
133 @classmethod |
144 @classmethod |