server/test/unittest_ldapuser.py
branchstable
changeset 7088 76e0dba5f8f3
parent 6991 eb2ba251f093
parent 7078 bad26a22fe29
child 7121 c2badb6de3fe
equal deleted inserted replaced
7087:376314ebf273 7088:76e0dba5f8f3
    22 import time
    22 import time
    23 from os.path import abspath, join, exists
    23 from os.path import abspath, join, exists
    24 import subprocess
    24 import subprocess
    25 from socket import socket, error as socketerror
    25 from socket import socket, error as socketerror
    26 
    26 
    27 from logilab.common.testlib import TestCase, unittest_main, mock_object
    27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
    28 from cubicweb.devtools.testlib import CubicWebTC
    28 from cubicweb.devtools.testlib import CubicWebTC
    29 from cubicweb.devtools.repotest import RQLGeneratorTC
    29 from cubicweb.devtools.repotest import RQLGeneratorTC
    30 from cubicweb.devtools.httptest import get_available_port
    30 from cubicweb.devtools.httptest import get_available_port
       
    31 from cubicweb.devtools import get_test_db_handler
    31 
    32 
    32 from cubicweb.server.sources.ldapuser import *
    33 from cubicweb.server.sources.ldapuser import *
    33 
    34 
    34 SYT = 'syt'
    35 SYT = 'syt'
    35 SYT_EMAIL = 'Sylvain Thenault'
    36 SYT_EMAIL = 'Sylvain Thenault'
    62     # don't check upassword !
    63     # don't check upassword !
    63     return self.extid2eid(user['dn'], 'CWUser', session)
    64     return self.extid2eid(user['dn'], 'CWUser', session)
    64 
    65 
    65 def setUpModule(*args):
    66 def setUpModule(*args):
    66     create_slapd_configuration(LDAPUserSourceTC.config)
    67     create_slapd_configuration(LDAPUserSourceTC.config)
    67     global repo
       
    68     try:
       
    69         LDAPUserSourceTC._init_repo()
       
    70         repo = LDAPUserSourceTC.repo
       
    71         add_ldap_source(LDAPUserSourceTC.cnx)
       
    72     except:
       
    73         terminate_slapd()
       
    74         raise
       
    75 
    68 
    76 def tearDownModule(*args):
    69 def tearDownModule(*args):
    77     global repo
       
    78     repo.shutdown()
       
    79     del repo
       
    80     terminate_slapd()
    70     terminate_slapd()
    81 
       
    82 def add_ldap_source(cnx):
       
    83     cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
       
    84                                 config=CONFIG)
       
    85     cnx.commit()
       
    86 
    71 
    87 def create_slapd_configuration(config):
    72 def create_slapd_configuration(config):
    88     global slapd_process, CONFIG
    73     global slapd_process, CONFIG
    89     basedir = join(config.apphome, "ldapdb")
    74     basedir = join(config.apphome, "ldapdb")
    90     slapdconf = join(config.apphome, "slapd.conf")
    75     slapdconf = join(config.apphome, "slapd.conf")
   125         else:
   110         else:
   126             import os, signal
   111             import os, signal
   127             os.kill(slapd_process.pid, signal.SIGTERM)
   112             os.kill(slapd_process.pid, signal.SIGTERM)
   128         slapd_process.wait()
   113         slapd_process.wait()
   129         print "DONE"
   114         print "DONE"
   130 
       
   131     del slapd_process
   115     del slapd_process
   132 
   116 
   133 class LDAPUserSourceTC(CubicWebTC):
   117 class LDAPUserSourceTC(CubicWebTC):
       
   118     test_db_id = 'ldap-user'
       
   119     tags = CubicWebTC.tags | Tags(('ldap'))
       
   120 
       
   121     @classmethod
       
   122     def pre_setup_database(cls, session, config):
       
   123         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
       
   124                                     config=CONFIG)
       
   125         session.commit()
       
   126         # XXX keep it there
       
   127         session.execute('CWUser U')
   134 
   128 
   135     def patch_authenticate(self):
   129     def patch_authenticate(self):
   136         self._orig_authenticate = LDAPUserSource.authenticate
   130         self._orig_authenticate = LDAPUserSource.authenticate
   137         LDAPUserSource.authenticate = nopwd_authenticate
   131         LDAPUserSource.authenticate = nopwd_authenticate
   138 
   132 
   273         self.session.set_pool()
   267         self.session.set_pool()
   274         self.session.create_entity('CWGroup', name=u'bougloup1')
   268         self.session.create_entity('CWGroup', name=u'bougloup1')
   275         self.session.create_entity('CWGroup', name=u'bougloup2')
   269         self.session.create_entity('CWGroup', name=u'bougloup2')
   276         self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   270         self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   277         self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT})
   271         self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT})
   278         rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
   272         rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, '
       
   273                              'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
   279         self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']])
   274         self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']])
   280 
   275 
   281     def test_exists2(self):
   276     def test_exists2(self):
   282         self.create_user('comme')
   277         self.create_user('comme')
   283         self.create_user('cochon')
   278         self.create_user('cochon')
   284         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   279         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   285         rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
   280         rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, '
       
   281                              '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
   286         self.assertEqual(rset.rows, [['managers'], ['users']])
   282         self.assertEqual(rset.rows, [['managers'], ['users']])
   287 
   283 
   288     def test_exists3(self):
   284     def test_exists3(self):
   289         self.create_user('comme')
   285         self.create_user('comme')
   290         self.create_user('cochon')
   286         self.create_user('cochon')
   291         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   287         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   292         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
   288         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
   293         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})
   289         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})
   294         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT}))
   290         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT}))
   295         rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))')
   291         rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" '
       
   292                              'OR EXISTS(X copain T, T login in ("comme", "cochon"))')
   296         self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]])
   293         self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]])
   297 
   294 
   298     def test_exists4(self):
   295     def test_exists4(self):
   299         self.create_user('comme')
   296         self.create_user('comme')
   300         self.create_user('cochon', groups=('users', 'guests'))
   297         self.create_user('cochon', groups=('users', 'guests'))
   395         self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
   392         self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
   396                      {'x': emaileid})
   393                      {'x': emaileid})
   397 
   394 
   398     def test_nonregr5(self):
   395     def test_nonregr5(self):
   399         # original jpl query:
   396         # original jpl query:
   400         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
   397         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser,
   401         rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
   398         # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
       
   399         rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, '
       
   400                'U login "%s", P is X, X creation_date CD') % self.session.user.login
   402         self.sexecute(rql, )#{'x': })
   401         self.sexecute(rql, )#{'x': })
   403 
   402 
   404     def test_nonregr6(self):
   403     def test_nonregr6(self):
   405         self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
   404         self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
   406                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   405                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   443         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
   442         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
   444         self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])
   443         self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])
   445 
   444 
   446 class RQL2LDAPFilterTC(RQLGeneratorTC):
   445 class RQL2LDAPFilterTC(RQLGeneratorTC):
   447 
   446 
       
   447     tags = RQLGeneratorTC.tags | Tags(('ldap'))
       
   448 
       
   449     @property
       
   450     def schema(self):
       
   451         """return the application schema"""
       
   452         return self._schema
       
   453 
   448     def setUp(self):
   454     def setUp(self):
   449         self.schema = repo.schema
   455         self.handler = get_test_db_handler(LDAPUserSourceTC.config)
   450         RQLGeneratorTC.setUp(self)
   456         self.handler.build_db_cache('ldap-user', LDAPUserSourceTC.pre_setup_database)
       
   457         self.handler.restore_database('ldap-user')
       
   458         self._repo = repo = self.handler.get_repo()
       
   459         self._schema = repo.schema
       
   460         super(RQL2LDAPFilterTC, self).setUp()
   451         ldapsource = repo.sources[-1]
   461         ldapsource = repo.sources[-1]
   452         self.pool = repo._get_pool()
   462         self.pool = repo._get_pool()
   453         session = mock_object(pool=self.pool)
   463         session = mock_object(pool=self.pool)
   454         self.o = RQL2LDAPFilter(ldapsource, session)
   464         self.o = RQL2LDAPFilter(ldapsource, session)
   455         self.ldapclasses = ''.join(ldapsource.base_filters)
   465         self.ldapclasses = ''.join(ldapsource.base_filters)
   456 
   466 
   457     def tearDown(self):
   467     def tearDown(self):
   458         repo._free_pool(self.pool)
   468         self._repo.turn_repo_off()
   459         RQLGeneratorTC.tearDown(self)
   469         super(RQL2LDAPFilterTC, self).tearDown()
   460 
   470 
   461     def test_base(self):
   471     def test_base(self):
   462         rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
   472         rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
   463         self.assertEqual(self.o.generate(rqlst, 'X')[1],
   473         self.assertEqual(self.o.generate(rqlst, 'X')[1],
   464                           '(&%s(uid=toto))' % self.ldapclasses)
   474                           '(&%s(uid=toto))' % self.ldapclasses)