|
1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 |
|
19 from cubicweb import ValidationError |
|
20 from cubicweb.devtools.testlib import CubicWebTC |
|
21 |
|
def add_wf(shell, etype, name=None, default=False):
    """Create a workflow for `etype` through `shell` and return it.

    :param shell: object exposing an `add_workflow` method (in this module,
      the shell yielded by `admin_access.shell()`)
    :param etype: entity type the workflow is attached to
    :param name: workflow name; when None, `etype` is used as the name
    :param default: forwarded as the `default` keyword of `add_workflow`
    :return: the value returned by `shell.add_workflow`

    `ensure_workflowable=False` is always passed to `add_workflow`.
    """
    workflow_name = etype if name is None else name
    return shell.add_workflow(workflow_name, etype,
                              ensure_workflowable=False, default=default)
|
27 |
|
def parse_hist(wfhist):
    """Turn a workflow history into a list of plain, comparable tuples.

    :param wfhist: iterable of history entries, each exposing
      `previous_state`, `new_state`, `transition` and `comment`
    :return: list of `(previous state name, new state name, transition name,
      comment)` tuples, in iteration order; the third item is the falsy
      `transition` value itself (e.g. None) when the entry has no transition
    """
    summary = []
    for trinfo in wfhist:
        summary.append((trinfo.previous_state.name,
                        trinfo.new_state.name,
                        trinfo.transition and trinfo.transition.name,
                        trinfo.comment))
    return summary
|
32 |
|
33 |
|
class WorkflowBuildingTC(CubicWebTC):
    """Workflow construction through the migration shell, and unicity of
    state / transition names inside a single workflow.
    """

    def test_wf_construction(self):
        """states and transitions just added can be fetched back by name"""
        with self.admin_access.shell() as shell:
            workflow = add_wf(shell, 'Company')
            foo_state = workflow.add_state(u'foo', initial=True)
            bar_state = workflow.add_state(u'bar')
            self.assertEqual(workflow.state_by_name('bar').eid, bar_state.eid)
            # looking up a name which doesn't exist is expected to give None
            self.assertEqual(workflow.state_by_name('barrr'), None)
            baz_tr = workflow.add_transition(u'baz', (foo_state,), bar_state,
                                             ('managers',))
            self.assertEqual(workflow.transition_by_name('baz').eid, baz_tr.eid)
            self.assertEqual(len(baz_tr.require_group), 1)
            self.assertEqual(baz_tr.require_group[0].name, 'managers')

    def test_duplicated_state(self):
        """two states of the same workflow can't share a name, whether the
        clash comes from a creation or from a renaming
        """
        expected_errors = {
            'name': u'%(KEY-rtype)s is part of violated unicity constraint',
            'state_of': u'%(KEY-rtype)s is part of violated unicity constraint',
            '': u'some relations violate a unicity constraint',
        }
        with self.admin_access.shell() as shell:
            first_wf = add_wf(shell, 'Company')
            first_wf.add_state(u'foo', initial=True)
            shell.commit()
            # creating a second 'foo' state in the same workflow is refused
            with self.assertRaises(ValidationError) as cm:
                first_wf.add_state(u'foo')
            self.assertEqual(expected_errors, cm.exception.errors)
            shell.rollback()
            # no pb if not in the same workflow
            second_wf = add_wf(shell, 'Company')
            second_wf.add_state(u'foo', initial=True)
            shell.commit()
            # gnark gnark: renaming an existing state to a name already
            # taken in its workflow is refused as well
            bar_state = first_wf.add_state(u'bar')
            shell.commit()
            with self.assertRaises(ValidationError) as cm:
                bar_state.cw_set(name=u'foo')
            shell.rollback()
            self.assertEqual(expected_errors, cm.exception.errors)

    def test_duplicated_transition(self):
        """two transitions of the same workflow can't share a name, whether
        the clash comes from a creation or from a renaming
        """
        expected_errors = {
            'name': u'%(KEY-rtype)s is part of violated unicity constraint',
            'transition_of': u'%(KEY-rtype)s is part of violated unicity constraint',
            '': u'some relations violate a unicity constraint',
        }
        with self.admin_access.shell() as shell:
            first_wf = add_wf(shell, 'Company')
            foo_state = first_wf.add_state(u'foo', initial=True)
            bar_state = first_wf.add_state(u'bar')
            first_wf.add_transition(u'baz', (foo_state,), bar_state, ('managers',))
            # creating a second 'baz' transition in the same workflow is refused
            with self.assertRaises(ValidationError) as cm:
                first_wf.add_transition(u'baz', (bar_state,), foo_state)
            self.assertEqual(expected_errors, cm.exception.errors)
            shell.rollback()
            # no pb if not in the same workflow
            second_wf = add_wf(shell, 'Company')
            foo_state = second_wf.add_state(u'foo', initial=True)
            bar_state = second_wf.add_state(u'bar')
            second_wf.add_transition(u'baz', (foo_state,), bar_state, ('managers',))
            shell.commit()
            # gnark gnark: renaming an existing transition to a name already
            # taken in its workflow is refused as well
            biz_tr = second_wf.add_transition(u'biz', (bar_state,), foo_state)
            shell.commit()
            with self.assertRaises(ValidationError) as cm:
                biz_tr.cw_set(name=u'baz')
            shell.rollback()
            self.assertEqual(expected_errors, cm.exception.errors)
