--- a/hgext3rd/evolve/obsdiscovery.py Fri Mar 10 10:36:46 2017 -0800
+++ b/hgext3rd/evolve/obsdiscovery.py Sun Mar 12 08:15:14 2017 -0700
@@ -16,9 +16,12 @@
import collections
import hashlib
+import heapq
import math
+import struct
from mercurial import (
+ bundle2,
cmdutil,
commands,
dagutil,
@@ -41,6 +44,9 @@
utility,
)
+_pack = struct.pack
+_unpack = struct.unpack
+
eh = exthelper.exthelper()
obsexcmsg = utility.obsexcmsg
@@ -73,14 +79,25 @@
return
common = []
+ missing = None
obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
% len(revs))
commonrevs = list(unfi.revs('::%ln', pushop.outgoing.commonheads))
- common = findcommonobsmarkers(pushop.ui, unfi, pushop.remote,
- commonrevs)
+ if _canobshashrange(repo, pushop.remote):
+ missing = findmissingrange(pushop.ui, unfi, pushop.remote,
+ commonrevs)
+ else:
+ common = findcommonobsmarkers(pushop.ui, unfi, pushop.remote,
+ commonrevs)
+ if missing is None:
+ revs = list(unfi.revs('%ld - (::%ln)', revs, common))
+ nodes = [cl.node(r) for r in revs]
+ else:
+ revs = list(repo.revs('only(%ln, %ln)', pushop.futureheads,
+ pushop.outgoing.commonheads))
+ nodes = [cl.node(r) for r in revs]
+ nodes += missing
- revs = list(unfi.revs('%ld - (::%ln)', revs, common))
- nodes = [cl.node(r) for r in revs]
if nodes:
obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n"
% len(nodes))
@@ -97,7 +114,7 @@
_pushdiscoveryobsmarkers(olddisco, pushop)
exchange.pushdiscoverymapping['obsmarker'] = newdisco
-def buildpullobsmarkersboundaries(pullop):
+def buildpullobsmarkersboundaries(pullop, bundle2=True):
"""small function returning the argument for pull markers call
may to contains 'heads' and 'common'. skip the key for None.
@@ -106,17 +123,53 @@
remote = pullop.remote
unfi = repo.unfiltered()
revs = unfi.revs('::(%ln - null)', pullop.common)
- common = [node.nullid]
- if remote.capable('_evoext_obshash_0'):
+ boundaries = {'heads': pullop.pulledsubset}
+ if not revs: # nothing common
+ boundaries['common'] = [node.nullid]
+ return boundaries
+
+ if bundle2 and _canobshashrange(repo, remote):
+ obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
+ % len(revs))
+ boundaries['missing'] = findmissingrange(repo.ui, repo, pullop.remote,
+ revs)
+ elif remote.capable('_evoext_obshash_0'):
obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
% len(revs))
- common = findcommonobsmarkers(repo.ui, repo, remote, revs)
- return {'heads': pullop.pulledsubset, 'common': common}
+ boundaries['common'] = findcommonobsmarkers(repo.ui, repo, remote, revs)
+ else:
+ boundaries['common'] = [node.nullid]
+ return boundaries
##################################
### Code performing discovery ###
##################################
+def _canobshashrange(local, remote):
+    enabled = local.ui.configbool('experimental', 'obshashrange', False)
+    return enabled and remote.capable('_evoext_obshashrange_1')  # both ends
+
+def _obshashrange_capabilities(orig, repo, proto):
+    """Wrap the capabilities command to advertise obshashrange support.
+
+    '_evoext_obshashrange_1' is added only when obsmarker exchange and
+    the experimental.obshashrange option are both enabled."""
+    caps = orig(repo, proto)
+    enabled = repo.ui.configbool('experimental', 'obshashrange', False)
+    if not (obsolete.isenabled(repo, obsolete.exchangeopt) and enabled):
+        return caps
+    return ' '.join(sorted(caps.split() + ['_evoext_obshashrange_1']))
+
+@eh.extsetup
+def obshashrange_extsetup(ui):
+    """install the capability wrapper on both 'capabilities' entry points"""
+    extensions.wrapfunction(wireproto, 'capabilities', _obshashrange_capabilities)
+    # also wrap the function stored in the wireproto command table
+    origcommand, cmdargs = wireproto.commands['capabilities']
+    def wrappedcommand(repo, proto):
+        return _obshashrange_capabilities(origcommand, repo, proto)
+    wireproto.commands['capabilities'] = (wrappedcommand, cmdargs)
+
def findcommonobsmarkers(ui, local, remote, probeset,
initialsamplesize=100,
fullsamplesize=200):
@@ -174,6 +227,152 @@
return set([node.nullid])
return dag.externalizeall(result)
+def findmissingrange(ui, local, remote, probeset,
+                     initialsamplesize=100,
+                     fullsamplesize=200):
+    """Return nodes whose obsmarker range hash differs from `remote`'s.
+
+    Starts from one range per head of `probeset`; mismatching ranges are
+    split with subranges() until they hold a single revision.
+    """
+    cl = local.changelog
+    n = cl.node
+    missing = set()
+    heads = local.revs('heads(%ld)', probeset)
+
+    heappop = heapq.heappop
+    heappush = heapq.heappush
+
+    tested = set()  # every range ever sampled, so none is queried twice
+    sample = []
+    samplesize = initialsamplesize
+
+    def addentry(entry):
+        # queue `entry` for the next query unless it was already sampled
+        if entry in tested:
+            return False
+        sample.append(entry)
+        tested.add(entry)
+        return True
+
+    for h in heads:
+        entry = _range(local, h, 0)
+        addentry(entry)
+
+    querycount = 0
+    ui.progress(_("comparing obsmarker with other"), querycount)
+    overflow = []
+    while sample or overflow:
+        if overflow:
+            sample.extend(overflow)
+            overflow = []
+
+        if samplesize < len(sample):
+            # too much sample already
+            overflow = sample[samplesize:]
+            sample = sample[:samplesize]
+        elif len(sample) < samplesize:
+            # we need more sample: split the largest ranges first
+            needed = samplesize - len(sample)
+            sliceme = []
+            for entry in sample:
+                if 1 < len(entry):
+                    heappush(sliceme, (-len(entry), entry))
+
+            while sliceme and 0 < needed:
+                _key, target = heappop(sliceme)
+                for new in target.subranges():
+                    # XXX we could record hierarchy to optimise drop
+                    if addentry(new):
+                        if 1 < len(new):
+                            heappush(sliceme, (-len(new), new))
+                        needed -= 1
+                        if needed <= 0:
+                            break
+
+        # no longer the first iteration
+        samplesize = fullsamplesize
+
+        nbsample = len(sample)
+        maxsize = max([len(r) for r in sample])
+        ui.debug("query %i; sample size is %i, largest range %i\n"
+                 % (querycount, nbsample, maxsize))
+        nbreplies = 0
+        replies = list(_queryrange(ui, local, remote, sample))
+        sample = []
+        for entry, remotehash in replies:
+            nbreplies += 1
+            if remotehash == entry.obshash:
+                continue
+            elif 1 == len(entry):
+                missing.update(entry._revs)
+            else:
+                for new in entry.subranges():
+                    addentry(new)
+        assert nbsample == nbreplies
+        querycount += 1
+        ui.progress(_("comparing obsmarker with other"), querycount)
+    ui.progress(_("comparing obsmarker with other"), None)
+    return [n(r) for r in missing]
+
+def _queryrange(ui, repo, remote, allentries):
+    """Yield (entry, remote obshash) pairs for the ranges in `allentries`."""
+    mapping = {}  # wire key -> entry, filled lazily as gen() is consumed
+    n = repo.changelog.node
+
+    def gen():
+        for entry in allentries:
+            key = n(entry.head) + _pack('>I', entry.index)
+            mapping[key] = entry
+            yield key
+
+    bundler = bundle2.bundle20(ui, bundle2.bundle2caps(remote))
+    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
+    bundler.newpart('replycaps', data=capsblob)
+    bundler.newpart('_evoexp_obsrangehash_0', data=gen())
+    stream = util.chunkbuffer(bundler.getchunks())
+    try:
+        reply = remote.unbundle(
+            stream, ['force'], remote.url())
+    except error.BundleValueError as exc:
+        raise error.Abort(_('missing support for %s') % exc)
+    try:
+        op = bundle2.processbundle(repo, reply)
+    except error.BundleValueError as exc:
+        raise error.Abort(_('missing support for %s') % exc)
+    except bundle2.AbortFromPart as exc:
+        ui.status(_('remote: %s\n') % exc)
+        if exc.hint is not None:
+            ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
+        raise error.Abort(_('obsmarker range query failed on remote'))
+    for rep in op.records['_evoexp_obsrangehash_0']:
+        yield mapping[rep['key']], rep['value']
+
+
+@bundle2.parthandler('_evoexp_obsrangehash_0', ())
+def _processqueryrange(op, inpart):
+    """Answer each 24-byte (node, '>I' index) record with its obshash."""
+    assert op.reply is not None
+    answers = []
+    record = inpart.read(24)
+    while record:
+        headnode, index = record[:20], _unpack('>I', record[20:])[0]
+        headrev = op.repo.changelog.rev(headnode)
+        obshash = _range(op.repo, headrev, index).obshash
+        answers.append(record + obshash)
+        record = inpart.read(24)
+    op.reply.newpart('reply:_evoexp_obsrangehash_0', data=iter(answers))
+
+
+@bundle2.parthandler('reply:_evoexp_obsrangehash_0', ())
+def _processqueryrangereply(op, inpart):
+    """Record each 44-byte reply as a 24-byte key and the remaining hash."""
+    record = inpart.read(44)
+    while record:
+        answer = {'key': record[:24], 'value': record[24:]}
+        op.records.add('_evoexp_obsrangehash_0', answer)
+        record = inpart.read(44)
+
##################################
### Stable topological sorting ###
##################################
@@ -230,7 +429,8 @@
# * process merge when both parents are yielded
# track what changeset has been
- seen = [0] * (max(revs) + 1)
+ seen = [0] * (max(revs) + 2)
+ seen[-1] = True # nullrev is known
# starts from repository roots
# reuse the list form the mapping as we won't need it again anyway
stack = children[nullrev]
@@ -251,12 +451,13 @@
current = None
continue
p1, p2 = parents(current)
- if p2 != nullrev and not (seen[p1] and seen[p2]):
+ if not (seen[p1] and seen[p2]):
# we can't iterate on this merge yet because other child is not
# yielded yet (and we are topo sorting) we can discard it for now
# because it will be reached from the other child.
current = None
continue
+ assert not seen[current]
seen[current] = True
result.append(current) # could be yield, cf earlier comment
cs = children[current]
@@ -268,11 +469,13 @@
cs.sort(key=n, reverse=True)
current = cs.pop() # proceed on smallest
stack.extend(cs) # stack the rest for later
+ assert len(result) == len(set(result))
return result
##############################
### Range Hash computation ###
##############################
+
@eh.command(
'debugstablerange',
[
@@ -320,9 +523,13 @@
self.head = head
self.index = index
if revs is not None:
+ assert len(revs) == len(self)
self._revs = revs
assert index < self.depth, (head, index, self.depth, revs)
+ def __repr__(self):
+ return '%s %d %d %s' % (node.short(self.node), self.depth, self.index, node.short(self.obshash))
+
def __hash__(self):
return self._id
@@ -344,7 +551,7 @@
return self._repo.changelog.node(self.head)
def __len__(self):
- return len(self._revs)
+ return self.depth - self.index
@util.propertycache
def depth(self):
@@ -352,7 +559,9 @@
@util.propertycache
def _revs(self):
- return _stablesort(self._repo, [self.head])[self.index:]
+ r = _stablesort(self._repo, [self.head])[self.index:]
+ assert len(r) == len(self), (self.head, self.index)
+ return r
def _slicesat(self, globalindex):
localindex = globalindex - self.index
--- a/hgext3rd/evolve/obsexchange.py Fri Mar 10 10:36:46 2017 -0800
+++ b/hgext3rd/evolve/obsexchange.py Sun Mar 12 08:15:14 2017 -0700
@@ -57,30 +57,46 @@
@eh.uisetup
def addgetbundleargs(self):
wireproto.gboptsmap['evo_obscommon'] = 'nodes'
+ wireproto.gboptsmap['evo_missing_nodes'] = 'nodes'
@eh.wrapfunction(exchange, '_pullbundle2extraprepare')
def _addobscommontob2pull(orig, pullop, kwargs):
ret = orig(pullop, kwargs)
+ ui = pullop.repo.ui
if ('obsmarkers' in kwargs and
pullop.remote.capable('_evoext_getbundle_obscommon')):
boundaries = obsdiscovery.buildpullobsmarkersboundaries(pullop)
- common = boundaries['common']
- if common != [node.nullid]:
- kwargs['evo_obscommon'] = common
+ if 'common' in boundaries:
+ common = boundaries['common']
+ if common != pullop.common:
+ obsexcmsg(ui, 'request obsmarkers for some common nodes\n')
+ if common != [node.nullid]:
+ kwargs['evo_obscommon'] = common
+ elif 'missing' in boundaries:
+ missing = boundaries['missing']
+ if missing:
+ obsexcmsg(ui, 'request obsmarkers for %d common nodes\n'
+ % len(missing))
+ kwargs['evo_missing_nodes'] = missing
return ret
def _getbundleobsmarkerpart(orig, bundler, repo, source, **kwargs):
- if 'evo_obscommon' not in kwargs:
+ if not (set(['evo_obscommon', 'evo_missing_nodes']) & set(kwargs)):
return orig(bundler, repo, source, **kwargs)
- heads = kwargs.get('heads')
if kwargs.get('obsmarkers', False):
- if heads is None:
- heads = repo.heads()
- obscommon = kwargs.get('evo_obscommon', ())
- assert obscommon
- obsset = repo.unfiltered().set('::%ln - ::%ln', heads, obscommon)
- subset = [c.node() for c in obsset]
+ heads = kwargs.get('heads')
+ if 'evo_obscommon' in kwargs:
+ if heads is None:
+ heads = repo.heads()
+ obscommon = kwargs.get('evo_obscommon', ())
+ assert obscommon
+ obsset = repo.unfiltered().set('::%ln - ::%ln', heads, obscommon)
+ subset = [c.node() for c in obsset]
+ else:
+ common = kwargs.get('common')
+ subset = [c.node() for c in repo.unfiltered().set('only(%ln, %ln)', heads, common)]
+ subset += kwargs['evo_missing_nodes']
markers = repo.obsstore.relevantmarkers(subset)
exchange.buildobsmarkerspart(bundler, markers)
@@ -342,8 +358,11 @@
return orig(pullop)
tr = None
ui = pullop.repo.ui
- boundaries = obsdiscovery.buildpullobsmarkersboundaries(pullop)
- if not set(boundaries['heads']) - set(boundaries['common']):
+ boundaries = obsdiscovery.buildpullobsmarkersboundaries(pullop, bundle2=False)
+ if 'missing' in boundaries and not boundaries['missing']:
+ obsexcmsg(ui, "nothing to pull\n")
+ return None
+ elif not set(boundaries['heads']) - set(boundaries['common']):
obsexcmsg(ui, "nothing to pull\n")
return None