Fix POST handling.
authorChristophe de Vienne <christophe@unlish.com>
Thu, 18 Sep 2014 15:07:02 +0200
changeset 11508 ef8b9021b47b
parent 11507 4d7286f079e1
child 11509 ca3412269cd1
Fix POST handling. The issues where revealed by the unittests, which are ported from the cubicweb wsgi tests.
.hgignore
pyramid_cubicweb/core.py
pyramid_cubicweb/tests/__init__.py
pyramid_cubicweb/tests/data/bootstrap_cubes
pyramid_cubicweb/tests/test_bw_request.py
--- a/.hgignore	Thu Sep 18 11:43:45 2014 +0200
+++ b/.hgignore	Thu Sep 18 15:07:02 2014 +0200
@@ -4,3 +4,5 @@
 *.swp
 
 *.egg-info
+
+pyramid_cubicweb/tests/data/database
--- a/pyramid_cubicweb/core.py	Thu Sep 18 11:43:45 2014 +0200
+++ b/pyramid_cubicweb/core.py	Thu Sep 18 15:07:02 2014 +0200
@@ -1,5 +1,6 @@
 from contextlib import contextmanager
 from warnings import warn
+from cgi import FieldStorage
 
 import rql
 
@@ -65,6 +66,29 @@
         super(CubicWebPyramidRequest, self).__init__(vreg, https, post,
                                                      headers=headers_in)
 
+        self.content = request.body_file_seekable
+
+    def setup_params(self, params):
+        self.form = {}
+        for param, val in params.iteritems():
+            if param in self.no_script_form_params and val:
+                val = self.no_script_form_param(param, val)
+            if isinstance(val, FieldStorage) and val.file:
+                val = (val.filename, val.file)
+            if param == '_cwmsgid':
+                self.set_message_id(val)
+            elif param == '__message':
+                warn('[3.13] __message in request parameter is deprecated '
+                     '(may only be given to .build_url). Seeing this message '
+                     'usualy means your application hold some <form> where '
+                     'you should replace use of __message hidden input by '
+                     'form.set_message, so new _cwmsgid mechanism is properly '
+                     'used',
+                     DeprecationWarning)
+                self.set_message(val)
+            else:
+                self.form[param] = val
+
     def is_secure(self):
         return self._request.scheme == 'https'
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pyramid_cubicweb/tests/data/bootstrap_cubes	Thu Sep 18 15:07:02 2014 +0200
@@ -0,0 +1,1 @@
+pyramid
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pyramid_cubicweb/tests/test_bw_request.py	Thu Sep 18 15:07:02 2014 +0200
@@ -0,0 +1,121 @@
+# -*- coding: utf8 -*-
+from StringIO import StringIO
+
+import webtest
+import pyramid.request
+
+from cubicweb.devtools.webtest import CubicWebTestTC
+
+from pyramid_cubicweb import make_cubicweb_application
+from pyramid_cubicweb.core import CubicWebPyramidRequest
+
+
+class PyramidCWTest(CubicWebTestTC):
+    @classmethod
+    def init_config(cls, config):
+        super(PyramidCWTest, cls).init_config(config)
+        config.global_set_option('https-url', 'https://localhost.local/')
+        config['pyramid-auth-secret'] = 'authsecret'
+        config['pyramid-session-secret'] = 'sessionsecret'
+
+    def setUp(self):
+        # Skip CubicWebTestTC setUp
+        super(CubicWebTestTC, self).setUp()
+        config = make_cubicweb_application(self.config)
+        self.pyr_registry = config.registry
+        self.webapp = webtest.TestApp(config.make_wsgi_app())
+
+
+class WSGIAppTest(PyramidCWTest):
+    def make_request(self, path, environ=None, **kw):
+        r = webtest.app.TestRequest.blank(path, environ, **kw)
+
+        request = pyramid.request.Request(r.environ)
+        request.registry = self.pyr_registry
+
+        return request
+
+    def test_content_type(self):
+        req = CubicWebPyramidRequest(
+            self.make_request('/', {'CONTENT_TYPE': 'text/plain'}))
+
+        self.assertEqual('text/plain', req.get_header('Content-Type'))
+
+    def test_content_body(self):
+        req = CubicWebPyramidRequest(
+            self.make_request('/', {
+                'CONTENT_LENGTH': 12,
+                'CONTENT_TYPE': 'text/plain',
+                'wsgi.input': StringIO('some content')}))
+
+        self.assertEqual('some content', req.content.read())
+
+    def test_http_scheme(self):
+        req = CubicWebPyramidRequest(
+            self.make_request('/', {
+                'wsgi.url_scheme': 'http'}))
+
+        self.assertFalse(req.https)
+
+    def test_https_scheme(self):
+        req = CubicWebPyramidRequest(
+            self.make_request('/', {
+                'wsgi.url_scheme': 'https'}))
+
+        self.assertTrue(req.https)
+
+    def test_https_prefix(self):
+        r = self.webapp.get('/https/')
+        self.assertIn('https://', r.body)
+
+    def test_big_content(self):
+        content = 'x'*100001
+
+        req = CubicWebPyramidRequest(
+            self.make_request('/', {
+                'CONTENT_LENGTH': len(content),
+                'CONTENT_TYPE': 'text/plain',
+                'wsgi.input': StringIO(content)}))
+
+        self.assertEqual(content, req.content.read())
+
+    def test_post(self):
+        self.webapp.post(
+            '/',
+            params={'__login': self.admlogin, '__password': self.admpassword})
+
+    def test_get_multiple_variables(self):
+        req = CubicWebPyramidRequest(
+            self.make_request('/?arg=1&arg=2'))
+
+        self.assertEqual([u'1', u'2'], req.form['arg'])
+
+    def test_post_multiple_variables(self):
+        req = CubicWebPyramidRequest(
+            self.make_request('/', POST='arg=1&arg=2'))
+
+        self.assertEqual([u'1', u'2'], req.form['arg'])
+
+    def test_post_files(self):
+        content_type, params = self.webapp.encode_multipart(
+            (), (('filefield', 'aname', 'acontent'),))
+        req = CubicWebPyramidRequest(
+            self.make_request('/', POST=params, content_type=content_type))
+        self.assertIn('filefield', req.form)
+        fieldvalue = req.form['filefield']
+        self.assertEqual(u'aname', fieldvalue[0])
+        self.assertEqual('acontent', fieldvalue[1].read())
+
+    def test_post_unicode_urlencoded(self):
+        params = 'arg=%C3%A9'
+        req = CubicWebPyramidRequest(
+            self.make_request(
+                '/', POST=params,
+                content_type='application/x-www-form-urlencoded'))
+        self.assertEqual(u"é", req.form['arg'])
+
+    @classmethod
+    def init_config(cls, config):
+        super(WSGIAppTest, cls).init_config(config)
+        config.https_uiprops = None
+        config.https_datadir_url = None