author Sylvain Thénault <>
Tue, 21 Jun 2011 10:57:25 +0200
changeset 7543 570522300e22
parent 7534 d58a9d96aad8
child 7553 935423529f45
permissions -rw-r--r--
[ms, entity metas] add 'actual source' to entities table / base entity metadata cache. Closes #1767090. This is needed since, for entities from 'copy based sources' such as datafeed, we want entity.cw_metainformation() to return as 'source' the datafeed source, not the system source (i.e. the source where the entity is actually stored). For both performance and bootstrapping reasons, we should store this information in the `entities` table and in the _type_source cache.

# copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact --
# This file is part of CubicWeb.
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""datafeed parser for xml generated by cubicweb

Example of mapping for CWEntityXMLParser::

  {u'CWUser': {                                        # EntityType
      (u'in_group', u'subject', u'link'): [            # (rtype, role, action)
          (u'CWGroup', {u'linkattr': u'name'})],       #   -> rules = [(EntityType, options), ...]
      (u'tags', u'object', u'link-or-create'): [       # (...)
          (u'Tag', {u'linkattr': u'name'})],           #   -> ...
      (u'use_email', u'subject', u'copy'): [           # (...)
          (u'EmailAddress', {})]                       #   -> ...
      }
  }
"""

import os.path as osp
from datetime import datetime, timedelta
from urllib import urlencode
from cgi import parse_qs # in urlparse with python >= 2.6

from import todate, totime
from logilab.common.textutils import splitstrip, text_to_dict

from yams.constraints import BASE_CONVERTERS
from yams.schema import role_name as rn

from cubicweb import ValidationError, typed_eid
from cubicweb.server.sources import datafeed

# XXX see cubicweb.cwvreg.YAMS_TO_PY
# XXX see cubicweb.web.views.xmlrss.SERIALIZERS
# Converters are looked up by attribute type name and called with the string
# form of a value to get its typed value (see extract_typed_attrs).
# NOTE(review): the definition of DEFAULT_CONVERTERS itself is not visible in
# this module -- presumably it derives from yams' BASE_CONVERTERS imported
# above; confirm against upstream.
DEFAULT_CONVERTERS['String'] = unicode
DEFAULT_CONVERTERS['Password'] = lambda x: x.encode('utf8')
def convert_date(ustr):
    """Parse a 'YYYY-MM-DD' string and return the result of `todate` on it."""
    parsed = datetime.strptime(ustr, '%Y-%m-%d')
    return todate(parsed)
DEFAULT_CONVERTERS['Date'] = convert_date
def convert_datetime(ustr):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a datetime.

    Anything following the first '.' (fractional seconds, as in
    '%Y-%m-%d %H:%M:%S.mmmmmm') is dropped before parsing.
    """
    stamp = ustr.split('.', 1)[0] if '.' in ustr else ustr
    return datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S')
DEFAULT_CONVERTERS['Datetime'] = convert_datetime
def convert_time(ustr):
    """Parse a 'HH:MM:SS' string and return the result of `totime` on it."""
    parsed = datetime.strptime(ustr, '%H:%M:%S')
    return totime(parsed)
DEFAULT_CONVERTERS['Time'] = convert_time
def convert_interval(ustr):
    """Convert a string holding a number of seconds into a timedelta.

    :param ustr: string representation of an integer number of seconds
    :return: the matching `datetime.timedelta`
    :raise ValueError: if `ustr` is not a valid integer literal
    """
    # `time` is not imported by this module; a duration in seconds is a
    # timedelta (which is imported and accepts the `seconds` keyword)
    return timedelta(seconds=int(ustr))
DEFAULT_CONVERTERS['Interval'] = convert_interval

# Build a dictionary mapping attribute name (rschema.type) to typed value: for
# subject relations of `eschema` found in `stringdict`, the string value is
# passed to the converter registered for the attribute's destination type.
# NOTE(review): this function looks damaged by extraction -- the first `if` has
# lost the left operand of its `and`, and `if rschema == 'eid':` has no body
# (presumably it skipped the eid). Kept as found; restore from upstream.
def extract_typed_attrs(eschema, stringdict, converters=DEFAULT_CONVERTERS):
    typeddict = {}
    for rschema in eschema.subject_relations():
        if and rschema in stringdict:
            if rschema == 'eid':
            attrtype = eschema.destination(rschema)
            typeddict[rschema.type] = converters[attrtype](stringdict[rschema])
    return typeddict

