server/test/unittest_repository.py
changeset 2773 b2530e3e0afb
parent 2650 18aec79ec3a3
child 2968 0e3460341023
--- a/server/test/unittest_repository.py	Tue Aug 11 17:04:59 2009 +0200
+++ b/server/test/unittest_repository.py	Tue Aug 11 17:13:32 2009 +0200
@@ -18,10 +18,11 @@
 
 from yams.constraints import UniqueConstraint
 
-from cubicweb import BadConnectionId, RepositoryError, ValidationError, UnknownEid, AuthenticationError
+from cubicweb import (BadConnectionId, RepositoryError, ValidationError,
+                      UnknownEid, AuthenticationError)
 from cubicweb.schema import CubicWebSchema, RQLConstraint
 from cubicweb.dbapi import connect, repo_connect, multiple_connections_unfix
-from cubicweb.devtools.apptest import RepositoryBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb.devtools.repotest import tuplify
 from cubicweb.server import repository
 from cubicweb.server.sqlutils import SQL_PREFIX
@@ -31,48 +32,35 @@
 os.system('pyro-ns >/dev/null 2>/dev/null &')
 
 
-class RepositoryTC(RepositoryBasedTC):
+class RepositoryTC(CubicWebTC):
     """ singleton providing access to a persistent storage for entities
     and relation
     """
 
-#     def setUp(self):
-#         pass
-
-#     def tearDown(self):
-#         self.repo.config.db_perms = True
-#         cnxid = self.repo.connect(*self.default_user_password())
-#         for etype in ('Affaire', 'Note', 'Societe', 'Personne'):
-#             self.repo.execute(cnxid, 'DELETE %s X' % etype)
-#             self.repo.commit(cnxid)
-#         self.repo.close(cnxid)
-
     def test_fill_schema(self):
         self.repo.schema = CubicWebSchema(self.repo.config.appid)
         self.repo.config._cubes = None # avoid assertion error
+        self.repo.config.repairing = True # avoid versions checking
         self.repo.fill_schema()
-        pool = self.repo._get_pool()
         table = SQL_PREFIX + 'CWEType'
         namecol = SQL_PREFIX + 'name'
         finalcol = SQL_PREFIX + 'final'
