# Point the test configuration at the LDAP-enabled sources file, then build
# the sqlite-backed test repository and connection used by the test classes
# of this module.
config.sources_file = lambda: 'data/sourcesldap'
repo, cnx = init_test_database('sqlite', config=config)
34 |
34 |
35 class LDAPUserSourceTC(RepositoryBasedTC): |
35 class LDAPUserSourceTC(RepositoryBasedTC): |
36 repo, cnx = repo, cnx |
36 repo, cnx = repo, cnx |
37 |
37 |
def patch_authenticate(self):
    """Replace LDAPUserSource.authenticate with nopwd_authenticate.

    The original method is saved as ``self._orig_authenticate`` the first
    time this is called; a repeated call keeps that saved value instead of
    overwriting it with the already-patched function.
    """
    if not hasattr(self, '_orig_authenticate'):
        self._orig_authenticate = LDAPUserSource.authenticate
    LDAPUserSource.authenticate = nopwd_authenticate
41 |
41 |
def setUp(self):
    """Prepare the test, check LDAP users are visible and store ``self.maxeid``."""
    self._prepare()
    # XXX: this first query is needed, otherwise sqlite reports
    # 'database is locked' since it doesn't support multiple connections
    # on the same database.
    # As a consequence, users inserted from ldap are not removed between
    # tests.
    rset = self.execute('CWUser X')
    self.commit()
    # check we get some users from ldap
    self.assertTrue(len(rset) > 1)
    # first cell of the 'Any MAX(X)' result, kept for later use by tests
    self.maxeid = self.execute('Any MAX(X)')[0][0]
53 |
53 |
def tearDown(self):
    """Restore LDAPUserSource.authenticate if it was saved, then run the base tearDown."""
    # `_orig_authenticate` only exists when the test saved the original
    # method before patching it.
    if hasattr(self, '_orig_authenticate'):
        LDAPUserSource.authenticate = self._orig_authenticate
    RepositoryBasedTC.tearDown(self)
58 |
58 |
def test_authenticate(self):
    # authenticating 'toto'/'toto' against the 'ldapuser' source must raise
    # AuthenticationError
    source = self.repo.sources_by_uri['ldapuser']
    self.assertRaises(AuthenticationError,
                      source.authenticate, self.session, 'toto', 'toto')
63 |
63 |
def test_synchronize(self):
    # no assertion: only checks that synchronize() on the 'ldapuser' source
    # runs without raising
    source = self.repo.sources_by_uri['ldapuser']
    source.synchronize()
67 |
67 |
68 def test_base(self): |
68 def test_base(self): |
69 # check a known one |
69 # check a known one |
70 e = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0) |
70 e = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0) |
71 self.assertEquals(e.login, 'syt') |
71 self.assertEquals(e.login, 'syt') |
72 e.complete() |
72 e.complete() |
137 self.assertEquals(logins, sorted(logins)) |
137 self.assertEquals(logins, sorted(logins)) |
138 |
138 |
def test_or(self):
    # an OR between a login restriction and a group membership restriction
    # must return both matching users
    rset = self.execute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")')
    self.assertEqual(len(rset), 2, rset.rows)  # syt + admin
142 |
142 |
143 def test_nonregr_set_owned_by(self): |
143 def test_nonregr_set_owned_by(self): |
144 # test that when a user coming from ldap is triggering a transition |
144 # test that when a user coming from ldap is triggering a transition |
145 # the related TrInfo has correct owner information |
145 # the related TrInfo has correct owner information |
146 self.execute('SET X in_group G WHERE X login "syt", G name "managers"') |
146 self.execute('SET X in_group G WHERE X login "syt", G name "managers"') |
147 self.commit() |
147 self.commit() |
173 self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') |
173 self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') |
174 |
174 |
def test_multiple_entities_from_different_sources(self):
    # a single query selecting the "syt" user together with a user created
    # by this test ("cochon") must return a non-empty result
    self.create_user('cochon')
    self.assertTrue(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"'))
178 |
178 |
179 def test_exists1(self): |
179 def test_exists1(self): |
180 self.add_entity('CWGroup', name=u'bougloup1') |
180 self.add_entity('CWGroup', name=u'bougloup1') |
181 self.add_entity('CWGroup', name=u'bougloup2') |
181 self.add_entity('CWGroup', name=u'bougloup2') |
182 self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
182 self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
183 self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"') |
183 self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"') |
239 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' |
239 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' |
240 'NOT EXISTS(X copain T2, T2 login "billy")') |
240 'NOT EXISTS(X copain T2, T2 login "billy")') |
241 self.assertEquals(sorted(rset.rows), [['guests', 'cochon'], |
241 self.assertEquals(sorted(rset.rows), [['guests', 'cochon'], |
242 ['users', 'cochon'], |
242 ['users', 'cochon'], |
243 ['users', 'syt']]) |
243 ['users', 'syt']]) |
244 |
244 |
def test_cd_restriction(self):
    rset = self.execute('CWUser X WHERE X creation_date > "2009-02-01"')
    # admin/anon but no ldap user since it doesn't support creation_date
    self.assertEqual(len(rset), 2)
