|
1 # -*- coding: iso-8859-1 -*- |
|
2 """unit tests for cubicweb.web.application""" |
|
3 |
|
4 from logilab.common.testlib import TestCase, unittest_main |
|
5 import base64, Cookie |
|
6 |
|
7 import sys |
|
8 from urllib import unquote |
|
9 from logilab.common.decorators import clear_cache |
|
10 |
|
11 from cubicweb.web import Redirect, AuthenticationError, ExplicitLogin, INTERNAL_FIELD_VALUE |
|
12 from cubicweb.web.views.basecontrollers import ViewController |
|
13 from cubicweb.devtools._apptest import FakeRequest |
|
14 |
|
15 class FakeMapping: |
|
16 """emulates a mapping module""" |
|
17 def __init__(self): |
|
18 self.ENTITIES_MAP = {} |
|
19 self.ATTRIBUTES_MAP = {} |
|
20 self.RELATIONS_MAP = {} |
|
21 |
|
22 class MockCursor: |
|
23 def __init__(self): |
|
24 self.executed = [] |
|
25 def execute(self, rql, args=None, cachekey=None): |
|
26 args = args or {} |
|
27 self.executed.append(rql % args) |
|
28 |
|
29 |
|
30 class FakeController(ViewController): |
|
31 |
|
32 def __init__(self, form=None): |
|
33 self.req = FakeRequest() |
|
34 self.req.form = form or {} |
|
35 self._cursor = self.req.cursor = MockCursor() |
|
36 |
|
37 def new_cursor(self): |
|
38 self._cursor = self.req.cursor = MockCursor() |
|
39 |
|
40 def set_form(self, form): |
|
41 self.req.form = form |
|
42 |
|
43 |
|
44 class RequestBaseTC(TestCase): |
|
45 def setUp(self): |
|
46 self.req = FakeRequest() |
|
47 |
|
48 |
|
49 def test_list_arg(self): |
|
50 """tests the list_arg() function""" |
|
51 list_arg = self.req.list_form_param |
|
52 self.assertEquals(list_arg('arg3', {}), []) |
|
53 d = {'arg1' : "value1", |
|
54 'arg2' : ('foo', INTERNAL_FIELD_VALUE,), |
|
55 'arg3' : ['bar']} |
|
56 self.assertEquals(list_arg('arg1', d, True), ['value1']) |
|
57 self.assertEquals(d, {'arg2' : ('foo', INTERNAL_FIELD_VALUE), 'arg3' : ['bar'],}) |
|
58 self.assertEquals(list_arg('arg2', d, True), ['foo']) |
|
59 self.assertEquals({'arg3' : ['bar'],}, d) |
|
60 self.assertEquals(list_arg('arg3', d), ['bar',]) |
|
61 self.assertEquals({'arg3' : ['bar'],}, d) |
|
62 |
|
63 |
|
64 def test_from_controller(self): |
|
65 self.assertEquals(self.req.from_controller(), 'view') |
|
66 req = FakeRequest(url='project?vid=list') |
|
67 # this assertion is just to make sure that relative_path can be |
|
68 # correctly computed as it is used in from_controller() |
|
69 self.assertEquals(req.relative_path(False), 'project') |
|
70 self.assertEquals(req.from_controller(), 'view') |
|
71 # test on a valid non-view controller |
|
72 req = FakeRequest(url='login?x=1&y=2') |
|
73 self.assertEquals(req.relative_path(False), 'login') |
|
74 self.assertEquals(req.from_controller(), 'login') |
|
75 |
|
76 |
|
77 class UtilsTC(TestCase): |
|
78 """test suite for misc application utilities""" |
|
79 |
|
80 def setUp(self): |
|
81 self.ctrl = FakeController() |
|
82 |
|
83 #def test_which_mapping(self): |
|
84 # """tests which mapping is used (application or core)""" |
|
85 # init_mapping() |
|
86 # from cubicweb.common import mapping |
|
87 # self.assertEquals(mapping.MAPPING_USED, 'core') |
|
88 # sys.modules['mapping'] = FakeMapping() |
|
89 # init_mapping() |
|
90 # self.assertEquals(mapping.MAPPING_USED, 'application') |
|
91 # del sys.modules['mapping'] |
|
92 |
|
93 def test_execute_linkto(self): |
|
94 """tests the execute_linkto() function""" |
|
95 self.assertEquals(self.ctrl.execute_linkto(), None) |
|
96 self.assertEquals(self.ctrl._cursor.executed, |
|
97 []) |
|
98 |
|
99 self.ctrl.set_form({'__linkto' : 'works_for:12_13_14:object', |
|
100 'eid': 8}) |
|
101 self.ctrl.execute_linkto() |
|
102 self.assertEquals(self.ctrl._cursor.executed, |
|
103 ['SET Y works_for X WHERE X eid 8, Y eid %s' % i |
|
104 for i in (12, 13, 14)]) |
|
105 |
|
106 self.ctrl.new_cursor() |
|
107 self.ctrl.set_form({'__linkto' : 'works_for:12_13_14:subject', |
|
108 'eid': 8}) |
|
109 self.ctrl.execute_linkto() |
|
110 self.assertEquals(self.ctrl._cursor.executed, |
|
111 ['SET X works_for Y WHERE X eid 8, Y eid %s' % i |
|
112 for i in (12, 13, 14)]) |
|
113 |
|
114 |
|
115 self.ctrl.new_cursor() |
|
116 self.ctrl.req.form = {'__linkto' : 'works_for:12_13_14:object'} |
|
117 self.ctrl.execute_linkto(eid=8) |
|
118 self.assertEquals(self.ctrl._cursor.executed, |
|
119 ['SET Y works_for X WHERE X eid 8, Y eid %s' % i |
|
120 for i in (12, 13, 14)]) |
|
121 |
|
122 self.ctrl.new_cursor() |
|
123 self.ctrl.set_form({'__linkto' : 'works_for:12_13_14:subject'}) |
|
124 self.ctrl.execute_linkto(eid=8) |
|
125 self.assertEquals(self.ctrl._cursor.executed, |
|
126 ['SET X works_for Y WHERE X eid 8, Y eid %s' % i |
|
127 for i in (12, 13, 14)]) |
|
128 |
|
129 |
|
130 from cubicweb.devtools.apptest import EnvBasedTC |
|
131 |
|
132 |
|
133 class ApplicationTC(EnvBasedTC): |
|
134 |
|
135 def publish(self, req, path='view'): |
|
136 return self.app.publish(path, req) |
|
137 |
|
138 def expect_redirect(self, callback, req): |
|
139 try: |
|
140 res = callback(req) |
|
141 print res |
|
142 except Redirect, ex: |
|
143 try: |
|
144 path, params = ex.location.split('?', 1) |
|
145 except ValueError: |
|
146 path = ex.location |
|
147 params = {} |
|
148 else: |
|
149 cleanup = lambda p: (p[0], unquote(p[1])) |
|
150 params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p) |
|
151 path = path[len(req.base_url()):] |
|
152 return path, params |
|
153 else: |
|
154 self.fail('expected a Redirect exception') |
|
155 |
|
156 def expect_redirect_publish(self, req, path='view'): |
|
157 return self.expect_redirect(lambda x: self.publish(x, path), req) |
|
158 |
|
159 def test_cnx_user_groups_sync(self): |
|
160 user = self.user() |
|
161 self.assertEquals(user.groups, set(('managers',))) |
|
162 self.execute('SET X in_group G WHERE X eid %s, G name "guests"' % user.eid) |
|
163 user = self.user() |
|
164 self.assertEquals(user.groups, set(('managers',))) |
|
165 self.commit() |
|
166 user = self.user() |
|
167 self.assertEquals(user.groups, set(('managers', 'guests'))) |
|
168 # cleanup |
|
169 self.execute('DELETE X in_group G WHERE X eid %s, G name "guests"' % user.eid) |
|
170 self.commit() |
|
171 |
|
172 def test_nonregr_publish1(self): |
|
173 req = self.request(u'EEType X WHERE X final FALSE, X meta FALSE') |
|
174 self.app.publish('view', req) |
|
175 |
|
176 def test_nonregr_publish2(self): |
|
177 req = self.request(u'Any count(N) WHERE N todo_by U, N is Note, U eid %s' |
|
178 % self.user().eid) |
|
179 self.app.publish('view', req) |
|
180 |
|
181 def test_publish_validation_error(self): |
|
182 req = self.request() |
|
183 user = self.user() |
|
184 req.form = { |
|
185 'eid': `user.eid`, |
|
186 '__type:'+`user.eid`: 'EUser', |
|
187 'login:'+`user.eid`: '', # ERROR: no login specified |
|
188 'edits-login:'+`user.eid`: unicode(user.login), |
|
189 # just a sample, missing some necessary information for real life |
|
190 '__errorurl': 'view?vid=edition...' |
|
191 } |
|
192 path, params = self.expect_redirect_publish(req, 'edit') |
|
193 forminfo = req.get_session_data('view?vid=edition...') |
|
194 eidmap = forminfo['eidmap'] |
|
195 self.assertEquals(eidmap, {}) |
|
196 values = forminfo['values'] |
|
197 self.assertEquals(values['login:'+`user.eid`], '') |
|
198 self.assertEquals(values['edits-login:'+`user.eid`], user.login) |
|
199 self.assertEquals(values['eid'], `user.eid`) |
|
200 errors = forminfo['errors'] |
|
201 self.assertEquals(errors.entity, user.eid) |
|
202 self.assertEquals(errors.errors['login'], 'required attribute') |
|
203 |
|
204 |
|
205 def test_validation_error_dont_loose_subentity_data(self): |
|
206 """test creation of two linked entities |
|
207 """ |
|
208 req = self.request() |
|
209 form = {'eid': ['X', 'Y'], |
|
210 '__type:X': 'EUser', |
|
211 # missing required field |
|
212 'login:X': u'', 'edits-login:X': '', |
|
213 'surname:X': u'Mr Ouaoua', 'edits-surname:X': '', |
|
214 '__type:Y': 'EmailAddress', |
|
215 # but email address is set |
|
216 'address:Y': u'bougloup@logilab.fr', 'edits-address:Y': '', |
|
217 'alias:Y': u'', 'edits-alias:Y': '', |
|
218 'use_email:X': 'Y', 'edits-use_email:X': INTERNAL_FIELD_VALUE, |
|
219 # necessary to get validation error handling |
|
220 '__errorurl': 'view?vid=edition...', |
|
221 } |
|
222 req.form = form |
|
223 # monkey patch edited_eid to ensure both entities are edited, not only X |
|
224 req.edited_eids = lambda : ('Y', 'X') |
|
225 path, params = self.expect_redirect_publish(req, 'edit') |
|
226 forminfo = req.get_session_data('view?vid=edition...') |
|
227 self.assertUnorderedIterableEquals(forminfo['eidmap'].keys(), ['X', 'Y']) |
|
228 self.assertEquals(forminfo['errors'].entity, forminfo['eidmap']['X']) |
|
229 self.assertEquals(forminfo['errors'].errors, {'login': 'required attribute', |
|
230 'upassword': 'required attribute'}) |
|
231 self.assertEquals(forminfo['values'], form) |
|
232 |
|
233 def _test_cleaned(self, kwargs, injected, cleaned): |
|
234 req = self.request(**kwargs) |
|
235 page = self.app.publish('view', req) |
|
236 self.failIf(injected in page, (kwargs, injected)) |
|
237 self.failUnless(cleaned in page, (kwargs, cleaned)) |
|
238 |
|
239 def test_nonregr_script_kiddies(self): |
|
240 """test against current script injection""" |
|
241 injected = '<i>toto</i>' |
|
242 cleaned = 'toto' |
|
243 for kwargs in ({'__message': injected}, |
|
244 {'vid': injected}, |
|
245 {'vtitle': injected}, |
|
246 ): |
|
247 yield self._test_cleaned, kwargs, injected, cleaned |
|
248 |
|
249 def test_site_wide_eproperties_sync(self): |
|
250 # XXX work in all-in-one configuration but not in twisted for instance |
|
251 # in which case we need a kindof repo -> http server notification |
|
252 # protocol |
|
253 vreg = self.app.vreg |
|
254 # default value |
|
255 self.assertEquals(vreg.property_value('ui.language'), 'en') |
|
256 self.execute('INSERT EProperty X: X value "fr", X pkey "ui.language"') |
|
257 self.assertEquals(vreg.property_value('ui.language'), 'en') |
|
258 self.commit() |
|
259 self.assertEquals(vreg.property_value('ui.language'), 'fr') |
|
260 self.execute('SET X value "de" WHERE X pkey "ui.language"') |
|
261 self.assertEquals(vreg.property_value('ui.language'), 'fr') |
|
262 self.commit() |
|
263 self.assertEquals(vreg.property_value('ui.language'), 'de') |
|
264 self.execute('DELETE EProperty X WHERE X pkey "ui.language"') |
|
265 self.assertEquals(vreg.property_value('ui.language'), 'de') |
|
266 self.commit() |
|
267 self.assertEquals(vreg.property_value('ui.language'), 'en') |
|
268 |
|
269 def test_fb_login_concept(self): |
|
270 """see data/views.py""" |
|
271 self.set_option('auth-mode', 'cookie') |
|
272 self.set_option('anonymous-user', 'anon') |
|
273 self.login('anon') |
|
274 req = self.request() |
|
275 origcnx = req.cnx |
|
276 req.form['__fblogin'] = u'turlututu' |
|
277 page = self.publish(req) |
|
278 self.failIf(req.cnx is origcnx) |
|
279 self.assertEquals(req.user.login, 'turlututu') |
|
280 self.failUnless('turlututu' in page, page) |
|
281 |
|
282 # authentication tests #################################################### |
|
283 |
|
284 def _init_auth(self, authmode, anonuser=None): |
|
285 self.set_option('auth-mode', authmode) |
|
286 self.set_option('anonymous-user', anonuser) |
|
287 req = self.request() |
|
288 origcnx = req.cnx |
|
289 req.cnx = None |
|
290 sh = self.app.session_handler |
|
291 # not properly cleaned between tests |
|
292 self.open_sessions = sh.session_manager._sessions = {} |
|
293 return req, origcnx |
|
294 |
|
295 def _test_auth_succeed(self, req, origcnx): |
|
296 sh = self.app.session_handler |
|
297 path, params = self.expect_redirect(lambda x: self.app.connect(x), req) |
|
298 cnx = req.cnx |
|
299 self.assertEquals(len(self.open_sessions), 1, self.open_sessions) |
|
300 self.assertEquals(cnx.login, origcnx.login) |
|
301 self.assertEquals(cnx.password, origcnx.password) |
|
302 self.assertEquals(cnx.anonymous_connection, False) |
|
303 self.assertEquals(path, 'view') |
|
304 self.assertEquals(params, {'__message': 'welcome %s !' % origcnx.login}) |
|
305 |
|
306 def _test_auth_fail(self, req): |
|
307 self.assertRaises(AuthenticationError, self.app.connect, req) |
|
308 self.assertEquals(req.cnx, None) |
|
309 self.assertEquals(len(self.open_sessions), 0) |
|
310 clear_cache(req, 'get_authorization') |
|
311 |
|
312 def test_http_auth_no_anon(self): |
|
313 req, origcnx = self._init_auth('http') |
|
314 self._test_auth_fail(req) |
|
315 self.assertRaises(ExplicitLogin, self.publish, req, 'login') |
|
316 self.assertEquals(req.cnx, None) |
|
317 authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password)) |
|
318 req._headers['Authorization'] = 'basic %s' % authstr |
|
319 self._test_auth_succeed(req, origcnx) |
|
320 self.assertRaises(AuthenticationError, self.publish, req, 'logout') |
|
321 self.assertEquals(len(self.open_sessions), 0) |
|
322 |
|
323 def test_cookie_auth_no_anon(self): |
|
324 req, origcnx = self._init_auth('cookie') |
|
325 self._test_auth_fail(req) |
|
326 form = self.publish(req, 'login') |
|
327 self.failUnless('__login' in form) |
|
328 self.failUnless('__password' in form) |
|
329 self.assertEquals(req.cnx, None) |
|
330 req.form['__login'] = origcnx.login |
|
331 req.form['__password'] = origcnx.password |
|
332 self._test_auth_succeed(req, origcnx) |
|
333 self.assertRaises(AuthenticationError, self.publish, req, 'logout') |
|
334 self.assertEquals(len(self.open_sessions), 0) |
|
335 |
|
336 def _test_auth_anon(self, req): |
|
337 self.app.connect(req) |
|
338 acnx = req.cnx |
|
339 self.assertEquals(len(self.open_sessions), 1) |
|
340 self.assertEquals(acnx.login, 'anon') |
|
341 self.assertEquals(acnx.password, 'anon') |
|
342 self.failUnless(acnx.anonymous_connection) |
|
343 self._reset_cookie(req) |
|
344 |
|
345 def _reset_cookie(self, req): |
|
346 # preparing the suite of the test |
|
347 # set session id in cookie |
|
348 cookie = Cookie.SimpleCookie() |
|
349 cookie['__session'] = req.cnx.sessionid |
|
350 req._headers['Cookie'] = cookie['__session'].OutputString() |
|
351 clear_cache(req, 'get_authorization') |
|
352 # reset cnx as if it was a new incoming request |
|
353 req.cnx = None |
|
354 |
|
355 def _test_anon_auth_fail(self, req): |
|
356 self.assertEquals(len(self.open_sessions), 1) |
|
357 self.app.connect(req) |
|
358 self.assertEquals(req.message, 'authentication failure') |
|
359 self.assertEquals(req.cnx.anonymous_connection, True) |
|
360 self.assertEquals(len(self.open_sessions), 1) |
|
361 self._reset_cookie(req) |
|
362 |
|
363 def test_http_auth_anon_allowed(self): |
|
364 req, origcnx = self._init_auth('http', 'anon') |
|
365 self._test_auth_anon(req) |
|
366 authstr = base64.encodestring('toto:pouet') |
|
367 req._headers['Authorization'] = 'basic %s' % authstr |
|
368 self._test_anon_auth_fail(req) |
|
369 authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password)) |
|
370 req._headers['Authorization'] = 'basic %s' % authstr |
|
371 self._test_auth_succeed(req, origcnx) |
|
372 self.assertRaises(AuthenticationError, self.publish, req, 'logout') |
|
373 self.assertEquals(len(self.open_sessions), 0) |
|
374 |
|
375 def test_cookie_auth_anon_allowed(self): |
|
376 req, origcnx = self._init_auth('cookie', 'anon') |
|
377 self._test_auth_anon(req) |
|
378 req.form['__login'] = 'toto' |
|
379 req.form['__password'] = 'pouet' |
|
380 self._test_anon_auth_fail(req) |
|
381 req.form['__login'] = origcnx.login |
|
382 req.form['__password'] = origcnx.password |
|
383 self._test_auth_succeed(req, origcnx) |
|
384 self.assertRaises(AuthenticationError, self.publish, req, 'logout') |
|
385 self.assertEquals(len(self.open_sessions), 0) |
|
386 |
|
387 |
|
388 |
|
389 |
|
390 if __name__ == '__main__': |
|
391 unittest_main() |