server/test/unittest_ldapsource.py
author Julien Cristau <julien.cristau@logilab.fr>
Wed, 29 Jan 2014 15:50:19 +0100
changeset 9496 e699fbcc9a61
parent 9283 5f2c5eb1a820
child 9674 96549de9dd70
permissions -rw-r--r--
[server/repo] Use session.sessionid instead of session.id Silences deprecation warning.

# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""cubicweb.server.sources.ldapusers unit and functional tests"""

import os
import sys
import shutil
import time
from os.path import join, exists
import subprocess
import tempfile

from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags

from cubicweb import AuthenticationError
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.repotest import RQLGeneratorTC
from cubicweb.devtools.httptest import get_available_port
from cubicweb.devtools import get_test_db_handler


CONFIG_LDAPFEED = u'''
user-base-dn=ou=People,dc=cubicweb,dc=test
group-base-dn=ou=Group,dc=cubicweb,dc=test
user-attrs-map=uid=login,mail=email,userPassword=upassword
group-attrs-map=cn=name,memberUid=member
'''
CONFIG_LDAPUSER = u'''
user-base-dn=ou=People,dc=cubicweb,dc=test
user-attrs-map=uid=login,mail=email,userPassword=upassword
'''

URL = None

def create_slapd_configuration(cls):
    global URL
    slapddir = tempfile.mkdtemp('cw-unittest-ldap')
    config = cls.config
    slapdconf = join(config.apphome, "slapd.conf")
    confin = file(join(config.apphome, "slapd.conf.in")).read()
    confstream = file(slapdconf, 'w')
    confstream.write(confin % {'apphome': config.apphome, 'testdir': slapddir})
    confstream.close()
    # fill ldap server with some data
    ldiffile = join(config.apphome, "ldap_test.ldif")
    config.info('Initing ldap database')
    cmdline = ['/usr/sbin/slapadd', '-f', slapdconf, '-l', ldiffile, '-c']
    PIPE = subprocess.PIPE
    slapproc = subprocess.Popen(cmdline, stdout=PIPE, stderr=PIPE)
    stdout, stderr = slapproc.communicate()
    if slapproc.returncode:
        print >> sys.stderr, ('slapadd returned with status: %s'
                              % slapproc.returncode)
        sys.stdout.write(stdout)
        sys.stderr.write(stderr)

    #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
    port = get_available_port(xrange(9000, 9100))
    host = 'localhost:%s' % port
    ldapuri = 'ldap://%s' % host
    cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
    config.info('Starting slapd:', ' '.join(cmdline))
    PIPE = subprocess.PIPE
    cls.slapd_process = subprocess.Popen(cmdline, stdout=PIPE, stderr=PIPE)
    time.sleep(0.2)
    if cls.slapd_process.poll() is None:
        config.info('slapd started with pid %s', cls.slapd_process.pid)
    else:
        raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
                               (" ".join(cmdline), os.getcwd()))
    URL = u'ldap://%s' % host
    return slapddir

def terminate_slapd(cls):
    config = cls.config
    if cls.slapd_process and cls.slapd_process.returncode is None:
        config.info('terminating slapd')
        if hasattr(cls.slapd_process, 'terminate'):
            cls.slapd_process.terminate()
        else:
            import os, signal
            os.kill(cls.slapd_process.pid, signal.SIGTERM)
        stdout, stderr = cls.slapd_process.communicate()
        if cls.slapd_process.returncode:
            print >> sys.stderr, ('slapd returned with status: %s'
                                  % cls.slapd_process.returncode)
            sys.stdout.write(stdout)
            sys.stderr.write(stderr)
        config.info('DONE')


