98 self.assertEquals(rdef.cardinality, '1*') |
98 self.assertEquals(rdef.cardinality, '1*') |
99 self.member = self.create_user('member') |
99 self.member = self.create_user('member') |
100 |
100 |
101 def test_workflow_base(self): |
101 def test_workflow_base(self): |
102 e = self.create_user('toto') |
102 e = self.create_user('toto') |
103 self.assertEquals(e.state, 'activated') |
103 iworkflowable = e.cw_adapt_to('IWorkflowable') |
104 e.change_state('deactivated', u'deactivate 1') |
104 self.assertEquals(iworkflowable.state, 'activated') |
105 self.commit() |
105 iworkflowable.change_state('deactivated', u'deactivate 1') |
106 e.change_state('activated', u'activate 1') |
106 self.commit() |
107 self.commit() |
107 iworkflowable.change_state('activated', u'activate 1') |
108 e.change_state('deactivated', u'deactivate 2') |
108 self.commit() |
109 self.commit() |
109 iworkflowable.change_state('deactivated', u'deactivate 2') |
110 e.clear_related_cache('wf_info_for', 'object') |
110 self.commit() |
|
111 e.cw_clear_relation_cache('wf_info_for', 'object') |
111 self.assertEquals([tr.comment for tr in e.reverse_wf_info_for], |
112 self.assertEquals([tr.comment for tr in e.reverse_wf_info_for], |
112 ['deactivate 1', 'activate 1', 'deactivate 2']) |
113 ['deactivate 1', 'activate 1', 'deactivate 2']) |
113 self.assertEquals(e.latest_trinfo().comment, 'deactivate 2') |
114 self.assertEquals(iworkflowable.latest_trinfo().comment, 'deactivate 2') |
114 |
115 |
115 def test_possible_transitions(self): |
116 def test_possible_transitions(self): |
116 user = self.execute('CWUser X').get_entity(0, 0) |
117 user = self.execute('CWUser X').get_entity(0, 0) |
117 trs = list(user.possible_transitions()) |
118 iworkflowable = user.cw_adapt_to('IWorkflowable') |
|
119 trs = list(iworkflowable.possible_transitions()) |
118 self.assertEquals(len(trs), 1) |
120 self.assertEquals(len(trs), 1) |
119 self.assertEquals(trs[0].name, u'deactivate') |
121 self.assertEquals(trs[0].name, u'deactivate') |
120 self.assertEquals(trs[0].destination(None).name, u'deactivated') |
122 self.assertEquals(trs[0].destination(None).name, u'deactivated') |
121 # test a std user get no possible transition |
123 # test a std user get no possible transition |
122 cnx = self.login('member') |
124 cnx = self.login('member') |
123 # fetch the entity using the new session |
125 # fetch the entity using the new session |
124 trs = list(cnx.user().possible_transitions()) |
126 trs = list(cnx.user().cw_adapt_to('IWorkflowable').possible_transitions()) |
125 self.assertEquals(len(trs), 0) |
127 self.assertEquals(len(trs), 0) |
126 |
128 |
127 def _test_manager_deactivate(self, user): |
129 def _test_manager_deactivate(self, user): |
128 user.clear_related_cache('in_state', 'subject') |
130 iworkflowable = user.cw_adapt_to('IWorkflowable') |
|
131 user.cw_clear_relation_cache('in_state', 'subject') |
129 self.assertEquals(len(user.in_state), 1) |
132 self.assertEquals(len(user.in_state), 1) |
130 self.assertEquals(user.state, 'deactivated') |
133 self.assertEquals(iworkflowable.state, 'deactivated') |
131 trinfo = user.latest_trinfo() |
134 trinfo = iworkflowable.latest_trinfo() |
132 self.assertEquals(trinfo.previous_state.name, 'activated') |
135 self.assertEquals(trinfo.previous_state.name, 'activated') |
133 self.assertEquals(trinfo.new_state.name, 'deactivated') |
136 self.assertEquals(trinfo.new_state.name, 'deactivated') |
134 self.assertEquals(trinfo.comment, 'deactivate user') |
137 self.assertEquals(trinfo.comment, 'deactivate user') |
135 self.assertEquals(trinfo.comment_format, 'text/plain') |
138 self.assertEquals(trinfo.comment_format, 'text/plain') |
136 return trinfo |
139 return trinfo |
137 |
140 |
138 def test_change_state(self): |
141 def test_change_state(self): |
139 user = self.user() |
142 user = self.user() |
140 user.change_state('deactivated', comment=u'deactivate user') |
143 iworkflowable = user.cw_adapt_to('IWorkflowable') |
|
144 iworkflowable.change_state('deactivated', comment=u'deactivate user') |
141 trinfo = self._test_manager_deactivate(user) |
145 trinfo = self._test_manager_deactivate(user) |
142 self.assertEquals(trinfo.transition, None) |
146 self.assertEquals(trinfo.transition, None) |
143 |
147 |
144 def test_set_in_state_bad_wf(self): |
148 def test_set_in_state_bad_wf(self): |
145 wf = add_wf(self, 'CWUser') |
149 wf = add_wf(self, 'CWUser') |
152 self.assertEquals(ex.errors, {'in_state-subject': "state doesn't belong to entity's workflow. " |
156 self.assertEquals(ex.errors, {'in_state-subject': "state doesn't belong to entity's workflow. " |
153 "You may want to set a custom workflow for this entity first."}) |
157 "You may want to set a custom workflow for this entity first."}) |
154 |
158 |
155 def test_fire_transition(self): |
159 def test_fire_transition(self): |
156 user = self.user() |
160 user = self.user() |
157 user.fire_transition('deactivate', comment=u'deactivate user') |
161 iworkflowable = user.cw_adapt_to('IWorkflowable') |
|
162 iworkflowable.fire_transition('deactivate', comment=u'deactivate user') |
158 user.clear_all_caches() |
163 user.clear_all_caches() |
159 self.assertEquals(user.state, 'deactivated') |
164 self.assertEquals(iworkflowable.state, 'deactivated') |
160 self._test_manager_deactivate(user) |
165 self._test_manager_deactivate(user) |
161 trinfo = self._test_manager_deactivate(user) |
166 trinfo = self._test_manager_deactivate(user) |
162 self.assertEquals(trinfo.transition.name, 'deactivate') |
167 self.assertEquals(trinfo.transition.name, 'deactivate') |
163 |
168 |
164 def test_goback_transition(self): |
169 def test_goback_transition(self): |
165 wf = self.session.user.current_workflow |
170 wf = self.session.user.cw_adapt_to('IWorkflowable').current_workflow |
166 asleep = wf.add_state('asleep') |
171 asleep = wf.add_state('asleep') |
167 wf.add_transition('rest', (wf.state_by_name('activated'), wf.state_by_name('deactivated')), |
172 wf.add_transition('rest', (wf.state_by_name('activated'), |
168 asleep) |
173 wf.state_by_name('deactivated')), |
|
174 asleep) |
169 wf.add_transition('wake up', asleep) |
175 wf.add_transition('wake up', asleep) |
170 user = self.create_user('stduser') |
176 user = self.create_user('stduser') |
171 user.fire_transition('rest') |
177 iworkflowable = user.cw_adapt_to('IWorkflowable') |
172 self.commit() |
178 iworkflowable.fire_transition('rest') |
173 user.fire_transition('wake up') |
179 self.commit() |
174 self.commit() |
180 iworkflowable.fire_transition('wake up') |
175 self.assertEquals(user.state, 'activated') |
181 self.commit() |
176 user.fire_transition('deactivate') |
182 self.assertEquals(iworkflowable.state, 'activated') |
177 self.commit() |
183 iworkflowable.fire_transition('deactivate') |
178 user.fire_transition('rest') |
184 self.commit() |
179 self.commit() |
185 iworkflowable.fire_transition('rest') |
180 user.fire_transition('wake up') |
186 self.commit() |
|
187 iworkflowable.fire_transition('wake up') |
181 self.commit() |
188 self.commit() |
182 user.clear_all_caches() |
189 user.clear_all_caches() |
183 self.assertEquals(user.state, 'deactivated') |
190 self.assertEquals(iworkflowable.state, 'deactivated') |
184 |
191 |
185 # XXX test managers can change state without matching transition |
192 # XXX test managers can change state without matching transition |
186 |
193 |
187 def _test_stduser_deactivate(self): |
194 def _test_stduser_deactivate(self): |
188 ueid = self.member.eid |
195 ueid = self.member.eid |
189 self.create_user('tutu') |
196 self.create_user('tutu') |
190 cnx = self.login('tutu') |
197 cnx = self.login('tutu') |
191 req = self.request() |
198 req = self.request() |
192 member = req.entity_from_eid(self.member.eid) |
199 iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable') |
193 ex = self.assertRaises(ValidationError, |
200 ex = self.assertRaises(ValidationError, |
194 member.fire_transition, 'deactivate') |
201 iworkflowable.fire_transition, 'deactivate') |
195 self.assertEquals(ex.errors, {'by_transition-subject': "transition may not be fired"}) |
202 self.assertEquals(ex.errors, {'by_transition-subject': "transition may not be fired"}) |
196 cnx.close() |
203 cnx.close() |
197 cnx = self.login('member') |
204 cnx = self.login('member') |
198 req = self.request() |
205 req = self.request() |
199 member = req.entity_from_eid(self.member.eid) |
206 iworkflowable = req.entity_from_eid(self.member.eid).cw_adapt_to('IWorkflowable') |
200 member.fire_transition('deactivate') |
207 iworkflowable.fire_transition('deactivate') |
201 cnx.commit() |
208 cnx.commit() |
202 ex = self.assertRaises(ValidationError, |
209 ex = self.assertRaises(ValidationError, |
203 member.fire_transition, 'activate') |
210 iworkflowable.fire_transition, 'activate') |
204 self.assertEquals(ex.errors, {'by_transition-subject': "transition may not be fired"}) |
211 self.assertEquals(ex.errors, {'by_transition-subject': "transition may not be fired"}) |
205 |
212 |
206 def test_fire_transition_owned_by(self): |
213 def test_fire_transition_owned_by(self): |
207 self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", ' |
214 self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", ' |
208 'X expression "X owned_by U", T condition X ' |
215 'X expression "X owned_by U", T condition X ' |
248 state3 = mwf.add_state(u'state3') |
255 state3 = mwf.add_state(u'state3') |
249 swftr1 = mwf.add_wftransition(u'swftr1', swf, state1, |
256 swftr1 = mwf.add_wftransition(u'swftr1', swf, state1, |
250 [(swfstate2, state2), (swfstate3, state3)]) |
257 [(swfstate2, state2), (swfstate3, state3)]) |
251 self.assertEquals(swftr1.destination(None).eid, swfstate1.eid) |
258 self.assertEquals(swftr1.destination(None).eid, swfstate1.eid) |
252 # workflows built, begin test |
259 # workflows built, begin test |
253 self.group = self.request().create_entity('CWGroup', name=u'grp1') |
260 group = self.request().create_entity('CWGroup', name=u'grp1') |
254 self.commit() |
261 self.commit() |
255 self.assertEquals(self.group.current_state.eid, state1.eid) |
262 iworkflowable = group.cw_adapt_to('IWorkflowable') |
256 self.assertEquals(self.group.current_workflow.eid, mwf.eid) |
263 self.assertEquals(iworkflowable.current_state.eid, state1.eid) |
257 self.assertEquals(self.group.main_workflow.eid, mwf.eid) |
264 self.assertEquals(iworkflowable.current_workflow.eid, mwf.eid) |
258 self.assertEquals(self.group.subworkflow_input_transition(), None) |
265 self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid) |
259 self.group.fire_transition('swftr1', u'go') |
266 self.assertEquals(iworkflowable.subworkflow_input_transition(), None) |
260 self.commit() |
267 iworkflowable.fire_transition('swftr1', u'go') |
261 self.group.clear_all_caches() |
268 self.commit() |
262 self.assertEquals(self.group.current_state.eid, swfstate1.eid) |
269 group.clear_all_caches() |
263 self.assertEquals(self.group.current_workflow.eid, swf.eid) |
270 self.assertEquals(iworkflowable.current_state.eid, swfstate1.eid) |
264 self.assertEquals(self.group.main_workflow.eid, mwf.eid) |
271 self.assertEquals(iworkflowable.current_workflow.eid, swf.eid) |
265 self.assertEquals(self.group.subworkflow_input_transition().eid, swftr1.eid) |
272 self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid) |
266 self.group.fire_transition('tr1', u'go') |
273 self.assertEquals(iworkflowable.subworkflow_input_transition().eid, swftr1.eid) |
267 self.commit() |
274 iworkflowable.fire_transition('tr1', u'go') |
268 self.group.clear_all_caches() |
275 self.commit() |
269 self.assertEquals(self.group.current_state.eid, state2.eid) |
276 group.clear_all_caches() |
270 self.assertEquals(self.group.current_workflow.eid, mwf.eid) |
277 self.assertEquals(iworkflowable.current_state.eid, state2.eid) |
271 self.assertEquals(self.group.main_workflow.eid, mwf.eid) |
278 self.assertEquals(iworkflowable.current_workflow.eid, mwf.eid) |
272 self.assertEquals(self.group.subworkflow_input_transition(), None) |
279 self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid) |
|
280 self.assertEquals(iworkflowable.subworkflow_input_transition(), None) |
273 # force back to swfstate1 is impossible since we can't any more find |
281 # force back to swfstate1 is impossible since we can't any more find |
274 # subworkflow input transition |
282 # subworkflow input transition |
275 ex = self.assertRaises(ValidationError, |
283 ex = self.assertRaises(ValidationError, |
276 self.group.change_state, swfstate1, u'gadget') |
284 iworkflowable.change_state, swfstate1, u'gadget') |
277 self.assertEquals(ex.errors, {'to_state-subject': "state doesn't belong to entity's workflow"}) |
285 self.assertEquals(ex.errors, {'to_state-subject': "state doesn't belong to entity's workflow"}) |
278 self.rollback() |
286 self.rollback() |
279 # force back to state1 |
287 # force back to state1 |
280 self.group.change_state('state1', u'gadget') |
288 iworkflowable.change_state('state1', u'gadget') |
281 self.group.fire_transition('swftr1', u'au') |
289 iworkflowable.fire_transition('swftr1', u'au') |
282 self.group.clear_all_caches() |
290 group.clear_all_caches() |
283 self.group.fire_transition('tr2', u'chapeau') |
291 iworkflowable.fire_transition('tr2', u'chapeau') |
284 self.commit() |
292 self.commit() |
285 self.group.clear_all_caches() |
293 group.clear_all_caches() |
286 self.assertEquals(self.group.current_state.eid, state3.eid) |
294 self.assertEquals(iworkflowable.current_state.eid, state3.eid) |
287 self.assertEquals(self.group.current_workflow.eid, mwf.eid) |
295 self.assertEquals(iworkflowable.current_workflow.eid, mwf.eid) |
288 self.assertEquals(self.group.main_workflow.eid, mwf.eid) |
296 self.assertEquals(iworkflowable.main_workflow.eid, mwf.eid) |
289 self.assertListEquals(parse_hist(self.group.workflow_history), |
297 self.assertListEquals(parse_hist(iworkflowable.workflow_history), |
290 [('state1', 'swfstate1', 'swftr1', 'go'), |
298 [('state1', 'swfstate1', 'swftr1', 'go'), |
291 ('swfstate1', 'swfstate2', 'tr1', 'go'), |
299 ('swfstate1', 'swfstate2', 'tr1', 'go'), |
292 ('swfstate2', 'state2', 'swftr1', 'exiting from subworkflow subworkflow'), |
300 ('swfstate2', 'state2', 'swftr1', 'exiting from subworkflow subworkflow'), |
293 ('state2', 'state1', None, 'gadget'), |
301 ('state2', 'state1', None, 'gadget'), |
294 ('state1', 'swfstate1', 'swftr1', 'au'), |
302 ('state1', 'swfstate1', 'swftr1', 'au'), |
360 twf.add_wftransition(_('release'), subwf, identified, |
369 twf.add_wftransition(_('release'), subwf, identified, |
361 [(xaborted, None)]) |
370 [(xaborted, None)]) |
362 self.commit() |
371 self.commit() |
363 group = self.request().create_entity('CWGroup', name=u'grp1') |
372 group = self.request().create_entity('CWGroup', name=u'grp1') |
364 self.commit() |
373 self.commit() |
|
374 iworkflowable = group.cw_adapt_to('IWorkflowable') |
365 for trans, nextstate in (('identify', 'xsigning'), |
375 for trans, nextstate in (('identify', 'xsigning'), |
366 ('xabort', 'created'), |
376 ('xabort', 'created'), |
367 ('identify', 'xsigning'), |
377 ('identify', 'xsigning'), |
368 ('xsign', 'identified'), |
378 ('xsign', 'identified'), |
369 ('release', 'xsigning'), |
379 ('release', 'xsigning'), |
370 ('xabort', 'identified') |
380 ('xabort', 'identified') |
371 ): |
381 ): |
372 group.fire_transition(trans) |
382 iworkflowable.fire_transition(trans) |
373 self.commit() |
383 self.commit() |
374 group.clear_all_caches() |
384 group.clear_all_caches() |
375 self.assertEquals(group.state, nextstate) |
385 self.assertEquals(iworkflowable.state, nextstate) |
376 |
386 |
377 |
387 |
378 class CustomWorkflowTC(CubicWebTC): |
388 class CustomWorkflowTC(CubicWebTC): |
379 |
389 |
380 def setup_database(self): |
390 def setup_database(self): |
387 wf = add_wf(self, 'CWUser') |
397 wf = add_wf(self, 'CWUser') |
388 wf.add_state('asleep', initial=True) |
398 wf.add_state('asleep', initial=True) |
389 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
399 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
390 {'wf': wf.eid, 'x': self.member.eid}) |
400 {'wf': wf.eid, 'x': self.member.eid}) |
391 self.member.clear_all_caches() |
401 self.member.clear_all_caches() |
392 self.assertEquals(self.member.state, 'activated')# no change before commit |
402 iworkflowable = self.member.cw_adapt_to('IWorkflowable') |
|
403 self.assertEquals(iworkflowable.state, 'activated')# no change before commit |
393 self.commit() |
404 self.commit() |
394 self.member.clear_all_caches() |
405 self.member.clear_all_caches() |
395 self.assertEquals(self.member.current_workflow.eid, wf.eid) |
406 self.assertEquals(iworkflowable.current_workflow.eid, wf.eid) |
396 self.assertEquals(self.member.state, 'asleep') |
407 self.assertEquals(iworkflowable.state, 'asleep') |
397 self.assertEquals(self.member.workflow_history, ()) |
408 self.assertEquals(iworkflowable.workflow_history, ()) |
398 |
409 |
399 def test_custom_wf_replace_state_keep_history(self): |
410 def test_custom_wf_replace_state_keep_history(self): |
400 """member in inital state with some history, state is redirected and |
411 """member in inital state with some history, state is redirected and |
401 state change is recorded to history |
412 state change is recorded to history |
402 """ |
413 """ |
403 self.member.fire_transition('deactivate') |
414 iworkflowable = self.member.cw_adapt_to('IWorkflowable') |
404 self.member.fire_transition('activate') |
415 iworkflowable.fire_transition('deactivate') |
|
416 iworkflowable.fire_transition('activate') |
405 wf = add_wf(self, 'CWUser') |
417 wf = add_wf(self, 'CWUser') |
406 wf.add_state('asleep', initial=True) |
418 wf.add_state('asleep', initial=True) |
407 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
419 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
408 {'wf': wf.eid, 'x': self.member.eid}) |
420 {'wf': wf.eid, 'x': self.member.eid}) |
409 self.commit() |
421 self.commit() |
410 self.member.clear_all_caches() |
422 self.member.clear_all_caches() |
411 self.assertEquals(self.member.current_workflow.eid, wf.eid) |
423 self.assertEquals(iworkflowable.current_workflow.eid, wf.eid) |
412 self.assertEquals(self.member.state, 'asleep') |
424 self.assertEquals(iworkflowable.state, 'asleep') |
413 self.assertEquals(parse_hist(self.member.workflow_history), |
425 self.assertEquals(parse_hist(iworkflowable.workflow_history), |
414 [('activated', 'deactivated', 'deactivate', None), |
426 [('activated', 'deactivated', 'deactivate', None), |
415 ('deactivated', 'activated', 'activate', None), |
427 ('deactivated', 'activated', 'activate', None), |
416 ('activated', 'asleep', None, 'workflow changed to "CWUser"')]) |
428 ('activated', 'asleep', None, 'workflow changed to "CWUser"')]) |
417 |
429 |
418 def test_custom_wf_no_initial_state(self): |
430 def test_custom_wf_no_initial_state(self): |
419 """try to set a custom workflow which has no initial state""" |
431 """try to set a custom workflow which has no initial state""" |
420 self.member.fire_transition('deactivate') |
432 iworkflowable = self.member.cw_adapt_to('IWorkflowable') |
|
433 iworkflowable.fire_transition('deactivate') |
421 wf = add_wf(self, 'CWUser') |
434 wf = add_wf(self, 'CWUser') |
422 wf.add_state('asleep') |
435 wf.add_state('asleep') |
423 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
436 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
424 {'wf': wf.eid, 'x': self.member.eid}) |
437 {'wf': wf.eid, 'x': self.member.eid}) |
425 ex = self.assertRaises(ValidationError, self.commit) |
438 ex = self.assertRaises(ValidationError, self.commit) |
436 |
449 |
437 def test_del_custom_wf(self): |
450 def test_del_custom_wf(self): |
438 """member in some state shared by the new workflow, nothing has to be |
451 """member in some state shared by the new workflow, nothing has to be |
439 done |
452 done |
440 """ |
453 """ |
441 self.member.fire_transition('deactivate') |
454 iworkflowable = self.member.cw_adapt_to('IWorkflowable') |
|
455 iworkflowable.fire_transition('deactivate') |
442 wf = add_wf(self, 'CWUser') |
456 wf = add_wf(self, 'CWUser') |
443 wf.add_state('asleep', initial=True) |
457 wf.add_state('asleep', initial=True) |
444 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
458 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
445 {'wf': wf.eid, 'x': self.member.eid}) |
459 {'wf': wf.eid, 'x': self.member.eid}) |
446 self.commit() |
460 self.commit() |
447 self.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
461 self.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
448 {'wf': wf.eid, 'x': self.member.eid}) |
462 {'wf': wf.eid, 'x': self.member.eid}) |
449 self.member.clear_all_caches() |
463 self.member.clear_all_caches() |
450 self.assertEquals(self.member.state, 'asleep')# no change before commit |
464 self.assertEquals(iworkflowable.state, 'asleep')# no change before commit |
451 self.commit() |
465 self.commit() |
452 self.member.clear_all_caches() |
466 self.member.clear_all_caches() |
453 self.assertEquals(self.member.current_workflow.name, "default user workflow") |
467 self.assertEquals(iworkflowable.current_workflow.name, "default user workflow") |
454 self.assertEquals(self.member.state, 'activated') |
468 self.assertEquals(iworkflowable.state, 'activated') |
455 self.assertEquals(parse_hist(self.member.workflow_history), |
469 self.assertEquals(parse_hist(iworkflowable.workflow_history), |
456 [('activated', 'deactivated', 'deactivate', None), |
470 [('activated', 'deactivated', 'deactivate', None), |
457 ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'), |
471 ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'), |
458 ('asleep', 'activated', None, 'workflow changed to "default user workflow"'),]) |
472 ('asleep', 'activated', None, 'workflow changed to "default user workflow"'),]) |
459 |
473 |
460 |
474 |
471 return wf |
485 return wf |
472 |
486 |
473 def test_auto_transition_fired(self): |
487 def test_auto_transition_fired(self): |
474 wf = self.setup_custom_wf() |
488 wf = self.setup_custom_wf() |
475 user = self.create_user('member') |
489 user = self.create_user('member') |
|
490 iworkflowable = user.cw_adapt_to('IWorkflowable') |
476 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
491 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
477 {'wf': wf.eid, 'x': user.eid}) |
492 {'wf': wf.eid, 'x': user.eid}) |
478 self.commit() |
493 self.commit() |
479 user.clear_all_caches() |
494 user.clear_all_caches() |
480 self.assertEquals(user.state, 'asleep') |
495 self.assertEquals(iworkflowable.state, 'asleep') |
481 self.assertEquals([t.name for t in user.possible_transitions()], |
496 self.assertEquals([t.name for t in iworkflowable.possible_transitions()], |
482 ['rest']) |
497 ['rest']) |
483 user.fire_transition('rest') |
498 iworkflowable.fire_transition('rest') |
484 self.commit() |
499 self.commit() |
485 user.clear_all_caches() |
500 user.clear_all_caches() |
486 self.assertEquals(user.state, 'asleep') |
501 self.assertEquals(iworkflowable.state, 'asleep') |
487 self.assertEquals([t.name for t in user.possible_transitions()], |
502 self.assertEquals([t.name for t in iworkflowable.possible_transitions()], |
488 ['rest']) |
503 ['rest']) |
489 self.assertEquals(parse_hist(user.workflow_history), |
504 self.assertEquals(parse_hist(iworkflowable.workflow_history), |
490 [('asleep', 'asleep', 'rest', None)]) |
505 [('asleep', 'asleep', 'rest', None)]) |
491 user.set_attributes(surname=u'toto') # fulfill condition |
506 user.set_attributes(surname=u'toto') # fulfill condition |
492 self.commit() |
507 self.commit() |
493 user.fire_transition('rest') |
508 iworkflowable.fire_transition('rest') |
494 self.commit() |
509 self.commit() |
495 user.clear_all_caches() |
510 user.clear_all_caches() |
496 self.assertEquals(user.state, 'dead') |
511 self.assertEquals(iworkflowable.state, 'dead') |
497 self.assertEquals(parse_hist(user.workflow_history), |
512 self.assertEquals(parse_hist(iworkflowable.workflow_history), |
498 [('asleep', 'asleep', 'rest', None), |
513 [('asleep', 'asleep', 'rest', None), |
499 ('asleep', 'asleep', 'rest', None), |
514 ('asleep', 'asleep', 'rest', None), |
500 ('asleep', 'dead', 'sick', None),]) |
515 ('asleep', 'dead', 'sick', None),]) |
501 |
516 |
502 def test_auto_transition_custom_initial_state_fired(self): |
517 def test_auto_transition_custom_initial_state_fired(self): |
503 wf = self.setup_custom_wf() |
518 wf = self.setup_custom_wf() |
504 user = self.create_user('member', surname=u'toto') |
519 user = self.create_user('member', surname=u'toto') |
505 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
520 self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s', |
506 {'wf': wf.eid, 'x': user.eid}) |
521 {'wf': wf.eid, 'x': user.eid}) |
507 self.commit() |
522 self.commit() |
508 self.assertEquals(user.state, 'dead') |
523 iworkflowable = user.cw_adapt_to('IWorkflowable') |
|
524 self.assertEquals(iworkflowable.state, 'dead') |
509 |
525 |
510 def test_auto_transition_initial_state_fired(self): |
526 def test_auto_transition_initial_state_fired(self): |
511 wf = self.execute('Any WF WHERE ET default_workflow WF, ' |
527 wf = self.execute('Any WF WHERE ET default_workflow WF, ' |
512 'ET name %(et)s', {'et': 'CWUser'}).get_entity(0, 0) |
528 'ET name %(et)s', {'et': 'CWUser'}).get_entity(0, 0) |
513 dead = wf.add_state('dead') |
529 dead = wf.add_state('dead') |
515 type=u'auto', conditions=({'expr': u'X surname "toto"', |
531 type=u'auto', conditions=({'expr': u'X surname "toto"', |
516 'mainvars': u'X'},)) |
532 'mainvars': u'X'},)) |
517 self.commit() |
533 self.commit() |
518 user = self.create_user('member', surname=u'toto') |
534 user = self.create_user('member', surname=u'toto') |
519 self.commit() |
535 self.commit() |
520 self.assertEquals(user.state, 'dead') |
536 iworkflowable = user.cw_adapt_to('IWorkflowable') |
|
537 self.assertEquals(iworkflowable.state, 'dead') |
521 |
538 |
522 |
539 |
523 class WorkflowHooksTC(CubicWebTC): |
540 class WorkflowHooksTC(CubicWebTC): |
524 |
541 |
525 def setUp(self): |
542 def setUp(self): |
526 CubicWebTC.setUp(self) |
543 CubicWebTC.setUp(self) |
527 self.wf = self.session.user.current_workflow |
544 self.wf = self.session.user.cw_adapt_to('IWorkflowable').current_workflow |
528 self.session.set_pool() |
545 self.session.set_pool() |
529 self.s_activated = self.wf.state_by_name('activated').eid |
546 self.s_activated = self.wf.state_by_name('activated').eid |
530 self.s_deactivated = self.wf.state_by_name('deactivated').eid |
547 self.s_deactivated = self.wf.state_by_name('deactivated').eid |
531 self.s_dummy = self.wf.add_state(u'dummy').eid |
548 self.s_dummy = self.wf.add_state(u'dummy').eid |
532 self.wf.add_transition(u'dummy', (self.s_deactivated,), self.s_dummy) |
549 self.wf.add_transition(u'dummy', (self.s_deactivated,), self.s_dummy) |
570 return ' '.join(lmsg) |
587 return ' '.join(lmsg) |
571 |
588 |
572 def test_transition_checking1(self): |
589 def test_transition_checking1(self): |
573 cnx = self.login('stduser') |
590 cnx = self.login('stduser') |
574 user = cnx.user(self.session) |
591 user = cnx.user(self.session) |
|
592 iworkflowable = user.cw_adapt_to('IWorkflowable') |
575 ex = self.assertRaises(ValidationError, |
593 ex = self.assertRaises(ValidationError, |
576 user.fire_transition, 'activate') |
594 iworkflowable.fire_transition, 'activate') |
577 self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']), |
595 self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']), |
578 u"transition isn't allowed from") |
596 u"transition isn't allowed from") |
579 cnx.close() |
597 cnx.close() |
580 |
598 |
581 def test_transition_checking2(self): |
599 def test_transition_checking2(self): |
582 cnx = self.login('stduser') |
600 cnx = self.login('stduser') |
583 user = cnx.user(self.session) |
601 user = cnx.user(self.session) |
|
602 iworkflowable = user.cw_adapt_to('IWorkflowable') |
584 ex = self.assertRaises(ValidationError, |
603 ex = self.assertRaises(ValidationError, |
585 user.fire_transition, 'dummy') |
604 iworkflowable.fire_transition, 'dummy') |
586 self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']), |
605 self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']), |
587 u"transition isn't allowed from") |
606 u"transition isn't allowed from") |
588 cnx.close() |
607 cnx.close() |
589 |
608 |
590 def test_transition_checking3(self): |
609 def test_transition_checking3(self): |
591 cnx = self.login('stduser') |
610 cnx = self.login('stduser') |
592 session = self.session |
611 session = self.session |
593 user = cnx.user(session) |
612 user = cnx.user(session) |
594 user.fire_transition('deactivate') |
613 iworkflowable = user.cw_adapt_to('IWorkflowable') |
|
614 iworkflowable.fire_transition('deactivate') |
595 cnx.commit() |
615 cnx.commit() |
596 session.set_pool() |
616 session.set_pool() |
597 ex = self.assertRaises(ValidationError, |
617 ex = self.assertRaises(ValidationError, |
598 user.fire_transition, 'deactivate') |
618 iworkflowable.fire_transition, 'deactivate') |
599 self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']), |
619 self.assertEquals(self._cleanup_msg(ex.errors['by_transition-subject']), |
600 u"transition isn't allowed from") |
620 u"transition isn't allowed from") |
601 # get back now |
621 # get back now |
602 user.fire_transition('activate') |
622 iworkflowable.fire_transition('activate') |
603 cnx.commit() |
623 cnx.commit() |
604 cnx.close() |
624 cnx.close() |
605 |
625 |
606 |
626 |
607 if __name__ == '__main__': |
627 if __name__ == '__main__': |