entities/test/unittest_wfobjs.py
changeset 5556 9ab2b4c74baf
parent 5538 752bc67064f2
child 5557 1a534c596bff
--- a/entities/test/unittest_wfobjs.py	Thu May 20 20:47:13 2010 +0200
+++ b/entities/test/unittest_wfobjs.py	Thu May 20 20:47:55 2010 +0200
@@ -100,35 +100,38 @@
 
     def test_workflow_base(self):
         e = self.create_user('toto')
-        self.assertEquals(e.state, 'activated')
-        e.change_state('deactivated', u'deactivate 1')
+        iworkflowable = e.cw_adapt_to('IWorkflowable')
+        self.assertEquals(iworkflowable.state, 'activated')
+        iworkflowable.change_state('deactivated', u'deactivate 1')
         self.commit()
-        e.change_state('activated', u'activate 1')
+        iworkflowable.change_state('activated', u'activate 1')
         self.commit()
-        e.change_state('deactivated', u'deactivate 2')
+        iworkflowable.change_state('deactivated', u'deactivate 2')
         self.commit()
         e.clear_related_cache('wf_info_for', 'object')
         self.assertEquals([tr.comment for tr in e.reverse_wf_info_for],
                           ['deactivate 1', 'activate 1', 'deactivate 2'])
-        self.assertEquals(e.latest_trinfo().comment, 'deactivate 2')
+        self.assertEquals(iworkflowable.latest_trinfo().comment, 'deactivate 2')
 
     def test_possible_transitions(self):
         user = self.execute('CWUser X').get_entity(0, 0)
-        trs = list(user.possible_transitions())
+        iworkflowable = user.cw_adapt_to('IWorkflowable')
+        trs = list(iworkflowable.possible_transitions())
         self.assertEquals(len(trs), 1)
         self.assertEquals(trs[0].name, u'deactivate')
         self.assertEquals(trs[0].destination(None).name, u'deactivated')
         # test a std user get no possible transition
         cnx = self.login('member')
         # fetch the entity using the new session
-        trs = list(cnx.user().possible_transitions())
+        trs = list(cnx.user().cw_adapt_to('IWorkflowable').possible_transitions())
         self.assertEquals(len(trs), 0)
 
     def _test_manager_deactivate(self, user):
+        iworkflowable = user.cw_adapt_to('IWorkflowable')
         user.clear_related_cache('in_state', 'subject')
         self.assertEquals(len(user.in_state), 1)
-        self.assertEquals(user.state, 'deactivated')
-        trinfo = user.latest_trinfo()
+        self.assertEquals(iworkflowable.state, 'deactivated')
+        trinfo = iworkflowable.latest_trinfo()
         self.assertEquals(trinfo.previous_state.name, 'activated')
         self.assertEquals(trinfo.new_state.name, 'deactivated')
         self.assertEquals(trinfo.comment, 'deactivate user')
@@ -137,7 +140,8 @@
 
     def test_change_state(self):
         user = self.user()
-        user.change_state('deactivated', comment=u'deactivate user')
+        iworkflowable = user.cw_adapt_to('IWorkflowable')
+        iworkflowable.change_state('deactivated', comment=u'deactivate user')
         trinfo = self._test_manager_deactivate(user)
         self.assertEquals(trinfo.transition, None)
 
@@ -154,33 +158,36 @@
 
     def test_fire_transition(self):
         user = self.user()
-        user.fire_transition('deactivate', comment=u'deactivate user')
+        iworkflowable = user.cw_adapt_to('IWorkflowable')
+        iworkflowable.fire_transition('deactivate', comment=u'deactivate user')
         user.clear_all_caches()
-        self.assertEquals(user.state, 'deactivated')
+        self.assertEquals(iworkflowable.state, 'deactivated')
         self._test_manager_deactivate(user)
         trinfo = self._test_manager_deactivate(user)
         self.assertEquals(trinfo.transition.name, 'deactivate')
 
     def test_goback_transition(self):
-        wf = self.session.user.current_workflow
+        wf = self.session.user.cw_adapt_to('IWorkflowable').current_workflow
         asleep = wf.add_state('asleep')