|
104 |
|
105 |
|
class WorkflowTC(CubicWebTC):
    """Tests of the IWorkflowable adapter: state changes, transition firing,
    transition conditions, sub-workflows and state replacement.
    """

    def setup_database(self):
        """Check the `in_state` cardinality and create the 'member' user whose
        eid is kept in `self.member_eid`.
        """
        rschema = self.schema['in_state']
        for rdef in rschema.rdefs.values():
            self.assertEqual(rdef.cardinality, '1*')
        with self.admin_access.client_cnx() as cnx:
            self.member_eid = self.create_user(cnx, 'member').eid
            cnx.commit()

    def test_workflow_base(self):
        """successive state changes are recorded in order in the history"""
        with self.admin_access.web_request() as req:
            e = self.create_user(req, 'toto')
            iworkflowable = e.cw_adapt_to('IWorkflowable')
            self.assertEqual(iworkflowable.state, 'activated')
            iworkflowable.change_state('deactivated', u'deactivate 1')
            req.cnx.commit()
            iworkflowable.change_state('activated', u'activate 1')
            req.cnx.commit()
            iworkflowable.change_state('deactivated', u'deactivate 2')
            req.cnx.commit()
            # drop the cached relation so that the history is read again
            e.cw_clear_relation_cache('wf_info_for', 'object')
            self.assertEqual([tr.comment for tr in e.reverse_wf_info_for],
                             ['deactivate 1', 'activate 1', 'deactivate 2'])
            self.assertEqual(iworkflowable.latest_trinfo().comment, 'deactivate 2')

    def test_possible_transitions(self):
        """an admin request sees the 'deactivate' transition, the 'member'
        user sees none
        """
        with self.admin_access.web_request() as req:
            user = req.execute('CWUser X').get_entity(0, 0)
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            trs = list(iworkflowable.possible_transitions())
            self.assertEqual(len(trs), 1)
            self.assertEqual(trs[0].name, u'deactivate')
            self.assertEqual(trs[0].destination(None).name, u'deactivated')
        # test a std user get no possible transition
        with self.new_access('member').web_request() as req:
            # fetch the entity using the new session
            trs = list(req.user.cw_adapt_to('IWorkflowable').possible_transitions())
            self.assertEqual(len(trs), 0)

    def _test_manager_deactivate(self, user):
        """Check `user` has just gone from 'activated' to 'deactivated' with
        the comment 'deactivate user', and return the latest trinfo so that
        callers can run further checks on it.
        """
        iworkflowable = user.cw_adapt_to('IWorkflowable')
        user.cw_clear_relation_cache('in_state', 'subject')
        self.assertEqual(len(user.in_state), 1)
        self.assertEqual(iworkflowable.state, 'deactivated')
        trinfo = iworkflowable.latest_trinfo()
        self.assertEqual(trinfo.previous_state.name, 'activated')
        self.assertEqual(trinfo.new_state.name, 'deactivated')
        self.assertEqual(trinfo.comment, 'deactivate user')
        self.assertEqual(trinfo.comment_format, 'text/plain')
        return trinfo

    def test_change_state(self):
        """a state forced with `change_state` is recorded without transition"""
        with self.admin_access.client_cnx() as cnx:
            user = cnx.user
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            iworkflowable.change_state('deactivated', comment=u'deactivate user')
            trinfo = self._test_manager_deactivate(user)
            self.assertEqual(trinfo.transition, None)

    def test_set_in_state_bad_wf(self):
        """setting `in_state` to a state of another workflow is refused"""
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'CWUser')
            s = wf.add_state(u'foo', initial=True)
            shell.commit()
        with self.admin_access.repo_cnx() as cnx:
            with cnx.security_enabled(write=False):
                with self.assertRaises(ValidationError) as cm:
                    cnx.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
                                {'x': cnx.user.eid, 's': s.eid})
                self.assertEqual(cm.exception.errors, {'in_state-subject': "state doesn't belong to entity's workflow. "
                                                       "You may want to set a custom workflow for this entity first."})

    def test_fire_transition(self):
        """a fired transition is recorded in the resulting trinfo"""
        with self.admin_access.client_cnx() as cnx:
            user = cnx.user
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            iworkflowable.fire_transition('deactivate', comment=u'deactivate user')
            user.cw_clear_all_caches()
            self.assertEqual(iworkflowable.state, 'deactivated')
            # the helper used to be called twice in a row, the first result
            # being thrown away: a single call runs the very same checks
            trinfo = self._test_manager_deactivate(user)
            self.assertEqual(trinfo.transition.name, 'deactivate')

    def test_goback_transition(self):
        """'wake up' is added without destination state: after firing it the
        user is expected to be back in the state it had before 'rest'
        """
        with self.admin_access.web_request() as req:
            wf = req.user.cw_adapt_to('IWorkflowable').current_workflow
            asleep = wf.add_state('asleep')
            wf.add_transition('rest', (wf.state_by_name('activated'),
                                       wf.state_by_name('deactivated')),
                              asleep)
            wf.add_transition('wake up', asleep)
            user = self.create_user(req, 'stduser')
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            iworkflowable.fire_transition('rest')
            req.cnx.commit()
            iworkflowable.fire_transition('wake up')
            req.cnx.commit()
            self.assertEqual(iworkflowable.state, 'activated')
            iworkflowable.fire_transition('deactivate')
            req.cnx.commit()
            iworkflowable.fire_transition('rest')
            req.cnx.commit()
            iworkflowable.fire_transition('wake up')
            req.cnx.commit()
            user.cw_clear_all_caches()
            self.assertEqual(iworkflowable.state, 'deactivated')

    # XXX test managers can change state without matching transition

    def _test_stduser_deactivate(self):
        """Shared scenario of the condition tests below: user 'tutu' may not
        deactivate 'member'; 'member' may deactivate itself but may not
        fire 'activate' afterwards.
        """
        with self.admin_access.repo_cnx() as cnx:
            self.create_user(cnx, 'tutu')
        with self.new_access('tutu').web_request() as req:
            iworkflowable = req.entity_from_eid(self.member_eid).cw_adapt_to('IWorkflowable')
            with self.assertRaises(ValidationError) as cm:
                iworkflowable.fire_transition('deactivate')
            self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})
        with self.new_access('member').web_request() as req:
            iworkflowable = req.entity_from_eid(self.member_eid).cw_adapt_to('IWorkflowable')
            iworkflowable.fire_transition('deactivate')
            req.cnx.commit()
            with self.assertRaises(ValidationError) as cm:
                iworkflowable.fire_transition('activate')
            self.assertEqual(cm.exception.errors, {'by_transition-subject': "transition may not be fired"})

    def test_fire_transition_owned_by(self):
        """'deactivate' guarded by an "X owned_by U" condition"""
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
                        'X expression "X owned_by U", T condition X '
                        'WHERE T name "deactivate"')
            cnx.commit()
        self._test_stduser_deactivate()

    def test_fire_transition_has_update_perm(self):
        """'deactivate' guarded by a "U has_update_permission X" condition"""
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
                        'X expression "U has_update_permission X", T condition X '
                        'WHERE T name "deactivate"')
            cnx.commit()
        self._test_stduser_deactivate()

    def test_swf_base(self):
        """subworkflow

        +-----------+  tr1   +-----------+
        | swfstate1 | ------>| swfstate2 |
        +-----------+        +-----------+
                  |  tr2  +-----------+
                  `------>| swfstate3 |
                          +-----------+

        main workflow

        +--------+  swftr1             +--------+
        | state1 | -------[swfstate2]->| state2 |
        +--------+     |               +--------+
                       |               +--------+
                       `-[swfstate3]-->| state3 |
                                       +--------+
        """
        # sub-workflow
        with self.admin_access.shell() as shell:
            swf = add_wf(shell, 'CWGroup', name='subworkflow')
            swfstate1 = swf.add_state(u'swfstate1', initial=True)
            swfstate2 = swf.add_state(u'swfstate2')
            swfstate3 = swf.add_state(u'swfstate3')
            # transitions are only fired by name below, no need to keep them
            swf.add_transition(u'tr1', (swfstate1,), swfstate2)
            swf.add_transition(u'tr2', (swfstate1,), swfstate3)
            # main workflow
            mwf = add_wf(shell, 'CWGroup', name='main workflow', default=True)
            state1 = mwf.add_state(u'state1', initial=True)
            state2 = mwf.add_state(u'state2')
            state3 = mwf.add_state(u'state3')
            swftr1 = mwf.add_wftransition(u'swftr1', swf, state1,
                                          [(swfstate2, state2), (swfstate3, state3)])
            swf.cw_clear_all_caches()
            self.assertEqual(swftr1.destination(None).eid, swfstate1.eid)
        # workflows built, begin test
        with self.admin_access.web_request() as req:
            group = req.create_entity('CWGroup', name=u'grp1')
            req.cnx.commit()
            iworkflowable = group.cw_adapt_to('IWorkflowable')
            self.assertEqual(iworkflowable.current_state.eid, state1.eid)
            self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
            # enter the sub-workflow
            iworkflowable.fire_transition('swftr1', u'go')
            req.cnx.commit()
            group.cw_clear_all_caches()
            self.assertEqual(iworkflowable.current_state.eid, swfstate1.eid)
            self.assertEqual(iworkflowable.current_workflow.eid, swf.eid)
            self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.subworkflow_input_transition().eid, swftr1.eid)
            # reaching swfstate2 exits the sub-workflow to state2
            iworkflowable.fire_transition('tr1', u'go')
            req.cnx.commit()
            group.cw_clear_all_caches()
            self.assertEqual(iworkflowable.current_state.eid, state2.eid)
            self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
            # force back to swfstate1 is impossible since we can't any more find
            # subworkflow input transition
            with self.assertRaises(ValidationError) as cm:
                iworkflowable.change_state(swfstate1, u'gadget')
            self.assertEqual(cm.exception.errors, {'to_state-subject': "state doesn't belong to entity's workflow"})
            req.cnx.rollback()
            # force back to state1
            iworkflowable.change_state('state1', u'gadget')
            iworkflowable.fire_transition('swftr1', u'au')
            group.cw_clear_all_caches()
            iworkflowable.fire_transition('tr2', u'chapeau')
            req.cnx.commit()
            group.cw_clear_all_caches()
            self.assertEqual(iworkflowable.current_state.eid, state3.eid)
            self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
            self.assertListEqual(parse_hist(iworkflowable.workflow_history),
                                 [('state1', 'swfstate1', 'swftr1', 'go'),
                                  ('swfstate1', 'swfstate2', 'tr1', 'go'),
                                  ('swfstate2', 'state2', 'swftr1', 'exiting from subworkflow subworkflow'),
                                  ('state2', 'state1', None, 'gadget'),
                                  ('state1', 'swfstate1', 'swftr1', 'au'),
                                  ('swfstate1', 'swfstate3', 'tr2', 'chapeau'),
                                  ('swfstate3', 'state3', 'swftr1', 'exiting from subworkflow subworkflow'),
                                  ])

    def test_swf_exit_consistency(self):
        """a sub-workflow state can't be mapped to two exit states"""
        with self.admin_access.shell() as shell:
            # sub-workflow
            swf = add_wf(shell, 'CWGroup', name='subworkflow')
            swfstate1 = swf.add_state(u'swfstate1', initial=True)
            swfstate2 = swf.add_state(u'swfstate2')
            swf.add_transition(u'tr1', (swfstate1,), swfstate2)
            # main workflow
            mwf = add_wf(shell, 'CWGroup', name='main workflow', default=True)
            state1 = mwf.add_state(u'state1', initial=True)
            state2 = mwf.add_state(u'state2')
            state3 = mwf.add_state(u'state3')
            # swfstate2 is given twice as exit point: refused at commit time
            mwf.add_wftransition(u'swftr1', swf, state1,
                                 [(swfstate2, state2), (swfstate2, state3)])
            with self.assertRaises(ValidationError) as cm:
                shell.commit()
            self.assertEqual(cm.exception.errors, {'subworkflow_exit-subject': u"can't have multiple exits on the same state"})

    def test_swf_fire_in_a_row(self):
        """fire three sub-workflow transitions one after the other, with a
        commit after each
        """
        # NOTE(review): `_` is not imported in this file; presumably it is
        # installed as a builtin by cubicweb -- to be confirmed
        with self.admin_access.shell() as shell:
            # sub-workflow
            subwf = add_wf(shell, 'CWGroup', name='subworkflow')
            xsigning = subwf.add_state('xsigning', initial=True)
            xaborted = subwf.add_state('xaborted')
            xsigned = subwf.add_state('xsigned')
            subwf.add_transition('xabort', (xsigning,), xaborted)
            subwf.add_transition('xsign', (xsigning,), xsigning)
            subwf.add_transition('xcomplete', (xsigning,), xsigned,
                                 type=u'auto')
            # main workflow
            twf = add_wf(shell, 'CWGroup', name='mainwf', default=True)
            created = twf.add_state(_('created'), initial=True)
            identified = twf.add_state(_('identified'))
            released = twf.add_state(_('released'))
            closed = twf.add_state(_('closed'))
            twf.add_wftransition(_('identify'), subwf, (created,),
                                 [(xsigned, identified), (xaborted, created)])
            twf.add_wftransition(_('release'), subwf, (identified,),
                                 [(xsigned, released), (xaborted, identified)])
            twf.add_wftransition(_('close'), subwf, (released,),
                                 [(xsigned, closed), (xaborted, released)])
            shell.commit()
        with self.admin_access.repo_cnx() as cnx:
            group = cnx.create_entity('CWGroup', name=u'grp1')
            cnx.commit()
            iworkflowable = group.cw_adapt_to('IWorkflowable')
            for trans in ('identify', 'release', 'close'):
                iworkflowable.fire_transition(trans)
                cnx.commit()

    def test_swf_magic_tr(self):
        """sub-workflow exits mapped to None: the test expects the entity to
        come back to the state it was in before entering the sub-workflow
        """
        # NOTE(review): `_` is not imported in this file; presumably it is
        # installed as a builtin by cubicweb -- to be confirmed
        with self.admin_access.shell() as shell:
            # sub-workflow
            subwf = add_wf(shell, 'CWGroup', name='subworkflow')
            xsigning = subwf.add_state('xsigning', initial=True)
            xaborted = subwf.add_state('xaborted')
            xsigned = subwf.add_state('xsigned')
            subwf.add_transition('xabort', (xsigning,), xaborted)
            subwf.add_transition('xsign', (xsigning,), xsigned)
            # main workflow
            twf = add_wf(shell, 'CWGroup', name='mainwf', default=True)
            created = twf.add_state(_('created'), initial=True)
            identified = twf.add_state(_('identified'))
            # 'released' is created but is the target of no exit point
            twf.add_state(_('released'))
            twf.add_wftransition(_('identify'), subwf, created,
                                 [(xaborted, None), (xsigned, identified)])
            twf.add_wftransition(_('release'), subwf, identified,
                                 [(xaborted, None)])
            shell.commit()
        with self.admin_access.web_request() as req:
            group = req.create_entity('CWGroup', name=u'grp1')
            req.cnx.commit()
            iworkflowable = group.cw_adapt_to('IWorkflowable')
            # (transition to fire, state expected afterwards)
            for trans, nextstate in (('identify', 'xsigning'),
                                     ('xabort', 'created'),
                                     ('identify', 'xsigning'),
                                     ('xsign', 'identified'),
                                     ('release', 'xsigning'),
                                     ('xabort', 'identified')
                                     ):
                iworkflowable.fire_transition(trans)
                req.cnx.commit()
                group.cw_clear_all_caches()
                self.assertEqual(iworkflowable.state, nextstate)

    def test_replace_state(self):
        """after `replace_state('state1', 'state2')`, an entity which was in
        'state1' is in 'state2', and so is the target of its latest trinfo
        """
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'CWGroup', name='groupwf', default=True)
            s_new = wf.add_state('new', initial=True)
            s_state1 = wf.add_state('state1')
            wf.add_transition('tr', (s_new,), s_state1)
            shell.commit()

        with self.admin_access.repo_cnx() as cnx:
            group = cnx.create_entity('CWGroup', name=u'grp1')
            cnx.commit()

            iwf = group.cw_adapt_to('IWorkflowable')
            iwf.fire_transition('tr')
            cnx.commit()
            group.cw_clear_all_caches()

            # get the workflow entity back through the current connection
            wf = cnx.entity_from_eid(wf.eid)
            wf.add_state('state2')
            with cnx.security_enabled(write=False):
                wf.replace_state('state1', 'state2')
            cnx.commit()

            self.assertEqual(iwf.state, 'state2')
            self.assertEqual(iwf.latest_trinfo().to_state[0].name, 'state2')
