python (3.11.7)
1 """Test script for the gzip module.
2 """
3
4 import array
5 import functools
6 import io
7 import os
8 import pathlib
9 import struct
10 import sys
11 import unittest
12 from subprocess import PIPE, Popen
13 from test.support import import_helper
14 from test.support import os_helper
15 from test.support import _4G, bigmemtest, requires_subprocess
16 from test.support.script_helper import assert_python_ok, assert_python_failure
17
# gzip is obtained through import_helper.import_module rather than a plain
# import statement.  NOTE(review): presumably so that the whole module is
# skipped when gzip (or its zlib dependency) is unavailable -- confirm against
# the test.support documentation.
18 gzip = import_helper.import_module('gzip')
19
# Sample payloads that the tests write, read back and compare against.
# NOTE(review): this listing has lost its indentation, so whitespace inside the
# two literals below may have been collapsed as well.  test_metadata pins the
# CRC32 of data1 (b'\xaf\xd7d\x83'), so restore these bytes from the upstream
# file instead of retyping them from this view.
20 data1 = b""" int length=DEFAULTALLOC, err = Z_OK;
21 PyObject *RetVal;
22 int flushmode = Z_FINISH;
23 unsigned long start_total_out;
24
25 """
26
27 data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
28 /* See http://www.gzip.org/zlib/
29 /* See http://www.winimage.com/zLibDll for Windows */
30 """
31
32
# Scratch directory used by the command-line tests: the absolute TESTFN path
# with a '-gzdir' suffix.  It is created and removed around individual tests by
# the create_and_remove_directory decorator.
33 TEMPDIR = os.path.abspath(os_helper.TESTFN) + '-gzdir'
34
35
class UnseekableIO(io.BytesIO):
    """In-memory byte stream that refuses every positioning operation.

    Behaves like io.BytesIO for reading, writing and getvalue(), but reports
    itself as non-seekable and raises io.UnsupportedOperation from tell() and
    seek().  test_non_seekable_file uses it as the fileobj of a GzipFile.
    """

    def seekable(self):
        """Report that this stream cannot be repositioned."""
        return False

    def tell(self):
        """Always raise io.UnsupportedOperation."""
        raise io.UnsupportedOperation

    def seek(self, *args):
        """Always raise io.UnsupportedOperation, whatever the arguments."""
        raise io.UnsupportedOperation
45
46
47 class ESC[4;38;5;81mBaseTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
48 filename = os_helper.TESTFN
49
50 def setUp(self):
51 os_helper.unlink(self.filename)
52
53 def tearDown(self):
54 os_helper.unlink(self.filename)
55
56
class TestGzip(BaseTest):
    """Tests for gzip.GzipFile and the gzip.compress()/decompress() shortcuts.

    Test methods are documented with comments rather than docstrings so that
    unittest keeps reporting them by method name.
    """

    def write_and_read_back(self, data, mode='b'):
        # Helper: write *data* through a GzipFile opened with 'w' + mode,
        # check the reported byte count, then reopen with 'r' + mode and
        # compare what is read against bytes(data).
        expected = bytes(data)
        with gzip.GzipFile(self.filename, 'w' + mode) as f:
            written = f.write(data)
        self.assertEqual(written, len(expected))
        with gzip.GzipFile(self.filename, 'r' + mode) as f:
            self.assertEqual(f.read(), expected)

    def test_write(self):
        # Also used by many other tests to create a file holding data1 * 50.
        with gzip.GzipFile(self.filename, 'wb') as f:
            f.write(data1 * 50)

            # Try flush and fileno.
            f.flush()
            f.fileno()
            if hasattr(os, 'fsync'):
                os.fsync(f.fileno())
            f.close()

        # Test multiple close() calls.
        f.close()

    def test_write_read_with_pathlike_file(self):
        # A pathlib.Path is accepted as the filename for write, append and
        # read, and f.name is still exposed as a str.
        filename = pathlib.Path(self.filename)
        with gzip.GzipFile(filename, 'w') as f:
            f.write(data1 * 50)
        self.assertIsInstance(f.name, str)
        with gzip.GzipFile(filename, 'a') as f:
            f.write(data1)
        with gzip.GzipFile(filename) as f:
            d = f.read()
        self.assertEqual(d, data1 * 51)
        self.assertIsInstance(f.name, str)

    # The following test_write_xy methods test that write accepts
    # the corresponding bytes-like object type as input
    # and that the data written equals bytes(xy) in all cases.
    def test_write_memoryview(self):
        self.write_and_read_back(memoryview(data1 * 50))
        # A multi-dimensional view must be handled as well.
        m = memoryview(bytes(range(256)))
        data = m.cast('B', shape=[8, 8, 4])
        self.write_and_read_back(data)

    def test_write_bytearray(self):
        self.write_and_read_back(bytearray(data1 * 50))

    def test_write_array(self):
        self.write_and_read_back(array.array('I', data1 * 40))

    def test_write_incompatible_type(self):
        # Test that non-bytes-like types raise TypeError.
        # Issue #21560: attempts to write incompatible types
        # should not affect the state of the fileobject
        with gzip.GzipFile(self.filename, 'wb') as f:
            with self.assertRaises(TypeError):
                f.write('')
            with self.assertRaises(TypeError):
                f.write([])
            f.write(data1)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1)

    def test_read(self):
        self.test_write()
        # Try reading.
        with gzip.GzipFile(self.filename, 'r') as f:
            d = f.read()
        self.assertEqual(d, data1 * 50)

    def test_read1(self):
        self.test_write()
        blocks = []
        nread = 0
        with gzip.GzipFile(self.filename, 'r') as f:
            while True:
                d = f.read1()
                if not d:
                    break
                blocks.append(d)
                nread += len(d)
                # Check that position was updated correctly (see issue10791).
                self.assertEqual(f.tell(), nread)
        self.assertEqual(b''.join(blocks), data1 * 50)

    @bigmemtest(size=_4G, memuse=1)
    def test_read_large(self, size):
        # Read chunk size over UINT_MAX should be supported, despite zlib's
        # limitation per low-level call
        compressed = gzip.compress(data1, compresslevel=1)
        # The context manager closes the GzipFile once the check is done.
        with gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb') as f:
            self.assertEqual(f.read(size), data1)

    def test_io_on_closed_object(self):
        # Test that I/O operations on closed GzipFile objects raise a
        # ValueError, just like the corresponding functions on file objects.

        # Write to a file, open it for reading, then close it.
        self.test_write()
        f = gzip.GzipFile(self.filename, 'r')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.read(1)
        with self.assertRaises(ValueError):
            f.seek(0)
        with self.assertRaises(ValueError):
            f.tell()
        # Open the file for writing, then close it.
        f = gzip.GzipFile(self.filename, 'w')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.write(b'')
        with self.assertRaises(ValueError):
            f.flush()

    def test_append(self):
        self.test_write()
        # Append to the previous file
        with gzip.GzipFile(self.filename, 'ab') as f:
            f.write(data2 * 15)

        with gzip.GzipFile(self.filename, 'rb') as f:
            d = f.read()
        self.assertEqual(d, (data1 * 50) + (data2 * 15))

    def test_many_append(self):
        # Bug #1074261 was triggered when reading a file that contained
        # many, many members.  Create such a file and verify that reading it
        # works.
        with gzip.GzipFile(self.filename, 'wb', 9) as f:
            f.write(b'a')
        for _ in range(200):
            with gzip.GzipFile(self.filename, "ab", 9) as f:  # append
                f.write(b'a')

        # Try reading the file.  Chunks are collected and joined once rather
        # than concatenated in the loop.
        chunks = []
        with gzip.GzipFile(self.filename, "rb") as zgfile:
            while chunk := zgfile.read(8192):
                chunks.append(chunk)
        self.assertEqual(b''.join(chunks), b'a' * 201)

    def test_exclusive_write(self):
        # Mode 'xb' creates the file, and fails if it already exists.
        with gzip.GzipFile(self.filename, 'xb') as f:
            f.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1 * 50)
        with self.assertRaises(FileExistsError):
            gzip.GzipFile(self.filename, 'xb')

    def test_buffered_reader(self):
        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
        # performance.
        self.test_write()

        with gzip.GzipFile(self.filename, 'rb') as f:
            with io.BufferedReader(f) as r:
                lines = list(r)

        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))

    def test_readline(self):
        self.test_write()
        # Try .readline() with varying line lengths

        with gzip.GzipFile(self.filename, 'rb') as f:
            line_length = 0
            while True:
                line = f.readline(line_length)
                # readline(0) legitimately returns b'', so an empty result
                # only signals end of file when a non-zero size was requested.
                if not line and line_length != 0:
                    break
                self.assertLessEqual(len(line), line_length)
                line_length = (line_length + 1) % 50

    def test_readlines(self):
        self.test_write()
        # Try .readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            f.readlines()

        # With a size hint, keep calling until an empty list comes back.
        with gzip.GzipFile(self.filename, 'rb') as f:
            while f.readlines(150):
                pass

    def test_seek_read(self):
        self.test_write()
        # Try seek, read test

        with gzip.GzipFile(self.filename) as f:
            while True:
                oldpos = f.tell()
                line1 = f.readline()
                if not line1:
                    break
                newpos = f.tell()
                f.seek(oldpos)  # negative seek
                # Re-read at most the first ten bytes of the line.
                amount = min(len(line1), 10)
                line2 = f.read(amount)
                self.assertEqual(line1[:amount], line2)
                f.seek(newpos)  # positive seek

    def test_seek_whence(self):
        self.test_write()
        # Try seek(whence=1), read test

        with gzip.GzipFile(self.filename) as f:
            f.read(10)
            f.seek(10, whence=1)
            y = f.read(10)
        self.assertEqual(y, data1[20:30])

    def test_seek_write(self):
        # Try seek, write test
        with gzip.GzipFile(self.filename, 'w') as f:
            for pos in range(0, 256, 16):
                f.seek(pos)
                f.write(b'GZ\n')

    def test_mode(self):
        # The underlying file object is always opened in binary mode.
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            self.assertEqual(f.myfileobj.mode, 'rb')
        os_helper.unlink(self.filename)
        with gzip.GzipFile(self.filename, 'x') as f:
            self.assertEqual(f.myfileobj.mode, 'xb')

    def test_1647484(self):
        # GzipFile exposes a name attribute in both write and read mode.
        for mode in ('wb', 'rb'):
            with gzip.GzipFile(self.filename, mode) as f:
                self.assertTrue(hasattr(f, "name"))
                self.assertEqual(f.name, self.filename)

    def test_paddedfile_getattr(self):
        # The wrapper stored in f.fileobj forwards the name attribute.
        self.test_write()
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertTrue(hasattr(f.fileobj, "name"))
            self.assertEqual(f.fileobj.name, self.filename)

    def test_mtime(self):
        mtime = 123456789
        with gzip.GzipFile(self.filename, 'w', mtime=mtime) as fWrite:
            fWrite.write(data1)
        with gzip.GzipFile(self.filename) as fRead:
            # mtime is None until the header has been read.
            self.assertTrue(hasattr(fRead, 'mtime'))
            self.assertIsNone(fRead.mtime)
            dataRead = fRead.read()
            self.assertEqual(dataRead, data1)
            self.assertEqual(fRead.mtime, mtime)

    def test_metadata(self):
        mtime = 123456789

        with gzip.GzipFile(self.filename, 'w', mtime=mtime) as fWrite:
            fWrite.write(data1)

        with open(self.filename, 'rb') as fRead:
            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html

            idBytes = fRead.read(2)
            self.assertEqual(idBytes, b'\x1f\x8b')  # gzip ID

            cmByte = fRead.read(1)
            self.assertEqual(cmByte, b'\x08')  # deflate

            try:
                expectedname = self.filename.encode('Latin-1') + b'\x00'
                expectedflags = b'\x08'  # only the FNAME flag is set
            except UnicodeEncodeError:
                # A name that cannot be encoded is expected to be left out.
                expectedname = b''
                expectedflags = b'\x00'

            flagsByte = fRead.read(1)
            self.assertEqual(flagsByte, expectedflags)

            mtimeBytes = fRead.read(4)
            self.assertEqual(mtimeBytes, struct.pack('<i', mtime))  # little-endian

            xflByte = fRead.read(1)
            self.assertEqual(xflByte, b'\x02')  # maximum compression

            osByte = fRead.read(1)
            self.assertEqual(osByte, b'\xff')  # OS "unknown" (OS-independent)

            # Since the FNAME flag is set, the zero-terminated filename follows.
            # RFC 1952 specifies that this is the name of the input file, if any.
            # However, the gzip module defaults to storing the name of the output
            # file in this field.
            nameBytes = fRead.read(len(expectedname))
            self.assertEqual(nameBytes, expectedname)

            # Since no other flags were set, the header ends here.
            # Rather than process the compressed data, let's seek to the trailer.
            fRead.seek(os.stat(self.filename).st_size - 8)

            crc32Bytes = fRead.read(4)  # CRC32 of uncompressed data [data1]
            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')

            isizeBytes = fRead.read(4)
            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))

    def test_metadata_ascii_name(self):
        # Repeat test_metadata with the ASCII-only scratch name; tearDown
        # removes the rebound self.filename.
        self.filename = os_helper.TESTFN_ASCII
        self.test_metadata()

    def test_compresslevel_metadata(self):
        # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
        # specifically, discussion of XFL in section 2.3.1
        cases = [
            ('fast', 1, b'\x04'),
            ('best', 9, b'\x02'),
            ('tradeoff', 6, b'\x00'),
        ]
        xflOffset = 8

        for (name, level, expectedXflByte) in cases:
            with self.subTest(name):
                fWrite = gzip.GzipFile(self.filename, 'w', compresslevel=level)
                with fWrite:
                    fWrite.write(data1)
                with open(self.filename, 'rb') as fRead:
                    fRead.seek(xflOffset)
                    xflByte = fRead.read(1)
                    self.assertEqual(xflByte, expectedXflByte)

    def test_with_open(self):
        # GzipFile supports the context management protocol
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(b"xxx")
        f = gzip.GzipFile(self.filename, "rb")
        f.close()
        # Entering an already-closed file must be rejected.
        with self.assertRaises(
                ValueError,
                msg="__enter__ on a closed file didn't raise an exception"):
            with f:
                pass
        # An exception raised inside the block must propagate out of it.
        with self.assertRaises(ZeroDivisionError,
                               msg="1/0 didn't raise an exception"):
            with gzip.GzipFile(self.filename, "wb") as f:
                1/0

    def test_zero_padded_file(self):
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(data1 * 50)

        # Pad the file with zeroes
        with open(self.filename, "ab") as f:
            f.write(b"\x00" * 50)

        with gzip.GzipFile(self.filename, "rb") as f:
            d = f.read()
        self.assertEqual(d, data1 * 50, "Incorrect data in file")

    def test_gzip_BadGzipFile_exception(self):
        self.assertTrue(issubclass(gzip.BadGzipFile, OSError))

    def test_bad_gzip_file(self):
        # Reading a file that holds uncompressed data raises BadGzipFile.
        with open(self.filename, 'wb') as file:
            file.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'r') as file:
            self.assertRaises(gzip.BadGzipFile, file.readlines)

    def test_non_seekable_file(self):
        # Both directions must work through a fileobj without seek()/tell().
        uncompressed = data1 * 50
        buf = UnseekableIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
            f.write(uncompressed)
        compressed = buf.getvalue()
        buf = UnseekableIO(compressed)
        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_peek(self):
        uncompressed = data1 * 200
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(uncompressed)

        def sizes():
            # Endless cycle of peek sizes: 5, 15, 25, 35, 45, 5, ...
            while True:
                for n in range(5, 50, 10):
                    yield n

        with gzip.GzipFile(self.filename, "rb") as f:
            f.max_read_chunk = 33
            nread = 0
            for n in sizes():
                s = f.peek(n)
                if s == b'':
                    break
                # peek() must not consume: a read of the same length has to
                # return exactly the peeked bytes.
                self.assertEqual(f.read(len(s)), s)
                nread += len(s)
            self.assertEqual(f.read(100), b'')
            self.assertEqual(nread, len(uncompressed))

    def test_textio_readlines(self):
        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            with io.TextIOWrapper(f, encoding="ascii") as t:
                self.assertEqual(t.readlines(), lines)

    def test_fileobj_from_fdopen(self):
        # Issue #13781: Opening a GzipFile for writing fails when using a
        # fileobj created with os.fdopen().
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
        with os.fdopen(fd, "wb") as f:
            with gzip.GzipFile(fileobj=f, mode="w") as g:
                pass

    def test_fileobj_mode(self):
        gzip.GzipFile(self.filename, "wb").close()
        # An explicit mode argument decides, whatever mode fileobj has.
        with open(self.filename, "r+b") as f:
            with gzip.GzipFile(fileobj=f, mode='r') as g:
                self.assertEqual(g.mode, gzip.READ)
            with gzip.GzipFile(fileobj=f, mode='w') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='a') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='x') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with self.assertRaises(ValueError):
                gzip.GzipFile(fileobj=f, mode='z')
        # Without a mode argument, readable file objects give READ ...
        for mode in "rb", "r+b":
            with open(self.filename, mode) as f:
                with gzip.GzipFile(fileobj=f) as g:
                    self.assertEqual(g.mode, gzip.READ)
        # ... and write-only ones give WRITE, with a FutureWarning.
        for mode in "wb", "ab", "xb":
            if "x" in mode:
                os_helper.unlink(self.filename)
            with open(self.filename, mode) as f:
                with self.assertWarns(FutureWarning):
                    g = gzip.GzipFile(fileobj=f)
                with g:
                    self.assertEqual(g.mode, gzip.WRITE)

    def test_bytes_filename(self):
        str_filename = self.filename
        try:
            bytes_filename = str_filename.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest("Temporary file name needs to be ASCII")
        with gzip.GzipFile(bytes_filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.GzipFile(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        # Sanity check that we are actually operating on the right file.
        with gzip.GzipFile(str_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)

    def test_decompress_limited(self):
        """Decompressed data buffering should be limited"""
        bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)

        bomb = io.BytesIO(bomb)
        # The context manager closes the GzipFile once the check is done.
        with gzip.GzipFile(fileobj=bomb) as decomp:
            self.assertEqual(decomp.read(1), b'\0')
            max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
            self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
                                 "Excessive amount of data was decompressed")

    # Testing compress/decompress shortcut functions

    def test_compress(self):
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                datac = gzip.compress(data, *args)
                self.assertEqual(type(datac), bytes)
                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                    self.assertEqual(f.read(), data)

    def test_compress_mtime(self):
        mtime = 123456789
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                with self.subTest(data=data, args=args):
                    datac = gzip.compress(data, *args, mtime=mtime)
                    self.assertEqual(type(datac), bytes)
                    with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                        f.read(1)  # to set mtime attribute
                        self.assertEqual(f.mtime, mtime)

    def test_compress_correct_level(self):
        # gzip.compress calls with mtime == 0 take a different code path.
        for mtime in (0, 42):
            with self.subTest(mtime=mtime):
                nocompress = gzip.compress(data1, compresslevel=0, mtime=mtime)
                yescompress = gzip.compress(data1, compresslevel=1, mtime=mtime)
                # Level 0 stores the payload verbatim; level 1 must not.
                self.assertIn(data1, nocompress)
                self.assertNotIn(data1, yescompress)

    def test_decompress(self):
        for data in (data1, data2):
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
                f.write(data)
            self.assertEqual(gzip.decompress(buf.getvalue()), data)
            # Roundtrip with compress
            datac = gzip.compress(data)
            self.assertEqual(gzip.decompress(datac), data)

    def test_decompress_truncated_trailer(self):
        # Half of the 8-byte trailer is missing.
        compressed_data = gzip.compress(data1)
        self.assertRaises(EOFError, gzip.decompress, compressed_data[:-4])

    def test_decompress_missing_trailer(self):
        # The whole 8-byte trailer is missing.
        compressed_data = gzip.compress(data1)
        self.assertRaises(EOFError, gzip.decompress, compressed_data[:-8])

    def test_read_truncated(self):
        data = data1 * 50
        # Drop the CRC (4 bytes) and file size (4 bytes).
        truncated = gzip.compress(data)[:-8]
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(data)), data)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 10-byte header.
        for i in range(2, 10):
            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)

    def test_read_with_extra(self):
        # Gzip data with an extra field
        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
                  b'\x05\x00Extra'
                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
            self.assertEqual(f.read(), b'Test')

    def test_prepend_error(self):
        # See issue #20875
        with gzip.open(self.filename, "wb") as f:
            f.write(data1)
        with gzip.open(self.filename, "rb") as f:
            # No assertion: the test passes when this call does not raise.
            f._buffer.raw._fp.prepend()

    def test_issue44439(self):
        # write() and tell() count bytes, not array items.
        q = array.array('Q', [1, 2, 3, 4, 5])
        LENGTH = len(q) * q.itemsize

        with gzip.GzipFile(fileobj=io.BytesIO(), mode='w') as f:
            self.assertEqual(f.write(q), LENGTH)
            self.assertEqual(f.tell(), LENGTH)
618
619
class TestOpen(BaseTest):
    """Tests for gzip.open() in binary and text modes.

    Test methods are documented with comments rather than docstrings so that
    unittest keeps reporting them by method name.
    """

    def test_binary_modes(self):
        uncompressed = data1 * 50

        # Write, then verify through both a raw read and gzip.open().
        with gzip.open(self.filename, "wb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
        self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "rb") as f:
            self.assertEqual(f.read(), uncompressed)

        # Append a second copy.
        with gzip.open(self.filename, "ab") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
        self.assertEqual(file_data, uncompressed * 2)

        # Exclusive creation fails while the file exists, works once removed.
        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "xb")
        os_helper.unlink(self.filename)
        with gzip.open(self.filename, "xb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
        self.assertEqual(file_data, uncompressed)

    def test_pathlike_file(self):
        # gzip.open() accepts a pathlib.Path for write, append and read.
        filename = pathlib.Path(self.filename)
        with gzip.open(filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.open(filename, "ab") as f:
            f.write(data1)
        with gzip.open(filename) as f:
            self.assertEqual(f.read(), data1 * 51)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        uncompressed = data1 * 50

        with gzip.open(self.filename, "w") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
        self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "r") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "a") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
        self.assertEqual(file_data, uncompressed * 2)

        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "x")
        os_helper.unlink(self.filename)
        with gzip.open(self.filename, "x") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
        self.assertEqual(file_data, uncompressed)

    def test_text_modes(self):
        uncompressed = data1.decode("ascii") * 50
        # Text mode translates "\n" to os.linesep on write, so the raw
        # decompressed payload is compared against the translated form.
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="ascii") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
        self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt", encoding="ascii") as f:
            self.assertEqual(f.read(), uncompressed)
        with gzip.open(self.filename, "at", encoding="ascii") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
        self.assertEqual(file_data, uncompressed_raw * 2)

    def test_fileobj(self):
        # gzip.open() accepts an existing file object in place of a path.
        uncompressed_bytes = data1 * 50
        uncompressed_str = uncompressed_bytes.decode("ascii")
        compressed = gzip.compress(uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "r") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rb") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rt", encoding="ascii") as f:
            self.assertEqual(f.read(), uncompressed_str)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        with self.assertRaises(TypeError):
            gzip.open(123.456)
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "wbt")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "xbt")
        # Text-only arguments are rejected in binary mode.
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", encoding="utf-8")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", errors="ignore")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", newline="\n")

    def test_encoding(self):
        # Test non-default encoding.
        uncompressed = data1.decode("ascii") * 50
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="utf-16") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("utf-16")
        self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt", encoding="utf-16") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with gzip.open(self.filename, "wb") as f:
            f.write(b"foo\xffbar")
        with gzip.open(self.filename, "rt",
                       encoding="ascii", errors="ignore") as f:
            self.assertEqual(f.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        uncompressed = data1.decode("ascii") * 50
        with gzip.open(self.filename, "wt", encoding="ascii", newline="\n") as f:
            f.write(uncompressed)
        # With "\r" as the only line terminator the text contains no line
        # breaks, so readlines() yields it as a single item.
        with gzip.open(self.filename, "rt", encoding="ascii", newline="\r") as f:
            self.assertEqual(f.readlines(), [uncompressed])
754
755
756 def create_and_remove_directory(directory):
757 def decorator(function):
758 @functools.wraps(function)
759 def wrapper(*args, **kwargs):
760 os.makedirs(directory)
761 try:
762 return function(*args, **kwargs)
763 finally:
764 os_helper.rmtree(directory)
765 return wrapper
766 return decorator
767
768
class TestCommandLine(unittest.TestCase):
    """Tests for the ``python -m gzip`` command-line interface.

    Test methods are documented with comments rather than docstrings so that
    unittest keeps reporting them by method name.
    """

    # Payload compressed and decompressed by the tests below.
    data = b'This is a simple test with gzip'

    @requires_subprocess()
    def test_decompress_stdin_stdout(self):
        # `-d` without a file name decompresses stdin to stdout.
        with io.BytesIO() as bytes_io:
            with gzip.GzipFile(fileobj=bytes_io, mode='wb') as gzip_file:
                gzip_file.write(self.data)

            # getvalue() must be called before bytes_io is closed.
            args = sys.executable, '-m', 'gzip', '-d'
            with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
                out, err = proc.communicate(bytes_io.getvalue())

        self.assertEqual(err, b'')
        self.assertEqual(out, self.data)

    @create_and_remove_directory(TEMPDIR)
    def test_decompress_infile_outfile(self):
        gzipname = os.path.join(TEMPDIR, 'testgzip.gz')
        self.assertFalse(os.path.exists(gzipname))

        with gzip.open(gzipname, mode='wb') as fp:
            fp.write(self.data)
        rc, out, err = assert_python_ok('-m', 'gzip', '-d', gzipname)

        # The output lands next to the input, minus the '.gz' suffix.
        with open(os.path.join(TEMPDIR, "testgzip"), "rb") as gunziped:
            self.assertEqual(gunziped.read(), self.data)

        # The compressed input is left in place.
        self.assertTrue(os.path.exists(gzipname))
        self.assertEqual(rc, 0)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    def test_decompress_infile_outfile_error(self):
        # A file name without the '.gz' suffix is rejected.
        rc, out, err = assert_python_failure('-m', 'gzip', '-d', 'thisisatest.out')
        self.assertEqual(b"filename doesn't end in .gz: 'thisisatest.out'", err.strip())
        self.assertEqual(rc, 1)
        self.assertEqual(out, b'')

    @requires_subprocess()
    @create_and_remove_directory(TEMPDIR)
    def test_compress_stdin_outfile(self):
        # Without arguments, stdin is compressed to stdout.
        args = sys.executable, '-m', 'gzip'
        with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
            out, err = proc.communicate(self.data)

        self.assertEqual(err, b'')
        self.assertEqual(out[:2], b"\x1f\x8b")  # gzip magic number

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile_default(self):
        local_testgzip = os.path.join(TEMPDIR, 'testgzip')
        gzipname = local_testgzip + '.gz'
        self.assertFalse(os.path.exists(gzipname))

        with open(local_testgzip, 'wb') as fp:
            fp.write(self.data)

        rc, out, err = assert_python_ok('-m', 'gzip', local_testgzip)

        self.assertTrue(os.path.exists(gzipname))
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile(self):
        for compress_level in ('--fast', '--best'):
            with self.subTest(compress_level=compress_level):
                local_testgzip = os.path.join(TEMPDIR, 'testgzip')
                gzipname = local_testgzip + '.gz'
                self.assertFalse(os.path.exists(gzipname))

                with open(local_testgzip, 'wb') as fp:
                    fp.write(self.data)

                rc, out, err = assert_python_ok('-m', 'gzip', compress_level, local_testgzip)

                self.assertTrue(os.path.exists(gzipname))
                self.assertEqual(out, b'')
                self.assertEqual(err, b'')
                # Remove the output so the next level starts clean.
                os.remove(gzipname)
                self.assertFalse(os.path.exists(gzipname))

    def test_compress_fast_best_are_exclusive(self):
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '--best')
        self.assertIn(b"error: argument --best: not allowed with argument --fast", err)
        self.assertEqual(out, b'')

    def test_decompress_cannot_have_flags_compression(self):
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '-d')
        self.assertIn(b'error: argument -d/--decompress: not allowed with argument --fast', err)
        self.assertEqual(out, b'')
861
862
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()