--- a/tests/test-wireproto-clientreactor.py Thu Oct 04 17:39:16 2018 -0700
+++ b/tests/test-wireproto-clientreactor.py Mon Oct 08 17:10:59 2018 -0700
@@ -1,6 +1,7 @@
from __future__ import absolute_import
import unittest
+import zlib
from mercurial import (
error,
@@ -11,6 +12,12 @@
cborutil,
)
+try:
+ from mercurial import zstd
+ zstd.__version__
+except ImportError:
+ zstd = None
+
ffs = framing.makeframefromhumanstring
globalui = uimod.ui()
@@ -261,8 +268,11 @@
action, meta = sendframe(reactor,
ffs(b'1 2 stream-begin stream-settings eos %s' % data))
- self.assertEqual(action, b'noop')
- self.assertEqual(meta, {})
+ self.assertEqual(action, b'error')
+ self.assertEqual(meta, {
+ b'message': b'error setting stream decoder: identity decoder '
+ b'received unexpected additional values',
+ })
def testmultipleframes(self):
reactor = framing.clientreactor(globalui, buffersends=False)
@@ -286,6 +296,309 @@
self.assertEqual(action, b'noop')
self.assertEqual(meta, {})
+ def testinvalidencoder(self):
+ reactor = framing.clientreactor(globalui, buffersends=False)
+
+ request, action, meta = reactor.callcommand(b'foo', {})
+ for f in meta[b'framegen']:
+ pass
+
+ action, meta = sendframe(reactor,
+ ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'))
+
+ self.assertEqual(action, b'error')
+ self.assertEqual(meta, {
+ b'message': b'error setting stream decoder: unknown stream '
+ b'decoder: badvalue',
+ })
+
+ def testzlibencoding(self):
+ reactor = framing.clientreactor(globalui, buffersends=False)
+
+ request, action, meta = reactor.callcommand(b'foo', {})
+ for f in meta[b'framegen']:
+ pass
+
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
+ request.requestid))
+
+ self.assertEqual(action, b'noop')
+ self.assertEqual(meta, {})
+
+ result = {
+ b'status': b'ok',
+ }
+ encoded = b''.join(cborutil.streamencode(result))
+
+ compressed = zlib.compress(encoded)
+ self.assertEqual(zlib.decompress(compressed), encoded)
+
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response eos %s' %
+ (request.requestid, compressed)))
+
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], encoded)
+
+ def testzlibencodingsinglebyteframes(self):
+ reactor = framing.clientreactor(globalui, buffersends=False)
+
+ request, action, meta = reactor.callcommand(b'foo', {})
+ for f in meta[b'framegen']:
+ pass
+
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
+ request.requestid))
+
+ self.assertEqual(action, b'noop')
+ self.assertEqual(meta, {})
+
+ result = {
+ b'status': b'ok',
+ }
+ encoded = b''.join(cborutil.streamencode(result))
+
+ compressed = zlib.compress(encoded)
+ self.assertEqual(zlib.decompress(compressed), encoded)
+
+ chunks = []
+
+ for i in range(len(compressed)):
+ char = compressed[i:i + 1]
+ if char == b'\\':
+ char = b'\\\\'
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response continuation %s' %
+ (request.requestid, char)))
+
+ self.assertEqual(action, b'responsedata')
+ chunks.append(meta[b'data'])
+ self.assertTrue(meta[b'expectmore'])
+ self.assertFalse(meta[b'eos'])
+
+ # zlib will have the full data decoded at this point, even though
+ # we haven't flushed.
+ self.assertEqual(b''.join(chunks), encoded)
+
+ # End the stream for good measure.
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
+
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], b'')
+ self.assertFalse(meta[b'expectmore'])
+ self.assertTrue(meta[b'eos'])
+
+ def testzlibmultipleresponses(self):
+ # We feed in zlib compressed data on the same stream but belonging to
+ # 2 different requests. This tests our flushing behavior.
+ reactor = framing.clientreactor(globalui, buffersends=False,
+ hasmultiplesend=True)
+
+ request1, action, meta = reactor.callcommand(b'foo', {})
+ for f in meta[b'framegen']:
+ pass
+
+ request2, action, meta = reactor.callcommand(b'foo', {})
+ for f in meta[b'framegen']:
+ pass
+
+ outstream = framing.outputstream(2)
+ outstream.setencoder(globalui, b'zlib')
+
+ response1 = b''.join(cborutil.streamencode({
+ b'status': b'ok',
+ b'extra': b'response1' * 10,
+ }))
+
+ response2 = b''.join(cborutil.streamencode({
+ b'status': b'error',
+ b'extra': b'response2' * 10,
+ }))
+
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
+ request1.requestid))
+
+ self.assertEqual(action, b'noop')
+ self.assertEqual(meta, {})
+
+ # Feeding partial data in won't get anything useful out.
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response continuation %s' % (
+ request1.requestid, outstream.encode(response1))))
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], b'')
+
+ # But flushing data at both ends will get our original data.
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response eos %s' % (
+ request1.requestid, outstream.flush())))
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], response1)
+
+ # We should be able to reuse the compressor/decompressor for the
+ # 2nd response.
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response continuation %s' % (
+ request2.requestid, outstream.encode(response2))))
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], b'')
+
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response eos %s' % (
+ request2.requestid, outstream.flush())))
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], response2)
+
+ @unittest.skipUnless(zstd, 'zstd not available')
+ def testzstd8mbencoding(self):
+ reactor = framing.clientreactor(globalui, buffersends=False)
+
+ request, action, meta = reactor.callcommand(b'foo', {})
+ for f in meta[b'framegen']:
+ pass
+
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
+ request.requestid))
+
+ self.assertEqual(action, b'noop')
+ self.assertEqual(meta, {})
+
+ result = {
+ b'status': b'ok',
+ }
+ encoded = b''.join(cborutil.streamencode(result))
+
+ encoder = framing.zstd8mbencoder(globalui)
+ compressed = encoder.encode(encoded) + encoder.finish()
+ self.assertEqual(zstd.ZstdDecompressor().decompress(
+ compressed, max_output_size=len(encoded)), encoded)
+
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response eos %s' %
+ (request.requestid, compressed)))
+
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], encoded)
+
+ @unittest.skipUnless(zstd, 'zstd not available')
+ def testzstd8mbencodingsinglebyteframes(self):
+ reactor = framing.clientreactor(globalui, buffersends=False)
+
+ request, action, meta = reactor.callcommand(b'foo', {})
+ for f in meta[b'framegen']:
+ pass
+
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
+ request.requestid))
+
+ self.assertEqual(action, b'noop')
+ self.assertEqual(meta, {})
+
+ result = {
+ b'status': b'ok',
+ }
+ encoded = b''.join(cborutil.streamencode(result))
+
+ compressed = zstd.ZstdCompressor().compress(encoded)
+ self.assertEqual(zstd.ZstdDecompressor().decompress(compressed),
+ encoded)
+
+ chunks = []
+
+ for i in range(len(compressed)):
+ char = compressed[i:i + 1]
+ if char == b'\\':
+ char = b'\\\\'
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response continuation %s' %
+ (request.requestid, char)))
+
+ self.assertEqual(action, b'responsedata')
+ chunks.append(meta[b'data'])
+ self.assertTrue(meta[b'expectmore'])
+ self.assertFalse(meta[b'eos'])
+
+ # zstd decompressor will flush at frame boundaries.
+ self.assertEqual(b''.join(chunks), encoded)
+
+ # End the stream for good measure.
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
+
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], b'')
+ self.assertFalse(meta[b'expectmore'])
+ self.assertTrue(meta[b'eos'])
+
+ @unittest.skipUnless(zstd, 'zstd not available')
+ def testzstd8mbmultipleresponses(self):
+ # We feed in zstd compressed data on the same stream but belonging to
+ # 2 different requests. This tests our flushing behavior.
+ reactor = framing.clientreactor(globalui, buffersends=False,
+ hasmultiplesend=True)
+
+ request1, action, meta = reactor.callcommand(b'foo', {})
+ for f in meta[b'framegen']:
+ pass
+
+ request2, action, meta = reactor.callcommand(b'foo', {})
+ for f in meta[b'framegen']:
+ pass
+
+ outstream = framing.outputstream(2)
+ outstream.setencoder(globalui, b'zstd-8mb')
+
+ response1 = b''.join(cborutil.streamencode({
+ b'status': b'ok',
+ b'extra': b'response1' * 10,
+ }))
+
+ response2 = b''.join(cborutil.streamencode({
+ b'status': b'error',
+ b'extra': b'response2' * 10,
+ }))
+
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
+ request1.requestid))
+
+ self.assertEqual(action, b'noop')
+ self.assertEqual(meta, {})
+
+ # Feeding partial data in won't get anything useful out.
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response continuation %s' % (
+ request1.requestid, outstream.encode(response1))))
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], b'')
+
+ # But flushing data at both ends will get our original data.
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response eos %s' % (
+ request1.requestid, outstream.flush())))
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], response1)
+
+ # We should be able to reuse the compressor/decompressor for the
+ # 2nd response.
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response continuation %s' % (
+ request2.requestid, outstream.encode(response2))))
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], b'')
+
+ action, meta = sendframe(reactor,
+ ffs(b'%d 2 encoded command-response eos %s' % (
+ request2.requestid, outstream.flush())))
+ self.assertEqual(action, b'responsedata')
+ self.assertEqual(meta[b'data'], response2)
+
if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)