python (3.11.7)
1 from test.support import (
2 requires, _2G, _4G, gc_collect, cpython_only, is_emscripten
3 )
4 from test.support.import_helper import import_module
5 from test.support.os_helper import TESTFN, unlink
6 import unittest
7 import os
8 import re
9 import itertools
10 import random
11 import socket
12 import string
13 import sys
14 import weakref
15
16 # Skip test if we can't import mmap.
17 mmap = import_module('mmap')
18
19 PAGESIZE = mmap.PAGESIZE
20
# Common prefix of every tag name generated below; it embeds this
# process's id.
tagname_prefix = 'python_%d_test_mmap' % os.getpid()


def random_tagname(length=10):
    """Return tagname_prefix + '_' + *length* random uppercase ASCII letters."""
    letters = random.choices(string.ascii_uppercase, k=length)
    return tagname_prefix + '_' + ''.join(letters)
25
# The mmap module dup()s the file descriptor it is given, and Emscripten's
# FS layer does not make file changes written through a dup'ed descriptor
# visible to a new mapping, so the whole module is skipped there.
if is_emscripten:
    raise unittest.SkipTest("incompatible with Emscripten's mmap emulation.")
30
31
class MmapTests(unittest.TestCase):
33
def setUp(self):
    # Start every test without a stale TESTFN.  unlink() is the
    # test.support.os_helper wrapper this file already imports and that
    # LargeMmapTests.setUp uses; it tolerates a missing file, so there is
    # no separate os.path.exists() check that could race with the removal.
    unlink(TESTFN)
37
def tearDown(self):
    # Best-effort cleanup: try to remove TESTFN and ignore any OSError
    # raised by the removal.
    try:
        os.unlink(TESTFN)
    except OSError:
        return
43
def test_basic(self):
    # Test mmap module on Unix systems and Windows

    # Create a two-page file with b'foo' at the start of the second page
    # and map all of it.
    with open(TESTFN, 'bw+') as f:
        f.write(b'\0' * PAGESIZE)
        f.write(b'foo')
        f.write(b'\0' * (PAGESIZE - 3))
        f.flush()
        m = mmap.mmap(f.fileno(), 2 * PAGESIZE)

    # The with block closes the mapping even when an assertion fails, so
    # tearDown never has to unlink a file that is still mapped.
    with m:
        # Simple sanity checks

        str(type(m))  # SF bug 128713:  segfaulted on Linux
        self.assertEqual(m.find(b'foo'), PAGESIZE)

        self.assertEqual(len(m), 2 * PAGESIZE)

        self.assertEqual(m[0], 0)
        self.assertEqual(m[0:3], b'\0\0\0')

        # Shouldn't crash on boundary (Issue #5292)
        with self.assertRaises(IndexError):
            m[len(m)]
        with self.assertRaises(IndexError):
            m[len(m)] = 0

        # Modify the file's content
        m[0] = b'3'[0]
        m[PAGESIZE + 3:PAGESIZE + 3 + 3] = b'bar'

        # Check that the modification worked
        self.assertEqual(m[0], b'3'[0])
        self.assertEqual(m[0:3], b'3\0\0')
        self.assertEqual(m[PAGESIZE - 1:PAGESIZE + 7], b'\0foobar\0')

        m.flush()

        # Test doing a regular expression match in an mmap'ed file
        match = re.search(b'[A-Za-z]+', m)
        self.assertIsNotNone(match, 'regex match on mmap failed!')
        start, end = match.span(0)
        self.assertEqual(start, PAGESIZE)
        self.assertEqual(end, PAGESIZE + 6)

        # test seeking around (try to overflow the seek implementation)
        m.seek(0, 0)
        self.assertEqual(m.tell(), 0)
        m.seek(42, 1)
        self.assertEqual(m.tell(), 42)
        m.seek(0, 2)
        self.assertEqual(m.tell(), len(m))

        # Try to seek to negative position...
        self.assertRaises(ValueError, m.seek, -1)

        # Try to seek beyond end of mmap...
        self.assertRaises(ValueError, m.seek, 1, 2)

        # Try to seek to negative position...
        self.assertRaises(ValueError, m.seek, -len(m) - 1, 2)

        # Try resizing map
        try:
            m.resize(512)
        except SystemError:
            # resize() not supported
            # No messages are printed, since the output of this test suite
            # would then be different across platforms.
            pass
        else:
            # resize() is supported
            self.assertEqual(len(m), 512)
            # Check that we can no longer seek beyond the new size.
            self.assertRaises(ValueError, m.seek, 513, 0)

            # Check that the underlying file is truncated too
            # (bug #728515)
            with open(TESTFN, 'rb') as f:
                f.seek(0, 2)
                self.assertEqual(f.tell(), 512)
            self.assertEqual(m.size(), 512)
