48 content=u'the cw web site') |
48 content=u'the cw web site') |
49 store.flush() |
49 store.flush() |
50 store.commit() |
50 store.commit() |
51 |
51 |
52 with self.temporary_appobjects(AParser): |
52 with self.temporary_appobjects(AParser): |
53 if u'ô myfeed' in self.repo.sources_by_uri: |
53 try: |
54 yield self.repo.sources_by_uri[u'ô myfeed']._get_parser(session) |
54 source = self.repo.source_by_uri(u'ô myfeed') |
|
55 except ValueError: |
|
56 yield |
55 else: |
57 else: |
56 yield |
58 yield source._get_parser(session) |
57 # vreg.unregister just pops appobjects from their regid entry, |
59 # vreg.unregister just pops appobjects from their regid entry, |
58 # completely remove the entry to ensure we have no side effect with |
60 # completely remove the entry to ensure we have no side effect with |
59 # this empty entry. |
61 # this empty entry. |
60 del self.vreg['parsers'][AParser.__regid__] |
62 del self.vreg['parsers'][AParser.__regid__] |
61 |
63 |
62 def test(self): |
64 def test(self): |
63 self.assertIn(u'ô myfeed', self.repo.sources_by_uri) |
65 dfsource = self.repo.source_by_uri(u'ô myfeed') |
64 dfsource = self.repo.sources_by_uri[u'ô myfeed'] |
|
65 self.assertNotIn('use_cwuri_as_url', dfsource.__dict__) |
66 self.assertNotIn('use_cwuri_as_url', dfsource.__dict__) |
66 self.assertEqual({'type': u'datafeed', 'uri': u'ô myfeed', 'use-cwuri-as-url': True}, |
67 self.assertEqual({'type': u'datafeed', 'uri': u'ô myfeed', 'use-cwuri-as-url': True}, |
67 dfsource.public_config) |
68 dfsource.public_config) |
68 self.assertEqual(dfsource.use_cwuri_as_url, True) |
69 self.assertEqual(dfsource.use_cwuri_as_url, True) |
69 self.assertEqual(dfsource.latest_retrieval, None) |
70 self.assertEqual(dfsource.latest_retrieval, None) |
111 value = parser.retrieve_url('a string') |
112 value = parser.retrieve_url('a string') |
112 self.assertEqual(200, value.getcode()) |
113 self.assertEqual(200, value.getcode()) |
113 self.assertEqual('a string', value.geturl()) |
114 self.assertEqual('a string', value.geturl()) |
114 |
115 |
115 def test_update_url(self): |
116 def test_update_url(self): |
116 dfsource = self.repo.sources_by_uri[u'ô myfeed'] |
117 dfsource = self.repo.source_by_uri(u'ô myfeed') |
117 with self.admin_access.repo_cnx() as cnx: |
118 with self.admin_access.repo_cnx() as cnx: |
118 cnx.entity_from_eid(dfsource.eid).cw_set(url=u"http://pouet.com\nhttp://pouet.org") |
119 cnx.entity_from_eid(dfsource.eid).cw_set(url=u"http://pouet.com\nhttp://pouet.org") |
119 cnx.commit() |
120 cnx.commit() |
120 self.assertEqual(dfsource.urls, [u'ignored']) |
121 self.assertEqual(dfsource.urls, [u'ignored']) |
121 dfsource = self.repo.sources_by_uri[u'ô myfeed'] |
122 dfsource = self.repo.source_by_uri(u'ô myfeed') |
122 self.assertEqual(dfsource.urls, [u"http://pouet.com", u"http://pouet.org"]) |
123 self.assertEqual(dfsource.urls, [u"http://pouet.com", u"http://pouet.org"]) |
123 |
124 |
124 def test_parser_not_found(self): |
125 def test_parser_not_found(self): |
125 dfsource = self.repo.sources_by_uri[u'ô myfeed'] |
126 dfsource = self.repo.source_by_uri(u'ô myfeed') |
126 with self.assertLogs('cubicweb.sources.o myfeed', level='ERROR') as cm: |
127 with self.assertLogs('cubicweb.sources.o myfeed', level='ERROR') as cm: |
127 with self.repo.internal_cnx() as cnx: |
128 with self.repo.internal_cnx() as cnx: |
128 stats = dfsource.pull_data(cnx, force=True) |
129 stats = dfsource.pull_data(cnx, force=True) |
129 importlog = cnx.find('CWDataImport').one().log |
130 importlog = cnx.find('CWDataImport').one().log |
130 self.assertIn('failed to load parser for', cm.output[0]) |
131 self.assertIn('failed to load parser for', cm.output[0]) |
139 with self.admin_access.client_cnx() as cnx: |
140 with self.admin_access.client_cnx() as cnx: |
140 cnx.create_entity('CWSource', name=u'myfeed', type=u'datafeed', |
141 cnx.create_entity('CWSource', name=u'myfeed', type=u'datafeed', |
141 parser=u'testparser', url=u'ignored', |
142 parser=u'testparser', url=u'ignored', |
142 config=u'use-cwuri-as-url=no') |
143 config=u'use-cwuri-as-url=no') |
143 cnx.commit() |
144 cnx.commit() |
144 dfsource = self.repo.sources_by_uri['myfeed'] |
145 dfsource = self.repo.source_by_uri('myfeed') |
145 self.assertEqual(dfsource.use_cwuri_as_url, False) |
146 self.assertEqual(dfsource.use_cwuri_as_url, False) |
146 self.assertEqual({'type': u'datafeed', 'uri': u'myfeed', 'use-cwuri-as-url': False}, |
147 self.assertEqual({'type': u'datafeed', 'uri': u'myfeed', 'use-cwuri-as-url': False}, |
147 dfsource.public_config) |
148 dfsource.public_config) |
148 |
149 |
149 if __name__ == '__main__': |
150 if __name__ == '__main__': |