1 from cubicweb.devtools.apptest import EnvBasedTC |
1 from cubicweb.devtools.apptest import EnvBasedTC |
2 from cubicweb import ValidationError |
2 from cubicweb import ValidationError |
3 |
3 |
|
def add_wf(self, etype, name=None):
    """Create a workflow attached to entity type `etype` and return it.

    :param self: object exposing ``execute(rql, args)`` (the test case)
    :param etype: name of the entity type the workflow is a workflow of
    :param name: name given to the workflow; when omitted, ``unicode(etype)``
      is used
    :return: the entity of the newly inserted workflow
    """
    if name is None:
        name = unicode(etype)
    rset = self.execute('INSERT Workflow X: X name %(n)s', {'n': name})
    wf = rset.get_entity(0, 0)
    # the relation is set on the eid of the workflow that was just inserted
    self.execute('SET WF workflow_of ET WHERE WF eid %(wf)s, ET name %(et)s',
                 {'wf': wf.eid, 'et': etype})
    return wf
|
11 |
|
def parse_hist(wfhist):
    """Flatten a workflow history into a list of comparable tuples.

    Each item of `wfhist` yields a tuple ``(previous state name, new state
    name, transition name, comment)``. When the item has no transition
    (falsy ``transition`` attribute), that falsy value is kept in place of
    the transition name.
    """
    return [(ti.previous_state.name, ti.new_state.name,
             ti.transition and ti.transition.name, ti.comment)
            for ti in wfhist]
