stablerange: fix slicing of arbitrary ranges
Pull bundles trigger slicing of ranges with an arbitrary initial skip. We have
to adjust the current slicing to take this into account.
# Extension to provide automatic caching of bundle server for pull
#
# Copyright 2018 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""pullbundle: automatic server side bundle caching
General principle
=================
This extension provides a means for servers to use pre-computed bundles for
serving arbitrary pulls. If missing, the necessary pre-computed bundles will be
generated on demand.
To maximize usage of existing cached bundles, each pull will be served through
multiple bundles. The bundles will be created using "standard ranges" from the
"stablerange" principle. The "stablerange" concept is already used for
obsmarkers discovery in the evolve extension.
Using pull Bundle
=================
All configuration is only required server side.
The "stablerange" code currently still lives in the evolve extension, so for
now enabling that extension is required:
You need at minimum the following configuration:
[extensions]
evolve=yes
pullbundle=yes
[experimental]
obshashrange.warm-cache = yes
If you do not want to use evolution server side, you should disable obsmarkers exchange:
[experimental]
evolution.exchange=no
Extra Configuration
===================
[pullbundle]
# By default bundles are stored in `.hg/cache/pullbundles/`.
# This can be changed with the following config:
cache-directory=/absolute/path
Implementation status
=====================
Both stablerange and pullbundle use "simple" initial implementations.
These implementations focus on testing the algorithms and proving the features
work. Yet they are already useful and used in production.
Performance is expected to greatly improve in the final implementation,
especially if some of it ends up being compiled.
This first implementation lacks the ability to serve the cached bundles from a
CDN. We'll want this limitation to be lifted quickly.
Why does this live in the same repository as evolve
===================================================
There is no fundamental reason for it to live in the same repository. However,
the stablerange data structure lives in evolve, so it was simpler to put this
new extension next to it. As soon as stablerange has been upstreamed, we won't
need the dependency on the evolve extension anymore.
"""
import errno
import os
from mercurial import (
changegroup,
discovery,
exchange,
narrowspec,
node as nodemod,
registrar,
util,
)
from mercurial.i18n import _
# Extension metadata attributes (presumably read by Mercurial's extension
# machinery -- confirm against the Mercurial extension documentation).
__version__ = '0.1.0.dev'
testedwith = '4.7.1'
# minimumhgversion = ''
buglink = 'https://bz.mercurial-scm.org/'
# Config declaration: `configitem` registers options into `configtable`.
configtable = {}
configitem = registrar.configitem(configtable)
# `pullbundle.cache-directory` defaults to None; `cachedir` below then falls
# back to the "pullbundles" directory of the repository cache vfs.
configitem('pullbundle', 'cache-directory',
           default=None,
)
# generic wrapping
def uisetup(ui):
    """Route bundle2 'changegroup' part generation through this extension.

    The 'changegroup' entry of ``exchange.getbundle2partsmapping`` is replaced
    by ``_getbundlechangegrouppart``. The ``ui`` argument is not used.
    """
    partsmapping = exchange.getbundle2partsmapping
    partsmapping['changegroup'] = _getbundlechangegrouppart
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add changegroup part(s) to the requested bundle

    Nothing is added when the ``cg`` argument is false or when no changeset is
    missing. Otherwise the changegroup version is negotiated from ``b2caps``
    ('01' when the client advertises none), the outgoing set is computed from
    ``heads`` and ``common`` and the actual part creation is delegated to
    ``makeallcgpart``. For narrow requests with patterns, a 'narrow:spec' part
    is added afterwards.

    Raises ValueError when the client advertises changegroup versions but none
    of them is supported for this repository.
    """
    wantcg = kwargs.get(r'cg', True)
    if not wantcg:
        return

    # version negotiation
    cgversions = b2caps.get('changegroup')
    version = '01'
    if cgversions:  # 3.1 and 3.2 ship with an empty value
        cgversions = [candidate for candidate in cgversions
                      if candidate in
                      changegroup.supportedoutgoingversions(repo)]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)

    outgoing = exchange._computeoutgoing(repo, heads, common)
    if not outgoing.missing:
        return

    # narrow requests restrict the files through a matcher
    narrow = kwargs.get(r'narrow', False)
    filematcher = None
    if narrow:
        include = sorted(filter(bool, kwargs.get(r'includepats', [])))
        exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
        filematcher = narrowspec.match(repo.root, include=include,
                                       exclude=exclude)

    # START OF ALTERED PART
    makeallcgpart(bundler.newpart, repo, outgoing, version, source, bundlecaps,
                  filematcher, cgversions)
    # END OF ALTERED PART

    if not (narrow and (include or exclude)):
        return
    narrowspecpart = bundler.newpart('narrow:spec')
    for key, patterns in (('include', include), ('exclude', exclude)):
        if patterns:
            narrowspecpart.addparam(key, '\n'.join(patterns), mandatory=True)
