diff -r 58c519e5a31f -r b2530e3e0afb server/test/unittest_repository.py --- a/server/test/unittest_repository.py Tue Aug 11 17:04:59 2009 +0200 +++ b/server/test/unittest_repository.py Tue Aug 11 17:13:32 2009 +0200 @@ -18,10 +18,11 @@ from yams.constraints import UniqueConstraint -from cubicweb import BadConnectionId, RepositoryError, ValidationError, UnknownEid, AuthenticationError +from cubicweb import (BadConnectionId, RepositoryError, ValidationError, + UnknownEid, AuthenticationError) from cubicweb.schema import CubicWebSchema, RQLConstraint from cubicweb.dbapi import connect, repo_connect, multiple_connections_unfix -from cubicweb.devtools.apptest import RepositoryBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.devtools.repotest import tuplify from cubicweb.server import repository from cubicweb.server.sqlutils import SQL_PREFIX @@ -31,48 +32,35 @@ os.system('pyro-ns >/dev/null 2>/dev/null &') -class RepositoryTC(RepositoryBasedTC): +class RepositoryTC(CubicWebTC): """ singleton providing access to a persistent storage for entities and relation """ -# def setUp(self): -# pass - -# def tearDown(self): -# self.repo.config.db_perms = True -# cnxid = self.repo.connect(*self.default_user_password()) -# for etype in ('Affaire', 'Note', 'Societe', 'Personne'): -# self.repo.execute(cnxid, 'DELETE %s X' % etype) -# self.repo.commit(cnxid) -# self.repo.close(cnxid) - def test_fill_schema(self): self.repo.schema = CubicWebSchema(self.repo.config.appid) self.repo.config._cubes = None # avoid assertion error + self.repo.config.repairing = True # avoid versions checking self.repo.fill_schema() - pool = self.repo._get_pool() table = SQL_PREFIX + 'CWEType' namecol = SQL_PREFIX + 'name' finalcol = SQL_PREFIX + 'final' - try: - cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % ( - namecol, table, finalcol)) - self.assertEquals(cu.fetchall(), []) - cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s' - % (namecol, table, finalcol, namecol), {'final': 'TRUE'}) - self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',), - (u'Date',), (u'Datetime',), - (u'Decimal',),(u'Float',), - (u'Int',), - (u'Interval',), (u'Password',), - (u'String',), (u'Time',)]) - finally: - self.repo._free_pool(pool) + self.session.set_pool() + cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % ( + namecol, table, finalcol)) + self.assertEquals(cu.fetchall(), []) + cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s' + % (namecol, table, finalcol, namecol), {'final': 'TRUE'}) + self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',), + (u'Date',), (u'Datetime',), + (u'Decimal',),(u'Float',), + (u'Int',), + (u'Interval',), (u'Password',), + (u'String',), (u'Time',)]) def test_schema_has_owner(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U')) self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U')) self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U')) @@ -81,18 +69,17 @@ self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U')) def test_connect(self): - login, passwd = self.default_user_password() - self.assert_(self.repo.connect(login, passwd)) + self.assert_(self.repo.connect(self.admlogin, self.admpassword)) self.assertRaises(AuthenticationError, - self.repo.connect, login, 'nimportnawak') + self.repo.connect, self.admlogin, 'nimportnawak') self.assertRaises(AuthenticationError, - self.repo.connect, login, None) + self.repo.connect, self.admlogin, None) self.assertRaises(AuthenticationError, self.repo.connect, None, None) def test_execute(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) repo.execute(cnxid, 'Any X') repo.execute(cnxid, 'Any X where X is Personne') repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"') @@ -101,7 +88,7 @@ def test_login_upassword_accent(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"', {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) repo.commit(cnxid) @@ -110,7 +97,7 @@ def test_invalid_entity_rollback(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) # no group repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"', {'login': u"tutetute", 'passwd': 'tutetute'}) @@ -120,7 +107,7 @@ def test_close(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) self.assert_(cnxid) repo.close(cnxid) self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X') @@ -131,9 +118,9 @@ def test_shared_data(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) repo.set_shared_data(cnxid, 'data', 4) - cnxid2 = repo.connect(*self.default_user_password()) + cnxid2 = repo.connect(self.admlogin, self.admpassword) self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None) repo.set_shared_data(cnxid2, 'data', 5) @@ -151,14 +138,14 @@ def test_check_session(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) self.assertEquals(repo.check_session(cnxid), None) repo.close(cnxid) self.assertRaises(BadConnectionId, repo.check_session, cnxid) def test_transaction_base(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) # check db state result = repo.execute(cnxid, 'Personne X') self.assertEquals(result.rowcount, 0) @@ -177,7 +164,7 @@ def test_transaction_base2(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) # rollback relation insertion repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'") result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") @@ -188,7 +175,7 @@ def test_transaction_base3(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) # rollback state change which trigger TrInfo insertion ueid = repo._get_session(cnxid).user.eid rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid}) @@ -206,7 +193,7 @@ def test_close_wait_processing_request(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"') repo.commit(cnxid) # close has to be in the thread due to sqlite limitations @@ -290,7 +277,7 @@ def test_internal_api(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) session = repo._get_session(cnxid, setpool=True) self.assertEquals(repo.type_and_source_from_eid(1, session), ('CWGroup', 'system', None)) @@ -308,7 +295,7 @@ def test_session_api(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None)) repo.close(cnxid) @@ -317,7 +304,7 @@ def test_shared_data_api(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) repo.set_shared_data(cnxid, 'data', 4) self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) @@ -343,37 +330,34 @@ # print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c) -class DataHelpersTC(RepositoryBasedTC): - - def setUp(self): - """ called before each test from this class """ - cnxid = self.repo.connect(*self.default_user_password()) - self.session = self.repo._sessions[cnxid] - self.session.set_pool() - - def tearDown(self): - self.session.rollback() +class DataHelpersTC(CubicWebTC): def test_create_eid(self): + self.session.set_pool() self.assert_(self.repo.system_source.create_eid(self.session)) def test_source_from_eid(self): + self.session.set_pool() self.assertEquals(self.repo.source_from_eid(1, self.session), self.repo.sources_by_uri['system']) def test_source_from_eid_raise(self): + self.session.set_pool() self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session) def test_type_from_eid(self): + self.session.set_pool() self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup') def test_type_from_eid_raise(self): + self.session.set_pool() self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session) def test_add_delete_info(self): entity = self.repo.vreg['etypes'].etype_class('Personne')(self.session) entity.eid = -1 entity.complete = lambda x: None + self.session.set_pool() self.repo.add_info(self.session, entity, self.repo.sources_by_uri['system']) cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1') data = cu.fetchall() @@ -388,13 +372,14 @@ self.assertEquals(data, []) -class FTITC(RepositoryBasedTC): +class FTITC(CubicWebTC): def test_reindex_and_modified_since(self): eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0] self.commit() ts = datetime.now() self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) + self.session.set_pool() cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp) omtime = cu.fetchone()[0] # our sqlite datetime adapter is ignore seconds fraction, so we have to @@ -403,6 +388,7 @@ self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp}, 'x') self.commit() self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) + self.session.set_pool() cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp) mtime = cu.fetchone()[0] self.failUnless(omtime < mtime) @@ -440,7 +426,7 @@ self.assertEquals(rset.rows, [[self.session.user.eid]]) -class DBInitTC(RepositoryBasedTC): +class DBInitTC(CubicWebTC): def test_versions_inserted(self): inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')] @@ -450,11 +436,11 @@ u'system.version.file', u'system.version.folder', u'system.version.tag']) -class InlineRelHooksTC(RepositoryBasedTC): +class InlineRelHooksTC(CubicWebTC): """test relation hooks are called for inlined relations """ def setUp(self): - RepositoryBasedTC.setUp(self) + CubicWebTC.setUp(self) self.hm = self.repo.hm self.called = []