1 import io |
|
2 |
|
3 try: |
|
4 import unittest2 as unittest |
|
5 except ImportError: |
|
6 import unittest |
|
7 |
|
8 try: |
|
9 import hypothesis |
|
10 import hypothesis.strategies as strategies |
|
11 except ImportError: |
|
12 raise unittest.SkipTest('hypothesis not available') |
|
13 |
|
14 import zstd |
|
15 |
|
16 |
|
17 compression_levels = strategies.integers(min_value=1, max_value=22) |
|
18 |
|
19 |
|
20 class TestRoundTrip(unittest.TestCase): |
|
21 @hypothesis.given(strategies.binary(), compression_levels) |
|
22 def test_compress_write_to(self, data, level): |
|
23 """Random data from compress() roundtrips via write_to.""" |
|
24 cctx = zstd.ZstdCompressor(level=level) |
|
25 compressed = cctx.compress(data) |
|
26 |
|
27 buffer = io.BytesIO() |
|
28 dctx = zstd.ZstdDecompressor() |
|
29 with dctx.write_to(buffer) as decompressor: |
|
30 decompressor.write(compressed) |
|
31 |
|
32 self.assertEqual(buffer.getvalue(), data) |
|
33 |
|
34 @hypothesis.given(strategies.binary(), compression_levels) |
|
35 def test_compressor_write_to_decompressor_write_to(self, data, level): |
|
36 """Random data from compressor write_to roundtrips via write_to.""" |
|
37 compress_buffer = io.BytesIO() |
|
38 decompressed_buffer = io.BytesIO() |
|
39 |
|
40 cctx = zstd.ZstdCompressor(level=level) |
|
41 with cctx.write_to(compress_buffer) as compressor: |
|
42 compressor.write(data) |
|
43 |
|
44 dctx = zstd.ZstdDecompressor() |
|
45 with dctx.write_to(decompressed_buffer) as decompressor: |
|
46 decompressor.write(compress_buffer.getvalue()) |
|
47 |
|
48 self.assertEqual(decompressed_buffer.getvalue(), data) |
|
49 |
|
50 @hypothesis.given(strategies.binary(average_size=1048576)) |
|
51 @hypothesis.settings(perform_health_check=False) |
|
52 def test_compressor_write_to_decompressor_write_to_larger(self, data): |
|
53 compress_buffer = io.BytesIO() |
|
54 decompressed_buffer = io.BytesIO() |
|
55 |
|
56 cctx = zstd.ZstdCompressor(level=5) |
|
57 with cctx.write_to(compress_buffer) as compressor: |
|
58 compressor.write(data) |
|
59 |
|
60 dctx = zstd.ZstdDecompressor() |
|
61 with dctx.write_to(decompressed_buffer) as decompressor: |
|
62 decompressor.write(compress_buffer.getvalue()) |
|
63 |
|
64 self.assertEqual(decompressed_buffer.getvalue(), data) |
|