def makeallcgpart(newpart, repo, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
    """Add every changegroup part needed to transfer ``outgoing``.

    Without pullbundle (a ``filematcher`` is in use, or the repository has no
    ``stablerange`` attribute because evolve is not enabled) a single uncached
    changegroup part is created. Otherwise the missing set is sliced with
    ``sliceoutgoing`` and one part is created per slice through
    ``makeonecgpart``, which handles the cache.

    ``newpart`` is the callable used to create bundle2 parts; the remaining
    arguments are forwarded to ``makeonecgpart``.
    """
    pullbundle = not filematcher
    if pullbundle and not util.safehasattr(repo, 'stablerange'):
        repo.ui.warn('pullbundle: required extension "evolve" are missing, skipping pullbundle\n')
        pullbundle = False

    if not pullbundle:
        # Branch on `pullbundle`, not on `filematcher`: when evolve is missing
        # we just announced that pullbundle is skipped, so we must not go
        # through the stablerange-based slicing below.
        makeonecgpart(newpart, repo, None, outgoing, version, source,
                      bundlecaps, filematcher, cgversions)
        return

    start = util.timer()
    slices = sliceoutgoing(repo, outgoing)
    end = util.timer()
    msg = _('pullbundle-cache: "missing" set sliced into %d subranges '
            'in %s seconds\n')
    repo.ui.write(msg % (len(slices), end - start))
    for sliceid, sliceout in slices:
        makeonecgpart(newpart, repo, sliceid, sliceout, version, source,
                      bundlecaps, filematcher, cgversions)
# stable range slicing
def sliceoutgoing(repo, outgoing):
    """Slice the missing changesets of ``outgoing`` into stable ranges.

    For each missing head, the stable sort is walked from the head until a
    revision that is not (or no longer) missing is met; the walked revisions
    are cut into canonical slices and removed from the missing set. This is
    repeated from the highest remaining revision until every missing ancestor
    of the head has been handled.

    Returns a list of ``(rangeid, outgoing)`` pairs where the second item is
    built by ``outgoingfromnodes`` from the nodes of the slice.
    """
    changelog = repo.changelog
    torev = changelog.nodemap.get
    tonode = changelog.node
    stablesort = repo.stablesort

    remaining = set(torev(n) for n in outgoing.missing)
    headrevs = [torev(n) for n in outgoing.missingheads]
    allslices = []
    for head in headrevs:
        # missing revisions reachable from this head and not yet sliced
        pending = set(repo.revs('%ld and ::%d', remaining, head))
        while pending:
            walked = []
            for r in stablesort.walkfrom(repo, head):
                if r not in remaining:
                    break
                walked.append(r)
            walkednodes = [tonode(r) for r in walked]
            allslices.extend(canonicalslices(repo, walkednodes))
            remaining.difference_update(walked)
            pending.difference_update(walked)
            if pending:
                # restart from the highest revision still to be sliced
                head = max(pending)
    # slicing may have computed subranges that were unknown so far
    repo.stablerange.save(repo)
    return [(rangeid, outgoingfromnodes(repo, slicenodes))
            for rangeid, slicenodes in allslices]
def canonicalslices(repo, nodes):
    """Cut ``nodes`` into slices matching canonical stable subranges.

    ``nodes[0]`` is used as the head of the range; the range skip is the depth
    of that head minus the number of nodes. ``nodes`` is reversed IN PLACE and
    then consumed from its start, each subrange returned by
    ``canonicalsubranges`` taking as many nodes as its length.

    Returns a list of ``(rangeid, nodes)`` pairs.
    """
    depth = repo.depthcache.get
    stablerange = repo.stablerange
    headrev = repo.changelog.rev(nodes[0])
    skipped = depth(headrev) - len(nodes)
    subranges = canonicalsubranges(repo, stablerange, (headrev, skipped))

    nodes.reverse()
    slices = []
    offset = 0
    for subrange in subranges:
        length = stablerange.rangelength(repo, subrange)
        slices.append((subrange, nodes[offset:offset + length]))
        offset += length
    return slices
def canonicalsubranges(repo, stablerange, rangeid):
    """slice a range into its most reusable subranges

    We try to slice a range into a set of "largest" and "canonical" stable
    ranges.

    ``rangeid`` is a ``(headrev, skip)`` pair. ``stablerange`` is the object
    queried for depth (``depthrev``), length (``rangelength``) and subranges
    (``subranges``). The result is a list of range ids: first the ranges
    covering the part between ``skip`` and the computed cut (step 2), then the
    ranges obtained from ``(headrev, cut)`` (step 3).

    It might make sense to move this function as a 'stablerange' method.
    """
    headrev, skip = rangeid
    rangedepth = stablerange.depthrev(repo, rangeid[0])
    canonicals = []
    # 0. find the first power of 2 higher than this range depth
    cursor = 1
    while cursor <= rangedepth:
        cursor *= 2
    # 1. find first cut
    #
    # `cut` advances by decreasing powers of two, only when the new value stays
    # below the range depth, and stops as soon as it reaches or passes `skip`
    # (or when the step size gets down to 1). `precut` remembers the value of
    # `cut` before its last advance.
    precut = cut = 0
    while True:
        if skip <= cut:
            break
        if cut + cursor < rangedepth:
            precut = cut
            cut += cursor
        if cursor == 1:
            break
        cursor //= 2
    # 2. optimise, bottom part
    #
    # The cut does not fall exactly on `skip`: collect ranges covering the
    # `cut - skip` revisions in between.
    if skip != cut:
        tailranges = []
        tailsize = cut - skip
        assert 0 < tailsize, tailsize
        prerange = (headrev, precut)
        # NOTE(review): this `size` is not read before being reassigned in the
        # loop below.
        size = stablerange.rangelength(repo, prerange)
        sub = stablerange.subranges(repo, prerange)[:-1]
        # This power of two check is too simplistic and misbehaves when too
        # many merges are involved. Because of merges, there can be "canonical"
        # ranges of various sizes.
        #
        # NOTE(review): when `tailsize` is already a power of two on entry, the
        # loop body never runs and `tailranges` stays empty -- confirm this is
        # intended.
        # NOTE(review): if the `for` loop exits through `break` (`tailsize`
        # reached 0 before `sub` was exhausted), `poweroftwo(0)` is falsy and
        # the `while` condition stays true -- check that this cannot loop
        # forever.
        while not poweroftwo(tailsize):
            # consume the subranges from the last one backward, trimming the
            # one that is larger than what is left to cover
            for prerange in reversed(sub):
                if tailsize <= 0:
                    break
                assert stablerange.depthrev(repo, prerange[0]) != prerange[1], prerange
                tailrev, tailskip = prerange
                size = stablerange.rangelength(repo, (tailrev, tailskip))
                if tailsize < size:
                    # keep only the top `tailsize` revisions of this range
                    tailskip += size - tailsize
                    size = tailsize
                tailranges.append((tailrev, tailskip))
                tailsize -= size
            else:
                # (runs only when the `for` loop was not interrupted by `break`)
                # size of the last block
                tailsize = stablerange.rangelength(repo, tailranges[-1])
                if poweroftwo(tailsize):
                    # exit the loop: the `while` condition is now false
                    continue # exit the loop
                # the last block is not a power of two: split it further
                prerange = tailranges.pop()
                sub = stablerange.subranges(repo, prerange)
        # ranges were collected from the cut downward, put them back in order
        tailranges.reverse()
        canonicals.extend(tailranges)
    # 3. take recursive subrange until we get to a power of two size?
    current = (headrev, cut)
    while not poweroftwo(stablerange.rangelength(repo, current)):
        sub = stablerange.subranges(repo, current)
        canonicals.extend(sub[:-1])
        current = sub[-1]
    canonicals.append(current)
    return canonicals
def poweroftwo(num):
    """Tell whether ``num`` is a power of two.

    A falsy ``num`` (e.g. 0) is returned unchanged; any other value yields a
    boolean.
    """
    if not num:
        return num
    # a power of two has a single bit set, so clearing the lowest set bit
    # leaves nothing
    return (num & (num - 1)) == 0
def outgoingfromnodes(repo, nodes):
    """Build a ``discovery.outgoing`` using ``nodes`` as both roots and heads."""
    sliceoutgoing = discovery.outgoing(repo, missingroots=nodes,
                                       missingheads=nodes)
    return sliceoutgoing
# changegroup part construction
def _changegroupinfo(repo, nodes, source):
    """Report the number of changesets found.

    The message is only emitted in verbose mode or when ``source`` is
    'bundle'.
    """
    ui = repo.ui
    if not (ui.verbose or source == 'bundle'):
        return
    ui.status(_("%d changesets found\n") % len(nodes))
def _makenewstream(newpart, repo, outgoing, version, source,
                   bundlecaps, filematcher, cgversions):
    """Generate a fresh changegroup stream for ``outgoing``.

    ``changegroup._changegroupinfo`` is swapped for this module's
    ``_changegroupinfo`` while ``changegroup.makestream`` is called, and
    restored afterwards even on error. ``newpart`` is not used here.

    Returns ``(cgstream, nbchanges, pversion)``; ``pversion`` is ``version``
    when ``cgversions`` is truthy and None otherwise.
    """
    upstreaminfo = changegroup._changegroupinfo
    try:
        changegroup._changegroupinfo = _changegroupinfo
        cgstream = changegroup.makestream(repo, outgoing, version, source,
                                          bundlecaps=bundlecaps,
                                          filematcher=filematcher)
    finally:
        changegroup._changegroupinfo = upstreaminfo
    pversion = version if cgversions else None
    return (cgstream, len(outgoing.missing), pversion)
def _makepartfromstream(newpart, repo, cgstream, nbchanges, version):
    """Create a 'changegroup' part carrying ``cgstream``.

    The part gets a 'version' parameter when ``version`` is truthy, an
    advisory 'nbchanges' parameter, and a 'treemanifest' parameter when the
    repository requirements contain 'treemanifest'.
    """
    # same as upstream code
    cgpart = newpart('changegroup', data=cgstream)
    if version:
        cgpart.addparam('version', version)
    cgpart.addparam('nbchanges', '%d' % nbchanges, mandatory=False)
    if 'treemanifest' in repo.requirements:
        cgpart.addparam('treemanifest', '1')
# cache management
def cachedir(repo):
    """Return the directory holding the cached pull bundles.

    The ``pullbundle.cache-directory`` config value is used when it is set;
    otherwise the 'pullbundles' directory of the repository cache vfs is
    returned.
    """
    configured = repo.ui.config('pullbundle', 'cache-directory')
    if configured is None:
        return repo.cachevfs.join('pullbundles')
    return configured
def getcache(repo, bundlename):
    """Return the cached content of ``bundlename``, or None if not cached.

    The file is looked up in the directory given by ``cachedir`` and opened in
    binary mode; its content is returned wrapped by ``util.filechunkiter``. A
    missing file (ENOENT) yields None, any other IOError propagates.
    """
    bundlepath = os.path.join(cachedir(repo), bundlename)
    try:
        cached = open(bundlepath, 'rb')
        return util.filechunkiter(cached)
    except IOError as exc:
        if exc.errno == errno.ENOENT:
            # nothing cached under that name
            return None
        raise
def cachewriter(repo, bundlename, stream):
    """Yield the chunks of ``stream`` while writing them to the cache.

    This is a generator: the cache directory (see ``cachedir``) is created on
    first iteration if needed, then every chunk is written to the cache file
    for ``bundlename`` before being yielded unchanged. The file is written
    through ``util.atomictempfile`` (presumably only made visible once the
    stream has been fully consumed -- confirm against Mercurial's util).

    Raises OSError when the cache directory cannot be created for any reason
    other than it already existing.
    """
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    try:
        os.makedirs(cdir)
    except OSError as exc:
        # An already existing directory is the only failure we expect; anything
        # else (permissions, read-only filesystem, ...) must not be silently
        # swallowed.
        if exc.errno != errno.EEXIST:
            raise
    with util.atomictempfile(bundlepath) as cachefile:
        for chunk in stream:
            cachefile.write(chunk)
            yield chunk
# Template of the cache file names. makeonecgpart fills it, in order, with the
# changegroup version, the hex node of the range head, the range skip and the
# range length (both zero-padded to 10 digits).
BUNDLEMASK = "%s-%s-%010iskip-%010isize.hg"
def makeonecgpart(newpart, repo, rangeid, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
    """Add one changegroup part, served from the cache when possible.

    When ``rangeid`` is None the cache is bypassed and a fresh changegroup is
    generated. Otherwise the cache is searched for a bundle named after the
    changegroup version and the range (head node, skip, length): a hit is
    streamed as is, a miss generates the changegroup and stores it in the
    cache while it is being streamed.

    Returns the result of ``_makepartfromstream``.
    """
    bundlename = None
    cachedata = None
    if rangeid is not None:
        nbchanges = repo.stablerange.rangelength(repo, rangeid)
        headnode = nodemod.hex(repo.changelog.node(rangeid[0]))
        # XXX do we need to use cgversion in there?
        bundlename = BUNDLEMASK % (version, headnode, rangeid[1], nbchanges)
        cachedata = getcache(repo, bundlename)

    if cachedata is not None:
        # cache hit: reuse the stored stream
        if repo.ui.verbose or source == 'bundle':
            repo.ui.status(_("%d changesets found in caches\n") % nbchanges)
        pversion = version if cgversions else None
        partdata = (cachedata, nbchanges, pversion)
    else:
        partdata = _makenewstream(newpart, repo, outgoing, version, source,
                                  bundlecaps, filematcher, cgversions)
        if bundlename is not None:
            # cache miss: store the stream while it is being sent
            tee = cachewriter(repo, bundlename, partdata[0])
            partdata = (tee,) + partdata[1:]
    return _makepartfromstream(newpart, repo, *partdata)