248 |
248 |
249 def test_union(self): |
249 def test_union(self): |
250 afeids = self.execute('State X') |
250 afeids = self.execute('State X') |
251 ueids = self.execute('CWUser X') |
251 ueids = self.execute('CWUser X') |
252 rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)') |
252 rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)') |
253 self.assertEquals(sorted(r[0] for r in rset.rows), |
253 self.assertEquals(sorted(r[0] for r in rset.rows), |
255 |
255 |
def _init_security_test(self):
    """Create a user belonging to the 'guests' group only and log in as that user.

    Returns a cursor obtained from the connection opened for that user.
    """
    self.create_user('iaminguestsgrouponly', groups=('guests',))
    cnx = self.login('iaminguestsgrouponly')
    return cnx.cursor()
260 |
260 |
def test_security1(self):
    # as a guests-only user, a login lookup of "syt" returns nothing while
    # a lookup of the user's own login returns one row
    cu = self._init_security_test()
    rset = cu.execute('Any X WHERE X login "syt"')
    self.assertEqual(rset.rows, [])
    rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"')
    self.assertEqual(len(rset.rows), 1)
267 |
267 |
def test_security2(self):
    # same checks as the login-based lookup, but going through has_text
    cu = self._init_security_test()
    rset = cu.execute('Any X WHERE X has_text "syt"')
    self.assertEqual(rset.rows, [])
    rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"')
    self.assertEqual(len(rset.rows), 1)
274 |
274 |
275 def test_security3(self): |
275 def test_security3(self): |
276 cu = self._init_security_test() |
276 cu = self._init_security_test() |
277 rset = cu.execute('Any F WHERE X has_text "syt", X firstname F') |
277 rset = cu.execute('Any F WHERE X has_text "syt", X firstname F') |
278 self.assertEquals(rset.rows, []) |
278 self.assertEquals(rset.rows, []) |
279 rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') |
279 rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') |
296 |
296 |
def test_nonregr4(self):
    # no assertion: only checks that the query, restricted on the eid of a
    # freshly inserted EmailAddress, executes without raising
    emaileid = self.execute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
    self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
                 {'x': emaileid})
301 |
301 |
def test_nonregr5(self):
    # original jpl query:
    # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
    #
    # no assertion: only checks that the adapted query executes without
    # raising. The login is interpolated into the query text, as in the
    # original test, so the exact same query string is exercised.
    rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 '
           'WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD'
           % self.session.user.login)
    self.execute(rql)
307 |
307 |
308 def test_nonregr6(self): |
308 def test_nonregr6(self): |
309 self.execute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' |
309 self.execute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' |
310 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' |
310 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' |
311 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' |
311 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' |
312 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)', |
312 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)', |
348 self.assertEquals(res, [[1, 5], [2, 4], [3, 6]]) |
348 self.assertEquals(res, [[1, 5], [2, 4], [3, 6]]) |
349 |
349 |
350 |
350 |
class RQL2LDAPFilterTC(RQLGeneratorTC):
    """Check the LDAP search filters produced by RQL2LDAPFilter for RQL queries."""
    schema = repo.schema

    def setUp(self):
        """Build the RQL2LDAPFilter under test on top of a pool taken from the repository."""
        RQLGeneratorTC.setUp(self)
        # NOTE(review): presumably the LDAP source is the last one registered
        # on the repository -- confirm against the sources file.
        ldapsource = repo.sources[-1]
        self.pool = repo._get_pool()
        try:
            session = mock_object(pool=self.pool)
            self.o = RQL2LDAPFilter(ldapsource, session)
        except Exception:
            # tearDown() is not run when setUp() fails, so give the pool
            # back here before propagating the error.
            repo._free_pool(self.pool)
            raise

    def tearDown(self):
        """Give the pool back to the repository, then run the base tearDown."""
        try:
            repo._free_pool(self.pool)
        finally:
            # run the base class cleanup even if freeing the pool failed
            RQLGeneratorTC.tearDown(self)

    def test_base(self):
        # a literal login restriction becomes a uid filter
        rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
        self.assertEqual(self.o.generate(rqlst, 'X')[1],
                         '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')

    def test_kwargs(self):
        # same filter when the login comes from query arguments
        rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0]
        self.o._args = {'x': "toto"}
        self.assertEqual(self.o.generate(rqlst, 'X')[1],
                         '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')

    def test_get_attr(self):
        # generating a filter for this eid restriction must raise UnknownEid
        rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0]
        self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E')
379 |
379 |
380 |
380 |
# Run the tests of this module when it is executed as a script.
if __name__ == '__main__':
    unittest_main()