|
444 |
|
445 |
|
class CustomWorkflowTC(CubicWebTC):
    """Tests about setting and removing a custom workflow on a user."""

    # RQL query attaching workflow %(wf)s as custom workflow of entity %(x)s
    _set_custom_wf_rql = 'SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s'

    def setup_database(self):
        """Create the 'member' user and keep its eid in `self.member_eid`."""
        with self.admin_access.repo_cnx() as cnx:
            self.member_eid = self.create_user(cnx, 'member').eid

    def test_custom_wf_replace_state_no_history(self):
        """member in initial state with no previous history, state is simply
        redirected when changing workflow
        """
        with self.admin_access.shell() as shell:
            custom_wf = add_wf(shell, 'CWUser')
            custom_wf.add_state('asleep', initial=True)
        with self.admin_access.web_request() as req:
            req.execute(self._set_custom_wf_rql,
                        {'wf': custom_wf.eid, 'x': self.member_eid})
            member = req.entity_from_eid(self.member_eid)
            adapter = member.cw_adapt_to('IWorkflowable')
            # no change before commit
            self.assertEqual(adapter.state, 'activated')
            req.cnx.commit()
            member.cw_clear_all_caches()
            self.assertEqual(adapter.current_workflow.eid, custom_wf.eid)
            self.assertEqual(adapter.state, 'asleep')
            self.assertEqual(adapter.workflow_history, ())

    def test_custom_wf_replace_state_keep_history(self):
        """member in initial state with some history, state is redirected and
        state change is recorded to history
        """
        with self.admin_access.web_request() as req:
            member = req.entity_from_eid(self.member_eid)
            adapter = member.cw_adapt_to('IWorkflowable')
            adapter.fire_transition('deactivate')
            adapter.fire_transition('activate')
            req.cnx.commit()
        with self.admin_access.shell() as shell:
            custom_wf = add_wf(shell, 'CWUser')
            custom_wf.add_state('asleep', initial=True)
            shell.rqlexec(self._set_custom_wf_rql,
                          {'wf': custom_wf.eid, 'x': self.member_eid})
        with self.admin_access.web_request() as req:
            member = req.entity_from_eid(self.member_eid)
            adapter = member.cw_adapt_to('IWorkflowable')
            self.assertEqual(adapter.current_workflow.eid, custom_wf.eid)
            self.assertEqual(adapter.state, 'asleep')
            expected_history = [
                ('activated', 'deactivated', 'deactivate', None),
                ('deactivated', 'activated', 'activate', None),
                ('activated', 'asleep', None, 'workflow changed to "CWUser"'),
            ]
            self.assertEqual(parse_hist(adapter.workflow_history),
                             expected_history)

    def test_custom_wf_no_initial_state(self):
        """try to set a custom workflow which has no initial state"""
        with self.admin_access.shell() as shell:
            custom_wf = add_wf(shell, 'CWUser')
            custom_wf.add_state('asleep')
            shell.rqlexec(self._set_custom_wf_rql,
                          {'wf': custom_wf.eid, 'x': self.member_eid})
            # the error is only expected at commit time
            with self.assertRaises(ValidationError) as cm:
                shell.commit()
            self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u'workflow has no initial state'})

    def test_custom_wf_bad_etype(self):
        """try to set a custom workflow which doesn't apply to entity type"""
        with self.admin_access.shell() as shell:
            custom_wf = add_wf(shell, 'Company')
            custom_wf.add_state('asleep', initial=True)
            shell.rqlexec(self._set_custom_wf_rql,
                          {'wf': custom_wf.eid, 'x': self.member_eid})
            # the error is only expected at commit time
            with self.assertRaises(ValidationError) as cm:
                shell.commit()
            self.assertEqual(cm.exception.errors, {'custom_workflow-subject': u"workflow isn't a workflow for this type"})

    def test_del_custom_wf(self):
        """custom workflow removed from a member: once committed, the member
        is back in the default user workflow, in state 'activated', and both
        workflow changes are recorded to history
        """
        with self.admin_access.web_request() as req:
            member = req.entity_from_eid(self.member_eid)
            adapter = member.cw_adapt_to('IWorkflowable')
            adapter.fire_transition('deactivate')
            req.cnx.commit()
        with self.admin_access.shell() as shell:
            custom_wf = add_wf(shell, 'CWUser')
            custom_wf.add_state('asleep', initial=True)
            shell.rqlexec(self._set_custom_wf_rql,
                          {'wf': custom_wf.eid, 'x': self.member_eid})
            shell.commit()
        with self.admin_access.web_request() as req:
            req.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                        {'wf': custom_wf.eid, 'x': self.member_eid})
            member = req.entity_from_eid(self.member_eid)
            adapter = member.cw_adapt_to('IWorkflowable')
            # no change before commit
            self.assertEqual(adapter.state, 'asleep')
            req.cnx.commit()
            member.cw_clear_all_caches()
            self.assertEqual(adapter.current_workflow.name, "default user workflow")
            self.assertEqual(adapter.state, 'activated')
            expected_history = [
                ('activated', 'deactivated', 'deactivate', None),
                ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'),
                ('asleep', 'activated', None, 'workflow changed to "default user workflow"'),
            ]
            self.assertEqual(parse_hist(adapter.workflow_history),
                             expected_history)
