# HG changeset patch # User sylvain.thenault@logilab.fr # Date 1242226790 -7200 # Node ID 71c143c0ada36121731c5d32d6b5d08111f609ef # Parent eccd1885d42eb29ee99eac7741e2c340f7d73400 fix test diff -r eccd1885d42e -r 71c143c0ada3 server/test/data/bootstrap_cubes --- a/server/test/data/bootstrap_cubes Wed May 13 16:28:21 2009 +0200 +++ b/server/test/data/bootstrap_cubes Wed May 13 16:59:50 2009 +0200 @@ -1,1 +1,1 @@ -comment,folder,tag,basket,email,file +card,comment,folder,tag,basket,email,file diff -r eccd1885d42e -r 71c143c0ada3 server/test/unittest_hookhelper.py --- a/server/test/unittest_hookhelper.py Wed May 13 16:28:21 2009 +0200 +++ b/server/test/unittest_hookhelper.py Wed May 13 16:59:50 2009 +0200 @@ -4,23 +4,23 @@ from logilab.common.testlib import unittest_main from cubicweb.devtools.apptest import RepositoryBasedTC -from cubicweb.server.pool import LateOperation +from cubicweb.server.pool import LateOperation, Operation, SingleLastOperation from cubicweb.server.hookhelper import * class HookHelpersTC(RepositoryBasedTC): - + def setUp(self): RepositoryBasedTC.setUp(self) self.hm = self.repo.hm - + def test_late_operation(self): session = self.session l1 = LateOperation(session) l2 = LateOperation(session) l3 = Operation(session) self.assertEquals(session.pending_operations, [l3, l1, l2]) - + def test_single_last_operation(self): session = self.session l0 = SingleLastOperation(session) @@ -30,7 +30,7 @@ self.assertEquals(session.pending_operations, [l3, l1, l2, l0]) l4 = SingleLastOperation(session) self.assertEquals(session.pending_operations, [l3, l1, l2, l4]) - + def test_global_operation_order(self): from cubicweb.server import hooks, schemahooks session = self.session @@ -42,8 +42,8 @@ op4 = hooks.DelayedDeleteOp(session) op5 = hooks.CheckORelationOp(session) self.assertEquals(session.pending_operations, [op1, op2, op4, op5, op3]) - - + + def test_in_state_notification(self): result = [] # test both email notification and transition_information @@ -78,6 +78,6 @@ if isinstance(op, SendMailOp)] self.assertEquals(len(searchedops), 0, self.session.pending_operations) - + if __name__ == '__main__': unittest_main() diff -r eccd1885d42e -r 71c143c0ada3 server/test/unittest_hooks.py --- a/server/test/unittest_hooks.py Wed May 13 16:28:21 2009 +0200 +++ b/server/test/unittest_hooks.py Wed May 13 16:59:50 2009 +0200 @@ -20,9 +20,9 @@ Repository.get_versions = orig_get_versions - + class CoreHooksTC(RepositoryBasedTC): - + def test_delete_internal_entities(self): self.assertRaises(RepositoryError, self.execute, 'DELETE CWEType X WHERE X name "CWEType"') @@ -40,17 +40,17 @@ self.execute('DELETE X in_group Y WHERE X login "toto"') self.execute('SET X in_group Y WHERE X login "toto", Y name "guests"') self.commit() - + def test_delete_required_relations_object(self): self.skip('no sample in the schema ! YAGNI ? Kermaat ?') - + def test_static_vocabulary_check(self): self.assertRaises(ValidationError, self.execute, 'SET X composite "whatever" WHERE X from_entity FE, FE name "CWUser", X relation_type RT, RT name "in_group"') - + def test_missing_required_relations_subject_inline(self): - # missing in_group relation + # missing in_group relation self.execute('INSERT CWUser X: X login "toto", X upassword "hop"') self.assertRaises(ValidationError, self.commit) @@ -76,7 +76,7 @@ self.execute('SET X sender Y WHERE X is Email, Y is EmailAddress') rset = self.execute('Any S WHERE X sender S, X eid %s' % eeid) self.assertEquals(len(rset), 1) - + def test_composite_1(self): self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"') self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"') @@ -90,7 +90,7 @@ self.commit() rset = self.execute('Any X WHERE X is EmailPart') self.assertEquals(len(rset), 0) - + def test_composite_2(self): self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"') self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"') @@ -102,7 +102,7 @@ self.commit() rset = self.execute('Any X WHERE X is EmailPart') self.assertEquals(len(rset), 0) - + def test_composite_redirection(self): self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"') self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"') @@ -148,11 +148,11 @@ self.execute('SET A descr "R&D
yo" WHERE A eid %s' % entity.eid) entity = self.execute('Any A WHERE A eid %s' % entity.eid).get_entity(0, 0) self.assertEquals(entity.descr, u'R&D
yo
') - + + - class UserGroupHooksTC(RepositoryBasedTC): - + def test_user_synchronization(self): self.create_user('toto', password='hop', commit=False) self.assertRaises(AuthenticationError, @@ -193,9 +193,9 @@ self.execute('DELETE EmailAddress X WHERE X eid %s' % eid) self.commit() self.failIf(self.execute('Any X WHERE X created_by Y, X eid >= %(x)s', {'x': eid})) - + class CWPropertyHooksTC(RepositoryBasedTC): - + def test_unexistant_eproperty(self): ex = self.assertRaises(ValidationError, self.execute, 'INSERT CWProperty X: X pkey "bla.bla", X value "hop", X for_user U') @@ -203,12 +203,12 @@ ex = self.assertRaises(ValidationError, self.execute, 'INSERT CWProperty X: X pkey "bla.bla", X value "hop"') self.assertEquals(ex.errors, {'pkey': 'unknown property key'}) - + def test_site_wide_eproperty(self): ex = self.assertRaises(ValidationError, self.execute, 'INSERT CWProperty X: X pkey "ui.site-title", X value "hop", X for_user U') self.assertEquals(ex.errors, {'for_user': "site-wide property can't be set for user"}) - + def test_bad_type_eproperty(self): ex = self.assertRaises(ValidationError, self.execute, 'INSERT CWProperty X: X pkey "ui.language", X value "hop", X for_user U') @@ -216,10 +216,10 @@ ex = self.assertRaises(ValidationError, self.execute, 'INSERT CWProperty X: X pkey "ui.language", X value "hop"') self.assertEquals(ex.errors, {'value': u'unauthorized value'}) - - + + class SchemaHooksTC(RepositoryBasedTC): - + def test_duplicate_etype_error(self): # check we can't add a CWEType or CWRType entity if it already exists one # with the same name @@ -229,7 +229,7 @@ self.execute, 'INSERT CWEType X: X name "Societe"') self.assertRaises((ValidationError, RepositoryError), self.execute, 'INSERT CWRType X: X name "in_group"') - + def test_validation_unique_constraint(self): self.assertRaises(ValidationError, self.execute, 'INSERT CWUser X: X login "admin"') @@ -253,13 +253,13 @@ RepositoryBasedTC.setUp(self) def index_exists(self, etype, attr, unique=False): - dbhelper = self.session.pool.source('system').dbhelper + dbhelper = self.session.pool.source('system').dbhelper sqlcursor = self.session.pool['system'] return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) - + def test_base(self): schema = self.repo.schema - dbhelper = self.session.pool.source('system').dbhelper + dbhelper = self.session.pool.source('system').dbhelper sqlcursor = self.session.pool['system'] self.failIf(schema.has_entity('Societe2')) self.failIf(schema.has_entity('concerne2')) @@ -332,8 +332,8 @@ self.failUnless('subdiv' in snames) snames = [name for name, in self.execute('Any N WHERE S is_instance_of Division, S nom N')] self.failUnless('subdiv' in snames) - - + + def test_perms_synchronization_1(self): schema = self.repo.schema self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users'))) @@ -374,9 +374,9 @@ self.execute('Any X WHERE X is CWEType, X name "CWEType"') # schema modification hooks tests ######################################### - + def test_uninline_relation(self): - dbhelper = self.session.pool.source('system').dbhelper + dbhelper = self.session.pool.source('system').dbhelper sqlcursor = self.session.pool['system'] # Personne inline2 Affaire inline # insert a person without inline2 relation (not mandatory) @@ -410,7 +410,7 @@ self.assertEquals(rset.rows[0], [peid, aeid]) def test_indexed_change(self): - dbhelper = self.session.pool.source('system').dbhelper + dbhelper = self.session.pool.source('system').dbhelper sqlcursor = self.session.pool['system'] try: self.execute('SET X indexed TRUE WHERE X relation_type R, R name "sujet"') @@ -428,7 +428,7 @@ self.failIf(self.index_exists('Affaire', 'sujet')) def test_unique_change(self): - dbhelper = self.session.pool.source('system').dbhelper + dbhelper = self.session.pool.source('system').dbhelper sqlcursor = self.session.pool['system'] try: try: @@ -453,7 +453,7 @@ self.commit() self.failIf(self.schema['Affaire'].has_unique_values('sujet')) self.failIf(self.index_exists('Affaire', 'sujet', unique=True)) - + class WorkflowHooksTC(RepositoryBasedTC): @@ -468,7 +468,7 @@ # enforcement self.execute('SET X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"') self.commit() - + def tearDown(self): self.execute('DELETE X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"') self.commit() @@ -483,7 +483,7 @@ initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s', {'x' : ueid})[0][0] self.assertEquals(initialstate, u'activated') - + def test_initial_state(self): cnx = self.login('stduser') cu = cnx.cursor() @@ -495,7 +495,7 @@ self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, ' 'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'}) self.commit() - + # test that the workflow is correctly enforced def test_transition_checking1(self): cnx = self.login('stduser') @@ -505,7 +505,7 @@ cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s', {'x': ueid, 's': self.s_activated}, 'x') cnx.close() - + def test_transition_checking2(self): cnx = self.login('stduser') cu = cnx.cursor() @@ -514,7 +514,7 @@ cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s', {'x': ueid, 's': self.s_dummy}, 'x') cnx.close() - + def test_transition_checking3(self): cnx = self.login('stduser') cu = cnx.cursor() @@ -530,7 +530,7 @@ {'x': ueid, 's': self.s_activated}, 'x') cnx.commit() cnx.close() - + def test_transition_checking4(self): cnx = self.login('stduser') cu = cnx.cursor() @@ -559,7 +559,7 @@ self.assertEquals(tr.comment, None) self.assertEquals(tr.from_state[0].eid, self.s_activated) self.assertEquals(tr.to_state[0].eid, self.s_deactivated) - + self.session.set_shared_data('trcomment', u'il est pas sage celui-la') self.session.set_shared_data('trcommentformat', u'text/plain') self.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s', @@ -591,6 +591,6 @@ cu = cnx.cursor() self.failUnless(cu.execute("INSERT Note X: X type 'a', X in_state S WHERE S name 'todo'")) cnx.commit() - + if __name__ == '__main__': unittest_main() diff -r eccd1885d42e -r 71c143c0ada3 server/test/unittest_migractions.py --- a/server/test/unittest_migractions.py Wed May 13 16:28:21 2009 +0200 +++ b/server/test/unittest_migractions.py Wed May 13 16:59:50 2009 +0200 @@ -19,10 +19,10 @@ def teardown_module(*args): Repository.get_versions = orig_get_versions - + class MigrationCommandsTC(RepositoryBasedTC): copy_schema = True - + def setUp(self): if not hasattr(self, '_repo'): # first initialization @@ -43,7 +43,7 @@ interactive=False) assert self.cnx is self.mh._cnx assert self.session is self.mh.session, (self.session.id, self.mh.session.id) - + def test_add_attribute_int(self): self.failIf('whatever' in self.schema) paraordernum = self.mh.rqlexec('Any O WHERE X name "Note", RT name "para", RDEF from_entity X, RDEF relation_type RT, RDEF ordernum O')[0][0] @@ -73,7 +73,7 @@ fields = dict(x.strip().split()[:2] for x in notesql.split('(', 1)[1].rsplit(')', 1)[0].split(',')) self.assertEquals(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)') self.mh.rollback() - + def test_add_datetime_with_default_value_attribute(self): self.failIf('mydate' in self.schema) self.mh.cmd_add_attribute('Note', 'mydate') @@ -88,7 +88,7 @@ self.assertEquals(d1, date.today()) self.assertEquals(d2, testdate) self.mh.rollback() - + def test_rename_attribute(self): self.failIf('civility' in self.schema) eid1 = self.mh.rqlexec('INSERT Personne X: X nom "lui", X sexe "M"')[0][0] @@ -121,7 +121,7 @@ self.assertEquals(t1, "baz") gn = self.mh.rqlexec('Any GN WHERE T require_group G, G name GN, T eid %s' % baz)[0][0] self.assertEquals(gn, 'managers') - + def test_add_entity_type(self): self.failIf('Folder2' in self.schema) self.failIf('filed_under2' in self.schema) @@ -137,7 +137,7 @@ self.assertEquals([str(rs) for rs in self.schema['Folder2'].object_relations()], ['filed_under2', 'identity']) self.assertEquals(sorted(str(e) for e in self.schema['filed_under2'].subjects()), - ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File', + ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File', 'Folder2', 'Image', 'Note', 'Personne', 'Societe', 'SubDivision']) self.assertEquals(self.schema['filed_under2'].objects(), ('Folder2',)) eschema = self.schema.eschema('Folder2') @@ -164,7 +164,7 @@ self.mh.cmd_add_relation_type('filed_under2') self.failUnless('filed_under2' in self.schema) self.assertEquals(sorted(str(e) for e in self.schema['filed_under2'].subjects()), - ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File', + ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File', 'Folder2', 'Image', 'Note', 'Personne', 'Societe', 'SubDivision']) self.assertEquals(self.schema['filed_under2'].objects(), ('Folder2',)) @@ -178,8 +178,8 @@ def test_add_relation_definition(self): self.mh.cmd_add_relation_definition('Societe', 'in_state', 'State') - self.assertEquals(sorted(self.schema['in_state'].subjects()), - ['Affaire', 'Division', 'CWUser', 'Note', 'Societe', 'SubDivision']) + self.assertEquals(sorted(str(x) for x in self.schema['in_state'].subjects()), + ['Affaire', 'CWUser', 'Division', 'Note', 'Societe', 'SubDivision']) self.assertEquals(self.schema['in_state'].objects(), ('State',)) def test_add_relation_definition_nortype(self): @@ -195,7 +195,7 @@ self.mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire') self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire']) self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Division', 'Note', 'Societe', 'SubDivision']) - + def test_drop_relation_definition_with_specialization(self): self.failUnless('concerne' in self.schema) self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne']) @@ -204,13 +204,13 @@ self.mh.cmd_drop_relation_definition('None', 'concerne', 'Societe') self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne']) self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Affaire', 'Note']) - + def test_drop_relation_definition2(self): self.failUnless('evaluee' in self.schema) self.mh.cmd_drop_relation_definition('Personne', 'evaluee', 'Note') self.failUnless('evaluee' in self.schema) self.assertEquals(sorted(self.schema['evaluee'].subjects()), - ['Division', 'CWUser', 'Societe', 'SubDivision']) + ['CWUser', 'Division', 'Societe', 'SubDivision']) self.assertEquals(sorted(self.schema['evaluee'].objects()), ['Note']) @@ -229,7 +229,7 @@ finally: self.mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe', cardinality='**') - + def test_change_relation_props_final(self): rschema = self.schema['adel'] card = rschema.rproperty('Personne', 'String', 'fulltextindexed') @@ -255,7 +255,7 @@ # self.assertEquals([rs.type for rs in migrschema['Personne'].ordered_relations() if rs.is_final()], # expected) migrschema['Personne'].description = 'blabla bla' - migrschema['titre'].description = 'usually a title' + migrschema['titre'].description = 'usually a title' migrschema['titre']._rproperties[('Personne', 'String')]['description'] = 'title for this person' # rinorderbefore = cursor.execute('Any O,N WHERE X is CWAttribute, X relation_type RT, RT name N,' # 'X from_entity FE, FE name "Personne",' @@ -264,9 +264,9 @@ # u'sexe', u'promo', u'titre', u'adel', u'ass', u'web', u'tel', # u'fax', u'datenaiss', u'test', u'description'] # self.assertListEquals(rinorderbefore, map(list, zip([0, 0]+range(1, len(expected)), expected))) - - self.mh.cmd_synchronize_schema(commit=False) - + + self.mh.cmd_sync_schema_props_perms(commit=False) + self.assertEquals(cursor.execute('Any D WHERE X name "Personne", X description D')[0][0], 'blabla bla') self.assertEquals(cursor.execute('Any D WHERE X name "titre", X description D')[0][0], @@ -300,7 +300,7 @@ # no more rqlexpr to delete and add para attribute self.failIf(self._rrqlexpr_rset('add', 'para')) self.failIf(self._rrqlexpr_rset('delete', 'para')) - # new rql expr to add ecrit_par relation + # new rql expr to add ecrit_par relation rexpr = self._rrqlexpr_entity('add', 'ecrit_par') self.assertEquals(rexpr.expression, 'O require_permission P, P name "add_note", ' @@ -333,8 +333,8 @@ self.assertEquals(len(cursor.execute('RQLExpression X WHERE NOT ET1 read_permission X, NOT ET2 add_permission X, ' 'NOT ET3 delete_permission X, NOT ET4 update_permission X')), 8+1) # finally - self.assertEquals(len(cursor.execute('RQLExpression X')), nbrqlexpr_start + 1 + 2) - + self.assertEquals(len(cursor.execute('RQLExpression X')), nbrqlexpr_start + 1 + 2) + self.mh.rollback() def _erqlexpr_rset(self, action, ertype): @@ -351,7 +351,7 @@ rset = self._rrqlexpr_rset(action, ertype) self.assertEquals(len(rset), 1) return rset.get_entity(0, 0) - + def test_set_size_constraint(self): # existing previous value try: @@ -378,7 +378,7 @@ cubes.remove('email') cubes.remove('file') self.assertEquals(set(self.config.cubes()), cubes) - for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image', + for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image', 'sender', 'in_thread', 'reply_to', 'data_format'): self.failIf(ertype in schema, ertype) self.assertEquals(sorted(schema['see_also']._rproperties.keys()), @@ -402,7 +402,7 @@ cubes.add('email') cubes.add('file') self.assertEquals(set(self.config.cubes()), cubes) - for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image', + for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image', 'sender', 'in_thread', 'reply_to', 'data_format'): self.failUnless(ertype in schema, ertype) self.assertEquals(sorted(schema['see_also']._rproperties.keys()), @@ -426,17 +426,13 @@ self.maxeid = self.execute('Any MAX(X)')[0][0] # why this commit is necessary is unclear to me (though without it # next test may fail complaining of missing tables - self.commit() + self.commit() def test_set_state(self): user = self.session.user - self.set_debug(True) self.mh.set_state(user.eid, 'deactivated') user.clear_related_cache('in_state', 'subject') - try: - self.assertEquals(user.state, 'deactivated') - finally: - self.set_debug(False) - + self.assertEquals(user.state, 'deactivated') + if __name__ == '__main__': unittest_main() diff -r eccd1885d42e -r 71c143c0ada3 server/test/unittest_querier.py --- a/server/test/unittest_querier.py Wed May 13 16:28:21 2009 +0200 +++ b/server/test/unittest_querier.py Wed May 13 16:59:50 2009 +0200 @@ -40,10 +40,10 @@ class MakeSchemaTC(TestCase): def test_known_values(self): solution = {'A': 'String', 'B': 'CWUser'} - self.assertEquals(make_schema((Variable('A'), Variable('B')), solution, + self.assertEquals(make_schema((Variable('A'), Variable('B')), solution, 'table0', TYPEMAP), ('C0 text,C1 integer', {'A': 'table0.C0', 'B': 'table0.C1'})) - + repo, cnx = init_test_database('sqlite') @@ -51,19 +51,19 @@ class UtilsTC(BaseQuerierTC): repo = repo - + def get_max_eid(self): # no need for cleanup here return None def cleanup(self): # no need for cleanup here pass - + def test_preprocess_1(self): reid = self.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0] rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', {'x': reid}) self.assertEquals(rqlst.solutions, [{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}]) - + def test_preprocess_2(self): teid = self.execute("INSERT Tag X: X name 'tag'")[0][0] #geid = self.execute("CWGroup G WHERE G name 'users'")[0][0] @@ -73,7 +73,7 @@ # the query may be optimized, should keep only one solution # (any one, etype will be discarded) self.assertEquals(len(rqlst.solutions), 1) - + def test_preprocess_security(self): plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN ' 'WHERE X is ET, ET name ETN') @@ -109,7 +109,7 @@ 'ET': 'CWEType', 'ETN': 'String'}]) rql, solutions = partrqls[1] self.assertEquals(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, ' - 'X is IN(Bookmark, Card, Comment, Division, CWCache, CWConstraint, CWConstraintType, CWEType, CWAttribute, CWGroup, CWRelation, CWPermission, CWProperty, CWRType, CWUser, Email, EmailAddress, EmailPart, EmailThread, File, Folder, Image, Note, Personne, RQLExpression, Societe, State, SubDivision, Tag, TrInfo, Transition)') + 'X is IN(Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWUser, Card, Comment, Division, Email, EmailAddress, EmailPart, EmailThread, File, Folder, Image, Note, Personne, RQLExpression, Societe, State, SubDivision, Tag, TrInfo, Transition)') self.assertListEquals(sorted(solutions), sorted([{'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'}, {'X': 'Card', 'ETN': 'String', 'ET': 'CWEType'}, @@ -162,11 +162,11 @@ self.assertEquals(len(subq.children), 3) self.assertEquals([t.as_string() for t in union.children[0].selection], ['MAX(X)']) - + def test_preprocess_nonregr(self): rqlst = self._prepare('Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI') self.assertEquals(len(rqlst.solutions), 1) - + def test_build_description(self): # should return an empty result set rset = self.execute('Any X WHERE X eid %(x)s', {'x': self.session.user.eid}) @@ -207,66 +207,66 @@ def test_unknown_eid(self): # should return an empty result set self.failIf(self.execute('Any X WHERE X eid 99999999')) - + # selection queries tests ################################################# - + def test_select_1(self): rset = self.execute('Any X ORDERBY X WHERE X is CWGroup') result, descr = rset.rows, rset.description self.assertEquals(tuplify(result), [(1,), (2,), (3,), (4,)]) self.assertEquals(descr, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)]) - + def test_select_2(self): rset = self.execute('Any X ORDERBY N WHERE X is CWGroup, X name N') self.assertEquals(tuplify(rset.rows), [(3,), (1,), (4,), (2,)]) self.assertEquals(rset.description, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)]) rset = self.execute('Any X ORDERBY N DESC WHERE X is CWGroup, X name N') self.assertEquals(tuplify(rset.rows), [(2,), (4,), (1,), (3,)]) - + def test_select_3(self): rset = self.execute('Any N GROUPBY N WHERE X is CWGroup, X name N') result, descr = rset.rows, rset.description result.sort() self.assertEquals(tuplify(result), [('guests',), ('managers',), ('owners',), ('users',)]) self.assertEquals(descr, [('String',), ('String',), ('String',), ('String',)]) - + def test_select_is(self): rset = self.execute('Any X, TN ORDERBY TN LIMIT 10 WHERE X is T, T name TN') result, descr = rset.rows, rset.description self.assertEquals(result[0][1], descr[0][0]) - + def test_select_is_aggr(self): rset = self.execute('Any TN, COUNT(X) GROUPBY TN ORDERBY 2 DESC WHERE X is T, T name TN') result, descr = rset.rows, rset.description self.assertEquals(descr[0][0], 'String') self.assertEquals(descr[0][1], 'Int') self.assertEquals(result[0][0], 'CWRelation') - + def test_select_groupby_orderby(self): rset = self.execute('Any N GROUPBY N ORDERBY N WHERE X is CWGroup, X name N') self.assertEquals(tuplify(rset.rows), [('guests',), ('managers',), ('owners',), ('users',)]) self.assertEquals(rset.description, [('String',), ('String',), ('String',), ('String',)]) - + def test_select_complex_groupby(self): rset = self.execute('Any N GROUPBY N WHERE X name N') rset = self.execute('Any N,MAX(D) GROUPBY N LIMIT 5 WHERE X name N, X creation_date D') - + def test_select_inlined_groupby(self): seid = self.execute('State X WHERE X name "deactivated"')[0][0] rset = self.execute('Any U,L,S GROUPBY U,L,S WHERE X in_state S, U login L, S eid %s' % seid) - + def test_select_complex_orderby(self): rset1 = self.execute('Any N ORDERBY N WHERE X name N') self.assertEquals(sorted(rset1.rows), rset1.rows) rset = self.execute('Any N ORDERBY N LIMIT 5 OFFSET 1 WHERE X name N') - self.assertEquals(rset.rows[0][0], rset1.rows[1][0]) + self.assertEquals(rset.rows[0][0], rset1.rows[1][0]) self.assertEquals(len(rset), 5) - + def test_select_5(self): rset = self.execute('Any X, TMP ORDERBY TMP WHERE X name TMP, X is CWGroup') self.assertEquals(tuplify(rset.rows), [(3, 'guests',), (1, 'managers',), (4, 'owners',), (2, 'users',)]) self.assertEquals(rset.description, [('CWGroup', 'String',), ('CWGroup', 'String',), ('CWGroup', 'String',), ('CWGroup', 'String',)]) - + def test_select_6(self): self.execute("INSERT Personne X: X nom 'bidule'")[0] rset = self.execute('Any Y where X name TMP, Y nom in (TMP, "bidule")') @@ -274,7 +274,7 @@ self.assert_(('Personne',) in rset.description) rset = self.execute('DISTINCT Any Y where X name TMP, Y nom in (TMP, "bidule")') self.assert_(('Personne',) in rset.description) - + def test_select_not_attr(self): self.execute("INSERT Personne X: X nom 'bidule'") self.execute("INSERT Societe X: X nom 'chouette'") @@ -285,13 +285,13 @@ self.execute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'") rset = self.execute('Personne X WHERE NOT X travaille S') self.assertEquals(len(rset.rows), 0, rset.rows) - + def test_select_is_in(self): self.execute("INSERT Personne X: X nom 'bidule'") self.execute("INSERT Societe X: X nom 'chouette'") self.assertEquals(len(self.execute("Any X WHERE X is IN (Personne, Societe)")), 2) - + def test_select_not_rel(self): self.execute("INSERT Personne X: X nom 'bidule'") self.execute("INSERT Societe X: X nom 'chouette'") @@ -301,7 +301,7 @@ self.assertEquals(len(rset.rows), 1, rset.rows) rset = self.execute('Personne X WHERE NOT X travaille S, S nom "chouette"') self.assertEquals(len(rset.rows), 1, rset.rows) - + def test_select_nonregr_inlined(self): self.execute("INSERT Note X: X para 'bidule'") self.execute("INSERT Personne X: X nom 'chouette'") @@ -310,7 +310,7 @@ rset = self.execute('Any U,T ORDERBY T DESC WHERE U is CWUser, ' 'N ecrit_par U, N type T')#, {'x': self.ueid}) self.assertEquals(len(rset.rows), 0) - + def test_select_nonregr_edition_not(self): groupeids = set((1, 2, 3)) groupreadperms = set(r[0] for r in self.execute('Any Y WHERE X name "CWGroup", Y eid IN(1, 2, 3), X read_permission Y')) @@ -318,7 +318,7 @@ self.assertEquals(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms)) rset = self.execute('DISTINCT Any Y WHERE X name "CWGroup", Y eid IN(1, 2, 3), NOT X read_permission Y') self.assertEquals(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms)) - + def test_select_outer_join(self): peid1 = self.execute("INSERT Personne X: X nom 'bidule'")[0][0] peid2 = self.execute("INSERT Personne X: X nom 'autre'")[0][0] @@ -331,7 +331,7 @@ self.assertEquals(rset.rows, [[peid1, seid1], [peid2, None]]) rset = self.execute('Any S,X ORDERBY S WHERE X? travaille S') self.assertEquals(rset.rows, [[seid1, peid1], [seid2, None]]) - + def test_select_outer_join_optimized(self): peid1 = self.execute("INSERT Personne X: X nom 'bidule'")[0][0] rset = self.execute('Any X WHERE X eid %(x)s, P? connait X', {'x':peid1}, 'x') @@ -372,8 +372,8 @@ self.failUnless(['users', 'tag'] in rset.rows) self.failUnless(['activated', None] in rset.rows) rset = self.execute("Any GN,TN ORDERBY GN WHERE T tags G?, T name TN, G name GN") - self.assertEquals(rset.rows, [[None, 'tagbis'], ['users', 'tag']]) - + self.assertEquals(rset.rows, [[None, 'tagbis'], ['users', 'tag']]) + def test_select_not_inline_rel(self): self.execute("INSERT Personne X: X nom 'bidule'") self.execute("INSERT Note X: X type 'a'") @@ -381,7 +381,7 @@ self.execute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'") rset = self.execute('Note X WHERE NOT X ecrit_par P') self.assertEquals(len(rset.rows), 1, rset.rows) - + def test_select_not_unlinked_multiple_solutions(self): self.execute("INSERT Personne X: X nom 'bidule'") self.execute("INSERT Note X: X type 'a'") @@ -395,13 +395,13 @@ self.assertEquals(len(rset.rows), 1) self.assertEquals(len(rset.rows[0]), 1) self.assertEquals(rset.description, [('Int',)]) - + def test_select_aggregat_sum(self): rset = self.execute('Any SUM(O) WHERE X ordernum O') self.assertEquals(len(rset.rows), 1) self.assertEquals(len(rset.rows[0]), 1) self.assertEquals(rset.description, [('Int',)]) - + def test_select_aggregat_min(self): rset = self.execute('Any MIN(X) WHERE X is Personne') self.assertEquals(len(rset.rows), 1) @@ -411,7 +411,7 @@ self.assertEquals(len(rset.rows), 1) self.assertEquals(len(rset.rows[0]), 1) self.assertEquals(rset.description, [('Int',)]) - + def test_select_aggregat_max(self): rset = self.execute('Any MAX(X) WHERE X is Personne') self.assertEquals(len(rset.rows), 1) @@ -444,7 +444,7 @@ rset = self.execute('Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, NOT U in_group X, U login "admin"') self.failUnlessEqual(len(rset), 3) self.failUnlessEqual(rset[0][1], 'owners') - + def test_select_aggregat_sort(self): rset = self.execute('Any G, COUNT(U) GROUPBY G ORDERBY 2 WHERE U in_group G') self.assertEquals(len(rset.rows), 2) @@ -475,7 +475,7 @@ result = rset.rows result.sort() self.assertEquals(tuplify(result), [(1,), (2,), (3,), (4,), (5,)]) - + def test_select_upper(self): rset = self.execute('Any X, UPPER(L) ORDERBY L WHERE X is CWUser, X login L') self.assertEquals(len(rset.rows), 2) @@ -494,7 +494,7 @@ ## self.assertEquals(rset.rows[0][0], 'admin') ## rset = self.execute('Any L WHERE %(x)s login L', {'x':ueid}) ## self.assertEquals(rset.rows[0][0], 'admin') - + def test_select_searchable_text_1(self): rset = self.execute(u"INSERT Personne X: X nom 'bidüle'") rset = self.execute(u"INSERT Societe X: X nom 'bidüle'") @@ -509,7 +509,7 @@ self.failIf([r[0] for r in rset.rows if r[0] in biduleeids]) # duh? rset = self.execute('Any X WHERE X has_text %(text)s', {'text': u'ça'}) - + def test_select_searchable_text_2(self): rset = self.execute("INSERT Personne X: X nom 'bidule'") rset = self.execute("INSERT Personne X: X nom 'chouette'") @@ -517,7 +517,7 @@ self.commit() rset = self.execute('Personne N where N has_text "bidule"') self.assertEquals(len(rset.rows), 1, rset.rows) - + def test_select_searchable_text_3(self): rset = self.execute("INSERT Personne X: X nom 'bidule', X sexe 'M'") rset = self.execute("INSERT Personne X: X nom 'bidule', X sexe 'F'") @@ -525,7 +525,7 @@ self.commit() rset = self.execute('Any X where X has_text "bidule" and X sexe "M"') self.assertEquals(len(rset.rows), 1, rset.rows) - + def test_select_multiple_searchable_text(self): self.execute(u"INSERT Personne X: X nom 'bidüle'") self.execute("INSERT Societe X: X nom 'chouette', S travaille X") @@ -536,7 +536,7 @@ 'text2': u'chouette',} ) self.assertEquals(len(rset.rows), 1, rset.rows) - + def test_select_no_descr(self): rset = self.execute('Any X WHERE X is CWGroup', build_descr=0) rset.rows.sort() @@ -549,7 +549,7 @@ self.assertEquals(rset.description, [('CWGroup',), ('CWGroup',)]) rset = self.execute('CWGroup X ORDERBY N LIMIT 2 OFFSET 2 WHERE X name N') self.assertEquals(tuplify(rset.rows), [(4,), (2,)]) - + def test_select_symetric(self): self.execute("INSERT Personne X: X nom 'machin'") self.execute("INSERT Personne X: X nom 'bidule'") @@ -569,14 +569,14 @@ self.assertEquals(len(rset.rows), 2, rset.rows) rset = self.execute('Any P where P2 connait P, P2 nom "chouette"') self.assertEquals(len(rset.rows), 2, rset.rows) - + def test_select_inline(self): self.execute("INSERT Personne X: X nom 'bidule'") self.execute("INSERT Note X: X type 'a'") self.execute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'") rset = self.execute('Any N where N ecrit_par X, X nom "bidule"') self.assertEquals(len(rset.rows), 1, rset.rows) - + def test_select_creation_date(self): self.execute("INSERT Personne X: X nom 'bidule'") rset = self.execute('Any D WHERE X nom "bidule", X creation_date D') @@ -593,7 +593,7 @@ self.execute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'") rset = self.execute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"') self.assertEqual(len(rset.rows), 2) - + def test_select_or_sym_relation(self): self.execute("INSERT Personne X: X nom 'bidule'") self.execute("INSERT Personne X: X nom 'chouette'") @@ -608,7 +608,7 @@ self.assertEqual(len(rset.rows), 2, rset.rows) rset = self.execute('DISTINCT Any P WHERE P connait S OR S connait P, S nom "chouette"') self.assertEqual(len(rset.rows), 2, rset.rows) - + def test_select_follow_relation(self): self.execute("INSERT Affaire X: X sujet 'cool'") self.execute("INSERT Societe X: X nom 'chouette'") @@ -635,7 +635,7 @@ self.execute("INSERT Affaire X: X sujet 'abcd'") rset = self.execute('DISTINCT Any S ORDERBY S WHERE A is Affaire, A sujet S') self.assertEqual(rset.rows, [['abcd'], ['important'], ['minor'], ['normal'], ['zou']]) - + def test_select_ordered_distinct_3(self): rset = self.execute('DISTINCT Any N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N') self.assertEqual(rset.rows, [['owners'], ['guests'], ['users'], ['managers']]) @@ -650,13 +650,13 @@ rset = self.execute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s', {'u': self.session.user.eid}) self.failUnless(rset) self.assertEquals(rset.description[0][1], 'Int') - + # def test_select_rewritten_optional(self): # eid = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0] # rset = self.execute('Any X WHERE X eid %(x)s, EXISTS(X owned_by U) OR EXISTS(X concerne S?, S owned_by U)', # {'x': eid}, 'x') # self.assertEquals(rset.rows, [[eid]]) - + def test_today_bug(self): self.execute("INSERT Tag X: X name 'bidule', X creation_date NOW") self.execute("INSERT Tag Y: Y name 'toto'") @@ -689,7 +689,7 @@ 'Int', 'Interval', 'Password', 'String', 'Time']) - + def test_select_constant(self): rset = self.execute('Any X, "toto" ORDERBY X WHERE X is CWGroup') self.assertEquals(rset.rows, @@ -718,14 +718,14 @@ self.assertEquals(rset.description, [('Transition', 'String'), ('State', 'String'), ('Transition', 'String'), ('State', 'String')]) - + def test_select_union_aggregat(self): # meaningless, the goal in to have group by done on different attribute # for each sub-query self.execute('(Any N,COUNT(X) GROUPBY N WHERE X name N, X is State)' ' UNION ' '(Any N,COUNT(X) GROUPBY N ORDERBY 2 WHERE X login N)') - + def test_select_union_aggregat_independant_group(self): self.execute('INSERT State X: X name "hop"') self.execute('INSERT State X: X name "hop"') @@ -736,7 +736,7 @@ ' UNION ' '(Any N,COUNT(X) GROUPBY N WHERE X name N, X is Transition HAVING COUNT(X)>1))') self.assertEquals(rset.rows, [[u'hop', 2], [u'hop', 2]]) - + def test_select_union_selection_with_diff_variables(self): rset = self.execute('(Any N WHERE X name N, X is State)' ' UNION ' @@ -746,7 +746,7 @@ 'deactivate', 'deactivated', 'done', 'en cours', 'end', 'finie', 'markasdone', 'pitetre', 'redoit', 'start', 'todo']) - + def test_exists(self): geid = self.execute("INSERT CWGroup X: X name 'lulufanclub'")[0][0] self.execute("SET U in_group G WHERE G name 'lulufanclub'") @@ -784,15 +784,15 @@ rset = self.execute('Any X WITH X BEING ((Any NULL) UNION (Any "toto"))') self.assertEquals(rset.rows, [[None], ['toto']]) self.assertEquals(rset.description, [(None,), ('String',)]) - + # insertion queries tests ################################################# - + def test_insert_is(self): eid, = self.execute("INSERT Personne X: X nom 'bidule'")[0] etype, = self.execute("Any TN WHERE X is T, X eid %s, T name TN" % eid)[0] self.assertEquals(etype, 'Personne') self.execute("INSERT Personne X: X nom 'managers'") - + def test_insert_1(self): rset = self.execute("INSERT Personne X: X nom 'bidule'") self.assertEquals(len(rset.rows), 1) @@ -819,7 +819,7 @@ self.execute("INSERT Personne X: X nom Y WHERE U login 'admin', U login Y") rset = self.execute('Personne X WHERE X nom "admin"') self.assert_(rset.rows) - self.assertEquals(rset.description, [('Personne',)]) + self.assertEquals(rset.description, [('Personne',)]) def test_insert_4(self): self.execute("INSERT Societe Y: Y nom 'toto'") @@ -827,7 +827,7 @@ rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y') self.assert_(rset.rows) self.assertEquals(rset.description, [('Personne', 'Societe',)]) - + def test_insert_4bis(self): peid = self.execute("INSERT Personne X: X nom 'bidule'")[0][0] seid = self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s", @@ -836,7 +836,7 @@ self.execute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s", {'x': str(seid)}) self.assertEqual(len(self.execute('Any X, Y WHERE X travaille Y')), 2) - + def test_insert_4ter(self): peid = self.execute("INSERT Personne X: X nom 'bidule'")[0][0] seid = self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s", @@ -891,7 +891,7 @@ 'E primary_email EM, EM address "X", E in_group G ' 'WHERE G name "managers"') self.assertEquals(list(rset.description[0]), ['CWUser', 'EmailAddress']) - + # deletion queries tests ################################################## def test_delete_1(self): @@ -901,7 +901,7 @@ self.execute("DELETE Personne Y WHERE Y nom 'toto'") rset = self.execute('Personne X WHERE X nom "toto"') self.assertEqual(len(rset.rows), 0) - + def test_delete_2(self): rset = self.execute("INSERT Personne X, Personne Y, Societe Z : X nom 'syt', Y nom 'adim', Z nom 'Logilab', X travaille Z, Y travaille Z") self.assertEquals(len(rset), 1) @@ -960,7 +960,7 @@ self.assertEquals(len(sqlc.fetchall()), 0) sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid) self.assertEquals(len(sqlc.fetchall()), 0) - + def test_nonregr_delete_cache2(self): eid = self.execute("INSERT Folder T: T name 'toto'")[0][0] self.commit() @@ -979,7 +979,7 @@ self.assertEquals(rset.rows, []) rset = self.execute("Folder X WHERE X eid %s" %eid) self.assertEquals(rset.rows, []) - + # update queries tests #################################################### def test_update_1(self): @@ -989,7 +989,7 @@ self.execute("SET X nom 'tutu', X prenom 'original' WHERE X is Personne, X nom 'toto'") rset = self.execute('Any Y, Z WHERE X is Personne, X nom Y, X prenom Z') self.assertEqual(tuplify(rset.rows), [('tutu', 'original')]) - + def test_update_2(self): self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'") #rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto"') @@ -999,7 +999,7 @@ self.execute("SET X travaille Y WHERE X nom 'bidule', Y nom 'toto'") rset = self.execute('Any X, Y WHERE X travaille Y') self.assertEqual(len(rset.rows), 1) - + def test_update_2bis(self): rset = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'") eid1, eid2 = rset[0][0], rset[0][1] @@ -1007,7 +1007,7 @@ {'x': str(eid1), 'y': str(eid2)}) rset = self.execute('Any X, Y WHERE X travaille Y') self.assertEqual(len(rset.rows), 1) - + def test_update_2ter(self): rset = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'") eid1, eid2 = rset[0][0], rset[0][1] @@ -1015,10 +1015,10 @@ {'x': unicode(eid1), 'y': unicode(eid2)}) rset = self.execute('Any X, Y WHERE X travaille Y') self.assertEqual(len(rset.rows), 1) - + ## def test_update_4(self): ## self.execute("SET X know Y WHERE X ami Y") - + def test_update_multiple1(self): peid1 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0] peid2 = self.execute("INSERT Personne Y: Y nom 'toto'")[0][0] @@ -1054,16 +1054,16 @@ self.execute('SET X title XN + %(suffix)s WHERE X is Bookmark, X title XN', {'suffix': u'-moved'}) newname = self.execute('Any XN WHERE X eid %(x)s, X title XN', {'x': beid}, 'x')[0][0] self.assertEquals(newname, 'toto-moved') - + def test_update_query_error(self): self.execute("INSERT Personne Y: Y nom 'toto'") self.assertRaises(Exception, self.execute, "SET X nom 'toto', X is Personne") self.assertRaises(QueryError, self.execute, "SET X nom 'toto', X has_text 'tutu' WHERE X is Personne") self.assertRaises(QueryError, self.execute, "SET X login 'tutu', X eid %s" % cnx.user(self.session).eid) - + # upassword encryption tests ################################################# - + def test_insert_upassword(self): rset = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'") self.assertEquals(len(rset.rows), 1) @@ -1074,11 +1074,11 @@ cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) passwd = cursor.fetchone()[0].getvalue() - self.assertEquals(passwd, crypt_password('toto', passwd[:2])) + self.assertEquals(passwd, crypt_password('toto', passwd[:2])) rset = self.execute("Any X WHERE X is CWUser, X login 'bob', X upassword '%s'" % passwd) self.assertEquals(len(rset.rows), 1) self.assertEquals(rset.description, [('CWUser',)]) - + def test_update_upassword(self): cursor = self.pool['system'] rset = self.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s", {'pwd': 'toto'}) @@ -1088,13 +1088,13 @@ cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) passwd = cursor.fetchone()[0].getvalue() - self.assertEquals(passwd, crypt_password('tutu', passwd[:2])) + self.assertEquals(passwd, crypt_password('tutu', passwd[:2])) rset = self.execute("Any X WHERE X is CWUser, X login 'bob', X upassword '%s'" % passwd) self.assertEquals(len(rset.rows), 1) self.assertEquals(rset.description, [('CWUser',)]) # non regression tests #################################################### - + def test_nonregr_1(self): teid = self.execute("INSERT Tag X: X name 'tag'")[0][0] self.execute("SET X tags Y WHERE X name 'tag', Y is State, Y name 'activated'") @@ -1113,7 +1113,7 @@ rset = self.execute('Any X WHERE E eid %(x)s, E tags X', {'x': teid}) self.assertEquals(rset.rows, [[geid]]) - + def test_nonregr_3(self): """bad sql generated on the second query (destination_state is not detected as an inlined relation) @@ -1151,10 +1151,10 @@ self.assertEquals(rset1.rows, rset2.rows) self.assertEquals(rset1.rows, rset3.rows) self.assertEquals(rset1.rows, rset4.rows) - + def test_nonregr_6(self): self.execute('Any N,COUNT(S) GROUPBY N ORDERBY COUNT(N) WHERE S name N, S is State') - + def test_sqlite_encoding(self): """XXX this test was trying to show a bug on use of lower which only occurs with non ascii string and misconfigured locale @@ -1215,7 +1215,7 @@ """ self.skip('retry me once http://www.sqlite.org/cvstrac/tktview?tn=3773 is fixed') self.execute('Any X ORDERBY D DESC WHERE X creation_date D') - + def test_nonregr_extra_joins(self): ueid = self.session.user.eid teid1 = self.execute("INSERT Folder X: X name 'folder1'")[0][0] @@ -1251,7 +1251,7 @@ "E firstname %(firstname)s, E surname %(surname)s " "WHERE E eid %(x)s, G name 'users', S name 'activated'", {'x':ueid, 'firstname': u'jean', 'surname': u'paul'}, 'x') - + def test_nonregr_u_owned_by_u(self): ueid = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto', X in_group G " "WHERE G name 'users'")[0][0] diff -r eccd1885d42e -r 71c143c0ada3 server/test/unittest_repository.py --- a/server/test/unittest_repository.py Wed May 13 16:28:21 2009 +0200 +++ b/server/test/unittest_repository.py Wed May 13 16:59:50 2009 +0200 @@ -17,7 +17,7 @@ from cubicweb.dbapi import connect, repo_connect from cubicweb.devtools.apptest import RepositoryBasedTC from cubicweb.devtools.repotest import tuplify -from cubicweb.server import repository +from cubicweb.server import repository from cubicweb.server.sqlutils import SQL_PREFIX @@ -29,10 +29,10 @@ """ singleton providing access to a persistent storage for entities and relation """ - + # def setUp(self): # pass - + # def tearDown(self): # self.repo.config.db_perms = True # cnxid = self.repo.connect(*self.default_user_password()) @@ -64,7 +64,7 @@ (u'String',), (u'Time',)]) finally: self.repo._free_pool(pool) - + def test_schema_has_owner(self): repo = self.repo cnxid = repo.connect(*self.default_user_password()) @@ -74,7 +74,7 @@ self.failIf(repo.execute(cnxid, 'CWRelation X WHERE NOT X owned_by U')) self.failIf(repo.execute(cnxid, 'CWConstraint X WHERE NOT X owned_by U')) self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U')) - + def test_connect(self): login, passwd = self.default_user_password() self.assert_(self.repo.connect(login, passwd)) @@ -84,7 +84,7 @@ self.repo.connect, login, None) self.assertRaises(AuthenticationError, self.repo.connect, None, None) - + def test_execute(self): repo = self.repo cnxid = repo.connect(*self.default_user_password()) @@ -93,7 +93,7 @@ repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"') repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'}) repo.close(cnxid) - + def test_login_upassword_accent(self): repo = self.repo cnxid = repo.connect(*self.default_user_password()) @@ -102,27 +102,28 @@ repo.commit(cnxid) repo.close(cnxid) self.assert_(repo.connect(u"barnabé", u"héhéhé".encode('UTF8'))) - + def test_invalid_entity_rollback(self): repo = self.repo cnxid = repo.connect(*self.default_user_password()) + # no group repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"', {'login': u"tutetute", 'passwd': 'tutetute'}) self.assertRaises(ValidationError, repo.commit, cnxid) rset = repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"') self.assertEquals(rset.rowcount, 0) - + def test_close(self): repo = self.repo cnxid = repo.connect(*self.default_user_password()) self.assert_(cnxid) repo.close(cnxid) self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X') - + def test_invalid_cnxid(self): self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X') self.assertRaises(BadConnectionId, self.repo.close, None) - + def test_shared_data(self): repo = self.repo cnxid = repo.connect(*self.default_user_password()) @@ -179,7 +180,7 @@ repo.rollback(cnxid) result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") self.assertEquals(result.rowcount, 0, result.rows) - + def test_transaction_base3(self): repo = self.repo cnxid = repo.connect(*self.default_user_password()) @@ -194,7 +195,7 @@ repo.rollback(cnxid) rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid}) self.assertEquals(len(rset), 1) - + def test_transaction_interleaved(self): self.skip('implement me') @@ -202,11 +203,11 @@ schema = self.repo.schema # check order of attributes is respected self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations() - if not r.type in ('eid', 'is', 'is_instance_of', 'identity', + if not r.type in ('eid', 'is', 'is_instance_of', 'identity', 'creation_date', 'modification_date', 'owned_by', 'created_by')], ['relation_type', 'from_entity', 'to_entity', 'constrained_by', - 'cardinality', 'ordernum', + 'cardinality', 'ordernum', 'indexed', 'fulltextindexed', 'internationalizable', 'defaultval', 'description_format', 'description']) @@ -254,14 +255,14 @@ t.join() finally: repository.pyro_unregister(self.repo.config) - + def _pyro_client(self, lock): cnx = connect(self.repo.config.appid, u'admin', 'gingkow') # check we can get the schema schema = cnx.get_schema() self.assertEquals(schema.__hashmode__, None) rset = cnx.cursor().execute('Any U,G WHERE U in_group G') - + def test_internal_api(self): repo = self.repo @@ -301,10 +302,10 @@ repo.close(cnxid) self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0) self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') - + class DataHelpersTC(RepositoryBasedTC): - + def setUp(self): """ called before each test from this class """ cnxid = self.repo.connect(*self.default_user_password()) @@ -313,7 +314,7 @@ def tearDown(self): self.session.rollback() - + def test_create_eid(self): self.assert_(self.repo.system_source.create_eid(self.session)) @@ -326,10 +327,10 @@ def test_type_from_eid(self): self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup') - + def test_type_from_eid_raise(self): self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session) - + def test_add_delete_info(self): entity = self.repo.vreg.etype_class('Personne')(self.session, None, None) entity.eid = -1 @@ -350,7 +351,7 @@ class FTITC(RepositoryBasedTC): - + def test_reindex_and_modified_since(self): cursor = self.session.pool['system'] eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0] @@ -400,18 +401,18 @@ self.commit() rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) self.assertEquals(rset.rows, [[self.session.user.eid]]) - - + + class DBInitTC(RepositoryBasedTC): - + def test_versions_inserted(self): inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')] self.assertEquals(inserted, - [u'system.version.basket', u'system.version.comment', - u'system.version.cubicweb', u'system.version.email', - u'system.version.file', u'system.version.folder', + [u'system.version.basket', u'system.version.card', u'system.version.comment', + u'system.version.cubicweb', u'system.version.email', + u'system.version.file', u'system.version.folder', u'system.version.tag']) - + class InlineRelHooksTC(RepositoryBasedTC): """test relation hooks are called for inlined relations """ @@ -419,13 +420,13 @@ RepositoryBasedTC.setUp(self) self.hm = self.repo.hm self.called = [] - + def _before_relation_hook(self, pool, fromeid, rtype, toeid): self.called.append((fromeid, rtype, toeid)) def _after_relation_hook(self, pool, fromeid, rtype, toeid): self.called.append((fromeid, rtype, toeid)) - + def test_before_add_inline_relation(self): """make sure before_