entities/test/unittest_wfobjs.py
branch3.5
changeset 2920 64322aa83a1d
child 2939 a613cc003ab7
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/entities/test/unittest_wfobjs.py	Thu Aug 20 17:44:27 2009 +0200
@@ -0,0 +1,251 @@
+from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb import ValidationError
+
+class WorkflowTC(EnvBasedTC):
+
+    def setup_database(self):
+        rschema = self.schema['in_state']
+        for x, y in rschema.iter_rdefs():
+            self.assertEquals(rschema.rproperty(x, y, 'cardinality'), '1*')
+        self.member = self.create_user('member')
+
+    def test_workflow_base(self):
+        e = self.create_user('toto')
+        self.assertEquals(e.state, 'activated')
+        e.change_state('deactivated', u'deactivate 1')
+        self.commit()
+        e.change_state('activated', u'activate 1')
+        self.commit()
+        e.change_state('deactivated', u'deactivate 2')
+        self.commit()
+        e.clear_related_cache('wf_info_for', 'object')
+        self.assertEquals([tr.comment for tr in e.reverse_wf_info_for],
+                          ['deactivate 1', 'activate 1', 'deactivate 2'])
+        self.assertEquals(e.latest_trinfo().comment, 'deactivate 2')
+
+    # def test_wf_construction(self): # XXX update or kill me
+    #     bar = self.mh.cmd_add_state(u'bar', ('Personne', 'Email'), initial=True)
+    #     baz = self.mh.cmd_add_transition(u'baz', ('Personne', 'Email'),
+    #                                      (foo,), bar, ('managers',))
+    #     for etype in ('Personne', 'Email'):
+    #         t1 = self.mh.rqlexec('Any N WHERE T transition_of ET, ET name "%s", T name N' %
+    #                              etype)[0][0]
+    #         self.assertEquals(t1, "baz")
+    #     gn = self.mh.rqlexec('Any GN WHERE T require_group G, G name GN, T eid %s' % baz)[0][0]
+    #     self.assertEquals(gn, 'managers')
+
+    def test_possible_transitions(self):
+        user = self.entity('CWUser X')
+        trs = list(user.possible_transitions())
+        self.assertEquals(len(trs), 1)
+        self.assertEquals(trs[0].name, u'deactivate')
+        self.assertEquals(trs[0].destination().name, u'deactivated')
+        # test a std user get no possible transition
+        cnx = self.login('member')
+        # fetch the entity using the new session
+        trs = list(cnx.user().possible_transitions())
+        self.assertEquals(len(trs), 0)
+
+    def _test_manager_deactivate(self, user):
+        user.clear_related_cache('in_state', 'subject')
+        self.assertEquals(len(user.in_state), 1)
+        self.assertEquals(user.state, 'deactivated')
+        trinfo = user.latest_trinfo()
+        self.assertEquals(trinfo.previous_state.name, 'activated')
+        self.assertEquals(trinfo.new_state.name, 'deactivated')
+        self.assertEquals(trinfo.comment, 'deactivate user')
+        self.assertEquals(trinfo.comment_format, 'text/plain')
+        return trinfo
+
+    def test_change_state(self):
+        user = self.user()
+        user.change_state('deactivated', comment=u'deactivate user')
+        trinfo = self._test_manager_deactivate(user)
+        self.assertEquals(trinfo.transition, None)
+
+    def test_fire_transition(self):
+        user = self.user()
+        user.fire_transition('deactivate', comment=u'deactivate user')
+        self.assertEquals(user.state, 'deactivated')
+        self._test_manager_deactivate(user)
+        trinfo = self._test_manager_deactivate(user)
+        self.assertEquals(trinfo.transition.name, 'deactivate')
+
+    # XXX test managers can change state without matching transition
+
+    def _test_stduser_deactivate(self):
+        ueid = self.member.eid
+        self.create_user('tutu')
+        cnx = self.login('tutu')
+        req = self.request()
+        member = req.entity_from_eid(self.member.eid)
+        ex = self.assertRaises(ValidationError,
+                               member.fire_transition, 'deactivate')
+        self.assertEquals(ex.errors, {'by_transition': "transition may not be fired"})
+        cnx.close()
+        cnx = self.login('member')
+        req = self.request()
+        member = req.entity_from_eid(self.member.eid)
+        member.fire_transition('deactivate')
+        cnx.commit()
+        ex = self.assertRaises(ValidationError,
+                               member.fire_transition, 'activate')
+        self.assertEquals(ex.errors, {'by_transition': "transition may not be fired"})
+
+    def test_fire_transition_owned_by(self):
+        self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
+                     'X expression "X owned_by U", T condition X '
+                     'WHERE T name "deactivate"')
+        self._test_stduser_deactivate()
+
+    def test_fire_transition_has_update_perm(self):
+        self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
+                     'X expression "U has_update_permission X", T condition X '
+                     'WHERE T name "deactivate"')
+        self._test_stduser_deactivate()
+
+    def _init_wf_with_shared_state_or_tr(self):
+        req = self.request()
+        etypes = dict(self.execute('Any N, ET WHERE ET is CWEType, ET name N'
+                                   ', ET name IN ("CWGroup", "Bookmark")'))
+        self.grpwf = req.create_entity('Workflow', ('workflow_of', 'ET'),
+                                       ET=etypes['CWGroup'], name=u'group workflow')
+        self.bmkwf = req.execute('Any X WHERE X is Workflow, X workflow_of ET, ET name "Bookmark"').get_entity(0, 0)
+        self.state1 = self.grpwf.add_state(u'state1', initial=True)
+        self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
+                     {'s': self.state1.eid, 'wf': self.bmkwf.eid})
+        self.execute('SET WF initial_state S WHERE S eid %(s)s, WF eid %(wf)s',
+                     {'s': self.state1.eid, 'wf': self.bmkwf.eid})
+        self.state2 = self.grpwf.add_state(u'state2')
+        self.group = self.add_entity('CWGroup', name=u't1')
+        self.bookmark = self.add_entity('Bookmark', title=u'111', path=u'/view')
+        # commit to link to the initial state
+        self.commit()
+
+    def test_transitions_selection(self):
+        """
+        ------------------------  tr1    -----------------
+        | state1 (CWGroup, Bookmark) | ------> | state2 (CWGroup) |
+        ------------------------         -----------------
+                  |  tr2    ------------------
+                  `------>  | state3 (Bookmark) |
+                            ------------------
+        """
+        self._init_wf_with_shared_state_or_tr()
+        state3 = self.bmkwf.add_state(u'state3')
+        tr1 = self.grpwf.add_transition(u'tr1', (self.state1,), self.state2)
+        tr2 = self.bmkwf.add_transition(u'tr2', (self.state1,), state3)
+        transitions = list(self.group.possible_transitions())
+        self.assertEquals(len(transitions), 1)
+        self.assertEquals(transitions[0].name, 'tr1')
+        transitions = list(self.bookmark.possible_transitions())
+        self.assertEquals(len(transitions), 1)
+        self.assertEquals(transitions[0].name, 'tr2')
+
+
+    def test_transitions_selection2(self):
+        """
+        ------------------------  tr1 (Bookmark)   -----------------------
+        | state1 (CWGroup, Bookmark) | -------------> | state2 (CWGroup,Bookmark) |
+        ------------------------                -----------------------
+                  |  tr2 (CWGroup)                     |
+                  `---------------------------------/
+        """
+        self._init_wf_with_shared_state_or_tr()
+        self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
+                     {'s': self.state2.eid, 'wf': self.bmkwf.eid})
+        tr1 = self.bmkwf.add_transition(u'tr1', (self.state1,), self.state2)
+        tr2 = self.grpwf.add_transition(u'tr2', (self.state1,), self.state2)
+        transitions = list(self.group.possible_transitions())
+        self.assertEquals(len(transitions), 1)
+        self.assertEquals(transitions[0].name, 'tr2')
+        transitions = list(self.bookmark.possible_transitions())
+        self.assertEquals(len(transitions), 1)
+        self.assertEquals(transitions[0].name, 'tr1')
+
+
+from cubicweb.devtools.apptest import RepositoryBasedTC
+
+class WorkflowHooksTC(RepositoryBasedTC):
+
+    def setUp(self):
+        RepositoryBasedTC.setUp(self)
+        self.wf = self.session.user.current_workflow
+        self.s_activated = self.wf.state_by_name('activated').eid
+        self.s_deactivated = self.wf.state_by_name('deactivated').eid
+        self.s_dummy = self.wf.add_state(u'dummy').eid
+        self.wf.add_transition(u'dummy', (self.s_deactivated,), self.s_dummy)
+        ueid = self.create_user('stduser', commit=False)
+        # test initial state is set
+        rset = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
+                            {'x' : ueid})
+        self.failIf(rset, rset.rows)
+        self.commit()
+        initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
+                                    {'x' : ueid})[0][0]
+        self.assertEquals(initialstate, u'activated')
+        # give access to users group on the user's wf transitions
+        # so we can test wf enforcing on euser (managers don't have anymore this
+        # enforcement
+        self.execute('SET X require_group G '
+                     'WHERE G name "users", X transition_of WF, WF eid %(wf)s',
+                     {'wf': self.wf.eid})
+        self.commit()
+
+    def tearDown(self):
+        self.execute('DELETE X require_group G '
+                     'WHERE G name "users", X transition_of WF, WF eid %(wf)s',
+                     {'wf': self.wf.eid})
+        self.commit()
+        RepositoryBasedTC.tearDown(self)
+
+    # XXX currently, we've to rely on hooks to set initial state, or to use unsafe_execute
+    # def test_initial_state(self):
+    #     cnx = self.login('stduser')
+    #     cu = cnx.cursor()
+    #     self.assertRaises(ValidationError, cu.execute,
+    #                       'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
+    #                       'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'})
+    #     cnx.close()
+    #     # though managers can do whatever he want
+    #     self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
+    #                  'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
+    #     self.commit()
+
+    # test that the workflow is correctly enforced
+    def test_transition_checking1(self):
+        cnx = self.login('stduser')
+        user = cnx.user(self.current_session())
+        ex = self.assertRaises(ValidationError,
+                               user.fire_transition, 'activate')
+        self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"})
+        cnx.close()
+
+    def test_transition_checking2(self):
+        cnx = self.login('stduser')
+        user = cnx.user(self.current_session())
+        assert user.state == 'activated'
+        ex = self.assertRaises(ValidationError,
+                               user.fire_transition, 'dummy')
+        self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"})
+        cnx.close()
+
+    def test_transition_checking3(self):
+        cnx = self.login('stduser')
+        session = self.current_session()
+        user = cnx.user(session)
+        user.fire_transition('deactivate')
+        cnx.commit()
+        session.set_pool()
+        ex = self.assertRaises(ValidationError,
+                               user.fire_transition, 'deactivate')
+        self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"})
+        # get back now
+        user.fire_transition('activate')
+        cnx.commit()
+        cnx.close()
+
+
+if __name__ == '__main__':
+    from logilab.common.testlib import unittest_main
+    unittest_main()