|
547 |
|
548 |
|
class AutoTransitionTC(CubicWebTC):
    """Tests about transitions of type 'auto' guarded by a condition."""

    # condition put on the 'sick' auto transition by the tests below
    _sick_conditions = ({'expr': u'X surname "toto"',
                         'mainvars': u'X'},)

    def setup_custom_wf(self):
        """Build and return a CWUser workflow with states 'asleep' (initial)
        and 'dead', a 'rest' transition looping on 'asleep' and a 'sick' auto
        transition from 'asleep' to 'dead' guarded by the surname condition.
        """
        with self.admin_access.shell() as shell:
            custom_wf = add_wf(shell, 'CWUser')
            asleep_state = custom_wf.add_state('asleep', initial=True)
            dead_state = custom_wf.add_state('dead')
            custom_wf.add_transition('rest', asleep_state, asleep_state)
            custom_wf.add_transition('sick', asleep_state, dead_state,
                                     type=u'auto',
                                     conditions=self._sick_conditions)
        return custom_wf

    def test_auto_transition_fired(self):
        """'sick' is not offered as a possible transition, and shows up in
        the history right after a 'rest' once the surname condition holds
        """
        custom_wf = self.setup_custom_wf()
        with self.admin_access.web_request() as req:
            user = self.create_user(req, 'member')
            adapter = user.cw_adapt_to('IWorkflowable')
            req.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                        {'wf': custom_wf.eid, 'x': user.eid})
            req.cnx.commit()
            user.cw_clear_all_caches()
            self.assertEqual(adapter.state, 'asleep')
            possible = [tr.name for tr in adapter.possible_transitions()]
            self.assertEqual(possible, ['rest'])
            # condition not fulfilled: 'rest' just loops on 'asleep'
            adapter.fire_transition('rest')
            req.cnx.commit()
            user.cw_clear_all_caches()
            self.assertEqual(adapter.state, 'asleep')
            possible = [tr.name for tr in adapter.possible_transitions()]
            self.assertEqual(possible, ['rest'])
            self.assertEqual(parse_hist(adapter.workflow_history),
                             [('asleep', 'asleep', 'rest', None)])
            # fulfill condition
            user.cw_set(surname=u'toto')
            req.cnx.commit()
            adapter.fire_transition('rest')
            req.cnx.commit()
            user.cw_clear_all_caches()
            self.assertEqual(adapter.state, 'dead')
            expected_history = [
                ('asleep', 'asleep', 'rest', None),
                ('asleep', 'asleep', 'rest', None),
                ('asleep', 'dead', 'sick', None),
            ]
            self.assertEqual(parse_hist(adapter.workflow_history),
                             expected_history)

    def test_auto_transition_custom_initial_state_fired(self):
        """a user fulfilling the condition ends up 'dead' once the custom
        workflow is set and committed
        """
        custom_wf = self.setup_custom_wf()
        with self.admin_access.web_request() as req:
            user = self.create_user(req, 'member', surname=u'toto')
            req.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                        {'wf': custom_wf.eid, 'x': user.eid})
            req.cnx.commit()
            user.cw_clear_all_caches()
            adapter = user.cw_adapt_to('IWorkflowable')
            self.assertEqual(adapter.state, 'dead')

    def test_auto_transition_initial_state_fired(self):
        """same check with the auto transition added to the default CWUser
        workflow: a user created with the matching surname ends up 'dead'
        """
        with self.admin_access.web_request() as req:
            default_wf = req.execute('Any WF WHERE ET default_workflow WF, '
                                     'ET name %(et)s',
                                     {'et': 'CWUser'}).get_entity(0, 0)
            dead_state = default_wf.add_state('dead')
            default_wf.add_transition('sick',
                                      default_wf.state_by_name('activated'),
                                      dead_state, type=u'auto',
                                      conditions=self._sick_conditions)
            req.cnx.commit()
        with self.admin_access.web_request() as req:
            user = self.create_user(req, 'member', surname=u'toto')
            req.cnx.commit()
            adapter = user.cw_adapt_to('IWorkflowable')
            self.assertEqual(adapter.state, 'dead')
