# HG changeset patch # User Sylvain Thénault # Date 1250783255 -7200 # Node ID 8e2544e78a5e235aafaf0d769950f6893132fd91 # Parent 64322aa83a1de81c21384ef4bbdbfd0d028da85b test and fix rset returned by SET query diff -r 64322aa83a1d -r 8e2544e78a5e server/ssplanner.py --- a/server/ssplanner.py Thu Aug 20 17:44:27 2009 +0200 +++ b/server/ssplanner.py Thu Aug 20 17:47:35 2009 +0200 @@ -377,7 +377,7 @@ previous FetchStep relations values comes from the latest result, with one columns for - each relation defined in self.r_defs + each relation defined in self.rdefs for one entity definition, we'll construct N entity, where N is the number of the latest result @@ -387,33 +387,35 @@ RELATION = 1 REVERSE_RELATION = 2 - def __init__(self, plan, e_def, r_defs): + def __init__(self, plan, edef, rdefs): Step.__init__(self, plan) # partial entity definition to expand - self.e_def = e_def + self.edef = edef # definition of relations to complete - self.r_defs = r_defs + self.rdefs = rdefs def execute(self): """execute this step""" - base_e_def = self.e_def - result = [] - for row in self.execute_child(): + base_edef = self.edef + edefs = [] + result = self.execute_child() + for row in result: # get a new entity definition for this row - e_def = copy(base_e_def) + edef = copy(base_edef) # complete this entity def using row values - for i in range(len(self.r_defs)): - rtype, rorder = self.r_defs[i] + for i in range(len(self.rdefs)): + rtype, rorder = self.rdefs[i] if rorder == RelationsStep.FINAL: - e_def[rtype] = row[i] + edef[rtype] = row[i] elif rorder == RelationsStep.RELATION: - self.plan.add_relation_def( (e_def, rtype, row[i]) ) - e_def.querier_pending_relations[(rtype, 'subject')] = row[i] + self.plan.add_relation_def( (edef, rtype, row[i]) ) + edef.querier_pending_relations[(rtype, 'subject')] = row[i] else: - self.plan.add_relation_def( (row[i], rtype, e_def) ) - e_def.querier_pending_relations[(rtype, 'object')] = row[i] - result.append(e_def) - self.plan.substitute_entity_def(base_e_def, result) + self.plan.add_relation_def( (row[i], rtype, edef) ) + edef.querier_pending_relations[(rtype, 'object')] = row[i] + edefs.append(edef) + self.plan.substitute_entity_def(base_edef, edefs) + return result class InsertStep(Step): @@ -483,7 +485,8 @@ edefs = {} # insert relations attributes = set([relation.r_type for relation in self.attribute_relations]) - for row in self.execute_child(): + result = self.execute_child() + for row in result: for relation in self.attribute_relations: lhs, rhs = relation.get_variable_parts() eid = typed_eid(row[self.selected_index[str(lhs)]]) @@ -502,8 +505,6 @@ obj = row[self.selected_index[str(relation.children[1])]] repo.glob_add_relation(session, subj, relation.r_type, obj) # update entities - result = [] for eid, edef in edefs.iteritems(): repo.glob_update_entity(session, edef, attributes) - result.append( (eid,) ) return result diff -r 64322aa83a1d -r 8e2544e78a5e server/test/unittest_hooks.py --- a/server/test/unittest_hooks.py Thu Aug 20 17:44:27 2009 +0200 +++ b/server/test/unittest_hooks.py Thu Aug 20 17:47:35 2009 +0200 @@ -155,6 +155,40 @@ self.assertEquals(entity.descr, u'R&D

yo

