# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
|
18 """unit tests for selectors mechanism""" |
|
19 from __future__ import with_statement |
|
20 |
|
21 from operator import eq, lt, le, gt |
|
22 from logilab.common.testlib import TestCase, unittest_main |
|
23 |
|
24 from cubicweb import Binary |
|
25 from cubicweb.devtools.testlib import CubicWebTC |
|
26 from cubicweb.appobject import Selector, AndSelector, OrSelector |
|
27 from cubicweb.selectors import (is_instance, adaptable, match_kwargs, match_user_groups, |
|
28 multi_lines_rset, score_entity, is_in_state, |
|
29 on_transition, rql_condition, relation_possible) |
|
30 from cubicweb.web import action |
|
31 |
|
32 |
|
class _1_(Selector):
    """Stub selector whose call always returns a score of 1."""

    def __call__(self, *args, **kwargs):
        # arguments are accepted but ignored: the score is constant
        return 1
|
36 |
|
class _0_(Selector):
    """Stub selector whose call always returns a score of 0."""

    def __call__(self, *args, **kwargs):
        # arguments are accepted but ignored: the score is constant
        return 0
|
40 |
|
41 def _2_(*args, **kwargs): |
|
42 return 2 |
|
43 |
|
44 |
|
class SelectorsTC(TestCase):
    """Check how selectors combine through ``&``, ``|``, ``&=`` and ``|=``.

    The constant stubs ``_1_``, ``_0_`` and ``_2_`` defined in this module
    make the expected scores easy to read.
    """

    def test_basic_and(self):
        """``&`` of two selectors: scores add up, a 0 on either side gives 0."""
        selector = _1_() & _1_()
        self.assertEqual(selector(None), 2)
        selector = _1_() & _0_()
        self.assertEqual(selector(None), 0)
        selector = _0_() & _1_()
        self.assertEqual(selector(None), 0)

    def test_basic_or(self):
        """``|`` of two selectors: 1 as soon as one side scores, else 0."""
        selector = _1_() | _1_()
        self.assertEqual(selector(None), 1)
        selector = _1_() | _0_()
        self.assertEqual(selector(None), 1)
        selector = _0_() | _1_()
        self.assertEqual(selector(None), 1)
        selector = _0_() | _0_()
        self.assertEqual(selector(None), 0)

    def test_selector_and_function(self):
        """A plain function may sit on either side of ``&``."""
        selector = _1_() & _2_
        self.assertEqual(selector(None), 3)
        selector = _2_ & _1_()
        self.assertEqual(selector(None), 3)

    def test_three_and(self):
        """Chained ``&`` over three selectors."""
        selector = _1_() & _1_() & _1_()
        self.assertEqual(selector(None), 3)
        selector = _1_() & _0_() & _1_()
        self.assertEqual(selector(None), 0)
        selector = _0_() & _1_() & _1_()
        self.assertEqual(selector(None), 0)

    def test_three_or(self):
        """Chained ``|`` over three selectors."""
        selector = _1_() | _1_() | _1_()
        self.assertEqual(selector(None), 1)
        selector = _1_() | _0_() | _1_()
        self.assertEqual(selector(None), 1)
        selector = _0_() | _1_() | _1_()
        self.assertEqual(selector(None), 1)
        selector = _0_() | _0_() | _0_()
        self.assertEqual(selector(None), 0)

    def test_composition(self):
        """Nested combinations: resulting type, child count and score."""
        # and-of-ands is expected to hold the four leaf selectors directly
        selector = (_1_() & _1_()) & (_1_() & _1_())
        self.assertIsInstance(selector, AndSelector)
        self.assertEqual(len(selector.selectors), 4)
        self.assertEqual(selector(None), 4)
        # or-of-ands keeps its two and-selectors as children
        selector = (_1_() & _0_()) | (_1_() & _1_())
        self.assertIsInstance(selector, OrSelector)
        self.assertEqual(len(selector.selectors), 2)
        self.assertEqual(selector(None), 2)

    def test_search_selectors(self):
        """``search_selector`` accepts a class or a tuple of classes."""
        sel = is_instance('something')
        self.assertIs(sel.search_selector(is_instance), sel)
        csel = AndSelector(sel, Selector())
        self.assertIs(csel.search_selector(is_instance), sel)
        csel = AndSelector(Selector(), sel)
        self.assertIs(csel.search_selector(is_instance), sel)
        # the composite itself is returned when its own class is searched
        self.assertIs(csel.search_selector((AndSelector, OrSelector)), csel)
        self.assertIs(csel.search_selector((OrSelector, AndSelector)), csel)
        # order inside the tuple does not matter
        self.assertIs(csel.search_selector((is_instance, score_entity)), sel)
        self.assertIs(csel.search_selector((score_entity, is_instance)), sel)

    def test_inplace_and(self):
        """``&=`` gives the same scores as chained ``&``."""
        selector = _1_()
        selector &= _1_()
        selector &= _1_()
        self.assertEqual(selector(None), 3)
        selector = _1_()
        selector &= _0_()
        selector &= _1_()
        self.assertEqual(selector(None), 0)
        selector = _0_()
        selector &= _1_()
        selector &= _1_()
        self.assertEqual(selector(None), 0)
        selector = _0_()
        selector &= _0_()
        selector &= _0_()
        self.assertEqual(selector(None), 0)

    def test_inplace_or(self):
        """``|=`` gives the same scores as chained ``|``."""
        selector = _1_()
        selector |= _1_()
        selector |= _1_()
        self.assertEqual(selector(None), 1)
        selector = _1_()
        selector |= _0_()
        selector |= _1_()
        self.assertEqual(selector(None), 1)
        selector = _0_()
        selector |= _1_()
        selector |= _1_()
        self.assertEqual(selector(None), 1)
        selector = _0_()
        selector |= _0_()
        selector |= _0_()
        self.assertEqual(selector(None), 0)
