1 # copyright 2003-2017 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """unit tests for module cubicweb.utils""" |
|
19 |
|
20 import base64 |
|
21 import datetime |
|
22 import decimal |
|
23 import doctest |
|
24 import re |
|
25 from unittest import TestCase |
|
26 |
|
27 from cubicweb import Binary, Unauthorized |
|
28 from cubicweb.devtools.testlib import CubicWebTC |
|
29 from cubicweb.utils import (make_uid, UStringIO, RepeatList, HTMLHead, |
|
30 QueryCache) |
|
31 from cubicweb.entity import Entity |
|
32 |
|
33 try: |
|
34 from cubicweb.utils import CubicWebJsonEncoder, json |
|
35 except ImportError: |
|
36 json = None |
|
37 |
|
38 |
|
class MakeUidTC(TestCase):
    """Checks on the identifiers produced by ``make_uid``."""

    def test_1(self):
        # Different seeds, and even the same seed used twice, must not
        # produce equal identifiers.
        self.assertNotEqual(make_uid('xyz'), make_uid('abcd'))
        self.assertNotEqual(make_uid('xyz'), make_uid('xyz'))

    def test_2(self):
        # Generate 10000 identifiers from a single seed: none may be seen
        # twice and none may start with a digit.
        seen = set()
        while len(seen) < 10000:
            uid = make_uid('xyz')
            if uid in seen:
                # report how many distinct uids were produced before the clash
                self.fail(len(seen))
            # Raw string: '\d' in a plain literal is an invalid escape
            # sequence and triggers a warning on recent Python versions.
            if re.match(r'\d', uid):
                self.fail('make_uid must not return something begining with '
                          'some numeric character, got %s' % uid)
            seen.add(uid)
|
54 |
|
55 |
|
class TestQueryCache(TestCase):
    """Exercise QueryCache: usage report, ceiling overflow and mapping access."""

    def _check_usage(self, cache, transient, items, permanent):
        # Compare the cache usage report against the expected counters.
        self.assertEqual(cache._usage_report(),
                         {'transientcount': transient,
                          'itemcount': items,
                          'permanentcount': permanent})

    def _cache_with_permanents(self, ceiling):
        # Build a cache holding keys 0..9 (each mapped to itself), then read
        # the first five keys `_maxlevel + 2` times each so that they are
        # reported as permanent by the assertions of the calling tests.
        cache = QueryCache(ceiling=ceiling)
        for key in range(10):
            cache[key] = key
        reads = QueryCache._maxlevel + 2
        for key in range(5):
            for _ in range(reads):
                value = cache[key]
                self.assertEqual(value, key)
        return cache

    def test_querycache(self):
        # write only
        cache = QueryCache(ceiling=20)
        for key in range(10):
            cache[key] = key
        self._check_usage(cache, 0, 10, 0)

        # write more items than the ceiling allows
        # we should also get a warning
        cache = QueryCache(ceiling=10)
        for key in range(20):
            cache[key] = key
        self._check_usage(cache, 0, 10, 0)

        # write + reads
        cache = QueryCache(ceiling=20)
        for _ in range(4):
            for key in range(10):
                cache[key] = key
                cache[key]
        self._check_usage(cache, 10, 10, 0)

        # many more write + reads rounds
        cache = QueryCache(ceiling=20)
        for _ in range(17):
            for key in range(10):
                cache[key] = key
                cache[key]
        self._check_usage(cache, 0, 10, 10)

        # every key is read on odd rounds, odd keys are read once more on
        # every round
        cache = QueryCache(ceiling=20)
        for round_no in range(17):
            for key in range(10):
                cache[key] = key
                if round_no % 2:
                    cache[key]
                if key % 2:
                    cache[key]
        self._check_usage(cache, 5, 10, 5)

    def test_clear_on_overflow(self):
        """Only non-permanent items are wiped out when the ceiling is exceeded
        """
        # 10 values, the first 5 being permanent
        cache = self._cache_with_permanents(ceiling=10)
        # Add the 11-th
        cache[10] = 10
        self._check_usage(cache, 0, 6, 5)

    def test_get_with_default(self):
        """
        Items can be retrieved from a QueryCache with a default value
        """
        # 10 values, the first 5 being permanent
        cache = self._cache_with_permanents(ceiling=20)
        self._check_usage(cache, 0, 10, 5)
        # Existing keys (permanent ones included) ignore the default
        for key in range(10):
            value = cache.get(key, -1)
            self.assertEqual(value, key)
        # Unknown keys yield the default
        for key in range(10, 15):
            value = cache.get(key, -1)
            self.assertEqual(value, -1)

    def test_iterkeys(self):
        """
        Iterating on the cache yields its keys
        """
        # 10 values, the first 5 being permanent
        cache = self._cache_with_permanents(ceiling=20)
        self._check_usage(cache, 0, 10, 5)
        keys = sorted(cache)
        for index in range(10):
            self.assertEqual(index, keys[index])

    def test_items(self):
        """
        Iterating on items yields the key-value couples of the cache
        """
        # 10 values, the first 5 being permanent
        cache = self._cache_with_permanents(ceiling=20)
        self._check_usage(cache, 0, 10, 5)
        content = sorted(cache.items())
        for index in range(10):
            couple = content[index]
            self.assertEqual(index, couple[0])
            self.assertEqual(index, couple[1])
