# -*- coding: iso-8859-1 -*-
# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""unit tests for module cubicweb.server.repository"""

import os
import sys
import threading
import time
import logging
from copy import deepcopy
from datetime import datetime

from logilab.common.testlib import TestCase, unittest_main

from yams.constraints import UniqueConstraint
from yams import register_base_type, unregister_base_type

from logilab.database import get_db_helper

from cubicweb import (BadConnectionId, RepositoryError, ValidationError,
                      UnknownEid, AuthenticationError, Unauthorized, QueryError)
from cubicweb.predicates import is_instance
from cubicweb.schema import CubicWebSchema, RQLConstraint
from cubicweb.dbapi import connect, multiple_connections_unfix
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.repotest import tuplify
from cubicweb.server import repository, hook
from cubicweb.server.sqlutils import SQL_PREFIX
from cubicweb.server.hook import Hook
from cubicweb.server.sources import native
from cubicweb.server.session import SessionClosedError


class RepositoryTC(CubicWebTC):
    """ singleton providing access to a persistent storage for entities
    and relation
    """

    def test_unique_together_constraint(self):
        """Inserting the same Societe twice raises a ValidationError."""
        self.execute('INSERT Societe S: S nom "Logilab", S type "SSLL", S cp "75013"')
        with self.assertRaises(ValidationError) as wraperr:
            self.execute('INSERT Societe S: S nom "Logilab", S type "SSLL", S cp "75013"')
        self.assertEqual(
            {'cp': u'cp is part of violated unicity constraint',
             'nom': u'nom is part of violated unicity constraint',
             'type': u'type is part of violated unicity constraint',
             'unicity constraint': u'some relations violate a unicity constraint'},
            wraperr.exception.args[1])

    def test_unique_together_schema(self):
        person = self.repo.schema.eschema('Personne')
        self.assertEqual(len(person._unique_together), 1)
        self.assertItemsEqual(person._unique_together[0],
                              ('nom', 'prenom', 'inline2'))

    def test_all_entities_have_owner(self):
        self.assertFalse(self.execute('Any X WHERE NOT X owned_by U'))

    def test_all_entities_have_is(self):
        self.assertFalse(self.execute('Any X WHERE NOT X is ET'))

    def test_all_entities_have_cw_source(self):
        self.assertFalse(self.execute('Any X WHERE NOT X cw_source S'))

    def test_connect(self):
        """Valid credentials open a connection, anything else is rejected."""
        cnxid = self.repo.connect(self.admlogin, password=self.admpassword)
        self.assertTrue(cnxid)
        self.repo.close(cnxid)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin, password='nimportnawak')
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin, password='')
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin, password=None)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, None, password=None)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, None)

    def test_execute(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        repo.execute(cnxid, 'Any X')
        repo.execute(cnxid, 'Any X where X is Personne')
        repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
        repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'})
        repo.close(cnxid)

    def test_login_upassword_accent(self):
        """A user with non-ascii login and password can be created and log in."""
        # accented characters are written as escapes so that the literals do
        # not depend on the encoding this file is stored in
        login = u"barnab\xe9"
        passwd = u"h\xe9h\xe9h\xe9".encode('UTF8')
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_group G WHERE G name "users"',
                     {'login': login, 'passwd': passwd})
        repo.commit(cnxid)
        repo.close(cnxid)
        cnxid = repo.connect(login, password=passwd)
        self.assertTrue(cnxid)
        repo.close(cnxid)

    def test_rollback_on_commit_error(self):
        cnxid = self.repo.connect(self.admlogin, password=self.admpassword)
        self.repo.execute(cnxid,
                          'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s',
                          {'login': u"tutetute", 'passwd': 'tutetute'})
        self.assertRaises(ValidationError, self.repo.commit, cnxid)
        self.assertFalse(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"'))
        self.repo.close(cnxid)

    def test_rollback_on_execute_validation_error(self):
        """After a ValidationError in a hook, commit is refused until rollback."""
        class ValidationErrorAfterHook(Hook):
            __regid__ = 'valerror-after-hook'
            __select__ = Hook.__select__ & is_instance('CWGroup')
            events = ('after_update_entity',)

            def __call__(self):
                raise ValidationError(self.entity.eid, {})

        with self.temporary_appobjects(ValidationErrorAfterHook):
            self.assertRaises(ValidationError,
                              self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"')
            self.assertTrue(self.execute('Any X WHERE X is CWGroup, X name "toto"'))
            with self.assertRaises(QueryError) as cm:
                self.commit()
            self.assertEqual(str(cm.exception), 'transaction must be rolled back')
            self.rollback()
            self.assertFalse(self.execute('Any X WHERE X is CWGroup, X name "toto"'))

    def test_rollback_on_execute_unauthorized(self):
        """After an Unauthorized in a hook, commit is refused until rollback."""
        class UnauthorizedAfterHook(Hook):
            __regid__ = 'unauthorized-after-hook'
            __select__ = Hook.__select__ & is_instance('CWGroup')
            events = ('after_update_entity',)

            def __call__(self):
                raise Unauthorized()

        with self.temporary_appobjects(UnauthorizedAfterHook):
            self.assertRaises(Unauthorized,
                              self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"')
            self.assertTrue(self.execute('Any X WHERE X is CWGroup, X name "toto"'))
            with self.assertRaises(QueryError) as cm:
                self.commit()
            self.assertEqual(str(cm.exception), 'transaction must be rolled back')
            self.rollback()
            self.assertFalse(self.execute('Any X WHERE X is CWGroup, X name "toto"'))

    def test_close(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        self.assertTrue(cnxid)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')

    def test_invalid_cnxid(self):
        self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X')
        self.assertRaises(BadConnectionId, self.repo.close, None)

    def test_shared_data(self):
        """Shared data is stored per connection and unreachable once closed."""
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        repo.set_shared_data(cnxid, 'data', 4)
        cnxid2 = repo.connect(self.admlogin, password=self.admpassword)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None)
        repo.set_shared_data(cnxid2, 'data', 5)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEqual(repo.get_shared_data(cnxid2, 'data'), 5)
        repo.get_shared_data(cnxid2, 'data', pop=True)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None)
        repo.close(cnxid)
        repo.close(cnxid2)
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data')
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1)
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1)

    def test_check_session(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        self.assertIsInstance(repo.check_session(cnxid), float)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.check_session, cnxid)

    def test_transaction_base(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        # check db state
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 0)
        # rollback entity insertion
        repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 1)
        repo.rollback(cnxid)
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 0, result.rows)
        # commit
        repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
        repo.commit(cnxid)
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 1)
        repo.close(cnxid)

    def test_transaction_base2(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        # rollback relation insertion
        repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
        result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
        self.assertEqual(result.rowcount, 1)
        repo.rollback(cnxid)
        result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
        self.assertEqual(result.rowcount, 0, result.rows)
        repo.close(cnxid)

    def test_transaction_base3(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        # rollback state change which trigger TrInfo insertion
        session = repo._get_session(cnxid)
        session.set_cnxset()
        user = session.user
        user.cw_adapt_to('IWorkflowable').fire_transition('deactivate')
        rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s',
                            {'x': user.eid})
        self.assertEqual(len(rset), 1)
        repo.rollback(cnxid)
        rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s',
                            {'x': user.eid})
        self.assertEqual(len(rset), 0)
        repo.close(cnxid)

    def test_transaction_interleaved(self):
        self.skipTest('implement me')

    def test_close_kill_processing_request(self):
        """Using a connection closed by another thread raises SessionClosedError."""
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
        repo.commit(cnxid)
        # the lock is taken here and released by run_transaction(), which is
        # what lets the closing thread proceed
        lock = threading.Lock()
        lock.acquire()

        # close has to be in the thread due to sqlite limitations
        def close_in_a_few_moment():
            lock.acquire()
            repo.close(cnxid)

        t = threading.Thread(target=close_in_a_few_moment)
        t.start()

        def run_transaction():
            lock.release()
            repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"')
            repo.commit(cnxid)

        try:
            with self.assertRaises(SessionClosedError) as cm:
                run_transaction()
            self.assertEqual(str(cm.exception),
                             'try to access connections set on a closed session %s' % cnxid)
        finally:
            t.join()

    def test_initial_schema(self):
        schema = self.repo.schema
        # check order of attributes is respected
        notin = set(('eid', 'is', 'is_instance_of', 'identity',
                     'creation_date', 'modification_date', 'cwuri',
                     'owned_by', 'created_by', 'cw_source',
                     'update_permission', 'read_permission',
                     'in_basket'))
        self.assertListEqual(['relation_type',
                              'from_entity', 'to_entity',
                              'constrained_by',
                              'cardinality', 'ordernum',
                              'indexed', 'fulltextindexed', 'internationalizable',
                              'defaultval', 'extra_props',
                              'description', 'description_format'],
                             [r.type
                              for r in schema.eschema('CWAttribute').ordered_relations()
                              if r.type not in notin])

        self.assertEqual(schema.eschema('CWEType').main_attribute(), 'name')
        self.assertEqual(schema.eschema('State').main_attribute(), 'name')

        # work on a copy: removing an item from the schema's own list would
        # alter the repository schema seen by every other test
        constraints = list(schema.rschema('name').rdef('CWEType', 'String').constraints)
        self.assertEqual(len(constraints), 2)
        for cstr in constraints[:]:
            if isinstance(cstr, UniqueConstraint):
                constraints.remove(cstr)
                break
        else:
            self.fail('unique constraint not found')
        sizeconstraint = constraints[0]
        self.assertEqual(sizeconstraint.min, None)
        self.assertEqual(sizeconstraint.max, 64)

        constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints
        self.assertEqual(len(constraints), 1)
        cstr = constraints[0]
        self.assertIsInstance(cstr, RQLConstraint)
        self.assertEqual(cstr.expression, 'O final TRUE')

        ownedby = schema.rschema('owned_by')
        self.assertEqual(ownedby.objects('CWEType'), ('CWUser',))

    def test_pyro(self):
        """Serve the repository through Pyro and query it from a client thread."""
        import Pyro
        Pyro.config.PYRO_MULTITHREADED = 0
        # filled by the client thread: first item is True on success
        done = []
        self.repo.config.global_set_option('pyro-ns-host', 'NO_PYRONS')
        daemon = self.repo.pyro_register()
        try:
            uri = self.repo.pyro_uri.replace('PYRO', 'pyroloc')
            # the client part has to be in the thread due to sqlite limitations
            t = threading.Thread(target=self._pyro_client, args=(uri, done))
            t.start()
            while not done:
                daemon.handleRequests(1.0)
            t.join(1)
            if t.is_alive():
                self.fail('something went wrong, thread still alive')
            self.assertTrue(done[0], 'pyro client thread failed')
        finally:
            repository.pyro_unregister(self.repo.config)
            from logilab.common import pyro_ext
            pyro_ext._DAEMONS.clear()

    def _pyro_client(self, uri, done):
        """Client side of test_pyro, run in its own thread.

        Appends True to `done` once every check passed. A final False is
        always appended so that the serving loop of test_pyro terminates even
        when the client fails.
        """
        try:
            cnx = connect(uri,
                          u'admin', password='gingkow',
                          initlog=False)  # don't reset logging configuration
            try:
                cnx.load_appobjects(subpath=('entities',))
                # check we can get the schema
                cnx.get_schema()
                self.assertTrue(cnx.vreg)
                self.assertTrue('etypes' in cnx.vreg)
                cu = cnx.cursor()
                rset = cu.execute('Any U,G WHERE U in_group G')
                user = next(iter(rset.entities()))
                self.assertTrue(user._cw)
                self.assertTrue(user._cw.vreg)
                from cubicweb.entities import authobjs
                self.assertIsInstance(user._cw.user, authobjs.CWUser)
                # make sure the tcp connection is closed properly; yes, it's disgusting.
                adapter = cnx._repo.adapter
                cnx.close()
                adapter.release()
                done.append(True)
            finally:
                # connect monkey patch some method by default, remove them
                multiple_connections_unfix()
        finally:
            done.append(False)

    def test_zmq(self):
        """Serve the repository through zmq and query it from a client thread."""
        try:
            import zmq
        except ImportError:
            self.skipTest("zmq in not available")
        # filled by the client thread: first item is True on success
        done = []
        from cubicweb.devtools import TestServerConfiguration as ServerConfiguration
        from cubicweb.server.cwzmq import ZMQRepositoryServer
        # the client part has to be in a thread due to sqlite limitations
        t = threading.Thread(target=self._zmq_client, args=(done,))
        t.start()

        zmq_server = ZMQRepositoryServer(self.repo)
        zmq_server.connect('zmqpickle-tcp://127.0.0.1:41415')

        t2 = threading.Thread(target=self._zmq_quit, args=(done, zmq_server,))
        t2.start()

        zmq_server.run()

        t2.join(1)
        t.join(1)

        if t.is_alive():
            self.fail('something went wrong, thread still alive')
        self.assertTrue(done[0], 'zmq client thread failed')

    def _zmq_quit(self, done, srv):
        """Stop the zmq server `srv` once the client thread has filled `done`."""
        while not done:
            time.sleep(0.1)
        srv.quit()

    def _zmq_client(self, done):
        """Client side of test_zmq, run in its own thread.

        Appends True to `done` once every check passed. A final False is
        always appended so that _zmq_quit stops the server even when the
        client fails.
        """
        try:
            cnx = connect('zmqpickle-tcp://127.0.0.1:41415', u'admin', password=u'gingkow',
                          initlog=False)  # don't reset logging configuration
            try:
                cnx.load_appobjects(subpath=('entities',))
                # check we can get the schema
                cnx.get_schema()
                self.assertTrue(cnx.vreg)
                self.assertTrue('etypes' in cnx.vreg)
                cu = cnx.cursor()
                rset = cu.execute('Any U,G WHERE U in_group G')
                user = next(iter(rset.entities()))
                self.assertTrue(user._cw)
                self.assertTrue(user._cw.vreg)
                from cubicweb.entities import authobjs
                self.assertIsInstance(user._cw.user, authobjs.CWUser)
                cnx.close()
                done.append(True)
            finally:
                # connect monkey patch some method by default, remove them
                multiple_connections_unfix()
        finally:
            done.append(False)

    def test_internal_api(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        session = repo._get_session(cnxid, setcnxset=True)
        self.assertEqual(repo.type_and_source_from_eid(2, session),
                         ('CWGroup', 'system', None, 'system'))
        self.assertEqual(repo.type_from_eid(2, session), 'CWGroup')
        self.assertEqual(repo.source_from_eid(2, session).uri, 'system')
        self.assertEqual(repo.eid2extid(repo.system_source, 2, session), None)

        class dummysource:
            uri = 'toto'

        self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 2, session)
        repo.close(cnxid)

    def test_public_api(self):
        self.assertEqual(self.repo.get_schema(), self.repo.schema)
        self.assertEqual(self.repo.source_defs(),
                         {'system': {'type': 'native',
                                     'uri': 'system',
                                     'use-cwuri-as-url': False}})
        # .properties() return a result set
        self.assertEqual(self.repo.properties().rql,
                         'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')

    def test_session_api(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        self.assertEqual(repo.user_info(cnxid), (6, 'admin', set([u'managers']), {}))
        self.assertEqual(repo.describe(cnxid, 2), (u'CWGroup', u'system', None, 'system'))
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.user_info, cnxid)
        self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)

    def test_shared_data_api(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), None)
        repo.set_shared_data(cnxid, 'data', 4)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
        repo.get_shared_data(cnxid, 'data', pop=True)
        repo.get_shared_data(cnxid, 'whatever', pop=True)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), None)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0)
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')

    def test_schema_is_relation(self):
        no_is_rset = self.execute('Any X WHERE NOT X is ET')
        self.assertFalse(no_is_rset, no_is_rset.description)

    # def test_perfo(self):
    #     self.set_debug(True)
    #     from time import time, clock
    #     t, c = time(), clock()
    #     try:
    #         self.create_user('toto')
    #     finally:
    #         self.set_debug(False)
    #     print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c)

    def test_delete_if_singlecard1(self):
        """Setting todo_by a second time replaces the first Personne."""
        note = self.request().create_entity('Affaire')
        p1 = self.request().create_entity('Personne', nom=u'toto')
        self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
                     {'x': note.eid, 'p': p1.eid})
        rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
                            {'x': note.eid})
        self.assertEqual(len(rset), 1)
        p2 = self.request().create_entity('Personne', nom=u'tutu')
        self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
                     {'x': note.eid, 'p': p2.eid})
        rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
                            {'x': note.eid})
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset.rows[0][0], p2.eid)

    def test_delete_if_object_inlined_singlecard(self):
        req = self.request()
        c = req.create_entity('Card', title=u'Carte')
        req.create_entity('Personne', nom=u'Vincent', fiche=c)
        req.create_entity('Personne', nom=u'Florent', fiche=c)
        self.commit()
        self.assertEqual(len(c.reverse_fiche), 1)

    def test_cw_set_in_before_update(self):
        # local hook
        class DummyBeforeHook(Hook):
            __regid__ = 'dummy-before-hook'
            __select__ = Hook.__select__ & is_instance('EmailAddress')
            events = ('before_update_entity',)

            def __call__(self):
                # safety belt: avoid potential infinite recursion if the test
                # fails (i.e. RuntimeError not raised)
                pendings = self._cw.transaction_data.setdefault('pending', set())
                if self.entity.eid not in pendings:
                    pendings.add(self.entity.eid)
                    self.entity.cw_set(alias=u'foo')

        with self.temporary_appobjects(DummyBeforeHook):
            req = self.request()
            addr = req.create_entity('EmailAddress', address=u'a@b.fr')
            addr.cw_set(address=u'a@b.com')
            rset = self.execute('Any A,AA WHERE X eid %(x)s, X address A, X alias AA',
                                {'x': addr.eid})
            self.assertEqual(rset.rows, [[u'a@b.com', u'foo']])

    def test_cw_set_in_before_add(self):
        # local hook
        class DummyBeforeHook(Hook):
            __regid__ = 'dummy-before-hook'
            __select__ = Hook.__select__ & is_instance('EmailAddress')
            events = ('before_add_entity',)

            def __call__(self):
                # cw_set is forbidden within before_add_entity()
                self.entity.cw_set(alias=u'foo')

        with self.temporary_appobjects(DummyBeforeHook):
            req = self.request()
            # XXX will fail with python -O
            self.assertRaises(AssertionError, req.create_entity,
                              'EmailAddress', address=u'a@b.fr')

    def test_multiple_edit_cw_set(self):
        """make sure cw_edited doesn't get cluttered
        by previous entities on multiple set
        """
        # local hook
        class DummyBeforeHook(Hook):
            _test = self  # keep reference to test instance
            __regid__ = 'dummy-before-hook'
            __select__ = Hook.__select__ & is_instance('Affaire')
            events = ('before_update_entity',)

            def __call__(self):
                # invoiced attribute shouldn't be considered "edited" before the hook
                self._test.assertFalse('invoiced' in self.entity.cw_edited,
                                       'cw_edited cluttered by previous update')
                self.entity.cw_edited['invoiced'] = 10

        with self.temporary_appobjects(DummyBeforeHook):
            req = self.request()
            req.create_entity('Affaire', ref=u'AFF01')
            req.create_entity('Affaire', ref=u'AFF02')
            req.execute('SET A duration 10 WHERE A is Affaire')

    def test_user_friendly_error(self):
        """A custom IUserFriendlyUniqueTogether adapter controls the error raised."""
        from cubicweb.entities.adapters import IUserFriendlyUniqueTogether

        class MyIUserFriendlyUniqueTogether(IUserFriendlyUniqueTogether):
            __select__ = IUserFriendlyUniqueTogether.__select__ & is_instance('Societe')

            def raise_user_exception(self):
                raise ValidationError(self.entity.eid, {'hip': 'hop'})

        with self.temporary_appobjects(MyIUserFriendlyUniqueTogether):
            req = self.request()
            s = req.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'75013')
            self.commit()
            with self.assertRaises(ValidationError) as cm:
                req.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'75013')
            self.assertEqual(cm.exception.errors, {'hip': 'hop'})
            self.rollback()
            req.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'31400')
            with self.assertRaises(ValidationError) as cm:
                s.cw_set(cp=u'31400')
            self.assertEqual(cm.exception.entity, s.eid)
            self.assertEqual(cm.exception.errors, {'hip': 'hop'})
            self.rollback()


