1 # -*- coding: utf-8 -*- |
|
2 # copyright 2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This program is free software: you can redistribute it and/or modify it under |
|
6 # the terms of the GNU Lesser General Public License as published by the Free |
|
7 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
8 # any later version. |
|
9 # |
|
10 # This program is distributed in the hope that it will be useful, but WITHOUT |
|
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
12 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
13 # details. |
|
14 # |
|
15 # You should have received a copy of the GNU Lesser General Public License along |
|
16 # with this program. If not, see <http://www.gnu.org/licenses/>. |
|
17 """Massive store test case""" |
|
18 |
|
19 import itertools |
|
20 |
|
21 from cubicweb.dataimport import ucsvreader |
|
22 from cubicweb.devtools import testlib, PostgresApptestConfiguration |
|
23 from cubicweb.devtools import startpgcluster, stoppgcluster |
|
24 from cubicweb.dataimport.massive_store import MassiveObjectStore, PGHelper |
|
25 |
|
26 |
|
def setUpModule():
    """Spawn a throwaway PostgreSQL cluster for this test module."""
    startpgcluster(__file__)
|
29 |
|
30 |
|
def tearDownModule(*args):
    """Shut down the PostgreSQL cluster started by :func:`setUpModule`."""
    stoppgcluster(__file__)