|
193 |
|
194 |
|
195 class UStringIOTC(TestCase): |
|
196 def test_boolean_value(self): |
|
197 self.assertTrue(UStringIO()) |
|
198 |
|
199 |
|
class RepeatListTC(TestCase):
    """Checks that RepeatList behaves like a list repeating a single item."""

    def test_base(self):
        item = (1, 3)
        repeated = RepeatList(3, item)
        self.assertEqual(repeated[0], item)
        self.assertEqual(repeated[2], item)
        self.assertEqual(repeated[-1], item)
        self.assertEqual(len(repeated), 3)
        # XXX an index past the declared size still returns the item
        self.assertEqual(repeated[4], item)

        # a list of size 0 evaluates as false
        self.assertFalse(RepeatList(0, None))

    def test_slice(self):
        repeated = RepeatList(3, (1, 3))
        three_items = [(1, 3)] * 3
        self.assertEqual(repeated[0:1], [(1, 3)])
        # a slice running past the end is truncated to the size
        self.assertEqual(repeated[0:4], three_items)
        self.assertEqual(repeated[:], three_items)

    def test_iter(self):
        as_list = list(RepeatList(3, (1, 3)))
        self.assertEqual(as_list, [(1, 3)] * 3)

    def test_add(self):
        repeated = RepeatList(3, (1, 3))
        three_items = [(1, 3)] * 3
        # concatenation with a plain list, on either side
        self.assertEqual(repeated + [(1, 4)], three_items + [(1, 4)])
        self.assertEqual([(1, 4)] + repeated, [(1, 4)] + three_items)
        # concatenation with a RepeatList holding another item
        self.assertEqual(repeated + RepeatList(2, (2, 3)),
                         three_items + [(2, 3)] * 2)

        # concatenation with a RepeatList holding the same item
        total = repeated + RepeatList(2, (1, 3))
        self.assertIsInstance(total, RepeatList)
        self.assertEqual(len(total), 5)
        self.assertEqual(total[0], (1, 3))

        # concatenation with a plain list holding the same item
        total = repeated + [(1, 3)] * 2
        self.assertEqual(total, [(1, 3)] * 5)

    def test_eq(self):
        expected = [(1, 3)] * 3
        self.assertEqual(RepeatList(3, (1, 3)), expected)

    def test_pop(self):
        repeated = RepeatList(3, (1, 3))
        repeated.pop(2)
        self.assertEqual(repeated, [(1, 3)] * 2)
