dataimport/importer.py
changeset 10460 d260722f2453
child 10461 37644c518705
equal deleted inserted replaced
10459:5ccc3bd8927e 10460:d260722f2453
       
     1 # copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This program is free software: you can redistribute it and/or modify it under
       
     5 # the terms of the GNU Lesser General Public License as published by the Free
       
     6 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     7 # any later version.
       
     8 #
       
     9 # This program is distributed in the hope that it will be useful, but WITHOUT
       
    10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    11 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
       
    12 # details.
       
    13 #
       
    14 # You should have received a copy of the GNU Lesser General Public License along
       
    15 # with this program. If not, see <http://www.gnu.org/licenses/>.
       
    16 """This module contains tools to programmatically import external data into CubicWeb. It's designed
       
on top of the store concept to leverage the possibility of code sharing across various data import
       
    18 needs.
       
    19 
       
    20 The following classes are defined:
       
    21 
       
    22 * :class:`ExtEntity`: some intermediate representation of data to import, using external identifier
       
    23   but no eid,
       
    24 
       
* :class:`ExtEntitiesImporter`: class responsible for turning ExtEntity's extid to eid, and creating
  or updating CubicWeb entities accordingly (using a Store).
       
    27 
       
    28 What is left to do is to write a class or a function that will yield external entities from some
       
data source (e.g. RDF, CSV) which will be case dependent (the *generator*). You may then plug
       
    30 arbitrary filters into the external entities stream between the generator and the importer, allowing
       
to have some generic generators whose generated content is refined by specific filters.
       
    32 
       
    33 .. code-block:: python
       
    34 
       
    35     ext_entities = fetch(<source>) # function yielding external entities
       
    36     log = SimpleImportLog('<source file/url/whatever>')
       
    importer = ExtEntitiesImporter(schema, store, import_log=log)
       
    38     importer.import_entities(ext_entities)
       
    39 
       
    40 Here are the two classes that you'll have to deal with, and maybe to override:
       
    41 
       
    42 .. autoclass:: cubicweb.dataimport.importer.ExtEntitiesImporter
       
    43 .. autoclass:: cubicweb.dataimport.importer.ExtEntity
       
    44 """
       
    45 
       
    46 from collections import defaultdict
       
    47 import logging
       
    48 
       
    49 from logilab.mtconverter import xml_escape
       
    50 
       
    51 
       
    52 def cwuri2eid(cnx, etypes, source_eid=None):
       
    53     """Return a dictionary mapping cwuri to eid for entities of the given entity types and / or
       
    54     source.
       
    55     """
       
    56     assert source_eid or etypes, 'no entity types nor source specified'
       
    57     rql = 'Any U, X WHERE X cwuri U'
       
    58     args = {}
       
    59     if len(etypes) == 1:
       
    60         rql += ', X is %s' % etypes[0]
       
    61     elif etypes:
       
    62         rql += ', X is IN (%s)' % ','.join(etypes)
       
    63     if source_eid is not None:
       
    64         rql += ', X cw_source S, S eid %(s)s'
       
    65         args['s'] = source_eid
       
    66     return dict(cnx.execute(rql, args))
       
    67 
       
    68 
       
    69 class RelationMapping(object):
       
    70     """Read-only mapping from relation type to set of related (subject, object) eids.
       
    71 
       
    72     If `source` is specified, only returns relations implying entities from
       
    73     this source.
       
    74     """
       
    75 
       
    76     def __init__(self, cnx, source=None):
       
    77         self.cnx = cnx
       
    78         self._rql_template = 'Any S,O WHERE S {} O'
       
    79         self._kwargs = {}
       
    80         if source is not None:
       
    81             self._rql_template += ', S cw_source SO, O cw_source SO, SO eid %(s)s'
       
    82             self._kwargs['s'] = source.eid
       
    83 
       
    84     def __getitem__(self, rtype):
       
    85         """Return a set of (subject, object) eids already related by `rtype`"""
       
    86         rql = self._rql_template.format(rtype)
       
    87         return set(tuple(x) for x in self.cnx.execute(rql, self._kwargs))
       
    88 
       
    89 
       
    90 class ExtEntity(object):
       
    91     """Transitional representation of an entity for use in data importer.
       
    92 
       
    93     An external entity has the following properties:
       
    94 
       
    95     * ``extid`` (external id), an identifier for the ext entity,
       
    96     * ``etype`` (entity type), a string which must be the name of one entity type in the schema
       
    97       (eg. ``'Person'``, ``'Animal'``, ...),
       
    98     * ``values``, a dictionary whose keys are attribute or relation names from the schema (eg.
       
    99       ``'first_name'``, ``'friend'``), and whose values are *sets*
       
   100 
       
   101     For instance:
       
   102 
       
   103     ..code-block::python
       
   104 
       
   105         ext_entity.extid = 'http://example.org/person/debby'
       
   106         ext_entity.etype = 'Person'
       
   107         ext_entity.values = {'first_name': set([u"Deborah", u"Debby"]),
       
   108                             'friend': set(['http://example.org/person/john'])}
       
   109 
       
   110     """
       
   111 
       
   112     def __init__(self, etype, extid, values=None):
       
   113         self.etype = etype
       
   114         self.extid = extid
       
   115         if values is None:
       
   116             values = {}
       
   117         self.values = values
       
   118         self._schema = None
       
   119 
       
   120     def __repr__(self):
       
   121         return '<%s %s %s>' % (self.etype, self.extid, self.values)
       
   122 
       
   123     def iter_rdefs(self):
       
   124         """Yield (key, rtype, role) defined in `.values` dict, with:
       
   125 
       
   126         * `key` is the original key in `.values` (i.e. the relation type or a 2-uple (relation type,
       
   127           role))
       
   128 
       
   129         * `rtype` is a yams relation type, expected to be found in the schema (attribute or
       
   130           relation)
       
   131 
       
   132         * `role` is the role of the entity in the relation, 'subject' or 'object'
       
   133 
       
   134         Iteration is done on a copy of the keys so values may be inserted/deleted during it.
       
   135         """
       
   136         for key in list(self.values):
       
   137             if isinstance(key, tuple):
       
   138                 rtype, role = key
       
   139                 assert role in ('subject', 'object'), key
       
   140                 yield key, rtype, role
       
   141             else:
       
   142                 yield key, key, 'subject'
       
   143 
       
   144     def prepare(self, schema):
       
   145         """Prepare an external entity for later insertion:
       
   146 
       
   147         * ensure attributes and inlined relations have a single value
       
   148         * turn set([value]) into value and remove key associated to empty set
       
   149         * remove non inlined relations and return them as a [(e1key, relation, e2key)] list
       
   150 
       
   151         Return a list of non inlined relations that may be inserted later, each relations defined by
       
   152         a 3-tuple (subject extid, relation type, object extid).
       
   153 
       
   154         Take care the importer may call this method several times.
       
   155         """
       
   156         assert self._schema is None, 'prepare() has already been called for %s' % self
       
   157         self._schema = schema
       
   158         eschema = schema.eschema(self.etype)
       
   159         deferred = []
       
   160         entity_dict = self.values
       
   161         for key, rtype, role in self.iter_rdefs():
       
   162             rschema = schema.rschema(rtype)
       
   163             if rschema.final or (rschema.inlined and role == 'subject'):
       
   164                 assert len(entity_dict[key]) <= 1, \
       
   165                     "more than one value for %s: %s (%s)" % (rtype, entity_dict[key], self.extid)
       
   166                 if entity_dict[key]:
       
   167                     entity_dict[rtype] = entity_dict[key].pop()
       
   168                     if key != rtype:
       
   169                         del entity_dict[key]
       
   170                     if (rschema.final and eschema.has_metadata(rtype, 'format')
       
   171                             and not rtype + '_format' in entity_dict):
       
   172                         entity_dict[rtype + '_format'] = u'text/plain'
       
   173                 else:
       
   174                     del entity_dict[key]
       
   175             else:
       
   176                 for target_extid in entity_dict.pop(key):
       
   177                     if role == 'subject':
       
   178                         deferred.append((self.extid, rtype, target_extid))
       
   179                     else:
       
   180                         deferred.append((target_extid, rtype, self.extid))
       
   181         return deferred
       
   182 
       
   183     def is_ready(self, extid2eid):
       
   184         """Return True if the ext entity is ready, i.e. has all the URIs used in inlined relations
       
   185         currently existing.
       
   186         """
       
   187         assert self._schema, 'prepare() method should be called first on %s' % self
       
   188         # as .prepare has been called, we know that .values only contains subject relation *type* as
       
   189         # key (no more (rtype, role) tuple)
       
   190         schema = self._schema
       
   191         entity_dict = self.values
       
   192         for rtype in entity_dict:
       
   193             rschema = schema.rschema(rtype)
       
   194             if not rschema.final:
       
   195                 # .prepare() should drop other cases from the entity dict
       
   196                 assert rschema.inlined
       
   197                 if not entity_dict[rtype] in extid2eid:
       
   198                     return False
       
   199         # entity is ready, replace all relation's extid by eids
       
   200         for rtype in entity_dict:
       
   201             rschema = schema.rschema(rtype)
       
   202             if rschema.inlined:
       
   203                 entity_dict[rtype] = extid2eid[entity_dict[rtype]]
       
   204         return True
       
   205 
       
   206 
       
   207 class ExtEntitiesImporter(object):
       
   208     """This class is responsible for importing externals entities, that is instances of
       
   209     :class:`ExtEntity`, into CubicWeb entities.
       
   210 
       
   211     Parameters:
       
   212 
       
   213     * `schema`: the CubicWeb's instance schema
       
   214 
       
   215     * `store`: a CubicWeb `Store`
       
   216 
       
   217     * `extid2eid`: optional {extid: eid} dictionary giving information on existing entities. It
       
   218     will be completed during import. You may want to use :func:`cwuri2eid` to build it.
       
   219 
       
   220     * `existing_relation`: optional {rtype: set((subj eid, obj eid))} mapping giving information on
       
   221     existing relations of a given type. You may want to use :class:`RelationMapping` to build it.
       
   222 
       
   223     * `etypes_order_hint`: optional ordered iterable on entity types, giving an hint on the order in
       
   224       which they should be attempted to be imported
       
   225 
       
   226     * `import_log`: optional object implementing the :class:`SimpleImportLog` interface to record
       
   227       events occuring during the import
       
   228 
       
   229     * `raise_on_error`: optional boolean flag - default to false, indicating whether errors should
       
   230       be raised or logged. You usually want them to be raised during test but to be logged in
       
   231       production.
       
   232     """
       
   233 
       
   234     def __init__(self, schema, store, extid2eid=None, existing_relations=None,
       
   235                  etypes_order_hint=(), import_log=None, raise_on_error=False):
       
   236         self.schema = schema
       
   237         self.store = store
       
   238         self.extid2eid = extid2eid if extid2eid is not None else {}
       
   239         self.existing_relations = (existing_relations if existing_relations is not None
       
   240                                    else defaultdict(set))
       
   241         self.etypes_order_hint = etypes_order_hint
       
   242         if import_log is None:
       
   243             import_log = SimpleImportLog('<unspecified>')
       
   244         self.import_log = import_log
       
   245         self.raise_on_error = raise_on_error
       
   246         # set of created/updated eids
       
   247         self.created = set()
       
   248         self.updated = set()
       
   249 
       
   250     def import_entities(self, ext_entities):
       
   251         """Import given external entities (:class:`ExtEntity`) stream (usually a generator)."""
       
   252         # {etype: [etype dict]} of entities that are in the import queue
       
   253         queue = {}
       
   254         # order entity dictionaries then create/update them
       
   255         deferred = self._import_entities(ext_entities, queue)
       
   256         # create deferred relations that don't exist already
       
   257         missing_relations = self.prepare_insert_deferred_relations(deferred)
       
   258         self._warn_about_missing_work(queue, missing_relations)
       
   259 
       
   260     def _import_entities(self, ext_entities, queue):
       
   261         extid2eid = self.extid2eid
       
   262         deferred = {}  # non inlined relations that may be deferred
       
   263         self.import_log.record_debug('importing entities')
       
   264         for ext_entity in self.iter_ext_entities(ext_entities, deferred, queue):
       
   265             try:
       
   266                 eid = extid2eid[ext_entity.extid]
       
   267             except KeyError:
       
   268                 self.prepare_insert_entity(ext_entity)
       
   269             else:
       
   270                 if ext_entity.values:
       
   271                     self.prepare_update_entity(ext_entity, eid)
       
   272         return deferred
       
   273 
       
   274     def iter_ext_entities(self, ext_entities, deferred, queue):
       
   275         """Yield external entities in an order which attempts to satisfy
       
   276         schema constraints (inlined / cardinality) and to optimize the import.
       
   277         """
       
   278         schema = self.schema
       
   279         extid2eid = self.extid2eid
       
   280         for ext_entity in ext_entities:
       
   281             # check data in the transitional representation and prepare it for
       
   282             # later insertion in the database
       
   283             for subject_uri, rtype, object_uri in ext_entity.prepare(schema):
       
   284                 deferred.setdefault(rtype, set()).add((subject_uri, object_uri))
       
   285             if not ext_entity.is_ready(extid2eid):
       
   286                 queue.setdefault(ext_entity.etype, []).append(ext_entity)
       
   287                 continue
       
   288             yield ext_entity
       
   289             # check for some entities in the queue that may now be ready. We'll have to restart
       
   290             # search for ready entities until no one is generated
       
   291             new = True
       
   292             while new:
       
   293                 new = False
       
   294                 for etype in self.etypes_order_hint:
       
   295                     if etype in queue:
       
   296                         new_queue = []
       
   297                         for ext_entity in queue[etype]:
       
   298                             if ext_entity.is_ready(extid2eid):
       
   299                                 yield ext_entity
       
   300                                 # may unlock entity previously handled within this loop
       
   301                                 new = True
       
   302                             else:
       
   303                                 new_queue.append(ext_entity)
       
   304                         if new_queue:
       
   305                             queue[etype][:] = new_queue
       
   306                         else:
       
   307                             del queue[etype]
       
   308 
       
   309     def prepare_insert_entity(self, ext_entity):
       
   310         """Call the store to prepare insertion of the given external entity"""
       
   311         eid = self.store.prepare_insert_entity(ext_entity.etype, **ext_entity.values)
       
   312         self.extid2eid[ext_entity.extid] = eid
       
   313         self.created.add(eid)
       
   314         return eid
       
   315 
       
   316     def prepare_update_entity(self, ext_entity, eid):
       
   317         """Call the store to prepare update of the given external entity"""
       
   318         self.store.prepare_update_entity(ext_entity.etype, eid, **ext_entity.values)
       
   319         self.updated.add(eid)
       
   320 
       
   321     def prepare_insert_deferred_relations(self, deferred):
       
   322         """Call the store to insert deferred relations (not handled during insertion/update for
       
   323         entities). Return a list of relations `[(subj ext id, obj ext id)]` that may not be inserted
       
   324         because the target entities don't exists yet.
       
   325         """
       
   326         prepare_insert_relation = self.store.prepare_insert_relation
       
   327         rschema = self.schema.rschema
       
   328         extid2eid = self.extid2eid
       
   329         missing_relations = []
       
   330         for rtype, relations in deferred.items():
       
   331             self.import_log.record_debug('importing %s %s relations' % (len(relations), rtype))
       
   332             symmetric = rschema(rtype).symmetric
       
   333             existing = self.existing_relations[rtype]
       
   334             for subject_uri, object_uri in relations:
       
   335                 try:
       
   336                     subject_eid = extid2eid[subject_uri]
       
   337                     object_eid = extid2eid[object_uri]
       
   338                 except KeyError:
       
   339                     missing_relations.append((subject_uri, rtype, object_uri))
       
   340                     continue
       
   341                 if (subject_eid, object_eid) not in existing:
       
   342                     prepare_insert_relation(subject_eid, rtype, object_eid)
       
   343                     existing.add((subject_eid, object_eid))
       
   344                     if symmetric:
       
   345                         existing.add((object_eid, subject_eid))
       
   346         return missing_relations
       
   347 
       
   348     def _warn_about_missing_work(self, queue, missing_relations):
       
   349         error = self.import_log.record_error
       
   350         if queue:
       
   351             msgs = ["can't create some entities, is there some cycle or "
       
   352                     "missing data?"]
       
   353             for ext_entities in queue.values():
       
   354                 for ext_entity in ext_entities:
       
   355                     msgs.append(str(ext_entity))
       
   356             map(error, msgs)
       
   357             if self.raise_on_error:
       
   358                 raise Exception('\n'.join(msgs))
       
   359         if missing_relations:
       
   360             msgs = ["can't create some relations, is there missing data?"]
       
   361             for subject_uri, rtype, object_uri in missing_relations:
       
   362                 msgs.append("%s %s %s" % (subject_uri, rtype, object_uri))
       
   363             map(error, msgs)
       
   364             if self.raise_on_error:
       
   365                 raise Exception('\n'.join(msgs))
       
   366 
       
   367 
       
   368 class SimpleImportLog(object):
       
   369     """Fake CWDataImport log using a simple text format.
       
   370 
       
   371     Useful to display logs in the UI instead of storing them to the
       
   372     database.
       
   373     """
       
   374 
       
   375     def __init__(self, filename):
       
   376         self.logs = []
       
   377         self.filename = filename
       
   378 
       
   379     def record_debug(self, msg, path=None, line=None):
       
   380         self._log(logging.DEBUG, msg, path, line)
       
   381 
       
   382     def record_info(self, msg, path=None, line=None):
       
   383         self._log(logging.INFO, msg, path, line)
       
   384 
       
   385     def record_warning(self, msg, path=None, line=None):
       
   386         self._log(logging.WARNING, msg, path, line)
       
   387 
       
   388     def record_error(self, msg, path=None, line=None):
       
   389         self._log(logging.ERROR, msg, path, line)
       
   390 
       
   391     def record_fatal(self, msg, path=None, line=None):
       
   392         self._log(logging.FATAL, msg, path, line)
       
   393 
       
   394     def _log(self, severity, msg, path, line):
       
   395         encodedmsg = u'%s\t%s\t%s\t%s' % (severity, self.filename,
       
   396                                           line or u'', msg)
       
   397         self.logs.append(encodedmsg)
       
   398 
       
   399 
       
   400 class HTMLImportLog(SimpleImportLog):
       
   401     """Fake CWDataImport log using a simple HTML format."""
       
   402     def __init__(self, filename):
       
   403         super(HTMLImportLog, self).__init__(xml_escape(filename))
       
   404 
       
   405     def _log(self, severity, msg, path, line):
       
   406         encodedmsg = u'%s\t%s\t%s\t%s<br/>' % (severity, self.filename,
       
   407                                                line or u'', xml_escape(msg))
       
   408         self.logs.append(encodedmsg)