1  import contextlib
       2  import errno
       3  import os.path
       4  import socket
       5  import sys
       6  import subprocess
       7  import tempfile
       8  import unittest
       9  
      10  from .. import support
      11  from . import warnings_helper
      12  
# Host name and literal loopback addresses shared by the helpers below:
# HOST is the default bind address of bind_port(), and HOSTv6 is the
# address _is_ipv6_enabled() tries to bind to.
HOST = "localhost"
HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"

# WASI SDK 15.0 does not provide gethostname, stub raises OSError ENOTSUP.
# The flag is therefore False on WASI (support.is_wasi) and True elsewhere.
has_gethostname = not support.is_wasi
      19  
      20  
      21  def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
      22      """Returns an unused port that should be suitable for binding.  This is
      23      achieved by creating a temporary socket with the same family and type as
      24      the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
      25      the specified host address (defaults to 0.0.0.0) with the port set to 0,
      26      eliciting an unused ephemeral port from the OS.  The temporary socket is
      27      then closed and deleted, and the ephemeral port is returned.
      28  
      29      Either this method or bind_port() should be used for any tests where a
      30      server socket needs to be bound to a particular port for the duration of
      31      the test.  Which one to use depends on whether the calling code is creating
      32      a python socket, or if an unused port needs to be provided in a constructor
      33      or passed to an external program (i.e. the -accept argument to openssl's
      34      s_server mode).  Always prefer bind_port() over find_unused_port() where
      35      possible.  Hard coded ports should *NEVER* be used.  As soon as a server
      36      socket is bound to a hard coded port, the ability to run multiple instances
      37      of the test simultaneously on the same host is compromised, which makes the
      38      test a ticking time bomb in a buildbot environment. On Unix buildbots, this
      39      may simply manifest as a failed test, which can be recovered from without
      40      intervention in most cases, but on Windows, the entire python process can
      41      completely and utterly wedge, requiring someone to log in to the buildbot
      42      and manually kill the affected process.
      43  
      44      (This is easy to reproduce on Windows, unfortunately, and can be traced to
      45      the SO_REUSEADDR socket option having different semantics on Windows versus
      46      Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
      47      listen and then accept connections on identical host/ports.  An EADDRINUSE
      48      OSError will be raised at some point (depending on the platform and
      49      the order bind and listen were called on each socket).
      50  
      51      However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
      52      will ever be raised when attempting to bind two identical host/ports. When
      53      accept() is called on each socket, the second caller's process will steal
      54      the port from the first caller, leaving them both in an awkwardly wedged
      55      state where they'll no longer respond to any signals or graceful kills, and
      56      must be forcibly killed via OpenProcess()/TerminateProcess().
      57  
      58      The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
      59      instead of SO_REUSEADDR, which effectively affords the same semantics as
      60      SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
      61      Source world compared to Windows ones, this is a common mistake.  A quick
      62      look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
      63      openssl.exe is called with the 's_server' option, for example. See
      64      http://bugs.python.org/issue2550 for more info.  The following site also
      65      has a very thorough description about the implications of both REUSEADDR
      66      and EXCLUSIVEADDRUSE on Windows:
      67      https://learn.microsoft.com/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
      68  
      69      XXX: although this approach is a vast improvement on previous attempts to
      70      elicit unused ports, it rests heavily on the assumption that the ephemeral
      71      port returned to us by the OS won't immediately be dished back out to some
      72      other process when we close and delete our temporary socket but before our
      73      calling code has a chance to bind the returned port.  We can deal with this
      74      issue if/when we come across it.
      75      """
      76  
      77      with socket.socket(family, socktype) as tempsock:
      78          port = bind_port(tempsock)
      79      del tempsock
      80      return port
      81  
      82  def bind_port(sock, host=HOST):
      83      """Bind the socket to a free port and return the port number.  Relies on
      84      ephemeral ports in order to ensure we are using an unbound port.  This is
      85      important as many tests may be running simultaneously, especially in a
      86      buildbot environment.  This method raises an exception if the sock.family
      87      is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
      88      or SO_REUSEPORT set on it.  Tests should *never* set these socket options
      89      for TCP/IP sockets.  The only case for setting these options is testing
      90      multicasting via multiple UDP sockets.
      91  
      92      Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
      93      on Windows), it will be set on the socket.  This will prevent anyone else
      94      from bind()'ing to our host/port for the duration of the test.
      95      """
      96  
      97      if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
      98          if hasattr(socket, 'SO_REUSEADDR'):
      99              if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
     100                  raise support.TestFailed("tests should never set the "
     101                                           "SO_REUSEADDR socket option on "
     102                                           "TCP/IP sockets!")
     103          if hasattr(socket, 'SO_REUSEPORT'):
     104              try:
     105                  if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
     106                      raise support.TestFailed("tests should never set the "
     107                                               "SO_REUSEPORT socket option on "
     108                                               "TCP/IP sockets!")
     109              except OSError:
     110                  # Python's socket module was compiled using modern headers
     111                  # thus defining SO_REUSEPORT but this process is running
     112                  # under an older kernel that does not support SO_REUSEPORT.
     113                  pass
     114          if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
     115              sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
     116  
     117      sock.bind((host, 0))
     118      port = sock.getsockname()[1]
     119      return port
     120  
     121  def bind_unix_socket(sock, addr):
     122      """Bind a unix socket, raising SkipTest if PermissionError is raised."""
     123      assert sock.family == socket.AF_UNIX
     124      try:
     125          sock.bind(addr)
     126      except PermissionError:
     127          sock.close()
     128          raise unittest.SkipTest('cannot bind AF_UNIX sockets')
     129  
     130  def _is_ipv6_enabled():
     131      """Check whether IPv6 is enabled on this host."""
     132      if socket.has_ipv6:
     133          sock = None
     134          try:
     135              sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
     136              sock.bind((HOSTv6, 0))
     137              return True
     138          except OSError:
     139              pass
     140          finally:
     141              if sock:
     142                  sock.close()
     143      return False
     144  
     145  IPV6_ENABLED = _is_ipv6_enabled()
     146  
     147  
     148  _bind_nix_socket_error = None
     149  def skip_unless_bind_unix_socket(test):
     150      """Decorator for tests requiring a functional bind() for unix sockets."""
     151      if not hasattr(socket, 'AF_UNIX'):
     152          return unittest.skip('No UNIX Sockets')(test)
     153      global _bind_nix_socket_error
     154      if _bind_nix_socket_error is None:
     155          from .os_helper import TESTFN, unlink
     156          path = TESTFN + "can_bind_unix_socket"
     157          with socket.socket(socket.AF_UNIX) as sock:
     158              try:
     159                  sock.bind(path)
     160                  _bind_nix_socket_error = False
     161              except OSError as e:
     162                  _bind_nix_socket_error = e
     163              finally:
     164                  unlink(path)
     165      if _bind_nix_socket_error:
     166          msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
     167          return unittest.skip(msg)(test)
     168      else:
     169          return test
     170  
     171  
     172  def get_socket_conn_refused_errs():
     173      """
     174      Get the different socket error numbers ('errno') which can be received
     175      when a connection is refused.
     176      """
     177      errors = [errno.ECONNREFUSED]
     178      if hasattr(errno, 'ENETUNREACH'):
     179          # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED
     180          errors.append(errno.ENETUNREACH)
     181      if hasattr(errno, 'EADDRNOTAVAIL'):
     182          # bpo-31910: socket.create_connection() fails randomly
     183          # with EADDRNOTAVAIL on Travis CI
     184          errors.append(errno.EADDRNOTAVAIL)
     185      if hasattr(errno, 'EHOSTUNREACH'):
     186          # bpo-37583: The destination host cannot be reached
     187          errors.append(errno.EHOSTUNREACH)
     188      if not IPV6_ENABLED:
     189          errors.append(errno.EAFNOSUPPORT)
     190      return errors
     191  
     192  
     193  _NOT_SET = object()
     194  
     195  @contextlib.contextmanager
     196  def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()):
     197      """Return a context manager that raises ResourceDenied when various issues
     198      with the internet connection manifest themselves as exceptions."""
     199      nntplib = warnings_helper.import_deprecated("nntplib")
     200      import urllib.error
     201      if timeout is _NOT_SET:
     202          timeout = support.INTERNET_TIMEOUT
     203  
     204      default_errnos = [
     205          ('ECONNREFUSED', 111),
     206          ('ECONNRESET', 104),
     207          ('EHOSTUNREACH', 113),
     208          ('ENETUNREACH', 101),
     209          ('ETIMEDOUT', 110),
     210          # socket.create_connection() fails randomly with
     211          # EADDRNOTAVAIL on Travis CI.
     212          ('EADDRNOTAVAIL', 99),
     213      ]
     214      default_gai_errnos = [
     215          ('EAI_AGAIN', -3),
     216          ('EAI_FAIL', -4),
     217          ('EAI_NONAME', -2),
     218          ('EAI_NODATA', -5),
     219          # Encountered when trying to resolve IPv6-only hostnames
     220          ('WSANO_DATA', 11004),
     221      ]
     222  
     223      denied = support.ResourceDenied("Resource %r is not available" % resource_name)
     224      captured_errnos = errnos
     225      gai_errnos = []
     226      if not captured_errnos:
     227          captured_errnos = [getattr(errno, name, num)
     228                             for (name, num) in default_errnos]
     229          gai_errnos = [getattr(socket, name, num)
     230                        for (name, num) in default_gai_errnos]
     231  
     232      def filter_error(err):
     233          n = getattr(err, 'errno', None)
     234          if (isinstance(err, TimeoutError) or
     235              (isinstance(err, socket.gaierror) and n in gai_errnos) or
     236              (isinstance(err, urllib.error.HTTPError) and
     237               500 <= err.code <= 599) or
     238              (isinstance(err, urllib.error.URLError) and
     239                   (("ConnectionRefusedError" in err.reason) or
     240                    ("TimeoutError" in err.reason) or
     241                    ("EOFError" in err.reason))) or
     242              n in captured_errnos):
     243              if not support.verbose:
     244                  sys.stderr.write(denied.args[0] + "\n")
     245              raise denied from err
     246  
     247      old_timeout = socket.getdefaulttimeout()
     248      try:
     249          if timeout is not None:
     250              socket.setdefaulttimeout(timeout)
     251          yield
     252      except nntplib.NNTPTemporaryError as err:
     253          if support.verbose:
     254              sys.stderr.write(denied.args[0] + "\n")
     255          raise denied from err
     256      except OSError as err:
     257          # urllib can wrap original socket errors multiple times (!), we must
     258          # unwrap to get at the original error.
     259          while True:
     260              a = err.args
     261              if len(a) >= 1 and isinstance(a[0], OSError):
     262                  err = a[0]
     263              # The error can also be wrapped as args[1]:
     264              #    except socket.error as msg:
     265              #        raise OSError('socket error', msg) from msg
     266              elif len(a) >= 2 and isinstance(a[1], OSError):
     267                  err = a[1]
     268              else:
     269                  break
     270          filter_error(err)
     271          raise
     272      # XXX should we catch generic exceptions and look for their
     273      # __cause__ or __context__?
     274      finally:
     275          socket.setdefaulttimeout(old_timeout)
     276  
     277  
     278  def create_unix_domain_name():
     279      """
     280      Create a UNIX domain name: socket.bind() argument of a AF_UNIX socket.
     281  
     282      Return a path relative to the current directory to get a short path
     283      (around 27 ASCII characters).
     284      """
     285      return tempfile.mktemp(prefix="test_python_", suffix='.sock',
     286                             dir=os.path.curdir)
     287  
     288  
     289  # consider that sysctl values should not change while tests are running
     290  _sysctl_cache = {}
     291  
     292  def _get_sysctl(name):
     293      """Get a sysctl value as an integer."""
     294      try:
     295          return _sysctl_cache[name]
     296      except KeyError:
     297          pass
     298  
     299      # At least Linux and FreeBSD support the "-n" option
     300      cmd = ['sysctl', '-n', name]
     301      proc = subprocess.run(cmd,
     302                            stdout=subprocess.PIPE,
     303                            stderr=subprocess.STDOUT,
     304                            text=True)
     305      if proc.returncode:
     306          support.print_warning(f'{' '.join(cmd)!r} command failed with '
     307                                f'exit code {proc.returncode}')
     308          # cache the error to only log the warning once
     309          _sysctl_cache[name] = None
     310          return None
     311      output = proc.stdout
     312  
     313      # Parse '0\n' to get '0'
     314      try:
     315          value = int(output.strip())
     316      except Exception as exc:
     317          support.print_warning(f'Failed to parse {' '.join(cmd)!r} '
     318                                f'command output {output!r}: {exc!r}')
     319          # cache the error to only log the warning once
     320          _sysctl_cache[name] = None
     321          return None
     322  
     323      _sysctl_cache[name] = value
     324      return value
     325  
     326  
     327  def tcp_blackhole():
     328      if not sys.platform.startswith('freebsd'):
     329          return False
     330  
     331      # gh-109015: test if FreeBSD TCP blackhole is enabled
     332      value = _get_sysctl('net.inet.tcp.blackhole')
     333      if value is None:
     334          # don't skip if we fail to get the sysctl value
     335          return False
     336      return (value != 0)
     337  
     338  
     339  def skip_if_tcp_blackhole(test):
     340      """Decorator skipping test if TCP blackhole is enabled."""
     341      skip_if = unittest.skipIf(
     342          tcp_blackhole(),
     343          "TCP blackhole is enabled (sysctl net.inet.tcp.blackhole)"
     344      )
     345      return skip_if(test)