server/test/unittest_repository.py
changeset 6366 1806148d6ce8
parent 6279 42079f752a9c
parent 6364 ad9ed9803eb6
child 6401 d7f5d873e1b8
--- a/server/test/unittest_repository.py	Thu Sep 23 23:28:58 2010 +0200
+++ b/server/test/unittest_repository.py	Wed Sep 29 16:16:32 2010 +0200
@@ -32,7 +32,7 @@
 from yams.constraints import UniqueConstraint
 
 from cubicweb import (BadConnectionId, RepositoryError, ValidationError,
-                      UnknownEid, AuthenticationError)
+                      UnknownEid, AuthenticationError, Unauthorized)
 from cubicweb.selectors import is_instance
 from cubicweb.schema import CubicWebSchema, RQLConstraint
 from cubicweb.dbapi import connect, multiple_connections_unfix
@@ -65,10 +65,10 @@
             self.session.set_pool()
             cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (
                 namecol, table, finalcol))
-            self.assertEquals(cu.fetchall(), [])
+            self.assertEqual(cu.fetchall(), [])
             cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s'
                                          % (namecol, table, finalcol, namecol), {'final': 'TRUE'})
-            self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',),
+            self.assertEqual(cu.fetchall(), [(u'Boolean',), (u'Bytes',),
                                               (u'Date',), (u'Datetime',),
                                               (u'Decimal',),(u'Float',),
                                               (u'Int',),
@@ -84,15 +84,15 @@
                    ";")
             cu = self.session.system_sql(sql)
             rows = cu.fetchall()
-            self.assertEquals(len(rows), 3)
+            self.assertEqual(len(rows), 3)
             self.test_unique_together()
         finally:
             self.repo.set_schema(origshema)
 
     def test_unique_together(self):
         person = self.repo.schema.eschema('Personne')
-        self.assertEquals(len(person._unique_together), 1)
-        self.assertUnorderedIterableEquals(person._unique_together[0],
+        self.assertEqual(len(person._unique_together), 1)
+        self.assertItemsEqual(person._unique_together[0],
                                            ('nom', 'prenom', 'inline2'))
 
     def test_schema_has_owner(self):
@@ -136,15 +136,39 @@
         repo.close(cnxid)
         self.assert_(repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8')))
 
-    def test_invalid_entity_rollback(self):
+    def test_rollback_on_commit_error(self):
         cnxid = self.repo.connect(self.admlogin, password=self.admpassword)
-        # no group
         self.repo.execute(cnxid,
                           'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s',
                           {'login': u"tutetute", 'passwd': 'tutetute'})
         self.assertRaises(ValidationError, self.repo.commit, cnxid)
         self.failIf(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"'))
 
+    def test_rollback_on_execute_validation_error(self):
+        class ValidationErrorAfterHook(Hook):
+            __regid__ = 'valerror-after-hook'
+            __select__ = Hook.__select__ & is_instance('CWGroup')
+            events = ('after_update_entity',)
+            def __call__(self):
+                raise ValidationError(self.entity.eid, {})
+        with self.temporary_appobjects(ValidationErrorAfterHook):
+            self.assertRaises(ValidationError,
+                              self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"')
+            self.failIf(self.execute('Any X WHERE X is CWGroup, X name "toto"'))
+
+    def test_rollback_on_execute_unauthorized(self):
+        class UnauthorizedAfterHook(Hook):
+            __regid__ = 'unauthorized-after-hook'
+            __select__ = Hook.__select__ & is_instance('CWGroup')
+            events = ('after_update_entity',)
+            def __call__(self):
+                raise Unauthorized()
+        with self.temporary_appobjects(UnauthorizedAfterHook):
+            self.assertRaises(Unauthorized,
+                              self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"')
+            self.failIf(self.execute('Any X WHERE X is CWGroup, X name "toto"'))
+
+
     def test_close(self):
         repo = self.repo
         cnxid = repo.connect(self.admlogin, password=self.admpassword)
@@ -161,14 +185,14 @@
         cnxid = repo.connect(self.admlogin, password=self.admpassword)
         repo.set_shared_data(cnxid, 'data', 4)
         cnxid2 = repo.connect(self.admlogin, password=self.admpassword)
-        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
-        self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
+        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
+        self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None)
         repo.set_shared_data(cnxid2, 'data', 5)
-        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
-        self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5)
+        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
+        self.assertEqual(repo.get_shared_data(cnxid2, 'data'), 5)
         repo.get_shared_data(cnxid2, 'data', pop=True)
-        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
-        self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
+        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
+        self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None)
         repo.close(cnxid)
         repo.close(cnxid2)
         self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
@@ -188,19 +212,19 @@
         cnxid = repo.connect(self.admlogin, password=self.admpassword)
         # check db state
         result = repo.execute(cnxid, 'Personne X')
-        self.assertEquals(result.rowcount, 0)
+        self.assertEqual(result.rowcount, 0)
         # rollback entity insertion
         repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
         result = repo.execute(cnxid, 'Personne X')
-        self.assertEquals(result.rowcount, 1)
+        self.assertEqual(result.rowcount, 1)
         repo.rollback(cnxid)
         result = repo.execute(cnxid, 'Personne X')
-        self.assertEquals(result.rowcount, 0, result.rows)
+        self.assertEqual(result.rowcount, 0, result.rows)
         # commit
         repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
         repo.commit(cnxid)
         result = repo.execute(cnxid, 'Personne X')
-        self.assertEquals(result.rowcount, 1)
+        self.assertEqual(result.rowcount, 1)
 
     def test_transaction_base2(self):
         repo = self.repo
@@ -208,10 +232,10 @@
         # rollback relation insertion
         repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
-        self.assertEquals(result.rowcount, 1)
+        self.assertEqual(result.rowcount, 1)
         repo.rollback(cnxid)
         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
-        self.assertEquals(result.rowcount, 0, result.rows)
+        self.assertEqual(result.rowcount, 0, result.rows)
 
     def test_transaction_base3(self):
         repo = self.repo
@@ -222,13 +246,13 @@
         user = session.user
         user.cw_adapt_to('IWorkflowable').fire_transition('deactivate')
         rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid})
-        self.assertEquals(len(rset), 1)
+        self.assertEqual(len(rset), 1)
         repo.rollback(cnxid)
         rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid})
