discovery: split discovery related code in 'obsdiscovery'
More code splitting for more clarity.
# Code dedicated to the exchange of obsolescence markers
#
# Copyright 2017 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
try:
import StringIO as io
StringIO = io.StringIO
except ImportError:
import io
StringIO = io.StringIO
import errno
import socket
from mercurial import (
error,
exchange,
httppeer,
localrepo,
node,
obsolete,
util,
wireproto,
)
from mercurial.i18n import _
from . import (
exthelper,
serveronly,
utility,
obsdiscovery,
)
eh = exthelper.exthelper()
eh.merge(obsdiscovery.eh)
obsexcmsg = utility.obsexcmsg
obsexcprg = utility.obsexcprg
_pushkeyescape = getattr(obsolete, '_pushkeyescape', None)
class pushobsmarkerStringIO(StringIO):
"""hacky string io for progress"""
@util.propertycache
def length(self):
return len(self.getvalue())
def read(self, size=None):
obsexcprg(self.ui, self.tell(), unit=_("bytes"), total=self.length)
return StringIO.read(self, size)
def __iter__(self):
d = self.read(4096)
while d:
yield d
d = self.read(4096)
@eh.wrapfunction(exchange, '_pushobsolete')
def _pushobsolete(orig, pushop):
"""utility function to push obsolete markers to a remote"""
stepsdone = getattr(pushop, 'stepsdone', None)
if stepsdone is not None:
if 'obsmarkers' in stepsdone:
return
stepsdone.add('obsmarkers')
if pushop.cgresult == 0:
return
pushop.ui.debug('try to push obsolete markers to remote\n')
repo = pushop.repo
remote = pushop.remote
if (obsolete.isenabled(repo, obsolete.exchangeopt) and repo.obsstore and
'obsolete' in remote.listkeys('namespaces')):
markers = pushop.outobsmarkers
if not markers:
obsexcmsg(repo.ui, "no marker to push\n")
elif remote.capable('_evoext_pushobsmarkers_0'):
obsdata = pushobsmarkerStringIO()
for chunk in obsolete.encodemarkers(markers, True):
obsdata.write(chunk)
obsdata.seek(0)
obsdata.ui = repo.ui
obsexcmsg(repo.ui, "pushing %i obsolescence markers (%i bytes)\n"
% (len(markers), len(obsdata.getvalue())),
True)
remote.evoext_pushobsmarkers_0(obsdata)
obsexcprg(repo.ui, None)
else:
rslts = []
remotedata = _pushkeyescape(markers).items()
totalbytes = sum(len(d) for k, d in remotedata)
sentbytes = 0
obsexcmsg(repo.ui, "pushing %i obsolescence markers in %i "
"pushkey payload (%i bytes)\n"
% (len(markers), len(remotedata), totalbytes),
True)
for key, data in remotedata:
obsexcprg(repo.ui, sentbytes, item=key, unit=_("bytes"),
total=totalbytes)
rslts.append(remote.pushkey('obsolete', key, '', data))
sentbytes += len(data)
obsexcprg(repo.ui, sentbytes, item=key, unit=_("bytes"),
total=totalbytes)
obsexcprg(repo.ui, None)
if [r for r in rslts if not r]:
msg = _('failed to push some obsolete markers!\n')
repo.ui.warn(msg)
obsexcmsg(repo.ui, "DONE\n")
@eh.addattr(wireproto.wirepeer, 'evoext_pushobsmarkers_0')
def client_pushobsmarkers(self, obsfile):
"""wireprotocol peer method"""
self.requirecap('_evoext_pushobsmarkers_0',
_('push obsolete markers faster'))
ret, output = self._callpush('evoext_pushobsmarkers_0', obsfile)
for l in output.splitlines(True):
self.ui.status(_('remote: '), l)
return ret
@eh.addattr(httppeer.httppeer, 'evoext_pushobsmarkers_0')
def httpclient_pushobsmarkers(self, obsfile):
"""httpprotocol peer method
(Cannot simply use _callpush as http is doing some special handling)"""
self.requirecap('_evoext_pushobsmarkers_0',
_('push obsolete markers faster'))
try:
r = self._call('evoext_pushobsmarkers_0', data=obsfile)
vals = r.split('\n', 1)
if len(vals) < 2:
raise error.ResponseError(_("unexpected response:"), r)
for l in vals[1].splitlines(True):
if l.strip():
self.ui.status(_('remote: '), l)
return vals[0]
except socket.error as err:
if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
raise error.Abort(_('push failed: %s') % err.args[1])
raise error.Abort(err.args[1])
@eh.wrapfunction(localrepo.localrepository, '_restrictcapabilities')
def local_pushobsmarker_capabilities(orig, repo, caps):
caps = orig(repo, caps)
caps.add('_evoext_pushobsmarkers_0')
return caps
@eh.addattr(localrepo.localpeer, 'evoext_pushobsmarkers_0')
def local_pushobsmarkers(peer, obsfile):
data = obsfile.read()
serveronly._pushobsmarkers(peer._repo, data)
@eh.uisetup
def addgetbundleargs(self):
wireproto.gboptsmap['evo_obscommon'] = 'nodes'
@eh.wrapfunction(exchange, '_pullbundle2extraprepare')
def _addobscommontob2pull(orig, pullop, kwargs):
ret = orig(pullop, kwargs)
if ('obsmarkers' in kwargs and
pullop.remote.capable('_evoext_getbundle_obscommon')):
boundaries = obsdiscovery._buildpullobsmarkersboundaries(pullop)
common = boundaries['common']
if common != [node.nullid]:
kwargs['evo_obscommon'] = common
return ret
@eh.wrapfunction(exchange, '_pullobsolete')
def _pullobsolete(orig, pullop):
if not obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
return None
if 'obsmarkers' not in getattr(pullop, 'todosteps', ['obsmarkers']):
return None
if 'obsmarkers' in getattr(pullop, 'stepsdone', []):
return None
wirepull = pullop.remote.capable('_evoext_pullobsmarkers_0')
if not wirepull:
return orig(pullop)
if 'obsolete' not in pullop.remote.listkeys('namespaces'):
return None # remote opted out of obsolescence marker exchange
tr = None
ui = pullop.repo.ui
boundaries = obsdiscovery._buildpullobsmarkersboundaries(pullop)
if not set(boundaries['heads']) - set(boundaries['common']):
obsexcmsg(ui, "nothing to pull\n")
return None
obsexcmsg(ui, "pull obsolescence markers\n", True)
new = 0
if wirepull:
obsdata = pullop.remote.evoext_pullobsmarkers_0(**boundaries)
obsdata = obsdata.read()
if len(obsdata) > 5:
msg = "merging obsolescence markers (%i bytes)\n" % len(obsdata)
obsexcmsg(ui, msg)
tr = pullop.gettransaction()
old = len(pullop.repo.obsstore._all)
pullop.repo.obsstore.mergemarkers(tr, obsdata)
new = len(pullop.repo.obsstore._all) - old
obsexcmsg(ui, "%i obsolescence markers added\n" % new, True)
else:
obsexcmsg(ui, "no unknown remote markers\n")
obsexcmsg(ui, "DONE\n")
if new:
pullop.repo.invalidatevolatilesets()
return tr
@eh.addattr(wireproto.wirepeer, 'evoext_pullobsmarkers_0')
def client_pullobsmarkers(self, heads=None, common=None):
self.requirecap('_evoext_pullobsmarkers_0', _('look up remote obsmarkers'))
opts = {}
if heads is not None:
opts['heads'] = wireproto.encodelist(heads)
if common is not None:
opts['common'] = wireproto.encodelist(common)
f = self._callcompressable("evoext_pullobsmarkers_0", **opts)
length = int(f.read(20))
chunk = 4096
current = 0
data = StringIO()
ui = self.ui
obsexcprg(ui, current, unit=_("bytes"), total=length)
while current < length:
readsize = min(length - current, chunk)
data.write(f.read(readsize))
current += readsize
obsexcprg(ui, current, unit=_("bytes"), total=length)
obsexcprg(ui, None)
data.seek(0)
return data
@eh.addattr(localrepo.localpeer, 'evoext_pullobsmarkers_0')
def local_pullobsmarkers(self, heads=None, common=None):
return serveronly._getobsmarkersstream(self._repo, heads=heads,
common=common)
_bestformat = max(obsolete.formats.keys())