--- a/entities/test/unittest_wfobjs.py Fri Jan 07 18:51:50 2011 +0100
+++ b/entities/test/unittest_wfobjs.py Mon Jan 10 12:28:09 2011 +0100
@@ -15,7 +15,9 @@
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+
from __future__ import with_statement
+
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb import ValidationError
from cubicweb.server.session import security_enabled
@@ -55,8 +57,9 @@
wf.add_state(u'foo', initial=True)
self.commit()
wf.add_state(u'foo')
- ex = self.assertRaises(ValidationError, self.commit)
- self.assertEqual(ex.errors, {'name-subject': 'workflow already have a state of that name'})
+ with self.assertRaises(ValidationError) as cm:
+ self.commit()
+ self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already have a state of that name'})
# no pb if not in the same workflow
wf2 = add_wf(self, 'Company')
foo = wf2.add_state(u'foo', initial=True)
@@ -65,8 +68,9 @@
bar = wf.add_state(u'bar')
self.commit()
bar.set_attributes(name=u'foo')
- ex = self.assertRaises(ValidationError, self.commit)
- self.assertEqual(ex.errors, {'name-subject': 'workflow already have a state of that name'})
+ with self.assertRaises(ValidationError) as cm:
+ self.commit()
+ self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already have a state of that name'})
def test_duplicated_transition(self):
wf = add_wf(self, 'Company')
@@ -74,8 +78,9 @@
bar = wf.add_state(u'bar')
wf.add_transition(u'baz', (foo,), bar, ('managers',))
wf.add_transition(u'baz', (bar,), foo)
- ex = self.assertRaises(ValidationError, self.commit)
- self.assertEqual(ex.errors, {'name-subject': 'workflow already have a transition of that name'})
+ with self.assertRaises(ValidationError) as cm:
+ self.commit()
+ self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already have a transition of that name'})
# no pb if not in the same workflow
wf2 = add_wf(self, 'Company')
foo = wf.add_state(u'foo', initial=True)
@@ -86,8 +91,9 @@
biz = wf.add_transition(u'biz', (bar,), foo)
self.commit()
biz.set_attributes(name=u'baz')
- ex = self.assertRaises(ValidationError, self.commit)
- self.assertEqual(ex.errors, {'name-subject': 'workflow already have a transition of that name'})
+ with self.assertRaises(ValidationError) as cm:
+ self.commit()
+ self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already have a transition of that name'})
class WorkflowTC(CubicWebTC):
@@ -150,10 +156,10 @@
s = wf.add_state(u'foo', initial=True)
self.commit()
with security_enabled(self.session, write=False):
- ex = self.assertRaises(ValidationError, self.session.execute,
- 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': self.user().eid, 's': s.eid})
- self.assertEqual(ex.errors, {'in_state-subject': "state doesn't belong to entity's workflow. "
+ with self.assertRaises(ValidationError) as cm:
+ self.session.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
+ {'x': self.user().eid, 's': s.eid})
+ self.assertEqual(cm.exception.errors, {'in_state-subject': "state doesn't belong to entity's workflow. "
"You may want to set a custom workflow for this entity first."})
def test_fire_transition(self):
@@ -197,18 +203,18 @@
cnx = self.login('tutu')
req = self.request()
iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable')
- ex = self.assertRaises(ValidationError,
- iworkflowable.fire_transition, 'deactivate')
- self.assertEqual(ex.errors, {'by_transition-subject': "transition may not be fired"})
+ with self.assertRaises(ValidationError) as cm:
+ iworkflowable.fire_transition('deactivate')
+ self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})
cnx.close()
cnx = self.login('member')
req = self.request()
iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable')
iworkflowable.fire_transition('deactivate')
cnx.commit()
- ex = self.assertRaises(ValidationError,
- iworkflowable.fire_transition, 'activate')
- self.assertEqual(ex.errors, {'by_transition-subject': "transition may not be fired"})
+ with self.assertRaises(ValidationError) as cm:
+ iworkflowable.fire_transition('activate')
+ self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})
def test_fire_transition_owned_by(self):
self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
@@ -280,9 +286,9 @@
self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
# force back to swfstate1 is impossible since we can't any more find
# subworkflow input transition
- ex = self.assertRaises(ValidationError,
- iworkflowable.change_state, swfstate1, u'gadget')
- self.assertEqual(ex.errors, {'to_state-subject': "state doesn't belong to entity's workflow"})
+ with self.assertRaises(ValidationError) as cm:
+ iworkflowable.change_state(swfstate1, u'gadget')
+ self.assertEqual(cm.exception.errors, {'to_state-subject': "state doesn't belong to entity's workflow"})
self.rollback()
# force back to state1
iworkflowable.change_state('state1', u'gadget')
@@ -317,8 +323,9 @@
state3 = mwf.add_state(u'state3')
mwf.add_wftransition(u'swftr1', swf, state1,
[(swfstate2, state2), (swfstate2, state3)])
- ex = self.assertRaises(ValidationError, self.commit)
- self.assertEqual(ex.errors, {'subworkflow_exit-subject': u"can't have multiple exits on the same state"})
+ with self.assertRaises(ValidationError) as cm:
+ self.commit()
+ self.assertEqual(cm.exception.errors, {'subworkflow_exit-subject': u"can't have multiple exits on the same state"})
def test_swf_fire_in_a_row(self):
# sub-workflow
@@ -435,8 +442,9 @@
wf.add_state('asleep')
self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
{'wf': wf.eid, 'x': self.member.eid})
- ex = self.assertRaises(ValidationError, self.commit)
- self.assertEqual(ex.errors, {'custom_workflow-subject': u'workflow has no initial state'})
+ with self.assertRaises(ValidationError) as cm:
+ self.commit()
+ self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u'workflow has no initial state'})
def test_custom_wf_bad_etype(self):
"""try to set a custom workflow which doesn't apply to entity type"""
@@ -444,8 +452,9 @@
wf.add_state('asleep', initial=True)
self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
{'wf': wf.eid, 'x': self.member.eid})
- ex = self.assertRaises(ValidationError, self.commit)
- self.assertEqual(ex.errors, {'custom_workflow-subject': u"workflow isn't a workflow for this type"})
+ with self.assertRaises(ValidationError) as cm:
+ self.commit()
+ self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u"workflow isn't a workflow for this type"})
def test_del_custom_wf(self):
"""member in some state shared by the new workflow, nothing has to be
@@ -590,9 +599,9 @@
cnx = self.login('stduser')
user = cnx.user(self.session)
iworkflowable = user.cw_adapt_to('IWorkflowable')
- ex = self.assertRaises(ValidationError,
- iworkflowable.fire_transition, 'activate')
- self.assertEqual(self._cleanup_msg(ex.errors['by_transition-subject']),
+ with self.assertRaises(ValidationError) as cm:
+ iworkflowable.fire_transition('activate')
+ self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
u"transition isn't allowed from")
cnx.close()
@@ -600,9 +609,9 @@
cnx = self.login('stduser')
user = cnx.user(self.session)
iworkflowable = user.cw_adapt_to('IWorkflowable')
- ex = self.assertRaises(ValidationError,
- iworkflowable.fire_transition, 'dummy')
- self.assertEqual(self._cleanup_msg(ex.errors['by_transition-subject']),
+ with self.assertRaises(ValidationError) as cm:
+ iworkflowable.fire_transition('dummy')
+ self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
u"transition isn't allowed from")
cnx.close()
@@ -614,9 +623,9 @@
iworkflowable.fire_transition('deactivate')
cnx.commit()
session.set_pool()
- ex = self.assertRaises(ValidationError,
- iworkflowable.fire_transition, 'deactivate')
- self.assertEqual(self._cleanup_msg(ex.errors['by_transition-subject']),
+ with self.assertRaises(ValidationError) as cm:
+ iworkflowable.fire_transition('deactivate')
+ self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
u"transition isn't allowed from")
cnx.rollback()
session.set_pool()