firstmergecache: ignore permission and OS errors when writing
This cache is related to the obshashrange one and we update it lazily by
default.
This can be an issue when pulling locally from a read-only repository that was
not configured for more aggressive cache warming. The resulting permission
error was uncaught and could crash the whole process. Errors during cache
update should not block Mercurial operations.
# Code dedicated to the discovery of obsolescence marker "over the wire"
#
# Copyright 2017 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
# Status: Experiment in progress // open question
#
# The final discovery algorithm and protocol will go into core when we'll be
# happy with it.
#
# Some of the code in this module is for compatibility with older versions
# of evolve and will eventually be dropped.
from __future__ import absolute_import
try:
import StringIO as io
StringIO = io.StringIO
except ImportError:
import io
StringIO = io.StringIO
import hashlib
import heapq
import sqlite3
import struct
import weakref
from mercurial import (
error,
exchange,
extensions,
localrepo,
node,
obsolete,
scmutil,
setdiscovery,
util,
)
from mercurial.i18n import _
from . import (
compat,
exthelper,
obscache,
utility,
stablerange,
stablerangecache,
)
try:
from mercurial import wireprototypes, wireprotov1server
from mercurial.wireprotov1peer import wirepeer
from mercurial.wireprototypes import encodelist, decodelist
except (ImportError, AttributeError): # <= hg-4.5
from mercurial import wireproto as wireprototypes
wireprotov1server = wireprototypes
from mercurial.wireproto import wirepeer, encodelist, decodelist
try:
from mercurial import dagutil
dagutil.revlogdag
except (ImportError, AttributeError): # <= hg-4.7
from . import dagutil
# Short aliases for the struct helpers; this module uses them to encode and
# decode the index part of a range sent over the wire.
_pack = struct.pack
_unpack = struct.unpack
_calcsize = struct.calcsize
# Extension helper collecting the commands, wrappers and config items
# declared in this module.
eh = exthelper.exthelper()
# Helper called below with (ui, message) to report marker exchange steps.
obsexcmsg = utility.obsexcmsg
# Config
# 'evolution.obsdiscovery' is read by `usediscovery`; 'obshashrange' and
# 'obshashrange.max-revs' are read by `_useobshashrange`. The 'warm-cache'
# and 'lru-size' items are only declared here; this module does not read them.
eh.configitem('experimental', 'evolution.obsdiscovery')
eh.configitem('experimental', 'obshashrange')
eh.configitem('experimental', 'obshashrange.warm-cache')
eh.configitem('experimental', 'obshashrange.max-revs')
eh.configitem('experimental', 'obshashrange.lru-size')
##################################
### Code performing discovery ###
##################################
def findcommonobsmarkers(ui, local, remote, probeset,
                         initialsamplesize=100,
                         fullsamplesize=200):
    """Find the heads of the revisions whose obshash agrees with the remote.

    Revisions of ``probeset`` are sampled and their "obsrelsethashtree" hash
    is compared with the one reported by ``remote``. A match marks the
    ancestors of the sampled revision as common, a mismatch marks its
    descendants as missing. Sampling goes on until nothing is undecided.

    The format "1" hash is used when the remote advertises
    ``_evoext_obshash_1``, format "0" otherwise. ``initialsamplesize`` is
    accepted but not used by this implementation.

    Returns the externalized heads of the common set, or a set holding only
    the null id when nothing is common.
    """
    # logic adapted from changeset discovery
    dag = dagutil.revlogdag(local.changelog)
    queries = 0
    missing = set()
    common = set()
    undecided = set(probeset)
    total = len(undecided)
    ui.progress(_("comparing with other"), 0, total=total,
                unit=_("changesets"))
    takefullsample = setdiscovery._takefullsample
    if remote.capable('_evoext_obshash_1'):
        getremotehash = remote.evoext_obshash1
        localhash = _obsrelsethashtreefm1(local)
    else:
        getremotehash = remote.evoext_obshash
        localhash = _obsrelsethashtreefm0(local)

    while undecided:
        ui.note(_("sampling from both directions\n"))
        if fullsamplesize <= len(undecided):
            sample = takefullsample(dag, undecided, size=fullsamplesize)
        else:
            sample = set(undecided)

        queries += 1
        ui.progress(_("comparing with other"), total - len(undecided),
                    total=total, unit=_("changesets"))
        ui.debug("query %i; still undecided: %i, sample size is: %i\n"
                 % (queries, len(undecided), len(sample)))

        # freeze the order: position N of the reply describes position N
        # of the sample
        sample = list(sample)
        remotehash = getremotehash(dag.externalizeall(sample))

        agreeing = set()
        differing = []
        for pos, rev in enumerate(sample):
            if localhash[rev][1] == remotehash[pos]:
                agreeing.add(rev)
            else:
                differing.append(rev)

        common.update(dag.ancestorset(agreeing, common))
        missing.update(dag.descendantset(differing, missing))

        undecided -= missing
        undecided -= common

    ui.progress(_("comparing with other"), None)
    heads = dag.headsetofconnecteds(common)
    ui.debug("%d total queries\n" % queries)

    if heads:
        return dag.externalizeall(heads)
    return set([node.nullid])