-        wf.add_transition('rest', (wf.state_by_name('activated'), wf.state_by_name('deactivated')),
-                               asleep)
+        wf.add_transition('rest', (wf.state_by_name('activated'),
+                                   wf.state_by_name('deactivated')),
+                          asleep)
         wf.add_transition('wake up', asleep)
         user = self.create_user('stduser')
-        user.fire_transition('rest')
+        iworkflowable = user.cw_adapt_to('IWorkflowable')
+        iworkflowable.fire_transition('rest')
         self.commit()
-        user.fire_transition('wake up')
+        iworkflowable.fire_transition('wake up')
         self.commit()
-        self.assertEquals(user.state, 'activated')
-        user.fire_transition('deactivate')
+        self.assertEquals(iworkflowable.state, 'activated')
+        iworkflowable.fire_transition('deactivate')
         self.commit()
-        user.fire_transition('rest')
+        iworkflowable.fire_transition('rest')
         self.commit()
-        user.fire_transition('wake up')
+        iworkflowable.fire_transition('wake up')
         self.commit()
         user.clear_all_caches()
-        self.assertEquals(user.state, 'deactivated')
+        self.assertEquals(iworkflowable.state, 'deactivated')
 
     # XXX test managers can change state without matching transition
 
@@ -189,18 +196,18 @@
         self.create_user('tutu')
         cnx = self.login('tutu')
         req = self.request()
-        member = req.entity_from_eid(self.member.eid)
+        iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable')
         ex = self.assertRaises(ValidationError,
-                               member.fire_transition, 'deactivate')
+                               iworkflowable.fire_transition, 'deactivate')
         self.assertEquals(ex.errors, {'by_transition-subject': "transition may not be fired"})
         cnx.close()
         cnx = self.login('member')
         req = self.request()
-        member = req.entity_from_eid(self.member.eid)
-        member.fire_transition('deactivate')
+        iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable')
+        iworkflowable.fire_transition('deactivate')
         cnx.commit()
         ex = self.assertRaises(ValidationError,
-                               member.fire_transition, 'activate')
+                               iworkflowable.fire_transition, 'activate')
         self.assertEquals(ex.errors, {'by_transition-subject': "transition may not be fired"})
 
     def test_fire_transition_owned_by(self):
@@ -250,43 +257,44 @@
                                       [(swfstate2, state2), (swfstate3, state3)])
         self.assertEquals(swftr1.destination(None).eid, swfstate1.eid)
         # workflows built, begin test
-        self.group = self.request().create_entity('CWGroup', name=u'grp1')
+        group = self.request().create_entity('CWGroup', name=u'grp1')
         self.commit()
-        self.assertEquals(self.group.current_state.eid, state1.eid)
-        self.assertEquals(self.group.current_workflow.eid, mwf.eid)
-        self.assertEquals(self.group.main_workflow.eid, mwf.eid)
-        self.assertEquals(self.group.subworkflow_input_transition(), None)
-        self.group.fire_transition('swftr1', u'go')
+        iworkflowable = group.cw_adapt_to('IWorkflowable')
+        self.assertEquals(iworkflowable.current_state.eid, state1.eid)
+        self.assertEquals(iworkflowable.current_workflow.eid, mwf.eid)
+        self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid)
+        self.assertEquals(iworkflowable.subworkflow_input_transition(), None)
+        iworkflowable.fire_transition('swftr1', u'go')
         self.commit()
-        self.group.clear_all_caches()
-        self.assertEquals(self.group.current_state.eid, swfstate1.eid)
-        self.assertEquals(self.group.current_workflow.eid, swf.eid)
-        self.assertEquals(self.group.main_workflow.eid, mwf.eid)
-        self.assertEquals(self.group.subworkflow_input_transition().eid, swftr1.eid)
-        self.group.fire_transition('tr1', u'go')
+        group.clear_all_caches()
+        self.assertEquals(iworkflowable.current_state.eid, swfstate1.eid)
+        self.assertEquals(iworkflowable.current_workflow.eid, swf.eid)
+        self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid)
+        self.assertEquals(iworkflowable.subworkflow_input_transition().eid, swftr1.eid)
+        iworkflowable.fire_transition('tr1', u'go')
         self.commit()
