# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
|
18 """unit tests for cubicweb.web.application""" |
|
19 |
|
20 import base64 |
|
21 |
|
22 from six import text_type |
|
23 from six.moves import http_client |
|
24 from six.moves.http_cookies import SimpleCookie |
|
25 |
|
26 from logilab.common.testlib import TestCase, unittest_main |
|
27 from logilab.common.decorators import clear_cache, classproperty |
|
28 |
|
29 from cubicweb import AuthenticationError |
|
30 from cubicweb import view |
|
31 from cubicweb.devtools.testlib import CubicWebTC, real_error_handling |
|
32 from cubicweb.devtools.fake import FakeRequest |
|
33 from cubicweb.web import LogOut, Redirect, INTERNAL_FIELD_VALUE |
|
34 from cubicweb.web.views.basecontrollers import ViewController |
|
35 from cubicweb.web.application import anonymized_request |
|
36 from cubicweb import repoapi |
|
37 |
|
class FakeMapping:
    """Stand-in for a mapping module: exposes three empty lookup tables."""

    def __init__(self):
        # every instance gets its own, distinct dictionaries
        self.ENTITIES_MAP, self.ATTRIBUTES_MAP, self.RELATIONS_MAP = {}, {}, {}
|
44 |
|
class MockCursor:
    """Cursor double that records queries instead of running them."""

    def __init__(self):
        # interpolated queries, in the order execute() received them
        self.executed = []

    def execute(self, rql, args=None, build_descr=False):
        """Interpolate ``args`` into ``rql`` with ``%`` and remember the result.

        A falsy ``args`` is replaced by an empty dict before interpolation.
        ``build_descr`` is accepted but never read; nothing is returned.
        """
        substitutions = args if args else {}
        rendered = rql % substitutions
        self.executed.append(rendered)
|
51 |
|
52 |
|
class FakeController(ViewController):
    """View controller wired to a FakeRequest and a recording MockCursor."""

    def __init__(self, form=None):
        # the parent constructor is deliberately not called, as in the
        # original fixture: only the attributes below are set up
        request = FakeRequest()
        request.form = form if form else {}
        self._cw = request
        self.new_cursor()

    def new_cursor(self):
        """Install a fresh MockCursor and route the request's execute() to it."""
        cursor = MockCursor()
        self._cursor = cursor
        self._cw.execute = cursor.execute

    def set_form(self, form):
        """Replace the form carried by the fake request."""
        self._cw.form = form
|
67 |
|
68 |
|
class RequestBaseTC(TestCase):
    """Checks on request helper methods, run against a FakeRequest."""

    def setUp(self):
        self._cw = FakeRequest()

    def test_list_arg(self):
        """list_form_param() is expected to always hand back a list."""
        list_arg = self._cw.list_form_param
        # unknown parameter
        self.assertEqual(list_arg('arg3', {}), [])
        form = {
            'arg1': "value1",
            'arg2': ('foo', INTERNAL_FIELD_VALUE),
            'arg3': ['bar'],
        }
        # with the third argument set, the parameter is expected to be
        # removed from the dictionary once read
        self.assertEqual(list_arg('arg1', form, True), ['value1'])
        self.assertEqual(form, {'arg2': ('foo', INTERNAL_FIELD_VALUE),
                                'arg3': ['bar']})
        # INTERNAL_FIELD_VALUE is expected to be dropped from the result
        self.assertEqual(list_arg('arg2', form, True), ['foo'])
        self.assertEqual({'arg3': ['bar']}, form)
        # without the third argument the dictionary is left untouched
        self.assertEqual(list_arg('arg3', form), ['bar'])
        self.assertEqual({'arg3': ['bar']}, form)

    def test_from_controller(self):
        """from_controller() is expected to give 'view' unless the path names
        another registered controller.
        """
        self._cw.vreg['controllers'] = {'view': 1, 'login': 1}
        self.assertEqual(self._cw.from_controller(), 'view')
        expectations = (
            # (url, expected relative path, expected controller)
            ('project?vid=list', 'project', 'view'),
            # a valid non-view controller
            ('login?x=1&y=2', 'login', 'login'),
        )
        for url, path, controller in expectations:
            req = FakeRequest(url=url)
            req.vreg['controllers'] = {'view': 1, 'login': 1}
            # make sure relative_path() is correctly computed first, as it is
            # used in from_controller()
            self.assertEqual(req.relative_path(False), path)
            self.assertEqual(req.from_controller(), controller)
