[selectors] modify workflow selectors: is_in_state, on_transition stable
authorJulien Jehannet <julien.jehannet@logilab.fr>
Fri, 28 Jan 2011 16:08:40 +0100
branchstable
changeset 6919 8fd6921f3e7c
parent 6918 9e607157d4cf
child 6920 c41336a682ed
[selectors] modify workflow selectors: is_in_state, on_transition - factorize `is_on_state` selector - add new `on_transition` selector Especially useful to match pending transitions to enable notifications when your workflow allows several transition to the same states. Note that if workflow `change_state` adapter method is used, this selector will not be triggered. In debug mode: These both selectors will check against the entity current workflow if expected values given in selector argument are valid. ValueError exception will be raised for unmatching state/transition names against the current workflow (generic etype workflow). (check against custom workflow is not implemented)
doc/book/en/devrepo/vreg.rst
selectors.py
test/data/schema.py
test/unittest_selectors.py
--- a/doc/book/en/devrepo/vreg.rst	Fri Jan 28 15:23:47 2011 +0100
+++ b/doc/book/en/devrepo/vreg.rst	Fri Jan 28 16:08:40 2011 +0100
@@ -79,6 +79,7 @@
 .. autoclass:: cubicweb.selectors.has_add_permission
 .. autoclass:: cubicweb.selectors.has_mimetype
 .. autoclass:: cubicweb.selectors.is_in_state
+.. autoclass:: cubicweb.selectors.on_transition
 .. autoclass:: cubicweb.selectors.implements
 
 
--- a/selectors.py	Fri Jan 28 15:23:47 2011 +0100
+++ b/selectors.py	Fri Jan 28 16:08:40 2011 +0100
@@ -1175,26 +1175,92 @@
 
 
 class is_in_state(score_entity):
-    """return 1 if entity is in one of the states given as argument list
+    """Return 1 if entity is in one of the states given as argument list
 
-    you should use this instead of your own :class:`score_entity` selector to
+    You should use this instead of your own :class:`score_entity` selector to
     avoid some gotchas:
 
     * possible views gives a fake entity with no state
-    * you must use the latest tr info, not entity.in_state for repository side
-      checking of the current state
+    * you must use the latest tr info thru the workflow adapter for repository
+      side checking of the current state
+
+    In debug mode, this selector can raise:
+    :raises: :exc:`ValueError` for unknown states names
+        (etype workflow only not checked in custom workflow)
+
+    :rtype: int
     """
-    def __init__(self, *states):
-        def score(entity, states=set(states)):
+    def __init__(self, *expected):
+        assert expected, self
+        self.expected = frozenset(expected)
+        def score(entity, expected=self.expected):
             adapted = entity.cw_adapt_to('IWorkflowable')
-            trinfo = adapted.latest_trinfo()
-            if trinfo is None: # entity is probably in it's initial state
-                statename = adapted.state
-            else:
-                statename = trinfo.new_state.name
-            return statename in states
+            # in debug mode only (time consuming)
+            if entity._cw.vreg.config.debugmode:
+                # validation can only be done for generic etype workflow because
+                # expected transition list could have been changed for a custom
+                # workflow (for the current entity)
+                if not entity.custom_workflow:
+                    self._validate(adapted)
+            return self._score(adapted)
         super(is_in_state, self).__init__(score)
 
+    def _score(self, adapted):
+        trinfo = adapted.latest_trinfo()
+        if trinfo is None: # entity is probably in it's initial state
+            statename = adapted.state
+        else:
+            statename = trinfo.new_state.name
+        return statename in self.expected
+
+    def _validate(self, adapted):
+        wf = adapted.current_workflow
+        valid = [n.name for n in wf.reverse_state_of]
+        unknown = sorted(self.expected.difference(valid))
+        if unknown:
+            raise ValueError("%s: unknown state(s): %s"
+                             % (wf.name, ",".join(unknown)))
+
+    def __str__(self):
+        return '%s(%s)' % (self.__class__.__name__,
+                           ','.join(str(s) for s in self.expected))
+
+
+class on_transition(is_in_state):
+    """Return 1 if entity is in one of the transitions given as argument list
+
+    Especially useful to match passed transition to enable notifications when
+    your workflow allows several transition to the same states.
+
+    Note that if workflow `change_state` adapter method is used, this selector
+    will not be triggered.
+
+    You should use this instead of your own :class:`score_entity` selector to
+    avoid some gotchas:
+
+    * possible views gives a fake entity with no state
+    * you must use the latest tr info thru the workflow adapter for repository
+      side checking of the current state
+
+    In debug mode, this selector can raise:
+    :raises: :exc:`ValueError` for unknown transition names
+        (etype workflow only not checked in custom workflow)
+
+    :rtype: int
+    """
+    def _score(self, adapted):
+        trinfo = adapted.latest_trinfo()
+        if trinfo and trinfo.by_transition:
+            return trinfo.by_transition[0].name in self.expected
+
+    def _validate(self, adapted):
+        wf = adapted.current_workflow
+        valid = [n.name for n in wf.reverse_transition_of]
+        unknown = sorted(self.expected.difference(valid))
+        if unknown:
+            raise ValueError("%s: unknown transition(s): %s"
+                             % (wf.name, ",".join(unknown)))
+
 
 # logged user selectors ########################################################
 
@@ -1433,7 +1499,7 @@
 
 # Other selectors ##############################################################
 
-
+# XXX deprecated ? maybe use on_transition selector instead ?
 class match_transition(ExpectedValueSelector):
     """Return 1 if `transition` argument is found in the input context which has
     a `.name` attribute matching one of the expected names given to the
