diff -r 192a748550c7 -r 030463e2b620 server/test/unittest_ldapsource.py --- a/server/test/unittest_ldapsource.py Tue Apr 23 17:58:20 2013 +0200 +++ b/server/test/unittest_ldapsource.py Tue Apr 23 16:50:18 2013 +0200 @@ -95,7 +95,9 @@ sys.stderr.write(stderr) config.info('DONE') -class LDAPTestBase(CubicWebTC): + +class LDAPFeedTestBase(CubicWebTC): + test_db_id = 'ldap-feed' loglevel = 'ERROR' @classmethod @@ -112,12 +114,75 @@ except: pass -class CheckWrongGroup(LDAPTestBase): + @classmethod + def pre_setup_database(cls, session, config): + session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed', + url=URL, config=CONFIG_LDAPFEED) + + session.commit() + return cls._pull(session) + + @classmethod + def _pull(cls, session): + with session.repo.internal_session() as isession: + lfsource = isession.repo.sources_by_uri['ldap'] + stats = lfsource.pull_data(isession, force=True, raise_on_error=True) + isession.commit() + return stats + + def pull(self): + return self._pull(self.session) + + def setup_database(self): + if self.test_db_id == 'ldap-feed': + with self.session.repo.internal_session(safe=True) as session: + session.execute('DELETE Any E WHERE E cw_source S, S name "ldap"') + session.commit() + if self.test_db_id == 'ldap-feed': + src = self.sexecute('CWSource S WHERE S name "ldap"').get_entity(0,0) + src.cw_set(config=CONFIG_LDAPFEED) + self.session.commit() + self.pull() + + def delete_ldap_entry(self, dn): + """ + delete an LDAP entity + """ + modcmd = ['dn: %s'%dn, 'changetype: delete'] + self._ldapmodify(modcmd) + + def update_ldap_entry(self, dn, mods): + """ + modify one or more attributes of an LDAP entity + """ + modcmd = ['dn: %s'%dn, 'changetype: modify'] + for (kind, key), values in mods.iteritems(): + modcmd.append('%s: %s' % (kind, key)) + if isinstance(values, basestring): + values = [values] + for value in values: + modcmd.append('%s: %s'%(key, value)) + modcmd.append('-') + self._ldapmodify(modcmd) + + def _ldapmodify(self, modcmd): + uri = self.repo.sources_by_uri['ldap'].urls[0] + updatecmd = ['ldapmodify', '-H', uri, '-v', '-x', '-D', + 'cn=admin,dc=cubicweb,dc=test', '-w', 'cw'] + PIPE = subprocess.PIPE + p = subprocess.Popen(updatecmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) + p.stdin.write('\n'.join(modcmd)) + p.stdin.close() + if p.wait(): + raise RuntimeError("ldap update failed: %s"%('\n'.join(p.stderr.readlines()))) + +class CheckWrongGroup(LDAPFeedTestBase): + """ + A testcase for situations where the default group for CWUser + created from LDAP is wrongly configured. + """ def test_wrong_group(self): - self.session.create_entity('CWSource', name=u'ldapfeed', type=u'ldapfeed', parser=u'ldapfeed', - url=URL, config=CONFIG_LDAPFEED) - self.commit() with self.session.repo.internal_session(safe=True) as session: source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) config = source.repo_source.check_config(source) @@ -130,102 +195,11 @@ session.commit() -class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase): - test_db_id = 'ldap-feed' - @classmethod - def pre_setup_database(cls, session, config): - session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed', - url=URL, config=CONFIG_LDAPFEED) - session.commit() - with session.repo.internal_session(safe=True) as isession: - lfsource = isession.repo.sources_by_uri['ldap'] - stats = lfsource.pull_data(isession, force=True, raise_on_error=True) - - def _pull(self): - with self.session.repo.internal_session() as isession: - lfsource = isession.repo.sources_by_uri['ldap'] - stats = lfsource.pull_data(isession, force=True, raise_on_error=True) - isession.commit() - - def test_a_filter_inactivate(self): - """ filtered out people should be deactivated, unable to authenticate """ - source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) - config = source.repo_source.check_config(source) - # filter with adim's phone number - config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109') - source.repo_source.update_config(source, config) - self.commit() - self._pull() - self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') - self.assertEqual(self.execute('Any N WHERE U login "syt", ' - 'U in_state S, S name N').rows[0][0], - 'deactivated') - self.assertEqual(self.execute('Any N WHERE U login "adim", ' - 'U in_state S, S name N').rows[0][0], - 'activated') - # unfilter, syt should be activated again - config['user-filter'] = u'' - source.repo_source.update_config(source, config) - self.commit() - self._pull() - self.assertEqual(self.execute('Any N WHERE U login "syt", ' - 'U in_state S, S name N').rows[0][0], - 'activated') - self.assertEqual(self.execute('Any N WHERE U login "adim", ' - 'U in_state S, S name N').rows[0][0], - 'activated') - - def test_delete(self): - """ delete syt, pull, check deactivation, repull, - readd syt, pull, check activation - """ - uri = self.repo.sources_by_uri['ldap'].urls[0] - deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' " - "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri) - os.system(deletecmd) - self._pull() - self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') - self.assertEqual(self.execute('Any N WHERE U login "syt", ' - 'U in_state S, S name N').rows[0][0], - 'deactivated') - # check that it doesn't choke - self._pull() - # reset the fscking ldap thing - self.tearDownClass() - self.setUpClass() - self._pull() - self.assertEqual(self.execute('Any N WHERE U login "syt", ' - 'U in_state S, S name N').rows[0][0], - 'activated') - # test reactivating the user isn't enough to authenticate, as the native source - # refuse to authenticate user from other sources - os.system(deletecmd) - self._pull() - user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) - user.cw_adapt_to('IWorkflowable').fire_transition('activate') - self.commit() - self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') - - -class LDAPFeedSourceTC(LDAPTestBase): - test_db_id = 'ldap-feed' - - @classmethod - def pre_setup_database(cls, session, config): - session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed', - url=URL, config=CONFIG_LDAPFEED) - session.commit() - isession = session.repo.internal_session(safe=True) - lfsource = isession.repo.sources_by_uri['ldap'] - stats = lfsource.pull_data(isession, force=True, raise_on_error=True) - - def setUp(self): - super(LDAPFeedSourceTC, self).setUp() - # ldap source url in the database may use a different port as the one - # just attributed - lfsource = self.repo.sources_by_uri['ldap'] - lfsource.urls = [URL] +class LDAPFeedUserTC(LDAPFeedTestBase): + """ + A testcase for CWUser support in ldapfeed (basic tests and authentication). + """ def assertMetadata(self, entity): self.assertTrue(entity.creation_date) @@ -282,7 +256,81 @@ self.session, 'syt', password='syt')) -class LDAPUserSourceTC(LDAPFeedSourceTC): +class LDAPFeedUserDeletionTC(LDAPFeedTestBase): + """ + A testcase for situations where users are deleted from or + unavailabe in the LDAP database. + """ + def test_a_filter_inactivate(self): + """ filtered out people should be deactivated, unable to authenticate """ + source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) + config = source.repo_source.check_config(source) + # filter with adim's phone number + config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109') + source.repo_source.update_config(source, config) + self.commit() + self.pull() + self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') + self.assertEqual(self.execute('Any N WHERE U login "syt", ' + 'U in_state S, S name N').rows[0][0], + 'deactivated') + self.assertEqual(self.execute('Any N WHERE U login "adim", ' + 'U in_state S, S name N').rows[0][0], + 'activated') + # unfilter, syt should be activated again + config['user-filter'] = u'' + source.repo_source.update_config(source, config) + self.commit() + self.pull() + self.assertEqual(self.execute('Any N WHERE U login "syt", ' + 'U in_state S, S name N').rows[0][0], + 'activated') + self.assertEqual(self.execute('Any N WHERE U login "adim", ' + 'U in_state S, S name N').rows[0][0], + 'activated') + + def test_delete(self): + """ delete syt, pull, check deactivation, repull, + read syt, pull, check activation + """ + self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') + self.pull() + self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') + self.assertEqual(self.execute('Any N WHERE U login "syt", ' + 'U in_state S, S name N').rows[0][0], + 'deactivated') + # check that it doesn't choke + self.pull() + # reset the ldap database + self.tearDownClass() + self.setUpClass() + self.pull() + self.assertEqual(self.execute('Any N WHERE U login "syt", ' + 'U in_state S, S name N').rows[0][0], + 'activated') + + def test_reactivate_deleted(self): + # test reactivating BY HAND the user isn't enough to + # authenticate, as the native source refuse to authenticate + # user from other sources + self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') + self.pull() + # reactivate user (which source is still ldap-feed) + user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) + user.cw_adapt_to('IWorkflowable').fire_transition('activate') + self.commit() + with self.assertRaises(AuthenticationError): + self.repo.connect('syt', password='syt') + + # ok now let's try to make it a system user + self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid}) + self.commit() + # and that we can now authenticate again + self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto') + self.assertTrue(self.repo.connect('syt', password='syt')) + + +class LDAPUserSourceTC(LDAPFeedTestBase): test_db_id = 'ldap-user' tags = CubicWebTC.tags | Tags(('ldap'))