sobjects/parsers.py
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Wed, 08 Jun 2011 17:10:39 +0200
changeset 7474 7dc405ad7bf3
parent 7470 c3fc72ee720a
child 7478 9e213becdcf4
permissions -rw-r--r--
[datafeed cwxml parser] cache processed urls/entities to avoid unnecessary http requests and processing

# copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""datafeed parser for xml generated by cubicweb

Example of mapping for CWEntityXMLParser::

  {u'CWUser': {                                        # EntityType
      (u'in_group', u'subject', u'link'): [            # (rtype, role, action)
          (u'CWGroup', {u'linkattr': u'name'})],       #   -> rules = [(EntityType, options), ...]
      (u'tags', u'object', u'link-or-create'): [       # (...)
          (u'Tag', {u'linkattr': u'name'})],           #   -> ...
      (u'use_email', u'subject', u'copy'): [           # (...)
          (u'EmailAddress', {})]                       #   -> ...
      }
   }

"""

import os.path as osp
from datetime import datetime, timedelta

from logilab.common.date import todate, totime
from logilab.common.textutils import splitstrip, text_to_dict

from yams.constraints import BASE_CONVERTERS
from yams.schema import role_name as rn

from cubicweb import ValidationError, typed_eid
from cubicweb.server.sources import datafeed

# XXX see cubicweb.cwvreg.YAMS_TO_PY
# XXX see cubicweb.web.views.xmlrss.SERIALIZERS
# Final (attribute) type name -> function converting the unicode string read
# from the xml export into the python value stored in the local entity.
DEFAULT_CONVERTERS = BASE_CONVERTERS.copy()
DEFAULT_CONVERTERS['String'] = unicode
DEFAULT_CONVERTERS['Password'] = lambda x: x.encode('utf8')
def convert_date(ustr):
    """parse a '%Y-%m-%d' string and convert the result with `todate`"""
    return todate(datetime.strptime(ustr, '%Y-%m-%d'))
DEFAULT_CONVERTERS['Date'] = convert_date
def convert_datetime(ustr):
    """parse a '%Y-%m-%d %H:%M:%S' string into a datetime

    A fractional seconds part after a '.' is accepted but dropped.
    """
    if '.' in ustr: # assume %Y-%m-%d %H:%M:%S.mmmmmm
        ustr = ustr.split('.',1)[0]
    return datetime.strptime(ustr, '%Y-%m-%d %H:%M:%S')
DEFAULT_CONVERTERS['Datetime'] = convert_datetime
def convert_time(ustr):
    """parse a '%H:%M:%S' string and convert the result with `totime`"""
    return totime(datetime.strptime(ustr, '%H:%M:%S'))
DEFAULT_CONVERTERS['Time'] = convert_time
def convert_interval(ustr):
    """convert a string holding an integer number of seconds to a timedelta"""
    # `timedelta` is the type taking a `seconds` argument; `time` is not
    # imported in this module and has no such parameter anyway
    return timedelta(seconds=int(ustr))
DEFAULT_CONVERTERS['Interval'] = convert_interval

def extract_typed_attrs(eschema, stringdict, converters=DEFAULT_CONVERTERS):
    """return typed values for the attributes of `eschema` found in `stringdict`

    :param eschema: entity schema whose final subject relations (attributes)
      are looked up in `stringdict`
    :param stringdict: attribute name -> unicode value, or None for an
      attribute exported as an empty element (see `_parse_entity_etree`)
    :param converters: attribute type name -> conversion function
    :return: {attribute name: value}; the 'eid' relation is skipped, other
      values are converted with `converters` except None, which is kept as is
    """
    typeddict = {}
    for rschema in eschema.subject_relations():
        if rschema.final and rschema in stringdict:
            if rschema == 'eid':
                continue
            value = stringdict[rschema]
            # None means "no value": don't feed it to the converter, which
            # expects a string
            if value is not None:
                attrtype = eschema.destination(rschema)
                value = converters[attrtype](value)
            typeddict[rschema.type] = value
    return typeddict

def _parse_entity_etree(parent):
    """yield an (item, rels) pair for each entity element below `parent`

    `item` is a dict holding 'cwtype' (the element tag), 'cwuri', 'eid' and
    one entry per attribute child element: its unicode text, or None for an
    empty element. `rels` is {role: {rtype: [related item, ...]}}, built from
    the child elements carrying a 'role' attribute.
    """
    for node in list(parent):
        try:
            item = {'cwtype': unicode(node.tag),
                    'cwuri': node.attrib['cwuri'],
                    'eid': typed_eid(node.attrib['eid']),
                    }
        except KeyError:
            # cw < 3.11 compat mode XXX
            item = {'cwtype': unicode(node.tag),
                    'cwuri': node.find('cwuri').text,
                    'eid': typed_eid(node.find('eid').text),
                    }
        rels = {}
        for child in node:
            role = child.get('role')
            if role:
                # relation
                related = rels.setdefault(role, {}).setdefault(child.tag, [])
                related += [ritem for ritem, _ in _parse_entity_etree(child)]
            elif child.text is None:
                # attribute exported as an empty element: no value. Calling
                # unicode() here would store the string u'None' instead
                item[child.tag] = None
            else:
                # attribute
                item[child.tag] = unicode(child.text)
        yield item, rels

def rtype_role_rql(rtype, role):
    """build the RQL relation clause selecting Y related to entity X by `rtype`

    X is the entity whose eid is given as the `x` query argument. It is the
    object of the relation when `role` is 'object', and the subject otherwise.
    """
    subj, obj = ('Y', 'X') if role == 'object' else ('X', 'Y')
    return '%s %s %s WHERE X eid %%(x)s' % (subj, rtype, obj)


def _check_no_option(action, options, eid, _):
    if options:
        msg = _("'%s' action doesn't take any options") % action
        raise ValidationError(eid, {rn('options', 'subject'): msg})

def _check_linkattr_option(action, options, eid, _):
    if not 'linkattr' in options:
        msg = _("'%s' action requires 'linkattr' option") % action
        raise ValidationError(eid, {rn('options', 'subject'): msg})


class CWEntityXMLParser(datafeed.DataFeedXMLParser):
    """datafeed parser for the 'xml' entity view

    Entities read from the xml export are created/updated locally by
    `process_item`. Their relations are then handled according to
    `self.source.mapping`, shaped as
    {etype: {(rtype, role, action): [(ttype, options), ...]}}
    (see the module docstring for an example).
    """
    __regid__ = 'cw.entityxml'

    # action name -> function checking the options given for that action,
    # called as check(action, options, eid, _)
    action_options = {
        'copy': _check_no_option,
        'link-or-create': _check_linkattr_option,
        'link': _check_linkattr_option,
        }
    parse_etree = staticmethod(_parse_entity_etree)

    def __init__(self, *args, **kwargs):
        super(CWEntityXMLParser, self).__init__(*args, **kwargs)
        # action name -> bound method implementing it, called from
        # process_item as method(entity, rtype, role, related_items, rules)
        self.action_methods = {
            'copy': self.related_copy,
            'link-or-create': self.related_link_or_create,
            'link': self.related_link,
            }
        # (cwuri, add_relations) -> (item, rels), filled by _complete_item so
        # that each url is fetched and parsed at most once
        self._parsed_urls = {}
        # eids of local entities already handled by process_item
        self._processed_entities = set()

    # mapping handling #########################################################

    def add_schema_config(self, schemacfg, checkonly=False):
        """added CWSourceSchemaConfig, modify mapping accordingly

        The options of `schemacfg` must specify `role` ('subject' or
        'object') and `action` (a key of `action_options`). The remaining
        options are checked by the action's checker and stored with the
        rule. When `checkonly` is true, only the checks are done.

        :raise ValidationError: if `schemacfg` doesn't map a relation or if
          its options are invalid
        """
        _ = self._cw._
        try:
            rtype = schemacfg.schema.rtype.name
        except AttributeError:
            # presumably an entity or relation type rather than a relation
            # definition, as the message below says
            msg = _("entity and relation types can't be mapped, only attributes "
                    "or relations")
            raise ValidationError(schemacfg.eid, {rn('cw_for_schema', 'subject'): msg})
        if schemacfg.options:
            options = text_to_dict(schemacfg.options)
        else:
            options = {}
        try:
            role = options.pop('role')
            if role not in ('subject', 'object'):
                raise KeyError
        except KeyError:
            msg = _('"role=subject" or "role=object" must be specified in options')
            raise ValidationError(schemacfg.eid, {rn('options', 'subject'): msg})
        try:
            action = options.pop('action')
            self.action_options[action](action, options, schemacfg.eid, _)
        except KeyError:
            # missing or unknown action
            msg = _('"action" must be specified in options; allowed values are '
                    '%s') % ', '.join(self.action_methods)
            raise ValidationError(schemacfg.eid, {rn('options', 'subject'): msg})
        if not checkonly:
            # rules are keyed by the entity type on the `role` side of the
            # relation; ttype is the type on the other side
            if role == 'subject':
                etype = schemacfg.schema.stype.name
                ttype = schemacfg.schema.otype.name
            else:
                etype = schemacfg.schema.otype.name
                ttype = schemacfg.schema.stype.name
            etyperules = self.source.mapping.setdefault(etype, {})
            etyperules.setdefault((rtype, role, action), []).append(
                (ttype, options) )
            self.source.mapping_idx[schemacfg.eid] = (
                etype, rtype, role, action, ttype)

    def del_schema_config(self, schemacfg, checkonly=False):
        """deleted CWSourceSchemaConfig, modify mapping accordingly

        As in `add_schema_config`, nothing is modified when `checkonly` is
        true.
        """
        if checkonly:
            return
        etype, rtype, role, action, ttype = self.source.mapping_idx.pop(schemacfg.eid)
        etyperules = self.source.mapping[etype]
        key = (rtype, role, action)
        # store the filtered list back into the mapping, dropping the key
        # once no rule is left for it
        rules = [x for x in etyperules[key] if x[0] != ttype]
        if rules:
            etyperules[key] = rules
        else:
            del etyperules[key]

    # import handling ##########################################################

    # XXX suppression support according to source configuration. If set, get all
    # cwuri of entities from this source, and compare with newly imported ones

    def process_item(self, item, rels):
        """create or update the local entity described by `item`, then handle
        its relations according to the source mapping

        :param item: dict built by `parse_etree` ('cwtype', 'cwuri', 'eid' and
          attribute values); it is not modified
        :param rels: {role: {rtype: [related item, ...]}} built by
          `parse_etree`
        :return: the local entity, or None if `extid2entity` returned None
        :raise Exception: if the mapping references an unknown action
        """
        # work on a copy: `item` may come from the `_parsed_urls` cache (see
        # `_complete_item`), and popping keys out of it would break any later
        # use of the same cached item
        item = dict(item)
        entity = self.extid2entity(str(item.pop('cwuri')),  item.pop('cwtype'),
                                   item=item)
        if entity is None:
            return None
        if entity.eid in self._processed_entities:
            return entity
        self._processed_entities.add(entity.eid)
        if not (self.created_during_pull(entity) or self.updated_during_pull(entity)):
            self.notify_updated(entity)
            item.pop('eid')
            # XXX check modification date
            attrs = extract_typed_attrs(entity.e_schema, item)
            entity.set_attributes(**attrs)
        for (rtype, role, action), rules in self.source.mapping.get(entity.__regid__, {}).iteritems():
            try:
                related_items = rels[role][rtype]
            except KeyError:
                self.source.error('relation %s-%s not found in xml export of %s',
                                  rtype, role, entity.__regid__)
                continue
            try:
                actionmethod = self.action_methods[action]
            except KeyError:
                raise Exception('Unknown action %s' % action)
            actionmethod(entity, rtype, role, related_items, rules)
        return entity

    def before_entity_copy(self, entity, sourceparams):
        """IDataFeedParser callback"""
        attrs = extract_typed_attrs(entity.e_schema, sourceparams['item'])
        entity.cw_edited.update(attrs)

    def related_copy(self, entity, rtype, role, others, rules):
        """implementation of 'copy' action

        Takes no option.
        """
        assert not any(x[1] for x in rules), "'copy' action takes no option"
        ttypes = frozenset([x[0] for x in rules])
        eids = [] # local eids
        for item in others:
            if item['cwtype'] in ttypes:
                item, rels = self._complete_item(item)
                # give process_item the relations fetched with the item: it
                # looks up rels[role][rtype] for each mapping rule of the
                # copied entity type
                other_entity = self.process_item(item, rels)
                if other_entity is not None:
                    eids.append(other_entity.eid)
        if eids:
            self._set_relation(entity, rtype, role, eids)
        else:
            self._clear_relation(entity, rtype, role, ttypes)

    def related_link(self, entity, rtype, role, others, rules):
        """implementation of 'link' action

        requires an options to control search of the linked entity.
        """
        for ttype, options in rules:
            assert 'linkattr' in options, (
                "'link' action requires a list of attributes used to "
                "search if the entity already exists")
            self._related_link(entity, rtype, role, ttype, others, [options['linkattr']],
                               create_when_not_found=False)

    def related_link_or_create(self, entity, rtype, role, others, rules):
        """implementation of 'link-or-create' action

        requires an options to control search of the linked entity.
        """
        for ttype, options in rules:
            assert 'linkattr' in options, (
                "'link-or-create' action requires a list of attributes used to "
                "search if the entity already exists")
            self._related_link(entity, rtype, role, ttype, others, [options['linkattr']],
                               create_when_not_found=True)

    def _related_link(self, entity, rtype, role, ttype, others, searchattrs,
                      create_when_not_found):
        """link `entity` to the local entities of type `ttype` matching the
        `searchattrs` values of the related items in `others`

        An entity is created from those values when none is found and
        `create_when_not_found` is true. The relation is cleared when no
        local entity could be linked.
        """
        def issubset(x,y):
            return all(z in y for z in x)
        eids = [] # local eids
        for item in others:
            if item['cwtype'] != ttype:
                continue
            if not issubset(searchattrs, item):
                item, _rels = self._complete_item(item, False)
                if not issubset(searchattrs, item):
                    # pass values as logging arguments: formatting a dict
                    # with two %s would raise TypeError
                    self.source.error('missing attribute, got %s expected keys %s',
                                      item, searchattrs)
                    continue
            kwargs = dict((str(attr), item[attr]) for attr in searchattrs) # XXX str() needed with python < 2.6
            targets = tuple(self._cw.find_entities(item['cwtype'], **kwargs))
            if len(targets) > 1:
                self.source.error('ambiguous link: found %s entity %s with attributes %s',
                                  len(targets), item['cwtype'], kwargs)
            elif len(targets) == 1:
                eids.append(targets[0].eid)
            elif create_when_not_found:
                eids.append(self._cw.create_entity(item['cwtype'], **kwargs).eid)
            else:
                self.source.error('can not find %s entity with attributes %s',
                                  item['cwtype'], kwargs)
        if eids:
            self._set_relation(entity, rtype, role, eids)
        else:
            self._clear_relation(entity, rtype, role, (ttype,))

    def _complete_item(self, item, add_relations=True):
        """return the (item, rels) pair parsed from the xml export of the
        entity at `item['cwuri']`

        When `add_relations` is true, the relations mapped for the item's
        type are requested too. Results are cached in `_parsed_urls` and must
        not be modified by callers.
        """
        try:
            return self._parsed_urls[(item['cwuri'], add_relations)]
        except KeyError:
            itemurl = item['cwuri'] + '?vid=xml'
            if add_relations:
                for rtype, role, _ in self.source.mapping.get(item['cwtype'], ()):
                    itemurl += '&relation=%s_%s' % (rtype, role)
            item_rels = list(self.parse(itemurl))
            assert len(item_rels) == 1, 'url %s expected to bring back one '\
                   'and only one entity, got %s' % (itemurl, len(item_rels))
            self._parsed_urls[(item['cwuri'], add_relations)] = item_rels[0]
            return item_rels[0]

    def _clear_relation(self, entity, rtype, role, ttypes):
        """delete `entity`'s `rtype` relations (as `role`) to entities of the
        types in `ttypes`, unless `entity` was created during this pull"""
        if entity.eid not in self.stats['created']:
            if len(ttypes) > 1:
                typerestr = ', Y is IN(%s)' % ','.join(ttypes)
            else:
                typerestr = ', Y is %s' % ','.join(ttypes)
            self._cw.execute('DELETE ' + rtype_role_rql(rtype, role) + typerestr,
                             {'x': entity.eid})

    def _set_relation(self, entity, rtype, role, eids):
        """make `entity`'s `rtype` relations (as `role`) point exactly to the
        entities in `eids`, which must not be empty"""
        assert eids
        rqlbase = rtype_role_rql(rtype, role)
        # eids are integers, so building the IN() list by hand is safe
        eidstr = ','.join(str(eid) for eid in eids)
        self._cw.execute('DELETE %s, NOT Y eid IN (%s)' % (rqlbase, eidstr),
                         {'x': entity.eid})
        # only add the relations that don't already exist
        if role == 'object':
            rql = 'SET %s, Y eid IN (%s), NOT Y %s X' % (rqlbase, eidstr, rtype)
        else:
            rql = 'SET %s, Y eid IN (%s), NOT X %s Y' % (rqlbase, eidstr, rtype)
        self._cw.execute(rql, {'x': entity.eid})

def registration_callback(vreg):
    """register this module's objects, then load the optional host mapping

    The module-level HOST_MAPPING is reset to an empty dict, then replaced
    by the evaluated content of `hostmapping.py` in the instance home when
    that file exists.
    """
    vreg.register_all(globals().values(), __name__)
    global HOST_MAPPING
    HOST_MAPPING = {}
    if vreg.config.apphome:
        host_mapping_file = osp.join(vreg.config.apphome, 'hostmapping.py')
        if osp.exists(host_mapping_file):
            # NOTE(review): eval() runs arbitrary code; only acceptable because
            # the file lives in the instance home, which should be writable by
            # the administrator only
            stream = open(host_mapping_file)
            try:
                HOST_MAPPING = eval(stream.read())
            finally:
                # close explicitly instead of leaking the handle until
                # garbage collection (no `with`: python < 2.6 is still
                # supported by this module)
                stream.close()
            vreg.info('using host mapping %s from %s', HOST_MAPPING, host_mapping_file)