|
145 |
|
146 |
|
class ImplementsSelectorTC(CubicWebTC):
    """Tests for the ``is_instance`` and ``adaptable`` selectors."""

    def test_etype_priority(self):
        """For a File: is_instance('Any') < adaptable < is_instance('File')."""
        req = self.request()
        f = req.create_entity('File', data_name=u'hop.txt', data=Binary('hop'))
        rset = f.as_rset()
        anyscore = is_instance('Any')(f.__class__, req, rset=rset)
        idownscore = adaptable('IDownloadable')(f.__class__, req, rset=rset)
        self.assertGreater(idownscore, anyscore)
        filescore = is_instance('File')(f.__class__, req, rset=rset)
        self.assertGreater(filescore, idownscore)

    def test_etype_inheritance_no_yams_inheritance(self):
        """The 'Personne' class must not match is_instance('Societe')."""
        cls = self.vreg['etypes'].etype_class('Personne')
        self.assertFalse(is_instance('Societe').score_class(cls, self.request()))

    def test_yams_inheritance(self):
        """The 'Transition' class scores 3 against is_instance('BaseTransition')."""
        cls = self.vreg['etypes'].etype_class('Transition')
        self.assertEqual(is_instance('BaseTransition').score_class(cls, self.request()),
                         3)

    def test_outer_join(self):
        """is_instance scores 0 on the optional ``B?`` column of the query."""
        req = self.request()
        rset = req.execute('Any U,B WHERE B? bookmarked_by U, U login "anon"')
        # NOTE(review): presumably the Bookmark cell is empty for "anon" -- confirm
        self.assertEqual(is_instance('Bookmark')(None, req, rset=rset, row=0, col=1),
                         0)
