tests/test-wireproto-clientreactor.py
changeset 40132 e67522413ca8
parent 40129 293835e0fff7
child 40482 07b87ee2ea75
--- a/tests/test-wireproto-clientreactor.py	Thu Oct 04 17:39:16 2018 -0700
+++ b/tests/test-wireproto-clientreactor.py	Mon Oct 08 17:10:59 2018 -0700
@@ -1,6 +1,7 @@
 from __future__ import absolute_import
 
 import unittest
+import zlib
 
 from mercurial import (
     error,
@@ -11,6 +12,12 @@
     cborutil,
 )
 
+try:
+    from mercurial import zstd
+    zstd.__version__
+except ImportError:
+    zstd = None
+
 ffs = framing.makeframefromhumanstring
 
 globalui = uimod.ui()
@@ -261,8 +268,11 @@
         action, meta = sendframe(reactor,
             ffs(b'1 2 stream-begin stream-settings eos %s' % data))
 
-        self.assertEqual(action, b'noop')
-        self.assertEqual(meta, {})
+        self.assertEqual(action, b'error')
+        self.assertEqual(meta, {
+            b'message': b'error setting stream decoder: identity decoder '
+                        b'received unexpected additional values',
+        })
 
     def testmultipleframes(self):
         reactor = framing.clientreactor(globalui, buffersends=False)
@@ -286,6 +296,309 @@
         self.assertEqual(action, b'noop')
         self.assertEqual(meta, {})
 
