"""Hidden internals for the devtools.apptest module:organization: Logilab:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr"""__docformat__="restructuredtext en"importsys,tracebackfromlogilab.common.pytestimportpause_tracing,resume_tracingimportyams.schemafromcubicweb.dbapiimportrepo_connect,ConnectionProperties,ProgrammingErrorfromcubicweb.cwvregimportCubicWebRegistryfromcubicweb.web.applicationimportCubicWebPublisherfromcubicweb.webimportRedirectfromcubicweb.devtoolsimportApptestConfiguration,init_test_databasefromcubicweb.devtools.fakeimportFakeRequestSYSTEM_ENTITIES=('EGroup','EUser','EFRDef','ENFRDef','EConstraint','EConstraintType','EProperty','EEType','ERType','State','Transition','TrInfo','RQLExpression',)SYSTEM_RELATIONS=(# virtual relation'identity',# metadata'is','is_instance_of','owned_by','created_by','specializes',# workflow related'state_of','transition_of','initial_state','allowed_transition','destination_state','in_state','wf_info_for','from_state','to_state','condition',# permission'in_group','require_group','require_permission','read_permission','update_permission','delete_permission','add_permission',# eproperty'for_user',# schema definition'relation_type','from_entity','to_entity','constrained_by','cstrtype','widget',# deducted from other relations'primary_email',)defunprotected_entities(app_schema,strict=False):"""returned a Set of each non final entity type, excluding EGroup, and EUser... """ifstrict:protected_entities=yams.schema.BASE_TYPESelse:protected_entities=yams.schema.BASE_TYPES.union(set(SYSTEM_ENTITIES))entities=set(app_schema.entities())returnentities-protected_entitiesdefignore_relations(*relations):SYSTEM_RELATIONS+=relationsclassTestEnvironment(object):"""TestEnvironment defines a context (e.g. a config + a given connection) in which the tests are executed """def__init__(self,appid,reporter=None,verbose=False,configcls=ApptestConfiguration,requestcls=FakeRequest):config=configcls(appid)self.requestcls=requestclsself.cnx=Noneconfig.db_perms=Falsesource=config.sources()['system']ifverbose:print"init test database ..."self.vreg=vreg=CubicWebRegistry(config)self.admlogin=source['db-user']# restore database <=> init databaseself.restore_database()ifverbose:print"init done"login=source['db-user']config.repository=lambdax=None:self.repoself.app=CubicWebPublisher(config,vreg=vreg)self.verbose=verboseschema=self.vreg.schema# else we may run into problems since email address are ususally share in app tests# XXX should not be necessary anymoreschema.rschema('primary_email').set_rproperty('EUser','EmailAddress','composite',False)self.deletable_entities=unprotected_entities(schema)defrestore_database(self):"""called by unittests' tearDown to restore the original database """try:pause_tracing()ifself.cnx:self.cnx.close()source=self.vreg.config.sources()['system']self.repo,self.cnx=init_test_database(driver=source['db-driver'],vreg=self.vreg)self._orig_cnx=self.cnxresume_tracing()except:resume_tracing()traceback.print_exc()sys.exit(1)# XXX cnx decoration is usually done by the repository authentication manager,# necessary in authentication testsself.cnx.vreg=self.vregself.cnx.login=source['db-user']self.cnx.password=source['db-password']defcreate_user(self,login,groups=('users',),req=None):req=reqorself.create_request()cursor=self._orig_cnx.cursor(req)rset=cursor.execute('INSERT EUser X: X login %(login)s, X upassword %(passwd)s,''X in_state S WHERE S name "activated"',{'login':unicode(login),'passwd':login.encode('utf8')})user=rset.get_entity(0,0)cursor.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'%','.join(repr(g)forgingroups),{'x':user.eid},'x')user.clear_related_cache('in_group','subject')self._orig_cnx.commit()returnuserdeflogin(self,login):iflogin==self.admlogin:self.restore_connection()else:self.cnx=repo_connect(self.repo,unicode(login),str(login),ConnectionProperties('inmemory'))iflogin==self.vreg.config.anonymous_user()[0]:self.cnx.anonymous_connection=Truereturnself.cnxdefrestore_connection(self):ifnotself.cnxisself._orig_cnx:try:self.cnx.close()exceptProgrammingError:pass# already closedself.cnx=self._orig_cnx############################################################################defexecute(self,rql,args=None,eidkey=None,req=None):"""executes <rql>, builds a resultset, and returns a couple (rset, req) where req is a FakeRequest """req=reqorself.create_request(rql=rql)returnself.cnx.cursor(req).execute(unicode(rql),args,eidkey)defcreate_request(self,rql=None,**kwargs):"""executes <rql>, builds a resultset, and returns a couple (rset, req) where req is a FakeRequest """ifrql:kwargs['rql']=rqlreq=self.requestcls(self.vreg,form=kwargs)req.set_connection(self.cnx)returnreqdefget_rset_and_req(self,rql,optional_args=None,args=None,eidkey=None):"""executes <rql>, builds a resultset, and returns a couple (rset, req) where req is a FakeRequest """return(self.execute(rql,args,eidkey),self.create_request(rql=rql,**optional_argsor{}))defcheck_view(self,rql,vid,optional_args,template='main'):"""checks if vreg.view() raises an exception in this environment If any exception is raised in this method, it will be considered as a TestFailure """returnself.call_view(vid,rql,template=template,optional_args=optional_args)defcall_view(self,vid,rql,template='main',optional_args=None):"""shortcut for self.vreg.view()"""asserttemplateifoptional_argsisNone:optional_args={}optional_args['vid']=vidreq=self.create_request(rql=rql,**optional_args)returnself.vreg.main_template(req,template)defcall_edit(self,req):"""shortcut for self.app.edit()"""controller=self.app.select_controller('edit',req)try:controller.publish()exceptRedirect:result='success'else:raiseException('edit should raise Redirect on success')req.cnx.commit()returnresultdefiter_possible_views(self,req,rset):"""returns a list of possible vids for <rql>"""forviewinself.vreg.possible_views(req,rset):ifview.category=='startupview':continueyieldview.idifrset.rowcount==1:yield'edition'defiter_startup_views(self,req):"""returns the list of startup views"""forviewinself.vreg.possible_views(req,None):ifview.category!='startupview':continueyieldview.iddefiter_possible_actions(self,req,rset):"""returns a list of possible vids for <rql>"""foractioninself.vreg.possible_vobjects('actions',req,rset):yieldactionclassExistingTestEnvironment(TestEnvironment):def__init__(self,appid,sourcefile,verbose=False):config=ApptestConfiguration(appid,sourcefile=sourcefile)ifverbose:print"init test database ..."source=config.sources()['system']self.vreg=CubicWebRegistry(config)repo,self.cnx=init_test_database(driver=source['db-driver'],vreg=self.vreg)ifverbose:print"init done"self.app=CubicWebPublisher(config,vreg=self.vreg)self.verbose=verbose# this is done when the publisher is opening a connectionself.cnx.vreg=self.vreglogin=source['db-user']defsetup(self,config=None):"""config is passed by TestSuite but is ignored in this environment"""cursor=self.cnx.cursor()self.last_eid=cursor.execute('Any X WHERE X creation_date D ORDERBY D DESC LIMIT 1').rows[0][0]defcleanup(self):"""cancel inserted elements during tests"""cursor=self.cnx.cursor()cursor.execute('DELETE Any X WHERE X eid > %(x)s',{'x':self.last_eid},eid_key='x')print"cleaning done"self.cnx.commit()