|
1 from logilab.common.testlib import unittest_main, TestCase |
|
2 from logilab.common.testlib import mock_object |
|
3 |
|
4 from rql import parse, nodes, RQLHelper |
|
5 |
|
6 from cubicweb import Unauthorized |
|
7 from cubicweb.server.rqlrewrite import RQLRewriter |
|
8 from cubicweb.devtools import repotest, TestServerConfiguration |
|
9 |
|
# Module-level fixtures, built once at import time and shared by every test:
# a server configuration rooted at the 'data' directory, the schema loaded
# from it, and an RQL helper bound to that schema.
config = TestServerConfiguration('data')
config.bootstrap_cubes()
schema = config.load_schema()

# Register an extra ``Card in_state State`` relation definition with
# cardinality '1*' (the tests below rely on Card having an in_state relation).
schema.add_relation_def(
    mock_object(subject='Card', name='in_state', object='State',
                cardinality='1*'))

rqlhelper = RQLHelper(
    schema,
    special_relations={'eid': 'uid', 'has_text': 'fti'})
|
17 |
|
def setup_module(*args):
    """Module setup hook: apply the ``repotest`` monkey patches.

    Positional arguments are accepted and ignored.
    """
    repotest.do_monkey_patch()
|
20 |
|
def teardown_module(*args):
    """Module teardown hook: revert the ``repotest`` monkey patches.

    Positional arguments are accepted and ignored.
    """
    repotest.undo_monkey_patch()
|
23 |
|
def eid_func_map(eid):
    """Return the entity type name associated with the fake ``eid``.

    Only eids 1 ('EUser') and 2 ('Card') are known.

    :raise KeyError: if ``eid`` is any other value
    """
    etype_by_eid = {
        1: 'EUser',
        2: 'Card',
    }
    return etype_by_eid[eid]
|
27 |
|
def rewrite(rqlst, snippets_map, kwargs):
    """Run an ``RQLRewriter`` over the first child of ``rqlst``.

    :param rqlst: parsed RQL syntax tree; its first child is rewritten
    :param snippets_map: dict mapping a key to an iterable of RQL expression
      fragments. Each fragment is prefixed with ``'Any X WHERE '`` and
      parsed. The dict is modified in place: every value is replaced by a
      list of mock objects exposing ``snippet_rqlst`` and ``expression``.
    :param kwargs: query arguments dict, handed to the solution computation
      and to the rewriter
    :return: the ``rewritten`` attribute of the rewriter

    After rewriting, ``test_vrefs`` is run on the first child of ``rqlst``
    to check its variable references.
    """
    class FakeQuerier:
        """Minimal stand-in handed to ``RQLRewriter`` as its first argument."""
        schema = schema

        @staticmethod
        def solutions(sqlcursor, mainrqlst, kwargs):
            # NOTE(review): this works on the enclosing function's ``rqlst``,
            # not on the ``mainrqlst`` argument -- confirm this is intended
            rqlhelper.compute_solutions(rqlst, {'eid': eid_func_map},
                                        kwargs=kwargs)

        class _rqlhelper:
            """Forwards to the module-level ``rqlhelper``."""

            @staticmethod
            def annotate(rqlst):
                rqlhelper.annotate(rqlst)

            @staticmethod
            def simplify(mainrqlst, needcopy=False):
                # NOTE(review): as in ``solutions``, the enclosing ``rqlst``
                # is used rather than ``mainrqlst``
                rqlhelper.simplify(rqlst, needcopy)

    # the fake session only exposes a user whose eid is 1
    fake_user = mock_object(eid=1)
    rewriter = RQLRewriter(FakeQuerier, mock_object(user=fake_user))

    # replace each raw expression by an object carrying both its parsed
    # form and its full text
    for varname, expressions in snippets_map.items():
        wrapped = []
        for expression in expressions:
            wrapped.append(
                mock_object(
                    snippet_rqlst=parse('Any X WHERE '+expression).children[0],
                    expression='Any X WHERE '+expression))
        snippets_map[varname] = wrapped

    select = rqlst.children[0]
    rqlhelper.compute_solutions(select, {'eid': eid_func_map}, kwargs=kwargs)
    rewriter.rewrite(select, snippets_map.items(), select.solutions, kwargs)
    # fetch the first child again rather than reusing ``select``, exactly as
    # the check was originally written
    test_vrefs(rqlst.children[0])
    return rewriter.rewritten
