|
1 # -*- coding: utf-8 -*- |
|
2 # copyright 2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This program is free software: you can redistribute it and/or modify it under |
|
6 # the terms of the GNU Lesser General Public License as published by the Free |
|
7 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
8 # any later version. |
|
9 # |
|
10 # This program is distributed in the hope that it will be useful, but WITHOUT |
|
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
12 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
13 # details. |
|
14 # |
|
15 # You should have received a copy of the GNU Lesser General Public License along |
|
16 # with this program. If not, see <http://www.gnu.org/licenses/>. |
|
17 """Massive store test case""" |
|
18 |
|
19 import os.path as osp |
|
20 import itertools |
|
21 |
|
22 from cubicweb.dataimport import ucsvreader |
|
23 from cubicweb.devtools import testlib, PostgresApptestConfiguration |
|
24 from cubicweb.devtools import startpgcluster, stoppgcluster |
|
25 from cubicweb.dataimport.massive_store import MassiveObjectStore |
|
26 |
|
27 |
|
# Absolute directory holding this test module; used to locate the
# data/ fixtures (timeZones.txt, geonames.csv).
HERE = osp.abspath(osp.dirname(__file__))
|
29 |
|
30 |
|
def setUpModule():
    """Spin up a throwaway PostgreSQL cluster dedicated to this test module."""
    startpgcluster(__file__)
|
33 |
|
34 |
|
def tearDownModule(*args):
    """Shut down the module-level PostgreSQL cluster started by setUpModule."""
    stoppgcluster(__file__)
