devtools/apptest.py
changeset 2774 a9a2dca5db20
parent 2771 8074dd88e21b
parent 2773 b2530e3e0afb
child 2777 8f7fcbe11879
--- a/devtools/apptest.py	Tue Aug 11 17:06:02 2009 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,506 +0,0 @@
-"""This module provides misc utilities to test instances
-
-:organization: Logilab
-:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
-:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
-:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
-"""
-__docformat__ = "restructuredtext en"
-
-from copy import deepcopy
-
-import simplejson
-
-from logilab.common.testlib import TestCase
-from logilab.common.pytest import nocoverage
-from logilab.common.umessage import message_from_string
-
-from logilab.common.deprecation import deprecated
-
-from cubicweb.devtools import init_test_database, TestServerConfiguration, ApptestConfiguration
-from cubicweb.devtools._apptest import TestEnvironment
-from cubicweb.devtools.fake import FakeRequest
-
-from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
-
-
-MAILBOX = []
-class Email:
-    def __init__(self, recipients, msg):
-        self.recipients = recipients
-        self.msg = msg
-
-    @property
-    def message(self):
-        return message_from_string(self.msg)
-
-    @property
-    def subject(self):
-        return self.message.get('Subject')
-
-    @property
-    def content(self):
-        return self.message.get_payload(decode=True)
-
-    def __repr__(self):
-        return '<Email to %s with subject %s>' % (','.join(self.recipients),
-                                                  self.message.get('Subject'))
-
-class MockSMTP:
-    def __init__(self, server, port):
-        pass
-    def close(self):
-        pass
-    def sendmail(self, helo_addr, recipients, msg):
-        MAILBOX.append(Email(recipients, msg))
-
-from cubicweb import cwconfig
-cwconfig.SMTP = MockSMTP
-
-
-def get_versions(self, checkversions=False):
-    """return the a dictionary containing cubes used by this instance
-    as key with their version as value, including cubicweb version. This is a
-    public method, not requiring a session id.
-
-    replace Repository.get_versions by this method if you don't want versions
-    checking
-    """
-    vcconf = {'cubicweb': self.config.cubicweb_version()}
-    self.config.bootstrap_cubes()
-    for pk in self.config.cubes():
-        version = self.config.cube_version(pk)
-        vcconf[pk] = version
-    self.config._cubes = None
-    return vcconf
-
-
-@property
-def late_binding_env(self):
-    """builds TestEnvironment as late as possible"""
-    if not hasattr(self, '_env'):
-        self.__class__._env = TestEnvironment('data', configcls=self.configcls,
-                                              requestcls=self.requestcls)
-    return self._env
-
-
-class autoenv(type):
-    """automatically set environment on EnvBasedTC subclasses if necessary
-    """
-    def __new__(mcs, name, bases, classdict):
-        env = classdict.get('env')
-        # try to find env in one of the base classes
-        if env is None:
-            for base in bases:
-                env = getattr(base, 'env', None)
-                if env is not None:
-                    classdict['env'] = env
-                    break
-        if not classdict.get('__abstract__')  and not classdict.get('env'):
-            classdict['env'] = late_binding_env
-        return super(autoenv, mcs).__new__(mcs, name, bases, classdict)
-
-
-class EnvBasedTC(TestCase):
-    """abstract class for test using an apptest environment
-    """
-    __metaclass__ = autoenv
-    __abstract__ = True
-    env = None
-    configcls = ApptestConfiguration
-    requestcls = FakeRequest
-
-    # user / session management ###############################################
-
-    def user(self, req=None):
-        if req is None:
-            req = self.env.create_request()
-            return self.env.cnx.user(req)
-        else:
-            return req.user
-
-    def create_user(self, *args, **kwargs):
-        return self.env.create_user(*args, **kwargs)
-
-    def login(self, login, password=None):
-        return self.env.login(login, password)
-
-    def restore_connection(self):
-        self.env.restore_connection()
-
-    # db api ##################################################################
-
-    @nocoverage
-    def cursor(self, req=None):
-        return self.env.cnx.cursor(req or self.request())
-
-    @nocoverage
-    def execute(self, *args, **kwargs):
-        return self.env.execute(*args, **kwargs)
-
-    @nocoverage
-    def commit(self):
-        self.env.cnx.commit()
-
-    @nocoverage
-    def rollback(self):
-        try:
-            self.env.cnx.rollback()
-        except ProgrammingError:
-            pass
-
-    # other utilities #########################################################
-    def set_debug(self, debugmode):
-        from cubicweb.server import set_debug
-        set_debug(debugmode)
-
-    @property
-    def config(self):
-        return self.vreg.config
-
-    def session(self):
-        """return current server side session (using default manager account)"""
-        return self.env.repo._sessions[self.env.cnx.sessionid]
-
-    def request(self, *args, **kwargs):
-        """return a web interface request"""
-        return self.env.create_request(*args, **kwargs)
-
-    @nocoverage
-    def rset_and_req(self, *args, **kwargs):
-        return self.env.get_rset_and_req(*args, **kwargs)
-
-    def entity(self, rql, args=None, eidkey=None, req=None):
-        return self.execute(rql, args, eidkey, req=req).get_entity(0, 0)
-
-    def etype_instance(self, etype, req=None):
-        req = req or self.request()
-        e = self.env.vreg['etypes'].etype_class(etype)(req)
-        e.eid = None
-        return e
-
-    def add_entity(self, etype, **kwargs):
-        rql = ['INSERT %s X' % etype]
-
-        # dict for replacement in RQL Request
-        rql_args = {}
-
-        if kwargs: #
-            rql.append(':')
-            # dict to define new entities variables
-            entities = {}
-
-            # assignement part of the request
-            sub_rql = []
-            for key, value in kwargs.iteritems():
-                # entities
-                if hasattr(value, 'eid'):
-                    new_value = "%s__" % key.upper()
-
-                    entities[new_value] = value.eid
-                    rql_args[new_value] = value.eid
-
-                    sub_rql.append("X %s %s" % (key, new_value))
-                # final attributes
-                else:
-                    sub_rql.append('X %s %%(%s)s' % (key, key))
-                    rql_args[key] = value
-            rql.append(', '.join(sub_rql))
-
-
-            if entities:
-                rql.append('WHERE')
-                # WHERE part of the request (to link entity to they eid)
-                sub_rql = []
-                for key, value in entities.iteritems():
-                    sub_rql.append("%s eid %%(%s)s" % (key, key))
-                rql.append(', '.join(sub_rql))
-
-        rql = ' '.join(rql)
-        rset = self.execute(rql, rql_args)
-        return rset.get_entity(0, 0)
-
-    def set_option(self, optname, value):
-        self.vreg.config.global_set_option(optname, value)
-
-    def pviews(self, req, rset):
-        return sorted((a.id, a.__class__) for a in self.vreg['views'].possible_views(req, rset=rset))
-
-    def pactions(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
-        return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
-                if a.category not in skipcategories]
-
-    def pactions_by_cats(self, req, rset, categories=('addrelated',)):
-        return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
-                if a.category in categories]
-
-    paddrelactions = deprecated()(pactions_by_cats)
-
-    def pactionsdict(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
-        res = {}
-        for a in self.vreg['actions'].possible_vobjects(req, rset=rset):
-            if a.category not in skipcategories:
-                res.setdefault(a.category, []).append(a.__class__)
-        return res
-
-
-    def remote_call(self, fname, *args):
-        """remote call simulation"""
-        dump = simplejson.dumps
-        args = [dump(arg) for arg in args]
-        req = self.request(fname=fname, pageid='123', arg=args)
-        ctrl = self.vreg['controllers'].select('json', req)
-        return ctrl.publish(), req
-
-    # default test setup and teardown #########################################
-
-    def setup_database(self):
-        pass
-
-    def setUp(self):
-        self.restore_connection()
-        session = self.session()
-        #self.maxeid = self.execute('Any MAX(X)')
-        session.set_pool()
-        self.maxeid = session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
-        self.app = self.env.app
-        self.vreg = self.env.app.vreg
-        self.schema = self.vreg.schema
-        self.vreg.config.mode = 'test'
-        # set default-dest-addrs to a dumb email address to avoid mailbox or
-        # mail queue pollution
-        self.set_option('default-dest-addrs', ['whatever'])
-        self.setup_database()
-        self.commit()
-        MAILBOX[:] = [] # reset mailbox
-
-    @nocoverage
-    def tearDown(self):
-        self.rollback()
-        # self.env.restore_database()
-        self.env.restore_connection()
-        self.session().unsafe_execute('DELETE Any X WHERE X eid > %s' % self.maxeid)
-        self.commit()
-
-    # global resources accessors ###############################################
-
-# XXX
-try:
-    from cubicweb.web import Redirect
-    from urllib import unquote
-except ImportError:
-    pass # cubicweb-web not installed
-else:
-    class ControllerTC(EnvBasedTC):
-        def setUp(self):
-            super(ControllerTC, self).setUp()
-            self.req = self.request()
-            self.ctrl = self.vreg['controllers'].select('edit', self.req)
-
-        def publish(self, req):
-            assert req is self.ctrl.req
-            try:
-                result = self.ctrl.publish()
-                req.cnx.commit()
-            except Redirect:
-                req.cnx.commit()
-                raise
-            return result
-
-        def expect_redirect_publish(self, req=None):
-            if req is not None:
-                self.ctrl = self.vreg['controllers'].select('edit', req)
-            else:
-                req = self.req
-            try:
-                res = self.publish(req)
-            except Redirect, ex:
-                try:
-                    path, params = ex.location.split('?', 1)
-                except:
-                    path, params = ex.location, ""
-                req._url = path
-                cleanup = lambda p: (p[0], unquote(p[1]))
-                params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
-                return req.relative_path(False), params # path.rsplit('/', 1)[-1], params
-            else:
-                self.fail('expected a Redirect exception')
-
-
-def make_late_binding_repo_property(attrname):
-    @property
-    def late_binding(self):
-        """builds cnx as late as possible"""
-        if not hasattr(self, attrname):
-            # sets explicit test mode here to avoid autoreload
-            from cubicweb.cwconfig import CubicWebConfiguration
-            CubicWebConfiguration.mode = 'test'
-            cls = self.__class__
-            config = self.repo_config or TestServerConfiguration('data')
-            cls._repo, cls._cnx = init_test_database('sqlite',  config=config)
-        return getattr(self, attrname)
-    return late_binding
-
-
-class autorepo(type):
-    """automatically set repository on RepositoryBasedTC subclasses if necessary
-    """
-    def __new__(mcs, name, bases, classdict):
-        repo = classdict.get('repo')
-        # try to find repo in one of the base classes
-        if repo is None:
-            for base in bases:
-                repo = getattr(base, 'repo', None)
-                if repo is not None:
-                    classdict['repo'] = repo
-                    break
-        if name != 'RepositoryBasedTC' and not classdict.get('repo'):
-            classdict['repo'] = make_late_binding_repo_property('_repo')
-            classdict['cnx'] = make_late_binding_repo_property('_cnx')
-        return super(autorepo, mcs).__new__(mcs, name, bases, classdict)
-
-
-class RepositoryBasedTC(TestCase):
-    """abstract class for test using direct repository connections
-    """
-    __metaclass__ = autorepo
-    repo_config = None # set a particular config instance if necessary
-
-    # user / session management ###############################################
-
-    def create_user(self, user, groups=('users',), password=None, commit=True):
-        if password is None:
-            password = user
-        eid = self.execute('INSERT CWUser X: X login %(x)s, X upassword %(p)s,'
-                            'X in_state S WHERE S name "activated"',
-                            {'x': unicode(user), 'p': password})[0][0]
-        groups = ','.join(repr(group) for group in groups)
-        self.execute('SET X in_group Y WHERE X eid %%(x)s, Y name IN (%s)' % groups,
-                      {'x': eid})
-        if commit:
-            self.commit()
-        self.session.reset_pool()
-        return eid
-
-    def login(self, login, password=None):
-        cnx = repo_connect(self.repo, unicode(login), password or login,
-                           ConnectionProperties('inmemory'))
-        self.cnxs.append(cnx)
-        return cnx
-
-    def current_session(self):
-        return self.repo._sessions[self.cnxs[-1].sessionid]
-
-    def restore_connection(self):
-        assert len(self.cnxs) == 1, self.cnxs
-        cnx = self.cnxs.pop()
-        try:
-            cnx.close()
-        except Exception, ex:
-            print "exception occured while closing connection", ex
-
-    # db api ##################################################################
-
-    def execute(self, rql, args=None, eid_key=None):
-        assert self.session.id == self.cnxid
-        rset = self.__execute(self.cnxid, rql, args, eid_key)
-        rset.vreg = self.vreg
-        rset.req = self.session
-        # call to set_pool is necessary to avoid pb when using
-        # instance entities for convenience
-        self.session.set_pool()
-        return rset
-
-    def commit(self):
-        self.__commit(self.cnxid)
-        self.session.set_pool()
-
-    def rollback(self):
-        self.__rollback(self.cnxid)
-        self.session.set_pool()
-
-    def close(self):
-        self.__close(self.cnxid)
-
-    # other utilities #########################################################
-
-    def set_debug(self, debugmode):
-        from cubicweb.server import set_debug
-        set_debug(debugmode)
-
-    def set_option(self, optname, value):
-        self.vreg.config.global_set_option(optname, value)
-
-    def add_entity(self, etype, **kwargs):
-        restrictions = ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
-        rql = 'INSERT %s X' % etype
-        if kwargs:
-            rql += ': %s' % ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
-        rset = self.execute(rql, kwargs)
-        return rset.get_entity(0, 0)
-
-    def default_user_password(self):
-        config = self.repo.config #TestConfiguration('data')
-        user = unicode(config.sources()['system']['db-user'])
-        passwd = config.sources()['system']['db-password']
-        return user, passwd
-
-    def close_connections(self):
-        for cnx in self.cnxs:
-            try:
-                cnx.rollback()
-                cnx.close()
-            except:
-                continue
-        self.cnxs = []
-
-    pactions = EnvBasedTC.pactions.im_func
-    pactionsdict = EnvBasedTC.pactionsdict.im_func
-
-    # default test setup and teardown #########################################
-
-    def _prepare(self):
-        MAILBOX[:] = [] # reset mailbox
-        if hasattr(self, 'cnxid'):
-            return
-        repo = self.repo
-        self.__execute = repo.execute
-        self.__commit = repo.commit
-        self.__rollback = repo.rollback
-        self.__close = repo.close
-        self.cnxid = self.cnx.sessionid
-        self.session = repo._sessions[self.cnxid]
-        self.cnxs = []
-        # reset caches, they may introduce bugs among tests
-        repo._type_source_cache = {}
-        repo._extid_cache = {}
-        repo.querier._rql_cache = {}
-        for source in repo.sources:
-            source.reset_caches()
-        for s in repo.sources:
-            if hasattr(s, '_cache'):
-                s._cache = {}
-
-    @property
-    def config(self):
-        return self.repo.config
-
-    @property
-    def vreg(self):
-        return self.repo.vreg
-
-    @property
-    def schema(self):
-        return self.repo.schema
-
-    def setUp(self):
-        self._prepare()
-        self.session.set_pool()
-        self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
-
-    def tearDown(self):
-        self.close_connections()
-        self.rollback()
-        self.session.unsafe_execute('DELETE Any X WHERE X eid > %(x)s', {'x': self.maxeid})
-        self.commit()
-