|
172 |
|
173 |
|
class WorkflowSelectorTC(CubicWebTC):
    """Tests for the workflow selectors ``is_in_state`` and ``on_transition``.

    The fixture workflow "wf_test" on 'StateFull' has three states (created,
    validated, abandoned) and two transitions (validate, forsake).
    """

    def _commit(self):
        """Commit, then clear every cache of the workflowable test entity."""
        self.commit()
        # NOTE(review): presumably so later state reads are not served from
        # a stale cache -- confirm
        self.wf_entity.cw_clear_all_caches()

    def setup_database(self):
        """Create the "wf_test" default workflow for the 'StateFull' type."""
        wf = self.shell().add_workflow("wf_test", 'StateFull', default=True)
        created = wf.add_state('created', initial=True)
        validated = wf.add_state('validated')
        abandoned = wf.add_state('abandoned')
        wf.add_transition('validate', created, validated, ('managers',))
        wf.add_transition('forsake', (created, validated,), abandoned, ('managers',))

    def setUp(self):
        """Create a 'StateFull' entity in state 'created' and enable debug mode."""
        super(WorkflowSelectorTC, self).setUp()
        self.req = self.request()
        self.wf_entity = self.req.create_entity('StateFull', name=u'')
        self.rset = self.wf_entity.as_rset()
        self.adapter = self.wf_entity.cw_adapt_to('IWorkflowable')
        self._commit()
        self.assertEqual(self.adapter.state, 'created')
        # enable debug mode to state/transition validation on the fly
        self.vreg.config.debugmode = True

    def tearDown(self):
        """Turn debug mode back off before the regular teardown."""
        self.vreg.config.debugmode = False
        super(WorkflowSelectorTC, self).tearDown()

    def test_is_in_state(self):
        """``is_in_state`` follows the entity through created/validated/abandoned."""
        for state in ('created', 'validated', 'abandoned'):
            selector = is_in_state(state)
            # only 'created' matches before any transition is fired
            self.assertEqual(selector(None, self.req, rset=self.rset),
                             state == "created")

        self.adapter.fire_transition('validate')
        self._commit()
        self.assertEqual(self.adapter.state, 'validated')

        selector = is_in_state('created')
        self.assertEqual(selector(None, self.req, rset=self.rset), 0)
        selector = is_in_state('validated')
        self.assertEqual(selector(None, self.req, rset=self.rset), 1)
        selector = is_in_state('validated', 'abandoned')
        self.assertEqual(selector(None, self.req, rset=self.rset), 1)
        selector = is_in_state('abandoned')
        self.assertEqual(selector(None, self.req, rset=self.rset), 0)

        self.adapter.fire_transition('forsake')
        self._commit()
        self.assertEqual(self.adapter.state, 'abandoned')

        selector = is_in_state('created')
        self.assertEqual(selector(None, self.req, rset=self.rset), 0)
        selector = is_in_state('validated')
        self.assertEqual(selector(None, self.req, rset=self.rset), 0)
        selector = is_in_state('validated', 'abandoned')
        self.assertEqual(selector(None, self.req, rset=self.rset), 1)
        self.assertEqual(self.adapter.state, 'abandoned')
        self.assertEqual(selector(None, self.req, rset=self.rset), 1)
        # the single-state selector was never checked in the final state,
        # unlike in the 'validated' section above
        selector = is_in_state('abandoned')
        self.assertEqual(selector(None, self.req, rset=self.rset), 1)

    def test_is_in_state_unvalid_names(self):
        """Unknown state names raise ValueError listing each bad name once."""
        selector = is_in_state("unknown")
        with self.assertRaises(ValueError) as cm:
            selector(None, self.req, rset=self.rset)
        self.assertEqual(str(cm.exception),
                         "wf_test: unknown state(s): unknown")
        # "weird" is given twice but reported once; valid "created" is omitted
        selector = is_in_state("weird", "unknown", "created", "weird")
        with self.assertRaises(ValueError) as cm:
            selector(None, self.req, rset=self.rset)
        self.assertEqual(str(cm.exception),
                         "wf_test: unknown state(s): unknown,weird")

    def test_on_transition(self):
        """``on_transition`` matches the transition fired last."""
        for transition in ('validate', 'forsake'):
            selector = on_transition(transition)
            # nothing has been fired yet
            self.assertEqual(selector(None, self.req, rset=self.rset), 0)

        self.adapter.fire_transition('validate')
        self._commit()
        self.assertEqual(self.adapter.state, 'validated')

        selector = on_transition("validate")
        self.assertEqual(selector(None, self.req, rset=self.rset), 1)
        selector = on_transition("validate", "forsake")
        self.assertEqual(selector(None, self.req, rset=self.rset), 1)
        selector = on_transition("forsake")
        self.assertEqual(selector(None, self.req, rset=self.rset), 0)

        self.adapter.fire_transition('forsake')
        self._commit()
        self.assertEqual(self.adapter.state, 'abandoned')

        selector = on_transition("validate")
        self.assertEqual(selector(None, self.req, rset=self.rset), 0)
        selector = on_transition("validate", "forsake")
        self.assertEqual(selector(None, self.req, rset=self.rset), 1)
        selector = on_transition("forsake")
        self.assertEqual(selector(None, self.req, rset=self.rset), 1)

    def test_on_transition_unvalid_names(self):
        """Unknown transition names raise ValueError listing each bad name once."""
        selector = on_transition("unknown")
        with self.assertRaises(ValueError) as cm:
            selector(None, self.req, rset=self.rset)
        self.assertEqual(str(cm.exception),
                         "wf_test: unknown transition(s): unknown")
        # "weird" is given twice but reported once; valid "validate" is omitted
        selector = on_transition("weird", "unknown", "validate", "weird")
        with self.assertRaises(ValueError) as cm:
            selector(None, self.req, rset=self.rset)
        self.assertEqual(str(cm.exception),
                         "wf_test: unknown transition(s): unknown,weird")

    def test_on_transition_with_no_effect(self):
        """selector will not be triggered with `change_state()`"""
        self.adapter.change_state('validated')
        self._commit()
        self.assertEqual(self.adapter.state, 'validated')

        selector = on_transition("validate")
        self.assertEqual(selector(None, self.req, rset=self.rset), 0)
        selector = on_transition("validate", "forsake")
        self.assertEqual(selector(None, self.req, rset=self.rset), 0)
        selector = on_transition("forsake")
        self.assertEqual(selector(None, self.req, rset=self.rset), 0)
