5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
6 :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses |
6 :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses |
7 """ |
7 """ |
8 __docformat__ = "restructuredtext en" |
8 __docformat__ = "restructuredtext en" |
9 |
9 |
|
10 from warnings import warn |
|
11 |
|
12 from logilab.common.decorators import cached |
|
13 from logilab.common.deprecation import deprecated |
|
14 |
10 from cubicweb.entities import AnyEntity, fetch_config |
15 from cubicweb.entities import AnyEntity, fetch_config |
11 |
16 from cubicweb.interfaces import IWorkflowable |
12 |
17 from cubicweb.common.mixins import MI_REL_TRIGGERS |
13 class Transition(AnyEntity): |
18 |
14 """customized class for Transition entities |
19 |
15 |
20 class Workflow(AnyEntity): |
16 provides a specific may_be_passed method to check if the relation may be |
21 id = 'Workflow' |
17 passed by the logged user |
22 |
|
@property
def initial(self):
    """initial state of this workflow

    Evaluates to the first entity found through the `initial_state`
    relation. When that relation is empty, the (false) relation value
    itself is returned, so callers may simply test the result for truth.
    """
    states = self.initial_state
    if not states:
        return states
    return states[0]
|
27 |
|
def is_default_workflow_of(self, etype):
    """return True if this workflow is the default workflow for the given
    entity type

    `etype` is compared with the `name` of every entity reachable through
    the `default_workflow_of` relation; False is returned when none matches
    (including when the relation is empty).
    """
    # evaluate the predicate itself: the former
    # `any(et for et in ... if et.name == etype)` form yielded the matching
    # object, so the answer depended on that object's truth value
    return any(et.name == etype for et in self.default_workflow_of)
|
33 |
|
def after_deletion_path(self):
    """return the (path, parameters) pair to redirect to once this workflow
    has been deleted

    When the workflow is attached to at least one entity type (through
    `workflow_of`), redirect to the first one's rest path using the
    'workflow' view; otherwise defer to the parent class.
    """
    etypes = self.workflow_of
    if not etypes:
        return super(Workflow, self).after_deletion_path()
    return etypes[0].rest_path(), {'vid': 'workflow'}
|
41 |
|
42 # state / transitions accessors ############################################ |
|
43 |
|
def state_by_name(self, statename):
    """return the state entity named `statename` among the states of this
    workflow, or None when the query gives no result
    """
    rql = ('Any S, SN WHERE S name SN, S name %(n)s, '
           'S state_of WF, WF eid %(wf)s')
    args = {'n': statename, 'wf': self.eid}
    rset = self.req.execute(rql, args, 'wf')
    if not rset:
        return None
    # the entity sits in the first column of the first row
    return rset.get_entity(0, 0)
|
51 |
|
def state_by_eid(self, eid):
    """return the state entity of the given `eid` if it is a state of this
    workflow, or None when the query gives no result
    """
    rql = ('Any S, SN WHERE S name SN, S eid %(s)s, '
           'S state_of WF, WF eid %(wf)s')
    args = {'s': eid, 'wf': self.eid}
    rset = self.req.execute(rql, args, ('wf', 's'))
    if not rset:
        return None
    # the entity sits in the first column of the first row
    return rset.get_entity(0, 0)
|
59 |
|
def transition_by_name(self, trname):
    """return the transition entity named `trname` among the transitions of
    this workflow, or None when the query gives no result
    """
    rql = ('Any T, TN WHERE T name TN, T name %(n)s, '
           'T transition_of WF, WF eid %(wf)s')
    args = {'n': trname, 'wf': self.eid}
    rset = self.req.execute(rql, args, 'wf')
    if not rset:
        return None
    # the entity sits in the first column of the first row
    return rset.get_entity(0, 0)
|
67 |
|
def transition_by_eid(self, eid):
    """return the transition entity of the given `eid` if it is a transition
    of this workflow, or None when the query gives no result
    """
    rql = ('Any T, TN WHERE T name TN, T eid %(t)s, '
           'T transition_of WF, WF eid %(wf)s')
    args = {'t': eid, 'wf': self.eid}
    rset = self.req.execute(rql, args, ('wf', 't'))
    if not rset:
        return None
    # the entity sits in the first column of the first row
    return rset.get_entity(0, 0)
|
75 |
|
76 # wf construction methods ################################################## |
|
77 |
|
def add_state(self, name, initial=False, **kwargs):
    """method to ease workflow definition: add a state to this workflow

    A `State` entity is created with the given `name` (extra keyword
    arguments are forwarded to the entity creation) and linked to this
    workflow through `state_of`. When `initial` is true the new state is
    also set as the workflow's `initial_state`.

    Return the created state entity. An AssertionError is raised when
    `initial` is true while the workflow already has an initial state.
    """
    if initial:
        # check the precondition before writing anything, so that a refused
        # call does not leave a freshly created, already linked state behind
        assert not self.initial, \
            'workflow %s already has an initial state' % self.eid
    state = self.req.create_entity('State', name=name, **kwargs)
    self.req.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
                     {'s': state.eid, 'wf': self.eid}, ('s', 'wf'))
    if initial:
        self.req.execute('SET WF initial_state S '
                         'WHERE S eid %(s)s, WF eid %(wf)s',
                         {'s': state.eid, 'wf': self.eid}, ('s', 'wf'))
    return state
|
91 |
|
def add_transition(self, name, fromstates, tostate,
                   requiredgroups=(), conditions=(), **kwargs):
    """method to ease workflow definition: add a transition to this
    workflow, from one or more states and to a single state

    `fromstates` items and `tostate` may each be given either as an object
    having an `eid` attribute or directly as an eid. `requiredgroups` and
    `conditions` are handed to the new transition's
    `set_transition_permissions` (in "add" mode, i.e. `reset=False`); extra
    keyword arguments are forwarded to the entity creation.

    Return the created transition entity.
    """
    execute = self.req.execute
    transition = self.req.create_entity('Transition', name=name, **kwargs)
    treid = transition.eid
    execute('SET T transition_of WF '
            'WHERE T eid %(t)s, WF eid %(wf)s',
            {'t': treid, 'wf': self.eid}, ('t', 'wf'))
    for source in fromstates:
        # accept entities as well as plain eids
        execute('SET S allowed_transition T '
                'WHERE S eid %(s)s, T eid %(t)s',
                {'s': getattr(source, 'eid', source), 't': treid},
                ('s', 't'))
    execute('SET T destination_state S '
            'WHERE S eid %(s)s, T eid %(t)s',
            {'t': treid, 's': getattr(tostate, 'eid', tostate)},
            ('s', 't'))
    transition.set_transition_permissions(requiredgroups, conditions,
                                          reset=False)
    return transition
|
114 |
|
115 |
|
116 class BaseTransition(AnyEntity): |
|
117 """customized class for abstract transition |
|
118 |
|
119 provides a specific may_be_fired method to check if the relation may be |
|
120 fired by the logged user |
18 """ |
121 """ |
19 id = 'Transition' |
122 id = 'Transition' |
20 fetch_attrs, fetch_order = fetch_config(['name']) |
123 fetch_attrs, fetch_order = fetch_config(['name']) |
21 |
124 |
22 def may_be_passed(self, eid, stateeid): |
125 def may_be_fired(self, eid): |
23 """return true if the logged user may pass this transition |
126 """return true if the logged user may fire this transition |
24 |
127 |
25 `eid` is the eid of the object on which we may pass the transition |
128 `eid` is the eid of the object on which we may fire the transition |
26 `stateeid` is the eid of the current object'state XXX unused |
|
27 """ |
129 """ |
28 user = self.req.user |
130 user = self.req.user |
29 # check user is at least in one of the required groups if any |
131 # check user is at least in one of the required groups if any |
30 groups = frozenset(g.name for g in self.require_group) |
132 groups = frozenset(g.name for g in self.require_group) |
31 if groups: |
133 if groups: |
41 return True |
143 return True |
42 if self.condition or groups: |
144 if self.condition or groups: |
43 return False |
145 return False |
44 return True |
146 return True |
45 |
147 |
46 def destination(self): |
|
47 return self.destination_state[0] |
|
48 |
|
def after_deletion_path(self):
    """return (path, parameters) which should be used as redirect
    information when this entity is being deleted

    When the transition belongs to a workflow (through `transition_of`),
    redirect to the rest path of the first one with no extra parameters;
    otherwise defer to the parent class.
    """
    if self.transition_of:
        return self.transition_of[0].rest_path(), {}
    # super() must name the class defining this method (BaseTransition):
    # naming the Transition subclass made the call come back to this very
    # method for Transition instances, and raised TypeError for instances
    # of other subclasses such as WorkflowTransition
    return super(BaseTransition, self).after_deletion_path()
