entities/wfobjs.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
--- a/entities/wfobjs.py	Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,589 +0,0 @@
-# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
-"""workflow handling:
-
-* entity types defining workflow (Workflow, State, Transition...)
-* workflow history (TrInfo)
-* adapter for workflowable entities (IWorkflowableAdapter)
-"""
-from __future__ import print_function
-
-__docformat__ = "restructuredtext en"
-
-from six import text_type, string_types
-
-from logilab.common.decorators import cached, clear_cache
-from logilab.common.deprecation import deprecated
-
-from cubicweb.entities import AnyEntity, fetch_config
-from cubicweb.view import EntityAdapter
-from cubicweb.predicates import relation_possible
-
-
-try:
-    from cubicweb import server
-except ImportError:
-    # We need to lookup DEBUG from there,
-    # however a pure dbapi client may not have it.
-    class server(object): pass
-    server.DEBUG = False
-
-
-class WorkflowException(Exception): pass
-
-class Workflow(AnyEntity):
-    __regid__ = 'Workflow'
-
-    @property
-    def initial(self):
-        """return the initial state for this workflow"""
-        return self.initial_state and self.initial_state[0] or None
-
-    def is_default_workflow_of(self, etype):
-        """return True if this workflow is the default workflow for the given
-        entity type
-        """
-        return any(et for et in self.reverse_default_workflow
-                   if et.name == etype)
-
-    def iter_workflows(self, _done=None):
-        """return an iterator on actual workflows, eg this workflow and its
-        subworkflows
-        """
-        # infinite loop safety belt
-        if _done is None:
-            _done = set()
-        yield self
-        _done.add(self.eid)
-        for tr in self._cw.execute('Any T WHERE T is WorkflowTransition, '
-                                   'T transition_of WF, WF eid %(wf)s',
-                                   {'wf': self.eid}).entities():
-            if tr.subwf.eid in _done:
-                continue
-            for subwf in tr.subwf.iter_workflows(_done):
-                yield subwf
-
-    # state / transitions accessors ############################################
-
-    def state_by_name(self, statename):
-        rset = self._cw.execute('Any S, SN WHERE S name SN, S name %(n)s, '
-                                'S state_of WF, WF eid %(wf)s',
-                                {'n': statename, 'wf': self.eid})
-        if rset:
-            return rset.get_entity(0, 0)
-        return None
-
-    def state_by_eid(self, eid):
-        rset = self._cw.execute('Any S, SN WHERE S name SN, S eid %(s)s, '
-                                'S state_of WF, WF eid %(wf)s',
-                                {'s': eid, 'wf': self.eid})
-        if rset:
-            return rset.get_entity(0, 0)
-        return None
-
-    def transition_by_name(self, trname):
-        rset = self._cw.execute('Any T, TN WHERE T name TN, T name %(n)s, '
-                                'T transition_of WF, WF eid %(wf)s',
-                                {'n': text_type(trname), 'wf': self.eid})
-        if rset:
-            return rset.get_entity(0, 0)
-        return None
-
-    def transition_by_eid(self, eid):
-        rset = self._cw.execute('Any T, TN WHERE T name TN, T eid %(t)s, '
-                                'T transition_of WF, WF eid %(wf)s',
-                                {'t': eid, 'wf': self.eid})
-        if rset:
-            return rset.get_entity(0, 0)
-        return None
-
-    # wf construction methods ##################################################
-
-    def add_state(self, name, initial=False, **kwargs):
-        """add a state to this workflow"""
-        state = self._cw.create_entity('State', name=text_type(name), **kwargs)
-        self._cw.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
-                         {'s': state.eid, 'wf': self.eid})
-        if initial:
-            assert not self.initial, "Initial state already defined as %s" % self.initial
-            self._cw.execute('SET WF initial_state S '
-                             'WHERE S eid %(s)s, WF eid %(wf)s',
-                             {'s': state.eid, 'wf': self.eid})
-        return state
-
-    def _add_transition(self, trtype, name, fromstates,
-                        requiredgroups=(), conditions=(), **kwargs):
-        tr = self._cw.create_entity(trtype, name=text_type(name), **kwargs)
-        self._cw.execute('SET T transition_of WF '
-                         'WHERE T eid %(t)s, WF eid %(wf)s',
-                         {'t': tr.eid, 'wf': self.eid})
-        assert fromstates, fromstates
-        if not isinstance(fromstates, (tuple, list)):
-            fromstates = (fromstates,)
-        for state in fromstates:
-            if hasattr(state, 'eid'):
-                state = state.eid
-            self._cw.execute('SET S allowed_transition T '
-                             'WHERE S eid %(s)s, T eid %(t)s',
-                             {'s': state, 't': tr.eid})
-        tr.set_permissions(requiredgroups, conditions, reset=False)
-        return tr
-
-    def add_transition(self, name, fromstates, tostate=None,
-                       requiredgroups=(), conditions=(), **kwargs):
-        """add a transition to this workflow from some state(s) to another"""
-        tr = self._add_transition('Transition', name, fromstates,
-                                  requiredgroups, conditions, **kwargs)
-        if tostate is not None:
-            if hasattr(tostate, 'eid'):
-                tostate = tostate.eid
-            self._cw.execute('SET T destination_state S '
-                             'WHERE S eid %(s)s, T eid %(t)s',
-                             {'t': tr.eid, 's': tostate})
-        return tr
-
-    def add_wftransition(self, name, subworkflow, fromstates, exitpoints=(),
-                         requiredgroups=(), conditions=(), **kwargs):
-        """add a workflow transition to this workflow"""
-        tr = self._add_transition('WorkflowTransition', name, fromstates,
-                                  requiredgroups, conditions, **kwargs)
-        if hasattr(subworkflow, 'eid'):
-            subworkflow = subworkflow.eid
-        assert self._cw.execute('SET T subworkflow WF WHERE WF eid %(wf)s,T eid %(t)s',
-                                {'t': tr.eid, 'wf': subworkflow})
-        for fromstate, tostate in exitpoints:
-            tr.add_exit_point(fromstate, tostate)
-        return tr
-
-    def replace_state(self, todelstate, replacement):
-        """migration convenience method"""
-        if not hasattr(todelstate, 'eid'):
-            todelstate = self.state_by_name(todelstate)
-        if not hasattr(replacement, 'eid'):
-            replacement = self.state_by_name(replacement)
-        args = {'os': todelstate.eid, 'ns': replacement.eid}
-        execute = self._cw.execute
-        execute('SET X in_state NS WHERE X in_state OS, '
-                'NS eid %(ns)s, OS eid %(os)s', args)
-        execute('SET X from_state NS WHERE X from_state OS, '
-                'OS eid %(os)s, NS eid %(ns)s', args)
-        execute('SET X to_state NS WHERE X to_state OS, '
-                'OS eid %(os)s, NS eid %(ns)s', args)
-        todelstate.cw_delete()
-
-
-class BaseTransition(AnyEntity):
-    """customized class for abstract transition
-
-    provides a specific may_be_fired method to check if the relation may be
-    fired by the logged user
-    """
-    __regid__ = 'BaseTransition'
-    fetch_attrs, cw_fetch_order = fetch_config(['name', 'type'])
-
-    def __init__(self, *args, **kwargs):
-        if self.cw_etype == 'BaseTransition':
-            raise WorkflowException('should not be instantiated')
-        super(BaseTransition, self).__init__(*args, **kwargs)
-
-    @property
-    def workflow(self):
-        return self.transition_of[0]
-
-    def has_input_state(self, state):
-        if hasattr(state, 'eid'):
-            state = state.eid
-        return any(s for s in self.reverse_allowed_transition if s.eid == state)
-
-    def may_be_fired(self, eid):
-        """return true if the logged user may fire this transition
-
-        `eid` is the eid of the object on which we may fire the transition
-        """
-        DBG = False
-        if server.DEBUG & server.DBG_SEC:
-            if 'transition' in server._SECURITY_CAPS:
-                DBG = True
-        user = self._cw.user
-        # check user is at least in one of the required groups if any
-        groups = frozenset(g.name for g in self.require_group)
-        if groups:
-            matches = user.matching_groups(groups)
-            if matches:
-                if DBG:
-                    print('may_be_fired: %r may fire: user matches %s' % (self.name, groups))
-                return matches
-            if 'owners' in groups and user.owns(eid):
-                if DBG:
-                    print('may_be_fired: %r may fire: user is owner' % self.name)
-                return True
-        # check one of the rql expression conditions matches if any
-        if self.condition:
-            if DBG:
-                print('my_be_fired: %r: %s' %
-                      (self.name, [(rqlexpr.expression,
-                                    rqlexpr.check_expression(self._cw, eid))
-                                    for rqlexpr in self.condition]))
-            for rqlexpr in self.condition:
-                if rqlexpr.check_expression(self._cw, eid):
-                    return True
-        if self.condition or groups:
-            return False
-        return True
-
-    def set_permissions(self, requiredgroups=(), conditions=(), reset=True):
-        """set or add (if `reset` is False) groups and conditions for this
-        transition
-        """
-        if reset:
-            self._cw.execute('DELETE T require_group G WHERE T eid %(x)s',
-                             {'x': self.eid})
-            self._cw.execute('DELETE T condition R WHERE T eid %(x)s',
-                             {'x': self.eid})
-        for gname in requiredgroups:
-            rset = self._cw.execute('SET T require_group G '
-                                    'WHERE T eid %(x)s, G name %(gn)s',
-                                    {'x': self.eid, 'gn': text_type(gname)})
-            assert rset, '%s is not a known group' % gname
-        if isinstance(conditions, string_types):
-            conditions = (conditions,)
-        for expr in conditions:
-            if isinstance(expr, string_types):
-                kwargs = {'expr': text_type(expr)}
-            else:
-                assert isinstance(expr, dict)
-                kwargs = expr
-            kwargs['x'] = self.eid
-            kwargs.setdefault('mainvars', u'X')
-            self._cw.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
-                             'X expression %(expr)s, X mainvars %(mainvars)s, '
-                             'T condition X WHERE T eid %(x)s', kwargs)
-        # XXX clear caches?
-
-
-class Transition(BaseTransition):
-    """customized class for Transition entities"""
-    __regid__ = 'Transition'
-
-    def dc_long_title(self):
-        return '%s (%s)' % (self.name, self._cw._(self.name))
-
-    def destination(self, entity):
-        try:
-            return self.destination_state[0]
-        except IndexError:
-            return entity.cw_adapt_to('IWorkflowable').latest_trinfo().previous_state
-
-    def potential_destinations(self):
-        try:
-            yield self.destination_state[0]
-        except IndexError:
-            for incomingstate in self.reverse_allowed_transition:
-                for tr in incomingstate.reverse_destination_state:
-                    for previousstate in tr.reverse_allowed_transition:
-                        yield previousstate
-
-
-class WorkflowTransition(BaseTransition):
-    """customized class for WorkflowTransition entities"""
-    __regid__ = 'WorkflowTransition'
-
-    @property
-    def subwf(self):
-        return self.subworkflow[0]
-
-    def destination(self, entity):
-        return self.subwf.initial
-
-    def potential_destinations(self):
-        yield self.subwf.initial
-
-    def add_exit_point(self, fromstate, tostate):
-        if hasattr(fromstate, 'eid'):
-            fromstate = fromstate.eid
-        if tostate is None:
-            self._cw.execute('INSERT SubWorkflowExitPoint X: T subworkflow_exit X, '
-                             'X subworkflow_state FS WHERE T eid %(t)s, FS eid %(fs)s',
-                             {'t': self.eid, 'fs': fromstate})
-        else:
-            if hasattr(tostate, 'eid'):
-                tostate = tostate.eid
-            self._cw.execute('INSERT SubWorkflowExitPoint X: T subworkflow_exit X, '
-                             'X subworkflow_state FS, X destination_state TS '
-                             'WHERE T eid %(t)s, FS eid %(fs)s, TS eid %(ts)s',
-                             {'t': self.eid, 'fs': fromstate, 'ts': tostate})
-
-    def get_exit_point(self, entity, stateeid):
-        """if state is an exit point, return its associated destination state"""
-        if hasattr(stateeid, 'eid'):
-            stateeid = stateeid.eid
-        try:
-            tostateeid = self.exit_points()[stateeid]
-        except KeyError:
-            return None
-        if tostateeid is None:
-            # go back to state from which we've entered the subworkflow
-            return entity.cw_adapt_to('IWorkflowable').subworkflow_input_trinfo().previous_state
-        return self._cw.entity_from_eid(tostateeid)
-
-    @cached
-    def exit_points(self):
-        result = {}
-        for ep in self.subworkflow_exit:
-            result[ep.subwf_state.eid] = ep.destination and ep.destination.eid
-        return result
-
-    def cw_clear_all_caches(self):
-        super(WorkflowTransition, self).cw_clear_all_caches()
-        clear_cache(self, 'exit_points')
-
-
-class SubWorkflowExitPoint(AnyEntity):
-    """customized class for SubWorkflowExitPoint entities"""
-    __regid__ = 'SubWorkflowExitPoint'
-
-    @property
-    def subwf_state(self):
-        return self.subworkflow_state[0]
-
-    @property
-    def destination(self):
-        return self.destination_state and self.destination_state[0] or None
-
-
-class State(AnyEntity):
-    """customized class for State entities"""
-    __regid__ = 'State'
-    fetch_attrs, cw_fetch_order = fetch_config(['name'])
-    rest_attr = 'eid'
-
-    def dc_long_title(self):
-        return '%s (%s)' % (self.name, self._cw._(self.name))
-
-    @property
-    def workflow(self):
-        # take care, may be missing in multi-sources configuration
-        return self.state_of and self.state_of[0] or None
-
-
-class TrInfo(AnyEntity):
-    """customized class for Transition information entities
-    """
-    __regid__ = 'TrInfo'
-    fetch_attrs, cw_fetch_order = fetch_config(['creation_date', 'comment'],
-                                               pclass=None) # don't want modification_date
-    @property
-    def for_entity(self):
-        return self.wf_info_for[0]
-
-    @property
-    def previous_state(self):
-        return self.from_state[0]
-
-    @property
-    def new_state(self):
-        return self.to_state[0]
-
-    @property
-    def transition(self):
-        return self.by_transition and self.by_transition[0] or None
-
-
-
-class IWorkflowableAdapter(EntityAdapter):
-    """base adapter providing workflow helper methods for workflowable entities.
-    """
-    __regid__ = 'IWorkflowable'
-    __select__ = relation_possible('in_state')
-
-    @cached
-    def cwetype_workflow(self):
-        """return the default workflow for entities of this type"""
-        # XXX CWEType method
-        wfrset = self._cw.execute('Any WF WHERE ET default_workflow WF, '
-                                  'ET name %(et)s', {'et': text_type(self.entity.cw_etype)})
-        if wfrset:
-            return wfrset.get_entity(0, 0)
-        self.warning("can't find any workflow for %s", self.entity.cw_etype)
-        return None
-
-    @property
-    def main_workflow(self):
-        """return current workflow applied to this entity"""
-        if self.entity.custom_workflow:
-            return self.entity.custom_workflow[0]
-        return self.cwetype_workflow()
-
-    @property
-    def current_workflow(self):
-        """return current workflow applied to this entity"""
-        return self.current_state and self.current_state.workflow or self.main_workflow
-
-    @property
-    def current_state(self):
-        """return current state entity"""
-        return self.entity.in_state and self.entity.in_state[0] or None
-
-    @property
-    def state(self):
-        """return current state name"""
-        try:
-            return self.current_state.name
-        except AttributeError:
-            self.warning('entity %s has no state', self.entity)
-            return None
-
-    @property
-    def printable_state(self):
-        """return current state name translated to context's language"""
-        state = self.current_state
-        if state:
-            return self._cw._(state.name)
-        return u''
-
-    @property
-    def workflow_history(self):
-        """return the workflow history for this entity (eg ordered list of
-        TrInfo entities)
-        """
-        return self.entity.reverse_wf_info_for
-
-    def latest_trinfo(self):
-        """return the latest transition information for this entity"""
-        try:
-            return self.workflow_history[-1]
-        except IndexError:
-            return None
-
-    def possible_transitions(self, type='normal'):
-        """generates transition that MAY be fired for the given entity,
-        expected to be in this state
-        used only by the UI
-        """
-        if self.current_state is None or self.current_workflow is None:
-            return
-        rset = self._cw.execute(
-            'Any T,TT, TN WHERE S allowed_transition T, S eid %(x)s, '
-            'T type TT, T type %(type)s, '
-            'T name TN, T transition_of WF, WF eid %(wfeid)s',
-            {'x': self.current_state.eid, 'type': text_type(type),
-             'wfeid': self.current_workflow.eid})
-        for tr in rset.entities():
-            if tr.may_be_fired(self.entity.eid):
-                yield tr
-
-    def subworkflow_input_trinfo(self):
-        """return the TrInfo which has be recorded when this entity went into
-        the current sub-workflow
-        """
-        if self.main_workflow.eid == self.current_workflow.eid:
-            return # doesn't make sense
-        subwfentries = []
-        for trinfo in self.workflow_history:
-            if (trinfo.transition and
-                trinfo.previous_state.workflow.eid != trinfo.new_state.workflow.eid):
-                # entering or leaving a subworkflow
-                if (subwfentries and
-                    subwfentries[-1].new_state.workflow.eid == trinfo.previous_state.workflow.eid and
-                    subwfentries[-1].previous_state.workflow.eid == trinfo.new_state.workflow.eid):
-                    # leave
-                    del subwfentries[-1]
-                else:
-                    # enter
-                    subwfentries.append(trinfo)
-        if not subwfentries:
-            return None
-        return subwfentries[-1]
-
-    def subworkflow_input_transition(self):
-        """return the transition which has went through the current sub-workflow
-        """
-        return getattr(self.subworkflow_input_trinfo(), 'transition', None)
-
-    def _add_trinfo(self, comment, commentformat, treid=None, tseid=None):
-        kwargs = {}
-        if comment is not None:
-            kwargs['comment'] = comment
-            if commentformat is not None:
-                kwargs['comment_format'] = commentformat
-        kwargs['wf_info_for'] = self.entity
-        if treid is not None:
-            kwargs['by_transition'] = self._cw.entity_from_eid(treid)
-        if tseid is not None:
-            kwargs['to_state'] = self._cw.entity_from_eid(tseid)
-        return self._cw.create_entity('TrInfo', **kwargs)
-
-    def _get_transition(self, tr):
-        assert self.current_workflow
-        if isinstance(tr, string_types):
-            _tr = self.current_workflow.transition_by_name(tr)
-            assert _tr is not None, 'not a %s transition: %s' % (
-                self.__regid__, tr)
-            tr = _tr
-        return tr
-
-    def fire_transition(self, tr, comment=None, commentformat=None):
-        """change the entity's state by firing given transition (name or entity)
-        in entity's workflow
-        """
-        tr = self._get_transition(tr)
-        return self._add_trinfo(comment, commentformat, tr.eid)
-
-    def fire_transition_if_possible(self, tr, comment=None, commentformat=None):
-        """change the entity's state by firing given transition (name or entity)
-        in entity's workflow if this transition is possible
-        """
-        tr = self._get_transition(tr)
-        if any(tr_ for tr_ in self.possible_transitions()
-               if tr_.eid == tr.eid):
-            self.fire_transition(tr, comment, commentformat)
-
-    def change_state(self, statename, comment=None, commentformat=None, tr=None):
-        """change the entity's state to the given state (name or entity) in
-        entity's workflow. This method should only by used by manager to fix an
-        entity's state when their is no matching transition, otherwise
-        fire_transition should be used.
-        """
-        assert self.current_workflow
-        if hasattr(statename, 'eid'):
-            stateeid = statename.eid
-        else:
-            state = self.current_workflow.state_by_name(statename)
-            if state is None:
-                raise WorkflowException('not a %s state: %s' % (self.__regid__,
-                                                                statename))
-            stateeid = state.eid
-        # XXX try to find matching transition?
-        return self._add_trinfo(comment, commentformat, tr and tr.eid, stateeid)
-
-    def set_initial_state(self, statename):
-        """set a newly created entity's state to the given state (name or entity)
-        in entity's workflow. This is useful if you don't want it to be the
-        workflow's initial state.
-        """
-        assert self.current_workflow
-        if hasattr(statename, 'eid'):
-            stateeid = statename.eid
-        else:
-            state = self.current_workflow.state_by_name(statename)
-            if state is None:
-                raise WorkflowException('not a %s state: %s' % (self.__regid__,
-                                                                statename))
-            stateeid = state.eid
-        self._cw.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                         {'x': self.entity.eid, 's': stateeid})