cubicweb/dataimport/importer.py
changeset 11057 0b59724cb3f2
parent 10939 b30c2f49da57
child 11128 9b4de34ad394
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
       
     1 # copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This program is free software: you can redistribute it and/or modify it under
       
     5 # the terms of the GNU Lesser General Public License as published by the Free
       
     6 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     7 # any later version.
       
     8 #
       
     9 # This program is distributed in the hope that it will be useful, but WITHOUT
       
    10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    11 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
       
    12 # details.
       
    13 #
       
    14 # You should have received a copy of the GNU Lesser General Public License along
       
    15 # with this program. If not, see <http://www.gnu.org/licenses/>.
       
    16 """Data import of external entities.
       
    17 
       
    18 Main entry points:
       
    19 
       
    20 .. autoclass:: ExtEntitiesImporter
       
    21 .. autoclass:: ExtEntity
       
    22 
       
    23 Utilities:
       
    24 
       
    25 .. autofunction:: cwuri2eid
       
    26 .. autoclass:: RelationMapping
       
    27 .. autofunction:: cubicweb.dataimport.importer.use_extid_as_cwuri
       
    28 """
       
    29 
       
    30 from collections import defaultdict
       
    31 import logging
       
    32 
       
    33 from logilab.mtconverter import xml_escape
       
    34 
       
    35 
       
    36 def cwuri2eid(cnx, etypes, source_eid=None):
       
    37     """Return a dictionary mapping cwuri to eid for entities of the given entity types and / or
       
    38     source.
       
    39     """
       
    40     assert source_eid or etypes, 'no entity types nor source specified'
       
    41     rql = 'Any U, X WHERE X cwuri U'
       
    42     args = {}
       
    43     if len(etypes) == 1:
       
    44         rql += ', X is %s' % etypes[0]
       
    45     elif etypes:
       
    46         rql += ', X is IN (%s)' % ','.join(etypes)
       
    47     if source_eid is not None:
       
    48         rql += ', X cw_source S, S eid %(s)s'
       
    49         args['s'] = source_eid
       
    50     return dict(cnx.execute(rql, args))
       
    51 
       
    52 
       
    53 def use_extid_as_cwuri(extid2eid):
       
    54     """Return a generator of :class:`ExtEntity` objects that will set `cwuri`
       
    55     using entity's extid if the entity does not exist yet and has no `cwuri`
       
    56     defined.
       
    57 
       
    58     `extid2eid` is an extid to eid dictionary coming from an
       
    59     :class:`ExtEntitiesImporter` instance.
       
    60 
       
    61     Example usage:
       
    62 
       
    63     .. code-block:: python
       
    64 
       
    65         importer = SKOSExtEntitiesImporter(cnx, store, import_log)
       
    66         set_cwuri = use_extid_as_cwuri(importer.extid2eid)
       
    67         importer.import_entities(set_cwuri(extentities))
       
    68     """
       
    69     def use_extid_as_cwuri_filter(extentities):
       
    70         for extentity in extentities:
       
    71             if extentity.extid not in extid2eid:
       
    72                 extentity.values.setdefault('cwuri', set([extentity.extid.decode('utf-8')]))
       
    73             yield extentity
       
    74     return use_extid_as_cwuri_filter
       
    75 
       
    76 
       
    77 class RelationMapping(object):
       
    78     """Read-only mapping from relation type to set of related (subject, object) eids.
       
    79 
       
    80     If `source` is specified, only returns relations implying entities from
       
    81     this source.
       
    82     """
       
    83 
       
    84     def __init__(self, cnx, source=None):
       
    85         self.cnx = cnx
       
    86         self._rql_template = 'Any S,O WHERE S %s O'
       
    87         self._kwargs = {}
       
    88         if source is not None:
       
    89             self._rql_template += ', S cw_source SO, O cw_source SO, SO eid %%(s)s'
       
    90             self._kwargs['s'] = source.eid
       
    91 
       
    92     def __getitem__(self, rtype):
       
    93         """Return a set of (subject, object) eids already related by `rtype`"""
       
    94         rql = self._rql_template % rtype
       
    95         return set(tuple(x) for x in self.cnx.execute(rql, self._kwargs))
       
    96 
       
    97 
       
    98 class ExtEntity(object):
       
    99     """Transitional representation of an entity for use in data importer.
       
   100 
       
   101     An external entity has the following properties:
       
   102 
       
   103     * ``extid`` (external id), an identifier for the ext entity,
       
   104 
       
   105     * ``etype`` (entity type), a string which must be the name of one entity type in the schema
       
   106       (eg. ``'Person'``, ``'Animal'``, ...),
       
   107 
       
   108     * ``values``, a dictionary whose keys are attribute or relation names from the schema (eg.
       
   109       ``'first_name'``, ``'friend'``), and whose values are *sets*
       
   110 
       
   111     For instance:
       
   112 
       
   113     .. code-block:: python
       
   114 
       
   115         ext_entity.extid = 'http://example.org/person/debby'
       
   116         ext_entity.etype = 'Person'
       
   117         ext_entity.values = {'first_name': set([u"Deborah", u"Debby"]),
       
   118                             'friend': set(['http://example.org/person/john'])}
       
   119 
       
   120     """
       
   121 
       
   122     def __init__(self, etype, extid, values=None):
       
   123         self.etype = etype
       
   124         self.extid = extid
       
   125         if values is None:
       
   126             values = {}
       
   127         self.values = values
       
   128         self._schema = None
       
   129 
       
   130     def __repr__(self):
       
   131         return '<%s %s %s>' % (self.etype, self.extid, self.values)
       
   132 
       
   133     def iter_rdefs(self):
       
   134         """Yield (key, rtype, role) defined in `.values` dict, with:
       
   135 
       
   136         * `key` is the original key in `.values` (i.e. the relation type or a 2-uple (relation type,
       
   137           role))
       
   138 
       
   139         * `rtype` is a yams relation type, expected to be found in the schema (attribute or
       
   140           relation)
       
   141 
       
   142         * `role` is the role of the entity in the relation, 'subject' or 'object'
       
   143 
       
   144         Iteration is done on a copy of the keys so values may be inserted/deleted during it.
       
   145         """
       
   146         for key in list(self.values):
       
   147             if isinstance(key, tuple):
       
   148                 rtype, role = key
       
   149                 assert role in ('subject', 'object'), key
       
   150                 yield key, rtype, role
       
   151             else:
       
   152                 yield key, key, 'subject'
       
   153 
       
   154     def prepare(self, schema):
       
   155         """Prepare an external entity for later insertion:
       
   156 
       
   157         * ensure attributes and inlined relations have a single value
       
   158         * turn set([value]) into value and remove key associated to empty set
       
   159         * remove non inlined relations and return them as a [(e1key, relation, e2key)] list
       
   160 
       
   161         Return a list of non inlined relations that may be inserted later, each relations defined by
       
   162         a 3-tuple (subject extid, relation type, object extid).
       
   163 
       
   164         Take care the importer may call this method several times.
       
   165         """
       
   166         assert self._schema is None, 'prepare() has already been called for %s' % self
       
   167         self._schema = schema
       
   168         eschema = schema.eschema(self.etype)
       
   169         deferred = []
       
   170         entity_dict = self.values
       
   171         for key, rtype, role in self.iter_rdefs():
       
   172             rschema = schema.rschema(rtype)
       
   173             if rschema.final or (rschema.inlined and role == 'subject'):
       
   174                 assert len(entity_dict[key]) <= 1, \
       
   175                     "more than one value for %s: %s (%s)" % (rtype, entity_dict[key], self.extid)
       
   176                 if entity_dict[key]:
       
   177                     entity_dict[rtype] = entity_dict[key].pop()
       
   178                     if key != rtype:
       
   179                         del entity_dict[key]
       
   180                     if (rschema.final and eschema.has_metadata(rtype, 'format')
       
   181                             and not rtype + '_format' in entity_dict):
       
   182                         entity_dict[rtype + '_format'] = u'text/plain'
       
   183                 else:
       
   184                     del entity_dict[key]
       
   185             else:
       
   186                 for target_extid in entity_dict.pop(key):
       
   187                     if role == 'subject':
       
   188                         deferred.append((self.extid, rtype, target_extid))
       
   189                     else:
       
   190                         deferred.append((target_extid, rtype, self.extid))
       
   191         return deferred
       
   192 
       
   193     def is_ready(self, extid2eid):
       
   194         """Return True if the ext entity is ready, i.e. has all the URIs used in inlined relations
       
   195         currently existing.
       
   196         """
       
   197         assert self._schema, 'prepare() method should be called first on %s' % self
       
   198         # as .prepare has been called, we know that .values only contains subject relation *type* as
       
   199         # key (no more (rtype, role) tuple)
       
   200         schema = self._schema
       
   201         entity_dict = self.values
       
   202         for rtype in entity_dict:
       
   203             rschema = schema.rschema(rtype)
       
   204             if not rschema.final:
       
   205                 # .prepare() should drop other cases from the entity dict
       
   206                 assert rschema.inlined
       
   207                 if not entity_dict[rtype] in extid2eid:
       
   208                     return False
       
   209         # entity is ready, replace all relation's extid by eids
       
   210         for rtype in entity_dict:
       
   211             rschema = schema.rschema(rtype)
       
   212             if rschema.inlined:
       
   213                 entity_dict[rtype] = extid2eid[entity_dict[rtype]]
       
   214         return True
       
   215 
       
   216 
       
   217 class ExtEntitiesImporter(object):
       
   218     """This class is responsible for importing externals entities, that is instances of
       
   219     :class:`ExtEntity`, into CubicWeb entities.
       
   220 
       
   221     :param schema: the CubicWeb's instance schema
       
   222     :param store: a CubicWeb `Store`
       
   223     :param extid2eid: optional {extid: eid} dictionary giving information on existing entities. It
       
   224         will be completed during import. You may want to use :func:`cwuri2eid` to build it.
       
   225     :param existing_relation: optional {rtype: set((subj eid, obj eid))} mapping giving information on
       
   226         existing relations of a given type. You may want to use :class:`RelationMapping` to build it.
       
   227     :param  etypes_order_hint: optional ordered iterable on entity types, giving an hint on the order in
       
   228         which they should be attempted to be imported
       
   229     :param  import_log: optional object implementing the :class:`SimpleImportLog` interface to record
       
   230         events occuring during the import
       
   231     :param  raise_on_error: optional boolean flag - default to false, indicating whether errors should
       
   232         be raised or logged. You usually want them to be raised during test but to be logged in
       
   233         production.
       
   234 
       
   235     Instances of this class are meant to import external entities through :meth:`import_entities`
       
   236     which handles a stream of :class:`ExtEntity`. One may then plug arbitrary filters into the
       
   237     external entities stream.
       
   238 
       
   239     .. automethod:: import_entities
       
   240 
       
   241     """
       
   242 
       
   243     def __init__(self, schema, store, extid2eid=None, existing_relations=None,
       
   244                  etypes_order_hint=(), import_log=None, raise_on_error=False):
       
   245         self.schema = schema
       
   246         self.store = store
       
   247         self.extid2eid = extid2eid if extid2eid is not None else {}
       
   248         self.existing_relations = (existing_relations if existing_relations is not None
       
   249                                    else defaultdict(set))
       
   250         self.etypes_order_hint = etypes_order_hint
       
   251         if import_log is None:
       
   252             import_log = SimpleImportLog('<unspecified>')
       
   253         self.import_log = import_log
       
   254         self.raise_on_error = raise_on_error
       
   255         # set of created/updated eids
       
   256         self.created = set()
       
   257         self.updated = set()
       
   258 
       
   259     def import_entities(self, ext_entities):
       
   260         """Import given external entities (:class:`ExtEntity`) stream (usually a generator)."""
       
   261         # {etype: [etype dict]} of entities that are in the import queue
       
   262         queue = {}
       
   263         # order entity dictionaries then create/update them
       
   264         deferred = self._import_entities(ext_entities, queue)
       
   265         # create deferred relations that don't exist already
       
   266         missing_relations = self.prepare_insert_deferred_relations(deferred)
       
   267         self._warn_about_missing_work(queue, missing_relations)
       
   268 
       
   269     def _import_entities(self, ext_entities, queue):
       
   270         extid2eid = self.extid2eid
       
   271         deferred = {}  # non inlined relations that may be deferred
       
   272         self.import_log.record_debug('importing entities')
       
   273         for ext_entity in self.iter_ext_entities(ext_entities, deferred, queue):
       
   274             try:
       
   275                 eid = extid2eid[ext_entity.extid]
       
   276             except KeyError:
       
   277                 self.prepare_insert_entity(ext_entity)
       
   278             else:
       
   279                 if ext_entity.values:
       
   280                     self.prepare_update_entity(ext_entity, eid)
       
   281         return deferred
       
   282 
       
   283     def iter_ext_entities(self, ext_entities, deferred, queue):
       
   284         """Yield external entities in an order which attempts to satisfy
       
   285         schema constraints (inlined / cardinality) and to optimize the import.
       
   286         """
       
   287         schema = self.schema
       
   288         extid2eid = self.extid2eid
       
   289         for ext_entity in ext_entities:
       
   290             # check data in the transitional representation and prepare it for
       
   291             # later insertion in the database
       
   292             for subject_uri, rtype, object_uri in ext_entity.prepare(schema):
       
   293                 deferred.setdefault(rtype, set()).add((subject_uri, object_uri))
       
   294             if not ext_entity.is_ready(extid2eid):
       
   295                 queue.setdefault(ext_entity.etype, []).append(ext_entity)
       
   296                 continue
       
   297             yield ext_entity
       
   298             # check for some entities in the queue that may now be ready. We'll have to restart
       
   299             # search for ready entities until no one is generated
       
   300             new = True
       
   301             while new:
       
   302                 new = False
       
   303                 for etype in self.etypes_order_hint:
       
   304                     if etype in queue:
       
   305                         new_queue = []
       
   306                         for ext_entity in queue[etype]:
       
   307                             if ext_entity.is_ready(extid2eid):
       
   308                                 yield ext_entity
       
   309                                 # may unlock entity previously handled within this loop
       
   310                                 new = True
       
   311                             else:
       
   312                                 new_queue.append(ext_entity)
       
   313                         if new_queue:
       
   314                             queue[etype][:] = new_queue
       
   315                         else:
       
   316                             del queue[etype]
       
   317 
       
   318     def prepare_insert_entity(self, ext_entity):
       
   319         """Call the store to prepare insertion of the given external entity"""
       
   320         eid = self.store.prepare_insert_entity(ext_entity.etype, **ext_entity.values)
       
   321         self.extid2eid[ext_entity.extid] = eid
       
   322         self.created.add(eid)
       
   323         return eid
       
   324 
       
   325     def prepare_update_entity(self, ext_entity, eid):
       
   326         """Call the store to prepare update of the given external entity"""
       
   327         self.store.prepare_update_entity(ext_entity.etype, eid, **ext_entity.values)
       
   328         self.updated.add(eid)
       
   329 
       
   330     def prepare_insert_deferred_relations(self, deferred):
       
   331         """Call the store to insert deferred relations (not handled during insertion/update for
       
   332         entities). Return a list of relations `[(subj ext id, obj ext id)]` that may not be inserted
       
   333         because the target entities don't exists yet.
       
   334         """
       
   335         prepare_insert_relation = self.store.prepare_insert_relation
       
   336         rschema = self.schema.rschema
       
   337         extid2eid = self.extid2eid
       
   338         missing_relations = []
       
   339         for rtype, relations in deferred.items():
       
   340             self.import_log.record_debug('importing %s %s relations' % (len(relations), rtype))
       
   341             symmetric = rschema(rtype).symmetric
       
   342             existing = self.existing_relations[rtype]
       
   343             for subject_uri, object_uri in relations:
       
   344                 try:
       
   345                     subject_eid = extid2eid[subject_uri]
       
   346                     object_eid = extid2eid[object_uri]
       
   347                 except KeyError:
       
   348                     missing_relations.append((subject_uri, rtype, object_uri))
       
   349                     continue
       
   350                 if (subject_eid, object_eid) not in existing:
       
   351                     prepare_insert_relation(subject_eid, rtype, object_eid)
       
   352                     existing.add((subject_eid, object_eid))
       
   353                     if symmetric:
       
   354                         existing.add((object_eid, subject_eid))
       
   355         return missing_relations
       
   356 
       
   357     def _warn_about_missing_work(self, queue, missing_relations):
       
   358         error = self.import_log.record_error
       
   359         if queue:
       
   360             msgs = ["can't create some entities, is there some cycle or "
       
   361                     "missing data?"]
       
   362             for ext_entities in queue.values():
       
   363                 for ext_entity in ext_entities:
       
   364                     msgs.append(str(ext_entity))
       
   365             map(error, msgs)
       
   366             if self.raise_on_error:
       
   367                 raise Exception('\n'.join(msgs))
       
   368         if missing_relations:
       
   369             msgs = ["can't create some relations, is there missing data?"]
       
   370             for subject_uri, rtype, object_uri in missing_relations:
       
   371                 msgs.append("%s %s %s" % (subject_uri, rtype, object_uri))
       
   372             map(error, msgs)
       
   373             if self.raise_on_error:
       
   374                 raise Exception('\n'.join(msgs))
       
   375 
       
   376 
       
   377 class SimpleImportLog(object):
       
   378     """Fake CWDataImport log using a simple text format.
       
   379 
       
   380     Useful to display logs in the UI instead of storing them to the
       
   381     database.
       
   382     """
       
   383 
       
   384     def __init__(self, filename):
       
   385         self.logs = []
       
   386         self.filename = filename
       
   387 
       
   388     def record_debug(self, msg, path=None, line=None):
       
   389         self._log(logging.DEBUG, msg, path, line)
       
   390 
       
   391     def record_info(self, msg, path=None, line=None):
       
   392         self._log(logging.INFO, msg, path, line)
       
   393 
       
   394     def record_warning(self, msg, path=None, line=None):
       
   395         self._log(logging.WARNING, msg, path, line)
       
   396 
       
   397     def record_error(self, msg, path=None, line=None):
       
   398         self._log(logging.ERROR, msg, path, line)
       
   399 
       
   400     def record_fatal(self, msg, path=None, line=None):
       
   401         self._log(logging.FATAL, msg, path, line)
       
   402 
       
   403     def _log(self, severity, msg, path, line):
       
   404         encodedmsg = u'%s\t%s\t%s\t%s' % (severity, self.filename,
       
   405                                           line or u'', msg)
       
   406         self.logs.append(encodedmsg)
       
   407 
       
   408 
       
   409 class HTMLImportLog(SimpleImportLog):
       
   410     """Fake CWDataImport log using a simple HTML format."""
       
   411     def __init__(self, filename):
       
   412         super(HTMLImportLog, self).__init__(xml_escape(filename))
       
   413 
       
   414     def _log(self, severity, msg, path, line):
       
   415         encodedmsg = u'%s\t%s\t%s\t%s<br/>' % (severity, self.filename,
       
   416                                                line or u'', xml_escape(msg))
       
   417         self.logs.append(encodedmsg)