stablerange: warm cache before using it in findrangemissing
We make sure the cache is fully up to date before starting to use it. Updating
all values is more efficient and this gives us a single point where we update and
write to disk. Hopefully the cache has been kept up to date as we go anyway.
# Code dedicated to the discovery of obsolescence marker "over the wire"
#
# Copyright 2017 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
# Status: Experiment in progress // open question
#
# The final discovery algorithm and protocol will go into core when we'll be
# happy with it.
#
# Some of the code in this module is for compatibility with older versions
# of evolve and will be eventually dropped.
from __future__ import absolute_import
# py2/py3 compatibility: expose io.StringIO under a single name
# (py2 provides it as the StringIO module, py3 as io.StringIO)
try:
    import StringIO as io
    StringIO = io.StringIO
except ImportError:
    import io
    StringIO = io.StringIO
import hashlib
import heapq
import struct
from mercurial import (
bundle2,
dagutil,
error,
exchange,
extensions,
localrepo,
node,
obsolete,
scmutil,
setdiscovery,
util,
wireproto,
)
from mercurial.hgweb import hgweb_mod
from mercurial.i18n import _
from . import (
exthelper,
utility,
stablerange,
)
# short aliases for the struct helpers used by the wire-protocol parts
_pack = struct.pack
_unpack = struct.unpack

# extension helper collecting all setup/wrapping done in this module;
# merge stablerange's own helper so its setup runs too
eh = exthelper.exthelper()
eh.merge(stablerange.eh)

obsexcmsg = utility.obsexcmsg
##########################################
### trigger discovery during exchange ###
##########################################
@eh.wrapfunction(exchange, '_pushdiscoveryobsmarkers')
def _pushdiscoveryobsmarkers(orig, pushop):
    """find which obsolescence markers the push needs to send

    Uses the obshashrange discovery when both sides support it, falls back
    to the older obshash-tree discovery, and finally to sending the markers
    relevant to every pushed node when the remote supports neither.
    Stores the result in ``pushop.outobsmarkers``.
    """
    if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
            and pushop.repo.obsstore
            and 'obsolete' in pushop.remote.listkeys('namespaces')):
        repo = pushop.repo
        obsexcmsg(repo.ui, "computing relevant nodes\n")
        revs = list(repo.revs('::%ln', pushop.futureheads))
        unfi = repo.unfiltered()
        cl = unfi.changelog
        if not pushop.remote.capable('_evoext_obshash_0'):
            # do not trust core yet
            # return orig(pushop)
            nodes = [cl.node(r) for r in revs]
            if nodes:
                obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n"
                                   % len(nodes))
                pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
            else:
                obsexcmsg(repo.ui, "markers already in sync\n")
                # fix: a stray duplicated relevantmarkers(nodes) call here
                # used to clobber this empty result
                pushop.outobsmarkers = []
            return

        common = []
        missing = None
        obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
                           % len(revs))
        commonrevs = list(unfi.revs('::%ln', pushop.outgoing.commonheads))
        if _canobshashrange(repo, pushop.remote):
            # new range-based discovery: directly yields the missing nodes
            missing = findmissingrange(pushop.ui, unfi, pushop.remote,
                                       commonrevs)
        else:
            # legacy discovery: yields the common nodes instead
            common = findcommonobsmarkers(pushop.ui, unfi, pushop.remote,
                                          commonrevs)
        if missing is None:
            revs = list(unfi.revs('%ld - (::%ln)', revs, common))
            nodes = [cl.node(r) for r in revs]
        else:
            revs = list(repo.revs('only(%ln, %ln)', pushop.futureheads,
                                  pushop.outgoing.commonheads))
            nodes = [cl.node(r) for r in revs]
            nodes += missing

        if nodes:
            obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n"
                               % len(nodes))
            pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
        else:
            obsexcmsg(repo.ui, "markers already in sync\n")
            pushop.outobsmarkers = []
@eh.extsetup
def _installobsmarkersdiscovery(ui):
    """replace the core obsmarker push-discovery step with the evolve one"""
    previous = exchange.pushdiscoverymapping['obsmarker']

    def discover(pushop):
        _pushdiscoveryobsmarkers(previous, pushop)

    exchange.pushdiscoverymapping['obsmarker'] = discover
