dataimport/test/test_massive_store.py
changeset 10853 de741492538d
child 10855 cd91f46fa633
       
# -*- coding: utf-8 -*-
# copyright 2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""Massive store test case"""

import os.path as osp
import itertools

from cubicweb.dataimport import ucsvreader
from cubicweb.devtools import testlib, PostgresApptestConfiguration
from cubicweb.devtools import startpgcluster, stoppgcluster
from cubicweb.dataimport.massive_store import MassiveObjectStore


HERE = osp.abspath(osp.dirname(__file__))

       
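# These tests drive MassiveObjectStore against a real PostgreSQL backend: a
# throwaway cluster is started for the whole module and shut down afterwards.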
def setUpModule():
    startpgcluster(__file__)


def tearDownModule(*args):
    stoppgcluster(__file__)


class MassImportSimpleTC(testlib.CubicWebTC):
    configcls = PostgresApptestConfiguration
    appid = 'data-massimport'

    def cast(self, _type, value):
        try:
            return _type(value)
        except ValueError:
            return None
       
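    # Feed the Geonames sample dump shipped with the tests (data/geonames.csv)
    # into the given store, creating the TimeZone entities first so that each
    # Location can be linked to its timezone.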
    def push_geonames_data(self, dumpname, store):
        # Push timezones
        cnx = store._cnx
        for code, gmt, dst, raw_offset in ucsvreader(open(osp.join(HERE, 'data/timeZones.txt')),
                                                     delimiter='\t'):
            cnx.create_entity('TimeZone', code=code, gmt=float(gmt),
                              dst=float(dst), raw_offset=float(raw_offset))
        timezone_code = dict(cnx.execute('Any C, X WHERE X is TimeZone, X code C'))
        # Push data
        for ind, infos in enumerate(ucsvreader(open(dumpname),
                                               separator='\t',
                                               ignore_errors=True)):
            latitude = self.cast(float, infos[4])
            longitude = self.cast(float, infos[5])
            population = self.cast(int, infos[14])
            elevation = self.cast(int, infos[15])
            gtopo = self.cast(int, infos[16])
            feature_class = infos[6]
            if len(infos[6]) != 1:
                feature_class = None
            entity = {'name': infos[1],
                      'asciiname': infos[2],
                      'alternatenames': infos[3],
                      'latitude': latitude, 'longitude': longitude,
                      'feature_class': feature_class,
                      'alternate_country_code': infos[9],
                      'admin_code_3': infos[12],
                      'admin_code_4': infos[13],
                      'population': population, 'elevation': elevation,
                      'gtopo30': gtopo, 'timezone': timezone_code.get(infos[17]),
                      'cwuri': u'http://sws.geonames.org/%s/' % int(infos[0]),
                      'geonameid': int(infos[0]),
                      }
            store.create_entity('Location', **entity)
       
    def test_autoflush_metadata(self):
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            self.assertEqual(len(crs.fetchall()), 0)
            store = MassiveObjectStore(cnx, autoflush_metadata=True)
            store.create_entity('Location', name=u'toto')
            store.flush()
            store.commit()
            store.cleanup()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            self.assertEqual(len(crs.fetchall()), 1)

#    def test_no_autoflush_metadata(self):
#        with self.admin_access.repo_cnx() as cnx:
#            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
#                                 {'t': 'Location'})
#            self.assertEqual(len(crs.fetchall()), 0)
#        with self.admin_access.repo_cnx() as cnx:
#            store = MassiveObjectStore(cnx, autoflush_metadata=False)
#            store.create_entity('Location', name=u'toto')
#            store.flush()
#            store.commit()
#            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
#                                 {'t': 'Location'})
#            self.assertEqual(len(crs.fetchall()), 0)
#            store.flush_meta_data()
#            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
#                                 {'t': 'Location'})
#            self.assertEqual(len(crs.fetchall()), 1)
#            store.cleanup()
       
    def test_massimport_etype_metadata(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            timezone = store.create_entity('TimeZone')
            store.create_entity('Location', timezone=timezone.eid)
            store.flush()
            store.commit()
            eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
                                      'T name TN')[0]
            self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)

    def test_do_not_drop_index(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, drop_index=False)
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
        self.assertIn('entities_pkey', indexes)
        self.assertIn('unique_entities_extid_idx', indexes)
        self.assertIn('owned_by_relation_p_key', indexes)
        self.assertIn('owned_by_relation_to_idx', indexes)

    def test_drop_index(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, drop_index=True)
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
        self.assertNotIn('entities_pkey', indexes)
        self.assertNotIn('unique_entities_extid_idx', indexes)
        self.assertNotIn('owned_by_relation_p_key', indexes)
        self.assertNotIn('owned_by_relation_to_idx', indexes)

    def test_drop_index_recreation(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, drop_index=True)
            store.cleanup()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
        self.assertIn('entities_pkey', indexes)
        self.assertIn('unique_entities_extid_idx', indexes)
        self.assertIn('owned_by_relation_p_key', indexes)
        self.assertIn('owned_by_relation_to_idx', indexes)

    def test_eids_seq_range(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
            store.create_entity('Location', name=u'toto')
            store.flush()
            cnx.commit()
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql("SELECT * FROM entities_id_seq")
            self.assertTrue(crs.fetchone()[0] > 50000)
       
    def test_eid_entity(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
            entity = store.create_entity('Location', name=u'toto')
            store.flush()
            self.assertTrue(entity.eid > 50000)

    def test_eid_entity_2(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
            entity = store.create_entity('Location', name=u'toto', eid=10000)
            store.flush()
        with self.admin_access.repo_cnx() as cnx:
            self.assertEqual(entity.eid, 10000)
       
    def test_on_commit_callback(self):
        counter = itertools.count()
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, on_commit_callback=counter.next)
            store.create_entity('Location', name=u'toto')
            store.flush()
            store.commit()
        self.assertTrue(counter.next() >= 1)
       
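    # 'nm' is not an attribute of Location, so the flush is expected to fail
    # and to trigger the rollback callback at least once.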
    def test_on_rollback_callback(self):
        counter = itertools.count()
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: counter.next())
            store.create_entity('Location', nm='toto')
            store.flush()
            store.commit()
        self.assertTrue(counter.next() >= 1)

    def test_slave_mode_indexes(self):
        with self.admin_access.repo_cnx() as cnx:
            slave_store = MassiveObjectStore(cnx, slave_mode=True)
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
        self.assertIn('entities_pkey', indexes)
        self.assertIn('unique_entities_extid_idx', indexes)
        self.assertIn('owned_by_relation_p_key', indexes)
        self.assertIn('owned_by_relation_to_idx', indexes)
       
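    # Metadata flushing and cleanup are expected to be the master store's job;
    # a slave store should refuse to perform them.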
    def test_slave_mode_exception(self):
        with self.admin_access.repo_cnx() as cnx:
            master_store = MassiveObjectStore(cnx, slave_mode=False)
            slave_store = MassiveObjectStore(cnx, slave_mode=True)
            self.assertRaises(RuntimeError, slave_store.flush_meta_data)
            self.assertRaises(RuntimeError, slave_store.cleanup)

    def test_simple_insert(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, autoflush_metadata=True)
            self.push_geonames_data(osp.join(HERE, 'data/geonames.csv'), store)
            store.flush()
        with self.admin_access.repo_cnx() as cnx:
            rset = cnx.execute('Any X WHERE X is Location')
            self.assertEqual(len(rset), 4000)
            rset = cnx.execute('Any X WHERE X is Location, X timezone T')
            self.assertEqual(len(rset), 4000)
       
    def test_index_building(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, autoflush_metadata=True)
            self.push_geonames_data(osp.join(HERE, 'data/geonames.csv'), store)
            store.flush()

            # Check that the indexes have been dropped during the import
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertNotIn('entities_pkey', indexes)
            self.assertNotIn('unique_entities_extid_idx', indexes)
            self.assertNotIn('owned_by_relation_p_key', indexes)
            self.assertNotIn('owned_by_relation_to_idx', indexes)

            # Cleanup recreates the indexes
            store.cleanup()

            # Check the indexes again
            crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
            indexes = [r[0] for r in crs.fetchall()]
            self.assertIn('entities_pkey', indexes)
            self.assertIn('unique_entities_extid_idx', indexes)
            self.assertIn('owned_by_relation_p_key', indexes)
            self.assertIn('owned_by_relation_to_idx', indexes)

    def test_flush_meta_data(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, autoflush_metadata=False)
            self.push_geonames_data(osp.join(HERE, 'data/geonames.csv'), store)
            store.flush()
            curs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                  {'t': 'Location'})
            self.assertEqual(len(curs.fetchall()), 0)
            # Flush metadata -> entities table is updated
            store.flush_meta_data()
            curs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                  {'t': 'Location'})
            self.assertEqual(len(curs.fetchall()), 4000)
       
    def test_multiple_insert(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_etype_table('TestLocation')
            store.cleanup()
            store = MassiveObjectStore(cnx)
            store.init_etype_table('TestLocation')
            store.cleanup()

    def test_multiple_insert_relation(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.cleanup()
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.cleanup()

    def test_multiple_insert_drop_index(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, drop_index=False)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()

    def test_multiple_insert_drop_index_2(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()
            store = MassiveObjectStore(cnx, drop_index=False)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()

    def test_multiple_insert_drop_index_3(self):
        with self.admin_access.repo_cnx() as cnx:
            store = MassiveObjectStore(cnx, drop_index=False)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()
            store = MassiveObjectStore(cnx, drop_index=False)
            store.init_relation_table('used_language')
            store.init_etype_table('TestLocation')
            store.cleanup()


if __name__ == '__main__':
    from logilab.common.testlib import unittest_main
    unittest_main()