1 # -*- coding: utf-8 -*- |
|
2 # copyright 2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This program is free software: you can redistribute it and/or modify it under |
|
6 # the terms of the GNU Lesser General Public License as published by the Free |
|
7 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
8 # any later version. |
|
9 # |
|
10 # This program is distributed in the hope that it will be useful, but WITHOUT |
|
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
12 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
13 # details. |
|
14 # |
|
15 # You should have received a copy of the GNU Lesser General Public License along |
|
16 # with this program. If not, see <http://www.gnu.org/licenses/>. |
|
17 """SQL object store test case""" |
|
18 |
|
19 import itertools |
|
20 |
|
21 from cubicweb.dataimport import ucsvreader |
|
22 from cubicweb.devtools import testlib, PostgresApptestConfiguration |
|
23 from cubicweb.devtools import startpgcluster, stoppgcluster |
|
24 from cubicweb.dataimport.pgstore import SQLGenObjectStore |
|
25 |
|
26 |
|
def setUpModule():
    """Module-level fixture: call ``startpgcluster`` with this file's path."""
    module_path = __file__
    startpgcluster(module_path)
|
29 |
|
30 |
|
def tearDownModule(*args):
    """Module-level fixture: call ``stoppgcluster`` with this file's path.

    Any positional arguments are accepted and ignored.
    """
    module_path = __file__
    stoppgcluster(module_path)
|
33 |
|
34 |
|
class SQLGenImportSimpleTC(testlib.CubicWebTC):
    """Tests for ``SQLGenObjectStore`` on the 'data-massimport' application.

    The test instance is configured through ``PostgresApptestConfiguration``.
    """
    configcls = PostgresApptestConfiguration
    appid = 'data-massimport'

    def cast(self, _type, value):
        """Return ``_type(value)``, or None when the conversion raises
        ``ValueError`` (e.g. an empty CSV field converted to a number).
        """
        try:
            return _type(value)
        except ValueError:
            return None

    def push_geonames_data(self, dumpname, store):
        """Create the TimeZone entities, then feed up to 100 rows of the
        tab-separated file `dumpname` to `store` as 'Location' entities.

        TimeZone entities are created and committed through the store's
        connection (``store._cnx``); Location entities are only handed to
        ``store.prepare_insert_entity``, flushing and committing the store is
        left to the caller.
        """
        cnx = store._cnx
        # Push timezones. The file is opened in a ``with`` block so that the
        # handle is closed even if an entity creation fails.
        with open(self.datapath('timeZones.txt'), 'rb') as tz_file:
            for code, gmt, dst, raw_offset in ucsvreader(tz_file,
                                                         delimiter='\t'):
                cnx.create_entity('TimeZone', code=code, gmt=float(gmt),
                                  dst=float(dst), raw_offset=float(raw_offset))
        # map each timezone code to the eid of its TimeZone entity
        timezone_code = dict(cnx.execute('Any C, X WHERE X is TimeZone, X code C'))
        cnx.commit()
        # Push data: only the first 100 rows yielded by the reader are used
        with open(dumpname, 'rb') as dump_file:
            rows = ucsvreader(dump_file, delimiter='\t', ignore_errors=True)
            for infos in itertools.islice(rows, 100):
                geonameid = int(infos[0])
                # numeric columns that fail to convert are stored as None
                latitude = self.cast(float, infos[4])
                longitude = self.cast(float, infos[5])
                population = self.cast(int, infos[14])
                elevation = self.cast(int, infos[15])
                gtopo = self.cast(int, infos[16])
                # the feature class is kept only when it is a single character
                feature_class = infos[6] if len(infos[6]) == 1 else None
                entity = {'name': infos[1],
                          'asciiname': infos[2],
                          'alternatenames': infos[3],
                          'latitude': latitude, 'longitude': longitude,
                          'feature_class': feature_class,
                          'alternate_country_code': infos[9],
                          'admin_code_3': infos[12],
                          'admin_code_4': infos[13],
                          'population': population, 'elevation': elevation,
                          'gtopo30': gtopo,
                          'timezone': timezone_code.get(infos[17]),
                          'cwuri': u'http://sws.geonames.org/%s/' % geonameid,
                          'geonameid': geonameid,
                          }
                store.prepare_insert_entity('Location', **entity)

    def test_autoflush_metadata(self):
        # The system `entities` table holds no 'Location' row before the
        # import and exactly one once the store has been flushed and committed.
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            self.assertEqual(len(crs.fetchall()), 0)
            store = SQLGenObjectStore(cnx)
            store.prepare_insert_entity('Location', name=u'toto')
            store.flush()
            store.commit()
            cnx.commit()
        # check from a fresh connection
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            self.assertEqual(len(crs.fetchall()), 1)

    def test_sqlgenstore_etype_metadata(self):
        # For an entity inserted through the store, the type name reached
        # through the `is` relation must equal the entity's `cw_etype`.
        with self.admin_access.repo_cnx() as cnx:
            store = SQLGenObjectStore(cnx)
            timezone_eid = store.prepare_insert_entity('TimeZone')
            store.prepare_insert_entity('Location', timezone=timezone_eid)
            store.flush()
            store.commit()
            eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
                                      'T name TN')[0]
            self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)

    def test_simple_insert(self):
        # Import the geonames sample, then check from a fresh connection that
        # 100 Location entities exist and that each one has a timezone.
        with self.admin_access.repo_cnx() as cnx:
            store = SQLGenObjectStore(cnx)
            self.push_geonames_data(self.datapath('geonames.csv'), store)
            store.flush()
            store.commit()
        with self.admin_access.repo_cnx() as cnx:
            rset = cnx.execute('Any X WHERE X is Location')
            self.assertEqual(len(rset), 100)
            rset = cnx.execute('Any X WHERE X is Location, X timezone T')
            self.assertEqual(len(rset), 100)
|
120 |
|
121 |
|
if __name__ == '__main__':
    # Executed as a script: run this module's tests with the unittest_main
    # entry point from logilab.common.testlib.
    from logilab.common.testlib import unittest_main
    unittest_main()
|