1 # coding: utf-8 |
|
2 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This file is part of CubicWeb. |
|
6 # |
|
7 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
8 # terms of the GNU Lesser General Public License as published by the Free |
|
9 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
10 # any later version. |
|
11 # |
|
12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
14 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
15 # details. |
|
16 # |
|
17 # You should have received a copy of the GNU Lesser General Public License along |
|
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
19 """unit tests for module cubicweb.rset""" |
|
20 |
|
21 from six import string_types |
|
22 from six.moves import cPickle as pickle |
|
23 from six.moves.urllib.parse import urlsplit |
|
24 |
|
25 from rql import parse |
|
26 |
|
27 from logilab.common.testlib import TestCase, unittest_main, mock_object |
|
28 |
|
29 from cubicweb.devtools.testlib import CubicWebTC |
|
30 from cubicweb.rset import NotAnEntity, ResultSet, attr_desc_iterator |
|
31 from cubicweb import NoResultError, MultipleResultsError |
|
32 |
|
33 |
|
def pprelcachedict(d):
    """Return a sorted, comparable view of a relation-cache-like mapping.

    Every value of `d` is unpacked as a 2-item sequence; its second item is
    iterated and the `eid` attribute of each element is collected. The result
    is a list of `(key, sorted eids)` pairs, itself sorted.
    """
    pairs = [(key, sorted(entity.eid for entity in related))
             for key, (_rset, related) in d.items()]
    pairs.sort()
    return pairs
|
39 |
|
40 |
|
class AttrDescIteratorTC(TestCase):
    """Checks of cubicweb.rset.attr_desc_iterator on parsed RQL queries."""

    def test_relations_description(self):
        """attr_desc_iterator() called with (select, 0, 0) on simple queries."""
        expectations = {
            'Any U,L,M where U is CWUser, U login L, U mail M': [
                (1, 'login', 'subject'), (2, 'mail', 'subject')],
            'Any U,L,M where U is CWUser, L is Foo, U mail M': [
                (2, 'mail', 'subject')],
            'Any C,P where C is Company, C employs P': [
                (1, 'employs', 'subject')],
            'Any C,P where C is Company, P employed_by P': [],
            'Any C where C is Company, C employs P': [],
        }
        for rql, expected in expectations.items():
            select = parse(rql).children[0]
            found = list(attr_desc_iterator(select, 0, 0))
            # the query is part of both tuples so that a failure report names it
            self.assertEqual((rql, found), (rql, expected))

    def test_relations_description_indexed(self):
        """attr_desc_iterator() called with the same index for both arguments."""
        expectations = {
            'Any C,U,P,L,M where C is Company, C employs P, U is CWUser, U login L, U mail M': {
                0: [(2, 'employs', 'subject')],
                1: [(3, 'login', 'subject'), (4, 'mail', 'subject')],
            },
        }
        for rql, per_index in expectations.items():
            for index, expected in per_index.items():
                select = parse(rql).children[0]
                found = list(attr_desc_iterator(select, index, index))
                self.assertEqual(found, expected)

    def test_subquery_callfunc(self):
        """Nothing is yielded for column 2 of a subquery selecting function calls."""
        tree = parse(
            'Any A,B,C,COUNT(D) GROUPBY A,B,C WITH A,B,C,D BEING '
            '(Any YEAR(CD), MONTH(CD), S, X WHERE X is CWUser, X creation_date CD, X in_state S)')
        select, col = tree.locate_subquery(2, 'CWUser', None)
        found = list(attr_desc_iterator(select, col, 2))
        self.assertEqual(found, [])

    def test_subquery_callfunc_2(self):
        """The outer in_state relation is reported for column 0 fed by a subquery."""
        tree = parse(
            'Any X,S,L WHERE X in_state S WITH X, L BEING (Any X,MAX(L) GROUPBY X WHERE X is CWUser, T wf_info_for X, T creation_date L)')
        select, col = tree.locate_subquery(0, 'CWUser', None)
        found = list(attr_desc_iterator(select, col, 0))
        self.assertEqual(found, [(1, 'in_state', 'subject')])
