33 from cubicweb.devtools.httptest import get_available_port |
33 from cubicweb.devtools.httptest import get_available_port |
34 from cubicweb.devtools import get_test_db_handler |
34 from cubicweb.devtools import get_test_db_handler |
35 |
35 |
36 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter |
36 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter |
37 |
37 |
# LDAP source option blocks, written as key=value lines inside unicode literals.
# CONFIG_LDAPFEED carries both user-* and group-* options (base DNs and
# attribute maps); CONFIG_LDAPUSER carries only the user-* options.
# NOTE(review): presumably used as the configuration text of the ldapfeed /
# ldapuser test sources -- confirm against the test setup code.
38 CONFIG_LDAPFEED = CONFIG_LDAPUSER = u''' |
38 CONFIG_LDAPFEED = u''' |

39 user-base-dn=ou=People,dc=cubicweb,dc=test |

40 group-base-dn=ou=Group,dc=cubicweb,dc=test |

41 user-attrs-map=uid=login,mail=email,userPassword=upassword |

42 group-attrs-map=cn=name,memberUid=member |

43 ''' |

44 CONFIG_LDAPUSER = u''' |
39 user-base-dn=ou=People,dc=cubicweb,dc=test |
45 user-base-dn=ou=People,dc=cubicweb,dc=test |
40 user-attrs-map=uid=login,mail=email,userPassword=upassword |
46 user-attrs-map=uid=login,mail=email,userPassword=upassword |
41 ''' |
47 ''' |
42 |
48 |
43 URL = None |
49 URL = None |
349 self.commit() |
355 self.commit() |
350 # and that we can now authenticate again |
356 # and that we can now authenticate again |
351 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto') |
357 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto') |
352 self.assertTrue(self.repo.connect('syt', password='syt')) |
358 self.assertTrue(self.repo.connect('syt', password='syt')) |
353 |
359 |
|
class LDAPFeedGroupTC(LDAPFeedTestBase):
    """
    A testcase for group support in ldapfeed.

    The tests query the repository for CWGroup entities and ``in_group``
    relations, and modify the ``cn=logilab`` group entry of the test
    directory to check that membership changes are picked up by a pull.
    """

    def _group_member_logins(self, groupname):
        """Return the sorted list of logins of users in group `groupname`.

        The RQL query has no ORDERBY clause, so the row order of the result
        set is not guaranteed; sorting here keeps assertions deterministic.
        """
        rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
                             {'name': groupname})
        return sorted(row[0] for row in rset)

    def test_groups_exist(self):
        """Check the "dir" group exists and two groups belong to source "ldap"."""
        rset = self.sexecute('CWGroup X WHERE X name "dir"')
        self.assertEqual(len(rset), 1)

        rset = self.sexecute('CWGroup X WHERE X cw_source S, S name "ldap"')
        self.assertEqual(len(rset), 2)

    def test_group_deleted(self):
        """Check the "dir" group is present.

        NOTE(review): despite its name, this test performs no deletion; it
        only asserts that the group exists.
        """
        rset = self.sexecute('CWGroup X WHERE X name "dir"')
        self.assertEqual(len(rset), 1)

    def test_in_group(self):
        """Check the members of the "dir" and "logilab" groups."""
        rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
        dirgroup = rset.get_entity(0, 0)
        self.assertEqual(set(['syt', 'adim']),
                         set(u.login for u in dirgroup.reverse_in_group))
        rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
        logilabgroup = rset.get_entity(0, 0)
        self.assertEqual(set(['adim']),
                         set(u.login for u in logilabgroup.reverse_in_group))

    def test_group_member_added(self):
        """Check a memberUid added to the "logilab" group shows up after a pull."""
        self.pull()
        self.assertEqual(self._group_member_logins('logilab'), ['adim'])

        try:
            self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
                                   {('add', 'memberUid'): ['syt']})
            time.sleep(1.1)  # timestamps precision is 1s
            self.pull()

            # compare sorted logins: the query does not specify any ordering,
            # so asserting on row positions would be fragile
            self.assertEqual(self._group_member_logins('logilab'),
                             ['adim', 'syt'])
        finally:
            # back to normal ldap setup
            self.tearDownClass()
            self.setUpClass()

    def test_group_member_deleted(self):
        """Check a memberUid removed from the "logilab" group is gone after a pull."""
        self.pull()  # ensure we are sync'ed
        self.assertEqual(self._group_member_logins('logilab'), ['adim'])

        try:
            self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
                                   {('delete', 'memberUid'): ['adim']})
            time.sleep(1.1)  # timestamps precision is 1s
            self.pull()

            self.assertEqual(self._group_member_logins('logilab'), [])
        finally:
            # back to normal ldap setup
            self.tearDownClass()
            self.setUpClass()
|
430 |
354 |
431 |
355 class LDAPUserSourceTC(LDAPFeedTestBase): |
432 class LDAPUserSourceTC(LDAPFeedTestBase): |
356 test_db_id = 'ldap-user' |
433 test_db_id = 'ldap-user' |
357 tags = CubicWebTC.tags | Tags(('ldap')) |
434 tags = CubicWebTC.tags | Tags(('ldap')) |
358 |
435 |