# HG changeset patch # User Sylvain Thénault # Date 1297271184 -3600 # Node ID 822f2530570d4bc63c9d0522aed1e11b309c59ba # Parent 037a0277db0aadd392e82e4663b6d0c848e648d7 [datafeed] add parser to import cubicweb xml this parser is configurable through a mapping, which basically tells which entity's relation should be copied. Multiple actions to control backport of data are available: * 'copy': copy the external entities locally (though marked as coming from the data feed source) * 'link': find similar entity internaly (e.g. for states) * 'link-or-create': try to find a similar entity internaly, and create it if necessary (won't be marked as coming from the data feed source, e.g. for tags) diff -r 037a0277db0a -r 822f2530570d sobjects/parsers.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sobjects/parsers.py Wed Feb 09 18:06:24 2011 +0100 @@ -0,0 +1,336 @@ +# copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . +"""datafeed parser for xml generated by cubicweb""" + +import urllib2 +import StringIO +from cookielib import CookieJar +from datetime import datetime, timedelta + +from lxml import etree + +from logilab.common.date import todate, totime +from logilab.common.textutils import splitstrip, text_to_dict + +from yams.constraints import BASE_CONVERTERS +from yams.schema import role_name as rn + +from cubicweb import ValidationError, typed_eid +from cubicweb.server.sources import datafeed + + +# see cubicweb.web.views.xmlrss.SERIALIZERS +DEFAULT_CONVERTERS = BASE_CONVERTERS.copy() +DEFAULT_CONVERTERS['String'] = unicode +DEFAULT_CONVERTERS['Password'] = lambda x: x.encode('utf8') +def convert_date(ustr): + return todate(datetime.strptime(ustr, '%Y-%m-%d')) +DEFAULT_CONVERTERS['Date'] = convert_date +def convert_datetime(ustr): + return datetime.strptime(ustr, '%Y-%m-%d %H:%M:%S') +DEFAULT_CONVERTERS['Datetime'] = convert_datetime +def convert_time(ustr): + return totime(datetime.strptime(ustr, '%H:%M:%S')) +DEFAULT_CONVERTERS['Time'] = convert_time +def convert_interval(ustr): + return time(seconds=int(ustr)) +DEFAULT_CONVERTERS['Interval'] = convert_interval + +# use a cookie enabled opener to use session cookie if any +_OPENER = urllib2.build_opener(urllib2.HTTPCookieProcessor(CookieJar())) + +def extract_typed_attrs(eschema, stringdict, converters=DEFAULT_CONVERTERS): + typeddict = {} + for rschema in eschema.subject_relations(): + if rschema.final and rschema in stringdict: + if rschema == 'eid': + continue + attrtype = eschema.destination(rschema) + typeddict[rschema.type] = converters[attrtype](stringdict[rschema]) + return typeddict + +def _entity_etree(parent): + for node in list(parent): + item = {'cwtype': unicode(node.tag), + 'cwuri': node.attrib['cwuri'], + 'eid': typed_eid(node.attrib['eid']), + } + rels = {} + for child in node: + role = child.get('role') + if child.get('role'): + # relation + related = rels.setdefault(role, {}).setdefault(child.tag, []) + related += [ritem for ritem, _ in _entity_etree(child)] + else: + # attribute + item[child.tag] = unicode(child.text) + yield item, rels + +def build_search_rql(etype, attrs): + restrictions = [] + for attr in attrs: + restrictions.append('X %(attr)s %%(%(attr)s)s' % {'attr': attr}) + return 'Any X WHERE X is %s, %s' % (etype, ','.join(restrictions)) + +def rtype_role_rql(rtype, role): + if role == 'object': + return 'Y %s X WHERE X eid %%(x)s' % rtype + else: + return 'X %s Y WHERE X eid %%(x)s' % rtype + + +def _check_no_option(action, options, eid, _): + if options: + msg = _("'%s' action doesn't take any options") % action + raise ValidationError(eid, {rn('options', 'subject'): msg}) + +def _check_linkattr_option(action, options, eid, _): + if not 'linkattr' in options: + msg = _("'%s' action require 'linkattr' option") % action + raise ValidationError(eid, {rn('options', 'subject'): msg}) + + +class CWEntityXMLParser(datafeed.DataFeedParser): + """datafeed parser for the 'xml' entity view""" + __regid__ = 'cw.entityxml' + + action_options = { + 'copy': _check_no_option, + 'link-or-create': _check_linkattr_option, + 'link': _check_linkattr_option, + } + + def __init__(self, *args, **kwargs): + super(CWEntityXMLParser, self).__init__(*args, **kwargs) + self.action_methods = { + 'copy': self.related_copy, + 'link-or-create': self.related_link_or_create, + 'link': self.related_link, + } + + # mapping handling ######################################################### + + def add_schema_config(self, schemacfg, checkonly=False): + """added CWSourceSchemaConfig, modify mapping accordingly""" + _ = self._cw._ + try: + rtype = schemacfg.schema.rtype.name + except AttributeError: + msg = _("entity and relation types can't be mapped, only attributes " + "or relations") + raise ValidationError(schemacfg.eid, {rn('cw_for_schema', 'subject'): msg}) + if schemacfg.options: + options = text_to_dict(schemacfg.options) + else: + options = {} + try: + role = options.pop('role') + if role not in ('subject', 'object'): + raise KeyError + except KeyError: + msg = _('"role=subject" or "role=object" must be specified in options') + raise ValidationError(schemacfg.eid, {rn('options', 'subject'): msg}) + try: + action = options.pop('action') + self.action_options[action](action, options, schemacfg.eid, _) + except KeyError: + msg = _('"action" must be specified in options; allowed values are ' + '%s') % ', '.join(self.action_methods) + raise ValidationError(schemacfg.eid, {rn('options', 'subject'): msg}) + if not checkonly: + if role == 'subject': + etype = schemacfg.schema.stype.name + ttype = schemacfg.schema.otype.name + else: + etype = schemacfg.schema.otype.name + ttype = schemacfg.schema.stype.name + etyperules = self.source.mapping.setdefault(etype, {}) + etyperules.setdefault((rtype, role, action), []).append( + (ttype, options) ) + self.source.mapping_idx[schemacfg.eid] = ( + etype, rtype, role, action, ttype) + + def del_schema_config(self, schemacfg, checkonly=False): + """deleted CWSourceSchemaConfig, modify mapping accordingly""" + etype, rtype, role, action, ttype = self.source.mapping_idx[schemacfg.eid] + rules = self.source.mapping[etype][(rtype, role, action)] + rules = [x for x in rules if not x[0] == ttype] + if not rules: + del self.source.mapping[etype][(rtype, role, action)] + + # import handling ########################################################## + + def process(self, url, partialcommit=True): + """IDataFeedParser main entry point""" + # XXX suppression support according to source configuration. If set, get + # all cwuri of entities from this source, and compare with newly + # imported ones + for item, rels in self.parse(url): + self.process_item(item, rels) + if partialcommit: + # commit+set_pool instead of commit(reset_pool=False) to let + # other a chance to get our pool + self._cw.commit() + self._cw.set_pool() + + def parse(self, url): + if not url.startswith('http'): + stream = StringIO.StringIO(url) + else: + self.source.info('GET %s', url) + stream = _OPENER.open(url) + return _entity_etree(etree.parse(stream).getroot()) + + def process_one(self, url): + # XXX assert len(root.children) == 1 + for item, rels in self.parse(url): + return self.process_item(item, rels) + + def process_item(self, item, rels): + entity = self.extid2entity(str(item.pop('cwuri')), + item.pop('cwtype'), + item=item) + if not (self.created_during_pull(entity) + or self.updated_during_pull(entity)): + self.notify_updated(entity) + item.pop('eid') + # XXX check modification date + attrs = extract_typed_attrs(entity.e_schema, item) + entity.set_attributes(**attrs) + for (rtype, role, action), rules in self.source.mapping.get(entity.__regid__, {}).iteritems(): + try: + rel = rels[role][rtype] + except KeyError: + self.source.error('relation %s-%s doesn\'t seem exported in %s xml', + rtype, role, entity.__regid__) + continue + try: + actionmethod = self.action_methods[action] + except KeyError: + raise Exception('Unknown action %s' % action) + actionmethod(entity, rtype, role, rel, rules) + return entity + + def before_entity_copy(self, entity, sourceparams): + """IDataFeedParser callback""" + attrs = extract_typed_attrs(entity.e_schema, sourceparams['item']) + entity.cw_edited.update(attrs) + + def related_copy(self, entity, rtype, role, value, rules): + """implementation of 'copy' action + + Takes no option. + """ + assert not any(x[1] for x in rules), "'copy' action takes no option" + ttypes = set([x[0] for x in rules]) + value = [item for item in value if item['cwtype'] in ttypes] + eids = [] # local eids + if not value: + self._clear_relation(entity, rtype, role, ttypes) + return + for item in value: + eids.append(self.process_one(self._complete_url(item)).eid) + self._set_relation(entity, rtype, role, eids) + + def related_link(self, entity, rtype, role, value, rules): + """implementation of 'link' action + + requires an options to control search of the linked entity. + """ + for ttype, options in rules: + assert 'linkattr' in options, ( + "'link-or-create' action require a list of attributes used to " + "search if the entity already exists") + self._related_link(entity, rtype, role, ttype, value, [options['linkattr']], + self._log_not_found) + + def related_link_or_create(self, entity, rtype, role, value, rules): + """implementation of 'link-or-create' action + + requires an options to control search of the linked entity. + """ + for ttype, options in rules: + assert 'linkattr' in options, ( + "'link-or-create' action require a list of attributes used to " + "search if the entity already exists") + self._related_link(entity, rtype, role, ttype, value, [options['linkattr']], + self._create_not_found) + + def _log_not_found(self, entity, rtype, role, ritem, searchvalues): + self.source.error('can find %s entity with attributes %s', + ritem['cwtype'], searchvalues) + + def _create_not_found(self, entity, rtype, role, ritem, searchvalues): + return self._cw.create_entity(ritem['cwtype'], **searchvalues).eid + + def _related_link(self, entity, rtype, role, ttype, value, searchattrs, + notfound_callback): + eids = [] # local eids + for item in value: + if item['cwtype'] != ttype: + continue + if not all(attr in item for attr in searchattrs): + # need to fetch related entity's xml + ritems = list(self.parse(self._complete_url(item, False))) + assert len(ritems) == 1, 'unexpected xml' + ritem = ritems[0][0] # list of 2-uples + assert all(attr in ritem for attr in searchattrs), \ + 'missing attribute, got %s expected keys %s' % (item, searchattrs) + else: + ritem = item + kwargs = dict((attr, ritem[attr]) for attr in searchattrs) + rql = build_search_rql(item['cwtype'], kwargs) + rset = self._cw.execute(rql, kwargs) + if rset: + assert len(rset) == 1 + eids.append(rset[0][0]) + else: + eid = notfound_callback(entity, rtype, role, ritem, kwargs) + if eid is not None: + eids.append(eid) + if not eids: + self._clear_relation(entity, rtype, role, (ttype,)) + else: + self._set_relation(entity, rtype, role, eids) + + def _complete_url(self, item, add_relations=True): + itemurl = item['cwuri'] + '?vid=xml' + for rtype, role, _ in self.source.mapping.get(item['cwtype'], ()): + itemurl += '&relation=%s_%s' % (rtype, role) + return itemurl + + def _clear_relation(self, entity, rtype, role, ttypes): + if entity.eid not in self.stats['created']: + if len(ttypes) > 1: + typerestr = ', Y is IN(%s)' % ','.join(ttypes) + else: + typerestr = ', Y is %s' % ','.join(ttypes) + self._cw.execute('DELETE ' + rtype_role_rql(rtype, role) + typerestr, + {'x': entity.eid}) + + def _set_relation(self, entity, rtype, role, eids): + eidstr = ','.join(str(eid) for eid in eids) + rql = rtype_role_rql(rtype, role) + self._cw.execute('DELETE %s, NOT Y eid IN (%s)' % (rql, eidstr), + {'x': entity.eid}) + if role == 'object': + rql = 'SET %s, Y eid IN (%s), NOT Y %s X' % (rql, eidstr, rtype) + else: + rql = 'SET %s, Y eid IN (%s), NOT X %s Y' % (rql, eidstr, rtype) + self._cw.execute(rql, {'x': entity.eid}) diff -r 037a0277db0a -r 822f2530570d sobjects/test/data/schema.py --- a/sobjects/test/data/schema.py Wed Feb 09 18:06:19 2011 +0100 +++ b/sobjects/test/data/schema.py Wed Feb 09 18:06:24 2011 +0100 @@ -1,4 +1,4 @@ -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -15,10 +15,7 @@ # # You should have received a copy of the GNU Lesser General Public License along # with CubicWeb. If not, see . -""" - -""" -from yams.buildobjs import RelationDefinition +from yams.buildobjs import EntityType, RelationDefinition, String, SubjectRelation class comments(RelationDefinition): subject = 'Comment' @@ -26,3 +23,6 @@ cardinality='1*' composite='object' +class Tag(EntityType): + name = String(unique=True) + tags = SubjectRelation('CWUser') diff -r 037a0277db0a -r 822f2530570d sobjects/test/unittest_parsers.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sobjects/test/unittest_parsers.py Wed Feb 09 18:06:24 2011 +0100 @@ -0,0 +1,170 @@ +# copyright 2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . + +from datetime import datetime + +from cubicweb.devtools.testlib import CubicWebTC + +from cubicweb.sobjects.parsers import CWEntityXMLParser + +orig_parse = CWEntityXMLParser.parse + +def parse(url): + try: + url = RELATEDXML[url.split('?')[0]] + except KeyError: + pass + return orig_parse(url) + +def setUpModule(): + CWEntityXMLParser.parse = staticmethod(parse) + +def tearDownModule(): + CWEntityXMLParser.parse = orig_parse + + +BASEXML = ''.join(u''' + + + sthenault + toto + 2011-01-25 14:14:06 + 2010-01-22 10:27:59 + 2011-01-25 14:14:06 + + + + + + + + + + + + + +'''.splitlines()) + +RELATEDXML ={ + 'http://pouet.org/eid/6': u''' + + +
syt@logilab.fr
+ 2010-04-13 14:35:56 + 2010-04-13 14:35:56 +
+
+''', + 'http://pouet.org/eid/7': u''' + + + users + + +''', + 'http://pouet.org/eid/8': u''' + + + unknown + + +''', + 'http://pouet.org/eid/9': u''' + + + hop + + +''', + 'http://pouet.org/eid/10': u''' + + + unknown + + +''', + } + +class CWEntityXMLParserTC(CubicWebTC): + def setup_database(self): + req = self.request() + source = req.create_entity('CWSource', name=u'myfeed', type=u'datafeed', + parser=u'cw.entityxml', url=BASEXML) + self.commit() + source.init_mapping([(('CWUser', 'use_email', '*'), + u'role=subject\naction=copy'), + (('CWUser', 'in_group', '*'), + u'role=subject\naction=link\nlinkattr=name'), + (('*', 'tags', 'CWUser'), + u'role=object\naction=link-or-create\nlinkattr=name'), + ]) + req.create_entity('Tag', name=u'hop') + + def test_actions(self): + dfsource = self.repo.sources_by_uri['myfeed'] + self.assertEqual(dfsource.mapping, + {u'CWUser': { + (u'in_group', u'subject', u'link'): [ + (u'CWGroup', {u'linkattr': u'name'})], + (u'tags', u'object', u'link-or-create'): [ + (u'Tag', {u'linkattr': u'name'})], + (u'use_email', u'subject', u'copy'): [ + (u'EmailAddress', {})] + } + }) + session = self.repo.internal_session() + stats = dfsource.pull_data(session, force=True) + self.assertEqual(sorted(stats.keys()), ['created', 'updated']) + self.assertEqual(len(stats['created']), 2) + self.assertEqual(stats['updated'], set()) + + user = self.execute('CWUser X WHERE X login "sthenault"').get_entity(0, 0) + self.assertEqual(user.creation_date, datetime(2010, 01, 22, 10, 27, 59)) + self.assertEqual(user.modification_date, datetime(2011, 01, 25, 14, 14, 06)) + self.assertEqual(user.cwuri, 'http://pouet.org/eid/5') + self.assertEqual(user.cw_source[0].name, 'myfeed') + self.assertEqual(len(user.use_email), 1) + # copy action + email = user.use_email[0] + self.assertEqual(email.address, 'syt@logilab.fr') + self.assertEqual(email.cwuri, 'http://pouet.org/eid/6') + self.assertEqual(email.cw_source[0].name, 'myfeed') + # link action + self.assertFalse(self.execute('CWGroup X WHERE X name "unknown"')) + groups = sorted([g.name for g in user.in_group]) + self.assertEqual(groups, ['users']) + # link or create action + tags = sorted([t.name for t in user.reverse_tags]) + self.assertEqual(tags, ['hop', 'unknown']) + tag = self.execute('Tag X WHERE X name "unknown"').get_entity(0, 0) + self.assertEqual(tag.cwuri, 'http://testing.fr/cubicweb/eid/%s' % tag.eid) + self.assertEqual(tag.cw_source[0].name, 'system') + + stats = dfsource.pull_data(session, force=True) + self.assertEqual(stats['created'], set()) + self.assertEqual(len(stats['updated']), 2) + self.repo._type_source_cache.clear() + self.repo._extid_cache.clear() + stats = dfsource.pull_data(session, force=True) + self.assertEqual(stats['created'], set()) + self.assertEqual(len(stats['updated']), 2) + +if __name__ == '__main__': + from logilab.common.testlib import unittest_main + unittest_main() diff -r 037a0277db0a -r 822f2530570d web/views/xmlrss.py --- a/web/views/xmlrss.py Wed Feb 09 18:06:19 2011 +0100 +++ b/web/views/xmlrss.py Wed Feb 09 18:06:24 2011 +0100 @@ -112,6 +112,10 @@ continue self.w(u' <%s role="%s">\n' % (rtype, role)) for related in entity.related(rtype, role, entities=True): + # XXX put unique attributes as xml attribute, they are much + # probably used to search existing entities in client data feed, + # and putting it here may avoid an extra request to get those + # attributes values self.w(u' <%s eid="%s" cwuri="%s"/>\n' % (related.e_schema, related.eid, xml_escape(related.cwuri))) @@ -271,7 +275,6 @@ if entity.creator: self._marker('dc:creator', entity.dc_creator()) - def _marker(self, marker, value): if value: self.w(u' <%s>%s\n' % (marker, xml_escape(value), marker))