server/test/unittest_hooks.py
changeset 1251 af40e615dc89
parent 0 b97547f5f1fa
child 1398 5fe84a5f7035
--- a/server/test/unittest_hooks.py	Mon Apr 06 16:11:38 2009 +0200
+++ b/server/test/unittest_hooks.py	Mon Apr 06 16:18:46 2009 +0200
@@ -7,8 +7,9 @@
 from logilab.common.testlib import TestCase, unittest_main
 from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions
 
-from cubicweb.common import ConnectionError, RepositoryError, ValidationError
-from cubicweb.server.repository import *
+from cubicweb import ConnectionError, RepositoryError, ValidationError, AuthenticationError, BadConnectionId
+from cubicweb.server.sqlutils import SQL_PREFIX
+from cubicweb.server.repository import Repository
 
 orig_get_versions = Repository.get_versions
 
@@ -250,7 +251,12 @@
             repo.config._cubes = None
             repo.fill_schema()
         RepositoryBasedTC.setUp(self)
-            
+
+    def index_exists(self, etype, attr, unique=False):
+        dbhelper = self.session.pool.source('system').dbhelper    
+        sqlcursor = self.session.pool['system']
+        return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique)
+        
     def test_base(self):
         schema = self.repo.schema
         dbhelper = self.session.pool.source('system').dbhelper    
@@ -281,11 +287,11 @@
                      'WHERE RT name "comments", E name "Societe2", C name "Comment"')
         self.failIf('nom' in schema['Societe2'].subject_relations())
         self.failIf('concerne2' in schema['Societe2'].subject_relations())
-        self.failIf(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
+        self.failIf(self.index_exists('Societe2', 'nom'))
         self.commit()
         self.failUnless('nom' in schema['Societe2'].subject_relations())
         self.failUnless('concerne2' in schema['Societe2'].subject_relations())
-        self.failUnless(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
+        self.failUnless(self.index_exists('Societe2', 'nom'))
         # now we should be able to insert and query Societe2
         s2eid = self.execute('INSERT Societe2 X: X nom "logilab"')[0][0]
         self.execute('Societe2 X WHERE X nom "logilab"')
@@ -304,11 +310,11 @@
         # schema should be cleaned on delete (after commit)
         self.execute('DELETE EEType X WHERE X name "Societe2"')
         self.execute('DELETE ERType X WHERE X name "concerne2"')
-        self.failUnless(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
+        self.failUnless(self.index_exists('Societe2', 'nom'))
         self.failUnless(schema.has_entity('Societe2'))
         self.failUnless(schema.has_relation('concerne2'))
         self.commit()
-        self.failIf(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
+        self.failIf(self.index_exists('Societe2', 'nom'))
         self.failIf(schema.has_entity('Societe2'))
         self.failIf(schema.has_entity('concerne2'))
 
@@ -385,7 +391,7 @@
                 self.failUnless(self.schema['inline2'].inlined)
                 self.commit()
                 self.failIf(self.schema['inline2'].inlined)
-                self.failIf(dbhelper.index_exists(sqlcursor, 'Personne', 'inline2'))
+                self.failIf(self.index_exists('Personne', 'inline2'))
                 rset = self.execute('Any X, Y WHERE X inline2 Y')
                 self.assertEquals(len(rset), 1)
                 self.assertEquals(rset.rows[0], [peid, aeid])
@@ -398,7 +404,7 @@
             self.failIf(self.schema['inline2'].inlined)
             self.commit()
             self.failUnless(self.schema['inline2'].inlined)
-            self.failUnless(dbhelper.index_exists(sqlcursor, 'Personne', 'inline2'))
+            self.failUnless(self.index_exists('Personne', 'inline2'))
             rset = self.execute('Any X, Y WHERE X inline2 Y')
             self.assertEquals(len(rset), 1)
             self.assertEquals(rset.rows[0], [peid, aeid])
@@ -409,39 +415,44 @@
         try:
             self.execute('SET X indexed TRUE WHERE X relation_type R, R name "sujet"')
             self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
-            self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
+            self.failIf(self.index_exists('Affaire', 'sujet'))
             self.commit()
             self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
-            self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
+            self.failUnless(self.index_exists('Affaire', 'sujet'))
         finally:
             self.execute('SET X indexed FALSE WHERE X relation_type R, R name "sujet"')
             self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
-            self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
+            self.failUnless(self.index_exists('Affaire', 'sujet'))
             self.commit()
             self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
-            self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
+            self.failIf(self.index_exists('Affaire', 'sujet'))
 
     def test_unique_change(self):
         dbhelper = self.session.pool.source('system').dbhelper    
         sqlcursor = self.session.pool['system']
         try:
-            self.execute('INSERT EConstraint X: X cstrtype CT, DEF constrained_by X '
-                         'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
-                         'RT name "sujet", E name "Affaire"')
-            self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
-            self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
-            self.commit()
-            self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
-            self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
+            try:
+                self.execute('INSERT EConstraint X: X cstrtype CT, DEF constrained_by X '
+                             'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
+                             'RT name "sujet", E name "Affaire"')
+                self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
+                self.failIf(self.index_exists('Affaire', 'sujet', unique=True))
+                self.commit()
+                self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
+                self.failUnless(self.index_exists('Affaire', 'sujet', unique=True))
+            except:
+                import traceback
+                traceback.print_exc()
+                raise
         finally:
             self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, '
                          'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
                          'RT name "sujet", E name "Affaire"')
             self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
-            self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
+            self.failUnless(self.index_exists('Affaire', 'sujet', unique=True))
             self.commit()
             self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
-            self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
+            self.failIf(self.index_exists('Affaire', 'sujet', unique=True))
         
 
 class WorkflowHooksTC(RepositoryBasedTC):