diff -r e3994fcc21c3 -r 1806148d6ce8 server/test/unittest_repository.py --- a/server/test/unittest_repository.py Thu Sep 23 23:28:58 2010 +0200 +++ b/server/test/unittest_repository.py Wed Sep 29 16:16:32 2010 +0200 @@ -32,7 +32,7 @@ from yams.constraints import UniqueConstraint from cubicweb import (BadConnectionId, RepositoryError, ValidationError, - UnknownEid, AuthenticationError) + UnknownEid, AuthenticationError, Unauthorized) from cubicweb.selectors import is_instance from cubicweb.schema import CubicWebSchema, RQLConstraint from cubicweb.dbapi import connect, multiple_connections_unfix @@ -65,10 +65,10 @@ self.session.set_pool() cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % ( namecol, table, finalcol)) - self.assertEquals(cu.fetchall(), []) + self.assertEqual(cu.fetchall(), []) cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s' % (namecol, table, finalcol, namecol), {'final': 'TRUE'}) - self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',), + self.assertEqual(cu.fetchall(), [(u'Boolean',), (u'Bytes',), (u'Date',), (u'Datetime',), (u'Decimal',),(u'Float',), (u'Int',), @@ -84,15 +84,15 @@ ";") cu = self.session.system_sql(sql) rows = cu.fetchall() - self.assertEquals(len(rows), 3) + self.assertEqual(len(rows), 3) self.test_unique_together() finally: self.repo.set_schema(origshema) def test_unique_together(self): person = self.repo.schema.eschema('Personne') - self.assertEquals(len(person._unique_together), 1) - self.assertUnorderedIterableEquals(person._unique_together[0], + self.assertEqual(len(person._unique_together), 1) + self.assertItemsEqual(person._unique_together[0], ('nom', 'prenom', 'inline2')) def test_schema_has_owner(self): @@ -136,15 +136,39 @@ repo.close(cnxid) self.assert_(repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8'))) - def test_invalid_entity_rollback(self): + def test_rollback_on_commit_error(self): cnxid = self.repo.connect(self.admlogin, password=self.admpassword) - # no group self.repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s', {'login': u"tutetute", 'passwd': 'tutetute'}) self.assertRaises(ValidationError, self.repo.commit, cnxid) self.failIf(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')) + def test_rollback_on_execute_validation_error(self): + class ValidationErrorAfterHook(Hook): + __regid__ = 'valerror-after-hook' + __select__ = Hook.__select__ & is_instance('CWGroup') + events = ('after_update_entity',) + def __call__(self): + raise ValidationError(self.entity.eid, {}) + with self.temporary_appobjects(ValidationErrorAfterHook): + self.assertRaises(ValidationError, + self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"') + self.failIf(self.execute('Any X WHERE X is CWGroup, X name "toto"')) + + def test_rollback_on_execute_unauthorized(self): + class UnauthorizedAfterHook(Hook): + __regid__ = 'unauthorized-after-hook' + __select__ = Hook.__select__ & is_instance('CWGroup') + events = ('after_update_entity',) + def __call__(self): + raise Unauthorized() + with self.temporary_appobjects(UnauthorizedAfterHook): + self.assertRaises(Unauthorized, + self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"') + self.failIf(self.execute('Any X WHERE X is CWGroup, X name "toto"')) + + def test_close(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) @@ -161,14 +185,14 @@ cnxid = repo.connect(self.admlogin, password=self.admpassword) repo.set_shared_data(cnxid, 'data', 4) cnxid2 = repo.connect(self.admlogin, password=self.admpassword) - self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) - self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None) + self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) + self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None) repo.set_shared_data(cnxid2, 'data', 5) - self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) - self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5) + self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) + self.assertEqual(repo.get_shared_data(cnxid2, 'data'), 5) repo.get_shared_data(cnxid2, 'data', pop=True) - self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) - self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None) + self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) + self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None) repo.close(cnxid) repo.close(cnxid2) self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') @@ -188,19 +212,19 @@ cnxid = repo.connect(self.admlogin, password=self.admpassword) # check db state result = repo.execute(cnxid, 'Personne X') - self.assertEquals(result.rowcount, 0) + self.assertEqual(result.rowcount, 0) # rollback entity insertion repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") result = repo.execute(cnxid, 'Personne X') - self.assertEquals(result.rowcount, 1) + self.assertEqual(result.rowcount, 1) repo.rollback(cnxid) result = repo.execute(cnxid, 'Personne X') - self.assertEquals(result.rowcount, 0, result.rows) + self.assertEqual(result.rowcount, 0, result.rows) # commit repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") repo.commit(cnxid) result = repo.execute(cnxid, 'Personne X') - self.assertEquals(result.rowcount, 1) + self.assertEqual(result.rowcount, 1) def test_transaction_base2(self): repo = self.repo @@ -208,10 +232,10 @@ # rollback relation insertion repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'") result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") - self.assertEquals(result.rowcount, 1) + self.assertEqual(result.rowcount, 1) repo.rollback(cnxid) result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") - self.assertEquals(result.rowcount, 0, result.rows) + self.assertEqual(result.rowcount, 0, result.rows) def test_transaction_base3(self): repo = self.repo @@ -222,13 +246,13 @@ user = session.user user.cw_adapt_to('IWorkflowable').fire_transition('deactivate') rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) - self.assertEquals(len(rset), 1) + self.assertEqual(len(rset), 1) repo.rollback(cnxid) rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) - self.assertEquals(len(rset), 0) + self.assertEqual(len(rset), 0) def test_transaction_interleaved(self): - self.skip('implement me') + self.skipTest('implement me') def test_close_kill_processing_request(self): repo = self.repo @@ -246,14 +270,14 @@ repo.commit(cnxid) try: ex = self.assertRaises(Exception, run_transaction) - self.assertEquals(str(ex), 'try to access pool on a closed session') + self.assertEqual(str(ex), 'try to access pool on a closed session') finally: t.join() def test_initial_schema(self): schema = self.repo.schema # check order of attributes is respected - self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations() + self.assertListEqual([r.type for r in schema.eschema('CWAttribute').ordered_relations() if not r.type in ('eid', 'is', 'is_instance_of', 'identity', 'creation_date', 'modification_date', 'cwuri', 'owned_by', 'created_by', @@ -266,11 +290,11 @@ 'indexed', 'fulltextindexed', 'internationalizable', 'defaultval', 'description', 'description_format']) - self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name') - self.assertEquals(schema.eschema('State').main_attribute(), 'name') + self.assertEqual(schema.eschema('CWEType').main_attribute(), 'name') + self.assertEqual(schema.eschema('State').main_attribute(), 'name') constraints = schema.rschema('name').rdef('CWEType', 'String').constraints - self.assertEquals(len(constraints), 2) + self.assertEqual(len(constraints), 2) for cstr in constraints[:]: if isinstance(cstr, UniqueConstraint): constraints.remove(cstr) @@ -278,17 +302,17 @@ else: self.fail('unique constraint not found') sizeconstraint = constraints[0] - self.assertEquals(sizeconstraint.min, None) - self.assertEquals(sizeconstraint.max, 64) + self.assertEqual(sizeconstraint.min, None) + self.assertEqual(sizeconstraint.max, 64) constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints - self.assertEquals(len(constraints), 1) + self.assertEqual(len(constraints), 1) cstr = constraints[0] self.assert_(isinstance(cstr, RQLConstraint)) - self.assertEquals(cstr.restriction, 'O final TRUE') + self.assertEqual(cstr.restriction, 'O final TRUE') ownedby = schema.rschema('owned_by') - self.assertEquals(ownedby.objects('CWEType'), ('CWUser',)) + self.assertEqual(ownedby.objects('CWEType'), ('CWUser',)) def test_pyro(self): import Pyro @@ -319,7 +343,7 @@ schema = cnx.get_schema() self.failUnless(cnx.vreg) self.failUnless('etypes'in cnx.vreg) - self.assertEquals(schema.__hashmode__, None) + self.assertEqual(schema.__hashmode__, None) cu = cnx.cursor() rset = cu.execute('Any U,G WHERE U in_group G') user = iter(rset.entities()).next() @@ -337,25 +361,25 @@ repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) session = repo._get_session(cnxid, setpool=True) - self.assertEquals(repo.type_and_source_from_eid(1, session), + self.assertEqual(repo.type_and_source_from_eid(1, session), ('CWGroup', 'system', None)) - self.assertEquals(repo.type_from_eid(1, session), 'CWGroup') - self.assertEquals(repo.source_from_eid(1, session).uri, 'system') - self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None) + self.assertEqual(repo.type_from_eid(1, session), 'CWGroup') + self.assertEqual(repo.source_from_eid(1, session).uri, 'system') + self.assertEqual(repo.eid2extid(repo.system_source, 1, session), None) class dummysource: uri = 'toto' self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session) def test_public_api(self): - self.assertEquals(self.repo.get_schema(), self.repo.schema) - self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}}) + self.assertEqual(self.repo.get_schema(), self.repo.schema) + self.assertEqual(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}}) # .properties() return a result set - self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') + self.assertEqual(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') def test_session_api(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) - self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) - self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None)) + self.assertEqual(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) + self.assertEqual(repo.describe(cnxid, 1), (u'CWGroup', u'system', None)) repo.close(cnxid) self.assertRaises(BadConnectionId, repo.user_info, cnxid) self.assertRaises(BadConnectionId, repo.describe, cnxid, 1) @@ -363,12 +387,12 @@ def test_shared_data_api(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) - self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) + self.assertEqual(repo.get_shared_data(cnxid, 'data'), None) repo.set_shared_data(cnxid, 'data', 4) - self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) + self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) repo.get_shared_data(cnxid, 'data', pop=True) repo.get_shared_data(cnxid, 'whatever', pop=True) - self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) + self.assertEqual(repo.get_shared_data(cnxid, 'data'), None) repo.close(cnxid) self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0) self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') @@ -394,14 +418,14 @@ {'x': note.eid, 'p': p1.eid}) rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', {'x': note.eid}) - self.assertEquals(len(rset), 1) + self.assertEqual(len(rset), 1) p2 = self.request().create_entity('Personne', nom=u'tutu') self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', {'x': note.eid, 'p': p2.eid}) rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', {'x': note.eid}) - self.assertEquals(len(rset), 1) - self.assertEquals(rset.rows[0][0], p2.eid) + self.assertEqual(len(rset), 1) + self.assertEqual(rset.rows[0][0], p2.eid) def test_delete_if_object_inlined_singlecard(self): req = self.request() @@ -409,7 +433,7 @@ req.create_entity('Personne', nom=u'Vincent', fiche=c) req.create_entity('Personne', nom=u'Florent', fiche=c) self.commit() - self.assertEquals(len(c.reverse_fiche), 1) + self.assertEqual(len(c.reverse_fiche), 1) def test_set_attributes_in_before_update(self): # local hook @@ -430,7 +454,7 @@ addr.set_attributes(address=u'a@b.com') rset = self.execute('Any A,AA WHERE X eid %(x)s, X address A, X alias AA', {'x': addr.eid}) - self.assertEquals(rset.rows, [[u'a@b.com', u'foo']]) + self.assertEqual(rset.rows, [[u'a@b.com', u'foo']]) def test_set_attributes_in_before_add(self): # local hook @@ -477,7 +501,7 @@ def test_source_from_eid(self): self.session.set_pool() - self.assertEquals(self.repo.source_from_eid(1, self.session), + self.assertEqual(self.repo.source_from_eid(1, self.session), self.repo.sources_by_uri['system']) def test_source_from_eid_raise(self): @@ -486,7 +510,7 @@ def test_type_from_eid(self): self.session.set_pool() - self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup') + self.assertEqual(self.repo.type_from_eid(1, self.session), 'CWGroup') def test_type_from_eid_raise(self): self.session.set_pool() @@ -503,12 +527,12 @@ self.assertIsInstance(data[0][3], datetime) data[0] = list(data[0]) data[0][3] = None - self.assertEquals(tuplify(data), [(-1, 'Personne', 'system', None, None)]) + self.assertEqual(tuplify(data), [(-1, 'Personne', 'system', None, None)]) self.repo.delete_info(self.session, entity, 'system', None) #self.repo.commit() cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1') data = cu.fetchall() - self.assertEquals(data, []) + self.assertEqual(data, []) class FTITC(CubicWebTC): @@ -518,7 +542,7 @@ eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0] self.commit() ts = datetime.now() - self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) + self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) self.session.set_pool() cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp) omtime = cu.fetchone()[0] @@ -527,23 +551,23 @@ time.sleep(1 - (ts.second - int(ts.second))) self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp}) self.commit() - self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) + self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) self.session.set_pool() cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp) mtime = cu.fetchone()[0] self.failUnless(omtime < mtime) self.commit() date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime) - self.assertEquals(modified, [('Personne', eidp)]) - self.assertEquals(deleted, []) + self.assertEqual(modified, [('Personne', eidp)]) + self.assertEqual(deleted, []) date, modified, deleted = self.repo.entities_modified_since(('Personne',), mtime) - self.assertEquals(modified, []) - self.assertEquals(deleted, []) + self.assertEqual(modified, []) + self.assertEqual(deleted, []) self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': eidp}) self.commit() date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime) - self.assertEquals(modified, []) - self.assertEquals(deleted, [('Personne', eidp)]) + self.assertEqual(modified, []) + self.assertEqual(deleted, [('Personne', eidp)]) def test_fulltext_container_entity(self): assert self.schema.rschema('use_email').fulltext_container == 'subject' @@ -551,27 +575,27 @@ toto = req.create_entity('EmailAddress', address=u'toto@logilab.fr') self.commit() rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) - self.assertEquals(rset.rows, []) + self.assertEqual(rset.rows, []) req.user.set_relations(use_email=toto) self.commit() rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) - self.assertEquals(rset.rows, [[req.user.eid]]) + self.assertEqual(rset.rows, [[req.user.eid]]) req.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': toto.eid}) self.commit() rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) - self.assertEquals(rset.rows, []) + self.assertEqual(rset.rows, []) tutu = req.create_entity('EmailAddress', address=u'tutu@logilab.fr') req.user.set_relations(use_email=tutu) self.commit() rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) - self.assertEquals(rset.rows, [[req.user.eid]]) + self.assertEqual(rset.rows, [[req.user.eid]]) tutu.set_attributes(address=u'hip@logilab.fr') self.commit() rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) - self.assertEquals(rset.rows, []) + self.assertEqual(rset.rows, []) rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'}) - self.assertEquals(rset.rows, [[req.user.eid]]) + self.assertEqual(rset.rows, [[req.user.eid]]) def test_no_uncessary_ftiindex_op(self): req = self.request() @@ -584,7 +608,7 @@ def test_versions_inserted(self): inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')] - self.assertEquals(inserted, + self.assertEqual(inserted, [u'system.version.basket', u'system.version.card', u'system.version.comment', u'system.version.cubicweb', u'system.version.email', u'system.version.file', u'system.version.folder', @@ -616,15 +640,15 @@ eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0] eidn = self.execute('INSERT Note X: X type "T"')[0][0] self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"') - self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), + self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), ('after_add_relation', eidn, 'ecrit_par', eidp)]) CALLED[:] = () self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"') - self.assertEquals(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp), + self.assertEqual(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp), ('after_delete_relation', eidn, 'ecrit_par', eidp)]) CALLED[:] = () eidn = self.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0] - self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), + self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), ('after_add_relation', eidn, 'ecrit_par', eidp)]) def test_unique_contraint(self): @@ -638,7 +662,7 @@ req = self.request() req.create_entity('Note', type=u'todo', inline1=a01) ex = self.assertRaises(ValidationError, req.cnx.commit) - self.assertEquals(ex.errors, {'inline1-subject': u'RQLUniqueConstraint S type T, S inline1 A1, A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'}) + self.assertEqual(ex.errors, {'inline1-subject': u'RQLUniqueConstraint S type T, S inline1 A1, A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'}) if __name__ == '__main__': unittest_main()