server/test/unittest_ldapuser.py
branch tls-sprint
changeset 1802 d628defebc17
parent 1398 5fe84a5f7035
child 1977 606923dff11b
equal deleted inserted replaced
1801:672acc730ce5 1802:d628defebc17
    32 config.sources_file = lambda : 'data/sourcesldap'
    32 config.sources_file = lambda : 'data/sourcesldap'
    33 repo, cnx = init_test_database('sqlite', config=config)
    33 repo, cnx = init_test_database('sqlite', config=config)
    34 
    34 
    35 class LDAPUserSourceTC(RepositoryBasedTC):
    35 class LDAPUserSourceTC(RepositoryBasedTC):
    36     repo, cnx = repo, cnx
    36     repo, cnx = repo, cnx
    37     
    37 
    38     def patch_authenticate(self):
    38     def patch_authenticate(self):
    39         self._orig_authenticate = LDAPUserSource.authenticate
    39         self._orig_authenticate = LDAPUserSource.authenticate
    40         LDAPUserSource.authenticate = nopwd_authenticate
    40         LDAPUserSource.authenticate = nopwd_authenticate
    41 
    41 
    42     def setUp(self):
    42     def setUp(self):
    43         self._prepare()
    43         self._prepare()
    44         # XXX: need this first query else we get 'database is locked' from 
    44         # XXX: need this first query else we get 'database is locked' from
    45         # sqlite since it doesn't support multiple connections on the same
    45         # sqlite since it doesn't support multiple connections on the same
    46         # database
    46         # database
    47         # so doing, ldap inserted users don't get removed between each test
    47         # so doing, ldap inserted users don't get removed between each test
    48         rset = self.execute('CWUser X')
    48         rset = self.execute('CWUser X')
    49         self.commit()
    49         self.commit()
    50         # check we get some users from ldap
    50         # check we get some users from ldap
    51         self.assert_(len(rset) > 1)
    51         self.assert_(len(rset) > 1)
    52         self.maxeid = self.execute('Any MAX(X)')[0][0]
    52         self.maxeid = self.execute('Any MAX(X)')[0][0]
    53         
    53 
    54     def tearDown(self):
    54     def tearDown(self):
    55         if hasattr(self, '_orig_authenticate'):
    55         if hasattr(self, '_orig_authenticate'):
    56             LDAPUserSource.authenticate = self._orig_authenticate
    56             LDAPUserSource.authenticate = self._orig_authenticate
    57         RepositoryBasedTC.tearDown(self)
    57         RepositoryBasedTC.tearDown(self)
    58             
    58 
    59     def test_authenticate(self):
    59     def test_authenticate(self):
    60         source = self.repo.sources_by_uri['ldapuser']
    60         source = self.repo.sources_by_uri['ldapuser']
    61         self.assertRaises(AuthenticationError,
    61         self.assertRaises(AuthenticationError,
    62                           source.authenticate, self.session, 'toto', 'toto')
    62                           source.authenticate, self.session, 'toto', 'toto')
    63         
    63 
    64     def test_synchronize(self):
    64     def test_synchronize(self):
    65         source = self.repo.sources_by_uri['ldapuser']
    65         source = self.repo.sources_by_uri['ldapuser']
    66         source.synchronize()
    66         source.synchronize()
    67         
    67 
    68     def test_base(self):
    68     def test_base(self):
    69         # check a known one
    69         # check a known one
    70         e = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0)
    70         e = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0)
    71         self.assertEquals(e.login, 'syt')
    71         self.assertEquals(e.login, 'syt')
    72         e.complete()
    72         e.complete()
   137         self.assertEquals(logins, sorted(logins))
   137         self.assertEquals(logins, sorted(logins))
   138 
   138 
   139     def test_or(self):
   139     def test_or(self):
   140         rset = self.execute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")')
   140         rset = self.execute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")')
   141         self.assertEquals(len(rset), 2, rset.rows) # syt + admin
   141         self.assertEquals(len(rset), 2, rset.rows) # syt + admin
   142         
   142 
   143     def test_nonregr_set_owned_by(self):
   143     def test_nonregr_set_owned_by(self):
   144         # test that when a user coming from ldap is triggering a transition
   144         # test that when a user coming from ldap is triggering a transition
   145         # the related TrInfo has correct owner information
   145         # the related TrInfo has correct owner information
   146         self.execute('SET X in_group G WHERE X login "syt", G name "managers"')
   146         self.execute('SET X in_group G WHERE X login "syt", G name "managers"')
   147         self.commit()
   147         self.commit()
   173         self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
   173         self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
   174 
   174 
   175     def test_multiple_entities_from_different_sources(self):
   175     def test_multiple_entities_from_different_sources(self):
   176         self.create_user('cochon')
   176         self.create_user('cochon')
   177         self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"'))
   177         self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"'))
   178         
   178 
   179     def test_exists1(self):
   179     def test_exists1(self):
   180         self.add_entity('CWGroup', name=u'bougloup1')
   180         self.add_entity('CWGroup', name=u'bougloup1')
   181         self.add_entity('CWGroup', name=u'bougloup2')
   181         self.add_entity('CWGroup', name=u'bougloup2')
   182         self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   182         self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   183         self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"')
   183         self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"')
   239                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
   239                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
   240                            'NOT EXISTS(X copain T2, T2 login "billy")')
   240                            'NOT EXISTS(X copain T2, T2 login "billy")')
   241         self.assertEquals(sorted(rset.rows), [['guests', 'cochon'],
   241         self.assertEquals(sorted(rset.rows), [['guests', 'cochon'],
   242                                               ['users', 'cochon'],
   242                                               ['users', 'cochon'],
   243                                               ['users', 'syt']])
   243                                               ['users', 'syt']])
   244         
   244 
   245     def test_cd_restriction(self):
   245     def test_cd_restriction(self):
   246         rset = self.execute('CWUser X WHERE X creation_date > "2009-02-01"')
   246         rset = self.execute('CWUser X WHERE X creation_date > "2009-02-01"')
   247         self.assertEquals(len(rset), 2) # admin/anon but no ldap user since it doesn't support creation_date
   247         self.assertEquals(len(rset), 2) # admin/anon but no ldap user since it doesn't support creation_date
   248         
   248 
   249     def test_union(self):
   249     def test_union(self):
   250         afeids = self.execute('State X')
   250         afeids = self.execute('State X')
   251         ueids = self.execute('CWUser X')
   251         ueids = self.execute('CWUser X')
   252         rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')
   252         rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')
   253         self.assertEquals(sorted(r[0] for r in rset.rows),
   253         self.assertEquals(sorted(r[0] for r in rset.rows),
   255 
   255 
   256     def _init_security_test(self):
   256     def _init_security_test(self):
   257         self.create_user('iaminguestsgrouponly', groups=('guests',))
   257         self.create_user('iaminguestsgrouponly', groups=('guests',))
   258         cnx = self.login('iaminguestsgrouponly')
   258         cnx = self.login('iaminguestsgrouponly')
   259         return cnx.cursor()
   259         return cnx.cursor()
   260     
   260 
   261     def test_security1(self):
   261     def test_security1(self):
   262         cu = self._init_security_test()
   262         cu = self._init_security_test()
   263         rset = cu.execute('Any X WHERE X login "syt"')
   263         rset = cu.execute('Any X WHERE X login "syt"')
   264         self.assertEquals(rset.rows, [])
   264         self.assertEquals(rset.rows, [])
   265         rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"')
   265         rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"')
   266         self.assertEquals(len(rset.rows), 1)
   266         self.assertEquals(len(rset.rows), 1)
   267     
   267 
   268     def test_security2(self):
   268     def test_security2(self):
   269         cu = self._init_security_test()
   269         cu = self._init_security_test()
   270         rset = cu.execute('Any X WHERE X has_text "syt"')
   270         rset = cu.execute('Any X WHERE X has_text "syt"')
   271         self.assertEquals(rset.rows, [])
   271         self.assertEquals(rset.rows, [])
   272         rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"')
   272         rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"')
   273         self.assertEquals(len(rset.rows), 1)
   273         self.assertEquals(len(rset.rows), 1)
   274     
   274 
   275     def test_security3(self):
   275     def test_security3(self):
   276         cu = self._init_security_test()
   276         cu = self._init_security_test()
   277         rset = cu.execute('Any F WHERE X has_text "syt", X firstname F')
   277         rset = cu.execute('Any F WHERE X has_text "syt", X firstname F')
   278         self.assertEquals(rset.rows, [])
   278         self.assertEquals(rset.rows, [])
   279         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
   279         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
   296 
   296 
   297     def test_nonregr4(self):
   297     def test_nonregr4(self):
   298         emaileid = self.execute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
   298         emaileid = self.execute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
   299         self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
   299         self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
   300                      {'x': emaileid})
   300                      {'x': emaileid})
   301         
   301 
   302     def test_nonregr5(self):
   302     def test_nonregr5(self):
   303         # original jpl query:
   303         # original jpl query:
   304         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
   304         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
   305         rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
   305         rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
   306         self.execute(rql, )#{'x': })
   306         self.execute(rql, )#{'x': })
   307         
   307 
   308     def test_nonregr6(self):
   308     def test_nonregr6(self):
   309         self.execute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
   309         self.execute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
   310                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   310                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   311                      'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
   311                      'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
   312                      'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
   312                      'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
   348         self.assertEquals(res, [[1, 5], [2, 4], [3, 6]])
   348         self.assertEquals(res, [[1, 5], [2, 4], [3, 6]])
   349 
   349 
   350 
   350 
   351 class RQL2LDAPFilterTC(RQLGeneratorTC):
   351 class RQL2LDAPFilterTC(RQLGeneratorTC):
   352     schema = repo.schema
   352     schema = repo.schema
   353     
   353 
   354     def setUp(self):
   354     def setUp(self):
   355         RQLGeneratorTC.setUp(self)
   355         RQLGeneratorTC.setUp(self)
   356         ldapsource = repo.sources[-1]
   356         ldapsource = repo.sources[-1]
   357         self.pool = repo._get_pool()
   357         self.pool = repo._get_pool()
   358         session = mock_object(pool=self.pool)
   358         session = mock_object(pool=self.pool)
   359         self.o = RQL2LDAPFilter(ldapsource, session)
   359         self.o = RQL2LDAPFilter(ldapsource, session)
   360         
   360 
   361     def tearDown(self):
   361     def tearDown(self):
   362         repo._free_pool(self.pool)
   362         repo._free_pool(self.pool)
   363         RQLGeneratorTC.tearDown(self)
   363         RQLGeneratorTC.tearDown(self)
   364         
   364 
   365     def test_base(self):
   365     def test_base(self):
   366         rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
   366         rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
   367         self.assertEquals(self.o.generate(rqlst, 'X')[1],
   367         self.assertEquals(self.o.generate(rqlst, 'X')[1],
   368                           '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')
   368                           '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')
   369         
   369 
   370     def test_kwargs(self):
   370     def test_kwargs(self):
   371         rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0]
   371         rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0]
   372         self.o._args = {'x': "toto"}
   372         self.o._args = {'x': "toto"}
   373         self.assertEquals(self.o.generate(rqlst, 'X')[1],
   373         self.assertEquals(self.o.generate(rqlst, 'X')[1],
   374                           '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')
   374                           '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')
   375         
   375 
   376     def test_get_attr(self):
   376     def test_get_attr(self):
   377         rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0]
   377         rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0]
   378         self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E')
   378         self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E')
   379         
   379 
   380         
   380 
   381 if __name__ == '__main__':
   381 if __name__ == '__main__':
   382     unittest_main()
   382     unittest_main()