# -*- coding: iso-8859-1 -*-
"""unit tests for module cubicweb.server.repository"""

import os
import sys
import threading
import time
from copy import deepcopy

from mx.DateTime import DateTimeType, now

from logilab.common.testlib import TestCase, unittest_main
from cubicweb.devtools.apptest import RepositoryBasedTC
from cubicweb.devtools.repotest import tuplify

from yams.constraints import UniqueConstraint

from cubicweb import (BadConnectionId, RepositoryError, ValidationError,
                      UnknownEid, AuthenticationError)
from cubicweb.schema import CubicWebSchema, RQLConstraint
from cubicweb.dbapi import connect, repo_connect

from cubicweb.server import repository


# start name server anyway, process will fail if already running
os.system('pyro-ns >/dev/null 2>/dev/null &')


class RepositoryTC(RepositoryBasedTC):
    """Tests exercising the repository object exposed as ``self.repo``:
    connection handling, RQL execution, transactions, shared data and
    schema content.
    """

    # def setUp(self):
    #     pass

    # def tearDown(self):
    #     self.repo.config.db_perms = True
    #     cnxid = self.repo.connect(*self.default_user_password())
    #     for etype in ('Affaire', 'Note', 'Societe', 'Personne'):
    #         self.repo.execute(cnxid, 'DELETE %s X' % etype)
    #     self.repo.commit(cnxid)
    #     self.repo.close(cnxid)

    def test_fill_schema(self):
        """fill_schema() on a fresh schema leaves only the expected final
        types flagged as final in the EEType table
        """
        self.repo.schema = CubicWebSchema(self.repo.config.appid)
        self.repo.config._cubes = None  # avoid assertion error
        self.repo.fill_schema()
        pool = self.repo._get_pool()
        try:
            sqlcursor = pool['system']
            sqlcursor.execute('SELECT name FROM EEType WHERE final is NULL')
            self.assertEquals(sqlcursor.fetchall(), [])
            sqlcursor.execute('SELECT name FROM EEType WHERE final=%(final)s ORDER BY name',
                              {'final': 'TRUE'})
            self.assertEquals(sqlcursor.fetchall(),
                              [(u'Boolean',), (u'Bytes',),
                               (u'Date',), (u'Datetime',),
                               (u'Decimal',), (u'Float',),
                               (u'Int',),
                               (u'Interval',), (u'Password',),
                               (u'String',), (u'Time',)])
        finally:
            # always give the pool back, even when an assertion failed
            self.repo._free_pool(pool)

    def test_schema_has_owner(self):
        """no schema entity may be left without an owned_by relation"""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        self.failIf(repo.execute(cnxid, 'EEType X WHERE NOT X owned_by U'))
        self.failIf(repo.execute(cnxid, 'ERType X WHERE NOT X owned_by U'))
        self.failIf(repo.execute(cnxid, 'EFRDef X WHERE NOT X owned_by U'))
        self.failIf(repo.execute(cnxid, 'ENFRDef X WHERE NOT X owned_by U'))
        self.failIf(repo.execute(cnxid, 'EConstraint X WHERE NOT X owned_by U'))
        self.failIf(repo.execute(cnxid, 'EConstraintType X WHERE NOT X owned_by U'))

    def test_connect(self):
        """connect() succeeds with the default credentials and raises
        AuthenticationError on a wrong or missing login / password
        """
        login, passwd = self.default_user_password()
        self.assert_(self.repo.connect(login, passwd))
        self.assertRaises(AuthenticationError,
                          self.repo.connect, login, 'nimportnawak')
        self.assertRaises(AuthenticationError,
                          self.repo.connect, login, None)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, None, None)

    def test_execute(self):
        """a few queries, including one with a non-ascii argument, run
        without raising
        """
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        repo.execute(cnxid, 'Any X')
        repo.execute(cnxid, 'Any X where X is Personne')
        repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
        repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'})
        repo.close(cnxid)

    def test_login_upassword_accent(self):
        """a user created with accented login and password can connect"""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        # NOTE(review): the accented characters had been lost to U+FFFD
        # replacement characters; they are restored here as \xe9 escapes
        # (e acute), assuming the original values were "barnabe" / "hehehe"
        # with acute accents -- confirm against the upstream file.
        # Escapes keep the source ASCII-only whatever the file encoding is.
        repo.execute(cnxid,
                     'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, '
                     'X in_state S, X in_group G '
                     'WHERE S name "activated", G name "users"',
                     {'login': u"barnab\xe9",
                      'passwd': u"h\xe9h\xe9h\xe9".encode('UTF8')})
        repo.commit(cnxid)
        repo.close(cnxid)
        self.assert_(repo.connect(u"barnab\xe9", u"h\xe9h\xe9h\xe9".encode('UTF8')))

    def test_invalid_entity_rollback(self):
        """committing a EUser inserted without in_group raises
        ValidationError and the user is not there afterwards
        """
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        repo.execute(cnxid,
                     'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, '
                     'X in_state S WHERE S name "activated"',
                     {'login': u"tutetute", 'passwd': 'tutetute'})
        self.assertRaises(ValidationError, repo.commit, cnxid)
        rset = repo.execute(cnxid, 'EUser X WHERE X login "tutetute"')
        self.assertEquals(rset.rowcount, 0)

    def test_close(self):
        """a closed connection id is rejected by execute()"""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        self.assert_(cnxid)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')

    def test_invalid_cnxid(self):
        """unknown connection ids are rejected by execute() and close()"""
        self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X')
        self.assertRaises(BadConnectionId, self.repo.close, None)

    def test_shared_data(self):
        """shared data is kept per connection and unavailable once the
        connection is closed
        """
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        repo.set_shared_data(cnxid, 'data', 4)
        cnxid2 = repo.connect(*self.default_user_password())
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
        repo.set_shared_data(cnxid2, 'data', 5)
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5)
        # popping on the second connection must not affect the first one
        repo.get_shared_data(cnxid2, 'data', pop=True)
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
        repo.close(cnxid)
        repo.close(cnxid2)
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data')
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1)
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1)

    def test_check_session(self):
        """check_session() returns None on an open connection and raises
        BadConnectionId on a closed one
        """
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        self.assertEquals(repo.check_session(cnxid), None)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.check_session, cnxid)

    def test_transaction_base(self):
        """entity insertion is undone by rollback() and kept by commit()"""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        # check db state
        result = repo.execute(cnxid, 'Personne X')
        self.assertEquals(result.rowcount, 0)
        # rollback entity insertion
        repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
        result = repo.execute(cnxid, 'Personne X')
        self.assertEquals(result.rowcount, 1)
        repo.rollback(cnxid)
        result = repo.execute(cnxid, 'Personne X')
        self.assertEquals(result.rowcount, 0, result.rows)
        # commit
        repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
        repo.commit(cnxid)
        result = repo.execute(cnxid, 'Personne X')
        self.assertEquals(result.rowcount, 1)

    def test_transaction_base2(self):
        """relation insertion is undone by rollback()"""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        # rollback relation insertion
        repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
        result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
        self.assertEquals(result.rowcount, 1)
        repo.rollback(cnxid)
        result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
        self.assertEquals(result.rowcount, 0, result.rows)

    def test_transaction_base3(self):
        """the TrInfo created by a state change is undone by rollback()"""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        # rollback state change which trigger TrInfo insertion
        ueid = repo._get_session(cnxid).user.eid
        rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s',
                            {'x': ueid})
        self.assertEquals(len(rset), 1)
        repo.execute(cnxid, 'SET X in_state S WHERE X eid %(x)s, S name "deactivated"',
                     {'x': ueid}, 'x')
        rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s',
                            {'x': ueid})
        self.assertEquals(len(rset), 2)
        repo.rollback(cnxid)
        rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s',
                            {'x': ueid})
        self.assertEquals(len(rset), 1)

    def test_transaction_interleaved(self):
        """placeholder, currently skipped"""
        self.skip('implement me')

    def test_initial_schema(self):
        """check attribute order, main attributes and constraints of a few
        schema entities
        """
        schema = self.repo.schema
        # check order of attributes is respected
        self.assertListEquals([r.type for r in schema.eschema('EFRDef').ordered_relations()
                               if r.type not in ('eid', 'is', 'is_instance_of', 'identity',
                                                 'creation_date', 'modification_date',
                                                 'owned_by', 'created_by')],
                              ['relation_type', 'from_entity', 'to_entity', 'constrained_by',
                               'cardinality', 'ordernum',
                               'indexed', 'fulltextindexed', 'internationalizable',
                               'defaultval', 'description_format', 'description'])

        self.assertEquals(schema.eschema('EEType').main_attribute(), 'name')
        self.assertEquals(schema.eschema('State').main_attribute(), 'name')

        constraints = schema.rschema('name').rproperty('EEType', 'String', 'constraints')
        self.assertEquals(len(constraints), 2)
        # iterate over a copy since the unique constraint is removed from the
        # list, leaving the other constraint at index 0
        for cstr in constraints[:]:
            if isinstance(cstr, UniqueConstraint):
                constraints.remove(cstr)
                break
        else:
            self.fail('unique constraint not found')
        sizeconstraint = constraints[0]
        self.assertEquals(sizeconstraint.min, None)
        self.assertEquals(sizeconstraint.max, 64)

        constraints = schema.rschema('relation_type').rproperty('EFRDef', 'ERType', 'constraints')
        self.assertEquals(len(constraints), 1)
        cstr = constraints[0]
        self.assert_(isinstance(cstr, RQLConstraint))
        self.assertEquals(cstr.restriction, 'O final TRUE')

        ownedby = schema.rschema('owned_by')
        self.assertEquals(ownedby.objects('EEType'), ('EUser',))

    def test_pyro(self):
        """serve a few requests from a client running in another thread
        through the daemon returned by pyro_register()
        """
        import Pyro
        Pyro.config.PYRO_MULTITHREADED = 0
        lock = threading.Lock()
        # the client part has to be in the thread due to sqlite limitations
        t = threading.Thread(target=self._pyro_client, args=(lock,))
        try:
            daemon = self.repo.pyro_register()
            t.start()
            # connection
            daemon.handleRequests(1.0)
            daemon.handleRequests(1.0)
            daemon.handleRequests(1.0)
            # get schema
            daemon.handleRequests(1.0)
            # execute
            daemon.handleRequests(1.0)
            t.join()
        finally:
            repository.pyro_unregister(self.repo.config)

    def _pyro_client(self, lock):
        """client side of test_pyro, run in its own thread (`lock` is
        accepted but not used here)
        """
        cnx = connect(self.repo.config.appid, u'admin', 'gingkow')
        # check we can get the schema
        schema = cnx.get_schema()
        self.assertEquals(schema.__hashmode__, None)
        rset = cnx.cursor().execute('Any U,G WHERE U in_group G')

    def test_internal_api(self):
        """eid -> type / source / extid helpers, called with a session"""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        session = repo._get_session(cnxid, setpool=True)
        self.assertEquals(repo.type_and_source_from_eid(1, session), ('EGroup', 'system', None))
        self.assertEquals(repo.type_from_eid(1, session), 'EGroup')
        self.assertEquals(repo.source_from_eid(1, session).uri, 'system')
        self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None)

        # minimal stand-in for a source whose uri is not 'system'
        class dummysource:
            uri = 'toto'
        self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session)

    def test_public_api(self):
        """get_schema(), source_defs() and properties()"""
        self.assertEquals(self.repo.get_schema(), self.repo.schema)
        self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}})
        # .properties() return a result set
        self.assertEquals(self.repo.properties().rql,
                          'Any K,V WHERE P is EProperty,P pkey K, P value V, NOT P for_user U')

    def test_session_api(self):
        """user_info() and describe() work on an open connection and raise
        BadConnectionId on a closed one
        """
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
        self.assertEquals(repo.describe(cnxid, 1), (u'EGroup', u'system', None))
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.user_info, cnxid)
        self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)

    def test_shared_data_api(self):
        """get / set / pop of shared data on a single connection"""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
        repo.set_shared_data(cnxid, 'data', 4)
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
        repo.get_shared_data(cnxid, 'data', pop=True)
        # popping a key which was never set must not raise
        repo.get_shared_data(cnxid, 'whatever', pop=True)
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0)
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')


