entities/test/unittest_wfobjs.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 
       
    19 from cubicweb import ValidationError
       
    20 from cubicweb.devtools.testlib import CubicWebTC
       
    21 
       
    22 def add_wf(shell, etype, name=None, default=False):
       
    23     if name is None:
       
    24         name = etype
       
    25     return shell.add_workflow(name, etype, default=default,
       
    26                               ensure_workflowable=False)
       
    27 
       
    28 def parse_hist(wfhist):
       
    29     return [(ti.previous_state.name, ti.new_state.name,
       
    30              ti.transition and ti.transition.name, ti.comment)
       
    31             for ti in wfhist]
       
    32 
       
    33 
       
    34 class WorkflowBuildingTC(CubicWebTC):
       
    35 
       
    36     def test_wf_construction(self):
       
    37         with self.admin_access.shell() as shell:
       
    38             wf = add_wf(shell, 'Company')
       
    39             foo = wf.add_state(u'foo', initial=True)
       
    40             bar = wf.add_state(u'bar')
       
    41             self.assertEqual(wf.state_by_name('bar').eid, bar.eid)
       
    42             self.assertEqual(wf.state_by_name('barrr'), None)
       
    43             baz = wf.add_transition(u'baz', (foo,), bar, ('managers',))
       
    44             self.assertEqual(wf.transition_by_name('baz').eid, baz.eid)
       
    45             self.assertEqual(len(baz.require_group), 1)
       
    46             self.assertEqual(baz.require_group[0].name, 'managers')
       
    47 
       
    48     def test_duplicated_state(self):
       
    49         with self.admin_access.shell() as shell:
       
    50             wf = add_wf(shell, 'Company')
       
    51             wf.add_state(u'foo', initial=True)
       
    52             shell.commit()
       
    53             with self.assertRaises(ValidationError) as cm:
       
    54                 wf.add_state(u'foo')
       
    55             self.assertEqual({'name': u'%(KEY-rtype)s is part of violated unicity constraint',
       
    56                               'state_of': u'%(KEY-rtype)s is part of violated unicity constraint',
       
    57                               '': u'some relations violate a unicity constraint'},
       
    58                              cm.exception.errors)
       
    59             shell.rollback()
       
    60             # no pb if not in the same workflow
       
    61             wf2 = add_wf(shell, 'Company')
       
    62             foo = wf2.add_state(u'foo', initial=True)
       
    63             shell.commit()
       
    64             # gnark gnark
       
    65             bar = wf.add_state(u'bar')
       
    66             shell.commit()
       
    67             with self.assertRaises(ValidationError) as cm:
       
    68                 bar.cw_set(name=u'foo')
       
    69             shell.rollback()
       
    70             self.assertEqual({'name': u'%(KEY-rtype)s is part of violated unicity constraint',
       
    71                               'state_of': u'%(KEY-rtype)s is part of violated unicity constraint',
       
    72                               '': u'some relations violate a unicity constraint'},
       
    73                              cm.exception.errors)
       
    74 
       
    75     def test_duplicated_transition(self):
       
    76         with self.admin_access.shell() as shell:
       
    77             wf = add_wf(shell, 'Company')
       
    78             foo = wf.add_state(u'foo', initial=True)
       
    79             bar = wf.add_state(u'bar')
       
    80             wf.add_transition(u'baz', (foo,), bar, ('managers',))
       
    81             with self.assertRaises(ValidationError) as cm:
       
    82                 wf.add_transition(u'baz', (bar,), foo)
       
    83             self.assertEqual({'name': u'%(KEY-rtype)s is part of violated unicity constraint',
       
    84                               'transition_of': u'%(KEY-rtype)s is part of violated unicity constraint',
       
    85                               '': u'some relations violate a unicity constraint'},
       
    86                              cm.exception.errors)
       
    87             shell.rollback()
       
    88             # no pb if not in the same workflow
       
    89             wf2 = add_wf(shell, 'Company')
       
    90             foo = wf2.add_state(u'foo', initial=True)
       
    91             bar = wf2.add_state(u'bar')
       
    92             wf2.add_transition(u'baz', (foo,), bar, ('managers',))
       
    93             shell.commit()
       
    94             # gnark gnark
       
    95             biz = wf2.add_transition(u'biz', (bar,), foo)
       
    96             shell.commit()
       
    97             with self.assertRaises(ValidationError) as cm:
       
    98                 biz.cw_set(name=u'baz')
       
    99             shell.rollback()
       
   100             self.assertEqual({'name': u'%(KEY-rtype)s is part of violated unicity constraint',
       
   101                               'transition_of': u'%(KEY-rtype)s is part of violated unicity constraint',
       
   102                               '': u'some relations violate a unicity constraint'},
       
   103                              cm.exception.errors)
       
   104 
       
   105 
       
   106 class WorkflowTC(CubicWebTC):
       
   107 
       
   108     def setup_database(self):
       
   109         rschema = self.schema['in_state']
       
   110         for rdef in rschema.rdefs.values():
       
   111             self.assertEqual(rdef.cardinality, '1*')
       
   112         with self.admin_access.client_cnx() as cnx:
       
   113             self.member_eid = self.create_user(cnx, 'member').eid
       
   114             cnx.commit()
       
   115 
       
   116     def test_workflow_base(self):
       
   117         with self.admin_access.web_request() as req:
       
   118             e = self.create_user(req, 'toto')
       
   119             iworkflowable = e.cw_adapt_to('IWorkflowable')
       
   120             self.assertEqual(iworkflowable.state, 'activated')
       
   121             iworkflowable.change_state('deactivated', u'deactivate 1')
       
   122             req.cnx.commit()
       
   123             iworkflowable.change_state('activated', u'activate 1')
       
   124             req.cnx.commit()
       
   125             iworkflowable.change_state('deactivated', u'deactivate 2')
       
   126             req.cnx.commit()
       
   127             e.cw_clear_relation_cache('wf_info_for', 'object')
       
   128             self.assertEqual([tr.comment for tr in e.reverse_wf_info_for],
       
   129                               ['deactivate 1', 'activate 1', 'deactivate 2'])
       
   130             self.assertEqual(iworkflowable.latest_trinfo().comment, 'deactivate 2')
       
   131 
       
   132     def test_possible_transitions(self):
       
   133         with self.admin_access.web_request() as req:
       
   134             user = req.execute('CWUser X').get_entity(0, 0)
       
   135             iworkflowable = user.cw_adapt_to('IWorkflowable')
       
   136             trs = list(iworkflowable.possible_transitions())
       
   137             self.assertEqual(len(trs), 1)
       
   138             self.assertEqual(trs[0].name, u'deactivate')
       
   139             self.assertEqual(trs[0].destination(None).name, u'deactivated')
       
   140         # test a std user get no possible transition
       
   141         with self.new_access('member').web_request() as req:
       
   142             # fetch the entity using the new session
       
   143             trs = list(req.user.cw_adapt_to('IWorkflowable').possible_transitions())
       
   144             self.assertEqual(len(trs), 0)
       
   145 
       
   146     def _test_manager_deactivate(self, user):
       
   147         iworkflowable = user.cw_adapt_to('IWorkflowable')
       
   148         user.cw_clear_relation_cache('in_state', 'subject')
       
   149         self.assertEqual(len(user.in_state), 1)
       
   150         self.assertEqual(iworkflowable.state, 'deactivated')
       
   151         trinfo = iworkflowable.latest_trinfo()
       
   152         self.assertEqual(trinfo.previous_state.name, 'activated')
       
   153         self.assertEqual(trinfo.new_state.name, 'deactivated')
       
   154         self.assertEqual(trinfo.comment, 'deactivate user')
       
   155         self.assertEqual(trinfo.comment_format, 'text/plain')
       
   156         return trinfo
       
   157 
       
   158     def test_change_state(self):
       
   159         with self.admin_access.client_cnx() as cnx:
       
   160             user = cnx.user
       
   161             iworkflowable = user.cw_adapt_to('IWorkflowable')
       
   162             iworkflowable.change_state('deactivated', comment=u'deactivate user')
       
   163             trinfo = self._test_manager_deactivate(user)
       
   164             self.assertEqual(trinfo.transition, None)
       
   165 
       
   166     def test_set_in_state_bad_wf(self):
       
   167         with self.admin_access.shell() as shell:
       
   168             wf = add_wf(shell, 'CWUser')
       
   169             s = wf.add_state(u'foo', initial=True)
       
   170             shell.commit()
       
   171         with self.admin_access.repo_cnx() as cnx:
       
   172             with cnx.security_enabled(write=False):
       
   173                 with self.assertRaises(ValidationError) as cm:
       
   174                     cnx.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
       
   175                                 {'x': cnx.user.eid, 's': s.eid})
       
   176                 self.assertEqual(cm.exception.errors, {'in_state-subject': "state doesn't belong to entity's workflow. "
       
   177                                           "You may want to set a custom workflow for this entity first."})
       
   178 
       
   179     def test_fire_transition(self):
       
   180         with self.admin_access.client_cnx() as cnx:
       
   181             user = cnx.user
       
   182             iworkflowable = user.cw_adapt_to('IWorkflowable')
       
   183             iworkflowable.fire_transition('deactivate', comment=u'deactivate user')
       
   184             user.cw_clear_all_caches()
       
   185             self.assertEqual(iworkflowable.state, 'deactivated')
       
   186             self._test_manager_deactivate(user)
       
   187             trinfo = self._test_manager_deactivate(user)
       
   188             self.assertEqual(trinfo.transition.name, 'deactivate')
       
   189 
       
   190     def test_goback_transition(self):
       
   191         with self.admin_access.web_request() as req:
       
   192             wf = req.user.cw_adapt_to('IWorkflowable').current_workflow
       
   193             asleep = wf.add_state('asleep')
       
   194             wf.add_transition('rest', (wf.state_by_name('activated'),
       
   195                                        wf.state_by_name('deactivated')),
       
   196                               asleep)
       
   197             wf.add_transition('wake up', asleep)
       
   198             user = self.create_user(req, 'stduser')
       
   199             iworkflowable = user.cw_adapt_to('IWorkflowable')
       
   200             iworkflowable.fire_transition('rest')
       
   201             req.cnx.commit()
       
   202             iworkflowable.fire_transition('wake up')
       
   203             req.cnx.commit()
       
   204             self.assertEqual(iworkflowable.state, 'activated')
       
   205             iworkflowable.fire_transition('deactivate')
       
   206             req.cnx.commit()
       
   207             iworkflowable.fire_transition('rest')
       
   208             req.cnx.commit()
       
   209             iworkflowable.fire_transition('wake up')
       
   210             req.cnx.commit()
       
   211             user.cw_clear_all_caches()
       
   212             self.assertEqual(iworkflowable.state, 'deactivated')
       
   213 
       
   214     # XXX test managers can change state without matching transition
       
   215 
       
   216     def _test_stduser_deactivate(self):
       
   217         with self.admin_access.repo_cnx() as cnx:
       
   218             self.create_user(cnx, 'tutu')
       
   219         with self.new_access('tutu').web_request() as req:
       
   220             iworkflowable = req.entity_from_eid(self.member_eid).cw_adapt_to('IWorkflowable')
       
   221             with self.assertRaises(ValidationError) as cm:
       
   222                 iworkflowable.fire_transition('deactivate')
       
   223             self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})
       
   224         with self.new_access('member').web_request() as req:
       
   225             iworkflowable = req.entity_from_eid(self.member_eid).cw_adapt_to('IWorkflowable')
       
   226             iworkflowable.fire_transition('deactivate')
       
   227             req.cnx.commit()
       
   228             with self.assertRaises(ValidationError) as cm:
       
   229                 iworkflowable.fire_transition('activate')
       
   230             self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})
       
   231 
       
   232     def test_fire_transition_owned_by(self):
       
   233         with self.admin_access.repo_cnx() as cnx:
       
   234             cnx.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
       
   235                         'X expression "X owned_by U", T condition X '
       
   236                         'WHERE T name "deactivate"')
       
   237             cnx.commit()
       
   238         self._test_stduser_deactivate()
       
   239 
       
   240     def test_fire_transition_has_update_perm(self):
       
   241         with self.admin_access.repo_cnx() as cnx:
       
   242             cnx.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
       
   243                         'X expression "U has_update_permission X", T condition X '
       
   244                         'WHERE T name "deactivate"')
       
   245             cnx.commit()
       
   246         self._test_stduser_deactivate()
       
   247 
       
   248     def test_swf_base(self):
       
   249         """subworkflow
       
   250 
       
   251         +-----------+  tr1   +-----------+
       
   252         | swfstate1 | ------>| swfstate2 |
       
   253         +-----------+        +-----------+
       
   254                   |  tr2  +-----------+
       
   255                   `------>| swfstate3 |
       
   256                           +-----------+
       
   257 
       
   258         main workflow
       
   259 
       
   260         +--------+  swftr1             +--------+
       
   261         | state1 | -------[swfstate2]->| state2 |
       
   262         +--------+     |               +--------+
       
   263                        |               +--------+
       
   264                        `-[swfstate3]-->| state3 |
       
   265                                        +--------+
       
   266         """
       
   267         # sub-workflow
       
   268         with self.admin_access.shell() as shell:
       
   269             swf = add_wf(shell, 'CWGroup', name='subworkflow')
       
   270             swfstate1 = swf.add_state(u'swfstate1', initial=True)
       
   271             swfstate2 = swf.add_state(u'swfstate2')
       
   272             swfstate3 = swf.add_state(u'swfstate3')
       
   273             tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
       
   274             tr2 = swf.add_transition(u'tr2', (swfstate1,), swfstate3)
       
   275             # main workflow
       
   276             mwf = add_wf(shell, 'CWGroup', name='main workflow', default=True)
       
   277             state1 = mwf.add_state(u'state1', initial=True)
       
   278             state2 = mwf.add_state(u'state2')
       
   279             state3 = mwf.add_state(u'state3')
       
   280             swftr1 = mwf.add_wftransition(u'swftr1', swf, state1,
       
   281                                           [(swfstate2, state2), (swfstate3, state3)])
       
   282             swf.cw_clear_all_caches()
       
   283             self.assertEqual(swftr1.destination(None).eid, swfstate1.eid)
       
   284         # workflows built, begin test
       
   285         with self.admin_access.web_request() as req:
       
   286             group = req.create_entity('CWGroup', name=u'grp1')
       
   287             req.cnx.commit()
       
   288             iworkflowable = group.cw_adapt_to('IWorkflowable')
       
   289             self.assertEqual(iworkflowable.current_state.eid, state1.eid)
       
   290             self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
       
   291             self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
       
   292             self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
       
   293             iworkflowable.fire_transition('swftr1', u'go')
       
   294             req.cnx.commit()
       
   295             group.cw_clear_all_caches()
       
   296             self.assertEqual(iworkflowable.current_state.eid, swfstate1.eid)
       
   297             self.assertEqual(iworkflowable.current_workflow.eid, swf.eid)
       
   298             self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
       
   299             self.assertEqual(iworkflowable.subworkflow_input_transition().eid, swftr1.eid)
       
   300             iworkflowable.fire_transition('tr1', u'go')
       
   301             req.cnx.commit()
       
   302             group.cw_clear_all_caches()
       
   303             self.assertEqual(iworkflowable.current_state.eid, state2.eid)
       
   304             self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
       
   305             self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
       
   306             self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
       
   307             # force back to swfstate1 is impossible since we can't any more find
       
   308             # subworkflow input transition
       
   309             with self.assertRaises(ValidationError) as cm:
       
   310                 iworkflowable.change_state(swfstate1, u'gadget')
       
   311             self.assertEqual(cm.exception.errors, {'to_state-subject': "state doesn't belong to entity's workflow"})
       
   312             req.cnx.rollback()
       
   313             # force back to state1
       
   314             iworkflowable.change_state('state1', u'gadget')
       
   315             iworkflowable.fire_transition('swftr1', u'au')
       
   316             group.cw_clear_all_caches()
       
   317             iworkflowable.fire_transition('tr2', u'chapeau')
       
   318             req.cnx.commit()
       
   319             group.cw_clear_all_caches()
       
   320             self.assertEqual(iworkflowable.current_state.eid, state3.eid)
       
   321             self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
       
   322             self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
       
   323             self.assertListEqual(parse_hist(iworkflowable.workflow_history),
       
   324                                   [('state1', 'swfstate1', 'swftr1', 'go'),
       
   325                                    ('swfstate1', 'swfstate2', 'tr1', 'go'),
       
   326                                    ('swfstate2', 'state2', 'swftr1', 'exiting from subworkflow subworkflow'),
       
   327                                    ('state2', 'state1', None, 'gadget'),
       
   328                                    ('state1', 'swfstate1', 'swftr1', 'au'),
       
   329                                    ('swfstate1', 'swfstate3', 'tr2', 'chapeau'),
       
   330                                    ('swfstate3', 'state3', 'swftr1', 'exiting from subworkflow subworkflow'),
       
   331                                    ])
       
   332 
       
   333     def test_swf_exit_consistency(self):
       
   334         with self.admin_access.shell() as shell:
       
   335             # sub-workflow
       
   336             swf = add_wf(shell, 'CWGroup', name='subworkflow')
       
   337             swfstate1 = swf.add_state(u'swfstate1', initial=True)
       
   338             swfstate2 = swf.add_state(u'swfstate2')
       
   339             tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
       
   340             # main workflow
       
   341             mwf = add_wf(shell, 'CWGroup', name='main workflow', default=True)
       
   342             state1 = mwf.add_state(u'state1', initial=True)
       
   343             state2 = mwf.add_state(u'state2')
       
   344             state3 = mwf.add_state(u'state3')
       
   345             mwf.add_wftransition(u'swftr1', swf, state1,
       
   346                                  [(swfstate2, state2), (swfstate2, state3)])
       
   347             with self.assertRaises(ValidationError) as cm:
       
   348                 shell.commit()
       
   349             self.assertEqual(cm.exception.errors, {'subworkflow_exit-subject': u"can't have multiple exits on the same state"})
       
   350 
       
   351     def test_swf_fire_in_a_row(self):
       
   352         with self.admin_access.shell() as shell:
       
   353             # sub-workflow
       
   354             subwf = add_wf(shell, 'CWGroup', name='subworkflow')
       
   355             xsigning = subwf.add_state('xsigning', initial=True)
       
   356             xaborted = subwf.add_state('xaborted')
       
   357             xsigned = subwf.add_state('xsigned')
       
   358             xabort = subwf.add_transition('xabort', (xsigning,), xaborted)
       
   359             xsign = subwf.add_transition('xsign', (xsigning,), xsigning)
       
   360             xcomplete = subwf.add_transition('xcomplete', (xsigning,), xsigned,
       
   361                                              type=u'auto')
       
   362             # main workflow
       
   363             twf = add_wf(shell, 'CWGroup', name='mainwf', default=True)
       
   364             created    = twf.add_state(_('created'), initial=True)
       
   365             identified = twf.add_state(_('identified'))
       
   366             released   = twf.add_state(_('released'))
       
   367             closed   = twf.add_state(_('closed'))
       
   368             twf.add_wftransition(_('identify'), subwf, (created,),
       
   369                                  [(xsigned, identified), (xaborted, created)])
       
   370             twf.add_wftransition(_('release'), subwf, (identified,),
       
   371                                  [(xsigned, released), (xaborted, identified)])
       
   372             twf.add_wftransition(_('close'), subwf, (released,),
       
   373                                  [(xsigned, closed), (xaborted, released)])
       
   374             shell.commit()
       
   375         with self.admin_access.repo_cnx() as cnx:
       
   376             group = cnx.create_entity('CWGroup', name=u'grp1')
       
   377             cnx.commit()
       
   378             iworkflowable = group.cw_adapt_to('IWorkflowable')
       
   379             for trans in ('identify', 'release', 'close'):
       
   380                 iworkflowable.fire_transition(trans)
       
   381                 cnx.commit()
       
   382 
       
   383 
       
   384     def test_swf_magic_tr(self):
       
   385         with self.admin_access.shell() as shell:
       
   386             # sub-workflow
       
   387             subwf = add_wf(shell, 'CWGroup', name='subworkflow')
       
   388             xsigning = subwf.add_state('xsigning', initial=True)
       
   389             xaborted = subwf.add_state('xaborted')
       
   390             xsigned = subwf.add_state('xsigned')
       
   391             xabort = subwf.add_transition('xabort', (xsigning,), xaborted)
       
   392             xsign = subwf.add_transition('xsign', (xsigning,), xsigned)
       
   393             # main workflow
       
   394             twf = add_wf(shell, 'CWGroup', name='mainwf', default=True)
       
   395             created    = twf.add_state(_('created'), initial=True)
       
   396             identified = twf.add_state(_('identified'))
       
   397             released   = twf.add_state(_('released'))
       
   398             twf.add_wftransition(_('identify'), subwf, created,
       
   399                                  [(xaborted, None), (xsigned, identified)])
       
   400             twf.add_wftransition(_('release'), subwf, identified,
       
   401                                  [(xaborted, None)])
       
   402             shell.commit()
       
   403         with self.admin_access.web_request() as req:
       
   404             group = req.create_entity('CWGroup', name=u'grp1')
       
   405             req.cnx.commit()
       
   406             iworkflowable = group.cw_adapt_to('IWorkflowable')
       
   407             for trans, nextstate in (('identify', 'xsigning'),
       
   408                                      ('xabort', 'created'),
       
   409                                      ('identify', 'xsigning'),
       
   410                                      ('xsign', 'identified'),
       
   411                                      ('release', 'xsigning'),
       
   412                                      ('xabort', 'identified')
       
   413                                      ):
       
   414                 iworkflowable.fire_transition(trans)
       
   415                 req.cnx.commit()
       
   416                 group.cw_clear_all_caches()
       
   417                 self.assertEqual(iworkflowable.state, nextstate)
       
   418 
       
   419     def test_replace_state(self):
       
   420         with self.admin_access.shell() as shell:
       
   421             wf = add_wf(shell, 'CWGroup', name='groupwf', default=True)
       
   422             s_new = wf.add_state('new', initial=True)
       
   423             s_state1 = wf.add_state('state1')
       
   424             wf.add_transition('tr', (s_new,), s_state1)
       
   425             shell.commit()
       
   426 
       
   427         with self.admin_access.repo_cnx() as cnx:
       
   428             group = cnx.create_entity('CWGroup', name=u'grp1')
       
   429             cnx.commit()
       
   430 
       
   431             iwf = group.cw_adapt_to('IWorkflowable')
       
   432             iwf.fire_transition('tr')
       
   433             cnx.commit()
       
   434             group.cw_clear_all_caches()
       
   435 
       
   436             wf = cnx.entity_from_eid(wf.eid)
       
   437             wf.add_state('state2')
       
   438             with cnx.security_enabled(write=False):
       
   439                 wf.replace_state('state1', 'state2')
       
   440             cnx.commit()
       
   441 
       
   442             self.assertEqual(iwf.state, 'state2')
       
   443             self.assertEqual(iwf.latest_trinfo().to_state[0].name, 'state2')
       
   444 
       
   445 
       
   446 class CustomWorkflowTC(CubicWebTC):
       
   447 
       
   448     def setup_database(self):
       
   449         with self.admin_access.repo_cnx() as cnx:
       
   450             self.member_eid = self.create_user(cnx, 'member').eid
       
   451 
       
   452     def test_custom_wf_replace_state_no_history(self):
       
   453         """member in inital state with no previous history, state is simply
       
   454         redirected when changing workflow
       
   455         """
       
   456         with self.admin_access.shell() as shell:
       
   457             wf = add_wf(shell, 'CWUser')
       
   458             wf.add_state('asleep', initial=True)
       
   459         with self.admin_access.web_request() as req:
       
   460             req.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   461                      {'wf': wf.eid, 'x': self.member_eid})
       
   462             member = req.entity_from_eid(self.member_eid)
       
   463             iworkflowable = member.cw_adapt_to('IWorkflowable')
       
   464             self.assertEqual(iworkflowable.state, 'activated') # no change before commit
       
   465             req.cnx.commit()
       
   466             member.cw_clear_all_caches()
       
   467             self.assertEqual(iworkflowable.current_workflow.eid, wf.eid)
       
   468             self.assertEqual(iworkflowable.state, 'asleep')
       
   469             self.assertEqual(iworkflowable.workflow_history, ())
       
   470 
       
   471     def test_custom_wf_replace_state_keep_history(self):
       
   472         """member in inital state with some history, state is redirected and
       
   473         state change is recorded to history
       
   474         """
       
   475         with self.admin_access.web_request() as req:
       
   476             member = req.entity_from_eid(self.member_eid)
       
   477             iworkflowable = member.cw_adapt_to('IWorkflowable')
       
   478             iworkflowable.fire_transition('deactivate')
       
   479             iworkflowable.fire_transition('activate')
       
   480             req.cnx.commit()
       
   481         with self.admin_access.shell() as shell:
       
   482             wf = add_wf(shell, 'CWUser')
       
   483             wf.add_state('asleep', initial=True)
       
   484             shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   485                           {'wf': wf.eid, 'x': self.member_eid})
       
   486         with self.admin_access.web_request() as req:
       
   487             member = req.entity_from_eid(self.member_eid)
       
   488             iworkflowable = member.cw_adapt_to('IWorkflowable')
       
   489             self.assertEqual(iworkflowable.current_workflow.eid, wf.eid)
       
   490             self.assertEqual(iworkflowable.state, 'asleep')
       
   491             self.assertEqual(parse_hist(iworkflowable.workflow_history),
       
   492                              [('activated', 'deactivated', 'deactivate', None),
       
   493                               ('deactivated', 'activated', 'activate', None),
       
   494                               ('activated', 'asleep', None, 'workflow changed to "CWUser"')])
       
   495 
       
   496     def test_custom_wf_no_initial_state(self):
       
   497         """try to set a custom workflow which has no initial state"""
       
   498         with self.admin_access.shell() as shell:
       
   499             wf = add_wf(shell, 'CWUser')
       
   500             wf.add_state('asleep')
       
   501             shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   502                           {'wf': wf.eid, 'x': self.member_eid})
       
   503             with self.assertRaises(ValidationError) as cm:
       
   504                 shell.commit()
       
   505             self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u'workflow has no initial state'})
       
   506 
       
   507     def test_custom_wf_bad_etype(self):
       
   508         """try to set a custom workflow which doesn't apply to entity type"""
       
   509         with self.admin_access.shell() as shell:
       
   510             wf = add_wf(shell, 'Company')
       
   511             wf.add_state('asleep', initial=True)
       
   512             shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   513                          {'wf': wf.eid, 'x': self.member_eid})
       
   514             with self.assertRaises(ValidationError) as cm:
       
   515                 shell.commit()
       
   516             self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u"workflow isn't a workflow for this type"})
       
   517 
       
   518     def test_del_custom_wf(self):
       
   519         """member in some state shared by the new workflow, nothing has to be
       
   520         done
       
   521         """
       
   522         with self.admin_access.web_request() as req:
       
   523             member = req.entity_from_eid(self.member_eid)
       
   524             iworkflowable = member.cw_adapt_to('IWorkflowable')
       
   525             iworkflowable.fire_transition('deactivate')
       
   526             req.cnx.commit()
       
   527         with self.admin_access.shell() as shell:
       
   528             wf = add_wf(shell, 'CWUser')
       
   529             wf.add_state('asleep', initial=True)
       
   530             shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   531                          {'wf': wf.eid, 'x': self.member_eid})
       
   532             shell.commit()
       
   533         with self.admin_access.web_request() as req:
       
   534             req.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   535                          {'wf': wf.eid, 'x': self.member_eid})
       
   536             member = req.entity_from_eid(self.member_eid)
       
   537             iworkflowable = member.cw_adapt_to('IWorkflowable')
       
   538             self.assertEqual(iworkflowable.state, 'asleep')# no change before commit
       
   539             req.cnx.commit()
       
   540             member.cw_clear_all_caches()
       
   541             self.assertEqual(iworkflowable.current_workflow.name, "default user workflow")
       
   542             self.assertEqual(iworkflowable.state, 'activated')
       
   543             self.assertEqual(parse_hist(iworkflowable.workflow_history),
       
   544                               [('activated', 'deactivated', 'deactivate', None),
       
   545                                ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'),
       
   546                                ('asleep', 'activated', None, 'workflow changed to "default user workflow"'),])
       
   547 
       
   548 
       
   549 class AutoTransitionTC(CubicWebTC):
       
   550 
       
   551     def setup_custom_wf(self):
       
   552         with self.admin_access.shell() as shell:
       
   553             wf = add_wf(shell, 'CWUser')
       
   554             asleep = wf.add_state('asleep', initial=True)
       
   555             dead = wf.add_state('dead')
       
   556             wf.add_transition('rest', asleep, asleep)
       
   557             wf.add_transition('sick', asleep, dead, type=u'auto',
       
   558                               conditions=({'expr': u'X surname "toto"',
       
   559                                            'mainvars': u'X'},))
       
   560         return wf
       
   561 
       
   562     def test_auto_transition_fired(self):
       
   563         wf = self.setup_custom_wf()
       
   564         with self.admin_access.web_request() as req:
       
   565             user = self.create_user(req, 'member')
       
   566             iworkflowable = user.cw_adapt_to('IWorkflowable')
       
   567             req.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   568                         {'wf': wf.eid, 'x': user.eid})
       
   569             req.cnx.commit()
       
   570             user.cw_clear_all_caches()
       
   571             self.assertEqual(iworkflowable.state, 'asleep')
       
   572             self.assertEqual([t.name for t in iworkflowable.possible_transitions()],
       
   573                               ['rest'])
       
   574             iworkflowable.fire_transition('rest')
       
   575             req.cnx.commit()
       
   576             user.cw_clear_all_caches()
       
   577             self.assertEqual(iworkflowable.state, 'asleep')
       
   578             self.assertEqual([t.name for t in iworkflowable.possible_transitions()],
       
   579                               ['rest'])
       
   580             self.assertEqual(parse_hist(iworkflowable.workflow_history),
       
   581                               [('asleep', 'asleep', 'rest', None)])
       
   582             user.cw_set(surname=u'toto') # fulfill condition
       
   583             req.cnx.commit()
       
   584             iworkflowable.fire_transition('rest')
       
   585             req.cnx.commit()
       
   586             user.cw_clear_all_caches()
       
   587             self.assertEqual(iworkflowable.state, 'dead')
       
   588             self.assertEqual(parse_hist(iworkflowable.workflow_history),
       
   589                               [('asleep', 'asleep', 'rest', None),
       
   590                                ('asleep', 'asleep', 'rest', None),
       
   591                                ('asleep', 'dead', 'sick', None),])
       
   592 
       
   593     def test_auto_transition_custom_initial_state_fired(self):
       
   594         wf = self.setup_custom_wf()
       
   595         with self.admin_access.web_request() as req:
       
   596             user = self.create_user(req, 'member', surname=u'toto')
       
   597             req.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   598                         {'wf': wf.eid, 'x': user.eid})
       
   599             req.cnx.commit()
       
   600             user.cw_clear_all_caches()
       
   601             iworkflowable = user.cw_adapt_to('IWorkflowable')
       
   602             self.assertEqual(iworkflowable.state, 'dead')
       
   603 
       
   604     def test_auto_transition_initial_state_fired(self):
       
   605         with self.admin_access.web_request() as req:
       
   606             wf = req.execute('Any WF WHERE ET default_workflow WF, '
       
   607                              'ET name %(et)s', {'et': 'CWUser'}).get_entity(0, 0)
       
   608             dead = wf.add_state('dead')
       
   609             wf.add_transition('sick', wf.state_by_name('activated'), dead,
       
   610                               type=u'auto', conditions=({'expr': u'X surname "toto"',
       
   611                                                          'mainvars': u'X'},))
       
   612             req.cnx.commit()
       
   613         with self.admin_access.web_request() as req:
       
   614             user = self.create_user(req, 'member', surname=u'toto')
       
   615             req.cnx.commit()
       
   616             iworkflowable = user.cw_adapt_to('IWorkflowable')
       
   617             self.assertEqual(iworkflowable.state, 'dead')
       
   618 
       
   619 
       
   620 class WorkflowHooksTC(CubicWebTC):
       
   621 
       
   622     def setUp(self):
       
   623         CubicWebTC.setUp(self)
       
   624         with self.admin_access.web_request() as req:
       
   625             self.wf = req.user.cw_adapt_to('IWorkflowable').current_workflow
       
   626             self.s_activated = self.wf.state_by_name('activated').eid
       
   627             self.s_deactivated = self.wf.state_by_name('deactivated').eid
       
   628             self.s_dummy = self.wf.add_state(u'dummy').eid
       
   629             self.wf.add_transition(u'dummy', (self.s_deactivated,), self.s_dummy)
       
   630             ueid = self.create_user(req, 'stduser', commit=False).eid
       
   631             # test initial state is set
       
   632             rset = req.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
       
   633                                 {'x' : ueid})
       
   634             self.assertFalse(rset, rset.rows)
       
   635             req.cnx.commit()
       
   636             initialstate = req.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
       
   637                                         {'x' : ueid})[0][0]
       
   638             self.assertEqual(initialstate, u'activated')
       
   639             # give access to users group on the user's wf transitions
       
   640             # so we can test wf enforcing on euser (managers don't have anymore this
       
   641             # enforcement
       
   642             req.execute('SET X require_group G '
       
   643                          'WHERE G name "users", X transition_of WF, WF eid %(wf)s',
       
   644                          {'wf': self.wf.eid})
       
   645             req.cnx.commit()
       
   646 
       
   647     # XXX currently, we've to rely on hooks to set initial state, or to use execute
       
   648     # def test_initial_state(self):
       
   649     #     cnx = self.login('stduser')
       
   650     #     cu = cnx.cursor()
       
   651     #     self.assertRaises(ValidationError, cu.execute,
       
   652     #                       'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
       
   653     #                       'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'})
       
   654     #     cnx.close()
       
   655     #     # though managers can do whatever he want
       
   656     #     self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
       
   657     #                  'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
       
   658     #     self.commit()
       
   659 
       
   660     # test that the workflow is correctly enforced
       
   661 
       
   662     def _cleanup_msg(self, msg):
       
   663         """remove the variable part of one specific error message"""
       
   664         lmsg = msg.split()
       
   665         lmsg.pop(1)
       
   666         lmsg.pop()
       
   667         return ' '.join(lmsg)
       
   668 
       
   669     def test_transition_checking1(self):
       
   670         with self.new_access('stduser').repo_cnx() as cnx:
       
   671             user = cnx.user
       
   672             iworkflowable = user.cw_adapt_to('IWorkflowable')
       
   673             with self.assertRaises(ValidationError) as cm:
       
   674                 iworkflowable.fire_transition('activate')
       
   675             self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
       
   676                              u"transition isn't allowed from")
       
   677 
       
   678     def test_transition_checking2(self):
       
   679         with self.new_access('stduser').repo_cnx() as cnx:
       
   680             user = cnx.user
       
   681             iworkflowable = user.cw_adapt_to('IWorkflowable')
       
   682             with self.assertRaises(ValidationError) as cm:
       
   683                 iworkflowable.fire_transition('dummy')
       
   684             self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
       
   685                              u"transition isn't allowed from")
       
   686 
       
   687     def test_transition_checking3(self):
       
   688         with self.new_access('stduser').repo_cnx() as cnx:
       
   689             user = cnx.user
       
   690             iworkflowable = user.cw_adapt_to('IWorkflowable')
       
   691             iworkflowable.fire_transition('deactivate')
       
   692             cnx.commit()
       
   693             with self.assertRaises(ValidationError) as cm:
       
   694                 iworkflowable.fire_transition('deactivate')
       
   695             self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
       
   696                                                 u"transition isn't allowed from")
       
   697             cnx.rollback()
       
   698             # get back now
       
   699             iworkflowable.fire_transition('activate')
       
   700             cnx.commit()
       
   701 
       
   702 
       
   703 if __name__ == '__main__':
       
   704     from logilab.common.testlib import unittest_main
       
   705     unittest_main()