cubicweb/devtools/__init__.py
changeset 11057 0b59724cb3f2
parent 11035 0fb100e8385b
child 11129 97095348b3ee
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/devtools/__init__.py	Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,915 @@
+# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+"""Test tools for cubicweb"""
+from __future__ import print_function
+
+__docformat__ = "restructuredtext en"
+
+import os
+import sys
+import errno
+import logging
+import shutil
+import glob
+import subprocess
+import warnings
+import tempfile
+import getpass
+from hashlib import sha1  # pylint: disable=E0611
+from datetime import timedelta
+from os.path import abspath, join, exists, split, isabs, isdir
+from functools import partial
+
+from six import text_type
+from six.moves import cPickle as pickle
+
+from logilab.common.date import strptime
+from logilab.common.decorators import cached, clear_cache
+
+from cubicweb import ExecutionError, BadConnectionId
+from cubicweb import schema, cwconfig
+from cubicweb.server.serverconfig import ServerConfiguration
+from cubicweb.etwist.twconfig import WebConfigurationBase
+
+cwconfig.CubicWebConfiguration.cls_adjust_sys_path()
+
+# db auto-population configuration #############################################
+
+SYSTEM_ENTITIES = (schema.SCHEMA_TYPES
+                   | schema.INTERNAL_TYPES
+                   | schema.WORKFLOW_TYPES
+                   | set(('CWGroup', 'CWUser',))
+                   )
+SYSTEM_RELATIONS = (schema.META_RTYPES
+                    | schema.WORKFLOW_RTYPES
+                    | schema.WORKFLOW_DEF_RTYPES
+                    | schema.SYSTEM_RTYPES
+                    | schema.SCHEMA_TYPES
+                    | set(('primary_email', # deducted from other relations
+                           ))
+                    )
+
+# content validation configuration #############################################
+
+# validators are used to validate (XML, DTD, whatever) view's content
+# validators availables are :
+#  'dtd' : validates XML + declared DTD
+#  'xml' : guarantees XML is well formed
+#  None : do not try to validate anything
+
+# {'vid': validator}
+VIEW_VALIDATORS = {}
+
+
+# cubicweb test configuration ##################################################
+
+BASE_URL = 'http://testing.fr/cubicweb/'
+
+DEFAULT_SOURCES = {'system': {'adapter' : 'native',
+                              'db-encoding' : 'UTF-8', #'ISO-8859-1',
+                              'db-user' : u'admin',
+                              'db-password' : 'gingkow',
+                              'db-name' : 'tmpdb',
+                              'db-driver' : 'sqlite',
+                              'db-host' : None,
+                              },
+                   'admin' : {'login': u'admin',
+                              'password': u'gingkow',
+                              },
+                   }
+DEFAULT_PSQL_SOURCES = DEFAULT_SOURCES.copy()
+DEFAULT_PSQL_SOURCES['system'] = DEFAULT_SOURCES['system'].copy()
+DEFAULT_PSQL_SOURCES['system']['db-driver'] = 'postgres'
+DEFAULT_PSQL_SOURCES['system']['db-user'] = text_type(getpass.getuser())
+DEFAULT_PSQL_SOURCES['system']['db-password'] = None
+
+def turn_repo_off(repo):
+    """ Idea: this is less costly than a full re-creation of the repo object.
+    off:
+    * session are closed,
+    * cnxsets are closed
+    * system source is shutdown
+    """
+    if not repo._needs_refresh:
+        for sessionid in list(repo._sessions):
+            warnings.warn('%s Open session found while turning repository off'
+                          %sessionid, RuntimeWarning)
+            try:
+                repo.close(sessionid)
+            except BadConnectionId: #this is strange ? thread issue ?
+                print('XXX unknown session', sessionid)
+        for cnxset in repo.cnxsets:
+            cnxset.close(True)
+        repo.system_source.shutdown()
+        repo._needs_refresh = True
+        repo._has_started = False
+
+
+def turn_repo_on(repo):
+    """Idea: this is less costly than a full re-creation of the repo object.
+    on:
+    * cnxsets are connected
+    * cache are cleared
+    """
+    if repo._needs_refresh:
+        for cnxset in repo.cnxsets:
+            cnxset.reconnect()
+        repo._type_source_cache = {}
+        repo._extid_cache = {}
+        repo.querier._rql_cache = {}
+        repo.system_source.reset_caches()
+        repo._needs_refresh = False
+
+
+class TestServerConfiguration(ServerConfiguration):
+    mode = 'test'
+    read_instance_schema = False
+    init_repository = True
+    skip_db_create_and_restore = False
+    default_sources = DEFAULT_SOURCES
+
+    def __init__(self, appid='data', apphome=None, log_threshold=logging.CRITICAL+10):
+        # must be set before calling parent __init__
+        if apphome is None:
+            if exists(appid):
+                apphome = abspath(appid)
+            else: # cube test
+                apphome = abspath('..')
+        self._apphome = apphome
+        super(TestServerConfiguration, self).__init__(appid)
+        self.init_log(log_threshold, force=True)
+        # need this, usually triggered by cubicweb-ctl
+        self.load_cwctl_plugins()
+
+    # By default anonymous login are allow but some test need to deny of to
+    # change the default user. Set it to None to prevent anonymous login.
+    anonymous_credential = ('anon', 'anon')
+
+    def anonymous_user(self):
+        if not self.anonymous_credential:
+            return None, None
+        return self.anonymous_credential
+
+    def set_anonymous_allowed(self, allowed, anonuser=u'anon'):
+        if allowed:
+            self.anonymous_credential = (anonuser, anonuser)
+        else:
+            self.anonymous_credential = None
+
+    @property
+    def apphome(self):
+        return self._apphome
+    appdatahome = apphome
+
+    def load_configuration(self, **kw):
+        super(TestServerConfiguration, self).load_configuration(**kw)
+        # no undo support in tests
+        self.global_set_option('undo-enabled', 'n')
+
+    def main_config_file(self):
+        """return instance's control configuration file"""
+        return join(self.apphome, '%s.conf' % self.name)
+
+    def bootstrap_cubes(self):
+        try:
+            super(TestServerConfiguration, self).bootstrap_cubes()
+        except IOError:
+            # no cubes
+            self.init_cubes( () )
+
+    sourcefile = None
+    def sources_file(self):
+        """define in subclasses self.sourcefile if necessary"""
+        if self.sourcefile:
+            print('Reading sources from', self.sourcefile)
+            sourcefile = self.sourcefile
+            if not isabs(sourcefile):
+                sourcefile = join(self.apphome, sourcefile)
+        else:
+            sourcefile = super(TestServerConfiguration, self).sources_file()
+        return sourcefile
+
+    def read_sources_file(self):
+        """By default, we run tests with the sqlite DB backend.  One may use its
+        own configuration by just creating a 'sources' file in the test
+        directory from which tests are launched or by specifying an alternative
+        sources file using self.sourcefile.
+        """
+        try:
+            sources = super(TestServerConfiguration, self).read_sources_file()
+        except ExecutionError:
+            sources = {}
+        if not sources:
+            sources = self.default_sources
+        if 'admin' not in sources:
+            sources['admin'] = self.default_sources['admin']
+        return sources
+
+    # web config methods needed here for cases when we use this config as a web
+    # config
+
+    def default_base_url(self):
+        return BASE_URL
+
+
+class BaseApptestConfiguration(TestServerConfiguration, WebConfigurationBase):
+    name = 'all-in-one' # so it search for all-in-one.conf, not repository.conf
+    options = cwconfig.merge_options(TestServerConfiguration.options
+                                     + WebConfigurationBase.options)
+    cubicweb_appobject_path = TestServerConfiguration.cubicweb_appobject_path | WebConfigurationBase.cubicweb_appobject_path
+    cube_appobject_path = TestServerConfiguration.cube_appobject_path | WebConfigurationBase.cube_appobject_path
+
+    def available_languages(self, *args):
+        return self.cw_languages()
+
+
+# XXX merge with BaseApptestConfiguration ?
+class ApptestConfiguration(BaseApptestConfiguration):
+    # `skip_db_create_and_restore` controls wether or not the test database
+    # should be created / backuped / restored. If set to True, those
+    # steps are completely skipped, the database is used as is and is
+    # considered initialized
+    skip_db_create_and_restore = False
+
+    def __init__(self, appid, apphome=None,
+                 log_threshold=logging.WARNING, sourcefile=None):
+        BaseApptestConfiguration.__init__(self, appid, apphome,
+                                          log_threshold=log_threshold)
+        self.init_repository = sourcefile is None
+        self.sourcefile = sourcefile
+
+
+class PostgresApptestConfiguration(ApptestConfiguration):
+    default_sources = DEFAULT_PSQL_SOURCES
+
+
+class RealDatabaseConfiguration(ApptestConfiguration):
+    """configuration class for tests to run on a real database.
+
+    The intialization is done by specifying a source file path.
+
+    Important note: init_test_database / reset_test_database steps are
+    skipped. It's thus up to the test developer to implement setUp/tearDown
+    accordingly.
+
+    Example usage::
+
+      class MyTests(CubicWebTC):
+          _config = RealDatabaseConfiguration('myapp',
+                                              sourcefile='/path/to/sources')
+
+          def test_something(self):
+              with self.admin_access.web_request() as req:
+                  rset = req.execute('Any X WHERE X is CWUser')
+                  self.view('foaf', rset, req=req)
+
+    """
+    skip_db_create_and_restore = True
+    read_instance_schema = True # read schema from database
+
+# test database handling #######################################################
+
+DEFAULT_EMPTY_DB_ID = '__default_empty_db__'
+
+class TestDataBaseHandler(object):
+    DRIVER = None
+
+    db_cache = {}
+    explored_glob = set()
+
+    def __init__(self, config, init_config=None):
+        self.config = config
+        self.init_config = init_config
+        self._repo = None
+        # pure consistency check
+        assert self.system_source['db-driver'] == self.DRIVER
+
+        # some handlers want to store info here, avoid a warning
+        from cubicweb.server.sources.native import NativeSQLSource
+        NativeSQLSource.options += (
+            ('global-db-name',
+             {'type': 'string', 'help': 'for internal use only'
+            }),
+        )
+
+    def _ensure_test_backup_db_dir(self):
+        """Return path of directory for database backup.
+
+        The function create it if necessary"""
+        backupdir = join(self.config.apphome, 'database')
+        try:
+            os.makedirs(backupdir)
+        except:
+            if not isdir(backupdir):
+                raise
+        return backupdir
+
+    def config_path(self, db_id):
+        """Path for config backup of a given database id"""
+        return self.absolute_backup_file(db_id, 'config')
+
+    def absolute_backup_file(self, db_id, suffix):
+        """Path for config backup of a given database id"""
+        # in case db name is an absolute path, we don't want to replace anything
+        # in parent directories
+        directory, basename = split(self.dbname)
+        dbname = basename.replace('-', '_')
+        assert '.' not in db_id
+        filename = join(directory, '%s-%s.%s' % (dbname, db_id, suffix))
+        return join(self._ensure_test_backup_db_dir(), filename)
+
+    def db_cache_key(self, db_id, dbname=None):
+        """Build a database cache key for a db_id with the current config
+
+        This key is meant to be used in the cls.db_cache mapping"""
+        if dbname is None:
+            dbname = self.dbname
+        dbname = os.path.basename(dbname)
+        dbname = dbname.replace('-', '_')
+        return (self.config.apphome, dbname, db_id)
+
+    def backup_database(self, db_id):
+        """Store the content of the current database as <db_id>
+
+        The config used are also stored."""
+        backup_data = self._backup_database(db_id)
+        config_path = self.config_path(db_id)
+        # XXX we dump a dict of the config
+        # This is an experimental to help config dependant setup (like BFSS) to
+        # be propertly restored
+        with tempfile.NamedTemporaryFile(dir=os.path.dirname(config_path), delete=False) as conf_file:
+            conf_file.write(pickle.dumps(dict(self.config)))
+        os.rename(conf_file.name, config_path)
+        self.db_cache[self.db_cache_key(db_id)] = (backup_data, config_path)
+
+    def _backup_database(self, db_id):
+        """Actual backup the current database.
+
+        return a value to be stored in db_cache to allow restoration"""
+        raise NotImplementedError()
+
+    def restore_database(self, db_id):
+        """Restore a database.
+
+        takes as argument value stored in db_cache by self._backup_database"""
+        # XXX set a clearer error message ???
+        backup_coordinates, config_path = self.db_cache[self.db_cache_key(db_id)]
+        # reload the config used to create the database.
+        with open(config_path, 'rb') as f:
+            config = pickle.load(f)
+        # shutdown repo before changing database content
+        if self._repo is not None:
+            self._repo.turn_repo_off()
+        self._restore_database(backup_coordinates, config)
+
+    def _restore_database(self, backup_coordinates, config):
+        """Actual restore of the current database.
+
+        Use the value stored in db_cache as input """
+        raise NotImplementedError()
+
+    def get_repo(self, startup=False):
+        """ return Repository object on the current database.
+
+        (turn the current repo object "on" if there is one or recreate one)
+        if startup is True, server startup server hooks will be called if needed
+        """
+        if self._repo is None:
+            self._repo = self._new_repo(self.config)
+        # config has now been bootstrapped, call init_config if specified
+        if self.init_config is not None:
+            self.init_config(self.config)
+        repo = self._repo
+        repo.turn_repo_on()
+        if startup and not repo._has_started:
+            repo.hm.call_hooks('server_startup', repo=repo)
+            repo._has_started = True
+        return repo
+
+    def _new_repo(self, config):
+        """Factory method to create a new Repository Instance"""
+        config._cubes = None
+        repo = config.repository()
+        config.repository = lambda x=None: repo
+        # extending Repository class
+        repo._has_started = False
+        repo._needs_refresh = False
+        repo.turn_repo_on = partial(turn_repo_on, repo)
+        repo.turn_repo_off = partial(turn_repo_off, repo)
+        return repo
+
+    def get_cnx(self):
+        """return Connection object on the current repository"""
+        from cubicweb.repoapi import connect
+        repo = self.get_repo()
+        sources = self.config.read_sources_file()
+        login  = text_type(sources['admin']['login'])
+        password = sources['admin']['password'] or 'xxx'
+        cnx = connect(repo, login, password=password)
+        return cnx
+
+    def get_repo_and_cnx(self, db_id=DEFAULT_EMPTY_DB_ID):
+        """Reset database with the current db_id and return (repo, cnx)
+
+        A database *MUST* have been build with the current <db_id> prior to
+        call this method. See the ``build_db_cache`` method. The returned
+        repository have it's startup hooks called and the connection is
+        establised as admin."""
+
+        self.restore_database(db_id)
+        repo = self.get_repo(startup=True)
+        cnx  = self.get_cnx()
+        return repo, cnx
+
+    @property
+    def system_source(self):
+        return self.config.system_source_config
+
+    @property
+    def dbname(self):
+        return self.system_source['db-name']
+
+    def init_test_database(self):
+        """actual initialisation of the database"""
+        raise ValueError('no initialization function for driver %r' % self.DRIVER)
+
+    def has_cache(self, db_id):
+        """Check if a given database id exist in cb cache for the current config"""
+        cache_glob = self.absolute_backup_file('*', '*')
+        if cache_glob not in self.explored_glob:
+            self.discover_cached_db()
+        return self.db_cache_key(db_id) in self.db_cache
+
+    def discover_cached_db(self):
+        """Search available db_if for the current config"""
+        cache_glob = self.absolute_backup_file('*', '*')
+        directory = os.path.dirname(cache_glob)
+        entries={}
+        candidates = glob.glob(cache_glob)
+        for filepath in candidates:
+            data = os.path.basename(filepath)
+            # database backup are in the forms are <dbname>-<db_id>.<backtype>
+            dbname, data = data.split('-', 1)
+            db_id, filetype = data.split('.', 1)
+            entries.setdefault((dbname, db_id), {})[filetype] = filepath
+        for (dbname, db_id), entry in entries.items():
+            # apply necessary transformation from the driver
+            value = self.process_cache_entry(directory, dbname, db_id, entry)
+            assert 'config' in entry
+            if value is not None: # None value means "not handled by this driver
+                                  # XXX Ignored value are shadowed to other Handler if cache are common.
+                key = self.db_cache_key(db_id, dbname=dbname)
+                self.db_cache[key] = value, entry['config']
+        self.explored_glob.add(cache_glob)
+
+    def process_cache_entry(self, directory, dbname, db_id, entry):
+        """Transforms potential cache entry to proper backup coordinate
+
+        entry argument is a "filetype" -> "filepath" mapping
+        Return None if an entry should be ignored."""
+        return None
+
+    def build_db_cache(self, test_db_id=DEFAULT_EMPTY_DB_ID, pre_setup_func=None):
+        """Build Database cache for ``test_db_id`` if a cache doesn't exist
+
+        if ``test_db_id is DEFAULT_EMPTY_DB_ID`` self.init_test_database is
+        called. otherwise, DEFAULT_EMPTY_DB_ID is build/restored and
+        ``pre_setup_func`` to setup the database.
+
+        This function backup any database it build"""
+        if self.has_cache(test_db_id):
+            return #test_db_id, 'already in cache'
+        if test_db_id is DEFAULT_EMPTY_DB_ID:
+            self.init_test_database()
+        else:
+            print('Building %s for database %s' % (test_db_id, self.dbname))
+            self.build_db_cache(DEFAULT_EMPTY_DB_ID)
+            self.restore_database(DEFAULT_EMPTY_DB_ID)
+            repo = self.get_repo(startup=True)
+            cnx = self.get_cnx()
+            with cnx:
+                pre_setup_func(cnx, self.config)
+                cnx.commit()
+        self.backup_database(test_db_id)
+
+
+class NoCreateDropDatabaseHandler(TestDataBaseHandler):
+    """This handler is used if config.skip_db_create_and_restore is True
+
+    This is typically the case with RealDBConfig. In that case,
+    we explicitely want to skip init / backup / restore phases.
+
+    This handler redefines the three corresponding methods and delegates
+    to original handler for any other method / attribute
+    """
+
+    def __init__(self, base_handler):
+        self.base_handler = base_handler
+
+    # override init / backup / restore methods
+    def init_test_database(self):
+        pass
+
+    def backup_database(self, db_id):
+        pass
+
+    def restore_database(self, db_id):
+        pass
+
+    # delegate to original handler in all other cases
+    def __getattr__(self, attrname):
+        return getattr(self.base_handler, attrname)
+
+
+### postgres test database handling ############################################
+
+def startpgcluster(pyfile):
+    """Start a postgresql cluster next to pyfile"""
+    datadir = join(os.path.dirname(pyfile), 'data', 'database',
+                   'pgdb-%s' % os.path.splitext(os.path.basename(pyfile))[0])
+    if not exists(datadir):
+        try:
+            subprocess.check_call(['initdb', '-D', datadir, '-E', 'utf-8', '--locale=C'])
+
+        except OSError as err:
+            if err.errno == errno.ENOENT:
+                raise OSError('"initdb" could not be found. '
+                              'You should add the postgresql bin folder to your PATH '
+                              '(/usr/lib/postgresql/9.1/bin for example).')
+            raise
+    datadir = os.path.abspath(datadir)
+    pgport = '5432'
+    env = os.environ.copy()
+    sockdir = tempfile.mkdtemp(prefix='cwpg')
+    DEFAULT_PSQL_SOURCES['system']['db-host'] = sockdir
+    DEFAULT_PSQL_SOURCES['system']['db-port'] = pgport
+    options = '-h "" -k %s -p %s' % (sockdir, pgport)
+    options += ' -c fsync=off -c full_page_writes=off'
+    options += ' -c synchronous_commit=off'
+    try:
+        subprocess.check_call(['pg_ctl', 'start', '-w', '-D', datadir,
+                               '-o', options],
+                              env=env)
+    except OSError as err:
+        try:
+            os.rmdir(sockdir)
+        except OSError:
+            pass
+        if err.errno == errno.ENOENT:
+            raise OSError('"pg_ctl" could not be found. '
+                          'You should add the postgresql bin folder to your PATH '
+                          '(/usr/lib/postgresql/9.1/bin for example).')
+        raise
+
+
+def stoppgcluster(pyfile):
+    """Kill the postgresql cluster running next to pyfile"""
+    datadir = join(os.path.dirname(pyfile), 'data', 'database',
+                   'pgdb-%s' % os.path.splitext(os.path.basename(pyfile))[0])
+    subprocess.call(['pg_ctl', 'stop', '-D', datadir, '-m', 'fast'])
+    try:
+        os.rmdir(DEFAULT_PSQL_SOURCES['system']['db-host'])
+    except OSError:
+        pass
+
+
+class PostgresTestDataBaseHandler(TestDataBaseHandler):
+    DRIVER = 'postgres'
+
+    # Separate db_cache for PG databases, to avoid collisions with sqlite dbs
+    db_cache = {}
+    explored_glob = set()
+
+    __CTL = set()
+
+    def __init__(self, *args, **kwargs):
+        super(PostgresTestDataBaseHandler, self).__init__(*args, **kwargs)
+        if 'global-db-name' not in self.system_source:
+            self.system_source['global-db-name'] = self.system_source['db-name']
+            self.system_source['db-name'] = self.system_source['db-name'] + str(os.getpid())
+
+    @property
+    @cached
+    def helper(self):
+        from logilab.database import get_db_helper
+        return get_db_helper('postgres')
+
+    @property
+    def dbname(self):
+        return self.system_source['global-db-name']
+
+    @property
+    def dbcnx(self):
+        try:
+            return self._cnx
+        except AttributeError:
+            from cubicweb.server.serverctl import _db_sys_cnx
+            try:
+                self._cnx = _db_sys_cnx(
+                    self.system_source, 'CREATE DATABASE and / or USER',
+                    interactive=False)
+                return self._cnx
+            except Exception:
+                self._cnx = None
+                raise
+
+    @property
+    @cached
+    def cursor(self):
+        return self.dbcnx.cursor()
+
+    def process_cache_entry(self, directory, dbname, db_id, entry):
+        backup_name = self._backup_name(db_id)
+        if backup_name in self.helper.list_databases(self.cursor):
+            return backup_name
+        return None
+
+    def has_cache(self, db_id):
+        backup_name = self._backup_name(db_id)
+        return (super(PostgresTestDataBaseHandler, self).has_cache(db_id)
+                and backup_name in self.helper.list_databases(self.cursor))
+
+    def init_test_database(self):
+        """initialize a fresh postgresql database used for testing purpose"""
+        from cubicweb.server import init_repository
+        from cubicweb.server.serverctl import system_source_cnx, createdb
+        # connect on the dbms system base to create our base
+        try:
+            self._drop(self.system_source['db-name'])
+            createdb(self.helper, self.system_source, self.dbcnx, self.cursor)
+            self.dbcnx.commit()
+            cnx = system_source_cnx(self.system_source, special_privs='LANGUAGE C',
+                                    interactive=False)
+            templcursor = cnx.cursor()
+            try:
+                # XXX factorize with db-create code
+                self.helper.init_fti_extensions(templcursor)
+                # install plpythonu/plpgsql language if not installed by the cube
+                langs = sys.platform == 'win32' and ('plpgsql',) or ('plpythonu', 'plpgsql')
+                for extlang in langs:
+                    self.helper.create_language(templcursor, extlang)
+                cnx.commit()
+            finally:
+                templcursor.close()
+                cnx.close()
+            init_repository(self.config, interactive=False,
+                            init_config=self.init_config)
+        except BaseException:
+            if self.dbcnx is not None:
+                self.dbcnx.rollback()
+            sys.stderr.write('building %s failed\n' % self.dbname)
+            #self._drop(self.dbname)
+            raise
+
+    def helper_clear_cache(self):
+        if self.dbcnx is not None:
+            self.dbcnx.commit()
+            self.dbcnx.close()
+            del self._cnx
+            clear_cache(self, 'cursor')
+        clear_cache(self, 'helper')
+
+    def __del__(self):
+        self.helper_clear_cache()
+
+    @property
+    def _config_id(self):
+        return sha1(self.config.apphome.encode('utf-8')).hexdigest()[:10]
+
+    def _backup_name(self, db_id): # merge me with parent
+        backup_name = '_'.join(('cache', self._config_id, self.dbname, db_id))
+        return backup_name.lower()
+
+    def _drop(self, db_name):
+        if db_name in self.helper.list_databases(self.cursor):
+            self.cursor.execute('DROP DATABASE %s' % db_name)
+            self.dbcnx.commit()
+
+    def _backup_database(self, db_id):
+        """Actual backup the current database.
+
+        return a value to be stored in db_cache to allow restoration
+        """
+        from cubicweb.server.serverctl import createdb
+        orig_name = self.system_source['db-name']
+        try:
+            backup_name = self._backup_name(db_id)
+            self._drop(backup_name)
+            self.system_source['db-name'] = backup_name
+            if self._repo:
+                self._repo.turn_repo_off()
+            try:
+                createdb(self.helper, self.system_source, self.dbcnx, self.cursor, template=orig_name)
+                self.dbcnx.commit()
+            finally:
+                if self._repo:
+                    self._repo.turn_repo_on()
+            return backup_name
+        finally:
+            self.system_source['db-name'] = orig_name
+
+    def _restore_database(self, backup_coordinates, config):
+        from cubicweb.server.serverctl import createdb
+        """Actual restore of the current database.
+
+        Use the value tostored in db_cache as input """
+        self._drop(self.system_source['db-name'])
+        createdb(self.helper, self.system_source, self.dbcnx, self.cursor,
+                 template=backup_coordinates)
+        self.dbcnx.commit()
+
+
+
+### sqlserver2005 test database handling #######################################
+
+class SQLServerTestDataBaseHandler(TestDataBaseHandler):
+    DRIVER = 'sqlserver'
+
+    # XXX complete me
+
+    def init_test_database(self):
+        """initialize a fresh sqlserver databse used for testing purpose"""
+        if self.config.init_repository:
+            from cubicweb.server import init_repository
+            init_repository(self.config, interactive=False, drop=True,
+                            init_config=self.init_config)
+
+### sqlite test database handling ##############################################
+
+class SQLiteTestDataBaseHandler(TestDataBaseHandler):
+    DRIVER = 'sqlite'
+
+    __TMPDB = set()
+
+    @classmethod
+    def _cleanup_all_tmpdb(cls):
+        for dbpath in cls.__TMPDB:
+            cls._cleanup_database(dbpath)
+
+
+
+    def __init__(self, *args, **kwargs):
+        super(SQLiteTestDataBaseHandler, self).__init__(*args, **kwargs)
+        # use a dedicated base for each process.
+        if 'global-db-name' not in self.system_source:
+            self.system_source['global-db-name'] = self.system_source['db-name']
+            process_db = self.system_source['db-name'] + str(os.getpid())
+            self.system_source['db-name'] = process_db
+        process_db = self.absolute_dbfile() # update db-name to absolute path
+        self.__TMPDB.add(process_db)
+
+    @staticmethod
+    def _cleanup_database(dbfile):
+        try:
+            os.remove(dbfile)
+            os.remove('%s-journal' % dbfile)
+        except OSError:
+            pass
+
+    @property
+    def dbname(self):
+        return self.system_source['global-db-name']
+
+    def absolute_dbfile(self):
+        """absolute path of current database file"""
+        dbfile = join(self._ensure_test_backup_db_dir(),
+                      self.system_source['db-name'])
+        self.system_source['db-name'] = dbfile
+        return dbfile
+
+    def process_cache_entry(self, directory, dbname, db_id, entry):
+        return entry.get('sqlite')
+
+    def _backup_database(self, db_id=DEFAULT_EMPTY_DB_ID):
+        # XXX remove database file if it exists ???
+        dbfile = self.absolute_dbfile()
+        backup_file = self.absolute_backup_file(db_id, 'sqlite')
+        shutil.copy(dbfile, backup_file)
+        # Useful to debug WHO writes a database
+        # backup_stack = self.absolute_backup_file(db_id, '.stack')
+        #with open(backup_stack, 'w') as backup_stack_file:
+        #    import traceback
+        #    traceback.print_stack(file=backup_stack_file)
+        return backup_file
+
+    def _restore_database(self, backup_coordinates, _config):
+        # remove database file if it exists ?
+        dbfile = self.absolute_dbfile()
+        self._cleanup_database(dbfile)
+        shutil.copy(backup_coordinates, dbfile)
+        self.get_repo()
+
+    def init_test_database(self):
+        """initialize a fresh sqlite databse used for testing purpose"""
+        # initialize the database
+        from cubicweb.server import init_repository
+        self._cleanup_database(self.absolute_dbfile())
+        init_repository(self.config, interactive=False,
+                        init_config=self.init_config)
+
+import atexit
+atexit.register(SQLiteTestDataBaseHandler._cleanup_all_tmpdb)
+
+
+HANDLERS = {}
+
+def register_handler(handlerkls, overwrite=False):
+    assert handlerkls is not None
+    if overwrite or handlerkls.DRIVER not in HANDLERS:
+        HANDLERS[handlerkls.DRIVER] = handlerkls
+    else:
+        msg = "%s: Handler already exists use overwrite if it's intended\n"\
+              "(existing handler class is %r)"
+        raise ValueError(msg % (handlerkls.DRIVER, HANDLERS[handlerkls.DRIVER]))
+
+register_handler(PostgresTestDataBaseHandler)
+register_handler(SQLiteTestDataBaseHandler)
+register_handler(SQLServerTestDataBaseHandler)
+
+
+class HCache(object):
+    """Handler cache object: store database handler for a given configuration.
+
+    We only keep one repo in cache to prevent too much objects to stay alive
+    (database handler holds a reference to a repository). As at the moment a new
+    handler is created for each TestCase class and all test methods are executed
+    sequentially whithin this class, there should not have more cache miss that
+    if we had a wider cache as once a Handler stop being used it won't be used
+    again.
+    """
+
+    def __init__(self):
+        self.config = None
+        self.handler = None
+
+    def get(self, config):
+        if config is self.config:
+            return self.handler
+        else:
+            return None
+
+    def set(self, config, handler):
+        self.config = config
+        self.handler = handler
+
+HCACHE = HCache()
+
+
+# XXX a class method on Test ?
+
+_CONFIG = None
+def get_test_db_handler(config, init_config=None):
+    global _CONFIG
+    if _CONFIG is not None and config is not _CONFIG:
+        from logilab.common.modutils import cleanup_sys_modules
+        # cleanup all dynamically loaded modules and everything in the instance
+        # directory
+        apphome = _CONFIG.apphome
+        if apphome: # may be unset in tests
+            cleanup_sys_modules([apphome])
+        # also cleanup sys.path
+        if apphome in sys.path:
+            sys.path.remove(apphome)
+    _CONFIG = config
+    config.adjust_sys_path()
+    handler = HCACHE.get(config)
+    if handler is not None:
+        return handler
+    driver = config.system_source_config['db-driver']
+    key = (driver, config)
+    handlerkls = HANDLERS.get(driver, None)
+    if handlerkls is not None:
+        handler = handlerkls(config, init_config)
+        if config.skip_db_create_and_restore:
+            handler = NoCreateDropDatabaseHandler(handler)
+        HCACHE.set(config, handler)
+        return handler
+    else:
+        raise ValueError('no initialization function for driver %r' % driver)
+
+### compatibility layer ##############################################
+from logilab.common.deprecation import deprecated
+
+@deprecated("please use the new DatabaseHandler mecanism")
+def init_test_database(config=None, configdir='data', apphome=None):
+    """init a test database for a specific driver"""
+    if config is None:
+        config = TestServerConfiguration(apphome=apphome)
+    handler = get_test_db_handler(config)
+    handler.build_db_cache()
+    return handler.get_repo_and_cnx()