entities/test/unittest_wfobjs.py
changeset 3023 7864fee8b4ec
parent 2968 0e3460341023
parent 2992 a5b8bf107a1a
child 3024 bfaf056f1029
--- a/entities/test/unittest_wfobjs.py	Fri Aug 21 16:26:20 2009 +0200
+++ b/entities/test/unittest_wfobjs.py	Wed Aug 26 14:45:56 2009 +0200
@@ -1,12 +1,15 @@
 from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb import ValidationError
 
-def add_wf(self, etype, name=None):
+def add_wf(self, etype, name=None, default=False):
     if name is None:
-        name = unicode(etype)
-    wf = self.execute('INSERT Workflow X: X name %(n)s', {'n': name}).get_entity(0, 0)
+        name = etype
+    wf = self.execute('INSERT Workflow X: X name %(n)s', {'n': unicode(name)}).get_entity(0, 0)
     self.execute('SET WF workflow_of ET WHERE WF eid %(wf)s, ET name %(et)s',
                  {'wf': wf.eid, 'et': etype})
+    if default:
+        self.execute('SET ET default_workflow WF WHERE WF eid %(wf)s, ET name %(et)s',
+                     {'wf': wf.eid, 'et': etype})
     return wf
 
 def parse_hist(wfhist):
@@ -31,10 +34,15 @@
     def test_duplicated_state(self):
         wf = add_wf(self, 'Company')
         wf.add_state(u'foo', initial=True)
+        self.commit()
         wf.add_state(u'foo')
         ex = self.assertRaises(ValidationError, self.commit)
         # XXX enhance message
         self.assertEquals(ex.errors, {'state_of': 'unique constraint S name N, Y state_of O, Y name N failed'})
+        # no pb if not in the same workflow
+        wf2 = add_wf(self, 'Company')
+        foo = wf2.add_state(u'foo', initial=True)
+        self.commit()
 
     def test_duplicated_transition(self):
         wf = add_wf(self, 'Company')
@@ -98,9 +106,20 @@
         trinfo = self._test_manager_deactivate(user)
         self.assertEquals(trinfo.transition, None)
 
+    def test_set_in_state_bad_wf(self):
+        wf = add_wf(self, 'CWUser')
+        s = wf.add_state(u'foo', initial=True)
+        self.commit()
+        ex = self.assertRaises(ValidationError, self.session().unsafe_execute,
+                               'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
+                               {'x': self.user().eid, 's': s.eid}, 'x')
+        self.assertEquals(ex.errors, {'in_state': "state doesn't belong to entity's workflow. "
+                                      "You may want to set a custom workflow for this entity first."})
+
     def test_fire_transition(self):
         user = self.user()
         user.fire_transition('deactivate', comment=u'deactivate user')
+        user.clear_all_caches()
         self.assertEquals(user.state, 'deactivated')
         self._test_manager_deactivate(user)
         trinfo = self._test_manager_deactivate(user)
@@ -139,64 +158,102 @@
                      'WHERE T name "deactivate"')
         self._test_stduser_deactivate()
 
