# Various utility function for the evolve extension
#
# Copyright 2017 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from mercurial import (
obsutil,
)
from mercurial.i18n import _
from mercurial.node import nullrev
shorttemplate = b"[{label('evolve.rev', rev)}] {desc|firstline}\n"
stacktemplate = b"""[{label('evolve.rev', if(topicidx, "s{topicidx}", rev))}] {desc|firstline}\n"""
def obsexcmsg(ui, message, important=False):
verbose = ui.configbool(b'experimental', b'verbose-obsolescence-exchange')
if verbose:
message = b'OBSEXC: ' + message
if important or verbose:
ui.status(message)
def filterparents(parents):
"""filter nullrev parents
(and other crazyness)"""
p1, p2 = parents
if p1 == nullrev and p2 == nullrev:
return ()
elif p1 != nullrev and (p2 == nullrev or p1 == p2):
return (p1,)
elif p1 == nullrev and p2 != nullrev:
return (p2,)
else:
return parents
def shouldwarmcache(repo, tr):
configbool = repo.ui.configbool
config = repo.ui.config
desc = getattr(tr, 'desc', b'')
autocase = False
if tr is None and not getattr(repo, '_destroying', False):
autocase = True
elif desc.startswith(b'serve'):
autocase = True
elif desc.startswith(b'push') and not desc.startswith(b'push-response'):
autocase = True
autocache = config(b'experimental', b'obshashrange.warm-cache',
b'auto') == b'auto'
if autocache:
warm = autocase
else:
# note: we should not get to the default case
warm = configbool(b'experimental', b'obshashrange.warm-cache')
if not configbool(b'experimental', b'obshashrange'):
return False
if not warm:
return False
maxrevs = repo.ui.configint(b'experimental', b'obshashrange.max-revs')
if maxrevs is not None and maxrevs < len(repo.unfiltered()):
return False
return True
class MultipleSuccessorsError(RuntimeError):
"""Exception raised by _singlesuccessor when multiple successor sets exists
The object contains the list of successorssets in its 'successorssets'
attribute to call to easily recover.
"""
def __init__(self, successorssets):
self.successorssets = successorssets
self.divergenceflag = len(successorssets) > 1
self.splitflag = len(successorssets[0]) > 1
def builddependencies(repo, revs):
"""returns dependency graphs giving an order to solve instability of revs
(see _orderrevs for more information on usage)"""
# For each troubled revision we keep track of what instability if any should
# be resolved in order to resolve it. Example:
# dependencies = {3: [6], 6:[]}
# Means that: 6 has no dependency, 3 depends on 6 to be solved
dependencies = {}
for r in revs:
dependencies[r] = set()
for p in repo[r].parents():
for succ in _successorrevs(repo, p):
if succ in revs:
dependencies[r].add(succ)
# rdependencies is the inverted dict of dependencies
rdependencies = {r: set() for r in revs}
for r, deps in dependencies.items():
for dep in deps:
rdependencies[dep].add(r)
return dependencies, rdependencies
def _singlesuccessor(repo, p):
"""returns p (as rev) if not obsolete or its unique latest successors
fail if there are no such successor"""
if not p.obsolete():
return p.rev()
obs = repo[p]
ui = repo.ui
newer = obsutil.successorssets(repo, obs.node())
# search of a parent which is not killed
while not newer:
ui.debug(b"stabilize target %s is plain dead,"
b" trying to stabilize on its parent\n" %
obs)
obs = obs.p1()
newer = obsutil.successorssets(repo, obs.node())
if len(newer) > 1 or len(newer[0]) > 1:
raise MultipleSuccessorsError(newer)
return repo[newer[0][0]].rev()
def picksplitsuccessor(ui, repo, ctx, evolvecand):
"""choose a successor of ctx from split targets
Choose highest one if all successors are in a topological branch. And if
they are split over multiple topological branches, we ask user to choose
an evolve destination.
Return (True, succ) unless split targets are split over multiple
topological branches and user didn't choose any evolve destination,
in which case return (False, '.')
"""
targets = obsutil.successorssets(repo, ctx.node())[0]
assert targets
targetrevs = [repo[r].rev() for r in targets]
heads = repo.revs(b'heads(%ld::%ld)', targetrevs, targetrevs)
if len(heads) > 1:
cheader = (_(b"ancestor of '%s' split over multiple topological"
b" branches.\nchoose an evolve destination:") %
evolvecand)
selectedrev = revselectionprompt(ui, repo, list(heads), cheader)
if selectedrev is None:
return (False, '.')
succ = repo[selectedrev]
else:
succ = repo[heads.first()]
return (True, repo[succ].rev())
def _successorrevs(repo, ctx):
try:
return {_singlesuccessor(repo, ctx)}
except MultipleSuccessorsError as exc:
return {repo[node].rev()
for successorsset in exc.successorssets
for node in successorsset}
def revselectionprompt(ui, repo, revs, customheader=b""):
"""function to prompt user to choose a revision from all the revs and return
that revision for further tasks
revs is a list of rev number of revision from which one revision should be
choosed by the user
customheader is a text which the caller wants as the header of the prompt
which will list revisions to select
returns value is:
rev number of revision choosed: if user choose a revision
None: if user entered a wrong input, user quit the prompt,
ui.interactive is not set
"""
# ui.interactive is not set, fallback to default behavior and avoid showing
# the prompt
if not ui.interactive():
return None
promptmsg = customheader + b"\n"
for idx, rev in enumerate(revs):
curctx = repo[rev]
revmsg = b"%d: [%s] %s\n" % (idx + 1, curctx,
curctx.description().split(b"\n")[0])
promptmsg += revmsg
promptmsg += _(b"q: quit the prompt\n")
promptmsg += _(b"enter the index of the revision you want to select:")
idxselected = ui.prompt(promptmsg)
intidx = None
try:
intidx = int(idxselected)
except ValueError:
if idxselected == b'q':
return None
ui.write_err(_(b"invalid value '%s' entered for index\n") % idxselected)
return None
if intidx > len(revs) or intidx <= 0:
# we can make this error message better
ui.write_err(_(b"invalid value '%d' entered for index\n") % intidx)
return None
return revs[intidx - 1]
def mergeusers(ui, base, divergent, other):
""" merges the users from two divergent changesets using three-way merge
and return the user that will be used as the author of resolved cset"""
baseuser = base.user()
divuser = divergent.user()
othuser = other.user()
if divuser == othuser:
return divuser
else:
if baseuser == divuser:
return othuser
elif baseuser == othuser:
return divuser
else:
# all three are different, lets concatenate the two authors
# XXX: should we let the user know about concatenation of authors
# by printing some message (or maybe in verbose mode)
users = set(divuser.split(b', '))
users.update(othuser.split(b', '))
user = b', '.join(sorted(users))
return user