--- a/appobject.py Tue Aug 11 17:19:05 2009 +0200
+++ b/appobject.py Tue Aug 11 17:28:18 2009 +0200
@@ -377,8 +377,8 @@
# try to get page boundaries from the navigation component
# XXX we should probably not have a ref to this component here (eg in
# cubicweb.common)
- nav = self.vreg['components'].select_object('navigation', self.req,
- rset=self.rset)
+ nav = self.vreg['components'].select_or_none('navigation', self.req,
+ rset=self.rset)
if nav:
start, stop = nav.page_boundaries()
rql = self._limit_offset_rql(stop - start, start)
--- a/common/mixins.py Tue Aug 11 17:19:05 2009 +0200
+++ b/common/mixins.py Tue Aug 11 17:28:18 2009 +0200
@@ -168,7 +168,7 @@
@property
def state(self):
try:
- return self.in_state[0].name
+ return self.related('in_state', entities=True)[0].get_value('name')
except IndexError:
self.warning('entity %s has no state', self)
return None
@@ -259,10 +259,10 @@
__implements__ = (IEmailable,)
def get_email(self):
- if getattr(self, 'primary_email', None):
- return self.primary_email[0].address
- if getattr(self, 'use_email', None):
- return self.use_email[0].address
+ emails = self.related('primary_email', 'subject', entities=True) or \
+ self.related('use_email', 'subject', entities=True)
+ if emails:
+ return emails[0].get_value('address')
return None
@classmethod
@@ -283,7 +283,8 @@
NOTE: the dictionary keys should match the list returned by the
`allowed_massmail_keys` method.
"""
- return dict( (attr, getattr(self, attr)) for attr in self.allowed_massmail_keys() )
+ return dict( (attr, self.cwgetattr(attr))
+ for attr in self.allowed_massmail_keys() )
--- a/common/test/unittest_mail.py Tue Aug 11 17:19:05 2009 +0200
+++ b/common/test/unittest_mail.py Tue Aug 11 17:28:18 2009 +0200
@@ -13,7 +13,7 @@
from logilab.common.testlib import unittest_main
from logilab.common.umessage import message_from_string
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.common.mail import format_mail
@@ -25,7 +25,7 @@
return pwd.getpwuid(os.getuid())[0]
-class EmailTC(EnvBasedTC):
+class EmailTC(CubicWebTC):
def test_format_mail(self):
self.set_option('sender-addr', 'bim@boum.fr')
--- a/common/test/unittest_migration.py Tue Aug 11 17:19:05 2009 +0200
+++ b/common/test/unittest_migration.py Tue Aug 11 17:28:18 2009 +0200
@@ -10,8 +10,6 @@
from logilab.common.testlib import TestCase, unittest_main
from cubicweb.devtools import TestServerConfiguration
-from cubicweb.devtools.apptest import TestEnvironment
-
from cubicweb.cwconfig import CubicWebConfiguration
from cubicweb.common.migration import MigrationHelper, filter_scripts
from cubicweb.server.migractions import ServerMigrationHelper
@@ -98,8 +96,8 @@
config = ApptestConfiguration('data')
source = config.sources()['system']
self.assertEquals(source['db-driver'], 'sqlite')
- cleanup_sqlite(source['db-name'], removecube=True)
- init_test_database(driver=source['db-driver'], config=config)
+ cleanup_sqlite(source['db-name'], removetemplate=True)
+ init_test_database(config=config)
if __name__ == '__main__':
--- a/common/test/unittest_mixins.py Tue Aug 11 17:19:05 2009 +0200
+++ b/common/test/unittest_mixins.py Tue Aug 11 17:28:18 2009 +0200
@@ -6,9 +6,9 @@
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
from logilab.common.testlib import unittest_main
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
-class WorkfloableMixInTC(EnvBasedTC):
+class WorkfloableMixInTC(CubicWebTC):
def test_wf_state(self):
s = self.add_entity('State', name=u'activated')
self.execute('SET X state_of ET WHERE ET name "Bookmark", X eid %(x)s',
--- a/cwvreg.py Tue Aug 11 17:19:05 2009 +0200
+++ b/cwvreg.py Tue Aug 11 17:28:18 2009 +0200
@@ -69,7 +69,7 @@
return obj.render(**kwargs)
def select_vobject(self, oid, *args, **kwargs):
- selected = self.select_object(oid, *args, **kwargs)
+ selected = self.select_or_none(oid, *args, **kwargs)
if selected and selected.propval('visible'):
return selected
return None
@@ -172,7 +172,7 @@
if vid[0] == '_':
continue
try:
- view = self.select_best(views, req, rset=rset, **kwargs)
+ view = self._select_best(views, req, rset=rset, **kwargs)
if view.linkable():
yield view
except NoSelectableObject:
@@ -351,6 +351,8 @@
implemented_interfaces = set()
if 'Any' in self.get('etypes', ()):
for etype in self.schema.entities():
+ if etype.is_final():
+ continue
cls = self['etypes'].etype_class(etype)
for iface in cls.__implements__:
implemented_interfaces.update(iface.__mro__)
@@ -403,17 +405,17 @@
def possible_actions(self, req, rset=None, **kwargs):
return self["actions"].possible_actions(req, rest=rset, **kwargs)
- @deprecated("use vreg['boxes'].select_object(...)")
+ @deprecated("use vreg['boxes'].select_or_none(...)")
def select_box(self, oid, *args, **kwargs):
- return self['boxes'].select_object(oid, *args, **kwargs)
+ return self['boxes'].select_or_none(oid, *args, **kwargs)
- @deprecated("use vreg['components'].select_object(...)")
+ @deprecated("use vreg['components'].select_or_none(...)")
def select_component(self, cid, *args, **kwargs):
- return self['components'].select_object(cid, *args, **kwargs)
+ return self['components'].select_or_none(cid, *args, **kwargs)
- @deprecated("use vreg['actions'].select_object(...)")
+ @deprecated("use vreg['actions'].select_or_none(...)")
def select_action(self, oid, *args, **kwargs):
- return self['actions'].select_object(oid, *args, **kwargs)
+ return self['actions'].select_or_none(oid, *args, **kwargs)
@deprecated("use vreg['views'].select(...)")
def select_view(self, __vid, req, rset=None, **kwargs):
--- a/dbapi.py Tue Aug 11 17:19:05 2009 +0200
+++ b/dbapi.py Tue Aug 11 17:28:18 2009 +0200
@@ -42,9 +42,9 @@
registries.
"""
defaultcls = cwvreg.VRegistry.REGISTRY_FACTORY[None]
- orig_select_best = defaultcls.orig_select_best = defaultcls.select_best
+ orig_select_best = defaultcls.orig_select_best = defaultcls._select_best
@monkeypatch(defaultcls)
- def select_best(self, appobjects, *args, **kwargs):
+ def _select_best(self, appobjects, *args, **kwargs):
"""return an instance of the most specific object according
to parameters
--- a/devtools/__init__.py Tue Aug 11 17:19:05 2009 +0200
+++ b/devtools/__init__.py Tue Aug 11 17:28:18 2009 +0200
@@ -13,20 +13,54 @@
from os.path import (abspath, join, exists, basename, dirname, normpath, split,
isfile, isabs)
-from cubicweb import CW_SOFTWARE_ROOT, ConfigurationError
+from cubicweb import CW_SOFTWARE_ROOT, ConfigurationError, schema, cwconfig
from cubicweb.utils import strptime
-from cubicweb.toolsutils import read_config
-from cubicweb.cwconfig import CubicWebConfiguration, merge_options
from cubicweb.server.serverconfig import ServerConfiguration
from cubicweb.etwist.twconfig import TwistedConfiguration
+cwconfig.CubicWebConfiguration.cls_adjust_sys_path()
+
+# db auto-population configuration #############################################
+
+SYSTEM_ENTITIES = schema.SCHEMA_TYPES | set((
+ 'CWGroup', 'CWUser', 'CWProperty',
+ 'State', 'Transition', 'TrInfo',
+ ))
+
+SYSTEM_RELATIONS = schema.META_RTYPES | set((
+ # workflow related
+ 'state_of', 'transition_of', 'initial_state', 'allowed_transition',
+ 'destination_state', 'in_state', 'wf_info_for', 'from_state', 'to_state',
+ 'condition',
+ # cwproperty
+ 'for_user',
+ # schema definition
+ 'specializes',
+ 'relation_type', 'from_entity', 'to_entity',
+ 'constrained_by', 'cstrtype', 'widget',
+ 'read_permission', 'update_permission', 'delete_permission', 'add_permission',
+ # permission
+ 'in_group', 'require_group', 'require_permission',
+ # deducted from other relations
+ 'primary_email',
+ ))
+
+# content validation configuration #############################################
+
# validators are used to validate (XML, DTD, whatever) view's content
# validators availables are :
# 'dtd' : validates XML + declared DTD
# 'xml' : guarantees XML is well formed
# None : do not try to validate anything
+
+# {'vid': validator}
VIEW_VALIDATORS = {}
+
+
+# cubicweb test configuration ##################################################
+
BASE_URL = 'http://testing.fr/cubicweb/'
+
DEFAULT_SOURCES = {'system': {'adapter' : 'native',
'db-encoding' : 'UTF-8', #'ISO-8859-1',
'db-user' : u'admin',
@@ -40,13 +74,14 @@
},
}
+
class TestServerConfiguration(ServerConfiguration):
mode = 'test'
set_language = False
read_instance_schema = False
bootstrap_schema = False
init_repository = True
- options = merge_options(ServerConfiguration.options + (
+ options = cwconfig.merge_options(ServerConfiguration.options + (
('anonymous-user',
{'type' : 'string',
'default': None,
@@ -66,7 +101,6 @@
def __init__(self, appid, log_threshold=logging.CRITICAL+10):
ServerConfiguration.__init__(self, appid)
- self.global_set_option('log-file', None)
self.init_log(log_threshold, force=True)
# need this, usually triggered by cubicweb-ctl
self.load_cwctl_plugins()
@@ -118,30 +152,11 @@
sources = DEFAULT_SOURCES
return sources
- def load_defaults(self):
- super(TestServerConfiguration, self).load_defaults()
- # note: don't call global set option here, OptionManager may not yet be initialized
- # add anonymous user
- self.set_option('anonymous-user', 'anon')
- self.set_option('anonymous-password', 'anon')
- # uncomment the line below if you want rql queries to be logged
- #self.set_option('query-log-file', '/tmp/test_rql_log.' + `os.getpid()`)
- self.set_option('sender-name', 'cubicweb-test')
- self.set_option('sender-addr', 'cubicweb-test@logilab.fr')
- try:
- send_to = '%s@logilab.fr' % os.getlogin()
- except OSError:
- send_to = '%s@logilab.fr' % (os.environ.get('USER')
- or os.environ.get('USERNAME')
- or os.environ.get('LOGNAME'))
- self.set_option('sender-addr', send_to)
- self.set_option('default-dest-addrs', send_to)
- self.set_option('base-url', BASE_URL)
-
class BaseApptestConfiguration(TestServerConfiguration, TwistedConfiguration):
repo_method = 'inmemory'
- options = merge_options(TestServerConfiguration.options + TwistedConfiguration.options)
+ options = cwconfig.merge_options(TestServerConfiguration.options
+ + TwistedConfiguration.options)
cubicweb_appobject_path = TestServerConfiguration.cubicweb_appobject_path | TwistedConfiguration.cubicweb_appobject_path
cube_appobject_path = TestServerConfiguration.cube_appobject_path | TwistedConfiguration.cube_appobject_path
@@ -163,98 +178,91 @@
BaseApptestConfiguration.__init__(self, appid, log_threshold=log_threshold)
self.init_repository = sourcefile is None
self.sourcefile = sourcefile
- import re
- self.global_set_option('embed-allowed', re.compile('.*'))
+ self.global_set_option('anonymous-user', 'anon')
+ self.global_set_option('anonymous-password', 'anon')
+
+ def load_configuration(self):
+ super(ApptestConfiguration, self).load_configuration()
+ self.global_set_option('anonymous-user', 'anon')
+ self.global_set_option('anonymous-password', 'anon')
-class RealDatabaseConfiguration(ApptestConfiguration):
- init_repository = False
- sourcesdef = {'system': {'adapter' : 'native',
- 'db-encoding' : 'UTF-8', #'ISO-8859-1',
- 'db-user' : u'admin',
- 'db-password' : 'gingkow',
- 'db-name' : 'seotest',
- 'db-driver' : 'postgres',
- 'db-host' : None,
- },
- 'admin' : {'login': u'admin',
- 'password': u'gingkow',
- },
- }
+# test database handling #######################################################
- def __init__(self, appid, log_threshold=logging.CRITICAL, sourcefile=None):
- ApptestConfiguration.__init__(self, appid)
- self.init_repository = False
+def init_test_database(config=None, configdir='data'):
+ """init a test database for a specific driver"""
+ from cubicweb.dbapi import in_memory_cnx
+ config = config or TestServerConfiguration(configdir)
+ sources = config.sources()
+ driver = sources['system']['db-driver']
+ if driver == 'sqlite':
+ init_test_database_sqlite(config)
+ elif driver == 'postgres':
+ init_test_database_postgres(config)
+ else:
+ raise ValueError('no initialization function for driver %r' % driver)
+ config._cubes = None # avoid assertion error
+ repo, cnx = in_memory_cnx(config, unicode(sources['admin']['login']),
+ sources['admin']['password'] or 'xxx')
+ if driver == 'sqlite':
+ install_sqlite_patch(repo.querier)
+ return repo, cnx
- def sources(self):
- """
- By default, we run tests with the sqlite DB backend.
- One may use its own configuration by just creating a
- 'sources' file in the test directory from wich tests are
- launched.
- """
- self._sources = self.sourcesdef
- return self._sources
+def reset_test_database(config):
+ """init a test database for a specific driver"""
+ driver = config.sources()['system']['db-driver']
+ if driver == 'sqlite':
+ reset_test_database_sqlite(config)
+ else:
+ raise ValueError('no reset function for driver %r' % driver)
-def buildconfig(dbuser, dbpassword, dbname, adminuser, adminpassword, dbhost=None):
- """convenience function that builds a real-db configuration class"""
- sourcesdef = {'system': {'adapter' : 'native',
- 'db-encoding' : 'UTF-8', #'ISO-8859-1',
- 'db-user' : dbuser,
- 'db-password' : dbpassword,
- 'db-name' : dbname,
- 'db-driver' : 'postgres',
- 'db-host' : dbhost,
- },
- 'admin' : {'login': adminuser,
- 'password': adminpassword,
- },
- }
- return type('MyRealDBConfig', (RealDatabaseConfiguration,),
- {'sourcesdef': sourcesdef})
+### postgres test database handling ############################################
-def loadconfig(filename):
- """convenience function that builds a real-db configuration class
- from a file
- """
- return type('MyRealDBConfig', (RealDatabaseConfiguration,),
- {'sourcesdef': read_config(filename)})
+def init_test_database_postgres(config):
+ """initialize a fresh sqlite databse used for testing purpose"""
+ if config.init_repository:
+ from cubicweb.server import init_repository
+ init_repository(config, interactive=False, drop=True)
-class LivetestConfiguration(BaseApptestConfiguration):
- init_repository = False
+### sqlite test database handling ##############################################
+
+def cleanup_sqlite(dbfile, removetemplate=False):
+ try:
+ os.remove(dbfile)
+ os.remove('%s-journal' % dbfile)
+ except OSError:
+ pass
+ if removetemplate:
+ try:
+ os.remove('%s-template' % dbfile)
+ except OSError:
+ pass
- def __init__(self, cube=None, sourcefile=None, pyro_name=None,
- log_threshold=logging.CRITICAL):
- TestServerConfiguration.__init__(self, cube, log_threshold=log_threshold)
- self.appid = pyro_name or cube
- # don't change this, else some symlink problems may arise in some
- # environment (e.g. mine (syt) ;o)
- # XXX I'm afraid this test will prevent to run test from a production
- # environment
- self._sources = None
- # instance cube test
- if cube is not None:
- self.apphome = self.cube_dir(cube)
- elif 'web' in os.getcwd().split(os.sep):
- # web test
- self.apphome = join(normpath(join(dirname(__file__), '..')), 'web')
- else:
- # cube test
- self.apphome = abspath('..')
- self.sourcefile = sourcefile
- self.global_set_option('realm', '')
- self.use_pyro = pyro_name is not None
+def reset_test_database_sqlite(config):
+ import shutil
+ dbfile = config.sources()['system']['db-name']
+ cleanup_sqlite(dbfile)
+ template = '%s-template' % dbfile
+ if exists(template):
+ shutil.copy(template, dbfile)
+ return True
+ return False
- def pyro_enabled(self):
- if self.use_pyro:
- return True
- else:
- return False
+def init_test_database_sqlite(config):
+ """initialize a fresh sqlite databse used for testing purpose"""
+ # remove database file if it exists
+ dbfile = config.sources()['system']['db-name']
+ if not reset_test_database_sqlite(config):
+ # initialize the database
+ import shutil
+ from cubicweb.server import init_repository
+ init_repository(config, interactive=False)
+ dbfile = config.sources()['system']['db-name']
+ shutil.copy(dbfile, '%s-template' % dbfile)
-CubicWebConfiguration.cls_adjust_sys_path()
def install_sqlite_patch(querier):
"""This patch hotfixes the following sqlite bug :
@@ -293,57 +301,3 @@
return new_execute
querier.__class__.execute = wrap_execute(querier.__class__.execute)
querier.__class__._devtools_sqlite_patched = True
-
-def init_test_database(driver='sqlite', configdir='data', config=None,
- vreg=None):
- """init a test database for a specific driver"""
- from cubicweb.dbapi import in_memory_cnx
- if vreg and not config:
- config = vreg.config
- config = config or TestServerConfiguration(configdir)
- source = config.sources()
- if driver == 'sqlite':
- init_test_database_sqlite(config, source)
- elif driver == 'postgres':
- init_test_database_postgres(config, source)
- else:
- raise ValueError('no initialization function for driver %r' % driver)
- config._cubes = None # avoid assertion error
- repo, cnx = in_memory_cnx(vreg or config, unicode(source['admin']['login']),
- source['admin']['password'] or 'xxx')
- if driver == 'sqlite':
- install_sqlite_patch(repo.querier)
- return repo, cnx
-
-def init_test_database_postgres(config, source, vreg=None):
- """initialize a fresh sqlite databse used for testing purpose"""
- if config.init_repository:
- from cubicweb.server import init_repository
- init_repository(config, interactive=False, drop=True, vreg=vreg)
-
-def cleanup_sqlite(dbfile, removecube=False):
- try:
- os.remove(dbfile)
- os.remove('%s-journal' % dbfile)
- except OSError:
- pass
- if removecube:
- try:
- os.remove('%s-template' % dbfile)
- except OSError:
- pass
-
-def init_test_database_sqlite(config, source, vreg=None):
- """initialize a fresh sqlite databse used for testing purpose"""
- import shutil
- # remove database file if it exists (actually I know driver == 'sqlite' :)
- dbfile = source['system']['db-name']
- cleanup_sqlite(dbfile)
- template = '%s-template' % dbfile
- if exists(template):
- shutil.copy(template, dbfile)
- else:
- # initialize the database
- from cubicweb.server import init_repository
- init_repository(config, interactive=False, vreg=vreg)
- shutil.copy(dbfile, template)
--- a/devtools/_apptest.py Tue Aug 11 17:19:05 2009 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,208 +0,0 @@
-"""Hidden internals for the devtools.apptest module
-
-:organization: Logilab
-:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
-:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
-:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
-"""
-__docformat__ = "restructuredtext en"
-
-import sys, traceback
-
-from logilab.common.pytest import pause_tracing, resume_tracing
-
-import yams.schema
-
-from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
-from cubicweb.cwvreg import CubicWebVRegistry
-
-from cubicweb.web.application import CubicWebPublisher
-from cubicweb.web import Redirect
-
-from cubicweb.devtools import ApptestConfiguration, init_test_database
-from cubicweb.devtools.fake import FakeRequest
-
-SYSTEM_ENTITIES = ('CWGroup', 'CWUser',
- 'CWAttribute', 'CWRelation',
- 'CWConstraint', 'CWConstraintType', 'CWProperty',
- 'CWEType', 'CWRType',
- 'State', 'Transition', 'TrInfo',
- 'RQLExpression',
- )
-SYSTEM_RELATIONS = (
- # virtual relation
- 'identity',
- # metadata
- 'is', 'is_instance_of', 'owned_by', 'created_by', 'specializes',
- # workflow related
- 'state_of', 'transition_of', 'initial_state', 'allowed_transition',
- 'destination_state', 'in_state', 'wf_info_for', 'from_state', 'to_state',
- 'condition',
- # permission
- 'in_group', 'require_group', 'require_permission',
- 'read_permission', 'update_permission', 'delete_permission', 'add_permission',
- # eproperty
- 'for_user',
- # schema definition
- 'relation_type', 'from_entity', 'to_entity',
- 'constrained_by', 'cstrtype', 'widget',
- # deducted from other relations
- 'primary_email',
- )
-
-def unprotected_entities(app_schema, strict=False):
- """returned a Set of each non final entity type, excluding CWGroup, and CWUser...
- """
- if strict:
- protected_entities = yams.schema.BASE_TYPES
- else:
- protected_entities = yams.schema.BASE_TYPES.union(set(SYSTEM_ENTITIES))
- entities = set(app_schema.entities())
- return entities - protected_entities
-
-
-def ignore_relations(*relations):
- global SYSTEM_RELATIONS
- SYSTEM_RELATIONS += relations
-
-
-class TestEnvironment(object):
- """TestEnvironment defines a context (e.g. a config + a given connection) in
- which the tests are executed
- """
-
- def __init__(self, appid, reporter=None, verbose=False,
- configcls=ApptestConfiguration, requestcls=FakeRequest):
- config = configcls(appid)
- self.requestcls = requestcls
- self.cnx = None
- config.db_perms = False
- source = config.sources()['system']
- if verbose:
- print "init test database ..."
- self.vreg = vreg = CubicWebVRegistry(config)
- self.admlogin = source['db-user']
- # restore database <=> init database
- self.restore_database()
- if verbose:
- print "init done"
- config.repository = lambda x=None: self.repo
- self.app = CubicWebPublisher(config, vreg=vreg)
- self.verbose = verbose
- schema = self.vreg.schema
- # else we may run into problems since email address are ususally share in app tests
- # XXX should not be necessary anymore
- schema.rschema('primary_email').set_rproperty('CWUser', 'EmailAddress', 'composite', False)
- self.deletable_entities = unprotected_entities(schema)
-
- def restore_database(self):
- """called by unittests' tearDown to restore the original database
- """
- try:
- pause_tracing()
- if self.cnx:
- self.cnx.close()
- source = self.vreg.config.sources()['system']
- self.repo, self.cnx = init_test_database(driver=source['db-driver'],
- vreg=self.vreg)
- self._orig_cnx = self.cnx
- resume_tracing()
- except:
- resume_tracing()
- traceback.print_exc()
- sys.exit(1)
- # XXX cnx decoration is usually done by the repository authentication manager,
- # necessary in authentication tests
- self.cnx.vreg = self.vreg
- self.cnx.login = source['db-user']
- self.cnx.password = source['db-password']
-
-
- def create_user(self, login, groups=('users',), req=None):
- req = req or self.create_request()
- cursor = self._orig_cnx.cursor(req)
- rset = cursor.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s,'
- 'X in_state S WHERE S name "activated"',
- {'login': unicode(login), 'passwd': login.encode('utf8')})
- user = rset.get_entity(0, 0)
- cursor.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
- % ','.join(repr(g) for g in groups),
- {'x': user.eid}, 'x')
- user.clear_related_cache('in_group', 'subject')
- self._orig_cnx.commit()
- return user
-
- def login(self, login, password=None):
- if login == self.admlogin:
- self.restore_connection()
- else:
- self.cnx = repo_connect(self.repo, unicode(login),
- password or str(login),
- ConnectionProperties('inmemory'))
- if login == self.vreg.config.anonymous_user()[0]:
- self.cnx.anonymous_connection = True
- return self.cnx
-
- def restore_connection(self):
- if not self.cnx is self._orig_cnx:
- try:
- self.cnx.close()
- except ProgrammingError:
- pass # already closed
- self.cnx = self._orig_cnx
-
- ############################################################################
-
- def execute(self, rql, args=None, eidkey=None, req=None):
- """executes <rql>, builds a resultset, and returns a couple (rset, req)
- where req is a FakeRequest
- """
- req = req or self.create_request(rql=rql)
- return self.cnx.cursor(req).execute(unicode(rql), args, eidkey)
-
- def create_request(self, rql=None, **kwargs):
- """executes <rql>, builds a resultset, and returns a
- couple (rset, req) where req is a FakeRequest
- """
- if rql:
- kwargs['rql'] = rql
- req = self.requestcls(self.vreg, form=kwargs)
- req.set_connection(self.cnx)
- return req
-
- def get_rset_and_req(self, rql, optional_args=None, args=None, eidkey=None):
- """executes <rql>, builds a resultset, and returns a
- couple (rset, req) where req is a FakeRequest
- """
- return (self.execute(rql, args, eidkey),
- self.create_request(rql=rql, **optional_args or {}))
-
-
-class ExistingTestEnvironment(TestEnvironment):
-
- def __init__(self, appid, sourcefile, verbose=False):
- config = ApptestConfiguration(appid, sourcefile=sourcefile)
- if verbose:
- print "init test database ..."
- source = config.sources()['system']
- self.vreg = CubicWebVRegistry(config)
- self.cnx = init_test_database(driver=source['db-driver'],
- vreg=self.vreg)[1]
- if verbose:
- print "init done"
- self.app = CubicWebPublisher(config, vreg=self.vreg)
- self.verbose = verbose
- # this is done when the publisher is opening a connection
- self.cnx.vreg = self.vreg
-
- def setup(self, config=None):
- """config is passed by TestSuite but is ignored in this environment"""
- cursor = self.cnx.cursor()
- self.last_eid = cursor.execute('Any X WHERE X creation_date D ORDERBY D DESC LIMIT 1').rows[0][0]
-
- def cleanup(self):
- """cancel inserted elements during tests"""
- cursor = self.cnx.cursor()
- cursor.execute('DELETE Any X WHERE X eid > %(x)s', {'x' : self.last_eid}, eid_key='x')
- print "cleaning done"
- self.cnx.commit()
--- a/devtools/apptest.py Tue Aug 11 17:19:05 2009 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,506 +0,0 @@
-"""This module provides misc utilities to test instances
-
-:organization: Logilab
-:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
-:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
-:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
-"""
-__docformat__ = "restructuredtext en"
-
-from copy import deepcopy
-
-import simplejson
-
-from logilab.common.testlib import TestCase
-from logilab.common.pytest import nocoverage
-from logilab.common.umessage import message_from_string
-
-from logilab.common.deprecation import deprecated
-
-from cubicweb.devtools import init_test_database, TestServerConfiguration, ApptestConfiguration
-from cubicweb.devtools._apptest import TestEnvironment
-from cubicweb.devtools.fake import FakeRequest
-
-from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
-
-
-MAILBOX = []
-class Email:
- def __init__(self, recipients, msg):
- self.recipients = recipients
- self.msg = msg
-
- @property
- def message(self):
- return message_from_string(self.msg)
-
- @property
- def subject(self):
- return self.message.get('Subject')
-
- @property
- def content(self):
- return self.message.get_payload(decode=True)
-
- def __repr__(self):
- return '<Email to %s with subject %s>' % (','.join(self.recipients),
- self.message.get('Subject'))
-
-class MockSMTP:
- def __init__(self, server, port):
- pass
- def close(self):
- pass
- def sendmail(self, helo_addr, recipients, msg):
- MAILBOX.append(Email(recipients, msg))
-
-from cubicweb import cwconfig
-cwconfig.SMTP = MockSMTP
-
-
-def get_versions(self, checkversions=False):
- """return the a dictionary containing cubes used by this instance
- as key with their version as value, including cubicweb version. This is a
- public method, not requiring a session id.
-
- replace Repository.get_versions by this method if you don't want versions
- checking
- """
- vcconf = {'cubicweb': self.config.cubicweb_version()}
- self.config.bootstrap_cubes()
- for pk in self.config.cubes():
- version = self.config.cube_version(pk)
- vcconf[pk] = version
- self.config._cubes = None
- return vcconf
-
-
-@property
-def late_binding_env(self):
- """builds TestEnvironment as late as possible"""
- if not hasattr(self, '_env'):
- self.__class__._env = TestEnvironment('data', configcls=self.configcls,
- requestcls=self.requestcls)
- return self._env
-
-
-class autoenv(type):
- """automatically set environment on EnvBasedTC subclasses if necessary
- """
- def __new__(mcs, name, bases, classdict):
- env = classdict.get('env')
- # try to find env in one of the base classes
- if env is None:
- for base in bases:
- env = getattr(base, 'env', None)
- if env is not None:
- classdict['env'] = env
- break
- if not classdict.get('__abstract__') and not classdict.get('env'):
- classdict['env'] = late_binding_env
- return super(autoenv, mcs).__new__(mcs, name, bases, classdict)
-
-
-class EnvBasedTC(TestCase):
- """abstract class for test using an apptest environment
- """
- __metaclass__ = autoenv
- __abstract__ = True
- env = None
- configcls = ApptestConfiguration
- requestcls = FakeRequest
-
- # user / session management ###############################################
-
- def user(self, req=None):
- if req is None:
- req = self.env.create_request()
- return self.env.cnx.user(req)
- else:
- return req.user
-
- def create_user(self, *args, **kwargs):
- return self.env.create_user(*args, **kwargs)
-
- def login(self, login, password=None):
- return self.env.login(login, password)
-
- def restore_connection(self):
- self.env.restore_connection()
-
- # db api ##################################################################
-
- @nocoverage
- def cursor(self, req=None):
- return self.env.cnx.cursor(req or self.request())
-
- @nocoverage
- def execute(self, *args, **kwargs):
- return self.env.execute(*args, **kwargs)
-
- @nocoverage
- def commit(self):
- self.env.cnx.commit()
-
- @nocoverage
- def rollback(self):
- try:
- self.env.cnx.rollback()
- except ProgrammingError:
- pass
-
- # other utilities #########################################################
- def set_debug(self, debugmode):
- from cubicweb.server import set_debug
- set_debug(debugmode)
-
- @property
- def config(self):
- return self.vreg.config
-
- def session(self):
- """return current server side session (using default manager account)"""
- return self.env.repo._sessions[self.env.cnx.sessionid]
-
- def request(self, *args, **kwargs):
- """return a web interface request"""
- return self.env.create_request(*args, **kwargs)
-
- @nocoverage
- def rset_and_req(self, *args, **kwargs):
- return self.env.get_rset_and_req(*args, **kwargs)
-
- def entity(self, rql, args=None, eidkey=None, req=None):
- return self.execute(rql, args, eidkey, req=req).get_entity(0, 0)
-
- def etype_instance(self, etype, req=None):
- req = req or self.request()
- e = self.env.vreg['etypes'].etype_class(etype)(req)
- e.eid = None
- return e
-
- def add_entity(self, etype, **kwargs):
- rql = ['INSERT %s X' % etype]
-
- # dict for replacement in RQL Request
- rql_args = {}
-
- if kwargs: #
- rql.append(':')
- # dict to define new entities variables
- entities = {}
-
- # assignement part of the request
- sub_rql = []
- for key, value in kwargs.iteritems():
- # entities
- if hasattr(value, 'eid'):
- new_value = "%s__" % key.upper()
-
- entities[new_value] = value.eid
- rql_args[new_value] = value.eid
-
- sub_rql.append("X %s %s" % (key, new_value))
- # final attributes
- else:
- sub_rql.append('X %s %%(%s)s' % (key, key))
- rql_args[key] = value
- rql.append(', '.join(sub_rql))
-
-
- if entities:
- rql.append('WHERE')
- # WHERE part of the request (to link entity to they eid)
- sub_rql = []
- for key, value in entities.iteritems():
- sub_rql.append("%s eid %%(%s)s" % (key, key))
- rql.append(', '.join(sub_rql))
-
- rql = ' '.join(rql)
- rset = self.execute(rql, rql_args)
- return rset.get_entity(0, 0)
-
- def set_option(self, optname, value):
- self.vreg.config.global_set_option(optname, value)
-
- def pviews(self, req, rset):
- return sorted((a.id, a.__class__) for a in self.vreg['views'].possible_views(req, rset=rset))
-
- def pactions(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
- return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
- if a.category not in skipcategories]
-
- def pactions_by_cats(self, req, rset, categories=('addrelated',)):
- return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
- if a.category in categories]
-
- paddrelactions = deprecated()(pactions_by_cats)
-
- def pactionsdict(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
- res = {}
- for a in self.vreg['actions'].possible_vobjects(req, rset=rset):
- if a.category not in skipcategories:
- res.setdefault(a.category, []).append(a.__class__)
- return res
-
-
- def remote_call(self, fname, *args):
- """remote call simulation"""
- dump = simplejson.dumps
- args = [dump(arg) for arg in args]
- req = self.request(fname=fname, pageid='123', arg=args)
- ctrl = self.vreg['controllers'].select('json', req)
- return ctrl.publish(), req
-
- # default test setup and teardown #########################################
-
- def setup_database(self):
- pass
-
- def setUp(self):
- self.restore_connection()
- session = self.session()
- #self.maxeid = self.execute('Any MAX(X)')
- session.set_pool()
- self.maxeid = session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
- self.app = self.env.app
- self.vreg = self.env.app.vreg
- self.schema = self.vreg.schema
- self.vreg.config.mode = 'test'
- # set default-dest-addrs to a dumb email address to avoid mailbox or
- # mail queue pollution
- self.set_option('default-dest-addrs', ['whatever'])
- self.setup_database()
- self.commit()
- MAILBOX[:] = [] # reset mailbox
-
- @nocoverage
- def tearDown(self):
- self.rollback()
- # self.env.restore_database()
- self.env.restore_connection()
- self.session().unsafe_execute('DELETE Any X WHERE X eid > %s' % self.maxeid)
- self.commit()
-
- # global resources accessors ###############################################
-
-# XXX
-try:
- from cubicweb.web import Redirect
- from urllib import unquote
-except ImportError:
- pass # cubicweb-web not installed
-else:
- class ControllerTC(EnvBasedTC):
- def setUp(self):
- super(ControllerTC, self).setUp()
- self.req = self.request()
- self.ctrl = self.vreg['controllers'].select('edit', self.req)
-
- def publish(self, req):
- assert req is self.ctrl.req
- try:
- result = self.ctrl.publish()
- req.cnx.commit()
- except Redirect:
- req.cnx.commit()
- raise
- return result
-
- def expect_redirect_publish(self, req=None):
- if req is not None:
- self.ctrl = self.vreg['controllers'].select('edit', req)
- else:
- req = self.req
- try:
- res = self.publish(req)
- except Redirect, ex:
- try:
- path, params = ex.location.split('?', 1)
- except:
- path, params = ex.location, ""
- req._url = path
- cleanup = lambda p: (p[0], unquote(p[1]))
- params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
- return req.relative_path(False), params # path.rsplit('/', 1)[-1], params
- else:
- self.fail('expected a Redirect exception')
-
-
-def make_late_binding_repo_property(attrname):
- @property
- def late_binding(self):
- """builds cnx as late as possible"""
- if not hasattr(self, attrname):
- # sets explicit test mode here to avoid autoreload
- from cubicweb.cwconfig import CubicWebConfiguration
- CubicWebConfiguration.mode = 'test'
- cls = self.__class__
- config = self.repo_config or TestServerConfiguration('data')
- cls._repo, cls._cnx = init_test_database('sqlite', config=config)
- return getattr(self, attrname)
- return late_binding
-
-
-class autorepo(type):
- """automatically set repository on RepositoryBasedTC subclasses if necessary
- """
- def __new__(mcs, name, bases, classdict):
- repo = classdict.get('repo')
- # try to find repo in one of the base classes
- if repo is None:
- for base in bases:
- repo = getattr(base, 'repo', None)
- if repo is not None:
- classdict['repo'] = repo
- break
- if name != 'RepositoryBasedTC' and not classdict.get('repo'):
- classdict['repo'] = make_late_binding_repo_property('_repo')
- classdict['cnx'] = make_late_binding_repo_property('_cnx')
- return super(autorepo, mcs).__new__(mcs, name, bases, classdict)
-
-
-class RepositoryBasedTC(TestCase):
- """abstract class for test using direct repository connections
- """
- __metaclass__ = autorepo
- repo_config = None # set a particular config instance if necessary
-
- # user / session management ###############################################
-
- def create_user(self, user, groups=('users',), password=None, commit=True):
- if password is None:
- password = user
- eid = self.execute('INSERT CWUser X: X login %(x)s, X upassword %(p)s,'
- 'X in_state S WHERE S name "activated"',
- {'x': unicode(user), 'p': password})[0][0]
- groups = ','.join(repr(group) for group in groups)
- self.execute('SET X in_group Y WHERE X eid %%(x)s, Y name IN (%s)' % groups,
- {'x': eid})
- if commit:
- self.commit()
- self.session.reset_pool()
- return eid
-
- def login(self, login, password=None):
- cnx = repo_connect(self.repo, unicode(login), password or login,
- ConnectionProperties('inmemory'))
- self.cnxs.append(cnx)
- return cnx
-
- def current_session(self):
- return self.repo._sessions[self.cnxs[-1].sessionid]
-
- def restore_connection(self):
- assert len(self.cnxs) == 1, self.cnxs
- cnx = self.cnxs.pop()
- try:
- cnx.close()
- except Exception, ex:
- print "exception occured while closing connection", ex
-
- # db api ##################################################################
-
- def execute(self, rql, args=None, eid_key=None):
- assert self.session.id == self.cnxid
- rset = self.__execute(self.cnxid, rql, args, eid_key)
- rset.vreg = self.vreg
- rset.req = self.session
- # call to set_pool is necessary to avoid pb when using
- # instance entities for convenience
- self.session.set_pool()
- return rset
-
- def commit(self):
- self.__commit(self.cnxid)
- self.session.set_pool()
-
- def rollback(self):
- self.__rollback(self.cnxid)
- self.session.set_pool()
-
- def close(self):
- self.__close(self.cnxid)
-
- # other utilities #########################################################
-
- def set_debug(self, debugmode):
- from cubicweb.server import set_debug
- set_debug(debugmode)
-
- def set_option(self, optname, value):
- self.vreg.config.global_set_option(optname, value)
-
- def add_entity(self, etype, **kwargs):
- restrictions = ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
- rql = 'INSERT %s X' % etype
- if kwargs:
- rql += ': %s' % ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
- rset = self.execute(rql, kwargs)
- return rset.get_entity(0, 0)
-
- def default_user_password(self):
- config = self.repo.config #TestConfiguration('data')
- user = unicode(config.sources()['system']['db-user'])
- passwd = config.sources()['system']['db-password']
- return user, passwd
-
- def close_connections(self):
- for cnx in self.cnxs:
- try:
- cnx.rollback()
- cnx.close()
- except:
- continue
- self.cnxs = []
-
- pactions = EnvBasedTC.pactions.im_func
- pactionsdict = EnvBasedTC.pactionsdict.im_func
-
- # default test setup and teardown #########################################
-
- def _prepare(self):
- MAILBOX[:] = [] # reset mailbox
- if hasattr(self, 'cnxid'):
- return
- repo = self.repo
- self.__execute = repo.execute
- self.__commit = repo.commit
- self.__rollback = repo.rollback
- self.__close = repo.close
- self.cnxid = self.cnx.sessionid
- self.session = repo._sessions[self.cnxid]
- self.cnxs = []
- # reset caches, they may introduce bugs among tests
- repo._type_source_cache = {}
- repo._extid_cache = {}
- repo.querier._rql_cache = {}
- for source in repo.sources:
- source.reset_caches()
- for s in repo.sources:
- if hasattr(s, '_cache'):
- s._cache = {}
-
- @property
- def config(self):
- return self.repo.config
-
- @property
- def vreg(self):
- return self.repo.vreg
-
- @property
- def schema(self):
- return self.repo.schema
-
- def setUp(self):
- self._prepare()
- self.session.set_pool()
- self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
-
- def tearDown(self):
- self.close_connections()
- self.rollback()
- self.session.unsafe_execute('DELETE Any X WHERE X eid > %(x)s', {'x': self.maxeid})
- self.commit()
-
--- a/devtools/devctl.py Tue Aug 11 17:19:05 2009 +0200
+++ b/devtools/devctl.py Tue Aug 11 17:28:18 2009 +0200
@@ -119,6 +119,7 @@
libschema = libconfig.load_schema(remove_unused_rtypes=False)
entities = [e for e in schema.entities() if not e in libschema]
else:
+ libschema = None
entities = schema.entities()
done = set()
for eschema in sorted(entities):
--- a/devtools/fake.py Tue Aug 11 17:19:05 2009 +0200
+++ b/devtools/fake.py Tue Aug 11 17:28:18 2009 +0200
@@ -7,7 +7,6 @@
"""
__docformat__ = "restructuredtext en"
-from logilab.common.testlib import mock_object as Mock
from logilab.common.adbh import get_adv_func_helper
from indexer import get_indexer
--- a/devtools/htmlparser.py Tue Aug 11 17:19:05 2009 +0200
+++ b/devtools/htmlparser.py Tue Aug 11 17:28:18 2009 +0200
@@ -169,3 +169,5 @@
except KeyError:
continue
return False
+
+VALMAP = {None: None, 'dtd': DTDValidator, 'xml': SaxOnlyValidator}
--- a/devtools/livetest.py Tue Aug 11 17:19:05 2009 +0200
+++ b/devtools/livetest.py Tue Aug 11 17:28:18 2009 +0200
@@ -6,9 +6,10 @@
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
+import os
import socket
import logging
-from os.path import join, dirname, exists
+from os.path import join, dirname, normpath, abspath
from StringIO import StringIO
#from twisted.application import service, strports
@@ -21,10 +22,9 @@
from logilab.common.testlib import TestCase
-import cubicweb.web
from cubicweb.dbapi import in_memory_cnx
from cubicweb.etwist.server import CubicWebRootResource
-from cubicweb.devtools import LivetestConfiguration, init_test_database
+from cubicweb.devtools import BaseApptestConfiguration, init_test_database
@@ -50,25 +50,57 @@
+class LivetestConfiguration(BaseApptestConfiguration):
+ init_repository = False
+
+ def __init__(self, cube=None, sourcefile=None, pyro_name=None,
+ log_threshold=logging.CRITICAL):
+ BaseApptestConfiguration.__init__(self, cube, log_threshold=log_threshold)
+ self.appid = pyro_name or cube
+ # don't change this, else some symlink problems may arise in some
+ # environment (e.g. mine (syt) ;o)
+ # XXX I'm afraid this test will prevent to run test from a production
+ # environment
+ self._sources = None
+ # instance cube test
+ if cube is not None:
+ self.apphome = self.cube_dir(cube)
+ elif 'web' in os.getcwd().split(os.sep):
+ # web test
+ self.apphome = join(normpath(join(dirname(__file__), '..')), 'web')
+ else:
+ # cube test
+ self.apphome = abspath('..')
+ self.sourcefile = sourcefile
+ self.global_set_option('realm', '')
+ self.use_pyro = pyro_name is not None
+
+ def pyro_enabled(self):
+ if self.use_pyro:
+ return True
+ else:
+ return False
+
+
+
def make_site(cube, options=None):
from cubicweb.etwist import twconfig # trigger configuration registration
- sourcefile = options.sourcefile
- config = LivetestConfiguration(cube, sourcefile,
+ config = LivetestConfiguration(cube, options.sourcefile,
pyro_name=options.pyro_name,
log_threshold=logging.DEBUG)
- source = config.sources()['system']
- init_test_database(driver=source['db-driver'], config=config)
+ init_test_database(config=config)
# if '-n' in sys.argv: # debug mode
cubicweb = LivetestResource(config, debug=True)
toplevel = cubicweb
website = server.Site(toplevel)
cube_dir = config.cube_dir(cube)
+ source = config.sources()['system']
for port in xrange(7777, 7798):
try:
reactor.listenTCP(port, channel.HTTPFactory(website))
saveconf(cube_dir, port, source['db-user'], source['db-password'])
break
- except CannotListenError, exc:
+ except CannotListenError:
print "port %s already in use, I will try another one" % port
else:
raise
--- a/devtools/migrtest.py Tue Aug 11 17:19:05 2009 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,153 +0,0 @@
-"""Migration test script
-
-* migration will be played into a chroot of the local machine
-* the database server used can be configured
-* test tested instance may be on another host
-
-
-We are using postgres'.pgpass file. Here is a copy of postgres documentation
-about that:
-
-The file .pgpass in a user's home directory or the file referenced by
-PGPASSFILE can contain passwords to be used if the connection requires
-a password (and no password has been specified otherwise).
-
-
-This file should contain lines of the following format:
-
-hostname:port:database:username:password
-
-Each of the first four fields may be a literal value, or *, which
-matches anything. The password field from the first line that matches
-the current connection parameters will be used. (Therefore, put
-more-specific entries first when you are using wildcards.) If an entry
-needs to contain : or \, escape this character with \. A hostname of
-localhost matches both host (TCP) and local (Unix domain socket)
-connections coming from the local machine.
-
-The permissions on .pgpass must disallow any access to world or group;
-achieve this by the command chmod 0600 ~/.pgpass. If the permissions
-are less strict than this, the file will be ignored.
-
-:organization: Logilab
-:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
-:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
-:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
-"""
-__docformat__ = "restructuredtext en"
-
-from os import system
-from os.path import join, basename
-
-from logilab.common.shellutils import cp, rm
-
-from cubicweb.toolsutils import read_config
-from cubicweb.server.utils import generate_sources_file
-
-# XXXX use db-copy instead
-
-# test environment configuration
-chrootpath = '/sandbox/cubicwebtest'
-tmpdbhost = 'crater'
-tmpdbuser = 'syt'
-tmpdbpasswd = 'syt'
-
-def play_migration(applhome, applhost='', sudo=False):
- applid = dbname = basename(applhome)
- testapplhome = join(chrootpath, applhome)
- # copy instance into the chroot
- if applhost:
- system('scp -r %s:%s %s' % (applhost, applhome, testapplhome))
- else:
- cp(applhome, testapplhome)
-## # extract db parameters
-## sources = read_config(join(testapplhome, 'sources'))
-## dbname = sources['system']['db-name']
-## dbhost = sources['system'].get('db-host') or ''
-## dbuser = sources['system'].get('db-user') or ''
-## dbpasswd = sources['system'].get('db-password') or ''
- # generate sources file
- # XXX multisources
- sources = {'system': {}}
- sources['system']['db-encoding'] = 'UTF8' # XXX
- sources['system']['db-name'] = dbname
- sources['system']['db-host'] = None
- sources['system']['db-user'] = tmpdbuser
- sources['system']['db-password'] = None
- generate_sources_file(join(testapplhome, 'sources'), sources)
-## # create postgres password file so we won't need anymore passwords
-## # XXX may exist!
-## pgpassfile = expanduser('~/.pgpass')
-## pgpass = open(pgpassfile, 'w')
-## if dbpasswd:
-## pgpass.write('%s:*:%s:%s:%s\n' % (dbhost or applhost or 'localhost',
-## dbname, dbuser, dbpasswd))
-## if tmpdbpasswd:
-## pgpass.write('%s:*:%s:%s:%s\n' % (tmpdbhost or 'localhost', dbname,
-## tmpdbuser, tmpdbpasswd))
-## pgpass.close()
-## chmod(pgpassfile, 0600)
- # dump db
-## dumpcmd = 'pg_dump -Fc -U %s -f /tmp/%s.dump %s' % (
-## dbuser, dbname, dbname)
-## if dbhost:
-## dumpcmd += ' -h %s' % dbhost
- dumpfile = '/tmp/%s.dump' % applid
- dumpcmd = 'cubicweb-ctl db-dump --output=%s %s' % (dumpfile, applid)
- if sudo:
- dumpcmd = 'sudo %s' % dumpcmd
- if applhost:
- dumpcmd = 'ssh %s "%s"' % (applhost, dumpcmd)
- if system(dumpcmd):
- raise Exception('error while dumping the database')
-## if not dbhost and applhost:
- if applhost:
- # retrieve the dump
- if system('scp %s:%s %s' % (applhost, dumpfile, dumpfile)):
- raise Exception('error while retreiving the dump')
- # move the dump into the chroot
- system('mv %s %s%s' % (dumpfile, chrootpath, dumpfile))
- # locate installed versions
- vcconf = read_config(join(testapplhome, 'vc.conf'))
- template = vcconf['TEMPLATE']
- cubicwebversion = vcconf['CW']
- templversion = vcconf['TEMPLATE_VERSION']
- # install the same versions cubicweb and template versions into the chroot
- system('sudo chroot %s apt-get update' % chrootpath)
- system('sudo chroot %s apt-get install cubicweb-server=%s cubicweb-client=%s'
- % (chrootpath, cubicwebversion, cubicwebversion))
- system('sudo chroot %s apt-get install cubicweb-%s-appl-server=%s cubicweb-%s-appl-client=%s'
- % (chrootpath, template, templversion, template, templversion))
- # update and upgrade to the latest version
- system('sudo chroot %s apt-get install cubicweb-server cubicweb-client' % chrootpath)
- system('sudo chroot %s apt-get install cubicweb-%s-appl-server cubicweb-%s-appl-client'
- % (chrootpath, template, template))
- # create and fill the database
- system('sudo chroot cubicweb-ctl db-restore %s %s' % (applid, dumpfile))
-## if not tmpdbhost:
-## system('createdb -U %s -T template0 -E UTF8 %s' % (tmpdbuser, dbname))
-## system('pg_restore -U %s -O -Fc -d %s /tmp/%s.dump'
-## % (tmpdbuser, dbname, dbname))
-## else:
-## system('createdb -h %s -U %s -T template0 -E UTF8 %s'
-## % (tmpdbhost, tmpdbuser, dbname))
-## system('pg_restore -h %s -U %s -O -Fc -d %s /tmp/%s.dump'
-## % (tmpdbhost, tmpdbuser, dbname, dbname))
- # launch upgrade
- system('sudo chroot %s cubicweb-ctl upgrade %s' % (chrootpath, applid))
-
- # cleanup
- rm(testapplhome)
-## rm(pgpassfile)
-## if tmpdbhost:
-## system('dropdb -h %s -U %s %s' % (tmpdbuser, tmpdbhost, dbname))
-## else:
-## system('dropdb -U %s %s' % (tmpdbuser, dbname))
-## if not dbhost and applhost:
- if applhost:
- system('ssh %s rm %s' % (applhost, dumpfile))
- rm('%s%s' % (chrootpath, dumpfile))
-
-
-if __name__ == '__main__':
- play_migration('/etc/cubicweb.d/jpl', 'lepus')
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/devtools/realdbtest.py Tue Aug 11 17:28:18 2009 +0200
@@ -0,0 +1,43 @@
+import logging
+from cubicweb import toolsutils
+from cubicweb.devtools import DEFAULT_SOURCES, BaseApptestConfiguration
+
+class RealDatabaseConfiguration(BaseApptestConfiguration):
+ init_repository = False
+ sourcesdef = DEFAULT_SOURCES.copy()
+
+ def sources(self):
+ """
+ By default, we run tests with the sqlite DB backend.
+ One may use its own configuration by just creating a
+ 'sources' file in the test directory from wich tests are
+ launched.
+ """
+ self._sources = self.sourcesdef
+ return self._sources
+
+
+def buildconfig(dbuser, dbpassword, dbname, adminuser, adminpassword, dbhost=None):
+ """convenience function that builds a real-db configuration class"""
+ sourcesdef = {'system': {'adapter' : 'native',
+ 'db-encoding' : 'UTF-8', #'ISO-8859-1',
+ 'db-user' : dbuser,
+ 'db-password' : dbpassword,
+ 'db-name' : dbname,
+ 'db-driver' : 'postgres',
+ 'db-host' : dbhost,
+ },
+ 'admin' : {'login': adminuser,
+ 'password': adminpassword,
+ },
+ }
+ return type('MyRealDBConfig', (RealDatabaseConfiguration,),
+ {'sourcesdef': sourcesdef})
+
+
+def loadconfig(filename):
+ """convenience function that builds a real-db configuration class
+ from a file
+ """
+ return type('MyRealDBConfig', (RealDatabaseConfiguration,),
+ {'sourcesdef': toolsutils.read_config(filename)})
--- a/devtools/test/unittest_testlib.py Tue Aug 11 17:19:05 2009 +0200
+++ b/devtools/test/unittest_testlib.py Tue Aug 11 17:28:18 2009 +0200
@@ -10,11 +10,11 @@
from unittest import TestSuite
-from logilab.common.testlib import (TestCase, unittest_main, mock_object,
+from logilab.common.testlib import (TestCase, unittest_main,
SkipAwareTextTestRunner)
+
from cubicweb.devtools import htmlparser
-
-from cubicweb.devtools.testlib import WebTest, EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
class WebTestTC(TestCase):
@@ -23,7 +23,7 @@
self.runner = SkipAwareTextTestRunner(stream=output)
def test_error_raised(self):
- class MyWebTest(WebTest):
+ class MyWebTest(CubicWebTC):
def test_error_view(self):
self.add_entity('Bug', title=u"bt")
@@ -39,7 +39,7 @@
self.assertEquals(len(result.failures), 1)
-class TestLibTC(EnvBasedTC):
+class TestLibTC(CubicWebTC):
def test_add_entity_with_relation(self):
bug = self.add_entity(u'Bug', title=u"toto")
self.add_entity(u'Bug', title=u"tata", identical_to=bug)
--- a/devtools/testlib.py Tue Aug 11 17:19:05 2009 +0200
+++ b/devtools/testlib.py Tue Aug 11 17:28:18 2009 +0200
@@ -1,4 +1,4 @@
-"""this module contains base classes for web tests
+"""this module contains base classes and utilities for cubicweb tests
:organization: Logilab
:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
@@ -7,84 +7,505 @@
"""
__docformat__ = "restructuredtext en"
+import os
import sys
+import re
+from urllib import unquote
from math import log
-from logilab.common.debugger import Debugger
-from logilab.common.testlib import InnerTest
-from logilab.common.pytest import nocoverage
+import simplejson
+
+import yams.schema
-from cubicweb.devtools import VIEW_VALIDATORS
-from cubicweb.devtools.apptest import EnvBasedTC
-from cubicweb.devtools._apptest import unprotected_entities, SYSTEM_RELATIONS
-from cubicweb.devtools.htmlparser import DTDValidator, SaxOnlyValidator, HTMLValidator
-from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries
+from logilab.common.testlib import TestCase, InnerTest
+from logilab.common.pytest import nocoverage, pause_tracing, resume_tracing
+from logilab.common.debugger import Debugger
+from logilab.common.umessage import message_from_string
+from logilab.common.decorators import cached, classproperty
+from logilab.common.deprecation import deprecated
-from cubicweb.sobjects.notification import NotificationView
-
-from cubicweb.vregistry import NoSelectableObject
+from cubicweb import NoSelectableObject, cwconfig, devtools, web, server
+from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
+from cubicweb.sobjects import notification
+from cubicweb.web import application
+from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS
+from cubicweb.devtools import fake, htmlparser
-## TODO ###############
-# creation tests: make sure an entity was actually created
-# Existing Test Environment
+# low-level utilities ##########################################################
class CubicWebDebugger(Debugger):
-
+ """special debugger class providing a 'view' function which saves some
+ html into a temporary file and open a web browser to examinate it.
+ """
def do_view(self, arg):
import webbrowser
data = self._getval(arg)
file('/tmp/toto.html', 'w').write(data)
webbrowser.open('file:///tmp/toto.html')
-def how_many_dict(schema, cursor, how_many, skip):
- """compute how many entities by type we need to be able to satisfy relations
- cardinality
- """
- # compute how many entities by type we need to be able to satisfy relation constraint
- relmap = {}
- for rschema in schema.relations():
- if rschema.is_final():
- continue
- for subj, obj in rschema.iter_rdefs():
- card = rschema.rproperty(subj, obj, 'cardinality')
- if card[0] in '1?' and len(rschema.subjects(obj)) == 1:
- relmap.setdefault((rschema, subj), []).append(str(obj))
- if card[1] in '1?' and len(rschema.objects(subj)) == 1:
- relmap.setdefault((rschema, obj), []).append(str(subj))
- unprotected = unprotected_entities(schema)
- for etype in skip:
- unprotected.add(etype)
- howmanydict = {}
- for etype in unprotected_entities(schema, strict=True):
- howmanydict[str(etype)] = cursor.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0]
- if etype in unprotected:
- howmanydict[str(etype)] += how_many
- for (rschema, etype), targets in relmap.iteritems():
- # XXX should 1. check no cycle 2. propagate changes
- relfactor = sum(howmanydict[e] for e in targets)
- howmanydict[str(etype)] = max(relfactor, howmanydict[etype])
- return howmanydict
-
def line_context_filter(line_no, center, before=3, after=None):
"""return true if line are in context
- if after is None: after = before"""
+
+ if after is None: after = before
+ """
if after is None:
after = before
return center - before <= line_no <= center + after
-## base webtest class #########################################################
-VALMAP = {None: None, 'dtd': DTDValidator, 'xml': SaxOnlyValidator}
+
+def unprotected_entities(schema, strict=False):
+ """returned a set of each non final entity type, excluding "system" entities
+ (eg CWGroup, CWUser...)
+ """
+ if strict:
+ protected_entities = yams.schema.BASE_TYPES
+ else:
+ protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES)
+ return set(schema.entities()) - protected_entities
+
+
+def get_versions(self, checkversions=False):
+ """return the a dictionary containing cubes used by this instance
+ as key with their version as value, including cubicweb version. This is a
+ public method, not requiring a session id.
+
+ replace Repository.get_versions by this method if you don't want versions
+ checking
+ """
+ vcconf = {'cubicweb': self.config.cubicweb_version()}
+ self.config.bootstrap_cubes()
+ for pk in self.config.cubes():
+ version = self.config.cube_version(pk)
+ vcconf[pk] = version
+ self.config._cubes = None
+ return vcconf
+
+
+def refresh_repo(repo):
+ devtools.reset_test_database(repo.config)
+ for pool in repo.pools:
+ pool.reconnect()
+ repo._type_source_cache = {}
+ repo._extid_cache = {}
+ repo.querier._rql_cache = {}
+ for source in repo.sources:
+ source.reset_caches()
+
+
+# email handling, to test emails sent by an application ########################
+
+MAILBOX = []
+
+class Email:
+ """you'll get instances of Email into MAILBOX during tests that trigger
+ some notification.
+
+ * `msg` is the original message object
+
+ * `recipients` is a list of email address which are the recipients of this
+ message
+ """
+ def __init__(self, recipients, msg):
+ self.recipients = recipients
+ self.msg = msg
+
+ @property
+ def message(self):
+ return message_from_string(self.msg)
+
+ @property
+ def subject(self):
+ return self.message.get('Subject')
+
+ @property
+ def content(self):
+ return self.message.get_payload(decode=True)
+
+ def __repr__(self):
+ return '<Email to %s with subject %s>' % (','.join(self.recipients),
+ self.message.get('Subject'))
+
+# the trick to get email into MAILBOX instead of actually sent: monkey patch
+# cwconfig.SMTP object
+class MockSMTP:
+ def __init__(self, server, port):
+ pass
+ def close(self):
+ pass
+ def sendmail(self, helo_addr, recipients, msg):
+ MAILBOX.append(Email(recipients, msg))
+
+cwconfig.SMTP = MockSMTP
+
+
+# base class for cubicweb tests requiring a full cw environments ###############
+
+class CubicWebTC(TestCase):
+ """abstract class for test using an apptest environment
+
+ attributes:
+ `vreg`, the vregistry
+ `schema`, self.vreg.schema
+ `config`, cubicweb configuration
+ `cnx`, dbapi connection to the repository using an admin user
+ `session`, server side session associated to `cnx`
+ `app`, the cubicweb publisher (for web testing)
+ `repo`, the repository object
+
+ `admlogin`, login of the admin user
+ `admpassword`, password of the admin user
+
+ """
+ appid = 'data'
+ configcls = devtools.ApptestConfiguration
+
+ @classproperty
+ def config(cls):
+ """return the configuration object. Configuration is cached on the test
+ class.
+ """
+ try:
+ return cls.__dict__['_config']
+ except KeyError:
+ config = cls._config = cls.configcls(cls.appid)
+ config.mode = 'test'
+ return config
+
+ @classmethod
+ def init_config(cls, config):
+ """configuration initialization hooks. You may want to override this."""
+ source = config.sources()['system']
+ cls.admlogin = unicode(source['db-user'])
+ cls.admpassword = source['db-password']
+ # uncomment the line below if you want rql queries to be logged
+ #config.global_set_option('query-log-file', '/tmp/test_rql_log.' + `os.getpid()`)
+ config.global_set_option('log-file', None)
+ # set default-dest-addrs to a dumb email address to avoid mailbox or
+ # mail queue pollution
+ config.global_set_option('default-dest-addrs', ['whatever'])
+ try:
+ send_to = '%s@logilab.fr' % os.getlogin()
+ except OSError:
+ send_to = '%s@logilab.fr' % (os.environ.get('USER')
+ or os.environ.get('USERNAME')
+ or os.environ.get('LOGNAME'))
+ config.global_set_option('sender-addr', send_to)
+ config.global_set_option('default-dest-addrs', send_to)
+ config.global_set_option('sender-name', 'cubicweb-test')
+ config.global_set_option('sender-addr', 'cubicweb-test@logilab.fr')
+ # web resources
+ config.global_set_option('base-url', devtools.BASE_URL)
+ try:
+ config.global_set_option('embed-allowed', re.compile('.*'))
+ except: # not in server only configuration
+ pass
+
+ @classmethod
+ def _init_repo(cls):
+ """init the repository and connection to it.
+
+ Repository and connection are cached on the test class. Once
+ initialized, we simply reset connections and repository caches.
+ """
+ if not 'repo' in cls.__dict__:
+ cls._build_repo()
+ else:
+ cls.cnx.rollback()
+ cls._refresh_repo()
+
+ @classmethod
+ def _build_repo(cls):
+ cls.repo, cls.cnx = devtools.init_test_database(config=cls.config)
+ cls.init_config(cls.config)
+ cls.vreg = cls.repo.vreg
+ cls._orig_cnx = cls.cnx
+ cls.config.repository = lambda x=None: cls.repo
+ # necessary for authentication tests
+ cls.cnx.login = cls.admlogin
+ cls.cnx.password = cls.admpassword
+
+ @classmethod
+ def _refresh_repo(cls):
+ refresh_repo(cls.repo)
+
+ # global resources accessors ###############################################
+
+ @property
+ def schema(self):
+ """return the application schema"""
+ return self.vreg.schema
+
+ @property
+ def session(self):
+ """return current server side session (using default manager account)"""
+ return self.repo._sessions[self.cnx.sessionid]
+
+ @property
+ def adminsession(self):
+ """return current server side session (using default manager account)"""
+ return self.repo._sessions[self._orig_cnx.sessionid]
+
+ def set_option(self, optname, value):
+ self.config.global_set_option(optname, value)
+
+ def set_debug(self, debugmode):
+ server.set_debug(debugmode)
+
+ # default test setup and teardown #########################################
+
+ def setUp(self):
+ pause_tracing()
+ self._init_repo()
+ resume_tracing()
+ self.setup_database()
+ self.commit()
+ MAILBOX[:] = [] # reset mailbox
+
+ def setup_database(self):
+ """add your database setup code by overriding this method"""
+
+ # user / session management ###############################################
+
+ def user(self, req=None):
+ """return the application schema"""
+ if req is None:
+ req = self.request()
+ return self.cnx.user(req)
+ else:
+ return req.user
-class WebTest(EnvBasedTC):
- """base class for web tests"""
- __abstract__ = True
+ def create_user(self, login, groups=('users',), password=None, req=None,
+ commit=True):
+ """create and return a new user entity"""
+ if password is None:
+ password = login.encode('utf8')
+ cursor = self._orig_cnx.cursor(req or self.request())
+ rset = cursor.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s,'
+ 'X in_state S WHERE S name "activated"',
+ {'login': unicode(login), 'passwd': password})
+ user = rset.get_entity(0, 0)
+ cursor.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
+ % ','.join(repr(g) for g in groups),
+ {'x': user.eid}, 'x')
+ user.clear_related_cache('in_group', 'subject')
+ if commit:
+ self._orig_cnx.commit()
+ return user
+
+ def login(self, login, password=None):
+ """return a connection for the given login/password"""
+ if login == self.admlogin:
+ self.restore_connection()
+ else:
+ self.cnx = repo_connect(self.repo, unicode(login),
+ password or str(login),
+ ConnectionProperties('inmemory'))
+ if login == self.vreg.config.anonymous_user()[0]:
+ self.cnx.anonymous_connection = True
+ return self.cnx
+
+ def restore_connection(self):
+ if not self.cnx is self._orig_cnx:
+ try:
+ self.cnx.close()
+ except ProgrammingError:
+ pass # already closed
+ self.cnx = self._orig_cnx
+
+ # db api ##################################################################
+
+ @nocoverage
+ def cursor(self, req=None):
+ return self.cnx.cursor(req or self.request())
+
+ @nocoverage
+ def execute(self, rql, args=None, eidkey=None, req=None):
+ """executes <rql>, builds a resultset, and returns a couple (rset, req)
+ where req is a FakeRequest
+ """
+ req = req or self.request(rql=rql)
+ return self.cnx.cursor(req).execute(unicode(rql), args, eidkey)
+
+ @nocoverage
+ def commit(self):
+ self.cnx.commit()
+
+ @nocoverage
+ def rollback(self):
+ try:
+ self.cnx.rollback()
+ except ProgrammingError:
+ pass
+
+ # # server side db api #######################################################
+
+ def sexecute(self, rql, args=None, eid_key=None):
+ self.session.set_pool()
+ return self.session.execute(rql, args, eid_key)
+
+ # other utilities #########################################################
+
+ def entity(self, rql, args=None, eidkey=None, req=None):
+ return self.execute(rql, args, eidkey, req=req).get_entity(0, 0)
+
+ def add_entity(self, etype, req=None, **kwargs):
+ rql = ['INSERT %s X' % etype]
+ # dict for replacement in RQL Request
+ args = {}
+ if kwargs:
+ rql.append(':')
+ # dict to define new entities variables
+ entities = {}
+ # assignement part of the request
+ sub_rql = []
+ for key, value in kwargs.iteritems():
+ # entities
+ if hasattr(value, 'eid'):
+ new_value = "%s__" % key.upper()
+ entities[new_value] = value.eid
+ args[new_value] = value.eid
+
+ sub_rql.append("X %s %s" % (key, new_value))
+ # final attributes
+ else:
+ sub_rql.append('X %s %%(%s)s' % (key, key))
+ args[key] = value
+ rql.append(', '.join(sub_rql))
+ if entities:
+ rql.append('WHERE')
+ # WHERE part of the request (to link entity to they eid)
+ sub_rql = []
+ for key, value in entities.iteritems():
+ sub_rql.append("%s eid %%(%s)s" % (key, key))
+ rql.append(', '.join(sub_rql))
+ return self.execute(' '.join(rql), args, req=req).get_entity(0, 0)
+
+ # vregistry inspection utilities ###########################################
+
+ def pviews(self, req, rset):
+ return sorted((a.id, a.__class__)
+ for a in self.vreg['views'].possible_views(req, rset=rset))
- pdbclass = CubicWebDebugger
- # this is a hook to be able to define a list of rql queries
- # that are application dependent and cannot be guessed automatically
- application_rql = []
+ def pactions(self, req, rset,
+ skipcategories=('addrelated', 'siteactions', 'useractions')):
+ return [(a.id, a.__class__)
+ for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
+ if a.category not in skipcategories]
+
+ def pactions_by_cats(self, req, rset, categories=('addrelated',)):
+ return [(a.id, a.__class__)
+ for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
+ if a.category in categories]
+
+ def pactionsdict(self, req, rset,
+ skipcategories=('addrelated', 'siteactions', 'useractions')):
+ res = {}
+ for a in self.vreg['actions'].possible_vobjects(req, rset=rset):
+ if a.category not in skipcategories:
+ res.setdefault(a.category, []).append(a.__class__)
+ return res
+
+ def list_views_for(self, rset):
+ """returns the list of views that can be applied on `rset`"""
+ req = rset.req
+ only_once_vids = ('primary', 'secondary', 'text')
+ req.data['ex'] = ValueError("whatever")
+ viewsvreg = self.vreg['views']
+ for vid, views in viewsvreg.items():
+ if vid[0] == '_':
+ continue
+ if rset.rowcount > 1 and vid in only_once_vids:
+ continue
+ views = [view for view in views
+ if view.category != 'startupview'
+ and not issubclass(view, NotificationView)]
+ if views:
+ try:
+ view = viewsvreg._select_best(views, req, rset=rset)
+ if view.linkable():
+ yield view
+ else:
+ not_selected(self.vreg, view)
+ # else the view is expected to be used as subview and should
+ # not be tested directly
+ except NoSelectableObject:
+ continue
+
+ def list_actions_for(self, rset):
+ """returns the list of actions that can be applied on `rset`"""
+ req = rset.req
+ for action in self.vreg['actions'].possible_objects(req, rset=rset):
+ yield action
+
+ def list_boxes_for(self, rset):
+ """returns the list of boxes that can be applied on `rset`"""
+ req = rset.req
+ for box in self.vreg['boxes'].possible_objects(req, rset=rset):
+ yield box
+
+ def list_startup_views(self):
+ """returns the list of startup views"""
+ req = self.request()
+ for view in self.vreg['views'].possible_views(req, None):
+ if view.category == 'startupview':
+ yield view.id
+ else:
+ not_selected(self.vreg, view)
+
+ # web ui testing utilities #################################################
+
+ @property
+ @cached
+ def app(self):
+ """return a cubicweb publisher"""
+ return application.CubicWebPublisher(self.config, vreg=self.vreg)
+
+ requestcls = fake.FakeRequest
+ def request(self, *args, **kwargs):
+ """return a web ui request"""
+ req = self.requestcls(self.vreg, form=kwargs)
+ req.set_connection(self.cnx)
+ return req
+
+ def remote_call(self, fname, *args):
+ """remote json call simulation"""
+ dump = simplejson.dumps
+ args = [dump(arg) for arg in args]
+ req = self.request(fname=fname, pageid='123', arg=args)
+ ctrl = self.vreg['controllers'].select('json', req)
+ return ctrl.publish(), req
+
+ def publish(self, req):
+ """call the publish method of the edit controller"""
+ ctrl = self.vreg['controllers'].select('edit', req)
+ try:
+ result = ctrl.publish()
+ req.cnx.commit()
+ except web.Redirect:
+ req.cnx.commit()
+ raise
+ return result
+
+ def expect_redirect_publish(self, req):
+ """call the publish method of the edit controller, expecting to get a
+ Redirect exception."""
+ try:
+ self.publish(req)
+ except web.Redirect, ex:
+ try:
+ path, params = ex.location.split('?', 1)
+ except:
+ path, params = ex.location, ""
+ req._url = path
+ cleanup = lambda p: (p[0], unquote(p[1]))
+ params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
+ return req.relative_path(False), params # path.rsplit('/', 1)[-1], params
+ else:
+ self.fail('expected a Redirect exception')
+
+ # content validation #######################################################
# validators are used to validate (XML, DTD, whatever) view's content
# validators availables are :
@@ -99,8 +520,8 @@
# snippets
#'text/html': DTDValidator,
#'application/xhtml+xml': DTDValidator,
- 'application/xml': SaxOnlyValidator,
- 'text/xml': SaxOnlyValidator,
+ 'application/xml': htmlparser.SaxOnlyValidator,
+ 'text/xml': htmlparser.SaxOnlyValidator,
'text/plain': None,
'text/comma-separated-values': None,
'text/x-vcard': None,
@@ -109,68 +530,9 @@
'image/png': None,
}
# maps vid : validator name (override content_type_validators)
- vid_validators = dict((vid, VALMAP[valkey])
+ vid_validators = dict((vid, htmlparser.VALMAP[valkey])
for vid, valkey in VIEW_VALIDATORS.iteritems())
- no_auto_populate = ()
- ignored_relations = ()
-
- def custom_populate(self, how_many, cursor):
- pass
-
- def post_populate(self, cursor):
- pass
-
- @nocoverage
- def auto_populate(self, how_many):
- """this method populates the database with `how_many` entities
- of each possible type. It also inserts random relations between them
- """
- cu = self.cursor()
- self.custom_populate(how_many, cu)
- vreg = self.vreg
- howmanydict = how_many_dict(self.schema, cu, how_many, self.no_auto_populate)
- for etype in unprotected_entities(self.schema):
- if etype in self.no_auto_populate:
- continue
- nb = howmanydict.get(etype, how_many)
- for rql, args in insert_entity_queries(etype, self.schema, vreg, nb):
- cu.execute(rql, args)
- edict = {}
- for etype in unprotected_entities(self.schema, strict=True):
- rset = cu.execute('%s X' % etype)
- edict[str(etype)] = set(row[0] for row in rset.rows)
- existingrels = {}
- ignored_relations = SYSTEM_RELATIONS + self.ignored_relations
- for rschema in self.schema.relations():
- if rschema.is_final() or rschema in ignored_relations:
- continue
- rset = cu.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema)
- existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset)
- q = make_relations_queries(self.schema, edict, cu, ignored_relations,
- existingrels=existingrels)
- for rql, args in q:
- cu.execute(rql, args)
- self.post_populate(cu)
- self.commit()
-
- @nocoverage
- def _check_html(self, output, view, template='main-template'):
- """raises an exception if the HTML is invalid"""
- try:
- validatorclass = self.vid_validators[view.id]
- except KeyError:
- if template is None:
- default_validator = HTMLValidator
- else:
- default_validator = DTDValidator
- validatorclass = self.content_type_validators.get(view.content_type,
- default_validator)
- if validatorclass is None:
- return None
- validator = validatorclass()
- return validator.parse_string(output.strip())
-
def view(self, vid, rset=None, req=None, template='main-template',
**kwargs):
@@ -244,9 +606,145 @@
raise AssertionError, msg, tcbk
+ @nocoverage
+ def _check_html(self, output, view, template='main-template'):
+ """raises an exception if the HTML is invalid"""
+ try:
+ validatorclass = self.vid_validators[view.id]
+ except KeyError:
+ if template is None:
+ default_validator = htmlparser.HTMLValidator
+ else:
+ default_validator = htmlparser.DTDValidator
+ validatorclass = self.content_type_validators.get(view.content_type,
+ default_validator)
+ if validatorclass is None:
+ return None
+ validator = validatorclass()
+ return validator.parse_string(output.strip())
+
+ # deprecated ###############################################################
+
+ @deprecated('use self.vreg["etypes"].etype_class(etype)(self.request())')
+ def etype_instance(self, etype, req=None):
+ req = req or self.request()
+ e = self.vreg['etypes'].etype_class(etype)(req)
+ e.eid = None
+ return e
+
+ @nocoverage
+ @deprecated('use req = self.request(); rset = req.execute()')
+ def rset_and_req(self, rql, optional_args=None, args=None, eidkey=None):
+ """executes <rql>, builds a resultset, and returns a
+ couple (rset, req) where req is a FakeRequest
+ """
+ return (self.execute(rql, args, eidkey),
+ self.request(rql=rql, **optional_args or {}))
+
+
+# auto-populating test classes and utilities ###################################
+
+from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries
+
+def how_many_dict(schema, cursor, how_many, skip):
+ """compute how many entities by type we need to be able to satisfy relations
+ cardinality
+ """
+ # compute how many entities by type we need to be able to satisfy relation constraint
+ relmap = {}
+ for rschema in schema.relations():
+ if rschema.is_final():
+ continue
+ for subj, obj in rschema.iter_rdefs():
+ card = rschema.rproperty(subj, obj, 'cardinality')
+ if card[0] in '1?' and len(rschema.subjects(obj)) == 1:
+ relmap.setdefault((rschema, subj), []).append(str(obj))
+ if card[1] in '1?' and len(rschema.objects(subj)) == 1:
+ relmap.setdefault((rschema, obj), []).append(str(subj))
+ unprotected = unprotected_entities(schema)
+ for etype in skip:
+ unprotected.add(etype)
+ howmanydict = {}
+ for etype in unprotected_entities(schema, strict=True):
+ howmanydict[str(etype)] = cursor.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0]
+ if etype in unprotected:
+ howmanydict[str(etype)] += how_many
+ for (rschema, etype), targets in relmap.iteritems():
+ # XXX should 1. check no cycle 2. propagate changes
+ relfactor = sum(howmanydict[e] for e in targets)
+ howmanydict[str(etype)] = max(relfactor, howmanydict[etype])
+ return howmanydict
+
+
+class AutoPopulateTest(CubicWebTC):
+ """base class for test with auto-populating of the database"""
+ __abstract__ = True
+
+ pdbclass = CubicWebDebugger
+ # this is a hook to be able to define a list of rql queries
+ # that are application dependent and cannot be guessed automatically
+ application_rql = []
+
+ no_auto_populate = ()
+ ignored_relations = ()
+
def to_test_etypes(self):
return unprotected_entities(self.schema, strict=True)
+ def custom_populate(self, how_many, cursor):
+ pass
+
+ def post_populate(self, cursor):
+ pass
+
+ @nocoverage
+ def auto_populate(self, how_many):
+ """this method populates the database with `how_many` entities
+ of each possible type. It also inserts random relations between them
+ """
+ cu = self.cursor()
+ self.custom_populate(how_many, cu)
+ vreg = self.vreg
+ howmanydict = how_many_dict(self.schema, cu, how_many, self.no_auto_populate)
+ for etype in unprotected_entities(self.schema):
+ if etype in self.no_auto_populate:
+ continue
+ nb = howmanydict.get(etype, how_many)
+ for rql, args in insert_entity_queries(etype, self.schema, vreg, nb):
+ cu.execute(rql, args)
+ edict = {}
+ for etype in unprotected_entities(self.schema, strict=True):
+ rset = cu.execute('%s X' % etype)
+ edict[str(etype)] = set(row[0] for row in rset.rows)
+ existingrels = {}
+ ignored_relations = SYSTEM_RELATIONS | set(self.ignored_relations)
+ for rschema in self.schema.relations():
+ if rschema.is_final() or rschema in ignored_relations:
+ continue
+ rset = cu.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema)
+ existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset)
+ q = make_relations_queries(self.schema, edict, cu, ignored_relations,
+ existingrels=existingrels)
+ for rql, args in q:
+ cu.execute(rql, args)
+ self.post_populate(cu)
+ self.commit()
+
+ def iter_individual_rsets(self, etypes=None, limit=None):
+ etypes = etypes or self.to_test_etypes()
+ for etype in etypes:
+ if limit:
+ rql = 'Any X LIMIT %s WHERE X is %s' % (limit, etype)
+ else:
+ rql = 'Any X WHERE X is %s' % etype
+ rset = self.execute(rql)
+ for row in xrange(len(rset)):
+ if limit and row > limit:
+ break
+ # XXX iirk
+ rset2 = rset.limit(limit=1, offset=row)
+ yield rset2
+
def iter_automatic_rsets(self, limit=10):
"""generates basic resultsets for each entity type"""
etypes = self.to_test_etypes()
@@ -267,54 +765,6 @@
for rql in self.application_rql:
yield self.execute(rql)
-
- def list_views_for(self, rset):
- """returns the list of views that can be applied on `rset`"""
- req = rset.req
- only_once_vids = ('primary', 'secondary', 'text')
- req.data['ex'] = ValueError("whatever")
- viewsvreg = self.vreg['views']
- for vid, views in viewsvreg.items():
- if vid[0] == '_':
- continue
- if rset.rowcount > 1 and vid in only_once_vids:
- continue
- views = [view for view in views
- if view.category != 'startupview'
- and not issubclass(view, NotificationView)]
- if views:
- try:
- view = viewsvreg.select_best(views, req, rset=rset)
- if view.linkable():
- yield view
- else:
- not_selected(self.vreg, view)
- # else the view is expected to be used as subview and should
- # not be tested directly
- except NoSelectableObject:
- continue
-
- def list_actions_for(self, rset):
- """returns the list of actions that can be applied on `rset`"""
- req = rset.req
- for action in self.vreg['actions'].possible_objects(req, rset=rset):
- yield action
-
- def list_boxes_for(self, rset):
- """returns the list of boxes that can be applied on `rset`"""
- req = rset.req
- for box in self.vreg['boxes'].possible_objects(req, rset=rset):
- yield box
-
- def list_startup_views(self):
- """returns the list of startup views"""
- req = self.request()
- for view in self.vreg['views'].possible_views(req, None):
- if view.category == 'startupview':
- yield view.id
- else:
- not_selected(self.vreg, view)
-
def _test_everything_for(self, rset):
"""this method tries to find everything that can be tested
for `rset` and yields a callable test (as needed in generative tests)
@@ -342,8 +792,16 @@
return '%s_%s_%s' % ('_'.join(rset.column_types(0)), objid, objtype)
-class AutomaticWebTest(WebTest):
+# concrete class for automated application testing ############################
+
+class AutomaticWebTest(AutoPopulateTest):
"""import this if you wan automatic tests to be ran"""
+ def setUp(self):
+ AutoPopulateTest.setUp(self)
+ # access to self.app for proper initialization of the authentication
+ # machinery (else some views may fail)
+ self.app
+
## one each
def test_one_each_config(self):
self.auto_populate(1)
@@ -365,17 +823,7 @@
yield self.view, vid, None, req
-class RealDBTest(WebTest):
-
- def iter_individual_rsets(self, etypes=None, limit=None):
- etypes = etypes or unprotected_entities(self.schema, strict=True)
- for etype in etypes:
- rset = self.execute('Any X WHERE X is %s' % etype)
- for row in xrange(len(rset)):
- if limit and row > limit:
- break
- rset2 = rset.limit(limit=1, offset=row)
- yield rset2
+# registry instrumentization ###################################################
def not_selected(vreg, appobject):
try:
@@ -383,16 +831,17 @@
except (KeyError, AttributeError):
pass
+
def vreg_instrumentize(testclass):
+ # XXX broken
from cubicweb.devtools.apptest import TestEnvironment
- env = testclass._env = TestEnvironment('data', configcls=testclass.configcls,
- requestcls=testclass.requestcls)
+ env = testclass._env = TestEnvironment('data', configcls=testclass.configcls)
for reg in env.vreg.values():
reg._selected = {}
try:
orig_select_best = reg.__class__.__orig_select_best
except:
- orig_select_best = reg.__class__.select_best
+ orig_select_best = reg.__class__._select_best
def instr_select_best(self, *args, **kwargs):
selected = orig_select_best(self, *args, **kwargs)
try:
@@ -402,9 +851,10 @@
except AttributeError:
pass # occurs on reg used to restore database
return selected
- reg.__class__.select_best = instr_select_best
+ reg.__class__._select_best = instr_select_best
reg.__class__.__orig_select_best = orig_select_best
+
def print_untested_objects(testclass, skipregs=('hooks', 'etypes')):
for regname, reg in testclass._env.vreg.iteritems():
if regname in skipregs:
--- a/entities/authobjs.py Tue Aug 11 17:19:05 2009 +0200
+++ b/entities/authobjs.py Tue Aug 11 17:28:18 2009 +0200
@@ -44,7 +44,7 @@
try:
return self._groups
except AttributeError:
- self._groups = set(g.name for g in self.in_group)
+ self._groups = set(g.cwdb.name for g in self.cwdb.in_group)
return self._groups
@property
@@ -52,7 +52,8 @@
try:
return self._properties
except AttributeError:
- self._properties = dict((p.pkey, p.value) for p in self.reverse_for_user)
+ self._properties = dict((p.cwdb.pkey, p.cwdb.value)
+ for p in self.cwdb.reverse_for_user)
return self._properties
def property_value(self, key):
@@ -63,7 +64,8 @@
except KeyError:
pass
except ValueError:
- self.warning('incorrect value for eproperty %s of user %s', key, self.login)
+ self.warning('incorrect value for property %s of user %s', key,
+ self.cwdb.login)
return self.vreg.property_value(key)
def matching_groups(self, groups):
@@ -122,16 +124,17 @@
def name(self):
"""construct a name using firstname / surname or login if not defined"""
-
- if self.firstname and self.surname:
+ surname = self.get_value('surname')
+ firstname = self.get_value('firstname')
+ if firstname and surname:
return self.req._('%(firstname)s %(surname)s') % {
- 'firstname': self.firstname, 'surname' : self.surname}
- if self.firstname:
- return self.firstname
- return self.login
+ 'firstname': firstname, 'surname' : surname}
+ if firstname or surname:
+ return surname or firstname
+ return self.get_value('login')
def dc_title(self):
- return self.login
+ return self.get_value('login')
dc_long_title = name
--- a/entities/test/unittest_base.py Tue Aug 11 17:19:05 2009 +0200
+++ b/entities/test/unittest_base.py Tue Aug 11 17:28:18 2009 +0200
@@ -11,7 +11,7 @@
from logilab.common.decorators import clear_cache
from logilab.common.interface import implements
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb import ValidationError
from cubicweb.interfaces import IMileStone, IWorkflowable
@@ -20,7 +20,7 @@
from cubicweb.web.widgets import AutoCompletionWidget
-class BaseEntityTC(EnvBasedTC):
+class BaseEntityTC(CubicWebTC):
def setup_database(self):
self.member = self.create_user('member')
@@ -260,7 +260,7 @@
self.assertEquals(e.latest_trinfo().comment, 'deactivate 2')
-class InterfaceTC(EnvBasedTC):
+class InterfaceTC(CubicWebTC):
def test_nonregr_subclasses_and_mixins_interfaces(self):
self.failUnless(implements(CWUser, IWorkflowable))
@@ -275,7 +275,7 @@
self.failUnless(implements(MyUser_, IWorkflowable))
-class SpecializedEntityClassesTC(EnvBasedTC):
+class SpecializedEntityClassesTC(CubicWebTC):
def select_eclass(self, etype):
# clear selector cache
--- a/entity.py Tue Aug 11 17:19:05 2009 +0200
+++ b/entity.py Tue Aug 11 17:28:18 2009 +0200
@@ -132,14 +132,20 @@
return super(_metaentity, mcs).__new__(mcs, name, bases, classdict)
-class Entity(AppObject, dict):
- """an entity instance has e_schema automagically set on
- the class and instances has access to their issuing cursor.
+_CWDB_CLASSES = {}
+
+def cwdb___init__(self, entity):
+ self.entity = entity
- A property is set for each attribute and relation on each entity's type
- class. Becare that among attributes, 'eid' is *NEITHER* stored in the
- dict containment (which acts as a cache for other attributes dynamically
- fetched)
+class Entity(AppObject, dict):
+ """an entity instance has e_schema automagically set on the class.
+
+ Also its special cwdb attribute provides access to persistent attributes and
+ relation using properties which are set for each attribute and relation
+ according to entity's type schema.
+
+ Beware that among attributes, 'eid' is *NEITHER* stored in the dict
+ containment (which acts as a cache for other attributes dynamically fetched)
:type e_schema: `cubicweb.schema.EntitySchema`
:ivar e_schema: the entity's schema
@@ -177,24 +183,30 @@
etype = cls.id
assert etype != 'Any', etype
cls.e_schema = eschema = cls.schema.eschema(etype)
+ cwdbclsdict = {'__init__': cwdb___init__}
for rschema, _ in eschema.attribute_definitions():
- if rschema.type == 'eid':
+ rtype = rschema.type
+ if rtype == 'eid':
continue
- setattr(cls, rschema.type, Attribute(rschema.type))
+ setattr(cls, rtype, DeprecatedAttribute(rtype))
+ cwdbclsdict[rtype] = Attribute(rtype)
mixins = []
- for rschema, _, x in eschema.relation_definitions():
- if (rschema, x) in MI_REL_TRIGGERS:
- mixin = MI_REL_TRIGGERS[(rschema, x)]
+ for rschema, _, role in eschema.relation_definitions():
+ if (rschema, role) in MI_REL_TRIGGERS:
+ mixin = MI_REL_TRIGGERS[(rschema, role)]
if not (issubclass(cls, mixin) or mixin in mixins): # already mixed ?
mixins.append(mixin)
for iface in getattr(mixin, '__implements__', ()):
if not interface.implements(cls, iface):
interface.extend(cls, iface)
- if x == 'subject':
- setattr(cls, rschema.type, SubjectRelation(rschema))
+ rtype = rschema.type
+ if role == 'object':
+ attr = 'reverse_%s' % rtype
else:
- attr = 'reverse_%s' % rschema.type
- setattr(cls, attr, ObjectRelation(rschema))
+ attr = rtype
+ setattr(cls, attr, DeprecatedRelation(rtype, role))
+ cwdbclsdict[attr] = Relation(rtype, role)
+ _CWDB_CLASSES[etype] = type(etype + 'CWDB', (object,), cwdbclsdict)
if mixins:
cls.__bases__ = tuple(mixins + [p for p in cls.__bases__ if not p is object])
cls.debug('plugged %s mixins on %s', mixins, etype)
@@ -299,6 +311,8 @@
else:
self.eid = None
self._is_saved = True
+ if self.id != 'Any':
+ self.cwdb = _CWDB_CLASSES[self.id](self)
def __repr__(self):
return '<Entity %s %s %s at %s>' % (
@@ -310,6 +324,15 @@
def __hash__(self):
return id(self)
+ def cwgetattr(self, attr, default=_marker):
+ """return attribute from either self.cwdb or self"""
+ try:
+ return getattr(self.cwdb, attr)
+ except AttributeError:
+ if default is _marker:
+ return getattr(self, attr)
+ return getattr(self, attr, default)
+
def pre_add_hook(self):
"""hook called by the repository before doing anything to add the entity
(before_add entity hooks have not been called yet). This give the
@@ -392,7 +415,7 @@
etype = str(self.e_schema)
path = etype.lower()
if mainattr != 'eid':
- value = getattr(self, mainattr)
+ value = getattr(self.cwdb, mainattr)
if value is None or unicode(value) == u'':
mainattr = 'eid'
path += '/eid'
@@ -413,7 +436,7 @@
def attr_metadata(self, attr, metadata):
"""return a metadata for an attribute (None if unspecified)"""
- value = getattr(self, '%s_%s' % (attr, metadata), None)
+ value = self.cwgetattr('%s_%s' % (attr, metadata), None)
if value is None and metadata == 'encoding':
value = self.vreg.property_value('ui.encoding')
return value
@@ -425,14 +448,17 @@
"""
attr = str(attr)
if value is _marker:
- value = getattr(self, attr)
+ value = self.cwgetattr(attr)
if isinstance(value, basestring):
value = value.strip()
if value is None or value == '': # don't use "not", 0 is an acceptable value
return u''
if attrtype is None:
attrtype = self.e_schema.destination(attr)
- props = self.e_schema.rproperties(attr)
+ try:
+ props = self.e_schema.rproperties(attr)
+ except KeyError:
+ props = {}
if attrtype == 'String':
# internalinalized *and* formatted string such as schema
# description...
@@ -473,13 +499,20 @@
assert self.has_eid()
execute = self.req.execute
for rschema in self.e_schema.subject_relations():
- if rschema.is_final() or rschema.meta:
+ # skip final, meta or composite relation
+ if rschema.is_final() or rschema.meta or self.e_schema.subjrproperty(rschema, 'composite'):
+ continue
+ # skip relation with card in ?1 else we either change the copied
+ # object (inlined relation) or inserting some inconsistency
+ if self.e_schema.subjrproperty(rschema, 'cardinality')[1] in '?1':
+ continue
+ # skip if we're told to do so
+ if rschema.type in self.skip_copy_for:
continue
# skip already defined relations
- if getattr(self, rschema.type):
+ if self.related(rschema.type, 'subject'):
continue
- if rschema.type in self.skip_copy_for:
- continue
+ # special case for in_state
if rschema.type == 'in_state':
# if the workflow is defining an initial state (XXX AND we are
# not in the managers group? not done to be more consistent)
@@ -487,32 +520,26 @@
if execute('Any S WHERE S state_of ET, ET initial_state S,'
'ET name %(etype)s', {'etype': str(self.e_schema)}):
continue
- # skip composite relation
- if self.e_schema.subjrproperty(rschema, 'composite'):
- continue
- # skip relation with card in ?1 else we either change the copied
- # object (inlined relation) or inserting some inconsistency
- if self.e_schema.subjrproperty(rschema, 'cardinality')[1] in '?1':
- continue
rql = 'SET X %s V WHERE X eid %%(x)s, Y eid %%(y)s, Y %s V' % (
- rschema.type, rschema.type)
+ rschema, rschema)
execute(rql, {'x': self.eid, 'y': ceid}, ('x', 'y'))
self.clear_related_cache(rschema.type, 'subject')
for rschema in self.e_schema.object_relations():
- if rschema.meta:
- continue
- # skip already defined relations
- if getattr(self, 'reverse_%s' % rschema.type):
- continue
- # skip composite relation
- if self.e_schema.objrproperty(rschema, 'composite'):
+ # skip meta or composite
+ if rschema.meta or self.e_schema.objrproperty(rschema, 'composite'):
continue
# skip relation with card in ?1 else we either change the copied
# object (inlined relation) or inserting some inconsistency
if self.e_schema.objrproperty(rschema, 'cardinality')[0] in '?1':
continue
+ # skip if we're told to do so
+ if rschema.type in self.skip_copy_for:
+ continue
+ # skip already defined relations
+ if self.related(rschema.type, 'object'):
+ continue
rql = 'SET V %s X WHERE X eid %%(x)s, Y eid %%(y)s, V %s Y' % (
- rschema.type, rschema.type)
+ rschema, rschema)
execute(rql, {'x': self.eid, 'y': ceid}, ('x', 'y'))
self.clear_related_cache(rschema.type, 'object')
@@ -877,9 +904,9 @@
yielded = False
for rschema, target in containers:
if target == 'object':
- targets = getattr(self, rschema.type)
+ targets = self.related(rschema.type, 'subject', entities=True)
else:
- targets = getattr(self, 'reverse_%s' % rschema)
+ targets = self.related(rschema.type, 'object', entities=True)
for entity in targets:
if entity.eid in _done:
continue
@@ -915,61 +942,73 @@
words += tokenize(value)
for rschema, role in self.e_schema.fulltext_relations():
- if role == 'subject':
- for entity in getattr(self, rschema.type):
- words += entity.get_words()
- else: # if role == 'object':
- for entity in getattr(self, 'reverse_%s' % rschema.type):
- words += entity.get_words()
+ for entity in self.related(rschema.type, role, entities=True):
+ words += entity.get_words()
return words
# attribute and relation descriptors ##########################################
+
class Attribute(object):
"""descriptor that controls schema attribute access"""
- def __init__(self, attrname):
- assert attrname != 'eid'
- self._attrname = attrname
+ def __init__(self, rtype):
+ assert rtype != 'eid'
+ self._rtype = rtype
+
+ def __get__(self, cwdbobj, eclass):
+ if cwdbobj is None:
+ return self
+ return cwdbobj.entity.get_value(self._rtype)
+
+ def __set__(self, eobj, value):
+ raise NotImplementedError
+
+
+class Relation(Attribute):
+ """descriptor that controls schema relation access"""
+
+ def __init__(self, rtype, role):
+ super(Relation, self).__init__(rtype)
+ self._role = role
+
+ def __get__(self, cwdbobj, eclass):
+ if cwdbobj is None:
+ raise AttributeError('%s cannot be only be accessed from instances'
+ % self._rtype)
+ return cwdbobj.entity.related(self._rtype, self._role, entities=True)
+
+
+class DeprecatedAttribute(Attribute):
+ """descriptor that controls schema attribute access"""
def __get__(self, eobj, eclass):
+ rtype = self._rtype
+ warn('entity.%s is deprecated, use entity.cwdb.%s' % (rtype, rtype),
+ DeprecationWarning, stacklevel=2)
if eobj is None:
return self
- return eobj.get_value(self._attrname)
+ return eobj.get_value(self._rtype)
def __set__(self, eobj, value):
# XXX bw compat
# would be better to generate UPDATE queries than the current behaviour
eobj.warning("deprecated usage, don't use 'entity.attr = val' notation)")
- eobj[self._attrname] = value
+ eobj[self._rtype] = value
-class Relation(object):
+class DeprecatedRelation(Relation):
"""descriptor that controls schema relation access"""
- _role = None # for pylint
-
- def __init__(self, rschema):
- self._rschema = rschema
- self._rtype = rschema.type
def __get__(self, eobj, eclass):
+ rtype = self._rtype
+ warn('entity.[reverse_]%s is deprecated, use entity.cwdb.[reverse_]%s'
+ % (rtype, rtype), DeprecationWarning, stacklevel=2)
if eobj is None:
raise AttributeError('%s cannot be only be accessed from instances'
% self._rtype)
- return eobj.related(self._rtype, self._role, entities=True)
-
- def __set__(self, eobj, value):
- raise NotImplementedError
-
-
-class SubjectRelation(Relation):
- """descriptor that controls schema relation access"""
- _role = 'subject'
-
-class ObjectRelation(Relation):
- """descriptor that controls schema relation access"""
- _role = 'object'
+ return eobj.related(rtype, self._role, entities=True)
from logging import getLogger
from cubicweb import set_log_methods
--- a/etwist/server.py Tue Aug 11 17:19:05 2009 +0200
+++ b/etwist/server.py Tue Aug 11 17:28:18 2009 +0200
@@ -116,7 +116,7 @@
start_task(interval, self.appli.session_handler.clean_sessions)
def set_url_rewriter(self):
- self.url_rewriter = self.appli.vreg['components'].select_object('urlrewriter')
+ self.url_rewriter = self.appli.vreg['components'].select_or_none('urlrewriter')
def shutdown_event(self):
"""callback fired when the server is shutting down to properly
--- a/etwist/test/unittest_server.py Tue Aug 11 17:19:05 2009 +0200
+++ b/etwist/test/unittest_server.py Tue Aug 11 17:28:18 2009 +0200
@@ -5,11 +5,11 @@
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.etwist.server import host_prefixed_baseurl
-class HostPrefixedBaseURLTC(EnvBasedTC):
+class HostPrefixedBaseURLTC(CubicWebTC):
def _check(self, baseurl, host, waited):
self.assertEquals(host_prefixed_baseurl(baseurl, host), waited,
--- a/ext/test/unittest_rest.py Tue Aug 11 17:19:05 2009 +0200
+++ b/ext/test/unittest_rest.py Tue Aug 11 17:28:18 2009 +0200
@@ -6,11 +6,11 @@
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
from logilab.common.testlib import unittest_main
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.ext.rest import rest_publish
-class RestTC(EnvBasedTC):
+class RestTC(CubicWebTC):
def context(self):
return self.execute('CWUser X WHERE X login "admin"').get_entity(0, 0)
--- a/goa/test/unittest_editcontroller.py Tue Aug 11 17:19:05 2009 +0200
+++ b/goa/test/unittest_editcontroller.py Tue Aug 11 17:28:18 2009 +0200
@@ -401,11 +401,11 @@
# which fires a Redirect
# 2/ When re-publishing the copy form, the publisher implicitly commits
try:
- self.env.app.publish('edit', self.req)
+ self.app.publish('edit', self.req)
except Redirect:
self.req.form['rql'] = 'Any X WHERE X eid %s' % p.eid
self.req.form['vid'] = 'copy'
- self.env.app.publish('view', self.req)
+ self.app.publish('view', self.req)
rset = self.req.execute('CWUser P WHERE P surname "Boom"')
self.assertEquals(len(rset), 0)
finally:
--- a/server/hooks.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/hooks.py Tue Aug 11 17:28:18 2009 +0200
@@ -67,7 +67,7 @@
if self.entity.eid in session.transaction_data.get('pendingeids', ()):
# entity have been created and deleted in the same transaction
return
- if not self.entity.created_by:
+ if not self.entity.related('created_by'):
session.add_relation(self.entity.eid, 'created_by', session.user.eid)
--- a/server/repository.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/repository.py Tue Aug 11 17:28:18 2009 +0200
@@ -687,7 +687,7 @@
if props is not None:
self.set_session_props(sessionid, props)
user = session.user
- return user.eid, user.login, user.groups, user.properties
+ return user.eid, user.cwdb.login, user.groups, user.properties
def set_session_props(self, sessionid, props):
"""this method should be used by client to:
--- a/server/session.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/session.py Tue Aug 11 17:28:18 2009 +0200
@@ -48,7 +48,7 @@
def __init__(self, user, repo, cnxprops=None, _id=None):
super(Session, self).__init__(repo.vreg)
- self.id = _id or make_uid(user.login.encode('UTF8'))
+ self.id = _id or make_uid(user.cwdb.login.encode('UTF8'))
cnxprops = cnxprops or ConnectionProperties('inmemory')
self.user = user
self.repo = repo
--- a/server/test/unittest_checkintegrity.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_checkintegrity.py Tue Aug 11 17:28:18 2009 +0200
@@ -13,7 +13,7 @@
from cubicweb.server.checkintegrity import check
-repo, cnx = init_test_database('sqlite')
+repo, cnx = init_test_database()
class CheckIntegrityTC(TestCase):
def test(self):
--- a/server/test/unittest_hookhelper.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_hookhelper.py Tue Aug 11 17:28:18 2009 +0200
@@ -8,18 +8,28 @@
"""
from logilab.common.testlib import unittest_main
-from cubicweb.devtools.apptest import RepositoryBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.server.pool import LateOperation, Operation, SingleLastOperation
from cubicweb.server.hookhelper import *
+from cubicweb.server import hooks, schemahooks
-class HookHelpersTC(RepositoryBasedTC):
+def clean_session_ops(func):
+ def wrapper(self, *args, **kwargs):
+ try:
+ return func(self, *args, **kwargs)
+ finally:
+ self.session.pending_operations[:] = []
+ return wrapper
+
+class HookHelpersTC(CubicWebTC):
def setUp(self):
- RepositoryBasedTC.setUp(self)
+ CubicWebTC.setUp(self)
self.hm = self.repo.hm
+ @clean_session_ops
def test_late_operation(self):
session = self.session
l1 = LateOperation(session)
@@ -27,6 +37,7 @@
l3 = Operation(session)
self.assertEquals(session.pending_operations, [l3, l1, l2])
+ @clean_session_ops
def test_single_last_operation(self):
session = self.session
l0 = SingleLastOperation(session)
@@ -37,8 +48,8 @@
l4 = SingleLastOperation(session)
self.assertEquals(session.pending_operations, [l3, l1, l2, l4])
+ @clean_session_ops
def test_global_operation_order(self):
- from cubicweb.server import hooks, schemahooks
session = self.session
op1 = hooks.DelayedDeleteOp(session)
op2 = schemahooks.MemSchemaRDefDel(session)
@@ -80,10 +91,7 @@
self.assertEquals(len(searchedops), 1,
self.session.pending_operations)
self.commit()
- searchedops = [op for op in self.session.pending_operations
- if isinstance(op, SendMailOp)]
- self.assertEquals(len(searchedops), 0,
- self.session.pending_operations)
+ self.assertEquals([], self.session.pending_operations)
if __name__ == '__main__':
unittest_main()
--- a/server/test/unittest_hooks.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_hooks.py Tue Aug 11 17:28:18 2009 +0200
@@ -11,7 +11,7 @@
from cubicweb import (ConnectionError, RepositoryError, ValidationError,
AuthenticationError, BadConnectionId)
-from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions
+from cubicweb.devtools.testlib import CubicWebTC, get_versions
from cubicweb.server.sqlutils import SQL_PREFIX
from cubicweb.server.repository import Repository
@@ -26,7 +26,7 @@
-class CoreHooksTC(RepositoryBasedTC):
+class CoreHooksTC(CubicWebTC):
def test_delete_internal_entities(self):
self.assertRaises(RepositoryError, self.execute,
@@ -62,7 +62,7 @@
def test_delete_if_singlecard1(self):
self.assertEquals(self.repo.schema['in_state'].inlined, False)
- ueid = self.create_user('toto')
+ ueid = self.create_user('toto').eid
self.commit()
self.execute('SET X in_state S WHERE S name "deactivated", X eid %(x)s', {'x': ueid})
rset = self.execute('Any S WHERE X in_state S, X eid %(x)s', {'x': ueid})
@@ -156,7 +156,7 @@
-class UserGroupHooksTC(RepositoryBasedTC):
+class UserGroupHooksTC(CubicWebTC):
def test_user_synchronization(self):
self.create_user('toto', password='hop', commit=False)
@@ -164,7 +164,7 @@
self.repo.connect, u'toto', 'hop')
self.commit()
cnxid = self.repo.connect(u'toto', 'hop')
- self.failIfEqual(cnxid, self.cnxid)
+ self.failIfEqual(cnxid, self.session.id)
self.execute('DELETE CWUser X WHERE X login "toto"')
self.repo.execute(cnxid, 'State X')
self.commit()
@@ -184,7 +184,7 @@
self.assertEquals(user.groups, set(('managers',)))
def test_user_composite_owner(self):
- ueid = self.create_user('toto')
+ ueid = self.create_user('toto').eid
# composite of euser should be owned by the euser regardless of who created it
self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", U use_email X '
'WHERE U login "toto"')
@@ -200,7 +200,7 @@
self.failIf(self.execute('Any X WHERE X created_by Y, X eid >= %(x)s', {'x': eid}))
-class CWPropertyHooksTC(RepositoryBasedTC):
+class CWPropertyHooksTC(CubicWebTC):
def test_unexistant_eproperty(self):
ex = self.assertRaises(ValidationError,
@@ -224,7 +224,7 @@
self.assertEquals(ex.errors, {'value': u'unauthorized value'})
-class SchemaHooksTC(RepositoryBasedTC):
+class SchemaHooksTC(CubicWebTC):
def test_duplicate_etype_error(self):
# check we can't add a CWEType or CWRType entity if it already exists one
@@ -246,24 +246,23 @@
self.assertEquals(ex.errors, {'login': 'the value "admin" is already used, use another one'})
-class SchemaModificationHooksTC(RepositoryBasedTC):
+class SchemaModificationHooksTC(CubicWebTC):
- def setUp(self):
- if not hasattr(self, '_repo'):
- # first initialization
- repo = self.repo # set by the RepositoryBasedTC metaclass
- # force to read schema from the database to get proper eid set on schema instances
- repo.config._cubes = None
- repo.fill_schema()
- RepositoryBasedTC.setUp(self)
+ @classmethod
+ def init_config(cls, config):
+ super(SchemaModificationHooksTC, cls).init_config(config)
+ config._cubes = None
+ cls.repo.fill_schema()
def index_exists(self, etype, attr, unique=False):
+ self.session.set_pool()
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique)
def test_base(self):
schema = self.repo.schema
+ self.session.set_pool()
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
self.failIf(schema.has_entity('Societe2'))
@@ -381,6 +380,7 @@
# schema modification hooks tests #########################################
def test_uninline_relation(self):
+ self.session.set_pool()
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
# Personne inline2 Affaire inline
@@ -415,6 +415,7 @@
self.assertEquals(rset.rows[0], [peid, aeid])
def test_indexed_change(self):
+ self.session.set_pool()
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
try:
@@ -433,6 +434,7 @@
self.failIf(self.index_exists('Affaire', 'sujet'))
def test_unique_change(self):
+ self.session.set_pool()
dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
try:
@@ -481,10 +483,9 @@
self.commit()
-class WorkflowHooksTC(RepositoryBasedTC):
+class WorkflowHooksTC(CubicWebTC):
- def setUp(self):
- RepositoryBasedTC.setUp(self)
+ def setup_database(self):
self.s_activated = self.execute('State X WHERE X name "activated"')[0][0]
self.s_deactivated = self.execute('State X WHERE X name "deactivated"')[0][0]
self.s_dummy = self.execute('INSERT State X: X name "dummy", X state_of E WHERE E name "CWUser"')[0][0]
@@ -493,12 +494,6 @@
# so we can test wf enforcing on euser (managers don't have anymore this
# enforcement
self.execute('SET X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"')
- self.commit()
-
- def tearDown(self):
- self.execute('DELETE X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"')
- self.commit()
- RepositoryBasedTC.tearDown(self)
def test_set_initial_state(self):
ueid = self.execute('INSERT CWUser E: E login "x", E upassword "x", E in_group G '
@@ -511,13 +506,12 @@
self.assertEquals(initialstate, u'activated')
def test_initial_state(self):
- cnx = self.login('stduser')
- cu = cnx.cursor()
- self.assertRaises(ValidationError, cu.execute,
+ self.login('stduser')
+ self.assertRaises(ValidationError, self.execute,
'INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
'X in_state S WHERE S name "deactivated"', {'pwd': 'oops'})
- cnx.close()
# though managers can do whatever he want
+ self.restore_connection()
self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
self.commit()
@@ -526,7 +520,7 @@
def test_transition_checking1(self):
cnx = self.login('stduser')
cu = cnx.cursor()
- ueid = cnx.user(self.current_session()).eid
+ ueid = cnx.user(self.session).eid
self.assertRaises(ValidationError,
cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
{'x': ueid, 's': self.s_activated}, 'x')
@@ -535,7 +529,7 @@
def test_transition_checking2(self):
cnx = self.login('stduser')
cu = cnx.cursor()
- ueid = cnx.user(self.current_session()).eid
+ ueid = cnx.user(self.session).eid
self.assertRaises(ValidationError,
cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
{'x': ueid, 's': self.s_dummy}, 'x')
@@ -544,7 +538,7 @@
def test_transition_checking3(self):
cnx = self.login('stduser')
cu = cnx.cursor()
- ueid = cnx.user(self.current_session()).eid
+ ueid = cnx.user(self.session).eid
cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
{'x': ueid, 's': self.s_deactivated}, 'x')
cnx.commit()
@@ -560,7 +554,7 @@
def test_transition_checking4(self):
cnx = self.login('stduser')
cu = cnx.cursor()
- ueid = cnx.user(self.current_session()).eid
+ ueid = cnx.user(self.session).eid
cu.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
{'x': ueid, 's': self.s_deactivated}, 'x')
cnx.commit()
@@ -602,7 +596,7 @@
self.assertEquals(tr.owned_by[0].login, 'admin')
def test_transition_information_on_creation(self):
- ueid = self.create_user('toto')
+ ueid = self.create_user('toto').eid
rset = self.execute('TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
self.assertEquals(len(rset), 1)
tr = rset.get_entity(0, 0)
--- a/server/test/unittest_hooksmanager.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_hooksmanager.py Tue Aug 11 17:28:18 2009 +0200
@@ -6,7 +6,7 @@
from cubicweb.server.hooksmanager import HooksManager, Hook
from cubicweb.devtools import TestServerConfiguration
-from cubicweb.devtools.apptest import RepositoryBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
class HookCalled(Exception): pass
@@ -144,7 +144,7 @@
self.called.append((subject, r_type, object))
-class SystemHooksTC(RepositoryBasedTC):
+class SystemHooksTC(CubicWebTC):
def test_startup_shutdown(self):
import hooks # cubicweb/server/test/data/hooks.py
@@ -168,7 +168,7 @@
events = ('whatever', 'another')
accepts = ('Societe', 'Division')
-class HookTC(RepositoryBasedTC):
+class HookTC(CubicWebTC):
def test_inheritance(self):
self.assertEquals(list(MyHook.register_to()),
zip(repeat('whatever'), ('Societe', 'Division', 'SubDivision'))
--- a/server/test/unittest_ldapuser.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_ldapuser.py Tue Aug 11 17:28:18 2009 +0200
@@ -7,8 +7,8 @@
"""
from logilab.common.testlib import TestCase, unittest_main, mock_object
-from cubicweb.devtools import init_test_database, TestServerConfiguration
-from cubicweb.devtools.apptest import RepositoryBasedTC
+from cubicweb.devtools import TestServerConfiguration
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.repotest import RQLGeneratorTC
from cubicweb.server.sources.ldapuser import *
@@ -34,36 +34,31 @@
-config = TestServerConfiguration('data')
-config.sources_file = lambda : 'data/sourcesldap'
-repo, cnx = init_test_database('sqlite', config=config)
-
-class LDAPUserSourceTC(RepositoryBasedTC):
- repo, cnx = repo, cnx
+class LDAPUserSourceTC(CubicWebTC):
+ config = TestServerConfiguration('data')
+ config.sources_file = lambda : 'data/sourcesldap'
def patch_authenticate(self):
self._orig_authenticate = LDAPUserSource.authenticate
LDAPUserSource.authenticate = nopwd_authenticate
- def setUp(self):
- self._prepare()
+ def setup_database(self):
# XXX: need this first query else we get 'database is locked' from
# sqlite since it doesn't support multiple connections on the same
# database
# so doing, ldap inserted users don't get removed between each test
- rset = self.execute('CWUser X')
- self.commit()
+ rset = self.sexecute('CWUser X')
# check we get some users from ldap
self.assert_(len(rset) > 1)
- self.maxeid = self.execute('Any MAX(X)')[0][0]
def tearDown(self):
if hasattr(self, '_orig_authenticate'):
LDAPUserSource.authenticate = self._orig_authenticate
- RepositoryBasedTC.tearDown(self)
+ CubicWebTC.tearDown(self)
def test_authenticate(self):
source = self.repo.sources_by_uri['ldapuser']
+ self.session.set_pool()
self.assertRaises(AuthenticationError,
source.authenticate, self.session, 'toto', 'toto')
@@ -73,7 +68,7 @@
def test_base(self):
# check a known one
- e = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0)
+ e = self.sexecute('CWUser X WHERE X login "syt"').get_entity(0, 0)
self.assertEquals(e.login, 'syt')
e.complete()
self.assertEquals(e.creation_date, None)
@@ -85,73 +80,73 @@
self.assertEquals(e.created_by, [])
self.assertEquals(e.primary_email[0].address, 'Sylvain Thenault')
# email content should be indexed on the user
- rset = self.execute('CWUser X WHERE X has_text "thenault"')
+ rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
self.assertEquals(rset.rows, [[e.eid]])
def test_not(self):
- eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
- rset = self.execute('CWUser X WHERE NOT X eid %s' % eid)
+ eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
+ rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid)
self.assert_(rset)
self.assert_(not eid in (r[0] for r in rset))
def test_multiple(self):
- seid = self.execute('CWUser X WHERE X login "syt"')[0][0]
- aeid = self.execute('CWUser X WHERE X login "adim"')[0][0]
- rset = self.execute('CWUser X, Y WHERE X login "syt", Y login "adim"')
+ seid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
+ aeid = self.sexecute('CWUser X WHERE X login "adim"')[0][0]
+ rset = self.sexecute('CWUser X, Y WHERE X login "syt", Y login "adim"')
self.assertEquals(rset.rows, [[seid, aeid]])
- rset = self.execute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"')
+ rset = self.sexecute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"')
self.assertEquals(rset.rows, [[seid, aeid, 'syt']])
def test_in(self):
- seid = self.execute('CWUser X WHERE X login "syt"')[0][0]
- aeid = self.execute('CWUser X WHERE X login "adim"')[0][0]
- rset = self.execute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L')
+ seid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
+ aeid = self.sexecute('CWUser X WHERE X login "adim"')[0][0]
+ rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L')
self.assertEquals(rset.rows, [[aeid, 'adim'], [seid, 'syt']])
def test_relations(self):
- eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
- rset = self.execute('Any X,E WHERE X is CWUser, X login L, X primary_email E')
+ eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
+ rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E')
self.assert_(eid in (r[0] for r in rset))
- rset = self.execute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')
+ rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')
self.assert_('syt' in (r[1] for r in rset))
def test_count(self):
- nbusers = self.execute('Any COUNT(X) WHERE X is CWUser')[0][0]
+ nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0]
# just check this is a possible number
self.assert_(nbusers > 1, nbusers)
self.assert_(nbusers < 30, nbusers)
def test_upper(self):
- eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
- rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
+ eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
+ rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
self.assertEquals(rset[0][0], 'SYT')
def test_unknown_attr(self):
- eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
- rset = self.execute('Any L,C,M WHERE X eid %s, X login L, '
+ eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
+ rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, '
'X creation_date C, X modification_date M' % eid)
self.assertEquals(rset[0][0], 'syt')
self.assertEquals(rset[0][1], None)
self.assertEquals(rset[0][2], None)
def test_sort(self):
- logins = [l for l, in self.execute('Any L ORDERBY L WHERE X login L')]
+ logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')]
self.assertEquals(logins, sorted(logins))
def test_lower_sort(self):
- logins = [l for l, in self.execute('Any L ORDERBY lower(L) WHERE X login L')]
+ logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')]
self.assertEquals(logins, sorted(logins))
def test_or(self):
- rset = self.execute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")')
+ rset = self.sexecute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")')
self.assertEquals(len(rset), 2, rset.rows) # syt + admin
def test_nonregr_set_owned_by(self):
# test that when a user coming from ldap is triggering a transition
# the related TrInfo has correct owner information
- self.execute('SET X in_group G WHERE X login "syt", G name "managers"')
+ self.sexecute('SET X in_group G WHERE X login "syt", G name "managers"')
self.commit()
- syt = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0)
+ syt = self.sexecute('CWUser X WHERE X login "syt"').get_entity(0, 0)
self.assertEquals([g.name for g in syt.in_group], ['managers', 'users'])
self.patch_authenticate()
cnx = self.login('syt', 'dummypassword')
@@ -159,12 +154,12 @@
cu.execute('SET X in_state S WHERE X login "alf", S name "deactivated"')
try:
cnx.commit()
- alf = self.execute('CWUser X WHERE X login "alf"').get_entity(0, 0)
+ alf = self.sexecute('CWUser X WHERE X login "alf"').get_entity(0, 0)
self.assertEquals(alf.in_state[0].name, 'deactivated')
trinfo = alf.latest_trinfo()
self.assertEquals(trinfo.owned_by[0].login, 'syt')
# select from_state to skip the user's creation TrInfo
- rset = self.execute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
+ rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
'WF creation_date D, WF from_state FS,'
'WF owned_by U?, X eid %(x)s',
{'x': alf.eid}, 'x')
@@ -172,76 +167,76 @@
finally:
# restore db state
self.restore_connection()
- self.execute('SET X in_state S WHERE X login "alf", S name "activated"')
- self.execute('DELETE X in_group G WHERE X login "syt", G name "managers"')
+ self.sexecute('SET X in_state S WHERE X login "alf", S name "activated"')
+ self.sexecute('DELETE X in_group G WHERE X login "syt", G name "managers"')
def test_same_column_names(self):
- self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
+ self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
def test_multiple_entities_from_different_sources(self):
- self.create_user('cochon')
- self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"'))
+ self.create_user('cochon', req=self.session)
+ self.failUnless(self.sexecute('Any X,Y WHERE X login "syt", Y login "cochon"'))
def test_exists1(self):
- self.add_entity('CWGroup', name=u'bougloup1')
- self.add_entity('CWGroup', name=u'bougloup2')
- self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
- self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"')
- rset = self.execute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
+ self.add_entity('CWGroup', name=u'bougloup1', req=self.session)
+ self.add_entity('CWGroup', name=u'bougloup2', req=self.session)
+ self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
+ self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login "syt"')
+ rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
self.assertEquals(rset.rows, [['admin', 'activated'], ['syt', 'activated']])
def test_exists2(self):
- self.create_user('comme')
- self.create_user('cochon')
- self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- rset = self.execute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
+ self.create_user('comme', req=self.session)
+ self.create_user('cochon', req=self.session)
+ self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
+ rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
self.assertEquals(rset.rows, [['managers'], ['users']])
def test_exists3(self):
- self.create_user('comme')
- self.create_user('cochon')
- self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
- self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"')
- self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "syt", Y login "cochon"'))
- rset = self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))')
+ self.create_user('comme', req=self.session)
+ self.create_user('cochon', req=self.session)
+ self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
+ self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
+ self.sexecute('SET X copain Y WHERE X login "syt", Y login "cochon"')
+ self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "syt", Y login "cochon"'))
+ rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))')
self.assertEquals(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']])
def test_exists4(self):
- self.create_user('comme')
- self.create_user('cochon', groups=('users', 'guests'))
- self.create_user('billy')
- self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
- self.execute('SET X copain Y WHERE X login "comme", Y login "billy"')
- self.execute('SET X copain Y WHERE X login "syt", Y login "billy"')
+ self.create_user('comme', req=self.session)
+ self.create_user('cochon', groups=('users', 'guests'), req=self.session)
+ self.create_user('billy', req=self.session)
+ self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
+ self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
+ self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
+ self.sexecute('SET X copain Y WHERE X login "syt", Y login "billy"')
# search for group name, login where
# CWUser copain with "comme" or "cochon" AND same login as the copain
# OR
# CWUser in_state activated AND not copain with billy
#
# SO we expect everybody but "comme" and "syt"
- rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, '
+ rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
- all = self.execute('Any GN, L WHERE X in_group G, X login L, G name GN')
+ all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN')
all.rows.remove(['users', 'comme'])
all.rows.remove(['users', 'syt'])
self.assertEquals(sorted(rset.rows), sorted(all.rows))
def test_exists5(self):
- self.create_user('comme')
- self.create_user('cochon', groups=('users', 'guests'))
- self.create_user('billy')
- self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
- self.execute('SET X copain Y WHERE X login "comme", Y login "billy"')
- self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"')
- rset= self.execute('Any L WHERE X login L, '
+ self.create_user('comme', req=self.session)
+ self.create_user('cochon', groups=('users', 'guests'), req=self.session)
+ self.create_user('billy', req=self.session)
+ self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
+ self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
+ self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
+ self.sexecute('SET X copain Y WHERE X login "syt", Y login "cochon"')
+ rset= self.sexecute('Any L WHERE X login L, '
'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
'NOT EXISTS(X copain T2, T2 login "billy")')
self.assertEquals(sorted(rset.rows), [['cochon'], ['syt']])
- rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, '
+ rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
'NOT EXISTS(X copain T2, T2 login "billy")')
self.assertEquals(sorted(rset.rows), [['guests', 'cochon'],
@@ -249,18 +244,18 @@
['users', 'syt']])
def test_cd_restriction(self):
- rset = self.execute('CWUser X WHERE X creation_date > "2009-02-01"')
+ rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"')
self.assertEquals(len(rset), 2) # admin/anon but no ldap user since it doesn't support creation_date
def test_union(self):
- afeids = self.execute('State X')
- ueids = self.execute('CWUser X')
- rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')
+ afeids = self.sexecute('State X')
+ ueids = self.sexecute('CWUser X')
+ rset = self.sexecute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')
self.assertEquals(sorted(r[0] for r in rset.rows),
sorted(r[0] for r in afeids + ueids))
def _init_security_test(self):
- self.create_user('iaminguestsgrouponly', groups=('guests',))
+ self.create_user('iaminguestsgrouponly', groups=('guests',), req=self.session)
cnx = self.login('iaminguestsgrouponly')
return cnx.cursor()
@@ -286,33 +281,33 @@
self.assertEquals(rset.rows, [[None]])
def test_nonregr1(self):
- self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
+ self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
'X modification_date AA',
- {'x': cnx.user(self.session).eid})
+ {'x': self.session.user.eid})
def test_nonregr2(self):
- self.execute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, '
+ self.sexecute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, '
'X login L, X modification_date AA',
- {'x': cnx.user(self.session).eid})
+ {'x': self.session.user.eid})
def test_nonregr3(self):
- self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, '
+ self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, '
'X modification_date AA',
- {'x': cnx.user(self.session).eid})
+ {'x': self.session.user.eid})
def test_nonregr4(self):
- emaileid = self.execute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
- self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
+ emaileid = self.sexecute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
+ self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
{'x': emaileid})
def test_nonregr5(self):
# original jpl query:
# Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
- self.execute(rql, )#{'x': })
+ self.sexecute(rql, )#{'x': })
def test_nonregr6(self):
- self.execute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
+ self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
@@ -353,6 +348,9 @@
res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
self.assertEquals(res, [[1, 5], [2, 4], [3, 6]])
+# XXX
+LDAPUserSourceTC._init_repo()
+repo = LDAPUserSourceTC.repo
class RQL2LDAPFilterTC(RQLGeneratorTC):
schema = repo.schema
--- a/server/test/unittest_migractions.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_migractions.py Tue Aug 11 17:28:18 2009 +0200
@@ -2,13 +2,14 @@
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
+from copy import deepcopy
from datetime import date
from os.path import join
from logilab.common.testlib import TestCase, unittest_main
from cubicweb import ConfigurationError
-from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions
+from cubicweb.devtools.testlib import CubicWebTC, get_versions
from cubicweb.schema import CubicWebSchemaLoader
from cubicweb.server.sqlutils import SQL_PREFIX
from cubicweb.server.repository import Repository
@@ -23,22 +24,28 @@
Repository.get_versions = orig_get_versions
-class MigrationCommandsTC(RepositoryBasedTC):
+class MigrationCommandsTC(CubicWebTC):
+
+ @classmethod
+ def init_config(cls, config):
+ super(MigrationCommandsTC, cls).init_config(config)
+ config._cubes = None
+ cls.repo.fill_schema()
+ cls.origschema = deepcopy(cls.repo.schema)
+ # hack to read the schema from data/migrschema
+ config.appid = join('data', 'migratedapp')
+ global migrschema
+ migrschema = config.load_schema()
+ config.appid = 'data'
+ assert 'Folder' in migrschema
+
+ @classmethod
+ def _refresh_repo(cls):
+ super(MigrationCommandsTC, cls)._refresh_repo()
+ cls.repo.schema = cls.vreg.schema = deepcopy(cls.origschema)
def setUp(self):
- if not hasattr(self, '_repo'):
- # first initialization
- repo = self.repo # set by the RepositoryBasedTC metaclass
- # force to read schema from the database
- repo.config._cubes = None
- repo.fill_schema()
- # hack to read the schema from data/migrschema
- self.repo.config.appid = join('data', 'migratedapp')
- global migrschema
- migrschema = self.repo.config.load_schema()
- self.repo.config.appid = 'data'
- assert 'Folder' in migrschema
- RepositoryBasedTC.setUp(self)
+ CubicWebTC.setUp(self)
self.mh = ServerMigrationHelper(self.repo.config, migrschema,
repo=self.repo, cnx=self.cnx,
interactive=False)
@@ -280,7 +287,7 @@
'Any N ORDERBY O WHERE X is CWAttribute, X relation_type RT, RT name N,'
'X from_entity FE, FE name "Personne",'
'X ordernum O')]
- expected = [u'nom', u'prenom', u'promo', u'ass', u'adel', u'titre',
+ expected = [u'nom', u'prenom', u'sexe', u'promo', u'ass', u'adel', u'titre',
u'web', u'tel', u'fax', u'datenaiss', u'test', 'description', u'firstname',
u'creation_date', 'cwuri', u'modification_date']
self.assertEquals(rinorder, expected)
--- a/server/test/unittest_msplanner.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_msplanner.py Tue Aug 11 17:28:18 2009 +0200
@@ -58,7 +58,7 @@
# keep cnx so it's not garbage collected and the associated session is closed
-repo, cnx = init_test_database('sqlite')
+repo, cnx = init_test_database()
class BaseMSPlannerTC(BasePlannerTC):
"""test planner related feature on a 3-sources repository:
--- a/server/test/unittest_multisources.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_multisources.py Tue Aug 11 17:28:18 2009 +0200
@@ -11,7 +11,7 @@
from logilab.common.decorators import cached
from cubicweb.devtools import TestServerConfiguration, init_test_database
-from cubicweb.devtools.apptest import RepositoryBasedTC
+from cubicweb.devtools.testlib import CubicWebTC, refresh_repo
from cubicweb.devtools.repotest import do_monkey_patch, undo_monkey_patch
TestServerConfiguration.no_sqlite_wrap = True
@@ -26,16 +26,9 @@
class ExternalSource2Configuration(TestServerConfiguration):
sourcefile = 'sources_multi2'
-repo2, cnx2 = init_test_database('sqlite', config=ExternalSource1Configuration('data'))
-cu = cnx2.cursor()
-ec1 = cu.execute('INSERT Card X: X title "C3: An external card", X wikiid "aaa"')[0][0]
-cu.execute('INSERT Card X: X title "C4: Ze external card", X wikiid "zzz"')
-aff1 = cu.execute('INSERT Affaire X: X ref "AFFREF", X in_state S WHERE S name "pitetre"')[0][0]
-cnx2.commit()
-
MTIME = datetime.now() - timedelta(0, 10)
-
-repo3, cnx3 = init_test_database('sqlite', config=ExternalSource2Configuration('data'))
+repo2, cnx2 = init_test_database(config=ExternalSource1Configuration('data'))
+repo3, cnx3 = init_test_database(config=ExternalSource2Configuration('data'))
# XXX, access existing connection, no pyro connection
from cubicweb.server.sources.pyrorql import PyroRQLSource
@@ -45,38 +38,47 @@
from cubicweb.dbapi import Connection
Connection.close = lambda x: None
-class TwoSourcesTC(RepositoryBasedTC):
- repo_config = TwoSourcesConfiguration('data')
+class TwoSourcesTC(CubicWebTC):
+ config = TwoSourcesConfiguration('data')
+
+ @classmethod
+ def _refresh_repo(cls):
+ super(TwoSourcesTC, cls)._refresh_repo()
+ cnx2.rollback()
+ refresh_repo(repo2)
+ cnx3.rollback()
+ refresh_repo(repo3)
def setUp(self):
- RepositoryBasedTC.setUp(self)
- self.repo.sources[-1]._query_cache.clear()
- self.repo.sources[-2]._query_cache.clear()
- # trigger discovery
- self.execute('Card X')
- self.execute('Affaire X')
- self.execute('State X')
- self.commit()
- # don't delete external entities!
- self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
- # add some entities
- self.ic1 = self.execute('INSERT Card X: X title "C1: An internal card", X wikiid "aaai"')[0][0]
- self.ic2 = self.execute('INSERT Card X: X title "C2: Ze internal card", X wikiid "zzzi"')[0][0]
- self.commit()
+ CubicWebTC.setUp(self)
do_monkey_patch()
def tearDown(self):
- RepositoryBasedTC.tearDown(self)
+ CubicWebTC.tearDown(self)
undo_monkey_patch()
+ def setup_database(self):
+ cu = cnx2.cursor()
+ self.ec1 = cu.execute('INSERT Card X: X title "C3: An external card", X wikiid "aaa"')[0][0]
+ cu.execute('INSERT Card X: X title "C4: Ze external card", X wikiid "zzz"')
+ self.aff1 = cu.execute('INSERT Affaire X: X ref "AFFREF", X in_state S WHERE S name "pitetre"')[0][0]
+ cnx2.commit()
+ # trigger discovery
+ self.sexecute('Card X')
+ self.sexecute('Affaire X')
+ self.sexecute('State X')
+ # add some entities
+ self.ic1 = self.sexecute('INSERT Card X: X title "C1: An internal card", X wikiid "aaai"')[0][0]
+ self.ic2 = self.sexecute('INSERT Card X: X title "C2: Ze internal card", X wikiid "zzzi"')[0][0]
+
def test_eid_comp(self):
- rset = self.execute('Card X WHERE X eid > 1')
+ rset = self.sexecute('Card X WHERE X eid > 1')
self.assertEquals(len(rset), 4)
- rset = self.execute('Any X,T WHERE X title T, X eid > 1')
+ rset = self.sexecute('Any X,T WHERE X title T, X eid > 1')
self.assertEquals(len(rset), 4)
def test_metainformation(self):
- rset = self.execute('Card X ORDERBY T WHERE X title T')
+ rset = self.sexecute('Card X ORDERBY T WHERE X title T')
# 2 added to the system source, 2 added to the external source
self.assertEquals(len(rset), 4)
# since they are orderd by eid, we know the 3 first one is coming from the system source
@@ -89,28 +91,28 @@
self.assertEquals(metainf['source'], {'adapter': 'pyrorql', 'base-url': 'http://extern.org/', 'uri': 'extern'})
self.assertEquals(metainf['type'], 'Card')
self.assert_(metainf['extid'])
- etype = self.execute('Any ETN WHERE X is ET, ET name ETN, X eid %(x)s',
+ etype = self.sexecute('Any ETN WHERE X is ET, ET name ETN, X eid %(x)s',
{'x': externent.eid}, 'x')[0][0]
self.assertEquals(etype, 'Card')
def test_order_limit_offset(self):
- rsetbase = self.execute('Any W,X ORDERBY W,X WHERE X wikiid W')
+ rsetbase = self.sexecute('Any W,X ORDERBY W,X WHERE X wikiid W')
self.assertEquals(len(rsetbase), 4)
self.assertEquals(sorted(rsetbase.rows), rsetbase.rows)
- rset = self.execute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W')
+ rset = self.sexecute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W')
self.assertEquals(rset.rows, rsetbase.rows[2:4])
def test_has_text(self):
self.repo.sources_by_uri['extern'].synchronize(MTIME) # in case fti_update has been run before
- self.failUnless(self.execute('Any X WHERE X has_text "affref"'))
- self.failUnless(self.execute('Affaire X WHERE X has_text "affref"'))
+ self.failUnless(self.sexecute('Any X WHERE X has_text "affref"'))
+ self.failUnless(self.sexecute('Affaire X WHERE X has_text "affref"'))
def test_anon_has_text(self):
self.repo.sources_by_uri['extern'].synchronize(MTIME) # in case fti_update has been run before
- self.execute('INSERT Affaire X: X ref "no readable card"')[0][0]
- aff1 = self.execute('INSERT Affaire X: X ref "card"')[0][0]
+ self.sexecute('INSERT Affaire X: X ref "no readable card"')[0][0]
+ aff1 = self.sexecute('INSERT Affaire X: X ref "card"')[0][0]
# grant read access
- self.execute('SET X owned_by U WHERE X eid %(x)s, U login "anon"', {'x': aff1}, 'x')
+ self.sexecute('SET X owned_by U WHERE X eid %(x)s, U login "anon"', {'x': aff1}, 'x')
self.commit()
cnx = self.login('anon')
cu = cnx.cursor()
@@ -120,79 +122,81 @@
def test_synchronization(self):
cu = cnx2.cursor()
- assert cu.execute('Any X WHERE X eid %(x)s', {'x': aff1}, 'x')
- cu.execute('SET X ref "BLAH" WHERE X eid %(x)s', {'x': aff1}, 'x')
+ assert cu.execute('Any X WHERE X eid %(x)s', {'x': self.aff1}, 'x')
+ cu.execute('SET X ref "BLAH" WHERE X eid %(x)s', {'x': self.aff1}, 'x')
aff2 = cu.execute('INSERT Affaire X: X ref "AFFREUX", X in_state S WHERE S name "pitetre"')[0][0]
cnx2.commit()
try:
# force sync
self.repo.sources_by_uri['extern'].synchronize(MTIME)
- self.failUnless(self.execute('Any X WHERE X has_text "blah"'))
- self.failUnless(self.execute('Any X WHERE X has_text "affreux"'))
+ self.failUnless(self.sexecute('Any X WHERE X has_text "blah"'))
+ self.failUnless(self.sexecute('Any X WHERE X has_text "affreux"'))
cu.execute('DELETE Affaire X WHERE X eid %(x)s', {'x': aff2})
cnx2.commit()
self.repo.sources_by_uri['extern'].synchronize(MTIME)
- rset = self.execute('Any X WHERE X has_text "affreux"')
+ rset = self.sexecute('Any X WHERE X has_text "affreux"')
self.failIf(rset)
finally:
# restore state
- cu.execute('SET X ref "AFFREF" WHERE X eid %(x)s', {'x': aff1}, 'x')
+ cu.execute('SET X ref "AFFREF" WHERE X eid %(x)s', {'x': self.aff1}, 'x')
cnx2.commit()
def test_simplifiable_var(self):
- affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0]
- rset = self.execute('Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB',
+ affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0]
+ rset = self.sexecute('Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB',
{'x': affeid}, 'x')
self.assertEquals(len(rset), 1)
self.assertEquals(rset[0][1], "pitetre")
def test_simplifiable_var_2(self):
- affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0]
- rset = self.execute('Any E WHERE E eid %(x)s, E in_state S, NOT S name "moved"',
+ affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0]
+ rset = self.sexecute('Any E WHERE E eid %(x)s, E in_state S, NOT S name "moved"',
{'x': affeid, 'u': self.session.user.eid}, 'x')
self.assertEquals(len(rset), 1)
def test_sort_func(self):
- self.execute('Affaire X ORDERBY DUMB_SORT(RF) WHERE X ref RF')
+ self.sexecute('Affaire X ORDERBY DUMB_SORT(RF) WHERE X ref RF')
def test_sort_func_ambigous(self):
- self.execute('Any X ORDERBY DUMB_SORT(RF) WHERE X title RF')
+ self.sexecute('Any X ORDERBY DUMB_SORT(RF) WHERE X title RF')
def test_in_eid(self):
- iec1 = self.repo.extid2eid(self.repo.sources_by_uri['extern'], str(ec1),
+ iec1 = self.repo.extid2eid(self.repo.sources_by_uri['extern'], str(self.ec1),
'Card', self.session)
- rset = self.execute('Any X WHERE X eid IN (%s, %s)' % (iec1, self.ic1))
+ rset = self.sexecute('Any X WHERE X eid IN (%s, %s)' % (iec1, self.ic1))
self.assertEquals(sorted(r[0] for r in rset.rows), sorted([iec1, self.ic1]))
def test_greater_eid(self):
- rset = self.execute('Any X WHERE X eid > %s' % self.maxeid)
+ rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1))
self.assertEquals(len(rset.rows), 2) # self.ic1 and self.ic2
+ cu = cnx2.cursor()
ec2 = cu.execute('INSERT Card X: X title "glup"')[0][0]
cnx2.commit()
# 'X eid > something' should not trigger discovery
- rset = self.execute('Any X WHERE X eid > %s' % self.maxeid)
+ rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1))
self.assertEquals(len(rset.rows), 2)
# trigger discovery using another query
- crset = self.execute('Card X WHERE X title "glup"')
+ crset = self.sexecute('Card X WHERE X title "glup"')
self.assertEquals(len(crset.rows), 1)
- rset = self.execute('Any X WHERE X eid > %s' % self.maxeid)
+ rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1))
self.assertEquals(len(rset.rows), 3)
- rset = self.execute('Any MAX(X)')
+ rset = self.sexecute('Any MAX(X)')
self.assertEquals(len(rset.rows), 1)
self.assertEquals(rset.rows[0][0], crset[0][0])
def test_attr_unification_1(self):
- n1 = self.execute('INSERT Note X: X type "AFFREF"')[0][0]
- n2 = self.execute('INSERT Note X: X type "AFFREU"')[0][0]
- rset = self.execute('Any X,Y WHERE X is Note, Y is Affaire, X type T, Y ref T')
+ n1 = self.sexecute('INSERT Note X: X type "AFFREF"')[0][0]
+ n2 = self.sexecute('INSERT Note X: X type "AFFREU"')[0][0]
+ rset = self.sexecute('Any X,Y WHERE X is Note, Y is Affaire, X type T, Y ref T')
self.assertEquals(len(rset), 1, rset.rows)
def test_attr_unification_2(self):
+ cu = cnx2.cursor()
ec2 = cu.execute('INSERT Card X: X title "AFFREF"')[0][0]
cnx2.commit()
try:
- c1 = self.execute('INSERT Card C: C title "AFFREF"')[0][0]
- rset = self.execute('Any X,Y WHERE X is Card, Y is Affaire, X title T, Y ref T')
+ c1 = self.sexecute('INSERT Card C: C title "AFFREF"')[0][0]
+ rset = self.sexecute('Any X,Y WHERE X is Card, Y is Affaire, X title T, Y ref T')
self.assertEquals(len(rset), 2, rset.rows)
finally:
cu.execute('DELETE Card X WHERE X eid %(x)s', {'x': ec2}, 'x')
@@ -200,81 +204,86 @@
def test_attr_unification_neq_1(self):
# XXX complete
- self.execute('Any X,Y WHERE X is Note, Y is Affaire, X creation_date D, Y creation_date > D')
+ self.sexecute('Any X,Y WHERE X is Note, Y is Affaire, X creation_date D, Y creation_date > D')
def test_attr_unification_neq_2(self):
# XXX complete
- self.execute('Any X,Y WHERE X is Card, Y is Affaire, X creation_date D, Y creation_date > D')
+ self.sexecute('Any X,Y WHERE X is Card, Y is Affaire, X creation_date D, Y creation_date > D')
def test_union(self):
- afeids = self.execute('Affaire X')
- ueids = self.execute('CWUser X')
- rset = self.execute('(Any X WHERE X is Affaire) UNION (Any X WHERE X is CWUser)')
+ afeids = self.sexecute('Affaire X')
+ ueids = self.sexecute('CWUser X')
+ rset = self.sexecute('(Any X WHERE X is Affaire) UNION (Any X WHERE X is CWUser)')
self.assertEquals(sorted(r[0] for r in rset.rows),
sorted(r[0] for r in afeids + ueids))
def test_subquery1(self):
- rsetbase = self.execute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)')
+ rsetbase = self.sexecute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)')
self.assertEquals(len(rsetbase), 4)
self.assertEquals(sorted(rsetbase.rows), rsetbase.rows)
- rset = self.execute('Any W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)')
+ rset = self.sexecute('Any W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)')
self.assertEquals(rset.rows, rsetbase.rows[2:4])
- rset = self.execute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X WHERE X wikiid W)')
+ rset = self.sexecute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X WHERE X wikiid W)')
self.assertEquals(rset.rows, rsetbase.rows[2:4])
- rset = self.execute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W)')
+ rset = self.sexecute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W)')
self.assertEquals(rset.rows, rsetbase.rows[2:4])
def test_subquery2(self):
- affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0]
- rset = self.execute('Any X,AA,AB WITH X,AA,AB BEING (Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB)',
+ affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0]
+ rset = self.sexecute('Any X,AA,AB WITH X,AA,AB BEING (Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB)',
{'x': affeid})
self.assertEquals(len(rset), 1)
self.assertEquals(rset[0][1], "pitetre")
def test_not_relation(self):
- states = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN'))
+ states = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN'))
userstate = self.session.user.in_state[0]
states.remove((userstate.eid, userstate.name))
- notstates = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s',
+ notstates = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s',
{'x': self.session.user.eid}, 'x'))
self.assertEquals(notstates, states)
- aff1 = self.execute('Any X WHERE X is Affaire, X ref "AFFREF"')[0][0]
- aff1stateeid, aff1statename = self.execute('Any S,SN WHERE X eid %(x)s, X in_state S, S name SN', {'x': aff1}, 'x')[0]
+ aff1 = self.sexecute('Any X WHERE X is Affaire, X ref "AFFREF"')[0][0]
+ aff1stateeid, aff1statename = self.sexecute('Any S,SN WHERE X eid %(x)s, X in_state S, S name SN', {'x': aff1}, 'x')[0]
self.assertEquals(aff1statename, 'pitetre')
states.add((userstate.eid, userstate.name))
states.remove((aff1stateeid, aff1statename))
- notstates = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s',
+ notstates = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s',
{'x': aff1}, 'x'))
self.assertSetEquals(notstates, states)
def test_absolute_url_base_url(self):
+ cu = cnx2.cursor()
ceid = cu.execute('INSERT Card X: X title "without wikiid to get eid based url"')[0][0]
cnx2.commit()
- lc = self.execute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0)
+ lc = self.sexecute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0)
self.assertEquals(lc.absolute_url(), 'http://extern.org/card/eid/%s' % ceid)
+ cu.execute('DELETE Card X WHERE X eid %(x)s', {'x':ceid})
+ cnx2.commit()
def test_absolute_url_no_base_url(self):
cu = cnx3.cursor()
ceid = cu.execute('INSERT Card X: X title "without wikiid to get eid based url"')[0][0]
cnx3.commit()
- lc = self.execute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0)
+ lc = self.sexecute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0)
self.assertEquals(lc.absolute_url(), 'http://testing.fr/cubicweb/card/eid/%s' % lc.eid)
+ cu.execute('DELETE Card X WHERE X eid %(x)s', {'x':ceid})
+ cnx3.commit()
def test_nonregr1(self):
ueid = self.session.user.eid
- affaire = self.execute('Affaire X WHERE X ref "AFFREF"').get_entity(0, 0)
- self.execute('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s',
+ affaire = self.sexecute('Affaire X WHERE X ref "AFFREF"').get_entity(0, 0)
+ self.sexecute('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s',
{'x': affaire.eid, 'u': ueid})
def test_nonregr2(self):
treid = self.session.user.latest_trinfo().eid
- rset = self.execute('Any X ORDERBY D DESC WHERE E eid %(x)s, E wf_info_for X, X modification_date D',
+ rset = self.sexecute('Any X ORDERBY D DESC WHERE E eid %(x)s, E wf_info_for X, X modification_date D',
{'x': treid})
self.assertEquals(len(rset), 1)
self.assertEquals(rset.rows[0], [self.session.user.eid])
def test_nonregr3(self):
- self.execute('DELETE Card X WHERE X eid %(x)s, NOT X multisource_inlined_rel Y', {'x': self.ic1})
+ self.sexecute('DELETE Card X WHERE X eid %(x)s, NOT X multisource_inlined_rel Y', {'x': self.ic1})
if __name__ == '__main__':
from logilab.common.testlib import unittest_main
--- a/server/test/unittest_querier.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_querier.py Tue Aug 11 17:28:18 2009 +0200
@@ -46,7 +46,7 @@
('C0 text,C1 integer', {'A': 'table0.C0', 'B': 'table0.C1'}))
-repo, cnx = init_test_database('sqlite')
+repo, cnx = init_test_database()
--- a/server/test/unittest_repository.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_repository.py Tue Aug 11 17:28:18 2009 +0200
@@ -18,10 +18,11 @@
from yams.constraints import UniqueConstraint
-from cubicweb import BadConnectionId, RepositoryError, ValidationError, UnknownEid, AuthenticationError
+from cubicweb import (BadConnectionId, RepositoryError, ValidationError,
+ UnknownEid, AuthenticationError)
from cubicweb.schema import CubicWebSchema, RQLConstraint
from cubicweb.dbapi import connect, repo_connect, multiple_connections_unfix
-from cubicweb.devtools.apptest import RepositoryBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.repotest import tuplify
from cubicweb.server import repository
from cubicweb.server.sqlutils import SQL_PREFIX
@@ -31,48 +32,35 @@
os.system('pyro-ns >/dev/null 2>/dev/null &')
-class RepositoryTC(RepositoryBasedTC):
+class RepositoryTC(CubicWebTC):
""" singleton providing access to a persistent storage for entities
and relation
"""
-# def setUp(self):
-# pass
-
-# def tearDown(self):
-# self.repo.config.db_perms = True
-# cnxid = self.repo.connect(*self.default_user_password())
-# for etype in ('Affaire', 'Note', 'Societe', 'Personne'):
-# self.repo.execute(cnxid, 'DELETE %s X' % etype)
-# self.repo.commit(cnxid)
-# self.repo.close(cnxid)
-
def test_fill_schema(self):
self.repo.schema = CubicWebSchema(self.repo.config.appid)
self.repo.config._cubes = None # avoid assertion error
+ self.repo.config.repairing = True # avoid versions checking
self.repo.fill_schema()
- pool = self.repo._get_pool()
table = SQL_PREFIX + 'CWEType'
namecol = SQL_PREFIX + 'name'
finalcol = SQL_PREFIX + 'final'
- try:
- cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (
- namecol, table, finalcol))
- self.assertEquals(cu.fetchall(), [])
- cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s'
- % (namecol, table, finalcol, namecol), {'final': 'TRUE'})
- self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',),
- (u'Date',), (u'Datetime',),
- (u'Decimal',),(u'Float',),
- (u'Int',),
- (u'Interval',), (u'Password',),
- (u'String',), (u'Time',)])
- finally:
- self.repo._free_pool(pool)
+ self.session.set_pool()
+ cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (
+ namecol, table, finalcol))
+ self.assertEquals(cu.fetchall(), [])
+ cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s'
+ % (namecol, table, finalcol, namecol), {'final': 'TRUE'})
+ self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',),
+ (u'Date',), (u'Datetime',),
+ (u'Decimal',),(u'Float',),
+ (u'Int',),
+ (u'Interval',), (u'Password',),
+ (u'String',), (u'Time',)])
def test_schema_has_owner(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U'))
self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U'))
self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U'))
@@ -81,18 +69,17 @@
self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U'))
def test_connect(self):
- login, passwd = self.default_user_password()
- self.assert_(self.repo.connect(login, passwd))
+ self.assert_(self.repo.connect(self.admlogin, self.admpassword))
self.assertRaises(AuthenticationError,
- self.repo.connect, login, 'nimportnawak')
+ self.repo.connect, self.admlogin, 'nimportnawak')
self.assertRaises(AuthenticationError,
- self.repo.connect, login, None)
+ self.repo.connect, self.admlogin, None)
self.assertRaises(AuthenticationError,
self.repo.connect, None, None)
def test_execute(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
repo.execute(cnxid, 'Any X')
repo.execute(cnxid, 'Any X where X is Personne')
repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
@@ -101,7 +88,7 @@
def test_login_upassword_accent(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"',
{'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
repo.commit(cnxid)
@@ -110,7 +97,7 @@
def test_invalid_entity_rollback(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
# no group
repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"',
{'login': u"tutetute", 'passwd': 'tutetute'})
@@ -120,7 +107,7 @@
def test_close(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
self.assert_(cnxid)
repo.close(cnxid)
self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')
@@ -131,9 +118,9 @@
def test_shared_data(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
repo.set_shared_data(cnxid, 'data', 4)
- cnxid2 = repo.connect(*self.default_user_password())
+ cnxid2 = repo.connect(self.admlogin, self.admpassword)
self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
repo.set_shared_data(cnxid2, 'data', 5)
@@ -151,14 +138,14 @@
def test_check_session(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
self.assertEquals(repo.check_session(cnxid), None)
repo.close(cnxid)
self.assertRaises(BadConnectionId, repo.check_session, cnxid)
def test_transaction_base(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
# check db state
result = repo.execute(cnxid, 'Personne X')
self.assertEquals(result.rowcount, 0)
@@ -177,7 +164,7 @@
def test_transaction_base2(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
# rollback relation insertion
repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
@@ -188,7 +175,7 @@
def test_transaction_base3(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
# rollback state change which trigger TrInfo insertion
ueid = repo._get_session(cnxid).user.eid
rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
@@ -206,7 +193,7 @@
def test_close_wait_processing_request(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
repo.commit(cnxid)
# close has to be in the thread due to sqlite limitations
@@ -290,7 +277,7 @@
def test_internal_api(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
session = repo._get_session(cnxid, setpool=True)
self.assertEquals(repo.type_and_source_from_eid(1, session),
('CWGroup', 'system', None))
@@ -308,7 +295,7 @@
def test_session_api(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
repo.close(cnxid)
@@ -317,7 +304,7 @@
def test_shared_data_api(self):
repo = self.repo
- cnxid = repo.connect(*self.default_user_password())
+ cnxid = repo.connect(self.admlogin, self.admpassword)
self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
repo.set_shared_data(cnxid, 'data', 4)
self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
@@ -343,37 +330,34 @@
# print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c)
-class DataHelpersTC(RepositoryBasedTC):
-
- def setUp(self):
- """ called before each test from this class """
- cnxid = self.repo.connect(*self.default_user_password())
- self.session = self.repo._sessions[cnxid]
- self.session.set_pool()
-
- def tearDown(self):
- self.session.rollback()
+class DataHelpersTC(CubicWebTC):
def test_create_eid(self):
+ self.session.set_pool()
self.assert_(self.repo.system_source.create_eid(self.session))
def test_source_from_eid(self):
+ self.session.set_pool()
self.assertEquals(self.repo.source_from_eid(1, self.session),
self.repo.sources_by_uri['system'])
def test_source_from_eid_raise(self):
+ self.session.set_pool()
self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)
def test_type_from_eid(self):
+ self.session.set_pool()
self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup')
def test_type_from_eid_raise(self):
+ self.session.set_pool()
self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)
def test_add_delete_info(self):
entity = self.repo.vreg['etypes'].etype_class('Personne')(self.session)
entity.eid = -1
entity.complete = lambda x: None
+ self.session.set_pool()
self.repo.add_info(self.session, entity, self.repo.sources_by_uri['system'])
cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
data = cu.fetchall()
@@ -388,13 +372,14 @@
self.assertEquals(data, [])
-class FTITC(RepositoryBasedTC):
+class FTITC(CubicWebTC):
def test_reindex_and_modified_since(self):
eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
self.commit()
ts = datetime.now()
self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
+ self.session.set_pool()
cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp)
omtime = cu.fetchone()[0]
# our sqlite datetime adapter is ignore seconds fraction, so we have to
@@ -403,6 +388,7 @@
self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp}, 'x')
self.commit()
self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
+ self.session.set_pool()
cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp)
mtime = cu.fetchone()[0]
self.failUnless(omtime < mtime)
@@ -440,7 +426,7 @@
self.assertEquals(rset.rows, [[self.session.user.eid]])
-class DBInitTC(RepositoryBasedTC):
+class DBInitTC(CubicWebTC):
def test_versions_inserted(self):
inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
@@ -450,11 +436,11 @@
u'system.version.file', u'system.version.folder',
u'system.version.tag'])
-class InlineRelHooksTC(RepositoryBasedTC):
+class InlineRelHooksTC(CubicWebTC):
"""test relation hooks are called for inlined relations
"""
def setUp(self):
- RepositoryBasedTC.setUp(self)
+ CubicWebTC.setUp(self)
self.hm = self.repo.hm
self.called = []
--- a/server/test/unittest_rqlannotation.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_rqlannotation.py Tue Aug 11 17:28:18 2009 +0200
@@ -6,7 +6,7 @@
from cubicweb.devtools import init_test_database
from cubicweb.devtools.repotest import BaseQuerierTC
-repo, cnx = init_test_database('sqlite')
+repo, cnx = init_test_database()
class SQLGenAnnotatorTC(BaseQuerierTC):
repo = repo
--- a/server/test/unittest_security.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_security.py Tue Aug 11 17:28:18 2009 +0200
@@ -4,21 +4,21 @@
import sys
from logilab.common.testlib import unittest_main, TestCase
-from cubicweb.devtools.apptest import RepositoryBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb import Unauthorized, ValidationError
from cubicweb.server.querier import check_read_access
-class BaseSecurityTC(RepositoryBasedTC):
+class BaseSecurityTC(CubicWebTC):
def setUp(self):
- RepositoryBasedTC.setUp(self)
+ CubicWebTC.setUp(self)
self.create_user('iaminusersgrouponly')
self.readoriggroups = self.schema['Personne'].get_groups('read')
self.addoriggroups = self.schema['Personne'].get_groups('add')
def tearDown(self):
- RepositoryBasedTC.tearDown(self)
+ CubicWebTC.tearDown(self)
self.schema['Personne'].set_groups('read', self.readoriggroups)
self.schema['Personne'].set_groups('add', self.addoriggroups)
@@ -37,7 +37,7 @@
cu = cnx.cursor()
self.assertRaises(Unauthorized,
check_read_access,
- self.schema, cnx.user(self.current_session()), rqlst, solution)
+ self.schema, cnx.user(self.session), rqlst, solution)
self.assertRaises(Unauthorized, cu.execute, rql)
def test_upassword_not_selectable(self):
@@ -165,7 +165,7 @@
def test_insert_relation_rql_permission(self):
cnx = self.login('iaminusersgrouponly')
- session = self.current_session()
+ session = self.session
cu = cnx.cursor(session)
cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")
# should raise Unauthorized since user don't own S
@@ -210,7 +210,7 @@
def test_user_can_change_its_upassword(self):
- ueid = self.create_user('user')
+ ueid = self.create_user('user').eid
cnx = self.login('user')
cu = cnx.cursor()
cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
@@ -220,7 +220,7 @@
cnx = self.login('user', 'newpwd')
def test_user_cant_change_other_upassword(self):
- ueid = self.create_user('otheruser')
+ ueid = self.create_user('otheruser').eid
cnx = self.login('iaminusersgrouponly')
cu = cnx.cursor()
cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
@@ -416,7 +416,7 @@
def test_users_and_groups_non_readable_by_guests(self):
cnx = self.login('anon')
- anon = cnx.user(self.current_session())
+ anon = cnx.user(self.session)
cu = cnx.cursor()
# anonymous user can only read itself
rset = cu.execute('Any L WHERE X owned_by U, U login L')
@@ -426,7 +426,7 @@
# anonymous user can read groups (necessary to check allowed transitions for instance)
self.assert_(cu.execute('CWGroup X'))
# should only be able to read the anonymous user, not another one
- origuser = self.session.user
+ origuser = self.adminsession.user
self.assertRaises(Unauthorized,
cu.execute, 'CWUser X WHERE X eid %(x)s', {'x': origuser.eid}, 'x')
# nothing selected, nothing updated, no exception raised
@@ -462,7 +462,7 @@
self.commit()
cnx = self.login('anon')
cu = cnx.cursor()
- anoneid = self.current_session().user.eid
+ anoneid = self.session.user.eid
self.assertEquals(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
'B bookmarked_by U, U eid %s' % anoneid).rows,
[['index', '?vid=index']])
@@ -491,7 +491,7 @@
eid = self.execute('INSERT Affaire X: X ref "ARCT01"')[0][0]
self.commit()
cnx = self.login('iaminusersgrouponly')
- session = self.current_session()
+ session = self.session
# needed to avoid check_perm error
session.set_pool()
# needed to remove rql expr granting update perm to the user
@@ -506,7 +506,7 @@
# XXX wether it should raise Unauthorized or ValidationError is not clear
# the best would probably ValidationError if the transition doesn't exist
# from the current state but Unauthorized if it exists but user can't pass it
- self.assertRaises(ValidationError, cu.execute, rql, {'x': cnx.user(self.current_session()).eid}, 'x')
+ self.assertRaises(ValidationError, cu.execute, rql, {'x': cnx.user(self.session).eid}, 'x')
def test_trinfo_security(self):
aff = self.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0, 0)
--- a/server/test/unittest_ssplanner.py Tue Aug 11 17:19:05 2009 +0200
+++ b/server/test/unittest_ssplanner.py Tue Aug 11 17:28:18 2009 +0200
@@ -10,7 +10,7 @@
from cubicweb.server.ssplanner import SSPlanner
# keep cnx so it's not garbage collected and the associated session is closed
-repo, cnx = init_test_database('sqlite')
+repo, cnx = init_test_database()
class SSPlannerTC(BasePlannerTC):
repo = repo
--- a/sobjects/test/unittest_email.py Tue Aug 11 17:19:05 2009 +0200
+++ b/sobjects/test/unittest_email.py Tue Aug 11 17:28:18 2009 +0200
@@ -5,9 +5,9 @@
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
-class EmailAddressHooksTC(EnvBasedTC):
+class EmailAddressHooksTC(CubicWebTC):
def test_use_email_set_primary_email(self):
self.execute('INSERT EmailAddress X: X address "admin@logilab.fr", U use_email X WHERE U login "admin"')
--- a/sobjects/test/unittest_hooks.py Tue Aug 11 17:19:05 2009 +0200
+++ b/sobjects/test/unittest_hooks.py Tue Aug 11 17:28:18 2009 +0200
@@ -6,9 +6,9 @@
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
from logilab.common.testlib import unittest_main
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
-class HooksTC(EnvBasedTC):
+class HooksTC(CubicWebTC):
def test_euser_login_stripped(self):
u = self.create_user(' joe ')
--- a/sobjects/test/unittest_notification.py Tue Aug 11 17:19:05 2009 +0200
+++ b/sobjects/test/unittest_notification.py Tue Aug 11 17:28:18 2009 +0200
@@ -9,7 +9,7 @@
from socket import gethostname
from logilab.common.testlib import unittest_main, TestCase
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.sobjects.notification import construct_message_id, parse_message_id
@@ -48,7 +48,7 @@
self.assertNotEquals(msgid1, '<@testapp.%s>' % gethostname())
-class RecipientsFinderTC(EnvBasedTC):
+class RecipientsFinderTC(CubicWebTC):
def test(self):
urset = self.execute('CWUser X WHERE X login "admin"')
self.execute('INSERT EmailAddress X: X address "admin@logilab.fr", U primary_email X '
@@ -67,10 +67,10 @@
self.assertEquals(finder.recipients(), [('abcd@logilab.fr', 'en'), ('efgh@logilab.fr', 'en')])
-class StatusChangeViewsTC(EnvBasedTC):
+class StatusChangeViewsTC(CubicWebTC):
def test_status_change_view(self):
- req = self.session()
+ req = self.session
u = self.create_user('toto', req=req)
assert u.req
assert u.rset
--- a/sobjects/test/unittest_supervising.py Tue Aug 11 17:19:05 2009 +0200
+++ b/sobjects/test/unittest_supervising.py Tue Aug 11 17:28:18 2009 +0200
@@ -9,12 +9,12 @@
import re
from logilab.common.testlib import unittest_main
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.sobjects.supervising import SendMailOp, SupervisionMailOp
-class SupervisingTC(EnvBasedTC):
+class SupervisingTC(CubicWebTC):
def setup_database(self):
self.add_entity('Card', title=u"une news !", content=u"cubicweb c'est beau")
@@ -26,7 +26,7 @@
def test_supervision(self):
- session = self.session()
+ session = self.session
# do some modification
ueid = self.execute('INSERT CWUser X: X login "toto", X upassword "sosafe", X in_group G, X in_state S '
'WHERE G name "users", S name "activated"')[0][0]
@@ -75,7 +75,7 @@
self.assertEquals(op.to_send[0][1], ['test@logilab.fr'])
def test_nonregr1(self):
- session = self.session()
+ session = self.session
# do some unlogged modification
self.execute('SET X last_login_time NOW WHERE X eid %(x)s', {'x': session.user.eid}, 'x')
self.commit() # no crash
--- a/test/unittest_dbapi.py Tue Aug 11 17:19:05 2009 +0200
+++ b/test/unittest_dbapi.py Tue Aug 11 17:28:18 2009 +0200
@@ -7,24 +7,21 @@
"""
from cubicweb import ConnectionError
from cubicweb.dbapi import ProgrammingError
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
-class DBAPITC(EnvBasedTC):
- @property
- def cnx(self):
- return self.login('anon')
+class DBAPITC(CubicWebTC):
def test_public_repo_api(self):
- cnx = self.cnx
- self.assertEquals(cnx.get_schema(), self.env.repo.schema)
+ cnx = self.login('anon')
+ self.assertEquals(cnx.get_schema(), self.repo.schema)
self.assertEquals(cnx.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}})
self.restore_connection() # proper way to close cnx
self.assertRaises(ProgrammingError, cnx.get_schema)
self.assertRaises(ProgrammingError, cnx.source_defs)
def test_db_api(self):
- cnx = self.cnx
+ cnx = self.login('anon')
self.assertEquals(cnx.rollback(), None)
self.assertEquals(cnx.commit(), None)
self.restore_connection() # proper way to close cnx
@@ -34,7 +31,7 @@
self.assertRaises(ProgrammingError, cnx.close)
def test_api(self):
- cnx = self.cnx
+ cnx = self.login('anon')
self.assertEquals(cnx.user(None).login, 'anon')
self.assertEquals(cnx.describe(1), (u'CWGroup', u'system', None))
self.restore_connection() # proper way to close cnx
@@ -42,7 +39,7 @@
self.assertRaises(ConnectionError, cnx.describe, 1)
def test_session_data_api(self):
- cnx = self.cnx
+ cnx = self.login('anon')
self.assertEquals(cnx.get_session_data('data'), None)
self.assertEquals(cnx.session_data(), {})
cnx.set_session_data('data', 4)
@@ -57,7 +54,7 @@
self.assertEquals(cnx.session_data(), {'data': 4})
def test_shared_data_api(self):
- cnx = self.cnx
+ cnx = self.login('anon')
self.assertEquals(cnx.get_shared_data('data'), None)
cnx.set_shared_data('data', 4)
self.assertEquals(cnx.get_shared_data('data'), 4)
@@ -71,19 +68,6 @@
self.assertRaises(ConnectionError, cnx.set_shared_data, 'data', 0)
self.assertRaises(ConnectionError, cnx.get_shared_data, 'data')
-
-# class DBAPICursorTC(EnvBasedTC):
-
-# @property
-# def cursor(self):
-# return self.env.cnx.cursor()
-
-# def test_api(self):
-# cu = self.cursor
-# self.assertEquals(cu.describe(1), (u'CWGroup', u'system', None))
-# #cu.close()
-# #self.assertRaises(ConnectionError, cu.describe, 1)
-
if __name__ == '__main__':
from logilab.common.testlib import unittest_main
unittest_main()
--- a/test/unittest_entity.py Tue Aug 11 17:19:05 2009 +0200
+++ b/test/unittest_entity.py Tue Aug 11 17:28:18 2009 +0200
@@ -10,10 +10,10 @@
from datetime import datetime
from cubicweb import Binary
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.common.mttransforms import HAS_TAL
-class EntityTC(EnvBasedTC):
+class EntityTC(CubicWebTC):
## def setup_database(self):
## self.add_entity('Personne', nom=u'di mascio', prenom=u'adrien')
@@ -23,18 +23,18 @@
## embed=False)
def test_boolean_value(self):
- e = self.etype_instance('CWUser')
+ e = self.vreg['etypes'].etype_class('CWUser')(self.request())
self.failUnless(e)
def test_yams_inheritance(self):
from entities import Note
- e = self.etype_instance('SubNote')
+ e = self.vreg['etypes'].etype_class('SubNote')(self.request())
self.assertIsInstance(e, Note)
- e2 = self.etype_instance('SubNote')
+ e2 = self.vreg['etypes'].etype_class('SubNote')(self.request())
self.assertIs(e.__class__, e2.__class__)
def test_has_eid(self):
- e = self.etype_instance('CWUser')
+ e = self.vreg['etypes'].etype_class('CWUser')(self.request())
self.assertEquals(e.eid, None)
self.assertEquals(e.has_eid(), False)
e.eid = 'X'
@@ -208,7 +208,7 @@
1)
def test_new_entity_unrelated(self):
- e = self.etype_instance('CWUser')
+ e = self.vreg['etypes'].etype_class('CWUser')(self.request())
unrelated = [r[0] for r in e.unrelated('in_group', 'CWGroup', 'subject')]
# should be default groups but owners, i.e. managers, users, guests
self.assertEquals(len(unrelated), 3)
@@ -227,7 +227,6 @@
self.assertEquals(e.printable_value('content'),
'<p>\ndu *texte*\n</p>')
e['title'] = 'zou'
- #e = self.etype_instance('Task')
e['content'] = '''\
a title
=======
@@ -303,7 +302,7 @@
def test_fulltextindex(self):
- e = self.etype_instance('File')
+ e = self.vreg['etypes'].etype_class('File')(self.request())
e['name'] = 'an html file'
e['description'] = 'du <em>html</em>'
e['description_format'] = 'text/html'
@@ -324,7 +323,7 @@
def test_complete_relation(self):
self.execute('SET RT add_permission G WHERE RT name "wf_info_for", G name "managers"')
self.commit()
- session = self.session()
+ session = self.session
try:
eid = session.unsafe_execute(
'INSERT TrInfo X: X comment "zou", X wf_info_for U, X from_state S1, X to_state S2 '
--- a/test/unittest_rset.py Tue Aug 11 17:19:05 2009 +0200
+++ b/test/unittest_rset.py Tue Aug 11 17:28:18 2009 +0200
@@ -10,7 +10,7 @@
from logilab.common.testlib import TestCase, unittest_main
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.selectors import traced_selection
from urlparse import urlsplit
@@ -55,7 +55,7 @@
-class ResultSetTC(EnvBasedTC):
+class ResultSetTC(CubicWebTC):
def setUp(self):
super(ResultSetTC, self).setUp()
@@ -100,7 +100,7 @@
'Any U,L where U is CWUser, U login L',
description=[['CWUser', 'String']] * 3)
rs.req = self.request()
- rs.vreg = self.env.vreg
+ rs.vreg = self.vreg
self.assertEquals(rs.limit(2).rows, [[12000, 'adim'], [13000, 'syt']])
rs2 = rs.limit(2, offset=1)
@@ -115,9 +115,9 @@
'Any U,L where U is CWUser, U login L',
description=[['CWUser', 'String']] * 3)
rs.req = self.request()
- rs.vreg = self.env.vreg
+ rs.vreg = self.vreg
def test_filter(entity):
- return entity.login != 'nico'
+ return entity.cwdb.login != 'nico'
rs2 = rs.filtered_rset(test_filter)
self.assertEquals(len(rs2), 2)
@@ -140,7 +140,7 @@
'Any U,L where U is CWUser, U login L',
description=[['CWUser', 'String']] * 3)
rs.req = self.request()
- rs.vreg = self.env.vreg
+ rs.vreg = self.vreg
rs2 = rs.sorted_rset(lambda e:e['login'])
self.assertEquals(len(rs2), 3)
@@ -170,7 +170,7 @@
'D created_by U, D title T',
description=[['CWUser', 'String', 'String']] * 5)
rs.req = self.request()
- rs.vreg = self.env.vreg
+ rs.vreg = self.vreg
rsets = rs.split_rset(lambda e:e['login'])
self.assertEquals(len(rsets), 3)
@@ -271,11 +271,11 @@
self.assertEquals(pprelcachedict(e._related_cache),
[('created_by_subject', [5])])
# first level of recursion
- u = e.created_by[0]
+ u = e.cwdb.created_by[0]
self.assertEquals(u['login'], 'admin')
self.assertRaises(KeyError, u.__getitem__, 'firstname')
# second level of recursion
- s = u.in_state[0]
+ s = u.cwdb.in_state[0]
self.assertEquals(s['name'], 'activated')
self.assertRaises(KeyError, s.__getitem__, 'description')
--- a/test/unittest_selectors.py Tue Aug 11 17:19:05 2009 +0200
+++ b/test/unittest_selectors.py Tue Aug 11 17:28:18 2009 +0200
@@ -8,7 +8,7 @@
from logilab.common.testlib import TestCase, unittest_main
-from cubicweb.devtools.testlib import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.appobject import Selector, AndSelector, OrSelector
from cubicweb.selectors import implements, match_user_groups
from cubicweb.interfaces import IDownloadable
@@ -88,7 +88,7 @@
self.assertIs(csel.search_selector(implements), sel)
-class ImplementsSelectorTC(EnvBasedTC):
+class ImplementsSelectorTC(CubicWebTC):
def test_etype_priority(self):
req = self.request()
cls = self.vreg['etypes'].etype_class('File')
@@ -103,7 +103,7 @@
self.failIf(implements('Societe').score_class(cls, self.request()))
-class MatchUserGroupsTC(EnvBasedTC):
+class MatchUserGroupsTC(CubicWebTC):
def test_owners_group(self):
"""tests usage of 'owners' group with match_user_group"""
class SomeAction(action.Action):
@@ -118,16 +118,16 @@
self.create_user('john')
self.login('john')
# it should not be possible to use SomeAction not owned objects
- rset, req = self.env.get_rset_and_req('Any G WHERE G is CWGroup, G name "managers"')
+ rset, req = self.rset_and_req('Any G WHERE G is CWGroup, G name "managers"')
self.failIf('yo' in dict(self.pactions(req, rset)))
# insert a new card, and check that we can use SomeAction on our object
self.execute('INSERT Card C: C title "zoubidou"')
self.commit()
- rset, req = self.env.get_rset_and_req('Card C WHERE C title "zoubidou"')
+ rset, req = self.rset_and_req('Card C WHERE C title "zoubidou"')
self.failUnless('yo' in dict(self.pactions(req, rset)), self.pactions(req, rset))
# make sure even managers can't use the action
self.restore_connection()
- rset, req = self.env.get_rset_and_req('Card C WHERE C title "zoubidou"')
+ rset, req = self.rset_and_req('Card C WHERE C title "zoubidou"')
self.failIf('yo' in dict(self.pactions(req, rset)))
finally:
del self.vreg[SomeAction.__registry__][SomeAction.id]
--- a/vregistry.py Tue Aug 11 17:19:05 2009 +0200
+++ b/vregistry.py Tue Aug 11 17:28:18 2009 +0200
@@ -125,6 +125,7 @@
# dynamic selection methods ################################################
+ @deprecated('use select instead of object_by_id')
def object_by_id(self, oid, *args, **kwargs):
"""return object with the given oid. Only one object is expected to be
found.
@@ -143,9 +144,9 @@
raise `ObjectNotFound` if not object with id <oid> in <registry>
raise `NoSelectableObject` if not object apply
"""
- return self.select_best(self[oid], *args, **kwargs)
+ return self._select_best(self[oid], *args, **kwargs)
- def select_object(self, oid, *args, **kwargs):
+ def select_or_none(self, oid, *args, **kwargs):
"""return the most specific object among those with the given oid
according to the given context, or None if no object applies.
"""
@@ -153,6 +154,8 @@
return self.select(oid, *args, **kwargs)
except (NoSelectableObject, ObjectNotFound):
return None
+ select_object = deprecated('use select_or_none instead of select_object'
+ )(select_or_none)
def possible_objects(self, *args, **kwargs):
"""return an iterator on possible objects in this registry for the given
@@ -160,11 +163,11 @@
"""
for appobjects in self.itervalues():
try:
- yield self.select_best(appobjects, *args, **kwargs)
+ yield self._select_best(appobjects, *args, **kwargs)
except NoSelectableObject:
continue
- def select_best(self, appobjects, *args, **kwargs):
+ def _select_best(self, appobjects, *args, **kwargs):
"""return an instance of the most specific object according
to parameters
@@ -194,7 +197,7 @@
[repr(v) for v in winners]))
# return the result of calling the appobject
return winners[0](*args, **kwargs)
-
+ select_best = deprecated('select_best is now private')(_select_best)
class VRegistry(dict):
"""class responsible to register, propose and select the various
@@ -240,12 +243,12 @@
"""
return self[registry].select(oid, *args, **kwargs)
- @deprecated('use vreg[registry].select_object(oid, *args, **kwargs)')
+ @deprecated('use vreg[registry].select_or_none(oid, *args, **kwargs)')
def select_object(self, registry, oid, *args, **kwargs):
"""return the most specific object in <registry>.<oid> according to
the given context, or None if no object apply
"""
- return self[registry].select_object(oid, *args, **kwargs)
+ return self[registry].select_or_none(oid, *args, **kwargs)
@deprecated('use vreg[registry].possible_objects(*args, **kwargs)')
def possible_objects(self, registry, *args, **kwargs):
--- a/web/controller.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/controller.py Tue Aug 11 17:28:18 2009 +0200
@@ -93,7 +93,7 @@
self.ensure_ro_rql(rql)
if not isinstance(rql, unicode):
rql = unicode(rql, self.req.encoding)
- pp = self.vreg['components'].select_object('magicsearch', self.req)
+ pp = self.vreg['components'].select_or_none('magicsearch', self.req)
if pp is not None:
self.rset = pp.process_query(rql, self.req)
return self.rset
--- a/web/facet.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/facet.py Tue Aug 11 17:28:18 2009 +0200
@@ -64,8 +64,8 @@
def get_facet(req, facetid, rqlst, mainvar):
- return req.vreg['facets'].object_by_id(facetid, req, rqlst=rqlst,
- filtered_variable=mainvar)
+ return req.vreg['facets'].select(facetid, req, rqlst=rqlst,
+ filtered_variable=mainvar)
def filter_hiddens(w, **kwargs):
--- a/web/test/test_views.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/test_views.py Tue Aug 11 17:28:18 2009 +0200
@@ -6,7 +6,7 @@
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
-from cubicweb.devtools.testlib import WebTest, AutomaticWebTest
+from cubicweb.devtools.testlib import CubicWebTC, AutoPopulateTest, AutomaticWebTest
from cubicweb.view import AnyRsetView
AutomaticWebTest.application_rql = [
@@ -15,7 +15,7 @@
'Any COUNT(X) WHERE X is CWUser',
]
-class ComposityCopy(WebTest):
+class ComposityCopy(CubicWebTC):
def test_regr_copy_view(self):
"""regression test: make sure we can ask a copy of a
@@ -34,7 +34,7 @@
self.req.add_js('spam.js')
-class ManualWebTests(WebTest):
+class ManualCubicWebTCs(AutoPopulateTest):
def setup_database(self):
self.auto_populate(10)
@@ -59,7 +59,7 @@
-class ExplicitViewsTest(WebTest):
+class ExplicitViewsTest(CubicWebTC):
def test_unrelateddivs(self):
rset = self.execute('Any X WHERE X is CWUser, X login "admin"')
--- a/web/test/unittest_application.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_application.py Tue Aug 11 17:28:18 2009 +0200
@@ -14,7 +14,7 @@
from logilab.common.testlib import TestCase, unittest_main
from logilab.common.decorators import clear_cache
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.fake import FakeRequest
from cubicweb.web import Redirect, AuthenticationError, ExplicitLogin, INTERNAL_FIELD_VALUE
from cubicweb.web.views.basecontrollers import ViewController
@@ -134,7 +134,7 @@
for i in (12, 13, 14)])
-class ApplicationTC(EnvBasedTC):
+class ApplicationTC(CubicWebTC):
def publish(self, req, path='view'):
return self.app.publish(path, req)
--- a/web/test/unittest_controller.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_controller.py Tue Aug 11 17:28:18 2009 +0200
@@ -7,37 +7,35 @@
from logilab.common.testlib import unittest_main
-from cubicweb.devtools import apptest
+from cubicweb.devtools import testlib
-class BaseControllerTC(apptest.ControllerTC):
+class BaseControllerTC(testlib.CubicWebTC):
def test_parse_datetime_ok(self):
- self.assertIsInstance(self.ctrl.parse_datetime('2006/06/24 12:18'),
- datetime)
- self.assertIsInstance(self.ctrl.parse_datetime('2006/06/24'),
- date)
- self.assertIsInstance(self.ctrl.parse_datetime('2006/06/24 12:18', 'Datetime'),
- datetime)
- self.assertIsInstance(self.ctrl.parse_datetime('2006/06/24', 'Datetime'),
- datetime)
- self.assertIsInstance(self.ctrl.parse_datetime('2006/06/24', 'Date'),
- date)
- self.assertIsInstance(self.ctrl.parse_datetime('12:18', 'Time'),
- time)
+ ctrl = self.vreg['controllers'].select('view', self.request())
+ pd = ctrl.parse_datetime
+ self.assertIsInstance(pd('2006/06/24 12:18'), datetime)
+ self.assertIsInstance(pd('2006/06/24'), date)
+ self.assertIsInstance(pd('2006/06/24 12:18', 'Datetime'), datetime)
+ self.assertIsInstance(pd('2006/06/24', 'Datetime'), datetime)
+ self.assertIsInstance(pd('2006/06/24', 'Date'), date)
+ self.assertIsInstance(pd('12:18', 'Time'), time)
def test_parse_datetime_ko(self):
+ ctrl = self.vreg['controllers'].select('view', self.request())
+ pd = ctrl.parse_datetime
self.assertRaises(ValueError,
- self.ctrl.parse_datetime, '2006/06/24 12:188', 'Datetime')
+ pd, '2006/06/24 12:188', 'Datetime')
self.assertRaises(ValueError,
- self.ctrl.parse_datetime, '2006/06/240', 'Datetime')
+ pd, '2006/06/240', 'Datetime')
self.assertRaises(ValueError,
- self.ctrl.parse_datetime, '2006/06/24 12:18', 'Date')
+ pd, '2006/06/24 12:18', 'Date')
self.assertRaises(ValueError,
- self.ctrl.parse_datetime, '2006/24/06', 'Date')
+ pd, '2006/24/06', 'Date')
self.assertRaises(ValueError,
- self.ctrl.parse_datetime, '2006/06/240', 'Date')
+ pd, '2006/06/240', 'Date')
self.assertRaises(ValueError,
- self.ctrl.parse_datetime, '12:188', 'Time')
+ pd, '12:188', 'Time')
if __name__ == '__main__':
unittest_main()
--- a/web/test/unittest_form.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_form.py Tue Aug 11 17:28:18 2009 +0200
@@ -12,7 +12,7 @@
from logilab.common.testlib import unittest_main, mock_object
from cubicweb import Binary
-from cubicweb.devtools.testlib import WebTest
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.web.formfields import (IntField, StringField, RichTextField,
DateTimeField, DateTimePicker,
FileField, EditableFileField)
@@ -22,7 +22,7 @@
from cubicweb.web.views.formrenderers import FormRenderer
-class FieldsFormTC(WebTest):
+class FieldsFormTC(CubicWebTC):
def test_form_field_format(self):
form = FieldsForm(self.request(), None)
@@ -32,7 +32,7 @@
self.assertEquals(form.form_field_format(None), 'text/rest')
-class EntityFieldsFormTC(WebTest):
+class EntityFieldsFormTC(CubicWebTC):
def setUp(self):
super(EntityFieldsFormTC, self).setUp()
@@ -55,28 +55,28 @@
self.failIf(t.eid in unrelated, unrelated)
def test_form_field_vocabulary_new_entity(self):
- e = self.etype_instance('CWUser')
- form = EntityFieldsForm(self.request(), None, entity=e)
+ e = self.vreg['etypes'].etype_class('CWUser')(self.request())
+ form = EntityFieldsForm(e.req, None, entity=e)
unrelated = [rview for rview, reid in form.subject_relation_vocabulary('in_group')]
# should be default groups but owners, i.e. managers, users, guests
self.assertEquals(unrelated, [u'guests', u'managers', u'users'])
def test_subject_in_state_vocabulary(self):
# on a new entity
- e = self.etype_instance('CWUser')
- form = EntityFieldsForm(self.request(), None, entity=e)
+ e = self.vreg['etypes'].etype_class('CWUser')(self.request())
+ form = EntityFieldsForm(e.req, None, entity=e)
states = list(form.subject_in_state_vocabulary('in_state'))
self.assertEquals(len(states), 1)
self.assertEquals(states[0][0], u'activated') # list of (combobox view, state eid)
# on an existant entity
e = self.user()
- form = EntityFieldsForm(self.request(), None, entity=e)
+ form = EntityFieldsForm(e.req, None, entity=e)
states = list(form.subject_in_state_vocabulary('in_state'))
self.assertEquals(len(states), 1)
self.assertEquals(states[0][0], u'deactivated') # list of (combobox view, state eid)
def test_consider_req_form_params(self):
- e = self.etype_instance('CWUser')
+ e = self.vreg['etypes'].etype_class('CWUser')(self.request())
e.eid = 'A'
form = EntityFieldsForm(self.request(login=u'toto'), None, entity=e)
field = StringField(name='login', eidparam=True)
@@ -86,7 +86,7 @@
def test_linkto_field_duplication(self):
- e = self.etype_instance('CWUser')
+ e = self.vreg['etypes'].etype_class('CWUser')(self.request())
e.eid = 'A'
e.req = self.req
geid = self.execute('CWGroup X WHERE X name "users"')[0][0]
--- a/web/test/unittest_formfields.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_formfields.py Tue Aug 11 17:28:18 2009 +0200
@@ -11,7 +11,7 @@
from yams.constraints import StaticVocabularyConstraint, SizeConstraint
from cubicweb.devtools import TestServerConfiguration
-from cubicweb.devtools.testlib import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.web.formwidgets import PasswordInput, TextArea, Select, Radio
from cubicweb.web.formfields import *
from cubicweb.web.views.forms import EntityFieldsForm
@@ -112,11 +112,11 @@
[(u'maybe', '1'), (u'no', '')])
-class MoreFieldsTC(EnvBasedTC):
+class MoreFieldsTC(CubicWebTC):
def test_rtf_format_field(self):
req = self.request()
req.use_fckeditor = lambda: False
- e = self.etype_instance('State')
+ e = self.vreg['etypes'].etype_class('State')(req)
form = EntityFieldsForm(req, entity=e)
description_field = guess_field(schema['State'], schema['description'])
description_format_field = description_field.get_format_field(form)
--- a/web/test/unittest_magicsearch.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_magicsearch.py Tue Aug 11 17:28:18 2009 +0200
@@ -13,21 +13,17 @@
from rql import BadRQLQuery, RQLSyntaxError
-from cubicweb.devtools.apptest import EnvBasedTC, TestEnvironment
+from cubicweb.devtools.testlib import CubicWebTC
translations = {
u'CWUser' : u"Utilisateur",
-# u'Workcase' : u"Affaire",
u'EmailAddress' : u"Adresse",
-# u'Division' : u"Division",
-# u'Comment' : u"Commentaire",
u'name' : u"nom",
u'alias' : u"nom",
u'surname' : u"nom",
u'firstname' : u"prénom",
u'state' : u"état",
-# u'subject' : u"sujet",
u'address' : u"adresse",
u'use_email' : u"adel",
}
@@ -37,12 +33,12 @@
from cubicweb.web.views.magicsearch import translate_rql_tree, QSPreProcessor, QueryTranslator
-class QueryTranslatorTC(EnvBasedTC):
+class QueryTranslatorTC(CubicWebTC):
"""test suite for QueryTranslatorTC"""
def setUp(self):
super(QueryTranslatorTC, self).setUp()
- self.req = self.env.create_request()
+ self.req = self.request()
self.vreg.config.translations = {'en': _translate}
proc = self.vreg['components'].select('magicsearch', self.req)
self.proc = [p for p in proc.processors if isinstance(p, QueryTranslator)][0]
@@ -63,7 +59,7 @@
self.assertEquals(rql, "Any P WHERE P is CWUser, P use_email C, P surname 'Smith'")
-class QSPreProcessorTC(EnvBasedTC):
+class QSPreProcessorTC(CubicWebTC):
"""test suite for QSPreProcessor"""
def setUp(self):
super(QSPreProcessorTC, self).setUp()
@@ -88,7 +84,6 @@
eschema = self.schema.eschema('CWUser')
self.assertEquals(translate(u'prénom', eschema), "firstname")
self.assertEquals(translate(u'nom', eschema), 'surname')
- #self.assert_(translate(u'nom') in ('name', 'surname'))
eschema = self.schema.eschema('EmailAddress')
self.assertEquals(translate(u'adresse', eschema), "address")
self.assertEquals(translate(u'nom', eschema), 'alias')
@@ -125,7 +120,6 @@
self.assertEquals(transform(u'adresse', 'Logi%'),
('EmailAddress E WHERE E alias LIKE %(text)s', {'text': 'Logi%'}))
self.assertRaises(BadRQLQuery, transform, "pers", "taratata")
- #self.assertEquals(transform('CWUser', '%mi'), 'CWUser E WHERE P surname LIKE "%mi"')
def test_three_words_query(self):
"""tests the 'three words shortcut queries'"""
@@ -180,7 +174,7 @@
## Processor Chains tests ############################################
-class ProcessorChainTC(EnvBasedTC):
+class ProcessorChainTC(CubicWebTC):
"""test suite for magic_search's processor chains"""
def setUp(self):
@@ -195,7 +189,7 @@
(u'foo',
("Any X WHERE X has_text %(text)s", {'text': u'foo'})),
# XXX this sounds like a language translator test...
- # and it fail
+ # and it fails
(u'Utilisateur Smith',
('CWUser C WHERE C has_text %(text)s', {'text': u'Smith'})),
(u'utilisateur nom Smith',
--- a/web/test/unittest_urlpublisher.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_urlpublisher.py Tue Aug 11 17:28:18 2009 +0200
@@ -11,15 +11,14 @@
from logilab.common.testlib import unittest_main
-from cubicweb.devtools.apptest import EnvBasedTC
-from cubicweb.devtools._apptest import FakeRequest
-
from cubicweb.rset import ResultSet
+from cubicweb.devtools.testlib import CubicWebTC
+from cubicweb.devtools.fake import FakeRequest
from cubicweb.web import NotFound, Redirect
from cubicweb.web.views.urlrewrite import SimpleReqRewriter
-class URLPublisherTC(EnvBasedTC):
+class URLPublisherTC(CubicWebTC):
"""test suite for QSPreProcessor"""
def setup_database(self):
@@ -30,7 +29,7 @@
def process(self, url):
req = self.req = self.request()
- return self.env.app.url_resolver.process(req, url)
+ return self.app.url_resolver.process(req, url)
def test_raw_path(self):
"""tests raw path resolution'"""
--- a/web/test/unittest_urlrewrite.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_urlrewrite.py Tue Aug 11 17:28:18 2009 +0200
@@ -7,8 +7,8 @@
"""
from logilab.common.testlib import TestCase, unittest_main
-from cubicweb.devtools._apptest import FakeRequest
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
+from cubicweb.devtools.fake import FakeRequest
from cubicweb.web.views.urlrewrite import SimpleReqRewriter, SchemaBasedRewriter, rgx, rgx_action
@@ -82,7 +82,7 @@
-class RgxActionRewriteTC(EnvBasedTC):
+class RgxActionRewriteTC(CubicWebTC):
def setup_database(self):
self.p1 = self.create_user(u'user1')
--- a/web/test/unittest_views_actions.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_views_actions.py Tue Aug 11 17:28:18 2009 +0200
@@ -7,9 +7,9 @@
"""
from logilab.common.testlib import unittest_main
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
-class ActionsTC(EnvBasedTC):
+class ActionsTC(CubicWebTC):
def test_view_action(self):
req = self.request(__message='bla bla bla', vid='rss', rql='CWUser X')
rset = self.execute('CWUser X')
--- a/web/test/unittest_views_basecontrollers.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_views_basecontrollers.py Tue Aug 11 17:28:18 2009 +0200
@@ -8,42 +8,39 @@
import simplejson
from logilab.common.testlib import unittest_main, mock_object
-from cubicweb.devtools.apptest import EnvBasedTC, ControllerTC
from cubicweb import Binary, NoSelectableObject, ValidationError
from cubicweb.view import STRICT_DOCTYPE
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.common.uilib import rql_for_eid
-
from cubicweb.web import INTERNAL_FIELD_VALUE, Redirect, RequestError
-
from cubicweb.entities.authobjs import CWUser
-class EditControllerTC(ControllerTC):
+class EditControllerTC(CubicWebTC):
def setUp(self):
- ControllerTC.setUp(self)
+ CubicWebTC.setUp(self)
self.failUnless('users' in self.schema.eschema('CWGroup').get_groups('read'))
def tearDown(self):
- ControllerTC.tearDown(self)
+ CubicWebTC.tearDown(self)
self.failUnless('users' in self.schema.eschema('CWGroup').get_groups('read'))
def test_noparam_edit(self):
"""check behaviour of this controller without any form parameter
"""
-
- self.req.form = {}
- self.assertRaises(ValidationError, self.publish, self.req)
+ self.assertRaises(ValidationError, self.publish, self.request())
def test_validation_unique(self):
"""test creation of two linked entities
"""
user = self.user()
- self.req.form = {'eid': 'X', '__type:X': 'CWUser',
- 'login:X': u'admin', 'edits-login:X': u'',
- 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'',
- }
- self.assertRaises(ValidationError, self.publish, self.req)
+ req = self.request()
+ req.form = {'eid': 'X', '__type:X': 'CWUser',
+ 'login:X': u'admin', 'edits-login:X': u'',
+ 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'',
+ }
+ self.assertRaises(ValidationError, self.publish, req)
def test_user_editing_itself(self):
@@ -54,7 +51,8 @@
groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')]
groups = [str(eid) for eid in groupeids]
stateeid = [eid for eid, in self.execute('State S WHERE S name "activated"')][0]
- self.req.form = {
+ req = self.request()
+ req.form = {
'eid': `user.eid`,
'__type:'+`user.eid`: 'CWUser',
'login:'+`user.eid`: unicode(user.login),
@@ -69,7 +67,7 @@
'edits-in_group:'+`user.eid`: basegroups,
'edits-in_state:'+`user.eid`: `stateeid`,
}
- path, params = self.expect_redirect_publish()
+ path, params = self.expect_redirect_publish(req)
e = self.execute('Any X WHERE X eid %(x)s', {'x': user.eid}, 'x').get_entity(0, 0)
self.assertEquals(e.firstname, u'Th\xe9nault')
self.assertEquals(e.surname, u'Sylvain')
@@ -81,8 +79,6 @@
user = self.create_user('user')
cnx = self.login('user')
req = self.request()
- #self.assertEquals(self.ctrl.schema['CWUser']._groups['read'],
- # ('managers', 'users'))
req.form = {
'eid': `user.eid`, '__type:'+`user.eid`: 'CWUser',
'__maineid' : str(user.eid),
@@ -101,7 +97,8 @@
"""
user = self.user()
groupeids = [eid for eid, in self.execute('CWGroup G WHERE X in_group G, X eid %(x)s', {'x': user.eid})]
- self.req.form = {
+ req = self.request()
+ req.form = {
'eid': `user.eid`,
'__type:'+`user.eid`: 'CWUser',
'login:'+`user.eid`: unicode(user.login),
@@ -112,7 +109,7 @@
'edits-firstname:'+`user.eid`: u'',
'edits-surname:'+`user.eid`: u'',
}
- path, params = self.expect_redirect_publish()
+ path, params = self.expect_redirect_publish(req)
e = self.execute('Any X WHERE X eid %(x)s', {'x': user.eid}, 'x').get_entity(0, 0)
self.assertEquals(e.login, user.login)
self.assertEquals(e.firstname, u'Th\xe9nault')
@@ -124,21 +121,22 @@
def test_create_multiple_linked(self):
gueid = self.execute('CWGroup G WHERE G name "users"')[0][0]
- self.req.form = {'eid': ['X', 'Y'],
-
- '__type:X': 'CWUser',
- '__maineid' : 'X',
- 'login:X': u'adim', 'edits-login:X': u'',
- 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'',
- 'surname:X': u'Di Mascio', 'edits-surname:X': '',
+ req = self.request()
+ req.form = {'eid': ['X', 'Y'],
- 'in_group:X': `gueid`, 'edits-in_group:X': INTERNAL_FIELD_VALUE,
+ '__type:X': 'CWUser',
+ '__maineid' : 'X',
+ 'login:X': u'adim', 'edits-login:X': u'',
+ 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'',
+ 'surname:X': u'Di Mascio', 'edits-surname:X': '',
- '__type:Y': 'EmailAddress',
- 'address:Y': u'dima@logilab.fr', 'edits-address:Y': '',
- 'use_email:X': 'Y', 'edits-use_email:X': INTERNAL_FIELD_VALUE,
- }
- path, params = self.expect_redirect_publish()
+ 'in_group:X': `gueid`, 'edits-in_group:X': INTERNAL_FIELD_VALUE,
+
+ '__type:Y': 'EmailAddress',
+ 'address:Y': u'dima@logilab.fr', 'edits-address:Y': '',
+ 'use_email:X': 'Y', 'edits-use_email:X': INTERNAL_FIELD_VALUE,
+ }
+ path, params = self.expect_redirect_publish(req)
# should be redirected on the created person
self.assertEquals(path, 'cwuser/adim')
e = self.execute('Any P WHERE P surname "Di Mascio"').get_entity(0, 0)
@@ -148,17 +146,18 @@
def test_edit_multiple_linked(self):
peid = self.create_user('adim').eid
- self.req.form = {'eid': [`peid`, 'Y'],
- '__type:%s'%peid: 'CWUser',
- 'surname:%s'%peid: u'Di Masci', 'edits-surname:%s'%peid: '',
+ req = self.request()
+ req.form = {'eid': [`peid`, 'Y'],
+ '__type:%s'%peid: 'CWUser',
+ 'surname:%s'%peid: u'Di Masci', 'edits-surname:%s'%peid: '',
- '__type:Y': 'EmailAddress',
- 'address:Y': u'dima@logilab.fr', 'edits-address:Y': '',
- 'use_email:%s'%peid: 'Y', 'edits-use_email:%s'%peid: INTERNAL_FIELD_VALUE,
+ '__type:Y': 'EmailAddress',
+ 'address:Y': u'dima@logilab.fr', 'edits-address:Y': '',
+ 'use_email:%s'%peid: 'Y', 'edits-use_email:%s'%peid: INTERNAL_FIELD_VALUE,
- '__redirectrql': 'Any X WHERE X eid %s'%peid,
- }
- path, params = self.expect_redirect_publish()
+ '__redirectrql': 'Any X WHERE X eid %s'%peid,
+ }
+ path, params = self.expect_redirect_publish(req)
# should be redirected on the created person
eid = params['rql'].split()[-1]
e = self.execute('Any X WHERE X eid %(x)s', {'x': eid}, 'x').get_entity(0, 0)
@@ -167,7 +166,8 @@
self.assertEquals(email.address, 'dima@logilab.fr')
emaileid = email.eid
- self.req.form = {'eid': [`peid`, `emaileid`],
+ req = self.request()
+ req.form = {'eid': [`peid`, `emaileid`],
'__type:%s'%peid: 'CWUser',
'surname:%s'%peid: u'Di Masci', 'edits-surname:%s'%peid: 'Di Masci',
'__type:%s'%emaileid: 'EmailAddress',
@@ -175,7 +175,7 @@
'use_email:%s'%peid: `emaileid`, 'edits-use_email:%s'%peid: `emaileid`,
'__redirectrql': 'Any X WHERE X eid %s'%peid,
}
- path, params = self.expect_redirect_publish()
+ path, params = self.expect_redirect_publish(req)
# should be redirected on the created person
eid = params['rql'].split()[-1]
e = self.execute('Any X WHERE X eid %(x)s', {'x': eid}, 'x').get_entity(0, 0)
@@ -188,41 +188,47 @@
"""test creation of two linked entities
"""
user = self.user()
- self.req.form = {'__cloned_eid:X': user.eid,
- 'eid': 'X', '__type:X': 'CWUser',
- 'login:X': u'toto', 'edits-login:X': u'',
- 'upassword:X': u'toto', 'edits-upassword:X': u'',
- }
- self.assertRaises(ValidationError, self.publish, self.req)
- self.req.form = {'__cloned_eid:X': user.eid,
- 'eid': 'X', '__type:X': 'CWUser',
- 'login:X': u'toto', 'edits-login:X': u'',
- 'upassword:X': u'toto', 'upassword-confirm:X': u'tutu', 'edits-upassword:X': u'',
- }
- self.assertRaises(ValidationError, self.publish, self.req)
+ req = self.request()
+ req.form = {'__cloned_eid:X': user.eid,
+ 'eid': 'X', '__type:X': 'CWUser',
+ 'login:X': u'toto', 'edits-login:X': u'',
+ 'upassword:X': u'toto', 'edits-upassword:X': u'',
+ }
+ self.assertRaises(ValidationError, self.publish, req)
+ req = self.request()
+ req.form = {'__cloned_eid:X': user.eid,
+ 'eid': 'X', '__type:X': 'CWUser',
+ 'login:X': u'toto', 'edits-login:X': u'',
+ 'upassword:X': u'toto',
+ 'upassword-confirm:X': u'tutu', 'edits-upassword:X': u'',
+ }
+ self.assertRaises(ValidationError, self.publish, req)
def test_interval_bound_constraint_success(self):
feid = self.execute('INSERT File X: X name "toto.txt", X data %(data)s',
{'data': Binary('yo')})[0][0]
- self.req.form = {'eid': ['X'],
- '__type:X': 'Salesterm',
- 'amount:X': u'-10', 'edits-amount:X': '',
- 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE,
- }
- self.assertRaises(ValidationError, self.publish, self.req)
- self.req.form = {'eid': ['X'],
- '__type:X': 'Salesterm',
- 'amount:X': u'110', 'edits-amount:X': '',
- 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE,
- }
- self.assertRaises(ValidationError, self.publish, self.req)
- self.req.form = {'eid': ['X'],
- '__type:X': 'Salesterm',
- 'amount:X': u'10', 'edits-amount:X': '',
- 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE,
- }
- self.expect_redirect_publish()
+ req = self.request()
+ req.form = {'eid': ['X'],
+ '__type:X': 'Salesterm',
+ 'amount:X': u'-10', 'edits-amount:X': '',
+ 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE,
+ }
+ self.assertRaises(ValidationError, self.publish, req)
+ req = self.request()
+ req.form = {'eid': ['X'],
+ '__type:X': 'Salesterm',
+ 'amount:X': u'110', 'edits-amount:X': '',
+ 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE,
+ }
+ self.assertRaises(ValidationError, self.publish, req)
+ req = self.request()
+ req.form = {'eid': ['X'],
+ '__type:X': 'Salesterm',
+ 'amount:X': u'10', 'edits-amount:X': '',
+ 'described_by_test:X': str(feid), 'edits-described_by_test:X': INTERNAL_FIELD_VALUE,
+ }
+ self.expect_redirect_publish(req)
# should be redirected on the created
#eid = params['rql'].split()[-1]
e = self.execute('Salesterm X').get_entity(0, 0)
@@ -232,12 +238,13 @@
"""make sure req's pending insertions are taken into account"""
tmpgroup = self.add_entity('CWGroup', name=u"test")
user = self.user()
- self.req.set_session_data('pending_insert', set([(user.eid, 'in_group', tmpgroup.eid)]))
- path, params = self.expect_redirect_publish()
+ req = self.request()
+ req.set_session_data('pending_insert', set([(user.eid, 'in_group', tmpgroup.eid)]))
+ path, params = self.expect_redirect_publish(req)
usergroups = [gname for gname, in
self.execute('Any N WHERE G name N, U in_group G, U eid %(u)s', {'u': user.eid})]
self.assertUnorderedIterableEquals(usergroups, ['managers', 'test'])
- self.assertEquals(self.req.get_pending_inserts(), [])
+ self.assertEquals(req.get_pending_inserts(), [])
def test_req_pending_delete(self):
@@ -250,12 +257,13 @@
# just make sure everything was set correctly
self.assertUnorderedIterableEquals(usergroups, ['managers', 'test'])
# now try to delete the relation
- self.req.set_session_data('pending_delete', set([(user.eid, 'in_group', groupeid)]))
- path, params = self.expect_redirect_publish()
+ req = self.request()
+ req.set_session_data('pending_delete', set([(user.eid, 'in_group', groupeid)]))
+ path, params = self.expect_redirect_publish(req)
usergroups = [gname for gname, in
self.execute('Any N WHERE G name N, U in_group G, U eid %(u)s', {'u': user.eid})]
self.assertUnorderedIterableEquals(usergroups, ['managers'])
- self.assertEquals(self.req.get_pending_deletes(), [])
+ self.assertEquals(req.get_pending_deletes(), [])
def test_custom_attribute_handler(self):
def custom_login_edit(self, formparams, value, relations):
@@ -265,13 +273,14 @@
try:
user = self.user()
eid = repr(user.eid)
- self.req.form = {
+ req = self.request()
+ req.form = {
'eid': eid,
'__type:'+eid: 'CWUser',
'login:'+eid: u'foo',
'edits-login:'+eid: unicode(user.login),
}
- path, params = self.expect_redirect_publish()
+ path, params = self.expect_redirect_publish(req)
rset = self.execute('Any L WHERE X eid %(x)s, X login L', {'x': user.eid}, 'x')
self.assertEquals(rset[0][0], 'FOO')
finally:
@@ -279,18 +288,19 @@
def test_redirect_apply_button(self):
redirectrql = rql_for_eid(4012) # whatever
- self.req.form = {
- 'eid': 'A', '__type:A': 'BlogEntry',
- '__maineid' : 'A',
- 'content:A': u'"13:03:43"', 'edits-content:A': '',
- 'title:A': u'huuu', 'edits-title:A': '',
- '__redirectrql': redirectrql,
- '__redirectvid': 'primary',
- '__redirectparams': 'toto=tutu&tata=titi',
- '__form_id': 'edition',
- '__action_apply': '',
- }
- path, params = self.expect_redirect_publish()
+ req = self.request()
+ req.form = {
+ 'eid': 'A', '__type:A': 'BlogEntry',
+ '__maineid' : 'A',
+ 'content:A': u'"13:03:43"', 'edits-content:A': '',
+ 'title:A': u'huuu', 'edits-title:A': '',
+ '__redirectrql': redirectrql,
+ '__redirectvid': 'primary',
+ '__redirectparams': 'toto=tutu&tata=titi',
+ '__form_id': 'edition',
+ '__action_apply': '',
+ }
+ path, params = self.expect_redirect_publish(req)
self.failUnless(path.startswith('blogentry/'))
eid = path.split('/')[1]
self.assertEquals(params['vid'], 'edition')
@@ -301,17 +311,18 @@
def test_redirect_ok_button(self):
redirectrql = rql_for_eid(4012) # whatever
- self.req.form = {
- 'eid': 'A', '__type:A': 'BlogEntry',
- '__maineid' : 'A',
- 'content:A': u'"13:03:43"', 'edits-content:A': '',
- 'title:A': u'huuu', 'edits-title:A': '',
- '__redirectrql': redirectrql,
- '__redirectvid': 'primary',
- '__redirectparams': 'toto=tutu&tata=titi',
- '__form_id': 'edition',
- }
- path, params = self.expect_redirect_publish()
+ req = self.request()
+ req.form = {
+ 'eid': 'A', '__type:A': 'BlogEntry',
+ '__maineid' : 'A',
+ 'content:A': u'"13:03:43"', 'edits-content:A': '',
+ 'title:A': u'huuu', 'edits-title:A': '',
+ '__redirectrql': redirectrql,
+ '__redirectvid': 'primary',
+ '__redirectparams': 'toto=tutu&tata=titi',
+ '__form_id': 'edition',
+ }
+ path, params = self.expect_redirect_publish(req)
self.assertEquals(path, 'view')
self.assertEquals(params['rql'], redirectrql)
self.assertEquals(params['vid'], 'primary')
@@ -320,27 +331,30 @@
def test_redirect_delete_button(self):
eid = self.add_entity('BlogEntry', title=u'hop', content=u'hop').eid
- self.req.form = {'eid': str(eid), '__type:%s'%eid: 'BlogEntry',
- '__action_delete': ''}
- path, params = self.expect_redirect_publish()
+ req = self.request()
+ req.form = {'eid': str(eid), '__type:%s'%eid: 'BlogEntry',
+ '__action_delete': ''}
+ path, params = self.expect_redirect_publish(req)
self.assertEquals(path, 'blogentry')
self.assertEquals(params, {u'__message': u'entity deleted'})
eid = self.add_entity('EmailAddress', address=u'hop@logilab.fr').eid
self.execute('SET X use_email E WHERE E eid %(e)s, X eid %(x)s',
- {'x': self.session().user.eid, 'e': eid}, 'x')
+ {'x': self.session.user.eid, 'e': eid}, 'x')
self.commit()
- self.req.form = {'eid': str(eid), '__type:%s'%eid: 'EmailAddress',
- '__action_delete': ''}
- path, params = self.expect_redirect_publish()
+ req = self.request()
+ req.form = {'eid': str(eid), '__type:%s'%eid: 'EmailAddress',
+ '__action_delete': ''}
+ path, params = self.expect_redirect_publish(req)
self.assertEquals(path, 'cwuser/admin')
self.assertEquals(params, {u'__message': u'entity deleted'})
eid1 = self.add_entity('BlogEntry', title=u'hop', content=u'hop').eid
eid2 = self.add_entity('EmailAddress', address=u'hop@logilab.fr').eid
- self.req.form = {'eid': [str(eid1), str(eid2)],
- '__type:%s'%eid1: 'BlogEntry',
- '__type:%s'%eid2: 'EmailAddress',
- '__action_delete': ''}
- path, params = self.expect_redirect_publish()
+ req = self.request()
+ req.form = {'eid': [str(eid1), str(eid2)],
+ '__type:%s'%eid1: 'BlogEntry',
+ '__type:%s'%eid2: 'EmailAddress',
+ '__action_delete': ''}
+ path, params = self.expect_redirect_publish(req)
self.assertEquals(path, 'view')
self.assertEquals(params, {u'__message': u'entities deleted'})
@@ -351,23 +365,24 @@
groups = [str(eid) for eid in groupeids]
eeetypeeid = self.execute('CWEType X WHERE X name "CWGroup"')[0][0]
basegroups = [str(eid) for eid, in self.execute('CWGroup G WHERE X read_permission G, X eid %(x)s', {'x': eeetypeeid})]
- self.req.form = {
- 'eid': `eeetypeeid`,
- '__type:'+`eeetypeeid`: 'CWEType',
- 'name:'+`eeetypeeid`: u'CWGroup',
- 'final:'+`eeetypeeid`: False,
- 'meta:'+`eeetypeeid`: True,
- 'description:'+`eeetypeeid`: u'users group',
- 'read_permission:'+`eeetypeeid`: groups,
- #
- 'edits-name:'+`eeetypeeid`: u'CWGroup',
- 'edits-final:'+`eeetypeeid`: False,
- 'edits-meta:'+`eeetypeeid`: True,
- 'edits-description:'+`eeetypeeid`: u'users group',
- 'edits-read_permission:'+`eeetypeeid`: basegroups,
- }
+ req = self.request()
+ req.form = {
+ 'eid': `eeetypeeid`,
+ '__type:'+`eeetypeeid`: 'CWEType',
+ 'name:'+`eeetypeeid`: u'CWGroup',
+ 'final:'+`eeetypeeid`: False,
+ 'meta:'+`eeetypeeid`: True,
+ 'description:'+`eeetypeeid`: u'users group',
+ 'read_permission:'+`eeetypeeid`: groups,
+ #
+ 'edits-name:'+`eeetypeeid`: u'CWGroup',
+ 'edits-final:'+`eeetypeeid`: False,
+ 'edits-meta:'+`eeetypeeid`: True,
+ 'edits-description:'+`eeetypeeid`: u'users group',
+ 'edits-read_permission:'+`eeetypeeid`: basegroups,
+ }
try:
- path, params = self.expect_redirect_publish()
+ path, params = self.expect_redirect_publish(req)
e = self.execute('Any X WHERE X eid %(x)s', {'x': eeetypeeid}, 'x').get_entity(0, 0)
self.assertEquals(e.name, 'CWGroup')
self.assertEquals([g.eid for g in e.read_permission], groupeids)
@@ -383,23 +398,24 @@
groups = [str(eid) for eid in groupeids]
eeetypeeid = self.execute('CWEType X WHERE X name "CWEType"')[0][0]
basegroups = [str(eid) for eid, in self.execute('CWGroup G WHERE X read_permission G, X eid %(x)s', {'x': eeetypeeid})]
- self.req.form = {
- 'eid': `eeetypeeid`,
- '__type:'+`eeetypeeid`: 'CWEType',
- 'name:'+`eeetypeeid`: u'CWEType',
- 'final:'+`eeetypeeid`: False,
- 'meta:'+`eeetypeeid`: True,
- 'description:'+`eeetypeeid`: u'users group',
- 'read_permission:'+`eeetypeeid`: groups,
+ req = self.request()
+ req.form = {
+ 'eid': `eeetypeeid`,
+ '__type:'+`eeetypeeid`: 'CWEType',
+ 'name:'+`eeetypeeid`: u'CWEType',
+ 'final:'+`eeetypeeid`: False,
+ 'meta:'+`eeetypeeid`: True,
+ 'description:'+`eeetypeeid`: u'users group',
+ 'read_permission:'+`eeetypeeid`: groups,
- 'edits-name:'+`eeetypeeid`: u'CWEType',
- 'edits-final:'+`eeetypeeid`: False,
- 'edits-meta:'+`eeetypeeid`: True,
- 'edits-description:'+`eeetypeeid`: u'users group',
- 'edits-read_permission:'+`eeetypeeid`: basegroups,
- }
+ 'edits-name:'+`eeetypeeid`: u'CWEType',
+ 'edits-final:'+`eeetypeeid`: False,
+ 'edits-meta:'+`eeetypeeid`: True,
+ 'edits-description:'+`eeetypeeid`: u'users group',
+ 'edits-read_permission:'+`eeetypeeid`: basegroups,
+ }
try:
- path, params = self.expect_redirect_publish()
+ path, params = self.expect_redirect_publish(req)
e = self.execute('Any X WHERE X eid %(x)s', {'x': eeetypeeid}, 'x').get_entity(0, 0)
self.assertEquals(e.name, 'CWEType')
self.assertEquals(sorted(g.eid for g in e.read_permission), groupeids)
@@ -413,12 +429,13 @@
this seems to be postgres (tsearch?) specific
"""
- self.req.form = {
- 'eid': 'A', '__type:A': 'BlogEntry',
- '__maineid' : 'A',
- 'title:A': u'"13:03:40"', 'edits-title:A': '',
- 'content:A': u'"13:03:43"', 'edits-content:A': ''}
- path, params = self.expect_redirect_publish()
+ req = self.request()
+ req.form = {
+ 'eid': 'A', '__type:A': 'BlogEntry',
+ '__maineid' : 'A',
+ 'title:A': u'"13:03:40"', 'edits-title:A': '',
+ 'content:A': u'"13:03:43"', 'edits-content:A': ''}
+ path, params = self.expect_redirect_publish(req)
self.failUnless(path.startswith('blogentry/'))
eid = path.split('/')[1]
e = self.execute('Any C, T WHERE C eid %(x)s, C content T', {'x': eid}, 'x').get_entity(0, 0)
@@ -428,29 +445,31 @@
def test_nonregr_multiple_empty_email_addr(self):
gueid = self.execute('CWGroup G WHERE G name "users"')[0][0]
- self.req.form = {'eid': ['X', 'Y'],
-
- '__type:X': 'CWUser',
- 'login:X': u'adim', 'edits-login:X': u'',
- 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'',
- 'in_group:X': `gueid`, 'edits-in_group:X': INTERNAL_FIELD_VALUE,
+ req = self.request()
+ req.form = {'eid': ['X', 'Y'],
- '__type:Y': 'EmailAddress',
- 'address:Y': u'', 'edits-address:Y': '',
- 'alias:Y': u'', 'edits-alias:Y': '',
- 'use_email:X': 'Y', 'edits-use_email:X': INTERNAL_FIELD_VALUE,
- }
- self.assertRaises(ValidationError, self.publish, self.req)
+ '__type:X': 'CWUser',
+ 'login:X': u'adim', 'edits-login:X': u'',
+ 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'',
+ 'in_group:X': `gueid`, 'edits-in_group:X': INTERNAL_FIELD_VALUE,
+
+ '__type:Y': 'EmailAddress',
+ 'address:Y': u'', 'edits-address:Y': '',
+ 'alias:Y': u'', 'edits-alias:Y': '',
+ 'use_email:X': 'Y', 'edits-use_email:X': INTERNAL_FIELD_VALUE,
+ }
+ self.assertRaises(ValidationError, self.publish, req)
def test_nonregr_copy(self):
user = self.user()
- self.req.form = {'__cloned_eid:X': user.eid,
- 'eid': 'X', '__type:X': 'CWUser',
- '__maineid' : 'X',
- 'login:X': u'toto', 'edits-login:X': u'',
- 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'',
- }
- path, params = self.expect_redirect_publish()
+ req = self.request()
+ req.form = {'__cloned_eid:X': user.eid,
+ 'eid': 'X', '__type:X': 'CWUser',
+ '__maineid' : 'X',
+ 'login:X': u'toto', 'edits-login:X': u'',
+ 'upassword:X': u'toto', 'upassword-confirm:X': u'toto', 'edits-upassword:X': u'',
+ }
+ path, params = self.expect_redirect_publish(req)
self.assertEquals(path, 'cwuser/toto')
e = self.execute('Any X WHERE X is CWUser, X login "toto"').get_entity(0, 0)
self.assertEquals(e.login, 'toto')
@@ -466,29 +485,30 @@
e = self.add_entity('EmailAddress', address=u'doe@doe.com')
self.execute('SET P use_email E, P primary_email E WHERE P eid %(p)s, E eid %(e)s',
{'p' : p.eid, 'e' : e.eid})
- self.req.form = {'__cloned_eid:X': p.eid,
- 'eid': 'X', '__type:X': 'CWUser',
- 'login': u'dodo', 'edits-login': u'dodo',
- 'surname:X': u'Boom', 'edits-surname:X': u'',
- '__errorurl' : "whatever but required",
+ req = self.request()
+ req.form = {'__cloned_eid:X': p.eid,
+ 'eid': 'X', '__type:X': 'CWUser',
+ 'login': u'dodo', 'edits-login': u'dodo',
+ 'surname:X': u'Boom', 'edits-surname:X': u'',
+ '__errorurl' : "whatever but required",
}
# try to emulate what really happens in the web application
# 1/ validate form => EditController.publish raises a ValidationError
# which fires a Redirect
# 2/ When re-publishing the copy form, the publisher implicitly commits
try:
- self.env.app.publish('edit', self.req)
+ self.app.publish('edit', req)
except Redirect:
- self.req.form['rql'] = 'Any X WHERE X eid %s' % p.eid
- self.req.form['vid'] = 'copy'
- self.env.app.publish('view', self.req)
+ req.form['rql'] = 'Any X WHERE X eid %s' % p.eid
+ req.form['vid'] = 'copy'
+ self.app.publish('view', req)
rset = self.execute('CWUser P WHERE P surname "Boom"')
self.assertEquals(len(rset), 0)
finally:
p.__class__.skip_copy_for = old_skips
-class EmbedControllerTC(EnvBasedTC):
+class EmbedControllerTC(CubicWebTC):
def test_nonregr_embed_publish(self):
# This test looks a bit stupid but at least it will probably
@@ -500,23 +520,23 @@
result = controller.publish(rset=None)
-class ReportBugControllerTC(EnvBasedTC):
+class ReportBugControllerTC(CubicWebTC):
def test_usable_by_guets(self):
- req = self.request()
- self.vreg['controllers'].select('reportbug', req)
+ self.login('anon')
+ self.vreg['controllers'].select('reportbug', self.request())
-class SendMailControllerTC(EnvBasedTC):
+class SendMailControllerTC(CubicWebTC):
def test_not_usable_by_guets(self):
self.login('anon')
- req = self.request()
- self.assertRaises(NoSelectableObject, self.env.vreg['controllers'].select, 'sendmail', req)
+ self.assertRaises(NoSelectableObject,
+ self.vreg['controllers'].select, 'sendmail', self.request())
-class JSONControllerTC(EnvBasedTC):
+class JSONControllerTC(CubicWebTC):
def ctrl(self, req=None):
req = req or self.request(url='http://whatever.fr/')
--- a/web/test/unittest_views_basetemplates.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_views_basetemplates.py Tue Aug 11 17:28:18 2009 +0200
@@ -5,11 +5,11 @@
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
-from cubicweb.devtools.testlib import WebTest
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.htmlparser import DTDValidator
-class LogFormTemplateTC(WebTest):
+class LogFormTemplateTC(CubicWebTC):
def _login_labels(self):
valid = self.content_type_validators.get('text/html', DTDValidator)()
--- a/web/test/unittest_views_baseviews.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_views_baseviews.py Tue Aug 11 17:28:18 2009 +0200
@@ -10,7 +10,7 @@
from logilab.common.testlib import unittest_main
from logilab.mtconverter import html_unescape
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.web.htmlwidgets import TableWidget
from cubicweb.web.views import vid_from_rset
@@ -18,7 +18,7 @@
def loadjson(value):
return loads(html_unescape(value))
-class VidFromRsetTC(EnvBasedTC):
+class VidFromRsetTC(CubicWebTC):
def test_no_rset(self):
req = self.request()
@@ -84,7 +84,7 @@
self.assertEquals(vid_from_rset(req, rset, self.schema), 'table')
-class TableViewTC(EnvBasedTC):
+class TableViewTC(CubicWebTC):
def _prepare_entity(self):
e = self.add_entity("State", name=u'<toto>', description=u'loo"ong blabla')
--- a/web/test/unittest_views_editforms.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_views_editforms.py Tue Aug 11 17:28:18 2009 +0200
@@ -6,14 +6,13 @@
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
from logilab.common.testlib import unittest_main
-from cubicweb.devtools.apptest import EnvBasedTC
-from cubicweb.devtools.testlib import WebTest
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.web.views.autoform import AutomaticEntityForm as AEF
from cubicweb.web.formwidgets import AutoCompletionWidget
def rbc(entity, category):
return [(rschema.type, x) for rschema, tschemas, x in AEF.erelations_by_category(entity, category)]
-class AutomaticEntityFormTC(EnvBasedTC):
+class AutomaticEntityFormTC(CubicWebTC):
def test_custom_widget(self):
AEF.rfields_kwargs.tag_subject_of(('CWUser', 'login', '*'),
@@ -29,7 +28,7 @@
#for (rtype, role, stype, otype), tag in AEF.rcategories._tagdefs.items():
# if rtype == 'tags':
# print rtype, role, stype, otype, ':', tag
- e = self.etype_instance('CWUser')
+ e = self.vreg['etypes'].etype_class('CWUser')(self.request())
# see custom configuration in views.cwuser
self.assertEquals(rbc(e, 'primary'),
[('login', 'subject'),
@@ -76,7 +75,7 @@
self.failIf(AEF.rinlined.etype_get('CWUser', 'primary_email', 'subject'))
def test_personne_relations_by_category(self):
- e = self.etype_instance('Personne')
+ e = self.vreg['etypes'].etype_class('Personne')(self.request())
self.assertListEquals(rbc(e, 'primary'),
[('nom', 'subject'),
('eid', 'subject')
@@ -124,7 +123,7 @@
self.failIf(any(f for f in form.fields if f is None))
-class FormViewsTC(WebTest):
+class FormViewsTC(CubicWebTC):
def test_delete_conf_formview(self):
rset = self.execute('CWGroup X')
self.view('deleteconf', rset, template=None).source
--- a/web/test/unittest_views_navigation.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_views_navigation.py Tue Aug 11 17:28:18 2009 +0200
@@ -7,14 +7,14 @@
"""
from logilab.common.testlib import unittest_main, mock_object
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.web.views.navigation import PageNavigation, SortedNavigation
from cubicweb.web.views.ibreadcrumbs import BreadCrumbEntityVComponent
BreadCrumbEntityVComponent.visible = True
-class NavigationTC(EnvBasedTC):
+class NavigationTC(CubicWebTC):
def test_navigation_selection_whatever(self):
req = self.request()
@@ -40,10 +40,10 @@
def test_navigation_selection_not_enough(self):
req = self.request()
rset = self.execute('Any X,N LIMIT 10 WHERE X name N')
- navcomp = self.vreg['components'].select_object('navigation', req, rset=rset)
+ navcomp = self.vreg['components'].select_or_none('navigation', req, rset=rset)
self.assertEquals(navcomp, None)
req.set_search_state('W:X:Y:Z')
- navcomp = self.vreg['components'].select_object('navigation', req, rset=rset)
+ navcomp = self.vreg['components'].select_or_none('navigation', req, rset=rset)
self.assertEquals(navcomp, None)
req.set_search_state('normal')
@@ -96,7 +96,7 @@
-class ContentNavigationTC(EnvBasedTC):
+class ContentNavigationTC(CubicWebTC):
def test_component_context(self):
view = mock_object(is_primary=lambda x: True)
--- a/web/test/unittest_views_pyviews.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_views_pyviews.py Tue Aug 11 17:28:18 2009 +0200
@@ -1,7 +1,7 @@
from logilab.common.testlib import unittest_main
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
-class PyViewsTC(EnvBasedTC):
+class PyViewsTC(CubicWebTC):
def test_pyvaltable(self):
content = self.vreg['views'].render('pyvaltable', self.request(),
--- a/web/test/unittest_views_searchrestriction.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_views_searchrestriction.py Tue Aug 11 17:28:18 2009 +0200
@@ -5,11 +5,11 @@
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.web.facet import insert_attr_select_relation, prepare_facets_rqlst
-class InsertAttrRelationTC(EnvBasedTC):
+class InsertAttrRelationTC(CubicWebTC):
def parse(self, query):
rqlst = self.vreg.parse(self.session, query)
--- a/web/test/unittest_viewselector.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_viewselector.py Tue Aug 11 17:28:18 2009 +0200
@@ -4,7 +4,7 @@
"""
from logilab.common.testlib import unittest_main
-from cubicweb.devtools.apptest import EnvBasedTC
+from cubicweb.devtools.testlib import CubicWebTC
from cubicweb import CW_SOFTWARE_ROOT as BASE, Binary
from cubicweb.selectors import (match_user_groups, implements,
specified_etype_implements, rql_condition,
@@ -26,7 +26,7 @@
('siteinfo', actions.SiteInfoAction),
]
-class ViewSelectorTC(EnvBasedTC):
+class ViewSelectorTC(CubicWebTC):
def setup_database(self):
self.add_entity('BlogEntry', title=u"une news !", content=u"cubicweb c'est beau")
@@ -78,12 +78,12 @@
('systempropertiesform', cwproperties.SystemCWPropertiesForm)])
def test_possible_views_noresult(self):
- rset, req = self.env.get_rset_and_req('Any X WHERE X eid 999999')
+ rset, req = self.rset_and_req('Any X WHERE X eid 999999')
self.assertListEqual(self.pviews(req, rset),
[])
def test_possible_views_one_egroup(self):
- rset, req = self.env.get_rset_and_req('CWGroup X WHERE X name "managers"')
+ rset, req = self.rset_and_req('CWGroup X WHERE X name "managers"')
self.assertListEqual(self.pviews(req, rset),
[('adaptedlist', baseviews.AdaptedListView),
('csvexport', csvexport.CSVRsetView),
@@ -106,7 +106,7 @@
])
def test_possible_views_multiple_egroups(self):
- rset, req = self.env.get_rset_and_req('CWGroup X')
+ rset, req = self.rset_and_req('CWGroup X')
self.assertListEqual(self.pviews(req, rset),
[('adaptedlist', baseviews.AdaptedListView),
('csvexport', csvexport.CSVRsetView),
@@ -130,16 +130,16 @@
def test_propertiesform_admin(self):
assert self.vreg['views']['propertiesform']
- rset1, req1 = self.env.get_rset_and_req('CWUser X WHERE X login "admin"')
- rset2, req2 = self.env.get_rset_and_req('CWUser X WHERE X login "anon"')
+ rset1, req1 = self.rset_and_req('CWUser X WHERE X login "admin"')
+ rset2, req2 = self.rset_and_req('CWUser X WHERE X login "anon"')
self.failUnless(self.vreg['views'].select('propertiesform', req1, rset=None))
self.failUnless(self.vreg['views'].select('propertiesform', req1, rset=rset1))
self.failUnless(self.vreg['views'].select('propertiesform', req2, rset=rset2))
def test_propertiesform_anon(self):
self.login('anon')
- rset1, req1 = self.env.get_rset_and_req('CWUser X WHERE X login "admin"')
- rset2, req2 = self.env.get_rset_and_req('CWUser X WHERE X login "anon"')
+ rset1, req1 = self.rset_and_req('CWUser X WHERE X login "admin"')
+ rset2, req2 = self.rset_and_req('CWUser X WHERE X login "anon"')
self.assertRaises(NoSelectableObject, self.vreg['views'].select, 'propertiesform', req1, rset=None)
self.assertRaises(NoSelectableObject, self.vreg['views'].select, 'propertiesform', req1, rset=rset1)
self.assertRaises(NoSelectableObject, self.vreg['views'].select, 'propertiesform', req1, rset=rset2)
@@ -147,14 +147,14 @@
def test_propertiesform_jdoe(self):
self.create_user('jdoe')
self.login('jdoe')
- rset1, req1 = self.env.get_rset_and_req('CWUser X WHERE X login "admin"')
- rset2, req2 = self.env.get_rset_and_req('CWUser X WHERE X login "jdoe"')
+ rset1, req1 = self.rset_and_req('CWUser X WHERE X login "admin"')
+ rset2, req2 = self.rset_and_req('CWUser X WHERE X login "jdoe"')
self.failUnless(self.vreg['views'].select('propertiesform', req1, rset=None))
self.assertRaises(NoSelectableObject, self.vreg['views'].select, 'propertiesform', req1, rset=rset1)
self.failUnless(self.vreg['views'].select('propertiesform', req2, rset=rset2))
def test_possible_views_multiple_different_types(self):
- rset, req = self.env.get_rset_and_req('Any X')
+ rset, req = self.rset_and_req('Any X')
self.assertListEqual(self.pviews(req, rset),
[('csvexport', csvexport.CSVRsetView),
('ecsvexport', csvexport.CSVEntityView),
@@ -176,7 +176,7 @@
])
def test_possible_views_any_rset(self):
- rset, req = self.env.get_rset_and_req('Any N, X WHERE X in_group Y, Y name N')
+ rset, req = self.rset_and_req('Any N, X WHERE X in_group Y, Y name N')
self.assertListEqual(self.pviews(req, rset),
[('csvexport', csvexport.CSVRsetView),
('editable-table', tableview.EditableTableView),
@@ -185,7 +185,7 @@
])
def test_possible_views_multiple_eusers(self):
- rset, req = self.env.get_rset_and_req('CWUser X')
+ rset, req = self.rset_and_req('CWUser X')
self.assertListEqual(self.pviews(req, rset),
[('adaptedlist', baseviews.AdaptedListView),
('csvexport', csvexport.CSVRsetView),
@@ -218,14 +218,14 @@
})
def test_possible_actions_no_entity(self):
- rset, req = self.env.get_rset_and_req('Any X WHERE X eid 999999')
+ rset, req = self.rset_and_req('Any X WHERE X eid 999999')
self.assertDictEqual(self.pactions(req, rset),
{'useractions': USERACTIONS,
'siteactions': SITEACTIONS,
})
def test_possible_actions_same_type_entities(self):
- rset, req = self.env.get_rset_and_req('CWGroup X')
+ rset, req = self.rset_and_req('CWGroup X')
self.assertDictEqual(self.pactions(req, rset),
{'useractions': USERACTIONS,
'siteactions': SITEACTIONS,
@@ -235,7 +235,7 @@
})
def test_possible_actions_different_types_entities(self):
- rset, req = self.env.get_rset_and_req('Any X')
+ rset, req = self.rset_and_req('Any X')
self.assertDictEqual(self.pactions(req, rset),
{'useractions': USERACTIONS,
'siteactions': SITEACTIONS,
@@ -243,13 +243,13 @@
})
def test_possible_actions_final_entities(self):
- rset, req = self.env.get_rset_and_req('Any N, X WHERE X in_group Y, Y name N')
+ rset, req = self.rset_and_req('Any N, X WHERE X in_group Y, Y name N')
self.assertDictEqual(self.pactions(req, rset),
{'useractions': USERACTIONS,
'siteactions': SITEACTIONS})
def test_possible_actions_eetype_cwuser_entity(self):
- rset, req = self.env.get_rset_and_req('CWEType X WHERE X name "CWUser"')
+ rset, req = self.rset_and_req('CWEType X WHERE X name "CWUser"')
self.assertDictEqual(self.pactions(req, rset),
{'useractions': USERACTIONS,
'siteactions': SITEACTIONS,
@@ -291,7 +291,7 @@
self.vreg['views'].select, 'table', req, rset=rset)
# no entity
- rset, req = self.env.get_rset_and_req('Any X WHERE X eid 999999')
+ rset, req = self.rset_and_req('Any X WHERE X eid 999999')
self.failUnlessRaises(NoSelectableObject,
self.vreg['views'].select, 'index', req, rset=rset)
self.failUnlessRaises(NoSelectableObject,
@@ -301,7 +301,7 @@
self.failUnlessRaises(NoSelectableObject,
self.vreg['views'].select, 'table', req, rset=rset)
# one entity
- rset, req = self.env.get_rset_and_req('CWGroup X WHERE X name "managers"')
+ rset, req = self.rset_and_req('CWGroup X WHERE X name "managers"')
self.assertIsInstance(self.vreg['views'].select('primary', req, rset=rset),
primary.PrimaryView)
self.assertIsInstance(self.vreg['views'].select('list', req, rset=rset),
@@ -315,7 +315,7 @@
self.failUnlessRaises(NoSelectableObject,
self.vreg['views'].select, 'index', req, rset=rset)
# list of entities of the same type
- rset, req = self.env.get_rset_and_req('CWGroup X')
+ rset, req = self.rset_and_req('CWGroup X')
self.assertIsInstance(self.vreg['views'].select('primary', req, rset=rset),
primary.PrimaryView)
self.assertIsInstance(self.vreg['views'].select('list', req, rset=rset),
@@ -325,7 +325,7 @@
self.failUnlessRaises(NoSelectableObject,
self.vreg['views'].select, 'creation', req, rset=rset)
# list of entities of different types
- rset, req = self.env.get_rset_and_req('Any X')
+ rset, req = self.rset_and_req('Any X')
self.assertIsInstance(self.vreg['views'].select('primary', req, rset=rset),
primary.PrimaryView)
self.assertIsInstance(self.vreg['views'].select('list', req, rset=rset),
@@ -337,7 +337,7 @@
self.failUnlessRaises(NoSelectableObject,
self.vreg['views'].select, 'index', req, rset=rset)
# whatever
- rset, req = self.env.get_rset_and_req('Any N, X WHERE X in_group Y, Y name N')
+ rset, req = self.rset_and_req('Any N, X WHERE X in_group Y, Y name N')
self.assertIsInstance(self.vreg['views'].select('table', req, rset=rset),
tableview.TableView)
self.failUnlessRaises(NoSelectableObject,
@@ -351,7 +351,7 @@
self.failUnlessRaises(NoSelectableObject,
self.vreg['views'].select, 'edition', req, rset=rset)
# mixed query
- rset, req = self.env.get_rset_and_req('Any U,G WHERE U is CWUser, G is CWGroup')
+ rset, req = self.rset_and_req('Any U,G WHERE U is CWUser, G is CWGroup')
self.failUnlessRaises(NoSelectableObject,
self.vreg['views'].select, 'edition', req, rset=rset)
self.failUnlessRaises(NoSelectableObject,
@@ -362,7 +362,7 @@
def test_interface_selector(self):
image = self.add_entity('Image', name=u'bim.png', data=Binary('bim'))
# image primary view priority
- rset, req = self.env.get_rset_and_req('Image X WHERE X name "bim.png"')
+ rset, req = self.rset_and_req('Image X WHERE X name "bim.png"')
self.assertIsInstance(self.vreg['views'].select('primary', req, rset=rset),
idownloadable.IDownloadablePrimaryView)
@@ -370,12 +370,12 @@
def test_score_entity_selector(self):
image = self.add_entity('Image', name=u'bim.png', data=Binary('bim'))
# image primary view priority
- rset, req = self.env.get_rset_and_req('Image X WHERE X name "bim.png"')
+ rset, req = self.rset_and_req('Image X WHERE X name "bim.png"')
self.assertIsInstance(self.vreg['views'].select('image', req, rset=rset),
idownloadable.ImageView)
fileobj = self.add_entity('File', name=u'bim.txt', data=Binary('bim'))
# image primary view priority
- rset, req = self.env.get_rset_and_req('File X WHERE X name "bim.txt"')
+ rset, req = self.rset_and_req('File X WHERE X name "bim.txt"')
self.assertRaises(NoSelectableObject, self.vreg['views'].select, 'image', req, rset=rset)
@@ -385,7 +385,7 @@
rset = None
req = self.request()
else:
- rset, req = self.env.get_rset_and_req(rql)
+ rset, req = self.rset_and_req(rql)
try:
self.vreg['views'].render(vid, req, rset=rset, **args)
except:
@@ -438,7 +438,7 @@
del self.vreg['actions']['testaction']
def test(self):
- rset, req = self.env.get_rset_and_req('CWEType X WHERE X name "CWEType"')
+ rset, req = self.rset_and_req('CWEType X WHERE X name "CWEType"')
self.assertDictEqual(self.pactions(req, rset),
{'useractions': USERACTIONS,
'siteactions': SITEACTIONS,
@@ -449,7 +449,7 @@
('testaction', CWETypeRQLAction),
],
})
- rset, req = self.env.get_rset_and_req('CWEType X WHERE X name "CWRType"')
+ rset, req = self.rset_and_req('CWEType X WHERE X name "CWRType"')
self.assertDictEqual(self.pactions(req, rset),
{'useractions': USERACTIONS,
'siteactions': SITEACTIONS,
--- a/web/test/unittest_webconfig.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/test/unittest_webconfig.py Tue Aug 11 17:28:18 2009 +0200
@@ -9,8 +9,7 @@
from logilab.common.testlib import TestCase, unittest_main
-from cubicweb.devtools._apptest import FakeRequest
-from cubicweb.devtools import ApptestConfiguration
+from cubicweb.devtools import ApptestConfiguration, fake
class WebconfigTC(TestCase):
def setUp(self):
@@ -21,7 +20,7 @@
def test_nonregr_print_css_as_list(self):
"""make sure PRINT_CSS *must* is a list"""
config = self.config
- req = FakeRequest()
+ req = fake.FakeRequest()
print_css = req.external_resource('STYLESHEETS_PRINT')
self.failUnless(isinstance(print_css, list))
ie_css = req.external_resource('IE_STYLESHEETS')
--- a/web/views/basetemplates.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/views/basetemplates.py Tue Aug 11 17:28:18 2009 +0200
@@ -154,12 +154,11 @@
w(u'<div id="page"><table width="100%" border="0" id="mainLayout"><tr>\n')
self.nav_column(view, 'left')
w(u'<td id="contentcol">\n')
- rqlcomp = self.vreg['components'].select_object('rqlinput', self.req,
- rset=self.rset)
+ components = self.vreg['components']
+ rqlcomp = components.select_or_none('rqlinput', self.req, rset=self.rset)
if rqlcomp:
rqlcomp.render(w=self.w, view=view)
- msgcomp = self.vreg['components'].select_object('applmessages',
- self.req, rset=self.rset)
+ msgcomp = components.select_or_none('applmessages', self.req, rset=self.rset)
if msgcomp:
msgcomp.render(w=self.w)
self.content_header(view)
@@ -299,8 +298,8 @@
self.req.add_js(jscript, localfile=False)
def alternates(self):
- urlgetter = self.vreg['components'].select_object('rss_feed_url',
- self.req, rset=self.rset)
+ urlgetter = self.vreg['components'].select_or_none('rss_feed_url',
+ self.req, rset=self.rset)
if urlgetter is not None:
self.whead(u'<link rel="alternate" type="application/rss+xml" title="RSS feed" href="%s"/>\n'
% xml_escape(urlgetter.feed_url()))
--- a/web/views/navigation.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/views/navigation.py Tue Aug 11 17:28:18 2009 +0200
@@ -149,8 +149,8 @@
def limit_rset_using_paged_nav(self, req, rset, w, forcedisplay=False,
show_all_option=True, page_size=None):
if not (forcedisplay or req.form.get('__force_display') is not None):
- nav = self.vreg['components'].select_object('navigation', req,
- rset=rset, page_size=page_size)
+ nav = self.vreg['components'].select_or_none('navigation', req,
+ rset=rset, page_size=page_size)
if nav:
# get boundaries before component rendering
start, stop = nav.page_boundaries()
--- a/web/views/urlpublishing.py Tue Aug 11 17:19:05 2009 +0200
+++ b/web/views/urlpublishing.py Tue Aug 11 17:28:18 2009 +0200
@@ -217,6 +217,7 @@
raise PathDontMatch()
# remove last part and see if this is something like an actions
# if so, call
+ # XXX bad smell: refactor to simpler code
try:
actionsreg = self.vreg['actions']
requested = parts.pop(-1)
@@ -232,7 +233,7 @@
continue
else:
try:
- action = actionsreg.select_best(actions, req, rset=rset)
+ action = actionsreg._select_best(actions, req, rset=rset)
except RegistryException:
continue
else:
--- a/wsgi/handler.py Tue Aug 11 17:19:05 2009 +0200
+++ b/wsgi/handler.py Tue Aug 11 17:28:18 2009 +0200
@@ -97,7 +97,7 @@
# assert self.base_url[-1] == '/'
# self.https_url = config['https-url']
# assert not self.https_url or self.https_url[-1] == '/'
- self.url_rewriter = self.appli.vreg.select_object('components', 'urlrewriter')
+ self.url_rewriter = self.appli.vreg['components'].select_or_none('urlrewriter')
def _render(self, req):
"""this function performs the actual rendering