server/test/unittest_ldapuser.py
changeset 8683 d537786e52b8
parent 8668 4fea61c636b2
parent 8681 48731a0d3df8
child 8684 6c7c2a02c9a0
equal deleted inserted replaced
8682:20bd1cdf86ae 8683:d537786e52b8
    21 import os
    21 import os
    22 import shutil
    22 import shutil
    23 import time
    23 import time
    24 from os.path import join, exists
    24 from os.path import join, exists
    25 import subprocess
    25 import subprocess
       
    26 import tempfile
    26 
    27 
    27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
    28 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
    28 
    29 
    29 from cubicweb import AuthenticationError
    30 from cubicweb import AuthenticationError
    30 from cubicweb.devtools.testlib import CubicWebTC
    31 from cubicweb.devtools.testlib import CubicWebTC
    37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test'
    38 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test'
    38 URL = None
    39 URL = None
    39 
    40 
    40 def create_slapd_configuration(cls):
    41 def create_slapd_configuration(cls):
    41     global URL
    42     global URL
       
    43     slapddir = tempfile.mkdtemp('cw-unittest-ldap')
    42     config = cls.config
    44     config = cls.config
    43     basedir = join(config.apphome, "ldapdb")
       
    44     slapdconf = join(config.apphome, "slapd.conf")
    45     slapdconf = join(config.apphome, "slapd.conf")
    45     confin = file(join(config.apphome, "slapd.conf.in")).read()
    46     confin = file(join(config.apphome, "slapd.conf.in")).read()
    46     confstream = file(slapdconf, 'w')
    47     confstream = file(slapdconf, 'w')
    47     confstream.write(confin % {'apphome': config.apphome})
    48     confstream.write(confin % {'apphome': config.apphome, 'testdir': slapddir})
    48     confstream.close()
    49     confstream.close()
    49     if exists(basedir):
       
    50         shutil.rmtree(basedir)
       
    51     os.makedirs(basedir)
       
    52     # fill ldap server with some data
    50     # fill ldap server with some data
    53     ldiffile = join(config.apphome, "ldap_test.ldif")
    51     ldiffile = join(config.apphome, "ldap_test.ldif")
    54     config.info('Initing ldap database')
    52     config.info('Initing ldap database')
    55     cmdline = "/usr/sbin/slapadd -f %s -l %s -c" % (slapdconf, ldiffile)
    53     cmdline = "/usr/sbin/slapadd -f %s -l %s -c" % (slapdconf, ldiffile)
    56     subprocess.check_call(cmdline, shell=True) == 0
    54     subprocess.check_call(cmdline, shell=True) == 0
    67         config.info('slapd started with pid %s' % cls.slapd_process.pid)
    65         config.info('slapd started with pid %s' % cls.slapd_process.pid)
    68     else:
    66     else:
    69         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
    67         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
    70                                (" ".join(cmdline), os.getcwd()))
    68                                (" ".join(cmdline), os.getcwd()))
    71     URL = u'ldap://%s' % host
    69     URL = u'ldap://%s' % host
       
    70     return slapddir
    72 
    71 
    73 def terminate_slapd(cls):
    72 def terminate_slapd(cls):
    74     config = cls.config
    73     config = cls.config
    75     if cls.slapd_process and cls.slapd_process.returncode is None:
    74     if cls.slapd_process and cls.slapd_process.returncode is None:
    76         config.info('terminating slapd')
    75         config.info('terminating slapd')
    87 
    86 
    88     @classmethod
    87     @classmethod
    89     def setUpClass(cls):
    88     def setUpClass(cls):
    90         from cubicweb.cwctl import init_cmdline_log_threshold
    89         from cubicweb.cwctl import init_cmdline_log_threshold
    91         init_cmdline_log_threshold(cls.config, cls.loglevel)
    90         init_cmdline_log_threshold(cls.config, cls.loglevel)
    92         create_slapd_configuration(cls)
    91         cls._tmpdir = create_slapd_configuration(cls)
    93 
    92 
    94     @classmethod
    93     @classmethod
    95     def tearDownClass(cls):
    94     def tearDownClass(cls):
    96         terminate_slapd(cls)
    95         terminate_slapd(cls)
       
    96         try:
       
    97             shutil.rmtree(cls._tmpdir)
       
    98         except:
       
    99             pass
       
   100 
       
   101 class CheckWrongGroup(LDAPTestBase):
       
   102 
       
   103     def test_wrong_group(self):
       
   104         self.session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
       
   105                                    url=URL, config=CONFIG)
       
   106         self.commit()
       
   107         with self.session.repo.internal_session(safe=True) as session:
       
   108             source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
       
   109             config = source.repo_source.check_config(source)
       
   110             # inject a bogus group here, along with at least a valid one
       
   111             config['user-default-group'] = ('thisgroupdoesnotexists','users')
       
   112             source.repo_source.update_config(source, config)
       
   113             session.commit(free_cnxset=False)
       
   114             # here we emitted an error log entry
       
   115             stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
       
   116             session.commit()
    97 
   117 
    98     def setUp(self):
   118     def setUp(self):
    99         super(LDAPTestBase, self).setUp()
   119         super(LDAPTestBase, self).setUp()
   100         # ldap source url in the database may use a different port as the one
   120         # ldap source url in the database may use a different port as the one
   101         # just attributed
   121         # just attributed
   242         self.assertTrue(e.creation_date)
   262         self.assertTrue(e.creation_date)
   243         self.assertTrue(e.modification_date)
   263         self.assertTrue(e.modification_date)
   244         source.pull_data(self.session)
   264         source.pull_data(self.session)
   245         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   265         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   246         self.assertEqual(len(rset), 1)
   266         self.assertEqual(len(rset), 1)
   247         # test some password has been set
       
   248         cu = self.session.system_sql('SELECT cw_upassword FROM cw_CWUser WHERE cw_eid=%s' % rset[0][0])
       
   249         value = str(cu.fetchall()[0][0])
       
   250         self.assertEqual(value, '{SSHA}v/8xJQP3uoaTBZz1T7Y0B3qOxRN1cj7D')
       
   251         self.assertTrue(self.repo.system_source.authenticate(
   267         self.assertTrue(self.repo.system_source.authenticate(
   252                 self.session, 'syt', password='syt'))
   268                 self.session, 'syt', password='syt'))
   253 
   269 
   254 
   270 
   255 class LDAPUserSourceTC(LDAPFeedSourceTC):
   271 class LDAPUserSourceTC(LDAPFeedSourceTC):