def _parse_entity_etree(parent):
    """Yield an `(item, rels)` couple for each child node of `parent`.

    `item` is a dictionary with the 'cwtype', 'cwuri', 'cwsource' and 'eid'
    keys, plus one unicode value per child node which has no 'role' attribute
    (keyed by the child's tag).

    `rels` maps role -> child tag -> list of related items; child nodes with a
    'role' attribute are parsed recursively and only their items are kept (the
    nested relations are discarded).
    """
    for node in list(parent):
        try:
            item = {'cwtype': unicode(node.tag),
                    'cwuri': node.attrib['cwuri'],
                    'cwsource': node.attrib.get('cwsource'),
                    'eid': typed_eid(node.attrib['eid']),
                    }
        except KeyError:
            # cw < 3.11 compat mode XXX
            # cwuri and eid are read from child nodes instead of attributes
            item = {'cwtype': unicode(node.tag),
                    'cwuri': node.find('cwuri').text,
                    'cwsource': None,
                    'eid': typed_eid(node.find('eid').text),
                    }
        rels = {}
        for child in node:
            role = child.get('role')
            if role:
                # relation
                related = rels.setdefault(role, {}).setdefault(child.tag, [])
                related += [ritem for ritem, _ in _parse_entity_etree(child)]
            else:
                # attribute
                item[child.tag] = unicode(child.text)
        yield item, rels

def rtype_role_rql(rtype, role):
    """Return the RQL snippet relating X (given by the `x` eid argument) to Y
    through `rtype`.

    :param rtype: relation type name
    :param role: role played by X in the relation; 'object' puts X on the
      object side, any other value puts it on the subject side
    :return: a string such as 'X in_group Y WHERE X eid %(x)s'
    """
    if role == 'object':
        return 'Y %s X WHERE X eid %%(x)s' % rtype
    # subject role: without this branch the function returned None
    return 'X %s Y WHERE X eid %%(x)s' % rtype

def _check_no_option(action, options, eid, _):
    if options:
        msg = _("'%s' action doesn't take any options") % action
        raise ValidationError(eid, {rn('options', 'subject'): msg})

def _check_linkattr_option(action, options, eid, _):
    if not 'linkattr' in options:
        msg = _("'%s' action requires 'linkattr' option") % action
        raise ValidationError(eid, {rn('options', 'subject'): msg})