137
def test_access_parameter(self):
    # Test for "access" keyword parameter
    mapsize = 10
    on_windows = sys.platform.startswith('win')
    write_msg = "Able to write to readonly memory map"

    with open(TESTFN, "wb") as fp:
        fp.write(b"a" * mapsize)

    with open(TESTFN, "rb") as f, \
         mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ) as m:
        self.assertEqual(m[:], b'a' * mapsize,
                         "Readonly memory map data incorrect.")

        # A read-only map must reject every kind of write with TypeError.
        # Item assignment and write_byte() are given ints: a bytes value
        # is a TypeError for them in its own right, which would let these
        # checks pass without the read-only protection being involved.
        with self.assertRaises(TypeError, msg=write_msg):
            m[:] = b'b' * mapsize
        with self.assertRaises(TypeError, msg=write_msg):
            m[0] = ord('b')
        with self.assertRaises(TypeError, msg=write_msg):
            m.seek(0, 0)
            m.write(b'abc')
        with self.assertRaises(TypeError, msg=write_msg):
            m.seek(0, 0)
            m.write_byte(ord('d'))

        # Ensuring that readonly mmap can't be resized.  resize() is not
        # universally supported, hence SystemError is accepted as well.
        with self.assertRaises((SystemError, TypeError),
                               msg="Able to resize readonly memory map"):
            m.resize(2 * mapsize)
        with open(TESTFN, "rb") as fp:
            self.assertEqual(fp.read(), b'a' * mapsize,
                             "Readonly memory map data file was modified")

    # Opening mmap with size too big
    with open(TESTFN, "r+b") as f:
        try:
            m = mmap.mmap(f.fileno(), mapsize + 1)
        except ValueError:
            # we do not expect a ValueError on Windows
            if on_windows:
                self.fail("Opening mmap with size+1 should work on Windows.")
        else:
            # Close first so the mapping is released even if fail() fires.
            m.close()
            # we expect a ValueError on Unix, but not on Windows
            if not on_windows:
                self.fail("Opening mmap with size+1 should raise ValueError.")
        if on_windows:
            # CAUTION:  The oversized map above also changed the size of
            # the file on disk, and the checks below assume that the
            # length hasn't changed.  Repair that.
            with open(TESTFN, 'r+b') as repair:
                repair.truncate(mapsize)

    # Opening mmap with access=ACCESS_WRITE
    with open(TESTFN, "r+b") as f:
        with mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE) as m:
            # Modifying write-through memory map
            m[:] = b'c' * mapsize
            self.assertEqual(m[:], b'c' * mapsize,
                   "Write-through memory map memory not updated properly.")
            m.flush()
    with open(TESTFN, 'rb') as f:
        stuff = f.read()
    self.assertEqual(stuff, b'c' * mapsize,
           "Write-through memory map data file not updated properly.")

    # Opening mmap with access=ACCESS_COPY
    with open(TESTFN, "r+b") as f:
        with mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY) as m:
            # Modifying copy-on-write memory map
            m[:] = b'd' * mapsize
            self.assertEqual(m[:], b'd' * mapsize,
                   "Copy-on-write memory map data not written correctly.")
            m.flush()
            with open(TESTFN, "rb") as fp:
                self.assertEqual(fp.read(), b'c' * mapsize,
                       "Copy-on-write test data file should not be modified.")
            # Ensuring copy-on-write maps cannot be resized
            self.assertRaises(TypeError, m.resize, 2 * mapsize)

    # Ensuring invalid access parameter raises exception
    with open(TESTFN, "r+b") as f:
        self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)

    if os.name == "posix":
        # Try incompatible flags, prot and access parameters.
        with open(TESTFN, "r+b") as f:
            self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
                              flags=mmap.MAP_PRIVATE,
                              prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)

        # Try writing with PROT_EXEC and without PROT_WRITE
        prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0)
        with open(TESTFN, "r+b") as f:
            try:
                m = mmap.mmap(f.fileno(), mapsize, prot=prot)
            except PermissionError:
                # on macOS 14, PROT_READ | PROT_EXEC is not allowed
                pass
            else:
                with m:
                    self.assertRaises(TypeError, m.write, b"abcdef")
                    self.assertRaises(TypeError, m.write_byte, 0)
267
def test_bad_file_desc(self):
    # Mapping a bad file descriptor must raise OSError.
    bad_fd = -2
    with self.assertRaises(OSError):
        mmap.mmap(bad_fd, 4096)
271
def test_tougher_find(self):
    # Do a tougher .find() test.  SF bug 515943 pointed out that, in 2.2,
    # searching for data with embedded \0 bytes didn't work.
    data = b'aabaac\x00deef\x00\x00aa\x00'
    n = len(data)
    with open(TESTFN, 'wb+') as f:
        f.write(data)
        f.flush()
        m = mmap.mmap(f.fileno(), n)

    # Closes the mapping even if one of the comparisons fails.
    with m:
        # Every contiguous chunk of data must be found where bytes.find()
        # finds it, and the same chunk followed by a byte that never
        # occurs in data must not be found.
        for start in range(n + 1):
            for finish in range(start, n + 1):
                needle = data[start:finish]
                self.assertEqual(m.find(needle), data.find(needle))
                self.assertEqual(m.find(needle + b'x'), -1)