|
297 |
|
298 |
|
class RelationPossibleTC(CubicWebTC):
    """Tests for ``relation_possible`` applied to a parsed RQL select node."""

    def test_rqlst_1(self):
        """'in_group' is possible for X in a simple CWUser query."""
        req = self.request()
        selector = relation_possible('in_group')
        select = self.vreg.parse(req, 'Any X WHERE X is CWUser').children[0]
        score = selector(None, req, rset=1,
                         select=select, filtered_variable=select.defined_vars['X'])
        self.assertEqual(score, 1)

    def test_rqlst_2(self):
        """Same check on a query with an aggregate and a HAVING clause."""
        req = self.request()
        selector = relation_possible('in_group')
        select = self.vreg.parse(req, 'Any 1, COUNT(X) WHERE X is CWUser, X creation_date XD, '
                                 'Y creation_date YD, Y is CWGroup '
                                 'HAVING DAY(XD)=DAY(YD)').children[0]
        score = selector(None, req, rset=1,
                         select=select, filtered_variable=select.defined_vars['X'])
        self.assertEqual(score, 1)
|
318 |
|
319 |
|
class MatchUserGroupsTC(CubicWebTC):
    """Tests for the ``match_user_groups`` selector."""

    def test_owners_group(self):
        """tests usage of 'owners' group with match_user_group"""
        class SomeAction(action.Action):
            __regid__ = 'yo'
            category = 'foo'
            __select__ = match_user_groups('owners')
        # register the throw-away action by hand for the duration of the test
        self.vreg._loadedmods[__name__] = {}
        self.vreg.register(SomeAction)
        SomeAction.__registered__(self.vreg['actions'])
        self.assertIn(SomeAction, self.vreg['actions']['yo'])
        try:
            # login as a simple user
            req = self.request()
            self.create_user(req, 'john')
            self.login('john')
            # it should not be possible to use SomeAction on objects john
            # does not own
            req = self.request()
            rset = req.execute('Any G WHERE G is CWGroup, G name "managers"')
            self.assertNotIn('yo', dict(self.pactions(req, rset)))
            # insert a new card, and check that we can use SomeAction on our object
            self.execute('INSERT Card C: C title "zoubidou"')
            self.commit()
            req = self.request()
            rset = req.execute('Card C WHERE C title "zoubidou"')
            self.assertIn('yo', dict(self.pactions(req, rset)))
            # make sure even managers can't use the action
            self.restore_connection()
            req = self.request()
            rset = req.execute('Card C WHERE C title "zoubidou"')
            self.assertNotIn('yo', dict(self.pactions(req, rset)))
        finally:
            # always unregister so the action cannot leak into other tests
            del self.vreg[SomeAction.__registry__][SomeAction.__regid__]
