--- a/.hgignore Wed Mar 16 09:45:57 2011 +0100
+++ b/.hgignore Wed Mar 16 18:12:57 2011 +0100
@@ -9,3 +9,9 @@
\#.*?\#$
\.swp$
^doc/book/en/apidoc$
+\.old$
+syntax: regexp
+.*/data/database/.*\.sqlite
+.*/data/database/.*\.config
+.*/data/database/tmpdb.*
+
--- a/devtools/__init__.py Wed Mar 16 09:45:57 2011 +0100
+++ b/devtools/__init__.py Wed Mar 16 18:12:57 2011 +0100
@@ -22,12 +22,19 @@
import os
import sys
import logging
+import shutil
+import pickle
+import glob
+import warnings
from datetime import timedelta
from os.path import (abspath, join, exists, basename, dirname, normpath, split,
- isfile, isabs, splitext)
+ isfile, isabs, splitext, isdir, expanduser)
+from functools import partial
+import hashlib
from logilab.common.date import strptime
-from cubicweb import CW_SOFTWARE_ROOT, ConfigurationError, schema, cwconfig
+from logilab.common.decorators import cached, clear_cache
+from cubicweb import CW_SOFTWARE_ROOT, ConfigurationError, schema, cwconfig, BadConnectionId
from cubicweb.server.serverconfig import ServerConfiguration
from cubicweb.etwist.twconfig import TwistedConfiguration
@@ -78,13 +85,49 @@
},
}
+def turn_repo_off(repo):
+ """ Idea: this is less costly than a full re-creation of the repo object.
+ off:
+ * session are closed,
+ * pools are closed
+ * system source is shutdown
+ """
+ if not repo._needs_refresh:
+ for sessionid in list(repo._sessions):
+ warnings.warn('%s Open session found while turning repository off'
+ %sessionid, RuntimeWarning)
+ try:
+ repo.close(sessionid)
+ except BadConnectionId: #this is strange ? thread issue ?
+ print 'XXX unknown session', sessionid
+ for pool in repo.pools:
+ pool.close(True)
+ repo.system_source.shutdown()
+ repo._needs_refresh = True
+ repo._has_started = False
+
+def turn_repo_on(repo):
+ """Idea: this is less costly than a full re-creation of the repo object.
+ on:
+ * pools are connected
+ * cache are cleared
+ """
+ if repo._needs_refresh:
+ for pool in repo.pools:
+ pool.reconnect()
+ repo._type_source_cache = {}
+ repo._extid_cache = {}
+ repo.querier._rql_cache = {}
+ for source in repo.sources:
+ source.reset_caches()
+ repo._needs_refresh = False
+
class TestServerConfiguration(ServerConfiguration):
mode = 'test'
set_language = False
read_instance_schema = False
init_repository = True
- db_require_setup = True
def __init__(self, appid='data', apphome=None, log_threshold=logging.CRITICAL+10):
# must be set before calling parent __init__
@@ -216,131 +259,402 @@
self.view('foaf', rset)
"""
- db_require_setup = False # skip init_db / reset_db steps
read_instance_schema = True # read schema from database
# test database handling #######################################################
-def init_test_database(config=None, appid='data', apphome=None):
- """init a test database for a specific driver"""
- from cubicweb.dbapi import in_memory_repo_cnx
- config = config or TestServerConfiguration(appid, apphome=apphome)
- sources = config.sources()
- driver = sources['system']['db-driver']
- if config.db_require_setup:
- if driver == 'sqlite':
- init_test_database_sqlite(config)
- elif driver == 'postgres':
- init_test_database_postgres(config)
+DEFAULT_EMPTY_DB_ID = '__default_empty_db__'
+
+class TestDataBaseHandler(object):
+ DRIVER = None
+ db_cache = {}
+ explored_glob = set()
+
+ def __init__(self, config):
+ self.config = config
+ self._repo = None
+ # pure consistency check
+ assert self.system_source['db-driver'] == self.DRIVER
+
+ def _ensure_test_backup_db_dir(self):
+ """Return path of directory for database backup.
+
+ The function create it if necessary"""
+ backupdir = join(self.config.apphome, 'database')
+ if not isdir(backupdir):
+ os.makedirs(backupdir)
+ return backupdir
+
+ def config_path(self, db_id):
+ """Path for config backup of a given database id"""
+ return self.absolute_backup_file(db_id, 'config')
+
+ def absolute_backup_file(self, db_id, suffix):
+ """Path for config backup of a given database id"""
+ dbname = self.dbname.replace('-', '_')
+ assert '.' not in db_id
+ filename = '%s-%s.%s' % (dbname, db_id, suffix)
+ return join(self._ensure_test_backup_db_dir(), filename)
+
+ def db_cache_key(self, db_id, dbname=None):
+ """Build a database cache key for a db_id with the current config
+
+ This key is meant to be used in the cls.db_cache mapping"""
+ if dbname is None:
+ dbname = self.dbname
+ dbname = os.path.basename(dbname)
+ dbname = dbname.replace('-', '_')
+ return (self.config.apphome, dbname, db_id)
+
+ def backup_database(self, db_id):
+ """Store the content of the current database as <db_id>
+
+ The config used are also stored."""
+ backup_data = self._backup_database(db_id)
+ config_path = self.config_path(db_id)
+ # XXX we dump a dict of the config
+ # This is an experimental to help config dependant setup (like BFSS) to
+ # be propertly restored
+ with open(config_path, 'wb') as conf_file:
+ conf_file.write(pickle.dumps(dict(self.config)))
+ self.db_cache[self.db_cache_key(db_id)] = (backup_data, config_path)
+
+ def _backup_database(self, db_id):
+ """Actual backup the current database.
+
+ return a value to be stored in db_cache to allow restoration"""
+ raise NotImplementedError()
+
+ def restore_database(self, db_id):
+ """Restore a database.
+
+ takes as argument value stored in db_cache by self._backup_database"""
+ # XXX set a clearer error message ???
+ backup_coordinates, config_path = self.db_cache[self.db_cache_key(db_id)]
+ # reload the config used to create the database.
+ config = pickle.loads(open(config_path, 'rb').read())
+ # shutdown repo before changing database content
+ if self._repo is not None:
+ self._repo.turn_repo_off()
+ self._restore_database(backup_coordinates, config)
+
+ def _restore_database(self, backup_coordinates, config):
+ """Actual restore of the current database.
+
+ Use the value tostored in db_cache as input """
+ raise NotImplementedError()
+
+ def get_repo(self, startup=False):
+ """ return Repository object on the current database.
+
+ (turn the current repo object "on" if there is one or recreate one)
+ if startup is True, server startup server hooks will be called if needed
+ """
+ if self._repo is None:
+ self._repo = self._new_repo(self.config)
+ repo = self._repo
+ repo.turn_repo_on()
+ if startup and not repo._has_started:
+ repo.hm.call_hooks('server_startup', repo=repo)
+ repo._has_started = True
+ return repo
+
+ def _new_repo(self, config):
+ """Factory method to create a new Repository Instance"""
+ from cubicweb.dbapi import in_memory_repo
+ config._cubes = None
+ repo = in_memory_repo(config)
+ # extending Repository class
+ repo._has_started = False
+ repo._needs_refresh = False
+ repo.turn_repo_on = partial(turn_repo_on, repo)
+ repo.turn_repo_off = partial(turn_repo_off, repo)
+ return repo
+
+
+ def get_cnx(self):
+ """return Connection object ont he current repository"""
+ from cubicweb.dbapi import in_memory_cnx
+ repo = self.get_repo()
+ sources = self.config.sources()
+ login = unicode(sources['admin']['login'])
+ password = sources['admin']['password'] or 'xxx'
+ cnx = in_memory_cnx(repo, login, password=password)
+ return cnx
+
+ def get_repo_and_cnx(self, db_id=DEFAULT_EMPTY_DB_ID):
+ """Reset database with the current db_id and return (repo, cnx)
+
+ A database *MUST* have been build with the current <db_id> prior to
+ call this method. See the ``build_db_cache`` method. The returned
+ repository have it's startup hooks called and the connection is
+ establised as admin."""
+
+ self.restore_database(db_id)
+ repo = self.get_repo(startup=True)
+ cnx = self.get_cnx()
+ return repo, cnx
+
+ @property
+ def system_source(self):
+ sources = self.config.sources()
+ return sources['system']
+
+ @property
+ def dbname(self):
+ return self.system_source['db-name']
+
+ def init_test_database():
+ """actual initialisation of the database"""
+ raise ValueError('no initialization function for driver %r' % driver)
+
+ def has_cache(self, db_id):
+ """Check if a given database id exist in cb cache for the current config"""
+ cache_glob = self.absolute_backup_file('*', '*')
+ if cache_glob not in self.explored_glob:
+ self.discover_cached_db()
+ return self.db_cache_key(db_id) in self.db_cache
+
+ def discover_cached_db(self):
+ """Search available db_if for the current config"""
+ cache_glob = self.absolute_backup_file('*', '*')
+ directory = os.path.dirname(cache_glob)
+ entries={}
+ candidates = glob.glob(cache_glob)
+ for filepath in candidates:
+ data = os.path.basename(filepath)
+ # database backup are in the forms are <dbname>-<db_id>.<backtype>
+ dbname, data = data.split('-', 1)
+ db_id, filetype = data.split('.', 1)
+ entries.setdefault((dbname, db_id), {})[filetype] = filepath
+ for (dbname, db_id), entry in entries.iteritems():
+ # apply necessary transformation from the driver
+ value = self.process_cache_entry(directory, dbname, db_id, entry)
+ assert 'config' in entry
+ if value is not None: # None value means "not handled by this driver
+ # XXX Ignored value are shadowed to other Handler if cache are common.
+ key = self.db_cache_key(db_id, dbname=dbname)
+ self.db_cache[key] = value, entry['config']
+ self.explored_glob.add(cache_glob)
+
+ def process_cache_entry(self, directory, dbname, db_id, entry):
+ """Transforms potential cache entry to proper backup coordinate
+
+ entry argument is a "filetype" -> "filepath" mapping
+ Return None if an entry should be ignored."""
+ return None
+
+ def build_db_cache(self, test_db_id=DEFAULT_EMPTY_DB_ID, pre_setup_func=None):
+ """Build Database cache for ``test_db_id`` if a cache doesn't exist
+
+ if ``test_db_id is DEFAULT_EMPTY_DB_ID`` self.init_test_database is
+ called. otherwise, DEFAULT_EMPTY_DB_ID is build/restored and
+ ``pre_setup_func`` to setup the database.
+
+ This function backup any database it build"""
+
+ if self.has_cache(test_db_id):
+ return #test_db_id, 'already in cache'
+ if test_db_id is DEFAULT_EMPTY_DB_ID:
+ self.init_test_database()
else:
- raise ValueError('no initialization function for driver %r' % driver)
- config._cubes = None # avoid assertion error
- repo, cnx = in_memory_repo_cnx(config, unicode(sources['admin']['login']),
- password=sources['admin']['password'] or 'xxx')
- if driver == 'sqlite':
- install_sqlite_patch(repo.querier)
- return repo, cnx
-
-def reset_test_database(config):
- """init a test database for a specific driver"""
- if not config.db_require_setup:
- return
- driver = config.sources()['system']['db-driver']
- if driver == 'sqlite':
- reset_test_database_sqlite(config)
- elif driver == 'postgres':
- init_test_database_postgres(config)
- else:
- raise ValueError('no reset function for driver %r' % driver)
-
+ print 'Building %s for database %s' % (test_db_id, self.dbname)
+ self.build_db_cache(DEFAULT_EMPTY_DB_ID)
+ self.restore_database(DEFAULT_EMPTY_DB_ID)
+ repo = self.get_repo(startup=True)
+ cnx = self.get_cnx()
+ session = repo._sessions[cnx.sessionid]
+ session.set_pool()
+ _commit = session.commit
+ def always_pooled_commit():
+ _commit()
+ session.set_pool()
+ session.commit = always_pooled_commit
+ pre_setup_func(session, self.config)
+ session.commit()
+ cnx.close()
+ self.backup_database(test_db_id)
### postgres test database handling ############################################
-def init_test_database_postgres(config):
- """initialize a fresh postgresql databse used for testing purpose"""
- from logilab.database import get_db_helper
- from cubicweb.server import init_repository
- from cubicweb.server.serverctl import (createdb, system_source_cnx,
- _db_sys_cnx)
- source = config.sources()['system']
- dbname = source['db-name']
- templdbname = dbname + '_template'
- helper = get_db_helper('postgres')
- # connect on the dbms system base to create our base
- dbcnx = _db_sys_cnx(source, 'CREATE DATABASE and / or USER', verbose=0)
- cursor = dbcnx.cursor()
- try:
- if dbname in helper.list_databases(cursor):
- cursor.execute('DROP DATABASE %s' % dbname)
- if not templdbname in helper.list_databases(cursor):
- source['db-name'] = templdbname
- createdb(helper, source, dbcnx, cursor)
- dbcnx.commit()
- cnx = system_source_cnx(source, special_privs='LANGUAGE C', verbose=0)
+class PostgresTestDataBaseHandler(TestDataBaseHandler):
+
+ # XXX
+ # XXX PostgresTestDataBaseHandler Have not been tested at all.
+ # XXX
+ DRIVER = 'postgres'
+
+ @property
+ @cached
+ def helper(self):
+ from logilab.database import get_db_helper
+ return get_db_helper('postgres')
+
+ @property
+ @cached
+ def dbcnx(self):
+ from cubicweb.server.serverctl import _db_sys_cnx
+ return _db_sys_cnx(self.system_source, 'CREATE DATABASE and / or USER', verbose=0)
+
+ @property
+ @cached
+ def cursor(self):
+ return self.dbcnx.cursor()
+
+ def init_test_database(self):
+ """initialize a fresh postgresql databse used for testing purpose"""
+ from cubicweb.server import init_repository
+ from cubicweb.server.serverctl import system_source_cnx, createdb
+ # connect on the dbms system base to create our base
+ try:
+ self._drop(self.dbname)
+
+ createdb(self.helper, self.system_source, self.dbcnx, self.cursor)
+ self.dbcnx.commit()
+ cnx = system_source_cnx(self.system_source, special_privs='LANGUAGE C', verbose=0)
templcursor = cnx.cursor()
- # XXX factorize with db-create code
- helper.init_fti_extensions(templcursor)
- # install plpythonu/plpgsql language if not installed by the cube
- langs = sys.platform == 'win32' and ('plpgsql',) or ('plpythonu', 'plpgsql')
- for extlang in langs:
- helper.create_language(templcursor, extlang)
- cnx.commit()
- templcursor.close()
- cnx.close()
- init_repository(config, interactive=False)
- source['db-name'] = dbname
- except:
- dbcnx.rollback()
- # XXX drop template
- raise
- createdb(helper, source, dbcnx, cursor, template=templdbname)
- dbcnx.commit()
- dbcnx.close()
+ try:
+ # XXX factorize with db-create code
+ self.helper.init_fti_extensions(templcursor)
+ # install plpythonu/plpgsql language if not installed by the cube
+ langs = sys.platform == 'win32' and ('plpgsql',) or ('plpythonu', 'plpgsql')
+ for extlang in langs:
+ self.helper.create_language(templcursor, extlang)
+ cnx.commit()
+ finally:
+ templcursor.close()
+ cnx.close()
+ init_repository(self.config, interactive=False)
+ except:
+ self.dbcnx.rollback()
+ print >> sys.stderr, 'building', self.dbname, 'failed'
+ #self._drop(self.dbname)
+ raise
+
+ def helper_clear_cache(self):
+ self.dbcnx.commit()
+ self.dbcnx.close()
+ clear_cache(self, 'dbcnx')
+ clear_cache(self, 'helper')
+ clear_cache(self, 'cursor')
+
+ def __del__(self):
+ self.helper_clear_cache()
+
+ @property
+ def _config_id(self):
+ return hashlib.sha1(self.config.apphome).hexdigest()[:10]
+
+ def _backup_name(self, db_id): # merge me with parent
+ backup_name = '_'.join(('cache', self._config_id, self.dbname, db_id))
+ return backup_name.lower()
+
+ def _drop(self, db_name):
+ if db_name in self.helper.list_databases(self.cursor):
+ #print 'dropping overwritted database:', db_name
+ self.cursor.execute('DROP DATABASE %s' % db_name)
+ self.dbcnx.commit()
+
+ def _backup_database(self, db_id):
+ """Actual backup the current database.
+
+ return a value to be stored in db_cache to allow restoration"""
+ from cubicweb.server.serverctl import createdb
+ orig_name = self.system_source['db-name']
+ try:
+ backup_name = self._backup_name(db_id)
+ #print 'storing postgres backup as', backup_name
+ self._drop(backup_name)
+ self.system_source['db-name'] = backup_name
+ createdb(self.helper, self.system_source, self.dbcnx, self.cursor, template=orig_name)
+ self.dbcnx.commit()
+ return backup_name
+ finally:
+ self.system_source['db-name'] = orig_name
+
+ def _restore_database(self, backup_coordinates, config):
+ from cubicweb.server.serverctl import createdb
+ """Actual restore of the current database.
+
+ Use the value tostored in db_cache as input """
+ #print 'restoring postgrest backup from', backup_coordinates
+ self._drop(self.dbname)
+ createdb(self.helper, self.system_source, self.dbcnx, self.cursor,
+ template=backup_coordinates)
+ self.dbcnx.commit()
+
+
### sqlserver2005 test database handling #######################################
-def init_test_database_sqlserver2005(config):
- """initialize a fresh sqlserver databse used for testing purpose"""
- if config.init_repository:
- from cubicweb.server import init_repository
- init_repository(config, interactive=False, drop=True)
+class SQLServerTestDataBaseHandler(TestDataBaseHandler):
+ DRIVER = 'sqlserver'
+
+ # XXX complete me
+
+ def init_test_database(self):
+ """initialize a fresh sqlserver databse used for testing purpose"""
+ if self.config.init_repository:
+ from cubicweb.server import init_repository
+ init_repository(config, interactive=False, drop=True)
### sqlite test database handling ##############################################
-def cleanup_sqlite(dbfile, removetemplate=False):
- try:
- os.remove(dbfile)
- os.remove('%s-journal' % dbfile)
- except OSError:
- pass
- if removetemplate:
+class SQLiteTestDataBaseHandler(TestDataBaseHandler):
+ DRIVER = 'sqlite'
+
+ @staticmethod
+ def _cleanup_database(dbfile):
try:
- os.remove('%s-template' % dbfile)
+ os.remove(dbfile)
+ os.remove('%s-journal' % dbfile)
except OSError:
pass
-def reset_test_database_sqlite(config):
- import shutil
- dbfile = config.sources()['system']['db-name']
- cleanup_sqlite(dbfile)
- template = '%s-template' % dbfile
- if exists(template):
- shutil.copy(template, dbfile)
- return True
- return False
+ def absolute_dbfile(self):
+ """absolute path of current database file"""
+ dbfile = join(self._ensure_test_backup_db_dir(),
+ self.config.sources()['system']['db-name'])
+ self.config.sources()['system']['db-name'] = dbfile
+ return dbfile
+
+
+ def process_cache_entry(self, directory, dbname, db_id, entry):
+ return entry.get('sqlite')
-def init_test_database_sqlite(config):
- """initialize a fresh sqlite databse used for testing purpose"""
- # remove database file if it exists
- dbfile = join(config.apphome, config.sources()['system']['db-name'])
- config.sources()['system']['db-name'] = dbfile
- if not reset_test_database_sqlite(config):
+ def _backup_database(self, db_id=DEFAULT_EMPTY_DB_ID):
+ # XXX remove database file if it exists ???
+ dbfile = self.absolute_dbfile()
+ backup_file = self.absolute_backup_file(db_id, 'sqlite')
+ shutil.copy(dbfile, backup_file)
+ # Usefull to debug WHO write a database
+ # backup_stack = self.absolute_backup_file(db_id, '.stack')
+ #with open(backup_stack, 'w') as backup_stack_file:
+ # import traceback
+ # traceback.print_stack(file=backup_stack_file)
+ return backup_file
+
+ def _new_repo(self, config):
+ repo = super(SQLiteTestDataBaseHandler, self)._new_repo(config)
+ install_sqlite_patch(repo.querier)
+ return repo
+
+ def _restore_database(self, backup_coordinates, _config):
+ # remove database file if it exists ?
+ dbfile = self.absolute_dbfile()
+ self._cleanup_database(dbfile)
+ #print 'resto from', backup_coordinates
+ shutil.copy(backup_coordinates, dbfile)
+ repo = self.get_repo()
+
+ def init_test_database(self):
+ """initialize a fresh sqlite databse used for testing purpose"""
# initialize the database
- import shutil
from cubicweb.server import init_repository
- init_repository(config, interactive=False)
- shutil.copy(dbfile, '%s-template' % dbfile)
+ self._cleanup_database(self.absolute_dbfile())
+ init_repository(self.config, interactive=False)
+
def install_sqlite_patch(querier):
"""This patch hotfixes the following sqlite bug :
@@ -379,3 +693,74 @@
return new_execute
querier.__class__.execute = wrap_execute(querier.__class__.execute)
querier.__class__._devtools_sqlite_patched = True
+
+
+
+HANDLERS = {}
+
+def register_handler(handlerkls):
+ assert handlerkls is not None
+ HANDLERS[handlerkls.DRIVER] = handlerkls
+
+register_handler(PostgresTestDataBaseHandler)
+register_handler(SQLiteTestDataBaseHandler)
+register_handler(SQLServerTestDataBaseHandler)
+
+
+class HCache(object):
+ """Handler cache object: store database handler for a given configuration.
+
+ We only keep one repo in cache to prevent too much objects to stay alive
+ (database handler holds a reference to a repository). As at the moment a new
+ handler is created for each TestCase class and all test methods are executed
+ sequentialy whithin this class, there should not have more cache miss that
+ if we had a wider cache as once a Handler stop being used it won't be used
+ again.
+ """
+
+ def __init__(self):
+ self.config = None
+ self.handler = None
+
+ def get(self, config):
+ if config is self.config:
+ return self.handler
+ else:
+ return None
+
+ def set(self, config, handler):
+ self.config = config
+ self.handler = handler
+
+HCACHE = HCache()
+
+
+# XXX a class method on Test ?
+def get_test_db_handler(config):
+ handler = HCACHE.get(config)
+ if handler is not None:
+ return handler
+ sources = config.sources()
+ driver = sources['system']['db-driver']
+ key = (driver, config)
+ handlerkls = HANDLERS.get(driver, None)
+ if handlerkls is not None:
+ handler = handlerkls(config)
+ HCACHE.set(config, handler)
+ return handler
+ else:
+ raise ValueError('no initialization function for driver %r' % driver)
+
+### compatibility layer ##############################################
+from logilab.common.deprecation import deprecated
+
+@deprecated("please use the new DatabaseHandler mecanism")
+def init_test_database(config=None, configdir='data', apphome=None):
+ """init a test database for a specific driver"""
+ if config is None:
+ config = TestServerConfiguration(apphome=apphome)
+ handler = get_test_db_handler(config)
+ handler.build_db_cache()
+ return handler.get_repo_and_cnx()
+
+
--- a/devtools/testlib.py Wed Mar 16 09:45:57 2011 +0100
+++ b/devtools/testlib.py Wed Mar 16 18:12:57 2011 +0100
@@ -49,7 +49,7 @@
from cubicweb.server.session import security_enabled
from cubicweb.server.hook import SendMailOp
from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS
-from cubicweb.devtools import BASE_URL, fake, htmlparser
+from cubicweb.devtools import BASE_URL, fake, htmlparser, DEFAULT_EMPTY_DB_ID
from cubicweb.utils import json
# low-level utilities ##########################################################
@@ -61,7 +61,8 @@
def do_view(self, arg):
import webbrowser
data = self._getval(arg)
- file('/tmp/toto.html', 'w').write(data)
+ with file('/tmp/toto.html', 'w') as toto:
+ toto.write(data)
webbrowser.open('file:///tmp/toto.html')
def line_context_filter(line_no, center, before=3, after=None):
@@ -83,22 +84,6 @@
protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES)
return set(schema.entities()) - protected_entities
-def refresh_repo(repo, resetschema=False, resetvreg=False):
- for pool in repo.pools:
- pool.close(True)
- repo.system_source.shutdown()
- devtools.reset_test_database(repo.config)
- for pool in repo.pools:
- pool.reconnect()
- repo._type_source_cache = {}
- repo._extid_cache = {}
- repo.querier._rql_cache = {}
- for source in repo.sources:
- source.reset_caches()
- if resetschema:
- repo.set_schema(repo.config.load_schema(), resetvreg=resetvreg)
-
-
# email handling, to test emails sent by an application ########################
MAILBOX = []
@@ -191,6 +176,19 @@
configcls = devtools.ApptestConfiguration
reset_schema = reset_vreg = False # reset schema / vreg between tests
tags = TestCase.tags | Tags('cubicweb', 'cw_repo')
+ test_db_id = DEFAULT_EMPTY_DB_ID
+ _cnxs = set() # establised connection
+ _cnx = None # current connection
+
+ # Too much complicated stuff. the class doesn't need to bear the repo anymore
+ @classmethod
+ def set_cnx(cls, cnx):
+ cls._cnxs.add(cnx)
+ cls._cnx = cnx
+
+ @property
+ def cnx(self):
+ return self.__class__._cnx
@classproperty
def config(cls):
@@ -199,6 +197,7 @@
Configuration is cached on the test class.
"""
try:
+ assert not cls is CubicWebTC, "Don't use CubicWebTC directly to prevent database caching issue"
return cls.__dict__['_config']
except KeyError:
home = abspath(join(dirname(sys.modules[cls.__module__].__file__), cls.appid))
@@ -237,35 +236,32 @@
except: # not in server only configuration
pass
+ #XXX this doesn't need to a be classmethod anymore
@classmethod
def _init_repo(cls):
"""init the repository and connection to it.
+ """
+ # setup configuration for test
+ cls.init_config(cls.config)
+ # get or restore and working db.
+ db_handler = devtools.get_test_db_handler(cls.config)
+ db_handler.build_db_cache(cls.test_db_id, cls.pre_setup_database)
- Repository and connection are cached on the test class. Once
- initialized, we simply reset connections and repository caches.
- """
- if not 'repo' in cls.__dict__:
- cls._build_repo()
- else:
- try:
- cls.cnx.rollback()
- except ProgrammingError:
- pass
- cls._refresh_repo()
-
- @classmethod
- def _build_repo(cls):
- cls.repo, cls.cnx = devtools.init_test_database(config=cls.config)
- cls.init_config(cls.config)
- cls.repo.hm.call_hooks('server_startup', repo=cls.repo)
+ cls.repo, cnx = db_handler.get_repo_and_cnx(cls.test_db_id)
+ # no direct assignation to cls.cnx anymore.
+ # cnx is now an instance property that use a class protected attributes.
+ cls.set_cnx(cnx)
cls.vreg = cls.repo.vreg
- cls.websession = DBAPISession(cls.cnx, cls.admlogin)
- cls._orig_cnx = (cls.cnx, cls.websession)
+ cls.websession = DBAPISession(cnx, cls.admlogin)
+ cls._orig_cnx = (cnx, cls.websession)
cls.config.repository = lambda x=None: cls.repo
- @classmethod
- def _refresh_repo(cls):
- refresh_repo(cls.repo, cls.reset_schema, cls.reset_vreg)
+ def _close_cnx(self):
+ for cnx in list(self._cnxs):
+ if not cnx._closed:
+ cnx.rollback()
+ cnx.close()
+ self._cnxs.remove(cnx)
# global resources accessors ###############################################
@@ -307,34 +303,47 @@
def setUp(self):
# monkey patch send mail operation so emails are sent synchronously
- self._old_mail_postcommit_event = SendMailOp.postcommit_event
- SendMailOp.postcommit_event = SendMailOp.sendmails
+ self._patch_SendMailOp()
pause_tracing()
previous_failure = self.__class__.__dict__.get('_repo_init_failed')
if previous_failure is not None:
self.skipTest('repository is not initialised: %r' % previous_failure)
try:
self._init_repo()
+ self.addCleanup(self._close_cnx)
except Exception, ex:
self.__class__._repo_init_failed = ex
raise
resume_tracing()
- self._cnxs = []
self.setup_database()
self.commit()
MAILBOX[:] = [] # reset mailbox
def tearDown(self):
- if not self.cnx._closed:
- self.cnx.rollback()
- for cnx in self._cnxs:
- if not cnx._closed:
- cnx.close()
- SendMailOp.postcommit_event = self._old_mail_postcommit_event
+ # XXX hack until logilab.common.testlib is fixed
+ while self._cleanups:
+ cleanup, args, kwargs = self._cleanups.pop(-1)
+ cleanup(*args, **kwargs)
+
+ def _patch_SendMailOp(self):
+ # monkey patch send mail operation so emails are sent synchronously
+ _old_mail_postcommit_event = SendMailOp.postcommit_event
+ SendMailOp.postcommit_event = SendMailOp.sendmails
+ def reverse_SendMailOp_monkey_patch():
+ SendMailOp.postcommit_event = _old_mail_postcommit_event
+ self.addCleanup(reverse_SendMailOp_monkey_patch)
def setup_database(self):
"""add your database setup code by overriding this method"""
+ @classmethod
+ def pre_setup_database(cls, session, config):
+ """add your pre database setup code by overriding this method
+
+ Do not forget to set the cls.test_db_id value to enable caching of the
+ result.
+ """
+
# user / session management ###############################################
def user(self, req=None):
@@ -371,9 +380,8 @@
autoclose = kwargs.pop('autoclose', True)
if not kwargs:
kwargs['password'] = str(login)
- self.cnx = repo_connect(self.repo, unicode(login), **kwargs)
+ self.set_cnx(repo_connect(self.repo, unicode(login), **kwargs))
self.websession = DBAPISession(self.cnx)
- self._cnxs.append(self.cnx)
if login == self.vreg.config.anonymous_user()[0]:
self.cnx.anonymous_connection = True
if autoclose:
@@ -384,11 +392,8 @@
if not self.cnx is self._orig_cnx[0]:
if not self.cnx._closed:
self.cnx.close()
- try:
- self._cnxs.remove(self.cnx)
- except ValueError:
- pass
- self.cnx, self.websession = self._orig_cnx
+ cnx, self.websession = self._orig_cnx
+ self.set_cnx(cnx)
# db api ##################################################################
@@ -954,6 +959,8 @@
"""base class for test with auto-populating of the database"""
__abstract__ = True
+ test_db_id = 'autopopulate'
+
tags = CubicWebTC.tags | Tags('autopopulated')
pdbclass = CubicWebDebugger
@@ -1087,7 +1094,9 @@
tags = AutoPopulateTest.tags | Tags('web', 'generated')
def setUp(self):
- AutoPopulateTest.setUp(self)
+ assert not self.__class__ is AutomaticWebTest, 'Please subclass AutomaticWebTest to pprevent database caching issue'
+ super(AutomaticWebTest, self).setUp()
+
# access to self.app for proper initialization of the authentication
# machinery (else some views may fail)
self.app
--- a/doc/book/en/devrepo/testing.rst Wed Mar 16 09:45:57 2011 +0100
+++ b/doc/book/en/devrepo/testing.rst Wed Mar 16 18:12:57 2011 +0100
@@ -292,6 +292,27 @@
Take care to not let the imported `AutomaticWebTest` in your test module
namespace, else both your subclass *and* this parent class will be run.
+Cache heavy database setup
+-------------------------------
+
+Some tests suite require a complex setup of the database that takes seconds (or
+event minutes) to complete. Doing the whole setup for all individual tests make
+the whole run very slow. The ``CubicWebTC`` class offer a simple way to prepare
+specific database once for multiple tests. The `test_db_id` class attribute of
+your ``CubicWebTC`` must be set a unique identifier and the
+:meth:`pre_setup_database` class method build the cached content. As the
+:meth:`pre_setup_database` method is not grantee to be called, you must not set
+any class attribut to be used during test there. Databases for each `test_db_id`
+are automatically created if not already in cache. Clearing the cache is up to
+the user. Cache files are found in the :file:`data/database` subdirectory of your
+test directory.
+
+.. warning::
+
+ Take care to always have the same :meth:`pre_setup_database` function for all
+ call with a given `test_db_id` otherwise you test will have unpredictable
+ result given the first encountered one.
+
Testing on a real-life database
-------------------------------
--- a/hooks/test/unittest_syncschema.py Wed Mar 16 09:45:57 2011 +0100
+++ b/hooks/test/unittest_syncschema.py Wed Mar 16 18:12:57 2011 +0100
@@ -30,18 +30,10 @@
class SchemaModificationHooksTC(CubicWebTC):
reset_schema = True
- @classmethod
- def init_config(cls, config):
- super(SchemaModificationHooksTC, cls).init_config(config)
- # we have to read schema from the database to get eid for schema entities
- config._cubes = None
- cls.repo.fill_schema()
- cls.schema_eids = schema_eids_idx(cls.repo.schema)
-
- @classmethod
- def _refresh_repo(cls):
- super(SchemaModificationHooksTC, cls)._refresh_repo()
- restore_schema_eids_idx(cls.repo.schema, cls.schema_eids)
+ def setUp(self):
+ super(SchemaModificationHooksTC, self).setUp()
+ self.repo.fill_schema()
+ self.__class__.schema_eids = schema_eids_idx(self.repo.schema)
def index_exists(self, etype, attr, unique=False):
self.session.set_pool()
--- a/server/test/unittest_ldapuser.py Wed Mar 16 09:45:57 2011 +0100
+++ b/server/test/unittest_ldapuser.py Wed Mar 16 18:12:57 2011 +0100
@@ -24,10 +24,11 @@
import subprocess
from socket import socket, error as socketerror
-from logilab.common.testlib import TestCase, unittest_main, mock_object
+from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.repotest import RQLGeneratorTC
from cubicweb.devtools.httptest import get_available_port
+from cubicweb.devtools import get_test_db_handler
from cubicweb.server.sources.ldapuser import *
@@ -64,26 +65,10 @@
def setUpModule(*args):
create_slapd_configuration(LDAPUserSourceTC.config)
- global repo
- try:
- LDAPUserSourceTC._init_repo()
- repo = LDAPUserSourceTC.repo
- add_ldap_source(LDAPUserSourceTC.cnx)
- except:
- terminate_slapd()
- raise
def tearDownModule(*args):
- global repo
- repo.shutdown()
- del repo
terminate_slapd()
-def add_ldap_source(cnx):
- cnx.request().create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
- config=CONFIG)
- cnx.commit()
-
def create_slapd_configuration(config):
global slapd_process, CONFIG
basedir = join(config.apphome, "ldapdb")
@@ -127,10 +112,19 @@
os.kill(slapd_process.pid, signal.SIGTERM)
slapd_process.wait()
print "DONE"
-
del slapd_process
class LDAPUserSourceTC(CubicWebTC):
+ test_db_id = 'ldap-user'
+ tags = CubicWebTC.tags | Tags(('ldap'))
+
+ @classmethod
+ def pre_setup_database(cls, session, config):
+ session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
+ config=CONFIG)
+ session.commit()
+ # XXX keep it there
+ session.execute('CWUser U')
def patch_authenticate(self):
self._orig_authenticate = LDAPUserSource.authenticate
@@ -275,14 +269,16 @@
self.session.create_entity('CWGroup', name=u'bougloup2')
self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT})
- rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
+ rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, '
+ 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']])
def test_exists2(self):
self.create_user('comme')
self.create_user('cochon')
self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
+ rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, '
+ '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
self.assertEqual(rset.rows, [['managers'], ['users']])
def test_exists3(self):
@@ -292,7 +288,8 @@
self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})
self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT}))
- rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))')
+ rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" '
+ 'OR EXISTS(X copain T, T login in ("comme", "cochon"))')
self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]])
def test_exists4(self):
@@ -397,8 +394,10 @@
def test_nonregr5(self):
# original jpl query:
- # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
- rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
+ # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser,
+ # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
+ rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, '
+ 'U login "%s", P is X, X creation_date CD') % self.session.user.login
self.sexecute(rql, )#{'x': })
def test_nonregr6(self):
@@ -445,9 +444,20 @@
class RQL2LDAPFilterTC(RQLGeneratorTC):
+ tags = RQLGeneratorTC.tags | Tags(('ldap'))
+
+ @property
+ def schema(self):
+ """return the application schema"""
+ return self._schema
+
def setUp(self):
- self.schema = repo.schema
- RQLGeneratorTC.setUp(self)
+ self.handler = get_test_db_handler(LDAPUserSourceTC.config)
+ self.handler.build_db_cache('ldap-user', LDAPUserSourceTC.pre_setup_database)
+ self.handler.restore_database('ldap-user')
+ self._repo = repo = self.handler.get_repo()
+ self._schema = repo.schema
+ super(RQL2LDAPFilterTC, self).setUp()
ldapsource = repo.sources[-1]
self.pool = repo._get_pool()
session = mock_object(pool=self.pool)
@@ -455,8 +465,8 @@
self.ldapclasses = ''.join(ldapsource.base_filters)
def tearDown(self):
- repo._free_pool(self.pool)
- RQLGeneratorTC.tearDown(self)
+ self._repo.turn_repo_off()
+ super(RQL2LDAPFilterTC, self).tearDown()
def test_base(self):
rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
--- a/server/test/unittest_migractions.py Wed Mar 16 09:45:57 2011 +0100
+++ b/server/test/unittest_migractions.py Wed Mar 16 18:12:57 2011 +0100
@@ -45,9 +45,10 @@
tags = CubicWebTC.tags | Tags(('server', 'migration', 'migractions'))
@classmethod
- def init_config(cls, config):
- super(MigrationCommandsTC, cls).init_config(config)
+ def _init_repo(cls):
+ super(MigrationCommandsTC, cls)._init_repo()
# we have to read schema from the database to get eid for schema entities
+ config = cls.config
config._cubes = None
cls.repo.fill_schema()
cls.origschema = deepcopy(cls.repo.schema)
@@ -60,18 +61,6 @@
config._apphome = cls.datadir
assert 'Folder' in migrschema
- @classmethod
- def _refresh_repo(cls):
- super(MigrationCommandsTC, cls)._refresh_repo()
- cls.repo.set_schema(deepcopy(cls.origschema), resetvreg=False)
- # reset migration schema eids
- for eschema in migrschema.entities():
- eschema.eid = None
- for rschema in migrschema.relations():
- rschema.eid = None
- for rdef in rschema.rdefs.values():
- rdef.eid = None
-
def setUp(self):
CubicWebTC.setUp(self)
self.mh = ServerMigrationHelper(self.repo.config, migrschema,
--- a/server/test/unittest_multisources.py Wed Mar 16 09:45:57 2011 +0100
+++ b/server/test/unittest_multisources.py Wed Mar 16 18:12:57 2011 +0100
@@ -20,9 +20,9 @@
from itertools import repeat
from cubicweb.devtools import TestServerConfiguration, init_test_database
-from cubicweb.devtools.testlib import CubicWebTC, refresh_repo
+from cubicweb.devtools.testlib import CubicWebTC, Tags
from cubicweb.devtools.repotest import do_monkey_patch, undo_monkey_patch
-
+from cubicweb.devtools import get_test_db_handler
class ExternalSource1Configuration(TestServerConfiguration):
sourcefile = 'sources_extern'
@@ -52,81 +52,97 @@
repeat(u'write')))
-def setUpModule(*args):
- global repo2, cnx2, repo3, cnx3
- cfg1 = ExternalSource1Configuration('data', apphome=TwoSourcesTC.datadir)
- repo2, cnx2 = init_test_database(config=cfg1)
- cfg2 = ExternalSource2Configuration('data', apphome=TwoSourcesTC.datadir)
- repo3, cnx3 = init_test_database(config=cfg2)
- src = cnx3.request().create_entity('CWSource', name=u'extern',
- type=u'pyrorql', config=EXTERN_SOURCE_CFG)
- cnx3.commit() # must commit before adding the mapping
- add_extern_mapping(src)
- cnx3.commit()
+def pre_setup_database_extern(session, config):
+ session.execute('INSERT Card X: X title "C3: An external card", X wikiid "aaa"')
+ session.execute('INSERT Card X: X title "C4: Ze external card", X wikiid "zzz"')
+ session.execute('INSERT Affaire X: X ref "AFFREF"')
+ session.commit()
- TestServerConfiguration.no_sqlite_wrap = True
- # hi-jack PyroRQLSource.get_connection to access existing connection (no
- # pyro connection)
- PyroRQLSource.get_connection = lambda x: x.uri == 'extern-multi' and cnx3 or cnx2
- # also necessary since the repository is closing its initial connections
- # pool though we want to keep cnx2 valid
- Connection.close = lambda x: None
-
-def tearDownModule(*args):
- PyroRQLSource.get_connection = PyroRQLSource_get_connection
- Connection.close = Connection_close
- global repo2, cnx2, repo3, cnx3
- repo2.shutdown()
- repo3.shutdown()
- del repo2, cnx2, repo3, cnx3
- #del TwoSourcesTC.config.vreg
- #del TwoSourcesTC.config
- TestServerConfiguration.no_sqlite_wrap = False
+def pre_setup_database_multi(session, config):
+ session.create_entity('CWSource', name=u'extern', type=u'pyrorql',
+ config=EXTERN_SOURCE_CFG)
+ session.commit()
class TwoSourcesTC(CubicWebTC):
"""Main repo -> extern-multi -> extern
\-------------/
"""
+ test_db_id= 'cw-server-multisources'
+ tags = CubicWebTC.tags | Tags(('multisources'))
+
@classmethod
- def _refresh_repo(cls):
- super(TwoSourcesTC, cls)._refresh_repo()
- cnx2.rollback()
- refresh_repo(repo2)
- cnx3.rollback()
- refresh_repo(repo3)
+ def setUpClass(cls):
+ cls._cfg2 = ExternalSource1Configuration('data', apphome=TwoSourcesTC.datadir)
+ cls._cfg3 = ExternalSource2Configuration('data', apphome=TwoSourcesTC.datadir)
+ TestServerConfiguration.no_sqlite_wrap = True
+ # hi-jack PyroRQLSource.get_connection to access existing connection (no
+ # pyro connection)
+ PyroRQLSource.get_connection = lambda x: x.uri == 'extern-multi' and cls.cnx3 or cls.cnx2
+ # also necessary since the repository is closing its initial connections
+ # pool though we want to keep cnx2 valid
+ Connection.close = lambda x: None
+
+ @classmethod
+ def tearDowncls(cls):
+ PyroRQLSource.get_connection = PyroRQLSource_get_connection
+ Connection.close = Connection_close
+ cls.cnx2.close()
+ cls.cnx3.close()
+ TestServerConfiguration.no_sqlite_wrap = False
+
+
+ @classmethod
+ def _init_repo(cls):
+ repo2_handler = get_test_db_handler(cls._cfg2)
+ repo2_handler.build_db_cache('4cards-1affaire',pre_setup_func=pre_setup_database_extern)
+ cls.repo2, cls.cnx2 = repo2_handler.get_repo_and_cnx('4cards-1affaire')
+
+ repo3_handler = get_test_db_handler(cls._cfg3)
+ repo3_handler.build_db_cache('multisource',pre_setup_func=pre_setup_database_multi)
+ cls.repo3, cls.cnx3 = repo3_handler.get_repo_and_cnx('multisource')
+
+
+ super(TwoSourcesTC, cls)._init_repo()
def setUp(self):
CubicWebTC.setUp(self)
+ self.addCleanup(self.cnx2.close)
+ self.addCleanup(self.cnx3.close)
do_monkey_patch()
def tearDown(self):
for source in self.repo.sources[1:]:
self.repo.remove_source(source.uri)
CubicWebTC.tearDown(self)
+ self.cnx2.close()
+ self.cnx3.close()
undo_monkey_patch()
- def setup_database(self):
- cu = cnx2.cursor()
- self.ec1 = cu.execute('INSERT Card X: X title "C3: An external card", X wikiid "aaa"')[0][0]
- cu.execute('INSERT Card X: X title "C4: Ze external card", X wikiid "zzz"')
- self.aff1 = cu.execute('INSERT Affaire X: X ref "AFFREF"')[0][0]
- cnx2.commit()
- for uri, config in [('extern', EXTERN_SOURCE_CFG),
+ @staticmethod
+ def pre_setup_database(session, config):
+ for uri, src_config in [('extern', EXTERN_SOURCE_CFG),
('extern-multi', '''
pyro-ns-id = extern-multi
cubicweb-user = admin
cubicweb-password = gingkow
''')]:
- source = self.request().create_entity(
- 'CWSource', name=unicode(uri), type=u'pyrorql',
- config=unicode(config))
- self.commit() # must commit before adding the mapping
+ source = session.create_entity('CWSource', name=unicode(uri),
+ type=u'pyrorql',
+ config=unicode(src_config))
+ session.commit()
add_extern_mapping(source)
- self.commit()
+
+ session.commit()
# trigger discovery
- self.sexecute('Card X')
- self.sexecute('Affaire X')
- self.sexecute('State X')
+ session.execute('Card X')
+ session.execute('Affaire X')
+ session.execute('State X')
+
+ def setup_database(self):
+ cu2 = self.cnx2.cursor()
+ self.ec1 = cu2.execute('Any X WHERE X is Card, X title "C3: An external card", X wikiid "aaa"')[0][0]
+ self.aff1 = cu2.execute('Any X WHERE X is Affaire, X ref "AFFREF"')[0][0]
+ cu2.close()
# add some entities
self.ic1 = self.sexecute('INSERT Card X: X title "C1: An internal card", X wikiid "aaai"')[0][0]
self.ic2 = self.sexecute('INSERT Card X: X title "C2: Ze internal card", X wikiid "zzzi"')[0][0]
@@ -186,25 +202,25 @@
Connection_close(cnx.cnx) # cnx is a TestCaseConnectionProxy
def test_synchronization(self):
- cu = cnx2.cursor()
+ cu = self.cnx2.cursor()
assert cu.execute('Any X WHERE X eid %(x)s', {'x': self.aff1})
cu.execute('SET X ref "BLAH" WHERE X eid %(x)s', {'x': self.aff1})
aff2 = cu.execute('INSERT Affaire X: X ref "AFFREUX"')[0][0]
- cnx2.commit()
+ self.cnx2.commit()
try:
# force sync
self.repo.sources_by_uri['extern'].synchronize(MTIME)
self.failUnless(self.sexecute('Any X WHERE X has_text "blah"'))
self.failUnless(self.sexecute('Any X WHERE X has_text "affreux"'))
cu.execute('DELETE Affaire X WHERE X eid %(x)s', {'x': aff2})
- cnx2.commit()
+ self.cnx2.commit()
self.repo.sources_by_uri['extern'].synchronize(MTIME)
rset = self.sexecute('Any X WHERE X has_text "affreux"')
self.failIf(rset)
finally:
# restore state
cu.execute('SET X ref "AFFREF" WHERE X eid %(x)s', {'x': self.aff1})
- cnx2.commit()
+ self.cnx2.commit()
def test_simplifiable_var(self):
affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0]
@@ -234,9 +250,9 @@
def test_greater_eid(self):
rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1))
self.assertEqual(len(rset.rows), 2) # self.ic1 and self.ic2
- cu = cnx2.cursor()
+ cu = self.cnx2.cursor()
ec2 = cu.execute('INSERT Card X: X title "glup"')[0][0]
- cnx2.commit()
+ self.cnx2.commit()
# 'X eid > something' should not trigger discovery
rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1))
self.assertEqual(len(rset.rows), 2)
@@ -256,16 +272,16 @@
self.assertEqual(len(rset), 1, rset.rows)
def test_attr_unification_2(self):
- cu = cnx2.cursor()
+ cu = self.cnx2.cursor()
ec2 = cu.execute('INSERT Card X: X title "AFFREF"')[0][0]
- cnx2.commit()
+ self.cnx2.commit()
try:
c1 = self.sexecute('INSERT Card C: C title "AFFREF"')[0][0]
rset = self.sexecute('Any X,Y WHERE X is Card, Y is Affaire, X title T, Y ref T')
self.assertEqual(len(rset), 2, rset.rows)
finally:
cu.execute('DELETE Card X WHERE X eid %(x)s', {'x': ec2})
- cnx2.commit()
+ self.cnx2.commit()
def test_attr_unification_neq_1(self):
# XXX complete
@@ -317,22 +333,22 @@
self.assertSetEqual(notstates, states)
def test_absolute_url_base_url(self):
- cu = cnx2.cursor()
+ cu = self.cnx2.cursor()
ceid = cu.execute('INSERT Card X: X title "without wikiid to get eid based url"')[0][0]
- cnx2.commit()
+ self.cnx2.commit()
lc = self.sexecute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0)
self.assertEqual(lc.absolute_url(), 'http://extern.org/card/eid/%s' % ceid)
cu.execute('DELETE Card X WHERE X eid %(x)s', {'x':ceid})
- cnx2.commit()
+ self.cnx2.commit()
def test_absolute_url_no_base_url(self):
- cu = cnx3.cursor()
+ cu = self.cnx3.cursor()
ceid = cu.execute('INSERT Card X: X title "without wikiid to get eid based url"')[0][0]
- cnx3.commit()
+ self.cnx3.commit()
lc = self.sexecute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0)
self.assertEqual(lc.absolute_url(), 'http://testing.fr/cubicweb/card/eid/%s' % lc.eid)
cu.execute('DELETE Card X WHERE X eid %(x)s', {'x':ceid})
- cnx3.commit()
+ self.cnx3.commit()
def test_crossed_relation_noeid_needattr(self):
"""http://www.cubicweb.org/ticket/1382452"""
--- a/server/test/unittest_querier.py Wed Mar 16 09:45:57 2011 +0100
+++ b/server/test/unittest_querier.py Wed Mar 16 18:12:57 2011 +0100
@@ -27,9 +27,9 @@
from cubicweb.server.sqlutils import SQL_PREFIX
from cubicweb.server.utils import crypt_password
from cubicweb.server.sources.native import make_schema
-from cubicweb.devtools import init_test_database
+from cubicweb.devtools import get_test_db_handler, TestServerConfiguration
+
from cubicweb.devtools.repotest import tuplify, BaseQuerierTC
-
from unittest_session import Variable
@@ -64,7 +64,10 @@
def setUpModule(*args):
global repo, cnx
- repo, cnx = init_test_database(apphome=UtilsTC.datadir)
+ config = TestServerConfiguration(apphome=UtilsTC.datadir)
+ handler = get_test_db_handler(config)
+ handler.build_db_cache()
+ repo, cnx = handler.get_repo_and_cnx()
def tearDownModule(*args):
global repo, cnx
@@ -746,7 +749,7 @@
rset = self.execute('Tag X WHERE X creation_date TODAY')
self.assertEqual(len(rset.rows), 2)
rset = self.execute('Any MAX(D) WHERE X is Tag, X creation_date D')
- self.failUnless(isinstance(rset[0][0], datetime), type(rset[0][0]))
+ self.failUnless(isinstance(rset[0][0], datetime), (rset[0][0], type(rset[0][0])))
def test_today(self):
self.execute("INSERT Tag X: X name 'bidule', X creation_date TODAY")
--- a/server/test/unittest_security.py Wed Mar 16 09:45:57 2011 +0100
+++ b/server/test/unittest_security.py Wed Mar 16 18:12:57 2011 +0100
@@ -30,13 +30,12 @@
def setup_database(self):
super(BaseSecurityTC, self).setup_database()
self.create_user('iaminusersgrouponly')
- self.readoriggroups = self.schema['Personne'].permissions['read']
- self.addoriggroups = self.schema['Personne'].permissions['add']
-
- def tearDown(self):
- CubicWebTC.tearDown(self)
- self.schema['Personne'].set_action_permissions('read', self.readoriggroups)
- self.schema['Personne'].set_action_permissions('add', self.addoriggroups)
+ readoriggroups = self.schema['Personne'].permissions['read']
+ addoriggroups = self.schema['Personne'].permissions['add']
+ def fix_perm():
+ self.schema['Personne'].set_action_permissions('read', readoriggroups)
+ self.schema['Personne'].set_action_permissions('add', addoriggroups)
+ self.addCleanup(fix_perm)
class LowLevelSecurityFunctionTC(BaseSecurityTC):
--- a/test/unittest_migration.py Wed Mar 16 09:45:57 2011 +0100
+++ b/test/unittest_migration.py Wed Mar 16 18:12:57 2011 +0100
@@ -97,7 +97,7 @@
config.__class__.name = 'repository'
-from cubicweb.devtools import ApptestConfiguration, init_test_database, cleanup_sqlite
+from cubicweb.devtools import ApptestConfiguration, get_test_db_handler
class BaseCreationTC(TestCase):
@@ -106,8 +106,8 @@
config = ApptestConfiguration('data', apphome=self.datadir)
source = config.sources()['system']
self.assertEqual(source['db-driver'], 'sqlite')
- cleanup_sqlite(source['db-name'], removetemplate=True)
- init_test_database(config=config)
+ handler = get_test_db_handler(config)
+ handler.init_test_database()
if __name__ == '__main__':