289
def test_find_end(self):
    # test the new 'end' parameter works as expected
    data = b'one two ones'
    n = len(data)
    with open(TESTFN, 'wb+') as f:
        f.write(data)
        f.flush()
        m = mmap.mmap(f.fileno(), n)

    # The mapping used to be left open; close it when the checks are done.
    with m:
        self.assertEqual(m.find(b'one'), 0)
        self.assertEqual(m.find(b'ones'), 8)
        self.assertEqual(m.find(b'one', 0, -1), 0)
        self.assertEqual(m.find(b'one', 1), 8)
        self.assertEqual(m.find(b'one', 1, -1), 8)
        self.assertEqual(m.find(b'one', 1, -2), -1)
        self.assertEqual(m.find(bytearray(b'one')), 0)

        # Compare with bytes.find() for every start/end pair, including
        # values that lie outside the data on either side.
        bounds = range(-n - 1, n + 1)
        for i in bounds:
            for j in bounds:
                for p in [b"o", b"on", b"two", b"ones", b"s"]:
                    expected = data.find(p, i, j)
                    self.assertEqual(m.find(p, i, j), expected, (p, i, j))
312
def test_find_does_not_access_beyond_buffer(self):
    # For a range of sizes, create an anonymous map with no access
    # permissions and then a larger readable anonymous map, and call
    # find() with a negative start on the readable one.
    try:
        map_flags = mmap.MAP_PRIVATE | mmap.MAP_ANONYMOUS
        page = mmap.PAGESIZE
        no_access = 0  # PROT_NONE
        read_only = mmap.PROT_READ
    except AttributeError as e:
        raise unittest.SkipTest("mmap flags unavailable") from e
    for i in range(2049):
        with mmap.mmap(-1, page * (i + 1),
                       flags=map_flags, prot=no_access) as guard, \
             mmap.mmap(-1, page * (i + 2048),
                       flags=map_flags, prot=read_only) as fm:
            fm.find(b"fo", -2)
327
328
def test_rfind(self):
    # rfind() with and without explicit start/end arguments
    data = b'one two ones'
    n = len(data)
    with open(TESTFN, 'wb+') as f:
        f.write(data)
        f.flush()
        m = mmap.mmap(f.fileno(), n)

    # The mapping used to be left open; close it when the checks are done.
    with m:
        self.assertEqual(m.rfind(b'one'), 8)
        self.assertEqual(m.rfind(b'one '), 0)
        self.assertEqual(m.rfind(b'one', 0, -1), 8)
        self.assertEqual(m.rfind(b'one', 0, -2), 0)
        self.assertEqual(m.rfind(b'one', 1, -1), 8)
        self.assertEqual(m.rfind(b'one', 1, -2), -1)
        self.assertEqual(m.rfind(bytearray(b'one')), 8)
345
346
def test_double_close(self):
    # make sure a double close doesn't crash on Solaris (Bug# 665913)
    size = 2**16
    with open(TESTFN, 'wb+') as out:
        out.write(size * b'a')  # Arbitrary character

    with open(TESTFN, 'rb') as src:
        mf = mmap.mmap(src.fileno(), size, access=mmap.ACCESS_READ)
        for _ in range(2):
            mf.close()
356
def test_entire_file(self):
    # test mapping of entire file by passing 0 for map length
    size = 2**16
    with open(TESTFN, "wb+") as out:
        out.write(size * b'm')  # Arbitrary character

    with open(TESTFN, "rb+") as f:
        with mmap.mmap(f.fileno(), 0) as mf:
            self.assertEqual(len(mf), size, "Map size should equal file size.")
            content = mf.read(size)
            self.assertEqual(content, size * b"m")
366
def test_length_0_offset(self):
    # Issue #10916: test mapping of remainder of file by passing 0 for
    # map length with an offset doesn't cause a segfault.
    # NOTE: allocation granularity is currently 65536 under Win64,
    # and therefore the minimum offset alignment.
    granule = 65536
    with open(TESTFN, "wb") as out:
        out.write((granule * 2) * b'm')  # Arbitrary character

    with open(TESTFN, "rb") as f, \
         mmap.mmap(f.fileno(), 0, offset=granule,
                   access=mmap.ACCESS_READ) as mf:
        # Only the second 65536 bytes are mapped, so 80000 is out of range.
        with self.assertRaises(IndexError):
            mf[80000]
378
def test_length_0_large_offset(self):
    # Issue #10959: test mapping of a file by passing 0 for
    # map length with a large offset doesn't cause a segfault.
    with open(TESTFN, "wb") as f:
        f.write(115699 * b'm')  # Arbitrary character

    # Reopen without truncating.  With "w+b" the file would be emptied
    # again, and the ValueError below would be raised for mapping an empty
    # file instead of for an offset that lies beyond the end of the data.
    with open(TESTFN, "r+b") as f:
        with self.assertRaises(ValueError):
            mmap.mmap(f.fileno(), 0, offset=2147418112)
