diff -r 058bb3dc685f -r 0b59724cb3f2 dataimport/importer.py --- a/dataimport/importer.py Mon Jan 04 18:40:30 2016 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,417 +0,0 @@ -# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr -- mailto:contact@logilab.fr -# -# This program is free software: you can redistribute it and/or modify it under -# the terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# This program is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with this program. If not, see . -"""Data import of external entities. - -Main entry points: - -.. autoclass:: ExtEntitiesImporter -.. autoclass:: ExtEntity - -Utilities: - -.. autofunction:: cwuri2eid -.. autoclass:: RelationMapping -.. autofunction:: cubicweb.dataimport.importer.use_extid_as_cwuri -""" - -from collections import defaultdict -import logging - -from logilab.mtconverter import xml_escape - - -def cwuri2eid(cnx, etypes, source_eid=None): - """Return a dictionary mapping cwuri to eid for entities of the given entity types and / or - source. - """ - assert source_eid or etypes, 'no entity types nor source specified' - rql = 'Any U, X WHERE X cwuri U' - args = {} - if len(etypes) == 1: - rql += ', X is %s' % etypes[0] - elif etypes: - rql += ', X is IN (%s)' % ','.join(etypes) - if source_eid is not None: - rql += ', X cw_source S, S eid %(s)s' - args['s'] = source_eid - return dict(cnx.execute(rql, args)) - - -def use_extid_as_cwuri(extid2eid): - """Return a generator of :class:`ExtEntity` objects that will set `cwuri` - using entity's extid if the entity does not exist yet and has no `cwuri` - defined. - - `extid2eid` is an extid to eid dictionary coming from an - :class:`ExtEntitiesImporter` instance. - - Example usage: - - .. code-block:: python - - importer = SKOSExtEntitiesImporter(cnx, store, import_log) - set_cwuri = use_extid_as_cwuri(importer.extid2eid) - importer.import_entities(set_cwuri(extentities)) - """ - def use_extid_as_cwuri_filter(extentities): - for extentity in extentities: - if extentity.extid not in extid2eid: - extentity.values.setdefault('cwuri', set([extentity.extid.decode('utf-8')])) - yield extentity - return use_extid_as_cwuri_filter - - -class RelationMapping(object): - """Read-only mapping from relation type to set of related (subject, object) eids. - - If `source` is specified, only returns relations implying entities from - this source. - """ - - def __init__(self, cnx, source=None): - self.cnx = cnx - self._rql_template = 'Any S,O WHERE S %s O' - self._kwargs = {} - if source is not None: - self._rql_template += ', S cw_source SO, O cw_source SO, SO eid %%(s)s' - self._kwargs['s'] = source.eid - - def __getitem__(self, rtype): - """Return a set of (subject, object) eids already related by `rtype`""" - rql = self._rql_template % rtype - return set(tuple(x) for x in self.cnx.execute(rql, self._kwargs)) - - -class ExtEntity(object): - """Transitional representation of an entity for use in data importer. - - An external entity has the following properties: - - * ``extid`` (external id), an identifier for the ext entity, - - * ``etype`` (entity type), a string which must be the name of one entity type in the schema - (eg. ``'Person'``, ``'Animal'``, ...), - - * ``values``, a dictionary whose keys are attribute or relation names from the schema (eg. - ``'first_name'``, ``'friend'``), and whose values are *sets* - - For instance: - - .. code-block:: python - - ext_entity.extid = 'http://example.org/person/debby' - ext_entity.etype = 'Person' - ext_entity.values = {'first_name': set([u"Deborah", u"Debby"]), - 'friend': set(['http://example.org/person/john'])} - - """ - - def __init__(self, etype, extid, values=None): - self.etype = etype - self.extid = extid - if values is None: - values = {} - self.values = values - self._schema = None - - def __repr__(self): - return '<%s %s %s>' % (self.etype, self.extid, self.values) - - def iter_rdefs(self): - """Yield (key, rtype, role) defined in `.values` dict, with: - - * `key` is the original key in `.values` (i.e. the relation type or a 2-uple (relation type, - role)) - - * `rtype` is a yams relation type, expected to be found in the schema (attribute or - relation) - - * `role` is the role of the entity in the relation, 'subject' or 'object' - - Iteration is done on a copy of the keys so values may be inserted/deleted during it. - """ - for key in list(self.values): - if isinstance(key, tuple): - rtype, role = key - assert role in ('subject', 'object'), key - yield key, rtype, role - else: - yield key, key, 'subject' - - def prepare(self, schema): - """Prepare an external entity for later insertion: - - * ensure attributes and inlined relations have a single value - * turn set([value]) into value and remove key associated to empty set - * remove non inlined relations and return them as a [(e1key, relation, e2key)] list - - Return a list of non inlined relations that may be inserted later, each relations defined by - a 3-tuple (subject extid, relation type, object extid). - - Take care the importer may call this method several times. - """ - assert self._schema is None, 'prepare() has already been called for %s' % self - self._schema = schema - eschema = schema.eschema(self.etype) - deferred = [] - entity_dict = self.values - for key, rtype, role in self.iter_rdefs(): - rschema = schema.rschema(rtype) - if rschema.final or (rschema.inlined and role == 'subject'): - assert len(entity_dict[key]) <= 1, \ - "more than one value for %s: %s (%s)" % (rtype, entity_dict[key], self.extid) - if entity_dict[key]: - entity_dict[rtype] = entity_dict[key].pop() - if key != rtype: - del entity_dict[key] - if (rschema.final and eschema.has_metadata(rtype, 'format') - and not rtype + '_format' in entity_dict): - entity_dict[rtype + '_format'] = u'text/plain' - else: - del entity_dict[key] - else: - for target_extid in entity_dict.pop(key): - if role == 'subject': - deferred.append((self.extid, rtype, target_extid)) - else: - deferred.append((target_extid, rtype, self.extid)) - return deferred - - def is_ready(self, extid2eid): - """Return True if the ext entity is ready, i.e. has all the URIs used in inlined relations - currently existing. - """ - assert self._schema, 'prepare() method should be called first on %s' % self - # as .prepare has been called, we know that .values only contains subject relation *type* as - # key (no more (rtype, role) tuple) - schema = self._schema - entity_dict = self.values - for rtype in entity_dict: - rschema = schema.rschema(rtype) - if not rschema.final: - # .prepare() should drop other cases from the entity dict - assert rschema.inlined - if not entity_dict[rtype] in extid2eid: - return False - # entity is ready, replace all relation's extid by eids - for rtype in entity_dict: - rschema = schema.rschema(rtype) - if rschema.inlined: - entity_dict[rtype] = extid2eid[entity_dict[rtype]] - return True - - -class ExtEntitiesImporter(object): - """This class is responsible for importing externals entities, that is instances of - :class:`ExtEntity`, into CubicWeb entities. - - :param schema: the CubicWeb's instance schema - :param store: a CubicWeb `Store` - :param extid2eid: optional {extid: eid} dictionary giving information on existing entities. It - will be completed during import. You may want to use :func:`cwuri2eid` to build it. - :param existing_relation: optional {rtype: set((subj eid, obj eid))} mapping giving information on - existing relations of a given type. You may want to use :class:`RelationMapping` to build it. - :param etypes_order_hint: optional ordered iterable on entity types, giving an hint on the order in - which they should be attempted to be imported - :param import_log: optional object implementing the :class:`SimpleImportLog` interface to record - events occuring during the import - :param raise_on_error: optional boolean flag - default to false, indicating whether errors should - be raised or logged. You usually want them to be raised during test but to be logged in - production. - - Instances of this class are meant to import external entities through :meth:`import_entities` - which handles a stream of :class:`ExtEntity`. One may then plug arbitrary filters into the - external entities stream. - - .. automethod:: import_entities - - """ - - def __init__(self, schema, store, extid2eid=None, existing_relations=None, - etypes_order_hint=(), import_log=None, raise_on_error=False): - self.schema = schema - self.store = store - self.extid2eid = extid2eid if extid2eid is not None else {} - self.existing_relations = (existing_relations if existing_relations is not None - else defaultdict(set)) - self.etypes_order_hint = etypes_order_hint - if import_log is None: - import_log = SimpleImportLog('') - self.import_log = import_log - self.raise_on_error = raise_on_error - # set of created/updated eids - self.created = set() - self.updated = set() - - def import_entities(self, ext_entities): - """Import given external entities (:class:`ExtEntity`) stream (usually a generator).""" - # {etype: [etype dict]} of entities that are in the import queue - queue = {} - # order entity dictionaries then create/update them - deferred = self._import_entities(ext_entities, queue) - # create deferred relations that don't exist already - missing_relations = self.prepare_insert_deferred_relations(deferred) - self._warn_about_missing_work(queue, missing_relations) - - def _import_entities(self, ext_entities, queue): - extid2eid = self.extid2eid - deferred = {} # non inlined relations that may be deferred - self.import_log.record_debug('importing entities') - for ext_entity in self.iter_ext_entities(ext_entities, deferred, queue): - try: - eid = extid2eid[ext_entity.extid] - except KeyError: - self.prepare_insert_entity(ext_entity) - else: - if ext_entity.values: - self.prepare_update_entity(ext_entity, eid) - return deferred - - def iter_ext_entities(self, ext_entities, deferred, queue): - """Yield external entities in an order which attempts to satisfy - schema constraints (inlined / cardinality) and to optimize the import. - """ - schema = self.schema - extid2eid = self.extid2eid - for ext_entity in ext_entities: - # check data in the transitional representation and prepare it for - # later insertion in the database - for subject_uri, rtype, object_uri in ext_entity.prepare(schema): - deferred.setdefault(rtype, set()).add((subject_uri, object_uri)) - if not ext_entity.is_ready(extid2eid): - queue.setdefault(ext_entity.etype, []).append(ext_entity) - continue - yield ext_entity - # check for some entities in the queue that may now be ready. We'll have to restart - # search for ready entities until no one is generated - new = True - while new: - new = False - for etype in self.etypes_order_hint: - if etype in queue: - new_queue = [] - for ext_entity in queue[etype]: - if ext_entity.is_ready(extid2eid): - yield ext_entity - # may unlock entity previously handled within this loop - new = True - else: - new_queue.append(ext_entity) - if new_queue: - queue[etype][:] = new_queue - else: - del queue[etype] - - def prepare_insert_entity(self, ext_entity): - """Call the store to prepare insertion of the given external entity""" - eid = self.store.prepare_insert_entity(ext_entity.etype, **ext_entity.values) - self.extid2eid[ext_entity.extid] = eid - self.created.add(eid) - return eid - - def prepare_update_entity(self, ext_entity, eid): - """Call the store to prepare update of the given external entity""" - self.store.prepare_update_entity(ext_entity.etype, eid, **ext_entity.values) - self.updated.add(eid) - - def prepare_insert_deferred_relations(self, deferred): - """Call the store to insert deferred relations (not handled during insertion/update for - entities). Return a list of relations `[(subj ext id, obj ext id)]` that may not be inserted - because the target entities don't exists yet. - """ - prepare_insert_relation = self.store.prepare_insert_relation - rschema = self.schema.rschema - extid2eid = self.extid2eid - missing_relations = [] - for rtype, relations in deferred.items(): - self.import_log.record_debug('importing %s %s relations' % (len(relations), rtype)) - symmetric = rschema(rtype).symmetric - existing = self.existing_relations[rtype] - for subject_uri, object_uri in relations: - try: - subject_eid = extid2eid[subject_uri] - object_eid = extid2eid[object_uri] - except KeyError: - missing_relations.append((subject_uri, rtype, object_uri)) - continue - if (subject_eid, object_eid) not in existing: - prepare_insert_relation(subject_eid, rtype, object_eid) - existing.add((subject_eid, object_eid)) - if symmetric: - existing.add((object_eid, subject_eid)) - return missing_relations - - def _warn_about_missing_work(self, queue, missing_relations): - error = self.import_log.record_error - if queue: - msgs = ["can't create some entities, is there some cycle or " - "missing data?"] - for ext_entities in queue.values(): - for ext_entity in ext_entities: - msgs.append(str(ext_entity)) - map(error, msgs) - if self.raise_on_error: - raise Exception('\n'.join(msgs)) - if missing_relations: - msgs = ["can't create some relations, is there missing data?"] - for subject_uri, rtype, object_uri in missing_relations: - msgs.append("%s %s %s" % (subject_uri, rtype, object_uri)) - map(error, msgs) - if self.raise_on_error: - raise Exception('\n'.join(msgs)) - - -class SimpleImportLog(object): - """Fake CWDataImport log using a simple text format. - - Useful to display logs in the UI instead of storing them to the - database. - """ - - def __init__(self, filename): - self.logs = [] - self.filename = filename - - def record_debug(self, msg, path=None, line=None): - self._log(logging.DEBUG, msg, path, line) - - def record_info(self, msg, path=None, line=None): - self._log(logging.INFO, msg, path, line) - - def record_warning(self, msg, path=None, line=None): - self._log(logging.WARNING, msg, path, line) - - def record_error(self, msg, path=None, line=None): - self._log(logging.ERROR, msg, path, line) - - def record_fatal(self, msg, path=None, line=None): - self._log(logging.FATAL, msg, path, line) - - def _log(self, severity, msg, path, line): - encodedmsg = u'%s\t%s\t%s\t%s' % (severity, self.filename, - line or u'', msg) - self.logs.append(encodedmsg) - - -class HTMLImportLog(SimpleImportLog): - """Fake CWDataImport log using a simple HTML format.""" - def __init__(self, filename): - super(HTMLImportLog, self).__init__(xml_escape(filename)) - - def _log(self, severity, msg, path, line): - encodedmsg = u'%s\t%s\t%s\t%s
' % (severity, self.filename, - line or u'', xml_escape(msg)) - self.logs.append(encodedmsg)