class SchemaDeserialTC(CubicWebTC):
    """Tests of schema deserialization, run against the 'data-schemaserial'
    application with the extra 'BabarTestType' base type registered.
    """

    appid = 'data-schemaserial'

    @classmethod
    def setUpClass(cls):
        register_base_type('BabarTestType', ('jungle_speed',))
        helper = get_db_helper('sqlite')
        helper.TYPE_MAPPING['BabarTestType'] = 'TEXT'
        helper.TYPE_CONVERTERS['BabarTestType'] = lambda x: '"%s"' % x
        super(SchemaDeserialTC, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        unregister_base_type('BabarTestType')
        helper = get_db_helper('sqlite')
        helper.TYPE_MAPPING.pop('BabarTestType', None)
        helper.TYPE_CONVERTERS.pop('BabarTestType', None)
        super(SchemaDeserialTC, cls).tearDownClass()

    def test_deserialization_base(self):
        """Check the following deserialization

        * all CWEtype has name
        * Final type
        * CWUniqueTogetherConstraint
        * _unique_together__ content"""
        origshema = self.repo.schema
        try:
            self.repo.config.repairing = True  # avoid versions checking
            self.repo.set_schema(self.repo.deserialize_schema())
            table = SQL_PREFIX + 'CWEType'
            namecol = SQL_PREFIX + 'name'
            finalcol = SQL_PREFIX + 'final'
            self.session.set_cnxset()
            cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (
                namecol, table, finalcol))
            self.assertEqual(cu.fetchall(), [])
            cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s'
                                         % (namecol, table, finalcol, namecol), {'final': True})
            self.assertEqual(cu.fetchall(), [(u'BabarTestType',),
                                             (u'BigInt',), (u'Boolean',), (u'Bytes',),
                                             (u'Date',), (u'Datetime',),
                                             (u'Decimal',), (u'Float',),
                                             (u'Int',),
                                             (u'Interval',), (u'Password',),
                                             (u'String',),
                                             (u'TZDatetime',), (u'TZTime',), (u'Time',)])
            sql = ("SELECT etype.cw_eid, etype.cw_name, cstr.cw_eid, rel.eid_to "
                   "FROM cw_CWUniqueTogetherConstraint as cstr, "
                   " relations_relation as rel, "
                   " cw_CWEType as etype "
                   "WHERE cstr.cw_eid = rel.eid_from "
                   " AND cstr.cw_constraint_of = etype.cw_eid "
                   " AND etype.cw_name = 'Personne' "
                   ";")
            cu = self.session.system_sql(sql)
            rows = cu.fetchall()
            self.assertEqual(len(rows), 3)
            person = self.repo.schema.eschema('Personne')
            self.assertEqual(len(person._unique_together), 1)
            self.assertItemsEqual(person._unique_together[0],
                                  ('nom', 'prenom', 'inline2'))
        finally:
            self.repo.set_schema(origshema)

    def test_custom_attribute_param(self):
        origshema = self.repo.schema
        try:
            self.repo.config.repairing = True  # avoid versions checking
            self.repo.set_schema(self.repo.deserialize_schema())
            pes = self.repo.schema['Personne']
            attr = pes.rdef('custom_field_of_jungle')
            self.assertIn('jungle_speed', vars(attr))
            self.assertEqual(42, attr.jungle_speed)
        finally:
            self.repo.set_schema(origshema)