-        try:
-            cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (
-                namecol, table, finalcol))
-            self.assertEquals(cu.fetchall(), [])
-            cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s'
-                              % (namecol, table, finalcol, namecol), {'final': 'TRUE'})
-            self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',),
-                                                     (u'Date',), (u'Datetime',),
-                                                     (u'Decimal',),(u'Float',),
-                                                     (u'Int',),
-                                                     (u'Interval',), (u'Password',),
-                                                     (u'String',), (u'Time',)])
-        finally:
-            self.repo._free_pool(pool)
+        self.session.set_pool()
+        cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (
+            namecol, table, finalcol))
+        self.assertEquals(cu.fetchall(), [])
+        cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s'
+                          % (namecol, table, finalcol, namecol), {'final': 'TRUE'})
+        self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',),
+                                          (u'Date',), (u'Datetime',),
+                                          (u'Decimal',),(u'Float',),
+                                          (u'Int',),
+                                          (u'Interval',), (u'Password',),
+                                          (u'String',), (u'Time',)])
 
     def test_schema_has_owner(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U'))
         self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U'))
         self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U'))
@@ -81,18 +69,17 @@
         self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U'))
 
     def test_connect(self):
-        login, passwd = self.default_user_password()
-        self.assert_(self.repo.connect(login, passwd))
+        self.assert_(self.repo.connect(self.admlogin, self.admpassword))
         self.assertRaises(AuthenticationError,
-                          self.repo.connect, login, 'nimportnawak')
+                          self.repo.connect, self.admlogin, 'nimportnawak')
         self.assertRaises(AuthenticationError,
-                          self.repo.connect, login, None)
+                          self.repo.connect, self.admlogin, None)
         self.assertRaises(AuthenticationError,
                           self.repo.connect, None, None)
 
     def test_execute(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         repo.execute(cnxid, 'Any X')
         repo.execute(cnxid, 'Any X where X is Personne')
         repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
@@ -101,7 +88,7 @@
 
     def test_login_upassword_accent(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"',
                      {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
         repo.commit(cnxid)
@@ -110,7 +97,7 @@
 
     def test_invalid_entity_rollback(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         # no group
         repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"',
                      {'login': u"tutetute", 'passwd': 'tutetute'})
@@ -120,7 +107,7 @@
 
     def test_close(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         self.assert_(cnxid)
         repo.close(cnxid)
         self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')
@@ -131,9 +118,9 @@
 
     def test_shared_data(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         repo.set_shared_data(cnxid, 'data', 4)
-        cnxid2 = repo.connect(*self.default_user_password())
+        cnxid2 = repo.connect(self.admlogin, self.admpassword)
         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
         self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
         repo.set_shared_data(cnxid2, 'data', 5)
@@ -151,14 +138,14 @@
 
     def test_check_session(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         self.assertEquals(repo.check_session(cnxid), None)
         repo.close(cnxid)
         self.assertRaises(BadConnectionId, repo.check_session, cnxid)
 
     def test_transaction_base(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         # check db state
         result = repo.execute(cnxid, 'Personne X')
         self.assertEquals(result.rowcount, 0)
@@ -177,7 +164,7 @@
 
     def test_transaction_base2(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         # rollback relation insertion
         repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
@@ -188,7 +175,7 @@
 
     def test_transaction_base3(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         # rollback state change which trigger TrInfo insertion
         ueid = repo._get_session(cnxid).user.eid
         rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
@@ -206,7 +193,7 @@
 
     def test_close_wait_processing_request(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
         repo.commit(cnxid)
         # close has to be in the thread due to sqlite limitations
@@ -290,7 +277,7 @@
 
     def test_internal_api(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         session = repo._get_session(cnxid, setpool=True)
         self.assertEquals(repo.type_and_source_from_eid(1, session),
                           ('CWGroup', 'system', None))
@@ -308,7 +295,7 @@
 
     def test_session_api(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
         self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
         repo.close(cnxid)
@@ -317,7 +304,7 @@
 
     def test_shared_data_api(self):
         repo = self.repo
-        cnxid = repo.connect(*self.default_user_password())
+        cnxid = repo.connect(self.admlogin, self.admpassword)
         self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
         repo.set_shared_data(cnxid, 'data', 4)
         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
@@ -343,37 +330,34 @@
 #         print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c)
 
 
-class DataHelpersTC(RepositoryBasedTC):
-
-    def setUp(self):
-        """ called before each test from this class """
-        cnxid = self.repo.connect(*self.default_user_password())
-        self.session = self.repo._sessions[cnxid]
-        self.session.set_pool()
-
-    def tearDown(self):
-        self.session.rollback()
+class DataHelpersTC(CubicWebTC):
 
     def test_create_eid(self):
+        self.session.set_pool()
         self.assert_(self.repo.system_source.create_eid(self.session))
 
     def test_source_from_eid(self):
+        self.session.set_pool()
         self.assertEquals(self.repo.source_from_eid(1, self.session),
                           self.repo.sources_by_uri['system'])
 
     def test_source_from_eid_raise(self):
+        self.session.set_pool()
         self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)
 
     def test_type_from_eid(self):
+        self.session.set_pool()
         self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup')
 
     def test_type_from_eid_raise(self):
+        self.session.set_pool()
         self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)
 
     def test_add_delete_info(self):
         entity = self.repo.vreg['etypes'].etype_class('Personne')(self.session)
         entity.eid = -1
         entity.complete = lambda x: None
+        self.session.set_pool()
         self.repo.add_info(self.session, entity, self.repo.sources_by_uri['system'])
         cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
         data = cu.fetchall()
@@ -388,13 +372,14 @@
         self.assertEquals(data, [])
 
 
-class FTITC(RepositoryBasedTC):
+class FTITC(CubicWebTC):
 
     def test_reindex_and_modified_since(self):
         eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
         self.commit()
         ts = datetime.now()
         self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
+        self.session.set_pool()
         cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp)
         omtime = cu.fetchone()[0]
         # our sqlite datetime adapter is ignore seconds fraction, so we have to
@@ -403,6 +388,7 @@
         self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp}, 'x')
         self.commit()
         self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
+        self.session.set_pool()
         cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp)
         mtime = cu.fetchone()[0]
         self.failUnless(omtime < mtime)
@@ -440,7 +426,7 @@
         self.assertEquals(rset.rows, [[self.session.user.eid]])
 
 
-class DBInitTC(RepositoryBasedTC):
+class DBInitTC(CubicWebTC):
 
     def test_versions_inserted(self):
         inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
@@ -450,11 +436,11 @@
                            u'system.version.file', u'system.version.folder',
                            u'system.version.tag'])
 
-class InlineRelHooksTC(RepositoryBasedTC):
+class InlineRelHooksTC(CubicWebTC):
     """test relation hooks are called for inlined relations
     """
     def setUp(self):
-        RepositoryBasedTC.setUp(self)
+        CubicWebTC.setUp(self)
         self.hm = self.repo.hm
         self.called = []