283 action, meta = sendframe(reactor, |
293 action, meta = sendframe(reactor, |
284 ffs(b'1 2 0 stream-settings eos %s' % data[3:])) |
294 ffs(b'1 2 0 stream-settings eos %s' % data[3:])) |
285 |
295 |
286 self.assertEqual(action, b'noop') |
296 self.assertEqual(action, b'noop') |
287 self.assertEqual(meta, {}) |
297 self.assertEqual(meta, {}) |
|
298 |
|
def testinvalidencoder(self):
    # A stream-settings frame naming an unrecognized decoder ("badvalue")
    # must make the reactor report an error instead of accepting it.
    reactor = framing.clientreactor(globalui, buffersends=False)

    request, action, meta = reactor.callcommand(b'foo', {})
    # Exhaust the generator of outgoing frames; the frames are not inspected.
    for _frame in meta[b'framegen']:
        pass

    badsettings = ffs(
        b'1 2 stream-begin stream-settings eos cbor:b"badvalue"')
    action, meta = sendframe(reactor, badsettings)

    expected = {
        b'message': b'error setting stream decoder: unknown stream '
                    b'decoder: badvalue',
    }
    self.assertEqual(action, b'error')
    self.assertEqual(meta, expected)
|
314 |
|
def testzlibencoding(self):
    # Select the "zlib" decoder through a stream-settings frame, then send
    # one zlib-compressed command-response frame and check that the reactor
    # returns the uncompressed CBOR bytes.
    reactor = framing.clientreactor(globalui, buffersends=False)

    request, action, meta = reactor.callcommand(b'foo', {})
    # Exhaust the generator of outgoing frames; the frames are not inspected.
    for _frame in meta[b'framegen']:
        pass

    settings = ffs(
        b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
        request.requestid)
    action, meta = sendframe(reactor, settings)

    self.assertEqual(action, b'noop')
    self.assertEqual(meta, {})

    encoded = b''.join(cborutil.streamencode({b'status': b'ok'}))

    compressed = zlib.compress(encoded)
    # Sanity-check the fixture itself before handing it to the reactor.
    self.assertEqual(zlib.decompress(compressed), encoded)

    response = ffs(
        b'%d 2 encoded command-response eos %s' %
        (request.requestid, compressed))
    action, meta = sendframe(reactor, response)

    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], encoded)
|
343 |
|
def testzlibencodingsinglebyteframes(self):
    # Deliver a zlib-compressed response one byte per frame and check that
    # the concatenated decoded chunks equal the original CBOR bytes.
    reactor = framing.clientreactor(globalui, buffersends=False)

    request, action, meta = reactor.callcommand(b'foo', {})
    # Exhaust the generator of outgoing frames; the frames are not inspected.
    for _frame in meta[b'framegen']:
        pass

    settings = ffs(
        b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
        request.requestid)
    action, meta = sendframe(reactor, settings)

    self.assertEqual(action, b'noop')
    self.assertEqual(meta, {})

    encoded = b''.join(cborutil.streamencode({b'status': b'ok'}))

    compressed = zlib.compress(encoded)
    self.assertEqual(zlib.decompress(compressed), encoded)

    pieces = []

    for offset in range(len(compressed)):
        # Slice (rather than index) so the value is a length-1 bytes object.
        byte = compressed[offset:offset + 1]
        # A lone backslash is doubled before being passed to ffs();
        # presumably ffs() treats backslash as an escape character.
        payload = b'\\\\' if byte == b'\\' else byte
        frame = ffs(
            b'%d 2 encoded command-response continuation %s' %
            (request.requestid, payload))
        action, meta = sendframe(reactor, frame)

        self.assertEqual(action, b'responsedata')
        pieces.append(meta[b'data'])
        self.assertTrue(meta[b'expectmore'])
        self.assertFalse(meta[b'eos'])

    # Even without a flush, zlib has produced all of the decoded data by
    # the time the last byte has been fed in.
    self.assertEqual(b''.join(pieces), encoded)

    # Terminate the stream for good measure.
    closing = ffs(
        b'%d 2 stream-end command-response eos ' % request.requestid)
    action, meta = sendframe(reactor, closing)

    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], b'')
    self.assertFalse(meta[b'expectmore'])
    self.assertTrue(meta[b'eos'])