class DataHelpersTC(CubicWebTC):

    def test_create_eid(self):
        self.session.set_cnxset()
        self.assertTrue(self.repo.system_source.create_eid(self.session))

    def test_source_from_eid(self):
        self.session.set_cnxset()
        self.assertEqual(self.repo.source_from_eid(1, self.session),
                         self.repo.sources_by_uri['system'])

    def test_source_from_eid_raise(self):
        self.session.set_cnxset()
        self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)

    def test_type_from_eid(self):
        self.session.set_cnxset()
        self.assertEqual(self.repo.type_from_eid(2, self.session), 'CWGroup')

    def test_type_from_eid_raise(self):
        self.session.set_cnxset()
        self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)

    def test_add_delete_info(self):
        """add_info() / delete_info() insert and remove the `entities` row."""
        entity = self.repo.vreg['etypes'].etype_class('Personne')(self.session)
        entity.eid = -1
        entity.complete = lambda x: None
        self.session.set_cnxset()
        self.repo.add_info(self.session, entity, self.repo.system_source)
        cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
        data = cu.fetchall()
        # the fifth column holds a datetime whose value can't be predicted:
        # check its type then blank it before comparing the whole row
        self.assertIsInstance(data[0][4], datetime)
        data[0] = list(data[0])
        data[0][4] = None
        self.assertEqual(tuplify(data), [(-1, 'Personne', 'system', 'system',
                                          None, None)])
        self.repo.delete_info(self.session, entity, 'system', None)
        #self.repo.commit()
        cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
        data = cu.fetchall()
        self.assertEqual(data, [])


