entities/test/unittest_wfobjs.py
branchstable
changeset 6796 e70ca9abfc51
parent 6410 2e7a7b0829ed
child 6868 2d40f3c48f31
--- a/entities/test/unittest_wfobjs.py	Fri Jan 07 18:51:50 2011 +0100
+++ b/entities/test/unittest_wfobjs.py	Mon Jan 10 12:28:09 2011 +0100
@@ -15,7 +15,9 @@
 #
 # You should have received a copy of the GNU Lesser General Public License along
 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+
 from __future__ import with_statement
+
 from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb import ValidationError
 from cubicweb.server.session import security_enabled
@@ -55,8 +57,9 @@
         wf.add_state(u'foo', initial=True)
         self.commit()
         wf.add_state(u'foo')
-        ex = self.assertRaises(ValidationError, self.commit)
-        self.assertEqual(ex.errors, {'name-subject': 'workflow already have a state of that name'})
+        with self.assertRaises(ValidationError) as cm:
+            self.commit()
+        self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already have a state of that name'})
         # no pb if not in the same workflow
         wf2 = add_wf(self, 'Company')
         foo = wf2.add_state(u'foo', initial=True)
@@ -65,8 +68,9 @@
         bar = wf.add_state(u'bar')
         self.commit()
         bar.set_attributes(name=u'foo')
-        ex = self.assertRaises(ValidationError, self.commit)
-        self.assertEqual(ex.errors, {'name-subject': 'workflow already have a state of that name'})
+        with self.assertRaises(ValidationError) as cm:
+            self.commit()
+        self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already have a state of that name'})
 
     def test_duplicated_transition(self):
         wf = add_wf(self, 'Company')
@@ -74,8 +78,9 @@
         bar = wf.add_state(u'bar')
         wf.add_transition(u'baz', (foo,), bar, ('managers',))
         wf.add_transition(u'baz', (bar,), foo)
-        ex = self.assertRaises(ValidationError, self.commit)
-        self.assertEqual(ex.errors, {'name-subject': 'workflow already have a transition of that name'})
+        with self.assertRaises(ValidationError) as cm:
+            self.commit()
+        self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already have a transition of that name'})
         # no pb if not in the same workflow
         wf2 = add_wf(self, 'Company')
         foo = wf.add_state(u'foo', initial=True)
@@ -86,8 +91,9 @@
         biz = wf.add_transition(u'biz', (bar,), foo)
         self.commit()
         biz.set_attributes(name=u'baz')
-        ex = self.assertRaises(ValidationError, self.commit)
-        self.assertEqual(ex.errors, {'name-subject': 'workflow already have a transition of that name'})
+        with self.assertRaises(ValidationError) as cm:
+            self.commit()
+        self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already have a transition of that name'})
 
 
 class WorkflowTC(CubicWebTC):
@@ -150,10 +156,10 @@
         s = wf.add_state(u'foo', initial=True)
         self.commit()
         with security_enabled(self.session, write=False):
-            ex = self.assertRaises(ValidationError, self.session.execute,
-                               'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                               {'x': self.user().eid, 's': s.eid})
-            self.assertEqual(ex.errors, {'in_state-subject': "state doesn't belong to entity's workflow. "
+            with self.assertRaises(ValidationError) as cm:
+                self.session.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
+                                     {'x': self.user().eid, 's': s.eid})
+            self.assertEqual(cm.exception.errors, {'in_state-subject': "state doesn't belong to entity's workflow. "
                                       "You may want to set a custom workflow for this entity first."})
 
     def test_fire_transition(self):
@@ -197,18 +203,18 @@
         cnx = self.login('tutu')
         req = self.request()
         iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable')
-        ex = self.assertRaises(ValidationError,
-                               iworkflowable.fire_transition, 'deactivate')
-        self.assertEqual(ex.errors, {'by_transition-subject': "transition may not be fired"})
+        with self.assertRaises(ValidationError) as cm:
+            iworkflowable.fire_transition('deactivate')
+        self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})
         cnx.close()
         cnx = self.login('member')
         req = self.request()
         iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable')
         iworkflowable.fire_transition('deactivate')
         cnx.commit()
