"""workflow definition and history related entities

:organization: Logilab
:copyright: 2001-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
__docformat__ = "restructuredtext en"

from warnings import warn

from logilab.common.decorators import cached, clear_cache
from logilab.common.deprecation import deprecated
from logilab.common.compat import any

from cubicweb.entities import AnyEntity, fetch_config
from cubicweb.interfaces import IWorkflowable
from cubicweb.mixins import MI_REL_TRIGGERS


class WorkflowException(Exception):
    """error raised by the workflow entity classes of this module"""


class Workflow(AnyEntity):
    """customized class for Workflow entities

    provides accessors to the states and transitions of the workflow as well
    as helper methods to build or migrate a workflow definition
    """
    __regid__ = 'Workflow'

    @property
    def initial(self):
        """return the initial state for this workflow, or None if no initial
        state is set
        """
        return self.initial_state and self.initial_state[0] or None

    def is_default_workflow_of(self, etype):
        """return True if this workflow is the default workflow for the given
        entity type (`etype` is the name of the entity type)
        """
        return any(et for et in self.reverse_default_workflow
                   if et.name == etype)

    # XXX define parent() instead? what if workflow of multiple types?
    def after_deletion_path(self):
        """return (path, parameters) which should be used as redirect
        information when this entity is being deleted
        """
        if self.workflow_of:
            return self.workflow_of[0].rest_path(), {'vid': 'workflow'}
        return super(Workflow, self).after_deletion_path()

    def iter_workflows(self, _done=None):
        """return an iterator on actual workflows, eg this workflow and its
        subworkflows

        `_done` is the set of eids of already yielded workflows; it is shared
        with the recursive calls and should not be given by external callers
        """
        # infinite loop safety belt
        if _done is None:
            _done = set()
        yield self
        _done.add(self.eid)
        for tr in self._cw.execute('Any T WHERE T is WorkflowTransition, '
                                   'T transition_of WF, WF eid %(wf)s',
                                   {'wf': self.eid}).entities():
            if tr.subwf.eid in _done:
                continue
            for subwf in tr.subwf.iter_workflows(_done):
                yield subwf

    # state / transitions accessors ############################################

    def state_by_name(self, statename):
        """return the state of this workflow with the given name, or None if
        there is no such state
        """
        rset = self._cw.execute('Any S, SN WHERE S name SN, S name %(n)s, '
                                'S state_of WF, WF eid %(wf)s',
                                {'n': statename, 'wf': self.eid}, 'wf')
        if rset:
            return rset.get_entity(0, 0)
        return None

    def state_by_eid(self, eid):
        """return the state of this workflow with the given eid, or None if
        there is no such state
        """
        rset = self._cw.execute('Any S, SN WHERE S name SN, S eid %(s)s, '
                                'S state_of WF, WF eid %(wf)s',
                                {'s': eid, 'wf': self.eid}, ('wf', 's'))
        if rset:
            return rset.get_entity(0, 0)
        return None

    def transition_by_name(self, trname):
        """return the transition of this workflow with the given name, or None
        if there is no such transition
        """
        rset = self._cw.execute('Any T, TN WHERE T name TN, T name %(n)s, '
                                'T transition_of WF, WF eid %(wf)s',
                                {'n': trname, 'wf': self.eid}, 'wf')
        if rset:
            return rset.get_entity(0, 0)
        return None

    def transition_by_eid(self, eid):
        """return the transition of this workflow with the given eid, or None
        if there is no such transition
        """
        rset = self._cw.execute('Any T, TN WHERE T name TN, T eid %(t)s, '
                                'T transition_of WF, WF eid %(wf)s',
                                {'t': eid, 'wf': self.eid}, ('wf', 't'))
        if rset:
            return rset.get_entity(0, 0)
        return None

    # wf construction methods ##################################################

    def add_state(self, name, initial=False, **kwargs):
        """add a state to this workflow and return it

        if `initial` is true, the new state is also set as the initial state
        of the workflow (which must not have one yet). Extra keyword arguments
        are given to the State entity creation.
        """
        state = self._cw.create_entity('State', name=unicode(name), **kwargs)
        self._cw.execute('SET S state_of WF WHERE S eid %(s)s, WF eid %(wf)s',
                         {'s': state.eid, 'wf': self.eid}, ('s', 'wf'))
        if initial:
            assert not self.initial, "Initial state already defined as %s" % self.initial
            self._cw.execute('SET WF initial_state S '
                             'WHERE S eid %(s)s, WF eid %(wf)s',
                             {'s': state.eid, 'wf': self.eid}, ('s', 'wf'))
        return state

    def _add_transition(self, trtype, name, fromstates,
                        requiredgroups=(), conditions=(), **kwargs):
        """create a transition entity of type `trtype` in this workflow

        `fromstates` is a state or a non empty tuple / list of states (entities
        or eids) from which the transition is allowed. `requiredgroups` and
        `conditions` are given to the transition's `set_permissions` method.
        """
        tr = self._cw.create_entity(trtype, name=unicode(name), **kwargs)
        self._cw.execute('SET T transition_of WF '
                         'WHERE T eid %(t)s, WF eid %(wf)s',
                         {'t': tr.eid, 'wf': self.eid}, ('t', 'wf'))
        assert fromstates, fromstates
        if not isinstance(fromstates, (tuple, list)):
            fromstates = (fromstates,)
        for state in fromstates:
            # accept both state entities and state eids
            if hasattr(state, 'eid'):
                state = state.eid
            self._cw.execute('SET S allowed_transition T '
                             'WHERE S eid %(s)s, T eid %(t)s',
                             {'s': state, 't': tr.eid}, ('s', 't'))
        tr.set_permissions(requiredgroups, conditions, reset=False)
        return tr

    def add_transition(self, name, fromstates, tostate=None,
                       requiredgroups=(), conditions=(), **kwargs):
        """add a transition to this workflow from some state(s) to another

        when `tostate` is None, no destination state is set on the transition
        """
        tr = self._add_transition('Transition', name, fromstates,
                                  requiredgroups, conditions, **kwargs)
        if tostate is not None:
            if hasattr(tostate, 'eid'):
                tostate = tostate.eid
            self._cw.execute('SET T destination_state S '
                             'WHERE S eid %(s)s, T eid %(t)s',
                             {'t': tr.eid, 's': tostate}, ('s', 't'))
        return tr

    def add_wftransition(self, name, subworkflow, fromstates, exitpoints=(),
                         requiredgroups=(), conditions=(), **kwargs):
        """add a workflow transition to this workflow

        `subworkflow` is the sub-workflow (entity or eid) entered when the
        transition is fired, `exitpoints` a sequence of (fromstate, tostate)
        couples given to the transition's `add_exit_point` method
        """
        tr = self._add_transition('WorkflowTransition', name, fromstates,
                                  requiredgroups, conditions, **kwargs)
        if hasattr(subworkflow, 'eid'):
            subworkflow = subworkflow.eid
        # run the query outside of the assert statement: assertions are
        # stripped when python runs with -O, and the relation must be set
        # anyway
        rset = self._cw.execute('SET T subworkflow WF WHERE WF eid %(wf)s,T eid %(t)s',
                                {'t': tr.eid, 'wf': subworkflow}, ('wf', 't'))
        assert rset, 'could not set subworkflow %s' % subworkflow
        for fromstate, tostate in exitpoints:
            tr.add_exit_point(fromstate, tostate)
        return tr

    def replace_state(self, todelstate, replacement):
        """migration convenience method: replace state `todelstate` by state
        `replacement` then delete `todelstate`

        both arguments may be given as a state entity or as a state name
        looked up in this workflow. Entities in the deleted state as well as
        the from_state / to_state relations pointing to it are moved to the
        replacement state.
        """
        if not hasattr(todelstate, 'eid'):
            todelstate = self.state_by_name(todelstate)
        if not hasattr(replacement, 'eid'):
            replacement = self.state_by_name(replacement)
        execute = self._cw.execute
        args = {'os': todelstate.eid, 'ns': replacement.eid}
        # third argument only lists keys actually present in `args`, as done
        # by the other queries of this module
        eidkeys = ('os', 'ns')
        # each query only touches relations currently pointing to the old
        # state and redirects them to the new one
        execute('SET X in_state NS WHERE X in_state OS, '
                'OS eid %(os)s, NS eid %(ns)s', args, eidkeys)
        execute('SET X from_state NS WHERE X from_state OS, '
                'OS eid %(os)s, NS eid %(ns)s', args, eidkeys)
        execute('SET X to_state NS WHERE X to_state OS, '
                'OS eid %(os)s, NS eid %(ns)s', args, eidkeys)
        todelstate.delete()


class BaseTransition(AnyEntity):
    """customized class for abstract transition

    provides a specific may_be_fired method to check if the relation may be
    fired by the logged user
    """
    __regid__ = 'BaseTransition'
    fetch_attrs, fetch_order = fetch_config(['name'])

    def __init__(self, *args, **kwargs):
        # only concrete subclasses (which override __regid__) may be
        # instantiated
        if self.__regid__ == 'BaseTransition':
            raise WorkflowException('should not be instantiated')
        super(BaseTransition, self).__init__(*args, **kwargs)

    @property
    def workflow(self):
        """return the workflow this transition belongs to"""
        return self.transition_of[0]

    def has_input_state(self, state):
        """return true if this transition is allowed from the given state
        (entity or eid)
        """
        if hasattr(state, 'eid'):
            state = state.eid
        return any(s for s in self.reverse_allowed_transition if s.eid == state)

    def may_be_fired(self, eid):
        """return true if the logged user may fire this transition

        `eid` is the eid of the object on which we may fire the transition

        the returned value is the (non empty) result of the user's
        `matching_groups` method when the user is in one of the required
        groups, else a boolean
        """
        user = self._cw.user
        # check user is at least in one of the required groups if any
        groups = frozenset(g.name for g in self.require_group)
        if groups:
            matches = user.matching_groups(groups)
            if matches:
                return matches
            if 'owners' in groups and user.owns(eid):
                return True
        # check one of the rql expression conditions matches if any
        if self.condition:
            for rqlexpr in self.condition:
                if rqlexpr.check_expression(self._cw, eid):
                    return True
        # some group or condition is required but none matched
        if self.condition or groups:
            return False
        # no restriction at all
        return True

    def after_deletion_path(self):
        """return (path, parameters) which should be used as redirect
        information when this entity is being deleted
        """
        if self.transition_of:
            return self.transition_of[0].rest_path(), {}
        return super(BaseTransition, self).after_deletion_path()

    def set_permissions(self, requiredgroups=(), conditions=(), reset=True):
        """set or add (if `reset` is False) groups and conditions for this
        transition

        `requiredgroups` is a sequence of group names. `conditions` is either
        a single expression string or a sequence whose items are expression
        strings or dictionaries providing at least the 'expr' key (and
        optionally 'mainvars', which defaults to u'X').
        """
        if reset:
            self._cw.execute('DELETE T require_group G WHERE T eid %(x)s',
                             {'x': self.eid}, 'x')
            self._cw.execute('DELETE T condition R WHERE T eid %(x)s',
                             {'x': self.eid}, 'x')
        for gname in requiredgroups:
            rset = self._cw.execute('SET T require_group G '
                                    'WHERE T eid %(x)s, G name %(gn)s',
                                    {'x': self.eid, 'gn': gname}, 'x')
            assert rset, '%s is not a known group' % gname
        if isinstance(conditions, basestring):
            conditions = (conditions,)
        for expr in conditions:
            if isinstance(expr, basestring):
                kwargs = {'expr': unicode(expr)}
            else:
                assert isinstance(expr, dict)
                # work on a copy so the dictionary given by the caller isn't
                # modified by the keys added below
                kwargs = dict(expr)
            kwargs['x'] = self.eid
            kwargs.setdefault('mainvars', u'X')
            self._cw.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
                             'X expression %(expr)s, X mainvars %(mainvars)s, '
                             'T condition X WHERE T eid %(x)s', kwargs, 'x')
        # XXX clear caches?

    @deprecated('[3.6.1] use set_permissions')
    def set_transition_permissions(self, requiredgroups=(), conditions=(),
                                   reset=True):
        """deprecated alias for `set_permissions`"""
        return self.set_permissions(requiredgroups, conditions, reset)


class Transition(BaseTransition):
    """customized class for Transition entities"""
    __regid__ = 'Transition'

    def destination(self, entity):
        """return the state reached by `entity` when firing this transition

        when the transition has no destination state, this is the previous
        state of the entity's latest transition information
        """
        try:
            return self.destination_state[0]
        except IndexError:
            return entity.latest_trinfo().previous_state

    def potential_destinations(self):
        """generate states which may be reached by firing this transition

        when the transition has no destination state, yield the states from
        which one may have entered one of this transition's input states
        """
        try:
            yield self.destination_state[0]
        except IndexError:
            for incomingstate in self.reverse_allowed_transition:
                for tr in incomingstate.reverse_destination_state:
                    for previousstate in tr.reverse_allowed_transition:
                        yield previousstate

    def parent(self):
        """return the workflow this transition belongs to"""
        return self.workflow


class WorkflowTransition(BaseTransition):
    """customized class for WorkflowTransition entities"""
    __regid__ = 'WorkflowTransition'

    @property
    def subwf(self):
        """return the sub-workflow entered by this transition"""
        return self.subworkflow[0]

    def destination(self, entity):
        """return the initial state of the sub-workflow (`entity` is unused)"""
        return self.subwf.initial

    def potential_destinations(self):
        """generate the initial state of the sub-workflow"""
        yield self.subwf.initial

    def add_exit_point(self, fromstate, tostate):
        """add an exit point to this workflow transition

        `fromstate` is the sub-workflow state (entity or eid) which triggers
        the exit, `tostate` the destination state (entity or eid). When
        `tostate` is None, the exit point is created without destination
        state.
        """
        if hasattr(fromstate, 'eid'):
            fromstate = fromstate.eid
        if tostate is None:
            self._cw.execute('INSERT SubWorkflowExitPoint X: T subworkflow_exit X, '
                             'X subworkflow_state FS WHERE T eid %(t)s, FS eid %(fs)s',
                             {'t': self.eid, 'fs': fromstate}, ('t', 'fs'))
        else:
            if hasattr(tostate, 'eid'):
                tostate = tostate.eid
            self._cw.execute('INSERT SubWorkflowExitPoint X: T subworkflow_exit X, '
                             'X subworkflow_state FS, X destination_state TS '
                             'WHERE T eid %(t)s, FS eid %(fs)s, TS eid %(ts)s',
                             {'t': self.eid, 'fs': fromstate, 'ts': tostate},
                             ('t', 'fs', 'ts'))

    def get_exit_point(self, entity, stateeid):
        """if state is an exit point, return its associated destination state

        return None when the given state (entity or eid) isn't an exit point
        """
        if hasattr(stateeid, 'eid'):
            stateeid = stateeid.eid
        try:
            tostateeid = self.exit_points()[stateeid]
        except KeyError:
            return None
        if tostateeid is None:
            # go back to state from which we've entered the subworkflow
            return entity.subworkflow_input_trinfo().previous_state
        return self._cw.entity_from_eid(tostateeid)

    @cached
    def exit_points(self):
        """return a dictionary mapping the eid of each exit state of the
        sub-workflow to the eid of its destination state (falsy when the exit
        point has no destination)
        """
        result = {}
        for ep in self.subworkflow_exit:
            result[ep.subwf_state.eid] = ep.destination and ep.destination.eid
        return result

    def clear_all_caches(self):
        """clear caches, including the one of the `exit_points` method"""
        super(WorkflowTransition, self).clear_all_caches()
        clear_cache(self, 'exit_points')


class SubWorkflowExitPoint(AnyEntity):
    """customized class for SubWorkflowExitPoint entities"""
    __regid__ = 'SubWorkflowExitPoint'

    @property
    def subwf_state(self):
        """return the sub-workflow state associated to this exit point"""
        return self.subworkflow_state[0]

    @property
    def destination(self):
        """return the destination state of this exit point, or None if there
        is no destination state
        """
        return self.destination_state and self.destination_state[0] or None

    def parent(self):
        """return the workflow transition this exit point belongs to"""
        return self.reverse_subworkflow_exit[0]


class State(AnyEntity):
    """customized class for State entities"""
    __regid__ = 'State'
    fetch_attrs, fetch_order = fetch_config(['name'])
    rest_attr = 'eid'

    @property
    def workflow(self):
        """return the workflow this state belongs to (a falsy value if the
        state_of relation is empty)
        """
        # take care, may be missing in multi-sources configuration
        return self.state_of and self.state_of[0]

    def parent(self):
        """return the workflow this state belongs to"""
        return self.workflow


class TrInfo(AnyEntity):
    """customized class for Transition information entities
    """
    __regid__ = 'TrInfo'
    fetch_attrs, fetch_order = fetch_config(['creation_date', 'comment'],
                                            pclass=None) # don't want modification_date

    @property
    def for_entity(self):
        """return the entity this transition information is about"""
        return self.wf_info_for[0]

    @property
    def previous_state(self):
        """return the state the entity was in before the transition"""
        return self.from_state[0]

    @property
    def new_state(self):
        """return the state the entity was in after the transition"""
        return self.to_state[0]

    @property
    def transition(self):
        """return the fired transition, or None if the by_transition relation
        is empty
        """
        return self.by_transition and self.by_transition[0] or None

    def parent(self):
        """return the entity this transition information is about"""
        return self.for_entity


class WorkflowableMixIn(object):
    """base mixin providing workflow helper methods for workflowable entities.
    This mixin will be automatically set on class supporting the 'in_state'
    relation (which implies supporting 'wf_info_for' as well)
    """
    __implements__ = (IWorkflowable,)

    @property
    def main_workflow(self):
        """return the main workflow applied to this entity: its custom
        workflow if any, else the default workflow of its entity type
        """
        if self.custom_workflow:
            return self.custom_workflow[0]
        return self.cwetype_workflow()

    @property
    def current_workflow(self):
        """return current workflow applied to this entity: the workflow of its
        current state if any, else its main workflow
        """
        return self.current_state and self.current_state.workflow or self.main_workflow

    @property
    def current_state(self):
        """return current state entity, or None if the entity has no state"""
        return self.in_state and self.in_state[0] or None

    @property
    def state(self):
        """return current state name, or None (after logging a warning) if the
        entity has no state
        """
        try:
            return self.in_state[0].name
        except IndexError:
            self.warning('entity %s has no state', self)
            return None

    @property
    def printable_state(self):
        """return current state name translated to context's language (an
        empty string if the entity has no state)
        """
        state = self.current_state
        if state:
            return self._cw._(state.name)
        return u''

    @property
    def workflow_history(self):
        """return the workflow history for this entity (eg ordered list of
        TrInfo entities)
        """
        return self.reverse_wf_info_for

    def latest_trinfo(self):
        """return the latest transition information for this entity, or None
        if its workflow history is empty
        """
        try:
            return self.reverse_wf_info_for[-1]
        except IndexError:
            return None

    @cached
    def cwetype_workflow(self):
        """return the default workflow for entities of this type, or None
        (after logging a warning) if there is no such workflow
        """
        # XXX CWEType method
        wfrset = self._cw.execute('Any WF WHERE ET default_workflow WF, '
                                  'ET name %(et)s', {'et': self.__regid__})
        if wfrset:
            return wfrset.get_entity(0, 0)
        self.warning("can't find any workflow for %s", self.__regid__)
        return None

    def possible_transitions(self, type='normal'):
        """generates transition that MAY be fired for the given entity,
        expected to be in this state

        only transitions of the given `type` allowed from the entity's current
        state in its current workflow are considered

        used only by the UI
        """
        if self.current_state is None or self.current_workflow is None:
            return
        rset = self._cw.execute(
            'Any T,TT, TN WHERE S allowed_transition T, S eid %(x)s, '
            'T type TT, T type %(type)s, '
            'T name TN, T transition_of WF, WF eid %(wfeid)s',
            {'x': self.current_state.eid, 'type': type,
             'wfeid': self.current_workflow.eid}, 'x')
        for tr in rset.entities():
            if tr.may_be_fired(self.eid):
                yield tr

    def _add_trinfo(self, comment, commentformat, treid=None, tseid=None):
        """create and return a TrInfo entity for this entity

        `treid` is the eid of the fired transition and `tseid` the eid of the
        destination state; arguments which are None are not given to the
        entity creation
        """
        kwargs = {}
        if comment is not None:
            kwargs['comment'] = comment
            if commentformat is not None:
                kwargs['comment_format'] = commentformat
        kwargs['wf_info_for'] = self
        if treid is not None:
            kwargs['by_transition'] = self._cw.entity_from_eid(treid)
        if tseid is not None:
            kwargs['to_state'] = self._cw.entity_from_eid(tseid)
        return self._cw.create_entity('TrInfo', **kwargs)

    def fire_transition(self, tr, comment=None, commentformat=None):
        """change the entity's state by firing transition of the given name in
        entity's workflow

        `tr` may also be given as a transition entity. Return the created
        TrInfo entity.
        """
        assert self.current_workflow
        if isinstance(tr, basestring):
            _tr = self.current_workflow.transition_by_name(tr)
            assert _tr is not None, 'not a %s transition: %s' % (
                self.__regid__, tr)
            tr = _tr
        return self._add_trinfo(comment, commentformat, tr.eid)

    def change_state(self, statename, comment=None, commentformat=None, tr=None):
        """change the entity's state to the given state (name or entity) in
        entity's workflow. This method should only be used by manager to fix an
        entity's state when there is no matching transition, otherwise
        fire_transition should be used.

        Raise `WorkflowException` if the state can't be found in the entity's
        current workflow. Return the created TrInfo entity.
        """
        assert self.current_workflow
        if hasattr(statename, 'eid'):
            stateeid = statename.eid
        else:
            if not isinstance(statename, basestring):
                # backward compatibility: state given by its eid
                warn('[3.5] give a state name', DeprecationWarning)
                state = self.current_workflow.state_by_eid(statename)
            else:
                state = self.current_workflow.state_by_name(statename)
            if state is None:
                raise WorkflowException('not a %s state: %s' % (self.__regid__,
                                                                statename))
            stateeid = state.eid
        # XXX try to find matching transition?
        return self._add_trinfo(comment, commentformat, tr and tr.eid, stateeid)

    def subworkflow_input_trinfo(self):
        """return the TrInfo which has been recorded when this entity went into
        the current sub-workflow

        return None if the entity isn't in a sub-workflow
        """
        if self.main_workflow.eid == self.current_workflow.eid:
            return # doesn't make sense
        # stack of TrInfo recorded when entering sub-workflows not left yet
        subwfentries = []
        for trinfo in self.workflow_history:
            if (trinfo.transition and
                trinfo.previous_state.workflow.eid != trinfo.new_state.workflow.eid):
                # entering or leaving a subworkflow
                if (subwfentries and
                    subwfentries[-1].new_state.workflow.eid == trinfo.previous_state.workflow.eid and
                    subwfentries[-1].previous_state.workflow.eid == trinfo.new_state.workflow.eid):
                    # leave
                    del subwfentries[-1]
                else:
                    # enter
                    subwfentries.append(trinfo)
        if not subwfentries:
            return None
        return subwfentries[-1]

    def subworkflow_input_transition(self):
        """return the transition which has gone through the current
        sub-workflow, or None if there is no such transition
        """
        return getattr(self.subworkflow_input_trinfo(), 'transition', None)

    def clear_all_caches(self):
        """clear caches, including the one of the `cwetype_workflow` method"""
        super(WorkflowableMixIn, self).clear_all_caches()
        clear_cache(self, 'cwetype_workflow')

    @deprecated('[3.5] get transition from current workflow and use its may_be_fired method')
    def can_pass_transition(self, trname):
        """return the Transition instance if the current user can fire the
        transition with the given name, else None
        """
        tr = self.current_workflow and self.current_workflow.transition_by_name(trname)
        if tr and tr.may_be_fired(self.eid):
            return tr

    @property
    @deprecated('[3.5] use printable_state')
    def displayable_state(self):
        """deprecated: return current state name translated to context's
        language
        """
        return self._cw._(self.state)


MI_REL_TRIGGERS[('in_state', 'subject')] = WorkflowableMixIn