test and fix rset returned by SET query 3.5
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Thu, 20 Aug 2009 17:47:35 +0200
branch3.5
changeset 2921 8e2544e78a5e
parent 2920 64322aa83a1d
child 2922 996103009bc5
test and fix rset returned by SET query
server/ssplanner.py
server/test/unittest_hooks.py
server/test/unittest_querier.py
--- a/server/ssplanner.py	Thu Aug 20 17:44:27 2009 +0200
+++ b/server/ssplanner.py	Thu Aug 20 17:47:35 2009 +0200
@@ -377,7 +377,7 @@
     previous FetchStep
 
     relations values comes from the latest result, with one columns for
-    each relation defined in self.r_defs
+    each relation defined in self.rdefs
 
     for one entity definition, we'll construct N entity, where N is the
     number of the latest result
@@ -387,33 +387,35 @@
     RELATION = 1
     REVERSE_RELATION = 2
 
-    def __init__(self, plan, e_def, r_defs):
+    def __init__(self, plan, edef, rdefs):
         Step.__init__(self, plan)
         # partial entity definition to expand
-        self.e_def = e_def
+        self.edef = edef
         # definition of relations to complete
-        self.r_defs = r_defs
+        self.rdefs = rdefs
 
     def execute(self):
         """execute this step"""
-        base_e_def = self.e_def
-        result = []
-        for row in self.execute_child():
+        base_edef = self.edef
+        edefs = []
+        result = self.execute_child()
+        for row in result:
             # get a new entity definition for this row
-            e_def = copy(base_e_def)
+            edef = copy(base_edef)
             # complete this entity def using row values
-            for i in range(len(self.r_defs)):
-                rtype, rorder = self.r_defs[i]
+            for i in range(len(self.rdefs)):
+                rtype, rorder = self.rdefs[i]
                 if rorder == RelationsStep.FINAL:
-                    e_def[rtype] = row[i]
+                    edef[rtype] = row[i]
                 elif rorder == RelationsStep.RELATION:
-                    self.plan.add_relation_def( (e_def, rtype, row[i]) )
-                    e_def.querier_pending_relations[(rtype, 'subject')] = row[i]
+                    self.plan.add_relation_def( (edef, rtype, row[i]) )
+                    edef.querier_pending_relations[(rtype, 'subject')] = row[i]
                 else:
-                    self.plan.add_relation_def( (row[i], rtype, e_def) )
-                    e_def.querier_pending_relations[(rtype, 'object')] = row[i]
-            result.append(e_def)
-        self.plan.substitute_entity_def(base_e_def, result)
+                    self.plan.add_relation_def( (row[i], rtype, edef) )
+                    edef.querier_pending_relations[(rtype, 'object')] = row[i]
+            edefs.append(edef)
+        self.plan.substitute_entity_def(base_edef, edefs)
+        return result
 
 
 class InsertStep(Step):
@@ -483,7 +485,8 @@
         edefs = {}
         # insert relations
         attributes = set([relation.r_type for relation in self.attribute_relations])
-        for row in self.execute_child():
+        result = self.execute_child()
+        for row in result:
             for relation in self.attribute_relations:
                 lhs, rhs = relation.get_variable_parts()
                 eid = typed_eid(row[self.selected_index[str(lhs)]])
@@ -502,8 +505,6 @@
                 obj = row[self.selected_index[str(relation.children[1])]]
                 repo.glob_add_relation(session, subj, relation.r_type, obj)
         # update entities
-        result = []
         for eid, edef in edefs.iteritems():
             repo.glob_update_entity(session, edef, attributes)
-            result.append( (eid,) )
         return result
--- a/server/test/unittest_hooks.py	Thu Aug 20 17:44:27 2009 +0200
+++ b/server/test/unittest_hooks.py	Thu Aug 20 17:47:35 2009 +0200
@@ -155,6 +155,40 @@
         self.assertEquals(entity.descr, u'R&amp;D<p>yo</p>')
 
 
+    def test_metadata_cwuri(self):
+        eid = self.execute('INSERT Note X')[0][0]
+        cwuri = self.execute('Any U WHERE X eid %s, X cwuri U' % eid)[0][0]
+        self.assertEquals(cwuri, self.repo.config['base-url'] + 'eid/%s' % eid)
+
+    def test_metadata_creation_modification_date(self):
+        _now = datetime.now()
+        eid = self.execute('INSERT Note X')[0][0]
+        creation_date, modification_date = self.execute('Any CD, MD WHERE X eid %s, '
+                                                        'X creation_date CD, '
+                                                        'X modification_date MD' % eid)[0]
+        self.assertEquals((creation_date - _now).seconds, 0)
+        self.assertEquals((modification_date - _now).seconds, 0)
+
+    def test_metadata__date(self):
+        _now = datetime.now()
+        eid = self.execute('INSERT Note X')[0][0]
+        creation_date = self.execute('Any D WHERE X eid %s, X creation_date D' % eid)[0][0]
+        self.assertEquals((creation_date - _now).seconds, 0)
+
+    def test_metadata_created_by(self):
+        eid = self.execute('INSERT Note X')[0][0]
+        self.commit() # fire operations
+        rset = self.execute('Any U WHERE X eid %s, X created_by U' % eid)
+        self.assertEquals(len(rset), 1) # make sure we have only one creator
+        self.assertEquals(rset[0][0], self.session.user.eid)
+
+    def test_metadata_owned_by(self):
+        eid = self.execute('INSERT Note X')[0][0]
+        self.commit() # fire operations
+        rset = self.execute('Any U WHERE X eid %s, X owned_by U' % eid)
+        self.assertEquals(len(rset), 1) # make sure we have only one owner
+        self.assertEquals(rset[0][0], self.session.user.eid)
+
 
 class UserGroupHooksTC(RepositoryBasedTC):
 