388
def test_move(self):
    # make move works everywhere (64-bit format problem earlier)
    with open(TESTFN, 'wb+') as f:
        f.write(b"ABCDEabcde")  # Arbitrary character
        f.flush()

        with mmap.mmap(f.fileno(), 10) as mf:
            mf.move(5, 0, 5)
            self.assertEqual(mf[:], b"ABCDEABCDE",
                             "Map move should have duplicated front 5")

    # more excessive test: every in-range (dest, src, count) combination
    # is compared with the same copy done on a bytes object.
    data = b"0123456789"
    for dest in range(len(data)):
        for src in range(len(data)):
            for count in range(len(data) - max(dest, src)):
                expected = data[:dest] + data[src:src+count] + data[dest+count:]
                with mmap.mmap(-1, len(data)) as m:
                    m[:] = data
                    m.move(dest, src, count)
                    self.assertEqual(m[:], expected)

    # segfault test (Issue 5387)
    with mmap.mmap(-1, 100) as m:
        offsets = [-100, -1, 0, 1, 100]
        # Arguments are passed in move(dest, src, count) order.  Any
        # combination may be rejected with ValueError; it only must not
        # crash.
        for dest, src, count in itertools.product(offsets, repeat=3):
            try:
                m.move(dest, src, count)
            except ValueError:
                pass

        negative_args = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1),
                         (-1, 0, 0), (0, -1, 0), (0, 0, -1)]
        for dest, src, count in negative_args:
            self.assertRaises(ValueError, m.move, dest, src, count)

    with mmap.mmap(-1, 1) as m:  # single byte
        self.assertRaises(ValueError, m.move, 0, 0, 2)
        self.assertRaises(ValueError, m.move, 1, 0, 1)
        self.assertRaises(ValueError, m.move, 0, 1, 1)
        m.move(0, 0, 1)
        m.move(0, 0, 0)
435
def test_anonymous(self):
    # anonymous mmap.mmap(-1, PAGE)
    with mmap.mmap(-1, PAGESIZE) as m:
        for x in range(PAGESIZE):
            self.assertEqual(m[x], 0,
                             "anonymously mmap'ed contents should be zero")

        # Every byte can be written and read back.
        for x in range(PAGESIZE):
            b = x & 0xff
            m[x] = b
            self.assertEqual(m[x], b)
447
def test_read_all(self):
    m = mmap.mmap(-1, 16)
    self.addCleanup(m.close)

    # With no parameters, or None or a negative argument, reads all
    payload = bytes(range(16))
    m.write(payload)
    # (position to seek to, positional arguments for read())
    cases = [
        (0, ()),
        (8, ()),
        (16, ()),
        (3, (None,)),
        (4, (-1,)),
        (5, (-2,)),
        (9, (-42,)),
    ]
    for pos, read_args in cases:
        m.seek(pos)
        self.assertEqual(m.read(*read_args), payload[pos:])
468
def test_read_invalid_arg(self):
    # read() rejects a str, a float and a list with TypeError.
    m = mmap.mmap(-1, 16)
    self.addCleanup(m.close)

    for bad_size in ('foo', 5.5, [1, 2, 3]):
        with self.assertRaises(TypeError):
            m.read(bad_size)
476
def test_extended_getslice(self):
    # Test extended slicing by comparing with bytes slicing.
    s = bytes(reversed(range(256)))
    m = mmap.mmap(-1, len(s))
    # Registered like in test_read_all so the map is always released.
    self.addCleanup(m.close)
    m[:] = s
    self.assertEqual(m[:], s)
    indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
    # indices[1:] drops the leading 0, which is not a valid step.
    for start, stop, step in itertools.product(indices, indices, indices[1:]):
        self.assertEqual(m[start:stop:step], s[start:stop:step],
                         (start, stop, step))
490
def test_extended_set_del_slice(self):
    # Test extended slice assignment by comparing with list slicing.
    s = bytes(reversed(range(256)))
    m = mmap.mmap(-1, len(s))
    # Registered like in test_read_all so the map is always released.
    self.addCleanup(m.close)
    indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
    # indices[1:] drops the leading 0, which is not a valid step.
    for start, stop, step in itertools.product(indices, indices, indices[1:]):
        m[:] = s
        self.assertEqual(m[:], s)
        L = list(s)
        # Make sure we have a slice of exactly the right length,
        # but with different data.
        data = L[start:stop:step]
        data = bytes(reversed(data))
        L[start:stop:step] = data
        m[start:stop:step] = data
        self.assertEqual(m[:], bytes(L), (start, stop, step))
510
def make_mmap_file(self, f, halfsize):
    """Fill *f* with 2 * halfsize bytes and return a map of the whole file.

    The data is all zero bytes except for b'foo' at offset *halfsize*.
    *f* must be a file object that is open for writing; it is flushed
    before being mapped with length 0.
    """
    chunks = (b'\0' * halfsize, b'foo', b'\0' * (halfsize - 3))
    for chunk in chunks:
        f.write(chunk)
    f.flush()
    fd = f.fileno()
    return mmap.mmap(fd, 0)
518
def test_empty_file(self):
    # Mapping a zero-length file must fail with a specific ValueError.
    with open(TESTFN, 'w+b'):
        pass  # just create the empty file
    with open(TESTFN, "rb") as f:
        with self.assertRaisesRegex(ValueError, "cannot mmap an empty file"):
            mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
