util: create new abstraction for compression engines
authorGregory Szorc <gregory.szorc@gmail.com>
Mon, 07 Nov 2016 18:31:39 -0800
changeset 30350 358cda0af6ee
parent 30349 954002426f78
child 30351 f81002f736d7
util: create new abstraction for compression engines Currently, util.py has "compressors" and "decompressors" dicts mapping compression algorithms to callables returning objects that perform well-defined operations. In addition, revlog.py has code for calling into a compressor or decompressor explicitly. And, there is code in the wire protocol for performing zlib compression. The 3rd party lz4revlog extension has demonstrated the utility of supporting alternative compression formats for revlog storage. But it stops short of supporting lz4 for bundles and the wire protocol. There are also plans to support zstd as a general compression replacement. So, there appears to be a market for a unified API for registering compression engines. This commit starts the process of establishing one. This commit establishes a base class/interface for defining compression engines and how they will be used. A collection class to hold references to registered compression engines has also been introduced. The built-in zlib, bz2, truncated bz2, and no-op compression engines are registered with a singleton instance of the collection class. The compression engine API will change once consumers are ported to the new API and some common patterns can be simplified at the engine API level. So don't get too attached to the API...
mercurial/util.py
--- a/mercurial/util.py	Sun Oct 09 09:25:39 2016 -0400
+++ b/mercurial/util.py	Mon Nov 07 18:31:39 2016 -0800
@@ -2856,13 +2856,219 @@
             raise exc_val
         return received and suppressed
 
-# compression utility
+# compression code
+
+class compressormanager(object):
+    """Holds registrations of various compression engines.
+
+    This class essentially abstracts the differences between compression
+    engines to allow new compression formats to be added easily, possibly from
+    extensions.
+
+    Compressors are registered against the global instance by calling its
+    ``register()`` method.
+    """
+    def __init__(self):
+        self._engines = {}
+        # Bundle spec human name to engine name.
+        self._bundlenames = {}
+        # Internal bundle identifier to engine name.
+        self._bundletypes = {}
+
+    def __getitem__(self, key):
+        return self._engines[key]
+
+    def __contains__(self, key):
+        return key in self._engines
+
+    def __iter__(self):
+        return iter(self._engines.keys())
+
+    def register(self, engine):
+        """Register a compression engine with the manager.
+
+        The argument must be a ``compressionengine`` instance.
+        """
+        if not isinstance(engine, compressionengine):
+            raise ValueError(_('argument must be a compressionengine'))
+
+        name = engine.name()
+
+        if name in self._engines:
+            raise error.Abort(_('compression engine %s already registered') %
+                              name)
+
+        bundleinfo = engine.bundletype()
+        if bundleinfo:
+            bundlename, bundletype = bundleinfo
+
+            if bundlename in self._bundlenames:
+                raise error.Abort(_('bundle name %s already registered') %
+                                  bundlename)
+            if bundletype in self._bundletypes:
+                raise error.Abort(_('bundle type %s already registered by %s') %
+                                  (bundletype, self._bundletypes[bundletype]))
+
+            # No external facing name declared.
+            if bundlename:
+                self._bundlenames[bundlename] = name
+
+            self._bundletypes[bundletype] = name
+
+        self._engines[name] = engine
+
+    @property
+    def supportedbundlenames(self):
+        return set(self._bundlenames.keys())
+
+    @property
+    def supportedbundletypes(self):
+        return set(self._bundletypes.keys())
+
+    def forbundlename(self, bundlename):
+        """Obtain a compression engine registered to a bundle name.
+
+        Will raise KeyError if the bundle type isn't registered.
+        """
+        return self._engines[self._bundlenames[bundlename]]
+
+    def forbundletype(self, bundletype):
+        """Obtain a compression engine registered to a bundle type.
+
+        Will raise KeyError if the bundle type isn't registered.
+        """
+        return self._engines[self._bundletypes[bundletype]]
+
+compengines = compressormanager()
+
+class compressionengine(object):
+    """Base class for compression engines.
+
+    Compression engines must implement the interface defined by this class.
+    """
+    def name(self):
+        """Returns the name of the compression engine.
+
+        This is the key the engine is registered under.
+
+        This method must be implemented.
+        """
+        raise NotImplementedError()
+
+    def bundletype(self):
+        """Describes bundle identifiers for this engine.
+
+        If this compression engine isn't supported for bundles, returns None.
+
+        If this engine can be used for bundles, returns a 2-tuple of strings of
+        the user-facing "bundle spec" compression name and an internal
+        identifier used to denote the compression format within bundles. To
+        exclude the name from external usage, set the first element to ``None``.
+
+        If bundle compression is supported, the class must also implement
+        ``compressorobj`` and `decompressorreader``.
+        """
+        return None
+
+    def compressorobj(self):
+        """(Temporary) Obtain an object used for compression.
+
+        The returned object has ``compress(data)`` and ``flush()`` methods.
+        These are used to incrementally feed data chunks into a compressor.
+        """
+        raise NotImplementedError()
+
+    def decompressorreader(self, fh):
+        """Perform decompression on a file object.
+
+        Argument is an object with a ``read(size)`` method that returns
+        compressed data. Return value is an object with a ``read(size)`` that
+        returns uncompressed data.
+        """
+        raise NotImplementedError()
+
+class _zlibengine(compressionengine):
+    def name(self):
+        return 'zlib'
+
+    def bundletype(self):
+        return 'gzip', 'GZ'
+
+    def compressorobj(self):
+        return zlib.compressobj()
+
+    def decompressorreader(self, fh):
+        def gen():
+            d = zlib.decompressobj()
+            for chunk in filechunkiter(fh):
+                yield d.decompress(chunk)
+
+        return chunkbuffer(gen())
+
+compengines.register(_zlibengine())
+
+class _bz2engine(compressionengine):
+    def name(self):
+        return 'bz2'
+
+    def bundletype(self):
+        return 'bzip2', 'BZ'
+
+    def compressorobj(self):
+        return bz2.BZ2Compressor()
+
+    def decompressorreader(self, fh):
+        def gen():
+            d = bz2.BZ2Decompressor()
+            for chunk in filechunkiter(fh):
+                yield d.decompress(chunk)
+
+        return chunkbuffer(gen())
+
+compengines.register(_bz2engine())
+
+class _truncatedbz2engine(compressionengine):
+    def name(self):
+        return 'bz2truncated'
+
+    def bundletype(self):
+        return None, '_truncatedBZ'
+
+    # We don't implement compressorobj because it is hackily handled elsewhere.
+
+    def decompressorreader(self, fh):
+        def gen():
+            # The input stream doesn't have the 'BZ' header. So add it back.
+            d = bz2.BZ2Decompressor()
+            d.decompress('BZ')
+            for chunk in filechunkiter(fh):
+                yield d.decompress(chunk)
+
+        return chunkbuffer(gen())
+
+compengines.register(_truncatedbz2engine())
 
 class nocompress(object):
     def compress(self, x):
         return x
+
     def flush(self):
-        return ""
+        return ''
+
+class _noopengine(compressionengine):
+    def name(self):
+        return 'none'
+
+    def bundletype(self):
+        return 'none', 'UN'
+
+    def compressorobj(self):
+        return nocompress()
+
+    def decompressorreader(self, fh):
+        return fh
+
+compengines.register(_noopengine())
 
 compressors = {
     None: nocompress,