server/test/unittest_ldapuser.py
changeset 8188 1867e252e487
parent 8146 67b9b273b70d
child 8229 b7bc631816f7
equal deleted inserted replaced
8187:981f6e487788 8188:1867e252e487
     1 # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     1 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     3 #
     3 #
     4 # This file is part of CubicWeb.
     4 # This file is part of CubicWeb.
     5 #
     5 #
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
    23 from os.path import abspath, join, exists
    23 from os.path import abspath, join, exists
    24 import subprocess
    24 import subprocess
    25 from socket import socket, error as socketerror
    25 from socket import socket, error as socketerror
    26 
    26 
    27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
    27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
       
    28 
       
    29 from cubicweb import AuthenticationError
    28 from cubicweb.devtools.testlib import CubicWebTC
    30 from cubicweb.devtools.testlib import CubicWebTC
    29 from cubicweb.devtools.repotest import RQLGeneratorTC
    31 from cubicweb.devtools.repotest import RQLGeneratorTC
    30 from cubicweb.devtools.httptest import get_available_port
    32 from cubicweb.devtools.httptest import get_available_port
    31 from cubicweb.devtools import get_test_db_handler
    33 from cubicweb.devtools import get_test_db_handler
    32 
    34 
    33 from cubicweb.server.sources.ldapuser import *
    35 from cubicweb.server.sources.ldapuser import *
    34 
    36 
    35 CONFIG = u'''host=%s
    37 CONFIG = u'''user-base-dn=ou=People,dc=cubicweb,dc=test
    36 user-base-dn=ou=People,dc=cubicweb,dc=test
       
    37 user-scope=ONELEVEL
    38 user-scope=ONELEVEL
    38 user-classes=top,posixAccount
    39 user-classes=top,posixAccount
    39 user-login-attr=uid
    40 user-login-attr=uid
    40 user-default-group=users
    41 user-default-group=users
    41 user-attrs-map=gecos:email,uid:login
    42 user-attrs-map=gecos:email,uid:login
    42 '''
    43 '''
    43 
    44 URL = None
    44 
    45 
    45 def setUpModule(*args):
    46 def setUpModule(*args):
    46     create_slapd_configuration(LDAPUserSourceTC.config)
    47     create_slapd_configuration(LDAPUserSourceTC.config)
    47 
    48 
    48 def tearDownModule(*args):
    49 def tearDownModule(*args):
    49     terminate_slapd()
    50     terminate_slapd()
    50 
    51 
    51 def create_slapd_configuration(config):
    52 def create_slapd_configuration(config):
    52     global slapd_process, CONFIG
    53     global slapd_process, URL
    53     basedir = join(config.apphome, "ldapdb")
    54     basedir = join(config.apphome, "ldapdb")
    54     slapdconf = join(config.apphome, "slapd.conf")
    55     slapdconf = join(config.apphome, "slapd.conf")
    55     confin = file(join(config.apphome, "slapd.conf.in")).read()
    56     confin = file(join(config.apphome, "slapd.conf.in")).read()
    56     confstream = file(slapdconf, 'w')
    57     confstream = file(slapdconf, 'w')
    57     confstream.write(confin % {'apphome': config.apphome})
    58     confstream.write(confin % {'apphome': config.apphome})
    76     if slapd_process.poll() is None:
    77     if slapd_process.poll() is None:
    77         print "slapd started with pid %s" % slapd_process.pid
    78         print "slapd started with pid %s" % slapd_process.pid
    78     else:
    79     else:
    79         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
    80         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
    80                                (" ".join(cmdline), os.getcwd()))
    81                                (" ".join(cmdline), os.getcwd()))
    81     CONFIG = CONFIG % host
    82     URL = u'ldap://%s' % host
    82 
    83 
    83 def terminate_slapd():
    84 def terminate_slapd():
    84     global slapd_process
    85     global slapd_process
    85     if slapd_process.returncode is None:
    86     if slapd_process.returncode is None:
    86         print "terminating slapd"
    87         print "terminating slapd"
    91             os.kill(slapd_process.pid, signal.SIGTERM)
    92             os.kill(slapd_process.pid, signal.SIGTERM)
    92         slapd_process.wait()
    93         slapd_process.wait()
    93         print "DONE"
    94         print "DONE"
    94     del slapd_process
    95     del slapd_process
    95 
    96 
    96 class LDAPUserSourceTC(CubicWebTC):
    97 
    97     test_db_id = 'ldap-user'
    98 
    98     tags = CubicWebTC.tags | Tags(('ldap'))
    99 
       
   100 class LDAPFeedSourceTC(CubicWebTC):
       
   101     test_db_id = 'ldap-feed'
    99 
   102 
   100     @classmethod
   103     @classmethod
   101     def pre_setup_database(cls, session, config):
   104     def pre_setup_database(cls, session, config):
   102         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
   105         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
   103                               config=CONFIG)
   106                               url=URL, config=CONFIG)
   104         session.commit()
   107         session.commit()
   105         # XXX keep it there
   108         isession = session.repo.internal_session()
   106         session.execute('CWUser U')
   109         lfsource = isession.repo.sources_by_uri['ldapuser']
       
   110         stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
       
   111 
       
   112     def setUp(self):
       
   113         super(LDAPFeedSourceTC, self).setUp()
       
   114         # ldap source url in the database may use a different port as the one
       
   115         # just attributed
       
   116         lfsource = self.repo.sources_by_uri['ldapuser']
       
   117         lfsource.urls = [URL]
       
   118 
       
   119     def assertMetadata(self, entity):
       
   120         self.assertTrue(entity.creation_date)
       
   121         self.assertTrue(entity.modification_date)
   107 
   122 
   108     def test_authenticate(self):
   123     def test_authenticate(self):
   109         source = self.repo.sources_by_uri['ldapuser']
   124         source = self.repo.sources_by_uri['ldapuser']
   110         self.session.set_cnxset()
   125         self.session.set_cnxset()
   111         self.assertRaises(AuthenticationError,
   126         self.assertRaises(AuthenticationError,
   112                           source.authenticate, self.session, 'toto', 'toto')
   127                           source.authenticate, self.session, 'toto', 'toto')
   113 
       
   114     def test_synchronize(self):
       
   115         source = self.repo.sources_by_uri['ldapuser']
       
   116         source.synchronize()
       
   117 
   128 
   118     def test_base(self):
   129     def test_base(self):
   119         # check a known one
   130         # check a known one
   120         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   131         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   121         e = rset.get_entity(0, 0)
   132         e = rset.get_entity(0, 0)
   122         self.assertEqual(e.login, 'syt')
   133         self.assertEqual(e.login, 'syt')
   123         e.complete()
   134         e.complete()
   124         self.assertEqual(e.creation_date, None)
   135         self.assertMetadata(e)
   125         self.assertEqual(e.modification_date, None)
   136         self.assertEqual(e.firstname, None)
       
   137         self.assertEqual(e.surname, None)
       
   138         self.assertEqual(e.in_group[0].name, 'users')
       
   139         self.assertEqual(e.owned_by[0].login, 'syt')
       
   140         self.assertEqual(e.created_by, ())
       
   141         self.assertEqual(e.primary_email[0].address, 'Sylvain Thenault')
       
   142         # email content should be indexed on the user
       
   143         rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
       
   144         self.assertEqual(rset.rows, [[e.eid]])
       
   145 
       
   146     def test_copy_to_system_source(self):
       
   147         source = self.repo.sources_by_uri['ldapuser']
       
   148         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
       
   149         self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
       
   150         self.commit()
       
   151         source.reset_caches()
       
   152         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   153         self.assertEqual(len(rset), 1)
       
   154         e = rset.get_entity(0, 0)
       
   155         self.assertEqual(e.eid, eid)
       
   156         self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False},
       
   157                                                   'type': 'CWUser',
       
   158                                                   'extid': None})
       
   159         self.assertEqual(e.cw_source[0].name, 'system')
       
   160         self.assertTrue(e.creation_date)
       
   161         self.assertTrue(e.modification_date)
       
   162         # XXX test some password has been set
       
   163         source.pull_data(self.session)
       
   164         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   165         self.assertEqual(len(rset), 1)
       
   166 
       
   167 
       
   168 class LDAPUserSourceTC(LDAPFeedSourceTC):
       
   169     test_db_id = 'ldap-user'
       
   170     tags = CubicWebTC.tags | Tags(('ldap'))
       
   171 
       
   172     @classmethod
       
   173     def pre_setup_database(cls, session, config):
       
   174         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
       
   175                               url=URL, config=CONFIG)
       
   176         session.commit()
       
   177         # XXX keep it there
       
   178         session.execute('CWUser U')
       
   179 
       
   180     def assertMetadata(self, entity):
       
   181         self.assertEqual(entity.creation_date, None)
       
   182         self.assertEqual(entity.modification_date, None)
       
   183 
       
   184     def test_synchronize(self):
       
   185         source = self.repo.sources_by_uri['ldapuser']
       
   186         source.synchronize()
       
   187 
       
   188     def test_base(self):
       
   189         # check a known one
       
   190         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   191         e = rset.get_entity(0, 0)
       
   192         self.assertEqual(e.login, 'syt')
       
   193         e.complete()
       
   194         self.assertMetadata(e)
   126         self.assertEqual(e.firstname, None)
   195         self.assertEqual(e.firstname, None)
   127         self.assertEqual(e.surname, None)
   196         self.assertEqual(e.surname, None)
   128         self.assertEqual(e.in_group[0].name, 'users')
   197         self.assertEqual(e.in_group[0].name, 'users')
   129         self.assertEqual(e.owned_by[0].login, 'syt')
   198         self.assertEqual(e.owned_by[0].login, 'syt')
   130         self.assertEqual(e.created_by, ())
   199         self.assertEqual(e.created_by, ())
   345         rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'})
   414         rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'})
   346         self.assertEqual(rset.rows, [])
   415         self.assertEqual(rset.rows, [])
   347         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
   416         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
   348         self.assertEqual(rset.rows, [[None]])
   417         self.assertEqual(rset.rows, [[None]])
   349 
   418 
   350     def test_copy_to_system_source(self):
       
   351         source = self.repo.sources_by_uri['ldapuser']
       
   352         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
       
   353         self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
       
   354         self.commit()
       
   355         source.reset_caches()
       
   356         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   357         self.assertEqual(len(rset), 1)
       
   358         e = rset.get_entity(0, 0)
       
   359         self.assertEqual(e.eid, eid)
       
   360         self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False},
       
   361                                                   'type': 'CWUser',
       
   362                                                   'extid': None})
       
   363         self.assertEqual(e.cw_source[0].name, 'system')
       
   364         self.assertTrue(e.creation_date)
       
   365         self.assertTrue(e.modification_date)
       
   366         # XXX test some password has been set
       
   367         source.synchronize()
       
   368         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   369         self.assertEqual(len(rset), 1)
       
   370 
       
   371     def test_nonregr1(self):
   419     def test_nonregr1(self):
   372         self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
   420         self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
   373                      'X modification_date AA',
   421                      'X modification_date AA',
   374                      {'x': self.session.user.eid})
   422                      {'x': self.session.user.eid})
   375 
   423 
   401                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   449                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   402                      'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
   450                      'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
   403                      'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
   451                      'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
   404                      {'x': self.session.user.eid})
   452                      {'x': self.session.user.eid})
   405 
   453 
   406 
       
   407 class GlobTrFuncTC(TestCase):
   454 class GlobTrFuncTC(TestCase):
   408 
   455 
   409     def test_count(self):
   456     def test_count(self):
   410         trfunc = GlobTrFunc('count', 0)
   457         trfunc = GlobTrFunc('count', 0)
   411         res = trfunc.apply([[1], [2], [3], [4]])
   458         res = trfunc.apply([[1], [2], [3], [4]])
   435         res = trfunc.apply([[1], [2], [3], [4]])
   482         res = trfunc.apply([[1], [2], [3], [4]])
   436         self.assertEqual(res, [[4]])
   483         self.assertEqual(res, [[4]])
   437         trfunc = GlobTrFunc('max', 1)
   484         trfunc = GlobTrFunc('max', 1)
   438         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
   485         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
   439         self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])
   486         self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])
       
   487 
   440 
   488 
   441 class RQL2LDAPFilterTC(RQLGeneratorTC):
   489 class RQL2LDAPFilterTC(RQLGeneratorTC):
   442 
   490 
   443     tags = RQLGeneratorTC.tags | Tags(('ldap'))
   491     tags = RQLGeneratorTC.tags | Tags(('ldap'))
   444 
   492