cubicweb/server/test/unittest_serverctl.py
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Fri, 20 Jan 2017 18:17:04 +0100
changeset 11919 3a6746dfc57f
parent 11355 47b0b08fbb4b
child 12002 26453d9467f6
permissions -rw-r--r--
Change hooks control (deny_all_hooks_but / allow_all_hooks_but) to be more predictable Prior to this, if one execute code like: with cnx.hooks.deny_all_hooks_but('metadata'): with cnx.hooks.deny_all_hooks_but(): # mycode 'metadata' hooks will be activated anyway in the inner block, which is rather unexpected (of course in real life you only see the latest hooks control statement, the former being higher in the call stack). This is due to the underlying usage of old `enable_hook_categories` / `disable_hook_categories` methods, which were introduced much before the now official context manager based API (with `cnx.[deny|all]_all_hooks_but(...)`). To move on, this patch drop the two legacy methods, rename and privatize related internal state on the connection (`hooks_mode` becomes `_hooks_mode`, `disabled_hook_cats` and `enabled_hook_cats` become `_hooks_categories`) and reimplement the `_hooks_control` context manager to simply update them. See the added unit test for details.

import os.path as osp
import shutil

from cubicweb import ExecutionError
from cubicweb.devtools import testlib, ApptestConfiguration
from cubicweb.server.serverctl import _local_dump, DBDumpCommand, SynchronizeSourceCommand
from cubicweb.server.serverconfig import ServerConfiguration

class ServerCTLTC(testlib.CubicWebTC):
    def setUp(self):
        super(ServerCTLTC, self).setUp()
        self.orig_config_for = ServerConfiguration.config_for
        config_for = lambda appid: ApptestConfiguration(appid, __file__)
        ServerConfiguration.config_for = staticmethod(config_for)

    def tearDown(self):
        ServerConfiguration.config_for = self.orig_config_for
        super(ServerCTLTC, self).tearDown()

    def test_dump(self):
        DBDumpCommand(None).run([self.appid])
        shutil.rmtree(osp.join(self.config.apphome, 'backup'))

    def test_source_sync(self):
        with self.admin_access.repo_cnx() as cnx:
            cnx.create_entity('CWSource', name=u'success_feed', type=u'datafeed',
                              parser=u'test_source_parser_success',
                              url=u'ignored')
            cnx.create_entity('CWSource', name=u'fail_feed', type=u'datafeed',
                              parser=u'test_source_parser_fail',
                              url=u'ignored')
            cnx.commit()

            cmd = SynchronizeSourceCommand(None)
            cmd.config.force = 1

            # Should sync all sources even if one failed
            with self.assertRaises(ExecutionError) as exc:
                cmd.run([self.appid])
            self.assertEqual(len(cnx.find('Card', title=u'success')), 1)
            self.assertEqual(len(cnx.find('Card', title=u'fail')), 0)
            self.assertEqual(str(exc.exception), 'All sources where not synced')

            # call with named sources
            cmd.run([self.appid, u'success_feed'])
            self.assertEqual(len(cnx.find('Card', title=u'success')), 2)

            with self.assertRaises(ExecutionError) as exc:
                cmd.run([self.appid, u'fail_feed'])
            self.assertEqual(str(exc.exception), 'All sources where not synced')
            self.assertEqual(len(cnx.find('Card', title=u'fail')), 0)


if __name__ == '__main__':
    from unittest import main
    main()