--- a/entities/test/unittest_wfobjs.py Fri Aug 21 08:45:16 2009 +0200
+++ b/entities/test/unittest_wfobjs.py Fri Aug 21 08:45:55 2009 +0200
@@ -1,11 +1,24 @@
from cubicweb.devtools.apptest import EnvBasedTC
from cubicweb import ValidationError
+def add_wf(self, etype, name=None):
+ if name is None:
+ name = unicode(etype)
+ wf = self.execute('INSERT Workflow X: X name %(n)s', {'n': name}).get_entity(0, 0)
+ self.execute('SET WF workflow_of ET WHERE WF eid %(wf)s, ET name %(et)s',
+ {'wf': wf.eid, 'et': etype})
+ return wf
+
+def parse_hist(wfhist):
+ return [(ti.previous_state.name, ti.new_state.name,
+ ti.transition and ti.transition.name, ti.comment)
+ for ti in wfhist]
+
+
class WorkflowBuildingTC(EnvBasedTC):
def test_wf_construction(self):
- wf = self.execute('INSERT Workflow X: X name "test"').get_entity(0, 0)
- self.execute('SET WF workflow_of ET WHERE ET name "Company"')
+ wf = add_wf(self, 'Company')
foo = wf.add_state(u'foo', initial=True)
bar = wf.add_state(u'bar')
self.assertEquals(wf.state_by_name('bar').eid, bar.eid)
@@ -16,8 +29,7 @@
self.assertEquals(baz.require_group[0].name, 'managers')
def test_duplicated_state(self):
- wf = self.execute('INSERT Workflow X: X name "test"').get_entity(0, 0)
- self.execute('SET WF workflow_of ET WHERE ET name "Company"')
+ wf = add_wf(self, 'Company')
wf.add_state(u'foo', initial=True)
wf.add_state(u'foo')
ex = self.assertRaises(ValidationError, self.commit)
@@ -25,8 +37,7 @@
self.assertEquals(ex.errors, {'state_of': 'unique constraint S name N, Y state_of O, Y name N failed'})
def test_duplicated_transition(self):
- wf = self.execute('INSERT Workflow X: X name "test"').get_entity(0, 0)
- self.execute('SET WF workflow_of ET WHERE ET name "Company"')
+ wf = add_wf(self, 'Company')
foo = wf.add_state(u'foo', initial=True)
bar = wf.add_state(u'bar')
wf.add_transition(u'baz', (foo,), bar, ('managers',))
@@ -187,6 +198,112 @@
self.assertEquals(len(transitions), 1)
self.assertEquals(transitions[0].name, 'tr1')
+class CustomWorkflowTC(EnvBasedTC):
+
+ def setup_database(self):
+ self.member = self.create_user('member')
+
+ def tearDown(self):
+ super(CustomWorkflowTC, self).tearDown()
+ self.execute('DELETE X custom_workflow WF')
+
+ def test_custom_wf_replace_state_no_history(self):
+ """member in inital state with no previous history, state is simply
+ redirected when changing workflow
+ """
+ wf = add_wf(self, 'CWUser')
+ wf.add_state('asleep', initial=True)
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.state, 'activated')# no change before commit
+ self.commit()
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.current_workflow.eid, wf.eid)
+ self.assertEquals(self.member.state, 'asleep')
+ self.assertEquals(self.member.workflow_history, [])
+
+ def test_custom_wf_replace_state_keep_history(self):
+ """member in inital state with some history, state is redirected and
+ state change is recorded to history
+ """
+ self.member.fire_transition('deactivate')
+ self.member.fire_transition('activate')
+ wf = add_wf(self, 'CWUser')
+ wf.add_state('asleep', initial=True)
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ self.commit()
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.current_workflow.eid, wf.eid)
+ self.assertEquals(self.member.state, 'asleep')
+ self.assertEquals(parse_hist(self.member.workflow_history),
+ [('activated', 'deactivated', 'deactivate', None),
+ ('deactivated', 'activated', 'activate', None),
+ ('activated', 'asleep', None, 'workflow changed to "CWUser"')])
+
+ def test_custom_wf_shared_state(self):
+ """member in some state shared by the new workflow, nothing has to be
+ done
+ """
+ self.member.fire_transition('deactivate')
+ self.assertEquals(self.member.state, 'deactivated')
+ wf = add_wf(self, 'CWUser')
+ wf.add_state('asleep', initial=True)
+ self.execute('SET S state_of WF WHERE S name "deactivated", WF eid %(wf)s',
+ {'wf': wf.eid})
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ self.commit()
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.current_workflow.eid, wf.eid)
+ self.assertEquals(self.member.state, 'deactivated')
+ self.assertEquals(parse_hist(self.member.workflow_history),
+ [('activated', 'deactivated', 'deactivate', None)])
+
+ def test_custom_wf_no_initial_state(self):
+ """try to set a custom workflow which has no initial state"""
+ self.member.fire_transition('deactivate')
+ wf = add_wf(self, 'CWUser')
+ wf.add_state('asleep')
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ ex = self.assertRaises(ValidationError, self.commit)
+ self.assertEquals(ex.errors, {'custom_workflow': u'workflow has no initial state'})
+
+ def test_custom_wf_bad_etype(self):
+ """try to set a custom workflow which has no initial state"""
+ self.member.fire_transition('deactivate')
+ wf = add_wf(self, 'Company')
+ wf.add_state('asleep', initial=True)
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ ex = self.assertRaises(ValidationError, self.commit)
+ self.assertEquals(ex.errors, {'custom_workflow': 'constraint S is ET, O workflow_of ET failed'})
+
+ def test_del_custom_wf(self):
+ """member in some state shared by the new workflow, nothing has to be
+ done
+ """
+ self.member.fire_transition('deactivate')
+ wf = add_wf(self, 'CWUser')
+ wf.add_state('asleep', initial=True)
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ self.commit()
+ self.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.state, 'asleep')# no change before commit
+ self.commit()
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.current_workflow.name, "CWUser workflow")
+ self.assertEquals(self.member.state, 'activated')
+ self.assertEquals(parse_hist(self.member.workflow_history),
+ [('activated', 'deactivated', 'deactivate', None),
+ ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'),
+ ('asleep', 'activated', None, 'workflow changed to "CWUser workflow"'),])
+
from cubicweb.devtools.apptest import RepositoryBasedTC