56 |
155 |
|
def set_transition_permissions(self, requiredgroups=(), conditions=(),
                               reset=True):
    """set or add (if `reset` is False) groups and conditions for this
    transition

    `requiredgroups` is an iterable of group names; each one must match an
    existing group, else an AssertionError is raised. `conditions` is
    either a single expression string or an iterable of them; each is
    inserted as an RQLExpression of type "ERQLExpression" linked to this
    transition through `condition`.
    """
    execute = self.req.execute
    eid = self.eid
    if reset:
        # drop existing permissions first
        for delete_rql in ('DELETE T require_group G WHERE T eid %(x)s',
                           'DELETE T condition R WHERE T eid %(x)s'):
            execute(delete_rql, {'x': eid}, 'x')
    for gname in requiredgroups:
        ### XXX ensure gname validity
        rset = execute('SET T require_group G '
                       'WHERE T eid %(x)s, G name %(gn)s',
                       {'x': eid, 'gn': gname}, 'x')
        assert rset, '%s is not a known group' % gname
    # a lone expression may be given instead of a sequence
    if isinstance(conditions, basestring):
        conditions = (conditions,)
    for expr in conditions:
        # byte strings are converted to unicode before being stored
        if isinstance(expr, str):
            expr = unicode(expr)
        execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
                'X expression %(expr)s, T condition X '
                'WHERE T eid %(x)s',
                {'x': eid, 'expr': expr}, 'x')
    # XXX clear caches?
