python (3.12.0)

(root)/
lib/
python3.12/
asyncio/
selector_events.py
       1  """Event loop using a selector and related classes.
       2  
       3  A selector is a "notify-when-ready" multiplexer.  For a subclass which
       4  also includes support for signal handling, see the unix_events sub-module.
       5  """
       6  
       7  __all__ = 'BaseSelectorEventLoop',
       8  
       9  import collections
      10  import errno
      11  import functools
      12  import itertools
      13  import os
      14  import selectors
      15  import socket
      16  import warnings
      17  import weakref
      18  try:
      19      import ssl
      20  except ImportError:  # pragma: no cover
      21      ssl = None
      22  
      23  from . import base_events
      24  from . import constants
      25  from . import events
      26  from . import futures
      27  from . import protocols
      28  from . import sslproto
      29  from . import transports
      30  from . import trsock
      31  from .log import logger
      32  
      33  _HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')
      34  
      35  if _HAS_SENDMSG:
      36      try:
      37          SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
      38      except OSError:
      39          # Fallback to send
      40          _HAS_SENDMSG = False
      41  
      42  def _test_selector_event(selector, fd, event):
      43      # Test if the selector is monitoring 'event' events
      44      # for the file descriptor 'fd'.
      45      try:
      46          key = selector.get_key(fd)
      47      except KeyError:
      48          return False
      49      else:
      50          return bool(key.events & event)
      51  
      52  
      53  class BaseSelectorEventLoop(base_events.BaseEventLoop):
      54      """Selector event loop.
      55  
      56      See events.EventLoop for API specification.
      57      """
      58  
      59      def __init__(self, selector=None):
      60          super().__init__()
      61  
      62          if selector is None:
      63              selector = selectors.DefaultSelector()
      64          logger.debug('Using selector: %s', selector.__class__.__name__)
      65          self._selector = selector
      66          self._make_self_pipe()
      67          self._transports = weakref.WeakValueDictionary()
      68  
      69      def _make_socket_transport(self, sock, protocol, waiter=None, *,
      70                                 extra=None, server=None):
      71          self._ensure_fd_no_transport(sock)
      72          return _SelectorSocketTransport(self, sock, protocol, waiter,
      73                                          extra, server)
      74  
      75      def _make_ssl_transport(
      76              self, rawsock, protocol, sslcontext, waiter=None,
      77              *, server_side=False, server_hostname=None,
      78              extra=None, server=None,
      79              ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
      80              ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
      81      ):
      82          self._ensure_fd_no_transport(rawsock)
      83          ssl_protocol = sslproto.SSLProtocol(
      84              self, protocol, sslcontext, waiter,
      85              server_side, server_hostname,
      86              ssl_handshake_timeout=ssl_handshake_timeout,
      87              ssl_shutdown_timeout=ssl_shutdown_timeout
      88          )
      89          _SelectorSocketTransport(self, rawsock, ssl_protocol,
      90                                   extra=extra, server=server)
      91          return ssl_protocol._app_transport
      92  
      93      def _make_datagram_transport(self, sock, protocol,
      94                                   address=None, waiter=None, extra=None):
      95          self._ensure_fd_no_transport(sock)
      96          return _SelectorDatagramTransport(self, sock, protocol,
      97                                            address, waiter, extra)
      98  
      99      def close(self):
     100          if self.is_running():
     101              raise RuntimeError("Cannot close a running event loop")
     102          if self.is_closed():
     103              return
     104          self._close_self_pipe()
     105          super().close()
     106          if self._selector is not None:
     107              self._selector.close()
     108              self._selector = None
     109  
     110      def _close_self_pipe(self):
     111          self._remove_reader(self._ssock.fileno())
     112          self._ssock.close()
     113          self._ssock = None
     114          self._csock.close()
     115          self._csock = None
     116          self._internal_fds -= 1
     117  
     118      def _make_self_pipe(self):
     119          # A self-socket, really. :-)
     120          self._ssock, self._csock = socket.socketpair()
     121          self._ssock.setblocking(False)
     122          self._csock.setblocking(False)
     123          self._internal_fds += 1
     124          self._add_reader(self._ssock.fileno(), self._read_from_self)
     125  
     126      def _process_self_data(self, data):
     127          pass
     128  
     129      def _read_from_self(self):
     130          while True:
     131              try:
     132                  data = self._ssock.recv(4096)
     133                  if not data:
     134                      break
     135                  self._process_self_data(data)
     136              except InterruptedError:
     137                  continue
     138              except BlockingIOError:
     139                  break
     140  
     141      def _write_to_self(self):
     142          # This may be called from a different thread, possibly after
     143          # _close_self_pipe() has been called or even while it is
     144          # running.  Guard for self._csock being None or closed.  When
     145          # a socket is closed, send() raises OSError (with errno set to
     146          # EBADF, but let's not rely on the exact error code).
     147          csock = self._csock
     148          if csock is None:
     149              return
     150  
     151          try:
     152              csock.send(b'\0')
     153          except OSError:
     154              if self._debug:
     155                  logger.debug("Fail to write a null byte into the "
     156                               "self-pipe socket",
     157                               exc_info=True)
     158  
     159      def _start_serving(self, protocol_factory, sock,
     160                         sslcontext=None, server=None, backlog=100,
     161                         ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
     162                         ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
     163          self._add_reader(sock.fileno(), self._accept_connection,
     164                           protocol_factory, sock, sslcontext, server, backlog,
     165                           ssl_handshake_timeout, ssl_shutdown_timeout)
     166  
     167      def _accept_connection(
     168              self, protocol_factory, sock,
     169              sslcontext=None, server=None, backlog=100,
     170              ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
     171              ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
     172          # This method is only called once for each event loop tick where the
     173          # listening socket has triggered an EVENT_READ. There may be multiple
     174          # connections waiting for an .accept() so it is called in a loop.
     175          # See https://bugs.python.org/issue27906 for more details.
     176          for _ in range(backlog):
     177              try:
     178                  conn, addr = sock.accept()
     179                  if self._debug:
     180                      logger.debug("%r got a new connection from %r: %r",
     181                                   server, addr, conn)
     182                  conn.setblocking(False)
     183              except (BlockingIOError, InterruptedError, ConnectionAbortedError):
     184                  # Early exit because the socket accept buffer is empty.
     185                  return None
     186              except OSError as exc:
     187                  # There's nowhere to send the error, so just log it.
     188                  if exc.errno in (errno.EMFILE, errno.ENFILE,
     189                                   errno.ENOBUFS, errno.ENOMEM):
     190                      # Some platforms (e.g. Linux keep reporting the FD as
     191                      # ready, so we remove the read handler temporarily.
     192                      # We'll try again in a while.
     193                      self.call_exception_handler({
     194                          'message': 'socket.accept() out of system resource',
     195                          'exception': exc,
     196                          'socket': trsock.TransportSocket(sock),
     197                      })
     198                      self._remove_reader(sock.fileno())
     199                      self.call_later(constants.ACCEPT_RETRY_DELAY,
     200                                      self._start_serving,
     201                                      protocol_factory, sock, sslcontext, server,
     202                                      backlog, ssl_handshake_timeout,
     203                                      ssl_shutdown_timeout)
     204                  else:
     205                      raise  # The event loop will catch, log and ignore it.
     206              else:
     207                  extra = {'peername': addr}
     208                  accept = self._accept_connection2(
     209                      protocol_factory, conn, extra, sslcontext, server,
     210                      ssl_handshake_timeout, ssl_shutdown_timeout)
     211                  self.create_task(accept)
     212  
     213      async def _accept_connection2(
     214              self, protocol_factory, conn, extra,
     215              sslcontext=None, server=None,
     216              ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
     217              ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
     218          protocol = None
     219          transport = None
     220          try:
     221              protocol = protocol_factory()
     222              waiter = self.create_future()
     223              if sslcontext:
     224                  transport = self._make_ssl_transport(
     225                      conn, protocol, sslcontext, waiter=waiter,
     226                      server_side=True, extra=extra, server=server,
     227                      ssl_handshake_timeout=ssl_handshake_timeout,
     228                      ssl_shutdown_timeout=ssl_shutdown_timeout)
     229              else:
     230                  transport = self._make_socket_transport(
     231                      conn, protocol, waiter=waiter, extra=extra,
     232                      server=server)
     233  
     234              try:
     235                  await waiter
     236              except BaseException:
     237                  transport.close()
     238                  raise
     239                  # It's now up to the protocol to handle the connection.
     240  
     241          except (SystemExit, KeyboardInterrupt):
     242              raise
     243          except BaseException as exc:
     244              if self._debug:
     245                  context = {
     246                      'message':
     247                          'Error on transport creation for incoming connection',
     248                      'exception': exc,
     249                  }
     250                  if protocol is not None:
     251                      context['protocol'] = protocol
     252                  if transport is not None:
     253                      context['transport'] = transport
     254                  self.call_exception_handler(context)
     255  
     256      def _ensure_fd_no_transport(self, fd):
     257          fileno = fd
     258          if not isinstance(fileno, int):
     259              try:
     260                  fileno = int(fileno.fileno())
     261              except (AttributeError, TypeError, ValueError):
     262                  # This code matches selectors._fileobj_to_fd function.
     263                  raise ValueError(f"Invalid file object: {fd!r}") from None
     264          try:
     265              transport = self._transports[fileno]
     266          except KeyError:
     267              pass
     268          else:
     269              if not transport.is_closing():
     270                  raise RuntimeError(
     271                      f'File descriptor {fd!r} is used by transport '
     272                      f'{transport!r}')
     273  
     274      def _add_reader(self, fd, callback, *args):
     275          self._check_closed()
     276          handle = events.Handle(callback, args, self, None)
     277          try:
     278              key = self._selector.get_key(fd)
     279          except KeyError:
     280              self._selector.register(fd, selectors.EVENT_READ,
     281                                      (handle, None))
     282          else:
     283              mask, (reader, writer) = key.events, key.data
     284              self._selector.modify(fd, mask | selectors.EVENT_READ,
     285                                    (handle, writer))
     286              if reader is not None:
     287                  reader.cancel()
     288          return handle
     289  
     290      def _remove_reader(self, fd):
     291          if self.is_closed():
     292              return False
     293          try:
     294              key = self._selector.get_key(fd)
     295          except KeyError:
     296              return False
     297          else:
     298              mask, (reader, writer) = key.events, key.data
     299              mask &= ~selectors.EVENT_READ
     300              if not mask:
     301                  self._selector.unregister(fd)
     302              else:
     303                  self._selector.modify(fd, mask, (None, writer))
     304  
     305              if reader is not None:
     306                  reader.cancel()
     307                  return True
     308              else:
     309                  return False
     310  
     311      def _add_writer(self, fd, callback, *args):
     312          self._check_closed()
     313          handle = events.Handle(callback, args, self, None)
     314          try:
     315              key = self._selector.get_key(fd)
     316          except KeyError:
     317              self._selector.register(fd, selectors.EVENT_WRITE,
     318                                      (None, handle))
     319          else:
     320              mask, (reader, writer) = key.events, key.data
     321              self._selector.modify(fd, mask | selectors.EVENT_WRITE,
     322                                    (reader, handle))
     323              if writer is not None:
     324                  writer.cancel()
     325          return handle
     326  
     327      def _remove_writer(self, fd):
     328          """Remove a writer callback."""
     329          if self.is_closed():
     330              return False
     331          try:
     332              key = self._selector.get_key(fd)
     333          except KeyError:
     334              return False
     335          else:
     336              mask, (reader, writer) = key.events, key.data
     337              # Remove both writer and connector.
     338              mask &= ~selectors.EVENT_WRITE
     339              if not mask:
     340                  self._selector.unregister(fd)
     341              else:
     342                  self._selector.modify(fd, mask, (reader, None))
     343  
     344              if writer is not None:
     345                  writer.cancel()
     346                  return True
     347              else:
     348                  return False
     349  
     350      def add_reader(self, fd, callback, *args):
     351          """Add a reader callback."""
     352          self._ensure_fd_no_transport(fd)
     353          self._add_reader(fd, callback, *args)
     354  
     355      def remove_reader(self, fd):
     356          """Remove a reader callback."""
     357          self._ensure_fd_no_transport(fd)
     358          return self._remove_reader(fd)
     359  
     360      def add_writer(self, fd, callback, *args):
     361          """Add a writer callback.."""
     362          self._ensure_fd_no_transport(fd)
     363          self._add_writer(fd, callback, *args)
     364  
     365      def remove_writer(self, fd):
     366          """Remove a writer callback."""
     367          self._ensure_fd_no_transport(fd)
     368          return self._remove_writer(fd)
     369  
     370      async def sock_recv(self, sock, n):
     371          """Receive data from the socket.
     372  
     373          The return value is a bytes object representing the data received.
     374          The maximum amount of data to be received at once is specified by
     375          nbytes.
     376          """
     377          base_events._check_ssl_socket(sock)
     378          if self._debug and sock.gettimeout() != 0:
     379              raise ValueError("the socket must be non-blocking")
     380          try:
     381              return sock.recv(n)
     382          except (BlockingIOError, InterruptedError):
     383              pass
     384          fut = self.create_future()
     385          fd = sock.fileno()
     386          self._ensure_fd_no_transport(fd)
     387          handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
     388          fut.add_done_callback(
     389              functools.partial(self._sock_read_done, fd, handle=handle))
     390          return await fut
     391  
     392      def _sock_read_done(self, fd, fut, handle=None):
     393          if handle is None or not handle.cancelled():
     394              self.remove_reader(fd)
     395  
     396      def _sock_recv(self, fut, sock, n):
     397          # _sock_recv() can add itself as an I/O callback if the operation can't
     398          # be done immediately. Don't use it directly, call sock_recv().
     399          if fut.done():
     400              return
     401          try:
     402              data = sock.recv(n)
     403          except (BlockingIOError, InterruptedError):
     404              return  # try again next time
     405          except (SystemExit, KeyboardInterrupt):
     406              raise
     407          except BaseException as exc:
     408              fut.set_exception(exc)
     409          else:
     410              fut.set_result(data)
     411  
     412      async def sock_recv_into(self, sock, buf):
     413          """Receive data from the socket.
     414  
     415          The received data is written into *buf* (a writable buffer).
     416          The return value is the number of bytes written.
     417          """
     418          base_events._check_ssl_socket(sock)
     419          if self._debug and sock.gettimeout() != 0:
     420              raise ValueError("the socket must be non-blocking")
     421          try:
     422              return sock.recv_into(buf)
     423          except (BlockingIOError, InterruptedError):
     424              pass
     425          fut = self.create_future()
     426          fd = sock.fileno()
     427          self._ensure_fd_no_transport(fd)
     428          handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
     429          fut.add_done_callback(
     430              functools.partial(self._sock_read_done, fd, handle=handle))
     431          return await fut
     432  
     433      def _sock_recv_into(self, fut, sock, buf):
     434          # _sock_recv_into() can add itself as an I/O callback if the operation
     435          # can't be done immediately. Don't use it directly, call
     436          # sock_recv_into().
     437          if fut.done():
     438              return
     439          try:
     440              nbytes = sock.recv_into(buf)
     441          except (BlockingIOError, InterruptedError):
     442              return  # try again next time
     443          except (SystemExit, KeyboardInterrupt):
     444              raise
     445          except BaseException as exc:
     446              fut.set_exception(exc)
     447          else:
     448              fut.set_result(nbytes)
     449  
     450      async def sock_recvfrom(self, sock, bufsize):
     451          """Receive a datagram from a datagram socket.
     452  
     453          The return value is a tuple of (bytes, address) representing the
     454          datagram received and the address it came from.
     455          The maximum amount of data to be received at once is specified by
     456          nbytes.
     457          """
     458          base_events._check_ssl_socket(sock)
     459          if self._debug and sock.gettimeout() != 0:
     460              raise ValueError("the socket must be non-blocking")
     461          try:
     462              return sock.recvfrom(bufsize)
     463          except (BlockingIOError, InterruptedError):
     464              pass
     465          fut = self.create_future()
     466          fd = sock.fileno()
     467          self._ensure_fd_no_transport(fd)
     468          handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
     469          fut.add_done_callback(
     470              functools.partial(self._sock_read_done, fd, handle=handle))
     471          return await fut
     472  
     473      def _sock_recvfrom(self, fut, sock, bufsize):
     474          # _sock_recvfrom() can add itself as an I/O callback if the operation
     475          # can't be done immediately. Don't use it directly, call
     476          # sock_recvfrom().
     477          if fut.done():
     478              return
     479          try:
     480              result = sock.recvfrom(bufsize)
     481          except (BlockingIOError, InterruptedError):
     482              return  # try again next time
     483          except (SystemExit, KeyboardInterrupt):
     484              raise
     485          except BaseException as exc:
     486              fut.set_exception(exc)
     487          else:
     488              fut.set_result(result)
     489  
     490      async def sock_recvfrom_into(self, sock, buf, nbytes=0):
     491          """Receive data from the socket.
     492  
     493          The received data is written into *buf* (a writable buffer).
     494          The return value is a tuple of (number of bytes written, address).
     495          """
     496          base_events._check_ssl_socket(sock)
     497          if self._debug and sock.gettimeout() != 0:
     498              raise ValueError("the socket must be non-blocking")
     499          if not nbytes:
     500              nbytes = len(buf)
     501  
     502          try:
     503              return sock.recvfrom_into(buf, nbytes)
     504          except (BlockingIOError, InterruptedError):
     505              pass
     506          fut = self.create_future()
     507          fd = sock.fileno()
     508          self._ensure_fd_no_transport(fd)
     509          handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
     510                                    nbytes)
     511          fut.add_done_callback(
     512              functools.partial(self._sock_read_done, fd, handle=handle))
     513          return await fut
     514  
     515      def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
     516          # _sock_recv_into() can add itself as an I/O callback if the operation
     517          # can't be done immediately. Don't use it directly, call
     518          # sock_recv_into().
     519          if fut.done():
     520              return
     521          try:
     522              result = sock.recvfrom_into(buf, bufsize)
     523          except (BlockingIOError, InterruptedError):
     524              return  # try again next time
     525          except (SystemExit, KeyboardInterrupt):
     526              raise
     527          except BaseException as exc:
     528              fut.set_exception(exc)
     529          else:
     530              fut.set_result(result)
     531  
     532      async def sock_sendall(self, sock, data):
     533          """Send data to the socket.
     534  
     535          The socket must be connected to a remote socket. This method continues
     536          to send data from data until either all data has been sent or an
     537          error occurs. None is returned on success. On error, an exception is
     538          raised, and there is no way to determine how much data, if any, was
     539          successfully processed by the receiving end of the connection.
     540          """
     541          base_events._check_ssl_socket(sock)
     542          if self._debug and sock.gettimeout() != 0:
     543              raise ValueError("the socket must be non-blocking")
     544          try:
     545              n = sock.send(data)
     546          except (BlockingIOError, InterruptedError):
     547              n = 0
     548  
     549          if n == len(data):
     550              # all data sent
     551              return
     552  
     553          fut = self.create_future()
     554          fd = sock.fileno()
     555          self._ensure_fd_no_transport(fd)
     556          # use a trick with a list in closure to store a mutable state
     557          handle = self._add_writer(fd, self._sock_sendall, fut, sock,
     558                                    memoryview(data), [n])
     559          fut.add_done_callback(
     560              functools.partial(self._sock_write_done, fd, handle=handle))
     561          return await fut
     562  
     563      def _sock_sendall(self, fut, sock, view, pos):
     564          if fut.done():
     565              # Future cancellation can be scheduled on previous loop iteration
     566              return
     567          start = pos[0]
     568          try:
     569              n = sock.send(view[start:])
     570          except (BlockingIOError, InterruptedError):
     571              return
     572          except (SystemExit, KeyboardInterrupt):
     573              raise
     574          except BaseException as exc:
     575              fut.set_exception(exc)
     576              return
     577  
     578          start += n
     579  
     580          if start == len(view):
     581              fut.set_result(None)
     582          else:
     583              pos[0] = start
     584  
     585      async def sock_sendto(self, sock, data, address):
     586          """Send data to the socket.
     587  
     588          The socket must be connected to a remote socket. This method continues
     589          to send data from data until either all data has been sent or an
     590          error occurs. None is returned on success. On error, an exception is
     591          raised, and there is no way to determine how much data, if any, was
     592          successfully processed by the receiving end of the connection.
     593          """
     594          base_events._check_ssl_socket(sock)
     595          if self._debug and sock.gettimeout() != 0:
     596              raise ValueError("the socket must be non-blocking")
     597          try:
     598              return sock.sendto(data, address)
     599          except (BlockingIOError, InterruptedError):
     600              pass
     601  
     602          fut = self.create_future()
     603          fd = sock.fileno()
     604          self._ensure_fd_no_transport(fd)
     605          # use a trick with a list in closure to store a mutable state
     606          handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
     607                                    address)
     608          fut.add_done_callback(
     609              functools.partial(self._sock_write_done, fd, handle=handle))
     610          return await fut
     611  
     612      def _sock_sendto(self, fut, sock, data, address):
     613          if fut.done():
     614              # Future cancellation can be scheduled on previous loop iteration
     615              return
     616          try:
     617              n = sock.sendto(data, 0, address)
     618          except (BlockingIOError, InterruptedError):
     619              return
     620          except (SystemExit, KeyboardInterrupt):
     621              raise
     622          except BaseException as exc:
     623              fut.set_exception(exc)
     624          else:
     625              fut.set_result(n)
     626  
     627      async def sock_connect(self, sock, address):
     628          """Connect to a remote socket at address.
     629  
     630          This method is a coroutine.
     631          """
     632          base_events._check_ssl_socket(sock)
     633          if self._debug and sock.gettimeout() != 0:
     634              raise ValueError("the socket must be non-blocking")
     635  
     636          if sock.family == socket.AF_INET or (
     637                  base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
     638              resolved = await self._ensure_resolved(
     639                  address, family=sock.family, type=sock.type, proto=sock.proto,
     640                  loop=self,
     641              )
     642              _, _, _, _, address = resolved[0]
     643  
     644          fut = self.create_future()
     645          self._sock_connect(fut, sock, address)
     646          try:
     647              return await fut
     648          finally:
     649              # Needed to break cycles when an exception occurs.
     650              fut = None
     651  
     652      def _sock_connect(self, fut, sock, address):
     653          fd = sock.fileno()
     654          try:
     655              sock.connect(address)
     656          except (BlockingIOError, InterruptedError):
     657              # Issue #23618: When the C function connect() fails with EINTR, the
     658              # connection runs in background. We have to wait until the socket
     659              # becomes writable to be notified when the connection succeed or
     660              # fails.
     661              self._ensure_fd_no_transport(fd)
     662              handle = self._add_writer(
     663                  fd, self._sock_connect_cb, fut, sock, address)
     664              fut.add_done_callback(
     665                  functools.partial(self._sock_write_done, fd, handle=handle))
     666          except (SystemExit, KeyboardInterrupt):
     667              raise
     668          except BaseException as exc:
     669              fut.set_exception(exc)
     670          else:
     671              fut.set_result(None)
     672          finally:
     673              fut = None
     674  
     675      def _sock_write_done(self, fd, fut, handle=None):
     676          if handle is None or not handle.cancelled():
     677              self.remove_writer(fd)
     678  
     679      def _sock_connect_cb(self, fut, sock, address):
     680          if fut.done():
     681              return
     682  
     683          try:
     684              err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
     685              if err != 0:
     686                  # Jump to any except clause below.
     687                  raise OSError(err, f'Connect call failed {address}')
     688          except (BlockingIOError, InterruptedError):
     689              # socket is still registered, the callback will be retried later
     690              pass
     691          except (SystemExit, KeyboardInterrupt):
     692              raise
     693          except BaseException as exc:
     694              fut.set_exception(exc)
     695          else:
     696              fut.set_result(None)
     697          finally:
     698              fut = None
     699  
     700      async def sock_accept(self, sock):
     701          """Accept a connection.
     702  
     703          The socket must be bound to an address and listening for connections.
     704          The return value is a pair (conn, address) where conn is a new socket
     705          object usable to send and receive data on the connection, and address
     706          is the address bound to the socket on the other end of the connection.
     707          """
     708          base_events._check_ssl_socket(sock)
     709          if self._debug and sock.gettimeout() != 0:
     710              raise ValueError("the socket must be non-blocking")
     711          fut = self.create_future()
     712          self._sock_accept(fut, sock)
     713          return await fut
     714  
     715      def _sock_accept(self, fut, sock):
     716          fd = sock.fileno()
     717          try:
     718              conn, address = sock.accept()
     719              conn.setblocking(False)
     720          except (BlockingIOError, InterruptedError):
     721              self._ensure_fd_no_transport(fd)
     722              handle = self._add_reader(fd, self._sock_accept, fut, sock)
     723              fut.add_done_callback(
     724                  functools.partial(self._sock_read_done, fd, handle=handle))
     725          except (SystemExit, KeyboardInterrupt):
     726              raise
     727          except BaseException as exc:
     728              fut.set_exception(exc)
     729          else:
     730              fut.set_result((conn, address))
     731  
     732      async def _sendfile_native(self, transp, file, offset, count):
     733          del self._transports[transp._sock_fd]
     734          resume_reading = transp.is_reading()
     735          transp.pause_reading()
     736          await transp._make_empty_waiter()
     737          try:
     738              return await self.sock_sendfile(transp._sock, file, offset, count,
     739                                              fallback=False)
     740          finally:
     741              transp._reset_empty_waiter()
     742              if resume_reading:
     743                  transp.resume_reading()
     744              self._transports[transp._sock_fd] = transp
     745  
     746      def _process_events(self, event_list):
     747          for key, mask in event_list:
     748              fileobj, (reader, writer) = key.fileobj, key.data
     749              if mask & selectors.EVENT_READ and reader is not None:
     750                  if reader._cancelled:
     751                      self._remove_reader(fileobj)
     752                  else:
     753                      self._add_callback(reader)
     754              if mask & selectors.EVENT_WRITE and writer is not None:
     755                  if writer._cancelled:
     756                      self._remove_writer(fileobj)
     757                  else:
     758                      self._add_callback(writer)
     759  
     760      def _stop_serving(self, sock):
     761          self._remove_reader(sock.fileno())
     762          sock.close()
     763  
     764  
     765  class ESC[4;38;5;81m_SelectorTransport(ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149m_FlowControlMixin,
     766                           ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149mTransport):
     767  
     768      max_size = 256 * 1024  # Buffer size passed to recv().
     769  
     770      # Attribute used in the destructor: it must be set even if the constructor
     771      # is not called (see _SelectorSslTransport which may start by raising an
     772      # exception)
     773      _sock = None
     774  
     775      def __init__(self, loop, sock, protocol, extra=None, server=None):
     776          super().__init__(extra, loop)
     777          self._extra['socket'] = trsock.TransportSocket(sock)
     778          try:
     779              self._extra['sockname'] = sock.getsockname()
     780          except OSError:
     781              self._extra['sockname'] = None
     782          if 'peername' not in self._extra:
     783              try:
     784                  self._extra['peername'] = sock.getpeername()
     785              except socket.error:
     786                  self._extra['peername'] = None
     787          self._sock = sock
     788          self._sock_fd = sock.fileno()
     789  
     790          self._protocol_connected = False
     791          self.set_protocol(protocol)
     792  
     793          self._server = server
     794          self._buffer = collections.deque()
     795          self._conn_lost = 0  # Set when call to connection_lost scheduled.
     796          self._closing = False  # Set when close() called.
     797          self._paused = False  # Set when pause_reading() called
     798  
     799          if self._server is not None:
     800              self._server._attach()
     801          loop._transports[self._sock_fd] = self
     802  
     803      def __repr__(self):
     804          info = [self.__class__.__name__]
     805          if self._sock is None:
     806              info.append('closed')
     807          elif self._closing:
     808              info.append('closing')
     809          info.append(f'fd={self._sock_fd}')
     810          # test if the transport was closed
     811          if self._loop is not None and not self._loop.is_closed():
     812              polling = _test_selector_event(self._loop._selector,
     813                                             self._sock_fd, selectors.EVENT_READ)
     814              if polling:
     815                  info.append('read=polling')
     816              else:
     817                  info.append('read=idle')
     818  
     819              polling = _test_selector_event(self._loop._selector,
     820                                             self._sock_fd,
     821                                             selectors.EVENT_WRITE)
     822              if polling:
     823                  state = 'polling'
     824              else:
     825                  state = 'idle'
     826  
     827              bufsize = self.get_write_buffer_size()
     828              info.append(f'write=<{state}, bufsize={bufsize}>')
     829          return '<{}>'.format(' '.join(info))
     830  
     831      def abort(self):
     832          self._force_close(None)
     833  
     834      def set_protocol(self, protocol):
     835          self._protocol = protocol
     836          self._protocol_connected = True
     837  
     838      def get_protocol(self):
     839          return self._protocol
     840  
     841      def is_closing(self):
     842          return self._closing
     843  
     844      def is_reading(self):
     845          return not self.is_closing() and not self._paused
     846  
     847      def pause_reading(self):
     848          if not self.is_reading():
     849              return
     850          self._paused = True
     851          self._loop._remove_reader(self._sock_fd)
     852          if self._loop.get_debug():
     853              logger.debug("%r pauses reading", self)
     854  
     855      def resume_reading(self):
     856          if self._closing or not self._paused:
     857              return
     858          self._paused = False
     859          self._add_reader(self._sock_fd, self._read_ready)
     860          if self._loop.get_debug():
     861              logger.debug("%r resumes reading", self)
     862  
     863      def close(self):
     864          if self._closing:
     865              return
     866          self._closing = True
     867          self._loop._remove_reader(self._sock_fd)
     868          if not self._buffer:
     869              self._conn_lost += 1
     870              self._loop._remove_writer(self._sock_fd)
     871              self._loop.call_soon(self._call_connection_lost, None)
     872  
     873      def __del__(self, _warn=warnings.warn):
     874          if self._sock is not None:
     875              _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
     876              self._sock.close()
     877  
     878      def _fatal_error(self, exc, message='Fatal error on transport'):
     879          # Should be called from exception handler only.
     880          if isinstance(exc, OSError):
     881              if self._loop.get_debug():
     882                  logger.debug("%r: %s", self, message, exc_info=True)
     883          else:
     884              self._loop.call_exception_handler({
     885                  'message': message,
     886                  'exception': exc,
     887                  'transport': self,
     888                  'protocol': self._protocol,
     889              })
     890          self._force_close(exc)
     891  
     892      def _force_close(self, exc):
     893          if self._conn_lost:
     894              return
     895          if self._buffer:
     896              self._buffer.clear()
     897              self._loop._remove_writer(self._sock_fd)
     898          if not self._closing:
     899              self._closing = True
     900              self._loop._remove_reader(self._sock_fd)
     901          self._conn_lost += 1
     902          self._loop.call_soon(self._call_connection_lost, exc)
     903  
     904      def _call_connection_lost(self, exc):
     905          try:
     906              if self._protocol_connected:
     907                  self._protocol.connection_lost(exc)
     908          finally:
     909              self._sock.close()
     910              self._sock = None
     911              self._protocol = None
     912              self._loop = None
     913              server = self._server
     914              if server is not None:
     915                  server._detach()
     916                  self._server = None
     917  
     918      def get_write_buffer_size(self):
     919          return sum(map(len, self._buffer))
     920  
     921      def _add_reader(self, fd, callback, *args):
     922          if not self.is_reading():
     923              return
     924          self._loop._add_reader(fd, callback, *args)
     925  
     926  
     927  class ESC[4;38;5;81m_SelectorSocketTransport(ESC[4;38;5;149m_SelectorTransport):
     928  
     929      _start_tls_compatible = True
     930      _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
     931  
     932      def __init__(self, loop, sock, protocol, waiter=None,
     933                   extra=None, server=None):
     934  
     935          self._read_ready_cb = None
     936          super().__init__(loop, sock, protocol, extra, server)
     937          self._eof = False
     938          self._empty_waiter = None
     939          if _HAS_SENDMSG:
     940              self._write_ready = self._write_sendmsg
     941          else:
     942              self._write_ready = self._write_send
     943          # Disable the Nagle algorithm -- small writes will be
     944          # sent without waiting for the TCP ACK.  This generally
     945          # decreases the latency (in some cases significantly.)
     946          base_events._set_nodelay(self._sock)
     947  
     948          self._loop.call_soon(self._protocol.connection_made, self)
     949          # only start reading when connection_made() has been called
     950          self._loop.call_soon(self._add_reader,
     951                               self._sock_fd, self._read_ready)
     952          if waiter is not None:
     953              # only wake up the waiter when connection_made() has been called
     954              self._loop.call_soon(futures._set_result_unless_cancelled,
     955                                   waiter, None)
     956  
     957      def set_protocol(self, protocol):
     958          if isinstance(protocol, protocols.BufferedProtocol):
     959              self._read_ready_cb = self._read_ready__get_buffer
     960          else:
     961              self._read_ready_cb = self._read_ready__data_received
     962  
     963          super().set_protocol(protocol)
     964  
     965      def _read_ready(self):
     966          self._read_ready_cb()
     967  
     968      def _read_ready__get_buffer(self):
     969          if self._conn_lost:
     970              return
     971  
     972          try:
     973              buf = self._protocol.get_buffer(-1)
     974              if not len(buf):
     975                  raise RuntimeError('get_buffer() returned an empty buffer')
     976          except (SystemExit, KeyboardInterrupt):
     977              raise
     978          except BaseException as exc:
     979              self._fatal_error(
     980                  exc, 'Fatal error: protocol.get_buffer() call failed.')
     981              return
     982  
     983          try:
     984              nbytes = self._sock.recv_into(buf)
     985          except (BlockingIOError, InterruptedError):
     986              return
     987          except (SystemExit, KeyboardInterrupt):
     988              raise
     989          except BaseException as exc:
     990              self._fatal_error(exc, 'Fatal read error on socket transport')
     991              return
     992  
     993          if not nbytes:
     994              self._read_ready__on_eof()
     995              return
     996  
     997          try:
     998              self._protocol.buffer_updated(nbytes)
     999          except (SystemExit, KeyboardInterrupt):
    1000              raise
    1001          except BaseException as exc:
    1002              self._fatal_error(
    1003                  exc, 'Fatal error: protocol.buffer_updated() call failed.')
    1004  
    1005      def _read_ready__data_received(self):
    1006          if self._conn_lost:
    1007              return
    1008          try:
    1009              data = self._sock.recv(self.max_size)
    1010          except (BlockingIOError, InterruptedError):
    1011              return
    1012          except (SystemExit, KeyboardInterrupt):
    1013              raise
    1014          except BaseException as exc:
    1015              self._fatal_error(exc, 'Fatal read error on socket transport')
    1016              return
    1017  
    1018          if not data:
    1019              self._read_ready__on_eof()
    1020              return
    1021  
    1022          try:
    1023              self._protocol.data_received(data)
    1024          except (SystemExit, KeyboardInterrupt):
    1025              raise
    1026          except BaseException as exc:
    1027              self._fatal_error(
    1028                  exc, 'Fatal error: protocol.data_received() call failed.')
    1029  
    1030      def _read_ready__on_eof(self):
    1031          if self._loop.get_debug():
    1032              logger.debug("%r received EOF", self)
    1033  
    1034          try:
    1035              keep_open = self._protocol.eof_received()
    1036          except (SystemExit, KeyboardInterrupt):
    1037              raise
    1038          except BaseException as exc:
    1039              self._fatal_error(
    1040                  exc, 'Fatal error: protocol.eof_received() call failed.')
    1041              return
    1042  
    1043          if keep_open:
    1044              # We're keeping the connection open so the
    1045              # protocol can write more, but we still can't
    1046              # receive more, so remove the reader callback.
    1047              self._loop._remove_reader(self._sock_fd)
    1048          else:
    1049              self.close()
    1050  
    1051      def write(self, data):
    1052          if not isinstance(data, (bytes, bytearray, memoryview)):
    1053              raise TypeError(f'data argument must be a bytes-like object, '
    1054                              f'not {type(data).__name__!r}')
    1055          if self._eof:
    1056              raise RuntimeError('Cannot call write() after write_eof()')
    1057          if self._empty_waiter is not None:
    1058              raise RuntimeError('unable to write; sendfile is in progress')
    1059          if not data:
    1060              return
    1061  
    1062          if self._conn_lost:
    1063              if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
    1064                  logger.warning('socket.send() raised exception.')
    1065              self._conn_lost += 1
    1066              return
    1067  
    1068          if not self._buffer:
    1069              # Optimization: try to send now.
    1070              try:
    1071                  n = self._sock.send(data)
    1072              except (BlockingIOError, InterruptedError):
    1073                  pass
    1074              except (SystemExit, KeyboardInterrupt):
    1075                  raise
    1076              except BaseException as exc:
    1077                  self._fatal_error(exc, 'Fatal write error on socket transport')
    1078                  return
    1079              else:
    1080                  data = memoryview(data)[n:]
    1081                  if not data:
    1082                      return
    1083              # Not all was written; register write handler.
    1084              self._loop._add_writer(self._sock_fd, self._write_ready)
    1085  
    1086          # Add it to the buffer.
    1087          self._buffer.append(data)
    1088          self._maybe_pause_protocol()
    1089  
    1090      def _get_sendmsg_buffer(self):
    1091          return itertools.islice(self._buffer, SC_IOV_MAX)
    1092  
    1093      def _write_sendmsg(self):
    1094          assert self._buffer, 'Data should not be empty'
    1095          if self._conn_lost:
    1096              return
    1097          try:
    1098              nbytes = self._sock.sendmsg(self._get_sendmsg_buffer())
    1099              self._adjust_leftover_buffer(nbytes)
    1100          except (BlockingIOError, InterruptedError):
    1101              pass
    1102          except (SystemExit, KeyboardInterrupt):
    1103              raise
    1104          except BaseException as exc:
    1105              self._loop._remove_writer(self._sock_fd)
    1106              self._buffer.clear()
    1107              self._fatal_error(exc, 'Fatal write error on socket transport')
    1108              if self._empty_waiter is not None:
    1109                  self._empty_waiter.set_exception(exc)
    1110          else:
    1111              self._maybe_resume_protocol()  # May append to buffer.
    1112              if not self._buffer:
    1113                  self._loop._remove_writer(self._sock_fd)
    1114                  if self._empty_waiter is not None:
    1115                      self._empty_waiter.set_result(None)
    1116                  if self._closing:
    1117                      self._call_connection_lost(None)
    1118                  elif self._eof:
    1119                      self._sock.shutdown(socket.SHUT_WR)
    1120  
    1121      def _adjust_leftover_buffer(self, nbytes: int) -> None:
    1122          buffer = self._buffer
    1123          while nbytes:
    1124              b = buffer.popleft()
    1125              b_len = len(b)
    1126              if b_len <= nbytes:
    1127                  nbytes -= b_len
    1128              else:
    1129                  buffer.appendleft(b[nbytes:])
    1130                  break
    1131  
    1132      def _write_send(self):
    1133          assert self._buffer, 'Data should not be empty'
    1134          if self._conn_lost:
    1135              return
    1136          try:
    1137              buffer = self._buffer.popleft()
    1138              n = self._sock.send(buffer)
    1139              if n != len(buffer):
    1140                  # Not all data was written
    1141                  self._buffer.appendleft(buffer[n:])
    1142          except (BlockingIOError, InterruptedError):
    1143              pass
    1144          except (SystemExit, KeyboardInterrupt):
    1145              raise
    1146          except BaseException as exc:
    1147              self._loop._remove_writer(self._sock_fd)
    1148              self._buffer.clear()
    1149              self._fatal_error(exc, 'Fatal write error on socket transport')
    1150              if self._empty_waiter is not None:
    1151                  self._empty_waiter.set_exception(exc)
    1152          else:
    1153              self._maybe_resume_protocol()  # May append to buffer.
    1154              if not self._buffer:
    1155                  self._loop._remove_writer(self._sock_fd)
    1156                  if self._empty_waiter is not None:
    1157                      self._empty_waiter.set_result(None)
    1158                  if self._closing:
    1159                      self._call_connection_lost(None)
    1160                  elif self._eof:
    1161                      self._sock.shutdown(socket.SHUT_WR)
    1162  
    1163      def write_eof(self):
    1164          if self._closing or self._eof:
    1165              return
    1166          self._eof = True
    1167          if not self._buffer:
    1168              self._sock.shutdown(socket.SHUT_WR)
    1169  
    1170      def writelines(self, list_of_data):
    1171          if self._eof:
    1172              raise RuntimeError('Cannot call writelines() after write_eof()')
    1173          if self._empty_waiter is not None:
    1174              raise RuntimeError('unable to writelines; sendfile is in progress')
    1175          if not list_of_data:
    1176              return
    1177          self._buffer.extend([memoryview(data) for data in list_of_data])
    1178          self._write_ready()
    1179          # If the entire buffer couldn't be written, register a write handler
    1180          if self._buffer:
    1181              self._loop._add_writer(self._sock_fd, self._write_ready)
    1182  
    1183      def can_write_eof(self):
    1184          return True
    1185  
    1186      def _call_connection_lost(self, exc):
    1187          super()._call_connection_lost(exc)
    1188          if self._empty_waiter is not None:
    1189              self._empty_waiter.set_exception(
    1190                  ConnectionError("Connection is closed by peer"))
    1191  
    1192      def _make_empty_waiter(self):
    1193          if self._empty_waiter is not None:
    1194              raise RuntimeError("Empty waiter is already set")
    1195          self._empty_waiter = self._loop.create_future()
    1196          if not self._buffer:
    1197              self._empty_waiter.set_result(None)
    1198          return self._empty_waiter
    1199  
    1200      def _reset_empty_waiter(self):
    1201          self._empty_waiter = None
    1202  
    1203      def close(self):
    1204          self._read_ready_cb = None
    1205          self._write_ready = None
    1206          super().close()
    1207  
    1208  
    1209  class ESC[4;38;5;81m_SelectorDatagramTransport(ESC[4;38;5;149m_SelectorTransport, ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149mDatagramTransport):
    1210  
    1211      _buffer_factory = collections.deque
    1212  
    1213      def __init__(self, loop, sock, protocol, address=None,
    1214                   waiter=None, extra=None):
    1215          super().__init__(loop, sock, protocol, extra)
    1216          self._address = address
    1217          self._buffer_size = 0
    1218          self._loop.call_soon(self._protocol.connection_made, self)
    1219          # only start reading when connection_made() has been called
    1220          self._loop.call_soon(self._add_reader,
    1221                               self._sock_fd, self._read_ready)
    1222          if waiter is not None:
    1223              # only wake up the waiter when connection_made() has been called
    1224              self._loop.call_soon(futures._set_result_unless_cancelled,
    1225                                   waiter, None)
    1226  
    1227      def get_write_buffer_size(self):
    1228          return self._buffer_size
    1229  
    1230      def _read_ready(self):
    1231          if self._conn_lost:
    1232              return
    1233          try:
    1234              data, addr = self._sock.recvfrom(self.max_size)
    1235          except (BlockingIOError, InterruptedError):
    1236              pass
    1237          except OSError as exc:
    1238              self._protocol.error_received(exc)
    1239          except (SystemExit, KeyboardInterrupt):
    1240              raise
    1241          except BaseException as exc:
    1242              self._fatal_error(exc, 'Fatal read error on datagram transport')
    1243          else:
    1244              self._protocol.datagram_received(data, addr)
    1245  
    1246      def sendto(self, data, addr=None):
    1247          if not isinstance(data, (bytes, bytearray, memoryview)):
    1248              raise TypeError(f'data argument must be a bytes-like object, '
    1249                              f'not {type(data).__name__!r}')
    1250          if not data:
    1251              return
    1252  
    1253          if self._address:
    1254              if addr not in (None, self._address):
    1255                  raise ValueError(
    1256                      f'Invalid address: must be None or {self._address}')
    1257              addr = self._address
    1258  
    1259          if self._conn_lost and self._address:
    1260              if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
    1261                  logger.warning('socket.send() raised exception.')
    1262              self._conn_lost += 1
    1263              return
    1264  
    1265          if not self._buffer:
    1266              # Attempt to send it right away first.
    1267              try:
    1268                  if self._extra['peername']:
    1269                      self._sock.send(data)
    1270                  else:
    1271                      self._sock.sendto(data, addr)
    1272                  return
    1273              except (BlockingIOError, InterruptedError):
    1274                  self._loop._add_writer(self._sock_fd, self._sendto_ready)
    1275              except OSError as exc:
    1276                  self._protocol.error_received(exc)
    1277                  return
    1278              except (SystemExit, KeyboardInterrupt):
    1279                  raise
    1280              except BaseException as exc:
    1281                  self._fatal_error(
    1282                      exc, 'Fatal write error on datagram transport')
    1283                  return
    1284  
    1285          # Ensure that what we buffer is immutable.
    1286          self._buffer.append((bytes(data), addr))
    1287          self._buffer_size += len(data)
    1288          self._maybe_pause_protocol()
    1289  
    1290      def _sendto_ready(self):
    1291          while self._buffer:
    1292              data, addr = self._buffer.popleft()
    1293              self._buffer_size -= len(data)
    1294              try:
    1295                  if self._extra['peername']:
    1296                      self._sock.send(data)
    1297                  else:
    1298                      self._sock.sendto(data, addr)
    1299              except (BlockingIOError, InterruptedError):
    1300                  self._buffer.appendleft((data, addr))  # Try again later.
    1301                  self._buffer_size += len(data)
    1302                  break
    1303              except OSError as exc:
    1304                  self._protocol.error_received(exc)
    1305                  return
    1306              except (SystemExit, KeyboardInterrupt):
    1307                  raise
    1308              except BaseException as exc:
    1309                  self._fatal_error(
    1310                      exc, 'Fatal write error on datagram transport')
    1311                  return
    1312  
    1313          self._maybe_resume_protocol()  # May append to buffer.
    1314          if not self._buffer:
    1315              self._loop._remove_writer(self._sock_fd)
    1316              if self._closing:
    1317                  self._call_connection_lost(None)