test/unittest_rqlrewrite.py
branchstable
changeset 9168 0fb4b67bde58
parent 8342 7a5271182ef0
child 9170 e6fe77dbcfdf
--- a/test/unittest_rqlrewrite.py	Mon Jul 15 10:59:34 2013 +0200
+++ b/test/unittest_rqlrewrite.py	Mon Jul 22 14:26:33 2013 +0200
@@ -1,4 +1,4 @@
-# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -23,7 +23,7 @@
 
 from cubicweb import Unauthorized, rqlrewrite
 from cubicweb.schema import RRQLExpression, ERQLExpression
-from cubicweb.devtools import repotest, TestServerConfiguration
+from cubicweb.devtools import repotest, TestServerConfiguration, BaseApptestConfiguration
 
 
 def setUpModule(*args):
@@ -46,7 +46,8 @@
 
 def eid_func_map(eid):
     return {1: 'CWUser',
-            2: 'Card'}[eid]
+            2: 'Card',
+            3: 'Affaire'}[eid]
 
 def rewrite(rqlst, snippets_map, kwargs, existingvars=None):
     class FakeVReg:
@@ -202,6 +203,17 @@
                              'WITH LA BEING (Any LA WHERE (EXISTS(A created_by B, LA documented_by A)) OR (EXISTS(E created_by B, LA concerne E)), '
                              'B eid %(D)s, LA is Affaire)')
 
+
+    def test_ambiguous_optional_same_exprs(self):
+        """See #3013535"""
+        # see test of the same name in RewriteFullTC: original problem is
+        # unreproducible here because it actually lies in
+        # RQLRewriter.insert_local_checks
+        rqlst = parse('Any A,AR,X,CD WHERE A concerne X?, A ref AR, A eid %(a)s, X creation_date CD')
+        rewrite(rqlst, {('X', 'X'): ('X created_by U',),}, {'a': 3})
+        self.assertEqual(rqlst.as_string(),
+                         u'Any A,AR,X,CD WHERE A concerne X?, A ref AR, A eid %(a)s WITH X,CD BEING (Any X,CD WHERE X creation_date CD, EXISTS(X created_by B), B eid %(A)s, X is IN(Division, Note, Societe))')
+
     def test_optional_var_inlined(self):
         c1 = ('X require_permission P')
         c2 = ('X inlined_card O, O require_permission P')
@@ -292,6 +304,7 @@
         self.assertEqual(rqlst.as_string(),
                          "Any C WHERE C in_state STATE, C is Card, "
                          "EXISTS(STATE name 'hop'), STATE is State")
+
     def test_relation_optimization_3_rhs(self):
         snippet = ('TW? subworkflow_exit X, TW name "hop"')
         rqlst = parse('WorkflowTransition C WHERE C subworkflow_exit EXIT')
@@ -308,6 +321,7 @@
         self.assertEqual(rqlst.as_string(),
                          "Any C WHERE C in_state STATE?, C is Card, "
                          "EXISTS(C in_state A, A name 'hop', A is State), STATE is State")
+
     def test_relation_non_optimization_1_rhs(self):
         snippet = ('TW subworkflow_exit X, TW name "hop"')
         rqlst = parse('SubWorkflowExitPoint EXIT WHERE C? subworkflow_exit EXIT')
@@ -459,5 +473,37 @@
         rqlst = parse('Any A, R WHERE A ref R, S is Affaire')
         rewrite(rqlst, {('A', 'X'): (c_ok, c_bad)}, {})
 
+
+from cubicweb.devtools.testlib import CubicWebTC
+from logilab.common.decorators import classproperty
+
+class RewriteFullTC(CubicWebTC):
+    @classproperty
+    def config(cls):
+        return BaseApptestConfiguration(apphome=cls.datapath('rewrite'))
+
+    def process(self, rql, args=None):
+        if args is None:
+            args = {}
+        querier = self.repo.querier
+        union = querier.parse(rql)
+        querier.solutions(self.session, union, args)
+        querier._annotate(union)
+        plan = querier.plan_factory(union, args, self.session)
+        plan.preprocess(union)
+        return union
+
+    def test_ambiguous_optional_same_exprs(self):
+        """See #3013535"""
+        edef1 = self.schema['Societe']
+        edef2 = self.schema['Division']
+        edef3 = self.schema['Note']
+        with self.temporary_permissions((edef1, {'read': (ERQLExpression('X owned_by U'),)}),
+                                        (edef2, {'read': (ERQLExpression('X owned_by U'),)}),
+                                        (edef3, {'read': (ERQLExpression('X owned_by U'),)})):
+            union = self.process('Any A,AR,X,CD WHERE A concerne X?, A ref AR, X creation_date CD')
+            self.assertEqual(union.as_string(), 'Any A,AR,X,CD WHERE A concerne X?, A ref AR, A is Affaire WITH X,CD BEING (Any X,CD WHERE X creation_date CD, EXISTS(X owned_by %(A)s), X is IN(Division, Note, Societe))')
+
+
 if __name__ == '__main__':
     unittest_main()