# HG changeset patch # User Sylvain Thénault # Date 1448284988 -3600 # Node ID adffe1308a8f0dd1a3b6707a299292c7793331b7 # Parent 1ad66961ce8bbb2d1ffbe301547cecdd4299480d [test/ldap] a bit of pep8 diff -r 1ad66961ce8b -r adffe1308a8f server/test/unittest_ldapsource.py --- a/server/test/unittest_ldapsource.py Wed Jul 29 10:05:37 2015 +0200 +++ b/server/test/unittest_ldapsource.py Mon Nov 23 14:23:08 2015 +0100 @@ -22,16 +22,15 @@ import sys import shutil import time -from os.path import join, exists import subprocess import tempfile +from os.path import join from six import string_types from six.moves import range from cubicweb import AuthenticationError from cubicweb.devtools.testlib import CubicWebTC -from cubicweb.devtools.repotest import RQLGeneratorTC from cubicweb.devtools.httptest import get_available_port @@ -48,6 +47,7 @@ URL = None + def create_slapd_configuration(cls): global URL slapddir = tempfile.mkdtemp('cw-unittest-ldap') @@ -70,11 +70,11 @@ sys.stdout.write(stdout) sys.stderr.write(stderr) - #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f') + # ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f') port = get_available_port(range(9000, 9100)) host = 'localhost:%s' % port ldapuri = 'ldap://%s' % host - cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"] + cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"] config.info('Starting slapd:', ' '.join(cmdline)) PIPE = subprocess.PIPE cls.slapd_process = subprocess.Popen(cmdline, stdout=PIPE, stderr=PIPE) @@ -87,6 +87,7 @@ URL = u'ldap://%s' % host return slapddir + def terminate_slapd(cls): config = cls.config if cls.slapd_process and cls.slapd_process.returncode is None: @@ -94,7 +95,7 @@ if hasattr(cls.slapd_process, 'terminate'): cls.slapd_process.terminate() else: - import os, signal + import signal os.kill(cls.slapd_process.pid, signal.SIGTERM) stdout, stderr = cls.slapd_process.communicate() if cls.slapd_process.returncode: @@ -143,7 +144,7 @@ cnx.execute('DELETE Any E WHERE E cw_source S, S name "ldap"') cnx.execute('SET S config %(conf)s, S url %(url)s ' 'WHERE S is CWSource, S name "ldap"', - {"conf": CONFIG_LDAPFEED, 'url': URL} ) + {"conf": CONFIG_LDAPFEED, 'url': URL}) cnx.commit() with self.repo.internal_cnx() as cnx: self.pull(cnx) @@ -152,32 +153,32 @@ """ add an LDAP entity """ - modcmd = ['dn: %s'%dn, 'changetype: add'] + modcmd = ['dn: %s' % dn, 'changetype: add'] for key, values in mods.items(): if isinstance(values, string_types): values = [values] for value in values: - modcmd.append('%s: %s'%(key, value)) + modcmd.append('%s: %s' % (key, value)) self._ldapmodify(modcmd) def delete_ldap_entry(self, dn): """ delete an LDAP entity """ - modcmd = ['dn: %s'%dn, 'changetype: delete'] + modcmd = ['dn: %s' % dn, 'changetype: delete'] self._ldapmodify(modcmd) def update_ldap_entry(self, dn, mods): """ modify one or more attributes of an LDAP entity """ - modcmd = ['dn: %s'%dn, 'changetype: modify'] + modcmd = ['dn: %s' % dn, 'changetype: modify'] for (kind, key), values in mods.items(): modcmd.append('%s: %s' % (kind, key)) if isinstance(values, string_types): values = [values] for value in values: - modcmd.append('%s: %s'%(key, value)) + modcmd.append('%s: %s' % (key, value)) modcmd.append('-') self._ldapmodify(modcmd) @@ -190,7 +191,8 @@ p.stdin.write('\n'.join(modcmd).encode('ascii')) p.stdin.close() if p.wait(): - raise RuntimeError("ldap update failed: %s"%('\n'.join(p.stderr.readlines()))) + raise RuntimeError("ldap update failed: %s" % ('\n'.join(p.stderr.readlines()))) + class CheckWrongGroup(LDAPFeedTestBase): """ @@ -200,18 +202,17 @@ def test_wrong_group(self): with self.admin_access.repo_cnx() as cnx: - source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) + source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0) config = source.repo_source.check_config(source) # inject a bogus group here, along with at least a valid one - config['user-default-group'] = ('thisgroupdoesnotexists','users') + config['user-default-group'] = ('thisgroupdoesnotexists', 'users') source.repo_source.update_config(source, config) cnx.commit() # here we emitted an error log entry - stats = source.repo_source.pull_data(cnx, force=True, raise_on_error=True) + source.repo_source.pull_data(cnx, force=True, raise_on_error=True) cnx.commit() - class LDAPFeedUserTC(LDAPFeedTestBase): """ A testcase for CWUser support in ldapfeed (basic tests and authentication). @@ -288,12 +289,12 @@ # and that the password stored in the system source is not empty or so user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) user.cw_clear_all_caches() - pwd = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0] + cu = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';") + pwd = cu.fetchall()[0][0] self.assertIsNotNone(pwd) self.assertTrue(str(pwd)) - class LDAPFeedUserDeletionTC(LDAPFeedTestBase): """ A testcase for situations where users are deleted from or @@ -303,7 +304,7 @@ def test_a_filter_inactivate(self): """ filtered out people should be deactivated, unable to authenticate """ with self.admin_access.repo_cnx() as cnx: - source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) + source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0) config = source.repo_source.check_config(source) # filter with adim's phone number config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109') @@ -350,21 +351,22 @@ self.pull(cnx) # reinsert syt self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test', - { 'objectClass': ['OpenLDAPperson','posixAccount','top','shadowAccount'], - 'cn': 'Sylvain Thenault', - 'sn': 'Thenault', - 'gidNumber': '1004', - 'uid': 'syt', - 'homeDirectory': '/home/syt', - 'shadowFlag': '134538764', - 'uidNumber': '1004', - 'givenName': 'Sylvain', - 'telephoneNumber': '106', - 'displayName': 'sthenault', - 'gecos': 'Sylvain Thenault', - 'mail': ['sylvain.thenault@logilab.fr','syt@logilab.fr'], - 'userPassword': 'syt', - }) + {'objectClass': ['OpenLDAPperson', 'posixAccount', 'top', + 'shadowAccount'], + 'cn': 'Sylvain Thenault', + 'sn': 'Thenault', + 'gidNumber': '1004', + 'uid': 'syt', + 'homeDirectory': '/home/syt', + 'shadowFlag': '134538764', + 'uidNumber': '1004', + 'givenName': 'Sylvain', + 'telephoneNumber': '106', + 'displayName': 'sthenault', + 'gecos': 'Sylvain Thenault', + 'mail': ['sylvain.thenault@logilab.fr', 'syt@logilab.fr'], + 'userPassword': 'syt', + }) with self.repo.internal_cnx() as cnx: self.pull(cnx) with self.admin_access.repo_cnx() as cnx: @@ -437,7 +439,7 @@ try: self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test', - {('add', 'memberUid'): ['syt']}) + {('add', 'memberUid'): ['syt']}) time.sleep(1.1) # timestamps precision is 1s with self.repo.internal_cnx() as cnx: self.pull(cnx) @@ -456,7 +458,7 @@ def test_group_member_deleted(self): with self.repo.internal_cnx() as cnx: - self.pull(cnx) # ensure we are sync'ed + self.pull(cnx) # ensure we are sync'ed with self.admin_access.repo_cnx() as cnx: rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L', {'name': 'logilab'}) @@ -480,7 +482,6 @@ self.setUpClass() - if __name__ == '__main__': from logilab.common.testlib import unittest_main unittest_main()