1 # -*- coding: utf-8 -*- |
|
2 # copyright 2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This program is free software: you can redistribute it and/or modify it under |
|
6 # the terms of the GNU Lesser General Public License as published by the Free |
|
7 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
8 # any later version. |
|
9 # |
|
10 # This program is distributed in the hope that it will be useful, but WITHOUT |
|
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
12 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
13 # details. |
|
14 # |
|
15 # You should have received a copy of the GNU Lesser General Public License along |
|
16 # with this program. If not, see <http://www.gnu.org/licenses/>. |
|
17 """SQL object store test case""" |
|
18 |
|
19 from cubicweb.dataimport import ucsvreader |
|
20 from cubicweb.devtools import testlib, PostgresApptestConfiguration |
|
21 from cubicweb.devtools import startpgcluster, stoppgcluster |
|
22 from cubicweb.dataimport.pgstore import SQLGenObjectStore |
|
23 |
|
24 |
|
def setUpModule():
    """Module-level unittest fixture: call ``startpgcluster`` with this file's path.

    NOTE(review): presumably this brings up the PostgreSQL cluster used by
    ``PostgresApptestConfiguration`` -- confirm against cubicweb.devtools.
    """
    startpgcluster(__file__)
|
27 |
|
28 |
|
def tearDownModule(*args):
    """Module-level unittest fixture: call ``stoppgcluster`` with this file's path.

    Any positional arguments are accepted and ignored.
    """
    stoppgcluster(__file__)
|
31 |
|
32 |
|
class SQLGenImportSimpleTC(testlib.CubicWebTC):
    """Tests for ``SQLGenObjectStore`` run against the 'data-massimport' test app."""

    configcls = PostgresApptestConfiguration
    appid = 'data-massimport'

    def cast(self, _type, value):
        """Return ``_type(value)``, or None when the conversion raises ValueError.

        Used to turn CSV cells into numbers while tolerating cells that do
        not parse.
        """
        try:
            return _type(value)
        except ValueError:
            return None

    def push_geonames_data(self, dumpname, store):
        """Load time zones, then queue up to 100 'Location' entities in `store`.

        A 'TimeZone' entity is created through the store's connection for
        each row of the 'timeZones.txt' data file, and the connection is
        committed. Then at most the first 100 rows of the tab-separated file
        `dumpname` are passed to ``store.prepare_insert_entity``.

        The store itself is neither flushed nor committed here; callers do it.

        :param dumpname: path of the tab-separated dump file to read.
        :param store: object store exposing ``_cnx`` and
                      ``prepare_insert_entity``.
        """
        cnx = store._cnx
        # Push timezones. The file is opened in a ``with`` block so that the
        # handle is closed as soon as the rows have been consumed.
        with open(self.datapath('timeZones.txt'), 'rb') as tz_stream:
            for code, gmt, dst, raw_offset in ucsvreader(tz_stream,
                                                         delimiter='\t'):
                cnx.create_entity('TimeZone', code=code, gmt=float(gmt),
                                  dst=float(dst), raw_offset=float(raw_offset))
        # Map each time zone code to its eid, used for the 'timezone' value below.
        timezone_code = dict(cnx.execute('Any C, X WHERE X is TimeZone, X code C'))
        cnx.commit()
        # Push data. The ``with`` block closes the dump file even though the
        # loop stops early, before the reader is exhausted.
        with open(dumpname, 'rb') as dump_stream:
            rows = ucsvreader(dump_stream, delimiter='\t', ignore_errors=True)
            for ind, infos in enumerate(rows):
                if ind > 99:
                    # Only the first 100 rows are imported.
                    break
                geonameid = int(infos[0])
                # Feature class is kept only when it is exactly one character.
                feature_class = infos[6] if len(infos[6]) == 1 else None
                entity = {
                    'name': infos[1],
                    'asciiname': infos[2],
                    'alternatenames': infos[3],
                    'latitude': self.cast(float, infos[4]),
                    'longitude': self.cast(float, infos[5]),
                    'feature_class': feature_class,
                    'alternate_country_code': infos[9],
                    'admin_code_3': infos[12],
                    'admin_code_4': infos[13],
                    'population': self.cast(int, infos[14]),
                    'elevation': self.cast(int, infos[15]),
                    'gtopo30': self.cast(int, infos[16]),
                    'timezone': timezone_code.get(infos[17]),
                    'cwuri': u'http://sws.geonames.org/%s/' % geonameid,
                    'geonameid': geonameid,
                }
                store.prepare_insert_entity('Location', **entity)

    def test_autoflush_metadata(self):
        """A flushed and committed entity gets a row in the ``entities`` table."""
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            # No 'Location' row is expected before the insertion.
            self.assertEqual(len(crs.fetchall()), 0)
            store = SQLGenObjectStore(cnx)
            store.prepare_insert_entity('Location', name=u'toto')
            store.flush()
            store.commit()
            cnx.commit()
        # Check from a fresh connection that exactly one row now exists.
        with self.admin_access.repo_cnx() as cnx:
            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                 {'t': 'Location'})
            self.assertEqual(len(crs.fetchall()), 1)

    def test_sqlgenstore_etype_metadata(self):
        """The entity type found through RQL matches the entity's ``cw_etype``."""
        with self.admin_access.repo_cnx() as cnx:
            store = SQLGenObjectStore(cnx)
            timezone_eid = store.prepare_insert_entity('TimeZone', code=u'12')
            store.prepare_insert_entity('Location', timezone=timezone_eid)
            store.flush()
            store.commit()
            eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
                                      'T name TN')[0]
            self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)

    def test_simple_insert(self):
        """Importing the geonames sample yields 100 locations, each with a timezone."""
        with self.admin_access.repo_cnx() as cnx:
            store = SQLGenObjectStore(cnx)
            self.push_geonames_data(self.datapath('geonames.csv'), store)
            store.flush()
            store.commit()
        # Query from a fresh connection to check what was actually stored.
        with self.admin_access.repo_cnx() as cnx:
            rset = cnx.execute('Any X WHERE X is Location')
            self.assertEqual(len(rset), 100)
            rset = cnx.execute('Any X WHERE X is Location, X timezone T')
            self.assertEqual(len(rset), 100)
|
118 |
|
119 |
|
# Run this module's tests when it is executed as a script.
if __name__ == '__main__':
    from unittest import main
    main()
|