|
# -*- coding: utf-8 -*-
# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests for cubicweb.dataimport.importer"""
|
18 |
|
from collections import defaultdict

from logilab.common.testlib import unittest_main

from cubicweb import ValidationError
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.dataimport import RQLObjectStore, ucsvreader
from cubicweb.dataimport.importer import (
    ExtEntity,
    ExtEntitiesImporter,
    SimpleImportLog,
    RelationMapping,
)
|
27 |
|
28 |
|
class RelationMappingTC(CubicWebTC):
    """Tests reading the `connait` relation through `RelationMapping`."""

    def test_nosource(self):
        """Build the mapping from the connection alone and check `connait`."""
        with self.admin_access.repo_cnx() as cnx:
            alice = cnx.create_entity('Personne', nom=u'alice').eid
            bob = cnx.create_entity('Personne', nom=u'bob', connait=alice).eid
            cnx.commit()
            relations = RelationMapping(cnx)
            # Only "bob connait alice" is created above, yet both orientations
            # of the pair are expected (presumably `connait` is symmetric in
            # the test schema -- TODO confirm against the schema).
            expected = {(bob, alice), (alice, bob)}
            self.assertEqual(relations['connait'], expected)

    def test_with_source(self):
        """Same check, with the 'system' CWSource passed to the mapping."""
        with self.admin_access.repo_cnx() as cnx:
            alice = cnx.create_entity('Personne', nom=u'alice').eid
            bob = cnx.create_entity('Personne', nom=u'bob', connait=alice).eid
            cnx.commit()
            system_source = cnx.find('CWSource', name=u'system').one()
            relations = RelationMapping(cnx, system_source)
            # Both orientations of the pair are expected here as well.
            expected = {(bob, alice), (alice, bob)}
            self.assertEqual(relations['connait'], expected)