class DataHelpersTC(RepositoryBasedTC):
    """Tests of the repository's eid / type / source helpers, using a
    session which is rolled back after each test
    """

    def setUp(self):
        """ called before each test from this class """
        cnxid = self.repo.connect(*self.default_user_password())
        self.session = self.repo._sessions[cnxid]
        self.session.set_pool()

    def tearDown(self):
        """roll back whatever the test did on the session"""
        self.session.rollback()

    def test_create_eid(self):
        """create_eid() returns a true value"""
        self.assert_(self.repo.system_source.create_eid(self.session))

    def test_source_from_eid(self):
        """eid 1 belongs to the 'system' source"""
        self.assertEquals(self.repo.source_from_eid(1, self.session),
                          self.repo.sources_by_uri['system'])

    def test_source_from_eid_raise(self):
        """source_from_eid() raises UnknownEid for eid -2"""
        self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)

    def test_type_from_eid(self):
        """eid 1 is an EGroup"""
        self.assertEquals(self.repo.type_from_eid(1, self.session), 'EGroup')

    def test_type_from_eid_raise(self):
        """type_from_eid() raises UnknownEid for eid -2"""
        self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)

    def test_add_delete_info(self):
        """add_info() inserts a row in the entities table and
        delete_info() removes it
        """
        entity = self.repo.vreg.etype_class('Personne')(self.session, None, None)
        entity.eid = -1
        entity.complete = lambda x: None
        self.repo.add_info(self.session, entity, self.repo.sources_by_uri['system'])
        cursor = self.session.pool['system']
        cursor.execute('SELECT * FROM entities WHERE eid = -1')
        data = cursor.fetchall()
        self.assertIsInstance(data[0][3], DateTimeType)
        # blank the date column once its type has been checked so that the
        # whole row can be compared to a fixed value
        data[0] = list(data[0])
        data[0][3] = None
        self.assertEquals(tuplify(data), [(-1, 'Personne', 'system', None, None)])
        self.repo.delete_info(self.session, -1)
        #self.repo.commit()
        cursor.execute('SELECT * FROM entities WHERE eid = -1')
        data = cursor.fetchall()
        self.assertEquals(data, [])


