100 self.execute("INSERT Personne X: X nom 'bidule'") |
91 self.execute("INSERT Personne X: X nom 'bidule'") |
101 self.execute('INSERT CWGroup X: X name "staff"') |
92 self.execute('INSERT CWGroup X: X name "staff"') |
102 self.commit() |
93 self.commit() |
103 |
94 |
104 def test_insert_security(self): |
95 def test_insert_security(self): |
105 cnx = self.login('anon') |
96 with self.login('anon') as cu: |
106 cu = cnx.cursor() |
97 cu.execute("INSERT Personne X: X nom 'bidule'") |
107 cu.execute("INSERT Personne X: X nom 'bidule'") |
98 self.assertRaises(Unauthorized, self.commit) |
108 self.assertRaises(Unauthorized, cnx.commit) |
99 self.assertEqual(cu.execute('Personne X').rowcount, 1) |
109 self.assertEqual(cu.execute('Personne X').rowcount, 1) |
|
110 cnx.close() |
|
111 |
100 |
112 def test_insert_rql_permission(self): |
101 def test_insert_rql_permission(self): |
113 # test user can only add une affaire related to a societe he owns |
102 # test user can only add une affaire related to a societe he owns |
114 cnx = self.login('iaminusersgrouponly') |
103 with self.login('iaminusersgrouponly') as cu: |
115 cu = cnx.cursor() |
104 cu.execute("INSERT Affaire X: X sujet 'cool'") |
116 cu.execute("INSERT Affaire X: X sujet 'cool'") |
105 self.assertRaises(Unauthorized, self.commit) |
117 self.assertRaises(Unauthorized, cnx.commit) |
|
118 # test nothing has actually been inserted |
106 # test nothing has actually been inserted |
119 self.restore_connection() |
|
120 self.assertEqual(self.execute('Affaire X').rowcount, 1) |
107 self.assertEqual(self.execute('Affaire X').rowcount, 1) |
121 cnx = self.login('iaminusersgrouponly') |
108 with self.login('iaminusersgrouponly') as cu: |
122 cu = cnx.cursor() |
109 cu.execute("INSERT Affaire X: X sujet 'cool'") |
123 cu.execute("INSERT Affaire X: X sujet 'cool'") |
110 cu.execute("INSERT Societe X: X nom 'chouette'") |
124 cu.execute("INSERT Societe X: X nom 'chouette'") |
111 cu.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'") |
125 cu.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'") |
112 self.commit() |
126 cnx.commit() |
|
127 cnx.close() |
|
128 |
113 |
129 def test_update_security_1(self): |
114 def test_update_security_1(self): |
130 cnx = self.login('anon') |
115 with self.login('anon') as cu: |
131 cu = cnx.cursor() |
116 # local security check |
132 # local security check |
117 cu.execute( "SET X nom 'bidulechouette' WHERE X is Personne") |
133 cu.execute( "SET X nom 'bidulechouette' WHERE X is Personne") |
118 self.assertRaises(Unauthorized, self.commit) |
134 self.assertRaises(Unauthorized, cnx.commit) |
|
135 self.restore_connection() |
|
136 self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) |
119 self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) |
137 |
120 |
138 def test_update_security_2(self): |
121 def test_update_security_2(self): |
139 cnx = self.login('anon') |
122 with self.temporary_permissions(Personne={'read': ('users', 'managers'), |
140 cu = cnx.cursor() |
123 'add': ('guests', 'users', 'managers')}): |
141 self.repo.schema['Personne'].set_action_permissions('read', ('users', 'managers')) |
124 with self.login('anon') as cu: |
142 self.repo.schema['Personne'].set_action_permissions('add', ('guests', 'users', 'managers')) |
125 self.assertRaises(Unauthorized, cu.execute, "SET X nom 'bidulechouette' WHERE X is Personne") |
143 self.assertRaises(Unauthorized, cu.execute, "SET X nom 'bidulechouette' WHERE X is Personne") |
126 self.rollback() |
144 #self.assertRaises(Unauthorized, cnx.commit) |
127 # self.assertRaises(Unauthorized, cnx.commit) |
145 # test nothing has actually been inserted |
128 # test nothing has actually been inserted |
146 self.restore_connection() |
|
147 self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) |
129 self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) |
148 |
130 |
149 def test_update_security_3(self): |
131 def test_update_security_3(self): |
150 cnx = self.login('iaminusersgrouponly') |
132 with self.login('iaminusersgrouponly') as cu: |
151 cu = cnx.cursor() |
133 cu.execute("INSERT Personne X: X nom 'biduuule'") |
152 cu.execute("INSERT Personne X: X nom 'biduuule'") |
134 cu.execute("INSERT Societe X: X nom 'looogilab'") |
153 cu.execute("INSERT Societe X: X nom 'looogilab'") |
135 cu.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'") |
154 cu.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'") |
|
155 cnx.close() |
|
156 |
136 |
157 def test_update_rql_permission(self): |
137 def test_update_rql_permission(self): |
158 self.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
138 self.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
159 self.commit() |
139 self.commit() |
160 # test user can only update une affaire related to a societe he owns |
140 # test user can only update une affaire related to a societe he owns |
161 cnx = self.login('iaminusersgrouponly') |
141 with self.login('iaminusersgrouponly') as cu: |
162 cu = cnx.cursor() |
142 cu.execute("SET X sujet 'pascool' WHERE X is Affaire") |
163 cu.execute("SET X sujet 'pascool' WHERE X is Affaire") |
143 # this won't actually do anything since the selection query won't return anything |
164 # this won't actually do anything since the selection query won't return anything |
144 self.commit() |
165 cnx.commit() |
145 # to actually get Unauthorized exception, try to update an entity we can read |
166 # to actually get Unauthorized exception, try to update an entity we can read |
146 cu.execute("SET X nom 'toto' WHERE X is Societe") |
167 cu.execute("SET X nom 'toto' WHERE X is Societe") |
147 self.assertRaises(Unauthorized, self.commit) |
168 self.assertRaises(Unauthorized, cnx.commit) |
148 cu.execute("INSERT Affaire X: X sujet 'pascool'") |
169 cu.execute("INSERT Affaire X: X sujet 'pascool'") |
149 cu.execute("INSERT Societe X: X nom 'chouette'") |
170 cu.execute("INSERT Societe X: X nom 'chouette'") |
150 cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") |
171 cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") |
151 cu.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'") |
172 cu.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'") |
152 self.commit() |
173 cnx.commit() |
|
174 cnx.close() |
|
175 |
153 |
176 def test_delete_security(self): |
154 def test_delete_security(self): |
177 # FIXME: sample below fails because we don't detect "owner" can't delete |
155 # FIXME: sample below fails because we don't detect "owner" can't delete |
178 # user anyway, and since no user with login == 'bidule' exists, no |
156 # user anyway, and since no user with login == 'bidule' exists, no |
179 # exception is raised |
157 # exception is raised |
180 #user._groups = {'guests':1} |
158 #user._groups = {'guests':1} |
181 #self.assertRaises(Unauthorized, |
159 #self.assertRaises(Unauthorized, |
182 # self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'") |
160 # self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'") |
183 # check local security |
161 # check local security |
184 cnx = self.login('iaminusersgrouponly') |
162 with self.login('iaminusersgrouponly') as cu: |
185 cu = cnx.cursor() |
163 self.assertRaises(Unauthorized, cu.execute, "DELETE CWGroup Y WHERE Y name 'staff'") |
186 self.assertRaises(Unauthorized, cu.execute, "DELETE CWGroup Y WHERE Y name 'staff'") |
164 self.rollback() |
187 cnx.close() |
|
188 |
165 |
189 def test_delete_rql_permission(self): |
166 def test_delete_rql_permission(self): |
190 self.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
167 self.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
191 self.commit() |
168 self.commit() |
192 # test user can only dele une affaire related to a societe he owns |
169 # test user can only dele une affaire related to a societe he owns |
193 cnx = self.login('iaminusersgrouponly') |
170 with self.login('iaminusersgrouponly') as cu: |
194 cu = cnx.cursor() |
171 # this won't actually do anything since the selection query won't return anything |
195 # this won't actually do anything since the selection query won't return anything |
172 cu.execute("DELETE Affaire X") |
196 cu.execute("DELETE Affaire X") |
173 self.commit() |
197 cnx.commit() |
174 # to actually get Unauthorized exception, try to delete an entity we can read |
198 # to actually get Unauthorized exception, try to delete an entity we can read |
175 self.assertRaises(Unauthorized, cu.execute, "DELETE Societe S") |
199 self.assertRaises(Unauthorized, cu.execute, "DELETE Societe S") |
176 self.assertRaises(QueryError, self.commit) # can't commit anymore |
200 self.assertRaises(QueryError, cnx.commit) # can't commit anymore |
177 self.rollback() # required after Unauthorized |
201 cnx.rollback() # required after Unauthorized |
178 cu.execute("INSERT Affaire X: X sujet 'pascool'") |
202 cu.execute("INSERT Affaire X: X sujet 'pascool'") |
179 cu.execute("INSERT Societe X: X nom 'chouette'") |
203 cu.execute("INSERT Societe X: X nom 'chouette'") |
180 cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") |
204 cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") |
181 self.commit() |
205 cnx.commit() |
|
206 ## # this one should fail since it will try to delete two affaires, one authorized |
182 ## # this one should fail since it will try to delete two affaires, one authorized |
207 ## # and the other not |
183 ## # and the other not |
208 ## self.assertRaises(Unauthorized, cu.execute, "DELETE Affaire X") |
184 ## self.assertRaises(Unauthorized, cu.execute, "DELETE Affaire X") |
209 cu.execute("DELETE Affaire X WHERE X sujet 'pascool'") |
185 cu.execute("DELETE Affaire X WHERE X sujet 'pascool'") |
210 cnx.commit() |
186 self.commit() |
211 cnx.close() |
|
212 |
187 |
213 |
188 |
214 def test_insert_relation_rql_permission(self): |
189 def test_insert_relation_rql_permission(self): |
215 cnx = self.login('iaminusersgrouponly') |
190 with self.login('iaminusersgrouponly') as cu: |
216 session = self.session |
191 cu.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
217 cu = cnx.cursor(session) |
192 # should raise Unauthorized since user don't own S though this won't |
218 cu.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
193 # actually do anything since the selection query won't return |
219 # should raise Unauthorized since user don't own S |
194 # anything |
220 # though this won't actually do anything since the selection query won't return anything |
195 self.commit() |
221 cnx.commit() |
196 # to actually get Unauthorized exception, try to insert a relation |
222 # to actually get Unauthorized exception, try to insert a relation were we can read both entities |
197 # were we can read both entities |
223 rset = cu.execute('Personne P') |
198 rset = cu.execute('Personne P') |
224 self.assertEqual(len(rset), 1) |
199 self.assertEqual(len(rset), 1) |
225 ent = rset.get_entity(0, 0) |
200 ent = rset.get_entity(0, 0) |
226 session.set_cnxset() # necessary |
201 self.assertFalse(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe')) |
227 self.assertRaises(Unauthorized, ent.cw_check_perm, 'update') |
202 self.assertRaises(Unauthorized, ent.cw_check_perm, 'update') |
228 self.assertRaises(Unauthorized, |
203 self.assertRaises(Unauthorized, |
229 cu.execute, "SET P travaille S WHERE P is Personne, S is Societe") |
204 cu.execute, "SET P travaille S WHERE P is Personne, S is Societe") |
230 self.assertRaises(QueryError, cnx.commit) # can't commit anymore |
205 self.assertRaises(QueryError, self.commit) # can't commit anymore |
231 cnx.rollback() |
206 self.rollback() |
232 # test nothing has actually been inserted: |
207 # test nothing has actually been inserted: |
233 self.assertEqual(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe').rowcount, 0) |
208 self.assertFalse(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe')) |
234 cu.execute("INSERT Societe X: X nom 'chouette'") |
209 cu.execute("INSERT Societe X: X nom 'chouette'") |
235 cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") |
210 cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") |
236 cnx.commit() |
211 self.commit() |
237 cnx.close() |
|
238 |
212 |
239 def test_delete_relation_rql_permission(self): |
213 def test_delete_relation_rql_permission(self): |
240 self.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
214 self.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
241 self.commit() |
215 self.commit() |
242 cnx = self.login('iaminusersgrouponly') |
216 with self.login('iaminusersgrouponly') as cu: |
243 cu = cnx.cursor() |
217 # this won't actually do anything since the selection query won't return anything |
244 # this won't actually do anything since the selection query won't return anything |
218 cu.execute("DELETE A concerne S") |
245 cu.execute("DELETE A concerne S") |
219 self.commit() |
246 cnx.commit() |
|
247 # to actually get Unauthorized exception, try to delete a relation we can read |
220 # to actually get Unauthorized exception, try to delete a relation we can read |
248 self.restore_connection() |
|
249 eid = self.execute("INSERT Affaire X: X sujet 'pascool'")[0][0] |
221 eid = self.execute("INSERT Affaire X: X sujet 'pascool'")[0][0] |
250 self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', {'x': eid}) |
222 self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', {'x': eid}) |
251 self.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe") |
223 self.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe") |
252 self.commit() |
224 self.commit() |
253 cnx = self.login('iaminusersgrouponly') |
225 with self.login('iaminusersgrouponly') as cu: |
254 cu = cnx.cursor() |
226 self.assertRaises(Unauthorized, cu.execute, "DELETE A concerne S") |
255 self.assertRaises(Unauthorized, cu.execute, "DELETE A concerne S") |
227 self.assertRaises(QueryError, self.commit) # can't commit anymore |
256 self.assertRaises(QueryError, cnx.commit) # can't commit anymore |
228 self.rollback() # required after Unauthorized |
257 cnx.rollback() # required after Unauthorized |
229 cu.execute("INSERT Societe X: X nom 'chouette'") |
258 cu.execute("INSERT Societe X: X nom 'chouette'") |
230 cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") |
259 cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") |
231 self.commit() |
260 cnx.commit() |
232 cu.execute("DELETE A concerne S WHERE S nom 'chouette'") |
261 cu.execute("DELETE A concerne S WHERE S nom 'chouette'") |
233 self.commit() |
262 cnx.close() |
|
263 |
234 |
264 |
235 |
265 def test_user_can_change_its_upassword(self): |
236 def test_user_can_change_its_upassword(self): |
266 req = self.request() |
237 req = self.request() |
267 ueid = self.create_user(req, 'user').eid |
238 ueid = self.create_user(req, 'user').eid |
268 cnx = self.login('user') |
239 with self.login('user') as cu: |
269 cu = cnx.cursor() |
240 cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', |
270 cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', |
241 {'x': ueid, 'passwd': 'newpwd'}) |
271 {'x': ueid, 'passwd': 'newpwd'}) |
242 self.commit() |
272 cnx.commit() |
|
273 cnx.close() |
|
274 cnx = self.login('user', password='newpwd') |
243 cnx = self.login('user', password='newpwd') |
275 cnx.close() |
244 cnx.close() |
276 |
245 |
277 def test_user_cant_change_other_upassword(self): |
246 def test_user_cant_change_other_upassword(self): |
278 req = self.request() |
247 req = self.request() |
279 ueid = self.create_user(req, 'otheruser').eid |
248 ueid = self.create_user(req, 'otheruser').eid |
280 cnx = self.login('iaminusersgrouponly') |
249 with self.login('iaminusersgrouponly') as cu: |
281 cu = cnx.cursor() |
250 cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', |
282 cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', |
251 {'x': ueid, 'passwd': 'newpwd'}) |
283 {'x': ueid, 'passwd': 'newpwd'}) |
252 self.assertRaises(Unauthorized, self.commit) |
284 self.assertRaises(Unauthorized, cnx.commit) |
|
285 cnx.close() |
|
286 |
253 |
287 # read security test |
254 # read security test |
288 |
255 |
289 def test_read_base(self): |
256 def test_read_base(self): |
290 self.schema['Personne'].set_action_permissions('read', ('users', 'managers')) |
257 with self.temporary_permissions(Personne={'read': ('users', 'managers')}): |
291 cnx = self.login('anon') |
258 with self.login('anon') as cu: |
292 cu = cnx.cursor() |
259 self.assertRaises(Unauthorized, |
293 self.assertRaises(Unauthorized, |
260 cu.execute, 'Personne U where U nom "managers"') |
294 cu.execute, 'Personne U where U nom "managers"') |
261 self.rollback() |
295 cnx.close() |
|
296 |
262 |
297 def test_read_erqlexpr_base(self): |
263 def test_read_erqlexpr_base(self): |
298 eid = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
264 eid = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
299 self.commit() |
265 self.commit() |
300 cnx = self.login('iaminusersgrouponly') |
266 with self.login('iaminusersgrouponly') as cu: |
301 cu = cnx.cursor() |
267 rset = cu.execute('Affaire X') |
302 rset = cu.execute('Affaire X') |
268 self.assertEqual(rset.rows, []) |
303 self.assertEqual(rset.rows, []) |
269 self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) |
304 self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) |
270 # cache test |
305 # cache test |
271 self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) |
306 self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) |
272 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
307 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
273 soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
308 soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
274 cu.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
309 cu.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
275 self.commit() |
310 cnx.commit() |
276 rset = cu.execute('Any X WHERE X eid %(x)s', {'x': aff2}) |
311 rset = cu.execute('Any X WHERE X eid %(x)s', {'x': aff2}) |
277 self.assertEqual(rset.rows, [[aff2]]) |
312 self.assertEqual(rset.rows, [[aff2]]) |
278 # more cache test w/ NOT eid |
313 # more cache test w/ NOT eid |
279 rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid}) |
314 rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid}) |
280 self.assertEqual(rset.rows, [[aff2]]) |
315 self.assertEqual(rset.rows, [[aff2]]) |
281 rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2}) |
316 rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2}) |
282 self.assertEqual(rset.rows, []) |
317 self.assertEqual(rset.rows, []) |
283 # test can't update an attribute of an entity that can't be readen |
318 # test can't update an attribute of an entity that can't be readen |
284 self.assertRaises(Unauthorized, cu.execute, 'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid}) |
319 self.assertRaises(Unauthorized, cu.execute, 'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid}) |
285 self.rollback() |
320 cnx.close() |
|
321 |
286 |
322 |
287 |
323 def test_entity_created_in_transaction(self): |
288 def test_entity_created_in_transaction(self): |
324 affschema = self.schema['Affaire'] |
289 affschema = self.schema['Affaire'] |
325 origperms = affschema.permissions['read'] |
290 with self.temporary_permissions(Affaire={'read': affschema.permissions['add']}): |
326 affschema.set_action_permissions('read', affschema.permissions['add']) |
291 with self.login('iaminusersgrouponly') as cu: |
327 try: |
292 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
328 cnx = self.login('iaminusersgrouponly') |
293 # entity created in transaction are readable *by eid* |
329 cu = cnx.cursor() |
294 self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2})) |
330 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
295 # XXX would be nice if it worked |
331 # entity created in transaction are readable *by eid* |
296 rset = cu.execute("Affaire X WHERE X sujet 'cool'") |
332 self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2})) |
297 self.assertEqual(len(rset), 0) |
333 # XXX would be nice if it worked |
298 self.assertRaises(Unauthorized, self.commit) |
334 rset = cu.execute("Affaire X WHERE X sujet 'cool'") |
|
335 self.assertEqual(len(rset), 0) |
|
336 finally: |
|
337 affschema.set_action_permissions('read', origperms) |
|
338 cnx.close() |
|
339 |
299 |
340 def test_read_erqlexpr_has_text1(self): |
300 def test_read_erqlexpr_has_text1(self): |
341 aff1 = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
301 aff1 = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
342 card1 = self.execute("INSERT Card X: X title 'cool'")[0][0] |
302 card1 = self.execute("INSERT Card X: X title 'cool'")[0][0] |
343 self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', {'x': card1}) |
303 self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', {'x': card1}) |
344 self.commit() |
304 self.commit() |
345 cnx = self.login('iaminusersgrouponly') |
305 with self.login('iaminusersgrouponly') as cu: |
346 cu = cnx.cursor() |
306 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
347 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
307 soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
348 soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
308 cu.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1}) |
349 cu.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1}) |
309 self.commit() |
350 cnx.commit() |
310 self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x':aff1}) |
351 self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x':aff1}) |
311 self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2})) |
352 self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2})) |
312 self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':card1})) |
353 self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':card1})) |
313 rset = cu.execute("Any X WHERE X has_text 'cool'") |
354 rset = cu.execute("Any X WHERE X has_text 'cool'") |
314 self.assertEqual(sorted(eid for eid, in rset.rows), |
355 self.assertEqual(sorted(eid for eid, in rset.rows), |
315 [card1, aff2]) |
356 [card1, aff2]) |
316 self.rollback() |
357 cnx.close() |
|
358 |
317 |
359 def test_read_erqlexpr_has_text2(self): |
318 def test_read_erqlexpr_has_text2(self): |
360 self.execute("INSERT Personne X: X nom 'bidule'") |
319 self.execute("INSERT Personne X: X nom 'bidule'") |
361 self.execute("INSERT Societe X: X nom 'bidule'") |
320 self.execute("INSERT Societe X: X nom 'bidule'") |
362 self.commit() |
321 self.commit() |
363 self.schema['Personne'].set_action_permissions('read', ('managers',)) |
322 with self.temporary_permissions(Personne={'read': ('managers',)}): |
364 cnx = self.login('iaminusersgrouponly') |
323 with self.login('iaminusersgrouponly') as cu: |
365 cu = cnx.cursor() |
324 rset = cu.execute('Any N WHERE N has_text "bidule"') |
366 rset = cu.execute('Any N WHERE N has_text "bidule"') |
325 self.assertEqual(len(rset.rows), 1, rset.rows) |
367 self.assertEqual(len(rset.rows), 1, rset.rows) |
326 rset = cu.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")') |
368 rset = cu.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")') |
327 self.assertEqual(len(rset.rows), 1, rset.rows) |
369 self.assertEqual(len(rset.rows), 1, rset.rows) |
|
370 cnx.close() |
|
371 |
328 |
372 def test_read_erqlexpr_optional_rel(self): |
329 def test_read_erqlexpr_optional_rel(self): |
373 self.execute("INSERT Personne X: X nom 'bidule'") |
330 self.execute("INSERT Personne X: X nom 'bidule'") |
374 self.execute("INSERT Societe X: X nom 'bidule'") |
331 self.execute("INSERT Societe X: X nom 'bidule'") |
375 self.commit() |
332 self.commit() |
376 self.schema['Personne'].set_action_permissions('read', ('managers',)) |
333 with self.temporary_permissions(Personne={'read': ('managers',)}): |
377 cnx = self.login('anon') |
334 with self.login('anon') as cu: |
378 cu = cnx.cursor() |
335 rset = cu.execute('Any N,U WHERE N has_text "bidule", N owned_by U?') |
379 rset = cu.execute('Any N,U WHERE N has_text "bidule", N owned_by U?') |
336 self.assertEqual(len(rset.rows), 1, rset.rows) |
380 self.assertEqual(len(rset.rows), 1, rset.rows) |
|
381 cnx.close() |
|
382 |
337 |
383 def test_read_erqlexpr_aggregat(self): |
338 def test_read_erqlexpr_aggregat(self): |
384 self.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
339 self.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
385 self.commit() |
340 self.commit() |
386 cnx = self.login('iaminusersgrouponly') |
341 with self.login('iaminusersgrouponly') as cu: |
387 cu = cnx.cursor() |
342 rset = cu.execute('Any COUNT(X) WHERE X is Affaire') |
388 rset = cu.execute('Any COUNT(X) WHERE X is Affaire') |
343 self.assertEqual(rset.rows, [[0]]) |
389 self.assertEqual(rset.rows, [[0]]) |
344 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
390 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
345 soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
391 soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
346 cu.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
392 cu.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
347 self.commit() |
393 cnx.commit() |
348 rset = cu.execute('Any COUNT(X) WHERE X is Affaire') |
394 rset = cu.execute('Any COUNT(X) WHERE X is Affaire') |
349 self.assertEqual(rset.rows, [[1]]) |
395 self.assertEqual(rset.rows, [[1]]) |
350 rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN') |
396 rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN') |
351 values = dict(rset) |
397 values = dict(rset) |
352 self.assertEqual(values['Affaire'], 1) |
398 self.assertEqual(values['Affaire'], 1) |
353 self.assertEqual(values['Societe'], 2) |
399 self.assertEqual(values['Societe'], 2) |
354 rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN WITH X BEING ((Affaire X) UNION (Societe X))') |
400 rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN WITH X BEING ((Affaire X) UNION (Societe X))') |
355 self.assertEqual(len(rset), 2) |
401 self.assertEqual(len(rset), 2) |
356 values = dict(rset) |
402 values = dict(rset) |
357 self.assertEqual(values['Affaire'], 1) |
403 self.assertEqual(values['Affaire'], 1) |
358 self.assertEqual(values['Societe'], 2) |
404 self.assertEqual(values['Societe'], 2) |
|
405 cnx.close() |
|
406 |
359 |
407 |
360 |
408 def test_attribute_security(self): |
361 def test_attribute_security(self): |
409 # only managers should be able to edit the 'test' attribute of Personne entities |
362 # only managers should be able to edit the 'test' attribute of Personne entities |
410 eid = self.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE")[0][0] |
363 eid = self.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE")[0][0] |
411 self.commit() |
|
412 self.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) |
364 self.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) |
413 self.commit() |
365 self.commit() |
414 cnx = self.login('iaminusersgrouponly') |
366 with self.login('iaminusersgrouponly') as cu: |
415 cu = cnx.cursor() |
367 cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE") |
416 cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE") |
368 self.assertRaises(Unauthorized, self.commit) |
417 self.assertRaises(Unauthorized, cnx.commit) |
369 cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test FALSE") |
418 cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test FALSE") |
370 self.assertRaises(Unauthorized, self.commit) |
419 self.assertRaises(Unauthorized, cnx.commit) |
371 eid = cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org'")[0][0] |
420 eid = cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org'")[0][0] |
372 self.commit() |
421 cnx.commit() |
373 cu.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) |
422 cu.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) |
374 self.assertRaises(Unauthorized, self.commit) |
423 self.assertRaises(Unauthorized, cnx.commit) |
375 cu.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid}) |
424 cu.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid}) |
376 self.assertRaises(Unauthorized, self.commit) |
425 self.assertRaises(Unauthorized, cnx.commit) |
377 cu.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid}) |
426 cu.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid}) |
378 self.commit() |
427 cnx.commit() |
|
428 cnx.close() |
|
429 |
379 |
430 def test_attribute_security_rqlexpr(self): |
380 def test_attribute_security_rqlexpr(self): |
431 # Note.para attribute editable by managers or if the note is in "todo" state |
381 # Note.para attribute editable by managers or if the note is in "todo" state |
432 note = self.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0) |
382 note = self.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0) |
433 self.commit() |
383 self.commit() |
434 note.cw_adapt_to('IWorkflowable').fire_transition('markasdone') |
384 note.cw_adapt_to('IWorkflowable').fire_transition('markasdone') |
435 self.execute('SET X para "truc" WHERE X eid %(x)s', {'x': note.eid}) |
385 self.execute('SET X para "truc" WHERE X eid %(x)s', {'x': note.eid}) |
436 self.commit() |
386 self.commit() |
437 cnx = self.login('iaminusersgrouponly') |
387 with self.login('iaminusersgrouponly') as cu: |
438 cu = cnx.cursor() |
388 cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note.eid}) |
439 cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note.eid}) |
389 self.assertRaises(Unauthorized, self.commit) |
440 self.assertRaises(Unauthorized, cnx.commit) |
390 note2 = cu.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0) |
441 note2 = cu.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0) |
391 self.commit() |
442 cnx.commit() |
392 note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone') |
443 note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone') |
393 self.commit() |
444 cnx.commit() |
394 self.assertEqual(len(cu.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s', {'x': note2.eid})), |
445 self.assertEqual(len(cu.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s', {'x': note2.eid})), |
395 0) |
446 0) |
396 cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid}) |
447 cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid}) |
397 self.assertRaises(Unauthorized, self.commit) |
448 self.assertRaises(Unauthorized, cnx.commit) |
398 note2.cw_adapt_to('IWorkflowable').fire_transition('redoit') |
449 note2.cw_adapt_to('IWorkflowable').fire_transition('redoit') |
399 self.commit() |
450 cnx.commit() |
400 cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid}) |
451 cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid}) |
401 self.commit() |
452 cnx.commit() |
|
453 cnx.close() |
|
454 |
402 |
def test_attribute_read_security(self):
    # anon is not allowed to see users' logins, but can see the users
    # themselves: CWUser entities stay readable by guests while the `login`
    # attribute is readable by users and managers only.
    #
    # The permission overrides go through the temporary_permissions()
    # context manager and the anonymous session through login(), instead of
    # calling set_action_permissions() / cnx.close() by hand (the manual
    # form never put the schema permissions back and skipped close() as
    # soon as one assertion failed).
    login_rdef = self.repo.schema['CWUser'].rdef('login')
    with self.temporary_permissions((login_rdef, {'read': ('users', 'managers')}),
                                    CWUser={'read': ('guests', 'users', 'managers')}):
        with self.login('anon') as cu:
            rset = cu.execute('CWUser X')
            self.assertTrue(rset)
            # first row: attribute access straight from the result set
            x = rset.get_entity(0, 0)
            self.assertEqual(x.login, None)
            self.assertTrue(x.creation_date)
            # second row: same checks after explicitly completing the entity
            x = rset.get_entity(1, 0)
            x.complete()
            self.assertEqual(x.login, None)
            self.assertTrue(x.creation_date)
