cubicweb/dataimport/test/test_massive_store.py
changeset 11057 0b59724cb3f2
parent 11028 66f94d7f9ca7
child 11095 02e88ca3bc23
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
       
     1 # -*- coding: utf-8 -*-
       
     2 # copyright 2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     3 # contact http://www.logilab.fr -- mailto:contact@logilab.fr
       
     4 #
       
     5 # This program is free software: you can redistribute it and/or modify it under
       
     6 # the terms of the GNU Lesser General Public License as published by the Free
       
     7 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     8 # any later version.
       
     9 #
       
    10 # This program is distributed in the hope that it will be useful, but WITHOUT
       
    11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    12 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
       
    13 # details.
       
    14 #
       
    15 # You should have received a copy of the GNU Lesser General Public License along
       
    16 # with this program. If not, see <http://www.gnu.org/licenses/>.
       
    17 """Massive store test case"""
       
    18 
       
    19 import itertools
       
    20 
       
    21 from cubicweb.dataimport import ucsvreader
       
    22 from cubicweb.devtools import testlib, PostgresApptestConfiguration
       
    23 from cubicweb.devtools import startpgcluster, stoppgcluster
       
    24 from cubicweb.dataimport.massive_store import MassiveObjectStore, PGHelper
       
    25 
       
    26 
       
    27 def setUpModule():
       
    28     startpgcluster(__file__)
       
    29 
       
    30 
       
    31 def tearDownModule(*args):
       
    32     stoppgcluster(__file__)
       
    33 
       
    34 
       
    35 class MassImportSimpleTC(testlib.CubicWebTC):
       
    36     configcls = PostgresApptestConfiguration
       
    37     appid = 'data-massimport'
       
    38 
       
    39     def cast(self, _type, value):
       
    40         try:
       
    41             return _type(value)
       
    42         except ValueError:
       
    43             return None
       
    44 
       
    45     def push_geonames_data(self, dumpname, store):
       
    46         # Push timezones
       
    47         cnx = store._cnx
       
    48         for code, gmt, dst, raw_offset in ucsvreader(open(self.datapath('timeZones.txt'), 'rb'),
       
    49                                                      delimiter='\t'):
       
    50             cnx.create_entity('TimeZone', code=code, gmt=float(gmt),
       
    51                                     dst=float(dst), raw_offset=float(raw_offset))
       
    52         timezone_code = dict(cnx.execute('Any C, X WHERE X is TimeZone, X code C'))
       
    53         # Push data
       
    54         for ind, infos in enumerate(ucsvreader(open(dumpname, 'rb'),
       
    55                                                delimiter='\t',
       
    56                                                ignore_errors=True)):
       
    57             latitude = self.cast(float, infos[4])
       
    58             longitude = self.cast(float, infos[5])
       
    59             population = self.cast(int, infos[14])
       
    60             elevation = self.cast(int, infos[15])
       
    61             gtopo = self.cast(int, infos[16])
       
    62             feature_class = infos[6]
       
    63             if len(infos[6]) != 1:
       
    64                 feature_class = None
       
    65             entity = {'name': infos[1],
       
    66                       'asciiname': infos[2],
       
    67                       'alternatenames': infos[3],
       
    68                       'latitude': latitude, 'longitude': longitude,
       
    69                       'feature_class': feature_class,
       
    70                       'alternate_country_code':infos[9],
       
    71                       'admin_code_3': infos[12],
       
    72                       'admin_code_4': infos[13],
       
    73                       'population': population, 'elevation': elevation,
       
    74                       'gtopo30': gtopo, 'timezone': timezone_code.get(infos[17]),
       
    75                       'cwuri':  u'http://sws.geonames.org/%s/' % int(infos[0]),
       
    76                       'geonameid': int(infos[0]),
       
    77                       }
       
    78             store.prepare_insert_entity('Location', **entity)
       
    79 
       
    80     def test_autoflush_metadata(self):
       
    81         with self.admin_access.repo_cnx() as cnx:
       
    82             crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
       
    83                                  {'t': 'Location'})
       
    84             self.assertEqual(len(crs.fetchall()), 0)
       
    85             store = MassiveObjectStore(cnx)
       
    86             store.prepare_insert_entity('Location', name=u'toto')
       
    87             store.flush()
       
    88             store.commit()
       
    89             store.finish()
       
    90             cnx.commit()
       
    91         with self.admin_access.repo_cnx() as cnx:
       
    92             crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
       
    93                                  {'t': 'Location'})
       
    94             self.assertEqual(len(crs.fetchall()), 1)
       
    95 
       
    96     def test_massimport_etype_metadata(self):
       
    97         with self.admin_access.repo_cnx() as cnx:
       
    98             store = MassiveObjectStore(cnx)
       
    99             timezone_eid = store.prepare_insert_entity('TimeZone')
       
   100             store.prepare_insert_entity('Location', timezone=timezone_eid)
       
   101             store.flush()
       
   102             store.commit()
       
   103             eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
       
   104                                       'T name TN')[0]
       
   105             self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)
       
   106 
       
   107     def test_drop_index(self):
       
   108         with self.admin_access.repo_cnx() as cnx:
       
   109             store = MassiveObjectStore(cnx)
       
   110             cnx.commit()
       
   111         with self.admin_access.repo_cnx() as cnx:
       
   112             crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
       
   113             indexes = [r[0] for r in crs.fetchall()]
       
   114         self.assertNotIn('entities_pkey', indexes)
       
   115         self.assertNotIn('unique_entities_extid_idx', indexes)
       
   116         self.assertNotIn('owned_by_relation_pkey', indexes)
       
   117         self.assertNotIn('owned_by_relation_to_idx', indexes)
       
   118 
       
   119     def test_drop_index_recreation(self):
       
   120         with self.admin_access.repo_cnx() as cnx:
       
   121             store = MassiveObjectStore(cnx)
       
   122             store.finish()
       
   123             cnx.commit()
       
   124         with self.admin_access.repo_cnx() as cnx:
       
   125             crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
       
   126             indexes = [r[0] for r in crs.fetchall()]
       
   127         self.assertIn('entities_pkey', indexes)
       
   128         self.assertIn('unique_entities_extid_idx', indexes)
       
   129         self.assertIn('owned_by_relation_p_key', indexes)
       
   130         self.assertIn('owned_by_relation_to_idx', indexes)
       
   131 
       
   132     def test_eids_seq_range(self):
       
   133         with self.admin_access.repo_cnx() as cnx:
       
   134             store = MassiveObjectStore(cnx, eids_seq_range=1000)
       
   135             store.restart_eid_sequence(50000)
       
   136             store.prepare_insert_entity('Location', name=u'toto')
       
   137             store.flush()
       
   138             cnx.commit()
       
   139         with self.admin_access.repo_cnx() as cnx:
       
   140             crs = cnx.system_sql("SELECT * FROM entities_id_seq")
       
   141             self.assertGreater(crs.fetchone()[0], 50000)
       
   142 
       
   143     def test_eid_entity(self):
       
   144         with self.admin_access.repo_cnx() as cnx:
       
   145             store = MassiveObjectStore(cnx, eids_seq_range=1000)
       
   146             store.restart_eid_sequence(50000)
       
   147             eid = store.prepare_insert_entity('Location', name=u'toto')
       
   148             store.flush()
       
   149             self.assertGreater(eid, 50000)
       
   150 
       
   151     def test_eid_entity_2(self):
       
   152         with self.admin_access.repo_cnx() as cnx:
       
   153             store = MassiveObjectStore(cnx)
       
   154             store.restart_eid_sequence(50000)
       
   155             eid = store.prepare_insert_entity('Location', name=u'toto', eid=10000)
       
   156             store.flush()
       
   157         self.assertEqual(eid, 10000)
       
   158 
       
   159     @staticmethod
       
   160     def get_db_descr(cnx):
       
   161         pg_schema = (
       
   162                 cnx.repo.config.system_source_config.get('db-namespace')
       
   163                 or 'public')
       
   164         pgh = PGHelper(cnx, pg_schema)
       
   165         all_tables = cnx.system_sql('''
       
   166 SELECT table_name
       
   167 FROM information_schema.tables
       
   168 where table_schema = %(s)s''', {'s': pg_schema}).fetchall()
       
   169         all_tables_descr = {}
       
   170         for tablename, in all_tables:
       
   171             all_tables_descr[tablename] = set(pgh.index_list(tablename)).union(
       
   172                 set(pgh.constraint_list(tablename)))
       
   173         return all_tables_descr
       
   174 
       
   175     def test_identical_schema(self):
       
   176         with self.admin_access.repo_cnx() as cnx:
       
   177             init_descr = self.get_db_descr(cnx)
       
   178         with self.admin_access.repo_cnx() as cnx:
       
   179             store = MassiveObjectStore(cnx)
       
   180             store.init_etype_table('CWUser')
       
   181             store.finish()
       
   182         with self.admin_access.repo_cnx() as cnx:
       
   183             final_descr = self.get_db_descr(cnx)
       
   184         self.assertEqual(init_descr, final_descr)
       
   185 
       
   186     def test_on_commit_callback(self):
       
   187         counter = itertools.count()
       
   188         with self.admin_access.repo_cnx() as cnx:
       
   189             store = MassiveObjectStore(cnx, on_commit_callback=lambda:next(counter))
       
   190             store.prepare_insert_entity('Location', name=u'toto')
       
   191             store.flush()
       
   192             store.commit()
       
   193         self.assertGreaterEqual(next(counter), 1)
       
   194 
       
   195     def test_on_rollback_callback(self):
       
   196         counter = itertools.count()
       
   197         with self.admin_access.repo_cnx() as cnx:
       
   198             store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
       
   199             store.prepare_insert_entity('Location', nm='toto')
       
   200             store.flush()
       
   201             store.commit()
       
   202         self.assertGreaterEqual(next(counter), 1)
       
   203 
       
   204     def test_slave_mode_indexes(self):
       
   205         with self.admin_access.repo_cnx() as cnx:
       
   206             slave_store = MassiveObjectStore(cnx, slave_mode=True)
       
   207         with self.admin_access.repo_cnx() as cnx:
       
   208             crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
       
   209             indexes = [r[0] for r in crs.fetchall()]
       
   210         self.assertIn('entities_pkey', indexes)
       
   211         self.assertIn('unique_entities_extid_idx', indexes)
       
   212         self.assertIn('owned_by_relation_p_key', indexes)
       
   213         self.assertIn('owned_by_relation_to_idx', indexes)
       
   214 
       
   215     def test_slave_mode_exception(self):
       
   216         with self.admin_access.repo_cnx() as cnx:
       
   217             master_store = MassiveObjectStore(cnx, slave_mode=False)
       
   218             slave_store = MassiveObjectStore(cnx, slave_mode=True)
       
   219             self.assertRaises(RuntimeError, slave_store.flush_meta_data)
       
   220             self.assertRaises(RuntimeError, slave_store.finish)
       
   221 
       
   222     def test_simple_insert(self):
       
   223         with self.admin_access.repo_cnx() as cnx:
       
   224             store = MassiveObjectStore(cnx)
       
   225             self.push_geonames_data(self.datapath('geonames.csv'), store)
       
   226             store.flush()
       
   227             store.commit()
       
   228             store.finish()
       
   229         with self.admin_access.repo_cnx() as cnx:
       
   230             rset = cnx.execute('Any X WHERE X is Location')
       
   231             self.assertEqual(len(rset), 4000)
       
   232             rset = cnx.execute('Any X WHERE X is Location, X timezone T')
       
   233             self.assertEqual(len(rset), 4000)
       
   234 
       
   235     def test_index_building(self):
       
   236         with self.admin_access.repo_cnx() as cnx:
       
   237             store = MassiveObjectStore(cnx)
       
   238             self.push_geonames_data(self.datapath('geonames.csv'), store)
       
   239             store.flush()
       
   240 
       
   241             # Check index
       
   242             crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
       
   243             indexes = [r[0] for r in crs.fetchall()]
       
   244             self.assertNotIn('entities_pkey', indexes)
       
   245             self.assertNotIn('unique_entities_extid_idx', indexes)
       
   246             self.assertNotIn('owned_by_relation_p_key', indexes)
       
   247             self.assertNotIn('owned_by_relation_to_idx', indexes)
       
   248 
       
   249             # Cleanup -> index
       
   250             store.finish()
       
   251 
       
   252             # Check index again
       
   253             crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
       
   254             indexes = [r[0] for r in crs.fetchall()]
       
   255             self.assertIn('entities_pkey', indexes)
       
   256             self.assertIn('unique_entities_extid_idx', indexes)
       
   257             self.assertIn('owned_by_relation_p_key', indexes)
       
   258             self.assertIn('owned_by_relation_to_idx', indexes)
       
   259 
       
   260     def test_multiple_insert(self):
       
   261         with self.admin_access.repo_cnx() as cnx:
       
   262             store = MassiveObjectStore(cnx)
       
   263             store.init_etype_table('TestLocation')
       
   264             store.finish()
       
   265             store = MassiveObjectStore(cnx)
       
   266             store.init_etype_table('TestLocation')
       
   267             store.finish()
       
   268 
       
   269     def test_multiple_insert_relation(self):
       
   270         with self.admin_access.repo_cnx() as cnx:
       
   271             store = MassiveObjectStore(cnx)
       
   272             store.init_relation_table('used_language')
       
   273             store.finish()
       
   274             store = MassiveObjectStore(cnx)
       
   275             store.init_relation_table('used_language')
       
   276             store.finish()
       
   277 
       
   278 
       
   279 if __name__ == '__main__':
       
   280     from logilab.common.testlib import unittest_main
       
   281     unittest_main()