devtools/__init__.py
author Alexandre Fayolle <alexandre.fayolle@logilab.fr>
Wed, 02 Jun 2010 16:12:18 +0000
branchstable
changeset 5639 4acb860159e4
parent 5426 0d4853a6e5ee
child 5466 b5af2ac0c43c
child 5541 eac04dd201f7
permissions -rw-r--r--
[win32] fix deadlock occuring on the sequence tables with SQLServer actually, this deadlock would occur with any db backend other that PostgreSQL as the previous code was heavily relying on PG's SEQUENCE facility, not available elsewhere. Deadlock description: Thread1 starts creating entities (and therefore calls create_eid): -> this creates a DB-level lock on the entities_id_seq table, which will last until end of transaction Thread2 calls create_eid, which acquires the Python lock object, but updating the entities_id_seq is held by the DB lock Thread1 wants to create a new entity, calls create_eid, and is stuck by the Python lock object held by Thread2. Solution: use a separate connection to read and write the entities_id_seq table.

# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""Test tools for cubicweb

"""
__docformat__ = "restructuredtext en"

import os
import logging
from datetime import timedelta
from os.path import (abspath, join, exists, basename, dirname, normpath, split,
                     isfile, isabs)

from logilab.common.date import strptime
from cubicweb import CW_SOFTWARE_ROOT, ConfigurationError, schema, cwconfig
from cubicweb.server.serverconfig import ServerConfiguration
from cubicweb.etwist.twconfig import TwistedConfiguration

cwconfig.CubicWebConfiguration.cls_adjust_sys_path()

# db auto-population configuration #############################################

SYSTEM_ENTITIES = schema.SCHEMA_TYPES | set((
    'CWGroup', 'CWUser', 'CWProperty',
    'Workflow', 'State', 'BaseTransition', 'Transition', 'WorkflowTransition',
    'TrInfo', 'SubWorkflowExitPoint',
    ))

SYSTEM_RELATIONS = schema.META_RTYPES | set((
    # workflow related
    'workflow_of', 'state_of', 'transition_of', 'initial_state', 'default_workflow',
    'allowed_transition', 'destination_state', 'from_state', 'to_state',
    'condition', 'subworkflow', 'subworkflow_state', 'subworkflow_exit',
    'custom_workflow', 'in_state', 'wf_info_for',
    # cwproperty
    'for_user',
    # schema definition
    'specializes',
    'relation_type', 'from_entity', 'to_entity',
    'constrained_by', 'cstrtype', 'widget',
    'read_permission', 'update_permission', 'delete_permission', 'add_permission',
    # permission
    'in_group', 'require_group', 'require_permission',
    # deducted from other relations
    'primary_email',
    ))

# content validation configuration #############################################

# validators are used to validate (XML, DTD, whatever) view's content
# validators availables are :
#  'dtd' : validates XML + declared DTD
#  'xml' : guarantees XML is well formed
#  None : do not try to validate anything

# {'vid': validator}
VIEW_VALIDATORS = {}


# cubicweb test configuration ##################################################

BASE_URL = 'http://testing.fr/cubicweb/'

DEFAULT_SOURCES = {'system': {'adapter' : 'native',
                              'db-encoding' : 'UTF-8', #'ISO-8859-1',
                              'db-user' : u'admin',
                              'db-password' : 'gingkow',
                              'db-name' : 'tmpdb',
                              'db-driver' : 'sqlite',
                              'db-host' : None,
                              },
                   'admin' : {'login': u'admin',
                              'password': u'gingkow',
                              },
                   }


class TestServerConfiguration(ServerConfiguration):
    mode = 'test'
    set_language = False
    read_instance_schema = False
    init_repository = True
    options = cwconfig.merge_options(ServerConfiguration.options + (
        ('anonymous-user',
         {'type' : 'string',
          'default': None,
          'help': 'login of the CubicWeb user account to use for anonymous user (if you want to allow anonymous)',
          'group': 'main', 'level': 1,
          }),
        ('anonymous-password',
         {'type' : 'string',
          'default': None,
          'help': 'password of the CubicWeb user account matching login',
          'group': 'main', 'level': 1,
          }),
        ))

    def __init__(self, appid, log_threshold=logging.CRITICAL+10):
        ServerConfiguration.__init__(self, appid)
        self.init_log(log_threshold, force=True)
        # need this, usually triggered by cubicweb-ctl
        self.load_cwctl_plugins()

    anonymous_user = TwistedConfiguration.anonymous_user.im_func

    @property
    def apphome(self):
        if exists(self.appid):
            return abspath(self.appid)
        # cube test
        return abspath('..')
    appdatahome = apphome

    def load_configuration(self):
        super(TestServerConfiguration, self).load_configuration()
        self.global_set_option('anonymous-user', 'anon')
        self.global_set_option('anonymous-password', 'anon')
        # no undo support in tests
        self.global_set_option('undo-support', '')

    def main_config_file(self):
        """return instance's control configuration file"""
        return join(self.apphome, '%s.conf' % self.name)

    def instance_md5_version(self):
        return ''

    def bootstrap_cubes(self):
        try:
            super(TestServerConfiguration, self).bootstrap_cubes()
        except IOError:
            # no cubes
            self.init_cubes( () )

    sourcefile = None
    def sources_file(self):
        """define in subclasses self.sourcefile if necessary"""
        if self.sourcefile:
            print 'Reading sources from', self.sourcefile
            sourcefile = self.sourcefile
            if not isabs(sourcefile):
                sourcefile = join(self.apphome, sourcefile)
        else:
            sourcefile = super(TestServerConfiguration, self).sources_file()
        return sourcefile

    def sources(self):
        """By default, we run tests with the sqlite DB backend.  One may use its
        own configuration by just creating a 'sources' file in the test
        directory from wich tests are launched or by specifying an alternative
        sources file using self.sourcefile.
        """
        sources = super(TestServerConfiguration, self).sources()
        if not sources:
            sources = DEFAULT_SOURCES
        return sources


