--- a/entities/wfobjs.py Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,589 +0,0 @@
-# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-"""workflow handling:
-
-* entity types defining workflow (Workflow, State, Transition...)
-* workflow history (TrInfo)
-* adapter for workflowable entities (IWorkflowableAdapter)
-"""
-from __future__ import print_function
-
-__docformat__ = "restructuredtext en"
-
-from six import text_type, string_types
-
-from logilab.common.decorators import cached, clear_cache
-from logilab.common.deprecation import deprecated
-
-from cubicweb.entities import AnyEntity, fetch_config
-from cubicweb.view import EntityAdapter
-from cubicweb.predicates import relation_possible
-
-
-try:
- from cubicweb import server
-except ImportError:
- # We need to lookup DEBUG from there,
- # however a pure dbapi client may not have it.
- class server(object): pass
- server.DEBUG = False
-
-
-class WorkflowException(Exception): pass
-
-class Workflow(AnyEntity):
- __regid__ = 'Workflow'
-
- @property
- def initial(self):
- """return the initial state for this workflow"""
- return self.initial_state and self.initial_state[0] or None
-
- def is_default_workflow_of(self, etype):
- """return True if this workflow is the default workflow for the given
- entity type
- """
- return any(et for et in self.reverse_default_workflow
- if et.name == etype)
-
- def iter_workflows(self, _done=None):
- """return an iterator on actual workflows, eg this workflow and its
- subworkflows
- """
- # infinite loop safety belt
- if _done is None:
- _done = set()
- yield self
- _done.add(self.eid)
- for tr in self._cw.execute('Any T WHERE T is WorkflowTransition, '
- 'T transition_of WF, WF eid %(wf)s',
- {'wf': self.eid}).entities():
- if tr.subwf.eid in _done:
- continue
- for subwf in tr.subwf.iter_workflows(_done):
- yield subwf
-
- # state / transitions accessors ############################################
-
- def state_by_name(self, statename):
- rset = self._cw.execute('Any S, SN WHERE S name SN, S name %(n)s, '
- 'S state_of WF, WF eid %(wf)s',
- {'n': statename, 'wf': self.eid})
- if rset:
- return rset.get_entity(0, 0)
- return None
-
- def state_by_eid(self, eid):
- rset = self._cw.execute('Any S, SN WHERE S name SN, S eid %(s)s, '
- 'S state_of WF, WF eid %(wf)s',
- {'s': eid, 'wf': self.eid})
- if rset:
- return rset.get_entity(0, 0)
- return None
-
- def transition_by_name(self, trname):
- rset = self._cw.execute('Any T, TN WHERE T name TN, T name %(n)s, '
- 'T transition_of WF, WF eid %(wf)s',
- {'n': text_type(trname), 'wf': self.eid})
- if rset:
- return rset.get_entity(0, 0)
- return None
-
- def transition_by_eid(self, eid):
- rset = self._cw.execute('Any T, TN WHERE T name TN, T eid %(t)s, '
- 'T transition_of WF, WF eid %(wf)s',
- {'t': eid, 'wf': self.eid})
- if rset:
- return rset.get_entity(0, 0)
- return None
-
- # wf construction methods ##################################################
-
- def add_state(self, name, initial=False, **kwargs):
- """add a state to this workflow"""
- state = self._cw.create_entity('State', name=text_type(name), **kwargs)
- self._cw.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
- {'s': state.eid, 'wf': self.eid})
- if initial:
- assert not self.initial, "Initial state already defined as %s" % self.initial
- self._cw.execute('SET WF initial_state S '
- 'WHERE S eid %(s)s, WF eid %(wf)s',
- {'s': state.eid, 'wf': self.eid})
- return state
-
- def _add_transition(self, trtype, name, fromstates,
- requiredgroups=(), conditions=(), **kwargs):
- tr = self._cw.create_entity(trtype, name=text_type(name), **kwargs)
- self._cw.execute('SET T transition_of WF '
- 'WHERE T eid %(t)s, WF eid %(wf)s',
- {'t': tr.eid, 'wf': self.eid})
- assert fromstates, fromstates
- if not isinstance(fromstates, (tuple, list)):
- fromstates = (fromstates,)
- for state in fromstates:
- if hasattr(state, 'eid'):
- state = state.eid
- self._cw.execute('SET S allowed_transition T '
- 'WHERE S eid %(s)s, T eid %(t)s',
- {'s': state, 't': tr.eid})
- tr.set_permissions(requiredgroups, conditions, reset=False)
- return tr
-
- def add_transition(self, name, fromstates, tostate=None,
- requiredgroups=(), conditions=(), **kwargs):
- """add a transition to this workflow from some state(s) to another"""
- tr = self._add_transition('Transition', name, fromstates,
- requiredgroups, conditions, **kwargs)
- if tostate is not None:
- if hasattr(tostate, 'eid'):
- tostate = tostate.eid
- self._cw.execute('SET T destination_state S '
- 'WHERE S eid %(s)s, T eid %(t)s',
- {'t': tr.eid, 's': tostate})
- return tr
-
- def add_wftransition(self, name, subworkflow, fromstates, exitpoints=(),
- requiredgroups=(), conditions=(), **kwargs):
- """add a workflow transition to this workflow"""
- tr = self._add_transition('WorkflowTransition', name, fromstates,
- requiredgroups, conditions, **kwargs)
- if hasattr(subworkflow, 'eid'):
- subworkflow = subworkflow.eid
- assert self._cw.execute('SET T subworkflow WF WHERE WF eid %(wf)s,T eid %(t)s',
- {'t': tr.eid, 'wf': subworkflow})
- for fromstate, tostate in exitpoints:
- tr.add_exit_point(fromstate, tostate)
- return tr
-
- def replace_state(self, todelstate, replacement):
- """migration convenience method"""
- if not hasattr(todelstate, 'eid'):
- todelstate = self.state_by_name(todelstate)
- if not hasattr(replacement, 'eid'):
- replacement = self.state_by_name(replacement)
- args = {'os': todelstate.eid, 'ns': replacement.eid}
- execute = self._cw.execute
- execute('SET X in_state NS WHERE X in_state OS, '
- 'NS eid %(ns)s, OS eid %(os)s', args)
- execute('SET X from_state NS WHERE X from_state OS, '
- 'OS eid %(os)s, NS eid %(ns)s', args)
- execute('SET X to_state NS WHERE X to_state OS, '
- 'OS eid %(os)s, NS eid %(ns)s', args)
- todelstate.cw_delete()
-
-
-class BaseTransition(AnyEntity):
- """customized class for abstract transition
-
- provides a specific may_be_fired method to check if the relation may be
- fired by the logged user
- """
- __regid__ = 'BaseTransition'
- fetch_attrs, cw_fetch_order = fetch_config(['name', 'type'])
-
- def __init__(self, *args, **kwargs):
- if self.cw_etype == 'BaseTransition':
- raise WorkflowException('should not be instantiated')
- super(BaseTransition, self).__init__(*args, **kwargs)
-
- @property
- def workflow(self):
- return self.transition_of[0]
-
- def has_input_state(self, state):
- if hasattr(state, 'eid'):
- state = state.eid
- return any(s for s in self.reverse_allowed_transition if s.eid == state)
-
- def may_be_fired(self, eid):
- """return true if the logged user may fire this transition
-
- `eid` is the eid of the object on which we may fire the transition
- """
- DBG = False
- if server.DEBUG & server.DBG_SEC:
- if 'transition' in server._SECURITY_CAPS:
- DBG = True
- user = self._cw.user
- # check user is at least in one of the required groups if any
- groups = frozenset(g.name for g in self.require_group)
- if groups:
- matches = user.matching_groups(groups)
- if matches:
- if DBG:
- print('may_be_fired: %r may fire: user matches %s' % (self.name, groups))
- return matches
- if 'owners' in groups and user.owns(eid):
- if DBG:
- print('may_be_fired: %r may fire: user is owner' % self.name)
- return True
- # check one of the rql expression conditions matches if any
- if self.condition:
- if DBG:
- print('my_be_fired: %r: %s' %
- (self.name, [(rqlexpr.expression,
- rqlexpr.check_expression(self._cw, eid))
- for rqlexpr in self.condition]))
- for rqlexpr in self.condition:
- if rqlexpr.check_expression(self._cw, eid):
- return True
- if self.condition or groups:
- return False
- return True
-
- def set_permissions(self, requiredgroups=(), conditions=(), reset=True):
- """set or add (if `reset` is False) groups and conditions for this
- transition
- """
- if reset:
- self._cw.execute('DELETE T require_group G WHERE T eid %(x)s',
- {'x': self.eid})
- self._cw.execute('DELETE T condition R WHERE T eid %(x)s',
- {'x': self.eid})
- for gname in requiredgroups:
- rset = self._cw.execute('SET T require_group G '
- 'WHERE T eid %(x)s, G name %(gn)s',
- {'x': self.eid, 'gn': text_type(gname)})
- assert rset, '%s is not a known group' % gname
- if isinstance(conditions, string_types):
- conditions = (conditions,)
- for expr in conditions:
- if isinstance(expr, string_types):
- kwargs = {'expr': text_type(expr)}
- else:
- assert isinstance(expr, dict)
- kwargs = expr
- kwargs['x'] = self.eid
- kwargs.setdefault('mainvars', u'X')
- self._cw.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
- 'X expression %(expr)s, X mainvars %(mainvars)s, '
- 'T condition X WHERE T eid %(x)s', kwargs)
- # XXX clear caches?
-
-
-class Transition(BaseTransition):
- """customized class for Transition entities"""
- __regid__ = 'Transition'
-
- def dc_long_title(self):
- return '%s (%s)' % (self.name, self._cw._(self.name))
-
- def destination(self, entity):
- try:
- return self.destination_state[0]
- except IndexError:
- return entity.cw_adapt_to('IWorkflowable').latest_trinfo().previous_state
-
- def potential_destinations(self):
- try:
- yield self.destination_state[0]
- except IndexError:
- for incomingstate in self.reverse_allowed_transition:
- for tr in incomingstate.reverse_destination_state:
- for previousstate in tr.reverse_allowed_transition:
- yield previousstate
-
-
-class WorkflowTransition(BaseTransition):
- """customized class for WorkflowTransition entities"""
- __regid__ = 'WorkflowTransition'
-
- @property
- def subwf(self):
- return self.subworkflow[0]
-
- def destination(self, entity):
- return self.subwf.initial
-
- def potential_destinations(self):
- yield self.subwf.initial
-
- def add_exit_point(self, fromstate, tostate):
- if hasattr(fromstate, 'eid'):
- fromstate = fromstate.eid
- if tostate is None:
- self._cw.execute('INSERT SubWorkflowExitPoint X: T subworkflow_exit X, '
- 'X subworkflow_state FS WHERE T eid %(t)s, FS eid %(fs)s',
- {'t': self.eid, 'fs': fromstate})
- else:
- if hasattr(tostate, 'eid'):
- tostate = tostate.eid
- self._cw.execute('INSERT SubWorkflowExitPoint X: T subworkflow_exit X, '
- 'X subworkflow_state FS, X destination_state TS '
- 'WHERE T eid %(t)s, FS eid %(fs)s, TS eid %(ts)s',
- {'t': self.eid, 'fs': fromstate, 'ts': tostate})
-
- def get_exit_point(self, entity, stateeid):
- """if state is an exit point, return its associated destination state"""
- if hasattr(stateeid, 'eid'):
- stateeid = stateeid.eid
- try:
- tostateeid = self.exit_points()[stateeid]
- except KeyError:
- return None
- if tostateeid is None:
- # go back to state from which we've entered the subworkflow
- return entity.cw_adapt_to('IWorkflowable').subworkflow_input_trinfo().previous_state
- return self._cw.entity_from_eid(tostateeid)
-
- @cached
- def exit_points(self):
- result = {}
- for ep in self.subworkflow_exit:
- result[ep.subwf_state.eid] = ep.destination and ep.destination.eid
- return result
-
- def cw_clear_all_caches(self):
- super(WorkflowTransition, self).cw_clear_all_caches()
- clear_cache(self, 'exit_points')
-
-
-class SubWorkflowExitPoint(AnyEntity):
- """customized class for SubWorkflowExitPoint entities"""
- __regid__ = 'SubWorkflowExitPoint'
-
- @property
- def subwf_state(self):
- return self.subworkflow_state[0]
-
- @property
- def destination(self):
- return self.destination_state and self.destination_state[0] or None
-
-
-class State(AnyEntity):
- """customized class for State entities"""
- __regid__ = 'State'
- fetch_attrs, cw_fetch_order = fetch_config(['name'])
- rest_attr = 'eid'
-
- def dc_long_title(self):
- return '%s (%s)' % (self.name, self._cw._(self.name))
-
- @property
- def workflow(self):
- # take care, may be missing in multi-sources configuration
- return self.state_of and self.state_of[0] or None
-
-
-class TrInfo(AnyEntity):
- """customized class for Transition information entities
- """
- __regid__ = 'TrInfo'
- fetch_attrs, cw_fetch_order = fetch_config(['creation_date', 'comment'],
- pclass=None) # don't want modification_date
- @property
- def for_entity(self):
- return self.wf_info_for[0]
-
- @property
- def previous_state(self):
- return self.from_state[0]
-
- @property
- def new_state(self):
- return self.to_state[0]
-
- @property
- def transition(self):
- return self.by_transition and self.by_transition[0] or None
-
-
-
-class IWorkflowableAdapter(EntityAdapter):
- """base adapter providing workflow helper methods for workflowable entities.
- """
- __regid__ = 'IWorkflowable'
- __select__ = relation_possible('in_state')
-
- @cached
- def cwetype_workflow(self):
- """return the default workflow for entities of this type"""
- # XXX CWEType method
- wfrset = self._cw.execute('Any WF WHERE ET default_workflow WF, '
- 'ET name %(et)s', {'et': text_type(self.entity.cw_etype)})
- if wfrset:
- return wfrset.get_entity(0, 0)
- self.warning("can't find any workflow for %s", self.entity.cw_etype)
- return None
-
- @property
- def main_workflow(self):
- """return current workflow applied to this entity"""
- if self.entity.custom_workflow:
- return self.entity.custom_workflow[0]
- return self.cwetype_workflow()
-
- @property
- def current_workflow(self):
- """return current workflow applied to this entity"""
- return self.current_state and self.current_state.workflow or self.main_workflow
-
- @property
- def current_state(self):
- """return current state entity"""
- return self.entity.in_state and self.entity.in_state[0] or None
-
- @property
- def state(self):
- """return current state name"""
- try:
- return self.current_state.name
- except AttributeError:
- self.warning('entity %s has no state', self.entity)
- return None
-
- @property
- def printable_state(self):
- """return current state name translated to context's language"""
- state = self.current_state
- if state:
- return self._cw._(state.name)
- return u''
-
- @property
- def workflow_history(self):
- """return the workflow history for this entity (eg ordered list of
- TrInfo entities)
- """
- return self.entity.reverse_wf_info_for
-
- def latest_trinfo(self):
- """return the latest transition information for this entity"""
- try:
- return self.workflow_history[-1]
- except IndexError:
- return None
-
- def possible_transitions(self, type='normal'):
- """generates transition that MAY be fired for the given entity,
- expected to be in this state
- used only by the UI
- """
- if self.current_state is None or self.current_workflow is None:
- return
- rset = self._cw.execute(
- 'Any T,TT, TN WHERE S allowed_transition T, S eid %(x)s, '
- 'T type TT, T type %(type)s, '
- 'T name TN, T transition_of WF, WF eid %(wfeid)s',
- {'x': self.current_state.eid, 'type': text_type(type),
- 'wfeid': self.current_workflow.eid})
- for tr in rset.entities():
- if tr.may_be_fired(self.entity.eid):
- yield tr
-
- def subworkflow_input_trinfo(self):
- """return the TrInfo which has be recorded when this entity went into
- the current sub-workflow
- """
- if self.main_workflow.eid == self.current_workflow.eid:
- return # doesn't make sense
- subwfentries = []
- for trinfo in self.workflow_history:
- if (trinfo.transition and
- trinfo.previous_state.workflow.eid != trinfo.new_state.workflow.eid):
- # entering or leaving a subworkflow
- if (subwfentries and
- subwfentries[-1].new_state.workflow.eid == trinfo.previous_state.workflow.eid and
- subwfentries[-1].previous_state.workflow.eid == trinfo.new_state.workflow.eid):
- # leave
- del subwfentries[-1]
- else:
- # enter
- subwfentries.append(trinfo)
- if not subwfentries:
- return None
- return subwfentries[-1]
-
- def subworkflow_input_transition(self):
- """return the transition which has went through the current sub-workflow
- """
- return getattr(self.subworkflow_input_trinfo(), 'transition', None)
-
- def _add_trinfo(self, comment, commentformat, treid=None, tseid=None):
- kwargs = {}
- if comment is not None:
- kwargs['comment'] = comment
- if commentformat is not None:
- kwargs['comment_format'] = commentformat
- kwargs['wf_info_for'] = self.entity
- if treid is not None:
- kwargs['by_transition'] = self._cw.entity_from_eid(treid)
- if tseid is not None:
- kwargs['to_state'] = self._cw.entity_from_eid(tseid)
- return self._cw.create_entity('TrInfo', **kwargs)
-
- def _get_transition(self, tr):
- assert self.current_workflow
- if isinstance(tr, string_types):
- _tr = self.current_workflow.transition_by_name(tr)
- assert _tr is not None, 'not a %s transition: %s' % (
- self.__regid__, tr)
- tr = _tr
- return tr
-
- def fire_transition(self, tr, comment=None, commentformat=None):
- """change the entity's state by firing given transition (name or entity)
- in entity's workflow
- """
- tr = self._get_transition(tr)
- return self._add_trinfo(comment, commentformat, tr.eid)
-
- def fire_transition_if_possible(self, tr, comment=None, commentformat=None):
- """change the entity's state by firing given transition (name or entity)
- in entity's workflow if this transition is possible
- """
- tr = self._get_transition(tr)
- if any(tr_ for tr_ in self.possible_transitions()
- if tr_.eid == tr.eid):
- self.fire_transition(tr, comment, commentformat)
-
- def change_state(self, statename, comment=None, commentformat=None, tr=None):
- """change the entity's state to the given state (name or entity) in
- entity's workflow. This method should only by used by manager to fix an
- entity's state when their is no matching transition, otherwise
- fire_transition should be used.
- """
- assert self.current_workflow
- if hasattr(statename, 'eid'):
- stateeid = statename.eid
- else:
- state = self.current_workflow.state_by_name(statename)
- if state is None:
- raise WorkflowException('not a %s state: %s' % (self.__regid__,
- statename))
- stateeid = state.eid
- # XXX try to find matching transition?
- return self._add_trinfo(comment, commentformat, tr and tr.eid, stateeid)
-
- def set_initial_state(self, statename):
- """set a newly created entity's state to the given state (name or entity)
- in entity's workflow. This is useful if you don't want it to be the
- workflow's initial state.
- """
- assert self.current_workflow
- if hasattr(statename, 'eid'):
- stateeid = statename.eid
- else:
- state = self.current_workflow.state_by_name(statename)
- if state is None:
- raise WorkflowException('not a %s state: %s' % (self.__regid__,
- statename))
- stateeid = state.eid
- self._cw.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': self.entity.eid, 's': stateeid})