def findmissingrange(ui, local, remote, probeset,
                     initialsamplesize=100,
                     fullsamplesize=200):
    """Return the nodes whose relevant obsmarkers differ from the remote.

    The discovery starts from the ``(head, 0)`` ranges of the heads of
    ``probeset`` and asks ``remote`` for the obshash of a sample of ranges.
    Ranges whose hash matches the local one are dropped. Mismatching ranges
    are replaced by their subranges, until mismatching ranges of length one
    are reached; the node of each of those is reported.

    ``initialsamplesize`` bounds the number of ranges in the first query and
    ``fullsamplesize`` the number in every following query. When a sample is
    smaller than the target size, its largest ranges are sliced to fill it.

    Returns a sorted list of nodes.
    """
    missing = set()
    starttime = util.timer()

    heads = local.revs('heads(%ld)', probeset)
    local.stablerange.warmup(local)

    rangelength = local.stablerange.rangelength
    subranges = local.stablerange.subranges
    tonode = local.changelog.node
    # size of slice ?
    heappop = heapq.heappop
    heappush = heapq.heappush

    tested = set()

    sample = []
    samplesize = initialsamplesize

    def addentry(entry):
        """queue `entry` for the next query unless it was already queued"""
        if entry in tested:
            return False
        # `sample` is rebound by the loop below; the closure always sees the
        # current list.
        sample.append(entry)
        tested.add(entry)
        return True

    for h in heads:
        addentry((h, 0))

    local.obsstore.rangeobshashcache.update(local)
    querycount = 0
    ui.progress(_("comparing obsmarker with other"), querycount,
                unit=_("queries"))
    overflow = []
    while sample or overflow:
        if overflow:
            sample.extend(overflow)
            overflow = []

        if samplesize < len(sample):
            # too much sample already
            overflow = sample[samplesize:]
            sample = sample[:samplesize]
        elif len(sample) < samplesize:
            ui.debug("query %i; add more sample (target %i, current %i)\n"
                     % (querycount, samplesize, len(sample)))
            # we need more sample !
            needed = samplesize - len(sample)
            # max-heap on range length (lengths are negated)
            sliceme = []
            for entry in sample:
                length = rangelength(local, entry)
                if 1 < length:
                    heappush(sliceme, (-length, entry))

            while sliceme and 0 < needed:
                _key, target = heappop(sliceme)
                for new in subranges(local, target):
                    # XXX we could record hierarchy to optimise drop
                    if addentry(new):
                        # Only ranges longer than one revision can be sliced
                        # further. (This used to test `len(new)`, the length
                        # of the (rev, index) tuple, which is always 2.)
                        length = rangelength(local, new)
                        if 1 < length:
                            heappush(sliceme, (-length, new))
                        needed -= 1
                        if needed <= 0:
                            break

        # no longer the first iteration
        samplesize = fullsamplesize

        nbsample = len(sample)
        maxsize = max([rangelength(local, r) for r in sample])
        ui.debug("query %i; sample size is %i, largest range %i\n"
                 % (querycount, nbsample, maxsize))
        nbreplies = 0
        replies = list(_queryrange(ui, local, remote, sample))
        sample = []
        for entry, remotehash in replies:
            nbreplies += 1
            if remotehash == _obshashrange(local, entry):
                # same markers on both sides for that range
                continue
            elif 1 == rangelength(local, entry):
                # cannot slice further: this revision is a mismatch
                missing.add(tonode(entry[0]))
            else:
                for new in subranges(local, entry):
                    addentry(new)
        assert nbsample == nbreplies
        querycount += 1
        ui.progress(_("comparing obsmarker with other"), querycount,
                    unit=_("queries"))
    ui.progress(_("comparing obsmarker with other"), None)
    local.obsstore.rangeobshashcache.save(local)
    duration = util.timer() - starttime
    logmsg = ('obsdiscovery, %d/%d mismatch'
              ' - %d obshashrange queries in %.4f seconds\n')
    logmsg %= (len(missing), len(probeset), querycount, duration)
    ui.log('evoext-obsdiscovery', logmsg)
    ui.debug(logmsg)
    return sorted(missing)
def _queryrange(ui, repo, remote, allentries):
    """Ask `remote` for the obshash of every range in `allentries`.

    Local ranges are (rev, index) pairs; the question is sent with the
    revision translated to its node. Returns a list of (entry, remotehash)
    pairs in the order of `allentries`.
    """
    tonode = repo.changelog.node
    # questions are asked with nodes
    question = [(tonode(entry[0]), entry[1]) for entry in allentries]
    answers = remote.evoext_obshashrange_v1(question)
    return [(entry, answers[pos]) for pos, entry in enumerate(allentries)]
##############################
### Range Hash computation ###
##############################
@eh.command(
    'debugobshashrange',
    [
        ('', 'rev', [], 'display obshash for all (rev, 0) range in REVS'),
        ('', 'subranges', False, 'display all subranges'),
    ],
    _(''))
def debugobshashrange(ui, repo, **opts):
    """display the ::REVS set topologically sorted in a stable way
    """
    # One line is printed per range: rev, short node, index, size, depth and
    # short obshash. With --subranges the closure of subranges is listed
    # instead of only the (rev, 0) ranges.
    short = node.short
    revs = scmutil.revrange(repo, opts['rev'])
    if revs:
        # prewarm depth cache
        repo.stablerange.warmup(repo, max(revs))
    cl = repo.changelog
    rangelength = repo.stablerange.rangelength
    depthrev = repo.stablerange.depthrev
    if not opts['subranges']:
        ranges = [(r, 0) for r in revs]
    else:
        ranges = stablerange.subrangesclosure(repo, repo.stablerange, revs)
    linetemplate = '%12d %12s %12d %12d %12d %12s\n'
    # same layout for the header, with every column rendered as a string
    headertemplate = linetemplate.replace('d', 's')
    headers = ('rev', 'node', 'index', 'size', 'depth', 'obshash')
    ui.status(headertemplate % headers)
    repo.obsstore.rangeobshashcache.update(repo)
    for rangeid in ranges:
        head = rangeid[0]
        row = (head,
               short(cl.node(head)),
               rangeid[1],
               rangelength(repo, rangeid),
               depthrev(repo, head),
               short(_obshashrange(repo, rangeid)))
        ui.status(linetemplate % row)
    repo.obsstore.rangeobshashcache.save(repo)
