hgext3rd/pullbundle.py
author Pierre-Yves David <pierre-yves.david@octobus.net>
Tue, 25 Sep 2018 12:19:41 +0200
changeset 4137 21a3c051ca6c
parent 4136 be3a94d3105f
child 4138 cfdc6f55599b
permissions -rw-r--r--
stablerange: fix slicing of arbitrary ranges

Pull bundle triggers slicing from ranges with an arbitrary initial skip. We
have to adjust the current slicing to take this into account.

# Extension to provide automatic caching of bundle server for pull
#
# Copyright 2018 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""pullbundle: automatic server side bundle caching

General principle
=================

This extension provides a means for servers to use pre-computed bundles to
serve arbitrary pulls. If missing, the necessary pre-computed bundles are
generated on demand.

To maximize reuse of existing cached bundles, each pull is served through
multiple bundles. The bundles are created using "standard ranges" from the
"stablerange" principle. The "stablerange" concept is already used for
obsmarkers discovery in the evolve extension.
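
For instance (purely illustrative), a pull missing 1100 changesets could be
served as a few bundles covering 1024, 64, 8 and 4 changesets each. The
larger, power-of-two sized pieces are the ones most likely to be reused by
other pulls.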

Using pull Bundle
=================

Configuration is only required on the server side.

The "stablerange" code currently still live in the evolve extensions, so for
now enabling that extensions is required:

You need at minimum the following configuration:

    [extensions]
    evolve=yes
    pullbundle=yes
    [experimental]
    obshashrange.warm-cache = yes

If you do not want to use evolution on the server side, you should disable obsmarkers exchange:

    [experimental]
    evolution.exchange=no

Extra Configuration
===================

  [pullbundle]
  # By default bundles are stored in `.hg/cache/pullbundles/`.
  # This can be changed with the following config:
  cache-directory=/absolute/path

Implementation status
=====================

Both stablerange and pullbundle use "simple" initial implementations. These
implementations focus on testing the algorithms and proving that the features
work. Yet they are already useful and used in production.

Performance is expected to improve greatly in the final implementation,
especially if some of it ends up being compiled.

This first implementation lacks the ability to serve the cached bundles from a
CDN. We want this limitation to be lifted quickly.

Why does this live in the same repository as evolve
====================================================

There is no fundamental reason for this to live in the same repository.
However, the stablerange data structure lives in evolve, so it was simpler to
put this new extension next to it. As soon as stablerange has been upstreamed,
we won't need the dependency on the evolve extension anymore.
"""
import errno
import os

from mercurial import (
    changegroup,
    discovery,
    exchange,
    narrowspec,
    node as nodemod,
    registrar,
    util,
)

from mercurial.i18n import _

__version__ = '0.1.0.dev'
testedwith = '4.7.1'
# minimumhgversion = ''
buglink = 'https://bz.mercurial-scm.org/'

configtable = {}
configitem = registrar.configitem(configtable)

configitem('pullbundle', 'cache-directory',
           default=None,
)

# generic wrapping

def uisetup(ui):
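    # Replace the standard 'changegroup' part generator with our own so that
    # pulls can be served from (and fill) the pull bundle cache.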
    exchange.getbundle2partsmapping['changegroup'] = _getbundlechangegrouppart

def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    if not kwargs.get(r'cg', True):
        return

    version = '01'
    cgversions = b2caps.get('changegroup')
    if cgversions:  # 3.1 and 3.2 ship with an empty value
        cgversions = [v for v in cgversions
                      if v in changegroup.supportedoutgoingversions(repo)]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)

    outgoing = exchange._computeoutgoing(repo, heads, common)
    if not outgoing.missing:
        return

    if kwargs.get(r'narrow', False):
        include = sorted(filter(bool, kwargs.get(r'includepats', [])))
        exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
        filematcher = narrowspec.match(repo.root, include=include,
                                       exclude=exclude)
    else:
        filematcher = None

    # START OF ALTERED PART
    makeallcgpart(bundler.newpart, repo, outgoing, version, source, bundlecaps,
                  filematcher, cgversions)
    # END OF ALTERED PART

    if kwargs.get(r'narrow', False) and (include or exclude):
        narrowspecpart = bundler.newpart('narrow:spec')
        if include:
            narrowspecpart.addparam(
                'include', '\n'.join(include), mandatory=True)
        if exclude:
            narrowspecpart.addparam(
                'exclude', '\n'.join(exclude), mandatory=True)

def makeallcgpart(newpart, repo, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
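    """emit the changegroup part(s) for 'outgoing'

    One part is emitted per stable-range slice when pull bundle caching is
    possible; otherwise a single, uncached part is emitted.
    """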

    pullbundle = not filematcher
    if pullbundle and not util.safehasattr(repo, 'stablerange'):
        repo.ui.warn('pullbundle: required extension "evolve" is missing, skipping pullbundle\n')
        pullbundle = False
    if not pullbundle:
        makeonecgpart(newpart, repo, None, outgoing, version, source, bundlecaps,
                      filematcher, cgversions)
    else:
        start = util.timer()
        slices = sliceoutgoing(repo, outgoing)
        end = util.timer()
        msg = _('pullbundle-cache: "missing" set sliced into %d subranges '
                'in %s seconds\n')
        repo.ui.write(msg % (len(slices), end - start))
        for sliceid, sliceout in slices:
            makeonecgpart(newpart, repo, sliceid, sliceout, version, source, bundlecaps,
                          filematcher, cgversions)

# stable range slicing

def sliceoutgoing(repo, outgoing):
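    """slice the outgoing missing set into a list of (rangeid, outgoing) pairs

    Each missing head is walked in stable order and the visited revisions are
    cut into canonical stable ranges, so every resulting subrange maps to a
    bundle that can be cached and reused.
    """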
    cl = repo.changelog
    rev = cl.nodemap.get
    node = cl.node
    revsort = repo.stablesort

    missingrevs = set(rev(n) for n in outgoing.missing)
    allslices = []
    missingheads = [rev(n) for n in outgoing.missingheads]
    for head in missingheads:
        localslices = []
        localmissing = set(repo.revs('%ld and ::%d', missingrevs, head))
        while localmissing:
            slicerevs = []
            for r in revsort.walkfrom(repo, head):
                if r not in missingrevs:
                    break
                slicerevs.append(r)
            slicenodes = [node(r) for r in slicerevs]
            localslices.extend(canonicalslices(repo, slicenodes))
            missingrevs.difference_update(slicerevs)
            localmissing.difference_update(slicerevs)
            if localmissing:
                head = max(localmissing)

        allslices.extend(localslices)
    # unknown subranges might have had to be computed
    repo.stablerange.save(repo)
    return [(rangeid, outgoingfromnodes(repo, nodes))
            for rangeid, nodes in allslices]

def canonicalslices(repo, nodes):
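    """split a stable-ordered list of nodes (head first) into canonical slices

    Returns a list of (rangeid, nodes) pairs, one per canonical stable
    subrange covering the input.
    """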
    depth = repo.depthcache.get
    stablerange = repo.stablerange
    rangelength = lambda x: stablerange.rangelength(repo, x)
    headrev = repo.changelog.rev(nodes[0])
    nbrevs = len(nodes)
    headdepth = depth(headrev)
    skipped = headdepth - nbrevs
    rangeid = (headrev, skipped)

    subranges = canonicalsubranges(repo, stablerange, rangeid)
    idx = 0
    slices = []
    nodes.reverse()
    for rangeid in subranges:
        size = rangelength(rangeid)
        slices.append((rangeid, nodes[idx:idx + size]))
        idx += size
    return slices

def canonicalsubranges(repo, stablerange, rangeid):
    """slice a size of nodes into most reusable subranges

    We try to slice a range into a set of "largest" and "canonical" stable
    range.

    It might make sense to move this function as a 'stablerange' method.
    """
    headrev, skip = rangeid
    rangedepth = stablerange.depthrev(repo, rangeid[0])
    canonicals = []

    # 0. find the first power of 2 higher than this range depth
    cursor = 1
    while cursor <= rangedepth:
        cursor *= 2

    # 1. find the first cut
    precut = cut = 0
    while True:
        if skip <= cut:
            break
        if cut + cursor < rangedepth:
            precut = cut
            cut += cursor
        if cursor == 1:
            break
        cursor //= 2

    # 2. optimise, bottom part
    if skip != cut:
        tailranges = []
        tailsize = cut - skip
        assert 0 < tailsize, tailsize
        prerange = (headrev, precut)
        size = stablerange.rangelength(repo, prerange)
        sub = stablerange.subranges(repo, prerange)[:-1]
        # This power-of-two check is too simplistic and misbehaves when too
        # many merges are involved, because with merges there can be
        # "canonical" ranges of various sizes.
        while not poweroftwo(tailsize):
            for prerange in reversed(sub):
                if tailsize <= 0:
                    break

                assert stablerange.depthrev(repo, prerange[0]) != prerange[1], prerange
                tailrev, tailskip = prerange
                size = stablerange.rangelength(repo, (tailrev, tailskip))
                if tailsize < size:
                    tailskip += size - tailsize
                    size = tailsize
                tailranges.append((tailrev, tailskip))
                tailsize -= size
            else:
                # size of the last block
                tailsize = stablerange.rangelength(repo, tailranges[-1])
                if poweroftwo(tailsize):
                    continue # the while condition is now False, exiting the loop
                prerange = tailranges.pop()
                sub = stablerange.subranges(repo, prerange)

        tailranges.reverse()
        canonicals.extend(tailranges)

    # 3. take recursive subranges until we reach a power-of-two size
    current = (headrev, cut)
    while not poweroftwo(stablerange.rangelength(repo, current)):
        sub = stablerange.subranges(repo, current)
        canonicals.extend(sub[:-1])
        current = sub[-1]
    canonicals.append(current)

    return canonicals

def poweroftwo(num):
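    """return True if 'num' is a strictly positive power of two

    A power of two has a single bit set, so 'num & (num - 1)' is zero exactly
    in that case (e.g. 8 -> True, 12 -> False, 0 -> falsy).
    """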
    return num and not num & (num - 1)

def outgoingfromnodes(repo, nodes):
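    """build an outgoing object covering 'nodes'

    The same nodes are used as both the roots and the heads of the missing
    set.
    """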
    return discovery.outgoing(repo,
                              missingroots=nodes,
                              missingheads=nodes)

# changegroup part construction

def _changegroupinfo(repo, nodes, source):
    if repo.ui.verbose or source == 'bundle':
        repo.ui.status(_("%d changesets found\n") % len(nodes))

def _makenewstream(newpart, repo, outgoing, version, source,
                   bundlecaps, filematcher, cgversions):
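    # Temporarily swap in our own _changegroupinfo so that the per-slice
    # "N changesets found" message only shows up in verbose mode.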
    old = changegroup._changegroupinfo
    try:
        changegroup._changegroupinfo = _changegroupinfo
        cgstream = changegroup.makestream(repo, outgoing, version, source,
                                          bundlecaps=bundlecaps,
                                          filematcher=filematcher)
    finally:
        changegroup._changegroupinfo = old

    nbchanges = len(outgoing.missing)
    pversion = None
    if cgversions:
        pversion = version
    return (cgstream, nbchanges, pversion)

def _makepartfromstream(newpart, repo, cgstream, nbchanges, version):
    # same as upstream code

    part = newpart('changegroup', data=cgstream)
    if version:
        part.addparam('version', version)

    part.addparam('nbchanges', '%d' % nbchanges,
                  mandatory=False)

    if 'treemanifest' in repo.requirements:
        part.addparam('treemanifest', '1')

# cache management

def cachedir(repo):
    cachedir = repo.ui.config('pullbundle', 'cache-directory')
    if cachedir is not None:
        return cachedir
    return repo.cachevfs.join('pullbundles')

def getcache(repo, bundlename):
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    try:
        fd = open(bundlepath, 'rb')
        return util.filechunkiter(fd)
    except IOError as exc:
        if exc.errno != errno.ENOENT:
            raise
        return None

def cachewriter(repo, bundlename, stream):
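    """write 'stream' to the cache file while passing its chunks through"""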
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    try:
        os.makedirs(cdir)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
    with util.atomictempfile(bundlepath) as cachefile:
        for chunk in stream:
            cachefile.write(chunk)
            yield chunk

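# cached bundle file name: <version>-<head node>-<skip>skip-<size>size.hg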
BUNDLEMASK = "%s-%s-%010iskip-%010isize.hg"

def makeonecgpart(newpart, repo, rangeid, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
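    """add one changegroup part, reusing a cached bundle when possible

    When 'rangeid' is not None the corresponding bundle is looked up in the
    cache; on a miss it is generated and written to the cache on the fly.
    """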
    bundlename = cachedata = None
    if rangeid is not None:
        nbchanges = repo.stablerange.rangelength(repo, rangeid)
        headnode = nodemod.hex(repo.changelog.node(rangeid[0]))
        # XXX do we need to use cgversion in there?
        bundlename = BUNDLEMASK % (version, headnode, rangeid[1], nbchanges)
        cachedata = getcache(repo, bundlename)
    if cachedata is None:
        partdata = _makenewstream(newpart, repo, outgoing, version, source,
                                  bundlecaps, filematcher, cgversions)
        if bundlename is not None:
            cgstream = cachewriter(repo, bundlename, partdata[0])
            partdata = (cgstream,) + partdata[1:]
    else:
        if repo.ui.verbose or source == 'bundle':
            repo.ui.status(_("%d changesets found in caches\n") % nbchanges)
        pversion = None
        if cgversions:
            pversion = version
        partdata = (cachedata, nbchanges, pversion)
    return _makepartfromstream(newpart, repo, *partdata)