[source] implement storages right in the source rather than in hooks
The problem is that Storage objects will most probably change entity's
dictionary so that values are correctly set before the source's
corresponding method (e.g. entity_added()) is called.
For instance, the BFSFileStorage will change the original binary
data and replace it with the destination file path in order to store
the file path in the database. This change must be local
to the source in order not to impact other hooks or attribute access
during the transaction, the whole idea being that the same
application code should work exactly the same whether or not a
BFSStorage is used or not.
""":organization: Logilab:copyright: 2001-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""fromcubicweb.goa.testlibimport*fromurllibimportunquotefromcubicwebimportValidationErrorfromcubicweb.uilibimportrql_for_eidfromcubicweb.webimportINTERNAL_FIELD_VALUE,Redirectfromcubicweb.goa.goaconfigimportGAEConfigurationfromcubicweb.entities.authobjsimportCWUserclassEditControllerTC(GAEBasedTC):config=GAEConfiguration('toto')config.global_set_option('use-google-auth',False)config.global_set_option('schema-type','yams')config.global_set_option('included-cubes',())config.global_set_option('included-yams-cubes',('blog',))MODEL_CLASSES=()fromcubicweb.web.viewsimporteditcontrollerfromcubicweb.entitiesimportlibLOAD_APP_MODULES=(editcontroller,lib)defsetUp(self):GAEBasedTC.setUp(self)self.req=self.request()self.ctrl=self.get_ctrl(self.req)defget_ctrl(self,req):returnself.vreg.select('controllers','edit',req=req,appli=self)defpublish(self,req):assertreqisself.ctrl.reqtry:result=self.ctrl.publish()req.cnx.commit()exceptRedirect:req.cnx.commit()raiseexcept:req.cnx.rollback()raisereturnresultdefexpect_redirect_publish(self,req=None):ifreqisnotNone:self.ctrl=self.get_ctrl(req)else:req=self.reqtry:res=self.publish(req)exceptRedirect,ex:try:path,params=ex.location.split('?',1)except:path,params=ex.location,""req._url=pathcleanup=lambdap:(p[0],unquote(p[1]))params=dict(cleanup(p.split('=',1))forpinparams.split('&')ifp)returnreq.relative_path(False),params# path.rsplit('/', 1)[-1], paramselse:self.fail('expected a Redirect exception')deftest_noparam_edit(self):"""check behaviour of this controller without any form parameter"""self.req.form={}self.assertRaises(ValidationError,self.publish,self.req)deftest_validation_unique(self):"""test creation of two linked entities"""user=self.userself.req.form={'eid':'X','__type:X':'CWUser','login:X':self.user.login,'edits-login:X':u'','upassword:X':u'toto','upassword-confirm:X':u'toto','edits-upassword:X':u'',}self.assertRaises(ValidationError,self.publish,self.req)deftest_user_editing_itself(self):"""checking that a manager user can edit itself"""self.skip('missing actual gae support, retry latter')user=self.userbasegroups=[str(eid)foreid,inself.req.execute('CWGroup G WHERE X in_group G, X eid %(x)s',{'x':user.eid})]groupeids=[eidforeid,inself.req.execute('CWGroup G WHERE G name in ("managers", "users")')]groups=[str(eid)foreidingroupeids]stateeid=[eidforeid,inself.req.execute('State S WHERE S name "activated"')][0]self.req.form={'eid':user.eid,'__type:'+user.eid:'CWUser','login:'+user.eid:unicode(user.login),'firstname:'+user.eid:u'Th\xe9nault','surname:'+user.eid:u'Sylvain','in_group:'+user.eid:groups,'in_state:'+user.eid:stateeid,#'edits-login:'+user.eid:unicode(user.login),'edits-firstname:'+user.eid:u'','edits-surname:'+user.eid:u'','edits-in_group:'+user.eid:basegroups,'edits-in_state:'+user.eid:stateeid,}path,params=self.expect_redirect_publish()e=self.req.execute('Any X WHERE X eid %(x)s',{'x':user.eid},'x').get_entity(0,0)self.assertEquals(e.firstname,u'Th\xe9nault')self.assertEquals(e.surname,u'Sylvain')self.assertEquals(e.login,user.login)self.assertEquals([g.eidforgine.in_group],groupeids)self.assertEquals(e.in_state[0].eid,stateeid)deftest_user_can_change_its_password(self):user=self.create_user('user')cnx=self.login('user')req=self.request()#self.assertEquals(self.ctrl.schema['CWUser']._groups['read'],# ('managers', 'users'))req.form={'eid':user.eid,'__type:'+user.eid:'CWUser','__maineid':str(user.eid),'upassword:'+user.eid:'tournicoton','upassword-confirm:'+user.eid:'tournicoton','edits-upassword:'+user.eid:'',}path,params=self.expect_redirect_publish(req)cnx.commit()# commit to check we don't get late validation error for instanceself.assertEquals(path,'euser/user')self.failIf('vid'inparams)deftest_user_editing_itself_no_relation(self):"""checking we can edit an entity without specifying some required relations (meaning no changes) """user=self.usergroupeids=[eidforeid,inself.req.execute('CWGroup G WHERE X in_group G, X eid %(x)s',{'x':user.eid})]self.req.form={'eid':user.eid,'__type:'+user.eid:'CWUser','login:'+user.eid:unicode(user.login),'firstname:'+user.eid:u'Th\xe9nault','surname:'+user.eid:u'Sylvain',#'edits-login:'+user.eid:unicode(user.login),'edits-firstname:'+user.eid:u'','edits-surname:'+user.eid:u'',}path,params=self.expect_redirect_publish()self.req.drop_entity_cache(user.eid)e=self.req.execute('Any X WHERE X eid %(x)s',{'x':user.eid},'x').get_entity(0,0)self.assertEquals(e.login,user.login)self.assertEquals(e.firstname,u'Th\xe9nault')self.assertEquals(e.surname,u'Sylvain')self.assertUnorderedIterableEquals([g.eidforgine.in_group],groupeids)#stateeids = [eid for eid, in self.req.execute('State S WHERE S name "activated"')]#self.assertEquals([s.eid for s in e.in_state], stateeids)deftest_create_multiple_linked(self):gueid=self.req.execute('CWGroup G WHERE G name "users"')[0][0]self.req.form={'eid':['X','Y'],'__type:X':'CWUser','__maineid':'X','login:X':u'adim','edits-login:X':u'','upassword:X':u'toto','upassword-confirm:X':u'toto','edits-upassword:X':u'','surname:X':u'Di Mascio','edits-surname:X':'','in_group:X':gueid,'edits-in_group:X':INTERNAL_FIELD_VALUE,'__type:Y':'EmailAddress','address:Y':u'dima@logilab.fr','edits-address:Y':'','use_email:X':'Y','edits-use_email:X':INTERNAL_FIELD_VALUE,}path,params=self.expect_redirect_publish()# should be redirected on the created personself.assertEquals(path,'euser/adim')e=self.req.execute('Any P WHERE P surname "Di Mascio"').get_entity(0,0)self.assertEquals(e.surname,'Di Mascio')email=e.use_email[0]self.assertEquals(email.address,'dima@logilab.fr')deftest_edit_multiple_linked(self):peid=self.create_user('adim').eidself.req.form={'eid':[peid,'Y'],'__type:%s'%peid:'CWUser','surname:%s'%peid:u'Di Masci','edits-surname:%s'%peid:'','__type:Y':'EmailAddress','address:Y':u'dima@logilab.fr','edits-address:Y':'','use_email:%s'%peid:'Y','edits-use_email:%s'%peid:INTERNAL_FIELD_VALUE,'__redirectrql':'Any X WHERE X eid %s'%peid,}path,params=self.expect_redirect_publish()# should be redirected on the created personeid=params['rql'].split()[-1]e=self.req.execute('Any X WHERE X eid %(x)s',{'x':eid},'x').get_entity(0,0)self.assertEquals(e.surname,'Di Masci')email=e.use_email[0]self.assertEquals(email.address,'dima@logilab.fr')emaileid=email.eidself.req.form={'eid':[peid,emaileid],'__type:%s'%peid:'CWUser','surname:%s'%peid:u'Di Masci','edits-surname:%s'%peid:'Di Masci','__type:%s'%emaileid:'EmailAddress','address:%s'%emaileid:u'adim@logilab.fr','edits-address:%s'%emaileid:'dima@logilab.fr','use_email:%s'%peid:emaileid,'edits-use_email:%s'%peid:emaileid,'__redirectrql':'Any X WHERE X eid %s'%peid,}path,params=self.expect_redirect_publish()# should be redirected on the created personeid=params['rql'].split()[-1]# XXX this should not be necessary, it isn't with regular cubicwebself.req._eid_cache={}e=self.req.execute('Any X WHERE X eid %(x)s',{'x':eid},'x').get_entity(0,0)self.assertEquals(e.surname,'Di Masci')email=e.use_email[0]self.assertEquals(email.address,'adim@logilab.fr')deftest_password_confirm(self):"""test creation of two linked entities """user=self.userself.req.form={'__cloned_eid:X':user.eid,'eid':'X','__type:X':'CWUser','login:X':u'toto','edits-login:X':u'','upassword:X':u'toto','edits-upassword:X':u'',}self.assertRaises(ValidationError,self.publish,self.req)self.req.form={'__cloned_eid:X':user.eid,'eid':'X','__type:X':'CWUser','login:X':u'toto','edits-login:X':u'','upassword:X':u'toto','upassword-confirm:X':u'tutu','edits-upassword:X':u'',}self.assertRaises(ValidationError,self.publish,self.req)deftest_req_pending_insert(self):"""make sure req's pending insertions are taken into account"""tmpgroup=self.add_entity('CWGroup',name=u"test")user=self.userself.req.set_session_data('pending_insert',set([(user.eid,'in_group',tmpgroup.eid)]))path,params=self.expect_redirect_publish()usergroups=[gnameforgname,inself.req.execute('Any N WHERE G name N, U in_group G, U eid %(u)s',{'u':user.eid})]self.assertUnorderedIterableEquals(usergroups,['managers','users','test'])self.assertEquals(self.req.get_pending_inserts(),[])deftest_req_pending_delete(self):"""make sure req's pending deletions are taken into account"""user=self.usergroupeid=self.req.execute('INSERT CWGroup G: G name "test", U in_group G WHERE U eid %(x)s',{'x':user.eid})[0][0]usergroups=[gnameforgname,inself.req.execute('Any N WHERE G name N, U in_group G, U eid %(u)s',{'u':user.eid})]# just make sure everything was set correctlyself.assertUnorderedIterableEquals(usergroups,['managers','users','test'])# now try to delete the relationself.req.set_session_data('pending_delete',set([(user.eid,'in_group',groupeid)]))path,params=self.expect_redirect_publish()usergroups=[gnameforgname,inself.req.execute('Any N WHERE G name N, U in_group G, U eid %(u)s',{'u':user.eid})]self.assertUnorderedIterableEquals(usergroups,['managers','users'])#self.assertUnorderedIterableEquals(usergroups, ['managers'])self.assertEquals(self.req.get_pending_deletes(),[])deftest_custom_attribute_handler(self):defcustom_login_edit(self,formparams,value,relations):formparams['login']=value.upper()relations.append('X login %(login)s')CWUser.custom_login_edit=custom_login_edittry:user=self.usereid=repr(user.eid)self.req.form={'eid':eid,'__type:'+eid:'CWUser','login:'+eid:u'foo','edits-login:'+eid:unicode(user.login),}path,params=self.expect_redirect_publish()rset=self.req.execute('Any L WHERE X eid %(x)s, X login L',{'x':user.eid},'x')self.assertEquals(rset[0][0],'FOO')finally:delCWUser.custom_login_editdeftest_redirect_apply_button(self):redirectrql=rql_for_eid(4012)# whateverself.req.form={'eid':'A','__type:A':'BlogEntry','__maineid':'A','content:A':u'"13:03:43"','edits-content:A':'','title:A':u'huuu','edits-title:A':'','__redirectrql':redirectrql,'__redirectvid':'primary','__redirectparams':'toto=tutu&tata=titi','__form_id':'edition','__action_apply':'',}path,params=self.expect_redirect_publish()self.failUnless(path.startswith('blogentry/'))eid=path.split('/')[1]self.assertEquals(params['vid'],'edition')self.assertNotEquals(eid,'4012')self.assertEquals(params['__redirectrql'],redirectrql)self.assertEquals(params['__redirectvid'],'primary')self.assertEquals(params['__redirectparams'],'toto=tutu&tata=titi')deftest_redirect_ok_button(self):redirectrql=rql_for_eid(4012)# whateverself.req.form={'eid':'A','__type:A':'BlogEntry','__maineid':'A','content:A':u'"13:03:43"','edits-content:A':'','title:A':u'huuu','edits-title:A':'','__redirectrql':redirectrql,'__redirectvid':'primary','__redirectparams':'toto=tutu&tata=titi','__form_id':'edition',}path,params=self.expect_redirect_publish()self.assertEquals(path,'view')self.assertEquals(params['rql'],redirectrql)self.assertEquals(params['vid'],'primary')self.assertEquals(params['tata'],'titi')self.assertEquals(params['toto'],'tutu')deftest_redirect_delete_button(self):eid=self.add_entity('BlogEntry',title=u'hop',content=u'hop').eidself.req.form={'eid':str(eid),'__type:%s'%eid:'BlogEntry','__action_delete':''}path,params=self.expect_redirect_publish()self.assertEquals(path,'blogentry')self.assertEquals(params,{u'__message':u'entity deleted'})eid=self.add_entity('EmailAddress',address=u'hop@logilab.fr').eidself.req.execute('SET X use_email E WHERE E eid %(e)s, X eid %(x)s',{'x':self.user.eid,'e':eid},'x')self.commit()self.req.form={'eid':str(eid),'__type:%s'%eid:'EmailAddress','__action_delete':''}path,params=self.expect_redirect_publish()self.assertEquals(unquote(path),'euser/'+self.user.login)self.assertEquals(params,{u'__message':u'entity deleted'})eid1=self.add_entity('BlogEntry',title=u'hop',content=u'hop').eideid2=self.add_entity('EmailAddress',address=u'hop@logilab.fr').eidself.req.form={'eid':[str(eid1),str(eid2)],'__type:%s'%eid1:'BlogEntry','__type:%s'%eid2:'EmailAddress','__action_delete':''}path,params=self.expect_redirect_publish()self.assertEquals(path,'view')self.assertEquals(params,{u'__message':u'entities deleted'})deftest_nonregr_multiple_empty_email_addr(self):gueid=self.req.execute('CWGroup G WHERE G name "users"')[0][0]self.req.form={'eid':['X','Y'],'__type:X':'CWUser','login:X':u'adim','edits-login:X':u'','upassword:X':u'toto','upassword-confirm:X':u'toto','edits-upassword:X':u'','in_group:X':gueid,'edits-in_group:X':INTERNAL_FIELD_VALUE,'__type:Y':'EmailAddress','address:Y':u'','edits-address:Y':'','alias:Y':u'','edits-alias:Y':'','use_email:X':'Y','edits-use_email:X':INTERNAL_FIELD_VALUE,}self.assertRaises(ValidationError,self.publish,self.req)deftest_nonregr_rollback_on_validation_error(self):self.skip('lax fix me')p=self.create_user("doe")# do not try to skip 'primary_email' for this testold_skips=p.__class__.skip_copy_forp.__class__.skip_copy_for=()try:e=self.add_entity('EmailAddress',address=u'doe@doe.com')self.req.execute('SET P use_email E, P primary_email E WHERE P eid %(p)s, E eid %(e)s',{'p':p.eid,'e':e.eid})self.req.form={'__cloned_eid:X':p.eid,'eid':'X','__type:X':'CWUser','login':u'dodo','edits-login':u'dodo','surname:X':u'Boom','edits-surname:X':u'','__errorurl':"whatever but required",}# try to emulate what really happens in the web application# 1/ validate form => EditController.publish raises a ValidationError# which fires a Redirect# 2/ When re-publishing the copy form, the publisher implicitly commitstry:self.app.publish('edit',self.req)exceptRedirect:self.req.form['rql']='Any X WHERE X eid %s'%p.eidself.req.form['vid']='copy'self.app.publish('view',self.req)rset=self.req.execute('CWUser P WHERE P surname "Boom"')self.assertEquals(len(rset),0)finally:p.__class__.skip_copy_for=old_skipsif__name__=='__main__':fromlogilab.common.testlibimportunittest_mainunittest_main()