15 # |
15 # |
16 # You should have received a copy of the GNU Lesser General Public License along |
16 # You should have received a copy of the GNU Lesser General Public License along |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
18 """cubicweb.server.sources.ldapusers unit and functional tests""" |
18 """cubicweb.server.sources.ldapusers unit and functional tests""" |
19 |
19 |
20 import socket |
20 import os |
|
21 import shutil |
|
22 import time |
|
23 from os.path import abspath, join, exists |
|
24 import subprocess |
|
25 from socket import socket, error as socketerror |
21 |
26 |
22 from logilab.common.testlib import TestCase, unittest_main, mock_object |
27 from logilab.common.testlib import TestCase, unittest_main, mock_object |
23 from cubicweb.devtools.testlib import CubicWebTC |
28 from cubicweb.devtools.testlib import CubicWebTC |
24 from cubicweb.devtools.repotest import RQLGeneratorTC |
29 from cubicweb.devtools.repotest import RQLGeneratorTC |
25 |
30 |
26 from cubicweb.server.sources.ldapuser import * |
31 from cubicweb.server.sources.ldapuser import * |
27 |
32 |
# Logins / e-mail values the tests expect to find in the test LDAP directory.
SYT = 'syt'
SYT_EMAIL = 'Sylvain Thenault'
ADIM = 'adim'

# Configuration text for the 'ldapuser' CWSource.  The ``host=%s`` placeholder
# is substituted by create_slapd_configuration() with the host:port of the
# slapd instance it starts.
CONFIG = u'''host=%s
user-base-dn=ou=People,dc=cubicweb,dc=test
user-scope=ONELEVEL
user-classes=top,posixAccount
user-login-attr=uid
user-default-group=users
user-attrs-map=gecos:email,uid:login
'''
52 |
44 |
53 |
45 |
54 def nopwd_authenticate(self, session, login, password): |
46 def nopwd_authenticate(self, session, login, password): |
55 """used to monkey patch the source to get successful authentication without |
47 """used to monkey patch the source to get successful authentication without |
69 raise AuthenticationError() |
61 raise AuthenticationError() |
70 # don't check upassword ! |
62 # don't check upassword ! |
71 return self.extid2eid(user['dn'], 'CWUser', session) |
63 return self.extid2eid(user['dn'], 'CWUser', session) |
72 |
64 |
def setUpModule(*args):
    """Start slapd, initialise the test repository and register the LDAP source.

    The repository is published in the module-level ``repo`` global so that
    tearDownModule() can shut it down.  If anything fails after slapd has been
    started, slapd is terminated before the error is propagated.
    """
    global repo
    create_slapd_configuration(LDAPUserSourceTC.config)
    try:
        LDAPUserSourceTC._init_repo()
        repo = LDAPUserSourceTC.repo
        add_ldap_source(LDAPUserSourceTC.cnx)
    except:
        # cleanup-and-reraise: never leave a stray slapd behind, whatever
        # the failure was
        terminate_slapd()
        raise
78 |
75 |
def tearDownModule(*args):
    """Shut down the test repository, then stop the slapd server.

    slapd is terminated even when the repository shutdown raises, so a failing
    shutdown cannot leave an orphaned slapd process behind.
    """
    global repo
    try:
        repo.shutdown()
        del repo
    finally:
        terminate_slapd()