-    def _init_wf_with_shared_state_or_tr(self):
-        req = self.request()
-        etypes = dict(self.execute('Any N, ET WHERE ET is CWEType, ET name N'
-                                   ', ET name IN ("CWGroup", "Bookmark")'))
-        self.grpwf = req.create_entity('Workflow', ('workflow_of', 'ET'),
-                                       ET=etypes['CWGroup'], name=u'group workflow')
-        self.bmkwf = req.execute('Any X WHERE X is Workflow, X workflow_of ET, ET name "Bookmark"').get_entity(0, 0)
-        self.state1 = self.grpwf.add_state(u'state1', initial=True)
-        self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
-                     {'s': self.state1.eid, 'wf': self.bmkwf.eid})
-        self.execute('SET WF initial_state S WHERE S eid %(s)s, WF eid %(wf)s',
-                     {'s': self.state1.eid, 'wf': self.bmkwf.eid})
-        self.state2 = self.grpwf.add_state(u'state2')
-        self.group = self.add_entity('CWGroup', name=u't1')
-        self.bookmark = self.add_entity('Bookmark', title=u'111', path=u'/view')
-        # commit to link to the initial state
-        self.commit()
+    def test_subworkflow_base(self):
+        """subworkflow
 
-    def test_transitions_selection(self):
-        """
-        ------------------------  tr1    -----------------
-        | state1 (CWGroup, Bookmark) | ------> | state2 (CWGroup) |
-        ------------------------         -----------------
-                  |  tr2    ------------------
-                  `------>  | state3 (Bookmark) |
-                            ------------------
+        +-----------+  tr1   +-----------+
+        | swfstate1 | ------>| swfstate2 |
+        +-----------+        +-----------+
+                  |  tr2  +-----------+
+                  `------>| swfstate3 |
+                          +-----------+
+
+        main workflow
+
+        +--------+  swftr1             +--------+
+        | state1 | -------[swfstate2]->| state2 |
+        +--------+     |               +--------+
+                       |               +--------+
+                       `-[swfstate3]-->| state3 |
+                                       +--------+
         """
-        self._init_wf_with_shared_state_or_tr()
-        state3 = self.bmkwf.add_state(u'state3')
-        tr1 = self.grpwf.add_transition(u'tr1', (self.state1,), self.state2)
-        tr2 = self.bmkwf.add_transition(u'tr2', (self.state1,), state3)
-        transitions = list(self.group.possible_transitions())
-        self.assertEquals(len(transitions), 1)
-        self.assertEquals(transitions[0].name, 'tr1')
-        transitions = list(self.bookmark.possible_transitions())
-        self.assertEquals(len(transitions), 1)
-        self.assertEquals(transitions[0].name, 'tr2')
-
+        # sub-workflow
+        swf = add_wf(self, 'CWGroup', name='subworkflow')
+        swfstate1 = swf.add_state(u'swfstate1', initial=True)
+        swfstate2 = swf.add_state(u'swfstate2')
+        swfstate3 = swf.add_state(u'swfstate3')
+        tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
+        tr2 = swf.add_transition(u'tr2', (swfstate1,), swfstate3)
+        # main workflow
+        mwf = add_wf(self, 'CWGroup', name='main workflow', default=True)
+        state1 = mwf.add_state(u'state1', initial=True)
+        state2 = mwf.add_state(u'state2')
+        state3 = mwf.add_state(u'state3')
+        swftr1 = mwf.add_wftransition(u'swftr1', swf, state1,
+                                      [(swfstate2, state2), (swfstate3, state3)])
+        self.assertEquals(swftr1.destination().eid, swfstate1.eid)
+        # workflows built, begin test
+        self.group = self.add_entity('CWGroup', name=u'grp1')
+        self.commit()
+        self.assertEquals(self.group.current_state.eid, state1.eid)
+        self.assertEquals(self.group.current_workflow.eid, mwf.eid)
+        self.assertEquals(self.group.main_workflow.eid, mwf.eid)
+        self.assertEquals(self.group.subworkflow_input_transition(), None)
+        self.group.fire_transition('swftr1', u'go')
+        self.commit()
+        self.group.clear_all_caches()
+        self.assertEquals(self.group.current_state.eid, swfstate1.eid)
+        self.assertEquals(self.group.current_workflow.eid, swf.eid)
+        self.assertEquals(self.group.main_workflow.eid, mwf.eid)
+        self.assertEquals(self.group.subworkflow_input_transition().eid, swftr1.eid)
+        self.group.fire_transition('tr1', u'go')
+        self.commit()
+        self.group.clear_all_caches()
+        self.assertEquals(self.group.current_state.eid, state2.eid)
+        self.assertEquals(self.group.current_workflow.eid, mwf.eid)
+        self.assertEquals(self.group.main_workflow.eid, mwf.eid)
+        self.assertEquals(self.group.subworkflow_input_transition(), None)
+        # force back to swfstate1 is impossible since we can't any more find
+        # subworkflow input transition
+        ex = self.assertRaises(ValidationError,
+                               self.group.change_state, swfstate1, u'gadget')
+        self.assertEquals(ex.errors, {'to_state': "state doesn't belong to entity's current workflow"})
+        self.rollback()
+        # force back to state1
+        self.group.change_state('state1', u'gadget')
+        self.group.fire_transition('swftr1', u'au')
+        self.group.clear_all_caches()
+        self.group.fire_transition('tr2', u'chapeau')
+        self.commit()
+        self.group.clear_all_caches()
+        self.assertEquals(self.group.current_state.eid, state3.eid)
+        self.assertEquals(self.group.current_workflow.eid, mwf.eid)
+        self.assertEquals(self.group.main_workflow.eid, mwf.eid)
+        self.assertListEquals(parse_hist(self.group.workflow_history),
+                              [('state1', 'swfstate1', 'swftr1', 'go'),
+                               ('swfstate1', 'swfstate2', 'tr1', 'go'),
+                               ('swfstate2', 'state2', 'swftr1', 'exiting from subworkflow subworkflow'),
+                               ('state2', 'state1', None, 'gadget'),
+                               ('state1', 'swfstate1', 'swftr1', 'au'),
+                               ('swfstate1', 'swfstate3', 'tr2', 'chapeau'),
+                               ('swfstate3', 'state3', 'swftr1', 'exiting from subworkflow subworkflow'),
+                               ])
 
-    def test_transitions_selection2(self):
-        """
-        ------------------------  tr1 (Bookmark)   -----------------------
-        | state1 (CWGroup, Bookmark) | -------------> | state2 (CWGroup,Bookmark) |
-        ------------------------                -----------------------
-                  |  tr2 (CWGroup)                     |
-                  `---------------------------------/
-        """
-        self._init_wf_with_shared_state_or_tr()
-        self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
-                     {'s': self.state2.eid, 'wf': self.bmkwf.eid})
-        tr1 = self.bmkwf.add_transition(u'tr1', (self.state1,), self.state2)
-        tr2 = self.grpwf.add_transition(u'tr2', (self.state1,), self.state2)
-        transitions = list(self.group.possible_transitions())
-        self.assertEquals(len(transitions), 1)
-        self.assertEquals(transitions[0].name, 'tr2')
-        transitions = list(self.bookmark.possible_transitions())
-        self.assertEquals(len(transitions), 1)
-        self.assertEquals(transitions[0].name, 'tr1')
+    def test_subworkflow_exit_consistency(self):
+        # sub-workflow
+        swf = add_wf(self, 'CWGroup', name='subworkflow')
+        swfstate1 = swf.add_state(u'swfstate1', initial=True)
+        swfstate2 = swf.add_state(u'swfstate2')
+        tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
+        # main workflow
+        mwf = add_wf(self, 'CWGroup', name='main workflow', default=True)
+        state1 = mwf.add_state(u'state1', initial=True)
+        state2 = mwf.add_state(u'state2')
+        state3 = mwf.add_state(u'state3')
+        mwf.add_wftransition(u'swftr1', swf, state1,
+                             [(swfstate2, state2), (swfstate2, state3)])
+        ex = self.assertRaises(ValidationError, self.commit)
+        self.assertEquals(ex.errors, {'subworkflow_exit': u"can't have multiple exits on the same state"})
 
 
 class CustomWorkflowTC(CubicWebTC):
@@ -239,25 +296,6 @@
                            ('deactivated', 'activated', 'activate', None),
                            ('activated', 'asleep', None, 'workflow changed to "CWUser"')])
 
-    def test_custom_wf_shared_state(self):
-        """member in some state shared by the new workflow, nothing has to be
-        done
-        """
-        self.member.fire_transition('deactivate')
-        self.assertEquals(self.member.state, 'deactivated')
-        wf = add_wf(self, 'CWUser')
-        wf.add_state('asleep', initial=True)
-        self.execute('SET S state_of WF WHERE S name "deactivated", WF eid %(wf)s',
-                     {'wf': wf.eid})
-        self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
-                     {'wf': wf.eid, 'x': self.member.eid})
-        self.commit()
-        self.member.clear_all_caches()
-        self.assertEquals(self.member.current_workflow.eid, wf.eid)
-        self.assertEquals(self.member.state, 'deactivated')
-        self.assertEquals(parse_hist(self.member.workflow_history),
-                          [('activated', 'deactivated', 'deactivate', None)])
-
     def test_custom_wf_no_initial_state(self):
         """try to set a custom workflow which has no initial state"""
         self.member.fire_transition('deactivate')
@@ -269,8 +307,7 @@
         self.assertEquals(ex.errors, {'custom_workflow': u'workflow has no initial state'})
 
     def test_custom_wf_bad_etype(self):
-        """try to set a custom workflow which has no initial state"""
-        self.member.fire_transition('deactivate')
+        """try to set a custom workflow which doesn't apply to entity type"""
         wf = add_wf(self, 'Company')
         wf.add_state('asleep', initial=True)
         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',