-        self.group.clear_all_caches()
-        self.assertEquals(self.group.current_state.eid, state2.eid)
-        self.assertEquals(self.group.current_workflow.eid, mwf.eid)
-        self.assertEquals(self.group.main_workflow.eid, mwf.eid)
-        self.assertEquals(self.group.subworkflow_input_transition(), None)
+        group.clear_all_caches()
+        self.assertEquals(iworkflowable.current_state.eid, state2.eid)
+        self.assertEquals(iworkflowable.current_workflow.eid, mwf.eid)
+        self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid)
+        self.assertEquals(iworkflowable.subworkflow_input_transition(), None)
         # force back to swfstate1 is impossible since we can't any more find
         # subworkflow input transition
         ex = self.assertRaises(ValidationError,
-                               self.group.change_state, swfstate1, u'gadget')
+                               iworkflowable.change_state, swfstate1, u'gadget')
         self.assertEquals(ex.errors, {'to_state-subject': "state doesn't belong to entity's workflow"})
         self.rollback()
         # force back to state1
-        self.group.change_state('state1', u'gadget')
-        self.group.fire_transition('swftr1', u'au')
-        self.group.clear_all_caches()
-        self.group.fire_transition('tr2', u'chapeau')
+        iworkflowable.change_state('state1', u'gadget')
+        iworkflowable.fire_transition('swftr1', u'au')
+        group.clear_all_caches()
+        iworkflowable.fire_transition('tr2', u'chapeau')
         self.commit()
-        self.group.clear_all_caches()
-        self.assertEquals(self.group.current_state.eid, state3.eid)
-        self.assertEquals(self.group.current_workflow.eid, mwf.eid)
-        self.assertEquals(self.group.main_workflow.eid, mwf.eid)
-        self.assertListEquals(parse_hist(self.group.workflow_history),
+        group.clear_all_caches()
+        self.assertEquals(iworkflowable.current_state.eid, state3.eid)
+        self.assertEquals(iworkflowable.current_workflow.eid, mwf.eid)
+        self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid)
+        self.assertListEquals(parse_hist(iworkflowable.workflow_history),
                               [('state1', 'swfstate1', 'swftr1', 'go'),
                                ('swfstate1', 'swfstate2', 'tr1', 'go'),
                                ('swfstate2', 'state2', 'swftr1', 'exiting from subworkflow subworkflow'),
@@ -337,8 +345,9 @@
         self.commit()
         group = self.request().create_entity('CWGroup', name=u'grp1')
         self.commit()
+        iworkflowable = group.cw_adapt_to('IWorkflowable')
         for trans in ('identify', 'release', 'close'):
-            group.fire_transition(trans)
+            iworkflowable.fire_transition(trans)
             self.commit()
 
 
@@ -362,6 +371,7 @@
         self.commit()
         group = self.request().create_entity('CWGroup', name=u'grp1')
         self.commit()
+        iworkflowable = group.cw_adapt_to('IWorkflowable')
         for trans, nextstate in (('identify', 'xsigning'),
                                  ('xabort', 'created'),
                                  ('identify', 'xsigning'),
@@ -369,10 +379,10 @@
                                  ('release', 'xsigning'),
                                  ('xabort', 'identified')
                                  ):
-            group.fire_transition(trans)
+            iworkflowable.fire_transition(trans)
             self.commit()
             group.clear_all_caches()
-            self.assertEquals(group.state, nextstate)
+            self.assertEquals(iworkflowable.state, nextstate)
 
 
 class CustomWorkflowTC(CubicWebTC):
@@ -389,35 +399,38 @@
         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                      {'wf': wf.eid, 'x': self.member.eid})
         self.member.clear_all_caches()
-        self.assertEquals(self.member.state, 'activated')# no change before commit
+        iworkflowable = self.member.cw_adapt_to('IWorkflowable')
+        self.assertEquals(iworkflowable.state, 'activated')# no change before commit
         self.commit()
         self.member.clear_all_caches()
-        self.assertEquals(self.member.current_workflow.eid, wf.eid)
-        self.assertEquals(self.member.state, 'asleep')
-        self.assertEquals(self.member.workflow_history, ())
+        self.assertEquals(iworkflowable.current_workflow.eid, wf.eid)
+        self.assertEquals(iworkflowable.state, 'asleep')
+        self.assertEquals(iworkflowable.workflow_history, ())
 
     def test_custom_wf_replace_state_keep_history(self):
         """member in inital state with some history, state is redirected and
         state change is recorded to history
         """
-        self.member.fire_transition('deactivate')
-        self.member.fire_transition('activate')
+        iworkflowable = self.member.cw_adapt_to('IWorkflowable')
+        iworkflowable.fire_transition('deactivate')
+        iworkflowable.fire_transition('activate')
         wf = add_wf(self, 'CWUser')
         wf.add_state('asleep', initial=True)
         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                      {'wf': wf.eid, 'x': self.member.eid})
         self.commit()
         self.member.clear_all_caches()
