|
1 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """cubicweb.server.sources.ldapusers unit and functional tests""" |
|
19 |
|
20 import os |
|
21 import shutil |
|
22 import time |
|
23 from os.path import join, exists |
|
24 import subprocess |
|
25 import tempfile |
|
26 |
|
27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags |
|
28 |
|
29 from cubicweb import AuthenticationError |
|
30 from cubicweb.devtools.testlib import CubicWebTC |
|
31 from cubicweb.devtools.repotest import RQLGeneratorTC |
|
32 from cubicweb.devtools.httptest import get_available_port |
|
33 from cubicweb.devtools import get_test_db_handler |
|
34 |
|
35 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter |
|
36 |
|
37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test' |
|
38 URL = None |
|
39 |
|
40 def create_slapd_configuration(cls): |
|
41 global URL |
|
42 slapddir = tempfile.mkdtemp('cw-unittest-ldap') |
|
43 config = cls.config |
|
44 slapdconf = join(config.apphome, "slapd.conf") |
|
45 confin = file(join(config.apphome, "slapd.conf.in")).read() |
|
46 confstream = file(slapdconf, 'w') |
|
47 confstream.write(confin % {'apphome': config.apphome, 'testdir': slapddir}) |
|
48 confstream.close() |
|
49 # fill ldap server with some data |
|
50 ldiffile = join(config.apphome, "ldap_test.ldif") |
|
51 config.info('Initing ldap database') |
|
52 cmdline = "/usr/sbin/slapadd -f %s -l %s -c" % (slapdconf, ldiffile) |
|
53 subprocess.call(cmdline, shell=True) |
|
54 |
|
55 #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f') |
|
56 port = get_available_port(xrange(9000, 9100)) |
|
57 host = 'localhost:%s' % port |
|
58 ldapuri = 'ldap://%s' % host |
|
59 cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"] |
|
60 config.info('Starting slapd:', ' '.join(cmdline)) |
|
61 cls.slapd_process = subprocess.Popen(cmdline) |
|
62 time.sleep(0.2) |
|
63 if cls.slapd_process.poll() is None: |
|
64 config.info('slapd started with pid %s' % cls.slapd_process.pid) |
|
65 else: |
|
66 raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' % |
|
67 (" ".join(cmdline), os.getcwd())) |
|
68 URL = u'ldap://%s' % host |
|
69 return slapddir |
|
70 |
|
71 def terminate_slapd(cls): |
|
72 config = cls.config |
|
73 if cls.slapd_process and cls.slapd_process.returncode is None: |
|
74 config.info('terminating slapd') |
|
75 if hasattr(cls.slapd_process, 'terminate'): |
|
76 cls.slapd_process.terminate() |
|
77 else: |
|
78 import os, signal |
|
79 os.kill(cls.slapd_process.pid, signal.SIGTERM) |
|
80 cls.slapd_process.wait() |
|
81 config.info('DONE') |
|
82 |
|
83 class LDAPTestBase(CubicWebTC): |
|
84 loglevel = 'ERROR' |
|
85 |
|
86 @classmethod |
|
87 def setUpClass(cls): |
|
88 from cubicweb.cwctl import init_cmdline_log_threshold |
|
89 init_cmdline_log_threshold(cls.config, cls.loglevel) |
|
90 cls._tmpdir = create_slapd_configuration(cls) |
|
91 |
|
92 @classmethod |
|
93 def tearDownClass(cls): |
|
94 terminate_slapd(cls) |
|
95 try: |
|
96 shutil.rmtree(cls._tmpdir) |
|
97 except: |
|
98 pass |
|
99 |
|
100 class CheckWrongGroup(LDAPTestBase): |
|
101 |
|
102 def test_wrong_group(self): |
|
103 self.session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed', |
|
104 url=URL, config=CONFIG) |
|
105 self.commit() |
|
106 with self.session.repo.internal_session(safe=True) as session: |
|
107 source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) |
|
108 config = source.repo_source.check_config(source) |
|
109 # inject a bogus group here, along with at least a valid one |
|
110 config['user-default-group'] = ('thisgroupdoesnotexists','users') |
|
111 source.repo_source.update_config(source, config) |
|
112 session.commit(free_cnxset=False) |
|
113 # here we emitted an error log entry |
|
114 stats = source.repo_source.pull_data(session, force=True, raise_on_error=True) |
|
115 session.commit() |
|
116 |
|
117 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase): |
|
118 test_db_id = 'ldap-feed' |
|
119 |
|
120 @classmethod |
|
121 def pre_setup_database(cls, session, config): |
|
122 session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed', |
|
123 url=URL, config=CONFIG) |
|
124 session.commit() |
|
125 with session.repo.internal_session(safe=True) as isession: |
|
126 lfsource = isession.repo.sources_by_uri['ldapuser'] |
|
127 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
|
128 |
|
129 def _pull(self): |
|
130 with self.session.repo.internal_session() as isession: |
|
131 lfsource = isession.repo.sources_by_uri['ldapuser'] |
|
132 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
|
133 isession.commit() |
|
134 |
|
135 def test_a_filter_inactivate(self): |
|
136 """ filtered out people should be deactivated, unable to authenticate """ |
|
137 source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) |
|
138 config = source.repo_source.check_config(source) |
|
139 # filter with adim's phone number |
|
140 config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109') |
|
141 source.repo_source.update_config(source, config) |
|
142 self.commit() |
|
143 self._pull() |
|
144 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
|
145 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
|
146 'U in_state S, S name N').rows[0][0], |
|
147 'deactivated') |
|
148 self.assertEqual(self.execute('Any N WHERE U login "adim", ' |
|
149 'U in_state S, S name N').rows[0][0], |
|
150 'activated') |
|
151 # unfilter, syt should be activated again |
|
152 config['user-filter'] = u'' |
|
153 source.repo_source.update_config(source, config) |
|
154 self.commit() |
|
155 self._pull() |
|
156 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
|
157 'U in_state S, S name N').rows[0][0], |
|
158 'activated') |
|
159 self.assertEqual(self.execute('Any N WHERE U login "adim", ' |
|
160 'U in_state S, S name N').rows[0][0], |
|
161 'activated') |
|
162 |
|
163 def test_delete(self): |
|
164 """ delete syt, pull, check deactivation, repull, |
|
165 readd syt, pull, check activation |
|
166 """ |
|
167 uri = self.repo.sources_by_uri['ldapuser'].urls[0] |
|
168 deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' " |
|
169 "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri) |
|
170 os.system(deletecmd) |
|
171 self._pull() |
|
172 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
|
173 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
|
174 'U in_state S, S name N').rows[0][0], |
|
175 'deactivated') |
|
176 # check that it doesn't choke |
|
177 self._pull() |
|
178 # reset the fscking ldap thing |
|
179 self.tearDownClass() |
|
180 self.setUpClass() |
|
181 self._pull() |
|
182 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
|
183 'U in_state S, S name N').rows[0][0], |
|
184 'activated') |
|
185 # test reactivating the user isn't enough to authenticate, as the native source |
|
186 # refuse to authenticate user from other sources |
|
187 os.system(deletecmd) |
|
188 self._pull() |
|
189 user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) |
|
190 user.cw_adapt_to('IWorkflowable').fire_transition('activate') |
|
191 self.commit() |
|
192 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
|
193 |
|
194 class LDAPFeedSourceTC(LDAPTestBase): |
|
195 test_db_id = 'ldap-feed' |
|
196 |
|
197 @classmethod |
|
198 def pre_setup_database(cls, session, config): |
|
199 session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed', |
|
200 url=URL, config=CONFIG) |
|
201 session.commit() |
|
202 isession = session.repo.internal_session(safe=True) |
|
203 lfsource = isession.repo.sources_by_uri['ldapuser'] |
|
204 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
|
205 |
|
206 def setUp(self): |
|
207 super(LDAPFeedSourceTC, self).setUp() |
|
208 # ldap source url in the database may use a different port as the one |
|
209 # just attributed |
|
210 lfsource = self.repo.sources_by_uri['ldapuser'] |
|
211 lfsource.urls = [URL] |
|
212 |
|
213 def assertMetadata(self, entity): |
|
214 self.assertTrue(entity.creation_date) |
|
215 self.assertTrue(entity.modification_date) |
|
216 |
|
217 def test_authenticate(self): |
|
218 source = self.repo.sources_by_uri['ldapuser'] |
|
219 self.session.set_cnxset() |
|
220 # ensure we won't be logged against |
|
221 self.assertRaises(AuthenticationError, |
|
222 source.authenticate, self.session, 'toto', 'toto') |
|
223 self.assertTrue(source.authenticate(self.session, 'syt', 'syt')) |
|
224 self.assertTrue(self.repo.connect('syt', password='syt')) |
|
225 |
|
226 def test_base(self): |
|
227 # check a known one |
|
228 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
|
229 e = rset.get_entity(0, 0) |
|
230 self.assertEqual(e.login, 'syt') |
|
231 e.complete() |
|
232 self.assertMetadata(e) |
|
233 self.assertEqual(e.firstname, None) |
|
234 self.assertEqual(e.surname, None) |
|
235 self.assertEqual(e.in_group[0].name, 'users') |
|
236 self.assertEqual(e.owned_by[0].login, 'syt') |
|
237 self.assertEqual(e.created_by, ()) |
|
238 self.assertEqual(e.primary_email[0].address, 'Sylvain Thenault') |
|
239 # email content should be indexed on the user |
|
240 rset = self.sexecute('CWUser X WHERE X has_text "thenault"') |
|
241 self.assertEqual(rset.rows, [[e.eid]]) |
|
242 |
|
243 def test_copy_to_system_source(self): |
|
244 source = self.repo.sources_by_uri['ldapuser'] |
|
245 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
|
246 self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) |
|
247 self.commit() |
|
248 source.reset_caches() |
|
249 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
|
250 self.assertEqual(len(rset), 1) |
|
251 e = rset.get_entity(0, 0) |
|
252 self.assertEqual(e.eid, eid) |
|
253 self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', |
|
254 'uri': u'system', |
|
255 'use-cwuri-as-url': False}, |
|
256 'type': 'CWUser', |
|
257 'extid': None}) |
|
258 self.assertEqual(e.cw_source[0].name, 'system') |
|
259 self.assertTrue(e.creation_date) |
|
260 self.assertTrue(e.modification_date) |
|
261 source.pull_data(self.session) |
|
262 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
|
263 self.assertEqual(len(rset), 1) |
|
264 self.assertTrue(self.repo.system_source.authenticate( |
|
265 self.session, 'syt', password='syt')) |
|
266 |
|
267 |
|
268 class LDAPUserSourceTC(LDAPFeedSourceTC): |
|
269 test_db_id = 'ldap-user' |
|
270 tags = CubicWebTC.tags | Tags(('ldap')) |
|
271 |
|
272 @classmethod |
|
273 def pre_setup_database(cls, session, config): |
|
274 session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', |
|
275 url=URL, config=CONFIG) |
|
276 session.commit() |
|
277 # XXX keep it there |
|
278 session.execute('CWUser U') |
|
279 |
|
280 def assertMetadata(self, entity): |
|
281 self.assertEqual(entity.creation_date, None) |
|
282 self.assertEqual(entity.modification_date, None) |
|
283 |
|
284 def test_synchronize(self): |
|
285 source = self.repo.sources_by_uri['ldapuser'] |
|
286 source.synchronize() |
|
287 |
|
288 def test_base(self): |
|
289 # check a known one |
|
290 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
|
291 e = rset.get_entity(0, 0) |
|
292 self.assertEqual(e.login, 'syt') |
|
293 e.complete() |
|
294 self.assertMetadata(e) |
|
295 self.assertEqual(e.firstname, None) |
|
296 self.assertEqual(e.surname, None) |
|
297 self.assertEqual(e.in_group[0].name, 'users') |
|
298 self.assertEqual(e.owned_by[0].login, 'syt') |
|
299 self.assertEqual(e.created_by, ()) |
|
300 self.assertEqual(e.primary_email[0].address, 'Sylvain Thenault') |
|
301 # email content should be indexed on the user |
|
302 rset = self.sexecute('CWUser X WHERE X has_text "thenault"') |
|
303 self.assertEqual(rset.rows, [[e.eid]]) |
|
304 |
|
305 def test_not(self): |
|
306 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
|
307 rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid) |
|
308 self.assert_(rset) |
|
309 self.assert_(not eid in (r[0] for r in rset)) |
|
310 |
|
311 def test_multiple(self): |
|
312 seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
|
313 aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0] |
|
314 rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s', |
|
315 {'syt': 'syt', 'adim': 'adim'}) |
|
316 self.assertEqual(rset.rows, [[seid, aeid]]) |
|
317 rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s', |
|
318 {'syt': 'syt', 'adim': 'adim'}) |
|
319 self.assertEqual(rset.rows, [[seid, aeid, 'syt']]) |
|
320 |
|
321 def test_in(self): |
|
322 seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
|
323 aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0] |
|
324 rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % ('syt', 'adim')) |
|
325 self.assertEqual(rset.rows, [[aeid, 'adim'], [seid, 'syt']]) |
|
326 |
|
327 def test_relations(self): |
|
328 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
|
329 rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E') |
|
330 self.assert_(eid in (r[0] for r in rset)) |
|
331 rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E') |
|
332 self.assert_('syt' in (r[1] for r in rset)) |
|
333 |
|
334 def test_count(self): |
|
335 nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0] |
|
336 # just check this is a possible number |
|
337 self.assert_(nbusers > 1, nbusers) |
|
338 self.assert_(nbusers < 30, nbusers) |
|
339 |
|
340 def test_upper(self): |
|
341 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
|
342 rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid) |
|
343 self.assertEqual(rset[0][0], 'syt'.upper()) |
|
344 |
|
345 def test_unknown_attr(self): |
|
346 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
|
347 rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, ' |
|
348 'X creation_date C, X modification_date M' % eid) |
|
349 self.assertEqual(rset[0][0], 'syt') |
|
350 self.assertEqual(rset[0][1], None) |
|
351 self.assertEqual(rset[0][2], None) |
|
352 |
|
353 def test_sort(self): |
|
354 logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')] |
|
355 self.assertEqual(logins, sorted(logins)) |
|
356 |
|
357 def test_lower_sort(self): |
|
358 logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')] |
|
359 self.assertEqual(logins, sorted(logins)) |
|
360 |
|
361 def test_or(self): |
|
362 rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")', |
|
363 {'login': 'syt'}) |
|
364 self.assertEqual(len(rset), 2, rset.rows) # syt + admin |
|
365 |
|
366 def test_nonregr_set_owned_by(self): |
|
367 # test that when a user coming from ldap is triggering a transition |
|
368 # the related TrInfo has correct owner information |
|
369 self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'}) |
|
370 self.commit() |
|
371 syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}).get_entity(0, 0) |
|
372 self.assertEqual([g.name for g in syt.in_group], ['managers', 'users']) |
|
373 cnx = self.login('syt', password='syt') |
|
374 cu = cnx.cursor() |
|
375 adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0) |
|
376 iworkflowable = adim.cw_adapt_to('IWorkflowable') |
|
377 iworkflowable.fire_transition('deactivate') |
|
378 try: |
|
379 cnx.commit() |
|
380 adim.cw_clear_all_caches() |
|
381 self.assertEqual(adim.in_state[0].name, 'deactivated') |
|
382 trinfo = iworkflowable.latest_trinfo() |
|
383 self.assertEqual(trinfo.owned_by[0].login, 'syt') |
|
384 # select from_state to skip the user's creation TrInfo |
|
385 rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' |
|
386 'WF creation_date D, WF from_state FS,' |
|
387 'WF owned_by U?, X eid %(x)s', |
|
388 {'x': adim.eid}) |
|
389 self.assertEqual(rset.rows, [[syt.eid]]) |
|
390 finally: |
|
391 # restore db state |
|
392 self.restore_connection() |
|
393 adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0) |
|
394 adim.cw_adapt_to('IWorkflowable').fire_transition('activate') |
|
395 self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'}) |
|
396 |
|
397 def test_same_column_names(self): |
|
398 self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') |
|
399 |
|
400 def test_multiple_entities_from_different_sources(self): |
|
401 req = self.request() |
|
402 self.create_user(req, 'cochon') |
|
403 self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})) |
|
404 |
|
405 def test_exists1(self): |
|
406 self.session.set_cnxset() |
|
407 self.session.create_entity('CWGroup', name=u'bougloup1') |
|
408 self.session.create_entity('CWGroup', name=u'bougloup2') |
|
409 self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
|
410 self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': 'syt'}) |
|
411 rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, ' |
|
412 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') |
|
413 self.assertEqual(rset.rows, [['admin', 'activated'], ['syt', 'activated']]) |
|
414 |
|
415 def test_exists2(self): |
|
416 req = self.request() |
|
417 self.create_user(req, 'comme') |
|
418 self.create_user(req, 'cochon') |
|
419 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
|
420 rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, ' |
|
421 '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') |
|
422 self.assertEqual(rset.rows, [['managers'], ['users']]) |
|
423 |
|
424 def test_exists3(self): |
|
425 req = self.request() |
|
426 self.create_user(req, 'comme') |
|
427 self.create_user(req, 'cochon') |
|
428 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
|
429 self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) |
|
430 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}) |
|
431 self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': 'syt'})) |
|
432 rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" ' |
|
433 'OR EXISTS(X copain T, T login in ("comme", "cochon"))') |
|
434 self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']]) |
|
435 |
|
436 def test_exists4(self): |
|
437 req = self.request() |
|
438 self.create_user(req, 'comme') |
|
439 self.create_user(req, 'cochon', groups=('users', 'guests')) |
|
440 self.create_user(req, 'billy') |
|
441 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
|
442 self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') |
|
443 self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') |
|
444 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': 'syt'}) |
|
445 # search for group name, login where |
|
446 # CWUser copain with "comme" or "cochon" AND same login as the copain |
|
447 # OR |
|
448 # CWUser in_state activated AND not copain with billy |
|
449 # |
|
450 # SO we expect everybody but "comme" and "syt" |
|
451 rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' |
|
452 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' |
|
453 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")') |
|
454 all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN') |
|
455 all.rows.remove(['users', 'comme']) |
|
456 all.rows.remove(['users', 'syt']) |
|
457 self.assertEqual(sorted(rset.rows), sorted(all.rows)) |
|
458 |
|
459 def test_exists5(self): |
|
460 req = self.request() |
|
461 self.create_user(req, 'comme') |
|
462 self.create_user(req, 'cochon', groups=('users', 'guests')) |
|
463 self.create_user(req, 'billy') |
|
464 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
|
465 self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') |
|
466 self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') |
|
467 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}) |
|
468 rset= self.sexecute('Any L WHERE X login L, ' |
|
469 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' |
|
470 'NOT EXISTS(X copain T2, T2 login "billy")') |
|
471 self.assertEqual(sorted(rset.rows), [['cochon'], ['syt']]) |
|
472 rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' |
|
473 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' |
|
474 'NOT EXISTS(X copain T2, T2 login "billy")') |
|
475 self.assertEqual(sorted(rset.rows), [['guests', 'cochon'], |
|
476 ['users', 'cochon'], |
|
477 ['users', 'syt']]) |
|
478 |
|
479 def test_cd_restriction(self): |
|
480 rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"') |
|
481 # admin/anon but no ldap user since it doesn't support creation_date |
|
482 self.assertEqual(sorted(e.login for e in rset.entities()), |
|
483 ['admin', 'anon']) |
|
484 |
|
485 def test_union(self): |
|
486 afeids = self.sexecute('State X') |
|
487 ueids = self.sexecute('CWUser X') |
|
488 rset = self.sexecute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)') |
|
489 self.assertEqual(sorted(r[0] for r in rset.rows), |
|
490 sorted(r[0] for r in afeids + ueids)) |
|
491 |
|
492 def _init_security_test(self): |
|
493 req = self.request() |
|
494 self.create_user(req, 'iaminguestsgrouponly', groups=('guests',)) |
|
495 cnx = self.login('iaminguestsgrouponly') |
|
496 return cnx.cursor() |
|
497 |
|
498 def test_security1(self): |
|
499 cu = self._init_security_test() |
|
500 rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
|
501 self.assertEqual(rset.rows, []) |
|
502 rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"') |
|
503 self.assertEqual(len(rset.rows), 1) |
|
504 |
|
505 def test_security2(self): |
|
506 cu = self._init_security_test() |
|
507 rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': 'syt'}) |
|
508 self.assertEqual(rset.rows, []) |
|
509 rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"') |
|
510 self.assertEqual(len(rset.rows), 1) |
|
511 |
|
512 def test_security3(self): |
|
513 cu = self._init_security_test() |
|
514 rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'}) |
|
515 self.assertEqual(rset.rows, []) |
|
516 rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') |
|
517 self.assertEqual(rset.rows, [[None]]) |
|
518 |
|
519 def test_nonregr1(self): |
|
520 self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, ' |
|
521 'X modification_date AA', |
|
522 {'x': self.session.user.eid}) |
|
523 |
|
524 def test_nonregr2(self): |
|
525 self.sexecute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, ' |
|
526 'X login L, X modification_date AA', |
|
527 {'x': self.session.user.eid}) |
|
528 |
|
529 def test_nonregr3(self): |
|
530 self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, ' |
|
531 'X modification_date AA', |
|
532 {'x': self.session.user.eid}) |
|
533 |
|
534 def test_nonregr4(self): |
|
535 emaileid = self.sexecute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0] |
|
536 self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', |
|
537 {'x': emaileid}) |
|
538 |
|
539 def test_nonregr5(self): |
|
540 # original jpl query: |
|
541 # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, |
|
542 # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 |
|
543 rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, ' |
|
544 'U login "%s", P is X, X creation_date CD') % self.session.user.login |
|
545 self.sexecute(rql, )#{'x': }) |
|
546 |
|
547 def test_nonregr6(self): |
|
548 self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' |
|
549 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' |
|
550 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' |
|
551 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)', |
|
552 {'x': self.session.user.eid}) |
|
553 |
|
554 class GlobTrFuncTC(TestCase): |
|
555 |
|
556 def test_count(self): |
|
557 trfunc = GlobTrFunc('count', 0) |
|
558 res = trfunc.apply([[1], [2], [3], [4]]) |
|
559 self.assertEqual(res, [[4]]) |
|
560 trfunc = GlobTrFunc('count', 1) |
|
561 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
|
562 self.assertEqual(res, [[1, 2], [2, 1], [3, 1]]) |
|
563 |
|
564 def test_sum(self): |
|
565 trfunc = GlobTrFunc('sum', 0) |
|
566 res = trfunc.apply([[1], [2], [3], [4]]) |
|
567 self.assertEqual(res, [[10]]) |
|
568 trfunc = GlobTrFunc('sum', 1) |
|
569 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
|
570 self.assertEqual(res, [[1, 7], [2, 4], [3, 6]]) |
|
571 |
|
572 def test_min(self): |
|
573 trfunc = GlobTrFunc('min', 0) |
|
574 res = trfunc.apply([[1], [2], [3], [4]]) |
|
575 self.assertEqual(res, [[1]]) |
|
576 trfunc = GlobTrFunc('min', 1) |
|
577 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
|
578 self.assertEqual(res, [[1, 2], [2, 4], [3, 6]]) |
|
579 |
|
580 def test_max(self): |
|
581 trfunc = GlobTrFunc('max', 0) |
|
582 res = trfunc.apply([[1], [2], [3], [4]]) |
|
583 self.assertEqual(res, [[4]]) |
|
584 trfunc = GlobTrFunc('max', 1) |
|
585 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
|
586 self.assertEqual(res, [[1, 5], [2, 4], [3, 6]]) |
|
587 |
|
588 |
|
589 class RQL2LDAPFilterTC(RQLGeneratorTC): |
|
590 |
|
591 tags = RQLGeneratorTC.tags | Tags(('ldap')) |
|
592 |
|
593 @property |
|
594 def schema(self): |
|
595 """return the application schema""" |
|
596 return self._schema |
|
597 |
|
598 def setUp(self): |
|
599 self.handler = get_test_db_handler(LDAPUserSourceTC.config) |
|
600 self.handler.build_db_cache('ldap-user', LDAPUserSourceTC.pre_setup_database) |
|
601 self.handler.restore_database('ldap-user') |
|
602 self._repo = repo = self.handler.get_repo() |
|
603 self._schema = repo.schema |
|
604 super(RQL2LDAPFilterTC, self).setUp() |
|
605 ldapsource = repo.sources[-1] |
|
606 self.cnxset = repo._get_cnxset() |
|
607 session = mock_object(cnxset=self.cnxset) |
|
608 self.o = RQL2LDAPFilter(ldapsource, session) |
|
609 self.ldapclasses = ''.join(ldapsource.base_filters) |
|
610 |
|
611 def tearDown(self): |
|
612 self._repo.turn_repo_off() |
|
613 super(RQL2LDAPFilterTC, self).tearDown() |
|
614 |
|
615 def test_base(self): |
|
616 rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0] |
|
617 self.assertEqual(self.o.generate(rqlst, 'X')[1], |
|
618 '(&%s(uid=toto))' % self.ldapclasses) |
|
619 |
|
620 def test_kwargs(self): |
|
621 rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0] |
|
622 self.o._args = {'x': "toto"} |
|
623 self.assertEqual(self.o.generate(rqlst, 'X')[1], |
|
624 '(&%s(uid=toto))' % self.ldapclasses) |
|
625 |
|
626 def test_get_attr(self): |
|
627 rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0] |
|
628 self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E') |
|
629 |
|
630 |
|
631 if __name__ == '__main__': |
|
632 unittest_main() |