--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/toolsutils.py Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,415 @@
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+"""some utilities for cubicweb command line tools"""
+from __future__ import print_function
+
+__docformat__ = "restructuredtext en"
+
+# XXX move most of this in logilab.common (shellutils ?)
+
+import io
+import os, sys
+import subprocess
+from os import listdir, makedirs, environ, chmod, walk, remove
+from os.path import exists, join, abspath, normpath
+import re
+from rlcompleter import Completer
+try:
+ import readline
+except ImportError: # readline not available, no completion
+ pass
+try:
+ from os import symlink
+except ImportError:
+ def symlink(*args):
+ raise NotImplementedError
+
+from six import add_metaclass
+
+from logilab.common.clcommands import Command as BaseCommand
+from logilab.common.shellutils import ASK
+
+from cubicweb import warning # pylint: disable=E0611
+from cubicweb import ConfigurationError, ExecutionError
+
+def underline_title(title, car='-'):
+ return title+'\n'+(car*len(title))
+
+def iter_dir(directory, condition_file=None, ignore=()):
+ """iterate on a directory"""
+ for sub in listdir(directory):
+ if sub in ('CVS', '.svn', '.hg'):
+ continue
+ if condition_file is not None and \
+ not exists(join(directory, sub, condition_file)):
+ continue
+ if sub in ignore:
+ continue
+ yield sub
+
+def create_dir(directory):
+ """create a directory if it doesn't exist yet"""
+ try:
+ makedirs(directory)
+ print('-> created directory %s' % directory)
+ except OSError as ex:
+ import errno
+ if ex.errno != errno.EEXIST:
+ raise
+ print('-> no need to create existing directory %s' % directory)
+
+def create_symlink(source, target):
+ """create a symbolic link"""
+ if exists(target):
+ remove(target)
+ symlink(source, target)
+ print('[symlink] %s <-- %s' % (target, source))
+
+def create_copy(source, target):
+ import shutil
+ print('[copy] %s <-- %s' % (target, source))
+ shutil.copy2(source, target)
+
+def rm(whatever):
+ import shutil
+ shutil.rmtree(whatever)
+ print('-> removed %s' % whatever)
+
+def show_diffs(appl_file, ref_file, askconfirm=True):
+ """interactivly replace the old file with the new file according to
+ user decision
+ """
+ import shutil
+ pipe = subprocess.Popen(['diff', '-u', appl_file, ref_file], stdout=subprocess.PIPE)
+ diffs = pipe.stdout.read()
+ if diffs:
+ if askconfirm:
+ print()
+ print(diffs)
+ action = ASK.ask('Replace ?', ('Y', 'n', 'q'), 'Y').lower()
+ else:
+ action = 'y'
+ if action == 'y':
+ try:
+ shutil.copyfile(ref_file, appl_file)
+ except IOError:
+ os.system('chmod a+w %s' % appl_file)
+ shutil.copyfile(ref_file, appl_file)
+ print('replaced')
+ elif action == 'q':
+ sys.exit(0)
+ else:
+ copy_file = appl_file + '.default'
+ copy = open(copy_file, 'w')
+ copy.write(open(ref_file).read())
+ copy.close()
+ print('keep current version, the new file has been written to', copy_file)
+ else:
+ print('no diff between %s and %s' % (appl_file, ref_file))
+
+SKEL_EXCLUDE = ('*.py[co]', '*.orig', '*~', '*_flymake.py')
+def copy_skeleton(skeldir, targetdir, context,
+ exclude=SKEL_EXCLUDE, askconfirm=False):
+ import shutil
+ from fnmatch import fnmatch
+ skeldir = normpath(skeldir)
+ targetdir = normpath(targetdir)
+ for dirpath, dirnames, filenames in walk(skeldir):
+ tdirpath = dirpath.replace(skeldir, targetdir)
+ create_dir(tdirpath)
+ for fname in filenames:
+ if any(fnmatch(fname, pat) for pat in exclude):
+ continue
+ fpath = join(dirpath, fname)
+ if 'CUBENAME' in fname:
+ tfpath = join(tdirpath, fname.replace('CUBENAME', context['cubename']))
+ elif 'DISTNAME' in fname:
+ tfpath = join(tdirpath, fname.replace('DISTNAME', context['distname']))
+ else:
+ tfpath = join(tdirpath, fname)
+ if fname.endswith('.tmpl'):
+ tfpath = tfpath[:-5]
+ if not askconfirm or not exists(tfpath) or \
+ ASK.confirm('%s exists, overwrite?' % tfpath):
+ fill_templated_file(fpath, tfpath, context)
+ print('[generate] %s <-- %s' % (tfpath, fpath))
+ elif exists(tfpath):
+ show_diffs(tfpath, fpath, askconfirm)
+ else:
+ shutil.copyfile(fpath, tfpath)
+
+def fill_templated_file(fpath, tfpath, context):
+ with io.open(fpath, encoding='ascii') as fobj:
+ template = fobj.read()
+ with io.open(tfpath, 'w', encoding='ascii') as fobj:
+ fobj.write(template % context)
+
+def restrict_perms_to_user(filepath, log=None):
+ """set -rw------- permission on the given file"""
+ if log:
+ log('set permissions to 0600 for %s', filepath)
+ else:
+ print('-> set permissions to 0600 for %s' % filepath)
+ chmod(filepath, 0o600)
+
+def read_config(config_file, raise_if_unreadable=False):
+ """read some simple configuration from `config_file` and return it as a
+ dictionary. If `raise_if_unreadable` is false (the default), an empty
+ dictionary will be returned if the file is inexistant or unreadable, else
+ :exc:`ExecutionError` will be raised.
+ """
+ from logilab.common.fileutils import lines
+ config = current = {}
+ try:
+ for line in lines(config_file, comments='#'):
+ try:
+ option, value = line.split('=', 1)
+ except ValueError:
+ option = line.strip().lower()
+ if option[0] == '[':
+ # start a section
+ section = option[1:-1]
+ assert section not in config, \
+ 'Section %s is defined more than once' % section
+ config[section] = current = {}
+ continue
+ sys.stderr.write('ignoring malformed line\n%r\n' % line)
+ continue
+ option = option.strip().replace(' ', '_')
+ value = value.strip()
+ current[option] = value or None
+ except IOError as ex:
+ if raise_if_unreadable:
+ raise ExecutionError('%s. Are you logged with the correct user '
+ 'to use this instance?' % ex)
+ else:
+ warning('missing or non readable configuration file %s (%s)',
+ config_file, ex)
+ return config
+
+
+_HDLRS = {}
+
+class metacmdhandler(type):
+ def __new__(mcs, name, bases, classdict):
+ cls = super(metacmdhandler, mcs).__new__(mcs, name, bases, classdict)
+ if getattr(cls, 'cfgname', None) and getattr(cls, 'cmdname', None):
+ _HDLRS.setdefault(cls.cmdname, []).append(cls)
+ return cls
+
+
+@add_metaclass(metacmdhandler)
+class CommandHandler(object):
+ """configuration specific helper for cubicweb-ctl commands"""
+ def __init__(self, config):
+ self.config = config
+
+
+class Command(BaseCommand):
+ """base class for cubicweb-ctl commands"""
+
+ def config_helper(self, config, required=True, cmdname=None):
+ if cmdname is None:
+ cmdname = self.name
+ for helpercls in _HDLRS.get(cmdname, ()):
+ if helpercls.cfgname == config.name:
+ return helpercls(config)
+ if config.name == 'all-in-one':
+ for helpercls in _HDLRS.get(cmdname, ()):
+ if helpercls.cfgname == 'repository':
+ return helpercls(config)
+ if required:
+ msg = 'No helper for command %s using %s configuration' % (
+ cmdname, config.name)
+ raise ConfigurationError(msg)
+
+ def fail(self, reason):
+ print("command failed:", reason)
+ sys.exit(1)
+
+
+CONNECT_OPTIONS = (
+ ("user",
+ {'short': 'u', 'type' : 'string', 'metavar': '<user>',
+ 'help': 'connect as <user> instead of being prompted to give it.',
+ }
+ ),
+ ("password",
+ {'short': 'p', 'type' : 'password', 'metavar': '<password>',
+ 'help': 'automatically give <password> for authentication instead of \
+being prompted to give it.',
+ }),
+ ("host",
+ {'short': 'H', 'type' : 'string', 'metavar': '<hostname>',
+ 'default': None,
+ 'help': 'specify the name server\'s host name. Will be detected by \
+broadcast if not provided.',
+ }),
+ )
+
+## cwshell helpers #############################################################
+
+class AbstractMatcher(object):
+ """Abstract class for CWShellCompleter's matchers.
+
+ A matcher should implement a ``possible_matches`` method. This
+ method has to return the list of possible completions for user's input.
+ Because of the python / readline interaction, each completion should
+ be a superset of the user's input.
+
+ NOTE: readline tokenizes user's input and only passes last token to
+ completers.
+ """
+
+ def possible_matches(self, text):
+ """return possible completions for user's input.
+
+ Parameters:
+ text: the user's input
+
+ Return:
+ a list of completions. Each completion includes the original input.
+ """
+ raise NotImplementedError()
+
+
+class RQLExecuteMatcher(AbstractMatcher):
+ """Custom matcher for rql queries.
+
+ If user's input starts with ``rql(`` or ``session.execute(`` and
+ the corresponding rql query is incomplete, suggest some valid completions.
+ """
+ query_match_rgx = re.compile(
+ r'(?P<func_prefix>\s*(?:rql)' # match rql, possibly indented
+ r'|' # or
+ r'\s*(?:\w+\.execute))' # match .execute, possibly indented
+ # end of <func_prefix>
+ r'\(' # followed by a parenthesis
+ r'(?P<quote_delim>["\'])' # a quote or double quote
+ r'(?P<parameters>.*)') # and some content
+
+ def __init__(self, local_ctx, req):
+ self.local_ctx = local_ctx
+ self.req = req
+ self.schema = req.vreg.schema
+ self.rsb = req.vreg['components'].select('rql.suggestions', req)
+
+ @staticmethod
+ def match(text):
+ """check if ``text`` looks like a call to ``rql`` or ``session.execute``
+
+ Parameters:
+ text: the user's input
+
+ Returns:
+ None if it doesn't match, the query structure otherwise.
+ """
+ query_match = RQLExecuteMatcher.query_match_rgx.match(text)
+ if query_match is None:
+ return None
+ parameters_text = query_match.group('parameters')
+ quote_delim = query_match.group('quote_delim')
+ # first parameter is fully specified, no completion needed
+ if re.match(r"(.*?)%s" % quote_delim, parameters_text) is not None:
+ return None
+ func_prefix = query_match.group('func_prefix')
+ return {
+ # user's input
+ 'text': text,
+ # rql( or session.execute(
+ 'func_prefix': func_prefix,
+ # offset of rql query
+ 'rql_offset': len(func_prefix) + 2,
+ # incomplete rql query
+ 'rql_query': parameters_text,
+ }
+
+ def possible_matches(self, text):
+ """call ``rql.suggestions`` component to complete user's input.
+ """
+ # readline will only send last token, but we need the entire user's input
+ user_input = readline.get_line_buffer()
+ query_struct = self.match(user_input)
+ if query_struct is None:
+ return []
+ else:
+ # we must only send completions of the last token => compute where it
+ # starts relatively to the rql query itself.
+ completion_offset = readline.get_begidx() - query_struct['rql_offset']
+ rql_query = query_struct['rql_query']
+ return [suggestion[completion_offset:]
+ for suggestion in self.rsb.build_suggestions(rql_query)]
+
+
+class DefaultMatcher(AbstractMatcher):
+ """Default matcher: delegate to standard's `rlcompleter.Completer`` class
+ """
+ def __init__(self, local_ctx):
+ self.completer = Completer(local_ctx)
+
+ def possible_matches(self, text):
+ if "." in text:
+ return self.completer.attr_matches(text)
+ else:
+ return self.completer.global_matches(text)
+
+
+class CWShellCompleter(object):
+ """Custom auto-completion helper for cubicweb-ctl shell.
+
+ ``CWShellCompleter`` provides a ``complete`` method suitable for
+ ``readline.set_completer``.
+
+ Attributes:
+ matchers: the list of ``AbstractMatcher`` instances that will suggest
+ possible completions
+
+ The completion process is the following:
+
+ - readline calls the ``complete`` method with user's input,
+ - the ``complete`` method asks for each known matchers if
+ it can suggest completions for user's input.
+ """
+
+ def __init__(self, local_ctx):
+ # list of matchers to ask for possible matches on completion
+ self.matchers = [DefaultMatcher(local_ctx)]
+ self.matchers.insert(0, RQLExecuteMatcher(local_ctx, local_ctx['session']))
+
+ def complete(self, text, state):
+ """readline's completer method
+
+ cf http://docs.python.org/2/library/readline.html#readline.set_completer
+ for more details.
+
+ Implementation inspired by `rlcompleter.Completer`
+ """
+ if state == 0:
+ # reset self.matches
+ self.matches = []
+ for matcher in self.matchers:
+ matches = matcher.possible_matches(text)
+ if matches:
+ self.matches = matches
+ break
+ else:
+ return None # no matcher able to handle `text`
+ try:
+ return self.matches[state]
+ except IndexError:
+ return None