1  """Selector event loop for Unix with signal handling."""
       2  
       3  import errno
       4  import io
       5  import itertools
       6  import os
       7  import selectors
       8  import signal
       9  import socket
      10  import stat
      11  import subprocess
      12  import sys
      13  import threading
      14  import warnings
      15  
      16  from . import base_events
      17  from . import base_subprocess
      18  from . import constants
      19  from . import coroutines
      20  from . import events
      21  from . import exceptions
      22  from . import futures
      23  from . import selector_events
      24  from . import tasks
      25  from . import transports
      26  from .log import logger
      27  
      28  
# Names exported by a star-import of this module.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher', 'PidfdChildWatcher',
    'MultiLoopChildWatcher', 'ThreadedChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to be imported on Windows: everything below builds on signal
# support that, per the error message, is not usable there.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
      40  
      41  
      42  def _sighandler_noop(signum, frame):
      43      """Dummy signal handler."""
      44      pass
      45  
      46  
      47  def waitstatus_to_exitcode(status):
      48      try:
      49          return os.waitstatus_to_exitcode(status)
      50      except ValueError:
      51          # The child exited, but we don't understand its status.
      52          # This shouldn't happen, but if it does, let's just
      53          # return that status; perhaps that helps debug it.
      54          return status
      55  
      56  
      57  class ESC[4;38;5;81m_UnixSelectorEventLoop(ESC[4;38;5;149mselector_eventsESC[4;38;5;149m.ESC[4;38;5;149mBaseSelectorEventLoop):
      58      """Unix event loop.
      59  
      60      Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
      61      """
      62  
      63      def __init__(self, selector=None):
      64          super().__init__(selector)
      65          self._signal_handlers = {}
      66  
      67      def close(self):
      68          super().close()
      69          if not sys.is_finalizing():
      70              for sig in list(self._signal_handlers):
      71                  self.remove_signal_handler(sig)
      72          else:
      73              if self._signal_handlers:
      74                  warnings.warn(f"Closing the loop {self!r} "
      75                                f"on interpreter shutdown "
      76                                f"stage, skipping signal handlers removal",
      77                                ResourceWarning,
      78                                source=self)
      79                  self._signal_handlers.clear()
      80  
      81      def _process_self_data(self, data):
      82          for signum in data:
      83              if not signum:
      84                  # ignore null bytes written by _write_to_self()
      85                  continue
      86              self._handle_signal(signum)
      87  
      88      def add_signal_handler(self, sig, callback, *args):
      89          """Add a handler for a signal.  UNIX only.
      90  
      91          Raise ValueError if the signal number is invalid or uncatchable.
      92          Raise RuntimeError if there is a problem setting up the handler.
      93          """
      94          if (coroutines.iscoroutine(callback) or
      95                  coroutines.iscoroutinefunction(callback)):
      96              raise TypeError("coroutines cannot be used "
      97                              "with add_signal_handler()")
      98          self._check_signal(sig)
      99          self._check_closed()
     100          try:
     101              # set_wakeup_fd() raises ValueError if this is not the
     102              # main thread.  By calling it early we ensure that an
     103              # event loop running in another thread cannot add a signal
     104              # handler.
     105              signal.set_wakeup_fd(self._csock.fileno())
     106          except (ValueError, OSError) as exc:
     107              raise RuntimeError(str(exc))
     108  
     109          handle = events.Handle(callback, args, self, None)
     110          self._signal_handlers[sig] = handle
     111  
     112          try:
     113              # Register a dummy signal handler to ask Python to write the signal
     114              # number in the wakeup file descriptor. _process_self_data() will
     115              # read signal numbers from this file descriptor to handle signals.
     116              signal.signal(sig, _sighandler_noop)
     117  
     118              # Set SA_RESTART to limit EINTR occurrences.
     119              signal.siginterrupt(sig, False)
     120          except OSError as exc:
     121              del self._signal_handlers[sig]
     122              if not self._signal_handlers:
     123                  try:
     124                      signal.set_wakeup_fd(-1)
     125                  except (ValueError, OSError) as nexc:
     126                      logger.info('set_wakeup_fd(-1) failed: %s', nexc)
     127  
     128              if exc.errno == errno.EINVAL:
     129                  raise RuntimeError(f'sig {sig} cannot be caught')
     130              else:
     131                  raise
     132  
     133      def _handle_signal(self, sig):
     134          """Internal helper that is the actual signal handler."""
     135          handle = self._signal_handlers.get(sig)
     136          if handle is None:
     137              return  # Assume it's some race condition.
     138          if handle._cancelled:
     139              self.remove_signal_handler(sig)  # Remove it properly.
     140          else:
     141              self._add_callback_signalsafe(handle)
     142  
     143      def remove_signal_handler(self, sig):
     144          """Remove a handler for a signal.  UNIX only.
     145  
     146          Return True if a signal handler was removed, False if not.
     147          """
     148          self._check_signal(sig)
     149          try:
     150              del self._signal_handlers[sig]
     151          except KeyError:
     152              return False
     153  
     154          if sig == signal.SIGINT:
     155              handler = signal.default_int_handler
     156          else:
     157              handler = signal.SIG_DFL
     158  
     159          try:
     160              signal.signal(sig, handler)
     161          except OSError as exc:
     162              if exc.errno == errno.EINVAL:
     163                  raise RuntimeError(f'sig {sig} cannot be caught')
     164              else:
     165                  raise
     166  
     167          if not self._signal_handlers:
     168              try:
     169                  signal.set_wakeup_fd(-1)
     170              except (ValueError, OSError) as exc:
     171                  logger.info('set_wakeup_fd(-1) failed: %s', exc)
     172  
     173          return True
     174  
     175      def _check_signal(self, sig):
     176          """Internal helper to validate a signal.
     177  
     178          Raise ValueError if the signal number is invalid or uncatchable.
     179          Raise RuntimeError if there is a problem setting up the handler.
     180          """
     181          if not isinstance(sig, int):
     182              raise TypeError(f'sig must be an int, not {sig!r}')
     183  
     184          if sig not in signal.valid_signals():
     185              raise ValueError(f'invalid signal number {sig}')
     186  
     187      def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
     188                                    extra=None):
     189          return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
     190  
     191      def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
     192                                     extra=None):
     193          return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
     194  
     195      async def _make_subprocess_transport(self, protocol, args, shell,
     196                                           stdin, stdout, stderr, bufsize,
     197                                           extra=None, **kwargs):
     198          with warnings.catch_warnings():
     199              warnings.simplefilter('ignore', DeprecationWarning)
     200              watcher = events.get_child_watcher()
     201  
     202          with watcher:
     203              if not watcher.is_active():
     204                  # Check early.
     205                  # Raising exception before process creation
     206                  # prevents subprocess execution if the watcher
     207                  # is not ready to handle it.
     208                  raise RuntimeError("asyncio.get_child_watcher() is not activated, "
     209                                  "subprocess support is not installed.")
     210              waiter = self.create_future()
     211              transp = _UnixSubprocessTransport(self, protocol, args, shell,
     212                                              stdin, stdout, stderr, bufsize,
     213                                              waiter=waiter, extra=extra,
     214                                              **kwargs)
     215              watcher.add_child_handler(transp.get_pid(),
     216                                      self._child_watcher_callback, transp)
     217              try:
     218                  await waiter
     219              except (SystemExit, KeyboardInterrupt):
     220                  raise
     221              except BaseException:
     222                  transp.close()
     223                  await transp._wait()
     224                  raise
     225  
     226          return transp
     227  
     228      def _child_watcher_callback(self, pid, returncode, transp):
     229          # Skip one iteration for callbacks to be executed
     230          self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)
     231  
     232      async def create_unix_connection(
     233              self, protocol_factory, path=None, *,
     234              ssl=None, sock=None,
     235              server_hostname=None,
     236              ssl_handshake_timeout=None,
     237              ssl_shutdown_timeout=None):
     238          assert server_hostname is None or isinstance(server_hostname, str)
     239          if ssl:
     240              if server_hostname is None:
     241                  raise ValueError(
     242                      'you have to pass server_hostname when using ssl')
     243          else:
     244              if server_hostname is not None:
     245                  raise ValueError('server_hostname is only meaningful with ssl')
     246              if ssl_handshake_timeout is not None:
     247                  raise ValueError(
     248                      'ssl_handshake_timeout is only meaningful with ssl')
     249              if ssl_shutdown_timeout is not None:
     250                  raise ValueError(
     251                      'ssl_shutdown_timeout is only meaningful with ssl')
     252  
     253          if path is not None:
     254              if sock is not None:
     255                  raise ValueError(
     256                      'path and sock can not be specified at the same time')
     257  
     258              path = os.fspath(path)
     259              sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
     260              try:
     261                  sock.setblocking(False)
     262                  await self.sock_connect(sock, path)
     263              except:
     264                  sock.close()
     265                  raise
     266  
     267          else:
     268              if sock is None:
     269                  raise ValueError('no path and sock were specified')
     270              if (sock.family != socket.AF_UNIX or
     271                      sock.type != socket.SOCK_STREAM):
     272                  raise ValueError(
     273                      f'A UNIX Domain Stream Socket was expected, got {sock!r}')
     274              sock.setblocking(False)
     275  
     276          transport, protocol = await self._create_connection_transport(
     277              sock, protocol_factory, ssl, server_hostname,
     278              ssl_handshake_timeout=ssl_handshake_timeout,
     279              ssl_shutdown_timeout=ssl_shutdown_timeout)
     280          return transport, protocol
     281  
     282      async def create_unix_server(
     283              self, protocol_factory, path=None, *,
     284              sock=None, backlog=100, ssl=None,
     285              ssl_handshake_timeout=None,
     286              ssl_shutdown_timeout=None,
     287              start_serving=True):
     288          if isinstance(ssl, bool):
     289              raise TypeError('ssl argument must be an SSLContext or None')
     290  
     291          if ssl_handshake_timeout is not None and not ssl:
     292              raise ValueError(
     293                  'ssl_handshake_timeout is only meaningful with ssl')
     294  
     295          if ssl_shutdown_timeout is not None and not ssl:
     296              raise ValueError(
     297                  'ssl_shutdown_timeout is only meaningful with ssl')
     298  
     299          if path is not None:
     300              if sock is not None:
     301                  raise ValueError(
     302                      'path and sock can not be specified at the same time')
     303  
     304              path = os.fspath(path)
     305              sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
     306  
     307              # Check for abstract socket. `str` and `bytes` paths are supported.
     308              if path[0] not in (0, '\x00'):
     309                  try:
     310                      if stat.S_ISSOCK(os.stat(path).st_mode):
     311                          os.remove(path)
     312                  except FileNotFoundError:
     313                      pass
     314                  except OSError as err:
     315                      # Directory may have permissions only to create socket.
     316                      logger.error('Unable to check or remove stale UNIX socket '
     317                                   '%r: %r', path, err)
     318  
     319              try:
     320                  sock.bind(path)
     321              except OSError as exc:
     322                  sock.close()
     323                  if exc.errno == errno.EADDRINUSE:
     324                      # Let's improve the error message by adding
     325                      # with what exact address it occurs.
     326                      msg = f'Address {path!r} is already in use'
     327                      raise OSError(errno.EADDRINUSE, msg) from None
     328                  else:
     329                      raise
     330              except:
     331                  sock.close()
     332                  raise
     333          else:
     334              if sock is None:
     335                  raise ValueError(
     336                      'path was not specified, and no sock specified')
     337  
     338              if (sock.family != socket.AF_UNIX or
     339                      sock.type != socket.SOCK_STREAM):
     340                  raise ValueError(
     341                      f'A UNIX Domain Stream Socket was expected, got {sock!r}')
     342  
     343          sock.setblocking(False)
     344          server = base_events.Server(self, [sock], protocol_factory,
     345                                      ssl, backlog, ssl_handshake_timeout,
     346                                      ssl_shutdown_timeout)
     347          if start_serving:
     348              server._start_serving()
     349              # Skip one loop iteration so that all 'loop.add_reader'
     350              # go through.
     351              await tasks.sleep(0)
     352  
     353          return server
     354  
     355      async def _sock_sendfile_native(self, sock, file, offset, count):
     356          try:
     357              os.sendfile
     358          except AttributeError:
     359              raise exceptions.SendfileNotAvailableError(
     360                  "os.sendfile() is not available")
     361          try:
     362              fileno = file.fileno()
     363          except (AttributeError, io.UnsupportedOperation) as err:
     364              raise exceptions.SendfileNotAvailableError("not a regular file")
     365          try:
     366              fsize = os.fstat(fileno).st_size
     367          except OSError:
     368              raise exceptions.SendfileNotAvailableError("not a regular file")
     369          blocksize = count if count else fsize
     370          if not blocksize:
     371              return 0  # empty file
     372  
     373          fut = self.create_future()
     374          self._sock_sendfile_native_impl(fut, None, sock, fileno,
     375                                          offset, count, blocksize, 0)
     376          return await fut
     377  
     378      def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
     379                                     offset, count, blocksize, total_sent):
     380          fd = sock.fileno()
     381          if registered_fd is not None:
     382              # Remove the callback early.  It should be rare that the
     383              # selector says the fd is ready but the call still returns
     384              # EAGAIN, and I am willing to take a hit in that case in
     385              # order to simplify the common case.
     386              self.remove_writer(registered_fd)
     387          if fut.cancelled():
     388              self._sock_sendfile_update_filepos(fileno, offset, total_sent)
     389              return
     390          if count:
     391              blocksize = count - total_sent
     392              if blocksize <= 0:
     393                  self._sock_sendfile_update_filepos(fileno, offset, total_sent)
     394                  fut.set_result(total_sent)
     395                  return
     396  
     397          try:
     398              sent = os.sendfile(fd, fileno, offset, blocksize)
     399          except (BlockingIOError, InterruptedError):
     400              if registered_fd is None:
     401                  self._sock_add_cancellation_callback(fut, sock)
     402              self.add_writer(fd, self._sock_sendfile_native_impl, fut,
     403                              fd, sock, fileno,
     404                              offset, count, blocksize, total_sent)
     405          except OSError as exc:
     406              if (registered_fd is not None and
     407                      exc.errno == errno.ENOTCONN and
     408                      type(exc) is not ConnectionError):
     409                  # If we have an ENOTCONN and this isn't a first call to
     410                  # sendfile(), i.e. the connection was closed in the middle
     411                  # of the operation, normalize the error to ConnectionError
     412                  # to make it consistent across all Posix systems.
     413                  new_exc = ConnectionError(
     414                      "socket is not connected", errno.ENOTCONN)
     415                  new_exc.__cause__ = exc
     416                  exc = new_exc
     417              if total_sent == 0:
     418                  # We can get here for different reasons, the main
     419                  # one being 'file' is not a regular mmap(2)-like
     420                  # file, in which case we'll fall back on using
     421                  # plain send().
     422                  err = exceptions.SendfileNotAvailableError(
     423                      "os.sendfile call failed")
     424                  self._sock_sendfile_update_filepos(fileno, offset, total_sent)
     425                  fut.set_exception(err)
     426              else:
     427                  self._sock_sendfile_update_filepos(fileno, offset, total_sent)
     428                  fut.set_exception(exc)
     429          except (SystemExit, KeyboardInterrupt):
     430              raise
     431          except BaseException as exc:
     432              self._sock_sendfile_update_filepos(fileno, offset, total_sent)
     433              fut.set_exception(exc)
     434          else:
     435              if sent == 0:
     436                  # EOF
     437                  self._sock_sendfile_update_filepos(fileno, offset, total_sent)
     438                  fut.set_result(total_sent)
     439              else:
     440                  offset += sent
     441                  total_sent += sent
     442                  if registered_fd is None:
     443                      self._sock_add_cancellation_callback(fut, sock)
     444                  self.add_writer(fd, self._sock_sendfile_native_impl, fut,
     445                                  fd, sock, fileno,
     446                                  offset, count, blocksize, total_sent)
     447  
     448      def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
     449          if total_sent > 0:
     450              os.lseek(fileno, offset, os.SEEK_SET)
     451  
     452      def _sock_add_cancellation_callback(self, fut, sock):
     453          def cb(fut):
     454              if fut.cancelled():
     455                  fd = sock.fileno()
     456                  if fd != -1:
     457                      self.remove_writer(fd)
     458          fut.add_done_callback(cb)
     459  
     460  
     461  class ESC[4;38;5;81m_UnixReadPipeTransport(ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149mReadTransport):
     462  
     463      max_size = 256 * 1024  # max bytes we read in one event loop iteration
     464  
     465      def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
     466          super().__init__(extra)
     467          self._extra['pipe'] = pipe
     468          self._loop = loop
     469          self._pipe = pipe
     470          self._fileno = pipe.fileno()
     471          self._protocol = protocol
     472          self._closing = False
     473          self._paused = False
     474  
     475          mode = os.fstat(self._fileno).st_mode
     476          if not (stat.S_ISFIFO(mode) or
     477                  stat.S_ISSOCK(mode) or
     478                  stat.S_ISCHR(mode)):
     479              self._pipe = None
     480              self._fileno = None
     481              self._protocol = None
     482              raise ValueError("Pipe transport is for pipes/sockets only.")
     483  
     484          os.set_blocking(self._fileno, False)
     485  
     486          self._loop.call_soon(self._protocol.connection_made, self)
     487          # only start reading when connection_made() has been called
     488          self._loop.call_soon(self._add_reader,
     489                               self._fileno, self._read_ready)
     490          if waiter is not None:
     491              # only wake up the waiter when connection_made() has been called
     492              self._loop.call_soon(futures._set_result_unless_cancelled,
     493                                   waiter, None)
     494  
     495      def _add_reader(self, fd, callback):
     496          if not self.is_reading():
     497              return
     498          self._loop._add_reader(fd, callback)
     499  
     500      def is_reading(self):
     501          return not self._paused and not self._closing
     502  
     503      def __repr__(self):
     504          info = [self.__class__.__name__]
     505          if self._pipe is None:
     506              info.append('closed')
     507          elif self._closing:
     508              info.append('closing')
     509          info.append(f'fd={self._fileno}')
     510          selector = getattr(self._loop, '_selector', None)
     511          if self._pipe is not None and selector is not None:
     512              polling = selector_events._test_selector_event(
     513                  selector, self._fileno, selectors.EVENT_READ)
     514              if polling:
     515                  info.append('polling')
     516              else:
     517                  info.append('idle')
     518          elif self._pipe is not None:
     519              info.append('open')
     520          else:
     521              info.append('closed')
     522          return '<{}>'.format(' '.join(info))
     523  
     524      def _read_ready(self):
     525          try:
     526              data = os.read(self._fileno, self.max_size)
     527          except (BlockingIOError, InterruptedError):
     528              pass
     529          except OSError as exc:
     530              self._fatal_error(exc, 'Fatal read error on pipe transport')
     531          else:
     532              if data:
     533                  self._protocol.data_received(data)
     534              else:
     535                  if self._loop.get_debug():
     536                      logger.info("%r was closed by peer", self)
     537                  self._closing = True
     538                  self._loop._remove_reader(self._fileno)
     539                  self._loop.call_soon(self._protocol.eof_received)
     540                  self._loop.call_soon(self._call_connection_lost, None)
     541  
     542      def pause_reading(self):
     543          if not self.is_reading():
     544              return
     545          self._paused = True
     546          self._loop._remove_reader(self._fileno)
     547          if self._loop.get_debug():
     548              logger.debug("%r pauses reading", self)
     549  
     550      def resume_reading(self):
     551          if self._closing or not self._paused:
     552              return
     553          self._paused = False
     554          self._loop._add_reader(self._fileno, self._read_ready)
     555          if self._loop.get_debug():
     556              logger.debug("%r resumes reading", self)
     557  
     558      def set_protocol(self, protocol):
     559          self._protocol = protocol
     560  
     561      def get_protocol(self):
     562          return self._protocol
     563  
     564      def is_closing(self):
     565          return self._closing
     566  
     567      def close(self):
     568          if not self._closing:
     569              self._close(None)
     570  
     571      def __del__(self, _warn=warnings.warn):
     572          if self._pipe is not None:
     573              _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
     574              self._pipe.close()
     575  
     576      def _fatal_error(self, exc, message='Fatal error on pipe transport'):
     577          # should be called by exception handler only
     578          if (isinstance(exc, OSError) and exc.errno == errno.EIO):
     579              if self._loop.get_debug():
     580                  logger.debug("%r: %s", self, message, exc_info=True)
     581          else:
     582              self._loop.call_exception_handler({
     583                  'message': message,
     584                  'exception': exc,
     585                  'transport': self,
     586                  'protocol': self._protocol,
     587              })
     588          self._close(exc)
     589  
     590      def _close(self, exc):
     591          self._closing = True
     592          self._loop._remove_reader(self._fileno)
     593          self._loop.call_soon(self._call_connection_lost, exc)
     594  
     595      def _call_connection_lost(self, exc):
     596          try:
     597              self._protocol.connection_lost(exc)
     598          finally:
     599              self._pipe.close()
     600              self._pipe = None
     601              self._protocol = None
     602              self._loop = None
     603  
     604  
     605  class ESC[4;38;5;81m_UnixWritePipeTransport(ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149m_FlowControlMixin,
     606                                ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149mWriteTransport):
     607  
     608      def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
     609          super().__init__(extra, loop)
     610          self._extra['pipe'] = pipe
     611          self._pipe = pipe
     612          self._fileno = pipe.fileno()
     613          self._protocol = protocol
     614          self._buffer = bytearray()
     615          self._conn_lost = 0
     616          self._closing = False  # Set when close() or write_eof() called.
     617  
     618          mode = os.fstat(self._fileno).st_mode
     619          is_char = stat.S_ISCHR(mode)
     620          is_fifo = stat.S_ISFIFO(mode)
     621          is_socket = stat.S_ISSOCK(mode)
     622          if not (is_char or is_fifo or is_socket):
     623              self._pipe = None
     624              self._fileno = None
     625              self._protocol = None
     626              raise ValueError("Pipe transport is only for "
     627                               "pipes, sockets and character devices")
     628  
     629          os.set_blocking(self._fileno, False)
     630          self._loop.call_soon(self._protocol.connection_made, self)
     631  
     632          # On AIX, the reader trick (to be notified when the read end of the
     633          # socket is closed) only works for sockets. On other platforms it
     634          # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
     635          if is_socket or (is_fifo and not sys.platform.startswith("aix")):
     636              # only start reading when connection_made() has been called
     637              self._loop.call_soon(self._loop._add_reader,
     638                                   self._fileno, self._read_ready)
     639  
     640          if waiter is not None:
     641              # only wake up the waiter when connection_made() has been called
     642              self._loop.call_soon(futures._set_result_unless_cancelled,
     643                                   waiter, None)
     644  
     645      def __repr__(self):
     646          info = [self.__class__.__name__]
     647          if self._pipe is None:
     648              info.append('closed')
     649          elif self._closing:
     650              info.append('closing')
     651          info.append(f'fd={self._fileno}')
     652          selector = getattr(self._loop, '_selector', None)
     653          if self._pipe is not None and selector is not None:
     654              polling = selector_events._test_selector_event(
     655                  selector, self._fileno, selectors.EVENT_WRITE)
     656              if polling:
     657                  info.append('polling')
     658              else:
     659                  info.append('idle')
     660  
     661              bufsize = self.get_write_buffer_size()
     662              info.append(f'bufsize={bufsize}')
     663          elif self._pipe is not None:
     664              info.append('open')
     665          else:
     666              info.append('closed')
     667          return '<{}>'.format(' '.join(info))
     668  
     669      def get_write_buffer_size(self):
     670          return len(self._buffer)
     671  
     672      def _read_ready(self):
     673          # Pipe was closed by peer.
     674          if self._loop.get_debug():
     675              logger.info("%r was closed by peer", self)
     676          if self._buffer:
     677              self._close(BrokenPipeError())
     678          else:
     679              self._close()
     680  
     681      def write(self, data):
     682          assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
     683          if isinstance(data, bytearray):
     684              data = memoryview(data)
     685          if not data:
     686              return
     687  
     688          if self._conn_lost or self._closing:
     689              if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
     690                  logger.warning('pipe closed by peer or '
     691                                 'os.write(pipe, data) raised exception.')
     692              self._conn_lost += 1
     693              return
     694  
     695          if not self._buffer:
     696              # Attempt to send it right away first.
     697              try:
     698                  n = os.write(self._fileno, data)
     699              except (BlockingIOError, InterruptedError):
     700                  n = 0
     701              except (SystemExit, KeyboardInterrupt):
     702                  raise
     703              except BaseException as exc:
     704                  self._conn_lost += 1
     705                  self._fatal_error(exc, 'Fatal write error on pipe transport')
     706                  return
     707              if n == len(data):
     708                  return
     709              elif n > 0:
     710                  data = memoryview(data)[n:]
     711              self._loop._add_writer(self._fileno, self._write_ready)
     712  
     713          self._buffer += data
     714          self._maybe_pause_protocol()
     715  
     716      def _write_ready(self):
     717          assert self._buffer, 'Data should not be empty'
     718  
     719          try:
     720              n = os.write(self._fileno, self._buffer)
     721          except (BlockingIOError, InterruptedError):
     722              pass
     723          except (SystemExit, KeyboardInterrupt):
     724              raise
     725          except BaseException as exc:
     726              self._buffer.clear()
     727              self._conn_lost += 1
     728              # Remove writer here, _fatal_error() doesn't it
     729              # because _buffer is empty.
     730              self._loop._remove_writer(self._fileno)
     731              self._fatal_error(exc, 'Fatal write error on pipe transport')
     732          else:
     733              if n == len(self._buffer):
     734                  self._buffer.clear()
     735                  self._loop._remove_writer(self._fileno)
     736                  self._maybe_resume_protocol()  # May append to buffer.
     737                  if self._closing:
     738                      self._loop._remove_reader(self._fileno)
     739                      self._call_connection_lost(None)
     740                  return
     741              elif n > 0:
     742                  del self._buffer[:n]
     743  
     744      def can_write_eof(self):
     745          return True
     746  
     747      def write_eof(self):
     748          if self._closing:
     749              return
     750          assert self._pipe
     751          self._closing = True
     752          if not self._buffer:
     753              self._loop._remove_reader(self._fileno)
     754              self._loop.call_soon(self._call_connection_lost, None)
     755  
     756      def set_protocol(self, protocol):
     757          self._protocol = protocol
     758  
     759      def get_protocol(self):
     760          return self._protocol
     761  
     762      def is_closing(self):
     763          return self._closing
     764  
     765      def close(self):
     766          if self._pipe is not None and not self._closing:
     767              # write_eof is all what we needed to close the write pipe
     768              self.write_eof()
     769  
     770      def __del__(self, _warn=warnings.warn):
     771          if self._pipe is not None:
     772              _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
     773              self._pipe.close()
     774  
     775      def abort(self):
     776          self._close(None)
     777  
     778      def _fatal_error(self, exc, message='Fatal error on pipe transport'):
     779          # should be called by exception handler only
     780          if isinstance(exc, OSError):
     781              if self._loop.get_debug():
     782                  logger.debug("%r: %s", self, message, exc_info=True)
     783          else:
     784              self._loop.call_exception_handler({
     785                  'message': message,
     786                  'exception': exc,
     787                  'transport': self,
     788                  'protocol': self._protocol,
     789              })
     790          self._close(exc)
     791  
     792      def _close(self, exc=None):
     793          self._closing = True
     794          if self._buffer:
     795              self._loop._remove_writer(self._fileno)
     796          self._buffer.clear()
     797          self._loop._remove_reader(self._fileno)
     798          self._loop.call_soon(self._call_connection_lost, exc)
     799  
     800      def _call_connection_lost(self, exc):
     801          try:
     802              self._protocol.connection_lost(exc)
     803          finally:
     804              self._pipe.close()
     805              self._pipe = None
     806              self._protocol = None
     807              self._loop = None
     808  
     809  
     810  class ESC[4;38;5;81m_UnixSubprocessTransport(ESC[4;38;5;149mbase_subprocessESC[4;38;5;149m.ESC[4;38;5;149mBaseSubprocessTransport):
     811  
     812      def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
     813          stdin_w = None
     814          if stdin == subprocess.PIPE and sys.platform.startswith('aix'):
     815              # Use a socket pair for stdin on AIX, since it does not
     816              # support selecting read events on the write end of a
     817              # socket (which we use in order to detect closing of the
     818              # other end).
     819              stdin, stdin_w = socket.socketpair()
     820          try:
     821              self._proc = subprocess.Popen(
     822                  args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
     823                  universal_newlines=False, bufsize=bufsize, **kwargs)
     824              if stdin_w is not None:
     825                  stdin.close()
     826                  self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
     827                  stdin_w = None
     828          finally:
     829              if stdin_w is not None:
     830                  stdin.close()
     831                  stdin_w.close()
     832  
     833  
     834  class ESC[4;38;5;81mAbstractChildWatcher:
     835      """Abstract base class for monitoring child processes.
     836  
     837      Objects derived from this class monitor a collection of subprocesses and
     838      report their termination or interruption by a signal.
     839  
     840      New callbacks are registered with .add_child_handler(). Starting a new
     841      process must be done within a 'with' block to allow the watcher to suspend
     842      its activity until the new process if fully registered (this is needed to
     843      prevent a race condition in some implementations).
     844  
     845      Example:
     846          with watcher:
     847              proc = subprocess.Popen("sleep 1")
     848              watcher.add_child_handler(proc.pid, callback)
     849  
     850      Notes:
     851          Implementations of this class must be thread-safe.
     852  
     853          Since child watcher objects may catch the SIGCHLD signal and call
     854          waitpid(-1), there should be only one active object per process.
     855      """
     856  
     857      def __init_subclass__(cls) -> None:
     858          if cls.__module__ != __name__:
     859              warnings._deprecated("AbstractChildWatcher",
     860                               "{name!r} is deprecated as of Python 3.12 and will be "
     861                               "removed in Python {remove}.",
     862                                remove=(3, 14))
     863  
     864      def add_child_handler(self, pid, callback, *args):
     865          """Register a new child handler.
     866  
     867          Arrange for callback(pid, returncode, *args) to be called when
     868          process 'pid' terminates. Specifying another callback for the same
     869          process replaces the previous handler.
     870  
     871          Note: callback() must be thread-safe.
     872          """
     873          raise NotImplementedError()
     874  
     875      def remove_child_handler(self, pid):
     876          """Removes the handler for process 'pid'.
     877  
     878          The function returns True if the handler was successfully removed,
     879          False if there was nothing to remove."""
     880  
     881          raise NotImplementedError()
     882  
     883      def attach_loop(self, loop):
     884          """Attach the watcher to an event loop.
     885  
     886          If the watcher was previously attached to an event loop, then it is
     887          first detached before attaching to the new loop.
     888  
     889          Note: loop may be None.
     890          """
     891          raise NotImplementedError()
     892  
     893      def close(self):
     894          """Close the watcher.
     895  
     896          This must be called to make sure that any underlying resource is freed.
     897          """
     898          raise NotImplementedError()
     899  
     900      def is_active(self):
     901          """Return ``True`` if the watcher is active and is used by the event loop.
     902  
     903          Return True if the watcher is installed and ready to handle process exit
     904          notifications.
     905  
     906          """
     907          raise NotImplementedError()
     908  
     909      def __enter__(self):
     910          """Enter the watcher's context and allow starting new processes
     911  
     912          This function must return self"""
     913          raise NotImplementedError()
     914  
     915      def __exit__(self, a, b, c):
     916          """Exit the watcher's context"""
     917          raise NotImplementedError()
     918  
     919  
     920  class ESC[4;38;5;81mPidfdChildWatcher(ESC[4;38;5;149mAbstractChildWatcher):
     921      """Child watcher implementation using Linux's pid file descriptors.
     922  
     923      This child watcher polls process file descriptors (pidfds) to await child
     924      process termination. In some respects, PidfdChildWatcher is a "Goldilocks"
     925      child watcher implementation. It doesn't require signals or threads, doesn't
     926      interfere with any processes launched outside the event loop, and scales
     927      linearly with the number of subprocesses launched by the event loop. The
     928      main disadvantage is that pidfds are specific to Linux, and only work on
     929      recent (5.3+) kernels.
     930      """
     931  
     932      def __enter__(self):
     933          return self
     934  
     935      def __exit__(self, exc_type, exc_value, exc_traceback):
     936          pass
     937  
     938      def is_active(self):
     939          return True
     940  
     941      def close(self):
     942          pass
     943  
     944      def attach_loop(self, loop):
     945          pass
     946  
     947      def add_child_handler(self, pid, callback, *args):
     948          loop = events.get_running_loop()
     949          pidfd = os.pidfd_open(pid)
     950          loop._add_reader(pidfd, self._do_wait, pid, pidfd, callback, args)
     951  
     952      def _do_wait(self, pid, pidfd, callback, args):
     953          loop = events.get_running_loop()
     954          loop._remove_reader(pidfd)
     955          try:
     956              _, status = os.waitpid(pid, 0)
     957          except ChildProcessError:
     958              # The child process is already reaped
     959              # (may happen if waitpid() is called elsewhere).
     960              returncode = 255
     961              logger.warning(
     962                  "child process pid %d exit status already read: "
     963                  " will report returncode 255",
     964                  pid)
     965          else:
     966              returncode = waitstatus_to_exitcode(status)
     967  
     968          os.close(pidfd)
     969          callback(pid, returncode, *args)
     970  
     971      def remove_child_handler(self, pid):
     972          # asyncio never calls remove_child_handler() !!!
     973          # The method is no-op but is implemented because
     974          # abstract base classes require it.
     975          return True
     976  
     977  
     978  class ESC[4;38;5;81mBaseChildWatcher(ESC[4;38;5;149mAbstractChildWatcher):
     979  
     980      def __init__(self):
     981          self._loop = None
     982          self._callbacks = {}
     983  
     984      def close(self):
     985          self.attach_loop(None)
     986  
     987      def is_active(self):
     988          return self._loop is not None and self._loop.is_running()
     989  
     990      def _do_waitpid(self, expected_pid):
     991          raise NotImplementedError()
     992  
     993      def _do_waitpid_all(self):
     994          raise NotImplementedError()
     995  
     996      def attach_loop(self, loop):
     997          assert loop is None or isinstance(loop, events.AbstractEventLoop)
     998  
     999          if self._loop is not None and loop is None and self._callbacks:
    1000              warnings.warn(
    1001                  'A loop is being detached '
    1002                  'from a child watcher with pending handlers',
    1003                  RuntimeWarning)
    1004  
    1005          if self._loop is not None:
    1006              self._loop.remove_signal_handler(signal.SIGCHLD)
    1007  
    1008          self._loop = loop
    1009          if loop is not None:
    1010              loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
    1011  
    1012              # Prevent a race condition in case a child terminated
    1013              # during the switch.
    1014              self._do_waitpid_all()
    1015  
    1016      def _sig_chld(self):
    1017          try:
    1018              self._do_waitpid_all()
    1019          except (SystemExit, KeyboardInterrupt):
    1020              raise
    1021          except BaseException as exc:
    1022              # self._loop should always be available here
    1023              # as '_sig_chld' is added as a signal handler
    1024              # in 'attach_loop'
    1025              self._loop.call_exception_handler({
    1026                  'message': 'Unknown exception in SIGCHLD handler',
    1027                  'exception': exc,
    1028              })
    1029  
    1030  
    1031  class ESC[4;38;5;81mSafeChildWatcher(ESC[4;38;5;149mBaseChildWatcher):
    1032      """'Safe' child watcher implementation.
    1033  
    1034      This implementation avoids disrupting other code spawning processes by
    1035      polling explicitly each process in the SIGCHLD handler instead of calling
    1036      os.waitpid(-1).
    1037  
    1038      This is a safe solution but it has a significant overhead when handling a
    1039      big number of children (O(n) each time SIGCHLD is raised)
    1040      """
    1041  
    1042      def __init__(self):
    1043          super().__init__()
    1044          warnings._deprecated("SafeChildWatcher",
    1045                               "{name!r} is deprecated as of Python 3.12 and will be "
    1046                               "removed in Python {remove}.",
    1047                                remove=(3, 14))
    1048  
    1049      def close(self):
    1050          self._callbacks.clear()
    1051          super().close()
    1052  
    1053      def __enter__(self):
    1054          return self
    1055  
    1056      def __exit__(self, a, b, c):
    1057          pass
    1058  
    1059      def add_child_handler(self, pid, callback, *args):
    1060          self._callbacks[pid] = (callback, args)
    1061  
    1062          # Prevent a race condition in case the child is already terminated.
    1063          self._do_waitpid(pid)
    1064  
    1065      def remove_child_handler(self, pid):
    1066          try:
    1067              del self._callbacks[pid]
    1068              return True
    1069          except KeyError:
    1070              return False
    1071  
    1072      def _do_waitpid_all(self):
    1073  
    1074          for pid in list(self._callbacks):
    1075              self._do_waitpid(pid)
    1076  
    1077      def _do_waitpid(self, expected_pid):
    1078          assert expected_pid > 0
    1079  
    1080          try:
    1081              pid, status = os.waitpid(expected_pid, os.WNOHANG)
    1082          except ChildProcessError:
    1083              # The child process is already reaped
    1084              # (may happen if waitpid() is called elsewhere).
    1085              pid = expected_pid
    1086              returncode = 255
    1087              logger.warning(
    1088                  "Unknown child process pid %d, will report returncode 255",
    1089                  pid)
    1090          else:
    1091              if pid == 0:
    1092                  # The child process is still alive.
    1093                  return
    1094  
    1095              returncode = waitstatus_to_exitcode(status)
    1096              if self._loop.get_debug():
    1097                  logger.debug('process %s exited with returncode %s',
    1098                               expected_pid, returncode)
    1099  
    1100          try:
    1101              callback, args = self._callbacks.pop(pid)
    1102          except KeyError:  # pragma: no cover
    1103              # May happen if .remove_child_handler() is called
    1104              # after os.waitpid() returns.
    1105              if self._loop.get_debug():
    1106                  logger.warning("Child watcher got an unexpected pid: %r",
    1107                                 pid, exc_info=True)
    1108          else:
    1109              callback(pid, returncode, *args)
    1110  
    1111  
    1112  class ESC[4;38;5;81mFastChildWatcher(ESC[4;38;5;149mBaseChildWatcher):
    1113      """'Fast' child watcher implementation.
    1114  
    1115      This implementation reaps every terminated processes by calling
    1116      os.waitpid(-1) directly, possibly breaking other code spawning processes
    1117      and waiting for their termination.
    1118  
    1119      There is no noticeable overhead when handling a big number of children
    1120      (O(1) each time a child terminates).
    1121      """
    1122      def __init__(self):
    1123          super().__init__()
    1124          self._lock = threading.Lock()
    1125          self._zombies = {}
    1126          self._forks = 0
    1127          warnings._deprecated("FastChildWatcher",
    1128                               "{name!r} is deprecated as of Python 3.12 and will be "
    1129                               "removed in Python {remove}.",
    1130                                remove=(3, 14))
    1131  
    1132      def close(self):
    1133          self._callbacks.clear()
    1134          self._zombies.clear()
    1135          super().close()
    1136  
    1137      def __enter__(self):
    1138          with self._lock:
    1139              self._forks += 1
    1140  
    1141              return self
    1142  
    1143      def __exit__(self, a, b, c):
    1144          with self._lock:
    1145              self._forks -= 1
    1146  
    1147              if self._forks or not self._zombies:
    1148                  return
    1149  
    1150              collateral_victims = str(self._zombies)
    1151              self._zombies.clear()
    1152  
    1153          logger.warning(
    1154              "Caught subprocesses termination from unknown pids: %s",
    1155              collateral_victims)
    1156  
    1157      def add_child_handler(self, pid, callback, *args):
    1158          assert self._forks, "Must use the context manager"
    1159  
    1160          with self._lock:
    1161              try:
    1162                  returncode = self._zombies.pop(pid)
    1163              except KeyError:
    1164                  # The child is running.
    1165                  self._callbacks[pid] = callback, args
    1166                  return
    1167  
    1168          # The child is dead already. We can fire the callback.
    1169          callback(pid, returncode, *args)
    1170  
    1171      def remove_child_handler(self, pid):
    1172          try:
    1173              del self._callbacks[pid]
    1174              return True
    1175          except KeyError:
    1176              return False
    1177  
    1178      def _do_waitpid_all(self):
    1179          # Because of signal coalescing, we must keep calling waitpid() as
    1180          # long as we're able to reap a child.
    1181          while True:
    1182              try:
    1183                  pid, status = os.waitpid(-1, os.WNOHANG)
    1184              except ChildProcessError:
    1185                  # No more child processes exist.
    1186                  return
    1187              else:
    1188                  if pid == 0:
    1189                      # A child process is still alive.
    1190                      return
    1191  
    1192                  returncode = waitstatus_to_exitcode(status)
    1193  
    1194              with self._lock:
    1195                  try:
    1196                      callback, args = self._callbacks.pop(pid)
    1197                  except KeyError:
    1198                      # unknown child
    1199                      if self._forks:
    1200                          # It may not be registered yet.
    1201                          self._zombies[pid] = returncode
    1202                          if self._loop.get_debug():
    1203                              logger.debug('unknown process %s exited '
    1204                                           'with returncode %s',
    1205                                           pid, returncode)
    1206                          continue
    1207                      callback = None
    1208                  else:
    1209                      if self._loop.get_debug():
    1210                          logger.debug('process %s exited with returncode %s',
    1211                                       pid, returncode)
    1212  
    1213              if callback is None:
    1214                  logger.warning(
    1215                      "Caught subprocess termination from unknown pid: "
    1216                      "%d -> %d", pid, returncode)
    1217              else:
    1218                  callback(pid, returncode, *args)
    1219  
    1220  
    1221  class ESC[4;38;5;81mMultiLoopChildWatcher(ESC[4;38;5;149mAbstractChildWatcher):
    1222      """A watcher that doesn't require running loop in the main thread.
    1223  
    1224      This implementation registers a SIGCHLD signal handler on
    1225      instantiation (which may conflict with other code that
    1226      install own handler for this signal).
    1227  
    1228      The solution is safe but it has a significant overhead when
    1229      handling a big number of processes (*O(n)* each time a
    1230      SIGCHLD is received).
    1231      """
    1232  
    1233      # Implementation note:
    1234      # The class keeps compatibility with AbstractChildWatcher ABC
    1235      # To achieve this it has empty attach_loop() method
    1236      # and doesn't accept explicit loop argument
    1237      # for add_child_handler()/remove_child_handler()
    1238      # but retrieves the current loop by get_running_loop()
    1239  
    1240      def __init__(self):
    1241          self._callbacks = {}
    1242          self._saved_sighandler = None
    1243          warnings._deprecated("MultiLoopChildWatcher",
    1244                               "{name!r} is deprecated as of Python 3.12 and will be "
    1245                               "removed in Python {remove}.",
    1246                                remove=(3, 14))
    1247  
    1248      def is_active(self):
    1249          return self._saved_sighandler is not None
    1250  
    1251      def close(self):
    1252          self._callbacks.clear()
    1253          if self._saved_sighandler is None:
    1254              return
    1255  
    1256          handler = signal.getsignal(signal.SIGCHLD)
    1257          if handler != self._sig_chld:
    1258              logger.warning("SIGCHLD handler was changed by outside code")
    1259          else:
    1260              signal.signal(signal.SIGCHLD, self._saved_sighandler)
    1261          self._saved_sighandler = None
    1262  
    1263      def __enter__(self):
    1264          return self
    1265  
    1266      def __exit__(self, exc_type, exc_val, exc_tb):
    1267          pass
    1268  
    1269      def add_child_handler(self, pid, callback, *args):
    1270          loop = events.get_running_loop()
    1271          self._callbacks[pid] = (loop, callback, args)
    1272  
    1273          # Prevent a race condition in case the child is already terminated.
    1274          self._do_waitpid(pid)
    1275  
    1276      def remove_child_handler(self, pid):
    1277          try:
    1278              del self._callbacks[pid]
    1279              return True
    1280          except KeyError:
    1281              return False
    1282  
    1283      def attach_loop(self, loop):
    1284          # Don't save the loop but initialize itself if called first time
    1285          # The reason to do it here is that attach_loop() is called from
    1286          # unix policy only for the main thread.
    1287          # Main thread is required for subscription on SIGCHLD signal
    1288          if self._saved_sighandler is not None:
    1289              return
    1290  
    1291          self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
    1292          if self._saved_sighandler is None:
    1293              logger.warning("Previous SIGCHLD handler was set by non-Python code, "
    1294                             "restore to default handler on watcher close.")
    1295              self._saved_sighandler = signal.SIG_DFL
    1296  
    1297          # Set SA_RESTART to limit EINTR occurrences.
    1298          signal.siginterrupt(signal.SIGCHLD, False)
    1299  
    1300      def _do_waitpid_all(self):
    1301          for pid in list(self._callbacks):
    1302              self._do_waitpid(pid)
    1303  
    1304      def _do_waitpid(self, expected_pid):
    1305          assert expected_pid > 0
    1306  
    1307          try:
    1308              pid, status = os.waitpid(expected_pid, os.WNOHANG)
    1309          except ChildProcessError:
    1310              # The child process is already reaped
    1311              # (may happen if waitpid() is called elsewhere).
    1312              pid = expected_pid
    1313              returncode = 255
    1314              logger.warning(
    1315                  "Unknown child process pid %d, will report returncode 255",
    1316                  pid)
    1317              debug_log = False
    1318          else:
    1319              if pid == 0:
    1320                  # The child process is still alive.
    1321                  return
    1322  
    1323              returncode = waitstatus_to_exitcode(status)
    1324              debug_log = True
    1325          try:
    1326              loop, callback, args = self._callbacks.pop(pid)
    1327          except KeyError:  # pragma: no cover
    1328              # May happen if .remove_child_handler() is called
    1329              # after os.waitpid() returns.
    1330              logger.warning("Child watcher got an unexpected pid: %r",
    1331                             pid, exc_info=True)
    1332          else:
    1333              if loop.is_closed():
    1334                  logger.warning("Loop %r that handles pid %r is closed", loop, pid)
    1335              else:
    1336                  if debug_log and loop.get_debug():
    1337                      logger.debug('process %s exited with returncode %s',
    1338                                   expected_pid, returncode)
    1339                  loop.call_soon_threadsafe(callback, pid, returncode, *args)
    1340  
    1341      def _sig_chld(self, signum, frame):
    1342          try:
    1343              self._do_waitpid_all()
    1344          except (SystemExit, KeyboardInterrupt):
    1345              raise
    1346          except BaseException:
    1347              logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
    1348  
    1349  
    1350  class ESC[4;38;5;81mThreadedChildWatcher(ESC[4;38;5;149mAbstractChildWatcher):
    1351      """Threaded child watcher implementation.
    1352  
    1353      The watcher uses a thread per process
    1354      for waiting for the process finish.
    1355  
    1356      It doesn't require subscription on POSIX signal
    1357      but a thread creation is not free.
    1358  
    1359      The watcher has O(1) complexity, its performance doesn't depend
    1360      on amount of spawn processes.
    1361      """
    1362  
    1363      def __init__(self):
    1364          self._pid_counter = itertools.count(0)
    1365          self._threads = {}
    1366  
    1367      def is_active(self):
    1368          return True
    1369  
    1370      def close(self):
    1371          self._join_threads()
    1372  
    1373      def _join_threads(self):
    1374          """Internal: Join all non-daemon threads"""
    1375          threads = [thread for thread in list(self._threads.values())
    1376                     if thread.is_alive() and not thread.daemon]
    1377          for thread in threads:
    1378              thread.join()
    1379  
    1380      def __enter__(self):
    1381          return self
    1382  
    1383      def __exit__(self, exc_type, exc_val, exc_tb):
    1384          pass
    1385  
    1386      def __del__(self, _warn=warnings.warn):
    1387          threads = [thread for thread in list(self._threads.values())
    1388                     if thread.is_alive()]
    1389          if threads:
    1390              _warn(f"{self.__class__} has registered but not finished child processes",
    1391                    ResourceWarning,
    1392                    source=self)
    1393  
    1394      def add_child_handler(self, pid, callback, *args):
    1395          loop = events.get_running_loop()
    1396          thread = threading.Thread(target=self._do_waitpid,
    1397                                    name=f"waitpid-{next(self._pid_counter)}",
    1398                                    args=(loop, pid, callback, args),
    1399                                    daemon=True)
    1400          self._threads[pid] = thread
    1401          thread.start()
    1402  
    1403      def remove_child_handler(self, pid):
    1404          # asyncio never calls remove_child_handler() !!!
    1405          # The method is no-op but is implemented because
    1406          # abstract base classes require it.
    1407          return True
    1408  
    1409      def attach_loop(self, loop):
    1410          pass
    1411  
    1412      def _do_waitpid(self, loop, expected_pid, callback, args):
    1413          assert expected_pid > 0
    1414  
    1415          try:
    1416              pid, status = os.waitpid(expected_pid, 0)
    1417          except ChildProcessError:
    1418              # The child process is already reaped
    1419              # (may happen if waitpid() is called elsewhere).
    1420              pid = expected_pid
    1421              returncode = 255
    1422              logger.warning(
    1423                  "Unknown child process pid %d, will report returncode 255",
    1424                  pid)
    1425          else:
    1426              returncode = waitstatus_to_exitcode(status)
    1427              if loop.get_debug():
    1428                  logger.debug('process %s exited with returncode %s',
    1429                               expected_pid, returncode)
    1430  
    1431          if loop.is_closed():
    1432              logger.warning("Loop %r that handles pid %r is closed", loop, pid)
    1433          else:
    1434              loop.call_soon_threadsafe(callback, pid, returncode, *args)
    1435  
    1436          self._threads.pop(expected_pid)
    1437  
    1438  def can_use_pidfd():
    1439      if not hasattr(os, 'pidfd_open'):
    1440          return False
    1441      try:
    1442          pid = os.getpid()
    1443          os.close(os.pidfd_open(pid, 0))
    1444      except OSError:
    1445          # blocked by security policy like SECCOMP
    1446          return False
    1447      return True
    1448  
    1449  
    1450  class ESC[4;38;5;81m_UnixDefaultEventLoopPolicy(ESC[4;38;5;149meventsESC[4;38;5;149m.ESC[4;38;5;149mBaseDefaultEventLoopPolicy):
    1451      """UNIX event loop policy with a watcher for child processes."""
    1452      _loop_factory = _UnixSelectorEventLoop
    1453  
    1454      def __init__(self):
    1455          super().__init__()
    1456          self._watcher = None
    1457  
    1458      def _init_watcher(self):
    1459          with events._lock:
    1460              if self._watcher is None:  # pragma: no branch
    1461                  if can_use_pidfd():
    1462                      self._watcher = PidfdChildWatcher()
    1463                  else:
    1464                      self._watcher = ThreadedChildWatcher()
    1465                  if threading.current_thread() is threading.main_thread():
    1466                      self._watcher.attach_loop(self._local._loop)
    1467  
    1468      def set_event_loop(self, loop):
    1469          """Set the event loop.
    1470  
    1471          As a side effect, if a child watcher was set before, then calling
    1472          .set_event_loop() from the main thread will call .attach_loop(loop) on
    1473          the child watcher.
    1474          """
    1475  
    1476          super().set_event_loop(loop)
    1477  
    1478          if (self._watcher is not None and
    1479                  threading.current_thread() is threading.main_thread()):
    1480              self._watcher.attach_loop(loop)
    1481  
    1482      def get_child_watcher(self):
    1483          """Get the watcher for child processes.
    1484  
    1485          If not yet set, a ThreadedChildWatcher object is automatically created.
    1486          """
    1487          if self._watcher is None:
    1488              self._init_watcher()
    1489  
    1490          warnings._deprecated("get_child_watcher",
    1491                              "{name!r} is deprecated as of Python 3.12 and will be "
    1492                              "removed in Python {remove}.", remove=(3, 14))
    1493          return self._watcher
    1494  
    1495      def set_child_watcher(self, watcher):
    1496          """Set the watcher for child processes."""
    1497  
    1498          assert watcher is None or isinstance(watcher, AbstractChildWatcher)
    1499  
    1500          if self._watcher is not None:
    1501              self._watcher.close()
    1502  
    1503          self._watcher = watcher
    1504          warnings._deprecated("set_child_watcher",
    1505                              "{name!r} is deprecated as of Python 3.12 and will be "
    1506                              "removed in Python {remove}.", remove=(3, 14))
    1507  
    1508  
# Names listed in __all__: bind the generic public names to the
# Unix-specific implementations defined in this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy