devtools/__init__.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
--- a/devtools/__init__.py	Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,915 +0,0 @@
-# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
-"""Test tools for cubicweb"""
-from __future__ import print_function
-
-__docformat__ = "restructuredtext en"
-
-import os
-import sys
-import errno
-import logging
-import shutil
-import glob
-import subprocess
-import warnings
-import tempfile
-import getpass
-from hashlib import sha1  # pylint: disable=E0611
-from datetime import timedelta
-from os.path import abspath, join, exists, split, isabs, isdir
-from functools import partial
-
-from six import text_type
-from six.moves import cPickle as pickle
-
-from logilab.common.date import strptime
-from logilab.common.decorators import cached, clear_cache
-
-from cubicweb import ExecutionError, BadConnectionId
-from cubicweb import schema, cwconfig
-from cubicweb.server.serverconfig import ServerConfiguration
-from cubicweb.etwist.twconfig import WebConfigurationBase
-
-cwconfig.CubicWebConfiguration.cls_adjust_sys_path()
-
-# db auto-population configuration #############################################
-
-SYSTEM_ENTITIES = (schema.SCHEMA_TYPES
-                   | schema.INTERNAL_TYPES
-                   | schema.WORKFLOW_TYPES
-                   | set(('CWGroup', 'CWUser',))
-                   )
-SYSTEM_RELATIONS = (schema.META_RTYPES
-                    | schema.WORKFLOW_RTYPES
-                    | schema.WORKFLOW_DEF_RTYPES
-                    | schema.SYSTEM_RTYPES
-                    | schema.SCHEMA_TYPES
-                    | set(('primary_email', # deducted from other relations
-                           ))
-                    )
-
-# content validation configuration #############################################
-
-# validators are used to validate (XML, DTD, whatever) view's content
-# validators availables are :
-#  'dtd' : validates XML + declared DTD
-#  'xml' : guarantees XML is well formed
-#  None : do not try to validate anything
-
-# {'vid': validator}
-VIEW_VALIDATORS = {}
-
-
-# cubicweb test configuration ##################################################
-
-BASE_URL = 'http://testing.fr/cubicweb/'
-
-DEFAULT_SOURCES = {'system': {'adapter' : 'native',
-                              'db-encoding' : 'UTF-8', #'ISO-8859-1',
-                              'db-user' : u'admin',
-                              'db-password' : 'gingkow',
-                              'db-name' : 'tmpdb',
-                              'db-driver' : 'sqlite',
-                              'db-host' : None,
-                              },
-                   'admin' : {'login': u'admin',
-                              'password': u'gingkow',
-                              },
-                   }
-DEFAULT_PSQL_SOURCES = DEFAULT_SOURCES.copy()
-DEFAULT_PSQL_SOURCES['system'] = DEFAULT_SOURCES['system'].copy()
-DEFAULT_PSQL_SOURCES['system']['db-driver'] = 'postgres'
-DEFAULT_PSQL_SOURCES['system']['db-user'] = text_type(getpass.getuser())
-DEFAULT_PSQL_SOURCES['system']['db-password'] = None
-
-def turn_repo_off(repo):
-    """ Idea: this is less costly than a full re-creation of the repo object.
-    off:
-    * session are closed,
-    * cnxsets are closed
-    * system source is shutdown
-    """
-    if not repo._needs_refresh:
-        for sessionid in list(repo._sessions):
-            warnings.warn('%s Open session found while turning repository off'
-                          %sessionid, RuntimeWarning)
-            try:
-                repo.close(sessionid)
-            except BadConnectionId: #this is strange ? thread issue ?
-                print('XXX unknown session', sessionid)
-        for cnxset in repo.cnxsets:
-            cnxset.close(True)
-        repo.system_source.shutdown()
-        repo._needs_refresh = True
-        repo._has_started = False
-
-
-def turn_repo_on(repo):
-    """Idea: this is less costly than a full re-creation of the repo object.
-    on:
-    * cnxsets are connected
-    * cache are cleared
-    """
-    if repo._needs_refresh:
-        for cnxset in repo.cnxsets:
-            cnxset.reconnect()
-        repo._type_source_cache = {}
-        repo._extid_cache = {}
-        repo.querier._rql_cache = {}
-        repo.system_source.reset_caches()
-        repo._needs_refresh = False
-
-
-class TestServerConfiguration(ServerConfiguration):
-    mode = 'test'
-    read_instance_schema = False
-    init_repository = True
-    skip_db_create_and_restore = False
-    default_sources = DEFAULT_SOURCES
-
-    def __init__(self, appid='data', apphome=None, log_threshold=logging.CRITICAL+10):
-        # must be set before calling parent __init__
-        if apphome is None:
-            if exists(appid):
-                apphome = abspath(appid)
-            else: # cube test
-                apphome = abspath('..')
-        self._apphome = apphome
-        super(TestServerConfiguration, self).__init__(appid)
-        self.init_log(log_threshold, force=True)
-        # need this, usually triggered by cubicweb-ctl
-        self.load_cwctl_plugins()
-
-    # By default anonymous login are allow but some test need to deny of to
-    # change the default user. Set it to None to prevent anonymous login.
-    anonymous_credential = ('anon', 'anon')
-
-    def anonymous_user(self):
-        if not self.anonymous_credential:
-            return None, None
-        return self.anonymous_credential
-
-    def set_anonymous_allowed(self, allowed, anonuser=u'anon'):
-        if allowed:
-            self.anonymous_credential = (anonuser, anonuser)
-        else:
-            self.anonymous_credential = None
-
-    @property
-    def apphome(self):
-        return self._apphome
-    appdatahome = apphome
-
-    def load_configuration(self, **kw):
-        super(TestServerConfiguration, self).load_configuration(**kw)
-        # no undo support in tests
-        self.global_set_option('undo-enabled', 'n')
-
-    def main_config_file(self):
-        """return instance's control configuration file"""
-        return join(self.apphome, '%s.conf' % self.name)
-
-    def bootstrap_cubes(self):
-        try:
-            super(TestServerConfiguration, self).bootstrap_cubes()
-        except IOError:
-            # no cubes
-            self.init_cubes( () )
-
-    sourcefile = None
-    def sources_file(self):
-        """define in subclasses self.sourcefile if necessary"""
-        if self.sourcefile:
-            print('Reading sources from', self.sourcefile)
-            sourcefile = self.sourcefile
-            if not isabs(sourcefile):
-                sourcefile = join(self.apphome, sourcefile)
-        else:
-            sourcefile = super(TestServerConfiguration, self).sources_file()
-        return sourcefile
-
-    def read_sources_file(self):
-        """By default, we run tests with the sqlite DB backend.  One may use its
-        own configuration by just creating a 'sources' file in the test
-        directory from which tests are launched or by specifying an alternative
-        sources file using self.sourcefile.
-        """
-        try:
-            sources = super(TestServerConfiguration, self).read_sources_file()
-        except ExecutionError:
-            sources = {}
-        if not sources:
-            sources = self.default_sources
-        if 'admin' not in sources:
-            sources['admin'] = self.default_sources['admin']
-        return sources
-
-    # web config methods needed here for cases when we use this config as a web
-    # config
-
-    def default_base_url(self):
-        return BASE_URL
-
-
-class BaseApptestConfiguration(TestServerConfiguration, WebConfigurationBase):
-    name = 'all-in-one' # so it search for all-in-one.conf, not repository.conf
-    options = cwconfig.merge_options(TestServerConfiguration.options
-                                     + WebConfigurationBase.options)
-    cubicweb_appobject_path = TestServerConfiguration.cubicweb_appobject_path | WebConfigurationBase.cubicweb_appobject_path
-    cube_appobject_path = TestServerConfiguration.cube_appobject_path | WebConfigurationBase.cube_appobject_path
-
-    def available_languages(self, *args):
-        return self.cw_languages()
-
-
-# XXX merge with BaseApptestConfiguration ?
-class ApptestConfiguration(BaseApptestConfiguration):
-    # `skip_db_create_and_restore` controls wether or not the test database
-    # should be created / backuped / restored. If set to True, those
-    # steps are completely skipped, the database is used as is and is
-    # considered initialized
-    skip_db_create_and_restore = False
-
-    def __init__(self, appid, apphome=None,
-                 log_threshold=logging.WARNING, sourcefile=None):
-        BaseApptestConfiguration.__init__(self, appid, apphome,
-                                          log_threshold=log_threshold)
-        self.init_repository = sourcefile is None
-        self.sourcefile = sourcefile
-
-
-class PostgresApptestConfiguration(ApptestConfiguration):
-    default_sources = DEFAULT_PSQL_SOURCES
-
-
-class RealDatabaseConfiguration(ApptestConfiguration):
-    """configuration class for tests to run on a real database.
-
-    The intialization is done by specifying a source file path.
-
-    Important note: init_test_database / reset_test_database steps are
-    skipped. It's thus up to the test developer to implement setUp/tearDown
-    accordingly.
-
-    Example usage::
-
-      class MyTests(CubicWebTC):
-          _config = RealDatabaseConfiguration('myapp',
-                                              sourcefile='/path/to/sources')
-
-          def test_something(self):
-              with self.admin_access.web_request() as req:
-                  rset = req.execute('Any X WHERE X is CWUser')
-                  self.view('foaf', rset, req=req)
-
-    """
-    skip_db_create_and_restore = True
-    read_instance_schema = True # read schema from database
-
-# test database handling #######################################################
-
-DEFAULT_EMPTY_DB_ID = '__default_empty_db__'
-
-class TestDataBaseHandler(object):
-    DRIVER = None
-
-    db_cache = {}
-    explored_glob = set()
-
-    def __init__(self, config, init_config=None):
-        self.config = config
-        self.init_config = init_config
-        self._repo = None
-        # pure consistency check
-        assert self.system_source['db-driver'] == self.DRIVER
-
-        # some handlers want to store info here, avoid a warning
-        from cubicweb.server.sources.native import NativeSQLSource
-        NativeSQLSource.options += (
-            ('global-db-name',
-             {'type': 'string', 'help': 'for internal use only'
-            }),
-        )
-
-    def _ensure_test_backup_db_dir(self):
-        """Return path of directory for database backup.
-
-        The function create it if necessary"""
-        backupdir = join(self.config.apphome, 'database')
-        try:
-            os.makedirs(backupdir)
-        except:
-            if not isdir(backupdir):
-                raise
-        return backupdir
-
-    def config_path(self, db_id):
-        """Path for config backup of a given database id"""
-        return self.absolute_backup_file(db_id, 'config')
-
-    def absolute_backup_file(self, db_id, suffix):
-        """Path for config backup of a given database id"""
-        # in case db name is an absolute path, we don't want to replace anything
-        # in parent directories
-        directory, basename = split(self.dbname)
-        dbname = basename.replace('-', '_')
-        assert '.' not in db_id
-        filename = join(directory, '%s-%s.%s' % (dbname, db_id, suffix))
-        return join(self._ensure_test_backup_db_dir(), filename)
-
-    def db_cache_key(self, db_id, dbname=None):
-        """Build a database cache key for a db_id with the current config
-
-        This key is meant to be used in the cls.db_cache mapping"""
-        if dbname is None:
-            dbname = self.dbname
-        dbname = os.path.basename(dbname)
-        dbname = dbname.replace('-', '_')
-        return (self.config.apphome, dbname, db_id)
-
-    def backup_database(self, db_id):
-        """Store the content of the current database as <db_id>
-
-        The config used are also stored."""
-        backup_data = self._backup_database(db_id)
-        config_path = self.config_path(db_id)
-        # XXX we dump a dict of the config
-        # This is an experimental to help config dependant setup (like BFSS) to
-        # be propertly restored
-        with tempfile.NamedTemporaryFile(dir=os.path.dirname(config_path), delete=False) as conf_file:
-            conf_file.write(pickle.dumps(dict(self.config)))
-        os.rename(conf_file.name, config_path)
-        self.db_cache[self.db_cache_key(db_id)] = (backup_data, config_path)
-
-    def _backup_database(self, db_id):
-        """Actual backup the current database.
-
-        return a value to be stored in db_cache to allow restoration"""
-        raise NotImplementedError()
-
-    def restore_database(self, db_id):
-        """Restore a database.
-
-        takes as argument value stored in db_cache by self._backup_database"""
-        # XXX set a clearer error message ???
-        backup_coordinates, config_path = self.db_cache[self.db_cache_key(db_id)]
-        # reload the config used to create the database.
-        with open(config_path, 'rb') as f:
-            config = pickle.load(f)
-        # shutdown repo before changing database content
-        if self._repo is not None:
-            self._repo.turn_repo_off()
-        self._restore_database(backup_coordinates, config)
-
-    def _restore_database(self, backup_coordinates, config):
-        """Actual restore of the current database.
-
-        Use the value stored in db_cache as input """
-        raise NotImplementedError()
-
-    def get_repo(self, startup=False):
-        """ return Repository object on the current database.
-
-        (turn the current repo object "on" if there is one or recreate one)
-        if startup is True, server startup server hooks will be called if needed
-        """
-        if self._repo is None:
-            self._repo = self._new_repo(self.config)
-        # config has now been bootstrapped, call init_config if specified
-        if self.init_config is not None:
-            self.init_config(self.config)
-        repo = self._repo
-        repo.turn_repo_on()
-        if startup and not repo._has_started:
-            repo.hm.call_hooks('server_startup', repo=repo)
-            repo._has_started = True
-        return repo
-
-    def _new_repo(self, config):
-        """Factory method to create a new Repository Instance"""
-        config._cubes = None
-        repo = config.repository()
-        config.repository = lambda x=None: repo
-        # extending Repository class
-        repo._has_started = False
-        repo._needs_refresh = False
-        repo.turn_repo_on = partial(turn_repo_on, repo)
-        repo.turn_repo_off = partial(turn_repo_off, repo)
-        return repo
-
-    def get_cnx(self):
-        """return Connection object on the current repository"""
-        from cubicweb.repoapi import connect
-        repo = self.get_repo()
-        sources = self.config.read_sources_file()
-        login  = text_type(sources['admin']['login'])
-        password = sources['admin']['password'] or 'xxx'
-        cnx = connect(repo, login, password=password)
-        return cnx
-
-    def get_repo_and_cnx(self, db_id=DEFAULT_EMPTY_DB_ID):
-        """Reset database with the current db_id and return (repo, cnx)
-
-        A database *MUST* have been build with the current <db_id> prior to
-        call this method. See the ``build_db_cache`` method. The returned
-        repository have it's startup hooks called and the connection is
-        establised as admin."""
-
-        self.restore_database(db_id)
-        repo = self.get_repo(startup=True)
-        cnx  = self.get_cnx()
-        return repo, cnx
-
-    @property
-    def system_source(self):
-        return self.config.system_source_config
-
-    @property
-    def dbname(self):
-        return self.system_source['db-name']
-
-    def init_test_database(self):
-        """actual initialisation of the database"""
-        raise ValueError('no initialization function for driver %r' % self.DRIVER)
-
-    def has_cache(self, db_id):
-        """Check if a given database id exist in cb cache for the current config"""
-        cache_glob = self.absolute_backup_file('*', '*')
-        if cache_glob not in self.explored_glob:
-            self.discover_cached_db()
-        return self.db_cache_key(db_id) in self.db_cache
-
-    def discover_cached_db(self):
-        """Search available db_if for the current config"""
-        cache_glob = self.absolute_backup_file('*', '*')
-        directory = os.path.dirname(cache_glob)
-        entries={}
-        candidates = glob.glob(cache_glob)
-        for filepath in candidates:
-            data = os.path.basename(filepath)
-            # database backup are in the forms are <dbname>-<db_id>.<backtype>
-            dbname, data = data.split('-', 1)
-            db_id, filetype = data.split('.', 1)
-            entries.setdefault((dbname, db_id), {})[filetype] = filepath
-        for (dbname, db_id), entry in entries.items():
-            # apply necessary transformation from the driver
-            value = self.process_cache_entry(directory, dbname, db_id, entry)
-            assert 'config' in entry
-            if value is not None: # None value means "not handled by this driver
-                                  # XXX Ignored value are shadowed to other Handler if cache are common.
-                key = self.db_cache_key(db_id, dbname=dbname)
-                self.db_cache[key] = value, entry['config']
-        self.explored_glob.add(cache_glob)
-
-    def process_cache_entry(self, directory, dbname, db_id, entry):
-        """Transforms potential cache entry to proper backup coordinate
-
-        entry argument is a "filetype" -> "filepath" mapping
-        Return None if an entry should be ignored."""
-        return None
-
-    def build_db_cache(self, test_db_id=DEFAULT_EMPTY_DB_ID, pre_setup_func=None):
-        """Build Database cache for ``test_db_id`` if a cache doesn't exist
-
-        if ``test_db_id is DEFAULT_EMPTY_DB_ID`` self.init_test_database is
-        called. otherwise, DEFAULT_EMPTY_DB_ID is build/restored and
-        ``pre_setup_func`` to setup the database.
-
-        This function backup any database it build"""
-        if self.has_cache(test_db_id):
-            return #test_db_id, 'already in cache'
-        if test_db_id is DEFAULT_EMPTY_DB_ID:
-            self.init_test_database()
-        else:
-            print('Building %s for database %s' % (test_db_id, self.dbname))
-            self.build_db_cache(DEFAULT_EMPTY_DB_ID)
-            self.restore_database(DEFAULT_EMPTY_DB_ID)
-            repo = self.get_repo(startup=True)
-            cnx = self.get_cnx()
-            with cnx:
-                pre_setup_func(cnx, self.config)
-                cnx.commit()
-        self.backup_database(test_db_id)
-
-
-class NoCreateDropDatabaseHandler(TestDataBaseHandler):
-    """This handler is used if config.skip_db_create_and_restore is True
-
-    This is typically the case with RealDBConfig. In that case,
-    we explicitely want to skip init / backup / restore phases.
-
-    This handler redefines the three corresponding methods and delegates
-    to original handler for any other method / attribute
-    """
-
-    def __init__(self, base_handler):
-        self.base_handler = base_handler
-
-    # override init / backup / restore methods
-    def init_test_database(self):
-        pass
-
-    def backup_database(self, db_id):
-        pass
-
-    def restore_database(self, db_id):
-        pass
-
-    # delegate to original handler in all other cases
-    def __getattr__(self, attrname):
-        return getattr(self.base_handler, attrname)
-
-
-### postgres test database handling ############################################
-
-def startpgcluster(pyfile):
-    """Start a postgresql cluster next to pyfile"""
-    datadir = join(os.path.dirname(pyfile), 'data', 'database',
-                   'pgdb-%s' % os.path.splitext(os.path.basename(pyfile))[0])
-    if not exists(datadir):
-        try:
-            subprocess.check_call(['initdb', '-D', datadir, '-E', 'utf-8', '--locale=C'])
-
-        except OSError as err:
-            if err.errno == errno.ENOENT:
-                raise OSError('"initdb" could not be found. '
-                              'You should add the postgresql bin folder to your PATH '
-                              '(/usr/lib/postgresql/9.1/bin for example).')
-            raise
-    datadir = os.path.abspath(datadir)
-    pgport = '5432'
-    env = os.environ.copy()
-    sockdir = tempfile.mkdtemp(prefix='cwpg')
-    DEFAULT_PSQL_SOURCES['system']['db-host'] = sockdir
-    DEFAULT_PSQL_SOURCES['system']['db-port'] = pgport
-    options = '-h "" -k %s -p %s' % (sockdir, pgport)
-    options += ' -c fsync=off -c full_page_writes=off'
-    options += ' -c synchronous_commit=off'
-    try:
-        subprocess.check_call(['pg_ctl', 'start', '-w', '-D', datadir,
-                               '-o', options],
-                              env=env)
-    except OSError as err:
-        try:
-            os.rmdir(sockdir)
-        except OSError:
-            pass
-        if err.errno == errno.ENOENT:
-            raise OSError('"pg_ctl" could not be found. '
-                          'You should add the postgresql bin folder to your PATH '
-                          '(/usr/lib/postgresql/9.1/bin for example).')
-        raise
-
-
-def stoppgcluster(pyfile):
-    """Kill the postgresql cluster running next to pyfile"""
-    datadir = join(os.path.dirname(pyfile), 'data', 'database',
-                   'pgdb-%s' % os.path.splitext(os.path.basename(pyfile))[0])
-    subprocess.call(['pg_ctl', 'stop', '-D', datadir, '-m', 'fast'])
-    try:
-        os.rmdir(DEFAULT_PSQL_SOURCES['system']['db-host'])
-    except OSError:
-        pass
-
-
-class PostgresTestDataBaseHandler(TestDataBaseHandler):
-    DRIVER = 'postgres'
-
-    # Separate db_cache for PG databases, to avoid collisions with sqlite dbs
-    db_cache = {}
-    explored_glob = set()
-
-    __CTL = set()
-
-    def __init__(self, *args, **kwargs):
-        super(PostgresTestDataBaseHandler, self).__init__(*args, **kwargs)
-        if 'global-db-name' not in self.system_source:
-            self.system_source['global-db-name'] = self.system_source['db-name']
-            self.system_source['db-name'] = self.system_source['db-name'] + str(os.getpid())
-
-    @property
-    @cached
-    def helper(self):
-        from logilab.database import get_db_helper
-        return get_db_helper('postgres')
-
-    @property
-    def dbname(self):
-        return self.system_source['global-db-name']
-
-    @property
-    def dbcnx(self):
-        try:
-            return self._cnx
-        except AttributeError:
-            from cubicweb.server.serverctl import _db_sys_cnx
-            try:
-                self._cnx = _db_sys_cnx(
-                    self.system_source, 'CREATE DATABASE and / or USER',
-                    interactive=False)
-                return self._cnx
-            except Exception:
-                self._cnx = None
-                raise
-
-    @property
-    @cached
-    def cursor(self):
-        return self.dbcnx.cursor()
-
-    def process_cache_entry(self, directory, dbname, db_id, entry):
-        backup_name = self._backup_name(db_id)
-        if backup_name in self.helper.list_databases(self.cursor):
-            return backup_name
-        return None
-
-    def has_cache(self, db_id):
-        backup_name = self._backup_name(db_id)
-        return (super(PostgresTestDataBaseHandler, self).has_cache(db_id)
-                and backup_name in self.helper.list_databases(self.cursor))
-
-    def init_test_database(self):
-        """initialize a fresh postgresql database used for testing purpose"""
-        from cubicweb.server import init_repository
-        from cubicweb.server.serverctl import system_source_cnx, createdb
-        # connect on the dbms system base to create our base
-        try:
-            self._drop(self.system_source['db-name'])
-            createdb(self.helper, self.system_source, self.dbcnx, self.cursor)
-            self.dbcnx.commit()
-            cnx = system_source_cnx(self.system_source, special_privs='LANGUAGE C',
-                                    interactive=False)
-            templcursor = cnx.cursor()
-            try:
-                # XXX factorize with db-create code
-                self.helper.init_fti_extensions(templcursor)
-                # install plpythonu/plpgsql language if not installed by the cube
-                langs = sys.platform == 'win32' and ('plpgsql',) or ('plpythonu', 'plpgsql')
-                for extlang in langs:
-                    self.helper.create_language(templcursor, extlang)
-                cnx.commit()
-            finally:
-                templcursor.close()
-                cnx.close()
-            init_repository(self.config, interactive=False,
-                            init_config=self.init_config)
-        except BaseException:
-            if self.dbcnx is not None:
-                self.dbcnx.rollback()
-            sys.stderr.write('building %s failed\n' % self.dbname)
-            #self._drop(self.dbname)
-            raise
-
-    def helper_clear_cache(self):
-        if self.dbcnx is not None:
-            self.dbcnx.commit()
-            self.dbcnx.close()
-            del self._cnx
-            clear_cache(self, 'cursor')
-        clear_cache(self, 'helper')
-
-    def __del__(self):
-        self.helper_clear_cache()
-
-    @property
-    def _config_id(self):
-        return sha1(self.config.apphome.encode('utf-8')).hexdigest()[:10]
-
-    def _backup_name(self, db_id): # merge me with parent
-        backup_name = '_'.join(('cache', self._config_id, self.dbname, db_id))
-        return backup_name.lower()
-
-    def _drop(self, db_name):
-        if db_name in self.helper.list_databases(self.cursor):
-            self.cursor.execute('DROP DATABASE %s' % db_name)
-            self.dbcnx.commit()
-
-    def _backup_database(self, db_id):
-        """Actual backup the current database.
-
-        return a value to be stored in db_cache to allow restoration
-        """
-        from cubicweb.server.serverctl import createdb
-        orig_name = self.system_source['db-name']
-        try:
-            backup_name = self._backup_name(db_id)
-            self._drop(backup_name)
-            self.system_source['db-name'] = backup_name
-            if self._repo:
-                self._repo.turn_repo_off()
-            try:
-                createdb(self.helper, self.system_source, self.dbcnx, self.cursor, template=orig_name)
-                self.dbcnx.commit()
-            finally:
-                if self._repo:
-                    self._repo.turn_repo_on()
-            return backup_name
-        finally:
-            self.system_source['db-name'] = orig_name
-
-    def _restore_database(self, backup_coordinates, config):
-        from cubicweb.server.serverctl import createdb
-        """Actual restore of the current database.
-
-        Use the value tostored in db_cache as input """
-        self._drop(self.system_source['db-name'])
-        createdb(self.helper, self.system_source, self.dbcnx, self.cursor,
-                 template=backup_coordinates)
-        self.dbcnx.commit()
-
-
-
-### sqlserver2005 test database handling #######################################
-
-class SQLServerTestDataBaseHandler(TestDataBaseHandler):
-    DRIVER = 'sqlserver'
-
-    # XXX complete me
-
-    def init_test_database(self):
-        """initialize a fresh sqlserver databse used for testing purpose"""
-        if self.config.init_repository:
-            from cubicweb.server import init_repository
-            init_repository(self.config, interactive=False, drop=True,
-                            init_config=self.init_config)
-
-### sqlite test database handling ##############################################
-
-class SQLiteTestDataBaseHandler(TestDataBaseHandler):
-    DRIVER = 'sqlite'
-
-    __TMPDB = set()
-
-    @classmethod
-    def _cleanup_all_tmpdb(cls):
-        for dbpath in cls.__TMPDB:
-            cls._cleanup_database(dbpath)
-
-
-
-    def __init__(self, *args, **kwargs):
-        super(SQLiteTestDataBaseHandler, self).__init__(*args, **kwargs)
-        # use a dedicated base for each process.
-        if 'global-db-name' not in self.system_source:
-            self.system_source['global-db-name'] = self.system_source['db-name']
-            process_db = self.system_source['db-name'] + str(os.getpid())
-            self.system_source['db-name'] = process_db
-        process_db = self.absolute_dbfile() # update db-name to absolute path
-        self.__TMPDB.add(process_db)
-
-    @staticmethod
-    def _cleanup_database(dbfile):
-        try:
-            os.remove(dbfile)
-            os.remove('%s-journal' % dbfile)
-        except OSError:
-            pass
-
-    @property
-    def dbname(self):
-        return self.system_source['global-db-name']
-
-    def absolute_dbfile(self):
-        """absolute path of current database file"""
-        dbfile = join(self._ensure_test_backup_db_dir(),
-                      self.system_source['db-name'])
-        self.system_source['db-name'] = dbfile
-        return dbfile
-
-    def process_cache_entry(self, directory, dbname, db_id, entry):
-        return entry.get('sqlite')
-
-    def _backup_database(self, db_id=DEFAULT_EMPTY_DB_ID):
-        # XXX remove database file if it exists ???
-        dbfile = self.absolute_dbfile()
-        backup_file = self.absolute_backup_file(db_id, 'sqlite')
-        shutil.copy(dbfile, backup_file)
-        # Useful to debug WHO writes a database
-        # backup_stack = self.absolute_backup_file(db_id, '.stack')
-        #with open(backup_stack, 'w') as backup_stack_file:
-        #    import traceback
-        #    traceback.print_stack(file=backup_stack_file)
-        return backup_file
-
-    def _restore_database(self, backup_coordinates, _config):
-        # remove database file if it exists ?
-        dbfile = self.absolute_dbfile()
-        self._cleanup_database(dbfile)
-        shutil.copy(backup_coordinates, dbfile)
-        self.get_repo()
-
-    def init_test_database(self):
-        """initialize a fresh sqlite databse used for testing purpose"""
-        # initialize the database
-        from cubicweb.server import init_repository
-        self._cleanup_database(self.absolute_dbfile())
-        init_repository(self.config, interactive=False,
-                        init_config=self.init_config)
-
-import atexit
-atexit.register(SQLiteTestDataBaseHandler._cleanup_all_tmpdb)
-
-
-HANDLERS = {}
-
-def register_handler(handlerkls, overwrite=False):
-    assert handlerkls is not None
-    if overwrite or handlerkls.DRIVER not in HANDLERS:
-        HANDLERS[handlerkls.DRIVER] = handlerkls
-    else:
-        msg = "%s: Handler already exists use overwrite if it's intended\n"\
-              "(existing handler class is %r)"
-        raise ValueError(msg % (handlerkls.DRIVER, HANDLERS[handlerkls.DRIVER]))
-
-register_handler(PostgresTestDataBaseHandler)
-register_handler(SQLiteTestDataBaseHandler)
-register_handler(SQLServerTestDataBaseHandler)
-
-
-class HCache(object):
-    """Handler cache object: store database handler for a given configuration.
-
-    We only keep one repo in cache to prevent too much objects to stay alive
-    (database handler holds a reference to a repository). As at the moment a new
-    handler is created for each TestCase class and all test methods are executed
-    sequentially whithin this class, there should not have more cache miss that
-    if we had a wider cache as once a Handler stop being used it won't be used
-    again.
-    """
-
-    def __init__(self):
-        self.config = None
-        self.handler = None
-
-    def get(self, config):
-        if config is self.config:
-            return self.handler
-        else:
-            return None
-
-    def set(self, config, handler):
-        self.config = config
-        self.handler = handler
-
-HCACHE = HCache()
-
-
-# XXX a class method on Test ?
-
-_CONFIG = None
-def get_test_db_handler(config, init_config=None):
-    global _CONFIG
-    if _CONFIG is not None and config is not _CONFIG:
-        from logilab.common.modutils import cleanup_sys_modules
-        # cleanup all dynamically loaded modules and everything in the instance
-        # directory
-        apphome = _CONFIG.apphome
-        if apphome: # may be unset in tests
-            cleanup_sys_modules([apphome])
-        # also cleanup sys.path
-        if apphome in sys.path:
-            sys.path.remove(apphome)
-    _CONFIG = config
-    config.adjust_sys_path()
-    handler = HCACHE.get(config)
-    if handler is not None:
-        return handler
-    driver = config.system_source_config['db-driver']
-    key = (driver, config)
-    handlerkls = HANDLERS.get(driver, None)
-    if handlerkls is not None:
-        handler = handlerkls(config, init_config)
-        if config.skip_db_create_and_restore:
-            handler = NoCreateDropDatabaseHandler(handler)
-        HCACHE.set(config, handler)
-        return handler
-    else:
-        raise ValueError('no initialization function for driver %r' % driver)
-
-### compatibility layer ##############################################
-from logilab.common.deprecation import deprecated
-
-@deprecated("please use the new DatabaseHandler mecanism")
-def init_test_database(config=None, configdir='data', apphome=None):
-    """init a test database for a specific driver"""
-    if config is None:
-        config = TestServerConfiguration(apphome=apphome)
-    handler = get_test_db_handler(config)
-    handler.build_db_cache()
-    return handler.get_repo_and_cnx()