83 |
81 |
84 def add_ldap_source(cnx): |
82 def add_ldap_source(cnx): |
85 cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', |
83 cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', |
86 config=CONFIG) |
84 config=CONFIG) |
87 cnx.commit() |
85 cnx.commit() |
91 # so doing, ldap inserted users don't get removed between each test |
89 # so doing, ldap inserted users don't get removed between each test |
92 rset = cnx.cursor().execute('CWUser X') |
90 rset = cnx.cursor().execute('CWUser X') |
93 # check we get some users from ldap |
91 # check we get some users from ldap |
94 assert len(rset) > 1 |
92 assert len(rset) > 1 |
95 |
93 |
|
def create_slapd_configuration(config):
    """Populate a test LDAP database if needed and start slapd on a free port.

    :param config: object whose ``apphome`` attribute is the directory holding
      ``slapd.conf`` and ``ldap_test.ldif`` and under which the ``ldapdb``
      database directory is created.

    Side effects: stores the started process in the ``slapd_process`` global
    and substitutes the chosen ``host:port`` into the ``CONFIG`` global.

    :raises Exception: no free TCP port in 9000-9099 on localhost.
    :raises EnvironmentError: slapd exited right after being started.
    """
    # local import: keeps this block self-contained without touching the
    # file-level import section
    import errno

    global slapd_process, CONFIG
    basedir = join(config.apphome, "ldapdb")
    slapdconf = join(config.apphome, "slapd.conf")
    if not exists(basedir):
        os.makedirs(basedir)
        # fill ldap server with some data
        ldiffile = join(config.apphome, "ldap_test.ldif")
        print("Initing ldap database")
        # argv list instead of a shell string: paths containing spaces or
        # shell metacharacters are passed through untouched.  The exit status
        # is deliberately ignored, as before (slapadd runs with -c, i.e.
        # continue on errors).
        subprocess.call(["/usr/sbin/slapadd", "-f", slapdconf,
                         "-l", ldiffile, "-c"])

    # look for a free port by trying to bind each candidate in turn
    for port in range(9000, 9100):
        probe = socket()
        try:
            probe.bind(('localhost', port))
        except socketerror as exc:
            # only "address already in use" means "try the next port";
            # errno.EADDRINUSE is portable whereas the literal 98 is the
            # Linux value only
            if exc.errno != errno.EADDRINUSE:
                raise
        else:
            break
        finally:
            # release the probe socket so that slapd can bind the port
            probe.close()
    else:
        raise Exception("Can't find a free TCP port on localhost")

    host = 'localhost:%s' % port
    ldapuri = 'ldap://%s' % host
    cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"]
    print("Starting slapd on %s" % ldapuri)
    slapd_process = subprocess.Popen(cmdline)
    # give slapd a moment to fail fast (bad configuration, port stolen, ...)
    time.sleep(0.2)
    if slapd_process.poll() is not None:
        raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
                               (" ".join(cmdline), os.getcwd()))
    print("slapd started with pid %s" % slapd_process.pid)
    CONFIG = CONFIG % host
|
133 |
|
def terminate_slapd():
    """Stop the slapd process started by create_slapd_configuration().

    The process is terminated and waited for only when no return code has been
    recorded for it yet; in every case the ``slapd_process`` global is removed
    afterwards.
    """
    global slapd_process
    not_reaped_yet = slapd_process.returncode is None
    if not_reaped_yet:
        print("terminating slapd")
        slapd_process.terminate()
        slapd_process.wait()
        print("DONE")

    del slapd_process
96 |
143 |
97 class LDAPUserSourceTC(CubicWebTC): |
144 class LDAPUserSourceTC(CubicWebTC): |
98 |
145 |
99 def patch_authenticate(self): |
146 def patch_authenticate(self): |
100 self._orig_authenticate = LDAPUserSource.authenticate |
147 self._orig_authenticate = LDAPUserSource.authenticate |
115 source = self.repo.sources_by_uri['ldapuser'] |
162 source = self.repo.sources_by_uri['ldapuser'] |
116 source.synchronize() |
163 source.synchronize() |
117 |
164 |
118 def test_base(self): |
165 def test_base(self): |
119 # check a known one |
166 # check a known one |
120 e = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}).get_entity(0, 0) |
167 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}) |
|
168 e = rset.get_entity(0, 0) |
121 self.assertEqual(e.login, SYT) |
169 self.assertEqual(e.login, SYT) |
122 e.complete() |
170 e.complete() |
123 self.assertEqual(e.creation_date, None) |
171 self.assertEqual(e.creation_date, None) |
124 self.assertEqual(e.modification_date, None) |
172 self.assertEqual(e.modification_date, None) |
125 self.assertEqual(e.firstname, None) |
173 self.assertEqual(e.firstname, None) |