--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/dataimport/test/unittest_importer.py Fri Jun 26 16:09:27 2015 +0200
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr -- mailto:contact@logilab.fr
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with this program. If not, see <http://www.gnu.org/licenses/>.
+"""Tests for cubicweb.dataimport.importer"""
+
+from collections import defaultdict
+
+from logilab.common.testlib import unittest_main
+
+from cubicweb import ValidationError
+from cubicweb.devtools.testlib import CubicWebTC
+from cubicweb.dataimport import RQLObjectStore, ucsvreader
+from cubicweb.dataimport.importer import ExtEntity, ExtEntitiesImporter, SimpleImportLog, RelationMapping
+
+
+class RelationMappingTC(CubicWebTC):
+
+ def test_nosource(self):
+ with self.admin_access.repo_cnx() as cnx:
+ alice_eid = cnx.create_entity('Personne', nom=u'alice').eid
+ bob_eid = cnx.create_entity('Personne', nom=u'bob', connait=alice_eid).eid
+ cnx.commit()
+ mapping = RelationMapping(cnx)
+ self.assertEqual(mapping['connait'],
+ set([(bob_eid, alice_eid), (alice_eid, bob_eid)]))
+
+ def test_with_source(self):
+ with self.admin_access.repo_cnx() as cnx:
+ alice_eid = cnx.create_entity('Personne', nom=u'alice').eid
+ bob_eid = cnx.create_entity('Personne', nom=u'bob', connait=alice_eid).eid
+ cnx.commit()
+ mapping = RelationMapping(cnx, cnx.find('CWSource', name=u'system').one())
+ self.assertEqual(mapping['connait'],
+ set([(bob_eid, alice_eid), (alice_eid, bob_eid)]))
+
+
+class ExtEntitiesImporterTC(CubicWebTC):
+
+ def importer(self, cnx):
+ store = RQLObjectStore(cnx)
+ return ExtEntitiesImporter(self.schema, store, raise_on_error=True)
+
+ def test_simple_import(self):
+ with self.admin_access.repo_cnx() as cnx:
+ importer = self.importer(cnx)
+ personne = ExtEntity('Personne', 1, {'nom': set([u'de la lune']),
+ 'prenom': set([u'Jean'])})
+ importer.import_entities([personne])
+ cnx.commit()
+ rset = cnx.execute('Any X WHERE X is Personne')
+ entity = rset.get_entity(0, 0)
+ self.assertEqual(entity.nom, u'de la lune')
+ self.assertEqual(entity.prenom, u'Jean')
+
+ def test_import_missing_required_attribute(self):
+ """Check import of ext entity with missing required attribute"""
+ with self.admin_access.repo_cnx() as cnx:
+ importer = self.importer(cnx)
+ tag = ExtEntity('Personne', 2, {'prenom': set([u'Jean'])})
+ self.assertRaises(ValidationError, importer.import_entities, [tag])
+
+ def test_import_inlined_relation(self):
+ """Check import of ext entities with inlined relation"""
+ with self.admin_access.repo_cnx() as cnx:
+ importer = self.importer(cnx)
+ richelieu = ExtEntity('Personne', 3, {'nom': set([u'Richelieu']),
+ 'enfant': set([4])})
+ athos = ExtEntity('Personne', 4, {'nom': set([u'Athos'])})
+ importer.import_entities([athos, richelieu])
+ cnx.commit()
+ rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
+ entity = rset.get_entity(0, 0)
+ self.assertEqual(entity.enfant[0].nom, 'Athos')
+
+ def test_import_non_inlined_relation(self):
+ """Check import of ext entities with non inlined relation"""
+ with self.admin_access.repo_cnx() as cnx:
+ importer = self.importer(cnx)
+ richelieu = ExtEntity('Personne', 5, {'nom': set([u'Richelieu']),
+ 'connait': set([6])})
+ athos = ExtEntity('Personne', 6, {'nom': set([u'Athos'])})
+ importer.import_entities([athos, richelieu])
+ cnx.commit()
+ rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
+ entity = rset.get_entity(0, 0)
+ self.assertEqual(entity.connait[0].nom, 'Athos')
+ rset = cnx.execute('Any X WHERE X is Personne, X nom "Athos"')
+ entity = rset.get_entity(0, 0)
+ self.assertEqual(entity.connait[0].nom, 'Richelieu')
+
+ def test_import_missing_inlined_relation(self):
+ """Check import of ext entity with missing inlined relation"""
+ with self.admin_access.repo_cnx() as cnx:
+ importer = self.importer(cnx)
+ richelieu = ExtEntity('Personne', 7,
+ {'nom': set([u'Richelieu']), 'enfant': set([8])})
+ self.assertRaises(Exception, importer.import_entities, [richelieu])
+ cnx.commit()
+ rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
+ self.assertEqual(len(rset), 0)
+
+ def test_import_missing_non_inlined_relation(self):
+ """Check import of ext entity with missing non-inlined relation"""
+ with self.admin_access.repo_cnx() as cnx:
+ importer = self.importer(cnx)
+ richelieu = ExtEntity('Personne', 9,
+ {'nom': set([u'Richelieu']), 'connait': set([10])})
+ self.assertRaises(Exception, importer.import_entities, [richelieu])
+ cnx.commit()
+ rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
+ entity = rset.get_entity(0, 0)
+ self.assertEqual(entity.nom, u'Richelieu')
+ self.assertEqual(len(entity.connait), 0)
+
+ def test_update(self):
+ """Check update of ext entity"""
+ with self.admin_access.repo_cnx() as cnx:
+ importer = self.importer(cnx)
+ # First import
+ richelieu = ExtEntity('Personne', 11,
+ {'nom': {u'Richelieu Diacre'}})
+ importer.import_entities([richelieu])
+ cnx.commit()
+ rset = cnx.execute('Any X WHERE X is Personne')
+ entity = rset.get_entity(0, 0)
+ self.assertEqual(entity.nom, u'Richelieu Diacre')
+ # Second import
+ richelieu = ExtEntity('Personne', 11,
+ {'nom': {u'Richelieu Cardinal'}})
+ importer.import_entities([richelieu])
+ cnx.commit()
+ rset = cnx.execute('Any X WHERE X is Personne')
+ self.assertEqual(len(rset), 1)
+ entity = rset.get_entity(0, 0)
+ self.assertEqual(entity.nom, u'Richelieu Cardinal')
+
+
+def extentities_from_csv(fpath):
+ """Yield ExtEntity read from `fpath` CSV file."""
+ with open(fpath) as f:
+ for uri, name, knows in ucsvreader(f, skipfirst=True, skip_empty=False):
+ yield ExtEntity('Personne', uri,
+ {'nom': set([name]), 'connait': set([knows])})
+
+
+class DataimportFunctionalTC(CubicWebTC):
+
+ def test_csv(self):
+ extenties = extentities_from_csv(self.datapath('people.csv'))
+ with self.admin_access.repo_cnx() as cnx:
+ store = RQLObjectStore(cnx)
+ importer = ExtEntitiesImporter(self.schema, store)
+ importer.import_entities(extenties)
+ cnx.commit()
+ rset = cnx.execute('String N WHERE X nom N, X connait Y, Y nom "Alice"')
+ self.assertEqual(rset[0][0], u'Bob')
+
+
+if __name__ == '__main__':
+ unittest_main()