1 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 |
|
19 from datetime import datetime, timedelta |
|
20 from itertools import repeat |
|
21 |
|
22 from cubicweb.devtools import TestServerConfiguration, init_test_database |
|
23 from cubicweb.devtools.testlib import CubicWebTC, Tags |
|
24 from cubicweb.devtools.repotest import do_monkey_patch, undo_monkey_patch |
|
25 from cubicweb.devtools import get_test_db_handler |
|
26 |
|
class ExternalSource1Configuration(TestServerConfiguration):
    """Test server configuration whose ``sourcefile`` is ``sources_extern``."""

    sourcefile = 'sources_extern'
|
29 |
|
class ExternalSource2Configuration(TestServerConfiguration):
    """Test server configuration whose ``sourcefile`` is ``sources_multi``."""

    sourcefile = 'sources_multi'
|
32 |
|
# reference modification time handed to ``synchronize()`` by the tests below:
# ten seconds before this module was imported (naive UTC datetime)
MTIME = datetime.utcnow() - timedelta(seconds=10)

# textual configuration of the CWSource entity named 'extern'
EXTERN_SOURCE_CFG = u'''
cubicweb-user = admin
cubicweb-password = gingkow
base-url=http://extern.org/
'''
|
40 |
|
# hi-jacking: keep references to the genuine implementations so the test case
# can monkey-patch both attributes in setUp() and put them back in tearDown()
from cubicweb.server.sources.pyrorql import PyroRQLSource
from cubicweb.dbapi import Connection

Connection_close = Connection.close
PyroRQLSource_get_connection = PyroRQLSource.get_connection
|
47 |
|
def add_extern_mapping(source):
    """Initialise the mapping of `source`.

    Every entity type / relation type listed below is paired with the
    ``u'write'`` option and the resulting pairs are given to
    ``source.init_mapping``.
    """
    mapped_names = ('Card', 'Affaire', 'State',
                    'in_state', 'documented_by', 'multisource_inlined_rel')
    source.init_mapping(zip(mapped_names, repeat(u'write')))
|
52 |
|
53 |
|
def pre_setup_database_extern(session, config):
    """Insert two cards and one affaire through `session`, then commit.

    `config` is accepted but not used here.
    """
    insert_queries = (
        'INSERT Card X: X title "C3: An external card", X wikiid "aaa"',
        'INSERT Card X: X title "C4: Ze external card", X wikiid "zzz"',
        'INSERT Affaire X: X ref "AFFREF"',
    )
    for rql in insert_queries:
        session.execute(rql)
    session.commit()
|
59 |
|
def pre_setup_database_multi(session, config):
    """Create the 'extern' pyrorql CWSource entity through `session`, then commit.

    `config` is accepted but not used here.
    """
    source_attrs = {
        'name': u'extern',
        'type': u'pyrorql',
        'url': u'pyro:///extern',
        'config': EXTERN_SOURCE_CFG,
    }
    session.create_entity('CWSource', **source_attrs)
    session.commit()
