--- a/server/test/unittest_ldapsource.py Tue Apr 23 17:58:20 2013 +0200
+++ b/server/test/unittest_ldapsource.py Tue Apr 23 16:50:18 2013 +0200
@@ -95,7 +95,9 @@
sys.stderr.write(stderr)
config.info('DONE')
-class LDAPTestBase(CubicWebTC):
+
+class LDAPFeedTestBase(CubicWebTC):
+ test_db_id = 'ldap-feed'
loglevel = 'ERROR'
@classmethod
@@ -112,12 +114,75 @@
except:
pass
-class CheckWrongGroup(LDAPTestBase):
+ @classmethod
+ def pre_setup_database(cls, session, config):
+ session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
+ url=URL, config=CONFIG_LDAPFEED)
+
+ session.commit()
+ return cls._pull(session)
+
+ @classmethod
+ def _pull(cls, session):
+ with session.repo.internal_session() as isession:
+ lfsource = isession.repo.sources_by_uri['ldap']
+ stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
+ isession.commit()
+ return stats
+
+ def pull(self):
+ return self._pull(self.session)
+
+ def setup_database(self):
+ if self.test_db_id == 'ldap-feed':
+ with self.session.repo.internal_session(safe=True) as session:
+ session.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
+ session.commit()
+ if self.test_db_id == 'ldap-feed':
+ src = self.sexecute('CWSource S WHERE S name "ldap"').get_entity(0,0)
+ src.cw_set(config=CONFIG_LDAPFEED)
+ self.session.commit()
+ self.pull()
+
+ def delete_ldap_entry(self, dn):
+ """
+ delete an LDAP entity
+ """
+ modcmd = ['dn: %s'%dn, 'changetype: delete']
+ self._ldapmodify(modcmd)
+
+ def update_ldap_entry(self, dn, mods):
+ """
+ modify one or more attributes of an LDAP entity
+ """
+ modcmd = ['dn: %s'%dn, 'changetype: modify']
+ for (kind, key), values in mods.iteritems():
+ modcmd.append('%s: %s' % (kind, key))
+ if isinstance(values, basestring):
+ values = [values]
+ for value in values:
+ modcmd.append('%s: %s'%(key, value))
+ modcmd.append('-')
+ self._ldapmodify(modcmd)
+
+ def _ldapmodify(self, modcmd):
+ uri = self.repo.sources_by_uri['ldap'].urls[0]
+ updatecmd = ['ldapmodify', '-H', uri, '-v', '-x', '-D',
+ 'cn=admin,dc=cubicweb,dc=test', '-w', 'cw']
+ PIPE = subprocess.PIPE
+ p = subprocess.Popen(updatecmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ p.stdin.write('\n'.join(modcmd))
+ p.stdin.close()
+ if p.wait():
+ raise RuntimeError("ldap update failed: %s"%('\n'.join(p.stderr.readlines())))
+
+class CheckWrongGroup(LDAPFeedTestBase):
+ """
+ A testcase for situations where the default group for CWUser
+ created from LDAP is wrongly configured.
+ """
def test_wrong_group(self):
- self.session.create_entity('CWSource', name=u'ldapfeed', type=u'ldapfeed', parser=u'ldapfeed',
- url=URL, config=CONFIG_LDAPFEED)
- self.commit()
with self.session.repo.internal_session(safe=True) as session:
source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
config = source.repo_source.check_config(source)
@@ -130,102 +195,11 @@
session.commit()
-class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase):
- test_db_id = 'ldap-feed'
- @classmethod
- def pre_setup_database(cls, session, config):
- session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
- url=URL, config=CONFIG_LDAPFEED)
- session.commit()
- with session.repo.internal_session(safe=True) as isession:
- lfsource = isession.repo.sources_by_uri['ldap']
- stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
-
- def _pull(self):
- with self.session.repo.internal_session() as isession:
- lfsource = isession.repo.sources_by_uri['ldap']
- stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
- isession.commit()
-
- def test_a_filter_inactivate(self):
- """ filtered out people should be deactivated, unable to authenticate """
- source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
- config = source.repo_source.check_config(source)
- # filter with adim's phone number
- config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
- source.repo_source.update_config(source, config)
- self.commit()
- self._pull()
- self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
- self.assertEqual(self.execute('Any N WHERE U login "syt", '
- 'U in_state S, S name N').rows[0][0],
- 'deactivated')
- self.assertEqual(self.execute('Any N WHERE U login "adim", '
- 'U in_state S, S name N').rows[0][0],
- 'activated')
- # unfilter, syt should be activated again
- config['user-filter'] = u''
- source.repo_source.update_config(source, config)
- self.commit()
- self._pull()
- self.assertEqual(self.execute('Any N WHERE U login "syt", '
- 'U in_state S, S name N').rows[0][0],
- 'activated')
- self.assertEqual(self.execute('Any N WHERE U login "adim", '
- 'U in_state S, S name N').rows[0][0],
- 'activated')
-
- def test_delete(self):
- """ delete syt, pull, check deactivation, repull,
- readd syt, pull, check activation
- """
- uri = self.repo.sources_by_uri['ldap'].urls[0]
- deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' "
- "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri)
- os.system(deletecmd)
- self._pull()
- self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
- self.assertEqual(self.execute('Any N WHERE U login "syt", '
- 'U in_state S, S name N').rows[0][0],
- 'deactivated')
- # check that it doesn't choke
- self._pull()
- # reset the fscking ldap thing
- self.tearDownClass()
- self.setUpClass()
- self._pull()
- self.assertEqual(self.execute('Any N WHERE U login "syt", '
- 'U in_state S, S name N').rows[0][0],
- 'activated')
- # test reactivating the user isn't enough to authenticate, as the native source
- # refuse to authenticate user from other sources
- os.system(deletecmd)
- self._pull()
- user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
- user.cw_adapt_to('IWorkflowable').fire_transition('activate')
- self.commit()
- self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
-
-
-class LDAPFeedSourceTC(LDAPTestBase):
- test_db_id = 'ldap-feed'
-
- @classmethod
- def pre_setup_database(cls, session, config):
- session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
- url=URL, config=CONFIG_LDAPFEED)
- session.commit()
- isession = session.repo.internal_session(safe=True)
- lfsource = isession.repo.sources_by_uri['ldap']
- stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
-
- def setUp(self):
- super(LDAPFeedSourceTC, self).setUp()
- # ldap source url in the database may use a different port as the one
- # just attributed
- lfsource = self.repo.sources_by_uri['ldap']
- lfsource.urls = [URL]
+class LDAPFeedUserTC(LDAPFeedTestBase):
+ """
+ A testcase for CWUser support in ldapfeed (basic tests and authentication).
+ """
def assertMetadata(self, entity):
self.assertTrue(entity.creation_date)
@@ -282,7 +256,81 @@
self.session, 'syt', password='syt'))
-class LDAPUserSourceTC(LDAPFeedSourceTC):
+class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
+ """
+ A testcase for situations where users are deleted from or
+ unavailabe in the LDAP database.
+ """
+ def test_a_filter_inactivate(self):
+ """ filtered out people should be deactivated, unable to authenticate """
+ source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+ config = source.repo_source.check_config(source)
+ # filter with adim's phone number
+ config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
+ source.repo_source.update_config(source, config)
+ self.commit()
+ self.pull()
+ self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
+ self.assertEqual(self.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'deactivated')
+ self.assertEqual(self.execute('Any N WHERE U login "adim", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
+ # unfilter, syt should be activated again
+ config['user-filter'] = u''
+ source.repo_source.update_config(source, config)
+ self.commit()
+ self.pull()
+ self.assertEqual(self.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
+ self.assertEqual(self.execute('Any N WHERE U login "adim", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
+
+ def test_delete(self):
+ """ delete syt, pull, check deactivation, repull,
+ read syt, pull, check activation
+ """
+ self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
+ self.pull()
+ self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
+ self.assertEqual(self.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'deactivated')
+ # check that it doesn't choke
+ self.pull()
+ # reset the ldap database
+ self.tearDownClass()
+ self.setUpClass()
+ self.pull()
+ self.assertEqual(self.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
+
+ def test_reactivate_deleted(self):
+ # test reactivating BY HAND the user isn't enough to
+ # authenticate, as the native source refuse to authenticate
+ # user from other sources
+ self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
+ self.pull()
+ # reactivate user (which source is still ldap-feed)
+ user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
+ user.cw_adapt_to('IWorkflowable').fire_transition('activate')
+ self.commit()
+ with self.assertRaises(AuthenticationError):
+ self.repo.connect('syt', password='syt')
+
+ # ok now let's try to make it a system user
+ self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
+ self.commit()
+ # and that we can now authenticate again
+ self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
+ self.assertTrue(self.repo.connect('syt', password='syt'))
+
+
+class LDAPUserSourceTC(LDAPFeedTestBase):
test_db_id = 'ldap-user'
tags = CubicWebTC.tags | Tags(('ldap'))