diff -r 5c20a7f13c84 -r af40e615dc89 server/test/unittest_hooks.py --- a/server/test/unittest_hooks.py Mon Apr 06 16:11:38 2009 +0200 +++ b/server/test/unittest_hooks.py Mon Apr 06 16:18:46 2009 +0200 @@ -7,8 +7,9 @@ from logilab.common.testlib import TestCase, unittest_main from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions -from cubicweb.common import ConnectionError, RepositoryError, ValidationError -from cubicweb.server.repository import * +from cubicweb import ConnectionError, RepositoryError, ValidationError, AuthenticationError, BadConnectionId +from cubicweb.server.sqlutils import SQL_PREFIX +from cubicweb.server.repository import Repository orig_get_versions = Repository.get_versions @@ -250,7 +251,12 @@ repo.config._cubes = None repo.fill_schema() RepositoryBasedTC.setUp(self) - + + def index_exists(self, etype, attr, unique=False): + dbhelper = self.session.pool.source('system').dbhelper + sqlcursor = self.session.pool['system'] + return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) + def test_base(self): schema = self.repo.schema dbhelper = self.session.pool.source('system').dbhelper @@ -281,11 +287,11 @@ 'WHERE RT name "comments", E name "Societe2", C name "Comment"') self.failIf('nom' in schema['Societe2'].subject_relations()) self.failIf('concerne2' in schema['Societe2'].subject_relations()) - self.failIf(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom')) + self.failIf(self.index_exists('Societe2', 'nom')) self.commit() self.failUnless('nom' in schema['Societe2'].subject_relations()) self.failUnless('concerne2' in schema['Societe2'].subject_relations()) - self.failUnless(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom')) + self.failUnless(self.index_exists('Societe2', 'nom')) # now we should be able to insert and query Societe2 s2eid = self.execute('INSERT Societe2 X: X nom "logilab"')[0][0] self.execute('Societe2 X WHERE X nom "logilab"') @@ -304,11 +310,11 @@ # schema should be cleaned on delete (after commit) self.execute('DELETE EEType X WHERE X name "Societe2"') self.execute('DELETE ERType X WHERE X name "concerne2"') - self.failUnless(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom')) + self.failUnless(self.index_exists('Societe2', 'nom')) self.failUnless(schema.has_entity('Societe2')) self.failUnless(schema.has_relation('concerne2')) self.commit() - self.failIf(dbhelper.index_exists(sqlcursor, 'Societe2', 'nom')) + self.failIf(self.index_exists('Societe2', 'nom')) self.failIf(schema.has_entity('Societe2')) self.failIf(schema.has_entity('concerne2')) @@ -385,7 +391,7 @@ self.failUnless(self.schema['inline2'].inlined) self.commit() self.failIf(self.schema['inline2'].inlined) - self.failIf(dbhelper.index_exists(sqlcursor, 'Personne', 'inline2')) + self.failIf(self.index_exists('Personne', 'inline2')) rset = self.execute('Any X, Y WHERE X inline2 Y') self.assertEquals(len(rset), 1) self.assertEquals(rset.rows[0], [peid, aeid]) @@ -398,7 +404,7 @@ self.failIf(self.schema['inline2'].inlined) self.commit() self.failUnless(self.schema['inline2'].inlined) - self.failUnless(dbhelper.index_exists(sqlcursor, 'Personne', 'inline2')) + self.failUnless(self.index_exists('Personne', 'inline2')) rset = self.execute('Any X, Y WHERE X inline2 Y') self.assertEquals(len(rset), 1) self.assertEquals(rset.rows[0], [peid, aeid]) @@ -409,39 +415,44 @@ try: self.execute('SET X indexed TRUE WHERE X relation_type R, R name "sujet"') self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed')) - self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet')) + self.failIf(self.index_exists('Affaire', 'sujet')) self.commit() self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed')) - self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet')) + self.failUnless(self.index_exists('Affaire', 'sujet')) finally: self.execute('SET X indexed FALSE WHERE X relation_type R, R name "sujet"') self.failUnless(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed')) - self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet')) + self.failUnless(self.index_exists('Affaire', 'sujet')) self.commit() self.failIf(self.schema['sujet'].rproperty('Affaire', 'String', 'indexed')) - self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet')) + self.failIf(self.index_exists('Affaire', 'sujet')) def test_unique_change(self): dbhelper = self.session.pool.source('system').dbhelper sqlcursor = self.session.pool['system'] try: - self.execute('INSERT EConstraint X: X cstrtype CT, DEF constrained_by X ' - 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' - 'RT name "sujet", E name "Affaire"') - self.failIf(self.schema['Affaire'].has_unique_values('sujet')) - self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True)) - self.commit() - self.failUnless(self.schema['Affaire'].has_unique_values('sujet')) - self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True)) + try: + self.execute('INSERT EConstraint X: X cstrtype CT, DEF constrained_by X ' + 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' + 'RT name "sujet", E name "Affaire"') + self.failIf(self.schema['Affaire'].has_unique_values('sujet')) + self.failIf(self.index_exists('Affaire', 'sujet', unique=True)) + self.commit() + self.failUnless(self.schema['Affaire'].has_unique_values('sujet')) + self.failUnless(self.index_exists('Affaire', 'sujet', unique=True)) + except: + import traceback + traceback.print_exc() + raise finally: self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, ' 'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' 'RT name "sujet", E name "Affaire"') self.failUnless(self.schema['Affaire'].has_unique_values('sujet')) - self.failUnless(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True)) + self.failUnless(self.index_exists('Affaire', 'sujet', unique=True)) self.commit() self.failIf(self.schema['Affaire'].has_unique_values('sujet')) - self.failIf(dbhelper.index_exists(sqlcursor, 'Affaire', 'sujet', unique=True)) + self.failIf(self.index_exists('Affaire', 'sujet', unique=True)) class WorkflowHooksTC(RepositoryBasedTC):