class FTITC(RepositoryBasedTC):
    """Tests involving has_text queries and entities_modified_since()"""

    def test_reindex_and_modified_since(self):
        """an updated then deleted entity is reported accordingly by
        entities_modified_since()
        """
        cursor = self.session.pool['system']
        eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
        self.commit()
        ts = now()
        self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
        cursor.execute('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp)
        omtime = cursor.fetchone()[0]
        # our sqlite datetime adapter is ignore seconds fraction, so we have to
        # ensure update is done the next seconds
        time.sleep(1 - (ts.second - int(ts.second)))
        self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp}, 'x')
        self.commit()
        self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
        cursor.execute('SELECT mtime FROM entities WHERE eid = %s' % eidp)
        mtime = cursor.fetchone()[0]
        self.failUnless(omtime < mtime)
        self.commit()
        date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
        self.assertEquals(modified, [('Personne', eidp)])
        self.assertEquals(deleted, [])
        date, modified, deleted = self.repo.entities_modified_since(('Personne',), mtime)
        self.assertEquals(modified, [])
        self.assertEquals(deleted, [])
        self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': eidp})
        self.commit()
        date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
        self.assertEquals(modified, [])
        self.assertEquals(deleted, [('Personne', eidp)])

    def test_composite_entity(self):
        """has_text results follow the use_email relation as it is set
        and deleted
        """
        assert self.schema.rschema('use_email').fulltext_container == 'subject'
        eid = self.add_entity('EmailAddress', address=u'toto@logilab.fr').eid
        self.commit()
        rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
        self.assertEquals(rset.rows, [[eid]])
        self.execute('SET X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
        self.commit()
        rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
        self.assertEquals(rset.rows, [[self.session.user.eid]])
        self.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
        self.commit()
        rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
        self.assertEquals(rset.rows, [])
        eid = self.add_entity('EmailAddress', address=u'tutu@logilab.fr').eid
        self.execute('SET X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
        self.commit()
        rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
        self.assertEquals(rset.rows, [[self.session.user.eid]])


class DBInitTC(RepositoryBasedTC):
    """Tests on the content of the database of the test instance"""

    def test_versions_inserted(self):
        """the expected 'system.version.*' property keys are present"""
        inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
        self.assertEquals(inserted,
                          [u'system.version.basket', u'system.version.comment',
                           u'system.version.cubicweb', u'system.version.email',
                           u'system.version.file', u'system.version.folder',
                           u'system.version.tag'])


class InlineRelHooksTC(RepositoryBasedTC):
    """test relation hooks are called for inlined relations
    """

    def setUp(self):
        """keep a handle on the repository's hook manager and start with
        an empty list of recorded hook calls
        """
        RepositoryBasedTC.setUp(self)
        self.hm = self.repo.hm
        self.called = []

    def _before_relation_hook(self, pool, fromeid, rtype, toeid):
        """record the relation the hook has been called for"""
        self.called.append((fromeid, rtype, toeid))

    def _after_relation_hook(self, pool, fromeid, rtype, toeid):
        """record the relation the hook has been called for"""
        self.called.append((fromeid, rtype, toeid))

    def test_before_add_inline_relation(self):
        """make sure before_<event>_relation hooks are called directly"""
        self.hm.register_hook(self._before_relation_hook,
                              'before_add_relation', 'ecrit_par')
        eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0]
        eidn = self.execute('INSERT Note X: X type "T"')[0][0]
        self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
        self.assertEquals(self.called, [(eidn, 'ecrit_par', eidp)])

    def test_after_add_inline_relation(self):
        """make sure after_<event>_relation hooks are deferred"""
        self.hm.register_hook(self._after_relation_hook,
                              'after_add_relation', 'ecrit_par')
        eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0]
        eidn = self.execute('INSERT Note X: X type "T"')[0][0]
        self.assertEquals(self.called, [])
        self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
        self.assertEquals(self.called, [(eidn, 'ecrit_par', eidp,)])

    def test_after_add_inline(self):
        """make sure after_<event>_relation hooks are deferred"""
        self.hm.register_hook(self._after_relation_hook,
                              'after_add_relation', 'in_state')
        eidp = self.execute('INSERT EUser X: X login "toto", X upassword "tutu", X in_state S WHERE S name "activated"')[0][0]
        eids = self.execute('State X WHERE X name "activated"')[0][0]
        self.assertEquals(self.called, [(eidp, 'in_state', eids,)])

    def test_before_delete_inline_relation(self):
        """make sure before_<event>_relation hooks are called directly"""
        self.hm.register_hook(self._before_relation_hook,
                              'before_delete_relation', 'ecrit_par')
        eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0]
        eidn = self.execute('INSERT Note X: X type "T"')[0][0]
        self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
        self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
        self.assertEquals(self.called, [(eidn, 'ecrit_par', eidp)])
        rset = self.execute('Any Y where N ecrit_par Y, N type "T", Y nom "toto"')
        # make sure the relation is really deleted
        self.failUnless(len(rset) == 0, "failed to delete inline relation")

    def test_after_delete_inline_relation(self):
        """make sure after_<event>_relation hooks are deferred"""
        self.hm.register_hook(self._after_relation_hook,
                              'after_delete_relation', 'ecrit_par')
        eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0]
        eidn = self.execute('INSERT Note X: X type "T"')[0][0]
        self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
        self.assertEquals(self.called, [])
        self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
        self.assertEquals(self.called, [(eidn, 'ecrit_par', eidp,)])


if __name__ == '__main__':
    unittest_main()