cubicweb/test/unittest_rqlrewrite.py
changeset 11057 0b59724cb3f2
parent 10712 f7227cbf1d18
child 11250 597f02c5cf5a
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
       
     1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 
       
    19 from six import string_types
       
    20 
       
    21 from logilab.common.testlib import unittest_main, TestCase
       
    22 from logilab.common.testlib import mock_object
       
    23 from yams import BadSchemaDefinition
       
    24 from yams.buildobjs import RelationDefinition
       
    25 from rql import parse, nodes, RQLHelper
       
    26 
       
    27 from cubicweb import Unauthorized, rqlrewrite
       
    28 from cubicweb.schema import RRQLExpression, ERQLExpression
       
    29 from cubicweb.devtools import repotest, TestServerConfiguration, BaseApptestConfiguration
       
    30 
       
    31 
       
    32 def setUpModule(*args):
       
    33     global rqlhelper, schema
       
    34     config = TestServerConfiguration(RQLRewriteTC.datapath('rewrite'))
       
    35     config.bootstrap_cubes()
       
    36     schema = config.load_schema()
       
    37     schema.add_relation_def(RelationDefinition(subject='Card', name='in_state',
       
    38                                                object='State', cardinality='1*'))
       
    39     rqlhelper = RQLHelper(schema, special_relations={'eid': 'uid',
       
    40                                                      'has_text': 'fti'})
       
    41     repotest.do_monkey_patch()
       
    42 
       
    43 def tearDownModule(*args):
       
    44     repotest.undo_monkey_patch()
       
    45     global rqlhelper, schema
       
    46     del rqlhelper, schema
       
    47 
       
    48 def eid_func_map(eid):
       
    49     return {1: 'CWUser',
       
    50             2: 'Card',
       
    51             3: 'Affaire'}[eid]
       
    52 
       
    53 def _prepare_rewriter(rewriter_cls, kwargs):
       
    54     class FakeVReg:
       
    55         schema = schema
       
    56         @staticmethod
       
    57         def solutions(sqlcursor, rqlst, kwargs):
       
    58             rqlhelper.compute_solutions(rqlst, {'eid': eid_func_map}, kwargs=kwargs)
       
    59         class rqlhelper:
       
    60             @staticmethod
       
    61             def annotate(rqlst):
       
    62                 rqlhelper.annotate(rqlst)
       
    63             @staticmethod
       
    64             def simplify(mainrqlst, needcopy=False):
       
    65                 rqlhelper.simplify(rqlst, needcopy)
       
    66     return rewriter_cls(mock_object(vreg=FakeVReg, user=(mock_object(eid=1))))
       
    67 
       
    68 def rewrite(rqlst, snippets_map, kwargs, existingvars=None):
       
    69     rewriter = _prepare_rewriter(rqlrewrite.RQLRewriter, kwargs)
       
    70     snippets = []
       
    71     for v, exprs in sorted(snippets_map.items()):
       
    72         rqlexprs = [isinstance(snippet, string_types)
       
    73                     and mock_object(snippet_rqlst=parse(u'Any X WHERE '+snippet).children[0],
       
    74                                     expression=u'Any X WHERE '+snippet)
       
    75                     or snippet
       
    76                     for snippet in exprs]
       
    77         snippets.append((dict([v]), rqlexprs))
       
    78     rqlhelper.compute_solutions(rqlst.children[0], {'eid': eid_func_map}, kwargs=kwargs)
       
    79     rewriter.rewrite(rqlst.children[0], snippets, kwargs, existingvars)
       
    80     test_vrefs(rqlst.children[0])
       
    81     return rewriter.rewritten
       
    82 
       
    83 def test_vrefs(node):
       
    84     vrefmaps = {}
       
    85     selects = []
       
    86     for vref in node.iget_nodes(nodes.VariableRef):
       
    87         stmt = vref.stmt
       
    88         try:
       
    89             vrefmaps[stmt].setdefault(vref.name, set()).add(vref)
       
    90         except KeyError:
       
    91             vrefmaps[stmt] = {vref.name: set( (vref,) )}
       
    92             selects.append(stmt)
       
    93     assert node in selects, (node, selects)
       
    94     for stmt in selects:
       
    95         for var in stmt.defined_vars.values():
       
    96             assert var.stinfo['references']
       
    97             vrefmap = vrefmaps[stmt]
       
    98             assert not (var.stinfo['references'] ^ vrefmap[var.name]), (node.as_string(), var, var.stinfo['references'], vrefmap[var.name])
       
    99 
       
   100 
       
   101 class RQLRewriteTC(TestCase):
       
   102     """a faire:
       
   103 
       
   104     * optimisation: detecter les relations utilisees dans les rqlexpressions qui
       
   105       sont presentes dans la requete de depart pour les reutiliser si possible
       
   106 
       
   107     * "has_<ACTION>_permission" ?
       
   108     """
       
   109 
       
   110     def test_base_var(self):
       
   111         constraint = ('X in_state S, U in_group G, P require_state S,'
       
   112                            'P name "read", P require_group G')
       
   113         rqlst = parse(u'Card C')
       
   114         rewrite(rqlst, {('C', 'X'): (constraint,)}, {})
       
   115         self.assertEqual(rqlst.as_string(),
       
   116                          u'Any C WHERE C is Card, B eid %(D)s, '
       
   117                           'EXISTS(C in_state A, B in_group E, F require_state A, '
       
   118                           'F name "read", F require_group E, A is State, E is CWGroup, F is CWPermission)')
       
   119 
       
   120     def test_multiple_var(self):
       
   121         card_constraint = ('X in_state S, U in_group G, P require_state S,'
       
   122                            'P name "read", P require_group G')
       
   123         affaire_constraints = ('X ref LIKE "PUBLIC%"', 'U in_group G, G name "public"')
       
   124         kwargs = {'u':2}
       
   125         rqlst = parse(u'Any S WHERE S documented_by C, C eid %(u)s')
       
   126         rewrite(rqlst, {('C', 'X'): (card_constraint,), ('S', 'X'): affaire_constraints},
       
   127                 kwargs)
       
   128         self.assertMultiLineEqual(
       
   129             rqlst.as_string(),
       
   130             u'Any S WHERE S documented_by C, C eid %(u)s, B eid %(D)s, '
       
   131              'EXISTS(C in_state A, B in_group E, F require_state A, '
       
   132              'F name "read", F require_group E, A is State, E is CWGroup, F is CWPermission), '
       
   133              '(EXISTS(S ref LIKE "PUBLIC%")) OR (EXISTS(B in_group G, G name "public", G is CWGroup)), '
       
   134              'S is Affaire')
       
   135         self.assertIn('D', kwargs)
       
   136 
       
   137     def test_or(self):
       
   138         constraint = '(X identity U) OR (X in_state ST, CL identity U, CL in_state ST, ST name "subscribed")'
       
   139         rqlst = parse(u'Any S WHERE S owned_by C, C eid %(u)s, S is in (CWUser, CWGroup)')
       
   140         rewrite(rqlst, {('C', 'X'): (constraint,)}, {'u':1})
       
   141         self.assertEqual(rqlst.as_string(),
       
   142                          'Any S WHERE S owned_by C, C eid %(u)s, S is IN(CWUser, CWGroup), A eid %(B)s, '
       
   143                          'EXISTS((C identity A) OR (C in_state D, E identity A, '
       
   144                          'E in_state D, D name "subscribed"), D is State, E is CWUser)')
       
   145 
       
   146     def test_simplified_rqlst(self):
       
   147         constraint = ('X in_state S, U in_group G, P require_state S,'
       
   148                            'P name "read", P require_group G')
       
   149         rqlst = parse(u'Any 2') # this is the simplified rql st for Any X WHERE X eid 12
       
   150         rewrite(rqlst, {('2', 'X'): (constraint,)}, {})
       
   151         self.assertEqual(rqlst.as_string(),
       
   152                          u'Any 2 WHERE B eid %(C)s, '
       
   153                           'EXISTS(2 in_state A, B in_group D, E require_state A, '
       
   154                           'E name "read", E require_group D, A is State, D is CWGroup, E is CWPermission)')
       
   155 
       
   156     def test_optional_var_1(self):
       
   157         constraint = ('X in_state S, U in_group G, P require_state S,'
       
   158                            'P name "read", P require_group G')
       
   159         rqlst = parse(u'Any A,C WHERE A documented_by C?')
       
   160         rewrite(rqlst, {('C', 'X'): (constraint,)}, {})
       
   161         self.assertEqual(rqlst.as_string(),
       
   162                          u'Any A,C WHERE A documented_by C?, A is Affaire '
       
   163                           'WITH C BEING '
       
   164                           '(Any C WHERE EXISTS(C in_state B, D in_group F, G require_state B, G name "read", '
       
   165                           'G require_group F), D eid %(A)s, C is Card)')
       
   166 
       
   167     def test_optional_var_2(self):
       
   168         constraint = ('X in_state S, U in_group G, P require_state S,'
       
   169                            'P name "read", P require_group G')
       
   170         rqlst = parse(u'Any A,C,T WHERE A documented_by C?, C title T')
       
   171         rewrite(rqlst, {('C', 'X'): (constraint,)}, {})
       
   172         self.assertEqual(rqlst.as_string(),
       
   173                          u'Any A,C,T WHERE A documented_by C?, A is Affaire '
       
   174                           'WITH C,T BEING '
       
   175                           '(Any C,T WHERE C title T, EXISTS(C in_state B, D in_group F, '
       
   176                           'G require_state B, G name "read", G require_group F), '
       
   177                           'D eid %(A)s, C is Card)')
       
   178 
       
   179     def test_optional_var_3(self):
       
   180         constraint1 = ('X in_state S, U in_group G, P require_state S,'
       
   181                        'P name "read", P require_group G')
       
   182         constraint2 = 'X in_state S, S name "public"'
       
   183         rqlst = parse(u'Any A,C,T WHERE A documented_by C?, C title T')
       
   184         rewrite(rqlst, {('C', 'X'): (constraint1, constraint2)}, {})
       
   185         self.assertEqual(rqlst.as_string(),
       
   186                          u'Any A,C,T WHERE A documented_by C?, A is Affaire '
       
   187                           'WITH C,T BEING (Any C,T WHERE C title T, '
       
   188                           '(EXISTS(C in_state B, D in_group F, G require_state B, G name "read", G require_group F)) '
       
   189                           'OR (EXISTS(C in_state E, E name "public")), '
       
   190                           'D eid %(A)s, C is Card)')
       
   191 
       
   192     def test_optional_var_4(self):
       
   193         constraint1 = 'A created_by U, X documented_by A'
       
   194         constraint2 = 'A created_by U, X concerne A'
       
   195         constraint3 = 'X created_by U'
       
   196         rqlst = parse(u'Any X,LA,Y WHERE LA? documented_by X, LA concerne Y')
       
   197         rewrite(rqlst, {('LA', 'X'): (constraint1, constraint2),
       
   198                         ('X', 'X'): (constraint3,),
       
   199                         ('Y', 'X'): (constraint3,)}, {})
       
   200         self.assertEqual(rqlst.as_string(),
       
   201                              u'Any X,LA,Y WHERE LA? documented_by X, LA concerne Y, B eid %(C)s, '
       
   202                              'EXISTS(X created_by B), EXISTS(Y created_by B), '
       
   203                              'X is Card, Y is IN(Division, Note, Societe) '
       
   204                              'WITH LA BEING (Any LA WHERE (EXISTS(A created_by B, LA documented_by A)) OR (EXISTS(E created_by B, LA concerne E)), '
       
   205                              'B eid %(D)s, LA is Affaire)')
       
   206 
       
   207 
       
   208     def test_ambiguous_optional_same_exprs(self):
       
   209         """See #3013535"""
       
   210         # see test of the same name in RewriteFullTC: original problem is
       
   211         # unreproducible here because it actually lies in
       
   212         # RQLRewriter.insert_local_checks
       
   213         rqlst = parse(u'Any A,AR,X,CD WHERE A concerne X?, A ref AR, A eid %(a)s, X creation_date CD')
       
   214         rewrite(rqlst, {('X', 'X'): ('X created_by U',),}, {'a': 3})
       
   215         self.assertEqual(rqlst.as_string(),
       
   216                          u'Any A,AR,X,CD WHERE A concerne X?, A ref AR, A eid %(a)s WITH X,CD BEING (Any X,CD WHERE X creation_date CD, EXISTS(X created_by B), B eid %(A)s, X is IN(Division, Note, Societe))')
       
   217 
       
   218     def test_optional_var_inlined(self):
       
   219         c1 = ('X require_permission P')
       
   220         c2 = ('X inlined_card O, O require_permission P')
       
   221         rqlst = parse(u'Any C,A,R WHERE A? inlined_card C, A ref R')
       
   222         rewrite(rqlst, {('C', 'X'): (c1,),
       
   223                         ('A', 'X'): (c2,),
       
   224                         }, {})
       
   225         # XXX suboptimal
       
   226         self.assertEqual(rqlst.as_string(),
       
   227                          "Any C,A,R WITH A,C,R BEING "
       
   228                          "(Any A,C,R WHERE A? inlined_card C, A ref R, "
       
   229                          "(A is NULL) OR (EXISTS(A inlined_card B, B require_permission D, "
       
   230                          "B is Card, D is CWPermission)), "
       
   231                          "A is Affaire, C is Card, EXISTS(C require_permission E, E is CWPermission))")
       
   232 
       
   233     # def test_optional_var_inlined_has_perm(self):
       
   234     #     c1 = ('X require_permission P')
       
   235     #     c2 = ('X inlined_card O, U has_read_permission O')
       
   236     #     rqlst = parse(u'Any C,A,R WHERE A? inlined_card C, A ref R')
       
   237     #     rewrite(rqlst, {('C', 'X'): (c1,),
       
   238     #                     ('A', 'X'): (c2,),
       
   239     #                     }, {})
       
   240     #     self.assertEqual(rqlst.as_string(),
       
   241     #                          "")
       
   242 
       
   243     def test_optional_var_inlined_imbricated_error(self):
       
   244         c1 = ('X require_permission P')
       
   245         c2 = ('X inlined_card O, O require_permission P')
       
   246         rqlst = parse(u'Any C,A,R,A2,R2 WHERE A? inlined_card C, A ref R,A2? inlined_card C, A2 ref R2')
       
   247         self.assertRaises(BadSchemaDefinition,
       
   248                           rewrite, rqlst, {('C', 'X'): (c1,),
       
   249                                            ('A', 'X'): (c2,),
       
   250                                            ('A2', 'X'): (c2,),
       
   251                                            }, {})
       
   252 
       
   253     def test_optional_var_inlined_linked(self):
       
   254         c1 = ('X require_permission P')
       
   255         c2 = ('X inlined_card O, O require_permission P')
       
   256         rqlst = parse(u'Any A,W WHERE A inlined_card C?, C inlined_note N, '
       
   257                       'N inlined_affaire W')
       
   258         rewrite(rqlst, {('C', 'X'): (c1,)}, {})
       
   259         self.assertEqual(rqlst.as_string(),
       
   260                          'Any A,W WHERE A inlined_card C?, A is Affaire '
       
   261                          'WITH C,N,W BEING (Any C,N,W WHERE C inlined_note N, '
       
   262                          'N inlined_affaire W, EXISTS(C require_permission B), '
       
   263                          'C is Card, N is Note, W is Affaire)')
       
   264 
       
   265     def test_relation_optimization_1_lhs(self):
       
   266         # since Card in_state State as monovalued cardinality, the in_state
       
   267         # relation used in the rql expression can be ignored and S replaced by
       
   268         # the variable from the incoming query
       
   269         snippet = ('X in_state S, S name "hop"')
       
   270         rqlst = parse(u'Card C WHERE C in_state STATE')
       
   271         rewrite(rqlst, {('C', 'X'): (snippet,)}, {})
       
   272         self.assertEqual(rqlst.as_string(),
       
   273                          'Any C WHERE C in_state STATE, C is Card, '
       
   274                          'EXISTS(STATE name "hop"), STATE is State')
       
   275 
       
   276     def test_relation_optimization_1_rhs(self):
       
   277         snippet = ('TW subworkflow_exit X, TW name "hop"')
       
   278         rqlst = parse(u'WorkflowTransition C WHERE C subworkflow_exit EXIT')
       
   279         rewrite(rqlst, {('EXIT', 'X'): (snippet,)}, {})
       
   280         self.assertEqual(rqlst.as_string(),
       
   281                          'Any C WHERE C subworkflow_exit EXIT, C is WorkflowTransition, '
       
   282                          'EXISTS(C name "hop"), EXIT is SubWorkflowExitPoint')
       
   283 
       
   284     def test_relation_optimization_2_lhs(self):
       
   285         # optional relation can be shared if also optional in the snippet
       
   286         snippet = ('X in_state S?, S name "hop"')
       
   287         rqlst = parse(u'Card C WHERE C in_state STATE?')
       
   288         rewrite(rqlst, {('C', 'X'): (snippet,)}, {})
       
   289         self.assertEqual(rqlst.as_string(),
       
   290                          'Any C WHERE C in_state STATE?, C is Card, '
       
   291                          'EXISTS(STATE name "hop"), STATE is State')
       
   292     def test_relation_optimization_2_rhs(self):
       
   293         snippet = ('TW? subworkflow_exit X, TW name "hop"')
       
   294         rqlst = parse(u'SubWorkflowExitPoint EXIT WHERE C? subworkflow_exit EXIT')
       
   295         rewrite(rqlst, {('EXIT', 'X'): (snippet,)}, {})
       
   296         self.assertEqual(rqlst.as_string(),
       
   297                          'Any EXIT WHERE C? subworkflow_exit EXIT, EXIT is SubWorkflowExitPoint, '
       
   298                          'EXISTS(C name "hop"), C is WorkflowTransition')
       
   299 
       
   300     def test_relation_optimization_3_lhs(self):
       
   301         # optional relation in the snippet but not in the orig tree can be shared
       
   302         snippet = ('X in_state S?, S name "hop"')
       
   303         rqlst = parse(u'Card C WHERE C in_state STATE')
       
   304         rewrite(rqlst, {('C', 'X'): (snippet,)}, {})
       
   305         self.assertEqual(rqlst.as_string(),
       
   306                          'Any C WHERE C in_state STATE, C is Card, '
       
   307                          'EXISTS(STATE name "hop"), STATE is State')
       
   308 
       
   309     def test_relation_optimization_3_rhs(self):
       
   310         snippet = ('TW? subworkflow_exit X, TW name "hop"')
       
   311         rqlst = parse(u'WorkflowTransition C WHERE C subworkflow_exit EXIT')
       
   312         rewrite(rqlst, {('EXIT', 'X'): (snippet,)}, {})
       
   313         self.assertEqual(rqlst.as_string(),
       
   314                          'Any C WHERE C subworkflow_exit EXIT, C is WorkflowTransition, '
       
   315                          'EXISTS(C name "hop"), EXIT is SubWorkflowExitPoint')
       
   316 
       
   317     def test_relation_non_optimization_1_lhs(self):
       
   318         # but optional relation in the orig tree but not in the snippet can't be shared
       
   319         snippet = ('X in_state S, S name "hop"')
       
   320         rqlst = parse(u'Card C WHERE C in_state STATE?')
       
   321         rewrite(rqlst, {('C', 'X'): (snippet,)}, {})
       
   322         self.assertEqual(rqlst.as_string(),
       
   323                          'Any C WHERE C in_state STATE?, C is Card, '
       
   324                          'EXISTS(C in_state A, A name "hop", A is State), STATE is State')
       
   325 
       
   326     def test_relation_non_optimization_1_rhs(self):
       
   327         snippet = ('TW subworkflow_exit X, TW name "hop"')
       
   328         rqlst = parse(u'SubWorkflowExitPoint EXIT WHERE C? subworkflow_exit EXIT')
       
   329         rewrite(rqlst, {('EXIT', 'X'): (snippet,)}, {})
       
   330         self.assertEqual(rqlst.as_string(),
       
   331                          'Any EXIT WHERE C? subworkflow_exit EXIT, EXIT is SubWorkflowExitPoint, '
       
   332                          'EXISTS(A subworkflow_exit EXIT, A name "hop", A is WorkflowTransition), '
       
   333                          'C is WorkflowTransition')
       
   334 
       
   335     def test_relation_non_optimization_2(self):
       
   336         """See #3024730"""
       
   337         # 'X inlined_note N' must not be shared with 'C inlined_note N'
       
   338         # previously inserted, else this may introduce duplicated results, as N
       
   339         # will then be shared by multiple EXISTS and so at SQL generation time,
       
   340         # the table will be in the FROM clause of the outermost query
       
   341         rqlst = parse(u'Any A,C WHERE A inlined_card C')
       
   342         rewrite(rqlst, {('A', 'X'): ('X inlined_card C, C inlined_note N, N owned_by U',),
       
   343                         ('C', 'X'): ('X inlined_note N, N owned_by U',)}, {})
       
   344         self.assertEqual(rqlst.as_string(),
       
   345                          'Any A,C WHERE A inlined_card C, D eid %(E)s, '
       
   346                          'EXISTS(C inlined_note B, B owned_by D, B is Note), '
       
   347                          'EXISTS(C inlined_note F, F owned_by D, F is Note), '
       
   348                          'A is Affaire, C is Card')
       
   349 
       
   350     def test_unsupported_constraint_1(self):
       
   351         # CWUser doesn't have require_permission
       
   352         trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"')
       
   353         rqlst = parse(u'Any U,T WHERE U is CWUser, T wf_info_for U')
       
   354         self.assertRaises(Unauthorized, rewrite, rqlst, {('T', 'X'): (trinfo_constraint,)}, {})
       
   355 
       
   356     def test_unsupported_constraint_2(self):
       
   357         trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"')
       
   358         rqlst = parse(u'Any U,T WHERE U is CWUser, T wf_info_for U')
       
   359         rewrite(rqlst, {('T', 'X'): (trinfo_constraint, 'X wf_info_for Y, Y in_group G, G name "managers"')}, {})
       
   360         self.assertEqual(rqlst.as_string(),
       
   361                          u'Any U,T WHERE U is CWUser, T wf_info_for U, '
       
   362                           'EXISTS(U in_group B, B name "managers", B is CWGroup), T is TrInfo')
       
   363 
       
   364     def test_unsupported_constraint_3(self):
       
   365         self.skipTest('raise unauthorized for now')
       
   366         trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"')
       
   367         rqlst = parse(u'Any T WHERE T wf_info_for X')
       
   368         rewrite(rqlst, {('T', 'X'): (trinfo_constraint, 'X in_group G, G name "managers"')}, {})
       
   369         self.assertEqual(rqlst.as_string(),
       
   370                          u'XXX dunno what should be generated')
       
   371 
       
   372     def test_add_ambiguity_exists(self):
       
   373         constraint = ('X concerne Y')
       
   374         rqlst = parse(u'Affaire X')
       
   375         rewrite(rqlst, {('X', 'X'): (constraint,)}, {})
       
   376         self.assertEqual(rqlst.as_string(),
       
   377                          u"Any X WHERE X is Affaire, ((EXISTS(X concerne A, A is Division)) OR (EXISTS(X concerne C, C is Societe))) OR (EXISTS(X concerne B, B is Note))")
       
   378 
       
   379     def test_add_ambiguity_outerjoin(self):
       
   380         constraint = ('X concerne Y')
       
   381         rqlst = parse(u'Any X,C WHERE X? documented_by C')
       
   382         rewrite(rqlst, {('X', 'X'): (constraint,)}, {})
       
   383         # ambiguity are kept in the sub-query, no need to be resolved using OR
       
   384         self.assertEqual(rqlst.as_string(),
       
   385                          u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE EXISTS(X concerne A), X is Affaire)")
       
   386 
       
   387 
       
   388     def test_rrqlexpr_nonexistant_subject_1(self):
       
   389         constraint = RRQLExpression('S owned_by U')
       
   390         rqlst = parse(u'Card C')
       
   391         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
       
   392         self.assertEqual(rqlst.as_string(),
       
   393                          u"Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A)")
       
   394         rqlst = parse(u'Card C')
       
   395         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'OU')
       
   396         self.assertEqual(rqlst.as_string(),
       
   397                          u"Any C WHERE C is Card")
       
   398         rqlst = parse(u'Card C')
       
   399         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SOU')
       
   400         self.assertEqual(rqlst.as_string(),
       
   401                          u"Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A)")
       
   402 
       
   403     def test_rrqlexpr_nonexistant_subject_2(self):
       
   404         constraint = RRQLExpression('S owned_by U, O owned_by U, O is Card')
       
   405         rqlst = parse(u'Card C')
       
   406         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
       
   407         self.assertEqual(rqlst.as_string(),
       
   408                          'Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A)')
       
   409         rqlst = parse(u'Card C')
       
   410         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'OU')
       
   411         self.assertEqual(rqlst.as_string(),
       
   412                          'Any C WHERE C is Card, B eid %(D)s, EXISTS(A owned_by B, A is Card)')
       
   413         rqlst = parse(u'Card C')
       
   414         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SOU')
       
   415         self.assertEqual(rqlst.as_string(),
       
   416                          'Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A, D owned_by A, D is Card)')
       
   417 
       
   418     def test_rrqlexpr_nonexistant_subject_3(self):
       
   419         constraint = RRQLExpression('U in_group G, G name "users"')
       
   420         rqlst = parse(u'Card C')
       
   421         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
       
   422         self.assertEqual(rqlst.as_string(),
       
   423                          u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", D is CWGroup)')
       
   424 
       
   425     def test_rrqlexpr_nonexistant_subject_4(self):
       
   426         constraint = RRQLExpression('U in_group G, G name "users", S owned_by U')
       
   427         rqlst = parse(u'Card C')
       
   428         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
       
   429         self.assertEqual(rqlst.as_string(),
       
   430                          u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", C owned_by A, D is CWGroup)')
       
   431         rqlst = parse(u'Card C')
       
   432         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'OU')
       
   433         self.assertEqual(rqlst.as_string(),
       
   434                          u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", D is CWGroup)')
       
   435 
       
   436     def test_rrqlexpr_nonexistant_subject_5(self):
       
   437         constraint = RRQLExpression('S owned_by Z, O owned_by Z, O is Card')
       
   438         rqlst = parse(u'Card C')
       
   439         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'S')
       
   440         self.assertEqual(rqlst.as_string(),
       
   441                          u"Any C WHERE C is Card, EXISTS(C owned_by A, A is CWUser)")
       
   442 
       
   443     def test_rqlexpr_not_relation_1_1(self):
       
   444         constraint = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
       
   445         rqlst = parse(u'Affaire A WHERE NOT EXISTS(A documented_by C)')
       
   446         rewrite(rqlst, {('C', 'X'): (constraint,)}, {}, 'X')
       
   447         self.assertEqual(rqlst.as_string(),
       
   448                          u'Any A WHERE NOT EXISTS(A documented_by C, EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire')
       
   449 
       
   450     def test_rqlexpr_not_relation_1_2(self):
       
   451         constraint = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
       
   452         rqlst = parse(u'Affaire A WHERE NOT EXISTS(A documented_by C)')
       
   453         rewrite(rqlst, {('A', 'X'): (constraint,)}, {}, 'X')
       
   454         self.assertEqual(rqlst.as_string(),
       
   455                          u'Any A WHERE NOT EXISTS(A documented_by C, C is Card), A is Affaire, EXISTS(A owned_by B, B login "hop", B is CWUser)')
       
   456 
       
   457     def test_rqlexpr_not_relation_2(self):
       
   458         constraint = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
       
   459         rqlst = rqlhelper.parse(u'Affaire A WHERE NOT A documented_by C', annotate=False)
       
   460         rewrite(rqlst, {('C', 'X'): (constraint,)}, {}, 'X')
       
   461         self.assertEqual(rqlst.as_string(),
       
   462                          u'Any A WHERE NOT EXISTS(A documented_by C, EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire')
       
   463 
       
   464     def test_rqlexpr_multiexpr_outerjoin(self):
       
   465         c1 = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
       
   466         c2 = ERQLExpression('X owned_by Z, Z login "hip"', 'X')
       
   467         c3 = ERQLExpression('X owned_by Z, Z login "momo"', 'X')
       
   468         rqlst = rqlhelper.parse(u'Any A WHERE A documented_by C?', annotate=False)
       
   469         rewrite(rqlst, {('C', 'X'): (c1, c2, c3)}, {}, 'X')
       
   470         self.assertEqual(rqlst.as_string(),
       
   471                          u'Any A WHERE A documented_by C?, A is Affaire '
       
   472                          'WITH C BEING (Any C WHERE ((EXISTS(C owned_by B, B login "hop")) '
       
   473                          'OR (EXISTS(C owned_by D, D login "momo"))) '
       
   474                          'OR (EXISTS(C owned_by A, A login "hip")), C is Card)')
       
   475 
       
   476     def test_multiple_erql_one_bad(self):
       
   477         #: reproduce bug #2236985
       
   478         #: (rqlrewrite fails to remove rewritten entry for unsupported constraint and then crash)
       
   479         #:
       
   480         #: This check a very rare code path triggered by the four condition below
       
   481 
       
   482         # 1. c_ok introduce an ambiguity
       
   483         c_ok = ERQLExpression('X concerne R')
       
   484         # 2. c_bad is just plain wrong and won't be kept
       
   485         # 3. but it declare a new variable
       
   486         # 4. this variable require a rewrite
       
   487         c_bad = ERQLExpression('X documented_by R, A in_state R')
       
   488 
       
   489         rqlst = parse(u'Any A, R WHERE A ref R, S is Affaire')
       
   490         rewrite(rqlst, {('A', 'X'): (c_ok, c_bad)}, {})
       
   491 
       
   492     def test_nonregr_is_instance_of(self):
       
   493         user_expr = ERQLExpression('NOT X in_group AF, AF name "guests"')
       
   494         rqlst = parse(u'Any O WHERE S use_email O, S is CWUser, O is_instance_of EmailAddress')
       
   495         rewrite(rqlst, {('S', 'X'): (user_expr,)}, {})
       
   496         self.assertEqual(rqlst.as_string(),
       
   497                          'Any O WHERE S use_email O, S is CWUser, O is EmailAddress, '
       
   498                          'EXISTS(NOT S in_group A, A name "guests", A is CWGroup)')
       
   499 
       
   500 from cubicweb.devtools.testlib import CubicWebTC
       
   501 from logilab.common.decorators import classproperty
       
   502 
       
   503 class RewriteFullTC(CubicWebTC):
       
   504     @classproperty
       
   505     def config(cls):
       
   506         return BaseApptestConfiguration(apphome=cls.datapath('rewrite'))
       
   507 
       
   508     def process(self, rql, args=None):
       
   509         if args is None:
       
   510             args = {}
       
   511         querier = self.repo.querier
       
   512         union = querier.parse(rql)
       
   513         with self.admin_access.repo_cnx() as cnx:
       
   514             querier.solutions(cnx, union, args)
       
   515             querier._annotate(union)
       
   516             plan = querier.plan_factory(union, args, cnx)
       
   517             plan.preprocess(union)
       
   518             return union
       
   519 
       
   520     def test_ambiguous_optional_same_exprs(self):
       
   521         """See #3013535"""
       
   522         edef1 = self.schema['Societe']
       
   523         edef2 = self.schema['Division']
       
   524         edef3 = self.schema['Note']
       
   525         with self.temporary_permissions((edef1, {'read': (ERQLExpression('X owned_by U'),)}),
       
   526                                         (edef2, {'read': (ERQLExpression('X owned_by U'),)}),
       
   527                                         (edef3, {'read': (ERQLExpression('X owned_by U'),)})):
       
   528             union = self.process('Any A,AR,X,CD WHERE A concerne X?, A ref AR, X creation_date CD')
       
   529             self.assertEqual('Any A,AR,X,CD WHERE A concerne X?, A ref AR, A is Affaire '
       
   530                              'WITH X,CD BEING (Any X,CD WHERE X creation_date CD, '
       
   531                              'EXISTS(X owned_by %(A)s), X is IN(Division, Note, Societe))',
       
   532                              union.as_string())
       
   533 
       
   534     def test_ambiguous_optional_diff_exprs(self):
       
   535         """See #3013554"""
       
   536         self.skipTest('bad request generated (may generate duplicated results)')
       
   537         edef1 = self.schema['Societe']
       
   538         edef2 = self.schema['Division']
       
   539         edef3 = self.schema['Note']
       
   540         with self.temporary_permissions((edef1, {'read': (ERQLExpression('X created_by U'),)}),
       
   541                                         (edef2, {'read': ('users',)}),
       
   542                                         (edef3, {'read': (ERQLExpression('X owned_by U'),)})):
       
   543             union = self.process('Any A,AR,X,CD WHERE A concerne X?, A ref AR, X creation_date CD')
       
   544             self.assertEqual(union.as_string(), 'not generated today')
       
   545 
       
   546 
       
   547     def test_xxxx(self):
       
   548         edef1 = self.schema['Societe']
       
   549         edef2 = self.schema['Division']
       
   550         read_expr = ERQLExpression('X responsable E, U has_read_permission E')
       
   551         with self.temporary_permissions((edef1, {'read': (read_expr,)}),
       
   552                                         (edef2, {'read': (read_expr,)})):
       
   553             union = self.process('Any X,AA,AC,AD ORDERBY AD DESC '
       
   554                                  'WHERE X responsable E, X nom AA, '
       
   555                                  'X responsable AC?, AC modification_date AD')
       
   556             self.assertEqual('Any X,AA,AC,AD ORDERBY AD DESC '
       
   557                              'WHERE X responsable E, X nom AA, '
       
   558                              'X responsable AC?, AC modification_date AD, '
       
   559                              'AC is CWUser, E is CWUser, X is IN(Division, Societe)',
       
   560                              union.as_string())
       
   561 
       
   562     def test_question_mark_attribute_snippet(self):
       
   563         # see #3661918
       
   564         from cubicweb.rqlrewrite import RQLRewriter
       
   565         from logilab.common.decorators import monkeypatch
       
   566         repotest.undo_monkey_patch()
       
   567         orig_insert_snippets = RQLRewriter.insert_snippets
       
   568         # patch insert_snippets and not rewrite, insert_snippets is already
       
   569         # monkey patches (see above setupModule/repotest)
       
   570         @monkeypatch(RQLRewriter)
       
   571         def insert_snippets(self, snippets, varexistsmap=None):
       
   572             # crash occurs if snippets are processed in a specific order, force
       
   573             # destiny
       
   574             if snippets[0][0] != {u'N': 'X'}:
       
   575                 snippets = list(reversed(snippets))
       
   576             return orig_insert_snippets(self, snippets, varexistsmap)
       
   577         try:
       
   578             with self.temporary_permissions(
       
   579                     (self.schema['Affaire'],
       
   580                      {'read': (ERQLExpression('X ref "blah"'), )}),
       
   581                     (self.schema['Note'],
       
   582                      {'read': (ERQLExpression(
       
   583                          'EXISTS(X inlined_affaire Z), EXISTS(Z owned_by U)'), )}),
       
   584             ):
       
   585                 union = self.process(
       
   586                     'Any A,COUNT(N) GROUPBY A '
       
   587                     'WHERE A is Affaire, N? inlined_affaire A')
       
   588                 self.assertEqual('Any A,COUNT(N) GROUPBY A WHERE A is Affaire '
       
   589                                  'WITH N,A BEING (Any N,A WHERE N? inlined_affaire A, '
       
   590                                  '(N is NULL) OR (EXISTS(EXISTS(N inlined_affaire B), '
       
   591                                  'EXISTS(B owned_by %(E)s), B is Affaire)), '
       
   592                                  'A is Affaire, N is Note, EXISTS(A ref "blah"))',
       
   593                                  union.as_string())
       
   594         finally:
       
   595             RQLRewriter.insert_snippets = orig_insert_snippets
       
   596 
       
   597 
       
   598 class RQLRelationRewriterTC(TestCase):
       
   599     # XXX valid rules: S and O specified, not in a SET, INSERT, DELETE scope
       
   600     #     valid uses: no outer join
       
   601 
       
   602     # Basic tests
       
   603     def test_base_rule(self):
       
   604         rules = {'participated_in': 'S contributor O'}
       
   605         rqlst = rqlhelper.parse(u'Any X WHERE X participated_in S')
       
   606         rule_rewrite(rqlst, rules)
       
   607         self.assertEqual('Any X WHERE X contributor S',
       
   608                          rqlst.as_string())
       
   609 
       
   610     def test_complex_rule_1(self):
       
   611         rules = {'illustrator_of': ('C is Contribution, C contributor S, '
       
   612                                     'C manifestation O, C role R, '
       
   613                                     'R name "illustrator"')}
       
   614         rqlst = rqlhelper.parse(u'Any A,B WHERE A illustrator_of B')
       
   615         rule_rewrite(rqlst, rules)
       
   616         self.assertEqual('Any A,B WHERE C is Contribution, '
       
   617                          'C contributor A, C manifestation B, '
       
   618                          'C role D, D name "illustrator"',
       
   619                          rqlst.as_string())
       
   620 
       
   621     def test_complex_rule_2(self):
       
   622         rules = {'illustrator_of': ('C is Contribution, C contributor S, '
       
   623                                     'C manifestation O, C role R, '
       
   624                                     'R name "illustrator"')}
       
   625         rqlst = rqlhelper.parse(u'Any A WHERE EXISTS(A illustrator_of B)')
       
   626         rule_rewrite(rqlst, rules)
       
   627         self.assertEqual('Any A WHERE EXISTS(C is Contribution, '
       
   628                          'C contributor A, C manifestation B, '
       
   629                          'C role D, D name "illustrator")',
       
   630                          rqlst.as_string())
       
   631 
       
   632 
       
   633     def test_rewrite2(self):
       
   634         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   635                 'C manifestation O, C role R, R name "illustrator"'}
       
   636         rqlst = rqlhelper.parse(u'Any A,B WHERE A illustrator_of B, C require_permission R, S'
       
   637                                 'require_state O')
       
   638         rule_rewrite(rqlst, rules)
       
   639         self.assertEqual('Any A,B WHERE C require_permission R, S require_state O, '
       
   640                          'D is Contribution, D contributor A, D manifestation B, D role E, '
       
   641                          'E name "illustrator"',
       
   642                           rqlst.as_string())
       
   643 
       
   644     def test_rewrite3(self):
       
   645         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   646                 'C manifestation O, C role R, R name "illustrator"'}
       
   647         rqlst = rqlhelper.parse(u'Any A,B WHERE E require_permission T, A illustrator_of B')
       
   648         rule_rewrite(rqlst, rules)
       
   649         self.assertEqual('Any A,B WHERE E require_permission T, '
       
   650                          'C is Contribution, C contributor A, C manifestation B, '
       
   651                          'C role D, D name "illustrator"',
       
   652                          rqlst.as_string())
       
   653 
       
   654     def test_rewrite4(self):
       
   655         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   656                 'C manifestation O, C role R, R name "illustrator"'}
       
   657         rqlst = rqlhelper.parse(u'Any A,B WHERE C require_permission R, A illustrator_of B')
       
   658         rule_rewrite(rqlst, rules)
       
   659         self.assertEqual('Any A,B WHERE C require_permission R, '
       
   660                          'D is Contribution, D contributor A, D manifestation B, '
       
   661                          'D role E, E name "illustrator"',
       
   662                          rqlst.as_string())
       
   663 
       
   664     def test_rewrite5(self):
       
   665         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   666                 'C manifestation O, C role R, R name "illustrator"'}
       
   667         rqlst = rqlhelper.parse(u'Any A,B WHERE C require_permission R, A illustrator_of B, '
       
   668                                 'S require_state O')
       
   669         rule_rewrite(rqlst, rules)
       
   670         self.assertEqual('Any A,B WHERE C require_permission R, S require_state O, '
       
   671                          'D is Contribution, D contributor A, D manifestation B, D role E, '
       
   672                          'E name "illustrator"',
       
   673                          rqlst.as_string())
       
   674 
       
   675     # Tests for the with clause
       
   676     def test_rewrite_with(self):
       
   677         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   678                 'C manifestation O, C role R, R name "illustrator"'}
       
   679         rqlst = rqlhelper.parse(u'Any A,B WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
       
   680         rule_rewrite(rqlst, rules)
       
   681         self.assertEqual('Any A,B WITH A,B BEING '
       
   682                          '(Any X,Y WHERE A is Contribution, A contributor X, '
       
   683                          'A manifestation Y, A role B, B name "illustrator")',
       
   684                          rqlst.as_string())
       
   685 
       
   686     def test_rewrite_with2(self):
       
   687         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   688                 'C manifestation O, C role R, R name "illustrator"'}
       
   689         rqlst = rqlhelper.parse(u'Any A,B WHERE T require_permission C WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
       
   690         rule_rewrite(rqlst, rules)
       
   691         self.assertEqual('Any A,B WHERE T require_permission C '
       
   692                          'WITH A,B BEING (Any X,Y WHERE A is Contribution, '
       
   693                          'A contributor X, A manifestation Y, A role B, B name "illustrator")',
       
   694                          rqlst.as_string())
       
   695 
       
   696     def test_rewrite_with3(self):
       
   697         rules = {'participated_in': 'S contributor O'}
       
   698         rqlst = rqlhelper.parse(u'Any A,B WHERE A participated_in B '
       
   699                                 'WITH A, B BEING(Any X,Y WHERE X contributor Y)')
       
   700         rule_rewrite(rqlst, rules)
       
   701         self.assertEqual('Any A,B WHERE A contributor B WITH A,B BEING '
       
   702                          '(Any X,Y WHERE X contributor Y)',
       
   703                          rqlst.as_string())
       
   704 
       
   705     def test_rewrite_with4(self):
       
   706         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   707                 'C manifestation O, C role R, R name "illustrator"'}
       
   708         rqlst = rqlhelper.parse(u'Any A,B WHERE A illustrator_of B '
       
   709                                'WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
       
   710         rule_rewrite(rqlst, rules)
       
   711         self.assertEqual('Any A,B WHERE C is Contribution, '
       
   712                          'C contributor A, C manifestation B, C role D, '
       
   713                          'D name "illustrator" WITH A,B BEING '
       
   714                          '(Any X,Y WHERE A is Contribution, A contributor X, '
       
   715                          'A manifestation Y, A role B, B name "illustrator")',
       
   716                           rqlst.as_string())
       
   717 
       
   718     # Tests for the union
       
   719     def test_rewrite_union(self):
       
   720         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   721                 'C manifestation O, C role R, R name "illustrator"'}
       
   722         rqlst = rqlhelper.parse(u'(Any A,B WHERE A illustrator_of B) UNION'
       
   723                                 '(Any X,Y WHERE X is CWUser, Z manifestation Y)')
       
   724         rule_rewrite(rqlst, rules)
       
   725         self.assertEqual('(Any A,B WHERE C is Contribution, '
       
   726                          'C contributor A, C manifestation B, C role D, '
       
   727                          'D name "illustrator") UNION (Any X,Y WHERE X is CWUser, Z manifestation Y)',
       
   728                          rqlst.as_string())
       
   729 
       
   730     def test_rewrite_union2(self):
       
   731         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   732                 'C manifestation O, C role R, R name "illustrator"'}
       
   733         rqlst = rqlhelper.parse(u'(Any Y WHERE Y match W) UNION '
       
   734                                 '(Any A WHERE A illustrator_of B) UNION '
       
   735                                 '(Any Y WHERE Y is ArtWork)')
       
   736         rule_rewrite(rqlst, rules)
       
   737         self.assertEqual('(Any Y WHERE Y match W) '
       
   738                          'UNION (Any A WHERE C is Contribution, C contributor A, '
       
   739                          'C manifestation B, C role D, D name "illustrator") '
       
   740                          'UNION (Any Y WHERE Y is ArtWork)',
       
   741                          rqlst.as_string())
       
   742 
       
   743     # Tests for the exists clause
       
   744     def test_rewrite_exists(self):
       
   745         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   746                 'C manifestation O, C role R, R name "illustrator"'}
       
   747         rqlst = rqlhelper.parse(u'(Any A,B WHERE A illustrator_of B, '
       
   748                      'EXISTS(B is ArtWork))')
       
   749         rule_rewrite(rqlst, rules)
       
   750         self.assertEqual('Any A,B WHERE EXISTS(B is ArtWork), '
       
   751                          'C is Contribution, C contributor A, C manifestation B, C role D, '
       
   752                          'D name "illustrator"',
       
   753                          rqlst.as_string())
       
   754 
       
   755     def test_rewrite_exists2(self):
       
   756         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   757                 'C manifestation O, C role R, R name "illustrator"'}
       
   758         rqlst = rqlhelper.parse(u'(Any A,B WHERE B contributor A, EXISTS(A illustrator_of W))')
       
   759         rule_rewrite(rqlst, rules)
       
   760         self.assertEqual('Any A,B WHERE B contributor A, '
       
   761                          'EXISTS(C is Contribution, C contributor A, C manifestation W, '
       
   762                          'C role D, D name "illustrator")',
       
   763                          rqlst.as_string())
       
   764 
       
   765     def test_rewrite_exists3(self):
       
   766         rules = {'illustrator_of': 'C is Contribution, C contributor S, '
       
   767                 'C manifestation O, C role R, R name "illustrator"'}
       
   768         rqlst = rqlhelper.parse(u'(Any A,B WHERE A illustrator_of B, EXISTS(A illustrator_of W))')
       
   769         rule_rewrite(rqlst, rules)
       
   770         self.assertEqual('Any A,B WHERE EXISTS(C is Contribution, C contributor A, '
       
   771                          'C manifestation W, C role D, D name "illustrator"), '
       
   772                          'E is Contribution, E contributor A, E manifestation B, E role F, '
       
   773                          'F name "illustrator"',
       
   774                          rqlst.as_string())
       
   775 
       
   776     # Test for GROUPBY
       
   777     def test_rewrite_groupby(self):
       
   778         rules = {'participated_in': 'S contributor O'}
       
   779         rqlst = rqlhelper.parse(u'Any SUM(SA) GROUPBY S WHERE P participated_in S, P manifestation SA')
       
   780         rule_rewrite(rqlst, rules)
       
   781         self.assertEqual('Any SUM(SA) GROUPBY S WHERE P manifestation SA, P contributor S',
       
   782                          rqlst.as_string())
       
   783 
       
   784 
       
   785 class RQLRelationRewriterTC(CubicWebTC):
       
   786 
       
   787     appid = 'data/rewrite'
       
   788 
       
   789     def test_base_rule(self):
       
   790         with self.admin_access.client_cnx() as cnx:
       
   791             art = cnx.create_entity('ArtWork', name=u'Les travailleurs de la Mer')
       
   792             role = cnx.create_entity('Role', name=u'illustrator')
       
   793             vic = cnx.create_entity('Person', name=u'Victor Hugo')
       
   794             contrib = cnx.create_entity('Contribution', code=96, contributor=vic,
       
   795                                         manifestation=art, role=role)
       
   796             rset = cnx.execute('Any X WHERE X illustrator_of S')
       
   797             self.assertEqual([u'Victor Hugo'],
       
   798                              [result.name for result in rset.entities()])
       
   799             rset = cnx.execute('Any S WHERE X illustrator_of S, X eid %(x)s',
       
   800                                {'x': vic.eid})
       
   801             self.assertEqual([u'Les travailleurs de la Mer'],
       
   802                              [result.name for result in rset.entities()])
       
   803 
       
   804 
       
   805 def rule_rewrite(rqlst, kwargs=None):
       
   806     rewriter = _prepare_rewriter(rqlrewrite.RQLRelationRewriter, kwargs)
       
   807     rqlhelper.compute_solutions(rqlst.children[0], {'eid': eid_func_map},
       
   808                                 kwargs=kwargs)
       
   809     rewriter.rewrite(rqlst)
       
   810     for select in rqlst.children:
       
   811         test_vrefs(select)
       
   812     return rewriter.rewritten
       
   813 
       
   814 
       
   815 if __name__ == '__main__':
       
   816     unittest_main()