--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/dataimport/importer.py Fri Jun 26 16:09:27 2015 +0200
@@ -0,0 +1,408 @@
+# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr -- mailto:contact@logilab.fr
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License along
+# with this program. If not, see <http://www.gnu.org/licenses/>.
+"""This module contains tools to programmatically import external data into CubicWeb. It's designed
+on top of the store concept to leverage possibility of code sharing accross various data import
+The following classes are defined:
+* :class:`ExtEntity`: some intermediate representation of data to import, using external identifier
+ but no eid,
+* :class:`ExtEntitiesImporter`: class responsible for turning ExtEntity's extid to eid, and create
+ or update CubicWeb entities accordingly (using a Store).
+What is left to do is to write a class or a function that will yield external entities from some
+data source (eg RDF, CSV) which will be case dependant (the *generator*). You may then plug
+arbitrary filters into the external entities stream between the generator and the importer, allowing
+to have some generic generators whose generated content is rafined by specific filters.
+.. code-block:: python
+ ext_entities = fetch(<source>) # function yielding external entities
+ log = SimpleImportLog('<source file/url/whatever>')
+ importer = ExtEntitiesImporter(cnx, store, import_log=log)
+ importer.import_entities(ext_entities)
+Here are the two classes that you'll have to deal with, and maybe to override:
+.. autoclass:: cubicweb.dataimport.importer.ExtEntitiesImporter
+.. autoclass:: cubicweb.dataimport.importer.ExtEntity
+from collections import defaultdict
+import logging
+from logilab.mtconverter import xml_escape
+def cwuri2eid(cnx, etypes, source_eid=None):
+ """Return a dictionary mapping cwuri to eid for entities of the given entity types and / or
+ source.
+ """
+ assert source_eid or etypes, 'no entity types nor source specified'
+ rql = 'Any U, X WHERE X cwuri U'
+ args = {}
+ if len(etypes) == 1:
+ rql += ', X is %s' % etypes[0]
+ elif etypes:
+ rql += ', X is IN (%s)' % ','.join(etypes)
+ if source_eid is not None:
+ rql += ', X cw_source S, S eid %(s)s'
+ args['s'] = source_eid
+ return dict(cnx.execute(rql, args))
+class RelationMapping(object):
+ """Read-only mapping from relation type to set of related (subject, object) eids.
+ If `source` is specified, only returns relations implying entities from
+ this source.
+ """
+ def __init__(self, cnx, source=None):
+ self.cnx = cnx
+ self._rql_template = 'Any S,O WHERE S {} O'
+ self._kwargs = {}
+ if source is not None:
+ self._rql_template += ', S cw_source SO, O cw_source SO, SO eid %(s)s'
+ self._kwargs['s'] = source.eid
+ def __getitem__(self, rtype):
+ """Return a set of (subject, object) eids already related by `rtype`"""
+ rql = self._rql_template.format(rtype)
+ return set(tuple(x) for x in self.cnx.execute(rql, self._kwargs))
+class ExtEntity(object):
+ """Transitional representation of an entity for use in data importer.
+ An external entity has the following properties:
+ * ``extid`` (external id), an identifier for the ext entity,
+ * ``etype`` (entity type), a string which must be the name of one entity type in the schema
+ (eg. ``'Person'``, ``'Animal'``, ...),
+ * ``values``, a dictionary whose keys are attribute or relation names from the schema (eg.
+ ``'first_name'``, ``'friend'``), and whose values are *sets*
+ For instance:
+ ..code-block::python
+ ext_entity.extid = 'http://example.org/person/debby'
+ ext_entity.etype = 'Person'
+ ext_entity.values = {'first_name': set([u"Deborah", u"Debby"]),
+ 'friend': set(['http://example.org/person/john'])}
+ """
+ def __init__(self, etype, extid, values=None):
+ self.etype = etype
+ self.extid = extid
+ if values is None:
+ values = {}
+ self.values = values
+ self._schema = None
+ def __repr__(self):
+ return '<%s %s %s>' % (self.etype, self.extid, self.values)
+ def iter_rdefs(self):
+ """Yield (key, rtype, role) defined in `.values` dict, with:
+ * `key` is the original key in `.values` (i.e. the relation type or a 2-uple (relation type,
+ role))
+ * `rtype` is a yams relation type, expected to be found in the schema (attribute or
+ relation)
+ * `role` is the role of the entity in the relation, 'subject' or 'object'
+ Iteration is done on a copy of the keys so values may be inserted/deleted during it.
+ """
+ for key in list(self.values):
+ if isinstance(key, tuple):
+ rtype, role = key
+ assert role in ('subject', 'object'), key
+ yield key, rtype, role
+ else:
+ yield key, key, 'subject'
+ def prepare(self, schema):
+ """Prepare an external entity for later insertion:
+ * ensure attributes and inlined relations have a single value
+ * turn set([value]) into value and remove key associated to empty set
+ * remove non inlined relations and return them as a [(e1key, relation, e2key)] list
+ Return a list of non inlined relations that may be inserted later, each relations defined by
+ a 3-tuple (subject extid, relation type, object extid).
+ Take care the importer may call this method several times.
+ """
+ assert self._schema is None, 'prepare() has already been called for %s' % self
+ self._schema = schema
+ eschema = schema.eschema(self.etype)
+ deferred = []
+ entity_dict = self.values
+ for key, rtype, role in self.iter_rdefs():
+ rschema = schema.rschema(rtype)
+ if rschema.final or (rschema.inlined and role == 'subject'):
+ assert len(entity_dict[key]) <= 1, \
+ "more than one value for %s: %s (%s)" % (rtype, entity_dict[key], self.extid)
+ if entity_dict[key]:
+ entity_dict[rtype] = entity_dict[key].pop()
+ if key != rtype:
+ del entity_dict[key]
+ if (rschema.final and eschema.has_metadata(rtype, 'format')
+ and not rtype + '_format' in entity_dict):
+ entity_dict[rtype + '_format'] = u'text/plain'
+ else:
+ del entity_dict[key]
+ else:
+ for target_extid in entity_dict.pop(key):
+ if role == 'subject':
+ deferred.append((self.extid, rtype, target_extid))
+ else:
+ deferred.append((target_extid, rtype, self.extid))
+ return deferred
+ def is_ready(self, extid2eid):
+ """Return True if the ext entity is ready, i.e. has all the URIs used in inlined relations
+ currently existing.
+ """
+ assert self._schema, 'prepare() method should be called first on %s' % self
+ # as .prepare has been called, we know that .values only contains subject relation *type* as
+ # key (no more (rtype, role) tuple)
+ schema = self._schema
+ entity_dict = self.values
+ for rtype in entity_dict:
+ rschema = schema.rschema(rtype)
+ if not rschema.final:
+ # .prepare() should drop other cases from the entity dict
+ assert rschema.inlined
+ if not entity_dict[rtype] in extid2eid:
+ return False
+ # entity is ready, replace all relation's extid by eids
+ for rtype in entity_dict:
+ rschema = schema.rschema(rtype)
+ if rschema.inlined:
+ entity_dict[rtype] = extid2eid[entity_dict[rtype]]
+ return True
+class ExtEntitiesImporter(object):
+ """This class is responsible for importing externals entities, that is instances of
+ :class:`ExtEntity`, into CubicWeb entities.
+ Parameters:
+ * `schema`: the CubicWeb's instance schema
+ * `store`: a CubicWeb `Store`
+ * `extid2eid`: optional {extid: eid} dictionary giving information on existing entities. It
+ will be completed during import. You may want to use :func:`cwuri2eid` to build it.
+ * `existing_relation`: optional {rtype: set((subj eid, obj eid))} mapping giving information on
+ existing relations of a given type. You may want to use :class:`RelationMapping` to build it.
+ * `etypes_order_hint`: optional ordered iterable on entity types, giving an hint on the order in
+ which they should be attempted to be imported
+ * `import_log`: optional object implementing the :class:`SimpleImportLog` interface to record
+ events occuring during the import
+ * `raise_on_error`: optional boolean flag - default to false, indicating whether errors should
+ be raised or logged. You usually want them to be raised during test but to be logged in
+ production.
+ """
+ def __init__(self, schema, store, extid2eid=None, existing_relations=None,
+ etypes_order_hint=(), import_log=None, raise_on_error=False):
+ self.schema = schema
+ self.store = store
+ self.extid2eid = extid2eid if extid2eid is not None else {}
+ self.existing_relations = (existing_relations if existing_relations is not None
+ else defaultdict(set))
+ self.etypes_order_hint = etypes_order_hint
+ if import_log is None:
+ import_log = SimpleImportLog('<unspecified>')
+ self.import_log = import_log
+ self.raise_on_error = raise_on_error
+ # set of created/updated eids
+ self.created = set()
+ self.updated = set()
+ def import_entities(self, ext_entities):
+ """Import given external entities (:class:`ExtEntity`) stream (usually a generator)."""
+ # {etype: [etype dict]} of entities that are in the import queue
+ queue = {}
+ # order entity dictionaries then create/update them
+ deferred = self._import_entities(ext_entities, queue)
+ # create deferred relations that don't exist already
+ missing_relations = self.prepare_insert_deferred_relations(deferred)
+ self._warn_about_missing_work(queue, missing_relations)
+ def _import_entities(self, ext_entities, queue):
+ extid2eid = self.extid2eid
+ deferred = {} # non inlined relations that may be deferred
+ self.import_log.record_debug('importing entities')
+ for ext_entity in self.iter_ext_entities(ext_entities, deferred, queue):
+ try:
+ eid = extid2eid[ext_entity.extid]
+ except KeyError:
+ self.prepare_insert_entity(ext_entity)
+ else:
+ if ext_entity.values:
+ self.prepare_update_entity(ext_entity, eid)
+ return deferred
+ def iter_ext_entities(self, ext_entities, deferred, queue):
+ """Yield external entities in an order which attempts to satisfy
+ schema constraints (inlined / cardinality) and to optimize the import.
+ """
+ schema = self.schema
+ extid2eid = self.extid2eid
+ for ext_entity in ext_entities:
+ # check data in the transitional representation and prepare it for
+ # later insertion in the database
+ for subject_uri, rtype, object_uri in ext_entity.prepare(schema):
+ deferred.setdefault(rtype, set()).add((subject_uri, object_uri))
+ if not ext_entity.is_ready(extid2eid):
+ queue.setdefault(ext_entity.etype, []).append(ext_entity)
+ continue
+ yield ext_entity
+ # check for some entities in the queue that may now be ready. We'll have to restart
+ # search for ready entities until no one is generated
+ new = True
+ while new:
+ new = False
+ for etype in self.etypes_order_hint:
+ if etype in queue:
+ new_queue = []
+ for ext_entity in queue[etype]:
+ if ext_entity.is_ready(extid2eid):
+ yield ext_entity
+ # may unlock entity previously handled within this loop
+ new = True
+ else:
+ new_queue.append(ext_entity)
+ if new_queue:
+ queue[etype][:] = new_queue
+ else:
+ del queue[etype]
+ def prepare_insert_entity(self, ext_entity):
+ """Call the store to prepare insertion of the given external entity"""
+ eid = self.store.prepare_insert_entity(ext_entity.etype, **ext_entity.values)
+ self.extid2eid[ext_entity.extid] = eid
+ self.created.add(eid)
+ return eid
+ def prepare_update_entity(self, ext_entity, eid):
+ """Call the store to prepare update of the given external entity"""
+ self.store.prepare_update_entity(ext_entity.etype, eid, **ext_entity.values)
+ self.updated.add(eid)
+ def prepare_insert_deferred_relations(self, deferred):
+ """Call the store to insert deferred relations (not handled during insertion/update for
+ entities). Return a list of relations `[(subj ext id, obj ext id)]` that may not be inserted
+ because the target entities don't exists yet.
+ """
+ prepare_insert_relation = self.store.prepare_insert_relation
+ rschema = self.schema.rschema
+ extid2eid = self.extid2eid
+ missing_relations = []
+ for rtype, relations in deferred.items():
+ self.import_log.record_debug('importing %s %s relations' % (len(relations), rtype))
+ symmetric = rschema(rtype).symmetric
+ existing = self.existing_relations[rtype]
+ for subject_uri, object_uri in relations:
+ try:
+ subject_eid = extid2eid[subject_uri]
+ object_eid = extid2eid[object_uri]
+ except KeyError:
+ missing_relations.append((subject_uri, rtype, object_uri))
+ continue
+ if (subject_eid, object_eid) not in existing:
+ prepare_insert_relation(subject_eid, rtype, object_eid)
+ existing.add((subject_eid, object_eid))
+ if symmetric:
+ existing.add((object_eid, subject_eid))
+ return missing_relations
+ def _warn_about_missing_work(self, queue, missing_relations):
+ error = self.import_log.record_error
+ if queue:
+ msgs = ["can't create some entities, is there some cycle or "
+ "missing data?"]
+ for ext_entities in queue.values():
+ for ext_entity in ext_entities:
+ msgs.append(str(ext_entity))
+ map(error, msgs)
+ if self.raise_on_error:
+ raise Exception('\n'.join(msgs))
+ if missing_relations:
+ msgs = ["can't create some relations, is there missing data?"]
+ for subject_uri, rtype, object_uri in missing_relations:
+ msgs.append("%s %s %s" % (subject_uri, rtype, object_uri))
+ map(error, msgs)
+ if self.raise_on_error:
+ raise Exception('\n'.join(msgs))
+class SimpleImportLog(object):
+ """Fake CWDataImport log using a simple text format.
+ Useful to display logs in the UI instead of storing them to the
+ database.
+ """
+ def __init__(self, filename):
+ self.logs = []
+ self.filename = filename
+ def record_debug(self, msg, path=None, line=None):
+ self._log(logging.DEBUG, msg, path, line)
+ def record_info(self, msg, path=None, line=None):
+ self._log(logging.INFO, msg, path, line)
+ def record_warning(self, msg, path=None, line=None):
+ self._log(logging.WARNING, msg, path, line)
+ def record_error(self, msg, path=None, line=None):
+ self._log(logging.ERROR, msg, path, line)
+ def record_fatal(self, msg, path=None, line=None):
+ self._log(logging.FATAL, msg, path, line)
+ def _log(self, severity, msg, path, line):
+ encodedmsg = u'%s\t%s\t%s\t%s' % (severity, self.filename,
+ line or u'', msg)
+ self.logs.append(encodedmsg)
+class HTMLImportLog(SimpleImportLog):
+ """Fake CWDataImport log using a simple HTML format."""
+ def __init__(self, filename):
+ super(HTMLImportLog, self).__init__(xml_escape(filename))
+ def _log(self, severity, msg, path, line):
+ encodedmsg = u'%s\t%s\t%s\t%s<br/>' % (severity, self.filename,
+ line or u'', xml_escape(msg))
+ self.logs.append(encodedmsg)