toolsutils.py
author sylvain.thenault@logilab.fr
Fri, 03 Apr 2009 19:04:00 +0200
changeset 1228 91ae10ffb611
parent 541 0d75cfe50f83
child 1132 96752791c2b6
permissions -rw-r--r--
* refactor ms planner (renaming, reorganization) * fix a bug originaly demonstrated by test_version_depends_on * enhance crossed relation support, though there is still some bug renaming. some tests were actually wrong. Buggy tests (wether they fail or not, they are byggy) marked by XXXFIXME)

"""some utilities for cubicweb tools

:organization: Logilab
:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
__docformat__ = "restructuredtext en"

import os, sys
from os import listdir, makedirs, symlink, environ, chmod, walk, remove
from os.path import exists, join, abspath, normpath

from logilab.common.clcommands import Command as BaseCommand, \
     main_run as base_main_run, register_commands, pop_arg, cmd_run
from logilab.common.compat import any

from cubicweb import warning
from cubicweb import ConfigurationError, ExecutionError

def iter_dir(directory, condition_file=None, ignore=()):
    """iterate on a directory"""
    for sub in listdir(directory):
        if sub in ('CVS', '.svn', '.hg'):
            continue
        if condition_file is not None and \
               not exists(join(directory, sub, condition_file)):
            continue
        if sub in ignore:
            continue
        yield sub

def create_dir(directory):
    """create a directory if it doesn't exist yet"""
    try:
        makedirs(directory)
        print 'created directory', directory
    except OSError, ex:
        import errno
        if ex.errno != errno.EEXIST:
            raise
        print 'directory %s already exists' % directory
                
def create_symlink(source, target):
    """create a symbolic link"""
    if exists(target):
        remove(target)
    symlink(source, target)
    print '[symlink] %s <-- %s' % (target, source)

def create_copy(source, target):
    import shutil
    print '[copy] %s <-- %s' % (target, source)
    shutil.copy2(source, target)
    
def rm(whatever):
    import shutil
    shutil.rmtree(whatever)
    print 'removed %s' % whatever

def show_diffs(appl_file, ref_file, askconfirm=True):
    """interactivly replace the old file with the new file according to
    user decision
    """
    import shutil
    p_output = os.popen('diff -u %s %s' % (appl_file, ref_file), 'r')
    diffs = p_output.read()
    if diffs:
        if askconfirm:
            print 
            print diffs
            action = raw_input('replace (N/y/q) ? ').lower()
        else:
            action = 'y'
        if action == 'y':
            try:
                shutil.copyfile(ref_file, appl_file)
            except IOError:
                os.system('chmod a+w %s' % appl_file)
                shutil.copyfile(ref_file, appl_file)
            print 'replaced'
        elif action == 'q':
            sys.exit(0)
        else:
            copy_file = appl_file + '.default'
            copy = file(copy_file, 'w')
            copy.write(open(ref_file).read())
            copy.close()
            print 'keep current version, the new file has been written to', copy_file
    else:
        print 'no diff between %s and %s' % (appl_file, ref_file)


def copy_skeleton(skeldir, targetdir, context,
                  exclude=('*.py[co]', '*.orig', '*~', '*_flymake.py'),
                  askconfirm=False):
    import shutil
    from fnmatch import fnmatch
    skeldir = normpath(skeldir)
    targetdir = normpath(targetdir)
    for dirpath, dirnames, filenames in walk(skeldir):
        tdirpath = dirpath.replace(skeldir, targetdir)
        create_dir(tdirpath)
        for fname in filenames:
            if any(fnmatch(fname, pat) for pat in exclude):
                continue
            fpath = join(dirpath, fname)
            if 'CUBENAME' in fname:
                tfpath = join(tdirpath, fname.replace('CUBENAME', context['cubename']))
            elif 'DISTNAME' in fname:
                tfpath = join(tdirpath, fname.replace('DISTNAME', context['distname']))
            else:
                tfpath = join(tdirpath, fname)
            if fname.endswith('.tmpl'):
                tfpath = tfpath[:-5]
                if not askconfirm or not exists(tfpath) or \
                       confirm('%s exists, overwrite?' % tfpath):
                    fname = fill_templated_file(fpath, tfpath, context)
                    print '[generate] %s <-- %s' % (tfpath, fpath)
            elif exists(tfpath):
                show_diffs(tfpath, fpath, askconfirm)
            else:
                shutil.copyfile(fpath, tfpath)
                
def fill_templated_file(fpath, tfpath, context):
    fobj = file(tfpath, 'w')
    templated = file(fpath).read()
    fobj.write(templated % context)
    fobj.close()

