When reading the schema while adding or removing cubes, read the schema in non-strict mode.
# coding: utf-8
"""unit tests for module cubicweb.rset"""
from __future__ import with_statement

from logilab.common.testlib import TestCase, unittest_main
from cubicweb.devtools.apptest import EnvBasedTC
from cubicweb.selectors import traced_selection

from urlparse import urlsplit
from rql import parse

from cubicweb.rset import NotAnEntity, ResultSet, attr_desc_iterator


def pprelcachedict(d):
    """Return a sorted, easily comparable view of a relation cache dict.

    Each value of `d` is unpacked as a `(rset, related)` pair; only the
    sorted eids of the `related` items are kept. The result is the sorted
    list of `(key, [eids])` pairs, suitable for comparison with a literal
    in assertions.
    """
    res = {}
    for k, (rset, related) in d.items():
        res[k] = sorted(v.eid for v in related)
    return sorted(res.items())


class AttrDescIteratorTC(TestCase):
    """TestCase for cubicweb.rset.attr_desc_iterator"""

    def test_relations_description(self):
        """tests attr_desc_iterator() with its default variable index"""
        queries = {
            'Any U,L,M where U is EUser, U login L, U mail M':
                [(1, 'login', 'subject'), (2, 'mail', 'subject')],
            'Any U,L,M where U is EUser, L is Foo, U mail M':
                [(2, 'mail', 'subject')],
            'Any C,P where C is Company, C employs P':
                [(1, 'employs', 'subject')],
            'Any C,P where C is Company, P employed_by P': [],
            'Any C where C is Company, C employs P': [],
        }
        for rql, relations in queries.items():
            result = list(attr_desc_iterator(parse(rql).children[0]))
            # the rql is part of the compared tuples so that a failure
            # message tells which query went wrong
            self.assertEquals((rql, result), (rql, relations))

    def test_relations_description_indexed(self):
        """tests attr_desc_iterator() with an explicit variable index"""
        queries = {
            'Any C,U,P,L,M where C is Company, C employs P, U is EUser, U login L, U mail M':
                {0: [(2, 'employs', 'subject')],
                 1: [(3, 'login', 'subject'), (4, 'mail', 'subject')]},
        }
        for rql, results in queries.items():
            for var_index, relations in results.items():
                result = list(attr_desc_iterator(parse(rql).children[0],
                                                 var_index))
                self.assertEquals(result, relations)


