--- a/server/test/unittest_repository.py Tue Aug 11 17:04:59 2009 +0200
+++ b/server/test/unittest_repository.py Tue Aug 11 17:13:32 2009 +0200
@@ -18,10 +18,11 @@
from yams.constraints import UniqueConstraint
-from cubicweb import BadConnectionId, RepositoryError, ValidationError, UnknownEid, AuthenticationError
+from cubicweb import (BadConnectionId, RepositoryError, ValidationError,
+ UnknownEid, AuthenticationError)
from cubicweb.schema import CubicWebSchema, RQLConstraint
from cubicweb.dbapi import connect, repo_connect, multiple_connections_unfix
-from cubicweb.devtools.apptest import RepositoryBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.repotest import tuplify
from cubicweb.server import repository
from cubicweb.server.sqlutils import SQL_PREFIX
@@ -31,48 +32,35 @@
os.system('pyro-ns >/dev/null 2>/dev/null &')
-class RepositoryTC(RepositoryBasedTC):
+class RepositoryTC(CubicWebTC):
""" singleton providing access to a persistent storage for entities
and relation
"""
-# def setUp(self):
-# pass
-
-# def tearDown(self):
-# self.repo.config.db_perms = True
-# cnxid = self.repo.connect(*self.default_user_password())
-# for etype in ('Affaire', 'Note', 'Societe', 'Personne'):
-# self.repo.execute(cnxid, 'DELETE %s X' % etype)
-# self.repo.commit(cnxid)
-# self.repo.close(cnxid)
-
def test_fill_schema(self):
self.repo.schema = CubicWebSchema(self.repo.config.appid)
self.repo.config._cubes = None # avoid assertion error
+ self.repo.config.repairing = True # avoid versions checking
self.repo.fill_schema()
- pool = self.repo._get_pool()
table = SQL_PREFIX + 'CWEType'
namecol = SQL_PREFIX + 'name'
finalcol = SQL_PREFIX + 'final'
- try:
- cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (
- namecol, table, finalcol))
- self.assertEquals(cu.fetchall(), [])
- cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s'
- % (namecol, table, finalcol, namecol), {'final': 'TRUE'})
- self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',),
- (u'Date',), (u'Datetime',),
- (u'Decimal',),(u'Float',),
- (u'Int',),
- (u'Interval',), (u'Password',),
- (u'String',), (u'Time',)])
- finally:
- self.repo._free_pool(pool)
+ self.session.set_pool()
+ cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (
+ namecol, table, finalcol))
+ self.assertEquals(cu.fetchall(), [])
+ cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s'
+ % (namecol, table, finalcol, namecol), {'final': 'TRUE'})
+ self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',),
+ (u'Date',), (u'Datetime',),
+ (u'Decimal',),(u'Float',),
+ (u'Int',),
+ (u'Interval',), (u'Password',),
+ (u'String',), (u'Time',)])
def test_schema_has_owner(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U'))
self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U'))
self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U'))
@@ -81,18 +69,17 @@
self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U'))
def test_connect(self):
- login, passwd = self.default_user_password()
- self.assert_(self.repo.connect(login, passwd))
+ self.assert_(self.repo.connect(self.admlogin, self.admpassword))
self.assertRaises(AuthenticationError,
- self.repo.connect, login, 'nimportnawak')
+ self.repo.connect, self.admlogin, 'nimportnawak')
self.assertRaises(AuthenticationError,
- self.repo.connect, login, None)
+ self.repo.connect, self.admlogin, None)
self.assertRaises(AuthenticationError,
self.repo.connect, None, None)
def test_execute(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
repo.execute(cnxid, 'Any X')
repo.execute(cnxid, 'Any X where X is Personne')
repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
@@ -101,7 +88,7 @@
def test_login_upassword_accent(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"',
{'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
repo.commit(cnxid)
@@ -110,7 +97,7 @@
def test_invalid_entity_rollback(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
# no group
repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"',
{'login': u"tutetute", 'passwd': 'tutetute'})
@@ -120,7 +107,7 @@
def test_close(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
self.assert_(cnxid)
repo.close(cnxid)
self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')
@@ -131,9 +118,9 @@
def test_shared_data(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
repo.set_shared_data(cnxid, 'data', 4)
- cnxid2 = repo.connect(*self.default_user_password())
+ cnxid2 = repo.connect(self.admlogin, self.admpassword)
self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
repo.set_shared_data(cnxid2, 'data', 5)
@@ -151,14 +138,14 @@
def test_check_session(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
self.assertEquals(repo.check_session(cnxid), None)
repo.close(cnxid)
self.assertRaises(BadConnectionId, repo.check_session, cnxid)
def test_transaction_base(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
# check db state
result = repo.execute(cnxid, 'Personne X')
self.assertEquals(result.rowcount, 0)
@@ -177,7 +164,7 @@
def test_transaction_base2(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
# rollback relation insertion
repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
@@ -188,7 +175,7 @@
def test_transaction_base3(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
# rollback state change which trigger TrInfo insertion
ueid = repo._get_session(cnxid).user.eid
rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
@@ -206,7 +193,7 @@
def test_close_wait_processing_request(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
repo.commit(cnxid)
# close has to be in the thread due to sqlite limitations
@@ -290,7 +277,7 @@
def test_internal_api(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
session = repo._get_session(cnxid, setpool=True)
self.assertEquals(repo.type_and_source_from_eid(1, session),
('CWGroup', 'system', None))
@@ -308,7 +295,7 @@
def test_session_api(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
repo.close(cnxid)
@@ -317,7 +304,7 @@
def test_shared_data_api(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
repo.set_shared_data(cnxid, 'data', 4)
self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
@@ -343,37 +330,34 @@
# print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c)
-class DataHelpersTC(RepositoryBasedTC):
-
- def setUp(self):
- """ called before each test from this class """
- cnxid = self.repo.connect(*self.default_user_password())
- self.session = self.repo._sessions[cnxid]
- self.session.set_pool()
-
- def tearDown(self):
- self.session.rollback()
+class DataHelpersTC(CubicWebTC):
def test_create_eid(self):
+ self.session.set_pool()
self.assert_(self.repo.system_source.create_eid(self.session))
def test_source_from_eid(self):
+ self.session.set_pool()
self.assertEquals(self.repo.source_from_eid(1, self.session),
self.repo.sources_by_uri['system'])
def test_source_from_eid_raise(self):
+ self.session.set_pool()
self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)
def test_type_from_eid(self):
+ self.session.set_pool()
self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup')
def test_type_from_eid_raise(self):
+ self.session.set_pool()
self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)
def test_add_delete_info(self):
entity = self.repo.vreg['etypes'].etype_class('Personne')(self.session)
entity.eid = -1
entity.complete = lambda x: None
+ self.session.set_pool()
self.repo.add_info(self.session, entity, self.repo.sources_by_uri['system'])
cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
data = cu.fetchall()
@@ -388,13 +372,14 @@
self.assertEquals(data, [])
-class FTITC(RepositoryBasedTC):
+class FTITC(CubicWebTC):
def test_reindex_and_modified_since(self):
eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
self.commit()
ts = datetime.now()
self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
+ self.session.set_pool()
cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp)
omtime = cu.fetchone()[0]
# our sqlite datetime adapter is ignore seconds fraction, so we have to
@@ -403,6 +388,7 @@
self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp}, 'x')
self.commit()
self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
+ self.session.set_pool()
cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp)
mtime = cu.fetchone()[0]
self.failUnless(omtime < mtime)
@@ -440,7 +426,7 @@
self.assertEquals(rset.rows, [[self.session.user.eid]])
-class DBInitTC(RepositoryBasedTC):
+class DBInitTC(CubicWebTC):
def test_versions_inserted(self):
inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
@@ -450,11 +436,11 @@
u'system.version.file', u'system.version.folder',
u'system.version.tag'])
-class InlineRelHooksTC(RepositoryBasedTC):
+class InlineRelHooksTC(CubicWebTC):
"""test relation hooks are called for inlined relations
"""
def setUp(self):
- RepositoryBasedTC.setUp(self)
+ CubicWebTC.setUp(self)
self.hm = self.repo.hm
self.called = []