sobjects/test/unittest_cwxmlparser.py
author Pierre-Yves David <pierre-yves.david@logilab.fr>
Fri, 29 Mar 2013 12:34:53 +0100
changeset 8842 63fa6b02b241
parent 8696 0bb18407c053
child 9748 5ee3d16b0df0
permissions -rw-r--r--
[transaction] keep a reference to the repo object

# copyright 2011-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.

from datetime import datetime

from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.sobjects.cwxmlparser import CWEntityXMLParser

# Keep a reference to the genuine parser method: the `parse` stub below
# delegates to it, and tearDownModule() puts it back on the class.
orig_parse = CWEntityXMLParser.parse

def parse(self, url):
    """Stand-in for CWEntityXMLParser.parse that serves canned XML.

    The query string is dropped from `url`; when what remains is a key of
    RELATEDXML, the associated XML text is handed to the original parser in
    place of the URL, otherwise `url` is forwarded untouched.
    """
    base_url = url.split('?')[0]
    return orig_parse(self, RELATEDXML.get(base_url, url))

def setUpModule():
    """Install the `parse` stub on CWEntityXMLParser for this module's tests."""
    setattr(CWEntityXMLParser, 'parse', parse)

def tearDownModule():
    """Put the original `parse` method back on CWEntityXMLParser."""
    setattr(CWEntityXMLParser, 'parse', orig_parse)


# XML export of a single CWUser (eid 5) together with its use_email, in_group,
# tags and in_state relations.  It is given as the `url` of the 'myfeed'
# source in CWEntityXMLParserTC.pre_setup_database.
# The splitlines()/join() round trip removes every line break, so the value is
# one single line (presumably because a source `url` holds one entry per
# line -- TODO confirm against the datafeed source implementation).
BASEXML = ''.join(u'''
<rset size="1">
 <CWUser eid="5" cwuri="http://pouet.org/5" cwsource="system">
  <login>sthenault</login>
  <upassword>toto</upassword>
  <last_login_time>2011-01-25 14:14:06</last_login_time>
  <creation_date>2010-01-22 10:27:59</creation_date>
  <modification_date>2011-01-25 14:14:06</modification_date>
  <use_email role="subject">
    <EmailAddress cwuri="http://pouet.org/6" eid="6"/>
  </use_email>
  <in_group role="subject">
    <CWGroup cwuri="http://pouet.org/7" eid="7"/>
    <CWGroup cwuri="http://pouet.org/8" eid="8"/>
  </in_group>
  <tags role="object">
    <Tag cwuri="http://pouet.org/9" eid="9"/>
    <Tag cwuri="http://pouet.org/10" eid="10"/>
  </tags>
  <in_state role="subject">
    <State cwuri="http://pouet.org/11" eid="11" name="activated"/>
  </in_state>
 </CWUser>
</rset>
'''.splitlines())

# Canned XML for the entities related to the user described in BASEXML, keyed
# by their cwuri.  The module-level `parse` stub looks a URL up here (query
# string removed) and parses the XML text instead of the URL; URLs without an
# entry (e.g. http://pouet.org/11) are passed to the original parser as is.
RELATEDXML = {
    'http://pouet.org/6': u'''
<rset size="1">
 <EmailAddress eid="6" cwuri="http://pouet.org/6">
  <address>syt@logilab.fr</address>
  <modification_date>2010-04-13 14:35:56</modification_date>
  <creation_date>2010-04-13 14:35:56</creation_date>
  <tags role="object">
    <Tag cwuri="http://pouet.org/9" eid="9"/>
  </tags>
 </EmailAddress>
</rset>
''',
    'http://pouet.org/7': u'''
<rset size="1">
 <CWGroup eid="7" cwuri="http://pouet.org/7">
  <name>users</name>
  <tags role="object">
    <Tag cwuri="http://pouet.org/9" eid="9"/>
  </tags>
 </CWGroup>
</rset>
''',
    'http://pouet.org/8': u'''
<rset size="1">
 <CWGroup eid="8" cwuri="http://pouet.org/8">
  <name>unknown</name>
 </CWGroup>
</rset>
''',
    'http://pouet.org/9': u'''
<rset size="1">
 <Tag eid="9" cwuri="http://pouet.org/9">
  <name>hop</name>
 </Tag>
</rset>
''',
    'http://pouet.org/10': u'''
<rset size="1">
 <Tag eid="10" cwuri="http://pouet.org/10">
  <name>unknown</name>
 </Tag>
</rset>
''',
    }