def buildpullobsmarkersboundaries(pullop, bundle2=True):
    """compute the arguments for the "pull markers" call

    The returned dict may contain 'heads', 'common' and 'missing' entries;
    keys whose value would be None are simply omitted. Kept as a separate
    function to make it easy to experiment with different strategies.
    """
    repo = pullop.repo
    remote = pullop.remote
    unfi = repo.unfiltered()
    revs = unfi.revs('::(%ln - null)', pullop.common)
    boundaries = {'heads': pullop.pulledsubset}
    if not revs: # nothing common
        boundaries['common'] = [node.nullid]
    elif bundle2 and _canobshashrange(repo, remote):
        # range-based discovery computes the missing set directly
        obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
                           % len(revs))
        boundaries['missing'] = findmissingrange(repo.ui, repo, pullop.remote,
                                                 revs)
    elif remote.capable('_evoext_obshash_0'):
        # legacy discovery computes the common set instead
        obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
                           % len(revs))
        boundaries['common'] = findcommonobsmarkers(repo.ui, repo, remote,
                                                    revs)
    else:
        boundaries['common'] = [node.nullid]
    return boundaries
##################################
### Code performing discovery ###
##################################
def _canobshashrange(local, remote):
return (local.ui.configbool('experimental', 'obshashrange', False)
and remote.capable('_donotusemeever_evoext_obshashrange_1'))
def _obshashrange_capabilities(orig, repo, proto):
    """wrapper advertising the obshashrange capability when enabled"""
    caps = orig(repo, proto)
    enabled = repo.ui.configbool('experimental', 'obshashrange', False)
    if enabled and obsolete.isenabled(repo, obsolete.exchangeopt):
        allcaps = caps.split()
        allcaps.append('_donotusemeever_evoext_obshashrange_1')
        allcaps.sort()
        caps = ' '.join(allcaps)
    return caps
@eh.extsetup
def obshashrange_extsetup(ui):
    """hook the obshashrange capability into the wire protocol"""
    extensions.wrapfunction(wireproto, 'capabilities',
                            _obshashrange_capabilities)
    # wrap the registered command too: core copies the function at
    # registration time, so wrapping the module attribute is not enough
    origcmd, args = wireproto.commands['capabilities']

    def capabilities(repo, proto):
        return _obshashrange_capabilities(origcmd, repo, proto)

    wireproto.commands['capabilities'] = (capabilities, args)
def findcommonobsmarkers(ui, local, remote, probeset,
                         initialsamplesize=100,
                         fullsamplesize=200):
    """legacy obshash-tree discovery of the common marker set

    Samples revisions from ``probeset`` and compares the local and remote
    obshash values until every revision is classified as common or missing.
    Returns the externalized heads of the common set, or {nullid} when
    nothing is common. Modeled on core's setdiscovery algorithm.
    (``initialsamplesize`` is currently unused; kept for signature symmetry
    with findmissingrange.)
    """
    # from discovery
    roundtrips = 0
    cl = local.changelog
    dag = dagutil.revlogdag(cl)
    missing = set()
    common = set()
    undecided = set(probeset)
    totalnb = len(undecided)
    ui.progress(_("comparing with other"), 0, total=totalnb)
    _takefullsample = setdiscovery._takefullsample
    # pick hashing scheme matching the best marker format the remote knows
    if remote.capable('_evoext_obshash_1'):
        getremotehash = remote.evoext_obshash1
        localhash = _obsrelsethashtreefm1(local)
    else:
        getremotehash = remote.evoext_obshash
        localhash = _obsrelsethashtreefm0(local)

    while undecided:
        ui.note(_("sampling from both directions\n"))
        if len(undecided) < fullsamplesize:
            sample = set(undecided)
        else:
            sample = _takefullsample(dag, undecided, size=fullsamplesize)

        roundtrips += 1
        ui.progress(_("comparing with other"), totalnb - len(undecided),
                    total=totalnb)
        ui.debug("query %i; still undecided: %i, sample size is: %i\n"
                 % (roundtrips, len(undecided), len(sample)))
        # indices between sample and externalized version must match
        sample = list(sample)
        remotehash = getremotehash(dag.externalizeall(sample))
        yesno = [localhash[ix][1] == remotehash[si]
                 for si, ix in enumerate(sample)]
        # a matching hash makes the rev and all its ancestors common;
        # a mismatch taints the rev and all its descendants
        commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
        common.update(dag.ancestorset(commoninsample, common))
        missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
        missing.update(dag.descendantset(missinginsample, missing))
        undecided.difference_update(missing)
        undecided.difference_update(common)

    ui.progress(_("comparing with other"), None)
    result = dag.headsetofconnecteds(common)
    ui.debug("%d total queries\n" % roundtrips)
    if not result:
        return set([node.nullid])
    return dag.externalizeall(result)
