import unittest
from test import support
from test.support import import_helper
import binascii
import copy
import os
import pickle
import random
import sys
from test.support import bigmemtest, _1G, _4G


zlib = import_helper.import_module('zlib')

requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')

# bpo-46623: On s390x, when a hardware accelerator is used, different ways of
# compressing data with zlib can produce different compressed data.
# Simplified test_pair() code:
#
#   def func1(data):
#       return zlib.compress(data)
#
#   def func2(data):
#       co = zlib.compressobj()
#       x1 = co.compress(data)
#       x2 = co.flush()
#       return x1 + x2
#
# On s390x, if zlib uses a hardware accelerator, func1() creates a single
# "final" compressed block whereas func2() produces 3 compressed blocks (the
# last one is a final block). On other platforms with no accelerator, func1()
# and func2() produce the same compressed data made of a single (final)
# compressed block.
#
# Only the compressed data differs; decompression returns the original data:
#
#   zlib.decompress(func1(data)) == zlib.decompress(func2(data)) == data
#
# Make the assumption that s390x always has an accelerator to simplify the skip
# condition. Windows doesn't have os.uname() but it doesn't support s390x.
skip_on_s390x = unittest.skipIf(hasattr(os, 'uname') and os.uname().machine == 's390x',
                                'skipped on s390x')


class VersionTestCase(unittest.TestCase):

    def test_library_version(self):
        # Test that the major version of the actual library in use matches the
        # major version that we were compiled against. We can't guarantee that
        # the minor versions will match (even on the machine on which the module
        # was compiled), and the API is stable between minor versions, so
        # testing only the major versions avoids spurious failures.
        self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32(b"", 0), 0)
        self.assertEqual(zlib.crc32(b"", 1), 1)
        self.assertEqual(zlib.crc32(b"", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32(b"", 0), 0)
        self.assertEqual(zlib.adler32(b"", 1), 1)
        self.assertEqual(zlib.adler32(b"", 432), 432)

    def test_penguins(self):
        self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"), zlib.adler32(b"penguin", 1))

    def test_crc32_adler32_unsigned(self):
        foo = b'abcdefghijklmnop'
        # explicitly test unsigned behavior
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))


# Issue #10276 - check that inputs >= 4 GiB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
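        # b"nyan" is 4 bytes, so this builds 4 GiB + 4 bytes of input, just
        # past the 32-bit boundary that issue #10276 is about.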
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently zlib accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                          zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)

    @support.cpython_only
    def test_disallow_instantiation(self):
        # Ensure that the type disallows instantiation (bpo-43916)
        support.check_disallow_instantiation(self, type(zlib.compressobj()))
        support.check_disallow_instantiation(self, type(zlib.decompressobj()))


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        # Generate 10 MiB worth of random data, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = random.randbytes(_1M * 10)
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    @skip_on_s390x
    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
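        # CustomInt (defined at the end of this file) resolves to 100 via
        # __index__(), so this checks that bufsize accepts index-like objects.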
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    @skip_on_s390x
    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush)  # second flush should not work
            self.assertEqual(x1 + x2, datazip)
            for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
                dco = zlib.decompressobj()
                y1 = dco.decompress(v1 + v2)
                y2 = dco.flush()
                self.assertEqual(data, y1 + y2)
                self.assertIsInstance(dco.unconsumed_tail, bytes)
                self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(b''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail, ########
                             "(A) uct should be b'': not %d long" %
                             len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail, ########
                         "(B) uct should be b'': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                             'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                             'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                                 'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
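        # CustomInt resolves to 100 via __index__(), so at most the first 100
        # bytes of the decompressed output are produced here.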
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
                    'Z_PARTIAL_FLUSH']

        ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.'))
        # Z_BLOCK has a known failure prior to 1.2.5.3
        if ver >= (1, 2, 5, 3):
            sync_opt.append('Z_BLOCK')

        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                try:
                    obj = zlib.compressobj(level)
                    a = obj.compress(data[:3000])
                    b = obj.flush(sync)
                    c = obj.compress(data[3000:])
                    d = obj.flush()
                except:
                    print("Error for flush mode={}, level={}"
                          .format(sync, level))
                    raise
                self.assertEqual(zlib.decompress(b''.join([a, b, c, d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        data = random.randbytes(17 * 1024)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"")  # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
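        # eof should only become True once the complete stream, including its
        # final checksum bytes, has been fed to the decompressor.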
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # Issue #27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
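        # flush() should also accept an index-like length argument; CustomInt
        # resolves to 100 via __index__().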
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = func(c0)
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = b''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = b''.join(bufs1)

            self.assertEqual(zlib.decompress(s0), data0 + data0)
            self.assertEqual(zlib.decompress(s1), data0 + data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)
        self.assertRaises(ValueError, copy.copy, c)
        self.assertRaises(ValueError, copy.deepcopy, c)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = func(d0)
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = b''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = b''.join(bufs1)

            self.assertEqual(s0, s1)
            self.assertEqual(s0, data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)
        self.assertRaises(ValueError, copy.copy, d)
        self.assertRaises(ValueError, copy.deepcopy, d)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 is only supported since zlib v1.2.3.5.
        # Normalize the runtime version to four numeric components: register
        # "1.2.3" as "1.2.3.0", and handle suffixed versions such as
        # "1.2.0-linux", "1.2.0.f" or "1.2.0.f-linux".
        v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
        if len(v) < 4:
            v.append('0')
        elif not v[-1].isnumeric():
            v[-1] = '0'

        v = tuple(map(int, v))
        supports_wbits_0 = v >= (1, 2, 3, 5)

        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)

        for wbits in (-15, 15, 31):
            with self.subTest(wbits=wbits):
                expected = HAMLET_SCENE
                actual = zlib.decompress(
                    zlib.compress(HAMLET_SCENE, wbits=wbits), wbits=wbits
                )
                self.assertEqual(expected, actual)


def choose_lines(source, number, seed=None, generator=random):
    """Return a list of `number` lines chosen at random from `source`."""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]


HAMLET_SCENE = b"""
LAERTES

O, fear me not.
I stay too long: but here my father comes.

Enter POLONIUS

A double blessing is a double grace,
Occasion smiles upon a second leave.

LORD POLONIUS

Yet here, Laertes! aboard, aboard, for shame!
The wind sits in the shoulder of your sail,
And you are stay'd for. There; my blessing with thee!
And these few precepts in thy memory
See thou character. Give thy thoughts no tongue,
Nor any unproportioned thought his act.
Be thou familiar, but by no means vulgar.
Those friends thou hast, and their adoption tried,
Grapple them to thy soul with hoops of steel;
But do not dull thy palm with entertainment
Of each new-hatch'd, unfledged comrade. Beware
Of entrance to a quarrel, but being in,
Bear't that the opposed may beware of thee.
Give every man thy ear, but few thy voice;
Take each man's censure, but reserve thy judgment.
Costly thy habit as thy purse can buy,
But not express'd in fancy; rich, not gaudy;
For the apparel oft proclaims the man,
And they in France of the best rank and station
Are of a most select and generous chief in that.
Neither a borrower nor a lender be;
For loan oft loses both itself and friend,
And borrowing dulls the edge of husbandry.
This above all: to thine ownself be true,
And it must follow, as the night the day,
Thou canst not then be false to any man.
Farewell: my blessing season this in thee!

LAERTES

Most humbly do I take my leave, my lord.

LORD POLONIUS

The time invites you; go; your servants tend.

LAERTES

Farewell, Ophelia; and remember well
What I have said to you.

OPHELIA

'Tis in my memory lock'd,
And you yourself shall keep the key of it.

LAERTES

Farewell.
"""


class CustomInt:
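    """Integer-like object whose __index__() returns 100, used above to check
    that length/bufsize arguments accept any object supporting __index__()."""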
    def __index__(self):
        return 100


if __name__ == "__main__":
    unittest.main()