-        self.assertEquals(self.member.current_workflow.eid, wf.eid)
-        self.assertEquals(self.member.state, 'asleep')
-        self.assertEquals(parse_hist(self.member.workflow_history),
+        self.assertEquals(iworkflowable.current_workflow.eid, wf.eid)
+        self.assertEquals(iworkflowable.state, 'asleep')
+        self.assertEquals(parse_hist(iworkflowable.workflow_history),
                           [('activated', 'deactivated', 'deactivate', None),
                            ('deactivated', 'activated', 'activate', None),
                            ('activated', 'asleep', None, 'workflow changed to "CWUser"')])
 
     def test_custom_wf_no_initial_state(self):
         """try to set a custom workflow which has no initial state"""
-        self.member.fire_transition('deactivate')
+        iworkflowable = self.member.cw_adapt_to('IWorkflowable')
+        iworkflowable.fire_transition('deactivate')
         wf = add_wf(self, 'CWUser')
         wf.add_state('asleep')
         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
@@ -438,7 +451,8 @@
         """member in some state shared by the new workflow, nothing has to be
         done
         """
-        self.member.fire_transition('deactivate')
+        iworkflowable = self.member.cw_adapt_to('IWorkflowable')
+        iworkflowable.fire_transition('deactivate')
         wf = add_wf(self, 'CWUser')
         wf.add_state('asleep', initial=True)
         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
@@ -447,12 +461,12 @@
         self.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                      {'wf': wf.eid, 'x': self.member.eid})
         self.member.clear_all_caches()
-        self.assertEquals(self.member.state, 'asleep')# no change before commit
+        self.assertEquals(iworkflowable.state, 'asleep')# no change before commit
         self.commit()
         self.member.clear_all_caches()
-        self.assertEquals(self.member.current_workflow.name, "default user workflow")
-        self.assertEquals(self.member.state, 'activated')
-        self.assertEquals(parse_hist(self.member.workflow_history),
+        self.assertEquals(iworkflowable.current_workflow.name, "default user workflow")
+        self.assertEquals(iworkflowable.state, 'activated')
+        self.assertEquals(parse_hist(iworkflowable.workflow_history),
                           [('activated', 'deactivated', 'deactivate', None),
                            ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'),
                            ('asleep', 'activated', None, 'workflow changed to "default user workflow"'),])
@@ -473,28 +487,29 @@
     def test_auto_transition_fired(self):
         wf = self.setup_custom_wf()
         user = self.create_user('member')
+        iworkflowable = user.cw_adapt_to('IWorkflowable')
         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                      {'wf': wf.eid, 'x': user.eid})
         self.commit()
         user.clear_all_caches()
