cubicweb/server/test/unittest_datafeed.py
author Denis Laxalde <denis.laxalde@logilab.fr>
Mon, 06 Mar 2017 13:21:50 +0100
changeset 12011 d2888fee6031
parent 11774 51c160677afe
child 12142 db2fc87348ab
permissions -rw-r--r--
[server] introduce a scheduler class to run repository "looping tasks" We just use the sched module from the standard library and introduce a tiny Python2/3 compatibility layer (more for convenience actually). The "looping" aspect of tasks (previously in LoopTask class) is re-implemeted as a `schedule_periodic_task` function. This is a reasonably thin layer as compared to LoopTask/TasksManager classes. Only the "restart" aspect of LoopTask is no longer present as I'm not sure it's worth keeping. The advantage of using this (in addition to eventually dropping our custom code) is that this scheduler class provides a `run` method that blocks the process while running tasks in its queue. So we can rely on this to have a 'scheduler' ctl command (see forthcoming patch) that would only run "looping tasks" without having to implement the "blocking" aspect ourself. Related to #17057223.

# coding: utf-8
# copyright 2011-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.

from datetime import timedelta
from contextlib import contextmanager

from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.server.sources import datafeed
from cubicweb.dataimport.stores import NoHookRQLObjectStore, MetaGenerator


class DataFeedTC(CubicWebTC):
    def setup_database(self):
        with self.admin_access.repo_cnx() as cnx:
            with self.base_parser(cnx):
                cnx.create_entity('CWSource', name=u'ô myfeed', type=u'datafeed',
                                  parser=u'testparser', url=u'ignored',
                                  config=u'synchronization-interval=1min')
                cnx.commit()

    @contextmanager
    def base_parser(self, session):
        class AParser(datafeed.DataFeedParser):
            __regid__ = 'testparser'

            def process(self, url, raise_on_error=False):
                metagenerator = MetaGenerator(self._cw, source=self.source)
                store = NoHookRQLObjectStore(self._cw, metagenerator)
                store.prepare_insert_entity('Card',
                                            cwuri=u'http://www.cubicweb.org/',
                                            title=u'cubicweb.org',
                                            content=u'the cw web site')
                store.flush()
                store.commit()

        with self.temporary_appobjects(AParser):
            if u'ô myfeed' in self.repo.sources_by_uri:
                yield self.repo.sources_by_uri[u'ô myfeed']._get_parser(session)
            else:
                yield
        # vreg.unregister just pops appobjects from their regid entry,
        # completely remove the entry to ensure we have no side effect with
        # this empty entry.
        del self.vreg['parsers'][AParser.__regid__]

    def test(self):
        self.assertIn(u'ô myfeed', self.repo.sources_by_uri)
        dfsource = self.repo.sources_by_uri[u'ô myfeed']
        self.assertNotIn('use_cwuri_as_url', dfsource.__dict__)
        self.assertEqual({'type': u'datafeed', 'uri': u'ô myfeed', 'use-cwuri-as-url': True},
                         dfsource.public_config)
        self.assertEqual(dfsource.use_cwuri_as_url, True)
        self.assertEqual(dfsource.latest_retrieval, None)
        self.assertEqual(dfsource.synchro_interval, timedelta(seconds=60))
        self.assertFalse(dfsource.fresh())
        # ensure source's logger name has been unormalized
        self.assertEqual(dfsource.info.__self__.name, 'cubicweb.sources.o myfeed')

        with self.repo.internal_cnx() as cnx:
            with self.base_parser(cnx):
                stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
                cnx.commit()
                # test import stats
                self.assertEqual(sorted(stats), ['checked', 'created', 'updated'])
                entity = cnx.execute('Card X').get_entity(0, 0)
                # test imported entities
                self.assertEqual(entity.title, 'cubicweb.org')
                self.assertEqual(entity.content, 'the cw web site')
                self.assertEqual(entity.cwuri, 'http://www.cubicweb.org/')
                self.assertEqual(entity.cw_source[0].name, u'ô myfeed')
                # test repo cache keys
                self.assertEqual(self.repo._type_cache[entity.eid], 'Card')

                self.assertTrue(dfsource.latest_retrieval)
                self.assertTrue(dfsource.fresh())

        # test_rename_source
        with self.admin_access.repo_cnx() as cnx:
            cnx.entity_from_eid(dfsource.eid).cw_set(name=u"myrenamedfeed")
            cnx.commit()
            entity = cnx.execute('Card X').get_entity(0, 0)
            self.assertEqual(entity.cwuri, 'http://www.cubicweb.org/')
            self.assertEqual(entity.cw_source[0].name, 'myrenamedfeed')
            self.assertEqual(self.repo._type_cache[entity.eid], 'Card')

            # test_delete_source
            cnx.execute('DELETE CWSource S WHERE S name "myrenamedfeed"')
            cnx.commit()
            self.assertFalse(cnx.execute('Card X WHERE X title "cubicweb.org"'))
            self.assertFalse(cnx.execute('Any X WHERE X has_text "cubicweb.org"'))

    def test_parser_retrieve_url_local(self):
        with self.admin_access.repo_cnx() as cnx:
            with self.base_parser(cnx) as parser:
                value = parser.retrieve_url('a string')
                self.assertEqual(200, value.getcode())
                self.assertEqual('a string', value.geturl())

    def test_update_url(self):
        dfsource = self.repo.sources_by_uri[u'ô myfeed']
        with self.admin_access.repo_cnx() as cnx:
            cnx.entity_from_eid(dfsource.eid).cw_set(url=u"http://pouet.com\nhttp://pouet.org")
            self.assertEqual(dfsource.urls, [u'ignored'])
            cnx.commit()
        self.assertEqual(dfsource.urls, [u"http://pouet.com", u"http://pouet.org"])

    def test_parser_not_found(self):
        dfsource = self.repo.sources_by_uri[u'ô myfeed']
        with self.assertLogs('cubicweb.sources.o myfeed', level='ERROR') as cm:
            with self.repo.internal_cnx() as cnx:
                stats = dfsource.pull_data(cnx, force=True)
                importlog = cnx.find('CWDataImport').one().log
        self.assertIn('failed to load parser for', cm.output[0])
        self.assertEqual(stats, {})
        self.assertIn(u'failed to load parser for source &quot;ô myfeed&quot;',
                      importlog)


class DataFeedConfigTC(CubicWebTC):

    def test_use_cwuri_as_url_override(self):
        with self.admin_access.client_cnx() as cnx:
            cnx.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
                              parser=u'testparser', url=u'ignored',
                              config=u'use-cwuri-as-url=no')
            cnx.commit()
        dfsource = self.repo.sources_by_uri['myfeed']
        self.assertEqual(dfsource.use_cwuri_as_url, False)
        self.assertEqual({'type': u'datafeed', 'uri': u'myfeed', 'use-cwuri-as-url': False},
                         dfsource.public_config)

if __name__ == '__main__':
    from logilab.common.testlib import unittest_main
    unittest_main()