changeset 11057 0b59724cb3f2
parent 10939 b30c2f49da57
child 11128 9b4de34ad394
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     3 #
     4 # This program is free software: you can redistribute it and/or modify it under
     5 # the terms of the GNU Lesser General Public License as published by the Free
     6 # Software Foundation, either version 2.1 of the License, or (at your option)
     7 # any later version.
     8 #
     9 # This program is distributed in the hope that it will be useful, but WITHOUT
    10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
    11 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
    12 # details.
    13 #
    14 # You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""Data import of external entities.

Main entry points:

.. autoclass:: ExtEntitiesImporter
.. autoclass:: ExtEntity

Utilities:

.. autofunction:: cwuri2eid
.. autoclass:: RelationMapping
.. autofunction:: cubicweb.dataimport.importer.use_extid_as_cwuri
"""
    30 from collections import defaultdict
    31 import logging
    33 from logilab.mtconverter import xml_escape
    36 def cwuri2eid(cnx, etypes, source_eid=None):
    37     """Return a dictionary mapping cwuri to eid for entities of the given entity types and / or
    38     source.
    39     """
    40     assert source_eid or etypes, 'no entity types nor source specified'
    41     rql = 'Any U, X WHERE X cwuri U'
    42     args = {}
    43     if len(etypes) == 1:
    44         rql += ', X is %s' % etypes[0]
    45     elif etypes:
    46         rql += ', X is IN (%s)' % ','.join(etypes)
    47     if source_eid is not None:
    48         rql += ', X cw_source S, S eid %(s)s'
    49         args['s'] = source_eid
    50     return dict(cnx.execute(rql, args))
    53 def use_extid_as_cwuri(extid2eid):
    54     """Return a generator of :class:`ExtEntity` objects that will set `cwuri`
    55     using entity's extid if the entity does not exist yet and has no `cwuri`
    56     defined.
    58     `extid2eid` is an extid to eid dictionary coming from an
    59     :class:`ExtEntitiesImporter` instance.
    61     Example usage:
    63     .. code-block:: python
    65         importer = SKOSExtEntitiesImporter(cnx, store, import_log)
    66         set_cwuri = use_extid_as_cwuri(importer.extid2eid)
    67         importer.import_entities(set_cwuri(extentities))
    68     """
    69     def use_extid_as_cwuri_filter(extentities):
    70         for extentity in extentities:
    71             if extentity.extid not in extid2eid:
    72                 extentity.values.setdefault('cwuri', set([extentity.extid.decode('utf-8')]))
    73             yield extentity
    74     return use_extid_as_cwuri_filter
    77 class RelationMapping(object):
    78     """Read-only mapping from relation type to set of related (subject, object) eids.
    80     If `source` is specified, only returns relations implying entities from
    81     this source.
    82     """
    84     def __init__(self, cnx, source=None):
    85         self.cnx = cnx
    86         self._rql_template = 'Any S,O WHERE S %s O'
    87         self._kwargs = {}
    88         if source is not None:
    89             self._rql_template += ', S cw_source SO, O cw_source SO, SO eid %%(s)s'
    90             self._kwargs['s'] = source.eid
    92     def __getitem__(self, rtype):
    93         """Return a set of (subject, object) eids already related by `rtype`"""
    94         rql = self._rql_template % rtype
    95         return set(tuple(x) for x in self.cnx.execute(rql, self._kwargs))
    98 class ExtEntity(object):
    99     """Transitional representation of an entity for use in data importer.
   101     An external entity has the following properties:
   103     * ``extid`` (external id), an identifier for the ext entity,
   105     * ``etype`` (entity type), a string which must be the name of one entity type in the schema
   106       (eg. ``'Person'``, ``'Animal'``, ...),
   108     * ``values``, a dictionary whose keys are attribute or relation names from the schema (eg.
   109       ``'first_name'``, ``'friend'``), and whose values are *sets*
   111     For instance:
   113     .. code-block:: python
   115         ext_entity.extid = ''
   116         ext_entity.etype = 'Person'
   117         ext_entity.values = {'first_name': set([u"Deborah", u"Debby"]),
   118                             'friend': set([''])}
   120     """
   122     def __init__(self, etype, extid, values=None):
   123         self.etype = etype
   124         self.extid = extid
   125         if values is None:
   126             values = {}
   127         self.values = values
   128         self._schema = None
   130     def __repr__(self):
   131         return '<%s %s %s>' % (self.etype, self.extid, self.values)
   133     def iter_rdefs(self):
   134         """Yield (key, rtype, role) defined in `.values` dict, with:
   136         * `key` is the original key in `.values` (i.e. the relation type or a 2-uple (relation type,
   137           role))
   139         * `rtype` is a yams relation type, expected to be found in the schema (attribute or
   140           relation)
   142         * `role` is the role of the entity in the relation, 'subject' or 'object'
   144         Iteration is done on a copy of the keys so values may be inserted/deleted during it.
   145         """
   146         for key in list(self.values):
   147             if isinstance(key, tuple):
   148                 rtype, role = key
   149                 assert role in ('subject', 'object'), key
   150                 yield key, rtype, role
   151             else:
   152                 yield key, key, 'subject'
   154     def prepare(self, schema):
   155         """Prepare an external entity for later insertion:
   157         * ensure attributes and inlined relations have a single value
   158         * turn set([value]) into value and remove key associated to empty set
   159         * remove non inlined relations and return them as a [(e1key, relation, e2key)] list
   161         Return a list of non inlined relations that may be inserted later, each relations defined by
   162         a 3-tuple (subject extid, relation type, object extid).
   164         Take care the importer may call this method several times.
   165         """
   166         assert self._schema is None, 'prepare() has already been called for %s' % self
   167         self._schema = schema
   168         eschema = schema.eschema(self.etype)
   169         deferred = []
   170         entity_dict = self.values
   171         for key, rtype, role in self.iter_rdefs():
   172             rschema = schema.rschema(rtype)
   173             if or (rschema.inlined and role == 'subject'):
   174                 assert len(entity_dict[key]) <= 1, \
   175                     "more than one value for %s: %s (%s)" % (rtype, entity_dict[key], self.extid)
   176                 if entity_dict[key]:
   177                     entity_dict[rtype] = entity_dict[key].pop()
   178                     if key != rtype:
   179                         del entity_dict[key]
   180                     if ( and eschema.has_metadata(rtype, 'format')
   181                             and not rtype + '_format' in entity_dict):
   182                         entity_dict[rtype + '_format'] = u'text/plain'
   183                 else:
   184                     del entity_dict[key]
   185             else:
   186                 for target_extid in entity_dict.pop(key):
   187                     if role == 'subject':
   188                         deferred.append((self.extid, rtype, target_extid))
   189                     else:
   190                         deferred.append((target_extid, rtype, self.extid))
   191         return deferred
   193     def is_ready(self, extid2eid):
   194         """Return True if the ext entity is ready, i.e. has all the URIs used in inlined relations
   195         currently existing.
   196         """
   197         assert self._schema, 'prepare() method should be called first on %s' % self
   198         # as .prepare has been called, we know that .values only contains subject relation *type* as
   199         # key (no more (rtype, role) tuple)
   200         schema = self._schema
   201         entity_dict = self.values
   202         for rtype in entity_dict:
   203             rschema = schema.rschema(rtype)
   204             if not
   205                 # .prepare() should drop other cases from the entity dict
   206                 assert rschema.inlined
   207                 if not entity_dict[rtype] in extid2eid:
   208                     return False
   209         # entity is ready, replace all relation's extid by eids
   210         for rtype in entity_dict:
   211             rschema = schema.rschema(rtype)
   212             if rschema.inlined:
   213                 entity_dict[rtype] = extid2eid[entity_dict[rtype]]
   214         return True
   217 class ExtEntitiesImporter(object):
   218     """This class is responsible for importing externals entities, that is instances of
   219     :class:`ExtEntity`, into CubicWeb entities.
   221     :param schema: the CubicWeb's instance schema
   222     :param store: a CubicWeb `Store`
   223     :param extid2eid: optional {extid: eid} dictionary giving information on existing entities. It
   224         will be completed during import. You may want to use :func:`cwuri2eid` to build it.
   225     :param existing_relation: optional {rtype: set((subj eid, obj eid))} mapping giving information on
   226         existing relations of a given type. You may want to use :class:`RelationMapping` to build it.
   227     :param  etypes_order_hint: optional ordered iterable on entity types, giving an hint on the order in
   228         which they should be attempted to be imported
   229     :param  import_log: optional object implementing the :class:`SimpleImportLog` interface to record
   230         events occuring during the import
   231     :param  raise_on_error: optional boolean flag - default to false, indicating whether errors should
   232         be raised or logged. You usually want them to be raised during test but to be logged in
   233         production.
   235     Instances of this class are meant to import external entities through :meth:`import_entities`
   236     which handles a stream of :class:`ExtEntity`. One may then plug arbitrary filters into the
   237     external entities stream.
   239     .. automethod:: import_entities
   241     """
   243     def __init__(self, schema, store, extid2eid=None, existing_relations=None,
   244                  etypes_order_hint=(), import_log=None, raise_on_error=False):
   245         self.schema = schema
   246 = store
   247         self.extid2eid = extid2eid if extid2eid is not None else {}
   248         self.existing_relations = (existing_relations if existing_relations is not None
   249                                    else defaultdict(set))
   250         self.etypes_order_hint = etypes_order_hint
   251         if import_log is None:
   252             import_log = SimpleImportLog('<unspecified>')
   253         self.import_log = import_log
   254         self.raise_on_error = raise_on_error
   255         # set of created/updated eids
   256         self.created = set()
   257         self.updated = set()
   259     def import_entities(self, ext_entities):
   260         """Import given external entities (:class:`ExtEntity`) stream (usually a generator)."""
   261         # {etype: [etype dict]} of entities that are in the import queue
   262         queue = {}
   263         # order entity dictionaries then create/update them
   264         deferred = self._import_entities(ext_entities, queue)
   265         # create deferred relations that don't exist already
   266         missing_relations = self.prepare_insert_deferred_relations(deferred)
   267         self._warn_about_missing_work(queue, missing_relations)
   269     def _import_entities(self, ext_entities, queue):
   270         extid2eid = self.extid2eid
   271         deferred = {}  # non inlined relations that may be deferred
   272         self.import_log.record_debug('importing entities')
   273         for ext_entity in self.iter_ext_entities(ext_entities, deferred, queue):
   274             try:
   275                 eid = extid2eid[ext_entity.extid]
   276             except KeyError:
   277                 self.prepare_insert_entity(ext_entity)
   278             else:
   279                 if ext_entity.values:
   280                     self.prepare_update_entity(ext_entity, eid)
   281         return deferred
   283     def iter_ext_entities(self, ext_entities, deferred, queue):
   284         """Yield external entities in an order which attempts to satisfy
   285         schema constraints (inlined / cardinality) and to optimize the import.
   286         """
   287         schema = self.schema
   288         extid2eid = self.extid2eid
   289         for ext_entity in ext_entities:
   290             # check data in the transitional representation and prepare it for
   291             # later insertion in the database
   292             for subject_uri, rtype, object_uri in ext_entity.prepare(schema):
   293                 deferred.setdefault(rtype, set()).add((subject_uri, object_uri))
   294             if not ext_entity.is_ready(extid2eid):
   295                 queue.setdefault(ext_entity.etype, []).append(ext_entity)
   296                 continue
   297             yield ext_entity
   298             # check for some entities in the queue that may now be ready. We'll have to restart
   299             # search for ready entities until no one is generated
   300             new = True
   301             while new:
   302                 new = False
   303                 for etype in self.etypes_order_hint:
   304                     if etype in queue:
   305                         new_queue = []
   306                         for ext_entity in queue[etype]:
   307                             if ext_entity.is_ready(extid2eid):
   308                                 yield ext_entity
   309                                 # may unlock entity previously handled within this loop
   310                                 new = True
   311                             else:
   312                                 new_queue.append(ext_entity)
   313                         if new_queue:
   314                             queue[etype][:] = new_queue
   315                         else:
   316                             del queue[etype]
   318     def prepare_insert_entity(self, ext_entity):
   319         """Call the store to prepare insertion of the given external entity"""
   320         eid =, **ext_entity.values)
   321         self.extid2eid[ext_entity.extid] = eid
   322         self.created.add(eid)
   323         return eid
   325     def prepare_update_entity(self, ext_entity, eid):
   326         """Call the store to prepare update of the given external entity"""
   327, eid, **ext_entity.values)
   328         self.updated.add(eid)
   330     def prepare_insert_deferred_relations(self, deferred):
   331         """Call the store to insert deferred relations (not handled during insertion/update for
   332         entities). Return a list of relations `[(subj ext id, obj ext id)]` that may not be inserted
   333         because the target entities don't exists yet.
   334         """
   335         prepare_insert_relation =
   336         rschema = self.schema.rschema
   337         extid2eid = self.extid2eid
   338         missing_relations = []
   339         for rtype, relations in deferred.items():
   340             self.import_log.record_debug('importing %s %s relations' % (len(relations), rtype))
   341             symmetric = rschema(rtype).symmetric
   342             existing = self.existing_relations[rtype]
   343             for subject_uri, object_uri in relations:
   344                 try:
   345                     subject_eid = extid2eid[subject_uri]
   346                     object_eid = extid2eid[object_uri]
   347                 except KeyError:
   348                     missing_relations.append((subject_uri, rtype, object_uri))
   349                     continue
   350                 if (subject_eid, object_eid) not in existing:
   351                     prepare_insert_relation(subject_eid, rtype, object_eid)
   352                     existing.add((subject_eid, object_eid))
   353                     if symmetric:
   354                         existing.add((object_eid, subject_eid))
   355         return missing_relations
   357     def _warn_about_missing_work(self, queue, missing_relations):
   358         error = self.import_log.record_error
   359         if queue:
   360             msgs = ["can't create some entities, is there some cycle or "
   361                     "missing data?"]
   362             for ext_entities in queue.values():
   363                 for ext_entity in ext_entities:
   364                     msgs.append(str(ext_entity))
   365             map(error, msgs)
   366             if self.raise_on_error:
   367                 raise Exception('\n'.join(msgs))
   368         if missing_relations:
   369             msgs = ["can't create some relations, is there missing data?"]
   370             for subject_uri, rtype, object_uri in missing_relations:
   371                 msgs.append("%s %s %s" % (subject_uri, rtype, object_uri))
   372             map(error, msgs)
   373             if self.raise_on_error:
   374                 raise Exception('\n'.join(msgs))
   377 class SimpleImportLog(object):
   378     """Fake CWDataImport log using a simple text format.
   380     Useful to display logs in the UI instead of storing them to the
   381     database.
   382     """
   384     def __init__(self, filename):
   385         self.logs = []
   386         self.filename = filename
   388     def record_debug(self, msg, path=None, line=None):
   389         self._log(logging.DEBUG, msg, path, line)
   391     def record_info(self, msg, path=None, line=None):
   392         self._log(logging.INFO, msg, path, line)
   394     def record_warning(self, msg, path=None, line=None):
   395         self._log(logging.WARNING, msg, path, line)
   397     def record_error(self, msg, path=None, line=None):
   398         self._log(logging.ERROR, msg, path, line)
   400     def record_fatal(self, msg, path=None, line=None):
   401         self._log(logging.FATAL, msg, path, line)
   403     def _log(self, severity, msg, path, line):
   404         encodedmsg = u'%s\t%s\t%s\t%s' % (severity, self.filename,
   405                                           line or u'', msg)
   406         self.logs.append(encodedmsg)
   409 class HTMLImportLog(SimpleImportLog):
   410     """Fake CWDataImport log using a simple HTML format."""
   411     def __init__(self, filename):
   412         super(HTMLImportLog, self).__init__(xml_escape(filename))
   414     def _log(self, severity, msg, path, line):
   415         encodedmsg = u'%s\t%s\t%s\t%s<br/>' % (severity, self.filename,
   416                                                line or u'', xml_escape(msg))
   417         self.logs.append(encodedmsg)