# copyright 2011-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""Unit tests for the ``cw.entityxml`` datafeed parser (CWEntityXMLParser)."""

from datetime import datetime

from six.moves.urllib.parse import urlsplit, parse_qsl

import pytz
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.sobjects.cwxmlparser import CWEntityXMLParser

# Keep a handle on the real implementation so it can be delegated to and
# restored once the module's tests are done.
orig_parse = CWEntityXMLParser.parse


def parse(self, url):
    """Stand-in for ``CWEntityXMLParser.parse`` used while this module runs.

    When ``url`` (query string dropped) is a key of ``RELATEDXML``, the mapped
    document is handed to the original ``parse`` instead of the URL; any other
    value is passed through unchanged.
    """
    return orig_parse(self, RELATEDXML.get(url.split('?')[0], url))


def setUpModule():
    """Install the ``parse`` stand-in on ``CWEntityXMLParser``."""
    CWEntityXMLParser.parse = parse


def tearDownModule():
    """Put the original ``CWEntityXMLParser.parse`` back."""
    CWEntityXMLParser.parse = orig_parse


# NOTE(review): the fixture strings in this file (BASEXML, RELATEDXML, OTHERXML
# and the inline documents of the two ``test_noerror_*`` tests) appear to have
# lost their XML markup and original indentation in this copy -- only the text
# values remain, one per line.  Restore them from the upstream file before
# running the tests; nothing has been invented here.
BASEXML = ''.join(u'''


sthenault
toto
2011-01-25 14:14:06
2010-01-22 10:27:59
2011-01-25 14:14:06
















'''.splitlines())

# Documents served by the ``parse`` stand-in, keyed by URL without query string.
RELATEDXML = {
    'http://pouet.org/6': u'''


syt@logilab.fr
2010-04-13 14:35:56
2010-04-13 14:35:56





''',
    'http://pouet.org/7': u'''


users





''',
    'http://pouet.org/8': u'''


unknown


''',
    'http://pouet.org/9': u'''


hop


''',
    'http://pouet.org/10': u'''


unknown


''',
    }


OTHERXML = ''.join(u'''


sthenault
toto
2011-01-25 14:14:06
2010-01-22 10:27:59
2011-01-25 14:14:06





'''.splitlines()
)