527
def test_offset(self):
    halfsize = mmap.ALLOCATIONGRANULARITY
    mapsize = halfsize * 2

    try:  # unlink TESTFN no matter what
        # Create the 2 * halfsize byte file; the map returned by the
        # helper is not needed.
        with open(TESTFN, 'w+b') as f:
            self.make_mmap_file(f, halfsize).close()

        # Try invalid offsets
        with open(TESTFN, "r+b") as f:
            for offset in (-2, -1, None):
                with self.subTest(offset=offset):
                    with self.assertRaises(
                            (ValueError, TypeError, OverflowError)):
                        # close() only runs if the map was wrongly created.
                        mmap.mmap(f.fileno(), mapsize, offset=offset).close()

        # Try a valid offset: map only the second half of the file, which
        # starts with b'foo'.
        with open(TESTFN, "r+b") as f:
            m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize)

        # The with block closes the map on every path, before the file is
        # unlinked below.
        with m:
            self.assertEqual(m[0:3], b'foo')

            # Try resizing map
            try:
                m.resize(512)
            except SystemError:
                # resize() not supported
                pass
            else:
                # resize() is supported
                self.assertEqual(len(m), 512)
                # Check that we can no longer seek beyond the new size.
                self.assertRaises(ValueError, m.seek, 513, 0)
                # Check that the content is not changed
                self.assertEqual(m[0:3], b'foo')

                # Check that the underlying file is truncated too
                with open(TESTFN, 'rb') as f:
                    f.seek(0, 2)
                    self.assertEqual(f.tell(), halfsize + 512)
                self.assertEqual(m.size(), halfsize + 512)
    finally:
        try:
            os.unlink(TESTFN)
        except OSError:
            pass
584
def test_subclass(self):
    # A subclass whose __new__ supplies the fileno (-1) itself can be
    # instantiated with just a length.
    class anon_mmap(mmap.mmap):
        def __new__(cls, *args, **kwargs):
            return super().__new__(cls, -1, *args, **kwargs)

    anon_mmap(PAGESIZE)
590
@unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
def test_prot_readonly(self):
    mapsize = 10
    with open(TESTFN, "wb") as fp:
        fp.write(b"a" * mapsize)
    with open(TESTFN, "rb") as f:
        with mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ) as m:
            # Write bytes, not str: write() rejects a str with TypeError
            # on any map, which would make this check pass without the
            # read-only protection being involved.
            self.assertRaises(TypeError, m.write, b"foo")
599
def test_error(self):
    # mmap.error must be the very same object as OSError.
    alias = mmap.error
    self.assertIs(alias, OSError)
602
def test_io_methods(self):
    data = b"0123456789"
    with open(TESTFN, "wb") as fp:
        fp.write(b"x" * len(data))
    with open(TESTFN, "r+b") as f:
        m = mmap.mmap(f.fileno(), len(data))

    # The mapping used to be left open; close it when the checks are done.
    with m:
        # Test write_byte()
        for i, byte in enumerate(data):
            self.assertEqual(m.tell(), i)
            m.write_byte(byte)
            self.assertEqual(m.tell(), i + 1)
        # The position is now at the end, so one more byte does not fit.
        self.assertRaises(ValueError, m.write_byte, b"x"[0])
        self.assertEqual(m[:], data)
        # Test read_byte()
        m.seek(0)
        for i, byte in enumerate(data):
            self.assertEqual(m.tell(), i)
            self.assertEqual(m.read_byte(), byte)
            self.assertEqual(m.tell(), i + 1)
        self.assertRaises(ValueError, m.read_byte)
        # Test read()
        m.seek(3)
        self.assertEqual(m.read(3), b"345")
        self.assertEqual(m.tell(), 6)
        # Test write()
        m.seek(3)
        m.write(b"bar")
        self.assertEqual(m.tell(), 6)
        self.assertEqual(m[:], b"012bar6789")
        m.write(bytearray(b"baz"))
        self.assertEqual(m.tell(), 9)
        self.assertEqual(m[:], b"012barbaz9")
        # Only one byte is left after position 9.
        self.assertRaises(ValueError, m.write, b"ba")
636
def test_non_ascii_byte(self):
    for b in (129, 200, 255):  # > 128
        with self.subTest(byte=b), mmap.mmap(-1, 1) as m:
            m.write_byte(b)
            self.assertEqual(m[0], b)
            m.seek(0)
            self.assertEqual(m.read_byte(), b)
645
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_tagname(self):
    data1 = b"0123456789"
    data2 = b"abcdefghij"
    # assertEqual instead of a bare assert, which is stripped under -O.
    self.assertEqual(len(data1), len(data2))
    tagname1 = random_tagname()
    tagname2 = random_tagname()

    # Test same tag: both maps show what was written last.
    with mmap.mmap(-1, len(data1), tagname=tagname1) as m1:
        m1[:] = data1
        with mmap.mmap(-1, len(data2), tagname=tagname1) as m2:
            m2[:] = data2
            self.assertEqual(m1[:], data2)
            self.assertEqual(m2[:], data2)

    # Test different tag: each map keeps its own data.
    with mmap.mmap(-1, len(data1), tagname=tagname1) as m1:
        m1[:] = data1
        with mmap.mmap(-1, len(data2), tagname=tagname2) as m2:
            m2[:] = data2
            self.assertEqual(m1[:], data1)
            self.assertEqual(m2[:], data2)
