# Code dedicated to the discovery of obsolescence marker "over the wire"
#
# Copyright 2017 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
# Status: Experiment in progress // open question
#
# The final discovery algorithm and protocol will go into core once we are
# happy with it.
#
# Some of the code in this module exists for compatibility with older
# versions of evolve and will eventually be dropped.
from __future__ import absolute_import
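# StringIO compatibility: the StringIO module on Python 2, io on Python 3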
try:
import StringIO as io
StringIO = io.StringIO
except ImportError:
import io
StringIO = io.StringIO
import hashlib
import heapq
import inspect
import sqlite3
import struct
import weakref
from mercurial import (
error,
exchange,
extensions,
localrepo,
node,
obsolete,
scmutil,
setdiscovery,
util,
)
from mercurial.i18n import _
from . import (
compat,
exthelper,
obscache,
utility,
stablerange,
stablerangecache,
)
try:
from mercurial import wireprototypes, wireprotov1server
from mercurial.wireprotov1peer import wirepeer
from mercurial.wireprototypes import encodelist, decodelist
except (ImportError, AttributeError): # <= hg-4.5
from mercurial import wireproto as wireprototypes
wireprotov1server = wireprototypes
from mercurial.wireproto import wirepeer, encodelist, decodelist
try:
from mercurial import dagutil
dagutil.revlogdag
except (ImportError, AttributeError): # <= hg-4.7
from . import dagutil
_pack = struct.pack
_unpack = struct.unpack
_calcsize = struct.calcsize
eh = exthelper.exthelper()
obsexcmsg = utility.obsexcmsg
# Config
eh.configitem('experimental', 'evolution.obsdiscovery')
eh.configitem('experimental', 'obshashrange')
eh.configitem('experimental', 'obshashrange.warm-cache')
eh.configitem('experimental', 'obshashrange.max-revs')
eh.configitem('experimental', 'obshashrange.lru-size')
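# These options can be set from an hgrc file; the values below are purely
# illustrative, not recommended defaults:
#
#   [experimental]
#   evolution.obsdiscovery = yes
#   obshashrange = yes
#   obshashrange.warm-cache = yes
#   obshashrange.max-revs = 100000
#   obshashrange.lru-size = 2000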
##################################
### Code performing discovery ###
##################################
def findcommonobsmarkers(ui, local, remote, probeset,
initialsamplesize=100,
fullsamplesize=200):
# from discovery
roundtrips = 0
cl = local.changelog
dag = dagutil.revlogdag(cl)
missing = set()
common = set()
undecided = set(probeset)
totalnb = len(undecided)
ui.progress(_("comparing with other"), 0, total=totalnb,
unit=_("changesets"))
_takefullsample = setdiscovery._takefullsample
if remote.capable('_evoext_obshash_1'):
getremotehash = remote.evoext_obshash1
localhash = _obsrelsethashtreefm1(local)
else:
getremotehash = remote.evoext_obshash
localhash = _obsrelsethashtreefm0(local)
while undecided:
ui.note(_("sampling from both directions\n"))
if len(undecided) < fullsamplesize:
sample = set(undecided)
else:
# Mercurial 4.8 changed calling convention.
if len(inspect.getargspec(_takefullsample)[0]) == 4:
sample = _takefullsample(local, None, undecided,
size=fullsamplesize)
else:
# hg <= 4.7 version
sample = _takefullsample(dag, undecided, size=fullsamplesize)
roundtrips += 1
ui.progress(_("comparing with other"), totalnb - len(undecided),
total=totalnb, unit=_("changesets"))
ui.debug("query %i; still undecided: %i, sample size is: %i\n"
% (roundtrips, len(undecided), len(sample)))
# indices between sample and externalized version must match
sample = list(sample)
remotehash = getremotehash(dag.externalizeall(sample))
yesno = [localhash[ix][1] == remotehash[si]
for si, ix in enumerate(sample)]
commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
common.update(dag.ancestorset(commoninsample, common))
missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
missing.update(dag.descendantset(missinginsample, missing))
undecided.difference_update(missing)
undecided.difference_update(common)
ui.progress(_("comparing with other"), None)
result = dag.headsetofconnecteds(common)
ui.debug("%d total queries\n" % roundtrips)
if not result:
return set([node.nullid])
return dag.externalizeall(result)
def findmissingrange(ui, local, remote, probeset,
initialsamplesize=100,
fullsamplesize=200):
missing = set()
starttime = util.timer()
heads = local.revs('heads(%ld)', probeset)
local.stablerange.warmup(local)
rangelength = local.stablerange.rangelength
subranges = local.stablerange.subranges
# size of slice ?
heappop = heapq.heappop
heappush = heapq.heappush
heapify = heapq.heapify
tested = set()
sample = []
samplesize = initialsamplesize
def addentry(entry):
if entry in tested:
return False
sample.append(entry)
tested.add(entry)
return True
for h in heads:
entry = (h, 0)
addentry(entry)
local.obsstore.rangeobshashcache.update(local)
querycount = 0
ui.progress(_("comparing obsmarker with other"), querycount,
unit=_("queries"))
overflow = []
while sample or overflow:
if overflow:
sample.extend(overflow)
overflow = []
if samplesize < len(sample):
# too much sample already
overflow = sample[samplesize:]
sample = sample[:samplesize]
elif len(sample) < samplesize:
ui.debug("query %i; add more sample (target %i, current %i)\n"
% (querycount, samplesize, len(sample)))
# we need more sample !
needed = samplesize - len(sample)
sliceme = []
heapify(sliceme)
for entry in sample:
if 1 < rangelength(local, entry):
heappush(sliceme, (-rangelength(local, entry), entry))
while sliceme and 0 < needed:
_key, target = heappop(sliceme)
for new in subranges(local, target):
# XXX we could record hierarchy to optimise drop
if addentry(new):
if 1 < len(new):
heappush(sliceme, (-rangelength(local, new), new))
needed -= 1
if needed <= 0:
break
        # no longer the first iteration
samplesize = fullsamplesize
nbsample = len(sample)
maxsize = max([rangelength(local, r) for r in sample])
ui.debug("query %i; sample size is %i, largest range %i\n"
% (querycount, nbsample, maxsize))
nbreplies = 0
replies = list(_queryrange(ui, local, remote, sample))
sample = []
n = local.changelog.node
for entry, remotehash in replies:
nbreplies += 1
if remotehash == _obshashrange(local, entry):
continue
elif 1 == rangelength(local, entry):
missing.add(n(entry[0]))
else:
for new in subranges(local, entry):
addentry(new)
assert nbsample == nbreplies
querycount += 1
ui.progress(_("comparing obsmarker with other"), querycount,
unit=_("queries"))
ui.progress(_("comparing obsmarker with other"), None)
local.obsstore.rangeobshashcache.save(local)
duration = util.timer() - starttime
logmsg = ('obsdiscovery, %d/%d mismatch'
' - %d obshashrange queries in %.4f seconds\n')
logmsg %= (len(missing), len(probeset), querycount, duration)
ui.log('evoext-obsdiscovery', logmsg)
ui.debug(logmsg)
return sorted(missing)
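# A condensed sketch of the drill-down above (illustrative only, not used
# by the extension): compare the local and remote hash of each range and
# split any mismatch into its subranges until the mismatching
# single-changeset ranges are isolated. The real loop additionally batches
# queries into bounded samples and reports progress.
def _demo_rangedrilldown(roots, localhash, remotehash, subranges, length):
    """return leaf ranges whose obsolescence data differ (sketch)

    All arguments but `roots` are callables standing in for the helpers
    used by findmissingrange above."""
    missing = []
    pending = list(roots)
    while pending:
        rng = pending.pop()
        if localhash(rng) == remotehash(rng):
            continue  # identical hash: all markers below this range match
        if length(rng) == 1:
            missing.append(rng)  # single changeset with differing markers
        else:
            pending.extend(subranges(rng))  # mismatch: inspect subranges
    return missing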
def _queryrange(ui, repo, remote, allentries):
    # questions are asked using nodes, not revs
n = repo.changelog.node
noderanges = [(n(entry[0]), entry[1]) for entry in allentries]
replies = remote.evoext_obshashrange_v1(noderanges)
result = []
for idx, entry in enumerate(allentries):
result.append((entry, replies[idx]))
return result
##############################
### Range Hash computation ###
##############################
@eh.command(
'debugobshashrange',
[
('', 'rev', [], 'display obshash for all (rev, 0) range in REVS'),
('', 'subranges', False, 'display all subranges'),
],
_(''))
def debugobshashrange(ui, repo, **opts):
"""display the ::REVS set topologically sorted in a stable way
"""
s = node.short
revs = scmutil.revrange(repo, opts['rev'])
# prewarm depth cache
if revs:
repo.stablerange.warmup(repo, max(revs))
cl = repo.changelog
rangelength = repo.stablerange.rangelength
depthrev = repo.stablerange.depthrev
if opts['subranges']:
ranges = stablerange.subrangesclosure(repo, repo.stablerange, revs)
else:
ranges = [(r, 0) for r in revs]
headers = ('rev', 'node', 'index', 'size', 'depth', 'obshash')
linetemplate = '%12d %12s %12d %12d %12d %12s\n'
headertemplate = linetemplate.replace('d', 's')
ui.status(headertemplate % headers)
repo.obsstore.rangeobshashcache.update(repo)
for r in ranges:
d = (r[0],
s(cl.node(r[0])),
r[1],
rangelength(repo, r),
depthrev(repo, r[0]),
node.short(_obshashrange(repo, r)))
ui.status(linetemplate % d)
repo.obsstore.rangeobshashcache.save(repo)
def _obshashrange(repo, rangeid):
"""return the obsolete hash associated to a range"""
cache = repo.obsstore.rangeobshashcache
cl = repo.changelog
obshash = cache.get(rangeid)
if obshash is not None:
return obshash
pieces = []
nullid = node.nullid
if repo.stablerange.rangelength(repo, rangeid) == 1:
rangenode = cl.node(rangeid[0])
tmarkers = repo.obsstore.relevantmarkers([rangenode])
pieces = []
for m in tmarkers:
mbin = obsolete._fm1encodeonemarker(m)
pieces.append(mbin)
pieces.sort()
else:
for subrange in repo.stablerange.subranges(repo, rangeid):
obshash = _obshashrange(repo, subrange)
if obshash != nullid:
pieces.append(obshash)
    # note: if there is only one subrange with actual data, we reuse its
    # hash directly rather than hashing again. `obshash` is None here when
    # the range is a single changeset (cache miss), in which case the
    # encoded markers in `pieces` must always be hashed.
    if not pieces:
        obshash = node.nullid
    elif len(pieces) == 1 and obshash is not None:
        obshash = pieces[0]
    else:
        sha = hashlib.sha1()
        for p in pieces:
            sha.update(p)
        obshash = sha.digest()
cache[rangeid] = obshash
return obshash
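# A minimal sketch of the recursion above (illustrative only, not used by
# the extension): a leaf range hashes its sorted encoded markers; a wider
# range combines the non-null hashes of its subranges, reusing a lone
# hash directly.
def _demo_combineobshashes(subhashes):
    """combine subrange obshashes the way _obshashrange does (sketch)"""
    pieces = [h for h in subhashes if h != node.nullid]
    if not pieces:
        return node.nullid
    if len(pieces) == 1:
        return pieces[0]
    sha = hashlib.sha1()
    for p in pieces:
        sha.update(p)
    return sha.digest()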
### sqlite caching
_sqliteschema = [
"""CREATE TABLE obshashrange(rev INTEGER NOT NULL,
idx INTEGER NOT NULL,
obshash BLOB NOT NULL,
PRIMARY KEY(rev, idx));""",
"CREATE INDEX range_index ON obshashrange(rev, idx);",
"""CREATE TABLE meta(schemaversion INTEGER NOT NULL,
tiprev INTEGER NOT NULL,
tipnode BLOB NOT NULL,
nbobsmarker INTEGER NOT NULL,
obssize BLOB NOT NULL,
obskey BLOB NOT NULL
);""",
]
_queryexist = "SELECT name FROM sqlite_master WHERE type='table' AND name='meta';"
_clearmeta = """DELETE FROM meta;"""
_newmeta = """INSERT INTO meta (schemaversion, tiprev, tipnode, nbobsmarker, obssize, obskey)
VALUES (?,?,?,?,?,?);"""
_updateobshash = "INSERT INTO obshashrange(rev, idx, obshash) VALUES (?,?,?);"
_querymeta = "SELECT schemaversion, tiprev, tipnode, nbobsmarker, obssize, obskey FROM meta;"
_queryobshash = "SELECT obshash FROM obshashrange WHERE (rev = ? AND idx = ?);"
_query_max_stored = "SELECT MAX(rev) FROM obshashrange"
_reset = "DELETE FROM obshashrange;"
_delete = "DELETE FROM obshashrange WHERE (rev = ? AND idx = ?);"
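# A minimal sketch of how the statements above fit together (illustrative
# only; the in-memory database and the zero-filled hash are made up):
def _demo_obshashschema():
    con = sqlite3.connect(':memory:')
    con.text_factory = str  # mirror _db() below
    for req in _sqliteschema:
        con.execute(req)
    # store the obshash of the hypothetical (rev=0, idx=0) range
    con.execute(_updateobshash, (0, 0, b'\x00' * 20))
    row = con.execute(_queryobshash, (0, 0)).fetchone()
    assert row is not None and row[0] == b'\x00' * 20
    con.close()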
def _affectedby(repo, markers):
    """return all nodes whose relevant marker set is affected by these markers

    This is a reversed version of obsstore.relevantmarkers
    """
affected_nodes = set()
known_markers = set(markers)
node_to_proceed = set()
marker_to_proceed = set(known_markers)
obsstore = repo.obsstore
while node_to_proceed or marker_to_proceed:
while marker_to_proceed:
m = marker_to_proceed.pop()
# check successors and parent
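            # marker tuple layout: m[0] is the precursor node, m[1] the
            # tuple of successors (empty for a prune) and m[5] the parents
            # recorded for a pruned changeset (or None)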
if m[1]:
relevant = (m[1], )
else: # prune case
relevant = ((m[0], ), m[5])
for l in relevant:
if l is None:
continue
for n in l:
if n not in affected_nodes:
node_to_proceed.add(n)
affected_nodes.add(n)
# marker_to_proceed is now empty:
if node_to_proceed:
n = node_to_proceed.pop()
markers = set()
markers.update(obsstore.successors.get(n, ()))
markers.update(obsstore.predecessors.get(n, ()))
markers -= known_markers
marker_to_proceed.update(markers)
known_markers.update(markers)
return affected_nodes
# if there are that many new obsmarkers, reset the cache without analysing them
RESET_ABOVE = 10000
class _obshashcache(obscache.dualsourcecache):
_schemaversion = 3
_cachename = 'evo-ext-obshashrange' # used for error message
_filename = 'cache/evoext_obshashrange_v2.sqlite'
def __init__(self, repo):
super(_obshashcache, self).__init__()
self._vfs = repo.vfs
self._path = repo.vfs.join(self._filename)
self._new = set()
self._valid = True
self._repo = weakref.ref(repo.unfiltered())
# cache status
self._ondiskcachekey = None
self._data = {}
def clear(self, reset=False):
super(_obshashcache, self).clear(reset=reset)
self._data.clear()
self._new.clear()
if reset:
self._valid = False
if '_con' in vars(self):
del self._con
def get(self, rangeid):
        # the revision should be covered by the cached tiprev
        #
        # XXX there are issues with cache warming; we hack around them for now
if not getattr(self, '_updating', False):
if self._cachekey[0] < rangeid[0]:
msg = ('using unwarmed obshashrangecache (%s %s)'
% (rangeid[0], self._cachekey[0]))
raise error.ProgrammingError(msg)
value = self._data.get(rangeid)
if value is None and self._con is not None:
nrange = (rangeid[0], rangeid[1])
try:
obshash = self._con.execute(_queryobshash, nrange).fetchone()
if obshash is not None:
value = obshash[0]
self._data[rangeid] = value
except (sqlite3.DatabaseError, sqlite3.OperationalError):
# something is wrong with the sqlite db
# Since this is a cache, we ignore it.
if '_con' in vars(self):
del self._con
self._new.clear()
return value
def __setitem__(self, rangeid, obshash):
self._new.add(rangeid)
self._data[rangeid] = obshash
    def _updatefrom(self, repo, revs, obsmarkers):
        """update the cache with new revisions and markers

        revs:       list of new revisions in the changelog
        obsmarkers: list of new obsmarkers in the obsstore
        """
        # XXX for now, we do not actually update the cache; instead we are
        # smarter about invalidating it:
        #
        # 1) new revisions do not get their entries updated (no update)
        # 2) if we detect markers affecting non-new revisions, we reset the
        #    cache
self._updating = True
con = self._con
if con is not None:
reset = False
affected = []
if RESET_ABOVE < len(obsmarkers):
# lots of new obsmarkers, probably smarter to reset the cache
repo.ui.log('evoext-cache', 'obshashcache reset - '
'many new markers (%d)\n'
% len(obsmarkers))
reset = True
elif obsmarkers:
max_stored = con.execute(_query_max_stored).fetchall()[0][0]
affected_nodes = _affectedby(repo, obsmarkers)
rev = repo.changelog.nodemap.get
affected = [rev(n) for n in affected_nodes]
affected = [r for r in affected
if r is not None and r <= max_stored]
                if RESET_ABOVE < len(affected):
                    repo.ui.log('evoext-cache', 'obshashcache reset - '
                                'new markers affect many changesets (%d)\n'
                                % len(affected))
                    reset = True
            if affected or reset:
                if not reset:
                    repo.ui.log('evoext-cache', 'obshashcache clean - '
                                'new markers affect %d changesets and cached ranges\n'
                                % len(affected))
if con is not None:
                    # always reset for now: the code detecting affected
                    # ranges is buggy, so we need to reset more broadly
                    # than we would like.
try:
if repo.stablerange._con is None:
repo.ui.log('evoext-cache', 'obshashcache reset - '
'underlying stablerange cache unavailable\n')
reset = True
if reset:
con.execute(_reset)
self._data.clear()
else:
ranges = repo.stablerange.contains(repo, affected)
con.executemany(_delete, ranges)
for r in ranges:
self._data.pop(r, None)
except (sqlite3.DatabaseError, sqlite3.OperationalError) as exc:
repo.ui.log('evoext-cache', 'error while updating obshashrange cache: %s' % exc)
del self._updating
return
# rewarm key revisions
#
# (The current invalidation is too wide, but rewarming every
# single revision is quite costly)
newrevs = []
stop = self._cachekey[0] # tiprev
for h in repo.filtered('immutable').changelog.headrevs():
if h <= stop and h in affected:
newrevs.append(h)
newrevs.extend(revs)
revs = newrevs
repo.depthcache.update(repo)
total = len(revs)
def progress(pos, rev):
repo.ui.progress('updating obshashrange cache',
pos, 'rev %s' % rev, unit='revision', total=total)
# warm the cache for the new revs
progress(0, '')
for idx, r in enumerate(revs):
_obshashrange(repo, (r, 0))
progress(idx, r)
progress(None, '')
del self._updating
@property
def _fullcachekey(self):
return (self._schemaversion, ) + self._cachekey
def load(self, repo):
if self._con is None:
self._cachekey = self.emptykey
self._ondiskcachekey = self.emptykey
assert self._cachekey is not None
def _db(self):
try:
util.makedirs(self._vfs.dirname(self._path))
except OSError:
return None
con = sqlite3.connect(self._path, timeout=30, isolation_level="IMMEDIATE")
con.text_factory = str
return con
@util.propertycache
def _con(self):
if not self._valid:
return None
repo = self._repo()
if repo is None:
return None
con = self._db()
if con is None:
return None
cur = con.execute(_queryexist)
if cur.fetchone() is None:
self._valid = False
return None
meta = con.execute(_querymeta).fetchone()
if meta is None or meta[0] != self._schemaversion:
self._valid = False
return None
self._cachekey = self._ondiskcachekey = meta[1:]
return con
def save(self, repo):
if self._cachekey is None:
return
if self._cachekey == self._ondiskcachekey and not self._new:
return
repo = repo.unfiltered()
try:
with repo.lock():
if 'stablerange' in vars(repo):
repo.stablerange.save(repo)
self._save(repo)
except error.LockError:
            # Exceptionally, we are noisy about it since the performance
            # impact is large. We should address that before using this
            # more widely.
            msg = _('obshashrange cache: skipping save, unable to lock repo\n')
repo.ui.warn(msg)
def _save(self, repo):
if not self._new:
return
try:
return self._trysave(repo)
except (IOError, OSError, sqlite3.DatabaseError, sqlite3.OperationalError, sqlite3.IntegrityError) as exc:
            # Catch errors that may arise under stress
            #
            # OperationalError catches read-only and locked database,
            # IntegrityError catches the unique-constraint errors that may
            # arise from concurrent writes
if '_con' in vars(self):
del self._con
self._new.clear()
repo.ui.log('evoext-cache', 'error while saving new data: %s' % exc)
repo.ui.debug('evoext-cache: error while saving new data: %s' % exc)
def _trysave(self, repo):
if self._con is None:
util.unlinkpath(self._path, ignoremissing=True)
if '_con' in vars(self):
del self._con
con = self._db()
if con is None:
repo.ui.log('evoext-cache', 'unable to write obshashrange cache'
' - cannot create database')
return
with con:
for req in _sqliteschema:
con.execute(req)
meta = [self._schemaversion] + list(self.emptykey)
con.execute(_newmeta, meta)
self._ondiskcachekey = self.emptykey
else:
con = self._con
with con:
meta = con.execute(_querymeta).fetchone()
if meta[1:] != self._ondiskcachekey:
                    # drifting is currently an issue because another
                    # process might have already inserted the cache rows we
                    # are about to add, which would confuse sqlite
msg = _('obshashrange cache: skipping write, '
'database drifted under my feet\n')
repo.ui.warn(msg)
self._new.clear()
self._valid = False
                    if '_con' in vars(self):
                        del self._con
return
data = ((rangeid[0], rangeid[1], self.get(rangeid)) for rangeid in self._new)
con.executemany(_updateobshash, data)
cachekey = self._fullcachekey
con.execute(_clearmeta) # remove the older entry
con.execute(_newmeta, cachekey)
self._new.clear()
self._valid = True
self._ondiskcachekey = self._cachekey
@eh.wrapfunction(obsolete.obsstore, '_addmarkers')
def _addmarkers(orig, obsstore, *args, **kwargs):
obsstore.rangeobshashcache.clear()
return orig(obsstore, *args, **kwargs)
obsstorefilecache = localrepo.localrepository.obsstore
# obsstore is a filecache, so we have to do some special dancing
@eh.wrapfunction(obsstorefilecache, 'func')
def obsstorewithcache(orig, repo):
obsstore = orig(repo)
obsstore.rangeobshashcache = _obshashcache(repo.unfiltered())
return obsstore
@eh.reposetup
def setupcache(ui, repo):
class obshashrepo(repo.__class__):
@localrepo.unfilteredmethod
def destroyed(self):
if 'obsstore' in vars(self):
self.obsstore.rangeobshashcache.clear()
toplevel = not util.safehasattr(self, '_destroying')
if toplevel:
self._destroying = True
try:
super(obshashrepo, self).destroyed()
finally:
if toplevel:
del self._destroying
@localrepo.unfilteredmethod
def updatecaches(self, tr=None, **kwargs):
if utility.shouldwarmcache(self, tr):
self.obsstore.rangeobshashcache.update(self)
self.obsstore.rangeobshashcache.save(self)
super(obshashrepo, self).updatecaches(tr, **kwargs)
repo.__class__ = obshashrepo
### wire protocol commands
def _obshashrange_v0(repo, ranges):
    """return a list of hashes from a list of ranges
    The ranges have their head encoded as a node;
    'wdirid' is returned for unknown ranges."""
nm = repo.changelog.nodemap
ranges = [(nm.get(n), idx) for n, idx in ranges]
if ranges:
maxrev = max(r for r, i in ranges)
if maxrev is not None:
repo.stablerange.warmup(repo, upto=maxrev)
result = []
repo.obsstore.rangeobshashcache.update(repo)
for r in ranges:
if r[0] is None:
result.append(node.wdirid)
else:
result.append(_obshashrange(repo, r))
repo.obsstore.rangeobshashcache.save(repo)
return result
@eh.addattr(localrepo.localpeer, 'evoext_obshashrange_v1')
def local_obshashrange_v0(peer, ranges):
return _obshashrange_v0(peer._repo, ranges)
_indexformat = '>I'
_indexsize = _calcsize(_indexformat)
def _encrange(node_rangeid):
"""encode a (node) range"""
headnode, index = node_rangeid
return headnode + _pack(_indexformat, index)
def _decrange(data):
    """decode a (node) range"""
assert _indexsize < len(data), len(data)
headnode = data[:-_indexsize]
index = _unpack(_indexformat, data[-_indexsize:])[0]
return (headnode, index)
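# On the wire a range is its 20-byte head node followed by a 4-byte
# big-endian slice index; _encrange and _decrange are exact inverses. A
# small sketch (the node value is made up, for illustration only):
def _demo_rangecodec():
    headnode = b'\x11' * 20  # hypothetical 20-byte node
    data = _encrange((headnode, 3))
    assert len(data) == 20 + _indexsize
    assert _decrange(data) == (headnode, 3)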
@eh.addattr(wirepeer, 'evoext_obshashrange_v1')
def peer_obshashrange_v0(self, ranges):
binranges = [_encrange(r) for r in ranges]
encranges = encodelist(binranges)
d = self._call("evoext_obshashrange_v1", ranges=encranges)
try:
return decodelist(d)
except ValueError:
self._abort(error.ResponseError(_("unexpected response:"), d))
@compat.wireprotocommand(eh, 'evoext_obshashrange_v1', 'ranges')
def srv_obshashrange_v1(repo, proto, ranges):
ranges = decodelist(ranges)
ranges = [_decrange(r) for r in ranges]
hashes = _obshashrange_v0(repo, ranges)
return encodelist(hashes)
def _useobshashrange(repo):
base = repo.ui.configbool('experimental', 'obshashrange', True)
if base:
maxrevs = repo.ui.configint('experimental', 'obshashrange.max-revs', None)
if maxrevs is not None and maxrevs < len(repo.unfiltered()):
base = False
return base
def _canobshashrange(local, remote):
return (_useobshashrange(local)
and remote.capable('_evoext_obshashrange_v1'))
def _obshashrange_capabilities(orig, repo, proto):
"""wrapper to advertise new capability"""
caps = orig(repo, proto)
enabled = _useobshashrange(repo)
if obsolete.isenabled(repo, obsolete.exchangeopt) and enabled:
# Compat hg 4.6+ (2f7290555c96)
bytesresponse = False
if util.safehasattr(caps, 'data'):
bytesresponse = True
caps = caps.data
caps = caps.split()
caps.append(b'_evoext_obshashrange_v1')
caps.sort()
caps = b' '.join(caps)
# Compat hg 4.6+ (2f7290555c96)
if bytesresponse:
caps = wireprototypes.bytesresponse(caps)
return caps
@eh.extsetup
def obshashrange_extsetup(ui):
###
extensions.wrapfunction(wireprotov1server, 'capabilities',
_obshashrange_capabilities)
# wrap command content
oldcap, args = wireprotov1server.commands['capabilities']
def newcap(repo, proto):
return _obshashrange_capabilities(oldcap, repo, proto)
wireprotov1server.commands['capabilities'] = (newcap, args)
#############################
### Tree Hash computation ###
#############################
# Hash computed for a given changeset using all markers relevant to it and
# the obshashes of its parents. This is similar to what happens for the
# changeset node, where the parents are used in the computation.
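# A minimal sketch of that definition (illustrative only, not used by the
# extension): hash the parents' non-null tree hashes followed by the
# changeset's sorted encoded relevant markers; nullid when there is
# neither.
def _demo_treehash(parenthashes, encodedmarkers):
    entries = [p for p in parenthashes if p != node.nullid]
    entries.extend(sorted(encodedmarkers))
    if not entries:
        return node.nullid
    sha = hashlib.sha1()
    for e in entries:
        sha.update(e)
    return sha.digest()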
def _canobshashtree(repo, remote):
return remote.capable('_evoext_obshash_0')
@eh.command(
'debugobsrelsethashtree',
[('', 'v0', None, 'hash on marker format "0"'),
('', 'v1', None, 'hash on marker format "1" (default)')], _(''))
def debugobsrelsethashtree(ui, repo, v0=False, v1=False):
"""display Obsolete markers, Relevant Set, Hash Tree
changeset-node obsrelsethashtree-node
It computed form the "orsht" of its parent and markers
relevant to the changeset itself."""
    if v0 and v1:
        raise error.Abort('can only specify one format')
elif v0:
treefunc = _obsrelsethashtreefm0
else:
treefunc = _obsrelsethashtreefm1
for chg, obs in treefunc(repo):
ui.status('%s %s\n' % (node.hex(chg), node.hex(obs)))
def _obsrelsethashtreefm0(repo):
return _obsrelsethashtree(repo, obsolete._fm0encodeonemarker)
def _obsrelsethashtreefm1(repo):
return _obsrelsethashtree(repo, obsolete._fm1encodeonemarker)
def _obsrelsethashtree(repo, encodeonemarker):
cache = []
unfi = repo.unfiltered()
markercache = {}
repo.ui.progress(_("preparing locally"), 0, total=len(unfi),
unit=_("changesets"))
for i in unfi:
ctx = unfi[i]
entry = 0
sha = hashlib.sha1()
        # add data from the parents
for p in ctx.parents():
p = p.rev()
if p < 0:
p = node.nullid
else:
p = cache[p][1]
if p != node.nullid:
entry += 1
sha.update(p)
tmarkers = repo.obsstore.relevantmarkers([ctx.node()])
if tmarkers:
bmarkers = []
for m in tmarkers:
if m not in markercache:
markercache[m] = encodeonemarker(m)
bmarkers.append(markercache[m])
bmarkers.sort()
for m in bmarkers:
entry += 1
sha.update(m)
if entry:
cache.append((ctx.node(), sha.digest()))
else:
cache.append((ctx.node(), node.nullid))
repo.ui.progress(_("preparing locally"), i, total=len(unfi),
unit=_("changesets"))
repo.ui.progress(_("preparing locally"), None)
return cache
def _obshash(repo, nodes, version=0):
if version == 0:
hashs = _obsrelsethashtreefm0(repo)
elif version == 1:
hashs = _obsrelsethashtreefm1(repo)
else:
assert False
nm = repo.changelog.nodemap
revs = [nm.get(n) for n in nodes]
    return [node.nullid if r is None else hashs[r][1] for r in revs]
@eh.addattr(localrepo.localpeer, 'evoext_obshash')
def local_obshash(peer, nodes):
return _obshash(peer._repo, nodes)
@eh.addattr(localrepo.localpeer, 'evoext_obshash1')
def local_obshash1(peer, nodes):
return _obshash(peer._repo, nodes, version=1)
@eh.addattr(wirepeer, 'evoext_obshash')
def peer_obshash(self, nodes):
d = self._call("evoext_obshash", nodes=encodelist(nodes))
try:
return decodelist(d)
except ValueError:
self._abort(error.ResponseError(_("unexpected response:"), d))
@eh.addattr(wirepeer, 'evoext_obshash1')
def peer_obshash1(self, nodes):
d = self._call("evoext_obshash1", nodes=encodelist(nodes))
try:
return decodelist(d)
except ValueError:
self._abort(error.ResponseError(_("unexpected response:"), d))
@compat.wireprotocommand(eh, 'evoext_obshash', 'nodes')
def srv_obshash(repo, proto, nodes):
return encodelist(_obshash(repo, decodelist(nodes)))
@compat.wireprotocommand(eh, 'evoext_obshash1', 'nodes')
def srv_obshash1(repo, proto, nodes):
return encodelist(_obshash(repo, decodelist(nodes),
version=1))
def _obshash_capabilities(orig, repo, proto):
"""wrapper to advertise new capability"""
caps = orig(repo, proto)
if (obsolete.isenabled(repo, obsolete.exchangeopt)
and repo.ui.configbool('experimental', 'evolution.obsdiscovery', True)):
# Compat hg 4.6+ (2f7290555c96)
bytesresponse = False
if util.safehasattr(caps, 'data'):
bytesresponse = True
caps = caps.data
caps = caps.split()
caps.append(b'_evoext_obshash_0')
caps.append(b'_evoext_obshash_1')
caps.sort()
caps = b' '.join(caps)
# Compat hg 4.6+ (2f7290555c96)
if bytesresponse:
caps = wireprototypes.bytesresponse(caps)
return caps
@eh.extsetup
def obshash_extsetup(ui):
extensions.wrapfunction(wireprotov1server, 'capabilities',
_obshash_capabilities)
# wrap command content
oldcap, args = wireprotov1server.commands['capabilities']
def newcap(repo, proto):
return _obshash_capabilities(oldcap, repo, proto)
wireprotov1server.commands['capabilities'] = (newcap, args)
##########################################
### trigger discovery during exchange ###
##########################################
def _dopushmarkers(pushop):
    return (# we have markers to push
            pushop.repo.obsstore
            # exchange of obsmarkers is enabled locally
            and obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
            # the remote server accepts markers
and 'obsolete' in pushop.remote.listkeys('namespaces'))
def _pushobshashrange(pushop, commonrevs):
repo = pushop.repo.unfiltered()
remote = pushop.remote
missing = findmissingrange(pushop.ui, repo, remote, commonrevs)
missing += pushop.outgoing.missing
return missing
def _pushobshashtree(pushop, commonrevs):
repo = pushop.repo.unfiltered()
remote = pushop.remote
node = repo.changelog.node
common = findcommonobsmarkers(pushop.ui, repo, remote, commonrevs)
revs = list(repo.revs('only(%ln, %ln)', pushop.futureheads, common))
return [node(r) for r in revs]
# available discovery methods, the first valid one is used
# tuple (canuse, performdiscovery)
obsdiscoveries = [
(_canobshashrange, _pushobshashrange),
(_canobshashtree, _pushobshashtree),
]
obsdiscovery_skip_message = """\
(skipping discovery of obsolescence markers, will exchange everything)
(controlled by 'experimental.evolution.obsdiscovery' configuration)
"""
def usediscovery(repo):
return repo.ui.configbool('experimental', 'evolution.obsdiscovery', True)
@eh.wrapfunction(exchange, '_pushdiscoveryobsmarkers')
def _pushdiscoveryobsmarkers(orig, pushop):
if _dopushmarkers(pushop):
repo = pushop.repo
remote = pushop.remote
obsexcmsg(repo.ui, "computing relevant nodes\n")
revs = list(repo.revs('::%ln', pushop.futureheads))
unfi = repo.unfiltered()
if not usediscovery(repo):
            # discovery disabled by the user
repo.ui.status(obsdiscovery_skip_message)
return orig(pushop)
# look for an obs-discovery protocol we can use
discovery = None
for candidate in obsdiscoveries:
if candidate[0](repo, remote):
discovery = candidate[1]
break
if discovery is None:
            # no discovery available, rely on core to push all relevant
            # obsmarkers.
return orig(pushop)
obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
% len(revs))
commonrevs = list(unfi.revs('::%ln', pushop.outgoing.commonheads))
# find the nodes where the relevant obsmarkers mismatches
nodes = discovery(pushop, commonrevs)
if nodes:
obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n"
% len(nodes))
pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
else:
obsexcmsg(repo.ui, "markers already in sync\n")
pushop.outobsmarkers = []
@eh.extsetup
def _installobsmarkersdiscovery(ui):
olddisco = exchange.pushdiscoverymapping['obsmarker']
def newdisco(pushop):
_pushdiscoveryobsmarkers(olddisco, pushop)
exchange.pushdiscoverymapping['obsmarker'] = newdisco
def buildpullobsmarkersboundaries(pullop, bundle2=True):
    """small function returning the arguments for the pull markers call
    The result may contain the 'heads', 'common' and 'missing' keys; keys
    with a None value are skipped. It is a separate function so we can
    experiment with the strategy."""
repo = pullop.repo
remote = pullop.remote
unfi = repo.unfiltered()
revs = unfi.revs('::(%ln - null)', pullop.common)
boundaries = {'heads': pullop.pulledsubset}
if not revs: # nothing common
boundaries['common'] = [node.nullid]
return boundaries
if not usediscovery(repo):
        # discovery disabled by the user.
repo.ui.status(obsdiscovery_skip_message)
boundaries['common'] = [node.nullid]
return boundaries
if bundle2 and _canobshashrange(repo, remote):
obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
% len(revs))
boundaries['missing'] = findmissingrange(repo.ui, repo, pullop.remote,
revs)
elif remote.capable('_evoext_obshash_0'):
obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
% len(revs))
boundaries['common'] = findcommonobsmarkers(repo.ui, repo, remote, revs)
else:
boundaries['common'] = [node.nullid]
return boundaries
# merge later for outer layer wrapping
eh.merge(stablerangecache.eh)