--- a/server/test/unittest_ldapsource.py Tue Apr 23 15:33:50 2013 +0200
+++ b/server/test/unittest_ldapsource.py Wed Apr 10 17:50:11 2013 +0200
@@ -34,7 +34,8 @@
from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter
-CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test'
+CONFIG_LDAPFEED = CONFIG_LDAPUSER = u'''user-base-dn=ou=People,dc=cubicweb,dc=test'''
+
URL = None
def create_slapd_configuration(cls):
@@ -61,7 +62,7 @@
cls.slapd_process = subprocess.Popen(cmdline)
time.sleep(0.2)
if cls.slapd_process.poll() is None:
- config.info('slapd started with pid %s' % cls.slapd_process.pid)
+ config.info('slapd started with pid %s', cls.slapd_process.pid)
else:
raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
(" ".join(cmdline), os.getcwd()))
@@ -100,8 +101,8 @@
class CheckWrongGroup(LDAPTestBase):
def test_wrong_group(self):
- self.session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
- url=URL, config=CONFIG)
+ self.session.create_entity('CWSource', name=u'ldapfeed', type=u'ldapfeed', parser=u'ldapfeed',
+ url=URL, config=CONFIG_LDAPFEED)
self.commit()
with self.session.repo.internal_session(safe=True) as session:
source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
@@ -114,21 +115,22 @@
stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
session.commit()
+
class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase):
test_db_id = 'ldap-feed'
@classmethod
def pre_setup_database(cls, session, config):
- session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
- url=URL, config=CONFIG)
+ session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
+ url=URL, config=CONFIG_LDAPFEED)
session.commit()
with session.repo.internal_session(safe=True) as isession:
- lfsource = isession.repo.sources_by_uri['ldapuser']
+ lfsource = isession.repo.sources_by_uri['ldap']
stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
def _pull(self):
with self.session.repo.internal_session() as isession:
- lfsource = isession.repo.sources_by_uri['ldapuser']
+ lfsource = isession.repo.sources_by_uri['ldap']
stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
isession.commit()
@@ -164,7 +166,7 @@
""" delete syt, pull, check deactivation, repull,
readd syt, pull, check activation
"""
- uri = self.repo.sources_by_uri['ldapuser'].urls[0]
+ uri = self.repo.sources_by_uri['ldap'].urls[0]
deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' "
"-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri)
os.system(deletecmd)
@@ -191,23 +193,24 @@
self.commit()
self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
+
class LDAPFeedSourceTC(LDAPTestBase):
test_db_id = 'ldap-feed'
@classmethod
def pre_setup_database(cls, session, config):
- session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
- url=URL, config=CONFIG)
+ session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
+ url=URL, config=CONFIG_LDAPFEED)
session.commit()
isession = session.repo.internal_session(safe=True)
- lfsource = isession.repo.sources_by_uri['ldapuser']
+ lfsource = isession.repo.sources_by_uri['ldap']
stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
def setUp(self):
super(LDAPFeedSourceTC, self).setUp()
# ldap source url in the database may use a different port as the one
# just attributed
- lfsource = self.repo.sources_by_uri['ldapuser']
+ lfsource = self.repo.sources_by_uri['ldap']
lfsource.urls = [URL]
def assertMetadata(self, entity):
@@ -215,7 +218,7 @@
self.assertTrue(entity.modification_date)
def test_authenticate(self):
- source = self.repo.sources_by_uri['ldapuser']
+ source = self.repo.sources_by_uri['ldap']
self.session.set_cnxset()
# ensure we won't be logged against
self.assertRaises(AuthenticationError,
@@ -241,7 +244,7 @@
self.assertEqual(rset.rows, [[e.eid]])
def test_copy_to_system_source(self):
- source = self.repo.sources_by_uri['ldapuser']
+ source = self.repo.sources_by_uri['ldap']
eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
self.commit()
@@ -271,8 +274,8 @@
@classmethod
def pre_setup_database(cls, session, config):
- session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
- url=URL, config=CONFIG)
+ session.create_entity('CWSource', name=u'ldap', type=u'ldapuser',
+ url=URL, config=CONFIG_LDAPUSER)
session.commit()
# XXX keep it there
session.execute('CWUser U')
@@ -282,7 +285,7 @@
self.assertEqual(entity.modification_date, None)
def test_synchronize(self):
- source = self.repo.sources_by_uri['ldapuser']
+ source = self.repo.sources_by_uri['ldap']
source.synchronize()
def test_base(self):