[CWEP002] Add schema finalization checks for computed relations (rules)
Related to #3546717
# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""Tests for workflow construction, state changes, sub-workflows, custom
workflows, automatic transitions and workflow hooks.
"""

from cubicweb import ValidationError
from cubicweb.devtools.testlib import CubicWebTC


def add_wf(shell, etype, name=None, default=False):
    """Create a workflow for `etype` through the migration `shell`.

    The workflow name defaults to the entity type name.  The workflow is
    created with ``ensure_workflowable=False``.  Return whatever
    ``shell.add_workflow`` returns.
    """
    if name is None:
        name = etype
    return shell.add_workflow(name, etype, default=default,
                              ensure_workflowable=False)


def parse_hist(wfhist):
    """Flatten a workflow history into comparable tuples.

    Each item of `wfhist` yields ``(previous state name, new state name,
    transition name, comment)``.  When an item has no transition, the falsy
    ``ti.transition`` value itself is used in place of the name.
    """
    return [(ti.previous_state.name, ti.new_state.name,
             ti.transition and ti.transition.name, ti.comment)
            for ti in wfhist]


class WorkflowBuildingTC(CubicWebTC):
    """Checks on building workflows: states, transitions, name uniqueness."""

    def test_wf_construction(self):
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'Company')
            foo = wf.add_state(u'foo', initial=True)
            bar = wf.add_state(u'bar')
            self.assertEqual(wf.state_by_name('bar').eid, bar.eid)
            self.assertEqual(wf.state_by_name('barrr'), None)
            baz = wf.add_transition(u'baz', (foo,), bar, ('managers',))
            self.assertEqual(wf.transition_by_name('baz').eid, baz.eid)
            self.assertEqual(len(baz.require_group), 1)
            self.assertEqual(baz.require_group[0].name, 'managers')

    def test_duplicated_state(self):
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'Company')
            wf.add_state(u'foo', initial=True)
            shell.commit()
            wf.add_state(u'foo')
            with self.assertRaises(ValidationError) as cm:
                shell.commit()
            self.assertEqual({'name-subject': 'workflow already has a state of that name'},
                             cm.exception.errors)
            # no pb if not in the same workflow
            wf2 = add_wf(shell, 'Company')
            foo = wf2.add_state(u'foo', initial=True)
            shell.commit()
            # gnark gnark
            bar = wf.add_state(u'bar')
            shell.commit()
            bar.cw_set(name=u'foo')
            with self.assertRaises(ValidationError) as cm:
                shell.commit()
            self.assertEqual({'name-subject': 'workflow already has a state of that name'},
                             cm.exception.errors)

    def test_duplicated_transition(self):
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'Company')
            foo = wf.add_state(u'foo', initial=True)
            bar = wf.add_state(u'bar')
            wf.add_transition(u'baz', (foo,), bar, ('managers',))
            wf.add_transition(u'baz', (bar,), foo)
            with self.assertRaises(ValidationError) as cm:
                shell.commit()
            self.assertEqual(cm.exception.errors,
                             {'name-subject': 'workflow already has a transition of that name'})
            # no pb if not in the same workflow
            # (build on wf2, as test_duplicated_state does: the states and
            # transitions below previously went to `wf`, leaving wf2 unused)
            wf2 = add_wf(shell, 'Company')
            foo = wf2.add_state(u'foo', initial=True)
            bar = wf2.add_state(u'bar')
            wf2.add_transition(u'baz', (foo,), bar, ('managers',))
            shell.commit()
            # gnark gnark
            biz = wf2.add_transition(u'biz', (bar,), foo)
            shell.commit()
            biz.cw_set(name=u'baz')
            with self.assertRaises(ValidationError) as cm:
                shell.commit()
            self.assertEqual(cm.exception.errors,
                             {'name-subject': 'workflow already has a transition of that name'})


class WorkflowTC(CubicWebTC):
    """State changes, transition firing and sub-workflows on entities."""

    def setup_database(self):
        rschema = self.schema['in_state']
        for rdef in rschema.rdefs.itervalues():
            self.assertEqual(rdef.cardinality, '1*')
        with self.admin_access.client_cnx() as cnx:
            self.member_eid = self.create_user(cnx, 'member').eid
            cnx.commit()

    def test_workflow_base(self):
        with self.admin_access.web_request() as req:
            e = self.create_user(req, 'toto')
            iworkflowable = e.cw_adapt_to('IWorkflowable')
            self.assertEqual(iworkflowable.state, 'activated')
            iworkflowable.change_state('deactivated', u'deactivate 1')
            req.cnx.commit()
            iworkflowable.change_state('activated', u'activate 1')
            req.cnx.commit()
            iworkflowable.change_state('deactivated', u'deactivate 2')
            req.cnx.commit()
            e.cw_clear_relation_cache('wf_info_for', 'object')
            self.assertEqual([tr.comment for tr in e.reverse_wf_info_for],
                             ['deactivate 1', 'activate 1', 'deactivate 2'])
            self.assertEqual(iworkflowable.latest_trinfo().comment, 'deactivate 2')

    def test_possible_transitions(self):
        with self.admin_access.web_request() as req:
            user = req.execute('CWUser X').get_entity(0, 0)
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            trs = list(iworkflowable.possible_transitions())
            self.assertEqual(len(trs), 1)
            self.assertEqual(trs[0].name, u'deactivate')
            self.assertEqual(trs[0].destination(None).name, u'deactivated')
        # test a std user get no possible transition
        with self.new_access('member').web_request() as req:
            # fetch the entity using the new session
            trs = list(req.user.cw_adapt_to('IWorkflowable').possible_transitions())
            self.assertEqual(len(trs), 0)

    def _test_manager_deactivate(self, user):
        """Check `user` is deactivated and return its latest transition info.

        Asserts the latest transition info goes from 'activated' to
        'deactivated' with the 'deactivate user' plain text comment.
        """
        iworkflowable = user.cw_adapt_to('IWorkflowable')
        user.cw_clear_relation_cache('in_state', 'subject')
        self.assertEqual(len(user.in_state), 1)
        self.assertEqual(iworkflowable.state, 'deactivated')
        trinfo = iworkflowable.latest_trinfo()
        self.assertEqual(trinfo.previous_state.name, 'activated')
        self.assertEqual(trinfo.new_state.name, 'deactivated')
        self.assertEqual(trinfo.comment, 'deactivate user')
        self.assertEqual(trinfo.comment_format, 'text/plain')
        return trinfo

    def test_change_state(self):
        with self.admin_access.client_cnx() as cnx:
            user = cnx.user
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            iworkflowable.change_state('deactivated', comment=u'deactivate user')
            trinfo = self._test_manager_deactivate(user)
            self.assertEqual(trinfo.transition, None)

    def test_set_in_state_bad_wf(self):
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'CWUser')
            s = wf.add_state(u'foo', initial=True)
            shell.commit()
        with self.admin_access.repo_cnx() as cnx:
            with cnx.security_enabled(write=False):
                with self.assertRaises(ValidationError) as cm:
                    cnx.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
                                {'x': cnx.user.eid, 's': s.eid})
                self.assertEqual(cm.exception.errors,
                                 {'in_state-subject': "state doesn't belong to entity's workflow. "
                                  "You may want to set a custom workflow for this entity first."})

    def test_fire_transition(self):
        with self.admin_access.client_cnx() as cnx:
            user = cnx.user
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            iworkflowable.fire_transition('deactivate', comment=u'deactivate user')
            user.cw_clear_all_caches()
            self.assertEqual(iworkflowable.state, 'deactivated')
            # a single call is enough: the helper only asserts and returns
            # the latest transition info
            trinfo = self._test_manager_deactivate(user)
            self.assertEqual(trinfo.transition.name, 'deactivate')

    def test_goback_transition(self):
        with self.admin_access.web_request() as req:
            wf = req.user.cw_adapt_to('IWorkflowable').current_workflow
            asleep = wf.add_state('asleep')
            wf.add_transition('rest', (wf.state_by_name('activated'),
                                       wf.state_by_name('deactivated')),
                              asleep)
            # no destination state given for 'wake up'
            wf.add_transition('wake up', asleep)
            user = self.create_user(req, 'stduser')
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            iworkflowable.fire_transition('rest')
            req.cnx.commit()
            iworkflowable.fire_transition('wake up')
            req.cnx.commit()
            self.assertEqual(iworkflowable.state, 'activated')
            iworkflowable.fire_transition('deactivate')
            req.cnx.commit()
            iworkflowable.fire_transition('rest')
            req.cnx.commit()
            iworkflowable.fire_transition('wake up')
            req.cnx.commit()
            user.cw_clear_all_caches()
            self.assertEqual(iworkflowable.state, 'deactivated')

    # XXX test managers can change state without matching transition

    def _test_stduser_deactivate(self):
        """Check who may fire transitions on the 'member' user.

        User 'tutu' must not be able to fire 'deactivate' on 'member', while
        'member' may deactivate itself but not fire 'activate' afterwards.
        """
        with self.admin_access.repo_cnx() as cnx:
            self.create_user(cnx, 'tutu')
        with self.new_access('tutu').web_request() as req:
            iworkflowable = req.entity_from_eid(self.member_eid).cw_adapt_to('IWorkflowable')
            with self.assertRaises(ValidationError) as cm:
                iworkflowable.fire_transition('deactivate')
            self.assertEqual(cm.exception.errors,
                             {'by_transition-subject': "transition may not be fired"})
        with self.new_access('member').web_request() as req:
            iworkflowable = req.entity_from_eid(self.member_eid).cw_adapt_to('IWorkflowable')
            iworkflowable.fire_transition('deactivate')
            req.cnx.commit()
            with self.assertRaises(ValidationError) as cm:
                iworkflowable.fire_transition('activate')
            self.assertEqual(cm.exception.errors,
                             {'by_transition-subject': "transition may not be fired"})

    def test_fire_transition_owned_by(self):
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
                        'X expression "X owned_by U", T condition X '
                        'WHERE T name "deactivate"')
            cnx.commit()
        self._test_stduser_deactivate()

    def test_fire_transition_has_update_perm(self):
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute('INSERT RQLExpression X: X exprtype "ERQLExpression", '
                        'X expression "U has_update_permission X", T condition X '
                        'WHERE T name "deactivate"')
            cnx.commit()
        self._test_stduser_deactivate()

    def test_swf_base(self):
        """subworkflow

        +-----------+  tr1   +-----------+
        | swfstate1 | ------>| swfstate2 |
        +-----------+        +-----------+
                  |  tr2  +-----------+
                  `------>| swfstate3 |
                          +-----------+

        main workflow

        +--------+  swftr1             +--------+
        | state1 | -------[swfstate2]->| state2 |
        +--------+     |               +--------+
                       |               +--------+
                       `-[swfstate3]-->| state3 |
                                       +--------+
        """
        # sub-workflow
        with self.admin_access.shell() as shell:
            swf = add_wf(shell, 'CWGroup', name='subworkflow')
            swfstate1 = swf.add_state(u'swfstate1', initial=True)
            swfstate2 = swf.add_state(u'swfstate2')
            swfstate3 = swf.add_state(u'swfstate3')
            tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
            tr2 = swf.add_transition(u'tr2', (swfstate1,), swfstate3)
            # main workflow
            mwf = add_wf(shell, 'CWGroup', name='main workflow', default=True)
            state1 = mwf.add_state(u'state1', initial=True)
            state2 = mwf.add_state(u'state2')
            state3 = mwf.add_state(u'state3')
            swftr1 = mwf.add_wftransition(u'swftr1', swf, state1,
                                          [(swfstate2, state2), (swfstate3, state3)])
            swf.cw_clear_all_caches()
            self.assertEqual(swftr1.destination(None).eid, swfstate1.eid)
        # workflows built, begin test
        with self.admin_access.web_request() as req:
            group = req.create_entity('CWGroup', name=u'grp1')
            req.cnx.commit()
            iworkflowable = group.cw_adapt_to('IWorkflowable')
            self.assertEqual(iworkflowable.current_state.eid, state1.eid)
            self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
            iworkflowable.fire_transition('swftr1', u'go')
            req.cnx.commit()
            group.cw_clear_all_caches()
            self.assertEqual(iworkflowable.current_state.eid, swfstate1.eid)
            self.assertEqual(iworkflowable.current_workflow.eid, swf.eid)
            self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.subworkflow_input_transition().eid, swftr1.eid)
            iworkflowable.fire_transition('tr1', u'go')
            req.cnx.commit()
            group.cw_clear_all_caches()
            self.assertEqual(iworkflowable.current_state.eid, state2.eid)
            self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.subworkflow_input_transition(), None)
            # force back to swfstate1 is impossible since we can't any more find
            # subworkflow input transition
            with self.assertRaises(ValidationError) as cm:
                iworkflowable.change_state(swfstate1, u'gadget')
            self.assertEqual(cm.exception.errors,
                             {'to_state-subject': "state doesn't belong to entity's workflow"})
            req.cnx.rollback()
            # force back to state1
            iworkflowable.change_state('state1', u'gadget')
            iworkflowable.fire_transition('swftr1', u'au')
            group.cw_clear_all_caches()
            iworkflowable.fire_transition('tr2', u'chapeau')
            req.cnx.commit()
            group.cw_clear_all_caches()
            self.assertEqual(iworkflowable.current_state.eid, state3.eid)
            self.assertEqual(iworkflowable.current_workflow.eid, mwf.eid)
            self.assertEqual(iworkflowable.main_workflow.eid, mwf.eid)
            self.assertListEqual(parse_hist(iworkflowable.workflow_history),
                                 [('state1', 'swfstate1', 'swftr1', 'go'),
                                  ('swfstate1', 'swfstate2', 'tr1', 'go'),
                                  ('swfstate2', 'state2', 'swftr1', 'exiting from subworkflow subworkflow'),
                                  ('state2', 'state1', None, 'gadget'),
                                  ('state1', 'swfstate1', 'swftr1', 'au'),
                                  ('swfstate1', 'swfstate3', 'tr2', 'chapeau'),
                                  ('swfstate3', 'state3', 'swftr1', 'exiting from subworkflow subworkflow'),
                                  ])

    def test_swf_exit_consistency(self):
        with self.admin_access.shell() as shell:
            # sub-workflow
            swf = add_wf(shell, 'CWGroup', name='subworkflow')
            swfstate1 = swf.add_state(u'swfstate1', initial=True)
            swfstate2 = swf.add_state(u'swfstate2')
            tr1 = swf.add_transition(u'tr1', (swfstate1,), swfstate2)
            # main workflow
            mwf = add_wf(shell, 'CWGroup', name='main workflow', default=True)
            state1 = mwf.add_state(u'state1', initial=True)
            state2 = mwf.add_state(u'state2')
            state3 = mwf.add_state(u'state3')
            # swfstate2 is deliberately listed twice as an exit point
            mwf.add_wftransition(u'swftr1', swf, state1,
                                 [(swfstate2, state2), (swfstate2, state3)])
            with self.assertRaises(ValidationError) as cm:
                shell.commit()
            self.assertEqual(cm.exception.errors,
                             {'subworkflow_exit-subject': u"can't have multiple exits on the same state"})

    def test_swf_fire_in_a_row(self):
        with self.admin_access.shell() as shell:
            # sub-workflow
            subwf = add_wf(shell, 'CWGroup', name='subworkflow')
            xsigning = subwf.add_state('xsigning', initial=True)
            xaborted = subwf.add_state('xaborted')
            xsigned = subwf.add_state('xsigned')
            xabort = subwf.add_transition('xabort', (xsigning,), xaborted)
            xsign = subwf.add_transition('xsign', (xsigning,), xsigning)
            xcomplete = subwf.add_transition('xcomplete', (xsigning,), xsigned,
                                             type=u'auto')
            # main workflow
            twf = add_wf(shell, 'CWGroup', name='mainwf', default=True)
            created = twf.add_state(_('created'), initial=True)
            identified = twf.add_state(_('identified'))
            released = twf.add_state(_('released'))
            closed = twf.add_state(_('closed'))
            twf.add_wftransition(_('identify'), subwf, (created,),
                                 [(xsigned, identified), (xaborted, created)])
            twf.add_wftransition(_('release'), subwf, (identified,),
                                 [(xsigned, released), (xaborted, identified)])
            twf.add_wftransition(_('close'), subwf, (released,),
                                 [(xsigned, closed), (xaborted, released)])
            shell.commit()
        with self.admin_access.repo_cnx() as cnx:
            group = cnx.create_entity('CWGroup', name=u'grp1')
            cnx.commit()
            iworkflowable = group.cw_adapt_to('IWorkflowable')
            for trans in ('identify', 'release', 'close'):
                iworkflowable.fire_transition(trans)
                cnx.commit()

    def test_swf_magic_tr(self):
        with self.admin_access.shell() as shell:
            # sub-workflow
            subwf = add_wf(shell, 'CWGroup', name='subworkflow')
            xsigning = subwf.add_state('xsigning', initial=True)
            xaborted = subwf.add_state('xaborted')
            xsigned = subwf.add_state('xsigned')
            xabort = subwf.add_transition('xabort', (xsigning,), xaborted)
            xsign = subwf.add_transition('xsign', (xsigning,), xsigned)
            # main workflow
            twf = add_wf(shell, 'CWGroup', name='mainwf', default=True)
            created = twf.add_state(_('created'), initial=True)
            identified = twf.add_state(_('identified'))
            released = twf.add_state(_('released'))
            twf.add_wftransition(_('identify'), subwf, created,
                                 [(xaborted, None), (xsigned, identified)])
            twf.add_wftransition(_('release'), subwf, identified,
                                 [(xaborted, None)])
            shell.commit()
        with self.admin_access.web_request() as req:
            group = req.create_entity('CWGroup', name=u'grp1')
            req.cnx.commit()
            iworkflowable = group.cw_adapt_to('IWorkflowable')
            # (transition to fire, state expected once it has been fired)
            for trans, nextstate in (('identify', 'xsigning'),
                                     ('xabort', 'created'),
                                     ('identify', 'xsigning'),
                                     ('xsign', 'identified'),
                                     ('release', 'xsigning'),
                                     ('xabort', 'identified')):
                iworkflowable.fire_transition(trans)
                req.cnx.commit()
                group.cw_clear_all_caches()
                self.assertEqual(iworkflowable.state, nextstate)


class CustomWorkflowTC(CubicWebTC):
    """Setting and removing a custom workflow on a user."""

    def setup_database(self):
        with self.admin_access.repo_cnx() as cnx:
            self.member_eid = self.create_user(cnx, 'member').eid

    def test_custom_wf_replace_state_no_history(self):
        """member in initial state with no previous history, state is simply
        redirected when changing workflow
        """
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'CWUser')
            wf.add_state('asleep', initial=True)
        with self.admin_access.web_request() as req:
            req.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                        {'wf': wf.eid, 'x': self.member_eid})
            member = req.entity_from_eid(self.member_eid)
            iworkflowable = member.cw_adapt_to('IWorkflowable')
            self.assertEqual(iworkflowable.state, 'activated')  # no change before commit
            req.cnx.commit()
            member.cw_clear_all_caches()
            self.assertEqual(iworkflowable.current_workflow.eid, wf.eid)
            self.assertEqual(iworkflowable.state, 'asleep')
            self.assertEqual(iworkflowable.workflow_history, ())

    def test_custom_wf_replace_state_keep_history(self):
        """member in initial state with some history, state is redirected and
        state change is recorded to history
        """
        with self.admin_access.web_request() as req:
            member = req.entity_from_eid(self.member_eid)
            iworkflowable = member.cw_adapt_to('IWorkflowable')
            iworkflowable.fire_transition('deactivate')
            iworkflowable.fire_transition('activate')
            req.cnx.commit()
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'CWUser')
            wf.add_state('asleep', initial=True)
            shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                          {'wf': wf.eid, 'x': self.member_eid})
        with self.admin_access.web_request() as req:
            member = req.entity_from_eid(self.member_eid)
            iworkflowable = member.cw_adapt_to('IWorkflowable')
            self.assertEqual(iworkflowable.current_workflow.eid, wf.eid)
            self.assertEqual(iworkflowable.state, 'asleep')
            self.assertEqual(parse_hist(iworkflowable.workflow_history),
                             [('activated', 'deactivated', 'deactivate', None),
                              ('deactivated', 'activated', 'activate', None),
                              ('activated', 'asleep', None, 'workflow changed to "CWUser"')])

    def test_custom_wf_no_initial_state(self):
        """try to set a custom workflow which has no initial state"""
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'CWUser')
            wf.add_state('asleep')
            shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                          {'wf': wf.eid, 'x': self.member_eid})
            with self.assertRaises(ValidationError) as cm:
                shell.commit()
            self.assertEqual(cm.exception.errors,
                             {'custom_workflow-subject': u'workflow has no initial state'})

    def test_custom_wf_bad_etype(self):
        """try to set a custom workflow which doesn't apply to entity type"""
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'Company')
            wf.add_state('asleep', initial=True)
            shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                          {'wf': wf.eid, 'x': self.member_eid})
            with self.assertRaises(ValidationError) as cm:
                shell.commit()
            self.assertEqual(cm.exception.errors,
                             {'custom_workflow-subject': u"workflow isn't a workflow for this type"})

    def test_del_custom_wf(self):
        """member in some state shared by the new workflow, nothing has to be
        done
        """
        with self.admin_access.web_request() as req:
            member = req.entity_from_eid(self.member_eid)
            iworkflowable = member.cw_adapt_to('IWorkflowable')
            iworkflowable.fire_transition('deactivate')
            req.cnx.commit()
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'CWUser')
            wf.add_state('asleep', initial=True)
            shell.rqlexec('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                          {'wf': wf.eid, 'x': self.member_eid})
            shell.commit()
        with self.admin_access.web_request() as req:
            req.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                        {'wf': wf.eid, 'x': self.member_eid})
            member = req.entity_from_eid(self.member_eid)
            iworkflowable = member.cw_adapt_to('IWorkflowable')
            self.assertEqual(iworkflowable.state, 'asleep')  # no change before commit
            req.cnx.commit()
            member.cw_clear_all_caches()
            self.assertEqual(iworkflowable.current_workflow.name, "default user workflow")
            self.assertEqual(iworkflowable.state, 'activated')
            self.assertEqual(parse_hist(iworkflowable.workflow_history),
                             [('activated', 'deactivated', 'deactivate', None),
                              ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'),
                              ('asleep', 'activated', None, 'workflow changed to "default user workflow"'),
                              ])


