author Arthur Lutz <>
Thu, 04 Apr 2019 14:11:40 +0200
changeset 12561 290f44d445a3
parent 1802 d628defebc17
child 1977 606923dff11b
permissions -rw-r--r--
Reclosing branch after merge

"""some utilities for cubicweb tools

:organization: Logilab
:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
__docformat__ = "restructuredtext en"

import os, sys
from os import listdir, makedirs, symlink, environ, chmod, walk, remove
from os.path import exists, join, abspath, normpath

from logilab.common.clcommands import Command as BaseCommand, \
     main_run as base_main_run
from logilab.common.compat import any

from cubicweb import warning
from cubicweb import ConfigurationError, ExecutionError

def iter_dir(directory, condition_file=None, ignore=()):
    """iterate on the entries of a directory, skipping unwanted ones

    :type directory: str
    :param directory: path of the directory to list

    :type condition_file: str or None
    :param condition_file:
      if given, an entry `sub` is only yielded when
      `directory/sub/condition_file` exists

    :type ignore: container of str
    :param ignore: entry names that must never be yielded

    :rtype: generator
    :return: the names (not the full paths) of the retained entries
    """
    for sub in listdir(directory):
        # version control bookkeeping directories are never wanted
        if sub in ('CVS', '.svn', '.hg'):
            continue
        if condition_file is not None and \
               not exists(join(directory, sub, condition_file)):
            continue
        if sub in ignore:
            continue
        yield sub

def create_dir(directory):
    """create a directory if it doesn't exist yet"""
        print 'created directory', directory
    except OSError, ex:
        import errno
        if ex.errno != errno.EEXIST:
        print 'directory %s already exists' % directory

def create_symlink(source, target):
    """create a symbolic link"""
    if exists(target):
    symlink(source, target)
    print '[symlink] %s <-- %s' % (target, source)

def create_copy(source, target):
    import shutil
    print '[copy] %s <-- %s' % (target, source)
    shutil.copy2(source, target)

def rm(whatever):
    import shutil
    print 'removed %s' % whatever

def show_diffs(appl_file, ref_file, askconfirm=True):
    """interactivly replace the old file with the new file according to
    user decision
    import shutil
    p_output = os.popen('diff -u %s %s' % (appl_file, ref_file), 'r')
    diffs =
    if diffs:
        if askconfirm:
            print diffs
            action = raw_input('replace (N/y/q) ? ').lower()
            action = 'y'
        if action == 'y':
                shutil.copyfile(ref_file, appl_file)
            except IOError:
                os.system('chmod a+w %s' % appl_file)
                shutil.copyfile(ref_file, appl_file)
            print 'replaced'
        elif action == 'q':
            copy_file = appl_file + '.default'
            copy = file(copy_file, 'w')
            print 'keep current version, the new file has been written to', copy_file
        print 'no diff between %s and %s' % (appl_file, ref_file)

def copy_skeleton(skeldir, targetdir, context,
                  exclude=('*.py[co]', '*.orig', '*~', '*'),
    import shutil
    from fnmatch import fnmatch
    skeldir = normpath(skeldir)
    targetdir = normpath(targetdir)
    for dirpath, dirnames, filenames in walk(skeldir):
        tdirpath = dirpath.replace(skeldir, targetdir)
        for fname in filenames:
            if any(fnmatch(fname, pat) for pat in exclude):
            fpath = join(dirpath, fname)
            if 'CUBENAME' in fname:
                tfpath = join(tdirpath, fname.replace('CUBENAME', context['cubename']))
            elif 'DISTNAME' in fname:
                tfpath = join(tdirpath, fname.replace('DISTNAME', context['distname']))
                tfpath = join(tdirpath, fname)
            if fname.endswith('.tmpl'):
                tfpath = tfpath[:-5]
                if not askconfirm or not exists(tfpath) or \
                       confirm('%s exists, overwrite?' % tfpath):
                    fill_templated_file(fpath, tfpath, context)
                    print '[generate] %s <-- %s' % (tfpath, fpath)
            elif exists(tfpath):
                show_diffs(tfpath, fpath, askconfirm)
                shutil.copyfile(fpath, tfpath)

def fill_templated_file(fpath, tfpath, context):
    """write to `tfpath` the content of the template `fpath` filled with
    `context`

    :type fpath: str
    :param fpath: path of the template file

    :type tfpath: str
    :param tfpath: path of the file to generate

    :type context: dict
    :param context: values substituted in the template with the `%` operator
    """
    template = open(fpath)
    try:
        templated = template.read()
    finally:
        template.close()
    # interpolate before opening the target, so a failing substitution
    # doesn't leave an empty or truncated file behind
    content = templated % context
    fobj = open(tfpath, 'w')
    try:
        fobj.write(content)
    finally:
        fobj.close()

def restrict_perms_to_user(filepath, log=None):
    """set -rw------- permission on the given file"""
    if log:
        log('set %s permissions to 0600', filepath)
        print 'set %s permissions to 0600' % filepath
    chmod(filepath, 0600)

def confirm(question, default_is_yes=True):
    """ask for confirmation and return true on positive answer

    :type question: str
    :param question: the question displayed to the user

    :type default_is_yes: bool
    :param default_is_yes:
      tell what an empty or unrecognized answer means: if true, anything
      but 'n' / 'no' is a confirmation, else only 'y' / 'yes' is

    :rtype: bool
    :return: true if the user confirmed
    """
    if default_is_yes:
        input_str = '%s [Y/n]: '
    else:
        input_str = '%s [y/N]: '
    # one-element tuple, so a tuple `question` can't break the formatting
    answer = raw_input(input_str % (question,)).strip().lower()
    if default_is_yes:
        return answer not in ('n', 'no')
    return answer in ('y', 'yes')

def read_config(config_file):
    """read the application configuration from a file and return it as a

    :type config_file: str
    :param config_file: path to the configuration file

    :rtype: dict
    :return: a dictionary with specified values associated to option names
    from logilab.common.fileutils import lines
    config = current = {}
        for line in lines(config_file, comments='#'):
                option, value = line.split('=', 1)
            except ValueError:
                option = line.strip().lower()
                if option[0] == '[':
                    # start a section
                    section = option[1:-1]
                    assert not config.has_key(section), \
                           'Section %s is defined more than once' % section
                    config[section] = current = {}
                print >> sys.stderr, 'ignoring malformed line\n%r' % line
            option = option.strip().replace(' ', '_')
            value = value.strip()
            current[option] = value or None
    except IOError, ex:
        warning('missing or non readable configuration file %s (%s)',
                config_file, ex)
    return config

