pullbundle: add caching logic
We now only generate a bundle once (and store it to disk). If we need it again,
we use it directly from disk.
# Extension providing automatic caching of the bundles served for pull
#
# Copyright 2018 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
import errno
import os
from mercurial import (
changegroup,
discovery,
exchange,
narrowspec,
node as nodemod,
util,
)
from mercurial.i18n import _
# generic wrapping
def uisetup(ui):
    """Register this extension's generator for bundle2 'changegroup' parts.

    The entry for 'changegroup' in ``exchange.getbundle2partsmapping`` is
    replaced by ``_getbundlechangegrouppart``. ``ui`` is not used.
    """
    partsmapping = exchange.getbundle2partsmapping
    partsmapping['changegroup'] = _getbundlechangegrouppart
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """Add the changegroup part(s) of a getbundle reply to ``bundler``.

    Nothing is added when the request disables changegroups (``cg`` keyword
    false) or when there is nothing missing between ``common`` and ``heads``.

    The changegroup version is the highest one advertised in
    ``b2caps['changegroup']`` that is also supported for ``repo``; '01' is
    used when the client advertises none. ``ValueError`` is raised when
    versions are advertised but none is supported.

    For narrow requests (``narrow`` keyword) the non-empty ``includepats`` and
    ``excludepats`` are turned into a file matcher, and a 'narrow:spec' part
    is appended after the changegroup data.
    """
    if not kwargs.get(r'cg', True):
        return

    version = '01'
    cgversions = b2caps.get('changegroup')
    if cgversions:  # 3.1 and 3.2 ship with an empty value
        usable = []
        for candidate in cgversions:
            if candidate in changegroup.supportedoutgoingversions(repo):
                usable.append(candidate)
        cgversions = usable
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)

    outgoing = exchange._computeoutgoing(repo, heads, common)
    if not outgoing.missing:
        return

    narrow = kwargs.get(r'narrow', False)
    filematcher = None
    include = exclude = None
    if narrow:
        # empty patterns are dropped before sorting
        include = sorted(p for p in kwargs.get(r'includepats', []) if p)
        exclude = sorted(p for p in kwargs.get(r'excludepats', []) if p)
        filematcher = narrowspec.match(repo.root, include=include,
                                       exclude=exclude)

    # START OF ALTERED PART
    makeallcgpart(bundler.newpart, repo, outgoing, version, source,
                  bundlecaps, filematcher, cgversions)
    # END OF ALTERED PART

    if narrow and (include or exclude):
        narrowspecpart = bundler.newpart('narrow:spec')
        for paramname, patterns in (('include', include),
                                    ('exclude', exclude)):
            if patterns:
                narrowspecpart.addparam(
                    paramname, '\n'.join(patterns), mandatory=True)
