python (3.11.7)
       1  from __future__ import absolute_import
       2  
       3  import datetime
       4  import logging
       5  import os
       6  import re
       7  import socket
       8  import warnings
       9  from socket import error as SocketError
      10  from socket import timeout as SocketTimeout
      11  
      12  from .packages import six
      13  from .packages.six.moves.http_client import HTTPConnection as _HTTPConnection
      14  from .packages.six.moves.http_client import HTTPException  # noqa: F401
      15  from .util.proxy import create_proxy_ssl_context
      16  
      17  try:  # Compiled with SSL?
      18      import ssl
      19  
      20      BaseSSLError = ssl.SSLError
      21  except (ImportError, AttributeError):  # Platform-specific: No SSL.
      22      ssl = None
      23  
      24      class ESC[4;38;5;81mBaseSSLError(ESC[4;38;5;149mBaseException):
      25          pass
      26  
      27  
      28  try:
      29      # Python 3: not a no-op, we're adding this to the namespace so it can be imported.
      30      ConnectionError = ConnectionError
      31  except NameError:
      32      # Python 2
      33      class ESC[4;38;5;81mConnectionError(ESC[4;38;5;149mException):
      34          pass
      35  
      36  
      37  try:  # Python 3:
      38      # Not a no-op, we're adding this to the namespace so it can be imported.
      39      BrokenPipeError = BrokenPipeError
      40  except NameError:  # Python 2:
      41  
      42      class ESC[4;38;5;81mBrokenPipeError(ESC[4;38;5;149mException):
      43          pass
      44  
      45  
      46  from ._collections import HTTPHeaderDict  # noqa (historical, removed in v2)
      47  from ._version import __version__
      48  from .exceptions import (
      49      ConnectTimeoutError,
      50      NewConnectionError,
      51      SubjectAltNameWarning,
      52      SystemTimeWarning,
      53  )
      54  from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection
      55  from .util.ssl_ import (
      56      assert_fingerprint,
      57      create_urllib3_context,
      58      is_ipaddress,
      59      resolve_cert_reqs,
      60      resolve_ssl_version,
      61      ssl_wrap_socket,
      62  )
      63  from .util.ssl_match_hostname import CertificateError, match_hostname
      64  
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Default port number for each URL scheme; used for the ``default_port``
# class attributes of the connection classes in this module.
port_by_scheme = {"http": 80, "https": 443}

# When it comes time to update this value as a part of regular maintenance
# (i.e. test_recent_date is failing) update it to ~6 months before the current date.
RECENT_DATE = datetime.date(2022, 1, 1)

# NOTE: despite its name, this pattern matches any single character that is
# *not* in the allowed set (ASCII letters, digits and ``-!#$%&'*+.^_`|~``),
# not only control characters. ``HTTPConnection.putrequest`` uses it to reject
# request methods containing such characters.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
      74  
      75  
      76  class ESC[4;38;5;81mHTTPConnection(ESC[4;38;5;149m_HTTPConnection, ESC[4;38;5;149mobject):
      77      """
      78      Based on :class:`http.client.HTTPConnection` but provides an extra constructor
      79      backwards-compatibility layer between older and newer Pythons.
      80  
      81      Additional keyword parameters are used to configure attributes of the connection.
      82      Accepted parameters include:
      83  
      84      - ``strict``: See the documentation on :class:`urllib3.connectionpool.HTTPConnectionPool`
      85      - ``source_address``: Set the source address for the current connection.
      86      - ``socket_options``: Set specific options on the underlying socket. If not specified, then
      87        defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling
      88        Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy.
      89  
      90        For example, if you wish to enable TCP Keep Alive in addition to the defaults,
      91        you might pass:
      92  
      93        .. code-block:: python
      94  
      95           HTTPConnection.default_socket_options + [
      96               (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
      97           ]
      98  
      99        Or you may want to disable the defaults by passing an empty list (e.g., ``[]``).
     100      """
     101  
     102      default_port = port_by_scheme["http"]
     103  
     104      #: Disable Nagle's algorithm by default.
     105      #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
     106      default_socket_options = [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
     107  
     108      #: Whether this connection verifies the host's certificate.
     109      is_verified = False
     110  
     111      #: Whether this proxy connection (if used) verifies the proxy host's
     112      #: certificate.
     113      proxy_is_verified = None
     114  
     115      def __init__(self, *args, **kw):
     116          if not six.PY2:
     117              kw.pop("strict", None)
     118  
     119          # Pre-set source_address.
     120          self.source_address = kw.get("source_address")
     121  
     122          #: The socket options provided by the user. If no options are
     123          #: provided, we use the default options.
     124          self.socket_options = kw.pop("socket_options", self.default_socket_options)
     125  
     126          # Proxy options provided by the user.
     127          self.proxy = kw.pop("proxy", None)
     128          self.proxy_config = kw.pop("proxy_config", None)
     129  
     130          _HTTPConnection.__init__(self, *args, **kw)
     131  
     132      @property
     133      def host(self):
     134          """
     135          Getter method to remove any trailing dots that indicate the hostname is an FQDN.
     136  
     137          In general, SSL certificates don't include the trailing dot indicating a
     138          fully-qualified domain name, and thus, they don't validate properly when
     139          checked against a domain name that includes the dot. In addition, some
     140          servers may not expect to receive the trailing dot when provided.
     141  
     142          However, the hostname with trailing dot is critical to DNS resolution; doing a
     143          lookup with the trailing dot will properly only resolve the appropriate FQDN,
     144          whereas a lookup without a trailing dot will search the system's search domain
     145          list. Thus, it's important to keep the original host around for use only in
     146          those cases where it's appropriate (i.e., when doing DNS lookup to establish the
     147          actual TCP connection across which we're going to send HTTP requests).
     148          """
     149          return self._dns_host.rstrip(".")
     150  
     151      @host.setter
     152      def host(self, value):
     153          """
     154          Setter for the `host` property.
     155  
     156          We assume that only urllib3 uses the _dns_host attribute; httplib itself
     157          only uses `host`, and it seems reasonable that other libraries follow suit.
     158          """
     159          self._dns_host = value
     160  
     161      def _new_conn(self):
     162          """Establish a socket connection and set nodelay settings on it.
     163  
     164          :return: New socket connection.
     165          """
     166          extra_kw = {}
     167          if self.source_address:
     168              extra_kw["source_address"] = self.source_address
     169  
     170          if self.socket_options:
     171              extra_kw["socket_options"] = self.socket_options
     172  
     173          try:
     174              conn = connection.create_connection(
     175                  (self._dns_host, self.port), self.timeout, **extra_kw
     176              )
     177  
     178          except SocketTimeout:
     179              raise ConnectTimeoutError(
     180                  self,
     181                  "Connection to %s timed out. (connect timeout=%s)"
     182                  % (self.host, self.timeout),
     183              )
     184  
     185          except SocketError as e:
     186              raise NewConnectionError(
     187                  self, "Failed to establish a new connection: %s" % e
     188              )
     189  
     190          return conn
     191  
     192      def _is_using_tunnel(self):
     193          # Google App Engine's httplib does not define _tunnel_host
     194          return getattr(self, "_tunnel_host", None)
     195  
     196      def _prepare_conn(self, conn):
     197          self.sock = conn
     198          if self._is_using_tunnel():
     199              # TODO: Fix tunnel so it doesn't depend on self.sock state.
     200              self._tunnel()
     201              # Mark this connection as not reusable
     202              self.auto_open = 0
     203  
     204      def connect(self):
     205          conn = self._new_conn()
     206          self._prepare_conn(conn)
     207  
     208      def putrequest(self, method, url, *args, **kwargs):
     209          """ """
     210          # Empty docstring because the indentation of CPython's implementation
     211          # is broken but we don't want this method in our documentation.
     212          match = _CONTAINS_CONTROL_CHAR_RE.search(method)
     213          if match:
     214              raise ValueError(
     215                  "Method cannot contain non-token characters %r (found at least %r)"
     216                  % (method, match.group())
     217              )
     218  
     219          return _HTTPConnection.putrequest(self, method, url, *args, **kwargs)
     220  
     221      def putheader(self, header, *values):
     222          """ """
     223          if not any(isinstance(v, str) and v == SKIP_HEADER for v in values):
     224              _HTTPConnection.putheader(self, header, *values)
     225          elif six.ensure_str(header.lower()) not in SKIPPABLE_HEADERS:
     226              raise ValueError(
     227                  "urllib3.util.SKIP_HEADER only supports '%s'"
     228                  % ("', '".join(map(str.title, sorted(SKIPPABLE_HEADERS))),)
     229              )
     230  
     231      def request(self, method, url, body=None, headers=None):
     232          # Update the inner socket's timeout value to send the request.
     233          # This only triggers if the connection is re-used.
     234          if getattr(self, "sock", None) is not None:
     235              self.sock.settimeout(self.timeout)
     236  
     237          if headers is None:
     238              headers = {}
     239          else:
     240              # Avoid modifying the headers passed into .request()
     241              headers = headers.copy()
     242          if "user-agent" not in (six.ensure_str(k.lower()) for k in headers):
     243              headers["User-Agent"] = _get_default_user_agent()
     244          super(HTTPConnection, self).request(method, url, body=body, headers=headers)
     245  
     246      def request_chunked(self, method, url, body=None, headers=None):
     247          """
     248          Alternative to the common request method, which sends the
     249          body with chunked encoding and not as one block
     250          """
     251          headers = headers or {}
     252          header_keys = set([six.ensure_str(k.lower()) for k in headers])
     253          skip_accept_encoding = "accept-encoding" in header_keys
     254          skip_host = "host" in header_keys
     255          self.putrequest(
     256              method, url, skip_accept_encoding=skip_accept_encoding, skip_host=skip_host
     257          )
     258          if "user-agent" not in header_keys:
     259              self.putheader("User-Agent", _get_default_user_agent())
     260          for header, value in headers.items():
     261              self.putheader(header, value)
     262          if "transfer-encoding" not in header_keys:
     263              self.putheader("Transfer-Encoding", "chunked")
     264          self.endheaders()
     265  
     266          if body is not None:
     267              stringish_types = six.string_types + (bytes,)
     268              if isinstance(body, stringish_types):
     269                  body = (body,)
     270              for chunk in body:
     271                  if not chunk:
     272                      continue
     273                  if not isinstance(chunk, bytes):
     274                      chunk = chunk.encode("utf8")
     275                  len_str = hex(len(chunk))[2:]
     276                  to_send = bytearray(len_str.encode())
     277                  to_send += b"\r\n"
     278                  to_send += chunk
     279                  to_send += b"\r\n"
     280                  self.send(to_send)
     281  
     282          # After the if clause, to always have a closed body
     283          self.send(b"0\r\n\r\n")
     284  
     285  
     286  class ESC[4;38;5;81mHTTPSConnection(ESC[4;38;5;149mHTTPConnection):
     287      """
     288      Many of the parameters to this constructor are passed to the underlying SSL
     289      socket by means of :py:func:`urllib3.util.ssl_wrap_socket`.
     290      """
     291  
     292      default_port = port_by_scheme["https"]
     293  
     294      cert_reqs = None
     295      ca_certs = None
     296      ca_cert_dir = None
     297      ca_cert_data = None
     298      ssl_version = None
     299      assert_fingerprint = None
     300      tls_in_tls_required = False
     301  
     302      def __init__(
     303          self,
     304          host,
     305          port=None,
     306          key_file=None,
     307          cert_file=None,
     308          key_password=None,
     309          strict=None,
     310          timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
     311          ssl_context=None,
     312          server_hostname=None,
     313          **kw
     314      ):
     315  
     316          HTTPConnection.__init__(self, host, port, strict=strict, timeout=timeout, **kw)
     317  
     318          self.key_file = key_file
     319          self.cert_file = cert_file
     320          self.key_password = key_password
     321          self.ssl_context = ssl_context
     322          self.server_hostname = server_hostname
     323  
     324          # Required property for Google AppEngine 1.9.0 which otherwise causes
     325          # HTTPS requests to go out as HTTP. (See Issue #356)
     326          self._protocol = "https"
     327  
     328      def set_cert(
     329          self,
     330          key_file=None,
     331          cert_file=None,
     332          cert_reqs=None,
     333          key_password=None,
     334          ca_certs=None,
     335          assert_hostname=None,
     336          assert_fingerprint=None,
     337          ca_cert_dir=None,
     338          ca_cert_data=None,
     339      ):
     340          """
     341          This method should only be called once, before the connection is used.
     342          """
     343          # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also
     344          # have an SSLContext object in which case we'll use its verify_mode.
     345          if cert_reqs is None:
     346              if self.ssl_context is not None:
     347                  cert_reqs = self.ssl_context.verify_mode
     348              else:
     349                  cert_reqs = resolve_cert_reqs(None)
     350  
     351          self.key_file = key_file
     352          self.cert_file = cert_file
     353          self.cert_reqs = cert_reqs
     354          self.key_password = key_password
     355          self.assert_hostname = assert_hostname
     356          self.assert_fingerprint = assert_fingerprint
     357          self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
     358          self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
     359          self.ca_cert_data = ca_cert_data
     360  
     361      def connect(self):
     362          # Add certificate verification
     363          self.sock = conn = self._new_conn()
     364          hostname = self.host
     365          tls_in_tls = False
     366  
     367          if self._is_using_tunnel():
     368              if self.tls_in_tls_required:
     369                  self.sock = conn = self._connect_tls_proxy(hostname, conn)
     370                  tls_in_tls = True
     371  
     372              # Calls self._set_hostport(), so self.host is
     373              # self._tunnel_host below.
     374              self._tunnel()
     375              # Mark this connection as not reusable
     376              self.auto_open = 0
     377  
     378              # Override the host with the one we're requesting data from.
     379              hostname = self._tunnel_host
     380  
     381          server_hostname = hostname
     382          if self.server_hostname is not None:
     383              server_hostname = self.server_hostname
     384  
     385          is_time_off = datetime.date.today() < RECENT_DATE
     386          if is_time_off:
     387              warnings.warn(
     388                  (
     389                      "System time is way off (before {0}). This will probably "
     390                      "lead to SSL verification errors"
     391                  ).format(RECENT_DATE),
     392                  SystemTimeWarning,
     393              )
     394  
     395          # Wrap socket using verification with the root certs in
     396          # trusted_root_certs
     397          default_ssl_context = False
     398          if self.ssl_context is None:
     399              default_ssl_context = True
     400              self.ssl_context = create_urllib3_context(
     401                  ssl_version=resolve_ssl_version(self.ssl_version),
     402                  cert_reqs=resolve_cert_reqs(self.cert_reqs),
     403              )
     404  
     405          context = self.ssl_context
     406          context.verify_mode = resolve_cert_reqs(self.cert_reqs)
     407  
     408          # Try to load OS default certs if none are given.
     409          # Works well on Windows (requires Python3.4+)
     410          if (
     411              not self.ca_certs
     412              and not self.ca_cert_dir
     413              and not self.ca_cert_data
     414              and default_ssl_context
     415              and hasattr(context, "load_default_certs")
     416          ):
     417              context.load_default_certs()
     418  
     419          self.sock = ssl_wrap_socket(
     420              sock=conn,
     421              keyfile=self.key_file,
     422              certfile=self.cert_file,
     423              key_password=self.key_password,
     424              ca_certs=self.ca_certs,
     425              ca_cert_dir=self.ca_cert_dir,
     426              ca_cert_data=self.ca_cert_data,
     427              server_hostname=server_hostname,
     428              ssl_context=context,
     429              tls_in_tls=tls_in_tls,
     430          )
     431  
     432          # If we're using all defaults and the connection
     433          # is TLSv1 or TLSv1.1 we throw a DeprecationWarning
     434          # for the host.
     435          if (
     436              default_ssl_context
     437              and self.ssl_version is None
     438              and hasattr(self.sock, "version")
     439              and self.sock.version() in {"TLSv1", "TLSv1.1"}
     440          ):
     441              warnings.warn(
     442                  "Negotiating TLSv1/TLSv1.1 by default is deprecated "
     443                  "and will be disabled in urllib3 v2.0.0. Connecting to "
     444                  "'%s' with '%s' can be enabled by explicitly opting-in "
     445                  "with 'ssl_version'" % (self.host, self.sock.version()),
     446                  DeprecationWarning,
     447              )
     448  
     449          if self.assert_fingerprint:
     450              assert_fingerprint(
     451                  self.sock.getpeercert(binary_form=True), self.assert_fingerprint
     452              )
     453          elif (
     454              context.verify_mode != ssl.CERT_NONE
     455              and not getattr(context, "check_hostname", False)
     456              and self.assert_hostname is not False
     457          ):
     458              # While urllib3 attempts to always turn off hostname matching from
     459              # the TLS library, this cannot always be done. So we check whether
     460              # the TLS Library still thinks it's matching hostnames.
     461              cert = self.sock.getpeercert()
     462              if not cert.get("subjectAltName", ()):
     463                  warnings.warn(
     464                      (
     465                          "Certificate for {0} has no `subjectAltName`, falling back to check for a "
     466                          "`commonName` for now. This feature is being removed by major browsers and "
     467                          "deprecated by RFC 2818. (See https://github.com/urllib3/urllib3/issues/497 "
     468                          "for details.)".format(hostname)
     469                      ),
     470                      SubjectAltNameWarning,
     471                  )
     472              _match_hostname(cert, self.assert_hostname or server_hostname)
     473  
     474          self.is_verified = (
     475              context.verify_mode == ssl.CERT_REQUIRED
     476              or self.assert_fingerprint is not None
     477          )
     478  
     479      def _connect_tls_proxy(self, hostname, conn):
     480          """
     481          Establish a TLS connection to the proxy using the provided SSL context.
     482          """
     483          proxy_config = self.proxy_config
     484          ssl_context = proxy_config.ssl_context
     485          if ssl_context:
     486              # If the user provided a proxy context, we assume CA and client
     487              # certificates have already been set
     488              return ssl_wrap_socket(
     489                  sock=conn,
     490                  server_hostname=hostname,
     491                  ssl_context=ssl_context,
     492              )
     493  
     494          ssl_context = create_proxy_ssl_context(
     495              self.ssl_version,
     496              self.cert_reqs,
     497              self.ca_certs,
     498              self.ca_cert_dir,
     499              self.ca_cert_data,
     500          )
     501  
     502          # If no cert was provided, use only the default options for server
     503          # certificate validation
     504          socket = ssl_wrap_socket(
     505              sock=conn,
     506              ca_certs=self.ca_certs,
     507              ca_cert_dir=self.ca_cert_dir,
     508              ca_cert_data=self.ca_cert_data,
     509              server_hostname=hostname,
     510              ssl_context=ssl_context,
     511          )
     512  
     513          if ssl_context.verify_mode != ssl.CERT_NONE and not getattr(
     514              ssl_context, "check_hostname", False
     515          ):
     516              # While urllib3 attempts to always turn off hostname matching from
     517              # the TLS library, this cannot always be done. So we check whether
     518              # the TLS Library still thinks it's matching hostnames.
     519              cert = socket.getpeercert()
     520              if not cert.get("subjectAltName", ()):
     521                  warnings.warn(
     522                      (
     523                          "Certificate for {0} has no `subjectAltName`, falling back to check for a "
     524                          "`commonName` for now. This feature is being removed by major browsers and "
     525                          "deprecated by RFC 2818. (See https://github.com/urllib3/urllib3/issues/497 "
     526                          "for details.)".format(hostname)
     527                      ),
     528                      SubjectAltNameWarning,
     529                  )
     530              _match_hostname(cert, hostname)
     531  
     532          self.proxy_is_verified = ssl_context.verify_mode == ssl.CERT_REQUIRED
     533          return socket
     534  
     535  
     536  def _match_hostname(cert, asserted_hostname):
     537      # Our upstream implementation of ssl.match_hostname()
     538      # only applies this normalization to IP addresses so it doesn't
     539      # match DNS SANs so we do the same thing!
     540      stripped_hostname = asserted_hostname.strip("u[]")
     541      if is_ipaddress(stripped_hostname):
     542          asserted_hostname = stripped_hostname
     543  
     544      try:
     545          match_hostname(cert, asserted_hostname)
     546      except CertificateError as e:
     547          log.warning(
     548              "Certificate did not match expected hostname: %s. Certificate: %s",
     549              asserted_hostname,
     550              cert,
     551          )
     552          # Add cert to exception and reraise so client code can inspect
     553          # the cert when catching the exception, if they want to
     554          e._peer_cert = cert
     555          raise
     556  
     557  
     558  def _get_default_user_agent():
     559      return "python-urllib3/%s" % __version__
     560  
     561  
     562  class ESC[4;38;5;81mDummyConnection(ESC[4;38;5;149mobject):
     563      """Used to detect a failed ConnectionCls import."""
     564  
     565      pass
     566  
     567  
# ``ssl`` is None when the import at the top of this module failed; in that
# case rebind HTTPSConnection to the DummyConnection placeholder.
if not ssl:
    HTTPSConnection = DummyConnection  # noqa: F811


# Second name bound to the same class object as HTTPSConnection
# (presumably retained for backwards compatibility -- confirm before removing).
VerifiedHTTPSConnection = HTTPSConnection