hgext/evolve.py
changeset 864 401da1b38cca
parent 863 e9693738f234
child 865 5c40b2a4b52f
--- a/hgext/evolve.py	Wed Mar 05 16:58:56 2014 -0800
+++ b/hgext/evolve.py	Wed Mar 05 15:09:07 2014 -0800
@@ -2131,14 +2131,28 @@
     repo = pushop.repo
     remote = pushop.remote
     unfi = repo.unfiltered()
+    cl = unfi.changelog
     if (obsolete._enabled and repo.obsstore and
         'obsolete' in remote.listkeys('namespaces')):
         repo.ui.status("OBSEXC: computing relevant nodes\n")
-        nodes = [ctx.node() for ctx in unfi.set('::%ln', pushop.commonheads)]
-        repo.ui.status("OBSEXC: computing markers relevant to %i nodes\n"
-                       % len(nodes))
-        markers = repo.obsstore.relevantmarkers(nodes)
-        if remote.capable('_evoext_pushobsmarkers_0'):
+        revs = unfi.revs('::%ln', pushop.commonheads)
+        common = []
+        if remote.capable('_evoext_obshash_0'):
+            repo.ui.status("OBSEXC: looking for common markers in %i nodes\n"
+                           % len(revs))
+            common = findcommonobsmarkers(pushop.ui, repo, remote, revs)
+            revs = list(unfi.revs('%ld - (::%ln)', revs, common))
+        nodes = [cl.node(r) for r in revs]
+        if nodes:
+            repo.ui.status("OBSEXC: computing markers relevant to %i nodes\n"
+                           % len(nodes))
+            markers = repo.obsstore.relevantmarkers(nodes)
+        else:
+            repo.ui.status("OBSEXC: markers already in sync\n")
+            markers = []
+        if not markers:
+            repo.ui.status("OBSEXC: no marker to push\n")
+        elif remote.capable('_evoext_pushobsmarkers_0'):
             repo.ui.status("OBSEXC: writing %i markers\n" % len(markers))
             obsdata = pushobsmarkerStringIO()
             _encodemarkersstream(obsdata, markers)
@@ -2295,15 +2309,6 @@
             yield c
     return wireproto.streamres(data())
 
-@eh.wrapfunction(wireproto, 'capabilities')
-def capabilities(orig, repo, proto):
-    """wrapper to advertise new capability"""
-    caps = orig(repo, proto)
-    if obsolete._enabled:
-        caps += ' _evoext_pushobsmarkers_0'
-        caps += ' _evoext_pullobsmarkers_0'
-    return caps
-
 def _obsrelsethashtree(repo):
     cache = []
     unfi = repo.unfiltered()
@@ -2346,6 +2351,86 @@
         ui.status('%s %s\n' % (node.hex(chg), node.hex(obs)))
 
 
+### Set discovery START
+
+import random
+from mercurial import dagutil
+from mercurial import setdiscovery
+
+def _obshash(repo, nodes):
+    hashs = _obsrelsethashtree(repo)
+    nm = repo.changelog.nodemap
+    return  [hashs[nm.get(n)][1] for n in nodes]
+
+def srv_obshash(repo, proto, nodes):
+    return wireproto.encodelist(_obshash(repo, wireproto.decodelist(nodes)))
+
+@eh.addattr(localrepo.localpeer, 'evoext_obshash')
+def local_obshash(peer, nodes):
+    return _obshash(peer._repo, nodes)
+
+@eh.addattr(wireproto.wirepeer, 'evoext_obshash')
+def peer_obshash(self, nodes):
+    d = self._call("evoext_obshash", nodes=wireproto.encodelist(nodes))
+    try:
+        return wireproto.decodelist(d)
+    except ValueError:
+        self._abort(error.ResponseError(_("unexpected response:"), d))
+
+def findcommonobsmarkers(ui, local, remote, probeset,
+                    initialsamplesize=100,
+                    fullsamplesize=200):
+    # from discovery
+    roundtrips = 0
+    cl = local.changelog
+    dag = dagutil.revlogdag(cl)
+    localhash = _obsrelsethashtree(local)
+    missing = set()
+    common = set()
+    undecided = set(probeset)
+    _takefullsample = setdiscovery._takefullsample
+
+    while undecided:
+
+        ui.note(_("sampling from both directions\n"))
+        sample = _takefullsample(dag, undecided, size=fullsamplesize)
+
+        roundtrips += 1
+        ui.debug("query %i; still undecided: %i, sample size is: %i\n"
+                 % (roundtrips, len(undecided), len(sample)))
+        # indices between sample and externalized version must match
+        sample = list(sample)
+        remotehash = remote.evoext_obshash(dag.externalizeall(sample))
+
+        yesno = [localhash[ix][1] == remotehash[si]
+                 for si, ix in enumerate(sample)]
+
+        commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
+        common.update(dag.ancestorset(commoninsample, common))
+
+        missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
+        missing.update(dag.descendantset(missinginsample, missing))
+
+        undecided.difference_update(missing)
+        undecided.difference_update(common)
+
+
+    result = dag.headsetofconnecteds(common)
+    ui.debug("%d total queries\n" % roundtrips)
+
+    if not result and srvheadhashes != [nullid]:
+        return set([nullid])
+    return dag.externalizeall(result)
+
+@eh.wrapfunction(wireproto, 'capabilities')
+def capabilities(orig, repo, proto):
+    """wrapper to advertise new capability"""
+    caps = orig(repo, proto)
+    if obsolete._enabled:
+        caps += ' _evoext_pushobsmarkers_0'
+        caps += ' _evoext_pullobsmarkers_0'
+        caps += ' _evoext_obshash_0'
+    return caps
 
 
 @eh.extsetup
@@ -2353,3 +2438,4 @@
     localrepo.MODERNCAPS.add('_evoext_pullobsmarkers_0')
     wireproto.commands['evoext_pushobsmarkers_0'] = (srv_pushobsmarkers, '')
     wireproto.commands['evoext_pullobsmarkers_0'] = (srv_pullobsmarkers, '*')
+    wireproto.commands['evoext_obshash'] = (srv_obshash, 'nodes')