--- a/entities/test/unittest_wfobjs.py Fri Aug 21 16:26:20 2009 +0200
+++ b/entities/test/unittest_wfobjs.py Wed Aug 26 14:45:56 2009 +0200
@@ -1,12 +1,15 @@
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb import ValidationError
-def add_wf(self, etype, name=None):
+def add_wf(self, etype, name=None, default=False):
if name is None:
- name = unicode(etype)
- wf = self.execute('INSERT Workflow X: X name %(n)s', {'n': name}).get_entity(0, 0)
+ name = etype
+ wf = self.execute('INSERT Workflow X: X name %(n)s', {'n': unicode(name)}).get_entity(0, 0)
self.execute('SET WF workflow_of ET WHERE WF eid %(wf)s, ET name %(et)s',
{'wf': wf.eid, 'et': etype})
+ if default:
+ self.execute('SET ET default_workflow WF WHERE WF eid %(wf)s, ET name %(et)s',
+ {'wf': wf.eid, 'et': etype})
return wf
def parse_hist(wfhist):
@@ -31,10 +34,15 @@
def test_duplicated_state(self):
wf = add_wf(self, 'Company')
wf.add_state(u'foo', initial=True)
+ self.commit()
wf.add_state(u'foo')
ex = self.assertRaises(ValidationError, self.commit)
# XXX enhance message
self.assertEquals(ex.errors, {'state_of': 'unique constraint S name N, Y state_of O, Y name N failed'})
+ # no pb if not in the same workflow
+ wf2 = add_wf(self, 'Company')
+ foo = wf2.add_state(u'foo', initial=True)
+ self.commit()
def test_duplicated_transition(self):
wf = add_wf(self, 'Company')
@@ -98,9 +106,20 @@
trinfo = self._test_manager_deactivate(user)
self.assertEquals(trinfo.transition, None)
+ def test_set_in_state_bad_wf(self):
+ wf = add_wf(self, 'CWUser')
+ s = wf.add_state(u'foo', initial=True)
+ self.commit()
+ ex = self.assertRaises(ValidationError, self.session().unsafe_execute,
+ 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
+ {'x': self.user().eid, 's': s.eid}, 'x')
+ self.assertEquals(ex.errors, {'in_state': "state doesn't belong to entity's workflow. "
+ "You may want to set a custom workflow for this entity first."})
+
def test_fire_transition(self):
user = self.user()
user.fire_transition('deactivate', comment=u'deactivate user')
+ user.clear_all_caches()
self.assertEquals(user.state, 'deactivated')
self._test_manager_deactivate(user)
trinfo = self._test_manager_deactivate(user)
@@ -139,64 +158,102 @@
'WHERE T name "deactivate"')
self._test_stduser_deactivate()
- def _init_wf_with_shared_state_or_tr(self):
- req = self.request()
- etypes = dict(self.execute('Any N, ET WHERE ET is CWEType, ET name N'
- ', ET name IN ("CWGroup", "Bookmark")'))
- self.grpwf = req.create_entity('Workflow', ('workflow_of', 'ET'),
- ET=etypes['CWGroup'], name=u'group workflow')
- self.bmkwf = req.execute('Any X WHERE X is Workflow, X workflow_of ET, ET name "Bookmark"').get_entity(0, 0)
- self.state1 = self.grpwf.add_state(u'state1', initial=True)
- self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
- {'s': self.state1.eid, 'wf': self.bmkwf.eid})
- self.execute('SET WF initial_state S WHERE S eid %(s)s, WF eid %(wf)s',
- {'s': self.state1.eid, 'wf': self.bmkwf.eid})
- self.state2 = self.grpwf.add_state(u'state2')
- self.group = self.add_entity('CWGroup', name=u't1')
- self.bookmark = self.add_entity('Bookmark', title=u'111', path=u'/view')
- # commit to link to the initial state
- self.commit()
+ def test_subworkflow_base(self):
+ """subworkflow
- def test_transitions_selection(self):
- """
- ------------------------ tr1 -----------------
- | state1 (CWGroup, Bookmark) | ------> | state2 (CWGroup) |
- ------------------------ -----------------
- | tr2 ------------------
- `------> | state3 (Bookmark) |
- ------------------
+ +-----------+ tr1 +-----------+
+ | swfstate1 | ------>| swfstate2 |
+ +-----------+ +-----------+
+ | tr2 +-----------+
+ `------>| swfstate3 |
+ +-----------+
+
+ main workflow
+
+ +--------+ swftr1 +--------+
+ | state1 | -------[swfstate2]->| state2 |
+ +--------+ | +--------+
+ | +--------+
+ `-[swfstate3]-->| state3 |
+ +--------+
"""
- self._init_wf_with_shared_state_or_tr()
- state3 = self.bmkwf.add_state(u'state3')
- tr1 = self.grpwf.add_transition(u'tr1', (self.state1,), self.state2)
- tr2 = self.bmkwf.add_transition(u'tr2', (self.state1,), state3)
- transitions = list(self.group.possible_transitions())
- self.assertEquals(len(transitions), 1)
- self.assertEquals(transitions[0].name, 'tr1')
- transitions = list(self.bookmark.possible_transitions())
- self.assertEquals(len(transitions), 1)
- self.assertEquals(transitions[0].name, 'tr2')
-
+ # sub-workflow
+ swf = add_wf(self, 'CWGroup', name='subworkflow')
+ swfstate1 = swf.add_state(u'swfstate1', initial=True)
+ swfstate2 = swf.add_state(u'swfstate2')
+ swfstate3 = swf.add_state(u'swfstate3')
+ tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
+ tr2 = swf.add_transition(u'tr2', (swfstate1,), swfstate3)
+ # main workflow
+ mwf = add_wf(self, 'CWGroup', name='main workflow', default=True)
+ state1 = mwf.add_state(u'state1', initial=True)
+ state2 = mwf.add_state(u'state2')
+ state3 = mwf.add_state(u'state3')
+ swftr1 = mwf.add_wftransition(u'swftr1', swf, state1,
+ [(swfstate2, state2), (swfstate3, state3)])
+ self.assertEquals(swftr1.destination().eid, swfstate1.eid)
+ # workflows built, begin test
+ self.group = self.add_entity('CWGroup', name=u'grp1')
+ self.commit()
+ self.assertEquals(self.group.current_state.eid, state1.eid)
+ self.assertEquals(self.group.current_workflow.eid, mwf.eid)
+ self.assertEquals(self.group.main_workflow.eid, mwf.eid)
+ self.assertEquals(self.group.subworkflow_input_transition(), None)
+ self.group.fire_transition('swftr1', u'go')
+ self.commit()
+ self.group.clear_all_caches()
+ self.assertEquals(self.group.current_state.eid, swfstate1.eid)
+ self.assertEquals(self.group.current_workflow.eid, swf.eid)
+ self.assertEquals(self.group.main_workflow.eid, mwf.eid)
+ self.assertEquals(self.group.subworkflow_input_transition().eid, swftr1.eid)
+ self.group.fire_transition('tr1', u'go')
+ self.commit()
+ self.group.clear_all_caches()
+ self.assertEquals(self.group.current_state.eid, state2.eid)
+ self.assertEquals(self.group.current_workflow.eid, mwf.eid)
+ self.assertEquals(self.group.main_workflow.eid, mwf.eid)
+ self.assertEquals(self.group.subworkflow_input_transition(), None)
+ # force back to swfstate1 is impossible since we can't any more find
+ # subworkflow input transition
+ ex = self.assertRaises(ValidationError,
+ self.group.change_state, swfstate1, u'gadget')
+ self.assertEquals(ex.errors, {'to_state': "state doesn't belong to entity's current workflow"})
+ self.rollback()
+ # force back to state1
+ self.group.change_state('state1', u'gadget')
+ self.group.fire_transition('swftr1', u'au')
+ self.group.clear_all_caches()
+ self.group.fire_transition('tr2', u'chapeau')
+ self.commit()
+ self.group.clear_all_caches()
+ self.assertEquals(self.group.current_state.eid, state3.eid)
+ self.assertEquals(self.group.current_workflow.eid, mwf.eid)
+ self.assertEquals(self.group.main_workflow.eid, mwf.eid)
+ self.assertListEquals(parse_hist(self.group.workflow_history),
+ [('state1', 'swfstate1', 'swftr1', 'go'),
+ ('swfstate1', 'swfstate2', 'tr1', 'go'),
+ ('swfstate2', 'state2', 'swftr1', 'exiting from subworkflow subworkflow'),
+ ('state2', 'state1', None, 'gadget'),
+ ('state1', 'swfstate1', 'swftr1', 'au'),
+ ('swfstate1', 'swfstate3', 'tr2', 'chapeau'),
+ ('swfstate3', 'state3', 'swftr1', 'exiting from subworkflow subworkflow'),
+ ])
- def test_transitions_selection2(self):
- """
- ------------------------ tr1 (Bookmark) -----------------------
- | state1 (CWGroup, Bookmark) | -------------> | state2 (CWGroup,Bookmark) |
- ------------------------ -----------------------
- | tr2 (CWGroup) |
- `---------------------------------/
- """
- self._init_wf_with_shared_state_or_tr()
- self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
- {'s': self.state2.eid, 'wf': self.bmkwf.eid})
- tr1 = self.bmkwf.add_transition(u'tr1', (self.state1,), self.state2)
- tr2 = self.grpwf.add_transition(u'tr2', (self.state1,), self.state2)
- transitions = list(self.group.possible_transitions())
- self.assertEquals(len(transitions), 1)
- self.assertEquals(transitions[0].name, 'tr2')
- transitions = list(self.bookmark.possible_transitions())
- self.assertEquals(len(transitions), 1)
- self.assertEquals(transitions[0].name, 'tr1')
+ def test_subworkflow_exit_consistency(self):
+ # sub-workflow
+ swf = add_wf(self, 'CWGroup', name='subworkflow')
+ swfstate1 = swf.add_state(u'swfstate1', initial=True)
+ swfstate2 = swf.add_state(u'swfstate2')
+ tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
+ # main workflow
+ mwf = add_wf(self, 'CWGroup', name='main workflow', default=True)
+ state1 = mwf.add_state(u'state1', initial=True)
+ state2 = mwf.add_state(u'state2')
+ state3 = mwf.add_state(u'state3')
+ mwf.add_wftransition(u'swftr1', swf, state1,
+ [(swfstate2, state2), (swfstate2, state3)])
+ ex = self.assertRaises(ValidationError, self.commit)
+ self.assertEquals(ex.errors, {'subworkflow_exit': u"can't have multiple exits on the same state"})
class CustomWorkflowTC(CubicWebTC):
@@ -239,25 +296,6 @@
('deactivated', 'activated', 'activate', None),
('activated', 'asleep', None, 'workflow changed to "CWUser"')])
- def test_custom_wf_shared_state(self):
- """member in some state shared by the new workflow, nothing has to be
- done
- """
- self.member.fire_transition('deactivate')
- self.assertEquals(self.member.state, 'deactivated')
- wf = add_wf(self, 'CWUser')
- wf.add_state('asleep', initial=True)
- self.execute('SET S state_of WF WHERE S name "deactivated", WF eid %(wf)s',
- {'wf': wf.eid})
- self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
- {'wf': wf.eid, 'x': self.member.eid})
- self.commit()
- self.member.clear_all_caches()
- self.assertEquals(self.member.current_workflow.eid, wf.eid)
- self.assertEquals(self.member.state, 'deactivated')
- self.assertEquals(parse_hist(self.member.workflow_history),
- [('activated', 'deactivated', 'deactivate', None)])
-
def test_custom_wf_no_initial_state(self):
"""try to set a custom workflow which has no initial state"""
self.member.fire_transition('deactivate')
@@ -269,8 +307,7 @@
self.assertEquals(ex.errors, {'custom_workflow': u'workflow has no initial state'})
def test_custom_wf_bad_etype(self):
- """try to set a custom workflow which has no initial state"""
- self.member.fire_transition('deactivate')
+ """try to set a custom workflow which doesn't apply to entity type"""
wf = add_wf(self, 'Company')
wf.add_state('asleep', initial=True)
self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',