-        ex = self.assertRaises(ValidationError,
-                               iworkflowable.fire_transition, 'activate')
-        self.assertEqual(ex.errors, {'by_transition-subject': "transition may not be fired"})
+        with self.assertRaises(ValidationError) as cm:
+            iworkflowable.fire_transition('activate')
+        self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})
 
     def test_fire_transition_owned_by(self):
         self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
@@ -280,9 +286,9 @@
         self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
         # force back to swfstate1 is impossible since we can't any more find
         # subworkflow input transition
-        ex = self.assertRaises(ValidationError,
-                               iworkflowable.change_state, swfstate1, u'gadget')
-        self.assertEqual(ex.errors, {'to_state-subject': "state doesn't belong to entity's workflow"})
+        with self.assertRaises(ValidationError) as cm:
+            iworkflowable.change_state(swfstate1, u'gadget')
+        self.assertEqual(cm.exception.errors, {'to_state-subject': "state doesn't belong to entity's workflow"})
         self.rollback()
         # force back to state1
         iworkflowable.change_state('state1', u'gadget')
@@ -317,8 +323,9 @@
         state3 = mwf.add_state(u'state3')
         mwf.add_wftransition(u'swftr1', swf, state1,
                              [(swfstate2, state2), (swfstate2, state3)])
-        ex = self.assertRaises(ValidationError, self.commit)
-        self.assertEqual(ex.errors, {'subworkflow_exit-subject': u"can't have multiple exits on the same state"})
+        with self.assertRaises(ValidationError) as cm:
+            self.commit()
+        self.assertEqual(cm.exception.errors, {'subworkflow_exit-subject': u"can't have multiple exits on the same state"})
 
     def test_swf_fire_in_a_row(self):
         # sub-workflow
@@ -435,8 +442,9 @@
         wf.add_state('asleep')
         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                      {'wf': wf.eid, 'x': self.member.eid})
-        ex = self.assertRaises(ValidationError, self.commit)
-        self.assertEqual(ex.errors, {'custom_workflow-subject': u'workflow has no initial state'})
+        with self.assertRaises(ValidationError) as cm:
+            self.commit()
+        self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u'workflow has no initial state'})
 
     def test_custom_wf_bad_etype(self):
         """try to set a custom workflow which doesn't apply to entity type"""
@@ -444,8 +452,9 @@
         wf.add_state('asleep', initial=True)
         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                      {'wf': wf.eid, 'x': self.member.eid})
-        ex = self.assertRaises(ValidationError, self.commit)
-        self.assertEqual(ex.errors, {'custom_workflow-subject': u"workflow isn't a workflow for this type"})
+        with self.assertRaises(ValidationError) as cm:
+            self.commit()
+        self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u"workflow isn't a workflow for this type"})
 
     def test_del_custom_wf(self):
         """member in some state shared by the new workflow, nothing has to be
@@ -590,9 +599,9 @@
         cnx = self.login('stduser')
         user = cnx.user(self.session)
         iworkflowable = user.cw_adapt_to('IWorkflowable')
-        ex = self.assertRaises(ValidationError,
-                               iworkflowable.fire_transition, 'activate')
-        self.assertEqual(self._cleanup_msg(ex.errors['by_transition-subject']),
+        with self.assertRaises(ValidationError) as cm:
+            iworkflowable.fire_transition('activate')
+        self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
                           u"transition isn't allowed from")
         cnx.close()
 
@@ -600,9 +609,9 @@
         cnx = self.login('stduser')
         user = cnx.user(self.session)
         iworkflowable = user.cw_adapt_to('IWorkflowable')
-        ex = self.assertRaises(ValidationError,
-                               iworkflowable.fire_transition, 'dummy')
-        self.assertEqual(self._cleanup_msg(ex.errors['by_transition-subject']),
+        with self.assertRaises(ValidationError) as cm:
+            iworkflowable.fire_transition('dummy')
+        self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
                           u"transition isn't allowed from")
         cnx.close()
 
@@ -614,9 +623,9 @@
         iworkflowable.fire_transition('deactivate')
         cnx.commit()
         session.set_pool()
-        ex = self.assertRaises(ValidationError,
-                               iworkflowable.fire_transition, 'deactivate')
-        self.assertEqual(self._cleanup_msg(ex.errors['by_transition-subject']),
+        with self.assertRaises(ValidationError) as cm:
+            iworkflowable.fire_transition('deactivate')
+        self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
                                             u"transition isn't allowed from")
         cnx.rollback()
         session.set_pool()