--- a/dataimport/importer.py Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,417 +0,0 @@
-# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr -- mailto:contact@logilab.fr
-#
-# This program is free software: you can redistribute it and/or modify it under
-# the terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with this program. If not, see <http://www.gnu.org/licenses/>.
-"""Data import of external entities.
-
-Main entry points:
-
-.. autoclass:: ExtEntitiesImporter
-.. autoclass:: ExtEntity
-
-Utilities:
-
-.. autofunction:: cwuri2eid
-.. autoclass:: RelationMapping
-.. autofunction:: cubicweb.dataimport.importer.use_extid_as_cwuri
-"""
-
-from collections import defaultdict
-import logging
-
-from logilab.mtconverter import xml_escape
-
-
-def cwuri2eid(cnx, etypes, source_eid=None):
- """Return a dictionary mapping cwuri to eid for entities of the given entity types and / or
- source.
- """
- assert source_eid or etypes, 'no entity types nor source specified'
- rql = 'Any U, X WHERE X cwuri U'
- args = {}
- if len(etypes) == 1:
- rql += ', X is %s' % etypes[0]
- elif etypes:
- rql += ', X is IN (%s)' % ','.join(etypes)
- if source_eid is not None:
- rql += ', X cw_source S, S eid %(s)s'
- args['s'] = source_eid
- return dict(cnx.execute(rql, args))
-
-
-def use_extid_as_cwuri(extid2eid):
- """Return a generator of :class:`ExtEntity` objects that will set `cwuri`
- using entity's extid if the entity does not exist yet and has no `cwuri`
- defined.
-
- `extid2eid` is an extid to eid dictionary coming from an
- :class:`ExtEntitiesImporter` instance.
-
- Example usage:
-
- .. code-block:: python
-
- importer = SKOSExtEntitiesImporter(cnx, store, import_log)
- set_cwuri = use_extid_as_cwuri(importer.extid2eid)
- importer.import_entities(set_cwuri(extentities))
- """
- def use_extid_as_cwuri_filter(extentities):
- for extentity in extentities:
- if extentity.extid not in extid2eid:
- extentity.values.setdefault('cwuri', set([extentity.extid.decode('utf-8')]))
- yield extentity
- return use_extid_as_cwuri_filter
-
-
-class RelationMapping(object):
- """Read-only mapping from relation type to set of related (subject, object) eids.
-
- If `source` is specified, only returns relations implying entities from
- this source.
- """
-
- def __init__(self, cnx, source=None):
- self.cnx = cnx
- self._rql_template = 'Any S,O WHERE S %s O'
- self._kwargs = {}
- if source is not None:
- self._rql_template += ', S cw_source SO, O cw_source SO, SO eid %%(s)s'
- self._kwargs['s'] = source.eid
-
- def __getitem__(self, rtype):
- """Return a set of (subject, object) eids already related by `rtype`"""
- rql = self._rql_template % rtype
- return set(tuple(x) for x in self.cnx.execute(rql, self._kwargs))
-
-
-class ExtEntity(object):
- """Transitional representation of an entity for use in data importer.
-
- An external entity has the following properties:
-
- * ``extid`` (external id), an identifier for the ext entity,
-
- * ``etype`` (entity type), a string which must be the name of one entity type in the schema
- (eg. ``'Person'``, ``'Animal'``, ...),
-
- * ``values``, a dictionary whose keys are attribute or relation names from the schema (eg.
- ``'first_name'``, ``'friend'``), and whose values are *sets*
-
- For instance:
-
- .. code-block:: python
-
- ext_entity.extid = 'http://example.org/person/debby'
- ext_entity.etype = 'Person'
- ext_entity.values = {'first_name': set([u"Deborah", u"Debby"]),
- 'friend': set(['http://example.org/person/john'])}
-
- """
-
- def __init__(self, etype, extid, values=None):
- self.etype = etype
- self.extid = extid
- if values is None:
- values = {}
- self.values = values
- self._schema = None
-
- def __repr__(self):
- return '<%s %s %s>' % (self.etype, self.extid, self.values)
-
- def iter_rdefs(self):
- """Yield (key, rtype, role) defined in `.values` dict, with:
-
- * `key` is the original key in `.values` (i.e. the relation type or a 2-uple (relation type,
- role))
-
- * `rtype` is a yams relation type, expected to be found in the schema (attribute or
- relation)
-
- * `role` is the role of the entity in the relation, 'subject' or 'object'
-
- Iteration is done on a copy of the keys so values may be inserted/deleted during it.
- """
- for key in list(self.values):
- if isinstance(key, tuple):
- rtype, role = key
- assert role in ('subject', 'object'), key
- yield key, rtype, role
- else:
- yield key, key, 'subject'
-
- def prepare(self, schema):
- """Prepare an external entity for later insertion:
-
- * ensure attributes and inlined relations have a single value
- * turn set([value]) into value and remove key associated to empty set
- * remove non inlined relations and return them as a [(e1key, relation, e2key)] list
-
- Return a list of non inlined relations that may be inserted later, each relations defined by
- a 3-tuple (subject extid, relation type, object extid).
-
- Take care the importer may call this method several times.
- """
- assert self._schema is None, 'prepare() has already been called for %s' % self
- self._schema = schema
- eschema = schema.eschema(self.etype)
- deferred = []
- entity_dict = self.values
- for key, rtype, role in self.iter_rdefs():
- rschema = schema.rschema(rtype)
- if rschema.final or (rschema.inlined and role == 'subject'):
- assert len(entity_dict[key]) <= 1, \
- "more than one value for %s: %s (%s)" % (rtype, entity_dict[key], self.extid)
- if entity_dict[key]:
- entity_dict[rtype] = entity_dict[key].pop()
- if key != rtype:
- del entity_dict[key]
- if (rschema.final and eschema.has_metadata(rtype, 'format')
- and not rtype + '_format' in entity_dict):
- entity_dict[rtype + '_format'] = u'text/plain'
- else:
- del entity_dict[key]
- else:
- for target_extid in entity_dict.pop(key):
- if role == 'subject':
- deferred.append((self.extid, rtype, target_extid))
- else:
- deferred.append((target_extid, rtype, self.extid))
- return deferred
-
- def is_ready(self, extid2eid):
- """Return True if the ext entity is ready, i.e. has all the URIs used in inlined relations
- currently existing.
- """
- assert self._schema, 'prepare() method should be called first on %s' % self
- # as .prepare has been called, we know that .values only contains subject relation *type* as
- # key (no more (rtype, role) tuple)
- schema = self._schema
- entity_dict = self.values
- for rtype in entity_dict:
- rschema = schema.rschema(rtype)
- if not rschema.final:
- # .prepare() should drop other cases from the entity dict
- assert rschema.inlined
- if not entity_dict[rtype] in extid2eid:
- return False
- # entity is ready, replace all relation's extid by eids
- for rtype in entity_dict:
- rschema = schema.rschema(rtype)
- if rschema.inlined:
- entity_dict[rtype] = extid2eid[entity_dict[rtype]]
- return True
-
-
-class ExtEntitiesImporter(object):
- """This class is responsible for importing externals entities, that is instances of
- :class:`ExtEntity`, into CubicWeb entities.
-
- :param schema: the CubicWeb's instance schema
- :param store: a CubicWeb `Store`
- :param extid2eid: optional {extid: eid} dictionary giving information on existing entities. It
- will be completed during import. You may want to use :func:`cwuri2eid` to build it.
- :param existing_relation: optional {rtype: set((subj eid, obj eid))} mapping giving information on
- existing relations of a given type. You may want to use :class:`RelationMapping` to build it.
- :param etypes_order_hint: optional ordered iterable on entity types, giving an hint on the order in
- which they should be attempted to be imported
- :param import_log: optional object implementing the :class:`SimpleImportLog` interface to record
- events occuring during the import
- :param raise_on_error: optional boolean flag - default to false, indicating whether errors should
- be raised or logged. You usually want them to be raised during test but to be logged in
- production.
-
- Instances of this class are meant to import external entities through :meth:`import_entities`
- which handles a stream of :class:`ExtEntity`. One may then plug arbitrary filters into the
- external entities stream.
-
- .. automethod:: import_entities
-
- """
-
- def __init__(self, schema, store, extid2eid=None, existing_relations=None,
- etypes_order_hint=(), import_log=None, raise_on_error=False):
- self.schema = schema
- self.store = store
- self.extid2eid = extid2eid if extid2eid is not None else {}
- self.existing_relations = (existing_relations if existing_relations is not None
- else defaultdict(set))
- self.etypes_order_hint = etypes_order_hint
- if import_log is None:
- import_log = SimpleImportLog('<unspecified>')
- self.import_log = import_log
- self.raise_on_error = raise_on_error
- # set of created/updated eids
- self.created = set()
- self.updated = set()
-
- def import_entities(self, ext_entities):
- """Import given external entities (:class:`ExtEntity`) stream (usually a generator)."""
- # {etype: [etype dict]} of entities that are in the import queue
- queue = {}
- # order entity dictionaries then create/update them
- deferred = self._import_entities(ext_entities, queue)
- # create deferred relations that don't exist already
- missing_relations = self.prepare_insert_deferred_relations(deferred)
- self._warn_about_missing_work(queue, missing_relations)
-
- def _import_entities(self, ext_entities, queue):
- extid2eid = self.extid2eid
- deferred = {} # non inlined relations that may be deferred
- self.import_log.record_debug('importing entities')
- for ext_entity in self.iter_ext_entities(ext_entities, deferred, queue):
- try:
- eid = extid2eid[ext_entity.extid]
- except KeyError:
- self.prepare_insert_entity(ext_entity)
- else:
- if ext_entity.values:
- self.prepare_update_entity(ext_entity, eid)
- return deferred
-
- def iter_ext_entities(self, ext_entities, deferred, queue):
- """Yield external entities in an order which attempts to satisfy
- schema constraints (inlined / cardinality) and to optimize the import.
- """
- schema = self.schema
- extid2eid = self.extid2eid
- for ext_entity in ext_entities:
- # check data in the transitional representation and prepare it for
- # later insertion in the database
- for subject_uri, rtype, object_uri in ext_entity.prepare(schema):
- deferred.setdefault(rtype, set()).add((subject_uri, object_uri))
- if not ext_entity.is_ready(extid2eid):
- queue.setdefault(ext_entity.etype, []).append(ext_entity)
- continue
- yield ext_entity
- # check for some entities in the queue that may now be ready. We'll have to restart
- # search for ready entities until no one is generated
- new = True
- while new:
- new = False
- for etype in self.etypes_order_hint:
- if etype in queue:
- new_queue = []
- for ext_entity in queue[etype]:
- if ext_entity.is_ready(extid2eid):
- yield ext_entity
- # may unlock entity previously handled within this loop
- new = True
- else:
- new_queue.append(ext_entity)
- if new_queue:
- queue[etype][:] = new_queue
- else:
- del queue[etype]
-
- def prepare_insert_entity(self, ext_entity):
- """Call the store to prepare insertion of the given external entity"""
- eid = self.store.prepare_insert_entity(ext_entity.etype, **ext_entity.values)
- self.extid2eid[ext_entity.extid] = eid
- self.created.add(eid)
- return eid
-
- def prepare_update_entity(self, ext_entity, eid):
- """Call the store to prepare update of the given external entity"""
- self.store.prepare_update_entity(ext_entity.etype, eid, **ext_entity.values)
- self.updated.add(eid)
-
- def prepare_insert_deferred_relations(self, deferred):
- """Call the store to insert deferred relations (not handled during insertion/update for
- entities). Return a list of relations `[(subj ext id, obj ext id)]` that may not be inserted
- because the target entities don't exists yet.
- """
- prepare_insert_relation = self.store.prepare_insert_relation
- rschema = self.schema.rschema
- extid2eid = self.extid2eid
- missing_relations = []
- for rtype, relations in deferred.items():
- self.import_log.record_debug('importing %s %s relations' % (len(relations), rtype))
- symmetric = rschema(rtype).symmetric
- existing = self.existing_relations[rtype]
- for subject_uri, object_uri in relations:
- try:
- subject_eid = extid2eid[subject_uri]
- object_eid = extid2eid[object_uri]
- except KeyError:
- missing_relations.append((subject_uri, rtype, object_uri))
- continue
- if (subject_eid, object_eid) not in existing:
- prepare_insert_relation(subject_eid, rtype, object_eid)
- existing.add((subject_eid, object_eid))
- if symmetric:
- existing.add((object_eid, subject_eid))
- return missing_relations
-
- def _warn_about_missing_work(self, queue, missing_relations):
- error = self.import_log.record_error
- if queue:
- msgs = ["can't create some entities, is there some cycle or "
- "missing data?"]
- for ext_entities in queue.values():
- for ext_entity in ext_entities:
- msgs.append(str(ext_entity))
- map(error, msgs)
- if self.raise_on_error:
- raise Exception('\n'.join(msgs))
- if missing_relations:
- msgs = ["can't create some relations, is there missing data?"]
- for subject_uri, rtype, object_uri in missing_relations:
- msgs.append("%s %s %s" % (subject_uri, rtype, object_uri))
- map(error, msgs)
- if self.raise_on_error:
- raise Exception('\n'.join(msgs))
-
-
-class SimpleImportLog(object):
- """Fake CWDataImport log using a simple text format.
-
- Useful to display logs in the UI instead of storing them to the
- database.
- """
-
- def __init__(self, filename):
- self.logs = []
- self.filename = filename
-
- def record_debug(self, msg, path=None, line=None):
- self._log(logging.DEBUG, msg, path, line)
-
- def record_info(self, msg, path=None, line=None):
- self._log(logging.INFO, msg, path, line)
-
- def record_warning(self, msg, path=None, line=None):
- self._log(logging.WARNING, msg, path, line)
-
- def record_error(self, msg, path=None, line=None):
- self._log(logging.ERROR, msg, path, line)
-
- def record_fatal(self, msg, path=None, line=None):
- self._log(logging.FATAL, msg, path, line)
-
- def _log(self, severity, msg, path, line):
- encodedmsg = u'%s\t%s\t%s\t%s' % (severity, self.filename,
- line or u'', msg)
- self.logs.append(encodedmsg)
-
-
-class HTMLImportLog(SimpleImportLog):
- """Fake CWDataImport log using a simple HTML format."""
- def __init__(self, filename):
- super(HTMLImportLog, self).__init__(xml_escape(filename))
-
- def _log(self, severity, msg, path, line):
- encodedmsg = u'%s\t%s\t%s\t%s<br/>' % (severity, self.filename,
- line or u'', xml_escape(msg))
- self.logs.append(encodedmsg)