--- a/server/ssplanner.py Thu Aug 20 17:44:27 2009 +0200
+++ b/server/ssplanner.py Thu Aug 20 17:47:35 2009 +0200
@@ -377,7 +377,7 @@
previous FetchStep
relations values comes from the latest result, with one columns for
- each relation defined in self.r_defs
+ each relation defined in self.rdefs
for one entity definition, we'll construct N entity, where N is the
number of the latest result
@@ -387,33 +387,35 @@
RELATION = 1
REVERSE_RELATION = 2
- def __init__(self, plan, e_def, r_defs):
+ def __init__(self, plan, edef, rdefs):
Step.__init__(self, plan)
# partial entity definition to expand
- self.e_def = e_def
+ self.edef = edef
# definition of relations to complete
- self.r_defs = r_defs
+ self.rdefs = rdefs
def execute(self):
"""execute this step"""
- base_e_def = self.e_def
- result = []
- for row in self.execute_child():
+ base_edef = self.edef
+ edefs = []
+ result = self.execute_child()
+ for row in result:
# get a new entity definition for this row
- e_def = copy(base_e_def)
+ edef = copy(base_edef)
# complete this entity def using row values
- for i in range(len(self.r_defs)):
- rtype, rorder = self.r_defs[i]
+ for i in range(len(self.rdefs)):
+ rtype, rorder = self.rdefs[i]
if rorder == RelationsStep.FINAL:
- e_def[rtype] = row[i]
+ edef[rtype] = row[i]
elif rorder == RelationsStep.RELATION:
- self.plan.add_relation_def( (e_def, rtype, row[i]) )
- e_def.querier_pending_relations[(rtype, 'subject')] = row[i]
+ self.plan.add_relation_def( (edef, rtype, row[i]) )
+ edef.querier_pending_relations[(rtype, 'subject')] = row[i]
else:
- self.plan.add_relation_def( (row[i], rtype, e_def) )
- e_def.querier_pending_relations[(rtype, 'object')] = row[i]
- result.append(e_def)
- self.plan.substitute_entity_def(base_e_def, result)
+ self.plan.add_relation_def( (row[i], rtype, edef) )
+ edef.querier_pending_relations[(rtype, 'object')] = row[i]
+ edefs.append(edef)
+ self.plan.substitute_entity_def(base_edef, edefs)
+ return result
class InsertStep(Step):
@@ -483,7 +485,8 @@
edefs = {}
# insert relations
attributes = set([relation.r_type for relation in self.attribute_relations])
- for row in self.execute_child():
+ result = self.execute_child()
+ for row in result:
for relation in self.attribute_relations:
lhs, rhs = relation.get_variable_parts()
eid = typed_eid(row[self.selected_index[str(lhs)]])
@@ -502,8 +505,6 @@
obj = row[self.selected_index[str(relation.children[1])]]
repo.glob_add_relation(session, subj, relation.r_type, obj)
# update entities
- result = []
for eid, edef in edefs.iteritems():
repo.glob_update_entity(session, edef, attributes)
- result.append( (eid,) )
return result
--- a/server/test/unittest_hooks.py Thu Aug 20 17:44:27 2009 +0200
+++ b/server/test/unittest_hooks.py Thu Aug 20 17:47:35 2009 +0200
@@ -155,6 +155,40 @@
self.assertEquals(entity.descr, u'R&D<p>yo</p>')
+ def test_metadata_cwuri(self):
+ eid = self.execute('INSERT Note X')[0][0]
+ cwuri = self.execute('Any U WHERE X eid %s, X cwuri U' % eid)[0][0]
+ self.assertEquals(cwuri, self.repo.config['base-url'] + 'eid/%s' % eid)
+
+ def test_metadata_creation_modification_date(self):
+ _now = datetime.now()
+ eid = self.execute('INSERT Note X')[0][0]
+ creation_date, modification_date = self.execute('Any CD, MD WHERE X eid %s, '
+ 'X creation_date CD, '
+ 'X modification_date MD' % eid)[0]
+ self.assertEquals((creation_date - _now).seconds, 0)
+ self.assertEquals((modification_date - _now).seconds, 0)
+
+ def test_metadata__date(self):
+ _now = datetime.now()
+ eid = self.execute('INSERT Note X')[0][0]
+ creation_date = self.execute('Any D WHERE X eid %s, X creation_date D' % eid)[0][0]
+ self.assertEquals((creation_date - _now).seconds, 0)
+
+ def test_metadata_created_by(self):
+ eid = self.execute('INSERT Note X')[0][0]
+ self.commit() # fire operations
+ rset = self.execute('Any U WHERE X eid %s, X created_by U' % eid)
+ self.assertEquals(len(rset), 1) # make sure we have only one creator
+ self.assertEquals(rset[0][0], self.session.user.eid)
+
+ def test_metadata_owned_by(self):
+ eid = self.execute('INSERT Note X')[0][0]
+ self.commit() # fire operations
+ rset = self.execute('Any U WHERE X eid %s, X owned_by U' % eid)
+ self.assertEquals(len(rset), 1) # make sure we have only one owner
+ self.assertEquals(rset[0][0], self.session.user.eid)
+
class UserGroupHooksTC(RepositoryBasedTC):
@@ -480,177 +514,5 @@
'RT name "prenom", E name "Personne"')
self.commit()
-
-class WorkflowHooksTC(RepositoryBasedTC):
-
- def setUp(self):
- RepositoryBasedTC.setUp(self)
- self.s_activated = self.execute('State X WHERE X name "activated"')[0][0]
- self.s_deactivated = self.execute('State X WHERE X name "deactivated"')[0][0]
- self.s_dummy = self.execute('INSERT State X: X name "dummy", X state_of E WHERE E name "CWUser"')[0][0]
- self.create_user('stduser')
- # give access to users group on the user's wf transitions
- # so we can test wf enforcing on euser (managers don't have anymore this
- # enforcement
- self.execute('SET X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"')
- self.commit()
-
- def tearDown(self):
- self.execute('DELETE X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"')
- self.commit()
- RepositoryBasedTC.tearDown(self)
-
- def test_set_initial_state(self):
- ueid = self.execute('INSERT CWUser E: E login "x", E upassword "x", E in_group G '
- 'WHERE G name "users"')[0][0]
- self.failIf(self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
- {'x' : ueid}))
- self.commit()
- initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
- {'x' : ueid})[0][0]
- self.assertEquals(initialstate, u'activated')
-
- def test_initial_state(self):
- cnx = self.login('stduser')
- cu = cnx.cursor()
- self.assertRaises(ValidationError, cu.execute,
- 'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
- 'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'})
- cnx.close()
- # though managers can do whatever he want
- self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
- 'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
- self.commit()
-
- # test that the workflow is correctly enforced
- def test_transition_checking1(self):
- cnx = self.login('stduser')
- cu = cnx.cursor()
- ueid = cnx.user(self.current_session()).eid
- self.assertRaises(ValidationError,
- cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': ueid, 's': self.s_activated}, 'x')
- cnx.close()
-
- def test_transition_checking2(self):
- cnx = self.login('stduser')
- cu = cnx.cursor()
- ueid = cnx.user(self.current_session()).eid
- self.assertRaises(ValidationError,
- cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': ueid, 's': self.s_dummy}, 'x')
- cnx.close()
-
- def test_transition_checking3(self):
- cnx = self.login('stduser')
- cu = cnx.cursor()
- ueid = cnx.user(self.current_session()).eid
- cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': ueid, 's': self.s_deactivated}, 'x')
- cnx.commit()
- self.assertRaises(ValidationError,
- cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': ueid, 's': self.s_deactivated}, 'x')
- # get back now
- cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': ueid, 's': self.s_activated}, 'x')
- cnx.commit()
- cnx.close()
-
- def test_transition_checking4(self):
- cnx = self.login('stduser')
- cu = cnx.cursor()
- ueid = cnx.user(self.current_session()).eid
- cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': ueid, 's': self.s_deactivated}, 'x')
- cnx.commit()
- self.assertRaises(ValidationError,
- cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': ueid, 's': self.s_dummy}, 'x')
- # get back now
- cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': ueid, 's': self.s_activated}, 'x')
- cnx.commit()
- cnx.close()
-
- def test_transition_information(self):
- ueid = self.session.user.eid
- self.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': ueid, 's': self.s_deactivated}, 'x')
- self.commit()
- rset = self.execute('TrInfo T ORDERBY T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
- self.assertEquals(len(rset), 2)
- tr = rset.get_entity(1, 0)
- #tr.complete()
- self.assertEquals(tr.comment, None)
- self.assertEquals(tr.from_state[0].eid, self.s_activated)
- self.assertEquals(tr.to_state[0].eid, self.s_deactivated)
-
- self.session.set_shared_data('trcomment', u'il est pas sage celui-la')
- self.session.set_shared_data('trcommentformat', u'text/plain')
- self.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
- {'x': ueid, 's': self.s_activated}, 'x')
- self.commit()
- rset = self.execute('TrInfo T ORDERBY T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
- self.assertEquals(len(rset), 3)
- tr = rset.get_entity(2, 0)
- #tr.complete()
- self.assertEquals(tr.comment, u'il est pas sage celui-la')
- self.assertEquals(tr.comment_format, u'text/plain')
- self.assertEquals(tr.from_state[0].eid, self.s_deactivated)
- self.assertEquals(tr.to_state[0].eid, self.s_activated)
- self.assertEquals(tr.owned_by[0].login, 'admin')
-
- def test_transition_information_on_creation(self):
- ueid = self.create_user('toto')
- rset = self.execute('TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
- self.assertEquals(len(rset), 1)
- tr = rset.get_entity(0, 0)
- #tr.complete()
- self.assertEquals(tr.comment, None)
- self.assertEquals(tr.from_state, [])
- self.assertEquals(tr.to_state[0].eid, self.s_activated)
-
- def test_std_users_can_create_trinfo(self):
- self.create_user('toto')
- cnx = self.login('toto')
- cu = cnx.cursor()
- self.failUnless(cu.execute("INSERT Note X: X type 'a', X in_state S WHERE S name 'todo'"))
- cnx.commit()
-
- def test_metadata_cwuri(self):
- eid = self.execute('INSERT Note X')[0][0]
- cwuri = self.execute('Any U WHERE X eid %s, X cwuri U' % eid)[0][0]
- self.assertEquals(cwuri, self.repo.config['base-url'] + 'eid/%s' % eid)
-
- def test_metadata_creation_modification_date(self):
- _now = datetime.now()
- eid = self.execute('INSERT Note X')[0][0]
- creation_date, modification_date = self.execute('Any CD, MD WHERE X eid %s, '
- 'X creation_date CD, '
- 'X modification_date MD' % eid)[0]
- self.assertEquals((creation_date - _now).seconds, 0)
- self.assertEquals((modification_date - _now).seconds, 0)
-
- def test_metadata__date(self):
- _now = datetime.now()
- eid = self.execute('INSERT Note X')[0][0]
- creation_date = self.execute('Any D WHERE X eid %s, X creation_date D' % eid)[0][0]
- self.assertEquals((creation_date - _now).seconds, 0)
-
- def test_metadata_created_by(self):
- eid = self.execute('INSERT Note X')[0][0]
- self.commit() # fire operations
- rset = self.execute('Any U WHERE X eid %s, X created_by U' % eid)
- self.assertEquals(len(rset), 1) # make sure we have only one creator
- self.assertEquals(rset[0][0], self.session.user.eid)
-
- def test_metadata_owned_by(self):
- eid = self.execute('INSERT Note X')[0][0]
- self.commit() # fire operations
- rset = self.execute('Any U WHERE X eid %s, X owned_by U' % eid)
- self.assertEquals(len(rset), 1) # make sure we have only one owner
- self.assertEquals(rset[0][0], self.session.user.eid)
-
if __name__ == '__main__':
unittest_main()
--- a/server/test/unittest_querier.py Thu Aug 20 17:44:27 2009 +0200
+++ b/server/test/unittest_querier.py Thu Aug 20 17:47:35 2009 +0200
@@ -990,17 +990,15 @@
peid = self.execute("INSERT Personne Y: Y nom 'toto'")[0][0]
rset = self.execute('Personne X WHERE X nom "toto"')
self.assertEqual(len(rset.rows), 1)
- self.execute("SET X nom 'tutu', X prenom 'original' WHERE X is Personne, X nom 'toto'")
+ rset = self.execute("SET X nom 'tutu', X prenom 'original' WHERE X is Personne, X nom 'toto'")
+ self.assertEqual(tuplify(rset.rows), [(peid, 'tutu', 'original')])
rset = self.execute('Any Y, Z WHERE X is Personne, X nom Y, X prenom Z')
self.assertEqual(tuplify(rset.rows), [('tutu', 'original')])
def test_update_2(self):
- self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
- #rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto"')
- #self.assertEqual(len(rset.rows), 1)
- #rset = self.execute('Any X, Y WHERE X travaille Y')
- #self.assertEqual(len(rset.rows), 0)
- self.execute("SET X travaille Y WHERE X nom 'bidule', Y nom 'toto'")
+ peid, seid = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")[0]
+ rset = self.execute("SET X travaille Y WHERE X nom 'bidule', Y nom 'toto'")
+ self.assertEquals(tuplify(rset.rows), [(peid, seid)])
rset = self.execute('Any X, Y WHERE X travaille Y')
self.assertEqual(len(rset.rows), 1)
@@ -1020,9 +1018,6 @@
rset = self.execute('Any X, Y WHERE X travaille Y')
self.assertEqual(len(rset.rows), 1)
-## def test_update_4(self):
-## self.execute("SET X know Y WHERE X ami Y")
-
def test_update_multiple1(self):
peid1 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0]
peid2 = self.execute("INSERT Personne Y: Y nom 'toto'")[0][0]