|
1 # coding: utf-8 |
|
"""unit tests for module cubicweb.rset"""
|
3 |
|
4 from logilab.common.testlib import TestCase, unittest_main |
|
5 from cubicweb.devtools.apptest import EnvBasedTC |
|
6 |
|
7 from urlparse import urlsplit |
|
8 from rql import parse |
|
9 |
|
10 from cubicweb.rset import NotAnEntity, ResultSet, attr_desc_iterator |
|
11 |
|
12 |
|
def pprelcachedict(d):
    """return a sorted, easy to compare view of a relation cache dictionary

    `d` maps a cache key to a 2-uple whose second item is an iterable of
    objects having an `eid` attribute (the first item is ignored).  The
    result is a list of `(key, sorted eids)` couples, ordered by key.
    """
    summary = []
    for key, (_, entities) in d.items():
        eids = [entity.eid for entity in entities]
        eids.sort()
        summary.append((key, eids))
    # keys are unique, hence sorting the couples orders them by key
    summary.sort()
    return summary
|
18 |
|
19 |
|
class AttrDescIteratorTC(TestCase):
    """TestCase for cubicweb.rset.attr_desc_iterator"""

    def _described(self, rql, *index):
        """return as a list what attr_desc_iterator yields for the first
        child of the syntax tree parsed from `rql`

        an optional variable index is forwarded untouched to the iterator
        """
        select = parse(rql).children[0]
        return list(attr_desc_iterator(select, *index))

    def test_relations_description(self):
        """check attr_desc_iterator output without explicit variable index"""
        expected_by_rql = {
            'Any U,L,M where U is EUser, U login L, U mail M' : [(1, 'login', 'subject'), (2, 'mail', 'subject')],
            'Any U,L,M where U is EUser, L is Foo, U mail M' : [(2, 'mail', 'subject')],
            'Any C,P where C is Company, C employs P' : [(1, 'employs', 'subject')],
            'Any C,P where C is Company, P employed_by P' : [],
            'Any C where C is Company, C employs P' : [],
            }
        for rql, expected in expected_by_rql.items():
            found = self._described(rql)
            # the query is part of both tuples so that a failure report
            # tells which one is at fault
            self.assertEquals((rql, found), (rql, expected))

    def test_relations_description_indexed(self):
        """check attr_desc_iterator output for an explicit variable index"""
        expected_by_rql = {
            'Any C,U,P,L,M where C is Company, C employs P, U is EUser, U login L, U mail M' :
            {0: [(2,'employs', 'subject')], 1: [(3,'login', 'subject'), (4,'mail', 'subject')]},
            }
        for rql, expected_by_index in expected_by_rql.items():
            for var_index, expected in expected_by_index.items():
                found = self._described(rql, var_index)
                self.assertEquals(found, expected)