def _obshashrange(repo, rangeid):
    """Return the obsolescence hash of the range `rangeid`.

    The value comes from `repo.obsstore.rangeobshashcache` when available.
    Otherwise it is computed and stored in that cache:

    - for a range of length one, the sha1 of the sorted format-1 encoding of
      the markers relevant to its node,
    - for a longer range, a combination of the hashes of its subranges
      (subranges hashing to the null id are skipped).

    The null id is returned when there is nothing to hash.
    """
    cache = repo.obsstore.rangeobshashcache
    cl = repo.changelog
    obshash = cache.get(rangeid)
    if obshash is not None:
        return obshash

    stable = repo.stablerange
    if stable.rangelength(repo, rangeid) == 1:
        rangenode = cl.node(rangeid[0])
        relevant = repo.obsstore.relevantmarkers([rangenode])
        pieces = sorted(obsolete._fm1encodeonemarker(m) for m in relevant)
    else:
        pieces = []
        for subrange in stable.subranges(repo, rangeid):
            # `obshash` deliberately keeps the value of the last subrange,
            # see the note below
            obshash = _obshashrange(repo, subrange)
            if obshash != node.nullid:
                pieces.append(obshash)

    # note: with exactly one piece coming from subranges, no new hash is
    # computed and the value left in `obshash` by the last subrange is
    # reused. This result is compared with the one computed by the other
    # side of an exchange, so the rule must not be altered.
    if not pieces:
        obshash = node.nullid
    elif obshash is None or len(pieces) != 1:
        hasher = hashlib.sha1()
        for piece in pieces:
            hasher.update(piece)
        obshash = hasher.digest()
    cache[rangeid] = obshash
    return obshash
### sqlite caching
# Statements creating the database: one table mapping a (rev, idx) range to
# its obshash, and a `meta` table holding the schema version followed by the
# cache key fields.
# NOTE(review): `range_index` covers the same columns as the primary key of
# `obshashrange`.
_sqliteschema = [
"""CREATE TABLE obshashrange(rev INTEGER NOT NULL,
idx INTEGER NOT NULL,
obshash BLOB NOT NULL,
PRIMARY KEY(rev, idx));""",
"CREATE INDEX range_index ON obshashrange(rev, idx);",
"""CREATE TABLE meta(schemaversion INTEGER NOT NULL,
tiprev INTEGER NOT NULL,
tipnode BLOB NOT NULL,
nbobsmarker INTEGER NOT NULL,
obssize BLOB NOT NULL,
obskey BLOB NOT NULL
);""",
]
# Detect whether the `meta` table exists, i.e. whether the database was
# initialised.
_queryexist = "SELECT name FROM sqlite_master WHERE type='table' AND name='meta';"
# The `meta` table is rewritten by deleting its content then inserting the
# new values.
_clearmeta = """DELETE FROM meta;"""
_newmeta = """INSERT INTO meta (schemaversion, tiprev, tipnode, nbobsmarker, obssize, obskey)
VALUES (?,?,?,?,?,?);"""
# Store the obshash of one range.
_updateobshash = "INSERT INTO obshashrange(rev, idx, obshash) VALUES (?,?,?);"
_querymeta = "SELECT schemaversion, tiprev, tipnode, nbobsmarker, obssize, obskey FROM meta;"
# Fetch the obshash of one range.
_queryobshash = "SELECT obshash FROM obshashrange WHERE (rev = ? AND idx = ?);"
# Highest revision with a stored range (NULL when the table is empty).
_query_max_stored = "SELECT MAX(rev) FROM obshashrange"
# Drop every stored range / drop one stored range.
_reset = "DELETE FROM obshashrange;"
_delete = "DELETE FROM obshashrange WHERE (rev = ? AND idx = ?);"
def _affectedby(repo, markers):
    """Return the set of nodes whose relevant markers include `markers`.

    This walks the obsolescence graph in the direction opposite to
    obsstore.relevantmarkers: from each marker to its successors (or, for a
    prune marker, to its precursor and recorded parents), then from each
    reached node to every marker that mentions it as precursor or successor.
    """
    obsstore = repo.obsstore
    seen_markers = set(markers)
    pending_markers = set(seen_markers)
    pending_nodes = set()
    affected = set()
    while pending_markers or pending_nodes:
        # first, drain the markers we know about
        while pending_markers:
            marker = pending_markers.pop()
            if marker[1]:
                # regular marker: follow the successors
                groups = (marker[1],)
            else:
                # prune marker: the pruned node itself and its parents
                groups = ((marker[0],), marker[5])
            for group in groups:
                if group is None:
                    continue
                fresh = set(group) - affected
                pending_nodes |= fresh
                affected |= fresh
        # then process a single node, which may bring new markers in
        if pending_nodes:
            current = pending_nodes.pop()
            touching = set(obsstore.successors.get(current, ()))
            touching.update(obsstore.predecessors.get(current, ()))
            touching -= seen_markers
            pending_markers |= touching
            seen_markers |= touching
    return affected