def restrict_perms_to_user(filepath, log=None):
    """set -rw------- permission on the given file"""
    if log:
        log('set %s permissions to 0600', filepath)
    else:
        print 'set %s permissions to 0600' % filepath
    chmod(filepath, 0600)

def confirm(question, default_is_yes=True):
    """ask for confirmation and return true on positive answer"""
    if default_is_yes:
        input_str = '%s [Y/n]: '
    else:
        input_str = '%s [y/N]: '
    answer = raw_input(input_str % (question)).strip().lower()
    if default_is_yes:
        if answer in ('n', 'no'):
            return False
        return True
    if answer in ('y', 'yes'):
        return True
    return False

def read_config(config_file):
    """read the application configuration from a file and return it as a
    dictionnary

    :type config_file: str
    :param config_file: path to the configuration file

    :rtype: dict
    :return: a dictionary with specified values associated to option names 
    """
    from logilab.common.fileutils import lines
    config = current = {}
    try:
        for line in lines(config_file, comments='#'):
            try:
                option, value = line.split('=', 1)
            except ValueError:
                option = line.strip().lower()
                if option[0] == '[':
                    # start a section
                    section = option[1:-1]
                    assert not config.has_key(section), \
                           'Section %s is defined more than once' % section
                    config[section] = current = {}
                    continue
                print >> sys.stderr, 'ignoring malformed line\n%r' % line
                continue
            option = option.strip().replace(' ', '_')
            value = value.strip()
            current[option] = value or None
    except IOError, ex:
        warning('missing or non readable configuration file %s (%s)',
                config_file, ex)
    return config

def env_path(env_var, default, name):
    """get a path specified in a variable or using the default value and return
    it.

    :type env_var: str
    :param env_var: name of an environment variable

    :type default: str
    :param default: default value if the environment variable is not defined
    
    :type name: str
    :param name: the informal name of the path, used for error message
    
    :rtype: str
    :return: the value of the environment variable or the default value

    :raise `ConfigurationError`: if the returned path does not exist
    """
    path = environ.get(env_var, default)
    if not exists(path):
        raise ConfigurationError('%s path %s doesn\'t exist' % (name, path))
    return abspath(path)



_HDLRS = {}

class metacmdhandler(type):
    def __new__(mcs, name, bases, classdict):
        cls = super(metacmdhandler, mcs).__new__(mcs, name, bases, classdict)
        if getattr(cls, 'cfgname', None) and getattr(cls, 'cmdname', None):
            _HDLRS.setdefault(cls.cmdname, []).append(cls)
        return cls


class CommandHandler(object):
    """configuration specific helper for cubicweb-ctl commands"""
    __metaclass__ = metacmdhandler
    def __init__(self, config):
        self.config = config

class Command(BaseCommand):
    """base class for cubicweb-ctl commands"""

    def config_helper(self, config, required=True, cmdname=None):
        if cmdname is None:
            cmdname = self.name
        for helpercls in _HDLRS.get(cmdname, ()):
            if helpercls.cfgname == config.name:
                return helpercls(config)
        if config.name == 'all-in-one':
            for helpercls in _HDLRS.get(cmdname, ()):
                if helpercls.cfgname == 'repository':
                    return helpercls(config)
        if required:
            msg = 'No helper for command %s using %s configuration' % (
                cmdname, config.name)
            raise ConfigurationError(msg)
        
    def fail(self, reason):
        print "command failed:", reason
        sys.exit(1)
    
                    
def main_run(args, doc):
    """command line tool"""
    try:
        base_main_run(args, doc)
    except ConfigurationError, err:
        print 'ERROR: ', err
        sys.exit(1)
    except ExecutionError, err:
        print err
        sys.exit(2)

CONNECT_OPTIONS = (
    ("user",
     {'short': 'u', 'type' : 'string', 'metavar': '<user>',
      'help': 'connect as <user> instead of being prompted to give it.',
      }
     ),
    ("password",
     {'short': 'p', 'type' : 'password', 'metavar': '<password>',
      'help': 'automatically give <password> for authentication instead of \
being prompted to give it.',
      }),
    ("host",
     {'short': 'H', 'type' : 'string', 'metavar': '<hostname>',
      'default': None,
      'help': 'specify the name server\'s host name. Will be detected by \
broadcast if not provided.',
      }),
    )

def config_connect(appid, optconfig):
    from cubicweb.dbapi import connect
    from getpass import getpass
    user = optconfig.user
    if not user:
        user = raw_input('login: ')
    password = optconfig.password
    if not password:
        password = getpass('password: ')
    return connect(user=user, password=password, host=optconfig.host, database=appid)