sobjects/test/unittest_cwxmlparser.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # copyright 2011-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 
       
    19 from datetime import datetime
       
    20 
       
    21 from six.moves.urllib.parse import urlsplit, parse_qsl
       
    22 
       
    23 import pytz
       
    24 from cubicweb.devtools.testlib import CubicWebTC
       
    25 from cubicweb.sobjects.cwxmlparser import CWEntityXMLParser
       
    26 
       
    27 orig_parse = CWEntityXMLParser.parse
       
    28 
       
    29 def parse(self, url):
       
    30     try:
       
    31         url = RELATEDXML[url.split('?')[0]]
       
    32     except KeyError:
       
    33         pass
       
    34     return orig_parse(self, url)
       
    35 
       
    36 def setUpModule():
       
    37     CWEntityXMLParser.parse = parse
       
    38 
       
    39 def tearDownModule():
       
    40     CWEntityXMLParser.parse = orig_parse
       
    41 
       
    42 
       
    43 BASEXML = ''.join(u'''
       
    44 <rset size="1">
       
    45  <CWUser eid="5" cwuri="http://pouet.org/5" cwsource="system">
       
    46   <login>sthenault</login>
       
    47   <upassword>toto</upassword>
       
    48   <last_login_time>2011-01-25 14:14:06</last_login_time>
       
    49   <creation_date>2010-01-22 10:27:59</creation_date>
       
    50   <modification_date>2011-01-25 14:14:06</modification_date>
       
    51   <use_email role="subject">
       
    52     <EmailAddress cwuri="http://pouet.org/6" eid="6"/>
       
    53   </use_email>
       
    54   <in_group role="subject">
       
    55     <CWGroup cwuri="http://pouet.org/7" eid="7"/>
       
    56     <CWGroup cwuri="http://pouet.org/8" eid="8"/>
       
    57   </in_group>
       
    58   <tags role="object">
       
    59     <Tag cwuri="http://pouet.org/9" eid="9"/>
       
    60     <Tag cwuri="http://pouet.org/10" eid="10"/>
       
    61   </tags>
       
    62   <in_state role="subject">
       
    63     <State cwuri="http://pouet.org/11" eid="11" name="activated"/>
       
    64   </in_state>
       
    65  </CWUser>
       
    66 </rset>
       
    67 '''.splitlines())
       
    68 
       
    69 RELATEDXML = {
       
    70     'http://pouet.org/6': u'''
       
    71 <rset size="1">
       
    72  <EmailAddress eid="6" cwuri="http://pouet.org/6">
       
    73   <address>syt@logilab.fr</address>
       
    74   <modification_date>2010-04-13 14:35:56</modification_date>
       
    75   <creation_date>2010-04-13 14:35:56</creation_date>
       
    76   <tags role="object">
       
    77     <Tag cwuri="http://pouet.org/9" eid="9"/>
       
    78   </tags>
       
    79  </EmailAddress>
       
    80 </rset>
       
    81 ''',
       
    82     'http://pouet.org/7': u'''
       
    83 <rset size="1">
       
    84  <CWGroup eid="7" cwuri="http://pouet.org/7">
       
    85   <name>users</name>
       
    86   <tags role="object">
       
    87     <Tag cwuri="http://pouet.org/9" eid="9"/>
       
    88   </tags>
       
    89  </CWGroup>
       
    90 </rset>
       
    91 ''',
       
    92     'http://pouet.org/8': u'''
       
    93 <rset size="1">
       
    94  <CWGroup eid="8" cwuri="http://pouet.org/8">
       
    95   <name>unknown</name>
       
    96  </CWGroup>
       
    97 </rset>
       
    98 ''',
       
    99     'http://pouet.org/9': u'''
       
   100 <rset size="1">
       
   101  <Tag eid="9" cwuri="http://pouet.org/9">
       
   102   <name>hop</name>
       
   103  </Tag>
       
   104 </rset>
       
   105 ''',
       
   106     'http://pouet.org/10': u'''
       
   107 <rset size="1">
       
   108  <Tag eid="10" cwuri="http://pouet.org/10">
       
   109   <name>unknown</name>
       
   110  </Tag>
       
   111 </rset>
       
   112 ''',
       
   113     }
       
   114 
       
   115 
       
   116 OTHERXML = ''.join(u'''
       
   117 <rset size="1">
       
   118  <CWUser eid="5" cwuri="http://pouet.org/5" cwsource="myfeed">
       
   119   <login>sthenault</login>
       
   120   <upassword>toto</upassword>
       
   121   <last_login_time>2011-01-25 14:14:06</last_login_time>
       
   122   <creation_date>2010-01-22 10:27:59</creation_date>
       
   123   <modification_date>2011-01-25 14:14:06</modification_date>
       
   124   <in_group role="subject">
       
   125     <CWGroup cwuri="http://pouet.org/7" eid="7"/>
       
   126   </in_group>
       
   127  </CWUser>
       
   128 </rset>
       
   129 '''.splitlines()
       
   130 )
       
   131 
       
   132 
       
   133 class CWEntityXMLParserTC(CubicWebTC):
       
   134     """/!\ this test use a pre-setup database /!\, if you modify above xml,
       
   135     REMOVE THE DATABASE TEMPLATE else it won't be considered
       
   136     """
       
   137     test_db_id = 'xmlparser'
       
   138 
       
   139     def assertURLEquiv(self, first, second):
       
   140         # ignore ordering differences in query params
       
   141         parsed_first = urlsplit(first)
       
   142         parsed_second = urlsplit(second)
       
   143         self.assertEqual(parsed_first.scheme, parsed_second.scheme)
       
   144         self.assertEqual(parsed_first.netloc, parsed_second.netloc)
       
   145         self.assertEqual(parsed_first.path, parsed_second.path)
       
   146         self.assertEqual(parsed_first.fragment, parsed_second.fragment)
       
   147         self.assertCountEqual(parse_qsl(parsed_first.query), parse_qsl(parsed_second.query))
       
   148 
       
   149     @classmethod
       
   150     def pre_setup_database(cls, cnx, config):
       
   151         myfeed = cnx.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
       
   152                                    parser=u'cw.entityxml', url=BASEXML)
       
   153         myotherfeed = cnx.create_entity('CWSource', name=u'myotherfeed', type=u'datafeed',
       
   154                                         parser=u'cw.entityxml', url=OTHERXML)
       
   155         cnx.commit()
       
   156         myfeed.init_mapping([(('CWUser', 'use_email', '*'),
       
   157                               u'role=subject\naction=copy'),
       
   158                              (('CWUser', 'in_group', '*'),
       
   159                               u'role=subject\naction=link\nlinkattr=name'),
       
   160                              (('CWUser', 'in_state', '*'),
       
   161                               u'role=subject\naction=link\nlinkattr=name'),
       
   162                              (('*', 'tags', '*'),
       
   163                               u'role=object\naction=link-or-create\nlinkattr=name'),
       
   164                             ])
       
   165         myotherfeed.init_mapping([(('CWUser', 'in_group', '*'),
       
   166                                    u'role=subject\naction=link\nlinkattr=name'),
       
   167                                   (('CWUser', 'in_state', '*'),
       
   168                                    u'role=subject\naction=link\nlinkattr=name'),
       
   169                                   ])
       
   170         cnx.create_entity('Tag', name=u'hop')
       
   171         cnx.commit()
       
   172 
       
   173     def test_complete_url(self):
       
   174         dfsource = self.repo.sources_by_uri['myfeed']
       
   175         with self.admin_access.repo_cnx() as cnx:
       
   176             parser = dfsource._get_parser(cnx)
       
   177             self.assertURLEquiv(parser.complete_url('http://www.cubicweb.org/CWUser'),
       
   178                                 'http://www.cubicweb.org/CWUser?relation=tags-object&relation=in_group-subject&relation=in_state-subject&relation=use_email-subject')
       
   179             self.assertURLEquiv(parser.complete_url('http://www.cubicweb.org/cwuser'),
       
   180                                 'http://www.cubicweb.org/cwuser?relation=tags-object&relation=in_group-subject&relation=in_state-subject&relation=use_email-subject')
       
   181             self.assertURLEquiv(parser.complete_url('http://www.cubicweb.org/cwuser?vid=rdf&relation=hop'),
       
   182                                 'http://www.cubicweb.org/cwuser?relation=hop&relation=tags-object&relation=in_group-subject&relation=in_state-subject&relation=use_email-subject&vid=rdf')
       
   183             self.assertURLEquiv(parser.complete_url('http://www.cubicweb.org/?rql=cwuser&vid=rdf&relation=hop'),
       
   184                                 'http://www.cubicweb.org/?rql=cwuser&relation=hop&vid=rdf')
       
   185             self.assertURLEquiv(parser.complete_url('http://www.cubicweb.org/?rql=cwuser&relation=hop'),
       
   186                                 'http://www.cubicweb.org/?rql=cwuser&relation=hop')
       
   187 
       
   188 
       
   189     def test_actions(self):
       
   190         dfsource = self.repo.sources_by_uri['myfeed']
       
   191         self.assertEqual(dfsource.mapping,
       
   192                          {u'CWUser': {
       
   193                              (u'in_group', u'subject', u'link'): [
       
   194                                  (u'CWGroup', {u'linkattr': u'name'})],
       
   195                              (u'in_state', u'subject', u'link'): [
       
   196                                  (u'State', {u'linkattr': u'name'})],
       
   197                              (u'tags', u'object', u'link-or-create'): [
       
   198                                  (u'Tag', {u'linkattr': u'name'})],
       
   199                              (u'use_email', u'subject', u'copy'): [
       
   200                                  (u'EmailAddress', {})]
       
   201                              },
       
   202                           u'CWGroup': {
       
   203                              (u'tags', u'object', u'link-or-create'): [
       
   204                                  (u'Tag', {u'linkattr': u'name'})],
       
   205                              },
       
   206                           u'EmailAddress': {
       
   207                              (u'tags', u'object', u'link-or-create'): [
       
   208                                  (u'Tag', {u'linkattr': u'name'})],
       
   209                              },
       
   210                           })
       
   211         with self.repo.internal_cnx() as cnx:
       
   212             stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
       
   213             self.assertEqual(sorted(stats), ['checked', 'created', 'updated'])
       
   214             self.assertEqual(len(stats['created']), 2)
       
   215             self.assertEqual(stats['updated'], set())
       
   216 
       
   217         with self.admin_access.web_request() as req:
       
   218             user = req.execute('CWUser X WHERE X login "sthenault"').get_entity(0, 0)
       
   219             self.assertEqual(user.creation_date, datetime(2010, 1, 22, 10, 27, 59, tzinfo=pytz.utc))
       
   220             self.assertEqual(user.modification_date, datetime(2011, 1, 25, 14, 14, 6, tzinfo=pytz.utc))
       
   221             self.assertEqual(user.cwuri, 'http://pouet.org/5')
       
   222             self.assertEqual(user.cw_source[0].name, 'myfeed')
       
   223             self.assertEqual(user.absolute_url(), 'http://pouet.org/5')
       
   224             self.assertEqual(len(user.use_email), 1)
       
   225             # copy action
       
   226             email = user.use_email[0]
       
   227             self.assertEqual(email.address, 'syt@logilab.fr')
       
   228             self.assertEqual(email.cwuri, 'http://pouet.org/6')
       
   229             self.assertEqual(email.absolute_url(), 'http://pouet.org/6')
       
   230             self.assertEqual(email.cw_source[0].name, 'myfeed')
       
   231             self.assertEqual(len(email.reverse_tags), 1)
       
   232             self.assertEqual(email.reverse_tags[0].name, 'hop')
       
   233             # link action
       
   234             self.assertFalse(req.execute('CWGroup X WHERE X name "unknown"'))
       
   235             groups = sorted([g.name for g in user.in_group])
       
   236             self.assertEqual(groups, ['users'])
       
   237             group = user.in_group[0]
       
   238             self.assertEqual(len(group.reverse_tags), 1)
       
   239             self.assertEqual(group.reverse_tags[0].name, 'hop')
       
   240             # link or create action
       
   241             tags = set([(t.name, t.cwuri.replace(str(t.eid), ''), t.cw_source[0].name)
       
   242                         for t in user.reverse_tags])
       
   243             self.assertEqual(tags, set((('hop', 'http://testing.fr/cubicweb/', 'system'),
       
   244                                         ('unknown', 'http://testing.fr/cubicweb/', 'system')))
       
   245                              )
       
   246         with self.repo.internal_cnx() as cnx:
       
   247             stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
       
   248             self.assertEqual(stats['created'], set())
       
   249             self.assertEqual(len(stats['updated']), 0)
       
   250             self.assertEqual(len(stats['checked']), 2)
       
   251             self.repo._type_source_cache.clear()
       
   252             self.repo._extid_cache.clear()
       
   253             stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
       
   254             self.assertEqual(stats['created'], set())
       
   255             self.assertEqual(len(stats['updated']), 0)
       
   256             self.assertEqual(len(stats['checked']), 2)
       
   257 
       
   258             # test move to system source
       
   259             cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': email.eid})
       
   260             cnx.commit()
       
   261             rset = cnx.execute('EmailAddress X WHERE X address "syt@logilab.fr"')
       
   262             self.assertEqual(len(rset), 1)
       
   263             e = rset.get_entity(0, 0)
       
   264             self.assertEqual(e.eid, email.eid)
       
   265             self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system',
       
   266                                                                  'use-cwuri-as-url': False},
       
   267                                                       'type': 'EmailAddress',
       
   268                                                       'extid': None})
       
   269             self.assertEqual(e.cw_source[0].name, 'system')
       
   270             self.assertEqual(e.reverse_use_email[0].login, 'sthenault')
       
   271             # test everything is still fine after source synchronization
       
   272             # clear caches to make sure we look at the moved_entities table
       
   273             self.repo._type_source_cache.clear()
       
   274             self.repo._extid_cache.clear()
       
   275             stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
       
   276             self.assertEqual(stats['updated'], set((email.eid,)))
       
   277             rset = cnx.execute('EmailAddress X WHERE X address "syt@logilab.fr"')
       
   278             self.assertEqual(len(rset), 1)
       
   279             e = rset.get_entity(0, 0)
       
   280             self.assertEqual(e.eid, email.eid)
       
   281             self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system',
       
   282                                                                  'use-cwuri-as-url': False},
       
   283                                                       'type': 'EmailAddress',
       
   284                                                       'extid': None})
       
   285             self.assertEqual(e.cw_source[0].name, 'system')
       
   286             self.assertEqual(e.reverse_use_email[0].login, 'sthenault')
       
   287             cnx.commit()
       
   288 
       
   289             # test delete entity
       
   290             e.cw_delete()
       
   291             cnx.commit()
       
   292             # test everything is still fine after source synchronization
       
   293             stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
       
   294             rset = cnx.execute('EmailAddress X WHERE X address "syt@logilab.fr"')
       
   295             self.assertEqual(len(rset), 0)
       
   296             rset = cnx.execute('Any X WHERE X use_email E, X login "sthenault"')
       
   297             self.assertEqual(len(rset), 0)
       
   298 
       
   299     def test_external_entity(self):
       
   300         dfsource = self.repo.sources_by_uri['myotherfeed']
       
   301         with self.repo.internal_cnx() as cnx:
       
   302             stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
       
   303             user = cnx.execute('CWUser X WHERE X login "sthenault"').get_entity(0, 0)
       
   304             self.assertEqual(user.creation_date, datetime(2010, 1, 22, 10, 27, 59, tzinfo=pytz.utc))
       
   305             self.assertEqual(user.modification_date, datetime(2011, 1, 25, 14, 14, 6, tzinfo=pytz.utc))
       
   306             self.assertEqual(user.cwuri, 'http://pouet.org/5')
       
   307             self.assertEqual(user.cw_source[0].name, 'myfeed')
       
   308 
       
   309     def test_noerror_missing_fti_attribute(self):
       
   310         dfsource = self.repo.sources_by_uri['myfeed']
       
   311         with self.repo.internal_cnx() as cnx:
       
   312             parser = dfsource._get_parser(cnx)
       
   313             dfsource.process_urls(parser, ['''
       
   314 <rset size="1">
       
   315  <Card eid="50" cwuri="http://pouet.org/50" cwsource="system">
       
   316   <title>how-to</title>
       
   317  </Card>
       
   318 </rset>
       
   319 '''], raise_on_error=True)
       
   320 
       
   321     def test_noerror_unspecified_date(self):
       
   322         dfsource = self.repo.sources_by_uri['myfeed']
       
   323         with self.repo.internal_cnx() as cnx:
       
   324             parser = dfsource._get_parser(cnx)
       
   325             dfsource.process_urls(parser, ['''
       
   326 <rset size="1">
       
   327  <Card eid="50" cwuri="http://pouet.org/50" cwsource="system">
       
   328   <title>how-to</title>
       
   329   <content>how-to</content>
       
   330   <synopsis>how-to</synopsis>
       
   331   <creation_date/>
       
   332  </Card>
       
   333 </rset>
       
   334 '''], raise_on_error=True)
       
   335 
       
   336 if __name__ == '__main__':
       
   337     from logilab.common.testlib import unittest_main
       
   338     unittest_main()