[test/ldap source] better source naming, some cleanups (prepares #2528116)
authorJérôme Roy
Wed, 10 Apr 2013 17:50:11 +0200
changeset 8901 661b6aaac240
parent 8900 010a59e12d89
child 8902 9edfa3660bf4
[test/ldap source] better source naming, some cleanups (prepares #2528116) ldap uri becomes "ldap" in all cases prepare separation of CONFIGs by source type
server/test/unittest_ldapsource.py
--- a/server/test/unittest_ldapsource.py	Tue Apr 23 15:33:50 2013 +0200
+++ b/server/test/unittest_ldapsource.py	Wed Apr 10 17:50:11 2013 +0200
@@ -34,7 +34,8 @@
 
 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter
 
-CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test'
+CONFIG_LDAPFEED = CONFIG_LDAPUSER = u'''user-base-dn=ou=People,dc=cubicweb,dc=test'''
+
 URL = None
 
 def create_slapd_configuration(cls):
@@ -61,7 +62,7 @@
     cls.slapd_process = subprocess.Popen(cmdline)
     time.sleep(0.2)
     if cls.slapd_process.poll() is None:
-        config.info('slapd started with pid %s' % cls.slapd_process.pid)
+        config.info('slapd started with pid %s', cls.slapd_process.pid)
     else:
         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
                                (" ".join(cmdline), os.getcwd()))
@@ -100,8 +101,8 @@
 class CheckWrongGroup(LDAPTestBase):
 
     def test_wrong_group(self):
-        self.session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
-                                   url=URL, config=CONFIG)
+        self.session.create_entity('CWSource', name=u'ldapfeed', type=u'ldapfeed', parser=u'ldapfeed',
+                                   url=URL, config=CONFIG_LDAPFEED)
         self.commit()
         with self.session.repo.internal_session(safe=True) as session:
             source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
@@ -114,21 +115,22 @@
             stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
             session.commit()
 
+
 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase):
     test_db_id = 'ldap-feed'
 
     @classmethod
     def pre_setup_database(cls, session, config):
-        session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
-                              url=URL, config=CONFIG)
+        session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
+                              url=URL, config=CONFIG_LDAPFEED)
         session.commit()
         with session.repo.internal_session(safe=True) as isession:
-            lfsource = isession.repo.sources_by_uri['ldapuser']
+            lfsource = isession.repo.sources_by_uri['ldap']
             stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
 
     def _pull(self):
         with self.session.repo.internal_session() as isession:
-            lfsource = isession.repo.sources_by_uri['ldapuser']
+            lfsource = isession.repo.sources_by_uri['ldap']
             stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
             isession.commit()
 
@@ -164,7 +166,7 @@
         """ delete syt, pull, check deactivation, repull,
         readd syt, pull, check activation
         """
-        uri = self.repo.sources_by_uri['ldapuser'].urls[0]
+        uri = self.repo.sources_by_uri['ldap'].urls[0]
         deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' "
                      "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri)
         os.system(deletecmd)
@@ -191,23 +193,24 @@
         self.commit()
         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
 
+
 class LDAPFeedSourceTC(LDAPTestBase):
     test_db_id = 'ldap-feed'
 
     @classmethod
     def pre_setup_database(cls, session, config):
-        session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
-                              url=URL, config=CONFIG)
+        session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
+                              url=URL, config=CONFIG_LDAPFEED)
         session.commit()
         isession = session.repo.internal_session(safe=True)
-        lfsource = isession.repo.sources_by_uri['ldapuser']
+        lfsource = isession.repo.sources_by_uri['ldap']
         stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
 
     def setUp(self):
         super(LDAPFeedSourceTC, self).setUp()
         # ldap source url in the database may use a different port as the one
         # just attributed
-        lfsource = self.repo.sources_by_uri['ldapuser']
+        lfsource = self.repo.sources_by_uri['ldap']
         lfsource.urls = [URL]
 
     def assertMetadata(self, entity):
@@ -215,7 +218,7 @@
         self.assertTrue(entity.modification_date)
 
     def test_authenticate(self):
-        source = self.repo.sources_by_uri['ldapuser']
+        source = self.repo.sources_by_uri['ldap']
         self.session.set_cnxset()
         # ensure we won't be logged against
         self.assertRaises(AuthenticationError,
@@ -241,7 +244,7 @@
         self.assertEqual(rset.rows, [[e.eid]])
 
     def test_copy_to_system_source(self):
-        source = self.repo.sources_by_uri['ldapuser']
+        source = self.repo.sources_by_uri['ldap']
         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
         self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
         self.commit()
@@ -271,8 +274,8 @@
 
     @classmethod
     def pre_setup_database(cls, session, config):
-        session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
-                              url=URL, config=CONFIG)
+        session.create_entity('CWSource', name=u'ldap', type=u'ldapuser',
+                              url=URL, config=CONFIG_LDAPUSER)
         session.commit()
         # XXX keep it there
         session.execute('CWUser U')
@@ -282,7 +285,7 @@
         self.assertEqual(entity.modification_date, None)
 
     def test_synchronize(self):
-        source = self.repo.sources_by_uri['ldapuser']
+        source = self.repo.sources_by_uri['ldap']
         source.synchronize()
 
     def test_base(self):