hgext/remotefilelog/basestore.py
changeset 40495 3a333a582d7b
child 40610 13d4ad8d7801
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/hgext/remotefilelog/basestore.py	Thu Sep 27 13:03:19 2018 -0400
@@ -0,0 +1,423 @@
+from __future__ import absolute_import
+
+import errno
+import hashlib
+import os
+import shutil
+import stat
+import time
+
+from mercurial.i18n import _
+from mercurial.node import bin, hex
+from mercurial import (
+    error,
+    pycompat,
+    util,
+)
+from . import (
+    constants,
+    shallowutil,
+)
+
+class basestore(object):
+    def __init__(self, repo, path, reponame, shared=False):
+        """Creates a remotefilelog store object for the given repo name.
+
+        `path` - The file path where this store keeps its data
+        `reponame` - The name of the repo. This is used to partition data from
+        many repos.
+        `shared` - True if this store is a shared cache of data from the central
+        server, for many repos on this machine. False means this store is for
+        the local data for one repo.
+        """
+        self.repo = repo
+        self.ui = repo.ui
+        self._path = path
+        self._reponame = reponame
+        self._shared = shared
+        self._uid = os.getuid() if not pycompat.iswindows else None
+
+        self._validatecachelog = self.ui.config("remotefilelog",
+                                                "validatecachelog")
+        self._validatecache = self.ui.config("remotefilelog", "validatecache",
+                                             'on')
+        if self._validatecache not in ('on', 'strict', 'off'):
+            self._validatecache = 'on'
+        if self._validatecache == 'off':
+            self._validatecache = False
+
+        if shared:
+            shallowutil.mkstickygroupdir(self.ui, path)
+
+    def getmissing(self, keys):
+        missing = []
+        for name, node in keys:
+            filepath = self._getfilepath(name, node)
+            exists = os.path.exists(filepath)
+            if (exists and self._validatecache == 'strict' and
+                not self._validatekey(filepath, 'contains')):
+                exists = False
+            if not exists:
+                missing.append((name, node))
+
+        return missing
+
+    # BELOW THIS ARE IMPLEMENTATIONS OF REPACK SOURCE
+
+    def markledger(self, ledger, options=None):
+        if options and options.get(constants.OPTION_PACKSONLY):
+            return
+        if self._shared:
+            for filename, nodes in self._getfiles():
+                for node in nodes:
+                    ledger.markdataentry(self, filename, node)
+                    ledger.markhistoryentry(self, filename, node)
+
+    def cleanup(self, ledger):
+        ui = self.ui
+        entries = ledger.sources.get(self, [])
+        count = 0
+        for entry in entries:
+            if entry.gced or (entry.datarepacked and entry.historyrepacked):
+                ui.progress(_("cleaning up"), count, unit="files",
+                            total=len(entries))
+                path = self._getfilepath(entry.filename, entry.node)
+                util.tryunlink(path)
+            count += 1
+        ui.progress(_("cleaning up"), None)
+
+        # Clean up the repo cache directory.
+        self._cleanupdirectory(self._getrepocachepath())
+
+    # BELOW THIS ARE NON-STANDARD APIS
+
+    def _cleanupdirectory(self, rootdir):
+        """Removes the empty directories and unnecessary files within the root
+        directory recursively. Note that this method does not remove the root
+        directory itself. """
+
+        oldfiles = set()
+        otherfiles = set()
+        # osutil.listdir returns stat information which saves some rmdir/listdir
+        # syscalls.
+        for name, mode in util.osutil.listdir(rootdir):
+            if stat.S_ISDIR(mode):
+                dirpath = os.path.join(rootdir, name)
+                self._cleanupdirectory(dirpath)
+
+                # Now that the directory specified by dirpath is potentially
+                # empty, try and remove it.
+                try:
+                    os.rmdir(dirpath)
+                except OSError:
+                    pass
+
+            elif stat.S_ISREG(mode):
+                if name.endswith('_old'):
+                    oldfiles.add(name[:-4])
+                else:
+                    otherfiles.add(name)
+
+        # Remove the files which end with suffix '_old' and have no
+        # corresponding file without the suffix '_old'. See addremotefilelognode
+        # method for the generation/purpose of files with '_old' suffix.
+        for filename in oldfiles - otherfiles:
+            filepath = os.path.join(rootdir, filename + '_old')
+            util.tryunlink(filepath)
+
+    def _getfiles(self):
+        """Return a list of (filename, [node,...]) for all the revisions that
+        exist in the store.
+
+        This is useful for obtaining a list of all the contents of the store
+        when performing a repack to another store, since the store API requires
+        name+node keys and not namehash+node keys.
+        """
+        existing = {}
+        for filenamehash, node in self._listkeys():
+            existing.setdefault(filenamehash, []).append(node)
+
+        filenamemap = self._resolvefilenames(existing.keys())
+
+        for filename, sha in filenamemap.iteritems():
+            yield (filename, existing[sha])
+
+    def _resolvefilenames(self, hashes):
+        """Given a list of filename hashes that are present in the
+        remotefilelog store, return a mapping from filename->hash.
+
+        This is useful when converting remotefilelog blobs into other storage
+        formats.
+        """
+        if not hashes:
+            return {}
+
+        filenames = {}
+        missingfilename = set(hashes)
+
+        # Start with a full manifest, since it'll cover the majority of files
+        for filename in self.repo['tip'].manifest():
+            sha = hashlib.sha1(filename).digest()
+            if sha in missingfilename:
+                filenames[filename] = sha
+                missingfilename.discard(sha)
+
+        # Scan the changelog until we've found every file name
+        cl = self.repo.unfiltered().changelog
+        for rev in pycompat.xrange(len(cl) - 1, -1, -1):
+            if not missingfilename:
+                break
+            files = cl.readfiles(cl.node(rev))
+            for filename in files:
+                sha = hashlib.sha1(filename).digest()
+                if sha in missingfilename:
+                    filenames[filename] = sha
+                    missingfilename.discard(sha)
+
+        return filenames
+
+    def _getrepocachepath(self):
+        return os.path.join(
+            self._path, self._reponame) if self._shared else self._path
+
+    def _listkeys(self):
+        """List all the remotefilelog keys that exist in the store.
+
+        Returns a iterator of (filename hash, filecontent hash) tuples.
+        """
+
+        for root, dirs, files in os.walk(self._getrepocachepath()):
+            for filename in files:
+                if len(filename) != 40:
+                    continue
+                node = filename
+                if self._shared:
+                    # .../1a/85ffda..be21
+                    filenamehash = root[-41:-39] + root[-38:]
+                else:
+                    filenamehash = root[-40:]
+                yield (bin(filenamehash), bin(node))
+
+    def _getfilepath(self, name, node):
+        node = hex(node)
+        if self._shared:
+            key = shallowutil.getcachekey(self._reponame, name, node)
+        else:
+            key = shallowutil.getlocalkey(name, node)
+
+        return os.path.join(self._path, key)
+
+    def _getdata(self, name, node):
+        filepath = self._getfilepath(name, node)
+        try:
+            data = shallowutil.readfile(filepath)
+            if self._validatecache and not self._validatedata(data, filepath):
+                if self._validatecachelog:
+                    with open(self._validatecachelog, 'a+') as f:
+                        f.write("corrupt %s during read\n" % filepath)
+                os.rename(filepath, filepath + ".corrupt")
+                raise KeyError("corrupt local cache file %s" % filepath)
+        except IOError:
+            raise KeyError("no file found at %s for %s:%s" % (filepath, name,
+                                                              hex(node)))
+
+        return data
+
+    def addremotefilelognode(self, name, node, data):
+        filepath = self._getfilepath(name, node)
+
+        oldumask = os.umask(0o002)
+        try:
+            # if this node already exists, save the old version for
+            # recovery/debugging purposes.
+            if os.path.exists(filepath):
+                newfilename = filepath + '_old'
+                # newfilename can be read-only and shutil.copy will fail.
+                # Delete newfilename to avoid it
+                if os.path.exists(newfilename):
+                    shallowutil.unlinkfile(newfilename)
+                shutil.copy(filepath, newfilename)
+
+            shallowutil.mkstickygroupdir(self.ui, os.path.dirname(filepath))
+            shallowutil.writefile(filepath, data, readonly=True)
+
+            if self._validatecache:
+                if not self._validatekey(filepath, 'write'):
+                    raise error.Abort(_("local cache write was corrupted %s") %
+                                      filepath)
+        finally:
+            os.umask(oldumask)
+
+    def markrepo(self, path):
+        """Call this to add the given repo path to the store's list of
+        repositories that are using it. This is useful later when doing garbage
+        collection, since it allows us to insecpt the repos to see what nodes
+        they want to be kept alive in the store.
+        """
+        repospath = os.path.join(self._path, "repos")
+        with open(repospath, 'a') as reposfile:
+            reposfile.write(os.path.dirname(path) + "\n")
+
+        repospathstat = os.stat(repospath)
+        if repospathstat.st_uid == self._uid:
+            os.chmod(repospath, 0o0664)
+
+    def _validatekey(self, path, action):
+        with open(path, 'rb') as f:
+            data = f.read()
+
+        if self._validatedata(data, path):
+            return True
+
+        if self._validatecachelog:
+            with open(self._validatecachelog, 'a+') as f:
+                f.write("corrupt %s during %s\n" % (path, action))
+
+        os.rename(path, path + ".corrupt")
+        return False
+
+    def _validatedata(self, data, path):
+        try:
+            if len(data) > 0:
+                # see remotefilelogserver.createfileblob for the format
+                offset, size, flags = shallowutil.parsesizeflags(data)
+                if len(data) <= size:
+                    # it is truncated
+                    return False
+
+                # extract the node from the metadata
+                offset += size
+                datanode = data[offset:offset + 20]
+
+                # and compare against the path
+                if os.path.basename(path) == hex(datanode):
+                    # Content matches the intended path
+                    return True
+                return False
+        except (ValueError, RuntimeError):
+            pass
+
+        return False
+
+    def gc(self, keepkeys):
+        ui = self.ui
+        cachepath = self._path
+        _removing = _("removing unnecessary files")
+        _truncating = _("enforcing cache limit")
+
+        # prune cache
+        import Queue
+        queue = Queue.PriorityQueue()
+        originalsize = 0
+        size = 0
+        count = 0
+        removed = 0
+
+        # keep files newer than a day even if they aren't needed
+        limit = time.time() - (60 * 60 * 24)
+
+        ui.progress(_removing, count, unit="files")
+        for root, dirs, files in os.walk(cachepath):
+            for file in files:
+                if file == 'repos':
+                    continue
+
+                # Don't delete pack files
+                if '/packs/' in root:
+                    continue
+
+                ui.progress(_removing, count, unit="files")
+                path = os.path.join(root, file)
+                key = os.path.relpath(path, cachepath)
+                count += 1
+                try:
+                    pathstat = os.stat(path)
+                except OSError as e:
+                    # errno.ENOENT = no such file or directory
+                    if e.errno != errno.ENOENT:
+                        raise
+                    msg = _("warning: file %s was removed by another process\n")
+                    ui.warn(msg % path)
+                    continue
+
+                originalsize += pathstat.st_size
+
+                if key in keepkeys or pathstat.st_atime > limit:
+                    queue.put((pathstat.st_atime, path, pathstat))
+                    size += pathstat.st_size
+                else:
+                    try:
+                        shallowutil.unlinkfile(path)
+                    except OSError as e:
+                        # errno.ENOENT = no such file or directory
+                        if e.errno != errno.ENOENT:
+                            raise
+                        msg = _("warning: file %s was removed by another "
+                                "process\n")
+                        ui.warn(msg % path)
+                        continue
+                    removed += 1
+        ui.progress(_removing, None)
+
+        # remove oldest files until under limit
+        limit = ui.configbytes("remotefilelog", "cachelimit")
+        if size > limit:
+            excess = size - limit
+            removedexcess = 0
+            while queue and size > limit and size > 0:
+                ui.progress(_truncating, removedexcess, unit="bytes",
+                            total=excess)
+                atime, oldpath, oldpathstat = queue.get()
+                try:
+                    shallowutil.unlinkfile(oldpath)
+                except OSError as e:
+                    # errno.ENOENT = no such file or directory
+                    if e.errno != errno.ENOENT:
+                        raise
+                    msg = _("warning: file %s was removed by another process\n")
+                    ui.warn(msg % oldpath)
+                size -= oldpathstat.st_size
+                removed += 1
+                removedexcess += oldpathstat.st_size
+        ui.progress(_truncating, None)
+
+        ui.status(_("finished: removed %s of %s files (%0.2f GB to %0.2f GB)\n")
+                  % (removed, count,
+                     float(originalsize) / 1024.0 / 1024.0 / 1024.0,
+                     float(size) / 1024.0 / 1024.0 / 1024.0))
+
+class baseunionstore(object):
+    def __init__(self, *args, **kwargs):
+        # If one of the functions that iterates all of the stores is about to
+        # throw a KeyError, try this many times with a full refresh between
+        # attempts. A repack operation may have moved data from one store to
+        # another while we were running.
+        self.numattempts = kwargs.get('numretries', 0) + 1
+        # If not-None, call this function on every retry and if the attempts are
+        # exhausted.
+        self.retrylog = kwargs.get('retrylog', None)
+
+    def markforrefresh(self):
+        for store in self.stores:
+            if util.safehasattr(store, 'markforrefresh'):
+                store.markforrefresh()
+
+    @staticmethod
+    def retriable(fn):
+        def noop(*args):
+            pass
+        def wrapped(self, *args, **kwargs):
+            retrylog = self.retrylog or noop
+            funcname = fn.__name__
+            for i in pycompat.xrange(self.numattempts):
+                if i > 0:
+                    retrylog('re-attempting (n=%d) %s\n' % (i, funcname))
+                    self.markforrefresh()
+                try:
+                    return fn(self, *args, **kwargs)
+                except KeyError:
+                    pass
+            # retries exhausted
+            retrylog('retries exhausted in %s, raising KeyError\n' % funcname)
+            raise
+        return wrapped