@@ -480,177 +514,5 @@
                      'RT name "prenom", E name "Personne"')
         self.commit()
 
-
-class WorkflowHooksTC(RepositoryBasedTC):
-
-    def setUp(self):
-        RepositoryBasedTC.setUp(self)
-        self.s_activated = self.execute('State X WHERE X name "activated"')[0][0]
-        self.s_deactivated = self.execute('State X WHERE X name "deactivated"')[0][0]
-        self.s_dummy = self.execute('INSERT State X: X name "dummy", X state_of E WHERE E name "CWUser"')[0][0]
-        self.create_user('stduser')
-        # give access to users group on the user's wf transitions
-        # so we can test wf enforcing on euser (managers don't have anymore this
-        # enforcement
-        self.execute('SET X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"')
-        self.commit()
-
-    def tearDown(self):
-        self.execute('DELETE X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"')
-        self.commit()
-        RepositoryBasedTC.tearDown(self)
-
-    def test_set_initial_state(self):
-        ueid = self.execute('INSERT CWUser E: E login "x", E upassword "x", E in_group G '
-                            'WHERE G name "users"')[0][0]
-        self.failIf(self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
-                                 {'x' : ueid}))
-        self.commit()
-        initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
-                                    {'x' : ueid})[0][0]
-        self.assertEquals(initialstate, u'activated')
-
-    def test_initial_state(self):
-        cnx = self.login('stduser')
-        cu = cnx.cursor()
-        self.assertRaises(ValidationError, cu.execute,
-                          'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
-                          'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'})
-        cnx.close()
-        # though managers can do whatever he want
-        self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
-                     'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
-        self.commit()
-
-    # test that the workflow is correctly enforced
-    def test_transition_checking1(self):
-        cnx = self.login('stduser')
-        cu = cnx.cursor()
-        ueid = cnx.user(self.current_session()).eid
-        self.assertRaises(ValidationError,
-                          cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                          {'x': ueid, 's': self.s_activated}, 'x')
-        cnx.close()
-
-    def test_transition_checking2(self):
-        cnx = self.login('stduser')
-        cu = cnx.cursor()
-        ueid = cnx.user(self.current_session()).eid
-        self.assertRaises(ValidationError,
-                          cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                          {'x': ueid, 's': self.s_dummy}, 'x')
-        cnx.close()
-
-    def test_transition_checking3(self):
-        cnx = self.login('stduser')
-        cu = cnx.cursor()
-        ueid = cnx.user(self.current_session()).eid
-        cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                      {'x': ueid, 's': self.s_deactivated}, 'x')
-        cnx.commit()
-        self.assertRaises(ValidationError,
-                          cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                          {'x': ueid, 's': self.s_deactivated}, 'x')
-        # get back now
-        cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                      {'x': ueid, 's': self.s_activated}, 'x')
-        cnx.commit()
-        cnx.close()
-
-    def test_transition_checking4(self):
-        cnx = self.login('stduser')
-        cu = cnx.cursor()
-        ueid = cnx.user(self.current_session()).eid
-        cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                   {'x': ueid, 's': self.s_deactivated}, 'x')
-        cnx.commit()
-        self.assertRaises(ValidationError,
-                          cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                          {'x': ueid, 's': self.s_dummy}, 'x')
-        # get back now
-        cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                      {'x': ueid, 's': self.s_activated}, 'x')
-        cnx.commit()
-        cnx.close()
-
-    def test_transition_information(self):
-        ueid = self.session.user.eid
-        self.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                      {'x': ueid, 's': self.s_deactivated}, 'x')
-        self.commit()
-        rset = self.execute('TrInfo T ORDERBY T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
-        self.assertEquals(len(rset), 2)
-        tr = rset.get_entity(1, 0)
-        #tr.complete()
-        self.assertEquals(tr.comment, None)
-        self.assertEquals(tr.from_state[0].eid, self.s_activated)
-        self.assertEquals(tr.to_state[0].eid, self.s_deactivated)
-
-        self.session.set_shared_data('trcomment', u'il est pas sage celui-la')
-        self.session.set_shared_data('trcommentformat', u'text/plain')
-        self.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
-                     {'x': ueid, 's': self.s_activated}, 'x')
-        self.commit()
-        rset = self.execute('TrInfo T ORDERBY T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
-        self.assertEquals(len(rset), 3)
-        tr = rset.get_entity(2, 0)
-        #tr.complete()
-        self.assertEquals(tr.comment, u'il est pas sage celui-la')
-        self.assertEquals(tr.comment_format, u'text/plain')
-        self.assertEquals(tr.from_state[0].eid, self.s_deactivated)
-        self.assertEquals(tr.to_state[0].eid, self.s_activated)
-        self.assertEquals(tr.owned_by[0].login, 'admin')
-
-    def test_transition_information_on_creation(self):
-        ueid = self.create_user('toto')
-        rset = self.execute('TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
-        self.assertEquals(len(rset), 1)
-        tr = rset.get_entity(0, 0)
-        #tr.complete()
-        self.assertEquals(tr.comment, None)
-        self.assertEquals(tr.from_state, [])
-        self.assertEquals(tr.to_state[0].eid, self.s_activated)
-
-    def test_std_users_can_create_trinfo(self):
-        self.create_user('toto')
-        cnx = self.login('toto')
-        cu = cnx.cursor()
-        self.failUnless(cu.execute("INSERT Note X: X type 'a', X in_state S WHERE S name 'todo'"))
-        cnx.commit()
-
-    def test_metadata_cwuri(self):
-        eid = self.execute('INSERT Note X')[0][0]
-        cwuri = self.execute('Any U WHERE X eid %s, X cwuri U' % eid)[0][0]
-        self.assertEquals(cwuri, self.repo.config['base-url'] + 'eid/%s' % eid)
-
-    def test_metadata_creation_modification_date(self):
-        _now = datetime.now()
-        eid = self.execute('INSERT Note X')[0][0]
-        creation_date, modification_date = self.execute('Any CD, MD WHERE X eid %s, '
-                                                        'X creation_date CD, '
-                                                        'X modification_date MD' % eid)[0]
-        self.assertEquals((creation_date - _now).seconds, 0)
-        self.assertEquals((modification_date - _now).seconds, 0)
-
-    def test_metadata__date(self):
-        _now = datetime.now()
-        eid = self.execute('INSERT Note X')[0][0]
-        creation_date = self.execute('Any D WHERE X eid %s, X creation_date D' % eid)[0][0]
-        self.assertEquals((creation_date - _now).seconds, 0)
-
-    def test_metadata_created_by(self):
-        eid = self.execute('INSERT Note X')[0][0]
-        self.commit() # fire operations
-        rset = self.execute('Any U WHERE X eid %s, X created_by U' % eid)
-        self.assertEquals(len(rset), 1) # make sure we have only one creator
-        self.assertEquals(rset[0][0], self.session.user.eid)
-
-    def test_metadata_owned_by(self):
-        eid = self.execute('INSERT Note X')[0][0]
-        self.commit() # fire operations
-        rset = self.execute('Any U WHERE X eid %s, X owned_by U' % eid)
-        self.assertEquals(len(rset), 1) # make sure we have only one owner
-        self.assertEquals(rset[0][0], self.session.user.eid)
-
 if __name__ == '__main__':
     unittest_main()
--- a/server/test/unittest_querier.py	Thu Aug 20 17:44:27 2009 +0200
+++ b/server/test/unittest_querier.py	Thu Aug 20 17:47:35 2009 +0200
@@ -990,17 +990,15 @@
         peid = self.execute("INSERT Personne Y: Y nom 'toto'")[0][0]
         rset = self.execute('Personne X WHERE X nom "toto"')
         self.assertEqual(len(rset.rows), 1)
-        self.execute("SET X nom 'tutu', X prenom 'original' WHERE X is Personne, X nom 'toto'")
+        rset = self.execute("SET X nom 'tutu', X prenom 'original' WHERE X is Personne, X nom 'toto'")
+        self.assertEqual(tuplify(rset.rows), [(peid, 'tutu', 'original')])
         rset = self.execute('Any Y, Z WHERE X is Personne, X nom Y, X prenom Z')
         self.assertEqual(tuplify(rset.rows), [('tutu', 'original')])
 
     def test_update_2(self):
-        self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
-        #rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto"')
-        #self.assertEqual(len(rset.rows), 1)
-        #rset = self.execute('Any X, Y WHERE X travaille Y')
-        #self.assertEqual(len(rset.rows), 0)
-        self.execute("SET X travaille Y WHERE X nom 'bidule', Y nom 'toto'")
+        peid, seid = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")[0]
+        rset = self.execute("SET X travaille Y WHERE X nom 'bidule', Y nom 'toto'")
+        self.assertEquals(tuplify(rset.rows), [(peid, seid)])
         rset = self.execute('Any X, Y WHERE X travaille Y')
         self.assertEqual(len(rset.rows), 1)
 
@@ -1020,9 +1018,6 @@
         rset = self.execute('Any X, Y WHERE X travaille Y')
         self.assertEqual(len(rset.rows), 1)
 
-##     def test_update_4(self):
-##         self.execute("SET X know Y WHERE X ami Y")
-
     def test_update_multiple1(self):
         peid1 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0]
         peid2 = self.execute("INSERT Personne Y: Y nom 'toto'")[0][0]