dataimport/test/unittest_importer.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # -*- coding: utf-8 -*-
       
     2 # copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     3 # contact http://www.logilab.fr -- mailto:contact@logilab.fr
       
     4 #
       
     5 # This program is free software: you can redistribute it and/or modify it under
       
     6 # the terms of the GNU Lesser General Public License as published by the Free
       
     7 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     8 # any later version.
       
     9 #
       
    10 # This program is distributed in the hope that it will be useful, but WITHOUT
       
    11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    12 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
       
    13 # details.
       
    14 #
       
    15 # You should have received a copy of the GNU Lesser General Public License along
       
    16 # with this program. If not, see <http://www.gnu.org/licenses/>.
       
    17 """Tests for cubicweb.dataimport.importer"""
       
    18 
       
    19 from collections import defaultdict
       
    20 
       
    21 from logilab.common.testlib import TestCase, unittest_main
       
    22 
       
    23 from cubicweb import ValidationError
       
    24 from cubicweb.devtools.testlib import CubicWebTC
       
    25 from cubicweb.dataimport import RQLObjectStore, ucsvreader
       
    26 from cubicweb.dataimport.importer import (ExtEntity, ExtEntitiesImporter, SimpleImportLog,
       
    27                                           RelationMapping, use_extid_as_cwuri)
       
    28 
       
    29 
       
    30 class RelationMappingTC(CubicWebTC):
       
    31 
       
    32     def test_nosource(self):
       
    33         with self.admin_access.repo_cnx() as cnx:
       
    34             alice_eid = cnx.create_entity('Personne', nom=u'alice').eid
       
    35             bob_eid = cnx.create_entity('Personne', nom=u'bob', connait=alice_eid).eid
       
    36             cnx.commit()
       
    37             mapping = RelationMapping(cnx)
       
    38             self.assertEqual(mapping['connait'],
       
    39                              set([(bob_eid, alice_eid), (alice_eid, bob_eid)]))
       
    40 
       
    41     def test_with_source(self):
       
    42         with self.admin_access.repo_cnx() as cnx:
       
    43             alice_eid = cnx.create_entity('Personne', nom=u'alice').eid
       
    44             bob_eid = cnx.create_entity('Personne', nom=u'bob', connait=alice_eid).eid
       
    45             cnx.commit()
       
    46             mapping = RelationMapping(cnx, cnx.find('CWSource', name=u'system').one())
       
    47             self.assertEqual(mapping['connait'],
       
    48                              set([(bob_eid, alice_eid), (alice_eid, bob_eid)]))
       
    49 
       
    50 
       
    51 class ExtEntitiesImporterTC(CubicWebTC):
       
    52 
       
    53     def importer(self, cnx):
       
    54         store = RQLObjectStore(cnx)
       
    55         return ExtEntitiesImporter(self.schema, store, raise_on_error=True)
       
    56 
       
    57     def test_simple_import(self):
       
    58         with self.admin_access.repo_cnx() as cnx:
       
    59             importer = self.importer(cnx)
       
    60             personne = ExtEntity('Personne', 1, {'nom': set([u'de la lune']),
       
    61                                                  'prenom': set([u'Jean'])})
       
    62             importer.import_entities([personne])
       
    63             cnx.commit()
       
    64             rset = cnx.execute('Any X WHERE X is Personne')
       
    65             entity = rset.get_entity(0, 0)
       
    66             self.assertEqual(entity.nom, u'de la lune')
       
    67             self.assertEqual(entity.prenom, u'Jean')
       
    68 
       
    69     def test_import_missing_required_attribute(self):
       
    70         """Check import of ext entity with missing required attribute"""
       
    71         with self.admin_access.repo_cnx() as cnx:
       
    72             importer = self.importer(cnx)
       
    73             tag = ExtEntity('Personne', 2, {'prenom': set([u'Jean'])})
       
    74             self.assertRaises(ValidationError, importer.import_entities, [tag])
       
    75 
       
    76     def test_import_inlined_relation(self):
       
    77         """Check import of ext entities with inlined relation"""
       
    78         with self.admin_access.repo_cnx() as cnx:
       
    79             importer = self.importer(cnx)
       
    80             richelieu = ExtEntity('Personne', 3, {'nom': set([u'Richelieu']),
       
    81                                                   'enfant': set([4])})
       
    82             athos = ExtEntity('Personne', 4, {'nom': set([u'Athos'])})
       
    83             importer.import_entities([athos, richelieu])
       
    84             cnx.commit()
       
    85             rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
       
    86             entity = rset.get_entity(0, 0)
       
    87             self.assertEqual(entity.enfant[0].nom, 'Athos')
       
    88 
       
    89     def test_import_non_inlined_relation(self):
       
    90         """Check import of ext entities with non inlined relation"""
       
    91         with self.admin_access.repo_cnx() as cnx:
       
    92             importer = self.importer(cnx)
       
    93             richelieu = ExtEntity('Personne', 5, {'nom': set([u'Richelieu']),
       
    94                                                   'connait': set([6])})
       
    95             athos = ExtEntity('Personne', 6, {'nom': set([u'Athos'])})
       
    96             importer.import_entities([athos, richelieu])
       
    97             cnx.commit()
       
    98             rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
       
    99             entity = rset.get_entity(0, 0)
       
   100             self.assertEqual(entity.connait[0].nom, 'Athos')
       
   101             rset = cnx.execute('Any X WHERE X is Personne, X nom "Athos"')
       
   102             entity = rset.get_entity(0, 0)
       
   103             self.assertEqual(entity.connait[0].nom, 'Richelieu')
       
   104 
       
   105     def test_import_missing_inlined_relation(self):
       
   106         """Check import of ext entity with missing inlined relation"""
       
   107         with self.admin_access.repo_cnx() as cnx:
       
   108             importer = self.importer(cnx)
       
   109             richelieu = ExtEntity('Personne', 7,
       
   110                                   {'nom': set([u'Richelieu']), 'enfant': set([8])})
       
   111             self.assertRaises(Exception, importer.import_entities, [richelieu])
       
   112             cnx.commit()
       
   113             rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
       
   114             self.assertEqual(len(rset), 0)
       
   115 
       
   116     def test_import_missing_non_inlined_relation(self):
       
   117         """Check import of ext entity with missing non-inlined relation"""
       
   118         with self.admin_access.repo_cnx() as cnx:
       
   119             importer = self.importer(cnx)
       
   120             richelieu = ExtEntity('Personne', 9,
       
   121                                   {'nom': set([u'Richelieu']), 'connait': set([10])})
       
   122             self.assertRaises(Exception, importer.import_entities, [richelieu])
       
   123             cnx.commit()
       
   124             rset = cnx.execute('Any X WHERE X is Personne, X nom "Richelieu"')
       
   125             entity = rset.get_entity(0, 0)
       
   126             self.assertEqual(entity.nom, u'Richelieu')
       
   127             self.assertEqual(len(entity.connait), 0)
       
   128 
       
   129     def test_update(self):
       
   130         """Check update of ext entity"""
       
   131         with self.admin_access.repo_cnx() as cnx:
       
   132             importer = self.importer(cnx)
       
   133             # First import
       
   134             richelieu = ExtEntity('Personne', 11,
       
   135                                   {'nom': {u'Richelieu Diacre'}})
       
   136             importer.import_entities([richelieu])
       
   137             cnx.commit()
       
   138             rset = cnx.execute('Any X WHERE X is Personne')
       
   139             entity = rset.get_entity(0, 0)
       
   140             self.assertEqual(entity.nom, u'Richelieu Diacre')
       
   141             # Second import
       
   142             richelieu = ExtEntity('Personne', 11,
       
   143                                   {'nom': {u'Richelieu Cardinal'}})
       
   144             importer.import_entities([richelieu])
       
   145             cnx.commit()
       
   146             rset = cnx.execute('Any X WHERE X is Personne')
       
   147             self.assertEqual(len(rset), 1)
       
   148             entity = rset.get_entity(0, 0)
       
   149             self.assertEqual(entity.nom, u'Richelieu Cardinal')
       
   150 
       
   151 
       
   152 class UseExtidAsCwuriTC(TestCase):
       
   153 
       
   154     def test(self):
       
   155         personne = ExtEntity('Personne', b'1', {'nom': set([u'de la lune']),
       
   156                                                 'prenom': set([u'Jean'])})
       
   157         mapping = {}
       
   158         set_cwuri = use_extid_as_cwuri(mapping)
       
   159         list(set_cwuri((personne,)))
       
   160         self.assertIn('cwuri', personne.values)
       
   161         self.assertEqual(personne.values['cwuri'], set([u'1']))
       
   162         mapping[b'1'] = 'whatever'
       
   163         personne.values.pop('cwuri')
       
   164         list(set_cwuri((personne,)))
       
   165         self.assertNotIn('cwuri', personne.values)
       
   166 
       
   167 
       
   168 def extentities_from_csv(fpath):
       
   169     """Yield ExtEntity read from `fpath` CSV file."""
       
   170     with open(fpath, 'rb') as f:
       
   171         for uri, name, knows in ucsvreader(f, skipfirst=True, skip_empty=False):
       
   172             yield ExtEntity('Personne', uri,
       
   173                             {'nom': set([name]), 'connait': set([knows])})
       
   174 
       
   175 
       
   176 class DataimportFunctionalTC(CubicWebTC):
       
   177 
       
   178     def test_csv(self):
       
   179         extenties = extentities_from_csv(self.datapath('people.csv'))
       
   180         with self.admin_access.repo_cnx() as cnx:
       
   181             store = RQLObjectStore(cnx)
       
   182             importer = ExtEntitiesImporter(self.schema, store)
       
   183             importer.import_entities(extenties)
       
   184             cnx.commit()
       
   185             rset = cnx.execute('String N WHERE X nom N, X connait Y, Y nom "Alice"')
       
   186             self.assertEqual(rset[0][0], u'Bob')
       
   187 
       
   188 
       
   189 if __name__ == '__main__':
       
   190     unittest_main()