changeset 11057 0b59724cb3f2
parent 10831 f1b5a5ea0da0
child 11767 432f87a63057
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/entities/	Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,589 @@
+# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact --
+# This file is part of CubicWeb.
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <>.
+"""workflow handling:
+* entity types defining workflow (Workflow, State, Transition...)
+* workflow history (TrInfo)
+* adapter for workflowable entities (IWorkflowableAdapter)
+from __future__ import print_function
+__docformat__ = "restructuredtext en"
+from six import text_type, string_types
+from logilab.common.decorators import cached, clear_cache
+from logilab.common.deprecation import deprecated
+from cubicweb.entities import AnyEntity, fetch_config
+from cubicweb.view import EntityAdapter
+from cubicweb.predicates import relation_possible
+    from cubicweb import server
+except ImportError:
+    # We need to lookup DEBUG from there,
+    # however a pure dbapi client may not have it.
+    class server(object): pass
+    server.DEBUG = False
+class WorkflowException(Exception): pass
+class Workflow(AnyEntity):
+    __regid__ = 'Workflow'
+    @property
+    def initial(self):
+        """return the initial state for this workflow"""
+        return self.initial_state and self.initial_state[0] or None
+    def is_default_workflow_of(self, etype):
+        """return True if this workflow is the default workflow for the given
+        entity type
+        """
+        return any(et for et in self.reverse_default_workflow
+                   if == etype)
+    def iter_workflows(self, _done=None):
+        """return an iterator on actual workflows, eg this workflow and its
+        subworkflows
+        """
+        # infinite loop safety belt
+        if _done is None:
+            _done = set()
+        yield self
+        _done.add(self.eid)
+        for tr in self._cw.execute('Any T WHERE T is WorkflowTransition, '
+                                   'T transition_of WF, WF eid %(wf)s',
+                                   {'wf': self.eid}).entities():
+            if tr.subwf.eid in _done:
+                continue
+            for subwf in tr.subwf.iter_workflows(_done):
+                yield subwf
+    # state / transitions accessors ############################################
+    def state_by_name(self, statename):
+        rset = self._cw.execute('Any S, SN WHERE S name SN, S name %(n)s, '
+                                'S state_of WF, WF eid %(wf)s',
+                                {'n': statename, 'wf': self.eid})
+        if rset:
+            return rset.get_entity(0, 0)
+        return None
+    def state_by_eid(self, eid):
+        rset = self._cw.execute('Any S, SN WHERE S name SN, S eid %(s)s, '
+                                'S state_of WF, WF eid %(wf)s',
+                                {'s': eid, 'wf': self.eid})
+        if rset:
+            return rset.get_entity(0, 0)
+        return None
+    def transition_by_name(self, trname):
+        rset = self._cw.execute('Any T, TN WHERE T name TN, T name %(n)s, '
+                                'T transition_of WF, WF eid %(wf)s',
+                                {'n': text_type(trname), 'wf': self.eid})
+        if rset:
+            return rset.get_entity(0, 0)
+        return None
+    def transition_by_eid(self, eid):
+        rset = self._cw.execute('Any T, TN WHERE T name TN, T eid %(t)s, '
+                                'T transition_of WF, WF eid %(wf)s',
+                                {'t': eid, 'wf': self.eid})
+        if rset:
+            return rset.get_entity(0, 0)
+        return None
+    # wf construction methods ##################################################
+    def add_state(self, name, initial=False, **kwargs):
+        """add a state to this workflow"""
+        state = self._cw.create_entity('State', name=text_type(name), **kwargs)
+        self._cw.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
+                         {'s': state.eid, 'wf': self.eid})
+        if initial:
+            assert not self.initial, "Initial state already defined as %s" % self.initial
+            self._cw.execute('SET WF initial_state S '
+                             'WHERE S eid %(s)s, WF eid %(wf)s',
+                             {'s': state.eid, 'wf': self.eid})
+        return state
+    def _add_transition(self, trtype, name, fromstates,
+                        requiredgroups=(), conditions=(), **kwargs):
+        tr = self._cw.create_entity(trtype, name=text_type(name), **kwargs)
+        self._cw.execute('SET T transition_of WF '
+                         'WHERE T eid %(t)s, WF eid %(wf)s',
+                         {'t': tr.eid, 'wf': self.eid})
+        assert fromstates, fromstates
+        if not isinstance(fromstates, (tuple, list)):
+            fromstates = (fromstates,)
+        for state in fromstates:
+            if hasattr(state, 'eid'):
+                state = state.eid
+            self._cw.execute('SET S allowed_transition T '
+                             'WHERE S eid %(s)s, T eid %(t)s',
+                             {'s': state, 't': tr.eid})
+        tr.set_permissions(requiredgroups, conditions, reset=False)
+        return tr
+    def add_transition(self, name, fromstates, tostate=None,
+                       requiredgroups=(), conditions=(), **kwargs):
+        """add a transition to this workflow from some state(s) to another"""
+        tr = self._add_transition('Transition', name, fromstates,
+                                  requiredgroups, conditions, **kwargs)
+        if tostate is not None:
+            if hasattr(tostate, 'eid'):
+                tostate = tostate.eid
+            self._cw.execute('SET T destination_state S '
+                             'WHERE S eid %(s)s, T eid %(t)s',
+                             {'t': tr.eid, 's': tostate})
+        return tr
+    def add_wftransition(self, name, subworkflow, fromstates, exitpoints=(),
+                         requiredgroups=(), conditions=(), **kwargs):
+        """add a workflow transition to this workflow"""
+        tr = self._add_transition('WorkflowTransition', name, fromstates,
+                                  requiredgroups, conditions, **kwargs)
+        if hasattr(subworkflow, 'eid'):
+            subworkflow = subworkflow.eid
+        assert self._cw.execute('SET T subworkflow WF WHERE WF eid %(wf)s,T eid %(t)s',
+                                {'t': tr.eid, 'wf': subworkflow})
+        for fromstate, tostate in exitpoints:
+            tr.add_exit_point(fromstate, tostate)
+        return tr
+    def replace_state(self, todelstate, replacement):
+        """migration convenience method"""
+        if not hasattr(todelstate, 'eid'):
+            todelstate = self.state_by_name(todelstate)
+        if not hasattr(replacement, 'eid'):
+            replacement = self.state_by_name(replacement)
+        args = {'os': todelstate.eid, 'ns': replacement.eid}
+        execute = self._cw.execute
+        execute('SET X in_state NS WHERE X in_state OS, '
+                'NS eid %(ns)s, OS eid %(os)s', args)
+        execute('SET X from_state NS WHERE X from_state OS, '
+                'OS eid %(os)s, NS eid %(ns)s', args)
+        execute('SET X to_state NS WHERE X to_state OS, '
+                'OS eid %(os)s, NS eid %(ns)s', args)
+        todelstate.cw_delete()
+class BaseTransition(AnyEntity):
+    """customized class for abstract transition
+    provides a specific may_be_fired method to check if the relation may be
+    fired by the logged user
+    """
+    __regid__ = 'BaseTransition'
+    fetch_attrs, cw_fetch_order = fetch_config(['name', 'type'])
+    def __init__(self, *args, **kwargs):
+        if self.cw_etype == 'BaseTransition':
+            raise WorkflowException('should not be instantiated')
+        super(BaseTransition, self).__init__(*args, **kwargs)
+    @property
+    def workflow(self):
+        return self.transition_of[0]
+    def has_input_state(self, state):
+        if hasattr(state, 'eid'):
+            state = state.eid
+        return any(s for s in self.reverse_allowed_transition if s.eid == state)
+    def may_be_fired(self, eid):
+        """return true if the logged user may fire this transition
+        `eid` is the eid of the object on which we may fire the transition
+        """
+        DBG = False
+        if server.DEBUG & server.DBG_SEC:
+            if 'transition' in server._SECURITY_CAPS:
+                DBG = True
+        user = self._cw.user
+        # check user is at least in one of the required groups if any
+        groups = frozenset( for g in self.require_group)
+        if groups:
+            matches = user.matching_groups(groups)
+            if matches:
+                if DBG:
+                    print('may_be_fired: %r may fire: user matches %s' % (, groups))
+                return matches
+            if 'owners' in groups and user.owns(eid):
+                if DBG:
+                    print('may_be_fired: %r may fire: user is owner' %
+                return True
+        # check one of the rql expression conditions matches if any
+        if self.condition:
+            if DBG:
+                print('my_be_fired: %r: %s' %
+                      (, [(rqlexpr.expression,
+                                    rqlexpr.check_expression(self._cw, eid))
+                                    for rqlexpr in self.condition]))
+            for rqlexpr in self.condition:
+                if rqlexpr.check_expression(self._cw, eid):
+                    return True
+        if self.condition or groups:
+            return False
+        return True
+    def set_permissions(self, requiredgroups=(), conditions=(), reset=True):
+        """set or add (if `reset` is False) groups and conditions for this
+        transition
+        """
+        if reset:
+            self._cw.execute('DELETE T require_group G WHERE T eid %(x)s',
+                             {'x': self.eid})
+            self._cw.execute('DELETE T condition R WHERE T eid %(x)s',
+                             {'x': self.eid})
+        for gname in requiredgroups:
+            rset = self._cw.execute('SET T require_group G '
+                                    'WHERE T eid %(x)s, G name %(gn)s',
+                                    {'x': self.eid, 'gn': text_type(gname)})
+            assert rset, '%s is not a known group' % gname
+        if isinstance(conditions, string_types):
+            conditions = (conditions,)
+        for expr in conditions:
+            if isinstance(expr, string_types):
+                kwargs = {'expr': text_type(expr)}
+            else:
+                assert isinstance(expr, dict)
+                kwargs = expr
+            kwargs['x'] = self.eid
+            kwargs.setdefault('mainvars', u'X')
+            self._cw.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
+                             'X expression %(expr)s, X mainvars %(mainvars)s, '
+                             'T condition X WHERE T eid %(x)s', kwargs)
+        # XXX clear caches?
+class Transition(BaseTransition):
+    """customized class for Transition entities"""
+    __regid__ = 'Transition'
+    def dc_long_title(self):
+        return '%s (%s)' % (, self._cw._(
+    def destination(self, entity):
+        try:
+            return self.destination_state[0]
+        except IndexError:
+            return entity.cw_adapt_to('IWorkflowable').latest_trinfo().previous_state
+    def potential_destinations(self):
+        try:
+            yield self.destination_state[0]
+        except IndexError:
+            for incomingstate in self.reverse_allowed_transition:
+                for tr in incomingstate.reverse_destination_state:
+                    for previousstate in tr.reverse_allowed_transition:
+                        yield previousstate
+class WorkflowTransition(BaseTransition):
+    """customized class for WorkflowTransition entities"""
+    __regid__ = 'WorkflowTransition'
+    @property
+    def subwf(self):
+        return self.subworkflow[0]
+    def destination(self, entity):
+        return self.subwf.initial
+    def potential_destinations(self):
+        yield self.subwf.initial
+    def add_exit_point(self, fromstate, tostate):
+        if hasattr(fromstate, 'eid'):
+            fromstate = fromstate.eid
+        if tostate is None:
+            self._cw.execute('INSERT SubWorkflowExitPoint X: T subworkflow_exit X, '
+                             'X subworkflow_state FS WHERE T eid %(t)s, FS eid %(fs)s',
+                             {'t': self.eid, 'fs': fromstate})
+        else:
+            if hasattr(tostate, 'eid'):
+                tostate = tostate.eid
+            self._cw.execute('INSERT SubWorkflowExitPoint X: T subworkflow_exit X, '
+                             'X subworkflow_state FS, X destination_state TS '
+                             'WHERE T eid %(t)s, FS eid %(fs)s, TS eid %(ts)s',
+                             {'t': self.eid, 'fs': fromstate, 'ts': tostate})
+    def get_exit_point(self, entity, stateeid):
+        """if state is an exit point, return its associated destination state"""
+        if hasattr(stateeid, 'eid'):
+            stateeid = stateeid.eid
+        try:
+            tostateeid = self.exit_points()[stateeid]
+        except KeyError:
+            return None
+        if tostateeid is None:
+            # go back to state from which we've entered the subworkflow
+            return entity.cw_adapt_to('IWorkflowable').subworkflow_input_trinfo().previous_state
+        return self._cw.entity_from_eid(tostateeid)
+    @cached
+    def exit_points(self):
+        result = {}
+        for ep in self.subworkflow_exit:
+            result[ep.subwf_state.eid] = ep.destination and ep.destination.eid
+        return result
+    def cw_clear_all_caches(self):
+        super(WorkflowTransition, self).cw_clear_all_caches()
+        clear_cache(self, 'exit_points')
+class SubWorkflowExitPoint(AnyEntity):
+    """customized class for SubWorkflowExitPoint entities"""
+    __regid__ = 'SubWorkflowExitPoint'
+    @property
+    def subwf_state(self):
+        return self.subworkflow_state[0]
+    @property
+    def destination(self):
+        return self.destination_state and self.destination_state[0] or None
+class State(AnyEntity):
+    """customized class for State entities"""
+    __regid__ = 'State'
+    fetch_attrs, cw_fetch_order = fetch_config(['name'])
+    rest_attr = 'eid'
+    def dc_long_title(self):
+        return '%s (%s)' % (, self._cw._(
+    @property
+    def workflow(self):
+        # take care, may be missing in multi-sources configuration
+        return self.state_of and self.state_of[0] or None
+class TrInfo(AnyEntity):
+    """customized class for Transition information entities
+    """
+    __regid__ = 'TrInfo'
+    fetch_attrs, cw_fetch_order = fetch_config(['creation_date', 'comment'],
+                                               pclass=None) # don't want modification_date
+    @property
+    def for_entity(self):
+        return self.wf_info_for[0]
+    @property
+    def previous_state(self):
+        return self.from_state[0]
+    @property
+    def new_state(self):
+        return self.to_state[0]
+    @property
+    def transition(self):
+        return self.by_transition and self.by_transition[0] or None
+class IWorkflowableAdapter(EntityAdapter):
+    """base adapter providing workflow helper methods for workflowable entities.
+    """
+    __regid__ = 'IWorkflowable'
+    __select__ = relation_possible('in_state')
+    @cached
+    def cwetype_workflow(self):
+        """return the default workflow for entities of this type"""
+        # XXX CWEType method
+        wfrset = self._cw.execute('Any WF WHERE ET default_workflow WF, '
+                                  'ET name %(et)s', {'et': text_type(self.entity.cw_etype)})
+        if wfrset:
+            return wfrset.get_entity(0, 0)
+        self.warning("can't find any workflow for %s", self.entity.cw_etype)
+        return None
+    @property
+    def main_workflow(self):
+        """return current workflow applied to this entity"""
+        if self.entity.custom_workflow:
+            return self.entity.custom_workflow[0]
+        return self.cwetype_workflow()
+    @property
+    def current_workflow(self):
+        """return current workflow applied to this entity"""
+        return self.current_state and self.current_state.workflow or self.main_workflow
+    @property
+    def current_state(self):
+        """return current state entity"""
+        return self.entity.in_state and self.entity.in_state[0] or None
+    @property
+    def state(self):
+        """return current state name"""
+        try:
+            return
+        except AttributeError:
+            self.warning('entity %s has no state', self.entity)
+            return None
+    @property
+    def printable_state(self):
+        """return current state name translated to context's language"""
+        state = self.current_state
+        if state:
+            return self._cw._(
+        return u''
+    @property
+    def workflow_history(self):
+        """return the workflow history for this entity (eg ordered list of
+        TrInfo entities)
+        """
+        return self.entity.reverse_wf_info_for
+    def latest_trinfo(self):
+        """return the latest transition information for this entity"""
+        try:
+            return self.workflow_history[-1]
+        except IndexError:
+            return None
+    def possible_transitions(self, type='normal'):
+        """generates transition that MAY be fired for the given entity,
+        expected to be in this state
+        used only by the UI
+        """
+        if self.current_state is None or self.current_workflow is None:
+            return
+        rset = self._cw.execute(
+            'Any T,TT, TN WHERE S allowed_transition T, S eid %(x)s, '
+            'T type TT, T type %(type)s, '
+            'T name TN, T transition_of WF, WF eid %(wfeid)s',
+            {'x': self.current_state.eid, 'type': text_type(type),
+             'wfeid': self.current_workflow.eid})
+        for tr in rset.entities():
+            if tr.may_be_fired(self.entity.eid):
+                yield tr
+    def subworkflow_input_trinfo(self):
+        """return the TrInfo which has be recorded when this entity went into
+        the current sub-workflow
+        """
+        if self.main_workflow.eid == self.current_workflow.eid:
+            return # doesn't make sense
+        subwfentries = []
+        for trinfo in self.workflow_history:
+            if (trinfo.transition and
+                trinfo.previous_state.workflow.eid != trinfo.new_state.workflow.eid):
+                # entering or leaving a subworkflow
+                if (subwfentries and
+                    subwfentries[-1].new_state.workflow.eid == trinfo.previous_state.workflow.eid and
+                    subwfentries[-1].previous_state.workflow.eid == trinfo.new_state.workflow.eid):
+                    # leave
+                    del subwfentries[-1]
+                else:
+                    # enter
+                    subwfentries.append(trinfo)
+        if not subwfentries:
+            return None
+        return subwfentries[-1]
+    def subworkflow_input_transition(self):
+        """return the transition which has went through the current sub-workflow
+        """
+        return getattr(self.subworkflow_input_trinfo(), 'transition', None)
+    def _add_trinfo(self, comment, commentformat, treid=None, tseid=None):
+        kwargs = {}
+        if comment is not None:
+            kwargs['comment'] = comment
+            if commentformat is not None:
+                kwargs['comment_format'] = commentformat
+        kwargs['wf_info_for'] = self.entity
+        if treid is not None:
+            kwargs['by_transition'] = self._cw.entity_from_eid(treid)
+        if tseid is not None:
+            kwargs['to_state'] = self._cw.entity_from_eid(tseid)
+        return self._cw.create_entity('TrInfo', **kwargs)
+    def _get_transition(self, tr):
+        assert self.current_workflow
+        if isinstance(tr, string_types):
+            _tr = self.current_workflow.transition_by_name(tr)
+            assert _tr is not None, 'not a %s transition: %s' % (
+                self.__regid__, tr)
+            tr = _tr
+        return tr
+    def fire_transition(self, tr, comment=None, commentformat=None):
+        """change the entity's state by firing given transition (name or entity)
+        in entity's workflow
+        """
+        tr = self._get_transition(tr)
+        return self._add_trinfo(comment, commentformat, tr.eid)
+    def fire_transition_if_possible(self, tr, comment=None, commentformat=None):
+        """change the entity's state by firing given transition (name or entity)
+        in entity's workflow if this transition is possible
+        """
+        tr = self._get_transition(tr)
+        if any(tr_ for tr_ in self.possible_transitions()
+               if tr_.eid == tr.eid):
+            self.fire_transition(tr, comment, commentformat)
+    def change_state(self, statename, comment=None, commentformat=None, tr=None):
+        """change the entity's state to the given state (name or entity) in
+        entity's workflow. This method should only by used by manager to fix an
+        entity's state when their is no matching transition, otherwise
+        fire_transition should be used.
+        """
+        assert self.current_workflow
+        if hasattr(statename, 'eid'):
+            stateeid = statename.eid
+        else:
+            state = self.current_workflow.state_by_name(statename)
+            if state is None:
+                raise WorkflowException('not a %s state: %s' % (self.__regid__,
+                                                                statename))
+            stateeid = state.eid
+        # XXX try to find matching transition?
+        return self._add_trinfo(comment, commentformat, tr and tr.eid, stateeid)
+    def set_initial_state(self, statename):
+        """set a newly created entity's state to the given state (name or entity)
+        in entity's workflow. This is useful if you don't want it to be the
+        workflow's initial state.
+        """
+        assert self.current_workflow
+        if hasattr(statename, 'eid'):
+            stateeid = statename.eid
+        else:
+            state = self.current_workflow.state_by_name(statename)
+            if state is None:
+                raise WorkflowException('not a %s state: %s' % (self.__regid__,
+                                                                statename))
+            stateeid = state.eid
+        self._cw.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
+                         {'x': self.entity.eid, 's': stateeid})