stablesort: implement an ondisk cache
authorPierre-Yves David <pierre-yves.david@octobus.net>
Wed, 29 Nov 2017 10:38:52 -0500
changeset 3337 94788616fbeb
parent 3336 c42158efb64e
child 3338 3f049353d733
stablesort: implement an ondisk cache Persisting the cache on disk will be important for large repositories.
hgext3rd/evolve/stablesort.py
--- a/hgext3rd/evolve/stablesort.py	Wed Dec 20 23:45:11 2017 +0100
+++ b/hgext3rd/evolve/stablesort.py	Wed Nov 29 10:38:52 2017 -0500
@@ -7,7 +7,9 @@
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
+import array
 import collections
+import struct
 
 from mercurial import (
     commands,
@@ -23,6 +25,7 @@
     depthcache,
     exthelper,
     utility,
+    genericcaches,
 )
 
 filterparents = utility.filterparents
@@ -319,6 +322,7 @@
 
     def __init__(self):
         self._jumps = {}
+        super(stablesortcache, self).__init__()
 
     def get(self, repo, rev, limit=None):
         result = []
@@ -497,10 +501,160 @@
         if previous is not None:
             recordjump(previous, lower, size)
 
+def stablesort_mergepoint_head_ondisk(repo, revs, limit=None):
+    heads = repo.revs('heads(%ld)', revs)
+    if not heads:
+        return []
+    elif 2 < len(heads):
+        raise error.Abort('cannot use head based merging, %d heads found'
+                          % len(heads))
+    head = heads.first()
+    unfi = repo.unfiltered()
+    cache = ondiskstablesortcache()
+    cache.load(unfi)
+    cache.update(unfi)
+    cache.save(unfi)
+    return cache.get(repo, head, limit=limit)
+
+S_INDEXSIZE = struct.Struct('>I')
+
+class ondiskstablesortcache(stablesortcache, genericcaches.changelogsourcebase):
+
+    _filepath = 'evoext-stablesortcache-00'
+    _cachename = 'evo-ext-stablesort'
+
+    def __init__(self):
+        super(ondiskstablesortcache, self).__init__()
+        self._index = array.array('l')
+        self._data = array.array('l')
+        del self._jumps
+
+    def getjumps(self, repo, rev):
+        if len(self._index) < rev:
+            msg = 'stablesortcache must be warmed before use (%d < %d)'
+            msg %= (len(self._index), rev)
+            raise error.ProgrammingError(msg)
+        return self._getjumps(rev)
+
+    def _getjumps(self, rev):
+        # very first revision
+        if rev == 0:
+            return None
+        # no data yet
+        if len(self._index) <= rev:
+            return None
+        index = self._index
+        # non merge revision
+        if index[rev] == index[rev - 1]:
+            return None
+        data = self._data
+        # merge revision
+
+        def jumps():
+            for idx in xrange(index[rev - 1], index[rev]):
+                i = idx * 3
+                yield tuple(data[i:i + 3])
+        return jumps()
+
+    def _setjumps(self, rev, jumps):
+        assert len(self._index) == rev, (len(self._index), rev)
+        if rev == 0:
+            self._index.append(0)
+            return
+        end = self._index[rev - 1]
+        if jumps is None:
+            self._index.append(end)
+            return
+        assert len(self._data) == end * 3, (len(self._data), end)
+        for j in jumps:
+            self._data.append(j[0])
+            self._data.append(j[1])
+            self._data.append(j[2])
+            end += 1
+        self._index.append(end)
+
+    def _updatefrom(self, repo, data):
+        repo = repo.unfiltered()
+
+        total = len(data)
+
+        def progress(pos, rev):
+            repo.ui.progress('updating stablesort cache',
+                             pos, 'rev %s' % rev, unit='revision', total=total)
+
+        progress(0, '')
+        for idx, rev in enumerate(data):
+            parents = repo.changelog.parentrevs(rev)
+            if parents[1] == nodemod.nullrev:
+                self._setjumps(rev, None)
+            else:
+                # merge! warn the cache
+                tiebreaker = _mergepoint_tie_breaker(repo)
+                minparent = sorted(parents, key=tiebreaker)[0]
+                for r in self.walkfrom(repo, rev):
+                    if r == minparent:
+                        break
+            if not (idx % 1000): # progress as a too high performance impact
+                progress(idx, rev)
+        progress(None, '')
+
+    def clear(self, reset=False):
+        super(ondiskstablesortcache, self).clear()
+        self._index = array.array('l')
+        self._data = array.array('l')
+
+    def load(self, repo):
+        """load data from disk
+
+        (crude version, read everything all the time)
+        """
+        assert repo.filtername is None
+
+        data = repo.cachevfs.tryread(self._filepath)
+        self._index = array.array('l')
+        self._data = array.array('l')
+        if not data:
+            self._cachekey = self.emptykey
+        else:
+            headerdata = data[:self._cachekeysize]
+            self._cachekey = self._deserializecachekey(headerdata)
+            offset = self._cachekeysize
+            indexsizedata = data[offset:offset + S_INDEXSIZE.size]
+            indexsize = S_INDEXSIZE.unpack(indexsizedata)[0]
+            offset += S_INDEXSIZE.size
+            self._index.fromstring(data[offset:offset + indexsize])
+            offset += indexsize
+            self._data.fromstring(data[offset:])
+        self._ondiskkey = self._cachekey
+        pass
+
+    def save(self, repo):
+        """save the data to disk
+
+        (crude version, rewrite everything all the time)
+        """
+        if self._cachekey is None or self._cachekey == self._ondiskkey:
+            return
+        cachefile = repo.cachevfs(self._filepath, 'w', atomictemp=True)
+
+        # data to write
+        headerdata = self._serializecachekey()
+        indexdata = self._index.tostring()
+        data = self._data.tostring()
+        indexsize = S_INDEXSIZE.pack(len(indexdata))
+
+        # writing
+        cachefile.write(headerdata)
+        cachefile.write(indexsize)
+        cachefile.write(indexdata)
+        cachefile.write(data)
+        cachefile.close()
+
 _methodmap = {
     'branchpoint': stablesort_branchpoint,
     'basic-mergepoint': stablesort_mergepoint_multirevs,
     'basic-headstart': stablesort_mergepoint_head_basic,
     'headstart': stablesort_mergepoint_head_debug,
     'headcached': stablesort_mergepoint_head_cached,
+    'headondisk': stablesort_mergepoint_head_ondisk,
 }