def env_path(env_var, default, name):
    """get a path specified in a variable or using the default value and return
    it as an absolute path

    :type env_var: str
    :param env_var: name of an environment variable

    :type default: str
    :param default: default value if the environment variable is not defined

    :type name: str
    :param name: the informal name of the path, used for error message

    :rtype: str
    :return: the value of the environment variable or the default value

    :raise `ConfigurationError`: if the returned path does not exist
    """
    path = environ.get(env_var, default)
    if not exists(path):
        raise ConfigurationError('%s path %s doesn\'t exist' % (name, path))
    return abspath(path)

# registry of command handler classes: maps a command name to the list of
# classes declaring that `cmdname`, in definition order
_HDLRS = {}

class metacmdhandler(type):
    """metaclass recording command handler classes in `_HDLRS`

    A class created through this metaclass is registered under its `cmdname`
    when both its `cfgname` and `cmdname` attributes are set to a true value;
    other classes are created but left out of the registry.
    """
    def __new__(mcs, name, bases, classdict):
        handlercls = super(metacmdhandler, mcs).__new__(mcs, name, bases,
                                                        classdict)
        # a handler is only usable once it tells both which configuration
        # and which command it is written for
        if not getattr(handlercls, 'cfgname', None):
            return handlercls
        if not getattr(handlercls, 'cmdname', None):
            return handlercls
        _HDLRS.setdefault(handlercls.cmdname, []).append(handlercls)
        return handlercls

class CommandHandler(object):
    """configuration specific helper for cubicweb-ctl commands

    Subclasses setting both a `cfgname` and a `cmdname` class attribute are
    recorded by the `metacmdhandler` metaclass in the `_HDLRS` registry.
    """
    # hooks class creation so that concrete handlers get registered
    __metaclass__ = metacmdhandler
    def __init__(self, config):
        # the configuration object this helper works for
        self.config = config

class Command(BaseCommand):
    """base class for cubicweb-ctl commands"""

    def config_helper(self, config, required=True, cmdname=None):
        if cmdname is None:
            cmdname =
        for helpercls in _HDLRS.get(cmdname, ()):
            if helpercls.cfgname ==
                return helpercls(config)
        if == 'all-in-one':
            for helpercls in _HDLRS.get(cmdname, ()):
                if helpercls.cfgname == 'repository':
                    return helpercls(config)
        if required:
            msg = 'No helper for command %s using %s configuration' % (
            raise ConfigurationError(msg)

    def fail(self, reason):
        print "command failed:", reason

def main_run(args, doc):
    """command line tool"""
        base_main_run(args, doc)
    except ConfigurationError, err:
        print 'ERROR: ', err
    except ExecutionError, err:
        print err

     {'short': 'u', 'type' : 'string', 'metavar': '<user>',
      'help': 'connect as <user> instead of being prompted to give it.',
     {'short': 'p', 'type' : 'password', 'metavar': '<password>',
      'help': 'automatically give <password> for authentication instead of \
being prompted to give it.',
     {'short': 'H', 'type' : 'string', 'metavar': '<hostname>',
      'default': None,
      'help': 'specify the name server\'s host name. Will be detected by \
broadcast if not provided.',

def config_connect(appid, optconfig):
    """open a connection to the `appid` instance using the command line
    options, prompting for the missing credentials

    :type appid: str
    :param appid: identifier of the instance, given as `database` to `connect`

    :param optconfig:
      object holding the connection options as attributes (`user`,
      `password` and `host`)

    :return: what `cubicweb.dbapi.connect` returns
    """
    from cubicweb.dbapi import connect
    from getpass import getpass
    user = optconfig.user
    if not user:
        user = raw_input('login: ')
    password = optconfig.password
    if not password:
        password = getpass('password: ')
    # NOTE(review): the host argument was unreadable in the reviewed text;
    # restored from the <hostname> connection option -- confirm
    return connect(user=user, password=password, host=optconfig.host,
                   database=appid)