|
1 # -*- coding: utf-8 -*- |
|
2 # copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This program is free software: you can redistribute it and/or modify it under |
|
6 # the terms of the GNU Lesser General Public License as published by the Free |
|
7 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
8 # any later version. |
|
9 # |
|
10 # This program is distributed in the hope that it will be useful, but WITHOUT |
|
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
12 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
13 # details. |
|
14 # |
|
15 # You should have received a copy of the GNU Lesser General Public License along |
|
16 # with this program. If not, see <http://www.gnu.org/licenses/>. |
|
17 """Tests for cubicweb.dataimport.importer""" |
|
18 |
|
19 from collections import defaultdict |
|
20 |
|
21 from logilab.common.testlib import TestCase, unittest_main |
|
22 |
|
23 from cubicweb import ValidationError |
|
24 from cubicweb.devtools.testlib import CubicWebTC |
|
25 from cubicweb.dataimport import RQLObjectStore, ucsvreader |
|
26 from cubicweb.dataimport.importer import (ExtEntity, ExtEntitiesImporter, SimpleImportLog, |
|
27 RelationMapping, use_extid_as_cwuri) |
|
28 |
|
29 |
|
30 class RelationMappingTC(CubicWebTC): |
|
31 |
|
32 def test_nosource(self): |
|
33 with self.admin_access.repo_cnx() as cnx: |
|
34 alice_eid = cnx.create_entity('Personne', nom=u'alice').eid |
|
35 bob_eid = cnx.create_entity('Personne', nom=u'bob', connait=alice_eid).eid |
|
36 cnx.commit() |
|
37 mapping = RelationMapping(cnx) |
|
38 self.assertEqual(mapping['connait'], |
|
39 set([(bob_eid, alice_eid), (alice_eid, bob_eid)])) |
|
40 |
|
41 def test_with_source(self): |
|
42 with self.admin_access.repo_cnx() as cnx: |
|
43 alice_eid = cnx.create_entity('Personne', nom=u'alice').eid |
|
44 bob_eid = cnx.create_entity('Personne', nom=u'bob', connait=alice_eid).eid |
|
45 cnx.commit() |
|
46 mapping = RelationMapping(cnx, cnx.find('CWSource', name=u'system').one()) |
|
47 self.assertEqual(mapping['connait'], |
|
48 set([(bob_eid, alice_eid), (alice_eid, bob_eid)])) |
|
49 |
|
50 |
|
51 class ExtEntitiesImporterTC(CubicWebTC): |
|
52 |
|
53 def importer(self, cnx): |
|
54 store = RQLObjectStore(cnx) |
|
55 return ExtEntitiesImporter(self.schema, store, raise_on_error=True) |
|
56 |
|
57 def test_simple_import(self): |
|
58 with self.admin_access.repo_cnx() as cnx: |
|
59 importer = self.importer(cnx) |
|
60 personne = ExtEntity('Personne', 1, {'nom': set([u'de la lune']), |
|
61 'prenom': set([u'Jean'])}) |
|
62 importer.import_entities([personne]) |
|
63 cnx.commit() |
|
64 rset = cnx.execute('Any X WHERE X is Personne') |
|
65 entity = rset.get_entity(0, 0) |
|
66 self.assertEqual(entity.nom, u'de la lune') |
|
67 self.assertEqual(entity.prenom, u'Jean') |
|
68 |
|
69 def test_import_missing_required_attribute(self): |
|
70 """Check import of ext entity with missing required attribute""" |
|
71 with self.admin_access.repo_cnx() as cnx: |
|
72 importer = self.importer(cnx) |
|
73 tag = ExtEntity('Personne', 2, {'prenom': set([u'Jean'])}) |
|
74 self.assertRaises(ValidationError, importer.import_entities, [tag]) |
|
75 |
|
76 def test_import_inlined_relation(self): |
|
77 """Check import of ext entities with inlined relation""" |
|
78 with self.admin_access.repo_cnx() as cnx: |
|
79 importer = self.importer(cnx) |
|
80 richelieu = ExtEntity('Personne', 3, {'nom': set([u'Richelieu']), |
|
81 'enfant': set([4])}) |
|
82 athos = ExtEntity('Personne', 4, {'nom': set([u'Athos'])}) |
|
83 importer.import_entities([athos, richelieu]) |
|
84 cnx.commit() |
|
85 rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"') |
|
86 entity = rset.get_entity(0, 0) |
|
87 self.assertEqual(entity.enfant[0].nom, 'Athos') |
|
88 |
|
89 def test_import_non_inlined_relation(self): |
|
90 """Check import of ext entities with non inlined relation""" |
|
91 with self.admin_access.repo_cnx() as cnx: |
|
92 importer = self.importer(cnx) |
|
93 richelieu = ExtEntity('Personne', 5, {'nom': set([u'Richelieu']), |
|
94 'connait': set([6])}) |
|
95 athos = ExtEntity('Personne', 6, {'nom': set([u'Athos'])}) |
|
96 importer.import_entities([athos, richelieu]) |
|
97 cnx.commit() |
|
98 rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"') |
|
99 entity = rset.get_entity(0, 0) |
|
100 self.assertEqual(entity.connait[0].nom, 'Athos') |
|
101 rset = cnx.execute('Any X WHERE X is Personne, X nom "Athos"') |
|
102 entity = rset.get_entity(0, 0) |
|
103 self.assertEqual(entity.connait[0].nom, 'Richelieu') |
|
104 |
|
105 def test_import_missing_inlined_relation(self): |
|
106 """Check import of ext entity with missing inlined relation""" |
|
107 with self.admin_access.repo_cnx() as cnx: |
|
108 importer = self.importer(cnx) |
|
109 richelieu = ExtEntity('Personne', 7, |
|
110 {'nom': set([u'Richelieu']), 'enfant': set([8])}) |
|
111 self.assertRaises(Exception, importer.import_entities, [richelieu]) |
|
112 cnx.commit() |
|
113 rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"') |
|
114 self.assertEqual(len(rset), 0) |
|
115 |
|
116 def test_import_missing_non_inlined_relation(self): |
|
117 """Check import of ext entity with missing non-inlined relation""" |
|
118 with self.admin_access.repo_cnx() as cnx: |
|
119 importer = self.importer(cnx) |
|
120 richelieu = ExtEntity('Personne', 9, |
|
121 {'nom': set([u'Richelieu']), 'connait': set([10])}) |
|
122 self.assertRaises(Exception, importer.import_entities, [richelieu]) |
|
123 cnx.commit() |
|
124 rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"') |
|
125 entity = rset.get_entity(0, 0) |
|
126 self.assertEqual(entity.nom, u'Richelieu') |
|
127 self.assertEqual(len(entity.connait), 0) |
|
128 |
|
129 def test_update(self): |
|
130 """Check update of ext entity""" |
|
131 with self.admin_access.repo_cnx() as cnx: |
|
132 importer = self.importer(cnx) |
|
133 # First import |
|
134 richelieu = ExtEntity('Personne', 11, |
|
135 {'nom': {u'Richelieu Diacre'}}) |
|
136 importer.import_entities([richelieu]) |
|
137 cnx.commit() |
|
138 rset = cnx.execute('Any X WHERE X is Personne') |
|
139 entity = rset.get_entity(0, 0) |
|
140 self.assertEqual(entity.nom, u'Richelieu Diacre') |
|
141 # Second import |
|
142 richelieu = ExtEntity('Personne', 11, |
|
143 {'nom': {u'Richelieu Cardinal'}}) |
|
144 importer.import_entities([richelieu]) |
|
145 cnx.commit() |
|
146 rset = cnx.execute('Any X WHERE X is Personne') |
|
147 self.assertEqual(len(rset), 1) |
|
148 entity = rset.get_entity(0, 0) |
|
149 self.assertEqual(entity.nom, u'Richelieu Cardinal') |
|
150 |
|
151 |
|
152 class UseExtidAsCwuriTC(TestCase): |
|
153 |
|
154 def test(self): |
|
155 personne = ExtEntity('Personne', b'1', {'nom': set([u'de la lune']), |
|
156 'prenom': set([u'Jean'])}) |
|
157 mapping = {} |
|
158 set_cwuri = use_extid_as_cwuri(mapping) |
|
159 list(set_cwuri((personne,))) |
|
160 self.assertIn('cwuri', personne.values) |
|
161 self.assertEqual(personne.values['cwuri'], set([u'1'])) |
|
162 mapping[b'1'] = 'whatever' |
|
163 personne.values.pop('cwuri') |
|
164 list(set_cwuri((personne,))) |
|
165 self.assertNotIn('cwuri', personne.values) |
|
166 |
|
167 |
|
168 def extentities_from_csv(fpath): |
|
169 """Yield ExtEntity read from `fpath` CSV file.""" |
|
170 with open(fpath, 'rb') as f: |
|
171 for uri, name, knows in ucsvreader(f, skipfirst=True, skip_empty=False): |
|
172 yield ExtEntity('Personne', uri, |
|
173 {'nom': set([name]), 'connait': set([knows])}) |
|
174 |
|
175 |
|
176 class DataimportFunctionalTC(CubicWebTC): |
|
177 |
|
178 def test_csv(self): |
|
179 extenties = extentities_from_csv(self.datapath('people.csv')) |
|
180 with self.admin_access.repo_cnx() as cnx: |
|
181 store = RQLObjectStore(cnx) |
|
182 importer = ExtEntitiesImporter(self.schema, store) |
|
183 importer.import_entities(extenties) |
|
184 cnx.commit() |
|
185 rset = cnx.execute('String N WHERE X nom N, X connait Y, Y nom "Alice"') |
|
186 self.assertEqual(rset[0][0], u'Bob') |
|
187 |
|
188 |
|
189 if __name__ == '__main__': |
|
190 unittest_main() |