-        self.assertEquals(len(rset), 0)
+        self.assertEqual(len(rset), 0)
 
     def test_transaction_interleaved(self):
-        self.skip('implement me')
+        self.skipTest('implement me')
 
     def test_close_kill_processing_request(self):
         repo = self.repo
@@ -246,14 +270,14 @@
             repo.commit(cnxid)
         try:
             ex = self.assertRaises(Exception, run_transaction)
-            self.assertEquals(str(ex), 'try to access pool on a closed session')
+            self.assertEqual(str(ex), 'try to access pool on a closed session')
         finally:
             t.join()
 
     def test_initial_schema(self):
         schema = self.repo.schema
         # check order of attributes is respected
-        self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations()
+        self.assertListEqual([r.type for r in schema.eschema('CWAttribute').ordered_relations()
                                if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
                                                  'creation_date', 'modification_date', 'cwuri',
                                                  'owned_by', 'created_by',
@@ -266,11 +290,11 @@
                                'indexed', 'fulltextindexed', 'internationalizable',
                                'defaultval', 'description', 'description_format'])
 
-        self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name')
-        self.assertEquals(schema.eschema('State').main_attribute(), 'name')
+        self.assertEqual(schema.eschema('CWEType').main_attribute(), 'name')
+        self.assertEqual(schema.eschema('State').main_attribute(), 'name')
 
         constraints = schema.rschema('name').rdef('CWEType', 'String').constraints
-        self.assertEquals(len(constraints), 2)
+        self.assertEqual(len(constraints), 2)
         for cstr in constraints[:]:
             if isinstance(cstr, UniqueConstraint):
                 constraints.remove(cstr)
@@ -278,17 +302,17 @@
         else:
             self.fail('unique constraint not found')
         sizeconstraint = constraints[0]
-        self.assertEquals(sizeconstraint.min, None)
-        self.assertEquals(sizeconstraint.max, 64)
+        self.assertEqual(sizeconstraint.min, None)
+        self.assertEqual(sizeconstraint.max, 64)
 
         constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints
-        self.assertEquals(len(constraints), 1)
+        self.assertEqual(len(constraints), 1)
         cstr = constraints[0]
         self.assert_(isinstance(cstr, RQLConstraint))
-        self.assertEquals(cstr.restriction, 'O final TRUE')
+        self.assertEqual(cstr.restriction, 'O final TRUE')
 
         ownedby = schema.rschema('owned_by')
-        self.assertEquals(ownedby.objects('CWEType'), ('CWUser',))
+        self.assertEqual(ownedby.objects('CWEType'), ('CWUser',))
 
     def test_pyro(self):
         import Pyro
@@ -319,7 +343,7 @@
             schema = cnx.get_schema()
             self.failUnless(cnx.vreg)
             self.failUnless('etypes'in cnx.vreg)
-            self.assertEquals(schema.__hashmode__, None)
+            self.assertEqual(schema.__hashmode__, None)
             cu = cnx.cursor()
             rset = cu.execute('Any U,G WHERE U in_group G')
             user = iter(rset.entities()).next()
