[server] introduce a scheduler class to run repository "looping tasks"
We just use the sched module from the standard library and introduce a tiny
Python2/3 compatibility layer (more for convenience actually). The "looping"
aspect of tasks (previously in LoopTask class) is re-implemented as a
`schedule_periodic_task` function. This is a reasonably thin layer as compared
to LoopTask/TasksManager classes. Only the "restart" aspect of LoopTask is no
longer present as I'm not sure it's worth keeping.
The advantage of using this (in addition to eventually dropping our custom
code) is that this scheduler class provides a `run` method that blocks the
process while running tasks in its queue. So we can rely on this to have a
'scheduler' ctl command (see forthcoming patch) that would only run "looping
tasks" without having to implement the "blocking" aspect ourselves.
Related to #17057223.
# coding: utf-8
# copyright 2011-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
from datetime import timedelta
from contextlib import contextmanager
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.server.sources import datafeed
from cubicweb.dataimport.stores import NoHookRQLObjectStore, MetaGenerator
class DataFeedTC(CubicWebTC):
    """Functional tests for datafeed sources: data pull through a test
    parser, source renaming/deletion, and source URL handling.
    """

    def setup_database(self):
        # Create a datafeed source with a non-ASCII name on purpose: test()
        # below checks that the derived logger name is normalized to ASCII.
        with self.admin_access.repo_cnx() as cnx:
            with self.base_parser(cnx):
                cnx.create_entity('CWSource', name=u'ô myfeed', type=u'datafeed',
                                  parser=u'testparser', url=u'ignored',
                                  config=u'synchronization-interval=1min')
                cnx.commit()

    @contextmanager
    def base_parser(self, session):
        """Temporarily register a minimal 'testparser' datafeed parser.

        Yields the source's parser instance when the u'ô myfeed' source
        already exists, else yields None (e.g. during setup_database, before
        the source is created).
        """
        class AParser(datafeed.DataFeedParser):
            __regid__ = 'testparser'

            def process(self, url, raise_on_error=False):
                # Import a single Card entity; NoHookRQLObjectStore bypasses
                # hooks, keeping the import minimal and fast.
                metagenerator = MetaGenerator(self._cw, source=self.source)
                store = NoHookRQLObjectStore(self._cw, metagenerator)
                store.prepare_insert_entity('Card',
                                            cwuri=u'http://www.cubicweb.org/',
                                            title=u'cubicweb.org',
                                            content=u'the cw web site')
                store.flush()
                store.commit()

        with self.temporary_appobjects(AParser):
            if u'ô myfeed' in self.repo.sources_by_uri:
                yield self.repo.sources_by_uri[u'ô myfeed']._get_parser(session)
            else:
                yield
        # vreg.unregister just pops appobjects from their regid entry,
        # completely remove the entry to ensure we have no side effect with
        # this empty entry.
        del self.vreg['parsers'][AParser.__regid__]

    def test(self):
        """End-to-end scenario: source configuration checks, data pull, then
        source rename and deletion.

        Kept as a single method because each step depends on the state left
        by the previous one (the pulled Card entity in particular).
        """
        self.assertIn(u'ô myfeed', self.repo.sources_by_uri)
        dfsource = self.repo.sources_by_uri[u'ô myfeed']
        self.assertNotIn('use_cwuri_as_url', dfsource.__dict__)
        self.assertEqual({'type': u'datafeed', 'uri': u'ô myfeed', 'use-cwuri-as-url': True},
                         dfsource.public_config)
        self.assertEqual(dfsource.use_cwuri_as_url, True)
        self.assertEqual(dfsource.latest_retrieval, None)
        # 'synchronization-interval=1min' from setup_database
        self.assertEqual(dfsource.synchro_interval, timedelta(seconds=60))
        self.assertFalse(dfsource.fresh())
        # ensure source's logger name has been normalized (non-ASCII 'ô'
        # mapped to plain 'o')
        self.assertEqual(dfsource.info.__self__.name, 'cubicweb.sources.o myfeed')

        with self.repo.internal_cnx() as cnx:
            with self.base_parser(cnx):
                stats = dfsource.pull_data(cnx, force=True, raise_on_error=True)
                cnx.commit()
                # test import stats
                self.assertEqual(sorted(stats), ['checked', 'created', 'updated'])
                entity = cnx.execute('Card X').get_entity(0, 0)
                # test imported entities
                self.assertEqual(entity.title, 'cubicweb.org')
                self.assertEqual(entity.content, 'the cw web site')
                self.assertEqual(entity.cwuri, 'http://www.cubicweb.org/')
                self.assertEqual(entity.cw_source[0].name, u'ô myfeed')
                # test repo cache keys
                self.assertEqual(self.repo._type_cache[entity.eid], 'Card')
                self.assertTrue(dfsource.latest_retrieval)
                self.assertTrue(dfsource.fresh())

        # test_rename_source: imported entities must follow the renamed source
        with self.admin_access.repo_cnx() as cnx:
            cnx.entity_from_eid(dfsource.eid).cw_set(name=u"myrenamedfeed")
            cnx.commit()
            entity = cnx.execute('Card X').get_entity(0, 0)
            self.assertEqual(entity.cwuri, 'http://www.cubicweb.org/')
            self.assertEqual(entity.cw_source[0].name, 'myrenamedfeed')
            self.assertEqual(self.repo._type_cache[entity.eid], 'Card')

            # test_delete_source: deleting the source drops its entities,
            # including full-text index entries
            cnx.execute('DELETE CWSource S WHERE S name "myrenamedfeed"')
            cnx.commit()
            self.assertFalse(cnx.execute('Card X WHERE X title "cubicweb.org"'))
            self.assertFalse(cnx.execute('Any X WHERE X has_text "cubicweb.org"'))

    def test_parser_retrieve_url_local(self):
        # a non-URL string is returned as-is with a 200 code (local "fetch")
        with self.admin_access.repo_cnx() as cnx:
            with self.base_parser(cnx) as parser:
                value = parser.retrieve_url('a string')
                self.assertEqual(200, value.getcode())
                self.assertEqual('a string', value.geturl())

    def test_update_url(self):
        """Updating the CWSource url refreshes the source's urls list only
        after commit (newline-separated values become a list)."""
        dfsource = self.repo.sources_by_uri[u'ô myfeed']
        with self.admin_access.repo_cnx() as cnx:
            cnx.entity_from_eid(dfsource.eid).cw_set(url=u"http://pouet.com\nhttp://pouet.org")
            # not yet committed: old value still in effect
            self.assertEqual(dfsource.urls, [u'ignored'])
            cnx.commit()
        self.assertEqual(dfsource.urls, [u"http://pouet.com", u"http://pouet.org"])

    def test_parser_not_found(self):
        """Pulling without the test parser registered must log an error and
        record the failure in the import log, yielding empty stats."""
        dfsource = self.repo.sources_by_uri[u'ô myfeed']
        with self.assertLogs('cubicweb.sources.o myfeed', level='ERROR') as cm:
            with self.repo.internal_cnx() as cnx:
                stats = dfsource.pull_data(cnx, force=True)
                importlog = cnx.find('CWDataImport').one().log
        self.assertIn('failed to load parser for', cm.output[0])
        self.assertEqual(stats, {})
        self.assertIn(u'failed to load parser for source "ô myfeed"',
                      importlog)
class DataFeedConfigTC(CubicWebTC):
    """Tests for datafeed source configuration handling."""

    def test_use_cwuri_as_url_override(self):
        """A 'use-cwuri-as-url=no' entry in the source configuration must
        override the default and show up in the source's public config."""
        with self.admin_access.client_cnx() as cnx:
            cnx.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
                              parser=u'testparser', url=u'ignored',
                              config=u'use-cwuri-as-url=no')
            cnx.commit()
        source = self.repo.sources_by_uri['myfeed']
        self.assertEqual(False, source.use_cwuri_as_url)
        expected = {'type': u'datafeed', 'uri': u'myfeed', 'use-cwuri-as-url': False}
        self.assertEqual(expected, source.public_config)
if __name__ == '__main__':
    # Run this module's tests when executed directly.
    from logilab.common import testlib
    testlib.unittest_main()