entities/test/unittest_wfobjs.py
branch3.5
changeset 2920 64322aa83a1d
child 2939 a613cc003ab7
equal deleted inserted replaced
2919:662f35236d1c 2920:64322aa83a1d
       
     1 from cubicweb.devtools.apptest import EnvBasedTC
       
     2 from cubicweb import ValidationError
       
     3 
       
     4 class WorkflowTC(EnvBasedTC):
       
     5 
       
     6     def setup_database(self):
       
     7         rschema = self.schema['in_state']
       
     8         for x, y in rschema.iter_rdefs():
       
     9             self.assertEquals(rschema.rproperty(x, y, 'cardinality'), '1*')
       
    10         self.member = self.create_user('member')
       
    11 
       
    12     def test_workflow_base(self):
       
    13         e = self.create_user('toto')
       
    14         self.assertEquals(e.state, 'activated')
       
    15         e.change_state('deactivated', u'deactivate 1')
       
    16         self.commit()
       
    17         e.change_state('activated', u'activate 1')
       
    18         self.commit()
       
    19         e.change_state('deactivated', u'deactivate 2')
       
    20         self.commit()
       
    21         e.clear_related_cache('wf_info_for', 'object')
       
    22         self.assertEquals([tr.comment for tr in e.reverse_wf_info_for],
       
    23                           ['deactivate 1', 'activate 1', 'deactivate 2'])
       
    24         self.assertEquals(e.latest_trinfo().comment, 'deactivate 2')
       
    25 
       
    26     # def test_wf_construction(self): # XXX update or kill me
       
    27     #     bar = self.mh.cmd_add_state(u'bar', ('Personne', 'Email'), initial=True)
       
    28     #     baz = self.mh.cmd_add_transition(u'baz', ('Personne', 'Email'),
       
    29     #                                      (foo,), bar, ('managers',))
       
    30     #     for etype in ('Personne', 'Email'):
       
    31     #         t1 = self.mh.rqlexec('Any N WHERE T transition_of ET, ET name "%s", T name N' %
       
    32     #                              etype)[0][0]
       
    33     #         self.assertEquals(t1, "baz")
       
    34     #     gn = self.mh.rqlexec('Any GN WHERE T require_group G, G name GN, T eid %s' % baz)[0][0]
       
    35     #     self.assertEquals(gn, 'managers')
       
    36 
       
    37     def test_possible_transitions(self):
       
    38         user = self.entity('CWUser X')
       
    39         trs = list(user.possible_transitions())
       
    40         self.assertEquals(len(trs), 1)
       
    41         self.assertEquals(trs[0].name, u'deactivate')
       
    42         self.assertEquals(trs[0].destination().name, u'deactivated')
       
    43         # test a std user get no possible transition
       
    44         cnx = self.login('member')
       
    45         # fetch the entity using the new session
       
    46         trs = list(cnx.user().possible_transitions())
       
    47         self.assertEquals(len(trs), 0)
       
    48 
       
    49     def _test_manager_deactivate(self, user):
       
    50         user.clear_related_cache('in_state', 'subject')
       
    51         self.assertEquals(len(user.in_state), 1)
       
    52         self.assertEquals(user.state, 'deactivated')
       
    53         trinfo = user.latest_trinfo()
       
    54         self.assertEquals(trinfo.previous_state.name, 'activated')
       
    55         self.assertEquals(trinfo.new_state.name, 'deactivated')
       
    56         self.assertEquals(trinfo.comment, 'deactivate user')
       
    57         self.assertEquals(trinfo.comment_format, 'text/plain')
       
    58         return trinfo
       
    59 
       
    60     def test_change_state(self):
       
    61         user = self.user()
       
    62         user.change_state('deactivated', comment=u'deactivate user')
       
    63         trinfo = self._test_manager_deactivate(user)
       
    64         self.assertEquals(trinfo.transition, None)
       
    65 
       
    66     def test_fire_transition(self):
       
    67         user = self.user()
       
    68         user.fire_transition('deactivate', comment=u'deactivate user')
       
    69         self.assertEquals(user.state, 'deactivated')
       
    70         self._test_manager_deactivate(user)
       
    71         trinfo = self._test_manager_deactivate(user)
       
    72         self.assertEquals(trinfo.transition.name, 'deactivate')
       
    73 
       
    74     # XXX test managers can change state without matching transition
       
    75 
       
    76     def _test_stduser_deactivate(self):
       
    77         ueid = self.member.eid
       
    78         self.create_user('tutu')
       
    79         cnx = self.login('tutu')
       
    80         req = self.request()
       
    81         member = req.entity_from_eid(self.member.eid)
       
    82         ex = self.assertRaises(ValidationError,
       
    83                                member.fire_transition, 'deactivate')
       
    84         self.assertEquals(ex.errors, {'by_transition': "transition may not be fired"})
       
    85         cnx.close()
       
    86         cnx = self.login('member')
       
    87         req = self.request()
       
    88         member = req.entity_from_eid(self.member.eid)
       
    89         member.fire_transition('deactivate')
       
    90         cnx.commit()
       
    91         ex = self.assertRaises(ValidationError,
       
    92                                member.fire_transition, 'activate')
       
    93         self.assertEquals(ex.errors, {'by_transition': "transition may not be fired"})
       
    94 
       
    95     def test_fire_transition_owned_by(self):
       
    96         self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
       
    97                      'X expression "X owned_by U", T condition X '
       
    98                      'WHERE T name "deactivate"')
       
    99         self._test_stduser_deactivate()
       
   100 
       
   101     def test_fire_transition_has_update_perm(self):
       
   102         self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
       
   103                      'X expression "U has_update_permission X", T condition X '
       
   104                      'WHERE T name "deactivate"')
       
   105         self._test_stduser_deactivate()
       
   106 
       
   107     def _init_wf_with_shared_state_or_tr(self):
       
   108         req = self.request()
       
   109         etypes = dict(self.execute('Any N, ET WHERE ET is CWEType, ET name N'
       
   110                                    ', ET name IN ("CWGroup", "Bookmark")'))
       
   111         self.grpwf = req.create_entity('Workflow', ('workflow_of', 'ET'),
       
   112                                        ET=etypes['CWGroup'], name=u'group workflow')
       
   113         self.bmkwf = req.execute('Any X WHERE X is Workflow, X workflow_of ET, ET name "Bookmark"').get_entity(0, 0)
       
   114         self.state1 = self.grpwf.add_state(u'state1', initial=True)
       
   115         self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
       
   116                      {'s': self.state1.eid, 'wf': self.bmkwf.eid})
       
   117         self.execute('SET WF initial_state S WHERE S eid %(s)s, WF eid %(wf)s',
       
   118                      {'s': self.state1.eid, 'wf': self.bmkwf.eid})
       
   119         self.state2 = self.grpwf.add_state(u'state2')
       
   120         self.group = self.add_entity('CWGroup', name=u't1')
       
   121         self.bookmark = self.add_entity('Bookmark', title=u'111', path=u'/view')
       
   122         # commit to link to the initial state
       
   123         self.commit()
       
   124 
       
   125     def test_transitions_selection(self):
       
   126         """
       
   127         ------------------------  tr1    -----------------
       
   128         | state1 (CWGroup, Bookmark) | ------> | state2 (CWGroup) |
       
   129         ------------------------         -----------------
       
   130                   |  tr2    ------------------
       
   131                   `------>  | state3 (Bookmark) |
       
   132                             ------------------
       
   133         """
       
   134         self._init_wf_with_shared_state_or_tr()
       
   135         state3 = self.bmkwf.add_state(u'state3')
       
   136         tr1 = self.grpwf.add_transition(u'tr1', (self.state1,), self.state2)
       
   137         tr2 = self.bmkwf.add_transition(u'tr2', (self.state1,), state3)
       
   138         transitions = list(self.group.possible_transitions())
       
   139         self.assertEquals(len(transitions), 1)
       
   140         self.assertEquals(transitions[0].name, 'tr1')
       
   141         transitions = list(self.bookmark.possible_transitions())
       
   142         self.assertEquals(len(transitions), 1)
       
   143         self.assertEquals(transitions[0].name, 'tr2')
       
   144 
       
   145 
       
   146     def test_transitions_selection2(self):
       
   147         """
       
   148         ------------------------  tr1 (Bookmark)   -----------------------
       
   149         | state1 (CWGroup, Bookmark) | -------------> | state2 (CWGroup,Bookmark) |
       
   150         ------------------------                -----------------------
       
   151                   |  tr2 (CWGroup)                     |
       
   152                   `---------------------------------/
       
   153         """
       
   154         self._init_wf_with_shared_state_or_tr()
       
   155         self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
       
   156                      {'s': self.state2.eid, 'wf': self.bmkwf.eid})
       
   157         tr1 = self.bmkwf.add_transition(u'tr1', (self.state1,), self.state2)
       
   158         tr2 = self.grpwf.add_transition(u'tr2', (self.state1,), self.state2)
       
   159         transitions = list(self.group.possible_transitions())
       
   160         self.assertEquals(len(transitions), 1)
       
   161         self.assertEquals(transitions[0].name, 'tr2')
       
   162         transitions = list(self.bookmark.possible_transitions())
       
   163         self.assertEquals(len(transitions), 1)
       
   164         self.assertEquals(transitions[0].name, 'tr1')
       
   165 
       
   166 
       
   167 from cubicweb.devtools.apptest import RepositoryBasedTC
       
   168 
       
   169 class WorkflowHooksTC(RepositoryBasedTC):
       
   170 
       
   171     def setUp(self):
       
   172         RepositoryBasedTC.setUp(self)
       
   173         self.wf = self.session.user.current_workflow
       
   174         self.s_activated = self.wf.state_by_name('activated').eid
       
   175         self.s_deactivated = self.wf.state_by_name('deactivated').eid
       
   176         self.s_dummy = self.wf.add_state(u'dummy').eid
       
   177         self.wf.add_transition(u'dummy', (self.s_deactivated,), self.s_dummy)
       
   178         ueid = self.create_user('stduser', commit=False)
       
   179         # test initial state is set
       
   180         rset = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
       
   181                             {'x' : ueid})
       
   182         self.failIf(rset, rset.rows)
       
   183         self.commit()
       
   184         initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
       
   185                                     {'x' : ueid})[0][0]
       
   186         self.assertEquals(initialstate, u'activated')
       
   187         # give access to users group on the user's wf transitions
       
   188         # so we can test wf enforcing on euser (managers don't have anymore this
       
   189         # enforcement
       
   190         self.execute('SET X require_group G '
       
   191                      'WHERE G name "users", X transition_of WF, WF eid %(wf)s',
       
   192                      {'wf': self.wf.eid})
       
   193         self.commit()
       
   194 
       
   195     def tearDown(self):
       
   196         self.execute('DELETE X require_group G '
       
   197                      'WHERE G name "users", X transition_of WF, WF eid %(wf)s',
       
   198                      {'wf': self.wf.eid})
       
   199         self.commit()
       
   200         RepositoryBasedTC.tearDown(self)
       
   201 
       
   202     # XXX currently, we've to rely on hooks to set initial state, or to use unsafe_execute
       
   203     # def test_initial_state(self):
       
   204     #     cnx = self.login('stduser')
       
   205     #     cu = cnx.cursor()
       
   206     #     self.assertRaises(ValidationError, cu.execute,
       
   207     #                       'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
       
   208     #                       'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'})
       
   209     #     cnx.close()
       
   210     #     # though managers can do whatever he want
       
   211     #     self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
       
   212     #                  'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
       
   213     #     self.commit()
       
   214 
       
   215     # test that the workflow is correctly enforced
       
   216     def test_transition_checking1(self):
       
   217         cnx = self.login('stduser')
       
   218         user = cnx.user(self.current_session())
       
   219         ex = self.assertRaises(ValidationError,
       
   220                                user.fire_transition, 'activate')
       
   221         self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"})
       
   222         cnx.close()
       
   223 
       
   224     def test_transition_checking2(self):
       
   225         cnx = self.login('stduser')
       
   226         user = cnx.user(self.current_session())
       
   227         assert user.state == 'activated'
       
   228         ex = self.assertRaises(ValidationError,
       
   229                                user.fire_transition, 'dummy')
       
   230         self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"})
       
   231         cnx.close()
       
   232 
       
   233     def test_transition_checking3(self):
       
   234         cnx = self.login('stduser')
       
   235         session = self.current_session()
       
   236         user = cnx.user(session)
       
   237         user.fire_transition('deactivate')
       
   238         cnx.commit()
       
   239         session.set_pool()
       
   240         ex = self.assertRaises(ValidationError,
       
   241                                user.fire_transition, 'deactivate')
       
   242         self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"})
       
   243         # get back now
       
   244         user.fire_transition('activate')
       
   245         cnx.commit()
       
   246         cnx.close()
       
   247 
       
   248 
       
   249 if __name__ == '__main__':
       
   250     from logilab.common.testlib import unittest_main
       
   251     unittest_main()