|
48 |
|
49 |
|
class ExtEntitiesImporterTC(CubicWebTC):
    """Tests for `ExtEntitiesImporter` fed with hand-built `ExtEntity` objects."""

    def importer(self, cnx):
        """Return an `ExtEntitiesImporter` for `cnx`.

        The importer writes through an `RQLObjectStore` bound to `cnx` and is
        created with `raise_on_error=True`.
        """
        rql_store = RQLObjectStore(cnx)
        return ExtEntitiesImporter(self.schema, rql_store, raise_on_error=True)

    def test_simple_import(self):
        """Check import of a single ext entity with two attributes"""
        with self.admin_access.repo_cnx() as cnx:
            ext_importer = self.importer(cnx)
            values = {'nom': {u'de la lune'}, 'prenom': {u'Jean'}}
            ext_importer.import_entities([ExtEntity('Personne', 1, values)])
            cnx.commit()
            imported = cnx.execute('Any X WHERE X is Personne').get_entity(0, 0)
            self.assertEqual(imported.nom, u'de la lune')
            self.assertEqual(imported.prenom, u'Jean')

    def test_import_missing_required_attribute(self):
        """Check import of ext entity with missing required attribute"""
        with self.admin_access.repo_cnx() as cnx:
            ext_importer = self.importer(cnx)
            # Only `prenom` is provided; the import is expected to fail with
            # a ValidationError (presumably because `nom` is required).
            incomplete = ExtEntity('Personne', 2, {'prenom': {u'Jean'}})
            with self.assertRaises(ValidationError):
                ext_importer.import_entities([incomplete])

    def test_import_inlined_relation(self):
        """Check import of ext entities with inlined relation"""
        with self.admin_access.repo_cnx() as cnx:
            ext_importer = self.importer(cnx)
            parent = ExtEntity('Personne', 3,
                               {'nom': {u'Richelieu'}, 'enfant': {4}})
            child = ExtEntity('Personne', 4, {'nom': {u'Athos'}})
            # The relation target is handed to the importer first.
            ext_importer.import_entities([child, parent])
            cnx.commit()
            found = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
            self.assertEqual(found.get_entity(0, 0).enfant[0].nom, 'Athos')

    def test_import_non_inlined_relation(self):
        """Check import of ext entities with non inlined relation"""
        with self.admin_access.repo_cnx() as cnx:
            ext_importer = self.importer(cnx)
            subject = ExtEntity('Personne', 5,
                                {'nom': {u'Richelieu'}, 'connait': {6}})
            target = ExtEntity('Personne', 6, {'nom': {u'Athos'}})
            ext_importer.import_entities([target, subject])
            cnx.commit()
            # The relation is checked from both ends.
            found = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
            self.assertEqual(found.get_entity(0, 0).connait[0].nom, 'Athos')
            found = cnx.execute('Any X WHERE X is Personne, X nom "Athos"')
            self.assertEqual(found.get_entity(0, 0).connait[0].nom, 'Richelieu')

    def test_import_missing_inlined_relation(self):
        """Check import of ext entity with missing inlined relation"""
        with self.admin_access.repo_cnx() as cnx:
            ext_importer = self.importer(cnx)
            # External id 8 is referenced but never provided.
            dangling = ExtEntity('Personne', 7,
                                 {'nom': {u'Richelieu'}, 'enfant': {8}})
            with self.assertRaises(Exception):
                ext_importer.import_entities([dangling])
            cnx.commit()
            # No Personne named Richelieu is expected after the failure.
            found = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
            self.assertEqual(len(found), 0)

    def test_import_missing_non_inlined_relation(self):
        """Check import of ext entity with missing non-inlined relation"""
        with self.admin_access.repo_cnx() as cnx:
            ext_importer = self.importer(cnx)
            # External id 10 is referenced but never provided.
            dangling = ExtEntity('Personne', 9,
                                 {'nom': {u'Richelieu'}, 'connait': {10}})
            with self.assertRaises(Exception):
                ext_importer.import_entities([dangling])
            cnx.commit()
            # Unlike the inlined case, the entity itself is expected to exist,
            # just without any `connait` relation.
            found = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
            imported = found.get_entity(0, 0)
            self.assertEqual(imported.nom, u'Richelieu')
            self.assertEqual(len(imported.connait), 0)

    def test_update(self):
        """Check update of ext entity"""
        with self.admin_access.repo_cnx() as cnx:
            ext_importer = self.importer(cnx)
            # First import
            first = ExtEntity('Personne', 11, {'nom': {u'Richelieu Diacre'}})
            ext_importer.import_entities([first])
            cnx.commit()
            found = cnx.execute('Any X WHERE X is Personne')
            self.assertEqual(found.get_entity(0, 0).nom, u'Richelieu Diacre')
            # Second import, reusing external id 11: still exactly one
            # Personne is expected, now carrying the new name.
            second = ExtEntity('Personne', 11, {'nom': {u'Richelieu Cardinal'}})
            ext_importer.import_entities([second])
            cnx.commit()
            found = cnx.execute('Any X WHERE X is Personne')
            self.assertEqual(len(found), 1)
            self.assertEqual(found.get_entity(0, 0).nom, u'Richelieu Cardinal')
|
149 |
|
150 |
|
def extentities_from_csv(fpath):
    """Yield ExtEntity read from `fpath` CSV file.

    The first row of the file is skipped (`skipfirst=True`). Every following
    row must unpack into exactly three columns, `uri`, `name` and `knows`.
    Each row yields one 'Personne' `ExtEntity` built from `uri`, with `name`
    as its `nom` value and `knows` as its `connait` value.
    """
    # Binary mode: the csv machinery wants an undecoded byte stream (the
    # Python 2 `csv` documentation requires the 'b' flag); decoding is
    # presumably left to `ucsvreader` -- TODO confirm for the targeted version.
    with open(fpath, 'rb') as stream:
        rows = ucsvreader(stream, skipfirst=True, skip_empty=False)
        for uri, name, knows in rows:
            yield ExtEntity('Personne', uri,
                            {'nom': {name}, 'connait': {knows}})
|
157 |
|
158 |
|
class DataimportFunctionalTC(CubicWebTC):
    """Functional test importing ext entities read from a CSV data file."""

    def test_csv(self):
        """Import `people.csv` and check who is recorded as knowing Alice."""
        ext_entities = extentities_from_csv(self.datapath('people.csv'))
        with self.admin_access.repo_cnx() as cnx:
            rql_store = RQLObjectStore(cnx)
            csv_importer = ExtEntitiesImporter(self.schema, rql_store)
            csv_importer.import_entities(ext_entities)
            cnx.commit()
            names = cnx.execute('String N WHERE X nom N, X connait Y, Y nom "Alice"')
            self.assertEqual(names[0][0], u'Bob')
|
170 |
|
171 |
|
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest_main()