|
472 |
418 |
def test_yams_inheritance_and_security_bug(self):
    # With a read permission on Division restricted to managers and owners,
    # preprocessing a query on `is_instance_of Societe` for a plain user
    # must split it into a UNION: the unrestricted types on one side, and
    # Division guarded by an EXISTS(X owned_by ...) clause on the other.
    #
    # Both the permission override and the user session are context
    # managers here, replacing the hand-written try/finally that swapped
    # the whole `permissions` dict and a login() call that was never paired
    # with a restore.
    with self.temporary_permissions(Division={'read': ('managers', ERQLExpression('X owned_by U'))}):
        with self.login('iaminusersgrouponly'):
            querier = self.repo.querier
            rqlst = querier.parse('Any X WHERE X is_instance_of Societe')
            querier.solutions(self.session, rqlst, {})
            querier._annotate(rqlst)
            plan = querier.plan_factory(rqlst, {}, self.session)
            plan.preprocess(rqlst)
            self.assertEqual(
                rqlst.as_string(),
                '(Any X WHERE X is IN(SubDivision, Societe)) UNION (Any X WHERE X is Division, EXISTS(X owned_by %(B)s))')
|
493 |
431 |
494 |
432 |
495 class BaseSchemaSecurityTC(BaseSecurityTC): |
433 class BaseSchemaSecurityTC(BaseSecurityTC): |
496 """tests related to the base schema permission configuration""" |
434 """tests related to the base schema permission configuration""" |
497 |
435 |
def test_user_can_delete_object_he_created(self):
    # A user must be able to delete an object he created, even if some
    # other user has changed the object's state in the meantime.
    #
    # Three phases: the plain user creates the Affaire, the default
    # connection fires a workflow transition on it, then the plain user
    # logs in again and deletes it. Each user session is a `with` block so
    # it is left even when an assertion fails.
    with self.login('iaminusersgrouponly') as cu:
        # due to security test, affaire has to concerne a societe the user owns
        cu.execute('INSERT Societe X: X nom "ARCTIA"')
        cu.execute('INSERT Affaire X: X ref "ARCT01", X concerne S WHERE S nom "ARCTIA"')
        self.commit()
    # back on the default connection: change the state of the affaire
    affaire = self.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
    affaire.cw_adapt_to('IWorkflowable').fire_transition('abort')
    self.commit()
    self.assertEqual(len(self.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01"')),
                     1)
    self.assertEqual(len(self.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01",'
                                      'X owned_by U, U login "admin"')),
                     1)  # TrInfo at the above state change
    with self.login('iaminusersgrouponly') as cu:
        cu.execute('DELETE Affaire X WHERE X ref "ARCT01"')
        self.commit()
        self.assertFalse(cu.execute('Affaire X'))
