(root)/
Python-3.12.0/
Lib/
test/
test_gzip.py
       1  """Test script for the gzip module.
       2  """
       3  
       4  import array
       5  import functools
       6  import io
       7  import os
       8  import pathlib
       9  import struct
      10  import sys
      11  import unittest
      12  from subprocess import PIPE, Popen
      13  from test.support import import_helper
      14  from test.support import os_helper
      15  from test.support import _4G, bigmemtest, requires_subprocess
      16  from test.support.script_helper import assert_python_ok, assert_python_failure
      17  
# Both modules are obtained through import_helper.import_module() rather than
# a plain ``import``.  NOTE(review): per the test.support documentation this
# turns a missing module into a skipped test run -- that behaviour is not
# visible in this file.
gzip = import_helper.import_module('gzip')
zlib = import_helper.import_module('zlib')

# Sample payload #1: a few lines of C-looking text.  Most tests write it
# repeated (e.g. ``data1 * 50``) and compare what is read back.
data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

# Sample payload #2: used next to data1 by the append and the
# compress()/decompress() tests.
data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""


# Absolute path built from the per-run TESTFN name plus a '-gzdir' suffix.
# It is not referenced by any code in the part of the file visible here.
TEMPDIR = os.path.abspath(os_helper.TESTFN) + '-gzdir'
      35  
      36  
      37  class ESC[4;38;5;81mUnseekableIO(ESC[4;38;5;149mioESC[4;38;5;149m.ESC[4;38;5;149mBytesIO):
      38      def seekable(self):
      39          return False
      40  
      41      def tell(self):
      42          raise io.UnsupportedOperation
      43  
      44      def seek(self, *args):
      45          raise io.UnsupportedOperation
      46  
      47  
      48  class ESC[4;38;5;81mBaseTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      49      filename = os_helper.TESTFN
      50  
      51      def setUp(self):
      52          os_helper.unlink(self.filename)
      53  
      54      def tearDown(self):
      55          os_helper.unlink(self.filename)
      56  
      57  
class TestGzip(BaseTest):
      59      def write_and_read_back(self, data, mode='b'):
      60          b_data = bytes(data)
      61          with gzip.GzipFile(self.filename, 'w'+mode) as f:
      62              l = f.write(data)
      63          self.assertEqual(l, len(b_data))
      64          with gzip.GzipFile(self.filename, 'r'+mode) as f:
      65              self.assertEqual(f.read(), b_data)
      66  
      67      def test_write(self):
      68          with gzip.GzipFile(self.filename, 'wb') as f:
      69              f.write(data1 * 50)
      70  
      71              # Try flush and fileno.
      72              f.flush()
      73              f.fileno()
      74              if hasattr(os, 'fsync'):
      75                  os.fsync(f.fileno())
      76              f.close()
      77  
      78          # Test multiple close() calls.
      79          f.close()
      80  
      81      def test_write_read_with_pathlike_file(self):
      82          filename = pathlib.Path(self.filename)
      83          with gzip.GzipFile(filename, 'w') as f:
      84              f.write(data1 * 50)
      85          self.assertIsInstance(f.name, str)
      86          with gzip.GzipFile(filename, 'a') as f:
      87              f.write(data1)
      88          with gzip.GzipFile(filename) as f:
      89              d = f.read()
      90          self.assertEqual(d, data1 * 51)
      91          self.assertIsInstance(f.name, str)
      92  
      93      # The following test_write_xy methods test that write accepts
      94      # the corresponding bytes-like object type as input
      95      # and that the data written equals bytes(xy) in all cases.
      96      def test_write_memoryview(self):
      97          self.write_and_read_back(memoryview(data1 * 50))
      98          m = memoryview(bytes(range(256)))
      99          data = m.cast('B', shape=[8,8,4])
     100          self.write_and_read_back(data)
     101  
     102      def test_write_bytearray(self):
     103          self.write_and_read_back(bytearray(data1 * 50))
     104  
     105      def test_write_array(self):
     106          self.write_and_read_back(array.array('I', data1 * 40))
     107  
     108      def test_write_incompatible_type(self):
     109          # Test that non-bytes-like types raise TypeError.
     110          # Issue #21560: attempts to write incompatible types
     111          # should not affect the state of the fileobject
     112          with gzip.GzipFile(self.filename, 'wb') as f:
     113              with self.assertRaises(TypeError):
     114                  f.write('')
     115              with self.assertRaises(TypeError):
     116                  f.write([])
     117              f.write(data1)
     118          with gzip.GzipFile(self.filename, 'rb') as f:
     119              self.assertEqual(f.read(), data1)
     120  
     121      def test_read(self):
     122          self.test_write()
     123          # Try reading.
     124          with gzip.GzipFile(self.filename, 'r') as f:
     125              d = f.read()
     126          self.assertEqual(d, data1*50)
     127  
     128      def test_read1(self):
     129          self.test_write()
     130          blocks = []
     131          nread = 0
     132          with gzip.GzipFile(self.filename, 'r') as f:
     133              while True:
     134                  d = f.read1()
     135                  if not d:
     136                      break
     137                  blocks.append(d)
     138                  nread += len(d)
     139                  # Check that position was updated correctly (see issue10791).
     140                  self.assertEqual(f.tell(), nread)
     141          self.assertEqual(b''.join(blocks), data1 * 50)
     142  
     143      @bigmemtest(size=_4G, memuse=1)
     144      def test_read_large(self, size):
     145          # Read chunk size over UINT_MAX should be supported, despite zlib's
     146          # limitation per low-level call
     147          compressed = gzip.compress(data1, compresslevel=1)
     148          f = gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb')
     149          self.assertEqual(f.read(size), data1)
     150  
     151      def test_io_on_closed_object(self):
     152          # Test that I/O operations on closed GzipFile objects raise a
     153          # ValueError, just like the corresponding functions on file objects.
     154  
     155          # Write to a file, open it for reading, then close it.
     156          self.test_write()
     157          f = gzip.GzipFile(self.filename, 'r')
     158          fileobj = f.fileobj
     159          self.assertFalse(fileobj.closed)
     160          f.close()
     161          self.assertTrue(fileobj.closed)
     162          with self.assertRaises(ValueError):
     163              f.read(1)
     164          with self.assertRaises(ValueError):
     165              f.seek(0)
     166          with self.assertRaises(ValueError):
     167              f.tell()
     168          # Open the file for writing, then close it.
     169          f = gzip.GzipFile(self.filename, 'w')
     170          fileobj = f.fileobj
     171          self.assertFalse(fileobj.closed)
     172          f.close()
     173          self.assertTrue(fileobj.closed)
     174          with self.assertRaises(ValueError):
     175              f.write(b'')
     176          with self.assertRaises(ValueError):
     177              f.flush()
     178  
     179      def test_append(self):
     180          self.test_write()
     181          # Append to the previous file
     182          with gzip.GzipFile(self.filename, 'ab') as f:
     183              f.write(data2 * 15)
     184  
     185          with gzip.GzipFile(self.filename, 'rb') as f:
     186              d = f.read()
     187          self.assertEqual(d, (data1*50) + (data2*15))
     188  
     189      def test_many_append(self):
     190          # Bug #1074261 was triggered when reading a file that contained
     191          # many, many members.  Create such a file and verify that reading it
     192          # works.
     193          with gzip.GzipFile(self.filename, 'wb', 9) as f:
     194              f.write(b'a')
     195          for i in range(0, 200):
     196              with gzip.GzipFile(self.filename, "ab", 9) as f: # append
     197                  f.write(b'a')
     198  
     199          # Try reading the file
     200          with gzip.GzipFile(self.filename, "rb") as zgfile:
     201              contents = b""
     202              while 1:
     203                  ztxt = zgfile.read(8192)
     204                  contents += ztxt
     205                  if not ztxt: break
     206          self.assertEqual(contents, b'a'*201)
     207  
     208      def test_exclusive_write(self):
     209          with gzip.GzipFile(self.filename, 'xb') as f:
     210              f.write(data1 * 50)
     211          with gzip.GzipFile(self.filename, 'rb') as f:
     212              self.assertEqual(f.read(), data1 * 50)
     213          with self.assertRaises(FileExistsError):
     214              gzip.GzipFile(self.filename, 'xb')
     215  
     216      def test_buffered_reader(self):
     217          # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
     218          # performance.
     219          self.test_write()
     220  
     221          with gzip.GzipFile(self.filename, 'rb') as f:
     222              with io.BufferedReader(f) as r:
     223                  lines = [line for line in r]
     224  
     225          self.assertEqual(lines, 50 * data1.splitlines(keepends=True))
     226  
     227      def test_readline(self):
     228          self.test_write()
     229          # Try .readline() with varying line lengths
     230  
     231          with gzip.GzipFile(self.filename, 'rb') as f:
     232              line_length = 0
     233              while 1:
     234                  L = f.readline(line_length)
     235                  if not L and line_length != 0: break
     236                  self.assertTrue(len(L) <= line_length)
     237                  line_length = (line_length + 1) % 50
     238  
     239      def test_readlines(self):
     240          self.test_write()
     241          # Try .readlines()
     242  
     243          with gzip.GzipFile(self.filename, 'rb') as f:
     244              L = f.readlines()
     245  
     246          with gzip.GzipFile(self.filename, 'rb') as f:
     247              while 1:
     248                  L = f.readlines(150)
     249                  if L == []: break
     250  
     251      def test_seek_read(self):
     252          self.test_write()
     253          # Try seek, read test
     254  
     255          with gzip.GzipFile(self.filename) as f:
     256              while 1:
     257                  oldpos = f.tell()
     258                  line1 = f.readline()
     259                  if not line1: break
     260                  newpos = f.tell()
     261                  f.seek(oldpos)  # negative seek
     262                  if len(line1)>10:
     263                      amount = 10
     264                  else:
     265                      amount = len(line1)
     266                  line2 = f.read(amount)
     267                  self.assertEqual(line1[:amount], line2)
     268                  f.seek(newpos)  # positive seek
     269  
     270      def test_seek_whence(self):
     271          self.test_write()
     272          # Try seek(whence=1), read test
     273  
     274          with gzip.GzipFile(self.filename) as f:
     275              f.read(10)
     276              f.seek(10, whence=1)
     277              y = f.read(10)
     278          self.assertEqual(y, data1[20:30])
     279  
     280      def test_seek_write(self):
     281          # Try seek, write test
     282          with gzip.GzipFile(self.filename, 'w') as f:
     283              for pos in range(0, 256, 16):
     284                  f.seek(pos)
     285                  f.write(b'GZ\n')
     286  
     287      def test_mode(self):
     288          self.test_write()
     289          with gzip.GzipFile(self.filename, 'r') as f:
     290              self.assertEqual(f.myfileobj.mode, 'rb')
     291          os_helper.unlink(self.filename)
     292          with gzip.GzipFile(self.filename, 'x') as f:
     293              self.assertEqual(f.myfileobj.mode, 'xb')
     294  
     295      def test_1647484(self):
     296          for mode in ('wb', 'rb'):
     297              with gzip.GzipFile(self.filename, mode) as f:
     298                  self.assertTrue(hasattr(f, "name"))
     299                  self.assertEqual(f.name, self.filename)
     300  
     301      def test_paddedfile_getattr(self):
     302          self.test_write()
     303          with gzip.GzipFile(self.filename, 'rb') as f:
     304              self.assertTrue(hasattr(f.fileobj, "name"))
     305              self.assertEqual(f.fileobj.name, self.filename)
     306  
     307      def test_mtime(self):
     308          mtime = 123456789
     309          with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
     310              fWrite.write(data1)
     311          with gzip.GzipFile(self.filename) as fRead:
     312              self.assertTrue(hasattr(fRead, 'mtime'))
     313              self.assertIsNone(fRead.mtime)
     314              dataRead = fRead.read()
     315              self.assertEqual(dataRead, data1)
     316              self.assertEqual(fRead.mtime, mtime)
     317  
     318      def test_metadata(self):
     319          mtime = 123456789
     320  
     321          with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
     322              fWrite.write(data1)
     323  
     324          with open(self.filename, 'rb') as fRead:
     325              # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
     326  
     327              idBytes = fRead.read(2)
     328              self.assertEqual(idBytes, b'\x1f\x8b') # gzip ID
     329  
     330              cmByte = fRead.read(1)
     331              self.assertEqual(cmByte, b'\x08') # deflate
     332  
     333              try:
     334                  expectedname = self.filename.encode('Latin-1') + b'\x00'
     335                  expectedflags = b'\x08' # only the FNAME flag is set
     336              except UnicodeEncodeError:
     337                  expectedname = b''
     338                  expectedflags = b'\x00'
     339  
     340              flagsByte = fRead.read(1)
     341              self.assertEqual(flagsByte, expectedflags)
     342  
     343              mtimeBytes = fRead.read(4)
     344              self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian
     345  
     346              xflByte = fRead.read(1)
     347              self.assertEqual(xflByte, b'\x02') # maximum compression
     348  
     349              osByte = fRead.read(1)
     350              self.assertEqual(osByte, b'\xff') # OS "unknown" (OS-independent)
     351  
     352              # Since the FNAME flag is set, the zero-terminated filename follows.
     353              # RFC 1952 specifies that this is the name of the input file, if any.
     354              # However, the gzip module defaults to storing the name of the output
     355              # file in this field.
     356              nameBytes = fRead.read(len(expectedname))
     357              self.assertEqual(nameBytes, expectedname)
     358  
     359              # Since no other flags were set, the header ends here.
     360              # Rather than process the compressed data, let's seek to the trailer.
     361              fRead.seek(os.stat(self.filename).st_size - 8)
     362  
     363              crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
     364              self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')
     365  
     366              isizeBytes = fRead.read(4)
     367              self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))
     368  
     369      def test_metadata_ascii_name(self):
     370          self.filename = os_helper.TESTFN_ASCII
     371          self.test_metadata()
     372  
     373      def test_compresslevel_metadata(self):
     374          # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
     375          # specifically, discussion of XFL in section 2.3.1
     376          cases = [
     377              ('fast', 1, b'\x04'),
     378              ('best', 9, b'\x02'),
     379              ('tradeoff', 6, b'\x00'),
     380          ]
     381          xflOffset = 8
     382  
     383          for (name, level, expectedXflByte) in cases:
     384              with self.subTest(name):
     385                  fWrite = gzip.GzipFile(self.filename, 'w', compresslevel=level)
     386                  with fWrite:
     387                      fWrite.write(data1)
     388                  with open(self.filename, 'rb') as fRead:
     389                      fRead.seek(xflOffset)
     390                      xflByte = fRead.read(1)
     391                      self.assertEqual(xflByte, expectedXflByte)
     392  
     393      def test_with_open(self):
     394          # GzipFile supports the context management protocol
     395          with gzip.GzipFile(self.filename, "wb") as f:
     396              f.write(b"xxx")
     397          f = gzip.GzipFile(self.filename, "rb")
     398          f.close()
     399          try:
     400              with f:
     401                  pass
     402          except ValueError:
     403              pass
     404          else:
     405              self.fail("__enter__ on a closed file didn't raise an exception")
     406          try:
     407              with gzip.GzipFile(self.filename, "wb") as f:
     408                  1/0
     409          except ZeroDivisionError:
     410              pass
     411          else:
     412              self.fail("1/0 didn't raise an exception")
     413  
     414      def test_zero_padded_file(self):
     415          with gzip.GzipFile(self.filename, "wb") as f:
     416              f.write(data1 * 50)
     417  
     418          # Pad the file with zeroes
     419          with open(self.filename, "ab") as f:
     420              f.write(b"\x00" * 50)
     421  
     422          with gzip.GzipFile(self.filename, "rb") as f:
     423              d = f.read()
     424              self.assertEqual(d, data1 * 50, "Incorrect data in file")
     425  
     426      def test_gzip_BadGzipFile_exception(self):
     427          self.assertTrue(issubclass(gzip.BadGzipFile, OSError))
     428  
     429      def test_bad_gzip_file(self):
     430          with open(self.filename, 'wb') as file:
     431              file.write(data1 * 50)
     432          with gzip.GzipFile(self.filename, 'r') as file:
     433              self.assertRaises(gzip.BadGzipFile, file.readlines)
     434  
     435      def test_non_seekable_file(self):
     436          uncompressed = data1 * 50
     437          buf = UnseekableIO()
     438          with gzip.GzipFile(fileobj=buf, mode="wb") as f:
     439              f.write(uncompressed)
     440          compressed = buf.getvalue()
     441          buf = UnseekableIO(compressed)
     442          with gzip.GzipFile(fileobj=buf, mode="rb") as f:
     443              self.assertEqual(f.read(), uncompressed)
     444  
     445      def test_peek(self):
     446          uncompressed = data1 * 200
     447          with gzip.GzipFile(self.filename, "wb") as f:
     448              f.write(uncompressed)
     449  
     450          def sizes():
     451              while True:
     452                  for n in range(5, 50, 10):
     453                      yield n
     454  
     455          with gzip.GzipFile(self.filename, "rb") as f:
     456              f.max_read_chunk = 33
     457              nread = 0
     458              for n in sizes():
     459                  s = f.peek(n)
     460                  if s == b'':
     461                      break
     462                  self.assertEqual(f.read(len(s)), s)
     463                  nread += len(s)
     464              self.assertEqual(f.read(100), b'')
     465              self.assertEqual(nread, len(uncompressed))
     466  
     467      def test_textio_readlines(self):
     468          # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
     469          lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
     470          self.test_write()
     471          with gzip.GzipFile(self.filename, 'r') as f:
     472              with io.TextIOWrapper(f, encoding="ascii") as t:
     473                  self.assertEqual(t.readlines(), lines)
     474  
     475      def test_fileobj_from_fdopen(self):
     476          # Issue #13781: Opening a GzipFile for writing fails when using a
     477          # fileobj created with os.fdopen().
     478          fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
     479          with os.fdopen(fd, "wb") as f:
     480              with gzip.GzipFile(fileobj=f, mode="w") as g:
     481                  pass
     482  
     483      def test_fileobj_mode(self):
     484          gzip.GzipFile(self.filename, "wb").close()
     485          with open(self.filename, "r+b") as f:
     486              with gzip.GzipFile(fileobj=f, mode='r') as g:
     487                  self.assertEqual(g.mode, gzip.READ)
     488              with gzip.GzipFile(fileobj=f, mode='w') as g:
     489                  self.assertEqual(g.mode, gzip.WRITE)
     490              with gzip.GzipFile(fileobj=f, mode='a') as g:
     491                  self.assertEqual(g.mode, gzip.WRITE)
     492              with gzip.GzipFile(fileobj=f, mode='x') as g:
     493                  self.assertEqual(g.mode, gzip.WRITE)
     494              with self.assertRaises(ValueError):
     495                  gzip.GzipFile(fileobj=f, mode='z')
     496          for mode in "rb", "r+b":
     497              with open(self.filename, mode) as f:
     498                  with gzip.GzipFile(fileobj=f) as g:
     499                      self.assertEqual(g.mode, gzip.READ)
     500          for mode in "wb", "ab", "xb":
     501              if "x" in mode:
     502                  os_helper.unlink(self.filename)
     503              with open(self.filename, mode) as f:
     504                  with self.assertWarns(FutureWarning):
     505                      g = gzip.GzipFile(fileobj=f)
     506                  with g:
     507                      self.assertEqual(g.mode, gzip.WRITE)
     508  
     509      def test_bytes_filename(self):
     510          str_filename = self.filename
     511          try:
     512              bytes_filename = str_filename.encode("ascii")
     513          except UnicodeEncodeError:
     514              self.skipTest("Temporary file name needs to be ASCII")
     515          with gzip.GzipFile(bytes_filename, "wb") as f:
     516              f.write(data1 * 50)
     517          with gzip.GzipFile(bytes_filename, "rb") as f:
     518              self.assertEqual(f.read(), data1 * 50)
     519          # Sanity check that we are actually operating on the right file.
     520          with gzip.GzipFile(str_filename, "rb") as f:
     521              self.assertEqual(f.read(), data1 * 50)
     522  
     523      def test_decompress_limited(self):
     524          """Decompressed data buffering should be limited"""
     525          bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
     526          self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)
     527  
     528          bomb = io.BytesIO(bomb)
     529          decomp = gzip.GzipFile(fileobj=bomb)
     530          self.assertEqual(decomp.read(1), b'\0')
     531          max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
     532          self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
     533              "Excessive amount of data was decompressed")
     534  
     535      # Testing compress/decompress shortcut functions
     536  
     537      def test_compress(self):
     538          for data in [data1, data2]:
     539              for args in [(), (1,), (6,), (9,)]:
     540                  datac = gzip.compress(data, *args)
     541                  self.assertEqual(type(datac), bytes)
     542                  with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
     543                      self.assertEqual(f.read(), data)
     544  
     545      def test_compress_mtime(self):
     546          mtime = 123456789
     547          for data in [data1, data2]:
     548              for args in [(), (1,), (6,), (9,)]:
     549                  with self.subTest(data=data, args=args):
     550                      datac = gzip.compress(data, *args, mtime=mtime)
     551                      self.assertEqual(type(datac), bytes)
     552                      with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
     553                          f.read(1) # to set mtime attribute
     554                          self.assertEqual(f.mtime, mtime)
     555  
     556      def test_compress_correct_level(self):
     557          # gzip.compress calls with mtime == 0 take a different code path.
     558          for mtime in (0, 42):
     559              with self.subTest(mtime=mtime):
     560                  nocompress = gzip.compress(data1, compresslevel=0, mtime=mtime)
     561                  yescompress = gzip.compress(data1, compresslevel=1, mtime=mtime)
     562                  self.assertIn(data1, nocompress)
     563                  self.assertNotIn(data1, yescompress)
     564  
     565      def test_decompress(self):
     566          for data in (data1, data2):
     567              buf = io.BytesIO()
     568              with gzip.GzipFile(fileobj=buf, mode="wb") as f:
     569                  f.write(data)
     570              self.assertEqual(gzip.decompress(buf.getvalue()), data)
     571              # Roundtrip with compress
     572              datac = gzip.compress(data)
     573              self.assertEqual(gzip.decompress(datac), data)
     574  
     575      def test_decompress_truncated_trailer(self):
     576          compressed_data = gzip.compress(data1)
     577          self.assertRaises(EOFError, gzip.decompress, compressed_data[:-4])
     578  
     579      def test_decompress_missing_trailer(self):
     580          compressed_data = gzip.compress(data1)
     581          self.assertRaises(EOFError, gzip.decompress, compressed_data[:-8])
     582  
     583      def test_read_truncated(self):
     584          data = data1*50
     585          # Drop the CRC (4 bytes) and file size (4 bytes).
     586          truncated = gzip.compress(data)[:-8]
     587          with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
     588              self.assertRaises(EOFError, f.read)
     589          with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
     590              self.assertEqual(f.read(len(data)), data)
     591              self.assertRaises(EOFError, f.read, 1)
     592          # Incomplete 10-byte header.
     593          for i in range(2, 10):
     594              with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
     595                  self.assertRaises(EOFError, f.read, 1)
     596  
     597      def test_read_with_extra(self):
     598          # Gzip data with an extra field
     599          gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
     600                    b'\x05\x00Extra'
     601                    b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
     602          with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
     603              self.assertEqual(f.read(), b'Test')
     604  
     605      def test_prepend_error(self):
     606          # See issue #20875
     607          with gzip.open(self.filename, "wb") as f:
     608              f.write(data1)
     609          with gzip.open(self.filename, "rb") as f:
     610              f._buffer.raw._fp.prepend()
     611  
     612      def test_issue44439(self):
     613          q = array.array('Q', [1, 2, 3, 4, 5])
     614          LENGTH = len(q) * q.itemsize
     615  
     616          with gzip.GzipFile(fileobj=io.BytesIO(), mode='w') as f:
     617              self.assertEqual(f.write(q), LENGTH)
     618              self.assertEqual(f.tell(), LENGTH)
     619  
     620      def test_flush_flushes_compressor(self):
     621          # See issue GH-105808.
     622          b = io.BytesIO()
     623          message = b"important message here."
     624          with gzip.GzipFile(fileobj=b, mode='w') as f:
     625              f.write(message)
     626              f.flush()
     627              partial_data = b.getvalue()
     628          full_data = b.getvalue()
     629          self.assertEqual(gzip.decompress(full_data), message)
     630          # The partial data should contain the gzip header and the complete
     631          # message, but not the end-of-stream markers (so we can't just
     632          # decompress it directly).
     633          with self.assertRaises(EOFError):
     634              gzip.decompress(partial_data)
     635          d = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
     636          f = io.BytesIO(partial_data)
     637          gzip._read_gzip_header(f)
     638          read_message = d.decompress(f.read())
     639          self.assertEqual(read_message, message)
     640  
     641      def test_flush_modes(self):
     642          # Make sure the argument to flush is properly passed to the
     643          # zlib.compressobj; see issue GH-105808.
     644          class ESC[4;38;5;81mFakeCompressor:
     645              def __init__(self):
     646                  self.modes = []
     647              def compress(self, data):
     648                  return b''
     649              def flush(self, mode=-1):
     650                  self.modes.append(mode)
     651                  return b''
     652          b = io.BytesIO()
     653          fc = FakeCompressor()
     654          with gzip.GzipFile(fileobj=b, mode='w') as f:
     655              f.compress = fc
     656              f.flush()
     657              f.flush(50)
     658              f.flush(zlib_mode=100)
     659          # The implicit close will also flush the compressor.
     660          expected_modes = [
     661              zlib.Z_SYNC_FLUSH,
     662              50,
     663              100,
     664              -1,
     665          ]
     666          self.assertEqual(fc.modes, expected_modes)
     667  
     668      def test_write_seek_write(self):
     669          # Make sure that offset is up-to-date before seeking
     670          # See issue GH-108111
     671          b = io.BytesIO()
     672          message = b"important message here."
     673          with gzip.GzipFile(fileobj=b, mode='w') as f:
     674              f.write(message)
     675              f.seek(len(message))
     676              f.write(message)
     677          data = b.getvalue()
     678          self.assertEqual(gzip.decompress(data), message * 2)
     679  
     680  
class TestOpen(BaseTest):
     682      def test_binary_modes(self):
     683          uncompressed = data1 * 50
     684  
     685          with gzip.open(self.filename, "wb") as f:
     686              f.write(uncompressed)
     687          with open(self.filename, "rb") as f:
     688              file_data = gzip.decompress(f.read())
     689              self.assertEqual(file_data, uncompressed)
     690  
     691          with gzip.open(self.filename, "rb") as f:
     692              self.assertEqual(f.read(), uncompressed)
     693  
     694          with gzip.open(self.filename, "ab") as f:
     695              f.write(uncompressed)
     696          with open(self.filename, "rb") as f:
     697              file_data = gzip.decompress(f.read())
     698              self.assertEqual(file_data, uncompressed * 2)
     699  
     700          with self.assertRaises(FileExistsError):
     701              gzip.open(self.filename, "xb")
     702          os_helper.unlink(self.filename)
     703          with gzip.open(self.filename, "xb") as f:
     704              f.write(uncompressed)
     705          with open(self.filename, "rb") as f:
     706              file_data = gzip.decompress(f.read())
     707              self.assertEqual(file_data, uncompressed)
     708  
     709      def test_pathlike_file(self):
     710          filename = pathlib.Path(self.filename)
     711          with gzip.open(filename, "wb") as f:
     712              f.write(data1 * 50)
     713          with gzip.open(filename, "ab") as f:
     714              f.write(data1)
     715          with gzip.open(filename) as f:
     716              self.assertEqual(f.read(), data1 * 51)
     717  
     718      def test_implicit_binary_modes(self):
     719          # Test implicit binary modes (no "b" or "t" in mode string).
     720          uncompressed = data1 * 50
     721  
     722          with gzip.open(self.filename, "w") as f:
     723              f.write(uncompressed)
     724          with open(self.filename, "rb") as f:
     725              file_data = gzip.decompress(f.read())
     726              self.assertEqual(file_data, uncompressed)
     727  
     728          with gzip.open(self.filename, "r") as f:
     729              self.assertEqual(f.read(), uncompressed)
     730  
     731          with gzip.open(self.filename, "a") as f:
     732              f.write(uncompressed)
     733          with open(self.filename, "rb") as f:
     734              file_data = gzip.decompress(f.read())
     735              self.assertEqual(file_data, uncompressed * 2)
     736  
     737          with self.assertRaises(FileExistsError):
     738              gzip.open(self.filename, "x")
     739          os_helper.unlink(self.filename)
     740          with gzip.open(self.filename, "x") as f:
     741              f.write(uncompressed)
     742          with open(self.filename, "rb") as f:
     743              file_data = gzip.decompress(f.read())
     744              self.assertEqual(file_data, uncompressed)
     745  
     746      def test_text_modes(self):
     747          uncompressed = data1.decode("ascii") * 50
     748          uncompressed_raw = uncompressed.replace("\n", os.linesep)
     749          with gzip.open(self.filename, "wt", encoding="ascii") as f:
     750              f.write(uncompressed)
     751          with open(self.filename, "rb") as f:
     752              file_data = gzip.decompress(f.read()).decode("ascii")
     753              self.assertEqual(file_data, uncompressed_raw)
     754          with gzip.open(self.filename, "rt", encoding="ascii") as f:
     755              self.assertEqual(f.read(), uncompressed)
     756          with gzip.open(self.filename, "at", encoding="ascii") as f:
     757              f.write(uncompressed)
     758          with open(self.filename, "rb") as f:
     759              file_data = gzip.decompress(f.read()).decode("ascii")
     760              self.assertEqual(file_data, uncompressed_raw * 2)
     761  
     762      def test_fileobj(self):
     763          uncompressed_bytes = data1 * 50
     764          uncompressed_str = uncompressed_bytes.decode("ascii")
     765          compressed = gzip.compress(uncompressed_bytes)
     766          with gzip.open(io.BytesIO(compressed), "r") as f:
     767              self.assertEqual(f.read(), uncompressed_bytes)
     768          with gzip.open(io.BytesIO(compressed), "rb") as f:
     769              self.assertEqual(f.read(), uncompressed_bytes)
     770          with gzip.open(io.BytesIO(compressed), "rt", encoding="ascii") as f:
     771              self.assertEqual(f.read(), uncompressed_str)
     772  
     773      def test_bad_params(self):
     774          # Test invalid parameter combinations.
     775          with self.assertRaises(TypeError):
     776              gzip.open(123.456)
     777          with self.assertRaises(ValueError):
     778              gzip.open(self.filename, "wbt")
     779          with self.assertRaises(ValueError):
     780              gzip.open(self.filename, "xbt")
     781          with self.assertRaises(ValueError):
     782              gzip.open(self.filename, "rb", encoding="utf-8")
     783          with self.assertRaises(ValueError):
     784              gzip.open(self.filename, "rb", errors="ignore")
     785          with self.assertRaises(ValueError):
     786              gzip.open(self.filename, "rb", newline="\n")
     787  
     788      def test_encoding(self):
     789          # Test non-default encoding.
     790          uncompressed = data1.decode("ascii") * 50
     791          uncompressed_raw = uncompressed.replace("\n", os.linesep)
     792          with gzip.open(self.filename, "wt", encoding="utf-16") as f:
     793              f.write(uncompressed)
     794          with open(self.filename, "rb") as f:
     795              file_data = gzip.decompress(f.read()).decode("utf-16")
     796              self.assertEqual(file_data, uncompressed_raw)
     797          with gzip.open(self.filename, "rt", encoding="utf-16") as f:
     798              self.assertEqual(f.read(), uncompressed)
     799  
     800      def test_encoding_error_handler(self):
     801          # Test with non-default encoding error handler.
     802          with gzip.open(self.filename, "wb") as f:
     803              f.write(b"foo\xffbar")
     804          with gzip.open(self.filename, "rt", encoding="ascii", errors="ignore") \
     805                  as f:
     806              self.assertEqual(f.read(), "foobar")
     807  
     808      def test_newline(self):
     809          # Test with explicit newline (universal newline mode disabled).
     810          uncompressed = data1.decode("ascii") * 50
     811          with gzip.open(self.filename, "wt", encoding="ascii", newline="\n") as f:
     812              f.write(uncompressed)
     813          with gzip.open(self.filename, "rt", encoding="ascii", newline="\r") as f:
     814              self.assertEqual(f.readlines(), [uncompressed])
     815  
     816  
     817  def create_and_remove_directory(directory):
     818      def decorator(function):
     819          @functools.wraps(function)
     820          def wrapper(*args, **kwargs):
     821              os.makedirs(directory)
     822              try:
     823                  return function(*args, **kwargs)
     824              finally:
     825                  os_helper.rmtree(directory)
     826          return wrapper
     827      return decorator
     828  
     829  
     830  class ESC[4;38;5;81mTestCommandLine(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     831      data = b'This is a simple test with gzip'
     832  
     833      @requires_subprocess()
     834      def test_decompress_stdin_stdout(self):
     835          with io.BytesIO() as bytes_io:
     836              with gzip.GzipFile(fileobj=bytes_io, mode='wb') as gzip_file:
     837                  gzip_file.write(self.data)
     838  
     839              args = sys.executable, '-m', 'gzip', '-d'
     840              with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
     841                  out, err = proc.communicate(bytes_io.getvalue())
     842  
     843          self.assertEqual(err, b'')
     844          self.assertEqual(out, self.data)
     845  
     846      @create_and_remove_directory(TEMPDIR)
     847      def test_decompress_infile_outfile(self):
     848          gzipname = os.path.join(TEMPDIR, 'testgzip.gz')
     849          self.assertFalse(os.path.exists(gzipname))
     850  
     851          with gzip.open(gzipname, mode='wb') as fp:
     852              fp.write(self.data)
     853          rc, out, err = assert_python_ok('-m', 'gzip', '-d', gzipname)
     854  
     855          with open(os.path.join(TEMPDIR, "testgzip"), "rb") as gunziped:
     856              self.assertEqual(gunziped.read(), self.data)
     857  
     858          self.assertTrue(os.path.exists(gzipname))
     859          self.assertEqual(rc, 0)
     860          self.assertEqual(out, b'')
     861          self.assertEqual(err, b'')
     862  
     863      def test_decompress_infile_outfile_error(self):
     864          rc, out, err = assert_python_failure('-m', 'gzip', '-d', 'thisisatest.out')
     865          self.assertEqual(b"filename doesn't end in .gz: 'thisisatest.out'", err.strip())
     866          self.assertEqual(rc, 1)
     867          self.assertEqual(out, b'')
     868  
     869      @requires_subprocess()
     870      @create_and_remove_directory(TEMPDIR)
     871      def test_compress_stdin_outfile(self):
     872          args = sys.executable, '-m', 'gzip'
     873          with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
     874              out, err = proc.communicate(self.data)
     875  
     876          self.assertEqual(err, b'')
     877          self.assertEqual(out[:2], b"\x1f\x8b")
     878  
     879      @create_and_remove_directory(TEMPDIR)
     880      def test_compress_infile_outfile_default(self):
     881          local_testgzip = os.path.join(TEMPDIR, 'testgzip')
     882          gzipname = local_testgzip + '.gz'
     883          self.assertFalse(os.path.exists(gzipname))
     884  
     885          with open(local_testgzip, 'wb') as fp:
     886              fp.write(self.data)
     887  
     888          rc, out, err = assert_python_ok('-m', 'gzip', local_testgzip)
     889  
     890          self.assertTrue(os.path.exists(gzipname))
     891          self.assertEqual(out, b'')
     892          self.assertEqual(err, b'')
     893  
     894      @create_and_remove_directory(TEMPDIR)
     895      def test_compress_infile_outfile(self):
     896          for compress_level in ('--fast', '--best'):
     897              with self.subTest(compress_level=compress_level):
     898                  local_testgzip = os.path.join(TEMPDIR, 'testgzip')
     899                  gzipname = local_testgzip + '.gz'
     900                  self.assertFalse(os.path.exists(gzipname))
     901  
     902                  with open(local_testgzip, 'wb') as fp:
     903                      fp.write(self.data)
     904  
     905                  rc, out, err = assert_python_ok('-m', 'gzip', compress_level, local_testgzip)
     906  
     907                  self.assertTrue(os.path.exists(gzipname))
     908                  self.assertEqual(out, b'')
     909                  self.assertEqual(err, b'')
     910                  os.remove(gzipname)
     911                  self.assertFalse(os.path.exists(gzipname))
     912  
     913      def test_compress_fast_best_are_exclusive(self):
     914          rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '--best')
     915          self.assertIn(b"error: argument --best: not allowed with argument --fast", err)
     916          self.assertEqual(out, b'')
     917  
     918      def test_decompress_cannot_have_flags_compression(self):
     919          rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '-d')
     920          self.assertIn(b'error: argument -d/--decompress: not allowed with argument --fast', err)
     921          self.assertEqual(out, b'')
     922  
     923  
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()