hgext3rd/evolve/exchange.py
author Pierre-Yves David <pierre-yves.david@ens-lyon.org>
Tue, 07 Mar 2017 12:27:01 +0100
changeset 2047 ce39d0f9976d
parent 2044 d31ad31e456b
permissions -rw-r--r--
serveronly: give the sub extension a way to access the 'evolve' module. We keep it as clean as possible, but if the extension is specified with a direct path, we have to be a bit hacky. Being able to access the whole evolve extension from the 'serveronly' extension will lift multiple constraints on how we organise the code and will allow for cleaner and clearer code. We extract a minor function into a 'utility' module to have something to depend on.

#####################################################################
### Obsolescence marker exchange experimentation                  ###
#####################################################################

from __future__ import absolute_import

try:
    import StringIO as io
    StringIO = io.StringIO
except ImportError:
    import io
    StringIO = io.StringIO

import errno
import socket

from mercurial import (
    error,
    exchange,
    httppeer,
    localrepo,
    node,
    obsolete,
    util,
    wireproto,
)
from mercurial.i18n import _

from . import (
    exthelper,
    serveronly,
    utility,
)

# Helper used by the '@eh.*' decorators of this module to register
# function wrappers, extra attributes, setup hooks and commands.
eh = exthelper.exthelper()
# Short alias for the message helper living in the 'utility' module.
obsexcmsg = utility.obsexcmsg

def obsexcprg(ui, *args, **kwargs):
    """Report obsolescence marker exchange progress through 'ui.progress'.

    The progress topic is 'OBSEXC' when the
    'experimental.verbose-obsolescence-exchange' option is set, and
    'obsmarkers exchange' otherwise.  Every other argument is forwarded to
    'ui.progress' untouched.
    """
    verbose = ui.configbool('experimental', 'verbose-obsolescence-exchange',
                            False)
    if verbose:
        topic = 'OBSEXC'
    else:
        topic = 'obsmarkers exchange'
    ui.progress(topic, *args, **kwargs)

@eh.wrapfunction(exchange, '_pushdiscoveryobsmarkers')
def _pushdiscoveryobsmarkers(orig, pushop):
    """Select the obsolescence markers to push and store them on 'pushop'.

    Nothing happens unless marker exchange is enabled locally, the local
    obsstore is not empty and the remote lists the 'obsolete' pushkey
    namespace.

    Otherwise 'pushop.outobsmarkers' is set to the markers relevant to the
    selected nodes, or to an empty list when no node is left.  The nodes
    start as everything reachable from 'pushop.futureheads'; when the
    remote has the '_evoext_obshash_0' capability, a discovery round drops
    the part whose markers are already common with the remote.

    'orig' is the wrapped function; it is never called here.
    """
    if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
        and pushop.repo.obsstore
        and 'obsolete' in pushop.remote.listkeys('namespaces')):
        repo = pushop.repo
        obsexcmsg(repo.ui, "computing relevant nodes\n")
        revs = list(repo.revs('::%ln', pushop.futureheads))
        unfi = repo.unfiltered()
        cl = unfi.changelog
        # do not trust core yet
        # return orig(pushop)
        if pushop.remote.capable('_evoext_obshash_0'):
            obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
                               % len(revs))
            commonrevs = list(unfi.revs('::%ln', pushop.outgoing.commonheads))
            common = findcommonobsmarkers(pushop.ui, unfi, pushop.remote,
                                          commonrevs)
            # only keep what is not already covered by the common markers
            revs = list(unfi.revs('%ld - (::%ln)', revs, common))

        nodes = [cl.node(r) for r in revs]
        if nodes:
            obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n"
                               % len(nodes))
            pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
        else:
            # nothing to look up: do not query the obsstore with no node
            obsexcmsg(repo.ui, "markers already in sync\n")
            pushop.outobsmarkers = []

@eh.extsetup
def _installobsmarkersdiscovery(ui):
    """Replace the 'obsmarker' push discovery step with the one above.

    The step registered so far is handed to '_pushdiscoveryobsmarkers' as
    its 'orig' argument.
    """
    previous = exchange.pushdiscoverymapping['obsmarker']

    def evolvediscovery(pushop):
        _pushdiscoveryobsmarkers(previous, pushop)

    exchange.pushdiscoverymapping['obsmarker'] = evolvediscovery

