--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/server/test/unittest_ldapsource.py Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,491 @@
+# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+"""cubicweb.server.sources.ldapfeed unit and functional tests
+
+Those tests expect to have slapd, python-ldap3 and ldapscripts packages installed.
+"""
+from __future__ import print_function
+
+import os
+import sys
+import shutil
+import time
+import subprocess
+import tempfile
+import unittest
+from os.path import join
+
+from six import string_types
+from six.moves import range
+
+from cubicweb import AuthenticationError
+from cubicweb.devtools.testlib import CubicWebTC
+from cubicweb.devtools.httptest import get_available_port
+
+
+CONFIG_LDAPFEED = u'''
+user-base-dn=ou=People,dc=cubicweb,dc=test
+group-base-dn=ou=Group,dc=cubicweb,dc=test
+user-attrs-map=uid=login,mail=email,userPassword=upassword
+group-attrs-map=cn=name,memberUid=member
+'''
+CONFIG_LDAPUSER = u'''
+user-base-dn=ou=People,dc=cubicweb,dc=test
+user-attrs-map=uid=login,mail=email,userPassword=upassword
+'''
+
+URL = None
+
+
+def create_slapd_configuration(cls):
+ global URL
+ slapddir = tempfile.mkdtemp('cw-unittest-ldap')
+ config = cls.config
+ slapdconf = join(config.apphome, "slapd.conf")
+ confin = open(join(config.apphome, "slapd.conf.in")).read()
+ confstream = open(slapdconf, 'w')
+ confstream.write(confin % {'apphome': config.apphome, 'testdir': slapddir})
+ confstream.close()
+ # fill ldap server with some data
+ ldiffile = join(config.apphome, "ldap_test.ldif")
+ config.info('Initing ldap database')
+ cmdline = ['/usr/sbin/slapadd', '-f', slapdconf, '-l', ldiffile, '-c']
+ PIPE = subprocess.PIPE
+ slapproc = subprocess.Popen(cmdline, stdout=PIPE, stderr=PIPE)
+ stdout, stderr = slapproc.communicate()
+ if slapproc.returncode:
+ print('slapadd returned with status: %s'
+ % slapproc.returncode, file=sys.stderr)
+ sys.stdout.write(stdout)
+ sys.stderr.write(stderr)
+
+ # ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
+ port = get_available_port(range(9000, 9100))
+ host = 'localhost:%s' % port
+ ldapuri = 'ldap://%s' % host
+ cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"]
+ config.info('Starting slapd:', ' '.join(cmdline))
+ PIPE = subprocess.PIPE
+ cls.slapd_process = subprocess.Popen(cmdline, stdout=PIPE, stderr=PIPE)
+ time.sleep(0.2)
+ if cls.slapd_process.poll() is None:
+ config.info('slapd started with pid %s', cls.slapd_process.pid)
+ else:
+ raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
+ (" ".join(cmdline), os.getcwd()))
+ URL = u'ldap://%s' % host
+ return slapddir
+
+
+def terminate_slapd(cls):
+ config = cls.config
+ if cls.slapd_process and cls.slapd_process.returncode is None:
+ config.info('terminating slapd')
+ if hasattr(cls.slapd_process, 'terminate'):
+ cls.slapd_process.terminate()
+ else:
+ import signal
+ os.kill(cls.slapd_process.pid, signal.SIGTERM)
+ stdout, stderr = cls.slapd_process.communicate()
+ if cls.slapd_process.returncode:
+ print('slapd returned with status: %s'
+ % cls.slapd_process.returncode, file=sys.stderr)
+ sys.stdout.write(stdout)
+ sys.stderr.write(stderr)
+ config.info('DONE')
+
+
+class LDAPFeedTestBase(CubicWebTC):
+ test_db_id = 'ldap-feed'
+ loglevel = 'ERROR'
+
+ @classmethod
+ def setUpClass(cls):
+ if not os.path.exists('/usr/sbin/slapd'):
+ raise unittest.SkipTest('slapd not found')
+ from cubicweb.cwctl import init_cmdline_log_threshold
+ init_cmdline_log_threshold(cls.config, cls.loglevel)
+ cls._tmpdir = create_slapd_configuration(cls)
+
+ @classmethod
+ def tearDownClass(cls):
+ terminate_slapd(cls)
+ try:
+ shutil.rmtree(cls._tmpdir)
+ except:
+ pass
+
+ @classmethod
+ def pre_setup_database(cls, cnx, config):
+ cnx.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
+ url=URL, config=CONFIG_LDAPFEED)
+
+ cnx.commit()
+ return cls.pull(cnx)
+
+ @classmethod
+ def pull(self, cnx):
+ lfsource = cnx.repo.sources_by_uri['ldap']
+ stats = lfsource.pull_data(cnx, force=True, raise_on_error=True)
+ cnx.commit()
+ return stats
+
+ def setup_database(self):
+ with self.admin_access.repo_cnx() as cnx:
+ cnx.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
+ cnx.execute('SET S config %(conf)s, S url %(url)s '
+ 'WHERE S is CWSource, S name "ldap"',
+ {"conf": CONFIG_LDAPFEED, 'url': URL})
+ cnx.commit()
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+
+ def add_ldap_entry(self, dn, mods):
+ """
+ add an LDAP entity
+ """
+ modcmd = ['dn: %s' % dn, 'changetype: add']
+ for key, values in mods.items():
+ if isinstance(values, string_types):
+ values = [values]
+ for value in values:
+ modcmd.append('%s: %s' % (key, value))
+ self._ldapmodify(modcmd)
+
+ def delete_ldap_entry(self, dn):
+ """
+ delete an LDAP entity
+ """
+ modcmd = ['dn: %s' % dn, 'changetype: delete']
+ self._ldapmodify(modcmd)
+
+ def update_ldap_entry(self, dn, mods):
+ """
+ modify one or more attributes of an LDAP entity
+ """
+ modcmd = ['dn: %s' % dn, 'changetype: modify']
+ for (kind, key), values in mods.items():
+ modcmd.append('%s: %s' % (kind, key))
+ if isinstance(values, string_types):
+ values = [values]
+ for value in values:
+ modcmd.append('%s: %s' % (key, value))
+ modcmd.append('-')
+ self._ldapmodify(modcmd)
+
+ def _ldapmodify(self, modcmd):
+ uri = self.repo.sources_by_uri['ldap'].urls[0]
+ updatecmd = ['ldapmodify', '-H', uri, '-v', '-x', '-D',
+ 'cn=admin,dc=cubicweb,dc=test', '-w', 'cw']
+ PIPE = subprocess.PIPE
+ p = subprocess.Popen(updatecmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ p.stdin.write('\n'.join(modcmd).encode('ascii'))
+ p.stdin.close()
+ if p.wait():
+ raise RuntimeError("ldap update failed: %s" % ('\n'.join(p.stderr.readlines())))
+
+
+class CheckWrongGroup(LDAPFeedTestBase):
+ """
+ A testcase for situations where the default group for CWUser
+ created from LDAP is wrongly configured.
+ """
+
+ def test_wrong_group(self):
+ with self.admin_access.repo_cnx() as cnx:
+ source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0)
+ config = source.repo_source.check_config(source)
+ # inject a bogus group here, along with at least a valid one
+ config['user-default-group'] = ('thisgroupdoesnotexists', 'users')
+ source.repo_source.update_config(source, config)
+ cnx.commit()
+ # here we emitted an error log entry
+ source.repo_source.pull_data(cnx, force=True, raise_on_error=True)
+ cnx.commit()
+
+
+class LDAPFeedUserTC(LDAPFeedTestBase):
+ """
+ A testcase for CWUser support in ldapfeed (basic tests and authentication).
+ """
+
+ def assertMetadata(self, entity):
+ self.assertTrue(entity.creation_date)
+ self.assertTrue(entity.modification_date)
+
+ def test_authenticate(self):
+ source = self.repo.sources_by_uri['ldap']
+ with self.admin_access.repo_cnx() as cnx:
+ # ensure we won't be logged against
+ self.assertRaises(AuthenticationError,
+ source.authenticate, cnx, 'toto', 'toto')
+ self.assertTrue(source.authenticate(cnx, 'syt', 'syt'))
+ sessionid = self.repo.connect('syt', password='syt')
+ self.assertTrue(sessionid)
+ self.repo.close(sessionid)
+
+ def test_base(self):
+ with self.admin_access.repo_cnx() as cnx:
+ # check a known one
+ rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+ e = rset.get_entity(0, 0)
+ self.assertEqual(e.login, 'syt')
+ e.complete()
+ self.assertMetadata(e)
+ self.assertEqual(e.firstname, None)
+ self.assertEqual(e.surname, None)
+ self.assertIn('users', set(g.name for g in e.in_group))
+ self.assertEqual(e.owned_by[0].login, 'syt')
+ self.assertEqual(e.created_by, ())
+ addresses = [pe.address for pe in e.use_email]
+ addresses.sort()
+ self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
+ addresses)
+ self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr',
+ 'syt@logilab.fr'])
+ # email content should be indexed on the user
+ rset = cnx.execute('CWUser X WHERE X has_text "thenault"')
+ self.assertEqual(rset.rows, [[e.eid]])
+
+ def test_copy_to_system_source(self):
+ "make sure we can 'convert' an LDAP user into a system one"
+ with self.admin_access.repo_cnx() as cnx:
+ source = self.repo.sources_by_uri['ldap']
+ eid = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
+ cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
+ cnx.commit()
+ source.reset_caches()
+ rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+ self.assertEqual(len(rset), 1)
+ e = rset.get_entity(0, 0)
+ self.assertEqual(e.eid, eid)
+ self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
+ 'uri': u'system',
+ 'use-cwuri-as-url': False},
+ 'type': 'CWUser',
+ 'extid': None})
+ self.assertEqual(e.cw_source[0].name, 'system')
+ self.assertTrue(e.creation_date)
+ self.assertTrue(e.modification_date)
+ source.pull_data(cnx)
+ rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+ self.assertEqual(len(rset), 1)
+ self.assertTrue(self.repo.system_source.authenticate(cnx, 'syt', password='syt'))
+ # make sure the pull from ldap have not "reverted" user as a ldap-feed user
+ self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
+ 'uri': u'system',
+ 'use-cwuri-as-url': False},
+ 'type': 'CWUser',
+ 'extid': None})
+ # and that the password stored in the system source is not empty or so
+ user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
+ user.cw_clear_all_caches()
+ cu = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';")
+ pwd = cu.fetchall()[0][0]
+ self.assertIsNotNone(pwd)
+ self.assertTrue(str(pwd))
+
+
+class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
+ """
+ A testcase for situations where users are deleted from or
+ unavailable in the LDAP database.
+ """
+
+ def test_a_filter_inactivate(self):
+ """ filtered out people should be deactivated, unable to authenticate """
+ with self.admin_access.repo_cnx() as cnx:
+ source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0)
+ config = source.repo_source.check_config(source)
+ # filter with adim's phone number
+ config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
+ source.repo_source.update_config(source, config)
+ cnx.commit()
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+ self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
+ with self.admin_access.repo_cnx() as cnx:
+ self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'deactivated')
+ self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
+ # unfilter, syt should be activated again
+ config['user-filter'] = u''
+ source.repo_source.update_config(source, config)
+ cnx.commit()
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+ with self.admin_access.repo_cnx() as cnx:
+ self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
+ self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
+
+ def test_delete(self):
+ """ delete syt, pull, check deactivation, repull,
+ read syt, pull, check activation
+ """
+ self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+ self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
+ with self.admin_access.repo_cnx() as cnx:
+ self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'deactivated')
+ with self.repo.internal_cnx() as cnx:
+ # check that it doesn't choke
+ self.pull(cnx)
+ # reinsert syt
+ self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test',
+ {'objectClass': ['OpenLDAPperson', 'posixAccount', 'top',
+ 'shadowAccount'],
+ 'cn': 'Sylvain Thenault',
+ 'sn': 'Thenault',
+ 'gidNumber': '1004',
+ 'uid': 'syt',
+ 'homeDirectory': '/home/syt',
+ 'shadowFlag': '134538764',
+ 'uidNumber': '1004',
+ 'givenName': 'Sylvain',
+ 'telephoneNumber': '106',
+ 'displayName': 'sthenault',
+ 'gecos': 'Sylvain Thenault',
+ 'mail': ['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
+ 'userPassword': 'syt',
+ })
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+ with self.admin_access.repo_cnx() as cnx:
+ self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
+
+ def test_reactivate_deleted(self):
+ # test reactivating BY HAND the user isn't enough to
+ # authenticate, as the native source refuse to authenticate
+ # user from other sources
+ self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+ with self.admin_access.repo_cnx() as cnx:
+ # reactivate user (which source is still ldap-feed)
+ user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
+ user.cw_adapt_to('IWorkflowable').fire_transition('activate')
+ cnx.commit()
+ with self.assertRaises(AuthenticationError):
+ self.repo.connect('syt', password='syt')
+
+ # ok now let's try to make it a system user
+ cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
+ cnx.commit()
+ # and that we can now authenticate again
+ self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
+ sessionid = self.repo.connect('syt', password='syt')
+ self.assertTrue(sessionid)
+ self.repo.close(sessionid)
+
+
+class LDAPFeedGroupTC(LDAPFeedTestBase):
+ """
+ A testcase for group support in ldapfeed.
+ """
+
+ def test_groups_exist(self):
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('CWGroup X WHERE X name "dir"')
+ self.assertEqual(len(rset), 1)
+
+ rset = cnx.execute('CWGroup X WHERE X cw_source S, S name "ldap"')
+ self.assertEqual(len(rset), 2)
+
+ def test_group_deleted(self):
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('CWGroup X WHERE X name "dir"')
+ self.assertEqual(len(rset), 1)
+
+ def test_in_group(self):
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
+ dirgroup = rset.get_entity(0, 0)
+ self.assertEqual(set(['syt', 'adim']),
+ set([u.login for u in dirgroup.reverse_in_group]))
+ rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
+ logilabgroup = rset.get_entity(0, 0)
+ self.assertEqual(set(['adim']),
+ set([u.login for u in logilabgroup.reverse_in_group]))
+
+ def test_group_member_added(self):
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+ {'name': 'logilab'})
+ self.assertEqual(len(rset), 1)
+ self.assertEqual(rset[0][0], 'adim')
+
+ try:
+ self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
+ {('add', 'memberUid'): ['syt']})
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+ {'name': 'logilab'})
+ self.assertEqual(len(rset), 2)
+ members = set([u[0] for u in rset])
+ self.assertEqual(set(['adim', 'syt']), members)
+
+ finally:
+ # back to normal ldap setup
+ self.tearDownClass()
+ self.setUpClass()
+
+ def test_group_member_deleted(self):
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx) # ensure we are sync'ed
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+ {'name': 'logilab'})
+ self.assertEqual(len(rset), 1)
+ self.assertEqual(rset[0][0], 'adim')
+
+ try:
+ self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
+ {('delete', 'memberUid'): ['adim']})
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+ {'name': 'logilab'})
+ self.assertEqual(len(rset), 0, rset.rows)
+ finally:
+ # back to normal ldap setup
+ self.tearDownClass()
+ self.setUpClass()
+
+
+if __name__ == '__main__':
+ from logilab.common.testlib import unittest_main
+ unittest_main()