class BaseApptestConfiguration(TestServerConfiguration, TwistedConfiguration):
    repo_method = 'inmemory'
    options = cwconfig.merge_options(TestServerConfiguration.options
                                     + TwistedConfiguration.options)
    cubicweb_appobject_path = TestServerConfiguration.cubicweb_appobject_path | TwistedConfiguration.cubicweb_appobject_path
    cube_appobject_path = TestServerConfiguration.cube_appobject_path | TwistedConfiguration.cube_appobject_path

    def available_languages(self, *args):
        return ('en', 'fr', 'de')

    def ext_resources_file(self):
        """return instance's external resources file"""
        return join(self.apphome, 'data', 'external_resources')

    def pyro_enabled(self):
        # but export PYRO_MULTITHREAD=0 or you get problems with sqlite and threads
        return True


class ApptestConfiguration(BaseApptestConfiguration):

    def __init__(self, appid, log_threshold=logging.CRITICAL, sourcefile=None):
        BaseApptestConfiguration.__init__(self, appid, log_threshold=log_threshold)
        self.init_repository = sourcefile is None
        self.sourcefile = sourcefile


# test database handling #######################################################

def init_test_database(config=None, configdir='data'):
    """init a test database for a specific driver"""
    from cubicweb.dbapi import in_memory_cnx
    config = config or TestServerConfiguration(configdir)
    sources = config.sources()
    driver = sources['system']['db-driver']
    if driver == 'sqlite':
        init_test_database_sqlite(config)
    elif driver == 'postgres':
        init_test_database_postgres(config)
    elif driver == 'sqlserver2005':
        init_test_database_sqlserver2005(config)
    else:
        raise ValueError('no initialization function for driver %r' % driver)
    config._cubes = None # avoid assertion error
    repo, cnx = in_memory_cnx(config, unicode(sources['admin']['login']),
                              password=sources['admin']['password'] or 'xxx')
    if driver == 'sqlite':
        install_sqlite_patch(repo.querier)
    return repo, cnx


