tests/test-wireproto-clientreactor.py
changeset 40132 e67522413ca8
parent 40129 293835e0fff7
child 40482 07b87ee2ea75
equal deleted inserted replaced
40130:5d44c4d1d516 40132:e67522413ca8
     1 from __future__ import absolute_import
     1 from __future__ import absolute_import
     2 
     2 
     3 import unittest
     3 import unittest
       
     4 import zlib
     4 
     5 
     5 from mercurial import (
     6 from mercurial import (
     6     error,
     7     error,
     7     ui as uimod,
     8     ui as uimod,
     8     wireprotoframing as framing,
     9     wireprotoframing as framing,
     9 )
    10 )
    10 from mercurial.utils import (
    11 from mercurial.utils import (
    11     cborutil,
    12     cborutil,
    12 )
    13 )
    13 
    14 
       
    15 try:
       
    16     from mercurial import zstd
       
    17     zstd.__version__
       
    18 except ImportError:
       
    19     zstd = None
       
    20 
    14 ffs = framing.makeframefromhumanstring
    21 ffs = framing.makeframefromhumanstring
    15 
    22 
    16 globalui = uimod.ui()
    23 globalui = uimod.ui()
    17 
    24 
    18 def sendframe(reactor, frame):
    25 def sendframe(reactor, frame):
   259         ])
   266         ])
   260 
   267 
   261         action, meta = sendframe(reactor,
   268         action, meta = sendframe(reactor,
   262             ffs(b'1 2 stream-begin stream-settings eos %s' % data))
   269             ffs(b'1 2 stream-begin stream-settings eos %s' % data))
   263 
   270 
   264         self.assertEqual(action, b'noop')
   271         self.assertEqual(action, b'error')
   265         self.assertEqual(meta, {})
   272         self.assertEqual(meta, {
       
   273             b'message': b'error setting stream decoder: identity decoder '
       
   274                         b'received unexpected additional values',
       
   275         })
   266 
   276 
   267     def testmultipleframes(self):
   277     def testmultipleframes(self):
   268         reactor = framing.clientreactor(globalui, buffersends=False)
   278         reactor = framing.clientreactor(globalui, buffersends=False)
   269 
   279 
   270         request, action, meta = reactor.callcommand(b'foo', {})
   280         request, action, meta = reactor.callcommand(b'foo', {})
   283         action, meta = sendframe(reactor,
   293         action, meta = sendframe(reactor,
   284             ffs(b'1 2 0 stream-settings eos %s' % data[3:]))
   294             ffs(b'1 2 0 stream-settings eos %s' % data[3:]))
   285 
   295 
   286         self.assertEqual(action, b'noop')
   296         self.assertEqual(action, b'noop')
   287         self.assertEqual(meta, {})
   297         self.assertEqual(meta, {})
       
   298 
       
   299     def testinvalidencoder(self):
       
   300         reactor = framing.clientreactor(globalui, buffersends=False)
       
   301 
       
   302         request, action, meta = reactor.callcommand(b'foo', {})
       
   303         for f in meta[b'framegen']:
       
   304             pass
       
   305 
       
   306         action, meta = sendframe(reactor,
       
   307             ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'))
       
   308 
       
   309         self.assertEqual(action, b'error')
       
   310         self.assertEqual(meta, {
       
   311             b'message': b'error setting stream decoder: unknown stream '
       
   312                         b'decoder: badvalue',
       
   313         })
       
   314 
       
   315     def testzlibencoding(self):
       
   316         reactor = framing.clientreactor(globalui, buffersends=False)
       
   317 
       
   318         request, action, meta = reactor.callcommand(b'foo', {})
       
   319         for f in meta[b'framegen']:
       
   320             pass
       
   321 
       
   322         action, meta = sendframe(reactor,
       
   323             ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
       
   324                 request.requestid))
       
   325 
       
   326         self.assertEqual(action, b'noop')
       
   327         self.assertEqual(meta, {})
       
   328 
       
   329         result = {
       
   330             b'status': b'ok',
       
   331         }
       
   332         encoded = b''.join(cborutil.streamencode(result))
       
   333 
       
   334         compressed = zlib.compress(encoded)
       
   335         self.assertEqual(zlib.decompress(compressed), encoded)
       
   336 
       
   337         action, meta = sendframe(reactor,
       
   338             ffs(b'%d 2 encoded command-response eos %s' %
       
   339                 (request.requestid, compressed)))
       
   340 
       
   341         self.assertEqual(action, b'responsedata')
       
   342         self.assertEqual(meta[b'data'], encoded)
       
   343 
       
   344     def testzlibencodingsinglebyteframes(self):
       
   345         reactor = framing.clientreactor(globalui, buffersends=False)
       
   346 
       
   347         request, action, meta = reactor.callcommand(b'foo', {})
       
   348         for f in meta[b'framegen']:
       
   349             pass
       
   350 
       
   351         action, meta = sendframe(reactor,
       
   352             ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
       
   353                 request.requestid))
       
   354 
       
   355         self.assertEqual(action, b'noop')
       
   356         self.assertEqual(meta, {})
       
   357 
       
   358         result = {
       
   359             b'status': b'ok',
       
   360         }
       
   361         encoded = b''.join(cborutil.streamencode(result))
       
   362 
       
   363         compressed = zlib.compress(encoded)
       
   364         self.assertEqual(zlib.decompress(compressed), encoded)
       
   365 
       
   366         chunks = []
       
   367 
       
   368         for i in range(len(compressed)):
       
   369             char = compressed[i:i + 1]
       
   370             if char == b'\\':
       
   371                 char = b'\\\\'
       
   372             action, meta = sendframe(reactor,
       
   373                 ffs(b'%d 2 encoded command-response continuation %s' %
       
   374                     (request.requestid, char)))
       
   375 
       
   376             self.assertEqual(action, b'responsedata')
       
   377             chunks.append(meta[b'data'])
       
   378             self.assertTrue(meta[b'expectmore'])
       
   379             self.assertFalse(meta[b'eos'])
       
   380 
       
   381         # zlib will have the full data decoded at this point, even though
       
   382         # we haven't flushed.
       
   383         self.assertEqual(b''.join(chunks), encoded)
       
   384 
       
   385         # End the stream for good measure.
       
   386         action, meta = sendframe(reactor,
       
   387             ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
       
   388 
       
   389         self.assertEqual(action, b'responsedata')
       
   390         self.assertEqual(meta[b'data'], b'')
       
   391         self.assertFalse(meta[b'expectmore'])
       
   392         self.assertTrue(meta[b'eos'])
       
   393 
       
   394     def testzlibmultipleresponses(self):
       
   395         # We feed in zlib compressed data on the same stream but belonging to
       
   396         # 2 different requests. This tests our flushing behavior.
       
   397         reactor = framing.clientreactor(globalui, buffersends=False,
       
   398                                         hasmultiplesend=True)
       
   399 
       
   400         request1, action, meta = reactor.callcommand(b'foo', {})
       
   401         for f in meta[b'framegen']:
       
   402             pass
       
   403 
       
   404         request2, action, meta = reactor.callcommand(b'foo', {})
       
   405         for f in meta[b'framegen']:
       
   406             pass
       
   407 
       
   408         outstream = framing.outputstream(2)
       
   409         outstream.setencoder(globalui, b'zlib')
       
   410 
       
   411         response1 = b''.join(cborutil.streamencode({
       
   412             b'status': b'ok',
       
   413             b'extra': b'response1' * 10,
       
   414         }))
       
   415 
       
   416         response2 = b''.join(cborutil.streamencode({
       
   417             b'status': b'error',
       
   418             b'extra': b'response2' * 10,
       
   419         }))
       
   420 
       
   421         action, meta = sendframe(reactor,
       
   422             ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
       
   423                 request1.requestid))
       
   424 
       
   425         self.assertEqual(action, b'noop')
       
   426         self.assertEqual(meta, {})
       
   427 
       
   428         # Feeding partial data in won't get anything useful out.
       
   429         action, meta = sendframe(reactor,
       
   430             ffs(b'%d 2 encoded command-response continuation %s' % (
       
   431                 request1.requestid, outstream.encode(response1))))
       
   432         self.assertEqual(action, b'responsedata')
       
   433         self.assertEqual(meta[b'data'], b'')
       
   434 
       
   435         # But flushing data at both ends will get our original data.
       
   436         action, meta = sendframe(reactor,
       
   437             ffs(b'%d 2 encoded command-response eos %s' % (
       
   438                 request1.requestid, outstream.flush())))
       
   439         self.assertEqual(action, b'responsedata')
       
   440         self.assertEqual(meta[b'data'], response1)
       
   441 
       
   442         # We should be able to reuse the compressor/decompressor for the
       
   443         # 2nd response.
       
   444         action, meta = sendframe(reactor,
       
   445             ffs(b'%d 2 encoded command-response continuation %s' % (
       
   446                 request2.requestid, outstream.encode(response2))))
       
   447         self.assertEqual(action, b'responsedata')
       
   448         self.assertEqual(meta[b'data'], b'')
       
   449 
       
   450         action, meta = sendframe(reactor,
       
   451             ffs(b'%d 2 encoded command-response eos %s' % (
       
   452                 request2.requestid, outstream.flush())))
       
   453         self.assertEqual(action, b'responsedata')
       
   454         self.assertEqual(meta[b'data'], response2)
       
   455 
       
   456     @unittest.skipUnless(zstd, 'zstd not available')
       
   457     def testzstd8mbencoding(self):
       
   458         reactor = framing.clientreactor(globalui, buffersends=False)
       
   459 
       
   460         request, action, meta = reactor.callcommand(b'foo', {})
       
   461         for f in meta[b'framegen']:
       
   462             pass
       
   463 
       
   464         action, meta = sendframe(reactor,
       
   465             ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
       
   466                 request.requestid))
       
   467 
       
   468         self.assertEqual(action, b'noop')
       
   469         self.assertEqual(meta, {})
       
   470 
       
   471         result = {
       
   472             b'status': b'ok',
       
   473         }
       
   474         encoded = b''.join(cborutil.streamencode(result))
       
   475 
       
   476         encoder = framing.zstd8mbencoder(globalui)
       
   477         compressed = encoder.encode(encoded) + encoder.finish()
       
   478         self.assertEqual(zstd.ZstdDecompressor().decompress(
       
   479             compressed, max_output_size=len(encoded)), encoded)
       
   480 
       
   481         action, meta = sendframe(reactor,
       
   482             ffs(b'%d 2 encoded command-response eos %s' %
       
   483                 (request.requestid, compressed)))
       
   484 
       
   485         self.assertEqual(action, b'responsedata')
       
   486         self.assertEqual(meta[b'data'], encoded)
       
   487 
       
   488     @unittest.skipUnless(zstd, 'zstd not available')
       
   489     def testzstd8mbencodingsinglebyteframes(self):
       
   490         reactor = framing.clientreactor(globalui, buffersends=False)
       
   491 
       
   492         request, action, meta = reactor.callcommand(b'foo', {})
       
   493         for f in meta[b'framegen']:
       
   494             pass
       
   495 
       
   496         action, meta = sendframe(reactor,
       
   497             ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
       
   498                 request.requestid))
       
   499 
       
   500         self.assertEqual(action, b'noop')
       
   501         self.assertEqual(meta, {})
       
   502 
       
   503         result = {
       
   504             b'status': b'ok',
       
   505         }
       
   506         encoded = b''.join(cborutil.streamencode(result))
       
   507 
       
   508         compressed = zstd.ZstdCompressor().compress(encoded)
       
   509         self.assertEqual(zstd.ZstdDecompressor().decompress(compressed),
       
   510                          encoded)
       
   511 
       
   512         chunks = []
       
   513 
       
   514         for i in range(len(compressed)):
       
   515             char = compressed[i:i + 1]
       
   516             if char == b'\\':
       
   517                 char = b'\\\\'
       
   518             action, meta = sendframe(reactor,
       
   519                 ffs(b'%d 2 encoded command-response continuation %s' %
       
   520                     (request.requestid, char)))
       
   521 
       
   522             self.assertEqual(action, b'responsedata')
       
   523             chunks.append(meta[b'data'])
       
   524             self.assertTrue(meta[b'expectmore'])
       
   525             self.assertFalse(meta[b'eos'])
       
   526 
       
   527         # zstd decompressor will flush at frame boundaries.
       
   528         self.assertEqual(b''.join(chunks), encoded)
       
   529 
       
   530         # End the stream for good measure.
       
   531         action, meta = sendframe(reactor,
       
   532             ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
       
   533 
       
   534         self.assertEqual(action, b'responsedata')
       
   535         self.assertEqual(meta[b'data'], b'')
       
   536         self.assertFalse(meta[b'expectmore'])
       
   537         self.assertTrue(meta[b'eos'])
       
   538 
       
   539     @unittest.skipUnless(zstd, 'zstd not available')
       
   540     def testzstd8mbmultipleresponses(self):
       
   541         # We feed in zstd compressed data on the same stream but belonging to
       
   542         # 2 different requests. This tests our flushing behavior.
       
   543         reactor = framing.clientreactor(globalui, buffersends=False,
       
   544                                         hasmultiplesend=True)
       
   545 
       
   546         request1, action, meta = reactor.callcommand(b'foo', {})
       
   547         for f in meta[b'framegen']:
       
   548             pass
       
   549 
       
   550         request2, action, meta = reactor.callcommand(b'foo', {})
       
   551         for f in meta[b'framegen']:
       
   552             pass
       
   553 
       
   554         outstream = framing.outputstream(2)
       
   555         outstream.setencoder(globalui, b'zstd-8mb')
       
   556 
       
   557         response1 = b''.join(cborutil.streamencode({
       
   558             b'status': b'ok',
       
   559             b'extra': b'response1' * 10,
       
   560         }))
       
   561 
       
   562         response2 = b''.join(cborutil.streamencode({
       
   563             b'status': b'error',
       
   564             b'extra': b'response2' * 10,
       
   565         }))
       
   566 
       
   567         action, meta = sendframe(reactor,
       
   568             ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
       
   569                 request1.requestid))
       
   570 
       
   571         self.assertEqual(action, b'noop')
       
   572         self.assertEqual(meta, {})
       
   573 
       
   574         # Feeding partial data in won't get anything useful out.
       
   575         action, meta = sendframe(reactor,
       
   576             ffs(b'%d 2 encoded command-response continuation %s' % (
       
   577                 request1.requestid, outstream.encode(response1))))
       
   578         self.assertEqual(action, b'responsedata')
       
   579         self.assertEqual(meta[b'data'], b'')
       
   580 
       
   581         # But flushing data at both ends will get our original data.
       
   582         action, meta = sendframe(reactor,
       
   583             ffs(b'%d 2 encoded command-response eos %s' % (
       
   584                 request1.requestid, outstream.flush())))
       
   585         self.assertEqual(action, b'responsedata')
       
   586         self.assertEqual(meta[b'data'], response1)
       
   587 
       
   588         # We should be able to reuse the compressor/decompressor for the
       
   589         # 2nd response.
       
   590         action, meta = sendframe(reactor,
       
   591             ffs(b'%d 2 encoded command-response continuation %s' % (
       
   592                 request2.requestid, outstream.encode(response2))))
       
   593         self.assertEqual(action, b'responsedata')
       
   594         self.assertEqual(meta[b'data'], b'')
       
   595 
       
   596         action, meta = sendframe(reactor,
       
   597             ffs(b'%d 2 encoded command-response eos %s' % (
       
   598                 request2.requestid, outstream.flush())))
       
   599         self.assertEqual(action, b'responsedata')
       
   600         self.assertEqual(meta[b'data'], response2)
   288 
   601 
   289 if __name__ == '__main__':
   602 if __name__ == '__main__':
   290     import silenttestrunner
   603     import silenttestrunner
   291     silenttestrunner.main(__name__)
   604     silenttestrunner.main(__name__)