### Set discovery START

from mercurial import dagutil
from mercurial import setdiscovery

@eh.addattr(localrepo.localpeer, 'evoext_obshash')
def local_obshash(peer, nodes):
    """local peer method: compute the hashes directly from the repository"""
    repo = peer._repo
    return serveronly._obshash(repo, nodes)

@eh.addattr(localrepo.localpeer, 'evoext_obshash1')
def local_obshash1(peer, nodes):
    """local peer method: same as 'evoext_obshash' but asking 'version=1'"""
    repo = peer._repo
    return serveronly._obshash(repo, nodes, version=1)

@eh.addattr(wireproto.wirepeer, 'evoext_obshash')
def peer_obshash(self, nodes):
    """wireprotocol peer method

    Send the encoded nodes to the remote "evoext_obshash" command and
    return the decoded answer.  An answer that cannot be decoded is
    reported through 'self._abort' as a ResponseError."""
    encoded = wireproto.encodelist(nodes)
    answer = self._call("evoext_obshash", nodes=encoded)
    try:
        hashes = wireproto.decodelist(answer)
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), answer))
    else:
        return hashes

@eh.addattr(wireproto.wirepeer, 'evoext_obshash1')
def peer_obshash1(self, nodes):
    """wireprotocol peer method

    Send the encoded nodes to the remote "evoext_obshash1" command and
    return the decoded answer.  An answer that cannot be decoded is
    reported through 'self._abort' as a ResponseError."""
    encoded = wireproto.encodelist(nodes)
    answer = self._call("evoext_obshash1", nodes=encoded)
    try:
        hashes = wireproto.decodelist(answer)
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), answer))
    else:
        return hashes

def findcommonobsmarkers(ui, local, remote, probeset,
                         initialsamplesize=100,
                         fullsamplesize=200):
    """Find the heads of the revisions whose marker hash matches the remote.

    Revisions of 'probeset' are sampled ('fullsamplesize' at most per
    query) and the hash stored locally for each of them is compared with
    the one the remote answers.  Matching revisions feed the common set
    through 'dag.ancestorset', the others feed the missing set through
    'dag.descendantset'.  Queries go on until no revision is undecided.

    The '_evoext_obshash_1' flavour of the hash is used when the remote
    has that capability, the older one otherwise.

    Returns the externalized heads of the common set, or a set holding
    only the null id when nothing is common.

    'initialsamplesize' is accepted but not used.
    """
    # adapted from changeset discovery
    topic = _("comparing with other")
    dag = dagutil.revlogdag(local.changelog)
    common = set()
    missing = set()
    undecided = set(probeset)
    totalnb = len(undecided)
    queries = 0
    ui.progress(topic, 0, total=totalnb)
    takesample = setdiscovery._takefullsample
    if remote.capable('_evoext_obshash_1'):
        getremotehash = remote.evoext_obshash1
        localhash = serveronly._obsrelsethashtreefm1(local)
    else:
        getremotehash = remote.evoext_obshash
        localhash = serveronly._obsrelsethashtreefm0(local)

    while undecided:
        ui.note(_("sampling from both directions\n"))
        if len(undecided) >= fullsamplesize:
            sample = takesample(dag, undecided, size=fullsamplesize)
        else:
            sample = set(undecided)

        queries += 1
        ui.progress(topic, totalnb - len(undecided), total=totalnb)
        ui.debug("query %i; still undecided: %i, sample size is: %i\n"
                 % (queries, len(undecided), len(sample)))
        # freeze the order: a position in 'sample' is also the position of
        # the matching entry in the remote answer
        sample = list(sample)
        remotehash = getremotehash(dag.externalizeall(sample))

        matching = set()
        differing = []
        for pos, rev in enumerate(sample):
            if localhash[rev][1] == remotehash[pos]:
                matching.add(rev)
            else:
                differing.append(rev)

        common.update(dag.ancestorset(matching, common))
        missing.update(dag.descendantset(differing, missing))

        undecided -= missing
        undecided -= common

    ui.progress(topic, None)
    heads = dag.headsetofconnecteds(common)
    ui.debug("%d total queries\n" % queries)

    if heads:
        return dag.externalizeall(heads)
    return set([node.nullid])


