(root)/
Python-3.11.7/
Lib/
test/
support/
socket_helper.py
       1  import contextlib
       2  import errno
       3  import socket
       4  import subprocess
       5  import sys
       6  import unittest
       7  
       8  from .. import support
       9  from . import warnings_helper
      10  
# Host name that bind_port() binds to when no explicit host is given.
HOST = "localhost"
# IPv4 loopback address.
HOSTv4 = "127.0.0.1"
# IPv6 loopback address; _is_ipv6_enabled() tries to bind a socket to it.
HOSTv6 = "::1"

# WASI SDK 15.0 does not provide gethostname, stub raises OSError ENOTSUP.
has_gethostname = not support.is_wasi
      17  
      18  
      19  def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
      20      """Returns an unused port that should be suitable for binding.  This is
      21      achieved by creating a temporary socket with the same family and type as
      22      the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
      23      the specified host address (defaults to 0.0.0.0) with the port set to 0,
      24      eliciting an unused ephemeral port from the OS.  The temporary socket is
      25      then closed and deleted, and the ephemeral port is returned.
      26  
      27      Either this method or bind_port() should be used for any tests where a
      28      server socket needs to be bound to a particular port for the duration of
      29      the test.  Which one to use depends on whether the calling code is creating
      30      a python socket, or if an unused port needs to be provided in a constructor
      31      or passed to an external program (i.e. the -accept argument to openssl's
      32      s_server mode).  Always prefer bind_port() over find_unused_port() where
      33      possible.  Hard coded ports should *NEVER* be used.  As soon as a server
      34      socket is bound to a hard coded port, the ability to run multiple instances
      35      of the test simultaneously on the same host is compromised, which makes the
      36      test a ticking time bomb in a buildbot environment. On Unix buildbots, this
      37      may simply manifest as a failed test, which can be recovered from without
      38      intervention in most cases, but on Windows, the entire python process can
      39      completely and utterly wedge, requiring someone to log in to the buildbot
      40      and manually kill the affected process.
      41  
      42      (This is easy to reproduce on Windows, unfortunately, and can be traced to
      43      the SO_REUSEADDR socket option having different semantics on Windows versus
      44      Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
      45      listen and then accept connections on identical host/ports.  An EADDRINUSE
      46      OSError will be raised at some point (depending on the platform and
      47      the order bind and listen were called on each socket).
      48  
      49      However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
      50      will ever be raised when attempting to bind two identical host/ports. When
      51      accept() is called on each socket, the second caller's process will steal
      52      the port from the first caller, leaving them both in an awkwardly wedged
      53      state where they'll no longer respond to any signals or graceful kills, and
      54      must be forcibly killed via OpenProcess()/TerminateProcess().
      55  
      56      The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
      57      instead of SO_REUSEADDR, which effectively affords the same semantics as
      58      SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
      59      Source world compared to Windows ones, this is a common mistake.  A quick
      60      look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
      61      openssl.exe is called with the 's_server' option, for example. See
      62      http://bugs.python.org/issue2550 for more info.  The following site also
      63      has a very thorough description about the implications of both REUSEADDR
      64      and EXCLUSIVEADDRUSE on Windows:
      65      https://learn.microsoft.com/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
      66  
      67      XXX: although this approach is a vast improvement on previous attempts to
      68      elicit unused ports, it rests heavily on the assumption that the ephemeral
      69      port returned to us by the OS won't immediately be dished back out to some
      70      other process when we close and delete our temporary socket but before our
      71      calling code has a chance to bind the returned port.  We can deal with this
      72      issue if/when we come across it.
      73      """
      74  
      75      with socket.socket(family, socktype) as tempsock:
      76          port = bind_port(tempsock)
      77      del tempsock
      78      return port
      79  
      80  def bind_port(sock, host=HOST):
      81      """Bind the socket to a free port and return the port number.  Relies on
      82      ephemeral ports in order to ensure we are using an unbound port.  This is
      83      important as many tests may be running simultaneously, especially in a
      84      buildbot environment.  This method raises an exception if the sock.family
      85      is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
      86      or SO_REUSEPORT set on it.  Tests should *never* set these socket options
      87      for TCP/IP sockets.  The only case for setting these options is testing
      88      multicasting via multiple UDP sockets.
      89  
      90      Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
      91      on Windows), it will be set on the socket.  This will prevent anyone else
      92      from bind()'ing to our host/port for the duration of the test.
      93      """
      94  
      95      if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
      96          if hasattr(socket, 'SO_REUSEADDR'):
      97              if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
      98                  raise support.TestFailed("tests should never set the "
      99                                           "SO_REUSEADDR socket option on "
     100                                           "TCP/IP sockets!")
     101          if hasattr(socket, 'SO_REUSEPORT'):
     102              try:
     103                  if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
     104                      raise support.TestFailed("tests should never set the "
     105                                               "SO_REUSEPORT socket option on "
     106                                               "TCP/IP sockets!")
     107              except OSError:
     108                  # Python's socket module was compiled using modern headers
     109                  # thus defining SO_REUSEPORT but this process is running
     110                  # under an older kernel that does not support SO_REUSEPORT.
     111                  pass
     112          if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
     113              sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
     114  
     115      sock.bind((host, 0))
     116      port = sock.getsockname()[1]
     117      return port
     118  
     119  def bind_unix_socket(sock, addr):
     120      """Bind a unix socket, raising SkipTest if PermissionError is raised."""
     121      assert sock.family == socket.AF_UNIX
     122      try:
     123          sock.bind(addr)
     124      except PermissionError:
     125          sock.close()
     126          raise unittest.SkipTest('cannot bind AF_UNIX sockets')
     127  
     128  def _is_ipv6_enabled():
     129      """Check whether IPv6 is enabled on this host."""
     130      if socket.has_ipv6:
     131          sock = None
     132          try:
     133              sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
     134              sock.bind((HOSTv6, 0))
     135              return True
     136          except OSError:
     137              pass
     138          finally:
     139              if sock:
     140                  sock.close()
     141      return False
     142  
     143  IPV6_ENABLED = _is_ipv6_enabled()
     144  
     145  
     146  _bind_nix_socket_error = None
     147  def skip_unless_bind_unix_socket(test):
     148      """Decorator for tests requiring a functional bind() for unix sockets."""
     149      if not hasattr(socket, 'AF_UNIX'):
     150          return unittest.skip('No UNIX Sockets')(test)
     151      global _bind_nix_socket_error
     152      if _bind_nix_socket_error is None:
     153          from .os_helper import TESTFN, unlink
     154          path = TESTFN + "can_bind_unix_socket"
     155          with socket.socket(socket.AF_UNIX) as sock:
     156              try:
     157                  sock.bind(path)
     158                  _bind_nix_socket_error = False
     159              except OSError as e:
     160                  _bind_nix_socket_error = e
     161              finally:
     162                  unlink(path)
     163      if _bind_nix_socket_error:
     164          msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
     165          return unittest.skip(msg)(test)
     166      else:
     167          return test
     168  
     169  
     170  def get_socket_conn_refused_errs():
     171      """
     172      Get the different socket error numbers ('errno') which can be received
     173      when a connection is refused.
     174      """
     175      errors = [errno.ECONNREFUSED]
     176      if hasattr(errno, 'ENETUNREACH'):
     177          # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED
     178          errors.append(errno.ENETUNREACH)
     179      if hasattr(errno, 'EADDRNOTAVAIL'):
     180          # bpo-31910: socket.create_connection() fails randomly
     181          # with EADDRNOTAVAIL on Travis CI
     182          errors.append(errno.EADDRNOTAVAIL)
     183      if hasattr(errno, 'EHOSTUNREACH'):
     184          # bpo-37583: The destination host cannot be reached
     185          errors.append(errno.EHOSTUNREACH)
     186      if not IPV6_ENABLED:
     187          errors.append(errno.EAFNOSUPPORT)
     188      return errors
     189  
     190  
     191  _NOT_SET = object()
     192  
     193  @contextlib.contextmanager
     194  def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()):
     195      """Return a context manager that raises ResourceDenied when various issues
     196      with the internet connection manifest themselves as exceptions."""
     197      nntplib = warnings_helper.import_deprecated("nntplib")
     198      import urllib.error
     199      if timeout is _NOT_SET:
     200          timeout = support.INTERNET_TIMEOUT
     201  
     202      default_errnos = [
     203          ('ECONNREFUSED', 111),
     204          ('ECONNRESET', 104),
     205          ('EHOSTUNREACH', 113),
     206          ('ENETUNREACH', 101),
     207          ('ETIMEDOUT', 110),
     208          # socket.create_connection() fails randomly with
     209          # EADDRNOTAVAIL on Travis CI.
     210          ('EADDRNOTAVAIL', 99),
     211      ]
     212      default_gai_errnos = [
     213          ('EAI_AGAIN', -3),
     214          ('EAI_FAIL', -4),
     215          ('EAI_NONAME', -2),
     216          ('EAI_NODATA', -5),
     217          # Encountered when trying to resolve IPv6-only hostnames
     218          ('WSANO_DATA', 11004),
     219      ]
     220  
     221      denied = support.ResourceDenied("Resource %r is not available" % resource_name)
     222      captured_errnos = errnos
     223      gai_errnos = []
     224      if not captured_errnos:
     225          captured_errnos = [getattr(errno, name, num)
     226                             for (name, num) in default_errnos]
     227          gai_errnos = [getattr(socket, name, num)
     228                        for (name, num) in default_gai_errnos]
     229  
     230      def filter_error(err):
     231          n = getattr(err, 'errno', None)
     232          if (isinstance(err, TimeoutError) or
     233              (isinstance(err, socket.gaierror) and n in gai_errnos) or
     234              (isinstance(err, urllib.error.HTTPError) and
     235               500 <= err.code <= 599) or
     236              (isinstance(err, urllib.error.URLError) and
     237                   (("ConnectionRefusedError" in err.reason) or
     238                    ("TimeoutError" in err.reason) or
     239                    ("EOFError" in err.reason))) or
     240              n in captured_errnos):
     241              if not support.verbose:
     242                  sys.stderr.write(denied.args[0] + "\n")
     243              raise denied from err
     244  
     245      old_timeout = socket.getdefaulttimeout()
     246      try:
     247          if timeout is not None:
     248              socket.setdefaulttimeout(timeout)
     249          yield
     250      except nntplib.NNTPTemporaryError as err:
     251          if support.verbose:
     252              sys.stderr.write(denied.args[0] + "\n")
     253          raise denied from err
     254      except OSError as err:
     255          # urllib can wrap original socket errors multiple times (!), we must
     256          # unwrap to get at the original error.
     257          while True:
     258              a = err.args
     259              if len(a) >= 1 and isinstance(a[0], OSError):
     260                  err = a[0]
     261              # The error can also be wrapped as args[1]:
     262              #    except socket.error as msg:
     263              #        raise OSError('socket error', msg) from msg
     264              elif len(a) >= 2 and isinstance(a[1], OSError):
     265                  err = a[1]
     266              else:
     267                  break
     268          filter_error(err)
     269          raise
     270      # XXX should we catch generic exceptions and look for their
     271      # __cause__ or __context__?
     272      finally:
     273          socket.setdefaulttimeout(old_timeout)
     274  
     275  
     276  # consider that sysctl values should not change while tests are running
     277  _sysctl_cache = {}
     278  
     279  def _get_sysctl(name):
     280      """Get a sysctl value as an integer."""
     281      try:
     282          return _sysctl_cache[name]
     283      except KeyError:
     284          pass
     285  
     286      # At least Linux and FreeBSD support the "-n" option
     287      cmd = ['sysctl', '-n', name]
     288      proc = subprocess.run(cmd,
     289                            stdout=subprocess.PIPE,
     290                            stderr=subprocess.STDOUT,
     291                            text=True)
     292      if proc.returncode:
     293          support.print_warning(f"{' '.join(cmd)!r} command failed with "
     294                                f"exit code {proc.returncode}")
     295          # cache the error to only log the warning once
     296          _sysctl_cache[name] = None
     297          return None
     298      output = proc.stdout
     299  
     300      # Parse '0\n' to get '0'
     301      try:
     302          value = int(output.strip())
     303      except Exception as exc:
     304          support.print_warning(f"Failed to parse {' '.join(cmd)!r} "
     305                                f"command output {output!r}: {exc!r}")
     306          # cache the error to only log the warning once
     307          _sysctl_cache[name] = None
     308          return None
     309  
     310      _sysctl_cache[name] = value
     311      return value
     312  
     313  
     314  def tcp_blackhole():
     315      if not sys.platform.startswith('freebsd'):
     316          return False
     317  
     318      # gh-109015: test if FreeBSD TCP blackhole is enabled
     319      value = _get_sysctl('net.inet.tcp.blackhole')
     320      if value is None:
     321          # don't skip if we fail to get the sysctl value
     322          return False
     323      return (value != 0)
     324  
     325  
     326  def skip_if_tcp_blackhole(test):
     327      """Decorator skipping test if TCP blackhole is enabled."""
     328      skip_if = unittest.skipIf(
     329          tcp_blackhole(),
     330          "TCP blackhole is enabled (sysctl net.inet.tcp.blackhole)"
     331      )
     332      return skip_if(test)