diff -r 058bb3dc685f -r 0b59724cb3f2 cubicweb/devtools/qunit.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cubicweb/devtools/qunit.py Sat Jan 16 13:48:51 2016 +0100 @@ -0,0 +1,293 @@ +# copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . +from __future__ import absolute_import + +import os, os.path as osp +import errno +from tempfile import mkdtemp +from subprocess import Popen, PIPE, STDOUT + +from six.moves.queue import Queue, Empty + +# imported by default to simplify further import statements +from logilab.common.testlib import unittest_main, with_tempdir, InnerTest, Tags +import webtest.http + +import cubicweb +from cubicweb.view import View +from cubicweb.web.controller import Controller +from cubicweb.web.views.staticcontrollers import StaticFileController, STATIC_CONTROLLERS +from cubicweb.devtools import webtest as cwwebtest + + +class FirefoxHelper(object): + + def __init__(self, url=None): + self._process = None + self._profile_dir = mkdtemp(prefix='cwtest-ffxprof-') + self.firefox_cmd = ['firefox', '-no-remote'] + if os.name == 'posix': + self.firefox_cmd = [osp.join(osp.dirname(__file__), 'data', 'xvfb-run.sh'), + '-a', '-s', '-noreset -screen 0 800x600x24'] + self.firefox_cmd + + def test(self): + try: + proc = Popen(['firefox', '--help'], stdout=PIPE, stderr=STDOUT) + stdout, _ = proc.communicate() + return proc.returncode == 0, stdout + except OSError as exc: + if exc.errno == errno.ENOENT: + msg = '[%s] %s' % (errno.errorcode[exc.errno], exc.strerror) + return False, msg + raise + + def start(self, url): + self.stop() + cmd = self.firefox_cmd + ['-silent', '--profile', self._profile_dir, + '-url', url] + with open(os.devnull, 'w') as fnull: + self._process = Popen(cmd, stdout=fnull, stderr=fnull) + + def stop(self): + if self._process is not None: + assert self._process.returncode is None, self._process.returncode + self._process.terminate() + self._process.wait() + self._process = None + + def __del__(self): + self.stop() + + +class QUnitTestCase(cwwebtest.CubicWebTestTC): + + tags = cwwebtest.CubicWebTestTC.tags | Tags(('qunit',)) + + # testfile, (dep_a, dep_b) + all_js_tests = () + + def setUp(self): + super(QUnitTestCase, self).setUp() + self.test_queue = Queue() + class MyQUnitResultController(QUnitResultController): + tc = self + test_queue = self.test_queue + self._qunit_controller = MyQUnitResultController + self.webapp.app.appli.vreg.register(MyQUnitResultController) + self.webapp.app.appli.vreg.register(QUnitView) + self.webapp.app.appli.vreg.register(CWDevtoolsStaticController) + self.server = webtest.http.StopableWSGIServer.create(self.webapp.app) + self.config.global_set_option('base-url', self.server.application_url) + + def tearDown(self): + self.server.shutdown() + self.webapp.app.appli.vreg.unregister(self._qunit_controller) + self.webapp.app.appli.vreg.unregister(QUnitView) + self.webapp.app.appli.vreg.unregister(CWDevtoolsStaticController) + super(QUnitTestCase, self).tearDown() + + def test_javascripts(self): + for args in self.all_js_tests: + self.assertIn(len(args), (1, 2)) + test_file = args[0] + if len(args) > 1: + depends = args[1] + else: + depends = () + for js_test in self._test_qunit(test_file, depends): + yield js_test + + @with_tempdir + def _test_qunit(self, test_file, depends=(), timeout=10): + QUnitView.test_file = test_file + QUnitView.depends = depends + + while not self.test_queue.empty(): + self.test_queue.get(False) + + browser = FirefoxHelper() + isavailable, reason = browser.test() + if not isavailable: + self.fail('firefox not available or not working properly (%s)' % reason) + browser.start(self.config['base-url'] + "?vid=qunit") + test_count = 0 + error = False + def raise_exception(cls, *data): + raise cls(*data) + while not error: + try: + result, test_name, msg = self.test_queue.get(timeout=timeout) + test_name = '%s (%s)' % (test_name, test_file) + self.set_description(test_name) + if result is None: + break + test_count += 1 + if result: + yield InnerTest(test_name, lambda : 1) + else: + yield InnerTest(test_name, self.fail, msg) + except Empty: + error = True + msg = '%s inactivity timeout (%is). %i test results received' + yield InnerTest(test_file, raise_exception, RuntimeError, + msg % (test_file, timeout, test_count)) + browser.stop() + if test_count <= 0 and not error: + yield InnerTest(test_name, raise_exception, RuntimeError, + 'No test yielded by qunit for %s' % test_file) + +class QUnitResultController(Controller): + + __regid__ = 'qunit_result' + + + # Class variables to circumvent the instantiation of a new Controller for each request. + _log_stack = [] # store QUnit log messages + _current_module_name = '' # store the current QUnit module name + + def publish(self, rset=None): + event = self._cw.form['event'] + getattr(self, 'handle_%s' % event)() + return b'' + + def handle_module_start(self): + self.__class__._current_module_name = self._cw.form.get('name', '') + + def handle_test_done(self): + name = '%s // %s' % (self._current_module_name, self._cw.form.get('name', '')) + failures = int(self._cw.form.get('failures', 0)) + total = int(self._cw.form.get('total', 0)) + + self._log_stack.append('%i/%i assertions failed' % (failures, total)) + msg = '\n'.join(self._log_stack) + + if failures: + self.tc.test_queue.put((False, name, msg)) + else: + self.tc.test_queue.put((True, name, msg)) + self._log_stack[:] = [] + + def handle_done(self): + self.tc.test_queue.put((None, None, None)) + + def handle_log(self): + result = self._cw.form['result'] + message = self._cw.form.get('message', '') + actual = self._cw.form.get('actual') + expected = self._cw.form.get('expected') + source = self._cw.form.get('source') + log = '%s: %s' % (result, message) + if result == 'false' and actual is not None and expected is not None: + log += ' (got: %s, expected: %s)' % (actual, expected) + if source is not None: + log += '\n' + source + self._log_stack.append(log) + + +class QUnitView(View): + __regid__ = 'qunit' + + templatable = False + + depends = None + test_file = None + + def call(self, **kwargs): + w = self.w + req = self._cw + w(u''' + + + + + + + + ''') + w(u'') + w(u'') + w(u'') + + for dep in self.depends: + w(u' \n' % dep) + + w(u' ') + w(u' ' % self.test_file) + w(u''' + +
+
+ + ''') + + +class CWDevtoolsStaticController(StaticFileController): + __regid__ = 'devtools' + + def publish(self, rset=None): + staticdir = osp.join(osp.dirname(__file__), 'data') + relpath = self.relpath[len(self.__regid__) + 1:] + return self.static_file(osp.join(staticdir, relpath)) + + +STATIC_CONTROLLERS.append(CWDevtoolsStaticController) + + +if __name__ == '__main__': + unittest_main()