[datafeed] add parser to import cubicweb xml
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Wed, 09 Feb 2011 18:06:24 +0100
changeset 6960 822f2530570d
parent 6959 037a0277db0a
child 6961 686c59dfc401
[datafeed] add parser to import cubicweb xml. This parser is configurable through a mapping, which basically tells which of an entity's relations should be copied. Multiple actions to control backport of data are available: * 'copy': copy the external entities locally (though marked as coming from the data feed source) * 'link': find a similar entity internally (e.g. for states) * 'link-or-create': try to find a similar entity internally, and create it if necessary (won't be marked as coming from the data feed source, e.g. for tags)
sobjects/parsers.py
sobjects/test/data/schema.py
sobjects/test/unittest_parsers.py
web/views/xmlrss.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sobjects/parsers.py	Wed Feb 09 18:06:24 2011 +0100
@@ -0,0 +1,336 @@
+# copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+"""datafeed parser for xml generated by cubicweb"""
+
+import urllib2
+import StringIO
+from cookielib import CookieJar
+from datetime import datetime, timedelta
+
+from lxml import etree
+
+from logilab.common.date import todate, totime
+from logilab.common.textutils import splitstrip, text_to_dict
+
+from yams.constraints import BASE_CONVERTERS
+from yams.schema import role_name as rn
+
+from cubicweb import ValidationError, typed_eid
+from cubicweb.server.sources import datafeed
+
+
+# see cubicweb.web.views.xmlrss.SERIALIZERS
+DEFAULT_CONVERTERS = BASE_CONVERTERS.copy()
+DEFAULT_CONVERTERS['String'] = unicode
+DEFAULT_CONVERTERS['Password'] = lambda x: x.encode('utf8')
+def convert_date(ustr):
+    return todate(datetime.strptime(ustr, '%Y-%m-%d'))
+DEFAULT_CONVERTERS['Date'] = convert_date
+def convert_datetime(ustr):
+    return datetime.strptime(ustr, '%Y-%m-%d %H:%M:%S')
+DEFAULT_CONVERTERS['Datetime'] = convert_datetime
+def convert_time(ustr):
+    return totime(datetime.strptime(ustr, '%H:%M:%S'))
+DEFAULT_CONVERTERS['Time'] = convert_time
+def convert_interval(ustr):  # parse an integer number of seconds into a timedelta
+    return timedelta(seconds=int(ustr))
+DEFAULT_CONVERTERS['Interval'] = convert_interval
+
+# use a cookie enabled opener to use session cookie if any
+_OPENER = urllib2.build_opener(urllib2.HTTPCookieProcessor(CookieJar()))
+
+def extract_typed_attrs(eschema, stringdict, converters=DEFAULT_CONVERTERS):
+    typeddict = {}
+    for rschema in eschema.subject_relations():
+        if rschema.final and rschema in stringdict:
+            if rschema == 'eid':
+                continue
+            attrtype = eschema.destination(rschema)
+            typeddict[rschema.type] = converters[attrtype](stringdict[rschema])
+    return typeddict
+
+def _entity_etree(parent):
+    for node in list(parent):
+        item = {'cwtype': unicode(node.tag),
+                'cwuri': node.attrib['cwuri'],
+                'eid': typed_eid(node.attrib['eid']),
+                }
+        rels = {}
+        for child in node:
+            role = child.get('role')
+            if child.get('role'):
+                # relation
+                related = rels.setdefault(role, {}).setdefault(child.tag, [])
+                related += [ritem for ritem, _ in _entity_etree(child)]
+            else:
+                # attribute
+                item[child.tag] = unicode(child.text)
+        yield item, rels
+
+def build_search_rql(etype, attrs):
+    restrictions = []
+    for attr in attrs:
+        restrictions.append('X %(attr)s %%(%(attr)s)s' % {'attr': attr})
+    return 'Any X WHERE X is %s, %s' % (etype, ','.join(restrictions))
+
+def rtype_role_rql(rtype, role):
+    if role == 'object':
+        return 'Y %s X WHERE X eid %%(x)s' % rtype
+    else:
+        return 'X %s Y WHERE X eid %%(x)s' % rtype
+
+
+def _check_no_option(action, options, eid, _):
+    if options:
+        msg = _("'%s' action doesn't take any options") % action
+        raise ValidationError(eid, {rn('options', 'subject'): msg})
+
+def _check_linkattr_option(action, options, eid, _):
+    if not 'linkattr' in options:
+        msg = _("'%s' action require 'linkattr' option") % action
+        raise ValidationError(eid, {rn('options', 'subject'): msg})
+
+
+class CWEntityXMLParser(datafeed.DataFeedParser):
+    """datafeed parser for the 'xml' entity view"""
+    __regid__ = 'cw.entityxml'
+
+    action_options = {
+        'copy': _check_no_option,
+        'link-or-create': _check_linkattr_option,
+        'link': _check_linkattr_option,
+        }
+
+    def __init__(self, *args, **kwargs):
+        super(CWEntityXMLParser, self).__init__(*args, **kwargs)
+        self.action_methods = {
+            'copy': self.related_copy,
+            'link-or-create': self.related_link_or_create,
+            'link': self.related_link,
+            }
+
+    # mapping handling #########################################################
+
+    def add_schema_config(self, schemacfg, checkonly=False):
+        """added CWSourceSchemaConfig, modify mapping accordingly"""
+        _ = self._cw._
+        try:
+            rtype = schemacfg.schema.rtype.name
+        except AttributeError:
+            msg = _("entity and relation types can't be mapped, only attributes "
+                    "or relations")
+            raise ValidationError(schemacfg.eid, {rn('cw_for_schema', 'subject'): msg})
+        if schemacfg.options:
+            options = text_to_dict(schemacfg.options)
+        else:
+            options = {}
+        try:
+            role = options.pop('role')
+            if role not in ('subject', 'object'):
+                raise KeyError
+        except KeyError:
+            msg = _('"role=subject" or "role=object" must be specified in options')
+            raise ValidationError(schemacfg.eid, {rn('options', 'subject'): msg})
+        try:
+            action = options.pop('action')
+            self.action_options[action](action, options, schemacfg.eid, _)
+        except KeyError:
+            msg = _('"action" must be specified in options; allowed values are '
+                    '%s') % ', '.join(self.action_methods)
+            raise ValidationError(schemacfg.eid, {rn('options', 'subject'): msg})
+        if not checkonly:
+            if role == 'subject':
+                etype = schemacfg.schema.stype.name
+                ttype = schemacfg.schema.otype.name
+            else:
+                etype = schemacfg.schema.otype.name
+                ttype = schemacfg.schema.stype.name
+            etyperules = self.source.mapping.setdefault(etype, {})
+            etyperules.setdefault((rtype, role, action), []).append(
+                (ttype, options) )
+            self.source.mapping_idx[schemacfg.eid] = (
+                etype, rtype, role, action, ttype)
+
+    def del_schema_config(self, schemacfg, checkonly=False):
+        """deleted CWSourceSchemaConfig, modify mapping accordingly"""
+        etype, rtype, role, action, ttype = self.source.mapping_idx[schemacfg.eid]
+        rules = self.source.mapping[etype][(rtype, role, action)]
+        rules[:] = [x for x in rules if x[0] != ttype]  # filter the stored list in place
+        if not rules:
+            del self.source.mapping[etype][(rtype, role, action)]
+
+    # import handling ##########################################################
+
+    def process(self, url, partialcommit=True):
+        """IDataFeedParser main entry point: import each entity parsed from `url`"""
+        # XXX suppression support according to source configuration. If set, get
+        # all cwuri of entities from this source, and compare with newly
+        # imported ones
+        for item, rels in self.parse(url):
+            self.process_item(item, rels)
+            if partialcommit:
+                # commit+set_pool instead of commit(reset_pool=False) to give
+                # others a chance to get our pool
+                self._cw.commit()
+                self._cw.set_pool()
+
+    def parse(self, url):
+        if not url.startswith('http'):
+            stream = StringIO.StringIO(url)
+        else:
+            self.source.info('GET %s', url)
+            stream = _OPENER.open(url)
+        return _entity_etree(etree.parse(stream).getroot())
+
+    def process_one(self, url):
+        # XXX assert len(root.children) == 1
+        for item, rels in self.parse(url):
+            return self.process_item(item, rels)
+
+    def process_item(self, item, rels):
+        entity = self.extid2entity(str(item.pop('cwuri')),
+                                   item.pop('cwtype'),
+                                   item=item)
+        if not (self.created_during_pull(entity)
+                or self.updated_during_pull(entity)):
+            self.notify_updated(entity)
+            item.pop('eid')
+            # XXX check modification date
+            attrs = extract_typed_attrs(entity.e_schema, item)
+            entity.set_attributes(**attrs)
+        for (rtype, role, action), rules in self.source.mapping.get(entity.__regid__, {}).iteritems():
+            try:
+                rel = rels[role][rtype]
+            except KeyError:
+                self.source.error('relation %s-%s doesn\'t seem exported in %s xml',
+                                  rtype, role, entity.__regid__)
+                continue
+            try:
+                actionmethod = self.action_methods[action]
+            except KeyError:
+                raise Exception('Unknown action %s' % action)
+            actionmethod(entity, rtype, role, rel, rules)
+        return entity
+
+    def before_entity_copy(self, entity, sourceparams):
+        """IDataFeedParser callback"""
+        attrs = extract_typed_attrs(entity.e_schema, sourceparams['item'])
+        entity.cw_edited.update(attrs)
+
+    def related_copy(self, entity, rtype, role, value, rules):
+        """implementation of 'copy' action
+
+        Takes no option.
+        """
+        assert not any(x[1] for x in rules), "'copy' action takes no option"
+        ttypes = set([x[0] for x in rules])
+        value = [item for item in value if item['cwtype'] in ttypes]
+        eids = [] # local eids
+        if not value:
+            self._clear_relation(entity, rtype, role, ttypes)
+            return
+        for item in value:
+            eids.append(self.process_one(self._complete_url(item)).eid)
+        self._set_relation(entity, rtype, role, eids)
+
+    def related_link(self, entity, rtype, role, value, rules):
+        """implementation of 'link' action
+
+        requires a 'linkattr' option to control search of the linked entity.
+        """
+        for ttype, options in rules:
+            assert 'linkattr' in options, (
+                "'link' action requires a list of attributes used to "
+                "search if the entity already exists")
+            self._related_link(entity, rtype, role, ttype, value, [options['linkattr']],
+                               self._log_not_found)
+
+    def related_link_or_create(self, entity, rtype, role, value, rules):
+        """implementation of 'link-or-create' action
+
+        requires a 'linkattr' option to control search of the linked entity.
+        """
+        for ttype, options in rules:
+            assert 'linkattr' in options, (
+                "'link-or-create' action require a list of attributes used to "
+                "search if the entity already exists")
+            self._related_link(entity, rtype, role, ttype, value, [options['linkattr']],
+                               self._create_not_found)
+
+    def _log_not_found(self, entity, rtype, role, ritem, searchvalues):  # log only, returns None
+        self.source.error("can't find %s entity with attributes %s",
+                          ritem['cwtype'], searchvalues)
+
+    def _create_not_found(self, entity, rtype, role, ritem, searchvalues):
+        return self._cw.create_entity(ritem['cwtype'], **searchvalues).eid
+
+    def _related_link(self, entity, rtype, role, ttype, value, searchattrs,
+                      notfound_callback):
+        eids = [] # local eids
+        for item in value:
+            if item['cwtype'] != ttype:
+                continue
+            if not all(attr in item for attr in searchattrs):
+                # need to fetch related entity's xml
+                ritems = list(self.parse(self._complete_url(item, False)))
+                assert len(ritems) == 1, 'unexpected xml'
+                ritem = ritems[0][0] # list of 2-uples
+                assert all(attr in ritem for attr in searchattrs), \
+                       'missing attribute, got %s expected keys %s' % (item, searchattrs)
+            else:
+                ritem = item
+            kwargs = dict((attr, ritem[attr]) for attr in searchattrs)
+            rql = build_search_rql(item['cwtype'], kwargs)
+            rset = self._cw.execute(rql, kwargs)
+            if rset:
+                assert len(rset) == 1
+                eids.append(rset[0][0])
+            else:
+                eid = notfound_callback(entity, rtype, role, ritem, kwargs)
+                if eid is not None:
+                    eids.append(eid)
+        if not eids:
+            self._clear_relation(entity, rtype, role, (ttype,))
+        else:
+            self._set_relation(entity, rtype, role, eids)
+
+    def _complete_url(self, item, add_relations=True):
+        # mapped relations are only requested when add_relations is true
+        rules = self.source.mapping.get(item['cwtype'], ()) if add_relations else ()
+        relations = ''.join('&relation=%s_%s' % (rtype, role) for rtype, role, _ in rules)
+        return item['cwuri'] + '?vid=xml' + relations
+
+    def _clear_relation(self, entity, rtype, role, ttypes):
+        if entity.eid not in self.stats['created']:
+            if len(ttypes) > 1:
+                typerestr = ', Y is IN(%s)' % ','.join(ttypes)
+            else:
+                typerestr = ', Y is %s' % ','.join(ttypes)
+            self._cw.execute('DELETE ' + rtype_role_rql(rtype, role) + typerestr,
+                             {'x': entity.eid})
+
+    def _set_relation(self, entity, rtype, role, eids):
+        eidstr = ','.join(str(eid) for eid in eids)
+        rql = rtype_role_rql(rtype, role)
+        self._cw.execute('DELETE %s, NOT Y eid IN (%s)' % (rql, eidstr),
+                         {'x': entity.eid})
+        if role == 'object':
+            rql = 'SET %s, Y eid IN (%s), NOT Y %s X' % (rql, eidstr, rtype)
+        else:
+            rql = 'SET %s, Y eid IN (%s), NOT X %s Y' % (rql, eidstr, rtype)
+        self._cw.execute(rql, {'x': entity.eid})
--- a/sobjects/test/data/schema.py	Wed Feb 09 18:06:19 2011 +0100
+++ b/sobjects/test/data/schema.py	Wed Feb 09 18:06:24 2011 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -15,10 +15,7 @@
 #
 # You should have received a copy of the GNU Lesser General Public License along
 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
