server/test/unittest_ldapsource.py
changeset 8922 715b9eec6da9
parent 8921 da46624a0880
child 8959 69a78922114b
equal deleted inserted replaced
8921:da46624a0880 8922:715b9eec6da9
    33 from cubicweb.devtools.httptest import get_available_port
    33 from cubicweb.devtools.httptest import get_available_port
    34 from cubicweb.devtools import get_test_db_handler
    34 from cubicweb.devtools import get_test_db_handler
    35 
    35 
    36 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter
    36 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter
    37 
    37 
    38 CONFIG_LDAPFEED = CONFIG_LDAPUSER = u'''
    38 CONFIG_LDAPFEED = u'''
       
    39 user-base-dn=ou=People,dc=cubicweb,dc=test
       
    40 group-base-dn=ou=Group,dc=cubicweb,dc=test
       
    41 user-attrs-map=uid=login,mail=email,userPassword=upassword
       
    42 group-attrs-map=cn=name,memberUid=member
       
    43 '''
       
    44 CONFIG_LDAPUSER = u'''
    39 user-base-dn=ou=People,dc=cubicweb,dc=test
    45 user-base-dn=ou=People,dc=cubicweb,dc=test
    40 user-attrs-map=uid=login,mail=email,userPassword=upassword
    46 user-attrs-map=uid=login,mail=email,userPassword=upassword
    41 '''
    47 '''
    42 
    48 
    43 URL = None
    49 URL = None
   349         self.commit()
   355         self.commit()
   350         # and that we can now authenticate again
   356         # and that we can now authenticate again
   351         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
   357         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
   352         self.assertTrue(self.repo.connect('syt', password='syt'))
   358         self.assertTrue(self.repo.connect('syt', password='syt'))
   353 
   359 
       
   360 class LDAPFeedGroupTC(LDAPFeedTestBase):
       
   361     """
       
   362     A testcase for group support in ldapfeed.
       
   363     """
       
   364 
       
   365     def test_groups_exist(self):
       
   366         rset = self.sexecute('CWGroup X WHERE X name "dir"')
       
   367         self.assertEqual(len(rset), 1)
       
   368 
       
   369         rset = self.sexecute('CWGroup X WHERE X cw_source S, S name "ldap"')
       
   370         self.assertEqual(len(rset), 2)
       
   371 
       
   372     def test_group_deleted(self):
       
   373         rset = self.sexecute('CWGroup X WHERE X name "dir"')
       
   374         self.assertEqual(len(rset), 1)
       
   375 
       
   376     def test_in_group(self):
       
   377         rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
       
   378         dirgroup = rset.get_entity(0, 0)
       
   379         self.assertEqual(set(['syt', 'adim']),
       
   380                          set([u.login for u in dirgroup.reverse_in_group]))
       
   381         rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
       
   382         logilabgroup = rset.get_entity(0, 0)
       
   383         self.assertEqual(set(['adim']),
       
   384                          set([u.login for u in logilabgroup.reverse_in_group]))
       
   385 
       
   386     def test_group_member_added(self):
       
   387         self.pull()
       
   388         rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
       
   389                              {'name': 'logilab'})
       
   390         self.assertEqual(len(rset), 1)
       
   391         self.assertEqual(rset[0][0], 'adim')
       
   392 
       
   393         try:
       
   394             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
       
   395                                    {('add', 'memberUid'): ['syt']})
       
   396             time.sleep(1.1) # timestamps precision is 1s
       
   397             self.pull()
       
   398 
       
   399             rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
       
   400                                  {'name': 'logilab'})
       
   401             self.assertEqual(len(rset), 2)
       
   402             self.assertEqual(rset[0][0], 'adim')
       
   403             self.assertEqual(rset[1][0], 'syt')
       
   404 
       
   405         finally:
       
   406             # back to normal ldap setup
       
   407             self.tearDownClass()
       
   408             self.setUpClass()
       
   409 
       
   410     def test_group_member_deleted(self):
       
   411         self.pull() # ensure we are sync'ed
       
   412         rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
       
   413                              {'name': 'logilab'})
       
   414         self.assertEqual(len(rset), 1)
       
   415         self.assertEqual(rset[0][0], 'adim')
       
   416 
       
   417         try:
       
   418             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
       
   419                                    {('delete', 'memberUid'): ['adim']})
       
   420             time.sleep(1.1) # timestamps precision is 1s
       
   421             self.pull()
       
   422 
       
   423             rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
       
   424                                  {'name': 'logilab'})
       
   425             self.assertEqual(len(rset), 0)
       
   426         finally:
       
   427             # back to normal ldap setup
       
   428             self.tearDownClass()
       
   429             self.setUpClass()
       
   430 
   354 
   431 
   355 class LDAPUserSourceTC(LDAPFeedTestBase):
   432 class LDAPUserSourceTC(LDAPFeedTestBase):
   356     test_db_id = 'ldap-user'
   433     test_db_id = 'ldap-user'
   357     tags = CubicWebTC.tags | Tags(('ldap'))
   434     tags = CubicWebTC.tags | Tags(('ldap'))
   358 
   435