# If there are more new obsmarkers (or more affected revisions) than this,
# the cache is reset without analysing them one by one.
RESET_ABOVE = 10000
class _obshashcache(obscache.dualsourcecache):
    """Cache of range obshash, kept in memory and persisted in sqlite.

    Values are looked up in `_data` first and in the sqlite database second.
    Ranges set through `__setitem__` are remembered in `_new` until `save`
    writes them to disk.

    This is only a cache: sqlite and filesystem errors hit while reading or
    writing it are logged or ignored, they never abort the operation.
    """

    _schemaversion = 3

    _cachename = 'evo-ext-obshashrange' # used for error message
    _filename = 'cache/evoext_obshashrange_v2.sqlite'

    def __init__(self, repo):
        super(_obshashcache, self).__init__()
        self._vfs = repo.vfs
        self._path = repo.vfs.join(self._filename)
        # ranges added in memory and not written to disk yet
        self._new = set()
        # False once the on-disk database is known to be unusable
        self._valid = True
        self._repo = weakref.ref(repo.unfiltered())
        # cache status
        self._ondiskcachekey = None
        self._data = {}

    def clear(self, reset=False):
        """drop in-memory data and the database connection"""
        super(_obshashcache, self).clear(reset=reset)
        self._data.clear()
        self._new.clear()
        if reset:
            self._valid = False
        if '_con' in vars(self):
            del self._con

    def get(self, rangeid):
        """Return the cached obshash of `rangeid`, or None when unknown.

        Raises error.ProgrammingError when, outside of an update, the range
        is about a revision above the tip revision of the cache key.
        """
        # revision should be covered by the tiprev
        #
        # XXX there are issue with cache warming, we hack around it for now
        if not getattr(self, '_updating', False):
            if self._cachekey[0] < rangeid[0]:
                msg = ('using unwarmed obshashrangecache (%s %s)'
                       % (rangeid[0], self._cachekey[0]))
                raise error.ProgrammingError(msg)

        value = self._data.get(rangeid)
        if value is None and self._con is not None:
            nrange = (rangeid[0], rangeid[1])
            try:
                obshash = self._con.execute(_queryobshash, nrange).fetchone()
                if obshash is not None:
                    value = obshash[0]
                self._data[rangeid] = value
            except (sqlite3.DatabaseError, sqlite3.OperationalError):
                # something is wrong with the sqlite db
                # Since this is a cache, we ignore it.
                if '_con' in vars(self):
                    del self._con
                self._new.clear()
        return value

    def __setitem__(self, rangeid, obshash):
        """record `obshash` for `rangeid`, to be written by the next save"""
        self._new.add(rangeid)
        self._data[rangeid] = obshash

    def _updatefrom(self, repo, revs, obsmarkers):
        """Update the cache for new revisions and new obsmarkers.

        revs: list of new revisions in the changelog
        obsmarkers: list of new obsmarkers in the obsstore

        Stored ranges affected by the new markers are invalidated (or the
        whole table is reset), then the ``(rev, 0)`` range of each new
        revision is computed, as well as the one of the affected heads of
        the 'immutable' filtered repository.
        """
        # XXX for now, we'll not actually update the cache, but we'll be
        # smarter at invalidating it.
        #
        # 1) new revisions does not get their entry updated (not update)
        # 2) if we detect markers affecting non-new revision we reset the cache

        # lets `get` skip its "unwarmed cache" check while we are working
        self._updating = True

        con = self._con
        if con is not None:
            reset = False
            affected = []
            if RESET_ABOVE < len(obsmarkers):
                # lots of new obsmarkers, probably smarter to reset the cache
                repo.ui.log('evoext-cache', 'obshashcache reset - '
                            'many new markers (%d)\n'
                            % len(obsmarkers))
                reset = True
            elif obsmarkers:
                max_stored = con.execute(_query_max_stored).fetchall()[0][0]
                affected_nodes = _affectedby(repo, obsmarkers)

                rev = repo.changelog.nodemap.get
                affected = [rev(n) for n in affected_nodes]
                # `max_stored` is None when no range is stored. Nothing can
                # be affected then, and comparing an int with None is an
                # error on Python 3.
                affected = [r for r in affected
                            if r is not None
                            and max_stored is not None
                            and r <= max_stored]

            if RESET_ABOVE < len(affected):
                repo.ui.log('evoext-cache', 'obshashcache reset - '
                            'new markers affect many changeset (%d)\n'
                            % len(affected))
                reset = True

            if affected or reset:
                if not reset:
                    repo.ui.log('evoext-cache', 'obshashcache clean - '
                                'new markers affect %d changeset and cached ranges\n'
                                % len(affected))
                if con is not None:
                    # always reset for now, the code detecting affect is buggy
                    # so we need to reset more broadly than we would like.
                    try:
                        if repo.stablerange._con is None:
                            repo.ui.log('evoext-cache', 'obshashcache reset - '
                                        'underlying stablerange cache unavailable\n')
                            reset = True

                        if reset:
                            con.execute(_reset)
                            self._data.clear()
                        else:
                            ranges = repo.stablerange.contains(repo, affected)
                            con.executemany(_delete, ranges)
                            for r in ranges:
                                self._data.pop(r, None)
                    except (sqlite3.DatabaseError, sqlite3.OperationalError) as exc:
                        repo.ui.log('evoext-cache', 'error while updating obshashrange cache: %s' % exc)
                        del self._updating
                        return

                # rewarm key revisions
                #
                # (The current invalidation is too wide, but rewarming every
                # single revision is quite costly)
                newrevs = []
                stop = self._cachekey[0] # tiprev
                for h in repo.filtered('immutable').changelog.headrevs():
                    if h <= stop and h in affected:
                        newrevs.append(h)
                newrevs.extend(revs)
                revs = newrevs

        repo.depthcache.update(repo)
        total = len(revs)

        def progress(pos, rev):
            repo.ui.progress('updating obshashrange cache',
                             pos, 'rev %s' % rev, unit='revision', total=total)
        # warm the cache for the new revs
        progress(0, '')
        for idx, r in enumerate(revs):
            _obshashrange(repo, (r, 0))
            progress(idx, r)
        progress(None, '')

        del self._updating

    @property
    def _fullcachekey(self):
        """the cache key prefixed with the schema version (a `meta` row)"""
        return (self._schemaversion, ) + self._cachekey

    def load(self, repo):
        """Initialise the cache key.

        Accessing `_con` sets the key from the database when one is usable;
        otherwise the empty key is used.
        """
        if self._con is None:
            self._cachekey = self.emptykey
            self._ondiskcachekey = self.emptykey
        assert self._cachekey is not None

    def _db(self):
        """Open the sqlite database, creating its directory if needed.

        Returns None when the directory cannot be created.
        """
        try:
            util.makedirs(self._vfs.dirname(self._path))
        except OSError:
            return None
        con = sqlite3.connect(self._path, timeout=30, isolation_level="IMMEDIATE")
        con.text_factory = str
        return con

    @util.propertycache
    def _con(self):
        """Connection to a usable on-disk cache, or None.

        The database is usable when it has a `meta` table whose row matches
        the current schema version. In that case the cache key is loaded
        from that row.
        """
        if not self._valid:
            return None
        repo = self._repo()
        if repo is None:
            return None
        con = self._db()
        if con is None:
            return None
        cur = con.execute(_queryexist)
        if cur.fetchone() is None:
            self._valid = False
            return None
        meta = con.execute(_querymeta).fetchone()
        if meta is None or meta[0] != self._schemaversion:
            self._valid = False
            return None
        self._cachekey = self._ondiskcachekey = meta[1:]
        return con

    def save(self, repo):
        """Write pending data to disk, under the repository lock.

        Nothing is done when there is no cache key, or when the key matches
        the one on disk and no new range is pending. A warning is printed
        when the repository cannot be locked.
        """
        if self._cachekey is None:
            return
        if self._cachekey == self._ondiskcachekey and not self._new:
            return
        repo = repo.unfiltered()
        try:
            with repo.lock():
                if 'stablerange' in vars(repo):
                    repo.stablerange.save(repo)
                self._save(repo)
        except error.LockError:
            # Exceptionnally we are noisy about it since performance impact
            # is large We should address that before using this more
            # widely.
            msg = _('obshashrange cache: skipping save unable to lock repo\n')
            repo.ui.warn(msg)

    def _save(self, repo):
        """Write the pending ranges, ignoring errors raised while doing so.

        On error the connection and the pending ranges are dropped and the
        error is logged.
        """
        if not self._new:
            return
        try:
            return self._trysave(repo)
        except (IOError, OSError, sqlite3.DatabaseError,
                sqlite3.OperationalError, sqlite3.IntegrityError) as exc:
            # Catch error that may arise under stress
            #
            # IOError/OSError catch filesystem level failures (e.g. the
            # repository is read-only and the old database cannot be removed)
            # operational error catch read-only and locked database
            # IntegrityError catch Unique constraint error that may arise
            #
            # This is only a cache, failing to write it must not abort the
            # operation in progress.
            if '_con' in vars(self):
                del self._con
            self._new.clear()
            repo.ui.log('evoext-cache', 'error while saving new data: %s' % exc)

    def _trysave(self, repo):
        """Write the pending ranges and the cache key to the database.

        A new database is created when none is usable. The write is skipped
        (and the cache marked invalid) when the key found on disk is not the
        one we loaded. Errors are left to the caller.
        """
        if self._con is None:
            # no usable database: start from scratch
            util.unlinkpath(self._path, ignoremissing=True)
            if '_con' in vars(self):
                del self._con

            con = self._db()
            if con is None:
                repo.ui.log('evoext-cache', 'unable to write obshashrange cache'
                            ' - cannot create database')
                return
            with con:
                for req in _sqliteschema:
                    con.execute(req)

                meta = [self._schemaversion] + list(self.emptykey)
                con.execute(_newmeta, meta)
                self._ondiskcachekey = self.emptykey
        else:
            con = self._con
        with con:
            meta = con.execute(_querymeta).fetchone()
            if meta[1:] != self._ondiskcachekey:
                # drifting is currently an issue because this means another
                # process might have already added the cache line we are about
                # to add. This will confuse sqlite
                msg = _('obshashrange cache: skipping write, '
                        'database drifted under my feet\n')
                repo.ui.warn(msg)
                self._new.clear()
                if '_con' in vars(self):
                    del self._con
                self._valid = False
                return
            data = ((rangeid[0], rangeid[1], self.get(rangeid)) for rangeid in self._new)
            con.executemany(_updateobshash, data)
            cachekey = self._fullcachekey
            con.execute(_clearmeta) # remove the older entry
            con.execute(_newmeta, cachekey)
            self._new.clear()
            self._valid = True
            self._ondiskcachekey = self._cachekey