|
82 |
|
83 |
|
class ResultSetTC(CubicWebTC):
    """Tests of cubicweb.rset.ResultSet, hand-built or returned by req.execute()."""

    def setUp(self):
        """Build a two-row ResultSet bound to a mock request exposing the registry."""
        super(ResultSetTC, self).setUp()
        self.rset = ResultSet([[12, 'adim'], [13, 'syt']],
                              'Any U,L where U is CWUser, U login L',
                              description=[['CWUser', 'String'], ['Bar', 'String']])
        self.rset.req = mock_object(vreg=self.vreg)

    @staticmethod
    def _query_params(query):
        """Return the `key=value` pairs of a raw query string as a dict.

        Empty segments are skipped, so an empty query string gives an empty
        dict instead of raising ValueError.
        """
        return dict(pair.partition('=')[::2] for pair in query.split('&') if pair)

    def compare_urls(self, url1, url2):
        """Assert that two URLs are equal, ignoring the order of query parameters.

        Scheme, network location and path must match exactly; query strings
        that differ textually are compared as dictionaries.
        """
        info1 = urlsplit(url1)
        info2 = urlsplit(url2)
        self.assertEqual(info1[:3], info2[:3])
        if info1[3] != info2[3]:
            params1 = self._query_params(info1[3])
            # the second dict must come from the second URL, otherwise the
            # comparison below can never fail
            params2 = self._query_params(info2[3])
            self.assertDictEqual(params1, params2)

    def test_pickle(self):
        """A ResultSet stripped of its request survives a pickle round-trip."""
        del self.rset.req
        rs2 = pickle.loads(pickle.dumps(self.rset))
        self.assertEqual(self.rset.rows, rs2.rows)
        self.assertEqual(self.rset.rowcount, rs2.rowcount)
        self.assertEqual(self.rset.rql, rs2.rql)
        self.assertEqual(self.rset.description, rs2.description)

    def test_build_url(self):
        """Check req.build_url() with plain, _restpath and __secure__ arguments."""
        with self.admin_access.web_request() as req:
            baseurl = req.base_url()
            self.compare_urls(req.build_url('view', vid='foo', rql='yo'),
                              '%sview?vid=foo&rql=yo' % baseurl)
            self.compare_urls(req.build_url('view', _restpath='task/title/go'),
                              '%stask/title/go' % baseurl)
            #self.compare_urls(req.build_url('view', _restpath='/task/title/go'),
            #                  '%stask/title/go' % baseurl)
            # empty _restpath should not crash
            self.compare_urls(req.build_url('view', _restpath=''), baseurl)
            self.assertNotIn('https', req.build_url('view', vid='foo', rql='yo',
                                                    __secure__=True))
            try:
                self.config.global_set_option('https-url', 'https://testing.fr/')
                # assertIn, not assertTrue: assertTrue('https', url) always passes
                self.assertIn('https', req.build_url('view', vid='foo', rql='yo',
                                                     __secure__=True))
                self.compare_urls(req.build_url('view', vid='foo', rql='yo',
                                                __secure__=True),
                                  '%sview?vid=foo&rql=yo' % req.base_url(secure=True))
            finally:
                # restore the option so other tests are not affected
                self.config.global_set_option('https-url', None)

    def test_build(self):
        """test basic build of a ResultSet"""
        rs = ResultSet([1, 2, 3], 'CWGroup X', description=['CWGroup', 'CWGroup', 'CWGroup'])
        self.assertEqual(rs.rowcount, 3)
        self.assertEqual(rs.rows, [1, 2, 3])
        self.assertEqual(rs.description, ['CWGroup', 'CWGroup', 'CWGroup'])

    def test_limit(self):
        """limit() returns the expected window of rows for several offsets."""
        rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
                       'Any U,L where U is CWUser, U login L',
                       description=[['CWUser', 'String']] * 3)
        with self.admin_access.web_request() as req:
            rs.req = req
            rs.vreg = self.vreg
            self.assertEqual(rs.limit(2).rows, [[12000, 'adim'], [13000, 'syt']])
            rs2 = rs.limit(2, offset=1)
            self.assertEqual(rs2.rows, [[13000, 'syt'], [14000, 'nico']])
            self.assertEqual(rs2.get_entity(0, 0).cw_row, 0)
            self.assertEqual(rs.limit(2, offset=2).rows, [[14000, 'nico']])
            self.assertEqual(rs.limit(2, offset=3).rows, [])

    def test_limit_2(self):
        """In-place limit() must leave cached related entities usable."""
        with self.admin_access.web_request() as req:
            # drop user from cache for the sake of this test
            req.drop_entity_cache(req.user.eid)
            rs = req.execute('Any E,U WHERE E is CWEType, E created_by U')
            # get entity on row 9. This will fill its created_by relation cache,
            # with cwuser on row 9 as well
            e1 = rs.get_entity(9, 0)
            # get entity on row 10. This will fill its created_by relation cache,
            # with cwuser built on row 9
            e2 = rs.get_entity(10, 0)
            # limit result set from row 10
            rs.limit(1, 10, inplace=True)
            # get back eid
            e = rs.get_entity(0, 0)
            self.assertIs(e2, e)
            # rs.limit has properly removed cwuser for request cache, but it's
            # still referenced by e/e2 relation cache
            u = e.created_by[0]
            # now ensure this doesn't trigger IndexError because cwuser.cw_row is 9
            # while now rset has only one row
            u.cw_rset[u.cw_row]

    def test_filter(self):
        """filtered_rset() keeps only the rows whose entity passes the predicate."""
        rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
                       'Any U,L where U is CWUser, U login L',
                       description=[['CWUser', 'String']] * 3)
        with self.admin_access.web_request() as req:
            rs.req = req
            rs.vreg = self.vreg

            def is_not_nico(entity):
                return entity.login != 'nico'

            rs2 = rs.filtered_rset(is_not_nico)
            self.assertEqual(len(rs2), 2)
            self.assertEqual([login for _, login in rs2], ['adim', 'syt'])
            # the two rows kept are the first two of the original rset
            self.assertEqual(rs2.description, rs.description[:2])

    def test_transform(self):
        """transformed_rset() applies the callback to each row and description."""
        rs = ResultSet([[12, 'adim'], [13, 'syt'], [14, 'nico']],
                       'Any U,L where U is CWUser, U login L',
                       description=[['CWUser', 'String']] * 3)
        with self.admin_access.web_request() as req:
            rs.req = req

            def drop_first_column(row, desc):
                return row[1:], desc[1:]

            rs2 = rs.transformed_rset(drop_first_column)

            self.assertEqual(len(rs2), 3)
            self.assertEqual(list(rs2), [['adim'], ['syt'], ['nico']])

    def test_sort(self):
        """sorted_rset() returns an ordered rset and leaves the original untouched."""
        rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
                       'Any U,L where U is CWUser, U login L',
                       description=[['CWUser', 'String']] * 3)
        with self.admin_access.web_request() as req:
            rs.req = req
            rs.vreg = self.vreg

            rs2 = rs.sorted_rset(lambda e: e.cw_attr_cache['login'])
            self.assertEqual(len(rs2), 3)
            self.assertEqual([login for _, login in rs2], ['adim', 'nico', 'syt'])
            # make sure rs is unchanged
            self.assertEqual([login for _, login in rs], ['adim', 'syt', 'nico'])

            rs2 = rs.sorted_rset(lambda e: e.cw_attr_cache['login'], reverse=True)
            self.assertEqual(len(rs2), 3)
            self.assertEqual([login for _, login in rs2], ['syt', 'nico', 'adim'])
            # make sure rs is unchanged
            self.assertEqual([login for _, login in rs], ['adim', 'syt', 'nico'])

            rs3 = rs.sorted_rset(lambda row: row[1], col=-1)
            self.assertEqual(len(rs3), 3)
            self.assertEqual([login for _, login in rs3], ['adim', 'nico', 'syt'])
            # make sure rs is unchanged
            self.assertEqual([login for _, login in rs], ['adim', 'syt', 'nico'])

    def test_split(self):
        """split_rset() groups rows by key, returned as a list or as a dict."""
        rs = ResultSet([[12000, 'adim', u'Adim chez les pinguins'],
                        [12000, 'adim', u'Jardiner facile'],
                        [13000, 'syt', u'Le carrelage en 42 leçons'],
                        [14000, 'nico', u'La tarte tatin en 15 minutes'],
                        [14000, 'nico', u"L'épluchage du castor commun"]],
                       'Any U, L, T WHERE U is CWUser, U login L,'
                       'D created_by U, D title T',
                       description=[['CWUser', 'String', 'String']] * 5)
        with self.admin_access.web_request() as req:
            rs.req = req
            rs.vreg = self.vreg
            rsets = rs.split_rset(lambda e: e.cw_attr_cache['login'])
            self.assertEqual(len(rsets), 3)
            self.assertEqual([login for _, login, _ in rsets[0]], ['adim', 'adim'])
            self.assertEqual([login for _, login, _ in rsets[1]], ['syt'])
            self.assertEqual([login for _, login, _ in rsets[2]], ['nico', 'nico'])
            # make sure rs is unchanged
            self.assertEqual([login for _, login, _ in rs],
                             ['adim', 'adim', 'syt', 'nico', 'nico'])

            rsets = rs.split_rset(lambda e: e.cw_attr_cache['login'], return_dict=True)
            self.assertEqual(len(rsets), 3)
            self.assertEqual([login for _, login, _ in rsets['nico']], ['nico', 'nico'])
            self.assertEqual([login for _, login, _ in rsets['adim']], ['adim', 'adim'])
            self.assertEqual([login for _, login, _ in rsets['syt']], ['syt'])
            # make sure rs is unchanged
            self.assertEqual([login for _, login, _ in rs],
                             ['adim', 'adim', 'syt', 'nico', 'nico'])

            rsets = rs.split_rset(lambda s: s.count('d'), col=2)
            self.assertEqual(len(rsets), 2)
            self.assertEqual([title for _, _, title in rsets[0]],
                             [u"Adim chez les pinguins",
                              u"Jardiner facile",
                              u"L'épluchage du castor commun"])
            self.assertEqual([title for _, _, title in rsets[1]],
                             [u"Le carrelage en 42 leçons",
                              u"La tarte tatin en 15 minutes"])
            # make sure rs is unchanged
            self.assertEqual([title for _, _, title in rs],
                             [u'Adim chez les pinguins',
                              u'Jardiner facile',
                              u'Le carrelage en 42 leçons',
                              u'La tarte tatin en 15 minutes',
                              u"L'épluchage du castor commun"])

    def test_cached_syntax_tree(self):
        """make sure syntax tree is cached"""
        rqlst1 = self.rset.syntax_tree()
        rqlst2 = self.rset.syntax_tree()
        self.assertIs(rqlst1, rqlst2)

    def test_get_entity_simple(self):
        """get_entity() pre-fills only the attributes selected by the query."""
        with self.admin_access.web_request() as req:
            req.create_entity('CWUser', login=u'adim', upassword='adim',
                              surname=u'di mascio', firstname=u'adrien')
            req.drop_entity_cache()
            e = req.execute('Any X,T WHERE X login "adim", X surname T').get_entity(0, 0)
            self.assertEqual(e.cw_attr_cache['surname'], 'di mascio')
            self.assertRaises(KeyError, e.cw_attr_cache.__getitem__, 'firstname')
            self.assertRaises(KeyError, e.cw_attr_cache.__getitem__, 'creation_date')
            self.assertEqual(pprelcachedict(e._cw_related_cache), [])
            e.complete()
            self.assertEqual(e.cw_attr_cache['firstname'], 'adrien')
            self.assertEqual(pprelcachedict(e._cw_related_cache), [])

    def test_get_entity_advanced(self):
        """get_entity() works per entity column and rejects attribute columns."""
        with self.admin_access.web_request() as req:
            req.create_entity('Bookmark', title=u'zou', path=u'/view')
            req.drop_entity_cache()
            req.execute('SET X bookmarked_by Y WHERE X is Bookmark, Y login "anon"')
            rset = req.execute('Any X,Y,XT,YN WHERE X bookmarked_by Y, X title XT, Y login YN')

            e = rset.get_entity(0, 0)
            self.assertEqual(e.cw_row, 0)
            self.assertEqual(e.cw_col, 0)
            self.assertEqual(e.cw_attr_cache['title'], 'zou')
            self.assertRaises(KeyError, e.cw_attr_cache.__getitem__, 'path')
            self.assertEqual(e.view('text'), 'zou')
            self.assertEqual(pprelcachedict(e._cw_related_cache), [])

            e = rset.get_entity(0, 1)
            self.assertEqual(e.cw_row, 0)
            self.assertEqual(e.cw_col, 1)
            self.assertEqual(e.cw_attr_cache['login'], 'anon')
            self.assertRaises(KeyError, e.cw_attr_cache.__getitem__, 'firstname')
            self.assertEqual(pprelcachedict(e._cw_related_cache),
                             [])
            e.complete()
            self.assertEqual(e.cw_attr_cache['firstname'], None)
            self.assertEqual(e.view('text'), 'anon')
            self.assertEqual(pprelcachedict(e._cw_related_cache),
                             [])

            self.assertRaises(NotAnEntity, rset.get_entity, 0, 2)
            self.assertRaises(NotAnEntity, rset.get_entity, 0, 3)

    def test_get_entity_relation_cache_compt(self):
        """A relation selected by the query fills the entity's relation cache."""
        with self.admin_access.web_request() as req:
            rset = req.execute('Any X,S WHERE X in_state S, X login "anon"')
            e = rset.get_entity(0, 0)
            seid = req.execute('State X WHERE X name "activated"')[0][0]
            # for_user / in_group are prefetched in CWUser __init__, in_state should
            # be filled from our query rset
            self.assertEqual(pprelcachedict(e._cw_related_cache),
                             [('in_state_subject', [seid])])

    def test_get_entity_advanced_prefilled_cache(self):
        """Entities reached through cached relations carry the selected attributes."""
        with self.admin_access.web_request() as req:
            e = req.create_entity('Bookmark', title=u'zou', path=u'path')
            req.cnx.commit()
            rset = req.execute('Any X,U,S,XT,UL,SN WHERE X created_by U, U in_state S, '
                               'X title XT, S name SN, U login UL, X eid %s' % e.eid)
            e = rset.get_entity(0, 0)
            self.assertEqual(e.cw_attr_cache['title'], 'zou')
            self.assertEqual(pprelcachedict(e._cw_related_cache),
                             [('created_by_subject', [req.user.eid])])
            # first level of recursion
            u = e.created_by[0]
            self.assertEqual(u.cw_attr_cache['login'], 'admin')
            self.assertRaises(KeyError, u.cw_attr_cache.__getitem__, 'firstname')
            # second level of recursion
            s = u.in_state[0]
            self.assertEqual(s.cw_attr_cache['name'], 'activated')
            self.assertRaises(KeyError, s.cw_attr_cache.__getitem__, 'description')

    def test_get_entity_cache_with_left_outer_join(self):
        """An unmatched optional relation is cached as empty entities and rset."""
        with self.admin_access.web_request() as req:
            eid = req.execute('INSERT CWUser E: E login "joe", E upassword "joe", E in_group G '
                              'WHERE G name "users"')[0][0]
            rset = req.execute('Any X,E WHERE X eid %(x)s, X primary_email E?', {'x': eid})
            e = rset.get_entity(0, 0)
            # if any of the assertion below fails with a KeyError, the relation is not cached
            # related entities should be an empty list
            self.assertEqual(e._cw_related_cache['primary_email_subject'][True], ())
            # related rset should be an empty rset
            cached = e._cw_related_cache['primary_email_subject'][False]
            self.assertIsInstance(cached, ResultSet)
            self.assertEqual(cached.rowcount, 0)

    def test_get_entity_union(self):
        """entities() yields the right entity type for each row of a union."""
        with self.admin_access.web_request() as req:
            req.create_entity('Bookmark', title=u'manger', path=u'path')
            req.drop_entity_cache()
            rset = req.execute('Any X,N ORDERBY N WITH X,N BEING '
                               '((Any X,N WHERE X is Bookmark, X title N)'
                               ' UNION '
                               ' (Any X,N WHERE X is CWGroup, X name N))')
            expected = (('CWGroup', 'guests'), ('CWGroup', 'managers'),
                        ('Bookmark', 'manger'), ('CWGroup', 'owners'),
                        ('CWGroup', 'users'))
            for entity in rset.entities():  # test get_entity for each row actually
                etype, n = expected[entity.cw_row]
                self.assertEqual(entity.cw_etype, etype)
                attr = 'title' if etype == 'Bookmark' else 'name'
                self.assertEqual(entity.cw_attr_cache[attr], n)

    def test_one(self):
        """one() returns the single entity, from the requested column if given."""
        with self.admin_access.web_request() as req:
            req.create_entity('CWUser', login=u'cdevienne',
                              upassword=u'cdevienne',
                              surname=u'de Vienne',
                              firstname=u'Christophe')
            e = req.execute('Any X WHERE X login "cdevienne"').one()

            self.assertEqual(e.surname, u'de Vienne')

            e = req.execute(
                'Any X, N WHERE X login "cdevienne", X surname N').one()
            self.assertEqual(e.surname, u'de Vienne')

            e = req.execute(
                'Any N, X WHERE X login "cdevienne", X surname N').one(col=1)
            self.assertEqual(e.surname, u'de Vienne')

    def test_one_no_rows(self):
        """one() raises NoResultError on an empty result set."""
        with self.admin_access.web_request() as req:
            with self.assertRaises(NoResultError):
                req.execute('Any X WHERE X login "patanok"').one()

    def test_one_multiple_rows(self):
        """one() raises MultipleResultsError when several rows are returned."""
        with self.admin_access.web_request() as req:
            req.create_entity(
                'CWUser', login=u'cdevienne', upassword=u'cdevienne',
                surname=u'de Vienne', firstname=u'Christophe')

            req.create_entity(
                'CWUser', login=u'adim', upassword='adim', surname=u'di mascio',
                firstname=u'adrien')

            with self.assertRaises(MultipleResultsError):
                req.execute('Any X WHERE X is CWUser').one()

    def test_related_entity_optional(self):
        """related_entity() gives (None, None) when the optional relation is unset."""
        with self.admin_access.web_request() as req:
            req.create_entity('Bookmark', title=u'aaaa', path=u'path')
            rset = req.execute('Any B,U,L WHERE B bookmarked_by U?, U login L')
            entity, rtype = rset.related_entity(0, 2)
            self.assertIsNone(entity)
            self.assertIsNone(rtype)

    def test_related_entity_union_subquery_1(self):
        """related_entity() resolves the attribute's entity through a union subquery."""
        with self.admin_access.web_request() as req:
            e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
            rset = req.execute('Any X,N ORDERBY N WITH X,N BEING '
                               '((Any X,N WHERE X is CWGroup, X name N)'
                               ' UNION '
                               ' (Any X,N WHERE X is Bookmark, X title N))')
            entity, rtype = rset.related_entity(0, 1)
            self.assertEqual(entity.eid, e.eid)
            self.assertEqual(rtype, 'title')
            self.assertEqual(entity.title, 'aaaa')
            entity, rtype = rset.related_entity(1, 1)
            self.assertEqual(entity.cw_etype, 'CWGroup')
            self.assertEqual(rtype, 'name')
            self.assertEqual(entity.name, 'guests')

    def test_related_entity_union_subquery_2(self):
        """Same as above with an extra restriction on the outer query."""
        with self.admin_access.web_request() as req:
            e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
            rset = req.execute('Any X,N ORDERBY N WHERE X is Bookmark WITH X,N BEING '
                               '((Any X,N WHERE X is CWGroup, X name N)'
                               ' UNION '
                               ' (Any X,N WHERE X is Bookmark, X title N))')
            entity, rtype = rset.related_entity(0, 1)
            self.assertEqual(entity.eid, e.eid)
            self.assertEqual(rtype, 'title')
            self.assertEqual(entity.title, 'aaaa')

    def test_related_entity_union_subquery_3(self):
        """Same as above with subquery columns in a different order than the outer ones."""
        with self.admin_access.web_request() as req:
            e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
            rset = req.execute('Any X,N ORDERBY N WITH N,X BEING '
                               '((Any N,X WHERE X is CWGroup, X name N)'
                               ' UNION '
                               ' (Any N,X WHERE X is Bookmark, X title N))')
            entity, rtype = rset.related_entity(0, 1)
            self.assertEqual(entity.eid, e.eid)
            self.assertEqual(rtype, 'title')
            self.assertEqual(entity.title, 'aaaa')

    def test_related_entity_union_subquery_4(self):
        """Same as above with the entity variable selected twice in the outer query."""
        with self.admin_access.web_request() as req:
            e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
            rset = req.execute('Any X,X, N ORDERBY N WITH X,N BEING '
                               '((Any X,N WHERE X is CWGroup, X name N)'
                               ' UNION '
                               ' (Any X,N WHERE X is Bookmark, X title N))')
            entity, rtype = rset.related_entity(0, 2)
            self.assertEqual(entity.eid, e.eid)
            self.assertEqual(rtype, 'title')
            self.assertEqual(entity.title, 'aaaa')

    def test_related_entity_trap_subquery(self):
        """related_entity() must not crash on an attribute selected outside the subquery."""
        with self.admin_access.web_request() as req:
            req.create_entity('Bookmark', title=u'test bookmark', path=u'')
            req.execute('SET B bookmarked_by U WHERE U login "admin"')
            rset = req.execute('Any B,T,L WHERE B bookmarked_by U, U login L '
                               'WITH B,T BEING (Any B,T WHERE B is Bookmark, B title T)')
            rset.related_entity(0, 2)

    def test_related_entity_subquery_outerjoin(self):
        """related_entity() must not crash on a subquery using an outer join."""
        with self.admin_access.web_request() as req:
            rset = req.execute('Any X,S,L WHERE X in_state S '
                               'WITH X, L BEING (Any X,MAX(L) GROUPBY X '
                               'WHERE X is CWUser, T? wf_info_for X, T creation_date L)')
            self.assertEqual(len(rset), 2)
            rset.related_entity(0, 1)
            rset.related_entity(0, 2)

    def test_entities(self):
        """entities(col) yields entities of the type selected in that column."""
        with self.admin_access.web_request() as req:
            rset = req.execute('Any U,G WHERE U in_group G')
            # make sure we have at least one element
            self.assertTrue(rset)
            self.assertEqual({e.e_schema.type for e in rset.entities(0)},
                             {'CWUser'})
            self.assertEqual({e.e_schema.type for e in rset.entities(1)},
                             {'CWGroup'})

    def test_iter_rows_with_entities(self):
        """iter_rows_with_entities() mixes entities and raw attribute values."""
        with self.admin_access.web_request() as req:
            rset = req.execute('Any U,UN,G,GN WHERE U in_group G, U login UN, G name GN')
            # make sure we have at least one element
            self.assertTrue(rset)
            out = list(rset.iter_rows_with_entities())[0]
            self.assertEqual(out[0].login, out[1])
            self.assertEqual(out[2].name, out[3])

    def test_printable_rql(self):
        """printable_rql() gives the expanded form of the executed query."""
        with self.admin_access.web_request() as req:
            rset = req.execute(u'CWEType X WHERE X final FALSE')
            self.assertEqual(rset.printable_rql(),
                             'Any X WHERE X final FALSE, X is CWEType')

    def test_searched_text(self):
        """searched_text() returns the has_text value, inlined or given as argument."""
        with self.admin_access.web_request() as req:
            rset = req.execute(u'Any X WHERE X has_text "foobar"')
            self.assertEqual(rset.searched_text(), 'foobar')
            rset = req.execute(u'Any X WHERE X has_text %(text)s', {'text': 'foo'})
            self.assertEqual(rset.searched_text(), 'foo')

    def test_union_limited_rql(self):
        """limited_rql() wraps a union in a subquery carrying LIMIT and OFFSET."""
        with self.admin_access.web_request() as req:
            rset = req.execute('(Any X,N WHERE X is Bookmark, X title N)'
                               ' UNION '
                               '(Any X,N WHERE X is CWGroup, X name N)')
            rset.limit(2, 10, inplace=True)
            self.assertEqual(rset.limited_rql(),
                             'Any A,B LIMIT 2 OFFSET 10 '
                             'WITH A,B BEING ('
                             '(Any X,N WHERE X is Bookmark, X title N) '
                             'UNION '
                             '(Any X,N WHERE X is CWGroup, X name N)'
                             ')')

    def test_count_users_by_date(self):
        """related_entity() gives (None, None) for a grouped attribute column."""
        with self.admin_access.web_request() as req:
            rset = req.execute('Any D, COUNT(U) GROUPBY D WHERE U is CWUser, U creation_date D')
            self.assertEqual(rset.related_entity(0, 0), (None, None))

    def test_str(self):
        """str() of a result set is a single-line string."""
        with self.admin_access.web_request() as req:
            rset = req.execute('(Any X,N WHERE X is CWGroup, X name N)')
            self.assertIsInstance(str(rset), string_types)
            self.assertEqual(len(str(rset).splitlines()), 1)

    def test_repr(self):
        """repr() of a multi-row result set spans several lines."""
        with self.admin_access.web_request() as req:
            rset = req.execute('(Any X,N WHERE X is CWGroup, X name N)')
            self.assertIsInstance(repr(rset), string_types)
            self.assertGreater(len(repr(rset).splitlines()), 1)

            rset = req.execute('(Any X WHERE X is CWGroup, X name "managers")')
            self.assertIsInstance(str(rset), string_types)
            self.assertEqual(len(str(rset).splitlines()), 1)

    def test_slice(self):
        """Slicing a result set with a step returns the matching rows."""
        rs = ResultSet([[12000, 'adim', u'Adim chez les pinguins'],
                        [12000, 'adim', u'Jardiner facile'],
                        [13000, 'syt', u'Le carrelage en 42 leçons'],
                        [14000, 'nico', u'La tarte tatin en 15 minutes'],
                        [14000, 'nico', u"L'épluchage du castor commun"]],
                       'Any U, L, T WHERE U is CWUser, U login L,'
                       'D created_by U, D title T',
                       description=[['CWUser', 'String', 'String']] * 5)
        self.assertEqual(rs[1::2],
                         [[12000, 'adim', u'Jardiner facile'],
                          [14000, 'nico', u'La tarte tatin en 15 minutes']])

    def test_nonregr_symmetric_relation(self):
        """Non-regression: get_entity() on the object of a symmetric relation."""
        # see https://www.cubicweb.org/ticket/4739253
        with self.admin_access.client_cnx() as cnx:
            p1 = cnx.create_entity('Personne', nom=u'sylvain')
            cnx.create_entity('Personne', nom=u'denis', connait=p1)
            cnx.commit()
            rset = cnx.execute('Any X,Y WHERE X connait Y')
            rset.get_entity(0, 1)  # used to raise KeyError
|
592 |
|
# Run the whole module through the logilab test runner when executed as a script.
if __name__ == '__main__':
    unittest_main()
|