|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 import datetime as DT |
|
4 from StringIO import StringIO |
|
5 |
|
6 from logilab.common.testlib import TestCase, unittest_main |
|
7 |
|
8 from cubicweb import dataimport |
|
9 from cubicweb.devtools.testlib import CubicWebTC |
|
10 |
|
11 |
|
12 class RQLObjectStoreTC(CubicWebTC): |
|
13 |
|
14 def test_all(self): |
|
15 with self.admin_access.repo_cnx() as cnx: |
|
16 store = dataimport.RQLObjectStore(cnx) |
|
17 group_eid = store.create_entity('CWGroup', name=u'grp').eid |
|
18 user_eid = store.create_entity('CWUser', login=u'lgn', upassword=u'pwd').eid |
|
19 store.relate(user_eid, 'in_group', group_eid) |
|
20 cnx.commit() |
|
21 |
|
22 with self.admin_access.repo_cnx() as cnx: |
|
23 users = cnx.execute('CWUser X WHERE X login "lgn"') |
|
24 self.assertEqual(1, len(users)) |
|
25 self.assertEqual(user_eid, users.one().eid) |
|
26 groups = cnx.execute('CWGroup X WHERE U in_group X, U login "lgn"') |
|
27 self.assertEqual(1, len(users)) |
|
28 self.assertEqual(group_eid, groups.one().eid) |
|
29 |
|
30 |
|
31 class CreateCopyFromBufferTC(TestCase): |
|
32 |
|
33 # test converters |
|
34 |
|
35 def test_convert_none(self): |
|
36 cnvt = dataimport._copyfrom_buffer_convert_None |
|
37 self.assertEqual('NULL', cnvt(None)) |
|
38 |
|
39 def test_convert_number(self): |
|
40 cnvt = dataimport._copyfrom_buffer_convert_number |
|
41 self.assertEqual('42', cnvt(42)) |
|
42 self.assertEqual('42', cnvt(42L)) |
|
43 self.assertEqual('42.42', cnvt(42.42)) |
|
44 |
|
45 def test_convert_string(self): |
|
46 cnvt = dataimport._copyfrom_buffer_convert_string |
|
47 # simple |
|
48 self.assertEqual('babar', cnvt('babar')) |
|
49 # unicode |
|
50 self.assertEqual('\xc3\xa9l\xc3\xa9phant', cnvt(u'éléphant')) |
|
51 self.assertEqual('\xe9l\xe9phant', cnvt(u'éléphant', encoding='latin1')) |
|
52 # escaping |
|
53 self.assertEqual('babar\\tceleste\\n', cnvt('babar\tceleste\n')) |
|
54 self.assertEqual(r'C:\\new\tC:\\test', cnvt('C:\\new\tC:\\test')) |
|
55 |
|
56 def test_convert_date(self): |
|
57 cnvt = dataimport._copyfrom_buffer_convert_date |
|
58 self.assertEqual('0666-01-13', cnvt(DT.date(666, 1, 13))) |
|
59 |
|
60 def test_convert_time(self): |
|
61 cnvt = dataimport._copyfrom_buffer_convert_time |
|
62 self.assertEqual('06:06:06.000100', cnvt(DT.time(6, 6, 6, 100))) |
|
63 |
|
64 def test_convert_datetime(self): |
|
65 cnvt = dataimport._copyfrom_buffer_convert_datetime |
|
66 self.assertEqual('0666-06-13 06:06:06.000000', cnvt(DT.datetime(666, 6, 13, 6, 6, 6))) |
|
67 |
|
68 # test buffer |
|
69 def test_create_copyfrom_buffer_tuple(self): |
|
70 cnvt = dataimport._create_copyfrom_buffer |
|
71 data = ((42, 42L, 42.42, u'éléphant', DT.date(666, 1, 13), DT.time(6, 6, 6), DT.datetime(666, 6, 13, 6, 6, 6)), |
|
72 (6, 6L, 6.6, u'babar', DT.date(2014, 1, 14), DT.time(4, 2, 1), DT.datetime(2014, 1, 1, 0, 0, 0))) |
|
73 results = dataimport._create_copyfrom_buffer(data) |
|
74 # all columns |
|
75 expected = '''42\t42\t42.42\téléphant\t0666-01-13\t06:06:06.000000\t0666-06-13 06:06:06.000000 |
|
76 6\t6\t6.6\tbabar\t2014-01-14\t04:02:01.000000\t2014-01-01 00:00:00.000000''' |
|
77 self.assertMultiLineEqual(expected, results.getvalue()) |
|
78 # selected columns |
|
79 results = dataimport._create_copyfrom_buffer(data, columns=(1, 3, 6)) |
|
80 expected = '''42\téléphant\t0666-06-13 06:06:06.000000 |
|
81 6\tbabar\t2014-01-01 00:00:00.000000''' |
|
82 self.assertMultiLineEqual(expected, results.getvalue()) |
|
83 |
|
84 def test_create_copyfrom_buffer_dict(self): |
|
85 cnvt = dataimport._create_copyfrom_buffer |
|
86 data = (dict(integer=42, double=42.42, text=u'éléphant', date=DT.datetime(666, 6, 13, 6, 6, 6)), |
|
87 dict(integer=6, double=6.6, text=u'babar', date=DT.datetime(2014, 1, 1, 0, 0, 0))) |
|
88 results = dataimport._create_copyfrom_buffer(data, ('integer', 'text')) |
|
89 expected = '''42\téléphant\n6\tbabar''' |
|
90 self.assertMultiLineEqual(expected, results.getvalue()) |
|
91 |
|
92 |
|
93 class UcsvreaderTC(TestCase): |
|
94 |
|
95 def test_empty_lines_skipped(self): |
|
96 stream = StringIO('''a,b,c,d, |
|
97 1,2,3,4, |
|
98 ,,,, |
|
99 ,,,, |
|
100 ''') |
|
101 self.assertEqual([[u'a', u'b', u'c', u'd', u''], |
|
102 [u'1', u'2', u'3', u'4', u''], |
|
103 ], |
|
104 list(dataimport.ucsvreader(stream))) |
|
105 stream.seek(0) |
|
106 self.assertEqual([[u'a', u'b', u'c', u'd', u''], |
|
107 [u'1', u'2', u'3', u'4', u''], |
|
108 [u'', u'', u'', u'', u''], |
|
109 [u'', u'', u'', u'', u''] |
|
110 ], |
|
111 list(dataimport.ucsvreader(stream, skip_empty=False))) |
|
112 |
|
113 def test_skip_first(self): |
|
114 stream = StringIO('a,b,c,d,\n' |
|
115 '1,2,3,4,\n') |
|
116 reader = dataimport.ucsvreader(stream, skipfirst=True, |
|
117 ignore_errors=True) |
|
118 self.assertEqual(list(reader), |
|
119 [[u'1', u'2', u'3', u'4', u'']]) |
|
120 |
|
121 stream.seek(0) |
|
122 reader = dataimport.ucsvreader(stream, skipfirst=True, |
|
123 ignore_errors=False) |
|
124 self.assertEqual(list(reader), |
|
125 [[u'1', u'2', u'3', u'4', u'']]) |
|
126 |
|
127 stream.seek(0) |
|
128 reader = dataimport.ucsvreader(stream, skipfirst=False, |
|
129 ignore_errors=True) |
|
130 self.assertEqual(list(reader), |
|
131 [[u'a', u'b', u'c', u'd', u''], |
|
132 [u'1', u'2', u'3', u'4', u'']]) |
|
133 |
|
134 stream.seek(0) |
|
135 reader = dataimport.ucsvreader(stream, skipfirst=False, |
|
136 ignore_errors=False) |
|
137 self.assertEqual(list(reader), |
|
138 [[u'a', u'b', u'c', u'd', u''], |
|
139 [u'1', u'2', u'3', u'4', u'']]) |
|
140 |
|
141 |
|
142 class MetaGeneratorTC(CubicWebTC): |
|
143 |
|
144 def test_dont_generate_relation_to_internal_manager(self): |
|
145 with self.admin_access.repo_cnx() as cnx: |
|
146 metagen = dataimport.MetaGenerator(cnx) |
|
147 self.assertIn('created_by', metagen.etype_rels) |
|
148 self.assertIn('owned_by', metagen.etype_rels) |
|
149 with self.repo.internal_cnx() as cnx: |
|
150 metagen = dataimport.MetaGenerator(cnx) |
|
151 self.assertNotIn('created_by', metagen.etype_rels) |
|
152 self.assertNotIn('owned_by', metagen.etype_rels) |
|
153 |
|
154 def test_dont_generate_specified_values(self): |
|
155 with self.admin_access.repo_cnx() as cnx: |
|
156 metagen = dataimport.MetaGenerator(cnx) |
|
157 # hijack gen_modification_date to ensure we don't go through it |
|
158 metagen.gen_modification_date = None |
|
159 md = DT.datetime.now() - DT.timedelta(days=1) |
|
160 entity, rels = metagen.base_etype_dicts('CWUser') |
|
161 entity.cw_edited.update(dict(modification_date=md)) |
|
162 with cnx.ensure_cnx_set: |
|
163 metagen.init_entity(entity) |
|
164 self.assertEqual(entity.cw_edited['modification_date'], md) |
|
165 |
|
166 |
|
167 if __name__ == '__main__': |
|
168 unittest_main() |