|
1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
"""Functional tests for the server's security."""
|
19 |
|
20 from six.moves import range |
|
21 |
|
22 from logilab.common.testlib import unittest_main |
|
23 |
|
24 from cubicweb.devtools.testlib import CubicWebTC |
|
25 from cubicweb import Unauthorized, ValidationError, QueryError, Binary |
|
26 from cubicweb.schema import ERQLExpression |
|
27 from cubicweb.server.querier import get_local_checks, check_relations_read_access |
|
28 from cubicweb.server.utils import _CRYPTO_CTX |
|
29 |
|
30 |
|
class BaseSecurityTC(CubicWebTC):
    """Common fixture for the security test cases of this module.

    On top of the default test database, two extra users are created:
    ``iaminusersgrouponly`` and ``oldpassword``, the latter with a password
    stored as a ``des_crypt`` hash.
    """

    def setup_database(self):
        """Create the extra users the security tests rely on."""
        super(BaseSecurityTC, self).setup_database()
        with self.admin_access.client_cnx() as cnx:
            self.create_user(cnx, u'iaminusersgrouponly')
            # this user gets a pre-computed des_crypt hash instead of a clear
            # text password (test_update_password checks it is replaced on login)
            legacy_hash = _CRYPTO_CTX.encrypt('oldpassword', scheme='des_crypt')
            stored_password = Binary(legacy_hash.encode('ascii'))
            self.create_user(cnx, u'oldpassword', password=stored_password)
|
39 |
|
class LowLevelSecurityFunctionTC(BaseSecurityTC):
    """Tests for the querier security helpers and for password storage."""

    def test_check_relation_read_access(self):
        """Read permissions set on the ``Personne.nom`` relation definition are
        enforced by ``check_relations_read_access`` and by query execution.
        """
        rql = u'Personne U WHERE U nom "managers"'
        select = self.repo.vreg.rqlhelper.parse(rql).children[0]
        nom_rdef = self.repo.schema['Personne'].rdef('nom')
        restricted = {'read': ('users', 'managers')}
        with self.temporary_permissions((nom_rdef, restricted)):
            with self.admin_access.repo_cnx() as cnx:
                self.repo.vreg.solutions(cnx, select, None)
                # must not raise for the admin connection
                check_relations_read_access(cnx, select, {})
            with self.new_access(u'anon').repo_cnx() as cnx:
                with self.assertRaises(Unauthorized):
                    check_relations_read_access(cnx, select, {})
                with self.assertRaises(Unauthorized):
                    cnx.execute(rql)

    def test_get_local_checks(self):
        """Read permissions set on the ``Personne`` entity type are enforced by
        ``get_local_checks`` and by query execution.
        """
        rql = u'Personne U WHERE U nom "managers"'
        select = self.repo.vreg.rqlhelper.parse(rql).children[0]
        with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
            with self.admin_access.repo_cnx() as cnx:
                self.repo.vreg.solutions(cnx, select, None)
                solution = select.solutions[0]
                checks = get_local_checks(cnx, select, solution)
                self.assertEqual({}, checks)
            # the solution computed above is reused with the anonymous connection
            with self.new_access(u'anon').repo_cnx() as cnx:
                with self.assertRaises(Unauthorized):
                    get_local_checks(cnx, select, solution)
                with self.assertRaises(Unauthorized):
                    cnx.execute(rql)

    def test_upassword_not_selectable(self):
        """Selecting ``upassword`` raises Unauthorized, even for the admin."""
        rql = 'Any X,P WHERE X is CWUser, X upassword P'
        with self.admin_access.repo_cnx() as cnx:
            with self.assertRaises(Unauthorized):
                cnx.execute(rql)
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            with self.assertRaises(Unauthorized):
                cnx.execute(rql)

    def test_update_password(self):
        """Ensure that if a user's password is stored with a deprecated hash,
        it will be updated on next login
        """
        def stored_hash(cnx):
            # read the hash straight from the system table
            raw = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser "
                                 "WHERE cw_login = 'oldpassword'").fetchone()[0]
            return self.repo.system_source.binary_to_str(raw)

        def login():
            # open then immediately close a session for the 'oldpassword' user
            self.repo.close(self.repo.connect('oldpassword', password='oldpassword'))

        with self.repo.internal_cnx() as cnx:
            oldhash = stored_hash(cnx)
            login()
            newhash = stored_hash(cnx)
            self.assertNotEqual(oldhash, newhash)
            # the replacement hash is expected to carry the '$6$' prefix
            self.assertTrue(newhash.startswith(b'$6$'))
            # a second login must leave the stored hash untouched
            login()
            newnewhash = stored_hash(cnx)
            self.assertEqual(newhash, newnewhash)