@eh.wrapfunction(obsolete.obsstore, '_addmarkers')
def _addmarkers(orig, obsstore, *args, **kwargs):
    """clear the range obshash cache before markers are added to the store"""
    cache = obsstore.rangeobshashcache
    cache.clear()
    return orig(obsstore, *args, **kwargs)
# obsstore is a filecache, so we have to do some special dancing: the
# function to wrap is the 'func' attribute of the filecache object
obsstorefilecache = localrepo.localrepository.obsstore


@eh.wrapfunction(obsstorefilecache, 'func')
def obsstorewithcache(orig, repo):
    """build the obsstore and attach a range obshash cache to it"""
    store = orig(repo)
    store.rangeobshashcache = _obshashcache(repo.unfiltered())
    return store
@eh.reposetup
def setupcache(ui, repo):
    """Make the repository class aware of the range obshash cache."""

    baseclass = repo.__class__

    class obshashrepo(baseclass):

        @localrepo.unfilteredmethod
        def destroyed(self):
            # only touch the cache when an obsstore was already loaded
            if 'obsstore' in vars(self):
                self.obsstore.rangeobshashcache.clear()
            super(obshashrepo, self).destroyed()

        @localrepo.unfilteredmethod
        def updatecaches(self, tr=None, **kwargs):
            # warm and persist our cache before the other caches are handled
            if utility.shouldwarmcache(self, tr):
                self.obsstore.rangeobshashcache.update(self)
                self.obsstore.rangeobshashcache.save(self)
            super(obshashrepo, self).updatecaches(tr, **kwargs)

    repo.__class__ = obshashrepo
### wire protocol commands
def _obshashrange_v0(repo, ranges):
    """Return the list of obshash matching a list of ranges.

    Each range is a (node, index) pair. The result has one entry per range,
    in the same order; `node.wdirid` is used for ranges whose node is
    unknown locally.
    """
    nm = repo.changelog.nodemap
    ranges = [(nm.get(n), idx) for n, idx in ranges]
    # Unknown nodes map to None. They are left out before calling max():
    # None cannot be ordered against integers on Python 3.
    knownrevs = [r for r, _idx in ranges if r is not None]
    if knownrevs:
        repo.stablerange.warmup(repo, upto=max(knownrevs))
    result = []
    repo.obsstore.rangeobshashcache.update(repo)
    for r in ranges:
        if r[0] is None:
            result.append(node.wdirid)
        else:
            result.append(_obshashrange(repo, r))
    repo.obsstore.rangeobshashcache.save(repo)
    return result