def makeallcgpart(newpart, repo, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
    """Emit every changegroup part needed to transfer ``outgoing``.

    When a ``filematcher`` is in use, or when ``repo`` has no ``stablerange``
    attribute (a warning is printed in that case), a single uncached
    changegroup part covering all of ``outgoing`` is produced.

    Otherwise ``outgoing`` is cut into slices by ``sliceoutgoing`` and one
    part is produced per slice, each of them going through the bundle cache.

    ``newpart`` is the callable used to create parts; the remaining arguments
    are forwarded unchanged to ``makeonecgpart``.
    """
    pullbundle = not filematcher
    if pullbundle and not util.safehasattr(repo, 'stablerange'):
        repo.ui.warn('pullbundle: required extension "evolve" are missing, skipping pullbundle\n')
        pullbundle = False

    if not pullbundle:
        # Slicing relies on the stable range data of the repository, so it
        # must be bypassed whenever pullbundle got disabled above, not only
        # when a file matcher is in use.
        makeonecgpart(newpart, repo, None, outgoing, version, source,
                      bundlecaps, filematcher, cgversions)
        return

    for sliceid, sliceout in sliceoutgoing(repo, outgoing):
        makeonecgpart(newpart, repo, sliceid, sliceout, version, source,
                      bundlecaps, filematcher, cgversions)
# stable range slicing
def sliceoutgoing(repo, outgoing):
    """Cut ``outgoing`` into a list of ``(rangeid, outgoing)`` slices.

    For each missing head, revisions are collected by walking
    ``repo.stablesort`` from that head until a revision that is not missing
    (or was already sliced) is met. Each collected run is split by
    ``canonicalslices``. The walk then restarts from the highest revision
    still missing below that head, until none is left.

    Each resulting group of nodes is wrapped by ``outgoingfromnodes``.
    """
    changelog = repo.changelog
    torev = changelog.nodemap.get
    tonode = changelog.node
    stablesort = repo.stablesort

    # revisions not yet assigned to any slice, shared across all heads
    remaining = set(torev(n) for n in outgoing.missing)
    collected = []
    headrevs = [torev(n) for n in outgoing.missingheads]
    for head in headrevs:
        headslices = []
        pending = set(repo.revs('%ld and ::%d', remaining, head))
        while pending:
            run = []
            for candidate in stablesort.walkfrom(repo, head):
                if candidate not in remaining:
                    break
                run.append(candidate)
            runnodes = [tonode(r) for r in run]
            headslices.extend(canonicalslices(repo, runnodes))
            remaining.difference_update(run)
            pending.difference_update(run)
            if pending:
                # restart the walk from the highest revision left over
                head = max(pending)
        collected.extend(headslices)

    result = []
    for rangeid, nodes in collected:
        result.append((rangeid, outgoingfromnodes(repo, nodes)))
    return result
def canonicalslices(repo, nodes):
    """Split a run of nodes into slices matching canonical stable ranges.

    ``nodes`` is a non-empty list of changeset nodes whose first entry is the
    head of the run. The run is described as the range
    ``(head revision, depth of head - number of nodes)`` and handed to
    ``canonicalsubranges``.

    Returns a list of ``(rangeid, nodes)`` pairs, one per subrange, where the
    nodes are taken, in order, from the reversed input.

    The input list is left untouched.
    """
    stablerange = repo.stablerange
    headrev = repo.changelog.rev(nodes[0])
    headdepth = repo.depthcache.get(headrev)
    # number of revisions below the run that are not part of it
    skipped = headdepth - len(nodes)
    subranges = canonicalsubranges(repo, stablerange, (headrev, skipped))

    # Work on a reversed copy instead of reversing ``nodes`` in place, so the
    # caller's list is not modified behind its back.
    # NOTE(review): the reversal presumably aligns the nodes with subranges
    # being listed from the bottom of the range upward -- confirm.
    ordered = nodes[::-1]

    slices = []
    idx = 0
    for rangeid in subranges:
        size = stablerange.rangelength(repo, rangeid)
        slices.append((rangeid, ordered[idx:idx + size]))
        idx += size
    return slices
def canonicalsubranges(repo, stablerange, rangeid):
    """slice a size of nodes into most reusable subranges

    We try to slice a range into a set of "largest" and "canonical" stable
    range.

    It might make sense to move this function as a 'stablerange' method.

    ``rangeid`` is a ``(headrev, skip)`` pair and ``stablerange`` is the
    object providing ``depthrev``, ``rangelength`` and ``subranges``.

    Returns a list of range ids: first the ranges covering the part between
    ``skip`` and the computed cut point (if any), then the ranges obtained
    from ``(headrev, cut)``.
    """
    headrev, skip = rangeid
    rangedepth = stablerange.depthrev(repo, rangeid[0])
    canonicals = []
    # 0. find the first power of 2 higher than this range depth
    cursor = 1
    while cursor <= rangedepth:
        cursor *= 2
    # 1. find first cut
    #
    # ``cut`` is advanced by decreasing powers of two, as long as it stays
    # below the range depth, until it reaches or passes ``skip`` (or the step
    # size gets down to 1). ``precut`` remembers the value of ``cut`` before
    # its last advance.
    precut = cut = 0
    while True:
        if skip <= cut:
            break
        if cut + cursor < rangedepth:
            precut = cut
            cut += cursor
        if cursor == 1:
            break
        cursor //= 2
    # 2. optimise, bottom part
    #
    # When the cut does not land exactly on ``skip``, the ``cut - skip``
    # revisions in between are covered with pieces taken from the subranges
    # of (headrev, precut).
    if skip != cut:
        tailranges = []
        tailsize = cut - skip
        assert 0 < tailsize, tailsize
        prerange = (headrev, precut)
        size = stablerange.rangelength(repo, prerange)
        # the last subrange is left out here
        sub = stablerange.subranges(repo, prerange)[:-1]
        # NOTE(review): if the ``for`` below breaks with ``tailsize == 0``
        # while entries remain in ``sub``, the ``else`` is skipped and
        # ``poweroftwo(0)`` is falsy, so this loop would seem to spin
        # forever -- confirm this cannot happen with real stable ranges.
        while not poweroftwo(tailsize):
            for prerange in reversed(sub):
                if tailsize <= 0:
                    break
                assert stablerange.depthrev(repo, prerange[0]) != prerange[1], prerange
                tailrev, tailskip = prerange
                size = stablerange.rangelength(repo, (tailrev, tailskip))
                if tailsize < size:
                    # only part of this subrange is needed: skip more of it
                    # so that exactly ``tailsize`` revisions remain
                    tailskip += size - tailsize
                    size = tailsize
                tailranges.append((tailrev, tailskip))
                tailsize -= size
            else:
                # reached only when the ``for`` loop ran to completion
                # size of the last block
                tailsize = stablerange.rangelength(repo, tailranges[-1])
                if poweroftwo(tailsize):
                    continue # exit the loop
                # otherwise split that last block further using its own
                # subranges
                prerange = tailranges.pop()
                sub = stablerange.subranges(repo, prerange)
        # tail ranges were collected in reversed order
        tailranges.reverse()
        canonicals.extend(tailranges)
    # 3. take recursive subrange until we get to a power of two size?
    current = (headrev, cut)
    while not poweroftwo(stablerange.rangelength(repo, current)):
        sub = stablerange.subranges(repo, current)
        # keep every subrange but the last one, then descend into the last
        canonicals.extend(sub[:-1])
        current = sub[-1]
    canonicals.append(current)
    return canonicals
def poweroftwo(num):
    """Tell whether ``num`` is a power of two.

    A falsy ``num`` (such as 0) is returned as is; otherwise the result is a
    boolean that is True when ``num`` and ``num - 1`` share no set bit.
    """
    if not num:
        return num
    sharedbits = num & (num - 1)
    return not sharedbits
def outgoingfromnodes(repo, nodes):
    """Return a ``discovery.outgoing`` using ``nodes`` as both roots and heads."""
    boundaries = {'missingroots': nodes, 'missingheads': nodes}
    return discovery.outgoing(repo, **boundaries)
# changegroup part construction
def _changegroupinfo(repo, nodes, source):
    """Report how many changesets were found.

    The message is only emitted when the ui is verbose or when ``source`` is
    'bundle'. This function is temporarily installed in place of
    ``changegroup._changegroupinfo`` by ``_makenewstream``.
    """
    ui = repo.ui
    if not (ui.verbose or source == 'bundle'):
        return
    ui.status(_("%d changesets found\n") % len(nodes))
def _makenewstream(newpart, repo, outgoing, version, source,
                   bundlecaps, filematcher, cgversions):
    """Generate a fresh changegroup stream for ``outgoing``.

    While ``changegroup.makestream`` is called, the module level
    ``changegroup._changegroupinfo`` is swapped for this extension's own
    ``_changegroupinfo``; the previous function is always restored.

    Returns a ``(stream, number of changesets, part version)`` tuple. The
    part version is ``version`` when ``cgversions`` is non-empty and ``None``
    otherwise. ``newpart`` is not used here.
    """
    saved = changegroup._changegroupinfo
    try:
        changegroup._changegroupinfo = _changegroupinfo
        stream = changegroup.makestream(repo, outgoing, version, source,
                                        bundlecaps=bundlecaps,
                                        filematcher=filematcher)
    finally:
        changegroup._changegroupinfo = saved
    partversion = version if cgversions else None
    return (stream, len(outgoing.missing), partversion)
def _makepartfromstream(newpart, repo, cgstream, nbchanges, version):
    """Create a 'changegroup' part carrying ``cgstream``.

    Parameters set on the part, in order:

    - 'version', only when ``version`` is truthy,
    - 'nbchanges' (advisory), the changeset count formatted as a decimal,
    - 'treemanifest' set to '1', only when the repository requirements
      contain 'treemanifest'.
    """
    # same as upstream code
    cgpart = newpart('changegroup', data=cgstream)
    if version:
        cgpart.addparam('version', version)
    count = '%d' % nbchanges
    cgpart.addparam('nbchanges', count, mandatory=False)
    usestreemanifest = 'treemanifest' in repo.requirements
    if usestreemanifest:
        cgpart.addparam('treemanifest', '1')
# cache management
def cachedir(repo):
    """Return the path of the 'pullbundles' directory under the cache vfs."""
    cachevfs = repo.cachevfs
    return cachevfs.join('pullbundles')
def getcache(repo, bundlename):
    """Return the cached content of ``bundlename``, or None if not cached.

    On a cache hit, the result is an iterator over chunks of the cached file.
    The underlying file is closed once the iterator is exhausted or
    discarded.

    A missing file (ENOENT) is reported as a cache miss; any other IOError
    raised while opening the file is propagated.
    """
    bundlepath = os.path.join(cachedir(repo), bundlename)
    try:
        fd = open(bundlepath, 'rb')
    except IOError as exc:
        if exc.errno != errno.ENOENT:
            raise
        return None

    def chunks():
        # wrap the chunk iterator so the file handle does not stay open
        # after the data has been streamed
        try:
            for chunk in util.filechunkiter(fd):
                yield chunk
        finally:
            fd.close()

    return chunks()
def cachewriter(repo, bundlename, stream):
    """Yield the chunks of ``stream`` while saving them in the bundle cache.

    This is a generator: the cache directory is created (if needed) and the
    cache file is opened when iteration starts. Every chunk is written to an
    ``util.atomictempfile`` targeting ``bundlename`` in the cache directory
    before being yielded.

    Raises OSError when the cache directory cannot be created for a reason
    other than it already existing.
    """
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    try:
        os.makedirs(cdir)
    except OSError as exc:
        # An already existing directory is the expected common case. Any
        # other failure used to be silently ignored here; report it instead
        # of failing later on the cache file itself.
        if exc.errno != errno.EEXIST:
            raise
    with util.atomictempfile(bundlepath) as cachefile:
        for chunk in stream:
            cachefile.write(chunk)
            yield chunk
# Name of a cached bundle file. The fields are filled by makeonecgpart with:
# the changegroup version, the hex node of the range head, the number of
# skipped revisions of the range (zero padded to 10 digits) and the number of
# changesets in the range (zero padded to 10 digits).
BUNDLEMASK = "%s-%s-%010iskip-%010isize.hg"
def makeonecgpart(newpart, repo, rangeid, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
    """Add one changegroup part, served from the bundle cache when possible.

    When ``rangeid`` is None the cache is bypassed and a new changegroup
    stream is generated. Otherwise the cache is searched for a file named
    after ``BUNDLEMASK``:

    - on a hit, the cached data is used as the part payload,
    - on a miss, a new stream is generated and wrapped by ``cachewriter`` so
      it gets stored while being sent.

    Returns whatever ``_makepartfromstream`` returns.
    """
    cachedata = None
    bundlename = None
    if rangeid is not None:
        headrev, skipped = rangeid[0], rangeid[1]
        nbchanges = repo.stablerange.rangelength(repo, rangeid)
        headnode = nodemod.hex(repo.changelog.node(headrev))
        # XXX do we need to use cgversion in there?
        bundlename = BUNDLEMASK % (version, headnode, skipped, nbchanges)
        cachedata = getcache(repo, bundlename)

    if cachedata is not None:
        # cache hit: reuse the stored data
        if repo.ui.verbose or source == 'bundle':
            repo.ui.status(_("%d changesets found in caches\n") % nbchanges)
        pversion = version if cgversions else None
        return _makepartfromstream(newpart, repo, cachedata, nbchanges,
                                   pversion)

    # cache miss (or caching not applicable): build the stream
    cgstream, nbchanges, pversion = _makenewstream(
        newpart, repo, outgoing, version, source, bundlecaps, filematcher,
        cgversions)
    if bundlename is not None:
        cgstream = cachewriter(repo, bundlename, cgstream)
    return _makepartfromstream(newpart, repo, cgstream, nbchanges, pversion)