-"""
-
-"""
-from yams.buildobjs import RelationDefinition
+from yams.buildobjs import EntityType, RelationDefinition, String, SubjectRelation
 
 class comments(RelationDefinition):
     subject = 'Comment'
@@ -26,3 +23,6 @@
     cardinality='1*'
     composite='object'
 
+class Tag(EntityType):
+    name = String(unique=True)
+    tags = SubjectRelation('CWUser')
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sobjects/test/unittest_parsers.py	Wed Feb 09 18:06:24 2011 +0100
@@ -0,0 +1,170 @@
+# copyright 2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+
+from datetime import datetime
+
+from cubicweb.devtools.testlib import CubicWebTC
+
+from cubicweb.sobjects.parsers import CWEntityXMLParser
+
+orig_parse = CWEntityXMLParser.parse
+
+def parse(self, url):  # stand-in for CWEntityXMLParser.parse serving canned xml
+    try:
+        url = RELATEDXML[url.split('?')[0]]
+    except KeyError:
+        pass
+    return orig_parse(self, url)  # orig_parse is unbound: it needs the instance
+
+def setUpModule():
+    CWEntityXMLParser.parse = parse
+
+def tearDownModule():
+    CWEntityXMLParser.parse = orig_parse
+
+
+BASEXML = ''.join(u'''
+<rset size="1">
+ <CWUser eid="5" cwuri="http://pouet.org/eid/5">
+  <login>sthenault</login>
+  <upassword>toto</upassword>
+  <last_login_time>2011-01-25 14:14:06</last_login_time>
+  <creation_date>2010-01-22 10:27:59</creation_date>
+  <modification_date>2011-01-25 14:14:06</modification_date>
+  <use_email role="subject">
+    <EmailAddress cwuri="http://pouet.org/eid/6" eid="6"/>
+  </use_email>
+  <in_group role="subject">
+    <CWGroup cwuri="http://pouet.org/eid/7" eid="7"/>
+    <CWGroup cwuri="http://pouet.org/eid/8" eid="8"/>
+  </in_group>
+  <tags role="object">
+    <Tag cwuri="http://pouet.org/eid/9" eid="9"/>
+    <Tag cwuri="http://pouet.org/eid/10" eid="10"/>
+  </tags>
+ </CWUser>
+</rset>
+'''.splitlines())
+
+RELATEDXML ={
+    'http://pouet.org/eid/6': u'''
+<rset size="1">
+ <EmailAddress eid="6" cwuri="http://pouet.org/eid/6">
+  <address>syt@logilab.fr</address>
+  <modification_date>2010-04-13 14:35:56</modification_date>
+  <creation_date>2010-04-13 14:35:56</creation_date>
+ </EmailAddress>
+</rset>
+''',
+    'http://pouet.org/eid/7': u'''
+<rset size="1">
+ <CWGroup eid="7" cwuri="http://pouet.org/eid/7">
+  <name>users</name>
+ </CWGroup>
+</rset>
+''',
+    'http://pouet.org/eid/8': u'''
+<rset size="1">
+ <CWGroup eid="8" cwuri="http://pouet.org/eid/8">
+  <name>unknown</name>
+ </CWGroup>
+</rset>
+''',
+    'http://pouet.org/eid/9': u'''
+<rset size="1">
+ <Tag eid="9" cwuri="http://pouet.org/eid/9">
+  <name>hop</name>
+ </Tag>
+</rset>
+''',
+    'http://pouet.org/eid/10': u'''
+<rset size="1">
+ <Tag eid="10" cwuri="http://pouet.org/eid/10">
+  <name>unknown</name>
+ </Tag>
+</rset>
+''',
+    }
+
+class CWEntityXMLParserTC(CubicWebTC):
+    def setup_database(self):
+        req = self.request()
+        source = req.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
+                                   parser=u'cw.entityxml', url=BASEXML)
+        self.commit()
+        source.init_mapping([(('CWUser', 'use_email', '*'),
+                              u'role=subject\naction=copy'),
+                             (('CWUser', 'in_group', '*'),
+                              u'role=subject\naction=link\nlinkattr=name'),
+                             (('*', 'tags', 'CWUser'),
+                              u'role=object\naction=link-or-create\nlinkattr=name'),
+                            ])
+        req.create_entity('Tag', name=u'hop')
+
+    def test_actions(self):
+        dfsource = self.repo.sources_by_uri['myfeed']
+        self.assertEqual(dfsource.mapping,
+                         {u'CWUser': {
+                             (u'in_group', u'subject', u'link'): [
+                                 (u'CWGroup', {u'linkattr': u'name'})],
+                             (u'tags', u'object', u'link-or-create'): [
+                                 (u'Tag', {u'linkattr': u'name'})],
+                             (u'use_email', u'subject', u'copy'): [
+                                 (u'EmailAddress', {})]
+                             }
+                          })
+        session = self.repo.internal_session()
+        stats = dfsource.pull_data(session, force=True)
+        self.assertEqual(sorted(stats.keys()), ['created', 'updated'])
+        self.assertEqual(len(stats['created']), 2)
+        self.assertEqual(stats['updated'], set())
+
+        user = self.execute('CWUser X WHERE X login "sthenault"').get_entity(0, 0)
+        self.assertEqual(user.creation_date, datetime(2010, 01, 22, 10, 27, 59))
+        self.assertEqual(user.modification_date, datetime(2011, 01, 25, 14, 14, 06))
+        self.assertEqual(user.cwuri, 'http://pouet.org/eid/5')
+        self.assertEqual(user.cw_source[0].name, 'myfeed')
+        self.assertEqual(len(user.use_email), 1)
+        # copy action
+        email = user.use_email[0]
+        self.assertEqual(email.address, 'syt@logilab.fr')
+        self.assertEqual(email.cwuri, 'http://pouet.org/eid/6')
+        self.assertEqual(email.cw_source[0].name, 'myfeed')
+        # link action
+        self.assertFalse(self.execute('CWGroup X WHERE X name "unknown"'))
+        groups = sorted([g.name for g in user.in_group])
+        self.assertEqual(groups, ['users'])
+        # link or create action
+        tags = sorted([t.name for t in user.reverse_tags])
+        self.assertEqual(tags, ['hop', 'unknown'])
+        tag = self.execute('Tag X WHERE X name "unknown"').get_entity(0, 0)
+        self.assertEqual(tag.cwuri, 'http://testing.fr/cubicweb/eid/%s' % tag.eid)
+        self.assertEqual(tag.cw_source[0].name, 'system')
+
+        stats = dfsource.pull_data(session, force=True)
+        self.assertEqual(stats['created'], set())
+        self.assertEqual(len(stats['updated']), 2)
+        self.repo._type_source_cache.clear()
+        self.repo._extid_cache.clear()
+        stats = dfsource.pull_data(session, force=True)
+        self.assertEqual(stats['created'], set())
+        self.assertEqual(len(stats['updated']), 2)
+
+if __name__ == '__main__':
+    from logilab.common.testlib import unittest_main
+    unittest_main()
--- a/web/views/xmlrss.py	Wed Feb 09 18:06:19 2011 +0100
+++ b/web/views/xmlrss.py	Wed Feb 09 18:06:24 2011 +0100
@@ -112,6 +112,10 @@
                 continue
             self.w(u'  <%s role="%s">\n' % (rtype, role))
             for related in entity.related(rtype, role, entities=True):
+                # XXX put unique attributes as xml attributes: they are most
+                # probably used to search existing entities in the client data
+                # feed, and putting them here may avoid an extra request to get
+                # those attribute values
                 self.w(u'    <%s eid="%s" cwuri="%s"/>\n'
                        % (related.e_schema, related.eid,
                           xml_escape(related.cwuri)))
@@ -271,7 +275,6 @@
         if entity.creator:
             self._marker('dc:creator', entity.dc_creator())
 
-
     def _marker(self, marker, value):
         if value:
             self.w(u'  <%s>%s</%s>\n' % (marker, xml_escape(value), marker))