[rql / querier] fix bad interpretation of some RQL SET query stable
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Wed, 31 Jul 2013 13:56:00 +0200
branchstable
changeset 9206 bf642b50135b
parent 9204 5edd09229890
child 9207 c99e97e9f566
[rql / querier] fix bad interpretation of some RQL SET query when some neged relation is involved. Closes #3058527.
server/ssplanner.py
server/test/unittest_msplanner.py
server/test/unittest_querier.py
--- a/server/ssplanner.py	Tue Jul 30 20:12:20 2013 +0200
+++ b/server/ssplanner.py	Wed Jul 31 13:56:00 2013 +0200
@@ -101,23 +101,16 @@
 
     Return None when no query actually needed, else the given select node that
     will be used as substep query.
-
-    When select has nothing selected, search in origrqlst for restriction that
-    should be considered.
     """
     if origrqlst.where is not None and not select.selection:
         # no selection, append one randomly by searching for a relation which is
-        # neither a type restriction (is) nor an eid specification (not neged
-        # eid with constant node)
+        # not neged neither a type restriction (is/is_instance_of)
         for rel in origrqlst.where.iget_nodes(Relation):
-            if rel.neged(strict=True) or not (
-                rel.is_types_restriction() or
-                (rel.r_type == 'eid'
-                 and isinstance(rel.get_variable_parts()[1], Constant))):
+            if not (rel.neged(traverse_scope=True) or rel.is_types_restriction()):
                 select.append_selected(rel.children[0].copy(select))
                 break
         else:
-            return
+            return None
     if select.selection:
         if origrqlst.where is not None:
             select.set_where(origrqlst.where.copy(select))
--- a/server/test/unittest_msplanner.py	Tue Jul 30 20:12:20 2013 +0200
+++ b/server/test/unittest_msplanner.py	Wed Jul 31 13:56:00 2013 +0200
@@ -1772,7 +1772,12 @@
         repo._type_source_cache[999998] = ('State', 'system', None, 'system')
         self._test('INSERT Note X: X in_state S, X type "bla", X migrated_from N WHERE S eid %(s)s, N eid %(n)s',
                    [('InsertStep',
-                      [('InsertRelationsStep', [])]
+                      [('InsertRelationsStep',
+                        [('OneFetchStep',
+                          [('Any 999999', [{}])],
+                          None, None,
+                          [self.system], {},
+                          [])])]
                      )],
                    {'n': 999999, 's': 999998})
 
--- a/server/test/unittest_querier.py	Tue Jul 30 20:12:20 2013 +0200
+++ b/server/test/unittest_querier.py	Wed Jul 31 13:56:00 2013 +0200
@@ -1,5 +1,5 @@
 # -*- coding: iso-8859-1 -*-
-# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -1268,12 +1268,21 @@
         newname = self.execute('Any XN WHERE X eid %(x)s, X title XN', {'x': beid})[0][0]
         self.assertEqual(newname, 'toto-moved')
 
+    def test_update_not_exists(self):
+        rset = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
+        eid1, eid2 = rset[0][0], rset[0][1]
+        rset = self.execute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s, "
+                            "NOT EXISTS(Z ecrit_par X)",
+                            {'x': unicode(eid1), 'y': unicode(eid2)})
+        self.assertEqual(tuplify(rset.rows), [(eid1, eid2)])
+
     def test_update_query_error(self):
         self.execute("INSERT Personne Y: Y nom 'toto'")
         self.assertRaises(Exception, self.execute, "SET X nom 'toto', X is Personne")
         self.assertRaises(QueryError, self.execute, "SET X nom 'toto', X has_text 'tutu' WHERE X is Personne")
         self.assertRaises(QueryError, self.execute, "SET X login 'tutu', X eid %s" % cnx.user(self.session).eid)
 
+
     # HAVING on write queries test #############################################
 
     def test_update_having(self):