673
@cpython_only
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_sizeof(self):
    # A tagged map is expected to be larger than an untagged one by the
    # length of the tag name plus one.
    tagname = random_tagname()
    with mmap.mmap(-1, 100) as m1, \
         mmap.mmap(-1, 100, tagname=tagname) as m2:
        self.assertEqual(sys.getsizeof(m2),
                         sys.getsizeof(m1) + len(tagname) + 1)
682
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_crasher_on_windows(self):
    # The operations below are allowed to fail with any ordinary
    # exception; the test only checks that the interpreter survives them.
    # "except Exception" keeps that tolerance without also swallowing
    # KeyboardInterrupt and SystemExit the way a bare except would.

    # Should not crash (Issue 1733986)
    tagname = random_tagname()
    with mmap.mmap(-1, 1000, tagname=tagname):
        try:
            mmap.mmap(-1, 5000, tagname=tagname)[:]  # same tagname, but larger size
        except Exception:
            pass

    # Should not crash (Issue 5385)
    with open(TESTFN, "wb") as fp:
        fp.write(b"x" * 10)
    with open(TESTFN, "r+b") as f:
        m = mmap.mmap(f.fileno(), 0)
    with m:
        try:
            m.resize(0)  # will raise OSError
        except Exception:
            pass
        try:
            m[:]
        except Exception:
            pass
709
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_invalid_descriptor(self):
    # socket file descriptors are valid, but out of range
    # for _get_osfhandle, causing a crash when validating the
    # parameters to _get_osfhandle.
    with socket.socket() as s:
        with self.assertRaises(OSError):
            mmap.mmap(s.fileno(), 10)
721
def test_context_manager(self):
    # The map is open inside the with block and closed after it.
    with mmap.mmap(-1, 10) as mapping:
        self.assertFalse(mapping.closed)
    self.assertTrue(mapping.closed)
726
def test_context_manager_exception(self):
    # Test that the OSError gets passed through
    with self.assertRaises(Exception) as caught:
        with mmap.mmap(-1, 10) as mapping:
            raise OSError
    raised = caught.exception
    self.assertIsInstance(raised, OSError,
                          "wrong exception raised in context manager")
    # Leaving the block through an exception must still close the map.
    self.assertTrue(mapping.closed, "context manager failed")
735
def test_weakref(self):
    # Check mmap objects are weakrefable
    mapping = mmap.mmap(-1, 16)
    ref = weakref.ref(mapping)
    self.assertIs(ref(), mapping)
    # Drop the only strong reference; the weak reference must then be dead.
    del mapping
    gc_collect()
    self.assertIs(ref(), None)
744
def test_write_returning_the_number_of_bytes_written(self):
    mm = mmap.mmap(-1, 16)
    # Registered like in test_read_all so the map is always released.
    self.addCleanup(mm.close)
    # write() returns the length of what was written, including 0 for an
    # empty bytes object.
    for chunk in (b"", b"x", b"yz", b"python"):
        self.assertEqual(mm.write(chunk), len(chunk))
751
def test_resize_past_pos(self):
    # Shrink the map to less than the current position and check that
    # reads return nothing and writes are refused.
    m = mmap.mmap(-1, 8192)
    self.addCleanup(m.close)
    m.read(5000)
    try:
        m.resize(4096)
    except SystemError:
        self.skipTest("resizing not supported")
    leftover = m.read(14)
    self.assertEqual(leftover, b'')
    with self.assertRaises(ValueError):
        m.read_byte()
    with self.assertRaises(ValueError):
        m.write_byte(42)
    with self.assertRaises(ValueError):
        m.write(b'abc')
764
def test_concat_repeat_exception(self):
    # mmap objects support neither concatenation nor repetition.
    m = mmap.mmap(-1, 16)
    # Registered like in test_read_all so the map is always released.
    self.addCleanup(m.close)
    with self.assertRaises(TypeError):
        m + m
    with self.assertRaises(TypeError):
        m * 2
771
def test_flush_return_value(self):
    # mm.flush() should return None on success, raise an
    # exception on error under all platforms.
    mm = mmap.mmap(-1, 16)
    self.addCleanup(mm.close)
    payload = b'python'
    mm.write(payload)
    self.assertIsNone(mm.flush())
    if sys.platform.startswith('linux'):
        # 'offset' must be a multiple of mmap.PAGESIZE on Linux.
        # See bpo-34754 for details.
        with self.assertRaises(OSError):
            mm.flush(1, len(payload))
