util: compression APIs to support revlog decompression
authorGregory Szorc <gregory.szorc@gmail.com>
Mon, 02 Jan 2017 13:27:20 -0800
changeset 30798 f50c0db50025
parent 30797 0bde7372e4c0
child 30799 0b49449a01f4
util: compression APIs to support revlog decompression Previously, compression engines had APIs for performing revlog compression but no mechanism to perform revlog decompression. This patch changes that. Revlog decompression is slightly more complicated than compression because in the compression case there is (currently) only a single engine that can be used at a time. However for decompression, a revlog could contain chunks from multiple compression engines. This means decompression needs to map to multiple engines and decompressors. This functionality is outside the scope of this patch. But it drives the decision for engines to declare a byte header sequence that identifies revlog data as belonging to an engine and an API for obtaining an engine from a revlog header.
mercurial/util.py
--- a/mercurial/util.py	Sun Jan 08 10:08:29 2017 +0800
+++ b/mercurial/util.py	Mon Jan 02 13:27:20 2017 -0800
@@ -3000,6 +3000,8 @@
         self._bundlenames = {}
         # Internal bundle identifier to engine name.
         self._bundletypes = {}
+        # Revlog header to engine name.
+        self._revlogheaders = {}
         # Wire proto identifier to engine name.
         self._wiretypes = {}
 
@@ -3053,6 +3055,14 @@
 
             self._wiretypes[wiretype] = name
 
+        revlogheader = engine.revlogheader()
+        if revlogheader and revlogheader in self._revlogheaders:
+            raise error.Abort(_('revlog header %s already registered by %s') %
+                              (revlogheader, self._revlogheaders[revlogheader]))
+
+        if revlogheader:
+            self._revlogheaders[revlogheader] = name
+
         self._engines[name] = engine
 
     @property
@@ -3121,6 +3131,13 @@
                               engine.name())
         return engine
 
+    def forrevlogheader(self, header):
+        """Obtain a compression engine registered to a revlog header.
+
+        Will raise KeyError if the revlog header value isn't registered.
+        """
+        return self._engines[self._revlogheaders[header]]
+
 compengines = compressormanager()
 
 class compressionengine(object):
@@ -3186,6 +3203,16 @@
         """
         return None
 
+    def revlogheader(self):
+        """Header added to revlog chunks that identifies this engine.
+
+        If this engine can be used to compress revlogs, this method should
+        return the bytes used to identify chunks compressed with this engine.
+        Else, the method should return ``None`` to indicate it does not
+        participate in revlog compression.
+        """
+        return None
+
     def compressstream(self, it, opts=None):
         """Compress an iterator of chunks.
 
@@ -3215,6 +3242,13 @@
         the data could not be compressed (too small, not compressible, etc).
         The returned data should have a header uniquely identifying this
         compression format so decompression can be routed to this engine.
+        This header should be identified by the ``revlogheader()`` return
+        value.
+
+        The object has a ``decompress(data)`` method that decompresses
+        data. The method will only be called if ``data`` begins with
+        ``revlogheader()``. The method should return the raw, uncompressed
+        data or raise a ``RevlogError``.
 
         The object is reusable but is not thread safe.
         """
@@ -3230,6 +3264,9 @@
     def wireprotosupport(self):
         return compewireprotosupport('zlib', 20, 20)
 
+    def revlogheader(self):
+        return 'x'
+
     def compressstream(self, it, opts=None):
         opts = opts or {}
 
@@ -3286,6 +3323,13 @@
                     return ''.join(parts)
                 return None
 
+        def decompress(self, data):
+            try:
+                return zlib.decompress(data)
+            except zlib.error as e:
+                raise error.RevlogError(_('revlog decompress error: %s') %
+                                        str(e))
+
     def revlogcompressor(self, opts=None):
         return self.zlibrevlogcompressor()
 
@@ -3357,6 +3401,9 @@
     def wireprotosupport(self):
         return compewireprotosupport('none', 0, 10)
 
+    # We don't implement revlogheader because it is handled specially
+    # in the revlog class.
+
     def compressstream(self, it, opts=None):
         return it
 
@@ -3397,6 +3444,9 @@
     def wireprotosupport(self):
         return compewireprotosupport('zstd', 50, 50)
 
+    def revlogheader(self):
+        return '\x28'
+
     def compressstream(self, it, opts=None):
         opts = opts or {}
         # zstd level 3 is almost always significantly faster than zlib
@@ -3425,7 +3475,9 @@
             # pre-allocate a buffer to hold the result.
             self._cctx = zstd.ZstdCompressor(level=level,
                                              write_content_size=True)
+            self._dctx = zstd.ZstdDecompressor()
             self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
+            self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
 
         def compress(self, data):
             insize = len(data)
@@ -3456,6 +3508,28 @@
                     return ''.join(chunks)
                 return None
 
+        def decompress(self, data):
+            insize = len(data)
+
+            try:
+                # This was measured to be faster than other streaming
+                # decompressors.
+                dobj = self._dctx.decompressobj()
+                chunks = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + self._decompinsize
+                    chunk = dobj.decompress(data[pos:pos2])
+                    if chunk:
+                        chunks.append(chunk)
+                    pos = pos2
+                # Frame should be exhausted, so no finish() API.
+
+                return ''.join(chunks)
+            except Exception as e:
+                raise error.RevlogError(_('revlog decompress error: %s') %
+                                        str(e))
+
     def revlogcompressor(self, opts=None):
         opts = opts or {}
         return self.zstdrevlogcompressor(self._module,