server/test/unittest_hooks.py
branchtls-sprint
changeset 1263 01152fffd593
parent 1251 af40e615dc89
child 1398 5fe84a5f7035
equal deleted inserted replaced
1246:76b3cd5d4f31 1263:01152fffd593
     5 """
     5 """
     6 
     6 
     7 from logilab.common.testlib import TestCase, unittest_main
     7 from logilab.common.testlib import TestCase, unittest_main
     8 from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions
     8 from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions
     9 
     9 
    10 from cubicweb.common import ConnectionError, RepositoryError, ValidationError
    10 from cubicweb import ConnectionError, RepositoryError, ValidationError, AuthenticationError, BadConnectionId
    11 from cubicweb.server.repository import *
    11 from cubicweb.server.sqlutils import SQL_PREFIX
       
    12 from cubicweb.server.repository import Repository
    12 
    13 
    13 orig_get_versions = Repository.get_versions
    14 orig_get_versions = Repository.get_versions
    14 
    15 
    15 def setup_module(*args):
    16 def setup_module(*args):
    16     Repository.get_versions = get_versions
    17     Repository.get_versions = get_versions
   248             repo = self.repo # set by the RepositoryBasedTC metaclass
   249             repo = self.repo # set by the RepositoryBasedTC metaclass
   249             # force to read schema from the database
   250             # force to read schema from the database
   250             repo.config._cubes = None
   251             repo.config._cubes = None
   251             repo.fill_schema()
   252             repo.fill_schema()
   252         RepositoryBasedTC.setUp(self)
   253         RepositoryBasedTC.setUp(self)
   253             
   254 
       
   255     def index_exists(self, etype, attr, unique=False):
       
   256         dbhelper = self.session.pool.source('system').dbhelper    
       
   257         sqlcursor = self.session.pool['system']
       
   258         return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique)
       
   259         
   254     def test_base(self):
   260     def test_base(self):
   255         schema = self.repo.schema
   261         schema = self.repo.schema
   256         dbhelper = self.session.pool.source('system').dbhelper    
   262         dbhelper = self.session.pool.source('system').dbhelper    
   257         sqlcursor = self.session.pool['system']
   263         sqlcursor = self.session.pool['system']
   258         self.failIf(schema.has_entity('Societe2'))
   264         self.failIf(schema.has_entity('Societe2'))
   279             'WHERE RT name "concerne2", E name "Societe2"')[0][0]
   285             'WHERE RT name "concerne2", E name "Societe2"')[0][0]
   280         self.execute('INSERT ENFRDef X: X cardinality "?*", X relation_type RT, X from_entity E, X to_entity C '
   286         self.execute('INSERT ENFRDef X: X cardinality "?*", X relation_type RT, X from_entity E, X to_entity C '
   281                      'WHERE RT name "comments", E name "Societe2", C name "Comment"')
   287                      'WHERE RT name "comments", E name "Societe2", C name "Comment"')
   282         self.failIf('nom' in schema['Societe2'].subject_relations())
   288         self.failIf('nom' in schema['Societe2'].subject_relations())
   283         self.failIf('concerne2' in schema['Societe2'].subject_relations())
   289         self.failIf('concerne2' in schema['Societe2'].subject_relations())
   284         self.failIf(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
   290         self.failIf(self.index_exists('Societe2', 'nom'))
   285         self.commit()
   291         self.commit()
   286         self.failUnless('nom' in schema['Societe2'].subject_relations())
   292         self.failUnless('nom' in schema['Societe2'].subject_relations())
   287         self.failUnless('concerne2' in schema['Societe2'].subject_relations())
   293         self.failUnless('concerne2' in schema['Societe2'].subject_relations())
   288         self.failUnless(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
   294         self.failUnless(self.index_exists('Societe2', 'nom'))
   289         # now we should be able to insert and query Societe2
   295         # now we should be able to insert and query Societe2
   290         s2eid = self.execute('INSERT Societe2 X: X nom "logilab"')[0][0]
   296         s2eid = self.execute('INSERT Societe2 X: X nom "logilab"')[0][0]
   291         self.execute('Societe2 X WHERE X nom "logilab"')
   297         self.execute('Societe2 X WHERE X nom "logilab"')
   292         self.execute('SET X concerne2 X WHERE X nom "logilab"')
   298         self.execute('SET X concerne2 X WHERE X nom "logilab"')
   293         rset = self.execute('Any X WHERE X concerne2 Y')
   299         rset = self.execute('Any X WHERE X concerne2 Y')
   302         self.failIf('concerne2' in schema['Societe2'].subject_relations())
   308         self.failIf('concerne2' in schema['Societe2'].subject_relations())
   303         self.failIf(self.execute('Any X WHERE X concerne2 Y'))
   309         self.failIf(self.execute('Any X WHERE X concerne2 Y'))
   304         # schema should be cleaned on delete (after commit)
   310         # schema should be cleaned on delete (after commit)
   305         self.execute('DELETE EEType X WHERE X name "Societe2"')
   311         self.execute('DELETE EEType X WHERE X name "Societe2"')
   306         self.execute('DELETE ERType X WHERE X name "concerne2"')
   312         self.execute('DELETE ERType X WHERE X name "concerne2"')
   307         self.failUnless(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
   313         self.failUnless(self.index_exists('Societe2', 'nom'))
   308         self.failUnless(schema.has_entity('Societe2'))
   314         self.failUnless(schema.has_entity('Societe2'))
   309         self.failUnless(schema.has_relation('concerne2'))
   315         self.failUnless(schema.has_relation('concerne2'))
   310         self.commit()
   316         self.commit()
   311         self.failIf(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
   317         self.failIf(self.index_exists('Societe2', 'nom'))
   312         self.failIf(schema.has_entity('Societe2'))
   318         self.failIf(schema.has_entity('Societe2'))
   313         self.failIf(schema.has_entity('concerne2'))
   319         self.failIf(schema.has_entity('concerne2'))
   314 
   320 
   315     def test_is_instance_of_insertions(self):
   321     def test_is_instance_of_insertions(self):
   316         seid = self.execute('INSERT SubDivision S: S nom "subdiv"')[0][0]
   322         seid = self.execute('INSERT SubDivision S: S nom "subdiv"')[0][0]
   383             try:
   389             try:
   384                 self.execute('SET X inlined FALSE WHERE X name "inline2"')
   390                 self.execute('SET X inlined FALSE WHERE X name "inline2"')
   385                 self.failUnless(self.schema['inline2'].inlined)
   391                 self.failUnless(self.schema['inline2'].inlined)
   386                 self.commit()
   392                 self.commit()
   387                 self.failIf(self.schema['inline2'].inlined)
   393                 self.failIf(self.schema['inline2'].inlined)
   388                 self.failIf(dbhelper.index_exists(sqlcursor, 'Personne', 'inline2'))
   394                 self.failIf(self.index_exists('Personne', 'inline2'))
   389                 rset = self.execute('Any X, Y WHERE X inline2 Y')
   395                 rset = self.execute('Any X, Y WHERE X inline2 Y')
   390                 self.assertEquals(len(rset), 1)
   396                 self.assertEquals(len(rset), 1)
   391                 self.assertEquals(rset.rows[0], [peid, aeid])
   397                 self.assertEquals(rset.rows[0], [peid, aeid])
   392             except:
   398             except:
   393                 import traceback
   399                 import traceback
   396         finally:
   402         finally:
   397             self.execute('SET X inlined TRUE WHERE X name "inline2"')
   403             self.execute('SET X inlined TRUE WHERE X name "inline2"')
   398             self.failIf(self.schema['inline2'].inlined)
   404             self.failIf(self.schema['inline2'].inlined)
   399             self.commit()
   405             self.commit()
   400             self.failUnless(self.schema['inline2'].inlined)
   406             self.failUnless(self.schema['inline2'].inlined)
   401             self.failUnless(dbhelper.index_exists(sqlcursor, 'Personne', 'inline2'))
   407             self.failUnless(self.index_exists('Personne', 'inline2'))
   402             rset = self.execute('Any X, Y WHERE X inline2 Y')
   408             rset = self.execute('Any X, Y WHERE X inline2 Y')
   403             self.assertEquals(len(rset), 1)
   409             self.assertEquals(len(rset), 1)
   404             self.assertEquals(rset.rows[0], [peid, aeid])
   410             self.assertEquals(rset.rows[0], [peid, aeid])
   405 
   411 
   406     def test_indexed_change(self):
   412     def test_indexed_change(self):
   407         dbhelper = self.session.pool.source('system').dbhelper    
   413         dbhelper = self.session.pool.source('system').dbhelper    
   408         sqlcursor = self.session.pool['system']
   414         sqlcursor = self.session.pool['system']
   409         try:
   415         try:
   410             self.execute('SET X indexed TRUE WHERE X relation_type R, R name "sujet"')
   416             self.execute('SET X indexed TRUE WHERE X relation_type R, R name "sujet"')
   411             self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
   417             self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
   412             self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
   418             self.failIf(self.index_exists('Affaire', 'sujet'))
   413             self.commit()
   419             self.commit()
   414             self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
   420             self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
   415             self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
   421             self.failUnless(self.index_exists('Affaire', 'sujet'))
   416         finally:
   422         finally:
   417             self.execute('SET X indexed FALSE WHERE X relation_type R, R name "sujet"')
   423             self.execute('SET X indexed FALSE WHERE X relation_type R, R name "sujet"')
   418             self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
   424             self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
   419             self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
   425             self.failUnless(self.index_exists('Affaire', 'sujet'))
   420             self.commit()
   426             self.commit()
   421             self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
   427             self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
   422             self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
   428             self.failIf(self.index_exists('Affaire', 'sujet'))
   423 
   429 
   424     def test_unique_change(self):
   430     def test_unique_change(self):
   425         dbhelper = self.session.pool.source('system').dbhelper    
   431         dbhelper = self.session.pool.source('system').dbhelper    
   426         sqlcursor = self.session.pool['system']
   432         sqlcursor = self.session.pool['system']
   427         try:
   433         try:
   428             self.execute('INSERT EConstraint X: X cstrtype CT, DEF constrained_by X '
   434             try:
   429                          'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
   435                 self.execute('INSERT EConstraint X: X cstrtype CT, DEF constrained_by X '
   430                          'RT name "sujet", E name "Affaire"')
   436                              'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
   431             self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
   437                              'RT name "sujet", E name "Affaire"')
   432             self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
   438                 self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
   433             self.commit()
   439                 self.failIf(self.index_exists('Affaire', 'sujet', unique=True))
   434             self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
   440                 self.commit()
   435             self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
   441                 self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
       
   442                 self.failUnless(self.index_exists('Affaire', 'sujet', unique=True))
       
   443             except:
       
   444                 import traceback
       
   445                 traceback.print_exc()
       
   446                 raise
   436         finally:
   447         finally:
   437             self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, '
   448             self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, '
   438                          'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
   449                          'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
   439                          'RT name "sujet", E name "Affaire"')
   450                          'RT name "sujet", E name "Affaire"')
   440             self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
   451             self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
   441             self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
   452             self.failUnless(self.index_exists('Affaire', 'sujet', unique=True))
   442             self.commit()
   453             self.commit()
   443             self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
   454             self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
   444             self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
   455             self.failIf(self.index_exists('Affaire', 'sujet', unique=True))
   445         
   456         
   446 
   457 
   447 class WorkflowHooksTC(RepositoryBasedTC):
   458 class WorkflowHooksTC(RepositoryBasedTC):
   448 
   459 
   449     def setUp(self):
   460     def setUp(self):