server/test/unittest_ldapuser.py
branch oldstable
changeset 7078 bad26a22fe29
parent 6986 18343456ee49
child 7088 76e0dba5f8f3
equal deleted inserted replaced
7074:e4580e5f0703 7078:bad26a22fe29
    22 import time
    22 import time
    23 from os.path import abspath, join, exists
    23 from os.path import abspath, join, exists
    24 import subprocess
    24 import subprocess
    25 from socket import socket, error as socketerror
    25 from socket import socket, error as socketerror
    26 
    26 
    27 from logilab.common.testlib import TestCase, unittest_main, mock_object
    27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
    28 from cubicweb.devtools.testlib import CubicWebTC
    28 from cubicweb.devtools.testlib import CubicWebTC
    29 from cubicweb.devtools.repotest import RQLGeneratorTC
    29 from cubicweb.devtools.repotest import RQLGeneratorTC
    30 from cubicweb.devtools.httptest import get_available_port
    30 from cubicweb.devtools.httptest import get_available_port
       
    31 from cubicweb.devtools import get_test_db_handler
    31 
    32 
    32 from cubicweb.server.sources.ldapuser import *
    33 from cubicweb.server.sources.ldapuser import *
    33 
    34 
    34 SYT = 'syt'
    35 SYT = 'syt'
    35 SYT_EMAIL = 'Sylvain Thenault'
    36 SYT_EMAIL = 'Sylvain Thenault'
    63     # don't check upassword !
    64     # don't check upassword !
    64     return self.extid2eid(user['dn'], 'CWUser', session)
    65     return self.extid2eid(user['dn'], 'CWUser', session)
    65 
    66 
    66 def setUpModule(*args):
    67 def setUpModule(*args):
    67     create_slapd_configuration(LDAPUserSourceTC.config)
    68     create_slapd_configuration(LDAPUserSourceTC.config)
    68     global repo
       
    69     try:
       
    70         LDAPUserSourceTC._init_repo()
       
    71         repo = LDAPUserSourceTC.repo
       
    72         add_ldap_source(LDAPUserSourceTC.cnx)
       
    73     except:
       
    74         terminate_slapd()
       
    75         raise
       
    76 
    69 
    77 def tearDownModule(*args):
    70 def tearDownModule(*args):
    78     global repo
       
    79     repo.shutdown()
       
    80     del repo
       
    81     terminate_slapd()
    71     terminate_slapd()
    82 
       
    83 def add_ldap_source(cnx):
       
    84     cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
       
    85                                 config=CONFIG)
       
    86     cnx.commit()
       
    87 
    72 
    88 def create_slapd_configuration(config):
    73 def create_slapd_configuration(config):
    89     global slapd_process, CONFIG
    74     global slapd_process, CONFIG
    90     basedir = join(config.apphome, "ldapdb")
    75     basedir = join(config.apphome, "ldapdb")
    91     slapdconf = join(config.apphome, "slapd.conf")
    76     slapdconf = join(config.apphome, "slapd.conf")
   126         else:
   111         else:
   127             import os, signal
   112             import os, signal
   128             os.kill(slapd_process.pid, signal.SIGTERM)
   113             os.kill(slapd_process.pid, signal.SIGTERM)
   129         slapd_process.wait()
   114         slapd_process.wait()
   130         print "DONE"
   115         print "DONE"
   131 
       
   132     del slapd_process
   116     del slapd_process
   133 
   117 
   134 class LDAPUserSourceTC(CubicWebTC):
   118 class LDAPUserSourceTC(CubicWebTC):
       
   119     test_db_id = 'ldap-user'
       
   120     tags = CubicWebTC.tags | Tags(('ldap'))
       
   121 
       
   122     @classmethod
       
   123     def pre_setup_database(cls, session, config):
       
   124         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
       
   125                                     config=CONFIG)
       
   126         session.commit()
       
   127         # XXX keep it there
       
   128         session.execute('CWUser U')
   135 
   129 
   136     def patch_authenticate(self):
   130     def patch_authenticate(self):
   137         self._orig_authenticate = LDAPUserSource.authenticate
   131         self._orig_authenticate = LDAPUserSource.authenticate
   138         LDAPUserSource.authenticate = nopwd_authenticate
   132         LDAPUserSource.authenticate = nopwd_authenticate
   139 
   133 
   274         self.session.set_pool()
   268         self.session.set_pool()
   275         self.session.create_entity('CWGroup', name=u'bougloup1')
   269         self.session.create_entity('CWGroup', name=u'bougloup1')
   276         self.session.create_entity('CWGroup', name=u'bougloup2')
   270         self.session.create_entity('CWGroup', name=u'bougloup2')
   277         self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   271         self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   278         self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT})
   272         self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT})
   279         rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
   273         rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, '
       
   274                              'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
   280         self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']])
   275         self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']])
   281 
   276 
   282     def test_exists2(self):
   277     def test_exists2(self):
   283         self.create_user('comme')
   278         self.create_user('comme')
   284         self.create_user('cochon')
   279         self.create_user('cochon')
   285         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   280         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   286         rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
   281         rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, '
       
   282                              '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
   287         self.assertEqual(rset.rows, [['managers'], ['users']])
   283         self.assertEqual(rset.rows, [['managers'], ['users']])
   288 
   284 
   289     def test_exists3(self):
   285     def test_exists3(self):
   290         self.create_user('comme')
   286         self.create_user('comme')
   291         self.create_user('cochon')
   287         self.create_user('cochon')
   292         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   288         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   293         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
   289         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
   294         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})
   290         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})
   295         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT}))
   291         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT}))
   296         rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))')
   292         rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" '
       
   293                              'OR EXISTS(X copain T, T login in ("comme", "cochon"))')
   297         self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]])
   294         self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]])
   298 
   295 
   299     def test_exists4(self):
   296     def test_exists4(self):
   300         self.create_user('comme')
   297         self.create_user('comme')
   301         self.create_user('cochon', groups=('users', 'guests'))
   298         self.create_user('cochon', groups=('users', 'guests'))
   396         self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
   393         self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
   397                      {'x': emaileid})
   394                      {'x': emaileid})
   398 
   395 
   399     def test_nonregr5(self):
   396     def test_nonregr5(self):
   400         # original jpl query:
   397         # original jpl query:
   401         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
   398         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser,
   402         rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
   399         # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
       
   400         rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, '
       
   401                'U login "%s", P is X, X creation_date CD') % self.session.user.login
   403         self.sexecute(rql, )#{'x': })
   402         self.sexecute(rql, )#{'x': })
   404 
   403 
   405     def test_nonregr6(self):
   404     def test_nonregr6(self):
   406         self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
   405         self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
   407                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   406                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   444         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
   443         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
   445         self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])
   444         self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])
   446 
   445 
   447 class RQL2LDAPFilterTC(RQLGeneratorTC):
   446 class RQL2LDAPFilterTC(RQLGeneratorTC):
   448 
   447 
       
   448     tags = RQLGeneratorTC.tags | Tags(('ldap'))
       
   449 
       
   450     @property
       
   451     def schema(self):
       
   452         """return the application schema"""
       
   453         return self._schema
       
   454 
   449     def setUp(self):
   455     def setUp(self):
   450         self.schema = repo.schema
   456         self.handler = get_test_db_handler(LDAPUserSourceTC.config)
   451         RQLGeneratorTC.setUp(self)
   457         self.handler.build_db_cache('ldap-user', LDAPUserSourceTC.pre_setup_database)
       
   458         self.handler.restore_database('ldap-user')
       
   459         self._repo = repo = self.handler.get_repo()
       
   460         self._schema = repo.schema
       
   461         super(RQL2LDAPFilterTC, self).setUp()
   452         ldapsource = repo.sources[-1]
   462         ldapsource = repo.sources[-1]
   453         self.pool = repo._get_pool()
   463         self.pool = repo._get_pool()
   454         session = mock_object(pool=self.pool)
   464         session = mock_object(pool=self.pool)
   455         self.o = RQL2LDAPFilter(ldapsource, session)
   465         self.o = RQL2LDAPFilter(ldapsource, session)
   456         self.ldapclasses = ''.join('(objectClass=%s)' % ldapcls
   466         self.ldapclasses = ''.join('(objectClass=%s)' % ldapcls
   457                                    for ldapcls in ldapsource.user_classes)
   467                                    for ldapcls in ldapsource.user_classes)
   458 
   468 
   459     def tearDown(self):
   469     def tearDown(self):
   460         repo._free_pool(self.pool)
   470         self._repo.turn_repo_off()
   461         RQLGeneratorTC.tearDown(self)
   471         super(RQL2LDAPFilterTC, self).tearDown()
   462 
   472 
   463     def test_base(self):
   473     def test_base(self):
   464         rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
   474         rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
   465         self.assertEqual(self.o.generate(rqlst, 'X')[1],
   475         self.assertEqual(self.o.generate(rqlst, 'X')[1],
   466                           '(&%s(uid=toto))' % self.ldapclasses)
   476                           '(&%s(uid=toto))' % self.ldapclasses)