test/unittest_selectors.py
brancholdstable
changeset 7074 e4580e5f0703
parent 6981 d1a279ece4a0
child 7244 a918f76441ce
--- a/test/unittest_selectors.py	Fri Dec 10 12:17:18 2010 +0100
+++ b/test/unittest_selectors.py	Fri Mar 11 09:46:45 2011 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -16,6 +16,7 @@
 # You should have received a copy of the GNU Lesser General Public License along
 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
 """unit tests for selectors mechanism"""
+from __future__ import with_statement
 
 from operator import eq, lt, le, gt
 from logilab.common.testlib import TestCase, unittest_main
@@ -24,10 +25,11 @@
 from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb.appobject import Selector, AndSelector, OrSelector
 from cubicweb.selectors import (is_instance, adaptable, match_user_groups,
-                                multi_lines_rset)
-from cubicweb.interfaces import IDownloadable
+                                multi_lines_rset, score_entity, is_in_state,
+                                on_transition)
 from cubicweb.web import action
 
+
 class _1_(Selector):
     def __call__(self, *args, **kwargs):
         return 1
@@ -138,6 +140,35 @@
         self.assertEqual(selector(None), 0)
 
 
+class IsInStateSelectorTC(CubicWebTC):
+    def setup_database(self):
+        wf = self.shell().add_workflow("testwf", 'StateFull', default=True)
+        initial = wf.add_state(u'initial', initial=True)
+        final = wf.add_state(u'final')
+        wf.add_transition(u'forward', (initial,), final)
+
+    def test_initial_state(self):
+        req = self.request()
+        entity = req.create_entity('StateFull')
+        selector = is_in_state(u'initial')
+        self.commit()
+        score = selector(entity.__class__, None, entity=entity)
+        self.assertEqual(score, 1)
+
+    def test_final_state(self):
+        req = self.request()
+        entity = req.create_entity('StateFull')
+        selector = is_in_state(u'initial')
+        self.commit()
+        entity.cw_adapt_to('IWorkflowable').fire_transition(u'forward')
+        self.commit()
+        score = selector(entity.__class__, None, entity=entity)
+        self.assertEqual(score, 0)
+        selector = is_in_state(u'final')
+        score = selector(entity.__class__, None, entity=entity)
+        self.assertEqual(score, 1)
+
+
 class ImplementsSelectorTC(CubicWebTC):
     def test_etype_priority(self):
         req = self.request()
@@ -159,6 +190,131 @@
                           3)
 
 
+class WorkflowSelectorTC(CubicWebTC):
+    def _commit(self):
+        self.commit()
+        self.wf_entity.clear_all_caches()
+
+    def setup_database(self):
+        wf = self.shell().add_workflow("wf_test", 'StateFull', default=True)
+        created   = wf.add_state('created', initial=True)
+        validated = wf.add_state('validated')
+        abandoned = wf.add_state('abandoned')
+        wf.add_transition('validate', created, validated, ('managers',))
+        wf.add_transition('forsake', (created, validated,), abandoned, ('managers',))
+
+    def setUp(self):
+        super(WorkflowSelectorTC, self).setUp()
+        self.req = self.request()
+        self.wf_entity = self.req.create_entity('StateFull', name=u'')
+        self.rset = self.wf_entity.as_rset()
+        self.adapter = self.wf_entity.cw_adapt_to('IWorkflowable')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'created')
+        # enable debug mode to state/transition validation on the fly
+        self.vreg.config.debugmode = True
+
+    def tearDown(self):
+        self.vreg.config.debugmode = False
+        super(WorkflowSelectorTC, self).tearDown()
+
+    def test_is_in_state(self):
+        for state in ('created', 'validated', 'abandoned'):
+            selector = is_in_state(state)
+            self.assertEqual(selector(None, self.req, self.rset),
+                             state=="created")
+
+        self.adapter.fire_transition('validate')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'validated')
+
+        selector = is_in_state('created')
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = is_in_state('validated')
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        selector = is_in_state('validated', 'abandoned')
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        selector = is_in_state('abandoned')
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+
+        self.adapter.fire_transition('forsake')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'abandoned')
+
+        selector = is_in_state('created')
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = is_in_state('validated')
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = is_in_state('validated', 'abandoned')
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        self.assertEqual(self.adapter.state, 'abandoned')
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+
+    def test_is_in_state_unvalid_names(self):
+        selector = is_in_state("unknown")
+        with self.assertRaises(ValueError) as cm:
+            selector(None, self.req, self.rset)
+        self.assertEqual(str(cm.exception),
+                         "wf_test: unknown state(s): unknown")
+        selector = is_in_state("weird", "unknown", "created", "weird")
+        with self.assertRaises(ValueError) as cm:
+            selector(None, self.req, self.rset)
+        self.assertEqual(str(cm.exception),
+                         "wf_test: unknown state(s): unknown,weird")
+
+    def test_on_transition(self):
+        for transition in ('validate', 'forsake'):
+            selector = on_transition(transition)
+            self.assertEqual(selector(None, self.req, self.rset), 0)
+
+        self.adapter.fire_transition('validate')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'validated')
+
+        selector = on_transition("validate")
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        selector = on_transition("validate", "forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        selector = on_transition("forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+
+        self.adapter.fire_transition('forsake')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'abandoned')
+
+        selector = on_transition("validate")
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = on_transition("validate", "forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        selector = on_transition("forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+
+    def test_on_transition_unvalid_names(self):
+        selector = on_transition("unknown")
+        with self.assertRaises(ValueError) as cm:
+            selector(None, self.req, self.rset)
+        self.assertEqual(str(cm.exception),
+                         "wf_test: unknown transition(s): unknown")
+        selector = on_transition("weird", "unknown", "validate", "weird")
+        with self.assertRaises(ValueError) as cm:
+            selector(None, self.req, self.rset)
+        self.assertEqual(str(cm.exception),
+                         "wf_test: unknown transition(s): unknown,weird")
+
+    def test_on_transition_with_no_effect(self):
+        """selector will not be triggered with `change_state()`"""
+        self.adapter.change_state('validated')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'validated')
+
+        selector = on_transition("validate")
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = on_transition("validate", "forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = on_transition("forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+
+
 class MatchUserGroupsTC(CubicWebTC):
     def test_owners_group(self):
         """tests usage of 'owners' group with match_user_group"""
@@ -245,6 +401,24 @@
             yield self.assertEqual, selector(None, self.req, self.rset), assertion
 
 
+class ScoreEntitySelectorTC(CubicWebTC):
+
+    def test_intscore_entity_selector(self):
+        req = self.request()
+        rset = req.execute('Any E WHERE E eid 1')
+        selector = score_entity(lambda x: None)
+        self.assertEqual(selector(None, req, rset), 0)
+        selector = score_entity(lambda x: "something")
+        self.assertEqual(selector(None, req, rset), 1)
+        selector = score_entity(lambda x: object)
+        self.assertEqual(selector(None, req, rset), 1)
+        rset = req.execute('Any G LIMIT 2 WHERE G is CWGroup')
+        selector = score_entity(lambda x: 10)
+        self.assertEqual(selector(None, req, rset), 20)
+        selector = score_entity(lambda x: 10, once_is_enough=True)
+        self.assertEqual(selector(None, req, rset), 10)
+
+
 if __name__ == '__main__':
     unittest_main()