1 # copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This program is free software: you can redistribute it and/or modify it under |
|
5 # the terms of the GNU Lesser General Public License as published by the Free |
|
6 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
7 # any later version. |
|
8 # |
|
9 # This program is distributed in the hope that it will be useful, but WITHOUT |
|
10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
11 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
12 # details. |
|
13 # |
|
14 # You should have received a copy of the GNU Lesser General Public License along |
|
15 # with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 """Data import of external entities. |
|
17 |
|
18 Main entry points: |
|
19 |
|
20 .. autoclass:: ExtEntitiesImporter |
|
21 .. autoclass:: ExtEntity |
|
22 |
|
23 Utilities: |
|
24 |
|
25 .. autofunction:: cwuri2eid |
|
26 .. autoclass:: RelationMapping |
|
27 .. autofunction:: cubicweb.dataimport.importer.use_extid_as_cwuri |
|
28 """ |
|
29 |
|
30 from collections import defaultdict |
|
31 import logging |
|
32 |
|
33 from logilab.mtconverter import xml_escape |
|
34 |
|
35 |
|
def cwuri2eid(cnx, etypes, source_eid=None):
    """Return a dictionary mapping cwuri to eid for entities of the given entity types and / or
    source.

    :param cnx: connection object whose ``execute(rql, args)`` method runs the RQL query
    :param etypes: iterable of entity type names restricting the query; may be empty or ``None``
      when `source_eid` is given
    :param source_eid: optional source eid; when given, only entities whose ``cw_source`` is this
      source are considered
    :return: a ``{cwuri: eid}`` dictionary built from the query result rows
    """
    # accept any iterable (list, tuple, set, generator...) as well as None, so that the
    # "source only" case documented above does not crash on len(None) / etypes[0]
    etypes = list(etypes or ())
    assert source_eid or etypes, 'no entity types nor source specified'
    rql = 'Any U, X WHERE X cwuri U'
    args = {}
    if len(etypes) == 1:
        rql += ', X is %s' % etypes[0]
    elif etypes:
        rql += ', X is IN (%s)' % ','.join(etypes)
    if source_eid is not None:
        rql += ', X cw_source S, S eid %(s)s'
        args['s'] = source_eid
    return dict(cnx.execute(rql, args))
|
51 |
|
52 |
|
def use_extid_as_cwuri(extid2eid):
    """Return a filter function over a stream of :class:`ExtEntity` objects that sets `cwuri`
    using the entity's extid if the entity does not exist yet and has no `cwuri` defined.

    `extid2eid` is an extid to eid dictionary coming from an
    :class:`ExtEntitiesImporter` instance.

    The returned function takes an iterable of external entities and lazily yields each of them
    back, possibly with a ``'cwuri'`` value added. Extids given as bytes are decoded as UTF-8;
    text extids are used as is.

    Example usage:

    .. code-block:: python

        importer = SKOSExtEntitiesImporter(cnx, store, import_log)
        set_cwuri = use_extid_as_cwuri(importer.extid2eid)
        importer.import_entities(set_cwuri(extentities))
    """
    def use_extid_as_cwuri_filter(extentities):
        for extentity in extentities:
            # only entities about to be created, and without an explicit cwuri, are touched
            if extentity.extid not in extid2eid and 'cwuri' not in extentity.values:
                extid = extentity.extid
                # extids may come as encoded bytes or as text: only decode the former
                if isinstance(extid, bytes):
                    extid = extid.decode('utf-8')
                extentity.values['cwuri'] = set([extid])
            yield extentity
    return use_extid_as_cwuri_filter
|
75 |
|
76 |
|
class RelationMapping(object):
    """Read-only mapping giving, for a relation type, the set of (subject eid, object eid) pairs
    already related by it.

    Each lookup runs a fresh RQL query through `cnx`. When a `source` is given, only pairs whose
    subject and object both belong to that source are returned.
    """

    def __init__(self, cnx, source=None):
        self.cnx = cnx
        template = 'Any S,O WHERE S %s O'
        query_args = {}
        if source is not None:
            # '%%' survives the later "% rtype" formatting as a single '%' placeholder
            template += ', S cw_source SO, O cw_source SO, SO eid %%(s)s'
            query_args['s'] = source.eid
        self._rql_template = template
        self._kwargs = query_args

    def __getitem__(self, rtype):
        """Return a set of (subject, object) eids already related by `rtype`"""
        rows = self.cnx.execute(self._rql_template % rtype, self._kwargs)
        return {tuple(row) for row in rows}
|
96 |
|
97 |
|
class ExtEntity(object):
    """Transitional representation of an entity for use in data importer.

    An external entity has the following properties:

    * ``extid`` (external id), an identifier for the ext entity,

    * ``etype`` (entity type), a string which must be the name of one entity type in the schema
      (eg. ``'Person'``, ``'Animal'``, ...),

    * ``values``, a dictionary whose keys are attribute or relation names from the schema (eg.
      ``'first_name'``, ``'friend'``), and whose values are *sets*. A key may also be a 2-tuple
      ``(relation type, role)`` with role ``'subject'`` or ``'object'``, e.g. to give relations
      where this entity is the object.

    For instance:

    .. code-block:: python

        ext_entity.extid = 'http://example.org/person/debby'
        ext_entity.etype = 'Person'
        ext_entity.values = {'first_name': set([u"Deborah", u"Debby"]),
                            'friend': set(['http://example.org/person/john'])}

    """

    def __init__(self, etype, extid, values=None):
        self.etype = etype
        self.extid = extid
        if values is None:
            values = {}
        self.values = values
        # schema given to prepare(); stays None until prepare() has been called
        self._schema = None

    def __repr__(self):
        return '<%s %s %s>' % (self.etype, self.extid, self.values)

    def iter_rdefs(self):
        """Yield (key, rtype, role) defined in `.values` dict, with:

        * `key` is the original key in `.values` (i.e. the relation type or a 2-uple (relation type,
          role))

        * `rtype` is a yams relation type, expected to be found in the schema (attribute or
          relation)

        * `role` is the role of the entity in the relation, 'subject' or 'object'

        Iteration is done on a copy of the keys so values may be inserted/deleted during it.
        """
        for key in list(self.values):
            if isinstance(key, tuple):
                rtype, role = key
                assert role in ('subject', 'object'), key
                yield key, rtype, role
            else:
                yield key, key, 'subject'

    def prepare(self, schema):
        """Prepare an external entity for later insertion:

        * ensure attributes and inlined relations have a single value
        * turn set([value]) into value and remove key associated to empty set
        * remove non inlined relations and return them as a [(e1key, relation, e2key)] list
        * set ``<attribute>_format`` to ``u'text/plain'`` for attributes having a ``format``
          metadata, when no format value was given

        Return a list of non inlined relations that may be inserted later, each relation defined
        by a 3-tuple (subject extid, relation type, object extid).

        `.values` is modified in place, so this method must be called only once per instance: a
        second call fails an assertion.
        """
        assert self._schema is None, 'prepare() has already been called for %s' % self
        self._schema = schema
        eschema = schema.eschema(self.etype)
        deferred = []
        entity_dict = self.values
        for key, rtype, role in self.iter_rdefs():
            rschema = schema.rschema(rtype)
            if rschema.final or (rschema.inlined and role == 'subject'):
                # single valued: attribute, or inlined relation where this entity is the subject
                assert len(entity_dict[key]) <= 1, \
                    "more than one value for %s: %s (%s)" % (rtype, entity_dict[key], self.extid)
                if entity_dict[key]:
                    entity_dict[rtype] = entity_dict[key].pop()
                    if key != rtype:
                        # normalize a (rtype, role) tuple key into a plain rtype key
                        del entity_dict[key]
                    if (rschema.final and eschema.has_metadata(rtype, 'format')
                            and rtype + '_format' not in entity_dict):
                        entity_dict[rtype + '_format'] = u'text/plain'
                else:
                    del entity_dict[key]
            else:
                # non inlined relation (or inlined one seen from the object side): returned to
                # the caller so it can be created once both ends exist
                for target_extid in entity_dict.pop(key):
                    if role == 'subject':
                        deferred.append((self.extid, rtype, target_extid))
                    else:
                        deferred.append((target_extid, rtype, self.extid))
        return deferred

    def is_ready(self, extid2eid):
        """Return True if the ext entity is ready, i.e. has all the URIs used in inlined relations
        currently existing.

        When ready, the extids of inlined relations in `.values` are replaced in place by the
        matching eids from `extid2eid`; when not ready, `.values` is left untouched.
        """
        assert self._schema is not None, 'prepare() method should be called first on %s' % self
        # as .prepare has been called, we know that .values only contains subject relation *type* as
        # key (no more (rtype, role) tuple)
        schema = self._schema
        entity_dict = self.values
        inlined_rtypes = []
        for rtype in entity_dict:
            rschema = schema.rschema(rtype)
            if not rschema.final:
                # .prepare() should drop other cases from the entity dict
                assert rschema.inlined
                if entity_dict[rtype] not in extid2eid:
                    return False
            if rschema.inlined:
                inlined_rtypes.append(rtype)
        # entity is ready, replace all relation's extid by eids
        for rtype in inlined_rtypes:
            entity_dict[rtype] = extid2eid[entity_dict[rtype]]
        return True
|
215 |
|
216 |
|
class ExtEntitiesImporter(object):
    """This class is responsible for importing external entities, that is instances of
    :class:`ExtEntity`, into CubicWeb entities.

    :param schema: the CubicWeb's instance schema
    :param store: a CubicWeb `Store`
    :param extid2eid: optional {extid: eid} dictionary giving information on existing entities. It
        will be completed during import. You may want to use :func:`cwuri2eid` to build it.
    :param existing_relations: optional {rtype: set((subj eid, obj eid))} mapping giving information
        on existing relations of a given type. You may want to use :class:`RelationMapping` to build
        it. When a plain dict lacks an entry for some relation type, one is added to it.
    :param etypes_order_hint: optional ordered iterable on entity types, giving a hint on the order
        in which they should be attempted to be imported. Queued entities of types missing from it
        are retried after those of the listed types.
    :param import_log: optional object implementing the :class:`SimpleImportLog` interface to record
        events occurring during the import
    :param raise_on_error: optional boolean flag - default to false, indicating whether errors should
        be raised or logged. You usually want them to be raised during test but to be logged in
        production.

    Instances of this class are meant to import external entities through :meth:`import_entities`
    which handles a stream of :class:`ExtEntity`. One may then plug arbitrary filters into the
    external entities stream.

    .. automethod:: import_entities

    """

    def __init__(self, schema, store, extid2eid=None, existing_relations=None,
                 etypes_order_hint=(), import_log=None, raise_on_error=False):
        self.schema = schema
        self.store = store
        self.extid2eid = extid2eid if extid2eid is not None else {}
        self.existing_relations = (existing_relations if existing_relations is not None
                                   else defaultdict(set))
        self.etypes_order_hint = etypes_order_hint
        if import_log is None:
            import_log = SimpleImportLog('<unspecified>')
        self.import_log = import_log
        self.raise_on_error = raise_on_error
        # set of created/updated eids
        self.created = set()
        self.updated = set()

    def import_entities(self, ext_entities):
        """Import given external entities (:class:`ExtEntity`) stream (usually a generator).

        Entities that could not be created and relations that could not be inserted are reported
        through the import log (or raised, if `raise_on_error` is set).
        """
        # {etype: [etype dict]} of entities that are in the import queue
        queue = {}
        # order entity dictionaries then create/update them
        deferred = self._import_entities(ext_entities, queue)
        # create deferred relations that don't exist already
        missing_relations = self.prepare_insert_deferred_relations(deferred)
        self._warn_about_missing_work(queue, missing_relations)

    def _import_entities(self, ext_entities, queue):
        """Create or update every ready entity of the stream and return the deferred relations
        as a {rtype: set((subject extid, object extid))} dictionary."""
        extid2eid = self.extid2eid
        deferred = {}  # non inlined relations that may be deferred
        self.import_log.record_debug('importing entities')
        for ext_entity in self.iter_ext_entities(ext_entities, deferred, queue):
            try:
                eid = extid2eid[ext_entity.extid]
            except KeyError:
                self.prepare_insert_entity(ext_entity)
            else:
                if ext_entity.values:
                    self.prepare_update_entity(ext_entity, eid)
        return deferred

    def iter_ext_entities(self, ext_entities, deferred, queue):
        """Yield external entities in an order which attempts to satisfy
        schema constraints (inlined / cardinality) and to optimize the import.

        Non inlined relations returned by ``prepare()`` are accumulated into `deferred`;
        entities that are not ready yet are put into `queue` ({etype: [ext entity]}) and retried
        each time an entity has been yielded. This relies on lazy consumption: the consumer is
        expected to import each yielded entity (completing `extid2eid`) before asking for the
        next one. Entities still in `queue` at the end could not be imported.
        """
        schema = self.schema
        extid2eid = self.extid2eid
        # local copy of the hint: entity types missing from it get appended below so that their
        # queued entities are retried too (the hint itself may be an immutable iterable)
        order_hint = list(self.etypes_order_hint)
        for ext_entity in ext_entities:
            # check data in the transitional representation and prepare it for
            # later insertion in the database
            for subject_uri, rtype, object_uri in ext_entity.prepare(schema):
                deferred.setdefault(rtype, set()).add((subject_uri, object_uri))
            if not ext_entity.is_ready(extid2eid):
                queue.setdefault(ext_entity.etype, []).append(ext_entity)
                continue
            yield ext_entity
            if not queue:
                continue
            for etype in queue:
                if etype not in order_hint:
                    order_hint.append(etype)
            # check for some entities in the queue that may now be ready. We'll have to restart
            # search for ready entities until no one is generated
            new = True
            while new:
                new = False
                for etype in order_hint:
                    if etype in queue:
                        new_queue = []
                        for queued in queue[etype]:
                            if queued.is_ready(extid2eid):
                                yield queued
                                # may unlock entity previously handled within this loop
                                new = True
                            else:
                                new_queue.append(queued)
                        if new_queue:
                            queue[etype][:] = new_queue
                        else:
                            del queue[etype]

    def prepare_insert_entity(self, ext_entity):
        """Call the store to prepare insertion of the given external entity"""
        eid = self.store.prepare_insert_entity(ext_entity.etype, **ext_entity.values)
        self.extid2eid[ext_entity.extid] = eid
        self.created.add(eid)
        return eid

    def prepare_update_entity(self, ext_entity, eid):
        """Call the store to prepare update of the given external entity"""
        self.store.prepare_update_entity(ext_entity.etype, eid, **ext_entity.values)
        self.updated.add(eid)

    def prepare_insert_deferred_relations(self, deferred):
        """Call the store to insert deferred relations (not handled during insertion/update for
        entities). Return a list of relations `[(subj ext id, rtype, obj ext id)]` that could not
        be inserted because the target entities don't exist yet.
        """
        prepare_insert_relation = self.store.prepare_insert_relation
        rschema = self.schema.rschema
        extid2eid = self.extid2eid
        missing_relations = []
        for rtype, relations in deferred.items():
            self.import_log.record_debug('importing %s %s relations' % (len(relations), rtype))
            symmetric = rschema(rtype).symmetric
            try:
                existing = self.existing_relations[rtype]
            except KeyError:
                # plain dict given by the caller without an entry for this relation type
                existing = self.existing_relations[rtype] = set()
            for subject_uri, object_uri in relations:
                try:
                    subject_eid = extid2eid[subject_uri]
                    object_eid = extid2eid[object_uri]
                except KeyError:
                    missing_relations.append((subject_uri, rtype, object_uri))
                    continue
                if (subject_eid, object_eid) not in existing:
                    prepare_insert_relation(subject_eid, rtype, object_eid)
                    existing.add((subject_eid, object_eid))
                    if symmetric:
                        existing.add((object_eid, subject_eid))
        return missing_relations

    def _warn_about_missing_work(self, queue, missing_relations):
        """Report entities left in `queue` and relations of `missing_relations` as errors."""
        if queue:
            msgs = ["can't create some entities, is there some cycle or "
                    "missing data?"]
            for ext_entities in queue.values():
                msgs.extend(str(ext_entity) for ext_entity in ext_entities)
            self._report_errors(msgs)
        if missing_relations:
            msgs = ["can't create some relations, is there missing data?"]
            msgs.extend("%s %s %s" % (subject_uri, rtype, object_uri)
                        for subject_uri, rtype, object_uri in missing_relations)
            self._report_errors(msgs)

    def _report_errors(self, msgs):
        """Record each of `msgs` as an error, then raise them joined if `raise_on_error` is set."""
        # explicit loop rather than map(): map() is lazy on Python 3 and would record nothing
        record_error = self.import_log.record_error
        for msg in msgs:
            record_error(msg)
        if self.raise_on_error:
            raise Exception('\n'.join(msgs))
|
375 |
|
376 |
|
class SimpleImportLog(object):
    """Stand-in for a CWDataImport log that keeps plain-text records in memory.

    Records accumulate in the ``logs`` list, which makes them easy to display in the UI
    instead of storing them to the database.
    """

    def __init__(self, filename):
        self.filename = filename
        self.logs = []

    def _log(self, severity, msg, path, line):
        # one tab-separated record: numeric severity, file name, line (blank when falsy), message;
        # `path` is accepted for interface compatibility but not recorded
        line_field = line if line else u''
        record = u'%s\t%s\t%s\t%s' % (severity, self.filename, line_field, msg)
        self.logs.append(record)

    def record_debug(self, msg, path=None, line=None):
        self._log(logging.DEBUG, msg, path, line)

    def record_info(self, msg, path=None, line=None):
        self._log(logging.INFO, msg, path, line)

    def record_warning(self, msg, path=None, line=None):
        self._log(logging.WARNING, msg, path, line)

    def record_error(self, msg, path=None, line=None):
        self._log(logging.ERROR, msg, path, line)

    def record_fatal(self, msg, path=None, line=None):
        self._log(logging.FATAL, msg, path, line)
|
407 |
|
408 |
|
class HTMLImportLog(SimpleImportLog):
    """Fake CWDataImport log using a simple HTML format.

    The file name (at construction time) and every message are XML-escaped, and each record is
    terminated by a ``<br/>`` tag.
    """

    def __init__(self, filename):
        escaped_filename = xml_escape(filename)
        super(HTMLImportLog, self).__init__(escaped_filename)

    def _log(self, severity, msg, path, line):
        # tab-separated severity, file name, line (blank when falsy) and escaped message;
        # `path` is not recorded
        safe_msg = xml_escape(msg)
        if line:
            line_field = line
        else:
            line_field = u''
        self.logs.append(u'%s\t%s\t%s\t%s<br/>' % (severity, self.filename, line_field, safe_msg))
|