(root)/
Python-3.12.0/
Lib/
multiprocessing/
synchronize.py
       1  #
       2  # Module implementing synchronization primitives
       3  #
       4  # multiprocessing/synchronize.py
       5  #
       6  # Copyright (c) 2006-2008, R Oudkerk
       7  # Licensed to PSF under a Contributor Agreement.
       8  #
       9  
      10  __all__ = [
      11      'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
      12      ]
      13  
      14  import threading
      15  import sys
      16  import tempfile
      17  import _multiprocessing
      18  import time
      19  
      20  from . import context
      21  from . import process
      22  from . import util
      23  
      24  # Try to import the mp.synchronize module cleanly, if it fails
      25  # raise ImportError for platforms lacking a working sem_open implementation.
      26  # See issue 3770
      27  try:
      28      from _multiprocessing import SemLock, sem_unlink
      29  except (ImportError):
      30      raise ImportError("This platform lacks a functioning sem_open" +
      31                        " implementation, therefore, the required" +
      32                        " synchronization primitives needed will not" +
      33                        " function, see issue 3770.")
      34  
      35  #
      36  # Constants
      37  #
      38  
      39  RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
      40  SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
      41  
      42  #
      43  # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
      44  #
      45  
      46  class ESC[4;38;5;81mSemLock(ESC[4;38;5;149mobject):
      47  
      48      _rand = tempfile._RandomNameSequence()
      49  
      50      def __init__(self, kind, value, maxvalue, *, ctx):
      51          if ctx is None:
      52              ctx = context._default_context.get_context()
      53          self._is_fork_ctx = ctx.get_start_method() == 'fork'
      54          unlink_now = sys.platform == 'win32' or self._is_fork_ctx
      55          for i in range(100):
      56              try:
      57                  sl = self._semlock = _multiprocessing.SemLock(
      58                      kind, value, maxvalue, self._make_name(),
      59                      unlink_now)
      60              except FileExistsError:
      61                  pass
      62              else:
      63                  break
      64          else:
      65              raise FileExistsError('cannot find name for semaphore')
      66  
      67          util.debug('created semlock with handle %s' % sl.handle)
      68          self._make_methods()
      69  
      70          if sys.platform != 'win32':
      71              def _after_fork(obj):
      72                  obj._semlock._after_fork()
      73              util.register_after_fork(self, _after_fork)
      74  
      75          if self._semlock.name is not None:
      76              # We only get here if we are on Unix with forking
      77              # disabled.  When the object is garbage collected or the
      78              # process shuts down we unlink the semaphore name
      79              from .resource_tracker import register
      80              register(self._semlock.name, "semaphore")
      81              util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
      82                            exitpriority=0)
      83  
      84      @staticmethod
      85      def _cleanup(name):
      86          from .resource_tracker import unregister
      87          sem_unlink(name)
      88          unregister(name, "semaphore")
      89  
      90      def _make_methods(self):
      91          self.acquire = self._semlock.acquire
      92          self.release = self._semlock.release
      93  
      94      def __enter__(self):
      95          return self._semlock.__enter__()
      96  
      97      def __exit__(self, *args):
      98          return self._semlock.__exit__(*args)
      99  
     100      def __getstate__(self):
     101          context.assert_spawning(self)
     102          sl = self._semlock
     103          if sys.platform == 'win32':
     104              h = context.get_spawning_popen().duplicate_for_child(sl.handle)
     105          else:
     106              if self._is_fork_ctx:
     107                  raise RuntimeError('A SemLock created in a fork context is being '
     108                                     'shared with a process in a spawn context. This is '
     109                                     'not supported. Please use the same context to create '
     110                                     'multiprocessing objects and Process.')
     111              h = sl.handle
     112          return (h, sl.kind, sl.maxvalue, sl.name)
     113  
     114      def __setstate__(self, state):
     115          self._semlock = _multiprocessing.SemLock._rebuild(*state)
     116          util.debug('recreated blocker with handle %r' % state[0])
     117          self._make_methods()
     118          # Ensure that deserialized SemLock can be serialized again (gh-108520).
     119          self._is_fork_ctx = False
     120  
     121      @staticmethod
     122      def _make_name():
     123          return '%s-%s' % (process.current_process()._config['semprefix'],
     124                            next(SemLock._rand))
     125  
     126  #
     127  # Semaphore
     128  #
     129  
     130  class ESC[4;38;5;81mSemaphore(ESC[4;38;5;149mSemLock):
     131  
     132      def __init__(self, value=1, *, ctx):
     133          SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)
     134  
     135      def get_value(self):
     136          return self._semlock._get_value()
     137  
     138      def __repr__(self):
     139          try:
     140              value = self._semlock._get_value()
     141          except Exception:
     142              value = 'unknown'
     143          return '<%s(value=%s)>' % (self.__class__.__name__, value)
     144  
     145  #
     146  # Bounded semaphore
     147  #
     148  
     149  class ESC[4;38;5;81mBoundedSemaphore(ESC[4;38;5;149mSemaphore):
     150  
     151      def __init__(self, value=1, *, ctx):
     152          SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)
     153  
     154      def __repr__(self):
     155          try:
     156              value = self._semlock._get_value()
     157          except Exception:
     158              value = 'unknown'
     159          return '<%s(value=%s, maxvalue=%s)>' % \
     160                 (self.__class__.__name__, value, self._semlock.maxvalue)
     161  
     162  #
     163  # Non-recursive lock
     164  #
     165  
     166  class ESC[4;38;5;81mLock(ESC[4;38;5;149mSemLock):
     167  
     168      def __init__(self, *, ctx):
     169          SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
     170  
     171      def __repr__(self):
     172          try:
     173              if self._semlock._is_mine():
     174                  name = process.current_process().name
     175                  if threading.current_thread().name != 'MainThread':
     176                      name += '|' + threading.current_thread().name
     177              elif self._semlock._get_value() == 1:
     178                  name = 'None'
     179              elif self._semlock._count() > 0:
     180                  name = 'SomeOtherThread'
     181              else:
     182                  name = 'SomeOtherProcess'
     183          except Exception:
     184              name = 'unknown'
     185          return '<%s(owner=%s)>' % (self.__class__.__name__, name)
     186  
     187  #
     188  # Recursive lock
     189  #
     190  
     191  class ESC[4;38;5;81mRLock(ESC[4;38;5;149mSemLock):
     192  
     193      def __init__(self, *, ctx):
     194          SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)
     195  
     196      def __repr__(self):
     197          try:
     198              if self._semlock._is_mine():
     199                  name = process.current_process().name
     200                  if threading.current_thread().name != 'MainThread':
     201                      name += '|' + threading.current_thread().name
     202                  count = self._semlock._count()
     203              elif self._semlock._get_value() == 1:
     204                  name, count = 'None', 0
     205              elif self._semlock._count() > 0:
     206                  name, count = 'SomeOtherThread', 'nonzero'
     207              else:
     208                  name, count = 'SomeOtherProcess', 'nonzero'
     209          except Exception:
     210              name, count = 'unknown', 'unknown'
     211          return '<%s(%s, %s)>' % (self.__class__.__name__, name, count)
     212  
     213  #
     214  # Condition variable
     215  #
     216  
     217  class ESC[4;38;5;81mCondition(ESC[4;38;5;149mobject):
     218  
     219      def __init__(self, lock=None, *, ctx):
     220          self._lock = lock or ctx.RLock()
     221          self._sleeping_count = ctx.Semaphore(0)
     222          self._woken_count = ctx.Semaphore(0)
     223          self._wait_semaphore = ctx.Semaphore(0)
     224          self._make_methods()
     225  
     226      def __getstate__(self):
     227          context.assert_spawning(self)
     228          return (self._lock, self._sleeping_count,
     229                  self._woken_count, self._wait_semaphore)
     230  
     231      def __setstate__(self, state):
     232          (self._lock, self._sleeping_count,
     233           self._woken_count, self._wait_semaphore) = state
     234          self._make_methods()
     235  
     236      def __enter__(self):
     237          return self._lock.__enter__()
     238  
     239      def __exit__(self, *args):
     240          return self._lock.__exit__(*args)
     241  
     242      def _make_methods(self):
     243          self.acquire = self._lock.acquire
     244          self.release = self._lock.release
     245  
     246      def __repr__(self):
     247          try:
     248              num_waiters = (self._sleeping_count._semlock._get_value() -
     249                             self._woken_count._semlock._get_value())
     250          except Exception:
     251              num_waiters = 'unknown'
     252          return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)
     253  
     254      def wait(self, timeout=None):
     255          assert self._lock._semlock._is_mine(), \
     256                 'must acquire() condition before using wait()'
     257  
     258          # indicate that this thread is going to sleep
     259          self._sleeping_count.release()
     260  
     261          # release lock
     262          count = self._lock._semlock._count()
     263          for i in range(count):
     264              self._lock.release()
     265  
     266          try:
     267              # wait for notification or timeout
     268              return self._wait_semaphore.acquire(True, timeout)
     269          finally:
     270              # indicate that this thread has woken
     271              self._woken_count.release()
     272  
     273              # reacquire lock
     274              for i in range(count):
     275                  self._lock.acquire()
     276  
     277      def notify(self, n=1):
     278          assert self._lock._semlock._is_mine(), 'lock is not owned'
     279          assert not self._wait_semaphore.acquire(
     280              False), ('notify: Should not have been able to acquire '
     281                       + '_wait_semaphore')
     282  
     283          # to take account of timeouts since last notify*() we subtract
     284          # woken_count from sleeping_count and rezero woken_count
     285          while self._woken_count.acquire(False):
     286              res = self._sleeping_count.acquire(False)
     287              assert res, ('notify: Bug in sleeping_count.acquire'
     288                           + '- res should not be False')
     289  
     290          sleepers = 0
     291          while sleepers < n and self._sleeping_count.acquire(False):
     292              self._wait_semaphore.release()        # wake up one sleeper
     293              sleepers += 1
     294  
     295          if sleepers:
     296              for i in range(sleepers):
     297                  self._woken_count.acquire()       # wait for a sleeper to wake
     298  
     299              # rezero wait_semaphore in case some timeouts just happened
     300              while self._wait_semaphore.acquire(False):
     301                  pass
     302  
     303      def notify_all(self):
     304          self.notify(n=sys.maxsize)
     305  
     306      def wait_for(self, predicate, timeout=None):
     307          result = predicate()
     308          if result:
     309              return result
     310          if timeout is not None:
     311              endtime = time.monotonic() + timeout
     312          else:
     313              endtime = None
     314              waittime = None
     315          while not result:
     316              if endtime is not None:
     317                  waittime = endtime - time.monotonic()
     318                  if waittime <= 0:
     319                      break
     320              self.wait(waittime)
     321              result = predicate()
     322          return result
     323  
     324  #
     325  # Event
     326  #
     327  
     328  class ESC[4;38;5;81mEvent(ESC[4;38;5;149mobject):
     329  
     330      def __init__(self, *, ctx):
     331          self._cond = ctx.Condition(ctx.Lock())
     332          self._flag = ctx.Semaphore(0)
     333  
     334      def is_set(self):
     335          with self._cond:
     336              if self._flag.acquire(False):
     337                  self._flag.release()
     338                  return True
     339              return False
     340  
     341      def set(self):
     342          with self._cond:
     343              self._flag.acquire(False)
     344              self._flag.release()
     345              self._cond.notify_all()
     346  
     347      def clear(self):
     348          with self._cond:
     349              self._flag.acquire(False)
     350  
     351      def wait(self, timeout=None):
     352          with self._cond:
     353              if self._flag.acquire(False):
     354                  self._flag.release()
     355              else:
     356                  self._cond.wait(timeout)
     357  
     358              if self._flag.acquire(False):
     359                  self._flag.release()
     360                  return True
     361              return False
     362  
     363      def __repr__(self) -> str:
     364          set_status = 'set' if self.is_set() else 'unset'
     365          return f"<{type(self).__qualname__} at {id(self):#x} {set_status}>"
     366  #
     367  # Barrier
     368  #
     369  
     370  class ESC[4;38;5;81mBarrier(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mBarrier):
     371  
     372      def __init__(self, parties, action=None, timeout=None, *, ctx):
     373          import struct
     374          from .heap import BufferWrapper
     375          wrapper = BufferWrapper(struct.calcsize('i') * 2)
     376          cond = ctx.Condition()
     377          self.__setstate__((parties, action, timeout, cond, wrapper))
     378          self._state = 0
     379          self._count = 0
     380  
     381      def __setstate__(self, state):
     382          (self._parties, self._action, self._timeout,
     383           self._cond, self._wrapper) = state
     384          self._array = self._wrapper.create_memoryview().cast('i')
     385  
     386      def __getstate__(self):
     387          return (self._parties, self._action, self._timeout,
     388                  self._cond, self._wrapper)
     389  
     390      @property
     391      def _state(self):
     392          return self._array[0]
     393  
     394      @_state.setter
     395      def _state(self, value):
     396          self._array[0] = value
     397  
     398      @property
     399      def _count(self):
     400          return self._array[1]
     401  
     402      @_count.setter
     403      def _count(self, value):
     404          self._array[1] = value