|
1 """ |
|
2 |
|
3 :organization: Logilab |
|
4 :copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2. |
|
5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
6 :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses |
|
7 """ |
|
8 from logilab.common.testlib import unittest_main, TestCase |
|
9 from logilab.common.testlib import mock_object |
|
10 |
|
11 from rql import parse, nodes, RQLHelper |
|
12 |
|
13 from cubicweb import Unauthorized |
|
14 from cubicweb.rqlrewrite import RQLRewriter |
|
15 from cubicweb.devtools import repotest, TestServerConfiguration |
|
16 |
|
17 config = TestServerConfiguration('data/rewrite') |
|
18 config.bootstrap_cubes() |
|
19 schema = config.load_schema() |
|
20 schema.add_relation_def(mock_object(subject='Card', name='in_state', object='State', cardinality='1*')) |
|
21 |
|
22 rqlhelper = RQLHelper(schema, special_relations={'eid': 'uid', |
|
23 'has_text': 'fti'}) |
|
24 |
|
25 def setup_module(*args): |
|
26 repotest.do_monkey_patch() |
|
27 |
|
28 def teardown_module(*args): |
|
29 repotest.undo_monkey_patch() |
|
30 |
|
31 def eid_func_map(eid): |
|
32 return {1: 'CWUser', |
|
33 2: 'Card'}[eid] |
|
34 |
|
35 def rewrite(rqlst, snippets_map, kwargs): |
|
36 class FakeVReg: |
|
37 schema = schema |
|
38 @staticmethod |
|
39 def solutions(sqlcursor, mainrqlst, kwargs): |
|
40 rqlhelper.compute_solutions(rqlst, {'eid': eid_func_map}, kwargs=kwargs) |
|
41 class rqlhelper: |
|
42 @staticmethod |
|
43 def annotate(rqlst): |
|
44 rqlhelper.annotate(rqlst) |
|
45 @staticmethod |
|
46 def simplify(mainrqlst, needcopy=False): |
|
47 rqlhelper.simplify(rqlst, needcopy) |
|
48 rewriter = RQLRewriter(mock_object(vreg=FakeVReg, user=(mock_object(eid=1)))) |
|
49 for v, snippets in snippets_map.items(): |
|
50 snippets_map[v] = [mock_object(snippet_rqlst=parse('Any X WHERE '+snippet).children[0], |
|
51 expression='Any X WHERE '+snippet) |
|
52 for snippet in snippets] |
|
53 rqlhelper.compute_solutions(rqlst.children[0], {'eid': eid_func_map}, kwargs=kwargs) |
|
54 solutions = rqlst.children[0].solutions |
|
55 rewriter.rewrite(rqlst.children[0], snippets_map.items(), solutions, kwargs) |
|
56 test_vrefs(rqlst.children[0]) |
|
57 return rewriter.rewritten |
|
58 |
|
59 def test_vrefs(node): |
|
60 vrefmap = {} |
|
61 for vref in node.iget_nodes(nodes.VariableRef): |
|
62 vrefmap.setdefault(vref.name, set()).add(vref) |
|
63 for var in node.defined_vars.itervalues(): |
|
64 assert not (var.stinfo['references'] ^ vrefmap[var.name]) |
|
65 assert (var.stinfo['references']) |
|
66 |
|
67 class RQLRewriteTC(TestCase): |
|
68 """a faire: |
|
69 |
|
70 * optimisation: detecter les relations utilisees dans les rqlexpressions qui |
|
71 sont presentes dans la requete de depart pour les reutiliser si possible |
|
72 |
|
73 * "has_<ACTION>_permission" ? |
|
74 """ |
|
75 |
|
76 def test_base_var(self): |
|
77 card_constraint = ('X in_state S, U in_group G, P require_state S,' |
|
78 'P name "read", P require_group G') |
|
79 rqlst = parse('Card C') |
|
80 rewrite(rqlst, {('C', 'X'): (card_constraint,)}, {}) |
|
81 self.failUnlessEqual(rqlst.as_string(), |
|
82 u"Any C WHERE C is Card, B eid %(D)s, " |
|
83 "EXISTS(C in_state A, B in_group E, F require_state A, " |
|
84 "F name 'read', F require_group E, A is State, E is CWGroup, F is CWPermission)") |
|
85 |
|
86 def test_multiple_var(self): |
|
87 card_constraint = ('X in_state S, U in_group G, P require_state S,' |
|
88 'P name "read", P require_group G') |
|
89 affaire_constraints = ('X ref LIKE "PUBLIC%"', 'U in_group G, G name "public"') |
|
90 kwargs = {'u':2} |
|
91 rqlst = parse('Any S WHERE S documented_by C, C eid %(u)s') |
|
92 rewrite(rqlst, {('C', 'X'): (card_constraint,), ('S', 'X'): affaire_constraints}, |
|
93 kwargs) |
|
94 self.assertTextEquals(rqlst.as_string(), |
|
95 "Any S WHERE S documented_by C, C eid %(u)s, B eid %(D)s, " |
|
96 "EXISTS(C in_state A, B in_group E, F require_state A, " |
|
97 "F name 'read', F require_group E, A is State, E is CWGroup, F is CWPermission), " |
|
98 "(EXISTS(S ref LIKE 'PUBLIC%')) OR (EXISTS(B in_group G, G name 'public', G is CWGroup)), " |
|
99 "S is Affaire") |
|
100 self.failUnless('D' in kwargs) |
|
101 |
|
102 def test_or(self): |
|
103 constraint = '(X identity U) OR (X in_state ST, CL identity U, CL in_state ST, ST name "subscribed")' |
|
104 rqlst = parse('Any S WHERE S owned_by C, C eid %(u)s, S is in (CWUser, CWGroup)') |
|
105 rewrite(rqlst, {('C', 'X'): (constraint,)}, {'u':1}) |
|
106 self.failUnlessEqual(rqlst.as_string(), |
|
107 "Any S WHERE S owned_by C, C eid %(u)s, S is IN(CWUser, CWGroup), A eid %(B)s, " |
|
108 "EXISTS((C identity A) OR (C in_state D, E identity A, " |
|
109 "E in_state D, D name 'subscribed'), D is State, E is CWUser)") |
|
110 |
|
111 def test_simplified_rqlst(self): |
|
112 card_constraint = ('X in_state S, U in_group G, P require_state S,' |
|
113 'P name "read", P require_group G') |
|
114 rqlst = parse('Any 2') # this is the simplified rql st for Any X WHERE X eid 12 |
|
115 rewrite(rqlst, {('2', 'X'): (card_constraint,)}, {}) |
|
116 self.failUnlessEqual(rqlst.as_string(), |
|
117 u"Any 2 WHERE B eid %(C)s, " |
|
118 "EXISTS(2 in_state A, B in_group D, E require_state A, " |
|
119 "E name 'read', E require_group D, A is State, D is CWGroup, E is CWPermission)") |
|
120 |
|
121 def test_optional_var(self): |
|
122 card_constraint = ('X in_state S, U in_group G, P require_state S,' |
|
123 'P name "read", P require_group G') |
|
124 rqlst = parse('Any A,C WHERE A documented_by C?') |
|
125 rewrite(rqlst, {('C', 'X'): (card_constraint,)}, {}) |
|
126 self.failUnlessEqual(rqlst.as_string(), |
|
127 "Any A,C WHERE A documented_by C?, A is Affaire " |
|
128 "WITH C BEING " |
|
129 "(Any C WHERE C in_state B, D in_group F, G require_state B, G name 'read', " |
|
130 "G require_group F, D eid %(A)s, C is Card)") |
|
131 rqlst = parse('Any A,C,T WHERE A documented_by C?, C title T') |
|
132 rewrite(rqlst, {('C', 'X'): (card_constraint,)}, {}) |
|
133 self.failUnlessEqual(rqlst.as_string(), |
|
134 "Any A,C,T WHERE A documented_by C?, A is Affaire " |
|
135 "WITH C,T BEING " |
|
136 "(Any C,T WHERE C in_state B, D in_group F, G require_state B, G name 'read', " |
|
137 "G require_group F, C title T, D eid %(A)s, C is Card)") |
|
138 |
|
139 def test_relation_optimization(self): |
|
140 # since Card in_state State as monovalued cardinality, the in_state |
|
141 # relation used in the rql expression can be ignored and S replaced by |
|
142 # the variable from the incoming query |
|
143 card_constraint = ('X in_state S, U in_group G, P require_state S,' |
|
144 'P name "read", P require_group G') |
|
145 rqlst = parse('Card C WHERE C in_state STATE') |
|
146 rewrite(rqlst, {('C', 'X'): (card_constraint,)}, {}) |
|
147 self.failUnlessEqual(rqlst.as_string(), |
|
148 u"Any C WHERE C in_state STATE, C is Card, A eid %(B)s, " |
|
149 "EXISTS(A in_group D, E require_state STATE, " |
|
150 "E name 'read', E require_group D, D is CWGroup, E is CWPermission), " |
|
151 "STATE is State") |
|
152 |
|
153 def test_unsupported_constraint_1(self): |
|
154 # CWUser doesn't have require_permission |
|
155 trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"') |
|
156 rqlst = parse('Any U,T WHERE U is CWUser, T wf_info_for U') |
|
157 self.assertRaises(Unauthorized, rewrite, rqlst, {('T', 'X'): (trinfo_constraint,)}, {}) |
|
158 |
|
159 def test_unsupported_constraint_2(self): |
|
160 trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"') |
|
161 rqlst = parse('Any U,T WHERE U is CWUser, T wf_info_for U') |
|
162 rewrite(rqlst, {('T', 'X'): (trinfo_constraint, 'X wf_info_for Y, Y in_group G, G name "managers"')}, {}) |
|
163 self.failUnlessEqual(rqlst.as_string(), |
|
164 u"Any U,T WHERE U is CWUser, T wf_info_for U, " |
|
165 "EXISTS(U in_group B, B name 'managers', B is CWGroup), T is TrInfo") |
|
166 |
|
167 def test_unsupported_constraint_3(self): |
|
168 self.skip('raise unauthorized for now') |
|
169 trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"') |
|
170 rqlst = parse('Any T WHERE T wf_info_for X') |
|
171 rewrite(rqlst, {('T', 'X'): (trinfo_constraint, 'X in_group G, G name "managers"')}, {}) |
|
172 self.failUnlessEqual(rqlst.as_string(), |
|
173 u'XXX dunno what should be generated') |
|
174 |
|
175 def test_add_ambiguity_exists(self): |
|
176 constraint = ('X concerne Y') |
|
177 rqlst = parse('Affaire X') |
|
178 rewrite(rqlst, {('X', 'X'): (constraint,)}, {}) |
|
179 self.failUnlessEqual(rqlst.as_string(), |
|
180 u"Any X WHERE X is Affaire, ((EXISTS(X concerne A, A is Division)) OR (EXISTS(X concerne C, C is Societe))) OR (EXISTS(X concerne B, B is Note))") |
|
181 |
|
182 def test_add_ambiguity_outerjoin(self): |
|
183 constraint = ('X concerne Y') |
|
184 rqlst = parse('Any X,C WHERE X? documented_by C') |
|
185 rewrite(rqlst, {('X', 'X'): (constraint,)}, {}) |
|
186 # ambiguity are kept in the sub-query, no need to be resolved using OR |
|
187 self.failUnlessEqual(rqlst.as_string(), |
|
188 u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE X concerne A, X is Affaire)") |
|
189 |
|
190 |
|
191 |
|
192 if __name__ == '__main__': |
|
193 unittest_main() |