class LDAPFeedTestBase(CubicWebTC):
    test_db_id = 'ldap-feed'
    loglevel = 'ERROR'

    @classmethod
    def setUpClass(cls):
        from cubicweb.cwctl import init_cmdline_log_threshold
        init_cmdline_log_threshold(cls.config, cls.loglevel)
        cls._tmpdir = create_slapd_configuration(cls)

    @classmethod
    def tearDownClass(cls):
        terminate_slapd(cls)
        try:
            shutil.rmtree(cls._tmpdir)
        except:
            pass

    @classmethod
    def pre_setup_database(cls, session, config):
        session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
                              url=URL, config=CONFIG_LDAPFEED)

        session.commit()
        return cls._pull(session)

    @classmethod
    def _pull(cls, session):
        with session.repo.internal_session() as isession:
            lfsource = isession.repo.sources_by_uri['ldap']
            stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
            isession.commit()
            return stats

    def pull(self):
        return self._pull(self.session)

    def setup_database(self):
        with self.session.repo.internal_session(safe=True) as session:
            session.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
            session.execute('SET S config %(conf)s, S url %(url)s '
                            'WHERE S is CWSource, S name "ldap"',
                            {"conf": CONFIG_LDAPFEED, 'url': URL} )
            session.commit()
        self.pull()

    def add_ldap_entry(self, dn, mods):
        """
        add an LDAP entity
        """
        modcmd = ['dn: %s'%dn, 'changetype: add']
        for key, values in mods.iteritems():
            if isinstance(values, basestring):
                values = [values]
            for value in values:
                modcmd.append('%s: %s'%(key, value))
        self._ldapmodify(modcmd)

    def delete_ldap_entry(self, dn):
        """
        delete an LDAP entity
        """
        modcmd = ['dn: %s'%dn, 'changetype: delete']
        self._ldapmodify(modcmd)

    def update_ldap_entry(self, dn, mods):
        """
        modify one or more attributes of an LDAP entity
        """
        modcmd = ['dn: %s'%dn, 'changetype: modify']
        for (kind, key), values in mods.iteritems():
            modcmd.append('%s: %s' % (kind, key))
            if isinstance(values, basestring):
                values = [values]
            for value in values:
                modcmd.append('%s: %s'%(key, value))
            modcmd.append('-')
        self._ldapmodify(modcmd)

    def _ldapmodify(self, modcmd):
        uri = self.repo.sources_by_uri['ldap'].urls[0]
        updatecmd = ['ldapmodify', '-H', uri, '-v', '-x', '-D',
                     'cn=admin,dc=cubicweb,dc=test', '-w', 'cw']
        PIPE = subprocess.PIPE
        p = subprocess.Popen(updatecmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        p.stdin.write('\n'.join(modcmd))
        p.stdin.close()
        if p.wait():
            raise RuntimeError("ldap update failed: %s"%('\n'.join(p.stderr.readlines())))

class CheckWrongGroup(LDAPFeedTestBase):
    """
    A testcase for situations where the default group for CWUser
    created from LDAP is wrongly configured.
    """

    def test_wrong_group(self):
        with self.session.repo.internal_session(safe=True) as session:
            source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
            config = source.repo_source.check_config(source)
            # inject a bogus group here, along with at least a valid one
            config['user-default-group'] = ('thisgroupdoesnotexists','users')
            source.repo_source.update_config(source, config)
            session.commit(free_cnxset=False)
            # here we emitted an error log entry
            stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
            session.commit()



class LDAPFeedUserTC(LDAPFeedTestBase):
    """
    A testcase for CWUser support in ldapfeed (basic tests and authentication).
    """

    def assertMetadata(self, entity):
        self.assertTrue(entity.creation_date)
        self.assertTrue(entity.modification_date)

    def test_authenticate(self):
        source = self.repo.sources_by_uri['ldap']
        self.session.set_cnxset()
        # ensure we won't be logged against
        self.assertRaises(AuthenticationError,
                          source.authenticate, self.session, 'toto', 'toto')
        self.assertTrue(source.authenticate(self.session, 'syt', 'syt'))
        self.assertTrue(self.repo.connect('syt', password='syt'))

    def test_base(self):
        # check a known one
        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
        e = rset.get_entity(0, 0)
        self.assertEqual(e.login, 'syt')
        e.complete()
        self.assertMetadata(e)
        self.assertEqual(e.firstname, None)
        self.assertEqual(e.surname, None)
        self.assertTrue('users' in [g.name for g in e.in_group])
        self.assertEqual(e.owned_by[0].login, 'syt')
        self.assertEqual(e.created_by, ())
        addresses = [pe.address for pe in e.use_email]
        addresses.sort()
        self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
                         addresses)
        self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr',
                                                   'syt@logilab.fr'])
        # email content should be indexed on the user
        rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
        self.assertEqual(rset.rows, [[e.eid]])

    def test_copy_to_system_source(self):
        "make sure we can 'convert' an LDAP user into a system one"
        source = self.repo.sources_by_uri['ldap']
        eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
        self.commit()
        source.reset_caches()
        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
        self.assertEqual(len(rset), 1)
        e = rset.get_entity(0, 0)
        self.assertEqual(e.eid, eid)
        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
                                                             'uri': u'system',
                                                             'use-cwuri-as-url': False},
                                                  'type': 'CWUser',
                                                  'extid': None})
        self.assertEqual(e.cw_source[0].name, 'system')
        self.assertTrue(e.creation_date)
        self.assertTrue(e.modification_date)
        source.pull_data(self.session)
        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
        self.assertEqual(len(rset), 1)
        self.assertTrue(self.repo.system_source.authenticate(
                self.session, 'syt', password='syt'))
        # make sure the pull from ldap have not "reverted" user as a ldap-feed user
        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
                                                             'uri': u'system',
                                                             'use-cwuri-as-url': False},
                                                  'type': 'CWUser',
                                                  'extid': None})
        # and that the password stored in the system source is not empty or so
        user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
        user.cw_clear_all_caches()
        pwd = self.session.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]
        self.assertIsNotNone(pwd)
        self.assertTrue(str(pwd))



