|
1 # -*- coding: iso-8859-1 -*- |
|
2 """unit tests for modules cubicweb.server.querier and cubicweb.server.querier_steps |
|
3 """ |
|
4 |
|
5 from logilab.common.testlib import TestCase, unittest_main |
|
6 from cubicweb.devtools import init_test_database |
|
7 from cubicweb.devtools.repotest import tuplify, BaseQuerierTC |
|
8 from unittest_session import Variable |
|
9 |
|
10 from mx.DateTime import today, now, DateTimeType |
|
11 from rql import BadRQLQuery, RQLSyntaxError |
|
12 from cubicweb import QueryError, Unauthorized |
|
13 from cubicweb.server.utils import crypt_password |
|
14 from cubicweb.server.sources.native import make_schema |
|
15 |
|
16 |
|
17 # register priority/severity sorting registered procedure |
|
18 from rql.utils import register_function, FunctionDescr |
|
19 |
|
20 class group_sort_value(FunctionDescr): |
|
21 supported_backends = ('sqlite',) |
|
22 rtype = 'Int' |
|
23 try: |
|
24 register_function(group_sort_value) |
|
25 except AssertionError: |
|
26 pass |
|
27 from cubicweb.server.sqlutils import SQL_CONNECT_HOOKS |
|
28 def init_sqlite_connexion(cnx): |
|
29 def group_sort_value(text): |
|
30 return {"managers": "3", "users": "2", "guests": "1", "owners": "0"}[text] |
|
31 cnx.create_function("GROUP_SORT_VALUE", 1, group_sort_value) |
|
32 SQL_CONNECT_HOOKS['sqlite'].append(init_sqlite_connexion) |
|
33 |
|
34 |
|
35 from logilab.common.adbh import _GenericAdvFuncHelper |
|
36 TYPEMAP = _GenericAdvFuncHelper.TYPE_MAPPING |
|
37 |
|
38 class MakeSchemaTC(TestCase): |
|
39 def test_known_values(self): |
|
40 solution = {'A': 'String', 'B': 'EUser'} |
|
41 self.assertEquals(make_schema((Variable('A'), Variable('B')), solution, |
|
42 'table0', TYPEMAP), |
|
43 ('C0 text,C1 integer', {'A': 'table0.C0', 'B': 'table0.C1'})) |
|
44 |
|
45 |
|
46 repo, cnx = init_test_database('sqlite') |
|
47 |
|
48 |
|
49 |
|
50 class UtilsTC(BaseQuerierTC): |
|
51 repo = repo |
|
52 |
|
53 def get_max_eid(self): |
|
54 # no need for cleanup here |
|
55 return None |
|
56 def cleanup(self): |
|
57 # no need for cleanup here |
|
58 pass |
|
59 |
|
60 def test_preprocess_1(self): |
|
61 reid = self.execute('Any X WHERE X is ERType, X name "owned_by"')[0][0] |
|
62 rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', {'x': reid}) |
|
63 self.assertEquals(rqlst.solutions, [{'RDEF': 'EFRDef'}, {'RDEF': 'ENFRDef'}]) |
|
64 |
|
65 def test_preprocess_2(self): |
|
66 teid = self.execute("INSERT Tag X: X name 'tag'")[0][0] |
|
67 #geid = self.execute("EGroup G WHERE G name 'users'")[0][0] |
|
68 #self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s", |
|
69 # {'g': geid, 't': teid}, 'g') |
|
70 rqlst = self._prepare('Any X WHERE E eid %(x)s, E tags X', {'x': teid}) |
|
71 # the query may be optimized, should keep only one solution |
|
72 # (any one, etype will be discarded) |
|
73 self.assertEquals(len(rqlst.solutions), 1) |
|
74 |
|
75 def test_preprocess_security(self): |
|
76 plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN ' |
|
77 'WHERE X is ET, ET name ETN') |
|
78 plan.session = self._user_session(('users',))[1] |
|
79 union = plan.rqlst |
|
80 plan.preprocess(union) |
|
81 self.assertEquals(len(union.children), 1) |
|
82 self.assertEquals(len(union.children[0].with_), 1) |
|
83 subq = union.children[0].with_[0].query |
|
84 self.assertEquals(len(subq.children), 3) |
|
85 self.assertEquals([t.as_string() for t in union.children[0].selection], |
|
86 ['ETN','COUNT(X)']) |
|
87 self.assertEquals([t.as_string() for t in union.children[0].groupby], |
|
88 ['ETN']) |
|
89 partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children)) |
|
90 rql, solutions = partrqls[0] |
|
91 self.assertEquals(rql, |
|
92 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))' |
|
93 ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, X identity D, C is Division, D is Affaire))' |
|
94 ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, X identity H, H is Affaire)))' |
|
95 ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, X identity I, I is Affaire)))' |
|
96 ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, X identity J, J is Affaire)))' |
|
97 ', ET is EEType, X is Affaire') |
|
98 self.assertEquals(solutions, [{'C': 'Division', |
|
99 'D': 'Affaire', |
|
100 'E': 'Note', |
|
101 'F': 'Societe', |
|
102 'G': 'SubDivision', |
|
103 'H': 'Affaire', |
|
104 'I': 'Affaire', |
|
105 'J': 'Affaire', |
|
106 'X': 'Affaire', |
|
107 'ET': 'EEType', 'ETN': 'String'}]) |
|
108 rql, solutions = partrqls[1] |
|
109 self.assertEquals(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is EEType, ' |
|
110 'X is IN(Bookmark, Card, Comment, Division, EConstraint, EConstraintType, EEType, EFRDef, EGroup, ENFRDef, EPermission, EProperty, ERType, EUser, Email, EmailAddress, EmailPart, EmailThread, File, Folder, Image, Note, Personne, RQLExpression, Societe, State, SubDivision, Tag, TrInfo, Transition)') |
|
111 self.assertListEquals(sorted(solutions), |
|
112 sorted([{'X': 'Bookmark', 'ETN': 'String', 'ET': 'EEType'}, |
|
113 {'X': 'Card', 'ETN': 'String', 'ET': 'EEType'}, |
|
114 {'X': 'Comment', 'ETN': 'String', 'ET': 'EEType'}, |
|
115 {'X': 'Division', 'ETN': 'String', 'ET': 'EEType'}, |
|
116 {'X': 'EConstraint', 'ETN': 'String', 'ET': 'EEType'}, |
|
117 {'X': 'EConstraintType', 'ETN': 'String', 'ET': 'EEType'}, |
|
118 {'X': 'EEType', 'ETN': 'String', 'ET': 'EEType'}, |
|
119 {'X': 'EFRDef', 'ETN': 'String', 'ET': 'EEType'}, |
|
120 {'X': 'EGroup', 'ETN': 'String', 'ET': 'EEType'}, |
|
121 {'X': 'Email', 'ETN': 'String', 'ET': 'EEType'}, |
|
122 {'X': 'EmailAddress', 'ETN': 'String', 'ET': 'EEType'}, |
|
123 {'X': 'EmailPart', 'ETN': 'String', 'ET': 'EEType'}, |
|
124 {'X': 'EmailThread', 'ETN': 'String', 'ET': 'EEType'}, |
|
125 {'X': 'ENFRDef', 'ETN': 'String', 'ET': 'EEType'}, |
|
126 {'X': 'EPermission', 'ETN': 'String', 'ET': 'EEType'}, |
|
127 {'X': 'EProperty', 'ETN': 'String', 'ET': 'EEType'}, |
|
128 {'X': 'ERType', 'ETN': 'String', 'ET': 'EEType'}, |
|
129 {'X': 'EUser', 'ETN': 'String', 'ET': 'EEType'}, |
|
130 {'X': 'File', 'ETN': 'String', 'ET': 'EEType'}, |
|
131 {'X': 'Folder', 'ETN': 'String', 'ET': 'EEType'}, |
|
132 {'X': 'Image', 'ETN': 'String', 'ET': 'EEType'}, |
|
133 {'X': 'Note', 'ETN': 'String', 'ET': 'EEType'}, |
|
134 {'X': 'Personne', 'ETN': 'String', 'ET': 'EEType'}, |
|
135 {'X': 'RQLExpression', 'ETN': 'String', 'ET': 'EEType'}, |
|
136 {'X': 'Societe', 'ETN': 'String', 'ET': 'EEType'}, |
|
137 {'X': 'State', 'ETN': 'String', 'ET': 'EEType'}, |
|
138 {'X': 'SubDivision', 'ETN': 'String', 'ET': 'EEType'}, |
|
139 {'X': 'Tag', 'ETN': 'String', 'ET': 'EEType'}, |
|
140 {'X': 'Transition', 'ETN': 'String', 'ET': 'EEType'}, |
|
141 {'X': 'TrInfo', 'ETN': 'String', 'ET': 'EEType'}])) |
|
142 rql, solutions = partrqls[2] |
|
143 self.assertEquals(rql, |
|
144 'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(X owned_by %(C)s), ' |
|
145 'ET is EEType, X is Basket') |
|
146 self.assertEquals(solutions, [{'ET': 'EEType', |
|
147 'X': 'Basket', |
|
148 'ETN': 'String', |
|
149 }]) |
|
150 |
|
151 def test_preprocess_security_aggregat(self): |
|
152 plan = self._prepare_plan('Any MAX(X)') |
|
153 plan.session = self._user_session(('users',))[1] |
|
154 union = plan.rqlst |
|
155 plan.preprocess(union) |
|
156 self.assertEquals(len(union.children), 1) |
|
157 self.assertEquals(len(union.children[0].with_), 1) |
|
158 subq = union.children[0].with_[0].query |
|
159 self.assertEquals(len(subq.children), 3) |
|
160 self.assertEquals([t.as_string() for t in union.children[0].selection], |
|
161 ['MAX(X)']) |
|
162 |
|
163 def test_preprocess_nonregr(self): |
|
164 rqlst = self._prepare('Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI') |
|
165 self.assertEquals(len(rqlst.solutions), 1) |
|
166 |
|
167 def test_build_description(self): |
|
168 # should return an empty result set |
|
169 rset = self.execute('Any X WHERE X eid %(x)s', {'x': self.session.user.eid}) |
|
170 self.assertEquals(rset.description[0][0], 'EUser') |
|
171 rset = self.execute('Any 1') |
|
172 self.assertEquals(rset.description[0][0], 'Int') |
|
173 rset = self.execute('Any TRUE') |
|
174 self.assertEquals(rset.description[0][0], 'Boolean') |
|
175 rset = self.execute('Any "hop"') |
|
176 self.assertEquals(rset.description[0][0], 'String') |
|
177 rset = self.execute('Any TODAY') |
|
178 self.assertEquals(rset.description[0][0], 'Date') |
|
179 rset = self.execute('Any NOW') |
|
180 self.assertEquals(rset.description[0][0], 'Datetime') |
|
181 rset = self.execute('Any %(x)s', {'x': 1}) |
|
182 self.assertEquals(rset.description[0][0], 'Int') |
|
183 rset = self.execute('Any %(x)s', {'x': 1L}) |
|
184 self.assertEquals(rset.description[0][0], 'Int') |
|
185 rset = self.execute('Any %(x)s', {'x': True}) |
|
186 self.assertEquals(rset.description[0][0], 'Boolean') |
|
187 rset = self.execute('Any %(x)s', {'x': 1.0}) |
|
188 self.assertEquals(rset.description[0][0], 'Float') |
|
189 rset = self.execute('Any %(x)s', {'x': now()}) |
|
190 self.assertEquals(rset.description[0][0], 'Datetime') |
|
191 rset = self.execute('Any %(x)s', {'x': 'str'}) |
|
192 self.assertEquals(rset.description[0][0], 'String') |
|
193 rset = self.execute('Any %(x)s', {'x': u'str'}) |
|
194 self.assertEquals(rset.description[0][0], 'String') |
|
195 |
|
196 |
|
197 class QuerierTC(BaseQuerierTC): |
|
198 repo = repo |
|
199 |
|
200 def test_encoding_pb(self): |
|
201 self.assertRaises(RQLSyntaxError, self.execute, |
|
202 'Any X WHERE X is ERType, X name "öwned_by"') |
|
203 |
|
204 def test_unknown_eid(self): |
|
205 # should return an empty result set |
|
206 self.failIf(self.execute('Any X WHERE X eid 99999999')) |
|
207 |
|
208 # selection queries tests ################################################# |
|
209 |
|
210 def test_select_1(self): |
|
211 rset = self.execute('Any X ORDERBY X WHERE X is EGroup') |
|
212 result, descr = rset.rows, rset.description |
|
213 self.assertEquals(tuplify(result), [(1,), (2,), (3,), (4,)]) |
|
214 self.assertEquals(descr, [('EGroup',), ('EGroup',), ('EGroup',), ('EGroup',)]) |
|
215 |
|
216 def test_select_2(self): |
|
217 rset = self.execute('Any X ORDERBY N WHERE X is EGroup, X name N') |
|
218 self.assertEquals(tuplify(rset.rows), [(3,), (1,), (4,), (2,)]) |
|
219 self.assertEquals(rset.description, [('EGroup',), ('EGroup',), ('EGroup',), ('EGroup',)]) |
|
220 rset = self.execute('Any X ORDERBY N DESC WHERE X is EGroup, X name N') |
|
221 self.assertEquals(tuplify(rset.rows), [(2,), (4,), (1,), (3,)]) |
|
222 |
|
223 def test_select_3(self): |
|
224 rset = self.execute('Any N GROUPBY N WHERE X is EGroup, X name N') |
|
225 result, descr = rset.rows, rset.description |
|
226 result.sort() |
|
227 self.assertEquals(tuplify(result), [('guests',), ('managers',), ('owners',), ('users',)]) |
|
228 self.assertEquals(descr, [('String',), ('String',), ('String',), ('String',)]) |
|
229 |
|
230 def test_select_is(self): |
|
231 rset = self.execute('Any X, TN ORDERBY TN LIMIT 10 WHERE X is T, T name TN') |
|
232 result, descr = rset.rows, rset.description |
|
233 self.assertEquals(result[0][1], descr[0][0]) |
|
234 |
|
235 def test_select_is_aggr(self): |
|
236 rset = self.execute('Any TN, COUNT(X) GROUPBY TN ORDERBY 2 DESC WHERE X is T, T name TN') |
|
237 result, descr = rset.rows, rset.description |
|
238 self.assertEquals(descr[0][0], 'String') |
|
239 self.assertEquals(descr[0][1], 'Int') |
|
240 self.assertEquals(result[0][0], 'ENFRDef') |
|
241 |
|
242 def test_select_groupby_orderby(self): |
|
243 rset = self.execute('Any N GROUPBY N ORDERBY N WHERE X is EGroup, X name N') |
|
244 self.assertEquals(tuplify(rset.rows), [('guests',), ('managers',), ('owners',), ('users',)]) |
|
245 self.assertEquals(rset.description, [('String',), ('String',), ('String',), ('String',)]) |
|
246 |
|
247 def test_select_complex_groupby(self): |
|
248 rset = self.execute('Any N GROUPBY N WHERE X name N') |
|
249 rset = self.execute('Any N,MAX(D) GROUPBY N LIMIT 5 WHERE X name N, X creation_date D') |
|
250 |
|
251 def test_select_inlined_groupby(self): |
|
252 seid = self.execute('State X WHERE X name "deactivated"')[0][0] |
|
253 rset = self.execute('Any U,L,S GROUPBY U,L,S WHERE X in_state S, U login L, S eid %s' % seid) |
|
254 |
|
255 def test_select_complex_orderby(self): |
|
256 rset1 = self.execute('Any N ORDERBY N WHERE X name N') |
|
257 self.assertEquals(sorted(rset1.rows), rset1.rows) |
|
258 rset = self.execute('Any N ORDERBY N LIMIT 5 OFFSET 1 WHERE X name N') |
|
259 self.assertEquals(rset.rows[0][0], rset1.rows[1][0]) |
|
260 self.assertEquals(len(rset), 5) |
|
261 |
|
262 def test_select_5(self): |
|
263 rset = self.execute('Any X, TMP ORDERBY TMP WHERE X name TMP, X is EGroup') |
|
264 self.assertEquals(tuplify(rset.rows), [(3, 'guests',), (1, 'managers',), (4, 'owners',), (2, 'users',)]) |
|
265 self.assertEquals(rset.description, [('EGroup', 'String',), ('EGroup', 'String',), ('EGroup', 'String',), ('EGroup', 'String',)]) |
|
266 |
|
267 def test_select_6(self): |
|
268 self.execute("INSERT Personne X: X nom 'bidule'")[0] |
|
269 rset = self.execute('Any Y where X name TMP, Y nom in (TMP, "bidule")') |
|
270 #self.assertEquals(rset.description, [('Personne',), ('Personne',)]) |
|
271 self.assert_(('Personne',) in rset.description) |
|
272 rset = self.execute('DISTINCT Any Y where X name TMP, Y nom in (TMP, "bidule")') |
|
273 self.assert_(('Personne',) in rset.description) |
|
274 |
|
275 def test_select_not_attr(self): |
|
276 self.execute("INSERT Personne X: X nom 'bidule'") |
|
277 self.execute("INSERT Societe X: X nom 'chouette'") |
|
278 rset = self.execute('Personne X WHERE NOT X nom "bidule"') |
|
279 self.assertEquals(len(rset.rows), 0, rset.rows) |
|
280 rset = self.execute('Personne X WHERE NOT X nom "bid"') |
|
281 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
282 self.execute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'") |
|
283 rset = self.execute('Personne X WHERE NOT X travaille S') |
|
284 self.assertEquals(len(rset.rows), 0, rset.rows) |
|
285 |
|
286 def test_select_is_in(self): |
|
287 self.execute("INSERT Personne X: X nom 'bidule'") |
|
288 self.execute("INSERT Societe X: X nom 'chouette'") |
|
289 self.assertEquals(len(self.execute("Any X WHERE X is IN (Personne, Societe)")), |
|
290 2) |
|
291 |
|
292 def test_select_not_rel(self): |
|
293 self.execute("INSERT Personne X: X nom 'bidule'") |
|
294 self.execute("INSERT Societe X: X nom 'chouette'") |
|
295 self.execute("INSERT Personne X: X nom 'autre'") |
|
296 self.execute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'") |
|
297 rset = self.execute('Personne X WHERE NOT X travaille S') |
|
298 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
299 rset = self.execute('Personne X WHERE NOT X travaille S, S nom "chouette"') |
|
300 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
301 |
|
302 def test_select_nonregr_inlined(self): |
|
303 self.execute("INSERT Note X: X para 'bidule'") |
|
304 self.execute("INSERT Personne X: X nom 'chouette'") |
|
305 self.execute("INSERT Personne X: X nom 'autre'") |
|
306 self.execute("SET X ecrit_par P WHERE X para 'bidule', P nom 'chouette'") |
|
307 rset = self.execute('Any U,T ORDERBY T DESC WHERE U is EUser, ' |
|
308 'N ecrit_par U, N type T')#, {'x': self.ueid}) |
|
309 self.assertEquals(len(rset.rows), 0) |
|
310 |
|
311 def test_select_nonregr_edition_not(self): |
|
312 groupeids = set((1, 2, 3)) |
|
313 groupreadperms = set(r[0] for r in self.execute('Any Y WHERE X name "EGroup", Y eid IN(1, 2, 3), X read_permission Y')) |
|
314 rset = self.execute('DISTINCT Any Y WHERE X is EEType, X name "EGroup", Y eid IN(1, 2, 3), NOT X read_permission Y') |
|
315 self.assertEquals(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms)) |
|
316 rset = self.execute('DISTINCT Any Y WHERE X name "EGroup", Y eid IN(1, 2, 3), NOT X read_permission Y') |
|
317 self.assertEquals(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms)) |
|
318 |
|
319 def test_select_outer_join(self): |
|
320 peid1 = self.execute("INSERT Personne X: X nom 'bidule'")[0][0] |
|
321 peid2 = self.execute("INSERT Personne X: X nom 'autre'")[0][0] |
|
322 seid1 = self.execute("INSERT Societe X: X nom 'chouette'")[0][0] |
|
323 seid2 = self.execute("INSERT Societe X: X nom 'chouetos'")[0][0] |
|
324 rset = self.execute('Any X,S ORDERBY X WHERE X travaille S?') |
|
325 self.assertEquals(rset.rows, [[peid1, None], [peid2, None]]) |
|
326 self.execute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'") |
|
327 rset = self.execute('Any X,S ORDERBY X WHERE X travaille S?') |
|
328 self.assertEquals(rset.rows, [[peid1, seid1], [peid2, None]]) |
|
329 rset = self.execute('Any S,X ORDERBY S WHERE X? travaille S') |
|
330 self.assertEquals(rset.rows, [[seid1, peid1], [seid2, None]]) |
|
331 |
|
332 def test_select_outer_join_optimized(self): |
|
333 peid1 = self.execute("INSERT Personne X: X nom 'bidule'")[0][0] |
|
334 rset = self.execute('Any X WHERE X eid %(x)s, P? connait X', {'x':peid1}, 'x') |
|
335 self.assertEquals(rset.rows, [[peid1]]) |
|
336 rset = self.execute('Any X WHERE X eid %(x)s, X require_permission P?', {'x':peid1}, 'x') |
|
337 self.assertEquals(rset.rows, [[peid1]]) |
|
338 |
|
339 def test_select_left_outer_join(self): |
|
340 ueid = self.execute("INSERT EUser X: X login 'bob', X upassword 'toto', X in_group G " |
|
341 "WHERE G name 'users'")[0][0] |
|
342 self.commit() |
|
343 try: |
|
344 rset = self.execute('Any FS,TS,C,D,U ORDERBY D DESC ' |
|
345 'WHERE WF wf_info_for X,' |
|
346 'WF from_state FS?, WF to_state TS, WF comment C,' |
|
347 'WF creation_date D, WF owned_by U, X eid %(x)s', |
|
348 {'x': ueid}, 'x') |
|
349 self.assertEquals(len(rset), 1) |
|
350 self.execute('SET X in_state S WHERE X eid %(x)s, S name "deactivated"', |
|
351 {'x': ueid}, 'x') |
|
352 rset = self.execute('Any FS,TS,C,D,U ORDERBY D DESC ' |
|
353 'WHERE WF wf_info_for X,' |
|
354 'WF from_state FS?, WF to_state TS, WF comment C,' |
|
355 'WF creation_date D, WF owned_by U, X eid %(x)s', |
|
356 {'x': ueid}, 'x') |
|
357 self.assertEquals(len(rset), 2) |
|
358 finally: |
|
359 self.execute('DELETE EUser X WHERE X eid %s' % ueid) |
|
360 self.commit() |
|
361 |
|
362 def test_select_ambigous_outer_join(self): |
|
363 teid = self.execute("INSERT Tag X: X name 'tag'")[0][0] |
|
364 self.execute("INSERT Tag X: X name 'tagbis'")[0][0] |
|
365 geid = self.execute("EGroup G WHERE G name 'users'")[0][0] |
|
366 self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s", |
|
367 {'g': geid, 't': teid}, 'g') |
|
368 rset = self.execute("Any GN,TN ORDERBY GN WHERE T? tags G, T name TN, G name GN") |
|
369 self.failUnless(['users', 'tag'] in rset.rows) |
|
370 self.failUnless(['activated', None] in rset.rows) |
|
371 rset = self.execute("Any GN,TN ORDERBY GN WHERE T tags G?, T name TN, G name GN") |
|
372 self.assertEquals(rset.rows, [[None, 'tagbis'], ['users', 'tag']]) |
|
373 |
|
374 def test_select_not_inline_rel(self): |
|
375 self.execute("INSERT Personne X: X nom 'bidule'") |
|
376 self.execute("INSERT Note X: X type 'a'") |
|
377 self.execute("INSERT Note X: X type 'b'") |
|
378 self.execute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'") |
|
379 rset = self.execute('Note X WHERE NOT X ecrit_par P') |
|
380 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
381 |
|
382 def test_select_not_unlinked_multiple_solutions(self): |
|
383 self.execute("INSERT Personne X: X nom 'bidule'") |
|
384 self.execute("INSERT Note X: X type 'a'") |
|
385 self.execute("INSERT Note X: X type 'b'") |
|
386 self.execute("SET Y evaluee X WHERE X type 'a', Y nom 'bidule'") |
|
387 rset = self.execute('Note X WHERE NOT Y evaluee X') |
|
388 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
389 |
|
390 def test_select_aggregat_count(self): |
|
391 rset = self.execute('Any COUNT(X)') |
|
392 self.assertEquals(len(rset.rows), 1) |
|
393 self.assertEquals(len(rset.rows[0]), 1) |
|
394 self.assertEquals(rset.description, [('Int',)]) |
|
395 |
|
396 def test_select_aggregat_sum(self): |
|
397 rset = self.execute('Any SUM(O) WHERE X ordernum O') |
|
398 self.assertEquals(len(rset.rows), 1) |
|
399 self.assertEquals(len(rset.rows[0]), 1) |
|
400 self.assertEquals(rset.description, [('Int',)]) |
|
401 |
|
402 def test_select_aggregat_min(self): |
|
403 rset = self.execute('Any MIN(X) WHERE X is Personne') |
|
404 self.assertEquals(len(rset.rows), 1) |
|
405 self.assertEquals(len(rset.rows[0]), 1) |
|
406 self.assertEquals(rset.description, [('Personne',)]) |
|
407 rset = self.execute('Any MIN(O) WHERE X ordernum O') |
|
408 self.assertEquals(len(rset.rows), 1) |
|
409 self.assertEquals(len(rset.rows[0]), 1) |
|
410 self.assertEquals(rset.description, [('Int',)]) |
|
411 |
|
412 def test_select_aggregat_max(self): |
|
413 rset = self.execute('Any MAX(X) WHERE X is Personne') |
|
414 self.assertEquals(len(rset.rows), 1) |
|
415 self.assertEquals(len(rset.rows[0]), 1) |
|
416 self.assertEquals(rset.description, [('Personne',)]) |
|
417 rset = self.execute('Any MAX(O) WHERE X ordernum O') |
|
418 self.assertEquals(len(rset.rows), 1) |
|
419 self.assertEquals(len(rset.rows[0]), 1) |
|
420 self.assertEquals(rset.description, [('Int',)]) |
|
421 |
|
422 def test_select_custom_aggregat_concat_string(self): |
|
423 rset = self.execute('Any CONCAT_STRINGS(N) WHERE X is EGroup, X name N') |
|
424 self.failUnless(rset) |
|
425 self.failUnlessEqual(sorted(rset[0][0].split(', ')), ['guests', 'managers', |
|
426 'owners', 'users']) |
|
427 |
|
428 def test_select_custom_regproc_limit_size(self): |
|
429 rset = self.execute('Any TEXT_LIMIT_SIZE(N, 3) WHERE X is EGroup, X name N, X name "managers"') |
|
430 self.failUnless(rset) |
|
431 self.failUnlessEqual(rset[0][0], 'man...') |
|
432 self.execute("INSERT Basket X: X name 'bidule', X description '<b>hop hop</b>', X description_format 'text/html'") |
|
433 rset = self.execute('Any LIMIT_SIZE(D, DF, 3) WHERE X is Basket, X description D, X description_format DF') |
|
434 self.failUnless(rset) |
|
435 self.failUnlessEqual(rset[0][0], 'hop...') |
|
436 |
|
437 def test_select_regproc_orderby(self): |
|
438 rset = self.execute('DISTINCT Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is EGroup, X name N, X name "managers"') |
|
439 self.failUnlessEqual(len(rset), 1) |
|
440 self.failUnlessEqual(rset[0][1], 'managers') |
|
441 rset = self.execute('Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is EGroup, X name N, NOT U in_group X, U login "admin"') |
|
442 self.failUnlessEqual(len(rset), 3) |
|
443 self.failUnlessEqual(rset[0][1], 'owners') |
|
444 |
|
445 def test_select_aggregat_sort(self): |
|
446 rset = self.execute('Any G, COUNT(U) GROUPBY G ORDERBY 2 WHERE U in_group G') |
|
447 self.assertEquals(len(rset.rows), 2) |
|
448 self.assertEquals(len(rset.rows[0]), 2) |
|
449 self.assertEquals(rset.description[0], ('EGroup', 'Int',)) |
|
450 |
|
451 def test_select_aggregat_having(self): |
|
452 rset = self.execute('Any N,COUNT(RDEF) GROUPBY N ORDERBY 2,N ' |
|
453 'WHERE RT name N, RDEF relation_type RT ' |
|
454 'HAVING COUNT(RDEF) > 10') |
|
455 self.assertListEquals(rset.rows, |
|
456 [[u'description', 11], ['in_basket', 11], |
|
457 [u'name', 12], [u'created_by', 32], |
|
458 [u'creation_date', 32], [u'is', 32], [u'is_instance_of', 32], |
|
459 [u'modification_date', 32], [u'owned_by', 32]]) |
|
460 |
|
461 def test_select_aggregat_having_dumb(self): |
|
462 # dumb but should not raise an error |
|
463 rset = self.execute('Any U,COUNT(X) GROUPBY U ' |
|
464 'WHERE U eid %(x)s, X owned_by U ' |
|
465 'HAVING COUNT(X) > 10', {'x': self.ueid}) |
|
466 self.assertEquals(len(rset.rows), 1) |
|
467 self.assertEquals(rset.rows[0][0], self.ueid) |
|
468 |
|
469 def test_select_complex_sort(self): |
|
470 rset = self.execute('Any X ORDERBY X,D LIMIT 5 WHERE X creation_date D') |
|
471 result = rset.rows |
|
472 result.sort() |
|
473 self.assertEquals(tuplify(result), [(1,), (2,), (3,), (4,), (5,)]) |
|
474 |
|
475 def test_select_upper(self): |
|
476 rset = self.execute('Any X, UPPER(L) ORDERBY L WHERE X is EUser, X login L') |
|
477 self.assertEquals(len(rset.rows), 2) |
|
478 self.assertEquals(rset.rows[0][1], 'ADMIN') |
|
479 self.assertEquals(rset.description[0], ('EUser', 'String',)) |
|
480 self.assertEquals(rset.rows[1][1], 'ANON') |
|
481 self.assertEquals(rset.description[1], ('EUser', 'String',)) |
|
482 eid = rset.rows[0][0] |
|
483 rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L'%eid) |
|
484 self.assertEquals(rset.rows[0][0], 'ADMIN') |
|
485 self.assertEquals(rset.description, [('String',)]) |
|
486 |
|
487 ## def test_select_simplified(self): |
|
488 ## ueid = self.session.user.eid |
|
489 ## rset = self.execute('Any L WHERE %s login L'%ueid) |
|
490 ## self.assertEquals(rset.rows[0][0], 'admin') |
|
491 ## rset = self.execute('Any L WHERE %(x)s login L', {'x':ueid}) |
|
492 ## self.assertEquals(rset.rows[0][0], 'admin') |
|
493 |
|
494 def test_select_searchable_text_1(self): |
|
495 rset = self.execute(u"INSERT Personne X: X nom 'bidüle'") |
|
496 rset = self.execute(u"INSERT Societe X: X nom 'bidüle'") |
|
497 rset = self.execute("INSERT Societe X: X nom 'chouette'") |
|
498 self.commit() |
|
499 rset = self.execute('Any X where X has_text %(text)s', {'text': u'bidüle'}) |
|
500 self.assertEquals(len(rset.rows), 2, rset.rows) |
|
501 rset = self.execute(u'Any N where N has_text "bidüle"') |
|
502 self.assertEquals(len(rset.rows), 2, rset.rows) |
|
503 biduleeids = [r[0] for r in rset.rows] |
|
504 rset = self.execute(u'Any N where NOT N has_text "bidüle"') |
|
505 self.failIf([r[0] for r in rset.rows if r[0] in biduleeids]) |
|
506 # duh? |
|
507 rset = self.execute('Any X WHERE X has_text %(text)s', {'text': u'ça'}) |
|
508 |
|
509 def test_select_searchable_text_2(self): |
|
510 rset = self.execute("INSERT Personne X: X nom 'bidule'") |
|
511 rset = self.execute("INSERT Personne X: X nom 'chouette'") |
|
512 rset = self.execute("INSERT Societe X: X nom 'bidule'") |
|
513 self.commit() |
|
514 rset = self.execute('Personne N where N has_text "bidule"') |
|
515 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
516 |
|
517 def test_select_searchable_text_3(self): |
|
518 rset = self.execute("INSERT Personne X: X nom 'bidule', X sexe 'M'") |
|
519 rset = self.execute("INSERT Personne X: X nom 'bidule', X sexe 'F'") |
|
520 rset = self.execute("INSERT Societe X: X nom 'bidule'") |
|
521 self.commit() |
|
522 rset = self.execute('Any X where X has_text "bidule" and X sexe "M"') |
|
523 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
524 |
|
525 def test_select_multiple_searchable_text(self): |
|
526 self.execute(u"INSERT Personne X: X nom 'bidüle'") |
|
527 self.execute("INSERT Societe X: X nom 'chouette', S travaille X") |
|
528 self.execute(u"INSERT Personne X: X nom 'bidüle'") |
|
529 self.commit() |
|
530 rset = self.execute('Personne X WHERE X has_text %(text)s, X travaille S, S has_text %(text2)s', |
|
531 {'text': u'bidüle', |
|
532 'text2': u'chouette',} |
|
533 ) |
|
534 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
535 |
|
536 def test_select_no_descr(self): |
|
537 rset = self.execute('Any X WHERE X is EGroup', build_descr=0) |
|
538 rset.rows.sort() |
|
539 self.assertEquals(tuplify(rset.rows), [(1,), (2,), (3,), (4,)]) |
|
540 self.assertEquals(rset.description, ()) |
|
541 |
|
542 def test_select_limit_offset(self): |
|
543 rset = self.execute('EGroup X ORDERBY N LIMIT 2 WHERE X name N') |
|
544 self.assertEquals(tuplify(rset.rows), [(3,), (1,)]) |
|
545 self.assertEquals(rset.description, [('EGroup',), ('EGroup',)]) |
|
546 rset = self.execute('EGroup X ORDERBY N LIMIT 2 OFFSET 2 WHERE X name N') |
|
547 self.assertEquals(tuplify(rset.rows), [(4,), (2,)]) |
|
548 |
|
549 def test_select_symetric(self): |
|
550 self.execute("INSERT Personne X: X nom 'machin'") |
|
551 self.execute("INSERT Personne X: X nom 'bidule'") |
|
552 self.execute("INSERT Personne X: X nom 'chouette'") |
|
553 self.execute("INSERT Personne X: X nom 'trucmuche'") |
|
554 self.execute("SET X connait Y WHERE X nom 'chouette', Y nom 'bidule'") |
|
555 self.execute("SET X connait Y WHERE X nom 'machin', Y nom 'chouette'") |
|
556 rset = self.execute('Any P where P connait P2') |
|
557 self.assertEquals(len(rset.rows), 3, rset.rows) |
|
558 rset = self.execute('Any P where NOT P connait P2') |
|
559 self.assertEquals(len(rset.rows), 1, rset.rows) # trucmuche |
|
560 rset = self.execute('Any P where P connait P2, P2 nom "bidule"') |
|
561 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
562 rset = self.execute('Any P where P2 connait P, P2 nom "bidule"') |
|
563 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
564 rset = self.execute('Any P where P connait P2, P2 nom "chouette"') |
|
565 self.assertEquals(len(rset.rows), 2, rset.rows) |
|
566 rset = self.execute('Any P where P2 connait P, P2 nom "chouette"') |
|
567 self.assertEquals(len(rset.rows), 2, rset.rows) |
|
568 |
|
569 def test_select_inline(self): |
|
570 self.execute("INSERT Personne X: X nom 'bidule'") |
|
571 self.execute("INSERT Note X: X type 'a'") |
|
572 self.execute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'") |
|
573 rset = self.execute('Any N where N ecrit_par X, X nom "bidule"') |
|
574 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
575 |
|
576 def test_select_creation_date(self): |
|
577 self.execute("INSERT Personne X: X nom 'bidule'") |
|
578 rset = self.execute('Any D WHERE X nom "bidule", X creation_date D') |
|
579 self.assertEqual(len(rset.rows), 1) |
|
580 |
|
581 def test_select_or_relation(self): |
|
582 self.execute("INSERT Personne X: X nom 'bidule'") |
|
583 self.execute("INSERT Personne X: X nom 'chouette'") |
|
584 self.execute("INSERT Societe X: X nom 'logilab'") |
|
585 self.execute("INSERT Societe X: X nom 'caesium'") |
|
586 self.execute("SET P travaille S WHERE P nom 'bidule', S nom 'logilab'") |
|
587 rset = self.execute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"') |
|
588 self.assertEqual(len(rset.rows), 1) |
|
589 self.execute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'") |
|
590 rset = self.execute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"') |
|
591 self.assertEqual(len(rset.rows), 2) |
|
592 |
|
593 def test_select_or_sym_relation(self): |
|
594 self.execute("INSERT Personne X: X nom 'bidule'") |
|
595 self.execute("INSERT Personne X: X nom 'chouette'") |
|
596 self.execute("INSERT Personne X: X nom 'truc'") |
|
597 self.execute("SET P connait S WHERE P nom 'bidule', S nom 'chouette'") |
|
598 rset = self.execute('DISTINCT Any P WHERE S connait P, S nom "chouette"') |
|
599 self.assertEqual(len(rset.rows), 1, rset.rows) |
|
600 rset = self.execute('DISTINCT Any P WHERE P connait S or S connait P, S nom "chouette"') |
|
601 self.assertEqual(len(rset.rows), 1, rset.rows) |
|
602 self.execute("SET P connait S WHERE P nom 'chouette', S nom 'truc'") |
|
603 rset = self.execute('DISTINCT Any P WHERE S connait P, S nom "chouette"') |
|
604 self.assertEqual(len(rset.rows), 2, rset.rows) |
|
605 rset = self.execute('DISTINCT Any P WHERE P connait S OR S connait P, S nom "chouette"') |
|
606 self.assertEqual(len(rset.rows), 2, rset.rows) |
|
607 |
|
608 def test_select_follow_relation(self): |
|
609 self.execute("INSERT Affaire X: X sujet 'cool'") |
|
610 self.execute("INSERT Societe X: X nom 'chouette'") |
|
611 self.execute("SET A concerne S WHERE A is Affaire, S is Societe") |
|
612 self.execute("INSERT Note X: X para 'truc'") |
|
613 self.execute("SET S evaluee N WHERE S is Societe, N is Note") |
|
614 self.execute("INSERT Societe X: X nom 'bidule'") |
|
615 self.execute("INSERT Note X: X para 'troc'") |
|
616 self.execute("SET S evaluee N WHERE S nom 'bidule', N para 'troc'") |
|
617 rset = self.execute('DISTINCT Any A,N WHERE A concerne S, S evaluee N') |
|
618 self.assertEqual(len(rset.rows), 1, rset.rows) |
|
619 |
|
620 def test_select_ordered_distinct_1(self): |
|
621 self.execute("INSERT Affaire X: X sujet 'cool', X ref '1'") |
|
622 self.execute("INSERT Affaire X: X sujet 'cool', X ref '2'") |
|
623 rset = self.execute('DISTINCT Any S ORDERBY R WHERE A is Affaire, A sujet S, A ref R') |
|
624 self.assertEqual(rset.rows, [['cool']]) |
|
625 |
|
626 def test_select_ordered_distinct_2(self): |
|
627 self.execute("INSERT Affaire X: X sujet 'minor'") |
|
628 self.execute("INSERT Affaire X: X sujet 'important'") |
|
629 self.execute("INSERT Affaire X: X sujet 'normal'") |
|
630 self.execute("INSERT Affaire X: X sujet 'zou'") |
|
631 self.execute("INSERT Affaire X: X sujet 'abcd'") |
|
632 rset = self.execute('DISTINCT Any S ORDERBY S WHERE A is Affaire, A sujet S') |
|
633 self.assertEqual(rset.rows, [['abcd'], ['important'], ['minor'], ['normal'], ['zou']]) |
|
634 |
|
635 def test_select_ordered_distinct_3(self): |
|
636 rset = self.execute('DISTINCT Any N ORDERBY GROUP_SORT_VALUE(N) WHERE X is EGroup, X name N') |
|
637 self.assertEqual(rset.rows, [['owners'], ['guests'], ['users'], ['managers']]) |
|
638 |
|
639 def test_select_or_value(self): |
|
640 rset = self.execute('Any U WHERE U in_group G, G name "owners" OR G name "users"') |
|
641 self.assertEqual(len(rset.rows), 0) |
|
642 rset = self.execute('Any U WHERE U in_group G, G name "guests" OR G name "managers"') |
|
643 self.assertEqual(len(rset.rows), 2) |
|
644 |
|
645 def test_select_explicit_eid(self): |
|
646 rset = self.execute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s', {'u': self.session.user.eid}) |
|
647 self.failUnless(rset) |
|
648 self.assertEquals(rset.description[0][1], 'Int') |
|
649 |
|
650 # def test_select_rewritten_optional(self): |
|
651 # eid = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0] |
|
652 # rset = self.execute('Any X WHERE X eid %(x)s, EXISTS(X owned_by U) OR EXISTS(X concerne S?, S owned_by U)', |
|
653 # {'x': eid}, 'x') |
|
654 # self.assertEquals(rset.rows, [[eid]]) |
|
655 |
|
656 def test_today_bug(self): |
|
657 self.execute("INSERT Tag X: X name 'bidule', X creation_date TODAY") |
|
658 self.execute("INSERT Tag Y: Y name 'toto'") |
|
659 rset = self.execute("Any D WHERE X name in ('bidule', 'toto') , X creation_date D") |
|
660 self.assert_(isinstance(rset.rows[0][0], DateTimeType), rset.rows) |
|
661 rset = self.execute('Tag X WHERE X creation_date TODAY') |
|
662 self.assertEqual(len(rset.rows), 2) |
|
663 rset = self.execute('Any MAX(D) WHERE X is Tag, X creation_date D') |
|
664 self.failUnless(isinstance(rset[0][0], DateTimeType), type(rset[0][0])) |
|
665 |
|
666 def test_today(self): |
|
667 self.execute("INSERT Tag X: X name 'bidule', X creation_date TODAY") |
|
668 self.execute("INSERT Tag Y: Y name 'toto'") |
|
669 rset = self.execute('Tag X WHERE X creation_date TODAY') |
|
670 self.assertEqual(len(rset.rows), 2) |
|
671 |
|
672 def test_select_boolean(self): |
|
673 rset = self.execute('Any N WHERE X is EEType, X name N, X final %(val)s', |
|
674 {'val': True}) |
|
675 self.assertEquals(sorted(r[0] for r in rset.rows), ['Boolean', 'Bytes', |
|
676 'Date', 'Datetime', |
|
677 'Decimal', 'Float', |
|
678 'Int', 'Interval', |
|
679 'Password', 'String', |
|
680 'Time']) |
|
681 rset = self.execute('Any N WHERE X is EEType, X name N, X final TRUE') |
|
682 self.assertEquals(sorted(r[0] for r in rset.rows), ['Boolean', 'Bytes', |
|
683 'Date', 'Datetime', |
|
684 'Decimal', 'Float', |
|
685 'Int', 'Interval', |
|
686 'Password', 'String', |
|
687 'Time']) |
|
688 |
|
689 def test_select_constant(self): |
|
690 rset = self.execute('Any X, "toto" ORDERBY X WHERE X is EGroup') |
|
691 self.assertEquals(rset.rows, |
|
692 map(list, zip((1,2,3,4), ('toto','toto','toto','toto',)))) |
|
693 self.assertIsInstance(rset[0][1], unicode) |
|
694 self.assertEquals(rset.description, |
|
695 zip(('EGroup', 'EGroup', 'EGroup', 'EGroup'), |
|
696 ('String', 'String', 'String', 'String',))) |
|
697 rset = self.execute('Any X, %(value)s ORDERBY X WHERE X is EGroup', {'value': 'toto'}) |
|
698 self.assertEquals(rset.rows, |
|
699 map(list, zip((1,2,3,4), ('toto','toto','toto','toto',)))) |
|
700 self.assertIsInstance(rset[0][1], unicode) |
|
701 self.assertEquals(rset.description, |
|
702 zip(('EGroup', 'EGroup', 'EGroup', 'EGroup'), |
|
703 ('String', 'String', 'String', 'String',))) |
|
704 rset = self.execute('Any X,GN WHERE X is EUser, G is EGroup, X login "syt", X in_group G, G name GN') |
|
705 |
|
706 def test_select_union(self): |
|
707 rset = self.execute('Any X,N ORDERBY N WITH X,N BEING ' |
|
708 '((Any X,N WHERE X name N, X transition_of E, E name %(name)s)' |
|
709 ' UNION ' |
|
710 '(Any X,N WHERE X name N, X state_of E, E name %(name)s))', |
|
711 {'name': 'EUser'}) |
|
712 self.assertEquals([x[1] for x in rset.rows], |
|
713 ['activate', 'activated', 'deactivate', 'deactivated']) |
|
714 self.assertEquals(rset.description, |
|
715 [('Transition', 'String'), ('State', 'String'), |
|
716 ('Transition', 'String'), ('State', 'String')]) |
|
717 |
|
718 def test_select_union_aggregat(self): |
|
719 # meaningless, the goal in to have group by done on different attribute |
|
720 # for each sub-query |
|
721 self.execute('(Any N,COUNT(X) GROUPBY N WHERE X name N, X is State)' |
|
722 ' UNION ' |
|
723 '(Any N,COUNT(X) GROUPBY N ORDERBY 2 WHERE X login N)') |
|
724 |
|
725 def test_select_union_aggregat_independant_group(self): |
|
726 self.execute('INSERT State X: X name "hop"') |
|
727 self.execute('INSERT State X: X name "hop"') |
|
728 self.execute('INSERT Transition X: X name "hop"') |
|
729 self.execute('INSERT Transition X: X name "hop"') |
|
730 rset = self.execute('Any N,NX ORDERBY 2 WITH N,NX BEING ' |
|
731 '((Any N,COUNT(X) GROUPBY N WHERE X name N, X is State HAVING COUNT(X)>1)' |
|
732 ' UNION ' |
|
733 '(Any N,COUNT(X) GROUPBY N WHERE X name N, X is Transition HAVING COUNT(X)>1))') |
|
734 self.assertEquals(rset.rows, [[u'hop', 2], [u'hop', 2]]) |
|
735 |
|
736 def test_select_union_selection_with_diff_variables(self): |
|
737 rset = self.execute('(Any N WHERE X name N, X is State)' |
|
738 ' UNION ' |
|
739 '(Any NN WHERE XX name NN, XX is Transition)') |
|
740 self.assertEquals(sorted(r[0] for r in rset.rows), |
|
741 ['abort', 'activate', 'activated', 'ben non', |
|
742 'deactivate', 'deactivated', 'done', 'en cours', |
|
743 'end', 'finie', 'markasdone', 'pitetre', 'redoit', |
|
744 'start', 'todo']) |
|
745 |
|
746 def test_exists(self): |
|
747 geid = self.execute("INSERT EGroup X: X name 'lulufanclub'")[0][0] |
|
748 self.execute("SET U in_group G WHERE G name 'lulufanclub'") |
|
749 peid = self.execute("INSERT Personne X: X prenom 'lulu', X nom 'petit'")[0][0] |
|
750 rset = self.execute("Any X WHERE X prenom 'lulu'," |
|
751 "EXISTS (U in_group G, G name 'lulufanclub' OR G name 'managers');") |
|
752 self.assertEquals(rset.rows, [[peid]]) |
|
753 |
|
754 def test_identity(self): |
|
755 eid = self.execute('Any X WHERE X identity Y, Y eid 1')[0][0] |
|
756 self.assertEquals(eid, 1) |
|
757 eid = self.execute('Any X WHERE Y identity X, Y eid 1')[0][0] |
|
758 self.assertEquals(eid, 1) |
|
759 login = self.execute('Any L WHERE X login "admin", X identity Y, Y login L')[0][0] |
|
760 self.assertEquals(login, 'admin') |
|
761 |
|
762 def test_select_date_mathexp(self): |
|
763 rset = self.execute('Any X, TODAY - CD WHERE X is EUser, X creation_date CD') |
|
764 self.failUnless(rset) |
|
765 self.failUnlessEqual(rset.description[0][1], 'Interval') |
|
766 eid, = self.execute("INSERT Personne X: X nom 'bidule'")[0] |
|
767 rset = self.execute('Any X, NOW - CD WHERE X is Personne, X creation_date CD') |
|
768 self.failUnlessEqual(rset.description[0][1], 'Interval') |
|
769 # sqlite bug |
|
770 #from mx.DateTime import DateTimeDeltaType |
|
771 #self.assertIsInstance(rset[0][1], DateTimeDeltaType) |
|
772 #self.failUnless(rset[0][1].seconds > 0) |
|
773 |
|
774 def test_select_subquery_aggregat(self): |
|
775 # percent users by groups |
|
776 self.execute('SET X in_group G WHERE G name "users"') |
|
777 rset = self.execute('Any GN, COUNT(X)*100/T GROUPBY GN ORDERBY 2,1' |
|
778 ' WHERE G name GN, X in_group G' |
|
779 ' WITH T BEING (Any COUNT(U) WHERE U is EUser)') |
|
780 self.assertEquals(rset.rows, [[u'guests', 50], [u'managers', 50], [u'users', 100]]) |
|
781 self.assertEquals(rset.description, [('String', 'Int'), ('String', 'Int'), ('String', 'Int')]) |
|
782 |
|
783 def test_select_subquery_const(self): |
|
784 rset = self.execute('Any X WITH X BEING ((Any NULL) UNION (Any "toto"))') |
|
785 self.assertEquals(rset.rows, [[None], ['toto']]) |
|
786 self.assertEquals(rset.description, [(None,), ('String',)]) |
|
787 |
|
788 # insertion queries tests ################################################# |
|
789 |
|
790 def test_insert_is(self): |
|
791 eid, = self.execute("INSERT Personne X: X nom 'bidule'")[0] |
|
792 etype, = self.execute("Any TN WHERE X is T, X eid %s, T name TN" % eid)[0] |
|
793 self.assertEquals(etype, 'Personne') |
|
794 self.execute("INSERT Personne X: X nom 'managers'") |
|
795 |
|
796 def test_insert_1(self): |
|
797 rset = self.execute("INSERT Personne X: X nom 'bidule'") |
|
798 self.assertEquals(len(rset.rows), 1) |
|
799 self.assertEquals(rset.description, [('Personne',)]) |
|
800 rset = self.execute('Personne X WHERE X nom "bidule"') |
|
801 self.assert_(rset.rows) |
|
802 self.assertEquals(rset.description, [('Personne',)]) |
|
803 |
|
804 def test_insert_1_multiple(self): |
|
805 self.execute("INSERT Personne X: X nom 'bidule'") |
|
806 self.execute("INSERT Personne X: X nom 'chouette'") |
|
807 rset = self.execute("INSERT Societe Y: Y nom N, P travaille Y WHERE P nom N") |
|
808 self.assertEquals(len(rset.rows), 2) |
|
809 self.assertEquals(rset.description, [('Societe',), ('Societe',)]) |
|
810 |
|
811 def test_insert_2(self): |
|
812 rset = self.execute("INSERT Personne X, Personne Y: X nom 'bidule', Y nom 'tutu'") |
|
813 self.assertEquals(rset.description, [('Personne', 'Personne')]) |
|
814 rset = self.execute('Personne X WHERE X nom "bidule" or X nom "tutu"') |
|
815 self.assert_(rset.rows) |
|
816 self.assertEquals(rset.description, [('Personne',), ('Personne',)]) |
|
817 |
|
818 def test_insert_3(self): |
|
819 self.execute("INSERT Personne X: X nom Y WHERE U login 'admin', U login Y") |
|
820 rset = self.execute('Personne X WHERE X nom "admin"') |
|
821 self.assert_(rset.rows) |
|
822 self.assertEquals(rset.description, [('Personne',)]) |
|
823 |
|
824 def test_insert_4(self): |
|
825 self.execute("INSERT Societe Y: Y nom 'toto'") |
|
826 self.execute("INSERT Personne X: X nom 'bidule', X travaille Y WHERE Y nom 'toto'") |
|
827 rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y') |
|
828 self.assert_(rset.rows) |
|
829 self.assertEquals(rset.description, [('Personne', 'Societe',)]) |
|
830 |
|
831 def test_insert_4bis(self): |
|
832 peid = self.execute("INSERT Personne X: X nom 'bidule'")[0][0] |
|
833 seid = self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s", |
|
834 {'x': str(peid)})[0][0] |
|
835 self.assertEqual(len(self.execute('Any X, Y WHERE X travaille Y')), 1) |
|
836 self.execute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s", |
|
837 {'x': str(seid)}) |
|
838 self.assertEqual(len(self.execute('Any X, Y WHERE X travaille Y')), 2) |
|
839 |
|
840 def test_insert_4ter(self): |
|
841 peid = self.execute("INSERT Personne X: X nom 'bidule'")[0][0] |
|
842 seid = self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s", |
|
843 {'x': unicode(peid)})[0][0] |
|
844 self.assertEqual(len(self.execute('Any X, Y WHERE X travaille Y')), 1) |
|
845 self.execute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s", |
|
846 {'x': unicode(seid)}) |
|
847 self.assertEqual(len(self.execute('Any X, Y WHERE X travaille Y')), 2) |
|
848 |
|
849 def test_insert_5(self): |
|
850 self.execute("INSERT Personne X: X nom 'bidule'") |
|
851 self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X nom 'bidule'") |
|
852 rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y') |
|
853 self.assert_(rset.rows) |
|
854 self.assertEquals(rset.description, [('Personne', 'Societe',)]) |
|
855 |
|
856 def test_insert_6(self): |
|
857 self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto', X travaille Y") |
|
858 rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y') |
|
859 self.assert_(rset.rows) |
|
860 self.assertEquals(rset.description, [('Personne', 'Societe',)]) |
|
861 |
|
862 def test_insert_7(self): |
|
863 self.execute("INSERT Personne X, Societe Y: X nom N, Y nom 'toto', X travaille Y WHERE U login 'admin', U login N") |
|
864 rset = self.execute('Any X, Y WHERE X nom "admin", Y nom "toto", X travaille Y') |
|
865 self.assert_(rset.rows) |
|
866 self.assertEquals(rset.description, [('Personne', 'Societe',)]) |
|
867 |
|
868 def test_insert_8(self): |
|
869 self.execute("INSERT Societe Y, Personne X: Y nom N, X nom 'toto', X travaille Y WHERE U login 'admin', U login N") |
|
870 rset = self.execute('Any X, Y WHERE X nom "toto", Y nom "admin", X travaille Y') |
|
871 self.assert_(rset.rows) |
|
872 self.assertEquals(rset.description, [('Personne', 'Societe',)]) |
|
873 |
|
874 def test_insert_query_error(self): |
|
875 self.assertRaises(Exception, |
|
876 self.execute, |
|
877 "INSERT Personne X: X nom 'toto', X is Personne") |
|
878 self.assertRaises(Exception, |
|
879 self.execute, |
|
880 "INSERT Personne X: X nom 'toto', X is_instance_of Personne") |
|
881 self.assertRaises(QueryError, |
|
882 self.execute, |
|
883 "INSERT Personne X: X nom 'toto', X has_text 'tutu'") |
|
884 |
|
885 self.assertRaises(QueryError, |
|
886 self.execute, |
|
887 "INSERT EUser X: X login 'toto', X eid %s" % cnx.user(self.session).eid) |
|
888 |
|
889 def test_insertion_description_with_where(self): |
|
890 rset = self.execute('INSERT EUser E, EmailAddress EM: E login "X", E upassword "X", ' |
|
891 'E primary_email EM, EM address "X", E in_group G ' |
|
892 'WHERE G name "managers"') |
|
893 self.assertEquals(list(rset.description[0]), ['EUser', 'EmailAddress']) |
|
894 |
|
895 # deletion queries tests ################################################## |
|
896 |
|
897 def test_delete_1(self): |
|
898 self.execute("INSERT Personne Y: Y nom 'toto'") |
|
899 rset = self.execute('Personne X WHERE X nom "toto"') |
|
900 self.assertEqual(len(rset.rows), 1) |
|
901 self.execute("DELETE Personne Y WHERE Y nom 'toto'") |
|
902 rset = self.execute('Personne X WHERE X nom "toto"') |
|
903 self.assertEqual(len(rset.rows), 0) |
|
904 |
|
905 def test_delete_2(self): |
|
906 rset = self.execute("INSERT Personne X, Personne Y, Societe Z : X nom 'syt', Y nom 'adim', Z nom 'Logilab', X travaille Z, Y travaille Z") |
|
907 self.assertEquals(len(rset), 1) |
|
908 self.assertEquals(len(rset[0]), 3) |
|
909 self.assertEquals(rset.description[0], ('Personne', 'Personne', 'Societe')) |
|
910 self.assertEquals(self.execute('Any N WHERE X nom N, X eid %s'% rset[0][0])[0][0], 'syt') |
|
911 rset = self.execute('Personne X WHERE X travaille Y, Y nom "Logilab"') |
|
912 self.assertEqual(len(rset.rows), 2, rset.rows) |
|
913 self.execute("DELETE X travaille Y WHERE X is Personne, Y nom 'Logilabo'") |
|
914 rset = self.execute('Personne X WHERE X travaille Y, Y nom "Logilab"') |
|
915 self.assertEqual(len(rset.rows), 2, rset.rows) |
|
916 self.execute("DELETE X travaille Y WHERE X is Personne, Y nom 'Logilab'") |
|
917 rset = self.execute('Personne X WHERE X travaille Y, Y nom "Logilab"') |
|
918 self.assertEqual(len(rset.rows), 0, rset.rows) |
|
919 |
|
920 def test_delete_3(self): |
|
921 u, s = self._user_session(('users',)) |
|
922 peid, = self.o.execute(s, "INSERT Personne P: P nom 'toto'")[0] |
|
923 seid, = self.o.execute(s, "INSERT Societe S: S nom 'logilab'")[0] |
|
924 self.o.execute(s, "SET P travaille S") |
|
925 rset = self.execute('Personne P WHERE P travaille S') |
|
926 self.assertEqual(len(rset.rows), 1) |
|
927 self.execute("DELETE X travaille Y WHERE X eid %s, Y eid %s" % (peid, seid)) |
|
928 rset = self.execute('Personne P WHERE P travaille S') |
|
929 self.assertEqual(len(rset.rows), 0) |
|
930 |
|
931 def test_delete_symetric(self): |
|
932 teid1 = self.execute("INSERT Folder T: T name 'toto'")[0][0] |
|
933 teid2 = self.execute("INSERT Folder T: T name 'tutu'")[0][0] |
|
934 self.execute('SET X see_also Y WHERE X eid %s, Y eid %s' % (teid1, teid2)) |
|
935 rset = self.execute('Any X,Y WHERE X see_also Y') |
|
936 self.assertEquals(len(rset) , 2, rset.rows) |
|
937 self.execute('DELETE X see_also Y WHERE X eid %s, Y eid %s' % (teid1, teid2)) |
|
938 rset = self.execute('Any X,Y WHERE X see_also Y') |
|
939 self.assertEquals(len(rset) , 0) |
|
940 self.execute('SET X see_also Y WHERE X eid %s, Y eid %s' % (teid1, teid2)) |
|
941 rset = self.execute('Any X,Y WHERE X see_also Y') |
|
942 self.assertEquals(len(rset) , 2) |
|
943 self.execute('DELETE X see_also Y WHERE X eid %s, Y eid %s' % (teid2, teid1)) |
|
944 rset = self.execute('Any X,Y WHERE X see_also Y') |
|
945 self.assertEquals(len(rset) , 0) |
|
946 |
|
947 def test_nonregr_delete_cache(self): |
|
948 """test that relations are properly cleaned when an entity is deleted |
|
949 (using cachekey on sql generation returned always the same query for an eid, |
|
950 whatever the relation) |
|
951 """ |
|
952 u, s = self._user_session(('users',)) |
|
953 aeid, = self.o.execute(s, 'INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')[0] |
|
954 # XXX would be nice if the rql below was enough... |
|
955 #'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y' |
|
956 eeid, = self.o.execute(s, 'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y WHERE Y is EmailAddress')[0] |
|
957 self.o.execute(s, "DELETE Email X") |
|
958 sqlc = s.pool['system'] |
|
959 sqlc.execute('SELECT * FROM recipients_relation') |
|
960 self.assertEquals(len(sqlc.fetchall()), 0) |
|
961 sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid) |
|
962 self.assertEquals(len(sqlc.fetchall()), 0) |
|
963 |
|
964 def test_nonregr_delete_cache2(self): |
|
965 eid = self.execute("INSERT Folder T: T name 'toto'")[0][0] |
|
966 self.commit() |
|
967 # fill the cache |
|
968 self.execute("Any X WHERE X eid %(x)s", {'x': eid}, 'x') |
|
969 self.execute("Any X WHERE X eid %s" %eid) |
|
970 self.execute("Folder X WHERE X eid %(x)s", {'x': eid}, 'x') |
|
971 self.execute("Folder X WHERE X eid %s" %eid) |
|
972 self.execute("DELETE Folder T WHERE T eid %s"%eid) |
|
973 self.commit() |
|
974 rset = self.execute("Any X WHERE X eid %(x)s", {'x': eid}, 'x') |
|
975 self.assertEquals(rset.rows, []) |
|
976 rset = self.execute("Any X WHERE X eid %s" %eid) |
|
977 self.assertEquals(rset.rows, []) |
|
978 rset = self.execute("Folder X WHERE X eid %(x)s", {'x': eid}, 'x') |
|
979 self.assertEquals(rset.rows, []) |
|
980 rset = self.execute("Folder X WHERE X eid %s" %eid) |
|
981 self.assertEquals(rset.rows, []) |
|
982 |
|
983 # update queries tests #################################################### |
|
984 |
|
985 def test_update_1(self): |
|
986 self.execute("INSERT Personne Y: Y nom 'toto'") |
|
987 rset = self.execute('Personne X WHERE X nom "toto"') |
|
988 self.assertEqual(len(rset.rows), 1) |
|
989 self.execute("SET X nom 'tutu', X prenom 'original' WHERE X is Personne, X nom 'toto'") |
|
990 rset = self.execute('Any Y, Z WHERE X is Personne, X nom Y, X prenom Z') |
|
991 self.assertEqual(tuplify(rset.rows), [('tutu', 'original')]) |
|
992 |
|
993 def test_update_2(self): |
|
994 self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'") |
|
995 #rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto"') |
|
996 #self.assertEqual(len(rset.rows), 1) |
|
997 #rset = self.execute('Any X, Y WHERE X travaille Y') |
|
998 #self.assertEqual(len(rset.rows), 0) |
|
999 self.execute("SET X travaille Y WHERE X nom 'bidule', Y nom 'toto'") |
|
1000 rset = self.execute('Any X, Y WHERE X travaille Y') |
|
1001 self.assertEqual(len(rset.rows), 1) |
|
1002 |
|
1003 def test_update_2bis(self): |
|
1004 rset = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'") |
|
1005 eid1, eid2 = rset[0][0], rset[0][1] |
|
1006 self.execute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s", |
|
1007 {'x': str(eid1), 'y': str(eid2)}) |
|
1008 rset = self.execute('Any X, Y WHERE X travaille Y') |
|
1009 self.assertEqual(len(rset.rows), 1) |
|
1010 |
|
1011 def test_update_2ter(self): |
|
1012 rset = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'") |
|
1013 eid1, eid2 = rset[0][0], rset[0][1] |
|
1014 self.execute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s", |
|
1015 {'x': unicode(eid1), 'y': unicode(eid2)}) |
|
1016 rset = self.execute('Any X, Y WHERE X travaille Y') |
|
1017 self.assertEqual(len(rset.rows), 1) |
|
1018 |
|
1019 ## def test_update_4(self): |
|
1020 ## self.execute("SET X know Y WHERE X ami Y") |
|
1021 |
|
1022 def test_update_multiple1(self): |
|
1023 peid1 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0] |
|
1024 peid2 = self.execute("INSERT Personne Y: Y nom 'toto'")[0][0] |
|
1025 self.execute("SET X nom 'tutu', Y nom 'toto' WHERE X nom 'toto', Y nom 'tutu'") |
|
1026 self.assertEquals(self.execute('Any X WHERE X nom "toto"').rows, [[peid1]]) |
|
1027 self.assertEquals(self.execute('Any X WHERE X nom "tutu"').rows, [[peid2]]) |
|
1028 |
|
1029 def test_update_multiple2(self): |
|
1030 ueid = self.execute("INSERT EUser X: X login 'bob', X upassword 'toto'")[0][0] |
|
1031 peid1 = self.execute("INSERT Personne Y: Y nom 'turlu'")[0][0] |
|
1032 peid2 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0] |
|
1033 self.execute('SET P1 owned_by U, P2 owned_by U ' |
|
1034 'WHERE P1 eid %s, P2 eid %s, U eid %s' % (peid1, peid2, ueid)) |
|
1035 self.failUnless(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' |
|
1036 % (peid1, ueid))) |
|
1037 self.failUnless(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' |
|
1038 % (peid2, ueid))) |
|
1039 |
|
1040 def test_update_math_expr(self): |
|
1041 orders = [r[0] for r in self.execute('Any O ORDERBY O WHERE ST name "Personne", X from_entity ST, X ordernum O')] |
|
1042 for i,v in enumerate(orders): |
|
1043 if v != orders[0]: |
|
1044 splitidx = i |
|
1045 break |
|
1046 self.execute('SET X ordernum Y+1 WHERE X from_entity SE, SE name "Personne", X ordernum Y, X ordernum >= %(order)s', |
|
1047 {'order': orders[splitidx]}) |
|
1048 orders2 = [r[0] for r in self.execute('Any O ORDERBY O WHERE ST name "Personne", X from_entity ST, X ordernum O')] |
|
1049 orders = orders[:splitidx] + [o+1 for o in orders[splitidx:]] |
|
1050 self.assertEquals(orders2, orders) |
|
1051 |
|
1052 def test_update_string_concat(self): |
|
1053 beid = self.execute("INSERT Bookmark Y: Y title 'toto', Y path '/view'")[0][0] |
|
1054 self.execute('SET X title XN + %(suffix)s WHERE X is Bookmark, X title XN', {'suffix': u'-moved'}) |
|
1055 newname = self.execute('Any XN WHERE X eid %(x)s, X title XN', {'x': beid}, 'x')[0][0] |
|
1056 self.assertEquals(newname, 'toto-moved') |
|
1057 |
|
1058 def test_update_query_error(self): |
|
1059 self.execute("INSERT Personne Y: Y nom 'toto'") |
|
1060 self.assertRaises(Exception, self.execute, "SET X nom 'toto', X is Personne") |
|
1061 self.assertRaises(QueryError, self.execute, "SET X nom 'toto', X has_text 'tutu' WHERE X is Personne") |
|
1062 self.assertRaises(QueryError, self.execute, "SET X login 'tutu', X eid %s" % cnx.user(self.session).eid) |
|
1063 |
|
1064 |
|
1065 # upassword encryption tests ################################################# |
|
1066 |
|
1067 def test_insert_upassword(self): |
|
1068 rset = self.execute("INSERT EUser X: X login 'bob', X upassword 'toto'") |
|
1069 self.assertEquals(len(rset.rows), 1) |
|
1070 self.assertEquals(rset.description, [('EUser',)]) |
|
1071 self.assertRaises(Unauthorized, |
|
1072 self.execute, "Any P WHERE X is EUser, X login 'bob', X upassword P") |
|
1073 cursor = self.pool['system'] |
|
1074 cursor.execute("SELECT upassword from EUser WHERE login='bob'") |
|
1075 passwd = cursor.fetchone()[0].getvalue() |
|
1076 self.assertEquals(passwd, crypt_password('toto', passwd[:2])) |
|
1077 rset = self.execute("Any X WHERE X is EUser, X login 'bob', X upassword '%s'" % passwd) |
|
1078 self.assertEquals(len(rset.rows), 1) |
|
1079 self.assertEquals(rset.description, [('EUser',)]) |
|
1080 |
|
1081 def test_update_upassword(self): |
|
1082 cursor = self.pool['system'] |
|
1083 rset = self.execute("INSERT EUser X: X login 'bob', X upassword %(pwd)s", {'pwd': 'toto'}) |
|
1084 self.assertEquals(rset.description[0][0], 'EUser') |
|
1085 rset = self.execute("SET X upassword %(pwd)s WHERE X is EUser, X login 'bob'", |
|
1086 {'pwd': 'tutu'}) |
|
1087 cursor.execute("SELECT upassword from EUser WHERE login='bob'") |
|
1088 passwd = cursor.fetchone()[0].getvalue() |
|
1089 self.assertEquals(passwd, crypt_password('tutu', passwd[:2])) |
|
1090 rset = self.execute("Any X WHERE X is EUser, X login 'bob', X upassword '%s'" % passwd) |
|
1091 self.assertEquals(len(rset.rows), 1) |
|
1092 self.assertEquals(rset.description, [('EUser',)]) |
|
1093 |
|
1094 # non regression tests #################################################### |
|
1095 |
|
1096 def test_nonregr_1(self): |
|
1097 teid = self.execute("INSERT Tag X: X name 'tag'")[0][0] |
|
1098 self.execute("SET X tags Y WHERE X name 'tag', Y is State, Y name 'activated'") |
|
1099 rset = self.execute('Any X WHERE T tags X') |
|
1100 self.assertEquals(len(rset.rows), 1, rset.rows) |
|
1101 rset = self.execute('Any T WHERE T tags X, X is State') |
|
1102 self.assertEquals(rset.rows, [[teid]]) |
|
1103 rset = self.execute('Any T WHERE T tags X') |
|
1104 self.assertEquals(rset.rows, [[teid]]) |
|
1105 |
|
1106 def test_nonregr_2(self): |
|
1107 teid = self.execute("INSERT Tag X: X name 'tag'")[0][0] |
|
1108 geid = self.execute("EGroup G WHERE G name 'users'")[0][0] |
|
1109 self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s", |
|
1110 {'g': geid, 't': teid}) |
|
1111 rset = self.execute('Any X WHERE E eid %(x)s, E tags X', |
|
1112 {'x': teid}) |
|
1113 self.assertEquals(rset.rows, [[geid]]) |
|
1114 |
|
1115 def test_nonregr_3(self): |
|
1116 """bad sql generated on the second query (destination_state is not |
|
1117 detected as an inlined relation) |
|
1118 """ |
|
1119 rset = self.execute('Any S,ES,T WHERE S state_of ET, ET name "EUser",' |
|
1120 'ES allowed_transition T, T destination_state S') |
|
1121 self.assertEquals(len(rset.rows), 2) |
|
1122 |
|
1123 def test_nonregr_4(self): |
|
1124 # fix variables'type, else we get (nb of entity types with a 'name' attribute)**3 |
|
1125 # union queries and that make for instance a 266Ko sql query which is refused |
|
1126 # by the server (or client lib) |
|
1127 rset = self.execute('Any ER,SE,OE WHERE SE name "Comment", ER name "comments", OE name "Comment",' |
|
1128 'ER is ERType, SE is EEType, OE is EEType') |
|
1129 self.assertEquals(len(rset), 1) |
|
1130 |
|
1131 def test_nonregr_5(self): |
|
1132 # jpl #15505: equivalent queries returning different result sets |
|
1133 teid1 = self.execute("INSERT Folder X: X name 'hop'")[0][0] |
|
1134 teid2 = self.execute("INSERT Folder X: X name 'hip'")[0][0] |
|
1135 neid = self.execute("INSERT Note X: X todo_by U, X filed_under T WHERE U login 'admin', T name 'hop'")[0][0] |
|
1136 weid = self.execute("INSERT Affaire X: X concerne N, X filed_under T WHERE N is Note, T name 'hip'")[0][0] |
|
1137 rset1 = self.execute('Any N,U WHERE N filed_under T, T eid %s,' |
|
1138 'N todo_by U, W concerne N,' |
|
1139 'W is Affaire, W filed_under A, A eid %s' % (teid1, teid2)) |
|
1140 rset2 = self.execute('Any N,U WHERE N filed_under T, T eid %s,' |
|
1141 'N todo_by U, W concerne N,' |
|
1142 'W filed_under A, A eid %s' % (teid1, teid2)) |
|
1143 rset3 = self.execute('Any N,U WHERE N todo_by U, T eid %s,' |
|
1144 'N filed_under T, W concerne N,' |
|
1145 'W is Affaire, W filed_under A, A eid %s' % (teid1, teid2)) |
|
1146 rset4 = self.execute('Any N,U WHERE N todo_by U, T eid %s,' |
|
1147 'N filed_under T, W concerne N,' |
|
1148 'W filed_under A, A eid %s' % (teid1, teid2)) |
|
1149 self.assertEquals(rset1.rows, rset2.rows) |
|
1150 self.assertEquals(rset1.rows, rset3.rows) |
|
1151 self.assertEquals(rset1.rows, rset4.rows) |
|
1152 |
|
1153 def test_nonregr_6(self): |
|
1154 self.execute('Any N,COUNT(S) GROUPBY N ORDERBY COUNT(N) WHERE S name N, S is State') |
|
1155 |
|
1156 def test_sqlite_encoding(self): |
|
1157 """XXX this test was trying to show a bug on use of lower which only |
|
1158 occurs with non ascii string and misconfigured locale |
|
1159 """ |
|
1160 self.execute("INSERT Tag X: X name %(name)s," |
|
1161 "X modification_date %(modification_date)s," |
|
1162 "X creation_date %(creation_date)s", |
|
1163 {'name': u'éname0', |
|
1164 'modification_date': '2003/03/12 11:00', |
|
1165 'creation_date': '2000/07/03 11:00'}) |
|
1166 rset = self.execute('Any lower(N) ORDERBY LOWER(N) WHERE X is Tag, X name N,' |
|
1167 'X owned_by U, U eid %(x)s', |
|
1168 {'x':self.session.user.eid}, 'x') |
|
1169 self.assertEquals(rset.rows, [[u'\xe9name0']]) |
|
1170 |
|
1171 |
|
1172 def test_nonregr_description(self): |
|
1173 """check that a correct description is built in case where infered |
|
1174 solutions may be "fusionned" into one by the querier while all solutions |
|
1175 are needed to build the result's description |
|
1176 """ |
|
1177 self.execute("INSERT Personne X: X nom 'bidule'") |
|
1178 self.execute("INSERT Societe Y: Y nom 'toto'") |
|
1179 beid = self.execute("INSERT Basket B: B name 'mybasket'")[0][0] |
|
1180 self.execute("SET X in_basket B WHERE X is Personne") |
|
1181 self.execute("SET X in_basket B WHERE X is Societe") |
|
1182 rset = self.execute('Any X WHERE X in_basket B, B eid %s' % beid) |
|
1183 self.assertEquals(len(rset), 2) |
|
1184 self.assertEquals(rset.description, [('Personne',), ('Societe',)]) |
|
1185 |
|
1186 |
|
1187 def test_nonregr_cache_1(self): |
|
1188 peid = self.execute("INSERT Personne X: X nom 'bidule'")[0][0] |
|
1189 beid = self.execute("INSERT Basket X: X name 'tag'")[0][0] |
|
1190 self.execute("SET X in_basket Y WHERE X is Personne, Y eid %(y)s", |
|
1191 {'y': beid}) |
|
1192 rset = self.execute("Any X WHERE X in_basket B, B eid %(x)s", |
|
1193 {'x': beid}) |
|
1194 self.assertEquals(rset.rows, [[peid]]) |
|
1195 rset = self.execute("Any X WHERE X in_basket B, B eid %(x)s", |
|
1196 {'x': beid}) |
|
1197 self.assertEquals(rset.rows, [[peid]]) |
|
1198 |
|
1199 def test_nonregr_has_text_cache(self): |
|
1200 eid1 = self.execute("INSERT Personne X: X nom 'bidule'")[0][0] |
|
1201 eid2 = self.execute("INSERT Personne X: X nom 'tag'")[0][0] |
|
1202 self.commit() |
|
1203 rset = self.execute("Any X WHERE X has_text %(text)s", {'text': 'bidule'}) |
|
1204 self.assertEquals(rset.rows, [[eid1]]) |
|
1205 rset = self.execute("Any X WHERE X has_text %(text)s", {'text': 'tag'}) |
|
1206 self.assertEquals(rset.rows, [[eid2]]) |
|
1207 |
|
1208 def test_nonregr_sortterm_management(self): |
|
1209 """Error: Variable has no attribute 'sql' in rql2sql.py (visit_variable) |
|
1210 |
|
1211 cause: old variable ref inserted into a fresh rqlst copy |
|
1212 (in RQLSpliter._complex_select_plan) |
|
1213 """ |
|
1214 self.execute('Any X ORDERBY D DESC WHERE X creation_date D') |
|
1215 |
|
1216 def test_nonregr_extra_joins(self): |
|
1217 ueid = self.session.user.eid |
|
1218 teid1 = self.execute("INSERT Folder X: X name 'folder1'")[0][0] |
|
1219 teid2 = self.execute("INSERT Folder X: X name 'folder2'")[0][0] |
|
1220 neid1 = self.execute("INSERT Note X: X para 'note1'")[0][0] |
|
1221 neid2 = self.execute("INSERT Note X: X para 'note2'")[0][0] |
|
1222 self.execute("SET X filed_under Y WHERE X eid %s, Y eid %s" |
|
1223 % (neid1, teid1)) |
|
1224 self.execute("SET X filed_under Y WHERE X eid %s, Y eid %s" |
|
1225 % (neid2, teid2)) |
|
1226 self.execute("SET X todo_by Y WHERE X is Note, Y eid %s" % ueid) |
|
1227 rset = self.execute('Any N WHERE N todo_by U, N is Note, U eid %s, N filed_under T, T eid %s' |
|
1228 % (ueid, teid1)) |
|
1229 self.assertEquals(len(rset), 1) |
|
1230 |
|
1231 def test_nonregr_XXX(self): |
|
1232 teid = self.execute('Transition S WHERE S name "deactivate"')[0][0] |
|
1233 rset = self.execute('Any O WHERE O is State, ' |
|
1234 'S eid %(x)s, S transition_of ET, O state_of ET', {'x': teid}) |
|
1235 self.assertEquals(len(rset), 2) |
|
1236 rset = self.execute('Any O WHERE O is State, NOT S destination_state O, ' |
|
1237 'S eid %(x)s, S transition_of ET, O state_of ET', {'x': teid}) |
|
1238 self.assertEquals(len(rset), 1) |
|
1239 |
|
1240 |
|
1241 def test_nonregr_set_datetime(self): |
|
1242 # huum, psycopg specific |
|
1243 self.execute('SET X creation_date %(date)s WHERE X eid 1', {'date': today()}) |
|
1244 |
|
1245 def test_nonregr_set_query(self): |
|
1246 ueid = self.execute("INSERT EUser X: X login 'bob', X upassword 'toto'")[0][0] |
|
1247 self.execute("SET E in_group G, E in_state S, " |
|
1248 "E firstname %(firstname)s, E surname %(surname)s " |
|
1249 "WHERE E eid %(x)s, G name 'users', S name 'activated'", |
|
1250 {'x':ueid, 'firstname': u'jean', 'surname': u'paul'}, 'x') |
|
1251 |
|
1252 def test_nonregr_u_owned_by_u(self): |
|
1253 ueid = self.execute("INSERT EUser X: X login 'bob', X upassword 'toto', X in_group G " |
|
1254 "WHERE G name 'users'")[0][0] |
|
1255 rset = self.execute("EUser U") |
|
1256 self.assertEquals(len(rset), 3) # bob + admin + anon |
|
1257 rset = self.execute("Any U WHERE NOT U owned_by U") |
|
1258 self.assertEquals(len(rset), 0) # even admin created at repo initialization time should belong to itself |
|
1259 |
|
1260 def test_nonreg_update_index(self): |
|
1261 # this is the kind of queries generated by "cubicweb-ctl db-check -ry" |
|
1262 self.execute("SET X description D WHERE X is State, X description D") |
|
1263 |
|
1264 def test_nonregr_is(self): |
|
1265 uteid = self.execute('Any ET WHERE ET name "EUser"')[0][0] |
|
1266 self.execute('Any X, ET WHERE X is ET, ET eid %s' % uteid) |
|
1267 |
|
1268 def test_nonregr_orderby(self): |
|
1269 seid = self.execute('Any X WHERE X name "activated"')[0][0] |
|
1270 self.execute('Any X,S, MAX(T) GROUPBY X,S ORDERBY S WHERE X is EUser, T tags X, S eid IN(%s), X in_state S' % seid) |
|
1271 |
|
1272 def test_nonregr_solution_cache(self): |
|
1273 self.skip('XXX should be fixed or documented') # (doesn't occur if cache key is provided.) |
|
1274 rset = self.execute('Any X WHERE X is EUser, X eid %(x)s', {'x':self.ueid}) |
|
1275 self.assertEquals(len(rset), 1) |
|
1276 rset = self.execute('Any X WHERE X is EUser, X eid %(x)s', {'x':12345}) |
|
1277 self.assertEquals(len(rset), 0) |
|
1278 |
|
1279 def test_nonregr_final_norestr(self): |
|
1280 self.assertRaises(BadRQLQuery, self.execute, 'Date X') |
|
1281 |
|
1282 |
|
1283 if __name__ == '__main__': |
|
1284 unittest_main() |