class FTITC(CubicWebTC):

    def test_reindex_and_modified_since(self):
        self.repo.system_source.multisources_etypes.add('Personne')
        eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
        self.commit()
        self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
        self.session.set_cnxset()
        cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp)
        omtime = cu.fetchone()[0]
        # our sqlite datetime adapter ignores fractions of a second, so we
        # have to make sure the update is done at least one second later
        time.sleep(1)
        self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp})
        self.commit()
        self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
        self.session.set_cnxset()
        cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp)
        mtime = cu.fetchone()[0]
        self.assertTrue(omtime < mtime)
        self.commit()
        date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
        self.assertEqual(modified, [('Personne', eidp)])
        self.assertEqual(deleted, [])
        date, modified, deleted = self.repo.entities_modified_since(('Personne',), mtime)
        self.assertEqual(modified, [])
        self.assertEqual(deleted, [])
        self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': eidp})
        self.commit()
        date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
        self.assertEqual(modified, [])
        self.assertEqual(deleted, [('Personne', eidp)])

    def test_fulltext_container_entity(self):
        # precondition of this test (was a bare assert, stripped by python -O)
        self.assertEqual(self.schema.rschema('use_email').fulltext_container,
                         'subject')
        req = self.request()
        toto = req.create_entity('EmailAddress', address=u'toto@logilab.fr')
        self.commit()
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
        self.assertEqual(rset.rows, [])
        req.user.cw_set(use_email=toto)
        self.commit()
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
        self.assertEqual(rset.rows, [[req.user.eid]])
        req.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s',
                    {'y': toto.eid})
        self.commit()
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
        self.assertEqual(rset.rows, [])
        tutu = req.create_entity('EmailAddress', address=u'tutu@logilab.fr')
        req.user.cw_set(use_email=tutu)
        self.commit()
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
        self.assertEqual(rset.rows, [[req.user.eid]])
        tutu.cw_set(address=u'hip@logilab.fr')
        self.commit()
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
        self.assertEqual(rset.rows, [])
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'})
        self.assertEqual(rset.rows, [[req.user.eid]])

    def test_no_uncessary_ftiindex_op(self):
        req = self.request()
        req.create_entity('Workflow', name=u'dummy workflow', description=u'huuuuu')
        self.assertFalse(any(x for x in self.session.pending_operations
                             if isinstance(x, native.FTIndexEntityOp)))


