--- a/devtools/apptest.py Tue Aug 11 17:06:02 2009 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,506 +0,0 @@
-"""This module provides misc utilities to test instances
-
-:organization: Logilab
-:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
-:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
-:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
-"""
-__docformat__ = "restructuredtext en"
-
-from copy import deepcopy
-
-import simplejson
-
-from logilab.common.testlib import TestCase
-from logilab.common.pytest import nocoverage
-from logilab.common.umessage import message_from_string
-
-from logilab.common.deprecation import deprecated
-
-from cubicweb.devtools import init_test_database, TestServerConfiguration, ApptestConfiguration
-from cubicweb.devtools._apptest import TestEnvironment
-from cubicweb.devtools.fake import FakeRequest
-
-from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
-
-
-MAILBOX = []
-class Email:
- def __init__(self, recipients, msg):
- self.recipients = recipients
- self.msg = msg
-
- @property
- def message(self):
- return message_from_string(self.msg)
-
- @property
- def subject(self):
- return self.message.get('Subject')
-
- @property
- def content(self):
- return self.message.get_payload(decode=True)
-
- def __repr__(self):
- return '<Email to %s with subject %s>' % (','.join(self.recipients),
- self.message.get('Subject'))
-
-class MockSMTP:
- def __init__(self, server, port):
- pass
- def close(self):
- pass
- def sendmail(self, helo_addr, recipients, msg):
- MAILBOX.append(Email(recipients, msg))
-
-from cubicweb import cwconfig
-cwconfig.SMTP = MockSMTP
-
-
-def get_versions(self, checkversions=False):
- """return the a dictionary containing cubes used by this instance
- as key with their version as value, including cubicweb version. This is a
- public method, not requiring a session id.
-
- replace Repository.get_versions by this method if you don't want versions
- checking
- """
- vcconf = {'cubicweb': self.config.cubicweb_version()}
- self.config.bootstrap_cubes()
- for pk in self.config.cubes():
- version = self.config.cube_version(pk)
- vcconf[pk] = version
- self.config._cubes = None
- return vcconf
-
-
-@property
-def late_binding_env(self):
- """builds TestEnvironment as late as possible"""
- if not hasattr(self, '_env'):
- self.__class__._env = TestEnvironment('data', configcls=self.configcls,
- requestcls=self.requestcls)
- return self._env
-
-
-class autoenv(type):
- """automatically set environment on EnvBasedTC subclasses if necessary
- """
- def __new__(mcs, name, bases, classdict):
- env = classdict.get('env')
- # try to find env in one of the base classes
- if env is None:
- for base in bases:
- env = getattr(base, 'env', None)
- if env is not None:
- classdict['env'] = env
- break
- if not classdict.get('__abstract__') and not classdict.get('env'):
- classdict['env'] = late_binding_env
- return super(autoenv, mcs).__new__(mcs, name, bases, classdict)
-
-
-class EnvBasedTC(TestCase):
- """abstract class for test using an apptest environment
- """
- __metaclass__ = autoenv
- __abstract__ = True
- env = None
- configcls = ApptestConfiguration
- requestcls = FakeRequest
-
- # user / session management ###############################################
-
- def user(self, req=None):
- if req is None:
- req = self.env.create_request()
- return self.env.cnx.user(req)
- else:
- return req.user
-
- def create_user(self, *args, **kwargs):
- return self.env.create_user(*args, **kwargs)
-
- def login(self, login, password=None):
- return self.env.login(login, password)
-
- def restore_connection(self):
- self.env.restore_connection()
-
- # db api ##################################################################
-
- @nocoverage
- def cursor(self, req=None):
- return self.env.cnx.cursor(req or self.request())
-
- @nocoverage
- def execute(self, *args, **kwargs):
- return self.env.execute(*args, **kwargs)
-
- @nocoverage
- def commit(self):
- self.env.cnx.commit()
-
- @nocoverage
- def rollback(self):
- try:
- self.env.cnx.rollback()
- except ProgrammingError:
- pass
-
- # other utilities #########################################################
- def set_debug(self, debugmode):
- from cubicweb.server import set_debug
- set_debug(debugmode)
-
- @property
- def config(self):
- return self.vreg.config
-
- def session(self):
- """return current server side session (using default manager account)"""
- return self.env.repo._sessions[self.env.cnx.sessionid]
-
- def request(self, *args, **kwargs):
- """return a web interface request"""
- return self.env.create_request(*args, **kwargs)
-
- @nocoverage
- def rset_and_req(self, *args, **kwargs):
- return self.env.get_rset_and_req(*args, **kwargs)
-
- def entity(self, rql, args=None, eidkey=None, req=None):
- return self.execute(rql, args, eidkey, req=req).get_entity(0, 0)
-
- def etype_instance(self, etype, req=None):
- req = req or self.request()
- e = self.env.vreg['etypes'].etype_class(etype)(req)
- e.eid = None
- return e
-
- def add_entity(self, etype, **kwargs):
- rql = ['INSERT %s X' % etype]
-
- # dict for replacement in RQL Request
- rql_args = {}
-
- if kwargs: #
- rql.append(':')
- # dict to define new entities variables
- entities = {}
-
- # assignement part of the request
- sub_rql = []
- for key, value in kwargs.iteritems():
- # entities
- if hasattr(value, 'eid'):
- new_value = "%s__" % key.upper()
-
- entities[new_value] = value.eid
- rql_args[new_value] = value.eid
-
- sub_rql.append("X %s %s" % (key, new_value))
- # final attributes
- else:
- sub_rql.append('X %s %%(%s)s' % (key, key))
- rql_args[key] = value
- rql.append(', '.join(sub_rql))
-
-
- if entities:
- rql.append('WHERE')
- # WHERE part of the request (to link entity to they eid)
- sub_rql = []
- for key, value in entities.iteritems():
- sub_rql.append("%s eid %%(%s)s" % (key, key))
- rql.append(', '.join(sub_rql))
-
- rql = ' '.join(rql)
- rset = self.execute(rql, rql_args)
- return rset.get_entity(0, 0)
-
- def set_option(self, optname, value):
- self.vreg.config.global_set_option(optname, value)
-
- def pviews(self, req, rset):
- return sorted((a.id, a.__class__) for a in self.vreg['views'].possible_views(req, rset=rset))
-
- def pactions(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
- return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
- if a.category not in skipcategories]
-
- def pactions_by_cats(self, req, rset, categories=('addrelated',)):
- return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
- if a.category in categories]
-
- paddrelactions = deprecated()(pactions_by_cats)
-
- def pactionsdict(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
- res = {}
- for a in self.vreg['actions'].possible_vobjects(req, rset=rset):
- if a.category not in skipcategories:
- res.setdefault(a.category, []).append(a.__class__)
- return res
-
-
- def remote_call(self, fname, *args):
- """remote call simulation"""
- dump = simplejson.dumps
- args = [dump(arg) for arg in args]
- req = self.request(fname=fname, pageid='123', arg=args)
- ctrl = self.vreg['controllers'].select('json', req)
- return ctrl.publish(), req
-
- # default test setup and teardown #########################################
-
- def setup_database(self):
- pass
-
- def setUp(self):
- self.restore_connection()
- session = self.session()
- #self.maxeid = self.execute('Any MAX(X)')
- session.set_pool()
- self.maxeid = session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
- self.app = self.env.app
- self.vreg = self.env.app.vreg
- self.schema = self.vreg.schema
- self.vreg.config.mode = 'test'
- # set default-dest-addrs to a dumb email address to avoid mailbox or
- # mail queue pollution
- self.set_option('default-dest-addrs', ['whatever'])
- self.setup_database()
- self.commit()
- MAILBOX[:] = [] # reset mailbox
-
- @nocoverage
- def tearDown(self):
- self.rollback()
- # self.env.restore_database()
- self.env.restore_connection()
- self.session().unsafe_execute('DELETE Any X WHERE X eid > %s' % self.maxeid)
- self.commit()
-
- # global resources accessors ###############################################
-
-# XXX
-try:
- from cubicweb.web import Redirect
- from urllib import unquote
-except ImportError:
- pass # cubicweb-web not installed
-else:
- class ControllerTC(EnvBasedTC):
- def setUp(self):
- super(ControllerTC, self).setUp()
- self.req = self.request()
- self.ctrl = self.vreg['controllers'].select('edit', self.req)
-
- def publish(self, req):
- assert req is self.ctrl.req
- try:
- result = self.ctrl.publish()
- req.cnx.commit()
- except Redirect:
- req.cnx.commit()
- raise
- return result
-
- def expect_redirect_publish(self, req=None):
- if req is not None:
- self.ctrl = self.vreg['controllers'].select('edit', req)
- else:
- req = self.req
- try:
- res = self.publish(req)
- except Redirect, ex:
- try:
- path, params = ex.location.split('?', 1)
- except:
- path, params = ex.location, ""
- req._url = path
- cleanup = lambda p: (p[0], unquote(p[1]))
- params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
- return req.relative_path(False), params # path.rsplit('/', 1)[-1], params
- else:
- self.fail('expected a Redirect exception')
-
-
-def make_late_binding_repo_property(attrname):
- @property
- def late_binding(self):
- """builds cnx as late as possible"""
- if not hasattr(self, attrname):
- # sets explicit test mode here to avoid autoreload
- from cubicweb.cwconfig import CubicWebConfiguration
- CubicWebConfiguration.mode = 'test'
- cls = self.__class__
- config = self.repo_config or TestServerConfiguration('data')
- cls._repo, cls._cnx = init_test_database('sqlite', config=config)
- return getattr(self, attrname)
- return late_binding
-
-
-class autorepo(type):
- """automatically set repository on RepositoryBasedTC subclasses if necessary
- """
- def __new__(mcs, name, bases, classdict):
- repo = classdict.get('repo')
- # try to find repo in one of the base classes
- if repo is None:
- for base in bases:
- repo = getattr(base, 'repo', None)
- if repo is not None:
- classdict['repo'] = repo
- break
- if name != 'RepositoryBasedTC' and not classdict.get('repo'):
- classdict['repo'] = make_late_binding_repo_property('_repo')
- classdict['cnx'] = make_late_binding_repo_property('_cnx')
- return super(autorepo, mcs).__new__(mcs, name, bases, classdict)
-
-
-class RepositoryBasedTC(TestCase):
- """abstract class for test using direct repository connections
- """
- __metaclass__ = autorepo
- repo_config = None # set a particular config instance if necessary
-
- # user / session management ###############################################
-
- def create_user(self, user, groups=('users',), password=None, commit=True):
- if password is None:
- password = user
- eid = self.execute('INSERT CWUser X: X login %(x)s, X upassword %(p)s,'
- 'X in_state S WHERE S name "activated"',
- {'x': unicode(user), 'p': password})[0][0]
- groups = ','.join(repr(group) for group in groups)
- self.execute('SET X in_group Y WHERE X eid %%(x)s, Y name IN (%s)' % groups,
- {'x': eid})
- if commit:
- self.commit()
- self.session.reset_pool()
- return eid
-
- def login(self, login, password=None):
- cnx = repo_connect(self.repo, unicode(login), password or login,
- ConnectionProperties('inmemory'))
- self.cnxs.append(cnx)
- return cnx
-
- def current_session(self):
- return self.repo._sessions[self.cnxs[-1].sessionid]
-
- def restore_connection(self):
- assert len(self.cnxs) == 1, self.cnxs
- cnx = self.cnxs.pop()
- try:
- cnx.close()
- except Exception, ex:
- print "exception occured while closing connection", ex
-
- # db api ##################################################################
-
- def execute(self, rql, args=None, eid_key=None):
- assert self.session.id == self.cnxid
- rset = self.__execute(self.cnxid, rql, args, eid_key)
- rset.vreg = self.vreg
- rset.req = self.session
- # call to set_pool is necessary to avoid pb when using
- # instance entities for convenience
- self.session.set_pool()
- return rset
-
- def commit(self):
- self.__commit(self.cnxid)
- self.session.set_pool()
-
- def rollback(self):
- self.__rollback(self.cnxid)
- self.session.set_pool()
-
- def close(self):
- self.__close(self.cnxid)
-
- # other utilities #########################################################
-
- def set_debug(self, debugmode):
- from cubicweb.server import set_debug
- set_debug(debugmode)
-
- def set_option(self, optname, value):
- self.vreg.config.global_set_option(optname, value)
-
- def add_entity(self, etype, **kwargs):
- restrictions = ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
- rql = 'INSERT %s X' % etype
- if kwargs:
- rql += ': %s' % ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
- rset = self.execute(rql, kwargs)
- return rset.get_entity(0, 0)
-
- def default_user_password(self):
- config = self.repo.config #TestConfiguration('data')
- user = unicode(config.sources()['system']['db-user'])
- passwd = config.sources()['system']['db-password']
- return user, passwd
-
- def close_connections(self):
- for cnx in self.cnxs:
- try:
- cnx.rollback()
- cnx.close()
- except:
- continue
- self.cnxs = []
-
- pactions = EnvBasedTC.pactions.im_func
- pactionsdict = EnvBasedTC.pactionsdict.im_func
-
- # default test setup and teardown #########################################
-
- def _prepare(self):
- MAILBOX[:] = [] # reset mailbox
- if hasattr(self, 'cnxid'):
- return
- repo = self.repo
- self.__execute = repo.execute
- self.__commit = repo.commit
- self.__rollback = repo.rollback
- self.__close = repo.close
- self.cnxid = self.cnx.sessionid
- self.session = repo._sessions[self.cnxid]
- self.cnxs = []
- # reset caches, they may introduce bugs among tests
- repo._type_source_cache = {}
- repo._extid_cache = {}
- repo.querier._rql_cache = {}
- for source in repo.sources:
- source.reset_caches()
- for s in repo.sources:
- if hasattr(s, '_cache'):
- s._cache = {}
-
- @property
- def config(self):
- return self.repo.config
-
- @property
- def vreg(self):
- return self.repo.vreg
-
- @property
- def schema(self):
- return self.repo.schema
-
- def setUp(self):
- self._prepare()
- self.session.set_pool()
- self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
-
- def tearDown(self):
- self.close_connections()
- self.rollback()
- self.session.unsafe_execute('DELETE Any X WHERE X eid > %(x)s', {'x': self.maxeid})
- self.commit()
-