discovery: implement a discovery process and use it for push
Much faster! So functional! Very proof on concept implementation! Wow!
--- a/hgext/evolve.py Wed Mar 05 16:58:56 2014 -0800
+++ b/hgext/evolve.py Wed Mar 05 15:09:07 2014 -0800
@@ -2131,14 +2131,28 @@
repo = pushop.repo
remote = pushop.remote
unfi = repo.unfiltered()
+ cl = unfi.changelog
if (obsolete._enabled and repo.obsstore and
'obsolete' in remote.listkeys('namespaces')):
repo.ui.status("OBSEXC: computing relevant nodes\n")
- nodes = [ctx.node() for ctx in unfi.set('::%ln', pushop.commonheads)]
- repo.ui.status("OBSEXC: computing markers relevant to %i nodes\n"
- % len(nodes))
- markers = repo.obsstore.relevantmarkers(nodes)
- if remote.capable('_evoext_pushobsmarkers_0'):
+ revs = unfi.revs('::%ln', pushop.commonheads)
+ common = []
+ if remote.capable('_evoext_obshash_0'):
+ repo.ui.status("OBSEXC: looking for common markers in %i nodes\n"
+ % len(revs))
+ common = findcommonobsmarkers(pushop.ui, repo, remote, revs)
+ revs = list(unfi.revs('%ld - (::%ln)', revs, common))
+ nodes = [cl.node(r) for r in revs]
+ if nodes:
+ repo.ui.status("OBSEXC: computing markers relevant to %i nodes\n"
+ % len(nodes))
+ markers = repo.obsstore.relevantmarkers(nodes)
+ else:
+ repo.ui.status("OBSEXC: markers already in sync\n")
+ markers = []
+ if not markers:
+ repo.ui.status("OBSEXC: no marker to push\n")
+ elif remote.capable('_evoext_pushobsmarkers_0'):
repo.ui.status("OBSEXC: writing %i markers\n" % len(markers))
obsdata = pushobsmarkerStringIO()
_encodemarkersstream(obsdata, markers)
@@ -2295,15 +2309,6 @@
yield c
return wireproto.streamres(data())
-@eh.wrapfunction(wireproto, 'capabilities')
-def capabilities(orig, repo, proto):
- """wrapper to advertise new capability"""
- caps = orig(repo, proto)
- if obsolete._enabled:
- caps += ' _evoext_pushobsmarkers_0'
- caps += ' _evoext_pullobsmarkers_0'
- return caps
-
def _obsrelsethashtree(repo):
cache = []
unfi = repo.unfiltered()
@@ -2346,6 +2351,86 @@
ui.status('%s %s\n' % (node.hex(chg), node.hex(obs)))
+### Set discovery START
+
+import random
+from mercurial import dagutil
+from mercurial import setdiscovery
+
+def _obshash(repo, nodes):
+ hashs = _obsrelsethashtree(repo)
+ nm = repo.changelog.nodemap
+ return [hashs[nm.get(n)][1] for n in nodes]
+
+def srv_obshash(repo, proto, nodes):
+ return wireproto.encodelist(_obshash(repo, wireproto.decodelist(nodes)))
+
+@eh.addattr(localrepo.localpeer, 'evoext_obshash')
+def local_obshash(peer, nodes):
+ return _obshash(peer._repo, nodes)
+
+@eh.addattr(wireproto.wirepeer, 'evoext_obshash')
+def peer_obshash(self, nodes):
+ d = self._call("evoext_obshash", nodes=wireproto.encodelist(nodes))
+ try:
+ return wireproto.decodelist(d)
+ except ValueError:
+ self._abort(error.ResponseError(_("unexpected response:"), d))
+
+def findcommonobsmarkers(ui, local, remote, probeset,
+ initialsamplesize=100,
+ fullsamplesize=200):
+ # from discovery
+ roundtrips = 0
+ cl = local.changelog
+ dag = dagutil.revlogdag(cl)
+ localhash = _obsrelsethashtree(local)
+ missing = set()
+ common = set()
+ undecided = set(probeset)
+ _takefullsample = setdiscovery._takefullsample
+
+ while undecided:
+
+ ui.note(_("sampling from both directions\n"))
+ sample = _takefullsample(dag, undecided, size=fullsamplesize)
+
+ roundtrips += 1
+ ui.debug("query %i; still undecided: %i, sample size is: %i\n"
+ % (roundtrips, len(undecided), len(sample)))
+ # indices between sample and externalized version must match
+ sample = list(sample)
+ remotehash = remote.evoext_obshash(dag.externalizeall(sample))
+
+ yesno = [localhash[ix][1] == remotehash[si]
+ for si, ix in enumerate(sample)]
+
+ commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
+ common.update(dag.ancestorset(commoninsample, common))
+
+ missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
+ missing.update(dag.descendantset(missinginsample, missing))
+
+ undecided.difference_update(missing)
+ undecided.difference_update(common)
+
+
+ result = dag.headsetofconnecteds(common)
+ ui.debug("%d total queries\n" % roundtrips)
+
+ if not result and srvheadhashes != [nullid]:
+ return set([nullid])
+ return dag.externalizeall(result)
+
+@eh.wrapfunction(wireproto, 'capabilities')
+def capabilities(orig, repo, proto):
+ """wrapper to advertise new capability"""
+ caps = orig(repo, proto)
+ if obsolete._enabled:
+ caps += ' _evoext_pushobsmarkers_0'
+ caps += ' _evoext_pullobsmarkers_0'
+ caps += ' _evoext_obshash_0'
+ return caps
@eh.extsetup
@@ -2353,3 +2438,4 @@
localrepo.MODERNCAPS.add('_evoext_pullobsmarkers_0')
wireproto.commands['evoext_pushobsmarkers_0'] = (srv_pushobsmarkers, '')
wireproto.commands['evoext_pullobsmarkers_0'] = (srv_pullobsmarkers, '*')
+ wireproto.commands['evoext_obshash'] = (srv_obshash, 'nodes')
--- a/tests/test-exchange-A7.t Wed Mar 05 16:58:56 2014 -0800
+++ b/tests/test-exchange-A7.t Wed Mar 05 15:09:07 2014 -0800
@@ -62,8 +62,7 @@
no changes found
OBSEXC: computing relevant nodes
OBSEXC: computing markers relevant to 1 nodes
- OBSEXC: encoding 0 markers
- OBSEXC: sending 0 pushkey payload (0 bytes)
+ OBSEXC: no marker to push
OBSEXC: DONE
## post push state
# obstore: main
--- a/tests/test-exchange-B3.t Wed Mar 05 16:58:56 2014 -0800
+++ b/tests/test-exchange-B3.t Wed Mar 05 15:09:07 2014 -0800
@@ -81,8 +81,7 @@
added 1 changesets with 1 changes to 1 files
OBSEXC: computing relevant nodes
OBSEXC: computing markers relevant to 2 nodes
- OBSEXC: encoding 0 markers
- OBSEXC: sending 0 pushkey payload (0 bytes)
+ OBSEXC: no marker to push
OBSEXC: DONE
## post push state
# obstore: main
--- a/tests/test-exchange-B7.t Wed Mar 05 16:58:56 2014 -0800
+++ b/tests/test-exchange-B7.t Wed Mar 05 15:09:07 2014 -0800
@@ -69,8 +69,7 @@
no changes found
OBSEXC: computing relevant nodes
OBSEXC: computing markers relevant to 1 nodes
- OBSEXC: encoding 0 markers
- OBSEXC: sending 0 pushkey payload (0 bytes)
+ OBSEXC: no marker to push
OBSEXC: DONE
## post push state
# obstore: main
--- a/tests/test-exchange-D3.t Wed Mar 05 16:58:56 2014 -0800
+++ b/tests/test-exchange-D3.t Wed Mar 05 15:09:07 2014 -0800
@@ -74,8 +74,7 @@
no changes found
OBSEXC: computing relevant nodes
OBSEXC: computing markers relevant to 1 nodes
- OBSEXC: encoding 0 markers
- OBSEXC: sending 0 pushkey payload (0 bytes)
+ OBSEXC: no marker to push
OBSEXC: DONE
## post push state
# obstore: main