server/test/unittest_hooks.py
changeset 0 b97547f5f1fa
child 1251 af40e615dc89
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 # -*- coding: utf-8 -*-
       
     2 """functional tests for core hooks
       
     3 
       
     4 note: most schemahooks.py hooks are actually tested in unittest_migrations.py
       
     5 """
       
     6 
       
     7 from logilab.common.testlib import TestCase, unittest_main
       
     8 from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions
       
     9 
       
    10 from cubicweb.common import ConnectionError, RepositoryError, ValidationError
       
    11 from cubicweb.server.repository import *
       
    12 
       
# keep a reference to the real method so teardown_module can restore it
orig_get_versions = Repository.get_versions
       
    14 
       
    15 def setup_module(*args):
       
    16     Repository.get_versions = get_versions
       
    17 
       
    18 def teardown_module(*args):
       
    19     Repository.get_versions = orig_get_versions
       
    20 
       
    21 
       
    22     
       
    23 class CoreHooksTC(RepositoryBasedTC):
       
    24         
       
    25     def test_delete_internal_entities(self):
       
    26         self.assertRaises(RepositoryError, self.execute,
       
    27                           'DELETE EEType X WHERE X name "EEType"')
       
    28         self.assertRaises(RepositoryError, self.execute,
       
    29                           'DELETE ERType X WHERE X name "relation_type"')
       
    30         self.assertRaises(RepositoryError, self.execute,
       
    31                           'DELETE EGroup X WHERE X name "owners"')
       
    32 
       
    33     def test_delete_required_relations_subject(self):
       
    34         self.execute('INSERT EUser X: X login "toto", X upassword "hop", X in_group Y, X in_state S '
       
    35                      'WHERE Y name "users", S name "activated"')
       
    36         self.commit()
       
    37         self.execute('DELETE X in_group Y WHERE X login "toto", Y name "users"')
       
    38         self.assertRaises(ValidationError, self.commit)
       
    39         self.execute('DELETE X in_group Y WHERE X login "toto"')
       
    40         self.execute('SET X in_group Y WHERE X login "toto", Y name "guests"')
       
    41         self.commit()
       
    42         
       
    43     def test_delete_required_relations_object(self):
       
    44         self.skip('no sample in the schema ! YAGNI ? Kermaat ?')
       
    45     
       
    46     def test_static_vocabulary_check(self):
       
    47         self.assertRaises(ValidationError,
       
    48                           self.execute,
       
    49                           'SET X composite "whatever" WHERE X from_entity FE, FE name "EUser", X relation_type RT, RT name "in_group"')
       
    50     
       
    51     def test_missing_required_relations_subject_inline(self):
       
    52         # missing in_group relation 
       
    53         self.execute('INSERT EUser X: X login "toto", X upassword "hop"')
       
    54         self.assertRaises(ValidationError,
       
    55                           self.commit)
       
    56 
       
    57     def test_delete_if_singlecard1(self):
       
    58         self.assertEquals(self.repo.schema['in_state'].inlined, False)
       
    59         ueid, = self.execute('INSERT EUser X: X login "toto", X upassword "hop", X in_group Y, X in_state S '
       
    60                              'WHERE Y name "users", S name "activated"')[0]
       
    61         self.commit()
       
    62         self.execute('SET X in_state S WHERE S name "deactivated", X eid %(x)s', {'x': ueid})
       
    63         rset = self.execute('Any S WHERE X in_state S, X eid %(x)s', {'x': ueid})
       
    64         self.assertEquals(len(rset), 1)
       
    65         self.assertRaises(Exception, self.execute, 'SET X in_state S WHERE S name "deactivated", X eid %s' % ueid)
       
    66         rset2 = self.execute('Any S WHERE X in_state S, X eid %(x)s', {'x': ueid})
       
    67         self.assertEquals(rset.rows, rset2.rows)
       
    68 
       
    69     def test_inlined(self):
       
    70         self.assertEquals(self.repo.schema['sender'].inlined, True)
       
    71         self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
       
    72         self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
       
    73         eeid = self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y, X parts P '
       
    74                             'WHERE Y is EmailAddress, P is EmailPart')[0][0]
       
    75         self.execute('SET X sender Y WHERE X is Email, Y is EmailAddress')
       
    76         rset = self.execute('Any S WHERE X sender S, X eid %s' % eeid)
       
    77         self.assertEquals(len(rset), 1)
       
    78         
       
    79     def test_composite_1(self):
       
    80         self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
       
    81         self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
       
    82         self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y, X parts P '
       
    83                      'WHERE Y is EmailAddress, P is EmailPart')
       
    84         self.failUnless(self.execute('Email X WHERE X sender Y'))
       
    85         self.commit()
       
    86         self.execute('DELETE Email X')
       
    87         rset = self.execute('Any X WHERE X is EmailPart')
       
    88         self.assertEquals(len(rset), 1)
       
    89         self.commit()
       
    90         rset = self.execute('Any X WHERE X is EmailPart')
       
    91         self.assertEquals(len(rset), 0)
       
    92             
       
    93     def test_composite_2(self):
       
    94         self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
       
    95         self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
       
    96         self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y, X parts P '
       
    97                      'WHERE Y is EmailAddress, P is EmailPart')
       
    98         self.commit()
       
    99         self.execute('DELETE Email X')
       
   100         self.execute('DELETE EmailPart X')
       
   101         self.commit()
       
   102         rset = self.execute('Any X WHERE X is EmailPart')
       
   103         self.assertEquals(len(rset), 0)
       
   104             
       
   105     def test_composite_redirection(self):
       
   106         self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
       
   107         self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
       
   108         self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y, X parts P '
       
   109                      'WHERE Y is EmailAddress, P is EmailPart')
       
   110         self.execute('INSERT Email X: X messageid "<2345>", X subject "test2", X sender Y, X recipients Y '
       
   111                      'WHERE Y is EmailAddress')
       
   112         self.commit()
       
   113         self.execute('DELETE X parts Y WHERE X messageid "<1234>"')
       
   114         self.execute('SET X parts Y WHERE X messageid "<2345>"')
       
   115         self.commit()
       
   116         rset = self.execute('Any X WHERE X is EmailPart')
       
   117         self.assertEquals(len(rset), 1)
       
   118         self.assertEquals(rset.get_entity(0, 0).reverse_parts[0].messageid, '<2345>')
       
   119 
       
   120     def test_unsatisfied_constraints(self):
       
   121         self.execute('INSERT ENFRDef X: X from_entity FE, X relation_type RT, X to_entity TE '
       
   122                      'WHERE FE name "Affaire", RT name "concerne", TE name "String"')
       
   123         self.assertRaises(ValidationError,
       
   124                           self.commit)
       
   125 
       
   126 
       
   127     def test_html_tidy_hook(self):
       
   128         entity = self.execute('INSERT Affaire A: A descr_format "text/html", A descr "yo"').get_entity(0, 0)
       
   129         self.assertEquals(entity.descr, u'yo')
       
   130         entity = self.execute('INSERT Affaire A: A descr_format "text/html", A descr "<b>yo"').get_entity(0, 0)
       
   131         self.assertEquals(entity.descr, u'<b>yo</b>')
       
   132         entity = self.execute('INSERT Affaire A: A descr_format "text/html", A descr "<b>yo</b>"').get_entity(0, 0)
       
   133         self.assertEquals(entity.descr, u'<b>yo</b>')
       
   134         entity = self.execute('INSERT Affaire A: A descr_format "text/html", A descr "<b>R&D</b>"').get_entity(0, 0)
       
   135         self.assertEquals(entity.descr, u'<b>R&amp;D</b>')
       
   136         xml = u"<div>c&apos;est <b>l'ét&eacute;"
       
   137         entity = self.execute('INSERT Affaire A: A descr_format "text/html", A descr %(d)s',
       
   138                               {'d': xml}).get_entity(0, 0)
       
   139         self.assertEquals(entity.descr, u"<div>c'est <b>l'été</b></div>")
       
   140 
       
   141     def test_nonregr_html_tidy_hook_no_update(self):
       
   142         entity = self.execute('INSERT Affaire A: A descr_format "text/html", A descr "yo"').get_entity(0, 0)
       
   143         self.assertEquals(entity.descr, u'yo')
       
   144         self.execute('SET A ref "REF" WHERE A eid %s' % entity.eid)
       
   145         entity = self.execute('Any A WHERE A eid %s' % entity.eid).get_entity(0, 0)
       
   146         self.assertEquals(entity.descr, u'yo')
       
   147         self.execute('SET A descr "R&D<p>yo" WHERE A eid %s' % entity.eid)
       
   148         entity = self.execute('Any A WHERE A eid %s' % entity.eid).get_entity(0, 0)
       
   149         self.assertEquals(entity.descr, u'R&amp;D<p>yo</p>')
       
   150         
       
   151 
       
   152         
       
   153 class UserGroupHooksTC(RepositoryBasedTC):
       
   154     
       
   155     def test_user_synchronization(self):
       
   156         self.create_user('toto', password='hop', commit=False)
       
   157         self.assertRaises(AuthenticationError,
       
   158                           self.repo.connect, u'toto', 'hop')
       
   159         self.commit()
       
   160         cnxid = self.repo.connect(u'toto', 'hop')
       
   161         self.failIfEqual(cnxid, self.cnxid)
       
   162         self.execute('DELETE EUser X WHERE X login "toto"')
       
   163         self.repo.execute(cnxid, 'State X')
       
   164         self.commit()
       
   165         self.assertRaises(BadConnectionId,
       
   166                           self.repo.execute, cnxid, 'State X')
       
   167 
       
   168     def test_user_group_synchronization(self):
       
   169         user = self.session.user
       
   170         self.assertEquals(user.groups, set(('managers',)))
       
   171         self.execute('SET X in_group G WHERE X eid %s, G name "guests"' % user.eid)
       
   172         self.assertEquals(user.groups, set(('managers',)))
       
   173         self.commit()
       
   174         self.assertEquals(user.groups, set(('managers', 'guests')))
       
   175         self.execute('DELETE X in_group G WHERE X eid %s, G name "guests"' % user.eid)
       
   176         self.assertEquals(user.groups, set(('managers', 'guests')))
       
   177         self.commit()
       
   178         self.assertEquals(user.groups, set(('managers',)))
       
   179 
       
   180     def test_user_composite_owner(self):
       
   181         ueid = self.create_user('toto')
       
   182         # composite of euser should be owned by the euser regardless of who created it
       
   183         self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", U use_email X '
       
   184                      'WHERE U login "toto"')
       
   185         self.commit()
       
   186         self.assertEquals(self.execute('Any A WHERE X owned_by U, U use_email X,'
       
   187                                        'U login "toto", X address A')[0][0],
       
   188                           'toto@logilab.fr')
       
   189 
       
   190     def test_no_created_by_on_deleted_entity(self):
       
   191         eid = self.execute('INSERT EmailAddress X: X address "toto@logilab.fr"')[0][0]
       
   192         self.execute('DELETE EmailAddress X WHERE X eid %s' % eid)
       
   193         self.commit()
       
   194         self.failIf(self.execute('Any X WHERE X created_by Y, X eid >= %(x)s', {'x': eid}))
       
   195         
       
   196 class EPropertyHooksTC(RepositoryBasedTC):
       
   197     
       
   198     def test_unexistant_eproperty(self):
       
   199         ex = self.assertRaises(ValidationError,
       
   200                           self.execute, 'INSERT EProperty X: X pkey "bla.bla", X value "hop", X for_user U')
       
   201         self.assertEquals(ex.errors, {'pkey': 'unknown property key'})
       
   202         ex = self.assertRaises(ValidationError,
       
   203                           self.execute, 'INSERT EProperty X: X pkey "bla.bla", X value "hop"')
       
   204         self.assertEquals(ex.errors, {'pkey': 'unknown property key'})
       
   205         
       
   206     def test_site_wide_eproperty(self):
       
   207         ex = self.assertRaises(ValidationError,
       
   208                                self.execute, 'INSERT EProperty X: X pkey "ui.site-title", X value "hop", X for_user U')
       
   209         self.assertEquals(ex.errors, {'for_user': "site-wide property can't be set for user"})
       
   210         
       
   211     def test_bad_type_eproperty(self):
       
   212         ex = self.assertRaises(ValidationError,
       
   213                                self.execute, 'INSERT EProperty X: X pkey "ui.language", X value "hop", X for_user U')
       
   214         self.assertEquals(ex.errors, {'value': u'unauthorized value'})
       
   215         ex = self.assertRaises(ValidationError,
       
   216                           self.execute, 'INSERT EProperty X: X pkey "ui.language", X value "hop"')
       
   217         self.assertEquals(ex.errors, {'value': u'unauthorized value'})
       
   218         
       
   219         
       
   220 class SchemaHooksTC(RepositoryBasedTC):
       
   221         
       
   222     def test_duplicate_etype_error(self):
       
   223         # check we can't add a EEType or ERType entity if it already exists one
       
   224         # with the same name
       
   225         #
       
   226         # according to hook order, we'll get a repository or validation error
       
   227         self.assertRaises((ValidationError, RepositoryError),
       
   228                           self.execute, 'INSERT EEType X: X name "Societe"')
       
   229         self.assertRaises((ValidationError, RepositoryError),
       
   230                           self.execute, 'INSERT ERType X: X name "in_group"')
       
   231         
       
   232     def test_validation_unique_constraint(self):
       
   233         self.assertRaises(ValidationError,
       
   234                           self.execute, 'INSERT EUser X: X login "admin"')
       
   235         try:
       
   236             self.execute('INSERT EUser X: X login "admin"')
       
   237         except ValidationError, ex:
       
   238             self.assertIsInstance(ex.entity, int)
       
   239             self.assertEquals(ex.errors, {'login': 'the value "admin" is already used, use another one'})
       
   240 
       
   241 
       
   242 class SchemaModificationHooksTC(RepositoryBasedTC):
       
   243     copy_schema = True
       
   244 
       
   245     def setUp(self):
       
   246         if not hasattr(self, '_repo'):
       
   247             # first initialization
       
   248             repo = self.repo # set by the RepositoryBasedTC metaclass
       
   249             # force to read schema from the database
       
   250             repo.config._cubes = None
       
   251             repo.fill_schema()
       
   252         RepositoryBasedTC.setUp(self)
       
   253             
       
   254     def test_base(self):
       
   255         schema = self.repo.schema
       
   256         dbhelper = self.session.pool.source('system').dbhelper    
       
   257         sqlcursor = self.session.pool['system']
       
   258         self.failIf(schema.has_entity('Societe2'))
       
   259         self.failIf(schema.has_entity('concerne2'))
       
   260         # schema should be update on insertion (after commit)
       
   261         self.execute('INSERT EEType X: X name "Societe2", X description "", X meta FALSE, X final FALSE')
       
   262         self.execute('INSERT ERType X: X name "concerne2", X description "", X meta FALSE, X final FALSE, X symetric FALSE')
       
   263         self.failIf(schema.has_entity('Societe2'))
       
   264         self.failIf(schema.has_entity('concerne2'))
       
   265         self.execute('SET X read_permission G WHERE X is EEType, X name "Societe2", G is EGroup')
       
   266         self.execute('SET X read_permission G WHERE X is ERType, X name "concerne2", G is EGroup')
       
   267         self.execute('SET X add_permission G WHERE X is EEType, X name "Societe2", G is EGroup, G name "managers"')
       
   268         self.execute('SET X add_permission G WHERE X is ERType, X name "concerne2", G is EGroup, G name "managers"')
       
   269         self.execute('SET X delete_permission G WHERE X is EEType, X name "Societe2", G is EGroup, G name "owners"')
       
   270         self.execute('SET X delete_permission G WHERE X is ERType, X name "concerne2", G is EGroup, G name "owners"')
       
   271         # have to commit before adding definition relations
       
   272         self.commit()
       
   273         self.failUnless(schema.has_entity('Societe2'))
       
   274         self.failUnless(schema.has_relation('concerne2'))
       
   275         self.execute('INSERT EFRDef X: X cardinality "11", X defaultval "noname", X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F '
       
   276                      'WHERE RT name "nom", E name "Societe2", F name "String"')
       
   277         concerne2_rdef_eid = self.execute(
       
   278             'INSERT ENFRDef X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E '
       
   279             'WHERE RT name "concerne2", E name "Societe2"')[0][0]
       
   280         self.execute('INSERT ENFRDef X: X cardinality "?*", X relation_type RT, X from_entity E, X to_entity C '
       
   281                      'WHERE RT name "comments", E name "Societe2", C name "Comment"')
       
   282         self.failIf('nom' in schema['Societe2'].subject_relations())
       
   283         self.failIf('concerne2' in schema['Societe2'].subject_relations())
       
   284         self.failIf(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
       
   285         self.commit()
       
   286         self.failUnless('nom' in schema['Societe2'].subject_relations())
       
   287         self.failUnless('concerne2' in schema['Societe2'].subject_relations())
       
   288         self.failUnless(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
       
   289         # now we should be able to insert and query Societe2
       
   290         s2eid = self.execute('INSERT Societe2 X: X nom "logilab"')[0][0]
       
   291         self.execute('Societe2 X WHERE X nom "logilab"')
       
   292         self.execute('SET X concerne2 X WHERE X nom "logilab"')
       
   293         rset = self.execute('Any X WHERE X concerne2 Y')
       
   294         self.assertEquals(rset.rows, [[s2eid]])
       
   295         # check that when a relation definition is deleted, existing relations are deleted
       
   296         self.execute('INSERT ENFRDef X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E '
       
   297                      'WHERE RT name "concerne2", E name "Societe"')
       
   298         self.commit()
       
   299         self.execute('DELETE ENFRDef X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}, 'x')
       
   300         self.commit()
       
   301         self.failUnless('concerne2' in schema['Societe'].subject_relations())
       
   302         self.failIf('concerne2' in schema['Societe2'].subject_relations())
       
   303         self.failIf(self.execute('Any X WHERE X concerne2 Y'))
       
   304         # schema should be cleaned on delete (after commit)
       
   305         self.execute('DELETE EEType X WHERE X name "Societe2"')
       
   306         self.execute('DELETE ERType X WHERE X name "concerne2"')
       
   307         self.failUnless(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
       
   308         self.failUnless(schema.has_entity('Societe2'))
       
   309         self.failUnless(schema.has_relation('concerne2'))
       
   310         self.commit()
       
   311         self.failIf(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
       
   312         self.failIf(schema.has_entity('Societe2'))
       
   313         self.failIf(schema.has_entity('concerne2'))
       
   314 
       
   315     def test_is_instance_of_insertions(self):
       
   316         seid = self.execute('INSERT SubDivision S: S nom "subdiv"')[0][0]
       
   317         is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)]
       
   318         self.assertEquals(is_etypes, ['SubDivision'])
       
   319         instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)]
       
   320         self.assertEquals(sorted(instanceof_etypes), ['Division', 'Societe', 'SubDivision'])
       
   321         snames = [name for name, in self.execute('Any N WHERE S is Societe, S nom N')]
       
   322         self.failIf('subdiv' in snames)
       
   323         snames = [name for name, in self.execute('Any N WHERE S is Division, S nom N')]
       
   324         self.failIf('subdiv' in snames)
       
   325         snames = [name for name, in self.execute('Any N WHERE S is_instance_of Societe, S nom N')]
       
   326         self.failUnless('subdiv' in snames)
       
   327         snames = [name for name, in self.execute('Any N WHERE S is_instance_of Division, S nom N')]
       
   328         self.failUnless('subdiv' in snames)
       
   329         
       
   330         
       
   331     def test_perms_synchronization_1(self):
       
   332         schema = self.repo.schema
       
   333         self.assertEquals(schema['EUser'].get_groups('read'), set(('managers', 'users')))
       
   334         self.failUnless(self.execute('Any X, Y WHERE X is EEType, X name "EUser", Y is EGroup, Y name "users"')[0])
       
   335         self.execute('DELETE X read_permission Y WHERE X is EEType, X name "EUser", Y name "users"')
       
   336         self.assertEquals(schema['EUser'].get_groups('read'), set(('managers', 'users', )))
       
   337         self.commit()
       
   338         self.assertEquals(schema['EUser'].get_groups('read'), set(('managers', )))
       
   339         self.execute('SET X read_permission Y WHERE X is EEType, X name "EUser", Y name "users"')
       
   340         self.commit()
       
   341         self.assertEquals(schema['EUser'].get_groups('read'), set(('managers', 'users',)))
       
   342 
       
   343     def test_perms_synchronization_2(self):
       
   344         schema = self.repo.schema['in_group']
       
   345         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
       
   346         self.execute('DELETE X read_permission Y WHERE X is ERType, X name "in_group", Y name "guests"')
       
   347         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
       
   348         self.commit()
       
   349         self.assertEquals(schema.get_groups('read'), set(('managers', 'users')))
       
   350         self.execute('SET X read_permission Y WHERE X is ERType, X name "in_group", Y name "guests"')
       
   351         self.assertEquals(schema.get_groups('read'), set(('managers', 'users')))
       
   352         self.commit()
       
   353         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
       
   354 
       
   355     def test_nonregr_user_edit_itself(self):
       
   356         ueid = self.session.user.eid
       
   357         groupeids = [eid for eid, in self.execute('EGroup G WHERE G name in ("managers", "users")')]
       
   358         self.execute('DELETE X in_group Y WHERE X eid %s' % ueid)
       
   359         self.execute('SET X surname "toto" WHERE X eid %s' % ueid)
       
   360         self.execute('SET X in_group Y WHERE X eid %s, Y name "managers"' % ueid)
       
   361         self.commit()
       
   362         eeid = self.execute('Any X WHERE X is EEType, X name "EEType"')[0][0]
       
   363         self.execute('DELETE X read_permission Y WHERE X eid %s' % eeid)
       
   364         self.execute('SET X final FALSE WHERE X eid %s' % eeid)
       
   365         self.execute('SET X read_permission Y WHERE X eid %s, Y eid in (%s, %s)'
       
   366                      % (eeid, groupeids[0], groupeids[1]))
       
   367         self.commit()
       
   368         self.execute('Any X WHERE X is EEType, X name "EEType"')
       
   369 
       
   370     # schema modification hooks tests #########################################
       
   371     
       
   372     def test_uninline_relation(self):
       
   373         dbhelper = self.session.pool.source('system').dbhelper    
       
   374         sqlcursor = self.session.pool['system']
       
   375         # Personne inline2 Affaire inline
       
   376         # insert a person without inline2 relation (not mandatory)
       
   377         self.execute('INSERT Personne X: X nom "toto"')
       
   378         peid = self.execute('INSERT Personne X: X nom "tutu"')[0][0]
       
   379         aeid = self.execute('INSERT Affaire X: X ref "tata"')[0][0]
       
   380         self.execute('SET X inline2 Y WHERE X eid %(x)s, Y eid %(y)s', {'x': peid, 'y': aeid})
       
   381         self.failUnless(self.schema['inline2'].inlined)
       
   382         try:
       
   383             try:
       
   384                 self.execute('SET X inlined FALSE WHERE X name "inline2"')
       
   385                 self.failUnless(self.schema['inline2'].inlined)
       
   386                 self.commit()
       
   387                 self.failIf(self.schema['inline2'].inlined)
       
   388                 self.failIf(dbhelper.index_exists(sqlcursor, 'Personne', 'inline2'))
       
   389                 rset = self.execute('Any X, Y WHERE X inline2 Y')
       
   390                 self.assertEquals(len(rset), 1)
       
   391                 self.assertEquals(rset.rows[0], [peid, aeid])
       
   392             except:
       
   393                 import traceback
       
   394                 traceback.print_exc()
       
   395                 raise
       
   396         finally:
       
   397             self.execute('SET X inlined TRUE WHERE X name "inline2"')
       
   398             self.failIf(self.schema['inline2'].inlined)
       
   399             self.commit()
       
   400             self.failUnless(self.schema['inline2'].inlined)
       
   401             self.failUnless(dbhelper.index_exists(sqlcursor, 'Personne', 'inline2'))
       
   402             rset = self.execute('Any X, Y WHERE X inline2 Y')
       
   403             self.assertEquals(len(rset), 1)
       
   404             self.assertEquals(rset.rows[0], [peid, aeid])
       
   405 
       
   406     def test_indexed_change(self):
       
   407         dbhelper = self.session.pool.source('system').dbhelper    
       
   408         sqlcursor = self.session.pool['system']
       
   409         try:
       
   410             self.execute('SET X indexed TRUE WHERE X relation_type R, R name "sujet"')
       
   411             self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
       
   412             self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
       
   413             self.commit()
       
   414             self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
       
   415             self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
       
   416         finally:
       
   417             self.execute('SET X indexed FALSE WHERE X relation_type R, R name "sujet"')
       
   418             self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
       
   419             self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
       
   420             self.commit()
       
   421             self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
       
   422             self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
       
   423 
       
   424     def test_unique_change(self):
       
   425         dbhelper = self.session.pool.source('system').dbhelper    
       
   426         sqlcursor = self.session.pool['system']
       
   427         try:
       
   428             self.execute('INSERT EConstraint X: X cstrtype CT, DEF constrained_by X '
       
   429                          'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
       
   430                          'RT name "sujet", E name "Affaire"')
       
   431             self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
       
   432             self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
       
   433             self.commit()
       
   434             self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
       
   435             self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
       
   436         finally:
       
   437             self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, '
       
   438                          'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
       
   439                          'RT name "sujet", E name "Affaire"')
       
   440             self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
       
   441             self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
       
   442             self.commit()
       
   443             self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
       
   444             self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
       
   445         
       
   446 
       
   447 class WorkflowHooksTC(RepositoryBasedTC):
       
   448 
       
   449     def setUp(self):
       
   450         RepositoryBasedTC.setUp(self)
       
   451         self.s_activated = self.execute('State X WHERE X name "activated"')[0][0]
       
   452         self.s_deactivated = self.execute('State X WHERE X name "deactivated"')[0][0]
       
   453         self.s_dummy = self.execute('INSERT State X: X name "dummy", X state_of E WHERE E name "EUser"')[0][0]
       
   454         self.create_user('stduser')
       
   455         # give access to users group on the user's wf transitions
       
   456         # so we can test wf enforcing on euser (managers don't have anymore this
       
   457         # enforcement
       
   458         self.execute('SET X require_group G WHERE G name "users", X transition_of ET, ET name "EUser"')
       
   459         self.commit()
       
   460         
       
   461     def tearDown(self):
       
   462         self.execute('DELETE X require_group G WHERE G name "users", X transition_of ET, ET name "EUser"')
       
   463         self.commit()
       
   464         RepositoryBasedTC.tearDown(self)
       
   465 
       
   466     def test_set_initial_state(self):
       
   467         ueid = self.execute('INSERT EUser E: E login "x", E upassword "x", E in_group G '
       
   468                             'WHERE G name "users"')[0][0]
       
   469         self.failIf(self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
       
   470                                  {'x' : ueid}))
       
   471         self.commit()
       
   472         initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
       
   473                                     {'x' : ueid})[0][0]
       
   474         self.assertEquals(initialstate, u'activated')
       
   475         
       
   476     def test_initial_state(self):
       
   477         cnx = self.login('stduser')
       
   478         cu = cnx.cursor()
       
   479         self.assertRaises(ValidationError, cu.execute,
       
   480                           'INSERT EUser X: X login "badaboum", X upassword %(pwd)s, '
       
   481                           'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'})
       
   482         cnx.close()
       
   483         # though managers can do whatever he want
       
   484         self.execute('INSERT EUser X: X login "badaboum", X upassword %(pwd)s, '
       
   485                      'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
       
   486         self.commit()
       
   487         
       
   488     # test that the workflow is correctly enforced
       
   489     def test_transition_checking1(self):
       
   490         cnx = self.login('stduser')
       
   491         cu = cnx.cursor()
       
   492         ueid = cnx.user(self.current_session()).eid
       
   493         self.assertRaises(ValidationError,
       
   494                           cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
       
   495                           {'x': ueid, 's': self.s_activated}, 'x')
       
   496         cnx.close()
       
   497         
       
   498     def test_transition_checking2(self):
       
   499         cnx = self.login('stduser')
       
   500         cu = cnx.cursor()
       
   501         ueid = cnx.user(self.current_session()).eid
       
   502         self.assertRaises(ValidationError,
       
   503                           cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
       
   504                           {'x': ueid, 's': self.s_dummy}, 'x')
       
   505         cnx.close()
       
   506         
       
   507     def test_transition_checking3(self):
       
   508         cnx = self.login('stduser')
       
   509         cu = cnx.cursor()
       
   510         ueid = cnx.user(self.current_session()).eid
       
   511         cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
       
   512                       {'x': ueid, 's': self.s_deactivated}, 'x')
       
   513         cnx.commit()
       
   514         self.assertRaises(ValidationError,
       
   515                           cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
       
   516                           {'x': ueid, 's': self.s_deactivated}, 'x')
       
   517         # get back now
       
   518         cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
       
   519                       {'x': ueid, 's': self.s_activated}, 'x')
       
   520         cnx.commit()
       
   521         cnx.close()
       
   522         
       
   523     def test_transition_checking4(self):
       
   524         cnx = self.login('stduser')
       
   525         cu = cnx.cursor()
       
   526         ueid = cnx.user(self.current_session()).eid
       
   527         cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
       
   528                    {'x': ueid, 's': self.s_deactivated}, 'x')
       
   529         cnx.commit()
       
   530         self.assertRaises(ValidationError,
       
   531                           cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
       
   532                           {'x': ueid, 's': self.s_dummy}, 'x')
       
   533         # get back now
       
   534         cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
       
   535                       {'x': ueid, 's': self.s_activated}, 'x')
       
   536         cnx.commit()
       
   537         cnx.close()
       
   538 
       
   539     def test_transition_information(self):
       
   540         ueid = self.session.user.eid
       
   541         self.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
       
   542                       {'x': ueid, 's': self.s_deactivated}, 'x')
       
   543         self.commit()
       
   544         rset = self.execute('TrInfo T ORDERBY T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
       
   545         self.assertEquals(len(rset), 2)
       
   546         tr = rset.get_entity(1, 0)
       
   547         #tr.complete()
       
   548         self.assertEquals(tr.comment, None)
       
   549         self.assertEquals(tr.from_state[0].eid, self.s_activated)
       
   550         self.assertEquals(tr.to_state[0].eid, self.s_deactivated)
       
   551         
       
   552         self.session.set_shared_data('trcomment', u'il est pas sage celui-la')
       
   553         self.session.set_shared_data('trcommentformat', u'text/plain')
       
   554         self.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
       
   555                      {'x': ueid, 's': self.s_activated}, 'x')
       
   556         self.commit()
       
   557         rset = self.execute('TrInfo T ORDERBY T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
       
   558         self.assertEquals(len(rset), 3)
       
   559         tr = rset.get_entity(2, 0)
       
   560         #tr.complete()
       
   561         self.assertEquals(tr.comment, u'il est pas sage celui-la')
       
   562         self.assertEquals(tr.comment_format, u'text/plain')
       
   563         self.assertEquals(tr.from_state[0].eid, self.s_deactivated)
       
   564         self.assertEquals(tr.to_state[0].eid, self.s_activated)
       
   565         self.assertEquals(tr.owned_by[0].login, 'admin')
       
   566 
       
   567     def test_transition_information_on_creation(self):
       
   568         ueid = self.create_user('toto')
       
   569         rset = self.execute('TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
       
   570         self.assertEquals(len(rset), 1)
       
   571         tr = rset.get_entity(0, 0)
       
   572         #tr.complete()
       
   573         self.assertEquals(tr.comment, None)
       
   574         self.assertEquals(tr.from_state, [])
       
   575         self.assertEquals(tr.to_state[0].eid, self.s_activated)
       
   576 
       
   577     def test_std_users_can_create_trinfo(self):
       
   578         self.create_user('toto')
       
   579         cnx = self.login('toto')
       
   580         cu = cnx.cursor()
       
   581         self.failUnless(cu.execute("INSERT Note X: X type 'a', X in_state S WHERE S name 'todo'"))
       
   582         cnx.commit()
       
   583     
       
   584 if __name__ == '__main__':
       
   585     unittest_main()