[test/ldap] a bit of pep8
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Mon, 23 Nov 2015 14:23:08 +0100
changeset 10905 adffe1308a8f
parent 10904 1ad66961ce8b
child 10906 e1ce0866afe9
[test/ldap] a bit of pep8
server/test/unittest_ldapsource.py
--- a/server/test/unittest_ldapsource.py	Wed Jul 29 10:05:37 2015 +0200
+++ b/server/test/unittest_ldapsource.py	Mon Nov 23 14:23:08 2015 +0100
@@ -22,16 +22,15 @@
 import sys
 import shutil
 import time
-from os.path import join, exists
 import subprocess
 import tempfile
+from os.path import join
 
 from six import string_types
 from six.moves import range
 
 from cubicweb import AuthenticationError
 from cubicweb.devtools.testlib import CubicWebTC
-from cubicweb.devtools.repotest import RQLGeneratorTC
 from cubicweb.devtools.httptest import get_available_port
 
 
@@ -48,6 +47,7 @@
 
 URL = None
 
+
 def create_slapd_configuration(cls):
     global URL
     slapddir = tempfile.mkdtemp('cw-unittest-ldap')
@@ -70,11 +70,11 @@
         sys.stdout.write(stdout)
         sys.stderr.write(stderr)
 
-    #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
+    # ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
     port = get_available_port(range(9000, 9100))
     host = 'localhost:%s' % port
     ldapuri = 'ldap://%s' % host
-    cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
+    cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"]
     config.info('Starting slapd:', ' '.join(cmdline))
     PIPE = subprocess.PIPE
     cls.slapd_process = subprocess.Popen(cmdline, stdout=PIPE, stderr=PIPE)
@@ -87,6 +87,7 @@
     URL = u'ldap://%s' % host
     return slapddir
 
+
 def terminate_slapd(cls):
     config = cls.config
     if cls.slapd_process and cls.slapd_process.returncode is None:
@@ -94,7 +95,7 @@
         if hasattr(cls.slapd_process, 'terminate'):
             cls.slapd_process.terminate()
         else:
-            import os, signal
+            import signal
             os.kill(cls.slapd_process.pid, signal.SIGTERM)
         stdout, stderr = cls.slapd_process.communicate()
         if cls.slapd_process.returncode:
@@ -143,7 +144,7 @@
             cnx.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
             cnx.execute('SET S config %(conf)s, S url %(url)s '
                         'WHERE S is CWSource, S name "ldap"',
-                        {"conf": CONFIG_LDAPFEED, 'url': URL} )
+                        {"conf": CONFIG_LDAPFEED, 'url': URL})
             cnx.commit()
         with self.repo.internal_cnx() as cnx:
             self.pull(cnx)
@@ -152,32 +153,32 @@
         """
         add an LDAP entity
         """
-        modcmd = ['dn: %s'%dn, 'changetype: add']
+        modcmd = ['dn: %s' % dn, 'changetype: add']
         for key, values in mods.items():
             if isinstance(values, string_types):
                 values = [values]
             for value in values:
-                modcmd.append('%s: %s'%(key, value))
+                modcmd.append('%s: %s' % (key, value))
         self._ldapmodify(modcmd)
 
     def delete_ldap_entry(self, dn):
         """
         delete an LDAP entity
         """
-        modcmd = ['dn: %s'%dn, 'changetype: delete']
+        modcmd = ['dn: %s' % dn, 'changetype: delete']
         self._ldapmodify(modcmd)
 
     def update_ldap_entry(self, dn, mods):
         """
         modify one or more attributes of an LDAP entity
         """
-        modcmd = ['dn: %s'%dn, 'changetype: modify']
+        modcmd = ['dn: %s' % dn, 'changetype: modify']
         for (kind, key), values in mods.items():
             modcmd.append('%s: %s' % (kind, key))
             if isinstance(values, string_types):
                 values = [values]
             for value in values:
-                modcmd.append('%s: %s'%(key, value))
+                modcmd.append('%s: %s' % (key, value))
             modcmd.append('-')
         self._ldapmodify(modcmd)
 
@@ -190,7 +191,8 @@
         p.stdin.write('\n'.join(modcmd).encode('ascii'))
         p.stdin.close()
         if p.wait():