@@ -337,25 +361,25 @@
         repo = self.repo
         cnxid = repo.connect(self.admlogin, password=self.admpassword)
         session = repo._get_session(cnxid, setpool=True)
-        self.assertEquals(repo.type_and_source_from_eid(1, session),
+        self.assertEqual(repo.type_and_source_from_eid(1, session),
                           ('CWGroup', 'system', None))
-        self.assertEquals(repo.type_from_eid(1, session), 'CWGroup')
-        self.assertEquals(repo.source_from_eid(1, session).uri, 'system')
-        self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None)
+        self.assertEqual(repo.type_from_eid(1, session), 'CWGroup')
+        self.assertEqual(repo.source_from_eid(1, session).uri, 'system')
+        self.assertEqual(repo.eid2extid(repo.system_source, 1, session), None)
         class dummysource: uri = 'toto'
         self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session)
 
     def test_public_api(self):
-        self.assertEquals(self.repo.get_schema(), self.repo.schema)
-        self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}})
+        self.assertEqual(self.repo.get_schema(), self.repo.schema)
+        self.assertEqual(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}})
         # .properties() return a result set
-        self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')
+        self.assertEqual(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')
 
     def test_session_api(self):
         repo = self.repo
         cnxid = repo.connect(self.admlogin, password=self.admpassword)
-        self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
-        self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
+        self.assertEqual(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
+        self.assertEqual(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
         repo.close(cnxid)
         self.assertRaises(BadConnectionId, repo.user_info, cnxid)
         self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)
@@ -363,12 +387,12 @@
     def test_shared_data_api(self):
         repo = self.repo
         cnxid = repo.connect(self.admlogin, password=self.admpassword)
-        self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
+        self.assertEqual(repo.get_shared_data(cnxid, 'data'), None)
         repo.set_shared_data(cnxid, 'data', 4)
-        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
+        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
         repo.get_shared_data(cnxid, 'data', pop=True)
         repo.get_shared_data(cnxid, 'whatever', pop=True)
-        self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
+        self.assertEqual(repo.get_shared_data(cnxid, 'data'), None)
         repo.close(cnxid)
         self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0)
         self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
@@ -394,14 +418,14 @@
                      {'x': note.eid, 'p': p1.eid})
         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
                             {'x': note.eid})
-        self.assertEquals(len(rset), 1)
+        self.assertEqual(len(rset), 1)
         p2 = self.request().create_entity('Personne', nom=u'tutu')
         self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
                      {'x': note.eid, 'p': p2.eid})
         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
                             {'x': note.eid})
-        self.assertEquals(len(rset), 1)
-        self.assertEquals(rset.rows[0][0], p2.eid)
+        self.assertEqual(len(rset), 1)
+        self.assertEqual(rset.rows[0][0], p2.eid)
 
     def test_delete_if_object_inlined_singlecard(self):
         req = self.request()
@@ -409,7 +433,7 @@
         req.create_entity('Personne', nom=u'Vincent', fiche=c)
         req.create_entity('Personne', nom=u'Florent', fiche=c)
         self.commit()
-        self.assertEquals(len(c.reverse_fiche), 1)
+        self.assertEqual(len(c.reverse_fiche), 1)
 
     def test_set_attributes_in_before_update(self):
         # local hook
@@ -430,7 +454,7 @@
             addr.set_attributes(address=u'a@b.com')
             rset = self.execute('Any A,AA WHERE X eid %(x)s, X address A, X alias AA',
                                 {'x': addr.eid})
-            self.assertEquals(rset.rows, [[u'a@b.com', u'foo']])
+            self.assertEqual(rset.rows, [[u'a@b.com', u'foo']])
 
     def test_set_attributes_in_before_add(self):
         # local hook
@@ -477,7 +501,7 @@
 
     def test_source_from_eid(self):
         self.session.set_pool()
-        self.assertEquals(self.repo.source_from_eid(1, self.session),
+        self.assertEqual(self.repo.source_from_eid(1, self.session),
                           self.repo.sources_by_uri['system'])
 
     def test_source_from_eid_raise(self):
@@ -486,7 +510,7 @@
 
     def test_type_from_eid(self):
         self.session.set_pool()
-        self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup')
+        self.assertEqual(self.repo.type_from_eid(1, self.session), 'CWGroup')
 
     def test_type_from_eid_raise(self):
         self.session.set_pool()
@@ -503,12 +527,12 @@
         self.assertIsInstance(data[0][3], datetime)
         data[0] = list(data[0])
         data[0][3] = None
-        self.assertEquals(tuplify(data), [(-1, 'Personne', 'system', None, None)])
+        self.assertEqual(tuplify(data), [(-1, 'Personne', 'system', None, None)])
         self.repo.delete_info(self.session, entity, 'system', None)
         #self.repo.commit()
         cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
         data = cu.fetchall()
-        self.assertEquals(data, [])
+        self.assertEqual(data, [])
 
 
 class FTITC(CubicWebTC):
@@ -518,7 +542,7 @@
         eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
         self.commit()
         ts = datetime.now()
-        self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
+        self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
         self.session.set_pool()
         cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp)
         omtime = cu.fetchone()[0]
