# HG changeset patch # User Sylvain Thénault # Date 1250003754 -7200 # Node ID a9a2dca5db20089a08079cc883950b01deddfb1b # Parent 8074dd88e21b9a4e87de6c41ecf5e453b907fdd4# Parent b2530e3e0afbc014fa2f0419d93e7a4921ca9ee3 merge diff -r 8074dd88e21b -r a9a2dca5db20 common/test/unittest_mail.py --- a/common/test/unittest_mail.py Tue Aug 11 17:06:02 2009 +0200 +++ b/common/test/unittest_mail.py Tue Aug 11 17:15:54 2009 +0200 @@ -13,7 +13,7 @@ from logilab.common.testlib import unittest_main from logilab.common.umessage import message_from_string -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.common.mail import format_mail @@ -25,7 +25,7 @@ return pwd.getpwuid(os.getuid())[0] -class EmailTC(EnvBasedTC): +class EmailTC(CubicWebTC): def test_format_mail(self): self.set_option('sender-addr', 'bim@boum.fr') diff -r 8074dd88e21b -r a9a2dca5db20 common/test/unittest_migration.py --- a/common/test/unittest_migration.py Tue Aug 11 17:06:02 2009 +0200 +++ b/common/test/unittest_migration.py Tue Aug 11 17:15:54 2009 +0200 @@ -10,8 +10,6 @@ from logilab.common.testlib import TestCase, unittest_main from cubicweb.devtools import TestServerConfiguration -from cubicweb.devtools.apptest import TestEnvironment - from cubicweb.cwconfig import CubicWebConfiguration from cubicweb.common.migration import MigrationHelper, filter_scripts from cubicweb.server.migractions import ServerMigrationHelper @@ -98,8 +96,8 @@ config = ApptestConfiguration('data') source = config.sources()['system'] self.assertEquals(source['db-driver'], 'sqlite') - cleanup_sqlite(source['db-name'], removecube=True) - init_test_database(driver=source['db-driver'], config=config) + cleanup_sqlite(source['db-name'], removetemplate=True) + init_test_database(config=config) if __name__ == '__main__': diff -r 8074dd88e21b -r a9a2dca5db20 common/test/unittest_mixins.py --- a/common/test/unittest_mixins.py Tue Aug 11 17:06:02 2009 +0200 +++ b/common/test/unittest_mixins.py Tue Aug 11 17:15:54 2009 +0200 @@ -6,9 +6,9 @@ :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses """ from logilab.common.testlib import unittest_main -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC -class WorkfloableMixInTC(EnvBasedTC): +class WorkfloableMixInTC(CubicWebTC): def test_wf_state(self): s = self.add_entity('State', name=u'activated') self.execute('SET X state_of ET WHERE ET name "Bookmark", X eid %(x)s', diff -r 8074dd88e21b -r a9a2dca5db20 devtools/__init__.py --- a/devtools/__init__.py Tue Aug 11 17:06:02 2009 +0200 +++ b/devtools/__init__.py Tue Aug 11 17:15:54 2009 +0200 @@ -13,20 +13,54 @@ from os.path import (abspath, join, exists, basename, dirname, normpath, split, isfile, isabs) -from cubicweb import CW_SOFTWARE_ROOT, ConfigurationError +from cubicweb import CW_SOFTWARE_ROOT, ConfigurationError, schema, cwconfig from cubicweb.utils import strptime -from cubicweb.toolsutils import read_config -from cubicweb.cwconfig import CubicWebConfiguration, merge_options from cubicweb.server.serverconfig import ServerConfiguration from cubicweb.etwist.twconfig import TwistedConfiguration +cwconfig.CubicWebConfiguration.cls_adjust_sys_path() + +# db auto-population configuration ############################################# + +SYSTEM_ENTITIES = schema.SCHEMA_TYPES | set(( + 'CWGroup', 'CWUser', 'CWProperty', + 'State', 'Transition', 'TrInfo', + )) + +SYSTEM_RELATIONS = schema.META_RTYPES | set(( + # workflow related + 'state_of', 'transition_of', 'initial_state', 'allowed_transition', + 'destination_state', 'in_state', 'wf_info_for', 'from_state', 'to_state', + 'condition', + # cwproperty + 'for_user', + # schema definition + 'specializes', + 'relation_type', 'from_entity', 'to_entity', + 'constrained_by', 'cstrtype', 'widget', + 'read_permission', 'update_permission', 'delete_permission', 'add_permission', + # permission + 'in_group', 'require_group', 'require_permission', + # deducted from other relations + 'primary_email', + )) + +# content validation configuration ############################################# + # validators are used to validate (XML, DTD, whatever) view's content # validators availables are : # 'dtd' : validates XML + declared DTD # 'xml' : guarantees XML is well formed # None : do not try to validate anything + +# {'vid': validator} VIEW_VALIDATORS = {} + + +# cubicweb test configuration ################################################## + BASE_URL = 'http://testing.fr/cubicweb/' + DEFAULT_SOURCES = {'system': {'adapter' : 'native', 'db-encoding' : 'UTF-8', #'ISO-8859-1', 'db-user' : u'admin', @@ -40,13 +74,14 @@ }, } + class TestServerConfiguration(ServerConfiguration): mode = 'test' set_language = False read_instance_schema = False bootstrap_schema = False init_repository = True - options = merge_options(ServerConfiguration.options + ( + options = cwconfig.merge_options(ServerConfiguration.options + ( ('anonymous-user', {'type' : 'string', 'default': None, @@ -66,7 +101,6 @@ def __init__(self, appid, log_threshold=logging.CRITICAL+10): ServerConfiguration.__init__(self, appid) - self.global_set_option('log-file', None) self.init_log(log_threshold, force=True) # need this, usually triggered by cubicweb-ctl self.load_cwctl_plugins() @@ -118,30 +152,11 @@ sources = DEFAULT_SOURCES return sources - def load_defaults(self): - super(TestServerConfiguration, self).load_defaults() - # note: don't call global set option here, OptionManager may not yet be initialized - # add anonymous user - self.set_option('anonymous-user', 'anon') - self.set_option('anonymous-password', 'anon') - # uncomment the line below if you want rql queries to be logged - #self.set_option('query-log-file', '/tmp/test_rql_log.' + `os.getpid()`) - self.set_option('sender-name', 'cubicweb-test') - self.set_option('sender-addr', 'cubicweb-test@logilab.fr') - try: - send_to = '%s@logilab.fr' % os.getlogin() - except OSError: - send_to = '%s@logilab.fr' % (os.environ.get('USER') - or os.environ.get('USERNAME') - or os.environ.get('LOGNAME')) - self.set_option('sender-addr', send_to) - self.set_option('default-dest-addrs', send_to) - self.set_option('base-url', BASE_URL) - class BaseApptestConfiguration(TestServerConfiguration, TwistedConfiguration): repo_method = 'inmemory' - options = merge_options(TestServerConfiguration.options + TwistedConfiguration.options) + options = cwconfig.merge_options(TestServerConfiguration.options + + TwistedConfiguration.options) cubicweb_appobject_path = TestServerConfiguration.cubicweb_appobject_path | TwistedConfiguration.cubicweb_appobject_path cube_appobject_path = TestServerConfiguration.cube_appobject_path | TwistedConfiguration.cube_appobject_path @@ -163,98 +178,91 @@ BaseApptestConfiguration.__init__(self, appid, log_threshold=log_threshold) self.init_repository = sourcefile is None self.sourcefile = sourcefile - import re - self.global_set_option('embed-allowed', re.compile('.*')) + self.global_set_option('anonymous-user', 'anon') + self.global_set_option('anonymous-password', 'anon') + + def load_configuration(self): + super(ApptestConfiguration, self).load_configuration() + self.global_set_option('anonymous-user', 'anon') + self.global_set_option('anonymous-password', 'anon') -class RealDatabaseConfiguration(ApptestConfiguration): - init_repository = False - sourcesdef = {'system': {'adapter' : 'native', - 'db-encoding' : 'UTF-8', #'ISO-8859-1', - 'db-user' : u'admin', - 'db-password' : 'gingkow', - 'db-name' : 'seotest', - 'db-driver' : 'postgres', - 'db-host' : None, - }, - 'admin' : {'login': u'admin', - 'password': u'gingkow', - }, - } +# test database handling ####################################################### - def __init__(self, appid, log_threshold=logging.CRITICAL, sourcefile=None): - ApptestConfiguration.__init__(self, appid) - self.init_repository = False +def init_test_database(config=None, configdir='data'): + """init a test database for a specific driver""" + from cubicweb.dbapi import in_memory_cnx + config = config or TestServerConfiguration(configdir) + sources = config.sources() + driver = sources['system']['db-driver'] + if driver == 'sqlite': + init_test_database_sqlite(config) + elif driver == 'postgres': + init_test_database_postgres(config) + else: + raise ValueError('no initialization function for driver %r' % driver) + config._cubes = None # avoid assertion error + repo, cnx = in_memory_cnx(config, unicode(sources['admin']['login']), + sources['admin']['password'] or 'xxx') + if driver == 'sqlite': + install_sqlite_patch(repo.querier) + return repo, cnx - def sources(self): - """ - By default, we run tests with the sqlite DB backend. - One may use its own configuration by just creating a - 'sources' file in the test directory from wich tests are - launched. - """ - self._sources = self.sourcesdef - return self._sources +def reset_test_database(config): + """init a test database for a specific driver""" + driver = config.sources()['system']['db-driver'] + if driver == 'sqlite': + reset_test_database_sqlite(config) + else: + raise ValueError('no reset function for driver %r' % driver) -def buildconfig(dbuser, dbpassword, dbname, adminuser, adminpassword, dbhost=None): - """convenience function that builds a real-db configuration class""" - sourcesdef = {'system': {'adapter' : 'native', - 'db-encoding' : 'UTF-8', #'ISO-8859-1', - 'db-user' : dbuser, - 'db-password' : dbpassword, - 'db-name' : dbname, - 'db-driver' : 'postgres', - 'db-host' : dbhost, - }, - 'admin' : {'login': adminuser, - 'password': adminpassword, - }, - } - return type('MyRealDBConfig', (RealDatabaseConfiguration,), - {'sourcesdef': sourcesdef}) +### postgres test database handling ############################################ -def loadconfig(filename): - """convenience function that builds a real-db configuration class - from a file - """ - return type('MyRealDBConfig', (RealDatabaseConfiguration,), - {'sourcesdef': read_config(filename)}) +def init_test_database_postgres(config): + """initialize a fresh sqlite databse used for testing purpose""" + if config.init_repository: + from cubicweb.server import init_repository + init_repository(config, interactive=False, drop=True) -class LivetestConfiguration(BaseApptestConfiguration): - init_repository = False +### sqlite test database handling ############################################## + +def cleanup_sqlite(dbfile, removetemplate=False): + try: + os.remove(dbfile) + os.remove('%s-journal' % dbfile) + except OSError: + pass + if removetemplate: + try: + os.remove('%s-template' % dbfile) + except OSError: + pass - def __init__(self, cube=None, sourcefile=None, pyro_name=None, - log_threshold=logging.CRITICAL): - TestServerConfiguration.__init__(self, cube, log_threshold=log_threshold) - self.appid = pyro_name or cube - # don't change this, else some symlink problems may arise in some - # environment (e.g. mine (syt) ;o) - # XXX I'm afraid this test will prevent to run test from a production - # environment - self._sources = None - # instance cube test - if cube is not None: - self.apphome = self.cube_dir(cube) - elif 'web' in os.getcwd().split(os.sep): - # web test - self.apphome = join(normpath(join(dirname(__file__), '..')), 'web') - else: - # cube test - self.apphome = abspath('..') - self.sourcefile = sourcefile - self.global_set_option('realm', '') - self.use_pyro = pyro_name is not None +def reset_test_database_sqlite(config): + import shutil + dbfile = config.sources()['system']['db-name'] + cleanup_sqlite(dbfile) + template = '%s-template' % dbfile + if exists(template): + shutil.copy(template, dbfile) + return True + return False - def pyro_enabled(self): - if self.use_pyro: - return True - else: - return False +def init_test_database_sqlite(config): + """initialize a fresh sqlite databse used for testing purpose""" + # remove database file if it exists + dbfile = config.sources()['system']['db-name'] + if not reset_test_database_sqlite(config): + # initialize the database + import shutil + from cubicweb.server import init_repository + init_repository(config, interactive=False) + dbfile = config.sources()['system']['db-name'] + shutil.copy(dbfile, '%s-template' % dbfile) -CubicWebConfiguration.cls_adjust_sys_path() def install_sqlite_patch(querier): """This patch hotfixes the following sqlite bug : @@ -293,57 +301,3 @@ return new_execute querier.__class__.execute = wrap_execute(querier.__class__.execute) querier.__class__._devtools_sqlite_patched = True - -def init_test_database(driver='sqlite', configdir='data', config=None, - vreg=None): - """init a test database for a specific driver""" - from cubicweb.dbapi import in_memory_cnx - if vreg and not config: - config = vreg.config - config = config or TestServerConfiguration(configdir) - source = config.sources() - if driver == 'sqlite': - init_test_database_sqlite(config, source) - elif driver == 'postgres': - init_test_database_postgres(config, source) - else: - raise ValueError('no initialization function for driver %r' % driver) - config._cubes = None # avoid assertion error - repo, cnx = in_memory_cnx(vreg or config, unicode(source['admin']['login']), - source['admin']['password'] or 'xxx') - if driver == 'sqlite': - install_sqlite_patch(repo.querier) - return repo, cnx - -def init_test_database_postgres(config, source, vreg=None): - """initialize a fresh sqlite databse used for testing purpose""" - if config.init_repository: - from cubicweb.server import init_repository - init_repository(config, interactive=False, drop=True, vreg=vreg) - -def cleanup_sqlite(dbfile, removecube=False): - try: - os.remove(dbfile) - os.remove('%s-journal' % dbfile) - except OSError: - pass - if removecube: - try: - os.remove('%s-template' % dbfile) - except OSError: - pass - -def init_test_database_sqlite(config, source, vreg=None): - """initialize a fresh sqlite databse used for testing purpose""" - import shutil - # remove database file if it exists (actually I know driver == 'sqlite' :) - dbfile = source['system']['db-name'] - cleanup_sqlite(dbfile) - template = '%s-template' % dbfile - if exists(template): - shutil.copy(template, dbfile) - else: - # initialize the database - from cubicweb.server import init_repository - init_repository(config, interactive=False, vreg=vreg) - shutil.copy(dbfile, template) diff -r 8074dd88e21b -r a9a2dca5db20 devtools/_apptest.py --- a/devtools/_apptest.py Tue Aug 11 17:06:02 2009 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,208 +0,0 @@ -"""Hidden internals for the devtools.apptest module - -:organization: Logilab -:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2. -:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr -:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses -""" -__docformat__ = "restructuredtext en" - -import sys, traceback - -from logilab.common.pytest import pause_tracing, resume_tracing - -import yams.schema - -from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError -from cubicweb.cwvreg import CubicWebVRegistry - -from cubicweb.web.application import CubicWebPublisher -from cubicweb.web import Redirect - -from cubicweb.devtools import ApptestConfiguration, init_test_database -from cubicweb.devtools.fake import FakeRequest - -SYSTEM_ENTITIES = ('CWGroup', 'CWUser', - 'CWAttribute', 'CWRelation', - 'CWConstraint', 'CWConstraintType', 'CWProperty', - 'CWEType', 'CWRType', - 'State', 'Transition', 'TrInfo', - 'RQLExpression', - ) -SYSTEM_RELATIONS = ( - # virtual relation - 'identity', - # metadata - 'is', 'is_instance_of', 'owned_by', 'created_by', 'specializes', - # workflow related - 'state_of', 'transition_of', 'initial_state', 'allowed_transition', - 'destination_state', 'in_state', 'wf_info_for', 'from_state', 'to_state', - 'condition', - # permission - 'in_group', 'require_group', 'require_permission', - 'read_permission', 'update_permission', 'delete_permission', 'add_permission', - # eproperty - 'for_user', - # schema definition - 'relation_type', 'from_entity', 'to_entity', - 'constrained_by', 'cstrtype', 'widget', - # deducted from other relations - 'primary_email', - ) - -def unprotected_entities(app_schema, strict=False): - """returned a Set of each non final entity type, excluding CWGroup, and CWUser... - """ - if strict: - protected_entities = yams.schema.BASE_TYPES - else: - protected_entities = yams.schema.BASE_TYPES.union(set(SYSTEM_ENTITIES)) - entities = set(app_schema.entities()) - return entities - protected_entities - - -def ignore_relations(*relations): - global SYSTEM_RELATIONS - SYSTEM_RELATIONS += relations - - -class TestEnvironment(object): - """TestEnvironment defines a context (e.g. a config + a given connection) in - which the tests are executed - """ - - def __init__(self, appid, reporter=None, verbose=False, - configcls=ApptestConfiguration, requestcls=FakeRequest): - config = configcls(appid) - self.requestcls = requestcls - self.cnx = None - config.db_perms = False - source = config.sources()['system'] - if verbose: - print "init test database ..." - self.vreg = vreg = CubicWebVRegistry(config) - self.admlogin = source['db-user'] - # restore database <=> init database - self.restore_database() - if verbose: - print "init done" - config.repository = lambda x=None: self.repo - self.app = CubicWebPublisher(config, vreg=vreg) - self.verbose = verbose - schema = self.vreg.schema - # else we may run into problems since email address are ususally share in app tests - # XXX should not be necessary anymore - schema.rschema('primary_email').set_rproperty('CWUser', 'EmailAddress', 'composite', False) - self.deletable_entities = unprotected_entities(schema) - - def restore_database(self): - """called by unittests' tearDown to restore the original database - """ - try: - pause_tracing() - if self.cnx: - self.cnx.close() - source = self.vreg.config.sources()['system'] - self.repo, self.cnx = init_test_database(driver=source['db-driver'], - vreg=self.vreg) - self._orig_cnx = self.cnx - resume_tracing() - except: - resume_tracing() - traceback.print_exc() - sys.exit(1) - # XXX cnx decoration is usually done by the repository authentication manager, - # necessary in authentication tests - self.cnx.vreg = self.vreg - self.cnx.login = source['db-user'] - self.cnx.password = source['db-password'] - - - def create_user(self, login, groups=('users',), req=None): - req = req or self.create_request() - cursor = self._orig_cnx.cursor(req) - rset = cursor.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s,' - 'X in_state S WHERE S name "activated"', - {'login': unicode(login), 'passwd': login.encode('utf8')}) - user = rset.get_entity(0, 0) - cursor.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)' - % ','.join(repr(g) for g in groups), - {'x': user.eid}, 'x') - user.clear_related_cache('in_group', 'subject') - self._orig_cnx.commit() - return user - - def login(self, login, password=None): - if login == self.admlogin: - self.restore_connection() - else: - self.cnx = repo_connect(self.repo, unicode(login), - password or str(login), - ConnectionProperties('inmemory')) - if login == self.vreg.config.anonymous_user()[0]: - self.cnx.anonymous_connection = True - return self.cnx - - def restore_connection(self): - if not self.cnx is self._orig_cnx: - try: - self.cnx.close() - except ProgrammingError: - pass # already closed - self.cnx = self._orig_cnx - - ############################################################################ - - def execute(self, rql, args=None, eidkey=None, req=None): - """executes , builds a resultset, and returns a couple (rset, req) - where req is a FakeRequest - """ - req = req or self.create_request(rql=rql) - return self.cnx.cursor(req).execute(unicode(rql), args, eidkey) - - def create_request(self, rql=None, **kwargs): - """executes , builds a resultset, and returns a - couple (rset, req) where req is a FakeRequest - """ - if rql: - kwargs['rql'] = rql - req = self.requestcls(self.vreg, form=kwargs) - req.set_connection(self.cnx) - return req - - def get_rset_and_req(self, rql, optional_args=None, args=None, eidkey=None): - """executes , builds a resultset, and returns a - couple (rset, req) where req is a FakeRequest - """ - return (self.execute(rql, args, eidkey), - self.create_request(rql=rql, **optional_args or {})) - - -class ExistingTestEnvironment(TestEnvironment): - - def __init__(self, appid, sourcefile, verbose=False): - config = ApptestConfiguration(appid, sourcefile=sourcefile) - if verbose: - print "init test database ..." - source = config.sources()['system'] - self.vreg = CubicWebVRegistry(config) - self.cnx = init_test_database(driver=source['db-driver'], - vreg=self.vreg)[1] - if verbose: - print "init done" - self.app = CubicWebPublisher(config, vreg=self.vreg) - self.verbose = verbose - # this is done when the publisher is opening a connection - self.cnx.vreg = self.vreg - - def setup(self, config=None): - """config is passed by TestSuite but is ignored in this environment""" - cursor = self.cnx.cursor() - self.last_eid = cursor.execute('Any X WHERE X creation_date D ORDERBY D DESC LIMIT 1').rows[0][0] - - def cleanup(self): - """cancel inserted elements during tests""" - cursor = self.cnx.cursor() - cursor.execute('DELETE Any X WHERE X eid > %(x)s', {'x' : self.last_eid}, eid_key='x') - print "cleaning done" - self.cnx.commit() diff -r 8074dd88e21b -r a9a2dca5db20 devtools/apptest.py --- a/devtools/apptest.py Tue Aug 11 17:06:02 2009 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,506 +0,0 @@ -"""This module provides misc utilities to test instances - -:organization: Logilab -:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2. -:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr -:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses -""" -__docformat__ = "restructuredtext en" - -from copy import deepcopy - -import simplejson - -from logilab.common.testlib import TestCase -from logilab.common.pytest import nocoverage -from logilab.common.umessage import message_from_string - -from logilab.common.deprecation import deprecated - -from cubicweb.devtools import init_test_database, TestServerConfiguration, ApptestConfiguration -from cubicweb.devtools._apptest import TestEnvironment -from cubicweb.devtools.fake import FakeRequest - -from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError - - -MAILBOX = [] -class Email: - def __init__(self, recipients, msg): - self.recipients = recipients - self.msg = msg - - @property - def message(self): - return message_from_string(self.msg) - - @property - def subject(self): - return self.message.get('Subject') - - @property - def content(self): - return self.message.get_payload(decode=True) - - def __repr__(self): - return '' % (','.join(self.recipients), - self.message.get('Subject')) - -class MockSMTP: - def __init__(self, server, port): - pass - def close(self): - pass - def sendmail(self, helo_addr, recipients, msg): - MAILBOX.append(Email(recipients, msg)) - -from cubicweb import cwconfig -cwconfig.SMTP = MockSMTP - - -def get_versions(self, checkversions=False): - """return the a dictionary containing cubes used by this instance - as key with their version as value, including cubicweb version. This is a - public method, not requiring a session id. - - replace Repository.get_versions by this method if you don't want versions - checking - """ - vcconf = {'cubicweb': self.config.cubicweb_version()} - self.config.bootstrap_cubes() - for pk in self.config.cubes(): - version = self.config.cube_version(pk) - vcconf[pk] = version - self.config._cubes = None - return vcconf - - -@property -def late_binding_env(self): - """builds TestEnvironment as late as possible""" - if not hasattr(self, '_env'): - self.__class__._env = TestEnvironment('data', configcls=self.configcls, - requestcls=self.requestcls) - return self._env - - -class autoenv(type): - """automatically set environment on EnvBasedTC subclasses if necessary - """ - def __new__(mcs, name, bases, classdict): - env = classdict.get('env') - # try to find env in one of the base classes - if env is None: - for base in bases: - env = getattr(base, 'env', None) - if env is not None: - classdict['env'] = env - break - if not classdict.get('__abstract__') and not classdict.get('env'): - classdict['env'] = late_binding_env - return super(autoenv, mcs).__new__(mcs, name, bases, classdict) - - -class EnvBasedTC(TestCase): - """abstract class for test using an apptest environment - """ - __metaclass__ = autoenv - __abstract__ = True - env = None - configcls = ApptestConfiguration - requestcls = FakeRequest - - # user / session management ############################################### - - def user(self, req=None): - if req is None: - req = self.env.create_request() - return self.env.cnx.user(req) - else: - return req.user - - def create_user(self, *args, **kwargs): - return self.env.create_user(*args, **kwargs) - - def login(self, login, password=None): - return self.env.login(login, password) - - def restore_connection(self): - self.env.restore_connection() - - # db api ################################################################## - - @nocoverage - def cursor(self, req=None): - return self.env.cnx.cursor(req or self.request()) - - @nocoverage - def execute(self, *args, **kwargs): - return self.env.execute(*args, **kwargs) - - @nocoverage - def commit(self): - self.env.cnx.commit() - - @nocoverage - def rollback(self): - try: - self.env.cnx.rollback() - except ProgrammingError: - pass - - # other utilities ######################################################### - def set_debug(self, debugmode): - from cubicweb.server import set_debug - set_debug(debugmode) - - @property - def config(self): - return self.vreg.config - - def session(self): - """return current server side session (using default manager account)""" - return self.env.repo._sessions[self.env.cnx.sessionid] - - def request(self, *args, **kwargs): - """return a web interface request""" - return self.env.create_request(*args, **kwargs) - - @nocoverage - def rset_and_req(self, *args, **kwargs): - return self.env.get_rset_and_req(*args, **kwargs) - - def entity(self, rql, args=None, eidkey=None, req=None): - return self.execute(rql, args, eidkey, req=req).get_entity(0, 0) - - def etype_instance(self, etype, req=None): - req = req or self.request() - e = self.env.vreg['etypes'].etype_class(etype)(req) - e.eid = None - return e - - def add_entity(self, etype, **kwargs): - rql = ['INSERT %s X' % etype] - - # dict for replacement in RQL Request - rql_args = {} - - if kwargs: # - rql.append(':') - # dict to define new entities variables - entities = {} - - # assignement part of the request - sub_rql = [] - for key, value in kwargs.iteritems(): - # entities - if hasattr(value, 'eid'): - new_value = "%s__" % key.upper() - - entities[new_value] = value.eid - rql_args[new_value] = value.eid - - sub_rql.append("X %s %s" % (key, new_value)) - # final attributes - else: - sub_rql.append('X %s %%(%s)s' % (key, key)) - rql_args[key] = value - rql.append(', '.join(sub_rql)) - - - if entities: - rql.append('WHERE') - # WHERE part of the request (to link entity to they eid) - sub_rql = [] - for key, value in entities.iteritems(): - sub_rql.append("%s eid %%(%s)s" % (key, key)) - rql.append(', '.join(sub_rql)) - - rql = ' '.join(rql) - rset = self.execute(rql, rql_args) - return rset.get_entity(0, 0) - - def set_option(self, optname, value): - self.vreg.config.global_set_option(optname, value) - - def pviews(self, req, rset): - return sorted((a.id, a.__class__) for a in self.vreg['views'].possible_views(req, rset=rset)) - - def pactions(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')): - return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset) - if a.category not in skipcategories] - - def pactions_by_cats(self, req, rset, categories=('addrelated',)): - return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset) - if a.category in categories] - - paddrelactions = deprecated()(pactions_by_cats) - - def pactionsdict(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')): - res = {} - for a in self.vreg['actions'].possible_vobjects(req, rset=rset): - if a.category not in skipcategories: - res.setdefault(a.category, []).append(a.__class__) - return res - - - def remote_call(self, fname, *args): - """remote call simulation""" - dump = simplejson.dumps - args = [dump(arg) for arg in args] - req = self.request(fname=fname, pageid='123', arg=args) - ctrl = self.vreg['controllers'].select('json', req) - return ctrl.publish(), req - - # default test setup and teardown ######################################### - - def setup_database(self): - pass - - def setUp(self): - self.restore_connection() - session = self.session() - #self.maxeid = self.execute('Any MAX(X)') - session.set_pool() - self.maxeid = session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0] - self.app = self.env.app - self.vreg = self.env.app.vreg - self.schema = self.vreg.schema - self.vreg.config.mode = 'test' - # set default-dest-addrs to a dumb email address to avoid mailbox or - # mail queue pollution - self.set_option('default-dest-addrs', ['whatever']) - self.setup_database() - self.commit() - MAILBOX[:] = [] # reset mailbox - - @nocoverage - def tearDown(self): - self.rollback() - # self.env.restore_database() - self.env.restore_connection() - self.session().unsafe_execute('DELETE Any X WHERE X eid > %s' % self.maxeid) - self.commit() - - # global resources accessors ############################################### - -# XXX -try: - from cubicweb.web import Redirect - from urllib import unquote -except ImportError: - pass # cubicweb-web not installed -else: - class ControllerTC(EnvBasedTC): - def setUp(self): - super(ControllerTC, self).setUp() - self.req = self.request() - self.ctrl = self.vreg['controllers'].select('edit', self.req) - - def publish(self, req): - assert req is self.ctrl.req - try: - result = self.ctrl.publish() - req.cnx.commit() - except Redirect: - req.cnx.commit() - raise - return result - - def expect_redirect_publish(self, req=None): - if req is not None: - self.ctrl = self.vreg['controllers'].select('edit', req) - else: - req = self.req - try: - res = self.publish(req) - except Redirect, ex: - try: - path, params = ex.location.split('?', 1) - except: - path, params = ex.location, "" - req._url = path - cleanup = lambda p: (p[0], unquote(p[1])) - params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p) - return req.relative_path(False), params # path.rsplit('/', 1)[-1], params - else: - self.fail('expected a Redirect exception') - - -def make_late_binding_repo_property(attrname): - @property - def late_binding(self): - """builds cnx as late as possible""" - if not hasattr(self, attrname): - # sets explicit test mode here to avoid autoreload - from cubicweb.cwconfig import CubicWebConfiguration - CubicWebConfiguration.mode = 'test' - cls = self.__class__ - config = self.repo_config or TestServerConfiguration('data') - cls._repo, cls._cnx = init_test_database('sqlite', config=config) - return getattr(self, attrname) - return late_binding - - -class autorepo(type): - """automatically set repository on RepositoryBasedTC subclasses if necessary - """ - def __new__(mcs, name, bases, classdict): - repo = classdict.get('repo') - # try to find repo in one of the base classes - if repo is None: - for base in bases: - repo = getattr(base, 'repo', None) - if repo is not None: - classdict['repo'] = repo - break - if name != 'RepositoryBasedTC' and not classdict.get('repo'): - classdict['repo'] = make_late_binding_repo_property('_repo') - classdict['cnx'] = make_late_binding_repo_property('_cnx') - return super(autorepo, mcs).__new__(mcs, name, bases, classdict) - - -class RepositoryBasedTC(TestCase): - """abstract class for test using direct repository connections - """ - __metaclass__ = autorepo - repo_config = None # set a particular config instance if necessary - - # user / session management ############################################### - - def create_user(self, user, groups=('users',), password=None, commit=True): - if password is None: - password = user - eid = self.execute('INSERT CWUser X: X login %(x)s, X upassword %(p)s,' - 'X in_state S WHERE S name "activated"', - {'x': unicode(user), 'p': password})[0][0] - groups = ','.join(repr(group) for group in groups) - self.execute('SET X in_group Y WHERE X eid %%(x)s, Y name IN (%s)' % groups, - {'x': eid}) - if commit: - self.commit() - self.session.reset_pool() - return eid - - def login(self, login, password=None): - cnx = repo_connect(self.repo, unicode(login), password or login, - ConnectionProperties('inmemory')) - self.cnxs.append(cnx) - return cnx - - def current_session(self): - return self.repo._sessions[self.cnxs[-1].sessionid] - - def restore_connection(self): - assert len(self.cnxs) == 1, self.cnxs - cnx = self.cnxs.pop() - try: - cnx.close() - except Exception, ex: - print "exception occured while closing connection", ex - - # db api ################################################################## - - def execute(self, rql, args=None, eid_key=None): - assert self.session.id == self.cnxid - rset = self.__execute(self.cnxid, rql, args, eid_key) - rset.vreg = self.vreg - rset.req = self.session - # call to set_pool is necessary to avoid pb when using - # instance entities for convenience - self.session.set_pool() - return rset - - def commit(self): - self.__commit(self.cnxid) - self.session.set_pool() - - def rollback(self): - self.__rollback(self.cnxid) - self.session.set_pool() - - def close(self): - self.__close(self.cnxid) - - # other utilities ######################################################### - - def set_debug(self, debugmode): - from cubicweb.server import set_debug - set_debug(debugmode) - - def set_option(self, optname, value): - self.vreg.config.global_set_option(optname, value) - - def add_entity(self, etype, **kwargs): - restrictions = ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs) - rql = 'INSERT %s X' % etype - if kwargs: - rql += ': %s' % ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs) - rset = self.execute(rql, kwargs) - return rset.get_entity(0, 0) - - def default_user_password(self): - config = self.repo.config #TestConfiguration('data') - user = unicode(config.sources()['system']['db-user']) - passwd = config.sources()['system']['db-password'] - return user, passwd - - def close_connections(self): - for cnx in self.cnxs: - try: - cnx.rollback() - cnx.close() - except: - continue - self.cnxs = [] - - pactions = EnvBasedTC.pactions.im_func - pactionsdict = EnvBasedTC.pactionsdict.im_func - - # default test setup and teardown ######################################### - - def _prepare(self): - MAILBOX[:] = [] # reset mailbox - if hasattr(self, 'cnxid'): - return - repo = self.repo - self.__execute = repo.execute - self.__commit = repo.commit - self.__rollback = repo.rollback - self.__close = repo.close - self.cnxid = self.cnx.sessionid - self.session = repo._sessions[self.cnxid] - self.cnxs = [] - # reset caches, they may introduce bugs among tests - repo._type_source_cache = {} - repo._extid_cache = {} - repo.querier._rql_cache = {} - for source in repo.sources: - source.reset_caches() - for s in repo.sources: - if hasattr(s, '_cache'): - s._cache = {} - - @property - def config(self): - return self.repo.config - - @property - def vreg(self): - return self.repo.vreg - - @property - def schema(self): - return self.repo.schema - - def setUp(self): - self._prepare() - self.session.set_pool() - self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0] - - def tearDown(self): - self.close_connections() - self.rollback() - self.session.unsafe_execute('DELETE Any X WHERE X eid > %(x)s', {'x': self.maxeid}) - self.commit() - diff -r 8074dd88e21b -r a9a2dca5db20 devtools/devctl.py --- a/devtools/devctl.py Tue Aug 11 17:06:02 2009 +0200 +++ b/devtools/devctl.py Tue Aug 11 17:15:54 2009 +0200 @@ -119,6 +119,7 @@ libschema = libconfig.load_schema(remove_unused_rtypes=False) entities = [e for e in schema.entities() if not e in libschema] else: + libschema = None entities = schema.entities() done = set() for eschema in sorted(entities): diff -r 8074dd88e21b -r a9a2dca5db20 devtools/fake.py --- a/devtools/fake.py Tue Aug 11 17:06:02 2009 +0200 +++ b/devtools/fake.py Tue Aug 11 17:15:54 2009 +0200 @@ -7,7 +7,6 @@ """ __docformat__ = "restructuredtext en" -from logilab.common.testlib import mock_object as Mock from logilab.common.adbh import get_adv_func_helper from indexer import get_indexer diff -r 8074dd88e21b -r a9a2dca5db20 devtools/htmlparser.py --- a/devtools/htmlparser.py Tue Aug 11 17:06:02 2009 +0200 +++ b/devtools/htmlparser.py Tue Aug 11 17:15:54 2009 +0200 @@ -169,3 +169,5 @@ except KeyError: continue return False + +VALMAP = {None: None, 'dtd': DTDValidator, 'xml': SaxOnlyValidator} diff -r 8074dd88e21b -r a9a2dca5db20 devtools/livetest.py --- a/devtools/livetest.py Tue Aug 11 17:06:02 2009 +0200 +++ b/devtools/livetest.py Tue Aug 11 17:15:54 2009 +0200 @@ -6,9 +6,10 @@ :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses """ +import os import socket import logging -from os.path import join, dirname, exists +from os.path import join, dirname, normpath, abspath from StringIO import StringIO #from twisted.application import service, strports @@ -21,10 +22,9 @@ from logilab.common.testlib import TestCase -import cubicweb.web from cubicweb.dbapi import in_memory_cnx from cubicweb.etwist.server import CubicWebRootResource -from cubicweb.devtools import LivetestConfiguration, init_test_database +from cubicweb.devtools import BaseApptestConfiguration, init_test_database @@ -50,25 +50,57 @@ +class LivetestConfiguration(BaseApptestConfiguration): + init_repository = False + + def __init__(self, cube=None, sourcefile=None, pyro_name=None, + log_threshold=logging.CRITICAL): + BaseApptestConfiguration.__init__(self, cube, log_threshold=log_threshold) + self.appid = pyro_name or cube + # don't change this, else some symlink problems may arise in some + # environment (e.g. mine (syt) ;o) + # XXX I'm afraid this test will prevent to run test from a production + # environment + self._sources = None + # instance cube test + if cube is not None: + self.apphome = self.cube_dir(cube) + elif 'web' in os.getcwd().split(os.sep): + # web test + self.apphome = join(normpath(join(dirname(__file__), '..')), 'web') + else: + # cube test + self.apphome = abspath('..') + self.sourcefile = sourcefile + self.global_set_option('realm', '') + self.use_pyro = pyro_name is not None + + def pyro_enabled(self): + if self.use_pyro: + return True + else: + return False + + + def make_site(cube, options=None): from cubicweb.etwist import twconfig # trigger configuration registration - sourcefile = options.sourcefile - config = LivetestConfiguration(cube, sourcefile, + config = LivetestConfiguration(cube, options.sourcefile, pyro_name=options.pyro_name, log_threshold=logging.DEBUG) - source = config.sources()['system'] - init_test_database(driver=source['db-driver'], config=config) + init_test_database(config=config) # if '-n' in sys.argv: # debug mode cubicweb = LivetestResource(config, debug=True) toplevel = cubicweb website = server.Site(toplevel) cube_dir = config.cube_dir(cube) + source = config.sources()['system'] for port in xrange(7777, 7798): try: reactor.listenTCP(port, channel.HTTPFactory(website)) saveconf(cube_dir, port, source['db-user'], source['db-password']) break - except CannotListenError, exc: + except CannotListenError: print "port %s already in use, I will try another one" % port else: raise diff -r 8074dd88e21b -r a9a2dca5db20 devtools/migrtest.py --- a/devtools/migrtest.py Tue Aug 11 17:06:02 2009 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,153 +0,0 @@ -"""Migration test script - -* migration will be played into a chroot of the local machine -* the database server used can be configured -* test tested instance may be on another host - - -We are using postgres'.pgpass file. Here is a copy of postgres documentation -about that: - -The file .pgpass in a user's home directory or the file referenced by -PGPASSFILE can contain passwords to be used if the connection requires -a password (and no password has been specified otherwise). - - -This file should contain lines of the following format: - -hostname:port:database:username:password - -Each of the first four fields may be a literal value, or *, which -matches anything. The password field from the first line that matches -the current connection parameters will be used. (Therefore, put -more-specific entries first when you are using wildcards.) If an entry -needs to contain : or \, escape this character with \. A hostname of -localhost matches both host (TCP) and local (Unix domain socket) -connections coming from the local machine. - -The permissions on .pgpass must disallow any access to world or group; -achieve this by the command chmod 0600 ~/.pgpass. If the permissions -are less strict than this, the file will be ignored. - -:organization: Logilab -:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2. -:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr -:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses -""" -__docformat__ = "restructuredtext en" - -from os import system -from os.path import join, basename - -from logilab.common.shellutils import cp, rm - -from cubicweb.toolsutils import read_config -from cubicweb.server.utils import generate_sources_file - -# XXXX use db-copy instead - -# test environment configuration -chrootpath = '/sandbox/cubicwebtest' -tmpdbhost = 'crater' -tmpdbuser = 'syt' -tmpdbpasswd = 'syt' - -def play_migration(applhome, applhost='', sudo=False): - applid = dbname = basename(applhome) - testapplhome = join(chrootpath, applhome) - # copy instance into the chroot - if applhost: - system('scp -r %s:%s %s' % (applhost, applhome, testapplhome)) - else: - cp(applhome, testapplhome) -## # extract db parameters -## sources = read_config(join(testapplhome, 'sources')) -## dbname = sources['system']['db-name'] -## dbhost = sources['system'].get('db-host') or '' -## dbuser = sources['system'].get('db-user') or '' -## dbpasswd = sources['system'].get('db-password') or '' - # generate sources file - # XXX multisources - sources = {'system': {}} - sources['system']['db-encoding'] = 'UTF8' # XXX - sources['system']['db-name'] = dbname - sources['system']['db-host'] = None - sources['system']['db-user'] = tmpdbuser - sources['system']['db-password'] = None - generate_sources_file(join(testapplhome, 'sources'), sources) -## # create postgres password file so we won't need anymore passwords -## # XXX may exist! -## pgpassfile = expanduser('~/.pgpass') -## pgpass = open(pgpassfile, 'w') -## if dbpasswd: -## pgpass.write('%s:*:%s:%s:%s\n' % (dbhost or applhost or 'localhost', -## dbname, dbuser, dbpasswd)) -## if tmpdbpasswd: -## pgpass.write('%s:*:%s:%s:%s\n' % (tmpdbhost or 'localhost', dbname, -## tmpdbuser, tmpdbpasswd)) -## pgpass.close() -## chmod(pgpassfile, 0600) - # dump db -## dumpcmd = 'pg_dump -Fc -U %s -f /tmp/%s.dump %s' % ( -## dbuser, dbname, dbname) -## if dbhost: -## dumpcmd += ' -h %s' % dbhost - dumpfile = '/tmp/%s.dump' % applid - dumpcmd = 'cubicweb-ctl db-dump --output=%s %s' % (dumpfile, applid) - if sudo: - dumpcmd = 'sudo %s' % dumpcmd - if applhost: - dumpcmd = 'ssh %s "%s"' % (applhost, dumpcmd) - if system(dumpcmd): - raise Exception('error while dumping the database') -## if not dbhost and applhost: - if applhost: - # retrieve the dump - if system('scp %s:%s %s' % (applhost, dumpfile, dumpfile)): - raise Exception('error while retreiving the dump') - # move the dump into the chroot - system('mv %s %s%s' % (dumpfile, chrootpath, dumpfile)) - # locate installed versions - vcconf = read_config(join(testapplhome, 'vc.conf')) - template = vcconf['TEMPLATE'] - cubicwebversion = vcconf['CW'] - templversion = vcconf['TEMPLATE_VERSION'] - # install the same versions cubicweb and template versions into the chroot - system('sudo chroot %s apt-get update' % chrootpath) - system('sudo chroot %s apt-get install cubicweb-server=%s cubicweb-client=%s' - % (chrootpath, cubicwebversion, cubicwebversion)) - system('sudo chroot %s apt-get install cubicweb-%s-appl-server=%s cubicweb-%s-appl-client=%s' - % (chrootpath, template, templversion, template, templversion)) - # update and upgrade to the latest version - system('sudo chroot %s apt-get install cubicweb-server cubicweb-client' % chrootpath) - system('sudo chroot %s apt-get install cubicweb-%s-appl-server cubicweb-%s-appl-client' - % (chrootpath, template, template)) - # create and fill the database - system('sudo chroot cubicweb-ctl db-restore %s %s' % (applid, dumpfile)) -## if not tmpdbhost: -## system('createdb -U %s -T template0 -E UTF8 %s' % (tmpdbuser, dbname)) -## system('pg_restore -U %s -O -Fc -d %s /tmp/%s.dump' -## % (tmpdbuser, dbname, dbname)) -## else: -## system('createdb -h %s -U %s -T template0 -E UTF8 %s' -## % (tmpdbhost, tmpdbuser, dbname)) -## system('pg_restore -h %s -U %s -O -Fc -d %s /tmp/%s.dump' -## % (tmpdbhost, tmpdbuser, dbname, dbname)) - # launch upgrade - system('sudo chroot %s cubicweb-ctl upgrade %s' % (chrootpath, applid)) - - # cleanup - rm(testapplhome) -## rm(pgpassfile) -## if tmpdbhost: -## system('dropdb -h %s -U %s %s' % (tmpdbuser, tmpdbhost, dbname)) -## else: -## system('dropdb -U %s %s' % (tmpdbuser, dbname)) -## if not dbhost and applhost: - if applhost: - system('ssh %s rm %s' % (applhost, dumpfile)) - rm('%s%s' % (chrootpath, dumpfile)) - - -if __name__ == '__main__': - play_migration('/etc/cubicweb.d/jpl', 'lepus') diff -r 8074dd88e21b -r a9a2dca5db20 devtools/realdbtest.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/devtools/realdbtest.py Tue Aug 11 17:15:54 2009 +0200 @@ -0,0 +1,43 @@ +import logging +from cubicweb import toolsutils +from cubicweb.devtools import DEFAULT_SOURCES, BaseApptestConfiguration + +class RealDatabaseConfiguration(BaseApptestConfiguration): + init_repository = False + sourcesdef = DEFAULT_SOURCES.copy() + + def sources(self): + """ + By default, we run tests with the sqlite DB backend. + One may use its own configuration by just creating a + 'sources' file in the test directory from wich tests are + launched. + """ + self._sources = self.sourcesdef + return self._sources + + +def buildconfig(dbuser, dbpassword, dbname, adminuser, adminpassword, dbhost=None): + """convenience function that builds a real-db configuration class""" + sourcesdef = {'system': {'adapter' : 'native', + 'db-encoding' : 'UTF-8', #'ISO-8859-1', + 'db-user' : dbuser, + 'db-password' : dbpassword, + 'db-name' : dbname, + 'db-driver' : 'postgres', + 'db-host' : dbhost, + }, + 'admin' : {'login': adminuser, + 'password': adminpassword, + }, + } + return type('MyRealDBConfig', (RealDatabaseConfiguration,), + {'sourcesdef': sourcesdef}) + + +def loadconfig(filename): + """convenience function that builds a real-db configuration class + from a file + """ + return type('MyRealDBConfig', (RealDatabaseConfiguration,), + {'sourcesdef': toolsutils.read_config(filename)}) diff -r 8074dd88e21b -r a9a2dca5db20 devtools/test/unittest_testlib.py --- a/devtools/test/unittest_testlib.py Tue Aug 11 17:06:02 2009 +0200 +++ b/devtools/test/unittest_testlib.py Tue Aug 11 17:15:54 2009 +0200 @@ -10,11 +10,11 @@ from unittest import TestSuite -from logilab.common.testlib import (TestCase, unittest_main, mock_object, +from logilab.common.testlib import (TestCase, unittest_main, SkipAwareTextTestRunner) + from cubicweb.devtools import htmlparser - -from cubicweb.devtools.testlib import WebTest, EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC class WebTestTC(TestCase): @@ -23,7 +23,7 @@ self.runner = SkipAwareTextTestRunner(stream=output) def test_error_raised(self): - class MyWebTest(WebTest): + class MyWebTest(CubicWebTC): def test_error_view(self): self.add_entity('Bug', title=u"bt") @@ -39,7 +39,7 @@ self.assertEquals(len(result.failures), 1) -class TestLibTC(EnvBasedTC): +class TestLibTC(CubicWebTC): def test_add_entity_with_relation(self): bug = self.add_entity(u'Bug', title=u"toto") self.add_entity(u'Bug', title=u"tata", identical_to=bug) diff -r 8074dd88e21b -r a9a2dca5db20 devtools/testlib.py --- a/devtools/testlib.py Tue Aug 11 17:06:02 2009 +0200 +++ b/devtools/testlib.py Tue Aug 11 17:15:54 2009 +0200 @@ -1,4 +1,4 @@ -"""this module contains base classes for web tests +"""this module contains base classes and utilities for cubicweb tests :organization: Logilab :copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2. @@ -7,84 +7,505 @@ """ __docformat__ = "restructuredtext en" +import os import sys +import re +from urllib import unquote from math import log -from logilab.common.debugger import Debugger -from logilab.common.testlib import InnerTest -from logilab.common.pytest import nocoverage +import simplejson + +import yams.schema -from cubicweb.devtools import VIEW_VALIDATORS -from cubicweb.devtools.apptest import EnvBasedTC -from cubicweb.devtools._apptest import unprotected_entities, SYSTEM_RELATIONS -from cubicweb.devtools.htmlparser import DTDValidator, SaxOnlyValidator, HTMLValidator -from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries +from logilab.common.testlib import TestCase, InnerTest +from logilab.common.pytest import nocoverage, pause_tracing, resume_tracing +from logilab.common.debugger import Debugger +from logilab.common.umessage import message_from_string +from logilab.common.decorators import cached, classproperty +from logilab.common.deprecation import deprecated -from cubicweb.sobjects.notification import NotificationView - -from cubicweb.vregistry import NoSelectableObject +from cubicweb import NoSelectableObject, cwconfig, devtools, web, server +from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError +from cubicweb.sobjects import notification +from cubicweb.web import application +from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS +from cubicweb.devtools import fake, htmlparser -## TODO ############### -# creation tests: make sure an entity was actually created -# Existing Test Environment +# low-level utilities ########################################################## class CubicWebDebugger(Debugger): - + """special debugger class providing a 'view' function which saves some + html into a temporary file and open a web browser to examinate it. + """ def do_view(self, arg): import webbrowser data = self._getval(arg) file('/tmp/toto.html', 'w').write(data) webbrowser.open('file:///tmp/toto.html') -def how_many_dict(schema, cursor, how_many, skip): - """compute how many entities by type we need to be able to satisfy relations - cardinality - """ - # compute how many entities by type we need to be able to satisfy relation constraint - relmap = {} - for rschema in schema.relations(): - if rschema.is_final(): - continue - for subj, obj in rschema.iter_rdefs(): - card = rschema.rproperty(subj, obj, 'cardinality') - if card[0] in '1?' and len(rschema.subjects(obj)) == 1: - relmap.setdefault((rschema, subj), []).append(str(obj)) - if card[1] in '1?' and len(rschema.objects(subj)) == 1: - relmap.setdefault((rschema, obj), []).append(str(subj)) - unprotected = unprotected_entities(schema) - for etype in skip: - unprotected.add(etype) - howmanydict = {} - for etype in unprotected_entities(schema, strict=True): - howmanydict[str(etype)] = cursor.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0] - if etype in unprotected: - howmanydict[str(etype)] += how_many - for (rschema, etype), targets in relmap.iteritems(): - # XXX should 1. check no cycle 2. propagate changes - relfactor = sum(howmanydict[e] for e in targets) - howmanydict[str(etype)] = max(relfactor, howmanydict[etype]) - return howmanydict - def line_context_filter(line_no, center, before=3, after=None): """return true if line are in context - if after is None: after = before""" + + if after is None: after = before + """ if after is None: after = before return center - before <= line_no <= center + after -## base webtest class ######################################################### -VALMAP = {None: None, 'dtd': DTDValidator, 'xml': SaxOnlyValidator} + +def unprotected_entities(schema, strict=False): + """returned a set of each non final entity type, excluding "system" entities + (eg CWGroup, CWUser...) + """ + if strict: + protected_entities = yams.schema.BASE_TYPES + else: + protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES) + return set(schema.entities()) - protected_entities + + +def get_versions(self, checkversions=False): + """return the a dictionary containing cubes used by this instance + as key with their version as value, including cubicweb version. This is a + public method, not requiring a session id. + + replace Repository.get_versions by this method if you don't want versions + checking + """ + vcconf = {'cubicweb': self.config.cubicweb_version()} + self.config.bootstrap_cubes() + for pk in self.config.cubes(): + version = self.config.cube_version(pk) + vcconf[pk] = version + self.config._cubes = None + return vcconf + + +def refresh_repo(repo): + devtools.reset_test_database(repo.config) + for pool in repo.pools: + pool.reconnect() + repo._type_source_cache = {} + repo._extid_cache = {} + repo.querier._rql_cache = {} + for source in repo.sources: + source.reset_caches() + + +# email handling, to test emails sent by an application ######################## + +MAILBOX = [] + +class Email: + """you'll get instances of Email into MAILBOX during tests that trigger + some notification. + + * `msg` is the original message object + + * `recipients` is a list of email address which are the recipients of this + message + """ + def __init__(self, recipients, msg): + self.recipients = recipients + self.msg = msg + + @property + def message(self): + return message_from_string(self.msg) + + @property + def subject(self): + return self.message.get('Subject') + + @property + def content(self): + return self.message.get_payload(decode=True) + + def __repr__(self): + return '' % (','.join(self.recipients), + self.message.get('Subject')) + +# the trick to get email into MAILBOX instead of actually sent: monkey patch +# cwconfig.SMTP object +class MockSMTP: + def __init__(self, server, port): + pass + def close(self): + pass + def sendmail(self, helo_addr, recipients, msg): + MAILBOX.append(Email(recipients, msg)) + +cwconfig.SMTP = MockSMTP + + +# base class for cubicweb tests requiring a full cw environments ############### + +class CubicWebTC(TestCase): + """abstract class for test using an apptest environment + + attributes: + `vreg`, the vregistry + `schema`, self.vreg.schema + `config`, cubicweb configuration + `cnx`, dbapi connection to the repository using an admin user + `session`, server side session associated to `cnx` + `app`, the cubicweb publisher (for web testing) + `repo`, the repository object + + `admlogin`, login of the admin user + `admpassword`, password of the admin user + + """ + appid = 'data' + configcls = devtools.ApptestConfiguration + + @classproperty + def config(cls): + """return the configuration object. Configuration is cached on the test + class. + """ + try: + return cls.__dict__['_config'] + except KeyError: + config = cls._config = cls.configcls(cls.appid) + config.mode = 'test' + return config + + @classmethod + def init_config(cls, config): + """configuration initialization hooks. You may want to override this.""" + source = config.sources()['system'] + cls.admlogin = unicode(source['db-user']) + cls.admpassword = source['db-password'] + # uncomment the line below if you want rql queries to be logged + #config.global_set_option('query-log-file', '/tmp/test_rql_log.' + `os.getpid()`) + config.global_set_option('log-file', None) + # set default-dest-addrs to a dumb email address to avoid mailbox or + # mail queue pollution + config.global_set_option('default-dest-addrs', ['whatever']) + try: + send_to = '%s@logilab.fr' % os.getlogin() + except OSError: + send_to = '%s@logilab.fr' % (os.environ.get('USER') + or os.environ.get('USERNAME') + or os.environ.get('LOGNAME')) + config.global_set_option('sender-addr', send_to) + config.global_set_option('default-dest-addrs', send_to) + config.global_set_option('sender-name', 'cubicweb-test') + config.global_set_option('sender-addr', 'cubicweb-test@logilab.fr') + # web resources + config.global_set_option('base-url', devtools.BASE_URL) + try: + config.global_set_option('embed-allowed', re.compile('.*')) + except: # not in server only configuration + pass + + @classmethod + def _init_repo(cls): + """init the repository and connection to it. + + Repository and connection are cached on the test class. Once + initialized, we simply reset connections and repository caches. + """ + if not 'repo' in cls.__dict__: + cls._build_repo() + else: + cls.cnx.rollback() + cls._refresh_repo() + + @classmethod + def _build_repo(cls): + cls.repo, cls.cnx = devtools.init_test_database(config=cls.config) + cls.init_config(cls.config) + cls.vreg = cls.repo.vreg + cls._orig_cnx = cls.cnx + cls.config.repository = lambda x=None: cls.repo + # necessary for authentication tests + cls.cnx.login = cls.admlogin + cls.cnx.password = cls.admpassword + + @classmethod + def _refresh_repo(cls): + refresh_repo(cls.repo) + + # global resources accessors ############################################### + + @property + def schema(self): + """return the application schema""" + return self.vreg.schema + + @property + def session(self): + """return current server side session (using default manager account)""" + return self.repo._sessions[self.cnx.sessionid] + + @property + def adminsession(self): + """return current server side session (using default manager account)""" + return self.repo._sessions[self._orig_cnx.sessionid] + + def set_option(self, optname, value): + self.config.global_set_option(optname, value) + + def set_debug(self, debugmode): + server.set_debug(debugmode) + + # default test setup and teardown ######################################### + + def setUp(self): + pause_tracing() + self._init_repo() + resume_tracing() + self.setup_database() + self.commit() + MAILBOX[:] = [] # reset mailbox + + def setup_database(self): + """add your database setup code by overriding this method""" + + # user / session management ############################################### + + def user(self, req=None): + """return the application schema""" + if req is None: + req = self.request() + return self.cnx.user(req) + else: + return req.user -class WebTest(EnvBasedTC): - """base class for web tests""" - __abstract__ = True + def create_user(self, login, groups=('users',), password=None, req=None, + commit=True): + """create and return a new user entity""" + if password is None: + password = login.encode('utf8') + cursor = self._orig_cnx.cursor(req or self.request()) + rset = cursor.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s,' + 'X in_state S WHERE S name "activated"', + {'login': unicode(login), 'passwd': password}) + user = rset.get_entity(0, 0) + cursor.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)' + % ','.join(repr(g) for g in groups), + {'x': user.eid}, 'x') + user.clear_related_cache('in_group', 'subject') + if commit: + self._orig_cnx.commit() + return user + + def login(self, login, password=None): + """return a connection for the given login/password""" + if login == self.admlogin: + self.restore_connection() + else: + self.cnx = repo_connect(self.repo, unicode(login), + password or str(login), + ConnectionProperties('inmemory')) + if login == self.vreg.config.anonymous_user()[0]: + self.cnx.anonymous_connection = True + return self.cnx + + def restore_connection(self): + if not self.cnx is self._orig_cnx: + try: + self.cnx.close() + except ProgrammingError: + pass # already closed + self.cnx = self._orig_cnx + + # db api ################################################################## + + @nocoverage + def cursor(self, req=None): + return self.cnx.cursor(req or self.request()) + + @nocoverage + def execute(self, rql, args=None, eidkey=None, req=None): + """executes , builds a resultset, and returns a couple (rset, req) + where req is a FakeRequest + """ + req = req or self.request(rql=rql) + return self.cnx.cursor(req).execute(unicode(rql), args, eidkey) + + @nocoverage + def commit(self): + self.cnx.commit() + + @nocoverage + def rollback(self): + try: + self.cnx.rollback() + except ProgrammingError: + pass + + # # server side db api ####################################################### + + def sexecute(self, rql, args=None, eid_key=None): + self.session.set_pool() + return self.session.execute(rql, args, eid_key) + + # other utilities ######################################################### + + def entity(self, rql, args=None, eidkey=None, req=None): + return self.execute(rql, args, eidkey, req=req).get_entity(0, 0) + + def add_entity(self, etype, req=None, **kwargs): + rql = ['INSERT %s X' % etype] + # dict for replacement in RQL Request + args = {} + if kwargs: + rql.append(':') + # dict to define new entities variables + entities = {} + # assignement part of the request + sub_rql = [] + for key, value in kwargs.iteritems(): + # entities + if hasattr(value, 'eid'): + new_value = "%s__" % key.upper() + entities[new_value] = value.eid + args[new_value] = value.eid + + sub_rql.append("X %s %s" % (key, new_value)) + # final attributes + else: + sub_rql.append('X %s %%(%s)s' % (key, key)) + args[key] = value + rql.append(', '.join(sub_rql)) + if entities: + rql.append('WHERE') + # WHERE part of the request (to link entity to they eid) + sub_rql = [] + for key, value in entities.iteritems(): + sub_rql.append("%s eid %%(%s)s" % (key, key)) + rql.append(', '.join(sub_rql)) + return self.execute(' '.join(rql), args, req=req).get_entity(0, 0) + + # vregistry inspection utilities ########################################### + + def pviews(self, req, rset): + return sorted((a.id, a.__class__) + for a in self.vreg['views'].possible_views(req, rset=rset)) - pdbclass = CubicWebDebugger - # this is a hook to be able to define a list of rql queries - # that are application dependent and cannot be guessed automatically - application_rql = [] + def pactions(self, req, rset, + skipcategories=('addrelated', 'siteactions', 'useractions')): + return [(a.id, a.__class__) + for a in self.vreg['actions'].possible_vobjects(req, rset=rset) + if a.category not in skipcategories] + + def pactions_by_cats(self, req, rset, categories=('addrelated',)): + return [(a.id, a.__class__) + for a in self.vreg['actions'].possible_vobjects(req, rset=rset) + if a.category in categories] + + def pactionsdict(self, req, rset, + skipcategories=('addrelated', 'siteactions', 'useractions')): + res = {} + for a in self.vreg['actions'].possible_vobjects(req, rset=rset): + if a.category not in skipcategories: + res.setdefault(a.category, []).append(a.__class__) + return res + + def list_views_for(self, rset): + """returns the list of views that can be applied on `rset`""" + req = rset.req + only_once_vids = ('primary', 'secondary', 'text') + req.data['ex'] = ValueError("whatever") + viewsvreg = self.vreg['views'] + for vid, views in viewsvreg.items(): + if vid[0] == '_': + continue + if rset.rowcount > 1 and vid in only_once_vids: + continue + views = [view for view in views + if view.category != 'startupview' + and not issubclass(view, NotificationView)] + if views: + try: + view = viewsvreg._select_best(views, req, rset=rset) + if view.linkable(): + yield view + else: + not_selected(self.vreg, view) + # else the view is expected to be used as subview and should + # not be tested directly + except NoSelectableObject: + continue + + def list_actions_for(self, rset): + """returns the list of actions that can be applied on `rset`""" + req = rset.req + for action in self.vreg['actions'].possible_objects(req, rset=rset): + yield action + + def list_boxes_for(self, rset): + """returns the list of boxes that can be applied on `rset`""" + req = rset.req + for box in self.vreg['boxes'].possible_objects(req, rset=rset): + yield box + + def list_startup_views(self): + """returns the list of startup views""" + req = self.request() + for view in self.vreg['views'].possible_views(req, None): + if view.category == 'startupview': + yield view.id + else: + not_selected(self.vreg, view) + + # web ui testing utilities ################################################# + + @property + @cached + def app(self): + """return a cubicweb publisher""" + return application.CubicWebPublisher(self.config, vreg=self.vreg) + + requestcls = fake.FakeRequest + def request(self, *args, **kwargs): + """return a web ui request""" + req = self.requestcls(self.vreg, form=kwargs) + req.set_connection(self.cnx) + return req + + def remote_call(self, fname, *args): + """remote json call simulation""" + dump = simplejson.dumps + args = [dump(arg) for arg in args] + req = self.request(fname=fname, pageid='123', arg=args) + ctrl = self.vreg['controllers'].select('json', req) + return ctrl.publish(), req + + def publish(self, req): + """call the publish method of the edit controller""" + ctrl = self.vreg['controllers'].select('edit', req) + try: + result = ctrl.publish() + req.cnx.commit() + except web.Redirect: + req.cnx.commit() + raise + return result + + def expect_redirect_publish(self, req): + """call the publish method of the edit controller, expecting to get a + Redirect exception.""" + try: + self.publish(req) + except web.Redirect, ex: + try: + path, params = ex.location.split('?', 1) + except: + path, params = ex.location, "" + req._url = path + cleanup = lambda p: (p[0], unquote(p[1])) + params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p) + return req.relative_path(False), params # path.rsplit('/', 1)[-1], params + else: + self.fail('expected a Redirect exception') + + # content validation ####################################################### # validators are used to validate (XML, DTD, whatever) view's content # validators availables are : @@ -99,8 +520,8 @@ # snippets #'text/html': DTDValidator, #'application/xhtml+xml': DTDValidator, - 'application/xml': SaxOnlyValidator, - 'text/xml': SaxOnlyValidator, + 'application/xml': htmlparser.SaxOnlyValidator, + 'text/xml': htmlparser.SaxOnlyValidator, 'text/plain': None, 'text/comma-separated-values': None, 'text/x-vcard': None, @@ -109,68 +530,9 @@ 'image/png': None, } # maps vid : validator name (override content_type_validators) - vid_validators = dict((vid, VALMAP[valkey]) + vid_validators = dict((vid, htmlparser.VALMAP[valkey]) for vid, valkey in VIEW_VALIDATORS.iteritems()) - no_auto_populate = () - ignored_relations = () - - def custom_populate(self, how_many, cursor): - pass - - def post_populate(self, cursor): - pass - - @nocoverage - def auto_populate(self, how_many): - """this method populates the database with `how_many` entities - of each possible type. It also inserts random relations between them - """ - cu = self.cursor() - self.custom_populate(how_many, cu) - vreg = self.vreg - howmanydict = how_many_dict(self.schema, cu, how_many, self.no_auto_populate) - for etype in unprotected_entities(self.schema): - if etype in self.no_auto_populate: - continue - nb = howmanydict.get(etype, how_many) - for rql, args in insert_entity_queries(etype, self.schema, vreg, nb): - cu.execute(rql, args) - edict = {} - for etype in unprotected_entities(self.schema, strict=True): - rset = cu.execute('%s X' % etype) - edict[str(etype)] = set(row[0] for row in rset.rows) - existingrels = {} - ignored_relations = SYSTEM_RELATIONS + self.ignored_relations - for rschema in self.schema.relations(): - if rschema.is_final() or rschema in ignored_relations: - continue - rset = cu.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema) - existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset) - q = make_relations_queries(self.schema, edict, cu, ignored_relations, - existingrels=existingrels) - for rql, args in q: - cu.execute(rql, args) - self.post_populate(cu) - self.commit() - - @nocoverage - def _check_html(self, output, view, template='main-template'): - """raises an exception if the HTML is invalid""" - try: - validatorclass = self.vid_validators[view.id] - except KeyError: - if template is None: - default_validator = HTMLValidator - else: - default_validator = DTDValidator - validatorclass = self.content_type_validators.get(view.content_type, - default_validator) - if validatorclass is None: - return None - validator = validatorclass() - return validator.parse_string(output.strip()) - def view(self, vid, rset=None, req=None, template='main-template', **kwargs): @@ -244,9 +606,145 @@ raise AssertionError, msg, tcbk + @nocoverage + def _check_html(self, output, view, template='main-template'): + """raises an exception if the HTML is invalid""" + try: + validatorclass = self.vid_validators[view.id] + except KeyError: + if template is None: + default_validator = htmlparser.HTMLValidator + else: + default_validator = htmlparser.DTDValidator + validatorclass = self.content_type_validators.get(view.content_type, + default_validator) + if validatorclass is None: + return None + validator = validatorclass() + return validator.parse_string(output.strip()) + + # deprecated ############################################################### + + @deprecated('use self.vreg["etypes"].etype_class(etype)(self.request())') + def etype_instance(self, etype, req=None): + req = req or self.request() + e = self.vreg['etypes'].etype_class(etype)(req) + e.eid = None + return e + + @nocoverage + @deprecated('use req = self.request(); rset = req.execute()') + def rset_and_req(self, rql, optional_args=None, args=None, eidkey=None): + """executes , builds a resultset, and returns a + couple (rset, req) where req is a FakeRequest + """ + return (self.execute(rql, args, eidkey), + self.request(rql=rql, **optional_args or {})) + + +# auto-populating test classes and utilities ################################### + +from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries + +def how_many_dict(schema, cursor, how_many, skip): + """compute how many entities by type we need to be able to satisfy relations + cardinality + """ + # compute how many entities by type we need to be able to satisfy relation constraint + relmap = {} + for rschema in schema.relations(): + if rschema.is_final(): + continue + for subj, obj in rschema.iter_rdefs(): + card = rschema.rproperty(subj, obj, 'cardinality') + if card[0] in '1?' and len(rschema.subjects(obj)) == 1: + relmap.setdefault((rschema, subj), []).append(str(obj)) + if card[1] in '1?' and len(rschema.objects(subj)) == 1: + relmap.setdefault((rschema, obj), []).append(str(subj)) + unprotected = unprotected_entities(schema) + for etype in skip: + unprotected.add(etype) + howmanydict = {} + for etype in unprotected_entities(schema, strict=True): + howmanydict[str(etype)] = cursor.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0] + if etype in unprotected: + howmanydict[str(etype)] += how_many + for (rschema, etype), targets in relmap.iteritems(): + # XXX should 1. check no cycle 2. propagate changes + relfactor = sum(howmanydict[e] for e in targets) + howmanydict[str(etype)] = max(relfactor, howmanydict[etype]) + return howmanydict + + +class AutoPopulateTest(CubicWebTC): + """base class for test with auto-populating of the database""" + __abstract__ = True + + pdbclass = CubicWebDebugger + # this is a hook to be able to define a list of rql queries + # that are application dependent and cannot be guessed automatically + application_rql = [] + + no_auto_populate = () + ignored_relations = () + def to_test_etypes(self): return unprotected_entities(self.schema, strict=True) + def custom_populate(self, how_many, cursor): + pass + + def post_populate(self, cursor): + pass + + @nocoverage + def auto_populate(self, how_many): + """this method populates the database with `how_many` entities + of each possible type. It also inserts random relations between them + """ + cu = self.cursor() + self.custom_populate(how_many, cu) + vreg = self.vreg + howmanydict = how_many_dict(self.schema, cu, how_many, self.no_auto_populate) + for etype in unprotected_entities(self.schema): + if etype in self.no_auto_populate: + continue + nb = howmanydict.get(etype, how_many) + for rql, args in insert_entity_queries(etype, self.schema, vreg, nb): + cu.execute(rql, args) + edict = {} + for etype in unprotected_entities(self.schema, strict=True): + rset = cu.execute('%s X' % etype) + edict[str(etype)] = set(row[0] for row in rset.rows) + existingrels = {} + ignored_relations = SYSTEM_RELATIONS | set(self.ignored_relations) + for rschema in self.schema.relations(): + if rschema.is_final() or rschema in ignored_relations: + continue + rset = cu.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema) + existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset) + q = make_relations_queries(self.schema, edict, cu, ignored_relations, + existingrels=existingrels) + for rql, args in q: + cu.execute(rql, args) + self.post_populate(cu) + self.commit() + + def iter_individual_rsets(self, etypes=None, limit=None): + etypes = etypes or self.to_test_etypes() + for etype in etypes: + if limit: + rql = 'Any X LIMIT %s WHERE X is %s' % (limit, etype) + else: + rql = 'Any X WHERE X is %s' % etype + rset = self.execute(rql) + for row in xrange(len(rset)): + if limit and row > limit: + break + # XXX iirk + rset2 = rset.limit(limit=1, offset=row) + yield rset2 + def iter_automatic_rsets(self, limit=10): """generates basic resultsets for each entity type""" etypes = self.to_test_etypes() @@ -267,54 +765,6 @@ for rql in self.application_rql: yield self.execute(rql) - - def list_views_for(self, rset): - """returns the list of views that can be applied on `rset`""" - req = rset.req - only_once_vids = ('primary', 'secondary', 'text') - req.data['ex'] = ValueError("whatever") - viewsvreg = self.vreg['views'] - for vid, views in viewsvreg.items(): - if vid[0] == '_': - continue - if rset.rowcount > 1 and vid in only_once_vids: - continue - views = [view for view in views - if view.category != 'startupview' - and not issubclass(view, NotificationView)] - if views: - try: - view = viewsvreg._select_best(views, req, rset=rset) - if view.linkable(): - yield view - else: - not_selected(self.vreg, view) - # else the view is expected to be used as subview and should - # not be tested directly - except NoSelectableObject: - continue - - def list_actions_for(self, rset): - """returns the list of actions that can be applied on `rset`""" - req = rset.req - for action in self.vreg['actions'].possible_objects(req, rset=rset): - yield action - - def list_boxes_for(self, rset): - """returns the list of boxes that can be applied on `rset`""" - req = rset.req - for box in self.vreg['boxes'].possible_objects(req, rset=rset): - yield box - - def list_startup_views(self): - """returns the list of startup views""" - req = self.request() - for view in self.vreg['views'].possible_views(req, None): - if view.category == 'startupview': - yield view.id - else: - not_selected(self.vreg, view) - def _test_everything_for(self, rset): """this method tries to find everything that can be tested for `rset` and yields a callable test (as needed in generative tests) @@ -342,8 +792,16 @@ return '%s_%s_%s' % ('_'.join(rset.column_types(0)), objid, objtype) -class AutomaticWebTest(WebTest): +# concrete class for automated application testing ############################ + +class AutomaticWebTest(AutoPopulateTest): """import this if you wan automatic tests to be ran""" + def setUp(self): + AutoPopulateTest.setUp(self) + # access to self.app for proper initialization of the authentication + # machinery (else some views may fail) + self.app + ## one each def test_one_each_config(self): self.auto_populate(1) @@ -365,17 +823,7 @@ yield self.view, vid, None, req -class RealDBTest(WebTest): - - def iter_individual_rsets(self, etypes=None, limit=None): - etypes = etypes or unprotected_entities(self.schema, strict=True) - for etype in etypes: - rset = self.execute('Any X WHERE X is %s' % etype) - for row in xrange(len(rset)): - if limit and row > limit: - break - rset2 = rset.limit(limit=1, offset=row) - yield rset2 +# registry instrumentization ################################################### def not_selected(vreg, appobject): try: @@ -383,10 +831,11 @@ except (KeyError, AttributeError): pass + def vreg_instrumentize(testclass): + # XXX broken from cubicweb.devtools.apptest import TestEnvironment - env = testclass._env = TestEnvironment('data', configcls=testclass.configcls, - requestcls=testclass.requestcls) + env = testclass._env = TestEnvironment('data', configcls=testclass.configcls) for reg in env.vreg.values(): reg._selected = {} try: @@ -405,6 +854,7 @@ reg.__class__._select_best = instr_select_best reg.__class__.__orig_select_best = orig_select_best + def print_untested_objects(testclass, skipregs=('hooks', 'etypes')): for regname, reg in testclass._env.vreg.iteritems(): if regname in skipregs: diff -r 8074dd88e21b -r a9a2dca5db20 entities/test/unittest_base.py --- a/entities/test/unittest_base.py Tue Aug 11 17:06:02 2009 +0200 +++ b/entities/test/unittest_base.py Tue Aug 11 17:15:54 2009 +0200 @@ -11,7 +11,7 @@ from logilab.common.decorators import clear_cache from logilab.common.interface import implements -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb import ValidationError from cubicweb.interfaces import IMileStone, IWorkflowable @@ -20,7 +20,7 @@ from cubicweb.web.widgets import AutoCompletionWidget -class BaseEntityTC(EnvBasedTC): +class BaseEntityTC(CubicWebTC): def setup_database(self): self.member = self.create_user('member') @@ -260,7 +260,7 @@ self.assertEquals(e.latest_trinfo().comment, 'deactivate 2') -class InterfaceTC(EnvBasedTC): +class InterfaceTC(CubicWebTC): def test_nonregr_subclasses_and_mixins_interfaces(self): self.failUnless(implements(CWUser, IWorkflowable)) @@ -275,7 +275,7 @@ self.failUnless(implements(MyUser_, IWorkflowable)) -class SpecializedEntityClassesTC(EnvBasedTC): +class SpecializedEntityClassesTC(CubicWebTC): def select_eclass(self, etype): # clear selector cache diff -r 8074dd88e21b -r a9a2dca5db20 etwist/test/unittest_server.py --- a/etwist/test/unittest_server.py Tue Aug 11 17:06:02 2009 +0200 +++ b/etwist/test/unittest_server.py Tue Aug 11 17:15:54 2009 +0200 @@ -5,11 +5,11 @@ :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses """ -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.etwist.server import host_prefixed_baseurl -class HostPrefixedBaseURLTC(EnvBasedTC): +class HostPrefixedBaseURLTC(CubicWebTC): def _check(self, baseurl, host, waited): self.assertEquals(host_prefixed_baseurl(baseurl, host), waited, diff -r 8074dd88e21b -r a9a2dca5db20 ext/test/unittest_rest.py --- a/ext/test/unittest_rest.py Tue Aug 11 17:06:02 2009 +0200 +++ b/ext/test/unittest_rest.py Tue Aug 11 17:15:54 2009 +0200 @@ -6,11 +6,11 @@ :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses """ from logilab.common.testlib import unittest_main -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.ext.rest import rest_publish -class RestTC(EnvBasedTC): +class RestTC(CubicWebTC): def context(self): return self.execute('CWUser X WHERE X login "admin"').get_entity(0, 0) diff -r 8074dd88e21b -r a9a2dca5db20 goa/test/unittest_editcontroller.py --- a/goa/test/unittest_editcontroller.py Tue Aug 11 17:06:02 2009 +0200 +++ b/goa/test/unittest_editcontroller.py Tue Aug 11 17:15:54 2009 +0200 @@ -401,11 +401,11 @@ # which fires a Redirect # 2/ When re-publishing the copy form, the publisher implicitly commits try: - self.env.app.publish('edit', self.req) + self.app.publish('edit', self.req) except Redirect: self.req.form['rql'] = 'Any X WHERE X eid %s' % p.eid self.req.form['vid'] = 'copy' - self.env.app.publish('view', self.req) + self.app.publish('view', self.req) rset = self.req.execute('CWUser P WHERE P surname "Boom"') self.assertEquals(len(rset), 0) finally: diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_checkintegrity.py --- a/server/test/unittest_checkintegrity.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_checkintegrity.py Tue Aug 11 17:15:54 2009 +0200 @@ -13,7 +13,7 @@ from cubicweb.server.checkintegrity import check -repo, cnx = init_test_database('sqlite') +repo, cnx = init_test_database() class CheckIntegrityTC(TestCase): def test(self): diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_hookhelper.py --- a/server/test/unittest_hookhelper.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_hookhelper.py Tue Aug 11 17:15:54 2009 +0200 @@ -8,18 +8,28 @@ """ from logilab.common.testlib import unittest_main -from cubicweb.devtools.apptest import RepositoryBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.server.pool import LateOperation, Operation, SingleLastOperation from cubicweb.server.hookhelper import * +from cubicweb.server import hooks, schemahooks -class HookHelpersTC(RepositoryBasedTC): +def clean_session_ops(func): + def wrapper(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + finally: + self.session.pending_operations[:] = [] + return wrapper + +class HookHelpersTC(CubicWebTC): def setUp(self): - RepositoryBasedTC.setUp(self) + CubicWebTC.setUp(self) self.hm = self.repo.hm + @clean_session_ops def test_late_operation(self): session = self.session l1 = LateOperation(session) @@ -27,6 +37,7 @@ l3 = Operation(session) self.assertEquals(session.pending_operations, [l3, l1, l2]) + @clean_session_ops def test_single_last_operation(self): session = self.session l0 = SingleLastOperation(session) @@ -37,8 +48,8 @@ l4 = SingleLastOperation(session) self.assertEquals(session.pending_operations, [l3, l1, l2, l4]) + @clean_session_ops def test_global_operation_order(self): - from cubicweb.server import hooks, schemahooks session = self.session op1 = hooks.DelayedDeleteOp(session) op2 = schemahooks.MemSchemaRDefDel(session) @@ -80,10 +91,7 @@ self.assertEquals(len(searchedops), 1, self.session.pending_operations) self.commit() - searchedops = [op for op in self.session.pending_operations - if isinstance(op, SendMailOp)] - self.assertEquals(len(searchedops), 0, - self.session.pending_operations) + self.assertEquals([], self.session.pending_operations) if __name__ == '__main__': unittest_main() diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_hooks.py --- a/server/test/unittest_hooks.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_hooks.py Tue Aug 11 17:15:54 2009 +0200 @@ -11,7 +11,7 @@ from cubicweb import (ConnectionError, RepositoryError, ValidationError, AuthenticationError, BadConnectionId) -from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions +from cubicweb.devtools.testlib import CubicWebTC, get_versions from cubicweb.server.sqlutils import SQL_PREFIX from cubicweb.server.repository import Repository @@ -26,7 +26,7 @@ -class CoreHooksTC(RepositoryBasedTC): +class CoreHooksTC(CubicWebTC): def test_delete_internal_entities(self): self.assertRaises(RepositoryError, self.execute, @@ -62,7 +62,7 @@ def test_delete_if_singlecard1(self): self.assertEquals(self.repo.schema['in_state'].inlined, False) - ueid = self.create_user('toto') + ueid = self.create_user('toto').eid self.commit() self.execute('SET X in_state S WHERE S name "deactivated", X eid %(x)s', {'x': ueid}) rset = self.execute('Any S WHERE X in_state S, X eid %(x)s', {'x': ueid}) @@ -156,7 +156,7 @@ -class UserGroupHooksTC(RepositoryBasedTC): +class UserGroupHooksTC(CubicWebTC): def test_user_synchronization(self): self.create_user('toto', password='hop', commit=False) @@ -164,7 +164,7 @@ self.repo.connect, u'toto', 'hop') self.commit() cnxid = self.repo.connect(u'toto', 'hop') - self.failIfEqual(cnxid, self.cnxid) + self.failIfEqual(cnxid, self.session.id) self.execute('DELETE CWUser X WHERE X login "toto"') self.repo.execute(cnxid, 'State X') self.commit() @@ -184,7 +184,7 @@ self.assertEquals(user.groups, set(('managers',))) def test_user_composite_owner(self): - ueid = self.create_user('toto') + ueid = self.create_user('toto').eid # composite of euser should be owned by the euser regardless of who created it self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", U use_email X ' 'WHERE U login "toto"') @@ -200,7 +200,7 @@ self.failIf(self.execute('Any X WHERE X created_by Y, X eid >= %(x)s', {'x': eid})) -class CWPropertyHooksTC(RepositoryBasedTC): +class CWPropertyHooksTC(CubicWebTC): def test_unexistant_eproperty(self): ex = self.assertRaises(ValidationError, @@ -224,7 +224,7 @@ self.assertEquals(ex.errors, {'value': u'unauthorized value'}) -class SchemaHooksTC(RepositoryBasedTC): +class SchemaHooksTC(CubicWebTC): def test_duplicate_etype_error(self): # check we can't add a CWEType or CWRType entity if it already exists one @@ -246,24 +246,23 @@ self.assertEquals(ex.errors, {'login': 'the value "admin" is already used, use another one'}) -class SchemaModificationHooksTC(RepositoryBasedTC): +class SchemaModificationHooksTC(CubicWebTC): - def setUp(self): - if not hasattr(self, '_repo'): - # first initialization - repo = self.repo # set by the RepositoryBasedTC metaclass - # force to read schema from the database to get proper eid set on schema instances - repo.config._cubes = None - repo.fill_schema() - RepositoryBasedTC.setUp(self) + @classmethod + def init_config(cls, config): + super(SchemaModificationHooksTC, cls).init_config(config) + config._cubes = None + cls.repo.fill_schema() def index_exists(self, etype, attr, unique=False): + self.session.set_pool() dbhelper = self.session.pool.source('system').dbhelper sqlcursor = self.session.pool['system'] return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) def test_base(self): schema = self.repo.schema + self.session.set_pool() dbhelper = self.session.pool.source('system').dbhelper sqlcursor = self.session.pool['system'] self.failIf(schema.has_entity('Societe2')) @@ -381,6 +380,7 @@ # schema modification hooks tests ######################################### def test_uninline_relation(self): + self.session.set_pool() dbhelper = self.session.pool.source('system').dbhelper sqlcursor = self.session.pool['system'] # Personne inline2 Affaire inline @@ -415,6 +415,7 @@ self.assertEquals(rset.rows[0], [peid, aeid]) def test_indexed_change(self): + self.session.set_pool() dbhelper = self.session.pool.source('system').dbhelper sqlcursor = self.session.pool['system'] try: @@ -433,6 +434,7 @@ self.failIf(self.index_exists('Affaire', 'sujet')) def test_unique_change(self): + self.session.set_pool() dbhelper = self.session.pool.source('system').dbhelper sqlcursor = self.session.pool['system'] try: @@ -481,10 +483,9 @@ self.commit() -class WorkflowHooksTC(RepositoryBasedTC): +class WorkflowHooksTC(CubicWebTC): - def setUp(self): - RepositoryBasedTC.setUp(self) + def setup_database(self): self.s_activated = self.execute('State X WHERE X name "activated"')[0][0] self.s_deactivated = self.execute('State X WHERE X name "deactivated"')[0][0] self.s_dummy = self.execute('INSERT State X: X name "dummy", X state_of E WHERE E name "CWUser"')[0][0] @@ -493,12 +494,6 @@ # so we can test wf enforcing on euser (managers don't have anymore this # enforcement self.execute('SET X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"') - self.commit() - - def tearDown(self): - self.execute('DELETE X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"') - self.commit() - RepositoryBasedTC.tearDown(self) def test_set_initial_state(self): ueid = self.execute('INSERT CWUser E: E login "x", E upassword "x", E in_group G ' @@ -511,13 +506,12 @@ self.assertEquals(initialstate, u'activated') def test_initial_state(self): - cnx = self.login('stduser') - cu = cnx.cursor() - self.assertRaises(ValidationError, cu.execute, + self.login('stduser') + self.assertRaises(ValidationError, self.execute, 'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, ' 'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'}) - cnx.close() # though managers can do whatever he want + self.restore_connection() self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, ' 'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'}) self.commit() @@ -526,7 +520,7 @@ def test_transition_checking1(self): cnx = self.login('stduser') cu = cnx.cursor() - ueid = cnx.user(self.current_session()).eid + ueid = cnx.user(self.session).eid self.assertRaises(ValidationError, cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s', {'x': ueid, 's': self.s_activated}, 'x') @@ -535,7 +529,7 @@ def test_transition_checking2(self): cnx = self.login('stduser') cu = cnx.cursor() - ueid = cnx.user(self.current_session()).eid + ueid = cnx.user(self.session).eid self.assertRaises(ValidationError, cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s', {'x': ueid, 's': self.s_dummy}, 'x') @@ -544,7 +538,7 @@ def test_transition_checking3(self): cnx = self.login('stduser') cu = cnx.cursor() - ueid = cnx.user(self.current_session()).eid + ueid = cnx.user(self.session).eid cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s', {'x': ueid, 's': self.s_deactivated}, 'x') cnx.commit() @@ -560,7 +554,7 @@ def test_transition_checking4(self): cnx = self.login('stduser') cu = cnx.cursor() - ueid = cnx.user(self.current_session()).eid + ueid = cnx.user(self.session).eid cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s', {'x': ueid, 's': self.s_deactivated}, 'x') cnx.commit() @@ -602,7 +596,7 @@ self.assertEquals(tr.owned_by[0].login, 'admin') def test_transition_information_on_creation(self): - ueid = self.create_user('toto') + ueid = self.create_user('toto').eid rset = self.execute('TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid}) self.assertEquals(len(rset), 1) tr = rset.get_entity(0, 0) diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_hooksmanager.py --- a/server/test/unittest_hooksmanager.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_hooksmanager.py Tue Aug 11 17:15:54 2009 +0200 @@ -6,7 +6,7 @@ from cubicweb.server.hooksmanager import HooksManager, Hook from cubicweb.devtools import TestServerConfiguration -from cubicweb.devtools.apptest import RepositoryBasedTC +from cubicweb.devtools.testlib import CubicWebTC class HookCalled(Exception): pass @@ -144,7 +144,7 @@ self.called.append((subject, r_type, object)) -class SystemHooksTC(RepositoryBasedTC): +class SystemHooksTC(CubicWebTC): def test_startup_shutdown(self): import hooks # cubicweb/server/test/data/hooks.py @@ -168,7 +168,7 @@ events = ('whatever', 'another') accepts = ('Societe', 'Division') -class HookTC(RepositoryBasedTC): +class HookTC(CubicWebTC): def test_inheritance(self): self.assertEquals(list(MyHook.register_to()), zip(repeat('whatever'), ('Societe', 'Division', 'SubDivision')) diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_ldapuser.py --- a/server/test/unittest_ldapuser.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_ldapuser.py Tue Aug 11 17:15:54 2009 +0200 @@ -7,8 +7,8 @@ """ from logilab.common.testlib import TestCase, unittest_main, mock_object -from cubicweb.devtools import init_test_database, TestServerConfiguration -from cubicweb.devtools.apptest import RepositoryBasedTC +from cubicweb.devtools import TestServerConfiguration +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.devtools.repotest import RQLGeneratorTC from cubicweb.server.sources.ldapuser import * @@ -34,36 +34,31 @@ -config = TestServerConfiguration('data') -config.sources_file = lambda : 'data/sourcesldap' -repo, cnx = init_test_database('sqlite', config=config) - -class LDAPUserSourceTC(RepositoryBasedTC): - repo, cnx = repo, cnx +class LDAPUserSourceTC(CubicWebTC): + config = TestServerConfiguration('data') + config.sources_file = lambda : 'data/sourcesldap' def patch_authenticate(self): self._orig_authenticate = LDAPUserSource.authenticate LDAPUserSource.authenticate = nopwd_authenticate - def setUp(self): - self._prepare() + def setup_database(self): # XXX: need this first query else we get 'database is locked' from # sqlite since it doesn't support multiple connections on the same # database # so doing, ldap inserted users don't get removed between each test - rset = self.execute('CWUser X') - self.commit() + rset = self.sexecute('CWUser X') # check we get some users from ldap self.assert_(len(rset) > 1) - self.maxeid = self.execute('Any MAX(X)')[0][0] def tearDown(self): if hasattr(self, '_orig_authenticate'): LDAPUserSource.authenticate = self._orig_authenticate - RepositoryBasedTC.tearDown(self) + CubicWebTC.tearDown(self) def test_authenticate(self): source = self.repo.sources_by_uri['ldapuser'] + self.session.set_pool() self.assertRaises(AuthenticationError, source.authenticate, self.session, 'toto', 'toto') @@ -73,7 +68,7 @@ def test_base(self): # check a known one - e = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0) + e = self.sexecute('CWUser X WHERE X login "syt"').get_entity(0, 0) self.assertEquals(e.login, 'syt') e.complete() self.assertEquals(e.creation_date, None) @@ -85,73 +80,73 @@ self.assertEquals(e.created_by, []) self.assertEquals(e.primary_email[0].address, 'Sylvain Thenault') # email content should be indexed on the user - rset = self.execute('CWUser X WHERE X has_text "thenault"') + rset = self.sexecute('CWUser X WHERE X has_text "thenault"') self.assertEquals(rset.rows, [[e.eid]]) def test_not(self): - eid = self.execute('CWUser X WHERE X login "syt"')[0][0] - rset = self.execute('CWUser X WHERE NOT X eid %s' % eid) + eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0] + rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid) self.assert_(rset) self.assert_(not eid in (r[0] for r in rset)) def test_multiple(self): - seid = self.execute('CWUser X WHERE X login "syt"')[0][0] - aeid = self.execute('CWUser X WHERE X login "adim"')[0][0] - rset = self.execute('CWUser X, Y WHERE X login "syt", Y login "adim"') + seid = self.sexecute('CWUser X WHERE X login "syt"')[0][0] + aeid = self.sexecute('CWUser X WHERE X login "adim"')[0][0] + rset = self.sexecute('CWUser X, Y WHERE X login "syt", Y login "adim"') self.assertEquals(rset.rows, [[seid, aeid]]) - rset = self.execute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"') + rset = self.sexecute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"') self.assertEquals(rset.rows, [[seid, aeid, 'syt']]) def test_in(self): - seid = self.execute('CWUser X WHERE X login "syt"')[0][0] - aeid = self.execute('CWUser X WHERE X login "adim"')[0][0] - rset = self.execute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L') + seid = self.sexecute('CWUser X WHERE X login "syt"')[0][0] + aeid = self.sexecute('CWUser X WHERE X login "adim"')[0][0] + rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L') self.assertEquals(rset.rows, [[aeid, 'adim'], [seid, 'syt']]) def test_relations(self): - eid = self.execute('CWUser X WHERE X login "syt"')[0][0] - rset = self.execute('Any X,E WHERE X is CWUser, X login L, X primary_email E') + eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0] + rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E') self.assert_(eid in (r[0] for r in rset)) - rset = self.execute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E') + rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E') self.assert_('syt' in (r[1] for r in rset)) def test_count(self): - nbusers = self.execute('Any COUNT(X) WHERE X is CWUser')[0][0] + nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0] # just check this is a possible number self.assert_(nbusers > 1, nbusers) self.assert_(nbusers < 30, nbusers) def test_upper(self): - eid = self.execute('CWUser X WHERE X login "syt"')[0][0] - rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L' % eid) + eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0] + rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid) self.assertEquals(rset[0][0], 'SYT') def test_unknown_attr(self): - eid = self.execute('CWUser X WHERE X login "syt"')[0][0] - rset = self.execute('Any L,C,M WHERE X eid %s, X login L, ' + eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0] + rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, ' 'X creation_date C, X modification_date M' % eid) self.assertEquals(rset[0][0], 'syt') self.assertEquals(rset[0][1], None) self.assertEquals(rset[0][2], None) def test_sort(self): - logins = [l for l, in self.execute('Any L ORDERBY L WHERE X login L')] + logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')] self.assertEquals(logins, sorted(logins)) def test_lower_sort(self): - logins = [l for l, in self.execute('Any L ORDERBY lower(L) WHERE X login L')] + logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')] self.assertEquals(logins, sorted(logins)) def test_or(self): - rset = self.execute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")') + rset = self.sexecute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")') self.assertEquals(len(rset), 2, rset.rows) # syt + admin def test_nonregr_set_owned_by(self): # test that when a user coming from ldap is triggering a transition # the related TrInfo has correct owner information - self.execute('SET X in_group G WHERE X login "syt", G name "managers"') + self.sexecute('SET X in_group G WHERE X login "syt", G name "managers"') self.commit() - syt = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0) + syt = self.sexecute('CWUser X WHERE X login "syt"').get_entity(0, 0) self.assertEquals([g.name for g in syt.in_group], ['managers', 'users']) self.patch_authenticate() cnx = self.login('syt', 'dummypassword') @@ -159,12 +154,12 @@ cu.execute('SET X in_state S WHERE X login "alf", S name "deactivated"') try: cnx.commit() - alf = self.execute('CWUser X WHERE X login "alf"').get_entity(0, 0) + alf = self.sexecute('CWUser X WHERE X login "alf"').get_entity(0, 0) self.assertEquals(alf.in_state[0].name, 'deactivated') trinfo = alf.latest_trinfo() self.assertEquals(trinfo.owned_by[0].login, 'syt') # select from_state to skip the user's creation TrInfo - rset = self.execute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' + rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' 'WF creation_date D, WF from_state FS,' 'WF owned_by U?, X eid %(x)s', {'x': alf.eid}, 'x') @@ -172,76 +167,76 @@ finally: # restore db state self.restore_connection() - self.execute('SET X in_state S WHERE X login "alf", S name "activated"') - self.execute('DELETE X in_group G WHERE X login "syt", G name "managers"') + self.sexecute('SET X in_state S WHERE X login "alf", S name "activated"') + self.sexecute('DELETE X in_group G WHERE X login "syt", G name "managers"') def test_same_column_names(self): - self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') + self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') def test_multiple_entities_from_different_sources(self): - self.create_user('cochon') - self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"')) + self.create_user('cochon', req=self.session) + self.failUnless(self.sexecute('Any X,Y WHERE X login "syt", Y login "cochon"')) def test_exists1(self): - self.add_entity('CWGroup', name=u'bougloup1') - self.add_entity('CWGroup', name=u'bougloup2') - self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') - self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"') - rset = self.execute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') + self.add_entity('CWGroup', name=u'bougloup1', req=self.session) + self.add_entity('CWGroup', name=u'bougloup2', req=self.session) + self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') + self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login "syt"') + rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') self.assertEquals(rset.rows, [['admin', 'activated'], ['syt', 'activated']]) def test_exists2(self): - self.create_user('comme') - self.create_user('cochon') - self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') - rset = self.execute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') + self.create_user('comme', req=self.session) + self.create_user('cochon', req=self.session) + self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') + rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') self.assertEquals(rset.rows, [['managers'], ['users']]) def test_exists3(self): - self.create_user('comme') - self.create_user('cochon') - self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') - self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) - self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"') - self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "syt", Y login "cochon"')) - rset = self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))') + self.create_user('comme', req=self.session) + self.create_user('cochon', req=self.session) + self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') + self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) + self.sexecute('SET X copain Y WHERE X login "syt", Y login "cochon"') + self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "syt", Y login "cochon"')) + rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))') self.assertEquals(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']]) def test_exists4(self): - self.create_user('comme') - self.create_user('cochon', groups=('users', 'guests')) - self.create_user('billy') - self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') - self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"') - self.execute('SET X copain Y WHERE X login "comme", Y login "billy"') - self.execute('SET X copain Y WHERE X login "syt", Y login "billy"') + self.create_user('comme', req=self.session) + self.create_user('cochon', groups=('users', 'guests'), req=self.session) + self.create_user('billy', req=self.session) + self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') + self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') + self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') + self.sexecute('SET X copain Y WHERE X login "syt", Y login "billy"') # search for group name, login where # CWUser copain with "comme" or "cochon" AND same login as the copain # OR # CWUser in_state activated AND not copain with billy # # SO we expect everybody but "comme" and "syt" - rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, ' + rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")') - all = self.execute('Any GN, L WHERE X in_group G, X login L, G name GN') + all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN') all.rows.remove(['users', 'comme']) all.rows.remove(['users', 'syt']) self.assertEquals(sorted(rset.rows), sorted(all.rows)) def test_exists5(self): - self.create_user('comme') - self.create_user('cochon', groups=('users', 'guests')) - self.create_user('billy') - self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') - self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"') - self.execute('SET X copain Y WHERE X login "comme", Y login "billy"') - self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"') - rset= self.execute('Any L WHERE X login L, ' + self.create_user('comme', req=self.session) + self.create_user('cochon', groups=('users', 'guests'), req=self.session) + self.create_user('billy', req=self.session) + self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') + self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') + self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') + self.sexecute('SET X copain Y WHERE X login "syt", Y login "cochon"') + rset= self.sexecute('Any L WHERE X login L, ' 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' 'NOT EXISTS(X copain T2, T2 login "billy")') self.assertEquals(sorted(rset.rows), [['cochon'], ['syt']]) - rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, ' + rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' 'NOT EXISTS(X copain T2, T2 login "billy")') self.assertEquals(sorted(rset.rows), [['guests', 'cochon'], @@ -249,18 +244,18 @@ ['users', 'syt']]) def test_cd_restriction(self): - rset = self.execute('CWUser X WHERE X creation_date > "2009-02-01"') + rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"') self.assertEquals(len(rset), 2) # admin/anon but no ldap user since it doesn't support creation_date def test_union(self): - afeids = self.execute('State X') - ueids = self.execute('CWUser X') - rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)') + afeids = self.sexecute('State X') + ueids = self.sexecute('CWUser X') + rset = self.sexecute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)') self.assertEquals(sorted(r[0] for r in rset.rows), sorted(r[0] for r in afeids + ueids)) def _init_security_test(self): - self.create_user('iaminguestsgrouponly', groups=('guests',)) + self.create_user('iaminguestsgrouponly', groups=('guests',), req=self.session) cnx = self.login('iaminguestsgrouponly') return cnx.cursor() @@ -286,33 +281,33 @@ self.assertEquals(rset.rows, [[None]]) def test_nonregr1(self): - self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, ' + self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, ' 'X modification_date AA', - {'x': cnx.user(self.session).eid}) + {'x': self.session.user.eid}) def test_nonregr2(self): - self.execute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, ' + self.sexecute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, ' 'X login L, X modification_date AA', - {'x': cnx.user(self.session).eid}) + {'x': self.session.user.eid}) def test_nonregr3(self): - self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, ' + self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, ' 'X modification_date AA', - {'x': cnx.user(self.session).eid}) + {'x': self.session.user.eid}) def test_nonregr4(self): - emaileid = self.execute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0] - self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', + emaileid = self.sexecute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0] + self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', {'x': emaileid}) def test_nonregr5(self): # original jpl query: # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login - self.execute(rql, )#{'x': }) + self.sexecute(rql, )#{'x': }) def test_nonregr6(self): - self.execute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' + self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)', @@ -353,6 +348,9 @@ res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) self.assertEquals(res, [[1, 5], [2, 4], [3, 6]]) +# XXX +LDAPUserSourceTC._init_repo() +repo = LDAPUserSourceTC.repo class RQL2LDAPFilterTC(RQLGeneratorTC): schema = repo.schema diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_migractions.py --- a/server/test/unittest_migractions.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_migractions.py Tue Aug 11 17:15:54 2009 +0200 @@ -2,13 +2,14 @@ :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses """ +from copy import deepcopy from datetime import date from os.path import join from logilab.common.testlib import TestCase, unittest_main from cubicweb import ConfigurationError -from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions +from cubicweb.devtools.testlib import CubicWebTC, get_versions from cubicweb.schema import CubicWebSchemaLoader from cubicweb.server.sqlutils import SQL_PREFIX from cubicweb.server.repository import Repository @@ -23,22 +24,28 @@ Repository.get_versions = orig_get_versions -class MigrationCommandsTC(RepositoryBasedTC): +class MigrationCommandsTC(CubicWebTC): + + @classmethod + def init_config(cls, config): + super(MigrationCommandsTC, cls).init_config(config) + config._cubes = None + cls.repo.fill_schema() + cls.origschema = deepcopy(cls.repo.schema) + # hack to read the schema from data/migrschema + config.appid = join('data', 'migratedapp') + global migrschema + migrschema = config.load_schema() + config.appid = 'data' + assert 'Folder' in migrschema + + @classmethod + def _refresh_repo(cls): + super(MigrationCommandsTC, cls)._refresh_repo() + cls.repo.schema = cls.vreg.schema = deepcopy(cls.origschema) def setUp(self): - if not hasattr(self, '_repo'): - # first initialization - repo = self.repo # set by the RepositoryBasedTC metaclass - # force to read schema from the database - repo.config._cubes = None - repo.fill_schema() - # hack to read the schema from data/migrschema - self.repo.config.appid = join('data', 'migratedapp') - global migrschema - migrschema = self.repo.config.load_schema() - self.repo.config.appid = 'data' - assert 'Folder' in migrschema - RepositoryBasedTC.setUp(self) + CubicWebTC.setUp(self) self.mh = ServerMigrationHelper(self.repo.config, migrschema, repo=self.repo, cnx=self.cnx, interactive=False) @@ -280,7 +287,7 @@ 'Any N ORDERBY O WHERE X is CWAttribute, X relation_type RT, RT name N,' 'X from_entity FE, FE name "Personne",' 'X ordernum O')] - expected = [u'nom', u'prenom', u'promo', u'ass', u'adel', u'titre', + expected = [u'nom', u'prenom', u'sexe', u'promo', u'ass', u'adel', u'titre', u'web', u'tel', u'fax', u'datenaiss', u'test', 'description', u'firstname', u'creation_date', 'cwuri', u'modification_date'] self.assertEquals(rinorder, expected) diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_msplanner.py --- a/server/test/unittest_msplanner.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_msplanner.py Tue Aug 11 17:15:54 2009 +0200 @@ -58,7 +58,7 @@ # keep cnx so it's not garbage collected and the associated session is closed -repo, cnx = init_test_database('sqlite') +repo, cnx = init_test_database() class BaseMSPlannerTC(BasePlannerTC): """test planner related feature on a 3-sources repository: diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_multisources.py --- a/server/test/unittest_multisources.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_multisources.py Tue Aug 11 17:15:54 2009 +0200 @@ -11,7 +11,7 @@ from logilab.common.decorators import cached from cubicweb.devtools import TestServerConfiguration, init_test_database -from cubicweb.devtools.apptest import RepositoryBasedTC +from cubicweb.devtools.testlib import CubicWebTC, refresh_repo from cubicweb.devtools.repotest import do_monkey_patch, undo_monkey_patch TestServerConfiguration.no_sqlite_wrap = True @@ -26,16 +26,9 @@ class ExternalSource2Configuration(TestServerConfiguration): sourcefile = 'sources_multi2' -repo2, cnx2 = init_test_database('sqlite', config=ExternalSource1Configuration('data')) -cu = cnx2.cursor() -ec1 = cu.execute('INSERT Card X: X title "C3: An external card", X wikiid "aaa"')[0][0] -cu.execute('INSERT Card X: X title "C4: Ze external card", X wikiid "zzz"') -aff1 = cu.execute('INSERT Affaire X: X ref "AFFREF", X in_state S WHERE S name "pitetre"')[0][0] -cnx2.commit() - MTIME = datetime.now() - timedelta(0, 10) - -repo3, cnx3 = init_test_database('sqlite', config=ExternalSource2Configuration('data')) +repo2, cnx2 = init_test_database(config=ExternalSource1Configuration('data')) +repo3, cnx3 = init_test_database(config=ExternalSource2Configuration('data')) # XXX, access existing connection, no pyro connection from cubicweb.server.sources.pyrorql import PyroRQLSource @@ -45,38 +38,47 @@ from cubicweb.dbapi import Connection Connection.close = lambda x: None -class TwoSourcesTC(RepositoryBasedTC): - repo_config = TwoSourcesConfiguration('data') +class TwoSourcesTC(CubicWebTC): + config = TwoSourcesConfiguration('data') + + @classmethod + def _refresh_repo(cls): + super(TwoSourcesTC, cls)._refresh_repo() + cnx2.rollback() + refresh_repo(repo2) + cnx3.rollback() + refresh_repo(repo3) def setUp(self): - RepositoryBasedTC.setUp(self) - self.repo.sources[-1]._query_cache.clear() - self.repo.sources[-2]._query_cache.clear() - # trigger discovery - self.execute('Card X') - self.execute('Affaire X') - self.execute('State X') - self.commit() - # don't delete external entities! - self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0] - # add some entities - self.ic1 = self.execute('INSERT Card X: X title "C1: An internal card", X wikiid "aaai"')[0][0] - self.ic2 = self.execute('INSERT Card X: X title "C2: Ze internal card", X wikiid "zzzi"')[0][0] - self.commit() + CubicWebTC.setUp(self) do_monkey_patch() def tearDown(self): - RepositoryBasedTC.tearDown(self) + CubicWebTC.tearDown(self) undo_monkey_patch() + def setup_database(self): + cu = cnx2.cursor() + self.ec1 = cu.execute('INSERT Card X: X title "C3: An external card", X wikiid "aaa"')[0][0] + cu.execute('INSERT Card X: X title "C4: Ze external card", X wikiid "zzz"') + self.aff1 = cu.execute('INSERT Affaire X: X ref "AFFREF", X in_state S WHERE S name "pitetre"')[0][0] + cnx2.commit() + # trigger discovery + self.sexecute('Card X') + self.sexecute('Affaire X') + self.sexecute('State X') + # add some entities + self.ic1 = self.sexecute('INSERT Card X: X title "C1: An internal card", X wikiid "aaai"')[0][0] + self.ic2 = self.sexecute('INSERT Card X: X title "C2: Ze internal card", X wikiid "zzzi"')[0][0] + def test_eid_comp(self): - rset = self.execute('Card X WHERE X eid > 1') + rset = self.sexecute('Card X WHERE X eid > 1') self.assertEquals(len(rset), 4) - rset = self.execute('Any X,T WHERE X title T, X eid > 1') + rset = self.sexecute('Any X,T WHERE X title T, X eid > 1') self.assertEquals(len(rset), 4) def test_metainformation(self): - rset = self.execute('Card X ORDERBY T WHERE X title T') + rset = self.sexecute('Card X ORDERBY T WHERE X title T') # 2 added to the system source, 2 added to the external source self.assertEquals(len(rset), 4) # since they are orderd by eid, we know the 3 first one is coming from the system source @@ -89,28 +91,28 @@ self.assertEquals(metainf['source'], {'adapter': 'pyrorql', 'base-url': 'http://extern.org/', 'uri': 'extern'}) self.assertEquals(metainf['type'], 'Card') self.assert_(metainf['extid']) - etype = self.execute('Any ETN WHERE X is ET, ET name ETN, X eid %(x)s', + etype = self.sexecute('Any ETN WHERE X is ET, ET name ETN, X eid %(x)s', {'x': externent.eid}, 'x')[0][0] self.assertEquals(etype, 'Card') def test_order_limit_offset(self): - rsetbase = self.execute('Any W,X ORDERBY W,X WHERE X wikiid W') + rsetbase = self.sexecute('Any W,X ORDERBY W,X WHERE X wikiid W') self.assertEquals(len(rsetbase), 4) self.assertEquals(sorted(rsetbase.rows), rsetbase.rows) - rset = self.execute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W') + rset = self.sexecute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W') self.assertEquals(rset.rows, rsetbase.rows[2:4]) def test_has_text(self): self.repo.sources_by_uri['extern'].synchronize(MTIME) # in case fti_update has been run before - self.failUnless(self.execute('Any X WHERE X has_text "affref"')) - self.failUnless(self.execute('Affaire X WHERE X has_text "affref"')) + self.failUnless(self.sexecute('Any X WHERE X has_text "affref"')) + self.failUnless(self.sexecute('Affaire X WHERE X has_text "affref"')) def test_anon_has_text(self): self.repo.sources_by_uri['extern'].synchronize(MTIME) # in case fti_update has been run before - self.execute('INSERT Affaire X: X ref "no readable card"')[0][0] - aff1 = self.execute('INSERT Affaire X: X ref "card"')[0][0] + self.sexecute('INSERT Affaire X: X ref "no readable card"')[0][0] + aff1 = self.sexecute('INSERT Affaire X: X ref "card"')[0][0] # grant read access - self.execute('SET X owned_by U WHERE X eid %(x)s, U login "anon"', {'x': aff1}, 'x') + self.sexecute('SET X owned_by U WHERE X eid %(x)s, U login "anon"', {'x': aff1}, 'x') self.commit() cnx = self.login('anon') cu = cnx.cursor() @@ -120,79 +122,81 @@ def test_synchronization(self): cu = cnx2.cursor() - assert cu.execute('Any X WHERE X eid %(x)s', {'x': aff1}, 'x') - cu.execute('SET X ref "BLAH" WHERE X eid %(x)s', {'x': aff1}, 'x') + assert cu.execute('Any X WHERE X eid %(x)s', {'x': self.aff1}, 'x') + cu.execute('SET X ref "BLAH" WHERE X eid %(x)s', {'x': self.aff1}, 'x') aff2 = cu.execute('INSERT Affaire X: X ref "AFFREUX", X in_state S WHERE S name "pitetre"')[0][0] cnx2.commit() try: # force sync self.repo.sources_by_uri['extern'].synchronize(MTIME) - self.failUnless(self.execute('Any X WHERE X has_text "blah"')) - self.failUnless(self.execute('Any X WHERE X has_text "affreux"')) + self.failUnless(self.sexecute('Any X WHERE X has_text "blah"')) + self.failUnless(self.sexecute('Any X WHERE X has_text "affreux"')) cu.execute('DELETE Affaire X WHERE X eid %(x)s', {'x': aff2}) cnx2.commit() self.repo.sources_by_uri['extern'].synchronize(MTIME) - rset = self.execute('Any X WHERE X has_text "affreux"') + rset = self.sexecute('Any X WHERE X has_text "affreux"') self.failIf(rset) finally: # restore state - cu.execute('SET X ref "AFFREF" WHERE X eid %(x)s', {'x': aff1}, 'x') + cu.execute('SET X ref "AFFREF" WHERE X eid %(x)s', {'x': self.aff1}, 'x') cnx2.commit() def test_simplifiable_var(self): - affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0] - rset = self.execute('Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB', + affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0] + rset = self.sexecute('Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB', {'x': affeid}, 'x') self.assertEquals(len(rset), 1) self.assertEquals(rset[0][1], "pitetre") def test_simplifiable_var_2(self): - affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0] - rset = self.execute('Any E WHERE E eid %(x)s, E in_state S, NOT S name "moved"', + affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0] + rset = self.sexecute('Any E WHERE E eid %(x)s, E in_state S, NOT S name "moved"', {'x': affeid, 'u': self.session.user.eid}, 'x') self.assertEquals(len(rset), 1) def test_sort_func(self): - self.execute('Affaire X ORDERBY DUMB_SORT(RF) WHERE X ref RF') + self.sexecute('Affaire X ORDERBY DUMB_SORT(RF) WHERE X ref RF') def test_sort_func_ambigous(self): - self.execute('Any X ORDERBY DUMB_SORT(RF) WHERE X title RF') + self.sexecute('Any X ORDERBY DUMB_SORT(RF) WHERE X title RF') def test_in_eid(self): - iec1 = self.repo.extid2eid(self.repo.sources_by_uri['extern'], str(ec1), + iec1 = self.repo.extid2eid(self.repo.sources_by_uri['extern'], str(self.ec1), 'Card', self.session) - rset = self.execute('Any X WHERE X eid IN (%s, %s)' % (iec1, self.ic1)) + rset = self.sexecute('Any X WHERE X eid IN (%s, %s)' % (iec1, self.ic1)) self.assertEquals(sorted(r[0] for r in rset.rows), sorted([iec1, self.ic1])) def test_greater_eid(self): - rset = self.execute('Any X WHERE X eid > %s' % self.maxeid) + rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1)) self.assertEquals(len(rset.rows), 2) # self.ic1 and self.ic2 + cu = cnx2.cursor() ec2 = cu.execute('INSERT Card X: X title "glup"')[0][0] cnx2.commit() # 'X eid > something' should not trigger discovery - rset = self.execute('Any X WHERE X eid > %s' % self.maxeid) + rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1)) self.assertEquals(len(rset.rows), 2) # trigger discovery using another query - crset = self.execute('Card X WHERE X title "glup"') + crset = self.sexecute('Card X WHERE X title "glup"') self.assertEquals(len(crset.rows), 1) - rset = self.execute('Any X WHERE X eid > %s' % self.maxeid) + rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1)) self.assertEquals(len(rset.rows), 3) - rset = self.execute('Any MAX(X)') + rset = self.sexecute('Any MAX(X)') self.assertEquals(len(rset.rows), 1) self.assertEquals(rset.rows[0][0], crset[0][0]) def test_attr_unification_1(self): - n1 = self.execute('INSERT Note X: X type "AFFREF"')[0][0] - n2 = self.execute('INSERT Note X: X type "AFFREU"')[0][0] - rset = self.execute('Any X,Y WHERE X is Note, Y is Affaire, X type T, Y ref T') + n1 = self.sexecute('INSERT Note X: X type "AFFREF"')[0][0] + n2 = self.sexecute('INSERT Note X: X type "AFFREU"')[0][0] + rset = self.sexecute('Any X,Y WHERE X is Note, Y is Affaire, X type T, Y ref T') self.assertEquals(len(rset), 1, rset.rows) def test_attr_unification_2(self): + cu = cnx2.cursor() ec2 = cu.execute('INSERT Card X: X title "AFFREF"')[0][0] cnx2.commit() try: - c1 = self.execute('INSERT Card C: C title "AFFREF"')[0][0] - rset = self.execute('Any X,Y WHERE X is Card, Y is Affaire, X title T, Y ref T') + c1 = self.sexecute('INSERT Card C: C title "AFFREF"')[0][0] + rset = self.sexecute('Any X,Y WHERE X is Card, Y is Affaire, X title T, Y ref T') self.assertEquals(len(rset), 2, rset.rows) finally: cu.execute('DELETE Card X WHERE X eid %(x)s', {'x': ec2}, 'x') @@ -200,81 +204,86 @@ def test_attr_unification_neq_1(self): # XXX complete - self.execute('Any X,Y WHERE X is Note, Y is Affaire, X creation_date D, Y creation_date > D') + self.sexecute('Any X,Y WHERE X is Note, Y is Affaire, X creation_date D, Y creation_date > D') def test_attr_unification_neq_2(self): # XXX complete - self.execute('Any X,Y WHERE X is Card, Y is Affaire, X creation_date D, Y creation_date > D') + self.sexecute('Any X,Y WHERE X is Card, Y is Affaire, X creation_date D, Y creation_date > D') def test_union(self): - afeids = self.execute('Affaire X') - ueids = self.execute('CWUser X') - rset = self.execute('(Any X WHERE X is Affaire) UNION (Any X WHERE X is CWUser)') + afeids = self.sexecute('Affaire X') + ueids = self.sexecute('CWUser X') + rset = self.sexecute('(Any X WHERE X is Affaire) UNION (Any X WHERE X is CWUser)') self.assertEquals(sorted(r[0] for r in rset.rows), sorted(r[0] for r in afeids + ueids)) def test_subquery1(self): - rsetbase = self.execute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)') + rsetbase = self.sexecute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)') self.assertEquals(len(rsetbase), 4) self.assertEquals(sorted(rsetbase.rows), rsetbase.rows) - rset = self.execute('Any W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)') + rset = self.sexecute('Any W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)') self.assertEquals(rset.rows, rsetbase.rows[2:4]) - rset = self.execute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X WHERE X wikiid W)') + rset = self.sexecute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X WHERE X wikiid W)') self.assertEquals(rset.rows, rsetbase.rows[2:4]) - rset = self.execute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W)') + rset = self.sexecute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W)') self.assertEquals(rset.rows, rsetbase.rows[2:4]) def test_subquery2(self): - affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0] - rset = self.execute('Any X,AA,AB WITH X,AA,AB BEING (Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB)', + affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0] + rset = self.sexecute('Any X,AA,AB WITH X,AA,AB BEING (Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB)', {'x': affeid}) self.assertEquals(len(rset), 1) self.assertEquals(rset[0][1], "pitetre") def test_not_relation(self): - states = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN')) + states = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN')) userstate = self.session.user.in_state[0] states.remove((userstate.eid, userstate.name)) - notstates = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s', + notstates = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s', {'x': self.session.user.eid}, 'x')) self.assertEquals(notstates, states) - aff1 = self.execute('Any X WHERE X is Affaire, X ref "AFFREF"')[0][0] - aff1stateeid, aff1statename = self.execute('Any S,SN WHERE X eid %(x)s, X in_state S, S name SN', {'x': aff1}, 'x')[0] + aff1 = self.sexecute('Any X WHERE X is Affaire, X ref "AFFREF"')[0][0] + aff1stateeid, aff1statename = self.sexecute('Any S,SN WHERE X eid %(x)s, X in_state S, S name SN', {'x': aff1}, 'x')[0] self.assertEquals(aff1statename, 'pitetre') states.add((userstate.eid, userstate.name)) states.remove((aff1stateeid, aff1statename)) - notstates = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s', + notstates = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s', {'x': aff1}, 'x')) self.assertSetEquals(notstates, states) def test_absolute_url_base_url(self): + cu = cnx2.cursor() ceid = cu.execute('INSERT Card X: X title "without wikiid to get eid based url"')[0][0] cnx2.commit() - lc = self.execute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0) + lc = self.sexecute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0) self.assertEquals(lc.absolute_url(), 'http://extern.org/card/eid/%s' % ceid) + cu.execute('DELETE Card X WHERE X eid %(x)s', {'x':ceid}) + cnx2.commit() def test_absolute_url_no_base_url(self): cu = cnx3.cursor() ceid = cu.execute('INSERT Card X: X title "without wikiid to get eid based url"')[0][0] cnx3.commit() - lc = self.execute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0) + lc = self.sexecute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0) self.assertEquals(lc.absolute_url(), 'http://testing.fr/cubicweb/card/eid/%s' % lc.eid) + cu.execute('DELETE Card X WHERE X eid %(x)s', {'x':ceid}) + cnx3.commit() def test_nonregr1(self): ueid = self.session.user.eid - affaire = self.execute('Affaire X WHERE X ref "AFFREF"').get_entity(0, 0) - self.execute('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s', + affaire = self.sexecute('Affaire X WHERE X ref "AFFREF"').get_entity(0, 0) + self.sexecute('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s', {'x': affaire.eid, 'u': ueid}) def test_nonregr2(self): treid = self.session.user.latest_trinfo().eid - rset = self.execute('Any X ORDERBY D DESC WHERE E eid %(x)s, E wf_info_for X, X modification_date D', + rset = self.sexecute('Any X ORDERBY D DESC WHERE E eid %(x)s, E wf_info_for X, X modification_date D', {'x': treid}) self.assertEquals(len(rset), 1) self.assertEquals(rset.rows[0], [self.session.user.eid]) def test_nonregr3(self): - self.execute('DELETE Card X WHERE X eid %(x)s, NOT X multisource_inlined_rel Y', {'x': self.ic1}) + self.sexecute('DELETE Card X WHERE X eid %(x)s, NOT X multisource_inlined_rel Y', {'x': self.ic1}) if __name__ == '__main__': from logilab.common.testlib import unittest_main diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_querier.py --- a/server/test/unittest_querier.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_querier.py Tue Aug 11 17:15:54 2009 +0200 @@ -46,7 +46,7 @@ ('C0 text,C1 integer', {'A': 'table0.C0', 'B': 'table0.C1'})) -repo, cnx = init_test_database('sqlite') +repo, cnx = init_test_database() diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_repository.py --- a/server/test/unittest_repository.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_repository.py Tue Aug 11 17:15:54 2009 +0200 @@ -18,10 +18,11 @@ from yams.constraints import UniqueConstraint -from cubicweb import BadConnectionId, RepositoryError, ValidationError, UnknownEid, AuthenticationError +from cubicweb import (BadConnectionId, RepositoryError, ValidationError, + UnknownEid, AuthenticationError) from cubicweb.schema import CubicWebSchema, RQLConstraint from cubicweb.dbapi import connect, repo_connect, multiple_connections_unfix -from cubicweb.devtools.apptest import RepositoryBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.devtools.repotest import tuplify from cubicweb.server import repository from cubicweb.server.sqlutils import SQL_PREFIX @@ -31,48 +32,35 @@ os.system('pyro-ns >/dev/null 2>/dev/null &') -class RepositoryTC(RepositoryBasedTC): +class RepositoryTC(CubicWebTC): """ singleton providing access to a persistent storage for entities and relation """ -# def setUp(self): -# pass - -# def tearDown(self): -# self.repo.config.db_perms = True -# cnxid = self.repo.connect(*self.default_user_password()) -# for etype in ('Affaire', 'Note', 'Societe', 'Personne'): -# self.repo.execute(cnxid, 'DELETE %s X' % etype) -# self.repo.commit(cnxid) -# self.repo.close(cnxid) - def test_fill_schema(self): self.repo.schema = CubicWebSchema(self.repo.config.appid) self.repo.config._cubes = None # avoid assertion error + self.repo.config.repairing = True # avoid versions checking self.repo.fill_schema() - pool = self.repo._get_pool() table = SQL_PREFIX + 'CWEType' namecol = SQL_PREFIX + 'name' finalcol = SQL_PREFIX + 'final' - try: - cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % ( - namecol, table, finalcol)) - self.assertEquals(cu.fetchall(), []) - cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s' - % (namecol, table, finalcol, namecol), {'final': 'TRUE'}) - self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',), - (u'Date',), (u'Datetime',), - (u'Decimal',),(u'Float',), - (u'Int',), - (u'Interval',), (u'Password',), - (u'String',), (u'Time',)]) - finally: - self.repo._free_pool(pool) + self.session.set_pool() + cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % ( + namecol, table, finalcol)) + self.assertEquals(cu.fetchall(), []) + cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s' + % (namecol, table, finalcol, namecol), {'final': 'TRUE'}) + self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',), + (u'Date',), (u'Datetime',), + (u'Decimal',),(u'Float',), + (u'Int',), + (u'Interval',), (u'Password',), + (u'String',), (u'Time',)]) def test_schema_has_owner(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U')) self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U')) self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U')) @@ -81,18 +69,17 @@ self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U')) def test_connect(self): - login, passwd = self.default_user_password() - self.assert_(self.repo.connect(login, passwd)) + self.assert_(self.repo.connect(self.admlogin, self.admpassword)) self.assertRaises(AuthenticationError, - self.repo.connect, login, 'nimportnawak') + self.repo.connect, self.admlogin, 'nimportnawak') self.assertRaises(AuthenticationError, - self.repo.connect, login, None) + self.repo.connect, self.admlogin, None) self.assertRaises(AuthenticationError, self.repo.connect, None, None) def test_execute(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) repo.execute(cnxid, 'Any X') repo.execute(cnxid, 'Any X where X is Personne') repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"') @@ -101,7 +88,7 @@ def test_login_upassword_accent(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"', {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) repo.commit(cnxid) @@ -110,7 +97,7 @@ def test_invalid_entity_rollback(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) # no group repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"', {'login': u"tutetute", 'passwd': 'tutetute'}) @@ -120,7 +107,7 @@ def test_close(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) self.assert_(cnxid) repo.close(cnxid) self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X') @@ -131,9 +118,9 @@ def test_shared_data(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) repo.set_shared_data(cnxid, 'data', 4) - cnxid2 = repo.connect(*self.default_user_password()) + cnxid2 = repo.connect(self.admlogin, self.admpassword) self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None) repo.set_shared_data(cnxid2, 'data', 5) @@ -151,14 +138,14 @@ def test_check_session(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) self.assertEquals(repo.check_session(cnxid), None) repo.close(cnxid) self.assertRaises(BadConnectionId, repo.check_session, cnxid) def test_transaction_base(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) # check db state result = repo.execute(cnxid, 'Personne X') self.assertEquals(result.rowcount, 0) @@ -177,7 +164,7 @@ def test_transaction_base2(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) # rollback relation insertion repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'") result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") @@ -188,7 +175,7 @@ def test_transaction_base3(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) # rollback state change which trigger TrInfo insertion ueid = repo._get_session(cnxid).user.eid rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid}) @@ -206,7 +193,7 @@ def test_close_wait_processing_request(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"') repo.commit(cnxid) # close has to be in the thread due to sqlite limitations @@ -290,7 +277,7 @@ def test_internal_api(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) session = repo._get_session(cnxid, setpool=True) self.assertEquals(repo.type_and_source_from_eid(1, session), ('CWGroup', 'system', None)) @@ -308,7 +295,7 @@ def test_session_api(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None)) repo.close(cnxid) @@ -317,7 +304,7 @@ def test_shared_data_api(self): repo = self.repo - cnxid = repo.connect(*self.default_user_password()) + cnxid = repo.connect(self.admlogin, self.admpassword) self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) repo.set_shared_data(cnxid, 'data', 4) self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) @@ -343,37 +330,34 @@ # print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c) -class DataHelpersTC(RepositoryBasedTC): - - def setUp(self): - """ called before each test from this class """ - cnxid = self.repo.connect(*self.default_user_password()) - self.session = self.repo._sessions[cnxid] - self.session.set_pool() - - def tearDown(self): - self.session.rollback() +class DataHelpersTC(CubicWebTC): def test_create_eid(self): + self.session.set_pool() self.assert_(self.repo.system_source.create_eid(self.session)) def test_source_from_eid(self): + self.session.set_pool() self.assertEquals(self.repo.source_from_eid(1, self.session), self.repo.sources_by_uri['system']) def test_source_from_eid_raise(self): + self.session.set_pool() self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session) def test_type_from_eid(self): + self.session.set_pool() self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup') def test_type_from_eid_raise(self): + self.session.set_pool() self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session) def test_add_delete_info(self): entity = self.repo.vreg['etypes'].etype_class('Personne')(self.session) entity.eid = -1 entity.complete = lambda x: None + self.session.set_pool() self.repo.add_info(self.session, entity, self.repo.sources_by_uri['system']) cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1') data = cu.fetchall() @@ -388,13 +372,14 @@ self.assertEquals(data, []) -class FTITC(RepositoryBasedTC): +class FTITC(CubicWebTC): def test_reindex_and_modified_since(self): eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0] self.commit() ts = datetime.now() self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) + self.session.set_pool() cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp) omtime = cu.fetchone()[0] # our sqlite datetime adapter is ignore seconds fraction, so we have to @@ -403,6 +388,7 @@ self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp}, 'x') self.commit() self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) + self.session.set_pool() cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp) mtime = cu.fetchone()[0] self.failUnless(omtime < mtime) @@ -440,7 +426,7 @@ self.assertEquals(rset.rows, [[self.session.user.eid]]) -class DBInitTC(RepositoryBasedTC): +class DBInitTC(CubicWebTC): def test_versions_inserted(self): inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')] @@ -450,11 +436,11 @@ u'system.version.file', u'system.version.folder', u'system.version.tag']) -class InlineRelHooksTC(RepositoryBasedTC): +class InlineRelHooksTC(CubicWebTC): """test relation hooks are called for inlined relations """ def setUp(self): - RepositoryBasedTC.setUp(self) + CubicWebTC.setUp(self) self.hm = self.repo.hm self.called = [] diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_rqlannotation.py --- a/server/test/unittest_rqlannotation.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_rqlannotation.py Tue Aug 11 17:15:54 2009 +0200 @@ -6,7 +6,7 @@ from cubicweb.devtools import init_test_database from cubicweb.devtools.repotest import BaseQuerierTC -repo, cnx = init_test_database('sqlite') +repo, cnx = init_test_database() class SQLGenAnnotatorTC(BaseQuerierTC): repo = repo diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_security.py --- a/server/test/unittest_security.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_security.py Tue Aug 11 17:15:54 2009 +0200 @@ -4,21 +4,21 @@ import sys from logilab.common.testlib import unittest_main, TestCase -from cubicweb.devtools.apptest import RepositoryBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb import Unauthorized, ValidationError from cubicweb.server.querier import check_read_access -class BaseSecurityTC(RepositoryBasedTC): +class BaseSecurityTC(CubicWebTC): def setUp(self): - RepositoryBasedTC.setUp(self) + CubicWebTC.setUp(self) self.create_user('iaminusersgrouponly') self.readoriggroups = self.schema['Personne'].get_groups('read') self.addoriggroups = self.schema['Personne'].get_groups('add') def tearDown(self): - RepositoryBasedTC.tearDown(self) + CubicWebTC.tearDown(self) self.schema['Personne'].set_groups('read', self.readoriggroups) self.schema['Personne'].set_groups('add', self.addoriggroups) @@ -37,7 +37,7 @@ cu = cnx.cursor() self.assertRaises(Unauthorized, check_read_access, - self.schema, cnx.user(self.current_session()), rqlst, solution) + self.schema, cnx.user(self.session), rqlst, solution) self.assertRaises(Unauthorized, cu.execute, rql) def test_upassword_not_selectable(self): @@ -165,7 +165,7 @@ def test_insert_relation_rql_permission(self): cnx = self.login('iaminusersgrouponly') - session = self.current_session() + session = self.session cu = cnx.cursor(session) cu.execute("SET A concerne S WHERE A is Affaire, S is Societe") # should raise Unauthorized since user don't own S @@ -210,7 +210,7 @@ def test_user_can_change_its_upassword(self): - ueid = self.create_user('user') + ueid = self.create_user('user').eid cnx = self.login('user') cu = cnx.cursor() cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', @@ -220,7 +220,7 @@ cnx = self.login('user', 'newpwd') def test_user_cant_change_other_upassword(self): - ueid = self.create_user('otheruser') + ueid = self.create_user('otheruser').eid cnx = self.login('iaminusersgrouponly') cu = cnx.cursor() cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', @@ -416,7 +416,7 @@ def test_users_and_groups_non_readable_by_guests(self): cnx = self.login('anon') - anon = cnx.user(self.current_session()) + anon = cnx.user(self.session) cu = cnx.cursor() # anonymous user can only read itself rset = cu.execute('Any L WHERE X owned_by U, U login L') @@ -426,7 +426,7 @@ # anonymous user can read groups (necessary to check allowed transitions for instance) self.assert_(cu.execute('CWGroup X')) # should only be able to read the anonymous user, not another one - origuser = self.session.user + origuser = self.adminsession.user self.assertRaises(Unauthorized, cu.execute, 'CWUser X WHERE X eid %(x)s', {'x': origuser.eid}, 'x') # nothing selected, nothing updated, no exception raised @@ -462,7 +462,7 @@ self.commit() cnx = self.login('anon') cu = cnx.cursor() - anoneid = self.current_session().user.eid + anoneid = self.session.user.eid self.assertEquals(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,' 'B bookmarked_by U, U eid %s' % anoneid).rows, [['index', '?vid=index']]) @@ -491,7 +491,7 @@ eid = self.execute('INSERT Affaire X: X ref "ARCT01"')[0][0] self.commit() cnx = self.login('iaminusersgrouponly') - session = self.current_session() + session = self.session # needed to avoid check_perm error session.set_pool() # needed to remove rql expr granting update perm to the user @@ -506,7 +506,7 @@ # XXX wether it should raise Unauthorized or ValidationError is not clear # the best would probably ValidationError if the transition doesn't exist # from the current state but Unauthorized if it exists but user can't pass it - self.assertRaises(ValidationError, cu.execute, rql, {'x': cnx.user(self.current_session()).eid}, 'x') + self.assertRaises(ValidationError, cu.execute, rql, {'x': cnx.user(self.session).eid}, 'x') def test_trinfo_security(self): aff = self.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0, 0) diff -r 8074dd88e21b -r a9a2dca5db20 server/test/unittest_ssplanner.py --- a/server/test/unittest_ssplanner.py Tue Aug 11 17:06:02 2009 +0200 +++ b/server/test/unittest_ssplanner.py Tue Aug 11 17:15:54 2009 +0200 @@ -10,7 +10,7 @@ from cubicweb.server.ssplanner import SSPlanner # keep cnx so it's not garbage collected and the associated session is closed -repo, cnx = init_test_database('sqlite') +repo, cnx = init_test_database() class SSPlannerTC(BasePlannerTC): repo = repo diff -r 8074dd88e21b -r a9a2dca5db20 sobjects/test/unittest_email.py --- a/sobjects/test/unittest_email.py Tue Aug 11 17:06:02 2009 +0200 +++ b/sobjects/test/unittest_email.py Tue Aug 11 17:15:54 2009 +0200 @@ -5,9 +5,9 @@ :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses """ -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC -class EmailAddressHooksTC(EnvBasedTC): +class EmailAddressHooksTC(CubicWebTC): def test_use_email_set_primary_email(self): self.execute('INSERT EmailAddress X: X address "admin@logilab.fr", U use_email X WHERE U login "admin"') diff -r 8074dd88e21b -r a9a2dca5db20 sobjects/test/unittest_hooks.py --- a/sobjects/test/unittest_hooks.py Tue Aug 11 17:06:02 2009 +0200 +++ b/sobjects/test/unittest_hooks.py Tue Aug 11 17:15:54 2009 +0200 @@ -6,9 +6,9 @@ :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses """ from logilab.common.testlib import unittest_main -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC -class HooksTC(EnvBasedTC): +class HooksTC(CubicWebTC): def test_euser_login_stripped(self): u = self.create_user(' joe ') diff -r 8074dd88e21b -r a9a2dca5db20 sobjects/test/unittest_notification.py --- a/sobjects/test/unittest_notification.py Tue Aug 11 17:06:02 2009 +0200 +++ b/sobjects/test/unittest_notification.py Tue Aug 11 17:15:54 2009 +0200 @@ -9,7 +9,7 @@ from socket import gethostname from logilab.common.testlib import unittest_main, TestCase -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.sobjects.notification import construct_message_id, parse_message_id @@ -48,7 +48,7 @@ self.assertNotEquals(msgid1, '<@testapp.%s>' % gethostname()) -class RecipientsFinderTC(EnvBasedTC): +class RecipientsFinderTC(CubicWebTC): def test(self): urset = self.execute('CWUser X WHERE X login "admin"') self.execute('INSERT EmailAddress X: X address "admin@logilab.fr", U primary_email X ' @@ -67,10 +67,10 @@ self.assertEquals(finder.recipients(), [('abcd@logilab.fr', 'en'), ('efgh@logilab.fr', 'en')]) -class StatusChangeViewsTC(EnvBasedTC): +class StatusChangeViewsTC(CubicWebTC): def test_status_change_view(self): - req = self.session() + req = self.session u = self.create_user('toto', req=req) assert u.req assert u.rset diff -r 8074dd88e21b -r a9a2dca5db20 sobjects/test/unittest_supervising.py --- a/sobjects/test/unittest_supervising.py Tue Aug 11 17:06:02 2009 +0200 +++ b/sobjects/test/unittest_supervising.py Tue Aug 11 17:15:54 2009 +0200 @@ -9,12 +9,12 @@ import re from logilab.common.testlib import unittest_main -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.sobjects.supervising import SendMailOp, SupervisionMailOp -class SupervisingTC(EnvBasedTC): +class SupervisingTC(CubicWebTC): def setup_database(self): self.add_entity('Card', title=u"une news !", content=u"cubicweb c'est beau") @@ -26,7 +26,7 @@ def test_supervision(self): - session = self.session() + session = self.session # do some modification ueid = self.execute('INSERT CWUser X: X login "toto", X upassword "sosafe", X in_group G, X in_state S ' 'WHERE G name "users", S name "activated"')[0][0] @@ -75,7 +75,7 @@ self.assertEquals(op.to_send[0][1], ['test@logilab.fr']) def test_nonregr1(self): - session = self.session() + session = self.session # do some unlogged modification self.execute('SET X last_login_time NOW WHERE X eid %(x)s', {'x': session.user.eid}, 'x') self.commit() # no crash diff -r 8074dd88e21b -r a9a2dca5db20 test/unittest_dbapi.py --- a/test/unittest_dbapi.py Tue Aug 11 17:06:02 2009 +0200 +++ b/test/unittest_dbapi.py Tue Aug 11 17:15:54 2009 +0200 @@ -7,24 +7,21 @@ """ from cubicweb import ConnectionError from cubicweb.dbapi import ProgrammingError -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC -class DBAPITC(EnvBasedTC): - @property - def cnx(self): - return self.login('anon') +class DBAPITC(CubicWebTC): def test_public_repo_api(self): - cnx = self.cnx - self.assertEquals(cnx.get_schema(), self.env.repo.schema) + cnx = self.login('anon') + self.assertEquals(cnx.get_schema(), self.repo.schema) self.assertEquals(cnx.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}}) self.restore_connection() # proper way to close cnx self.assertRaises(ProgrammingError, cnx.get_schema) self.assertRaises(ProgrammingError, cnx.source_defs) def test_db_api(self): - cnx = self.cnx + cnx = self.login('anon') self.assertEquals(cnx.rollback(), None) self.assertEquals(cnx.commit(), None) self.restore_connection() # proper way to close cnx @@ -34,7 +31,7 @@ self.assertRaises(ProgrammingError, cnx.close) def test_api(self): - cnx = self.cnx + cnx = self.login('anon') self.assertEquals(cnx.user(None).login, 'anon') self.assertEquals(cnx.describe(1), (u'CWGroup', u'system', None)) self.restore_connection() # proper way to close cnx @@ -42,7 +39,7 @@ self.assertRaises(ConnectionError, cnx.describe, 1) def test_session_data_api(self): - cnx = self.cnx + cnx = self.login('anon') self.assertEquals(cnx.get_session_data('data'), None) self.assertEquals(cnx.session_data(), {}) cnx.set_session_data('data', 4) @@ -57,7 +54,7 @@ self.assertEquals(cnx.session_data(), {'data': 4}) def test_shared_data_api(self): - cnx = self.cnx + cnx = self.login('anon') self.assertEquals(cnx.get_shared_data('data'), None) cnx.set_shared_data('data', 4) self.assertEquals(cnx.get_shared_data('data'), 4) @@ -71,19 +68,6 @@ self.assertRaises(ConnectionError, cnx.set_shared_data, 'data', 0) self.assertRaises(ConnectionError, cnx.get_shared_data, 'data') - -# class DBAPICursorTC(EnvBasedTC): - -# @property -# def cursor(self): -# return self.env.cnx.cursor() - -# def test_api(self): -# cu = self.cursor -# self.assertEquals(cu.describe(1), (u'CWGroup', u'system', None)) -# #cu.close() -# #self.assertRaises(ConnectionError, cu.describe, 1) - if __name__ == '__main__': from logilab.common.testlib import unittest_main unittest_main() diff -r 8074dd88e21b -r a9a2dca5db20 test/unittest_entity.py --- a/test/unittest_entity.py Tue Aug 11 17:06:02 2009 +0200 +++ b/test/unittest_entity.py Tue Aug 11 17:15:54 2009 +0200 @@ -10,10 +10,10 @@ from datetime import datetime from cubicweb import Binary -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.common.mttransforms import HAS_TAL -class EntityTC(EnvBasedTC): +class EntityTC(CubicWebTC): ## def setup_database(self): ## self.add_entity('Personne', nom=u'di mascio', prenom=u'adrien') @@ -23,18 +23,18 @@ ## embed=False) def test_boolean_value(self): - e = self.etype_instance('CWUser') + e = self.vreg['etypes'].etype_class('CWUser')(self.request()) self.failUnless(e) def test_yams_inheritance(self): from entities import Note - e = self.etype_instance('SubNote') + e = self.vreg['etypes'].etype_class('SubNote')(self.request()) self.assertIsInstance(e, Note) - e2 = self.etype_instance('SubNote') + e2 = self.vreg['etypes'].etype_class('SubNote')(self.request()) self.assertIs(e.__class__, e2.__class__) def test_has_eid(self): - e = self.etype_instance('CWUser') + e = self.vreg['etypes'].etype_class('CWUser')(self.request()) self.assertEquals(e.eid, None) self.assertEquals(e.has_eid(), False) e.eid = 'X' @@ -208,7 +208,7 @@ 1) def test_new_entity_unrelated(self): - e = self.etype_instance('CWUser') + e = self.vreg['etypes'].etype_class('CWUser')(self.request()) unrelated = [r[0] for r in e.unrelated('in_group', 'CWGroup', 'subject')] # should be default groups but owners, i.e. managers, users, guests self.assertEquals(len(unrelated), 3) @@ -227,7 +227,6 @@ self.assertEquals(e.printable_value('content'), '

