cubicweb/server/test/unittest_serverctl.py
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Fri, 10 Mar 2017 14:04:26 +0100
changeset 12028 08c866d2f11d
parent 12016 88ed82a25f8a
child 12568 fc45f22c8100
permissions -rw-r--r--
[repo] Kill repo._sessions and with it the need to have a looping task to clean it up, and with that all the session's timestamping machinery.

import os.path as osp
import shutil

from mock import patch

from cubicweb import ExecutionError
from cubicweb.devtools import testlib, ApptestConfiguration
from cubicweb.server.serverctl import (
    DBDumpCommand,
    RepositorySchedulerCommand,
    SynchronizeSourceCommand,
)
from cubicweb.server.serverconfig import ServerConfiguration


class ServerCTLTC(testlib.CubicWebTC):

    def setUp(self):
        super(ServerCTLTC, self).setUp()
        self.orig_config_for = ServerConfiguration.config_for

        def config_for(appid):
            return ApptestConfiguration(appid, __file__)

        ServerConfiguration.config_for = staticmethod(config_for)

    def tearDown(self):
        ServerConfiguration.config_for = self.orig_config_for
        super(ServerCTLTC, self).tearDown()

    def test_dump(self):
        DBDumpCommand(None).run([self.appid])
        shutil.rmtree(osp.join(self.config.apphome, 'backup'))

    def test_scheduler(self):
        cmd = RepositorySchedulerCommand(None)
        with patch('sched.scheduler.run',
                   side_effect=RuntimeError('boom')) as patched_run:
            with self.assertRaises(RuntimeError) as exc_cm:
                with self.assertLogs('cubicweb.repository', level='INFO') as log_cm:
                    cmd.run([self.appid])
        # make sure repository scheduler started
        scheduler_start_message = (
            'INFO:cubicweb.repository:starting repository scheduler with '
            'tasks: update_feeds, expire_dataimports'
        )
        self.assertIn(scheduler_start_message, log_cm.output)
        # and that scheduler's run method got called
        self.assertIn('boom', str(exc_cm.exception))
        patched_run.assert_called_once_with()
        # make sure repository's shutdown method got called
        repo_shutdown_message = 'INFO:cubicweb.repository:shutting down repository'
        self.assertIn(repo_shutdown_message, log_cm.output)

    def test_source_sync(self):
        with self.admin_access.repo_cnx() as cnx:
            cnx.create_entity('CWSource', name=u'success_feed', type=u'datafeed',
                              parser=u'test_source_parser_success',
                              url=u'ignored')
            cnx.create_entity('CWSource', name=u'fail_feed', type=u'datafeed',
                              parser=u'test_source_parser_fail',
                              url=u'ignored')
            cnx.commit()

            cmd = SynchronizeSourceCommand(None)
            cmd.config.force = 1

            # Should sync all sources even if one failed
            with self.assertRaises(ExecutionError) as exc:
                cmd.run([self.appid])
            self.assertEqual(len(cnx.find('Card', title=u'success')), 1)
            self.assertEqual(len(cnx.find('Card', title=u'fail')), 0)
            self.assertEqual(str(exc.exception), 'All sources where not synced')

            # call with named sources
            cmd.run([self.appid, u'success_feed'])
            self.assertEqual(len(cnx.find('Card', title=u'success')), 2)

            with self.assertRaises(ExecutionError) as exc:
                cmd.run([self.appid, u'fail_feed'])
            self.assertEqual(str(exc.exception), 'All sources where not synced')
            self.assertEqual(len(cnx.find('Card', title=u'fail')), 0)


if __name__ == '__main__':
    from unittest import main
    main()