|
1 # copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This program is free software: you can redistribute it and/or modify it under |
|
5 # the terms of the GNU Lesser General Public License as published by the Free |
|
6 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
7 # any later version. |
|
8 # |
|
9 # This program is distributed in the hope that it will be useful, but WITHOUT |
|
10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
11 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
12 # details. |
|
13 # |
|
14 # You should have received a copy of the GNU Lesser General Public License along |
|
15 # with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 """This module contains tools to programmatically import external data into CubicWeb. It's designed |
|
17 on top of the store concept to leverage possibility of code sharing across various data import |
|
18 needs. |
|
19 |
|
20 The following classes are defined: |
|
21 |
|
22 * :class:`ExtEntity`: some intermediate representation of data to import, using external identifier |
|
23 but no eid, |
|
24 |
|
25 * :class:`ExtEntitiesImporter`: class responsible for turning ExtEntity's extid to eid, and create |
|
26 or update CubicWeb entities accordingly (using a Store). |
|
27 |
|
28 What is left to do is to write a class or a function that will yield external entities from some |
|
29 data source (eg RDF, CSV) which will be case dependent (the *generator*). You may then plug |
|
30 arbitrary filters into the external entities stream between the generator and the importer, allowing |
|
31 to have some generic generators whose generated content is refined by specific filters. |
|
32 |
|
33 .. code-block:: python |
|
34 |
|
35 ext_entities = fetch(<source>) # function yielding external entities |
|
36 log = SimpleImportLog('<source file/url/whatever>') |
|
37 importer = ExtEntitiesImporter(schema, store, import_log=log) |
|
38 importer.import_entities(ext_entities) |
|
39 |
|
40 Here are the two classes that you'll have to deal with, and maybe to override: |
|
41 |
|
42 .. autoclass:: cubicweb.dataimport.importer.ExtEntitiesImporter |
|
43 .. autoclass:: cubicweb.dataimport.importer.ExtEntity |
|
44 """ |
|
45 |
|
46 from collections import defaultdict |
|
47 import logging |
|
48 |
|
49 from logilab.mtconverter import xml_escape |
|
50 |
|
51 |
|
def cwuri2eid(cnx, etypes, source_eid=None):
    """Return a dictionary mapping cwuri to eid for entities of the given entity types and / or
    source.

    :param cnx: object whose ``execute(rql, args)`` method returns an iterable of
      ``(cwuri, eid)`` rows
    :param etypes: entity type name(s) used to restrict the query: a single name, any iterable
      of names, or ``None`` / an empty iterable to select on the source only
    :param source_eid: optional eid of the source the entities must belong to
    :raises AssertionError: if neither entity types nor a source are specified
    """
    # normalise to a tuple so that None, a bare string, a set or a generator all work (a bare
    # string would otherwise be joined character by character)
    if etypes is None:
        etypes = ()
    elif isinstance(etypes, str):
        etypes = (etypes,)
    else:
        etypes = tuple(etypes)
    assert source_eid or etypes, 'no entity types nor source specified'
    rql = 'Any U, X WHERE X cwuri U'
    args = {}
    if len(etypes) == 1:
        rql += ', X is %s' % etypes[0]
    elif etypes:
        rql += ', X is IN (%s)' % ','.join(etypes)
    if source_eid is not None:
        rql += ', X cw_source S, S eid %(s)s'
        args['s'] = source_eid
    return dict(cnx.execute(rql, args))
|
67 |
|
68 |
|
class RelationMapping(object):
    """Read-only mapping from relation type to set of related (subject, object) eids.

    If `source` is specified, only returns relations implying entities from
    this source (both the subject and the object must belong to it).

    Each lookup executes a query through `cnx.execute()`; results are not cached.
    """

    def __init__(self, cnx, source=None):
        """Build the mapping.

        :param cnx: object providing the ``execute(rql, args)`` method used to run the queries
        :param source: optional object whose ``eid`` attribute identifies the source that both
          ends of the relations must belong to
        """
        self.cnx = cnx
        # `{}` is the placeholder for the relation type, filled in by __getitem__
        self._rql_template = 'Any S,O WHERE S {} O'
        self._kwargs = {}
        if source is not None:
            self._rql_template += ', S cw_source SO, O cw_source SO, SO eid %(s)s'
            self._kwargs['s'] = source.eid

    def __getitem__(self, rtype):
        """Return a set of (subject, object) eids already related by `rtype`"""
        rql = self._rql_template.format(rtype)
        # rows are converted to tuples so they can be stored in a set
        return {tuple(row) for row in self.cnx.execute(rql, self._kwargs)}
|
88 |
|
89 |
|
class ExtEntity(object):
    """Transitional representation of an entity for use in data importer.

    An external entity has the following properties:

    * ``extid`` (external id), an identifier for the ext entity,
    * ``etype`` (entity type), a string which must be the name of one entity type in the schema
      (eg. ``'Person'``, ``'Animal'``, ...),
    * ``values``, a dictionary whose keys are attribute or relation names from the schema (eg.
      ``'first_name'``, ``'friend'``), or ``(relation name, role)`` 2-uples, and whose values are
      *sets*

    For instance:

    .. code-block:: python

        ext_entity.extid = 'http://example.org/person/debby'
        ext_entity.etype = 'Person'
        ext_entity.values = {'first_name': set([u"Deborah", u"Debby"]),
                             'friend': set(['http://example.org/person/john'])}

    """

    def __init__(self, etype, extid, values=None):
        self.etype = etype
        self.extid = extid
        # a fresh dict per instance, never a shared default
        self.values = {} if values is None else values
        # schema given to prepare(); None until prepare() has been called
        self._schema = None

    def __repr__(self):
        return '<%s %s %s>' % (self.etype, self.extid, self.values)

    def iter_rdefs(self):
        """Yield (key, rtype, role) defined in `.values` dict, with:

        * `key` is the original key in `.values` (i.e. the relation type or a 2-uple (relation type,
          role))

        * `rtype` is a yams relation type, expected to be found in the schema (attribute or
          relation)

        * `role` is the role of the entity in the relation, 'subject' or 'object'

        Iteration is done on a copy of the keys so values may be inserted/deleted during it.
        """
        for key in list(self.values):
            if isinstance(key, tuple):
                rtype, role = key
                assert role in ('subject', 'object'), key
                yield key, rtype, role
            else:
                # a bare relation type means the entity is the subject of the relation
                yield key, key, 'subject'

    def prepare(self, schema):
        """Prepare an external entity for later insertion:

        * ensure attributes and inlined relations have a single value
        * turn set([value]) into value and remove key associated to empty set
        * remove non inlined relations and return them as a [(e1key, relation, e2key)] list

        Return a list of non inlined relations that may be inserted later, each relations defined by
        a 3-tuple (subject extid, relation type, object extid).

        This method must be called only once per external entity: an assertion fails on a second
        call.
        """
        assert self._schema is None, 'prepare() has already been called for %s' % self
        self._schema = schema
        eschema = schema.eschema(self.etype)
        deferred = []
        entity_dict = self.values
        for key, rtype, role in self.iter_rdefs():
            rschema = schema.rschema(rtype)
            if rschema.final or (rschema.inlined and role == 'subject'):
                assert len(entity_dict[key]) <= 1, \
                    "more than one value for %s: %s (%s)" % (rtype, entity_dict[key], self.extid)
                if entity_dict[key]:
                    # store the single value under the bare relation type
                    entity_dict[rtype] = entity_dict[key].pop()
                    if key != rtype:
                        del entity_dict[key]
                    # default the format metadata attribute when it is not given explicitly
                    if (rschema.final and eschema.has_metadata(rtype, 'format')
                            and rtype + '_format' not in entity_dict):
                        entity_dict[rtype + '_format'] = u'text/plain'
                else:
                    del entity_dict[key]
            else:
                # non inlined relation: taken out of the dict and handed back to the caller
                for target_extid in entity_dict.pop(key):
                    if role == 'subject':
                        deferred.append((self.extid, rtype, target_extid))
                    else:
                        deferred.append((target_extid, rtype, self.extid))
        return deferred

    def is_ready(self, extid2eid):
        """Return True if the ext entity is ready, i.e. has all the URIs used in inlined relations
        currently existing.

        When the entity is ready, the extids used as values of inlined relations in `.values` are
        replaced by the matching eids from `extid2eid`. `.values` is left untouched otherwise.
        """
        assert self._schema is not None, 'prepare() method should be called first on %s' % self
        # as .prepare has been called, we know that .values only contains subject relation *type* as
        # key (no more (rtype, role) tuple)
        schema = self._schema
        entity_dict = self.values
        for rtype in entity_dict:
            rschema = schema.rschema(rtype)
            if not rschema.final:
                # .prepare() should drop other cases from the entity dict
                assert rschema.inlined
                if entity_dict[rtype] not in extid2eid:
                    return False
        # entity is ready, replace all relation's extid by eids
        for rtype in entity_dict:
            rschema = schema.rschema(rtype)
            if rschema.inlined:
                entity_dict[rtype] = extid2eid[entity_dict[rtype]]
        return True
|
205 |
|
206 |
|
class ExtEntitiesImporter(object):
    """This class is responsible for importing externals entities, that is instances of
    :class:`ExtEntity`, into CubicWeb entities.

    Parameters:

    * `schema`: the CubicWeb's instance schema

    * `store`: a CubicWeb `Store`

    * `extid2eid`: optional {extid: eid} dictionary giving information on existing entities. It
      will be completed during import. You may want to use :func:`cwuri2eid` to build it.

    * `existing_relations`: optional {rtype: set((subj eid, obj eid))} mapping giving information on
      existing relations of a given type. You may want to use :class:`RelationMapping` to build it.

    * `etypes_order_hint`: optional ordered iterable on entity types, giving an hint on the order in
      which they should be attempted to be imported

    * `import_log`: optional object implementing the :class:`SimpleImportLog` interface to record
      events occuring during the import

    * `raise_on_error`: optional boolean flag - default to false, indicating whether errors should
      be raised or logged. You usually want them to be raised during test but to be logged in
      production.
    """

    def __init__(self, schema, store, extid2eid=None, existing_relations=None,
                 etypes_order_hint=(), import_log=None, raise_on_error=False):
        self.schema = schema
        self.store = store
        self.extid2eid = extid2eid if extid2eid is not None else {}
        self.existing_relations = (existing_relations if existing_relations is not None
                                   else defaultdict(set))
        self.etypes_order_hint = etypes_order_hint
        if import_log is None:
            import_log = SimpleImportLog('<unspecified>')
        self.import_log = import_log
        self.raise_on_error = raise_on_error
        # set of created/updated eids
        self.created = set()
        self.updated = set()

    def import_entities(self, ext_entities):
        """Import given external entities (:class:`ExtEntity`) stream (usually a generator)."""
        # {etype: [etype dict]} of entities that are in the import queue
        queue = {}
        # order entity dictionaries then create/update them
        deferred = self._import_entities(ext_entities, queue)
        # create deferred relations that don't exist already
        missing_relations = self.prepare_insert_deferred_relations(deferred)
        self._warn_about_missing_work(queue, missing_relations)

    def _import_entities(self, ext_entities, queue):
        """Create or update an entity for every ready external entity and return the
        {rtype: set((subject extid, object extid))} dictionary of relations left to insert.
        """
        extid2eid = self.extid2eid
        deferred = {}  # non inlined relations that may be deferred
        self.import_log.record_debug('importing entities')
        for ext_entity in self.iter_ext_entities(ext_entities, deferred, queue):
            try:
                eid = extid2eid[ext_entity.extid]
            except KeyError:
                self.prepare_insert_entity(ext_entity)
            else:
                # already known entity: only update it when there is something to set
                if ext_entity.values:
                    self.prepare_update_entity(ext_entity, eid)
        return deferred

    def iter_ext_entities(self, ext_entities, deferred, queue):
        """Yield external entities in an order which attempts to satisfy
        schema constraints (inlined / cardinality) and to optimize the import.

        `deferred` is filled with the non inlined relations returned by each entity's `prepare()`
        and `queue` with the entities that are not ready yet; entities that could never be yielded
        are left in `queue` once the generator is exhausted.
        """
        schema = self.schema
        extid2eid = self.extid2eid
        for ext_entity in ext_entities:
            # check data in the transitional representation and prepare it for
            # later insertion in the database
            for subject_uri, rtype, object_uri in ext_entity.prepare(schema):
                deferred.setdefault(rtype, set()).add((subject_uri, object_uri))
            if not ext_entity.is_ready(extid2eid):
                queue.setdefault(ext_entity.etype, []).append(ext_entity)
                continue
            yield ext_entity
        # entity types to retry: hinted types first, in hint order, then every other type still
        # waiting in the queue (they would otherwise never be retried)
        retry_order = list(self.etypes_order_hint)
        retry_order.extend(etype for etype in queue if etype not in retry_order)
        # check for some entities in the queue that may now be ready. We'll have to restart
        # search for ready entities until no one is generated
        new = True
        while new:
            new = False
            for etype in retry_order:
                if etype in queue:
                    new_queue = []
                    for ext_entity in queue[etype]:
                        if ext_entity.is_ready(extid2eid):
                            yield ext_entity
                            # may unlock entity previously handled within this loop
                            new = True
                        else:
                            new_queue.append(ext_entity)
                    if new_queue:
                        queue[etype][:] = new_queue
                    else:
                        del queue[etype]

    def prepare_insert_entity(self, ext_entity):
        """Call the store to prepare insertion of the given external entity and return the eid
        given by the store, which is also recorded in `extid2eid` and `created`.
        """
        eid = self.store.prepare_insert_entity(ext_entity.etype, **ext_entity.values)
        self.extid2eid[ext_entity.extid] = eid
        self.created.add(eid)
        return eid

    def prepare_update_entity(self, ext_entity, eid):
        """Call the store to prepare update of the given external entity"""
        self.store.prepare_update_entity(ext_entity.etype, eid, **ext_entity.values)
        self.updated.add(eid)

    def prepare_insert_deferred_relations(self, deferred):
        """Call the store to insert deferred relations (not handled during insertion/update for
        entities). Return a list of relations `[(subj ext id, rtype, obj ext id)]` that may not be
        inserted because the target entities don't exists yet.
        """
        prepare_insert_relation = self.store.prepare_insert_relation
        rschema = self.schema.rschema
        extid2eid = self.extid2eid
        missing_relations = []
        for rtype, relations in deferred.items():
            self.import_log.record_debug('importing %s %s relations' % (len(relations), rtype))
            symmetric = rschema(rtype).symmetric
            existing = self.existing_relations[rtype]
            for subject_uri, object_uri in relations:
                try:
                    subject_eid = extid2eid[subject_uri]
                    object_eid = extid2eid[object_uri]
                except KeyError:
                    missing_relations.append((subject_uri, rtype, object_uri))
                    continue
                if (subject_eid, object_eid) not in existing:
                    prepare_insert_relation(subject_eid, rtype, object_eid)
                    existing.add((subject_eid, object_eid))
                    if symmetric:
                        # a symmetric relation also holds the other way around
                        existing.add((object_eid, subject_eid))
        return missing_relations

    def _warn_about_missing_work(self, queue, missing_relations):
        """Record an error for each entity left in `queue` and each relation in
        `missing_relations`, then raise an `Exception` if `raise_on_error` is set.
        """
        error = self.import_log.record_error
        if queue:
            msgs = ["can't create some entities, is there some cycle or "
                    "missing data?"]
            for ext_entities in queue.values():
                for ext_entity in ext_entities:
                    msgs.append(str(ext_entity))
            # explicit loop: map() is lazy on Python 3 and would record nothing
            for msg in msgs:
                error(msg)
            if self.raise_on_error:
                raise Exception('\n'.join(msgs))
        if missing_relations:
            msgs = ["can't create some relations, is there missing data?"]
            for subject_uri, rtype, object_uri in missing_relations:
                msgs.append("%s %s %s" % (subject_uri, rtype, object_uri))
            for msg in msgs:
                error(msg)
            if self.raise_on_error:
                raise Exception('\n'.join(msgs))
|
366 |
|
367 |
|
class SimpleImportLog(object):
    """Fake CWDataImport log using a simple text format.

    Useful to display logs in the UI instead of storing them to the
    database.

    Records are accumulated in the `logs` list, one tab-separated string per recorded message:
    severity, filename, line and message.
    """

    def __init__(self, filename):
        # formatted records, in recording order
        self.logs = []
        self.filename = filename

    def record_debug(self, msg, path=None, line=None):
        """Record `msg` with the DEBUG severity."""
        self._log(logging.DEBUG, msg, path, line)

    def record_info(self, msg, path=None, line=None):
        """Record `msg` with the INFO severity."""
        self._log(logging.INFO, msg, path, line)

    def record_warning(self, msg, path=None, line=None):
        """Record `msg` with the WARNING severity."""
        self._log(logging.WARNING, msg, path, line)

    def record_error(self, msg, path=None, line=None):
        """Record `msg` with the ERROR severity."""
        self._log(logging.ERROR, msg, path, line)

    def record_fatal(self, msg, path=None, line=None):
        """Record `msg` with the FATAL severity."""
        self._log(logging.FATAL, msg, path, line)

    def _log(self, severity, msg, path, line):
        """Format a record and append it to `logs`.

        `path` is accepted but not part of the record: the log's `filename` is used instead. A
        false `line` (e.g. None) is rendered as an empty field.
        """
        encodedmsg = u'%s\t%s\t%s\t%s' % (severity, self.filename,
                                          line or u'', msg)
        self.logs.append(encodedmsg)
|
398 |
|
399 |
|
class HTMLImportLog(SimpleImportLog):
    """Fake CWDataImport log using a simple HTML format.

    Same as :class:`SimpleImportLog` except that the filename and the messages go through
    `xml_escape()` and that each record ends with a ``<br/>`` tag.
    """

    def __init__(self, filename):
        # escaped once here since the filename is part of every record
        super(HTMLImportLog, self).__init__(xml_escape(filename))

    def _log(self, severity, msg, path, line):
        """Format a record, escaping `msg`, and append it to `logs`.

        `path` is accepted but not part of the record. A false `line` (e.g. None) is rendered as
        an empty field.
        """
        encodedmsg = u'%s\t%s\t%s\t%s<br/>' % (severity, self.filename,
                                               line or u'', xml_escape(msg))
        self.logs.append(encodedmsg)