server/test/unittest_ldapuser.py
branchstable
changeset 6887 72d7feeb071e
parent 6781 5062d86d6ffe
child 6888 c02e5ba43366
equal deleted inserted replaced
6886:b571d2d32971 6887:72d7feeb071e
    15 #
    15 #
    16 # You should have received a copy of the GNU Lesser General Public License along
    16 # You should have received a copy of the GNU Lesser General Public License along
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    18 """cubicweb.server.sources.ldapusers unit and functional tests"""
    18 """cubicweb.server.sources.ldapusers unit and functional tests"""
    19 
    19 
    20 import socket
    20 import os
       
    21 import shutil
       
    22 import time
       
    23 from os.path import abspath, join, exists
       
    24 import subprocess
       
    25 from socket import socket, error as socketerror
    21 
    26 
    22 from logilab.common.testlib import TestCase, unittest_main, mock_object
    27 from logilab.common.testlib import TestCase, unittest_main, mock_object
    23 from cubicweb.devtools.testlib import CubicWebTC
    28 from cubicweb.devtools.testlib import CubicWebTC
    24 from cubicweb.devtools.repotest import RQLGeneratorTC
    29 from cubicweb.devtools.repotest import RQLGeneratorTC
    25 
    30 
    26 from cubicweb.server.sources.ldapuser import *
    31 from cubicweb.server.sources.ldapuser import *
    27 
    32 
    28 if '17.1' in socket.gethostbyname('ldap1'):
    33 SYT = 'syt'
    29     SYT = 'syt'
    34 SYT_EMAIL = 'Sylvain Thenault'
    30     SYT_EMAIL = 'Sylvain Thenault'
    35 ADIM = 'adim'
    31     ADIM = 'adim'
    36 CONFIG = u'''host=%s
    32     CONFIG = u'''host=ldap1
    37 user-base-dn=ou=People,dc=cubicweb,dc=test
    33 user-base-dn=ou=People,dc=logilab,dc=fr
       
    34 user-scope=ONELEVEL
    38 user-scope=ONELEVEL
    35 user-classes=top,posixAccount
    39 user-classes=top,posixAccount
    36 user-login-attr=uid
    40 user-login-attr=uid
    37 user-default-group=users
    41 user-default-group=users
    38 user-attrs-map=gecos:email,uid:login
    42 user-attrs-map=gecos:email,uid:login
    39 '''
       
    40 else:
       
    41     SYT = 'sthenault'
       
    42     SYT_EMAIL = 'sylvain.thenault@logilab.fr'
       
    43     ADIM = 'adimascio'
       
    44     CONFIG = u'''host=ldap1
       
    45 user-base-dn=ou=People,dc=logilab,dc=net
       
    46 user-scope=ONELEVEL
       
    47 user-classes=top,OpenLDAPperson
       
    48 user-login-attr=uid
       
    49 user-default-group=users
       
    50 user-attrs-map=mail:email,uid:login
       
    51 '''
    43 '''
    52 
    44 
    53 
    45 
    54 def nopwd_authenticate(self, session, login, password):
    46 def nopwd_authenticate(self, session, login, password):
    55     """used to monkey patch the source to get successful authentication without
    47     """used to monkey patch the source to get successful authentication without
    69         raise AuthenticationError()
    61         raise AuthenticationError()
    70     # don't check upassword !
    62     # don't check upassword !
    71     return self.extid2eid(user['dn'], 'CWUser', session)
    63     return self.extid2eid(user['dn'], 'CWUser', session)
    72 
    64 
    73 def setUpModule(*args):
    65 def setUpModule(*args):
       
    66     create_slapd_configuration(LDAPUserSourceTC.config)
    74     global repo
    67     global repo
    75     LDAPUserSourceTC._init_repo()
    68     try:
    76     repo = LDAPUserSourceTC.repo
    69         LDAPUserSourceTC._init_repo()
    77     add_ldap_source(LDAPUserSourceTC.cnx)
    70         repo = LDAPUserSourceTC.repo
       
    71         add_ldap_source(LDAPUserSourceTC.cnx)
       
    72     except:
       
    73         terminate_slapd()
       
    74         raise
    78 
    75 
    79 def tearDownModule(*args):
    76 def tearDownModule(*args):
    80     global repo
    77     global repo
    81     repo.shutdown()
    78     repo.shutdown()
    82     del repo
    79     del repo
       
    80     terminate_slapd()
    83 
    81 
    84 def add_ldap_source(cnx):
    82 def add_ldap_source(cnx):
    85     cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
    83     cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
    86                                 config=CONFIG)
    84                                 config=CONFIG)
    87     cnx.commit()
    85     cnx.commit()
    91     # so doing, ldap inserted users don't get removed between each test
    89     # so doing, ldap inserted users don't get removed between each test
    92     rset = cnx.cursor().execute('CWUser X')
    90     rset = cnx.cursor().execute('CWUser X')
    93     # check we get some users from ldap
    91     # check we get some users from ldap
    94     assert len(rset) > 1
    92     assert len(rset) > 1
    95 
    93 
       
    94 def create_slapd_configuration(config):
       
    95     global slapd_process, CONFIG
       
    96     basedir = join(config.apphome, "ldapdb")
       
    97     slapdconf = join(config.apphome, "slapd.conf")
       
    98     if not exists(basedir):
       
    99         os.makedirs(basedir)
       
   100         # fill ldap server with some data
       
   101         ldiffile = join(config.apphome, "ldap_test.ldif")
       
   102         print "Initing ldap database"
       
   103         cmdline = "/usr/sbin/slapadd -f %s -l %s -c" % (slapdconf, ldiffile)
       
   104         subprocess.call(cmdline, shell=True)
       
   105 
       
   106 
       
   107     #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
       
   108     for port in range(9000, 9100):
       
   109         try:
       
   110             socket().bind(('localhost', port))
       
   111         except socketerror, e:
       
   112             if e.errno == 98: # Address already in use
       
   113                 pass
       
   114             else:
       
   115                 raise
       
   116         else:
       
   117             break
       
   118     else:
       
   119         raise Exception("Can't find a free TCP port on localhost")
       
   120 
       
   121     host = 'localhost:%s' % port
       
   122     ldapuri = 'ldap://%s' % host
       
   123     cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
       
   124     print "Starting slapd on", ldapuri
       
   125     slapd_process = subprocess.Popen(cmdline)
       
   126     time.sleep(0.2)
       
   127     if slapd_process.poll() is None:
       
   128         print "slapd started with pid %s" % slapd_process.pid
       
   129     else:
       
   130         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
       
   131                                (" ".join(cmdline), os.getcwd()))
       
   132     CONFIG = CONFIG % host
       
   133 
       
   134 def terminate_slapd():
       
   135     global slapd_process
       
   136     if slapd_process.returncode is None:
       
   137         print "terminating slapd"
       
   138         slapd_process.terminate()
       
   139         slapd_process.wait()
       
   140         print "DONE"
       
   141 
       
   142     del slapd_process
    96 
   143 
    97 class LDAPUserSourceTC(CubicWebTC):
   144 class LDAPUserSourceTC(CubicWebTC):
    98 
   145 
    99     def patch_authenticate(self):
   146     def patch_authenticate(self):
   100         self._orig_authenticate = LDAPUserSource.authenticate
   147         self._orig_authenticate = LDAPUserSource.authenticate
   115         source = self.repo.sources_by_uri['ldapuser']
   162         source = self.repo.sources_by_uri['ldapuser']
   116         source.synchronize()
   163         source.synchronize()
   117 
   164 
   118     def test_base(self):
   165     def test_base(self):
   119         # check a known one
   166         # check a known one
   120         e = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}).get_entity(0, 0)
   167         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})
       
   168         e = rset.get_entity(0, 0)
   121         self.assertEqual(e.login, SYT)
   169         self.assertEqual(e.login, SYT)
   122         e.complete()
   170         e.complete()
   123         self.assertEqual(e.creation_date, None)
   171         self.assertEqual(e.creation_date, None)
   124         self.assertEqual(e.modification_date, None)
   172         self.assertEqual(e.modification_date, None)
   125         self.assertEqual(e.firstname, None)
   173         self.assertEqual(e.firstname, None)