|
521 |
455 |
def test_users_and_groups_non_readable_by_guests(self):
    # The anonymous user may read its own CWUser entity and the groups, but
    # neither any other user nor may it modify itself. Runs inside a
    # login() context manager so the anonymous session is left even when an
    # assertion fails.
    with self.login('anon') as cu:
        anon = cu.connection.user(self.session)
        # anonymous user can only read itself
        rset = cu.execute('Any L WHERE X owned_by U, U login L')
        self.assertEqual(rset.rows, [['anon']])
        rset = cu.execute('CWUser X')
        self.assertEqual(rset.rows, [[anon.eid]])
        # anonymous user can read groups (necessary to check allowed
        # transitions for instance); assertTrue replaces the deprecated
        # assert_ alias, matching the rest of the file
        self.assertTrue(cu.execute('CWGroup X'))
        # should only be able to read the anonymous user, not another one
        origuser = self.adminsession.user
        self.assertRaises(Unauthorized,
                          cu.execute, 'CWUser X WHERE X eid %(x)s', {'x': origuser.eid})
        # nothing selected, nothing updated, no exception raised
        #self.assertRaises(Unauthorized,
        #                  cu.execute, 'SET X login "toto" WHERE X eid %(x)s',
        #                  {'x': self.user.eid})

        rset = cu.execute('CWUser X WHERE X eid %(x)s', {'x': anon.eid})
        self.assertEqual(rset.rows, [[anon.eid]])
        # but can't modify it: the update is only rejected at commit time
        cu.execute('SET X login "toto" WHERE X eid %(x)s', {'x': anon.eid})
        self.assertRaises(Unauthorized, self.commit)