class CWEntityXMLParser(datafeed.DataFeedXMLParser):
    """datafeed parser for the 'xml' entity view"""
    __regid__ = 'cw.entityxml'

    # action name -> function checking the options given for that action; it
    # is called as checker(action, options, eid, _) by add_schema_config
    action_options = {
        'copy': _check_no_option,
        'link-or-create': _check_linkattr_option,
        'link': _check_linkattr_option,
        }
    # wrapped in staticmethod so the module-level function is not bound to the
    # parser instance when accessed through it
    parse_etree = staticmethod(_parse_entity_etree)

    def __init__(self, *args, **kwargs):
        """Initialize the parser, forwarding every argument to the parent
        class, then set up the per-instance dispatch table and caches.
        """
        super(CWEntityXMLParser, self).__init__(*args, **kwargs)
        # action name -> bound method implementing the action
        self.action_methods = {
            'copy': self.related_copy,
            'link-or-create': self.related_link_or_create,
            'link': self.related_link,
            }
        # cache of already parsed items, filled and read by _complete_item
        self._parsed_urls = {}
        # eids consulted by process_item to return early on known entities
        self._processed_entities = set()

    # mapping handling #########################################################

    # NOTE(review): this method looks damaged by extraction: several `try:` and
    # `else:` lines are missing and the right-hand sides of the `rtype =`,
    # `etype =` and `ttype =` assignments are lost. The code is kept as found;
    # restore it from the upstream file before running it.
    def add_schema_config(self, schemacfg, checkonly=False):
        """added CWSourceSchemaConfig, modify mapping accordingly"""
        _ = self._cw._
            rtype =
        except AttributeError:
            msg = _("entity and relation types can't be mapped, only attributes "
                    "or relations")
            raise ValidationError(schemacfg.eid, {rn('cw_for_schema', 'subject'): msg})
        # options are given as text and turned into a dictionary
        if schemacfg.options:
            options = text_to_dict(schemacfg.options)
            options = {}
            # 'role' is mandatory and must be either 'subject' or 'object'
            role = options.pop('role')
            if role not in ('subject', 'object'):
                raise KeyError
        except KeyError:
            msg = _('"role=subject" or "role=object" must be specified in options')
            raise ValidationError(schemacfg.eid, {rn('options', 'subject'): msg})
            # 'action' is mandatory; remaining options are validated by the
            # checker registered for this action in `action_options`
            action = options.pop('action')
            self.action_options[action](action, options, schemacfg.eid, _)
        except KeyError:
            msg = _('"action" must be specified in options; allowed values are '
                    '%s') % ', '.join(self.action_methods)
            raise ValidationError(schemacfg.eid, {rn('options', 'subject'): msg})
        # with checkonly, options are only validated and the mapping is untouched
        if not checkonly:
            if role == 'subject':
                etype =
                ttype =
                etype =
                ttype =
            # mapping: etype -> (rtype, role, action) -> [(ttype, options), ...]
            etyperules = self.source.mapping.setdefault(etype, {})
            etyperules.setdefault((rtype, role, action), []).append(
                (ttype, options) )
            # remember what was added for this config (read by del_schema_config)
            self.source.mapping_idx[schemacfg.eid] = (
                etype, rtype, role, action, ttype)

    def del_schema_config(self, schemacfg, checkonly=False):
        """deleted CWSourceSchemaConfig, modify mapping accordingly

        The rules targeting the entity type recorded in `mapping_idx` for
        `schemacfg` are removed from the source mapping; the whole
        (rtype, role, action) entry is dropped once no rule is left for it.

        :raise KeyError: if `schemacfg.eid` or the matching mapping entry is
          unknown

        NOTE(review): `checkonly` is not honored here, unlike in
        add_schema_config -- confirm whether the mapping should be left
        untouched in that case.
        """
        etype, rtype, role, action, ttype = self.source.mapping_idx[schemacfg.eid]
        etyperules = self.source.mapping[etype]
        key = (rtype, role, action)
        remaining = [rule for rule in etyperules[key] if rule[0] != ttype]
        if remaining:
            # store the filtered list back: it used to be bound to a local
            # name only, leaving the deleted rule active in the mapping
            etyperules[key] = remaining
        else:
            del etyperules[key]

    # import handling ##########################################################

    def process(self, url, raise_on_error=False, partialcommit=True):
        """IDataFeedParser main entry point

        The url is first completed by `complete_url`, then handed to the
        parent class implementation together with the other arguments.
        """
        completed = self.complete_url(url)
        parent = super(CWEntityXMLParser, self)
        parent.process(completed, raise_on_error, partialcommit)

    # XXX suppression support according to source configuration. If set, get all
    # cwuri of entities from this source, and compare with newly imported ones

    # Import one parsed item: get the matching local entity, then apply each
    # mapped (rtype, role, action) rule to the related items found in `rels`.
    # Returns the entity, or None when extid2entity returns None.
    # NOTE(review): this method looks damaged by extraction: `try:` lines (and
    # probably a `continue` after the "relation not found" error) are missing,
    # and `attrs` is computed but not used in the visible code. Kept as found.
    def process_item(self, item, rels):
        # 'cwuri', 'cwtype' and 'cwsource' are removed from `item` here, so only
        # attribute values remain in it afterwards
        entity = self.extid2entity(str(item.pop('cwuri')),  item.pop('cwtype'),
                                   cwsource=item.pop('cwsource'), item=item)
        if entity is None:
            return None
        if entity.eid in self._processed_entities:
            return entity
        if not (self.created_during_pull(entity) or self.updated_during_pull(entity)):
            # XXX check modification date
            attrs = extract_typed_attrs(entity.e_schema, item)
        for (rtype, role, action), rules in self.source.mapping.get(entity.__regid__, {}).iteritems():
                related_items = rels[role][rtype]
            except KeyError:
                self.source.error('relation %s-%s not found in xml export of %s',
                                  rtype, role, entity.__regid__)
                actionmethod = self.action_methods[action]
            except KeyError:
                raise Exception('Unknown action %s' % action)
            actionmethod(entity, rtype, role, related_items, rules)
        return entity

    def before_entity_copy(self, entity, sourceparams):
        """IDataFeedParser callback"""
        # convert the string values of the parsed item into typed values
        # NOTE(review): `attrs` is not used in the visible code -- a following
        # line was probably lost by extraction; confirm against upstream
        attrs = extract_typed_attrs(entity.e_schema, sourceparams['item'])

    # For each related item whose type is targeted by `rules`, the item is
    # completed and imported through process_item; relations are then set from
    # the collected `eids` or cleared.
    # NOTE(review): this method looks damaged by extraction: the docstring is
    # not closed, the `if other_entity is not None:` test has no body
    # (presumably it collected the entity's eid into `eids`) and an `else:`
    # seems to be missing before the _clear_relation call. Kept as found.
    def related_copy(self, entity, rtype, role, others, rules):
        """implementation of 'copy' action

        Takes no option.
        assert not any(x[1] for x in rules), "'copy' action takes no option"
        ttypes = frozenset([x[0] for x in rules])
        eids = [] # local eids
        for item in others:
            if item['cwtype'] in ttypes:
                item, _rels = self._complete_item(item)
                other_entity = self.process_item(item, [])
                if other_entity is not None:
        if eids:
            self._set_relation(entity, rtype, role, eids)
            self._clear_relation(entity, rtype, role, ttypes)

    # Delegates to _related_link for each (ttype, options) rule, searching on
    # the attribute named by the 'linkattr' option.
    # NOTE(review): this method looks damaged by extraction: the docstring is
    # not closed and the _related_link call has lost its last argument and
    # closing parenthesis. Kept as found; restore from upstream.
    def related_link(self, entity, rtype, role, others, rules):
        """implementation of 'link' action

        requires an options to control search of the linked entity.
        for ttype, options in rules:
            assert 'linkattr' in options, (
                "'link' action requires a list of attributes used to "
                "search if the entity already exists")
            self._related_link(entity, rtype, role, ttype, others, [options['linkattr']],

    # Delegates to _related_link for each (ttype, options) rule, searching on
    # the attribute named by the 'linkattr' option.
    # NOTE(review): this method looks damaged by extraction: the docstring is
    # not closed and the _related_link call has lost its last argument and
    # closing parenthesis. Kept as found; restore from upstream.
    def related_link_or_create(self, entity, rtype, role, others, rules):
        """implementation of 'link-or-create' action

        requires an options to control search of the linked entity.
        for ttype, options in rules:
            assert 'linkattr' in options, (
                "'link-or-create' action requires a list of attributes used to "
                "search if the entity already exists")
            self._related_link(entity, rtype, role, ttype, others, [options['linkattr']],

    # Shared implementation of the 'link' and 'link-or-create' actions: for each
    # related item of type `ttype`, look for local entities whose `searchattrs`
    # attributes match the item's values, then set or clear the relation.
    # NOTE(review): this method looks damaged by extraction: the end of the
    # signature is missing (the body reads `create_when_not_found`), and the
    # bodies of `if item['cwtype'] != ttype:` and `elif len(targets) == 1:` as
    # well as some `continue` / `else:` lines are lost. Kept as found.
    def _related_link(self, entity, rtype, role, ttype, others, searchattrs,
        def issubset(x,y):
            return all(z in y for z in x)
        eids = [] # local eids
        for item in others:
            if item['cwtype'] != ttype:
            if not issubset(searchattrs, item):
                # search attributes are missing from the item: fetch the full
                # item from its own url and check again
                item, _rels = self._complete_item(item, False)
                if not issubset(searchattrs, item):
                    # NOTE(review): `%` applies to `item` alone here, so the
                    # two-placeholder format fails at runtime; the arguments
                    # were probably meant to be passed separately as in the
                    # other error() calls
                    self.source.error('missing attribute, got %s expected keys %s'
                                      % item, searchattrs)
            kwargs = dict((str(attr), item[attr]) for attr in searchattrs) # XXX str() needed with python < 2.6
            targets = tuple(self._cw.find_entities(item['cwtype'], **kwargs))
            if len(targets) > 1:
                self.source.error('ambiguous link: found %s entity %s with attributes %s',
                                  len(targets), item['cwtype'], kwargs)
            elif len(targets) == 1:
            elif create_when_not_found:
                eids.append(self._cw.create_entity(item['cwtype'], **kwargs).eid)
                self.source.error('can not find %s entity with attributes %s',
                                  item['cwtype'], kwargs)
        if eids:
            self._set_relation(entity, rtype, role, eids)
            self._clear_relation(entity, rtype, role, (ttype,))

    # Returns the url with a query string rebuilt from the parsed parameters:
    # 'vid' defaults to 'xml' and a 'relation' parameter is derived from the
    # (rtype, role) couples mapped for `etype`. The url is returned untouched
    # when the entity type can't be guessed from it.
    # NOTE(review): this method looks damaged by extraction: the docstring is
    # not closed, the `try:` lines are missing and `if not reldef in relations:`
    # has no body (presumably it appended `reldef` to `relations`). Kept as
    # found; restore from upstream.
    def complete_url(self, url, etype=None):
        """append to the url's query string information about relation that should
        be included in the resulting xml, according to source mapping.

        If etype is not specified, try to guess it using the last path part of
        the url.
            url, qs = url.split('?', 1)
        except ValueError:
            qs = ''
        if etype is None:
                etype = url.rsplit('/', 1)[1]
            except ValueError:
                return url
                etype = self._cw.vreg.case_insensitive_etypes[etype]
            except KeyError:
                return url
        params = parse_qs(qs)
        if not 'vid' in params:
            params['vid'] = ['xml']
        relations = params.setdefault('relation', [])
        for rtype, role, _ in self.source.mapping.get(etype, ()):
            reldef = '%s-%s' % (rtype, role)
            if not reldef in relations:
        return url + '?' + self._cw.build_url_params(**params)

    def _complete_item(self, item, add_relations=True):
        """Return the `(item, rels)` couple parsed from the item's own url.

        The url is built by `complete_url` from the item's 'cwuri' and
        'cwtype'. Results are cached in `self._parsed_urls`, keyed by the
        item's cwuri and the `add_relations` flag.

        NOTE(review): `add_relations` only takes part in the cache key here, it
        is not forwarded to `complete_url` -- confirm this is intended.
        """
        try:
            return self._parsed_urls[(item['cwuri'], add_relations)]
        except KeyError:
            # not parsed yet: fetch and parse it, then remember the result
            itemurl = self.complete_url(item['cwuri'], item['cwtype'])
            item_rels = list(self.parse(itemurl))
            assert len(item_rels) == 1, 'url %s expected to bring back one '\
                   'and only one entity, got %s' % (itemurl, len(item_rels))
            self._parsed_urls[(item['cwuri'], add_relations)] = item_rels[0]
            return item_rels[0]

    def _clear_relation(self, entity, rtype, role, ttypes):
        """Delete the `rtype` relations of `entity` whose other end is of one
        of the given target types.

        Nothing is done when the entity's eid is in ``self.stats['created']``.

        :param entity: entity whose relations should be removed
        :param rtype: relation type name
        :param role: role of `entity` in the relation ('subject' or 'object')
        :param ttypes: sequence of target entity type names
        """
        if entity.eid in self.stats['created']:
            return
        if len(ttypes) > 1:
            typerestr = ', Y is IN(%s)' % ','.join(ttypes)
        else:
            # without this branch the IN() restriction was overwritten and a
            # single target type left `typerestr` undefined
            typerestr = ', Y is %s' % ','.join(ttypes)
        self._cw.execute('DELETE ' + rtype_role_rql(rtype, role) + typerestr,
                         {'x': entity.eid})

    def _set_relation(self, entity, rtype, role, eids):
        """Make the `rtype` relations of `entity` match the given eids.

        Relations to entities which are not in `eids` are deleted, then the
        relations to `eids` which do not exist yet are added.

        :param entity: entity to update
        :param rtype: relation type name
        :param role: role of `entity` in the relation ('subject' or 'object')
        :param eids: non-empty sequence of eids for the other end
        """
        assert eids
        rqlbase = rtype_role_rql(rtype, role)
        eidstr = ','.join(str(eid) for eid in eids)
        self._cw.execute('DELETE %s, NOT Y eid IN (%s)' % (rqlbase, eidstr),
                         {'x': entity.eid})
        # the NOT clause restricts the SET to relations which are not there yet
        if role == 'object':
            rql = 'SET %s, Y eid IN (%s), NOT Y %s X' % (rqlbase, eidstr, rtype)
        else:
            # without this branch the object-role query was overwritten and
            # the subject role left `rql` undefined
            rql = 'SET %s, Y eid IN (%s), NOT X %s Y' % (rqlbase, eidstr, rtype)
        self._cw.execute(rql, {'x': entity.eid})

# Registry hook: registers the objects of this module, then loads the
# module-level HOST_MAPPING from a file of the application home when it exists.
# NOTE(review): this function looks damaged by extraction: the file name given
# to osp.join is empty and the last statement has lost the beginning of its
# call (only the arguments remain). Kept as found; restore from upstream.
def registration_callback(vreg):
    # hand every module-level object to the registry
    vreg.register_all(globals().values(), __name__)
    global HOST_MAPPING
    if vreg.config.apphome:
        host_mapping_file = osp.join(vreg.config.apphome, '')
        if osp.exists(host_mapping_file):
            # SECURITY NOTE(review): eval() executes whatever code the file
            # contains -- only acceptable if the file is fully trusted; the
            # file object is also never explicitly closed
            HOST_MAPPING = eval(file(host_mapping_file).read())
  'using host mapping %s from %s', HOST_MAPPING, host_mapping_file)