@eh.addattr(localrepo.localpeer, 'evoext_obshashrange_v1')
def local_obshashrange_v0(peer, ranges):
    """answer an obshashrange query directly from the local peer repository"""
    repo = peer._repo
    return _obshashrange_v0(repo, ranges)
# struct format of the index part of an encoded range (big-endian unsigned
# int), and the number of bytes it takes.
_indexformat = '>I'
_indexsize = _calcsize(_indexformat)
def _encrange(node_rangeid):
    """Encode a (node, index) range as the node followed by the packed index."""
    head, idx = node_rangeid
    packedidx = _pack(_indexformat, idx)
    return head + packedidx
def _decrange(data):
    """Decode a range encoded by `_encrange` back to a (node, index) pair."""
    assert _indexsize < len(data), len(data)
    # the packed index sits at the end, everything before it is the node
    rawindex = data[-_indexsize:]
    (index,) = _unpack(_indexformat, rawindex)
    return (data[:-_indexsize], index)
@eh.addattr(wirepeer, 'evoext_obshashrange_v1')
def peer_obshashrange_v0(self, ranges):
    """Query a wire peer for the obshash of a list of (node, index) ranges.

    Returns the decoded list of hashes; aborts with a ResponseError when the
    reply cannot be decoded.
    """
    payload = encodelist([_encrange(r) for r in ranges])
    reply = self._call("evoext_obshashrange_v1", ranges=payload)
    try:
        hashes = decodelist(reply)
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), reply))
    else:
        return hashes
@compat.wireprotocommand(eh, 'evoext_obshashrange_v1', 'ranges')
def srv_obshashrange_v1(repo, proto, ranges):
    """Server side of the 'evoext_obshashrange_v1' wire command.

    Decodes the requested ranges, computes their obshash and returns the
    encoded list of hashes.
    """
    requested = [_decrange(raw) for raw in decodelist(ranges)]
    return encodelist(_obshashrange_v0(repo, requested))
def _useobshashrange(repo):
    """Tell whether obshashrange discovery should be used for `repo`.

    It is controlled by 'experimental.obshashrange' (on by default) and is
    turned off when 'experimental.obshashrange.max-revs' is set and lower
    than the number of revisions in the unfiltered repository.
    """
    ui = repo.ui
    enabled = ui.configbool('experimental', 'obshashrange', True)
    if not enabled:
        return enabled
    limit = ui.configint('experimental', 'obshashrange.max-revs', None)
    if limit is None:
        return enabled
    if limit < len(repo.unfiltered()):
        return False
    return enabled
def _canobshashrange(local, remote):
    """Tell whether obshashrange discovery can be used between both sides.

    It must be enabled locally and advertised by the remote.
    """
    if not _useobshashrange(local):
        return False
    return remote.capable('_evoext_obshashrange_v1')
def _obshashrange_capabilities(orig, repo, proto):
    """Wrap the capabilities command to advertise obshashrange support.

    '_evoext_obshashrange_v1' is added when obshashrange is usable on this
    repository and marker exchange is enabled.
    """
    caps = orig(repo, proto)
    enabled = _useobshashrange(repo)
    if not (obsolete.isenabled(repo, obsolete.exchangeopt) and enabled):
        return caps
    # Compat hg 4.6+ (2f7290555c96): the capabilities may come wrapped in a
    # response object, in which case we work on its `data` and wrap it back
    wrapped = util.safehasattr(caps, 'data')
    if wrapped:
        caps = caps.data
    names = caps.split()
    names.append(b'_evoext_obshashrange_v1')
    names.sort()
    caps = b' '.join(names)
    if wrapped:
        caps = wireprototypes.bytesresponse(caps)
    return caps
@eh.extsetup
def obshashrange_extsetup(ui):
    """Install the obshashrange capability wrapper on the wire server."""
    extensions.wrapfunction(wireprotov1server, 'capabilities',
                            _obshashrange_capabilities)
    # the command table keeps its own reference to the function, wrap that
    # entry too
    table = wireprotov1server.commands
    oldcap, args = table['capabilities']

    def newcap(repo, proto):
        return _obshashrange_capabilities(oldcap, repo, proto)

    table['capabilities'] = (newcap, args)
#############################
### Tree Hash computation ###
#############################
# Hash computed from a given changeset using all markers relevant to it and
# the obshash of its parents. This is similar to what happens for changeset
# nodes, where the parents are used in the computation.
def _canobshashtree(repo, remote):
    """Tell whether the remote supports the tree-hash based discovery."""
    capability = '_evoext_obshash_0'
    return remote.capable(capability)
@eh.command(
    'debugobsrelsethashtree',
    [('', 'v0', None, 'hash on marker format "0"'),
     ('', 'v1', None, 'hash on marker format "1" (default)')], _(''))
def debugobsrelsethashtree(ui, repo, v0=False, v1=False):
    """display Obsolete markers, Relevant Set, Hash Tree
    changeset-node obsrelsethashtree-node

    It is computed from the "orsht" of its parent and markers
    relevant to the changeset itself."""
    # --v0 and --v1 are mutually exclusive, format "1" is the default
    if v0 and v1:
        raise error.Abort('can only specify one format')
    elif v0:
        treefunc = _obsrelsethashtreefm0
    else:
        treefunc = _obsrelsethashtreefm1

    # one "<changeset hex> <hash hex>" line per changeset
    for chg, obs in treefunc(repo):
        ui.status('%s %s\n' % (node.hex(chg), node.hex(obs)))