def findmissingrange(ui, local, remote, probeset,
                     initialsamplesize=100,
                     fullsamplesize=200):
    """find the changesets with markers missing remotely, via stable ranges

    Repeatedly queries the remote for the obshash of sampled stable ranges,
    splitting every mismatching range into its subranges, until each
    mismatch is narrowed down to a single changeset. Returns the sorted
    list of nodes whose relevant markers are missing on the remote.
    """
    missing = set()
    heads = local.revs('heads(%ld)', probeset)
    # warm the whole cache before using it: updating all values at once is
    # more efficient and gives a single point where it is written to disk
    local.stablerange.warmup(local)
    rangelength = local.stablerange.rangelength
    subranges = local.stablerange.subranges
    # size of slice ?
    heappop = heapq.heappop
    heappush = heapq.heappush
    heapify = heapq.heapify

    tested = set()
    sample = []
    samplesize = initialsamplesize

    def addentry(entry):
        # queue a range for the next query unless it was already probed
        if entry in tested:
            return False
        sample.append(entry)
        tested.add(entry)
        return True

    for h in heads:
        entry = (h, 0)
        addentry(entry)

    querycount = 0
    ui.progress(_("comparing obsmarker with other"), querycount)
    overflow = []
    while sample or overflow:
        if overflow:
            sample.extend(overflow)
            overflow = []

        if samplesize < len(sample):
            # too much sample already
            overflow = sample[samplesize:]
            sample = sample[:samplesize]
        elif len(sample) < samplesize:
            # we need more sample !
            needed = samplesize - len(sample)
            # slice the largest queued ranges first to top the sample up
            sliceme = []
            heapify(sliceme)
            for entry in sample:
                if 1 < rangelength(local, entry):
                    heappush(sliceme, (-rangelength(local, entry), entry))

            while sliceme and 0 < needed:
                _key, target = heappop(sliceme)
                for new in subranges(local, target):
                    # XXX we could record hierarchy to optimise drop
                    if addentry(new):
                        # fix: was `1 < len(new)` — always true for a
                        # (rev, index) pair; only ranges longer than one
                        # changeset can be sliced further
                        if 1 < rangelength(local, new):
                            heappush(sliceme, (-rangelength(local, new), new))
                        needed -= 1
                        if needed <= 0:
                            break

        # no longer the first iteration
        samplesize = fullsamplesize

        nbsample = len(sample)
        maxsize = max([rangelength(local, r) for r in sample])
        ui.debug("query %i; sample size is %i, largest range %i\n"
                 % (querycount, nbsample, maxsize))
        nbreplies = 0
        replies = list(_queryrange(ui, local, remote, sample))
        sample = []
        n = local.changelog.node
        for entry, remotehash in replies:
            nbreplies += 1
            if remotehash == _obshashrange(local, entry):
                # hash matches: everything in this range is already remote
                continue
            elif 1 == rangelength(local, entry):
                # mismatch on a single changeset: it is missing
                missing.add(n(entry[0]))
            else:
                # mismatch on a wider range: drill into its subranges
                for new in subranges(local, entry):
                    addentry(new)
        assert nbsample == nbreplies
        querycount += 1
        ui.progress(_("comparing obsmarker with other"), querycount)
    ui.progress(_("comparing obsmarker with other"), None)
    return sorted(missing)
