|
1 from cubicweb.devtools.apptest import EnvBasedTC |
|
2 from cubicweb import ValidationError |
|
3 |
|
4 class WorkflowTC(EnvBasedTC): |
|
5 |
|
6 def setup_database(self): |
|
7 rschema = self.schema['in_state'] |
|
8 for x, y in rschema.iter_rdefs(): |
|
9 self.assertEquals(rschema.rproperty(x, y, 'cardinality'), '1*') |
|
10 self.member = self.create_user('member') |
|
11 |
|
12 def test_workflow_base(self): |
|
13 e = self.create_user('toto') |
|
14 self.assertEquals(e.state, 'activated') |
|
15 e.change_state('deactivated', u'deactivate 1') |
|
16 self.commit() |
|
17 e.change_state('activated', u'activate 1') |
|
18 self.commit() |
|
19 e.change_state('deactivated', u'deactivate 2') |
|
20 self.commit() |
|
21 e.clear_related_cache('wf_info_for', 'object') |
|
22 self.assertEquals([tr.comment for tr in e.reverse_wf_info_for], |
|
23 ['deactivate 1', 'activate 1', 'deactivate 2']) |
|
24 self.assertEquals(e.latest_trinfo().comment, 'deactivate 2') |
|
25 |
|
26 # def test_wf_construction(self): # XXX update or kill me |
|
27 # bar = self.mh.cmd_add_state(u'bar', ('Personne', 'Email'), initial=True) |
|
28 # baz = self.mh.cmd_add_transition(u'baz', ('Personne', 'Email'), |
|
29 # (foo,), bar, ('managers',)) |
|
30 # for etype in ('Personne', 'Email'): |
|
31 # t1 = self.mh.rqlexec('Any N WHERE T transition_of ET, ET name "%s", T name N' % |
|
32 # etype)[0][0] |
|
33 # self.assertEquals(t1, "baz") |
|
34 # gn = self.mh.rqlexec('Any GN WHERE T require_group G, G name GN, T eid %s' % baz)[0][0] |
|
35 # self.assertEquals(gn, 'managers') |
|
36 |
|
37 def test_possible_transitions(self): |
|
38 user = self.entity('CWUser X') |
|
39 trs = list(user.possible_transitions()) |
|
40 self.assertEquals(len(trs), 1) |
|
41 self.assertEquals(trs[0].name, u'deactivate') |
|
42 self.assertEquals(trs[0].destination().name, u'deactivated') |
|
43 # test a std user get no possible transition |
|
44 cnx = self.login('member') |
|
45 # fetch the entity using the new session |
|
46 trs = list(cnx.user().possible_transitions()) |
|
47 self.assertEquals(len(trs), 0) |
|
48 |
|
49 def _test_manager_deactivate(self, user): |
|
50 user.clear_related_cache('in_state', 'subject') |
|
51 self.assertEquals(len(user.in_state), 1) |
|
52 self.assertEquals(user.state, 'deactivated') |
|
53 trinfo = user.latest_trinfo() |
|
54 self.assertEquals(trinfo.previous_state.name, 'activated') |
|
55 self.assertEquals(trinfo.new_state.name, 'deactivated') |
|
56 self.assertEquals(trinfo.comment, 'deactivate user') |
|
57 self.assertEquals(trinfo.comment_format, 'text/plain') |
|
58 return trinfo |
|
59 |
|
60 def test_change_state(self): |
|
61 user = self.user() |
|
62 user.change_state('deactivated', comment=u'deactivate user') |
|
63 trinfo = self._test_manager_deactivate(user) |
|
64 self.assertEquals(trinfo.transition, None) |
|
65 |
|
66 def test_fire_transition(self): |
|
67 user = self.user() |
|
68 user.fire_transition('deactivate', comment=u'deactivate user') |
|
69 self.assertEquals(user.state, 'deactivated') |
|
70 self._test_manager_deactivate(user) |
|
71 trinfo = self._test_manager_deactivate(user) |
|
72 self.assertEquals(trinfo.transition.name, 'deactivate') |
|
73 |
|
74 # XXX test managers can change state without matching transition |
|
75 |
|
76 def _test_stduser_deactivate(self): |
|
77 ueid = self.member.eid |
|
78 self.create_user('tutu') |
|
79 cnx = self.login('tutu') |
|
80 req = self.request() |
|
81 member = req.entity_from_eid(self.member.eid) |
|
82 ex = self.assertRaises(ValidationError, |
|
83 member.fire_transition, 'deactivate') |
|
84 self.assertEquals(ex.errors, {'by_transition': "transition may not be fired"}) |
|
85 cnx.close() |
|
86 cnx = self.login('member') |
|
87 req = self.request() |
|
88 member = req.entity_from_eid(self.member.eid) |
|
89 member.fire_transition('deactivate') |
|
90 cnx.commit() |
|
91 ex = self.assertRaises(ValidationError, |
|
92 member.fire_transition, 'activate') |
|
93 self.assertEquals(ex.errors, {'by_transition': "transition may not be fired"}) |
|
94 |
|
95 def test_fire_transition_owned_by(self): |
|
96 self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", ' |
|
97 'X expression "X owned_by U", T condition X ' |
|
98 'WHERE T name "deactivate"') |
|
99 self._test_stduser_deactivate() |
|
100 |
|
101 def test_fire_transition_has_update_perm(self): |
|
102 self.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", ' |
|
103 'X expression "U has_update_permission X", T condition X ' |
|
104 'WHERE T name "deactivate"') |
|
105 self._test_stduser_deactivate() |
|
106 |
|
107 def _init_wf_with_shared_state_or_tr(self): |
|
108 req = self.request() |
|
109 etypes = dict(self.execute('Any N, ET WHERE ET is CWEType, ET name N' |
|
110 ', ET name IN ("CWGroup", "Bookmark")')) |
|
111 self.grpwf = req.create_entity('Workflow', ('workflow_of', 'ET'), |
|
112 ET=etypes['CWGroup'], name=u'group workflow') |
|
113 self.bmkwf = req.execute('Any X WHERE X is Workflow, X workflow_of ET, ET name "Bookmark"').get_entity(0, 0) |
|
114 self.state1 = self.grpwf.add_state(u'state1', initial=True) |
|
115 self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s', |
|
116 {'s': self.state1.eid, 'wf': self.bmkwf.eid}) |
|
117 self.execute('SET WF initial_state S WHERE S eid %(s)s, WF eid %(wf)s', |
|
118 {'s': self.state1.eid, 'wf': self.bmkwf.eid}) |
|
119 self.state2 = self.grpwf.add_state(u'state2') |
|
120 self.group = self.add_entity('CWGroup', name=u't1') |
|
121 self.bookmark = self.add_entity('Bookmark', title=u'111', path=u'/view') |
|
122 # commit to link to the initial state |
|
123 self.commit() |
|
124 |
|
125 def test_transitions_selection(self): |
|
126 """ |
|
127 ------------------------ tr1 ----------------- |
|
128 | state1 (CWGroup, Bookmark) | ------> | state2 (CWGroup) | |
|
129 ------------------------ ----------------- |
|
130 | tr2 ------------------ |
|
131 `------> | state3 (Bookmark) | |
|
132 ------------------ |
|
133 """ |
|
134 self._init_wf_with_shared_state_or_tr() |
|
135 state3 = self.bmkwf.add_state(u'state3') |
|
136 tr1 = self.grpwf.add_transition(u'tr1', (self.state1,), self.state2) |
|
137 tr2 = self.bmkwf.add_transition(u'tr2', (self.state1,), state3) |
|
138 transitions = list(self.group.possible_transitions()) |
|
139 self.assertEquals(len(transitions), 1) |
|
140 self.assertEquals(transitions[0].name, 'tr1') |
|
141 transitions = list(self.bookmark.possible_transitions()) |
|
142 self.assertEquals(len(transitions), 1) |
|
143 self.assertEquals(transitions[0].name, 'tr2') |
|
144 |
|
145 |
|
146 def test_transitions_selection2(self): |
|
147 """ |
|
148 ------------------------ tr1 (Bookmark) ----------------------- |
|
149 | state1 (CWGroup, Bookmark) | -------------> | state2 (CWGroup,Bookmark) | |
|
150 ------------------------ ----------------------- |
|
151 | tr2 (CWGroup) | |
|
152 `---------------------------------/ |
|
153 """ |
|
154 self._init_wf_with_shared_state_or_tr() |
|
155 self.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s', |
|
156 {'s': self.state2.eid, 'wf': self.bmkwf.eid}) |
|
157 tr1 = self.bmkwf.add_transition(u'tr1', (self.state1,), self.state2) |
|
158 tr2 = self.grpwf.add_transition(u'tr2', (self.state1,), self.state2) |
|
159 transitions = list(self.group.possible_transitions()) |
|
160 self.assertEquals(len(transitions), 1) |
|
161 self.assertEquals(transitions[0].name, 'tr2') |
|
162 transitions = list(self.bookmark.possible_transitions()) |
|
163 self.assertEquals(len(transitions), 1) |
|
164 self.assertEquals(transitions[0].name, 'tr1') |
|
165 |
|
166 |
|
167 from cubicweb.devtools.apptest import RepositoryBasedTC |
|
168 |
|
169 class WorkflowHooksTC(RepositoryBasedTC): |
|
170 |
|
171 def setUp(self): |
|
172 RepositoryBasedTC.setUp(self) |
|
173 self.wf = self.session.user.current_workflow |
|
174 self.s_activated = self.wf.state_by_name('activated').eid |
|
175 self.s_deactivated = self.wf.state_by_name('deactivated').eid |
|
176 self.s_dummy = self.wf.add_state(u'dummy').eid |
|
177 self.wf.add_transition(u'dummy', (self.s_deactivated,), self.s_dummy) |
|
178 ueid = self.create_user('stduser', commit=False) |
|
179 # test initial state is set |
|
180 rset = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s', |
|
181 {'x' : ueid}) |
|
182 self.failIf(rset, rset.rows) |
|
183 self.commit() |
|
184 initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s', |
|
185 {'x' : ueid})[0][0] |
|
186 self.assertEquals(initialstate, u'activated') |
|
187 # give access to users group on the user's wf transitions |
|
188 # so we can test wf enforcing on euser (managers don't have anymore this |
|
189 # enforcement |
|
190 self.execute('SET X require_group G ' |
|
191 'WHERE G name "users", X transition_of WF, WF eid %(wf)s', |
|
192 {'wf': self.wf.eid}) |
|
193 self.commit() |
|
194 |
|
195 def tearDown(self): |
|
196 self.execute('DELETE X require_group G ' |
|
197 'WHERE G name "users", X transition_of WF, WF eid %(wf)s', |
|
198 {'wf': self.wf.eid}) |
|
199 self.commit() |
|
200 RepositoryBasedTC.tearDown(self) |
|
201 |
|
202 # XXX currently, we've to rely on hooks to set initial state, or to use unsafe_execute |
|
203 # def test_initial_state(self): |
|
204 # cnx = self.login('stduser') |
|
205 # cu = cnx.cursor() |
|
206 # self.assertRaises(ValidationError, cu.execute, |
|
207 # 'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, ' |
|
208 # 'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'}) |
|
209 # cnx.close() |
|
210 # # though managers can do whatever he want |
|
211 # self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, ' |
|
212 # 'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'}) |
|
213 # self.commit() |
|
214 |
|
215 # test that the workflow is correctly enforced |
|
216 def test_transition_checking1(self): |
|
217 cnx = self.login('stduser') |
|
218 user = cnx.user(self.current_session()) |
|
219 ex = self.assertRaises(ValidationError, |
|
220 user.fire_transition, 'activate') |
|
221 self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"}) |
|
222 cnx.close() |
|
223 |
|
224 def test_transition_checking2(self): |
|
225 cnx = self.login('stduser') |
|
226 user = cnx.user(self.current_session()) |
|
227 assert user.state == 'activated' |
|
228 ex = self.assertRaises(ValidationError, |
|
229 user.fire_transition, 'dummy') |
|
230 self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"}) |
|
231 cnx.close() |
|
232 |
|
233 def test_transition_checking3(self): |
|
234 cnx = self.login('stduser') |
|
235 session = self.current_session() |
|
236 user = cnx.user(session) |
|
237 user.fire_transition('deactivate') |
|
238 cnx.commit() |
|
239 session.set_pool() |
|
240 ex = self.assertRaises(ValidationError, |
|
241 user.fire_transition, 'deactivate') |
|
242 self.assertEquals(ex.errors, {'by_transition': u"transition isn't allowed"}) |
|
243 # get back now |
|
244 user.fire_transition('activate') |
|
245 cnx.commit() |
|
246 cnx.close() |
|
247 |
|
248 |
|
249 if __name__ == '__main__': |
|
250 from logilab.common.testlib import unittest_main |
|
251 unittest_main() |