\ndu *texte*\n

') e['title'] = 'zou' - #e = self.etype_instance('Task') e['content'] = '''\ a title ======= @@ -303,7 +302,7 @@ def test_fulltextindex(self): - e = self.etype_instance('File') + e = self.vreg['etypes'].etype_class('File')(self.request()) e['name'] = 'an html file' e['description'] = 'du html' e['description_format'] = 'text/html' @@ -324,7 +323,7 @@ def test_complete_relation(self): self.execute('SET RT add_permission G WHERE RT name "wf_info_for", G name "managers"') self.commit() - session = self.session() + session = self.session try: eid = session.unsafe_execute( 'INSERT TrInfo X: X comment "zou", X wf_info_for U, X from_state S1, X to_state S2 ' diff -r 8074dd88e21b -r a9a2dca5db20 test/unittest_rset.py --- a/test/unittest_rset.py Tue Aug 11 17:06:02 2009 +0200 +++ b/test/unittest_rset.py Tue Aug 11 17:15:54 2009 +0200 @@ -10,7 +10,7 @@ from logilab.common.testlib import TestCase, unittest_main -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.selectors import traced_selection from urlparse import urlsplit @@ -55,7 +55,7 @@ -class ResultSetTC(EnvBasedTC): +class ResultSetTC(CubicWebTC): def setUp(self): super(ResultSetTC, self).setUp() @@ -100,7 +100,7 @@ 'Any U,L where U is CWUser, U login L', description=[['CWUser', 'String']] * 3) rs.req = self.request() - rs.vreg = self.env.vreg + rs.vreg = self.vreg self.assertEquals(rs.limit(2).rows, [[12000, 'adim'], [13000, 'syt']]) rs2 = rs.limit(2, offset=1) @@ -115,7 +115,7 @@ 'Any U,L where U is CWUser, U login L', description=[['CWUser', 'String']] * 3) rs.req = self.request() - rs.vreg = self.env.vreg + rs.vreg = self.vreg def test_filter(entity): return entity.login != 'nico' @@ -140,7 +140,7 @@ 'Any U,L where U is CWUser, U login L', description=[['CWUser', 'String']] * 3) rs.req = self.request() - rs.vreg = self.env.vreg + rs.vreg = self.vreg rs2 = rs.sorted_rset(lambda e:e['login']) self.assertEquals(len(rs2), 3) @@ -170,7 +170,7 @@ 'D created_by U, D title T', description=[['CWUser', 'String', 'String']] * 5) rs.req = self.request() - rs.vreg = self.env.vreg + rs.vreg = self.vreg rsets = rs.split_rset(lambda e:e['login']) self.assertEquals(len(rsets), 3) diff -r 8074dd88e21b -r a9a2dca5db20 test/unittest_selectors.py --- a/test/unittest_selectors.py Tue Aug 11 17:06:02 2009 +0200 +++ b/test/unittest_selectors.py Tue Aug 11 17:15:54 2009 +0200 @@ -8,7 +8,7 @@ from logilab.common.testlib import TestCase, unittest_main -from cubicweb.devtools.testlib import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.appobject import Selector, AndSelector, OrSelector from cubicweb.selectors import implements, match_user_groups from cubicweb.interfaces import IDownloadable @@ -88,7 +88,7 @@ self.assertIs(csel.search_selector(implements), sel) -class ImplementsSelectorTC(EnvBasedTC): +class ImplementsSelectorTC(CubicWebTC): def test_etype_priority(self): req = self.request() cls = self.vreg['etypes'].etype_class('File') @@ -103,7 +103,7 @@ self.failIf(implements('Societe').score_class(cls, self.request())) -class MatchUserGroupsTC(EnvBasedTC): +class MatchUserGroupsTC(CubicWebTC): def test_owners_group(self): """tests usage of 'owners' group with match_user_group""" class SomeAction(action.Action): @@ -118,16 +118,16 @@ self.create_user('john') self.login('john') # it should not be possible to use SomeAction not owned objects - rset, req = self.env.get_rset_and_req('Any G WHERE G is CWGroup, G name "managers"') + rset, req = self.rset_and_req('Any G WHERE G is CWGroup, G name "managers"') self.failIf('yo' in dict(self.pactions(req, rset))) # insert a new card, and check that we can use SomeAction on our object self.execute('INSERT Card C: C title "zoubidou"') self.commit() - rset, req = self.env.get_rset_and_req('Card C WHERE C title "zoubidou"') + rset, req = self.rset_and_req('Card C WHERE C title "zoubidou"') self.failUnless('yo' in dict(self.pactions(req, rset)), self.pactions(req, rset)) # make sure even managers can't use the action self.restore_connection() - rset, req = self.env.get_rset_and_req('Card C WHERE C title "zoubidou"') + rset, req = self.rset_and_req('Card C WHERE C title "zoubidou"') self.failIf('yo' in dict(self.pactions(req, rset))) finally: del self.vreg[SomeAction.__registry__][SomeAction.id] diff -r 8074dd88e21b -r a9a2dca5db20 web/test/test_views.py --- a/web/test/test_views.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/test_views.py Tue Aug 11 17:15:54 2009 +0200 @@ -6,7 +6,7 @@ :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses """ -from cubicweb.devtools.testlib import WebTest, AutomaticWebTest +from cubicweb.devtools.testlib import CubicWebTC, AutoPopulateTest, AutomaticWebTest from cubicweb.view import AnyRsetView AutomaticWebTest.application_rql = [ @@ -15,7 +15,7 @@ 'Any COUNT(X) WHERE X is CWUser', ] -class ComposityCopy(WebTest): +class ComposityCopy(CubicWebTC): def test_regr_copy_view(self): """regression test: make sure we can ask a copy of a @@ -34,7 +34,7 @@ self.req.add_js('spam.js') -class ManualWebTests(WebTest): +class ManualCubicWebTCs(AutoPopulateTest): def setup_database(self): self.auto_populate(10) @@ -59,7 +59,7 @@ -class ExplicitViewsTest(WebTest): +class ExplicitViewsTest(CubicWebTC): def test_unrelateddivs(self): rset = self.execute('Any X WHERE X is CWUser, X login "admin"') diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_application.py --- a/web/test/unittest_application.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_application.py Tue Aug 11 17:15:54 2009 +0200 @@ -14,7 +14,7 @@ from logilab.common.testlib import TestCase, unittest_main from logilab.common.decorators import clear_cache -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.devtools.fake import FakeRequest from cubicweb.web import Redirect, AuthenticationError, ExplicitLogin, INTERNAL_FIELD_VALUE from cubicweb.web.views.basecontrollers import ViewController @@ -134,7 +134,7 @@ for i in (12, 13, 14)]) -class ApplicationTC(EnvBasedTC): +class ApplicationTC(CubicWebTC): def publish(self, req, path='view'): return self.app.publish(path, req) diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_controller.py --- a/web/test/unittest_controller.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_controller.py Tue Aug 11 17:15:54 2009 +0200 @@ -7,37 +7,35 @@ from logilab.common.testlib import unittest_main -from cubicweb.devtools import apptest +from cubicweb.devtools import testlib -class BaseControllerTC(apptest.ControllerTC): +class BaseControllerTC(testlib.CubicWebTC): def test_parse_datetime_ok(self): - self.assertIsInstance(self.ctrl.parse_datetime('2006/06/24 12:18'), - datetime) - self.assertIsInstance(self.ctrl.parse_datetime('2006/06/24'), - date) - self.assertIsInstance(self.ctrl.parse_datetime('2006/06/24 12:18', 'Datetime'), - datetime) - self.assertIsInstance(self.ctrl.parse_datetime('2006/06/24', 'Datetime'), - datetime) - self.assertIsInstance(self.ctrl.parse_datetime('2006/06/24', 'Date'), - date) - self.assertIsInstance(self.ctrl.parse_datetime('12:18', 'Time'), - time) + ctrl = self.vreg['controllers'].select('view', self.request()) + pd = ctrl.parse_datetime + self.assertIsInstance(pd('2006/06/24 12:18'), datetime) + self.assertIsInstance(pd('2006/06/24'), date) + self.assertIsInstance(pd('2006/06/24 12:18', 'Datetime'), datetime) + self.assertIsInstance(pd('2006/06/24', 'Datetime'), datetime) + self.assertIsInstance(pd('2006/06/24', 'Date'), date) + self.assertIsInstance(pd('12:18', 'Time'), time) def test_parse_datetime_ko(self): + ctrl = self.vreg['controllers'].select('view', self.request()) + pd = ctrl.parse_datetime self.assertRaises(ValueError, - self.ctrl.parse_datetime, '2006/06/24 12:188', 'Datetime') + pd, '2006/06/24 12:188', 'Datetime') self.assertRaises(ValueError, - self.ctrl.parse_datetime, '2006/06/240', 'Datetime') + pd, '2006/06/240', 'Datetime') self.assertRaises(ValueError, - self.ctrl.parse_datetime, '2006/06/24 12:18', 'Date') + pd, '2006/06/24 12:18', 'Date') self.assertRaises(ValueError, - self.ctrl.parse_datetime, '2006/24/06', 'Date') + pd, '2006/24/06', 'Date') self.assertRaises(ValueError, - self.ctrl.parse_datetime, '2006/06/240', 'Date') + pd, '2006/06/240', 'Date') self.assertRaises(ValueError, - self.ctrl.parse_datetime, '12:188', 'Time') + pd, '12:188', 'Time') if __name__ == '__main__': unittest_main() diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_form.py --- a/web/test/unittest_form.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_form.py Tue Aug 11 17:15:54 2009 +0200 @@ -12,7 +12,7 @@ from logilab.common.testlib import unittest_main, mock_object from cubicweb import Binary -from cubicweb.devtools.testlib import WebTest +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.web.formfields import (IntField, StringField, RichTextField, DateTimeField, DateTimePicker, FileField, EditableFileField) @@ -22,7 +22,7 @@ from cubicweb.web.views.formrenderers import FormRenderer -class FieldsFormTC(WebTest): +class FieldsFormTC(CubicWebTC): def test_form_field_format(self): form = FieldsForm(self.request(), None) @@ -32,7 +32,7 @@ self.assertEquals(form.form_field_format(None), 'text/rest') -class EntityFieldsFormTC(WebTest): +class EntityFieldsFormTC(CubicWebTC): def setUp(self): super(EntityFieldsFormTC, self).setUp() @@ -55,28 +55,28 @@ self.failIf(t.eid in unrelated, unrelated) def test_form_field_vocabulary_new_entity(self): - e = self.etype_instance('CWUser') - form = EntityFieldsForm(self.request(), None, entity=e) + e = self.vreg['etypes'].etype_class('CWUser')(self.request()) + form = EntityFieldsForm(e.req, None, entity=e) unrelated = [rview for rview, reid in form.subject_relation_vocabulary('in_group')] # should be default groups but owners, i.e. managers, users, guests self.assertEquals(unrelated, [u'guests', u'managers', u'users']) def test_subject_in_state_vocabulary(self): # on a new entity - e = self.etype_instance('CWUser') - form = EntityFieldsForm(self.request(), None, entity=e) + e = self.vreg['etypes'].etype_class('CWUser')(self.request()) + form = EntityFieldsForm(e.req, None, entity=e) states = list(form.subject_in_state_vocabulary('in_state')) self.assertEquals(len(states), 1) self.assertEquals(states[0][0], u'activated') # list of (combobox view, state eid) # on an existant entity e = self.user() - form = EntityFieldsForm(self.request(), None, entity=e) + form = EntityFieldsForm(e.req, None, entity=e) states = list(form.subject_in_state_vocabulary('in_state')) self.assertEquals(len(states), 1) self.assertEquals(states[0][0], u'deactivated') # list of (combobox view, state eid) def test_consider_req_form_params(self): - e = self.etype_instance('CWUser') + e = self.vreg['etypes'].etype_class('CWUser')(self.request()) e.eid = 'A' form = EntityFieldsForm(self.request(login=u'toto'), None, entity=e) field = StringField(name='login', eidparam=True) @@ -86,7 +86,7 @@ def test_linkto_field_duplication(self): - e = self.etype_instance('CWUser') + e = self.vreg['etypes'].etype_class('CWUser')(self.request()) e.eid = 'A' e.req = self.req geid = self.execute('CWGroup X WHERE X name "users"')[0][0] diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_formfields.py --- a/web/test/unittest_formfields.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_formfields.py Tue Aug 11 17:15:54 2009 +0200 @@ -11,7 +11,7 @@ from yams.constraints import StaticVocabularyConstraint, SizeConstraint from cubicweb.devtools import TestServerConfiguration -from cubicweb.devtools.testlib import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.web.formwidgets import PasswordInput, TextArea, Select, Radio from cubicweb.web.formfields import * from cubicweb.web.views.forms import EntityFieldsForm @@ -112,11 +112,11 @@ [(u'maybe', '1'), (u'no', '')]) -class MoreFieldsTC(EnvBasedTC): +class MoreFieldsTC(CubicWebTC): def test_rtf_format_field(self): req = self.request() req.use_fckeditor = lambda: False - e = self.etype_instance('State') + e = self.vreg['etypes'].etype_class('State')(req) form = EntityFieldsForm(req, entity=e) description_field = guess_field(schema['State'], schema['description']) description_format_field = description_field.get_format_field(form) diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_magicsearch.py --- a/web/test/unittest_magicsearch.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_magicsearch.py Tue Aug 11 17:15:54 2009 +0200 @@ -13,21 +13,17 @@ from rql import BadRQLQuery, RQLSyntaxError -from cubicweb.devtools.apptest import EnvBasedTC, TestEnvironment +from cubicweb.devtools.testlib import CubicWebTC translations = { u'CWUser' : u"Utilisateur", -# u'Workcase' : u"Affaire", u'EmailAddress' : u"Adresse", -# u'Division' : u"Division", -# u'Comment' : u"Commentaire", u'name' : u"nom", u'alias' : u"nom", u'surname' : u"nom", u'firstname' : u"prénom", u'state' : u"état", -# u'subject' : u"sujet", u'address' : u"adresse", u'use_email' : u"adel", } @@ -37,12 +33,12 @@ from cubicweb.web.views.magicsearch import translate_rql_tree, QSPreProcessor, QueryTranslator -class QueryTranslatorTC(EnvBasedTC): +class QueryTranslatorTC(CubicWebTC): """test suite for QueryTranslatorTC""" def setUp(self): super(QueryTranslatorTC, self).setUp() - self.req = self.env.create_request() + self.req = self.request() self.vreg.config.translations = {'en': _translate} proc = self.vreg['components'].select('magicsearch', self.req) self.proc = [p for p in proc.processors if isinstance(p, QueryTranslator)][0] @@ -63,7 +59,7 @@ self.assertEquals(rql, "Any P WHERE P is CWUser, P use_email C, P surname 'Smith'") -class QSPreProcessorTC(EnvBasedTC): +class QSPreProcessorTC(CubicWebTC): """test suite for QSPreProcessor""" def setUp(self): super(QSPreProcessorTC, self).setUp() @@ -88,7 +84,6 @@ eschema = self.schema.eschema('CWUser') self.assertEquals(translate(u'prénom', eschema), "firstname") self.assertEquals(translate(u'nom', eschema), 'surname') - #self.assert_(translate(u'nom') in ('name', 'surname')) eschema = self.schema.eschema('EmailAddress') self.assertEquals(translate(u'adresse', eschema), "address") self.assertEquals(translate(u'nom', eschema), 'alias') @@ -125,7 +120,6 @@ self.assertEquals(transform(u'adresse', 'Logi%'), ('EmailAddress E WHERE E alias LIKE %(text)s', {'text': 'Logi%'})) self.assertRaises(BadRQLQuery, transform, "pers", "taratata") - #self.assertEquals(transform('CWUser', '%mi'), 'CWUser E WHERE P surname LIKE "%mi"') def test_three_words_query(self): """tests the 'three words shortcut queries'""" @@ -180,7 +174,7 @@ ## Processor Chains tests ############################################ -class ProcessorChainTC(EnvBasedTC): +class ProcessorChainTC(CubicWebTC): """test suite for magic_search's processor chains""" def setUp(self): @@ -195,7 +189,7 @@ (u'foo', ("Any X WHERE X has_text %(text)s", {'text': u'foo'})), # XXX this sounds like a language translator test... - # and it fail + # and it fails (u'Utilisateur Smith', ('CWUser C WHERE C has_text %(text)s', {'text': u'Smith'})), (u'utilisateur nom Smith', diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_urlpublisher.py --- a/web/test/unittest_urlpublisher.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_urlpublisher.py Tue Aug 11 17:15:54 2009 +0200 @@ -11,15 +11,14 @@ from logilab.common.testlib import unittest_main -from cubicweb.devtools.apptest import EnvBasedTC -from cubicweb.devtools._apptest import FakeRequest - from cubicweb.rset import ResultSet +from cubicweb.devtools.testlib import CubicWebTC +from cubicweb.devtools.fake import FakeRequest from cubicweb.web import NotFound, Redirect from cubicweb.web.views.urlrewrite import SimpleReqRewriter -class URLPublisherTC(EnvBasedTC): +class URLPublisherTC(CubicWebTC): """test suite for QSPreProcessor""" def setup_database(self): @@ -30,7 +29,7 @@ def process(self, url): req = self.req = self.request() - return self.env.app.url_resolver.process(req, url) + return self.app.url_resolver.process(req, url) def test_raw_path(self): """tests raw path resolution'""" diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_urlrewrite.py --- a/web/test/unittest_urlrewrite.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_urlrewrite.py Tue Aug 11 17:15:54 2009 +0200 @@ -7,8 +7,8 @@ """ from logilab.common.testlib import TestCase, unittest_main -from cubicweb.devtools._apptest import FakeRequest -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC +from cubicweb.devtools.fake import FakeRequest from cubicweb.web.views.urlrewrite import SimpleReqRewriter, SchemaBasedRewriter, rgx, rgx_action @@ -82,7 +82,7 @@ -class RgxActionRewriteTC(EnvBasedTC): +class RgxActionRewriteTC(CubicWebTC): def setup_database(self): self.p1 = self.create_user(u'user1') diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_views_actions.py --- a/web/test/unittest_views_actions.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_views_actions.py Tue Aug 11 17:15:54 2009 +0200 @@ -7,9 +7,9 @@ """ from logilab.common.testlib import unittest_main -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC -class ActionsTC(EnvBasedTC): +class ActionsTC(CubicWebTC): def test_view_action(self): req = self.request(__message='bla bla bla', vid='rss', rql='CWUser X') rset = self.execute('CWUser X') diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_views_basecontrollers.py --- a/web/test/unittest_views_basecontrollers.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_views_basecontrollers.py Tue Aug 11 17:15:54 2009 +0200 @@ -8,42 +8,39 @@ import simplejson from logilab.common.testlib import unittest_main, mock_object -from cubicweb.devtools.apptest import EnvBasedTC, ControllerTC from cubicweb import Binary, NoSelectableObject, ValidationError from cubicweb.view import STRICT_DOCTYPE +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.common.uilib import rql_for_eid - from cubicweb.web import INTERNAL_FIELD_VALUE, Redirect, RequestError - from cubicweb.entities.authobjs import CWUser -class EditControllerTC(ControllerTC): +class EditControllerTC(CubicWebTC): def setUp(self): - ControllerTC.setUp(self) + CubicWebTC.setUp(self) self.failUnless('users' in self.schema.eschema('CWGroup').get_groups('read')) def tearDown(self): - ControllerTC.tearDown(self) + CubicWebTC.tearDown(self) self.failUnless('users' in self.schema.eschema('CWGroup').get_groups('read')) def test_noparam_edit(self): """check behaviour of this controller without any form parameter """ - - self.req.form = {} - self.assertRaises(ValidationError, self.publish, self.req) + self.assertRaises(ValidationError, self.publish, self.request()) def test_validation_unique(self): """test creation of two linked entities """ user = self.user() - self.req.form = {'eid': 'X', '__type:X': 'CWUser', - 'login:X': u'admin', 'edits-login:X': u'', - 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'', - } - self.assertRaises(ValidationError, self.publish, self.req) + req = self.request() + req.form = {'eid': 'X', '__type:X': 'CWUser', + 'login:X': u'admin', 'edits-login:X': u'', + 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'', + } + self.assertRaises(ValidationError, self.publish, req) def test_user_editing_itself(self): @@ -54,7 +51,8 @@ groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')] groups = [str(eid) for eid in groupeids] stateeid = [eid for eid, in self.execute('State S WHERE S name "activated"')][0] - self.req.form = { + req = self.request() + req.form = { 'eid': `user.eid`, '__type:'+`user.eid`: 'CWUser', 'login:'+`user.eid`: unicode(user.login), @@ -69,7 +67,7 @@ 'edits-in_group:'+`user.eid`: basegroups, 'edits-in_state:'+`user.eid`: `stateeid`, } - path, params = self.expect_redirect_publish() + path, params = self.expect_redirect_publish(req) e = self.execute('Any X WHERE X eid %(x)s', {'x': user.eid}, 'x').get_entity(0, 0) self.assertEquals(e.firstname, u'Th\xe9nault') self.assertEquals(e.surname, u'Sylvain') @@ -81,8 +79,6 @@ user = self.create_user('user') cnx = self.login('user') req = self.request() - #self.assertEquals(self.ctrl.schema['CWUser']._groups['read'], - # ('managers', 'users')) req.form = { 'eid': `user.eid`, '__type:'+`user.eid`: 'CWUser', '__maineid' : str(user.eid), @@ -101,7 +97,8 @@ """ user = self.user() groupeids = [eid for eid, in self.execute('CWGroup G WHERE X in_group G, X eid %(x)s', {'x': user.eid})] - self.req.form = { + req = self.request() + req.form = { 'eid': `user.eid`, '__type:'+`user.eid`: 'CWUser', 'login:'+`user.eid`: unicode(user.login), @@ -112,7 +109,7 @@ 'edits-firstname:'+`user.eid`: u'', 'edits-surname:'+`user.eid`: u'', } - path, params = self.expect_redirect_publish() + path, params = self.expect_redirect_publish(req) e = self.execute('Any X WHERE X eid %(x)s', {'x': user.eid}, 'x').get_entity(0, 0) self.assertEquals(e.login, user.login) self.assertEquals(e.firstname, u'Th\xe9nault') @@ -124,21 +121,22 @@ def test_create_multiple_linked(self): gueid = self.execute('CWGroup G WHERE G name "users"')[0][0] - self.req.form = {'eid': ['X', 'Y'], - - '__type:X': 'CWUser', - '__maineid' : 'X', - 'login:X': u'adim', 'edits-login:X': u'', - 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'', - 'surname:X': u'Di Mascio', 'edits-surname:X': '', + req = self.request() + req.form = {'eid': ['X', 'Y'], - 'in_group:X': `gueid`, 'edits-in_group:X': INTERNAL_FIELD_VALUE, + '__type:X': 'CWUser', + '__maineid' : 'X', + 'login:X': u'adim', 'edits-login:X': u'', + 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'', + 'surname:X': u'Di Mascio', 'edits-surname:X': '', - '__type:Y': 'EmailAddress', - 'address:Y': u'dima@logilab.fr', 'edits-address:Y': '', - 'use_email:X': 'Y', 'edits-use_email:X': INTERNAL_FIELD_VALUE, - } - path, params = self.expect_redirect_publish() + 'in_group:X': `gueid`, 'edits-in_group:X': INTERNAL_FIELD_VALUE, + + '__type:Y': 'EmailAddress', + 'address:Y': u'dima@logilab.fr', 'edits-address:Y': '', + 'use_email:X': 'Y', 'edits-use_email:X': INTERNAL_FIELD_VALUE, + } + path, params = self.expect_redirect_publish(req) # should be redirected on the created person self.assertEquals(path, 'cwuser/adim') e = self.execute('Any P WHERE P surname "Di Mascio"').get_entity(0, 0) @@ -148,17 +146,18 @@ def test_edit_multiple_linked(self): peid = self.create_user('adim').eid - self.req.form = {'eid': [`peid`, 'Y'], - '__type:%s'%peid: 'CWUser', - 'surname:%s'%peid: u'Di Masci', 'edits-surname:%s'%peid: '', + req = self.request() + req.form = {'eid': [`peid`, 'Y'], + '__type:%s'%peid: 'CWUser', + 'surname:%s'%peid: u'Di Masci', 'edits-surname:%s'%peid: '', - '__type:Y': 'EmailAddress', - 'address:Y': u'dima@logilab.fr', 'edits-address:Y': '', - 'use_email:%s'%peid: 'Y', 'edits-use_email:%s'%peid: INTERNAL_FIELD_VALUE, + '__type:Y': 'EmailAddress', + 'address:Y': u'dima@logilab.fr', 'edits-address:Y': '', + 'use_email:%s'%peid: 'Y', 'edits-use_email:%s'%peid: INTERNAL_FIELD_VALUE, - '__redirectrql': 'Any X WHERE X eid %s'%peid, - } - path, params = self.expect_redirect_publish() + '__redirectrql': 'Any X WHERE X eid %s'%peid, + } + path, params = self.expect_redirect_publish(req) # should be redirected on the created person eid = params['rql'].split()[-1] e = self.execute('Any X WHERE X eid %(x)s', {'x': eid}, 'x').get_entity(0, 0) @@ -167,7 +166,8 @@ self.assertEquals(email.address, 'dima@logilab.fr') emaileid = email.eid - self.req.form = {'eid': [`peid`, `emaileid`], + req = self.request() + req.form = {'eid': [`peid`, `emaileid`], '__type:%s'%peid: 'CWUser', 'surname:%s'%peid: u'Di Masci', 'edits-surname:%s'%peid: 'Di Masci', '__type:%s'%emaileid: 'EmailAddress', @@ -175,7 +175,7 @@ 'use_email:%s'%peid: `emaileid`, 'edits-use_email:%s'%peid: `emaileid`, '__redirectrql': 'Any X WHERE X eid %s'%peid, } - path, params = self.expect_redirect_publish() + path, params = self.expect_redirect_publish(req) # should be redirected on the created person eid = params['rql'].split()[-1] e = self.execute('Any X WHERE X eid %(x)s', {'x': eid}, 'x').get_entity(0, 0) @@ -188,41 +188,47 @@ """test creation of two linked entities """ user = self.user() - self.req.form = {'__cloned_eid:X': user.eid, - 'eid': 'X', '__type:X': 'CWUser', - 'login:X': u'toto', 'edits-login:X': u'', - 'upassword:X': u'toto', 'edits-upassword:X': u'', - } - self.assertRaises(ValidationError, self.publish, self.req) - self.req.form = {'__cloned_eid:X': user.eid, - 'eid': 'X', '__type:X': 'CWUser', - 'login:X': u'toto', 'edits-login:X': u'', - 'upassword:X': u'toto', 'upassword-confirm:X': u'tutu', 'edits-upassword:X': u'', - } - self.assertRaises(ValidationError, self.publish, self.req) + req = self.request() + req.form = {'__cloned_eid:X': user.eid, + 'eid': 'X', '__type:X': 'CWUser', + 'login:X': u'toto', 'edits-login:X': u'', + 'upassword:X': u'toto', 'edits-upassword:X': u'', + } + self.assertRaises(ValidationError, self.publish, req) + req = self.request() + req.form = {'__cloned_eid:X': user.eid, + 'eid': 'X', '__type:X': 'CWUser', + 'login:X': u'toto', 'edits-login:X': u'', + 'upassword:X': u'toto', + 'upassword-confirm:X': u'tutu', 'edits-upassword:X': u'', + } + self.assertRaises(ValidationError, self.publish, req) def test_interval_bound_constraint_success(self): feid = self.execute('INSERT File X: X name "toto.txt", X data %(data)s', {'data': Binary('yo')})[0][0] - self.req.form = {'eid': ['X'], - '__type:X': 'Salesterm', - 'amount:X': u'-10', 'edits-amount:X': '', - 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE, - } - self.assertRaises(ValidationError, self.publish, self.req) - self.req.form = {'eid': ['X'], - '__type:X': 'Salesterm', - 'amount:X': u'110', 'edits-amount:X': '', - 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE, - } - self.assertRaises(ValidationError, self.publish, self.req) - self.req.form = {'eid': ['X'], - '__type:X': 'Salesterm', - 'amount:X': u'10', 'edits-amount:X': '', - 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE, - } - self.expect_redirect_publish() + req = self.request() + req.form = {'eid': ['X'], + '__type:X': 'Salesterm', + 'amount:X': u'-10', 'edits-amount:X': '', + 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE, + } + self.assertRaises(ValidationError, self.publish, req) + req = self.request() + req.form = {'eid': ['X'], + '__type:X': 'Salesterm', + 'amount:X': u'110', 'edits-amount:X': '', + 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE, + } + self.assertRaises(ValidationError, self.publish, req) + req = self.request() + req.form = {'eid': ['X'], + '__type:X': 'Salesterm', + 'amount:X': u'10', 'edits-amount:X': '', + 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE, + } + self.expect_redirect_publish(req) # should be redirected on the created #eid = params['rql'].split()[-1] e = self.execute('Salesterm X').get_entity(0, 0) @@ -232,12 +238,13 @@ """make sure req's pending insertions are taken into account""" tmpgroup = self.add_entity('CWGroup', name=u"test") user = self.user() - self.req.set_session_data('pending_insert', set([(user.eid, 'in_group', tmpgroup.eid)])) - path, params = self.expect_redirect_publish() + req = self.request() + req.set_session_data('pending_insert', set([(user.eid, 'in_group', tmpgroup.eid)])) + path, params = self.expect_redirect_publish(req) usergroups = [gname for gname, in self.execute('Any N WHERE G name N, U in_group G, U eid %(u)s', {'u': user.eid})] self.assertUnorderedIterableEquals(usergroups, ['managers', 'test']) - self.assertEquals(self.req.get_pending_inserts(), []) + self.assertEquals(req.get_pending_inserts(), []) def test_req_pending_delete(self): @@ -250,12 +257,13 @@ # just make sure everything was set correctly self.assertUnorderedIterableEquals(usergroups, ['managers', 'test']) # now try to delete the relation - self.req.set_session_data('pending_delete', set([(user.eid, 'in_group', groupeid)])) - path, params = self.expect_redirect_publish() + req = self.request() + req.set_session_data('pending_delete', set([(user.eid, 'in_group', groupeid)])) + path, params = self.expect_redirect_publish(req) usergroups = [gname for gname, in self.execute('Any N WHERE G name N, U in_group G, U eid %(u)s', {'u': user.eid})] self.assertUnorderedIterableEquals(usergroups, ['managers']) - self.assertEquals(self.req.get_pending_deletes(), []) + self.assertEquals(req.get_pending_deletes(), []) def test_custom_attribute_handler(self): def custom_login_edit(self, formparams, value, relations): @@ -265,13 +273,14 @@ try: user = self.user() eid = repr(user.eid) - self.req.form = { + req = self.request() + req.form = { 'eid': eid, '__type:'+eid: 'CWUser', 'login:'+eid: u'foo', 'edits-login:'+eid: unicode(user.login), } - path, params = self.expect_redirect_publish() + path, params = self.expect_redirect_publish(req) rset = self.execute('Any L WHERE X eid %(x)s, X login L', {'x': user.eid}, 'x') self.assertEquals(rset[0][0], 'FOO') finally: @@ -279,18 +288,19 @@ def test_redirect_apply_button(self): redirectrql = rql_for_eid(4012) # whatever - self.req.form = { - 'eid': 'A', '__type:A': 'BlogEntry', - '__maineid' : 'A', - 'content:A': u'"13:03:43"', 'edits-content:A': '', - 'title:A': u'huuu', 'edits-title:A': '', - '__redirectrql': redirectrql, - '__redirectvid': 'primary', - '__redirectparams': 'toto=tutu&tata=titi', - '__form_id': 'edition', - '__action_apply': '', - } - path, params = self.expect_redirect_publish() + req = self.request() + req.form = { + 'eid': 'A', '__type:A': 'BlogEntry', + '__maineid' : 'A', + 'content:A': u'"13:03:43"', 'edits-content:A': '', + 'title:A': u'huuu', 'edits-title:A': '', + '__redirectrql': redirectrql, + '__redirectvid': 'primary', + '__redirectparams': 'toto=tutu&tata=titi', + '__form_id': 'edition', + '__action_apply': '', + } + path, params = self.expect_redirect_publish(req) self.failUnless(path.startswith('blogentry/')) eid = path.split('/')[1] self.assertEquals(params['vid'], 'edition') @@ -301,17 +311,18 @@ def test_redirect_ok_button(self): redirectrql = rql_for_eid(4012) # whatever - self.req.form = { - 'eid': 'A', '__type:A': 'BlogEntry', - '__maineid' : 'A', - 'content:A': u'"13:03:43"', 'edits-content:A': '', - 'title:A': u'huuu', 'edits-title:A': '', - '__redirectrql': redirectrql, - '__redirectvid': 'primary', - '__redirectparams': 'toto=tutu&tata=titi', - '__form_id': 'edition', - } - path, params = self.expect_redirect_publish() + req = self.request() + req.form = { + 'eid': 'A', '__type:A': 'BlogEntry', + '__maineid' : 'A', + 'content:A': u'"13:03:43"', 'edits-content:A': '', + 'title:A': u'huuu', 'edits-title:A': '', + '__redirectrql': redirectrql, + '__redirectvid': 'primary', + '__redirectparams': 'toto=tutu&tata=titi', + '__form_id': 'edition', + } + path, params = self.expect_redirect_publish(req) self.assertEquals(path, 'view') self.assertEquals(params['rql'], redirectrql) self.assertEquals(params['vid'], 'primary') @@ -320,27 +331,30 @@ def test_redirect_delete_button(self): eid = self.add_entity('BlogEntry', title=u'hop', content=u'hop').eid - self.req.form = {'eid': str(eid), '__type:%s'%eid: 'BlogEntry', - '__action_delete': ''} - path, params = self.expect_redirect_publish() + req = self.request() + req.form = {'eid': str(eid), '__type:%s'%eid: 'BlogEntry', + '__action_delete': ''} + path, params = self.expect_redirect_publish(req) self.assertEquals(path, 'blogentry') self.assertEquals(params, {u'__message': u'entity deleted'}) eid = self.add_entity('EmailAddress', address=u'hop@logilab.fr').eid self.execute('SET X use_email E WHERE E eid %(e)s, X eid %(x)s', - {'x': self.session().user.eid, 'e': eid}, 'x') + {'x': self.session.user.eid, 'e': eid}, 'x') self.commit() - self.req.form = {'eid': str(eid), '__type:%s'%eid: 'EmailAddress', - '__action_delete': ''} - path, params = self.expect_redirect_publish() + req = self.request() + req.form = {'eid': str(eid), '__type:%s'%eid: 'EmailAddress', + '__action_delete': ''} + path, params = self.expect_redirect_publish(req) self.assertEquals(path, 'cwuser/admin') self.assertEquals(params, {u'__message': u'entity deleted'}) eid1 = self.add_entity('BlogEntry', title=u'hop', content=u'hop').eid eid2 = self.add_entity('EmailAddress', address=u'hop@logilab.fr').eid - self.req.form = {'eid': [str(eid1), str(eid2)], - '__type:%s'%eid1: 'BlogEntry', - '__type:%s'%eid2: 'EmailAddress', - '__action_delete': ''} - path, params = self.expect_redirect_publish() + req = self.request() + req.form = {'eid': [str(eid1), str(eid2)], + '__type:%s'%eid1: 'BlogEntry', + '__type:%s'%eid2: 'EmailAddress', + '__action_delete': ''} + path, params = self.expect_redirect_publish(req) self.assertEquals(path, 'view') self.assertEquals(params, {u'__message': u'entities deleted'}) @@ -351,23 +365,24 @@ groups = [str(eid) for eid in groupeids] eeetypeeid = self.execute('CWEType X WHERE X name "CWGroup"')[0][0] basegroups = [str(eid) for eid, in self.execute('CWGroup G WHERE X read_permission G, X eid %(x)s', {'x': eeetypeeid})] - self.req.form = { - 'eid': `eeetypeeid`, - '__type:'+`eeetypeeid`: 'CWEType', - 'name:'+`eeetypeeid`: u'CWGroup', - 'final:'+`eeetypeeid`: False, - 'meta:'+`eeetypeeid`: True, - 'description:'+`eeetypeeid`: u'users group', - 'read_permission:'+`eeetypeeid`: groups, - # - 'edits-name:'+`eeetypeeid`: u'CWGroup', - 'edits-final:'+`eeetypeeid`: False, - 'edits-meta:'+`eeetypeeid`: True, - 'edits-description:'+`eeetypeeid`: u'users group', - 'edits-read_permission:'+`eeetypeeid`: basegroups, - } + req = self.request() + req.form = { + 'eid': `eeetypeeid`, + '__type:'+`eeetypeeid`: 'CWEType', + 'name:'+`eeetypeeid`: u'CWGroup', + 'final:'+`eeetypeeid`: False, + 'meta:'+`eeetypeeid`: True, + 'description:'+`eeetypeeid`: u'users group', + 'read_permission:'+`eeetypeeid`: groups, + # + 'edits-name:'+`eeetypeeid`: u'CWGroup', + 'edits-final:'+`eeetypeeid`: False, + 'edits-meta:'+`eeetypeeid`: True, + 'edits-description:'+`eeetypeeid`: u'users group', + 'edits-read_permission:'+`eeetypeeid`: basegroups, + } try: - path, params = self.expect_redirect_publish() + path, params = self.expect_redirect_publish(req) e = self.execute('Any X WHERE X eid %(x)s', {'x': eeetypeeid}, 'x').get_entity(0, 0) self.assertEquals(e.name, 'CWGroup') self.assertEquals([g.eid for g in e.read_permission], groupeids) @@ -383,23 +398,24 @@ groups = [str(eid) for eid in groupeids] eeetypeeid = self.execute('CWEType X WHERE X name "CWEType"')[0][0] basegroups = [str(eid) for eid, in self.execute('CWGroup G WHERE X read_permission G, X eid %(x)s', {'x': eeetypeeid})] - self.req.form = { - 'eid': `eeetypeeid`, - '__type:'+`eeetypeeid`: 'CWEType', - 'name:'+`eeetypeeid`: u'CWEType', - 'final:'+`eeetypeeid`: False, - 'meta:'+`eeetypeeid`: True, - 'description:'+`eeetypeeid`: u'users group', - 'read_permission:'+`eeetypeeid`: groups, + req = self.request() + req.form = { + 'eid': `eeetypeeid`, + '__type:'+`eeetypeeid`: 'CWEType', + 'name:'+`eeetypeeid`: u'CWEType', + 'final:'+`eeetypeeid`: False, + 'meta:'+`eeetypeeid`: True, + 'description:'+`eeetypeeid`: u'users group', + 'read_permission:'+`eeetypeeid`: groups, - 'edits-name:'+`eeetypeeid`: u'CWEType', - 'edits-final:'+`eeetypeeid`: False, - 'edits-meta:'+`eeetypeeid`: True, - 'edits-description:'+`eeetypeeid`: u'users group', - 'edits-read_permission:'+`eeetypeeid`: basegroups, - } + 'edits-name:'+`eeetypeeid`: u'CWEType', + 'edits-final:'+`eeetypeeid`: False, + 'edits-meta:'+`eeetypeeid`: True, + 'edits-description:'+`eeetypeeid`: u'users group', + 'edits-read_permission:'+`eeetypeeid`: basegroups, + } try: - path, params = self.expect_redirect_publish() + path, params = self.expect_redirect_publish(req) e = self.execute('Any X WHERE X eid %(x)s', {'x': eeetypeeid}, 'x').get_entity(0, 0) self.assertEquals(e.name, 'CWEType') self.assertEquals(sorted(g.eid for g in e.read_permission), groupeids) @@ -413,12 +429,13 @@ this seems to be postgres (tsearch?) specific """ - self.req.form = { - 'eid': 'A', '__type:A': 'BlogEntry', - '__maineid' : 'A', - 'title:A': u'"13:03:40"', 'edits-title:A': '', - 'content:A': u'"13:03:43"', 'edits-content:A': ''} - path, params = self.expect_redirect_publish() + req = self.request() + req.form = { + 'eid': 'A', '__type:A': 'BlogEntry', + '__maineid' : 'A', + 'title:A': u'"13:03:40"', 'edits-title:A': '', + 'content:A': u'"13:03:43"', 'edits-content:A': ''} + path, params = self.expect_redirect_publish(req) self.failUnless(path.startswith('blogentry/')) eid = path.split('/')[1] e = self.execute('Any C, T WHERE C eid %(x)s, C content T', {'x': eid}, 'x').get_entity(0, 0) @@ -428,29 +445,31 @@ def test_nonregr_multiple_empty_email_addr(self): gueid = self.execute('CWGroup G WHERE G name "users"')[0][0] - self.req.form = {'eid': ['X', 'Y'], - - '__type:X': 'CWUser', - 'login:X': u'adim', 'edits-login:X': u'', - 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'', - 'in_group:X': `gueid`, 'edits-in_group:X': INTERNAL_FIELD_VALUE, + req = self.request() + req.form = {'eid': ['X', 'Y'], - '__type:Y': 'EmailAddress', - 'address:Y': u'', 'edits-address:Y': '', - 'alias:Y': u'', 'edits-alias:Y': '', - 'use_email:X': 'Y', 'edits-use_email:X': INTERNAL_FIELD_VALUE, - } - self.assertRaises(ValidationError, self.publish, self.req) + '__type:X': 'CWUser', + 'login:X': u'adim', 'edits-login:X': u'', + 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'', + 'in_group:X': `gueid`, 'edits-in_group:X': INTERNAL_FIELD_VALUE, + + '__type:Y': 'EmailAddress', + 'address:Y': u'', 'edits-address:Y': '', + 'alias:Y': u'', 'edits-alias:Y': '', + 'use_email:X': 'Y', 'edits-use_email:X': INTERNAL_FIELD_VALUE, + } + self.assertRaises(ValidationError, self.publish, req) def test_nonregr_copy(self): user = self.user() - self.req.form = {'__cloned_eid:X': user.eid, - 'eid': 'X', '__type:X': 'CWUser', - '__maineid' : 'X', - 'login:X': u'toto', 'edits-login:X': u'', - 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'', - } - path, params = self.expect_redirect_publish() + req = self.request() + req.form = {'__cloned_eid:X': user.eid, + 'eid': 'X', '__type:X': 'CWUser', + '__maineid' : 'X', + 'login:X': u'toto', 'edits-login:X': u'', + 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'', + } + path, params = self.expect_redirect_publish(req) self.assertEquals(path, 'cwuser/toto') e = self.execute('Any X WHERE X is CWUser, X login "toto"').get_entity(0, 0) self.assertEquals(e.login, 'toto') @@ -466,29 +485,30 @@ e = self.add_entity('EmailAddress', address=u'doe@doe.com') self.execute('SET P use_email E, P primary_email E WHERE P eid %(p)s, E eid %(e)s', {'p' : p.eid, 'e' : e.eid}) - self.req.form = {'__cloned_eid:X': p.eid, - 'eid': 'X', '__type:X': 'CWUser', - 'login': u'dodo', 'edits-login': u'dodo', - 'surname:X': u'Boom', 'edits-surname:X': u'', - '__errorurl' : "whatever but required", + req = self.request() + req.form = {'__cloned_eid:X': p.eid, + 'eid': 'X', '__type:X': 'CWUser', + 'login': u'dodo', 'edits-login': u'dodo', + 'surname:X': u'Boom', 'edits-surname:X': u'', + '__errorurl' : "whatever but required", } # try to emulate what really happens in the web application # 1/ validate form => EditController.publish raises a ValidationError # which fires a Redirect # 2/ When re-publishing the copy form, the publisher implicitly commits try: - self.env.app.publish('edit', self.req) + self.app.publish('edit', req) except Redirect: - self.req.form['rql'] = 'Any X WHERE X eid %s' % p.eid - self.req.form['vid'] = 'copy' - self.env.app.publish('view', self.req) + req.form['rql'] = 'Any X WHERE X eid %s' % p.eid + req.form['vid'] = 'copy' + self.app.publish('view', req) rset = self.execute('CWUser P WHERE P surname "Boom"') self.assertEquals(len(rset), 0) finally: p.__class__.skip_copy_for = old_skips -class EmbedControllerTC(EnvBasedTC): +class EmbedControllerTC(CubicWebTC): def test_nonregr_embed_publish(self): # This test looks a bit stupid but at least it will probably @@ -500,23 +520,23 @@ result = controller.publish(rset=None) -class ReportBugControllerTC(EnvBasedTC): +class ReportBugControllerTC(CubicWebTC): def test_usable_by_guets(self): - req = self.request() - self.vreg['controllers'].select('reportbug', req) + self.login('anon') + self.vreg['controllers'].select('reportbug', self.request()) -class SendMailControllerTC(EnvBasedTC): +class SendMailControllerTC(CubicWebTC): def test_not_usable_by_guets(self): self.login('anon') - req = self.request() - self.assertRaises(NoSelectableObject, self.env.vreg['controllers'].select, 'sendmail', req) + self.assertRaises(NoSelectableObject, + self.vreg['controllers'].select, 'sendmail', self.request()) -class JSONControllerTC(EnvBasedTC): +class JSONControllerTC(CubicWebTC): def ctrl(self, req=None): req = req or self.request(url='http://whatever.fr/') diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_views_basetemplates.py --- a/web/test/unittest_views_basetemplates.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_views_basetemplates.py Tue Aug 11 17:15:54 2009 +0200 @@ -5,11 +5,11 @@ :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses """ -from cubicweb.devtools.testlib import WebTest +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.devtools.htmlparser import DTDValidator -class LogFormTemplateTC(WebTest): +class LogFormTemplateTC(CubicWebTC): def _login_labels(self): valid = self.content_type_validators.get('text/html', DTDValidator)() diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_views_baseviews.py --- a/web/test/unittest_views_baseviews.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_views_baseviews.py Tue Aug 11 17:15:54 2009 +0200 @@ -10,7 +10,7 @@ from logilab.common.testlib import unittest_main from logilab.mtconverter import html_unescape -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.web.htmlwidgets import TableWidget from cubicweb.web.views import vid_from_rset @@ -18,7 +18,7 @@ def loadjson(value): return loads(html_unescape(value)) -class VidFromRsetTC(EnvBasedTC): +class VidFromRsetTC(CubicWebTC): def test_no_rset(self): req = self.request() @@ -84,7 +84,7 @@ self.assertEquals(vid_from_rset(req, rset, self.schema), 'table') -class TableViewTC(EnvBasedTC): +class TableViewTC(CubicWebTC): def _prepare_entity(self): e = self.add_entity("State", name=u'', description=u'loo"ong blabla') diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_views_editforms.py --- a/web/test/unittest_views_editforms.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_views_editforms.py Tue Aug 11 17:15:54 2009 +0200 @@ -6,14 +6,13 @@ :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses """ from logilab.common.testlib import unittest_main -from cubicweb.devtools.apptest import EnvBasedTC -from cubicweb.devtools.testlib import WebTest +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.web.views.autoform import AutomaticEntityForm as AEF from cubicweb.web.formwidgets import AutoCompletionWidget def rbc(entity, category): return [(rschema.type, x) for rschema, tschemas, x in AEF.erelations_by_category(entity, category)] -class AutomaticEntityFormTC(EnvBasedTC): +class AutomaticEntityFormTC(CubicWebTC): def test_custom_widget(self): AEF.rfields_kwargs.tag_subject_of(('CWUser', 'login', '*'), @@ -29,7 +28,7 @@ #for (rtype, role, stype, otype), tag in AEF.rcategories._tagdefs.items(): # if rtype == 'tags': # print rtype, role, stype, otype, ':', tag - e = self.etype_instance('CWUser') + e = self.vreg['etypes'].etype_class('CWUser')(self.request()) # see custom configuration in views.cwuser self.assertEquals(rbc(e, 'primary'), [('login', 'subject'), @@ -76,7 +75,7 @@ self.failIf(AEF.rinlined.etype_get('CWUser', 'primary_email', 'subject')) def test_personne_relations_by_category(self): - e = self.etype_instance('Personne') + e = self.vreg['etypes'].etype_class('Personne')(self.request()) self.assertListEquals(rbc(e, 'primary'), [('nom', 'subject'), ('eid', 'subject') @@ -124,7 +123,7 @@ self.failIf(any(f for f in form.fields if f is None)) -class FormViewsTC(WebTest): +class FormViewsTC(CubicWebTC): def test_delete_conf_formview(self): rset = self.execute('CWGroup X') self.view('deleteconf', rset, template=None).source diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_views_navigation.py --- a/web/test/unittest_views_navigation.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_views_navigation.py Tue Aug 11 17:15:54 2009 +0200 @@ -7,14 +7,14 @@ """ from logilab.common.testlib import unittest_main, mock_object -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.web.views.navigation import PageNavigation, SortedNavigation from cubicweb.web.views.ibreadcrumbs import BreadCrumbEntityVComponent BreadCrumbEntityVComponent.visible = True -class NavigationTC(EnvBasedTC): +class NavigationTC(CubicWebTC): def test_navigation_selection_whatever(self): req = self.request() @@ -96,7 +96,7 @@ -class ContentNavigationTC(EnvBasedTC): +class ContentNavigationTC(CubicWebTC): def test_component_context(self): view = mock_object(is_primary=lambda x: True) diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_views_pyviews.py --- a/web/test/unittest_views_pyviews.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_views_pyviews.py Tue Aug 11 17:15:54 2009 +0200 @@ -1,7 +1,7 @@ from logilab.common.testlib import unittest_main -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC -class PyViewsTC(EnvBasedTC): +class PyViewsTC(CubicWebTC): def test_pyvaltable(self): content = self.vreg['views'].render('pyvaltable', self.request(), diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_views_searchrestriction.py --- a/web/test/unittest_views_searchrestriction.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_views_searchrestriction.py Tue Aug 11 17:15:54 2009 +0200 @@ -5,11 +5,11 @@ :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses """ -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb.web.facet import insert_attr_select_relation, prepare_facets_rqlst -class InsertAttrRelationTC(EnvBasedTC): +class InsertAttrRelationTC(CubicWebTC): def parse(self, query): rqlst = self.vreg.parse(self.session, query) diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_viewselector.py --- a/web/test/unittest_viewselector.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_viewselector.py Tue Aug 11 17:15:54 2009 +0200 @@ -4,7 +4,7 @@ """ from logilab.common.testlib import unittest_main -from cubicweb.devtools.apptest import EnvBasedTC +from cubicweb.devtools.testlib import CubicWebTC from cubicweb import CW_SOFTWARE_ROOT as BASE, Binary from cubicweb.selectors import (match_user_groups, implements, specified_etype_implements, rql_condition, @@ -26,7 +26,7 @@ ('siteinfo', actions.SiteInfoAction), ] -class ViewSelectorTC(EnvBasedTC): +class ViewSelectorTC(CubicWebTC): def setup_database(self): self.add_entity('BlogEntry', title=u"une news !", content=u"cubicweb c'est beau") @@ -78,12 +78,12 @@ ('systempropertiesform', cwproperties.SystemCWPropertiesForm)]) def test_possible_views_noresult(self): - rset, req = self.env.get_rset_and_req('Any X WHERE X eid 999999') + rset, req = self.rset_and_req('Any X WHERE X eid 999999') self.assertListEqual(self.pviews(req, rset), []) def test_possible_views_one_egroup(self): - rset, req = self.env.get_rset_and_req('CWGroup X WHERE X name "managers"') + rset, req = self.rset_and_req('CWGroup X WHERE X name "managers"') self.assertListEqual(self.pviews(req, rset), [('adaptedlist', baseviews.AdaptedListView), ('csvexport', csvexport.CSVRsetView), @@ -106,7 +106,7 @@ ]) def test_possible_views_multiple_egroups(self): - rset, req = self.env.get_rset_and_req('CWGroup X') + rset, req = self.rset_and_req('CWGroup X') self.assertListEqual(self.pviews(req, rset), [('adaptedlist', baseviews.AdaptedListView), ('csvexport', csvexport.CSVRsetView), @@ -130,16 +130,16 @@ def test_propertiesform_admin(self): assert self.vreg['views']['propertiesform'] - rset1, req1 = self.env.get_rset_and_req('CWUser X WHERE X login "admin"') - rset2, req2 = self.env.get_rset_and_req('CWUser X WHERE X login "anon"') + rset1, req1 = self.rset_and_req('CWUser X WHERE X login "admin"') + rset2, req2 = self.rset_and_req('CWUser X WHERE X login "anon"') self.failUnless(self.vreg['views'].select('propertiesform', req1, rset=None)) self.failUnless(self.vreg['views'].select('propertiesform', req1, rset=rset1)) self.failUnless(self.vreg['views'].select('propertiesform', req2, rset=rset2)) def test_propertiesform_anon(self): self.login('anon') - rset1, req1 = self.env.get_rset_and_req('CWUser X WHERE X login "admin"') - rset2, req2 = self.env.get_rset_and_req('CWUser X WHERE X login "anon"') + rset1, req1 = self.rset_and_req('CWUser X WHERE X login "admin"') + rset2, req2 = self.rset_and_req('CWUser X WHERE X login "anon"') self.assertRaises(NoSelectableObject, self.vreg['views'].select, 'propertiesform', req1, rset=None) self.assertRaises(NoSelectableObject, self.vreg['views'].select, 'propertiesform', req1, rset=rset1) self.assertRaises(NoSelectableObject, self.vreg['views'].select, 'propertiesform', req1, rset=rset2) @@ -147,14 +147,14 @@ def test_propertiesform_jdoe(self): self.create_user('jdoe') self.login('jdoe') - rset1, req1 = self.env.get_rset_and_req('CWUser X WHERE X login "admin"') - rset2, req2 = self.env.get_rset_and_req('CWUser X WHERE X login "jdoe"') + rset1, req1 = self.rset_and_req('CWUser X WHERE X login "admin"') + rset2, req2 = self.rset_and_req('CWUser X WHERE X login "jdoe"') self.failUnless(self.vreg['views'].select('propertiesform', req1, rset=None)) self.assertRaises(NoSelectableObject, self.vreg['views'].select, 'propertiesform', req1, rset=rset1) self.failUnless(self.vreg['views'].select('propertiesform', req2, rset=rset2)) def test_possible_views_multiple_different_types(self): - rset, req = self.env.get_rset_and_req('Any X') + rset, req = self.rset_and_req('Any X') self.assertListEqual(self.pviews(req, rset), [('csvexport', csvexport.CSVRsetView), ('ecsvexport', csvexport.CSVEntityView), @@ -176,7 +176,7 @@ ]) def test_possible_views_any_rset(self): - rset, req = self.env.get_rset_and_req('Any N, X WHERE X in_group Y, Y name N') + rset, req = self.rset_and_req('Any N, X WHERE X in_group Y, Y name N') self.assertListEqual(self.pviews(req, rset), [('csvexport', csvexport.CSVRsetView), ('editable-table', tableview.EditableTableView), @@ -185,7 +185,7 @@ ]) def test_possible_views_multiple_eusers(self): - rset, req = self.env.get_rset_and_req('CWUser X') + rset, req = self.rset_and_req('CWUser X') self.assertListEqual(self.pviews(req, rset), [('adaptedlist', baseviews.AdaptedListView), ('csvexport', csvexport.CSVRsetView), @@ -218,14 +218,14 @@ }) def test_possible_actions_no_entity(self): - rset, req = self.env.get_rset_and_req('Any X WHERE X eid 999999') + rset, req = self.rset_and_req('Any X WHERE X eid 999999') self.assertDictEqual(self.pactions(req, rset), {'useractions': USERACTIONS, 'siteactions': SITEACTIONS, }) def test_possible_actions_same_type_entities(self): - rset, req = self.env.get_rset_and_req('CWGroup X') + rset, req = self.rset_and_req('CWGroup X') self.assertDictEqual(self.pactions(req, rset), {'useractions': USERACTIONS, 'siteactions': SITEACTIONS, @@ -235,7 +235,7 @@ }) def test_possible_actions_different_types_entities(self): - rset, req = self.env.get_rset_and_req('Any X') + rset, req = self.rset_and_req('Any X') self.assertDictEqual(self.pactions(req, rset), {'useractions': USERACTIONS, 'siteactions': SITEACTIONS, @@ -243,13 +243,13 @@ }) def test_possible_actions_final_entities(self): - rset, req = self.env.get_rset_and_req('Any N, X WHERE X in_group Y, Y name N') + rset, req = self.rset_and_req('Any N, X WHERE X in_group Y, Y name N') self.assertDictEqual(self.pactions(req, rset), {'useractions': USERACTIONS, 'siteactions': SITEACTIONS}) def test_possible_actions_eetype_cwuser_entity(self): - rset, req = self.env.get_rset_and_req('CWEType X WHERE X name "CWUser"') + rset, req = self.rset_and_req('CWEType X WHERE X name "CWUser"') self.assertDictEqual(self.pactions(req, rset), {'useractions': USERACTIONS, 'siteactions': SITEACTIONS, @@ -291,7 +291,7 @@ self.vreg['views'].select, 'table', req, rset=rset) # no entity - rset, req = self.env.get_rset_and_req('Any X WHERE X eid 999999') + rset, req = self.rset_and_req('Any X WHERE X eid 999999') self.failUnlessRaises(NoSelectableObject, self.vreg['views'].select, 'index', req, rset=rset) self.failUnlessRaises(NoSelectableObject, @@ -301,7 +301,7 @@ self.failUnlessRaises(NoSelectableObject, self.vreg['views'].select, 'table', req, rset=rset) # one entity - rset, req = self.env.get_rset_and_req('CWGroup X WHERE X name "managers"') + rset, req = self.rset_and_req('CWGroup X WHERE X name "managers"') self.assertIsInstance(self.vreg['views'].select('primary', req, rset=rset), primary.PrimaryView) self.assertIsInstance(self.vreg['views'].select('list', req, rset=rset), @@ -315,7 +315,7 @@ self.failUnlessRaises(NoSelectableObject, self.vreg['views'].select, 'index', req, rset=rset) # list of entities of the same type - rset, req = self.env.get_rset_and_req('CWGroup X') + rset, req = self.rset_and_req('CWGroup X') self.assertIsInstance(self.vreg['views'].select('primary', req, rset=rset), primary.PrimaryView) self.assertIsInstance(self.vreg['views'].select('list', req, rset=rset), @@ -325,7 +325,7 @@ self.failUnlessRaises(NoSelectableObject, self.vreg['views'].select, 'creation', req, rset=rset) # list of entities of different types - rset, req = self.env.get_rset_and_req('Any X') + rset, req = self.rset_and_req('Any X') self.assertIsInstance(self.vreg['views'].select('primary', req, rset=rset), primary.PrimaryView) self.assertIsInstance(self.vreg['views'].select('list', req, rset=rset), @@ -337,7 +337,7 @@ self.failUnlessRaises(NoSelectableObject, self.vreg['views'].select, 'index', req, rset=rset) # whatever - rset, req = self.env.get_rset_and_req('Any N, X WHERE X in_group Y, Y name N') + rset, req = self.rset_and_req('Any N, X WHERE X in_group Y, Y name N') self.assertIsInstance(self.vreg['views'].select('table', req, rset=rset), tableview.TableView) self.failUnlessRaises(NoSelectableObject, @@ -351,7 +351,7 @@ self.failUnlessRaises(NoSelectableObject, self.vreg['views'].select, 'edition', req, rset=rset) # mixed query - rset, req = self.env.get_rset_and_req('Any U,G WHERE U is CWUser, G is CWGroup') + rset, req = self.rset_and_req('Any U,G WHERE U is CWUser, G is CWGroup') self.failUnlessRaises(NoSelectableObject, self.vreg['views'].select, 'edition', req, rset=rset) self.failUnlessRaises(NoSelectableObject, @@ -362,7 +362,7 @@ def test_interface_selector(self): image = self.add_entity('Image', name=u'bim.png', data=Binary('bim')) # image primary view priority - rset, req = self.env.get_rset_and_req('Image X WHERE X name "bim.png"') + rset, req = self.rset_and_req('Image X WHERE X name "bim.png"') self.assertIsInstance(self.vreg['views'].select('primary', req, rset=rset), idownloadable.IDownloadablePrimaryView) @@ -370,12 +370,12 @@ def test_score_entity_selector(self): image = self.add_entity('Image', name=u'bim.png', data=Binary('bim')) # image primary view priority - rset, req = self.env.get_rset_and_req('Image X WHERE X name "bim.png"') + rset, req = self.rset_and_req('Image X WHERE X name "bim.png"') self.assertIsInstance(self.vreg['views'].select('image', req, rset=rset), idownloadable.ImageView) fileobj = self.add_entity('File', name=u'bim.txt', data=Binary('bim')) # image primary view priority - rset, req = self.env.get_rset_and_req('File X WHERE X name "bim.txt"') + rset, req = self.rset_and_req('File X WHERE X name "bim.txt"') self.assertRaises(NoSelectableObject, self.vreg['views'].select, 'image', req, rset=rset) @@ -385,7 +385,7 @@ rset = None req = self.request() else: - rset, req = self.env.get_rset_and_req(rql) + rset, req = self.rset_and_req(rql) try: self.vreg['views'].render(vid, req, rset=rset, **args) except: @@ -438,7 +438,7 @@ del self.vreg['actions']['testaction'] def test(self): - rset, req = self.env.get_rset_and_req('CWEType X WHERE X name "CWEType"') + rset, req = self.rset_and_req('CWEType X WHERE X name "CWEType"') self.assertDictEqual(self.pactions(req, rset), {'useractions': USERACTIONS, 'siteactions': SITEACTIONS, @@ -449,7 +449,7 @@ ('testaction', CWETypeRQLAction), ], }) - rset, req = self.env.get_rset_and_req('CWEType X WHERE X name "CWRType"') + rset, req = self.rset_and_req('CWEType X WHERE X name "CWRType"') self.assertDictEqual(self.pactions(req, rset), {'useractions': USERACTIONS, 'siteactions': SITEACTIONS, diff -r 8074dd88e21b -r a9a2dca5db20 web/test/unittest_webconfig.py --- a/web/test/unittest_webconfig.py Tue Aug 11 17:06:02 2009 +0200 +++ b/web/test/unittest_webconfig.py Tue Aug 11 17:15:54 2009 +0200 @@ -9,8 +9,7 @@ from logilab.common.testlib import TestCase, unittest_main -from cubicweb.devtools._apptest import FakeRequest -from cubicweb.devtools import ApptestConfiguration +from cubicweb.devtools import ApptestConfiguration, fake class WebconfigTC(TestCase): def setUp(self): @@ -21,7 +20,7 @@ def test_nonregr_print_css_as_list(self): """make sure PRINT_CSS *must* is a list""" config = self.config - req = FakeRequest() + req = fake.FakeRequest() print_css = req.external_resource('STYLESHEETS_PRINT') self.failUnless(isinstance(print_css, list)) ie_css = req.external_resource('IE_STYLESHEETS')