[notification] avoid leaking cnxsets (closes #3243810)
When sending notifications, we get each recipient as either an email
address or a CWUser. In the latter case, we create a temporary session
for that user and use it to send the mail. However, if we later decided
to not send the mail after all, we'd leak the session and its cnxset.
Add a try block inside the loop to make sure the temporary sessions are
closed properly.
# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""functional tests for server'security"""importsysfromlogilab.common.testlibimportunittest_main,TestCasefromrqlimportRQLExceptionfromcubicweb.devtools.testlibimportCubicWebTCfromcubicwebimportUnauthorized,ValidationError,QueryError,Binaryfromcubicweb.schemaimportERQLExpressionfromcubicweb.server.querierimportcheck_read_accessfromcubicweb.server.utilsimport_CRYPTO_CTXclassBaseSecurityTC(CubicWebTC):defsetup_database(self):super(BaseSecurityTC,self).setup_database()self.create_user(self.request(),'iaminusersgrouponly')hash=_CRYPTO_CTX.encrypt('oldpassword',scheme='des_crypt')self.create_user(self.request(),'oldpassword',password=Binary(hash))classLowLevelSecurityFunctionTC(BaseSecurityTC):deftest_check_read_access(self):rql=u'Personne U where U nom "managers"'rqlst=self.repo.vreg.rqlhelper.parse(rql).children[0]withself.temporary_permissions(Personne={'read':('users','managers')}):self.repo.vreg.solutions(self.session,rqlst,None)solution=rqlst.solutions[0]check_read_access(self.session,rqlst,solution,{})withself.login('anon')ascu:self.assertRaises(Unauthorized,check_read_access,self.session,rqlst,solution,{})self.assertRaises(Unauthorized,cu.execute,rql)deftest_upassword_not_selectable(self):self.assertRaises(Unauthorized,self.execute,'Any X,P WHERE X is CWUser, X upassword P')self.rollback()withself.login('iaminusersgrouponly')ascu:self.assertRaises(Unauthorized,cu.execute,'Any X,P WHERE X is CWUser, X upassword P')deftest_update_password(self):"""Ensure that if a user's password is stored with a deprecated hash, it will be updated on next login"""oldhash=str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE cw_login = 'oldpassword'").fetchone()[0])withself.login('oldpassword')ascu:passnewhash=str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE cw_login = 'oldpassword'").fetchone()[0])self.assertNotEqual(oldhash,newhash)self.assertTrue(newhash.startswith('$6$'))withself.login('oldpassword')ascu:passself.assertEqual(newhash,str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE cw_login = 'oldpassword'").fetchone()[0]))classSecurityRewritingTC(BaseSecurityTC):defhijack_source_execute(self):defsyntax_tree_search(*args,**kwargs):self.query=(args,kwargs)return[]self.repo.system_source.syntax_tree_search=syntax_tree_searchdeftearDown(self):self.repo.system_source.__dict__.pop('syntax_tree_search',None)super(SecurityRewritingTC,self).tearDown()deftest_not_relation_read_security(self):withself.login('iaminusersgrouponly'):self.hijack_source_execute()self.execute('Any U WHERE NOT A todo_by U, A is Affaire')self.assertEqual(self.query[0][1].as_string(),'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')self.execute('Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')self.assertEqual(self.query[0][1].as_string(),'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')classSecurityTC(BaseSecurityTC):defsetUp(self):BaseSecurityTC.setUp(self)# implicitly test manager can add some entitiesself.execute("INSERT Affaire X: X sujet 'cool'")self.execute("INSERT Societe X: X nom 'logilab'")self.execute("INSERT Personne X: X nom 'bidule'")self.execute('INSERT CWGroup X: X name "staff"')self.commit()deftest_insert_security(self):withself.login('anon')ascu:cu.execute("INSERT Personne X: X nom 'bidule'")self.assertRaises(Unauthorized,self.commit)self.assertEqual(cu.execute('Personne X').rowcount,1)deftest_insert_rql_permission(self):# test user can only add une affaire related to a societe he ownswithself.login('iaminusersgrouponly')ascu:cu.execute("INSERT Affaire X: X sujet 'cool'")self.assertRaises(Unauthorized,self.commit)# test nothing has actually been insertedself.assertEqual(self.execute('Affaire X').rowcount,1)withself.login('iaminusersgrouponly')ascu:cu.execute("INSERT Affaire X: X sujet 'cool'")cu.execute("INSERT Societe X: X nom 'chouette'")cu.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'")self.commit()deftest_update_security_1(self):withself.login('anon')ascu:# local security checkcu.execute("SET X nom 'bidulechouette' WHERE X is Personne")self.assertRaises(Unauthorized,self.commit)self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount,0)deftest_update_security_2(self):withself.temporary_permissions(Personne={'read':('users','managers'),'add':('guests','users','managers')}):withself.login('anon')ascu:self.assertRaises(Unauthorized,cu.execute,"SET X nom 'bidulechouette' WHERE X is Personne")self.rollback()# self.assertRaises(Unauthorized, cnx.commit)# test nothing has actually been insertedself.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount,0)deftest_update_security_3(self):withself.login('iaminusersgrouponly')ascu:cu.execute("INSERT Personne X: X nom 'biduuule'")cu.execute("INSERT Societe X: X nom 'looogilab'")cu.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'")deftest_update_rql_permission(self):self.execute("SET A concerne S WHERE A is Affaire, S is Societe")self.commit()# test user can only update une affaire related to a societe he ownswithself.login('iaminusersgrouponly')ascu:cu.execute("SET X sujet 'pascool' WHERE X is Affaire")# this won't actually do anything since the selection query won't return anythingself.commit()# to actually get Unauthorized exception, try to update an entity we can readcu.execute("SET X nom 'toto' WHERE X is Societe")self.assertRaises(Unauthorized,self.commit)cu.execute("INSERT Affaire X: X sujet 'pascool'")cu.execute("INSERT Societe X: X nom 'chouette'")cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")cu.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'")self.commit()deftest_delete_security(self):# FIXME: sample below fails because we don't detect "owner" can't delete# user anyway, and since no user with login == 'bidule' exists, no# exception is raised#user._groups = {'guests':1}#self.assertRaises(Unauthorized,# self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'")# check local securitywithself.login('iaminusersgrouponly')ascu:self.assertRaises(Unauthorized,cu.execute,"DELETE CWGroup Y WHERE Y name 'staff'")self.rollback()deftest_delete_rql_permission(self):self.execute("SET A concerne S WHERE A is Affaire, S is Societe")self.commit()# test user can only dele une affaire related to a societe he ownswithself.login('iaminusersgrouponly')ascu:# this won't actually do anything since the selection query won't return anythingcu.execute("DELETE Affaire X")self.commit()# to actually get Unauthorized exception, try to delete an entity we can readself.assertRaises(Unauthorized,cu.execute,"DELETE Societe S")self.assertRaises(QueryError,self.commit)# can't commit anymoreself.rollback()# required after Unauthorizedcu.execute("INSERT Affaire X: X sujet 'pascool'")cu.execute("INSERT Societe X: X nom 'chouette'")cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")self.commit()## # this one should fail since it will try to delete two affaires, one authorized## # and the other not## self.assertRaises(Unauthorized, cu.execute, "DELETE Affaire X")cu.execute("DELETE Affaire X WHERE X sujet 'pascool'")self.commit()deftest_insert_relation_rql_permission(self):withself.login('iaminusersgrouponly')ascu:cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")# should raise Unauthorized since user don't own S though this won't# actually do anything since the selection query won't return# anythingself.commit()# to actually get Unauthorized exception, try to insert a relation# were we can read both entitiesrset=cu.execute('Personne P')self.assertEqual(len(rset),1)ent=rset.get_entity(0,0)self.assertFalse(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))self.assertRaises(Unauthorized,ent.cw_check_perm,'update')self.assertRaises(Unauthorized,cu.execute,"SET P travaille S WHERE P is Personne, S is Societe")self.assertRaises(QueryError,self.commit)# can't commit anymoreself.rollback()# test nothing has actually been inserted:self.assertFalse(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))cu.execute("INSERT Societe X: X nom 'chouette'")cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")self.commit()deftest_delete_relation_rql_permission(self):self.execute("SET A concerne S WHERE A is Affaire, S is Societe")self.commit()withself.login('iaminusersgrouponly')ascu:# this won't actually do anything since the selection query won't return anythingcu.execute("DELETE A concerne S")self.commit()# to actually get Unauthorized exception, try to delete a relation we can readeid=self.execute("INSERT Affaire X: X sujet 'pascool'")[0][0]self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',{'x':eid})self.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe")self.commit()withself.login('iaminusersgrouponly')ascu:self.assertRaises(Unauthorized,cu.execute,"DELETE A concerne S")self.assertRaises(QueryError,self.commit)# can't commit anymoreself.rollback()# required after Unauthorizedcu.execute("INSERT Societe X: X nom 'chouette'")cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")self.commit()cu.execute("DELETE A concerne S WHERE S nom 'chouette'")self.commit()deftest_user_can_change_its_upassword(self):req=self.request()ueid=self.create_user(req,'user').eidwithself.login('user')ascu:cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',{'x':ueid,'passwd':'newpwd'})self.commit()cnx=self.login('user',password='newpwd')cnx.close()deftest_user_cant_change_other_upassword(self):req=self.request()ueid=self.create_user(req,'otheruser').eidwithself.login('iaminusersgrouponly')ascu:cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',{'x':ueid,'passwd':'newpwd'})self.assertRaises(Unauthorized,self.commit)# read security testdeftest_read_base(self):withself.temporary_permissions(Personne={'read':('users','managers')}):withself.login('anon')ascu:self.assertRaises(Unauthorized,cu.execute,'Personne U where U nom "managers"')self.rollback()deftest_read_erqlexpr_base(self):eid=self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]self.commit()withself.login('iaminusersgrouponly')ascu:rset=cu.execute('Affaire X')self.assertEqual(rset.rows,[])self.assertRaises(Unauthorized,cu.execute,'Any X WHERE X eid %(x)s',{'x':eid})# cache testself.assertRaises(Unauthorized,cu.execute,'Any X WHERE X eid %(x)s',{'x':eid})aff2=cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]soc1=cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")self.commit()rset=cu.execute('Any X WHERE X eid %(x)s',{'x':aff2})self.assertEqual(rset.rows,[[aff2]])# more cache test w/ NOT eidrset=cu.execute('Affaire X WHERE NOT X eid %(x)s',{'x':eid})self.assertEqual(rset.rows,[[aff2]])rset=cu.execute('Affaire X WHERE NOT X eid %(x)s',{'x':aff2})self.assertEqual(rset.rows,[])# test can't update an attribute of an entity that can't be readenself.assertRaises(Unauthorized,cu.execute,'SET X sujet "hacked" WHERE X eid %(x)s',{'x':eid})self.rollback()deftest_entity_created_in_transaction(self):affschema=self.schema['Affaire']withself.temporary_permissions(Affaire={'read':affschema.permissions['add']}):withself.login('iaminusersgrouponly')ascu:aff2=cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]# entity created in transaction are readable *by eid*self.assertTrue(cu.execute('Any X WHERE X eid %(x)s',{'x':aff2}))# XXX would be nice if it workedrset=cu.execute("Affaire X WHERE X sujet 'cool'")self.assertEqual(len(rset),0)self.assertRaises(Unauthorized,self.commit)deftest_read_erqlexpr_has_text1(self):aff1=self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]card1=self.execute("INSERT Card X: X title 'cool'")[0][0]self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',{'x':card1})self.commit()withself.login('iaminusersgrouponly')ascu:aff2=cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]soc1=cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]cu.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s",{'a':aff2,'s':soc1})self.commit()self.assertRaises(Unauthorized,cu.execute,'Any X WHERE X eid %(x)s',{'x':aff1})self.assertTrue(cu.execute('Any X WHERE X eid %(x)s',{'x':aff2}))self.assertTrue(cu.execute('Any X WHERE X eid %(x)s',{'x':card1}))rset=cu.execute("Any X WHERE X has_text 'cool'")self.assertEqual(sorted(eidforeid,inrset.rows),[card1,aff2])self.rollback()deftest_read_erqlexpr_has_text2(self):self.execute("INSERT Personne X: X nom 'bidule'")self.execute("INSERT Societe X: X nom 'bidule'")self.commit()withself.temporary_permissions(Personne={'read':('managers',)}):withself.login('iaminusersgrouponly')ascu:rset=cu.execute('Any N WHERE N has_text "bidule"')self.assertEqual(len(rset.rows),1,rset.rows)rset=cu.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")')self.assertEqual(len(rset.rows),1,rset.rows)deftest_read_erqlexpr_optional_rel(self):self.execute("INSERT Personne X: X nom 'bidule'")self.execute("INSERT Societe X: X nom 'bidule'")self.commit()withself.temporary_permissions(Personne={'read':('managers',)}):withself.login('anon')ascu:rset=cu.execute('Any N,U WHERE N has_text "bidule", N owned_by U?')self.assertEqual(len(rset.rows),1,rset.rows)deftest_read_erqlexpr_aggregat(self):self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]self.commit()withself.login('iaminusersgrouponly')ascu:rset=cu.execute('Any COUNT(X) WHERE X is Affaire')self.assertEqual(rset.rows,[[0]])aff2=cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]soc1=cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")self.commit()rset=cu.execute('Any COUNT(X) WHERE X is Affaire')self.assertEqual(rset.rows,[[1]])rset=cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN')values=dict(rset)self.assertEqual(values['Affaire'],1)self.assertEqual(values['Societe'],2)rset=cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN WITH X BEING ((Affaire X) UNION (Societe X))')self.assertEqual(len(rset),2)values=dict(rset)self.assertEqual(values['Affaire'],1)self.assertEqual(values['Societe'],2)deftest_attribute_security(self):# only managers should be able to edit the 'test' attribute of Personne entitieseid=self.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE")[0][0]self.execute('SET X test FALSE WHERE X eid %(x)s',{'x':eid})self.commit()withself.login('iaminusersgrouponly')ascu:cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE")self.assertRaises(Unauthorized,self.commit)cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test FALSE")self.assertRaises(Unauthorized,self.commit)eid=cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org'")[0][0]self.commit()cu.execute('SET X test FALSE WHERE X eid %(x)s',{'x':eid})self.assertRaises(Unauthorized,self.commit)cu.execute('SET X test TRUE WHERE X eid %(x)s',{'x':eid})self.assertRaises(Unauthorized,self.commit)cu.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s',{'x':eid})self.commit()deftest_attribute_security_rqlexpr(self):# Note.para attribute editable by managers or if the note is in "todo" statenote=self.execute("INSERT Note X: X para 'bidule'").get_entity(0,0)self.commit()note.cw_adapt_to('IWorkflowable').fire_transition('markasdone')self.execute('SET X para "truc" WHERE X eid %(x)s',{'x':note.eid})self.commit()withself.login('iaminusersgrouponly')ascu:cu.execute("SET X para 'chouette' WHERE X eid %(x)s",{'x':note.eid})self.assertRaises(Unauthorized,self.commit)note2=cu.execute("INSERT Note X: X para 'bidule'").get_entity(0,0)self.commit()note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone')self.commit()self.assertEqual(len(cu.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s',{'x':note2.eid})),0)cu.execute("SET X para 'chouette' WHERE X eid %(x)s",{'x':note2.eid})self.assertRaises(Unauthorized,self.commit)note2.cw_adapt_to('IWorkflowable').fire_transition('redoit')self.commit()cu.execute("SET X para 'chouette' WHERE X eid %(x)s",{'x':note2.eid})self.commit()deftest_attribute_read_security(self):# anon not allowed to see users'login, but they can see userslogin_rdef=self.repo.schema['CWUser'].rdef('login')withself.temporary_permissions((login_rdef,{'read':('users','managers')}),CWUser={'read':('guests','users','managers')}):withself.login('anon')ascu:rset=cu.execute('CWUser X')self.assertTrue(rset)x=rset.get_entity(0,0)self.assertEqual(x.login,None)self.assertTrue(x.creation_date)x=rset.get_entity(1,0)x.complete()self.assertEqual(x.login,None)self.assertTrue(x.creation_date)deftest_yams_inheritance_and_security_bug(self):withself.temporary_permissions(Division={'read':('managers',ERQLExpression('X owned_by U'))}):withself.login('iaminusersgrouponly'):querier=self.repo.querierrqlst=querier.parse('Any X WHERE X is_instance_of Societe')querier.solutions(self.session,rqlst,{})querier._annotate(rqlst)plan=querier.plan_factory(rqlst,{},self.session)plan.preprocess(rqlst)self.assertEqual(rqlst.as_string(),'(Any X WHERE X is IN(SubDivision, Societe)) UNION (Any X WHERE X is Division, EXISTS(X owned_by %(B)s))')classBaseSchemaSecurityTC(BaseSecurityTC):"""tests related to the base schema permission configuration"""deftest_user_can_delete_object_he_created(self):# even if some other user have changed object'statewithself.login('iaminusersgrouponly')ascu:# due to security test, affaire has to concerne a societe the user ownscu.execute('INSERT Societe X: X nom "ARCTIA"')cu.execute('INSERT Affaire X: X ref "ARCT01", X concerne S WHERE S nom "ARCTIA"')self.commit()affaire=self.execute('Any X WHERE X ref "ARCT01"').get_entity(0,0)affaire.cw_adapt_to('IWorkflowable').fire_transition('abort')self.commit()self.assertEqual(len(self.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01"')),1)self.assertEqual(len(self.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01",''X owned_by U, U login "admin"')),1)# TrInfo at the above state changewithself.login('iaminusersgrouponly')ascu:cu.execute('DELETE Affaire X WHERE X ref "ARCT01"')self.commit()self.assertFalse(cu.execute('Affaire X'))deftest_users_and_groups_non_readable_by_guests(self):withself.login('anon')ascu:anon=cu.connection.user(self.session)# anonymous user can only read itselfrset=cu.execute('Any L WHERE X owned_by U, U login L')self.assertEqual([['anon']],rset.rows)rset=cu.execute('CWUser X')self.assertEqual([[anon.eid]],rset.rows)# anonymous user can read groups (necessary to check allowed transitions for instance)self.assert_(cu.execute('CWGroup X'))# should only be able to read the anonymous user, not another oneoriguser=self.adminsession.userself.assertRaises(Unauthorized,cu.execute,'CWUser X WHERE X eid %(x)s',{'x':origuser.eid})# nothing selected, nothing updated, no exception raised#self.assertRaises(Unauthorized,# cu.execute, 'SET X login "toto" WHERE X eid %(x)s',# {'x': self.user.eid})rset=cu.execute('CWUser X WHERE X eid %(x)s',{'x':anon.eid})self.assertEqual([[anon.eid]],rset.rows)# but can't modify itcu.execute('SET X login "toto" WHERE X eid %(x)s',{'x':anon.eid})self.assertRaises(Unauthorized,self.commit)deftest_in_group_relation(self):withself.login('iaminusersgrouponly')ascu:rql=u"DELETE U in_group G WHERE U login 'admin'"self.assertRaises(Unauthorized,cu.execute,rql)rql=u"SET U in_group G WHERE U login 'admin', G name 'users'"self.assertRaises(Unauthorized,cu.execute,rql)self.rollback()deftest_owned_by(self):self.execute("INSERT Personne X: X nom 'bidule'")self.commit()withself.login('iaminusersgrouponly')ascu:rql=u"SET X owned_by U WHERE U login 'iaminusersgrouponly', X is Personne"self.assertRaises(Unauthorized,cu.execute,rql)self.rollback()deftest_bookmarked_by_guests_security(self):beid1=self.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0]beid2=self.execute('INSERT Bookmark B: B path "?vid=index", B title "index", B bookmarked_by U WHERE U login "anon"')[0][0]self.commit()withself.login('anon')ascu:anoneid=self.session.user.eidself.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,''B bookmarked_by U, U eid %s'%anoneid).rows,[['index','?vid=index']])self.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,''B bookmarked_by U, U eid %(x)s',{'x':anoneid}).rows,[['index','?vid=index']])# can read others bookmarks as wellself.assertEqual(cu.execute('Any B where B is Bookmark, NOT B bookmarked_by U').rows,[[beid1]])self.assertRaises(Unauthorized,cu.execute,'DELETE B bookmarked_by U')self.assertRaises(Unauthorized,cu.execute,'SET B bookmarked_by U WHERE U eid %(x)s, B eid %(b)s',{'x':anoneid,'b':beid1})self.rollback()deftest_ambigous_ordered(self):withself.login('anon')ascu:names=[tfort,incu.execute('Any N ORDERBY lower(N) WHERE X name N')]self.assertEqual(names,sorted(names,key=lambdax:x.lower()))deftest_restrict_is_instance_ok(self):rset=self.execute('Any X WHERE X is_instance_of BaseTransition')rqlst=rset.syntax_tree()select=rqlst.children[0]x=select.get_selected_variables().next()self.assertRaises(RQLException,select.add_type_restriction,x.variable,'CWUser')select.add_type_restriction(x.variable,'BaseTransition')select.add_type_restriction(x.variable,'WorkflowTransition')self.assertEqual(rqlst.as_string(),'Any X WHERE X is_instance_of WorkflowTransition')deftest_restrict_is_instance_no_supported(self):rset=self.execute('Any X WHERE X is_instance_of IN(CWUser, CWGroup)')rqlst=rset.syntax_tree()select=rqlst.children[0]x=select.get_selected_variables().next()self.assertRaises(NotImplementedError,select.add_type_restriction,x.variable,'WorkflowTransition')deftest_in_state_without_update_perm(self):"""check a user change in_state without having update permission on the subject """eid=self.execute('INSERT Affaire X: X ref "ARCT01"')[0][0]self.commit()withself.login('iaminusersgrouponly')ascu:session=self.session# needed to avoid check_perm errorsession.set_cnxset()# needed to remove rql expr granting update perm to the useraffschema=self.schema['Affaire']withself.temporary_permissions(Affaire={'update':affschema.get_groups('update'),'read':('users',)}):self.assertRaises(Unauthorized,affschema.check_perm,session,'update',eid=eid)aff=cu.execute('Any X WHERE X ref "ARCT01"').get_entity(0,0)aff.cw_adapt_to('IWorkflowable').fire_transition('abort')self.commit()# though changing a user state (even logged user) is reserved to managersuser=self.user(session)session.set_cnxset()# XXX wether it should raise Unauthorized or ValidationError is not clear# the best would probably ValidationError if the transition doesn't exist# from the current state but Unauthorized if it exists but user can't pass itself.assertRaises(ValidationError,user.cw_adapt_to('IWorkflowable').fire_transition,'deactivate')self.rollback()# else will fail on login cm exitdeftest_trinfo_security(self):aff=self.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0,0)iworkflowable=aff.cw_adapt_to('IWorkflowable')self.commit()iworkflowable.fire_transition('abort')self.commit()# can change tr info commentself.execute('SET TI comment %(c)s WHERE TI wf_info_for X, X ref "ARCT01"',{'c':u'bouh!'})self.commit()aff.cw_clear_relation_cache('wf_info_for','object')trinfo=iworkflowable.latest_trinfo()self.assertEqual(trinfo.comment,'bouh!')# but not from_state/to_stateaff.cw_clear_relation_cache('wf_info_for',role='object')self.assertRaises(Unauthorized,self.execute,'SET TI from_state S WHERE TI eid %(ti)s, S name "ben non"',{'ti':trinfo.eid})self.assertRaises(Unauthorized,self.execute,'SET TI to_state S WHERE TI eid %(ti)s, S name "pitetre"',{'ti':trinfo.eid})deftest_emailaddress_security(self):# check for prexisting email adresseifself.execute('Any X WHERE X is EmailAddress'):rset=self.execute('Any X, U WHERE X is EmailAddress, U use_email X')msg=['Preexisting email readable by anon found!']tmpl=' - "%s" used by user "%s"'foriinxrange(len(rset)):email,user=rset.get_entity(i,0),rset.get_entity(i,1)msg.append(tmpl%(email.dc_title(),user.dc_title()))raiseRuntimeError('\n'.join(msg))# actual testself.execute('INSERT EmailAddress X: X address "hop"').get_entity(0,0)self.execute('INSERT EmailAddress X: X address "anon", U use_email X WHERE U login "anon"').get_entity(0,0)self.commit()self.assertEqual(len(self.execute('Any X WHERE X is EmailAddress')),2)self.login('anon')self.assertEqual(len(self.execute('Any X WHERE X is EmailAddress')),1)if__name__=='__main__':unittest_main()