server/test/unittest_ldapuser.py
branchtls-sprint
changeset 1398 5fe84a5f7035
parent 1122 9f37de24251f
child 1802 d628defebc17
equal deleted inserted replaced
1397:6cbc7bc8ea6d 1398:5fe84a5f7035
    22                             searchstr)[0]
    22                             searchstr)[0]
    23     except IndexError:
    23     except IndexError:
    24         # no such user
    24         # no such user
    25         raise AuthenticationError()
    25         raise AuthenticationError()
    26     # don't check upassword !
    26     # don't check upassword !
    27     return self.extid2eid(user['dn'], 'EUser', session)
    27     return self.extid2eid(user['dn'], 'CWUser', session)
    28 
    28 
    29 
    29 
    30 
    30 
    31 config = TestServerConfiguration('data')
    31 config = TestServerConfiguration('data')
    32 config.sources_file = lambda : 'data/sourcesldap'
    32 config.sources_file = lambda : 'data/sourcesldap'
    43         self._prepare()
    43         self._prepare()
    44         # XXX: need this first query else we get 'database is locked' from 
    44         # XXX: need this first query else we get 'database is locked' from 
    45         # sqlite since it doesn't support multiple connections on the same
    45         # sqlite since it doesn't support multiple connections on the same
    46         # database
    46         # database
    47         # so doing, ldap inserted users don't get removed between each test
    47         # so doing, ldap inserted users don't get removed between each test
    48         rset = self.execute('EUser X')
    48         rset = self.execute('CWUser X')
    49         self.commit()
    49         self.commit()
    50         # check we get some users from ldap
    50         # check we get some users from ldap
    51         self.assert_(len(rset) > 1)
    51         self.assert_(len(rset) > 1)
    52         self.maxeid = self.execute('Any MAX(X)')[0][0]
    52         self.maxeid = self.execute('Any MAX(X)')[0][0]
    53         
    53         
    65         source = self.repo.sources_by_uri['ldapuser']
    65         source = self.repo.sources_by_uri['ldapuser']
    66         source.synchronize()
    66         source.synchronize()
    67         
    67         
    68     def test_base(self):
    68     def test_base(self):
    69         # check a known one
    69         # check a known one
    70         e = self.execute('EUser X WHERE X login "syt"').get_entity(0, 0)
    70         e = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0)
    71         self.assertEquals(e.login, 'syt')
    71         self.assertEquals(e.login, 'syt')
    72         e.complete()
    72         e.complete()
    73         self.assertEquals(e.creation_date, None)
    73         self.assertEquals(e.creation_date, None)
    74         self.assertEquals(e.modification_date, None)
    74         self.assertEquals(e.modification_date, None)
    75         self.assertEquals(e.firstname, None)
    75         self.assertEquals(e.firstname, None)
    77         self.assertEquals(e.in_group[0].name, 'users')
    77         self.assertEquals(e.in_group[0].name, 'users')
    78         self.assertEquals(e.owned_by[0].login, 'syt')
    78         self.assertEquals(e.owned_by[0].login, 'syt')
    79         self.assertEquals(e.created_by, [])
    79         self.assertEquals(e.created_by, [])
    80         self.assertEquals(e.primary_email[0].address, 'Sylvain Thenault')
    80         self.assertEquals(e.primary_email[0].address, 'Sylvain Thenault')
    81         # email content should be indexed on the user
    81         # email content should be indexed on the user
    82         rset = self.execute('EUser X WHERE X has_text "thenault"')
    82         rset = self.execute('CWUser X WHERE X has_text "thenault"')
    83         self.assertEquals(rset.rows, [[e.eid]])
    83         self.assertEquals(rset.rows, [[e.eid]])
    84 
    84 
    85     def test_not(self):
    85     def test_not(self):
    86         eid = self.execute('EUser X WHERE X login "syt"')[0][0]
    86         eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
    87         rset = self.execute('EUser X WHERE NOT X eid %s' % eid)
    87         rset = self.execute('CWUser X WHERE NOT X eid %s' % eid)
    88         self.assert_(rset)
    88         self.assert_(rset)
    89         self.assert_(not eid in (r[0] for r in rset))
    89         self.assert_(not eid in (r[0] for r in rset))
    90 
    90 
    91     def test_multiple(self):
    91     def test_multiple(self):
    92         seid = self.execute('EUser X WHERE X login "syt"')[0][0]
    92         seid = self.execute('CWUser X WHERE X login "syt"')[0][0]
    93         aeid = self.execute('EUser X WHERE X login "adim"')[0][0]
    93         aeid = self.execute('CWUser X WHERE X login "adim"')[0][0]
    94         rset = self.execute('EUser X, Y WHERE X login "syt", Y login "adim"')
    94         rset = self.execute('CWUser X, Y WHERE X login "syt", Y login "adim"')
    95         self.assertEquals(rset.rows, [[seid, aeid]])
    95         self.assertEquals(rset.rows, [[seid, aeid]])
    96         rset = self.execute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"')
    96         rset = self.execute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"')
    97         self.assertEquals(rset.rows, [[seid, aeid, 'syt']])
    97         self.assertEquals(rset.rows, [[seid, aeid, 'syt']])
    98 
    98 
    99     def test_in(self):
    99     def test_in(self):
   100         seid = self.execute('EUser X WHERE X login "syt"')[0][0]
   100         seid = self.execute('CWUser X WHERE X login "syt"')[0][0]
   101         aeid = self.execute('EUser X WHERE X login "adim"')[0][0]
   101         aeid = self.execute('CWUser X WHERE X login "adim"')[0][0]
   102         rset = self.execute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L')
   102         rset = self.execute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L')
   103         self.assertEquals(rset.rows, [[aeid, 'adim'], [seid, 'syt']])
   103         self.assertEquals(rset.rows, [[aeid, 'adim'], [seid, 'syt']])
   104 
   104 
   105     def test_relations(self):
   105     def test_relations(self):
   106         eid = self.execute('EUser X WHERE X login "syt"')[0][0]
   106         eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
   107         rset = self.execute('Any X,E WHERE X is EUser, X login L, X primary_email E')
   107         rset = self.execute('Any X,E WHERE X is CWUser, X login L, X primary_email E')
   108         self.assert_(eid in (r[0] for r in rset))
   108         self.assert_(eid in (r[0] for r in rset))
   109         rset = self.execute('Any X,L,E WHERE X is EUser, X login L, X primary_email E')
   109         rset = self.execute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')
   110         self.assert_('syt' in (r[1] for r in rset))
   110         self.assert_('syt' in (r[1] for r in rset))
   111 
   111 
   112     def test_count(self):
   112     def test_count(self):
   113         nbusers = self.execute('Any COUNT(X) WHERE X is EUser')[0][0]
   113         nbusers = self.execute('Any COUNT(X) WHERE X is CWUser')[0][0]
   114         # just check this is a possible number
   114         # just check this is a possible number
   115         self.assert_(nbusers > 1, nbusers)
   115         self.assert_(nbusers > 1, nbusers)
   116         self.assert_(nbusers < 30, nbusers)
   116         self.assert_(nbusers < 30, nbusers)
   117 
   117 
   118     def test_upper(self):
   118     def test_upper(self):
   119         eid = self.execute('EUser X WHERE X login "syt"')[0][0]
   119         eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
   120         rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
   120         rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
   121         self.assertEquals(rset[0][0], 'SYT')
   121         self.assertEquals(rset[0][0], 'SYT')
   122 
   122 
   123     def test_unknown_attr(self):
   123     def test_unknown_attr(self):
   124         eid = self.execute('EUser X WHERE X login "syt"')[0][0]
   124         eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
   125         rset = self.execute('Any L,C,M WHERE X eid %s, X login L, '
   125         rset = self.execute('Any L,C,M WHERE X eid %s, X login L, '
   126                             'X creation_date C, X modification_date M' % eid)
   126                             'X creation_date C, X modification_date M' % eid)
   127         self.assertEquals(rset[0][0], 'syt')
   127         self.assertEquals(rset[0][0], 'syt')
   128         self.assertEquals(rset[0][1], None)
   128         self.assertEquals(rset[0][1], None)
   129         self.assertEquals(rset[0][2], None)
   129         self.assertEquals(rset[0][2], None)
   143     def test_nonregr_set_owned_by(self):
   143     def test_nonregr_set_owned_by(self):
   144         # test that when a user coming from ldap is triggering a transition
   144         # test that when a user coming from ldap is triggering a transition
   145         # the related TrInfo has correct owner information
   145         # the related TrInfo has correct owner information
   146         self.execute('SET X in_group G WHERE X login "syt", G name "managers"')
   146         self.execute('SET X in_group G WHERE X login "syt", G name "managers"')
   147         self.commit()
   147         self.commit()
   148         syt = self.execute('EUser X WHERE X login "syt"').get_entity(0, 0)
   148         syt = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0)
   149         self.assertEquals([g.name for g in syt.in_group], ['managers', 'users'])
   149         self.assertEquals([g.name for g in syt.in_group], ['managers', 'users'])
   150         self.patch_authenticate()
   150         self.patch_authenticate()
   151         cnx = self.login('syt', 'dummypassword')
   151         cnx = self.login('syt', 'dummypassword')
   152         cu = cnx.cursor()
   152         cu = cnx.cursor()
   153         cu.execute('SET X in_state S WHERE X login "alf", S name "deactivated"')
   153         cu.execute('SET X in_state S WHERE X login "alf", S name "deactivated"')
   154         try:
   154         try:
   155             cnx.commit()
   155             cnx.commit()
   156             alf = self.execute('EUser X WHERE X login "alf"').get_entity(0, 0)
   156             alf = self.execute('CWUser X WHERE X login "alf"').get_entity(0, 0)
   157             self.assertEquals(alf.in_state[0].name, 'deactivated')
   157             self.assertEquals(alf.in_state[0].name, 'deactivated')
   158             trinfo = alf.latest_trinfo()
   158             trinfo = alf.latest_trinfo()
   159             self.assertEquals(trinfo.owned_by[0].login, 'syt')
   159             self.assertEquals(trinfo.owned_by[0].login, 'syt')
   160             # select from_state to skip the user's creation TrInfo
   160             # select from_state to skip the user's creation TrInfo
   161             rset = self.execute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
   161             rset = self.execute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
   175     def test_multiple_entities_from_different_sources(self):
   175     def test_multiple_entities_from_different_sources(self):
   176         self.create_user('cochon')
   176         self.create_user('cochon')
   177         self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"'))
   177         self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"'))
   178         
   178         
   179     def test_exists1(self):
   179     def test_exists1(self):
   180         self.add_entity('EGroup', name=u'bougloup1')
   180         self.add_entity('CWGroup', name=u'bougloup1')
   181         self.add_entity('EGroup', name=u'bougloup2')
   181         self.add_entity('CWGroup', name=u'bougloup2')
   182         self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   182         self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   183         self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"')
   183         self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"')
   184         rset = self.execute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
   184         rset = self.execute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
   185         self.assertEquals(rset.rows, [['admin', 'activated'], ['syt', 'activated']])
   185         self.assertEquals(rset.rows, [['admin', 'activated'], ['syt', 'activated']])
   186 
   186 
   208         self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   208         self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   209         self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
   209         self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
   210         self.execute('SET X copain Y WHERE X login "comme", Y login "billy"')
   210         self.execute('SET X copain Y WHERE X login "comme", Y login "billy"')
   211         self.execute('SET X copain Y WHERE X login "syt", Y login "billy"')
   211         self.execute('SET X copain Y WHERE X login "syt", Y login "billy"')
   212         # search for group name, login where
   212         # search for group name, login where
   213         #   EUser copain with "comme" or "cochon" AND same login as the copain
   213         #   CWUser copain with "comme" or "cochon" AND same login as the copain
   214         # OR
   214         # OR
   215         #   EUser in_state activated AND not copain with billy
   215         #   CWUser in_state activated AND not copain with billy
   216         #
   216         #
   217         # SO we expect everybody but "comme" and "syt"
   217         # SO we expect everybody but "comme" and "syt"
   218         rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, '
   218         rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, '
   219                            'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
   219                            'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
   220                            'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
   220                            'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
   241         self.assertEquals(sorted(rset.rows), [['guests', 'cochon'],
   241         self.assertEquals(sorted(rset.rows), [['guests', 'cochon'],
   242                                               ['users', 'cochon'],
   242                                               ['users', 'cochon'],
   243                                               ['users', 'syt']])
   243                                               ['users', 'syt']])
   244         
   244         
   245     def test_cd_restriction(self):
   245     def test_cd_restriction(self):
   246         rset = self.execute('EUser X WHERE X creation_date > "2009-02-01"')
   246         rset = self.execute('CWUser X WHERE X creation_date > "2009-02-01"')
   247         self.assertEquals(len(rset), 2) # admin/anon but no ldap user since it doesn't support creation_date
   247         self.assertEquals(len(rset), 2) # admin/anon but no ldap user since it doesn't support creation_date
   248         
   248         
   249     def test_union(self):
   249     def test_union(self):
   250         afeids = self.execute('State X')
   250         afeids = self.execute('State X')
   251         ueids = self.execute('EUser X')
   251         ueids = self.execute('CWUser X')
   252         rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is EUser)')
   252         rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')
   253         self.assertEquals(sorted(r[0] for r in rset.rows),
   253         self.assertEquals(sorted(r[0] for r in rset.rows),
   254                           sorted(r[0] for r in afeids + ueids))
   254                           sorted(r[0] for r in afeids + ueids))
   255 
   255 
   256     def _init_security_test(self):
   256     def _init_security_test(self):
   257         self.create_user('iaminguestsgrouponly', groups=('guests',))
   257         self.create_user('iaminguestsgrouponly', groups=('guests',))
   299         self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
   299         self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
   300                      {'x': emaileid})
   300                      {'x': emaileid})
   301         
   301         
   302     def test_nonregr5(self):
   302     def test_nonregr5(self):
   303         # original jpl query:
   303         # original jpl query:
   304         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is EUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
   304         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
   305         rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
   305         rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
   306         self.execute(rql, )#{'x': })
   306         self.execute(rql, )#{'x': })
   307         
   307         
   308     def test_nonregr6(self):
   308     def test_nonregr6(self):
   309         self.execute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
   309         self.execute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
   310                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   310                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   311                      'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
   311                      'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
   312                      'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is EUser)',
   312                      'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
   313                      {'x': self.session.user.eid})
   313                      {'x': self.session.user.eid})
   314 
   314 
   315 
   315 
   316 class GlobTrFuncTC(TestCase):
   316 class GlobTrFuncTC(TestCase):
   317 
   317 
   361     def tearDown(self):
   361     def tearDown(self):
   362         repo._free_pool(self.pool)
   362         repo._free_pool(self.pool)
   363         RQLGeneratorTC.tearDown(self)
   363         RQLGeneratorTC.tearDown(self)
   364         
   364         
   365     def test_base(self):
   365     def test_base(self):
   366         rqlst = self._prepare('EUser X WHERE X login "toto"').children[0]
   366         rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
   367         self.assertEquals(self.o.generate(rqlst, 'X')[1],
   367         self.assertEquals(self.o.generate(rqlst, 'X')[1],
   368                           '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')
   368                           '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')
   369         
   369         
   370     def test_kwargs(self):
   370     def test_kwargs(self):
   371         rqlst = self._prepare('EUser X WHERE X login %(x)s').children[0]
   371         rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0]
   372         self.o._args = {'x': "toto"}
   372         self.o._args = {'x': "toto"}
   373         self.assertEquals(self.o.generate(rqlst, 'X')[1],
   373         self.assertEquals(self.o.generate(rqlst, 'X')[1],
   374                           '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')
   374                           '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')
   375         
   375         
   376     def test_get_attr(self):
   376     def test_get_attr(self):