(root)/
Python-3.11.7/
Lib/
multiprocessing/
heap.py
       1  #
       2  # Module which supports allocation of memory from an mmap
       3  #
       4  # multiprocessing/heap.py
       5  #
       6  # Copyright (c) 2006-2008, R Oudkerk
       7  # Licensed to PSF under a Contributor Agreement.
       8  #
       9  
      10  import bisect
      11  from collections import defaultdict
      12  import mmap
      13  import os
      14  import sys
      15  import tempfile
      16  import threading
      17  
      18  from .context import reduction, assert_spawning
      19  from . import util
      20  
      21  __all__ = ['BufferWrapper']
      22  
      23  #
      24  # Inheritable class which wraps an mmap, and from which blocks can be allocated
      25  #
      26  
      27  if sys.platform == 'win32':
      28  
      29      import _winapi
      30  
      31      class ESC[4;38;5;81mArena(ESC[4;38;5;149mobject):
      32          """
      33          A shared memory area backed by anonymous memory (Windows).
      34          """
      35  
      36          _rand = tempfile._RandomNameSequence()
      37  
      38          def __init__(self, size):
      39              self.size = size
      40              for i in range(100):
      41                  name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
      42                  buf = mmap.mmap(-1, size, tagname=name)
      43                  if _winapi.GetLastError() == 0:
      44                      break
      45                  # We have reopened a preexisting mmap.
      46                  buf.close()
      47              else:
      48                  raise FileExistsError('Cannot find name for new mmap')
      49              self.name = name
      50              self.buffer = buf
      51              self._state = (self.size, self.name)
      52  
      53          def __getstate__(self):
      54              assert_spawning(self)
      55              return self._state
      56  
      57          def __setstate__(self, state):
      58              self.size, self.name = self._state = state
      59              # Reopen existing mmap
      60              self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
      61              # XXX Temporarily preventing buildbot failures while determining
      62              # XXX the correct long-term fix. See issue 23060
      63              #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS
      64  
      65  else:
      66  
      67      class ESC[4;38;5;81mArena(ESC[4;38;5;149mobject):
      68          """
      69          A shared memory area backed by a temporary file (POSIX).
      70          """
      71  
      72          if sys.platform == 'linux':
      73              _dir_candidates = ['/dev/shm']
      74          else:
      75              _dir_candidates = []
      76  
      77          def __init__(self, size, fd=-1):
      78              self.size = size
      79              self.fd = fd
      80              if fd == -1:
      81                  # Arena is created anew (if fd != -1, it means we're coming
      82                  # from rebuild_arena() below)
      83                  self.fd, name = tempfile.mkstemp(
      84                       prefix='pym-%d-'%os.getpid(),
      85                       dir=self._choose_dir(size))
      86                  os.unlink(name)
      87                  util.Finalize(self, os.close, (self.fd,))
      88                  os.ftruncate(self.fd, size)
      89              self.buffer = mmap.mmap(self.fd, self.size)
      90  
      91          def _choose_dir(self, size):
      92              # Choose a non-storage backed directory if possible,
      93              # to improve performance
      94              for d in self._dir_candidates:
      95                  st = os.statvfs(d)
      96                  if st.f_bavail * st.f_frsize >= size:  # enough free space?
      97                      return d
      98              return util.get_temp_dir()
      99  
     100      def reduce_arena(a):
     101          if a.fd == -1:
     102              raise ValueError('Arena is unpicklable because '
     103                               'forking was enabled when it was created')
     104          return rebuild_arena, (a.size, reduction.DupFd(a.fd))
     105  
     106      def rebuild_arena(size, dupfd):
     107          return Arena(size, dupfd.detach())
     108  
     109      reduction.register(Arena, reduce_arena)
     110  
     111  #
     112  # Class allowing allocation of chunks of memory from arenas
     113  #
     114  
     115  class ESC[4;38;5;81mHeap(ESC[4;38;5;149mobject):
     116  
     117      # Minimum malloc() alignment
     118      _alignment = 8
     119  
     120      _DISCARD_FREE_SPACE_LARGER_THAN = 4 * 1024 ** 2  # 4 MB
     121      _DOUBLE_ARENA_SIZE_UNTIL = 4 * 1024 ** 2
     122  
     123      def __init__(self, size=mmap.PAGESIZE):
     124          self._lastpid = os.getpid()
     125          self._lock = threading.Lock()
     126          # Current arena allocation size
     127          self._size = size
     128          # A sorted list of available block sizes in arenas
     129          self._lengths = []
     130  
     131          # Free block management:
     132          # - map each block size to a list of `(Arena, start, stop)` blocks
     133          self._len_to_seq = {}
     134          # - map `(Arena, start)` tuple to the `(Arena, start, stop)` block
     135          #   starting at that offset
     136          self._start_to_block = {}
     137          # - map `(Arena, stop)` tuple to the `(Arena, start, stop)` block
     138          #   ending at that offset
     139          self._stop_to_block = {}
     140  
     141          # Map arenas to their `(Arena, start, stop)` blocks in use
     142          self._allocated_blocks = defaultdict(set)
     143          self._arenas = []
     144  
     145          # List of pending blocks to free - see comment in free() below
     146          self._pending_free_blocks = []
     147  
     148          # Statistics
     149          self._n_mallocs = 0
     150          self._n_frees = 0
     151  
     152      @staticmethod
     153      def _roundup(n, alignment):
     154          # alignment must be a power of 2
     155          mask = alignment - 1
     156          return (n + mask) & ~mask
     157  
     158      def _new_arena(self, size):
     159          # Create a new arena with at least the given *size*
     160          length = self._roundup(max(self._size, size), mmap.PAGESIZE)
     161          # We carve larger and larger arenas, for efficiency, until we
     162          # reach a large-ish size (roughly L3 cache-sized)
     163          if self._size < self._DOUBLE_ARENA_SIZE_UNTIL:
     164              self._size *= 2
     165          util.info('allocating a new mmap of length %d', length)
     166          arena = Arena(length)
     167          self._arenas.append(arena)
     168          return (arena, 0, length)
     169  
     170      def _discard_arena(self, arena):
     171          # Possibly delete the given (unused) arena
     172          length = arena.size
     173          # Reusing an existing arena is faster than creating a new one, so
     174          # we only reclaim space if it's large enough.
     175          if length < self._DISCARD_FREE_SPACE_LARGER_THAN:
     176              return
     177          blocks = self._allocated_blocks.pop(arena)
     178          assert not blocks
     179          del self._start_to_block[(arena, 0)]
     180          del self._stop_to_block[(arena, length)]
     181          self._arenas.remove(arena)
     182          seq = self._len_to_seq[length]
     183          seq.remove((arena, 0, length))
     184          if not seq:
     185              del self._len_to_seq[length]
     186              self._lengths.remove(length)
     187  
     188      def _malloc(self, size):
     189          # returns a large enough block -- it might be much larger
     190          i = bisect.bisect_left(self._lengths, size)
     191          if i == len(self._lengths):
     192              return self._new_arena(size)
     193          else:
     194              length = self._lengths[i]
     195              seq = self._len_to_seq[length]
     196              block = seq.pop()
     197              if not seq:
     198                  del self._len_to_seq[length], self._lengths[i]
     199  
     200          (arena, start, stop) = block
     201          del self._start_to_block[(arena, start)]
     202          del self._stop_to_block[(arena, stop)]
     203          return block
     204  
     205      def _add_free_block(self, block):
     206          # make block available and try to merge with its neighbours in the arena
     207          (arena, start, stop) = block
     208  
     209          try:
     210              prev_block = self._stop_to_block[(arena, start)]
     211          except KeyError:
     212              pass
     213          else:
     214              start, _ = self._absorb(prev_block)
     215  
     216          try:
     217              next_block = self._start_to_block[(arena, stop)]
     218          except KeyError:
     219              pass
     220          else:
     221              _, stop = self._absorb(next_block)
     222  
     223          block = (arena, start, stop)
     224          length = stop - start
     225  
     226          try:
     227              self._len_to_seq[length].append(block)
     228          except KeyError:
     229              self._len_to_seq[length] = [block]
     230              bisect.insort(self._lengths, length)
     231  
     232          self._start_to_block[(arena, start)] = block
     233          self._stop_to_block[(arena, stop)] = block
     234  
     235      def _absorb(self, block):
     236          # deregister this block so it can be merged with a neighbour
     237          (arena, start, stop) = block
     238          del self._start_to_block[(arena, start)]
     239          del self._stop_to_block[(arena, stop)]
     240  
     241          length = stop - start
     242          seq = self._len_to_seq[length]
     243          seq.remove(block)
     244          if not seq:
     245              del self._len_to_seq[length]
     246              self._lengths.remove(length)
     247  
     248          return start, stop
     249  
     250      def _remove_allocated_block(self, block):
     251          arena, start, stop = block
     252          blocks = self._allocated_blocks[arena]
     253          blocks.remove((start, stop))
     254          if not blocks:
     255              # Arena is entirely free, discard it from this process
     256              self._discard_arena(arena)
     257  
     258      def _free_pending_blocks(self):
     259          # Free all the blocks in the pending list - called with the lock held.
     260          while True:
     261              try:
     262                  block = self._pending_free_blocks.pop()
     263              except IndexError:
     264                  break
     265              self._add_free_block(block)
     266              self._remove_allocated_block(block)
     267  
     268      def free(self, block):
     269          # free a block returned by malloc()
     270          # Since free() can be called asynchronously by the GC, it could happen
     271          # that it's called while self._lock is held: in that case,
     272          # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
     273          # trylock is used instead, and if the lock can't be acquired
     274          # immediately, the block is added to a list of blocks to be freed
     275          # synchronously sometimes later from malloc() or free(), by calling
     276          # _free_pending_blocks() (appending and retrieving from a list is not
     277          # strictly thread-safe but under CPython it's atomic thanks to the GIL).
     278          if os.getpid() != self._lastpid:
     279              raise ValueError(
     280                  "My pid ({0:n}) is not last pid {1:n}".format(
     281                      os.getpid(),self._lastpid))
     282          if not self._lock.acquire(False):
     283              # can't acquire the lock right now, add the block to the list of
     284              # pending blocks to free
     285              self._pending_free_blocks.append(block)
     286          else:
     287              # we hold the lock
     288              try:
     289                  self._n_frees += 1
     290                  self._free_pending_blocks()
     291                  self._add_free_block(block)
     292                  self._remove_allocated_block(block)
     293              finally:
     294                  self._lock.release()
     295  
     296      def malloc(self, size):
     297          # return a block of right size (possibly rounded up)
     298          if size < 0:
     299              raise ValueError("Size {0:n} out of range".format(size))
     300          if sys.maxsize <= size:
     301              raise OverflowError("Size {0:n} too large".format(size))
     302          if os.getpid() != self._lastpid:
     303              self.__init__()                     # reinitialize after fork
     304          with self._lock:
     305              self._n_mallocs += 1
     306              # allow pending blocks to be marked available
     307              self._free_pending_blocks()
     308              size = self._roundup(max(size, 1), self._alignment)
     309              (arena, start, stop) = self._malloc(size)
     310              real_stop = start + size
     311              if real_stop < stop:
     312                  # if the returned block is larger than necessary, mark
     313                  # the remainder available
     314                  self._add_free_block((arena, real_stop, stop))
     315              self._allocated_blocks[arena].add((start, real_stop))
     316              return (arena, start, real_stop)
     317  
     318  #
     319  # Class wrapping a block allocated out of a Heap -- can be inherited by child process
     320  #
     321  
     322  class ESC[4;38;5;81mBufferWrapper(ESC[4;38;5;149mobject):
     323  
     324      _heap = Heap()
     325  
     326      def __init__(self, size):
     327          if size < 0:
     328              raise ValueError("Size {0:n} out of range".format(size))
     329          if sys.maxsize <= size:
     330              raise OverflowError("Size {0:n} too large".format(size))
     331          block = BufferWrapper._heap.malloc(size)
     332          self._state = (block, size)
     333          util.Finalize(self, BufferWrapper._heap.free, args=(block,))
     334  
     335      def create_memoryview(self):
     336          (arena, start, stop), size = self._state
     337          return memoryview(arena.buffer)[start:start+size]