def _obsrelsethashtreefm0(repo):
    """compute the obsrelsethashtree using the format "0" marker encoding"""
    encoder = obsolete._fm0encodeonemarker
    return _obsrelsethashtree(repo, encoder)
def _obsrelsethashtreefm1(repo):
    """compute the obsrelsethashtree using the format "1" marker encoding"""
    encoder = obsolete._fm1encodeonemarker
    return _obsrelsethashtree(repo, encoder)
def _obsrelsethashtree(repo, encodeonemarker):
    """Compute the "obsrelsethashtree" hash of every unfiltered changeset.

    The hash of a changeset is the sha1 of the (non-null) hashes of its
    parents followed by the sorted encoding of the markers relevant to the
    changeset. A changeset with nothing to hash gets the null id.

    `encodeonemarker` is the function turning a marker into its binary form.

    Returns a list of (node, hash) pairs indexed by revision number.
    """
    unfi = repo.unfiltered()
    hashes = []
    encoded = {}  # marker -> binary form, shared across changesets
    repo.ui.progress(_("preparing locally"), 0, total=len(unfi),
                     unit=_("changesets"))
    for rev in unfi:
        ctx = unfi[rev]
        fed = 0
        hasher = hashlib.sha1()
        # add data from the parents
        for parent in ctx.parents():
            prev = parent.rev()
            if prev < 0:
                phash = node.nullid
            else:
                phash = hashes[prev][1]
            if phash != node.nullid:
                fed += 1
                hasher.update(phash)
        # add data from the markers relevant to this changeset
        relevant = repo.obsstore.relevantmarkers([ctx.node()])
        if relevant:
            blobs = []
            for marker in relevant:
                if marker not in encoded:
                    encoded[marker] = encodeonemarker(marker)
                blobs.append(encoded[marker])
            for blob in sorted(blobs):
                fed += 1
                hasher.update(blob)
        if fed:
            hashes.append((ctx.node(), hasher.digest()))
        else:
            hashes.append((ctx.node(), node.nullid))
        repo.ui.progress(_("preparing locally"), rev, total=len(unfi),
                         unit=_("changesets"))
    repo.ui.progress(_("preparing locally"), None)
    return hashes
def _obshash(repo, nodes, version=0):
    """Return the obsrelsethashtree hash of each node in `nodes`.

    `version` selects the marker encoding (0 or 1). Nodes unknown to the
    repository get the null id.
    """
    if version == 0:
        hashs = _obsrelsethashtreefm0(repo)
    elif version == 1:
        hashs = _obsrelsethashtreefm1(repo)
    else:
        assert False
    torev = repo.changelog.nodemap.get
    result = []
    for n in nodes:
        rev = torev(n)
        if rev is None:
            result.append(node.nullid)
        else:
            result.append(hashs[rev][1])
    return result
@eh.addattr(localrepo.localpeer, 'evoext_obshash')
def local_obshash(peer, nodes):
    """answer a format "0" obshash query from the local peer repository"""
    repo = peer._repo
    return _obshash(repo, nodes)
@eh.addattr(localrepo.localpeer, 'evoext_obshash1')
def local_obshash1(peer, nodes):
    """answer a format "1" obshash query from the local peer repository"""
    repo = peer._repo
    return _obshash(repo, nodes, version=1)
@eh.addattr(wirepeer, 'evoext_obshash')
def peer_obshash(self, nodes):
    """Query a wire peer for the format "0" obshash of `nodes`.

    Aborts with a ResponseError when the reply cannot be decoded.
    """
    reply = self._call("evoext_obshash", nodes=encodelist(nodes))
    try:
        hashes = decodelist(reply)
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), reply))
    else:
        return hashes
@eh.addattr(wirepeer, 'evoext_obshash1')
def peer_obshash1(self, nodes):
    """Query a wire peer for the format "1" obshash of `nodes`.

    Aborts with a ResponseError when the reply cannot be decoded.
    """
    reply = self._call("evoext_obshash1", nodes=encodelist(nodes))
    try:
        hashes = decodelist(reply)
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), reply))
    else:
        return hashes
@compat.wireprotocommand(eh, 'evoext_obshash', 'nodes')
def srv_obshash(repo, proto, nodes):
    """server side of the 'evoext_obshash' wire command (format "0")"""
    requested = decodelist(nodes)
    return encodelist(_obshash(repo, requested))
@compat.wireprotocommand(eh, 'evoext_obshash1', 'nodes')
def srv_obshash1(repo, proto, nodes):
    """server side of the 'evoext_obshash1' wire command (format "1")"""
    requested = decodelist(nodes)
    return encodelist(_obshash(repo, requested, version=1))
def _obshash_capabilities(orig, repo, proto):
    """Wrap the capabilities command to advertise tree-hash discovery.

    '_evoext_obshash_0' and '_evoext_obshash_1' are added when marker
    exchange is enabled and 'experimental.evolution.obsdiscovery' is on.
    """
    caps = orig(repo, proto)
    if not (obsolete.isenabled(repo, obsolete.exchangeopt)
            and repo.ui.configbool('experimental', 'evolution.obsdiscovery', True)):
        return caps
    # Compat hg 4.6+ (2f7290555c96): the capabilities may come wrapped in a
    # response object, in which case we work on its `data` and wrap it back
    wrapped = util.safehasattr(caps, 'data')
    if wrapped:
        caps = caps.data
    names = caps.split()
    names.extend([b'_evoext_obshash_0', b'_evoext_obshash_1'])
    names.sort()
    caps = b' '.join(names)
    if wrapped:
        caps = wireprototypes.bytesresponse(caps)
    return caps
@eh.extsetup
def obshash_extsetup(ui):
    """Install the tree-hash capability wrapper on the wire server."""
    extensions.wrapfunction(wireprotov1server, 'capabilities',
                            _obshash_capabilities)
    # the command table keeps its own reference to the function, wrap that
    # entry too
    table = wireprotov1server.commands
    oldcap, args = table['capabilities']

    def newcap(repo, proto):
        return _obshash_capabilities(oldcap, repo, proto)

    table['capabilities'] = (newcap, args)