|
182 |
|
183 |
|
class Transition(BaseTransition):
    """customized class for Transition entities"""
    id = 'Transition'

    def destination(self):
        """return the first entity of the `destination_state` relation"""
        return self.destination_state[0]

    def has_input_state(self, state):
        """return True if `state` is one of the states this transition is
        allowed from

        `state` may be given either as an object having an `eid` attribute
        or directly as an eid.
        """
        if hasattr(state, 'eid'):
            state = state.eid
        # evaluate the predicate itself: the former
        # `any(s for s in ... if s.eid == state)` form yielded the matching
        # object, so the answer depended on that object's truth value
        return any(s.eid == state for s in self.reverse_allowed_transition)
|
195 |
|
196 |
|
class WorkflowTransition(BaseTransition):
    """entity class for WorkflowTransition entities, i.e. transitions
    leading into a sub-workflow
    """
    id = 'WorkflowTransition'

    @property
    def subwf(self):
        """first entity of the `subworkflow` relation"""
        subworkflows = self.subworkflow
        return subworkflows[0]

    def destination(self):
        """return the `initial` state of the sub-workflow"""
        subworkflow = self.subwf
        return subworkflow.initial
|
207 |
57 |
208 |
58 class State(AnyEntity): |
209 class State(AnyEntity): |
59 """customized class for State entities |
210 """customized class for State entities""" |
60 |
|
61 provides a specific transitions method returning transitions that may be |
|
62 passed by the current user for the given entity |
|
63 """ |
|
64 id = 'State' |
211 id = 'State' |
65 fetch_attrs, fetch_order = fetch_config(['name']) |
212 fetch_attrs, fetch_order = fetch_config(['name']) |
66 rest_attr = 'eid' |
213 rest_attr = 'eid' |
67 |
214 |
68 def transitions(self, entity, desteid=None): |
|
69 """generates transition that MAY be passed""" |
|
70 rql = ('Any T,N,DS where S allowed_transition T, S eid %(x)s, ' |
|
71 'T name N, T destination_state DS, ' |
|
72 'T transition_of ET, ET name %(et)s') |
|
73 if desteid is not None: |
|
74 rql += ', DS eid %(ds)s' |
|
75 rset = self.req.execute(rql, {'x': self.eid, 'et': str(entity.e_schema), |
|
76 'ds': desteid}, 'x') |
|
77 for tr in rset.entities(): |
|
78 if tr.may_be_passed(entity.eid, self.eid): |
|
79 yield tr |
|
80 |
|
81 def after_deletion_path(self): |
215 def after_deletion_path(self): |
82 """return (path, parameters) which should be used as redirect |
216 """return (path, parameters) which should be used as redirect |
83 information when this entity is being deleted |
217 information when this entity is being deleted |
84 """ |
218 """ |
85 if self.state_of: |
219 if self.state_of: |
86 return self.state_of[0].rest_path(), {'vid': 'workflow'} |
220 return self.state_of[0].rest_path(), {} |
87 return super(State, self).after_deletion_path() |
221 return super(State, self).after_deletion_path() |
88 |
222 |
89 |
223 |
90 class TrInfo(AnyEntity): |
224 class TrInfo(AnyEntity): |
91 """customized class for Transition information entities |
225 """customized class for Transition information entities |
93 id = 'TrInfo' |
227 id = 'TrInfo' |
94 fetch_attrs, fetch_order = fetch_config(['creation_date', 'comment'], |
228 fetch_attrs, fetch_order = fetch_config(['creation_date', 'comment'], |
95 pclass=None) # don't want modification_date |
229 pclass=None) # don't want modification_date |
96 @property |
230 @property |
97 def for_entity(self): |
231 def for_entity(self): |
98 return self.wf_info_for and self.wf_info_for[0] |
232 return self.wf_info_for[0] |
|
233 |
99 @property |
234 @property |
100 def previous_state(self): |
235 def previous_state(self): |
101 return self.from_state and self.from_state[0] |
236 return self.from_state[0] |
102 |
237 |
103 @property |
238 @property |
104 def new_state(self): |
239 def new_state(self): |
105 return self.to_state[0] |
240 return self.to_state[0] |
|
241 |
|
242 @property |
|
243 def transition(self): |
|
244 return self.by_transition and self.by_transition[0] or None |
106 |
245 |
107 def after_deletion_path(self): |
246 def after_deletion_path(self): |
108 """return (path, parameters) which should be used as redirect |
247 """return (path, parameters) which should be used as redirect |
109 information when this entity is being deleted |
248 information when this entity is being deleted |
110 """ |
249 """ |
111 if self.for_entity: |
250 if self.for_entity: |
112 return self.for_entity.rest_path(), {} |
251 return self.for_entity.rest_path(), {} |
113 return 'view', {} |
252 return 'view', {} |
|
253 |
|
254 |
|
class WorkflowableMixIn(object):
    """base mixin providing workflow helper methods for workflowable entities.
    This mixin will be automatically set on class supporting the 'in_state'
    relation (which implies supporting 'wf_info_for' as well)
    """
    __implements__ = (IWorkflowable,)

    @property
    @cached
    def current_workflow(self):
        """return current workflow applied to this entity

        A custom workflow (`custom_workflow` relation) takes precedence.
        Otherwise the workflows attached to the entity's type are queried:
        a single one is used as is, and among several the one which is the
        default workflow for `self.id` is picked. None is returned, after
        logging a warning, when no workflow can be selected.
        """
        if self.custom_workflow:
            return self.custom_workflow[0]
        wfrset = self.req.execute('Any WF WHERE X is ET, X eid %(x)s, WF workflow_of ET',
                                  {'x': self.eid}, 'x')
        if len(wfrset) == 1:
            return wfrset.get_entity(0, 0)
        if len(wfrset) > 1:
            for wf in wfrset.entities():
                if wf.is_default_workflow_of(self.id):
                    return wf
            self.warning("can't find default workflow for %s", self.id)
        else:
            self.warning("can't find any workflow for %s", self.id)
        return None

    @property
    def current_state(self):
        """return current state entity, or None if the entity has no state"""
        in_state = self.in_state
        # a conditional expression rather than `x and x[0] or None`, which
        # also answered None whenever the state object itself was false
        return in_state[0] if in_state else None

    @property
    def state(self):
        """return current state name, or None (after logging a warning) if
        the entity has no state
        """
        try:
            return self.in_state[0].name
        except IndexError:
            self.warning('entity %s has no state', self)
            return None

    @property
    def printable_state(self):
        """return current state name translated to context's language, or an
        empty unicode string if the entity has no state
        """
        state = self.current_state
        if state is not None:
            return self.req._(state.name)
        return u''

    def latest_trinfo(self):
        """return the latest transition information for this entity

        This is the last item of `reverse_wf_info_for`; IndexError is raised
        when there is none.
        """
        return self.reverse_wf_info_for[-1]

    def possible_transitions(self):
        """generates transitions that MAY be fired by the logged user on
        this entity, given its current state and current workflow

        Nothing is generated when the entity has no current state or no
        current workflow.
        """
        if self.current_state is None or self.current_workflow is None:
            return
        rset = self.req.execute(
            'Any T,N WHERE S allowed_transition T, S eid %(x)s, '
            'T name N, T transition_of WF, WF eid %(wfeid)s',
            {'x': self.current_state.eid,
             'wfeid': self.current_workflow.eid}, 'x')
        for tr in rset.entities():
            if tr.may_be_fired(self.eid):
                yield tr

    def _get_tr_kwargs(self, comment, commentformat):
        """return the keyword arguments to give on TrInfo creation: only
        values which are not None are kept
        """
        kwargs = {}
        if comment is not None:
            kwargs['comment'] = comment
        if commentformat is not None:
            kwargs['comment_format'] = commentformat
        return kwargs

    def fire_transition(self, trname, comment=None, commentformat=None):
        """change the entity's state by firing transition of the given name in
        entity's workflow

        This records a TrInfo entity linked to the transition
        (`by_transition`) and to this entity (`wf_info_for`), with the
        optional `comment` / `commentformat`. An AssertionError is raised
        when the entity has no current workflow or when `trname` is not a
        transition of that workflow.
        """
        assert self.current_workflow
        tr = self.current_workflow.transition_by_name(trname)
        # report the requested name: the message used to format an undefined
        # `state` name, turning the assertion into a NameError
        assert tr is not None, 'not a %s transition: %s' % (self.id, trname)
        # XXX try to find matching transition?
        self.req.create_entity('TrInfo', ('by_transition', 'T'),
                               ('wf_info_for', 'E'), T=tr.eid, E=self.eid,
                               **self._get_tr_kwargs(comment, commentformat))

    def change_state(self, statename, comment=None, commentformat=None):
        """change the entity's state to the state of the given name in entity's
        workflow. This method should only be used by managers to fix an
        entity's state when there is no matching transition, otherwise
        fire_transition should be used.

        `statename` should be a state name; a state eid is still accepted
        but triggers a warning. This records a TrInfo entity linked to the
        target state (`to_state`) and to this entity (`wf_info_for`). An
        AssertionError is raised when the entity has no current workflow or
        when the state can't be found in that workflow.
        """
        assert self.current_workflow
        if not isinstance(statename, basestring):
            # stacklevel=2 so that the warning points at the caller
            warn('give a state name', DeprecationWarning, stacklevel=2)
            state = self.current_workflow.state_by_eid(statename)
        else:
            state = self.current_workflow.state_by_name(statename)
        # validate whichever lookup was used, and report what was asked for
        # (the looked up value is None at this point when the check fails)
        assert state is not None, 'not a %s state: %s' % (self.id, statename)
        # XXX try to find matching transition?
        self.req.create_entity('TrInfo', ('to_state', 'S'),
                               ('wf_info_for', 'E'), S=state.eid, E=self.eid,
                               **self._get_tr_kwargs(comment, commentformat))

    @deprecated('get transition from current workflow and use its may_be_fired method')
    def can_pass_transition(self, trname):
        """return the Transition instance if the current user can fire the
        transition with the given name, else None
        """
        tr = self.current_workflow and self.current_workflow.transition_by_name(trname)
        if tr and tr.may_be_fired(self.eid):
            return tr
        return None

    @property
    @deprecated('use printable_state')
    def displayable_state(self):
        """return current state name translated to context's language"""
        return self.req._(self.state)


# register the mixin for entity classes that are subject of 'in_state'
MI_REL_TRIGGERS[('in_state', 'subject')] = WorkflowableMixIn