|
103 |
|
104 |
|
class UtilsTC(TestCase):
    """Test suite for miscellaneous application utilities."""

    def setUp(self):
        self.ctrl = FakeController()

    # disabled test, kept for reference:
    #def test_which_mapping(self):
    #    """tests which mapping is used (application or core)"""
    #    init_mapping()
    #    from cubicweb.common import mapping
    #    self.assertEqual(mapping.MAPPING_USED, 'core')
    #    sys.modules['mapping'] = FakeMapping()
    #    init_mapping()
    #    self.assertEqual(mapping.MAPPING_USED, 'application')
    #    del sys.modules['mapping']

    def test_execute_linkto(self):
        """execute_linkto() issues one SET query per eid listed in __linkto."""
        ctrl = self.ctrl
        targets = (12, 13, 14)
        # expected queries when the edited entity is object / subject
        as_object = ['SET Y works_for X WHERE X eid 8, Y eid %s' % eid
                     for eid in targets]
        as_subject = ['SET X works_for Y WHERE X eid 8, Y eid %s' % eid
                      for eid in targets]

        # no __linkto in the form: nothing is executed
        self.assertEqual(ctrl.execute_linkto(), None)
        self.assertEqual(ctrl._cursor.executed, [])

        # eid taken from the form
        ctrl.set_form({'__linkto': 'works_for:12_13_14:object', 'eid': 8})
        ctrl.execute_linkto()
        self.assertEqual(ctrl._cursor.executed, as_object)

        ctrl.new_cursor()
        ctrl.set_form({'__linkto': 'works_for:12_13_14:subject', 'eid': 8})
        ctrl.execute_linkto()
        self.assertEqual(ctrl._cursor.executed, as_subject)

        # eid given as keyword argument
        ctrl.new_cursor()
        ctrl._cw.form = {'__linkto': 'works_for:12_13_14:object'}
        ctrl.execute_linkto(eid=8)
        self.assertEqual(ctrl._cursor.executed, as_object)

        ctrl.new_cursor()
        ctrl.set_form({'__linkto': 'works_for:12_13_14:subject'})
        ctrl.execute_linkto(eid=8)
        self.assertEqual(ctrl._cursor.executed, as_subject)
