pullbundle: slice pull into multiple ranges
We subdivide the set of missing revisions into multiple "ranges" using the
"stable range" data structure. This slicing aims at maximizing the
cacheability of the resulting ranges.
# Extension to provide automatic caching of bundle server for pull
#
# Copyright 2018 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
import errno
import os
from mercurial import (
changegroup,
discovery,
exchange,
narrowspec,
node as nodemod,
util,
)
from mercurial.i18n import _
# generic wrapping
def uisetup(ui):
    """Register this module's changegroup part generator.

    The 'changegroup' entry of ``exchange.getbundle2partsmapping`` is
    replaced by ``_getbundlechangegrouppart``. The ``ui`` argument is not
    used.
    """
    partgenerators = exchange.getbundle2partsmapping
    partgenerators['changegroup'] = _getbundlechangegrouppart
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add changegroup part(s) to the requested bundle

    Nothing is added when the ``cg`` keyword argument is false or when there
    is nothing outgoing. The changegroup version defaults to '01'; when the
    client advertises versions in ``b2caps['changegroup']`` the highest one
    also supported locally is used, and ``ValueError`` is raised if there is
    no common version.

    When the ``narrow`` keyword argument is set, a file matcher is built from
    ``includepats``/``excludepats`` and a 'narrow:spec' part is emitted after
    the changegroup part(s) if any pattern was given.
    """
    if not kwargs.get(r'cg', True):
        return

    # changegroup version negotiation
    version = '01'
    cgversions = b2caps.get('changegroup')
    if cgversions: # 3.1 and 3.2 ship with an empty value
        supported = changegroup.supportedoutgoingversions(repo)
        cgversions = [v for v in cgversions if v in supported]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)

    outgoing = exchange._computeoutgoing(repo, heads, common)
    if not outgoing.missing:
        return

    # narrow patterns are only looked at when the request is a narrow one
    include = exclude = ()
    filematcher = None
    if kwargs.get(r'narrow', False):
        include = sorted(filter(bool, kwargs.get(r'includepats', [])))
        exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
        filematcher = narrowspec.match(repo.root, include=include,
                                       exclude=exclude)

    # START OF ALTERED PART
    makeallcgpart(bundler.newpart, repo, outgoing, version, source, bundlecaps,
                  filematcher, cgversions)
    # END OF ALTERED PART

    # both pattern lists are empty unless this is a narrow request
    if include or exclude:
        narrowspecpart = bundler.newpart('narrow:spec')
        if include:
            narrowspecpart.addparam(
                'include', '\n'.join(include), mandatory=True)
        if exclude:
            narrowspecpart.addparam(
                'exclude', '\n'.join(exclude), mandatory=True)
def makeallcgpart(newpart, repo, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
    """Emit the changegroup part(s) covering ``outgoing``.

    A single changegroup part is produced when a ``filematcher`` is in use
    or when the repository has no ``stablerange`` attribute (a warning is
    printed in that second case). Otherwise ``outgoing`` is cut into slices
    by ``sliceoutgoing`` and one changegroup part is produced per slice.

    ``newpart`` is the callable used to create bundle parts; the remaining
    arguments are forwarded unchanged to ``makeonecgpart``.
    """
    pullbundle = not filematcher
    if pullbundle and not util.safehasattr(repo, 'stablerange'):
        repo.ui.warn('pullbundle: required extension "evolve" are missing, skipping pullbundle\n')
        pullbundle = False

    # Branch on the computed flag (not only on the matcher): slicing needs
    # the stable range data, so it must really be skipped when the warning
    # above says it is.
    if not pullbundle:
        makeonecgpart(newpart, repo, None, outgoing, version, source,
                      bundlecaps, filematcher, cgversions)
        return

    for sliceid, sliceout in sliceoutgoing(repo, outgoing):
        makeonecgpart(newpart, repo, sliceid, sliceout, version, source,
                      bundlecaps, filematcher, cgversions)
# stable range slicing
def sliceoutgoing(repo, outgoing):
    """Cut ``outgoing`` into a list of ``(rangeid, outgoing)`` slices.

    For each missing head, ``repo.stablesort.walkfrom(repo, head)`` is
    followed for as long as it yields revisions that are still missing.
    That run is handed to ``canonicalslices`` and removed from the missing
    set. While missing ancestors of the head remain, the walk restarts from
    the highest remaining one.

    Each resulting node list is turned into an outgoing object with
    ``outgoingfromnodes``.
    """
    changelog = repo.changelog
    torev = changelog.nodemap.get
    tonode = changelog.node
    stablesort = repo.stablesort

    # revisions not yet assigned to any slice (shared across all heads)
    remaining = {torev(n) for n in outgoing.missing}
    headrevs = [torev(n) for n in outgoing.missingheads]

    collected = []
    for head in headrevs:
        # still-missing revisions among the ancestors of this head
        pending = set(repo.revs('%ld and ::%d', remaining, head))
        while pending:
            run = []
            for r in stablesort.walkfrom(repo, head):
                if r not in remaining:
                    break
                run.append(r)
            collected.extend(canonicalslices(repo, [tonode(r) for r in run]))
            remaining.difference_update(run)
            pending.difference_update(run)
            if pending:
                head = max(pending)

    return [(rangeid, outgoingfromnodes(repo, nodes))
            for rangeid, nodes in collected]
def canonicalslices(repo, nodes):
    """Split ``nodes`` into ``(rangeid, nodes)`` slices on canonical ranges.

    ``nodes[0]`` is used as the head of the range. The range id is
    ``(headrev, depth(headrev) - len(nodes))`` and is subdivided with
    ``canonicalsubranges``. The nodes are then dealt out, in reverse of the
    order they were given, to each subrange according to its length.

    The ``nodes`` list passed in is left untouched.
    (presumably ``nodes`` is ordered head first, as produced by the walk in
    ``sliceoutgoing`` -- confirm against other callers)
    """
    depth = repo.depthcache.get
    stablerange = repo.stablerange

    headrev = repo.changelog.rev(nodes[0])
    skipped = depth(headrev) - len(nodes)
    subranges = canonicalsubranges(repo, stablerange, (headrev, skipped))

    # Work on a reversed copy instead of reversing in place, so the caller's
    # list is not modified as a side effect.
    ordered = nodes[::-1]

    slices = []
    idx = 0
    for rangeid in subranges:
        size = stablerange.rangelength(repo, rangeid)
        slices.append((rangeid, ordered[idx:idx + size]))
        idx += size
    return slices
def canonicalsubranges(repo, stablerange, rangeid):
    """slice a size of nodes into most reusable subranges

    We try to slice a range into a set of "largest" and "canonical" stable
    range.

    It might make sense to move this function as a 'stablerange' method.

    ``rangeid`` is a ``(headrev, skip)`` pair. ``stablerange`` must provide
    ``depthrev``, ``rangelength`` and ``subranges``, each called with
    ``repo`` as first argument. The return value is a list of range ids: the
    ones collected for the bottom part (step 2) followed by the ones
    collected in step 3.
    """
    headrev, skip = rangeid
    rangedepth = stablerange.depthrev(repo, rangeid[0])
    canonicals = []

    # 0. find the first power of 2 higher than this range depth
    cursor = 1
    while cursor <= rangedepth:
        cursor *= 2

    # 1. find first cut
    #
    # Move `cut` forward by halving steps until it reaches or passes `skip`
    # (or the step is down to 1). `precut` remembers the value `cut` had
    # before its last advance.
    precut = cut = 0
    while True:
        if skip <= cut:
            break
        if cut + cursor < rangedepth:
            precut = cut
            cut += cursor
        if cursor == 1:
            break
        cursor //=2

    # 2. optimise, bottom part
    #
    # Only needed when `cut` did not land exactly on `skip`; `tailsize` is
    # the amount between `skip` and `cut` that still has to be covered.
    if skip != cut:
        tailranges = []
        tailsize = cut - skip
        assert 0 < tailsize, tailsize
        prerange = (headrev, precut)
        # this value is not read: `size` is reassigned inside the loop below
        size = stablerange.rangelength(repo, prerange)
        sub = stablerange.subranges(repo, prerange)[:-1]
        # NOTE(review): poweroftwo(0) is falsy, so if `tailsize` reaches 0
        # and the `for` loop leaves through its `break`, nothing changes
        # before the next `while` test -- confirm this state cannot occur.
        while not poweroftwo(tailsize):
            # consume the subranges from last to first, trimming the one
            # that would exceed what is left of `tailsize`
            for prerange in reversed(sub):
                if tailsize <= 0:
                    break
                assert stablerange.depthrev(repo, prerange[0]) != prerange[1], prerange
                tailrev, tailskip = prerange
                size = stablerange.rangelength(repo, (tailrev, tailskip))
                if tailsize < size:
                    tailskip += size - tailsize
                    size = tailsize
                tailranges.append((tailrev, tailskip))
                tailsize -= size
            else:
                # reached only when the `for` loop ran out of subranges
                # without hitting `break`
                # size of the last block
                tailsize = stablerange.rangelength(repo, tailranges[-1])
                if poweroftwo(tailsize):
                    # `continue` re-evaluates the `while` condition, which
                    # is now false
                    continue # exit the loop
                prerange = tailranges.pop()
                sub = stablerange.subranges(repo, prerange)

        # ranges were collected walking backward; restore the order
        tailranges.reverse()
        canonicals.extend(tailranges)

    # 3. take recursive subrange until we get to a power of two size?
    current = (headrev, cut)
    while not poweroftwo(stablerange.rangelength(repo, current)):
        sub = stablerange.subranges(repo, current)
        canonicals.extend(sub[:-1])
        current = sub[-1]
    canonicals.append(current)

    return canonicals
def poweroftwo(num):
    """Tell whether ``num`` is a power of two.

    A falsy ``num`` (such as 0) is returned unchanged; otherwise the result
    is a boolean.
    """
    if not num:
        return num
    # a power of two has a single bit set, so clearing the lowest set bit
    # leaves nothing
    return (num & (num - 1)) == 0
def outgoingfromnodes(repo, nodes):
    """Return a ``discovery.outgoing`` built from an explicit node list.

    The same ``nodes`` list is passed as both the missing roots and the
    missing heads.
    """
    return discovery.outgoing(repo, missingheads=nodes, missingroots=nodes)
# changegroup part construction
def _changegroupinfo(repo, nodes, source):
    """Report the number of changesets found.

    The status message is only emitted when the ui is verbose or when
    ``source`` is 'bundle'.
    """
    ui = repo.ui
    if not (ui.verbose or source == 'bundle'):
        return
    ui.status(_("%d changesets found\n") % len(nodes))
def makeonecgpart(newpart, repo, rangeid, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
    """Create one 'changegroup' bundle part for ``outgoing``.

    While the changegroup stream is created, ``changegroup._changegroupinfo``
    is temporarily replaced by this module's ``_changegroupinfo``; the
    previous function is always restored afterwards.

    The part gets a 'version' parameter when ``cgversions`` is non-empty, an
    advisory 'nbchanges' parameter, and a 'treemanifest' parameter when the
    repository has that requirement. ``rangeid`` is accepted but not used
    here.
    """
    # same as upstream code
    previousinfo = changegroup._changegroupinfo
    changegroup._changegroupinfo = _changegroupinfo
    try:
        stream = changegroup.makestream(repo, outgoing, version, source,
                                        bundlecaps=bundlecaps,
                                        filematcher=filematcher)
    finally:
        changegroup._changegroupinfo = previousinfo

    cgpart = newpart('changegroup', data=stream)
    if cgversions:
        cgpart.addparam('version', version)

    nbchanges = '%d' % len(outgoing.missing)
    cgpart.addparam('nbchanges', nbchanges, mandatory=False)

    if 'treemanifest' in repo.requirements:
        cgpart.addparam('treemanifest', '1')