-        self.assertEquals(user.state, 'asleep')
-        self.assertEquals([t.name for t in user.possible_transitions()],
+        self.assertEquals(iworkflowable.state, 'asleep')
+        self.assertEquals([t.name for t in iworkflowable.possible_transitions()],
                           ['rest'])
-        user.fire_transition('rest')
+        iworkflowable.fire_transition('rest')
         self.commit()
         user.clear_all_caches()
-        self.assertEquals(user.state, 'asleep')
-        self.assertEquals([t.name for t in user.possible_transitions()],
+        self.assertEquals(iworkflowable.state, 'asleep')
+        self.assertEquals([t.name for t in iworkflowable.possible_transitions()],
                           ['rest'])
-        self.assertEquals(parse_hist(user.workflow_history),
+        self.assertEquals(parse_hist(iworkflowable.workflow_history),
                           [('asleep', 'asleep', 'rest', None)])
         user.set_attributes(surname=u'toto') # fulfill condition
         self.commit()
-        user.fire_transition('rest')
+        iworkflowable.fire_transition('rest')
         self.commit()
         user.clear_all_caches()
-        self.assertEquals(user.state, 'dead')
-        self.assertEquals(parse_hist(user.workflow_history),
+        self.assertEquals(iworkflowable.state, 'dead')
+        self.assertEquals(parse_hist(iworkflowable.workflow_history),
                           [('asleep', 'asleep', 'rest', None),
                            ('asleep', 'asleep', 'rest', None),
                            ('asleep', 'dead', 'sick', None),])
@@ -505,7 +520,8 @@
         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                      {'wf': wf.eid, 'x': user.eid})
         self.commit()
-        self.assertEquals(user.state, 'dead')
+        iworkflowable = user.cw_adapt_to('IWorkflowable')
+        self.assertEquals(iworkflowable.state, 'dead')
 
     def test_auto_transition_initial_state_fired(self):
         wf = self.execute('Any WF WHERE ET default_workflow WF, '
@@ -517,14 +533,15 @@
         self.commit()
         user = self.create_user('member', surname=u'toto')
         self.commit()
-        self.assertEquals(user.state, 'dead')
+        iworkflowable = user.cw_adapt_to('IWorkflowable')
+        self.assertEquals(iworkflowable.state, 'dead')
 
 
 class WorkflowHooksTC(CubicWebTC):
 
     def setUp(self):
         CubicWebTC.setUp(self)
-        self.wf = self.session.user.current_workflow
+        self.wf = self.session.user.cw_adapt_to('IWorkflowable').current_workflow
         self.session.set_pool()
         self.s_activated = self.wf.state_by_name('activated').eid
         self.s_deactivated = self.wf.state_by_name('deactivated').eid
@@ -572,8 +589,9 @@
     def test_transition_checking1(self):
         cnx = self.login('stduser')
         user = cnx.user(self.session)
+        iworkflowable = user.cw_adapt_to('IWorkflowable')
         ex = self.assertRaises(ValidationError,
-                               user.fire_transition, 'activate')
+                               iworkflowable.fire_transition, 'activate')
         self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']),
                           u"transition isn't allowed from")
         cnx.close()
@@ -581,8 +599,9 @@
     def test_transition_checking2(self):
         cnx = self.login('stduser')
         user = cnx.user(self.session)
+        iworkflowable = user.cw_adapt_to('IWorkflowable')
         ex = self.assertRaises(ValidationError,
-                               user.fire_transition, 'dummy')
+                               iworkflowable.fire_transition, 'dummy')
         self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']),
                           u"transition isn't allowed from")
         cnx.close()
@@ -591,15 +610,16 @@
         cnx = self.login('stduser')
         session = self.session
         user = cnx.user(session)
-        user.fire_transition('deactivate')
+        iworkflowable = user.cw_adapt_to('IWorkflowable')
+        iworkflowable.fire_transition('deactivate')
         cnx.commit()
         session.set_pool()
         ex = self.assertRaises(ValidationError,
-                               user.fire_transition, 'deactivate')
+                               iworkflowable.fire_transition, 'deactivate')
         self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']),
                                             u"transition isn't allowed from")
         # get back now
-        user.fire_transition('activate')
+        iworkflowable.fire_transition('activate')
         cnx.commit()
         cnx.close()