--- a/server/test/unittest_ldapsource.py Wed Jul 29 10:05:37 2015 +0200
+++ b/server/test/unittest_ldapsource.py Mon Nov 23 14:23:08 2015 +0100
@@ -22,16 +22,15 @@
import sys
import shutil
import time
-from os.path import join, exists
import subprocess
import tempfile
+from os.path import join
from six import string_types
from six.moves import range
from cubicweb import AuthenticationError
from cubicweb.devtools.testlib import CubicWebTC
-from cubicweb.devtools.repotest import RQLGeneratorTC
from cubicweb.devtools.httptest import get_available_port
@@ -48,6 +47,7 @@
URL = None
+
def create_slapd_configuration(cls):
global URL
slapddir = tempfile.mkdtemp('cw-unittest-ldap')
@@ -70,11 +70,11 @@
sys.stdout.write(stdout)
sys.stderr.write(stderr)
- #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
+ # ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
port = get_available_port(range(9000, 9100))
host = 'localhost:%s' % port
ldapuri = 'ldap://%s' % host
- cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"]
+ cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"]
config.info('Starting slapd:', ' '.join(cmdline))
PIPE = subprocess.PIPE
cls.slapd_process = subprocess.Popen(cmdline, stdout=PIPE, stderr=PIPE)
@@ -87,6 +87,7 @@
URL = u'ldap://%s' % host
return slapddir
+
def terminate_slapd(cls):
config = cls.config
if cls.slapd_process and cls.slapd_process.returncode is None:
@@ -94,7 +95,7 @@
if hasattr(cls.slapd_process, 'terminate'):
cls.slapd_process.terminate()
else:
- import os, signal
+ import signal
os.kill(cls.slapd_process.pid, signal.SIGTERM)
stdout, stderr = cls.slapd_process.communicate()
if cls.slapd_process.returncode:
@@ -143,7 +144,7 @@
cnx.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
cnx.execute('SET S config %(conf)s, S url %(url)s '
'WHERE S is CWSource, S name "ldap"',
- {"conf": CONFIG_LDAPFEED, 'url': URL} )
+ {"conf": CONFIG_LDAPFEED, 'url': URL})
cnx.commit()
with self.repo.internal_cnx() as cnx:
self.pull(cnx)
@@ -152,32 +153,32 @@
"""
add an LDAP entity
"""
- modcmd = ['dn: %s'%dn, 'changetype: add']
+ modcmd = ['dn: %s' % dn, 'changetype: add']
for key, values in mods.items():
if isinstance(values, string_types):
values = [values]
for value in values:
- modcmd.append('%s: %s'%(key, value))
+ modcmd.append('%s: %s' % (key, value))
self._ldapmodify(modcmd)
def delete_ldap_entry(self, dn):
"""
delete an LDAP entity
"""
- modcmd = ['dn: %s'%dn, 'changetype: delete']
+ modcmd = ['dn: %s' % dn, 'changetype: delete']
self._ldapmodify(modcmd)
def update_ldap_entry(self, dn, mods):
"""
modify one or more attributes of an LDAP entity
"""
- modcmd = ['dn: %s'%dn, 'changetype: modify']
+ modcmd = ['dn: %s' % dn, 'changetype: modify']
for (kind, key), values in mods.items():
modcmd.append('%s: %s' % (kind, key))
if isinstance(values, string_types):
values = [values]
for value in values:
- modcmd.append('%s: %s'%(key, value))
+ modcmd.append('%s: %s' % (key, value))
modcmd.append('-')
self._ldapmodify(modcmd)
@@ -190,7 +191,8 @@
p.stdin.write('\n'.join(modcmd).encode('ascii'))
p.stdin.close()
if p.wait():
- raise RuntimeError("ldap update failed: %s"%('\n'.join(p.stderr.readlines())))
+ raise RuntimeError("ldap update failed: %s" % ('\n'.join(p.stderr.readlines())))
+
class CheckWrongGroup(LDAPFeedTestBase):
"""
@@ -200,18 +202,17 @@
def test_wrong_group(self):
with self.admin_access.repo_cnx() as cnx:
- source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+ source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0)
config = source.repo_source.check_config(source)
# inject a bogus group here, along with at least a valid one
- config['user-default-group'] = ('thisgroupdoesnotexists','users')
+ config['user-default-group'] = ('thisgroupdoesnotexists', 'users')
source.repo_source.update_config(source, config)
cnx.commit()
# here we emitted an error log entry
- stats = source.repo_source.pull_data(cnx, force=True, raise_on_error=True)
+ source.repo_source.pull_data(cnx, force=True, raise_on_error=True)
cnx.commit()
-
class LDAPFeedUserTC(LDAPFeedTestBase):
"""
A testcase for CWUser support in ldapfeed (basic tests and authentication).
@@ -288,12 +289,12 @@
# and that the password stored in the system source is not empty or so
user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
user.cw_clear_all_caches()
- pwd = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]
+ cu = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';")
+ pwd = cu.fetchall()[0][0]
self.assertIsNotNone(pwd)
self.assertTrue(str(pwd))
-
class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
"""
A testcase for situations where users are deleted from or
@@ -303,7 +304,7 @@
def test_a_filter_inactivate(self):
""" filtered out people should be deactivated, unable to authenticate """
with self.admin_access.repo_cnx() as cnx:
- source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+ source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0)
config = source.repo_source.check_config(source)
# filter with adim's phone number
config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
@@ -350,21 +351,22 @@
self.pull(cnx)
# reinsert syt
self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test',
- { 'objectClass': ['OpenLDAPperson','posixAccount','top','shadowAccount'],
- 'cn': 'Sylvain Thenault',
- 'sn': 'Thenault',
- 'gidNumber': '1004',
- 'uid': 'syt',
- 'homeDirectory': '/home/syt',
- 'shadowFlag': '134538764',
- 'uidNumber': '1004',
- 'givenName': 'Sylvain',
- 'telephoneNumber': '106',
- 'displayName': 'sthenault',
- 'gecos': 'Sylvain Thenault',
- 'mail': ['sylvain.thenault@logilab.fr','syt@logilab.fr'],
- 'userPassword': 'syt',
- })
+ {'objectClass': ['OpenLDAPperson', 'posixAccount', 'top',
+ 'shadowAccount'],
+ 'cn': 'Sylvain Thenault',
+ 'sn': 'Thenault',
+ 'gidNumber': '1004',
+ 'uid': 'syt',
+ 'homeDirectory': '/home/syt',
+ 'shadowFlag': '134538764',
+ 'uidNumber': '1004',
+ 'givenName': 'Sylvain',
+ 'telephoneNumber': '106',
+ 'displayName': 'sthenault',
+ 'gecos': 'Sylvain Thenault',
+ 'mail': ['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
+ 'userPassword': 'syt',
+ })
with self.repo.internal_cnx() as cnx:
self.pull(cnx)
with self.admin_access.repo_cnx() as cnx:
@@ -437,7 +439,7 @@
try:
self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
- {('add', 'memberUid'): ['syt']})
+ {('add', 'memberUid'): ['syt']})
time.sleep(1.1) # timestamps precision is 1s
with self.repo.internal_cnx() as cnx:
self.pull(cnx)
@@ -456,7 +458,7 @@
def test_group_member_deleted(self):
with self.repo.internal_cnx() as cnx:
- self.pull(cnx) # ensure we are sync'ed
+ self.pull(cnx) # ensure we are sync'ed
with self.admin_access.repo_cnx() as cnx:
rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
{'name': 'logilab'})
@@ -480,7 +482,6 @@
self.setUpClass()
-
if __name__ == '__main__':
from logilab.common.testlib import unittest_main
unittest_main()