--- a/entities/test/unittest_wfobjs.py Thu May 20 20:47:13 2010 +0200
+++ b/entities/test/unittest_wfobjs.py Thu May 20 20:47:55 2010 +0200
@@ -100,35 +100,38 @@
def test_workflow_base(self):
e = self.create_user('toto')
- self.assertEquals(e.state, 'activated')
- e.change_state('deactivated', u'deactivate 1')
+ iworkflowable = e.cw_adapt_to('IWorkflowable')
+ self.assertEquals(iworkflowable.state, 'activated')
+ iworkflowable.change_state('deactivated', u'deactivate 1')
self.commit()
- e.change_state('activated', u'activate 1')
+ iworkflowable.change_state('activated', u'activate 1')
self.commit()
- e.change_state('deactivated', u'deactivate 2')
+ iworkflowable.change_state('deactivated', u'deactivate 2')
self.commit()
e.clear_related_cache('wf_info_for', 'object')
self.assertEquals([tr.comment for tr in e.reverse_wf_info_for],
['deactivate 1', 'activate 1', 'deactivate 2'])
- self.assertEquals(e.latest_trinfo().comment, 'deactivate 2')
+ self.assertEquals(iworkflowable.latest_trinfo().comment, 'deactivate 2')
def test_possible_transitions(self):
user = self.execute('CWUser X').get_entity(0, 0)
- trs = list(user.possible_transitions())
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ trs = list(iworkflowable.possible_transitions())
self.assertEquals(len(trs), 1)
self.assertEquals(trs[0].name, u'deactivate')
self.assertEquals(trs[0].destination(None).name, u'deactivated')
# test a std user get no possible transition
cnx = self.login('member')
# fetch the entity using the new session
- trs = list(cnx.user().possible_transitions())
+ trs = list(cnx.user().cw_adapt_to('IWorkflowable').possible_transitions())
self.assertEquals(len(trs), 0)
def _test_manager_deactivate(self, user):
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
user.clear_related_cache('in_state', 'subject')
self.assertEquals(len(user.in_state), 1)
- self.assertEquals(user.state, 'deactivated')
- trinfo = user.latest_trinfo()
+ self.assertEquals(iworkflowable.state, 'deactivated')
+ trinfo = iworkflowable.latest_trinfo()
self.assertEquals(trinfo.previous_state.name, 'activated')
self.assertEquals(trinfo.new_state.name, 'deactivated')
self.assertEquals(trinfo.comment, 'deactivate user')
@@ -137,7 +140,8 @@
def test_change_state(self):
user = self.user()
- user.change_state('deactivated', comment=u'deactivate user')
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ iworkflowable.change_state('deactivated', comment=u'deactivate user')
trinfo = self._test_manager_deactivate(user)
self.assertEquals(trinfo.transition, None)
@@ -154,33 +158,36 @@
def test_fire_transition(self):
user = self.user()
- user.fire_transition('deactivate', comment=u'deactivate user')
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ iworkflowable.fire_transition('deactivate', comment=u'deactivate user')
user.clear_all_caches()
- self.assertEquals(user.state, 'deactivated')
+ self.assertEquals(iworkflowable.state, 'deactivated')
self._test_manager_deactivate(user)
trinfo = self._test_manager_deactivate(user)
self.assertEquals(trinfo.transition.name, 'deactivate')
def test_goback_transition(self):
- wf = self.session.user.current_workflow
+ wf = self.session.user.cw_adapt_to('IWorkflowable').current_workflow
asleep = wf.add_state('asleep')
- wf.add_transition('rest', (wf.state_by_name('activated'), wf.state_by_name('deactivated')),
- asleep)
+ wf.add_transition('rest', (wf.state_by_name('activated'),
+ wf.state_by_name('deactivated')),
+ asleep)
wf.add_transition('wake up', asleep)
user = self.create_user('stduser')
- user.fire_transition('rest')
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ iworkflowable.fire_transition('rest')
self.commit()
- user.fire_transition('wake up')
+ iworkflowable.fire_transition('wake up')
self.commit()
- self.assertEquals(user.state, 'activated')
- user.fire_transition('deactivate')
+ self.assertEquals(iworkflowable.state, 'activated')
+ iworkflowable.fire_transition('deactivate')
self.commit()
- user.fire_transition('rest')
+ iworkflowable.fire_transition('rest')
self.commit()
- user.fire_transition('wake up')
+ iworkflowable.fire_transition('wake up')
self.commit()
user.clear_all_caches()
- self.assertEquals(user.state, 'deactivated')
+ self.assertEquals(iworkflowable.state, 'deactivated')
# XXX test managers can change state without matching transition
@@ -189,18 +196,18 @@
self.create_user('tutu')
cnx = self.login('tutu')
req = self.request()
- member = req.entity_from_eid(self.member.eid)
+ iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable')
ex = self.assertRaises(ValidationError,
- member.fire_transition, 'deactivate')
+ iworkflowable.fire_transition, 'deactivate')
self.assertEquals(ex.errors, {'by_transition-subject': "transition may not be fired"})
cnx.close()
cnx = self.login('member')
req = self.request()
- member = req.entity_from_eid(self.member.eid)
- member.fire_transition('deactivate')
+ iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable')
+ iworkflowable.fire_transition('deactivate')
cnx.commit()
ex = self.assertRaises(ValidationError,
- member.fire_transition, 'activate')
+ iworkflowable.fire_transition, 'activate')
self.assertEquals(ex.errors, {'by_transition-subject': "transition may not be fired"})
def test_fire_transition_owned_by(self):
@@ -250,43 +257,44 @@
[(swfstate2, state2), (swfstate3, state3)])
self.assertEquals(swftr1.destination(None).eid, swfstate1.eid)
# workflows built, begin test
- self.group = self.request().create_entity('CWGroup', name=u'grp1')
+ group = self.request().create_entity('CWGroup', name=u'grp1')
self.commit()
- self.assertEquals(self.group.current_state.eid, state1.eid)
- self.assertEquals(self.group.current_workflow.eid, mwf.eid)
- self.assertEquals(self.group.main_workflow.eid, mwf.eid)
- self.assertEquals(self.group.subworkflow_input_transition(), None)
- self.group.fire_transition('swftr1', u'go')
+ iworkflowable = group.cw_adapt_to('IWorkflowable')
+ self.assertEquals(iworkflowable.current_state.eid, state1.eid)
+ self.assertEquals(iworkflowable.current_workflow.eid, mwf.eid)
+ self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid)
+ self.assertEquals(iworkflowable.subworkflow_input_transition(), None)
+ iworkflowable.fire_transition('swftr1', u'go')
self.commit()
- self.group.clear_all_caches()
- self.assertEquals(self.group.current_state.eid, swfstate1.eid)
- self.assertEquals(self.group.current_workflow.eid, swf.eid)
- self.assertEquals(self.group.main_workflow.eid, mwf.eid)
- self.assertEquals(self.group.subworkflow_input_transition().eid, swftr1.eid)
- self.group.fire_transition('tr1', u'go')
+ group.clear_all_caches()
+ self.assertEquals(iworkflowable.current_state.eid, swfstate1.eid)
+ self.assertEquals(iworkflowable.current_workflow.eid, swf.eid)
+ self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid)
+ self.assertEquals(iworkflowable.subworkflow_input_transition().eid, swftr1.eid)
+ iworkflowable.fire_transition('tr1', u'go')
self.commit()
- self.group.clear_all_caches()
- self.assertEquals(self.group.current_state.eid, state2.eid)
- self.assertEquals(self.group.current_workflow.eid, mwf.eid)
- self.assertEquals(self.group.main_workflow.eid, mwf.eid)
- self.assertEquals(self.group.subworkflow_input_transition(), None)
+ group.clear_all_caches()
+ self.assertEquals(iworkflowable.current_state.eid, state2.eid)
+ self.assertEquals(iworkflowable.current_workflow.eid, mwf.eid)
+ self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid)
+ self.assertEquals(iworkflowable.subworkflow_input_transition(), None)
# force back to swfstate1 is impossible since we can't any more find
# subworkflow input transition
ex = self.assertRaises(ValidationError,
- self.group.change_state, swfstate1, u'gadget')
+ iworkflowable.change_state, swfstate1, u'gadget')
self.assertEquals(ex.errors, {'to_state-subject': "state doesn't belong to entity's workflow"})
self.rollback()
# force back to state1
- self.group.change_state('state1', u'gadget')
- self.group.fire_transition('swftr1', u'au')
- self.group.clear_all_caches()
- self.group.fire_transition('tr2', u'chapeau')
+ iworkflowable.change_state('state1', u'gadget')
+ iworkflowable.fire_transition('swftr1', u'au')
+ group.clear_all_caches()
+ iworkflowable.fire_transition('tr2', u'chapeau')
self.commit()
- self.group.clear_all_caches()
- self.assertEquals(self.group.current_state.eid, state3.eid)
- self.assertEquals(self.group.current_workflow.eid, mwf.eid)
- self.assertEquals(self.group.main_workflow.eid, mwf.eid)
- self.assertListEquals(parse_hist(self.group.workflow_history),
+ group.clear_all_caches()
+ self.assertEquals(iworkflowable.current_state.eid, state3.eid)
+ self.assertEquals(iworkflowable.current_workflow.eid, mwf.eid)
+ self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid)
+ self.assertListEquals(parse_hist(iworkflowable.workflow_history),
[('state1', 'swfstate1', 'swftr1', 'go'),
('swfstate1', 'swfstate2', 'tr1', 'go'),
('swfstate2', 'state2', 'swftr1', 'exiting from subworkflow subworkflow'),
@@ -337,8 +345,9 @@
self.commit()
group = self.request().create_entity('CWGroup', name=u'grp1')
self.commit()
+ iworkflowable = group.cw_adapt_to('IWorkflowable')
for trans in ('identify', 'release', 'close'):
- group.fire_transition(trans)
+ iworkflowable.fire_transition(trans)
self.commit()
@@ -362,6 +371,7 @@
self.commit()
group = self.request().create_entity('CWGroup', name=u'grp1')
self.commit()
+ iworkflowable = group.cw_adapt_to('IWorkflowable')
for trans, nextstate in (('identify', 'xsigning'),
('xabort', 'created'),
('identify', 'xsigning'),
@@ -369,10 +379,10 @@
('release', 'xsigning'),
('xabort', 'identified')
):
- group.fire_transition(trans)
+ iworkflowable.fire_transition(trans)
self.commit()
group.clear_all_caches()
- self.assertEquals(group.state, nextstate)
+ self.assertEquals(iworkflowable.state, nextstate)
class CustomWorkflowTC(CubicWebTC):
@@ -389,35 +399,38 @@
self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
{'wf': wf.eid, 'x': self.member.eid})
self.member.clear_all_caches()
- self.assertEquals(self.member.state, 'activated')# no change before commit
+ iworkflowable = self.member.cw_adapt_to('IWorkflowable')
+ self.assertEquals(iworkflowable.state, 'activated')# no change before commit
self.commit()
self.member.clear_all_caches()
- self.assertEquals(self.member.current_workflow.eid, wf.eid)
- self.assertEquals(self.member.state, 'asleep')
- self.assertEquals(self.member.workflow_history, ())
+ self.assertEquals(iworkflowable.current_workflow.eid, wf.eid)
+ self.assertEquals(iworkflowable.state, 'asleep')
+ self.assertEquals(iworkflowable.workflow_history, ())
def test_custom_wf_replace_state_keep_history(self):
"""member in inital state with some history, state is redirected and
state change is recorded to history
"""
- self.member.fire_transition('deactivate')
- self.member.fire_transition('activate')
+ iworkflowable = self.member.cw_adapt_to('IWorkflowable')
+ iworkflowable.fire_transition('deactivate')
+ iworkflowable.fire_transition('activate')
wf = add_wf(self, 'CWUser')
wf.add_state('asleep', initial=True)
self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
{'wf': wf.eid, 'x': self.member.eid})
self.commit()
self.member.clear_all_caches()
- self.assertEquals(self.member.current_workflow.eid, wf.eid)
- self.assertEquals(self.member.state, 'asleep')
- self.assertEquals(parse_hist(self.member.workflow_history),
+ self.assertEquals(iworkflowable.current_workflow.eid, wf.eid)
+ self.assertEquals(iworkflowable.state, 'asleep')
+ self.assertEquals(parse_hist(iworkflowable.workflow_history),
[('activated', 'deactivated', 'deactivate', None),
('deactivated', 'activated', 'activate', None),
('activated', 'asleep', None, 'workflow changed to "CWUser"')])
def test_custom_wf_no_initial_state(self):
"""try to set a custom workflow which has no initial state"""
- self.member.fire_transition('deactivate')
+ iworkflowable = self.member.cw_adapt_to('IWorkflowable')
+ iworkflowable.fire_transition('deactivate')
wf = add_wf(self, 'CWUser')
wf.add_state('asleep')
self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
@@ -438,7 +451,8 @@
"""member in some state shared by the new workflow, nothing has to be
done
"""
- self.member.fire_transition('deactivate')
+ iworkflowable = self.member.cw_adapt_to('IWorkflowable')
+ iworkflowable.fire_transition('deactivate')
wf = add_wf(self, 'CWUser')
wf.add_state('asleep', initial=True)
self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
@@ -447,12 +461,12 @@
self.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
{'wf': wf.eid, 'x': self.member.eid})
self.member.clear_all_caches()
- self.assertEquals(self.member.state, 'asleep')# no change before commit
+ self.assertEquals(iworkflowable.state, 'asleep')# no change before commit
self.commit()
self.member.clear_all_caches()
- self.assertEquals(self.member.current_workflow.name, "default user workflow")
- self.assertEquals(self.member.state, 'activated')
- self.assertEquals(parse_hist(self.member.workflow_history),
+ self.assertEquals(iworkflowable.current_workflow.name, "default user workflow")
+ self.assertEquals(iworkflowable.state, 'activated')
+ self.assertEquals(parse_hist(iworkflowable.workflow_history),
[('activated', 'deactivated', 'deactivate', None),
('deactivated', 'asleep', None, 'workflow changed to "CWUser"'),
('asleep', 'activated', None, 'workflow changed to "default user workflow"'),])
@@ -473,28 +487,29 @@
def test_auto_transition_fired(self):
wf = self.setup_custom_wf()
user = self.create_user('member')
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
{'wf': wf.eid, 'x': user.eid})
self.commit()
user.clear_all_caches()
- self.assertEquals(user.state, 'asleep')
- self.assertEquals([t.name for t in user.possible_transitions()],
+ self.assertEquals(iworkflowable.state, 'asleep')
+ self.assertEquals([t.name for t in iworkflowable.possible_transitions()],
['rest'])
- user.fire_transition('rest')
+ iworkflowable.fire_transition('rest')
self.commit()
user.clear_all_caches()
- self.assertEquals(user.state, 'asleep')
- self.assertEquals([t.name for t in user.possible_transitions()],
+ self.assertEquals(iworkflowable.state, 'asleep')
+ self.assertEquals([t.name for t in iworkflowable.possible_transitions()],
['rest'])
- self.assertEquals(parse_hist(user.workflow_history),
+ self.assertEquals(parse_hist(iworkflowable.workflow_history),
[('asleep', 'asleep', 'rest', None)])
user.set_attributes(surname=u'toto') # fulfill condition
self.commit()
- user.fire_transition('rest')
+ iworkflowable.fire_transition('rest')
self.commit()
user.clear_all_caches()
- self.assertEquals(user.state, 'dead')
- self.assertEquals(parse_hist(user.workflow_history),
+ self.assertEquals(iworkflowable.state, 'dead')
+ self.assertEquals(parse_hist(iworkflowable.workflow_history),
[('asleep', 'asleep', 'rest', None),
('asleep', 'asleep', 'rest', None),
('asleep', 'dead', 'sick', None),])
@@ -505,7 +520,8 @@
self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
{'wf': wf.eid, 'x': user.eid})
self.commit()
- self.assertEquals(user.state, 'dead')
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ self.assertEquals(iworkflowable.state, 'dead')
def test_auto_transition_initial_state_fired(self):
wf = self.execute('Any WF WHERE ET default_workflow WF, '
@@ -517,14 +533,15 @@
self.commit()
user = self.create_user('member', surname=u'toto')
self.commit()
- self.assertEquals(user.state, 'dead')
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ self.assertEquals(iworkflowable.state, 'dead')
class WorkflowHooksTC(CubicWebTC):
def setUp(self):
CubicWebTC.setUp(self)
- self.wf = self.session.user.current_workflow
+ self.wf = self.session.user.cw_adapt_to('IWorkflowable').current_workflow
self.session.set_pool()
self.s_activated = self.wf.state_by_name('activated').eid
self.s_deactivated = self.wf.state_by_name('deactivated').eid
@@ -572,8 +589,9 @@
def test_transition_checking1(self):
cnx = self.login('stduser')
user = cnx.user(self.session)
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
ex = self.assertRaises(ValidationError,
- user.fire_transition, 'activate')
+ iworkflowable.fire_transition, 'activate')
self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']),
u"transition isn't allowed from")
cnx.close()
@@ -581,8 +599,9 @@
def test_transition_checking2(self):
cnx = self.login('stduser')
user = cnx.user(self.session)
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
ex = self.assertRaises(ValidationError,
- user.fire_transition, 'dummy')
+ iworkflowable.fire_transition, 'dummy')
self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']),
u"transition isn't allowed from")
cnx.close()
@@ -591,15 +610,16 @@
cnx = self.login('stduser')
session = self.session
user = cnx.user(session)
- user.fire_transition('deactivate')
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ iworkflowable.fire_transition('deactivate')
cnx.commit()
session.set_pool()
ex = self.assertRaises(ValidationError,
- user.fire_transition, 'deactivate')
+ iworkflowable.fire_transition, 'deactivate')
self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']),
u"transition isn't allowed from")
# get back now
- user.fire_transition('activate')
+ iworkflowable.fire_transition('activate')
cnx.commit()
cnx.close()