# XML export of the same CWUser (eid 5, same cwuri) as BASEXML, but declaring
# cwsource="myfeed" and carrying only the in_group relation.  It is given as
# the `url` of the 'myotherfeed' source in
# CWEntityXMLParserTC.pre_setup_database.  As for BASEXML, line breaks are
# removed so the value is a single line.
OTHERXML = ''.join(u'''
<rset size="1">
 <CWUser eid="5" cwuri="http://pouet.org/5" cwsource="myfeed">
  <login>sthenault</login>
  <upassword>toto</upassword>
  <last_login_time>2011-01-25 14:14:06</last_login_time>
  <creation_date>2010-01-22 10:27:59</creation_date>
  <modification_date>2011-01-25 14:14:06</modification_date>
  <in_group role="subject">
    <CWGroup cwuri="http://pouet.org/7" eid="7"/>
  </in_group>
 </CWUser>
</rset>
'''.splitlines()
)


class CWEntityXMLParserTC(CubicWebTC):
    """Tests for datafeed sources using the 'cw.entityxml' parser.

    WARNING: this test uses a pre-setup database (see `test_db_id`): if you
    modify the XML defined above in this module, REMOVE THE DATABASE TEMPLATE,
    otherwise your change won't be taken into account.
    """
    test_db_id = 'xmlparser'

    @classmethod
    def pre_setup_database(cls, session, config):
        """Create the 'myfeed' and 'myotherfeed' datafeed sources, their
        relation mappings and a 'hop' tag.

        `myfeed` is fed with BASEXML and `myotherfeed` with OTHERXML.
        """
        myfeed = session.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
                                       parser=u'cw.entityxml', url=BASEXML)
        myotherfeed = session.create_entity('CWSource', name=u'myotherfeed', type=u'datafeed',
                                            parser=u'cw.entityxml', url=OTHERXML)
        session.commit()
        myfeed.init_mapping([(('CWUser', 'use_email', '*'),
                              u'role=subject\naction=copy'),
                             (('CWUser', 'in_group', '*'),
                              u'role=subject\naction=link\nlinkattr=name'),
                             (('CWUser', 'in_state', '*'),
                              u'role=subject\naction=link\nlinkattr=name'),
                             (('*', 'tags', '*'),
                              u'role=object\naction=link-or-create\nlinkattr=name'),
                            ])
        myotherfeed.init_mapping([(('CWUser', 'in_group', '*'),
                                   u'role=subject\naction=link\nlinkattr=name'),
                                  (('CWUser', 'in_state', '*'),
                                   u'role=subject\naction=link\nlinkattr=name'),
                                  ])
        # NOTE(review): nothing is committed after this point in this method;
        # presumably the caller commits -- confirm.
        session.create_entity('Tag', name=u'hop')

    def _pull_without_read_security(self, dfsource, session):
        """Pull `dfsource` data through `session` with read security disabled
        and return the stats given back by `pull_data`.
        """
        session.set_cnxset()
        # avoid Unauthorized due to password selection
        with session.security_enabled(read=False):
            return dfsource.pull_data(session, force=True, raise_on_error=True)

    def _check_email_in_system_source(self, eid):
        """Assert the only 'syt@logilab.fr' address is entity `eid`, belongs to
        the system source and is still used by 'sthenault'; return the entity.
        """
        rset = self.sexecute('EmailAddress X WHERE X address "syt@logilab.fr"')
        self.assertEqual(len(rset), 1)
        e = rset.get_entity(0, 0)
        self.assertEqual(e.eid, eid)
        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system',
                                                             'use-cwuri-as-url': False},
                                                  'type': 'EmailAddress',
                                                  'extid': None})
        self.assertEqual(e.cw_source[0].name, 'system')
        self.assertEqual(e.reverse_use_email[0].login, 'sthenault')
        return e

    def test_complete_url(self):
        """Check the `relation` parameters `complete_url` is expected to add
        to (or leave out of) various URLs.
        """
        dfsource = self.repo.sources_by_uri['myfeed']
        parser = dfsource._get_parser(self.session)
        self.assertEqual(parser.complete_url('http://www.cubicweb.org/CWUser'),
                         'http://www.cubicweb.org/CWUser?relation=tags-object&relation=in_group-subject&relation=in_state-subject&relation=use_email-subject')
        self.assertEqual(parser.complete_url('http://www.cubicweb.org/cwuser'),
                         'http://www.cubicweb.org/cwuser?relation=tags-object&relation=in_group-subject&relation=in_state-subject&relation=use_email-subject')
        self.assertEqual(parser.complete_url('http://www.cubicweb.org/cwuser?vid=rdf&relation=hop'),
                         'http://www.cubicweb.org/cwuser?relation=hop&relation=tags-object&relation=in_group-subject&relation=in_state-subject&relation=use_email-subject&vid=rdf')
        # with an explicit `rql` parameter no mapped relation is expected to
        # be appended
        self.assertEqual(parser.complete_url('http://www.cubicweb.org/?rql=cwuser&vid=rdf&relation=hop'),
                         'http://www.cubicweb.org/?rql=cwuser&relation=hop&vid=rdf')
        self.assertEqual(parser.complete_url('http://www.cubicweb.org/?rql=cwuser&relation=hop'),
                         'http://www.cubicweb.org/?rql=cwuser&relation=hop')


    def test_actions(self):
        """Pull 'myfeed' and check the copy, link and link-or-create actions,
        then check repeated pulls, moving an imported entity to the system
        source and deleting it.
        """
        dfsource = self.repo.sources_by_uri['myfeed']
        self.assertEqual(dfsource.mapping,
                         {u'CWUser': {
                             (u'in_group', u'subject', u'link'): [
                                 (u'CWGroup', {u'linkattr': u'name'})],
                             (u'in_state', u'subject', u'link'): [
                                 (u'State', {u'linkattr': u'name'})],
                             (u'tags', u'object', u'link-or-create'): [
                                 (u'Tag', {u'linkattr': u'name'})],
                             (u'use_email', u'subject', u'copy'): [
                                 (u'EmailAddress', {})]
                             },
                          u'CWGroup': {
                             (u'tags', u'object', u'link-or-create'): [
                                 (u'Tag', {u'linkattr': u'name'})],
                             },
                          u'EmailAddress': {
                             (u'tags', u'object', u'link-or-create'): [
                                 (u'Tag', {u'linkattr': u'name'})],
                             },
                          })
        session = self.repo.internal_session(safe=True)
        stats = dfsource.pull_data(session, force=True, raise_on_error=True)
        self.assertEqual(sorted(stats), ['checked', 'created', 'updated'])
        self.assertEqual(len(stats['created']), 2)
        self.assertEqual(stats['updated'], set())

        user = self.execute('CWUser X WHERE X login "sthenault"').get_entity(0, 0)
        # plain decimal literals: a leading zero (01, 06) is Python 2 octal
        # syntax and a syntax error on Python 3
        self.assertEqual(user.creation_date, datetime(2010, 1, 22, 10, 27, 59))
        self.assertEqual(user.modification_date, datetime(2011, 1, 25, 14, 14, 6))
        self.assertEqual(user.cwuri, 'http://pouet.org/5')
        self.assertEqual(user.cw_source[0].name, 'myfeed')
        self.assertEqual(user.absolute_url(), 'http://pouet.org/5')
        self.assertEqual(len(user.use_email), 1)
        # copy action
        email = user.use_email[0]
        self.assertEqual(email.address, 'syt@logilab.fr')
        self.assertEqual(email.cwuri, 'http://pouet.org/6')
        self.assertEqual(email.absolute_url(), 'http://pouet.org/6')
        self.assertEqual(email.cw_source[0].name, 'myfeed')
        self.assertEqual(len(email.reverse_tags), 1)
        self.assertEqual(email.reverse_tags[0].name, 'hop')
        # link action: the "unknown" group must not have been created
        self.assertFalse(self.execute('CWGroup X WHERE X name "unknown"'))
        groups = sorted([g.name for g in user.in_group])
        self.assertEqual(groups, ['users'])
        group = user.in_group[0]
        self.assertEqual(len(group.reverse_tags), 1)
        self.assertEqual(group.reverse_tags[0].name, 'hop')
        # link or create action: both tags are expected in the system source
        # (the eid is stripped from the cwuri to get a stable value)
        tags = set([(t.name, t.cwuri.replace(str(t.eid), ''), t.cw_source[0].name)
                    for t in user.reverse_tags])
        self.assertEqual(tags, set((('hop', 'http://testing.fr/cubicweb/', 'system'),
                                    ('unknown', 'http://testing.fr/cubicweb/', 'system')))
                         )
        # pulling again must only check the two already imported entities
        stats = self._pull_without_read_security(dfsource, session)
        self.assertEqual(stats['created'], set())
        self.assertEqual(len(stats['updated']), 0)
        self.assertEqual(len(stats['checked']), 2)
        # same expectations once the repository caches have been emptied
        self.repo._type_source_cache.clear()
        self.repo._extid_cache.clear()
        stats = self._pull_without_read_security(dfsource, session)
        self.assertEqual(stats['created'], set())
        self.assertEqual(len(stats['updated']), 0)
        self.assertEqual(len(stats['checked']), 2)
        session.commit()

        # test move to system source
        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': email.eid})
        self.commit()
        self._check_email_in_system_source(email.eid)
        self.commit()
        # test everything is still fine after source synchronization
        self._pull_without_read_security(dfsource, session)
        e = self._check_email_in_system_source(email.eid)
        session.commit()

        # test delete entity
        e.cw_delete()
        self.commit()
        # test everything is still fine after source synchronization: the
        # address must not come back, nor the user's use_email relation
        self._pull_without_read_security(dfsource, session)
        rset = self.sexecute('EmailAddress X WHERE X address "syt@logilab.fr"')
        self.assertEqual(len(rset), 0)
        rset = self.sexecute('Any X WHERE X use_email E, X login "sthenault"')
        self.assertEqual(len(rset), 0)

    def test_external_entity(self):
        """Pull 'myotherfeed', whose XML declares cwsource="myfeed" for the
        user, and check the resulting user is attached to 'myfeed'.
        """
        dfsource = self.repo.sources_by_uri['myotherfeed']
        session = self.repo.internal_session(safe=True)
        dfsource.pull_data(session, force=True, raise_on_error=True)
        user = self.execute('CWUser X WHERE X login "sthenault"').get_entity(0, 0)
        self.assertEqual(user.creation_date, datetime(2010, 1, 22, 10, 27, 59))
        self.assertEqual(user.modification_date, datetime(2011, 1, 25, 14, 14, 6))
        self.assertEqual(user.cwuri, 'http://pouet.org/5')
        self.assertEqual(user.cw_source[0].name, 'myfeed')

    def test_noerror_missing_fti_attribute(self):
        """Processing a Card that only has a title must not raise
        (`raise_on_error=True` turns any processing error into a failure).
        """
        dfsource = self.repo.sources_by_uri['myfeed']
        session = self.repo.internal_session(safe=True)
        parser = dfsource._get_parser(session)
        dfsource.process_urls(parser, ['''
<rset size="1">
 <Card eid="50" cwuri="http://pouet.org/50" cwsource="system">
  <title>how-to</title>
 </Card>
</rset>
'''], raise_on_error=True)

    def test_noerror_unspecified_date(self):
        """Processing a Card with an empty <creation_date/> element must not
        raise (`raise_on_error=True` turns any processing error into a failure).
        """
        dfsource = self.repo.sources_by_uri['myfeed']
        session = self.repo.internal_session(safe=True)
        parser = dfsource._get_parser(session)
        dfsource.process_urls(parser, ['''
<rset size="1">
 <Card eid="50" cwuri="http://pouet.org/50" cwsource="system">
  <title>how-to</title>
  <content>how-to</content>
  <synopsis>how-to</synopsis>
  <creation_date/>
 </Card>
</rset>
'''], raise_on_error=True)

if __name__ == '__main__':
    # Executed as a script: hand over to logilab's unittest runner.
    from logilab.common import testlib
    testlib.unittest_main()