--- a/toolsutils.py Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,415 +0,0 @@
-# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-"""some utilities for cubicweb command line tools"""
-from __future__ import print_function
-
-__docformat__ = "restructuredtext en"
-
-# XXX move most of this in logilab.common (shellutils ?)
-
-import io
-import os, sys
-import subprocess
-from os import listdir, makedirs, environ, chmod, walk, remove
-from os.path import exists, join, abspath, normpath
-import re
-from rlcompleter import Completer
-try:
- import readline
-except ImportError: # readline not available, no completion
- pass
-try:
- from os import symlink
-except ImportError:
- def symlink(*args):
- raise NotImplementedError
-
-from six import add_metaclass
-
-from logilab.common.clcommands import Command as BaseCommand
-from logilab.common.shellutils import ASK
-
-from cubicweb import warning # pylint: disable=E0611
-from cubicweb import ConfigurationError, ExecutionError
-
-def underline_title(title, car='-'):
- return title+'\n'+(car*len(title))
-
-def iter_dir(directory, condition_file=None, ignore=()):
- """iterate on a directory"""
- for sub in listdir(directory):
- if sub in ('CVS', '.svn', '.hg'):
- continue
- if condition_file is not None and \
- not exists(join(directory, sub, condition_file)):
- continue
- if sub in ignore:
- continue
- yield sub
-
-def create_dir(directory):
- """create a directory if it doesn't exist yet"""
- try:
- makedirs(directory)
- print('-> created directory %s' % directory)
- except OSError as ex:
- import errno
- if ex.errno != errno.EEXIST:
- raise
- print('-> no need to create existing directory %s' % directory)
-
-def create_symlink(source, target):
- """create a symbolic link"""
- if exists(target):
- remove(target)
- symlink(source, target)
- print('[symlink] %s <-- %s' % (target, source))
-
-def create_copy(source, target):
- import shutil
- print('[copy] %s <-- %s' % (target, source))
- shutil.copy2(source, target)
-
-def rm(whatever):
- import shutil
- shutil.rmtree(whatever)
- print('-> removed %s' % whatever)
-
-def show_diffs(appl_file, ref_file, askconfirm=True):
- """interactivly replace the old file with the new file according to
- user decision
- """
- import shutil
- pipe = subprocess.Popen(['diff', '-u', appl_file, ref_file], stdout=subprocess.PIPE)
- diffs = pipe.stdout.read()
- if diffs:
- if askconfirm:
- print()
- print(diffs)
- action = ASK.ask('Replace ?', ('Y', 'n', 'q'), 'Y').lower()
- else:
- action = 'y'
- if action == 'y':
- try:
- shutil.copyfile(ref_file, appl_file)
- except IOError:
- os.system('chmod a+w %s' % appl_file)
- shutil.copyfile(ref_file, appl_file)
- print('replaced')
- elif action == 'q':
- sys.exit(0)
- else:
- copy_file = appl_file + '.default'
- copy = open(copy_file, 'w')
- copy.write(open(ref_file).read())
- copy.close()
- print('keep current version, the new file has been written to', copy_file)
- else:
- print('no diff between %s and %s' % (appl_file, ref_file))
-
-SKEL_EXCLUDE = ('*.py[co]', '*.orig', '*~', '*_flymake.py')
-def copy_skeleton(skeldir, targetdir, context,
- exclude=SKEL_EXCLUDE, askconfirm=False):
- import shutil
- from fnmatch import fnmatch
- skeldir = normpath(skeldir)
- targetdir = normpath(targetdir)
- for dirpath, dirnames, filenames in walk(skeldir):
- tdirpath = dirpath.replace(skeldir, targetdir)
- create_dir(tdirpath)
- for fname in filenames:
- if any(fnmatch(fname, pat) for pat in exclude):
- continue
- fpath = join(dirpath, fname)
- if 'CUBENAME' in fname:
- tfpath = join(tdirpath, fname.replace('CUBENAME', context['cubename']))
- elif 'DISTNAME' in fname:
- tfpath = join(tdirpath, fname.replace('DISTNAME', context['distname']))
- else:
- tfpath = join(tdirpath, fname)
- if fname.endswith('.tmpl'):
- tfpath = tfpath[:-5]
- if not askconfirm or not exists(tfpath) or \
- ASK.confirm('%s exists, overwrite?' % tfpath):
- fill_templated_file(fpath, tfpath, context)
- print('[generate] %s <-- %s' % (tfpath, fpath))
- elif exists(tfpath):
- show_diffs(tfpath, fpath, askconfirm)
- else:
- shutil.copyfile(fpath, tfpath)
-
-def fill_templated_file(fpath, tfpath, context):
- with io.open(fpath, encoding='ascii') as fobj:
- template = fobj.read()
- with io.open(tfpath, 'w', encoding='ascii') as fobj:
- fobj.write(template % context)
-
-def restrict_perms_to_user(filepath, log=None):
- """set -rw------- permission on the given file"""
- if log:
- log('set permissions to 0600 for %s', filepath)
- else:
- print('-> set permissions to 0600 for %s' % filepath)
- chmod(filepath, 0o600)
-
-def read_config(config_file, raise_if_unreadable=False):
- """read some simple configuration from `config_file` and return it as a
- dictionary. If `raise_if_unreadable` is false (the default), an empty
- dictionary will be returned if the file is inexistant or unreadable, else
- :exc:`ExecutionError` will be raised.
- """
- from logilab.common.fileutils import lines
- config = current = {}
- try:
- for line in lines(config_file, comments='#'):
- try:
- option, value = line.split('=', 1)
- except ValueError:
- option = line.strip().lower()
- if option[0] == '[':
- # start a section
- section = option[1:-1]
- assert section not in config, \
- 'Section %s is defined more than once' % section
- config[section] = current = {}
- continue
- sys.stderr.write('ignoring malformed line\n%r\n' % line)
- continue
- option = option.strip().replace(' ', '_')
- value = value.strip()
- current[option] = value or None
- except IOError as ex:
- if raise_if_unreadable:
- raise ExecutionError('%s. Are you logged with the correct user '
- 'to use this instance?' % ex)
- else:
- warning('missing or non readable configuration file %s (%s)',
- config_file, ex)
- return config
-
-
-_HDLRS = {}
-
-class metacmdhandler(type):
- def __new__(mcs, name, bases, classdict):
- cls = super(metacmdhandler, mcs).__new__(mcs, name, bases, classdict)
- if getattr(cls, 'cfgname', None) and getattr(cls, 'cmdname', None):
- _HDLRS.setdefault(cls.cmdname, []).append(cls)
- return cls
-
-
-@add_metaclass(metacmdhandler)
-class CommandHandler(object):
- """configuration specific helper for cubicweb-ctl commands"""
- def __init__(self, config):
- self.config = config
-
-
-class Command(BaseCommand):
- """base class for cubicweb-ctl commands"""
-
- def config_helper(self, config, required=True, cmdname=None):
- if cmdname is None:
- cmdname = self.name
- for helpercls in _HDLRS.get(cmdname, ()):
- if helpercls.cfgname == config.name:
- return helpercls(config)
- if config.name == 'all-in-one':
- for helpercls in _HDLRS.get(cmdname, ()):
- if helpercls.cfgname == 'repository':
- return helpercls(config)
- if required:
- msg = 'No helper for command %s using %s configuration' % (
- cmdname, config.name)
- raise ConfigurationError(msg)
-
- def fail(self, reason):
- print("command failed:", reason)
- sys.exit(1)
-
-
-CONNECT_OPTIONS = (
- ("user",
- {'short': 'u', 'type' : 'string', 'metavar': '<user>',
- 'help': 'connect as <user> instead of being prompted to give it.',
- }
- ),
- ("password",
- {'short': 'p', 'type' : 'password', 'metavar': '<password>',
- 'help': 'automatically give <password> for authentication instead of \
-being prompted to give it.',
- }),
- ("host",
- {'short': 'H', 'type' : 'string', 'metavar': '<hostname>',
- 'default': None,
- 'help': 'specify the name server\'s host name. Will be detected by \
-broadcast if not provided.',
- }),
- )
-
-## cwshell helpers #############################################################
-
-class AbstractMatcher(object):
- """Abstract class for CWShellCompleter's matchers.
-
- A matcher should implement a ``possible_matches`` method. This
- method has to return the list of possible completions for user's input.
- Because of the python / readline interaction, each completion should
- be a superset of the user's input.
-
- NOTE: readline tokenizes user's input and only passes last token to
- completers.
- """
-
- def possible_matches(self, text):
- """return possible completions for user's input.
-
- Parameters:
- text: the user's input
-
- Return:
- a list of completions. Each completion includes the original input.
- """
- raise NotImplementedError()
-
-
-class RQLExecuteMatcher(AbstractMatcher):
- """Custom matcher for rql queries.
-
- If user's input starts with ``rql(`` or ``session.execute(`` and
- the corresponding rql query is incomplete, suggest some valid completions.
- """
- query_match_rgx = re.compile(
- r'(?P<func_prefix>\s*(?:rql)' # match rql, possibly indented
- r'|' # or
- r'\s*(?:\w+\.execute))' # match .execute, possibly indented
- # end of <func_prefix>
- r'\(' # followed by a parenthesis
- r'(?P<quote_delim>["\'])' # a quote or double quote
- r'(?P<parameters>.*)') # and some content
-
- def __init__(self, local_ctx, req):
- self.local_ctx = local_ctx
- self.req = req
- self.schema = req.vreg.schema
- self.rsb = req.vreg['components'].select('rql.suggestions', req)
-
- @staticmethod
- def match(text):
- """check if ``text`` looks like a call to ``rql`` or ``session.execute``
-
- Parameters:
- text: the user's input
-
- Returns:
- None if it doesn't match, the query structure otherwise.
- """
- query_match = RQLExecuteMatcher.query_match_rgx.match(text)
- if query_match is None:
- return None
- parameters_text = query_match.group('parameters')
- quote_delim = query_match.group('quote_delim')
- # first parameter is fully specified, no completion needed
- if re.match(r"(.*?)%s" % quote_delim, parameters_text) is not None:
- return None
- func_prefix = query_match.group('func_prefix')
- return {
- # user's input
- 'text': text,
- # rql( or session.execute(
- 'func_prefix': func_prefix,
- # offset of rql query
- 'rql_offset': len(func_prefix) + 2,
- # incomplete rql query
- 'rql_query': parameters_text,
- }
-
- def possible_matches(self, text):
- """call ``rql.suggestions`` component to complete user's input.
- """
- # readline will only send last token, but we need the entire user's input
- user_input = readline.get_line_buffer()
- query_struct = self.match(user_input)
- if query_struct is None:
- return []
- else:
- # we must only send completions of the last token => compute where it
- # starts relatively to the rql query itself.
- completion_offset = readline.get_begidx() - query_struct['rql_offset']
- rql_query = query_struct['rql_query']
- return [suggestion[completion_offset:]
- for suggestion in self.rsb.build_suggestions(rql_query)]
-
-
-class DefaultMatcher(AbstractMatcher):
- """Default matcher: delegate to standard's `rlcompleter.Completer`` class
- """
- def __init__(self, local_ctx):
- self.completer = Completer(local_ctx)
-
- def possible_matches(self, text):
- if "." in text:
- return self.completer.attr_matches(text)
- else:
- return self.completer.global_matches(text)
-
-
-class CWShellCompleter(object):
- """Custom auto-completion helper for cubicweb-ctl shell.
-
- ``CWShellCompleter`` provides a ``complete`` method suitable for
- ``readline.set_completer``.
-
- Attributes:
- matchers: the list of ``AbstractMatcher`` instances that will suggest
- possible completions
-
- The completion process is the following:
-
- - readline calls the ``complete`` method with user's input,
- - the ``complete`` method asks for each known matchers if
- it can suggest completions for user's input.
- """
-
- def __init__(self, local_ctx):
- # list of matchers to ask for possible matches on completion
- self.matchers = [DefaultMatcher(local_ctx)]
- self.matchers.insert(0, RQLExecuteMatcher(local_ctx, local_ctx['session']))
-
- def complete(self, text, state):
- """readline's completer method
-
- cf http://docs.python.org/2/library/readline.html#readline.set_completer
- for more details.
-
- Implementation inspired by `rlcompleter.Completer`
- """
- if state == 0:
- # reset self.matches
- self.matches = []
- for matcher in self.matchers:
- matches = matcher.possible_matches(text)
- if matches:
- self.matches = matches
- break
- else:
- return None # no matcher able to handle `text`
- try:
- return self.matches[state]
- except IndexError:
- return None