class CWEntityXMLParserTC(CubicWebTC):
    r"""/!\ These tests use a pre-setup database /!\

    If you modify the XML fixtures above, REMOVE THE DATABASE TEMPLATE,
    otherwise your changes won't be taken into account.
    """
    test_db_id = 'xmlparser'

    def assertURLEquiv(self, first, second):
        """Assert that two URLs are equal, ignoring query parameter order."""
        parsed_first = urlsplit(first)
        parsed_second = urlsplit(second)
        for part in ('scheme', 'netloc', 'path', 'fragment'):
            self.assertEqual(getattr(parsed_first, part),
                             getattr(parsed_second, part))
        self.assertCountEqual(parse_qsl(parsed_first.query),
                              parse_qsl(parsed_second.query))

    @classmethod
    def pre_setup_database(cls, cnx, config):
        """Create the two datafeed sources, their mappings and the 'hop' tag."""
        myfeed = cnx.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
                                   parser=u'cw.entityxml', url=BASEXML)
        myotherfeed = cnx.create_entity('CWSource', name=u'myotherfeed', type=u'datafeed',
                                        parser=u'cw.entityxml', url=OTHERXML)
        cnx.commit()
        myfeed.init_mapping([(('CWUser', 'use_email', '*'),
                              u'role=subject\naction=copy'),
                             (('CWUser', 'in_group', '*'),
                              u'role=subject\naction=link\nlinkattr=name'),
                             (('CWUser', 'in_state', '*'),
                              u'role=subject\naction=link\nlinkattr=name'),
                             (('*', 'tags', '*'),
                              u'role=object\naction=link-or-create\nlinkattr=name'),
                             ])
        myotherfeed.init_mapping([(('CWUser', 'in_group', '*'),
                                   u'role=subject\naction=link\nlinkattr=name'),
                                  (('CWUser', 'in_state', '*'),
                                   u'role=subject\naction=link\nlinkattr=name'),
                                  ])
        cnx.create_entity('Tag', name=u'hop')
        cnx.commit()

    def test_complete_url(self):
        """Check the URLs produced by ``complete_url`` for the 'myfeed' parser."""
        dfsource = self.repo.sources_by_uri['myfeed']
        # (url given to complete_url, expected result)
        cases = [
            ('http://www.cubicweb.org/CWUser',
             'http://www.cubicweb.org/CWUser?relation=tags-object&relation=in_group-subject&relation=in_state-subject&relation=use_email-subject'),
            ('http://www.cubicweb.org/cwuser',
             'http://www.cubicweb.org/cwuser?relation=tags-object&relation=in_group-subject&relation=in_state-subject&relation=use_email-subject'),
            ('http://www.cubicweb.org/cwuser?vid=rdf&relation=hop',
             'http://www.cubicweb.org/cwuser?relation=hop&relation=tags-object&relation=in_group-subject&relation=in_state-subject&relation=use_email-subject&vid=rdf'),
            ('http://www.cubicweb.org/?rql=cwuser&vid=rdf&relation=hop',
             'http://www.cubicweb.org/?rql=cwuser&relation=hop&vid=rdf'),
            ('http://www.cubicweb.org/?rql=cwuser&relation=hop',
             'http://www.cubicweb.org/?rql=cwuser&relation=hop'),
        ]
        with self.admin_access.repo_cnx() as cnx:
            parser = dfsource._get_parser(cnx)
            for url, expected in cases:
                self.assertURLEquiv(parser.complete_url(url), expected)

    def test_actions(self):
        """Pull 'myfeed' and check the copy, link and link-or-create actions.

        Also checks that pulling again creates/updates nothing, that an entity
        moved to the system source stays there after a new synchronization,
        and that a deleted entity is not brought back.
        """
        dfsource = self.repo.sources_by_uri['myfeed']
        self.assertEqual(dfsource.mapping,
                         {u'CWUser': {
                             (u'in_group', u'subject', u'link'): [
                                 (u'CWGroup', {u'linkattr': u'name'})],
                             (u'in_state', u'subject', u'link'): [
                                 (u'State', {u'linkattr': u'name'})],
                             (u'tags', u'object', u'link-or-create'): [
                                 (u'Tag', {u'linkattr': u'name'})],
                             (u'use_email', u'subject', u'copy'): [
                                 (u'EmailAddress', {})]
                             },
                          u'CWGroup': {
                              (u'tags', u'object', u'link-or-create'): [
                                  (u'Tag', {u'linkattr': u'name'})],
                              },
                          u'EmailAddress': {
                              (u'tags', u'object', u'link-or-create'): [
                                  (u'Tag', {u'linkattr': u'name'})],
                              },
                          })
        with self.repo.internal_cnx() as cnx:
            stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
        self.assertEqual(sorted(stats), ['checked', 'created', 'updated'])
        self.assertEqual(len(stats['created']), 2)
        self.assertEqual(stats['updated'], set())

        with self.admin_access.web_request() as req:
            user = req.execute('CWUser X WHERE X login "sthenault"').get_entity(0, 0)
            self.assertEqual(user.creation_date,
                             datetime(2010, 1, 22, 10, 27, 59, tzinfo=pytz.utc))
            self.assertEqual(user.modification_date,
                             datetime(2011, 1, 25, 14, 14, 6, tzinfo=pytz.utc))
            self.assertEqual(user.cwuri, 'http://pouet.org/5')
            self.assertEqual(user.cw_source[0].name, 'myfeed')
            self.assertEqual(user.absolute_url(), 'http://pouet.org/5')
            self.assertEqual(len(user.use_email), 1)
            # copy action
            email = user.use_email[0]
            self.assertEqual(email.address, 'syt@logilab.fr')
            self.assertEqual(email.cwuri, 'http://pouet.org/6')
            self.assertEqual(email.absolute_url(), 'http://pouet.org/6')
            self.assertEqual(email.cw_source[0].name, 'myfeed')
            self.assertEqual(len(email.reverse_tags), 1)
            self.assertEqual(email.reverse_tags[0].name, 'hop')
            # link action
            self.assertFalse(req.execute('CWGroup X WHERE X name "unknown"'))
            groups = sorted(g.name for g in user.in_group)
            self.assertEqual(groups, ['users'])
            group = user.in_group[0]
            self.assertEqual(len(group.reverse_tags), 1)
            self.assertEqual(group.reverse_tags[0].name, 'hop')
            # link or create action
            # (the eid is removed from cwuri so the expected value is stable)
            tags = {(t.name, t.cwuri.replace(str(t.eid), ''), t.cw_source[0].name)
                    for t in user.reverse_tags}
            self.assertEqual(tags, {('hop', 'http://testing.fr/cubicweb/', 'system'),
                                    ('unknown', 'http://testing.fr/cubicweb/', 'system')})
        with self.repo.internal_cnx() as cnx:
            stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
            self.assertEqual(stats['created'], set())
            self.assertEqual(len(stats['updated']), 0)
            self.assertEqual(len(stats['checked']), 2)
            self.repo._type_source_cache.clear()
            self.repo._extid_cache.clear()
            stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
            self.assertEqual(stats['created'], set())
            self.assertEqual(len(stats['updated']), 0)
            self.assertEqual(len(stats['checked']), 2)

            # test move to system source
            system_meta = {'source': {'type': u'native', 'uri': u'system',
                                      'use-cwuri-as-url': False},
                           'type': 'EmailAddress',
                           'extid': None}
            cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"',
                        {'x': email.eid})
            cnx.commit()
            rset = cnx.execute('EmailAddress X WHERE X address "syt@logilab.fr"')
            self.assertEqual(len(rset), 1)
            e = rset.get_entity(0, 0)
            self.assertEqual(e.eid, email.eid)
            self.assertEqual(e.cw_metainformation(), system_meta)
            self.assertEqual(e.cw_source[0].name, 'system')
            self.assertEqual(e.reverse_use_email[0].login, 'sthenault')
            # test everything is still fine after source synchronization
            # clear caches to make sure we look at the moved_entities table
            self.repo._type_source_cache.clear()
            self.repo._extid_cache.clear()
            stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
            self.assertEqual(stats['updated'], {email.eid})
            rset = cnx.execute('EmailAddress X WHERE X address "syt@logilab.fr"')
            self.assertEqual(len(rset), 1)
            e = rset.get_entity(0, 0)
            self.assertEqual(e.eid, email.eid)
            self.assertEqual(e.cw_metainformation(), system_meta)
            self.assertEqual(e.cw_source[0].name, 'system')
            self.assertEqual(e.reverse_use_email[0].login, 'sthenault')
            cnx.commit()

            # test delete entity
            e.cw_delete()
            cnx.commit()
            # test everything is still fine after source synchronization
            dfsource.pull_data(cnx, force=True, raise_on_error=True)
            rset = cnx.execute('EmailAddress X WHERE X address "syt@logilab.fr"')
            self.assertEqual(len(rset), 0)
            rset = cnx.execute('Any X WHERE X use_email E, X login "sthenault"')
            self.assertEqual(len(rset), 0)

    def test_external_entity(self):
        """Pull 'myotherfeed' and check the user's dates, cwuri and source."""
        dfsource = self.repo.sources_by_uri['myotherfeed']
        with self.repo.internal_cnx() as cnx:
            dfsource.pull_data(cnx, force=True, raise_on_error=True)
            user = cnx.execute('CWUser X WHERE X login "sthenault"').get_entity(0, 0)
            self.assertEqual(user.creation_date,
                             datetime(2010, 1, 22, 10, 27, 59, tzinfo=pytz.utc))
            self.assertEqual(user.modification_date,
                             datetime(2011, 1, 25, 14, 14, 6, tzinfo=pytz.utc))
            self.assertEqual(user.cwuri, 'http://pouet.org/5')
            self.assertEqual(user.cw_source[0].name, 'myfeed')

    def test_noerror_missing_fti_attribute(self):
        """Processing this inline document must not raise."""
        dfsource = self.repo.sources_by_uri['myfeed']
        with self.repo.internal_cnx() as cnx:
            parser = dfsource._get_parser(cnx)
            dfsource.process_urls(parser, ['''


how-to


'''], raise_on_error=True)

    def test_noerror_unspecified_date(self):
        """Processing this inline document must not raise."""
        dfsource = self.repo.sources_by_uri['myfeed']
        with self.repo.internal_cnx() as cnx:
            parser = dfsource._get_parser(cnx)
            dfsource.process_urls(parser, ['''


how-to
how-to
how-to



'''], raise_on_error=True)


if __name__ == '__main__':
    from logilab.common.testlib import unittest_main
    unittest_main()