|
46 |
|
47 |
|
48 |
|
class ResultSetTC(EnvBasedTC):
    """TestCase for cubicweb.rset.ResultSet (plus a request url building
    check), run in the environment set up by EnvBasedTC
    """

    def setUp(self):
        """build a small hand-made result set, available as `self.rset`"""
        super(ResultSetTC, self).setUp()
        self.rset = ResultSet([[12, 'adim'], [13, 'syt']],
                              'Any U,L where U is EUser, U login L',
                              description=[['EUser', 'String'], ['Bar', 'String']])
        self.rset.vreg = self.vreg

    def compare_urls(self, url1, url2):
        """assert `url1` and `url2` are equivalent

        Scheme, network location and path have to be equal.  Query strings
        may list their parameters in a different order: when they are not
        textually equal, they are compared as name -> value dictionaries.
        The fragment part is not compared.
        """
        def query_params(query):
            # empty chunks are skipped so that an empty query string gives
            # an empty dictionary instead of a ValueError raised by dict()
            return dict(pair.split('=', 1) for pair in query.split('&') if pair)

        info1 = urlsplit(url1)
        info2 = urlsplit(url2)
        self.assertEquals(info1[:3], info2[:3])
        if info1[3] != info2[3]:
            params1 = query_params(info1[3])
            # parameters of the *second* url: building both dictionaries
            # from url1 would make this assertion always succeed
            params2 = query_params(info2[3])
            self.assertDictEquals(params1, params2)

    def test_build_url(self):
        """check urls built by the request's build_url method"""
        req = self.request()
        baseurl = req.base_url()
        self.compare_urls(req.build_url('view', vid='foo', rql='yo'),
                          '%sview?vid=foo&rql=yo' % baseurl)
        self.compare_urls(req.build_url('view', _restpath='task/title/go'),
                          '%stask/title/go' % baseurl)
        # NOTE(review): the leading slash variant below is disabled, the
        # reason is not recorded here
        #self.compare_urls(req.build_url('view', _restpath='/task/title/go'),
        #                  '%stask/title/go' % baseurl)
        # empty _restpath should not crash
        self.compare_urls(req.build_url('view', _restpath=''), baseurl)

    def test_resultset_build(self):
        """test basic build of a ResultSet"""
        rs = ResultSet([1,2,3], 'EGroup X', description=['EGroup', 'EGroup', 'EGroup'])
        self.assertEquals(rs.rowcount, 3)
        self.assertEquals(rs.rows, [1,2,3])
        self.assertEquals(rs.description, ['EGroup', 'EGroup', 'EGroup'])

    def test_resultset_limit(self):
        """test rows kept by limit(), with and without offset"""
        rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
                       'Any U,L where U is EUser, U login L',
                       description=[['EUser', 'String']] * 3)
        rs.req = self.request()
        rs.vreg = self.env.vreg

        self.assertEquals(rs.limit(2).rows, [[12000, 'adim'], [13000, 'syt']])
        rs2 = rs.limit(2, offset=1)
        self.assertEquals(rs2.rows, [[13000, 'syt'], [14000, 'nico']])
        # entity rows are relative to the limited result set
        self.assertEquals(rs2.get_entity(0, 0).row, 0)
        self.assertEquals(rs.limit(2, offset=2).rows, [[14000, 'nico']])
        self.assertEquals(rs.limit(2, offset=3).rows, [])

    def test_resultset_filter(self):
        """test filtered_rset() keeps rows whose entity is accepted"""
        rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
                       'Any U,L where U is EUser, U login L',
                       description=[['EUser', 'String']] * 3)
        rs.req = self.request()
        rs.vreg = self.env.vreg
        def test_filter(entity):
            return entity.login != 'nico'

        rs2 = rs.filtered_rset(test_filter)
        self.assertEquals(len(rs2), 2)
        self.assertEquals([login for _, login in rs2], ['adim', 'syt'])

    def test_resultset_transform(self):
        """test transformed_rset() applies the callback to each row"""
        rs = ResultSet([[12, 'adim'], [13, 'syt'], [14, 'nico']],
                       'Any U,L where U is EUser, U login L',
                       description=[['EUser', 'String']] * 3)
        rs.req = self.request()
        def test_transform(row, desc):
            # drop the first column of both the row and its description
            return row[1:], desc[1:]
        rs2 = rs.transformed_rset(test_transform)

        self.assertEquals(len(rs2), 3)
        self.assertEquals(list(rs2), [['adim'],['syt'],['nico']])

    def test_resultset_sort(self):
        """test sorted_rset() ordering, leaving the original untouched"""
        rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
                       'Any U,L where U is EUser, U login L',
                       description=[['EUser', 'String']] * 3)
        rs.req = self.request()
        rs.vreg = self.env.vreg

        rs2 = rs.sorted_rset(lambda e:e['login'])
        self.assertEquals(len(rs2), 3)
        self.assertEquals([login for _, login in rs2], ['adim', 'nico', 'syt'])
        # make sure rs is unchanged
        self.assertEquals([login for _, login in rs], ['adim', 'syt', 'nico'])

        rs2 = rs.sorted_rset(lambda e:e['login'], reverse=True)
        self.assertEquals(len(rs2), 3)
        self.assertEquals([login for _, login in rs2], ['syt', 'nico', 'adim'])
        # make sure rs is unchanged
        self.assertEquals([login for _, login in rs], ['adim', 'syt', 'nico'])

        # with col=-1 the key function is given the row (see the lambda)
        rs3 = rs.sorted_rset(lambda row: row[1], col=-1)
        self.assertEquals(len(rs3), 3)
        self.assertEquals([login for _, login in rs3], ['adim', 'nico', 'syt'])
        # make sure rs is unchanged
        self.assertEquals([login for _, login in rs], ['adim', 'syt', 'nico'])

    def test_resultset_split(self):
        """test split_rset() grouping, as a list and as a dictionary"""
        rs = ResultSet([[12000, 'adim', u'Adim chez les pinguins'],
                        [12000, 'adim', u'Jardiner facile'],
                        [13000, 'syt',  u'Le carrelage en 42 leçons'],
                        [14000, 'nico', u'La tarte tatin en 15 minutes'],
                        [14000, 'nico', u"L'épluchage du castor commun"]],
                       'Any U, L, T WHERE U is EUser, U login L,'\
                       'D created_by U, D title T',
                       description=[['EUser', 'String', 'String']] * 5)
        rs.req = self.request()
        rs.vreg = self.env.vreg

        rsets = rs.split_rset(lambda e:e['login'])
        self.assertEquals(len(rsets), 3)
        self.assertEquals([login for _, login,_ in rsets[0]], ['adim', 'adim'])
        self.assertEquals([login for _, login,_ in rsets[1]], ['syt'])
        self.assertEquals([login for _, login,_ in rsets[2]], ['nico', 'nico'])
        # make sure rs is unchanged
        self.assertEquals([login for _, login,_ in rs], ['adim', 'adim', 'syt', 'nico', 'nico'])

        rsets = rs.split_rset(lambda e:e['login'], return_dict=True)
        self.assertEquals(len(rsets), 3)
        self.assertEquals([login for _, login,_ in rsets['nico']], ['nico', 'nico'])
        self.assertEquals([login for _, login,_ in rsets['adim']], ['adim', 'adim'])
        self.assertEquals([login for _, login,_ in rsets['syt']], ['syt'])
        # make sure rs is unchanged
        self.assertEquals([login for _, login,_ in rs], ['adim', 'adim', 'syt', 'nico', 'nico'])

        # split on a non entity column: the key is the number of 'd' in
        # the title
        rsets = rs.split_rset(lambda s: s.count('d'), col=2)
        self.assertEquals(len(rsets), 2)
        self.assertEquals([title for _, _, title in rsets[0]],
                          [u"Adim chez les pinguins",
                           u"Jardiner facile",
                           u"L'épluchage du castor commun",])
        self.assertEquals([title for _, _, title in rsets[1]],
                          [u"Le carrelage en 42 leçons",
                           u"La tarte tatin en 15 minutes",])
        # make sure rs is unchanged
        self.assertEquals([title for _, _, title in rs],
                          [u'Adim chez les pinguins',
                           u'Jardiner facile',
                           u'Le carrelage en 42 leçons',
                           u'La tarte tatin en 15 minutes',
                           u"L'épluchage du castor commun"])

    def test_cached_syntax_tree(self):
        """make sure syntax tree is cached"""
        rqlst1 = self.rset.syntax_tree()
        rqlst2 = self.rset.syntax_tree()
        self.assert_(rqlst1 is rqlst2)

    def test_get_entity_simple(self):
        """test only selected attributes are set until complete() is called"""
        self.add_entity('EUser', login=u'adim', upassword='adim',
                        surname=u'di mascio', firstname=u'adrien')
        e = self.entity('Any X,T WHERE X login "adim", X surname T')
        self.assertEquals(e['surname'], 'di mascio')
        self.assertRaises(KeyError, e.__getitem__, 'firstname')
        self.assertRaises(KeyError, e.__getitem__, 'creation_date')
        self.assertEquals(pprelcachedict(e._related_cache), [])
        e.complete()
        self.assertEquals(e['firstname'], 'adrien')
        self.assertEquals(pprelcachedict(e._related_cache), [])

    def test_get_entity_advanced(self):
        """test get_entity() on each column of a multi-columns result set"""
        self.add_entity('Bookmark', title=u'zou', path=u'/view')
        self.execute('SET X bookmarked_by Y WHERE X is Bookmark, Y login "anon"')
        rset = self.execute('Any X,Y,XT,YN WHERE X bookmarked_by Y, X title XT, Y login YN')

        e = rset.get_entity(0, 0)
        self.assertEquals(e.row, 0)
        self.assertEquals(e.col, 0)
        self.assertEquals(e['title'], 'zou')
        self.assertRaises(KeyError, e.__getitem__, 'path')
        self.assertEquals(e.view('text'), 'zou')
        self.assertEquals(pprelcachedict(e._related_cache), [])

        e = rset.get_entity(0, 1)
        self.assertEquals(e.row, 0)
        self.assertEquals(e.col, 1)
        self.assertEquals(e['login'], 'anon')
        self.assertRaises(KeyError, e.__getitem__, 'firstname')
        self.assertEquals(pprelcachedict(e._related_cache),
                          [])
        e.complete()
        self.assertEquals(e['firstname'], None)
        self.assertEquals(e.view('text'), 'anon')
        self.assertEquals(pprelcachedict(e._related_cache),
                          [])

        # the two last columns hold attribute values, not entities
        self.assertRaises(NotAnEntity, rset.get_entity, 0, 2)
        self.assertRaises(NotAnEntity, rset.get_entity, 0, 3)

    def test_get_entity_relation_cache_compt(self):
        """test the relation cache is filled from the query's relations"""
        rset = self.execute('Any X,S WHERE X in_state S, X login "anon"')
        e = rset.get_entity(0, 0)
        seid = self.execute('State X WHERE X name "activated"')[0][0]
        # for_user / in_group are prefetched in EUser __init__, in_state should
        # be filed from our query rset
        self.assertEquals(pprelcachedict(e._related_cache),
                          [('in_state_subject', [seid])])

    def test_get_entity_advanced_prefilled_cache(self):
        """test entities reached through cached relations are prefilled"""
        e = self.add_entity('Bookmark', title=u'zou', path=u'path')
        self.commit()
        rset = self.execute('Any X,U,S,XT,UL,SN WHERE X created_by U, U in_state S, '
                            'X title XT, S name SN, U login UL, X eid %s' % e.eid)
        e = rset.get_entity(0, 0)
        self.assertEquals(e['title'], 'zou')
        # the creator is checked below to be "admin": fetch its eid instead
        # of relying on the order in which eids have been allocated
        ueid = self.execute('EUser X WHERE X login "admin"')[0][0]
        self.assertEquals(pprelcachedict(e._related_cache),
                          [('created_by_subject', [ueid])])
        # first level of recursion
        u = e.created_by[0]
        self.assertEquals(u['login'], 'admin')
        self.assertRaises(KeyError, u.__getitem__, 'firstname')
        # second level of recursion
        s = u.in_state[0]
        self.assertEquals(s['name'], 'activated')
        self.assertRaises(KeyError, s.__getitem__, 'description')

    def test_get_entity_cache_with_left_outer_join(self):
        """test an optional relation without value is cached as empty"""
        eid = self.execute('INSERT EUser E: E login "joe", E upassword "joe", E in_group G '
                           'WHERE G name "users"')[0][0]
        rset = self.execute('Any X,E WHERE X eid %(x)s, X primary_email E?', {'x': eid})
        e = rset.get_entity(0, 0)
        # if any of the assertion below fails with a KeyError, the relation is not cached
        # related entities should be an empty list
        self.assertEquals(e.related_cache('primary_email', 'subject', True), [])
        # related rset should be an empty rset
        cached = e.related_cache('primary_email', 'subject', False)
        self.assertIsInstance(cached, ResultSet)
        self.assertEquals(cached.rowcount, 0)

    def test_get_entity_union(self):
        """test get_entity() on each row of an union query's result set"""
        self.add_entity('Bookmark', title=u'manger', path=u'path')
        rset = self.execute('Any X,N ORDERBY N WITH X,N BEING '
                            '((Any X,N WHERE X is Bookmark, X title N)'
                            ' UNION '
                            ' (Any X,N WHERE X is EGroup, X name N))')
        expected = (('EGroup', 'guests'), ('EGroup', 'managers'),
                    ('Bookmark', 'manger'), ('EGroup', 'owners'),
                    ('EGroup', 'users'))
        for entity in rset.entities(): # test get_entity for each row actually
            etype, n = expected[entity.row]
            self.assertEquals(entity.id, etype)
            # the selected attribute depends on the entity type
            if etype == 'Bookmark':
                attr = 'title'
            else:
                attr = 'name'
            self.assertEquals(entity[attr], n)

    def test_related_entity_union_subquery(self):
        """test related_entity() on result sets coming from an union subquery"""
        e = self.add_entity('Bookmark', title=u'aaaa', path=u'path')
        rset = self.execute('Any X,N ORDERBY N WITH X,N BEING '
                            '((Any X,N WHERE X is EGroup, X name N)'
                            ' UNION '
                            ' (Any X,N WHERE X is Bookmark, X title N))')
        entity, rtype = rset.related_entity(0, 1)
        self.assertEquals(entity.eid, e.eid)
        self.assertEquals(rtype, 'title')
        entity, rtype = rset.related_entity(1, 1)
        self.assertEquals(entity.id, 'EGroup')
        self.assertEquals(rtype, 'name')
        rset = self.execute('Any X,N ORDERBY N WHERE X is Bookmark WITH X,N BEING '
                            '((Any X,N WHERE X is EGroup, X name N)'
                            ' UNION '
                            ' (Any X,N WHERE X is Bookmark, X title N))')
        entity, rtype = rset.related_entity(0, 1)
        self.assertEquals(entity.eid, e.eid)
        self.assertEquals(rtype, 'title')

    def test_entities(self):
        """test entities() yields entities of the requested column"""
        rset = self.execute('Any U,G WHERE U in_group G')
        # make sure we have at least one element
        self.failUnless(rset)
        self.assertEquals(set(e.e_schema.type for e in rset.entities(0)),
                          set(['EUser',]))
        self.assertEquals(set(e.e_schema.type for e in rset.entities(1)),
                          set(['EGroup',]))

    def test_printable_rql(self):
        """test the rql string returned by printable_rql()"""
        rset = self.execute(u'EEType X WHERE X final FALSE, X meta FALSE')
        self.assertEquals(rset.printable_rql(),
                          'Any X WHERE X final FALSE, X meta FALSE, X is EEType')

    def test_searched_text(self):
        """test searched_text() with an inlined and a substituted value"""
        rset = self.execute(u'Any X WHERE X has_text "foobar"')
        self.assertEquals(rset.searched_text(), 'foobar')
        rset = self.execute(u'Any X WHERE X has_text %(text)s', {'text' : 'foo'})
        self.assertEquals(rset.searched_text(), 'foo')
|
344 |
|
345 |
|
# run the test cases of this module when the file is executed as a script
if __name__ == '__main__':
    unittest_main()