exchange: move code related to exchange into a 'evolve.exchange' submodule
The evolve extension is HUGE, we split exchange code appart before doing more work on it.
--- a/hgext3rd/evolve/__init__.py Sat Mar 04 03:37:32 2017 +0100
+++ b/hgext3rd/evolve/__init__.py Sat Mar 04 02:56:50 2017 +0100
@@ -62,7 +62,6 @@
import random
import re
import collections
-import socket
import errno
import struct
@@ -96,12 +95,9 @@
context,
copies,
error,
- exchange,
extensions,
help,
hg,
- httppeer,
- localrepo,
lock as lockmod,
merge,
node,
@@ -110,7 +106,6 @@
revset,
scmutil,
templatekw,
- wireproto
)
from mercurial.commands import walkopts, commitopts, commitopts2, mergetoolopts
@@ -119,6 +114,7 @@
from . import (
exthelper,
+ exchange,
serveronly,
)
@@ -148,6 +144,7 @@
eh = exthelper.exthelper()
+eh.merge(exchange.eh)
uisetup = eh.final_uisetup
extsetup = eh.final_extsetup
reposetup = eh.final_reposetup
@@ -3075,389 +3072,6 @@
entry[1].append(('O', 'old-obsolete', False,
_("make graft obsoletes its source (DEPRECATED)")))
-#####################################################################
-### Obsolescence marker exchange experimenation ###
-#####################################################################
-
-def obsexcprg(ui, *args, **kwargs):
- topic = 'obsmarkers exchange'
- if ui.configbool('experimental', 'verbose-obsolescence-exchange', False):
- topic = 'OBSEXC'
- ui.progress(topic, *args, **kwargs)
-
-@eh.wrapfunction(exchange, '_pushdiscoveryobsmarkers')
-def _pushdiscoveryobsmarkers(orig, pushop):
- if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
- and pushop.repo.obsstore
- and 'obsolete' in pushop.remote.listkeys('namespaces')):
- repo = pushop.repo
- obsexcmsg(repo.ui, "computing relevant nodes\n")
- revs = list(repo.revs('::%ln', pushop.futureheads))
- unfi = repo.unfiltered()
- cl = unfi.changelog
- if not pushop.remote.capable('_evoext_obshash_0'):
- # do not trust core yet
- # return orig(pushop)
- nodes = [cl.node(r) for r in revs]
- if nodes:
- obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n"
- % len(nodes))
- pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
- else:
- obsexcmsg(repo.ui, "markers already in sync\n")
- pushop.outobsmarkers = []
- pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
- return
-
- common = []
- obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
- % len(revs))
- commonrevs = list(unfi.revs('::%ln', pushop.outgoing.commonheads))
- common = findcommonobsmarkers(pushop.ui, unfi, pushop.remote,
- commonrevs)
-
- revs = list(unfi.revs('%ld - (::%ln)', revs, common))
- nodes = [cl.node(r) for r in revs]
- if nodes:
- obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n"
- % len(nodes))
- pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
- else:
- obsexcmsg(repo.ui, "markers already in sync\n")
- pushop.outobsmarkers = []
-
-@eh.extsetup
-def _installobsmarkersdiscovery(ui):
- olddisco = exchange.pushdiscoverymapping['obsmarker']
-
- def newdisco(pushop):
- _pushdiscoveryobsmarkers(olddisco, pushop)
- exchange.pushdiscoverymapping['obsmarker'] = newdisco
-
-### Set discovery START
-
-from mercurial import dagutil
-from mercurial import setdiscovery
-
-@eh.addattr(localrepo.localpeer, 'evoext_obshash')
-def local_obshash(peer, nodes):
- return serveronly._obshash(peer._repo, nodes)
-
-@eh.addattr(localrepo.localpeer, 'evoext_obshash1')
-def local_obshash1(peer, nodes):
- return serveronly._obshash(peer._repo, nodes, version=1)
-
-@eh.addattr(wireproto.wirepeer, 'evoext_obshash')
-def peer_obshash(self, nodes):
- d = self._call("evoext_obshash", nodes=wireproto.encodelist(nodes))
- try:
- return wireproto.decodelist(d)
- except ValueError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
-
-@eh.addattr(wireproto.wirepeer, 'evoext_obshash1')
-def peer_obshash1(self, nodes):
- d = self._call("evoext_obshash1", nodes=wireproto.encodelist(nodes))
- try:
- return wireproto.decodelist(d)
- except ValueError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
-
-def findcommonobsmarkers(ui, local, remote, probeset,
- initialsamplesize=100,
- fullsamplesize=200):
- # from discovery
- roundtrips = 0
- cl = local.changelog
- dag = dagutil.revlogdag(cl)
- missing = set()
- common = set()
- undecided = set(probeset)
- totalnb = len(undecided)
- ui.progress(_("comparing with other"), 0, total=totalnb)
- _takefullsample = setdiscovery._takefullsample
- if remote.capable('_evoext_obshash_1'):
- getremotehash = remote.evoext_obshash1
- localhash = serveronly._obsrelsethashtreefm1(local)
- else:
- getremotehash = remote.evoext_obshash
- localhash = serveronly._obsrelsethashtreefm0(local)
-
- while undecided:
-
- ui.note(_("sampling from both directions\n"))
- if len(undecided) < fullsamplesize:
- sample = set(undecided)
- else:
- sample = _takefullsample(dag, undecided, size=fullsamplesize)
-
- roundtrips += 1
- ui.progress(_("comparing with other"), totalnb - len(undecided),
- total=totalnb)
- ui.debug("query %i; still undecided: %i, sample size is: %i\n"
- % (roundtrips, len(undecided), len(sample)))
- # indices between sample and externalized version must match
- sample = list(sample)
- remotehash = getremotehash(dag.externalizeall(sample))
-
- yesno = [localhash[ix][1] == remotehash[si]
- for si, ix in enumerate(sample)]
-
- commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
- common.update(dag.ancestorset(commoninsample, common))
-
- missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
- missing.update(dag.descendantset(missinginsample, missing))
-
- undecided.difference_update(missing)
- undecided.difference_update(common)
-
- ui.progress(_("comparing with other"), None)
- result = dag.headsetofconnecteds(common)
- ui.debug("%d total queries\n" % roundtrips)
-
- if not result:
- return set([nullid])
- return dag.externalizeall(result)
-
-
-_pushkeyescape = getattr(obsolete, '_pushkeyescape', None)
-
-class pushobsmarkerStringIO(StringIO):
- """hacky string io for progress"""
-
- @util.propertycache
- def length(self):
- return len(self.getvalue())
-
- def read(self, size=None):
- obsexcprg(self.ui, self.tell(), unit=_("bytes"), total=self.length)
- return StringIO.read(self, size)
-
- def __iter__(self):
- d = self.read(4096)
- while d:
- yield d
- d = self.read(4096)
-
-@eh.wrapfunction(exchange, '_pushobsolete')
-def _pushobsolete(orig, pushop):
- """utility function to push obsolete markers to a remote"""
- stepsdone = getattr(pushop, 'stepsdone', None)
- if stepsdone is not None:
- if 'obsmarkers' in stepsdone:
- return
- stepsdone.add('obsmarkers')
- if pushop.cgresult == 0:
- return
- pushop.ui.debug('try to push obsolete markers to remote\n')
- repo = pushop.repo
- remote = pushop.remote
- if (obsolete.isenabled(repo, obsolete.exchangeopt) and repo.obsstore and
- 'obsolete' in remote.listkeys('namespaces')):
- markers = pushop.outobsmarkers
- if not markers:
- obsexcmsg(repo.ui, "no marker to push\n")
- elif remote.capable('_evoext_pushobsmarkers_0'):
- obsdata = pushobsmarkerStringIO()
- for chunk in obsolete.encodemarkers(markers, True):
- obsdata.write(chunk)
- obsdata.seek(0)
- obsdata.ui = repo.ui
- obsexcmsg(repo.ui, "pushing %i obsolescence markers (%i bytes)\n"
- % (len(markers), len(obsdata.getvalue())),
- True)
- remote.evoext_pushobsmarkers_0(obsdata)
- obsexcprg(repo.ui, None)
- else:
- rslts = []
- remotedata = _pushkeyescape(markers).items()
- totalbytes = sum(len(d) for k, d in remotedata)
- sentbytes = 0
- obsexcmsg(repo.ui, "pushing %i obsolescence markers in %i "
- "pushkey payload (%i bytes)\n"
- % (len(markers), len(remotedata), totalbytes),
- True)
- for key, data in remotedata:
- obsexcprg(repo.ui, sentbytes, item=key, unit=_("bytes"),
- total=totalbytes)
- rslts.append(remote.pushkey('obsolete', key, '', data))
- sentbytes += len(data)
- obsexcprg(repo.ui, sentbytes, item=key, unit=_("bytes"),
- total=totalbytes)
- obsexcprg(repo.ui, None)
- if [r for r in rslts if not r]:
- msg = _('failed to push some obsolete markers!\n')
- repo.ui.warn(msg)
- obsexcmsg(repo.ui, "DONE\n")
-
-
-@eh.addattr(wireproto.wirepeer, 'evoext_pushobsmarkers_0')
-def client_pushobsmarkers(self, obsfile):
- """wireprotocol peer method"""
- self.requirecap('_evoext_pushobsmarkers_0',
- _('push obsolete markers faster'))
- ret, output = self._callpush('evoext_pushobsmarkers_0', obsfile)
- for l in output.splitlines(True):
- self.ui.status(_('remote: '), l)
- return ret
-
-@eh.addattr(httppeer.httppeer, 'evoext_pushobsmarkers_0')
-def httpclient_pushobsmarkers(self, obsfile):
- """httpprotocol peer method
- (Cannot simply use _callpush as http is doing some special handling)"""
- self.requirecap('_evoext_pushobsmarkers_0',
- _('push obsolete markers faster'))
- try:
- r = self._call('evoext_pushobsmarkers_0', data=obsfile)
- vals = r.split('\n', 1)
- if len(vals) < 2:
- raise error.ResponseError(_("unexpected response:"), r)
-
- for l in vals[1].splitlines(True):
- if l.strip():
- self.ui.status(_('remote: '), l)
- return vals[0]
- except socket.error as err:
- if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
- raise error.Abort(_('push failed: %s') % err.args[1])
- raise error.Abort(err.args[1])
-
-@eh.wrapfunction(localrepo.localrepository, '_restrictcapabilities')
-def local_pushobsmarker_capabilities(orig, repo, caps):
- caps = orig(repo, caps)
- caps.add('_evoext_pushobsmarkers_0')
- return caps
-
-@eh.addattr(localrepo.localpeer, 'evoext_pushobsmarkers_0')
-def local_pushobsmarkers(peer, obsfile):
- data = obsfile.read()
- serveronly._pushobsmarkers(peer._repo, data)
-
-def _buildpullobsmarkersboundaries(pullop):
- """small funtion returning the argument for pull markers call
- may to contains 'heads' and 'common'. skip the key for None.
-
- Its a separed functio to play around with strategy for that."""
- repo = pullop.repo
- remote = pullop.remote
- unfi = repo.unfiltered()
- revs = unfi.revs('::(%ln - null)', pullop.common)
- common = [nullid]
- if remote.capable('_evoext_obshash_0'):
- obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
- % len(revs))
- common = findcommonobsmarkers(repo.ui, repo, remote, revs)
- return {'heads': pullop.pulledsubset, 'common': common}
-
-@eh.uisetup
-def addgetbundleargs(self):
- wireproto.gboptsmap['evo_obscommon'] = 'nodes'
-
-@eh.wrapfunction(exchange, '_pullbundle2extraprepare')
-def _addobscommontob2pull(orig, pullop, kwargs):
- ret = orig(pullop, kwargs)
- if ('obsmarkers' in kwargs and
- pullop.remote.capable('_evoext_getbundle_obscommon')):
- boundaries = _buildpullobsmarkersboundaries(pullop)
- common = boundaries['common']
- if common != [nullid]:
- kwargs['evo_obscommon'] = common
- return ret
-
-@eh.wrapfunction(exchange, '_pullobsolete')
-def _pullobsolete(orig, pullop):
- if not obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
- return None
- if 'obsmarkers' not in getattr(pullop, 'todosteps', ['obsmarkers']):
- return None
- if 'obsmarkers' in getattr(pullop, 'stepsdone', []):
- return None
- wirepull = pullop.remote.capable('_evoext_pullobsmarkers_0')
- if not wirepull:
- return orig(pullop)
- if 'obsolete' not in pullop.remote.listkeys('namespaces'):
- return None # remote opted out of obsolescence marker exchange
- tr = None
- ui = pullop.repo.ui
- boundaries = _buildpullobsmarkersboundaries(pullop)
- if not set(boundaries['heads']) - set(boundaries['common']):
- obsexcmsg(ui, "nothing to pull\n")
- return None
-
- obsexcmsg(ui, "pull obsolescence markers\n", True)
- new = 0
-
- if wirepull:
- obsdata = pullop.remote.evoext_pullobsmarkers_0(**boundaries)
- obsdata = obsdata.read()
- if len(obsdata) > 5:
- msg = "merging obsolescence markers (%i bytes)\n" % len(obsdata)
- obsexcmsg(ui, msg)
- tr = pullop.gettransaction()
- old = len(pullop.repo.obsstore._all)
- pullop.repo.obsstore.mergemarkers(tr, obsdata)
- new = len(pullop.repo.obsstore._all) - old
- obsexcmsg(ui, "%i obsolescence markers added\n" % new, True)
- else:
- obsexcmsg(ui, "no unknown remote markers\n")
- obsexcmsg(ui, "DONE\n")
- if new:
- pullop.repo.invalidatevolatilesets()
- return tr
-
-@eh.addattr(wireproto.wirepeer, 'evoext_pullobsmarkers_0')
-def client_pullobsmarkers(self, heads=None, common=None):
- self.requirecap('_evoext_pullobsmarkers_0', _('look up remote obsmarkers'))
- opts = {}
- if heads is not None:
- opts['heads'] = wireproto.encodelist(heads)
- if common is not None:
- opts['common'] = wireproto.encodelist(common)
- f = self._callcompressable("evoext_pullobsmarkers_0", **opts)
- length = int(f.read(20))
- chunk = 4096
- current = 0
- data = StringIO()
- ui = self.ui
- obsexcprg(ui, current, unit=_("bytes"), total=length)
- while current < length:
- readsize = min(length - current, chunk)
- data.write(f.read(readsize))
- current += readsize
- obsexcprg(ui, current, unit=_("bytes"), total=length)
- obsexcprg(ui, None)
- data.seek(0)
- return data
-
-@eh.addattr(localrepo.localpeer, 'evoext_pullobsmarkers_0')
-def local_pullobsmarkers(self, heads=None, common=None):
- return serveronly._getobsmarkersstream(self._repo, heads=heads,
- common=common)
-
-@eh.command(
- 'debugobsrelsethashtree',
- [('', 'v0', None, 'hash on marker format "0"'),
- ('', 'v1', None, 'hash on marker format "1" (default)')], _(''))
-def debugobsrelsethashtree(ui, repo, v0=False, v1=False):
- """display Obsolete markers, Relevant Set, Hash Tree
- changeset-node obsrelsethashtree-node
-
- It computed form the "orsht" of its parent and markers
- relevant to the changeset itself."""
- if v0 and v1:
- raise error.Abort('cannot only specify one format')
- elif v0:
- treefunc = serveronly._obsrelsethashtreefm0
- else:
- treefunc = serveronly._obsrelsethashtreefm1
-
- for chg, obs in treefunc(repo):
- ui.status('%s %s\n' % (node.hex(chg), node.hex(obs)))
-
-_bestformat = max(obsolete.formats.keys())
-
-
@eh.wrapfunction(obsolete, '_checkinvalidmarkers')
def _checkinvalidmarkers(orig, markers):
"""search for marker with invalid data and raise error if needed
@@ -3475,7 +3089,7 @@
@eh.command(
'debugobsconvert',
- [('', 'new-format', _bestformat, _('Destination format for markers.'))],
+ [('', 'new-format', exchange._bestformat, _('Destination format for markers.'))],
'')
def debugobsconvert(ui, repo, new_format):
origmarkers = repo.obsstore._all # settle version
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/hgext3rd/evolve/exchange.py Sat Mar 04 02:56:50 2017 +0100
@@ -0,0 +1,413 @@
+#####################################################################
+### Obsolescence marker exchange experimenation ###
+#####################################################################
+
+from __future__ import absolute_import
+
+try:
+ import StringIO as io
+ StringIO = io.StringIO
+except ImportError:
+ import io
+ StringIO = io.StringIO
+
+import errno
+import socket
+
+from mercurial import (
+ error,
+ exchange,
+ httppeer,
+ localrepo,
+ node,
+ obsolete,
+ util,
+ wireproto,
+)
+from mercurial.i18n import _
+
+from . import (
+ exthelper,
+ serveronly,
+)
+
+eh = exthelper.exthelper()
+obsexcmsg = serveronly.obsexcmsg
+
+def obsexcprg(ui, *args, **kwargs):
+ topic = 'obsmarkers exchange'
+ if ui.configbool('experimental', 'verbose-obsolescence-exchange', False):
+ topic = 'OBSEXC'
+ ui.progress(topic, *args, **kwargs)
+
+@eh.wrapfunction(exchange, '_pushdiscoveryobsmarkers')
+def _pushdiscoveryobsmarkers(orig, pushop):
+ if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
+ and pushop.repo.obsstore
+ and 'obsolete' in pushop.remote.listkeys('namespaces')):
+ repo = pushop.repo
+ obsexcmsg(repo.ui, "computing relevant nodes\n")
+ revs = list(repo.revs('::%ln', pushop.futureheads))
+ unfi = repo.unfiltered()
+ cl = unfi.changelog
+ if not pushop.remote.capable('_evoext_obshash_0'):
+ # do not trust core yet
+ # return orig(pushop)
+ nodes = [cl.node(r) for r in revs]
+ if nodes:
+ obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n"
+ % len(nodes))
+ pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
+ else:
+ obsexcmsg(repo.ui, "markers already in sync\n")
+ pushop.outobsmarkers = []
+ pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
+ return
+
+ common = []
+ obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
+ % len(revs))
+ commonrevs = list(unfi.revs('::%ln', pushop.outgoing.commonheads))
+ common = findcommonobsmarkers(pushop.ui, unfi, pushop.remote,
+ commonrevs)
+
+ revs = list(unfi.revs('%ld - (::%ln)', revs, common))
+ nodes = [cl.node(r) for r in revs]
+ if nodes:
+ obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n"
+ % len(nodes))
+ pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
+ else:
+ obsexcmsg(repo.ui, "markers already in sync\n")
+ pushop.outobsmarkers = []
+
+@eh.extsetup
+def _installobsmarkersdiscovery(ui):
+ olddisco = exchange.pushdiscoverymapping['obsmarker']
+
+ def newdisco(pushop):
+ _pushdiscoveryobsmarkers(olddisco, pushop)
+ exchange.pushdiscoverymapping['obsmarker'] = newdisco
+
+### Set discovery START
+
+from mercurial import dagutil
+from mercurial import setdiscovery
+
+@eh.addattr(localrepo.localpeer, 'evoext_obshash')
+def local_obshash(peer, nodes):
+ return serveronly._obshash(peer._repo, nodes)
+
+@eh.addattr(localrepo.localpeer, 'evoext_obshash1')
+def local_obshash1(peer, nodes):
+ return serveronly._obshash(peer._repo, nodes, version=1)
+
+@eh.addattr(wireproto.wirepeer, 'evoext_obshash')
+def peer_obshash(self, nodes):
+ d = self._call("evoext_obshash", nodes=wireproto.encodelist(nodes))
+ try:
+ return wireproto.decodelist(d)
+ except ValueError:
+ self._abort(error.ResponseError(_("unexpected response:"), d))
+
+@eh.addattr(wireproto.wirepeer, 'evoext_obshash1')
+def peer_obshash1(self, nodes):
+ d = self._call("evoext_obshash1", nodes=wireproto.encodelist(nodes))
+ try:
+ return wireproto.decodelist(d)
+ except ValueError:
+ self._abort(error.ResponseError(_("unexpected response:"), d))
+
+def findcommonobsmarkers(ui, local, remote, probeset,
+ initialsamplesize=100,
+ fullsamplesize=200):
+ # from discovery
+ roundtrips = 0
+ cl = local.changelog
+ dag = dagutil.revlogdag(cl)
+ missing = set()
+ common = set()
+ undecided = set(probeset)
+ totalnb = len(undecided)
+ ui.progress(_("comparing with other"), 0, total=totalnb)
+ _takefullsample = setdiscovery._takefullsample
+ if remote.capable('_evoext_obshash_1'):
+ getremotehash = remote.evoext_obshash1
+ localhash = serveronly._obsrelsethashtreefm1(local)
+ else:
+ getremotehash = remote.evoext_obshash
+ localhash = serveronly._obsrelsethashtreefm0(local)
+
+ while undecided:
+
+ ui.note(_("sampling from both directions\n"))
+ if len(undecided) < fullsamplesize:
+ sample = set(undecided)
+ else:
+ sample = _takefullsample(dag, undecided, size=fullsamplesize)
+
+ roundtrips += 1
+ ui.progress(_("comparing with other"), totalnb - len(undecided),
+ total=totalnb)
+ ui.debug("query %i; still undecided: %i, sample size is: %i\n"
+ % (roundtrips, len(undecided), len(sample)))
+ # indices between sample and externalized version must match
+ sample = list(sample)
+ remotehash = getremotehash(dag.externalizeall(sample))
+
+ yesno = [localhash[ix][1] == remotehash[si]
+ for si, ix in enumerate(sample)]
+
+ commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
+ common.update(dag.ancestorset(commoninsample, common))
+
+ missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
+ missing.update(dag.descendantset(missinginsample, missing))
+
+ undecided.difference_update(missing)
+ undecided.difference_update(common)
+
+ ui.progress(_("comparing with other"), None)
+ result = dag.headsetofconnecteds(common)
+ ui.debug("%d total queries\n" % roundtrips)
+
+ if not result:
+ return set([node.nullid])
+ return dag.externalizeall(result)
+
+
+_pushkeyescape = getattr(obsolete, '_pushkeyescape', None)
+
+class pushobsmarkerStringIO(StringIO):
+ """hacky string io for progress"""
+
+ @util.propertycache
+ def length(self):
+ return len(self.getvalue())
+
+ def read(self, size=None):
+ obsexcprg(self.ui, self.tell(), unit=_("bytes"), total=self.length)
+ return StringIO.read(self, size)
+
+ def __iter__(self):
+ d = self.read(4096)
+ while d:
+ yield d
+ d = self.read(4096)
+
+@eh.wrapfunction(exchange, '_pushobsolete')
+def _pushobsolete(orig, pushop):
+ """utility function to push obsolete markers to a remote"""
+ stepsdone = getattr(pushop, 'stepsdone', None)
+ if stepsdone is not None:
+ if 'obsmarkers' in stepsdone:
+ return
+ stepsdone.add('obsmarkers')
+ if pushop.cgresult == 0:
+ return
+ pushop.ui.debug('try to push obsolete markers to remote\n')
+ repo = pushop.repo
+ remote = pushop.remote
+ if (obsolete.isenabled(repo, obsolete.exchangeopt) and repo.obsstore and
+ 'obsolete' in remote.listkeys('namespaces')):
+ markers = pushop.outobsmarkers
+ if not markers:
+ obsexcmsg(repo.ui, "no marker to push\n")
+ elif remote.capable('_evoext_pushobsmarkers_0'):
+ obsdata = pushobsmarkerStringIO()
+ for chunk in obsolete.encodemarkers(markers, True):
+ obsdata.write(chunk)
+ obsdata.seek(0)
+ obsdata.ui = repo.ui
+ obsexcmsg(repo.ui, "pushing %i obsolescence markers (%i bytes)\n"
+ % (len(markers), len(obsdata.getvalue())),
+ True)
+ remote.evoext_pushobsmarkers_0(obsdata)
+ obsexcprg(repo.ui, None)
+ else:
+ rslts = []
+ remotedata = _pushkeyescape(markers).items()
+ totalbytes = sum(len(d) for k, d in remotedata)
+ sentbytes = 0
+ obsexcmsg(repo.ui, "pushing %i obsolescence markers in %i "
+ "pushkey payload (%i bytes)\n"
+ % (len(markers), len(remotedata), totalbytes),
+ True)
+ for key, data in remotedata:
+ obsexcprg(repo.ui, sentbytes, item=key, unit=_("bytes"),
+ total=totalbytes)
+ rslts.append(remote.pushkey('obsolete', key, '', data))
+ sentbytes += len(data)
+ obsexcprg(repo.ui, sentbytes, item=key, unit=_("bytes"),
+ total=totalbytes)
+ obsexcprg(repo.ui, None)
+ if [r for r in rslts if not r]:
+ msg = _('failed to push some obsolete markers!\n')
+ repo.ui.warn(msg)
+ obsexcmsg(repo.ui, "DONE\n")
+
+
+@eh.addattr(wireproto.wirepeer, 'evoext_pushobsmarkers_0')
+def client_pushobsmarkers(self, obsfile):
+ """wireprotocol peer method"""
+ self.requirecap('_evoext_pushobsmarkers_0',
+ _('push obsolete markers faster'))
+ ret, output = self._callpush('evoext_pushobsmarkers_0', obsfile)
+ for l in output.splitlines(True):
+ self.ui.status(_('remote: '), l)
+ return ret
+
+@eh.addattr(httppeer.httppeer, 'evoext_pushobsmarkers_0')
+def httpclient_pushobsmarkers(self, obsfile):
+ """httpprotocol peer method
+ (Cannot simply use _callpush as http is doing some special handling)"""
+ self.requirecap('_evoext_pushobsmarkers_0',
+ _('push obsolete markers faster'))
+ try:
+ r = self._call('evoext_pushobsmarkers_0', data=obsfile)
+ vals = r.split('\n', 1)
+ if len(vals) < 2:
+ raise error.ResponseError(_("unexpected response:"), r)
+
+ for l in vals[1].splitlines(True):
+ if l.strip():
+ self.ui.status(_('remote: '), l)
+ return vals[0]
+ except socket.error as err:
+ if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
+ raise error.Abort(_('push failed: %s') % err.args[1])
+ raise error.Abort(err.args[1])
+
+@eh.wrapfunction(localrepo.localrepository, '_restrictcapabilities')
+def local_pushobsmarker_capabilities(orig, repo, caps):
+ caps = orig(repo, caps)
+ caps.add('_evoext_pushobsmarkers_0')
+ return caps
+
+@eh.addattr(localrepo.localpeer, 'evoext_pushobsmarkers_0')
+def local_pushobsmarkers(peer, obsfile):
+ data = obsfile.read()
+ serveronly._pushobsmarkers(peer._repo, data)
+
+def _buildpullobsmarkersboundaries(pullop):
+ """small funtion returning the argument for pull markers call
+ may to contains 'heads' and 'common'. skip the key for None.
+
+ Its a separed functio to play around with strategy for that."""
+ repo = pullop.repo
+ remote = pullop.remote
+ unfi = repo.unfiltered()
+ revs = unfi.revs('::(%ln - null)', pullop.common)
+ common = [node.nullid]
+ if remote.capable('_evoext_obshash_0'):
+ obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
+ % len(revs))
+ common = findcommonobsmarkers(repo.ui, repo, remote, revs)
+ return {'heads': pullop.pulledsubset, 'common': common}
+
+@eh.uisetup
+def addgetbundleargs(self):
+ wireproto.gboptsmap['evo_obscommon'] = 'nodes'
+
+@eh.wrapfunction(exchange, '_pullbundle2extraprepare')
+def _addobscommontob2pull(orig, pullop, kwargs):
+ ret = orig(pullop, kwargs)
+ if ('obsmarkers' in kwargs and
+ pullop.remote.capable('_evoext_getbundle_obscommon')):
+ boundaries = _buildpullobsmarkersboundaries(pullop)
+ common = boundaries['common']
+ if common != [node.nullid]:
+ kwargs['evo_obscommon'] = common
+ return ret
+
+@eh.wrapfunction(exchange, '_pullobsolete')
+def _pullobsolete(orig, pullop):
+ if not obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
+ return None
+ if 'obsmarkers' not in getattr(pullop, 'todosteps', ['obsmarkers']):
+ return None
+ if 'obsmarkers' in getattr(pullop, 'stepsdone', []):
+ return None
+ wirepull = pullop.remote.capable('_evoext_pullobsmarkers_0')
+ if not wirepull:
+ return orig(pullop)
+ if 'obsolete' not in pullop.remote.listkeys('namespaces'):
+ return None # remote opted out of obsolescence marker exchange
+ tr = None
+ ui = pullop.repo.ui
+ boundaries = _buildpullobsmarkersboundaries(pullop)
+ if not set(boundaries['heads']) - set(boundaries['common']):
+ obsexcmsg(ui, "nothing to pull\n")
+ return None
+
+ obsexcmsg(ui, "pull obsolescence markers\n", True)
+ new = 0
+
+ if wirepull:
+ obsdata = pullop.remote.evoext_pullobsmarkers_0(**boundaries)
+ obsdata = obsdata.read()
+ if len(obsdata) > 5:
+ msg = "merging obsolescence markers (%i bytes)\n" % len(obsdata)
+ obsexcmsg(ui, msg)
+ tr = pullop.gettransaction()
+ old = len(pullop.repo.obsstore._all)
+ pullop.repo.obsstore.mergemarkers(tr, obsdata)
+ new = len(pullop.repo.obsstore._all) - old
+ obsexcmsg(ui, "%i obsolescence markers added\n" % new, True)
+ else:
+ obsexcmsg(ui, "no unknown remote markers\n")
+ obsexcmsg(ui, "DONE\n")
+ if new:
+ pullop.repo.invalidatevolatilesets()
+ return tr
+
+@eh.addattr(wireproto.wirepeer, 'evoext_pullobsmarkers_0')
+def client_pullobsmarkers(self, heads=None, common=None):
+ self.requirecap('_evoext_pullobsmarkers_0', _('look up remote obsmarkers'))
+ opts = {}
+ if heads is not None:
+ opts['heads'] = wireproto.encodelist(heads)
+ if common is not None:
+ opts['common'] = wireproto.encodelist(common)
+ f = self._callcompressable("evoext_pullobsmarkers_0", **opts)
+ length = int(f.read(20))
+ chunk = 4096
+ current = 0
+ data = StringIO()
+ ui = self.ui
+ obsexcprg(ui, current, unit=_("bytes"), total=length)
+ while current < length:
+ readsize = min(length - current, chunk)
+ data.write(f.read(readsize))
+ current += readsize
+ obsexcprg(ui, current, unit=_("bytes"), total=length)
+ obsexcprg(ui, None)
+ data.seek(0)
+ return data
+
+@eh.addattr(localrepo.localpeer, 'evoext_pullobsmarkers_0')
+def local_pullobsmarkers(self, heads=None, common=None):
+ return serveronly._getobsmarkersstream(self._repo, heads=heads,
+ common=common)
+
+@eh.command(
+ 'debugobsrelsethashtree',
+ [('', 'v0', None, 'hash on marker format "0"'),
+ ('', 'v1', None, 'hash on marker format "1" (default)')], _(''))
+def debugobsrelsethashtree(ui, repo, v0=False, v1=False):
+ """display Obsolete markers, Relevant Set, Hash Tree
+ changeset-node obsrelsethashtree-node
+
+ It computed form the "orsht" of its parent and markers
+ relevant to the changeset itself."""
+ if v0 and v1:
+ raise error.Abort('cannot only specify one format')
+ elif v0:
+ treefunc = serveronly._obsrelsethashtreefm0
+ else:
+ treefunc = serveronly._obsrelsethashtreefm1
+
+ for chg, obs in treefunc(repo):
+ ui.status('%s %s\n' % (node.hex(chg), node.hex(obs)))
+
+_bestformat = max(obsolete.formats.keys())
--- a/hgext3rd/evolve/exthelper.py Sat Mar 04 03:37:32 2017 +0100
+++ b/hgext3rd/evolve/exthelper.py Sat Mar 04 02:56:50 2017 +0100
@@ -33,7 +33,6 @@
self.command = cmdutil.command(self.cmdtable)
def merge(self, other):
- """merge the data collected by another exthelper into this one"""
self._uicallables.extend(other._uicallables)
self._extcallables.extend(other._extcallables)
self._repocallables.extend(other._repocallables)