|
618 |
|
619 |
|
class WorkflowHooksTC(CubicWebTC):
    """Tests checking that the workflow is enforced for a standard user."""

    def setUp(self):
        """Extend the admin user's current workflow with a 'dummy' state and
        transition, create 'stduser' and open every transition of that
        workflow to the 'users' group.
        """
        super(WorkflowHooksTC, self).setUp()
        with self.admin_access.web_request() as req:
            self.wf = req.user.cw_adapt_to('IWorkflowable').current_workflow
            self.s_activated = self.wf.state_by_name('activated').eid
            self.s_deactivated = self.wf.state_by_name('deactivated').eid
            self.s_dummy = self.wf.add_state(u'dummy').eid
            self.wf.add_transition(u'dummy', (self.s_deactivated,), self.s_dummy)
            stduser_eid = self.create_user(req, 'stduser', commit=False).eid
            state_rql = 'Any N WHERE S name N, X in_state S, X eid %(x)s'
            # test initial state is set: nothing is expected before commit...
            rset = req.execute(state_rql, {'x' : stduser_eid})
            self.assertFalse(rset, rset.rows)
            req.cnx.commit()
            # ... and 'activated' once committed
            initialstate = req.execute(state_rql, {'x' : stduser_eid})[0][0]
            self.assertEqual(initialstate, u'activated')
            # give access to users group on the user's wf transitions
            # so we can test wf enforcing on euser (managers don't have anymore this
            # enforcement
            req.execute('SET X require_group G '
                        'WHERE G name "users", X transition_of WF, WF eid %(wf)s',
                        {'wf': self.wf.eid})
            req.cnx.commit()

    # XXX currently, we've to rely on hooks to set initial state, or to use execute
    # def test_initial_state(self):
    #     cnx = self.login('stduser')
    #     cu = cnx.cursor()
    #     self.assertRaises(ValidationError, cu.execute,
    #                       'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
    #                       'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'})
    #     cnx.close()
    #     # though managers can do whatever he want
    #     self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
    #                  'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
    #     self.commit()

    # test that the workflow is correctly enforced

    def _cleanup_msg(self, msg):
        """remove the variable part of one specific error message

        The second and the last whitespace-separated words of `msg` are
        dropped, remaining words are joined back with single spaces.
        """
        words = msg.split()
        del words[1]
        del words[-1]
        return ' '.join(words)

    def _check_transition_refused(self, iworkflowable, trname):
        """Fire transition `trname` and check it's refused with the
        "transition ... isn't allowed from ..." validation error.
        """
        with self.assertRaises(ValidationError) as cm:
            iworkflowable.fire_transition(trname)
        self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
                         u"transition isn't allowed from")

    def test_transition_checking1(self):
        """'activate' can't be fired by a user still in its initial state"""
        with self.new_access('stduser').repo_cnx() as cnx:
            adapter = cnx.user.cw_adapt_to('IWorkflowable')
            self._check_transition_refused(adapter, 'activate')

    def test_transition_checking2(self):
        """'dummy' can't be fired by a user still in its initial state"""
        with self.new_access('stduser').repo_cnx() as cnx:
            adapter = cnx.user.cw_adapt_to('IWorkflowable')
            self._check_transition_refused(adapter, 'dummy')

    def test_transition_checking3(self):
        """'deactivate' can't be fired twice in a row, 'activate' can be
        fired afterwards
        """
        with self.new_access('stduser').repo_cnx() as cnx:
            adapter = cnx.user.cw_adapt_to('IWorkflowable')
            adapter.fire_transition('deactivate')
            cnx.commit()
            self._check_transition_refused(adapter, 'deactivate')
            cnx.rollback()
            # get back now
            adapter.fire_transition('activate')
            cnx.commit()
|
701 |
|
702 |
|
if __name__ == '__main__':
    # executed as a script: run the test cases with logilab's test runner
    import logilab.common.testlib as testlib
    testlib.unittest_main()