devtools/_apptest.py
author Aurelien Campeas <aurelien.campeas@logilab.fr>
Wed, 09 Sep 2009 14:55:43 +0200
branch3.5
changeset 3138 b33eb9e91262
parent 2920 64322aa83a1d
child 3877 7ca53fc72a0a
child 4212 ab6573088b4a
permissions -rw-r--r--
add pyro

"""Hidden internals for the devtools.apptest module

:organization: Logilab
:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
__docformat__ = "restructuredtext en"

import sys, traceback

from logilab.common.pytest import pause_tracing, resume_tracing

import yams.schema

from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
from cubicweb.cwvreg import CubicWebVRegistry

from cubicweb.web.application import CubicWebPublisher
from cubicweb.web import Redirect

from cubicweb.devtools import ApptestConfiguration, init_test_database
from cubicweb.devtools.fake import FakeRequest

# entity types considered part of the system: unless asked to be strict,
# `unprotected_entities` never reports them
SYSTEM_ENTITIES = (
    'CWGroup', 'CWUser',
    'CWAttribute', 'CWRelation',
    'CWConstraint', 'CWConstraintType', 'CWProperty',
    'CWEType', 'CWRType',
    'Workflow', 'State', 'BaseTransition', 'Transition',
    'WorkflowTransition', 'TrInfo', 'SubWorkflowExitPoint',
    'RQLExpression',
)

# relation types considered part of the system; this tuple can be extended at
# runtime through `ignore_relations`
SYSTEM_RELATIONS = (
    # virtual relation
    'identity',
    # metadata
    'is', 'is_instance_of', 'owned_by', 'created_by', 'specializes',
    # workflow related
    'workflow_of', 'state_of', 'transition_of',
    'initial_state', 'allowed_transition',
    'destination_state', 'in_state', 'wf_info_for',
    'from_state', 'to_state',
    'condition', 'subworkflow', 'subworkflow_state', 'subworkflow_exit',
    # permission
    'in_group', 'require_group', 'require_permission',
    'read_permission', 'update_permission',
    'delete_permission', 'add_permission',
    # property (CWProperty)
    'for_user',
    # schema definition
    'relation_type', 'from_entity', 'to_entity',
    'constrained_by', 'cstrtype', 'widget',
    # deduced from other relations
    'primary_email',
)

def unprotected_entities(app_schema, strict=False):
    """return the set of entity types of `app_schema` which are not protected

    The yams base types are always left out. Unless `strict` is true, the
    types listed in SYSTEM_ENTITIES (CWGroup, CWUser...) are left out as well.
    """
    excluded = yams.schema.BASE_TYPES
    if not strict:
        # `union` builds a new set, BASE_TYPES itself is left untouched
        excluded = excluded.union(set(SYSTEM_ENTITIES))
    return set(app_schema.entities()) - excluded


def ignore_relations(*relations):
    """add the given relation type names to the module level SYSTEM_RELATIONS
    tuple
    """
    global SYSTEM_RELATIONS
    # tuples are immutable: the global name is rebound to a new, longer tuple
    SYSTEM_RELATIONS = SYSTEM_RELATIONS + relations


class TestEnvironment(object):
    """TestEnvironment defines a context (e.g. a config + a given connection) in
    which the tests are executed

    Attributes set by the constructor:

    :vreg: the CubicWebVRegistry built from the test configuration
    :repo: the repository returned by `init_test_database`
    :cnx: the connection currently in use (changed by `login`)
    :_orig_cnx: the connection opened by `restore_database`, which
      `restore_connection` switches back to
    :admlogin: the 'db-user' value of the system source
    :app: the CubicWebPublisher instance
    :requestcls: the class used by `create_request` to build requests
    :deletable_entities: result of `unprotected_entities` for the schema
    """

    def __init__(self, appid, reporter=None, verbose=False,
                 configcls=ApptestConfiguration, requestcls=FakeRequest):
        """build the configuration, registry, test database and publisher

        :param appid: identifier given to `configcls`
        :param reporter: accepted but not used by this constructor
        :param verbose: when true, print progress messages
        :param configcls: configuration class, instantiated with `appid`
        :param requestcls: request class used by `create_request`
        """
        config = configcls(appid)
        self.requestcls = requestcls
        self.cnx = None
        config.db_perms = False
        source = config.sources()['system']
        if verbose:
            print("init test database ...")
        self.vreg = vreg = CubicWebVRegistry(config)
        self.admlogin = source['db-user']
        # restore database <=> init database
        self.restore_database()
        if verbose:
            print("init done")
        # the lambda reads self.repo lazily, hence always gives the repository
        # set by the latest restore_database() call
        config.repository = lambda x=None: self.repo
        self.app = CubicWebPublisher(config, vreg=vreg)
        self.verbose = verbose
        schema = self.vreg.schema
        # else we may run into problems since email addresses are usually
        # shared in app tests
        # XXX should not be necessary anymore
        schema.rschema('primary_email').set_rproperty('CWUser', 'EmailAddress', 'composite', False)
        self.deletable_entities = unprotected_entities(schema)

    def restore_database(self):
        """called by unittests' tearDown to restore the original database

        Close the current connection if any, then (re)initialize the test
        database and remember the new connection as the original one. Any
        failure is fatal: the traceback is printed and the process exits with
        status 1.
        """
        try:
            pause_tracing()
            if self.cnx:
                self.cnx.close()
            source = self.vreg.config.sources()['system']
            self.repo, self.cnx = init_test_database(driver=source['db-driver'],
                                                     vreg=self.vreg)
            self._orig_cnx = self.cnx
            resume_tracing()
        except:
            # deliberately catch everything: whatever went wrong, tracing has
            # to be resumed and the test run cannot go on without a database
            resume_tracing()
            traceback.print_exc()
            sys.exit(1)
        # XXX cnx decoration is usually done by the repository authentication manager,
        # necessary in authentication tests
        self.cnx.vreg = self.vreg
        self.cnx.login = source['db-user']
        self.cnx.password = source['db-password']


    def create_user(self, login, groups=('users',), req=None):
        """create a CWUser through the original connection and commit

        The password of the new user is its login (utf8 encoded).

        :param login: login of the user to create
        :param groups: names of the groups the user is put in
        :param req: request to use, a new one is created when not given
        :return: the entity of the created user
        """
        req = req or self.create_request()
        cursor = self._orig_cnx.cursor(req)
        rset = cursor.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s',
                              {'login': unicode(login), 'passwd': login.encode('utf8')})
        user = rset.get_entity(0, 0)
        # repr(str(g)) and not repr(g): the repr of a unicode string is
        # prefixed by 'u', which is not a valid string literal in the query
        cursor.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
                       % ','.join(repr(str(g)) for g in groups),
                       {'x': user.eid}, 'x')
        user.clear_related_cache('in_group', 'subject')
        self._orig_cnx.commit()
        return user

    def login(self, login, password=None):
        """make `self.cnx` a connection for the user `login` and return it

        For the administrator login the original connection is restored,
        else a new in-memory connection is opened, using the login as password
        when `password` is not given. The connection is flagged as anonymous
        when `login` is the anonymous user of the configuration.
        """
        if login == self.admlogin:
            self.restore_connection()
        else:
            self.cnx = repo_connect(self.repo, unicode(login),
                                    password or str(login),
                                    ConnectionProperties('inmemory'))
        if login == self.vreg.config.anonymous_user()[0]:
            self.cnx.anonymous_connection = True
        return self.cnx

    def restore_connection(self):
        """close the current connection if it is not the original one, then
        switch back to the original connection
        """
        if self.cnx is not self._orig_cnx:
            try:
                self.cnx.close()
            except ProgrammingError:
                pass # already closed
        self.cnx = self._orig_cnx

    ############################################################################

    def execute(self, rql, args=None, eidkey=None, req=None):
        """executes <rql> on the current connection and returns the result set

        :param rql: the query, converted to unicode before execution
        :param args: optional dictionary of query arguments
        :param eidkey: forwarded as third argument of the cursor's `execute`
        :param req: request to use, a new one is created when not given
        """
        req = req or self.create_request(rql=rql)
        return self.cnx.cursor(req).execute(unicode(rql), args, eidkey)

    def create_request(self, rql=None, **kwargs):
        """returns a new request (instance of `self.requestcls`) bound to the
        current connection

        Keyword arguments make up the form of the request; <rql>, when given,
        is added to it under the 'rql' key. Nothing is executed.
        """
        if rql:
            kwargs['rql'] = rql
        req = self.requestcls(self.vreg, form=kwargs)
        req.set_connection(self.cnx)
        return req

    def get_rset_and_req(self, rql, optional_args=None, args=None, eidkey=None):
        """executes <rql>, builds a resultset, and returns a
        couple (rset, req) where req is a new request

        :param optional_args: optional dictionary of additional form
          parameters for the returned request
        :param args: optional dictionary of query arguments
        :param eidkey: forwarded to `execute`
        """
        return (self.execute(rql, args, eidkey),
                self.create_request(rql=rql, **optional_args or {}))


class ExistingTestEnvironment(TestEnvironment):
    """test environment whose configuration is built from a given source
    file; elements inserted after `setup` can be removed with `cleanup`
    """

    def __init__(self, appid, sourcefile, verbose=False):
        """build the configuration, registry, connection and publisher

        The parent constructor is not called.

        :param appid: identifier given to ApptestConfiguration
        :param sourcefile: given to ApptestConfiguration as `sourcefile`
        :param verbose: when true, print progress messages
        """
        config = ApptestConfiguration(appid, sourcefile=sourcefile)
        if verbose:
            print("init test database ...")
        source = config.sources()['system']
        self.vreg = CubicWebVRegistry(config)
        self.repo, self.cnx = init_test_database(driver=source['db-driver'],
                                                 vreg=self.vreg)
        # the parent constructor is not called: set the attributes read by the
        # inherited methods (create_request, execute, login, create_user,
        # restore_connection) so they do not fail with an AttributeError
        self._orig_cnx = self.cnx
        self.requestcls = FakeRequest
        self.admlogin = source['db-user']
        if verbose:
            print("init done")
        self.app = CubicWebPublisher(config, vreg=self.vreg)
        self.verbose = verbose
        # this is done when the publisher is opening a connection
        self.cnx.vreg = self.vreg

    def setup(self, config=None):
        """config is passed by TestSuite but is ignored in this environment

        Record in `self.last_eid` the first value returned by the query
        below, later used by `cleanup`.
        """
        cursor = self.cnx.cursor()
        # NOTE(review): rows[0][0] raises IndexError if the query returns no
        # row -- confirm this cannot happen with the databases used here
        self.last_eid = cursor.execute('Any X WHERE X creation_date D ORDERBY D DESC LIMIT 1').rows[0][0]

    def cleanup(self):
        """cancel inserted elements during tests

        Delete what has an eid greater than the one recorded by `setup`, then
        commit.
        """
        cursor = self.cnx.cursor()
        cursor.execute('DELETE Any X WHERE X eid > %(x)s', {'x' : self.last_eid}, eid_key='x')
        print("cleaning done")
        self.cnx.commit()