784
def test_repr(self):
    # repr() of an open map reports access, length, pos and offset;
    # repr() of a closed map only reports that it is closed.
    open_mmap_repr_pat = re.compile(
        r"<mmap.mmap closed=False, "
        r"access=(?P<access>\S+), "
        r"length=(?P<length>\d+), "
        r"pos=(?P<pos>\d+), "
        r"offset=(?P<offset>\d+)>")
    closed_mmap_repr_pat = re.compile(r"<mmap.mmap closed=True>")
    granularity = mmap.ALLOCATIONGRANULARITY
    accesses = ('ACCESS_DEFAULT', 'ACCESS_READ',
                'ACCESS_COPY', 'ACCESS_WRITE')
    for mapsize in (50, 100, 1_000, 1_000_000, 10_000_000):
        # Largest multiple of the allocation granularity that does not
        # exceed half of the file size.
        offset = (mapsize // 2 // granularity) * granularity
        length = mapsize - offset
        positions = (0, length//10, length//5, length//4)
        with open(TESTFN, "wb+") as fp:
            fp.write(b'a' * mapsize)
            fp.flush()
            for access, pos in itertools.product(accesses, positions):
                accint = getattr(mmap, access)
                with mmap.mmap(fp.fileno(),
                               length,
                               access=accint,
                               offset=offset) as mm:
                    mm.seek(pos)
                    match = open_mmap_repr_pat.match(repr(mm))
                    self.assertIsNotNone(match)
                    self.assertEqual(match.group('access'), access)
                    self.assertEqual(match.group('length'), str(length))
                    self.assertEqual(match.group('pos'), str(pos))
                    self.assertEqual(match.group('offset'), str(offset))
                # The with block has closed the map by now.
                match = closed_mmap_repr_pat.match(repr(mm))
                self.assertIsNotNone(match)
820
@unittest.skipUnless(hasattr(mmap.mmap, 'madvise'), 'needs madvise')
def test_madvise(self):
    size = 2 * PAGESIZE
    m = mmap.mmap(-1, size)
    # Registered like in test_read_all so the map is always released.
    self.addCleanup(m.close)

    # Invalid start/length arguments are rejected.
    with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
        m.madvise(mmap.MADV_NORMAL, size)
    with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
        m.madvise(mmap.MADV_NORMAL, -1)
    with self.assertRaisesRegex(ValueError, "madvise length invalid"):
        m.madvise(mmap.MADV_NORMAL, 0, -1)
    with self.assertRaisesRegex(OverflowError, "madvise length too large"):
        m.madvise(mmap.MADV_NORMAL, PAGESIZE, sys.maxsize)

    # Valid calls return None.
    self.assertIsNone(m.madvise(mmap.MADV_NORMAL))
    self.assertIsNone(m.madvise(mmap.MADV_NORMAL, PAGESIZE))
    self.assertIsNone(m.madvise(mmap.MADV_NORMAL, PAGESIZE, size))
    self.assertIsNone(m.madvise(mmap.MADV_NORMAL, 0, 2))
    self.assertIsNone(m.madvise(mmap.MADV_NORMAL, 0, size))
839
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_resize_up_when_mapped_to_pagefile(self):
    """If the mmap is backed by the pagefile ensure a resize up can happen
    and that the original data is still in place
    """
    start_size = PAGESIZE
    new_size = 2 * start_size
    data = random.randbytes(start_size)

    # The with block releases the map, which used to be left open.
    with mmap.mmap(-1, start_size) as m:
        m[:] = data
        m.resize(new_size)
        self.assertEqual(len(m), new_size)
        self.assertEqual(m[:start_size], data[:start_size])
854
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_resize_down_when_mapped_to_pagefile(self):
    """If the mmap is backed by the pagefile ensure a resize down can happen
    and that a truncated form of the original data is still in place
    """
    start_size = PAGESIZE
    new_size = start_size // 2
    data = random.randbytes(start_size)

    # The with block releases the map, which used to be left open.
    with mmap.mmap(-1, start_size) as m:
        m[:] = data
        m.resize(new_size)
        self.assertEqual(len(m), new_size)
        self.assertEqual(m[:new_size], data[:new_size])
869
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_resize_fails_if_mapping_held_elsewhere(self):
    """If more than one mapping is held against a named file on Windows, neither
    mapping can be resized
    """
    start_size = 2 * PAGESIZE
    reduced_size = PAGESIZE

    with open(TESTFN, 'wb+') as f:
        f.truncate(start_size)
        # The with blocks close both maps on every path; m1 used to be
        # left open, still mapping TESTFN when tearDown tried to remove it.
        with mmap.mmap(f.fileno(), start_size) as m1:
            with mmap.mmap(f.fileno(), start_size) as m2:
                with self.assertRaises(OSError):
                    m1.resize(reduced_size)
                with self.assertRaises(OSError):
                    m2.resize(reduced_size)
            # With m2 closed, m1 is the only mapping left and can shrink.
            m1.resize(reduced_size)
            self.assertEqual(m1.size(), reduced_size)
            self.assertEqual(os.stat(f.fileno()).st_size, reduced_size)
893
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_resize_succeeds_with_error_for_second_named_mapping(self):
    """If more than one mapping exists of the same name, none of them can
    be resized: they'll raise an Exception and leave the original mapping intact
    """
    start_size = 2 * PAGESIZE
    reduced_size = PAGESIZE
    tagname = random_tagname()
    data_length = 8
    data = random.randbytes(data_length)

    # The with block releases both maps, which used to be left open.
    with mmap.mmap(-1, start_size, tagname=tagname) as m1, \
         mmap.mmap(-1, start_size, tagname=tagname) as m2:
        m1[:data_length] = data
        self.assertEqual(m2[:data_length], data)
        with self.assertRaises(OSError):
            m1.resize(reduced_size)
        # The failed resize must leave size and content untouched.
        self.assertEqual(m1.size(), start_size)
        self.assertEqual(m1[:data_length], data)
        self.assertEqual(m2[:data_length], data)
914
def test_mmap_closed_by_int_scenarios(self):
    """
    gh-103987: Test that mmap objects raise ValueError
    for closed mmap files
    """

    class MmapClosedByIntContext:
        # Context manager yielding (m, X): m maps a fresh 100-byte
        # TESTFN, and X is a class whose instances close m when they are
        # converted to an index (the conversion result is 10).
        def __init__(self, access) -> None:
            self.access = access

        def __enter__(self):
            self.f = open(TESTFN, "w+b")
            try:
                self.f.write(random.randbytes(100))
                self.f.flush()
                m = mmap.mmap(self.f.fileno(), 100, access=self.access)
            except BaseException:
                # __exit__ is not called when __enter__ fails.
                self.f.close()
                raise
            self.m = m

            class X:
                def __index__(self):
                    m.close()
                    return 10

            return (m, X)

        def __exit__(self, exc_type, exc_value, traceback):
            # Normally X.__index__ has closed the map already; closing it
            # again here is harmless and covers the case where the
            # operation under test never converted its X argument.
            self.m.close()
            self.f.close()

    # Each operation receives an X instance where an integer is expected.
    def get_item(m, X):
        m[X()]

    def get_slice(m, X):
        m[X() : 20]

    def get_extended_slice(m, X):
        m[X() : 20 : 2]

    def get_reversed_slice(m, X):
        m[20 : X() : -2]

    def read(m, X):
        m.read(X())

    def find(m, X):
        m.find(b"1", 1, X())

    def set_slice(m, X):
        m[X() : 20] = b"1" * 10

    def set_extended_slice(m, X):
        m[X() : 20 : 2] = b"1" * 5

    def set_reversed_slice(m, X):
        m[20 : X() : -2] = b"1" * 5

    def move(m, X):
        m.move(1, 2, X())

    def write_byte(m, X):
        m.write_byte(X())

    read_access_modes = [
        mmap.ACCESS_READ,
        mmap.ACCESS_WRITE,
        mmap.ACCESS_COPY,
        mmap.ACCESS_DEFAULT,
    ]
    read_operations = [
        get_item, get_slice, get_extended_slice, get_reversed_slice,
        read, find,
    ]

    write_access_modes = [
        mmap.ACCESS_WRITE,
        mmap.ACCESS_COPY,
        mmap.ACCESS_DEFAULT,
    ]
    write_operations = [
        set_slice, set_extended_slice, set_reversed_slice,
        move, write_byte,
    ]

    scenarios = itertools.chain(
        itertools.product(read_access_modes, read_operations),
        itertools.product(write_access_modes, write_operations),
    )
    for access, operation in scenarios:
        with self.subTest(access=access, operation=operation.__name__):
            with MmapClosedByIntContext(access) as (m, X):
                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
                    operation(m, X)
1000
class LargeMmapTests(unittest.TestCase):

    def setUp(self):
        unlink(TESTFN)

    def tearDown(self):
        unlink(TESTFN)

    def _make_test_file(self, num_zeroes, tail):
        """Return TESTFN opened 'w+b' with *tail* written at offset *num_zeroes*.

        On Windows and macOS the 'largefile' resource is required first.
        If seeking, writing or flushing fails with OSError, OverflowError
        or ValueError, the file is closed and the test is skipped.  The
        caller is responsible for closing the returned file object.
        """
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            requires('largefile',
                     f'test requires {0x180000000} bytes and a long time to run')
        f = open(TESTFN, 'w+b')
        try:
            f.seek(num_zeroes)
            f.write(tail)
            f.flush()
        except (OSError, OverflowError, ValueError):
            try:
                f.close()
            except (OSError, OverflowError):
                pass
            raise unittest.SkipTest("filesystem does not have largefile support")
        return f

    def test_large_offset(self):
        # Map from offset 0x140000000 to the end of a file whose last
        # byte, at 0x14FFFFFFF, is a space (32).
        with self._make_test_file(0x14FFFFFFF, b" ") as f, \
             mmap.mmap(f.fileno(), 0, offset=0x140000000,
                       access=mmap.ACCESS_READ) as m:
            self.assertEqual(m[0xFFFFFFF], 32)

    def test_large_filesize(self):
        file_size = 0x180000000
        with self._make_test_file(file_size - 1, b" ") as f:
            if sys.maxsize < file_size:
                # On 32 bit platforms the file is larger than sys.maxsize so
                # mapping the whole file should fail -- Issue #16743
                with self.assertRaises(OverflowError):
                    mmap.mmap(f.fileno(), file_size, access=mmap.ACCESS_READ)
                with self.assertRaises(ValueError):
                    mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            # size() reports the size of the file, not of the 0x10000 map.
            with mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ) as m:
                self.assertEqual(m.size(), file_size)

    # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X.

    def _test_around_boundary(self, boundary):
        """Check that data straddling offset *boundary* reads back intact."""
        tail = b' DEARdear '
        # Place the tail so that its middle falls on the boundary.
        start = boundary - len(tail) // 2
        end = start + len(tail)
        with self._make_test_file(start, tail) as f, \
             mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
            self.assertEqual(m[start:end], tail)

    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    def test_around_2GB(self):
        self._test_around_boundary(_2G)

    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    def test_around_4GB(self):
        self._test_around_boundary(_4G)
1060
1061
# Run the tests in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()