|
353 |
|
354 |
|
class MultiLinesRsetSelectorTC(CubicWebTC):
    """Tests for ``multi_lines_rset``; also hosts the ``match_kwargs`` tests."""

    def setUp(self):
        """Insert two extra groups and keep a result set of every CWGroup."""
        super(MultiLinesRsetSelectorTC, self).setUp()
        self.req = self.request()
        self.req.execute('INSERT CWGroup G: G name "group1"')
        self.req.execute('INSERT CWGroup G: G name "group2"')
        self.commit()
        self.rset = self.req.execute('Any G WHERE G is CWGroup')

    def test_default_op_in_selector(self):
        """Without an operator, only an exact line count scores 1."""
        expected = len(self.rset)
        selector = multi_lines_rset(expected)
        self.assertEqual(selector(None, self.req, rset=self.rset), 1)
        self.assertEqual(selector(None, self.req, None), 0)
        selector = multi_lines_rset(expected + 1)
        self.assertEqual(selector(None, self.req, rset=self.rset), 0)
        self.assertEqual(selector(None, self.req, None), 0)
        selector = multi_lines_rset(expected - 1)
        self.assertEqual(selector(None, self.req, rset=self.rset), 0)
        self.assertEqual(selector(None, self.req, None), 0)

    def test_without_rset(self):
        """A ``None`` result set always scores 0."""
        expected = len(self.rset)
        selector = multi_lines_rset(expected)
        self.assertEqual(selector(None, self.req, None), 0)
        selector = multi_lines_rset(expected + 1)
        self.assertEqual(selector(None, self.req, None), 0)
        selector = multi_lines_rset(expected - 1)
        self.assertEqual(selector(None, self.req, None), 0)

    def test_with_operators(self):
        """Explicit comparison operators (eq, le, gt, lt).

        The table below implies the operator is applied as
        ``op(len(rset), expected)``.
        """
        size = len(self.rset)

        # Format 'expected', 'operator', 'assert'
        testdata = (
            (size, eq, 1),
            (size + 1, eq, 0),
            (size - 1, eq, 0),
            (size, le, 1),
            (size + 1, le, 1),
            (size - 1, le, 0),
            (size - 1, gt, 1),
            (size, gt, 0),
            (size + 1, gt, 0),
            (size + 1, lt, 1),
            (size, lt, 0),
            (size - 1, lt, 0),
        )

        # plain loop rather than a generative (yield) test, so the assertions
        # also run under runners without generative-test support
        for expected, op, assertion in testdata:
            selector = multi_lines_rset(expected, op)
            self.assertEqual(selector(None, self.req, rset=self.rset), assertion,
                             'multi_lines_rset(%s, %s) with %s lines'
                             % (expected, op.__name__, size))

    def test_match_kwargs_default(self):
        """By default every expected keyword must be given."""
        selector = match_kwargs(set(('a', 'b')))
        self.assertEqual(selector(None, None, a=1, b=2), 2)
        self.assertEqual(selector(None, None, a=1), 0)
        self.assertEqual(selector(None, None, c=1), 0)
        self.assertEqual(selector(None, None, a=1, c=1), 0)

    def test_match_kwargs_any(self):
        """With mode='any' the score is the number of expected keywords given."""
        selector = match_kwargs(set(('a', 'b')), mode='any')
        self.assertEqual(selector(None, None, a=1, b=2), 2)
        self.assertEqual(selector(None, None, a=1), 1)
        self.assertEqual(selector(None, None, c=1), 0)
        self.assertEqual(selector(None, None, a=1, c=1), 1)
|
419 |
|
420 |
|
class ScoreEntitySelectorTC(CubicWebTC):
    """Tests for the ``score_entity`` and ``rql_condition`` selectors."""

    def test_intscore_entity_selector(self):
        """Scorer results map to scores: None is 0, other non-int values are 1."""
        req = self.request()
        rset = req.execute('Any E WHERE E eid 1')
        selector = score_entity(lambda x: None)
        self.assertEqual(selector(None, req, rset=rset), 0)
        selector = score_entity(lambda x: "something")
        self.assertEqual(selector(None, req, rset=rset), 1)
        selector = score_entity(lambda x: object)
        self.assertEqual(selector(None, req, rset=rset), 1)
        # with two rows, integer scores add up unless mode='any' is given
        rset = req.execute('Any G LIMIT 2 WHERE G is CWGroup')
        selector = score_entity(lambda x: 10)
        self.assertEqual(selector(None, req, rset=rset), 20)
        selector = score_entity(lambda x: 10, mode='any')
        self.assertEqual(selector(None, req, rset=rset), 10)

    def test_rql_condition_entity(self):
        """'X identity U' matches the user, given as rset or as entity."""
        req = self.request()
        selector = rql_condition('X identity U')
        rset = req.user.as_rset()
        self.assertEqual(selector(None, req, rset=rset), 1)
        self.assertEqual(selector(None, req, entity=req.user), 1)
        # neither rset nor entity given: nothing to match against
        self.assertEqual(selector(None, req), 0)

    def test_rql_condition_user(self):
        """With user_condition=True the condition is checked on the request user."""
        req = self.request()
        selector = rql_condition('U login "admin"', user_condition=True)
        self.assertEqual(selector(None, req), 1)
        selector = rql_condition('U login "toto"', user_condition=True)
        self.assertEqual(selector(None, req), 0)
|
452 |
|
if __name__ == '__main__':
    # run this module's tests through logilab's runner when executed directly
    unittest_main()
|
455 |
|