class AutoTransitionTC(CubicWebTC):
    """Firing of transitions declared with ``type=u'auto'``."""

    def setup_custom_wf(self):
        """Build and return a CWUser workflow with an automatic transition.

        The workflow has an initial 'asleep' state, a 'dead' state, a 'rest'
        transition looping on 'asleep' and an auto 'sick' transition from
        'asleep' to 'dead' conditioned by ``X surname "toto"``.
        """
        with self.admin_access.shell() as shell:
            wf = add_wf(shell, 'CWUser')
            asleep = wf.add_state('asleep', initial=True)
            dead = wf.add_state('dead')
            wf.add_transition('rest', asleep, asleep)
            wf.add_transition('sick', asleep, dead, type=u'auto',
                              conditions=({'expr': u'X surname "toto"',
                                           'mainvars': u'X'},))
        return wf

    def test_auto_transition_fired(self):
        wf = self.setup_custom_wf()
        with self.admin_access.web_request() as req:
            user = self.create_user(req, 'member')
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            req.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                        {'wf': wf.eid, 'x': user.eid})
            req.cnx.commit()
            user.cw_clear_all_caches()
            self.assertEqual(iworkflowable.state, 'asleep')
            self.assertEqual([t.name for t in iworkflowable.possible_transitions()],
                             ['rest'])
            iworkflowable.fire_transition('rest')
            req.cnx.commit()
            user.cw_clear_all_caches()
            self.assertEqual(iworkflowable.state, 'asleep')
            self.assertEqual([t.name for t in iworkflowable.possible_transitions()],
                             ['rest'])
            self.assertEqual(parse_hist(iworkflowable.workflow_history),
                             [('asleep', 'asleep', 'rest', None)])
            user.cw_set(surname=u'toto')  # fulfill condition
            req.cnx.commit()
            iworkflowable.fire_transition('rest')
            req.cnx.commit()
            user.cw_clear_all_caches()
            self.assertEqual(iworkflowable.state, 'dead')
            self.assertEqual(parse_hist(iworkflowable.workflow_history),
                             [('asleep', 'asleep', 'rest', None),
                              ('asleep', 'asleep', 'rest', None),
                              ('asleep', 'dead', 'sick', None),
                              ])

    def test_auto_transition_custom_initial_state_fired(self):
        wf = self.setup_custom_wf()
        with self.admin_access.web_request() as req:
            user = self.create_user(req, 'member', surname=u'toto')
            req.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
                        {'wf': wf.eid, 'x': user.eid})
            req.cnx.commit()
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            self.assertEqual(iworkflowable.state, 'dead')

    def test_auto_transition_initial_state_fired(self):
        with self.admin_access.web_request() as req:
            wf = req.execute('Any WF WHERE ET default_workflow WF, '
                             'ET name %(et)s', {'et': 'CWUser'}).get_entity(0, 0)
            dead = wf.add_state('dead')
            wf.add_transition('sick', wf.state_by_name('activated'), dead,
                              type=u'auto',
                              conditions=({'expr': u'X surname "toto"',
                                           'mainvars': u'X'},))
            req.cnx.commit()
        with self.admin_access.web_request() as req:
            user = self.create_user(req, 'member', surname=u'toto')
            req.cnx.commit()
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            self.assertEqual(iworkflowable.state, 'dead')