def _queryrange(ui, repo, remote, allentries):
    """ask the remote for the obshash of every range in ``allentries``

    Sends a single bundle2 part carrying one 24-byte key per range
    (20-byte node + 4-byte big-endian index) and yields
    (entry, remote-hash) pairs as the replies are processed.
    """
    mapping = {}
    n = repo.changelog.node
    nodeentries = [(n(entry[0]), entry[1], entry) for entry in allentries]

    def gen():
        # stream the keys, remembering which entry each key stands for
        for enode, eindex, entry in nodeentries:
            key = enode + _pack('>I', eindex)
            mapping[key] = entry
            yield key

    bundler = bundle2.bundle20(ui, bundle2.bundle2caps(remote))
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    bundler.newpart('replycaps', data=capsblob)
    bundler.newpart('_donotusemeever_evoext_obshashrange_1', data=gen())
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        reply = remote.unbundle(
            stream, ['force'], remote.url())
    except error.BundleValueError as exc:
        raise error.Abort(_('missing support for %s') % exc)
    try:
        # processing the reply fills op.records via the reply part handler
        op = bundle2.processbundle(repo, reply)
    except error.BundleValueError as exc:
        raise error.Abort(_('missing support for %s') % exc)
    except bundle2.AbortFromPart as exc:
        ui.status(_('remote: %s\n') % exc)
        if exc.hint is not None:
            ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
        raise error.Abort(_('push failed on remote'))
    for rep in op.records['_donotusemeever_evoext_obshashrange_1']:
        yield mapping[rep['key']], rep['value']
@bundle2.parthandler('_donotusemeever_evoext_obshashrange_1', ())
def _processqueryrange(op, inpart):
    """server side: compute the obshash for each queried range

    Reads 24-byte keys (20-byte node + 4-byte index) and replies with each
    key followed by its 20-byte obshash.
    """
    assert op.reply is not None
    replies = []
    entry = inpart.read(24)
    while entry:
        keynode = entry[:20]
        index = _unpack('>I', entry[20:])[0]
        rev = op.repo.changelog.rev(keynode)
        replies.append(entry + _obshashrange(op.repo, (rev, index)))
        entry = inpart.read(24)
    op.reply.newpart('reply:_donotusemeever_evoext_obshashrange_1', data=iter(replies))
@bundle2.parthandler('reply:_donotusemeever_evoext_obshashrange_1', ())
def _processqueryrangereply(op, inpart):
    """client side: record each 44-byte (24-byte key + 20-byte hash) reply"""
    while True:
        data = inpart.read(44)
        if not data:
            break
        op.records.add('_donotusemeever_evoext_obshashrange_1',
                       {'key': data[:24], 'value': data[24:]})
##############################
### Range Hash computation ###
##############################
@eh.command(
    'debugobshashrange',
    [
        ('', 'rev', [], 'display obshash for all (rev, 0) range in REVS'),
        ('', 'subranges', False, 'display all subranges'),
    ],
    _(''))
def debugobshashrange(ui, repo, **opts):
    """display the ::REVS set topologically sorted in a stable way
    """
    s = node.short
    revs = scmutil.revrange(repo, opts['rev'])
    # prewarm depth cache
    # NOTE(review): max(revs) raises ValueError on an empty revset — verify
    # whether an empty --rev should be rejected earlier
    repo.stablerange.warmup(repo, max(revs))
    cl = repo.changelog
    rangelength = repo.stablerange.rangelength
    depthrev = repo.stablerange.depthrev
    if opts['subranges']:
        ranges = stablerange.subrangesclosure(repo, revs)
    else:
        ranges = [(r, 0) for r in revs]
    headers = ('rev', 'node', 'index', 'size', 'depth', 'obshash')
    linetemplate = '%12d %12s %12d %12d %12d %12s\n'
    # header line reuses the row template with every field as a string
    headertemplate = linetemplate.replace('d', 's')
    ui.status(headertemplate % headers)
    for r in ranges:
        d = (r[0],
             s(cl.node(r[0])),
             r[1],
             rangelength(repo, r),
             depthrev(repo, r[0]),
             node.short(_obshashrange(repo, r)))
        ui.status(linetemplate % d)
