contrib/python-zstandard/tests/test_decompressor_fuzzing.py
changeset 37495 b1fb341d8a61
parent 31796 e0dc40530c5a
child 42070 675775c33ab6
equal deleted inserted replaced
37494:1ce7a55b09d1 37495:b1fb341d8a61
     1 import io
     1 import io
     2 import os
     2 import os
     3 
     3 import unittest
     4 try:
       
     5     import unittest2 as unittest
       
     6 except ImportError:
       
     7     import unittest
       
     8 
     4 
     9 try:
     5 try:
    10     import hypothesis
     6     import hypothesis
    11     import hypothesis.strategies as strategies
     7     import hypothesis.strategies as strategies
    12 except ImportError:
     8 except ImportError:
    13     raise unittest.SkipTest('hypothesis not available')
     9     raise unittest.SkipTest('hypothesis not available')
    14 
    10 
    15 import zstd
    11 import zstandard as zstd
    16 
    12 
    17 from . common import (
    13 from . common import (
    18     make_cffi,
    14     make_cffi,
    19     random_input_data,
    15     random_input_data,
    20 )
    16 )
    21 
    17 
    22 
    18 
    23 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
    19 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
    24 @make_cffi
    20 @make_cffi
    25 class TestDecompressor_write_to_fuzzing(unittest.TestCase):
    21 class TestDecompressor_stream_reader_fuzzing(unittest.TestCase):
       
    22     @hypothesis.settings(
       
    23         suppress_health_check=[hypothesis.HealthCheck.large_base_example])
       
    24     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
       
    25                       level=strategies.integers(min_value=1, max_value=5),
       
    26                       source_read_size=strategies.integers(1, 16384),
       
    27                       read_sizes=strategies.data())
       
    28     def test_stream_source_read_variance(self, original, level, source_read_size,
       
    29                                          read_sizes):
       
    30         cctx = zstd.ZstdCompressor(level=level)
       
    31         frame = cctx.compress(original)
       
    32 
       
    33         dctx = zstd.ZstdDecompressor()
       
    34         source = io.BytesIO(frame)
       
    35 
       
    36         chunks = []
       
    37         with dctx.stream_reader(source, read_size=source_read_size) as reader:
       
    38             while True:
       
    39                 read_size = read_sizes.draw(strategies.integers(1, 16384))
       
    40                 chunk = reader.read(read_size)
       
    41                 if not chunk:
       
    42                     break
       
    43 
       
    44                 chunks.append(chunk)
       
    45 
       
    46         self.assertEqual(b''.join(chunks), original)
       
    47 
       
    48     @hypothesis.settings(
       
    49         suppress_health_check=[hypothesis.HealthCheck.large_base_example])
       
    50     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
       
    51                       level=strategies.integers(min_value=1, max_value=5),
       
    52                       source_read_size=strategies.integers(1, 16384),
       
    53                       read_sizes=strategies.data())
       
    54     def test_buffer_source_read_variance(self, original, level, source_read_size,
       
    55                                          read_sizes):
       
    56         cctx = zstd.ZstdCompressor(level=level)
       
    57         frame = cctx.compress(original)
       
    58 
       
    59         dctx = zstd.ZstdDecompressor()
       
    60         chunks = []
       
    61 
       
    62         with dctx.stream_reader(frame, read_size=source_read_size) as reader:
       
    63             while True:
       
    64                 read_size = read_sizes.draw(strategies.integers(1, 16384))
       
    65                 chunk = reader.read(read_size)
       
    66                 if not chunk:
       
    67                     break
       
    68 
       
    69                 chunks.append(chunk)
       
    70 
       
    71         self.assertEqual(b''.join(chunks), original)
       
    72 
       
    73     @hypothesis.settings(
       
    74         suppress_health_check=[hypothesis.HealthCheck.large_base_example])
       
    75     @hypothesis.given(
       
    76         original=strategies.sampled_from(random_input_data()),
       
    77         level=strategies.integers(min_value=1, max_value=5),
       
    78         source_read_size=strategies.integers(1, 16384),
       
    79         seek_amounts=strategies.data(),
       
    80         read_sizes=strategies.data())
       
    81     def test_relative_seeks(self, original, level, source_read_size, seek_amounts,
       
    82                             read_sizes):
       
    83         cctx = zstd.ZstdCompressor(level=level)
       
    84         frame = cctx.compress(original)
       
    85 
       
    86         dctx = zstd.ZstdDecompressor()
       
    87 
       
    88         with dctx.stream_reader(frame, read_size=source_read_size) as reader:
       
    89             while True:
       
    90                 amount = seek_amounts.draw(strategies.integers(0, 16384))
       
    91                 reader.seek(amount, os.SEEK_CUR)
       
    92 
       
    93                 offset = reader.tell()
       
    94                 read_amount = read_sizes.draw(strategies.integers(1, 16384))
       
    95                 chunk = reader.read(read_amount)
       
    96 
       
    97                 if not chunk:
       
    98                     break
       
    99 
       
   100                 self.assertEqual(original[offset:offset + len(chunk)], chunk)
       
   101 
       
   102 
       
   103 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
       
   104 @make_cffi
       
   105 class TestDecompressor_stream_writer_fuzzing(unittest.TestCase):
    26     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
   106     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
    27                       level=strategies.integers(min_value=1, max_value=5),
   107                       level=strategies.integers(min_value=1, max_value=5),
    28                       write_size=strategies.integers(min_value=1, max_value=8192),
   108                       write_size=strategies.integers(min_value=1, max_value=8192),
    29                       input_sizes=strategies.streaming(
   109                       input_sizes=strategies.data())
    30                           strategies.integers(min_value=1, max_value=4096)))
       
    31     def test_write_size_variance(self, original, level, write_size, input_sizes):
   110     def test_write_size_variance(self, original, level, write_size, input_sizes):
    32         input_sizes = iter(input_sizes)
       
    33 
       
    34         cctx = zstd.ZstdCompressor(level=level)
   111         cctx = zstd.ZstdCompressor(level=level)
    35         frame = cctx.compress(original)
   112         frame = cctx.compress(original)
    36 
   113 
    37         dctx = zstd.ZstdDecompressor()
   114         dctx = zstd.ZstdDecompressor()
    38         source = io.BytesIO(frame)
   115         source = io.BytesIO(frame)
    39         dest = io.BytesIO()
   116         dest = io.BytesIO()
    40 
   117 
    41         with dctx.write_to(dest, write_size=write_size) as decompressor:
   118         with dctx.stream_writer(dest, write_size=write_size) as decompressor:
    42             while True:
   119             while True:
    43                 chunk = source.read(next(input_sizes))
   120                 input_size = input_sizes.draw(strategies.integers(1, 4096))
       
   121                 chunk = source.read(input_size)
    44                 if not chunk:
   122                 if not chunk:
    45                     break
   123                     break
    46 
   124 
    47                 decompressor.write(chunk)
   125                 decompressor.write(chunk)
    48 
   126 
    72 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
   150 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
    73 @make_cffi
   151 @make_cffi
    74 class TestDecompressor_decompressobj_fuzzing(unittest.TestCase):
   152 class TestDecompressor_decompressobj_fuzzing(unittest.TestCase):
    75     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
   153     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
    76                       level=strategies.integers(min_value=1, max_value=5),
   154                       level=strategies.integers(min_value=1, max_value=5),
    77                       chunk_sizes=strategies.streaming(
   155                       chunk_sizes=strategies.data())
    78                           strategies.integers(min_value=1, max_value=4096)))
       
    79     def test_random_input_sizes(self, original, level, chunk_sizes):
   156     def test_random_input_sizes(self, original, level, chunk_sizes):
    80         chunk_sizes = iter(chunk_sizes)
       
    81 
       
    82         cctx = zstd.ZstdCompressor(level=level)
   157         cctx = zstd.ZstdCompressor(level=level)
    83         frame = cctx.compress(original)
   158         frame = cctx.compress(original)
    84 
   159 
    85         source = io.BytesIO(frame)
   160         source = io.BytesIO(frame)
    86 
   161 
    87         dctx = zstd.ZstdDecompressor()
   162         dctx = zstd.ZstdDecompressor()
    88         dobj = dctx.decompressobj()
   163         dobj = dctx.decompressobj()
    89 
   164 
    90         chunks = []
   165         chunks = []
    91         while True:
   166         while True:
    92             chunk = source.read(next(chunk_sizes))
   167             chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
       
   168             chunk = source.read(chunk_size)
    93             if not chunk:
   169             if not chunk:
    94                 break
   170                 break
    95 
   171 
    96             chunks.append(dobj.decompress(chunk))
   172             chunks.append(dobj.decompress(chunk))
    97 
   173 
    98         self.assertEqual(b''.join(chunks), original)
   174         self.assertEqual(b''.join(chunks), original)
    99 
   175 
   100 
   176     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
   101 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
   177                       level=strategies.integers(min_value=1, max_value=5),
   102 @make_cffi
   178                       write_size=strategies.integers(min_value=1,
   103 class TestDecompressor_read_from_fuzzing(unittest.TestCase):
   179                                                      max_value=4 * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE),
       
   180                       chunk_sizes=strategies.data())
       
   181     def test_random_output_sizes(self, original, level, write_size, chunk_sizes):
       
   182         cctx = zstd.ZstdCompressor(level=level)
       
   183         frame = cctx.compress(original)
       
   184 
       
   185         source = io.BytesIO(frame)
       
   186 
       
   187         dctx = zstd.ZstdDecompressor()
       
   188         dobj = dctx.decompressobj(write_size=write_size)
       
   189 
       
   190         chunks = []
       
   191         while True:
       
   192             chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
       
   193             chunk = source.read(chunk_size)
       
   194             if not chunk:
       
   195                 break
       
   196 
       
   197             chunks.append(dobj.decompress(chunk))
       
   198 
       
   199         self.assertEqual(b''.join(chunks), original)
       
   200 
       
   201 
       
   202 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
       
   203 @make_cffi
       
   204 class TestDecompressor_read_to_iter_fuzzing(unittest.TestCase):
   104     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
   205     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
   105                       level=strategies.integers(min_value=1, max_value=5),
   206                       level=strategies.integers(min_value=1, max_value=5),
   106                       read_size=strategies.integers(min_value=1, max_value=4096),
   207                       read_size=strategies.integers(min_value=1, max_value=4096),
   107                       write_size=strategies.integers(min_value=1, max_value=4096))
   208                       write_size=strategies.integers(min_value=1, max_value=4096))
   108     def test_read_write_size_variance(self, original, level, read_size, write_size):
   209     def test_read_write_size_variance(self, original, level, read_size, write_size):
   110         frame = cctx.compress(original)
   211         frame = cctx.compress(original)
   111 
   212 
   112         source = io.BytesIO(frame)
   213         source = io.BytesIO(frame)
   113 
   214 
   114         dctx = zstd.ZstdDecompressor()
   215         dctx = zstd.ZstdDecompressor()
   115         chunks = list(dctx.read_from(source, read_size=read_size, write_size=write_size))
   216         chunks = list(dctx.read_to_iter(source, read_size=read_size, write_size=write_size))
   116 
   217 
   117         self.assertEqual(b''.join(chunks), original)
   218         self.assertEqual(b''.join(chunks), original)
   118 
   219 
   119 
   220 
   120 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
   221 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')