server/test/unittest_repository.py
changeset 1808 aa09e20dd8c0
parent 1787 71c143c0ada3
child 1880 293fe4b49e28
--- a/server/test/unittest_repository.py	Tue May 05 17:18:49 2009 +0200
+++ b/server/test/unittest_repository.py	Thu May 14 12:48:11 2009 +0200
@@ -6,8 +6,7 @@
 import threading
 import time
 from copy import deepcopy
-
-from mx.DateTime import DateTimeType, now
+from datetime import datetime
 
 from logilab.common.testlib import TestCase, unittest_main
 
@@ -18,7 +17,7 @@
 from cubicweb.dbapi import connect, repo_connect
 from cubicweb.devtools.apptest import RepositoryBasedTC
 from cubicweb.devtools.repotest import tuplify
-from cubicweb.server import repository 
+from cubicweb.server import repository
 from cubicweb.server.sqlutils import SQL_PREFIX
 
 
@@ -30,10 +29,10 @@
     """ singleton providing access to a persistent storage for entities
     and relation
     """
-    
+
 #     def setUp(self):
 #         pass
-    
+
 #     def tearDown(self):
 #         self.repo.config.db_perms = True
 #         cnxid = self.repo.connect(*self.default_user_password())
@@ -47,7 +46,7 @@
         self.repo.config._cubes = None # avoid assertion error
         self.repo.fill_schema()
         pool = self.repo._get_pool()
-        table = SQL_PREFIX + 'EEType'
+        table = SQL_PREFIX + 'CWEType'
         namecol = SQL_PREFIX + 'name'
         finalcol = SQL_PREFIX + 'final'
         try:
@@ -65,17 +64,17 @@
                                                      (u'String',), (u'Time',)])
         finally:
             self.repo._free_pool(pool)
-            
+
     def test_schema_has_owner(self):
         repo = self.repo
         cnxid = repo.connect(*self.default_user_password())
-        self.failIf(repo.execute(cnxid, 'EEType X WHERE NOT X owned_by U'))
-        self.failIf(repo.execute(cnxid, 'ERType X WHERE NOT X owned_by U'))
-        self.failIf(repo.execute(cnxid, 'EFRDef X WHERE NOT X owned_by U'))
-        self.failIf(repo.execute(cnxid, 'ENFRDef X WHERE NOT X owned_by U'))
-        self.failIf(repo.execute(cnxid, 'EConstraint X WHERE NOT X owned_by U'))
-        self.failIf(repo.execute(cnxid, 'EConstraintType X WHERE NOT X owned_by U'))
-        
+        self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U'))
+        self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U'))
+        self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U'))
+        self.failIf(repo.execute(cnxid, 'CWRelation X WHERE NOT X owned_by U'))
+        self.failIf(repo.execute(cnxid, 'CWConstraint X WHERE NOT X owned_by U'))
+        self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U'))
+
     def test_connect(self):
         login, passwd = self.default_user_password()
         self.assert_(self.repo.connect(login, passwd))
@@ -85,7 +84,7 @@
                           self.repo.connect, login, None)
         self.assertRaises(AuthenticationError,
                           self.repo.connect, None, None)
-    
+
     def test_execute(self):
         repo = self.repo
         cnxid = repo.connect(*self.default_user_password())
@@ -94,36 +93,37 @@
         repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
         repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'})
         repo.close(cnxid)
-        
+
     def test_login_upassword_accent(self):
         repo = self.repo
         cnxid = repo.connect(*self.default_user_password())