|
33 |
|
34 |
|
class MassImportSimpleTC(testlib.CubicWebTC):
    """Exercise :class:`MassiveObjectStore` against a real PostgreSQL backend.

    Covers metadata flushing, index dropping/recreation, custom eid
    sequences, commit/rollback callbacks, slave mode and bulk insertion
    of the geonames sample dump.
    """
    configcls = PostgresApptestConfiguration
    appid = 'data-massimport'

    def cast(self, _type, value):
        """Return ``_type(value)``, or None if the conversion raises ValueError."""
        try:
            return _type(value)
        except ValueError:
            return None

    def push_geonames_data(self, dumpname, store):
        """Import the geonames dump ``dumpname`` through ``store``.

        TimeZone entities are created first through the plain connection so
        that their eids can be referenced by the Location entities pushed
        into the massive store.
        """
        # Push timezones
        cnx = store._cnx
        # use `with` so the dump files are closed even if the import fails
        with open(self.datapath('timeZones.txt'), 'rb') as tzfile:
            for code, gmt, dst, raw_offset in ucsvreader(tzfile,
                                                         delimiter='\t'):
                cnx.create_entity('TimeZone', code=code, gmt=float(gmt),
                                  dst=float(dst), raw_offset=float(raw_offset))
        # map timezone code -> eid for the Location.timezone relation below
        timezone_code = dict(cnx.execute('Any C, X WHERE X is TimeZone, X code C'))
        # Push data
        with open(dumpname, 'rb') as dumpfile:
            for infos in ucsvreader(dumpfile, delimiter='\t',
                                    ignore_errors=True):
                latitude = self.cast(float, infos[4])
                longitude = self.cast(float, infos[5])
                population = self.cast(int, infos[14])
                elevation = self.cast(int, infos[15])
                gtopo = self.cast(int, infos[16])
                # geonames feature classes are single characters; anything
                # else is malformed input
                feature_class = infos[6]
                if len(infos[6]) != 1:
                    feature_class = None
                entity = {'name': infos[1],
                          'asciiname': infos[2],
                          'alternatenames': infos[3],
                          'latitude': latitude, 'longitude': longitude,
                          'feature_class': feature_class,
                          'alternate_country_code': infos[9],
                          'admin_code_3': infos[12],
                          'admin_code_4': infos[13],
                          'population': population, 'elevation': elevation,
                          'gtopo30': gtopo, 'timezone': timezone_code.get(infos[17]),
                          'cwuri': u'http://sws.geonames.org/%s/' % int(infos[0]),
                          'geonameid': int(infos[0]),
                          }
                store.prepare_insert_entity('Location', **entity)

    def test_autoflush_metadata(self):
        """Entities metadata must be visible after flush/commit/finish."""
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            self.assertEqual(len(crs.fetchall()), 0)
            store = MassiveObjectStore(cnx)
            store.prepare_insert_entity('Location', name=u'toto')
            store.flush()
            store.commit()
            store.finish()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            self.assertEqual(len(crs.fetchall()), 1)

    def test_massimport_etype_metadata(self):
        """Entity type metadata must match the inserted entities."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            timezone_eid = store.prepare_insert_entity('TimeZone')
            store.prepare_insert_entity('Location', timezone=timezone_eid)
            store.flush()
            store.commit()
            eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
                                      'T name TN')[0]
            self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)

    def test_drop_index(self):
        """Instantiating the store must drop constraint indexes."""
        with self.admin_access.repo_cnx() as cnx:
            # the store drops the indexes as a side effect of construction
            store = MassiveObjectStore(cnx)
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
        self.assertNotIn('entities_pkey', indexes)
        self.assertNotIn('unique_entities_extid_idx', indexes)
        # was 'owned_by_relation_pkey', a name no other test uses: the
        # recreation/slave-mode tests all check 'owned_by_relation_p_key'
        self.assertNotIn('owned_by_relation_p_key', indexes)
        self.assertNotIn('owned_by_relation_to_idx', indexes)

    def test_drop_index_recreation(self):
        """finish() must recreate the dropped indexes."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.finish()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
        self.assertIn('entities_pkey', indexes)
        self.assertIn('unique_entities_extid_idx', indexes)
        self.assertIn('owned_by_relation_p_key', indexes)
        self.assertIn('owned_by_relation_to_idx', indexes)

    def test_eids_seq_range(self):
        """restart_eid_sequence must move the global eid sequence forward."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, eids_seq_range=1000)
            store.restart_eid_sequence(50000)
            store.prepare_insert_entity('Location', name=u'toto')
            store.flush()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql("SELECT * FROM entities_id_seq")
            self.assertGreater(crs.fetchone()[0], 50000)

    def test_eid_entity(self):
        """Eids allocated by the store must follow the restarted sequence."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, eids_seq_range=1000)
            store.restart_eid_sequence(50000)
            eid = store.prepare_insert_entity('Location', name=u'toto')
            store.flush()
            self.assertGreater(eid, 50000)

    def test_eid_entity_2(self):
        """An explicitly provided eid must take precedence over the sequence."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.restart_eid_sequence(50000)
            eid = store.prepare_insert_entity('Location', name=u'toto', eid=10000)
            store.flush()
            self.assertEqual(eid, 10000)

    @staticmethod
    def get_db_descr(cnx):
        """Return {table name: set of its indexes and constraints}.

        Used to compare the database schema before and after a massive
        import round-trip.
        """
        pg_schema = (
            cnx.repo.config.system_source_config.get('db-namespace')
            or 'public')
        pgh = PGHelper(cnx, pg_schema)
        all_tables = cnx.system_sql('''
SELECT table_name
FROM information_schema.tables
where table_schema = %(s)s''', {'s': pg_schema}).fetchall()
        all_tables_descr = {}
        for tablename, in all_tables:
            all_tables_descr[tablename] = set(pgh.index_list(tablename)).union(
                set(pgh.constraint_list(tablename)))
        return all_tables_descr

    def test_identical_schema(self):
        """A full store round-trip must leave the db schema untouched."""
        with self.admin_access.repo_cnx() as cnx:
            init_descr = self.get_db_descr(cnx)
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_etype_table('CWUser')
            store.finish()
        with self.admin_access.repo_cnx() as cnx:
            final_descr = self.get_db_descr(cnx)
        self.assertEqual(init_descr, final_descr)

    def test_on_commit_callback(self):
        """The on_commit callback must run at least once per commit."""
        counter = itertools.count()
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, on_commit_callback=lambda: next(counter))
            store.prepare_insert_entity('Location', name=u'toto')
            store.flush()
            store.commit()
        self.assertGreaterEqual(next(counter), 1)

    def test_on_rollback_callback(self):
        """The on_rollback callback must fire when a flush fails."""
        counter = itertools.count()
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
            # 'nm' is deliberately not a Location attribute: the flush must
            # fail and trigger the rollback callback
            store.prepare_insert_entity('Location', nm='toto')
            store.flush()
            store.commit()
        self.assertGreaterEqual(next(counter), 1)

    def test_slave_mode_indexes(self):
        """A slave-mode store must not drop the constraint indexes."""
        with self.admin_access.repo_cnx() as cnx:
            # instantiated for its (absence of) side effects only
            slave_store = MassiveObjectStore(cnx, slave_mode=True)
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
        self.assertIn('entities_pkey', indexes)
        self.assertIn('unique_entities_extid_idx', indexes)
        self.assertIn('owned_by_relation_p_key', indexes)
        self.assertIn('owned_by_relation_to_idx', indexes)

    def test_slave_mode_exception(self):
        """Master-only operations must raise RuntimeError on a slave store."""
        with self.admin_access.repo_cnx() as cnx:
            master_store = MassiveObjectStore(cnx, slave_mode=False)
            slave_store = MassiveObjectStore(cnx, slave_mode=True)
            self.assertRaises(RuntimeError, slave_store.flush_meta_data)
            self.assertRaises(RuntimeError, slave_store.finish)

    def test_simple_insert(self):
        """All rows of the geonames dump must end up as Location entities."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            self.push_geonames_data(self.datapath('geonames.csv'), store)
            store.flush()
            store.commit()
            store.finish()
        with self.admin_access.repo_cnx() as cnx:
            rset = cnx.execute('Any X WHERE X is Location')
            self.assertEqual(len(rset), 4000)
            rset = cnx.execute('Any X WHERE X is Location, X timezone T')
            self.assertEqual(len(rset), 4000)

    def test_index_building(self):
        """Indexes are absent after flush and restored by finish()."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            self.push_geonames_data(self.datapath('geonames.csv'), store)
            store.flush()

            # Check index
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertNotIn('entities_pkey', indexes)
            self.assertNotIn('unique_entities_extid_idx', indexes)
            self.assertNotIn('owned_by_relation_p_key', indexes)
            self.assertNotIn('owned_by_relation_to_idx', indexes)

            # Cleanup -> index
            store.finish()

            # Check index again
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertIn('entities_pkey', indexes)
            self.assertIn('unique_entities_extid_idx', indexes)
            self.assertIn('owned_by_relation_p_key', indexes)
            self.assertIn('owned_by_relation_to_idx', indexes)

    def test_multiple_insert(self):
        """Two consecutive stores may init the same etype table."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_etype_table('TestLocation')
            store.finish()
            store = MassiveObjectStore(cnx)
            store.init_etype_table('TestLocation')
            store.finish()

    def test_multiple_insert_relation(self):
        """Two consecutive stores may init the same relation table."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.finish()
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.finish()
|
277 |
|
278 |
|
if __name__ == '__main__':
    # run this module's tests with logilab's test runner when invoked directly
    from logilab.common.testlib import unittest_main
    unittest_main()
|