|
98 |
|
99 |
|
class SecurityRewritingTC(BaseSecurityTC):
    """Check the RQL syntax tree handed over to the system source."""

    def hijack_source_execute(self):
        """Replace the system source ``syntax_tree_search`` with a recorder.

        The replacement stores its call arguments in ``self.query`` as an
        ``(args, kwargs)`` pair and returns an empty list.
        """
        def record_query(*args, **kwargs):
            self.query = (args, kwargs)
            return []
        self.repo.system_source.syntax_tree_search = record_query

    def tearDown(self):
        """Drop the instance-level override set by ``hijack_source_execute``
        (if any), then run the regular tear down.
        """
        self.repo.system_source.__dict__.pop('syntax_tree_search', None)
        super(SecurityRewritingTC, self).tearDown()

    def test_not_relation_read_security(self):
        """Both spellings of the negated relation reach the source as
        ``NOT EXISTS(...)``.
        """
        expected = 'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire'
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            self.hijack_source_execute()
            for rql in ('Any U WHERE NOT A todo_by U, A is Affaire', expected):
                cnx.execute(rql)
                # second recorded positional argument is the syntax tree
                self.assertEqual(self.query[0][1].as_string(), expected)
|
120 |
|
class SecurityTC(BaseSecurityTC):
    """Security checks on insertion, update, deletion and reading of
    entities, relations and attributes.
    """

    def setUp(self):
        """Insert, as a manager, the entities the tests below start from."""
        super(SecurityTC, self).setUp()
        # implicitly test manager can add some entities
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute("INSERT Affaire X: X sujet 'cool'")
            cnx.execute("INSERT Societe X: X nom 'logilab'")
            cnx.execute("INSERT Personne X: X nom 'bidule'")
            cnx.execute('INSERT CWGroup X: X name "staff"')
            cnx.commit()

    def test_insert_security(self):
        """Committing a Personne insertion as anon raises Unauthorized."""
        with self.new_access(u'anon').repo_cnx() as cnx:
            cnx.execute("INSERT Personne X: X nom 'bidule'")
            self.assertRaises(Unauthorized, cnx.commit)
            # the rejected insertion must not be visible
            self.assertEqual(cnx.execute('Personne X').rowcount, 1)

    def test_insert_security_2(self):
        """Committing an Affaire insertion as anon raises Unauthorized."""
        with self.new_access(u'anon').repo_cnx() as cnx:
            cnx.execute("INSERT Affaire X")
            self.assertRaises(Unauthorized, cnx.commit)
            # anon has no read permission on Affaire entities, so
            # rowcount == 0
            self.assertEqual(cnx.execute('Affaire X').rowcount, 0)

    def test_insert_rql_permission(self):
        """A user can only add an Affaire related to a Societe he owns."""
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            cnx.execute("INSERT Affaire X: X sujet 'cool'")
            self.assertRaises(Unauthorized, cnx.commit)
        # test nothing has actually been inserted
        with self.admin_access.repo_cnx() as cnx:
            self.assertEqual(cnx.execute('Affaire X').rowcount, 1)
        # same insertion, this time linked to a Societe created by the user
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            cnx.execute("INSERT Affaire X: X sujet 'cool'")
            cnx.execute("INSERT Societe X: X nom 'chouette'")
            cnx.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'")
            cnx.commit()

    def test_update_security_1(self):
        """Updating a Personne as anon is rejected at commit time."""
        with self.new_access(u'anon').repo_cnx() as cnx:
            # local security check
            cnx.execute("SET X nom 'bidulechouette' WHERE X is Personne")
            self.assertRaises(Unauthorized, cnx.commit)
        with self.admin_access.repo_cnx() as cnx:
            self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)

    def test_update_security_2(self):
        """Updating a Personne anon cannot read is rejected at execution time."""
        with self.temporary_permissions(Personne={'read': ('users', 'managers'),
                                                  'add': ('guests', 'users', 'managers')}):
            with self.new_access(u'anon').repo_cnx() as cnx:
                self.assertRaises(Unauthorized, cnx.execute,
                                  "SET X nom 'bidulechouette' WHERE X is Personne")
        # test nothing has actually been updated
        with self.admin_access.repo_cnx() as cnx:
            self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)

    def test_update_security_3(self):
        """A user can link a Personne and a Societe he has just created."""
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            cnx.execute("INSERT Personne X: X nom 'biduuule'")
            cnx.execute("INSERT Societe X: X nom 'looogilab'")
            cnx.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'")

    def test_insert_immutable_attribute_update(self):
        """Setting ``Old.name`` at creation or right after it both commit."""
        with self.admin_access.repo_cnx() as cnx:
            cnx.create_entity('Old', name=u'Babar')
            cnx.commit()
            # this should be equivalent
            old = cnx.create_entity('Old')
            old.cw_set(name=u'Celeste')
            cnx.commit()

    def test_update_rql_permission(self):
        """A user can only update an Affaire related to a Societe he owns."""
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            cnx.execute("SET X sujet 'pascool' WHERE X is Affaire")
            # this won't actually do anything since the selection query won't return anything
            cnx.commit()
            # to actually get Unauthorized exception, try to update an entity we can read
            cnx.execute("SET X nom 'toto' WHERE X is Societe")
            self.assertRaises(Unauthorized, cnx.commit)
            cnx.execute("INSERT Affaire X: X sujet 'pascool'")
            cnx.execute("INSERT Societe X: X nom 'chouette'")
            cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
            cnx.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'")
            cnx.commit()

    def test_delete_security(self):
        """A plain user cannot delete a CWGroup."""
        # FIXME: sample below fails because we don't detect "owner" can't delete
        # user anyway, and since no user with login == 'bidule' exists, no
        # exception is raised
        #user._groups = {'guests':1}
        #self.assertRaises(Unauthorized,
        #                  self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'")
        # check local security
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            self.assertRaises(Unauthorized, cnx.execute, "DELETE CWGroup Y WHERE Y name 'staff'")

    def test_delete_rql_permission(self):
        """A user can only delete an Affaire related to a Societe he owns."""
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            # this won't actually do anything since the selection query won't return anything
            cnx.execute("DELETE Affaire X")
            cnx.commit()
            # to actually get Unauthorized exception, try to delete an entity we can read
            self.assertRaises(Unauthorized, cnx.execute, "DELETE Societe S")
            self.assertRaises(QueryError, cnx.commit)  # can't commit anymore
            cnx.rollback()
            cnx.execute("INSERT Affaire X: X sujet 'pascool'")
            cnx.execute("INSERT Societe X: X nom 'chouette'")
            cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
            cnx.commit()
            ## # this one should fail since it will try to delete two affaires, one authorized
            ## # and the other not
            ## self.assertRaises(Unauthorized, cnx.execute, "DELETE Affaire X")
            cnx.execute("DELETE Affaire X WHERE X sujet 'pascool'")
            cnx.commit()

    def test_insert_relation_rql_permission(self):
        """Adding a relation requires permission on the entities it links."""
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
            # should raise Unauthorized since the user doesn't own S, though
            # this won't actually do anything since the selection query won't
            # return anything
            cnx.commit()
            # to actually get Unauthorized exception, try to insert a relation
            # where we can read both entities
            rset = cnx.execute('Personne P')
            self.assertEqual(len(rset), 1)
            ent = rset.get_entity(0, 0)
            self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))
            self.assertRaises(Unauthorized, ent.cw_check_perm, 'update')
            self.assertRaises(Unauthorized,
                              cnx.execute, "SET P travaille S WHERE P is Personne, S is Societe")
            self.assertRaises(QueryError, cnx.commit)  # can't commit anymore
            cnx.rollback()
            # test nothing has actually been inserted:
            self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))
            cnx.execute("INSERT Societe X: X nom 'chouette'")
            cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
            cnx.commit()

    def test_delete_relation_rql_permission(self):
        """Deleting a relation requires permission on the entities it links."""
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            # this won't actually do anything since the selection query won't return anything
            cnx.execute("DELETE A concerne S")
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            # to actually get Unauthorized exception, try to delete a relation we can read
            eid = cnx.execute("INSERT Affaire X: X sujet 'pascool'")[0][0]
            cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',
                        {'x': eid})
            cnx.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe")
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            self.assertRaises(Unauthorized, cnx.execute, "DELETE A concerne S")
            self.assertRaises(QueryError, cnx.commit)  # can't commit anymore
            cnx.rollback()
            cnx.execute("INSERT Societe X: X nom 'chouette'")
            cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
            cnx.commit()
            cnx.execute("DELETE A concerne S WHERE S nom 'chouette'")
            cnx.commit()

    def test_user_can_change_its_upassword(self):
        """A user can set his own password, then log in with it."""
        with self.admin_access.repo_cnx() as cnx:
            ueid = self.create_user(cnx, u'user').eid
        with self.new_access(u'user').repo_cnx() as cnx:
            cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
                        {'x': ueid, 'passwd': b'newpwd'})
            cnx.commit()
        # connecting with the new password must succeed
        self.repo.close(self.repo.connect('user', password='newpwd'))

    def test_user_cant_change_other_upassword(self):
        """Setting another user's password is rejected at commit time."""
        with self.admin_access.repo_cnx() as cnx:
            ueid = self.create_user(cnx, u'otheruser').eid
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
                        {'x': ueid, 'passwd': b'newpwd'})
            self.assertRaises(Unauthorized, cnx.commit)

    # read security test

    def test_read_base(self):
        """Reading an entity type reserved to other groups raises Unauthorized."""
        with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
            with self.new_access(u'anon').repo_cnx() as cnx:
                self.assertRaises(Unauthorized,
                                  cnx.execute, 'Personne U where U nom "managers"')

    def test_read_erqlexpr_base(self):
        """Affaire entities the user may not read are filtered out of type
        queries and raise Unauthorized when selected by eid.
        """
        with self.admin_access.repo_cnx() as cnx:
            eid = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            rset = cnx.execute('Affaire X')
            self.assertEqual(rset.rows, [])
            self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
            # cache test
            self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
            aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
            cnx.execute("INSERT Societe X: X nom 'chouette'")
            cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
            cnx.commit()
            rset = cnx.execute('Any X WHERE X eid %(x)s', {'x': aff2})
            self.assertEqual(rset.rows, [[aff2]])
            # more cache test w/ NOT eid
            rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid})
            self.assertEqual(rset.rows, [[aff2]])
            rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2})
            self.assertEqual(rset.rows, [])
            # test can't update an attribute of an entity that can't be read
            self.assertRaises(Unauthorized, cnx.execute,
                              'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid})

    def test_entity_created_in_transaction(self):
        """An entity created in the current transaction is readable by eid
        before the commit-time permission check rejects it.
        """
        affschema = self.schema['Affaire']
        with self.temporary_permissions(Affaire={'read': affschema.permissions['add']}):
            with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
                aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
                # entity created in transaction are readable *by eid*
                self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x': aff2}))
                # XXX would be nice if it worked
                rset = cnx.execute("Affaire X WHERE X sujet 'cool'")
                self.assertEqual(len(rset), 0)
                self.assertRaises(Unauthorized, cnx.commit)

    def test_read_erqlexpr_has_text1(self):
        """``has_text`` searches only return entities the user may read."""
        with self.admin_access.repo_cnx() as cnx:
            aff1 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
            card1 = cnx.execute("INSERT Card X: X title 'cool'")[0][0]
            cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',
                        {'x': card1})
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
            soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0]
            cnx.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1})
            cnx.commit()
            self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': aff1})
            self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x': aff2}))
            self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x': card1}))
            rset = cnx.execute("Any X WHERE X has_text 'cool'")
            self.assertEqual(sorted(eid for (eid,) in rset.rows),
                             [card1, aff2])

    def test_read_erqlexpr_has_text2(self):
        """``has_text`` results are filtered, also inside a subquery."""
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute("INSERT Personne X: X nom 'bidule'")
            cnx.execute("INSERT Societe X: X nom 'bidule'")
            cnx.commit()
        with self.temporary_permissions(Personne={'read': ('managers',)}):
            with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
                rset = cnx.execute('Any N WHERE N has_text "bidule"')
                self.assertEqual(len(rset.rows), 1, rset.rows)
                rset = cnx.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")')
                self.assertEqual(len(rset.rows), 1, rset.rows)

    def test_read_erqlexpr_optional_rel(self):
        """``has_text`` results are filtered when joined with an optional
        relation.
        """
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute("INSERT Personne X: X nom 'bidule'")
            cnx.execute("INSERT Societe X: X nom 'bidule'")
            cnx.commit()
        with self.temporary_permissions(Personne={'read': ('managers',)}):
            with self.new_access(u'anon').repo_cnx() as cnx:
                rset = cnx.execute('Any N,U WHERE N has_text "bidule", N owned_by U?')
                self.assertEqual(len(rset.rows), 1, rset.rows)

    def test_read_erqlexpr_aggregat(self):
        """Aggregates only count entities the user may read."""
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute("INSERT Affaire X: X sujet 'cool'")
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            rset = cnx.execute('Any COUNT(X) WHERE X is Affaire')
            self.assertEqual(rset.rows, [[0]])
            cnx.execute("INSERT Affaire X: X sujet 'cool'")
            cnx.execute("INSERT Societe X: X nom 'chouette'")
            cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
            cnx.commit()
            rset = cnx.execute('Any COUNT(X) WHERE X is Affaire')
            self.assertEqual(rset.rows, [[1]])
            rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN')
            values = dict(rset)
            self.assertEqual(values['Affaire'], 1)
            self.assertEqual(values['Societe'], 2)
            rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN '
                               'WITH X BEING ((Affaire X) UNION (Societe X))')
            self.assertEqual(len(rset), 2)
            values = dict(rset)
            self.assertEqual(values['Affaire'], 1)
            self.assertEqual(values['Societe'], 2)

    def test_attribute_security(self):
        """Attribute-level permissions on ``Personne.test`` and on
        ``Frozable`` attributes are checked at commit time.
        """
        with self.admin_access.repo_cnx() as cnx:
            # only managers should be able to edit the 'test' attribute of Personne entities
            eid = cnx.execute("INSERT Personne X: X nom 'bidule', "
                              "X web 'http://www.debian.org', X test TRUE")[0][0]
            cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            cnx.execute("INSERT Personne X: X nom 'bidule', "
                        "X web 'http://www.debian.org', X test TRUE")
            self.assertRaises(Unauthorized, cnx.commit)
            cnx.execute("INSERT Personne X: X nom 'bidule', "
                        "X web 'http://www.debian.org', X test FALSE")
            self.assertRaises(Unauthorized, cnx.commit)
            # leaving the 'test' attribute out is accepted
            eid = cnx.execute("INSERT Personne X: X nom 'bidule', "
                              "X web 'http://www.debian.org'")[0][0]
            cnx.commit()
            cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
            self.assertRaises(Unauthorized, cnx.commit)
            cnx.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid})
            self.assertRaises(Unauthorized, cnx.commit)
            cnx.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid})
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            cnx.execute('INSERT Frozable F: F name "Foo"')
            cnx.commit()
            cnx.execute('SET F name "Bar" WHERE F is Frozable')
            cnx.commit()
            # renaming and freezing in the same transaction is rejected
            cnx.execute('SET F name "BaBar" WHERE F is Frozable')
            cnx.execute('SET F frozen True WHERE F is Frozable')
            with self.assertRaises(Unauthorized):
                cnx.commit()
            cnx.rollback()
            cnx.execute('SET F frozen True WHERE F is Frozable')
            cnx.commit()
            # once frozen, the name can't be changed anymore
            cnx.execute('SET F name "Bar" WHERE F is Frozable')
            with self.assertRaises(Unauthorized):
                cnx.commit()

    def test_attribute_security_rqlexpr(self):
        """Attribute permissions expressed as RQL expressions on ``Note``
        are checked at commit time.
        """
        with self.admin_access.repo_cnx() as cnx:
            # Note.para attribute editable by managers or if the note is in "todo" state
            note = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
            cnx.commit()
            note.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
            cnx.execute('SET X para "truc" WHERE X eid %(x)s', {'x': note.eid})
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note.eid})
            self.assertRaises(Unauthorized, cnx.commit)
            note2 = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
            cnx.commit()
            note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
            cnx.commit()
            self.assertEqual(len(cnx.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s',
                                             {'x': note2.eid})),
                             0)
            cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
            self.assertRaises(Unauthorized, cnx.commit)
            note2.cw_adapt_to('IWorkflowable').fire_transition('redoit')
            cnx.commit()
            cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
            cnx.commit()
            cnx.execute("INSERT Note X: X something 'A'")
            self.assertRaises(Unauthorized, cnx.commit)
            cnx.execute("INSERT Note X: X para 'zogzog', X something 'A'")
            cnx.commit()
            note = cnx.execute("INSERT Note X").get_entity(0, 0)
            cnx.commit()
            note.cw_set(something=u'B')
            cnx.commit()
            note.cw_set(something=None, para=u'zogzog')
            cnx.commit()

    def test_attribute_read_security(self):
        """An unreadable attribute is ``None`` on an otherwise readable entity."""
        # anon not allowed to see users' login, but they can see users
        login_rdef = self.repo.schema['CWUser'].rdef('login')
        with self.temporary_permissions((login_rdef, {'read': ('users', 'managers')}),
                                        CWUser={'read': ('guests', 'users', 'managers')}):
            with self.new_access(u'anon').repo_cnx() as cnx:
                rset = cnx.execute('CWUser X')
                self.assertTrue(rset)
                # check the first two users of the result set
                for row in (0, 1):
                    user = rset.get_entity(row, 0)
                    user.complete()
                    self.assertIsNone(user.login)
                    self.assertTrue(user.creation_date)

    def test_yams_inheritance_and_security_bug(self):
        """Preprocessing a query on ``Societe`` instances splits out the
        ``Division`` subtype so it gets its own read restriction.
        """
        with self.temporary_permissions(Division={'read': ('managers',
                                                           ERQLExpression('X owned_by U'))}):
            with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
                querier = cnx.repo.querier
                rqlst = querier.parse('Any X WHERE X is_instance_of Societe')
                querier.solutions(cnx, rqlst, {})
                querier._annotate(rqlst)
                plan = querier.plan_factory(rqlst, {}, cnx)
                plan.preprocess(rqlst)
                self.assertEqual(
                    rqlst.as_string(),
                    '(Any X WHERE X is IN(Societe, SubDivision)) UNION '
                    '(Any X WHERE X is Division, EXISTS(X owned_by %(B)s))')
