|
1 from os.path import dirname, join, abspath |
|
2 from logilab.common.decorators import cached |
|
3 from mx.DateTime import now |
|
4 |
|
5 from cubicweb.devtools import TestServerConfiguration, init_test_database |
|
6 from cubicweb.devtools.apptest import RepositoryBasedTC |
|
7 from cubicweb.devtools.repotest import do_monkey_patch, undo_monkey_patch |
|
8 |
|
9 class TwoSourcesConfiguration(TestServerConfiguration): |
|
10 sourcefile = 'sources_multi' |
|
11 |
|
12 |
|
13 class ExternalSourceConfiguration(TestServerConfiguration): |
|
14 sourcefile = 'sources_extern' |
|
15 |
|
16 repo2, cnx2 = init_test_database('sqlite', config=ExternalSourceConfiguration('data')) |
|
17 cu = cnx2.cursor() |
|
18 ec1 = cu.execute('INSERT Card X: X title "C3: An external card", X wikiid "aaa"')[0][0] |
|
19 cu.execute('INSERT Card X: X title "C4: Ze external card", X wikiid "zzz"') |
|
20 aff1 = cu.execute('INSERT Affaire X: X ref "AFFREF", X in_state S WHERE S name "pitetre"')[0][0] |
|
21 cnx2.commit() |
|
22 |
|
23 MTIME = now() - 0.1 |
|
24 |
|
25 # XXX, access existing connection, no pyro connection |
|
26 from cubicweb.server.sources.pyrorql import PyroRQLSource |
|
27 PyroRQLSource.get_connection = lambda x: cnx2 |
|
28 # necessary since the repository is closing its initial connections pool though |
|
29 # we want to keep cnx2 valid |
|
30 from cubicweb.dbapi import Connection |
|
31 Connection.close = lambda x: None |
|
32 |
|
33 class TwoSourcesTC(RepositoryBasedTC): |
|
34 repo_config = TwoSourcesConfiguration('data') |
|
35 |
|
36 def setUp(self): |
|
37 RepositoryBasedTC.setUp(self) |
|
38 # trigger discovery |
|
39 self.execute('Card X') |
|
40 self.execute('Affaire X') |
|
41 self.execute('State X') |
|
42 self.commit() |
|
43 # don't delete external entities! |
|
44 self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0] |
|
45 # add some entities |
|
46 self.ic1 = self.execute('INSERT Card X: X title "C1: An internal card", X wikiid "aaai"')[0][0] |
|
47 self.ic2 = self.execute('INSERT Card X: X title "C2: Ze internal card", X wikiid "zzzi"')[0][0] |
|
48 self.commit() |
|
49 do_monkey_patch() |
|
50 |
|
51 def tearDown(self): |
|
52 RepositoryBasedTC.tearDown(self) |
|
53 undo_monkey_patch() |
|
54 |
|
55 def test_eid_comp(self): |
|
56 rset = self.execute('Card X WHERE X eid > 1') |
|
57 self.assertEquals(len(rset), 4) |
|
58 rset = self.execute('Any X,T WHERE X title T, X eid > 1') |
|
59 self.assertEquals(len(rset), 4) |
|
60 |
|
61 def test_metainformation(self): |
|
62 rset = self.execute('Card X ORDERBY T WHERE X title T') |
|
63 # 2 added to the system source, 2 added to the external source |
|
64 self.assertEquals(len(rset), 4) |
|
65 # since they are orderd by eid, we know the 3 first one is coming from the system source |
|
66 # and the others from external source |
|
67 self.assertEquals(rset.get_entity(0, 0).metainformation(), |
|
68 {'source': {'adapter': 'native', 'uri': 'system'}, |
|
69 'type': u'Card', 'extid': None}) |
|
70 externent = rset.get_entity(3, 0) |
|
71 metainf = externent.metainformation() |
|
72 self.assertEquals(metainf['source'], {'adapter': 'pyrorql', 'uri': 'extern'}) |
|
73 self.assertEquals(metainf['type'], 'Card') |
|
74 self.assert_(metainf['extid']) |
|
75 etype = self.execute('Any ETN WHERE X is ET, ET name ETN, X eid %(x)s', |
|
76 {'x': externent.eid}, 'x')[0][0] |
|
77 self.assertEquals(etype, 'Card') |
|
78 |
|
79 def test_order_limit_offset(self): |
|
80 rsetbase = self.execute('Any W,X ORDERBY W,X WHERE X wikiid W') |
|
81 self.assertEquals(len(rsetbase), 4) |
|
82 self.assertEquals(sorted(rsetbase.rows), rsetbase.rows) |
|
83 rset = self.execute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W') |
|
84 self.assertEquals(rset.rows, rsetbase.rows[2:4]) |
|
85 |
|
86 def test_has_text(self): |
|
87 self.repo.sources[-1].synchronize(MTIME) # in case fti_update has been run before |
|
88 self.failUnless(self.execute('Any X WHERE X has_text "affref"')) |
|
89 self.failUnless(self.execute('Affaire X WHERE X has_text "affref"')) |
|
90 |
|
91 def test_anon_has_text(self): |
|
92 self.repo.sources[-1].synchronize(MTIME) # in case fti_update has been run before |
|
93 self.execute('INSERT Affaire X: X ref "no readable card"')[0][0] |
|
94 aff1 = self.execute('INSERT Affaire X: X ref "card"')[0][0] |
|
95 # grant read access |
|
96 self.execute('SET X owned_by U WHERE X eid %(x)s, U login "anon"', {'x': aff1}, 'x') |
|
97 self.commit() |
|
98 cnx = self.login('anon') |
|
99 cu = cnx.cursor() |
|
100 rset = cu.execute('Any X WHERE X has_text "card"') |
|
101 self.assertEquals(len(rset), 5) |
|
102 |
|
103 def test_synchronization(self): |
|
104 cu = cnx2.cursor() |
|
105 cu.execute('SET X ref "BLAH" WHERE X eid %(x)s', {'x': aff1}, 'x') |
|
106 aff2 = cu.execute('INSERT Affaire X: X ref "AFFREUX", X in_state S WHERE S name "pitetre"')[0][0] |
|
107 cnx2.commit() |
|
108 try: |
|
109 # force sync |
|
110 self.repo.sources[-1].synchronize(MTIME) |
|
111 self.failUnless(self.execute('Any X WHERE X has_text "blah"')) |
|
112 self.failUnless(self.execute('Any X WHERE X has_text "affreux"')) |
|
113 cu.execute('DELETE Affaire X WHERE X eid %(x)s', {'x': aff2}) |
|
114 cnx2.commit() |
|
115 self.repo.sources[-1].synchronize(MTIME) |
|
116 self.failIf(self.execute('Any X WHERE X has_text "affreux"')) |
|
117 finally: |
|
118 # restore state |
|
119 cu.execute('SET X ref "AFFREF" WHERE X eid %(x)s', {'x': aff1}, 'x') |
|
120 cnx2.commit() |
|
121 |
|
122 def test_simplifiable_var(self): |
|
123 affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0] |
|
124 rset = self.execute('Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB', |
|
125 {'x': affeid}, 'x') |
|
126 self.assertEquals(len(rset), 1) |
|
127 self.assertEquals(rset[0][1], "pitetre") |
|
128 |
|
129 def test_simplifiable_var_2(self): |
|
130 affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0] |
|
131 rset = self.execute('Any E WHERE E eid %(x)s, E in_state S, NOT S name "moved"', |
|
132 {'x': affeid, 'u': self.session.user.eid}, 'x') |
|
133 self.assertEquals(len(rset), 1) |
|
134 |
|
135 def test_sort_func(self): |
|
136 self.execute('Affaire X ORDERBY DUMB_SORT(RF) WHERE X ref RF') |
|
137 |
|
138 def test_sort_func_ambigous(self): |
|
139 self.execute('Any X ORDERBY DUMB_SORT(RF) WHERE X title RF') |
|
140 |
|
141 def test_in_eid(self): |
|
142 iec1 = self.repo.extid2eid(self.repo.sources[-1], ec1, 'Card', self.session) |
|
143 rset = self.execute('Any X WHERE X eid IN (%s, %s)' % (iec1, self.ic1)) |
|
144 self.assertEquals(sorted(r[0] for r in rset.rows), sorted([iec1, self.ic1])) |
|
145 |
|
146 def test_greater_eid(self): |
|
147 rset = self.execute('Any X WHERE X eid > %s' % self.maxeid) |
|
148 self.assertEquals(len(rset.rows), 2) # self.ic1 and self.ic2 |
|
149 ec2 = cu.execute('INSERT Card X: X title "glup"')[0][0] |
|
150 cnx2.commit() |
|
151 # 'X eid > something' should not trigger discovery |
|
152 rset = self.execute('Any X WHERE X eid > %s' % self.maxeid) |
|
153 self.assertEquals(len(rset.rows), 2) |
|
154 # trigger discovery using another query |
|
155 crset = self.execute('Card X WHERE X title "glup"') |
|
156 self.assertEquals(len(crset.rows), 1) |
|
157 rset = self.execute('Any X WHERE X eid > %s' % self.maxeid) |
|
158 self.assertEquals(len(rset.rows), 3) |
|
159 rset = self.execute('Any MAX(X)') |
|
160 self.assertEquals(len(rset.rows), 1) |
|
161 self.assertEquals(rset.rows[0][0], crset[0][0]) |
|
162 |
|
163 def test_attr_unification_1(self): |
|
164 n1 = self.execute('INSERT Note X: X type "AFFREF"')[0][0] |
|
165 n2 = self.execute('INSERT Note X: X type "AFFREU"')[0][0] |
|
166 rset = self.execute('Any X,Y WHERE X is Note, Y is Affaire, X type T, Y ref T') |
|
167 self.assertEquals(len(rset), 1, rset.rows) |
|
168 |
|
169 def test_attr_unification_2(self): |
|
170 ec2 = cu.execute('INSERT Card X: X title "AFFREF"')[0][0] |
|
171 cnx2.commit() |
|
172 try: |
|
173 c1 = self.execute('INSERT Card C: C title "AFFREF"')[0][0] |
|
174 rset = self.execute('Any X,Y WHERE X is Card, Y is Affaire, X title T, Y ref T') |
|
175 self.assertEquals(len(rset), 2, rset.rows) |
|
176 finally: |
|
177 cu.execute('DELETE Card X WHERE X eid %(x)s', {'x': ec2}, 'x') |
|
178 cnx2.commit() |
|
179 |
|
180 def test_attr_unification_neq_1(self): |
|
181 # XXX complete |
|
182 self.execute('Any X,Y WHERE X is Note, Y is Affaire, X creation_date D, Y creation_date > D') |
|
183 |
|
184 def test_attr_unification_neq_2(self): |
|
185 # XXX complete |
|
186 self.execute('Any X,Y WHERE X is Card, Y is Affaire, X creation_date D, Y creation_date > D') |
|
187 |
|
188 def test_union(self): |
|
189 afeids = self.execute('Affaire X') |
|
190 ueids = self.execute('EUser X') |
|
191 rset = self.execute('(Any X WHERE X is Affaire) UNION (Any X WHERE X is EUser)') |
|
192 self.assertEquals(sorted(r[0] for r in rset.rows), |
|
193 sorted(r[0] for r in afeids + ueids)) |
|
194 |
|
195 def test_subquery1(self): |
|
196 rsetbase = self.execute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)') |
|
197 self.assertEquals(len(rsetbase), 4) |
|
198 self.assertEquals(sorted(rsetbase.rows), rsetbase.rows) |
|
199 rset = self.execute('Any W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)') |
|
200 self.assertEquals(rset.rows, rsetbase.rows[2:4]) |
|
201 rset = self.execute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X WHERE X wikiid W)') |
|
202 self.assertEquals(rset.rows, rsetbase.rows[2:4]) |
|
203 rset = self.execute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W)') |
|
204 self.assertEquals(rset.rows, rsetbase.rows[2:4]) |
|
205 |
|
206 def test_subquery2(self): |
|
207 affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0] |
|
208 rset =self.execute('Any X,AA,AB WITH X,AA,AB BEING (Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB)', |
|
209 {'x': affeid}) |
|
210 self.assertEquals(len(rset), 1) |
|
211 self.assertEquals(rset[0][1], "pitetre") |
|
212 |
|
213 def test_not_relation(self): |
|
214 states = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN')) |
|
215 userstate = self.session.user.in_state[0] |
|
216 states.remove((userstate.eid, userstate.name)) |
|
217 notstates = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s', |
|
218 {'x': self.session.user.eid}, 'x')) |
|
219 self.assertEquals(notstates, states) |
|
220 aff1 = self.execute('Any X WHERE X is Affaire, X ref "AFFREF"')[0][0] |
|
221 aff1stateeid, aff1statename = self.execute('Any S,SN WHERE X eid %(x)s, X in_state S, S name SN', {'x': aff1}, 'x')[0] |
|
222 self.assertEquals(aff1statename, 'pitetre') |
|
223 states.add((userstate.eid, userstate.name)) |
|
224 states.remove((aff1stateeid, aff1statename)) |
|
225 notstates = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s', |
|
226 {'x': aff1}, 'x')) |
|
227 self.set_debug(False) |
|
228 self.assertSetEquals(notstates, states) |
|
229 |
|
230 def test_nonregr1(self): |
|
231 ueid = self.session.user.eid |
|
232 affaire = self.execute('Affaire X WHERE X ref "AFFREF"').get_entity(0, 0) |
|
233 self.execute('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s', |
|
234 {'x': affaire.eid, 'u': ueid}) |
|
235 |
|
236 def test_nonregr2(self): |
|
237 treid = self.session.user.latest_trinfo().eid |
|
238 rset = self.execute('Any X ORDERBY D DESC WHERE E eid %(x)s, E wf_info_for X, X modification_date D', |
|
239 {'x': treid}) |
|
240 self.assertEquals(len(rset), 1) |
|
241 self.assertEquals(rset.rows[0], [self.session.user.eid]) |
|
242 |
|
243 |
|
244 def test_nonregr3(self): |
|
245 self.execute('DELETE Card X WHERE X eid %(x)s, NOT X multisource_inlined_rel Y', {'x': self.ic1}) |
|
246 |
|
247 if __name__ == '__main__': |
|
248 from logilab.common.testlib import unittest_main |
|
249 unittest_main() |