90 def tearDown(self): |
87 def tearDown(self): |
91 self.repo.system_source.__dict__.pop('syntax_tree_search', None) |
88 self.repo.system_source.__dict__.pop('syntax_tree_search', None) |
92 super(SecurityRewritingTC, self).tearDown() |
89 super(SecurityRewritingTC, self).tearDown() |
93 |
90 |
94 def test_not_relation_read_security(self): |
91 def test_not_relation_read_security(self): |
95 with self.login('iaminusersgrouponly'): |
92 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
96 self.hijack_source_execute() |
93 self.hijack_source_execute() |
97 self.execute('Any U WHERE NOT A todo_by U, A is Affaire') |
94 cnx.execute('Any U WHERE NOT A todo_by U, A is Affaire') |
98 self.assertEqual(self.query[0][1].as_string(), |
95 self.assertEqual(self.query[0][1].as_string(), |
99 'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') |
96 'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') |
100 self.execute('Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') |
97 cnx.execute('Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') |
101 self.assertEqual(self.query[0][1].as_string(), |
98 self.assertEqual(self.query[0][1].as_string(), |
102 'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') |
99 'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') |
103 |
100 |
104 class SecurityTC(BaseSecurityTC): |
101 class SecurityTC(BaseSecurityTC): |
105 |
102 |
106 def setUp(self): |
103 def setUp(self): |
107 BaseSecurityTC.setUp(self) |
104 super(SecurityTC, self).setUp() |
108 # implicitly test manager can add some entities |
105 # implicitly test manager can add some entities |
109 self.execute("INSERT Affaire X: X sujet 'cool'") |
106 with self.admin_access.repo_cnx() as cnx: |
110 self.execute("INSERT Societe X: X nom 'logilab'") |
107 cnx.execute("INSERT Affaire X: X sujet 'cool'") |
111 self.execute("INSERT Personne X: X nom 'bidule'") |
108 cnx.execute("INSERT Societe X: X nom 'logilab'") |
112 self.execute('INSERT CWGroup X: X name "staff"') |
109 cnx.execute("INSERT Personne X: X nom 'bidule'") |
113 self.commit() |
110 cnx.execute('INSERT CWGroup X: X name "staff"') |
|
111 cnx.commit() |
114 |
112 |
115 def test_insert_security(self): |
113 def test_insert_security(self): |
116 with self.login('anon') as cu: |
114 with self.new_access('anon').repo_cnx() as cnx: |
117 cu.execute("INSERT Personne X: X nom 'bidule'") |
115 cnx.execute("INSERT Personne X: X nom 'bidule'") |
118 self.assertRaises(Unauthorized, self.commit) |
116 self.assertRaises(Unauthorized, cnx.commit) |
119 self.assertEqual(cu.execute('Personne X').rowcount, 1) |
117 self.assertEqual(cnx.execute('Personne X').rowcount, 1) |
120 |
118 |
121 def test_insert_rql_permission(self): |
119 def test_insert_rql_permission(self): |
122 # test user can only add une affaire related to a societe he owns |
120 # test user can only add une affaire related to a societe he owns |
123 with self.login('iaminusersgrouponly') as cu: |
121 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
124 cu.execute("INSERT Affaire X: X sujet 'cool'") |
122 cnx.execute("INSERT Affaire X: X sujet 'cool'") |
125 self.assertRaises(Unauthorized, self.commit) |
123 self.assertRaises(Unauthorized, cnx.commit) |
126 # test nothing has actually been inserted |
124 # test nothing has actually been inserted |
127 self.assertEqual(self.execute('Affaire X').rowcount, 1) |
125 with self.admin_access.repo_cnx() as cnx: |
128 with self.login('iaminusersgrouponly') as cu: |
126 self.assertEqual(cnx.execute('Affaire X').rowcount, 1) |
129 cu.execute("INSERT Affaire X: X sujet 'cool'") |
127 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
130 cu.execute("INSERT Societe X: X nom 'chouette'") |
128 cnx.execute("INSERT Affaire X: X sujet 'cool'") |
131 cu.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'") |
129 cnx.execute("INSERT Societe X: X nom 'chouette'") |
132 self.commit() |
130 cnx.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'") |
|
131 cnx.commit() |
133 |
132 |
134 def test_update_security_1(self): |
133 def test_update_security_1(self): |
135 with self.login('anon') as cu: |
134 with self.new_access('anon').repo_cnx() as cnx: |
136 # local security check |
135 # local security check |
137 cu.execute( "SET X nom 'bidulechouette' WHERE X is Personne") |
136 cnx.execute( "SET X nom 'bidulechouette' WHERE X is Personne") |
138 self.assertRaises(Unauthorized, self.commit) |
137 self.assertRaises(Unauthorized, cnx.commit) |
139 self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) |
138 with self.admin_access.repo_cnx() as cnx: |
|
139 self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) |
140 |
140 |
141 def test_update_security_2(self): |
141 def test_update_security_2(self): |
142 with self.temporary_permissions(Personne={'read': ('users', 'managers'), |
142 with self.temporary_permissions(Personne={'read': ('users', 'managers'), |
143 'add': ('guests', 'users', 'managers')}): |
143 'add': ('guests', 'users', 'managers')}): |
144 with self.login('anon') as cu: |
144 with self.new_access('anon').repo_cnx() as cnx: |
145 self.assertRaises(Unauthorized, cu.execute, |
145 self.assertRaises(Unauthorized, cnx.execute, |
146 "SET X nom 'bidulechouette' WHERE X is Personne") |
146 "SET X nom 'bidulechouette' WHERE X is Personne") |
147 self.rollback() |
|
148 # self.assertRaises(Unauthorized, cnx.commit) |
|
149 # test nothing has actually been inserted |
147 # test nothing has actually been inserted |
150 self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) |
148 with self.admin_access.repo_cnx() as cnx: |
|
149 self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) |
151 |
150 |
152 def test_update_security_3(self): |
151 def test_update_security_3(self): |
153 with self.login('iaminusersgrouponly') as cu: |
152 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
154 cu.execute("INSERT Personne X: X nom 'biduuule'") |
153 cnx.execute("INSERT Personne X: X nom 'biduuule'") |
155 cu.execute("INSERT Societe X: X nom 'looogilab'") |
154 cnx.execute("INSERT Societe X: X nom 'looogilab'") |
156 cu.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'") |
155 cnx.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'") |
157 |
156 |
158 def test_update_rql_permission(self): |
157 def test_update_rql_permission(self): |
159 self.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
158 with self.admin_access.repo_cnx() as cnx: |
160 self.commit() |
159 cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
|
160 cnx.commit() |
161 # test user can only update une affaire related to a societe he owns |
161 # test user can only update une affaire related to a societe he owns |
162 with self.login('iaminusersgrouponly') as cu: |
162 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
163 cu.execute("SET X sujet 'pascool' WHERE X is Affaire") |
163 cnx.execute("SET X sujet 'pascool' WHERE X is Affaire") |
164 # this won't actually do anything since the selection query won't return anything |
164 # this won't actually do anything since the selection query won't return anything |
165 self.commit() |
165 cnx.commit() |
166 # to actually get Unauthorized exception, try to update an entity we can read |
166 # to actually get Unauthorized exception, try to update an entity we can read |
167 cu.execute("SET X nom 'toto' WHERE X is Societe") |
167 cnx.execute("SET X nom 'toto' WHERE X is Societe") |
168 self.assertRaises(Unauthorized, self.commit) |
168 self.assertRaises(Unauthorized, cnx.commit) |
169 cu.execute("INSERT Affaire X: X sujet 'pascool'") |
169 cnx.execute("INSERT Affaire X: X sujet 'pascool'") |
170 cu.execute("INSERT Societe X: X nom 'chouette'") |
170 cnx.execute("INSERT Societe X: X nom 'chouette'") |
171 cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") |
171 cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") |
172 cu.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'") |
172 cnx.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'") |
173 self.commit() |
173 cnx.commit() |
174 |
174 |
175 def test_delete_security(self): |
175 def test_delete_security(self): |
176 # FIXME: sample below fails because we don't detect "owner" can't delete |
176 # FIXME: sample below fails because we don't detect "owner" can't delete |
177 # user anyway, and since no user with login == 'bidule' exists, no |
177 # user anyway, and since no user with login == 'bidule' exists, no |
178 # exception is raised |
178 # exception is raised |
179 #user._groups = {'guests':1} |
179 #user._groups = {'guests':1} |
180 #self.assertRaises(Unauthorized, |
180 #self.assertRaises(Unauthorized, |
181 # self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'") |
181 # self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'") |
182 # check local security |
182 # check local security |
183 with self.login('iaminusersgrouponly') as cu: |
183 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
184 self.assertRaises(Unauthorized, cu.execute, "DELETE CWGroup Y WHERE Y name 'staff'") |
184 self.assertRaises(Unauthorized, cnx.execute, "DELETE CWGroup Y WHERE Y name 'staff'") |
185 self.rollback() |
|
186 |
185 |
187 def test_delete_rql_permission(self): |
186 def test_delete_rql_permission(self): |
188 self.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
187 with self.admin_access.repo_cnx() as cnx: |
189 self.commit() |
188 cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
|
189 cnx.commit() |
190 # test user can only dele une affaire related to a societe he owns |
190 # test user can only dele une affaire related to a societe he owns |
191 with self.login('iaminusersgrouponly') as cu: |
191 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
192 # this won't actually do anything since the selection query won't return anything |
192 # this won't actually do anything since the selection query won't return anything |
193 cu.execute("DELETE Affaire X") |
193 cnx.execute("DELETE Affaire X") |
194 self.commit() |
194 cnx.commit() |
195 # to actually get Unauthorized exception, try to delete an entity we can read |
195 # to actually get Unauthorized exception, try to delete an entity we can read |
196 self.assertRaises(Unauthorized, cu.execute, "DELETE Societe S") |
196 self.assertRaises(Unauthorized, cnx.execute, "DELETE Societe S") |
197 self.assertRaises(QueryError, self.commit) # can't commit anymore |
197 self.assertRaises(QueryError, cnx.commit) # can't commit anymore |
198 self.rollback() # required after Unauthorized |
198 cnx.rollback() |
199 cu.execute("INSERT Affaire X: X sujet 'pascool'") |
199 cnx.execute("INSERT Affaire X: X sujet 'pascool'") |
200 cu.execute("INSERT Societe X: X nom 'chouette'") |
200 cnx.execute("INSERT Societe X: X nom 'chouette'") |
201 cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") |
201 cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") |
202 self.commit() |
202 cnx.commit() |
203 ## # this one should fail since it will try to delete two affaires, one authorized |
203 ## # this one should fail since it will try to delete two affaires, one authorized |
204 ## # and the other not |
204 ## # and the other not |
205 ## self.assertRaises(Unauthorized, cu.execute, "DELETE Affaire X") |
205 ## self.assertRaises(Unauthorized, cnx.execute, "DELETE Affaire X") |
206 cu.execute("DELETE Affaire X WHERE X sujet 'pascool'") |
206 cnx.execute("DELETE Affaire X WHERE X sujet 'pascool'") |
207 self.commit() |
207 cnx.commit() |
208 |
|
209 |
208 |
210 def test_insert_relation_rql_permission(self): |
209 def test_insert_relation_rql_permission(self): |
211 with self.login('iaminusersgrouponly') as cu: |
210 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
212 cu.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
211 cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
213 # should raise Unauthorized since user don't own S though this won't |
212 # should raise Unauthorized since user don't own S though this won't |
214 # actually do anything since the selection query won't return |
213 # actually do anything since the selection query won't return |
215 # anything |
214 # anything |
216 self.commit() |
215 cnx.commit() |
217 # to actually get Unauthorized exception, try to insert a relation |
216 # to actually get Unauthorized exception, try to insert a relation |
218 # were we can read both entities |
217 # were we can read both entities |
219 rset = cu.execute('Personne P') |
218 rset = cnx.execute('Personne P') |
220 self.assertEqual(len(rset), 1) |
219 self.assertEqual(len(rset), 1) |
221 ent = rset.get_entity(0, 0) |
220 ent = rset.get_entity(0, 0) |
222 self.assertFalse(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe')) |
221 self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe')) |
223 self.assertRaises(Unauthorized, ent.cw_check_perm, 'update') |
222 self.assertRaises(Unauthorized, ent.cw_check_perm, 'update') |
224 self.assertRaises(Unauthorized, |
223 self.assertRaises(Unauthorized, |
225 cu.execute, "SET P travaille S WHERE P is Personne, S is Societe") |
224 cnx.execute, "SET P travaille S WHERE P is Personne, S is Societe") |
226 self.assertRaises(QueryError, self.commit) # can't commit anymore |
225 self.assertRaises(QueryError, cnx.commit) # can't commit anymore |
227 self.rollback() |
226 cnx.rollback() |
228 # test nothing has actually been inserted: |
227 # test nothing has actually been inserted: |
229 self.assertFalse(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe')) |
228 self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe')) |
230 cu.execute("INSERT Societe X: X nom 'chouette'") |
229 cnx.execute("INSERT Societe X: X nom 'chouette'") |
231 cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") |
230 cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") |
232 self.commit() |
231 cnx.commit() |
233 |
232 |
234 def test_delete_relation_rql_permission(self): |
233 def test_delete_relation_rql_permission(self): |
235 self.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
234 with self.admin_access.repo_cnx() as cnx: |
236 self.commit() |
235 cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
237 with self.login('iaminusersgrouponly') as cu: |
236 cnx.commit() |
|
237 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
238 # this won't actually do anything since the selection query won't return anything |
238 # this won't actually do anything since the selection query won't return anything |
239 cu.execute("DELETE A concerne S") |
239 cnx.execute("DELETE A concerne S") |
240 self.commit() |
240 cnx.commit() |
241 # to actually get Unauthorized exception, try to delete a relation we can read |
241 with self.admin_access.repo_cnx() as cnx: |
242 eid = self.execute("INSERT Affaire X: X sujet 'pascool'")[0][0] |
242 # to actually get Unauthorized exception, try to delete a relation we can read |
243 self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', |
243 eid = cnx.execute("INSERT Affaire X: X sujet 'pascool'")[0][0] |
244 {'x': eid}) |
244 cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', |
245 self.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe") |
245 {'x': eid}) |
246 self.commit() |
246 cnx.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe") |
247 with self.login('iaminusersgrouponly') as cu: |
247 cnx.commit() |
248 self.assertRaises(Unauthorized, cu.execute, "DELETE A concerne S") |
248 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
249 self.assertRaises(QueryError, self.commit) # can't commit anymore |
249 self.assertRaises(Unauthorized, cnx.execute, "DELETE A concerne S") |
250 self.rollback() # required after Unauthorized |
250 self.assertRaises(QueryError, cnx.commit) # can't commit anymore |
251 cu.execute("INSERT Societe X: X nom 'chouette'") |
251 cnx.rollback() |
252 cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") |
252 cnx.execute("INSERT Societe X: X nom 'chouette'") |
253 self.commit() |
253 cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") |
254 cu.execute("DELETE A concerne S WHERE S nom 'chouette'") |
254 cnx.commit() |
255 self.commit() |
255 cnx.execute("DELETE A concerne S WHERE S nom 'chouette'") |
|
256 cnx.commit() |
256 |
257 |
257 |
258 |
258 def test_user_can_change_its_upassword(self): |
259 def test_user_can_change_its_upassword(self): |
259 req = self.request() |
260 with self.admin_access.repo_cnx() as cnx: |
260 ueid = self.create_user(req, 'user').eid |
261 ueid = self.create_user(cnx, 'user').eid |
261 with self.login('user') as cu: |
262 with self.new_access('user').repo_cnx() as cnx: |
262 cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', |
263 cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', |
263 {'x': ueid, 'passwd': 'newpwd'}) |
264 {'x': ueid, 'passwd': 'newpwd'}) |
264 self.commit() |
265 cnx.commit() |
265 cnx = self.login('user', password='newpwd') |
266 self.repo.close(self.repo.connect('user', password='newpwd')) |
266 cnx.close() |
|
267 |
267 |
268 def test_user_cant_change_other_upassword(self): |
268 def test_user_cant_change_other_upassword(self): |
269 req = self.request() |
269 with self.admin_access.repo_cnx() as cnx: |
270 ueid = self.create_user(req, 'otheruser').eid |
270 ueid = self.create_user(cnx, 'otheruser').eid |
271 with self.login('iaminusersgrouponly') as cu: |
271 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
272 cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', |
272 cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', |
273 {'x': ueid, 'passwd': 'newpwd'}) |
273 {'x': ueid, 'passwd': 'newpwd'}) |
274 self.assertRaises(Unauthorized, self.commit) |
274 self.assertRaises(Unauthorized, cnx.commit) |
275 |
275 |
276 # read security test |
276 # read security test |
277 |
277 |
278 def test_read_base(self): |
278 def test_read_base(self): |
279 with self.temporary_permissions(Personne={'read': ('users', 'managers')}): |
279 with self.temporary_permissions(Personne={'read': ('users', 'managers')}): |
280 with self.login('anon') as cu: |
280 with self.new_access('anon').repo_cnx() as cnx: |
281 self.assertRaises(Unauthorized, |
281 self.assertRaises(Unauthorized, |
282 cu.execute, 'Personne U where U nom "managers"') |
282 cnx.execute, 'Personne U where U nom "managers"') |
283 self.rollback() |
|
284 |
283 |
285 def test_read_erqlexpr_base(self): |
284 def test_read_erqlexpr_base(self): |
286 eid = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
285 with self.admin_access.repo_cnx() as cnx: |
287 self.commit() |
286 eid = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
288 with self.login('iaminusersgrouponly') as cu: |
287 cnx.commit() |
289 rset = cu.execute('Affaire X') |
288 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
|
289 rset = cnx.execute('Affaire X') |
290 self.assertEqual(rset.rows, []) |
290 self.assertEqual(rset.rows, []) |
291 self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) |
291 self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) |
292 # cache test |
292 # cache test |
293 self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) |
293 self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) |
294 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
294 aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
295 soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
295 soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
296 cu.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
296 cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
297 self.commit() |
297 cnx.commit() |
298 rset = cu.execute('Any X WHERE X eid %(x)s', {'x': aff2}) |
298 rset = cnx.execute('Any X WHERE X eid %(x)s', {'x': aff2}) |
299 self.assertEqual(rset.rows, [[aff2]]) |
299 self.assertEqual(rset.rows, [[aff2]]) |
300 # more cache test w/ NOT eid |
300 # more cache test w/ NOT eid |
301 rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid}) |
301 rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid}) |
302 self.assertEqual(rset.rows, [[aff2]]) |
302 self.assertEqual(rset.rows, [[aff2]]) |
303 rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2}) |
303 rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2}) |
304 self.assertEqual(rset.rows, []) |
304 self.assertEqual(rset.rows, []) |
305 # test can't update an attribute of an entity that can't be readen |
305 # test can't update an attribute of an entity that can't be readen |
306 self.assertRaises(Unauthorized, cu.execute, |
306 self.assertRaises(Unauthorized, cnx.execute, |
307 'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid}) |
307 'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid}) |
308 self.rollback() |
|
309 |
308 |
310 |
309 |
311 def test_entity_created_in_transaction(self): |
310 def test_entity_created_in_transaction(self): |
312 affschema = self.schema['Affaire'] |
311 affschema = self.schema['Affaire'] |
313 with self.temporary_permissions(Affaire={'read': affschema.permissions['add']}): |
312 with self.temporary_permissions(Affaire={'read': affschema.permissions['add']}): |
314 with self.login('iaminusersgrouponly') as cu: |
313 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
315 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
314 aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
316 # entity created in transaction are readable *by eid* |
315 # entity created in transaction are readable *by eid* |
317 self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2})) |
316 self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':aff2})) |
318 # XXX would be nice if it worked |
317 # XXX would be nice if it worked |
319 rset = cu.execute("Affaire X WHERE X sujet 'cool'") |
318 rset = cnx.execute("Affaire X WHERE X sujet 'cool'") |
320 self.assertEqual(len(rset), 0) |
319 self.assertEqual(len(rset), 0) |
321 self.assertRaises(Unauthorized, self.commit) |
320 self.assertRaises(Unauthorized, cnx.commit) |
322 |
321 |
323 def test_read_erqlexpr_has_text1(self): |
322 def test_read_erqlexpr_has_text1(self): |
324 aff1 = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
323 with self.admin_access.repo_cnx() as cnx: |
325 card1 = self.execute("INSERT Card X: X title 'cool'")[0][0] |
324 aff1 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
326 self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', |
325 card1 = cnx.execute("INSERT Card X: X title 'cool'")[0][0] |
327 {'x': card1}) |
326 cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', |
328 self.commit() |
327 {'x': card1}) |
329 with self.login('iaminusersgrouponly') as cu: |
328 cnx.commit() |
330 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
329 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
331 soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
330 aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
332 cu.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1}) |
331 soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
333 self.commit() |
332 cnx.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1}) |
334 self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x':aff1}) |
333 cnx.commit() |
335 self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2})) |
334 self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x':aff1}) |
336 self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':card1})) |
335 self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':aff2})) |
337 rset = cu.execute("Any X WHERE X has_text 'cool'") |
336 self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':card1})) |
|
337 rset = cnx.execute("Any X WHERE X has_text 'cool'") |
338 self.assertEqual(sorted(eid for eid, in rset.rows), |
338 self.assertEqual(sorted(eid for eid, in rset.rows), |
339 [card1, aff2]) |
339 [card1, aff2]) |
340 self.rollback() |
|
341 |
340 |
342 def test_read_erqlexpr_has_text2(self): |
341 def test_read_erqlexpr_has_text2(self): |
343 self.execute("INSERT Personne X: X nom 'bidule'") |
342 with self.admin_access.repo_cnx() as cnx: |
344 self.execute("INSERT Societe X: X nom 'bidule'") |
343 cnx.execute("INSERT Personne X: X nom 'bidule'") |
345 self.commit() |
344 cnx.execute("INSERT Societe X: X nom 'bidule'") |
|
345 cnx.commit() |
346 with self.temporary_permissions(Personne={'read': ('managers',)}): |
346 with self.temporary_permissions(Personne={'read': ('managers',)}): |
347 with self.login('iaminusersgrouponly') as cu: |
347 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
348 rset = cu.execute('Any N WHERE N has_text "bidule"') |
348 rset = cnx.execute('Any N WHERE N has_text "bidule"') |
349 self.assertEqual(len(rset.rows), 1, rset.rows) |
349 self.assertEqual(len(rset.rows), 1, rset.rows) |
350 rset = cu.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")') |
350 rset = cnx.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")') |
351 self.assertEqual(len(rset.rows), 1, rset.rows) |
351 self.assertEqual(len(rset.rows), 1, rset.rows) |
352 |
352 |
353 def test_read_erqlexpr_optional_rel(self): |
353 def test_read_erqlexpr_optional_rel(self): |
354 self.execute("INSERT Personne X: X nom 'bidule'") |
354 with self.admin_access.repo_cnx() as cnx: |
355 self.execute("INSERT Societe X: X nom 'bidule'") |
355 cnx.execute("INSERT Personne X: X nom 'bidule'") |
356 self.commit() |
356 cnx.execute("INSERT Societe X: X nom 'bidule'") |
|
357 cnx.commit() |
357 with self.temporary_permissions(Personne={'read': ('managers',)}): |
358 with self.temporary_permissions(Personne={'read': ('managers',)}): |
358 with self.login('anon') as cu: |
359 with self.new_access('anon').repo_cnx() as cnx: |
359 rset = cu.execute('Any N,U WHERE N has_text "bidule", N owned_by U?') |
360 rset = cnx.execute('Any N,U WHERE N has_text "bidule", N owned_by U?') |
360 self.assertEqual(len(rset.rows), 1, rset.rows) |
361 self.assertEqual(len(rset.rows), 1, rset.rows) |
361 |
362 |
362 def test_read_erqlexpr_aggregat(self): |
363 def test_read_erqlexpr_aggregat(self): |
363 self.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
364 with self.admin_access.repo_cnx() as cnx: |
364 self.commit() |
365 cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
365 with self.login('iaminusersgrouponly') as cu: |
366 cnx.commit() |
366 rset = cu.execute('Any COUNT(X) WHERE X is Affaire') |
367 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
|
368 rset = cnx.execute('Any COUNT(X) WHERE X is Affaire') |
367 self.assertEqual(rset.rows, [[0]]) |
369 self.assertEqual(rset.rows, [[0]]) |
368 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
370 aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
369 soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
371 soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
370 cu.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
372 cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
371 self.commit() |
373 cnx.commit() |
372 rset = cu.execute('Any COUNT(X) WHERE X is Affaire') |
374 rset = cnx.execute('Any COUNT(X) WHERE X is Affaire') |
373 self.assertEqual(rset.rows, [[1]]) |
375 self.assertEqual(rset.rows, [[1]]) |
374 rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN') |
376 rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN') |
375 values = dict(rset) |
377 values = dict(rset) |
376 self.assertEqual(values['Affaire'], 1) |
378 self.assertEqual(values['Affaire'], 1) |
377 self.assertEqual(values['Societe'], 2) |
379 self.assertEqual(values['Societe'], 2) |
378 rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN ' |
380 rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN ' |
379 'WITH X BEING ((Affaire X) UNION (Societe X))') |
381 'WITH X BEING ((Affaire X) UNION (Societe X))') |
380 self.assertEqual(len(rset), 2) |
382 self.assertEqual(len(rset), 2) |
381 values = dict(rset) |
383 values = dict(rset) |
382 self.assertEqual(values['Affaire'], 1) |
384 self.assertEqual(values['Affaire'], 1) |
383 self.assertEqual(values['Societe'], 2) |
385 self.assertEqual(values['Societe'], 2) |
384 |
386 |
385 |
387 |
386 def test_attribute_security(self): |
388 def test_attribute_security(self): |
387 # only managers should be able to edit the 'test' attribute of Personne entities |
389 with self.admin_access.repo_cnx() as cnx: |
388 eid = self.execute("INSERT Personne X: X nom 'bidule', " |
390 # only managers should be able to edit the 'test' attribute of Personne entities |
389 "X web 'http://www.debian.org', X test TRUE")[0][0] |
391 eid = cnx.execute("INSERT Personne X: X nom 'bidule', " |
390 self.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) |
392 "X web 'http://www.debian.org', X test TRUE")[0][0] |
391 self.commit() |
393 cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) |
392 with self.login('iaminusersgrouponly') as cu: |
394 cnx.commit() |
393 cu.execute("INSERT Personne X: X nom 'bidule', " |
395 with self.new_access('iaminusersgrouponly').repo_cnx() as cnx: |
|
396 cnx.execute("INSERT Personne X: X nom 'bidule', " |
394 "X web 'http://www.debian.org', X test TRUE") |
397 "X web 'http://www.debian.org', X test TRUE") |
395 self.assertRaises(Unauthorized, self.commit) |
398 self.assertRaises(Unauthorized, cnx.commit) |
396 cu.execute("INSERT Personne X: X nom 'bidule', " |
399 cnx.execute("INSERT Personne X: X nom 'bidule', " |
397 "X web 'http://www.debian.org', X test FALSE") |
400 "X web 'http://www.debian.org', X test FALSE") |
398 self.assertRaises(Unauthorized, self.commit) |
401 self.assertRaises(Unauthorized, cnx.commit) |
399 eid = cu.execute("INSERT Personne X: X nom 'bidule', " |
402 eid = cnx.execute("INSERT Personne X: X nom 'bidule', " |
400 "X web 'http://www.debian.org'")[0][0] |
403 "X web 'http://www.debian.org'")[0][0] |
401 self.commit() |
404 cnx.commit() |
402 cu.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) |
405 cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) |
403 self.assertRaises(Unauthorized, self.commit) |
406 self.assertRaises(Unauthorized, cnx.commit) |
404 cu.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid}) |
407 cnx.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid}) |
405 self.assertRaises(Unauthorized, self.commit) |
408 self.assertRaises(Unauthorized, cnx.commit) |
406 cu.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid}) |
409 cnx.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid}) |
407 self.commit() |
410 cnx.commit() |
408 |
411 |
def test_attribute_security_rqlexpr(self):
    # Exercises the write permission on Note attributes, first through an
    # admin connection, then through an 'iaminusersgrouponly' connection.
    # Each Unauthorized is expected at commit time, not at execute time.
    with self.admin_access.repo_cnx() as cnx:
        # Note.para attribute editable by managers or if the note is in "todo" state
        note = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
        cnx.commit()
        note.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
        cnx.execute('SET X para "truc" WHERE X eid %(x)s', {'x': note.eid})
        cnx.commit()
    with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
        # editing the admin-created note must be refused on commit
        cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note.eid})
        self.assertRaises(Unauthorized, cnx.commit)
        note2 = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
        cnx.commit()
        note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
        cnx.commit()
        # after 'markasdone' the note must no longer be in the "todo" state
        self.assertEqual(len(cnx.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s',
                                         {'x': note2.eid})),
                         0)
        cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
        self.assertRaises(Unauthorized, cnx.commit)
        # once 'redoit' has been fired the same update is accepted
        note2.cw_adapt_to('IWorkflowable').fire_transition('redoit')
        cnx.commit()
        cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
        cnx.commit()
        # 'something' alone is refused at insertion, but accepted together with 'para'
        cnx.execute("INSERT Note X: X something 'A'")
        self.assertRaises(Unauthorized, cnx.commit)
        cnx.execute("INSERT Note X: X para 'zogzog', X something 'A'")
        cnx.commit()
        note = cnx.execute("INSERT Note X").get_entity(0, 0)
        cnx.commit()
        note.cw_set(something=u'B')
        cnx.commit()
        note.cw_set(something=None, para=u'zogzog')
        cnx.commit()