@@ -527,23 +551,23 @@
         time.sleep(1 - (ts.second - int(ts.second)))
         self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp})
         self.commit()
-        self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
+        self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
         self.session.set_pool()
         cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp)
         mtime = cu.fetchone()[0]
         self.failUnless(omtime < mtime)
         self.commit()
         date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
-        self.assertEquals(modified, [('Personne', eidp)])
-        self.assertEquals(deleted, [])
+        self.assertEqual(modified, [('Personne', eidp)])
+        self.assertEqual(deleted, [])
         date, modified, deleted = self.repo.entities_modified_since(('Personne',), mtime)
-        self.assertEquals(modified, [])
-        self.assertEquals(deleted, [])
+        self.assertEqual(modified, [])
+        self.assertEqual(deleted, [])
         self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': eidp})
         self.commit()
         date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
-        self.assertEquals(modified, [])
-        self.assertEquals(deleted, [('Personne', eidp)])
+        self.assertEqual(modified, [])
+        self.assertEqual(deleted, [('Personne', eidp)])
 
     def test_fulltext_container_entity(self):
         assert self.schema.rschema('use_email').fulltext_container == 'subject'
@@ -551,27 +575,27 @@
         toto = req.create_entity('EmailAddress', address=u'toto@logilab.fr')
         self.commit()
         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
-        self.assertEquals(rset.rows, [])
+        self.assertEqual(rset.rows, [])
         req.user.set_relations(use_email=toto)
         self.commit()
         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
-        self.assertEquals(rset.rows, [[req.user.eid]])
+        self.assertEqual(rset.rows, [[req.user.eid]])
         req.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s',
                     {'y': toto.eid})
         self.commit()
         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
-        self.assertEquals(rset.rows, [])
+        self.assertEqual(rset.rows, [])
         tutu = req.create_entity('EmailAddress', address=u'tutu@logilab.fr')
         req.user.set_relations(use_email=tutu)
         self.commit()
         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
-        self.assertEquals(rset.rows, [[req.user.eid]])
+        self.assertEqual(rset.rows, [[req.user.eid]])
         tutu.set_attributes(address=u'hip@logilab.fr')
         self.commit()
         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
-        self.assertEquals(rset.rows, [])
+        self.assertEqual(rset.rows, [])
         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'})
-        self.assertEquals(rset.rows, [[req.user.eid]])
+        self.assertEqual(rset.rows, [[req.user.eid]])
 
     def test_no_uncessary_ftiindex_op(self):
         req = self.request()
@@ -584,7 +608,7 @@
 
     def test_versions_inserted(self):
         inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
-        self.assertEquals(inserted,
+        self.assertEqual(inserted,
                           [u'system.version.basket', u'system.version.card', u'system.version.comment',
                            u'system.version.cubicweb', u'system.version.email',
                            u'system.version.file', u'system.version.folder',
@@ -616,15 +640,15 @@
             eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0]
             eidn = self.execute('INSERT Note X: X type "T"')[0][0]
             self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
-            self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
+            self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
                                        ('after_add_relation', eidn, 'ecrit_par', eidp)])
             CALLED[:] = ()
             self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
-            self.assertEquals(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp),
+            self.assertEqual(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp),
                                        ('after_delete_relation', eidn, 'ecrit_par', eidp)])
             CALLED[:] = ()
             eidn = self.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0]
-            self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
+            self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
                                        ('after_add_relation', eidn, 'ecrit_par', eidp)])
 
     def test_unique_contraint(self):
@@ -638,7 +662,7 @@
         req = self.request()
         req.create_entity('Note', type=u'todo', inline1=a01)
         ex = self.assertRaises(ValidationError, req.cnx.commit)
-        self.assertEquals(ex.errors, {'inline1-subject': u'RQLUniqueConstraint S type T, S inline1 A1, A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'})
+        self.assertEqual(ex.errors, {'inline1-subject': u'RQLUniqueConstraint S type T, S inline1 A1, A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'})
 
 if __name__ == '__main__':
     unittest_main()