[testlib] gather all repository access logic in one place
Refactoring of the repository access API in test is imminent. We plan to move
from the "old" dbapi to the new repoapi.
Gathering all impacted method in one place help to understand how all those
method interact and help readability for both patch and resulting code.
No code change is done at all in this changeset. The refactoring will code
later.
# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""this module contains base classes and utilities for cubicweb tests"""__docformat__="restructuredtext en"importsysimportreimporturlparsefromos.pathimportdirname,join,abspathfromurllibimportunquotefrommathimportlogfromcontextlibimportcontextmanagerfromwarningsimportwarnfromtypesimportNoneTypefromitertoolsimportchainimportyams.schemafromlogilab.common.testlibimportTestCase,InnerTest,Tagsfromlogilab.common.pytestimportnocoverage,pause_tracing,resume_tracingfromlogilab.common.debuggerimportDebuggerfromlogilab.common.umessageimportmessage_from_stringfromlogilab.common.decoratorsimportcached,classproperty,clear_cache,iclassmethodfromlogilab.common.deprecationimportdeprecated,class_deprecatedfromlogilab.common.shellutilsimportgetloginfromcubicwebimportValidationError,NoSelectableObject,AuthenticationErrorfromcubicwebimportcwconfig,dbapi,devtools,web,serverfromcubicweb.utilsimportjsonfromcubicweb.sobjectsimportnotificationfromcubicweb.webimportRedirect,applicationfromcubicweb.server.hookimportSendMailOpfromcubicweb.devtoolsimportSYSTEM_ENTITIES,SYSTEM_RELATIONS,VIEW_VALIDATORSfromcubicweb.devtoolsimportfake,htmlparser,DEFAULT_EMPTY_DB_IDfromcubicweb.utilsimportjson# low-level utilities ##########################################################classCubicWebDebugger(Debugger):"""special debugger class providing a 'view' function which saves some html into a temporary file and open a web browser to examinate it. """defdo_view(self,arg):importwebbrowserdata=self._getval(arg)withfile('/tmp/toto.html','w')astoto:toto.write(data)webbrowser.open('file:///tmp/toto.html')defline_context_filter(line_no,center,before=3,after=None):"""return true if line are in context if after is None: after = before """ifafterisNone:after=beforereturncenter-before<=line_no<=center+afterdefunprotected_entities(schema,strict=False):"""returned a set of each non final entity type, excluding "system" entities (eg CWGroup, CWUser...) """ifstrict:protected_entities=yams.schema.BASE_TYPESelse:protected_entities=yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES)returnset(schema.entities())-protected_entitiesclassJsonValidator(object):defparse_string(self,data):returnjson.loads(data)# email handling, to test emails sent by an application ########################MAILBOX=[]classEmail:"""you'll get instances of Email into MAILBOX during tests that trigger some notification. * `msg` is the original message object * `recipients` is a list of email address which are the recipients of this message """def__init__(self,recipients,msg):self.recipients=recipientsself.msg=msg@propertydefmessage(self):returnmessage_from_string(self.msg)@propertydefsubject(self):returnself.message.get('Subject')@propertydefcontent(self):returnself.message.get_payload(decode=True)def__repr__(self):return'<Email to %s with subject %s>'%(','.join(self.recipients),self.message.get('Subject'))# the trick to get email into MAILBOX instead of actually sent: monkey patch# cwconfig.SMTP objectclassMockSMTP:def__init__(self,server,port):passdefclose(self):passdefsendmail(self,helo_addr,recipients,msg):MAILBOX.append(Email(recipients,msg))cwconfig.SMTP=MockSMTPclassTestCaseConnectionProxy(object):"""thin wrapper around `cubicweb.dbapi.Connection` context-manager used in CubicWebTC (cf. `cubicweb.devtools.testlib.CubicWebTC.login` method) It just proxies to the default connection context manager but restores the original connection on exit. """def__init__(self,testcase,cnx):self.testcase=testcaseself.cnx=cnxdef__getattr__(self,attrname):returngetattr(self.cnx,attrname)def__enter__(self):returnself.cnx.__enter__()def__exit__(self,exctype,exc,tb):try:returnself.cnx.__exit__(exctype,exc,tb)finally:self.cnx.close()self.testcase.restore_connection()# base class for cubicweb tests requiring a full cw environments ###############classCubicWebTC(TestCase):"""abstract class for test using an apptest environment attributes: * `vreg`, the vregistry * `schema`, self.vreg.schema * `config`, cubicweb configuration * `cnx`, dbapi connection to the repository using an admin user * `session`, server side session associated to `cnx` * `app`, the cubicweb publisher (for web testing) * `repo`, the repository object * `admlogin`, login of the admin user * `admpassword`, password of the admin user * `shell`, create and use shell environment """appid='data'configcls=devtools.ApptestConfigurationtags=TestCase.tags|Tags('cubicweb','cw_repo')test_db_id=DEFAULT_EMPTY_DB_ID_cnxs=set()# establised connection# stay on connection for leak detection purposedef__init__(self,*args,**kwargs):self._cnx=None# current connectionself.repo=Noneself.websession=Nonesuper(CubicWebTC,self).__init__(*args,**kwargs)# repository connection handling ############################################ Too much complicated stuff. the class doesn't need to bear the repo anymoredefset_cnx(self,cnx):self._cnxs.add(cnx)self._cnx=cnx@propertydefcnx(self):returnself._cnxdef_close_cnx(self):forcnxinlist(self._cnxs):ifnotcnx._closed:cnx.rollback()cnx.close()self._cnxs.remove(cnx)@propertydefsession(self):"""return current server side session (using default manager account)"""session=self.repo._sessions[self.cnx.sessionid]session.set_cnxset()returnsessiondeflogin(self,login,**kwargs):"""return a connection for the given login/password"""iflogin==self.admlogin:self.restore_connection()# definitly don't want autoclose when used as a context managerreturnself.cnxautoclose=kwargs.pop('autoclose',True)ifnotkwargs:kwargs['password']=str(login)self.set_cnx(dbapi._repo_connect(self.repo,unicode(login),**kwargs))self.websession=dbapi.DBAPISession(self.cnx)ifautoclose:returnTestCaseConnectionProxy(self,self.cnx)returnself.cnxdefrestore_connection(self):ifnotself.cnxisself._orig_cnx[0]:ifnotself.cnx._closed:self.cnx.close()cnx,self.websession=self._orig_cnxself.set_cnx(cnx)#XXX this doesn't need to a be classmethod anymoredef_init_repo(self):"""init the repository and connection to it. """# setup configuration for testself.init_config(self.config)# get or restore and working db.db_handler=devtools.get_test_db_handler(self.config)db_handler.build_db_cache(self.test_db_id,self.pre_setup_database)self.repo,cnx=db_handler.get_repo_and_cnx(self.test_db_id)# no direct assignation to cls.cnx anymore.# cnx is now an instance property that use a class protected attributes.self.set_cnx(cnx)self.websession=dbapi.DBAPISession(cnx,self.admlogin)self._orig_cnx=(cnx,self.websession)self.config.repository=lambdax=None:self.repo# db api ##################################################################@nocoveragedefcursor(self,req=None):returnself.cnx.cursor(reqorself.request())@nocoveragedefexecute(self,rql,args=None,eidkey=None,req=None):"""executes <rql>, builds a resultset, and returns a couple (rset, req) where req is a FakeRequest """ifeidkeyisnotNone:warn('[3.8] eidkey is deprecated, you can safely remove this argument',DeprecationWarning,stacklevel=2)req=reqorself.request(rql=rql)returnreq.execute(unicode(rql),args)@nocoveragedefcommit(self):try:returnself.cnx.commit()finally:self.session.set_cnxset()# ensure cnxset still set after commit@nocoveragedefrollback(self):try:self.cnx.rollback()exceptdbapi.ProgrammingError:pass# connection closedfinally:self.session.set_cnxset()# ensure cnxset still set after commitrequestcls=fake.FakeRequestdefrequest(self,rollbackfirst=False,url=None,headers={},**kwargs):"""return a web ui request"""req=self.requestcls(self.vreg,url=url,headers=headers,form=kwargs)ifrollbackfirst:self.websession.cnx.rollback()req.set_session(self.websession)returnreq@propertydefadminsession(self):"""return current server side session (using default manager account)"""returnself.repo._sessions[self._orig_cnx[0].sessionid]# server side db api #######################################################defsexecute(self,rql,args=None,eid_key=None):ifeid_keyisnotNone:warn('[3.8] eid_key is deprecated, you can safely remove this argument',DeprecationWarning,stacklevel=2)self.session.set_cnxset()returnself.session.execute(rql,args)# config management ########################################################@classpropertydefconfig(cls):"""return the configuration object Configuration is cached on the test class. """try:assertnotclsisCubicWebTC,"Don't use CubicWebTC directly to prevent database caching issue"returncls.__dict__['_config']exceptKeyError:home=abspath(join(dirname(sys.modules[cls.__module__].__file__),cls.appid))config=cls._config=cls.configcls(cls.appid,apphome=home)config.mode='test'returnconfig@classmethoddefinit_config(cls,config):"""configuration initialization hooks. You may only want to override here the configuraton logic. Otherwise, consider to use a different :class:`ApptestConfiguration` defined in the `configcls` class attribute"""source=config.sources()['system']cls.admlogin=unicode(source['db-user'])cls.admpassword=source['db-password']# uncomment the line below if you want rql queries to be logged#config.global_set_option('query-log-file',# '/tmp/test_rql_log.' + `os.getpid()`)config.global_set_option('log-file',None)# set default-dest-addrs to a dumb email address to avoid mailbox or# mail queue pollutionconfig.global_set_option('default-dest-addrs',['whatever'])send_to='%s@logilab.fr'%getlogin()config.global_set_option('sender-addr',send_to)config.global_set_option('default-dest-addrs',send_to)config.global_set_option('sender-name','cubicweb-test')config.global_set_option('sender-addr','cubicweb-test@logilab.fr')# default_base_url on config class isn't enough for TestServerConfigurationconfig.global_set_option('base-url',config.default_base_url())# web resourcestry:config.global_set_option('embed-allowed',re.compile('.*'))exceptException:# not in server only configurationpass@propertydefvreg(self):returnself.repo.vreg# global resources accessors ###############################################@propertydefschema(self):"""return the application schema"""returnself.vreg.schemadefshell(self):"""return a shell session object"""fromcubicweb.server.migractionsimportServerMigrationHelperreturnServerMigrationHelper(None,repo=self.repo,cnx=self.cnx,interactive=False,# hack so it don't try to load fs schemaschema=1)defset_option(self,optname,value):self.config.global_set_option(optname,value)defset_debug(self,debugmode):server.set_debug(debugmode)defdebugged(self,debugmode):returnserver.debugged(debugmode)# default test setup and teardown #########################################defsetUp(self):# monkey patch send mail operation so emails are sent synchronouslyself._patch_SendMailOp()pause_tracing()previous_failure=self.__class__.__dict__.get('_repo_init_failed')ifprevious_failureisnotNone:self.skipTest('repository is not initialised: %r'%previous_failure)try:self._init_repo()self.addCleanup(self._close_cnx)exceptExceptionasex:self.__class__._repo_init_failed=exraiseresume_tracing()self.setup_database()self.commit()MAILBOX[:]=[]# reset mailboxdeftearDown(self):# XXX hack until logilab.common.testlib is fixedwhileself._cleanups:cleanup,args,kwargs=self._cleanups.pop(-1)cleanup(*args,**kwargs)def_patch_SendMailOp(self):# monkey patch send mail operation so emails are sent synchronously_old_mail_postcommit_event=SendMailOp.postcommit_eventSendMailOp.postcommit_event=SendMailOp.sendmailsdefreverse_SendMailOp_monkey_patch():SendMailOp.postcommit_event=_old_mail_postcommit_eventself.addCleanup(reverse_SendMailOp_monkey_patch)defsetup_database(self):"""add your database setup code by overriding this method"""@classmethoddefpre_setup_database(cls,session,config):"""add your pre database setup code by overriding this method Do not forget to set the cls.test_db_id value to enable caching of the result. """# user / session management ###############################################defuser(self,req=None):"""return the application schema"""ifreqisNone:req=self.request()returnself.cnx.user(req)else:returnreq.user@iclassmethod# XXX turn into a class methoddefcreate_user(self,req,login=None,groups=('users',),password=None,email=None,commit=True,**kwargs):"""create and return a new user entity"""ifisinstance(req,basestring):warn('[3.12] create_user arguments are now (req, login[, groups, password, commit, **kwargs])',DeprecationWarning,stacklevel=2)ifnotisinstance(groups,(tuple,list)):password=groupsgroups=loginelifisinstance(login,tuple):groups=loginlogin=reqassertnotisinstance(self,type)req=self._orig_cnx[0].request()ifpasswordisNone:password=login.encode('utf8')user=req.create_entity('CWUser',login=unicode(login),upassword=password,**kwargs)req.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'%','.join(repr(str(g))forgingroups),{'x':user.eid})ifemailisnotNone:req.create_entity('EmailAddress',address=unicode(email),reverse_primary_email=user)user.cw_clear_relation_cache('in_group','subject')ifcommit:try:req.commit()# req is a sessionexceptAttributeError:req.cnx.commit()returnuser# other utilities #########################################################@contextmanagerdeftemporary_appobjects(self,*appobjects):self.vreg._loadedmods.setdefault(self.__module__,{})forobjinappobjects:self.vreg.register(obj)try:yieldfinally:forobjinappobjects:self.vreg.unregister(obj)@contextmanagerdeftemporary_permissions(self,*perm_overrides,**perm_kwoverrides):"""Set custom schema permissions within context. There are two ways to call this method, which may be used together : * using positional argument(s): .. sourcecode:: python rdef = self.schema['CWUser'].rdef('login') with self.temporary_permissions((rdef, {'read': ()})): ... * using named argument(s): .. sourcecode:: python rdef = self.schema['CWUser'].rdef('login') with self.temporary_permissions(CWUser={'read': ()}): ... Usually the former will be prefered to override permissions on a relation definition, while the latter is well suited for entity types. The allowed keys in the permission dictionary depends on the schema type (entity type / relation definition). Resulting permissions will be similar to `orig_permissions.update(partial_perms)`. """torestore=[]forerschema,etypepermsinchain(perm_overrides,perm_kwoverrides.iteritems()):ifisinstance(erschema,basestring):erschema=self.schema[erschema]foraction,actionpermsinetypeperms.iteritems():origperms=erschema.permissions[action]erschema.set_action_permissions(action,actionperms)torestore.append([erschema,action,origperms])yieldforerschema,action,permissionsintorestore:ifactionisNone:erschema.permissions=permissionselse:erschema.set_action_permissions(action,permissions)defassertModificationDateGreater(self,entity,olddate):entity.cw_attr_cache.pop('modification_date',None)self.assertTrue(entity.modification_date>olddate)defassertItemsEqual(self,it1,it2,*args,**kwargs):it1=set(getattr(x,'eid',x)forxinit1)it2=set(getattr(x,'eid',x)forxinit2)super(CubicWebTC,self).assertItemsEqual(it1,it2,*args,**kwargs)defassertMessageEqual(self,req,params,msg):msg=req.session.data[params['_cwmsgid']]self.assertEqual(msg,msg)# workflow utilities #######################################################defassertPossibleTransitions(self,entity,expected):transitions=entity.cw_adapt_to('IWorkflowable').possible_transitions()self.assertListEqual(sorted(tr.namefortrintransitions),sorted(expected))# views and actions registries inspection ##################################defpviews(self,req,rset):returnsorted((a.__regid__,a.__class__)forainself.vreg['views'].possible_views(req,rset=rset))defpactions(self,req,rset,skipcategories=('addrelated','siteactions','useractions','footer','manage')):return[(a.__regid__,a.__class__)forainself.vreg['actions'].poss_visible_objects(req,rset=rset)ifa.categorynotinskipcategories]defpactions_by_cats(self,req,rset,categories=('addrelated',)):return[(a.__regid__,a.__class__)forainself.vreg['actions'].poss_visible_objects(req,rset=rset)ifa.categoryincategories]defpactionsdict(self,req,rset,skipcategories=('addrelated','siteactions','useractions','footer','manage')):res={}forainself.vreg['actions'].poss_visible_objects(req,rset=rset):ifa.categorynotinskipcategories:res.setdefault(a.category,[]).append(a.__class__)returnresdefaction_submenu(self,req,rset,id):returnself._test_action(self.vreg['actions'].select(id,req,rset=rset))def_test_action(self,action):classfake_menu(list):@propertydefitems(self):returnselfclassfake_box(object):defaction_link(self,action,**kwargs):return(action.title,action.url())submenu=fake_menu()action.fill_menu(fake_box(),submenu)returnsubmenudeflist_views_for(self,rset):"""returns the list of views that can be applied on `rset`"""req=rset.reqonly_once_vids=('primary','secondary','text')req.data['ex']=ValueError("whatever")viewsvreg=self.vreg['views']forvid,viewsinviewsvreg.items():ifvid[0]=='_':continueifrset.rowcount>1andvidinonly_once_vids:continueviews=[viewforviewinviewsifview.category!='startupview'andnotissubclass(view,notification.NotificationView)andnotisinstance(view,class_deprecated)]ifviews:try:view=viewsvreg._select_best(views,req,rset=rset)ifviewisNone:raiseNoSelectableObject((req,),{'rset':rset},views)ifview.linkable():yieldviewelse:not_selected(self.vreg,view)# else the view is expected to be used as subview and should# not be tested directlyexceptNoSelectableObject:continuedeflist_actions_for(self,rset):"""returns the list of actions that can be applied on `rset`"""req=rset.reqforactioninself.vreg['actions'].possible_objects(req,rset=rset):yieldactiondeflist_boxes_for(self,rset):"""returns the list of boxes that can be applied on `rset`"""req=rset.reqforboxinself.vreg['ctxcomponents'].possible_objects(req,rset=rset):yieldboxdeflist_startup_views(self):"""returns the list of startup views"""req=self.request()forviewinself.vreg['views'].possible_views(req,None):ifview.category=='startupview':yieldview.__regid__else:not_selected(self.vreg,view)# web ui testing utilities #################################################@property@cacheddefapp(self):"""return a cubicweb publisher"""publisher=application.CubicWebPublisher(self.repo,self.config)defraise_error_handler(*args,**kwargs):raisepublisher.error_handler=raise_error_handlerreturnpublisherdefremote_call(self,fname,*args):"""remote json call simulation"""dump=json.dumpsargs=[dump(arg)forarginargs]req=self.request(fname=fname,pageid='123',arg=args)ctrl=self.vreg['controllers'].select('ajax',req)returnctrl.publish(),reqdefapp_handle_request(self,req,path='view'):returnself.app.core_handle(req,path)@deprecated("[3.15] app_handle_request is the new and better way"" (beware of small semantic changes)")defapp_publish(self,*args,**kwargs):returnself.app_handle_request(*args,**kwargs)defctrl_publish(self,req,ctrl='edit',rset=None):"""call the publish method of the edit controller"""ctrl=self.vreg['controllers'].select(ctrl,req,appli=self.app)try:result=ctrl.publish(rset)req.cnx.commit()exceptweb.Redirect:req.cnx.commit()raisereturnresultdefreq_from_url(self,url):"""parses `url` and builds the corresponding CW-web request req.form will be setup using the url's query string """req=self.request(url=url)ifisinstance(url,unicode):url=url.encode(req.encoding)# req.setup_params() expects encoded stringsquerystring=urlparse.urlparse(url)[-2]params=urlparse.parse_qs(querystring)req.setup_params(params)returnreqdefurl_publish(self,url,data=None):"""takes `url`, uses application's app_resolver to find the appropriate controller and result set, then publishes the result. To simulate post of www-form-encoded data, give a `data` dictionary containing desired key/value associations. This should pretty much correspond to what occurs in a real CW server except the apache-rewriter component is not called. """req=self.req_from_url(url)ifdataisnotNone:req.form.update(data)ctrlid,rset=self.app.url_resolver.process(req,req.relative_path(False))returnself.ctrl_publish(req,ctrlid,rset)defhttp_publish(self,url,data=None):"""like `url_publish`, except this returns a http response, even in case of errors"""req=self.req_from_url(url)ifdataisnotNone:req.form.update(data)# remove the monkey patched error handlerfake_error_handler=self.app.error_handlerdelself.app.error_handlertry:result=self.app_handle_request(req,req.relative_path(False))finally:self.app.error_handler=fake_error_handlerreturnresult,req@staticmethoddef_parse_location(req,location):try:path,params=location.split('?',1)exceptValueError:path=locationparams={}else:cleanup=lambdap:(p[0],unquote(p[1]))params=dict(cleanup(p.split('=',1))forpinparams.split('&')ifp)ifpath.startswith(req.base_url()):# may be relativepath=path[len(req.base_url()):]returnpath,paramsdefexpect_redirect(self,callback,req):"""call the given callback with req as argument, expecting to get a Redirect exception """try:callback(req)exceptRedirectasex:returnself._parse_location(req,ex.location)else:self.fail('expected a Redirect exception')defexpect_redirect_handle_request(self,req,path='edit'):"""call the publish method of the application publisher, expecting to get a Redirect exception """result=self.app_handle_request(req,path)self.assertTrue(300<=req.status_out<400,req.status_out)location=req.get_response_header('location')returnself._parse_location(req,location)@deprecated("[3.15] expect_redirect_handle_request is the new and better way"" (beware of small semantic changes)")defexpect_redirect_publish(self,*args,**kwargs):returnself.expect_redirect_handle_request(*args,**kwargs)defset_auth_mode(self,authmode,anonuser=None):self.set_option('auth-mode',authmode)self.set_option('anonymous-user',anonuser)ifanonuserisNone:self.config.anonymous_credential=Noneelse:self.config.anonymous_credential=(anonuser,anonuser)definit_authentication(self,authmode,anonuser=None):self.set_auth_mode(authmode,anonuser)req=self.requestcls(self.vreg,url='login')sh=self.app.session_handlerauthm=sh.session_manager.authmanagerauthm.anoninfo=self.vreg.config.anonymous_user()authm.anoninfo=authm.anoninfo[0],{'password':authm.anoninfo[1]}# not properly cleaned between testsself.open_sessions=sh.session_manager._sessions={}returnreq,self.websessiondefassertAuthSuccess(self,req,origsession,nbsessions=1):sh=self.app.session_handlersession=self.app.get_session(req)req.set_session(session)self.assertEqual(len(self.open_sessions),nbsessions,self.open_sessions)self.assertEqual(session.login,origsession.login)self.assertEqual(session.anonymous_session,False)defassertAuthFailure(self,req,nbsessions=0):withself.assertRaises(AuthenticationError):self.app.get_session(req)# +0 since we do not track the opened sessionself.assertEqual(len(self.open_sessions),nbsessions)clear_cache(req,'get_authorization')# content validation ######################################################## validators are used to validate (XML, DTD, whatever) view's content# validators availables are :# DTDValidator : validates XML + declared DTD# SaxOnlyValidator : guarantees XML is well formed# None : do not try to validate anything# validators used must be imported from from.devtools.htmlparsercontent_type_validators={# maps MIME type : validator name## do not set html validators here, we need HTMLValidator for html# snippets#'text/html': DTDValidator,#'application/xhtml+xml': DTDValidator,'application/xml':htmlparser.XMLValidator,'text/xml':htmlparser.XMLValidator,'application/json':JsonValidator,'text/plain':None,'text/comma-separated-values':None,'text/x-vcard':None,'text/calendar':None,'image/png':None,}# maps vid : validator name (override content_type_validators)vid_validators=dict((vid,htmlparser.VALMAP[valkey])forvid,valkeyinVIEW_VALIDATORS.iteritems())defview(self,vid,rset=None,req=None,template='main-template',**kwargs):"""This method tests the view `vid` on `rset` using `template` If no error occurred while rendering the view, the HTML is analyzed and parsed. :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo` encapsulation the generated HTML """ifreqisNone:ifrsetisNone:req=self.request()else:req=rset.reqreq.form['vid']=vidviewsreg=self.vreg['views']view=viewsreg.select(vid,req,rset=rset,**kwargs)# set explicit test descriptionifrsetisnotNone:self.set_description("testing vid=%s defined in %s with (%s)"%(vid,view.__module__,rset.printable_rql()))else:self.set_description("testing vid=%s defined in %s without rset"%(vid,view.__module__))iftemplateisNone:# raw view testing, no templateviewfunc=view.renderelse:kwargs['view']=viewviewfunc=lambda**k:viewsreg.main_template(req,template,rset=rset,**kwargs)returnself._test_view(viewfunc,view,template,kwargs)def_test_view(self,viewfunc,view,template='main-template',kwargs={}):"""this method does the actual call to the view If no error occurred while rendering the view, the HTML is analyzed and parsed. :returns: an instance of `cubicweb.devtools.htmlparser.PageInfo` encapsulation the generated HTML """try:output=viewfunc(**kwargs)exceptException:# hijack exception: generative tests stop when the exception# is not an AssertionErrorklass,exc,tcbk=sys.exc_info()try:msg='[%s in %s] %s'%(klass,view.__regid__,exc)exceptException:msg='[%s in %s] undisplayable exception'%(klass,view.__regid__)raiseAssertionError,msg,tcbkreturnself._check_html(output,view,template)defget_validator(self,view=None,content_type=None,output=None):ifviewisnotNone:try:returnself.vid_validators[view.__regid__]()exceptKeyError:ifcontent_typeisNone:content_type=view.content_typeifcontent_typeisNone:content_type='text/html'ifcontent_typein('text/html','application/xhtml+xml')andoutput:ifoutput.startswith('<!DOCTYPE html>'):# only check XML well-formness since HTMLValidator isn't html5# compatible and won't like various other extensionsdefault_validator=htmlparser.XMLSyntaxValidatorelifoutput.startswith('<?xml'):default_validator=htmlparser.DTDValidatorelse:default_validator=htmlparser.HTMLValidatorelse:default_validator=Nonevalidatorclass=self.content_type_validators.get(content_type,default_validator)ifvalidatorclassisNone:returnreturnvalidatorclass()@nocoveragedef_check_html(self,output,view,template='main-template'):"""raises an exception if the HTML is invalid"""output=output.strip()validator=self.get_validator(view,output=output)ifvalidatorisNone:returnoutput# return raw output if no validator is definedifisinstance(validator,htmlparser.DTDValidator):# XXX remove <canvas> used in progress widget, unknown in html dtdoutput=re.sub('<canvas.*?></canvas>','',output)returnself.assertWellFormed(validator,output.strip(),context=view.__regid__)defassertWellFormed(self,validator,content,context=None):try:returnvalidator.parse_string(content)exceptException:# hijack exception: generative tests stop when the exception# is not an AssertionErrorklass,exc,tcbk=sys.exc_info()ifcontextisNone:msg=u'[%s]'%(klass,)else:msg=u'[%s in %s]'%(klass,context)msg=msg.encode(sys.getdefaultencoding(),'replace')try:str_exc=str(exc)exceptException:str_exc='undisplayable exception'msg+=str_excifcontentisnotNone:position=getattr(exc,"position",(0,))[0]ifposition:# define filterifisinstance(content,str):content=unicode(content,sys.getdefaultencoding(),'replace')content=validator.preprocess_data(content)content=content.splitlines()width=int(log(len(content),10))+1line_template=" %"+("%i"%width)+"i: %s"# XXX no need to iterate the whole file except to get# the line numbercontent=u'\n'.join(line_template%(idx+1,line)foridx,lineinenumerate(content)ifline_context_filter(idx+1,position))msg+=u'\nfor content:\n%s'%contentraiseAssertionError,msg,tcbkdefassertDocTestFile(self,testfile):# doctest returns tuple (failure_count, test_count)result=self.shell().process_script(testfile)ifresult[0]andresult[1]:raiseself.failureException("doctest file '%s' failed"%testfile)# notifications ############################################################defassertSentEmail(self,subject,recipients=None,nb_msgs=None):"""test recipients in system mailbox for given email subject :param subject: email subject to find in mailbox :param recipients: list of email recipients :param nb_msgs: expected number of entries :returns: list of matched emails """messages=[emailforemailinMAILBOXifemail.message.get('Subject')==subject]ifrecipientsisnotNone:sent_to=set()formsginmessages:sent_to.update(msg.recipients)self.assertSetEqual(set(recipients),sent_to)ifnb_msgsisnotNone:self.assertEqual(len(MAILBOX),nb_msgs)returnmessages# deprecated ###############################################################@deprecated('[3.8] use self.execute(...).get_entity(0, 0)')defentity(self,rql,args=None,eidkey=None,req=None):ifeidkeyisnotNone:warn('[3.8] eidkey is deprecated, you can safely remove this argument',DeprecationWarning,stacklevel=2)returnself.execute(rql,args,req=req).get_entity(0,0)# auto-populating test classes and utilities ###################################fromcubicweb.devtools.fillimportinsert_entity_queries,make_relations_queries# XXX cleanup unprotected_entities & all messdefhow_many_dict(schema,cursor,how_many,skip):"""given a schema, compute how many entities by type we need to be able to satisfy relations cardinality. The `how_many` argument tells how many entities of which type we want at least. Return a dictionary with entity types as key, and the number of entities for this type as value. """relmap={}forrschemainschema.relations():ifrschema.final:continueforsubj,objinrschema.rdefs:card=rschema.rdef(subj,obj).cardinality# if the relation is mandatory, we'll need at least as many subj and# obj to satisfy itifcard[0]in'1+'andcard[1]in'1?':# subj has to be linked to at least one obj,# but obj can be linked to only one subj# -> we need at least as many subj as obj to satisfy# cardinalities for this relationrelmap.setdefault((rschema,subj),[]).append(str(obj))ifcard[1]in'1+'andcard[0]in'1?':# reverse subj and obj in the above explanationrelmap.setdefault((rschema,obj),[]).append(str(subj))unprotected=unprotected_entities(schema)foretypeinskip:# XXX (syt) duh? explain or killunprotected.add(etype)howmanydict={}# step 1, compute a base number of each entity types: number of already# existing entities of this type + `how_many`foretypeinunprotected_entities(schema,strict=True):howmanydict[str(etype)]=cursor.execute('Any COUNT(X) WHERE X is %s'%etype)[0][0]ifetypeinunprotected:howmanydict[str(etype)]+=how_many# step 2, augment nb entity per types to satisfy cardinality constraints,# by recomputing for each relation that constrained an entity type:## new num for etype = max(current num, sum(num for possible target etypes))## XXX we should first check there is no cycle then propagate changesfor(rschema,etype),targetsinrelmap.iteritems():relfactor=sum(howmanydict[e]foreintargets)howmanydict[str(etype)]=max(relfactor,howmanydict[etype])returnhowmanydictclassAutoPopulateTest(CubicWebTC):"""base class for test with auto-populating of the database"""__abstract__=Truetest_db_id='autopopulate'tags=CubicWebTC.tags|Tags('autopopulated')pdbclass=CubicWebDebugger# this is a hook to be able to define a list of rql queries# that are application dependent and cannot be guessed automaticallyapplication_rql=[]no_auto_populate=()ignored_relations=set()defto_test_etypes(self):returnunprotected_entities(self.schema,strict=True)defcustom_populate(self,how_many,cursor):passdefpost_populate(self,cursor):pass@nocoveragedefauto_populate(self,how_many):"""this method populates the database with `how_many` entities of each possible type. It also inserts random relations between them """withself.session.security_enabled(read=False,write=False):self._auto_populate(how_many)def_auto_populate(self,how_many):cu=self.cursor()self.custom_populate(how_many,cu)vreg=self.vreghowmanydict=how_many_dict(self.schema,cu,how_many,self.no_auto_populate)foretypeinunprotected_entities(self.schema):ifetypeinself.no_auto_populate:continuenb=howmanydict.get(etype,how_many)forrql,argsininsert_entity_queries(etype,self.schema,vreg,nb):cu.execute(rql,args)edict={}foretypeinunprotected_entities(self.schema,strict=True):rset=cu.execute('%s X'%etype)edict[str(etype)]=set(row[0]forrowinrset.rows)existingrels={}ignored_relations=SYSTEM_RELATIONS|self.ignored_relationsforrschemainself.schema.relations():ifrschema.finalorrschemainignored_relations:continuerset=cu.execute('DISTINCT Any X,Y WHERE X %s Y'%rschema)existingrels.setdefault(rschema.type,set()).update((x,y)forx,yinrset)q=make_relations_queries(self.schema,edict,cu,ignored_relations,existingrels=existingrels)forrql,argsinq:try:cu.execute(rql,args)exceptValidationErrorasex:# failed to satisfy some constraintprint'error in automatic db population',exself.session.commit_state=None# reset uncommitable flagself.post_populate(cu)self.commit()defiter_individual_rsets(self,etypes=None,limit=None):etypes=etypesorself.to_test_etypes()foretypeinetypes:iflimit:rql='Any X LIMIT %s WHERE X is %s'%(limit,etype)else:rql='Any X WHERE X is %s'%etyperset=self.execute(rql)forrowinxrange(len(rset)):iflimitandrow>limit:break# XXX iirkrset2=rset.limit(limit=1,offset=row)yieldrset2defiter_automatic_rsets(self,limit=10):"""generates basic resultsets for each entity type"""etypes=self.to_test_etypes()ifnotetypes:returnforetypeinetypes:yieldself.execute('Any X LIMIT %s WHERE X is %s'%(limit,etype))etype1=etypes.pop()try:etype2=etypes.pop()exceptKeyError:etype2=etype1# test a mixed query (DISTINCT/GROUP to avoid getting duplicate# X which make muledit view failing for instance (html validation fails# because of some duplicate "id" attributes)yieldself.execute('DISTINCT Any X, MAX(Y) GROUPBY X WHERE X is %s, Y is %s'%(etype1,etype2))# test some application-specific queries if definedforrqlinself.application_rql:yieldself.execute(rql)def_test_everything_for(self,rset):"""this method tries to find everything that can be tested for `rset` and yields a callable test (as needed in generative tests) """propdefs=self.vreg['propertydefs']# make all components visiblefork,vinpropdefs.items():ifk.endswith('visible')andnotv['default']:propdefs[k]['default']=Trueforviewinself.list_views_for(rset):backup_rset=rset.copy(rset.rows,rset.description)yieldInnerTest(self._testname(rset,view.__regid__,'view'),self.view,view.__regid__,rset,rset.req.reset_headers(),'main-template')# We have to do this because some views modify the# resultset's syntax treerset=backup_rsetforactioninself.list_actions_for(rset):yieldInnerTest(self._testname(rset,action.__regid__,'action'),self._test_action,action)forboxinself.list_boxes_for(rset):w=[].appendyieldInnerTest(self._testname(rset,box.__regid__,'box'),box.render,w)@staticmethoddef_testname(rset,objid,objtype):return'%s_%s_%s'%('_'.join(rset.column_types(0)),objid,objtype)# concrete class for automated application testing ############################classAutomaticWebTest(AutoPopulateTest):"""import this if you wan automatic tests to be ran"""tags=AutoPopulateTest.tags|Tags('web','generated')defsetUp(self):assertnotself.__class__isAutomaticWebTest,'Please subclass AutomaticWebTest to prevent database caching issue'super(AutomaticWebTest,self).setUp()# access to self.app for proper initialization of the authentication# machinery (else some views may fail)self.app## one eachdeftest_one_each_config(self):self.auto_populate(1)forrsetinself.iter_automatic_rsets(limit=1):fortestargsinself._test_everything_for(rset):yieldtestargs## ten eachdeftest_ten_each_config(self):self.auto_populate(10)forrsetinself.iter_automatic_rsets(limit=10):fortestargsinself._test_everything_for(rset):yieldtestargs## startup viewsdeftest_startup_views(self):forvidinself.list_startup_views():req=self.request()yieldself.view,vid,None,req# registry instrumentization ###################################################defnot_selected(vreg,appobject):try:vreg._selected[appobject.__class__]-=1except(KeyError,AttributeError):pass# def vreg_instrumentize(testclass):# # XXX broken# from cubicweb.devtools.apptest import TestEnvironment# env = testclass._env = TestEnvironment('data', configcls=testclass.configcls)# for reg in env.vreg.itervalues():# reg._selected = {}# try:# orig_select_best = reg.__class__.__orig_select_best# except Exception:# orig_select_best = reg.__class__._select_best# def instr_select_best(self, *args, **kwargs):# selected = orig_select_best(self, *args, **kwargs)# try:# self._selected[selected.__class__] += 1# except KeyError:# self._selected[selected.__class__] = 1# except AttributeError:# pass # occurs on reg used to restore database# return selected# reg.__class__._select_best = instr_select_best# reg.__class__.__orig_select_best = orig_select_best# def print_untested_objects(testclass, skipregs=('hooks', 'etypes')):# for regname, reg in testclass._env.vreg.iteritems():# if regname in skipregs:# continue# for appobjects in reg.itervalues():# for appobject in appobjects:# if not reg._selected.get(appobject):# print 'not tested', regname, appobject