server/test/unittest_rqlrewrite.py
changeset 0 b97547f5f1fa
child 47 54087a269bdd
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 from logilab.common.testlib import unittest_main, TestCase
       
     2 from logilab.common.testlib import mock_object
       
     3 
       
     4 from rql import parse, nodes, RQLHelper
       
     5 
       
     6 from cubicweb import Unauthorized
       
     7 from cubicweb.server.rqlrewrite import RQLRewriter
       
     8 from cubicweb.devtools import repotest, TestServerConfiguration
       
     9 
       
    10 config = TestServerConfiguration('data')
       
    11 config.bootstrap_cubes()
       
    12 schema = config.load_schema()
       
    13 schema.add_relation_def(mock_object(subject='Card', name='in_state', object='State', cardinality='1*'))
       
    14                         
       
    15 rqlhelper = RQLHelper(schema, special_relations={'eid': 'uid',
       
    16                                                  'has_text': 'fti'})
       
    17 
       
    18 def setup_module(*args):
       
    19     repotest.do_monkey_patch()
       
    20 
       
    21 def teardown_module(*args):
       
    22     repotest.undo_monkey_patch()
       
    23     
       
    24 def eid_func_map(eid):
       
    25     return {1: 'EUser',
       
    26             2: 'Card'}[eid]
       
    27 
       
    28 def rewrite(rqlst, snippets_map, kwargs):
       
    29     class FakeQuerier:
       
    30         schema = schema
       
    31         @staticmethod
       
    32         def solutions(sqlcursor, mainrqlst, kwargs):
       
    33             rqlhelper.compute_solutions(rqlst, {'eid': eid_func_map}, kwargs=kwargs)
       
    34         class _rqlhelper:
       
    35             @staticmethod
       
    36             def annotate(rqlst):
       
    37                 rqlhelper.annotate(rqlst)
       
    38             @staticmethod
       
    39             def simplify(mainrqlst, needcopy=False):
       
    40                 rqlhelper.simplify(rqlst, needcopy)
       
    41     rewriter = RQLRewriter(FakeQuerier, mock_object(user=(mock_object(eid=1))))
       
    42     for v, snippets in snippets_map.items():
       
    43         snippets_map[v] = [mock_object(snippet_rqlst=parse('Any X WHERE '+snippet).children[0],
       
    44                                        expression='Any X WHERE '+snippet) 
       
    45                            for snippet in snippets]
       
    46     rqlhelper.compute_solutions(rqlst.children[0], {'eid': eid_func_map}, kwargs=kwargs)
       
    47     solutions = rqlst.children[0].solutions
       
    48     rewriter.rewrite(rqlst.children[0], snippets_map.items(), solutions, kwargs)
       
    49     test_vrefs(rqlst.children[0])
       
    50     return rewriter.rewritten
       
    51 
       
    52 def test_vrefs(node):
       
    53     vrefmap = {}
       
    54     for vref in node.iget_nodes(nodes.VariableRef):
       
    55         vrefmap.setdefault(vref.name, set()).add(vref)
       
    56     for var in node.defined_vars.itervalues():
       
    57         assert not (var.stinfo['references'] ^ vrefmap[var.name])
       
    58         assert (var.stinfo['references'])
       
    59 
       
    60 class RQLRewriteTC(TestCase):
       
    61     """a faire:
       
    62 
       
    63     * optimisation: detecter les relations utilisees dans les rqlexpressions qui
       
    64       sont presentes dans la requete de depart pour les reutiliser si possible
       
    65       
       
    66     * "has_<ACTION>_permission" ?
       
    67     """
       
    68     
       
    69     def test_base_var(self):
       
    70         card_constraint = ('X in_state S, U in_group G, P require_state S,'
       
    71                            'P name "read", P require_group G')
       
    72         rqlst = parse('Card C')
       
    73         rewrite(rqlst, {'C': (card_constraint,)}, {})
       
    74         self.failUnlessEqual(rqlst.as_string(),
       
    75                              u"Any C WHERE C is Card, B eid %(D)s, "
       
    76                              "EXISTS(C in_state A, B in_group E, F require_state A, "
       
    77                              "F name 'read', F require_group E, A is State, E is EGroup, F is EPermission)")
       
    78         
       
    79     def test_multiple_var(self):
       
    80         card_constraint = ('X in_state S, U in_group G, P require_state S,'
       
    81                            'P name "read", P require_group G')
       
    82         affaire_constraints = ('X ref LIKE "PUBLIC%"', 'U in_group G, G name "public"')
       
    83         kwargs = {'u':2}
       
    84         rqlst = parse('Any S WHERE S documented_by C, C eid %(u)s')
       
    85         rewrite(rqlst, {'C': (card_constraint,), 'S': affaire_constraints},
       
    86                 kwargs)
       
    87         self.assertTextEquals(rqlst.as_string(),
       
    88                              "Any S WHERE S documented_by C, C eid %(u)s, B eid %(D)s, "
       
    89                              "EXISTS(C in_state A, B in_group E, F require_state A, "
       
    90                              "F name 'read', F require_group E, A is State, E is EGroup, F is EPermission), "
       
    91                              "(EXISTS(S ref LIKE 'PUBLIC%')) OR (EXISTS(B in_group G, G name 'public', G is EGroup)), "
       
    92                              "S is Affaire")
       
    93         self.failUnless('D' in kwargs)
       
    94         
       
    95     def test_or(self):
       
    96         constraint = '(X identity U) OR (X in_state ST, CL identity U, CL in_state ST, ST name "subscribed")'
       
    97         rqlst = parse('Any S WHERE S owned_by C, C eid %(u)s')
       
    98         rewrite(rqlst, {'C': (constraint,)}, {'u':1})
       
    99         self.failUnlessEqual(rqlst.as_string(),
       
   100                              "Any S WHERE S owned_by C, C eid %(u)s, A eid %(B)s, "
       
   101                              "EXISTS((C identity A) OR (C in_state D, E identity A, "
       
   102                              "E in_state D, D name 'subscribed'), D is State, E is EUser), "
       
   103                              "S is IN(Affaire, Basket, Bookmark, Card, Comment, Division, EConstraint, EConstraintType, EEType, EFRDef, EGroup, ENFRDef, EPermission, EProperty, ERType, EUser, Email, EmailAddress, EmailPart, EmailThread, File, Folder, Image, Note, Personne, RQLExpression, Societe, State, SubDivision, Tag, TrInfo, Transition)")
       
   104 
       
   105     def test_simplified_rqlst(self):
       
   106         card_constraint = ('X in_state S, U in_group G, P require_state S,'
       
   107                            'P name "read", P require_group G')
       
   108         rqlst = parse('Any 2') # this is the simplified rql st for Any X WHERE X eid 12
       
   109         rewrite(rqlst, {'2': (card_constraint,)}, {})
       
   110         self.failUnlessEqual(rqlst.as_string(),
       
   111                              u"Any 2 WHERE B eid %(C)s, "
       
   112                              "EXISTS(2 in_state A, B in_group D, E require_state A, "
       
   113                              "E name 'read', E require_group D, A is State, D is EGroup, E is EPermission)")
       
   114 
       
   115     def test_optional_var(self):
       
   116         card_constraint = ('X in_state S, U in_group G, P require_state S,'
       
   117                            'P name "read", P require_group G')
       
   118         rqlst = parse('Any A,C WHERE A documented_by C?')
       
   119         rewrite(rqlst, {'C': (card_constraint,)}, {})
       
   120         self.failUnlessEqual(rqlst.as_string(),
       
   121                              "Any A,C WHERE A documented_by C?, A is Affaire "
       
   122                              "WITH C BEING "
       
   123                              "(Any C WHERE C in_state B, D in_group F, G require_state B, G name 'read', "
       
   124                              "G require_group F, D eid %(A)s, C is Card)")
       
   125         rqlst = parse('Any A,C,T WHERE A documented_by C?, C title T')
       
   126         rewrite(rqlst, {'C': (card_constraint,)}, {})
       
   127         self.failUnlessEqual(rqlst.as_string(),
       
   128                              "Any A,C,T WHERE A documented_by C?, A is Affaire "
       
   129                              "WITH C,T BEING "
       
   130                              "(Any C,T WHERE C in_state B, D in_group F, G require_state B, G name 'read', "
       
   131                              "G require_group F, C title T, D eid %(A)s, C is Card)")
       
   132         
       
   133     def test_relation_optimization(self):
       
   134         # since Card in_state State as monovalued cardinality, the in_state
       
   135         # relation used in the rql expression can be ignored and S replaced by
       
   136         # the variable from the incoming query
       
   137         card_constraint = ('X in_state S, U in_group G, P require_state S,'
       
   138                            'P name "read", P require_group G')
       
   139         rqlst = parse('Card C WHERE C in_state STATE')
       
   140         rewrite(rqlst, {'C': (card_constraint,)}, {})
       
   141         self.failUnlessEqual(rqlst.as_string(),
       
   142                              u"Any C WHERE C in_state STATE, C is Card, A eid %(B)s, "
       
   143                              "EXISTS(A in_group D, E require_state STATE, "
       
   144                              "E name 'read', E require_group D, D is EGroup, E is EPermission), "
       
   145                              "STATE is State")
       
   146 
       
   147     def test_unsupported_constraint_1(self):
       
   148         # EUser doesn't have require_permission
       
   149         trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"')
       
   150         rqlst = parse('Any U,T WHERE U is EUser, T wf_info_for U')
       
   151         self.assertRaises(Unauthorized, rewrite, rqlst, {'T': (trinfo_constraint,)}, {})
       
   152         
       
   153     def test_unsupported_constraint_2(self):
       
   154         trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"')
       
   155         rqlst = parse('Any U,T WHERE U is EUser, T wf_info_for U')
       
   156         rewrite(rqlst, {'T': (trinfo_constraint, 'X wf_info_for Y, Y in_group G, G name "managers"')}, {})
       
   157         self.failUnlessEqual(rqlst.as_string(),
       
   158                              u"Any U,T WHERE U is EUser, T wf_info_for U, "
       
   159                              "EXISTS(U in_group B, B name 'managers', B is EGroup), T is TrInfo")
       
   160 
       
   161     def test_unsupported_constraint_3(self):
       
   162         self.skip('raise unauthorized for now')
       
   163         trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"')
       
   164         rqlst = parse('Any T WHERE T wf_info_for X')
       
   165         rewrite(rqlst, {'T': (trinfo_constraint, 'X in_group G, G name "managers"')}, {})
       
   166         self.failUnlessEqual(rqlst.as_string(),
       
   167                              u'XXX dunno what should be generated')
       
   168         
       
   169     def test_add_ambiguity_exists(self):
       
   170         constraint = ('X concerne Y')
       
   171         rqlst = parse('Affaire X')
       
   172         rewrite(rqlst, {'X': (constraint,)}, {})
       
   173         self.failUnlessEqual(rqlst.as_string(),
       
   174                              u"Any X WHERE X is Affaire, (((EXISTS(X concerne A, A is Division)) OR (EXISTS(X concerne D, D is SubDivision))) OR (EXISTS(X concerne C, C is Societe))) OR (EXISTS(X concerne B, B is Note))")
       
   175         
       
   176     def test_add_ambiguity_outerjoin(self):
       
   177         constraint = ('X concerne Y')
       
   178         rqlst = parse('Any X,C WHERE X? documented_by C')
       
   179         rewrite(rqlst, {'X': (constraint,)}, {})
       
   180         # ambiguity are kept in the sub-query, no need to be resolved using OR
       
   181         self.failUnlessEqual(rqlst.as_string(),
       
   182                              u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE X concerne A, X is Affaire)") 
       
   183        
       
   184         
       
   185         
       
   186 if __name__ == '__main__':
       
   187     unittest_main()