[ldap] Major test refactoring
authorPierre-Yves David <pierre-yves.david@logilab.fr>
Tue, 23 Apr 2013 16:50:18 +0200
changeset 8904 030463e2b620
parent 8903 192a748550c7
child 8905 c8fc3e3b46e4
[ldap] Major test refactoring Mostly fix tests (previous refactoring), but also refactor a bit the base class to have better/simpler interaction with the slapd server.
server/test/data/ldap_test.ldif
server/test/unittest_ldapsource.py
--- a/server/test/data/ldap_test.ldif	Tue Apr 23 17:58:20 2013 +0200
+++ b/server/test/data/ldap_test.ldif	Tue Apr 23 16:50:18 2013 +0200
@@ -12,11 +12,10 @@
 
 dn: uid=syt,ou=People,dc=cubicweb,dc=test
 loginShell: /bin/bash
-objectClass: inetOrgPerson
+objectClass: OpenLDAPperson
 objectClass: posixAccount
 objectClass: top
 objectClass: shadowAccount
-structuralObjectClass: inetOrgPerson
 cn: Sylvain Thenault
 sn: Thenault
 shadowMax: 99999
@@ -35,7 +34,7 @@
 
 dn: uid=adim,ou=People,dc=cubicweb,dc=test
 loginShell: /bin/bash
-objectClass: inetOrgPerson
+objectClass: OpenLDAPperson
 objectClass: posixAccount
 objectClass: top
 objectClass: shadowAccount
@@ -46,7 +45,6 @@
 uid: adim
 homeDirectory: /home/adim
 uidNumber: 1006
-structuralObjectClass: inetOrgPerson
 givenName: Adrien
 telephoneNumber: 109
 displayName: adimascio
--- a/server/test/unittest_ldapsource.py	Tue Apr 23 17:58:20 2013 +0200
+++ b/server/test/unittest_ldapsource.py	Tue Apr 23 16:50:18 2013 +0200
@@ -95,7 +95,9 @@
             sys.stderr.write(stderr)
         config.info('DONE')
 
-class LDAPTestBase(CubicWebTC):
+
+class LDAPFeedTestBase(CubicWebTC):
+    test_db_id = 'ldap-feed'
     loglevel = 'ERROR'
 
     @classmethod
@@ -112,12 +114,75 @@
         except:
             pass
 
-class CheckWrongGroup(LDAPTestBase):
+    @classmethod
+    def pre_setup_database(cls, session, config):
+        session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
+                              url=URL, config=CONFIG_LDAPFEED)
+
+        session.commit()
+        return cls._pull(session)
+
+    @classmethod
+    def _pull(cls, session):
+        with session.repo.internal_session() as isession:
+            lfsource = isession.repo.sources_by_uri['ldap']
+            stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
+            isession.commit()
+            return stats
+
+    def pull(self):
+        return self._pull(self.session)
+
+    def setup_database(self):
+        if self.test_db_id == 'ldap-feed':
+            with self.session.repo.internal_session(safe=True) as session:
+                session.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
+                session.commit()
+        if self.test_db_id == 'ldap-feed':
+            src = self.sexecute('CWSource S WHERE S name "ldap"').get_entity(0,0)
+            src.cw_set(config=CONFIG_LDAPFEED)
+        self.session.commit()
+        self.pull()
+
+    def delete_ldap_entry(self, dn):
+        """
+        delete an LDAP entity
+        """
+        modcmd = ['dn: %s'%dn, 'changetype: delete']
+        self._ldapmodify(modcmd)
+
+    def update_ldap_entry(self, dn, mods):
+        """
+        modify one or more attributes of an LDAP entity
+        """
+        modcmd = ['dn: %s'%dn, 'changetype: modify']
+        for (kind, key), values in mods.iteritems():
+            modcmd.append('%s: %s' % (kind, key))
+            if isinstance(values, basestring):
+                values = [values]
+            for value in values:
+                modcmd.append('%s: %s'%(key, value))
+            modcmd.append('-')
+        self._ldapmodify(modcmd)
+
+    def _ldapmodify(self, modcmd):
+        uri = self.repo.sources_by_uri['ldap'].urls[0]
+        updatecmd = ['ldapmodify', '-H', uri, '-v', '-x', '-D',
+                     'cn=admin,dc=cubicweb,dc=test', '-w', 'cw']
+        PIPE = subprocess.PIPE
+        p = subprocess.Popen(updatecmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+        p.stdin.write('\n'.join(modcmd))
+        p.stdin.close()
+        if p.wait():
+            raise RuntimeError("ldap update failed: %s"%('\n'.join(p.stderr.readlines())))
+
+class CheckWrongGroup(LDAPFeedTestBase):
+    """
+    A testcase for situations where the default group for CWUser
+    created from LDAP is wrongly configured.
+    """
 
     def test_wrong_group(self):
-        self.session.create_entity('CWSource', name=u'ldapfeed', type=u'ldapfeed', parser=u'ldapfeed',
-                                   url=URL, config=CONFIG_LDAPFEED)
-        self.commit()
         with self.session.repo.internal_session(safe=True) as session:
             source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
             config = source.repo_source.check_config(source)
@@ -130,102 +195,11 @@
             session.commit()
 
 
-class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase):
-    test_db_id = 'ldap-feed'
 
