entities/test/unittest_wfobjs.py
branch3.5
changeset 2949 a2aa2c51f3be
parent 2939 a613cc003ab7
child 2954 48507919b6e3
equal deleted inserted replaced
2948:d3cd8bd20ee5 2949:a2aa2c51f3be
     1 from cubicweb.devtools.apptest import EnvBasedTC
     1 from cubicweb.devtools.apptest import EnvBasedTC
     2 from cubicweb import ValidationError
     2 from cubicweb import ValidationError
     3 
     3 
       
     4 def add_wf(self, etype, name=None):
       
     5     if name is None:
       
     6         name = unicode(etype)
       
     7     wf = self.execute('INSERT Workflow X: X name %(n)s', {'n': name}).get_entity(0, 0)
       
     8     self.execute('SET WF workflow_of ET WHERE WF eid %(wf)s, ET name %(et)s',
       
     9                  {'wf': wf.eid, 'et': etype})
       
    10     return wf
       
    11 
       
    12 def parse_hist(wfhist):
       
    13     return [(ti.previous_state.name, ti.new_state.name,
       
    14              ti.transition and ti.transition.name, ti.comment)
       
    15             for ti in wfhist]
       
    16 
       
    17 
     4 class WorkflowBuildingTC(EnvBasedTC):
    18 class WorkflowBuildingTC(EnvBasedTC):
     5 
    19 
     6     def test_wf_construction(self):
    20     def test_wf_construction(self):
     7         wf = self.execute('INSERT Workflow X: X name "test"').get_entity(0, 0)
    21         wf = add_wf(self, 'Company')
     8         self.execute('SET WF workflow_of ET WHERE ET name "Company"')
       
     9         foo = wf.add_state(u'foo', initial=True)
    22         foo = wf.add_state(u'foo', initial=True)
    10         bar = wf.add_state(u'bar')
    23         bar = wf.add_state(u'bar')
    11         self.assertEquals(wf.state_by_name('bar').eid, bar.eid)
    24         self.assertEquals(wf.state_by_name('bar').eid, bar.eid)
    12         self.assertEquals(wf.state_by_name('barrr'), None)
    25         self.assertEquals(wf.state_by_name('barrr'), None)
    13         baz = wf.add_transition(u'baz', (foo,), bar, ('managers',))
    26         baz = wf.add_transition(u'baz', (foo,), bar, ('managers',))
    14         self.assertEquals(wf.transition_by_name('baz').eid, baz.eid)
    27         self.assertEquals(wf.transition_by_name('baz').eid, baz.eid)
    15         self.assertEquals(len(baz.require_group), 1)
    28         self.assertEquals(len(baz.require_group), 1)
    16         self.assertEquals(baz.require_group[0].name, 'managers')
    29         self.assertEquals(baz.require_group[0].name, 'managers')
    17 
    30 
    18     def test_duplicated_state(self):
    31     def test_duplicated_state(self):
    19         wf = self.execute('INSERT Workflow X: X name "test"').get_entity(0, 0)
    32         wf = add_wf(self, 'Company')
    20         self.execute('SET WF workflow_of ET WHERE ET name "Company"')
       
    21         wf.add_state(u'foo', initial=True)
    33         wf.add_state(u'foo', initial=True)
    22         wf.add_state(u'foo')
    34         wf.add_state(u'foo')
    23         ex = self.assertRaises(ValidationError, self.commit)
    35         ex = self.assertRaises(ValidationError, self.commit)
    24         # XXX enhance message
    36         # XXX enhance message
    25         self.assertEquals(ex.errors, {'state_of': 'unique constraint S name N, Y state_of O, Y name N failed'})
    37         self.assertEquals(ex.errors, {'state_of': 'unique constraint S name N, Y state_of O, Y name N failed'})
    26 
    38 
    27     def test_duplicated_transition(self):
    39     def test_duplicated_transition(self):
    28         wf = self.execute('INSERT Workflow X: X name "test"').get_entity(0, 0)
    40         wf = add_wf(self, 'Company')
    29         self.execute('SET WF workflow_of ET WHERE ET name "Company"')
       
    30         foo = wf.add_state(u'foo', initial=True)
    41         foo = wf.add_state(u'foo', initial=True)
    31         bar = wf.add_state(u'bar')
    42         bar = wf.add_state(u'bar')
    32         wf.add_transition(u'baz', (foo,), bar, ('managers',))
    43         wf.add_transition(u'baz', (foo,), bar, ('managers',))
    33         wf.add_transition(u'baz', (bar,), foo)
    44         wf.add_transition(u'baz', (bar,), foo)
    34         ex = self.assertRaises(ValidationError, self.commit)
    45         ex = self.assertRaises(ValidationError, self.commit)
   185         self.assertEquals(transitions[0].name, 'tr2')
   196         self.assertEquals(transitions[0].name, 'tr2')
   186         transitions = list(self.bookmark.possible_transitions())
   197         transitions = list(self.bookmark.possible_transitions())
   187         self.assertEquals(len(transitions), 1)
   198         self.assertEquals(len(transitions), 1)
   188         self.assertEquals(transitions[0].name, 'tr1')
   199         self.assertEquals(transitions[0].name, 'tr1')
   189 
   200 
       
   201 class CustomWorkflowTC(EnvBasedTC):
       
   202 
       
   203     def setup_database(self):
       
   204         self.member = self.create_user('member')
       
   205 
       
   206     def tearDown(self):
       
   207         super(CustomWorkflowTC, self).tearDown()
       
   208         self.execute('DELETE X custom_workflow WF')
       
   209 
       
   210     def test_custom_wf_replace_state_no_history(self):
       
    211         """member in initial state with no previous history, state is simply
       
   212         redirected when changing workflow
       
   213         """
       
   214         wf = add_wf(self, 'CWUser')
       
   215         wf.add_state('asleep', initial=True)
       
   216         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   217                      {'wf': wf.eid, 'x': self.member.eid})
       
   218         self.member.clear_all_caches()
       
   219         self.assertEquals(self.member.state, 'activated')# no change before commit
       
   220         self.commit()
       
   221         self.member.clear_all_caches()
       
   222         self.assertEquals(self.member.current_workflow.eid, wf.eid)
       
   223         self.assertEquals(self.member.state, 'asleep')
       
   224         self.assertEquals(self.member.workflow_history, [])
       
   225 
       
   226     def test_custom_wf_replace_state_keep_history(self):
       
    227         """member in initial state with some history, state is redirected and
       
   228         state change is recorded to history
       
   229         """
       
   230         self.member.fire_transition('deactivate')
       
   231         self.member.fire_transition('activate')
       
   232         wf = add_wf(self, 'CWUser')
       
   233         wf.add_state('asleep', initial=True)
       
   234         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   235                      {'wf': wf.eid, 'x': self.member.eid})
       
   236         self.commit()
       
   237         self.member.clear_all_caches()
       
   238         self.assertEquals(self.member.current_workflow.eid, wf.eid)
       
   239         self.assertEquals(self.member.state, 'asleep')
       
   240         self.assertEquals(parse_hist(self.member.workflow_history),
       
   241                           [('activated', 'deactivated', 'deactivate', None),
       
   242                            ('deactivated', 'activated', 'activate', None),
       
   243                            ('activated', 'asleep', None, 'workflow changed to "CWUser"')])
       
   244 
       
   245     def test_custom_wf_shared_state(self):
       
   246         """member in some state shared by the new workflow, nothing has to be
       
   247         done
       
   248         """
       
   249         self.member.fire_transition('deactivate')
       
   250         self.assertEquals(self.member.state, 'deactivated')
       
   251         wf = add_wf(self, 'CWUser')
       
   252         wf.add_state('asleep', initial=True)
       
   253         self.execute('SET S state_of WF WHERE S name "deactivated", WF eid %(wf)s',
       
   254                      {'wf': wf.eid})
       
   255         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   256                      {'wf': wf.eid, 'x': self.member.eid})
       
   257         self.commit()
       
   258         self.member.clear_all_caches()
       
   259         self.assertEquals(self.member.current_workflow.eid, wf.eid)
       
   260         self.assertEquals(self.member.state, 'deactivated')
       
   261         self.assertEquals(parse_hist(self.member.workflow_history),
       
   262                           [('activated', 'deactivated', 'deactivate', None)])
       
   263 
       
   264     def test_custom_wf_no_initial_state(self):
       
   265         """try to set a custom workflow which has no initial state"""
       
   266         self.member.fire_transition('deactivate')
       
   267         wf = add_wf(self, 'CWUser')
       
   268         wf.add_state('asleep')
       
   269         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   270                      {'wf': wf.eid, 'x': self.member.eid})
       
   271         ex = self.assertRaises(ValidationError, self.commit)
       
   272         self.assertEquals(ex.errors, {'custom_workflow': u'workflow has no initial state'})
       
   273 
       
   274     def test_custom_wf_bad_etype(self):
       
    275         """try to set a custom workflow which belongs to another entity type"""
       
   276         self.member.fire_transition('deactivate')
       
   277         wf = add_wf(self, 'Company')
       
   278         wf.add_state('asleep', initial=True)
       
   279         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   280                      {'wf': wf.eid, 'x': self.member.eid})
       
   281         ex = self.assertRaises(ValidationError, self.commit)
       
   282         self.assertEquals(ex.errors, {'custom_workflow': 'constraint S is ET, O workflow_of ET failed'})
       
   283 
       
   284     def test_del_custom_wf(self):
       
    285         """member loses its custom workflow, a switch back to "CWUser workflow" has to be
       
   286         done
       
   287         """
       
   288         self.member.fire_transition('deactivate')
       
   289         wf = add_wf(self, 'CWUser')
       
   290         wf.add_state('asleep', initial=True)
       
   291         self.execute('SET X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   292                      {'wf': wf.eid, 'x': self.member.eid})
       
   293         self.commit()
       
   294         self.execute('DELETE X custom_workflow WF WHERE X eid %(x)s, WF eid %(wf)s',
       
   295                      {'wf': wf.eid, 'x': self.member.eid})
       
   296         self.member.clear_all_caches()
       
   297         self.assertEquals(self.member.state, 'asleep')# no change before commit
       
   298         self.commit()
       
   299         self.member.clear_all_caches()
       
   300         self.assertEquals(self.member.current_workflow.name, "CWUser workflow")
       
   301         self.assertEquals(self.member.state, 'activated')
       
   302         self.assertEquals(parse_hist(self.member.workflow_history),
       
   303                           [('activated', 'deactivated', 'deactivate', None),
       
   304                            ('deactivated', 'asleep', None, 'workflow changed to "CWUser"'),
       
   305                            ('asleep', 'activated', None, 'workflow changed to "CWUser workflow"'),])
       
   306 
   190 
   307 
   191 from cubicweb.devtools.apptest import RepositoryBasedTC
   308 from cubicweb.devtools.apptest import RepositoryBasedTC
   192 
   309 
   193 class WorkflowHooksTC(RepositoryBasedTC):
   310 class WorkflowHooksTC(RepositoryBasedTC):
   194 
   311