|
156 |
|
157 |
|
class ApplicationTC(CubicWebTC):
    """Tests of the web application object: edition error handling, error
    pages, property synchronisation and authentication.
    """

    @classproperty
    def config(cls):
        """Return the configuration used by this test class.

        If the class itself already holds a ``_config`` attribute it is
        returned as is; otherwise the inherited configuration is fetched and
        the 'allow-email-login' option is switched on before returning it.
        """
        try:
            return cls.__dict__['_config']
        except KeyError:
            config = super(ApplicationTC, cls).config
            config.global_set_option('allow-email-login', True)
            return config

    @staticmethod
    def _basic_auth_header(login, password):
        """Return the value of an HTTP basic 'Authorization' header."""
        credentials = ('%s:%s' % (login, password)).encode('ascii')
        # b64encode is used instead of base64.encodestring: the latter was
        # removed in Python 3.9 and appended a newline to the header value
        return 'basic %s' % base64.b64encode(credentials).decode('ascii')

    def test_cnx_user_groups_sync(self):
        """Group changes show up on cnx.user only once committed."""
        with self.admin_access.client_cnx() as cnx:
            user = cnx.user
            self.assertEqual(user.groups, set(('managers',)))
            cnx.execute('SET X in_group G WHERE X eid %(x)s, G name "guests"',
                        {'x': user.eid})
            user = cnx.user
            self.assertEqual(user.groups, set(('managers',)))
            cnx.commit()
            user = cnx.user
            self.assertEqual(user.groups, set(('managers', 'guests')))
            # cleanup
            cnx.execute('DELETE X in_group G WHERE X eid %(x)s, G name "guests"',
                        {'x': user.eid})
            cnx.commit()

    def test_publish_validation_error(self):
        """A validation error stores the posted form under the __errorurl key."""
        with self.admin_access.web_request() as req:
            user = req.user
            eid = text_type(user.eid)
            req.form = {
                'eid': eid,
                '__type:' + eid: 'CWUser',
                '_cw_entity_fields:' + eid: 'login-subject',
                'login-subject:' + eid: '',  # ERROR: no login specified
                # just a sample, missing some necessary information for real life
                '__errorurl': 'view?vid=edition...',
            }
            self.expect_redirect_handle_request(req, 'edit')
            forminfo = req.session.data['view?vid=edition...']
            eidmap = forminfo['eidmap']
            self.assertEqual(eidmap, {})
            values = forminfo['values']
            self.assertEqual(values['login-subject:' + eid], '')
            self.assertEqual(values['eid'], eid)
            error = forminfo['error']
            self.assertEqual(error.entity, user.eid)
            self.assertEqual(error.errors['login-subject'], 'required field')

    def test_validation_error_dont_loose_subentity_data_ctrl(self):
        """test creation of two linked entities

        error occurs on the web controller
        """
        with self.admin_access.web_request() as req:
            # set Y before X to ensure both entities are edited, not only X
            req.form = {
                'eid': ['Y', 'X'], '__maineid': 'X',
                '__type:X': 'CWUser', '_cw_entity_fields:X': 'login-subject',
                # missing required field
                'login-subject:X': u'',
                # but email address is set
                '__type:Y': 'EmailAddress', '_cw_entity_fields:Y': 'address-subject',
                'address-subject:Y': u'bougloup@logilab.fr',
                'use_email-object:Y': 'X',
                # necessary to get validation error handling
                '__errorurl': 'view?vid=edition...',
            }
            self.expect_redirect_handle_request(req, 'edit')
            forminfo = req.session.data['view?vid=edition...']
            self.assertEqual(set(forminfo['eidmap']), set('XY'))
            self.assertEqual(forminfo['eidmap']['X'], None)
            self.assertIsInstance(forminfo['eidmap']['Y'], int)
            self.assertEqual(forminfo['error'].entity, 'X')
            self.assertEqual(forminfo['error'].errors,
                             {'login-subject': 'required field'})
            self.assertEqual(forminfo['values'], req.form)

    def test_validation_error_dont_loose_subentity_data_repo(self):
        """test creation of two linked entities

        error occurs on the repository
        """
        with self.admin_access.web_request() as req:
            # set Y before X to ensure both entities are edited, not only X
            req.form = {
                'eid': ['Y', 'X'], '__maineid': 'X',
                '__type:X': 'CWUser',
                '_cw_entity_fields:X': 'login-subject,upassword-subject',
                # already existent user
                'login-subject:X': u'admin',
                'upassword-subject:X': u'admin',
                'upassword-subject-confirm:X': u'admin',
                '__type:Y': 'EmailAddress', '_cw_entity_fields:Y': 'address-subject',
                'address-subject:Y': u'bougloup@logilab.fr',
                'use_email-object:Y': 'X',
                # necessary to get validation error handling
                '__errorurl': 'view?vid=edition...',
            }
            self.expect_redirect_handle_request(req, 'edit')
            forminfo = req.session.data['view?vid=edition...']
            self.assertEqual(set(forminfo['eidmap']), set('XY'))
            self.assertIsInstance(forminfo['eidmap']['X'], int)
            self.assertIsInstance(forminfo['eidmap']['Y'], int)
            self.assertEqual(forminfo['error'].entity, forminfo['eidmap']['X'])
            self.assertEqual(forminfo['error'].errors,
                             {'login-subject': u'the value "admin" is already used, use another one'})
            self.assertEqual(forminfo['values'], req.form)

    def test_ajax_view_raise_arbitrary_error(self):
        """An exception raised by a view during an ajax request gives a 500."""
        class ErrorAjaxView(view.View):
            __regid__ = 'test.ajax.error'

            def call(self):
                raise Exception('whatever')

        with self.temporary_appobjects(ErrorAjaxView):
            with real_error_handling(self.app) as app:
                with self.admin_access.web_request(vid='test.ajax.error') as req:
                    req.ajax_request = True
                    app.handle_request(req, '')
        self.assertEqual(http_client.INTERNAL_SERVER_ERROR,
                         req.status_out)

    def _test_cleaned(self, kwargs, injected, cleaned):
        """Render the 'view' controller with ``kwargs`` as request parameters
        and check the page contains ``cleaned`` but not ``injected``.
        """
        with self.admin_access.web_request(**kwargs) as req:
            page = self.app_handle_request(req, 'view')
            self.assertNotIn(injected.encode('ascii'), page)
            self.assertIn(cleaned.encode('ascii'), page)

    def test_nonregr_script_kiddies(self):
        """test against current script injection"""
        # NOTE(review): generator-style test; the yielded checks are only run
        # by a runner that supports generative tests (presumably
        # logilab.common.testlib) -- confirm before switching runner
        injected = '<i>toto</i>'
        cleaned = 'toto'
        for kwargs in ({'vid': injected},
                       {'vtitle': injected},
                       ):
            yield self._test_cleaned, kwargs, injected, cleaned

    def test_site_wide_eproperties_sync(self):
        """Site-wide property changes reach the registry on commit only."""
        # XXX work in all-in-one configuration but not in twisted for instance
        # in which case we need a kindof repo -> http server notification
        # protocol
        vreg = self.app.vreg
        # default value
        self.assertEqual(vreg.property_value('ui.language'), 'en')
        with self.admin_access.client_cnx() as cnx:
            cnx.execute('INSERT CWProperty X: X value "fr", X pkey "ui.language"')
            self.assertEqual(vreg.property_value('ui.language'), 'en')
            cnx.commit()
            self.assertEqual(vreg.property_value('ui.language'), 'fr')
            cnx.execute('SET X value "de" WHERE X pkey "ui.language"')
            self.assertEqual(vreg.property_value('ui.language'), 'fr')
            cnx.commit()
            self.assertEqual(vreg.property_value('ui.language'), 'de')
            cnx.execute('DELETE CWProperty X WHERE X pkey "ui.language"')
            self.assertEqual(vreg.property_value('ui.language'), 'de')
            cnx.commit()
            self.assertEqual(vreg.property_value('ui.language'), 'en')

    # authentication tests ####################################################

    def test_http_auth_no_anon(self):
        """HTTP auth without anonymous access: 401 first, then admin login."""
        req, origsession = self.init_authentication('http')
        self.assertAuthFailure(req)
        self.app.handle_request(req, 'login')
        self.assertEqual(401, req.status_out)
        clear_cache(req, 'get_authorization')
        req.set_request_header(
            'Authorization',
            self._basic_auth_header(self.admlogin, self.admpassword))
        self.assertAuthSuccess(req, origsession)
        self.assertRaises(LogOut, self.app_handle_request, req, 'logout')
        self.assertEqual(len(self.open_sessions), 0)

    def test_cookie_auth_no_anon(self):
        """Cookie auth without anonymous access: login form, then admin login."""
        req, origsession = self.init_authentication('cookie')
        self.assertAuthFailure(req)
        try:
            form = self.app.handle_request(req, 'login')
        except Redirect:
            self.fail('anonymous user should get login form')
        clear_cache(req, 'get_authorization')
        self.assertIn(b'__login', form)
        self.assertIn(b'__password', form)
        self.assertFalse(req.cnx)  # Mock cnx are False
        req.form['__login'] = self.admlogin
        req.form['__password'] = self.admpassword
        self.assertAuthSuccess(req, origsession)
        self.assertRaises(LogOut, self.app_handle_request, req, 'logout')
        self.assertEqual(len(self.open_sessions), 0)

    def test_login_by_email(self):
        """The admin user can log in with its primary email address."""
        with self.admin_access.client_cnx() as cnx:
            login = cnx.user.login
            address = login + u'@localhost'
            cnx.execute('INSERT EmailAddress X: X address %(address)s, U primary_email X '
                        'WHERE U login %(login)s', {'address': address, 'login': login})
            cnx.commit()
        req, origsession = self.init_authentication('cookie')
        # only the "option set" case is exercised; the check that login by
        # email fails when the option is unset used to be commented out here
        self.set_option('allow-email-login', True)
        req.form['__login'] = address
        req.form['__password'] = self.admpassword
        self.assertAuthSuccess(req, origsession)
        self.assertRaises(LogOut, self.app_handle_request, req, 'logout')
        self.assertEqual(len(self.open_sessions), 0)

    def _reset_cookie(self, req):
        """Put the current session id in the request's Cookie header."""
        # preparing the suite of the test
        # set session id in cookie
        cookie = SimpleCookie()
        sessioncookie = self.app.session_handler.session_cookie(req)
        cookie[sessioncookie] = req.session.sessionid
        req.set_request_header('Cookie', cookie[sessioncookie].OutputString(),
                               raw=True)
        clear_cache(req, 'get_authorization')

    def _test_auth_anon(self, req):
        """Check that ``req`` gets a single, anonymous session."""
        asession = self.app.get_session(req)
        # important otherwise _reset_cookie will not use the right session
        req.set_cnx(repoapi.Connection(asession))
        self.assertEqual(len(self.open_sessions), 1)
        self.assertEqual(asession.login, 'anon')
        self.assertTrue(asession.anonymous_session)
        self._reset_cookie(req)

    def _test_anon_auth_fail(self, req):
        """Check that bad credentials leave ``req`` on the anonymous session."""
        self.assertEqual(1, len(self.open_sessions))
        session = self.app.get_session(req)
        # important otherwise _reset_cookie will not use the right session
        req.set_cnx(repoapi.Connection(session))
        self.assertEqual(req.message, 'authentication failure')
        self.assertEqual(req.session.anonymous_session, True)
        self.assertEqual(1, len(self.open_sessions))
        self._reset_cookie(req)

    def test_http_auth_anon_allowed(self):
        """HTTP auth with anonymous access: anon, failed login, admin login."""
        req, origsession = self.init_authentication('http', 'anon')
        self._test_auth_anon(req)
        req.set_request_header('Authorization',
                               self._basic_auth_header('toto', 'pouet'))
        self._test_anon_auth_fail(req)
        req.set_request_header(
            'Authorization',
            self._basic_auth_header(self.admlogin, self.admpassword))
        self.assertAuthSuccess(req, origsession)
        self.assertRaises(LogOut, self.app_handle_request, req, 'logout')
        self.assertEqual(len(self.open_sessions), 0)

    def test_cookie_auth_anon_allowed(self):
        """Cookie auth with anonymous access: anon, failed login, admin login."""
        req, origsession = self.init_authentication('cookie', 'anon')
        self._test_auth_anon(req)
        req.form['__login'] = 'toto'
        req.form['__password'] = 'pouet'
        self._test_anon_auth_fail(req)
        req.form['__login'] = self.admlogin
        req.form['__password'] = self.admpassword
        self.assertAuthSuccess(req, origsession)
        self.assertRaises(LogOut, self.app_handle_request, req, 'logout')
        self.assertEqual(0, len(self.open_sessions))

    def test_anonymized_request(self):
        """anonymized_request() switches the request to 'anon' and back."""
        with self.admin_access.web_request() as req:
            self.assertEqual(self.admlogin, req.session.user.login)
            # admin should see anon + admin
            self.assertEqual(2, len(list(req.find('CWUser'))))
            with anonymized_request(req):
                self.assertEqual('anon', req.session.login)
                # anon should only see anon user
                self.assertEqual(1, len(list(req.find('CWUser'))))
            self.assertEqual(self.admlogin, req.session.login)
            self.assertEqual(2, len(list(req.find('CWUser'))))

    def test_non_regr_optional_first_var(self):
        """Handling a rset whose first cell is None must not fail."""
        with self.admin_access.web_request() as req:
            # expect a rset with None in [0][0]
            req.form['rql'] = 'rql:Any OV1, X WHERE X custom_workflow OV1?'
            self.app_handle_request(req)
|
434 |
|
435 |
|
# run the suite when this file is executed as a script
if __name__ == '__main__':
    unittest_main()
|