cubicweb/wfutils.py
changeset 11963 64ecd4d96ac7
child 12355 c703dc95c82e
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/wfutils.py	Mon May 11 13:57:34 2015 +0200
@@ -0,0 +1,149 @@
+# copyright 2017 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+"""Workflow setup utilities.
+
+These functions work with a declarative workflow definition:
+
+.. code-block:: python
+
+        {
+            'etypes': 'CWGroup',
+            'default': True,
+            'initial_state': u'draft',
+            'states': [u'draft', u'published'],
+            'transitions': {
+                u'publish': {
+                    'fromstates': u'draft',
+                    'tostate': u'published',
+                    'requiredgroups': u'managers'
+                    'conditions': (
+                        'U in_group X',
+                        'X owned_by U'
+                    )
+                }
+            }
+        }
+
+.. autofunction:: setup_workflow
+.. autofunction:: cleanupworkflow
+"""
+
+import collections
+
+from six import text_type
+
+from cubicweb import NoResultError
+
+
+def get_tuple_or_list(value):
+    if value is None:
+        return None
+    if not isinstance(value, (tuple, list)):
+        value = (value,)
+    return value
+
+
+def cleanupworkflow(cnx, wf, wfdef):
+    """Cleanup an existing workflow by removing the states and transitions that
+    do not exist in the given definition.
+
+    :param cnx: A connexion with enough permissions to define a workflow
+    :param wf: A `Workflow` entity
+    :param wfdef: A workflow definition
+    """
+    cnx.execute(
+        'DELETE State S WHERE S state_of WF, WF eid %%(wf)s, '
+        'NOT S name IN (%s)' % (
+            ', '.join('"%s"' % s for s in wfdef['states'])),
+        {'wf': wf.eid})
+
+    cnx.execute(
+        'DELETE Transition T WHERE T transition_of WF, WF eid %%(wf)s, '
+        'NOT T name IN (%s)' % (
+            ', '.join('"%s"' % s for s in wfdef['transitions'])),
+        {'wf': wf.eid})
+
+
+def setup_workflow(cnx, name, wfdef, cleanup=True):
+    """Create or update a workflow definition so it matches the given
+    definition.
+
+    :param cnx: A connexion with enough permissions to define a workflow
+    :param name: The workflow name. Used to create the `Workflow` entity, or
+                 to find an existing one.
+    :param wfdef: A workflow definition.
+    :param cleanup: Remove extra states and transitions. Can be done separatly
+                    by calling :func:`cleanupworkflow`.
+    :return: The created/updated workflow entity
+    """
+    name = text_type(name)
+    try:
+        wf = cnx.find('Workflow', name=name).one()
+    except NoResultError:
+        wf = cnx.create_entity('Workflow', name=name)
+
+    etypes = get_tuple_or_list(wfdef['etypes'])
+    cnx.execute('DELETE WF workflow_of ETYPE WHERE WF eid %%(wf)s, '
+                'NOT ETYPE name IN (%s)' % ','.join('"%s"' for e in etypes),
+                {'wf': wf.eid})
+    cnx.execute('SET WF workflow_of ETYPE WHERE'
+                ' NOT WF workflow_of ETYPE, WF eid %%(wf)s, ETYPE name IN (%s)'
+                % ','.join('"%s"' % e for e in etypes),
+                {'wf': wf.eid})
+    if wfdef['default']:
+        cnx.execute(
+            'SET ETYPE default_workflow X '
+            'WHERE '
+            'NOT ETYPE default_workflow X, '
+            'X eid %%(x)s, ETYPE name IN (%s)' % ','.join(
+                '"%s"' % e for e in etypes),
+            {'x': wf.eid})
+
+    states = {}
+    states_transitions = collections.defaultdict(list)
+    for state in wfdef['states']:
+        st = wf.state_by_name(state) or wf.add_state(state)
+        states[state] = st
+
+    if 'initial_state' in wfdef:
+        wf.cw_set(initial_state=states[wfdef['initial_state']])
+
+    for trname, trdef in wfdef['transitions'].items():
+        tr = (wf.transition_by_name(trname) or
+              cnx.create_entity('Transition', name=trname))
+        tr.cw_set(transition_of=wf)
+        if trdef.get('tostate'):
+            tr.cw_set(destination_state=states[trdef['tostate']])
+        fromstates = get_tuple_or_list(trdef.get('fromstates', ()))
+        for stname in fromstates:
+            states_transitions[stname].append(tr)
+
+        requiredgroups = get_tuple_or_list(trdef.get('requiredgroups', ()))
+        conditions = get_tuple_or_list(trdef.get('conditions', ()))
+
+        tr.set_permissions(requiredgroups, conditions, reset=True)
+
+    for stname, transitions in states_transitions.items():
+        state = states[stname]
+        state.cw_set(allowed_transition=None)
+        state.cw_set(allowed_transition=transitions)
+
+    if cleanup:
+        cleanupworkflow(cnx, wf, wfdef)
+
+    return wf