(root)/
Python-3.12.0/
Lib/
multiprocessing/
util.py
       1  #
       2  # Module providing various facilities to other parts of the package
       3  #
       4  # multiprocessing/util.py
       5  #
       6  # Copyright (c) 2006-2008, R Oudkerk
       7  # Licensed to PSF under a Contributor Agreement.
       8  #
       9  
      10  import os
      11  import itertools
      12  import sys
      13  import weakref
      14  import atexit
      15  import threading        # we want threading to install its
      16                          # cleanup function before multiprocessing does
      17  from subprocess import _args_from_interpreter_flags
      18  
      19  from . import process
      20  
# Public names of this module: the only ones a star-import of the module
# brings into the importing namespace.
__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
    ]
      27  
      28  #
      29  # Logging
      30  #
      31  
# Numeric log levels used by the helper functions in this module.
# SUBDEBUG (5) sorts below DEBUG (10); SUBWARNING (25) sorts above INFO (20).
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

# Name handed to logging.getLogger() by get_logger().
LOGGER_NAME = 'multiprocessing'
# Format string used for the handler installed by log_to_stderr().
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# The package logger; stays None until get_logger() is called, and the
# sub_debug()/debug()/info()/sub_warning() helpers do nothing while it is
# falsy.
_logger = None
# Set to True by log_to_stderr().
_log_to_stderr = False
      43  
      44  def sub_debug(msg, *args):
      45      if _logger:
      46          _logger.log(SUBDEBUG, msg, *args)
      47  
      48  def debug(msg, *args):
      49      if _logger:
      50          _logger.log(DEBUG, msg, *args)
      51  
      52  def info(msg, *args):
      53      if _logger:
      54          _logger.log(INFO, msg, *args)
      55  
      56  def sub_warning(msg, *args):
      57      if _logger:
      58          _logger.log(SUBWARNING, msg, *args)
      59  
      60  def get_logger():
      61      '''
      62      Returns logger used by multiprocessing
      63      '''
      64      global _logger
      65      import logging
      66  
      67      logging._acquireLock()
      68      try:
      69          if not _logger:
      70  
      71              _logger = logging.getLogger(LOGGER_NAME)
      72              _logger.propagate = 0
      73  
      74              # XXX multiprocessing should cleanup before logging
      75              if hasattr(atexit, 'unregister'):
      76                  atexit.unregister(_exit_function)
      77                  atexit.register(_exit_function)
      78              else:
      79                  atexit._exithandlers.remove((_exit_function, (), {}))
      80                  atexit._exithandlers.append((_exit_function, (), {}))
      81  
      82      finally:
      83          logging._releaseLock()
      84  
      85      return _logger
      86  
      87  def log_to_stderr(level=None):
      88      '''
      89      Turn on logging and add a handler which prints to stderr
      90      '''
      91      global _log_to_stderr
      92      import logging
      93  
      94      logger = get_logger()
      95      formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
      96      handler = logging.StreamHandler()
      97      handler.setFormatter(formatter)
      98      logger.addHandler(handler)
      99  
     100      if level:
     101          logger.setLevel(level)
     102      _log_to_stderr = True
     103      return _logger
     104  
     105  
     106  # Abstract socket support
     107  
     108  def _platform_supports_abstract_sockets():
     109      if sys.platform == "linux":
     110          return True
     111      if hasattr(sys, 'getandroidapilevel'):
     112          return True
     113      return False
     114  
     115  
     116  def is_abstract_socket_namespace(address):
     117      if not address:
     118          return False
     119      if isinstance(address, bytes):
     120          return address[0] == 0
     121      elif isinstance(address, str):
     122          return address[0] == "\0"
     123      raise TypeError(f'address type of {address!r} unrecognized')
     124  
     125  
