[AnyEntity] rename __linkto attribute as cw_linkto to avoid name mangling which make overriding/monkey-patching unnecessarily harder
# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""Test tools for cubicweb"""from__future__importwith_statement__docformat__="restructuredtext en"importosimportsysimportloggingimportshutilimportpickleimportglobimportwarningsfromdatetimeimporttimedeltafromos.pathimport(abspath,join,exists,basename,dirname,normpath,split,isfile,isabs,splitext,isdir,expanduser)fromfunctoolsimportpartialimporthashlibfromlogilab.common.dateimportstrptimefromlogilab.common.decoratorsimportcached,clear_cachefromcubicwebimportCW_SOFTWARE_ROOT,ConfigurationError,schema,cwconfig,BadConnectionIdfromcubicweb.server.serverconfigimportServerConfigurationfromcubicweb.etwist.twconfigimportTwistedConfigurationcwconfig.CubicWebConfiguration.cls_adjust_sys_path()# db auto-population configuration #############################################SYSTEM_ENTITIES=(schema.SCHEMA_TYPES|schema.INTERNAL_TYPES|schema.WORKFLOW_TYPES|set(('CWGroup','CWUser',)))SYSTEM_RELATIONS=(schema.META_RTYPES|schema.WORKFLOW_RTYPES|schema.WORKFLOW_DEF_RTYPES|schema.SYSTEM_RTYPES|schema.SCHEMA_TYPES|set(('primary_email',# deducted from other relations)))# content validation configuration ############################################## validators are used to validate (XML, DTD, whatever) view's content# validators availables are :# 'dtd' : validates XML + declared DTD# 'xml' : guarantees XML is well formed# None : do not try to validate anything# {'vid': validator}VIEW_VALIDATORS={}# cubicweb test configuration ##################################################BASE_URL='http://testing.fr/cubicweb/'DEFAULT_SOURCES={'system':{'adapter':'native','db-encoding':'UTF-8',#'ISO-8859-1','db-user':u'admin','db-password':'gingkow','db-name':'tmpdb','db-driver':'sqlite','db-host':None,},'admin':{'login':u'admin','password':u'gingkow',},}defturn_repo_off(repo):""" Idea: this is less costly than a full re-creation of the repo object. off: * session are closed, * pools are closed * system source is shutdown """ifnotrepo._needs_refresh:forsessionidinlist(repo._sessions):warnings.warn('%s Open session found while turning repository off'%sessionid,RuntimeWarning)try:repo.close(sessionid)exceptBadConnectionId:#this is strange ? thread issue ?print'XXX unknown session',sessionidforpoolinrepo.pools:pool.close(True)repo.system_source.shutdown()repo._needs_refresh=Truerepo._has_started=Falsedefturn_repo_on(repo):"""Idea: this is less costly than a full re-creation of the repo object. on: * pools are connected * cache are cleared """ifrepo._needs_refresh:forpoolinrepo.pools:pool.reconnect()repo._type_source_cache={}repo._extid_cache={}repo.querier._rql_cache={}forsourceinrepo.sources:source.reset_caches()repo._needs_refresh=FalseclassTestServerConfiguration(ServerConfiguration):mode='test'read_instance_schema=Falseinit_repository=Truedef__init__(self,appid='data',apphome=None,log_threshold=logging.CRITICAL+10):# must be set before calling parent __init__ifapphomeisNone:ifexists(appid):apphome=abspath(appid)else:# cube testapphome=abspath('..')self._apphome=apphomeServerConfiguration.__init__(self,appid)self.init_log(log_threshold,force=True)# need this, usually triggered by cubicweb-ctlself.load_cwctl_plugins()# By default anonymous login are allow but some test need to deny of to# change the default user. Set it to None to prevent anonymous login.anonymous_credential=('anon','anon')defanonymous_user(self):ifnotself.anonymous_credential:returnNone,Nonereturnself.anonymous_credentialdefset_anonymous_allowed(self,allowed,anonuser='anon'):ifallowed:self.anonymous_credential=(anonuser,anonuser)else:self.anonymous_credential=None@propertydefapphome(self):returnself._apphomeappdatahome=apphomedefload_configuration(self):super(TestServerConfiguration,self).load_configuration()# no undo support in testsself.global_set_option('undo-support','')defmain_config_file(self):"""return instance's control configuration file"""returnjoin(self.apphome,'%s.conf'%self.name)defbootstrap_cubes(self):try:super(TestServerConfiguration,self).bootstrap_cubes()exceptIOError:# no cubesself.init_cubes(())sourcefile=Nonedefsources_file(self):"""define in subclasses self.sourcefile if necessary"""ifself.sourcefile:print'Reading sources from',self.sourcefilesourcefile=self.sourcefileifnotisabs(sourcefile):sourcefile=join(self.apphome,sourcefile)else:sourcefile=super(TestServerConfiguration,self).sources_file()returnsourcefiledefsources(self):"""By default, we run tests with the sqlite DB backend. One may use its own configuration by just creating a 'sources' file in the test directory from wich tests are launched or by specifying an alternative sources file using self.sourcefile. """sources=super(TestServerConfiguration,self).sources()ifnotsources:sources=DEFAULT_SOURCESif'admin'notinsources:sources['admin']=DEFAULT_SOURCES['admin']returnsources# web config methods needed here for cases when we use this config as a web# configdefinstance_md5_version(self):return''defdefault_base_url(self):returnBASE_URLclassBaseApptestConfiguration(TestServerConfiguration,TwistedConfiguration):repo_method='inmemory'name='all-in-one'# so it search for all-in-one.conf, not repository.confoptions=cwconfig.merge_options(TestServerConfiguration.options+TwistedConfiguration.options)cubicweb_appobject_path=TestServerConfiguration.cubicweb_appobject_path|TwistedConfiguration.cubicweb_appobject_pathcube_appobject_path=TestServerConfiguration.cube_appobject_path|TwistedConfiguration.cube_appobject_pathdefavailable_languages(self,*args):returnself.cw_languages()defpyro_enabled(self):# but export PYRO_MULTITHREAD=0 or you get problems with sqlite and# threadsreturnTrue# XXX merge with BaseApptestConfiguration ?classApptestConfiguration(BaseApptestConfiguration):def__init__(self,appid,apphome=None,log_threshold=logging.CRITICAL,sourcefile=None):BaseApptestConfiguration.__init__(self,appid,apphome,log_threshold=log_threshold)self.init_repository=sourcefileisNoneself.sourcefile=sourcefileclassRealDatabaseConfiguration(ApptestConfiguration):"""configuration class for tests to run on a real database. The intialization is done by specifying a source file path. Important note: init_test_database / reset_test_database steps are skipped. It's thus up to the test developer to implement setUp/tearDown accordingly. Example usage:: class MyTests(CubicWebTC): _config = RealDatabseConfiguration('myapp', sourcefile='/path/to/sources') def test_something(self): rset = self.execute('Any X WHERE X is CWUser') self.view('foaf', rset) """read_instance_schema=True# read schema from database# test database handling #######################################################DEFAULT_EMPTY_DB_ID='__default_empty_db__'classTestDataBaseHandler(object):DRIVER=Nonedb_cache={}explored_glob=set()def__init__(self,config):self.config=configself._repo=None# pure consistency checkassertself.system_source['db-driver']==self.DRIVERdef_ensure_test_backup_db_dir(self):"""Return path of directory for database backup. The function create it if necessary"""backupdir=join(self.config.apphome,'database')ifnotisdir(backupdir):os.makedirs(backupdir)returnbackupdirdefconfig_path(self,db_id):"""Path for config backup of a given database id"""returnself.absolute_backup_file(db_id,'config')defabsolute_backup_file(self,db_id,suffix):"""Path for config backup of a given database id"""dbname=self.dbname.replace('-','_')assert'.'notindb_idfilename='%s-%s.%s'%(dbname,db_id,suffix)returnjoin(self._ensure_test_backup_db_dir(),filename)defdb_cache_key(self,db_id,dbname=None):"""Build a database cache key for a db_id with the current config This key is meant to be used in the cls.db_cache mapping"""ifdbnameisNone:dbname=self.dbnamedbname=os.path.basename(dbname)dbname=dbname.replace('-','_')return(self.config.apphome,dbname,db_id)defbackup_database(self,db_id):"""Store the content of the current database as <db_id> The config used are also stored."""backup_data=self._backup_database(db_id)config_path=self.config_path(db_id)# XXX we dump a dict of the config# This is an experimental to help config dependant setup (like BFSS) to# be propertly restoredwithopen(config_path,'wb')asconf_file:conf_file.write(pickle.dumps(dict(self.config)))self.db_cache[self.db_cache_key(db_id)]=(backup_data,config_path)def_backup_database(self,db_id):"""Actual backup the current database. return a value to be stored in db_cache to allow restoration"""raiseNotImplementedError()defrestore_database(self,db_id):"""Restore a database. takes as argument value stored in db_cache by self._backup_database"""# XXX set a clearer error message ???backup_coordinates,config_path=self.db_cache[self.db_cache_key(db_id)]# reload the config used to create the database.config=pickle.loads(open(config_path,'rb').read())# shutdown repo before changing database contentifself._repoisnotNone:self._repo.turn_repo_off()self._restore_database(backup_coordinates,config)def_restore_database(self,backup_coordinates,config):"""Actual restore of the current database. Use the value tostored in db_cache as input """raiseNotImplementedError()defget_repo(self,startup=False):""" return Repository object on the current database. (turn the current repo object "on" if there is one or recreate one) if startup is True, server startup server hooks will be called if needed """ifself._repoisNone:self._repo=self._new_repo(self.config)repo=self._reporepo.turn_repo_on()ifstartupandnotrepo._has_started:repo.hm.call_hooks('server_startup',repo=repo)repo._has_started=Truereturnrepodef_new_repo(self,config):"""Factory method to create a new Repository Instance"""fromcubicweb.dbapiimportin_memory_repoconfig._cubes=Nonerepo=in_memory_repo(config)# extending Repository classrepo._has_started=Falserepo._needs_refresh=Falserepo.turn_repo_on=partial(turn_repo_on,repo)repo.turn_repo_off=partial(turn_repo_off,repo)returnrepodefget_cnx(self):"""return Connection object ont he current repository"""fromcubicweb.dbapiimportin_memory_cnxrepo=self.get_repo()sources=self.config.sources()login=unicode(sources['admin']['login'])password=sources['admin']['password']or'xxx'cnx=in_memory_cnx(repo,login,password=password)returncnxdefget_repo_and_cnx(self,db_id=DEFAULT_EMPTY_DB_ID):"""Reset database with the current db_id and return (repo, cnx) A database *MUST* have been build with the current <db_id> prior to call this method. See the ``build_db_cache`` method. The returned repository have it's startup hooks called and the connection is establised as admin."""self.restore_database(db_id)repo=self.get_repo(startup=True)cnx=self.get_cnx()returnrepo,cnx@propertydefsystem_source(self):sources=self.config.sources()returnsources['system']@propertydefdbname(self):returnself.system_source['db-name']definit_test_database():"""actual initialisation of the database"""raiseValueError('no initialization function for driver %r'%driver)defhas_cache(self,db_id):"""Check if a given database id exist in cb cache for the current config"""cache_glob=self.absolute_backup_file('*','*')ifcache_globnotinself.explored_glob:self.discover_cached_db()returnself.db_cache_key(db_id)inself.db_cachedefdiscover_cached_db(self):"""Search available db_if for the current config"""cache_glob=self.absolute_backup_file('*','*')directory=os.path.dirname(cache_glob)entries={}candidates=glob.glob(cache_glob)forfilepathincandidates:data=os.path.basename(filepath)# database backup are in the forms are <dbname>-<db_id>.<backtype>dbname,data=data.split('-',1)db_id,filetype=data.split('.',1)entries.setdefault((dbname,db_id),{})[filetype]=filepathfor(dbname,db_id),entryinentries.iteritems():# apply necessary transformation from the drivervalue=self.process_cache_entry(directory,dbname,db_id,entry)assert'config'inentryifvalueisnotNone:# None value means "not handled by this driver# XXX Ignored value are shadowed to other Handler if cache are common.key=self.db_cache_key(db_id,dbname=dbname)self.db_cache[key]=value,entry['config']self.explored_glob.add(cache_glob)defprocess_cache_entry(self,directory,dbname,db_id,entry):"""Transforms potential cache entry to proper backup coordinate entry argument is a "filetype" -> "filepath" mapping Return None if an entry should be ignored."""returnNonedefbuild_db_cache(self,test_db_id=DEFAULT_EMPTY_DB_ID,pre_setup_func=None):"""Build Database cache for ``test_db_id`` if a cache doesn't exist if ``test_db_id is DEFAULT_EMPTY_DB_ID`` self.init_test_database is called. otherwise, DEFAULT_EMPTY_DB_ID is build/restored and ``pre_setup_func`` to setup the database. This function backup any database it build"""ifself.has_cache(test_db_id):return#test_db_id, 'already in cache'iftest_db_idisDEFAULT_EMPTY_DB_ID:self.init_test_database()else:print'Building %s for database %s'%(test_db_id,self.dbname)self.build_db_cache(DEFAULT_EMPTY_DB_ID)self.restore_database(DEFAULT_EMPTY_DB_ID)repo=self.get_repo(startup=True)cnx=self.get_cnx()session=repo._sessions[cnx.sessionid]session.set_pool()_commit=session.commitdefalways_pooled_commit():_commit()session.set_pool()session.commit=always_pooled_commitpre_setup_func(session,self.config)session.commit()cnx.close()self.backup_database(test_db_id)### postgres test database handling ############################################classPostgresTestDataBaseHandler(TestDataBaseHandler):# XXX# XXX PostgresTestDataBaseHandler Have not been tested at all.# XXXDRIVER='postgres'@property@cacheddefhelper(self):fromlogilab.databaseimportget_db_helperreturnget_db_helper('postgres')@property@cacheddefdbcnx(self):fromcubicweb.server.serverctlimport_db_sys_cnxreturn_db_sys_cnx(self.system_source,'CREATE DATABASE and / or USER',interactive=False)@property@cacheddefcursor(self):returnself.dbcnx.cursor()definit_test_database(self):"""initialize a fresh postgresql databse used for testing purpose"""fromcubicweb.serverimportinit_repositoryfromcubicweb.server.serverctlimportsystem_source_cnx,createdb# connect on the dbms system base to create our basetry:self._drop(self.dbname)createdb(self.helper,self.system_source,self.dbcnx,self.cursor)self.dbcnx.commit()cnx=system_source_cnx(self.system_source,special_privs='LANGUAGE C',interactive=False)templcursor=cnx.cursor()try:# XXX factorize with db-create codeself.helper.init_fti_extensions(templcursor)# install plpythonu/plpgsql language if not installed by the cubelangs=sys.platform=='win32'and('plpgsql',)or('plpythonu','plpgsql')forextlanginlangs:self.helper.create_language(templcursor,extlang)cnx.commit()finally:templcursor.close()cnx.close()init_repository(self.config,interactive=False)except:self.dbcnx.rollback()print>>sys.stderr,'building',self.dbname,'failed'#self._drop(self.dbname)raisedefhelper_clear_cache(self):self.dbcnx.commit()self.dbcnx.close()clear_cache(self,'dbcnx')clear_cache(self,'helper')clear_cache(self,'cursor')def__del__(self):self.helper_clear_cache()@propertydef_config_id(self):returnhashlib.sha1(self.config.apphome).hexdigest()[:10]def_backup_name(self,db_id):# merge me with parentbackup_name='_'.join(('cache',self._config_id,self.dbname,db_id))returnbackup_name.lower()def_drop(self,db_name):ifdb_nameinself.helper.list_databases(self.cursor):#print 'dropping overwritted database:', db_nameself.cursor.execute('DROP DATABASE %s'%db_name)self.dbcnx.commit()def_backup_database(self,db_id):"""Actual backup the current database. return a value to be stored in db_cache to allow restoration"""fromcubicweb.server.serverctlimportcreatedborig_name=self.system_source['db-name']try:backup_name=self._backup_name(db_id)#print 'storing postgres backup as', backup_nameself._drop(backup_name)self.system_source['db-name']=backup_namecreatedb(self.helper,self.system_source,self.dbcnx,self.cursor,template=orig_name)self.dbcnx.commit()returnbackup_namefinally:self.system_source['db-name']=orig_namedef_restore_database(self,backup_coordinates,config):fromcubicweb.server.serverctlimportcreatedb"""Actual restore of the current database. Use the value tostored in db_cache as input """#print 'restoring postgrest backup from', backup_coordinatesself._drop(self.dbname)createdb(self.helper,self.system_source,self.dbcnx,self.cursor,template=backup_coordinates)self.dbcnx.commit()### sqlserver2005 test database handling #######################################classSQLServerTestDataBaseHandler(TestDataBaseHandler):DRIVER='sqlserver'# XXX complete medefinit_test_database(self):"""initialize a fresh sqlserver databse used for testing purpose"""ifself.config.init_repository:fromcubicweb.serverimportinit_repositoryinit_repository(config,interactive=False,drop=True)### sqlite test database handling ##############################################classSQLiteTestDataBaseHandler(TestDataBaseHandler):DRIVER='sqlite'@staticmethoddef_cleanup_database(dbfile):try:os.remove(dbfile)os.remove('%s-journal'%dbfile)exceptOSError:passdefabsolute_dbfile(self):"""absolute path of current database file"""dbfile=join(self._ensure_test_backup_db_dir(),self.config.sources()['system']['db-name'])self.config.sources()['system']['db-name']=dbfilereturndbfiledefprocess_cache_entry(self,directory,dbname,db_id,entry):returnentry.get('sqlite')def_backup_database(self,db_id=DEFAULT_EMPTY_DB_ID):# XXX remove database file if it exists ???dbfile=self.absolute_dbfile()backup_file=self.absolute_backup_file(db_id,'sqlite')shutil.copy(dbfile,backup_file)# Usefull to debug WHO write a database# backup_stack = self.absolute_backup_file(db_id, '.stack')#with open(backup_stack, 'w') as backup_stack_file:# import traceback# traceback.print_stack(file=backup_stack_file)returnbackup_filedef_new_repo(self,config):repo=super(SQLiteTestDataBaseHandler,self)._new_repo(config)install_sqlite_patch(repo.querier)returnrepodef_restore_database(self,backup_coordinates,_config):# remove database file if it exists ?dbfile=self.absolute_dbfile()self._cleanup_database(dbfile)#print 'resto from', backup_coordinatesshutil.copy(backup_coordinates,dbfile)repo=self.get_repo()definit_test_database(self):"""initialize a fresh sqlite databse used for testing purpose"""# initialize the databasefromcubicweb.serverimportinit_repositoryself._cleanup_database(self.absolute_dbfile())init_repository(self.config,interactive=False)definstall_sqlite_patch(querier):"""This patch hotfixes the following sqlite bug : - http://www.sqlite.org/cvstrac/tktview?tn=1327,33 (some dates are returned as strings rather thant date objects) """ifhasattr(querier.__class__,'_devtools_sqlite_patched'):return# already monkey patcheddefwrap_execute(base_execute):defnew_execute(*args,**kwargs):rset=base_execute(*args,**kwargs)ifrset.description:found_date=Falseforrow,rowdescinzip(rset,rset.description):forcellindex,(value,vtype)inenumerate(zip(row,rowdesc)):ifvtypein('Date','Datetime')andtype(value)isunicode:found_date=Truevalue=value.rsplit('.',1)[0]try:row[cellindex]=strptime(value,'%Y-%m-%d %H:%M:%S')except:row[cellindex]=strptime(value,'%Y-%m-%d')ifvtype=='Time'andtype(value)isunicode:found_date=Truetry:row[cellindex]=strptime(value,'%H:%M:%S')except:# DateTime used as Time?row[cellindex]=strptime(value,'%Y-%m-%d %H:%M:%S')ifvtype=='Interval'andtype(value)isint:found_date=Truerow[cellindex]=timedelta(0,value,0)# XXX value is in number of seconds?ifnotfound_date:breakreturnrsetreturnnew_executequerier.__class__.execute=wrap_execute(querier.__class__.execute)querier.__class__._devtools_sqlite_patched=TrueHANDLERS={}defregister_handler(handlerkls,overwrite=False):asserthandlerklsisnotNoneifoverwriteorhandlerkls.DRIVERnotinHANDLERS:HANDLERS[handlerkls.DRIVER]=handlerklselse:msg="%s: Handler already exists use overwrite if it's intended\n"\"(existing handler class is %r)"raiseValueError(msg%(handlerkls.DRIVER,HANDLERS[handlerkls.DRIVER]))register_handler(PostgresTestDataBaseHandler)register_handler(SQLiteTestDataBaseHandler)register_handler(SQLServerTestDataBaseHandler)classHCache(object):"""Handler cache object: store database handler for a given configuration. We only keep one repo in cache to prevent too much objects to stay alive (database handler holds a reference to a repository). As at the moment a new handler is created for each TestCase class and all test methods are executed sequentialy whithin this class, there should not have more cache miss that if we had a wider cache as once a Handler stop being used it won't be used again. """def__init__(self):self.config=Noneself.handler=Nonedefget(self,config):ifconfigisself.config:returnself.handlerelse:returnNonedefset(self,config,handler):self.config=configself.handler=handlerHCACHE=HCache()# XXX a class method on Test ?defget_test_db_handler(config):handler=HCACHE.get(config)ifhandlerisnotNone:returnhandlersources=config.sources()driver=sources['system']['db-driver']key=(driver,config)handlerkls=HANDLERS.get(driver,None)ifhandlerklsisnotNone:handler=handlerkls(config)HCACHE.set(config,handler)returnhandlerelse:raiseValueError('no initialization function for driver %r'%driver)### compatibility layer ##############################################fromlogilab.common.deprecationimportdeprecated@deprecated("please use the new DatabaseHandler mecanism")definit_test_database(config=None,configdir='data',apphome=None):"""init a test database for a specific driver"""ifconfigisNone:config=TestServerConfiguration(apphome=apphome)handler=get_test_db_handler(config)handler.build_db_cache()returnhandler.get_repo_and_cnx()