--- a/entities/test/unittest_wfobjs.py Thu Apr 10 10:05:20 2014 +0200
+++ b/entities/test/unittest_wfobjs.py Fri Apr 04 18:30:45 2014 +0200
@@ -1,4 +1,4 @@
-# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -19,12 +19,11 @@
from cubicweb import ValidationError
from cubicweb.devtools.testlib import CubicWebTC
-
-def add_wf(self, etype, name=None, default=False):
+def add_wf(shell, etype, name=None, default=False):
if name is None:
name = etype
- return self.shell().add_workflow(name, etype, default=default,
- ensure_workflowable=False)
+ return shell.add_workflow(name, etype, default=default,
+ ensure_workflowable=False)
def parse_hist(wfhist):
return [(ti.previous_state.name, ti.new_state.name,
@@ -35,101 +34,104 @@
class WorkflowBuildingTC(CubicWebTC):
def test_wf_construction(self):
- wf = add_wf(self, 'Company')
- foo = wf.add_state(u'foo', initial=True)
- bar = wf.add_state(u'bar')
- self.assertEqual(wf.state_by_name('bar').eid, bar.eid)
- self.assertEqual(wf.state_by_name('barrr'), None)
- baz = wf.add_transition(u'baz', (foo,), bar, ('managers',))
- self.assertEqual(wf.transition_by_name('baz').eid, baz.eid)
- self.assertEqual(len(baz.require_group), 1)
- self.assertEqual(baz.require_group[0].name, 'managers')
+ with self.admin_access.shell() as shell:
+ wf = add_wf(shell, 'Company')
+ foo = wf.add_state(u'foo', initial=True)
+ bar = wf.add_state(u'bar')
+ self.assertEqual(wf.state_by_name('bar').eid, bar.eid)
+ self.assertEqual(wf.state_by_name('barrr'), None)
+ baz = wf.add_transition(u'baz', (foo,), bar, ('managers',))
+ self.assertEqual(wf.transition_by_name('baz').eid, baz.eid)
+ self.assertEqual(len(baz.require_group), 1)
+ self.assertEqual(baz.require_group[0].name, 'managers')
def test_duplicated_state(self):
- wf = add_wf(self, 'Company')
- wf.add_state(u'foo', initial=True)
- self.commit()
- wf.add_state(u'foo')
- with self.assertRaises(ValidationError) as cm:
- self.commit()
- self.assertEqual({'name-subject': 'workflow already has a state of that name'},
- cm.exception.errors)
- # no pb if not in the same workflow
- wf2 = add_wf(self, 'Company')
- foo = wf2.add_state(u'foo', initial=True)
- self.commit()
- # gnark gnark
- bar = wf.add_state(u'bar')
- self.commit()
- bar.cw_set(name=u'foo')
- with self.assertRaises(ValidationError) as cm:
- self.commit()
- self.assertEqual({'name-subject': 'workflow already has a state of that name'},
- cm.exception.errors)
+ with self.admin_access.shell() as shell:
+ wf = add_wf(shell, 'Company')
+ wf.add_state(u'foo', initial=True)
+ shell.commit()
+ wf.add_state(u'foo')
+ with self.assertRaises(ValidationError) as cm:
+ shell.commit()
+ self.assertEqual({'name-subject': 'workflow already has a state of that name'},
+ cm.exception.errors)
+ # no pb if not in the same workflow
+ wf2 = add_wf(shell, 'Company')
+ foo = wf2.add_state(u'foo', initial=True)
+ shell.commit()
+ # gnark gnark
+ bar = wf.add_state(u'bar')
+ shell.commit()
+ bar.cw_set(name=u'foo')
+ with self.assertRaises(ValidationError) as cm:
+ shell.commit()
+ self.assertEqual({'name-subject': 'workflow already has a state of that name'},
+ cm.exception.errors)
def test_duplicated_transition(self):
- wf = add_wf(self, 'Company')
- foo = wf.add_state(u'foo', initial=True)
- bar = wf.add_state(u'bar')
- wf.add_transition(u'baz', (foo,), bar, ('managers',))
- wf.add_transition(u'baz', (bar,), foo)
- with self.assertRaises(ValidationError) as cm:
- self.commit()
- self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already has a transition of that name'})
- # no pb if not in the same workflow
- wf2 = add_wf(self, 'Company')
- foo = wf.add_state(u'foo', initial=True)
- bar = wf.add_state(u'bar')
- wf.add_transition(u'baz', (foo,), bar, ('managers',))
- self.commit()
- # gnark gnark
- biz = wf.add_transition(u'biz', (bar,), foo)
- self.commit()
- biz.cw_set(name=u'baz')
- with self.assertRaises(ValidationError) as cm:
- self.commit()
- self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already has a transition of that name'})
+ with self.admin_access.shell() as shell:
+ wf = add_wf(shell, 'Company')
+ foo = wf.add_state(u'foo', initial=True)
+ bar = wf.add_state(u'bar')
+ wf.add_transition(u'baz', (foo,), bar, ('managers',))
+ wf.add_transition(u'baz', (bar,), foo)
+ with self.assertRaises(ValidationError) as cm:
+ shell.commit()
+ self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already has a transition of that name'})
+ # no pb if not in the same workflow
+ wf2 = add_wf(shell, 'Company')
+ foo = wf.add_state(u'foo', initial=True)
+ bar = wf.add_state(u'bar')
+ wf.add_transition(u'baz', (foo,), bar, ('managers',))
+ shell.commit()
+ # gnark gnark
+ biz = wf.add_transition(u'biz', (bar,), foo)
+ shell.commit()
+ biz.cw_set(name=u'baz')
+ with self.assertRaises(ValidationError) as cm:
+ shell.commit()
+ self.assertEqual(cm.exception.errors, {'name-subject': 'workflow already has a transition of that name'})
class WorkflowTC(CubicWebTC):
def setup_database(self):
- req = self.request()
rschema = self.schema['in_state']
for rdef in rschema.rdefs.itervalues():
self.assertEqual(rdef.cardinality, '1*')
- self.member = self.create_user(req, 'member')
+ with self.admin_access.client_cnx() as cnx:
+ self.member_eid = self.create_user(cnx, 'member').eid
+ cnx.commit()
def test_workflow_base(self):
- req = self.request()
- e = self.create_user(req, 'toto')
- iworkflowable = e.cw_adapt_to('IWorkflowable')
- self.assertEqual(iworkflowable.state, 'activated')
- iworkflowable.change_state('deactivated', u'deactivate 1')
- self.commit()
- iworkflowable.change_state('activated', u'activate 1')
- self.commit()
- iworkflowable.change_state('deactivated', u'deactivate 2')
- self.commit()
- e.cw_clear_relation_cache('wf_info_for', 'object')
- self.assertEqual([tr.comment for tr in e.reverse_wf_info_for],
- ['deactivate 1', 'activate 1', 'deactivate 2'])
- self.assertEqual(iworkflowable.latest_trinfo().comment, 'deactivate 2')
+ with self.admin_access.web_request() as req:
+ e = self.create_user(req, 'toto')
+ iworkflowable = e.cw_adapt_to('IWorkflowable')
+ self.assertEqual(iworkflowable.state, 'activated')
+ iworkflowable.change_state('deactivated', u'deactivate 1')
+ req.cnx.commit()
+ iworkflowable.change_state('activated', u'activate 1')
+ req.cnx.commit()
+ iworkflowable.change_state('deactivated', u'deactivate 2')
+ req.cnx.commit()
+ e.cw_clear_relation_cache('wf_info_for', 'object')
+ self.assertEqual([tr.comment for tr in e.reverse_wf_info_for],
+ ['deactivate 1', 'activate 1', 'deactivate 2'])
+ self.assertEqual(iworkflowable.latest_trinfo().comment, 'deactivate 2')
def test_possible_transitions(self):
- user = self.execute('CWUser X').get_entity(0, 0)
- iworkflowable = user.cw_adapt_to('IWorkflowable')
- trs = list(iworkflowable.possible_transitions())
- self.assertEqual(len(trs), 1)
- self.assertEqual(trs[0].name, u'deactivate')
- self.assertEqual(trs[0].destination(None).name, u'deactivated')
+ with self.admin_access.web_request() as req:
+ user = req.execute('CWUser X').get_entity(0, 0)
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ trs = list(iworkflowable.possible_transitions())
+ self.assertEqual(len(trs), 1)
+ self.assertEqual(trs[0].name, u'deactivate')
+ self.assertEqual(trs[0].destination(None).name, u'deactivated')
# test a std user get no possible transition
- cnx = self.login('member')
- req = self.request()
- # fetch the entity using the new session
- trs = list(req.user.cw_adapt_to('IWorkflowable').possible_transitions())
- self.assertEqual(len(trs), 0)
- cnx.close()
+ with self.new_access('member').web_request() as req:
+ # fetch the entity using the new session
+ trs = list(req.user.cw_adapt_to('IWorkflowable').possible_transitions())
+ self.assertEqual(len(trs), 0)
def _test_manager_deactivate(self, user):
iworkflowable = user.cw_adapt_to('IWorkflowable')
@@ -144,90 +146,93 @@
return trinfo
def test_change_state(self):
- user = self.user()
- iworkflowable = user.cw_adapt_to('IWorkflowable')
- iworkflowable.change_state('deactivated', comment=u'deactivate user')
- trinfo = self._test_manager_deactivate(user)
- self.assertEqual(trinfo.transition, None)
+ with self.admin_access.client_cnx() as cnx:
+ user = cnx.user
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ iworkflowable.change_state('deactivated', comment=u'deactivate user')
+ trinfo = self._test_manager_deactivate(user)
+ self.assertEqual(trinfo.transition, None)
def test_set_in_state_bad_wf(self):
- wf = add_wf(self, 'CWUser')
- s = wf.add_state(u'foo', initial=True)
- self.commit()
- with self.session.security_enabled(write=False):
- with self.assertRaises(ValidationError) as cm:
- self.session.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': self.user().eid, 's': s.eid})
- self.assertEqual(cm.exception.errors, {'in_state-subject': "state doesn't belong to entity's workflow. "
- "You may want to set a custom workflow for this entity first."})
+ with self.admin_access.shell() as shell:
+ wf = add_wf(shell, 'CWUser')
+ s = wf.add_state(u'foo', initial=True)
+ shell.commit()
+ with self.admin_access.repo_cnx() as cnx:
+ with cnx.security_enabled(write=False):
+ with self.assertRaises(ValidationError) as cm:
+ cnx.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
+ {'x': cnx.user.eid, 's': s.eid})
+ self.assertEqual(cm.exception.errors, {'in_state-subject': "state doesn't belong to entity's workflow. "
+ "You may want to set a custom workflow for this entity first."})
def test_fire_transition(self):
- user = self.user()
- iworkflowable = user.cw_adapt_to('IWorkflowable')
- iworkflowable.fire_transition('deactivate', comment=u'deactivate user')
- user.cw_clear_all_caches()
- self.assertEqual(iworkflowable.state, 'deactivated')
- self._test_manager_deactivate(user)
- trinfo = self._test_manager_deactivate(user)
- self.assertEqual(trinfo.transition.name, 'deactivate')
+ with self.admin_access.client_cnx() as cnx:
+ user = cnx.user
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ iworkflowable.fire_transition('deactivate', comment=u'deactivate user')
+ user.cw_clear_all_caches()
+ self.assertEqual(iworkflowable.state, 'deactivated')
+ self._test_manager_deactivate(user)
+ trinfo = self._test_manager_deactivate(user)
+ self.assertEqual(trinfo.transition.name, 'deactivate')
def test_goback_transition(self):
- req = self.request()
- wf = req.user.cw_adapt_to('IWorkflowable').current_workflow
- asleep = wf.add_state('asleep')
- wf.add_transition('rest', (wf.state_by_name('activated'),
- wf.state_by_name('deactivated')),
- asleep)
- wf.add_transition('wake up', asleep)
- user = self.create_user(req, 'stduser')
- iworkflowable = user.cw_adapt_to('IWorkflowable')
- iworkflowable.fire_transition('rest')
- self.commit()
- iworkflowable.fire_transition('wake up')
- self.commit()
- self.assertEqual(iworkflowable.state, 'activated')
- iworkflowable.fire_transition('deactivate')
- self.commit()
- iworkflowable.fire_transition('rest')
- self.commit()
- iworkflowable.fire_transition('wake up')
- self.commit()
- user.cw_clear_all_caches()
- self.assertEqual(iworkflowable.state, 'deactivated')
+ with self.admin_access.web_request() as req:
+ wf = req.user.cw_adapt_to('IWorkflowable').current_workflow
+ asleep = wf.add_state('asleep')
+ wf.add_transition('rest', (wf.state_by_name('activated'),
+ wf.state_by_name('deactivated')),
+ asleep)
+ wf.add_transition('wake up', asleep)
+ user = self.create_user(req, 'stduser')
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ iworkflowable.fire_transition('rest')
+ req.cnx.commit()
+ iworkflowable.fire_transition('wake up')
+ req.cnx.commit()
+ self.assertEqual(iworkflowable.state, 'activated')
+ iworkflowable.fire_transition('deactivate')
+ req.cnx.commit()
+ iworkflowable.fire_transition('rest')
+ req.cnx.commit()
+ iworkflowable.fire_transition('wake up')
+ req.cnx.commit()
+ user.cw_clear_all_caches()
+ self.assertEqual(iworkflowable.state, 'deactivated')
# XXX test managers can change state without matching transition
def _test_stduser_deactivate(self):
- ueid = self.member.eid
- req = self.request()
- self.create_user(req, 'tutu')
- cnx = self.login('tutu')
- req = self.request()
- iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable')
- with self.assertRaises(ValidationError) as cm:
+ with self.admin_access.repo_cnx() as cnx:
+ self.create_user(cnx, 'tutu')
+ with self.new_access('tutu').web_request() as req:
+ iworkflowable = req.entity_from_eid(self.member_eid).cw_adapt_to('IWorkflowable')
+ with self.assertRaises(ValidationError) as cm:
+ iworkflowable.fire_transition('deactivate')
+ self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})
+ with self.new_access('member').web_request() as req:
+ iworkflowable = req.entity_from_eid(self.member_eid).cw_adapt_to('IWorkflowable')
iworkflowable.fire_transition('deactivate')
- self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})
- cnx.close()
- cnx = self.login('member')
- req = self.request()
- iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable')
- iworkflowable.fire_transition('deactivate')
- cnx.commit()
- with self.assertRaises(ValidationError) as cm:
- iworkflowable.fire_transition('activate')
- self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})
- cnx.close()
+ req.cnx.commit()
+ with self.assertRaises(ValidationError) as cm:
+ iworkflowable.fire_transition('activate')
+ self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})
def test_fire_transition_owned_by(self):
- self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
- 'X expression "X owned_by U", T condition X '
- 'WHERE T name "deactivate"')
+ with self.admin_access.repo_cnx() as cnx:
+ cnx.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
+ 'X expression "X owned_by U", T condition X '
+ 'WHERE T name "deactivate"')
+ cnx.commit()
self._test_stduser_deactivate()
def test_fire_transition_has_update_perm(self):
- self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
- 'X expression "U has_update_permission X", T condition X '
- 'WHERE T name "deactivate"')
+ with self.admin_access.repo_cnx() as cnx:
+ cnx.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
+ 'X expression "U has_update_permission X", T condition X '
+ 'WHERE T name "deactivate"')
+ cnx.commit()
self._test_stduser_deactivate()
def test_swf_base(self):
@@ -250,335 +255,357 @@
+--------+
"""
# sub-workflow
- swf = add_wf(self, 'CWGroup', name='subworkflow')
- swfstate1 = swf.add_state(u'swfstate1', initial=True)
- swfstate2 = swf.add_state(u'swfstate2')
- swfstate3 = swf.add_state(u'swfstate3')
- tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
- tr2 = swf.add_transition(u'tr2', (swfstate1,), swfstate3)
- # main workflow
- mwf = add_wf(self, 'CWGroup', name='main workflow', default=True)
- state1 = mwf.add_state(u'state1', initial=True)
- state2 = mwf.add_state(u'state2')
- state3 = mwf.add_state(u'state3')
- swftr1 = mwf.add_wftransition(u'swftr1', swf, state1,
- [(swfstate2, state2), (swfstate3, state3)])
- swf.cw_clear_all_caches()
- self.assertEqual(swftr1.destination(None).eid, swfstate1.eid)
+ with self.admin_access.shell() as shell:
+ swf = add_wf(shell, 'CWGroup', name='subworkflow')
+ swfstate1 = swf.add_state(u'swfstate1', initial=True)
+ swfstate2 = swf.add_state(u'swfstate2')
+ swfstate3 = swf.add_state(u'swfstate3')
+ tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
+ tr2 = swf.add_transition(u'tr2', (swfstate1,), swfstate3)
+ # main workflow
+ mwf = add_wf(shell, 'CWGroup', name='main workflow', default=True)
+ state1 = mwf.add_state(u'state1', initial=True)
+ state2 = mwf.add_state(u'state2')
+ state3 = mwf.add_state(u'state3')
+ swftr1 = mwf.add_wftransition(u'swftr1', swf, state1,
+ [(swfstate2, state2), (swfstate3, state3)])
+ swf.cw_clear_all_caches()
+ self.assertEqual(swftr1.destination(None).eid, swfstate1.eid)
# workflows built, begin test
- group = self.request().create_entity('CWGroup', name=u'grp1')
- self.commit()
- iworkflowable = group.cw_adapt_to('IWorkflowable')
- self.assertEqual(iworkflowable.current_state.eid, state1.eid)
- self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
- self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
- self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
- iworkflowable.fire_transition('swftr1', u'go')
- self.commit()
- group.cw_clear_all_caches()
- self.assertEqual(iworkflowable.current_state.eid, swfstate1.eid)
- self.assertEqual(iworkflowable.current_workflow.eid, swf.eid)
- self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
- self.assertEqual(iworkflowable.subworkflow_input_transition().eid, swftr1.eid)
- iworkflowable.fire_transition('tr1', u'go')
- self.commit()
- group.cw_clear_all_caches()
- self.assertEqual(iworkflowable.current_state.eid, state2.eid)
- self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
- self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
- self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
- # force back to swfstate1 is impossible since we can't any more find
- # subworkflow input transition
- with self.assertRaises(ValidationError) as cm:
- iworkflowable.change_state(swfstate1, u'gadget')
- self.assertEqual(cm.exception.errors, {'to_state-subject': "state doesn't belong to entity's workflow"})
- self.rollback()
- # force back to state1
- iworkflowable.change_state('state1', u'gadget')
- iworkflowable.fire_transition('swftr1', u'au')
- group.cw_clear_all_caches()
- iworkflowable.fire_transition('tr2', u'chapeau')
- self.commit()
- group.cw_clear_all_caches()
- self.assertEqual(iworkflowable.current_state.eid, state3.eid)
- self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
- self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
- self.assertListEqual(parse_hist(iworkflowable.workflow_history),
- [('state1', 'swfstate1', 'swftr1', 'go'),
- ('swfstate1', 'swfstate2', 'tr1', 'go'),
- ('swfstate2', 'state2', 'swftr1', 'exiting from subworkflow subworkflow'),
- ('state2', 'state1', None, 'gadget'),
- ('state1', 'swfstate1', 'swftr1', 'au'),
- ('swfstate1', 'swfstate3', 'tr2', 'chapeau'),
- ('swfstate3', 'state3', 'swftr1', 'exiting from subworkflow subworkflow'),
- ])
+ with self.admin_access.web_request() as req:
+ group = req.create_entity('CWGroup', name=u'grp1')
+ req.cnx.commit()
+ iworkflowable = group.cw_adapt_to('IWorkflowable')
+ self.assertEqual(iworkflowable.current_state.eid, state1.eid)
+ self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
+ self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
+ self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
+ iworkflowable.fire_transition('swftr1', u'go')
+ req.cnx.commit()
+ group.cw_clear_all_caches()
+ self.assertEqual(iworkflowable.current_state.eid, swfstate1.eid)
+ self.assertEqual(iworkflowable.current_workflow.eid, swf.eid)
+ self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
+ self.assertEqual(iworkflowable.subworkflow_input_transition().eid, swftr1.eid)
+ iworkflowable.fire_transition('tr1', u'go')
+ req.cnx.commit()
+ group.cw_clear_all_caches()
+ self.assertEqual(iworkflowable.current_state.eid, state2.eid)
+ self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
+ self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
+ self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
+ # force back to swfstate1 is impossible since we can't any more find
+ # subworkflow input transition
+ with self.assertRaises(ValidationError) as cm:
+ iworkflowable.change_state(swfstate1, u'gadget')
+ self.assertEqual(cm.exception.errors, {'to_state-subject': "state doesn't belong to entity's workflow"})
+ req.cnx.rollback()
+ # force back to state1
+ iworkflowable.change_state('state1', u'gadget')
+ iworkflowable.fire_transition('swftr1', u'au')
+ group.cw_clear_all_caches()
+ iworkflowable.fire_transition('tr2', u'chapeau')
+ req.cnx.commit()
+ group.cw_clear_all_caches()
+ self.assertEqual(iworkflowable.current_state.eid, state3.eid)
+ self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
+ self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
+ self.assertListEqual(parse_hist(iworkflowable.workflow_history),
+ [('state1', 'swfstate1', 'swftr1', 'go'),
+ ('swfstate1', 'swfstate2', 'tr1', 'go'),
+ ('swfstate2', 'state2', 'swftr1', 'exiting from subworkflow subworkflow'),
+ ('state2', 'state1', None, 'gadget'),
+ ('state1', 'swfstate1', 'swftr1', 'au'),
+ ('swfstate1', 'swfstate3', 'tr2', 'chapeau'),
+ ('swfstate3', 'state3', 'swftr1', 'exiting from subworkflow subworkflow'),
+ ])
def test_swf_exit_consistency(self):
- # sub-workflow
- swf = add_wf(self, 'CWGroup', name='subworkflow')
- swfstate1 = swf.add_state(u'swfstate1', initial=True)
- swfstate2 = swf.add_state(u'swfstate2')
- tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
- # main workflow
- mwf = add_wf(self, 'CWGroup', name='main workflow', default=True)
- state1 = mwf.add_state(u'state1', initial=True)
- state2 = mwf.add_state(u'state2')
- state3 = mwf.add_state(u'state3')
- mwf.add_wftransition(u'swftr1', swf, state1,
- [(swfstate2, state2), (swfstate2, state3)])
- with self.assertRaises(ValidationError) as cm:
- self.commit()
- self.assertEqual(cm.exception.errors, {'subworkflow_exit-subject': u"can't have multiple exits on the same state"})
+ with self.admin_access.shell() as shell:
+ # sub-workflow
+ swf = add_wf(shell, 'CWGroup', name='subworkflow')
+ swfstate1 = swf.add_state(u'swfstate1', initial=True)
+ swfstate2 = swf.add_state(u'swfstate2')
+ tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
+ # main workflow
+ mwf = add_wf(shell, 'CWGroup', name='main workflow', default=True)
+ state1 = mwf.add_state(u'state1', initial=True)
+ state2 = mwf.add_state(u'state2')
+ state3 = mwf.add_state(u'state3')
+ mwf.add_wftransition(u'swftr1', swf, state1,
+ [(swfstate2, state2), (swfstate2, state3)])
+ with self.assertRaises(ValidationError) as cm:
+ shell.commit()
+ self.assertEqual(cm.exception.errors, {'subworkflow_exit-subject': u"can't have multiple exits on the same state"})
def test_swf_fire_in_a_row(self):
- # sub-workflow
- subwf = add_wf(self, 'CWGroup', name='subworkflow')
- xsigning = subwf.add_state('xsigning', initial=True)
- xaborted = subwf.add_state('xaborted')
- xsigned = subwf.add_state('xsigned')
- xabort = subwf.add_transition('xabort', (xsigning,), xaborted)
- xsign = subwf.add_transition('xsign', (xsigning,), xsigning)
- xcomplete = subwf.add_transition('xcomplete', (xsigning,), xsigned,
- type=u'auto')
- # main workflow
- twf = add_wf(self, 'CWGroup', name='mainwf', default=True)
- created = twf.add_state(_('created'), initial=True)
- identified = twf.add_state(_('identified'))
- released = twf.add_state(_('released'))
- closed = twf.add_state(_('closed'))
- twf.add_wftransition(_('identify'), subwf, (created,),
- [(xsigned, identified), (xaborted, created)])
- twf.add_wftransition(_('release'), subwf, (identified,),
- [(xsigned, released), (xaborted, identified)])
- twf.add_wftransition(_('close'), subwf, (released,),
- [(xsigned, closed), (xaborted, released)])
- self.commit()
- group = self.request().create_entity('CWGroup', name=u'grp1')
- self.commit()
- iworkflowable = group.cw_adapt_to('IWorkflowable')
- for trans in ('identify', 'release', 'close'):
- iworkflowable.fire_transition(trans)
- self.commit()
+ with self.admin_access.shell() as shell:
+ # sub-workflow
+ subwf = add_wf(shell, 'CWGroup', name='subworkflow')
+ xsigning = subwf.add_state('xsigning', initial=True)
+ xaborted = subwf.add_state('xaborted')
+ xsigned = subwf.add_state('xsigned')
+ xabort = subwf.add_transition('xabort', (xsigning,), xaborted)
+ xsign = subwf.add_transition('xsign', (xsigning,), xsigning)
+ xcomplete = subwf.add_transition('xcomplete', (xsigning,), xsigned,
+ type=u'auto')
+ # main workflow
+ twf = add_wf(shell, 'CWGroup', name='mainwf', default=True)
+ created = twf.add_state(_('created'), initial=True)
+ identified = twf.add_state(_('identified'))
+ released = twf.add_state(_('released'))
+ closed = twf.add_state(_('closed'))
+ twf.add_wftransition(_('identify'), subwf, (created,),
+ [(xsigned, identified), (xaborted, created)])
+ twf.add_wftransition(_('release'), subwf, (identified,),
+ [(xsigned, released), (xaborted, identified)])
+ twf.add_wftransition(_('close'), subwf, (released,),
+ [(xsigned, closed), (xaborted, released)])
+ shell.commit()
+ with self.admin_access.repo_cnx() as cnx:
+ group = cnx.create_entity('CWGroup', name=u'grp1')
+ cnx.commit()
+ iworkflowable = group.cw_adapt_to('IWorkflowable')
+ for trans in ('identify', 'release', 'close'):
+ iworkflowable.fire_transition(trans)
+ cnx.commit()
def test_swf_magic_tr(self):
- # sub-workflow
- subwf = add_wf(self, 'CWGroup', name='subworkflow')
- xsigning = subwf.add_state('xsigning', initial=True)
- xaborted = subwf.add_state('xaborted')
- xsigned = subwf.add_state('xsigned')
- xabort = subwf.add_transition('xabort', (xsigning,), xaborted)
- xsign = subwf.add_transition('xsign', (xsigning,), xsigned)
- # main workflow
- twf = add_wf(self, 'CWGroup', name='mainwf', default=True)
- created = twf.add_state(_('created'), initial=True)
- identified = twf.add_state(_('identified'))
- released = twf.add_state(_('released'))
- twf.add_wftransition(_('identify'), subwf, created,
- [(xaborted, None), (xsigned, identified)])
- twf.add_wftransition(_('release'), subwf, identified,
- [(xaborted, None)])
- self.commit()
- group = self.request().create_entity('CWGroup', name=u'grp1')
- self.commit()
- iworkflowable = group.cw_adapt_to('IWorkflowable')
- for trans, nextstate in (('identify', 'xsigning'),
- ('xabort', 'created'),
- ('identify', 'xsigning'),
- ('xsign', 'identified'),
- ('release', 'xsigning'),
- ('xabort', 'identified')
- ):
- iworkflowable.fire_transition(trans)
- self.commit()
- group.cw_clear_all_caches()
- self.assertEqual(iworkflowable.state, nextstate)
+ with self.admin_access.shell() as shell:
+ # sub-workflow
+ subwf = add_wf(shell, 'CWGroup', name='subworkflow')
+ xsigning = subwf.add_state('xsigning', initial=True)
+ xaborted = subwf.add_state('xaborted')
+ xsigned = subwf.add_state('xsigned')
+ xabort = subwf.add_transition('xabort', (xsigning,), xaborted)
+ xsign = subwf.add_transition('xsign', (xsigning,), xsigned)
+ # main workflow
+ twf = add_wf(shell, 'CWGroup', name='mainwf', default=True)
+ created = twf.add_state(_('created'), initial=True)
+ identified = twf.add_state(_('identified'))
+ released = twf.add_state(_('released'))
+ twf.add_wftransition(_('identify'), subwf, created,
+ [(xaborted, None), (xsigned, identified)])
+ twf.add_wftransition(_('release'), subwf, identified,
+ [(xaborted, None)])
+ shell.commit()
+ with self.admin_access.web_request() as req:
+ group = req.create_entity('CWGroup', name=u'grp1')
+ req.cnx.commit()
+ iworkflowable = group.cw_adapt_to('IWorkflowable')
+ for trans, nextstate in (('identify', 'xsigning'),
+ ('xabort', 'created'),
+ ('identify', 'xsigning'),
+ ('xsign', 'identified'),
+ ('release', 'xsigning'),
+ ('xabort', 'identified')
+ ):
+ iworkflowable.fire_transition(trans)
+ req.cnx.commit()
+ group.cw_clear_all_caches()
+ self.assertEqual(iworkflowable.state, nextstate)
class CustomWorkflowTC(CubicWebTC):
def setup_database(self):
- req = self.request()
- self.member = self.create_user(req, 'member')
+ with self.admin_access.repo_cnx() as cnx:
+ self.member_eid = self.create_user(cnx, 'member').eid
def test_custom_wf_replace_state_no_history(self):
"""member in inital state with no previous history, state is simply
redirected when changing workflow
"""
- wf = add_wf(self, 'CWUser')
- wf.add_state('asleep', initial=True)
- self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
- {'wf': wf.eid, 'x': self.member.eid})
- self.member.cw_clear_all_caches()
- iworkflowable = self.member.cw_adapt_to('IWorkflowable')
- self.assertEqual(iworkflowable.state, 'activated')# no change before commit
- self.commit()
- self.member.cw_clear_all_caches()
- self.assertEqual(iworkflowable.current_workflow.eid, wf.eid)
- self.assertEqual(iworkflowable.state, 'asleep')
- self.assertEqual(iworkflowable.workflow_history, ())
+ with self.admin_access.shell() as shell:
+ wf = add_wf(shell, 'CWUser')
+ wf.add_state('asleep', initial=True)
+ with self.admin_access.web_request() as req:
+ req.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member_eid})
+ member = req.entity_from_eid(self.member_eid)
+ iworkflowable = member.cw_adapt_to('IWorkflowable')
+ self.assertEqual(iworkflowable.state, 'activated') # no change before commit
+ req.cnx.commit()
+ member.cw_clear_all_caches()
+ self.assertEqual(iworkflowable.current_workflow.eid, wf.eid)
+ self.assertEqual(iworkflowable.state, 'asleep')
+ self.assertEqual(iworkflowable.workflow_history, ())
def test_custom_wf_replace_state_keep_history(self):
"""member in inital state with some history, state is redirected and
state change is recorded to history
"""
- iworkflowable = self.member.cw_adapt_to('IWorkflowable')
- iworkflowable.fire_transition('deactivate')
- iworkflowable.fire_transition('activate')
- wf = add_wf(self, 'CWUser')
- wf.add_state('asleep', initial=True)
- self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
- {'wf': wf.eid, 'x': self.member.eid})
- self.commit()
- self.member.cw_clear_all_caches()
- self.assertEqual(iworkflowable.current_workflow.eid, wf.eid)
- self.assertEqual(iworkflowable.state, 'asleep')
- self.assertEqual(parse_hist(iworkflowable.workflow_history),
- [('activated', 'deactivated', 'deactivate', None),
- ('deactivated', 'activated', 'activate', None),
- ('activated', 'asleep', None, 'workflow changed to "CWUser"')])
+ with self.admin_access.web_request() as req:
+ member = req.entity_from_eid(self.member_eid)
+ iworkflowable = member.cw_adapt_to('IWorkflowable')
+ iworkflowable.fire_transition('deactivate')
+ iworkflowable.fire_transition('activate')
+ req.cnx.commit()
+ with self.admin_access.shell() as shell:
+ wf = add_wf(shell, 'CWUser')
+ wf.add_state('asleep', initial=True)
+ shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member_eid})
+ with self.admin_access.web_request() as req:
+ member = req.entity_from_eid(self.member_eid)
+ iworkflowable = member.cw_adapt_to('IWorkflowable')
+ self.assertEqual(iworkflowable.current_workflow.eid, wf.eid)
+ self.assertEqual(iworkflowable.state, 'asleep')
+ self.assertEqual(parse_hist(iworkflowable.workflow_history),
+ [('activated', 'deactivated', 'deactivate', None),
+ ('deactivated', 'activated', 'activate', None),
+ ('activated', 'asleep', None, 'workflow changed to "CWUser"')])
def test_custom_wf_no_initial_state(self):
"""try to set a custom workflow which has no initial state"""
- iworkflowable = self.member.cw_adapt_to('IWorkflowable')
- iworkflowable.fire_transition('deactivate')
- wf = add_wf(self, 'CWUser')
- wf.add_state('asleep')
- self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
- {'wf': wf.eid, 'x': self.member.eid})
- with self.assertRaises(ValidationError) as cm:
- self.commit()
- self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u'workflow has no initial state'})
+ with self.admin_access.shell() as shell:
+ wf = add_wf(shell, 'CWUser')
+ wf.add_state('asleep')
+ shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member_eid})
+ with self.assertRaises(ValidationError) as cm:
+ shell.commit()
+ self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u'workflow has no initial state'})
def test_custom_wf_bad_etype(self):
"""try to set a custom workflow which doesn't apply to entity type"""
- wf = add_wf(self, 'Company')
- wf.add_state('asleep', initial=True)
- self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
- {'wf': wf.eid, 'x': self.member.eid})
- with self.assertRaises(ValidationError) as cm:
- self.commit()
- self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u"workflow isn't a workflow for this type"})
+ with self.admin_access.shell() as shell:
+ wf = add_wf(shell, 'Company')
+ wf.add_state('asleep', initial=True)
+ shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member_eid})
+ with self.assertRaises(ValidationError) as cm:
+ shell.commit()
+ self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u"workflow isn't a workflow for this type"})
def test_del_custom_wf(self):
"""member in some state shared by the new workflow, nothing has to be
done
"""
- iworkflowable = self.member.cw_adapt_to('IWorkflowable')
- iworkflowable.fire_transition('deactivate')
- wf = add_wf(self, 'CWUser')
- wf.add_state('asleep', initial=True)
- self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
- {'wf': wf.eid, 'x': self.member.eid})
- self.commit()
- self.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
- {'wf': wf.eid, 'x': self.member.eid})
- self.member.cw_clear_all_caches()
- self.assertEqual(iworkflowable.state, 'asleep')# no change before commit
- self.commit()
- self.member.cw_clear_all_caches()
- self.assertEqual(iworkflowable.current_workflow.name, "default user workflow")
- self.assertEqual(iworkflowable.state, 'activated')
- self.assertEqual(parse_hist(iworkflowable.workflow_history),
- [('activated', 'deactivated', 'deactivate', None),
- ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'),
- ('asleep', 'activated', None, 'workflow changed to "default user workflow"'),])
+ with self.admin_access.web_request() as req:
+ member = req.entity_from_eid(self.member_eid)
+ iworkflowable = member.cw_adapt_to('IWorkflowable')
+ iworkflowable.fire_transition('deactivate')
+ req.cnx.commit()
+ with self.admin_access.shell() as shell:
+ wf = add_wf(shell, 'CWUser')
+ wf.add_state('asleep', initial=True)
+ shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member_eid})
+ shell.commit()
+ with self.admin_access.web_request() as req:
+ req.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member_eid})
+ member = req.entity_from_eid(self.member_eid)
+ iworkflowable = member.cw_adapt_to('IWorkflowable')
+ self.assertEqual(iworkflowable.state, 'asleep')# no change before commit
+ req.cnx.commit()
+ member.cw_clear_all_caches()
+ self.assertEqual(iworkflowable.current_workflow.name, "default user workflow")
+ self.assertEqual(iworkflowable.state, 'activated')
+ self.assertEqual(parse_hist(iworkflowable.workflow_history),
+ [('activated', 'deactivated', 'deactivate', None),
+ ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'),
+ ('asleep', 'activated', None, 'workflow changed to "default user workflow"'),])
class AutoTransitionTC(CubicWebTC):
def setup_custom_wf(self):
- wf = add_wf(self, 'CWUser')
- asleep = wf.add_state('asleep', initial=True)
- dead = wf.add_state('dead')
- wf.add_transition('rest', asleep, asleep)
- wf.add_transition('sick', asleep, dead, type=u'auto',
- conditions=({'expr': u'X surname "toto"',
- 'mainvars': u'X'},))
+ with self.admin_access.shell() as shell:
+ wf = add_wf(shell, 'CWUser')
+ asleep = wf.add_state('asleep', initial=True)
+ dead = wf.add_state('dead')
+ wf.add_transition('rest', asleep, asleep)
+ wf.add_transition('sick', asleep, dead, type=u'auto',
+ conditions=({'expr': u'X surname "toto"',
+ 'mainvars': u'X'},))
return wf
def test_auto_transition_fired(self):
wf = self.setup_custom_wf()
- req = self.request()
- user = self.create_user(req, 'member')
- iworkflowable = user.cw_adapt_to('IWorkflowable')
- self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
- {'wf': wf.eid, 'x': user.eid})
- self.commit()
- user.cw_clear_all_caches()
- self.assertEqual(iworkflowable.state, 'asleep')
- self.assertEqual([t.name for t in iworkflowable.possible_transitions()],
- ['rest'])
- iworkflowable.fire_transition('rest')
- self.commit()
- user.cw_clear_all_caches()
- self.assertEqual(iworkflowable.state, 'asleep')
- self.assertEqual([t.name for t in iworkflowable.possible_transitions()],
- ['rest'])
- self.assertEqual(parse_hist(iworkflowable.workflow_history),
- [('asleep', 'asleep', 'rest', None)])
- user.cw_set(surname=u'toto') # fulfill condition
- self.commit()
- iworkflowable.fire_transition('rest')
- self.commit()
- user.cw_clear_all_caches()
- self.assertEqual(iworkflowable.state, 'dead')
- self.assertEqual(parse_hist(iworkflowable.workflow_history),
- [('asleep', 'asleep', 'rest', None),
- ('asleep', 'asleep', 'rest', None),
- ('asleep', 'dead', 'sick', None),])
+ with self.admin_access.web_request() as req:
+ user = self.create_user(req, 'member')
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ req.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': user.eid})
+ req.cnx.commit()
+ user.cw_clear_all_caches()
+ self.assertEqual(iworkflowable.state, 'asleep')
+ self.assertEqual([t.name for t in iworkflowable.possible_transitions()],
+ ['rest'])
+ iworkflowable.fire_transition('rest')
+ req.cnx.commit()
+ user.cw_clear_all_caches()
+ self.assertEqual(iworkflowable.state, 'asleep')
+ self.assertEqual([t.name for t in iworkflowable.possible_transitions()],
+ ['rest'])
+ self.assertEqual(parse_hist(iworkflowable.workflow_history),
+ [('asleep', 'asleep', 'rest', None)])
+ user.cw_set(surname=u'toto') # fulfill condition
+ req.cnx.commit()
+ iworkflowable.fire_transition('rest')
+ req.cnx.commit()
+ user.cw_clear_all_caches()
+ self.assertEqual(iworkflowable.state, 'dead')
+ self.assertEqual(parse_hist(iworkflowable.workflow_history),
+ [('asleep', 'asleep', 'rest', None),
+ ('asleep', 'asleep', 'rest', None),
+ ('asleep', 'dead', 'sick', None),])
def test_auto_transition_custom_initial_state_fired(self):
wf = self.setup_custom_wf()
- req = self.request()
- user = self.create_user(req, 'member', surname=u'toto')
- self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ with self.admin_access.web_request() as req:
+ user = self.create_user(req, 'member', surname=u'toto')
+ req.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
{'wf': wf.eid, 'x': user.eid})
- self.commit()
- iworkflowable = user.cw_adapt_to('IWorkflowable')
- self.assertEqual(iworkflowable.state, 'dead')
+ req.cnx.commit()
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ self.assertEqual(iworkflowable.state, 'dead')
def test_auto_transition_initial_state_fired(self):
- wf = self.execute('Any WF WHERE ET default_workflow WF, '
- 'ET name %(et)s', {'et': 'CWUser'}).get_entity(0, 0)
- dead = wf.add_state('dead')
- wf.add_transition('sick', wf.state_by_name('activated'), dead,
- type=u'auto', conditions=({'expr': u'X surname "toto"',
- 'mainvars': u'X'},))
- self.commit()
- req = self.request()
- user = self.create_user(req, 'member', surname=u'toto')
- self.commit()
- iworkflowable = user.cw_adapt_to('IWorkflowable')
- self.assertEqual(iworkflowable.state, 'dead')
+ with self.admin_access.web_request() as req:
+ wf = req.execute('Any WF WHERE ET default_workflow WF, '
+ 'ET name %(et)s', {'et': 'CWUser'}).get_entity(0, 0)
+ dead = wf.add_state('dead')
+ wf.add_transition('sick', wf.state_by_name('activated'), dead,
+ type=u'auto', conditions=({'expr': u'X surname "toto"',
+ 'mainvars': u'X'},))
+ req.cnx.commit()
+ with self.admin_access.web_request() as req:
+ user = self.create_user(req, 'member', surname=u'toto')
+ req.cnx.commit()
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ self.assertEqual(iworkflowable.state, 'dead')
class WorkflowHooksTC(CubicWebTC):
def setUp(self):
CubicWebTC.setUp(self)
- req = self.request()
- self.wf = req.user.cw_adapt_to('IWorkflowable').current_workflow
- self.s_activated = self.wf.state_by_name('activated').eid
- self.s_deactivated = self.wf.state_by_name('deactivated').eid
- self.s_dummy = self.wf.add_state(u'dummy').eid
- self.wf.add_transition(u'dummy', (self.s_deactivated,), self.s_dummy)
- ueid = self.create_user(req, 'stduser', commit=False).eid
- # test initial state is set
- rset = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
- {'x' : ueid})
- self.assertFalse(rset, rset.rows)
- self.commit()
- initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
- {'x' : ueid})[0][0]
- self.assertEqual(initialstate, u'activated')
- # give access to users group on the user's wf transitions
- # so we can test wf enforcing on euser (managers don't have anymore this
- # enforcement
- self.execute('SET X require_group G '
- 'WHERE G name "users", X transition_of WF, WF eid %(wf)s',
- {'wf': self.wf.eid})
- self.commit()
+ with self.admin_access.web_request() as req:
+ self.wf = req.user.cw_adapt_to('IWorkflowable').current_workflow
+ self.s_activated = self.wf.state_by_name('activated').eid
+ self.s_deactivated = self.wf.state_by_name('deactivated').eid
+ self.s_dummy = self.wf.add_state(u'dummy').eid
+ self.wf.add_transition(u'dummy', (self.s_deactivated,), self.s_dummy)
+ ueid = self.create_user(req, 'stduser', commit=False).eid
+ # test initial state is set
+ rset = req.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
+ {'x' : ueid})
+ self.assertFalse(rset, rset.rows)
+ req.cnx.commit()
+ initialstate = req.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
+ {'x' : ueid})[0][0]
+ self.assertEqual(initialstate, u'activated')
+ # give access to users group on the user's wf transitions
+ # so we can test wf enforcing on euser (managers don't have anymore this
+ # enforcement
+ req.execute('SET X require_group G '
+ 'WHERE G name "users", X transition_of WF, WF eid %(wf)s',
+ {'wf': self.wf.eid})
+ req.cnx.commit()
# XXX currently, we've to rely on hooks to set initial state, or to use execute
# def test_initial_state(self):
@@ -603,42 +630,37 @@
return ' '.join(lmsg)
def test_transition_checking1(self):
- cnx = self.login('stduser')
- user = cnx.user(self.session)
- iworkflowable = user.cw_adapt_to('IWorkflowable')
- with self.assertRaises(ValidationError) as cm:
- iworkflowable.fire_transition('activate')
- self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
- u"transition isn't allowed from")
- cnx.close()
+ with self.new_access('stduser').repo_cnx() as cnx:
+ user = cnx.user
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ with self.assertRaises(ValidationError) as cm:
+ iworkflowable.fire_transition('activate')
+ self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
+ u"transition isn't allowed from")
def test_transition_checking2(self):
- cnx = self.login('stduser')
- user = cnx.user(self.session)
- iworkflowable = user.cw_adapt_to('IWorkflowable')
- with self.assertRaises(ValidationError) as cm:
- iworkflowable.fire_transition('dummy')
- self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
- u"transition isn't allowed from")
- cnx.close()
+ with self.new_access('stduser').repo_cnx() as cnx:
+ user = cnx.user
+ iworkflowable = user.cw_adapt_to('IWorkflowable')
+ with self.assertRaises(ValidationError) as cm:
+ iworkflowable.fire_transition('dummy')
+ self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
+ u"transition isn't allowed from")
def test_transition_checking3(self):
- with self.login('stduser') as cnx:
- session = self.session
- user = self.user()
+ with self.new_access('stduser').repo_cnx() as cnx:
+ user = cnx.user
iworkflowable = user.cw_adapt_to('IWorkflowable')
iworkflowable.fire_transition('deactivate')
- session.commit()
- session.set_cnxset()
+ cnx.commit()
with self.assertRaises(ValidationError) as cm:
iworkflowable.fire_transition('deactivate')
self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
u"transition isn't allowed from")
- session.rollback()
- session.set_cnxset()
+ cnx.rollback()
# get back now
iworkflowable.fire_transition('activate')
- session.commit()
+ cnx.commit()
if __name__ == '__main__':