--- a/test/data/schema.py	Fri Jan 28 15:23:47 2011 +0100
+++ b/test/data/schema.py	Fri Jan 28 16:08:40 2011 +0100
@@ -51,6 +51,7 @@
     subject = 'CWUser'
     object = 'Note'
 
+
 class StateFull(WorkflowableEntityType):
     name = String()
 
--- a/test/unittest_selectors.py	Fri Jan 28 15:23:47 2011 +0100
+++ b/test/unittest_selectors.py	Fri Jan 28 16:08:40 2011 +0100
@@ -24,10 +24,10 @@
 from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb.appobject import Selector, AndSelector, OrSelector
 from cubicweb.selectors import (is_instance, adaptable, match_user_groups,
-                                multi_lines_rset, score_entity, is_in_state)
-from cubicweb.interfaces import IDownloadable
+                                multi_lines_rset, score_entity, is_in_state,
+                                on_transition)
 from cubicweb.web import action
-from cubicweb.server.migractions import ServerMigrationHelper
+
 
 class _1_(Selector):
     def __call__(self, *args, **kwargs):
@@ -138,12 +138,10 @@
         selector |= _0_()
         self.assertEqual(selector(None), 0)
 
+
 class IsInStateSelectorTC(CubicWebTC):
     def setup_database(self):
-        mh = ServerMigrationHelper(self.repo.config, None,
-                                   repo=self.repo, cnx=self.cnx,
-                                   interactive=False)
-        wf = mh.cmd_add_workflow('testwf', 'StateFull', default=True)
+        wf = self.shell().add_workflow("testwf", 'StateFull', default=True)
         initial = wf.add_state(u'initial', initial=True)
         final = wf.add_state(u'final')
         wf.add_transition(u'forward', (initial,), final)
@@ -191,6 +189,131 @@
                           3)
 
 
+class WorkflowSelectorTC(CubicWebTC):
+    def _commit(self):
+        self.commit()
+        self.wf_entity.clear_all_caches()
+
+    def setup_database(self):
+        wf = self.shell().add_workflow("wf_test", 'StateFull', default=True)
+        created   = wf.add_state('created', initial=True)
+        validated = wf.add_state('validated')
+        abandoned = wf.add_state('abandoned')
+        wf.add_transition('validate', created, validated, ('managers',))
+        wf.add_transition('forsake', (created, validated,), abandoned, ('managers',))
+
+    def setUp(self):
+        super(WorkflowSelectorTC, self).setUp()
+        self.req = self.request()
+        self.wf_entity = self.req.create_entity('StateFull', name=u'')
+        self.rset = self.wf_entity.as_rset()
+        self.adapter = self.wf_entity.cw_adapt_to('IWorkflowable')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'created')
+        # enable debug mode to state/transition validation on the fly
+        self.vreg.config.debugmode = True
+
+    def tearDown(self):
+        self.vreg.config.debugmode = False
+        super(WorkflowSelectorTC, self).tearDown()
+
+    def test_is_in_state(self):
+        for state in ('created', 'validated', 'abandoned'):
+            selector = is_in_state(state)
+            self.assertEqual(selector(None, self.req, self.rset),
+                             state=="created")
+
+        self.adapter.fire_transition('validate')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'validated')
+
+        selector = is_in_state('created')
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = is_in_state('validated')
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        selector = is_in_state('validated', 'abandoned')
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        selector = is_in_state('abandoned')
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+
+        self.adapter.fire_transition('forsake')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'abandoned')
+
+        selector = is_in_state('created')
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = is_in_state('validated')
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = is_in_state('validated', 'abandoned')
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        self.assertEqual(self.adapter.state, 'abandoned')
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+
+    def test_is_in_state_unvalid_names(self):
+        selector = is_in_state("unknown")
+        with self.assertRaises(ValueError) as cm:
+            selector(None, self.req, self.rset)
+        self.assertEqual(str(cm.exception),
+                         "wf_test: unknown state(s): unknown")
+        selector = is_in_state("weird", "unknown", "created", "weird")
+        with self.assertRaises(ValueError) as cm:
+            selector(None, self.req, self.rset)
+        self.assertEqual(str(cm.exception),
+                         "wf_test: unknown state(s): unknown,weird")
+
+    def test_on_transition(self):
+        for transition in ('validate', 'forsake'):
+            selector = on_transition(transition)
+            self.assertEqual(selector(None, self.req, self.rset), 0)
+
+        self.adapter.fire_transition('validate')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'validated')
+
+        selector = on_transition("validate")
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        selector = on_transition("validate", "forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        selector = on_transition("forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+
+        self.adapter.fire_transition('forsake')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'abandoned')
+
+        selector = on_transition("validate")
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = on_transition("validate", "forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+        selector = on_transition("forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 1)
+
+    def test_on_transition_unvalid_names(self):
+        selector = on_transition("unknown")
+        with self.assertRaises(ValueError) as cm:
+            selector(None, self.req, self.rset)
+        self.assertEqual(str(cm.exception),
+                         "wf_test: unknown transition(s): unknown")
+        selector = on_transition("weird", "unknown", "validate", "weird")
+        with self.assertRaises(ValueError) as cm:
+            selector(None, self.req, self.rset)
+        self.assertEqual(str(cm.exception),
+                         "wf_test: unknown transition(s): unknown,weird")
+
+    def test_on_transition_with_no_effect(self):
+        """selector will not be triggered with `change_state()`"""
+        self.adapter.change_state('validated')
+        self._commit()
+        self.assertEqual(self.adapter.state, 'validated')
+
+        selector = on_transition("validate")
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = on_transition("validate", "forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+        selector = on_transition("forsake")
+        self.assertEqual(selector(None, self.req, self.rset), 0)
+
+
 class MatchUserGroupsTC(CubicWebTC):
     def test_owners_group(self):
         """tests usage of 'owners' group with match_user_group"""