server/test/unittest_ldapuser.py
changeset 8434 39c5bb4dcc59
parent 8430 5bee87a14bb1
child 8438 2c79c29193e7
equal deleted inserted replaced
8433:ff9d6d269877 8434:39c5bb4dcc59
    18 """cubicweb.server.sources.ldapusers unit and functional tests"""
    18 """cubicweb.server.sources.ldapusers unit and functional tests"""
    19 
    19 
    20 import os
    20 import os
    21 import shutil
    21 import shutil
    22 import time
    22 import time
    23 from os.path import abspath, join, exists
    23 from os.path import join, exists
    24 import subprocess
    24 import subprocess
    25 from socket import socket, error as socketerror
       
    26 
    25 
    27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
    26 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
    28 
    27 
    29 from cubicweb import AuthenticationError
    28 from cubicweb import AuthenticationError
    30 from cubicweb.devtools.testlib import CubicWebTC
    29 from cubicweb.devtools.testlib import CubicWebTC
    31 from cubicweb.devtools.repotest import RQLGeneratorTC
    30 from cubicweb.devtools.repotest import RQLGeneratorTC
    32 from cubicweb.devtools.httptest import get_available_port
    31 from cubicweb.devtools.httptest import get_available_port
    33 from cubicweb.devtools import get_test_db_handler
    32 from cubicweb.devtools import get_test_db_handler
    34 
    33 
    35 from cubicweb.server.sources.ldapuser import *
    34 from cubicweb.server.session import security_enabled
       
    35 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter
    36 
    36 
    37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test'
    37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test'
    38 URL = None
    38 URL = None
    39 
    39 
    40 def create_slapd_configuration(config):
    40 def create_slapd_configuration(cls):
    41     global slapd_process, URL
    41     global URL
       
    42     config = cls.config
    42     basedir = join(config.apphome, "ldapdb")
    43     basedir = join(config.apphome, "ldapdb")
    43     slapdconf = join(config.apphome, "slapd.conf")
    44     slapdconf = join(config.apphome, "slapd.conf")
    44     confin = file(join(config.apphome, "slapd.conf.in")).read()
    45     confin = file(join(config.apphome, "slapd.conf.in")).read()
    45     confstream = file(slapdconf, 'w')
    46     confstream = file(slapdconf, 'w')
    46     confstream.write(confin % {'apphome': config.apphome})
    47     confstream.write(confin % {'apphome': config.apphome})
    58     port = get_available_port(xrange(9000, 9100))
    59     port = get_available_port(xrange(9000, 9100))
    59     host = 'localhost:%s' % port
    60     host = 'localhost:%s' % port
    60     ldapuri = 'ldap://%s' % host
    61     ldapuri = 'ldap://%s' % host
    61     cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
    62     cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
    62     config.info('Starting slapd:', ' '.join(cmdline))
    63     config.info('Starting slapd:', ' '.join(cmdline))
    63     slapd_process = subprocess.Popen(cmdline)
    64     cls.slapd_process = subprocess.Popen(cmdline)
    64     time.sleep(0.2)
    65     time.sleep(0.2)
    65     if slapd_process.poll() is None:
    66     if cls.slapd_process.poll() is None:
    66         config.info('slapd started with pid %s' % slapd_process.pid)
    67         config.info('slapd started with pid %s' % cls.slapd_process.pid)
    67     else:
    68     else:
    68         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
    69         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
    69                                (" ".join(cmdline), os.getcwd()))
    70                                (" ".join(cmdline), os.getcwd()))
    70     URL = u'ldap://%s' % host
    71     URL = u'ldap://%s' % host
    71 
    72 
    72 def terminate_slapd(config):
    73 def terminate_slapd(cls):
    73     global slapd_process
    74     config = cls.config
    74     if slapd_process.returncode is None:
    75     if cls.slapd_process and cls.slapd_process.returncode is None:
    75         config.info('terminating slapd')
    76         config.info('terminating slapd')
    76         if hasattr(slapd_process, 'terminate'):
    77         if hasattr(cls.slapd_process, 'terminate'):
    77             slapd_process.terminate()
    78             cls.slapd_process.terminate()
    78         else:
    79         else:
    79             import os, signal
    80             import os, signal
    80             os.kill(slapd_process.pid, signal.SIGTERM)
    81             os.kill(cls.slapd_process.pid, signal.SIGTERM)
    81         slapd_process.wait()
    82         cls.slapd_process.wait()
    82         config.info('DONE')
    83         config.info('DONE')
    83     del slapd_process
       
    84 
       
    85 
    84 
    86 class LDAPTestBase(CubicWebTC):
    85 class LDAPTestBase(CubicWebTC):
    87     loglevel = 'ERROR'
    86     loglevel = 'ERROR'
    88 
    87 
    89     @classmethod
    88     @classmethod
    90     def setUpClass(cls):
    89     def setUpClass(cls):
    91         from cubicweb.cwctl import init_cmdline_log_threshold
    90         from cubicweb.cwctl import init_cmdline_log_threshold
    92         init_cmdline_log_threshold(cls.config, cls.loglevel)
    91         init_cmdline_log_threshold(cls.config, cls.loglevel)
    93         create_slapd_configuration(cls.config)
    92         create_slapd_configuration(cls)
    94 
    93 
    95     @classmethod
    94     @classmethod
    96     def tearDownClass(cls):
    95     def tearDownClass(cls):
    97         terminate_slapd(cls.config)
    96         terminate_slapd(cls)
    98 
    97 
    99 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase):
    98 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase):
   100     test_db_id = 'ldap-feed'
    99     test_db_id = 'ldap-feed'
   101 
   100 
   102     @classmethod
   101     @classmethod
   106         session.commit()
   105         session.commit()
   107         isession = session.repo.internal_session(safe=True)
   106         isession = session.repo.internal_session(safe=True)
   108         lfsource = isession.repo.sources_by_uri['ldapuser']
   107         lfsource = isession.repo.sources_by_uri['ldapuser']
   109         stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
   108         stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
   110 
   109 
       
   110     def _pull(self):
       
   111         with self.session.repo.internal_session() as isession:
       
   112             with security_enabled(isession, read=False, write=False):
       
   113                 lfsource = isession.repo.sources_by_uri['ldapuser']
       
   114                 stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
       
   115                 isession.commit()
       
   116 
   111     def test_delete(self):
   117     def test_delete(self):
       
   118         """ delete syt, pull, check deactivation, repull,
       
   119         readd syt, pull, check activation
       
   120         """
   112         uri = self.repo.sources_by_uri['ldapuser'].urls[0]
   121         uri = self.repo.sources_by_uri['ldapuser'].urls[0]
   113         from subprocess import call
       
   114         deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' "
   122         deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' "
   115                      "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri)
   123                      "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri)
   116         os.system(deletecmd)
   124         os.system(deletecmd)
   117         isession = self.session.repo.internal_session(safe=False)
   125         self._pull()
   118         from cubicweb.server.session import security_enabled
       
   119         with security_enabled(isession, read=False, write=False):
       
   120             lfsource = isession.repo.sources_by_uri['ldapuser']
       
   121             stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
       
   122             isession.commit()
       
   123         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
   126         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
   124         self.assertEqual(self.execute('Any N WHERE U login "syt", '
   127         self.assertEqual(self.execute('Any N WHERE U login "syt", '
   125                                       'U in_state S, S name N').rows[0][0],
   128                                       'U in_state S, S name N').rows[0][0],
   126                          'deactivated')
   129                          'deactivated')
   127 
   130         # check that it doesn't choke
   128 
   131         self._pull()
       
   132         # reset the fscking ldap thing
       
   133         self.tearDownClass()
       
   134         self.setUpClass()
       
   135         self._pull()
       
   136         # still deactivated, but a warning has been emitted ...
       
   137         self.assertEqual(self.execute('Any N WHERE U login "syt", '
       
   138                                       'U in_state S, S name N').rows[0][0],
       
   139                          'deactivated')
   129 
   140 
   130 class LDAPFeedSourceTC(LDAPTestBase):
   141 class LDAPFeedSourceTC(LDAPTestBase):
   131     test_db_id = 'ldap-feed'
   142     test_db_id = 'ldap-feed'
   132 
   143 
   133     @classmethod
   144     @classmethod