|
393 |
|
def testzlibmultipleresponses(self):
    # zlib-compressed data for 2 different requests is fed in on a single
    # stream. This exercises our flushing behavior.
    reactor = framing.clientreactor(globalui, buffersends=False,
                                    hasmultiplesend=True)

    request1, action, meta = reactor.callcommand(b'foo', {})
    # Exhaust the generator of outgoing frames; the frames are not inspected.
    for _frame in meta[b'framegen']:
        pass

    request2, action, meta = reactor.callcommand(b'foo', {})
    for _frame in meta[b'framegen']:
        pass

    outstream = framing.outputstream(2)
    outstream.setencoder(globalui, b'zlib')

    payload1 = {
        b'status': b'ok',
        b'extra': b'response1' * 10,
    }
    response1 = b''.join(cborutil.streamencode(payload1))

    payload2 = {
        b'status': b'error',
        b'extra': b'response2' * 10,
    }
    response2 = b''.join(cborutil.streamencode(payload2))

    continuationfmt = b'%d 2 encoded command-response continuation %s'
    eosfmt = b'%d 2 encoded command-response eos %s'

    settings = ffs(
        b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
        request1.requestid)
    action, meta = sendframe(reactor, settings)

    self.assertEqual(action, b'noop')
    self.assertEqual(meta, {})

    # Partial (unflushed) data yields nothing useful on the way out.
    partial1 = outstream.encode(response1)
    action, meta = sendframe(
        reactor, ffs(continuationfmt % (request1.requestid, partial1)))
    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], b'')

    # Once both ends flush, the original data comes back.
    flushed1 = outstream.flush()
    action, meta = sendframe(
        reactor, ffs(eosfmt % (request1.requestid, flushed1)))
    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], response1)

    # The same compressor/decompressor pair should be reusable for the
    # 2nd response.
    partial2 = outstream.encode(response2)
    action, meta = sendframe(
        reactor, ffs(continuationfmt % (request2.requestid, partial2)))
    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], b'')

    flushed2 = outstream.flush()
    action, meta = sendframe(
        reactor, ffs(eosfmt % (request2.requestid, flushed2)))
    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], response2)
|
455 |
|
@unittest.skipUnless(zstd, 'zstd not available')
def testzstd8mbencoding(self):
    # Select the "zstd-8mb" decoder through a stream-settings frame, then
    # send one compressed command-response frame and check that the reactor
    # returns the uncompressed CBOR bytes.
    reactor = framing.clientreactor(globalui, buffersends=False)

    request, action, meta = reactor.callcommand(b'foo', {})
    # Exhaust the generator of outgoing frames; the frames are not inspected.
    for _frame in meta[b'framegen']:
        pass

    settings = ffs(
        b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
        request.requestid)
    action, meta = sendframe(reactor, settings)

    self.assertEqual(action, b'noop')
    self.assertEqual(meta, {})

    encoded = b''.join(cborutil.streamencode({b'status': b'ok'}))

    encoder = framing.zstd8mbencoder(globalui)
    compressed = encoder.encode(encoded) + encoder.finish()
    # Sanity-check the fixture itself before handing it to the reactor.
    roundtripped = zstd.ZstdDecompressor().decompress(
        compressed, max_output_size=len(encoded))
    self.assertEqual(roundtripped, encoded)

    response = ffs(
        b'%d 2 encoded command-response eos %s' %
        (request.requestid, compressed))
    action, meta = sendframe(reactor, response)

    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], encoded)