|
548 |
480 |
def test_in_group_relation(self):
    # A plain user must not be able to add or remove `in_group` relations
    # on the admin user: both queries are expected to raise Unauthorized
    # as soon as they are executed.
    with self.login('iaminusersgrouponly') as cu:
        rql = u"DELETE U in_group G WHERE U login 'admin'"
        self.assertRaises(Unauthorized, cu.execute, rql)
        rql = u"SET U in_group G WHERE U login 'admin', G name 'users'"
        self.assertRaises(Unauthorized, cu.execute, rql)
        # NOTE(review): nesting of this rollback was inferred (the source
        # had lost its indentation) -- confirm it belongs inside the block.
        self.rollback()
|
557 |
488 |
def test_owned_by(self):
    # A plain user must not be able to take ownership of an entity created
    # by someone else: setting `owned_by` is expected to raise Unauthorized
    # as soon as the query is executed.
    self.execute("INSERT Personne X: X nom 'bidule'")
    self.commit()
    with self.login('iaminusersgrouponly') as cu:
        rql = u"SET X owned_by U WHERE U login 'iaminusersgrouponly', X is Personne"
        self.assertRaises(Unauthorized, cu.execute, rql)
        # NOTE(review): nesting of this rollback was inferred (the source
        # had lost its indentation) -- confirm it belongs inside the block.
        self.rollback()