class ResultSetTC(EnvBasedTC):
    """TestCase for cubicweb.rset.ResultSet"""

    def setUp(self):
        """build a small hand-made result set shared by some tests"""
        super(ResultSetTC, self).setUp()
        self.rset = ResultSet([[12, 'adim'], [13, 'syt']],
                              'Any U,L where U is EUser, U login L',
                              description=[['EUser', 'String'],
                                           ['Bar', 'String']])
        self.rset.vreg = self.vreg

    def compare_urls(self, url1, url2):
        """assert that both urls are equivalent

        Scheme, location and path must be strictly equal; query strings are
        compared as parameter dictionaries so that parameter order does not
        matter.
        """
        info1 = urlsplit(url1)
        info2 = urlsplit(url2)
        self.assertEquals(info1[:3], info2[:3])
        if info1[3] != info2[3]:
            params1 = dict(pair.split('=') for pair in info1[3].split('&'))
            # parameters of the *second* url: building this dictionary from
            # info1 would compare the first url with itself
            params2 = dict(pair.split('=') for pair in info2[3].split('&'))
            self.assertDictEquals(params1, params2)

    def test_build_url(self):
        """test urls generated by the request's build_url() method"""
        req = self.request()
        baseurl = req.base_url()
        self.compare_urls(req.build_url('view', vid='foo', rql='yo'),
                          '%sview?vid=foo&rql=yo' % baseurl)
        self.compare_urls(req.build_url('view', _restpath='task/title/go'),
                          '%stask/title/go' % baseurl)
        #self.compare_urls(req.build_url('view', _restpath='/task/title/go'),
        #                  '%stask/title/go' % baseurl)
        # empty _restpath should not crash
        self.compare_urls(req.build_url('view', _restpath=''), baseurl)

    def test_resultset_build(self):
        """test basic build of a ResultSet"""
        rs = ResultSet([1, 2, 3], 'EGroup X',
                       description=['EGroup', 'EGroup', 'EGroup'])
        self.assertEquals(rs.rowcount, 3)
        self.assertEquals(rs.rows, [1, 2, 3])
        self.assertEquals(rs.description, ['EGroup', 'EGroup', 'EGroup'])

    def test_resultset_limit(self):
        """test ResultSet.limit() with and without offset"""
        rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
                       'Any U,L where U is EUser, U login L',
                       description=[['EUser', 'String']] * 3)
        rs.req = self.request()
        rs.vreg = self.env.vreg

        self.assertEquals(rs.limit(2).rows, [[12000, 'adim'], [13000, 'syt']])
        rs2 = rs.limit(2, offset=1)
        self.assertEquals(rs2.rows, [[13000, 'syt'], [14000, 'nico']])
        # entities of the limited rset are numbered from its own first row
        self.assertEquals(rs2.get_entity(0, 0).row, 0)
        self.assertEquals(rs.limit(2, offset=2).rows, [[14000, 'nico']])
        self.assertEquals(rs.limit(2, offset=3).rows, [])

    def test_resultset_filter(self):
        """test ResultSet.filtered_rset() with an entity predicate"""
        rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
                       'Any U,L where U is EUser, U login L',
                       description=[['EUser', 'String']] * 3)
        rs.req = self.request()
        rs.vreg = self.env.vreg

        def test_filter(entity):
            return entity.login != 'nico'

        rs2 = rs.filtered_rset(test_filter)
        self.assertEquals(len(rs2), 2)
        self.assertEquals([login for _, login in rs2], ['adim', 'syt'])

    def test_resultset_transform(self):
        """test ResultSet.transformed_rset() with a column-dropping function"""
        rs = ResultSet([[12, 'adim'], [13, 'syt'], [14, 'nico']],
                       'Any U,L where U is EUser, U login L',
                       description=[['EUser', 'String']] * 3)
        rs.req = self.request()

        def test_transform(row, desc):
            # drop the first column from both the row and its description
            return row[1:], desc[1:]

        rs2 = rs.transformed_rset(test_transform)
        self.assertEquals(len(rs2), 3)
        self.assertEquals(list(rs2), [['adim'], ['syt'], ['nico']])

    def test_resultset_sort(self):
        """test ResultSet.sorted_rset() on entities and on raw rows"""
        rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
                       'Any U,L where U is EUser, U login L',
                       description=[['EUser', 'String']] * 3)
        rs.req = self.request()
        rs.vreg = self.env.vreg

        rs2 = rs.sorted_rset(lambda e: e['login'])
        self.assertEquals(len(rs2), 3)
        self.assertEquals([login for _, login in rs2],
                          ['adim', 'nico', 'syt'])
        # make sure rs is unchanged
        self.assertEquals([login for _, login in rs], ['adim', 'syt', 'nico'])

        rs2 = rs.sorted_rset(lambda e: e['login'], reverse=True)
        self.assertEquals(len(rs2), 3)
        self.assertEquals([login for _, login in rs2],
                          ['syt', 'nico', 'adim'])
        # make sure rs is unchanged
        self.assertEquals([login for _, login in rs], ['adim', 'syt', 'nico'])

        rs3 = rs.sorted_rset(lambda row: row[1], col=-1)
        self.assertEquals(len(rs3), 3)
        self.assertEquals([login for _, login in rs3],
                          ['adim', 'nico', 'syt'])
        # make sure rs is unchanged
        self.assertEquals([login for _, login in rs], ['adim', 'syt', 'nico'])

    def test_resultset_split(self):
        """test ResultSet.split_rset() as a list, as a dict and on a column"""
        rs = ResultSet([[12000, 'adim', u'Adim chez les pinguins'],
                        [12000, 'adim', u'Jardiner facile'],
                        [13000, 'syt', u'Le carrelage en 42 leçons'],
                        [14000, 'nico', u'La tarte tatin en 15 minutes'],
                        [14000, 'nico', u"L'épluchage du castor commun"]],
                       'Any U, L, T WHERE U is EUser, U login L,'
                       'D created_by U, D title T',
                       description=[['EUser', 'String', 'String']] * 5)
        rs.req = self.request()
        rs.vreg = self.env.vreg

        rsets = rs.split_rset(lambda e: e['login'])
        self.assertEquals(len(rsets), 3)
        self.assertEquals([login for _, login, _ in rsets[0]],
                          ['adim', 'adim'])
        self.assertEquals([login for _, login, _ in rsets[1]], ['syt'])
        self.assertEquals([login for _, login, _ in rsets[2]],
                          ['nico', 'nico'])
        # make sure rs is unchanged
        self.assertEquals([login for _, login, _ in rs],
                          ['adim', 'adim', 'syt', 'nico', 'nico'])

        rsets = rs.split_rset(lambda e: e['login'], return_dict=True)
        self.assertEquals(len(rsets), 3)
        self.assertEquals([login for _, login, _ in rsets['nico']],
                          ['nico', 'nico'])
        self.assertEquals([login for _, login, _ in rsets['adim']],
                          ['adim', 'adim'])
        self.assertEquals([login for _, login, _ in rsets['syt']], ['syt'])
        # make sure rs is unchanged
        self.assertEquals([login for _, login, _ in rs],
                          ['adim', 'adim', 'syt', 'nico', 'nico'])

        rsets = rs.split_rset(lambda s: s.count('d'), col=2)
        self.assertEquals(len(rsets), 2)
        self.assertEquals([title for _, _, title in rsets[0]],
                          [u"Adim chez les pinguins",
                           u"Jardiner facile",
                           u"L'épluchage du castor commun",])
        self.assertEquals([title for _, _, title in rsets[1]],
                          [u"Le carrelage en 42 leçons",
                           u"La tarte tatin en 15 minutes",])
        # make sure rs is unchanged
        self.assertEquals([title for _, _, title in rs],
                          [u'Adim chez les pinguins',
                           u'Jardiner facile',
                           u'Le carrelage en 42 leçons',
                           u'La tarte tatin en 15 minutes',
                           u"L'épluchage du castor commun"])

    def test_cached_syntax_tree(self):
        """make sure syntax tree is cached"""
        rqlst1 = self.rset.syntax_tree()
        rqlst2 = self.rset.syntax_tree()
        self.assert_(rqlst1 is rqlst2)

    def test_get_entity_simple(self):
        """test that only selected attributes are set before complete()"""
        self.add_entity('EUser', login=u'adim', upassword='adim',
                        surname=u'di mascio', firstname=u'adrien')
        e = self.entity('Any X,T WHERE X login "adim", X surname T')
        self.assertEquals(e['surname'], 'di mascio')
        self.assertRaises(KeyError, e.__getitem__, 'firstname')
        self.assertRaises(KeyError, e.__getitem__, 'creation_date')
        self.assertEquals(pprelcachedict(e._related_cache), [])
        e.complete()
        self.assertEquals(e['firstname'], 'adrien')
        self.assertEquals(pprelcachedict(e._related_cache), [])

    def test_get_entity_advanced(self):
        """test get_entity() on several columns of a multi-variable rset"""
        self.add_entity('Bookmark', title=u'zou', path=u'/view')
        self.execute('SET X bookmarked_by Y WHERE X is Bookmark, Y login "anon"')
        rset = self.execute('Any X,Y,XT,YN WHERE X bookmarked_by Y, X title XT, Y login YN')

        e = rset.get_entity(0, 0)
        self.assertEquals(e.row, 0)
        self.assertEquals(e.col, 0)
        self.assertEquals(e['title'], 'zou')
        self.assertRaises(KeyError, e.__getitem__, 'path')
        with traced_selection():
            self.assertEquals(e.view('text'), 'zou')
        self.assertEquals(pprelcachedict(e._related_cache), [])

        e = rset.get_entity(0, 1)
        self.assertEquals(e.row, 0)
        self.assertEquals(e.col, 1)
        self.assertEquals(e['login'], 'anon')
        self.assertRaises(KeyError, e.__getitem__, 'firstname')
        self.assertEquals(pprelcachedict(e._related_cache), [])
        e.complete()
        self.assertEquals(e['firstname'], None)
        self.assertEquals(e.view('text'), 'anon')
        self.assertEquals(pprelcachedict(e._related_cache), [])

        # columns 2 and 3 hold attribute values, not entities
        self.assertRaises(NotAnEntity, rset.get_entity, 0, 2)
        self.assertRaises(NotAnEntity, rset.get_entity, 0, 3)

    def test_get_entity_relation_cache_compt(self):
        """test that get_entity() fills the relation cache from the rset"""
        rset = self.execute('Any X,S WHERE X in_state S, X login "anon"')
        e = rset.get_entity(0, 0)
        seid = self.execute('State X WHERE X name "activated"')[0][0]
        # for_user / in_group are prefetched in EUser __init__, in_state should
        # be filed from our query rset
        self.assertEquals(pprelcachedict(e._related_cache),
                          [('in_state_subject', [seid])])

    def test_get_entity_advanced_prefilled_cache(self):
        """test that related entities are recursively built from the rset"""
        e = self.add_entity('Bookmark', title=u'zou', path=u'path')
        self.commit()
        rset = self.execute('Any X,U,S,XT,UL,SN WHERE X created_by U, U in_state S, '
                            'X title XT, S name SN, U login UL, X eid %s' % e.eid)
        e = rset.get_entity(0, 0)
        self.assertEquals(e['title'], 'zou')
        # NOTE(review): 5 is presumably the eid of the admin user in the test
        # database -- confirm
        self.assertEquals(pprelcachedict(e._related_cache),
                          [('created_by_subject', [5])])
        # first level of recursion
        u = e.created_by[0]
        self.assertEquals(u['login'], 'admin')
        self.assertRaises(KeyError, u.__getitem__, 'firstname')
        # second level of recursion
        s = u.in_state[0]
        self.assertEquals(s['name'], 'activated')
        self.assertRaises(KeyError, s.__getitem__, 'description')

    def test_get_entity_cache_with_left_outer_join(self):
        """test that an unmatched optional relation is cached as empty"""
        eid = self.execute('INSERT EUser E: E login "joe", E upassword "joe", E in_group G '
                           'WHERE G name "users"')[0][0]
        rset = self.execute('Any X,E WHERE X eid %(x)s, X primary_email E?',
                            {'x': eid})
        e = rset.get_entity(0, 0)
        # if any of the assertion below fails with a KeyError, the relation is not cached
        # related entities should be an empty list
        self.assertEquals(e.related_cache('primary_email', 'subject', True), [])
        # related rset should be an empty rset
        cached = e.related_cache('primary_email', 'subject', False)
        self.assertIsInstance(cached, ResultSet)
        self.assertEquals(cached.rowcount, 0)

    def test_get_entity_union(self):
        """test get_entity() on each row of a union sub-query rset"""
        self.add_entity('Bookmark', title=u'manger', path=u'path')
        rset = self.execute('Any X,N ORDERBY N WITH X,N BEING '
                            '((Any X,N WHERE X is Bookmark, X title N)'
                            ' UNION '
                            ' (Any X,N WHERE X is EGroup, X name N))')
        expected = (('EGroup', 'guests'), ('EGroup', 'managers'),
                    ('Bookmark', 'manger'), ('EGroup', 'owners'),
                    ('EGroup', 'users'))
        for entity in rset.entities():  # test get_entity for each row actually
            etype, n = expected[entity.row]
            self.assertEquals(entity.id, etype)
            attr = 'title' if etype == 'Bookmark' else 'name'
            self.assertEquals(entity[attr], n)

    def test_related_entity_optional(self):
        """test related_entity() on an attribute reached through an optional
        relation
        """
        self.add_entity('Bookmark', title=u'aaaa', path=u'path')
        rset = self.execute('Any B,U,L WHERE B bookmarked_by U?, U login L')
        entity, rtype = rset.related_entity(0, 2)
        self.assertEquals(entity, None)
        self.assertEquals(rtype, None)

    def test_related_entity_union_subquery(self):
        """test related_entity() on rsets built from a union sub-query"""
        e = self.add_entity('Bookmark', title=u'aaaa', path=u'path')
        rset = self.execute('Any X,N ORDERBY N WITH X,N BEING '
                            '((Any X,N WHERE X is EGroup, X name N)'
                            ' UNION '
                            ' (Any X,N WHERE X is Bookmark, X title N))')
        entity, rtype = rset.related_entity(0, 1)
        self.assertEquals(entity.eid, e.eid)
        self.assertEquals(rtype, 'title')
        entity, rtype = rset.related_entity(1, 1)
        self.assertEquals(entity.id, 'EGroup')
        self.assertEquals(rtype, 'name')
        rset = self.execute('Any X,N ORDERBY N WHERE X is Bookmark WITH X,N BEING '
                            '((Any X,N WHERE X is EGroup, X name N)'
                            ' UNION '
                            ' (Any X,N WHERE X is Bookmark, X title N))')
        entity, rtype = rset.related_entity(0, 1)
        self.assertEquals(entity.eid, e.eid)
        self.assertEquals(rtype, 'title')

    def test_entities(self):
        """test ResultSet.entities() for each column of the rset"""
        rset = self.execute('Any U,G WHERE U in_group G')
        # make sure we have at least one element
        self.failUnless(rset)
        self.assertEquals(set(e.e_schema.type for e in rset.entities(0)),
                          set(['EUser',]))
        self.assertEquals(set(e.e_schema.type for e in rset.entities(1)),
                          set(['EGroup',]))

    def test_printable_rql(self):
        """test ResultSet.printable_rql() output"""
        rset = self.execute(u'EEType X WHERE X final FALSE, X meta FALSE')
        self.assertEquals(rset.printable_rql(),
                          'Any X WHERE X final FALSE, X meta FALSE, X is EEType')

    def test_searched_text(self):
        """test ResultSet.searched_text() with inlined and substituted text"""
        rset = self.execute(u'Any X WHERE X has_text "foobar"')
        self.assertEquals(rset.searched_text(), 'foobar')
        rset = self.execute(u'Any X WHERE X has_text %(text)s', {'text': 'foo'})
        self.assertEquals(rset.searched_text(), 'foo')


if __name__ == '__main__':
    unittest_main()