|
64 |
|
65 |
|
class TwoSourcesTC(CubicWebTC):
    r"""Multi-sources test case.

    Topology exercised by the tests::

        Main repo -> extern-multi -> extern
                \-------------------------/

    The main repository declares two pyrorql sources, 'extern' and
    'extern-multi' (see `pre_setup_database`); the repository behind
    'extern-multi' itself declares an 'extern' source (see
    `pre_setup_database_multi`).  No pyro connection is made:
    `PyroRQLSource.get_connection` is patched in `setUp` to hand back the
    in-process connections `cnx2` / `cnx3`.
    """
    test_db_id = 'cw-server-multisources'
    # the parentheses of the historical ``Tags(('multisources'))`` did not make
    # a tuple: a plain string is what is actually given to Tags
    tags = CubicWebTC.tags | Tags('multisources')

    def _init_repo(self):
        """Set up the two external repositories, then the main one.

        `repo2`/`cnx2` come from the handler of `_cfg2`, `repo3`/`cnx3` from
        the handler of `_cfg3`; the parent implementation is called last.
        """
        repo2_handler = get_test_db_handler(self._cfg2)
        repo2_handler.build_db_cache('4cards-1affaire',
                                     pre_setup_func=pre_setup_database_extern)
        self.repo2, self.cnx2 = repo2_handler.get_repo_and_cnx('4cards-1affaire')

        repo3_handler = get_test_db_handler(self._cfg3)
        repo3_handler.build_db_cache('multisource',
                                     pre_setup_func=pre_setup_database_multi)
        self.repo3, self.cnx3 = repo3_handler.get_repo_and_cnx('multisource')

        super(TwoSourcesTC, self)._init_repo()

    def setUp(self):
        """Create the external configurations and install the monkey-patches.

        Everything patched here is put back by `tearDown`.
        """
        self._cfg2 = ExternalSource1Configuration('data', apphome=TwoSourcesTC.datadir)
        self._cfg3 = ExternalSource2Configuration('data', apphome=TwoSourcesTC.datadir)
        TestServerConfiguration.no_sqlite_wrap = True
        # hi-jack PyroRQLSource.get_connection to access existing connection (no
        # pyro connection); a conditional expression is used instead of the
        # ``cond and a or b`` idiom, which picks `b` whenever `a` is falsy
        PyroRQLSource.get_connection = (
            lambda source: self.cnx3 if source.uri == 'extern-multi' else self.cnx2)
        # also necessary since the repository is closing its initial connections
        # pool though we want to keep cnx2 valid
        Connection.close = lambda cnx: None
        CubicWebTC.setUp(self)
        # NOTE(review): these bound methods are looked up while Connection.close
        # is patched; if cnx2/cnx3 are plain dbapi Connection instances the
        # cleanups therefore resolve to the no-op above -- confirm this is
        # intended
        self.addCleanup(self.cnx2.close)
        self.addCleanup(self.cnx3.close)
        do_monkey_patch()

    def tearDown(self):
        """Remove the extra sources and undo every patch installed by `setUp`.

        The global patches are restored in a ``finally`` clause so that a
        failure while removing sources or in the parent `tearDown` does not
        leave them in place.
        """
        try:
            # the slice is a copy, so removing sources while looping is safe
            for source in self.repo.sources[1:]:
                self.repo.remove_source(source.uri)
            super(TwoSourcesTC, self).tearDown()
        finally:
            PyroRQLSource.get_connection = PyroRQLSource_get_connection
            Connection.close = Connection_close
            TestServerConfiguration.no_sqlite_wrap = False
            undo_monkey_patch()

    @staticmethod
    def pre_setup_database(session, config):
        """Declare the 'extern' and 'extern-multi' sources in the main repo.

        Each CWSource entity is created and committed, then its mapping is
        initialised with `add_extern_mapping`.  Three queries are run at the
        end to trigger discovery.  `config` is not used here.
        """
        source_configs = [('extern', EXTERN_SOURCE_CFG),
                          ('extern-multi', '''
cubicweb-user = admin
cubicweb-password = gingkow
''')]
        for uri, src_config in source_configs:
            source = session.create_entity('CWSource', name=unicode(uri),
                                           type=u'pyrorql', url=u'pyro:///extern-multi',
                                           config=unicode(src_config))
            session.commit()
            add_extern_mapping(source)

        session.commit()
        # trigger discovery
        session.execute('Card X')
        session.execute('Affaire X')
        session.execute('State X')

    def setup_database(self):
        """Record external eids (`ec1`, `aff1`) and add two internal cards.

        `ec1` and `aff1` are eids as seen through `cnx2`; `ic1` and `ic2` are
        the eids of the cards inserted through `sexecute`.
        """
        cu2 = self.cnx2.cursor()
        self.ec1 = cu2.execute('Any X WHERE X is Card, X title "C3: An external card", X wikiid "aaa"')[0][0]
        self.aff1 = cu2.execute('Any X WHERE X is Affaire, X ref "AFFREF"')[0][0]
        cu2.close()
        # add some entities
        self.ic1 = self.sexecute('INSERT Card X: X title "C1: An internal card", X wikiid "aaai"')[0][0]
        self.ic2 = self.sexecute('INSERT Card X: X title "C2: Ze internal card", X wikiid "zzzi"')[0][0]

    def test_eid_comp(self):
        # an eid comparison must still return the 4 cards
        rset = self.sexecute('Card X WHERE X eid > 1')
        self.assertEqual(len(rset), 4)
        rset = self.sexecute('Any X,T WHERE X title T, X eid > 1')
        self.assertEqual(len(rset), 4)

    def test_metainformation(self):
        rset = self.sexecute('Card X ORDERBY T WHERE X title T')
        # 2 added to the system source, 2 added to the external source
        self.assertEqual(len(rset), 4)
        # the result set is ordered by title: the two internal cards (C1, C2)
        # come first, the two external ones (C3, C4) come last
        self.assertEqual(rset.get_entity(0, 0).cw_metainformation(),
                         {'source': {'type': 'native', 'uri': 'system', 'use-cwuri-as-url': False},
                          'type': u'Card', 'extid': None})
        externent = rset.get_entity(3, 0)
        metainf = externent.cw_metainformation()
        self.assertEqual(metainf['source'], {'type': 'pyrorql', 'base-url': 'http://extern.org/', 'uri': 'extern', 'use-cwuri-as-url': False})
        self.assertEqual(metainf['type'], 'Card')
        # assertTrue rather than the deprecated assert_ alias
        self.assertTrue(metainf['extid'])
        etype = self.sexecute('Any ETN WHERE X is ET, ET name ETN, X eid %(x)s',
                              {'x': externent.eid})[0][0]
        self.assertEqual(etype, 'Card')

    def test_order_limit_offset(self):
        rsetbase = self.sexecute('Any W,X ORDERBY W,X WHERE X wikiid W')
        self.assertEqual(len(rsetbase), 4)
        self.assertEqual(sorted(rsetbase.rows), rsetbase.rows)
        rset = self.sexecute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W')
        self.assertEqual(rset.rows, rsetbase.rows[2:4])

    def test_has_text(self):
        self.repo.sources_by_uri['extern'].synchronize(MTIME) # in case fti_update has been run before
        self.assertTrue(self.sexecute('Any X WHERE X has_text "affref"'))
        self.assertTrue(self.sexecute('Affaire X WHERE X has_text "affref"'))
        self.assertTrue(self.sexecute('Any X ORDERBY FTIRANK(X) WHERE X has_text "affref"'))
        self.assertTrue(self.sexecute('Affaire X ORDERBY FTIRANK(X) WHERE X has_text "affref"'))

    def test_anon_has_text(self):
        self.repo.sources_by_uri['extern'].synchronize(MTIME) # in case fti_update has been run before
        self.sexecute('INSERT Affaire X: X ref "no readable card"')
        aff1 = self.sexecute('INSERT Affaire X: X ref "card"')[0][0]
        # grant read access
        self.sexecute('SET X owned_by U WHERE X eid %(x)s, U login "anon"', {'x': aff1})
        self.commit()
        cnx = self.login('anon')
        try:
            cu = cnx.cursor()
            rset = cu.execute('Any X WHERE X has_text "card"')
            # 5: 4 card + 1 readable affaire
            self.assertEqual(len(rset), 5, zip(rset.rows, rset.description))
            rset = cu.execute('Any X ORDERBY FTIRANK(X) WHERE X has_text "card"')
            self.assertEqual(len(rset), 5, zip(rset.rows, rset.description))
        finally:
            # close the anonymous connection even when an assertion fails;
            # the saved genuine close is needed since Connection.close is
            # patched to a no-op during the test
            Connection_close(cnx.cnx) # cnx is a TestCaseConnectionProxy

    def test_synchronization(self):
        cu = self.cnx2.cursor()
        # assertTrue instead of a bare assert, which is stripped under -O
        self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x': self.aff1}))
        cu.execute('SET X ref "BLAH" WHERE X eid %(x)s', {'x': self.aff1})
        aff2 = cu.execute('INSERT Affaire X: X ref "AFFREUX"')[0][0]
        self.cnx2.commit()
        try:
            # force sync
            self.repo.sources_by_uri['extern'].synchronize(MTIME)
            self.assertTrue(self.sexecute('Any X WHERE X has_text "blah"'))
            self.assertTrue(self.sexecute('Any X WHERE X has_text "affreux"'))
            cu.execute('DELETE Affaire X WHERE X eid %(x)s', {'x': aff2})
            self.cnx2.commit()
            self.repo.sources_by_uri['extern'].synchronize(MTIME)
            rset = self.sexecute('Any X WHERE X has_text "affreux"')
            self.assertFalse(rset)
        finally:
            # restore state
            cu.execute('SET X ref "AFFREF" WHERE X eid %(x)s', {'x': self.aff1})
            self.cnx2.commit()

    def test_simplifiable_var(self):
        affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0]
        rset = self.sexecute('Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB',
                             {'x': affeid})
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset[0][1], "pitetre")

    def test_simplifiable_var_2(self):
        affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0]
        # NOTE(review): 'u' is not referenced by the query; kept as is
        rset = self.sexecute('Any E WHERE E eid %(x)s, E in_state S, NOT S name "moved"',
                             {'x': affeid, 'u': self.session.user.eid})
        self.assertEqual(len(rset), 1)

    def test_sort_func(self):
        # only checks that the query can be executed
        self.sexecute('Affaire X ORDERBY DUMB_SORT(RF) WHERE X ref RF')

    def test_sort_func_ambigous(self):
        # only checks that the query can be executed
        self.sexecute('Any X ORDERBY DUMB_SORT(RF) WHERE X title RF')

    def test_in_eid(self):
        iec1 = self.repo.extid2eid(self.repo.sources_by_uri['extern'], str(self.ec1),
                                   'Card', self.session)
        rset = self.sexecute('Any X WHERE X eid IN (%s, %s)' % (iec1, self.ic1))
        self.assertEqual(sorted(r[0] for r in rset.rows), sorted([iec1, self.ic1]))

    def test_greater_eid(self):
        rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1))
        self.assertEqual(len(rset.rows), 2) # self.ic1 and self.ic2
        cu = self.cnx2.cursor()
        cu.execute('INSERT Card X: X title "glup"')
        self.cnx2.commit()
        # 'X eid > something' should not trigger discovery
        rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1))
        self.assertEqual(len(rset.rows), 2)
        # trigger discovery using another query
        crset = self.sexecute('Card X WHERE X title "glup"')
        self.assertEqual(len(crset.rows), 1)
        rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1))
        self.assertEqual(len(rset.rows), 3)
        rset = self.sexecute('Any MAX(X)')
        self.assertEqual(len(rset.rows), 1)
        self.assertEqual(rset.rows[0][0], crset[0][0])

    def test_attr_unification_1(self):
        self.sexecute('INSERT Note X: X type "AFFREF"')
        self.sexecute('INSERT Note X: X type "AFFREU"')
        rset = self.sexecute('Any X,Y WHERE X is Note, Y is Affaire, X type T, Y ref T')
        self.assertEqual(len(rset), 1, rset.rows)

    def test_attr_unification_2(self):
        cu = self.cnx2.cursor()
        ec2 = cu.execute('INSERT Card X: X title "AFFREF"')[0][0]
        self.cnx2.commit()
        try:
            self.sexecute('INSERT Card C: C title "AFFREF"')
            rset = self.sexecute('Any X,Y WHERE X is Card, Y is Affaire, X title T, Y ref T')
            self.assertEqual(len(rset), 2, rset.rows)
        finally:
            # remove the card added through cnx2
            cu.execute('DELETE Card X WHERE X eid %(x)s', {'x': ec2})
            self.cnx2.commit()

    def test_attr_unification_neq_1(self):
        # XXX complete
        self.sexecute('Any X,Y WHERE X is Note, Y is Affaire, X creation_date D, Y creation_date > D')

    def test_attr_unification_neq_2(self):
        # XXX complete
        self.sexecute('Any X,Y WHERE X is Card, Y is Affaire, X creation_date D, Y creation_date > D')

    def test_union(self):
        afeids = self.sexecute('Affaire X')
        ueids = self.sexecute('CWUser X')
        rset = self.sexecute('(Any X WHERE X is Affaire) UNION (Any X WHERE X is CWUser)')
        self.assertEqual(sorted(r[0] for r in rset.rows),
                         sorted(r[0] for r in afeids + ueids))

    def test_subquery1(self):
        rsetbase = self.sexecute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)')
        self.assertEqual(len(rsetbase), 4)
        self.assertEqual(sorted(rsetbase.rows), rsetbase.rows)
        rset = self.sexecute('Any W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)')
        self.assertEqual(rset.rows, rsetbase.rows[2:4])
        rset = self.sexecute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X WHERE X wikiid W)')
        self.assertEqual(rset.rows, rsetbase.rows[2:4])
        rset = self.sexecute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W)')
        self.assertEqual(rset.rows, rsetbase.rows[2:4])

    def test_subquery2(self):
        affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0]
        rset = self.sexecute('Any X,AA,AB WITH X,AA,AB BEING (Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB)',
                             {'x': affeid})
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset[0][1], "pitetre")

    def test_not_relation(self):
        states = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN'))
        userstate = self.session.user.in_state[0]
        # every state but the user's one is expected for the user
        states.remove((userstate.eid, userstate.name))
        notstates = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s',
                                                        {'x': self.session.user.eid}))
        self.assertSetEqual(notstates, states)
        aff1 = self.sexecute('Any X WHERE X is Affaire, X ref "AFFREF"')[0][0]
        aff1stateeid, aff1statename = self.sexecute('Any S,SN WHERE X eid %(x)s, X in_state S, S name SN', {'x': aff1})[0]
        self.assertEqual(aff1statename, 'pitetre')
        # same check for the affaire: put the user's state back and take the
        # affaire's one out
        states.add((userstate.eid, userstate.name))
        states.remove((aff1stateeid, aff1statename))
        notstates = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s',
                                                        {'x': aff1}))
        self.assertSetEqual(notstates, states)

    def test_absolute_url_base_url(self):
        cu = self.cnx2.cursor()
        ceid = cu.execute('INSERT Card X: X title "without wikiid to get eid based url"')[0][0]
        self.cnx2.commit()
        try:
            lc = self.sexecute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0)
            self.assertEqual(lc.absolute_url(), 'http://extern.org/%s' % ceid)
        finally:
            # remove the card added through cnx2 even when the check fails
            cu.execute('DELETE Card X WHERE X eid %(x)s', {'x': ceid})
            self.cnx2.commit()

    def test_absolute_url_no_base_url(self):
        cu = self.cnx3.cursor()
        ceid = cu.execute('INSERT Card X: X title "without wikiid to get eid based url"')[0][0]
        self.cnx3.commit()
        try:
            lc = self.sexecute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0)
            self.assertEqual(lc.absolute_url(), 'http://testing.fr/cubicweb/%s' % lc.eid)
        finally:
            # remove the card added through cnx3 even when the check fails
            cu.execute('DELETE Card X WHERE X eid %(x)s', {'x': ceid})
            self.cnx3.commit()

    def test_crossed_relation_noeid_needattr(self):
        """http://www.cubicweb.org/ticket/1382452"""
        aff1 = self.sexecute('INSERT Affaire X: X ref "AFFREF"')[0][0]
        # link within extern source
        ec1 = self.sexecute('Card X WHERE X wikiid "zzz"')[0][0]
        # NOTE(review): the SET clause uses variable A while the WHERE clause
        # binds E -- presumably a single variable was meant; confirm before
        # changing, the expected result below relies on the current queries
        self.sexecute('SET A documented_by C WHERE E eid %(a)s, C eid %(c)s',
                      {'a': aff1, 'c': ec1})
        # link from system to extern source
        self.sexecute('SET A documented_by C WHERE E eid %(a)s, C eid %(c)s',
                      {'a': aff1, 'c': self.ic2})
        rset = self.sexecute('DISTINCT Any DEP WHERE P ref "AFFREF", P documented_by DEP, DEP wikiid LIKE "z%"')
        self.assertEqual(sorted(rset.rows), [[ec1], [self.ic2]])

    def test_nonregr1(self):
        # only checks that the query can be executed
        ueid = self.session.user.eid
        affaire = self.sexecute('Affaire X WHERE X ref "AFFREF"').get_entity(0, 0)
        self.sexecute('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s',
                      {'x': affaire.eid, 'u': ueid})

    def test_nonregr2(self):
        iworkflowable = self.session.user.cw_adapt_to('IWorkflowable')
        iworkflowable.fire_transition('deactivate')
        treid = iworkflowable.latest_trinfo().eid
        rset = self.sexecute('Any X ORDERBY D DESC WHERE E eid %(x)s, E wf_info_for X, X modification_date D',
                             {'x': treid})
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset.rows[0], [self.session.user.eid])

    def test_nonregr3(self):
        # only checks that the query can be executed
        self.sexecute('DELETE Card X WHERE X eid %(x)s, NOT X multisource_inlined_rel Y', {'x': self.ic1})

    def test_nonregr4(self):
        # only checks that the query can be executed
        self.sexecute('Any X,S,U WHERE X in_state S, X todo_by U')

    def test_delete_source(self):
        req = self.request()
        req.execute('DELETE CWSource S WHERE S name "extern"')
        self.commit()
        # no row of the entities table may still refer to the deleted source
        cu = self.session.system_sql("SELECT * FROM entities WHERE source='extern'")
        self.assertFalse(cu.fetchall())
|
381 |
|
if __name__ == '__main__':
    # script entry point: hand over to the logilab test runner
    import logilab.common.testlib as testlib
    testlib.unittest_main()
|