# HG changeset patch # User Pierre-Yves David # Date 1511969932 18000 # Node ID 94788616fbebb6109c0f73273a0d51242824ee4a # Parent c42158efb64ea250fcaaa605352a98321cf035aa stablesort: implement an ondisk cache Persisting the cache on disk will be important for large repositories. diff -r c42158efb64e -r 94788616fbeb hgext3rd/evolve/stablesort.py --- a/hgext3rd/evolve/stablesort.py Wed Dec 20 23:45:11 2017 +0100 +++ b/hgext3rd/evolve/stablesort.py Wed Nov 29 10:38:52 2017 -0500 @@ -7,7 +7,9 @@ # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. +import array import collections +import struct from mercurial import ( commands, @@ -23,6 +25,7 @@ depthcache, exthelper, utility, + genericcaches, ) filterparents = utility.filterparents @@ -319,6 +322,7 @@ def __init__(self): self._jumps = {} + super(stablesortcache, self).__init__() def get(self, repo, rev, limit=None): result = [] @@ -497,10 +501,160 @@ if previous is not None: recordjump(previous, lower, size) +def stablesort_mergepoint_head_ondisk(repo, revs, limit=None): + heads = repo.revs('heads(%ld)', revs) + if not heads: + return [] + elif 2 < len(heads): + raise error.Abort('cannot use head based merging, %d heads found' + % len(heads)) + head = heads.first() + unfi = repo.unfiltered() + cache = ondiskstablesortcache() + cache.load(unfi) + cache.update(unfi) + cache.save(unfi) + return cache.get(repo, head, limit=limit) + +S_INDEXSIZE = struct.Struct('>I') + +class ondiskstablesortcache(stablesortcache, genericcaches.changelogsourcebase): + + _filepath = 'evoext-stablesortcache-00' + _cachename = 'evo-ext-stablesort' + + def __init__(self): + super(ondiskstablesortcache, self).__init__() + self._index = array.array('l') + self._data = array.array('l') + del self._jumps + + def getjumps(self, repo, rev): + if len(self._index) < rev: + msg = 'stablesortcache must be warmed before use (%d < %d)' + msg %= (len(self._index), rev) + raise error.ProgrammingError(msg) + return self._getjumps(rev) + + def _getjumps(self, rev): + # very first revision + if rev == 0: + return None + # no data yet + if len(self._index) <= rev: + return None + index = self._index + # non merge revision + if index[rev] == index[rev - 1]: + return None + data = self._data + # merge revision + + def jumps(): + for idx in xrange(index[rev - 1], index[rev]): + i = idx * 3 + yield tuple(data[i:i + 3]) + return jumps() + + def _setjumps(self, rev, jumps): + assert len(self._index) == rev, (len(self._index), rev) + if rev == 0: + self._index.append(0) + return + end = self._index[rev - 1] + if jumps is None: + self._index.append(end) + return + assert len(self._data) == end * 3, (len(self._data), end) + for j in jumps: + self._data.append(j[0]) + self._data.append(j[1]) + self._data.append(j[2]) + end += 1 + self._index.append(end) + + def _updatefrom(self, repo, data): + repo = repo.unfiltered() + + total = len(data) + + def progress(pos, rev): + repo.ui.progress('updating stablesort cache', + pos, 'rev %s' % rev, unit='revision', total=total) + + progress(0, '') + for idx, rev in enumerate(data): + parents = repo.changelog.parentrevs(rev) + if parents[1] == nodemod.nullrev: + self._setjumps(rev, None) + else: + # merge! warn the cache + tiebreaker = _mergepoint_tie_breaker(repo) + minparent = sorted(parents, key=tiebreaker)[0] + for r in self.walkfrom(repo, rev): + if r == minparent: + break + if not (idx % 1000): # progress as a too high performance impact + progress(idx, rev) + progress(None, '') + + def clear(self, reset=False): + super(ondiskstablesortcache, self).clear() + self._index = array.array('l') + self._data = array.array('l') + + def load(self, repo): + """load data from disk + + (crude version, read everything all the time) + """ + assert repo.filtername is None + + data = repo.cachevfs.tryread(self._filepath) + self._index = array.array('l') + self._data = array.array('l') + if not data: + self._cachekey = self.emptykey + else: + headerdata = data[:self._cachekeysize] + self._cachekey = self._deserializecachekey(headerdata) + offset = self._cachekeysize + indexsizedata = data[offset:offset + S_INDEXSIZE.size] + indexsize = S_INDEXSIZE.unpack(indexsizedata)[0] + offset += S_INDEXSIZE.size + self._index.fromstring(data[offset:offset + indexsize]) + offset += indexsize + self._data.fromstring(data[offset:]) + self._ondiskkey = self._cachekey + pass + + def save(self, repo): + """save the data to disk + + (crude version, rewrite everything all the time) + """ + if self._cachekey is None or self._cachekey == self._ondiskkey: + return + cachefile = repo.cachevfs(self._filepath, 'w', atomictemp=True) + + # data to write + headerdata = self._serializecachekey() + indexdata = self._index.tostring() + data = self._data.tostring() + indexsize = S_INDEXSIZE.pack(len(indexdata)) + + # writing + cachefile.write(headerdata) + cachefile.write(indexsize) + cachefile.write(indexdata) + cachefile.write(data) + cachefile.close() + _methodmap = { 'branchpoint': stablesort_branchpoint, 'basic-mergepoint': stablesort_mergepoint_multirevs, 'basic-headstart': stablesort_mergepoint_head_basic, 'headstart': stablesort_mergepoint_head_debug, 'headcached': stablesort_mergepoint_head_cached, + 'headondisk': stablesort_mergepoint_head_ondisk, }