22 import time |
22 import time |
23 from os.path import abspath, join, exists |
23 from os.path import abspath, join, exists |
24 import subprocess |
24 import subprocess |
25 from socket import socket, error as socketerror |
25 from socket import socket, error as socketerror |
26 |
26 |
27 from logilab.common.testlib import TestCase, unittest_main, mock_object |
27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags |
28 from cubicweb.devtools.testlib import CubicWebTC |
28 from cubicweb.devtools.testlib import CubicWebTC |
29 from cubicweb.devtools.repotest import RQLGeneratorTC |
29 from cubicweb.devtools.repotest import RQLGeneratorTC |
30 from cubicweb.devtools.httptest import get_available_port |
30 from cubicweb.devtools.httptest import get_available_port |
|
31 from cubicweb.devtools import get_test_db_handler |
31 |
32 |
32 from cubicweb.server.sources.ldapuser import * |
33 from cubicweb.server.sources.ldapuser import * |
33 |
34 |
34 SYT = 'syt' |
35 SYT = 'syt' |
35 SYT_EMAIL = 'Sylvain Thenault' |
36 SYT_EMAIL = 'Sylvain Thenault' |
62 # don't check upassword ! |
63 # don't check upassword ! |
63 return self.extid2eid(user['dn'], 'CWUser', session) |
64 return self.extid2eid(user['dn'], 'CWUser', session) |
64 |
65 |
65 def setUpModule(*args): |
66 def setUpModule(*args): |
66 create_slapd_configuration(LDAPUserSourceTC.config) |
67 create_slapd_configuration(LDAPUserSourceTC.config) |
67 global repo |
|
68 try: |
|
69 LDAPUserSourceTC._init_repo() |
|
70 repo = LDAPUserSourceTC.repo |
|
71 add_ldap_source(LDAPUserSourceTC.cnx) |
|
72 except: |
|
73 terminate_slapd() |
|
74 raise |
|
75 |
68 |
76 def tearDownModule(*args): |
69 def tearDownModule(*args): |
77 global repo |
|
78 repo.shutdown() |
|
79 del repo |
|
80 terminate_slapd() |
70 terminate_slapd() |
81 |
|
82 def add_ldap_source(cnx): |
|
83 cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', |
|
84 config=CONFIG) |
|
85 cnx.commit() |
|
86 |
71 |
87 def create_slapd_configuration(config): |
72 def create_slapd_configuration(config): |
88 global slapd_process, CONFIG |
73 global slapd_process, CONFIG |
89 basedir = join(config.apphome, "ldapdb") |
74 basedir = join(config.apphome, "ldapdb") |
90 slapdconf = join(config.apphome, "slapd.conf") |
75 slapdconf = join(config.apphome, "slapd.conf") |
273 self.session.set_pool() |
267 self.session.set_pool() |
274 self.session.create_entity('CWGroup', name=u'bougloup1') |
268 self.session.create_entity('CWGroup', name=u'bougloup1') |
275 self.session.create_entity('CWGroup', name=u'bougloup2') |
269 self.session.create_entity('CWGroup', name=u'bougloup2') |
276 self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
270 self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
277 self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT}) |
271 self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT}) |
278 rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') |
272 rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, ' |
|
273 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') |
279 self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']]) |
274 self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']]) |
280 |
275 |
281 def test_exists2(self): |
276 def test_exists2(self): |
282 self.create_user('comme') |
277 self.create_user('comme') |
283 self.create_user('cochon') |
278 self.create_user('cochon') |
284 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
279 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
285 rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') |
280 rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, ' |
|
281 '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') |
286 self.assertEqual(rset.rows, [['managers'], ['users']]) |
282 self.assertEqual(rset.rows, [['managers'], ['users']]) |
287 |
283 |
288 def test_exists3(self): |
284 def test_exists3(self): |
289 self.create_user('comme') |
285 self.create_user('comme') |
290 self.create_user('cochon') |
286 self.create_user('cochon') |
291 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
287 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
292 self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) |
288 self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) |
293 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT}) |
289 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT}) |
294 self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT})) |
290 self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT})) |
295 rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))') |
291 rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" ' |
|
292 'OR EXISTS(X copain T, T login in ("comme", "cochon"))') |
296 self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]]) |
293 self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]]) |
297 |
294 |
298 def test_exists4(self): |
295 def test_exists4(self): |
299 self.create_user('comme') |
296 self.create_user('comme') |
300 self.create_user('cochon', groups=('users', 'guests')) |
297 self.create_user('cochon', groups=('users', 'guests')) |
395 self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', |
392 self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', |
396 {'x': emaileid}) |
393 {'x': emaileid}) |
397 |
394 |
398 def test_nonregr5(self): |
395 def test_nonregr5(self): |
399 # original jpl query: |
396 # original jpl query: |
400 # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 |
397 # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, |
401 rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login |
398 # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 |
|
399 rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, ' |
|
400 'U login "%s", P is X, X creation_date CD') % self.session.user.login |
402 self.sexecute(rql, )#{'x': }) |
401 self.sexecute(rql, )#{'x': }) |
403 |
402 |
404 def test_nonregr6(self): |
403 def test_nonregr6(self): |
405 self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' |
404 self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' |
406 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' |
405 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' |
443 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
442 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
444 self.assertEqual(res, [[1, 5], [2, 4], [3, 6]]) |
443 self.assertEqual(res, [[1, 5], [2, 4], [3, 6]]) |
445 |
444 |
446 class RQL2LDAPFilterTC(RQLGeneratorTC): |
445 class RQL2LDAPFilterTC(RQLGeneratorTC): |
447 |
446 |
|
447 tags = RQLGeneratorTC.tags | Tags(('ldap')) |
|
448 |
|
449 @property |
|
450 def schema(self): |
|
451 """return the application schema""" |
|
452 return self._schema |
|
453 |
448 def setUp(self): |
454 def setUp(self): |
449 self.schema = repo.schema |
455 self.handler = get_test_db_handler(LDAPUserSourceTC.config) |
450 RQLGeneratorTC.setUp(self) |
456 self.handler.build_db_cache('ldap-user', LDAPUserSourceTC.pre_setup_database) |
|
457 self.handler.restore_database('ldap-user') |
|
458 self._repo = repo = self.handler.get_repo() |
|
459 self._schema = repo.schema |
|
460 super(RQL2LDAPFilterTC, self).setUp() |
451 ldapsource = repo.sources[-1] |
461 ldapsource = repo.sources[-1] |
452 self.pool = repo._get_pool() |
462 self.pool = repo._get_pool() |
453 session = mock_object(pool=self.pool) |
463 session = mock_object(pool=self.pool) |
454 self.o = RQL2LDAPFilter(ldapsource, session) |
464 self.o = RQL2LDAPFilter(ldapsource, session) |
455 self.ldapclasses = ''.join(ldapsource.base_filters) |
465 self.ldapclasses = ''.join(ldapsource.base_filters) |
456 |
466 |
457 def tearDown(self): |
467 def tearDown(self): |
458 repo._free_pool(self.pool) |
468 self._repo.turn_repo_off() |
459 RQLGeneratorTC.tearDown(self) |
469 super(RQL2LDAPFilterTC, self).tearDown() |
460 |
470 |
461 def test_base(self): |
471 def test_base(self): |
462 rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0] |
472 rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0] |
463 self.assertEqual(self.o.generate(rqlst, 'X')[1], |
473 self.assertEqual(self.o.generate(rqlst, 'X')[1], |
464 '(&%s(uid=toto))' % self.ldapclasses) |
474 '(&%s(uid=toto))' % self.ldapclasses) |