server/test/unittest_datafeed.py
changeset 9824 30183ecf5c61
parent 9822 4a118bfd6ab4
child 9897 fa44db7da2dc
--- a/server/test/unittest_datafeed.py	Thu Mar 20 08:55:44 2014 +0100
+++ b/server/test/unittest_datafeed.py	Tue Mar 25 09:20:37 2014 +0100
@@ -16,7 +16,9 @@
 # You should have received a copy of the GNU Lesser General Public License along
 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
 
+import mimetools
 from datetime import timedelta
+from contextlib import contextmanager
 
 from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb.server.sources import datafeed
@@ -25,10 +27,30 @@
 class DataFeedTC(CubicWebTC):
     def setup_database(self):
         with self.admin_access.repo_cnx() as cnx:
-            cnx.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
-                              parser=u'testparser', url=u'ignored',
-                              config=u'synchronization-interval=1min')
-            cnx.commit()
+            with self.base_parser(cnx):
+                cnx.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
+                                  parser=u'testparser', url=u'ignored',
+                                  config=u'synchronization-interval=1min')
+                cnx.commit()
+
+    @contextmanager
+    def base_parser(self, session):
+        class AParser(datafeed.DataFeedParser):
+            __regid__ = 'testparser'
+            def process(self, url, raise_on_error=False):
+                entity = self.extid2entity('http://www.cubicweb.org/', 'Card',
+                                           item={'title': u'cubicweb.org',
+                                                 'content': u'the cw web site'})
+                if not self.created_during_pull(entity):
+                    self.notify_updated(entity)
+            def before_entity_copy(self, entity, sourceparams):
+                entity.cw_edited.update(sourceparams['item'])
+
+        with self.temporary_appobjects(AParser):
+            if 'myfeed' in self.repo.sources_by_uri:
+                yield self.repo.sources_by_uri['myfeed']._get_parser(session)
+            else:
+                yield
 
     def test(self):
         self.assertIn('myfeed', self.repo.sources_by_uri)
@@ -41,20 +63,8 @@
         self.assertEqual(dfsource.synchro_interval, timedelta(seconds=60))
         self.assertFalse(dfsource.fresh())
 
-
-        class AParser(datafeed.DataFeedParser):
-            __regid__ = 'testparser'
-            def process(self, url, raise_on_error=False):
-                entity = self.extid2entity('http://www.cubicweb.org/', 'Card',
-                                           item={'title': u'cubicweb.org',
-                                                 'content': u'the cw web site'})
-                if not self.created_during_pull(entity):
-                    self.notify_updated(entity)
-            def before_entity_copy(self, entity, sourceparams):
-                entity.cw_edited.update(sourceparams['item'])
-
-        with self.temporary_appobjects(AParser):
-            with self.repo.internal_cnx() as cnx:
+        with self.repo.internal_cnx() as cnx:
+            with self.base_parser(cnx):
                 stats = dfsource.pull_data(cnx, force=True)
                 cnx.commit()
                 # test import stats
@@ -123,6 +133,14 @@
             self.assertFalse(cnx.execute('Card X WHERE X title "cubicweb.org"'))
             self.assertFalse(cnx.execute('Any X WHERE X has_text "cubicweb.org"'))
 
+    def test_parser_retrieve_url_local(self):
+        with self.admin_access.repo_cnx() as cnx:
+            with self.base_parser(cnx) as parser:
+                value = parser.retrieve_url('a string')
+                self.assertEqual(200, value.getcode())
+                self.assertEqual('a string', value.geturl())
+                self.assertIsInstance(value.info(), mimetools.Message)
+
 
 class DataFeedConfigTC(CubicWebTC):