|
487 |
|
@unittest.skipUnless(zstd, 'zstd not available')
def testzstd8mbencodingsinglebyteframes(self):
    # Deliver a zstd-compressed response one byte per frame and check that
    # the concatenated decoded chunks equal the original CBOR bytes.
    reactor = framing.clientreactor(globalui, buffersends=False)

    request, action, meta = reactor.callcommand(b'foo', {})
    # Exhaust the generator of outgoing frames; the frames are not inspected.
    for _frame in meta[b'framegen']:
        pass

    settings = ffs(
        b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
        request.requestid)
    action, meta = sendframe(reactor, settings)

    self.assertEqual(action, b'noop')
    self.assertEqual(meta, {})

    encoded = b''.join(cborutil.streamencode({b'status': b'ok'}))

    compressed = zstd.ZstdCompressor().compress(encoded)
    roundtripped = zstd.ZstdDecompressor().decompress(compressed)
    self.assertEqual(roundtripped, encoded)

    pieces = []

    for offset in range(len(compressed)):
        # Slice (rather than index) so the value is a length-1 bytes object.
        byte = compressed[offset:offset + 1]
        # A lone backslash is doubled before being passed to ffs();
        # presumably ffs() treats backslash as an escape character.
        payload = b'\\\\' if byte == b'\\' else byte
        frame = ffs(
            b'%d 2 encoded command-response continuation %s' %
            (request.requestid, payload))
        action, meta = sendframe(reactor, frame)

        self.assertEqual(action, b'responsedata')
        pieces.append(meta[b'data'])
        self.assertTrue(meta[b'expectmore'])
        self.assertFalse(meta[b'eos'])

    # The zstd decompressor flushes at frame boundaries.
    self.assertEqual(b''.join(pieces), encoded)

    # Terminate the stream for good measure.
    closing = ffs(
        b'%d 2 stream-end command-response eos ' % request.requestid)
    action, meta = sendframe(reactor, closing)

    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], b'')
    self.assertFalse(meta[b'expectmore'])
    self.assertTrue(meta[b'eos'])
|
538 |
|
@unittest.skipUnless(zstd, 'zstd not available')
def testzstd8mbmultipleresponses(self):
    # zstd-compressed data for 2 different requests is fed in on a single
    # stream. This exercises our flushing behavior.
    reactor = framing.clientreactor(globalui, buffersends=False,
                                    hasmultiplesend=True)

    request1, action, meta = reactor.callcommand(b'foo', {})
    # Exhaust the generator of outgoing frames; the frames are not inspected.
    for _frame in meta[b'framegen']:
        pass

    request2, action, meta = reactor.callcommand(b'foo', {})
    for _frame in meta[b'framegen']:
        pass

    outstream = framing.outputstream(2)
    outstream.setencoder(globalui, b'zstd-8mb')

    payload1 = {
        b'status': b'ok',
        b'extra': b'response1' * 10,
    }
    response1 = b''.join(cborutil.streamencode(payload1))

    payload2 = {
        b'status': b'error',
        b'extra': b'response2' * 10,
    }
    response2 = b''.join(cborutil.streamencode(payload2))

    continuationfmt = b'%d 2 encoded command-response continuation %s'
    eosfmt = b'%d 2 encoded command-response eos %s'

    settings = ffs(
        b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
        request1.requestid)
    action, meta = sendframe(reactor, settings)

    self.assertEqual(action, b'noop')
    self.assertEqual(meta, {})

    # Partial (unflushed) data yields nothing useful on the way out.
    partial1 = outstream.encode(response1)
    action, meta = sendframe(
        reactor, ffs(continuationfmt % (request1.requestid, partial1)))
    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], b'')

    # Once both ends flush, the original data comes back.
    flushed1 = outstream.flush()
    action, meta = sendframe(
        reactor, ffs(eosfmt % (request1.requestid, flushed1)))
    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], response1)

    # The same compressor/decompressor pair should be reusable for the
    # 2nd response.
    partial2 = outstream.encode(response2)
    action, meta = sendframe(
        reactor, ffs(continuationfmt % (request2.requestid, partial2)))
    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], b'')

    flushed2 = outstream.flush()
    action, meta = sendframe(
        reactor, ffs(eosfmt % (request2.requestid, flushed2)))
    self.assertEqual(action, b'responsedata')
    self.assertEqual(meta[b'data'], response2)
288 |
601 |
if __name__ == '__main__':
    # When executed as a script, hand this module's name to
    # silenttestrunner.main(); the import is kept local to this branch.
    import silenttestrunner
    silenttestrunner.main(__name__)