cubicweb/wfutils.py
author Philippe Pepiot <ph@itsalwaysdns.eu>
Mon, 30 Mar 2020 15:46:12 +0200
changeset 12963 dd9e98b25213
parent 12567 26744ad37953
permissions -rw-r--r--
[server] dynamically close idle database connections When pool hasn't been empty for `idle_timeout` time, start closing connections.

# copyright 2017 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""Workflow setup utilities.

These functions work with a declarative workflow definition:

.. code-block:: python

        {
            'etypes': 'CWGroup',
            'default': True,
            'initial_state': u'draft',
            'states': [u'draft', u'published'],
            'transitions': {
                u'publish': {
                    'fromstates': u'draft',
                    'tostate': u'published',
                    'requiredgroups': u'managers'
                    'conditions': (
                        'U in_group X',
                        'X owned_by U'
                    )
                }
            }
        }

.. autofunction:: setup_workflow
.. autofunction:: cleanupworkflow
"""

import collections

from cubicweb import NoResultError


def get_tuple_or_list(value):
    if value is None:
        return None
    if not isinstance(value, (tuple, list)):
        value = (value,)
    return value


def cleanupworkflow(cnx, wf, wfdef):
    """Cleanup an existing workflow by removing the states and transitions that
    do not exist in the given definition.

    :param cnx: A connexion with enough permissions to define a workflow
    :param wf: A `Workflow` entity
    :param wfdef: A workflow definition
    """
    cnx.execute(
        'DELETE State S WHERE S state_of WF, WF eid %%(wf)s, '
        'NOT S name IN (%s)' % (
            ', '.join('"%s"' % s for s in wfdef['states'])),
        {'wf': wf.eid})

    cnx.execute(
        'DELETE Transition T WHERE T transition_of WF, WF eid %%(wf)s, '
        'NOT T name IN (%s)' % (
            ', '.join('"%s"' % s for s in wfdef['transitions'])),
        {'wf': wf.eid})


def setup_workflow(cnx, name, wfdef, cleanup=True):
    """Create or update a workflow definition so it matches the given
    definition.

    :param cnx: A connexion with enough permissions to define a workflow
    :param name: The workflow name. Used to create the `Workflow` entity, or
                 to find an existing one.
    :param wfdef: A workflow definition.
    :param cleanup: Remove extra states and transitions. Can be done separatly
                    by calling :func:`cleanupworkflow`.
    :return: The created/updated workflow entity
    """
    try:
        wf = cnx.find('Workflow', name=name).one()
    except NoResultError:
        wf = cnx.create_entity('Workflow', name=name)

    etypes = get_tuple_or_list(wfdef['etypes'])
    cnx.execute('DELETE WF workflow_of ETYPE WHERE WF eid %%(wf)s, '
                'NOT ETYPE name IN (%s)' % ','.join('"%s"' for e in etypes),
                {'wf': wf.eid})
    cnx.execute('SET WF workflow_of ETYPE WHERE'
                ' NOT WF workflow_of ETYPE, WF eid %%(wf)s, ETYPE name IN (%s)'
                % ','.join('"%s"' % e for e in etypes),
                {'wf': wf.eid})
    if wfdef['default']:
        cnx.execute(
            'SET ETYPE default_workflow X '
            'WHERE '
            'NOT ETYPE default_workflow X, '
            'X eid %%(x)s, ETYPE name IN (%s)' % ','.join(
                '"%s"' % e for e in etypes),
            {'x': wf.eid})

    states = {}
    states_transitions = collections.defaultdict(list)
    for state in wfdef['states']:
        st = wf.state_by_name(state) or wf.add_state(state)
        states[state] = st

    if 'initial_state' in wfdef:
        wf.cw_set(initial_state=states[wfdef['initial_state']])

    for trname, trdef in wfdef['transitions'].items():
        tr = (wf.transition_by_name(trname)
              or cnx.create_entity('Transition', name=trname))
        tr.cw_set(transition_of=wf)
        if trdef.get('tostate'):
            tr.cw_set(destination_state=states[trdef['tostate']])
        fromstates = get_tuple_or_list(trdef.get('fromstates', ()))
        for stname in fromstates:
            states_transitions[stname].append(tr)

        requiredgroups = get_tuple_or_list(trdef.get('requiredgroups', ()))
        conditions = get_tuple_or_list(trdef.get('conditions', ()))

        tr.set_permissions(requiredgroups, conditions, reset=True)

    for stname, transitions in states_transitions.items():
        state = states[stname]
        state.cw_set(allowed_transition=None)
        state.cw_set(allowed_transition=transitions)

    if cleanup:
        cleanupworkflow(cnx, wf, wfdef)

    return wf