[R] dataimport: refine error detection
Consider only None as non-expected value after parsing the input file.
False is now allowed because of the boolean yesno checker that returns False by default.
# -*- coding: iso-8859-1 -*-"""unit tests for cubicweb.web.application:organization: Logilab:copyright: 2001-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""importbase64,Cookieimportsysfromurllibimportunquotefromlogilab.common.testlibimportTestCase,unittest_mainfromlogilab.common.decoratorsimportclear_cachefromcubicweb.devtools.testlibimportCubicWebTCfromcubicweb.devtools.fakeimportFakeRequestfromcubicweb.webimportRedirect,AuthenticationError,ExplicitLogin,INTERNAL_FIELD_VALUEfromcubicweb.web.views.basecontrollersimportViewControllerclassFakeMapping:"""emulates a mapping module"""def__init__(self):self.ENTITIES_MAP={}self.ATTRIBUTES_MAP={}self.RELATIONS_MAP={}classMockCursor:def__init__(self):self.executed=[]defexecute(self,rql,args=None,cachekey=None):args=argsor{}self.executed.append(rql%args)classFakeController(ViewController):def__init__(self,form=None):self._cw=FakeRequest()self._cw.form=formor{}self._cursor=self._cw.cursor=MockCursor()defnew_cursor(self):self._cursor=self._cw.cursor=MockCursor()defset_form(self,form):self._cw.form=formclassRequestBaseTC(TestCase):defsetUp(self):self._cw=FakeRequest()deftest_list_arg(self):"""tests the list_arg() function"""list_arg=self._cw.list_form_paramself.assertEquals(list_arg('arg3',{}),[])d={'arg1':"value1",'arg2':('foo',INTERNAL_FIELD_VALUE,),'arg3':['bar']}self.assertEquals(list_arg('arg1',d,True),['value1'])self.assertEquals(d,{'arg2':('foo',INTERNAL_FIELD_VALUE),'arg3':['bar'],})self.assertEquals(list_arg('arg2',d,True),['foo'])self.assertEquals({'arg3':['bar'],},d)self.assertEquals(list_arg('arg3',d),['bar',])self.assertEquals({'arg3':['bar'],},d)deftest_from_controller(self):self._cw.vreg['controllers']={'view':1,'login':1}self.assertEquals(self._cw.from_controller(),'view')req=FakeRequest(url='project?vid=list')req.vreg['controllers']={'view':1,'login':1}# this assertion is just to make sure that relative_path can be# correctly computed as it is used in from_controller()self.assertEquals(req.relative_path(False),'project')self.assertEquals(req.from_controller(),'view')# test on a valid non-view controllerreq=FakeRequest(url='login?x=1&y=2')req.vreg['controllers']={'view':1,'login':1}self.assertEquals(req.relative_path(False),'login')self.assertEquals(req.from_controller(),'login')classUtilsTC(TestCase):"""test suite for misc application utilities"""defsetUp(self):self.ctrl=FakeController()#def test_which_mapping(self):# """tests which mapping is used (application or core)"""# init_mapping()# from cubicweb.common import mapping# self.assertEquals(mapping.MAPPING_USED, 'core')# sys.modules['mapping'] = FakeMapping()# init_mapping()# self.assertEquals(mapping.MAPPING_USED, 'application')# del sys.modules['mapping']deftest_execute_linkto(self):"""tests the execute_linkto() function"""self.assertEquals(self.ctrl.execute_linkto(),None)self.assertEquals(self.ctrl._cursor.executed,[])self.ctrl.set_form({'__linkto':'works_for:12_13_14:object','eid':8})self.ctrl.execute_linkto()self.assertEquals(self.ctrl._cursor.executed,['SET Y works_for X WHERE X eid 8, Y eid %s'%iforiin(12,13,14)])self.ctrl.new_cursor()self.ctrl.set_form({'__linkto':'works_for:12_13_14:subject','eid':8})self.ctrl.execute_linkto()self.assertEquals(self.ctrl._cursor.executed,['SET X works_for Y WHERE X eid 8, Y eid %s'%iforiin(12,13,14)])self.ctrl.new_cursor()self.ctrl._cw.form={'__linkto':'works_for:12_13_14:object'}self.ctrl.execute_linkto(eid=8)self.assertEquals(self.ctrl._cursor.executed,['SET Y works_for X WHERE X eid 8, Y eid %s'%iforiin(12,13,14)])self.ctrl.new_cursor()self.ctrl.set_form({'__linkto':'works_for:12_13_14:subject'})self.ctrl.execute_linkto(eid=8)self.assertEquals(self.ctrl._cursor.executed,['SET X works_for Y WHERE X eid 8, Y eid %s'%iforiin(12,13,14)])classApplicationTC(CubicWebTC):defsetUp(self):super(ApplicationTC,self).setUp()defraise_hdlr(*args,**kwargs):raiseself.app.error_handler=raise_hdlrdeftest_cnx_user_groups_sync(self):user=self.user()self.assertEquals(user.groups,set(('managers',)))self.execute('SET X in_group G WHERE X eid %s, G name "guests"'%user.eid)user=self.user()self.assertEquals(user.groups,set(('managers',)))self.commit()user=self.user()self.assertEquals(user.groups,set(('managers','guests')))# cleanupself.execute('DELETE X in_group G WHERE X eid %s, G name "guests"'%user.eid)self.commit()deftest_nonregr_publish1(self):req=self.request(u'CWEType X WHERE X final FALSE, X meta FALSE')self.app.publish('view',req)deftest_nonregr_publish2(self):req=self.request(u'Any count(N) WHERE N todo_by U, N is Note, U eid %s'%self.user().eid)self.app.publish('view',req)deftest_publish_validation_error(self):req=self.request()user=self.user()eid=unicode(user.eid)req.form={'eid':eid,'__type:'+eid:'CWUser','_cw_edited_fields:'+eid:'login-subject','login-subject:'+eid:'',# ERROR: no login specified# just a sample, missing some necessary information for real life'__errorurl':'view?vid=edition...'}path,params=self.expect_redirect(lambdax:self.app_publish(x,'edit'),req)forminfo=req.get_session_data('view?vid=edition...')eidmap=forminfo['eidmap']self.assertEquals(eidmap,{})values=forminfo['values']self.assertEquals(values['login-subject:'+eid],'')self.assertEquals(values['eid'],eid)error=forminfo['error']self.assertEquals(error.entity,user.eid)self.assertEquals(error.errors['login'],'required attribute')deftest_validation_error_dont_loose_subentity_data(self):"""test creation of two linked entities """req=self.request()form={'eid':['X','Y'],'__maineid':'X','__type:X':'CWUser','_cw_edited_fields:X':'login-subject,surname-subject',# missing required field'login-subject:X':u'','surname-subject:X':u'Mr Ouaoua',# but email address is set'__type:Y':'EmailAddress','_cw_edited_fields:Y':'address-subject,alias-subject,use_email-object','address-subject:Y':u'bougloup@logilab.fr','alias-subject:Y':u'','use_email-object:Y':'X',# necessary to get validation error handling'__errorurl':'view?vid=edition...',}req.form=form# monkey patch edited_eid to ensure both entities are edited, not only Xreq.edited_eids=lambda:('Y','X')path,params=self.expect_redirect(lambdax:self.app_publish(x,'edit'),req)forminfo=req.get_session_data('view?vid=edition...')self.assertEquals(set(forminfo['eidmap']),set('XY'))self.assertEquals(forminfo['error'].entity,forminfo['eidmap']['X'])self.assertEquals(forminfo['error'].errors,{'login':'required attribute','upassword':'required attribute'})self.assertEquals(forminfo['values'],form)def_test_cleaned(self,kwargs,injected,cleaned):req=self.request(**kwargs)page=self.app.publish('view',req)self.failIf(injectedinpage,(kwargs,injected))self.failUnless(cleanedinpage,(kwargs,cleaned))deftest_nonregr_script_kiddies(self):"""test against current script injection"""injected='<i>toto</i>'cleaned='toto'forkwargsin({'__message':injected},{'vid':injected},{'vtitle':injected},):yieldself._test_cleaned,kwargs,injected,cleaneddeftest_site_wide_eproperties_sync(self):# XXX work in all-in-one configuration but not in twisted for instance# in which case we need a kindof repo -> http server notification# protocolvreg=self.app.vreg# default valueself.assertEquals(vreg.property_value('ui.language'),'en')self.execute('INSERT CWProperty X: X value "fr", X pkey "ui.language"')self.assertEquals(vreg.property_value('ui.language'),'en')self.commit()self.assertEquals(vreg.property_value('ui.language'),'fr')self.execute('SET X value "de" WHERE X pkey "ui.language"')self.assertEquals(vreg.property_value('ui.language'),'fr')self.commit()self.assertEquals(vreg.property_value('ui.language'),'de')self.execute('DELETE CWProperty X WHERE X pkey "ui.language"')self.assertEquals(vreg.property_value('ui.language'),'de')self.commit()self.assertEquals(vreg.property_value('ui.language'),'en')deftest_fb_login_concept(self):"""see data/views.py"""self.set_option('auth-mode','cookie')self.set_option('anonymous-user','anon')self.login('anon')req=self.request()origcnx=req.cnxreq.form['__fblogin']=u'turlututu'page=self.app_publish(req)self.failIf(req.cnxisorigcnx)self.assertEquals(req.user.login,'turlututu')self.failUnless('turlututu'inpage,page)# authentication tests ####################################################deftest_http_auth_no_anon(self):req,origcnx=self.init_authentication('http')self.assertAuthFailure(req)self.assertRaises(ExplicitLogin,self.app_publish,req,'login')self.assertEquals(req.cnx,None)authstr=base64.encodestring('%s:%s'%(origcnx.login,origcnx.authinfo['password']))req._headers['Authorization']='basic %s'%authstrself.assertAuthSuccess(req,origcnx)self.assertEquals(req.cnx.authinfo,{'password':origcnx.authinfo['password']})self.assertRaises(AuthenticationError,self.app_publish,req,'logout')self.assertEquals(len(self.open_sessions),0)deftest_cookie_auth_no_anon(self):req,origcnx=self.init_authentication('cookie')self.assertAuthFailure(req)form=self.app_publish(req,'login')self.failUnless('__login'inform)self.failUnless('__password'inform)self.assertEquals(req.cnx,None)req.form['__login']=origcnx.loginreq.form['__password']=origcnx.authinfo['password']self.assertAuthSuccess(req,origcnx)self.assertEquals(req.cnx.authinfo,{'password':origcnx.authinfo['password']})self.assertRaises(AuthenticationError,self.app_publish,req,'logout')self.assertEquals(len(self.open_sessions),0)deftest_login_by_email(self):login=self.request().user.loginaddress=login+u'@localhost'self.execute('INSERT EmailAddress X: X address %(address)s, U primary_email X ''WHERE U login %(login)s',{'address':address,'login':login})self.commit()# option allow-email-login not setreq,origcnx=self.init_authentication('cookie')req.form['__login']=addressreq.form['__password']=origcnx.authinfo['password']self.assertAuthFailure(req)# option allow-email-login setorigcnx.login=addressself.set_option('allow-email-login',True)req.form['__login']=addressreq.form['__password']=origcnx.authinfo['password']self.assertAuthSuccess(req,origcnx)self.assertEquals(req.cnx.authinfo,{'password':origcnx.authinfo['password']})self.assertRaises(AuthenticationError,self.app_publish,req,'logout')self.assertEquals(len(self.open_sessions),0)def_reset_cookie(self,req):# preparing the suite of the test# set session id in cookiecookie=Cookie.SimpleCookie()cookie['__session']=req.cnx.sessionidreq._headers['Cookie']=cookie['__session'].OutputString()clear_cache(req,'get_authorization')# reset cnx as if it was a new incoming requestreq.cnx=Nonedef_test_auth_anon(self,req):self.app.connect(req)acnx=req.cnxself.assertEquals(len(self.open_sessions),1)self.assertEquals(acnx.login,'anon')self.assertEquals(acnx.authinfo['password'],'anon')self.failUnless(acnx.anonymous_connection)self._reset_cookie(req)def_test_anon_auth_fail(self,req):self.assertEquals(len(self.open_sessions),1)self.app.connect(req)self.assertEquals(req.message,'authentication failure')self.assertEquals(req.cnx.anonymous_connection,True)self.assertEquals(len(self.open_sessions),1)self._reset_cookie(req)deftest_http_auth_anon_allowed(self):req,origcnx=self.init_authentication('http','anon')self._test_auth_anon(req)authstr=base64.encodestring('toto:pouet')req._headers['Authorization']='basic %s'%authstrself._test_anon_auth_fail(req)authstr=base64.encodestring('%s:%s'%(origcnx.login,origcnx.authinfo['password']))req._headers['Authorization']='basic %s'%authstrself.assertAuthSuccess(req,origcnx)self.assertEquals(req.cnx.authinfo,{'password':origcnx.authinfo['password']})self.assertRaises(AuthenticationError,self.app_publish,req,'logout')self.assertEquals(len(self.open_sessions),0)deftest_cookie_auth_anon_allowed(self):req,origcnx=self.init_authentication('cookie','anon')self._test_auth_anon(req)req.form['__login']='toto'req.form['__password']='pouet'self._test_anon_auth_fail(req)req.form['__login']=origcnx.loginreq.form['__password']=origcnx.authinfo['password']self.assertAuthSuccess(req,origcnx)self.assertEquals(req.cnx.authinfo,{'password':origcnx.authinfo['password']})self.assertRaises(AuthenticationError,self.app_publish,req,'logout')self.assertEquals(len(self.open_sessions),0)deftest_non_regr_optional_first_var(self):req=self.request()# expect a rset with None in [0][0]req.form['rql']='rql:Any OV1, X WHERE X custom_workflow OV1?'self.app_publish(req)if__name__=='__main__':unittest_main()