') + def test_metadata_cwuri(self): + eid = self.execute('INSERT Note X')[0][0] + cwuri = self.execute('Any U WHERE X eid %s, X cwuri U' % eid)[0][0] + self.assertEquals(cwuri, self.repo.config['base-url'] + 'eid/%s' % eid) + + def test_metadata_creation_modification_date(self): + _now = datetime.now() + eid = self.execute('INSERT Note X')[0][0] + creation_date, modification_date = self.execute('Any CD, MD WHERE X eid %s, ' + 'X creation_date CD, ' + 'X modification_date MD' % eid)[0] + self.assertEquals((creation_date - _now).seconds, 0) + self.assertEquals((modification_date - _now).seconds, 0) + + def test_metadata__date(self): + _now = datetime.now() + eid = self.execute('INSERT Note X')[0][0] + creation_date = self.execute('Any D WHERE X eid %s, X creation_date D' % eid)[0][0] + self.assertEquals((creation_date - _now).seconds, 0) + + def test_metadata_created_by(self): + eid = self.execute('INSERT Note X')[0][0] + self.commit() # fire operations + rset = self.execute('Any U WHERE X eid %s, X created_by U' % eid) + self.assertEquals(len(rset), 1) # make sure we have only one creator + self.assertEquals(rset[0][0], self.session.user.eid) + + def test_metadata_owned_by(self): + eid = self.execute('INSERT Note X')[0][0] + self.commit() # fire operations + rset = self.execute('Any U WHERE X eid %s, X owned_by U' % eid) + self.assertEquals(len(rset), 1) # make sure we have only one owner + self.assertEquals(rset[0][0], self.session.user.eid) + class UserGroupHooksTC(RepositoryBasedTC): @@ -480,177 +514,5 @@ 'RT name "prenom", E name "Personne"') self.commit() - -class WorkflowHooksTC(RepositoryBasedTC): - - def setUp(self): - RepositoryBasedTC.setUp(self) - self.s_activated = self.execute('State X WHERE X name "activated"')[0][0] - self.s_deactivated = self.execute('State X WHERE X name "deactivated"')[0][0] - self.s_dummy = self.execute('INSERT State X: X name "dummy", X state_of E WHERE E name "CWUser"')[0][0] - self.create_user('stduser') - # give access to users group on the user's wf transitions - # so we can test wf enforcing on euser (managers don't have anymore this - # enforcement - self.execute('SET X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"') - self.commit() - - def tearDown(self): - self.execute('DELETE X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"') - self.commit() - RepositoryBasedTC.tearDown(self) - - def test_set_initial_state(self): - ueid = self.execute('INSERT CWUser E: E login "x", E upassword "x", E in_group G ' - 'WHERE G name "users"')[0][0] - self.failIf(self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s', - {'x' : ueid})) - self.commit() - initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s', - {'x' : ueid})[0][0] - self.assertEquals(initialstate, u'activated') - - def test_initial_state(self): - cnx = self.login('stduser') - cu = cnx.cursor() - self.assertRaises(ValidationError, cu.execute, - 'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, ' - 'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'}) - cnx.close() - # though managers can do whatever he want - self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, ' - 'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'}) - self.commit() - - # test that the workflow is correctly enforced - def test_transition_checking1(self): - cnx = self.login('stduser') - cu = cnx.cursor() - ueid = cnx.user(self.current_session()).eid - self.assertRaises(ValidationError, - cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s', - {'x': ueid, 's': self.s_activated}, 'x') - cnx.close() - - def test_transition_checking2(self): - cnx = self.login('stduser') - cu = cnx.cursor() - ueid = cnx.user(self.current_session()).eid - self.assertRaises(ValidationError, - cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s', - {'x': ueid, 's': self.s_dummy}, 'x') - cnx.close() - - def test_transition_checking3(self): - cnx = self.login('stduser') - cu = cnx.cursor() - ueid = cnx.user(self.current_session()).eid - cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s', - {'x': ueid, 's': self.s_deactivated}, 'x') - cnx.commit() - self.assertRaises(ValidationError, - cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s', - {'x': ueid, 's': self.s_deactivated}, 'x') - # get back now - cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s', - {'x': ueid, 's': self.s_activated}, 'x') - cnx.commit() - cnx.close() - - def test_transition_checking4(self): - cnx = self.login('stduser') - cu = cnx.cursor() - ueid = cnx.user(self.current_session()).eid - cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s', - {'x': ueid, 's': self.s_deactivated}, 'x') - cnx.commit() - self.assertRaises(ValidationError, - cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s', - {'x': ueid, 's': self.s_dummy}, 'x') - # get back now - cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s', - {'x': ueid, 's': self.s_activated}, 'x') - cnx.commit() - cnx.close() - - def test_transition_information(self): - ueid = self.session.user.eid - self.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s', - {'x': ueid, 's': self.s_deactivated}, 'x') - self.commit() - rset = self.execute('TrInfo T ORDERBY T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid}) - self.assertEquals(len(rset), 2) - tr = rset.get_entity(1, 0) - #tr.complete() - self.assertEquals(tr.comment, None) - self.assertEquals(tr.from_state[0].eid, self.s_activated) - self.assertEquals(tr.to_state[0].eid, self.s_deactivated) - - self.session.set_shared_data('trcomment', u'il est pas sage celui-la') - self.session.set_shared_data('trcommentformat', u'text/plain') - self.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s', - {'x': ueid, 's': self.s_activated}, 'x') - self.commit() - rset = self.execute('TrInfo T ORDERBY T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid}) - self.assertEquals(len(rset), 3) - tr = rset.get_entity(2, 0) - #tr.complete() - self.assertEquals(tr.comment, u'il est pas sage celui-la') - self.assertEquals(tr.comment_format, u'text/plain') - self.assertEquals(tr.from_state[0].eid, self.s_deactivated) - self.assertEquals(tr.to_state[0].eid, self.s_activated) - self.assertEquals(tr.owned_by[0].login, 'admin') - - def test_transition_information_on_creation(self): - ueid = self.create_user('toto') - rset = self.execute('TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid}) - self.assertEquals(len(rset), 1) - tr = rset.get_entity(0, 0) - #tr.complete() - self.assertEquals(tr.comment, None) - self.assertEquals(tr.from_state, []) - self.assertEquals(tr.to_state[0].eid, self.s_activated) - - def test_std_users_can_create_trinfo(self): - self.create_user('toto') - cnx = self.login('toto') - cu = cnx.cursor() - self.failUnless(cu.execute("INSERT Note X: X type 'a', X in_state S WHERE S name 'todo'")) - cnx.commit() - - def test_metadata_cwuri(self): - eid = self.execute('INSERT Note X')[0][0] - cwuri = self.execute('Any U WHERE X eid %s, X cwuri U' % eid)[0][0] - self.assertEquals(cwuri, self.repo.config['base-url'] + 'eid/%s' % eid) - - def test_metadata_creation_modification_date(self): - _now = datetime.now() - eid = self.execute('INSERT Note X')[0][0] - creation_date, modification_date = self.execute('Any CD, MD WHERE X eid %s, ' - 'X creation_date CD, ' - 'X modification_date MD' % eid)[0] - self.assertEquals((creation_date - _now).seconds, 0) - self.assertEquals((modification_date - _now).seconds, 0) - - def test_metadata__date(self): - _now = datetime.now() - eid = self.execute('INSERT Note X')[0][0] - creation_date = self.execute('Any D WHERE X eid %s, X creation_date D' % eid)[0][0] - self.assertEquals((creation_date - _now).seconds, 0) - - def test_metadata_created_by(self): - eid = self.execute('INSERT Note X')[0][0] - self.commit() # fire operations - rset = self.execute('Any U WHERE X eid %s, X created_by U' % eid) - self.assertEquals(len(rset), 1) # make sure we have only one creator - self.assertEquals(rset[0][0], self.session.user.eid) - - def test_metadata_owned_by(self): - eid = self.execute('INSERT Note X')[0][0] - self.commit() # fire operations - rset = self.execute('Any U WHERE X eid %s, X owned_by U' % eid) - self.assertEquals(len(rset), 1) # make sure we have only one owner - self.assertEquals(rset[0][0], self.session.user.eid) - if __name__ == '__main__': unittest_main() diff -r 64322aa83a1d -r 8e2544e78a5e server/test/unittest_querier.py --- a/server/test/unittest_querier.py Thu Aug 20 17:44:27 2009 +0200 +++ b/server/test/unittest_querier.py Thu Aug 20 17:47:35 2009 +0200 @@ -990,17 +990,15 @@ peid = self.execute("INSERT Personne Y: Y nom 'toto'")[0][0] rset = self.execute('Personne X WHERE X nom "toto"') self.assertEqual(len(rset.rows), 1) - self.execute("SET X nom 'tutu', X prenom 'original' WHERE X is Personne, X nom 'toto'") + rset = self.execute("SET X nom 'tutu', X prenom 'original' WHERE X is Personne, X nom 'toto'") + self.assertEqual(tuplify(rset.rows), [(peid, 'tutu', 'original')]) rset = self.execute('Any Y, Z WHERE X is Personne, X nom Y, X prenom Z') self.assertEqual(tuplify(rset.rows), [('tutu', 'original')]) def test_update_2(self): - self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'") - #rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto"') - #self.assertEqual(len(rset.rows), 1) - #rset = self.execute('Any X, Y WHERE X travaille Y') - #self.assertEqual(len(rset.rows), 0) - self.execute("SET X travaille Y WHERE X nom 'bidule', Y nom 'toto'") + peid, seid = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")[0] + rset = self.execute("SET X travaille Y WHERE X nom 'bidule', Y nom 'toto'") + self.assertEquals(tuplify(rset.rows), [(peid, seid)]) rset = self.execute('Any X, Y WHERE X travaille Y') self.assertEqual(len(rset.rows), 1) @@ -1020,9 +1018,6 @@ rset = self.execute('Any X, Y WHERE X travaille Y') self.assertEqual(len(rset.rows), 1) -## def test_update_4(self): -## self.execute("SET X know Y WHERE X ami Y") - def test_update_multiple1(self): peid1 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0] peid2 = self.execute("INSERT Personne Y: Y nom 'toto'")[0][0]