util: add a stream compression API to compression engines
authorGregory Szorc <gregory.szorc@gmail.com>
Mon, 07 Nov 2016 18:57:07 -0800
changeset 30356 c86109eface7
parent 30355 c52faa621d9f
child 30357 5925bda42dbd
util: add a stream compression API to compression engines It is a common pattern throughout the code to perform compression on an iterator of chunks, yielding an iterator of compressed chunks. Let's formalize that as part of the compression engine API. The zlib and bzip2 implementations allow an optional "level" option to control the compression level. The default values are the same as what the Python modules use. This option will be used in subsequent patches.
mercurial/util.py
--- a/mercurial/util.py	Mon Nov 07 18:39:08 2016 -0800
+++ b/mercurial/util.py	Mon Nov 07 18:57:07 2016 -0800
@@ -2966,10 +2966,22 @@
         exclude the name from external usage, set the first element to ``None``.
 
         If bundle compression is supported, the class must also implement
-        ``compressorobj`` and `decompressorreader``.
+        ``compressstream``, ``compressorobj`` and `decompressorreader``.
         """
         return None
 
+    def compressstream(self, it, opts=None):
+        """Compress an iterator of chunks.
+
+        The method receives an iterator (ideally a generator) of chunks of
+        bytes to be compressed. It returns an iterator (ideally a generator)
+        of bytes of chunks representing the compressed output.
+
+        Optionally accepts an argument defining how to perform compression.
+        Each engine treats this argument differently.
+        """
+        raise NotImplementedError()
+
     def compressorobj(self):
         """(Temporary) Obtain an object used for compression.
 
@@ -2997,6 +3009,19 @@
     def compressorobj(self):
         return zlib.compressobj()
 
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+
+        z = zlib.compressobj(opts.get('level', -1))
+        for chunk in it:
+            data = z.compress(chunk)
+            # Not all calls to compress emit data. It is cheaper to inspect
+            # here than to feed empty chunks through generator.
+            if data:
+                yield data
+
+        yield z.flush()
+
     def decompressorreader(self, fh):
         def gen():
             d = zlib.decompressobj()
@@ -3017,6 +3042,16 @@
     def compressorobj(self):
         return bz2.BZ2Compressor()
 
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+        z = bz2.BZ2Compressor(opts.get('level', 9))
+        for chunk in it:
+            data = z.compress(chunk)
+            if data:
+                yield data
+
+        yield z.flush()
+
     def decompressorreader(self, fh):
         def gen():
             d = bz2.BZ2Decompressor()
@@ -3065,6 +3100,9 @@
     def compressorobj(self):
         return nocompress()
 
+    def compressstream(self, it, opts=None):
+        return it
+
     def decompressorreader(self, fh):
         return fh