def _obshashrange(repo, rangeid):
    """return the obsolete hash associated to a range

    A length-1 range hashes the sorted, encoded markers relevant to its
    changeset; a larger range combines the (non-null) hashes of its
    subranges. Results are memoized in ``repo.obsstore.rangeobshashcache``.
    """
    cache = repo.obsstore.rangeobshashcache
    cl = repo.changelog
    obshash = cache.get(rangeid)
    if obshash is not None:
        return obshash
    pieces = []
    nullid = node.nullid
    if repo.stablerange.rangelength(repo, rangeid) == 1:
        # leaf range: gather the encoded markers for that one changeset
        rangenode = cl.node(rangeid[0])
        tmarkers = repo.obsstore.relevantmarkers([rangenode])
        for m in tmarkers:
            mbin = obsolete._fm1encodeonemarker(m)
            pieces.append(mbin)
        pieces.sort()
    else:
        # inner range: recurse into the subranges, skipping empty ones
        for subrange in repo.stablerange.subranges(repo, rangeid):
            obshash = _obshashrange(repo, subrange)
            if obshash != nullid:
                pieces.append(obshash)

    # note: if there is only one subrange with actual data, we'll just
    # reuse the same hash (obshash still holds it); otherwise hash the
    # collected pieces. (dead duplicate `pieces = []` and `sha` inits
    # removed.)
    if not pieces:
        obshash = node.nullid
    elif len(pieces) != 1 or obshash is None:
        sha = hashlib.sha1()
        for p in pieces:
            sha.update(p)
        obshash = sha.digest()
    cache[rangeid] = obshash
    return obshash
@eh.wrapfunction(obsolete.obsstore, '_addmarkers')
def _addmarkers(orig, obsstore, *args, **kwargs):
    """invalidate the range obshash cache whenever markers are added"""
    cache = obsstore.rangeobshashcache
    cache.clear()
    return orig(obsstore, *args, **kwargs)
@eh.addattr(obsolete.obsstore, 'rangeobshashcache')
@util.propertycache
def rangeobshashcache(obsstore):
    # lazily created {rangeid: obshash} mapping used by _obshashrange;
    # cleared by the _addmarkers wrapper above when new markers arrive
    return {}
#############################
### Tree Hash computation ###
#############################
# Hash computed from a given changeset using all markers relevant to it and
# the obshash of its parents. This is similar to what happens for changeset
# nodes, where the parents are used in the computation
@eh.command(
    'debugobsrelsethashtree',
    [('', 'v0', None, 'hash on marker format "0"'),
     ('', 'v1', None, 'hash on marker format "1" (default)')], _(''))
def debugobsrelsethashtree(ui, repo, v0=False, v1=False):
    """display Obsolete markers, Relevant Set, Hash Tree
    changeset-node obsrelsethashtree-node

    Each hash is computed from the "orsht" of the changeset's parents and
    the markers relevant to the changeset itself.
    """
    if v0 and v1:
        # fix: the message used to read 'cannot only specify one format',
        # the opposite of the condition being rejected
        raise error.Abort('cannot specify both v0 and v1 format')
    elif v0:
        treefunc = _obsrelsethashtreefm0
    else:
        # v1 is the default format
        treefunc = _obsrelsethashtreefm1
    for chg, obs in treefunc(repo):
        ui.status('%s %s\n' % (node.hex(chg), node.hex(obs)))
def _obsrelsethashtreefm0(repo):
    # hash tree using the version "0" marker encoding
    return _obsrelsethashtree(repo, obsolete._fm0encodeonemarker)

def _obsrelsethashtreefm1(repo):
    # hash tree using the version "1" marker encoding
    return _obsrelsethashtree(repo, obsolete._fm1encodeonemarker)
def _obsrelsethashtree(repo, encodeonemarker):
    """compute the obsrelsethashtree for the whole repository

    Returns a list indexed by revision number of (node, hash) pairs.
    Each hash combines the parents' hashes with the markers relevant to
    the changeset, encoded by ``encodeonemarker``; nullid marks revisions
    with no relevant obsolescence data.
    """
    cache = []
    unfi = repo.unfiltered()
    markercache = {}
    repo.ui.progress(_("preparing locally"), 0, total=len(unfi))
    for i in unfi:
        ctx = unfi[i]
        entry = 0
        sha = hashlib.sha1()
        # add data from the parents (revs iterate in topological order, so
        # the parents' entries are already in ``cache``)
        for p in ctx.parents():
            p = p.rev()
            if p < 0:
                p = node.nullid
            else:
                p = cache[p][1]
            if p != node.nullid:
                entry += 1
                sha.update(p)
        tmarkers = repo.obsstore.relevantmarkers([ctx.node()])
        if tmarkers:
            bmarkers = []
            for m in tmarkers:
                # markers are shared between changesets; encode each once
                if m not in markercache:
                    markercache[m] = encodeonemarker(m)
                bmarkers.append(markercache[m])
            # sort for a stable hash regardless of marker iteration order
            bmarkers.sort()
            for m in bmarkers:
                entry += 1
                sha.update(m)
        if entry:
            cache.append((ctx.node(), sha.digest()))
        else:
            # nothing relevant here: record nullid so descendants skip it
            cache.append((ctx.node(), node.nullid))
        repo.ui.progress(_("preparing locally"), i, total=len(unfi))
    repo.ui.progress(_("preparing locally"), None)
    return cache
