"""This module provides misc utilities to test instances:organization: Logilab:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""__docformat__="restructuredtext en"fromcopyimportdeepcopyimportsimplejsonfromlogilab.common.testlibimportTestCasefromlogilab.common.pytestimportnocoveragefromlogilab.common.umessageimportmessage_from_stringfromlogilab.common.deprecationimportdeprecatedfromcubicweb.devtoolsimportinit_test_database,TestServerConfiguration,ApptestConfigurationfromcubicweb.devtools._apptestimportTestEnvironmentfromcubicweb.devtools.fakeimportFakeRequestfromcubicweb.dbapiimportrepo_connect,ConnectionProperties,ProgrammingErrorMAILBOX=[]classEmail:def__init__(self,recipients,msg):self.recipients=recipientsself.msg=msg@propertydefmessage(self):returnmessage_from_string(self.msg)@propertydefsubject(self):returnself.message.get('Subject')@propertydefcontent(self):returnself.message.get_payload(decode=True)def__repr__(self):return'<Email to %s with subject %s>'%(','.join(self.recipients),self.message.get('Subject'))classMockSMTP:def__init__(self,server,port):passdefclose(self):passdefsendmail(self,helo_addr,recipients,msg):MAILBOX.append(Email(recipients,msg))fromcubicwebimportcwconfigcwconfig.SMTP=MockSMTPdefget_versions(self,checkversions=False):"""return the a dictionary containing cubes used by this instance as key with their version as value, including cubicweb version. This is a public method, not requiring a session id. replace Repository.get_versions by this method if you don't want versions checking """vcconf={'cubicweb':self.config.cubicweb_version()}self.config.bootstrap_cubes()forpkinself.config.cubes():version=self.config.cube_version(pk)vcconf[pk]=versionself.config._cubes=Nonereturnvcconf@propertydeflate_binding_env(self):"""builds TestEnvironment as late as possible"""ifnothasattr(self,'_env'):self.__class__._env=TestEnvironment('data',configcls=self.configcls,requestcls=self.requestcls)returnself._envclassautoenv(type):"""automatically set environment on EnvBasedTC subclasses if necessary """def__new__(mcs,name,bases,classdict):env=classdict.get('env')# try to find env in one of the base classesifenvisNone:forbaseinbases:env=getattr(base,'env',None)ifenvisnotNone:classdict['env']=envbreakifnotclassdict.get('__abstract__')andnotclassdict.get('env'):classdict['env']=late_binding_envreturnsuper(autoenv,mcs).__new__(mcs,name,bases,classdict)classEnvBasedTC(TestCase):"""abstract class for test using an apptest environment """__metaclass__=autoenv__abstract__=Trueenv=Noneconfigcls=ApptestConfigurationrequestcls=FakeRequest# user / session management ###############################################defuser(self,req=None):ifreqisNone:req=self.env.create_request()returnself.env.cnx.user(req)else:returnreq.userdefcreate_user(self,*args,**kwargs):returnself.env.create_user(*args,**kwargs)deflogin(self,login,password=None):returnself.env.login(login,password)defrestore_connection(self):self.env.restore_connection()# db api ##################################################################@nocoveragedefcursor(self,req=None):returnself.env.cnx.cursor(reqorself.request())@nocoveragedefexecute(self,*args,**kwargs):returnself.env.execute(*args,**kwargs)@nocoveragedefcommit(self):self.env.cnx.commit()@nocoveragedefrollback(self):try:self.env.cnx.rollback()exceptProgrammingError:pass# other utilities #########################################################defset_debug(self,debugmode):fromcubicweb.serverimportset_debugset_debug(debugmode)@propertydefconfig(self):returnself.vreg.configdefsession(self):"""return current server side session (using default manager account)"""returnself.env.repo._sessions[self.env.cnx.sessionid]defrequest(self,*args,**kwargs):"""return a web interface request"""returnself.env.create_request(*args,**kwargs)@nocoveragedefrset_and_req(self,*args,**kwargs):returnself.env.get_rset_and_req(*args,**kwargs)defentity(self,rql,args=None,eidkey=None,req=None):returnself.execute(rql,args,eidkey,req=req).get_entity(0,0)defetype_instance(self,etype,req=None):req=reqorself.request()e=self.env.vreg['etypes'].etype_class(etype)(req)e.eid=Nonereturnedefadd_entity(self,etype,**kwargs):rql=['INSERT %s X'%etype]# dict for replacement in RQL Requestrql_args={}ifkwargs:#rql.append(':')# dict to define new entities variablesentities={}# assignement part of the requestsub_rql=[]forkey,valueinkwargs.iteritems():# entitiesifhasattr(value,'eid'):new_value="%s__"%key.upper()entities[new_value]=value.eidrql_args[new_value]=value.eidsub_rql.append("X %s%s"%(key,new_value))# final attributeselse:sub_rql.append('X %s%%(%s)s'%(key,key))rql_args[key]=valuerql.append(', '.join(sub_rql))ifentities:rql.append('WHERE')# WHERE part of the request (to link entity to they eid)sub_rql=[]forkey,valueinentities.iteritems():sub_rql.append("%s eid %%(%s)s"%(key,key))rql.append(', '.join(sub_rql))rql=' '.join(rql)rset=self.execute(rql,rql_args)returnrset.get_entity(0,0)defset_option(self,optname,value):self.vreg.config.global_set_option(optname,value)defpviews(self,req,rset):returnsorted((a.id,a.__class__)forainself.vreg['views'].possible_views(req,rset=rset))defpactions(self,req,rset,skipcategories=('addrelated','siteactions','useractions')):return[(a.id,a.__class__)forainself.vreg['actions'].possible_vobjects(req,rset=rset)ifa.categorynotinskipcategories]defaction_submenu(self,req,rset,id):returnself._test_action(self.vreg['actions'].select(id,req,rset=rset))def_test_action(self,action):classfake_menu(list):@propertydefitems(self):returnselfclassfake_box(object):defmk_action(self,label,url,**kwargs):return(label,url)defbox_action(self,action,**kwargs):return(action.title,action.url())submenu=fake_menu()action.fill_menu(fake_box(),submenu)returnsubmenudefpactions_by_cats(self,req,rset,categories=('addrelated',)):return[(a.id,a.__class__)forainself.vreg['actions'].possible_vobjects(req,rset=rset)ifa.categoryincategories]paddrelactions=deprecated()(pactions_by_cats)defpactionsdict(self,req,rset,skipcategories=('addrelated','siteactions','useractions')):res={}forainself.vreg['actions'].possible_vobjects(req,rset=rset):ifa.categorynotinskipcategories:res.setdefault(a.category,[]).append(a.__class__)returnresdefremote_call(self,fname,*args):"""remote call simulation"""dump=simplejson.dumpsargs=[dump(arg)forarginargs]req=self.request(fname=fname,pageid='123',arg=args)ctrl=self.vreg['controllers'].select('json',req)returnctrl.publish(),req# default test setup and teardown #########################################defsetup_database(self):passdefsetUp(self):self.restore_connection()session=self.session()#self.maxeid = self.execute('Any MAX(X)')session.set_pool()self.maxeid=session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]self.app=self.env.appself.vreg=self.env.app.vregself.schema=self.vreg.schemaself.vreg.config.mode='test'# set default-dest-addrs to a dumb email address to avoid mailbox or# mail queue pollutionself.set_option('default-dest-addrs',['whatever'])self.setup_database()self.commit()MAILBOX[:]=[]# reset mailbox@nocoveragedeftearDown(self):self.rollback()# self.env.restore_database()self.env.restore_connection()self.session().unsafe_execute('DELETE Any X WHERE X eid > %s'%self.maxeid)self.commit()# global resources accessors ################################################ XXXtry:fromcubicweb.webimportRedirectfromurllibimportunquoteexceptImportError:pass# cubicweb-web not installedelse:classControllerTC(EnvBasedTC):defsetUp(self):super(ControllerTC,self).setUp()self.req=self.request()self.ctrl=self.vreg['controllers'].select('edit',self.req)defpublish(self,req):assertreqisself.ctrl.reqtry:result=self.ctrl.publish()req.cnx.commit()exceptRedirect:req.cnx.commit()raisereturnresultdefexpect_redirect_publish(self,req=None):ifreqisnotNone:self.ctrl=self.vreg['controllers'].select('edit',req)else:req=self.reqtry:res=self.publish(req)exceptRedirect,ex:try:path,params=ex.location.split('?',1)except:path,params=ex.location,""req._url=pathcleanup=lambdap:(p[0],unquote(p[1]))params=dict(cleanup(p.split('=',1))forpinparams.split('&')ifp)returnreq.relative_path(False),params# path.rsplit('/', 1)[-1], paramselse:self.fail('expected a Redirect exception')defmake_late_binding_repo_property(attrname):@propertydeflate_binding(self):"""builds cnx as late as possible"""ifnothasattr(self,attrname):# sets explicit test mode here to avoid autoreloadfromcubicweb.cwconfigimportCubicWebConfigurationCubicWebConfiguration.mode='test'cls=self.__class__config=self.repo_configorTestServerConfiguration('data')cls._repo,cls._cnx=init_test_database('sqlite',config=config)returngetattr(self,attrname)returnlate_bindingclassautorepo(type):"""automatically set repository on RepositoryBasedTC subclasses if necessary """def__new__(mcs,name,bases,classdict):repo=classdict.get('repo')# try to find repo in one of the base classesifrepoisNone:forbaseinbases:repo=getattr(base,'repo',None)ifrepoisnotNone:classdict['repo']=repobreakifname!='RepositoryBasedTC'andnotclassdict.get('repo'):classdict['repo']=make_late_binding_repo_property('_repo')classdict['cnx']=make_late_binding_repo_property('_cnx')returnsuper(autorepo,mcs).__new__(mcs,name,bases,classdict)classRepositoryBasedTC(TestCase):"""abstract class for test using direct repository connections """__metaclass__=autoreporepo_config=None# set a particular config instance if necessary# user / session management ###############################################defcreate_user(self,user,groups=('users',),password=None,commit=True):ifpasswordisNone:password=usereid=self.execute('INSERT CWUser X: X login %(x)s, X upassword %(p)s',{'x':unicode(user),'p':password})[0][0]groups=','.join(repr(group)forgroupingroups)self.execute('SET X in_group Y WHERE X eid %%(x)s, Y name IN (%s)'%groups,{'x':eid})ifcommit:self.commit()self.session.reset_pool()returneiddeflogin(self,login,password=None):cnx=repo_connect(self.repo,unicode(login),passwordorlogin,ConnectionProperties('inmemory'))self.cnxs.append(cnx)returncnxdefcurrent_session(self):returnself.repo._sessions[self.cnxs[-1].sessionid]defrestore_connection(self):assertlen(self.cnxs)==1,self.cnxscnx=self.cnxs.pop()try:cnx.close()exceptException,ex:print"exception occured while closing connection",ex# db api ##################################################################defexecute(self,rql,args=None,eid_key=None):assertself.session.id==self.cnxidrset=self.__execute(self.cnxid,rql,args,eid_key)rset.vreg=self.vregrset.req=self.session# call to set_pool is necessary to avoid pb when using# instance entities for convenienceself.session.set_pool()returnrsetdefcommit(self):self.__commit(self.cnxid)self.session.set_pool()defrollback(self):self.__rollback(self.cnxid)self.session.set_pool()defclose(self):self.__close(self.cnxid)# other utilities #########################################################defset_debug(self,debugmode):fromcubicweb.serverimportset_debugset_debug(debugmode)defset_option(self,optname,value):self.vreg.config.global_set_option(optname,value)defadd_entity(self,etype,**kwargs):restrictions=', '.join('X %s%%(%s)s'%(key,key)forkeyinkwargs)rql='INSERT %s X'%etypeifkwargs:rql+=': %s'%', '.join('X %s%%(%s)s'%(key,key)forkeyinkwargs)rset=self.execute(rql,kwargs)returnrset.get_entity(0,0)defdefault_user_password(self):config=self.repo.config#TestConfiguration('data')user=unicode(config.sources()['system']['db-user'])passwd=config.sources()['system']['db-password']returnuser,passwddefclose_connections(self):forcnxinself.cnxs:try:cnx.rollback()cnx.close()except:continueself.cnxs=[]pactions=EnvBasedTC.pactions.im_funcpactionsdict=EnvBasedTC.pactionsdict.im_func# default test setup and teardown #########################################def_prepare(self):MAILBOX[:]=[]# reset mailboxifhasattr(self,'cnxid'):returnrepo=self.repoself.__execute=repo.executeself.__commit=repo.commitself.__rollback=repo.rollbackself.__close=repo.closeself.cnxid=self.cnx.sessionidself.session=repo._sessions[self.cnxid]self.cnxs=[]# reset caches, they may introduce bugs among testsrepo._type_source_cache={}repo._extid_cache={}repo.querier._rql_cache={}forsourceinrepo.sources:source.reset_caches()forsinrepo.sources:ifhasattr(s,'_cache'):s._cache={}@propertydefconfig(self):returnself.repo.config@propertydefvreg(self):returnself.repo.vreg@propertydefschema(self):returnself.repo.schemadefsetUp(self):self._prepare()self.session.set_pool()self.maxeid=self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]deftearDown(self):self.close_connections()self.rollback()self.session.unsafe_execute('DELETE Any X WHERE X eid > %(x)s',{'x':self.maxeid})self.commit()