hgext3rd/pullbundle.py
author      Pierre-Yves David <pierre-yves.david@octobus.net>
date        Sun, 23 Sep 2018 00:08:02 +0200
changeset   4130:a1f6b8211016
parent      4129:bc4e62a1cb82
child       4132:afc933d32085
permissions -rw-r--r--

pullbundle: add caching logic

We now only generate a bundle once (and store it to disk). If we need it
again, we use it directly from disk.

# Extension to provide automatic caching of bundle server for pull
#
# Copyright 2018 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

import errno
import os

from mercurial import (
    changegroup,
    discovery,
    exchange,
    narrowspec,
    node as nodemod,
    util,
)

from mercurial.i18n import _

# generic wrapping

def uisetup(ui):
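    """install the pullbundle-aware changegroup part generator

    The stock 'changegroup' entry of exchange.getbundle2partsmapping is
    replaced so that served bundles can be sliced and cached on disk."""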
    exchange.getbundle2partsmapping['changegroup'] = _getbundlechangegrouppart

def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    if not kwargs.get(r'cg', True):
        return

    version = '01'
    cgversions = b2caps.get('changegroup')
    if cgversions:  # 3.1 and 3.2 ship with an empty value
        cgversions = [v for v in cgversions
                      if v in changegroup.supportedoutgoingversions(repo)]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)

    outgoing = exchange._computeoutgoing(repo, heads, common)
    if not outgoing.missing:
        return

    if kwargs.get(r'narrow', False):
        include = sorted(filter(bool, kwargs.get(r'includepats', [])))
        exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
        filematcher = narrowspec.match(repo.root, include=include,
                                       exclude=exclude)
    else:
        filematcher = None

    # START OF ALTERED PART
    makeallcgpart(bundler.newpart, repo, outgoing, version, source, bundlecaps,
                  filematcher, cgversions)
    # END OF ALTERED PART

    if kwargs.get(r'narrow', False) and (include or exclude):
        narrowspecpart = bundler.newpart('narrow:spec')
        if include:
            narrowspecpart.addparam(
                'include', '\n'.join(include), mandatory=True)
        if exclude:
            narrowspecpart.addparam(
                'exclude', '\n'.join(exclude), mandatory=True)

def makeallcgpart(newpart, repo, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
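    """add changegroup parts for 'outgoing', one per stable-range slice

    A single, uncached part is emitted instead when a file matcher is in use
    or when the evolve extension (which provides repo.stablerange) is
    missing."""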

    pullbundle = not filematcher
    if pullbundle and not util.safehasattr(repo, 'stablerange'):
        repo.ui.warn('pullbundle: required extension "evolve" is missing, skipping pullbundle\n')
        pullbundle = False
    if not pullbundle:
        makeonecgpart(newpart, repo, None, outgoing, version, source, bundlecaps,
                      filematcher, cgversions)
    else:
        for sliceid, sliceout in sliceoutgoing(repo, outgoing):
            makeonecgpart(newpart, repo, sliceid, sliceout, version, source, bundlecaps,
                          filematcher, cgversions)

# stable range slicing

def sliceoutgoing(repo, outgoing):
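    """slice an outgoing set into a list of (rangeid, outgoing) pairs

    Each missing head is walked in stable order and the runs of missing
    revisions found this way are turned into canonical stable ranges."""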
    cl = repo.changelog
    rev = cl.nodemap.get
    node = cl.node
    revsort = repo.stablesort

    missingrevs = set(rev(n) for n in outgoing.missing)
    allslices = []
    missingheads = [rev(n) for n in outgoing.missingheads]
    for head in missingheads:
        localslices = []
        localmissing = set(repo.revs('%ld and ::%d', missingrevs, head))
        while localmissing:
            slicerevs = []
            for r in revsort.walkfrom(repo, head):
                if r not in missingrevs:
                    break
                slicerevs.append(r)
            slicenodes = [node(r) for r in slicerevs]
            localslices.extend(canonicalslices(repo, slicenodes))
            missingrevs.difference_update(slicerevs)
            localmissing.difference_update(slicerevs)
            if localmissing:
                head = max(localmissing)

        allslices.extend(localslices)
    return [(rangeid, outgoingfromnodes(repo, nodes))
            for rangeid, nodes in allslices]

def canonicalslices(repo, nodes):
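    """turn a stably sorted list of nodes (head first) into a list of
    (rangeid, nodes) slices matching canonical stable subranges"""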
    depth = repo.depthcache.get
    stablerange = repo.stablerange
    rangelength = lambda x: stablerange.rangelength(repo, x)
    headrev = repo.changelog.rev(nodes[0])
    nbrevs = len(nodes)
    headdepth = depth(headrev)
    skipped = headdepth - nbrevs
    rangeid = (headrev, skipped)

    subranges = canonicalsubranges(repo, stablerange, rangeid)
    idx = 0
    slices = []
    nodes.reverse()
    for rangeid in subranges:
        size = rangelength(rangeid)
        slices.append((rangeid, nodes[idx:idx + size]))
        idx += size
    return slices

def canonicalsubranges(repo, stablerange, rangeid):
    """slice a size of nodes into most reusable subranges

    We try to slice a range into a set of "largest" and "canonical" stable
    range.

    It might make sense to move this function as a 'stablerange' method.
    """
    headrev, skip = rangeid
    rangedepth = stablerange.depthrev(repo, rangeid[0])
    canonicals = []

    # 0. find the first power of 2 higher than this range depth
    cursor = 1
    while cursor <= rangedepth:
        cursor *= 2

    # 1. find the first cut
    precut = cut = 0
    while True:
        if skip <= cut:
            break
        if cut + cursor < rangedepth:
            precut = cut
            cut += cursor
        if cursor == 1:
            break
        cursor //= 2

    # 2. optimise the bottom part
    if skip != cut:
        tailranges = []
        tailsize = cut - skip
        assert 0 < tailsize, tailsize
        prerange = (headrev, precut)
        size = stablerange.rangelength(repo, prerange)
        sub = stablerange.subranges(repo, prerange)[:-1]
        while not poweroftwo(tailsize):
            for prerange in reversed(sub):
                if tailsize <= 0:
                    break

                assert stablerange.depthrev(repo, prerange[0]) != prerange[1], prerange
                tailrev, tailskip = prerange
                size = stablerange.rangelength(repo, (tailrev, tailskip))
                if tailsize < size:
                    tailskip += size - tailsize
                    size = tailsize
                tailranges.append((tailrev, tailskip))
                tailsize -= size
            else:
                # size of the last block
                tailsize = stablerange.rangelength(repo, tailranges[-1])
                if poweroftwo(tailsize):
                    continue  # the while condition is now false, exit the loop
                prerange = tailranges.pop()
                sub = stablerange.subranges(repo, prerange)

        tailranges.reverse()
        canonicals.extend(tailranges)

    # 3. take recursive subranges until we reach a power-of-two size
    current = (headrev, cut)
    while not poweroftwo(stablerange.rangelength(repo, current)):
        sub = stablerange.subranges(repo, current)
        canonicals.extend(sub[:-1])
        current = sub[-1]
    canonicals.append(current)

    return canonicals

def poweroftwo(num):
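    """return whether 'num' is a strictly positive power of two"""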
    return num and not num & (num - 1)

def outgoingfromnodes(repo, nodes):
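    """build a discovery.outgoing object using 'nodes' as both the missing
    roots and the missing heads"""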
    return discovery.outgoing(repo,
                              missingroots=nodes,
                              missingheads=nodes)

# changegroup part construction

def _changegroupinfo(repo, nodes, source):
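    """replacement for changegroup._changegroupinfo, installed temporarily by
    _makenewstream while the changegroup stream is generated"""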
    if repo.ui.verbose or source == 'bundle':
        repo.ui.status(_("%d changesets found\n") % len(nodes))

def _makenewstream(newpart, repo, outgoing, version, source,
                   bundlecaps, filematcher, cgversions):
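    """generate a fresh changegroup stream for 'outgoing'

    Returns a (cgstream, nbchanges, version) tuple ready to be passed to
    _makepartfromstream; version is None when the client did not advertise
    any changegroup version."""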
    old = changegroup._changegroupinfo
    try:
        changegroup._changegroupinfo = _changegroupinfo
        cgstream = changegroup.makestream(repo, outgoing, version, source,
                                          bundlecaps=bundlecaps,
                                          filematcher=filematcher)
    finally:
        changegroup._changegroupinfo = old

    nbchanges = len(outgoing.missing)
    pversion = None
    if cgversions:
        pversion = version
    return (cgstream, nbchanges, pversion)

def _makepartfromstream(newpart, repo, cgstream, nbchanges, version):
    # same as upstream code

    part = newpart('changegroup', data=cgstream)
    if version:
        part.addparam('version', version)

    part.addparam('nbchanges', '%d' % nbchanges,
                  mandatory=False)

    if 'treemanifest' in repo.requirements:
        part.addparam('treemanifest', '1')

# cache management

def cachedir(repo):
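    """return the directory used to store cached pull bundles"""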
    return repo.cachevfs.join('pullbundles')

def getcache(repo, bundlename):
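    """return a chunk iterator over the cached bundle 'bundlename',
    or None when there is no cached copy"""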
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    try:
        fd = open(bundlepath, 'rb')
        return util.filechunkiter(fd)
    except IOError as exc:
        if exc.errno != errno.ENOENT:
            raise
        return None

def cachewriter(repo, bundlename, stream):
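    """write 'stream' into the cache file 'bundlename' while re-yielding its
    chunks

    An atomic temporary file is used, so a partially written bundle never
    appears under its final name."""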
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    try:
        os.makedirs(cdir)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
    with util.atomictempfile(bundlepath) as cachefile:
        for chunk in stream:
            cachefile.write(chunk)
            yield chunk

BUNDLEMASK = "%s-%s-%010iskip-%010isize.hg"
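# fields: changegroup version, hex head node, number of skipped revisions,
# number of changesets,
# e.g. "01-<40 hex digits>-0000000000skip-0000000064size.hg"
# (illustrative values)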

def makeonecgpart(newpart, repo, rangeid, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
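    """add a changegroup part for a single slice, using the on-disk cache

    For a pull bundle slice (rangeid is not None), the part content is read
    from the cache when present; otherwise a new stream is generated and
    written back to the cache while it is being sent."""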
    bundlename = cachedata = None
    if rangeid is not None:
        nbchanges = repo.stablerange.rangelength(repo, rangeid)
        headnode = nodemod.hex(repo.changelog.node(rangeid[0]))
        # XXX do we need to use cgversion in there?
        bundlename = BUNDLEMASK % (version, headnode, rangeid[1], nbchanges)
        cachedata = getcache(repo, bundlename)
    if cachedata is None:
        partdata = _makenewstream(newpart, repo, outgoing, version, source,
                                  bundlecaps, filematcher, cgversions)
        if bundlename is not None:
            cgstream = cachewriter(repo, bundlename, partdata[0])
            partdata = (cgstream,) + partdata[1:]
    else:
        if repo.ui.verbose or source == 'bundle':
            repo.ui.status(_("%d changesets found in caches\n") % nbchanges)
        pversion = None
        if cgversions:
            pversion = version
        partdata = (cachedata, nbchanges, pversion)
    return _makepartfromstream(newpart, repo, *partdata)