class DBInitTC(CubicWebTC):

    def test_versions_inserted(self):
        inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
        self.assertEqual(inserted,
                         [u'system.version.basket', u'system.version.card', u'system.version.comment',
                          u'system.version.cubicweb', u'system.version.email',
                          u'system.version.file', u'system.version.folder',
                          u'system.version.localperms', u'system.version.tag'])


# hook calls recorded by InlineRelHooksTC, emptied before each test
CALLED = []


class InlineRelHooksTC(CubicWebTC):
    """test relation hooks are called for inlined relations
    """
    def setUp(self):
        CubicWebTC.setUp(self)
        CALLED[:] = ()

    def _after_relation_hook(self, cnxset, fromeid, rtype, toeid):
        # NOTE(review): `self.called` is not set anywhere in this module and
        # nothing here calls this method -- looks like dead code, confirm
        # before relying on it
        self.called.append((fromeid, rtype, toeid))

    def test_inline_relation(self):
        """make sure <event>_relation hooks are called for inlined relation"""

        class EcritParHook(hook.Hook):
            __regid__ = 'inlinedrelhook'
            __select__ = hook.Hook.__select__ & hook.match_rtype('ecrit_par')
            events = ('before_add_relation', 'after_add_relation',
                      'before_delete_relation', 'after_delete_relation')

            def __call__(self):
                CALLED.append((self.event, self.eidfrom, self.rtype, self.eidto))

        with self.temporary_appobjects(EcritParHook):
            eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0]
            eidn = self.execute('INSERT Note X: X type "T"')[0][0]
            self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
            self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
                                      ('after_add_relation', eidn, 'ecrit_par', eidp)])
            CALLED[:] = ()
            self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
            self.assertEqual(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp),
                                      ('after_delete_relation', eidn, 'ecrit_par', eidp)])
            CALLED[:] = ()
            eidn = self.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0]
            self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
                                      ('after_add_relation', eidn, 'ecrit_par', eidp)])

    def test_unique_contraint(self):
        req = self.request()
        toto = req.create_entity('Personne', nom=u'toto')
        a01 = req.create_entity('Affaire', ref=u'A01', todo_by=toto)
        req.cnx.commit()
        req = self.request()
        req.create_entity('Note', type=u'todo', inline1=a01)
        req.cnx.commit()
        req = self.request()
        req.create_entity('Note', type=u'todo', inline1=a01)
        with self.assertRaises(ValidationError) as cm:
            req.cnx.commit()
        self.assertEqual(cm.exception.errors,
                         {'inline1-subject': u'RQLUniqueConstraint S type T, S inline1 A1, A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'})

    def test_add_relations_at_creation_with_del_existing_rel(self):
        req = self.request()
        person = req.create_entity('Personne', nom=u'Toto', prenom=u'Lanturlu', sexe=u'M')
        users_rql = 'Any U WHERE U is CWGroup, U name "users"'
        users = self.execute(users_rql).get_entity(0, 0)
        req.create_entity('CWUser',
                          login=u'Toto',
                          upassword=u'firstname',
                          firstname=u'firstname',
                          surname=u'surname',
                          reverse_login_user=person,
                          in_group=users)
        self.commit()