def _obshash(repo, nodes, version=0):
    """return the obsrelsethashtree hash for each node in ``nodes``

    ``version`` selects the marker encoding (0 or 1). Unknown nodes map to
    nullid. Raises ValueError on an unsupported version (previously a bare
    ``assert False``, which vanishes under ``python -O``).
    """
    if version == 0:
        hashs = _obsrelsethashtreefm0(repo)
    elif version == 1:
        hashs = _obsrelsethashtreefm1(repo)
    else:
        raise ValueError('unknown obshash version: %r' % version)
    nm = repo.changelog.nodemap
    revs = [nm.get(n) for n in nodes]
    # replaced the legacy `cond and a or b` idiom with a conditional
    # expression (same semantics: digests/nullid are always truthy)
    return [node.nullid if r is None else hashs[r][1] for r in revs]
@eh.addattr(localrepo.localpeer, 'evoext_obshash')
def local_obshash(peer, nodes):
    # local-peer implementation of the "evoext_obshash" command (format 0)
    return _obshash(peer._repo, nodes)

@eh.addattr(localrepo.localpeer, 'evoext_obshash1')
def local_obshash1(peer, nodes):
    # local-peer implementation of the "evoext_obshash1" command (format 1)
    return _obshash(peer._repo, nodes, version=1)
@eh.addattr(wireproto.wirepeer, 'evoext_obshash')
def peer_obshash(self, nodes):
    """query a remote peer for the obshash (marker format 0) of ``nodes``"""
    answer = self._call("evoext_obshash", nodes=wireproto.encodelist(nodes))
    try:
        return wireproto.decodelist(answer)
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), answer))
@eh.addattr(wireproto.wirepeer, 'evoext_obshash1')
def peer_obshash1(self, nodes):
    """query a remote peer for the obshash (marker format 1) of ``nodes``"""
    answer = self._call("evoext_obshash1", nodes=wireproto.encodelist(nodes))
    try:
        return wireproto.decodelist(answer)
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), answer))
def srv_obshash(repo, proto, nodes):
    # server side of the "evoext_obshash" command (marker format 0)
    return wireproto.encodelist(_obshash(repo, wireproto.decodelist(nodes)))

def srv_obshash1(repo, proto, nodes):
    # server side of the "evoext_obshash1" command (marker format 1)
    return wireproto.encodelist(_obshash(repo, wireproto.decodelist(nodes),
                                         version=1))
def _obshash_capabilities(orig, repo, proto):
    """wrapper advertising the legacy obshash capabilities"""
    caps = orig(repo, proto)
    if not obsolete.isenabled(repo, obsolete.exchangeopt):
        return caps
    allcaps = caps.split()
    allcaps.append('_evoext_obshash_0')
    allcaps.append('_evoext_obshash_1')
    allcaps.sort()
    return ' '.join(allcaps)
@eh.extsetup
def obshash_extsetup(ui):
    """register the obshash wire commands and capabilities"""
    hgweb_mod.perms['evoext_obshash'] = 'pull'
    hgweb_mod.perms['evoext_obshash1'] = 'pull'
    wireproto.commands['evoext_obshash'] = (srv_obshash, 'nodes')
    wireproto.commands['evoext_obshash1'] = (srv_obshash1, 'nodes')
    extensions.wrapfunction(wireproto, 'capabilities', _obshash_capabilities)
    # wrap the registered command too: core copies the function at
    # registration time, so wrapping the module attribute is not enough
    origcmd, args = wireproto.commands['capabilities']

    def capabilities(repo, proto):
        return _obshash_capabilities(origcmd, repo, proto)

    wireproto.commands['capabilities'] = (capabilities, args)