test/unittest_predicates.py
changeset 8220 e4ea285510e5
parent 8190 2a3c1b787688
child 8694 d901c36bcfce
child 8858 51fdbbbd07b2
equal deleted inserted replaced
8219:fb61698f93fc 8220:e4ea285510e5
       
     1 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """unit tests for selectors mechanism"""
       
    19 from __future__ import with_statement
       
    20 
       
    21 from operator import eq, lt, le, gt
       
    22 from logilab.common.testlib import TestCase, unittest_main
       
    23 
       
    24 from cubicweb import Binary
       
    25 from cubicweb.devtools.testlib import CubicWebTC
       
    26 from cubicweb.predicates import (is_instance, adaptable, match_kwargs, match_user_groups,
       
    27                                 multi_lines_rset, score_entity, is_in_state,
       
    28                                 rql_condition, relation_possible)
       
    29 from cubicweb.selectors import on_transition # XXX on_transition is deprecated
       
    30 from cubicweb.web import action
       
    31 
       
    32 
       
    33 
       
    34 class ImplementsSelectorTC(CubicWebTC):
       
    35     def test_etype_priority(self):
       
    36         req = self.request()
       
    37         f = req.create_entity('File', data_name=u'hop.txt', data=Binary('hop'))
       
    38         rset = f.as_rset()
       
    39         anyscore = is_instance('Any')(f.__class__, req, rset=rset)
       
    40         idownscore = adaptable('IDownloadable')(f.__class__, req, rset=rset)
       
    41         self.assertTrue(idownscore > anyscore, (idownscore, anyscore))
       
    42         filescore = is_instance('File')(f.__class__, req, rset=rset)
       
    43         self.assertTrue(filescore > idownscore, (filescore, idownscore))
       
    44 
       
    45     def test_etype_inheritance_no_yams_inheritance(self):
       
    46         cls = self.vreg['etypes'].etype_class('Personne')
       
    47         self.assertFalse(is_instance('Societe').score_class(cls, self.request()))
       
    48 
       
    49     def test_yams_inheritance(self):
       
    50         cls = self.vreg['etypes'].etype_class('Transition')
       
    51         self.assertEqual(is_instance('BaseTransition').score_class(cls, self.request()),
       
    52                           3)
       
    53 
       
    54     def test_outer_join(self):
       
    55         req = self.request()
       
    56         rset = req.execute('Any U,B WHERE B? bookmarked_by U, U login "anon"')
       
    57         self.assertEqual(is_instance('Bookmark')(None, req, rset=rset, row=0, col=1),
       
    58                          0)
       
    59 
       
    60 
       
    61 class WorkflowSelectorTC(CubicWebTC):
       
    62     def _commit(self):
       
    63         self.commit()
       
    64         self.wf_entity.cw_clear_all_caches()
       
    65 
       
    66     def setup_database(self):
       
    67         wf = self.shell().add_workflow("wf_test", 'StateFull', default=True)
       
    68         created   = wf.add_state('created', initial=True)
       
    69         validated = wf.add_state('validated')
       
    70         abandoned = wf.add_state('abandoned')
       
    71         wf.add_transition('validate', created, validated, ('managers',))
       
    72         wf.add_transition('forsake', (created, validated,), abandoned, ('managers',))
       
    73 
       
    74     def setUp(self):
       
    75         super(WorkflowSelectorTC, self).setUp()
       
    76         self.req = self.request()
       
    77         self.wf_entity = self.req.create_entity('StateFull', name=u'')
       
    78         self.rset = self.wf_entity.as_rset()
       
    79         self.adapter = self.wf_entity.cw_adapt_to('IWorkflowable')
       
    80         self._commit()
       
    81         self.assertEqual(self.adapter.state, 'created')
       
    82         # enable debug mode to state/transition validation on the fly
       
    83         self.vreg.config.debugmode = True
       
    84 
       
    85     def tearDown(self):
       
    86         self.vreg.config.debugmode = False
       
    87         super(WorkflowSelectorTC, self).tearDown()
       
    88 
       
    89     def test_is_in_state(self):
       
    90         for state in ('created', 'validated', 'abandoned'):
       
    91             selector = is_in_state(state)
       
    92             self.assertEqual(selector(None, self.req, rset=self.rset),
       
    93                              state=="created")
       
    94 
       
    95         self.adapter.fire_transition('validate')
       
    96         self._commit()
       
    97         self.assertEqual(self.adapter.state, 'validated')
       
    98 
       
    99         selector = is_in_state('created')
       
   100         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   101         selector = is_in_state('validated')
       
   102         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   103         selector = is_in_state('validated', 'abandoned')
       
   104         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   105         selector = is_in_state('abandoned')
       
   106         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   107 
       
   108         self.adapter.fire_transition('forsake')
       
   109         self._commit()
       
   110         self.assertEqual(self.adapter.state, 'abandoned')
       
   111 
       
   112         selector = is_in_state('created')
       
   113         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   114         selector = is_in_state('validated')
       
   115         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   116         selector = is_in_state('validated', 'abandoned')
       
   117         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   118         self.assertEqual(self.adapter.state, 'abandoned')
       
   119         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   120 
       
   121     def test_is_in_state_unvalid_names(self):
       
   122         selector = is_in_state("unknown")
       
   123         with self.assertRaises(ValueError) as cm:
       
   124             selector(None, self.req, rset=self.rset)
       
   125         self.assertEqual(str(cm.exception),
       
   126                          "wf_test: unknown state(s): unknown")
       
   127         selector = is_in_state("weird", "unknown", "created", "weird")
       
   128         with self.assertRaises(ValueError) as cm:
       
   129             selector(None, self.req, rset=self.rset)
       
   130         self.assertEqual(str(cm.exception),
       
   131                          "wf_test: unknown state(s): unknown,weird")
       
   132 
       
   133     def test_on_transition(self):
       
   134         for transition in ('validate', 'forsake'):
       
   135             selector = on_transition(transition)
       
   136             self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   137 
       
   138         self.adapter.fire_transition('validate')
       
   139         self._commit()
       
   140         self.assertEqual(self.adapter.state, 'validated')
       
   141 
       
   142         selector = on_transition("validate")
       
   143         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   144         selector = on_transition("validate", "forsake")
       
   145         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   146         selector = on_transition("forsake")
       
   147         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   148 
       
   149         self.adapter.fire_transition('forsake')
       
   150         self._commit()
       
   151         self.assertEqual(self.adapter.state, 'abandoned')
       
   152 
       
   153         selector = on_transition("validate")
       
   154         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   155         selector = on_transition("validate", "forsake")
       
   156         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   157         selector = on_transition("forsake")
       
   158         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   159 
       
   160     def test_on_transition_unvalid_names(self):
       
   161         selector = on_transition("unknown")
       
   162         with self.assertRaises(ValueError) as cm:
       
   163             selector(None, self.req, rset=self.rset)
       
   164         self.assertEqual(str(cm.exception),
       
   165                          "wf_test: unknown transition(s): unknown")
       
   166         selector = on_transition("weird", "unknown", "validate", "weird")
       
   167         with self.assertRaises(ValueError) as cm:
       
   168             selector(None, self.req, rset=self.rset)
       
   169         self.assertEqual(str(cm.exception),
       
   170                          "wf_test: unknown transition(s): unknown,weird")
       
   171 
       
   172     def test_on_transition_with_no_effect(self):
       
   173         """selector will not be triggered with `change_state()`"""
       
   174         self.adapter.change_state('validated')
       
   175         self._commit()
       
   176         self.assertEqual(self.adapter.state, 'validated')
       
   177 
       
   178         selector = on_transition("validate")
       
   179         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   180         selector = on_transition("validate", "forsake")
       
   181         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   182         selector = on_transition("forsake")
       
   183         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   184 
       
   185 
       
   186 class RelationPossibleTC(CubicWebTC):
       
   187 
       
   188     def test_rqlst_1(self):
       
   189         req = self.request()
       
   190         selector = relation_possible('in_group')
       
   191         select = self.vreg.parse(req, 'Any X WHERE X is CWUser').children[0]
       
   192         score = selector(None, req, rset=1,
       
   193                          select=select, filtered_variable=select.defined_vars['X'])
       
   194         self.assertEqual(score, 1)
       
   195 
       
   196     def test_rqlst_2(self):
       
   197         req = self.request()
       
   198         selector = relation_possible('in_group')
       
   199         select = self.vreg.parse(req, 'Any 1, COUNT(X) WHERE X is CWUser, X creation_date XD, '
       
   200                                  'Y creation_date YD, Y is CWGroup '
       
   201                                  'HAVING DAY(XD)=DAY(YD)').children[0]
       
   202         score = selector(None, req, rset=1,
       
   203                          select=select, filtered_variable=select.defined_vars['X'])
       
   204         self.assertEqual(score, 1)
       
   205 
       
   206 
       
   207 class MatchUserGroupsTC(CubicWebTC):
       
   208     def test_owners_group(self):
       
   209         """tests usage of 'owners' group with match_user_group"""
       
   210         class SomeAction(action.Action):
       
   211             __regid__ = 'yo'
       
   212             category = 'foo'
       
   213             __select__ = match_user_groups('owners')
       
   214         self.vreg._loadedmods[__name__] = {}
       
   215         self.vreg.register(SomeAction)
       
   216         SomeAction.__registered__(self.vreg['actions'])
       
   217         self.assertTrue(SomeAction in self.vreg['actions']['yo'], self.vreg['actions'])
       
   218         try:
       
   219             # login as a simple user
       
   220             req = self.request()
       
   221             self.create_user(req, 'john')
       
   222             self.login('john')
       
   223             # it should not be possible to use SomeAction not owned objects
       
   224             req = self.request()
       
   225             rset = req.execute('Any G WHERE G is CWGroup, G name "managers"')
       
   226             self.assertFalse('yo' in dict(self.pactions(req, rset)))
       
   227             # insert a new card, and check that we can use SomeAction on our object
       
   228             self.execute('INSERT Card C: C title "zoubidou"')
       
   229             self.commit()
       
   230             req = self.request()
       
   231             rset = req.execute('Card C WHERE C title "zoubidou"')
       
   232             self.assertTrue('yo' in dict(self.pactions(req, rset)), self.pactions(req, rset))
       
   233             # make sure even managers can't use the action
       
   234             self.restore_connection()
       
   235             req = self.request()
       
   236             rset = req.execute('Card C WHERE C title "zoubidou"')
       
   237             self.assertFalse('yo' in dict(self.pactions(req, rset)))
       
   238         finally:
       
   239             del self.vreg[SomeAction.__registry__][SomeAction.__regid__]
       
   240 
       
   241 
       
   242 class MultiLinesRsetSelectorTC(CubicWebTC):
       
   243     def setUp(self):
       
   244         super(MultiLinesRsetSelectorTC, self).setUp()
       
   245         self.req = self.request()
       
   246         self.req.execute('INSERT CWGroup G: G name "group1"')
       
   247         self.req.execute('INSERT CWGroup G: G name "group2"')
       
   248         self.commit()
       
   249         self.rset = self.req.execute('Any G WHERE G is CWGroup')
       
   250 
       
   251     def test_default_op_in_selector(self):
       
   252         expected = len(self.rset)
       
   253         selector = multi_lines_rset(expected)
       
   254         self.assertEqual(selector(None, self.req, rset=self.rset), 1)
       
   255         self.assertEqual(selector(None, self.req, None), 0)
       
   256         selector = multi_lines_rset(expected + 1)
       
   257         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   258         self.assertEqual(selector(None, self.req, None), 0)
       
   259         selector = multi_lines_rset(expected - 1)
       
   260         self.assertEqual(selector(None, self.req, rset=self.rset), 0)
       
   261         self.assertEqual(selector(None, self.req, None), 0)
       
   262 
       
   263     def test_without_rset(self):
       
   264         expected = len(self.rset)
       
   265         selector = multi_lines_rset(expected)
       
   266         self.assertEqual(selector(None, self.req, None), 0)
       
   267         selector = multi_lines_rset(expected + 1)
       
   268         self.assertEqual(selector(None, self.req, None), 0)
       
   269         selector = multi_lines_rset(expected - 1)
       
   270         self.assertEqual(selector(None, self.req, None), 0)
       
   271 
       
   272     def test_with_operators(self):
       
   273         expected = len(self.rset)
       
   274 
       
   275         # Format     'expected', 'operator', 'assert'
       
   276         testdata = (( expected,         eq,        1),
       
   277                     ( expected+1,       eq,        0),
       
   278                     ( expected-1,       eq,        0),
       
   279                     ( expected,         le,        1),
       
   280                     ( expected+1,       le,        1),
       
   281                     ( expected-1,       le,        0),
       
   282                     ( expected-1,       gt,        1),
       
   283                     ( expected,         gt,        0),
       
   284                     ( expected+1,       gt,        0),
       
   285                     ( expected+1,       lt,        1),
       
   286                     ( expected,         lt,        0),
       
   287                     ( expected-1,       lt,        0))
       
   288 
       
   289         for (expected, operator, assertion) in testdata:
       
   290             selector = multi_lines_rset(expected, operator)
       
   291             yield self.assertEqual, selector(None, self.req, rset=self.rset), assertion
       
   292 
       
   293     def test_match_kwargs_default(self):
       
   294         selector = match_kwargs( set( ('a', 'b') ) )
       
   295         self.assertEqual(selector(None, None, a=1, b=2), 2)
       
   296         self.assertEqual(selector(None, None, a=1), 0)
       
   297         self.assertEqual(selector(None, None, c=1), 0)
       
   298         self.assertEqual(selector(None, None, a=1, c=1), 0)
       
   299 
       
   300     def test_match_kwargs_any(self):
       
   301         selector = match_kwargs( set( ('a', 'b') ), mode='any')
       
   302         self.assertEqual(selector(None, None, a=1, b=2), 2)
       
   303         self.assertEqual(selector(None, None, a=1), 1)
       
   304         self.assertEqual(selector(None, None, c=1), 0)
       
   305         self.assertEqual(selector(None, None, a=1, c=1), 1)
       
   306 
       
   307 
       
   308 class ScoreEntitySelectorTC(CubicWebTC):
       
   309 
       
   310     def test_intscore_entity_selector(self):
       
   311         req = self.request()
       
   312         rset = req.execute('Any E WHERE E eid 1')
       
   313         selector = score_entity(lambda x: None)
       
   314         self.assertEqual(selector(None, req, rset=rset), 0)
       
   315         selector = score_entity(lambda x: "something")
       
   316         self.assertEqual(selector(None, req, rset=rset), 1)
       
   317         selector = score_entity(lambda x: object)
       
   318         self.assertEqual(selector(None, req, rset=rset), 1)
       
   319         rset = req.execute('Any G LIMIT 2 WHERE G is CWGroup')
       
   320         selector = score_entity(lambda x: 10)
       
   321         self.assertEqual(selector(None, req, rset=rset), 20)
       
   322         selector = score_entity(lambda x: 10, mode='any')
       
   323         self.assertEqual(selector(None, req, rset=rset), 10)
       
   324 
       
   325     def test_rql_condition_entity(self):
       
   326         req = self.request()
       
   327         selector = rql_condition('X identity U')
       
   328         rset = req.user.as_rset()
       
   329         self.assertEqual(selector(None, req, rset=rset), 1)
       
   330         self.assertEqual(selector(None, req, entity=req.user), 1)
       
   331         self.assertEqual(selector(None, req), 0)
       
   332 
       
   333     def test_rql_condition_user(self):
       
   334         req = self.request()
       
   335         selector = rql_condition('U login "admin"', user_condition=True)
       
   336         self.assertEqual(selector(None, req), 1)
       
   337         selector = rql_condition('U login "toto"', user_condition=True)
       
   338         self.assertEqual(selector(None, req), 0)
       
   339 
       
   340 if __name__ == '__main__':
       
   341     unittest_main()
       
   342