[CWEP002] refactor rql read security checking
Split 'check_read_perms' into 'check_relations_perms' which checks relations
'read' permissions and 'get_local_checks' which build dictionary of local
security checks (rql expression) for variables.
This allows to check relations 'read' permissions earlier in the process and so
to prepare insertion of the rql rewriter: we want to check permissions of the
computed relation, not permissions of relations introduced by the associated
rule, to conform to the CWEP.
Related to #3546717
# -*- coding: iso-8859-1 -*-# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""unit tests for module cubicweb.server.repository"""importthreadingimporttimeimportloggingfromyams.constraintsimportUniqueConstraintfromyamsimportregister_base_type,unregister_base_typefromlogilab.databaseimportget_db_helperfromcubicwebimport(BadConnectionId,ValidationError,UnknownEid,AuthenticationError,Unauthorized,QueryError)fromcubicweb.predicatesimportis_instancefromcubicweb.schemaimportRQLConstraintfromcubicweb.dbapiimportconnect,multiple_connections_unfixfromcubicweb.devtools.testlibimportCubicWebTCfromcubicweb.devtools.repotestimporttuplifyfromcubicweb.serverimportrepository,hookfromcubicweb.server.sqlutilsimportSQL_PREFIXfromcubicweb.server.hookimportHookfromcubicweb.server.sourcesimportnativefromcubicweb.server.sessionimportSessionClosedErrorclassRepositoryTC(CubicWebTC):""" singleton providing access to a persistent storage for entities and relation """deftest_unique_together_constraint(self):withself.admin_access.repo_cnx()ascnx:cnx.execute('INSERT Societe S: S nom "Logilab", S type "SSLL", S cp "75013"')withself.assertRaises(ValidationError)aswraperr:cnx.execute('INSERT Societe S: S nom "Logilab", S type "SSLL", S cp "75013"')self.assertEqual({'cp':u'cp is part of violated unicity constraint','nom':u'nom is part of violated unicity constraint','type':u'type is part of violated unicity constraint','unicity constraint':u'some relations violate a unicity constraint'},wraperr.exception.args[1])deftest_unique_together_schema(self):person=self.repo.schema.eschema('Personne')self.assertEqual(len(person._unique_together),1)self.assertItemsEqual(person._unique_together[0],('nom','prenom','inline2'))deftest_all_entities_have_owner(self):withself.admin_access.repo_cnx()ascnx:self.assertFalse(cnx.execute('Any X WHERE NOT X owned_by U'))deftest_all_entities_have_is(self):withself.admin_access.repo_cnx()ascnx:self.assertFalse(cnx.execute('Any X WHERE NOT X is ET'))deftest_all_entities_have_cw_source(self):withself.admin_access.repo_cnx()ascnx:self.assertFalse(cnx.execute('Any X WHERE NOT X cw_source S'))deftest_connect(self):cnxid=self.repo.connect(self.admlogin,password=self.admpassword)self.assert_(cnxid)self.repo.close(cnxid)self.assertRaises(AuthenticationError,self.repo.connect,self.admlogin,password='nimportnawak')self.assertRaises(AuthenticationError,self.repo.connect,self.admlogin,password='')self.assertRaises(AuthenticationError,self.repo.connect,self.admlogin,password=None)self.assertRaises(AuthenticationError,self.repo.connect,None,password=None)self.assertRaises(AuthenticationError,self.repo.connect,self.admlogin)self.assertRaises(AuthenticationError,self.repo.connect,None)deftest_execute(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)repo.execute(cnxid,'Any X')repo.execute(cnxid,'Any X where X is Personne')repo.execute(cnxid,'Any X where X is Personne, X nom ~= "to"')repo.execute(cnxid,'Any X WHERE X has_text %(text)s',{'text':u'\xe7a'})repo.close(cnxid)deftest_login_upassword_accent(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)repo.execute(cnxid,'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_group G WHERE G name "users"',{'login':u"barnab�",'passwd':u"h�h�h�".encode('UTF8')})repo.commit(cnxid)repo.close(cnxid)cnxid=repo.connect(u"barnab�",password=u"h�h�h�".encode('UTF8'))self.assert_(cnxid)repo.close(cnxid)deftest_rollback_on_commit_error(self):cnxid=self.repo.connect(self.admlogin,password=self.admpassword)self.repo.execute(cnxid,'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s',{'login':u"tutetute",'passwd':'tutetute'})self.assertRaises(ValidationError,self.repo.commit,cnxid)self.assertFalse(self.repo.execute(cnxid,'CWUser X WHERE X login "tutetute"'))self.repo.close(cnxid)deftest_rollback_on_execute_validation_error(self):classValidationErrorAfterHook(Hook):__regid__='valerror-after-hook'__select__=Hook.__select__&is_instance('CWGroup')events=('after_update_entity',)def__call__(self):raiseValidationError(self.entity.eid,{})withself.admin_access.repo_cnx()ascnx:withself.temporary_appobjects(ValidationErrorAfterHook):self.assertRaises(ValidationError,cnx.execute,'SET X name "toto" WHERE X is CWGroup, X name "guests"')self.assertTrue(cnx.execute('Any X WHERE X is CWGroup, X name "toto"'))withself.assertRaises(QueryError)ascm:cnx.commit()self.assertEqual(str(cm.exception),'transaction must be rolled back')cnx.rollback()self.assertFalse(cnx.execute('Any X WHERE X is CWGroup, X name "toto"'))deftest_rollback_on_execute_unauthorized(self):classUnauthorizedAfterHook(Hook):__regid__='unauthorized-after-hook'__select__=Hook.__select__&is_instance('CWGroup')events=('after_update_entity',)def__call__(self):raiseUnauthorized()withself.admin_access.repo_cnx()ascnx:withself.temporary_appobjects(UnauthorizedAfterHook):self.assertRaises(Unauthorized,cnx.execute,'SET X name "toto" WHERE X is CWGroup, X name "guests"')self.assertTrue(cnx.execute('Any X WHERE X is CWGroup, X name "toto"'))withself.assertRaises(QueryError)ascm:cnx.commit()self.assertEqual(str(cm.exception),'transaction must be rolled back')cnx.rollback()self.assertFalse(cnx.execute('Any X WHERE X is CWGroup, X name "toto"'))deftest_close(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)self.assert_(cnxid)repo.close(cnxid)self.assertRaises(BadConnectionId,repo.execute,cnxid,'Any X')deftest_invalid_cnxid(self):self.assertRaises(BadConnectionId,self.repo.execute,0,'Any X')self.assertRaises(BadConnectionId,self.repo.close,None)deftest_shared_data(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)repo.set_shared_data(cnxid,'data',4)cnxid2=repo.connect(self.admlogin,password=self.admpassword)self.assertEqual(repo.get_shared_data(cnxid,'data'),4)self.assertEqual(repo.get_shared_data(cnxid2,'data'),None)repo.set_shared_data(cnxid2,'data',5)self.assertEqual(repo.get_shared_data(cnxid,'data'),4)self.assertEqual(repo.get_shared_data(cnxid2,'data'),5)repo.get_shared_data(cnxid2,'data',pop=True)self.assertEqual(repo.get_shared_data(cnxid,'data'),4)self.assertEqual(repo.get_shared_data(cnxid2,'data'),None)repo.close(cnxid)repo.close(cnxid2)self.assertRaises(BadConnectionId,repo.get_shared_data,cnxid,'data')self.assertRaises(BadConnectionId,repo.get_shared_data,cnxid2,'data')self.assertRaises(BadConnectionId,repo.set_shared_data,cnxid,'data',1)self.assertRaises(BadConnectionId,repo.set_shared_data,cnxid2,'data',1)deftest_check_session(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)self.assertIsInstance(repo.check_session(cnxid),float)repo.close(cnxid)self.assertRaises(BadConnectionId,repo.check_session,cnxid)deftest_transaction_base(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)# check db stateresult=repo.execute(cnxid,'Personne X')self.assertEqual(result.rowcount,0)# rollback entity insertionrepo.execute(cnxid,"INSERT Personne X: X nom 'bidule'")result=repo.execute(cnxid,'Personne X')self.assertEqual(result.rowcount,1)repo.rollback(cnxid)result=repo.execute(cnxid,'Personne X')self.assertEqual(result.rowcount,0,result.rows)# commitrepo.execute(cnxid,"INSERT Personne X: X nom 'bidule'")repo.commit(cnxid)result=repo.execute(cnxid,'Personne X')self.assertEqual(result.rowcount,1)repo.close(cnxid)deftest_transaction_base2(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)# rollback relation insertionrepo.execute(cnxid,"SET U in_group G WHERE U login 'admin', G name 'guests'")result=repo.execute(cnxid,"Any U WHERE U in_group G, U login 'admin', G name 'guests'")self.assertEqual(result.rowcount,1)repo.rollback(cnxid)result=repo.execute(cnxid,"Any U WHERE U in_group G, U login 'admin', G name 'guests'")self.assertEqual(result.rowcount,0,result.rows)repo.close(cnxid)deftest_transaction_base3(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)# rollback state change which trigger TrInfo insertionsession=repo._get_session(cnxid)user=session.useruser.cw_adapt_to('IWorkflowable').fire_transition('deactivate')rset=repo.execute(cnxid,'TrInfo T WHERE T wf_info_for X, X eid %(x)s',{'x':user.eid})self.assertEqual(len(rset),1)repo.rollback(cnxid)rset=repo.execute(cnxid,'TrInfo T WHERE T wf_info_for X, X eid %(x)s',{'x':user.eid})self.assertEqual(len(rset),0)repo.close(cnxid)deftest_close_kill_processing_request(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)repo.execute(cnxid,'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')repo.commit(cnxid)lock=threading.Lock()lock.acquire()# close has to be in the thread due to sqlite limitationsdefclose_in_a_few_moment():lock.acquire()repo.close(cnxid)t=threading.Thread(target=close_in_a_few_moment)t.start()defrun_transaction():lock.release()repo.execute(cnxid,'DELETE CWUser X WHERE X login "toto"')repo.commit(cnxid)try:withself.assertRaises(SessionClosedError)ascm:run_transaction()self.assertEqual(str(cm.exception),'try to access connections set on a closed session %s'%cnxid)finally:t.join()deftest_initial_schema(self):schema=self.repo.schema# check order of attributes is respectednotin=set(('eid','is','is_instance_of','identity','creation_date','modification_date','cwuri','owned_by','created_by','cw_source','update_permission','read_permission','add_permission','in_basket'))self.assertListEqual(['relation_type','from_entity','to_entity','constrained_by','cardinality','ordernum','indexed','fulltextindexed','internationalizable','defaultval','extra_props','description','description_format'],[r.typeforrinschema.eschema('CWAttribute').ordered_relations()ifr.typenotinnotin])self.assertEqual(schema.eschema('CWEType').main_attribute(),'name')self.assertEqual(schema.eschema('State').main_attribute(),'name')constraints=schema.rschema('name').rdef('CWEType','String').constraintsself.assertEqual(len(constraints),2)forcstrinconstraints[:]:ifisinstance(cstr,UniqueConstraint):constraints.remove(cstr)breakelse:self.fail('unique constraint not found')sizeconstraint=constraints[0]self.assertEqual(sizeconstraint.min,None)self.assertEqual(sizeconstraint.max,64)constraints=schema.rschema('relation_type').rdef('CWAttribute','CWRType').constraintsself.assertEqual(len(constraints),1)cstr=constraints[0]self.assert_(isinstance(cstr,RQLConstraint))self.assertEqual(cstr.expression,'O final TRUE')ownedby=schema.rschema('owned_by')self.assertEqual(ownedby.objects('CWEType'),('CWUser',))deftest_pyro(self):importPyroPyro.config.PYRO_MULTITHREADED=0done=[]self.repo.config.global_set_option('pyro-ns-host','NO_PYRONS')daemon=self.repo.pyro_register()try:uri=self.repo.pyro_uri.replace('PYRO','pyroloc')# the client part has to be in the thread due to sqlite limitationst=threading.Thread(target=self._pyro_client,args=(uri,done))t.start()whilenotdone:daemon.handleRequests(1.0)t.join(1)ift.isAlive():self.fail('something went wrong, thread still alive')finally:repository.pyro_unregister(self.repo.config)fromlogilab.commonimportpyro_extpyro_ext._DAEMONS.clear()def_pyro_client(self,uri,done):cnx=connect(uri,u'admin',password='gingkow',initlog=False)# don't reset logging configurationtry:cnx.load_appobjects(subpath=('entities',))# check we can get the schemaschema=cnx.get_schema()self.assertTrue(cnx.vreg)self.assertTrue('etypes'incnx.vreg)cu=cnx.cursor()rset=cu.execute('Any U,G WHERE U in_group G')user=iter(rset.entities()).next()self.assertTrue(user._cw)self.assertTrue(user._cw.vreg)fromcubicweb.entitiesimportauthobjsself.assertIsInstance(user._cw.user,authobjs.CWUser)# make sure the tcp connection is closed properly; yes, it's disgusting.adapter=cnx._repo.adaptercnx.close()adapter.release()done.append(True)finally:# connect monkey patch some method by default, remove themmultiple_connections_unfix()deftest_zmq(self):try:importzmqexceptImportError:self.skipTest("zmq in not available")done=[]fromcubicweb.devtoolsimportTestServerConfigurationasServerConfigurationfromcubicweb.server.cwzmqimportZMQRepositoryServer# the client part has to be in a thread due to sqlite limitationst=threading.Thread(target=self._zmq_client,args=(done,))t.start()zmq_server=ZMQRepositoryServer(self.repo)zmq_server.connect('zmqpickle-tcp://127.0.0.1:41415')t2=threading.Thread(target=self._zmq_quit,args=(done,zmq_server,))t2.start()zmq_server.run()t2.join(1)t.join(1)ift.isAlive():self.fail('something went wrong, thread still alive')def_zmq_quit(self,done,srv):whilenotdone:time.sleep(0.1)srv.quit()def_zmq_client(self,done):try:cnx=connect('zmqpickle-tcp://127.0.0.1:41415',u'admin',password=u'gingkow',initlog=False)# don't reset logging configurationtry:cnx.load_appobjects(subpath=('entities',))# check we can get the schemaschema=cnx.get_schema()self.assertTrue(cnx.vreg)self.assertTrue('etypes'incnx.vreg)cu=cnx.cursor()rset=cu.execute('Any U,G WHERE U in_group G')user=iter(rset.entities()).next()self.assertTrue(user._cw)self.assertTrue(user._cw.vreg)fromcubicweb.entitiesimportauthobjsself.assertIsInstance(user._cw.user,authobjs.CWUser)cnx.close()done.append(True)finally:# connect monkey patch some method by default, remove themmultiple_connections_unfix()finally:done.append(False)deftest_internal_api(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)session=repo._get_session(cnxid,setcnxset=True)self.assertEqual(repo.type_and_source_from_eid(2,session),('CWGroup',None,'system'))self.assertEqual(repo.type_from_eid(2,session),'CWGroup')repo.close(cnxid)deftest_public_api(self):self.assertEqual(self.repo.get_schema(),self.repo.schema)self.assertEqual(self.repo.source_defs(),{'system':{'type':'native','uri':'system','use-cwuri-as-url':False}})# .properties() return a result setself.assertEqual(self.repo.properties().rql,'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')deftest_session_api(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)self.assertEqual(repo.user_info(cnxid),(6,'admin',set([u'managers']),{}))self.assertEqual({'type':u'CWGroup','extid':None,'source':'system'},repo.entity_metas(cnxid,2))self.assertEqual(repo.describe(cnxid,2),(u'CWGroup','system',None,'system'))repo.close(cnxid)self.assertRaises(BadConnectionId,repo.user_info,cnxid)self.assertRaises(BadConnectionId,repo.describe,cnxid,1)deftest_shared_data_api(self):repo=self.repocnxid=repo.connect(self.admlogin,password=self.admpassword)self.assertEqual(repo.get_shared_data(cnxid,'data'),None)repo.set_shared_data(cnxid,'data',4)self.assertEqual(repo.get_shared_data(cnxid,'data'),4)repo.get_shared_data(cnxid,'data',pop=True)repo.get_shared_data(cnxid,'whatever',pop=True)self.assertEqual(repo.get_shared_data(cnxid,'data'),None)repo.close(cnxid)self.assertRaises(BadConnectionId,repo.set_shared_data,cnxid,'data',0)self.assertRaises(BadConnectionId,repo.get_shared_data,cnxid,'data')deftest_schema_is_relation(self):withself.admin_access.repo_cnx()ascnx:no_is_rset=cnx.execute('Any X WHERE NOT X is ET')self.assertFalse(no_is_rset,no_is_rset.description)# def test_perfo(self):# self.set_debug(True)# from time import time, clock# t, c = time(), clock()# try:# self.create_user('toto')# finally:# self.set_debug(False)# print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c)deftest_delete_if_singlecard1(self):withself.admin_access.repo_cnx()ascnx:note=cnx.create_entity('Affaire')p1=cnx.create_entity('Personne',nom=u'toto')cnx.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',{'x':note.eid,'p':p1.eid})rset=cnx.execute('Any P WHERE A todo_by P, A eid %(x)s',{'x':note.eid})self.assertEqual(len(rset),1)p2=cnx.create_entity('Personne',nom=u'tutu')cnx.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',{'x':note.eid,'p':p2.eid})rset=cnx.execute('Any P WHERE A todo_by P, A eid %(x)s',{'x':note.eid})self.assertEqual(len(rset),1)self.assertEqual(rset.rows[0][0],p2.eid)deftest_delete_if_object_inlined_singlecard(self):withself.admin_access.repo_cnx()ascnx:c=cnx.create_entity('Card',title=u'Carte')cnx.create_entity('Personne',nom=u'Vincent',fiche=c)cnx.create_entity('Personne',nom=u'Florent',fiche=c)cnx.commit()self.assertEqual(len(c.reverse_fiche),1)deftest_cw_set_in_before_update(self):# local hookclassDummyBeforeHook(Hook):__regid__='dummy-before-hook'__select__=Hook.__select__&is_instance('EmailAddress')events=('before_update_entity',)def__call__(self):# safety belt: avoid potential infinite recursion if the test# fails (i.e. RuntimeError not raised)pendings=self._cw.transaction_data.setdefault('pending',set())ifself.entity.eidnotinpendings:pendings.add(self.entity.eid)self.entity.cw_set(alias=u'foo')withself.admin_access.repo_cnx()ascnx:withself.temporary_appobjects(DummyBeforeHook):addr=cnx.create_entity('EmailAddress',address=u'a@b.fr')addr.cw_set(address=u'a@b.com')rset=cnx.execute('Any A,AA WHERE X eid %(x)s, X address A, X alias AA',{'x':addr.eid})self.assertEqual(rset.rows,[[u'a@b.com',u'foo']])deftest_cw_set_in_before_add(self):# local hookclassDummyBeforeHook(Hook):__regid__='dummy-before-hook'__select__=Hook.__select__&is_instance('EmailAddress')events=('before_add_entity',)def__call__(self):# cw_set is forbidden within before_add_entity()self.entity.cw_set(alias=u'foo')withself.admin_access.repo_cnx()ascnx:withself.temporary_appobjects(DummyBeforeHook):# XXX will fail with python -Oself.assertRaises(AssertionError,cnx.create_entity,'EmailAddress',address=u'a@b.fr')deftest_multiple_edit_cw_set(self):"""make sure cw_edited doesn't get cluttered by previous entities on multiple set """# local hookclassDummyBeforeHook(Hook):_test=self# keep reference to test instance__regid__='dummy-before-hook'__select__=Hook.__select__&is_instance('Affaire')events=('before_update_entity',)def__call__(self):# invoiced attribute shouldn't be considered "edited" before the hookself._test.assertFalse('invoiced'inself.entity.cw_edited,'cw_edited cluttered by previous update')self.entity.cw_edited['invoiced']=10withself.admin_access.repo_cnx()ascnx:withself.temporary_appobjects(DummyBeforeHook):cnx.create_entity('Affaire',ref=u'AFF01')cnx.create_entity('Affaire',ref=u'AFF02')cnx.execute('SET A duration 10 WHERE A is Affaire')deftest_user_friendly_error(self):fromcubicweb.entities.adaptersimportIUserFriendlyUniqueTogetherclassMyIUserFriendlyUniqueTogether(IUserFriendlyUniqueTogether):__select__=IUserFriendlyUniqueTogether.__select__&is_instance('Societe')defraise_user_exception(self):raiseValidationError(self.entity.eid,{'hip':'hop'})withself.admin_access.repo_cnx()ascnx:withself.temporary_appobjects(MyIUserFriendlyUniqueTogether):s=cnx.create_entity('Societe',nom=u'Logilab',type=u'ssll',cp=u'75013')cnx.commit()withself.assertRaises(ValidationError)ascm:cnx.create_entity('Societe',nom=u'Logilab',type=u'ssll',cp=u'75013')self.assertEqual(cm.exception.errors,{'hip':'hop'})cnx.rollback()cnx.create_entity('Societe',nom=u'Logilab',type=u'ssll',cp=u'31400')withself.assertRaises(ValidationError)ascm:s.cw_set(cp=u'31400')self.assertEqual(cm.exception.entity,s.eid)self.assertEqual(cm.exception.errors,{'hip':'hop'})cnx.rollback()classSchemaDeserialTC(CubicWebTC):appid='data-schemaserial'@classmethoddefsetUpClass(cls):register_base_type('BabarTestType',('jungle_speed',))helper=get_db_helper('sqlite')helper.TYPE_MAPPING['BabarTestType']='TEXT'helper.TYPE_CONVERTERS['BabarTestType']=lambdax:'"%s"'%xsuper(SchemaDeserialTC,cls).setUpClass()@classmethoddeftearDownClass(cls):unregister_base_type('BabarTestType')helper=get_db_helper('sqlite')helper.TYPE_MAPPING.pop('BabarTestType',None)helper.TYPE_CONVERTERS.pop('BabarTestType',None)super(SchemaDeserialTC,cls).tearDownClass()deftest_deserialization_base(self):"""Check the following deserialization * all CWEtype has name * Final type * CWUniqueTogetherConstraint * _unique_together__ content"""origshema=self.repo.schematry:self.repo.config.repairing=True# avoid versions checkingself.repo.set_schema(self.repo.deserialize_schema())table=SQL_PREFIX+'CWEType'namecol=SQL_PREFIX+'name'finalcol=SQL_PREFIX+'final'withself.admin_access.repo_cnx()ascnx:withcnx.ensure_cnx_set:cu=cnx.system_sql('SELECT %s FROM %s WHERE %s is NULL'%(namecol,table,finalcol))self.assertEqual(cu.fetchall(),[])cu=cnx.system_sql('SELECT %s FROM %s ''WHERE %s=%%(final)s ORDER BY %s'%(namecol,table,finalcol,namecol),{'final':True})self.assertEqual(cu.fetchall(),[(u'BabarTestType',),(u'BigInt',),(u'Boolean',),(u'Bytes',),(u'Date',),(u'Datetime',),(u'Decimal',),(u'Float',),(u'Int',),(u'Interval',),(u'Password',),(u'String',),(u'TZDatetime',),(u'TZTime',),(u'Time',)])sql=("SELECT etype.cw_eid, etype.cw_name, cstr.cw_eid, rel.eid_to ""FROM cw_CWUniqueTogetherConstraint as cstr, "" relations_relation as rel, "" cw_CWEType as etype ""WHERE cstr.cw_eid = rel.eid_from "" AND cstr.cw_constraint_of = etype.cw_eid "" AND etype.cw_name = 'Personne' "";")cu=cnx.system_sql(sql)rows=cu.fetchall()self.assertEqual(len(rows),3)person=self.repo.schema.eschema('Personne')self.assertEqual(len(person._unique_together),1)self.assertItemsEqual(person._unique_together[0],('nom','prenom','inline2'))finally:self.repo.set_schema(origshema)deftest_custom_attribute_param(self):origshema=self.repo.schematry:self.repo.config.repairing=True# avoid versions checkingself.repo.set_schema(self.repo.deserialize_schema())pes=self.repo.schema['Personne']attr=pes.rdef('custom_field_of_jungle')self.assertIn('jungle_speed',vars(attr))self.assertEqual(42,attr.jungle_speed)finally:self.repo.set_schema(origshema)classDataHelpersTC(CubicWebTC):deftest_type_from_eid(self):withself.admin_access.repo_cnx()ascnx:withcnx.ensure_cnx_set:self.assertEqual(self.repo.type_from_eid(2,cnx),'CWGroup')deftest_type_from_eid_raise(self):withself.admin_access.repo_cnx()ascnx:withcnx.ensure_cnx_set:self.assertRaises(UnknownEid,self.repo.type_from_eid,-2,cnx)deftest_add_delete_info(self):withself.admin_access.repo_cnx()ascnx:withcnx.ensure_cnx_set:cnx.mode='write'entity=self.repo.vreg['etypes'].etype_class('Personne')(cnx)entity.eid=-1entity.complete=lambdax:Noneself.repo.add_info(cnx,entity,self.repo.system_source)cu=cnx.system_sql('SELECT * FROM entities WHERE eid = -1')data=cu.fetchall()self.assertEqual(tuplify(data),[(-1,'Personne','system',None)])self.repo.delete_info(cnx,entity,'system')#self.repo.commit()cu=cnx.system_sql('SELECT * FROM entities WHERE eid = -1')data=cu.fetchall()self.assertEqual(data,[])classFTITC(CubicWebTC):deftest_fulltext_container_entity(self):withself.admin_access.repo_cnx()ascnx:assertself.schema.rschema('use_email').fulltext_container=='subject'toto=cnx.create_entity('EmailAddress',address=u'toto@logilab.fr')cnx.commit()rset=cnx.execute('Any X WHERE X has_text %(t)s',{'t':'toto'})self.assertEqual(rset.rows,[])cnx.user.cw_set(use_email=toto)cnx.commit()rset=cnx.execute('Any X WHERE X has_text %(t)s',{'t':'toto'})self.assertEqual(rset.rows,[[cnx.user.eid]])cnx.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s',{'y':toto.eid})cnx.commit()rset=cnx.execute('Any X WHERE X has_text %(t)s',{'t':'toto'})self.assertEqual(rset.rows,[])tutu=cnx.create_entity('EmailAddress',address=u'tutu@logilab.fr')cnx.user.cw_set(use_email=tutu)cnx.commit()rset=cnx.execute('Any X WHERE X has_text %(t)s',{'t':'tutu'})self.assertEqual(rset.rows,[[cnx.user.eid]])tutu.cw_set(address=u'hip@logilab.fr')cnx.commit()rset=cnx.execute('Any X WHERE X has_text %(t)s',{'t':'tutu'})self.assertEqual(rset.rows,[])rset=cnx.execute('Any X WHERE X has_text %(t)s',{'t':'hip'})self.assertEqual(rset.rows,[[cnx.user.eid]])deftest_no_uncessary_ftiindex_op(self):withself.admin_access.repo_cnx()ascnx:cnx.create_entity('Workflow',name=u'dummy workflow',description=u'huuuuu')self.assertFalse(any(xforxincnx.pending_operationsifisinstance(x,native.FTIndexEntityOp)))classDBInitTC(CubicWebTC):deftest_versions_inserted(self):withself.admin_access.repo_cnx()ascnx:inserted=[r[0]forrincnx.execute('Any K ORDERBY K ''WHERE P pkey K, P pkey ~= "system.version.%"')]self.assertEqual(inserted,[u'system.version.basket',u'system.version.card',u'system.version.comment',u'system.version.cubicweb',u'system.version.email',u'system.version.file',u'system.version.folder',u'system.version.localperms',u'system.version.tag'])CALLED=[]classInlineRelHooksTC(CubicWebTC):"""test relation hooks are called for inlined relations """defsetUp(self):CubicWebTC.setUp(self)CALLED[:]=()deftest_inline_relation(self):"""make sure <event>_relation hooks are called for inlined relation"""classEcritParHook(hook.Hook):__regid__='inlinedrelhook'__select__=hook.Hook.__select__&hook.match_rtype('ecrit_par')events=('before_add_relation','after_add_relation','before_delete_relation','after_delete_relation')def__call__(self):CALLED.append((self.event,self.eidfrom,self.rtype,self.eidto))withself.temporary_appobjects(EcritParHook):withself.admin_access.repo_cnx()ascnx:eidp=cnx.execute('INSERT Personne X: X nom "toto"')[0][0]eidn=cnx.execute('INSERT Note X: X type "T"')[0][0]cnx.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')self.assertEqual(CALLED,[('before_add_relation',eidn,'ecrit_par',eidp),('after_add_relation',eidn,'ecrit_par',eidp)])CALLED[:]=()cnx.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')self.assertEqual(CALLED,[('before_delete_relation',eidn,'ecrit_par',eidp),('after_delete_relation',eidn,'ecrit_par',eidp)])CALLED[:]=()eidn=cnx.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0]self.assertEqual(CALLED,[('before_add_relation',eidn,'ecrit_par',eidp),('after_add_relation',eidn,'ecrit_par',eidp)])deftest_unique_contraint(self):withself.admin_access.repo_cnx()ascnx:toto=cnx.create_entity('Personne',nom=u'toto')a01=cnx.create_entity('Affaire',ref=u'A01',todo_by=toto)cnx.commit()cnx.create_entity('Note',type=u'todo',inline1=a01)cnx.commit()cnx.create_entity('Note',type=u'todo',inline1=a01)withself.assertRaises(ValidationError)ascm:cnx.commit()self.assertEqual(cm.exception.errors,{'inline1-subject':u'RQLUniqueConstraint S type T, S inline1 A1, ''A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'})deftest_add_relations_at_creation_with_del_existing_rel(self):withself.admin_access.repo_cnx()ascnx:person=cnx.create_entity('Personne',nom=u'Toto',prenom=u'Lanturlu',sexe=u'M')users_rql='Any U WHERE U is CWGroup, U name "users"'users=cnx.execute(users_rql).get_entity(0,0)cnx.create_entity('CWUser',login=u'Toto',upassword=u'firstname',firstname=u'firstname',surname=u'surname',reverse_login_user=person,in_group=users)cnx.commit()classPerformanceTest(CubicWebTC):defsetUp(self):super(PerformanceTest,self).setUp()logger=logging.getLogger('cubicweb.session')#logger.handlers = [logging.StreamHandler(sys.stdout)]logger.setLevel(logging.INFO)self.info=logger.infodeftearDown(self):super(PerformanceTest,self).tearDown()logger=logging.getLogger('cubicweb.session')logger.setLevel(logging.CRITICAL)deftest_composite_deletion(self):withself.admin_access.repo_cnx()ascnx:personnes=[]t0=time.time()foriinxrange(2000):p=cnx.create_entity('Personne',nom=u'Doe%03d'%i,prenom=u'John',sexe=u'M')personnes.append(p)abraham=cnx.create_entity('Personne',nom=u'Abraham',prenom=u'John',sexe=u'M')forjinxrange(0,2000,100):abraham.cw_set(personne_composite=personnes[j:j+100])t1=time.time()self.info('creation: %.2gs',(t1-t0))cnx.commit()t2=time.time()self.info('commit creation: %.2gs',(t2-t1))cnx.execute('DELETE Personne P WHERE P eid %(eid)s',{'eid':abraham.eid})t3=time.time()self.info('deletion: %.2gs',(t3-t2))cnx.commit()t4=time.time()self.info("commit deletion: %2gs",(t4-t3))deftest_add_relation_non_inlined(self):withself.admin_access.repo_cnx()ascnx:personnes=[]foriinxrange(2000):p=cnx.create_entity('Personne',nom=u'Doe%03d'%i,prenom=u'John',sexe=u'M')personnes.append(p)cnx.commit()t0=time.time()abraham=cnx.create_entity('Personne',nom=u'Abraham',prenom=u'John',sexe=u'M',personne_composite=personnes[:100])t1=time.time()self.info('creation: %.2gs',(t1-t0))forjinxrange(100,2000,100):abraham.cw_set(personne_composite=personnes[j:j+100])t2=time.time()self.info('more relations: %.2gs',(t2-t1))cnx.commit()t3=time.time()self.info('commit creation: %.2gs',(t3-t2))deftest_add_relation_inlined(self):withself.admin_access.repo_cnx()ascnx:personnes=[]foriinxrange(2000):p=cnx.create_entity('Personne',nom=u'Doe%03d'%i,prenom=u'John',sexe=u'M')personnes.append(p)cnx.commit()t0=time.time()abraham=cnx.create_entity('Personne',nom=u'Abraham',prenom=u'John',sexe=u'M',personne_inlined=personnes[:100])t1=time.time()self.info('creation: %.2gs',(t1-t0))forjinxrange(100,2000,100):abraham.cw_set(personne_inlined=personnes[j:j+100])t2=time.time()self.info('more relations: %.2gs',(t2-t1))cnx.commit()t3=time.time()self.info('commit creation: %.2gs',(t3-t2))deftest_session_add_relation(self):""" to be compared with test_session_add_relations"""withself.admin_access.repo_cnx()ascnx:personnes=[]foriinxrange(2000):p=cnx.create_entity('Personne',nom=u'Doe%03d'%i,prenom=u'John',sexe=u'M')personnes.append(p)abraham=cnx.create_entity('Personne',nom=u'Abraham',prenom=u'John',sexe=u'M')cnx.commit()t0=time.time()add_relation=cnx.add_relationforpinpersonnes:add_relation(abraham.eid,'personne_composite',p.eid)cnx.commit()t1=time.time()self.info('add relation: %.2gs',t1-t0)deftest_session_add_relations(self):""" to be compared with test_session_add_relation"""withself.admin_access.repo_cnx()ascnx:personnes=[]foriinxrange(2000):p=cnx.create_entity('Personne',nom=u'Doe%03d'%i,prenom=u'John',sexe=u'M')personnes.append(p)abraham=cnx.create_entity('Personne',nom=u'Abraham',prenom=u'John',sexe=u'M')cnx.commit()t0=time.time()add_relations=cnx.add_relationsrelations=[('personne_composite',[(abraham.eid,p.eid)forpinpersonnes])]add_relations(relations)cnx.commit()t1=time.time()self.info('add relations: %.2gs',t1-t0)deftest_session_add_relation_inlined(self):""" to be compared with test_session_add_relations"""withself.admin_access.repo_cnx()ascnx:personnes=[]foriinxrange(2000):p=cnx.create_entity('Personne',nom=u'Doe%03d'%i,prenom=u'John',sexe=u'M')personnes.append(p)abraham=cnx.create_entity('Personne',nom=u'Abraham',prenom=u'John',sexe=u'M')cnx.commit()t0=time.time()add_relation=cnx.add_relationforpinpersonnes:add_relation(abraham.eid,'personne_inlined',p.eid)cnx.commit()t1=time.time()self.info('add relation (inlined): %.2gs',t1-t0)deftest_session_add_relations_inlined(self):""" to be compared with test_session_add_relation"""withself.admin_access.repo_cnx()ascnx:personnes=[]foriinxrange(2000):p=cnx.create_entity('Personne',nom=u'Doe%03d'%i,prenom=u'John',sexe=u'M')personnes.append(p)abraham=cnx.create_entity('Personne',nom=u'Abraham',prenom=u'John',sexe=u'M')cnx.commit()t0=time.time()add_relations=cnx.add_relationsrelations=[('personne_inlined',[(abraham.eid,p.eid)forpinpersonnes])]add_relations(relations)cnx.commit()t1=time.time()self.info('add relations (inlined): %.2gs',t1-t0)deftest_optional_relation_reset_1(self):withself.admin_access.repo_cnx()ascnx:p1=cnx.create_entity('Personne',nom=u'Vincent')p2=cnx.create_entity('Personne',nom=u'Florent')w=cnx.create_entity('Affaire',ref=u'wc')w.cw_set(todo_by=[p1,p2])w.cw_clear_all_caches()cnx.commit()self.assertEqual(len(w.todo_by),1)self.assertEqual(w.todo_by[0].eid,p2.eid)deftest_optional_relation_reset_2(self):withself.admin_access.repo_cnx()ascnx:p1=cnx.create_entity('Personne',nom=u'Vincent')p2=cnx.create_entity('Personne',nom=u'Florent')w=cnx.create_entity('Affaire',ref=u'wc')w.cw_set(todo_by=p1)cnx.commit()w.cw_set(todo_by=p2)w.cw_clear_all_caches()cnx.commit()self.assertEqual(len(w.todo_by),1)self.assertEqual(w.todo_by[0].eid,p2.eid)if__name__=='__main__':fromlogilab.common.testlibimportunittest_mainunittest_main()