|
16 |
|
17 |
4 class WorkflowBuildingTC(EnvBasedTC): |
18 class WorkflowBuildingTC(EnvBasedTC): |
5 |
19 |
6 def test_wf_construction(self): |
20 def test_wf_construction(self): |
7 wf = self.execute('INSERT Workflow X: X name "test"').get_entity(0, 0) |
21 wf = add_wf(self, 'Company') |
8 self.execute('SET WF workflow_of ET WHERE ET name "Company"') |
|
9 foo = wf.add_state(u'foo', initial=True) |
22 foo = wf.add_state(u'foo', initial=True) |
10 bar = wf.add_state(u'bar') |
23 bar = wf.add_state(u'bar') |
11 self.assertEquals(wf.state_by_name('bar').eid, bar.eid) |
24 self.assertEquals(wf.state_by_name('bar').eid, bar.eid) |
12 self.assertEquals(wf.state_by_name('barrr'), None) |
25 self.assertEquals(wf.state_by_name('barrr'), None) |
13 baz = wf.add_transition(u'baz', (foo,), bar, ('managers',)) |
26 baz = wf.add_transition(u'baz', (foo,), bar, ('managers',)) |
14 self.assertEquals(wf.transition_by_name('baz').eid, baz.eid) |
27 self.assertEquals(wf.transition_by_name('baz').eid, baz.eid) |
15 self.assertEquals(len(baz.require_group), 1) |
28 self.assertEquals(len(baz.require_group), 1) |
16 self.assertEquals(baz.require_group[0].name, 'managers') |
29 self.assertEquals(baz.require_group[0].name, 'managers') |
17 |
30 |
18 def test_duplicated_state(self): |
31 def test_duplicated_state(self): |
19 wf = self.execute('INSERT Workflow X: X name "test"').get_entity(0, 0) |
32 wf = add_wf(self, 'Company') |
20 self.execute('SET WF workflow_of ET WHERE ET name "Company"') |
|
21 wf.add_state(u'foo', initial=True) |
33 wf.add_state(u'foo', initial=True) |
22 wf.add_state(u'foo') |
34 wf.add_state(u'foo') |
23 ex = self.assertRaises(ValidationError, self.commit) |
35 ex = self.assertRaises(ValidationError, self.commit) |
24 # XXX enhance message |
36 # XXX enhance message |
25 self.assertEquals(ex.errors, {'state_of': 'unique constraint S name N, Y state_of O, Y name N failed'}) |
37 self.assertEquals(ex.errors, {'state_of': 'unique constraint S name N, Y state_of O, Y name N failed'}) |
26 |
38 |
27 def test_duplicated_transition(self): |
39 def test_duplicated_transition(self): |
28 wf = self.execute('INSERT Workflow X: X name "test"').get_entity(0, 0) |
40 wf = add_wf(self, 'Company') |
29 self.execute('SET WF workflow_of ET WHERE ET name "Company"') |
|
30 foo = wf.add_state(u'foo', initial=True) |
41 foo = wf.add_state(u'foo', initial=True) |
31 bar = wf.add_state(u'bar') |
42 bar = wf.add_state(u'bar') |
32 wf.add_transition(u'baz', (foo,), bar, ('managers',)) |
43 wf.add_transition(u'baz', (foo,), bar, ('managers',)) |
33 wf.add_transition(u'baz', (bar,), foo) |
44 wf.add_transition(u'baz', (bar,), foo) |
34 ex = self.assertRaises(ValidationError, self.commit) |
45 ex = self.assertRaises(ValidationError, self.commit) |
185 self.assertEquals(transitions[0].name, 'tr2') |
196 self.assertEquals(transitions[0].name, 'tr2') |
186 transitions = list(self.bookmark.possible_transitions()) |
197 transitions = list(self.bookmark.possible_transitions()) |
187 self.assertEquals(len(transitions), 1) |
198 self.assertEquals(len(transitions), 1) |
188 self.assertEquals(transitions[0].name, 'tr1') |
199 self.assertEquals(transitions[0].name, 'tr1') |
189 |
200 |
|
class CustomWorkflowTC(EnvBasedTC):
    """Check what happens to a user's state and workflow history when a
    custom workflow is set on, or removed from, that user.
    """

    def setup_database(self):
        # user on which the custom workflows are set in every test
        self.member = self.create_user('member')

    def tearDown(self):
        super(CustomWorkflowTC, self).tearDown()
        # drop every custom_workflow relation left behind by a test
        # NOTE(review): issued after the parent tearDown and without an
        # explicit commit here -- confirm this is the intended order
        self.execute('DELETE X custom_workflow WF')

    def _set_custom_workflow(self, wf):
        """Relate `self.member` to `wf` through `custom_workflow`.

        Nothing is committed here; callers commit when they need to.
        """
        self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                     {'wf': wf.eid, 'x': self.member.eid})

    def test_custom_wf_replace_state_no_history(self):
        """member in initial state with no previous history, state is simply
        redirected when changing workflow
        """
        wf = add_wf(self, 'CWUser')
        wf.add_state('asleep', initial=True)
        self._set_custom_workflow(wf)
        self.member.clear_all_caches()
        self.assertEquals(self.member.state, 'activated')# no change before commit
        self.commit()
        self.member.clear_all_caches()
        self.assertEquals(self.member.current_workflow.eid, wf.eid)
        self.assertEquals(self.member.state, 'asleep')
        self.assertEquals(self.member.workflow_history, [])

    def test_custom_wf_replace_state_keep_history(self):
        """member in initial state with some history, state is redirected and
        state change is recorded to history
        """
        self.member.fire_transition('deactivate')
        self.member.fire_transition('activate')
        wf = add_wf(self, 'CWUser')
        wf.add_state('asleep', initial=True)
        self._set_custom_workflow(wf)
        self.commit()
        self.member.clear_all_caches()
        self.assertEquals(self.member.current_workflow.eid, wf.eid)
        self.assertEquals(self.member.state, 'asleep')
        self.assertEquals(parse_hist(self.member.workflow_history),
                          [('activated', 'deactivated', 'deactivate', None),
                           ('deactivated', 'activated', 'activate', None),
                           ('activated', 'asleep', None, 'workflow changed to "CWUser"')])

    def test_custom_wf_shared_state(self):
        """member in some state shared by the new workflow, nothing has to be
        done
        """
        self.member.fire_transition('deactivate')
        self.assertEquals(self.member.state, 'deactivated')
        wf = add_wf(self, 'CWUser')
        wf.add_state('asleep', initial=True)
        # make the existing "deactivated" state part of the new workflow too
        self.execute('SET S state_of WF WHERE S name "deactivated", WF eid %(wf)s',
                     {'wf': wf.eid})
        self._set_custom_workflow(wf)
        self.commit()
        self.member.clear_all_caches()
        self.assertEquals(self.member.current_workflow.eid, wf.eid)
        self.assertEquals(self.member.state, 'deactivated')
        self.assertEquals(parse_hist(self.member.workflow_history),
                          [('activated', 'deactivated', 'deactivate', None)])

    def test_custom_wf_no_initial_state(self):
        """try to set a custom workflow which has no initial state"""
        self.member.fire_transition('deactivate')
        wf = add_wf(self, 'CWUser')
        wf.add_state('asleep')
        self._set_custom_workflow(wf)
        ex = self.assertRaises(ValidationError, self.commit)
        self.assertEquals(ex.errors, {'custom_workflow': u'workflow has no initial state'})

    def test_custom_wf_bad_etype(self):
        """try to set a custom workflow which is a workflow of another
        entity type
        """
        self.member.fire_transition('deactivate')
        wf = add_wf(self, 'Company')
        wf.add_state('asleep', initial=True)
        self._set_custom_workflow(wf)
        ex = self.assertRaises(ValidationError, self.commit)
        self.assertEquals(ex.errors, {'custom_workflow': 'constraint S is ET, O workflow_of ET failed'})

    def test_del_custom_wf(self):
        """removing the custom workflow puts the member back on "CWUser
        workflow" in state 'activated'; each workflow change is recorded in
        history
        """
        self.member.fire_transition('deactivate')
        wf = add_wf(self, 'CWUser')
        wf.add_state('asleep', initial=True)
        self._set_custom_workflow(wf)
        self.commit()
        self.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                     {'wf': wf.eid, 'x': self.member.eid})
        self.member.clear_all_caches()
        self.assertEquals(self.member.state, 'asleep')# no change before commit
        self.commit()
        self.member.clear_all_caches()
        self.assertEquals(self.member.current_workflow.name, "CWUser workflow")
        self.assertEquals(self.member.state, 'activated')
        self.assertEquals(parse_hist(self.member.workflow_history),
                          [('activated', 'deactivated', 'deactivate', None),
                           ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'),
                           ('asleep', 'activated', None, 'workflow changed to "CWUser workflow"'),])
|
306 |
190 |
307 |
191 from cubicweb.devtools.apptest import RepositoryBasedTC |
308 from cubicweb.devtools.apptest import RepositoryBasedTC |
192 |
309 |
193 class WorkflowHooksTC(RepositoryBasedTC): |
310 class WorkflowHooksTC(RepositoryBasedTC): |
194 |
311 |