python (3.11.7)

(root)/
lib/
python3.11/
multiprocessing/
resource_sharer.py
       1  #
       2  # We use a background thread for sharing fds on Unix, and for sharing sockets on
       3  # Windows.
       4  #
       5  # A client which wants to pickle a resource registers it with the resource
       6  # sharer and gets an identifier in return.  The unpickling process connects
       7  # to the resource sharer, sends the identifier and its pid, and then receives
       8  # the resource.
       9  #
      10  
      11  import os
      12  import signal
      13  import socket
      14  import sys
      15  import threading
      16  
      17  from . import process
      18  from .context import reduction
      19  from . import util
      20  
      21  __all__ = ['stop']
      22  
      23  
      24  if sys.platform == 'win32':
      25      __all__ += ['DupSocket']
      26  
      27      class ESC[4;38;5;81mDupSocket(ESC[4;38;5;149mobject):
      28          '''Picklable wrapper for a socket.'''
      29          def __init__(self, sock):
      30              new_sock = sock.dup()
      31              def send(conn, pid):
      32                  share = new_sock.share(pid)
      33                  conn.send_bytes(share)
      34              self._id = _resource_sharer.register(send, new_sock.close)
      35  
      36          def detach(self):
      37              '''Get the socket.  This should only be called once.'''
      38              with _resource_sharer.get_connection(self._id) as conn:
      39                  share = conn.recv_bytes()
      40                  return socket.fromshare(share)
      41  
      42  else:
      43      __all__ += ['DupFd']
      44  
      45      class ESC[4;38;5;81mDupFd(ESC[4;38;5;149mobject):
      46          '''Wrapper for fd which can be used at any time.'''
      47          def __init__(self, fd):
      48              new_fd = os.dup(fd)
      49              def send(conn, pid):
      50                  reduction.send_handle(conn, new_fd, pid)
      51              def close():
      52                  os.close(new_fd)
      53              self._id = _resource_sharer.register(send, close)
      54  
      55          def detach(self):
      56              '''Get the fd.  This should only be called once.'''
      57              with _resource_sharer.get_connection(self._id) as conn:
      58                  return reduction.recv_handle(conn)
      59  
      60  
      61  class ESC[4;38;5;81m_ResourceSharer(ESC[4;38;5;149mobject):
      62      '''Manager for resources using background thread.'''
      63      def __init__(self):
      64          self._key = 0
      65          self._cache = {}
      66          self._lock = threading.Lock()
      67          self._listener = None
      68          self._address = None
      69          self._thread = None
      70          util.register_after_fork(self, _ResourceSharer._afterfork)
      71  
      72      def register(self, send, close):
      73          '''Register resource, returning an identifier.'''
      74          with self._lock:
      75              if self._address is None:
      76                  self._start()
      77              self._key += 1
      78              self._cache[self._key] = (send, close)
      79              return (self._address, self._key)
      80  
      81      @staticmethod
      82      def get_connection(ident):
      83          '''Return connection from which to receive identified resource.'''
      84          from .connection import Client
      85          address, key = ident
      86          c = Client(address, authkey=process.current_process().authkey)
      87          c.send((key, os.getpid()))
      88          return c
      89  
      90      def stop(self, timeout=None):
      91          '''Stop the background thread and clear registered resources.'''
      92          from .connection import Client
      93          with self._lock:
      94              if self._address is not None:
      95                  c = Client(self._address,
      96                             authkey=process.current_process().authkey)
      97                  c.send(None)
      98                  c.close()
      99                  self._thread.join(timeout)
     100                  if self._thread.is_alive():
     101                      util.sub_warning('_ResourceSharer thread did '
     102                                       'not stop when asked')
     103                  self._listener.close()
     104                  self._thread = None
     105                  self._address = None
     106                  self._listener = None
     107                  for key, (send, close) in self._cache.items():
     108                      close()
     109                  self._cache.clear()
     110  
     111      def _afterfork(self):
     112          for key, (send, close) in self._cache.items():
     113              close()
     114          self._cache.clear()
     115          self._lock._at_fork_reinit()
     116          if self._listener is not None:
     117              self._listener.close()
     118          self._listener = None
     119          self._address = None
     120          self._thread = None
     121  
     122      def _start(self):
     123          from .connection import Listener
     124          assert self._listener is None, "Already have Listener"
     125          util.debug('starting listener and thread for sending handles')
     126          self._listener = Listener(authkey=process.current_process().authkey)
     127          self._address = self._listener.address
     128          t = threading.Thread(target=self._serve)
     129          t.daemon = True
     130          t.start()
     131          self._thread = t
     132  
     133      def _serve(self):
     134          if hasattr(signal, 'pthread_sigmask'):
     135              signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
     136          while 1:
     137              try:
     138                  with self._listener.accept() as conn:
     139                      msg = conn.recv()
     140                      if msg is None:
     141                          break
     142                      key, destination_pid = msg
     143                      send, close = self._cache.pop(key)
     144                      try:
     145                          send(conn, destination_pid)
     146                      finally:
     147                          close()
     148              except:
     149                  if not util.is_exiting():
     150                      sys.excepthook(*sys.exc_info())
     151  
     152  
     153  _resource_sharer = _ResourceSharer()
     154  stop = _resource_sharer.stop