##########################################
### trigger discovery during exchange ###
##########################################
def _dopushmarkers(pushop):
    """Tell whether obsmarkers should be pushed by this push operation.

    The result is truthy when all three conditions hold; otherwise the first
    falsy value met is returned.
    """
    # we have any markers to push
    obsstore = pushop.repo.obsstore
    if not obsstore:
        return obsstore
    # exchange of obsmarkers is enabled locally
    enabled = obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
    if not enabled:
        return enabled
    # remote server accept markers
    return 'obsolete' in pushop.remote.listkeys('namespaces')
def _pushobshashrange(pushop, commonrevs):
    """Range-hash discovery for push.

    Returns the nodes of `commonrevs` whose markers mismatch with the remote,
    followed by the nodes missing on the remote.
    """
    unfi = pushop.repo.unfiltered()
    peer = pushop.remote
    nodes = findmissingrange(pushop.ui, unfi, peer, commonrevs)
    nodes += pushop.outgoing.missing
    return nodes
def _pushobshashtree(pushop, commonrevs):
    """Tree-hash discovery for push.

    Returns the nodes that are ancestors of the future remote heads but not
    of the heads found common by the obsmarkers discovery.
    """
    unfi = pushop.repo.unfiltered()
    peer = pushop.remote
    tonode = unfi.changelog.node
    common = findcommonobsmarkers(pushop.ui, unfi, peer, commonrevs)
    outgoing = unfi.revs('only(%ln, %ln)', pushop.futureheads, common)
    return [tonode(r) for r in list(outgoing)]
# Available discovery methods, the first valid one is used.
# Each entry is a (canuse, perform discovery) pair: `canuse(repo, remote)`
# tells whether the method applies, `perform(pushop, commonrevs)` returns the
# nodes whose markers must be sent.
obsdiscoveries = [
    (_canobshashrange, _pushobshashrange),
    (_canobshashtree, _pushobshashtree),
]
# Status message displayed when the user disabled marker discovery.
obsdiscovery_skip_message = """\
(skipping discovery of obsolescence markers, will exchange everything)
(controled by 'experimental.evolution.obsdiscovery' configuration)
"""
def usediscovery(repo):
    """Tell whether obsmarkers discovery is enabled (it is by default)."""
    ui = repo.ui
    return ui.configbool('experimental', 'evolution.obsdiscovery', True)
@eh.wrapfunction(exchange, '_pushdiscoveryobsmarkers')
def _pushdiscoveryobsmarkers(orig, pushop):
    """Compute the obsmarkers to push using a discovery protocol.

    Nothing is done when markers are not to be pushed. The original
    function is used when discovery is disabled by configuration or when no
    discovery method is usable with the remote. Otherwise
    `pushop.outobsmarkers` is set to the markers relevant to the nodes the
    discovery reported (or to an empty list when both sides are in sync).
    """
    if not _dopushmarkers(pushop):
        return
    repo = pushop.repo
    remote = pushop.remote
    obsexcmsg(repo.ui, "computing relevant nodes\n")
    revs = list(repo.revs('::%ln', pushop.futureheads))
    unfi = repo.unfiltered()

    if not usediscovery(repo):
        # discovery disabled by user
        repo.ui.status(obsdiscovery_skip_message)
        return orig(pushop)

    # look for an obs-discovery protocol we can use
    for canuse, perform in obsdiscoveries:
        if canuse(repo, remote):
            discovery = perform
            break
    else:
        # no discovery available, rely on core to push all relevants
        # obs markers.
        return orig(pushop)

    obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
              % len(revs))
    commonrevs = list(unfi.revs('::%ln', pushop.outgoing.commonheads))
    # find the nodes where the relevant obsmarkers mismatches
    nodes = discovery(pushop, commonrevs)

    if not nodes:
        obsexcmsg(repo.ui, "markers already in sync\n")
        pushop.outobsmarkers = []
    else:
        obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n"
                  % len(nodes))
        pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
@eh.extsetup
def _installobsmarkersdiscovery(ui):
    """Route the 'obsmarker' push discovery step through our wrapper."""
    mapping = exchange.pushdiscoverymapping
    olddisco = mapping['obsmarker']

    def newdisco(pushop):
        _pushdiscoveryobsmarkers(olddisco, pushop)

    mapping['obsmarker'] = newdisco
def buildpullobsmarkersboundaries(pullop, bundle2=True):
    """Return the arguments for the call pulling obsmarkers.

    The result is a dict that always has 'heads', plus either 'common' or
    'missing' depending on the discovery that was used. Absent keys are
    simply not in the dict.

    It is a separate function to ease experimenting with strategies.
    """
    repo = pullop.repo
    remote = pullop.remote
    unfi = repo.unfiltered()
    revs = unfi.revs('::(%ln - null)', pullop.common)
    boundaries = {'heads': pullop.pulledsubset}

    if not revs:
        # nothing common
        boundaries['common'] = [node.nullid]
    elif not usediscovery(repo):
        # discovery disabled by users.
        repo.ui.status(obsdiscovery_skip_message)
        boundaries['common'] = [node.nullid]
    elif bundle2 and _canobshashrange(repo, remote):
        obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
                  % len(revs))
        boundaries['missing'] = findmissingrange(repo.ui, repo, pullop.remote,
                                                 revs)
    elif remote.capable('_evoext_obshash_0'):
        obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
                  % len(revs))
        boundaries['common'] = findcommonobsmarkers(repo.ui, repo, remote, revs)
    else:
        # no discovery available, everything will be exchanged
        boundaries['common'] = [node.nullid]
    return boundaries
# Merged last so that what stablerangecache registers wraps, as an outer
# layer, what is registered in this module.
# NOTE(review): the ordering effect depends on exthelper.merge semantics,
# confirm there.
eh.merge(stablerangecache.eh)