|
1 # copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """Old and deprecated dataimport API that provides tools to import tabular data. |
|
19 |
|
20 |
|
21 Example of use (run this with `cubicweb-ctl shell instance import-script.py`): |
|
22 |
|
23 .. sourcecode:: python |
|
24 |
|
25 from cubicweb.dataimport import * |
|
26 # define data generators |
|
27 GENERATORS = [] |
|
28 |
|
29 USERS = [('Prenom', 'firstname', ()), |
|
30 ('Nom', 'surname', ()), |
|
31 ('Identifiant', 'login', ()), |
|
32 ] |
|
33 |
|
34 def gen_users(ctl): |
|
35 for row in ctl.iter_and_commit('utilisateurs'): |
|
36 entity = mk_entity(row, USERS) |
|
37 entity['upassword'] = 'motdepasse' |
|
38 ctl.check('login', entity['login'], None) |
|
39 entity = ctl.store.prepare_insert_entity('CWUser', **entity) |
|
40 email = ctl.store.prepare_insert_entity('EmailAddress', address=row['email']) |
|
41 ctl.store.prepare_insert_relation(entity, 'use_email', email) |
|
42 ctl.store.rql('SET U in_group G WHERE G name "users", U eid %(x)s', {'x': entity}) |
|
43 |
|
44 CHK = [('login', check_doubles, 'Utilisateurs Login', |
|
45 'Deux utilisateurs ne devraient pas avoir le meme login.'), |
|
46 ] |
|
47 |
|
48 GENERATORS.append( (gen_users, CHK) ) |
|
49 |
|
50 # create controller |
|
51 ctl = CWImportController(RQLObjectStore(cnx)) |
|
52 ctl.askerror = 1 |
|
53 ctl.generators = GENERATORS |
|
54 ctl.data['utilisateurs'] = lazytable(ucsvreader(open('users.csv'))) |
|
55 # run |
|
56 ctl.run() |
|
57 |
|
.. BUG files with a single column cannot be parsed
|
59 .. TODO rollback() invocation is not possible yet |
|
60 """ |
|
61 from __future__ import print_function |
|
62 |
|
63 import sys |
|
64 import traceback |
|
65 from io import StringIO |
|
66 |
|
67 from six import add_metaclass |
|
68 |
|
69 from logilab.common import attrdict, shellutils |
|
70 from logilab.common.date import strptime |
|
71 from logilab.common.deprecation import deprecated, class_deprecated |
|
72 |
|
73 from cubicweb import QueryError |
|
74 from cubicweb.dataimport import callfunc_every |
|
75 |
|
76 |
|
@deprecated('[3.21] deprecated')
def lazytable(reader):
    """Iterate over a table whose first row is its header.

    The first row produced by ``reader`` is used as the list of column names;
    every following row is turned into a dict mapping those names to the row's
    values. As with :func:`zip`, values beyond the header length are dropped
    and columns missing from a short row are absent from its dict.

    ``reader`` may be any iterable of rows; an empty one yields nothing.

    >>> data = lazytable(ucsvreader(open(filename)))
    """
    rows = iter(reader)
    try:
        header = next(rows)
    except StopIteration:
        # an explicit return is required: since PEP 479 a StopIteration
        # escaping a generator body is turned into a RuntimeError
        return
    for row in rows:
        yield dict(zip(header, row))
|
87 |
|
88 |
|
@deprecated('[3.21] deprecated')
def lazydbtable(cu, table, headers, orderby=None):
    """Lazily iterate over the rows of an SQL table.

    Select the ``headers`` columns of ``table`` (optionally ordered by the
    ``orderby`` columns) through the cursor ``cu`` and yield each fetched row
    as a dict mapping column names to values. The query only runs when the
    first row is requested.

    >>> data = lazydbtable(cu, 'experimentation', ('id', 'nickname', 'gps'))
    """
    # table and column names are interpolated verbatim (identifiers cannot be
    # bound as query parameters), so they must come from trusted code
    columns = ','.join(headers)
    query = 'SELECT %s FROM %s' % (columns, table,)
    if orderby:
        query += ' ORDER BY %s' % ','.join(orderby)
    cu.execute(query)
    # fetchone() returns None once the result set is exhausted
    for record in iter(cu.fetchone, None):
        yield dict(zip(headers, record))
|
105 |
|
106 |
|
@deprecated('[3.21] deprecated')
def tell(msg):
    """Print ``msg`` on standard output (default reporting callback)."""
    print(msg, file=sys.stdout)
|
110 |
|
111 |
|
@deprecated('[3.21] deprecated')
def confirm(question):
    """Ask ``question`` with yes/no/abort choices, 'Y' being the default.

    Return True when the answer is 'Y' and False otherwise. Exit the process
    with status 1 when the user picks 'abort'.
    """
    choices = ('Y', 'n', 'abort')
    choice = shellutils.ASK.ask(question, choices, 'Y')
    if choice != 'abort':
        return choice == 'Y'
    sys.exit(1)
|
119 |
|
120 |
|
@add_metaclass(class_deprecated)
class catch_error(object):
    """Context manager recording exceptions on an import controller.

    When the ``with`` body raises (other than KeyboardInterrupt/SystemExit)
    and ``ctl.catcherrors`` is true, the exception is recorded through
    ``ctl.record_error(key, msg, ...)`` and swallowed. Otherwise it
    propagates.
    """
    __deprecation_warning__ = '[3.21] deprecated'

    def __init__(self, ctl, key='unexpected error', msg=None):
        self.ctl = ctl
        self.key = key
        # optional message stored alongside the traceback by record_error
        self.msg = msg

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        if exc_type is None:
            return None
        if issubclass(exc_type, (KeyboardInterrupt, SystemExit)):
            return None  # re-raise: never swallow interrupt/exit requests
        if self.ctl.catcherrors:
            self.ctl.record_error(self.key, self.msg, exc_type, exc_value, tb)
            return True  # silent: the error has been recorded
        return None  # re-raise
|
141 |
|
@deprecated('[3.21] deprecated')
def mk_entity(row, map):
    """Return a dict made from sanitized mapped values.

    ``map`` is a list of ``(src, dest, funcs)`` triples: ``row[src]`` is
    passed through each function of ``funcs`` in turn and the final value is
    stored as ``res[dest]``. Source keys missing from ``row`` are skipped,
    and a chain stops as soon as one of its functions returns None.

    ValueError can be raised on unexpected values found in checkers; it is
    re-raised with the name of the offending source field.

    >>> row = {'myname': u'dupont'}
    >>> map = [('myname', u'name', (call_transform_method('title'),))]
    >>> mk_entity(row, map)
    {'name': u'Dupont'}
    >>> row = {'myname': u'dupont', 'optname': u''}
    >>> map = [('myname', u'name', (call_transform_method('title'),)),
    ...        ('optname', u'optname', (optional,))]
    >>> mk_entity(row, map)
    {'name': u'Dupont', 'optname': None}
    """
    res = {}
    assert isinstance(row, dict)
    assert isinstance(map, list)
    for src, dest, funcs in map:
        try:
            res[dest] = row[src]
        except KeyError:
            continue
        try:
            for func in funcs:
                res[dest] = func(res[dest])
                if res[dest] is None:
                    # e.g. `optional` on an empty value: skip remaining checks
                    break
        except ValueError as err:
            exc = ValueError('error with %r field: %s' % (src, err))
            # attach the failing checker's traceback to the new exception
            exc.__traceback__ = sys.exc_info()[-1]
            raise exc
    return res
|
176 |
|
177 |
|
178 # base sanitizing/coercing functions ########################################### |
|
179 |
|
@deprecated('[3.21] deprecated')
def optional(value):
    """Checker filtering out empty optional fields.

    If ``value`` is false (e.g. an empty string), return None, which stops
    the checkers chain in `mk_entity`. Otherwise return ``value`` unchanged.

    The usual use is to put ``optional`` first in a chain so that further
    checkers do not raise ValueError on empty values:

    >>> MAPPER = [(u'value', 'value', (optional, int))]
    >>> row = {'value': u''}
    >>> mk_entity(row, MAPPER)
    {'value': None}
    >>> row = {'value': u'100'}
    >>> mk_entity(row, MAPPER)
    {'value': 100}
    """
    return value or None
|
201 |
|
202 |
|
@deprecated('[3.21] deprecated')
def required(value):
    """Return ``value`` unchanged, or raise ValueError if it is false/empty.

    This checker usually comes last in a checkers chain.
    """
    if not value:
        raise ValueError("required")
    return value
|
212 |
|
213 |
|
@deprecated('[3.21] deprecated')
def todatetime(format='%d/%m/%Y'):
    """Build a coercer turning strings into `datetime.datetime` instances.

    The returned function parses its argument with ``strptime`` and the given
    ``format``. Chain it with `todate` or `totime` from `logilab.common.date`
    to get a `date`/`time` instance instead of a `datetime`.
    """
    def _parse_datetime(text):
        parsed = strptime(text, format)
        return parsed
    return _parse_datetime
|
225 |
|
226 |
|
@deprecated('[3.21] deprecated')
def call_transform_method(methodname, *args, **kwargs):
    """Build a coercer returning ``value.<methodname>(*args, **kwargs)``."""
    def _transform(value):
        method = getattr(value, methodname)
        return method(*args, **kwargs)
    return _transform
|
233 |
|
234 |
|
@deprecated('[3.21] deprecated')
def call_check_method(methodname, *args, **kwargs):
    """Build a checker validating ``value.<methodname>(*args, **kwargs)``.

    The checker returns its input unchanged when the method call result is
    true, and raises ValueError otherwise.
    """
    def _checker(value):
        if not getattr(value, methodname)(*args, **kwargs):
            raise ValueError('%s not verified on %r' % (methodname, value))
        return value
    return _checker
|
245 |
|
246 |
|
247 # base integrity checking functions ############################################ |
|
248 |
|
@deprecated('[3.21] deprecated')
def check_doubles(buckets):
    """Return ``(key, count)`` pairs for keys holding more than one item."""
    duplicates = []
    for key, items in buckets.items():
        count = len(items)
        if count > 1:
            duplicates.append((key, count))
    return duplicates
|
253 |
|
254 |
|
@deprecated('[3.21] deprecated')
def check_doubles_not_none(buckets):
    """Like `check_doubles`, but ignore the None key.

    Return ``(key, count)`` pairs for non-None keys holding more than one
    item.
    """
    duplicates = []
    for key, items in buckets.items():
        if key is None:
            continue
        if len(items) > 1:
            duplicates.append((key, len(items)))
    return duplicates
|
260 |
|
261 |
|
@add_metaclass(class_deprecated)
class ObjectStore(object):
    """Store objects in memory for *faster* validation (development mode).

    It does not enforce the constraints of the schema and hence will miss
    some problems.

    >>> store = ObjectStore()
    >>> user = store.prepare_insert_entity('CWUser', login=u'johndoe')
    >>> group = store.prepare_insert_entity('CWGroup', name=u'unknown')
    >>> store.prepare_insert_relation(user, 'in_group', group)
    (0, 'in_group', 1)
    """
    __deprecation_warning__ = '[3.21] use the new importer API'

    def __init__(self):
        # every inserted entity in insertion order; an entity's eid is its
        # position in this list
        self.items = []
        # eid -> entity attrdict
        self.eids = {}
        # entity type -> list of eids of that type
        self.types = {}
        # set of (eid_from, rtype, eid_to) triples
        self.relations = set()
        # index name -> key -> list of values (see CWImportController.index)
        self.indexes = {}

    def prepare_insert_entity(self, etype, **data):
        """Given an entity type, attributes and inlined relations, return an eid for the entity that
        would be inserted with a real store.
        """
        data = attrdict(data)
        data['eid'] = eid = len(self.items)
        self.items.append(data)
        self.eids[eid] = data
        self.types.setdefault(etype, []).append(eid)
        return eid

    def prepare_update_entity(self, etype, eid, **kwargs):
        """Given an entity type and eid, updates the corresponding fake entity with specified
        attributes and inlined relations.
        """
        # .get() so that an unknown etype fails this assertion with its
        # message rather than with a bare KeyError
        assert eid in self.types.get(etype, ()), 'Trying to update with wrong type %s' % etype
        self.eids[eid].update(kwargs)

    def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
        """Store into the `relations` attribute that a relation ``rtype`` exists between entities
        with eids ``eid_from`` and ``eid_to``, and return that ``(eid_from, rtype, eid_to)`` triple.
        Extra keyword arguments are ignored.
        """
        relation = eid_from, rtype, eid_to
        self.relations.add(relation)
        return relation

    def flush(self):
        """Nothing to flush for this store."""
        pass

    def commit(self):
        """Nothing to commit for this store; always return None."""
        return None

    def finish(self):
        """Nothing to do once import is terminated for this store."""
        pass

    @property
    def nb_inserted_entities(self):
        """Number of entities inserted so far."""
        return len(self.eids)

    @property
    def nb_inserted_types(self):
        """Number of distinct entity types inserted so far."""
        return len(self.types)

    @property
    def nb_inserted_relations(self):
        """Number of distinct relations inserted so far."""
        return len(self.relations)

    @deprecated('[3.21] use prepare_insert_entity instead')
    def create_entity(self, etype, **data):
        """Insert an entity and return a copy of it including its ``eid``."""
        eid = self.prepare_insert_entity(etype, **data)
        # copy the stored attrdict so that the result carries the eid
        # assigned by prepare_insert_entity without aliasing the store
        return attrdict(self.eids[eid])

    @deprecated('[3.21] use prepare_insert_relation instead')
    def relate(self, eid_from, rtype, eid_to, **kwargs):
        """Insert a relation and return the ``(eid_from, rtype, eid_to)`` triple."""
        return self.prepare_insert_relation(eid_from, rtype, eid_to, **kwargs)
|
341 |
|
342 |
|
@add_metaclass(class_deprecated)
class CWImportController(object):
    """Controller of the data import process.

    >>> ctl = CWImportController(store)
    >>> ctl.generators = list_of_data_generators
    >>> ctl.data = dict_of_data_tables
    >>> ctl.run()

    Each item of ``generators`` is a ``(function, checks)`` pair: the function
    is called with the controller, then every ``(key, checkfunc, title, help)``
    item of ``checks`` is applied to the values recorded with `check` under
    ``key``.
    """
    __deprecation_warning__ = '[3.21] use the new importer API'

    def __init__(self, store, askerror=0, catcherrors=None, tell=tell,
                 commitevery=50):
        self.store = store
        self.generators = None
        self.data = {}
        # reset by `run`; maps a key to either a list of formatted tracebacks
        # (see `record_error`) or a ``(help, offending_items)`` tuple built
        # from an integrity check result
        self.errors = None
        # 2: always display errors at the end of `run`; other true values:
        # ask first; false: never display them
        self.askerror = askerror
        if catcherrors is None:
            catcherrors = askerror
        self.catcherrors = catcherrors
        self.commitevery = commitevery  # set to None to do a single commit
        self._tell = tell

    def check(self, type, key, value):
        """Record ``value`` under ``key`` in the ``type`` bucket.

        The buckets are (re)created by `run` before each generator, so this is
        meant to be called from a generator while `run` executes.
        """
        self._checks.setdefault(type, {}).setdefault(key, []).append(value)

    def check_map(self, entity, key, map, default):
        """Replace ``entity[key]`` by ``map[entity[key]]``.

        Values missing from ``map`` are recorded with `check` under ``key``
        and replaced by ``default``.
        """
        try:
            entity[key] = map[entity[key]]
        except KeyError:
            self.check(key, entity[key], None)
            entity[key] = default

    def record_error(self, key, msg=None, type=None, value=None, tb=None):
        """Append a formatted traceback to the error log of ``key``.

        When ``type`` is None, the exception currently being handled is
        formatted; otherwise ``type``, ``value`` and ``tb`` are. With a
        ``msg``, a ``(msg, lines)`` pair is stored instead of the lines alone.
        """
        # format_* return plain strings, unlike printing into io.StringIO
        # which only accepts unicode on Python 2
        if type is None:
            text = traceback.format_exc()
        else:
            text = ''.join(traceback.format_exception(type, value, tb))
        lines = text.splitlines()
        # use a list to avoid counting <nb lines> errors instead of one
        errorlog = self.errors.setdefault(key, [])
        if msg is None:
            errorlog.append(lines)
        else:
            errorlog.append((msg, lines))

    def run(self):
        """Run every generator and its checks, commit, then report errors."""
        self.errors = {}
        if self.commitevery is None:
            self.tell('Will commit all or nothing.')
        else:
            self.tell('Will commit every %s iterations' % self.commitevery)
        for func, checks in self.generators or ():
            self._checks = {}
            func_name = func.__name__
            self.tell("Run import function '%s'..." % func_name)
            try:
                func(self)
            except Exception:
                if self.catcherrors:
                    self.record_error(func_name, 'While calling %s' % func_name)
                else:
                    self._print_stats()
                    raise
            self._run_checks(checks)
        try:
            txuuid = self.store.commit()
            if txuuid is not None:
                self.tell('Transaction commited (txuuid: %s)' % txuuid)
        except QueryError as ex:
            self.tell('Transaction aborted: %s' % ex)
        self._print_stats()
        if self.errors:
            if self.askerror == 2 or (self.askerror and confirm('Display errors ?')):
                self._display_errors()

    def _run_checks(self, checks):
        """Apply integrity ``checks`` to the values gathered through `check`."""
        for key, checkfunc, title, helpmsg in checks:
            buckets = self._checks.get(key)
            if buckets:
                err = checkfunc(buckets)
                if err:
                    self.errors[title] = (helpmsg, err)

    @staticmethod
    def _error_entries(error):
        """Return the individual errors held by one value of ``self.errors``."""
        if isinstance(error, tuple):
            # integrity check result: (help, offending_items)
            return error[1]
        # list filled by record_error
        return error

    def _display_errors(self):
        """Print every error group with its size and content."""
        from pprint import pformat
        for errkey, error in self.errors.items():
            if isinstance(error, tuple):
                title, entries = error
                try:
                    entries = sorted(entries)
                except TypeError:
                    # unorderable items (e.g. a None key next to strings on
                    # Python 3): keep them in their original order
                    pass
            else:
                title, entries = 'Recorded errors', error
            self.tell("\n%s (%s): %d\n" % (title, errkey, len(entries)))
            self.tell(pformat(entries))

    def _print_stats(self):
        """Report store counters and the number of collected errors."""
        nberrors = sum(len(self._error_entries(err))
                       for err in self.errors.values())
        self.tell('\nImport statistics: %i entities, %i types, %i relations and %i errors'
                  % (self.store.nb_inserted_entities,
                     self.store.nb_inserted_types,
                     self.store.nb_inserted_relations,
                     nberrors))

    def get_data(self, key):
        """Return the data table registered under ``key``, or None."""
        return self.data.get(key)

    def index(self, name, key, value, unique=False):
        """Append ``value`` to the ``key`` entry of the store index ``name``.

        If unique is set to True, only the first occurrence of ``value`` is
        kept: it is not appended again when already present.
        """
        if unique:
            try:
                if value in self.store.indexes[name][key]:
                    return
            except KeyError:
                # no such index/key yet: this is the first occurrence
                pass
        self.store.indexes.setdefault(name, {}).setdefault(key, []).append(value)

    def tell(self, msg):
        """Report ``msg`` through the ``tell`` callback given at creation."""
        self._tell(msg)

    def iter_and_commit(self, datakey):
        """Iterate over the rows of ``self.data[datakey]``.

        Unless ``commitevery`` is None, the rows are wrapped with
        ``callfunc_every(self.store.commit, self.commitevery, ...)`` so that
        the store is committed every ``commitevery`` iterations.
        """
        if self.commitevery is None:
            return self.get_data(datakey)
        else:
            return callfunc_every(self.store.commit,
                                  self.commitevery,
                                  self.get_data(datakey))
463 self.get_data(datakey)) |