[migration] don't crash when deleting a non-existing rtype
When asked to delete a relation type, instead of checking if it was
computed or not, just delete both the CWRType and CWComputedRType.
# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""some utilities for cubicweb command line tools"""
__docformat__ = "restructuredtext en"
# XXX move most of this in logilab.common (shellutils ?)
import os, sys
import subprocess
from os import listdir, makedirs, environ, chmod, walk, remove
from os.path import exists, join, abspath, normpath
import re
from rlcompleter import Completer
try:
import readline
except ImportError: # readline not available, no completion
pass
try:
from os import symlink
except ImportError:
def symlink(*args):
raise NotImplementedError
from logilab.common.clcommands import Command as BaseCommand
from logilab.common.shellutils import ASK
from cubicweb import warning # pylint: disable=E0611
from cubicweb import ConfigurationError, ExecutionError
def underline_title(title, car='-'):
return title+'\n'+(car*len(title))
def iter_dir(directory, condition_file=None, ignore=()):
"""iterate on a directory"""
for sub in listdir(directory):
if sub in ('CVS', '.svn', '.hg'):
continue
if condition_file is not None and \
not exists(join(directory, sub, condition_file)):
continue
if sub in ignore:
continue
yield sub
def create_dir(directory):
"""create a directory if it doesn't exist yet"""
try:
makedirs(directory)
print '-> created directory %s' % directory
except OSError as ex:
import errno
if ex.errno != errno.EEXIST:
raise
print '-> no need to create existing directory %s' % directory
def create_symlink(source, target):
"""create a symbolic link"""
if exists(target):
remove(target)
symlink(source, target)
print '[symlink] %s <-- %s' % (target, source)
def create_copy(source, target):
import shutil
print '[copy] %s <-- %s' % (target, source)
shutil.copy2(source, target)
def rm(whatever):
import shutil
shutil.rmtree(whatever)
print '-> removed %s' % whatever
def show_diffs(appl_file, ref_file, askconfirm=True):
"""interactivly replace the old file with the new file according to
user decision
"""
import shutil
pipe = subprocess.Popen(['diff', '-u', appl_file, ref_file], stdout=subprocess.PIPE)
diffs = pipe.stdout.read()
if diffs:
if askconfirm:
print
print diffs
action = ASK.ask('Replace ?', ('Y', 'n', 'q'), 'Y').lower()
else:
action = 'y'
if action == 'y':
try:
shutil.copyfile(ref_file, appl_file)
except IOError:
os.system('chmod a+w %s' % appl_file)
shutil.copyfile(ref_file, appl_file)
print 'replaced'
elif action == 'q':
sys.exit(0)
else:
copy_file = appl_file + '.default'
copy = file(copy_file, 'w')
copy.write(open(ref_file).read())
copy.close()
print 'keep current version, the new file has been written to', copy_file
else:
print 'no diff between %s and %s' % (appl_file, ref_file)
SKEL_EXCLUDE = ('*.py[co]', '*.orig', '*~', '*_flymake.py')
def copy_skeleton(skeldir, targetdir, context,
exclude=SKEL_EXCLUDE, askconfirm=False):
import shutil
from fnmatch import fnmatch
skeldir = normpath(skeldir)
targetdir = normpath(targetdir)
for dirpath, dirnames, filenames in walk(skeldir):
tdirpath = dirpath.replace(skeldir, targetdir)
create_dir(tdirpath)
for fname in filenames:
if any(fnmatch(fname, pat) for pat in exclude):
continue
fpath = join(dirpath, fname)
if 'CUBENAME' in fname:
tfpath = join(tdirpath, fname.replace('CUBENAME', context['cubename']))
elif 'DISTNAME' in fname:
tfpath = join(tdirpath, fname.replace('DISTNAME', context['distname']))
else:
tfpath = join(tdirpath, fname)
if fname.endswith('.tmpl'):
tfpath = tfpath[:-5]
if not askconfirm or not exists(tfpath) or \
ASK.confirm('%s exists, overwrite?' % tfpath):
fill_templated_file(fpath, tfpath, context)
print '[generate] %s <-- %s' % (tfpath, fpath)
elif exists(tfpath):
show_diffs(tfpath, fpath, askconfirm)
else:
shutil.copyfile(fpath, tfpath)
def fill_templated_file(fpath, tfpath, context):
fobj = file(tfpath, 'w')
templated = file(fpath).read()
fobj.write(templated % context)
fobj.close()
def restrict_perms_to_user(filepath, log=None):
"""set -rw------- permission on the given file"""
if log:
log('set permissions to 0600 for %s', filepath)
else:
print '-> set permissions to 0600 for %s' % filepath
chmod(filepath, 0600)
def read_config(config_file, raise_if_unreadable=False):
"""read some simple configuration from `config_file` and return it as a
dictionary. If `raise_if_unreadable` is false (the default), an empty
dictionary will be returned if the file is inexistant or unreadable, else
:exc:`ExecutionError` will be raised.
"""
from logilab.common.fileutils import lines
config = current = {}
try:
for line in lines(config_file, comments='#'):
try:
option, value = line.split('=', 1)
except ValueError:
option = line.strip().lower()
if option[0] == '[':
# start a section
section = option[1:-1]
assert section not in config, \
'Section %s is defined more than once' % section
config[section] = current = {}
continue
sys.stderr.write('ignoring malformed line\n%r\n' % line)
continue
option = option.strip().replace(' ', '_')
value = value.strip()
current[option] = value or None
except IOError as ex:
if raise_if_unreadable:
raise ExecutionError('%s. Are you logged with the correct user '
'to use this instance?' % ex)
else:
warning('missing or non readable configuration file %s (%s)',
config_file, ex)
return config
_HDLRS = {}
class metacmdhandler(type):
def __new__(mcs, name, bases, classdict):
cls = super(metacmdhandler, mcs).__new__(mcs, name, bases, classdict)
if getattr(cls, 'cfgname', None) and getattr(cls, 'cmdname', None):
_HDLRS.setdefault(cls.cmdname, []).append(cls)
return cls
class CommandHandler(object):
"""configuration specific helper for cubicweb-ctl commands"""
__metaclass__ = metacmdhandler
def __init__(self, config):
self.config = config
class Command(BaseCommand):
"""base class for cubicweb-ctl commands"""
def config_helper(self, config, required=True, cmdname=None):
if cmdname is None:
cmdname = self.name
for helpercls in _HDLRS.get(cmdname, ()):
if helpercls.cfgname == config.name:
return helpercls(config)
if config.name == 'all-in-one':
for helpercls in _HDLRS.get(cmdname, ()):
if helpercls.cfgname == 'repository':
return helpercls(config)
if required:
msg = 'No helper for command %s using %s configuration' % (
cmdname, config.name)
raise ConfigurationError(msg)
def fail(self, reason):
print "command failed:", reason
sys.exit(1)
CONNECT_OPTIONS = (
("user",
{'short': 'u', 'type' : 'string', 'metavar': '<user>',
'help': 'connect as <user> instead of being prompted to give it.',
}
),
("password",
{'short': 'p', 'type' : 'password', 'metavar': '<password>',
'help': 'automatically give <password> for authentication instead of \
being prompted to give it.',
}),
("host",
{'short': 'H', 'type' : 'string', 'metavar': '<hostname>',
'default': None,
'help': 'specify the name server\'s host name. Will be detected by \
broadcast if not provided.',
}),
)
def config_connect(appid, optconfig):
from cubicweb.dbapi import connect
from getpass import getpass
user = optconfig.user
if not user:
user = raw_input('login: ')
password = optconfig.password
if not password:
password = getpass('password: ')
return connect(login=user, password=password, host=optconfig.host, database=appid)
## cwshell helpers #############################################################
class AbstractMatcher(object):
"""Abstract class for CWShellCompleter's matchers.
A matcher should implement a ``possible_matches`` method. This
method has to return the list of possible completions for user's input.
Because of the python / readline interaction, each completion should
be a superset of the user's input.
NOTE: readline tokenizes user's input and only passes last token to
completers.
"""
def possible_matches(self, text):
"""return possible completions for user's input.
Parameters:
text: the user's input
Return:
a list of completions. Each completion includes the original input.
"""
raise NotImplementedError()
class RQLExecuteMatcher(AbstractMatcher):
"""Custom matcher for rql queries.
If user's input starts with ``rql(`` or ``session.execute(`` and
the corresponding rql query is incomplete, suggest some valid completions.
"""
query_match_rgx = re.compile(
r'(?P<func_prefix>\s*(?:rql)' # match rql, possibly indented
r'|' # or
r'\s*(?:\w+\.execute))' # match .execute, possibly indented
# end of <func_prefix>
r'\(' # followed by a parenthesis
r'(?P<quote_delim>["\'])' # a quote or double quote
r'(?P<parameters>.*)') # and some content
def __init__(self, local_ctx, req):
self.local_ctx = local_ctx
self.req = req
self.schema = req.vreg.schema
self.rsb = req.vreg['components'].select('rql.suggestions', req)
@staticmethod
def match(text):
"""check if ``text`` looks like a call to ``rql`` or ``session.execute``
Parameters:
text: the user's input
Returns:
None if it doesn't match, the query structure otherwise.
"""
query_match = RQLExecuteMatcher.query_match_rgx.match(text)
if query_match is None:
return None
parameters_text = query_match.group('parameters')
quote_delim = query_match.group('quote_delim')
# first parameter is fully specified, no completion needed
if re.match(r"(.*?)%s" % quote_delim, parameters_text) is not None:
return None
func_prefix = query_match.group('func_prefix')
return {
# user's input
'text': text,
# rql( or session.execute(
'func_prefix': func_prefix,
# offset of rql query
'rql_offset': len(func_prefix) + 2,
# incomplete rql query
'rql_query': parameters_text,
}
def possible_matches(self, text):
"""call ``rql.suggestions`` component to complete user's input.
"""
# readline will only send last token, but we need the entire user's input
user_input = readline.get_line_buffer()
query_struct = self.match(user_input)
if query_struct is None:
return []
else:
# we must only send completions of the last token => compute where it
# starts relatively to the rql query itself.
completion_offset = readline.get_begidx() - query_struct['rql_offset']
rql_query = query_struct['rql_query']
return [suggestion[completion_offset:]
for suggestion in self.rsb.build_suggestions(rql_query)]
class DefaultMatcher(AbstractMatcher):
"""Default matcher: delegate to standard's `rlcompleter.Completer`` class
"""
def __init__(self, local_ctx):
self.completer = Completer(local_ctx)
def possible_matches(self, text):
if "." in text:
return self.completer.attr_matches(text)
else:
return self.completer.global_matches(text)
class CWShellCompleter(object):
"""Custom auto-completion helper for cubicweb-ctl shell.
``CWShellCompleter`` provides a ``complete`` method suitable for
``readline.set_completer``.
Attributes:
matchers: the list of ``AbstractMatcher`` instances that will suggest
possible completions
The completion process is the following:
- readline calls the ``complete`` method with user's input,
- the ``complete`` method asks for each known matchers if
it can suggest completions for user's input.
"""
def __init__(self, local_ctx):
# list of matchers to ask for possible matches on completion
self.matchers = [DefaultMatcher(local_ctx)]
self.matchers.insert(0, RQLExecuteMatcher(local_ctx, local_ctx['session']))
def complete(self, text, state):
"""readline's completer method
cf http://docs.python.org/2/library/readline.html#readline.set_completer
for more details.
Implementation inspired by `rlcompleter.Completer`
"""
if state == 0:
# reset self.matches
self.matches = []
for matcher in self.matchers:
matches = matcher.possible_matches(text)
if matches:
self.matches = matches
break
else:
return None # no matcher able to handle `text`
try:
return self.matches[state]
except IndexError:
return None