# [i18n test] hack to make the i18n tests run under a plain Python
# interpreter; they used to work only when executed via pytest.
# This should be removed as soon as logilab.common.registry is fixed.
# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""Test tools for cubicweb"""__docformat__="restructuredtext en"importosimportsysimportloggingimportshutilimportpickleimportglobimportwarningsfromhashlibimportsha1# pylint: disable=E0611fromdatetimeimporttimedeltafromos.pathimport(abspath,join,exists,split,isabs,isdir)fromfunctoolsimportpartialfromlogilab.common.dateimportstrptimefromlogilab.common.decoratorsimportcached,clear_cachefromcubicwebimportExecutionError,BadConnectionIdfromcubicwebimportschema,cwconfigfromcubicweb.server.serverconfigimportServerConfigurationfromcubicweb.etwist.twconfigimportTwistedConfigurationcwconfig.CubicWebConfiguration.cls_adjust_sys_path()# db auto-population configuration #############################################SYSTEM_ENTITIES=(schema.SCHEMA_TYPES|schema.INTERNAL_TYPES|schema.WORKFLOW_TYPES|set(('CWGroup','CWUser',)))SYSTEM_RELATIONS=(schema.META_RTYPES|schema.WORKFLOW_RTYPES|schema.WORKFLOW_DEF_RTYPES|schema.SYSTEM_RTYPES|schema.SCHEMA_TYPES|set(('primary_email',# deducted from other relations)))# content validation configuration ############################################## validators are used to validate (XML, DTD, whatever) view's content# validators availables are :# 'dtd' : validates XML 
+ declared DTD# 'xml' : guarantees XML is well formed# None : do not try to validate anything# {'vid': validator}VIEW_VALIDATORS={}# cubicweb test configuration ##################################################BASE_URL='http://testing.fr/cubicweb/'DEFAULT_SOURCES={'system':{'adapter':'native','db-encoding':'UTF-8',#'ISO-8859-1','db-user':u'admin','db-password':'gingkow','db-name':'tmpdb','db-driver':'sqlite','db-host':None,},'admin':{'login':u'admin','password':u'gingkow',},}defturn_repo_off(repo):""" Idea: this is less costly than a full re-creation of the repo object. off: * session are closed, * cnxsets are closed * system source is shutdown """ifnotrepo._needs_refresh:forsessionidinlist(repo._sessions):warnings.warn('%s Open session found while turning repository off'%sessionid,RuntimeWarning)try:repo.close(sessionid)exceptBadConnectionId:#this is strange ? thread issue ?print'XXX unknown session',sessionidforcnxsetinrepo.cnxsets:cnxset.close(True)repo.system_source.shutdown()repo._needs_refresh=Truerepo._has_started=Falsedefturn_repo_on(repo):"""Idea: this is less costly than a full re-creation of the repo object. 
on: * cnxsets are connected * cache are cleared """ifrepo._needs_refresh:forcnxsetinrepo.cnxsets:cnxset.reconnect()repo._type_source_cache={}repo._extid_cache={}repo.querier._rql_cache={}forsourceinrepo.sources:source.reset_caches()repo._needs_refresh=FalseclassTestServerConfiguration(ServerConfiguration):mode='test'read_instance_schema=Falseinit_repository=Trueskip_db_create_and_restore=Falsedef__init__(self,appid='data',apphome=None,log_threshold=logging.CRITICAL+10):# must be set before calling parent __init__ifapphomeisNone:ifexists(appid):apphome=abspath(appid)else:# cube testapphome=abspath('..')self._apphome=apphomeServerConfiguration.__init__(self,appid)self.init_log(log_threshold,force=True)# need this, usually triggered by cubicweb-ctlself.load_cwctl_plugins()# By default anonymous login are allow but some test need to deny of to# change the default user. Set it to None to prevent anonymous login.anonymous_credential=('anon','anon')defanonymous_user(self):ifnotself.anonymous_credential:returnNone,Nonereturnself.anonymous_credentialdefset_anonymous_allowed(self,allowed,anonuser='anon'):ifallowed:self.anonymous_credential=(anonuser,anonuser)else:self.anonymous_credential=None@propertydefapphome(self):returnself._apphomeappdatahome=apphomedefload_configuration(self):super(TestServerConfiguration,self).load_configuration()# no undo support in testsself.global_set_option('undo-enabled','n')defmain_config_file(self):"""return instance's control configuration file"""returnjoin(self.apphome,'%s.conf'%self.name)defbootstrap_cubes(self):try:super(TestServerConfiguration,self).bootstrap_cubes()exceptIOError:# no cubesself.init_cubes(())sourcefile=Nonedefsources_file(self):"""define in subclasses self.sourcefile if necessary"""ifself.sourcefile:print'Reading sources 
from',self.sourcefilesourcefile=self.sourcefileifnotisabs(sourcefile):sourcefile=join(self.apphome,sourcefile)else:sourcefile=super(TestServerConfiguration,self).sources_file()returnsourcefiledefsources(self):"""By default, we run tests with the sqlite DB backend. One may use its own configuration by just creating a 'sources' file in the test directory from wich tests are launched or by specifying an alternative sources file using self.sourcefile. """try:sources=super(TestServerConfiguration,self).sources()exceptExecutionError:sources={}ifnotsources:sources=DEFAULT_SOURCESif'admin'notinsources:sources['admin']=DEFAULT_SOURCES['admin']returnsources# web config methods needed here for cases when we use this config as a web# configdefdefault_base_url(self):returnBASE_URLclassBaseApptestConfiguration(TestServerConfiguration,TwistedConfiguration):name='all-in-one'# so it search for all-in-one.conf, not repository.confoptions=cwconfig.merge_options(TestServerConfiguration.options+TwistedConfiguration.options)cubicweb_appobject_path=TestServerConfiguration.cubicweb_appobject_path|TwistedConfiguration.cubicweb_appobject_pathcube_appobject_path=TestServerConfiguration.cube_appobject_path|TwistedConfiguration.cube_appobject_pathdefavailable_languages(self,*args):returnself.cw_languages()defpyro_enabled(self):# but export PYRO_MULTITHREAD=0 or you get problems with sqlite and# threadsreturnTrue# XXX merge with BaseApptestConfiguration ?classApptestConfiguration(BaseApptestConfiguration):# `skip_db_create_and_restore` controls wether or not the test database# should be created / backuped / restored. 
If set to True, those# steps are completely skipped, the database is used as is and is# considered initializedskip_db_create_and_restore=Falsedef__init__(self,appid,apphome=None,log_threshold=logging.CRITICAL,sourcefile=None):BaseApptestConfiguration.__init__(self,appid,apphome,log_threshold=log_threshold)self.init_repository=sourcefileisNoneself.sourcefile=sourcefileclassRealDatabaseConfiguration(ApptestConfiguration):"""configuration class for tests to run on a real database. The intialization is done by specifying a source file path. Important note: init_test_database / reset_test_database steps are skipped. It's thus up to the test developer to implement setUp/tearDown accordingly. Example usage:: class MyTests(CubicWebTC): _config = RealDatabaseConfiguration('myapp', sourcefile='/path/to/sources') def test_something(self): rset = self.execute('Any X WHERE X is CWUser') self.view('foaf', rset) """skip_db_create_and_restore=Trueread_instance_schema=True# read schema from database# test database handling #######################################################DEFAULT_EMPTY_DB_ID='__default_empty_db__'classTestDataBaseHandler(object):DRIVER=Nonedb_cache={}explored_glob=set()def__init__(self,config):self.config=configself._repo=None# pure consistency checkassertself.system_source['db-driver']==self.DRIVERdef_ensure_test_backup_db_dir(self):"""Return path of directory for database backup. 
The function create it if necessary"""backupdir=join(self.config.apphome,'database')ifnotisdir(backupdir):os.makedirs(backupdir)returnbackupdirdefconfig_path(self,db_id):"""Path for config backup of a given database id"""returnself.absolute_backup_file(db_id,'config')defabsolute_backup_file(self,db_id,suffix):"""Path for config backup of a given database id"""# in case db name is an absolute path, we don't want to replace anything# in parent directoriesdirectory,basename=split(self.dbname)dbname=basename.replace('-','_')assert'.'notindb_idfilename=join(directory,'%s-%s.%s'%(dbname,db_id,suffix))returnjoin(self._ensure_test_backup_db_dir(),filename)defdb_cache_key(self,db_id,dbname=None):"""Build a database cache key for a db_id with the current config This key is meant to be used in the cls.db_cache mapping"""ifdbnameisNone:dbname=self.dbnamedbname=os.path.basename(dbname)dbname=dbname.replace('-','_')return(self.config.apphome,dbname,db_id)defbackup_database(self,db_id):"""Store the content of the current database as <db_id> The config used are also stored."""backup_data=self._backup_database(db_id)config_path=self.config_path(db_id)# XXX we dump a dict of the config# This is an experimental to help config dependant setup (like BFSS) to# be propertly restoredwithopen(config_path,'wb')asconf_file:conf_file.write(pickle.dumps(dict(self.config)))self.db_cache[self.db_cache_key(db_id)]=(backup_data,config_path)def_backup_database(self,db_id):"""Actual backup the current database. return a value to be stored in db_cache to allow restoration"""raiseNotImplementedError()defrestore_database(self,db_id):"""Restore a database. 
takes as argument value stored in db_cache by self._backup_database"""# XXX set a clearer error message ???backup_coordinates,config_path=self.db_cache[self.db_cache_key(db_id)]# reload the config used to create the database.config=pickle.loads(open(config_path,'rb').read())# shutdown repo before changing database contentifself._repoisnotNone:self._repo.turn_repo_off()self._restore_database(backup_coordinates,config)def_restore_database(self,backup_coordinates,config):"""Actual restore of the current database. Use the value stored in db_cache as input """raiseNotImplementedError()defget_repo(self,startup=False):""" return Repository object on the current database. (turn the current repo object "on" if there is one or recreate one) if startup is True, server startup server hooks will be called if needed """ifself._repoisNone:self._repo=self._new_repo(self.config)repo=self._reporepo.turn_repo_on()ifstartupandnotrepo._has_started:repo.hm.call_hooks('server_startup',repo=repo)repo._has_started=Truereturnrepodef_new_repo(self,config):"""Factory method to create a new Repository Instance"""fromcubicweb.dbapiimportin_memory_repoconfig._cubes=Nonerepo=in_memory_repo(config)# extending Repository classrepo._has_started=Falserepo._needs_refresh=Falserepo.turn_repo_on=partial(turn_repo_on,repo)repo.turn_repo_off=partial(turn_repo_off,repo)returnrepodefget_cnx(self):"""return Connection object on the current repository"""fromcubicweb.dbapiimport_repo_connectrepo=self.get_repo()sources=self.config.sources()login=unicode(sources['admin']['login'])password=sources['admin']['password']or'xxx'cnx=_repo_connect(repo,login,password=password)returncnxdefget_repo_and_cnx(self,db_id=DEFAULT_EMPTY_DB_ID):"""Reset database with the current db_id and return (repo, cnx) A database *MUST* have been build with the current <db_id> prior to call this method. See the ``build_db_cache`` method. 
The returned repository have it's startup hooks called and the connection is establised as admin."""self.restore_database(db_id)repo=self.get_repo(startup=True)cnx=self.get_cnx()returnrepo,cnx@propertydefsystem_source(self):sources=self.config.sources()returnsources['system']@propertydefdbname(self):returnself.system_source['db-name']definit_test_database(self):"""actual initialisation of the database"""raiseValueError('no initialization function for driver %r'%self.DRIVER)defhas_cache(self,db_id):"""Check if a given database id exist in cb cache for the current config"""cache_glob=self.absolute_backup_file('*','*')ifcache_globnotinself.explored_glob:self.discover_cached_db()returnself.db_cache_key(db_id)inself.db_cachedefdiscover_cached_db(self):"""Search available db_if for the current config"""cache_glob=self.absolute_backup_file('*','*')directory=os.path.dirname(cache_glob)entries={}candidates=glob.glob(cache_glob)forfilepathincandidates:data=os.path.basename(filepath)# database backup are in the forms are <dbname>-<db_id>.<backtype>dbname,data=data.split('-',1)db_id,filetype=data.split('.',1)entries.setdefault((dbname,db_id),{})[filetype]=filepathfor(dbname,db_id),entryinentries.iteritems():# apply necessary transformation from the drivervalue=self.process_cache_entry(directory,dbname,db_id,entry)assert'config'inentryifvalueisnotNone:# None value means "not handled by this driver# XXX Ignored value are shadowed to other Handler if cache are common.key=self.db_cache_key(db_id,dbname=dbname)self.db_cache[key]=value,entry['config']self.explored_glob.add(cache_glob)defprocess_cache_entry(self,directory,dbname,db_id,entry):"""Transforms potential cache entry to proper backup coordinate entry argument is a "filetype" -> "filepath" mapping Return None if an entry should be ignored."""returnNonedefbuild_db_cache(self,test_db_id=DEFAULT_EMPTY_DB_ID,pre_setup_func=None):"""Build Database cache for ``test_db_id`` if a cache doesn't exist if ``test_db_id is 
DEFAULT_EMPTY_DB_ID`` self.init_test_database is called. otherwise, DEFAULT_EMPTY_DB_ID is build/restored and ``pre_setup_func`` to setup the database. This function backup any database it build"""ifself.has_cache(test_db_id):return#test_db_id, 'already in cache'iftest_db_idisDEFAULT_EMPTY_DB_ID:self.init_test_database()else:print'Building %s for database %s'%(test_db_id,self.dbname)self.build_db_cache(DEFAULT_EMPTY_DB_ID)self.restore_database(DEFAULT_EMPTY_DB_ID)repo=self.get_repo(startup=True)cnx=self.get_cnx()session=repo._sessions[cnx.sessionid]session.set_cnxset()_commit=session.commitdefkeep_cnxset_commit(free_cnxset=False):_commit(free_cnxset=free_cnxset)session.commit=keep_cnxset_commitpre_setup_func(session,self.config)session.commit()cnx.close()self.backup_database(test_db_id)classNoCreateDropDatabaseHandler(TestDataBaseHandler):"""This handler is used if config.skip_db_create_and_restore is True This is typically the case with RealDBConfig. In that case, we explicitely want to skip init / backup / restore phases. 
This handler redefines the three corresponding methods and delegates to original handler for any other method / attribute """def__init__(self,base_handler):self.base_handler=base_handler# override init / backup / restore methodsdefinit_test_database(self):passdefbackup_database(self,db_id):passdefrestore_database(self,db_id):pass# delegate to original handler in all other casesdef__getattr__(self,attrname):returngetattr(self.base_handler,attrname)### postgres test database handling ############################################classPostgresTestDataBaseHandler(TestDataBaseHandler):DRIVER='postgres'@property@cacheddefhelper(self):fromlogilab.databaseimportget_db_helperreturnget_db_helper('postgres')@propertydefdbcnx(self):try:returnself._cnxexceptAttributeError:fromcubicweb.server.serverctlimport_db_sys_cnxtry:self._cnx=_db_sys_cnx(self.system_source,'CREATE DATABASE and / or USER',interactive=False)returnself._cnxexceptException:self._cnx=Noneraise@property@cacheddefcursor(self):returnself.dbcnx.cursor()defprocess_cache_entry(self,directory,dbname,db_id,entry):backup_name=self._backup_name(db_id)ifbackup_nameinself.helper.list_databases(self.cursor):returnbackup_namereturnNonedefinit_test_database(self):"""initialize a fresh postgresql database used for testing purpose"""fromcubicweb.serverimportinit_repositoryfromcubicweb.server.serverctlimportsystem_source_cnx,createdb# connect on the dbms system base to create our basetry:self._drop(self.dbname)createdb(self.helper,self.system_source,self.dbcnx,self.cursor)self.dbcnx.commit()cnx=system_source_cnx(self.system_source,special_privs='LANGUAGE C',interactive=False)templcursor=cnx.cursor()try:# XXX factorize with db-create codeself.helper.init_fti_extensions(templcursor)# install plpythonu/plpgsql language if not installed by the 
cubelangs=sys.platform=='win32'and('plpgsql',)or('plpythonu','plpgsql')forextlanginlangs:self.helper.create_language(templcursor,extlang)cnx.commit()finally:templcursor.close()cnx.close()init_repository(self.config,interactive=False)exceptBaseException:ifself.dbcnxisnotNone:self.dbcnx.rollback()sys.stderr.write('building %s failed\n'%self.dbname)#self._drop(self.dbname)raisedefhelper_clear_cache(self):ifself.dbcnxisnotNone:self.dbcnx.commit()self.dbcnx.close()delself._cnxclear_cache(self,'cursor')clear_cache(self,'helper')def__del__(self):self.helper_clear_cache()@propertydef_config_id(self):returnsha1(self.config.apphome).hexdigest()[:10]def_backup_name(self,db_id):# merge me with parentbackup_name='_'.join(('cache',self._config_id,self.dbname,db_id))returnbackup_name.lower()def_drop(self,db_name):ifdb_nameinself.helper.list_databases(self.cursor):self.cursor.execute('DROP DATABASE %s'%db_name)self.dbcnx.commit()def_backup_database(self,db_id):"""Actual backup the current database. return a value to be stored in db_cache to allow restoration """fromcubicweb.server.serverctlimportcreatedborig_name=self.system_source['db-name']try:backup_name=self._backup_name(db_id)self._drop(backup_name)self.system_source['db-name']=backup_name# during postgres database initialization, there is no repo set here.assertself._repoisNone#self._repo.turn_repo_off()createdb(self.helper,self.system_source,self.dbcnx,self.cursor,template=orig_name)self.dbcnx.commit()#self._repo.turn_repo_on()returnbackup_namefinally:self.system_source['db-name']=orig_namedef_restore_database(self,backup_coordinates,config):fromcubicweb.server.serverctlimportcreatedb"""Actual restore of the current database. 
Use the value tostored in db_cache as input """self._drop(self.dbname)createdb(self.helper,self.system_source,self.dbcnx,self.cursor,template=backup_coordinates)self.dbcnx.commit()### sqlserver2005 test database handling #######################################classSQLServerTestDataBaseHandler(TestDataBaseHandler):DRIVER='sqlserver'# XXX complete medefinit_test_database(self):"""initialize a fresh sqlserver databse used for testing purpose"""ifself.config.init_repository:fromcubicweb.serverimportinit_repositoryinit_repository(self.config,interactive=False,drop=True)### sqlite test database handling ##############################################classSQLiteTestDataBaseHandler(TestDataBaseHandler):DRIVER='sqlite'__TMPDB=set()@classmethoddef_cleanup_all_tmpdb(cls):fordbpathincls.__TMPDB:cls._cleanup_database(dbpath)def__init__(self,*args,**kwargs):super(SQLiteTestDataBaseHandler,self).__init__(*args,**kwargs)# use a dedicated base for each process.if'global-db-name'notinself.system_source:self.system_source['global-db-name']=self.system_source['db-name']process_db=self.system_source['db-name']+str(os.getpid())self.system_source['db-name']=process_dbprocess_db=self.absolute_dbfile()# update db-name to absolute pathself.__TMPDB.add(process_db)@staticmethoddef_cleanup_database(dbfile):try:os.remove(dbfile)os.remove('%s-journal'%dbfile)exceptOSError:pass@propertydefdbname(self):returnself.system_source['global-db-name']defabsolute_dbfile(self):"""absolute path of current database file"""dbfile=join(self._ensure_test_backup_db_dir(),self.config.sources()['system']['db-name'])self.config.sources()['system']['db-name']=dbfilereturndbfiledefprocess_cache_entry(self,directory,dbname,db_id,entry):returnentry.get('sqlite')def_backup_database(self,db_id=DEFAULT_EMPTY_DB_ID):# XXX remove database file if it exists ???dbfile=self.absolute_dbfile()backup_file=self.absolute_backup_file(db_id,'sqlite')shutil.copy(dbfile,backup_file)# Usefull to debug WHO write a database# backup_stack = 
self.absolute_backup_file(db_id, '.stack')#with open(backup_stack, 'w') as backup_stack_file:# import traceback# traceback.print_stack(file=backup_stack_file)returnbackup_filedef_new_repo(self,config):repo=super(SQLiteTestDataBaseHandler,self)._new_repo(config)install_sqlite_patch(repo.querier)returnrepodef_restore_database(self,backup_coordinates,_config):# remove database file if it exists ?dbfile=self.absolute_dbfile()self._cleanup_database(dbfile)shutil.copy(backup_coordinates,dbfile)self.get_repo()definit_test_database(self):"""initialize a fresh sqlite databse used for testing purpose"""# initialize the databasefromcubicweb.serverimportinit_repositoryself._cleanup_database(self.absolute_dbfile())init_repository(self.config,interactive=False)importatexitatexit.register(SQLiteTestDataBaseHandler._cleanup_all_tmpdb)definstall_sqlite_patch(querier):"""This patch hotfixes the following sqlite bug : - http://www.sqlite.org/cvstrac/tktview?tn=1327,33 (some dates are returned as strings rather thant date objects) """ifhasattr(querier.__class__,'_devtools_sqlite_patched'):return# already monkey patcheddefwrap_execute(base_execute):defnew_execute(*args,**kwargs):rset=base_execute(*args,**kwargs)ifrset.description:found_date=Falseforrow,rowdescinzip(rset,rset.description):forcellindex,(value,vtype)inenumerate(zip(row,rowdesc)):ifvtypein('Date','Datetime')andtype(value)isunicode:found_date=Truevalue=value.rsplit('.',1)[0]try:row[cellindex]=strptime(value,'%Y-%m-%d %H:%M:%S')exceptException:row[cellindex]=strptime(value,'%Y-%m-%d')ifvtype=='Time'andtype(value)isunicode:found_date=Truetry:row[cellindex]=strptime(value,'%H:%M:%S')exceptException:# DateTime used as Time?row[cellindex]=strptime(value,'%Y-%m-%d %H:%M:%S')ifvtype=='Interval'andtype(value)isint:found_date=Truerow[cellindex]=timedelta(0,value,0)# XXX value is in number of 
seconds?ifnotfound_date:breakreturnrsetreturnnew_executequerier.__class__.execute=wrap_execute(querier.__class__.execute)querier.__class__._devtools_sqlite_patched=TrueHANDLERS={}defregister_handler(handlerkls,overwrite=False):asserthandlerklsisnotNoneifoverwriteorhandlerkls.DRIVERnotinHANDLERS:HANDLERS[handlerkls.DRIVER]=handlerklselse:msg="%s: Handler already exists use overwrite if it's intended\n"\"(existing handler class is %r)"raiseValueError(msg%(handlerkls.DRIVER,HANDLERS[handlerkls.DRIVER]))register_handler(PostgresTestDataBaseHandler)register_handler(SQLiteTestDataBaseHandler)register_handler(SQLServerTestDataBaseHandler)classHCache(object):"""Handler cache object: store database handler for a given configuration. We only keep one repo in cache to prevent too much objects to stay alive (database handler holds a reference to a repository). As at the moment a new handler is created for each TestCase class and all test methods are executed sequentialy whithin this class, there should not have more cache miss that if we had a wider cache as once a Handler stop being used it won't be used again. 
"""def__init__(self):self.config=Noneself.handler=Nonedefget(self,config):ifconfigisself.config:returnself.handlerelse:returnNonedefset(self,config,handler):self.config=configself.handler=handlerHCACHE=HCache()# XXX a class method on Test ?defget_test_db_handler(config):handler=HCACHE.get(config)ifhandlerisnotNone:returnhandlersources=config.sources()driver=sources['system']['db-driver']key=(driver,config)handlerkls=HANDLERS.get(driver,None)ifhandlerklsisnotNone:handler=handlerkls(config)ifconfig.skip_db_create_and_restore:handler=NoCreateDropDatabaseHandler(handler)HCACHE.set(config,handler)returnhandlerelse:raiseValueError('no initialization function for driver %r'%driver)### compatibility layer ##############################################fromlogilab.common.deprecationimportdeprecated@deprecated("please use the new DatabaseHandler mecanism")definit_test_database(config=None,configdir='data',apphome=None):"""init a test database for a specific driver"""ifconfigisNone:config=TestServerConfiguration(apphome=apphome)handler=get_test_db_handler(config)handler.build_db_cache()returnhandler.get_repo_and_cnx()