442 |
446 |
443 def test_attribute_read_security(self): |
447 def test_attribute_read_security(self): |
444 # anon not allowed to see users'login, but they can see users |
448 # anon not allowed to see users'login, but they can see users |
445 login_rdef = self.repo.schema['CWUser'].rdef('login') |
449 login_rdef = self.repo.schema['CWUser'].rdef('login') |
446 with self.temporary_permissions((login_rdef, {'read': ('users', 'managers')}), |
450 with self.temporary_permissions((login_rdef, {'read': ('users', 'managers')}), |
447 CWUser={'read': ('guests', 'users', 'managers')}): |
451 CWUser={'read': ('guests', 'users', 'managers')}): |
448 with self.login('anon') as cu: |
452 with self.new_access('anon').repo_cnx() as cnx: |
449 rset = cu.execute('CWUser X') |
453 rset = cnx.execute('CWUser X') |
450 self.assertTrue(rset) |
454 self.assertTrue(rset) |
451 x = rset.get_entity(0, 0) |
455 x = rset.get_entity(0, 0) |
452 self.assertEqual(x.login, None) |
456 self.assertEqual(x.login, None) |
453 self.assertTrue(x.creation_date) |
457 self.assertTrue(x.creation_date) |
454 x = rset.get_entity(1, 0) |
458 x = rset.get_entity(1, 0) |
class BaseSchemaSecurityTC(BaseSecurityTC):
    """tests related to the base schema permission configuration"""

    def test_user_can_delete_object_he_created(self):
        # even if some other user has changed the object's state
        with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
            # due to security test, affaire has to concerne a societe the user owns
            cnx.execute('INSERT Societe X: X nom "ARCTIA"')
            cnx.execute('INSERT Affaire X: X ref "ARCT01", X concerne S WHERE S nom "ARCTIA"')
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            # the admin fires a transition on the entity created by the other user
            affaire = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
            affaire.cw_adapt_to('IWorkflowable').fire_transition('abort')
            cnx.commit()
            self.assertEqual(len(cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01"')),
                             1)
            self.assertEqual(len(cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01",'
                                             'X owned_by U, U login "admin"')),
                             1)  # TrInfo at the above state change
        with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
            cnx.execute('DELETE Affaire X WHERE X ref "ARCT01"')
            cnx.commit()
            self.assertFalse(cnx.execute('Affaire X'))

    def test_users_and_groups_non_readable_by_guests(self):
        # the admin eid is fetched through an internal connection so that it
        # can be used as "another user" from the anonymous connection below
        with self.repo.internal_cnx() as cnx:
            admineid = cnx.execute('CWUser U WHERE U login "admin"').rows[0][0]
        with self.new_access('anon').repo_cnx() as cnx:
            anon = cnx.user
            # anonymous user can only read itself
            rset = cnx.execute('Any L WHERE X owned_by U, U login L')
            self.assertEqual([['anon']], rset.rows)
            rset = cnx.execute('CWUser X')
            self.assertEqual([[anon.eid]], rset.rows)
            # anonymous user can read groups (necessary to check allowed transitions for instance)
            self.assertTrue(cnx.execute('CWGroup X'))
            # should only be able to read the anonymous user, not another one
            self.assertRaises(Unauthorized,
                              cnx.execute, 'CWUser X WHERE X eid %(x)s', {'x': admineid})
            rset = cnx.execute('CWUser X WHERE X eid %(x)s', {'x': anon.eid})
            self.assertEqual([[anon.eid]], rset.rows)
            # but can't modify it
            cnx.execute('SET X login "toto" WHERE X eid %(x)s', {'x': anon.eid})
            self.assertRaises(Unauthorized, cnx.commit)

    def test_in_group_relation(self):
        # neither removing nor adding an in_group relation on 'admin' may be
        # executed from the 'iaminusersgrouponly' connection
        with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
            rql = u"DELETE U in_group G WHERE U login 'admin'"
            self.assertRaises(Unauthorized, cnx.execute, rql)
            rql = u"SET U in_group G WHERE U login 'admin', G name 'users'"
            self.assertRaises(Unauthorized, cnx.execute, rql)

    def test_owned_by(self):
        # a Personne inserted through the admin connection...
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute("INSERT Personne X: X nom 'bidule'")
            cnx.commit()
        # ...can't be given an owned_by relation from the other user's connection
        with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
            rql = u"SET X owned_by U WHERE U login 'iaminusersgrouponly', X is Personne"
            self.assertRaises(Unauthorized, cnx.execute, rql)

    def test_bookmarked_by_guests_security(self):
        with self.admin_access.repo_cnx() as cnx:
            # beid1 is bookmarked by nobody, beid2 is bookmarked by 'anon'
            beid1 = cnx.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0]
            beid2 = cnx.execute('INSERT Bookmark B: B path "?vid=index", B title "index", '
                                'B bookmarked_by U WHERE U login "anon"')[0][0]
            cnx.commit()
        with self.new_access('anon').repo_cnx() as cnx:
            anoneid = cnx.user.eid
            # same query twice: eid inlined in the RQL string, then given as argument
            self.assertEqual(cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
                                         'B bookmarked_by U, U eid %s' % anoneid).rows,
                             [['index', '?vid=index']])
            self.assertEqual(cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
                                         'B bookmarked_by U, U eid %(x)s', {'x': anoneid}).rows,
                             [['index', '?vid=index']])
            # can read others bookmarks as well
            self.assertEqual(cnx.execute('Any B where B is Bookmark, NOT B bookmarked_by U').rows,
                             [[beid1]])
            self.assertRaises(Unauthorized, cnx.execute, 'DELETE B bookmarked_by U')
            self.assertRaises(Unauthorized,
                              cnx.execute, 'SET B bookmarked_by U WHERE U eid %(x)s, B eid %(b)s',
                              {'x': anoneid, 'b': beid1})

    def test_ambigous_ordered(self):
        # the names returned to 'anon' must come back sorted case-insensitively
        with self.new_access('anon').repo_cnx() as cnx:
            names = [t for t, in cnx.execute('Any N ORDERBY lower(N) WHERE X name N')]
            self.assertEqual(names, sorted(names, key=lambda x: x.lower()))

    def test_in_state_without_update_perm(self):
        """check a user change in_state without having update permission on the
        subject
        """
        with self.admin_access.repo_cnx() as cnx:
            eid = cnx.execute('INSERT Affaire X: X ref "ARCT01"')[0][0]
            cnx.commit()
        with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
            # needed to remove rql expr granting update perm to the user
            affschema = self.schema['Affaire']
            with self.temporary_permissions(Affaire={'update': affschema.get_groups('update'),
                                                     'read': ('users',)}):
                self.assertRaises(Unauthorized,
                                  affschema.check_perm, cnx, 'update', eid=eid)
                aff = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
                aff.cw_adapt_to('IWorkflowable').fire_transition('abort')
                cnx.commit()
                # though changing a user state (even logged user) is reserved to managers
                user = cnx.user
                # XXX whether it should raise Unauthorized or ValidationError is not clear
                # the best would probably ValidationError if the transition doesn't exist
                # from the current state but Unauthorized if it exists but user can't pass it
                self.assertRaises(ValidationError,
                                  user.cw_adapt_to('IWorkflowable').fire_transition, 'deactivate')

    def test_trinfo_security(self):
        with self.admin_access.repo_cnx() as cnx:
            aff = cnx.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0, 0)
            iworkflowable = aff.cw_adapt_to('IWorkflowable')
            cnx.commit()
            iworkflowable.fire_transition('abort')
            cnx.commit()
            # can change tr info comment
            cnx.execute('SET TI comment %(c)s WHERE TI wf_info_for X, X ref "ARCT01"',
                        {'c': u'bouh!'})
            cnx.commit()
            aff.cw_clear_relation_cache('wf_info_for', 'object')
            trinfo = iworkflowable.latest_trinfo()
            self.assertEqual(trinfo.comment, 'bouh!')
            # but not from_state/to_state
            aff.cw_clear_relation_cache('wf_info_for', role='object')
            self.assertRaises(Unauthorized, cnx.execute,
                              'SET TI from_state S WHERE TI eid %(ti)s, S name "ben non"',
                              {'ti': trinfo.eid})
            self.assertRaises(Unauthorized, cnx.execute,
                              'SET TI to_state S WHERE TI eid %(ti)s, S name "pitetre"',
                              {'ti': trinfo.eid})

    def test_emailaddress_security(self):
        # check for preexisting email addresses
        with self.admin_access.repo_cnx() as cnx:
            if cnx.execute('Any X WHERE X is EmailAddress'):
                # abort with the list of offending addresses: the counts
                # asserted below only hold when starting from none
                rset = cnx.execute('Any X, U WHERE X is EmailAddress, U use_email X')
                msg = ['Preexisting email readable by anon found!']
                tmpl = ' - "%s" used by user "%s"'
                for i in xrange(len(rset)):
                    email, user = rset.get_entity(i, 0), rset.get_entity(i, 1)
                    msg.append(tmpl % (email.dc_title(), user.dc_title()))
                raise RuntimeError('\n'.join(msg))
            # actual test
            cnx.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0)
            cnx.execute('INSERT EmailAddress X: X address "anon", '
                        'U use_email X WHERE U login "anon"').get_entity(0, 0)
            cnx.commit()
            self.assertEqual(len(cnx.execute('Any X WHERE X is EmailAddress')), 2)
        # only one of the two addresses is visible from the anonymous connection
        with self.new_access('anon').repo_cnx() as cnx:
            self.assertEqual(len(cnx.execute('Any X WHERE X is EmailAddress')), 1)
635 |
633 |
# run the test suite when this module is executed as a script
if __name__ == '__main__':
    unittest_main()