diff -r 36851c8b6763 -r 64ecd4d96ac7 cubicweb/wfutils.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cubicweb/wfutils.py Mon May 11 13:57:34 2015 +0200 @@ -0,0 +1,149 @@ +# copyright 2017 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . +"""Workflow setup utilities. + +These functions work with a declarative workflow definition: + +.. code-block:: python + + { + 'etypes': 'CWGroup', + 'default': True, + 'initial_state': u'draft', + 'states': [u'draft', u'published'], + 'transitions': { + u'publish': { + 'fromstates': u'draft', + 'tostate': u'published', + 'requiredgroups': u'managers' + 'conditions': ( + 'U in_group X', + 'X owned_by U' + ) + } + } + } + +.. autofunction:: setup_workflow +.. autofunction:: cleanupworkflow +""" + +import collections + +from six import text_type + +from cubicweb import NoResultError + + +def get_tuple_or_list(value): + if value is None: + return None + if not isinstance(value, (tuple, list)): + value = (value,) + return value + + +def cleanupworkflow(cnx, wf, wfdef): + """Cleanup an existing workflow by removing the states and transitions that + do not exist in the given definition. + + :param cnx: A connexion with enough permissions to define a workflow + :param wf: A `Workflow` entity + :param wfdef: A workflow definition + """ + cnx.execute( + 'DELETE State S WHERE S state_of WF, WF eid %%(wf)s, ' + 'NOT S name IN (%s)' % ( + ', '.join('"%s"' % s for s in wfdef['states'])), + {'wf': wf.eid}) + + cnx.execute( + 'DELETE Transition T WHERE T transition_of WF, WF eid %%(wf)s, ' + 'NOT T name IN (%s)' % ( + ', '.join('"%s"' % s for s in wfdef['transitions'])), + {'wf': wf.eid}) + + +def setup_workflow(cnx, name, wfdef, cleanup=True): + """Create or update a workflow definition so it matches the given + definition. + + :param cnx: A connexion with enough permissions to define a workflow + :param name: The workflow name. Used to create the `Workflow` entity, or + to find an existing one. + :param wfdef: A workflow definition. + :param cleanup: Remove extra states and transitions. Can be done separatly + by calling :func:`cleanupworkflow`. + :return: The created/updated workflow entity + """ + name = text_type(name) + try: + wf = cnx.find('Workflow', name=name).one() + except NoResultError: + wf = cnx.create_entity('Workflow', name=name) + + etypes = get_tuple_or_list(wfdef['etypes']) + cnx.execute('DELETE WF workflow_of ETYPE WHERE WF eid %%(wf)s, ' + 'NOT ETYPE name IN (%s)' % ','.join('"%s"' for e in etypes), + {'wf': wf.eid}) + cnx.execute('SET WF workflow_of ETYPE WHERE' + ' NOT WF workflow_of ETYPE, WF eid %%(wf)s, ETYPE name IN (%s)' + % ','.join('"%s"' % e for e in etypes), + {'wf': wf.eid}) + if wfdef['default']: + cnx.execute( + 'SET ETYPE default_workflow X ' + 'WHERE ' + 'NOT ETYPE default_workflow X, ' + 'X eid %%(x)s, ETYPE name IN (%s)' % ','.join( + '"%s"' % e for e in etypes), + {'x': wf.eid}) + + states = {} + states_transitions = collections.defaultdict(list) + for state in wfdef['states']: + st = wf.state_by_name(state) or wf.add_state(state) + states[state] = st + + if 'initial_state' in wfdef: + wf.cw_set(initial_state=states[wfdef['initial_state']]) + + for trname, trdef in wfdef['transitions'].items(): + tr = (wf.transition_by_name(trname) or + cnx.create_entity('Transition', name=trname)) + tr.cw_set(transition_of=wf) + if trdef.get('tostate'): + tr.cw_set(destination_state=states[trdef['tostate']]) + fromstates = get_tuple_or_list(trdef.get('fromstates', ())) + for stname in fromstates: + states_transitions[stname].append(tr) + + requiredgroups = get_tuple_or_list(trdef.get('requiredgroups', ())) + conditions = get_tuple_or_list(trdef.get('conditions', ())) + + tr.set_permissions(requiredgroups, conditions, reset=True) + + for stname, transitions in states_transitions.items(): + state = states[stname] + state.cw_set(allowed_transition=None) + state.cw_set(allowed_transition=transitions) + + if cleanup: + cleanupworkflow(cnx, wf, wfdef) + + return wf