110 pass |
110 pass |
111 |
111 |
112 def test_preprocess_1(self): |
112 def test_preprocess_1(self): |
113 with self.session.new_cnx() as cnx: |
113 with self.session.new_cnx() as cnx: |
114 reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0] |
114 reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0] |
115 rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', |
115 rqlst = self._prepare(cnx, 'Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', |
116 {'x': reid}) |
116 {'x': reid}) |
117 self.assertEqual([{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}], |
117 self.assertEqual([{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}], |
118 rqlst.solutions) |
118 rqlst.solutions) |
119 |
119 |
120 def test_preprocess_2(self): |
120 def test_preprocess_2(self): |
121 teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0] |
121 with self.session.new_cnx() as cnx: |
122 #geid = self.execute("CWGroup G WHERE G name 'users'")[0][0] |
122 teid = cnx.execute("INSERT Tag X: X name 'tag'")[0][0] |
123 #self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s", |
123 #geid = self.execute("CWGroup G WHERE G name 'users'")[0][0] |
124 # {'g': geid, 't': teid}, 'g') |
124 #self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s", |
125 rqlst = self._prepare('Any X WHERE E eid %(x)s, E tags X', {'x': teid}) |
125 # {'g': geid, 't': teid}, 'g') |
126 # the query may be optimized, should keep only one solution |
126 rqlst = self._prepare(cnx, 'Any X WHERE E eid %(x)s, E tags X', {'x': teid}) |
127 # (any one, etype will be discarded) |
127 # the query may be optimized, should keep only one solution |
128 self.assertEqual(1, len(rqlst.solutions)) |
128 # (any one, etype will be discarded) |
|
129 self.assertEqual(1, len(rqlst.solutions)) |
129 |
130 |
130 def assertRQLEqual(self, expected, got): |
131 def assertRQLEqual(self, expected, got): |
131 from rql import parse |
132 from rql import parse |
132 self.assertMultiLineEqual(unicode(parse(expected)), |
133 self.assertMultiLineEqual(unicode(parse(expected)), |
133 unicode(parse(got))) |
134 unicode(parse(got))) |
134 |
135 |
135 def test_preprocess_security(self): |
136 def test_preprocess_security(self): |
136 plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN ' |
137 s = self.user_groups_session('users') |
137 'WHERE X is ET, ET name ETN') |
138 with s.new_cnx() as cnx: |
138 plan.cnx = self.user_groups_session('users') |
139 plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN ' |
139 union = plan.rqlst |
140 'WHERE X is ET, ET name ETN') |
140 plan.preprocess(union) |
141 union = plan.rqlst |
141 self.assertEqual(len(union.children), 1) |
142 plan.preprocess(union) |
142 self.assertEqual(len(union.children[0].with_), 1) |
143 self.assertEqual(len(union.children), 1) |
143 subq = union.children[0].with_[0].query |
144 self.assertEqual(len(union.children[0].with_), 1) |
144 self.assertEqual(len(subq.children), 4) |
145 subq = union.children[0].with_[0].query |
145 self.assertEqual([t.as_string() for t in union.children[0].selection], |
146 self.assertEqual(len(subq.children), 4) |
146 ['ETN','COUNT(X)']) |
147 self.assertEqual([t.as_string() for t in union.children[0].selection], |
147 self.assertEqual([t.as_string() for t in union.children[0].groupby], |
148 ['ETN','COUNT(X)']) |
148 ['ETN']) |
149 self.assertEqual([t.as_string() for t in union.children[0].groupby], |
149 partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children)) |
150 ['ETN']) |
150 rql, solutions = partrqls[0] |
151 partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children)) |
151 self.assertRQLEqual(rql, |
152 rql, solutions = partrqls[0] |
152 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))' |
153 self.assertRQLEqual(rql, |
153 ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, ' |
154 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))' |
154 ' X identity D, C is Division, D is Affaire))' |
155 ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, ' |
155 ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, ' |
156 ' X identity D, C is Division, D is Affaire))' |
156 ' X identity H, H is Affaire)))' |
157 ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, ' |
157 ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, ' |
158 ' X identity H, H is Affaire)))' |
158 ' X identity I, I is Affaire)))' |
159 ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, ' |
159 ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, ' |
160 ' X identity I, I is Affaire)))' |
160 ' X identity J, J is Affaire)))' |
161 ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, ' |
161 ', ET is CWEType, X is Affaire') |
162 ' X identity J, J is Affaire)))' |
162 self.assertEqual(solutions, [{'C': 'Division', |
163 ', ET is CWEType, X is Affaire') |
163 'D': 'Affaire', |
164 self.assertEqual(solutions, [{'C': 'Division', |
164 'E': 'Note', |
165 'D': 'Affaire', |
165 'F': 'Societe', |
166 'E': 'Note', |
166 'G': 'SubDivision', |
167 'F': 'Societe', |
167 'H': 'Affaire', |
168 'G': 'SubDivision', |
168 'I': 'Affaire', |
169 'H': 'Affaire', |
169 'J': 'Affaire', |
170 'I': 'Affaire', |
170 'X': 'Affaire', |
171 'J': 'Affaire', |
171 'ET': 'CWEType', 'ETN': 'String'}]) |
172 'X': 'Affaire', |
172 rql, solutions = partrqls[1] |
173 'ET': 'CWEType', 'ETN': 'String'}]) |
173 self.assertRQLEqual(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, ' |
174 rql, solutions = partrqls[1] |
174 'X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, ' |
175 self.assertRQLEqual(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, ' |
175 ' CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, ' |
176 'X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, ' |
176 ' CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, Comment, ' |
177 ' CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, ' |
177 ' Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Note, ' |
178 ' CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, Comment, ' |
178 ' Old, Personne, RQLExpression, Societe, State, SubDivision, ' |
179 ' Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Note, ' |
179 ' SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)') |
180 ' Old, Personne, RQLExpression, Societe, State, SubDivision, ' |
180 self.assertListEqual(sorted(solutions), |
181 ' SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)') |
181 sorted([{'X': 'BaseTransition', 'ETN': 'String', 'ET': 'CWEType'}, |
182 self.assertListEqual(sorted(solutions), |
182 {'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'}, |
183 sorted([{'X': 'BaseTransition', 'ETN': 'String', 'ET': 'CWEType'}, |
183 {'X': 'Card', 'ETN': 'String', 'ET': 'CWEType'}, |
184 {'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'}, |
184 {'X': 'Comment', 'ETN': 'String', 'ET': 'CWEType'}, |
185 {'X': 'Card', 'ETN': 'String', 'ET': 'CWEType'}, |
185 {'X': 'Division', 'ETN': 'String', 'ET': 'CWEType'}, |
186 {'X': 'Comment', 'ETN': 'String', 'ET': 'CWEType'}, |
186 {'X': 'CWCache', 'ETN': 'String', 'ET': 'CWEType'}, |
187 {'X': 'Division', 'ETN': 'String', 'ET': 'CWEType'}, |
187 {'X': 'CWConstraint', 'ETN': 'String', 'ET': 'CWEType'}, |
188 {'X': 'CWCache', 'ETN': 'String', 'ET': 'CWEType'}, |
188 {'X': 'CWConstraintType', 'ETN': 'String', 'ET': 'CWEType'}, |
189 {'X': 'CWConstraint', 'ETN': 'String', 'ET': 'CWEType'}, |
189 {'X': 'CWEType', 'ETN': 'String', 'ET': 'CWEType'}, |
190 {'X': 'CWConstraintType', 'ETN': 'String', 'ET': 'CWEType'}, |
190 {'X': 'CWAttribute', 'ETN': 'String', 'ET': 'CWEType'}, |
191 {'X': 'CWEType', 'ETN': 'String', 'ET': 'CWEType'}, |
191 {'X': 'CWGroup', 'ETN': 'String', 'ET': 'CWEType'}, |
192 {'X': 'CWAttribute', 'ETN': 'String', 'ET': 'CWEType'}, |
192 {'X': 'CWRelation', 'ETN': 'String', 'ET': 'CWEType'}, |
193 {'X': 'CWGroup', 'ETN': 'String', 'ET': 'CWEType'}, |
193 {'X': 'CWPermission', 'ETN': 'String', 'ET': 'CWEType'}, |
194 {'X': 'CWRelation', 'ETN': 'String', 'ET': 'CWEType'}, |
194 {'X': 'CWProperty', 'ETN': 'String', 'ET': 'CWEType'}, |
195 {'X': 'CWPermission', 'ETN': 'String', 'ET': 'CWEType'}, |
195 {'X': 'CWRType', 'ETN': 'String', 'ET': 'CWEType'}, |
196 {'X': 'CWProperty', 'ETN': 'String', 'ET': 'CWEType'}, |
196 {'X': 'CWSource', 'ETN': 'String', 'ET': 'CWEType'}, |
197 {'X': 'CWRType', 'ETN': 'String', 'ET': 'CWEType'}, |
197 {'X': 'CWUniqueTogetherConstraint', 'ETN': 'String', 'ET': 'CWEType'}, |
198 {'X': 'CWSource', 'ETN': 'String', 'ET': 'CWEType'}, |
198 {'X': 'CWUser', 'ETN': 'String', 'ET': 'CWEType'}, |
199 {'X': 'CWUniqueTogetherConstraint', 'ETN': 'String', 'ET': 'CWEType'}, |
199 {'X': 'Email', 'ETN': 'String', 'ET': 'CWEType'}, |
200 {'X': 'CWUser', 'ETN': 'String', 'ET': 'CWEType'}, |
200 {'X': 'EmailPart', 'ETN': 'String', 'ET': 'CWEType'}, |
201 {'X': 'Email', 'ETN': 'String', 'ET': 'CWEType'}, |
201 {'X': 'EmailThread', 'ETN': 'String', 'ET': 'CWEType'}, |
202 {'X': 'EmailPart', 'ETN': 'String', 'ET': 'CWEType'}, |
202 {'X': 'ExternalUri', 'ETN': 'String', 'ET': 'CWEType'}, |
203 {'X': 'EmailThread', 'ETN': 'String', 'ET': 'CWEType'}, |
203 {'X': 'File', 'ETN': 'String', 'ET': 'CWEType'}, |
204 {'X': 'ExternalUri', 'ETN': 'String', 'ET': 'CWEType'}, |
204 {'X': 'Folder', 'ETN': 'String', 'ET': 'CWEType'}, |
205 {'X': 'File', 'ETN': 'String', 'ET': 'CWEType'}, |
205 {'X': 'Note', 'ETN': 'String', 'ET': 'CWEType'}, |
206 {'X': 'Folder', 'ETN': 'String', 'ET': 'CWEType'}, |
206 {'X': 'Old', 'ETN': 'String', 'ET': 'CWEType'}, |
207 {'X': 'Note', 'ETN': 'String', 'ET': 'CWEType'}, |
207 {'X': 'Personne', 'ETN': 'String', 'ET': 'CWEType'}, |
208 {'X': 'Old', 'ETN': 'String', 'ET': 'CWEType'}, |
208 {'X': 'RQLExpression', 'ETN': 'String', 'ET': 'CWEType'}, |
209 {'X': 'Personne', 'ETN': 'String', 'ET': 'CWEType'}, |
209 {'X': 'Societe', 'ETN': 'String', 'ET': 'CWEType'}, |
210 {'X': 'RQLExpression', 'ETN': 'String', 'ET': 'CWEType'}, |
210 {'X': 'State', 'ETN': 'String', 'ET': 'CWEType'}, |
211 {'X': 'Societe', 'ETN': 'String', 'ET': 'CWEType'}, |
211 {'X': 'SubDivision', 'ETN': 'String', 'ET': 'CWEType'}, |
212 {'X': 'State', 'ETN': 'String', 'ET': 'CWEType'}, |
212 {'X': 'SubWorkflowExitPoint', 'ETN': 'String', 'ET': 'CWEType'}, |
213 {'X': 'SubDivision', 'ETN': 'String', 'ET': 'CWEType'}, |
213 {'X': 'Tag', 'ETN': 'String', 'ET': 'CWEType'}, |
214 {'X': 'SubWorkflowExitPoint', 'ETN': 'String', 'ET': 'CWEType'}, |
214 {'X': 'Transition', 'ETN': 'String', 'ET': 'CWEType'}, |
215 {'X': 'Tag', 'ETN': 'String', 'ET': 'CWEType'}, |
215 {'X': 'TrInfo', 'ETN': 'String', 'ET': 'CWEType'}, |
216 {'X': 'Transition', 'ETN': 'String', 'ET': 'CWEType'}, |
216 {'X': 'Workflow', 'ETN': 'String', 'ET': 'CWEType'}, |
217 {'X': 'TrInfo', 'ETN': 'String', 'ET': 'CWEType'}, |
217 {'X': 'WorkflowTransition', 'ETN': 'String', 'ET': 'CWEType'}])) |
218 {'X': 'Workflow', 'ETN': 'String', 'ET': 'CWEType'}, |
218 rql, solutions = partrqls[2] |
219 {'X': 'WorkflowTransition', 'ETN': 'String', 'ET': 'CWEType'}])) |
219 self.assertEqual(rql, |
220 rql, solutions = partrqls[2] |
220 'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(%(D)s use_email X), ' |
221 self.assertEqual(rql, |
221 'ET is CWEType, X is EmailAddress') |
222 'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(%(D)s use_email X), ' |
222 self.assertEqual(solutions, [{'X': 'EmailAddress', 'ET': 'CWEType', 'ETN': 'String'}]) |
223 'ET is CWEType, X is EmailAddress') |
223 rql, solutions = partrqls[3] |
224 self.assertEqual(solutions, [{'X': 'EmailAddress', 'ET': 'CWEType', 'ETN': 'String'}]) |
224 self.assertEqual(rql, |
225 rql, solutions = partrqls[3] |
225 'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(X owned_by %(C)s), ' |
226 self.assertEqual(rql, |
226 'ET is CWEType, X is Basket') |
227 'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(X owned_by %(C)s), ' |
227 self.assertEqual(solutions, [{'X': 'Basket', 'ET': 'CWEType', 'ETN': 'String'}]) |
228 'ET is CWEType, X is Basket') |
|
229 self.assertEqual(solutions, [{'X': 'Basket', 'ET': 'CWEType', 'ETN': 'String'}]) |
228 |
230 |
229 def test_preprocess_security_aggregat(self): |
231 def test_preprocess_security_aggregat(self): |
230 plan = self._prepare_plan('Any MAX(X)') |
232 s = self.user_groups_session('users') |
231 plan.cnx = self.user_groups_session('users') |
233 with s.new_cnx() as cnx: |
232 union = plan.rqlst |
234 plan = self._prepare_plan(cnx, 'Any MAX(X)') |
233 plan.preprocess(union) |
235 union = plan.rqlst |
234 self.assertEqual(len(union.children), 1) |
236 plan.preprocess(union) |
235 self.assertEqual(len(union.children[0].with_), 1) |
237 self.assertEqual(len(union.children), 1) |
236 subq = union.children[0].with_[0].query |
238 self.assertEqual(len(union.children[0].with_), 1) |
237 self.assertEqual(len(subq.children), 4) |
239 subq = union.children[0].with_[0].query |
238 self.assertEqual([t.as_string() for t in union.children[0].selection], |
240 self.assertEqual(len(subq.children), 4) |
239 ['MAX(X)']) |
241 self.assertEqual([t.as_string() for t in union.children[0].selection], |
|
242 ['MAX(X)']) |
240 |
243 |
241 def test_preprocess_nonregr(self): |
244 def test_preprocess_nonregr(self): |
242 rqlst = self._prepare('Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI') |
245 with self.session.new_cnx() as cnx: |
243 self.assertEqual(len(rqlst.solutions), 1) |
246 rqlst = self._prepare(cnx, 'Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI') |
|
247 self.assertEqual(len(rqlst.solutions), 1) |
244 |
248 |
245 def test_build_description(self): |
249 def test_build_description(self): |
246 # should return an empty result set |
250 # should return an empty result set |
247 rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': self.session.user.eid}) |
251 rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': self.session.user.eid}) |
248 self.assertEqual(rset.description[0][0], 'CWUser') |
252 self.assertEqual(rset.description[0][0], 'CWUser') |