--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/entities/test/unittest_wfobjs.py Fri Aug 21 16:26:20 2009 +0200
@@ -0,0 +1,380 @@
+from cubicweb.devtools.testlib import CubicWebTC
+from cubicweb import ValidationError
+
+def add_wf(self, etype, name=None):
+ if name is None:
+ name = unicode(etype)
+ wf = self.execute('INSERT Workflow X: X name %(n)s', {'n': name}).get_entity(0, 0)
+ self.execute('SET WF workflow_of ET WHERE WF eid %(wf)s, ET name %(et)s',
+ {'wf': wf.eid, 'et': etype})
+ return wf
+
+def parse_hist(wfhist):
+ return [(ti.previous_state.name, ti.new_state.name,
+ ti.transition and ti.transition.name, ti.comment)
+ for ti in wfhist]
+
+
+class WorkflowBuildingTC(CubicWebTC):
+
+ def test_wf_construction(self):
+ wf = add_wf(self, 'Company')
+ foo = wf.add_state(u'foo', initial=True)
+ bar = wf.add_state(u'bar')
+ self.assertEquals(wf.state_by_name('bar').eid, bar.eid)
+ self.assertEquals(wf.state_by_name('barrr'), None)
+ baz = wf.add_transition(u'baz', (foo,), bar, ('managers',))
+ self.assertEquals(wf.transition_by_name('baz').eid, baz.eid)
+ self.assertEquals(len(baz.require_group), 1)
+ self.assertEquals(baz.require_group[0].name, 'managers')
+
+ def test_duplicated_state(self):
+ wf = add_wf(self, 'Company')
+ wf.add_state(u'foo', initial=True)
+ wf.add_state(u'foo')
+ ex = self.assertRaises(ValidationError, self.commit)
+ # XXX enhance message
+ self.assertEquals(ex.errors, {'state_of': 'unique constraint S name N, Y state_of O, Y name N failed'})
+
+ def test_duplicated_transition(self):
+ wf = add_wf(self, 'Company')
+ foo = wf.add_state(u'foo', initial=True)
+ bar = wf.add_state(u'bar')
+ wf.add_transition(u'baz', (foo,), bar, ('managers',))
+ wf.add_transition(u'baz', (bar,), foo)
+ ex = self.assertRaises(ValidationError, self.commit)
+ # XXX enhance message
+ self.assertEquals(ex.errors, {'transition_of': 'unique constraint S name N, Y transition_of O, Y name N failed'})
+
+
+class WorkflowTC(CubicWebTC):
+
+ def setup_database(self):
+ rschema = self.schema['in_state']
+ for x, y in rschema.iter_rdefs():
+ self.assertEquals(rschema.rproperty(x, y, 'cardinality'), '1*')
+ self.member = self.create_user('member')
+
+ def test_workflow_base(self):
+ e = self.create_user('toto')
+ self.assertEquals(e.state, 'activated')
+ e.change_state('deactivated', u'deactivate 1')
+ self.commit()
+ e.change_state('activated', u'activate 1')
+ self.commit()
+ e.change_state('deactivated', u'deactivate 2')
+ self.commit()
+ e.clear_related_cache('wf_info_for', 'object')
+ self.assertEquals([tr.comment for tr in e.reverse_wf_info_for],
+ ['deactivate 1', 'activate 1', 'deactivate 2'])
+ self.assertEquals(e.latest_trinfo().comment, 'deactivate 2')
+
+ def test_possible_transitions(self):
+ user = self.entity('CWUser X')
+ trs = list(user.possible_transitions())
+ self.assertEquals(len(trs), 1)
+ self.assertEquals(trs[0].name, u'deactivate')
+ self.assertEquals(trs[0].destination().name, u'deactivated')
+ # test a std user get no possible transition
+ cnx = self.login('member')
+ # fetch the entity using the new session
+ trs = list(cnx.user().possible_transitions())
+ self.assertEquals(len(trs), 0)
+
+ def _test_manager_deactivate(self, user):
+ user.clear_related_cache('in_state', 'subject')
+ self.assertEquals(len(user.in_state), 1)
+ self.assertEquals(user.state, 'deactivated')
+ trinfo = user.latest_trinfo()
+ self.assertEquals(trinfo.previous_state.name, 'activated')
+ self.assertEquals(trinfo.new_state.name, 'deactivated')
+ self.assertEquals(trinfo.comment, 'deactivate user')
+ self.assertEquals(trinfo.comment_format, 'text/plain')
+ return trinfo
+
+ def test_change_state(self):
+ user = self.user()
+ user.change_state('deactivated', comment=u'deactivate user')
+ trinfo = self._test_manager_deactivate(user)
+ self.assertEquals(trinfo.transition, None)
+
+ def test_fire_transition(self):
+ user = self.user()
+ user.fire_transition('deactivate', comment=u'deactivate user')
+ self.assertEquals(user.state, 'deactivated')
+ self._test_manager_deactivate(user)
+ trinfo = self._test_manager_deactivate(user)
+ self.assertEquals(trinfo.transition.name, 'deactivate')
+
+ # XXX test managers can change state without matching transition
+
+ def _test_stduser_deactivate(self):
+ ueid = self.member.eid
+ self.create_user('tutu')
+ cnx = self.login('tutu')
+ req = self.request()
+ member = req.entity_from_eid(self.member.eid)
+ ex = self.assertRaises(ValidationError,
+ member.fire_transition, 'deactivate')
+ self.assertEquals(ex.errors, {'by_transition': "transition may not be fired"})
+ cnx.close()
+ cnx = self.login('member')
+ req = self.request()
+ member = req.entity_from_eid(self.member.eid)
+ member.fire_transition('deactivate')
+ cnx.commit()
+ ex = self.assertRaises(ValidationError,
+ member.fire_transition, 'activate')
+ self.assertEquals(ex.errors, {'by_transition': "transition may not be fired"})
+
+ def test_fire_transition_owned_by(self):
+ self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
+ 'X expression "X owned_by U", T condition X '
+ 'WHERE T name "deactivate"')
+ self._test_stduser_deactivate()
+
+ def test_fire_transition_has_update_perm(self):
+ self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
+ 'X expression "U has_update_permission X", T condition X '
+ 'WHERE T name "deactivate"')
+ self._test_stduser_deactivate()
+
+ def _init_wf_with_shared_state_or_tr(self):
+ req = self.request()
+ etypes = dict(self.execute('Any N, ET WHERE ET is CWEType, ET name N'
+ ', ET name IN ("CWGroup", "Bookmark")'))
+ self.grpwf = req.create_entity('Workflow', ('workflow_of', 'ET'),
+ ET=etypes['CWGroup'], name=u'group workflow')
+ self.bmkwf = req.execute('Any X WHERE X is Workflow, X workflow_of ET, ET name "Bookmark"').get_entity(0, 0)
+ self.state1 = self.grpwf.add_state(u'state1', initial=True)
+ self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
+ {'s': self.state1.eid, 'wf': self.bmkwf.eid})
+ self.execute('SET WF initial_state S WHERE S eid %(s)s, WF eid %(wf)s',
+ {'s': self.state1.eid, 'wf': self.bmkwf.eid})
+ self.state2 = self.grpwf.add_state(u'state2')
+ self.group = self.add_entity('CWGroup', name=u't1')
+ self.bookmark = self.add_entity('Bookmark', title=u'111', path=u'/view')
+ # commit to link to the initial state
+ self.commit()
+
+ def test_transitions_selection(self):
+ """
+ ------------------------ tr1 -----------------
+ | state1 (CWGroup, Bookmark) | ------> | state2 (CWGroup) |
+ ------------------------ -----------------
+ | tr2 ------------------
+ `------> | state3 (Bookmark) |
+ ------------------
+ """
+ self._init_wf_with_shared_state_or_tr()
+ state3 = self.bmkwf.add_state(u'state3')
+ tr1 = self.grpwf.add_transition(u'tr1', (self.state1,), self.state2)
+ tr2 = self.bmkwf.add_transition(u'tr2', (self.state1,), state3)
+ transitions = list(self.group.possible_transitions())
+ self.assertEquals(len(transitions), 1)
+ self.assertEquals(transitions[0].name, 'tr1')
+ transitions = list(self.bookmark.possible_transitions())
+ self.assertEquals(len(transitions), 1)
+ self.assertEquals(transitions[0].name, 'tr2')
+
+
+ def test_transitions_selection2(self):
+ """
+ ------------------------ tr1 (Bookmark) -----------------------
+ | state1 (CWGroup, Bookmark) | -------------> | state2 (CWGroup,Bookmark) |
+ ------------------------ -----------------------
+ | tr2 (CWGroup) |
+ `---------------------------------/
+ """
+ self._init_wf_with_shared_state_or_tr()
+ self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
+ {'s': self.state2.eid, 'wf': self.bmkwf.eid})
+ tr1 = self.bmkwf.add_transition(u'tr1', (self.state1,), self.state2)
+ tr2 = self.grpwf.add_transition(u'tr2', (self.state1,), self.state2)
+ transitions = list(self.group.possible_transitions())
+ self.assertEquals(len(transitions), 1)
+ self.assertEquals(transitions[0].name, 'tr2')
+ transitions = list(self.bookmark.possible_transitions())
+ self.assertEquals(len(transitions), 1)
+ self.assertEquals(transitions[0].name, 'tr1')
+
+
+class CustomWorkflowTC(CubicWebTC):
+
+ def setup_database(self):
+ self.member = self.create_user('member')
+
+ def test_custom_wf_replace_state_no_history(self):
+ """member in inital state with no previous history, state is simply
+ redirected when changing workflow
+ """
+ wf = add_wf(self, 'CWUser')
+ wf.add_state('asleep', initial=True)
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.state, 'activated')# no change before commit
+ self.commit()
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.current_workflow.eid, wf.eid)
+ self.assertEquals(self.member.state, 'asleep')
+ self.assertEquals(self.member.workflow_history, [])
+
+ def test_custom_wf_replace_state_keep_history(self):
+ """member in inital state with some history, state is redirected and
+ state change is recorded to history
+ """
+ self.member.fire_transition('deactivate')
+ self.member.fire_transition('activate')
+ wf = add_wf(self, 'CWUser')
+ wf.add_state('asleep', initial=True)
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ self.commit()
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.current_workflow.eid, wf.eid)
+ self.assertEquals(self.member.state, 'asleep')
+ self.assertEquals(parse_hist(self.member.workflow_history),
+ [('activated', 'deactivated', 'deactivate', None),
+ ('deactivated', 'activated', 'activate', None),
+ ('activated', 'asleep', None, 'workflow changed to "CWUser"')])
+
+ def test_custom_wf_shared_state(self):
+ """member in some state shared by the new workflow, nothing has to be
+ done
+ """
+ self.member.fire_transition('deactivate')
+ self.assertEquals(self.member.state, 'deactivated')
+ wf = add_wf(self, 'CWUser')
+ wf.add_state('asleep', initial=True)
+ self.execute('SET S state_of WF WHERE S name "deactivated", WF eid %(wf)s',
+ {'wf': wf.eid})
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ self.commit()
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.current_workflow.eid, wf.eid)
+ self.assertEquals(self.member.state, 'deactivated')
+ self.assertEquals(parse_hist(self.member.workflow_history),
+ [('activated', 'deactivated', 'deactivate', None)])
+
+ def test_custom_wf_no_initial_state(self):
+ """try to set a custom workflow which has no initial state"""
+ self.member.fire_transition('deactivate')
+ wf = add_wf(self, 'CWUser')
+ wf.add_state('asleep')
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ ex = self.assertRaises(ValidationError, self.commit)
+ self.assertEquals(ex.errors, {'custom_workflow': u'workflow has no initial state'})
+
+ def test_custom_wf_bad_etype(self):
+ """try to set a custom workflow which has no initial state"""
+ self.member.fire_transition('deactivate')
+ wf = add_wf(self, 'Company')
+ wf.add_state('asleep', initial=True)
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ ex = self.assertRaises(ValidationError, self.commit)
+ self.assertEquals(ex.errors, {'custom_workflow': 'constraint S is ET, O workflow_of ET failed'})
+
+ def test_del_custom_wf(self):
+ """member in some state shared by the new workflow, nothing has to be
+ done
+ """
+ self.member.fire_transition('deactivate')
+ wf = add_wf(self, 'CWUser')
+ wf.add_state('asleep', initial=True)
+ self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ self.commit()
+ self.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
+ {'wf': wf.eid, 'x': self.member.eid})
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.state, 'asleep')# no change before commit
+ self.commit()
+ self.member.clear_all_caches()
+ self.assertEquals(self.member.current_workflow.name, "CWUser workflow")
+ self.assertEquals(self.member.state, 'activated')
+ self.assertEquals(parse_hist(self.member.workflow_history),
+ [('activated', 'deactivated', 'deactivate', None),
+ ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'),
+ ('asleep', 'activated', None, 'workflow changed to "CWUser workflow"'),])
+
+
+class WorkflowHooksTC(CubicWebTC):
+
+ def setUp(self):
+ CubicWebTC.setUp(self)
+ self.wf = self.session.user.current_workflow
+ self.session.set_pool()
+ self.s_activated = self.wf.state_by_name('activated').eid
+ self.s_deactivated = self.wf.state_by_name('deactivated').eid
+ self.s_dummy = self.wf.add_state(u'dummy').eid
+ self.wf.add_transition(u'dummy', (self.s_deactivated,), self.s_dummy)
+ ueid = self.create_user('stduser', commit=False).eid
+ # test initial state is set
+ rset = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
+ {'x' : ueid})
+ self.failIf(rset, rset.rows)
+ self.commit()
+ initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
+ {'x' : ueid})[0][0]
+ self.assertEquals(initialstate, u'activated')
+ # give access to users group on the user's wf transitions
+ # so we can test wf enforcing on euser (managers don't have anymore this
+ # enforcement
+ self.execute('SET X require_group G '
+ 'WHERE G name "users", X transition_of WF, WF eid %(wf)s',
+ {'wf': self.wf.eid})
+ self.commit()
+
+ # XXX currently, we've to rely on hooks to set initial state, or to use unsafe_execute
+ # def test_initial_state(self):
+ # cnx = self.login('stduser')
+ # cu = cnx.cursor()
+ # self.assertRaises(ValidationError, cu.execute,
+ # 'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
+ # 'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'})
+ # cnx.close()
+ # # though managers can do whatever he want
+ # self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
+ # 'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
+ # self.commit()
+
+ # test that the workflow is correctly enforced
+ def test_transition_checking1(self):
+ cnx = self.login('stduser')
+ user = cnx.user(self.session)
+ ex = self.assertRaises(ValidationError,
+ user.fire_transition, 'activate')
+ self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"})
+ cnx.close()
+
+ def test_transition_checking2(self):
+ cnx = self.login('stduser')
+ user = cnx.user(self.session)
+ ex = self.assertRaises(ValidationError,
+ user.fire_transition, 'dummy')
+ self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"})
+ cnx.close()
+
+ def test_transition_checking3(self):
+ cnx = self.login('stduser')
+ session = self.session
+ user = cnx.user(session)
+ user.fire_transition('deactivate')
+ cnx.commit()
+ session.set_pool()
+ ex = self.assertRaises(ValidationError,
+ user.fire_transition, 'deactivate')
+ self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"})
+ # get back now
+ user.fire_transition('activate')
+ cnx.commit()
+ cnx.close()
+
+
+if __name__ == '__main__':
+ from logilab.common.testlib import unittest_main
+ unittest_main()