python (3.11.7)

(root)/
lib/
python3.11/
test/
test_mmap.py
       1  from test.support import (
       2      requires, _2G, _4G, gc_collect, cpython_only, is_emscripten
       3  )
       4  from test.support.import_helper import import_module
       5  from test.support.os_helper import TESTFN, unlink
       6  import unittest
       7  import os
       8  import re
       9  import itertools
      10  import random
      11  import socket
      12  import string
      13  import sys
      14  import weakref
      15  
      16  # Skip the whole test module if the mmap module cannot be imported.
      17  mmap = import_module('mmap')
      18  
      19  PAGESIZE = mmap.PAGESIZE  # module-level shorthand used by the tests below
      20  
      21  tagname_prefix = f'python_{os.getpid()}_test_mmap'
      22  def random_tagname(length=10):
      23      suffix = ''.join(random.choices(string.ascii_uppercase, k=length))
      24      return f'{tagname_prefix}_{suffix}'
      25  
      26  # Python's mmap module dup()s the file descriptor, and Emscripten's FS layer
      27  # does not materialize file changes through a dup()ed fd to a new mmap.
      28  if is_emscripten:
      29      raise unittest.SkipTest("incompatible with Emscripten's mmap emulation.")  # raised at import time: skips the module
      30  
      31  
      32  class MmapTests(unittest.TestCase):
      33  
      34      def setUp(self):
      35          if os.path.exists(TESTFN):
      36              os.unlink(TESTFN)
      37  
      38      def tearDown(self):
      39          try:
      40              os.unlink(TESTFN)
      41          except OSError:
      42              pass
      43  
      44      def test_basic(self):
      45          # Test mmap module on Unix systems and Windows
      46  
      47          # Create a file to be mmap'ed.
      48          f = open(TESTFN, 'bw+')
      49          try:
      50              # Write 2 pages worth of data to the file
      51              f.write(b'\0'* PAGESIZE)
      52              f.write(b'foo')
      53              f.write(b'\0'* (PAGESIZE-3) )
      54              f.flush()
      55              m = mmap.mmap(f.fileno(), 2 * PAGESIZE)
      56          finally:
      57              f.close()
      58  
      59          # Simple sanity checks
      60  
      61          tp = str(type(m))  # SF bug 128713:  segfaulted on Linux
      62          self.assertEqual(m.find(b'foo'), PAGESIZE)
      63  
      64          self.assertEqual(len(m), 2*PAGESIZE)
      65  
      66          self.assertEqual(m[0], 0)
      67          self.assertEqual(m[0:3], b'\0\0\0')
      68  
      69          # Shouldn't crash on boundary (Issue #5292)
      70          self.assertRaises(IndexError, m.__getitem__, len(m))
      71          self.assertRaises(IndexError, m.__setitem__, len(m), b'\0')
      72  
      73          # Modify the file's content
      74          m[0] = b'3'[0]
      75          m[PAGESIZE +3: PAGESIZE +3+3] = b'bar'
      76  
      77          # Check that the modification worked
      78          self.assertEqual(m[0], b'3'[0])
      79          self.assertEqual(m[0:3], b'3\0\0')
      80          self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], b'\0foobar\0')
      81  
      82          m.flush()
      83  
      84          # Test doing a regular expression match in an mmap'ed file
      85          match = re.search(b'[A-Za-z]+', m)
      86          if match is None:
      87              self.fail('regex match on mmap failed!')
      88          else:
      89              start, end = match.span(0)
      90              length = end - start
      91  
      92              self.assertEqual(start, PAGESIZE)
      93              self.assertEqual(end, PAGESIZE + 6)
      94  
      95          # test seeking around (try to overflow the seek implementation)
      96          m.seek(0,0)
      97          self.assertEqual(m.tell(), 0)
      98          m.seek(42,1)
      99          self.assertEqual(m.tell(), 42)
     100          m.seek(0,2)
     101          self.assertEqual(m.tell(), len(m))
     102  
     103          # Try to seek to negative position...
     104          self.assertRaises(ValueError, m.seek, -1)
     105  
     106          # Try to seek beyond end of mmap...
     107          self.assertRaises(ValueError, m.seek, 1, 2)
     108  
     109          # Try to seek to negative position...
     110          self.assertRaises(ValueError, m.seek, -len(m)-1, 2)
     111  
     112          # Try resizing map
     113          try:
     114              m.resize(512)
     115          except SystemError:
     116              # resize() not supported
     117              # No messages are printed, since the output of this test suite
     118              # would then be different across platforms.
     119              pass
     120          else:
     121              # resize() is supported
     122              self.assertEqual(len(m), 512)
     123              # Check that we can no longer seek beyond the new size.
     124              self.assertRaises(ValueError, m.seek, 513, 0)
     125  
     126              # Check that the underlying file is truncated too
     127              # (bug #728515)
     128              f = open(TESTFN, 'rb')
     129              try:
     130                  f.seek(0, 2)
     131                  self.assertEqual(f.tell(), 512)
     132              finally:
     133                  f.close()
     134              self.assertEqual(m.size(), 512)
     135  
     136          m.close()
     137  
     138      def test_access_parameter(self):
     139          # Test for "access" keyword parameter
     140          mapsize = 10
     141          with open(TESTFN, "wb") as fp:
     142              fp.write(b"a"*mapsize)
     143          with open(TESTFN, "rb") as f:
     144              m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ)
     145              self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.")
     146  
     147              # Ensuring that readonly mmap can't be slice assigned
     148              try:
     149                  m[:] = b'b'*mapsize
     150              except TypeError:
     151                  pass
     152              else:
     153                  self.fail("Able to write to readonly memory map")
     154  
     155              # Ensuring that readonly mmap can't be item assigned
     156              try:
     157                  m[0] = b'b'
     158              except TypeError:
     159                  pass
     160              else:
     161                  self.fail("Able to write to readonly memory map")
     162  
     163              # Ensuring that readonly mmap can't be write() to
     164              try:
     165                  m.seek(0,0)
     166                  m.write(b'abc')
     167              except TypeError:
     168                  pass
     169              else:
     170                  self.fail("Able to write to readonly memory map")
     171  
     172              # Ensuring that readonly mmap can't be write_byte() to
     173              try:
     174                  m.seek(0,0)
     175                  m.write_byte(b'd')
     176              except TypeError:
     177                  pass
     178              else:
     179                  self.fail("Able to write to readonly memory map")
     180  
     181              # Ensuring that readonly mmap can't be resized
     182              try:
     183                  m.resize(2*mapsize)
     184              except SystemError:   # resize is not universally supported
     185                  pass
     186              except TypeError:
     187                  pass
     188              else:
     189                  self.fail("Able to resize readonly memory map")
     190              with open(TESTFN, "rb") as fp:
     191                  self.assertEqual(fp.read(), b'a'*mapsize,
     192                                   "Readonly memory map data file was modified")
     193  
     194          # Opening mmap with size too big
     195          with open(TESTFN, "r+b") as f:
     196              try:
     197                  m = mmap.mmap(f.fileno(), mapsize+1)
     198              except ValueError:
     199                  # we do not expect a ValueError on Windows
     200                  # CAUTION:  This also changes the size of the file on disk, and
     201                  # later tests assume that the length hasn't changed.  We need to
     202                  # repair that.
     203                  if sys.platform.startswith('win'):
     204                      self.fail("Opening mmap with size+1 should work on Windows.")
     205              else:
     206                  # we expect a ValueError on Unix, but not on Windows
     207                  if not sys.platform.startswith('win'):
     208                      self.fail("Opening mmap with size+1 should raise ValueError.")
     209                  m.close()
     210              if sys.platform.startswith('win'):
     211                  # Repair damage from the resizing test.
     212                  with open(TESTFN, 'r+b') as f:
     213                      f.truncate(mapsize)
     214  
     215          # Opening mmap with access=ACCESS_WRITE
     216          with open(TESTFN, "r+b") as f:
     217              m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE)
     218              # Modifying write-through memory map
     219              m[:] = b'c'*mapsize
     220              self.assertEqual(m[:], b'c'*mapsize,
     221                     "Write-through memory map memory not updated properly.")
     222              m.flush()
     223              m.close()
     224          with open(TESTFN, 'rb') as f:
     225              stuff = f.read()
     226          self.assertEqual(stuff, b'c'*mapsize,
     227                 "Write-through memory map data file not updated properly.")
     228  
     229          # Opening mmap with access=ACCESS_COPY
     230          with open(TESTFN, "r+b") as f:
     231              m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY)
     232              # Modifying copy-on-write memory map
     233              m[:] = b'd'*mapsize
     234              self.assertEqual(m[:], b'd' * mapsize,
     235                               "Copy-on-write memory map data not written correctly.")
     236              m.flush()
     237              with open(TESTFN, "rb") as fp:
     238                  self.assertEqual(fp.read(), b'c'*mapsize,
     239                                   "Copy-on-write test data file should not be modified.")
     240              # Ensuring copy-on-write maps cannot be resized
     241              self.assertRaises(TypeError, m.resize, 2*mapsize)
     242              m.close()
     243  
     244          # Ensuring invalid access parameter raises exception
     245          with open(TESTFN, "r+b") as f:
     246              self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)
     247  
     248          if os.name == "posix":
     249              # Try incompatible flags, prot and access parameters.
     250              with open(TESTFN, "r+b") as f:
     251                  self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
     252                                    flags=mmap.MAP_PRIVATE,
     253                                    prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)
     254  
     255              # Try writing with PROT_EXEC and without PROT_WRITE
     256              prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0)
     257              with open(TESTFN, "r+b") as f:
     258                  try:
     259                      m = mmap.mmap(f.fileno(), mapsize, prot=prot)
     260                  except PermissionError:
     261                      # on macOS 14, PROT_READ | PROT_EXEC is not allowed
     262                      pass
     263                  else:
     264                      self.assertRaises(TypeError, m.write, b"abcdef")
     265                      self.assertRaises(TypeError, m.write_byte, 0)
     266                      m.close()
     267  
     268      def test_bad_file_desc(self):
     269          # Try opening a bad file descriptor...
     270          self.assertRaises(OSError, mmap.mmap, -2, 4096)
     271  
     272      def test_tougher_find(self):
     273          # Do a tougher .find() test.  SF bug 515943 pointed out that, in 2.2,
     274          # searching for data with embedded \0 bytes didn't work.
     275          with open(TESTFN, 'wb+') as f:
     276  
     277              data = b'aabaac\x00deef\x00\x00aa\x00'
     278              n = len(data)
     279              f.write(data)
     280              f.flush()
     281              m = mmap.mmap(f.fileno(), n)
     282  
     283          for start in range(n+1):
     284              for finish in range(start, n+1):
     285                  slice = data[start : finish]
     286                  self.assertEqual(m.find(slice), data.find(slice))
     287                  self.assertEqual(m.find(slice + b'x'), -1)
     288          m.close()
     289  
     290      def test_find_end(self):
     291          # test the new 'end' parameter works as expected
     292          with open(TESTFN, 'wb+') as f:
     293              data = b'one two ones'
     294              n = len(data)
     295              f.write(data)
     296              f.flush()
     297              m = mmap.mmap(f.fileno(), n)
     298  
     299          self.assertEqual(m.find(b'one'), 0)
     300          self.assertEqual(m.find(b'ones'), 8)
     301          self.assertEqual(m.find(b'one', 0, -1), 0)
     302          self.assertEqual(m.find(b'one', 1), 8)
     303          self.assertEqual(m.find(b'one', 1, -1), 8)
     304          self.assertEqual(m.find(b'one', 1, -2), -1)
     305          self.assertEqual(m.find(bytearray(b'one')), 0)
     306  
     307          for i in range(-n-1, n+1):
     308              for j in range(-n-1, n+1):
     309                  for p in [b"o", b"on", b"two", b"ones", b"s"]:
     310                      expected = data.find(p, i, j)
     311                      self.assertEqual(m.find(p, i, j), expected, (p, i, j))
     312  
     313      def test_find_does_not_access_beyond_buffer(self):
     314          try:
     315              flags = mmap.MAP_PRIVATE | mmap.MAP_ANONYMOUS
     316              PAGESIZE = mmap.PAGESIZE
     317              PROT_NONE = 0
     318              PROT_READ = mmap.PROT_READ
     319          except AttributeError as e:
     320              raise unittest.SkipTest("mmap flags unavailable") from e
     321          for i in range(0, 2049):
     322              with mmap.mmap(-1, PAGESIZE * (i + 1),
     323                             flags=flags, prot=PROT_NONE) as guard:
     324                  with mmap.mmap(-1, PAGESIZE * (i + 2048),
     325                                 flags=flags, prot=PROT_READ) as fm:
     326                      fm.find(b"fo", -2)
     327  
     328  
     329      def test_rfind(self):
     330          # test the new 'end' parameter works as expected
     331          with open(TESTFN, 'wb+') as f:
     332              data = b'one two ones'
     333              n = len(data)
     334              f.write(data)
     335              f.flush()
     336              m = mmap.mmap(f.fileno(), n)
     337  
     338          self.assertEqual(m.rfind(b'one'), 8)
     339          self.assertEqual(m.rfind(b'one '), 0)
     340          self.assertEqual(m.rfind(b'one', 0, -1), 8)
     341          self.assertEqual(m.rfind(b'one', 0, -2), 0)
     342          self.assertEqual(m.rfind(b'one', 1, -1), 8)
     343          self.assertEqual(m.rfind(b'one', 1, -2), -1)
     344          self.assertEqual(m.rfind(bytearray(b'one')), 8)
     345  
     346  
     347      def test_double_close(self):
     348          # make sure a double close doesn't crash on Solaris (Bug# 665913)
     349          with open(TESTFN, 'wb+') as f:
     350              f.write(2**16 * b'a') # Arbitrary character
     351  
     352          with open(TESTFN, 'rb') as f:
     353              mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ)
     354              mf.close()
     355              mf.close()
     356  
     357      def test_entire_file(self):
     358          # test mapping of entire file by passing 0 for map length
     359          with open(TESTFN, "wb+") as f:
     360              f.write(2**16 * b'm') # Arbitrary character
     361  
     362          with open(TESTFN, "rb+") as f, \
     363               mmap.mmap(f.fileno(), 0) as mf:
     364              self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
     365              self.assertEqual(mf.read(2**16), 2**16 * b"m")
     366  
     367      def test_length_0_offset(self):
     368          # Issue #10916: test mapping of remainder of file by passing 0 for
     369          # map length with an offset doesn't cause a segfault.
     370          # NOTE: allocation granularity is currently 65536 under Win64,
     371          # and therefore the minimum offset alignment.
     372          with open(TESTFN, "wb") as f:
     373              f.write((65536 * 2) * b'm') # Arbitrary character
     374  
     375          with open(TESTFN, "rb") as f:
     376              with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf:
     377                  self.assertRaises(IndexError, mf.__getitem__, 80000)
     378  
     379      def test_length_0_large_offset(self):
     380          # Issue #10959: test mapping of a file by passing 0 for
     381          # map length with a large offset doesn't cause a segfault.
     382          with open(TESTFN, "wb") as f:
     383              f.write(115699 * b'm') # Arbitrary character
     384  
     385          with open(TESTFN, "w+b") as f:
     386              self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0,
     387                                offset=2147418112)
     388  
     389      def test_move(self):
     390          # make move works everywhere (64-bit format problem earlier)
     391          with open(TESTFN, 'wb+') as f:
     392  
     393              f.write(b"ABCDEabcde") # Arbitrary character
     394              f.flush()
     395  
     396              mf = mmap.mmap(f.fileno(), 10)
     397              mf.move(5, 0, 5)
     398              self.assertEqual(mf[:], b"ABCDEABCDE", "Map move should have duplicated front 5")
     399              mf.close()
     400  
     401          # more excessive test
     402          data = b"0123456789"
     403          for dest in range(len(data)):
     404              for src in range(len(data)):
     405                  for count in range(len(data) - max(dest, src)):
     406                      expected = data[:dest] + data[src:src+count] + data[dest+count:]
     407                      m = mmap.mmap(-1, len(data))
     408                      m[:] = data
     409                      m.move(dest, src, count)
     410                      self.assertEqual(m[:], expected)
     411                      m.close()
     412  
     413          # segfault test (Issue 5387)
     414          m = mmap.mmap(-1, 100)
     415          offsets = [-100, -1, 0, 1, 100]
     416          for source, dest, size in itertools.product(offsets, offsets, offsets):
     417              try:
     418                  m.move(source, dest, size)
     419              except ValueError:
     420                  pass
     421  
     422          offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1),
     423                     (-1, 0, 0), (0, -1, 0), (0, 0, -1)]
     424          for source, dest, size in offsets:
     425              self.assertRaises(ValueError, m.move, source, dest, size)
     426  
     427          m.close()
     428  
     429          m = mmap.mmap(-1, 1) # single byte
     430          self.assertRaises(ValueError, m.move, 0, 0, 2)
     431          self.assertRaises(ValueError, m.move, 1, 0, 1)
     432          self.assertRaises(ValueError, m.move, 0, 1, 1)
     433          m.move(0, 0, 1)
     434          m.move(0, 0, 0)
     435  
     436      def test_anonymous(self):
     437          # anonymous mmap.mmap(-1, PAGE)
     438          m = mmap.mmap(-1, PAGESIZE)
     439          for x in range(PAGESIZE):
     440              self.assertEqual(m[x], 0,
     441                               "anonymously mmap'ed contents should be zero")
     442  
     443          for x in range(PAGESIZE):
     444              b = x & 0xff
     445              m[x] = b
     446              self.assertEqual(m[x], b)
     447  
     448      def test_read_all(self):
     449          m = mmap.mmap(-1, 16)
     450          self.addCleanup(m.close)
     451  
     452          # With no parameters, or None or a negative argument, reads all
     453          m.write(bytes(range(16)))
     454          m.seek(0)
     455          self.assertEqual(m.read(), bytes(range(16)))
     456          m.seek(8)
     457          self.assertEqual(m.read(), bytes(range(8, 16)))
     458          m.seek(16)
     459          self.assertEqual(m.read(), b'')
     460          m.seek(3)
     461          self.assertEqual(m.read(None), bytes(range(3, 16)))
     462          m.seek(4)
     463          self.assertEqual(m.read(-1), bytes(range(4, 16)))
     464          m.seek(5)
     465          self.assertEqual(m.read(-2), bytes(range(5, 16)))
     466          m.seek(9)
     467          self.assertEqual(m.read(-42), bytes(range(9, 16)))
     468  
     469      def test_read_invalid_arg(self):
     470          m = mmap.mmap(-1, 16)
     471          self.addCleanup(m.close)
     472  
     473          self.assertRaises(TypeError, m.read, 'foo')
     474          self.assertRaises(TypeError, m.read, 5.5)
     475          self.assertRaises(TypeError, m.read, [1, 2, 3])
     476  
     477      def test_extended_getslice(self):
     478          # Test extended slicing by comparing with list slicing.
     479          s = bytes(reversed(range(256)))
     480          m = mmap.mmap(-1, len(s))
     481          m[:] = s
     482          self.assertEqual(m[:], s)
     483          indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
     484          for start in indices:
     485              for stop in indices:
     486                  # Skip step 0 (invalid)
     487                  for step in indices[1:]:
     488                      self.assertEqual(m[start:stop:step],
     489                                       s[start:stop:step])
     490  
     491      def test_extended_set_del_slice(self):
     492          # Test extended slicing by comparing with list slicing.
     493          s = bytes(reversed(range(256)))
     494          m = mmap.mmap(-1, len(s))
     495          indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
     496          for start in indices:
     497              for stop in indices:
     498                  # Skip invalid step 0
     499                  for step in indices[1:]:
     500                      m[:] = s
     501                      self.assertEqual(m[:], s)
     502                      L = list(s)
     503                      # Make sure we have a slice of exactly the right length,
     504                      # but with different data.
     505                      data = L[start:stop:step]
     506                      data = bytes(reversed(data))
     507                      L[start:stop:step] = data
     508                      m[start:stop:step] = data
     509                      self.assertEqual(m[:], bytes(L))
     510  
     511      def make_mmap_file (self, f, halfsize):
     512          # Write 2 pages worth of data to the file
     513          f.write (b'\0' * halfsize)
     514          f.write (b'foo')
     515          f.write (b'\0' * (halfsize - 3))
     516          f.flush ()
     517          return mmap.mmap (f.fileno(), 0)
     518  
     519      def test_empty_file (self):
     520          f = open (TESTFN, 'w+b')
     521          f.close()
     522          with open(TESTFN, "rb") as f :
     523              self.assertRaisesRegex(ValueError,
     524                                     "cannot mmap an empty file",
     525                                     mmap.mmap, f.fileno(), 0,
     526                                     access=mmap.ACCESS_READ)
     527  
     528      def test_offset (self):
     529          f = open (TESTFN, 'w+b')
     530  
     531          try: # unlink TESTFN no matter what
     532              halfsize = mmap.ALLOCATIONGRANULARITY
     533              m = self.make_mmap_file (f, halfsize)
     534              m.close ()
     535              f.close ()
     536  
     537              mapsize = halfsize * 2
     538              # Try invalid offset
     539              f = open(TESTFN, "r+b")
     540              for offset in [-2, -1, None]:
     541                  try:
     542                      m = mmap.mmap(f.fileno(), mapsize, offset=offset)
     543                      self.assertEqual(0, 1)
     544                  except (ValueError, TypeError, OverflowError):
     545                      pass
     546                  else:
     547                      self.assertEqual(0, 0)
     548              f.close()
     549  
     550              # Try valid offset, hopefully 8192 works on all OSes
     551              f = open(TESTFN, "r+b")
     552              m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize)
     553              self.assertEqual(m[0:3], b'foo')
     554              f.close()
     555  
     556              # Try resizing map
     557              try:
     558                  m.resize(512)
     559              except SystemError:
     560                  pass
     561              else:
     562                  # resize() is supported
     563                  self.assertEqual(len(m), 512)
     564                  # Check that we can no longer seek beyond the new size.
     565                  self.assertRaises(ValueError, m.seek, 513, 0)
     566                  # Check that the content is not changed
     567                  self.assertEqual(m[0:3], b'foo')
     568  
     569                  # Check that the underlying file is truncated too
     570                  f = open(TESTFN, 'rb')
     571                  f.seek(0, 2)
     572                  self.assertEqual(f.tell(), halfsize + 512)
     573                  f.close()
     574                  self.assertEqual(m.size(), halfsize + 512)
     575  
     576              m.close()
     577  
     578          finally:
     579              f.close()
     580              try:
     581                  os.unlink(TESTFN)
     582              except OSError:
     583                  pass
     584  
     585      def test_subclass(self):
     586          class ESC[4;38;5;81manon_mmap(ESC[4;38;5;149mmmapESC[4;38;5;149m.ESC[4;38;5;149mmmap):
     587              def __new__(klass, *args, **kwargs):
     588                  return mmap.mmap.__new__(klass, -1, *args, **kwargs)
     589          anon_mmap(PAGESIZE)
     590  
     591      @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
     592      def test_prot_readonly(self):
     593          mapsize = 10
     594          with open(TESTFN, "wb") as fp:
     595              fp.write(b"a"*mapsize)
     596          with open(TESTFN, "rb") as f:
     597              m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ)
     598              self.assertRaises(TypeError, m.write, "foo")
     599  
     600      def test_error(self):
     601          self.assertIs(mmap.error, OSError)
     602  
     603      def test_io_methods(self):
     604          data = b"0123456789"
     605          with open(TESTFN, "wb") as fp:
     606              fp.write(b"x"*len(data))
     607          with open(TESTFN, "r+b") as f:
     608              m = mmap.mmap(f.fileno(), len(data))
     609          # Test write_byte()
     610          for i in range(len(data)):
     611              self.assertEqual(m.tell(), i)
     612              m.write_byte(data[i])
     613              self.assertEqual(m.tell(), i+1)
     614          self.assertRaises(ValueError, m.write_byte, b"x"[0])
     615          self.assertEqual(m[:], data)
     616          # Test read_byte()
     617          m.seek(0)
     618          for i in range(len(data)):
     619              self.assertEqual(m.tell(), i)
     620              self.assertEqual(m.read_byte(), data[i])
     621              self.assertEqual(m.tell(), i+1)
     622          self.assertRaises(ValueError, m.read_byte)
     623          # Test read()
     624          m.seek(3)
     625          self.assertEqual(m.read(3), b"345")
     626          self.assertEqual(m.tell(), 6)
     627          # Test write()
     628          m.seek(3)
     629          m.write(b"bar")
     630          self.assertEqual(m.tell(), 6)
     631          self.assertEqual(m[:], b"012bar6789")
     632          m.write(bytearray(b"baz"))
     633          self.assertEqual(m.tell(), 9)
     634          self.assertEqual(m[:], b"012barbaz9")
     635          self.assertRaises(ValueError, m.write, b"ba")
     636  
     637      def test_non_ascii_byte(self):
     638          for b in (129, 200, 255): # > 128
     639              m = mmap.mmap(-1, 1)
     640              m.write_byte(b)
     641              self.assertEqual(m[0], b)
     642              m.seek(0)
     643              self.assertEqual(m.read_byte(), b)
     644              m.close()
     645  
     646      @unittest.skipUnless(os.name == 'nt', 'requires Windows')
     647      def test_tagname(self):
     648          data1 = b"0123456789"
     649          data2 = b"abcdefghij"
     650          assert len(data1) == len(data2)
     651          tagname1 = random_tagname()
     652          tagname2 = random_tagname()
     653  
     654          # Test same tag
     655          m1 = mmap.mmap(-1, len(data1), tagname=tagname1)
     656          m1[:] = data1
     657          m2 = mmap.mmap(-1, len(data2), tagname=tagname1)
     658          m2[:] = data2
     659          self.assertEqual(m1[:], data2)
     660          self.assertEqual(m2[:], data2)
     661          m2.close()
     662          m1.close()
     663  
     664          # Test different tag
     665          m1 = mmap.mmap(-1, len(data1), tagname=tagname1)
     666          m1[:] = data1
     667          m2 = mmap.mmap(-1, len(data2), tagname=tagname2)
     668          m2[:] = data2
     669          self.assertEqual(m1[:], data1)
     670          self.assertEqual(m2[:], data2)
     671          m2.close()
     672          m1.close()
     673  
     674      @cpython_only
     675      @unittest.skipUnless(os.name == 'nt', 'requires Windows')
     676      def test_sizeof(self):
     677          m1 = mmap.mmap(-1, 100)
     678          tagname = random_tagname()
     679          m2 = mmap.mmap(-1, 100, tagname=tagname)
     680          self.assertEqual(sys.getsizeof(m2),
     681                           sys.getsizeof(m1) + len(tagname) + 1)
     682  
     683      @unittest.skipUnless(os.name == 'nt', 'requires Windows')
     684      def test_crasher_on_windows(self):
     685          # Should not crash (Issue 1733986)
     686          tagname = random_tagname()
     687          m = mmap.mmap(-1, 1000, tagname=tagname)
     688          try:
     689              mmap.mmap(-1, 5000, tagname=tagname)[:] # same tagname, but larger size
     690          except:
     691              pass
     692          m.close()
     693  
     694          # Should not crash (Issue 5385)
     695          with open(TESTFN, "wb") as fp:
     696              fp.write(b"x"*10)
     697          f = open(TESTFN, "r+b")
     698          m = mmap.mmap(f.fileno(), 0)
     699          f.close()
     700          try:
     701              m.resize(0) # will raise OSError
     702          except:
     703              pass
     704          try:
     705              m[:]
     706          except:
     707              pass
     708          m.close()
     709  
     710      @unittest.skipUnless(os.name == 'nt', 'requires Windows')
     711      def test_invalid_descriptor(self):
     712          # socket file descriptors are valid, but out of range
     713          # for _get_osfhandle, causing a crash when validating the
     714          # parameters to _get_osfhandle.
     715          s = socket.socket()
     716          try:
     717              with self.assertRaises(OSError):
     718                  m = mmap.mmap(s.fileno(), 10)
     719          finally:
     720              s.close()
     721  
     722      def test_context_manager(self):
     723          with mmap.mmap(-1, 10) as m:
     724              self.assertFalse(m.closed)
     725          self.assertTrue(m.closed)
     726  
     727      def test_context_manager_exception(self):
     728          # Test that the OSError gets passed through
     729          with self.assertRaises(Exception) as exc:
     730              with mmap.mmap(-1, 10) as m:
     731                  raise OSError
     732          self.assertIsInstance(exc.exception, OSError,
     733                                "wrong exception raised in context manager")
     734          self.assertTrue(m.closed, "context manager failed")
     735  
     736      def test_weakref(self):
     737          # Check mmap objects are weakrefable
     738          mm = mmap.mmap(-1, 16)
     739          wr = weakref.ref(mm)
     740          self.assertIs(wr(), mm)
     741          del mm
     742          gc_collect()
     743          self.assertIs(wr(), None)
     744  
     745      def test_write_returning_the_number_of_bytes_written(self):
     746          mm = mmap.mmap(-1, 16)
     747          self.assertEqual(mm.write(b""), 0)
     748          self.assertEqual(mm.write(b"x"), 1)
     749          self.assertEqual(mm.write(b"yz"), 2)
     750          self.assertEqual(mm.write(b"python"), 6)
     751  
     752      def test_resize_past_pos(self):
     753          m = mmap.mmap(-1, 8192)
     754          self.addCleanup(m.close)
     755          m.read(5000)
     756          try:
     757              m.resize(4096)
     758          except SystemError:
     759              self.skipTest("resizing not supported")
     760          self.assertEqual(m.read(14), b'')
     761          self.assertRaises(ValueError, m.read_byte)
     762          self.assertRaises(ValueError, m.write_byte, 42)
     763          self.assertRaises(ValueError, m.write, b'abc')
     764  
     765      def test_concat_repeat_exception(self):
     766          m = mmap.mmap(-1, 16)
     767          with self.assertRaises(TypeError):
     768              m + m
     769          with self.assertRaises(TypeError):
     770              m * 2
     771  
     772      def test_flush_return_value(self):
     773          # mm.flush() should return None on success, raise an
     774          # exception on error under all platforms.
     775          mm = mmap.mmap(-1, 16)
     776          self.addCleanup(mm.close)
     777          mm.write(b'python')
     778          result = mm.flush()
     779          self.assertIsNone(result)
     780          if sys.platform.startswith('linux'):
     781              # 'offset' must be a multiple of mmap.PAGESIZE on Linux.
     782              # See bpo-34754 for details.
     783              self.assertRaises(OSError, mm.flush, 1, len(b'python'))
     784  
     785      def test_repr(self):
     786          open_mmap_repr_pat = re.compile(
     787              r"<mmap.mmap closed=False, "
     788              r"access=(?P<access>\S+), "
     789              r"length=(?P<length>\d+), "
     790              r"pos=(?P<pos>\d+), "
     791              r"offset=(?P<offset>\d+)>")
     792          closed_mmap_repr_pat = re.compile(r"<mmap.mmap closed=True>")
     793          mapsizes = (50, 100, 1_000, 1_000_000, 10_000_000)
     794          offsets = tuple((mapsize // 2 // mmap.ALLOCATIONGRANULARITY)
     795                          * mmap.ALLOCATIONGRANULARITY for mapsize in mapsizes)
     796          for offset, mapsize in zip(offsets, mapsizes):
     797              data = b'a' * mapsize
     798              length = mapsize - offset
     799              accesses = ('ACCESS_DEFAULT', 'ACCESS_READ',
     800                          'ACCESS_COPY', 'ACCESS_WRITE')
     801              positions = (0, length//10, length//5, length//4)
     802              with open(TESTFN, "wb+") as fp:
     803                  fp.write(data)
     804                  fp.flush()
     805                  for access, pos in itertools.product(accesses, positions):
     806                      accint = getattr(mmap, access)
     807                      with mmap.mmap(fp.fileno(),
     808                                     length,
     809                                     access=accint,
     810                                     offset=offset) as mm:
     811                          mm.seek(pos)
     812                          match = open_mmap_repr_pat.match(repr(mm))
     813                          self.assertIsNotNone(match)
     814                          self.assertEqual(match.group('access'), access)
     815                          self.assertEqual(match.group('length'), str(length))
     816                          self.assertEqual(match.group('pos'), str(pos))
     817                          self.assertEqual(match.group('offset'), str(offset))
     818                      match = closed_mmap_repr_pat.match(repr(mm))
     819                      self.assertIsNotNone(match)
     820  
     821      @unittest.skipUnless(hasattr(mmap.mmap, 'madvise'), 'needs madvise')
     822      def test_madvise(self):
     823          size = 2 * PAGESIZE
     824          m = mmap.mmap(-1, size)
     825  
     826          with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
     827              m.madvise(mmap.MADV_NORMAL, size)
     828          with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
     829              m.madvise(mmap.MADV_NORMAL, -1)
     830          with self.assertRaisesRegex(ValueError, "madvise length invalid"):
     831              m.madvise(mmap.MADV_NORMAL, 0, -1)
     832          with self.assertRaisesRegex(OverflowError, "madvise length too large"):
     833              m.madvise(mmap.MADV_NORMAL, PAGESIZE, sys.maxsize)
     834          self.assertEqual(m.madvise(mmap.MADV_NORMAL), None)
     835          self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE), None)
     836          self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE, size), None)
     837          self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, 2), None)
     838          self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, size), None)
     839  
     840      @unittest.skipUnless(os.name == 'nt', 'requires Windows')
     841      def test_resize_up_when_mapped_to_pagefile(self):
     842          """If the mmap is backed by the pagefile ensure a resize up can happen
     843          and that the original data is still in place
     844          """
     845          start_size = PAGESIZE
     846          new_size = 2 * start_size
     847          data = bytes(random.getrandbits(8) for _ in range(start_size))
     848  
     849          m = mmap.mmap(-1, start_size)
     850          m[:] = data
     851          m.resize(new_size)
     852          self.assertEqual(len(m), new_size)
     853          self.assertEqual(m[:start_size], data[:start_size])
     854  
     855      @unittest.skipUnless(os.name == 'nt', 'requires Windows')
     856      def test_resize_down_when_mapped_to_pagefile(self):
     857          """If the mmap is backed by the pagefile ensure a resize down up can happen
     858          and that a truncated form of the original data is still in place
     859          """
     860          start_size = PAGESIZE
     861          new_size = start_size // 2
     862          data = bytes(random.getrandbits(8) for _ in range(start_size))
     863  
     864          m = mmap.mmap(-1, start_size)
     865          m[:] = data
     866          m.resize(new_size)
     867          self.assertEqual(len(m), new_size)
     868          self.assertEqual(m[:new_size], data[:new_size])
     869  
     870      @unittest.skipUnless(os.name == 'nt', 'requires Windows')
     871      def test_resize_fails_if_mapping_held_elsewhere(self):
     872          """If more than one mapping is held against a named file on Windows, neither
     873          mapping can be resized
     874          """
     875          start_size = 2 * PAGESIZE
     876          reduced_size = PAGESIZE
     877  
     878          f = open(TESTFN, 'wb+')
     879          f.truncate(start_size)
     880          try:
     881              m1 = mmap.mmap(f.fileno(), start_size)
     882              m2 = mmap.mmap(f.fileno(), start_size)
     883              with self.assertRaises(OSError):
     884                  m1.resize(reduced_size)
     885              with self.assertRaises(OSError):
     886                  m2.resize(reduced_size)
     887              m2.close()
     888              m1.resize(reduced_size)
     889              self.assertEqual(m1.size(), reduced_size)
     890              self.assertEqual(os.stat(f.fileno()).st_size, reduced_size)
     891          finally:
     892              f.close()
     893  
     894      @unittest.skipUnless(os.name == 'nt', 'requires Windows')
     895      def test_resize_succeeds_with_error_for_second_named_mapping(self):
     896          """If a more than one mapping exists of the same name, none of them can
     897          be resized: they'll raise an Exception and leave the original mapping intact
     898          """
     899          start_size = 2 * PAGESIZE
     900          reduced_size = PAGESIZE
     901          tagname =  random_tagname()
     902          data_length = 8
     903          data = bytes(random.getrandbits(8) for _ in range(data_length))
     904  
     905          m1 = mmap.mmap(-1, start_size, tagname=tagname)
     906          m2 = mmap.mmap(-1, start_size, tagname=tagname)
     907          m1[:data_length] = data
     908          self.assertEqual(m2[:data_length], data)
     909          with self.assertRaises(OSError):
     910              m1.resize(reduced_size)
     911          self.assertEqual(m1.size(), start_size)
     912          self.assertEqual(m1[:data_length], data)
     913          self.assertEqual(m2[:data_length], data)
     914  
     915      def test_mmap_closed_by_int_scenarios(self):
     916          """
     917          gh-103987: Test that mmap objects raise ValueError
     918                  for closed mmap files
     919          """
     920  
     921          class ESC[4;38;5;81mMmapClosedByIntContext:
     922              def __init__(self, access) -> None:
     923                  self.access = access
     924  
     925              def __enter__(self):
     926                  self.f = open(TESTFN, "w+b")
     927                  self.f.write(random.randbytes(100))
     928                  self.f.flush()
     929  
     930                  m = mmap.mmap(self.f.fileno(), 100, access=self.access)
     931  
     932                  class ESC[4;38;5;81mX:
     933                      def __index__(self):
     934                          m.close()
     935                          return 10
     936  
     937                  return (m, X)
     938  
     939              def __exit__(self, exc_type, exc_value, traceback):
     940                  self.f.close()
     941  
     942          read_access_modes = [
     943              mmap.ACCESS_READ,
     944              mmap.ACCESS_WRITE,
     945              mmap.ACCESS_COPY,
     946              mmap.ACCESS_DEFAULT,
     947          ]
     948  
     949          write_access_modes = [
     950              mmap.ACCESS_WRITE,
     951              mmap.ACCESS_COPY,
     952              mmap.ACCESS_DEFAULT,
     953          ]
     954  
     955          for access in read_access_modes:
     956              with MmapClosedByIntContext(access) as (m, X):
     957                  with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
     958                      m[X()]
     959  
     960              with MmapClosedByIntContext(access) as (m, X):
     961                  with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
     962                      m[X() : 20]
     963  
     964              with MmapClosedByIntContext(access) as (m, X):
     965                  with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
     966                      m[X() : 20 : 2]
     967  
     968              with MmapClosedByIntContext(access) as (m, X):
     969                  with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
     970                      m[20 : X() : -2]
     971  
     972              with MmapClosedByIntContext(access) as (m, X):
     973                  with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
     974                      m.read(X())
     975  
     976              with MmapClosedByIntContext(access) as (m, X):
     977                  with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
     978                      m.find(b"1", 1, X())
     979  
     980          for access in write_access_modes:
     981              with MmapClosedByIntContext(access) as (m, X):
     982                  with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
     983                      m[X() : 20] = b"1" * 10
     984  
     985              with MmapClosedByIntContext(access) as (m, X):
     986                  with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
     987                      m[X() : 20 : 2] = b"1" * 5
     988  
     989              with MmapClosedByIntContext(access) as (m, X):
     990                  with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
     991                      m[20 : X() : -2] = b"1" * 5
     992  
     993              with MmapClosedByIntContext(access) as (m, X):
     994                  with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
     995                      m.move(1, 2, X())
     996  
     997              with MmapClosedByIntContext(access) as (m, X):
     998                  with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
     999                      m.write_byte(X())
    1000  
    1001  class ESC[4;38;5;81mLargeMmapTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1002  
    1003      def setUp(self):
    1004          unlink(TESTFN)
    1005  
    1006      def tearDown(self):
    1007          unlink(TESTFN)
    1008  
    1009      def _make_test_file(self, num_zeroes, tail):
    1010          if sys.platform[:3] == 'win' or sys.platform == 'darwin':
    1011              requires('largefile',
    1012                  'test requires %s bytes and a long time to run' % str(0x180000000))
    1013          f = open(TESTFN, 'w+b')
    1014          try:
    1015              f.seek(num_zeroes)
    1016              f.write(tail)
    1017              f.flush()
    1018          except (OSError, OverflowError, ValueError):
    1019              try:
    1020                  f.close()
    1021              except (OSError, OverflowError):
    1022                  pass
    1023              raise unittest.SkipTest("filesystem does not have largefile support")
    1024          return f
    1025  
    1026      def test_large_offset(self):
    1027          with self._make_test_file(0x14FFFFFFF, b" ") as f:
    1028              with mmap.mmap(f.fileno(), 0, offset=0x140000000, access=mmap.ACCESS_READ) as m:
    1029                  self.assertEqual(m[0xFFFFFFF], 32)
    1030  
    1031      def test_large_filesize(self):
    1032          with self._make_test_file(0x17FFFFFFF, b" ") as f:
    1033              if sys.maxsize < 0x180000000:
    1034                  # On 32 bit platforms the file is larger than sys.maxsize so
    1035                  # mapping the whole file should fail -- Issue #16743
    1036                  with self.assertRaises(OverflowError):
    1037                      mmap.mmap(f.fileno(), 0x180000000, access=mmap.ACCESS_READ)
    1038                  with self.assertRaises(ValueError):
    1039                      mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    1040              with mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ) as m:
    1041                  self.assertEqual(m.size(), 0x180000000)
    1042  
    1043      # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X.
    1044  
    1045      def _test_around_boundary(self, boundary):
    1046          tail = b'  DEARdear  '
    1047          start = boundary - len(tail) // 2
    1048          end = start + len(tail)
    1049          with self._make_test_file(start, tail) as f:
    1050              with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
    1051                  self.assertEqual(m[start:end], tail)
    1052  
    1053      @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    1054      def test_around_2GB(self):
    1055          self._test_around_boundary(_2G)
    1056  
    1057      @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    1058      def test_around_4GB(self):
    1059          self._test_around_boundary(_4G)
    1060  
    1061  
    1062  if __name__ == '__main__':
    1063      unittest.main()