dataimport/test/unittest_dataimport.py
changeset 10350 31327bd26931
parent 10349 efbbf1e93a04
child 10457 1f5026e7d848
equal deleted inserted replaced
10349:efbbf1e93a04 10350:31327bd26931
       
     1 # -*- coding: utf-8 -*-
       
     2 
       
     3 import datetime as DT
       
     4 from StringIO import StringIO
       
     5 
       
     6 from logilab.common.testlib import TestCase, unittest_main
       
     7 
       
     8 from cubicweb import dataimport
       
     9 from cubicweb.devtools.testlib import CubicWebTC
       
    10 
       
    11 
       
    12 class RQLObjectStoreTC(CubicWebTC):
       
    13 
       
    14     def test_all(self):
       
    15         with self.admin_access.repo_cnx() as cnx:
       
    16             store = dataimport.RQLObjectStore(cnx)
       
    17             group_eid = store.create_entity('CWGroup', name=u'grp').eid
       
    18             user_eid = store.create_entity('CWUser', login=u'lgn', upassword=u'pwd').eid
       
    19             store.relate(user_eid, 'in_group', group_eid)
       
    20             cnx.commit()
       
    21 
       
    22         with self.admin_access.repo_cnx() as cnx:
       
    23             users = cnx.execute('CWUser X WHERE X login "lgn"')
       
    24             self.assertEqual(1, len(users))
       
    25             self.assertEqual(user_eid, users.one().eid)
       
    26             groups = cnx.execute('CWGroup X WHERE U in_group X, U login "lgn"')
       
    27             self.assertEqual(1, len(users))
       
    28             self.assertEqual(group_eid, groups.one().eid)
       
    29 
       
    30 
       
    31 class CreateCopyFromBufferTC(TestCase):
       
    32 
       
    33     # test converters
       
    34 
       
    35     def test_convert_none(self):
       
    36         cnvt = dataimport._copyfrom_buffer_convert_None
       
    37         self.assertEqual('NULL', cnvt(None))
       
    38 
       
    39     def test_convert_number(self):
       
    40         cnvt = dataimport._copyfrom_buffer_convert_number
       
    41         self.assertEqual('42', cnvt(42))
       
    42         self.assertEqual('42', cnvt(42L))
       
    43         self.assertEqual('42.42', cnvt(42.42))
       
    44 
       
    45     def test_convert_string(self):
       
    46         cnvt = dataimport._copyfrom_buffer_convert_string
       
    47         # simple
       
    48         self.assertEqual('babar', cnvt('babar'))
       
    49         # unicode
       
    50         self.assertEqual('\xc3\xa9l\xc3\xa9phant', cnvt(u'éléphant'))
       
    51         self.assertEqual('\xe9l\xe9phant', cnvt(u'éléphant', encoding='latin1'))
       
    52         # escaping
       
    53         self.assertEqual('babar\\tceleste\\n', cnvt('babar\tceleste\n'))
       
    54         self.assertEqual(r'C:\\new\tC:\\test', cnvt('C:\\new\tC:\\test'))
       
    55 
       
    56     def test_convert_date(self):
       
    57         cnvt = dataimport._copyfrom_buffer_convert_date
       
    58         self.assertEqual('0666-01-13', cnvt(DT.date(666, 1, 13)))
       
    59 
       
    60     def test_convert_time(self):
       
    61         cnvt = dataimport._copyfrom_buffer_convert_time
       
    62         self.assertEqual('06:06:06.000100', cnvt(DT.time(6, 6, 6, 100)))
       
    63 
       
    64     def test_convert_datetime(self):
       
    65         cnvt = dataimport._copyfrom_buffer_convert_datetime
       
    66         self.assertEqual('0666-06-13 06:06:06.000000', cnvt(DT.datetime(666, 6, 13, 6, 6, 6)))
       
    67 
       
    68     # test buffer
       
    69     def test_create_copyfrom_buffer_tuple(self):
       
    70         cnvt = dataimport._create_copyfrom_buffer
       
    71         data = ((42, 42L, 42.42, u'éléphant', DT.date(666, 1, 13), DT.time(6, 6, 6), DT.datetime(666, 6, 13, 6, 6, 6)),
       
    72                 (6, 6L, 6.6, u'babar', DT.date(2014, 1, 14), DT.time(4, 2, 1), DT.datetime(2014, 1, 1, 0, 0, 0)))
       
    73         results = dataimport._create_copyfrom_buffer(data)
       
    74         # all columns
       
    75         expected = '''42\t42\t42.42\téléphant\t0666-01-13\t06:06:06.000000\t0666-06-13 06:06:06.000000
       
    76 6\t6\t6.6\tbabar\t2014-01-14\t04:02:01.000000\t2014-01-01 00:00:00.000000'''
       
    77         self.assertMultiLineEqual(expected, results.getvalue())
       
    78         # selected columns
       
    79         results = dataimport._create_copyfrom_buffer(data, columns=(1, 3, 6))
       
    80         expected = '''42\téléphant\t0666-06-13 06:06:06.000000
       
    81 6\tbabar\t2014-01-01 00:00:00.000000'''
       
    82         self.assertMultiLineEqual(expected, results.getvalue())
       
    83 
       
    84     def test_create_copyfrom_buffer_dict(self):
       
    85         cnvt = dataimport._create_copyfrom_buffer
       
    86         data = (dict(integer=42, double=42.42, text=u'éléphant', date=DT.datetime(666, 6, 13, 6, 6, 6)),
       
    87                 dict(integer=6, double=6.6, text=u'babar', date=DT.datetime(2014, 1, 1, 0, 0, 0)))
       
    88         results = dataimport._create_copyfrom_buffer(data, ('integer', 'text'))
       
    89         expected = '''42\téléphant\n6\tbabar'''
       
    90         self.assertMultiLineEqual(expected, results.getvalue())
       
    91 
       
    92 
       
    93 class UcsvreaderTC(TestCase):
       
    94 
       
    95     def test_empty_lines_skipped(self):
       
    96         stream = StringIO('''a,b,c,d,
       
    97 1,2,3,4,
       
    98 ,,,,
       
    99 ,,,,
       
   100 ''')
       
   101         self.assertEqual([[u'a', u'b', u'c', u'd', u''],
       
   102                           [u'1', u'2', u'3', u'4', u''],
       
   103                           ],
       
   104                          list(dataimport.ucsvreader(stream)))
       
   105         stream.seek(0)
       
   106         self.assertEqual([[u'a', u'b', u'c', u'd', u''],
       
   107                           [u'1', u'2', u'3', u'4', u''],
       
   108                           [u'', u'', u'', u'', u''],
       
   109                           [u'', u'', u'', u'', u'']
       
   110                           ],
       
   111                          list(dataimport.ucsvreader(stream, skip_empty=False)))
       
   112 
       
   113     def test_skip_first(self):
       
   114         stream = StringIO('a,b,c,d,\n'
       
   115                           '1,2,3,4,\n')
       
   116         reader = dataimport.ucsvreader(stream, skipfirst=True,
       
   117                                        ignore_errors=True)
       
   118         self.assertEqual(list(reader),
       
   119                          [[u'1', u'2', u'3', u'4', u'']])
       
   120 
       
   121         stream.seek(0)
       
   122         reader = dataimport.ucsvreader(stream, skipfirst=True,
       
   123                                        ignore_errors=False)
       
   124         self.assertEqual(list(reader),
       
   125                          [[u'1', u'2', u'3', u'4', u'']])
       
   126 
       
   127         stream.seek(0)
       
   128         reader = dataimport.ucsvreader(stream, skipfirst=False,
       
   129                                        ignore_errors=True)
       
   130         self.assertEqual(list(reader),
       
   131                          [[u'a', u'b', u'c', u'd', u''],
       
   132                           [u'1', u'2', u'3', u'4', u'']])
       
   133 
       
   134         stream.seek(0)
       
   135         reader = dataimport.ucsvreader(stream, skipfirst=False,
       
   136                                        ignore_errors=False)
       
   137         self.assertEqual(list(reader),
       
   138                          [[u'a', u'b', u'c', u'd', u''],
       
   139                           [u'1', u'2', u'3', u'4', u'']])
       
   140 
       
   141 
       
   142 class MetaGeneratorTC(CubicWebTC):
       
   143 
       
   144     def test_dont_generate_relation_to_internal_manager(self):
       
   145         with self.admin_access.repo_cnx() as cnx:
       
   146             metagen = dataimport.MetaGenerator(cnx)
       
   147             self.assertIn('created_by', metagen.etype_rels)
       
   148             self.assertIn('owned_by', metagen.etype_rels)
       
   149         with self.repo.internal_cnx() as cnx:
       
   150             metagen = dataimport.MetaGenerator(cnx)
       
   151             self.assertNotIn('created_by', metagen.etype_rels)
       
   152             self.assertNotIn('owned_by', metagen.etype_rels)
       
   153 
       
   154     def test_dont_generate_specified_values(self):
       
   155         with self.admin_access.repo_cnx() as cnx:
       
   156             metagen = dataimport.MetaGenerator(cnx)
       
   157             # hijack gen_modification_date to ensure we don't go through it
       
   158             metagen.gen_modification_date = None
       
   159             md = DT.datetime.now() - DT.timedelta(days=1)
       
   160             entity, rels = metagen.base_etype_dicts('CWUser')
       
   161             entity.cw_edited.update(dict(modification_date=md))
       
   162             with cnx.ensure_cnx_set:
       
   163                 metagen.init_entity(entity)
       
   164             self.assertEqual(entity.cw_edited['modification_date'], md)
       
   165 
       
   166 
       
   167 if __name__ == '__main__':
       
   168     unittest_main()