-    @classmethod
-    def pre_setup_database(cls, session, config):
-        session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
-                              url=URL, config=CONFIG_LDAPFEED)
-        session.commit()
-        with session.repo.internal_session(safe=True) as isession:
-            lfsource = isession.repo.sources_by_uri['ldap']
-            stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
-
-    def _pull(self):
-        with self.session.repo.internal_session() as isession:
-            lfsource = isession.repo.sources_by_uri['ldap']
-            stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
-            isession.commit()
-
-    def test_a_filter_inactivate(self):
-        """ filtered out people should be deactivated, unable to authenticate """
-        source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
-        config = source.repo_source.check_config(source)
-        # filter with adim's phone number
-        config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
-        source.repo_source.update_config(source, config)
-        self.commit()
-        self._pull()
-        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
-        self.assertEqual(self.execute('Any N WHERE U login "syt", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'deactivated')
-        self.assertEqual(self.execute('Any N WHERE U login "adim", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'activated')
-        # unfilter, syt should be activated again
-        config['user-filter'] = u''
-        source.repo_source.update_config(source, config)
-        self.commit()
-        self._pull()
-        self.assertEqual(self.execute('Any N WHERE U login "syt", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'activated')
-        self.assertEqual(self.execute('Any N WHERE U login "adim", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'activated')
-
-    def test_delete(self):
-        """ delete syt, pull, check deactivation, repull,
-        readd syt, pull, check activation
-        """
-        uri = self.repo.sources_by_uri['ldap'].urls[0]
-        deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' "
-                     "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri)
-        os.system(deletecmd)
-        self._pull()
-        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
-        self.assertEqual(self.execute('Any N WHERE U login "syt", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'deactivated')
-        # check that it doesn't choke
-        self._pull()
-        # reset the fscking ldap thing
-        self.tearDownClass()
-        self.setUpClass()
-        self._pull()
-        self.assertEqual(self.execute('Any N WHERE U login "syt", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'activated')
-        # test reactivating the user isn't enough to authenticate, as the native source
-        # refuse to authenticate user from other sources
-        os.system(deletecmd)
-        self._pull()
-        user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
-        user.cw_adapt_to('IWorkflowable').fire_transition('activate')
-        self.commit()
-        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
-
-
-class LDAPFeedSourceTC(LDAPTestBase):
-    test_db_id = 'ldap-feed'
-
-    @classmethod
-    def pre_setup_database(cls, session, config):
-        session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
-                              url=URL, config=CONFIG_LDAPFEED)
-        session.commit()
-        isession = session.repo.internal_session(safe=True)
-        lfsource = isession.repo.sources_by_uri['ldap']
-        stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
-
-    def setUp(self):
-        super(LDAPFeedSourceTC, self).setUp()
-        # ldap source url in the database may use a different port as the one
-        # just attributed
-        lfsource = self.repo.sources_by_uri['ldap']
-        lfsource.urls = [URL]
+class LDAPFeedUserTC(LDAPFeedTestBase):
+    """
+    A testcase for CWUser support in ldapfeed (basic tests and authentication).
+    """
 
     def assertMetadata(self, entity):
         self.assertTrue(entity.creation_date)
@@ -282,7 +256,81 @@
                 self.session, 'syt', password='syt'))
 
 
-class LDAPUserSourceTC(LDAPFeedSourceTC):
+class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
+    """
+    A testcase for situations where users are deleted from or
+    unavailabe in the LDAP database.
+    """
+    def test_a_filter_inactivate(self):
+        """ filtered out people should be deactivated, unable to authenticate """
+        source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+        config = source.repo_source.check_config(source)
+        # filter with adim's phone number
+        config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
+        source.repo_source.update_config(source, config)
+        self.commit()
+        self.pull()
+        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
+        self.assertEqual(self.execute('Any N WHERE U login "syt", '
+                                      'U in_state S, S name N').rows[0][0],
+                         'deactivated')
+        self.assertEqual(self.execute('Any N WHERE U login "adim", '
+                                      'U in_state S, S name N').rows[0][0],
+                         'activated')
+        # unfilter, syt should be activated again
+        config['user-filter'] = u''
+        source.repo_source.update_config(source, config)
+        self.commit()
+        self.pull()
+        self.assertEqual(self.execute('Any N WHERE U login "syt", '
+                                      'U in_state S, S name N').rows[0][0],
+                         'activated')
+        self.assertEqual(self.execute('Any N WHERE U login "adim", '
+                                      'U in_state S, S name N').rows[0][0],
+                         'activated')
+
+    def test_delete(self):
+        """ delete syt, pull, check deactivation, repull,
+        read syt, pull, check activation
+        """
+        self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
+        self.pull()
+        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
+        self.assertEqual(self.execute('Any N WHERE U login "syt", '
+                                      'U in_state S, S name N').rows[0][0],
+                         'deactivated')
+        # check that it doesn't choke
+        self.pull()
+        # reset the ldap database
+        self.tearDownClass()
+        self.setUpClass()
+        self.pull()
+        self.assertEqual(self.execute('Any N WHERE U login "syt", '
+                                      'U in_state S, S name N').rows[0][0],
+                         'activated')
+
+    def test_reactivate_deleted(self):
+        # test reactivating BY HAND the user isn't enough to
+        # authenticate, as the native source refuse to authenticate
+        # user from other sources
+        self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
+        self.pull()
+        # reactivate user (which source is still ldap-feed)
+        user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
+        user.cw_adapt_to('IWorkflowable').fire_transition('activate')
+        self.commit()
+        with self.assertRaises(AuthenticationError):
+            self.repo.connect('syt', password='syt')
+
+        # ok now let's try to make it a system user
+        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
+        self.commit()
+        # and that we can now authenticate again
+        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
+        self.assertTrue(self.repo.connect('syt', password='syt'))
+
+
+class LDAPUserSourceTC(LDAPFeedTestBase):
     test_db_id = 'ldap-user'
     tags = CubicWebTC.tags | Tags(('ldap'))