server/test/unittest_ldapuser.py
changeset 8430 5bee87a14bb1
parent 8387 b59af20a868d
child 8434 39c5bb4dcc59
equal deleted inserted replaced
8429:cad2d8e03b33 8430:5bee87a14bb1
    35 from cubicweb.server.sources.ldapuser import *
    35 from cubicweb.server.sources.ldapuser import *
    36 
    36 
    37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test'
    37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test'
    38 URL = None
    38 URL = None
    39 
    39 
    40 def setUpModule(*args):
       
    41     create_slapd_configuration(LDAPUserSourceTC.config)
       
    42 
       
    43 def tearDownModule(*args):
       
    44     terminate_slapd()
       
    45 
       
    46 def create_slapd_configuration(config):
    40 def create_slapd_configuration(config):
    47     global slapd_process, URL
    41     global slapd_process, URL
    48     basedir = join(config.apphome, "ldapdb")
    42     basedir = join(config.apphome, "ldapdb")
    49     slapdconf = join(config.apphome, "slapd.conf")
    43     slapdconf = join(config.apphome, "slapd.conf")
    50     confin = file(join(config.apphome, "slapd.conf.in")).read()
    44     confin = file(join(config.apphome, "slapd.conf.in")).read()
    51     confstream = file(slapdconf, 'w')
    45     confstream = file(slapdconf, 'w')
    52     confstream.write(confin % {'apphome': config.apphome})
    46     confstream.write(confin % {'apphome': config.apphome})
    53     confstream.close()
    47     confstream.close()
    54     if not exists(basedir):
    48     if exists(basedir):
    55         os.makedirs(basedir)
    49         shutil.rmtree(basedir)
    56         # fill ldap server with some data
    50     os.makedirs(basedir)
    57         ldiffile = join(config.apphome, "ldap_test.ldif")
    51     # fill ldap server with some data
    58         print "Initing ldap database"
    52     ldiffile = join(config.apphome, "ldap_test.ldif")
    59         cmdline = "/usr/sbin/slapadd -f %s -l %s -c" % (slapdconf, ldiffile)
    53     config.info('Initing ldap database')
    60         subprocess.call(cmdline, shell=True)
    54     cmdline = "/usr/sbin/slapadd -f %s -l %s -c" % (slapdconf, ldiffile)
    61 
    55     subprocess.call(cmdline, shell=True)
    62 
    56 
    63     #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
    57     #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
    64     port = get_available_port(xrange(9000, 9100))
    58     port = get_available_port(xrange(9000, 9100))
    65     host = 'localhost:%s' % port
    59     host = 'localhost:%s' % port
    66     ldapuri = 'ldap://%s' % host
    60     ldapuri = 'ldap://%s' % host
    67     cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
    61     cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
    68     print 'Starting slapd:', ' '.join(cmdline)
    62     config.info('Starting slapd:', ' '.join(cmdline))
    69     slapd_process = subprocess.Popen(cmdline)
    63     slapd_process = subprocess.Popen(cmdline)
    70     time.sleep(0.2)
    64     time.sleep(0.2)
    71     if slapd_process.poll() is None:
    65     if slapd_process.poll() is None:
    72         print "slapd started with pid %s" % slapd_process.pid
    66         config.info('slapd started with pid %s' % slapd_process.pid)
    73     else:
    67     else:
    74         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
    68         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
    75                                (" ".join(cmdline), os.getcwd()))
    69                                (" ".join(cmdline), os.getcwd()))
    76     URL = u'ldap://%s' % host
    70     URL = u'ldap://%s' % host
    77 
    71 
    78 def terminate_slapd():
    72 def terminate_slapd(config):
    79     global slapd_process
    73     global slapd_process
    80     if slapd_process.returncode is None:
    74     if slapd_process.returncode is None:
    81         print "terminating slapd"
    75         config.info('terminating slapd')
    82         if hasattr(slapd_process, 'terminate'):
    76         if hasattr(slapd_process, 'terminate'):
    83             slapd_process.terminate()
    77             slapd_process.terminate()
    84         else:
    78         else:
    85             import os, signal
    79             import os, signal
    86             os.kill(slapd_process.pid, signal.SIGTERM)
    80             os.kill(slapd_process.pid, signal.SIGTERM)
    87         slapd_process.wait()
    81         slapd_process.wait()
    88         print "DONE"
    82         config.info('DONE')
    89     del slapd_process
    83     del slapd_process
    90 
    84 
    91 
    85 
    92 
    86 class LDAPTestBase(CubicWebTC):
    93 
    87     loglevel = 'ERROR'
    94 class LDAPFeedSourceTC(CubicWebTC):
    88 
       
    89     @classmethod
       
    90     def setUpClass(cls):
       
    91         from cubicweb.cwctl import init_cmdline_log_threshold
       
    92         init_cmdline_log_threshold(cls.config, cls.loglevel)
       
    93         create_slapd_configuration(cls.config)
       
    94 
       
    95     @classmethod
       
    96     def tearDownClass(cls):
       
    97         terminate_slapd(cls.config)
       
    98 
       
    99 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase):
       
   100     test_db_id = 'ldap-feed'
       
   101 
       
   102     @classmethod
       
   103     def pre_setup_database(cls, session, config):
       
   104         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
       
   105                               url=URL, config=CONFIG)
       
   106         session.commit()
       
   107         isession = session.repo.internal_session(safe=True)
       
   108         lfsource = isession.repo.sources_by_uri['ldapuser']
       
   109         stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
       
   110 
       
   111     def test_delete(self):
       
   112         uri = self.repo.sources_by_uri['ldapuser'].urls[0]
       
   113         from subprocess import call
       
   114         deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' "
       
   115                      "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri)
       
   116         os.system(deletecmd)
       
   117         isession = self.session.repo.internal_session(safe=False)
       
   118         from cubicweb.server.session import security_enabled
       
   119         with security_enabled(isession, read=False, write=False):
       
   120             lfsource = isession.repo.sources_by_uri['ldapuser']
       
   121             stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
       
   122             isession.commit()
       
   123         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
       
   124         self.assertEqual(self.execute('Any N WHERE U login "syt", '
       
   125                                       'U in_state S, S name N').rows[0][0],
       
   126                          'deactivated')
       
   127 
       
   128 
       
   129 
       
   130 class LDAPFeedSourceTC(LDAPTestBase):
    95     test_db_id = 'ldap-feed'
   131     test_db_id = 'ldap-feed'
    96 
   132 
    97     @classmethod
   133     @classmethod
    98     def pre_setup_database(cls, session, config):
   134     def pre_setup_database(cls, session, config):
    99         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
   135         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
   148         source.reset_caches()
   184         source.reset_caches()
   149         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   185         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   150         self.assertEqual(len(rset), 1)
   186         self.assertEqual(len(rset), 1)
   151         e = rset.get_entity(0, 0)
   187         e = rset.get_entity(0, 0)
   152         self.assertEqual(e.eid, eid)
   188         self.assertEqual(e.eid, eid)
   153         self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False},
   189         self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
       
   190                                                              'uri': u'system',
       
   191                                                              'use-cwuri-as-url': False},
   154                                                   'type': 'CWUser',
   192                                                   'type': 'CWUser',
   155                                                   'extid': None})
   193                                                   'extid': None})
   156         self.assertEqual(e.cw_source[0].name, 'system')
   194         self.assertEqual(e.cw_source[0].name, 'system')
   157         self.assertTrue(e.creation_date)
   195         self.assertTrue(e.creation_date)
   158         self.assertTrue(e.modification_date)
   196         self.assertTrue(e.modification_date)