diff -r 058bb3dc685f -r 0b59724cb3f2 cubicweb/dataimport/test/test_massive_store.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cubicweb/dataimport/test/test_massive_store.py Sat Jan 16 13:48:51 2016 +0100 @@ -0,0 +1,281 @@ +# -*- coding: utf-8 -*- +# copyright 2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr -- mailto:contact@logilab.fr +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with this program. If not, see <http://www.gnu.org/licenses/>. 
"""Massive store test case"""

import itertools

from cubicweb.dataimport import ucsvreader
from cubicweb.devtools import (testlib, PostgresApptestConfiguration,
                               startpgcluster, stoppgcluster)
from cubicweb.dataimport.massive_store import MassiveObjectStore, PGHelper


def setUpModule():
    """Spawn a dedicated PostgreSQL cluster for this test module."""
    startpgcluster(__file__)


def tearDownModule(*args):
    """Shut down the PostgreSQL cluster started by setUpModule."""
    stoppgcluster(__file__)


class MassImportSimpleTC(testlib.CubicWebTC):
    """Functional tests for MassiveObjectStore against a real PostgreSQL
    backend: metadata flushing, index dropping/recreation, eid sequence
    handling, commit/rollback callbacks and slave mode.
    """
    configcls = PostgresApptestConfiguration
    appid = 'data-massimport'

    def cast(self, _type, value):
        """Return `value` converted with `_type`, or None when the raw CSV
        field cannot be converted (empty or malformed field).
        """
        try:
            return _type(value)
        except ValueError:
            return None

    def push_geonames_data(self, dumpname, store):
        """Feed `store` with TimeZone entities from the timeZones.txt fixture,
        then Location entities parsed from the geonames dump `dumpname`.
        """
        # Timezones are a small reference set: create them through the plain
        # connection so their eids are immediately queryable below.
        cnx = store._cnx
        with open(self.datapath('timeZones.txt'), 'rb') as tzfile:
            for code, gmt, dst, raw_offset in ucsvreader(tzfile, delimiter='\t'):
                cnx.create_entity('TimeZone', code=code, gmt=float(gmt),
                                  dst=float(dst), raw_offset=float(raw_offset))
        # Map timezone code -> eid for the Location rows below.
        timezone_code = dict(cnx.execute('Any C, X WHERE X is TimeZone, X code C'))
        # Push the geonames dump through the massive store.
        with open(dumpname, 'rb') as dump:
            for infos in ucsvreader(dump, delimiter='\t', ignore_errors=True):
                latitude = self.cast(float, infos[4])
                longitude = self.cast(float, infos[5])
                population = self.cast(int, infos[14])
                elevation = self.cast(int, infos[15])
                gtopo = self.cast(int, infos[16])
                feature_class = infos[6]
                # geonames feature classes are single letters; anything longer
                # is garbage from a malformed row.
                if len(infos[6]) != 1:
                    feature_class = None
                entity = {'name': infos[1],
                          'asciiname': infos[2],
                          'alternatenames': infos[3],
                          'latitude': latitude, 'longitude': longitude,
                          'feature_class': feature_class,
                          'alternate_country_code': infos[9],
                          'admin_code_3': infos[12],
                          'admin_code_4': infos[13],
                          'population': population, 'elevation': elevation,
                          'gtopo30': gtopo, 'timezone': timezone_code.get(infos[17]),
                          'cwuri': u'http://sws.geonames.org/%s/' % int(infos[0]),
                          'geonameid': int(infos[0]),
                          }
                store.prepare_insert_entity('Location', **entity)

    def test_autoflush_metadata(self):
        """Entities pushed through the store must show up in the `entities`
        metadata table once flushed/committed."""
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            self.assertEqual(len(crs.fetchall()), 0)
            store = MassiveObjectStore(cnx)
            store.prepare_insert_entity('Location', name=u'toto')
            store.flush()
            store.commit()
            store.finish()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            self.assertEqual(len(crs.fetchall()), 1)

    def test_massimport_etype_metadata(self):
        """is/is_instance_of metadata must be consistent for mass-imported
        entities (entity type resolvable through RQL)."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            timezone_eid = store.prepare_insert_entity('TimeZone')
            store.prepare_insert_entity('Location', timezone=timezone_eid)
            store.flush()
            store.commit()
            eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
                                      'T name TN')[0]
            self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)

    def test_drop_index(self):
        """Instantiating the store must drop constraints/indexes to speed up
        the bulk insertion."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
        self.assertNotIn('entities_pkey', indexes)
        self.assertNotIn('unique_entities_extid_idx', indexes)
        # fixed index name ('owned_by_relation_pkey' was a typo making this
        # assertion pass vacuously) -- every other test uses the '_p_key' form
        self.assertNotIn('owned_by_relation_p_key', indexes)
        self.assertNotIn('owned_by_relation_to_idx', indexes)

    def test_drop_index_recreation(self):
        """finish() must recreate the constraints/indexes dropped on init."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.finish()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
        self.assertIn('entities_pkey', indexes)
        self.assertIn('unique_entities_extid_idx', indexes)
        self.assertIn('owned_by_relation_p_key', indexes)
        self.assertIn('owned_by_relation_to_idx', indexes)

    def test_eids_seq_range(self):
        """restart_eid_sequence must move the eid sequence past the given
        value, even with a custom eids_seq_range."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, eids_seq_range=1000)
            store.restart_eid_sequence(50000)
            store.prepare_insert_entity('Location', name=u'toto')
            store.flush()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql("SELECT * FROM entities_id_seq")
            self.assertGreater(crs.fetchone()[0], 50000)

    def test_eid_entity(self):
        """Auto-allocated eids must come from the restarted sequence."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, eids_seq_range=1000)
            store.restart_eid_sequence(50000)
            eid = store.prepare_insert_entity('Location', name=u'toto')
            store.flush()
            self.assertGreater(eid, 50000)

    def test_eid_entity_2(self):
        """An explicitly given eid must be honoured, bypassing the sequence."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.restart_eid_sequence(50000)
            eid = store.prepare_insert_entity('Location', name=u'toto', eid=10000)
            store.flush()
            self.assertEqual(eid, 10000)

    @staticmethod
    def get_db_descr(cnx):
        """Return a {table name: set of index and constraint names} snapshot
        of the system source's PostgreSQL schema."""
        pg_schema = (
            cnx.repo.config.system_source_config.get('db-namespace')
            or 'public')
        pgh = PGHelper(cnx, pg_schema)
        all_tables = cnx.system_sql('''
SELECT table_name
FROM information_schema.tables
where table_schema = %(s)s''', {'s': pg_schema}).fetchall()
        all_tables_descr = {}
        for tablename, in all_tables:
            all_tables_descr[tablename] = set(pgh.index_list(tablename)).union(
                set(pgh.constraint_list(tablename)))
        return all_tables_descr

    def test_identical_schema(self):
        """A full store init/finish cycle must leave the database schema
        (indexes + constraints) exactly as it found it."""
        with self.admin_access.repo_cnx() as cnx:
            init_descr = self.get_db_descr(cnx)
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_etype_table('CWUser')
            store.finish()
        with self.admin_access.repo_cnx() as cnx:
            final_descr = self.get_db_descr(cnx)
        self.assertEqual(init_descr, final_descr)

    def test_on_commit_callback(self):
        """on_commit_callback must be invoked at least once per commit."""
        counter = itertools.count()
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, on_commit_callback=lambda: next(counter))
            store.prepare_insert_entity('Location', name=u'toto')
            store.flush()
            store.commit()
        self.assertGreaterEqual(next(counter), 1)

    def test_on_rollback_callback(self):
        """on_rollback_callback must be invoked when an insertion fails."""
        counter = itertools.count()
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
            # 'nm' is deliberately not a Location attribute: the bogus kwarg
            # triggers a rollback, which should fire the callback
            store.prepare_insert_entity('Location', nm='toto')
            store.flush()
            store.commit()
        self.assertGreaterEqual(next(counter), 1)

    def test_slave_mode_indexes(self):
        """A slave-mode store must NOT drop the constraints/indexes (the
        master is responsible for that)."""
        with self.admin_access.repo_cnx() as cnx:
            slave_store = MassiveObjectStore(cnx, slave_mode=True)
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
        self.assertIn('entities_pkey', indexes)
        self.assertIn('unique_entities_extid_idx', indexes)
        self.assertIn('owned_by_relation_p_key', indexes)
        self.assertIn('owned_by_relation_to_idx', indexes)

    def test_slave_mode_exception(self):
        """Master-only operations must raise RuntimeError on a slave store."""
        with self.admin_access.repo_cnx() as cnx:
            master_store = MassiveObjectStore(cnx, slave_mode=False)
            slave_store = MassiveObjectStore(cnx, slave_mode=True)
            self.assertRaises(RuntimeError, slave_store.flush_meta_data)
            self.assertRaises(RuntimeError, slave_store.finish)

    def test_simple_insert(self):
        """End-to-end import of the geonames fixture: all Location entities
        and their timezone relations must land in the database."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            self.push_geonames_data(self.datapath('geonames.csv'), store)
            store.flush()
            store.commit()
            store.finish()
        with self.admin_access.repo_cnx() as cnx:
            rset = cnx.execute('Any X WHERE X is Location')
            self.assertEqual(len(rset), 4000)
            rset = cnx.execute('Any X WHERE X is Location, X timezone T')
            self.assertEqual(len(rset), 4000)

    def test_index_building(self):
        """Indexes must be absent while the store is active and restored by
        finish()."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            self.push_geonames_data(self.datapath('geonames.csv'), store)
            store.flush()

            # Check index
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertNotIn('entities_pkey', indexes)
            self.assertNotIn('unique_entities_extid_idx', indexes)
            self.assertNotIn('owned_by_relation_p_key', indexes)
            self.assertNotIn('owned_by_relation_to_idx', indexes)

            # Cleanup -> index
            store.finish()

            # Check index again
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertIn('entities_pkey', indexes)
            self.assertIn('unique_entities_extid_idx', indexes)
            self.assertIn('owned_by_relation_p_key', indexes)
            self.assertIn('owned_by_relation_to_idx', indexes)

    def test_multiple_insert(self):
        """Two successive store lifecycles on the same etype table must not
        conflict (tables/indexes created twice)."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_etype_table('TestLocation')
            store.finish()
            store = MassiveObjectStore(cnx)
            store.init_etype_table('TestLocation')
            store.finish()

    def test_multiple_insert_relation(self):
        """Same as test_multiple_insert, for a relation table."""
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.finish()
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.finish()


if __name__ == '__main__':
    from logilab.common.testlib import unittest_main
    unittest_main()