22 import time |
22 import time |
23 from os.path import abspath, join, exists |
23 from os.path import abspath, join, exists |
24 import subprocess |
24 import subprocess |
25 from socket import socket, error as socketerror |
25 from socket import socket, error as socketerror |
26 |
26 |
27 from logilab.common.testlib import TestCase, unittest_main, mock_object |
27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags |
28 from cubicweb.devtools.testlib import CubicWebTC |
28 from cubicweb.devtools.testlib import CubicWebTC |
29 from cubicweb.devtools.repotest import RQLGeneratorTC |
29 from cubicweb.devtools.repotest import RQLGeneratorTC |
30 from cubicweb.devtools.httptest import get_available_port |
30 from cubicweb.devtools.httptest import get_available_port |
|
31 from cubicweb.devtools import get_test_db_handler |
31 |
32 |
32 from cubicweb.server.sources.ldapuser import * |
33 from cubicweb.server.sources.ldapuser import * |
33 |
34 |
34 SYT = 'syt' |
35 SYT = 'syt' |
35 SYT_EMAIL = 'Sylvain Thenault' |
36 SYT_EMAIL = 'Sylvain Thenault' |
63 # don't check upassword ! |
64 # don't check upassword ! |
64 return self.extid2eid(user['dn'], 'CWUser', session) |
65 return self.extid2eid(user['dn'], 'CWUser', session) |
65 |
66 |
66 def setUpModule(*args): |
67 def setUpModule(*args): |
67 create_slapd_configuration(LDAPUserSourceTC.config) |
68 create_slapd_configuration(LDAPUserSourceTC.config) |
68 global repo |
|
69 try: |
|
70 LDAPUserSourceTC._init_repo() |
|
71 repo = LDAPUserSourceTC.repo |
|
72 add_ldap_source(LDAPUserSourceTC.cnx) |
|
73 except: |
|
74 terminate_slapd() |
|
75 raise |
|
76 |
69 |
77 def tearDownModule(*args): |
70 def tearDownModule(*args): |
78 global repo |
|
79 repo.shutdown() |
|
80 del repo |
|
81 terminate_slapd() |
71 terminate_slapd() |
82 |
|
83 def add_ldap_source(cnx): |
|
84 cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', |
|
85 config=CONFIG) |
|
86 cnx.commit() |
|
87 |
72 |
88 def create_slapd_configuration(config): |
73 def create_slapd_configuration(config): |
89 global slapd_process, CONFIG |
74 global slapd_process, CONFIG |
90 basedir = join(config.apphome, "ldapdb") |
75 basedir = join(config.apphome, "ldapdb") |
91 slapdconf = join(config.apphome, "slapd.conf") |
76 slapdconf = join(config.apphome, "slapd.conf") |
274 self.session.set_pool() |
268 self.session.set_pool() |
275 self.session.create_entity('CWGroup', name=u'bougloup1') |
269 self.session.create_entity('CWGroup', name=u'bougloup1') |
276 self.session.create_entity('CWGroup', name=u'bougloup2') |
270 self.session.create_entity('CWGroup', name=u'bougloup2') |
277 self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
271 self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
278 self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT}) |
272 self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT}) |
279 rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') |
273 rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, ' |
|
274 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') |
280 self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']]) |
275 self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']]) |
281 |
276 |
282 def test_exists2(self): |
277 def test_exists2(self): |
283 self.create_user('comme') |
278 self.create_user('comme') |
284 self.create_user('cochon') |
279 self.create_user('cochon') |
285 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
280 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
286 rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') |
281 rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, ' |
|
282 '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') |
287 self.assertEqual(rset.rows, [['managers'], ['users']]) |
283 self.assertEqual(rset.rows, [['managers'], ['users']]) |
288 |
284 |
289 def test_exists3(self): |
285 def test_exists3(self): |
290 self.create_user('comme') |
286 self.create_user('comme') |
291 self.create_user('cochon') |
287 self.create_user('cochon') |
292 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
288 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
293 self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) |
289 self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) |
294 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT}) |
290 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT}) |
295 self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT})) |
291 self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT})) |
296 rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))') |
292 rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" ' |
|
293 'OR EXISTS(X copain T, T login in ("comme", "cochon"))') |
297 self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]]) |
294 self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]]) |
298 |
295 |
299 def test_exists4(self): |
296 def test_exists4(self): |
300 self.create_user('comme') |
297 self.create_user('comme') |
301 self.create_user('cochon', groups=('users', 'guests')) |
298 self.create_user('cochon', groups=('users', 'guests')) |
396 self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', |
393 self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', |
397 {'x': emaileid}) |
394 {'x': emaileid}) |
398 |
395 |
399 def test_nonregr5(self): |
396 def test_nonregr5(self): |
400 # original jpl query: |
397 # original jpl query: |
401 # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 |
398 # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, |
402 rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login |
399 # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 |
|
400 rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, ' |
|
401 'U login "%s", P is X, X creation_date CD') % self.session.user.login |
403 self.sexecute(rql, )#{'x': }) |
402 self.sexecute(rql, )#{'x': }) |
404 |
403 |
405 def test_nonregr6(self): |
404 def test_nonregr6(self): |
406 self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' |
405 self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' |
407 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' |
406 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' |
444 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
443 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
445 self.assertEqual(res, [[1, 5], [2, 4], [3, 6]]) |
444 self.assertEqual(res, [[1, 5], [2, 4], [3, 6]]) |
446 |
445 |
447 class RQL2LDAPFilterTC(RQLGeneratorTC): |
446 class RQL2LDAPFilterTC(RQLGeneratorTC): |
448 |
447 |
|
448 tags = RQLGeneratorTC.tags | Tags(('ldap')) |
|
449 |
|
450 @property |
|
451 def schema(self): |
|
452 """return the application schema""" |
|
453 return self._schema |
|
454 |
449 def setUp(self): |
455 def setUp(self): |
450 self.schema = repo.schema |
456 self.handler = get_test_db_handler(LDAPUserSourceTC.config) |
451 RQLGeneratorTC.setUp(self) |
457 self.handler.build_db_cache('ldap-user', LDAPUserSourceTC.pre_setup_database) |
|
458 self.handler.restore_database('ldap-user') |
|
459 self._repo = repo = self.handler.get_repo() |
|
460 self._schema = repo.schema |
|
461 super(RQL2LDAPFilterTC, self).setUp() |
452 ldapsource = repo.sources[-1] |
462 ldapsource = repo.sources[-1] |
453 self.pool = repo._get_pool() |
463 self.pool = repo._get_pool() |
454 session = mock_object(pool=self.pool) |
464 session = mock_object(pool=self.pool) |
455 self.o = RQL2LDAPFilter(ldapsource, session) |
465 self.o = RQL2LDAPFilter(ldapsource, session) |
456 self.ldapclasses = ''.join('(objectClass=%s)' % ldapcls |
466 self.ldapclasses = ''.join('(objectClass=%s)' % ldapcls |
457 for ldapcls in ldapsource.user_classes) |
467 for ldapcls in ldapsource.user_classes) |
458 |
468 |
459 def tearDown(self): |
469 def tearDown(self): |
460 repo._free_pool(self.pool) |
470 self._repo.turn_repo_off() |
461 RQLGeneratorTC.tearDown(self) |
471 super(RQL2LDAPFilterTC, self).tearDown() |
462 |
472 |
463 def test_base(self): |
473 def test_base(self): |
464 rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0] |
474 rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0] |
465 self.assertEqual(self.o.generate(rqlst, 'X')[1], |
475 self.assertEqual(self.o.generate(rqlst, 'X')[1], |
466 '(&%s(uid=toto))' % self.ldapclasses) |
476 '(&%s(uid=toto))' % self.ldapclasses) |