|
566 |
496 |
def test_bookmarked_by_guests_security(self):
    # The anonymous user can read its own bookmarks as well as bookmarks
    # that nobody bookmarked, but cannot add or remove `bookmarked_by`
    # relations.
    beid1 = self.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0]
    # second bookmark, attached to anon; its eid is not needed afterwards
    self.execute('INSERT Bookmark B: B path "?vid=index", B title "index", B bookmarked_by U WHERE U login "anon"')
    self.commit()
    with self.login('anon') as cu:
        anoneid = self.session.user.eid
        # same query twice on purpose: once with the eid inlined in the RQL
        # string, once passed as a query argument
        self.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
                                    'B bookmarked_by U, U eid %s' % anoneid).rows,
                         [['index', '?vid=index']])
        self.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
                                    'B bookmarked_by U, U eid %(x)s', {'x': anoneid}).rows,
                         [['index', '?vid=index']])
        # can read others bookmarks as well
        self.assertEqual(cu.execute('Any B where B is Bookmark, NOT B bookmarked_by U').rows,
                         [[beid1]])
        self.assertRaises(Unauthorized, cu.execute, 'DELETE B bookmarked_by U')
        self.assertRaises(Unauthorized,
                          cu.execute, 'SET B bookmarked_by U WHERE U eid %(x)s, B eid %(b)s',
                          {'x': anoneid, 'b': beid1})
        # NOTE(review): nesting of this rollback was inferred (the source
        # had lost its indentation) -- confirm it belongs inside the block.
        self.rollback()
|
588 |
|
589 |
517 |
def test_ambigous_ordered(self):
    # `X name N` is ambiguous (several entity types have a `name`
    # attribute); check that ORDERBY lower(N) still yields rows in
    # case-insensitive order when run as the anonymous user.
    with self.login('anon') as cu:
        names = [t for t, in cu.execute('Any N ORDERBY lower(N) WHERE X name N')]
        self.assertEqual(names, sorted(names, key=lambda name: name.lower()))
|
596 |
522 |
597 def test_restrict_is_instance_ok(self): |
523 def test_restrict_is_instance_ok(self): |
598 from rql import RQLException |
|
599 rset = self.execute('Any X WHERE X is_instance_of BaseTransition') |
524 rset = self.execute('Any X WHERE X is_instance_of BaseTransition') |
600 rqlst = rset.syntax_tree() |
525 rqlst = rset.syntax_tree() |
601 select = rqlst.children[0] |
526 select = rqlst.children[0] |
602 x = select.get_selected_variables().next() |
527 x = select.get_selected_variables().next() |
603 self.assertRaises(RQLException, select.add_type_restriction, |
528 self.assertRaises(RQLException, select.add_type_restriction, |