|
51 |
|
def test_vrefs(node):
    """Check that the variable reference bookkeeping of ``node`` is consistent.

    For every variable in ``node.defined_vars``, the set stored under
    ``stinfo['references']`` must be non-empty and must hold exactly the
    ``VariableRef`` nodes of that name yielded by ``node.iget_nodes``.

    :param node: syntax tree node exposing ``iget_nodes`` and ``defined_vars``
    :raise AssertionError: if a variable's recorded references differ from
      the ones found in the tree, or if a variable has no reference at all
    """
    # group the variable references found in the tree by variable name
    vrefmap = {}
    for vref in node.iget_nodes(nodes.VariableRef):
        vrefmap.setdefault(vref.name, set()).add(vref)
    # .values() rather than .itervalues(): works on both Python 2 and 3
    for var in node.defined_vars.values():
        references = var.stinfo['references']
        # a variable without any reference in the tree must fail one of the
        # assertions below instead of escaping as a bare KeyError
        found = vrefmap.get(var.name, set())
        assert not (references ^ found), \
            'inconsistent references for variable %s' % var.name
        assert references, 'variable %s has no reference' % var.name
|
59 |
|
class RQLRewriteTC(TestCase):
    """Tests for the RQL rewriter, driven through the ``rewrite`` helper.

    Each test parses a query, rewrites it with one or more RQL expression
    snippets and compares ``rqlst.as_string()`` with the expected query.

    To do:

    * optimization: detect relations used in the rql expressions which are
      already present in the original query, to reuse them when possible

    * "has_<ACTION>_permission" ?
    """

    # RQL expression shared by the tests which rewrite a Card variable
    CARD_CONSTRAINT = ('X in_state S, U in_group G, P require_state S,'
                       'P name "read", P require_group G')
    # RQL expression shared by the "unsupported constraint" tests
    TRINFO_CONSTRAINT = ('X wf_info_for Y, Y require_permission P, '
                         'P name "read"')

    def test_base_var(self):
        rqlst = parse('Card C')
        rewrite(rqlst, {'C': (self.CARD_CONSTRAINT,)}, {})
        self.assertEqual(rqlst.as_string(),
                         u"Any C WHERE C is Card, B eid %(D)s, "
                         "EXISTS(C in_state A, B in_group E, F require_state A, "
                         "F name 'read', F require_group E, A is State, E is EGroup, F is EPermission)")

    def test_multiple_var(self):
        affaire_constraints = ('X ref LIKE "PUBLIC%"', 'U in_group G, G name "public"')
        kwargs = {'u': 2}
        rqlst = parse('Any S WHERE S documented_by C, C eid %(u)s')
        rewrite(rqlst, {'C': (self.CARD_CONSTRAINT,), 'S': affaire_constraints},
                kwargs)
        self.assertTextEquals(rqlst.as_string(),
                              "Any S WHERE S documented_by C, C eid %(u)s, B eid %(D)s, "
                              "EXISTS(C in_state A, B in_group E, F require_state A, "
                              "F name 'read', F require_group E, A is State, E is EGroup, F is EPermission), "
                              "(EXISTS(S ref LIKE 'PUBLIC%')) OR (EXISTS(B in_group G, G name 'public', G is EGroup)), "
                              "S is Affaire")
        # the 'D' argument referenced by the rewritten query must have been
        # added to the query arguments
        self.assertTrue('D' in kwargs)

    def test_or(self):
        constraint = '(X identity U) OR (X in_state ST, CL identity U, CL in_state ST, ST name "subscribed")'
        rqlst = parse('Any S WHERE S owned_by C, C eid %(u)s')
        rewrite(rqlst, {'C': (constraint,)}, {'u': 1})
        self.assertEqual(rqlst.as_string(),
                         "Any S WHERE S owned_by C, C eid %(u)s, A eid %(B)s, "
                         "EXISTS((C identity A) OR (C in_state D, E identity A, "
                         "E in_state D, D name 'subscribed'), D is State, E is EUser), "
                         "S is IN(Affaire, Basket, Bookmark, Card, Comment, Division, EConstraint, EConstraintType, EEType, EFRDef, EGroup, ENFRDef, EPermission, EProperty, ERType, EUser, Email, EmailAddress, EmailPart, EmailThread, File, Folder, Image, Note, Personne, RQLExpression, Societe, State, SubDivision, Tag, TrInfo, Transition)")

    def test_simplified_rqlst(self):
        rqlst = parse('Any 2') # this is the simplified rql st for Any X WHERE X eid 12
        rewrite(rqlst, {'2': (self.CARD_CONSTRAINT,)}, {})
        self.assertEqual(rqlst.as_string(),
                         u"Any 2 WHERE B eid %(C)s, "
                         "EXISTS(2 in_state A, B in_group D, E require_state A, "
                         "E name 'read', E require_group D, A is State, D is EGroup, E is EPermission)")

    def test_optional_var(self):
        rqlst = parse('Any A,C WHERE A documented_by C?')
        rewrite(rqlst, {'C': (self.CARD_CONSTRAINT,)}, {})
        self.assertEqual(rqlst.as_string(),
                         "Any A,C WHERE A documented_by C?, A is Affaire "
                         "WITH C BEING "
                         "(Any C WHERE C in_state B, D in_group F, G require_state B, G name 'read', "
                         "G require_group F, D eid %(A)s, C is Card)")
        rqlst = parse('Any A,C,T WHERE A documented_by C?, C title T')
        rewrite(rqlst, {'C': (self.CARD_CONSTRAINT,)}, {})
        self.assertEqual(rqlst.as_string(),
                         "Any A,C,T WHERE A documented_by C?, A is Affaire "
                         "WITH C,T BEING "
                         "(Any C,T WHERE C in_state B, D in_group F, G require_state B, G name 'read', "
                         "G require_group F, C title T, D eid %(A)s, C is Card)")

    def test_relation_optimization(self):
        # since Card in_state State has monovalued cardinality, the in_state
        # relation used in the rql expression can be ignored and S replaced by
        # the variable from the incoming query
        rqlst = parse('Card C WHERE C in_state STATE')
        rewrite(rqlst, {'C': (self.CARD_CONSTRAINT,)}, {})
        self.assertEqual(rqlst.as_string(),
                         u"Any C WHERE C in_state STATE, C is Card, A eid %(B)s, "
                         "EXISTS(A in_group D, E require_state STATE, "
                         "E name 'read', E require_group D, D is EGroup, E is EPermission), "
                         "STATE is State")

    def test_unsupported_constraint_1(self):
        # EUser doesn't have require_permission
        rqlst = parse('Any U,T WHERE U is EUser, T wf_info_for U')
        self.assertRaises(Unauthorized, rewrite, rqlst,
                          {'T': (self.TRINFO_CONSTRAINT,)}, {})

    def test_unsupported_constraint_2(self):
        rqlst = parse('Any U,T WHERE U is EUser, T wf_info_for U')
        rewrite(rqlst, {'T': (self.TRINFO_CONSTRAINT, 'X wf_info_for Y, Y in_group G, G name "managers"')}, {})
        self.assertEqual(rqlst.as_string(),
                         u"Any U,T WHERE U is EUser, T wf_info_for U, "
                         "EXISTS(U in_group B, B name 'managers', B is EGroup), T is TrInfo")

    def test_unsupported_constraint_3(self):
        self.skip('raise unauthorized for now')
        rqlst = parse('Any T WHERE T wf_info_for X')
        rewrite(rqlst, {'T': (self.TRINFO_CONSTRAINT, 'X in_group G, G name "managers"')}, {})
        self.assertEqual(rqlst.as_string(),
                         u'XXX dunno what should be generated')

    def test_add_ambiguity_exists(self):
        constraint = ('X concerne Y')
        rqlst = parse('Affaire X')
        rewrite(rqlst, {'X': (constraint,)}, {})
        self.assertEqual(rqlst.as_string(),
                         u"Any X WHERE X is Affaire, (((EXISTS(X concerne A, A is Division)) OR (EXISTS(X concerne D, D is SubDivision))) OR (EXISTS(X concerne C, C is Societe))) OR (EXISTS(X concerne B, B is Note))")

    def test_add_ambiguity_outerjoin(self):
        constraint = ('X concerne Y')
        rqlst = parse('Any X,C WHERE X? documented_by C')
        rewrite(rqlst, {'X': (constraint,)}, {})
        # ambiguities are kept in the sub-query, no need to resolve them using OR
        self.assertEqual(rqlst.as_string(),
                         u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE X concerne A, X is Affaire)")
|
183 |
|
184 |
|
185 |
|
# Run the tests of this module when it is executed as a script.
if __name__ == "__main__":
    unittest_main()