|
245 |
|
246 |
|
class JSONEncoderTC(TestCase):
    """Checks the JSON output of CubicWebJsonEncoder for various value types."""

    def setUp(self):
        # `json` is None when importing it from cubicweb.utils failed.
        if json is None:
            self.skipTest('json not available')

    def encode(self, value):
        """Return `value` dumped as a JSON document by CubicWebJsonEncoder."""
        return json.dumps(value, cls=CubicWebJsonEncoder)

    def test_encoding_dates(self):
        self.assertEqual(self.encode(datetime.datetime(2009, 9, 9, 20, 30)),
                         '"2009/09/09 20:30:00"')
        self.assertEqual(self.encode(datetime.date(2009, 9, 9)),
                         '"2009/09/09"')
        self.assertEqual(self.encode(datetime.time(20, 30)),
                         '"20:30:00"')

    def test_encoding_decimal(self):
        self.assertEqual(self.encode(decimal.Decimal('1.2')), '1.2')

    def _bare_entity(self):
        # Entity built without a request, carrying one cached attribute
        # and an eid.
        entity = Entity(None)
        entity.cw_attr_cache['pouet'] = 'hop'
        entity.eid = 2
        return entity

    def test_encoding_bare_entity(self):
        entity = self._bare_entity()
        self.assertEqual(json.loads(self.encode(entity)),
                         {'pouet': 'hop', 'eid': 2})

    def test_encoding_entity_in_list(self):
        entity = self._bare_entity()
        self.assertEqual(json.loads(self.encode([entity])),
                         [{'pouet': 'hop', 'eid': 2}])

    def test_encoding_binary(self):
        for content in (b'he he', b'h\xe9 hxe9'):
            with self.subTest(content=content):
                encoded = self.encode(Binary(content))
                # Parse the JSON document before base64-decoding it: feeding
                # the raw document to b64decode only works because the
                # surrounding quotes are silently discarded as non-alphabet
                # characters, which would also hide malformed output.
                payload = json.loads(encoded)
                self.assertEqual(base64.b64decode(payload), content)

    def test_encoding_unknown_stuff(self):
        # A value the encoder has no rule for is dumped as null.
        self.assertEqual(self.encode(TestCase), 'null')
