server/test/unittest_ldapsource.py
changeset 8922 715b9eec6da9
parent 8921 da46624a0880
child 8959 69a78922114b
--- a/server/test/unittest_ldapsource.py	Wed Apr 24 17:57:14 2013 +0200
+++ b/server/test/unittest_ldapsource.py	Wed Apr 24 18:11:37 2013 +0200
@@ -35,7 +35,13 @@
 
 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter
 
-CONFIG_LDAPFEED = CONFIG_LDAPUSER = u'''
+CONFIG_LDAPFEED = u'''
+user-base-dn=ou=People,dc=cubicweb,dc=test
+group-base-dn=ou=Group,dc=cubicweb,dc=test
+user-attrs-map=uid=login,mail=email,userPassword=upassword
+group-attrs-map=cn=name,memberUid=member
+'''
+CONFIG_LDAPUSER = u'''
 user-base-dn=ou=People,dc=cubicweb,dc=test
 user-attrs-map=uid=login,mail=email,userPassword=upassword
 '''
@@ -351,6 +357,77 @@
         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
         self.assertTrue(self.repo.connect('syt', password='syt'))
 
+class LDAPFeedGroupTC(LDAPFeedTestBase):
+    """
+    A testcase for group support in ldapfeed.
+    """
+
+    def test_groups_exist(self):
+        rset = self.sexecute('CWGroup X WHERE X name "dir"')
+        self.assertEqual(len(rset), 1)
+
+        rset = self.sexecute('CWGroup X WHERE X cw_source S, S name "ldap"')
+        self.assertEqual(len(rset), 2)
+
+    def test_group_deleted(self):
+        rset = self.sexecute('CWGroup X WHERE X name "dir"')
+        self.assertEqual(len(rset), 1)
+
+    def test_in_group(self):
+        rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
+        dirgroup = rset.get_entity(0, 0)
+        self.assertEqual(set(['syt', 'adim']),
+                         set([u.login for u in dirgroup.reverse_in_group]))
+        rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
+        logilabgroup = rset.get_entity(0, 0)
+        self.assertEqual(set(['adim']),
+                         set([u.login for u in logilabgroup.reverse_in_group]))
+
+    def test_group_member_added(self):
+        self.pull()
+        rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
+                             {'name': 'logilab'})
+        self.assertEqual(len(rset), 1)
+        self.assertEqual(rset[0][0], 'adim')
+
+        try:
+            self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
+                                   {('add', 'memberUid'): ['syt']})
+            time.sleep(1.1) # timestamps precision is 1s
+            self.pull()
+
+            rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
+                                 {'name': 'logilab'})
+            self.assertEqual(len(rset), 2)
+            self.assertEqual(rset[0][0], 'adim')
+            self.assertEqual(rset[1][0], 'syt')
+
+        finally:
+            # back to normal ldap setup
+            self.tearDownClass()
+            self.setUpClass()
+
+    def test_group_member_deleted(self):
+        self.pull() # ensure we are sync'ed
+        rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
+                             {'name': 'logilab'})
+        self.assertEqual(len(rset), 1)
+        self.assertEqual(rset[0][0], 'adim')
+
+        try:
+            self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
+                                   {('delete', 'memberUid'): ['adim']})
+            time.sleep(1.1) # timestamps precision is 1s
+            self.pull()
+
+            rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
+                                 {'name': 'logilab'})
+            self.assertEqual(len(rset), 0)
+        finally:
+            # back to normal ldap setup
+            self.tearDownClass()
+            self.setUpClass()
+
 
 class LDAPUserSourceTC(LDAPFeedTestBase):
     test_db_id = 'ldap-user'