stablerange: introduce ondisk caching through sqlite
author Pierre-Yves David <pierre-yves.david@ens-lyon.org>
Fri, 24 Mar 2017 16:05:28 +0100
changeset 2237 98e0369b548b
parent 2236 c0e2ba85e76a
child 2238 aac765e84de3
stablerange: introduce ondisk caching through sqlite. There are many shortcuts and limitations in this first version of the cache, but it allows us to envision some actual usage of the stablerange thingy, so let us go for it.
hgext3rd/evolve/stablerange.py
--- a/hgext3rd/evolve/stablerange.py	Fri Mar 24 18:41:55 2017 +0100
+++ b/hgext3rd/evolve/stablerange.py	Fri Mar 24 16:05:28 2017 +0100
@@ -10,13 +10,16 @@
 import collections
 import heapq
 import math
+import sqlite3
 
 from mercurial import (
     commands,
     cmdutil,
+    error,
     localrepo,
     node as nodemod,
     scmutil,
+    util,
 )
 
 from mercurial.i18n import _
@@ -661,6 +664,213 @@
         result.append(top)
         return result
 
+#############################
+### simple sqlite caching ###
+#############################
+
+_sqliteschema = [
+    """CREATE TABLE meta(schemaversion INTEGER NOT NULL,
+                         tiprev        INTEGER NOT NULL,
+                         tipnode       BLOB    NOT NULL
+                        );""",
+    "CREATE TABLE depth(rev INTEGER NOT NULL PRIMARY KEY, depth INTEGER NOT NULL);",
+    """CREATE TABLE range(rev INTEGER  NOT NULL,
+                          idx INTEGER NOT NULL,
+                          PRIMARY KEY(rev, idx));""",
+    """CREATE TABLE subranges(listidx INTEGER NOT NULL,
+                              suprev  INTEGER NOT NULL,
+                              supidx  INTEGER NOT NULL,
+                              subrev  INTEGER NOT NULL,
+                              subidx  INTEGER NOT NULL,
+                              PRIMARY KEY(listidx, suprev, supidx),
+                              FOREIGN KEY (suprev, supidx) REFERENCES range(rev, idx),
+                              FOREIGN KEY (subrev, subidx) REFERENCES range(rev, idx)
+    );""",
+    "CREATE INDEX subrange_index ON subranges (suprev, supidx);",
+    "CREATE INDEX depth_index ON depth (rev);"
+]
+_newmeta = "INSERT INTO meta (schemaversion, tiprev, tipnode) VALUES (?,?,?);"
+_updatemeta = "UPDATE meta SET tiprev = ?, tipnode = ?;"
+_updatedepth = "INSERT INTO depth(rev, depth) VALUES (?,?);"
+_updaterange = "INSERT INTO range(rev, idx) VALUES (?,?);"
+_updatesubranges = """INSERT
+                       INTO subranges(listidx, suprev, supidx, subrev, subidx)
+                       VALUES (?,?,?,?,?);"""
+_queryexist = "SELECT name FROM sqlite_master WHERE type='table' AND name='meta';"
+_querymeta = "SELECT schemaversion, tiprev, tipnode FROM meta;"
+_querydepth = "SELECT depth FROM depth WHERE rev = ?;"
+_batchdepth = "SELECT rev, depth FROM depth;"
+_queryrange = "SELECT * FROM range WHERE (rev = ? AND idx = ?);"
+_querysubranges = """SELECT subrev, subidx
+                     FROM subranges
+                     WHERE (suprev = ? AND supidx = ?)
+                     ORDER BY listidx;"""
+
+class sqlstablerange(stablerange):
+
+    _schemaversion = 0
+
+    def __init__(self, repo):
+        super(sqlstablerange, self).__init__()
+        self._path = repo.vfs.join('cache/evoext_stablerange_v0.sqlite')
+        self._cl = repo.unfiltered().changelog # (okay to keep an old one)
+        self._ondisktiprev = None
+        self._ondisktipnode = None
+        self._unsaveddepth = {}
+        self._unsavedsubranges = {}
+        self._fulldepth = False
+
+    def warmup(self, repo, upto=None):
+        self._con # make sure the data base is loaded
+        try:
+            # shamelessly lock the repo to ensure nobody will update the repo
+            # concurrently. This should not be too much of an issue if we warm
+            # at the end of the transaction.
+            #
+            # XXX However, we lock even if we are up to date so we should check
+            # before locking
+            with repo.lock():
+                super(sqlstablerange, self).warmup(repo, upto)
+                self._save(repo)
+        except error.LockError:
+            # Exceptionally we are noisy about it since performance impact is
+            # large. We should address that before using this more widely.
+            repo.ui.warn('stable-range cache: unable to lock repo while warming\n')
+            repo.ui.warn('(cache will not be saved)\n')
+            super(sqlstablerange, self).warmup(repo, upto)
+
+    def _getdepth(self, rev):
+        cache = self._depthcache
+        if rev not in cache and rev <= self._ondisktiprev and self._con is not None:
+            value = None
+            result = self._con.execute(_querydepth, (rev,)).fetchone()
+            if result is not None:
+                value = result[0]
+            # in memory caching of the value
+            cache[rev] = value
+        return cache.get(rev)
+
+    def _setdepth(self, rev, depth):
+        assert rev not in self._unsaveddepth
+        self._unsaveddepth[rev] = depth
+        super(sqlstablerange, self)._setdepth(rev, depth)
+
+    def _getsub(self, rangeid):
+        cache = self._subrangescache
+        if rangeid not in cache and rangeid[0] <= self._ondisktiprev and self._con is not None:
+            value = None
+            result = self._con.execute(_queryrange, rangeid).fetchone()
+            if result is not None: # database know about this node (skip in the future?)
+                value = self._con.execute(_querysubranges, rangeid).fetchall()
+            # in memory caching of the value
+            cache[rangeid] = value
+        return cache.get(rangeid)
+
+    def _setsub(self, rangeid, value):
+        assert rangeid not in self._unsavedsubranges
+        self._unsavedsubranges[rangeid] = value
+        super(sqlstablerange, self)._setsub(rangeid, value)
+
+    def _inheritancepoint(self, *args, **kwargs):
+        self._loaddepth()
+        return super(sqlstablerange, self)._inheritancepoint(*args, **kwargs)
+
+    @util.propertycache
+    def _con(self):
+        con = sqlite3.connect(self._path)
+        con.text_factory = str
+        cur = con.execute(_queryexist)
+        if cur.fetchone() is None:
+            return None
+        meta = con.execute(_querymeta).fetchone()
+        if meta is None:
+            return None
+        if meta[0] != self._schemaversion:
+            return None
+        if len(self._cl) <= meta[1]:
+            return None
+        if self._cl.node(meta[1]) != meta[2]:
+            return None
+        self._ondisktiprev = meta[1]
+        self._ondisktipnode = meta[2]
+        if self._tiprev < self._ondisktiprev:
+            self._tiprev = self._ondisktiprev
+            self._tipnode = self._ondisktipnode
+        return con
+
+    def _save(self, repo):
+        repo = repo.unfiltered()
+        if not (self._unsavedsubranges or self._unsaveddepth):
+            return # no new data
+
+        if self._con is None:
+            util.unlinkpath(self._path, ignoremissing=True)
+            if '_con' in vars(self):
+                del self._con
+
+            con = sqlite3.connect(self._path)
+            con.text_factory = str
+            with con:
+                for req in _sqliteschema:
+                    con.execute(req)
+
+                meta = [self._schemaversion,
+                        self._tiprev,
+                        self._tipnode,
+                ]
+                con.execute(_newmeta, meta)
+        else:
+            con = self._con
+            meta = con.execute(_querymeta).fetchone()
+            if meta[2] != self._ondisktipnode or meta[1] != self._ondisktiprev:
+                # drifting is currently an issue because this means another
+                # process might have already added the cache line we are about
+                # to add. This will confuse sqlite
+                msg = _('stable-range cache: skipping write, '
+                        'database drifted under my feet\n')
+                hint = _('(disk: %s-%s vs mem: %s%s)\n')
+                data = (meta[2], meta[1], self._ondisktiprev, self._ondisktipnode)
+                repo.ui.warn(msg)
+                repo.ui.warn(hint % data)
+                return
+            meta = [self._tiprev,
+                    self._tipnode,
+            ]
+            con.execute(_updatemeta, meta)
+
+        self._savedepth(con, repo)
+        self._saverange(con, repo)
+        con.commit()
+        self._ondisktiprev = self._tiprev
+        self._ondisktipnode = self._tipnode
+        self._unsaveddepth.clear()
+        self._unsavedsubranges.clear()
+
+    def _savedepth(self, con, repo):
+        repo = repo.unfiltered()
+        data = self._unsaveddepth.items()
+        con.executemany(_updatedepth, data)
+
+    def _loaddepth(self):
+        """batch load all data about depth"""
+        if not (self._fulldepth or self._con is None):
+            result = self._con.execute(_batchdepth)
+            self._depthcache.update(result.fetchall())
+            self._fulldepth = True
+
+    def _saverange(self, con, repo):
+        repo = repo.unfiltered()
+        data = []
+        allranges = set()
+        for key, value in self._unsavedsubranges.items():
+            allranges.add(key)
+            for idx, sub in enumerate(value):
+                data.append((idx, key[0], key[1], sub[0], sub[1]))
+
+        con.executemany(_updaterange, allranges)
+        con.executemany(_updatesubranges, data)
+
+
 @eh.reposetup
 def setupcache(ui, repo):
 
@@ -668,7 +878,7 @@
 
         @localrepo.unfilteredpropertycache
         def stablerange(self):
-            return stablerange()
+            return sqlstablerange(repo)
 
         @localrepo.unfilteredmethod
         def destroyed(self):