-        repo.execute(cnxid, 'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"',
+        repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"',
                      {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
         repo.commit(cnxid)
         repo.close(cnxid)
         self.assert_(repo.connect(u"barnabé", u"héhéhé".encode('UTF8')))
-    
+
     def test_invalid_entity_rollback(self):
         repo = self.repo
         cnxid = repo.connect(*self.default_user_password())
-        repo.execute(cnxid, 'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"',
+        # no group
+        repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"',
                      {'login': u"tutetute", 'passwd': 'tutetute'})
         self.assertRaises(ValidationError, repo.commit, cnxid)
-        rset = repo.execute(cnxid, 'EUser X WHERE X login "tutetute"')
+        rset = repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')
         self.assertEquals(rset.rowcount, 0)
-        
+
     def test_close(self):
         repo = self.repo
         cnxid = repo.connect(*self.default_user_password())
         self.assert_(cnxid)
         repo.close(cnxid)
         self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')
-    
+
     def test_invalid_cnxid(self):
         self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X')
         self.assertRaises(BadConnectionId, self.repo.close, None)
-    
+
     def test_shared_data(self):
         repo = self.repo
         cnxid = repo.connect(*self.default_user_password())
@@ -180,7 +180,7 @@
         repo.rollback(cnxid)
         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
         self.assertEquals(result.rowcount, 0, result.rows)
-        
+
     def test_transaction_base3(self):
         repo = self.repo
         cnxid = repo.connect(*self.default_user_password())
@@ -195,26 +195,26 @@
         repo.rollback(cnxid)
         rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
         self.assertEquals(len(rset), 1)
-        
+
     def test_transaction_interleaved(self):
         self.skip('implement me')
 
     def test_initial_schema(self):
         schema = self.repo.schema
         # check order of attributes is respected
-        self.assertListEquals([r.type for r in schema.eschema('EFRDef').ordered_relations()
-                               if not r.type in ('eid', 'is', 'is_instance_of', 'identity', 
+        self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations()
+                               if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
                                                  'creation_date', 'modification_date',
                                                  'owned_by', 'created_by')],
                               ['relation_type', 'from_entity', 'to_entity', 'constrained_by',
-                               'cardinality', 'ordernum', 
+                               'cardinality', 'ordernum',
                                'indexed', 'fulltextindexed', 'internationalizable',
                                'defaultval', 'description_format', 'description'])
 
-        self.assertEquals(schema.eschema('EEType').main_attribute(), 'name')
+        self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name')
         self.assertEquals(schema.eschema('State').main_attribute(), 'name')
 
-        constraints = schema.rschema('name').rproperty('EEType', 'String', 'constraints')
+        constraints = schema.rschema('name').rproperty('CWEType', 'String', 'constraints')
         self.assertEquals(len(constraints), 2)
         for cstr in constraints[:]:
             if isinstance(cstr, UniqueConstraint):
@@ -226,14 +226,14 @@
         self.assertEquals(sizeconstraint.min, None)
         self.assertEquals(sizeconstraint.max, 64)
 
-        constraints = schema.rschema('relation_type').rproperty('EFRDef', 'ERType', 'constraints')
+        constraints = schema.rschema('relation_type').rproperty('CWAttribute', 'CWRType', 'constraints')
         self.assertEquals(len(constraints), 1)
         cstr = constraints[0]
         self.assert_(isinstance(cstr, RQLConstraint))
         self.assertEquals(cstr.restriction, 'O final TRUE')
 
         ownedby = schema.rschema('owned_by')
-        self.assertEquals(ownedby.objects('EEType'), ('EUser',))
+        self.assertEquals(ownedby.objects('CWEType'), ('CWUser',))
 
     def test_pyro(self):
         import Pyro
@@ -255,21 +255,21 @@
             t.join()
         finally:
             repository.pyro_unregister(self.repo.config)
-            
+
     def _pyro_client(self, lock):
         cnx = connect(self.repo.config.appid, u'admin', 'gingkow')
         # check we can get the schema
         schema = cnx.get_schema()
         self.assertEquals(schema.__hashmode__, None)
         rset = cnx.cursor().execute('Any U,G WHERE U in_group G')
-        
+
 
     def test_internal_api(self):
         repo = self.repo
         cnxid = repo.connect(*self.default_user_password())
         session = repo._get_session(cnxid, setpool=True)
-        self.assertEquals(repo.type_and_source_from_eid(1, session), ('EGroup', 'system', None))
-        self.assertEquals(repo.type_from_eid(1, session), 'EGroup')
+        self.assertEquals(repo.type_and_source_from_eid(1, session), ('CWGroup', 'system', None))
+        self.assertEquals(repo.type_from_eid(1, session), 'CWGroup')
         self.assertEquals(repo.source_from_eid(1, session).uri, 'system')
         self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None)
         class dummysource: uri = 'toto'
@@ -279,13 +279,13 @@
         self.assertEquals(self.repo.get_schema(), self.repo.schema)
         self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}})
         # .properties() return a result set
-        self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is EProperty,P pkey K, P value V, NOT P for_user U')
+        self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')
 
     def test_session_api(self):
         repo = self.repo
         cnxid = repo.connect(*self.default_user_password())
         self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
-        self.assertEquals(repo.describe(cnxid, 1), (u'EGroup', u'system', None))
+        self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
         repo.close(cnxid)
         self.assertRaises(BadConnectionId, repo.user_info, cnxid)
         self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)
@@ -302,10 +302,10 @@
         repo.close(cnxid)
         self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0)
         self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
-        
+
 
 class DataHelpersTC(RepositoryBasedTC):
