18 """cubicweb.server.sources.ldapusers unit and functional tests""" |
18 """cubicweb.server.sources.ldapusers unit and functional tests""" |
19 |
19 |
20 import socket |
20 import socket |
21 |
21 |
22 from logilab.common.testlib import TestCase, unittest_main, mock_object |
22 from logilab.common.testlib import TestCase, unittest_main, mock_object |
23 from cubicweb.devtools import TestServerConfiguration |
|
24 from cubicweb.devtools.testlib import CubicWebTC |
23 from cubicweb.devtools.testlib import CubicWebTC |
25 from cubicweb.devtools.repotest import RQLGeneratorTC |
24 from cubicweb.devtools.repotest import RQLGeneratorTC |
26 |
25 |
27 from cubicweb.server.sources.ldapuser import * |
26 from cubicweb.server.sources.ldapuser import * |
28 |
27 |
51 # no such user |
50 # no such user |
52 raise AuthenticationError() |
51 raise AuthenticationError() |
53 # don't check upassword ! |
52 # don't check upassword ! |
54 return self.extid2eid(user['dn'], 'CWUser', session) |
53 return self.extid2eid(user['dn'], 'CWUser', session) |
55 |
54 |
|
55 def setup_module(*args): |
|
56 global repo |
|
57 LDAPUserSourceTC._init_repo() |
|
58 repo = LDAPUserSourceTC.repo |
|
59 add_ldap_source(LDAPUserSourceTC.cnx) |
|
60 |
|
61 def teardown_module(*args): |
|
62 global repo |
|
63 repo.shutdown() |
|
64 del repo |
|
65 |
|
66 def add_ldap_source(cnx): |
|
67 cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', |
|
68 config=u''' |
|
69 # ldap host |
|
70 host=ldap1 |
|
71 # base DN to lookup for usres |
|
72 user-base-dn=ou=People,dc=logilab,dc=fr |
|
73 # user search scope |
|
74 user-scope=ONELEVEL |
|
75 # classes of user |
|
76 user-classes=top,posixAccount |
|
77 # attribute used as login on authentication |
|
78 user-login-attr=uid |
|
79 # name of a group in which ldap users will be by default |
|
80 user-default-group=users |
|
81 # map from ldap user attributes to cubicweb attributes |
|
82 user-attrs-map=gecos:email,uid:login |
|
83 ''') |
|
84 cnx.commit() |
|
85 # XXX: need this first query else we get 'database is locked' from |
|
86 # sqlite since it doesn't support multiple connections on the same |
|
87 # database |
|
88 # so doing, ldap inserted users don't get removed between each test |
|
89 rset = cnx.cursor().execute('CWUser X') |
|
90 # check we get some users from ldap |
|
91 assert len(rset) > 1 |
56 |
92 |
57 |
93 |
58 class LDAPUserSourceTC(CubicWebTC): |
94 class LDAPUserSourceTC(CubicWebTC): |
59 config = TestServerConfiguration('data') |
|
60 config.sources_file = lambda : 'data/sourcesldap' |
|
61 |
95 |
62 def patch_authenticate(self): |
96 def patch_authenticate(self): |
63 self._orig_authenticate = LDAPUserSource.authenticate |
97 self._orig_authenticate = LDAPUserSource.authenticate |
64 LDAPUserSource.authenticate = nopwd_authenticate |
98 LDAPUserSource.authenticate = nopwd_authenticate |
65 |
|
66 def setup_database(self): |
|
67 # XXX: need this first query else we get 'database is locked' from |
|
68 # sqlite since it doesn't support multiple connections on the same |
|
69 # database |
|
70 # so doing, ldap inserted users don't get removed between each test |
|
71 rset = self.sexecute('CWUser X') |
|
72 # check we get some users from ldap |
|
73 self.assert_(len(rset) > 1) |
|
74 |
99 |
75 def tearDown(self): |
100 def tearDown(self): |
76 if hasattr(self, '_orig_authenticate'): |
101 if hasattr(self, '_orig_authenticate'): |
77 LDAPUserSource.authenticate = self._orig_authenticate |
102 LDAPUserSource.authenticate = self._orig_authenticate |
78 CubicWebTC.tearDown(self) |
103 CubicWebTC.tearDown(self) |
376 self.assertEqual(res, [[4]]) |
401 self.assertEqual(res, [[4]]) |
377 trfunc = GlobTrFunc('max', 1) |
402 trfunc = GlobTrFunc('max', 1) |
378 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
403 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
379 self.assertEqual(res, [[1, 5], [2, 4], [3, 6]]) |
404 self.assertEqual(res, [[1, 5], [2, 4], [3, 6]]) |
380 |
405 |
381 # XXX |
|
382 LDAPUserSourceTC._init_repo() |
|
383 repo = LDAPUserSourceTC.repo |
|
384 |
|
385 def teardown_module(*args): |
|
386 global repo |
|
387 del repo |
|
388 del RQL2LDAPFilterTC.schema |
|
389 |
|
390 class RQL2LDAPFilterTC(RQLGeneratorTC): |
406 class RQL2LDAPFilterTC(RQLGeneratorTC): |
391 schema = repo.schema |
|
392 |
407 |
393 def setUp(self): |
408 def setUp(self): |
|
409 self.schema = repo.schema |
394 RQLGeneratorTC.setUp(self) |
410 RQLGeneratorTC.setUp(self) |
395 ldapsource = repo.sources[-1] |
411 ldapsource = repo.sources[-1] |
396 self.pool = repo._get_pool() |
412 self.pool = repo._get_pool() |
397 session = mock_object(pool=self.pool) |
413 session = mock_object(pool=self.pool) |
398 self.o = RQL2LDAPFilter(ldapsource, session) |
414 self.o = RQL2LDAPFilter(ldapsource, session) |