|
37 |
|
38 |
|
class MassImportSimpleTC(testlib.CubicWebTC):
    """Functional tests for :class:`MassiveObjectStore` against PostgreSQL.

    The massive store bypasses the regular repository hooks, so most
    assertions inspect its effects directly through SQL (the ``entities``
    table and ``pg_indexes``) rather than through RQL alone.
    """
    configcls = PostgresApptestConfiguration
    appid = 'data-massimport'

    def cast(self, _type, value):
        """Return ``_type(value)``, or None when the conversion fails.

        Used to turn the loosely formatted geonames CSV fields into
        floats/ints without aborting the whole import on a bad cell.
        """
        try:
            return _type(value)
        except ValueError:
            return None

    def push_geonames_data(self, dumpname, store):
        """Import the geonames dump `dumpname` through `store`.

        TimeZone entities are created first through the plain connection
        (``store._cnx``) so that Location rows can reference their eids.
        """
        # Push timezones
        cnx = store._cnx
        for code, gmt, dst, raw_offset in ucsvreader(open(osp.join(HERE, 'data/timeZones.txt')),
                                                     delimiter='\t'):
            cnx.create_entity('TimeZone', code=code, gmt=float(gmt),
                              dst=float(dst), raw_offset=float(raw_offset))
        # map timezone code -> eid for the foreign keys below
        timezone_code = dict(cnx.execute('Any C, X WHERE X is TimeZone, X code C'))
        # Push data
        for infos in ucsvreader(open(dumpname),
                                separator='\t',
                                ignore_errors=True):
            latitude = self.cast(float, infos[4])
            longitude = self.cast(float, infos[5])
            population = self.cast(int, infos[14])
            elevation = self.cast(int, infos[15])
            gtopo = self.cast(int, infos[16])
            # geonames feature classes are single letters; anything else
            # is treated as missing
            feature_class = infos[6]
            if len(infos[6]) != 1:
                feature_class = None
            entity = {'name': infos[1],
                      'asciiname': infos[2],
                      'alternatenames': infos[3],
                      'latitude': latitude, 'longitude': longitude,
                      'feature_class': feature_class,
                      'alternate_country_code': infos[9],
                      'admin_code_3': infos[12],
                      'admin_code_4': infos[13],
                      'population': population, 'elevation': elevation,
                      'gtopo30': gtopo, 'timezone': timezone_code.get(infos[17]),
                      'cwuri': u'http://sws.geonames.org/%s/' % int(infos[0]),
                      'geonameid': int(infos[0]),
                      }
            store.create_entity('Location', **entity)

    def test_autoflush_metadata(self):
        """With autoflush, flush()+commit() must populate the entities table."""
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            self.assertEqual(len(crs.fetchall()), 0)
            store = MassiveObjectStore(cnx, autoflush_metadata=True)
            store.create_entity('Location', name=u'toto')
            store.flush()
            store.commit()
            store.cleanup()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            self.assertEqual(len(crs.fetchall()), 1)

    def test_massimport_etype_metadata(self):
        """Entities created by the store must carry correct `is` metadata."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            timezone = store.create_entity('TimeZone')
            store.create_entity('Location', timezone=timezone.eid)
            store.flush()
            store.commit()
            eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
                                      'T name TN')[0]
            self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)

    def test_do_not_drop_index(self):
        """drop_index=False must leave the system indexes in place."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, drop_index=False)
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertIn('entities_pkey', indexes)
            self.assertIn('unique_entities_extid_idx', indexes)
            self.assertIn('owned_by_relation_p_key', indexes)
            self.assertIn('owned_by_relation_to_idx', indexes)

    def test_drop_index(self):
        """drop_index=True must remove the system indexes."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, drop_index=True)
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertNotIn('entities_pkey', indexes)
            self.assertNotIn('unique_entities_extid_idx', indexes)
            # NB: must use the same spelling as the positive assertions
            # above ('owned_by_relation_p_key'); a misspelled name would
            # make this assertNotIn pass vacuously.
            self.assertNotIn('owned_by_relation_p_key', indexes)
            self.assertNotIn('owned_by_relation_to_idx', indexes)

    def test_drop_index_recreation(self):
        """cleanup() must recreate the indexes dropped by drop_index=True."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, drop_index=True)
            store.cleanup()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertIn('entities_pkey', indexes)
            self.assertIn('unique_entities_extid_idx', indexes)
            self.assertIn('owned_by_relation_p_key', indexes)
            self.assertIn('owned_by_relation_to_idx', indexes)

    def test_eids_seq_range(self):
        """Eids must be allocated from the configured sequence start."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
            store.create_entity('Location', name=u'toto')
            store.flush()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql("SELECT * FROM entities_id_seq")
            # fetchone() returns a row tuple; compare its first column,
            # not the tuple itself (tuple > int is meaningless).
            self.assertTrue(crs.fetchone()[0] > 50000)

    def test_eid_entity(self):
        """Entities created by the store get eids above eids_seq_start."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
            entity = store.create_entity('Location', name=u'toto')
            store.flush()
            self.assertTrue(entity.eid > 50000)

    def test_eid_entity_2(self):
        """An explicitly provided eid must be kept as-is by the store."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
            entity = store.create_entity('Location', name=u'toto', eid=10000)
            store.flush()
        with self.admin_access.repo_cnx() as cnx:
            self.assertEqual(entity.eid, 10000)

    def test_on_commit_callback(self):
        """commit() must invoke the on_commit_callback at least once."""
        counter = itertools.count()
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, on_commit_callback=lambda: next(counter))
            store.create_entity('Location', name=u'toto')
            store.flush()
            store.commit()
        # the callback advanced the counter, so the next value is >= 1
        self.assertTrue(next(counter) >= 1)

    def test_on_rollback_callback(self):
        """A failing insertion must invoke the on_rollback_callback."""
        counter = itertools.count()
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
            # 'nm' is deliberately not a Location attribute: the flush
            # fails and must trigger a rollback.
            store.create_entity('Location', nm='toto')
            store.flush()
            store.commit()
        self.assertTrue(next(counter) >= 1)

    def test_slave_mode_indexes(self):
        """A slave store must not drop the system indexes."""
        with self.admin_access.repo_cnx() as cnx:
            slave_store = MassiveObjectStore(cnx, slave_mode=True)
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertIn('entities_pkey', indexes)
            self.assertIn('unique_entities_extid_idx', indexes)
            self.assertIn('owned_by_relation_p_key', indexes)
            self.assertIn('owned_by_relation_to_idx', indexes)

    def test_slave_mode_exception(self):
        """Metadata flush and cleanup are master-only operations."""
        with self.admin_access.repo_cnx() as cnx:
            master_store = MassiveObjectStore(cnx, slave_mode=False)
            slave_store = MassiveObjectStore(cnx, slave_mode=True)
            self.assertRaises(RuntimeError, slave_store.flush_meta_data)
            self.assertRaises(RuntimeError, slave_store.cleanup)

    def test_simple_insert(self):
        """A full geonames import must be visible through RQL afterwards."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, autoflush_metadata=True)
            self.push_geonames_data(osp.join(HERE, 'data/geonames.csv'), store)
            store.flush()
        with self.admin_access.repo_cnx() as cnx:
            rset = cnx.execute('Any X WHERE X is Location')
            self.assertEqual(len(rset), 4000)
            rset = cnx.execute('Any X WHERE X is Location, X timezone T')
            self.assertEqual(len(rset), 4000)

    def test_index_building(self):
        """Indexes are dropped during the import and rebuilt by cleanup()."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, autoflush_metadata=True)
            self.push_geonames_data(osp.join(HERE, 'data/geonames.csv'), store)
            store.flush()

            # Check index
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertNotIn('entities_pkey', indexes)
            self.assertNotIn('unique_entities_extid_idx', indexes)
            self.assertNotIn('owned_by_relation_p_key', indexes)
            self.assertNotIn('owned_by_relation_to_idx', indexes)

            # Cleanup -> index
            store.cleanup()

            # Check index again
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertIn('entities_pkey', indexes)
            self.assertIn('unique_entities_extid_idx', indexes)
            self.assertIn('owned_by_relation_p_key', indexes)
            self.assertIn('owned_by_relation_to_idx', indexes)

    def test_flush_meta_data(self):
        """Without autoflush, metadata appears only after flush_meta_data()."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, autoflush_metadata=False)
            self.push_geonames_data(osp.join(HERE, 'data/geonames.csv'), store)
            store.flush()
            curs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                  {'t': 'Location'})
            self.assertEqual(len(curs.fetchall()), 0)
            # Flush metadata -> entities table is updated
            store.flush_meta_data()
            curs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                  {'t': 'Location'})
            self.assertEqual(len(curs.fetchall()), 4000)

    def test_multiple_insert(self):
        """Two successive stores may init/cleanup the same etype table."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_etype_table('TestLocation')
            store.cleanup()
            store = MassiveObjectStore(cnx)
            store.init_etype_table('TestLocation')
            store.cleanup()

    def test_multiple_insert_relation(self):
        """Two successive stores may init/cleanup the same relation table."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.cleanup()
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.cleanup()

    def test_multiple_insert_drop_index(self):
        """drop_index=False store followed by a default (dropping) store."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, drop_index=False)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()

    def test_multiple_insert_drop_index_2(self):
        """Default (dropping) store followed by a drop_index=False store."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()
            store = MassiveObjectStore(cnx, drop_index=False)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()

    def test_multiple_insert_drop_index_3(self):
        """Two successive drop_index=False stores on the same tables."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, drop_index=False)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()
            store = MassiveObjectStore(cnx, drop_index=False)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()
|
328 |
|
329 |
|
if __name__ == '__main__':
    # run all tests in this module through logilab's unittest wrapper
    from logilab.common.testlib import unittest_main
    unittest_main()
|
333 |