devtools/httptest.py
brancholdstable
changeset 6665 90f2f20367bc
parent 6438 abae10f81a85
child 7059 1d65b235549f
child 7065 6a6ea9966931
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/devtools/httptest.py	Wed Nov 03 16:38:28 2010 +0100
@@ -0,0 +1,197 @@
+# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+"""this module contains base classes and utilities for integration with running
+http server
+"""
+from __future__ import with_statement
+
+__docformat__ = "restructuredtext en"
+
+import threading
+import socket
+import httplib
+from urlparse import urlparse
+
+from twisted.internet import reactor, error
+
+from cubicweb.etwist.server import run
+from cubicweb.devtools.testlib import CubicWebTC
+from cubicweb.devtools import ApptestConfiguration
+
+
+def get_available_port(ports_scan):
+    """return the first available port from the given ports range
+
+    Try to connect port by looking for refused connection (111) or transport
+    endpoint already connected (106) errors
+
+    Raise a RuntimeError if no port can be found
+
+    :type ports_range: list
+    :param ports_range: range of ports to test
+    :rtype: int
+
+    .. see:: :func:`test.test_support.bind_port`
+    """
+    for port in ports_scan:
+        try:
+            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sock = s.connect(("localhost", port))
+        except socket.error, err:
+            if err.args[0] in (111, 106):
+                return port
+        finally:
+            s.close()
+    raise RuntimeError('get_available_port([ports_range]) cannot find an available port')
+
+
+class CubicWebServerConfig(ApptestConfiguration):
+    """basic configuration class for configuring test server
+
+    Class attributes:
+
+    * `ports_range`: list giving range of http ports to test (range(7000, 8000)
+      by default). The first port found as available in `ports_range` will be
+      used to launch the test web server.
+
+    """
+    ports_range = range(7000, 8000)
+
+    def default_base_url(self):
+        port = self['port'] or get_available_port(self.ports_range)
+        self.global_set_option('port', port) # force rewrite here
+        return 'http://127.0.0.1:%d/' % self['port']
+
+    def pyro_enabled(self):
+        return False
+
+    def load_configuration(self):
+        super(CubicWebServerConfig, self).load_configuration()
+        self.global_set_option('force-html-content-type', True)
+
+
+class CubicWebServerTC(CubicWebTC):
+    """Class for running test web server. See :class:`CubicWebServerConfig`.
+
+    Class attributes:
+    * ` anonymous_logged`: flag telling ifs anonymous user should be log logged
+      by default (True by default)
+    """
+    configcls = CubicWebServerConfig
+    # anonymous is logged by default in cubicweb test cases
+    anonymous_logged = True
+
+    def start_server(self):
+        # use a semaphore to avoid starting test while the http server isn't
+        # fully initilialized
+        semaphore = threading.Semaphore(0)
+        def safe_run(*args, **kwargs):
+            try:
+                run(*args, **kwargs)
+            finally:
+                semaphore.release()
+
+        reactor.addSystemEventTrigger('after', 'startup', semaphore.release)
+        t = threading.Thread(target=safe_run, name='cubicweb_test_web_server',
+                             args=(self.config, self.vreg, True))
+        self.web_thread = t
+        t.start()
+        semaphore.acquire()
+        if not self.web_thread.isAlive():
+            # XXX race condition with actual thread death
+            raise RuntimeError('Could not start the web server')
+        #pre init utils connection
+        parseurl = urlparse(self.config['base-url'])
+        assert parseurl.port == self.config['port'], (self.config['base-url'], self.config['port'])
+        self._web_test_cnx = httplib.HTTPConnection(parseurl.hostname,
+                                                    parseurl.port)
+        self._ident_cookie = None
+
+    def stop_server(self, timeout=15):
+        """Stop the webserver, waiting for the thread to return"""
+        if self._web_test_cnx is None:
+            self.web_logout()
+            self._web_test_cnx.close()
+        try:
+            reactor.stop()
+            self.web_thread.join(timeout)
+            assert not self.web_thread.isAlive()
+
+        finally:
+            reactor.__init__()
+
+    def web_login(self, user=None, passwd=None):
+        """Log the current http session for the provided credential
+
+        If no user is provided, admin connection are used.
+        """
+        if user is None:
+            user  = self.admlogin
+            passwd = self.admpassword
+        if passwd is None:
+            passwd = user
+        self.login(user)
+        response = self.web_get("?__login=%s&__password=%s" %
+                                (user, passwd))
+        assert response.status == httplib.SEE_OTHER, response.status
+        self._ident_cookie = response.getheader('Set-Cookie')
+        assert self._ident_cookie
+        return True
+
+    def web_logout(self, user='admin', pwd=None):
+        """Log out current http user"""
+        if self._ident_cookie is not None:
+            response = self.web_get('logout')
+        self._ident_cookie = None
+
+    def web_get(self, path='', headers=None):
+        """Return an httplib.HTTPResponse object for the specified path
+
+        Use available credential if available.
+        """
+        if headers is None:
+            headers = {}
+        if self._ident_cookie is not None:
+            assert 'Cookie' not in headers
+            headers['Cookie'] = self._ident_cookie
+        self._web_test_cnx.request("GET", '/' + path, headers=headers)
+        response = self._web_test_cnx.getresponse()
+        response.body = response.read() # to chain request
+        response.read = lambda : response.body
+        return response
+
+    def setUp(self):
+        CubicWebTC.setUp(self)
+        self.start_server()
+
+    def tearDown(self):
+        try:
+            self.stop_server()
+        except error.ReactorNotRunning, err:
+            # Server could be launched manually
+            print err
+        CubicWebTC.tearDown(self)
+
+    @classmethod
+    def init_config(cls, config):
+        super(CubicWebServerTC, cls).init_config(config)
+        if not cls.anonymous_logged:
+            config.global_set_option('anonymous-user', None)
+        else:
+            config.global_set_option('anonymous-user', 'anon')
+            config.global_set_option('anonymous-password', 'anon')