(root)/
Python-3.11.7/
Lib/
multiprocessing/
forkserver.py
       1  import errno
       2  import os
       3  import selectors
       4  import signal
       5  import socket
       6  import struct
       7  import sys
       8  import threading
       9  import warnings
      10  
      11  from . import connection
      12  from . import process
      13  from .context import reduction
      14  from . import resource_tracker
      15  from . import spawn
      16  from . import util
      17  
# Public names of this module; each one is bound at the bottom of the file to
# a method of the module-level ForkServer instance.
__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
           'set_forkserver_preload']

#
# Constants shared by the client side (ForkServer) and the server side (main)
#

# Limit on the number of fds in one fork request: connect_to_new_process()
# refuses a request once len(fds) + 4 reaches this value, and main() rejects
# any request that delivers more than this many fds.
MAXFDS_TO_SEND = 256
# Fixed-size encoding used by read_signed()/write_signed(); main() uses it to
# report child pids and exit codes.
SIGNED_STRUCT = struct.Struct('q')     # large enough for pid_t
      27  
      28  #
      29  # Forkserver class
      30  #
      31  
      32  class ESC[4;38;5;81mForkServer(ESC[4;38;5;149mobject):
      33  
      34      def __init__(self):
      35          self._forkserver_address = None
      36          self._forkserver_alive_fd = None
      37          self._forkserver_pid = None
      38          self._inherited_fds = None
      39          self._lock = threading.Lock()
      40          self._preload_modules = ['__main__']
      41  
      42      def _stop(self):
      43          # Method used by unit tests to stop the server
      44          with self._lock:
      45              self._stop_unlocked()
      46  
      47      def _stop_unlocked(self):
      48          if self._forkserver_pid is None:
      49              return
      50  
      51          # close the "alive" file descriptor asks the server to stop
      52          os.close(self._forkserver_alive_fd)
      53          self._forkserver_alive_fd = None
      54  
      55          os.waitpid(self._forkserver_pid, 0)
      56          self._forkserver_pid = None
      57  
      58          if not util.is_abstract_socket_namespace(self._forkserver_address):
      59              os.unlink(self._forkserver_address)
      60          self._forkserver_address = None
      61  
      62      def set_forkserver_preload(self, modules_names):
      63          '''Set list of module names to try to load in forkserver process.'''
      64          if not all(type(mod) is str for mod in modules_names):
      65              raise TypeError('module_names must be a list of strings')
      66          self._preload_modules = modules_names
      67  
      68      def get_inherited_fds(self):
      69          '''Return list of fds inherited from parent process.
      70  
      71          This returns None if the current process was not started by fork
      72          server.
      73          '''
      74          return self._inherited_fds
      75  
      76      def connect_to_new_process(self, fds):
      77          '''Request forkserver to create a child process.
      78  
      79          Returns a pair of fds (status_r, data_w).  The calling process can read
      80          the child process's pid and (eventually) its returncode from status_r.
      81          The calling process should write to data_w the pickled preparation and
      82          process data.
      83          '''
      84          self.ensure_running()
      85          if len(fds) + 4 >= MAXFDS_TO_SEND:
      86              raise ValueError('too many fds')
      87          with socket.socket(socket.AF_UNIX) as client:
      88              client.connect(self._forkserver_address)
      89              parent_r, child_w = os.pipe()
      90              child_r, parent_w = os.pipe()
      91              allfds = [child_r, child_w, self._forkserver_alive_fd,
      92                        resource_tracker.getfd()]
      93              allfds += fds
      94              try:
      95                  reduction.sendfds(client, allfds)
      96                  return parent_r, parent_w
      97              except:
      98                  os.close(parent_r)
      99                  os.close(parent_w)
     100                  raise
     101              finally:
     102                  os.close(child_r)
     103                  os.close(child_w)
     104  
     105      def ensure_running(self):
     106          '''Make sure that a fork server is running.
     107  
     108          This can be called from any process.  Note that usually a child
     109          process will just reuse the forkserver started by its parent, so
     110          ensure_running() will do nothing.
     111          '''
     112          with self._lock:
     113              resource_tracker.ensure_running()
     114              if self._forkserver_pid is not None:
     115                  # forkserver was launched before, is it still running?
     116                  pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
     117                  if not pid:
     118                      # still alive
     119                      return
     120                  # dead, launch it again
     121                  os.close(self._forkserver_alive_fd)
     122                  self._forkserver_address = None
     123                  self._forkserver_alive_fd = None
     124                  self._forkserver_pid = None
     125  
     126              cmd = ('from multiprocessing.forkserver import main; ' +
     127                     'main(%d, %d, %r, **%r)')
     128  
     129              if self._preload_modules:
     130                  desired_keys = {'main_path', 'sys_path'}
     131                  data = spawn.get_preparation_data('ignore')
     132                  data = {x: y for x, y in data.items() if x in desired_keys}
     133              else:
     134                  data = {}
     135  
     136              with socket.socket(socket.AF_UNIX) as listener:
     137                  address = connection.arbitrary_address('AF_UNIX')
     138                  listener.bind(address)
     139                  if not util.is_abstract_socket_namespace(address):
     140                      os.chmod(address, 0o600)
     141                  listener.listen()
     142  
     143                  # all client processes own the write end of the "alive" pipe;
     144                  # when they all terminate the read end becomes ready.
     145                  alive_r, alive_w = os.pipe()
     146                  try:
     147                      fds_to_pass = [listener.fileno(), alive_r]
     148                      cmd %= (listener.fileno(), alive_r, self._preload_modules,
     149                              data)
     150                      exe = spawn.get_executable()
     151                      args = [exe] + util._args_from_interpreter_flags()
     152                      args += ['-c', cmd]
     153                      pid = util.spawnv_passfds(exe, args, fds_to_pass)
     154                  except:
     155                      os.close(alive_w)
     156                      raise
     157                  finally:
     158                      os.close(alive_r)
     159                  self._forkserver_address = address
     160                  self._forkserver_alive_fd = alive_w
     161                  self._forkserver_pid = pid
     162  
     163  #
     164  #
     165  #
     166  
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
    '''Run forkserver.

    listener_fd is the file descriptor of an AF_UNIX socket on which fork
    requests are accepted.  alive_r is the read end of the "alive" pipe;
    once it becomes readable the server exits by raising SystemExit.
    preload is a collection of module names to import before serving;
    modules that fail with ImportError are skipped.  main_path, when given
    and '__main__' is in preload, is passed to spawn.import_main_path().
    sys_path is accepted but not referenced in this function.

    The function loops until SystemExit or an unhandled exception.  In each
    forked child it never returns: the child ends through os._exit().
    '''
    if preload:
        if '__main__' in preload and main_path is not None:
            # The _inheriting attribute exists only while the main module
            # is being imported; it is deleted again even if the import fails.
            process.current_process()._inheriting = True
            try:
                spawn.import_main_path(main_path)
            finally:
                del process.current_process()._inheriting
        for modname in preload:
            # Preloading is best effort: a module that cannot be imported is
            # silently skipped.
            try:
                __import__(modname)
            except ImportError:
                pass

    util._close_stdin()

    # Pipe used as the signal wakeup fd; both ends are non-blocking.
    sig_r, sig_w = os.pipe()
    os.set_blocking(sig_r, False)
    os.set_blocking(sig_w, False)

    def sigchld_handler(*_unused):
        # Dummy signal handler, doesn't do anything
        pass

    handlers = {
        # unblocking SIGCHLD allows the wakeup fd to notify our event loop
        signal.SIGCHLD: sigchld_handler,
        # protect the process from ^C
        signal.SIGINT: signal.SIG_IGN,
        }
    # Install the handlers and remember the previous ones, which are handed
    # to _serve_one() in every forked child.
    old_handlers = {sig: signal.signal(sig, val)
                    for (sig, val) in handlers.items()}

    # calling os.write() in the Python signal handler is racy
    signal.set_wakeup_fd(sig_w)

    # map child pids to client fds
    pid_to_fd = {}

    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
         selectors.DefaultSelector() as selector:
        # Record the listening address on the module-level ForkServer object.
        _forkserver._forkserver_address = listener.getsockname()

        # Three event sources: new connections, the alive pipe and the
        # signal wakeup pipe.
        selector.register(listener, selectors.EVENT_READ)
        selector.register(alive_r, selectors.EVENT_READ)
        selector.register(sig_r, selectors.EVENT_READ)

        while True:
            try:
                # Wait until the selector reports at least one ready object.
                while True:
                    rfds = [key.fileobj for (key, events) in selector.select()]
                    if rfds:
                        break

                if alive_r in rfds:
                    # EOF because no more client processes left
                    assert os.read(alive_r, 1) == b'', "Not at EOF?"
                    raise SystemExit

                if sig_r in rfds:
                    # Got SIGCHLD
                    os.read(sig_r, 65536)  # exhaust
                    while True:
                        # Scan for child processes
                        try:
                            pid, sts = os.waitpid(-1, os.WNOHANG)
                        except ChildProcessError:
                            # No child processes exist at all.
                            break
                        if pid == 0:
                            # Children exist but none has a status to report.
                            break
                        child_w = pid_to_fd.pop(pid, None)
                        if child_w is not None:
                            returncode = os.waitstatus_to_exitcode(sts)

                            # Send exit code to client process
                            try:
                                write_signed(child_w, returncode)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            os.close(child_w)
                        else:
                            # This shouldn't happen really
                            warnings.warn('forkserver: waitpid returned '
                                          'unexpected pid %d' % pid)

                if listener in rfds:
                    # Incoming fork request
                    with listener.accept()[0] as s:
                        # Receive fds from client
                        # One more than the limit is requested so that an
                        # oversized request can be detected below.
                        fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
                        if len(fds) > MAXFDS_TO_SEND:
                            raise RuntimeError(
                                "Too many ({0:n}) fds to send".format(
                                    len(fds)))
                        # The first two fds are the child's pipe ends; the
                        # remainder is handed to _serve_one() unchanged.
                        child_r, child_w, *fds = fds
                        # Closed explicitly before forking.
                        # NOTE(review): presumably so the child does not
                        # inherit the connection -- confirm.
                        s.close()
                        pid = os.fork()
                        if pid == 0:
                            # Child
                            # Exit status used if _serve_one() raises.
                            code = 1
                            try:
                                listener.close()
                                selector.close()
                                # Server-side fds that _serve_one() closes in
                                # the child, including the status pipes of
                                # all other children.
                                unused_fds = [alive_r, child_w, sig_r, sig_w]
                                unused_fds.extend(pid_to_fd.values())
                                code = _serve_one(child_r, fds,
                                                  unused_fds,
                                                  old_handlers)
                            except Exception:
                                sys.excepthook(*sys.exc_info())
                                sys.stderr.flush()
                            finally:
                                # Always leave through os._exit() so the
                                # child never falls back into the server loop.
                                os._exit(code)
                        else:
                            # Send pid to client process
                            try:
                                write_signed(child_w, pid)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            # Keep child_w so the exit code can be written to
                            # it when the child is reaped.
                            pid_to_fd[pid] = child_w
                            # The remaining received fds are not used by the
                            # server itself.
                            os.close(child_r)
                            for fd in fds:
                                os.close(fd)

            except OSError as e:
                # ECONNABORTED is ignored and the loop continues; any other
                # OSError terminates the server.
                if e.errno != errno.ECONNABORTED:
                    raise
     297  
     298  
     299  def _serve_one(child_r, fds, unused_fds, handlers):
     300      # close unnecessary stuff and reset signal handlers
     301      signal.set_wakeup_fd(-1)
     302      for sig, val in handlers.items():
     303          signal.signal(sig, val)
     304      for fd in unused_fds:
     305          os.close(fd)
     306  
     307      (_forkserver._forkserver_alive_fd,
     308       resource_tracker._resource_tracker._fd,
     309       *_forkserver._inherited_fds) = fds
     310  
     311      # Run process object received over pipe
     312      parent_sentinel = os.dup(child_r)
     313      code = spawn._main(child_r, parent_sentinel)
     314  
     315      return code
     316  
     317  
     318  #
     319  # Read and write signed numbers
     320  #
     321  
     322  def read_signed(fd):
     323      data = b''
     324      length = SIGNED_STRUCT.size
     325      while len(data) < length:
     326          s = os.read(fd, length - len(data))
     327          if not s:
     328              raise EOFError('unexpected EOF')
     329          data += s
     330      return SIGNED_STRUCT.unpack(data)[0]
     331  
     332  def write_signed(fd, n):
     333      msg = SIGNED_STRUCT.pack(n)
     334      while msg:
     335          nbytes = os.write(fd, msg)
     336          if nbytes == 0:
     337              raise RuntimeError('should not get here')
     338          msg = msg[nbytes:]
     339  
     340  #
     341  #
     342  #
     343  
# Module-level ForkServer instance.  The public functions listed in __all__
# are its bound methods, so they all operate on this one object's state.
_forkserver = ForkServer()
ensure_running = _forkserver.ensure_running
get_inherited_fds = _forkserver.get_inherited_fds
connect_to_new_process = _forkserver.connect_to_new_process
set_forkserver_preload = _forkserver.set_forkserver_preload