# Helper from the 'obsolete' module used by the pushkey based push below.
# It is looked up with getattr so a missing attribute gives None here
# instead of an AttributeError while the module is loading.
_pushkeyescape = getattr(obsolete, '_pushkeyescape', None)

class pushobsmarkerStringIO(StringIO):
    """hacky string io reporting progress every time it is read

    A 'ui' attribute has to be set on the instance before the first read;
    this class does not set it."""

    @util.propertycache
    def length(self):
        """size of the whole buffer content"""
        content = self.getvalue()
        return len(content)

    def read(self, size=None):
        """report the current position as progress, then read as usual"""
        obsexcprg(self.ui, self.tell(), unit=_("bytes"), total=self.length)
        data = StringIO.read(self, size)
        return data

    def __iter__(self):
        """yield what is left to read, 4096 at a time"""
        while True:
            chunk = self.read(4096)
            if not chunk:
                break
            yield chunk

@eh.wrapfunction(exchange, '_pushobsolete')
def _pushobsolete(orig, pushop):
    """utility function to push obsolete markers to a remote

    The step is skipped when 'obsmarkers' is already recorded in
    'pushop.stepsdone' or when 'pushop.cgresult' is 0.  Markers are taken
    from 'pushop.outobsmarkers' and only sent when marker exchange is
    enabled, the local obsstore is not empty and the remote lists the
    'obsolete' namespace.

    Remotes with the '_evoext_pushobsmarkers_0' capability get all the
    markers as a single stream; the others get them through pushkey.

    'orig' is the wrapped function; it is never called here."""
    done = getattr(pushop, 'stepsdone', None)
    if done is not None:
        if 'obsmarkers' in done:
            return
        done.add('obsmarkers')
    if pushop.cgresult == 0:
        return
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    exchanging = (obsolete.isenabled(repo, obsolete.exchangeopt)
                  and repo.obsstore
                  and 'obsolete' in remote.listkeys('namespaces'))
    if not exchanging:
        return

    ui = repo.ui
    markers = pushop.outobsmarkers
    if not markers:
        obsexcmsg(ui, "no marker to push\n")
    elif remote.capable('_evoext_pushobsmarkers_0'):
        # dedicated command: everything goes in one stream
        stream = pushobsmarkerStringIO()
        for chunk in obsolete.encodemarkers(markers, True):
            stream.write(chunk)
        stream.seek(0)
        stream.ui = ui
        obsexcmsg(ui, "pushing %i obsolescence markers (%i bytes)\n"
                      % (len(markers), len(stream.getvalue())),
                  True)
        remote.evoext_pushobsmarkers_0(stream)
        obsexcprg(ui, None)
    else:
        # fallback: one pushkey call per payload
        payloads = _pushkeyescape(markers).items()
        totalbytes = sum(len(data) for key, data in payloads)
        obsexcmsg(ui, "pushing %i obsolescence markers in %i "
                      "pushkey payload (%i bytes)\n"
                      % (len(markers), len(payloads), totalbytes),
                  True)
        results = []
        sent = 0
        for key, data in payloads:
            obsexcprg(ui, sent, item=key, unit=_("bytes"),
                      total=totalbytes)
            results.append(remote.pushkey('obsolete', key, '', data))
            sent += len(data)
            obsexcprg(ui, sent, item=key, unit=_("bytes"),
                      total=totalbytes)
        obsexcprg(ui, None)
        if not all(results):
            ui.warn(_('failed to push some obsolete markers!\n'))
    obsexcmsg(ui, "DONE\n")


@eh.addattr(wireproto.wirepeer, 'evoext_pushobsmarkers_0')
def client_pushobsmarkers(self, obsfile):
    """wireprotocol peer method

    Requires the '_evoext_pushobsmarkers_0' capability, pushes 'obsfile'
    with '_callpush', relays the remote output line by line to
    'ui.status' and returns the first value '_callpush' gave back."""
    purpose = _('push obsolete markers faster')
    self.requirecap('_evoext_pushobsmarkers_0', purpose)
    ret, output = self._callpush('evoext_pushobsmarkers_0', obsfile)
    for line in output.splitlines(True):
        self.ui.status(_('remote: '), line)
    return ret

