test/unittest_rqlrewrite.py
branchstable
changeset 3826 0c0c051863cb
parent 3443 34e451da9b5d
child 3877 7ca53fc72a0a
child 4212 ab6573088b4a
equal deleted inserted replaced
3825:b54c8d664dd6 3826:0c0c051863cb
     9 from logilab.common.testlib import mock_object
     9 from logilab.common.testlib import mock_object
    10 
    10 
    11 from rql import parse, nodes, RQLHelper
    11 from rql import parse, nodes, RQLHelper
    12 
    12 
    13 from cubicweb import Unauthorized
    13 from cubicweb import Unauthorized
       
    14 from cubicweb.schema import RRQLExpression
    14 from cubicweb.rqlrewrite import RQLRewriter
    15 from cubicweb.rqlrewrite import RQLRewriter
    15 from cubicweb.devtools import repotest, TestServerConfiguration
    16 from cubicweb.devtools import repotest, TestServerConfiguration
    16 
    17 
    17 config = TestServerConfiguration('data/rewrite')
    18 config = TestServerConfiguration('data/rewrite')
    18 config.bootstrap_cubes()
    19 config.bootstrap_cubes()
    30 
    31 
    31 def eid_func_map(eid):
    32 def eid_func_map(eid):
    32     return {1: 'CWUser',
    33     return {1: 'CWUser',
    33             2: 'Card'}[eid]
    34             2: 'Card'}[eid]
    34 
    35 
    35 def rewrite(rqlst, snippets_map, kwargs):
    36 def rewrite(rqlst, snippets_map, kwargs, existingvars=None):
    36     class FakeVReg:
    37     class FakeVReg:
    37         schema = schema
    38         schema = schema
    38         @staticmethod
    39         @staticmethod
    39         def solutions(sqlcursor, mainrqlst, kwargs):
    40         def solutions(sqlcursor, mainrqlst, kwargs):
    40             rqlhelper.compute_solutions(rqlst, {'eid': eid_func_map}, kwargs=kwargs)
    41             rqlhelper.compute_solutions(rqlst, {'eid': eid_func_map}, kwargs=kwargs)
    45             @staticmethod
    46             @staticmethod
    46             def simplify(mainrqlst, needcopy=False):
    47             def simplify(mainrqlst, needcopy=False):
    47                 rqlhelper.simplify(rqlst, needcopy)
    48                 rqlhelper.simplify(rqlst, needcopy)
    48     rewriter = RQLRewriter(mock_object(vreg=FakeVReg, user=(mock_object(eid=1))))
    49     rewriter = RQLRewriter(mock_object(vreg=FakeVReg, user=(mock_object(eid=1))))
    49     for v, snippets in snippets_map.items():
    50     for v, snippets in snippets_map.items():
    50         snippets_map[v] = [mock_object(snippet_rqlst=parse('Any X WHERE '+snippet).children[0],
    51         snippets_map[v] = [isinstance(snippet, basestring)
    51                                        expression='Any X WHERE '+snippet)
    52                            and mock_object(snippet_rqlst=parse('Any X WHERE '+snippet).children[0],
       
    53                                            expression='Any X WHERE '+snippet)
       
    54                            or snippet
    52                            for snippet in snippets]
    55                            for snippet in snippets]
    53     rqlhelper.compute_solutions(rqlst.children[0], {'eid': eid_func_map}, kwargs=kwargs)
    56     rqlhelper.compute_solutions(rqlst.children[0], {'eid': eid_func_map}, kwargs=kwargs)
    54     solutions = rqlst.children[0].solutions
    57     solutions = rqlst.children[0].solutions
    55     rewriter.rewrite(rqlst.children[0], snippets_map.items(), solutions, kwargs)
    58     rewriter.rewrite(rqlst.children[0], snippets_map.items(), solutions, kwargs,
       
    59                      existingvars)
    56     test_vrefs(rqlst.children[0])
    60     test_vrefs(rqlst.children[0])
    57     return rewriter.rewritten
    61     return rewriter.rewritten
    58 
    62 
    59 def test_vrefs(node):
    63 def test_vrefs(node):
    60     vrefmap = {}
    64     vrefmap = {}
   239         # ambiguity are kept in the sub-query, no need to be resolved using OR
   243         # ambiguity are kept in the sub-query, no need to be resolved using OR
   240         self.failUnlessEqual(rqlst.as_string(),
   244         self.failUnlessEqual(rqlst.as_string(),
   241                              u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE X concerne A, X is Affaire)")
   245                              u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE X concerne A, X is Affaire)")
   242 
   246 
   243 
   247 
       
   248     def test_rrqlexpr_nonexistant_subject_1(self):
       
   249         constraint = RRQLExpression('S owned_by U')
       
   250         rqlst = parse('Card C')
       
   251         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
       
   252         self.failUnlessEqual(rqlst.as_string(),
       
   253                              u"Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A)")
       
   254         rqlst = parse('Card C')
       
   255         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'OU')
       
   256         self.failUnlessEqual(rqlst.as_string(),
       
   257                              u"Any C WHERE C is Card")
       
   258         rqlst = parse('Card C')
       
   259         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SOU')
       
   260         self.failUnlessEqual(rqlst.as_string(),
       
   261                              u"Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A)")
       
   262 
       
   263     def test_rrqlexpr_nonexistant_subject_2(self):
       
   264         constraint = RRQLExpression('S owned_by U, O owned_by U, O is Card')
       
   265         rqlst = parse('Card C')
       
   266         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
       
   267         self.failUnlessEqual(rqlst.as_string(),
       
   268                              'Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A)')
       
   269         rqlst = parse('Card C')
       
   270         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'OU')
       
   271         self.failUnlessEqual(rqlst.as_string(),
       
   272                              'Any C WHERE C is Card, B eid %(D)s, EXISTS(A owned_by B, A is Card)')
       
   273         rqlst = parse('Card C')
       
   274         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SOU')
       
   275         self.failUnlessEqual(rqlst.as_string(),
       
   276                              'Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A, D owned_by A, D is Card)')
       
   277 
       
   278     def test_rrqlexpr_nonexistant_subject_3(self):
       
   279         constraint = RRQLExpression('U in_group G, G name "users"')
       
   280         rqlst = parse('Card C')
       
   281         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
       
   282         self.failUnlessEqual(rqlst.as_string(),
       
   283                              u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", D is CWGroup)')
       
   284 
       
   285     def test_rrqlexpr_nonexistant_subject_4(self):
       
   286         constraint = RRQLExpression('U in_group G, G name "users", S owned_by U')
       
   287         rqlst = parse('Card C')
       
   288         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
       
   289         self.failUnlessEqual(rqlst.as_string(),
       
   290                              u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", C owned_by A, D is CWGroup)')
       
   291         rqlst = parse('Card C')
       
   292         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'OU')
       
   293         self.failUnlessEqual(rqlst.as_string(),
       
   294                              u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", D is CWGroup)')
       
   295 
       
   296     def test_rrqlexpr_nonexistant_subject_5(self):
       
   297         constraint = RRQLExpression('S owned_by Z, O owned_by Z, O is Card')
       
   298         rqlst = parse('Card C')
       
   299         rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'S')
       
   300         self.failUnlessEqual(rqlst.as_string(),
       
   301                              u"Any C WHERE C is Card, EXISTS(C owned_by A, A is CWUser)")
       
   302 
   244 
   303 
   245 if __name__ == '__main__':
   304 if __name__ == '__main__':
   246     unittest_main()
   305     unittest_main()