|
530 |
|
531 |
|
class BaseSchemaSecurityTC(BaseSecurityTC):
    """tests related to the base schema permission configuration"""

    def test_user_can_delete_object_he_created(self):
        """A user can delete an Affaire he created, even after another user
        changed its state.
        """
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            # due to security test, the affaire has to concern a societe the user owns
            cnx.execute('INSERT Societe X: X nom "ARCTIA"')
            cnx.execute('INSERT Affaire X: X ref "ARCT01", X concerne S WHERE S nom "ARCTIA"')
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            affaire = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
            workflowable = affaire.cw_adapt_to('IWorkflowable')
            workflowable.fire_transition('abort')
            cnx.commit()
            trinfos = cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01"')
            self.assertEqual(len(trinfos), 1)
            # TrInfo at the above state change
            admin_trinfos = cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01",'
                                        'X owned_by U, U login "admin"')
            self.assertEqual(len(admin_trinfos), 1)
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            cnx.execute('DELETE Affaire X WHERE X ref "ARCT01"')
            cnx.commit()
            remaining = cnx.execute('Affaire X')
            self.assertFalse(remaining)

    def test_users_and_groups_non_readable_by_guests(self):
        """The anonymous user can read groups and itself, but no other user,
        and cannot modify itself.
        """
        with self.repo.internal_cnx() as cnx:
            admin_eid = cnx.execute('CWUser U WHERE U login "admin"').rows[0][0]
        with self.new_access(u'anon').repo_cnx() as cnx:
            anon = cnx.user
            # anonymous user can only read itself
            logins = cnx.execute('Any L WHERE X owned_by U, U login L')
            self.assertEqual([['anon']], logins.rows)
            users = cnx.execute('CWUser X')
            self.assertEqual([[anon.eid]], users.rows)
            # anonymous user can read groups (necessary to check allowed transitions for instance)
            groups = cnx.execute('CWGroup X')
            self.assertTrue(groups)
            # should only be able to read the anonymous user, not another one
            with self.assertRaises(Unauthorized):
                cnx.execute('CWUser X WHERE X eid %(x)s', {'x': admin_eid})
            own = cnx.execute('CWUser X WHERE X eid %(x)s', {'x': anon.eid})
            self.assertEqual([[anon.eid]], own.rows)
            # but can't modify it
            cnx.execute('SET X login "toto" WHERE X eid %(x)s', {'x': anon.eid})
            with self.assertRaises(Unauthorized):
                cnx.commit()

    def test_in_group_relation(self):
        """A plain user can neither remove nor add ``in_group`` for admin."""
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            with self.assertRaises(Unauthorized):
                cnx.execute(u"DELETE U in_group G WHERE U login 'admin'")
            with self.assertRaises(Unauthorized):
                cnx.execute(u"SET U in_group G WHERE U login 'admin', G name 'users'")

    def test_owned_by(self):
        """A plain user cannot declare himself owner of a Personne."""
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute("INSERT Personne X: X nom 'bidule'")
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            with self.assertRaises(Unauthorized):
                cnx.execute(u"SET X owned_by U WHERE U login 'iaminusersgrouponly', "
                            u"X is Personne")

    def test_bookmarked_by_guests_security(self):
        """The anonymous user can read bookmarks but cannot change
        ``bookmarked_by`` relations.
        """
        with self.admin_access.repo_cnx() as cnx:
            manage_eid = cnx.execute(
                'INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0]
            index_eid = cnx.execute(
                'INSERT Bookmark B: B path "?vid=index", B title "index", '
                'B bookmarked_by U WHERE U login "anon"')[0][0]
            cnx.commit()
        with self.new_access(u'anon').repo_cnx() as cnx:
            anoneid = cnx.user.eid
            own_bookmarks = [['index', '?vid=index']]
            # eid interpolated in the query string
            rset = cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
                               'B bookmarked_by U, U eid %s' % anoneid)
            self.assertEqual(rset.rows, own_bookmarks)
            # eid given as a query argument
            rset = cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
                               'B bookmarked_by U, U eid %(x)s', {'x': anoneid})
            self.assertEqual(rset.rows, own_bookmarks)
            # can read others bookmarks as well
            rset = cnx.execute('Any B where B is Bookmark, NOT B bookmarked_by U')
            self.assertEqual(rset.rows, [[manage_eid]])
            with self.assertRaises(Unauthorized):
                cnx.execute('DELETE B bookmarked_by U')
            with self.assertRaises(Unauthorized):
                cnx.execute('SET B bookmarked_by U WHERE U eid %(x)s, B eid %(b)s',
                            {'x': anoneid, 'b': manage_eid})

    def test_ambigous_ordered(self):
        """An ORDERBY on an ambiguous attribute returns sorted values."""
        with self.new_access(u'anon').repo_cnx() as cnx:
            rset = cnx.execute('Any N ORDERBY lower(N) WHERE X name N')
            names = [name for (name,) in rset]
            expected = sorted(names, key=lambda value: value.lower())
            self.assertEqual(names, expected)

    def test_in_state_without_update_perm(self):
        """check a user change in_state without having update permission on the
        subject
        """
        with self.admin_access.repo_cnx() as cnx:
            eid = cnx.execute('INSERT Affaire X: X ref "ARCT01"')[0][0]
            cnx.commit()
        with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
            # needed to remove rql expr granting update perm to the user
            affschema = self.schema['Affaire']
            group_only_perms = {'update': affschema.get_groups('update'),
                                'read': ('users',)}
            with self.temporary_permissions(Affaire=group_only_perms):
                with self.assertRaises(Unauthorized):
                    affschema.check_perm(cnx, 'update', eid=eid)
                affaire = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
                affaire.cw_adapt_to('IWorkflowable').fire_transition('abort')
                cnx.commit()
                # though changing a user state (even logged user) is reserved to managers
                user = cnx.user
                # XXX whether it should raise Unauthorized or ValidationError is not clear
                # the best would probably be ValidationError if the transition doesn't exist
                # from the current state but Unauthorized if it exists but user can't pass it
                fire_transition = user.cw_adapt_to('IWorkflowable').fire_transition
                with self.assertRaises(ValidationError):
                    fire_transition('deactivate')

    def test_trinfo_security(self):
        """A TrInfo comment can be changed, its from/to states cannot."""
        with self.admin_access.repo_cnx() as cnx:
            affaire = cnx.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0, 0)
            iworkflowable = affaire.cw_adapt_to('IWorkflowable')
            cnx.commit()
            iworkflowable.fire_transition('abort')
            cnx.commit()
            # can change tr info comment
            cnx.execute('SET TI comment %(c)s WHERE TI wf_info_for X, X ref "ARCT01"',
                        {'c': u'bouh!'})
            cnx.commit()
            affaire.cw_clear_relation_cache('wf_info_for', 'object')
            trinfo = iworkflowable.latest_trinfo()
            self.assertEqual(trinfo.comment, 'bouh!')
            # but not from_state/to_state
            affaire.cw_clear_relation_cache('wf_info_for', role='object')
            with self.assertRaises(Unauthorized):
                cnx.execute('SET TI from_state S WHERE TI eid %(ti)s, S name "ben non"',
                            {'ti': trinfo.eid})
            with self.assertRaises(Unauthorized):
                cnx.execute('SET TI to_state S WHERE TI eid %(ti)s, S name "pitetre"',
                            {'ti': trinfo.eid})

    def test_emailaddress_security(self):
        """The anonymous user only sees one of the two email addresses
        created by the admin.
        """
        with self.admin_access.repo_cnx() as cnx:
            # check for preexisting email addresses, which would skew the counts below
            if cnx.execute('Any X WHERE X is EmailAddress'):
                rset = cnx.execute('Any X, U WHERE X is EmailAddress, U use_email X')
                report = ['Preexisting email readable by anon found!']
                line_tmpl = ' - "%s" used by user "%s"'
                for i in range(len(rset)):
                    email, owner = rset.get_entity(i, 0), rset.get_entity(i, 1)
                    report.append(line_tmpl % (email.dc_title(), owner.dc_title()))
                raise RuntimeError('\n'.join(report))
            # actual test
            cnx.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0)
            cnx.execute('INSERT EmailAddress X: X address "anon", '
                        'U use_email X WHERE U login "anon"').get_entity(0, 0)
            cnx.commit()
            admin_view = cnx.execute('Any X WHERE X is EmailAddress')
            self.assertEqual(len(admin_view), 2)
        with self.new_access(u'anon').repo_cnx() as cnx:
            anon_view = cnx.execute('Any X WHERE X is EmailAddress')
            self.assertEqual(len(anon_view), 1)
|
686 |
|
# Run the test cases of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest_main()