test/unittest_selectors.py
brancholdstable
changeset 8746 88c71ad83d47
parent 8507 0c111b232927
parent 8742 bd374bd906f3
child 8818 d8b0984c923c
equal deleted inserted replaced
8507:0c111b232927 8746:88c71ad83d47
     1 # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """unit tests for selectors mechanism"""
       
    19 from __future__ import with_statement
       
    20 
       
    21 from operator import eq, lt, le, gt
       
    22 from logilab.common.testlib import TestCase, unittest_main
       
    23 
       
    24 from cubicweb import Binary
       
    25 from cubicweb.devtools.testlib import CubicWebTC
       
    26 from cubicweb.appobject import Selector, AndSelector, OrSelector
       
    27 from cubicweb.selectors import (is_instance, adaptable, match_kwargs, match_user_groups,
       
    28                                 multi_lines_rset, score_entity, is_in_state,
       
    29                                 on_transition, rql_condition, relation_possible)
       
    30 from cubicweb.web import action
       
    31 
       
    32 
       
    33 class _1_(Selector):
       
    34     def __call__(self, *args, **kwargs):
       
    35         return 1
       
    36 
       
    37 class _0_(Selector):
       
    38     def __call__(self, *args, **kwargs):
       
    39         return 0
       
    40 
       
    41 def _2_(*args, **kwargs):
       
    42     return 2
       
    43 
       
    44 
       
    45 class SelectorsTC(TestCase):
       
    46     def test_basic_and(self):
       
    47         selector = _1_() & _1_()
       
    48         self.assertEqual(selector(None), 2)
       
    49         selector = _1_() & _0_()
       
    50         self.assertEqual(selector(None), 0)
       
    51         selector = _0_() & _1_()
       
    52         self.assertEqual(selector(None), 0)
       
    53 
       
    54     def test_basic_or(self):
       
    55         selector = _1_() | _1_()
       
    56         self.assertEqual(selector(None), 1)
       
    57         selector = _1_() | _0_()
       
    58         self.assertEqual(selector(None), 1)
       
    59         selector = _0_() | _1_()
       
    60         self.assertEqual(selector(None), 1)
       
    61         selector = _0_() | _0_()
       
    62         self.assertEqual(selector(None), 0)
       
    63 
       
    64     def test_selector_and_function(self):
       
    65         selector = _1_() & _2_
       
    66         self.assertEqual(selector(None), 3)
       
    67         selector = _2_ & _1_()
       
    68         self.assertEqual(selector(None), 3)
       
    69 
       
    70     def test_three_and(self):
       
    71         selector = _1_() & _1_() & _1_()
       
    72         self.assertEqual(selector(None), 3)
       
    73         selector = _1_() & _0_() & _1_()
       
    74         self.assertEqual(selector(None), 0)
       
    75         selector = _0_() & _1_() & _1_()
       
    76         self.assertEqual(selector(None), 0)
       
    77 
       
    78     def test_three_or(self):
       
    79         selector = _1_() | _1_() | _1_()
       
    80         self.assertEqual(selector(None), 1)
       
    81         selector = _1_() | _0_() | _1_()
       
    82         self.assertEqual(selector(None), 1)
       
    83         selector = _0_() | _1_() | _1_()
       
    84         self.assertEqual(selector(None), 1)
       
    85         selector = _0_() | _0_() | _0_()
       
    86         self.assertEqual(selector(None), 0)
       
    87 
       
    88     def test_composition(self):
       
    89         selector = (_1_() & _1_()) & (_1_() & _1_())
       
    90         self.assertTrue(isinstance(selector, AndSelector))
       
    91         self.assertEqual(len(selector.selectors), 4)
       
    92         self.assertEqual(selector(None), 4)
       
    93         selector = (_1_() & _0_()) | (_1_() & _1_())
       
    94         self.assertTrue(isinstance(selector, OrSelector))
       
    95         self.assertEqual(len(selector.selectors), 2)
       
    96         self.assertEqual(selector(None), 2)
       
    97 
       
    98     def test_search_selectors(self):
       
    99         sel = is_instance('something')
       
   100         self.assertIs(sel.search_selector(is_instance), sel)
       
   101         csel = AndSelector(sel, Selector())
       
   102         self.assertIs(csel.search_selector(is_instance), sel)
       
   103         csel = AndSelector(Selector(), sel)
       
   104         self.assertIs(csel.search_selector(is_instance), sel)
       
   105         self.assertIs(csel.search_selector((AndSelector, OrSelector)), csel)
       
   106         self.assertIs(csel.search_selector((OrSelector, AndSelector)), csel)
       
   107         self.assertIs(csel.search_selector((is_instance, score_entity)),  sel)
       
   108         self.assertIs(csel.search_selector((score_entity, is_instance)), sel)
       
   109 
       
   110     def test_inplace_and(self):
       
   111         selector = _1_()
       
   112         selector &= _1_()
       
   113         selector &= _1_()
       
   114         self.assertEqual(selector(None), 3)
       
   115         selector = _1_()
       
   116         selector &= _0_()
       
   117         selector &= _1_()
       
   118         self.assertEqual(selector(None), 0)
       
   119         selector = _0_()
       
   120         selector &= _1_()
       
   121         selector &= _1_()
       
   122         self.assertEqual(selector(None), 0)
       
   123         selector = _0_()
       
   124         selector &= _0_()
       
   125         selector &= _0_()
       
   126         self.assertEqual(selector(None), 0)
       
   127 
       
   128     def test_inplace_or(self):
       
   129         selector = _1_()
       
   130         selector |= _1_()
       
   131         selector |= _1_()
       
   132         self.assertEqual(selector(None), 1)
       
   133         selector = _1_()
       
   134         selector |= _0_()
       
   135         selector |= _1_()
       
   136         self.assertEqual(selector(None), 1)
       
   137         selector = _0_()
       
   138         selector |= _1_()
       
   139         selector |= _1_()
       
   140         self.assertEqual(selector(None), 1)
       
   141         selector = _0_()
       
   142         selector |= _0_()
       
   143         selector |= _0_()
       
   144         self.assertEqual(selector(None), 0)
       
   145 
       
   146 
       
   147 class ImplementsSelectorTC(CubicWebTC):
       
   148     def test_etype_priority(self):
       
   149         req = self.request()
       
   150         f = req.create_entity('File', data_name=u'hop.txt', data=Binary('hop'))
       
   151         rset = f.as_rset()
       
   152         anyscore = is_instance('Any')(f.__class__, req, rset=rset)
       
   153         idownscore = adaptable('IDownloadable')(f.__class__, req, rset=rset)
       
   154         self.assertTrue(idownscore > anyscore, (idownscore, anyscore))
       
   155         filescore = is_instance('File')(f.__class__, req, rset=rset)
       
   156         self.assertTrue(filescore > idownscore, (filescore, idownscore))
       
   157 
       
   158     def test_etype_inheritance_no_yams_inheritance(self):
       
   159         cls = self.vreg['etypes'].etype_class('Personne')
       
   160         self.assertFalse(is_instance('Societe').score_class(cls, self.request()))
       
   161 
       
   162     def test_yams_inheritance(self):
       
   163         cls = self.vreg['etypes'].etype_class('Transition')
       
   164         self.assertEqual(is_instance('BaseTransition').score_class(cls, self.request()),
       
   165                           3)
       
   166 
       
   167     def test_outer_join(self):
       
   168         req = self.request()
       
   169         rset = req.execute('Any U,B WHERE B? bookmarked_by U, U login "anon"')
       
   170         self.assertEqual(is_instance('Bookmark')(None, req, rset=rset, row=0, col=1),
       
   171                          0)
       
   172 
       
   173 
       
   174 class WorkflowSelectorTC(CubicWebTC):
       
   175     def _commit(self):
       
   176         self.commit()
       
   177         self.wf_entity.cw_clear_all_caches()
       
   178 
       
   179     def setup_database(self):
       
   180         wf = self.shell().add_workflow("wf_test", 'StateFull', default=True)
       
   181         created   = wf.add_state('created', initial=True)
       
   182         validated = wf.add_state('validated')
       
   183         abandoned = wf.add_state('abandoned')
       
   184         wf.add_transition('validate', created, validated, ('managers',))
       
   185         wf.add_transition('forsake', (created, validated,), abandoned, ('managers',))
       
   186 
       
   187     def setUp(self):
       
   188         super(WorkflowSelectorTC, self).setUp()
       
   189         self.req = self.request()
       
   190         self.wf_entity = self.req.create_entity('StateFull', name=u'')
       
   191         self.rset = self.wf_entity.as_rset()
       
   192         self.adapter = self.wf_entity.cw_adapt_to('IWorkflowable')
       
   193         self._commit()
       
   194         self.assertEqual(self.adapter.state, 'created')
       
   195         # enable debug mode to state/transition validation on the fly
       
   196         self.vreg.config.debugmode = True
       
   197 
       
   198     def tearDown(self):
       
   199         self.vreg.config.debugmode = False
       
   200         super(WorkflowSelectorTC, self).tearDown()
       
   201 
       
   202     def test_is_in_state(self):
       
   203         for state in ('created', 'validated', 'abandoned'):
       
   204             selector = is_in_state(state)
       
   205             self.assertEqual(selector(None, self.req, rset=self.rset),
       
   206                              state=="created")
       
   207 
       
   208         self.adapter.fire_transition('validate')
       
   209         self._commit()
       
   210         self.assertEqual(self.adapter.state, 'validated')
       
   211 
       
   212         selector = is_in_state('created')
       
   213         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   214         selector = is_in_state('validated')
       
   215         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   216         selector = is_in_state('validated', 'abandoned')
       
   217         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   218         selector = is_in_state('abandoned')
       
   219         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   220 
       
   221         self.adapter.fire_transition('forsake')
       
   222         self._commit()
       
   223         self.assertEqual(self.adapter.state, 'abandoned')
       
   224 
       
   225         selector = is_in_state('created')
       
   226         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   227         selector = is_in_state('validated')
       
   228         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   229         selector = is_in_state('validated', 'abandoned')
       
   230         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   231         self.assertEqual(self.adapter.state, 'abandoned')
       
   232         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   233 
       
   234     def test_is_in_state_unvalid_names(self):
       
   235         selector = is_in_state("unknown")
       
   236         with self.assertRaises(ValueError) as cm:
       
   237             selector(None, self.req, rset=self.rset)
       
   238         self.assertEqual(str(cm.exception),
       
   239                          "wf_test: unknown state(s): unknown")
       
   240         selector = is_in_state("weird", "unknown", "created", "weird")
       
   241         with self.assertRaises(ValueError) as cm:
       
   242             selector(None, self.req, rset=self.rset)
       
   243         self.assertEqual(str(cm.exception),
       
   244                          "wf_test: unknown state(s): unknown,weird")
       
   245 
       
   246     def test_on_transition(self):
       
   247         for transition in ('validate', 'forsake'):
       
   248             selector = on_transition(transition)
       
   249             self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   250 
       
   251         self.adapter.fire_transition('validate')
       
   252         self._commit()
       
   253         self.assertEqual(self.adapter.state, 'validated')
       
   254 
       
   255         selector = on_transition("validate")
       
   256         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   257         selector = on_transition("validate", "forsake")
       
   258         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   259         selector = on_transition("forsake")
       
   260         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   261 
       
   262         self.adapter.fire_transition('forsake')
       
   263         self._commit()
       
   264         self.assertEqual(self.adapter.state, 'abandoned')
       
   265 
       
   266         selector = on_transition("validate")
       
   267         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   268         selector = on_transition("validate", "forsake")
       
   269         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   270         selector = on_transition("forsake")
       
   271         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   272 
       
   273     def test_on_transition_unvalid_names(self):
       
   274         selector = on_transition("unknown")
       
   275         with self.assertRaises(ValueError) as cm:
       
   276             selector(None, self.req, rset=self.rset)
       
   277         self.assertEqual(str(cm.exception),
       
   278                          "wf_test: unknown transition(s): unknown")
       
   279         selector = on_transition("weird", "unknown", "validate", "weird")
       
   280         with self.assertRaises(ValueError) as cm:
       
   281             selector(None, self.req, rset=self.rset)
       
   282         self.assertEqual(str(cm.exception),
       
   283                          "wf_test: unknown transition(s): unknown,weird")
       
   284 
       
   285     def test_on_transition_with_no_effect(self):
       
   286         """selector will not be triggered with `change_state()`"""
       
   287         self.adapter.change_state('validated')
       
   288         self._commit()
       
   289         self.assertEqual(self.adapter.state, 'validated')
       
   290 
       
   291         selector = on_transition("validate")
       
   292         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   293         selector = on_transition("validate", "forsake")
       
   294         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   295         selector = on_transition("forsake")
       
   296         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   297 
       
   298 
       
   299 class RelationPossibleTC(CubicWebTC):
       
   300 
       
   301     def test_rqlst_1(self):
       
   302         req = self.request()
       
   303         selector = relation_possible('in_group')
       
   304         select = self.vreg.parse(req, 'Any X WHERE X is CWUser').children[0]
       
   305         score = selector(None, req, rset=1,
       
   306                          select=select, filtered_variable=select.defined_vars['X'])
       
   307         self.assertEqual(score, 1)
       
   308 
       
   309     def test_rqlst_2(self):
       
   310         req = self.request()
       
   311         selector = relation_possible('in_group')
       
   312         select = self.vreg.parse(req, 'Any 1, COUNT(X) WHERE X is CWUser, X creation_date XD, '
       
   313                                  'Y creation_date YD, Y is CWGroup '
       
   314                                  'HAVING DAY(XD)=DAY(YD)').children[0]
       
   315         score = selector(None, req, rset=1,
       
   316                          select=select, filtered_variable=select.defined_vars['X'])
       
   317         self.assertEqual(score, 1)
       
   318 
       
   319 
       
   320 class MatchUserGroupsTC(CubicWebTC):
       
   321     def test_owners_group(self):
       
   322         """tests usage of 'owners' group with match_user_group"""
       
   323         class SomeAction(action.Action):
       
   324             __regid__ = 'yo'
       
   325             category = 'foo'
       
   326             __select__ = match_user_groups('owners')
       
   327         self.vreg._loadedmods[__name__] = {}
       
   328         self.vreg.register(SomeAction)
       
   329         SomeAction.__registered__(self.vreg['actions'])
       
   330         self.assertTrue(SomeAction in self.vreg['actions']['yo'], self.vreg['actions'])
       
   331         try:
       
   332             # login as a simple user
       
   333             req = self.request()
       
   334             self.create_user(req, 'john')
       
   335             self.login('john')
       
   336             # it should not be possible to use SomeAction not owned objects
       
   337             req = self.request()
       
   338             rset = req.execute('Any G WHERE G is CWGroup, G name "managers"')
       
   339             self.assertFalse('yo' in dict(self.pactions(req, rset)))
       
   340             # insert a new card, and check that we can use SomeAction on our object
       
   341             self.execute('INSERT Card C: C title "zoubidou"')
       
   342             self.commit()
       
   343             req = self.request()
       
   344             rset = req.execute('Card C WHERE C title "zoubidou"')
       
   345             self.assertTrue('yo' in dict(self.pactions(req, rset)), self.pactions(req, rset))
       
   346             # make sure even managers can't use the action
       
   347             self.restore_connection()
       
   348             req = self.request()
       
   349             rset = req.execute('Card C WHERE C title "zoubidou"')
       
   350             self.assertFalse('yo' in dict(self.pactions(req, rset)))
       
   351         finally:
       
   352             del self.vreg[SomeAction.__registry__][SomeAction.__regid__]
       
   353 
       
   354 
       
   355 class MultiLinesRsetSelectorTC(CubicWebTC):
       
   356     def setUp(self):
       
   357         super(MultiLinesRsetSelectorTC, self).setUp()
       
   358         self.req = self.request()
       
   359         self.req.execute('INSERT CWGroup G: G name "group1"')
       
   360         self.req.execute('INSERT CWGroup G: G name "group2"')
       
   361         self.commit()
       
   362         self.rset = self.req.execute('Any G WHERE G is CWGroup')
       
   363 
       
   364     def test_default_op_in_selector(self):
       
   365         expected = len(self.rset)
       
   366         selector = multi_lines_rset(expected)
       
   367         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   368         self.assertEqual(selector(None, self.req, None), 0)
       
   369         selector = multi_lines_rset(expected + 1)
       
   370         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   371         self.assertEqual(selector(None, self.req, None), 0)
       
   372         selector = multi_lines_rset(expected - 1)
       
   373         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   374         self.assertEqual(selector(None, self.req, None), 0)
       
   375 
       
   376     def test_without_rset(self):
       
   377         expected = len(self.rset)
       
   378         selector = multi_lines_rset(expected)
       
   379         self.assertEqual(selector(None, self.req, None), 0)
       
   380         selector = multi_lines_rset(expected + 1)
       
   381         self.assertEqual(selector(None, self.req, None), 0)
       
   382         selector = multi_lines_rset(expected - 1)
       
   383         self.assertEqual(selector(None, self.req, None), 0)
       
   384 
       
   385     def test_with_operators(self):
       
   386         expected = len(self.rset)
       
   387 
       
   388         # Format     'expected', 'operator', 'assert'
       
   389         testdata = (( expected,         eq,        1),
       
   390                     ( expected+1,       eq,        0),
       
   391                     ( expected-1,       eq,        0),
       
   392                     ( expected,         le,        1),
       
   393                     ( expected+1,       le,        1),
       
   394                     ( expected-1,       le,        0),
       
   395                     ( expected-1,       gt,        1),
       
   396                     ( expected,         gt,        0),
       
   397                     ( expected+1,       gt,        0),
       
   398                     ( expected+1,       lt,        1),
       
   399                     ( expected,         lt,        0),
       
   400                     ( expected-1,       lt,        0))
       
   401 
       
   402         for (expected, operator, assertion) in testdata:
       
   403             selector = multi_lines_rset(expected, operator)
       
   404             yield self.assertEqual, selector(None, self.req, rset=self.rset), assertion
       
   405 
       
   406     def test_match_kwargs_default(self):
       
   407         selector = match_kwargs( set( ('a', 'b') ) )
       
   408         self.assertEqual(selector(None, None, a=1, b=2), 2)
       
   409         self.assertEqual(selector(None, None, a=1), 0)
       
   410         self.assertEqual(selector(None, None, c=1), 0)
       
   411         self.assertEqual(selector(None, None, a=1, c=1), 0)
       
   412 
       
   413     def test_match_kwargs_any(self):
       
   414         selector = match_kwargs( set( ('a', 'b') ), mode='any')
       
   415         self.assertEqual(selector(None, None, a=1, b=2), 2)
       
   416         self.assertEqual(selector(None, None, a=1), 1)
       
   417         self.assertEqual(selector(None, None, c=1), 0)
       
   418         self.assertEqual(selector(None, None, a=1, c=1), 1)
       
   419 
       
   420 
       
   421 class ScoreEntitySelectorTC(CubicWebTC):
       
   422 
       
   423     def test_intscore_entity_selector(self):
       
   424         req = self.request()
       
   425         rset = req.execute('Any E WHERE E eid 1')
       
   426         selector = score_entity(lambda x: None)
       
   427         self.assertEqual(selector(None, req, rset=rset), 0)
       
   428         selector = score_entity(lambda x: "something")
       
   429         self.assertEqual(selector(None, req, rset=rset), 1)
       
   430         selector = score_entity(lambda x: object)
       
   431         self.assertEqual(selector(None, req, rset=rset), 1)
       
   432         rset = req.execute('Any G LIMIT 2 WHERE G is CWGroup')
       
   433         selector = score_entity(lambda x: 10)
       
   434         self.assertEqual(selector(None, req, rset=rset), 20)
       
   435         selector = score_entity(lambda x: 10, mode='any')
       
   436         self.assertEqual(selector(None, req, rset=rset), 10)
       
   437 
       
   438     def test_rql_condition_entity(self):
       
   439         req = self.request()
       
   440         selector = rql_condition('X identity U')
       
   441         rset = req.user.as_rset()
       
   442         self.assertEqual(selector(None, req, rset=rset), 1)
       
   443         self.assertEqual(selector(None, req, entity=req.user), 1)
       
   444         self.assertEqual(selector(None, req), 0)
       
   445 
       
   446     def test_rql_condition_user(self):
       
   447         req = self.request()
       
   448         selector = rql_condition('U login "admin"', user_condition=True)
       
   449         self.assertEqual(selector(None, req), 1)
       
   450         selector = rql_condition('U login "toto"', user_condition=True)
       
   451         self.assertEqual(selector(None, req), 0)
       
   452 
       
   453 if __name__ == '__main__':
       
   454     unittest_main()
       
   455