class WorkflowHooksTC(CubicWebTC):
    """Enforcement of allowed transitions for a non-manager user."""

    def setUp(self):
        CubicWebTC.setUp(self)
        with self.admin_access.web_request() as req:
            self.wf = req.user.cw_adapt_to('IWorkflowable').current_workflow
            self.s_activated = self.wf.state_by_name('activated').eid
            self.s_deactivated = self.wf.state_by_name('deactivated').eid
            self.s_dummy = self.wf.add_state(u'dummy').eid
            self.wf.add_transition(u'dummy', (self.s_deactivated,), self.s_dummy)
            ueid = self.create_user(req, 'stduser', commit=False).eid
            # test initial state is set
            rset = req.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
                               {'x': ueid})
            self.assertFalse(rset, rset.rows)
            req.cnx.commit()
            initialstate = req.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
                                       {'x': ueid})[0][0]
            self.assertEqual(initialstate, u'activated')
            # give access to users group on the user's wf transitions
            # so we can test wf enforcing on euser (managers don't have anymore this
            # enforcement)
            req.execute('SET X require_group G '
                        'WHERE G name "users", X transition_of WF, WF eid %(wf)s',
                        {'wf': self.wf.eid})
            req.cnx.commit()

    # XXX currently, we've to rely on hooks to set initial state, or to use execute
    # def test_initial_state(self):
    #     cnx = self.login('stduser')
    #     cu = cnx.cursor()
    #     self.assertRaises(ValidationError, cu.execute,
    #                       'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
    #                       'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'})
    #     cnx.close()
    #     # though managers can do whatever he want
    #     self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
    #                  'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
    #     self.commit()

    # test that the workflow is correctly enforced

    def _cleanup_msg(self, msg):
        """remove the variable part of one specific error message

        The second and the last whitespace-separated words of `msg` are
        dropped, the remaining words are joined back with single spaces.
        """
        lmsg = msg.split()
        lmsg.pop(1)
        lmsg.pop()
        return ' '.join(lmsg)

    def test_transition_checking1(self):
        with self.new_access('stduser').repo_cnx() as cnx:
            user = cnx.user
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            with self.assertRaises(ValidationError) as cm:
                iworkflowable.fire_transition('activate')
            self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
                             u"transition isn't allowed from")

    def test_transition_checking2(self):
        with self.new_access('stduser').repo_cnx() as cnx:
            user = cnx.user
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            with self.assertRaises(ValidationError) as cm:
                iworkflowable.fire_transition('dummy')
            self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
                             u"transition isn't allowed from")

    def test_transition_checking3(self):
        with self.new_access('stduser').repo_cnx() as cnx:
            user = cnx.user
            iworkflowable = user.cw_adapt_to('IWorkflowable')
            iworkflowable.fire_transition('deactivate')
            cnx.commit()
            with self.assertRaises(ValidationError) as cm:
                iworkflowable.fire_transition('deactivate')
            self.assertEqual(self._cleanup_msg(cm.exception.errors['by_transition-subject']),
                             u"transition isn't allowed from")
            cnx.rollback()
            # get back now
            iworkflowable.fire_transition('activate')
            cnx.commit()


if __name__ == '__main__':
    from logilab.common.testlib import unittest_main
    unittest_main()