--- a/server/test/unittest_hooks.py Mon Apr 06 12:37:45 2009 +0200
+++ b/server/test/unittest_hooks.py Tue Apr 07 09:30:23 2009 +0200
@@ -7,8 +7,9 @@
from logilab.common.testlib import TestCase, unittest_main
from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions
-from cubicweb.common import ConnectionError, RepositoryError, ValidationError
-from cubicweb.server.repository import *
+from cubicweb import ConnectionError, RepositoryError, ValidationError, AuthenticationError, BadConnectionId
+from cubicweb.server.sqlutils import SQL_PREFIX
+from cubicweb.server.repository import Repository
orig_get_versions = Repository.get_versions
@@ -250,7 +251,12 @@
repo.config._cubes = None
repo.fill_schema()
RepositoryBasedTC.setUp(self)
-
+
+ def index_exists(self, etype, attr, unique=False):
+ dbhelper = self.session.pool.source('system').dbhelper
+ sqlcursor = self.session.pool['system']
+ return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique)
+
def test_base(self):
schema = self.repo.schema
dbhelper = self.session.pool.source('system').dbhelper
@@ -281,11 +287,11 @@
'WHERE RT name "comments", E name "Societe2", C name "Comment"')
self.failIf('nom' in schema['Societe2'].subject_relations())
self.failIf('concerne2' in schema['Societe2'].subject_relations())
- self.failIf(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
+ self.failIf(self.index_exists('Societe2', 'nom'))
self.commit()
self.failUnless('nom' in schema['Societe2'].subject_relations())
self.failUnless('concerne2' in schema['Societe2'].subject_relations())
- self.failUnless(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
+ self.failUnless(self.index_exists('Societe2', 'nom'))
# now we should be able to insert and query Societe2
s2eid = self.execute('INSERT Societe2 X: X nom "logilab"')[0][0]
self.execute('Societe2 X WHERE X nom "logilab"')
@@ -304,11 +310,11 @@
# schema should be cleaned on delete (after commit)
self.execute('DELETE EEType X WHERE X name "Societe2"')
self.execute('DELETE ERType X WHERE X name "concerne2"')
- self.failUnless(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
+ self.failUnless(self.index_exists('Societe2', 'nom'))
self.failUnless(schema.has_entity('Societe2'))
self.failUnless(schema.has_relation('concerne2'))
self.commit()
- self.failIf(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom'))
+ self.failIf(self.index_exists('Societe2', 'nom'))
self.failIf(schema.has_entity('Societe2'))
self.failIf(schema.has_entity('concerne2'))
@@ -385,7 +391,7 @@
self.failUnless(self.schema['inline2'].inlined)
self.commit()
self.failIf(self.schema['inline2'].inlined)
- self.failIf(dbhelper.index_exists(sqlcursor, 'Personne', 'inline2'))
+ self.failIf(self.index_exists('Personne', 'inline2'))
rset = self.execute('Any X, Y WHERE X inline2 Y')
self.assertEquals(len(rset), 1)
self.assertEquals(rset.rows[0], [peid, aeid])
@@ -398,7 +404,7 @@
self.failIf(self.schema['inline2'].inlined)
self.commit()
self.failUnless(self.schema['inline2'].inlined)
- self.failUnless(dbhelper.index_exists(sqlcursor, 'Personne', 'inline2'))
+ self.failUnless(self.index_exists('Personne', 'inline2'))
rset = self.execute('Any X, Y WHERE X inline2 Y')
self.assertEquals(len(rset), 1)
self.assertEquals(rset.rows[0], [peid, aeid])
@@ -409,39 +415,44 @@
try:
self.execute('SET X indexed TRUE WHERE X relation_type R, R name "sujet"')
self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
- self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
+ self.failIf(self.index_exists('Affaire', 'sujet'))
self.commit()
self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
- self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
+ self.failUnless(self.index_exists('Affaire', 'sujet'))
finally:
self.execute('SET X indexed FALSE WHERE X relation_type R, R name "sujet"')
self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
- self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
+ self.failUnless(self.index_exists('Affaire', 'sujet'))
self.commit()
self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed'))
- self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet'))
+ self.failIf(self.index_exists('Affaire', 'sujet'))
def test_unique_change(self):
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
try:
- self.execute('INSERT EConstraint X: X cstrtype CT, DEF constrained_by X '
- 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
- 'RT name "sujet", E name "Affaire"')
- self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
- self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
- self.commit()
- self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
- self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
+ try:
+ self.execute('INSERT EConstraint X: X cstrtype CT, DEF constrained_by X '
+ 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
+ 'RT name "sujet", E name "Affaire"')
+ self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
+ self.failIf(self.index_exists('Affaire', 'sujet', unique=True))
+ self.commit()
+ self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
+ self.failUnless(self.index_exists('Affaire', 'sujet', unique=True))
+ except:
+ import traceback
+ traceback.print_exc()
+ raise
finally:
self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, '
'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
'RT name "sujet", E name "Affaire"')
self.failUnless(self.schema['Affaire'].has_unique_values('sujet'))
- self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
+ self.failUnless(self.index_exists('Affaire', 'sujet', unique=True))
self.commit()
self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
- self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True))
+ self.failIf(self.index_exists('Affaire', 'sujet', unique=True))
class WorkflowHooksTC(RepositoryBasedTC):