def reset_test_database(config):
    """init a test database for a specific driver"""
    driver = config.sources()['system']['db-driver']
    if driver == 'sqlite':
        reset_test_database_sqlite(config)
    elif driver in ('sqlserver2005', 'postgres'):
        # XXX do something with dump/restore ?
        print 'resetting the database is not done for', driver
        print 'you should handle it manually'
    else:
        raise ValueError('no reset function for driver %r' % driver)


### postgres test database handling ############################################

def init_test_database_postgres(config):
    """initialize a fresh postgresql databse used for testing purpose"""
    if config.init_repository:
        from cubicweb.server import init_repository
        init_repository(config, interactive=False, drop=True)

### sqlserver2005 test database handling ############################################

def init_test_database_sqlserver2005(config):
    """initialize a fresh sqlserver databse used for testing purpose"""
    if config.init_repository:
        from cubicweb.server import init_repository
        init_repository(config, interactive=False, drop=True, vreg=vreg)

### sqlite test database handling ##############################################

def cleanup_sqlite(dbfile, removetemplate=False):
    try:
        os.remove(dbfile)
        os.remove('%s-journal' % dbfile)
    except OSError:
        pass
    if removetemplate:
        try:
            os.remove('%s-template' % dbfile)
        except OSError:
            pass

def reset_test_database_sqlite(config):
    import shutil
    dbfile = config.sources()['system']['db-name']
    cleanup_sqlite(dbfile)
    template = '%s-template' % dbfile
    if exists(template):
        shutil.copy(template, dbfile)
        return True
    return False

def init_test_database_sqlite(config):
    """initialize a fresh sqlite databse used for testing purpose"""
    # remove database file if it exists
    dbfile = config.sources()['system']['db-name']
    if not reset_test_database_sqlite(config):
        # initialize the database
        import shutil
        from cubicweb.server import init_repository
        init_repository(config, interactive=False)
        dbfile = config.sources()['system']['db-name']
        shutil.copy(dbfile, '%s-template' % dbfile)


def install_sqlite_patch(querier):
    """This patch hotfixes the following sqlite bug :
       - http://www.sqlite.org/cvstrac/tktview?tn=1327,33
       (some dates are returned as strings rather thant date objects)
    """
    if hasattr(querier.__class__, '_devtools_sqlite_patched'):
        return # already monkey patched
    def wrap_execute(base_execute):
        def new_execute(*args, **kwargs):
            rset = base_execute(*args, **kwargs)
            if rset.description:
                found_date = False
                for row, rowdesc in zip(rset, rset.description):
                    for cellindex, (value, vtype) in enumerate(zip(row, rowdesc)):
                        if vtype in ('Date', 'Datetime') and type(value) is unicode:
                            found_date = True
                            value = value.rsplit('.', 1)[0]
                            try:
                                row[cellindex] = strptime(value, '%Y-%m-%d %H:%M:%S')
                            except:
                                row[cellindex] = strptime(value, '%Y-%m-%d')
                        if vtype == 'Time' and type(value) is unicode:
                            found_date = True
                            try:
                                row[cellindex] = strptime(value, '%H:%M:%S')
                            except:
                                # DateTime used as Time?
                                row[cellindex] = strptime(value, '%Y-%m-%d %H:%M:%S')
                        if vtype == 'Interval' and type(value) is int:
                            found_date = True
                            row[cellindex] = timedelta(0, value, 0) # XXX value is in number of seconds?
                    if not found_date:
                        break
            return rset
        return new_execute
    querier.__class__.execute = wrap_execute(querier.__class__.execute)
    querier.__class__._devtools_sqlite_patched = True