(root)/
Python-3.11.7/
Lib/
test/
test_zlib.py
       1  import unittest
       2  from test import support
       3  from test.support import import_helper
       4  import binascii
       5  import copy
       6  import os
       7  import pickle
       8  import random
       9  import sys
      10  from test.support import bigmemtest, _1G, _4G
      11  
      12  
      13  zlib = import_helper.import_module('zlib')
      14  
      15  requires_Compress_copy = unittest.skipUnless(
      16          hasattr(zlib.compressobj(), "copy"),
      17          'requires Compress.copy()')
      18  requires_Decompress_copy = unittest.skipUnless(
      19          hasattr(zlib.decompressobj(), "copy"),
      20          'requires Decompress.copy()')
      21  
      22  # bpo-46623: On s390x, when a hardware accelerator is used, using different
      23  # ways to compress data with zlib can produce different compressed data.
      24  # Simplified test_pair() code:
      25  #
      26  #   def func1(data):
      27  #       return zlib.compress(data)
      28  #
      29  #   def func2(data)
      30  #       co = zlib.compressobj()
      31  #       x1 = co.compress(data)
      32  #       x2 = co.flush()
      33  #       return x1 + x2
      34  #
      35  # On s390x if zlib uses a hardware accelerator, func1() creates a single
      36  # "final" compressed block whereas func2() produces 3 compressed blocks (the
      37  # last one is a final block). On other platforms with no accelerator, func1()
      38  # and func2() produce the same compressed data made of a single (final)
      39  # compressed block.
      40  #
      41  # Only the compressed data is different, the decompression returns the original
      42  # data:
      43  #
      44  #   zlib.decompress(func1(data)) == zlib.decompress(func2(data)) == data
      45  #
      46  # Make the assumption that s390x always has an accelerator to simplify the skip
      47  # condition. Windows doesn't have os.uname() but it doesn't support s390x.
      48  skip_on_s390x = unittest.skipIf(hasattr(os, 'uname') and os.uname().machine == 's390x',
      49                                  'skipped on s390x')
      50  
      51  
      52  class ESC[4;38;5;81mVersionTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      53  
      54      def test_library_version(self):
      55          # Test that the major version of the actual library in use matches the
      56          # major version that we were compiled against. We can't guarantee that
      57          # the minor versions will match (even on the machine on which the module
      58          # was compiled), and the API is stable between minor versions, so
      59          # testing only the major versions avoids spurious failures.
      60          self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])
      61  
      62  
      63  class ESC[4;38;5;81mChecksumTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      64      # checksum test cases
      65      def test_crc32start(self):
      66          self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
      67          self.assertTrue(zlib.crc32(b"abc", 0xffffffff))
      68  
      69      def test_crc32empty(self):
      70          self.assertEqual(zlib.crc32(b"", 0), 0)
      71          self.assertEqual(zlib.crc32(b"", 1), 1)
      72          self.assertEqual(zlib.crc32(b"", 432), 432)
      73  
      74      def test_adler32start(self):
      75          self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
      76          self.assertTrue(zlib.adler32(b"abc", 0xffffffff))
      77  
      78      def test_adler32empty(self):
      79          self.assertEqual(zlib.adler32(b"", 0), 0)
      80          self.assertEqual(zlib.adler32(b"", 1), 1)
      81          self.assertEqual(zlib.adler32(b"", 432), 432)
      82  
      83      def test_penguins(self):
      84          self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
      85          self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
      86          self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
      87          self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)
      88  
      89          self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
      90          self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1))
      91  
      92      def test_crc32_adler32_unsigned(self):
      93          foo = b'abcdefghijklmnop'
      94          # explicitly test signed behavior
      95          self.assertEqual(zlib.crc32(foo), 2486878355)
      96          self.assertEqual(zlib.crc32(b'spam'), 1138425661)
      97          self.assertEqual(zlib.adler32(foo+foo), 3573550353)
      98          self.assertEqual(zlib.adler32(b'spam'), 72286642)
      99  
     100      def test_same_as_binascii_crc32(self):
     101          foo = b'abcdefghijklmnop'
     102          crc = 2486878355
     103          self.assertEqual(binascii.crc32(foo), crc)
     104          self.assertEqual(zlib.crc32(foo), crc)
     105          self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))
     106  
     107  
     108  # Issue #10276 - check that inputs >=4 GiB are handled correctly.
     109  class ESC[4;38;5;81mChecksumBigBufferTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     110  
     111      @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
     112      def test_big_buffer(self, size):
     113          data = b"nyan" * (_1G + 1)
     114          self.assertEqual(zlib.crc32(data), 1044521549)
     115          self.assertEqual(zlib.adler32(data), 2256789997)
     116  
     117  
     118  class ESC[4;38;5;81mExceptionTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     119      # make sure we generate some expected errors
     120      def test_badlevel(self):
     121          # specifying compression level out of range causes an error
     122          # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
     123          # accepts 0 too)
     124          self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)
     125  
     126      def test_badargs(self):
     127          self.assertRaises(TypeError, zlib.adler32)
     128          self.assertRaises(TypeError, zlib.crc32)
     129          self.assertRaises(TypeError, zlib.compress)
     130          self.assertRaises(TypeError, zlib.decompress)
     131          for arg in (42, None, '', 'abc', (), []):
     132              self.assertRaises(TypeError, zlib.adler32, arg)
     133              self.assertRaises(TypeError, zlib.crc32, arg)
     134              self.assertRaises(TypeError, zlib.compress, arg)
     135              self.assertRaises(TypeError, zlib.decompress, arg)
     136  
     137      def test_badcompressobj(self):
     138          # verify failure on building compress object with bad params
     139          self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
     140          # specifying total bits too large causes an error
     141          self.assertRaises(ValueError,
     142                  zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)
     143  
     144      def test_baddecompressobj(self):
     145          # verify failure on building decompress object with bad params
     146          self.assertRaises(ValueError, zlib.decompressobj, -1)
     147  
     148      def test_decompressobj_badflush(self):
     149          # verify failure on calling decompressobj.flush with bad params
     150          self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
     151          self.assertRaises(ValueError, zlib.decompressobj().flush, -1)
     152  
     153      @support.cpython_only
     154      def test_overflow(self):
     155          with self.assertRaisesRegex(OverflowError, 'int too large'):
     156              zlib.decompress(b'', 15, sys.maxsize + 1)
     157          with self.assertRaisesRegex(OverflowError, 'int too large'):
     158              zlib.decompressobj().decompress(b'', sys.maxsize + 1)
     159          with self.assertRaisesRegex(OverflowError, 'int too large'):
     160              zlib.decompressobj().flush(sys.maxsize + 1)
     161  
     162      @support.cpython_only
     163      def test_disallow_instantiation(self):
     164          # Ensure that the type disallows instantiation (bpo-43916)
     165          support.check_disallow_instantiation(self, type(zlib.compressobj()))
     166          support.check_disallow_instantiation(self, type(zlib.decompressobj()))
     167  
     168  
     169  class ESC[4;38;5;81mBaseCompressTestCase(ESC[4;38;5;149mobject):
     170      def check_big_compress_buffer(self, size, compress_func):
     171          _1M = 1024 * 1024
     172          # Generate 10 MiB worth of random, and expand it by repeating it.
     173          # The assumption is that zlib's memory is not big enough to exploit
     174          # such spread out redundancy.
     175          data = random.randbytes(_1M * 10)
     176          data = data * (size // len(data) + 1)
     177          try:
     178              compress_func(data)
     179          finally:
     180              # Release memory
     181              data = None
     182  
     183      def check_big_decompress_buffer(self, size, decompress_func):
     184          data = b'x' * size
     185          try:
     186              compressed = zlib.compress(data, 1)
     187          finally:
     188              # Release memory
     189              data = None
     190          data = decompress_func(compressed)
     191          # Sanity check
     192          try:
     193              self.assertEqual(len(data), size)
     194              self.assertEqual(len(data.strip(b'x')), 0)
     195          finally:
     196              data = None
     197  
     198  
     199  class ESC[4;38;5;81mCompressTestCase(ESC[4;38;5;149mBaseCompressTestCase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     200      # Test compression in one go (whole message compression)
     201      def test_speech(self):
     202          x = zlib.compress(HAMLET_SCENE)
     203          self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
     204  
     205      def test_keywords(self):
     206          x = zlib.compress(HAMLET_SCENE, level=3)
     207          self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
     208          with self.assertRaises(TypeError):
     209              zlib.compress(data=HAMLET_SCENE, level=3)
     210          self.assertEqual(zlib.decompress(x,
     211                                           wbits=zlib.MAX_WBITS,
     212                                           bufsize=zlib.DEF_BUF_SIZE),
     213                           HAMLET_SCENE)
     214  
     215      @skip_on_s390x
     216      def test_speech128(self):
     217          # compress more data
     218          data = HAMLET_SCENE * 128
     219          x = zlib.compress(data)
     220          self.assertEqual(zlib.compress(bytearray(data)), x)
     221          for ob in x, bytearray(x):
     222              self.assertEqual(zlib.decompress(ob), data)
     223  
     224      def test_incomplete_stream(self):
     225          # A useful error message is given
     226          x = zlib.compress(HAMLET_SCENE)
     227          self.assertRaisesRegex(zlib.error,
     228              "Error -5 while decompressing data: incomplete or truncated stream",
     229              zlib.decompress, x[:-1])
     230  
     231      # Memory use of the following functions takes into account overallocation
     232  
     233      @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
     234      def test_big_compress_buffer(self, size):
     235          compress = lambda s: zlib.compress(s, 1)
     236          self.check_big_compress_buffer(size, compress)
     237  
     238      @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
     239      def test_big_decompress_buffer(self, size):
     240          self.check_big_decompress_buffer(size, zlib.decompress)
     241  
     242      @bigmemtest(size=_4G, memuse=1)
     243      def test_large_bufsize(self, size):
     244          # Test decompress(bufsize) parameter greater than the internal limit
     245          data = HAMLET_SCENE * 10
     246          compressed = zlib.compress(data, 1)
     247          self.assertEqual(zlib.decompress(compressed, 15, size), data)
     248  
     249      def test_custom_bufsize(self):
     250          data = HAMLET_SCENE * 10
     251          compressed = zlib.compress(data, 1)
     252          self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)
     253  
     254      @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
     255      @bigmemtest(size=_4G + 100, memuse=4)
     256      def test_64bit_compress(self, size):
     257          data = b'x' * size
     258          try:
     259              comp = zlib.compress(data, 0)
     260              self.assertEqual(zlib.decompress(comp), data)
     261          finally:
     262              comp = data = None
     263  
     264  
     265  class ESC[4;38;5;81mCompressObjectTestCase(ESC[4;38;5;149mBaseCompressTestCase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     266      # Test compression object
     267      @skip_on_s390x
     268      def test_pair(self):
     269          # straightforward compress/decompress objects
     270          datasrc = HAMLET_SCENE * 128
     271          datazip = zlib.compress(datasrc)
     272          # should compress both bytes and bytearray data
     273          for data in (datasrc, bytearray(datasrc)):
     274              co = zlib.compressobj()
     275              x1 = co.compress(data)
     276              x2 = co.flush()
     277              self.assertRaises(zlib.error, co.flush) # second flush should not work
     278              self.assertEqual(x1 + x2, datazip)
     279          for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
     280              dco = zlib.decompressobj()
     281              y1 = dco.decompress(v1 + v2)
     282              y2 = dco.flush()
     283              self.assertEqual(data, y1 + y2)
     284              self.assertIsInstance(dco.unconsumed_tail, bytes)
     285              self.assertIsInstance(dco.unused_data, bytes)
     286  
     287      def test_keywords(self):
     288          level = 2
     289          method = zlib.DEFLATED
     290          wbits = -12
     291          memLevel = 9
     292          strategy = zlib.Z_FILTERED
     293          co = zlib.compressobj(level=level,
     294                                method=method,
     295                                wbits=wbits,
     296                                memLevel=memLevel,
     297                                strategy=strategy,
     298                                zdict=b"")
     299          do = zlib.decompressobj(wbits=wbits, zdict=b"")
     300          with self.assertRaises(TypeError):
     301              co.compress(data=HAMLET_SCENE)
     302          with self.assertRaises(TypeError):
     303              do.decompress(data=zlib.compress(HAMLET_SCENE))
     304          x = co.compress(HAMLET_SCENE) + co.flush()
     305          y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
     306          self.assertEqual(HAMLET_SCENE, y)
     307  
     308      def test_compressoptions(self):
     309          # specify lots of options to compressobj()
     310          level = 2
     311          method = zlib.DEFLATED
     312          wbits = -12
     313          memLevel = 9
     314          strategy = zlib.Z_FILTERED
     315          co = zlib.compressobj(level, method, wbits, memLevel, strategy)
     316          x1 = co.compress(HAMLET_SCENE)
     317          x2 = co.flush()
     318          dco = zlib.decompressobj(wbits)
     319          y1 = dco.decompress(x1 + x2)
     320          y2 = dco.flush()
     321          self.assertEqual(HAMLET_SCENE, y1 + y2)
     322  
     323      def test_compressincremental(self):
     324          # compress object in steps, decompress object as one-shot
     325          data = HAMLET_SCENE * 128
     326          co = zlib.compressobj()
     327          bufs = []
     328          for i in range(0, len(data), 256):
     329              bufs.append(co.compress(data[i:i+256]))
     330          bufs.append(co.flush())
     331          combuf = b''.join(bufs)
     332  
     333          dco = zlib.decompressobj()
     334          y1 = dco.decompress(b''.join(bufs))
     335          y2 = dco.flush()
     336          self.assertEqual(data, y1 + y2)
     337  
     338      def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
     339          # compress object in steps, decompress object in steps
     340          source = source or HAMLET_SCENE
     341          data = source * 128
     342          co = zlib.compressobj()
     343          bufs = []
     344          for i in range(0, len(data), cx):
     345              bufs.append(co.compress(data[i:i+cx]))
     346          bufs.append(co.flush())
     347          combuf = b''.join(bufs)
     348  
     349          decombuf = zlib.decompress(combuf)
     350          # Test type of return value
     351          self.assertIsInstance(decombuf, bytes)
     352  
     353          self.assertEqual(data, decombuf)
     354  
     355          dco = zlib.decompressobj()
     356          bufs = []
     357          for i in range(0, len(combuf), dcx):
     358              bufs.append(dco.decompress(combuf[i:i+dcx]))
     359              self.assertEqual(b'', dco.unconsumed_tail, ########
     360                               "(A) uct should be b'': not %d long" %
     361                                         len(dco.unconsumed_tail))
     362              self.assertEqual(b'', dco.unused_data)
     363          if flush:
     364              bufs.append(dco.flush())
     365          else:
     366              while True:
     367                  chunk = dco.decompress(b'')
     368                  if chunk:
     369                      bufs.append(chunk)
     370                  else:
     371                      break
     372          self.assertEqual(b'', dco.unconsumed_tail, ########
     373                           "(B) uct should be b'': not %d long" %
     374                                         len(dco.unconsumed_tail))
     375          self.assertEqual(b'', dco.unused_data)
     376          self.assertEqual(data, b''.join(bufs))
     377          # Failure means: "decompressobj with init options failed"
     378  
     379      def test_decompincflush(self):
     380          self.test_decompinc(flush=True)
     381  
     382      def test_decompimax(self, source=None, cx=256, dcx=64):
     383          # compress in steps, decompress in length-restricted steps
     384          source = source or HAMLET_SCENE
     385          # Check a decompression object with max_length specified
     386          data = source * 128
     387          co = zlib.compressobj()
     388          bufs = []
     389          for i in range(0, len(data), cx):
     390              bufs.append(co.compress(data[i:i+cx]))
     391          bufs.append(co.flush())
     392          combuf = b''.join(bufs)
     393          self.assertEqual(data, zlib.decompress(combuf),
     394                           'compressed data failure')
     395  
     396          dco = zlib.decompressobj()
     397          bufs = []
     398          cb = combuf
     399          while cb:
     400              #max_length = 1 + len(cb)//10
     401              chunk = dco.decompress(cb, dcx)
     402              self.assertFalse(len(chunk) > dcx,
     403                      'chunk too big (%d>%d)' % (len(chunk), dcx))
     404              bufs.append(chunk)
     405              cb = dco.unconsumed_tail
     406          bufs.append(dco.flush())
     407          self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
     408  
     409      def test_decompressmaxlen(self, flush=False):
     410          # Check a decompression object with max_length specified
     411          data = HAMLET_SCENE * 128
     412          co = zlib.compressobj()
     413          bufs = []
     414          for i in range(0, len(data), 256):
     415              bufs.append(co.compress(data[i:i+256]))
     416          bufs.append(co.flush())
     417          combuf = b''.join(bufs)
     418          self.assertEqual(data, zlib.decompress(combuf),
     419                           'compressed data failure')
     420  
     421          dco = zlib.decompressobj()
     422          bufs = []
     423          cb = combuf
     424          while cb:
     425              max_length = 1 + len(cb)//10
     426              chunk = dco.decompress(cb, max_length)
     427              self.assertFalse(len(chunk) > max_length,
     428                          'chunk too big (%d>%d)' % (len(chunk),max_length))
     429              bufs.append(chunk)
     430              cb = dco.unconsumed_tail
     431          if flush:
     432              bufs.append(dco.flush())
     433          else:
     434              while chunk:
     435                  chunk = dco.decompress(b'', max_length)
     436                  self.assertFalse(len(chunk) > max_length,
     437                              'chunk too big (%d>%d)' % (len(chunk),max_length))
     438                  bufs.append(chunk)
     439          self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
     440  
     441      def test_decompressmaxlenflush(self):
     442          self.test_decompressmaxlen(flush=True)
     443  
     444      def test_maxlenmisc(self):
     445          # Misc tests of max_length
     446          dco = zlib.decompressobj()
     447          self.assertRaises(ValueError, dco.decompress, b"", -1)
     448          self.assertEqual(b'', dco.unconsumed_tail)
     449  
     450      def test_maxlen_large(self):
     451          # Sizes up to sys.maxsize should be accepted, although zlib is
     452          # internally limited to expressing sizes with unsigned int
     453          data = HAMLET_SCENE * 10
     454          self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
     455          compressed = zlib.compress(data, 1)
     456          dco = zlib.decompressobj()
     457          self.assertEqual(dco.decompress(compressed, sys.maxsize), data)
     458  
     459      def test_maxlen_custom(self):
     460          data = HAMLET_SCENE * 10
     461          compressed = zlib.compress(data, 1)
     462          dco = zlib.decompressobj()
     463          self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])
     464  
     465      def test_clear_unconsumed_tail(self):
     466          # Issue #12050: calling decompress() without providing max_length
     467          # should clear the unconsumed_tail attribute.
     468          cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
     469          dco = zlib.decompressobj()
     470          ddata = dco.decompress(cdata, 1)
     471          ddata += dco.decompress(dco.unconsumed_tail)
     472          self.assertEqual(dco.unconsumed_tail, b"")
     473  
     474      def test_flushes(self):
     475          # Test flush() with the various options, using all the
     476          # different levels in order to provide more variations.
     477          sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
     478                      'Z_PARTIAL_FLUSH']
     479  
     480          ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.'))
     481          # Z_BLOCK has a known failure prior to 1.2.5.3
     482          if ver >= (1, 2, 5, 3):
     483              sync_opt.append('Z_BLOCK')
     484  
     485          sync_opt = [getattr(zlib, opt) for opt in sync_opt
     486                      if hasattr(zlib, opt)]
     487          data = HAMLET_SCENE * 8
     488  
     489          for sync in sync_opt:
     490              for level in range(10):
     491                  try:
     492                      obj = zlib.compressobj( level )
     493                      a = obj.compress( data[:3000] )
     494                      b = obj.flush( sync )
     495                      c = obj.compress( data[3000:] )
     496                      d = obj.flush()
     497                  except:
     498                      print("Error for flush mode={}, level={}"
     499                            .format(sync, level))
     500                      raise
     501                  self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
     502                                   data, ("Decompress failed: flush "
     503                                          "mode=%i, level=%i") % (sync, level))
     504                  del obj
     505  
     506      @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
     507                           'requires zlib.Z_SYNC_FLUSH')
     508      def test_odd_flush(self):
     509          # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
     510          import random
     511          # Testing on 17K of "random" data
     512  
     513          # Create compressor and decompressor objects
     514          co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
     515          dco = zlib.decompressobj()
     516  
     517          # Try 17K of data
     518          # generate random data stream
     519          data = random.randbytes(17 * 1024)
     520  
     521          # compress, sync-flush, and decompress
     522          first = co.compress(data)
     523          second = co.flush(zlib.Z_SYNC_FLUSH)
     524          expanded = dco.decompress(first + second)
     525  
     526          # if decompressed data is different from the input data, choke.
     527          self.assertEqual(expanded, data, "17K random source doesn't match")
     528  
     529      def test_empty_flush(self):
     530          # Test that calling .flush() on unused objects works.
     531          # (Bug #1083110 -- calling .flush() on decompress objects
     532          # caused a core dump.)
     533  
     534          co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
     535          self.assertTrue(co.flush())  # Returns a zlib header
     536          dco = zlib.decompressobj()
     537          self.assertEqual(dco.flush(), b"") # Returns nothing
     538  
     539      def test_dictionary(self):
     540          h = HAMLET_SCENE
     541          # Build a simulated dictionary out of the words in HAMLET.
     542          words = h.split()
     543          random.shuffle(words)
     544          zdict = b''.join(words)
     545          # Use it to compress HAMLET.
     546          co = zlib.compressobj(zdict=zdict)
     547          cd = co.compress(h) + co.flush()
     548          # Verify that it will decompress with the dictionary.
     549          dco = zlib.decompressobj(zdict=zdict)
     550          self.assertEqual(dco.decompress(cd) + dco.flush(), h)
     551          # Verify that it fails when not given the dictionary.
     552          dco = zlib.decompressobj()
     553          self.assertRaises(zlib.error, dco.decompress, cd)
     554  
     555      def test_dictionary_streaming(self):
     556          # This simulates the reuse of a compressor object for compressing
     557          # several separate data streams.
     558          co = zlib.compressobj(zdict=HAMLET_SCENE)
     559          do = zlib.decompressobj(zdict=HAMLET_SCENE)
     560          piece = HAMLET_SCENE[1000:1500]
     561          d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
     562          d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
     563          d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
     564          self.assertEqual(do.decompress(d0), piece)
     565          self.assertEqual(do.decompress(d1), piece[100:])
     566          self.assertEqual(do.decompress(d2), piece[:-100])
     567  
     568      def test_decompress_incomplete_stream(self):
     569          # This is 'foo', deflated
     570          x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
     571          # For the record
     572          self.assertEqual(zlib.decompress(x), b'foo')
     573          self.assertRaises(zlib.error, zlib.decompress, x[:-5])
     574          # Omitting the stream end works with decompressor objects
     575          # (see issue #8672).
     576          dco = zlib.decompressobj()
     577          y = dco.decompress(x[:-5])
     578          y += dco.flush()
     579          self.assertEqual(y, b'foo')
     580  
     581      def test_decompress_eof(self):
     582          x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
     583          dco = zlib.decompressobj()
     584          self.assertFalse(dco.eof)
     585          dco.decompress(x[:-5])
     586          self.assertFalse(dco.eof)
     587          dco.decompress(x[-5:])
     588          self.assertTrue(dco.eof)
     589          dco.flush()
     590          self.assertTrue(dco.eof)
     591  
     592      def test_decompress_eof_incomplete_stream(self):
     593          x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
     594          dco = zlib.decompressobj()
     595          self.assertFalse(dco.eof)
     596          dco.decompress(x[:-5])
     597          self.assertFalse(dco.eof)
     598          dco.flush()
     599          self.assertFalse(dco.eof)
     600  
     601      def test_decompress_unused_data(self):
     602          # Repeated calls to decompress() after EOF should accumulate data in
     603          # dco.unused_data, instead of just storing the arg to the last call.
     604          source = b'abcdefghijklmnopqrstuvwxyz'
     605          remainder = b'0123456789'
     606          y = zlib.compress(source)
     607          x = y + remainder
     608          for maxlen in 0, 1000:
     609              for step in 1, 2, len(y), len(x):
     610                  dco = zlib.decompressobj()
     611                  data = b''
     612                  for i in range(0, len(x), step):
     613                      if i < len(y):
     614                          self.assertEqual(dco.unused_data, b'')
     615                      if maxlen == 0:
     616                          data += dco.decompress(x[i : i + step])
     617                          self.assertEqual(dco.unconsumed_tail, b'')
     618                      else:
     619                          data += dco.decompress(
     620                                  dco.unconsumed_tail + x[i : i + step], maxlen)
     621                  data += dco.flush()
     622                  self.assertTrue(dco.eof)
     623                  self.assertEqual(data, source)
     624                  self.assertEqual(dco.unconsumed_tail, b'')
     625                  self.assertEqual(dco.unused_data, remainder)
     626  
     627      # issue27164
     628      def test_decompress_raw_with_dictionary(self):
     629          zdict = b'abcdefghijklmnopqrstuvwxyz'
     630          co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
     631          comp = co.compress(zdict) + co.flush()
     632          dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
     633          uncomp = dco.decompress(comp) + dco.flush()
     634          self.assertEqual(zdict, uncomp)
     635  
     636      def test_flush_with_freed_input(self):
     637          # Issue #16411: decompressor accesses input to last decompress() call
     638          # in flush(), even if this object has been freed in the meanwhile.
     639          input1 = b'abcdefghijklmnopqrstuvwxyz'
     640          input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
     641          data = zlib.compress(input1)
     642          dco = zlib.decompressobj()
     643          dco.decompress(data, 1)
     644          del data
     645          data = zlib.compress(input2)
     646          self.assertEqual(dco.flush(), input1[1:])
     647  
     648      @bigmemtest(size=_4G, memuse=1)
     649      def test_flush_large_length(self, size):
     650          # Test flush(length) parameter greater than internal limit UINT_MAX
     651          input = HAMLET_SCENE * 10
     652          data = zlib.compress(input, 1)
     653          dco = zlib.decompressobj()
     654          dco.decompress(data, 1)
     655          self.assertEqual(dco.flush(size), input[1:])
     656  
     657      def test_flush_custom_length(self):
     658          input = HAMLET_SCENE * 10
     659          data = zlib.compress(input, 1)
     660          dco = zlib.decompressobj()
     661          dco.decompress(data, 1)
     662          self.assertEqual(dco.flush(CustomInt()), input[1:])
     663  
     664      @requires_Compress_copy
     665      def test_compresscopy(self):
     666          # Test copying a compression object
     667          data0 = HAMLET_SCENE
     668          data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
     669          for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
     670              c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
     671              bufs0 = []
     672              bufs0.append(c0.compress(data0))
     673  
     674              c1 = func(c0)
     675              bufs1 = bufs0[:]
     676  
     677              bufs0.append(c0.compress(data0))
     678              bufs0.append(c0.flush())
     679              s0 = b''.join(bufs0)
     680  
     681              bufs1.append(c1.compress(data1))
     682              bufs1.append(c1.flush())
     683              s1 = b''.join(bufs1)
     684  
     685              self.assertEqual(zlib.decompress(s0),data0+data0)
     686              self.assertEqual(zlib.decompress(s1),data0+data1)
     687  
     688      @requires_Compress_copy
     689      def test_badcompresscopy(self):
     690          # Test copying a compression object in an inconsistent state
     691          c = zlib.compressobj()
     692          c.compress(HAMLET_SCENE)
     693          c.flush()
     694          self.assertRaises(ValueError, c.copy)
     695          self.assertRaises(ValueError, copy.copy, c)
     696          self.assertRaises(ValueError, copy.deepcopy, c)
     697  
     698      @requires_Decompress_copy
     699      def test_decompresscopy(self):
     700          # Test copying a decompression object
     701          data = HAMLET_SCENE
     702          comp = zlib.compress(data)
     703          # Test type of return value
     704          self.assertIsInstance(comp, bytes)
     705  
     706          for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
     707              d0 = zlib.decompressobj()
     708              bufs0 = []
     709              bufs0.append(d0.decompress(comp[:32]))
     710  
     711              d1 = func(d0)
     712              bufs1 = bufs0[:]
     713  
     714              bufs0.append(d0.decompress(comp[32:]))
     715              s0 = b''.join(bufs0)
     716  
     717              bufs1.append(d1.decompress(comp[32:]))
     718              s1 = b''.join(bufs1)
     719  
     720              self.assertEqual(s0,s1)
     721              self.assertEqual(s0,data)
     722  
     723      @requires_Decompress_copy
     724      def test_baddecompresscopy(self):
     725          # Test copying a compression object in an inconsistent state
     726          data = zlib.compress(HAMLET_SCENE)
     727          d = zlib.decompressobj()
     728          d.decompress(data)
     729          d.flush()
     730          self.assertRaises(ValueError, d.copy)
     731          self.assertRaises(ValueError, copy.copy, d)
     732          self.assertRaises(ValueError, copy.deepcopy, d)
     733  
     734      def test_compresspickle(self):
     735          for proto in range(pickle.HIGHEST_PROTOCOL + 1):
     736              with self.assertRaises((TypeError, pickle.PicklingError)):
     737                  pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)
     738  
     739      def test_decompresspickle(self):
     740          for proto in range(pickle.HIGHEST_PROTOCOL + 1):
     741              with self.assertRaises((TypeError, pickle.PicklingError)):
     742                  pickle.dumps(zlib.decompressobj(), proto)
     743  
     744      # Memory use of the following functions takes into account overallocation
     745  
     746      @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
     747      def test_big_compress_buffer(self, size):
     748          c = zlib.compressobj(1)
     749          compress = lambda s: c.compress(s) + c.flush()
     750          self.check_big_compress_buffer(size, compress)
     751  
     752      @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
     753      def test_big_decompress_buffer(self, size):
     754          d = zlib.decompressobj()
     755          decompress = lambda s: d.decompress(s) + d.flush()
     756          self.check_big_decompress_buffer(size, decompress)
     757  
     758      @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
     759      @bigmemtest(size=_4G + 100, memuse=4)
     760      def test_64bit_compress(self, size):
     761          data = b'x' * size
     762          co = zlib.compressobj(0)
     763          do = zlib.decompressobj()
     764          try:
     765              comp = co.compress(data) + co.flush()
     766              uncomp = do.decompress(comp) + do.flush()
     767              self.assertEqual(uncomp, data)
     768          finally:
     769              comp = uncomp = data = None
     770  
     771      @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
     772      @bigmemtest(size=_4G + 100, memuse=3)
     773      def test_large_unused_data(self, size):
     774          data = b'abcdefghijklmnop'
     775          unused = b'x' * size
     776          comp = zlib.compress(data) + unused
     777          do = zlib.decompressobj()
     778          try:
     779              uncomp = do.decompress(comp) + do.flush()
     780              self.assertEqual(unused, do.unused_data)
     781              self.assertEqual(uncomp, data)
     782          finally:
     783              unused = comp = do = None
     784  
     785      @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
     786      @bigmemtest(size=_4G + 100, memuse=5)
     787      def test_large_unconsumed_tail(self, size):
     788          data = b'x' * size
     789          do = zlib.decompressobj()
     790          try:
     791              comp = zlib.compress(data, 0)
     792              uncomp = do.decompress(comp, 1) + do.flush()
     793              self.assertEqual(uncomp, data)
     794              self.assertEqual(do.unconsumed_tail, b'')
     795          finally:
     796              comp = uncomp = data = None
     797  
     798      def test_wbits(self):
     799          # wbits=0 only supported since zlib v1.2.3.5
     800          # Register "1.2.3" as "1.2.3.0"
     801          # or "1.2.0-linux","1.2.0.f","1.2.0.f-linux"
     802          v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
     803          if len(v) < 4:
     804              v.append('0')
     805          elif not v[-1].isnumeric():
     806              v[-1] = '0'
     807  
     808          v = tuple(map(int, v))
     809          supports_wbits_0 = v >= (1, 2, 3, 5)
     810  
     811          co = zlib.compressobj(level=1, wbits=15)
     812          zlib15 = co.compress(HAMLET_SCENE) + co.flush()
     813          self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
     814          if supports_wbits_0:
     815              self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
     816          self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
     817          with self.assertRaisesRegex(zlib.error, 'invalid window size'):
     818              zlib.decompress(zlib15, 14)
     819          dco = zlib.decompressobj(wbits=32 + 15)
     820          self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
     821          dco = zlib.decompressobj(wbits=14)
     822          with self.assertRaisesRegex(zlib.error, 'invalid window size'):
     823              dco.decompress(zlib15)
     824  
     825          co = zlib.compressobj(level=1, wbits=9)
     826          zlib9 = co.compress(HAMLET_SCENE) + co.flush()
     827          self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
     828          self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
     829          if supports_wbits_0:
     830              self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
     831          self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
     832          dco = zlib.decompressobj(wbits=32 + 9)
     833          self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)
     834  
     835          co = zlib.compressobj(level=1, wbits=-15)
     836          deflate15 = co.compress(HAMLET_SCENE) + co.flush()
     837          self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
     838          dco = zlib.decompressobj(wbits=-15)
     839          self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)
     840  
     841          co = zlib.compressobj(level=1, wbits=-9)
     842          deflate9 = co.compress(HAMLET_SCENE) + co.flush()
     843          self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
     844          self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
     845          dco = zlib.decompressobj(wbits=-9)
     846          self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)
     847  
     848          co = zlib.compressobj(level=1, wbits=16 + 15)
     849          gzip = co.compress(HAMLET_SCENE) + co.flush()
     850          self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
     851          self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
     852          dco = zlib.decompressobj(32 + 15)
     853          self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)
     854  
     855          for wbits in (-15, 15, 31):
     856              with self.subTest(wbits=wbits):
     857                  expected = HAMLET_SCENE
     858                  actual = zlib.decompress(
     859                      zlib.compress(HAMLET_SCENE, wbits=wbits), wbits=wbits
     860                  )
     861                  self.assertEqual(expected, actual)
     862  
     863  def choose_lines(source, number, seed=None, generator=random):
     864      """Return a list of number lines randomly chosen from the source"""
     865      if seed is not None:
     866          generator.seed(seed)
     867      sources = source.split('\n')
     868      return [generator.choice(sources) for n in range(number)]
     869  
     870  
     871  HAMLET_SCENE = b"""
     872  LAERTES
     873  
     874         O, fear me not.
     875         I stay too long: but here my father comes.
     876  
     877         Enter POLONIUS
     878  
     879         A double blessing is a double grace,
     880         Occasion smiles upon a second leave.
     881  
     882  LORD POLONIUS
     883  
     884         Yet here, Laertes! aboard, aboard, for shame!
     885         The wind sits in the shoulder of your sail,
     886         And you are stay'd for. There; my blessing with thee!
     887         And these few precepts in thy memory
     888         See thou character. Give thy thoughts no tongue,
     889         Nor any unproportioned thought his act.
     890         Be thou familiar, but by no means vulgar.
     891         Those friends thou hast, and their adoption tried,
     892         Grapple them to thy soul with hoops of steel;
     893         But do not dull thy palm with entertainment
     894         Of each new-hatch'd, unfledged comrade. Beware
     895         Of entrance to a quarrel, but being in,
     896         Bear't that the opposed may beware of thee.
     897         Give every man thy ear, but few thy voice;
     898         Take each man's censure, but reserve thy judgment.
     899         Costly thy habit as thy purse can buy,
     900         But not express'd in fancy; rich, not gaudy;
     901         For the apparel oft proclaims the man,
     902         And they in France of the best rank and station
     903         Are of a most select and generous chief in that.
     904         Neither a borrower nor a lender be;
     905         For loan oft loses both itself and friend,
     906         And borrowing dulls the edge of husbandry.
     907         This above all: to thine ownself be true,
     908         And it must follow, as the night the day,
     909         Thou canst not then be false to any man.
     910         Farewell: my blessing season this in thee!
     911  
     912  LAERTES
     913  
     914         Most humbly do I take my leave, my lord.
     915  
     916  LORD POLONIUS
     917  
     918         The time invites you; go; your servants tend.
     919  
     920  LAERTES
     921  
     922         Farewell, Ophelia; and remember well
     923         What I have said to you.
     924  
     925  OPHELIA
     926  
     927         'Tis in my memory lock'd,
     928         And you yourself shall keep the key of it.
     929  
     930  LAERTES
     931  
     932         Farewell.
     933  """
     934  
     935  
     936  class ESC[4;38;5;81mCustomInt:
     937      def __index__(self):
     938          return 100
     939  
     940  
     941  if __name__ == "__main__":
     942      unittest.main()