# Evaluated once, when this module is imported.
abstract_sockets_supported = _platform_supports_abstract_sockets()
     127  
     128  #
     129  # Function returning a temp directory which will be removed on exit
     130  #
     131  
     132  def _remove_temp_dir(rmtree, tempdir):
     133      rmtree(tempdir)
     134  
     135      current_process = process.current_process()
     136      # current_process() can be None if the finalizer is called
     137      # late during Python finalization
     138      if current_process is not None:
     139          current_process._config['tempdir'] = None
     140  
     141  def get_temp_dir():
     142      # get name of a temp directory which will be automatically cleaned up
     143      tempdir = process.current_process()._config.get('tempdir')
     144      if tempdir is None:
     145          import shutil, tempfile
     146          tempdir = tempfile.mkdtemp(prefix='pymp-')
     147          info('created temp directory %s', tempdir)
     148          # keep a strong reference to shutil.rmtree(), since the finalizer
     149          # can be called late during Python shutdown
     150          Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
     151                   exitpriority=-100)
     152          process.current_process()._config['tempdir'] = tempdir
     153      return tempdir
     154  
     155  #
     156  # Support for reinitialization of objects when bootstrapping a child process
     157  #
     158  
# Maps (counter value, id(obj), func) keys to obj (see register_after_fork).
# Values are weakly referenced, so an entry vanishes once its object is
# garbage collected.
_afterfork_registry = weakref.WeakValueDictionary()
# Source of the increasing first key component; _run_after_forkers() sorts
# on the keys, so callbacks run in registration order.
_afterfork_counter = itertools.count()
     161  
     162  def _run_after_forkers():
     163      items = list(_afterfork_registry.items())
     164      items.sort()
     165      for (index, ident, func), obj in items:
     166          try:
     167              func(obj)
     168          except Exception as e:
     169              info('after forker raised exception %s', e)
     170  
     171  def register_after_fork(obj, func):
     172      _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
     173  
     174  #
     175  # Finalization using weakrefs
     176  #
     177  