class PerformanceTest(CubicWebTC):
    """Timing tests: durations are logged at INFO level on the
    'cubicweb.session' logger, nothing is asserted about them.
    """

    def setUp(self):
        super(PerformanceTest, self).setUp()
        logger = logging.getLogger('cubicweb.session')
        #logger.handlers = [logging.StreamHandler(sys.stdout)]
        logger.setLevel(logging.INFO)
        self.info = logger.info

    def tearDown(self):
        super(PerformanceTest, self).tearDown()
        logger = logging.getLogger('cubicweb.session')
        logger.setLevel(logging.CRITICAL)

    def _create_personnes(self, req, count=2000):
        """Create `count` Personne entities through `req` and return them."""
        return [req.create_entity('Personne', nom=u'Doe%03d' % i,
                                  prenom=u'John', sexe=u'M')
                for i in xrange(count)]

    def test_composite_deletion(self):
        req = self.request()
        t0 = time.time()
        personnes = self._create_personnes(req)
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
        for j in xrange(0, 2000, 100):
            abraham.cw_set(personne_composite=personnes[j:j+100])
        t1 = time.time()
        self.info('creation: %.2gs', (t1 - t0))
        req.cnx.commit()
        t2 = time.time()
        self.info('commit creation: %.2gs', (t2 - t1))
        self.execute('DELETE Personne P WHERE P eid %(eid)s', {'eid': abraham.eid})
        t3 = time.time()
        self.info('deletion: %.2gs', (t3 - t2))
        req.cnx.commit()
        t4 = time.time()
        self.info("commit deletion: %.2gs", (t4 - t3))

    def test_add_relation_non_inlined(self):
        req = self.request()
        personnes = self._create_personnes(req)
        req.cnx.commit()
        t0 = time.time()
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M',
                                    personne_composite=personnes[:100])
        t1 = time.time()
        self.info('creation: %.2gs', (t1 - t0))
        for j in xrange(100, 2000, 100):
            abraham.cw_set(personne_composite=personnes[j:j+100])
        t2 = time.time()
        self.info('more relations: %.2gs', (t2 - t1))
        req.cnx.commit()
        t3 = time.time()
        self.info('commit creation: %.2gs', (t3 - t2))

    def test_add_relation_inlined(self):
        req = self.request()
        personnes = self._create_personnes(req)
        req.cnx.commit()
        t0 = time.time()
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M',
                                    personne_inlined=personnes[:100])
        t1 = time.time()
        self.info('creation: %.2gs', (t1 - t0))
        for j in xrange(100, 2000, 100):
            abraham.cw_set(personne_inlined=personnes[j:j+100])
        t2 = time.time()
        self.info('more relations: %.2gs', (t2 - t1))
        req.cnx.commit()
        t3 = time.time()
        self.info('commit creation: %.2gs', (t3 - t2))

    def test_session_add_relation(self):
        """ to be compared with test_session_add_relations"""
        req = self.request()
        personnes = self._create_personnes(req)
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
        req.cnx.commit()
        t0 = time.time()
        add_relation = self.session.add_relation
        for p in personnes:
            add_relation(abraham.eid, 'personne_composite', p.eid)
        req.cnx.commit()
        t1 = time.time()
        self.info('add relation: %.2gs', t1 - t0)

    def test_session_add_relations(self):
        """ to be compared with test_session_add_relation"""
        req = self.request()
        personnes = self._create_personnes(req)
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
        req.cnx.commit()
        t0 = time.time()
        add_relations = self.session.add_relations
        relations = [('personne_composite', [(abraham.eid, p.eid) for p in personnes])]
        add_relations(relations)
        req.cnx.commit()
        t1 = time.time()
        self.info('add relations: %.2gs', t1 - t0)

    def test_session_add_relation_inlined(self):
        """ to be compared with test_session_add_relations"""
        req = self.request()
        personnes = self._create_personnes(req)
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
        req.cnx.commit()
        t0 = time.time()
        add_relation = self.session.add_relation
        for p in personnes:
            add_relation(abraham.eid, 'personne_inlined', p.eid)
        req.cnx.commit()
        t1 = time.time()
        self.info('add relation (inlined): %.2gs', t1 - t0)

    def test_session_add_relations_inlined(self):
        """ to be compared with test_session_add_relation"""
        req = self.request()
        personnes = self._create_personnes(req)
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
        req.cnx.commit()
        t0 = time.time()
        add_relations = self.session.add_relations
        relations = [('personne_inlined', [(abraham.eid, p.eid) for p in personnes])]
        add_relations(relations)
        req.cnx.commit()
        t1 = time.time()
        self.info('add relations (inlined): %.2gs', t1 - t0)

    def test_optional_relation_reset_1(self):
        req = self.request()
        p1 = req.create_entity('Personne', nom=u'Vincent')
        p2 = req.create_entity('Personne', nom=u'Florent')
        w = req.create_entity('Affaire', ref=u'wc')
        w.cw_set(todo_by=[p1, p2])
        w.cw_clear_all_caches()
        self.commit()
        self.assertEqual(len(w.todo_by), 1)
        self.assertEqual(w.todo_by[0].eid, p2.eid)

    def test_optional_relation_reset_2(self):
        req = self.request()
        p1 = req.create_entity('Personne', nom=u'Vincent')
        p2 = req.create_entity('Personne', nom=u'Florent')
        w = req.create_entity('Affaire', ref=u'wc')
        w.cw_set(todo_by=p1)
        self.commit()
        w.cw_set(todo_by=p2)
        w.cw_clear_all_caches()
        self.commit()
        self.assertEqual(len(w.todo_by), 1)
        self.assertEqual(w.todo_by[0].eid, p2.eid)


if __name__ == '__main__':
    unittest_main()