class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
    """
    A testcase for situations where users are deleted from or
    unavailabe in the LDAP database.
    """
    def test_a_filter_inactivate(self):
        """ filtered out people should be deactivated, unable to authenticate """
        source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
        config = source.repo_source.check_config(source)
        # filter with adim's phone number
        config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
        source.repo_source.update_config(source, config)
        self.commit()
        self.pull()
        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
        self.assertEqual(self.execute('Any N WHERE U login "syt", '
                                      'U in_state S, S name N').rows[0][0],
                         'deactivated')
        self.assertEqual(self.execute('Any N WHERE U login "adim", '
                                      'U in_state S, S name N').rows[0][0],
                         'activated')
        # unfilter, syt should be activated again
        config['user-filter'] = u''
        source.repo_source.update_config(source, config)
        self.commit()
        self.pull()
        self.assertEqual(self.execute('Any N WHERE U login "syt", '
                                      'U in_state S, S name N').rows[0][0],
                         'activated')
        self.assertEqual(self.execute('Any N WHERE U login "adim", '
                                      'U in_state S, S name N').rows[0][0],
                         'activated')

    def test_delete(self):
        """ delete syt, pull, check deactivation, repull,
        read syt, pull, check activation
        """
        self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
        self.pull()
        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
        self.assertEqual(self.execute('Any N WHERE U login "syt", '
                                      'U in_state S, S name N').rows[0][0],
                         'deactivated')
        # check that it doesn't choke
        self.pull()
        # reinsert syt
        self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test',
                            { 'objectClass': ['OpenLDAPperson','posixAccount','top','shadowAccount'],
                              'cn': 'Sylvain Thenault',
                              'sn': 'Thenault',
                              'gidNumber': '1004',
                              'uid': 'syt',
                              'homeDirectory': '/home/syt',
                              'shadowFlag': '134538764',
                              'uidNumber': '1004',
                              'givenName': 'Sylvain',
                              'telephoneNumber': '106',
                              'displayName': 'sthenault',
                              'gecos': 'Sylvain Thenault',
                              'mail': ['sylvain.thenault@logilab.fr','syt@logilab.fr'],
                              'userPassword': 'syt',
                             })
        self.pull()
        self.assertEqual(self.execute('Any N WHERE U login "syt", '
                                      'U in_state S, S name N').rows[0][0],
                         'activated')

    def test_reactivate_deleted(self):
        # test reactivating BY HAND the user isn't enough to
        # authenticate, as the native source refuse to authenticate
        # user from other sources
        self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
        self.pull()
        # reactivate user (which source is still ldap-feed)
        user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
        user.cw_adapt_to('IWorkflowable').fire_transition('activate')
        self.commit()
        with self.assertRaises(AuthenticationError):
            self.repo.connect('syt', password='syt')

        # ok now let's try to make it a system user
        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
        self.commit()
        # and that we can now authenticate again
        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
        self.assertTrue(self.repo.connect('syt', password='syt'))

class LDAPFeedGroupTC(LDAPFeedTestBase):
    """
    A testcase for group support in ldapfeed.
    """

    def test_groups_exist(self):
        rset = self.sexecute('CWGroup X WHERE X name "dir"')
        self.assertEqual(len(rset), 1)

        rset = self.sexecute('CWGroup X WHERE X cw_source S, S name "ldap"')
        self.assertEqual(len(rset), 2)

    def test_group_deleted(self):
        rset = self.sexecute('CWGroup X WHERE X name "dir"')
        self.assertEqual(len(rset), 1)

    def test_in_group(self):
        rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
        dirgroup = rset.get_entity(0, 0)
        self.assertEqual(set(['syt', 'adim']),
                         set([u.login for u in dirgroup.reverse_in_group]))
        rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
        logilabgroup = rset.get_entity(0, 0)
        self.assertEqual(set(['adim']),
                         set([u.login for u in logilabgroup.reverse_in_group]))

    def test_group_member_added(self):
        self.pull()
        rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
                             {'name': 'logilab'})
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset[0][0], 'adim')

        try:
            self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
                                   {('add', 'memberUid'): ['syt']})
            time.sleep(1.1) # timestamps precision is 1s
            self.pull()

            rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
                                 {'name': 'logilab'})
            self.assertEqual(len(rset), 2)
            members = set([u[0] for u in rset])
            self.assertEqual(set(['adim', 'syt']), members)

        finally:
            # back to normal ldap setup
            self.tearDownClass()
            self.setUpClass()

    def test_group_member_deleted(self):
        self.pull() # ensure we are sync'ed
        rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
                             {'name': 'logilab'})
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset[0][0], 'adim')

        try:
            self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
                                   {('delete', 'memberUid'): ['adim']})
            time.sleep(1.1) # timestamps precision is 1s
            self.pull()

            rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
                                 {'name': 'logilab'})
            self.assertEqual(len(rset), 0)
        finally:
            # back to normal ldap setup
            self.tearDownClass()
            self.setUpClass()



if __name__ == '__main__':
    unittest_main()