@eh.addattr(httppeer.httppeer, 'evoext_pushobsmarkers_0')
def httpclient_pushobsmarkers(self, obsfile):
    """httpprotocol peer method
    (Cannot simply use _callpush as http is doing some special handling)

    The answer is split on its first newline: what comes before is
    returned, what comes after is relayed to 'ui.status' line by line
    (blank lines skipped).  An answer without newline raises a
    ResponseError; socket errors are turned into Abort."""
    purpose = _('push obsolete markers faster')
    self.requirecap('_evoext_pushobsmarkers_0', purpose)
    try:
        response = self._call('evoext_pushobsmarkers_0', data=obsfile)
        parts = response.split('\n', 1)
        if len(parts) < 2:
            raise error.ResponseError(_("unexpected response:"), response)
        status, output = parts
        for line in output.splitlines(True):
            if line.strip():
                self.ui.status(_('remote: '), line)
        return status
    except socket.error as err:
        if err.args[0] not in (errno.ECONNRESET, errno.EPIPE):
            raise error.Abort(err.args[1])
        raise error.Abort(_('push failed: %s') % err.args[1])

@eh.wrapfunction(localrepo.localrepository, '_restrictcapabilities')
def local_pushobsmarker_capabilities(orig, repo, caps):
    """add '_evoext_pushobsmarkers_0' to the capabilities 'orig' returns"""
    restricted = orig(repo, caps)
    restricted.add('_evoext_pushobsmarkers_0')
    return restricted

@eh.addattr(localrepo.localpeer, 'evoext_pushobsmarkers_0')
def local_pushobsmarkers(peer, obsfile):
    """local peer method: read 'obsfile' fully and hand it to the repo"""
    payload = obsfile.read()
    target = peer._repo
    serveronly._pushobsmarkers(target, payload)

def _buildpullobsmarkersboundaries(pullop):
    """small function returning the arguments for the pull markers call

    The result is a dict with the 'heads' and 'common' keys.  'heads' is
    'pullop.pulledsubset'.  'common' is the null id alone, unless the
    remote has the '_evoext_obshash_0' capability, in which case it is
    the outcome of the obsmarkers discovery run on the ancestors of
    'pullop.common'.

    It is a separate function to play around with the strategy for that."""
    repo = pullop.repo
    remote = pullop.remote
    unfi = repo.unfiltered()
    revs = unfi.revs('::(%ln - null)', pullop.common)
    common = [node.nullid]
    if remote.capable('_evoext_obshash_0'):
        obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
                           % len(revs))
        # 'revs' was computed on the unfiltered repository, so discovery
        # has to run on the unfiltered repository too (as the push side
        # does); the filtered one may not know some of these revisions.
        common = findcommonobsmarkers(repo.ui, unfi, remote, revs)
    return {'heads': pullop.pulledsubset, 'common': common}

@eh.uisetup
def addgetbundleargs(self):
    """declare 'evo_obscommon' as a 'nodes' entry of 'wireproto.gboptsmap'"""
    gbopts = wireproto.gboptsmap
    gbopts['evo_obscommon'] = 'nodes'

@eh.wrapfunction(exchange, '_pullbundle2extraprepare')
def _addobscommontob2pull(orig, pullop, kwargs):
    """add the common obsmarkers boundary to the bundle2 pull arguments

    'orig' always runs first and its result is always returned.  When
    'obsmarkers' is part of 'kwargs' and the remote has the
    '_evoext_getbundle_obscommon' capability, 'evo_obscommon' is added to
    'kwargs' unless the boundary is just the null id."""
    result = orig(pullop, kwargs)
    if 'obsmarkers' not in kwargs:
        return result
    if not pullop.remote.capable('_evoext_getbundle_obscommon'):
        return result
    common = _buildpullobsmarkersboundaries(pullop)['common']
    if common != [node.nullid]:
        kwargs['evo_obscommon'] = common
    return result