-    
+
     def setUp(self):
         """ called before each test from this class """
         cnxid = self.repo.connect(*self.default_user_password())
@@ -314,7 +314,7 @@
 
     def tearDown(self):
         self.session.rollback()
-        
+
     def test_create_eid(self):
         self.assert_(self.repo.system_source.create_eid(self.session))
 
@@ -326,11 +326,11 @@
         self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)
 
     def test_type_from_eid(self):
-        self.assertEquals(self.repo.type_from_eid(1, self.session), 'EGroup')
-        
+        self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup')
+
     def test_type_from_eid_raise(self):
         self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)
-        
+
     def test_add_delete_info(self):
         entity = self.repo.vreg.etype_class('Personne')(self.session, None, None)
         entity.eid = -1
@@ -339,7 +339,7 @@
         cursor = self.session.pool['system']
         cursor.execute('SELECT * FROM entities WHERE eid = -1')
         data = cursor.fetchall()
-        self.assertIsInstance(data[0][3], DateTimeType)
+        self.assertIsInstance(data[0][3], datetime)
         data[0] = list(data[0])
         data[0][3] = None
         self.assertEquals(tuplify(data), [(-1, 'Personne', 'system', None, None)])
@@ -351,12 +351,12 @@
 
 
 class FTITC(RepositoryBasedTC):
-    
+
     def test_reindex_and_modified_since(self):
         cursor = self.session.pool['system']
         eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
         self.commit()
-        ts = now()
+        ts = datetime.now()
         self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
         cursor.execute('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp)
         omtime = cursor.fetchone()[0]
@@ -401,18 +401,18 @@
         self.commit()
         rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
         self.assertEquals(rset.rows, [[self.session.user.eid]])
-        
-        
+
+
 class DBInitTC(RepositoryBasedTC):
-    
+
     def test_versions_inserted(self):
         inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
         self.assertEquals(inserted,
-                          [u'system.version.basket', u'system.version.comment', 
-                           u'system.version.cubicweb', u'system.version.email', 
-                           u'system.version.file', u'system.version.folder', 
+                          [u'system.version.basket', u'system.version.card', u'system.version.comment',
+                           u'system.version.cubicweb', u'system.version.email',
+                           u'system.version.file', u'system.version.folder',
                            u'system.version.tag'])
-        
+
 class InlineRelHooksTC(RepositoryBasedTC):
     """test relation hooks are called for inlined relations
     """
@@ -420,13 +420,13 @@
         RepositoryBasedTC.setUp(self)
         self.hm = self.repo.hm
         self.called = []
-    
+
     def _before_relation_hook(self, pool, fromeid, rtype, toeid):
         self.called.append((fromeid, rtype, toeid))
 
     def _after_relation_hook(self, pool, fromeid, rtype, toeid):
         self.called.append((fromeid, rtype, toeid))
-        
+
     def test_before_add_inline_relation(self):
         """make sure before_<event>_relation hooks are called directly"""
         self.hm.register_hook(self._before_relation_hook,
@@ -435,7 +435,7 @@
         eidn = self.execute('INSERT Note X: X type "T"')[0][0]
         self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
         self.assertEquals(self.called, [(eidn, 'ecrit_par', eidp)])
-        
+
     def test_after_add_inline_relation(self):
         """make sure after_<event>_relation hooks are deferred"""
         self.hm.register_hook(self._after_relation_hook,
@@ -445,15 +445,15 @@
         self.assertEquals(self.called, [])
         self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
         self.assertEquals(self.called, [(eidn, 'ecrit_par', eidp,)])
-        
+
     def test_after_add_inline(self):
         """make sure after_<event>_relation hooks are deferred"""
         self.hm.register_hook(self._after_relation_hook,
                              'after_add_relation', 'in_state')
-        eidp = self.execute('INSERT EUser X: X login "toto", X upassword "tutu", X in_state S WHERE S name "activated"')[0][0]
+        eidp = self.execute('INSERT CWUser X: X login "toto", X upassword "tutu", X in_state S WHERE S name "activated"')[0][0]
         eids = self.execute('State X WHERE X name "activated"')[0][0]
         self.assertEquals(self.called, [(eidp, 'in_state', eids,)])
-    
+
     def test_before_delete_inline_relation(self):
         """make sure before_<event>_relation hooks are called directly"""
         self.hm.register_hook(self._before_relation_hook,
@@ -478,6 +478,6 @@
         self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
         self.assertEquals(self.called, [(eidn, 'ecrit_par', eidp,)])
 
-    
+
 if __name__ == '__main__':
     unittest_main()