(root)/
Python-3.11.7/
Lib/
asyncio/
selector_events.py
       1  """Event loop using a selector and related classes.
       2  
       3  A selector is a "notify-when-ready" multiplexer.  For a subclass which
       4  also includes support for signal handling, see the unix_events sub-module.
       5  """
       6  
       7  __all__ = 'BaseSelectorEventLoop',
       8  
       9  import collections
      10  import errno
      11  import functools
      12  import selectors
      13  import socket
      14  import warnings
      15  import weakref
      16  try:
      17      import ssl
      18  except ImportError:  # pragma: no cover
      19      ssl = None
      20  
      21  from . import base_events
      22  from . import constants
      23  from . import events
      24  from . import futures
      25  from . import protocols
      26  from . import sslproto
      27  from . import transports
      28  from . import trsock
      29  from .log import logger
      30  
      31  
      32  def _test_selector_event(selector, fd, event):
      33      # Test if the selector is monitoring 'event' events
      34      # for the file descriptor 'fd'.
      35      try:
      36          key = selector.get_key(fd)
      37      except KeyError:
      38          return False
      39      else:
      40          return bool(key.events & event)
      41  
      42  
      43  class ESC[4;38;5;81mBaseSelectorEventLoop(ESC[4;38;5;149mbase_eventsESC[4;38;5;149m.ESC[4;38;5;149mBaseEventLoop):
      44      """Selector event loop.
      45  
      46      See events.EventLoop for API specification.
      47      """
      48  
      49      def __init__(self, selector=None):
      50          super().__init__()
      51  
      52          if selector is None:
      53              selector = selectors.DefaultSelector()
      54          logger.debug('Using selector: %s', selector.__class__.__name__)
      55          self._selector = selector
      56          self._make_self_pipe()
      57          self._transports = weakref.WeakValueDictionary()
      58  
      59      def _make_socket_transport(self, sock, protocol, waiter=None, *,
      60                                 extra=None, server=None):
      61          return _SelectorSocketTransport(self, sock, protocol, waiter,
      62                                          extra, server)
      63  
      64      def _make_ssl_transport(
      65              self, rawsock, protocol, sslcontext, waiter=None,
      66              *, server_side=False, server_hostname=None,
      67              extra=None, server=None,
      68              ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
      69              ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
      70      ):
      71          ssl_protocol = sslproto.SSLProtocol(
      72              self, protocol, sslcontext, waiter,
      73              server_side, server_hostname,
      74              ssl_handshake_timeout=ssl_handshake_timeout,
      75              ssl_shutdown_timeout=ssl_shutdown_timeout
      76          )
      77          _SelectorSocketTransport(self, rawsock, ssl_protocol,
      78                                   extra=extra, server=server)
      79          return ssl_protocol._app_transport
      80  
      81      def _make_datagram_transport(self, sock, protocol,
      82                                   address=None, waiter=None, extra=None):
      83          return _SelectorDatagramTransport(self, sock, protocol,
      84                                            address, waiter, extra)
      85  
      86      def close(self):
      87          if self.is_running():
      88              raise RuntimeError("Cannot close a running event loop")
      89          if self.is_closed():
      90              return
      91          self._close_self_pipe()
      92          super().close()
      93          if self._selector is not None:
      94              self._selector.close()
      95              self._selector = None
      96  
      97      def _close_self_pipe(self):
      98          self._remove_reader(self._ssock.fileno())
      99          self._ssock.close()
     100          self._ssock = None
     101          self._csock.close()
     102          self._csock = None
     103          self._internal_fds -= 1
     104  
     105      def _make_self_pipe(self):
     106          # A self-socket, really. :-)
     107          self._ssock, self._csock = socket.socketpair()
     108          self._ssock.setblocking(False)
     109          self._csock.setblocking(False)
     110          self._internal_fds += 1
     111          self._add_reader(self._ssock.fileno(), self._read_from_self)
     112  
     113      def _process_self_data(self, data):
     114          pass
     115  
     116      def _read_from_self(self):
     117          while True:
     118              try:
     119                  data = self._ssock.recv(4096)
     120                  if not data:
     121                      break
     122                  self._process_self_data(data)
     123              except InterruptedError:
     124                  continue
     125              except BlockingIOError:
     126                  break
     127  
     128      def _write_to_self(self):
     129          # This may be called from a different thread, possibly after
     130          # _close_self_pipe() has been called or even while it is
     131          # running.  Guard for self._csock being None or closed.  When
     132          # a socket is closed, send() raises OSError (with errno set to
     133          # EBADF, but let's not rely on the exact error code).
     134          csock = self._csock
     135          if csock is None:
     136              return
     137  
     138          try:
     139              csock.send(b'\0')
     140          except OSError:
     141              if self._debug:
     142                  logger.debug("Fail to write a null byte into the "
     143                               "self-pipe socket",
     144                               exc_info=True)
     145  
     146      def _start_serving(self, protocol_factory, sock,
     147                         sslcontext=None, server=None, backlog=100,
     148                         ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
     149                         ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
     150          self._add_reader(sock.fileno(), self._accept_connection,
     151                           protocol_factory, sock, sslcontext, server, backlog,
     152                           ssl_handshake_timeout, ssl_shutdown_timeout)
     153  
     154      def _accept_connection(
     155              self, protocol_factory, sock,
     156              sslcontext=None, server=None, backlog=100,
     157              ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
     158              ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
     159          # This method is only called once for each event loop tick where the
     160          # listening socket has triggered an EVENT_READ. There may be multiple
     161          # connections waiting for an .accept() so it is called in a loop.
     162          # See https://bugs.python.org/issue27906 for more details.
     163          for _ in range(backlog):
     164              try:
     165                  conn, addr = sock.accept()
     166                  if self._debug:
     167                      logger.debug("%r got a new connection from %r: %r",
     168                                   server, addr, conn)
     169                  conn.setblocking(False)
     170              except (BlockingIOError, InterruptedError, ConnectionAbortedError):
     171                  # Early exit because the socket accept buffer is empty.
     172                  return None
     173              except OSError as exc:
     174                  # There's nowhere to send the error, so just log it.
     175                  if exc.errno in (errno.EMFILE, errno.ENFILE,
     176                                   errno.ENOBUFS, errno.ENOMEM):
     177                      # Some platforms (e.g. Linux keep reporting the FD as
     178                      # ready, so we remove the read handler temporarily.
     179                      # We'll try again in a while.
     180                      self.call_exception_handler({
     181                          'message': 'socket.accept() out of system resource',
     182                          'exception': exc,
     183                          'socket': trsock.TransportSocket(sock),
     184                      })
     185                      self._remove_reader(sock.fileno())
     186                      self.call_later(constants.ACCEPT_RETRY_DELAY,
     187                                      self._start_serving,
     188                                      protocol_factory, sock, sslcontext, server,
     189                                      backlog, ssl_handshake_timeout,
     190                                      ssl_shutdown_timeout)
     191                  else:
     192                      raise  # The event loop will catch, log and ignore it.
     193              else:
     194                  extra = {'peername': addr}
     195                  accept = self._accept_connection2(
     196                      protocol_factory, conn, extra, sslcontext, server,
     197                      ssl_handshake_timeout, ssl_shutdown_timeout)
     198                  self.create_task(accept)
     199  
     200      async def _accept_connection2(
     201              self, protocol_factory, conn, extra,
     202              sslcontext=None, server=None,
     203              ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
     204              ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
     205          protocol = None
     206          transport = None
     207          try:
     208              protocol = protocol_factory()
     209              waiter = self.create_future()
     210              if sslcontext:
     211                  transport = self._make_ssl_transport(
     212                      conn, protocol, sslcontext, waiter=waiter,
     213                      server_side=True, extra=extra, server=server,
     214                      ssl_handshake_timeout=ssl_handshake_timeout,
     215                      ssl_shutdown_timeout=ssl_shutdown_timeout)
     216              else:
     217                  transport = self._make_socket_transport(
     218                      conn, protocol, waiter=waiter, extra=extra,
     219                      server=server)
     220  
     221              try:
     222                  await waiter
     223              except BaseException:
     224                  transport.close()
     225                  raise
     226                  # It's now up to the protocol to handle the connection.
     227  
     228          except (SystemExit, KeyboardInterrupt):
     229              raise
     230          except BaseException as exc:
     231              if self._debug:
     232                  context = {
     233                      'message':
     234                          'Error on transport creation for incoming connection',
     235                      'exception': exc,
     236                  }
     237                  if protocol is not None:
     238                      context['protocol'] = protocol
     239                  if transport is not None:
     240                      context['transport'] = transport
     241                  self.call_exception_handler(context)
     242  
     243      def _ensure_fd_no_transport(self, fd):
     244          fileno = fd
     245          if not isinstance(fileno, int):
     246              try:
     247                  fileno = int(fileno.fileno())
     248              except (AttributeError, TypeError, ValueError):
     249                  # This code matches selectors._fileobj_to_fd function.
     250                  raise ValueError(f"Invalid file object: {fd!r}") from None
     251          try:
     252              transport = self._transports[fileno]
     253          except KeyError:
     254              pass
     255          else:
     256              if not transport.is_closing():
     257                  raise RuntimeError(
     258                      f'File descriptor {fd!r} is used by transport '
     259                      f'{transport!r}')
     260  
     261      def _add_reader(self, fd, callback, *args):
     262          self._check_closed()
     263          handle = events.Handle(callback, args, self, None)
     264          try:
     265              key = self._selector.get_key(fd)
     266          except KeyError:
     267              self._selector.register(fd, selectors.EVENT_READ,
     268                                      (handle, None))
     269          else:
     270              mask, (reader, writer) = key.events, key.data
     271              self._selector.modify(fd, mask | selectors.EVENT_READ,
     272                                    (handle, writer))
     273              if reader is not None:
     274                  reader.cancel()
     275          return handle
     276  
     277      def _remove_reader(self, fd):
     278          if self.is_closed():
     279              return False
     280          try:
     281              key = self._selector.get_key(fd)
     282          except KeyError:
     283              return False
     284          else:
     285              mask, (reader, writer) = key.events, key.data
     286              mask &= ~selectors.EVENT_READ
     287              if not mask:
     288                  self._selector.unregister(fd)
     289              else:
     290                  self._selector.modify(fd, mask, (None, writer))
     291  
     292              if reader is not None:
     293                  reader.cancel()
     294                  return True
     295              else:
     296                  return False
     297  
     298      def _add_writer(self, fd, callback, *args):
     299          self._check_closed()
     300          handle = events.Handle(callback, args, self, None)
     301          try:
     302              key = self._selector.get_key(fd)
     303          except KeyError:
     304              self._selector.register(fd, selectors.EVENT_WRITE,
     305                                      (None, handle))
     306          else:
     307              mask, (reader, writer) = key.events, key.data
     308              self._selector.modify(fd, mask | selectors.EVENT_WRITE,
     309                                    (reader, handle))
     310              if writer is not None:
     311                  writer.cancel()
     312          return handle
     313  
     314      def _remove_writer(self, fd):
     315          """Remove a writer callback."""
     316          if self.is_closed():
     317              return False
     318          try:
     319              key = self._selector.get_key(fd)
     320          except KeyError:
     321              return False
     322          else:
     323              mask, (reader, writer) = key.events, key.data
     324              # Remove both writer and connector.
     325              mask &= ~selectors.EVENT_WRITE
     326              if not mask:
     327                  self._selector.unregister(fd)
     328              else:
     329                  self._selector.modify(fd, mask, (reader, None))
     330  
     331              if writer is not None:
     332                  writer.cancel()
     333                  return True
     334              else:
     335                  return False
     336  
     337      def add_reader(self, fd, callback, *args):
     338          """Add a reader callback."""
     339          self._ensure_fd_no_transport(fd)
     340          self._add_reader(fd, callback, *args)
     341  
     342      def remove_reader(self, fd):
     343          """Remove a reader callback."""
     344          self._ensure_fd_no_transport(fd)
     345          return self._remove_reader(fd)
     346  
     347      def add_writer(self, fd, callback, *args):
     348          """Add a writer callback.."""
     349          self._ensure_fd_no_transport(fd)
     350          self._add_writer(fd, callback, *args)
     351  
     352      def remove_writer(self, fd):
     353          """Remove a writer callback."""
     354          self._ensure_fd_no_transport(fd)
     355          return self._remove_writer(fd)
     356  
     357      async def sock_recv(self, sock, n):
     358          """Receive data from the socket.
     359  
     360          The return value is a bytes object representing the data received.
     361          The maximum amount of data to be received at once is specified by
     362          nbytes.
     363          """
     364          base_events._check_ssl_socket(sock)
     365          if self._debug and sock.gettimeout() != 0:
     366              raise ValueError("the socket must be non-blocking")
     367          try:
     368              return sock.recv(n)
     369          except (BlockingIOError, InterruptedError):
     370              pass
     371          fut = self.create_future()
     372          fd = sock.fileno()
     373          self._ensure_fd_no_transport(fd)
     374          handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
     375          fut.add_done_callback(
     376              functools.partial(self._sock_read_done, fd, handle=handle))
     377          return await fut
     378  
     379      def _sock_read_done(self, fd, fut, handle=None):
     380          if handle is None or not handle.cancelled():
     381              self.remove_reader(fd)
     382  
     383      def _sock_recv(self, fut, sock, n):
     384          # _sock_recv() can add itself as an I/O callback if the operation can't
     385          # be done immediately. Don't use it directly, call sock_recv().
     386          if fut.done():
     387              return
     388          try:
     389              data = sock.recv(n)
     390          except (BlockingIOError, InterruptedError):
     391              return  # try again next time
     392          except (SystemExit, KeyboardInterrupt):
     393              raise
     394          except BaseException as exc:
     395              fut.set_exception(exc)
     396          else:
     397              fut.set_result(data)
     398  
     399      async def sock_recv_into(self, sock, buf):
     400          """Receive data from the socket.
     401  
     402          The received data is written into *buf* (a writable buffer).
     403          The return value is the number of bytes written.
     404          """
     405          base_events._check_ssl_socket(sock)
     406          if self._debug and sock.gettimeout() != 0:
     407              raise ValueError("the socket must be non-blocking")
     408          try:
     409              return sock.recv_into(buf)
     410          except (BlockingIOError, InterruptedError):
     411              pass
     412          fut = self.create_future()
     413          fd = sock.fileno()
     414          self._ensure_fd_no_transport(fd)
     415          handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
     416          fut.add_done_callback(
     417              functools.partial(self._sock_read_done, fd, handle=handle))
     418          return await fut
     419  
     420      def _sock_recv_into(self, fut, sock, buf):
     421          # _sock_recv_into() can add itself as an I/O callback if the operation
     422          # can't be done immediately. Don't use it directly, call
     423          # sock_recv_into().
     424          if fut.done():
     425              return
     426          try:
     427              nbytes = sock.recv_into(buf)
     428          except (BlockingIOError, InterruptedError):
     429              return  # try again next time
     430          except (SystemExit, KeyboardInterrupt):
     431              raise
     432          except BaseException as exc:
     433              fut.set_exception(exc)
     434          else:
     435              fut.set_result(nbytes)
     436  
     437      async def sock_recvfrom(self, sock, bufsize):
     438          """Receive a datagram from a datagram socket.
     439  
     440          The return value is a tuple of (bytes, address) representing the
     441          datagram received and the address it came from.
     442          The maximum amount of data to be received at once is specified by
     443          nbytes.
     444          """
     445          base_events._check_ssl_socket(sock)
     446          if self._debug and sock.gettimeout() != 0:
     447              raise ValueError("the socket must be non-blocking")
     448          try:
     449              return sock.recvfrom(bufsize)
     450          except (BlockingIOError, InterruptedError):
     451              pass
     452          fut = self.create_future()
     453          fd = sock.fileno()
     454          self._ensure_fd_no_transport(fd)
     455          handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
     456          fut.add_done_callback(
     457              functools.partial(self._sock_read_done, fd, handle=handle))
     458          return await fut
     459  
     460      def _sock_recvfrom(self, fut, sock, bufsize):
     461          # _sock_recvfrom() can add itself as an I/O callback if the operation
     462          # can't be done immediately. Don't use it directly, call
     463          # sock_recvfrom().
     464          if fut.done():
     465              return
     466          try:
     467              result = sock.recvfrom(bufsize)
     468          except (BlockingIOError, InterruptedError):
     469              return  # try again next time
     470          except (SystemExit, KeyboardInterrupt):
     471              raise
     472          except BaseException as exc:
     473              fut.set_exception(exc)
     474          else:
     475              fut.set_result(result)
     476  
     477      async def sock_recvfrom_into(self, sock, buf, nbytes=0):
     478          """Receive data from the socket.
     479  
     480          The received data is written into *buf* (a writable buffer).
     481          The return value is a tuple of (number of bytes written, address).
     482          """
     483          base_events._check_ssl_socket(sock)
     484          if self._debug and sock.gettimeout() != 0:
     485              raise ValueError("the socket must be non-blocking")
     486          if not nbytes:
     487              nbytes = len(buf)
     488  
     489          try:
     490              return sock.recvfrom_into(buf, nbytes)
     491          except (BlockingIOError, InterruptedError):
     492              pass
     493          fut = self.create_future()
     494          fd = sock.fileno()
     495          self._ensure_fd_no_transport(fd)
     496          handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
     497                                    nbytes)
     498          fut.add_done_callback(
     499              functools.partial(self._sock_read_done, fd, handle=handle))
     500          return await fut
     501  
     502      def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
     503          # _sock_recv_into() can add itself as an I/O callback if the operation
     504          # can't be done immediately. Don't use it directly, call
     505          # sock_recv_into().
     506          if fut.done():
     507              return
     508          try:
     509              result = sock.recvfrom_into(buf, bufsize)
     510          except (BlockingIOError, InterruptedError):
     511              return  # try again next time
     512          except (SystemExit, KeyboardInterrupt):
     513              raise
     514          except BaseException as exc:
     515              fut.set_exception(exc)
     516          else:
     517              fut.set_result(result)
     518  
     519      async def sock_sendall(self, sock, data):
     520          """Send data to the socket.
     521  
     522          The socket must be connected to a remote socket. This method continues
     523          to send data from data until either all data has been sent or an
     524          error occurs. None is returned on success. On error, an exception is
     525          raised, and there is no way to determine how much data, if any, was
     526          successfully processed by the receiving end of the connection.
     527          """
     528          base_events._check_ssl_socket(sock)
     529          if self._debug and sock.gettimeout() != 0:
     530              raise ValueError("the socket must be non-blocking")
     531          try:
     532              n = sock.send(data)
     533          except (BlockingIOError, InterruptedError):
     534              n = 0
     535  
     536          if n == len(data):
     537              # all data sent
     538              return
     539  
     540          fut = self.create_future()
     541          fd = sock.fileno()
     542          self._ensure_fd_no_transport(fd)
     543          # use a trick with a list in closure to store a mutable state
     544          handle = self._add_writer(fd, self._sock_sendall, fut, sock,
     545                                    memoryview(data), [n])
     546          fut.add_done_callback(
     547              functools.partial(self._sock_write_done, fd, handle=handle))
     548          return await fut
     549  
     550      def _sock_sendall(self, fut, sock, view, pos):
     551          if fut.done():
     552              # Future cancellation can be scheduled on previous loop iteration
     553              return
     554          start = pos[0]
     555          try:
     556              n = sock.send(view[start:])
     557          except (BlockingIOError, InterruptedError):
     558              return
     559          except (SystemExit, KeyboardInterrupt):
     560              raise
     561          except BaseException as exc:
     562              fut.set_exception(exc)
     563              return
     564  
     565          start += n
     566  
     567          if start == len(view):
     568              fut.set_result(None)
     569          else:
     570              pos[0] = start
     571  
     572      async def sock_sendto(self, sock, data, address):
     573          """Send data to the socket.
     574  
     575          The socket must be connected to a remote socket. This method continues
     576          to send data from data until either all data has been sent or an
     577          error occurs. None is returned on success. On error, an exception is
     578          raised, and there is no way to determine how much data, if any, was
     579          successfully processed by the receiving end of the connection.
     580          """
     581          base_events._check_ssl_socket(sock)
     582          if self._debug and sock.gettimeout() != 0:
     583              raise ValueError("the socket must be non-blocking")
     584          try:
     585              return sock.sendto(data, address)
     586          except (BlockingIOError, InterruptedError):
     587              pass
     588  
     589          fut = self.create_future()
     590          fd = sock.fileno()
     591          self._ensure_fd_no_transport(fd)
     592          # use a trick with a list in closure to store a mutable state
     593          handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
     594                                    address)
     595          fut.add_done_callback(
     596              functools.partial(self._sock_write_done, fd, handle=handle))
     597          return await fut
     598  
     599      def _sock_sendto(self, fut, sock, data, address):
     600          if fut.done():
     601              # Future cancellation can be scheduled on previous loop iteration
     602              return
     603          try:
     604              n = sock.sendto(data, 0, address)
     605          except (BlockingIOError, InterruptedError):
     606              return
     607          except (SystemExit, KeyboardInterrupt):
     608              raise
     609          except BaseException as exc:
     610              fut.set_exception(exc)
     611          else:
     612              fut.set_result(n)
     613  
     614      async def sock_connect(self, sock, address):
     615          """Connect to a remote socket at address.
     616  
     617          This method is a coroutine.
     618          """
     619          base_events._check_ssl_socket(sock)
     620          if self._debug and sock.gettimeout() != 0:
     621              raise ValueError("the socket must be non-blocking")
     622  
     623          if sock.family == socket.AF_INET or (
     624                  base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
     625              resolved = await self._ensure_resolved(
     626                  address, family=sock.family, type=sock.type, proto=sock.proto,
     627                  loop=self,
     628              )
     629              _, _, _, _, address = resolved[0]
     630  
     631          fut = self.create_future()
     632          self._sock_connect(fut, sock, address)
     633          try:
     634              return await fut
     635          finally:
     636              # Needed to break cycles when an exception occurs.
     637              fut = None
     638  
     639      def _sock_connect(self, fut, sock, address):
     640          fd = sock.fileno()
     641          try:
     642              sock.connect(address)
     643          except (BlockingIOError, InterruptedError):
     644              # Issue #23618: When the C function connect() fails with EINTR, the
     645              # connection runs in background. We have to wait until the socket
     646              # becomes writable to be notified when the connection succeed or
     647              # fails.
     648              self._ensure_fd_no_transport(fd)
     649              handle = self._add_writer(
     650                  fd, self._sock_connect_cb, fut, sock, address)
     651              fut.add_done_callback(
     652                  functools.partial(self._sock_write_done, fd, handle=handle))
     653          except (SystemExit, KeyboardInterrupt):
     654              raise
     655          except BaseException as exc:
     656              fut.set_exception(exc)
     657          else:
     658              fut.set_result(None)
     659          finally:
     660              fut = None
     661  
     662      def _sock_write_done(self, fd, fut, handle=None):
     663          if handle is None or not handle.cancelled():
     664              self.remove_writer(fd)
     665  
     666      def _sock_connect_cb(self, fut, sock, address):
     667          if fut.done():
     668              return
     669  
     670          try:
     671              err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
     672              if err != 0:
     673                  # Jump to any except clause below.
     674                  raise OSError(err, f'Connect call failed {address}')
     675          except (BlockingIOError, InterruptedError):
     676              # socket is still registered, the callback will be retried later
     677              pass
     678          except (SystemExit, KeyboardInterrupt):
     679              raise
     680          except BaseException as exc:
     681              fut.set_exception(exc)
     682          else:
     683              fut.set_result(None)
     684          finally:
     685              fut = None
     686  
     687      async def sock_accept(self, sock):
     688          """Accept a connection.
     689  
     690          The socket must be bound to an address and listening for connections.
     691          The return value is a pair (conn, address) where conn is a new socket
     692          object usable to send and receive data on the connection, and address
     693          is the address bound to the socket on the other end of the connection.
     694          """
     695          base_events._check_ssl_socket(sock)
     696          if self._debug and sock.gettimeout() != 0:
     697              raise ValueError("the socket must be non-blocking")
     698          fut = self.create_future()
     699          self._sock_accept(fut, sock)
     700          return await fut
     701  
     702      def _sock_accept(self, fut, sock):
     703          fd = sock.fileno()
     704          try:
     705              conn, address = sock.accept()
     706              conn.setblocking(False)
     707          except (BlockingIOError, InterruptedError):
     708              self._ensure_fd_no_transport(fd)
     709              handle = self._add_reader(fd, self._sock_accept, fut, sock)
     710              fut.add_done_callback(
     711                  functools.partial(self._sock_read_done, fd, handle=handle))
     712          except (SystemExit, KeyboardInterrupt):
     713              raise
     714          except BaseException as exc:
     715              fut.set_exception(exc)
     716          else:
     717              fut.set_result((conn, address))
     718  
     719      async def _sendfile_native(self, transp, file, offset, count):
     720          del self._transports[transp._sock_fd]
     721          resume_reading = transp.is_reading()
     722          transp.pause_reading()
     723          await transp._make_empty_waiter()
     724          try:
     725              return await self.sock_sendfile(transp._sock, file, offset, count,
     726                                              fallback=False)
     727          finally:
     728              transp._reset_empty_waiter()
     729              if resume_reading:
     730                  transp.resume_reading()
     731              self._transports[transp._sock_fd] = transp
     732  
     733      def _process_events(self, event_list):
     734          for key, mask in event_list:
     735              fileobj, (reader, writer) = key.fileobj, key.data
     736              if mask & selectors.EVENT_READ and reader is not None:
     737                  if reader._cancelled:
     738                      self._remove_reader(fileobj)
     739                  else:
     740                      self._add_callback(reader)
     741              if mask & selectors.EVENT_WRITE and writer is not None:
     742                  if writer._cancelled:
     743                      self._remove_writer(fileobj)
     744                  else:
     745                      self._add_callback(writer)
     746  
     747      def _stop_serving(self, sock):
     748          self._remove_reader(sock.fileno())
     749          sock.close()
     750  
     751  
     752  class ESC[4;38;5;81m_SelectorTransport(ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149m_FlowControlMixin,
     753                           ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149mTransport):
     754  
     755      max_size = 256 * 1024  # Buffer size passed to recv().
     756  
     757      _buffer_factory = bytearray  # Constructs initial value for self._buffer.
     758  
     759      # Attribute used in the destructor: it must be set even if the constructor
     760      # is not called (see _SelectorSslTransport which may start by raising an
     761      # exception)
     762      _sock = None
     763  
     764      def __init__(self, loop, sock, protocol, extra=None, server=None):
     765          super().__init__(extra, loop)
     766          self._extra['socket'] = trsock.TransportSocket(sock)
     767          try:
     768              self._extra['sockname'] = sock.getsockname()
     769          except OSError:
     770              self._extra['sockname'] = None
     771          if 'peername' not in self._extra:
     772              try:
     773                  self._extra['peername'] = sock.getpeername()
     774              except socket.error:
     775                  self._extra['peername'] = None
     776          self._sock = sock
     777          self._sock_fd = sock.fileno()
     778  
     779          self._protocol_connected = False
     780          self.set_protocol(protocol)
     781  
     782          self._server = server
     783          self._buffer = self._buffer_factory()
     784          self._conn_lost = 0  # Set when call to connection_lost scheduled.
     785          self._closing = False  # Set when close() called.
     786          self._paused = False  # Set when pause_reading() called
     787  
     788          if self._server is not None:
     789              self._server._attach()
     790          loop._transports[self._sock_fd] = self
     791  
     792      def __repr__(self):
     793          info = [self.__class__.__name__]
     794          if self._sock is None:
     795              info.append('closed')
     796          elif self._closing:
     797              info.append('closing')
     798          info.append(f'fd={self._sock_fd}')
     799          # test if the transport was closed
     800          if self._loop is not None and not self._loop.is_closed():
     801              polling = _test_selector_event(self._loop._selector,
     802                                             self._sock_fd, selectors.EVENT_READ)
     803              if polling:
     804                  info.append('read=polling')
     805              else:
     806                  info.append('read=idle')
     807  
     808              polling = _test_selector_event(self._loop._selector,
     809                                             self._sock_fd,
     810                                             selectors.EVENT_WRITE)
     811              if polling:
     812                  state = 'polling'
     813              else:
     814                  state = 'idle'
     815  
     816              bufsize = self.get_write_buffer_size()
     817              info.append(f'write=<{state}, bufsize={bufsize}>')
     818          return '<{}>'.format(' '.join(info))
     819  
     820      def abort(self):
     821          self._force_close(None)
     822  
     823      def set_protocol(self, protocol):
     824          self._protocol = protocol
     825          self._protocol_connected = True
     826  
     827      def get_protocol(self):
     828          return self._protocol
     829  
     830      def is_closing(self):
     831          return self._closing
     832  
     833      def is_reading(self):
     834          return not self.is_closing() and not self._paused
     835  
     836      def pause_reading(self):
     837          if not self.is_reading():
     838              return
     839          self._paused = True
     840          self._loop._remove_reader(self._sock_fd)
     841          if self._loop.get_debug():
     842              logger.debug("%r pauses reading", self)
     843  
     844      def resume_reading(self):
     845          if self._closing or not self._paused:
     846              return
     847          self._paused = False
     848          self._add_reader(self._sock_fd, self._read_ready)
     849          if self._loop.get_debug():
     850              logger.debug("%r resumes reading", self)
     851  
     852      def close(self):
     853          if self._closing:
     854              return
     855          self._closing = True
     856          self._loop._remove_reader(self._sock_fd)
     857          if not self._buffer:
     858              self._conn_lost += 1
     859              self._loop._remove_writer(self._sock_fd)
     860              self._loop.call_soon(self._call_connection_lost, None)
     861  
     862      def __del__(self, _warn=warnings.warn):
     863          if self._sock is not None:
     864              _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
     865              self._sock.close()
     866  
     867      def _fatal_error(self, exc, message='Fatal error on transport'):
     868          # Should be called from exception handler only.
     869          if isinstance(exc, OSError):
     870              if self._loop.get_debug():
     871                  logger.debug("%r: %s", self, message, exc_info=True)
     872          else:
     873              self._loop.call_exception_handler({
     874                  'message': message,
     875                  'exception': exc,
     876                  'transport': self,
     877                  'protocol': self._protocol,
     878              })
     879          self._force_close(exc)
     880  
     881      def _force_close(self, exc):
     882          if self._conn_lost:
     883              return
     884          if self._buffer:
     885              self._buffer.clear()
     886              self._loop._remove_writer(self._sock_fd)
     887          if not self._closing:
     888              self._closing = True
     889              self._loop._remove_reader(self._sock_fd)
     890          self._conn_lost += 1
     891          self._loop.call_soon(self._call_connection_lost, exc)
     892  
     893      def _call_connection_lost(self, exc):
     894          try:
     895              if self._protocol_connected:
     896                  self._protocol.connection_lost(exc)
     897          finally:
     898              self._sock.close()
     899              self._sock = None
     900              self._protocol = None
     901              self._loop = None
     902              server = self._server
     903              if server is not None:
     904                  server._detach()
     905                  self._server = None
     906  
     907      def get_write_buffer_size(self):
     908          return len(self._buffer)
     909  
     910      def _add_reader(self, fd, callback, *args):
     911          if not self.is_reading():
     912              return
     913          self._loop._add_reader(fd, callback, *args)
     914  
     915  
     916  class ESC[4;38;5;81m_SelectorSocketTransport(ESC[4;38;5;149m_SelectorTransport):
     917  
     918      _start_tls_compatible = True
     919      _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
     920  
     921      def __init__(self, loop, sock, protocol, waiter=None,
     922                   extra=None, server=None):
     923  
     924          self._read_ready_cb = None
     925          super().__init__(loop, sock, protocol, extra, server)
     926          self._eof = False
     927          self._empty_waiter = None
     928  
     929          # Disable the Nagle algorithm -- small writes will be
     930          # sent without waiting for the TCP ACK.  This generally
     931          # decreases the latency (in some cases significantly.)
     932          base_events._set_nodelay(self._sock)
     933  
     934          self._loop.call_soon(self._protocol.connection_made, self)
     935          # only start reading when connection_made() has been called
     936          self._loop.call_soon(self._add_reader,
     937                               self._sock_fd, self._read_ready)
     938          if waiter is not None:
     939              # only wake up the waiter when connection_made() has been called
     940              self._loop.call_soon(futures._set_result_unless_cancelled,
     941                                   waiter, None)
     942  
     943      def set_protocol(self, protocol):
     944          if isinstance(protocol, protocols.BufferedProtocol):
     945              self._read_ready_cb = self._read_ready__get_buffer
     946          else:
     947              self._read_ready_cb = self._read_ready__data_received
     948  
     949          super().set_protocol(protocol)
     950  
     951      def _read_ready(self):
     952          self._read_ready_cb()
     953  
     954      def _read_ready__get_buffer(self):
     955          if self._conn_lost:
     956              return
     957  
     958          try:
     959              buf = self._protocol.get_buffer(-1)
     960              if not len(buf):
     961                  raise RuntimeError('get_buffer() returned an empty buffer')
     962          except (SystemExit, KeyboardInterrupt):
     963              raise
     964          except BaseException as exc:
     965              self._fatal_error(
     966                  exc, 'Fatal error: protocol.get_buffer() call failed.')
     967              return
     968  
     969          try:
     970              nbytes = self._sock.recv_into(buf)
     971          except (BlockingIOError, InterruptedError):
     972              return
     973          except (SystemExit, KeyboardInterrupt):
     974              raise
     975          except BaseException as exc:
     976              self._fatal_error(exc, 'Fatal read error on socket transport')
     977              return
     978  
     979          if not nbytes:
     980              self._read_ready__on_eof()
     981              return
     982  
     983          try:
     984              self._protocol.buffer_updated(nbytes)
     985          except (SystemExit, KeyboardInterrupt):
     986              raise
     987          except BaseException as exc:
     988              self._fatal_error(
     989                  exc, 'Fatal error: protocol.buffer_updated() call failed.')
     990  
     991      def _read_ready__data_received(self):
     992          if self._conn_lost:
     993              return
     994          try:
     995              data = self._sock.recv(self.max_size)
     996          except (BlockingIOError, InterruptedError):
     997              return
     998          except (SystemExit, KeyboardInterrupt):
     999              raise
    1000          except BaseException as exc:
    1001              self._fatal_error(exc, 'Fatal read error on socket transport')
    1002              return
    1003  
    1004          if not data:
    1005              self._read_ready__on_eof()
    1006              return
    1007  
    1008          try:
    1009              self._protocol.data_received(data)
    1010          except (SystemExit, KeyboardInterrupt):
    1011              raise
    1012          except BaseException as exc:
    1013              self._fatal_error(
    1014                  exc, 'Fatal error: protocol.data_received() call failed.')
    1015  
    1016      def _read_ready__on_eof(self):
    1017          if self._loop.get_debug():
    1018              logger.debug("%r received EOF", self)
    1019  
    1020          try:
    1021              keep_open = self._protocol.eof_received()
    1022          except (SystemExit, KeyboardInterrupt):
    1023              raise
    1024          except BaseException as exc:
    1025              self._fatal_error(
    1026                  exc, 'Fatal error: protocol.eof_received() call failed.')
    1027              return
    1028  
    1029          if keep_open:
    1030              # We're keeping the connection open so the
    1031              # protocol can write more, but we still can't
    1032              # receive more, so remove the reader callback.
    1033              self._loop._remove_reader(self._sock_fd)
    1034          else:
    1035              self.close()
    1036  
    1037      def write(self, data):
    1038          if not isinstance(data, (bytes, bytearray, memoryview)):
    1039              raise TypeError(f'data argument must be a bytes-like object, '
    1040                              f'not {type(data).__name__!r}')
    1041          if self._eof:
    1042              raise RuntimeError('Cannot call write() after write_eof()')
    1043          if self._empty_waiter is not None:
    1044              raise RuntimeError('unable to write; sendfile is in progress')
    1045          if not data:
    1046              return
    1047  
    1048          if self._conn_lost:
    1049              if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
    1050                  logger.warning('socket.send() raised exception.')
    1051              self._conn_lost += 1
    1052              return
    1053  
    1054          if not self._buffer:
    1055              # Optimization: try to send now.
    1056              try:
    1057                  n = self._sock.send(data)
    1058              except (BlockingIOError, InterruptedError):
    1059                  pass
    1060              except (SystemExit, KeyboardInterrupt):
    1061                  raise
    1062              except BaseException as exc:
    1063                  self._fatal_error(exc, 'Fatal write error on socket transport')
    1064                  return
    1065              else:
    1066                  data = data[n:]
    1067                  if not data:
    1068                      return
    1069              # Not all was written; register write handler.
    1070              self._loop._add_writer(self._sock_fd, self._write_ready)
    1071  
    1072          # Add it to the buffer.
    1073          self._buffer.extend(data)
    1074          self._maybe_pause_protocol()
    1075  
    1076      def _write_ready(self):
    1077          assert self._buffer, 'Data should not be empty'
    1078  
    1079          if self._conn_lost:
    1080              return
    1081          try:
    1082              n = self._sock.send(self._buffer)
    1083          except (BlockingIOError, InterruptedError):
    1084              pass
    1085          except (SystemExit, KeyboardInterrupt):
    1086              raise
    1087          except BaseException as exc:
    1088              self._loop._remove_writer(self._sock_fd)
    1089              self._buffer.clear()
    1090              self._fatal_error(exc, 'Fatal write error on socket transport')
    1091              if self._empty_waiter is not None:
    1092                  self._empty_waiter.set_exception(exc)
    1093          else:
    1094              if n:
    1095                  del self._buffer[:n]
    1096              self._maybe_resume_protocol()  # May append to buffer.
    1097              if not self._buffer:
    1098                  self._loop._remove_writer(self._sock_fd)
    1099                  if self._empty_waiter is not None:
    1100                      self._empty_waiter.set_result(None)
    1101                  if self._closing:
    1102                      self._call_connection_lost(None)
    1103                  elif self._eof:
    1104                      self._sock.shutdown(socket.SHUT_WR)
    1105  
    1106      def write_eof(self):
    1107          if self._closing or self._eof:
    1108              return
    1109          self._eof = True
    1110          if not self._buffer:
    1111              self._sock.shutdown(socket.SHUT_WR)
    1112  
    1113      def can_write_eof(self):
    1114          return True
    1115  
    1116      def _call_connection_lost(self, exc):
    1117          super()._call_connection_lost(exc)
    1118          if self._empty_waiter is not None:
    1119              self._empty_waiter.set_exception(
    1120                  ConnectionError("Connection is closed by peer"))
    1121  
    1122      def _make_empty_waiter(self):
    1123          if self._empty_waiter is not None:
    1124              raise RuntimeError("Empty waiter is already set")
    1125          self._empty_waiter = self._loop.create_future()
    1126          if not self._buffer:
    1127              self._empty_waiter.set_result(None)
    1128          return self._empty_waiter
    1129  
    1130      def _reset_empty_waiter(self):
    1131          self._empty_waiter = None
    1132  
    1133  
    1134  class ESC[4;38;5;81m_SelectorDatagramTransport(ESC[4;38;5;149m_SelectorTransport):
    1135  
    1136      _buffer_factory = collections.deque
    1137  
    1138      def __init__(self, loop, sock, protocol, address=None,
    1139                   waiter=None, extra=None):
    1140          super().__init__(loop, sock, protocol, extra)
    1141          self._address = address
    1142          self._buffer_size = 0
    1143          self._loop.call_soon(self._protocol.connection_made, self)
    1144          # only start reading when connection_made() has been called
    1145          self._loop.call_soon(self._add_reader,
    1146                               self._sock_fd, self._read_ready)
    1147          if waiter is not None:
    1148              # only wake up the waiter when connection_made() has been called
    1149              self._loop.call_soon(futures._set_result_unless_cancelled,
    1150                                   waiter, None)
    1151  
    1152      def get_write_buffer_size(self):
    1153          return self._buffer_size
    1154  
    1155      def _read_ready(self):
    1156          if self._conn_lost:
    1157              return
    1158          try:
    1159              data, addr = self._sock.recvfrom(self.max_size)
    1160          except (BlockingIOError, InterruptedError):
    1161              pass
    1162          except OSError as exc:
    1163              self._protocol.error_received(exc)
    1164          except (SystemExit, KeyboardInterrupt):
    1165              raise
    1166          except BaseException as exc:
    1167              self._fatal_error(exc, 'Fatal read error on datagram transport')
    1168          else:
    1169              self._protocol.datagram_received(data, addr)
    1170  
    1171      def sendto(self, data, addr=None):
    1172          if not isinstance(data, (bytes, bytearray, memoryview)):
    1173              raise TypeError(f'data argument must be a bytes-like object, '
    1174                              f'not {type(data).__name__!r}')
    1175          if not data:
    1176              return
    1177  
    1178          if self._address:
    1179              if addr not in (None, self._address):
    1180                  raise ValueError(
    1181                      f'Invalid address: must be None or {self._address}')
    1182              addr = self._address
    1183  
    1184          if self._conn_lost and self._address:
    1185              if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
    1186                  logger.warning('socket.send() raised exception.')
    1187              self._conn_lost += 1
    1188              return
    1189  
    1190          if not self._buffer:
    1191              # Attempt to send it right away first.
    1192              try:
    1193                  if self._extra['peername']:
    1194                      self._sock.send(data)
    1195                  else:
    1196                      self._sock.sendto(data, addr)
    1197                  return
    1198              except (BlockingIOError, InterruptedError):
    1199                  self._loop._add_writer(self._sock_fd, self._sendto_ready)
    1200              except OSError as exc:
    1201                  self._protocol.error_received(exc)
    1202                  return
    1203              except (SystemExit, KeyboardInterrupt):
    1204                  raise
    1205              except BaseException as exc:
    1206                  self._fatal_error(
    1207                      exc, 'Fatal write error on datagram transport')
    1208                  return
    1209  
    1210          # Ensure that what we buffer is immutable.
    1211          self._buffer.append((bytes(data), addr))
    1212          self._buffer_size += len(data)
    1213          self._maybe_pause_protocol()
    1214  
    1215      def _sendto_ready(self):
    1216          while self._buffer:
    1217              data, addr = self._buffer.popleft()
    1218              self._buffer_size -= len(data)
    1219              try:
    1220                  if self._extra['peername']:
    1221                      self._sock.send(data)
    1222                  else:
    1223                      self._sock.sendto(data, addr)
    1224              except (BlockingIOError, InterruptedError):
    1225                  self._buffer.appendleft((data, addr))  # Try again later.
    1226                  self._buffer_size += len(data)
    1227                  break
    1228              except OSError as exc:
    1229                  self._protocol.error_received(exc)
    1230                  return
    1231              except (SystemExit, KeyboardInterrupt):
    1232                  raise
    1233              except BaseException as exc:
    1234                  self._fatal_error(
    1235                      exc, 'Fatal write error on datagram transport')
    1236                  return
    1237  
    1238          self._maybe_resume_protocol()  # May append to buffer.
    1239          if not self._buffer:
    1240              self._loop._remove_writer(self._sock_fd)
    1241              if self._closing:
    1242                  self._call_connection_lost(None)