|
288 |
|
289 |
|
class HTMLHeadTC(CubicWebTC):
    """Checks URL grouping/concatenation and the rendering of HTMLHead."""

    def htmlhead(self, datadir_url):
        """Return an HTMLHead bound to a web request whose `datadir_url`
        attribute is set to `datadir_url`.
        """
        with self.admin_access.web_request() as req:
            # Honour the argument instead of a hard-coded URL, so the helper
            # really builds a head for the data directory asked by the caller.
            req.datadir_url = datadir_url
            head = HTMLHead(req)
            return head

    def _add_resources(self, head, base_url):
        # Register the javascript and css resources shared by the getvalue
        # tests: local and external scripts, 'all' and 'print' stylesheets,
        # and two IE-specific stylesheets.
        head.add_js(base_url + u'bob0.js')
        head.add_js(base_url + u'bob1.js')
        head.add_js(u'http://ext.com/bob2.js')
        head.add_js(u'http://ext.com/bob3.js')
        head.add_css(base_url + u'bob4.css')
        head.add_css(base_url + u'bob5.css')
        head.add_css(base_url + u'bob6.css', 'print')
        head.add_css(base_url + u'bob7.css', 'print')
        head.add_ie_css(base_url + u'bob8.css')
        head.add_ie_css(base_url + u'bob9.css', 'print', u'[if lt IE 7]')

    def test_concat_urls(self):
        base_url = u'http://test.fr/data/'
        head = self.htmlhead(base_url)
        urls = [base_url + u'bob1.js',
                base_url + u'bob2.js',
                base_url + u'bob3.js']
        result = head.concat_urls(urls)
        expected = u'http://test.fr/data/??bob1.js,bob2.js,bob3.js'
        self.assertEqual(result, expected)

    def test_group_urls(self):
        base_url = u'http://test.fr/data/'
        head = self.htmlhead(base_url)
        urls_spec = [(base_url + u'bob0.js', None),
                     (base_url + u'bob1.js', None),
                     (u'http://ext.com/bob2.js', None),
                     (u'http://ext.com/bob3.js', None),
                     (base_url + u'bob4.css', 'all'),
                     (base_url + u'bob5.css', 'all'),
                     (base_url + u'bob6.css', 'print'),
                     (base_url + u'bob7.css', 'print'),
                     (base_url + u'bob8.css', ('all', u'[if IE 8]')),
                     (base_url + u'bob9.css', ('print', u'[if IE 8]'))
                     ]
        result = head.group_urls(urls_spec)
        # Expected: consecutive data-directory URLs sharing the same spec are
        # merged, external URLs and IE-conditional entries are left alone.
        expected = [(base_url + u'??bob0.js,bob1.js', None),
                    (u'http://ext.com/bob2.js', None),
                    (u'http://ext.com/bob3.js', None),
                    (base_url + u'??bob4.css,bob5.css', 'all'),
                    (base_url + u'??bob6.css,bob7.css', 'print'),
                    (base_url + u'bob8.css', ('all', u'[if IE 8]')),
                    (base_url + u'bob9.css', ('print', u'[if IE 8]'))
                    ]
        self.assertEqual(list(result), expected)

    def test_getvalue_with_concat(self):
        self.config.global_set_option('concat-resources', True)
        base_url = u'http://test.fr/data/'
        head = self.htmlhead(base_url)
        self._add_resources(head, base_url)
        result = head.getvalue()
        expected = u"""<head>
<link rel="stylesheet" type="text/css" media="all" href="http://test.fr/data/??bob4.css,bob5.css"/>
<link rel="stylesheet" type="text/css" media="print" href="http://test.fr/data/??bob6.css,bob7.css"/>
<!--[if lt IE 8]>
<link rel="stylesheet" type="text/css" media="all" href="http://test.fr/data/bob8.css"/>
<!--[if lt IE 7]>
<link rel="stylesheet" type="text/css" media="print" href="http://test.fr/data/bob9.css"/>
<![endif]-->
<script type="text/javascript" src="http://test.fr/data/??bob0.js,bob1.js"></script>
<script type="text/javascript" src="http://ext.com/bob2.js"></script>
<script type="text/javascript" src="http://ext.com/bob3.js"></script>
</head>
"""
        self.assertEqual(result, expected)

    def test_getvalue_without_concat(self):
        self.config.global_set_option('concat-resources', False)
        try:
            base_url = u'http://test.fr/data/'
            head = self.htmlhead(base_url)
            self._add_resources(head, base_url)
            result = head.getvalue()
            expected = u"""<head>
<link rel="stylesheet" type="text/css" media="all" href="http://test.fr/data/bob4.css"/>
<link rel="stylesheet" type="text/css" media="all" href="http://test.fr/data/bob5.css"/>
<link rel="stylesheet" type="text/css" media="print" href="http://test.fr/data/bob6.css"/>
<link rel="stylesheet" type="text/css" media="print" href="http://test.fr/data/bob7.css"/>
<!--[if lt IE 8]>
<link rel="stylesheet" type="text/css" media="all" href="http://test.fr/data/bob8.css"/>
<!--[if lt IE 7]>
<link rel="stylesheet" type="text/css" media="print" href="http://test.fr/data/bob9.css"/>
<![endif]-->
<script type="text/javascript" src="http://test.fr/data/bob0.js"></script>
<script type="text/javascript" src="http://test.fr/data/bob1.js"></script>
<script type="text/javascript" src="http://ext.com/bob2.js"></script>
<script type="text/javascript" src="http://ext.com/bob3.js"></script>
</head>
"""
            self.assertEqual(result, expected)
        finally:
            # put the option back to True whatever the outcome, so other
            # tests are not affected by this one
            self.config.global_set_option('concat-resources', True)
|
399 |
|
400 |
|
# This must be a class: declared with `def`, it was a plain function whose
# nested test methods were never collected nor run by the test loader.
class UnauthorizedTC(TestCase):
    """Checks the text conversion of Unauthorized exceptions."""

    def _test(self, func):
        # `func` turns an exception into text (e.g. `str`); the expected
        # message depends on how many arguments the exception was given.
        self.assertEqual(func(Unauthorized()),
                         'You are not allowed to perform this operation')
        self.assertEqual(func(Unauthorized('a')),
                         'a')
        self.assertEqual(func(Unauthorized('a', 'b')),
                         'You are not allowed to perform a operation on b')
        self.assertEqual(func(Unauthorized('a', 'b', 'c')),
                         'a b c')

    def test_str(self):
        self._test(str)
|
415 |
|
416 |
|
417 |
|
def load_tests(loader, tests, ignore):
    """unittest `load_tests` hook: add the doctests of cubicweb.utils to
    the tests of this module and return the resulting suite.
    """
    import cubicweb.utils
    doctest_suite = doctest.DocTestSuite(cubicweb.utils)
    tests.addTests(doctest_suite)
    return tests
|
422 |
|
423 |
|
if __name__ == '__main__':
    # Run the tests of this module when the file is executed as a script.
    from unittest import main as run_tests
    run_tests()
|