@eh.wrapfunction(exchange, '_pullobsolete')
def _pullobsolete(orig, pullop):
    """pull obsolescence markers with the dedicated wire command

    Returns None without doing anything when marker exchange is disabled,
    when 'obsmarkers' is not in 'pullop.todosteps' or is already in
    'pullop.stepsdone', when the remote does not list the 'obsolete'
    namespace or when the boundaries leave nothing to pull.  Remotes
    without the '_evoext_pullobsmarkers_0' capability are handled by
    'orig'.

    Otherwise the markers are fetched and, if the payload is longer than
    5, merged in the local obsstore; the transaction used for that is
    returned (None when nothing was merged)."""
    if not obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        return None
    if ('obsmarkers' not in getattr(pullop, 'todosteps', ['obsmarkers'])
        or 'obsmarkers' in getattr(pullop, 'stepsdone', [])):
        return None
    remote = pullop.remote
    if not remote.capable('_evoext_pullobsmarkers_0'):
        return orig(pullop)
    if 'obsolete' not in remote.listkeys('namespaces'):
        return None # remote opted out of obsolescence marker exchange
    ui = pullop.repo.ui
    boundaries = _buildpullobsmarkersboundaries(pullop)
    if not set(boundaries['heads']) - set(boundaries['common']):
        obsexcmsg(ui, "nothing to pull\n")
        return None

    obsexcmsg(ui, "pull obsolescence markers\n", True)
    obsdata = remote.evoext_pullobsmarkers_0(**boundaries).read()
    if len(obsdata) <= 5:
        obsexcmsg(ui, "no unknown remote markers\n")
        obsexcmsg(ui, "DONE\n")
        return None

    obsexcmsg(ui, "merging obsolescence markers (%i bytes)\n" % len(obsdata))
    tr = pullop.gettransaction()
    before = len(pullop.repo.obsstore._all)
    pullop.repo.obsstore.mergemarkers(tr, obsdata)
    added = len(pullop.repo.obsstore._all) - before
    obsexcmsg(ui, "%i obsolescence markers added\n" % added, True)
    obsexcmsg(ui, "DONE\n")
    if added:
        pullop.repo.invalidatevolatilesets()
    return tr

@eh.addattr(wireproto.wirepeer, 'evoext_pullobsmarkers_0')
def client_pullobsmarkers(self, heads=None, common=None):
    """wireprotocol peer method

    Requires the '_evoext_pullobsmarkers_0' capability.  'heads' and
    'common' are sent encoded, each only when it is not None.  The first
    20 characters of the answer are parsed as the payload length; the
    payload is then read by pieces of at most 4096, reporting progress,
    and returned as a StringIO rewound to its start."""
    self.requirecap('_evoext_pullobsmarkers_0', _('look up remote obsmarkers'))
    opts = {}
    for key, nodes in (('heads', heads), ('common', common)):
        if nodes is not None:
            opts[key] = wireproto.encodelist(nodes)
    stream = self._callcompressable("evoext_pullobsmarkers_0", **opts)
    length = int(stream.read(20))
    chunksize = 4096
    data = StringIO()
    ui = self.ui
    obsexcprg(ui, 0, unit=_("bytes"), total=length)
    remaining = length
    while remaining > 0:
        readsize = min(remaining, chunksize)
        data.write(stream.read(readsize))
        remaining -= readsize
        obsexcprg(ui, length - remaining, unit=_("bytes"), total=length)
    obsexcprg(ui, None)
    data.seek(0)
    return data

@eh.addattr(localrepo.localpeer, 'evoext_pullobsmarkers_0')
def local_pullobsmarkers(self, heads=None, common=None):
    """local peer method: get the markers stream straight from the repo"""
    repo = self._repo
    return serveronly._getobsmarkersstream(repo, heads=heads, common=common)

@eh.command(
    'debugobsrelsethashtree',
    [('', 'v0', None, 'hash on marker format "0"'),
     ('', 'v1', None, 'hash on marker format "1" (default)')], _(''))
def debugobsrelsethashtree(ui, repo, v0=False, v1=False):
    """display Obsolete markers, Relevant Set, Hash Tree
    changeset-node obsrelsethashtree-node

    It is computed from the "orsht" of its parent and the markers
    relevant to the changeset itself."""
    if v0 and v1:
        # the two formats are mutually exclusive
        raise error.Abort('can only specify one format')
    elif v0:
        treefunc = serveronly._obsrelsethashtreefm0
    else:
        # format "1" is the default, with or without --v1
        treefunc = serveronly._obsrelsethashtreefm1

    # one line per entry: changeset node, then its hash, both in hex
    for chg, obs in treefunc(repo):
        ui.status('%s %s\n' % (node.hex(chg), node.hex(obs)))

# Highest key of 'obsolete.formats'
# (presumably the most recent marker format version -- TODO confirm).
_bestformat = max(obsolete.formats.keys())