--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/devtools/__init__.py Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,915 @@
+# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+"""Test tools for cubicweb"""
+from __future__ import print_function
+
+__docformat__ = "restructuredtext en"
+
+import os
+import sys
+import errno
+import logging
+import shutil
+import glob
+import subprocess
+import warnings
+import tempfile
+import getpass
+from hashlib import sha1 # pylint: disable=E0611
+from datetime import timedelta
+from os.path import abspath, join, exists, split, isabs, isdir
+from functools import partial
+
+from six import text_type
+from six.moves import cPickle as pickle
+
+from logilab.common.date import strptime
+from logilab.common.decorators import cached, clear_cache
+
+from cubicweb import ExecutionError, BadConnectionId
+from cubicweb import schema, cwconfig
+from cubicweb.server.serverconfig import ServerConfiguration
+from cubicweb.etwist.twconfig import WebConfigurationBase
+
+cwconfig.CubicWebConfiguration.cls_adjust_sys_path()
+
+# db auto-population configuration #############################################
+
+SYSTEM_ENTITIES = (schema.SCHEMA_TYPES
+ | schema.INTERNAL_TYPES
+ | schema.WORKFLOW_TYPES
+ | set(('CWGroup', 'CWUser',))
+ )
+SYSTEM_RELATIONS = (schema.META_RTYPES
+ | schema.WORKFLOW_RTYPES
+ | schema.WORKFLOW_DEF_RTYPES
+ | schema.SYSTEM_RTYPES
+ | schema.SCHEMA_TYPES
+ | set(('primary_email', # deducted from other relations
+ ))
+ )
+
+# content validation configuration #############################################
+
+# validators are used to validate (XML, DTD, whatever) view's content
+# validators availables are :
+# 'dtd' : validates XML + declared DTD
+# 'xml' : guarantees XML is well formed
+# None : do not try to validate anything
+
+# {'vid': validator}
+VIEW_VALIDATORS = {}
+
+
+# cubicweb test configuration ##################################################
+
+BASE_URL = 'http://testing.fr/cubicweb/'
+
+DEFAULT_SOURCES = {'system': {'adapter' : 'native',
+ 'db-encoding' : 'UTF-8', #'ISO-8859-1',
+ 'db-user' : u'admin',
+ 'db-password' : 'gingkow',
+ 'db-name' : 'tmpdb',
+ 'db-driver' : 'sqlite',
+ 'db-host' : None,
+ },
+ 'admin' : {'login': u'admin',
+ 'password': u'gingkow',
+ },
+ }
+DEFAULT_PSQL_SOURCES = DEFAULT_SOURCES.copy()
+DEFAULT_PSQL_SOURCES['system'] = DEFAULT_SOURCES['system'].copy()
+DEFAULT_PSQL_SOURCES['system']['db-driver'] = 'postgres'
+DEFAULT_PSQL_SOURCES['system']['db-user'] = text_type(getpass.getuser())
+DEFAULT_PSQL_SOURCES['system']['db-password'] = None
+
+def turn_repo_off(repo):
+ """ Idea: this is less costly than a full re-creation of the repo object.
+ off:
+ * session are closed,
+ * cnxsets are closed
+ * system source is shutdown
+ """
+ if not repo._needs_refresh:
+ for sessionid in list(repo._sessions):
+ warnings.warn('%s Open session found while turning repository off'
+ %sessionid, RuntimeWarning)
+ try:
+ repo.close(sessionid)
+ except BadConnectionId: #this is strange ? thread issue ?
+ print('XXX unknown session', sessionid)
+ for cnxset in repo.cnxsets:
+ cnxset.close(True)
+ repo.system_source.shutdown()
+ repo._needs_refresh = True
+ repo._has_started = False
+
+
+def turn_repo_on(repo):
+ """Idea: this is less costly than a full re-creation of the repo object.
+ on:
+ * cnxsets are connected
+ * cache are cleared
+ """
+ if repo._needs_refresh:
+ for cnxset in repo.cnxsets:
+ cnxset.reconnect()
+ repo._type_source_cache = {}
+ repo._extid_cache = {}
+ repo.querier._rql_cache = {}
+ repo.system_source.reset_caches()
+ repo._needs_refresh = False
+
+
+class TestServerConfiguration(ServerConfiguration):
+ mode = 'test'
+ read_instance_schema = False
+ init_repository = True
+ skip_db_create_and_restore = False
+ default_sources = DEFAULT_SOURCES
+
+ def __init__(self, appid='data', apphome=None, log_threshold=logging.CRITICAL+10):
+ # must be set before calling parent __init__
+ if apphome is None:
+ if exists(appid):
+ apphome = abspath(appid)
+ else: # cube test
+ apphome = abspath('..')
+ self._apphome = apphome
+ super(TestServerConfiguration, self).__init__(appid)
+ self.init_log(log_threshold, force=True)
+ # need this, usually triggered by cubicweb-ctl
+ self.load_cwctl_plugins()
+
+ # By default anonymous login are allow but some test need to deny of to
+ # change the default user. Set it to None to prevent anonymous login.
+ anonymous_credential = ('anon', 'anon')
+
+ def anonymous_user(self):
+ if not self.anonymous_credential:
+ return None, None
+ return self.anonymous_credential
+
+ def set_anonymous_allowed(self, allowed, anonuser=u'anon'):
+ if allowed:
+ self.anonymous_credential = (anonuser, anonuser)
+ else:
+ self.anonymous_credential = None
+
+ @property
+ def apphome(self):
+ return self._apphome
+ appdatahome = apphome
+
+ def load_configuration(self, **kw):
+ super(TestServerConfiguration, self).load_configuration(**kw)
+ # no undo support in tests
+ self.global_set_option('undo-enabled', 'n')
+
+ def main_config_file(self):
+ """return instance's control configuration file"""
+ return join(self.apphome, '%s.conf' % self.name)
+
+ def bootstrap_cubes(self):
+ try:
+ super(TestServerConfiguration, self).bootstrap_cubes()
+ except IOError:
+ # no cubes
+ self.init_cubes( () )
+
+ sourcefile = None
+ def sources_file(self):
+ """define in subclasses self.sourcefile if necessary"""
+ if self.sourcefile:
+ print('Reading sources from', self.sourcefile)
+ sourcefile = self.sourcefile
+ if not isabs(sourcefile):
+ sourcefile = join(self.apphome, sourcefile)
+ else:
+ sourcefile = super(TestServerConfiguration, self).sources_file()
+ return sourcefile
+
+ def read_sources_file(self):
+ """By default, we run tests with the sqlite DB backend. One may use its
+ own configuration by just creating a 'sources' file in the test
+ directory from which tests are launched or by specifying an alternative
+ sources file using self.sourcefile.
+ """
+ try:
+ sources = super(TestServerConfiguration, self).read_sources_file()
+ except ExecutionError:
+ sources = {}
+ if not sources:
+ sources = self.default_sources
+ if 'admin' not in sources:
+ sources['admin'] = self.default_sources['admin']
+ return sources
+
+ # web config methods needed here for cases when we use this config as a web
+ # config
+
+ def default_base_url(self):
+ return BASE_URL
+
+
+class BaseApptestConfiguration(TestServerConfiguration, WebConfigurationBase):
+ name = 'all-in-one' # so it search for all-in-one.conf, not repository.conf
+ options = cwconfig.merge_options(TestServerConfiguration.options
+ + WebConfigurationBase.options)
+ cubicweb_appobject_path = TestServerConfiguration.cubicweb_appobject_path | WebConfigurationBase.cubicweb_appobject_path
+ cube_appobject_path = TestServerConfiguration.cube_appobject_path | WebConfigurationBase.cube_appobject_path
+
+ def available_languages(self, *args):
+ return self.cw_languages()
+
+
+# XXX merge with BaseApptestConfiguration ?
+class ApptestConfiguration(BaseApptestConfiguration):
+ # `skip_db_create_and_restore` controls wether or not the test database
+ # should be created / backuped / restored. If set to True, those
+ # steps are completely skipped, the database is used as is and is
+ # considered initialized
+ skip_db_create_and_restore = False
+
+ def __init__(self, appid, apphome=None,
+ log_threshold=logging.WARNING, sourcefile=None):
+ BaseApptestConfiguration.__init__(self, appid, apphome,
+ log_threshold=log_threshold)
+ self.init_repository = sourcefile is None
+ self.sourcefile = sourcefile
+
+
+class PostgresApptestConfiguration(ApptestConfiguration):
+ default_sources = DEFAULT_PSQL_SOURCES
+
+
+class RealDatabaseConfiguration(ApptestConfiguration):
+ """configuration class for tests to run on a real database.
+
+ The intialization is done by specifying a source file path.
+
+ Important note: init_test_database / reset_test_database steps are
+ skipped. It's thus up to the test developer to implement setUp/tearDown
+ accordingly.
+
+ Example usage::
+
+ class MyTests(CubicWebTC):
+ _config = RealDatabaseConfiguration('myapp',
+ sourcefile='/path/to/sources')
+
+ def test_something(self):
+ with self.admin_access.web_request() as req:
+ rset = req.execute('Any X WHERE X is CWUser')
+ self.view('foaf', rset, req=req)
+
+ """
+ skip_db_create_and_restore = True
+ read_instance_schema = True # read schema from database
+
+# test database handling #######################################################
+
+DEFAULT_EMPTY_DB_ID = '__default_empty_db__'
+
+class TestDataBaseHandler(object):
+ DRIVER = None
+
+ db_cache = {}
+ explored_glob = set()
+
+ def __init__(self, config, init_config=None):
+ self.config = config
+ self.init_config = init_config
+ self._repo = None
+ # pure consistency check
+ assert self.system_source['db-driver'] == self.DRIVER
+
+ # some handlers want to store info here, avoid a warning
+ from cubicweb.server.sources.native import NativeSQLSource
+ NativeSQLSource.options += (
+ ('global-db-name',
+ {'type': 'string', 'help': 'for internal use only'
+ }),
+ )
+
+ def _ensure_test_backup_db_dir(self):
+ """Return path of directory for database backup.
+
+ The function create it if necessary"""
+ backupdir = join(self.config.apphome, 'database')
+ try:
+ os.makedirs(backupdir)
+ except:
+ if not isdir(backupdir):
+ raise
+ return backupdir
+
+ def config_path(self, db_id):
+ """Path for config backup of a given database id"""
+ return self.absolute_backup_file(db_id, 'config')
+
+ def absolute_backup_file(self, db_id, suffix):
+ """Path for config backup of a given database id"""
+ # in case db name is an absolute path, we don't want to replace anything
+ # in parent directories
+ directory, basename = split(self.dbname)
+ dbname = basename.replace('-', '_')
+ assert '.' not in db_id
+ filename = join(directory, '%s-%s.%s' % (dbname, db_id, suffix))
+ return join(self._ensure_test_backup_db_dir(), filename)
+
+ def db_cache_key(self, db_id, dbname=None):
+ """Build a database cache key for a db_id with the current config
+
+ This key is meant to be used in the cls.db_cache mapping"""
+ if dbname is None:
+ dbname = self.dbname
+ dbname = os.path.basename(dbname)
+ dbname = dbname.replace('-', '_')
+ return (self.config.apphome, dbname, db_id)
+
+ def backup_database(self, db_id):
+ """Store the content of the current database as <db_id>
+
+ The config used are also stored."""
+ backup_data = self._backup_database(db_id)
+ config_path = self.config_path(db_id)
+ # XXX we dump a dict of the config
+ # This is an experimental to help config dependant setup (like BFSS) to
+ # be propertly restored
+ with tempfile.NamedTemporaryFile(dir=os.path.dirname(config_path), delete=False) as conf_file:
+ conf_file.write(pickle.dumps(dict(self.config)))
+ os.rename(conf_file.name, config_path)
+ self.db_cache[self.db_cache_key(db_id)] = (backup_data, config_path)
+
+ def _backup_database(self, db_id):
+ """Actual backup the current database.
+
+ return a value to be stored in db_cache to allow restoration"""
+ raise NotImplementedError()
+
+ def restore_database(self, db_id):
+ """Restore a database.
+
+ takes as argument value stored in db_cache by self._backup_database"""
+ # XXX set a clearer error message ???
+ backup_coordinates, config_path = self.db_cache[self.db_cache_key(db_id)]
+ # reload the config used to create the database.
+ with open(config_path, 'rb') as f:
+ config = pickle.load(f)
+ # shutdown repo before changing database content
+ if self._repo is not None:
+ self._repo.turn_repo_off()
+ self._restore_database(backup_coordinates, config)
+
+ def _restore_database(self, backup_coordinates, config):
+ """Actual restore of the current database.
+
+ Use the value stored in db_cache as input """
+ raise NotImplementedError()
+
+ def get_repo(self, startup=False):
+ """ return Repository object on the current database.
+
+ (turn the current repo object "on" if there is one or recreate one)
+ if startup is True, server startup server hooks will be called if needed
+ """
+ if self._repo is None:
+ self._repo = self._new_repo(self.config)
+ # config has now been bootstrapped, call init_config if specified
+ if self.init_config is not None:
+ self.init_config(self.config)
+ repo = self._repo
+ repo.turn_repo_on()
+ if startup and not repo._has_started:
+ repo.hm.call_hooks('server_startup', repo=repo)
+ repo._has_started = True
+ return repo
+
+ def _new_repo(self, config):
+ """Factory method to create a new Repository Instance"""
+ config._cubes = None
+ repo = config.repository()
+ config.repository = lambda x=None: repo
+ # extending Repository class
+ repo._has_started = False
+ repo._needs_refresh = False
+ repo.turn_repo_on = partial(turn_repo_on, repo)
+ repo.turn_repo_off = partial(turn_repo_off, repo)
+ return repo
+
+ def get_cnx(self):
+ """return Connection object on the current repository"""
+ from cubicweb.repoapi import connect
+ repo = self.get_repo()
+ sources = self.config.read_sources_file()
+ login = text_type(sources['admin']['login'])
+ password = sources['admin']['password'] or 'xxx'
+ cnx = connect(repo, login, password=password)
+ return cnx
+
+ def get_repo_and_cnx(self, db_id=DEFAULT_EMPTY_DB_ID):
+ """Reset database with the current db_id and return (repo, cnx)
+
+ A database *MUST* have been build with the current <db_id> prior to
+ call this method. See the ``build_db_cache`` method. The returned
+ repository have it's startup hooks called and the connection is
+ establised as admin."""
+
+ self.restore_database(db_id)
+ repo = self.get_repo(startup=True)
+ cnx = self.get_cnx()
+ return repo, cnx
+
+ @property
+ def system_source(self):
+ return self.config.system_source_config
+
+ @property
+ def dbname(self):
+ return self.system_source['db-name']
+
+ def init_test_database(self):
+ """actual initialisation of the database"""
+ raise ValueError('no initialization function for driver %r' % self.DRIVER)
+
+ def has_cache(self, db_id):
+ """Check if a given database id exist in cb cache for the current config"""
+ cache_glob = self.absolute_backup_file('*', '*')
+ if cache_glob not in self.explored_glob:
+ self.discover_cached_db()
+ return self.db_cache_key(db_id) in self.db_cache
+
+ def discover_cached_db(self):
+ """Search available db_if for the current config"""
+ cache_glob = self.absolute_backup_file('*', '*')
+ directory = os.path.dirname(cache_glob)
+ entries={}
+ candidates = glob.glob(cache_glob)
+ for filepath in candidates:
+ data = os.path.basename(filepath)
+ # database backup are in the forms are <dbname>-<db_id>.<backtype>
+ dbname, data = data.split('-', 1)
+ db_id, filetype = data.split('.', 1)
+ entries.setdefault((dbname, db_id), {})[filetype] = filepath
+ for (dbname, db_id), entry in entries.items():
+ # apply necessary transformation from the driver
+ value = self.process_cache_entry(directory, dbname, db_id, entry)
+ assert 'config' in entry
+ if value is not None: # None value means "not handled by this driver
+ # XXX Ignored value are shadowed to other Handler if cache are common.
+ key = self.db_cache_key(db_id, dbname=dbname)
+ self.db_cache[key] = value, entry['config']
+ self.explored_glob.add(cache_glob)
+
+ def process_cache_entry(self, directory, dbname, db_id, entry):
+ """Transforms potential cache entry to proper backup coordinate
+
+ entry argument is a "filetype" -> "filepath" mapping
+ Return None if an entry should be ignored."""
+ return None
+
+ def build_db_cache(self, test_db_id=DEFAULT_EMPTY_DB_ID, pre_setup_func=None):
+ """Build Database cache for ``test_db_id`` if a cache doesn't exist
+
+ if ``test_db_id is DEFAULT_EMPTY_DB_ID`` self.init_test_database is
+ called. otherwise, DEFAULT_EMPTY_DB_ID is build/restored and
+ ``pre_setup_func`` to setup the database.
+
+ This function backup any database it build"""
+ if self.has_cache(test_db_id):
+ return #test_db_id, 'already in cache'
+ if test_db_id is DEFAULT_EMPTY_DB_ID:
+ self.init_test_database()
+ else:
+ print('Building %s for database %s' % (test_db_id, self.dbname))
+ self.build_db_cache(DEFAULT_EMPTY_DB_ID)
+ self.restore_database(DEFAULT_EMPTY_DB_ID)
+ repo = self.get_repo(startup=True)
+ cnx = self.get_cnx()
+ with cnx:
+ pre_setup_func(cnx, self.config)
+ cnx.commit()
+ self.backup_database(test_db_id)
+
+
+class NoCreateDropDatabaseHandler(TestDataBaseHandler):
+ """This handler is used if config.skip_db_create_and_restore is True
+
+ This is typically the case with RealDBConfig. In that case,
+ we explicitely want to skip init / backup / restore phases.
+
+ This handler redefines the three corresponding methods and delegates
+ to original handler for any other method / attribute
+ """
+
+ def __init__(self, base_handler):
+ self.base_handler = base_handler
+
+ # override init / backup / restore methods
+ def init_test_database(self):
+ pass
+
+ def backup_database(self, db_id):
+ pass
+
+ def restore_database(self, db_id):
+ pass
+
+ # delegate to original handler in all other cases
+ def __getattr__(self, attrname):
+ return getattr(self.base_handler, attrname)
+
+
+### postgres test database handling ############################################
+
+def startpgcluster(pyfile):
+ """Start a postgresql cluster next to pyfile"""
+ datadir = join(os.path.dirname(pyfile), 'data', 'database',
+ 'pgdb-%s' % os.path.splitext(os.path.basename(pyfile))[0])
+ if not exists(datadir):
+ try:
+ subprocess.check_call(['initdb', '-D', datadir, '-E', 'utf-8', '--locale=C'])
+
+ except OSError as err:
+ if err.errno == errno.ENOENT:
+ raise OSError('"initdb" could not be found. '
+ 'You should add the postgresql bin folder to your PATH '
+ '(/usr/lib/postgresql/9.1/bin for example).')
+ raise
+ datadir = os.path.abspath(datadir)
+ pgport = '5432'
+ env = os.environ.copy()
+ sockdir = tempfile.mkdtemp(prefix='cwpg')
+ DEFAULT_PSQL_SOURCES['system']['db-host'] = sockdir
+ DEFAULT_PSQL_SOURCES['system']['db-port'] = pgport
+ options = '-h "" -k %s -p %s' % (sockdir, pgport)
+ options += ' -c fsync=off -c full_page_writes=off'
+ options += ' -c synchronous_commit=off'
+ try:
+ subprocess.check_call(['pg_ctl', 'start', '-w', '-D', datadir,
+ '-o', options],
+ env=env)
+ except OSError as err:
+ try:
+ os.rmdir(sockdir)
+ except OSError:
+ pass
+ if err.errno == errno.ENOENT:
+ raise OSError('"pg_ctl" could not be found. '
+ 'You should add the postgresql bin folder to your PATH '
+ '(/usr/lib/postgresql/9.1/bin for example).')
+ raise
+
+
+def stoppgcluster(pyfile):
+ """Kill the postgresql cluster running next to pyfile"""
+ datadir = join(os.path.dirname(pyfile), 'data', 'database',
+ 'pgdb-%s' % os.path.splitext(os.path.basename(pyfile))[0])
+ subprocess.call(['pg_ctl', 'stop', '-D', datadir, '-m', 'fast'])
+ try:
+ os.rmdir(DEFAULT_PSQL_SOURCES['system']['db-host'])
+ except OSError:
+ pass
+
+
+class PostgresTestDataBaseHandler(TestDataBaseHandler):
+ DRIVER = 'postgres'
+
+ # Separate db_cache for PG databases, to avoid collisions with sqlite dbs
+ db_cache = {}
+ explored_glob = set()
+
+ __CTL = set()
+
+ def __init__(self, *args, **kwargs):
+ super(PostgresTestDataBaseHandler, self).__init__(*args, **kwargs)
+ if 'global-db-name' not in self.system_source:
+ self.system_source['global-db-name'] = self.system_source['db-name']
+ self.system_source['db-name'] = self.system_source['db-name'] + str(os.getpid())
+
+ @property
+ @cached
+ def helper(self):
+ from logilab.database import get_db_helper
+ return get_db_helper('postgres')
+
+ @property
+ def dbname(self):
+ return self.system_source['global-db-name']
+
+ @property
+ def dbcnx(self):
+ try:
+ return self._cnx
+ except AttributeError:
+ from cubicweb.server.serverctl import _db_sys_cnx
+ try:
+ self._cnx = _db_sys_cnx(
+ self.system_source, 'CREATE DATABASE and / or USER',
+ interactive=False)
+ return self._cnx
+ except Exception:
+ self._cnx = None
+ raise
+
+ @property
+ @cached
+ def cursor(self):
+ return self.dbcnx.cursor()
+
+ def process_cache_entry(self, directory, dbname, db_id, entry):
+ backup_name = self._backup_name(db_id)
+ if backup_name in self.helper.list_databases(self.cursor):
+ return backup_name
+ return None
+
+ def has_cache(self, db_id):
+ backup_name = self._backup_name(db_id)
+ return (super(PostgresTestDataBaseHandler, self).has_cache(db_id)
+ and backup_name in self.helper.list_databases(self.cursor))
+
+ def init_test_database(self):
+ """initialize a fresh postgresql database used for testing purpose"""
+ from cubicweb.server import init_repository
+ from cubicweb.server.serverctl import system_source_cnx, createdb
+ # connect on the dbms system base to create our base
+ try:
+ self._drop(self.system_source['db-name'])
+ createdb(self.helper, self.system_source, self.dbcnx, self.cursor)
+ self.dbcnx.commit()
+ cnx = system_source_cnx(self.system_source, special_privs='LANGUAGE C',
+ interactive=False)
+ templcursor = cnx.cursor()
+ try:
+ # XXX factorize with db-create code
+ self.helper.init_fti_extensions(templcursor)
+ # install plpythonu/plpgsql language if not installed by the cube
+ langs = sys.platform == 'win32' and ('plpgsql',) or ('plpythonu', 'plpgsql')
+ for extlang in langs:
+ self.helper.create_language(templcursor, extlang)
+ cnx.commit()
+ finally:
+ templcursor.close()
+ cnx.close()
+ init_repository(self.config, interactive=False,
+ init_config=self.init_config)
+ except BaseException:
+ if self.dbcnx is not None:
+ self.dbcnx.rollback()
+ sys.stderr.write('building %s failed\n' % self.dbname)
+ #self._drop(self.dbname)
+ raise
+
+ def helper_clear_cache(self):
+ if self.dbcnx is not None:
+ self.dbcnx.commit()
+ self.dbcnx.close()
+ del self._cnx
+ clear_cache(self, 'cursor')
+ clear_cache(self, 'helper')
+
+ def __del__(self):
+ self.helper_clear_cache()
+
+ @property
+ def _config_id(self):
+ return sha1(self.config.apphome.encode('utf-8')).hexdigest()[:10]
+
+ def _backup_name(self, db_id): # merge me with parent
+ backup_name = '_'.join(('cache', self._config_id, self.dbname, db_id))
+ return backup_name.lower()
+
+ def _drop(self, db_name):
+ if db_name in self.helper.list_databases(self.cursor):
+ self.cursor.execute('DROP DATABASE %s' % db_name)
+ self.dbcnx.commit()
+
+ def _backup_database(self, db_id):
+ """Actual backup the current database.
+
+ return a value to be stored in db_cache to allow restoration
+ """
+ from cubicweb.server.serverctl import createdb
+ orig_name = self.system_source['db-name']
+ try:
+ backup_name = self._backup_name(db_id)
+ self._drop(backup_name)
+ self.system_source['db-name'] = backup_name
+ if self._repo:
+ self._repo.turn_repo_off()
+ try:
+ createdb(self.helper, self.system_source, self.dbcnx, self.cursor, template=orig_name)
+ self.dbcnx.commit()
+ finally:
+ if self._repo:
+ self._repo.turn_repo_on()
+ return backup_name
+ finally:
+ self.system_source['db-name'] = orig_name
+
+ def _restore_database(self, backup_coordinates, config):
+ from cubicweb.server.serverctl import createdb
+ """Actual restore of the current database.
+
+ Use the value tostored in db_cache as input """
+ self._drop(self.system_source['db-name'])
+ createdb(self.helper, self.system_source, self.dbcnx, self.cursor,
+ template=backup_coordinates)
+ self.dbcnx.commit()
+
+
+
+### sqlserver2005 test database handling #######################################
+
+class SQLServerTestDataBaseHandler(TestDataBaseHandler):
+ DRIVER = 'sqlserver'
+
+ # XXX complete me
+
+ def init_test_database(self):
+ """initialize a fresh sqlserver databse used for testing purpose"""
+ if self.config.init_repository:
+ from cubicweb.server import init_repository
+ init_repository(self.config, interactive=False, drop=True,
+ init_config=self.init_config)
+
+### sqlite test database handling ##############################################
+
+class SQLiteTestDataBaseHandler(TestDataBaseHandler):
+ DRIVER = 'sqlite'
+
+ __TMPDB = set()
+
+ @classmethod
+ def _cleanup_all_tmpdb(cls):
+ for dbpath in cls.__TMPDB:
+ cls._cleanup_database(dbpath)
+
+
+
+ def __init__(self, *args, **kwargs):
+ super(SQLiteTestDataBaseHandler, self).__init__(*args, **kwargs)
+ # use a dedicated base for each process.
+ if 'global-db-name' not in self.system_source:
+ self.system_source['global-db-name'] = self.system_source['db-name']
+ process_db = self.system_source['db-name'] + str(os.getpid())
+ self.system_source['db-name'] = process_db
+ process_db = self.absolute_dbfile() # update db-name to absolute path
+ self.__TMPDB.add(process_db)
+
+ @staticmethod
+ def _cleanup_database(dbfile):
+ try:
+ os.remove(dbfile)
+ os.remove('%s-journal' % dbfile)
+ except OSError:
+ pass
+
+ @property
+ def dbname(self):
+ return self.system_source['global-db-name']
+
+ def absolute_dbfile(self):
+ """absolute path of current database file"""
+ dbfile = join(self._ensure_test_backup_db_dir(),
+ self.system_source['db-name'])
+ self.system_source['db-name'] = dbfile
+ return dbfile
+
+ def process_cache_entry(self, directory, dbname, db_id, entry):
+ return entry.get('sqlite')
+
+ def _backup_database(self, db_id=DEFAULT_EMPTY_DB_ID):
+ # XXX remove database file if it exists ???
+ dbfile = self.absolute_dbfile()
+ backup_file = self.absolute_backup_file(db_id, 'sqlite')
+ shutil.copy(dbfile, backup_file)
+ # Useful to debug WHO writes a database
+ # backup_stack = self.absolute_backup_file(db_id, '.stack')
+ #with open(backup_stack, 'w') as backup_stack_file:
+ # import traceback
+ # traceback.print_stack(file=backup_stack_file)
+ return backup_file
+
+ def _restore_database(self, backup_coordinates, _config):
+ # remove database file if it exists ?
+ dbfile = self.absolute_dbfile()
+ self._cleanup_database(dbfile)
+ shutil.copy(backup_coordinates, dbfile)
+ self.get_repo()
+
+ def init_test_database(self):
+ """initialize a fresh sqlite databse used for testing purpose"""
+ # initialize the database
+ from cubicweb.server import init_repository
+ self._cleanup_database(self.absolute_dbfile())
+ init_repository(self.config, interactive=False,
+ init_config=self.init_config)
+
+import atexit
+atexit.register(SQLiteTestDataBaseHandler._cleanup_all_tmpdb)
+
+
+HANDLERS = {}
+
+def register_handler(handlerkls, overwrite=False):
+ assert handlerkls is not None
+ if overwrite or handlerkls.DRIVER not in HANDLERS:
+ HANDLERS[handlerkls.DRIVER] = handlerkls
+ else:
+ msg = "%s: Handler already exists use overwrite if it's intended\n"\
+ "(existing handler class is %r)"
+ raise ValueError(msg % (handlerkls.DRIVER, HANDLERS[handlerkls.DRIVER]))
+
+register_handler(PostgresTestDataBaseHandler)
+register_handler(SQLiteTestDataBaseHandler)
+register_handler(SQLServerTestDataBaseHandler)
+
+
+class HCache(object):
+ """Handler cache object: store database handler for a given configuration.
+
+ We only keep one repo in cache to prevent too much objects to stay alive
+ (database handler holds a reference to a repository). As at the moment a new
+ handler is created for each TestCase class and all test methods are executed
+ sequentially whithin this class, there should not have more cache miss that
+ if we had a wider cache as once a Handler stop being used it won't be used
+ again.
+ """
+
+ def __init__(self):
+ self.config = None
+ self.handler = None
+
+ def get(self, config):
+ if config is self.config:
+ return self.handler
+ else:
+ return None
+
+ def set(self, config, handler):
+ self.config = config
+ self.handler = handler
+
+HCACHE = HCache()
+
+
+# XXX a class method on Test ?
+
+_CONFIG = None
+def get_test_db_handler(config, init_config=None):
+ global _CONFIG
+ if _CONFIG is not None and config is not _CONFIG:
+ from logilab.common.modutils import cleanup_sys_modules
+ # cleanup all dynamically loaded modules and everything in the instance
+ # directory
+ apphome = _CONFIG.apphome
+ if apphome: # may be unset in tests
+ cleanup_sys_modules([apphome])
+ # also cleanup sys.path
+ if apphome in sys.path:
+ sys.path.remove(apphome)
+ _CONFIG = config
+ config.adjust_sys_path()
+ handler = HCACHE.get(config)
+ if handler is not None:
+ return handler
+ driver = config.system_source_config['db-driver']
+ key = (driver, config)
+ handlerkls = HANDLERS.get(driver, None)
+ if handlerkls is not None:
+ handler = handlerkls(config, init_config)
+ if config.skip_db_create_and_restore:
+ handler = NoCreateDropDatabaseHandler(handler)
+ HCACHE.set(config, handler)
+ return handler
+ else:
+ raise ValueError('no initialization function for driver %r' % driver)
+
+### compatibility layer ##############################################
+from logilab.common.deprecation import deprecated
+
+@deprecated("please use the new DatabaseHandler mecanism")
+def init_test_database(config=None, configdir='data', apphome=None):
+ """init a test database for a specific driver"""
+ if config is None:
+ config = TestServerConfiguration(apphome=apphome)
+ handler = get_test_db_handler(config)
+ handler.build_db_cache()
+ return handler.get_repo_and_cnx()