+    def testinvalidencoder(self):
+        reactor = framing.clientreactor(globalui, buffersends=False)
+
+        request, action, meta = reactor.callcommand(b'foo', {})
+        for f in meta[b'framegen']:
+            pass
+
+        action, meta = sendframe(reactor,
+            ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'))
+
+        self.assertEqual(action, b'error')
+        self.assertEqual(meta, {
+            b'message': b'error setting stream decoder: unknown stream '
+                        b'decoder: badvalue',
+        })
+
+    def testzlibencoding(self):
+        reactor = framing.clientreactor(globalui, buffersends=False)
+
+        request, action, meta = reactor.callcommand(b'foo', {})
+        for f in meta[b'framegen']:
+            pass
+
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
+                request.requestid))
+
+        self.assertEqual(action, b'noop')
+        self.assertEqual(meta, {})
+
+        result = {
+            b'status': b'ok',
+        }
+        encoded = b''.join(cborutil.streamencode(result))
+
+        compressed = zlib.compress(encoded)
+        self.assertEqual(zlib.decompress(compressed), encoded)
+
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 encoded command-response eos %s' %
+                (request.requestid, compressed)))
+
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], encoded)
+
+    def testzlibencodingsinglebyteframes(self):
+        reactor = framing.clientreactor(globalui, buffersends=False)
+
+        request, action, meta = reactor.callcommand(b'foo', {})
+        for f in meta[b'framegen']:
+            pass
+
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
+                request.requestid))
+
+        self.assertEqual(action, b'noop')
+        self.assertEqual(meta, {})
+
+        result = {
+            b'status': b'ok',
+        }
+        encoded = b''.join(cborutil.streamencode(result))
+
+        compressed = zlib.compress(encoded)
+        self.assertEqual(zlib.decompress(compressed), encoded)
+
+        chunks = []
+
+        for i in range(len(compressed)):
+            char = compressed[i:i + 1]
+            if char == b'\\':
+                char = b'\\\\'
+            action, meta = sendframe(reactor,
+                ffs(b'%d 2 encoded command-response continuation %s' %
+                    (request.requestid, char)))
+
+            self.assertEqual(action, b'responsedata')
+            chunks.append(meta[b'data'])
+            self.assertTrue(meta[b'expectmore'])
+            self.assertFalse(meta[b'eos'])
+
+        # zlib will have the full data decoded at this point, even though
+        # we haven't flushed.
+        self.assertEqual(b''.join(chunks), encoded)
+
+        # End the stream for good measure.
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
+
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], b'')
+        self.assertFalse(meta[b'expectmore'])
+        self.assertTrue(meta[b'eos'])
+
+    def testzlibmultipleresponses(self):
+        # We feed in zlib compressed data on the same stream but belonging to
+        # 2 different requests. This tests our flushing behavior.
+        reactor = framing.clientreactor(globalui, buffersends=False,
+                                        hasmultiplesend=True)
+
+        request1, action, meta = reactor.callcommand(b'foo', {})
+        for f in meta[b'framegen']:
+            pass
+
+        request2, action, meta = reactor.callcommand(b'foo', {})
+        for f in meta[b'framegen']:
+            pass
+
+        outstream = framing.outputstream(2)
+        outstream.setencoder(globalui, b'zlib')
+
+        response1 = b''.join(cborutil.streamencode({
+            b'status': b'ok',
+            b'extra': b'response1' * 10,
+        }))
+
+        response2 = b''.join(cborutil.streamencode({
+            b'status': b'error',
+            b'extra': b'response2' * 10,
+        }))
+
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
+                request1.requestid))
+
+        self.assertEqual(action, b'noop')
+        self.assertEqual(meta, {})
+
+        # Feeding partial data in won't get anything useful out.
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 encoded command-response continuation %s' % (
+                request1.requestid, outstream.encode(response1))))
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], b'')
+
+        # But flushing data at both ends will get our original data.
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 encoded command-response eos %s' % (
+                request1.requestid, outstream.flush())))
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], response1)
+
+        # We should be able to reuse the compressor/decompressor for the
+        # 2nd response.
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 encoded command-response continuation %s' % (
+                request2.requestid, outstream.encode(response2))))
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], b'')
+
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 encoded command-response eos %s' % (
+                request2.requestid, outstream.flush())))
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], response2)
+
+    @unittest.skipUnless(zstd, 'zstd not available')
+    def testzstd8mbencoding(self):
+        reactor = framing.clientreactor(globalui, buffersends=False)
+
+        request, action, meta = reactor.callcommand(b'foo', {})
+        for f in meta[b'framegen']:
+            pass
+
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
+                request.requestid))
+
+        self.assertEqual(action, b'noop')
+        self.assertEqual(meta, {})
+
+        result = {
+            b'status': b'ok',
+        }
+        encoded = b''.join(cborutil.streamencode(result))
+
+        encoder = framing.zstd8mbencoder(globalui)
+        compressed = encoder.encode(encoded) + encoder.finish()
+        self.assertEqual(zstd.ZstdDecompressor().decompress(
+            compressed, max_output_size=len(encoded)), encoded)
+
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 encoded command-response eos %s' %
+                (request.requestid, compressed)))
+
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], encoded)
+
+    @unittest.skipUnless(zstd, 'zstd not available')
+    def testzstd8mbencodingsinglebyteframes(self):
+        reactor = framing.clientreactor(globalui, buffersends=False)
+
+        request, action, meta = reactor.callcommand(b'foo', {})
+        for f in meta[b'framegen']:
+            pass
+
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
+                request.requestid))
+
+        self.assertEqual(action, b'noop')
+        self.assertEqual(meta, {})
+
+        result = {
+            b'status': b'ok',
+        }
+        encoded = b''.join(cborutil.streamencode(result))
+
+        compressed = zstd.ZstdCompressor().compress(encoded)
+        self.assertEqual(zstd.ZstdDecompressor().decompress(compressed),
+                         encoded)
+
+        chunks = []
+
+        for i in range(len(compressed)):
+            char = compressed[i:i + 1]
+            if char == b'\\':
+                char = b'\\\\'
+            action, meta = sendframe(reactor,
+                ffs(b'%d 2 encoded command-response continuation %s' %
+                    (request.requestid, char)))
+
+            self.assertEqual(action, b'responsedata')
+            chunks.append(meta[b'data'])
+            self.assertTrue(meta[b'expectmore'])
+            self.assertFalse(meta[b'eos'])
+
+        # zstd decompressor will flush at frame boundaries.
+        self.assertEqual(b''.join(chunks), encoded)
+
+        # End the stream for good measure.
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
+
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], b'')
+        self.assertFalse(meta[b'expectmore'])
+        self.assertTrue(meta[b'eos'])
+
+    @unittest.skipUnless(zstd, 'zstd not available')
+    def testzstd8mbmultipleresponses(self):
+        # We feed in zstd compressed data on the same stream but belonging to
+        # 2 different requests. This tests our flushing behavior.
+        reactor = framing.clientreactor(globalui, buffersends=False,
+                                        hasmultiplesend=True)
+
+        request1, action, meta = reactor.callcommand(b'foo', {})
+        for f in meta[b'framegen']:
+            pass
+
+        request2, action, meta = reactor.callcommand(b'foo', {})
+        for f in meta[b'framegen']:
+            pass
+
+        outstream = framing.outputstream(2)
+        outstream.setencoder(globalui, b'zstd-8mb')
+
+        response1 = b''.join(cborutil.streamencode({
+            b'status': b'ok',
+            b'extra': b'response1' * 10,
+        }))
+
+        response2 = b''.join(cborutil.streamencode({
+            b'status': b'error',
+            b'extra': b'response2' * 10,
+        }))
+
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
+                request1.requestid))
+
+        self.assertEqual(action, b'noop')
+        self.assertEqual(meta, {})
+
+        # Feeding partial data in won't get anything useful out.
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 encoded command-response continuation %s' % (
+                request1.requestid, outstream.encode(response1))))
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], b'')
+
+        # But flushing data at both ends will get our original data.
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 encoded command-response eos %s' % (
+                request1.requestid, outstream.flush())))
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], response1)
+
+        # We should be able to reuse the compressor/decompressor for the
+        # 2nd response.
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 encoded command-response continuation %s' % (
+                request2.requestid, outstream.encode(response2))))
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], b'')
+
+        action, meta = sendframe(reactor,
+            ffs(b'%d 2 encoded command-response eos %s' % (
+                request2.requestid, outstream.flush())))
+        self.assertEqual(action, b'responsedata')
+        self.assertEqual(meta[b'data'], response2)
+
 if __name__ == '__main__':
     import silenttestrunner
     silenttestrunner.main(__name__)