# Maps (exitpriority, counter value) keys to the Finalize instance that
# registered itself under that key.
_finalizer_registry = {}
# Source of the second key component, making every key unique.
_finalizer_counter = itertools.count()
     180  
     181  
     182  class ESC[4;38;5;81mFinalize(ESC[4;38;5;149mobject):
     183      '''
     184      Class which supports object finalization using weakrefs
     185      '''
     186      def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
     187          if (exitpriority is not None) and not isinstance(exitpriority,int):
     188              raise TypeError(
     189                  "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
     190                      exitpriority, type(exitpriority)))
     191  
     192          if obj is not None:
     193              self._weakref = weakref.ref(obj, self)
     194          elif exitpriority is None:
     195              raise ValueError("Without object, exitpriority cannot be None")
     196  
     197          self._callback = callback
     198          self._args = args
     199          self._kwargs = kwargs or {}
     200          self._key = (exitpriority, next(_finalizer_counter))
     201          self._pid = os.getpid()
     202  
     203          _finalizer_registry[self._key] = self
     204  
     205      def __call__(self, wr=None,
     206                   # Need to bind these locally because the globals can have
     207                   # been cleared at shutdown
     208                   _finalizer_registry=_finalizer_registry,
     209                   sub_debug=sub_debug, getpid=os.getpid):
     210          '''
     211          Run the callback unless it has already been called or cancelled
     212          '''
     213          try:
     214              del _finalizer_registry[self._key]
     215          except KeyError:
     216              sub_debug('finalizer no longer registered')
     217          else:
     218              if self._pid != getpid():
     219                  sub_debug('finalizer ignored because different process')
     220                  res = None
     221              else:
     222                  sub_debug('finalizer calling %s with args %s and kwargs %s',
     223                            self._callback, self._args, self._kwargs)
     224                  res = self._callback(*self._args, **self._kwargs)
     225              self._weakref = self._callback = self._args = \
     226                              self._kwargs = self._key = None
     227              return res
     228  
     229      def cancel(self):
     230          '''
     231          Cancel finalization of the object
     232          '''
     233          try:
     234              del _finalizer_registry[self._key]
     235          except KeyError:
     236              pass
     237          else:
     238              self._weakref = self._callback = self._args = \
     239                              self._kwargs = self._key = None
     240  
     241      def still_active(self):
     242          '''
     243          Return whether this finalizer is still waiting to invoke callback
     244          '''
     245          return self._key in _finalizer_registry
     246  
     247      def __repr__(self):
     248          try:
     249              obj = self._weakref()
     250          except (AttributeError, TypeError):
     251              obj = None
     252  
     253          if obj is None:
     254              return '<%s object, dead>' % self.__class__.__name__
     255  
     256          x = '<%s object, callback=%s' % (
     257                  self.__class__.__name__,
     258                  getattr(self._callback, '__name__', self._callback))
     259          if self._args:
     260              x += ', args=' + str(self._args)
     261          if self._kwargs:
     262              x += ', kwargs=' + str(self._kwargs)
     263          if self._key[0] is not None:
     264              x += ', exitpriority=' + str(self._key[0])
     265          return x + '>'
     266  
     267  
     268  def _run_finalizers(minpriority=None):
     269      '''
     270      Run all finalizers whose exit priority is not None and at least minpriority
     271  
     272      Finalizers with highest priority are called first; finalizers with
     273      the same priority will be called in reverse order of creation.
     274      '''
     275      if _finalizer_registry is None:
     276          # This function may be called after this module's globals are
     277          # destroyed.  See the _exit_function function in this module for more
     278          # notes.
     279          return
     280  
     281      if minpriority is None:
     282          f = lambda p : p[0] is not None
     283      else:
     284          f = lambda p : p[0] is not None and p[0] >= minpriority
     285  
     286      # Careful: _finalizer_registry may be mutated while this function
     287      # is running (either by a GC run or by another thread).
     288  
     289      # list(_finalizer_registry) should be atomic, while
     290      # list(_finalizer_registry.items()) is not.
     291      keys = [key for key in list(_finalizer_registry) if f(key)]
     292      keys.sort(reverse=True)
     293  
     294      for key in keys:
     295          finalizer = _finalizer_registry.get(key)
     296          # key may have been removed from the registry
     297          if finalizer is not None:
     298              sub_debug('calling %s', finalizer)
     299              try:
     300                  finalizer()
     301              except Exception:
     302                  import traceback
     303                  traceback.print_exc()
     304  
     305      if minpriority is None:
     306          _finalizer_registry.clear()
     307  
     308  #
     309  # Clean up on exit
     310  #
     311  
     312  def is_exiting():
     313      '''
     314      Returns true if the process is shutting down
     315      '''
     316      return _exiting or _exiting is None
     317  
     318  _exiting = False
     319  
     320  def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
     321                     active_children=process.active_children,
     322                     current_process=process.current_process):
     323      # We hold on to references to functions in the arglist due to the
     324      # situation described below, where this function is called after this
     325      # module's globals are destroyed.
     326  
     327      global _exiting
     328  
     329      if not _exiting:
     330          _exiting = True
     331  
     332          info('process shutting down')
     333          debug('running all "atexit" finalizers with priority >= 0')
     334          _run_finalizers(0)
     335  
     336          if current_process() is not None:
     337              # We check if the current process is None here because if
     338              # it's None, any call to ``active_children()`` will raise
     339              # an AttributeError (active_children winds up trying to
     340              # get attributes from util._current_process).  One
     341              # situation where this can happen is if someone has
     342              # manipulated sys.modules, causing this module to be
     343              # garbage collected.  The destructor for the module type
     344              # then replaces all values in the module dict with None.
     345              # For instance, after setuptools runs a test it replaces
     346              # sys.modules with a copy created earlier.  See issues
     347              # #9775 and #15881.  Also related: #4106, #9205, and
     348              # #9207.
     349  
     350              for p in active_children():
     351                  if p.daemon:
     352                      info('calling terminate() for daemon %s', p.name)
     353                      p._popen.terminate()
     354  
     355              for p in active_children():
     356                  info('calling join() for process %s', p.name)
     357                  p.join()
     358  
     359          debug('running the remaining "atexit" finalizers')
     360          _run_finalizers()
     361  
     362  atexit.register(_exit_function)
     363  
     364  #
     365  # Some fork aware types
     366  #
     367  
     368  class ESC[4;38;5;81mForkAwareThreadLock(ESC[4;38;5;149mobject):
     369      def __init__(self):
     370          self._lock = threading.Lock()
     371          self.acquire = self._lock.acquire
     372          self.release = self._lock.release
     373          register_after_fork(self, ForkAwareThreadLock._at_fork_reinit)
     374  
     375      def _at_fork_reinit(self):
     376          self._lock._at_fork_reinit()
     377  
     378      def __enter__(self):
     379          return self._lock.__enter__()
     380  
     381      def __exit__(self, *args):
     382          return self._lock.__exit__(*args)
     383  
     384  
     385  class ESC[4;38;5;81mForkAwareLocal(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mlocal):
     386      def __init__(self):
     387          register_after_fork(self, lambda obj : obj.__dict__.clear())
     388      def __reduce__(self):
     389          return type(self), ()
     390  
     391  #
     392  # Close fds except those specified
     393  #
     394  
# Upper bound used by close_all_fds_except() for the last range of
# descriptors it closes.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    # Any failure to query the limit falls back to a fixed default.
    MAXFD = 256
     399  
     400  def close_all_fds_except(fds):
     401      fds = list(fds) + [-1, MAXFD]
     402      fds.sort()
     403      assert fds[-1] == MAXFD, 'fd too large'
     404      for i in range(len(fds) - 1):
     405          os.closerange(fds[i]+1, fds[i+1])
     406  #
     407  # Close sys.stdin and replace stdin with os.devnull
     408  #
     409  
     410  def _close_stdin():
     411      if sys.stdin is None:
     412          return
     413  
     414      try:
     415          sys.stdin.close()
     416      except (OSError, ValueError):
     417          pass
     418  
     419      try:
     420          fd = os.open(os.devnull, os.O_RDONLY)
     421          try:
     422              sys.stdin = open(fd, encoding="utf-8", closefd=False)
     423          except:
     424              os.close(fd)
     425              raise
     426      except (OSError, ValueError):
     427          pass
     428  
     429  #
     430  # Flush standard streams, if any
     431  #
     432  
     433  def _flush_std_streams():
     434      try:
     435          sys.stdout.flush()
     436      except (AttributeError, ValueError):
     437          pass
     438      try:
     439          sys.stderr.flush()
     440      except (AttributeError, ValueError):
     441          pass
     442  
     443  #
     444  # Start a program with only specified fds kept open
     445  #
     446  
def spawnv_passfds(path, args, passfds):
    '''
    Start the program at path with argument list args through
    _posixsubprocess.fork_exec(), handing it passfds as the descriptors
    to keep.

    passfds may be any iterable of int-convertible values; it is converted
    to a sorted tuple of ints.  Returns whatever fork_exec() returns
    (NOTE(review): presumably the child's pid -- confirm against
    _posixsubprocess).
    '''
    import _posixsubprocess
    import subprocess
    passfds = tuple(sorted(map(int, passfds)))
    # Pipe handed to fork_exec() as its error pipe; both ends are closed
    # again in this process as soon as the call returns or raises.
    errpipe_read, errpipe_write = os.pipe()
    try:
        # NOTE(review): the arguments are purely positional and must match
        # the private fork_exec() signature of this Python version; the
        # ``True`` / ``passfds`` pair appears to be close_fds / pass_fds --
        # confirm before changing anything here.
        return _posixsubprocess.fork_exec(
            args, [path], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, -1, None, None, None, -1, None,
            subprocess._USE_VFORK)
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)
     461  
     462  
     463  def close_fds(*fds):
     464      """Close each file descriptor given as an argument"""
     465      for fd in fds:
     466          os.close(fd)
     467  
     468  
def _cleanup_tests():
    """Cleanup multiprocessing resources when multiprocessing tests
    completed.

    Runs, in order: process._cleanup(), the forkserver and resource
    tracker _stop() methods, every registered finalizer, a garbage
    collection, and finally support.reap_children().  Requires the
    ``test`` package to be importable.
    """

    from test import support

    # cleanup multiprocessing
    process._cleanup()

    # Stop the ForkServer process if it's running
    from multiprocessing import forkserver
    forkserver._forkserver._stop()

    # Stop the ResourceTracker process if it's running
    from multiprocessing import resource_tracker
    resource_tracker._resource_tracker._stop()

    # bpo-37421: Explicitly call _run_finalizers() to remove immediately
    # temporary directories created by multiprocessing.util.get_temp_dir().
    _run_finalizers()
    support.gc_collect()

    support.reap_children()