-            raise RuntimeError("ldap update failed: %s"%('\n'.join(p.stderr.readlines())))
+            raise RuntimeError("ldap update failed: %s" % ('\n'.join(p.stderr.readlines())))
+
 
 class CheckWrongGroup(LDAPFeedTestBase):
     """
@@ -200,18 +202,17 @@
 
     def test_wrong_group(self):
         with self.admin_access.repo_cnx() as cnx:
-            source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+            source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0)
             config = source.repo_source.check_config(source)
             # inject a bogus group here, along with at least a valid one
-            config['user-default-group'] = ('thisgroupdoesnotexists','users')
+            config['user-default-group'] = ('thisgroupdoesnotexists', 'users')
             source.repo_source.update_config(source, config)
             cnx.commit()
             # here we emitted an error log entry
-            stats = source.repo_source.pull_data(cnx, force=True, raise_on_error=True)
+            source.repo_source.pull_data(cnx, force=True, raise_on_error=True)
             cnx.commit()
 
 
-
 class LDAPFeedUserTC(LDAPFeedTestBase):
     """
     A testcase for CWUser support in ldapfeed (basic tests and authentication).
@@ -288,12 +289,12 @@
             # and that the password stored in the system source is not empty or so
             user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
             user.cw_clear_all_caches()
-            pwd = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]
+            cu = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';")
+            pwd = cu.fetchall()[0][0]
             self.assertIsNotNone(pwd)
             self.assertTrue(str(pwd))
 
 
-
 class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
     """
     A testcase for situations where users are deleted from or
@@ -303,7 +304,7 @@
     def test_a_filter_inactivate(self):
         """ filtered out people should be deactivated, unable to authenticate """
         with self.admin_access.repo_cnx() as cnx:
-            source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+            source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0)
             config = source.repo_source.check_config(source)
             # filter with adim's phone number
             config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
@@ -350,21 +351,22 @@
             self.pull(cnx)
         # reinsert syt
         self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test',
-                            { 'objectClass': ['OpenLDAPperson','posixAccount','top','shadowAccount'],
-                              'cn': 'Sylvain Thenault',
-                              'sn': 'Thenault',
-                              'gidNumber': '1004',
-                              'uid': 'syt',
-                              'homeDirectory': '/home/syt',
-                              'shadowFlag': '134538764',
-                              'uidNumber': '1004',
-                              'givenName': 'Sylvain',
-                              'telephoneNumber': '106',
-                              'displayName': 'sthenault',
-                              'gecos': 'Sylvain Thenault',
-                              'mail': ['sylvain.thenault@logilab.fr','syt@logilab.fr'],
-                              'userPassword': 'syt',
-                          })
+                            {'objectClass': ['OpenLDAPperson', 'posixAccount', 'top',
+                                             'shadowAccount'],
+                             'cn': 'Sylvain Thenault',
+                             'sn': 'Thenault',
+                             'gidNumber': '1004',
+                             'uid': 'syt',
+                             'homeDirectory': '/home/syt',
+                             'shadowFlag': '134538764',
+                             'uidNumber': '1004',
+                             'givenName': 'Sylvain',
+                             'telephoneNumber': '106',
+                             'displayName': 'sthenault',
+                             'gecos': 'Sylvain Thenault',
+                             'mail': ['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
+                             'userPassword': 'syt',
+                             })
         with self.repo.internal_cnx() as cnx:
             self.pull(cnx)
         with self.admin_access.repo_cnx() as cnx:
@@ -437,7 +439,7 @@
 
         try:
             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
-                                       {('add', 'memberUid'): ['syt']})
+                                   {('add', 'memberUid'): ['syt']})
             time.sleep(1.1) # timestamps precision is 1s
             with self.repo.internal_cnx() as cnx:
                 self.pull(cnx)
@@ -456,7 +458,7 @@
 
     def test_group_member_deleted(self):
         with self.repo.internal_cnx() as cnx:
-            self.pull(cnx) # ensure we are sync'ed
+            self.pull(cnx)  # ensure we are sync'ed
         with self.admin_access.repo_cnx() as cnx:
             rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
                                {'name': 'logilab'})
@@ -480,7 +482,6 @@
             self.setUpClass()
 
 
-
 if __name__ == '__main__':
     from logilab.common.testlib import unittest_main
     unittest_main()