25 |
25 |
26 from cubicweb.server.sources.ldapuser import * |
26 from cubicweb.server.sources.ldapuser import * |
27 |
27 |
28 if '17.1' in socket.gethostbyname('ldap1'): |
28 if '17.1' in socket.gethostbyname('ldap1'): |
29 SYT = 'syt' |
29 SYT = 'syt' |
|
30 SYT_EMAIL = 'Sylvain Thenault' |
30 ADIM = 'adim' |
31 ADIM = 'adim' |
|
32 CONFIG = u'''host=ldap1 |
|
33 user-base-dn=ou=People,dc=logilab,dc=fr |
|
34 user-scope=ONELEVEL |
|
35 user-classes=top,posixAccount |
|
36 user-login-attr=uid |
|
37 user-default-group=users |
|
38 user-attrs-map=gecos:email,uid:login |
|
39 ''' |
31 else: |
40 else: |
32 SYT = 'sthenault' |
41 SYT = 'sthenault' |
|
42 SYT_EMAIL = 'sylvain.thenault@logilab.fr' |
33 ADIM = 'adimascio' |
43 ADIM = 'adimascio' |
|
44 CONFIG = u'''host=ldap1 |
|
45 user-base-dn=ou=People,dc=logilab,dc=net |
|
46 user-scope=ONELEVEL |
|
47 user-classes=top,OpenLDAPperson |
|
48 user-login-attr=uid |
|
49 user-default-group=users |
|
50 user-attrs-map=mail:email,uid:login |
|
51 ''' |
34 |
52 |
35 |
53 |
36 def nopwd_authenticate(self, session, login, password): |
54 def nopwd_authenticate(self, session, login, password): |
37 """used to monkey patch the source to get successful authentication without |
55 """used to monkey patch the source to get successful authentication without |
38 upassword checking |
56 upassword checking |
63 repo.shutdown() |
81 repo.shutdown() |
64 del repo |
82 del repo |
65 |
83 |
66 def add_ldap_source(cnx): |
84 def add_ldap_source(cnx): |
67 cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', |
85 cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', |
68 config=u''' |
86 config=CONFIG) |
69 # ldap host |
|
70 host=ldap1 |
|
71 # base DN to lookup for usres |
|
72 user-base-dn=ou=People,dc=logilab,dc=fr |
|
73 # user search scope |
|
74 user-scope=ONELEVEL |
|
75 # classes of user |
|
76 user-classes=top,posixAccount |
|
77 # attribute used as login on authentication |
|
78 user-login-attr=uid |
|
79 # name of a group in which ldap users will be by default |
|
80 user-default-group=users |
|
81 # map from ldap user attributes to cubicweb attributes |
|
82 user-attrs-map=gecos:email,uid:login |
|
83 ''') |
|
84 cnx.commit() |
87 cnx.commit() |
85 # XXX: need this first query else we get 'database is locked' from |
88 # XXX: need this first query else we get 'database is locked' from |
86 # sqlite since it doesn't support multiple connections on the same |
89 # sqlite since it doesn't support multiple connections on the same |
87 # database |
90 # database |
88 # so doing, ldap inserted users don't get removed between each test |
91 # so doing, ldap inserted users don't get removed between each test |
122 self.assertEqual(e.firstname, None) |
125 self.assertEqual(e.firstname, None) |
123 self.assertEqual(e.surname, None) |
126 self.assertEqual(e.surname, None) |
124 self.assertEqual(e.in_group[0].name, 'users') |
127 self.assertEqual(e.in_group[0].name, 'users') |
125 self.assertEqual(e.owned_by[0].login, SYT) |
128 self.assertEqual(e.owned_by[0].login, SYT) |
126 self.assertEqual(e.created_by, ()) |
129 self.assertEqual(e.created_by, ()) |
127 self.assertEqual(e.primary_email[0].address, 'Sylvain Thenault') |
130 self.assertEqual(e.primary_email[0].address, SYT_EMAIL) |
128 # email content should be indexed on the user |
131 # email content should be indexed on the user |
129 rset = self.sexecute('CWUser X WHERE X has_text "thenault"') |
132 rset = self.sexecute('CWUser X WHERE X has_text "thenault"') |
130 self.assertEqual(rset.rows, [[e.eid]]) |
133 self.assertEqual(rset.rows, [[e.eid]]) |
131 |
134 |
132 def test_not(self): |
135 def test_not(self): |
410 RQLGeneratorTC.setUp(self) |
413 RQLGeneratorTC.setUp(self) |
411 ldapsource = repo.sources[-1] |
414 ldapsource = repo.sources[-1] |
412 self.pool = repo._get_pool() |
415 self.pool = repo._get_pool() |
413 session = mock_object(pool=self.pool) |
416 session = mock_object(pool=self.pool) |
414 self.o = RQL2LDAPFilter(ldapsource, session) |
417 self.o = RQL2LDAPFilter(ldapsource, session) |
|
418 self.ldapclasses = ''.join('(objectClass=%s)' % ldapcls |
|
419 for ldapcls in ldapsource.user_classes) |
415 |
420 |
416 def tearDown(self): |
421 def tearDown(self): |
417 repo._free_pool(self.pool) |
422 repo._free_pool(self.pool) |
418 RQLGeneratorTC.tearDown(self) |
423 RQLGeneratorTC.tearDown(self) |
419 |
424 |
420 def test_base(self): |
425 def test_base(self): |
421 rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0] |
426 rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0] |
422 self.assertEqual(self.o.generate(rqlst, 'X')[1], |
427 self.assertEqual(self.o.generate(rqlst, 'X')[1], |
423 '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))') |
428 '(&%s(uid=toto))' % self.ldapclasses) |
424 |
429 |
425 def test_kwargs(self): |
430 def test_kwargs(self): |
426 rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0] |
431 rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0] |
427 self.o._args = {'x': "toto"} |
432 self.o._args = {'x': "toto"} |
428 self.assertEqual(self.o.generate(rqlst, 'X')[1], |
433 self.assertEqual(self.o.generate(rqlst, 'X')[1], |
429 '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))') |
434 '(&%s(uid=toto))' % self.ldapclasses) |
430 |
435 |
431 def test_get_attr(self): |
436 def test_get_attr(self): |
432 rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0] |
437 rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0] |
433 self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E') |
438 self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E') |
434 |
439 |
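Review bot: The new `CONFIG` and `SYT_EMAIL` constants in the first hunk are selected by the existing `'17.1' in socket.gethostbyname('ldap1')` check, which is a substring match on whatever address the name resolves to. Any address that merely contains those characters therefore picks the `dc=logilab,dc=fr` / `posixAccount` settings, and the lookup appears to run at import time, so a resolution failure would surface as an import error rather than a skipped test. Now that the base DN, object classes and attribute map all hang on this branch, it should match the full expected address or prefix, with a comment naming the directory each branch targets. Moving the settings into `CONFIG` also dropped the per-key comments that the inline config in `add_ldap_source` carried; I would keep them beside the constants. I would also add `# gecos is mapped to email here, so the expected "address" is the full name` above `SYT_EMAIL = 'Sylvain Thenault'`.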