1 import io |
1 import io |
2 import os |
2 import os |
3 |
3 import unittest |
4 try: |
|
5 import unittest2 as unittest |
|
6 except ImportError: |
|
7 import unittest |
|
8 |
4 |
9 try: |
5 try: |
10 import hypothesis |
6 import hypothesis |
11 import hypothesis.strategies as strategies |
7 import hypothesis.strategies as strategies |
12 except ImportError: |
8 except ImportError: |
13 raise unittest.SkipTest('hypothesis not available') |
9 raise unittest.SkipTest('hypothesis not available') |
14 |
10 |
15 import zstd |
11 import zstandard as zstd |
16 |
12 |
17 from . common import ( |
13 from . common import ( |
18 make_cffi, |
14 make_cffi, |
19 random_input_data, |
15 random_input_data, |
20 ) |
16 ) |
21 |
17 |
22 |
18 |
23 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
19 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
24 @make_cffi |
20 @make_cffi |
25 class TestDecompressor_write_to_fuzzing(unittest.TestCase): |
21 class TestDecompressor_stream_reader_fuzzing(unittest.TestCase): |
|
22 @hypothesis.settings( |
|
23 suppress_health_check=[hypothesis.HealthCheck.large_base_example]) |
|
24 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
|
25 level=strategies.integers(min_value=1, max_value=5), |
|
26 source_read_size=strategies.integers(1, 16384), |
|
27 read_sizes=strategies.data()) |
|
28 def test_stream_source_read_variance(self, original, level, source_read_size, |
|
29 read_sizes): |
|
30 cctx = zstd.ZstdCompressor(level=level) |
|
31 frame = cctx.compress(original) |
|
32 |
|
33 dctx = zstd.ZstdDecompressor() |
|
34 source = io.BytesIO(frame) |
|
35 |
|
36 chunks = [] |
|
37 with dctx.stream_reader(source, read_size=source_read_size) as reader: |
|
38 while True: |
|
39 read_size = read_sizes.draw(strategies.integers(1, 16384)) |
|
40 chunk = reader.read(read_size) |
|
41 if not chunk: |
|
42 break |
|
43 |
|
44 chunks.append(chunk) |
|
45 |
|
46 self.assertEqual(b''.join(chunks), original) |
|
47 |
|
48 @hypothesis.settings( |
|
49 suppress_health_check=[hypothesis.HealthCheck.large_base_example]) |
|
50 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
|
51 level=strategies.integers(min_value=1, max_value=5), |
|
52 source_read_size=strategies.integers(1, 16384), |
|
53 read_sizes=strategies.data()) |
|
54 def test_buffer_source_read_variance(self, original, level, source_read_size, |
|
55 read_sizes): |
|
56 cctx = zstd.ZstdCompressor(level=level) |
|
57 frame = cctx.compress(original) |
|
58 |
|
59 dctx = zstd.ZstdDecompressor() |
|
60 chunks = [] |
|
61 |
|
62 with dctx.stream_reader(frame, read_size=source_read_size) as reader: |
|
63 while True: |
|
64 read_size = read_sizes.draw(strategies.integers(1, 16384)) |
|
65 chunk = reader.read(read_size) |
|
66 if not chunk: |
|
67 break |
|
68 |
|
69 chunks.append(chunk) |
|
70 |
|
71 self.assertEqual(b''.join(chunks), original) |
|
72 |
|
73 @hypothesis.settings( |
|
74 suppress_health_check=[hypothesis.HealthCheck.large_base_example]) |
|
75 @hypothesis.given( |
|
76 original=strategies.sampled_from(random_input_data()), |
|
77 level=strategies.integers(min_value=1, max_value=5), |
|
78 source_read_size=strategies.integers(1, 16384), |
|
79 seek_amounts=strategies.data(), |
|
80 read_sizes=strategies.data()) |
|
81 def test_relative_seeks(self, original, level, source_read_size, seek_amounts, |
|
82 read_sizes): |
|
83 cctx = zstd.ZstdCompressor(level=level) |
|
84 frame = cctx.compress(original) |
|
85 |
|
86 dctx = zstd.ZstdDecompressor() |
|
87 |
|
88 with dctx.stream_reader(frame, read_size=source_read_size) as reader: |
|
89 while True: |
|
90 amount = seek_amounts.draw(strategies.integers(0, 16384)) |
|
91 reader.seek(amount, os.SEEK_CUR) |
|
92 |
|
93 offset = reader.tell() |
|
94 read_amount = read_sizes.draw(strategies.integers(1, 16384)) |
|
95 chunk = reader.read(read_amount) |
|
96 |
|
97 if not chunk: |
|
98 break |
|
99 |
|
100 self.assertEqual(original[offset:offset + len(chunk)], chunk) |
|
101 |
|
102 |
|
103 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
|
104 @make_cffi |
|
105 class TestDecompressor_stream_writer_fuzzing(unittest.TestCase): |
26 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
106 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
27 level=strategies.integers(min_value=1, max_value=5), |
107 level=strategies.integers(min_value=1, max_value=5), |
28 write_size=strategies.integers(min_value=1, max_value=8192), |
108 write_size=strategies.integers(min_value=1, max_value=8192), |
29 input_sizes=strategies.streaming( |
109 input_sizes=strategies.data()) |
30 strategies.integers(min_value=1, max_value=4096))) |
|
31 def test_write_size_variance(self, original, level, write_size, input_sizes): |
110 def test_write_size_variance(self, original, level, write_size, input_sizes): |
32 input_sizes = iter(input_sizes) |
|
33 |
|
34 cctx = zstd.ZstdCompressor(level=level) |
111 cctx = zstd.ZstdCompressor(level=level) |
35 frame = cctx.compress(original) |
112 frame = cctx.compress(original) |
36 |
113 |
37 dctx = zstd.ZstdDecompressor() |
114 dctx = zstd.ZstdDecompressor() |
38 source = io.BytesIO(frame) |
115 source = io.BytesIO(frame) |
39 dest = io.BytesIO() |
116 dest = io.BytesIO() |
40 |
117 |
41 with dctx.write_to(dest, write_size=write_size) as decompressor: |
118 with dctx.stream_writer(dest, write_size=write_size) as decompressor: |
42 while True: |
119 while True: |
43 chunk = source.read(next(input_sizes)) |
120 input_size = input_sizes.draw(strategies.integers(1, 4096)) |
|
121 chunk = source.read(input_size) |
44 if not chunk: |
122 if not chunk: |
45 break |
123 break |
46 |
124 |
47 decompressor.write(chunk) |
125 decompressor.write(chunk) |
48 |
126 |
72 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
150 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
73 @make_cffi |
151 @make_cffi |
74 class TestDecompressor_decompressobj_fuzzing(unittest.TestCase): |
152 class TestDecompressor_decompressobj_fuzzing(unittest.TestCase): |
75 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
153 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
76 level=strategies.integers(min_value=1, max_value=5), |
154 level=strategies.integers(min_value=1, max_value=5), |
77 chunk_sizes=strategies.streaming( |
155 chunk_sizes=strategies.data()) |
78 strategies.integers(min_value=1, max_value=4096))) |
|
79 def test_random_input_sizes(self, original, level, chunk_sizes): |
156 def test_random_input_sizes(self, original, level, chunk_sizes): |
80 chunk_sizes = iter(chunk_sizes) |
|
81 |
|
82 cctx = zstd.ZstdCompressor(level=level) |
157 cctx = zstd.ZstdCompressor(level=level) |
83 frame = cctx.compress(original) |
158 frame = cctx.compress(original) |
84 |
159 |
85 source = io.BytesIO(frame) |
160 source = io.BytesIO(frame) |
86 |
161 |
87 dctx = zstd.ZstdDecompressor() |
162 dctx = zstd.ZstdDecompressor() |
88 dobj = dctx.decompressobj() |
163 dobj = dctx.decompressobj() |
89 |
164 |
90 chunks = [] |
165 chunks = [] |
91 while True: |
166 while True: |
92 chunk = source.read(next(chunk_sizes)) |
167 chunk_size = chunk_sizes.draw(strategies.integers(1, 4096)) |
|
168 chunk = source.read(chunk_size) |
93 if not chunk: |
169 if not chunk: |
94 break |
170 break |
95 |
171 |
96 chunks.append(dobj.decompress(chunk)) |
172 chunks.append(dobj.decompress(chunk)) |
97 |
173 |
98 self.assertEqual(b''.join(chunks), original) |
174 self.assertEqual(b''.join(chunks), original) |
99 |
175 |
100 |
176 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
101 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
177 level=strategies.integers(min_value=1, max_value=5), |
102 @make_cffi |
178 write_size=strategies.integers(min_value=1, |
103 class TestDecompressor_read_from_fuzzing(unittest.TestCase): |
179 max_value=4 * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE), |
|
180 chunk_sizes=strategies.data()) |
|
181 def test_random_output_sizes(self, original, level, write_size, chunk_sizes): |
|
182 cctx = zstd.ZstdCompressor(level=level) |
|
183 frame = cctx.compress(original) |
|
184 |
|
185 source = io.BytesIO(frame) |
|
186 |
|
187 dctx = zstd.ZstdDecompressor() |
|
188 dobj = dctx.decompressobj(write_size=write_size) |
|
189 |
|
190 chunks = [] |
|
191 while True: |
|
192 chunk_size = chunk_sizes.draw(strategies.integers(1, 4096)) |
|
193 chunk = source.read(chunk_size) |
|
194 if not chunk: |
|
195 break |
|
196 |
|
197 chunks.append(dobj.decompress(chunk)) |
|
198 |
|
199 self.assertEqual(b''.join(chunks), original) |
|
200 |
|
201 |
|
202 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
|
203 @make_cffi |
|
204 class TestDecompressor_read_to_iter_fuzzing(unittest.TestCase): |
104 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
205 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
105 level=strategies.integers(min_value=1, max_value=5), |
206 level=strategies.integers(min_value=1, max_value=5), |
106 read_size=strategies.integers(min_value=1, max_value=4096), |
207 read_size=strategies.integers(min_value=1, max_value=4096), |
107 write_size=strategies.integers(min_value=1, max_value=4096)) |
208 write_size=strategies.integers(min_value=1, max_value=4096)) |
108 def test_read_write_size_variance(self, original, level, read_size, write_size): |
209 def test_read_write_size_variance(self, original, level, read_size, write_size): |