web/test/unittest_application.py
changeset 0 b97547f5f1fa
child 1398 5fe84a5f7035
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/web/test/unittest_application.py	Wed Nov 05 15:52:50 2008 +0100
@@ -0,0 +1,391 @@
+# -*- coding: iso-8859-1 -*-
+"""unit tests for cubicweb.web.application"""
+
+from logilab.common.testlib import TestCase, unittest_main
+import base64, Cookie
+
+import sys
+from urllib import unquote
+from logilab.common.decorators import clear_cache
+
+from cubicweb.web import Redirect, AuthenticationError, ExplicitLogin, INTERNAL_FIELD_VALUE
+from cubicweb.web.views.basecontrollers import ViewController
+from cubicweb.devtools._apptest import FakeRequest
+
+class FakeMapping:
+    """emulates a mapping module"""
+    def __init__(self):
+        self.ENTITIES_MAP = {}
+        self.ATTRIBUTES_MAP = {}
+        self.RELATIONS_MAP = {}
+
+class MockCursor:
+    def __init__(self):
+        self.executed = []
+    def execute(self, rql, args=None, cachekey=None):
+        args = args or {}
+        self.executed.append(rql % args)
+
+
+class FakeController(ViewController):
+
+    def __init__(self, form=None):
+        self.req = FakeRequest()
+        self.req.form = form or {}
+        self._cursor = self.req.cursor = MockCursor()
+
+    def new_cursor(self):
+        self._cursor = self.req.cursor = MockCursor()
+
+    def set_form(self, form):
+        self.req.form = form
+
+
+class RequestBaseTC(TestCase):
+    def setUp(self):
+        self.req = FakeRequest()
+        
+
+    def test_list_arg(self):
+        """tests the list_arg() function"""
+        list_arg = self.req.list_form_param
+        self.assertEquals(list_arg('arg3', {}), [])
+        d = {'arg1' : "value1",
+             'arg2' : ('foo', INTERNAL_FIELD_VALUE,),
+             'arg3' : ['bar']}
+        self.assertEquals(list_arg('arg1', d, True), ['value1'])
+        self.assertEquals(d, {'arg2' : ('foo', INTERNAL_FIELD_VALUE), 'arg3' : ['bar'],})
+        self.assertEquals(list_arg('arg2', d, True), ['foo'])
+        self.assertEquals({'arg3' : ['bar'],}, d)
+        self.assertEquals(list_arg('arg3', d), ['bar',])
+        self.assertEquals({'arg3' : ['bar'],}, d)
+
+
+    def test_from_controller(self):
+        self.assertEquals(self.req.from_controller(), 'view')
+        req = FakeRequest(url='project?vid=list')
+        # this assertion is just to make sure that relative_path can be
+        # correctly computed as it is used in from_controller()
+        self.assertEquals(req.relative_path(False), 'project')
+        self.assertEquals(req.from_controller(), 'view')
+        # test on a valid non-view controller
+        req = FakeRequest(url='login?x=1&y=2')
+        self.assertEquals(req.relative_path(False), 'login')
+        self.assertEquals(req.from_controller(), 'login')
+
+        
+class UtilsTC(TestCase):
+    """test suite for misc application utilities"""
+
+    def setUp(self):
+        self.ctrl = FakeController()
+    
+    #def test_which_mapping(self):
+    #    """tests which mapping is used (application or core)"""
+    #    init_mapping()
+    #    from cubicweb.common import mapping
+    #    self.assertEquals(mapping.MAPPING_USED, 'core')
+    #    sys.modules['mapping'] = FakeMapping()
+    #    init_mapping()
+    #    self.assertEquals(mapping.MAPPING_USED, 'application')
+    #    del sys.modules['mapping']
+
+    def test_execute_linkto(self):
+        """tests the execute_linkto() function"""
+        self.assertEquals(self.ctrl.execute_linkto(), None)
+        self.assertEquals(self.ctrl._cursor.executed,
+                          [])
+        
+        self.ctrl.set_form({'__linkto' : 'works_for:12_13_14:object',
+                              'eid': 8})
+        self.ctrl.execute_linkto()
+        self.assertEquals(self.ctrl._cursor.executed,
+                          ['SET Y works_for X WHERE X eid 8, Y eid %s' % i
+                           for i in (12, 13, 14)])
+
+        self.ctrl.new_cursor()
+        self.ctrl.set_form({'__linkto' : 'works_for:12_13_14:subject',
+                              'eid': 8})
+        self.ctrl.execute_linkto()
+        self.assertEquals(self.ctrl._cursor.executed,
+                          ['SET X works_for Y WHERE X eid 8, Y eid %s' % i
+                           for i in (12, 13, 14)])
+        
+
+        self.ctrl.new_cursor()
+        self.ctrl.req.form = {'__linkto' : 'works_for:12_13_14:object'}
+        self.ctrl.execute_linkto(eid=8)
+        self.assertEquals(self.ctrl._cursor.executed,
+                          ['SET Y works_for X WHERE X eid 8, Y eid %s' % i
+                           for i in (12, 13, 14)])
+
+        self.ctrl.new_cursor()
+        self.ctrl.set_form({'__linkto' : 'works_for:12_13_14:subject'})
+        self.ctrl.execute_linkto(eid=8)
+        self.assertEquals(self.ctrl._cursor.executed,
+                          ['SET X works_for Y WHERE X eid 8, Y eid %s' % i
+                           for i in (12, 13, 14)])
+
+
+from cubicweb.devtools.apptest import EnvBasedTC
+
+
+class ApplicationTC(EnvBasedTC):
+
+    def publish(self, req, path='view'):
+        return self.app.publish(path, req)
+
+    def expect_redirect(self, callback, req):
+        try:
+            res = callback(req)
+            print res
+        except Redirect, ex:
+            try:
+                path, params = ex.location.split('?', 1)
+            except ValueError:
+                path = ex.location
+                params = {}
+            else:
+                cleanup = lambda p: (p[0], unquote(p[1]))
+                params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
+            path = path[len(req.base_url()):]
+            return path, params
+        else:
+            self.fail('expected a Redirect exception')
+        
+    def expect_redirect_publish(self, req, path='view'):
+        return self.expect_redirect(lambda x: self.publish(x, path), req)
+    
+    def test_cnx_user_groups_sync(self):
+        user = self.user()
+        self.assertEquals(user.groups, set(('managers',)))
+        self.execute('SET X in_group G WHERE X eid %s, G name "guests"' % user.eid)
+        user = self.user()
+        self.assertEquals(user.groups, set(('managers',)))
+        self.commit()
+        user = self.user()
+        self.assertEquals(user.groups, set(('managers', 'guests')))
+        # cleanup
+        self.execute('DELETE X in_group G WHERE X eid %s, G name "guests"' % user.eid)
+        self.commit()
+    
+    def test_nonregr_publish1(self):
+        req = self.request(u'EEType X WHERE X final FALSE, X meta FALSE')
+        self.app.publish('view', req)
+        
+    def test_nonregr_publish2(self):
+        req = self.request(u'Any count(N) WHERE N todo_by U, N is Note, U eid %s'
+                           % self.user().eid)
+        self.app.publish('view', req)
+        
+    def test_publish_validation_error(self):
+        req = self.request()
+        user = self.user()
+        req.form = {
+            'eid':       `user.eid`,
+            '__type:'+`user.eid`:    'EUser',
+            'login:'+`user.eid`:     '', # ERROR: no login specified
+            'edits-login:'+`user.eid`: unicode(user.login),
+             # just a sample, missing some necessary information for real life
+            '__errorurl': 'view?vid=edition...'
+            }
+        path, params = self.expect_redirect_publish(req, 'edit')
+        forminfo = req.get_session_data('view?vid=edition...')
+        eidmap = forminfo['eidmap']
+        self.assertEquals(eidmap, {})
+        values = forminfo['values']
+        self.assertEquals(values['login:'+`user.eid`], '')
+        self.assertEquals(values['edits-login:'+`user.eid`], user.login)
+        self.assertEquals(values['eid'], `user.eid`)
+        errors = forminfo['errors']
+        self.assertEquals(errors.entity, user.eid)
+        self.assertEquals(errors.errors['login'], 'required attribute')
+
+
+    def test_validation_error_dont_loose_subentity_data(self):
+        """test creation of two linked entities
+        """        
+        req = self.request()
+        form = {'eid': ['X', 'Y'],
+                '__type:X': 'EUser',
+                # missing required field
+                'login:X': u'', 'edits-login:X': '', 
+                'surname:X': u'Mr Ouaoua', 'edits-surname:X': '',
+                '__type:Y': 'EmailAddress',
+                # but email address is set
+                'address:Y': u'bougloup@logilab.fr', 'edits-address:Y': '',
+                'alias:Y': u'', 'edits-alias:Y': '',
+                'use_email:X': 'Y', 'edits-use_email:X': INTERNAL_FIELD_VALUE,
+                # necessary to get validation error handling
+                '__errorurl': 'view?vid=edition...',
+                }
+        req.form = form
+        # monkey patch edited_eid to ensure both entities are edited, not only X
+        req.edited_eids = lambda : ('Y', 'X')
+        path, params = self.expect_redirect_publish(req, 'edit')
+        forminfo = req.get_session_data('view?vid=edition...')
+        self.assertUnorderedIterableEquals(forminfo['eidmap'].keys(), ['X', 'Y'])
+        self.assertEquals(forminfo['errors'].entity, forminfo['eidmap']['X'])
+        self.assertEquals(forminfo['errors'].errors, {'login': 'required attribute',
+                                                      'upassword': 'required attribute'})
+        self.assertEquals(forminfo['values'], form)
+        
+    def _test_cleaned(self, kwargs, injected, cleaned):
+        req = self.request(**kwargs)
+        page = self.app.publish('view', req)
+        self.failIf(injected in page, (kwargs, injected))
+        self.failUnless(cleaned in page, (kwargs, cleaned))
+        
+    def test_nonregr_script_kiddies(self):
+        """test against current script injection"""
+        injected = '<i>toto</i>'
+        cleaned = 'toto'
+        for kwargs in ({'__message': injected},
+                       {'vid': injected},
+                       {'vtitle': injected},
+                       ):
+            yield self._test_cleaned, kwargs, injected, cleaned
+        
+    def test_site_wide_eproperties_sync(self):
+        # XXX work in all-in-one configuration but not in twisted for instance
+        # in which case we need a kindof repo -> http server notification
+        # protocol
+        vreg = self.app.vreg
+        # default value
+        self.assertEquals(vreg.property_value('ui.language'), 'en')
+        self.execute('INSERT EProperty X: X value "fr", X pkey "ui.language"')
+        self.assertEquals(vreg.property_value('ui.language'), 'en')
+        self.commit()
+        self.assertEquals(vreg.property_value('ui.language'), 'fr')
+        self.execute('SET X value "de" WHERE X pkey "ui.language"')
+        self.assertEquals(vreg.property_value('ui.language'), 'fr')
+        self.commit()
+        self.assertEquals(vreg.property_value('ui.language'), 'de')
+        self.execute('DELETE EProperty X WHERE X pkey "ui.language"')
+        self.assertEquals(vreg.property_value('ui.language'), 'de')
+        self.commit()
+        self.assertEquals(vreg.property_value('ui.language'), 'en')
+
+    def test_fb_login_concept(self):
+        """see data/views.py"""
+        self.set_option('auth-mode', 'cookie')
+        self.set_option('anonymous-user', 'anon')
+        self.login('anon')
+        req = self.request()
+        origcnx = req.cnx
+        req.form['__fblogin'] = u'turlututu'
+        page = self.publish(req)
+        self.failIf(req.cnx is origcnx)
+        self.assertEquals(req.user.login, 'turlututu')
+        self.failUnless('turlututu' in page, page)
+        
+    # authentication tests ####################################################
+
+    def _init_auth(self, authmode, anonuser=None):
+        self.set_option('auth-mode', authmode)
+        self.set_option('anonymous-user', anonuser)
+        req = self.request()
+        origcnx = req.cnx
+        req.cnx = None
+        sh = self.app.session_handler
+        # not properly cleaned between tests
+        self.open_sessions = sh.session_manager._sessions = {} 
+        return req, origcnx
+
+    def _test_auth_succeed(self, req, origcnx):
+        sh = self.app.session_handler
+        path, params = self.expect_redirect(lambda x: self.app.connect(x), req)
+        cnx = req.cnx
+        self.assertEquals(len(self.open_sessions), 1, self.open_sessions)
+        self.assertEquals(cnx.login, origcnx.login)
+        self.assertEquals(cnx.password, origcnx.password)
+        self.assertEquals(cnx.anonymous_connection, False) 
+        self.assertEquals(path, 'view')
+        self.assertEquals(params, {'__message': 'welcome %s !' % origcnx.login})
+    
+    def _test_auth_fail(self, req):
+        self.assertRaises(AuthenticationError, self.app.connect, req)
+        self.assertEquals(req.cnx, None)
+        self.assertEquals(len(self.open_sessions), 0) 
+        clear_cache(req, 'get_authorization')
+        
+    def test_http_auth_no_anon(self):
+        req, origcnx = self._init_auth('http')
+        self._test_auth_fail(req)
+        self.assertRaises(ExplicitLogin, self.publish, req, 'login')
+        self.assertEquals(req.cnx, None)
+        authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password))
+        req._headers['Authorization'] = 'basic %s' % authstr
+        self._test_auth_succeed(req, origcnx)
+        self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+        self.assertEquals(len(self.open_sessions), 0) 
+
+    def test_cookie_auth_no_anon(self):
+        req, origcnx = self._init_auth('cookie')
+        self._test_auth_fail(req)
+        form = self.publish(req, 'login')
+        self.failUnless('__login' in form)
+        self.failUnless('__password' in form)
+        self.assertEquals(req.cnx, None)
+        req.form['__login'] = origcnx.login
+        req.form['__password'] = origcnx.password
+        self._test_auth_succeed(req, origcnx)
+        self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+        self.assertEquals(len(self.open_sessions), 0) 
+
+    def _test_auth_anon(self, req):
+        self.app.connect(req)
+        acnx = req.cnx
+        self.assertEquals(len(self.open_sessions), 1)
+        self.assertEquals(acnx.login, 'anon') 
+        self.assertEquals(acnx.password, 'anon') 
+        self.failUnless(acnx.anonymous_connection)
+        self._reset_cookie(req)
+        
+    def _reset_cookie(self, req):
+        # preparing the suite of the test
+        # set session id in cookie
+        cookie = Cookie.SimpleCookie()
+        cookie['__session'] = req.cnx.sessionid
+        req._headers['Cookie'] = cookie['__session'].OutputString()
+        clear_cache(req, 'get_authorization')
+        # reset cnx as if it was a new incoming request
+        req.cnx = None
+        
+    def _test_anon_auth_fail(self, req):
+        self.assertEquals(len(self.open_sessions), 1) 
+        self.app.connect(req)
+        self.assertEquals(req.message, 'authentication failure')
+        self.assertEquals(req.cnx.anonymous_connection, True)
+        self.assertEquals(len(self.open_sessions), 1) 
+        self._reset_cookie(req)
+        
+    def test_http_auth_anon_allowed(self):
+        req, origcnx = self._init_auth('http', 'anon')
+        self._test_auth_anon(req)
+        authstr = base64.encodestring('toto:pouet')
+        req._headers['Authorization'] = 'basic %s' % authstr
+        self._test_anon_auth_fail(req)
+        authstr = base64.encodestring('%s:%s' % (origcnx.login, origcnx.password))
+        req._headers['Authorization'] = 'basic %s' % authstr
+        self._test_auth_succeed(req, origcnx)
+        self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+        self.assertEquals(len(self.open_sessions), 0) 
+        
+    def test_cookie_auth_anon_allowed(self):
+        req, origcnx = self._init_auth('cookie', 'anon')
+        self._test_auth_anon(req)
+        req.form['__login'] = 'toto'
+        req.form['__password'] = 'pouet'
+        self._test_anon_auth_fail(req)
+        req.form['__login'] = origcnx.login
+        req.form['__password'] = origcnx.password
+        self._test_auth_succeed(req, origcnx)
+        self.assertRaises(AuthenticationError, self.publish, req, 'logout')
+        self.assertEquals(len(self.open_sessions), 0) 
+
+    
+
+        
+if __name__ == '__main__':
+    unittest_main()