python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
pip/
_vendor/
requests/
utils.py
       1  """
       2  requests.utils
       3  ~~~~~~~~~~~~~~
       4  
       5  This module provides utility functions that are used within Requests
       6  that are also useful for external consumption.
       7  """
       8  
       9  import codecs
      10  import contextlib
      11  import io
      12  import os
      13  import re
      14  import socket
      15  import struct
      16  import sys
      17  import tempfile
      18  import warnings
      19  import zipfile
      20  from collections import OrderedDict
      21  
      22  from pip._vendor.urllib3.util import make_headers, parse_url
      23  
      24  from . import certs
      25  from .__version__ import __version__
      26  
      27  # to_native_string is unused here, but imported here for backwards compatibility
      28  from ._internal_utils import (  # noqa: F401
      29      _HEADER_VALIDATORS_BYTE,
      30      _HEADER_VALIDATORS_STR,
      31      HEADER_VALIDATORS,
      32      to_native_string,
      33  )
      34  from .compat import (
      35      Mapping,
      36      basestring,
      37      bytes,
      38      getproxies,
      39      getproxies_environment,
      40      integer_types,
      41  )
      42  from .compat import parse_http_list as _parse_list_header
      43  from .compat import (
      44      proxy_bypass,
      45      proxy_bypass_environment,
      46      quote,
      47      str,
      48      unquote,
      49      urlparse,
      50      urlunparse,
      51  )
      52  from .cookies import cookiejar_from_dict
      53  from .exceptions import (
      54      FileModeWarning,
      55      InvalidHeader,
      56      InvalidURL,
      57      UnrewindableBodyError,
      58  )
      59  from .structures import CaseInsensitiveDict
      60  
      61  NETRC_FILES = (".netrc", "_netrc")
      62  
      63  DEFAULT_CA_BUNDLE_PATH = certs.where()
      64  
      65  DEFAULT_PORTS = {"http": 80, "https": 443}
      66  
      67  # Ensure that ', ' is used to preserve previous delimiter behavior.
      68  DEFAULT_ACCEPT_ENCODING = ", ".join(
      69      re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
      70  )
      71  
      72  
      73  if sys.platform == "win32":
      74      # provide a proxy_bypass version on Windows without DNS lookups
      75  
      76      def proxy_bypass_registry(host):
      77          try:
      78              import winreg
      79          except ImportError:
      80              return False
      81  
      82          try:
      83              internetSettings = winreg.OpenKey(
      84                  winreg.HKEY_CURRENT_USER,
      85                  r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
      86              )
      87              # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
      88              proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
      89              # ProxyOverride is almost always a string
      90              proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
      91          except (OSError, ValueError):
      92              return False
      93          if not proxyEnable or not proxyOverride:
      94              return False
      95  
      96          # make a check value list from the registry entry: replace the
      97          # '<local>' string by the localhost entry and the corresponding
      98          # canonical entry.
      99          proxyOverride = proxyOverride.split(";")
     100          # now check if we match one of the registry values.
     101          for test in proxyOverride:
     102              if test == "<local>":
     103                  if "." not in host:
     104                      return True
     105              test = test.replace(".", r"\.")  # mask dots
     106              test = test.replace("*", r".*")  # change glob sequence
     107              test = test.replace("?", r".")  # change glob char
     108              if re.match(test, host, re.I):
     109                  return True
     110          return False
     111  
     112      def proxy_bypass(host):  # noqa
     113          """Return True, if the host should be bypassed.
     114  
     115          Checks proxy settings gathered from the environment, if specified,
     116          or the registry.
     117          """
     118          if getproxies_environment():
     119              return proxy_bypass_environment(host)
     120          else:
     121              return proxy_bypass_registry(host)
     122  
     123  
     124  def dict_to_sequence(d):
     125      """Returns an internal sequence dictionary update."""
     126  
     127      if hasattr(d, "items"):
     128          d = d.items()
     129  
     130      return d
     131  
     132  
     133  def super_len(o):
     134      total_length = None
     135      current_position = 0
     136  
     137      if hasattr(o, "__len__"):
     138          total_length = len(o)
     139  
     140      elif hasattr(o, "len"):
     141          total_length = o.len
     142  
     143      elif hasattr(o, "fileno"):
     144          try:
     145              fileno = o.fileno()
     146          except (io.UnsupportedOperation, AttributeError):
     147              # AttributeError is a surprising exception, seeing as how we've just checked
     148              # that `hasattr(o, 'fileno')`.  It happens for objects obtained via
     149              # `Tarfile.extractfile()`, per issue 5229.
     150              pass
     151          else:
     152              total_length = os.fstat(fileno).st_size
     153  
     154              # Having used fstat to determine the file length, we need to
     155              # confirm that this file was opened up in binary mode.
     156              if "b" not in o.mode:
     157                  warnings.warn(
     158                      (
     159                          "Requests has determined the content-length for this "
     160                          "request using the binary size of the file: however, the "
     161                          "file has been opened in text mode (i.e. without the 'b' "
     162                          "flag in the mode). This may lead to an incorrect "
     163                          "content-length. In Requests 3.0, support will be removed "
     164                          "for files in text mode."
     165                      ),
     166                      FileModeWarning,
     167                  )
     168  
     169      if hasattr(o, "tell"):
     170          try:
     171              current_position = o.tell()
     172          except OSError:
     173              # This can happen in some weird situations, such as when the file
     174              # is actually a special file descriptor like stdin. In this
     175              # instance, we don't know what the length is, so set it to zero and
     176              # let requests chunk it instead.
     177              if total_length is not None:
     178                  current_position = total_length
     179          else:
     180              if hasattr(o, "seek") and total_length is None:
     181                  # StringIO and BytesIO have seek but no usable fileno
     182                  try:
     183                      # seek to end of file
     184                      o.seek(0, 2)
     185                      total_length = o.tell()
     186  
     187                      # seek back to current position to support
     188                      # partially read file-like objects
     189                      o.seek(current_position or 0)
     190                  except OSError:
     191                      total_length = 0
     192  
     193      if total_length is None:
     194          total_length = 0
     195  
     196      return max(0, total_length - current_position)
     197  
     198  
     199  def get_netrc_auth(url, raise_errors=False):
     200      """Returns the Requests tuple auth for a given url from netrc."""
     201  
     202      netrc_file = os.environ.get("NETRC")
     203      if netrc_file is not None:
     204          netrc_locations = (netrc_file,)
     205      else:
     206          netrc_locations = (f"~/{f}" for f in NETRC_FILES)
     207  
     208      try:
     209          from netrc import NetrcParseError, netrc
     210  
     211          netrc_path = None
     212  
     213          for f in netrc_locations:
     214              try:
     215                  loc = os.path.expanduser(f)
     216              except KeyError:
     217                  # os.path.expanduser can fail when $HOME is undefined and
     218                  # getpwuid fails. See https://bugs.python.org/issue20164 &
     219                  # https://github.com/psf/requests/issues/1846
     220                  return
     221  
     222              if os.path.exists(loc):
     223                  netrc_path = loc
     224                  break
     225  
     226          # Abort early if there isn't one.
     227          if netrc_path is None:
     228              return
     229  
     230          ri = urlparse(url)
     231  
     232          # Strip port numbers from netloc. This weird `if...encode`` dance is
     233          # used for Python 3.2, which doesn't support unicode literals.
     234          splitstr = b":"
     235          if isinstance(url, str):
     236              splitstr = splitstr.decode("ascii")
     237          host = ri.netloc.split(splitstr)[0]
     238  
     239          try:
     240              _netrc = netrc(netrc_path).authenticators(host)
     241              if _netrc:
     242                  # Return with login / password
     243                  login_i = 0 if _netrc[0] else 1
     244                  return (_netrc[login_i], _netrc[2])
     245          except (NetrcParseError, OSError):
     246              # If there was a parsing error or a permissions issue reading the file,
     247              # we'll just skip netrc auth unless explicitly asked to raise errors.
     248              if raise_errors:
     249                  raise
     250  
     251      # App Engine hackiness.
     252      except (ImportError, AttributeError):
     253          pass
     254  
     255  
     256  def guess_filename(obj):
     257      """Tries to guess the filename of the given object."""
     258      name = getattr(obj, "name", None)
     259      if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
     260          return os.path.basename(name)
     261  
     262  
     263  def extract_zipped_paths(path):
     264      """Replace nonexistent paths that look like they refer to a member of a zip
     265      archive with the location of an extracted copy of the target, or else
     266      just return the provided path unchanged.
     267      """
     268      if os.path.exists(path):
     269          # this is already a valid path, no need to do anything further
     270          return path
     271  
     272      # find the first valid part of the provided path and treat that as a zip archive
     273      # assume the rest of the path is the name of a member in the archive
     274      archive, member = os.path.split(path)
     275      while archive and not os.path.exists(archive):
     276          archive, prefix = os.path.split(archive)
     277          if not prefix:
     278              # If we don't check for an empty prefix after the split (in other words, archive remains unchanged after the split),
     279              # we _can_ end up in an infinite loop on a rare corner case affecting a small number of users
     280              break
     281          member = "/".join([prefix, member])
     282  
     283      if not zipfile.is_zipfile(archive):
     284          return path
     285  
     286      zip_file = zipfile.ZipFile(archive)
     287      if member not in zip_file.namelist():
     288          return path
     289  
     290      # we have a valid zip archive and a valid member of that archive
     291      tmp = tempfile.gettempdir()
     292      extracted_path = os.path.join(tmp, member.split("/")[-1])
     293      if not os.path.exists(extracted_path):
     294          # use read + write to avoid the creating nested folders, we only want the file, avoids mkdir racing condition
     295          with atomic_open(extracted_path) as file_handler:
     296              file_handler.write(zip_file.read(member))
     297      return extracted_path
     298  
     299  
     300  @contextlib.contextmanager
     301  def atomic_open(filename):
     302      """Write a file to the disk in an atomic fashion"""
     303      tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
     304      try:
     305          with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
     306              yield tmp_handler
     307          os.replace(tmp_name, filename)
     308      except BaseException:
     309          os.remove(tmp_name)
     310          raise
     311  
     312  
     313  def from_key_val_list(value):
     314      """Take an object and test to see if it can be represented as a
     315      dictionary. Unless it can not be represented as such, return an
     316      OrderedDict, e.g.,
     317  
     318      ::
     319  
     320          >>> from_key_val_list([('key', 'val')])
     321          OrderedDict([('key', 'val')])
     322          >>> from_key_val_list('string')
     323          Traceback (most recent call last):
     324          ...
     325          ValueError: cannot encode objects that are not 2-tuples
     326          >>> from_key_val_list({'key': 'val'})
     327          OrderedDict([('key', 'val')])
     328  
     329      :rtype: OrderedDict
     330      """
     331      if value is None:
     332          return None
     333  
     334      if isinstance(value, (str, bytes, bool, int)):
     335          raise ValueError("cannot encode objects that are not 2-tuples")
     336  
     337      return OrderedDict(value)
     338  
     339  
     340  def to_key_val_list(value):
     341      """Take an object and test to see if it can be represented as a
     342      dictionary. If it can be, return a list of tuples, e.g.,
     343  
     344      ::
     345  
     346          >>> to_key_val_list([('key', 'val')])
     347          [('key', 'val')]
     348          >>> to_key_val_list({'key': 'val'})
     349          [('key', 'val')]
     350          >>> to_key_val_list('string')
     351          Traceback (most recent call last):
     352          ...
     353          ValueError: cannot encode objects that are not 2-tuples
     354  
     355      :rtype: list
     356      """
     357      if value is None:
     358          return None
     359  
     360      if isinstance(value, (str, bytes, bool, int)):
     361          raise ValueError("cannot encode objects that are not 2-tuples")
     362  
     363      if isinstance(value, Mapping):
     364          value = value.items()
     365  
     366      return list(value)
     367  
     368  
     369  # From mitsuhiko/werkzeug (used with permission).
     370  def parse_list_header(value):
     371      """Parse lists as described by RFC 2068 Section 2.
     372  
     373      In particular, parse comma-separated lists where the elements of
     374      the list may include quoted-strings.  A quoted-string could
     375      contain a comma.  A non-quoted string could have quotes in the
     376      middle.  Quotes are removed automatically after parsing.
     377  
     378      It basically works like :func:`parse_set_header` just that items
     379      may appear multiple times and case sensitivity is preserved.
     380  
     381      The return value is a standard :class:`list`:
     382  
     383      >>> parse_list_header('token, "quoted value"')
     384      ['token', 'quoted value']
     385  
     386      To create a header from the :class:`list` again, use the
     387      :func:`dump_header` function.
     388  
     389      :param value: a string with a list header.
     390      :return: :class:`list`
     391      :rtype: list
     392      """
     393      result = []
     394      for item in _parse_list_header(value):
     395          if item[:1] == item[-1:] == '"':
     396              item = unquote_header_value(item[1:-1])
     397          result.append(item)
     398      return result
     399  
     400  
     401  # From mitsuhiko/werkzeug (used with permission).
     402  def parse_dict_header(value):
     403      """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
     404      convert them into a python dict:
     405  
     406      >>> d = parse_dict_header('foo="is a fish", bar="as well"')
     407      >>> type(d) is dict
     408      True
     409      >>> sorted(d.items())
     410      [('bar', 'as well'), ('foo', 'is a fish')]
     411  
     412      If there is no value for a key it will be `None`:
     413  
     414      >>> parse_dict_header('key_without_value')
     415      {'key_without_value': None}
     416  
     417      To create a header from the :class:`dict` again, use the
     418      :func:`dump_header` function.
     419  
     420      :param value: a string with a dict header.
     421      :return: :class:`dict`
     422      :rtype: dict
     423      """
     424      result = {}
     425      for item in _parse_list_header(value):
     426          if "=" not in item:
     427              result[item] = None
     428              continue
     429          name, value = item.split("=", 1)
     430          if value[:1] == value[-1:] == '"':
     431              value = unquote_header_value(value[1:-1])
     432          result[name] = value
     433      return result
     434  
     435  
     436  # From mitsuhiko/werkzeug (used with permission).
     437  def unquote_header_value(value, is_filename=False):
     438      r"""Unquotes a header value.  (Reversal of :func:`quote_header_value`).
     439      This does not use the real unquoting but what browsers are actually
     440      using for quoting.
     441  
     442      :param value: the header value to unquote.
     443      :rtype: str
     444      """
     445      if value and value[0] == value[-1] == '"':
     446          # this is not the real unquoting, but fixing this so that the
     447          # RFC is met will result in bugs with internet explorer and
     448          # probably some other browsers as well.  IE for example is
     449          # uploading files with "C:\foo\bar.txt" as filename
     450          value = value[1:-1]
     451  
     452          # if this is a filename and the starting characters look like
     453          # a UNC path, then just return the value without quotes.  Using the
     454          # replace sequence below on a UNC path has the effect of turning
     455          # the leading double slash into a single slash and then
     456          # _fix_ie_filename() doesn't work correctly.  See #458.
     457          if not is_filename or value[:2] != "\\\\":
     458              return value.replace("\\\\", "\\").replace('\\"', '"')
     459      return value
     460  
     461  
     462  def dict_from_cookiejar(cj):
     463      """Returns a key/value dictionary from a CookieJar.
     464  
     465      :param cj: CookieJar object to extract cookies from.
     466      :rtype: dict
     467      """
     468  
     469      cookie_dict = {}
     470  
     471      for cookie in cj:
     472          cookie_dict[cookie.name] = cookie.value
     473  
     474      return cookie_dict
     475  
     476  
     477  def add_dict_to_cookiejar(cj, cookie_dict):
     478      """Returns a CookieJar from a key/value dictionary.
     479  
     480      :param cj: CookieJar to insert cookies into.
     481      :param cookie_dict: Dict of key/values to insert into CookieJar.
     482      :rtype: CookieJar
     483      """
     484  
     485      return cookiejar_from_dict(cookie_dict, cj)
     486  
     487  
     488  def get_encodings_from_content(content):
     489      """Returns encodings from given content string.
     490  
     491      :param content: bytestring to extract encodings from.
     492      """
     493      warnings.warn(
     494          (
     495              "In requests 3.0, get_encodings_from_content will be removed. For "
     496              "more information, please see the discussion on issue #2266. (This"
     497              " warning should only appear once.)"
     498          ),
     499          DeprecationWarning,
     500      )
     501  
     502      charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
     503      pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
     504      xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')
     505  
     506      return (
     507          charset_re.findall(content)
     508          + pragma_re.findall(content)
     509          + xml_re.findall(content)
     510      )
     511  
     512  
     513  def _parse_content_type_header(header):
     514      """Returns content type and parameters from given header
     515  
     516      :param header: string
     517      :return: tuple containing content type and dictionary of
     518           parameters
     519      """
     520  
     521      tokens = header.split(";")
     522      content_type, params = tokens[0].strip(), tokens[1:]
     523      params_dict = {}
     524      items_to_strip = "\"' "
     525  
     526      for param in params:
     527          param = param.strip()
     528          if param:
     529              key, value = param, True
     530              index_of_equals = param.find("=")
     531              if index_of_equals != -1:
     532                  key = param[:index_of_equals].strip(items_to_strip)
     533                  value = param[index_of_equals + 1 :].strip(items_to_strip)
     534              params_dict[key.lower()] = value
     535      return content_type, params_dict
     536  
     537  
     538  def get_encoding_from_headers(headers):
     539      """Returns encodings from given HTTP Header Dict.
     540  
     541      :param headers: dictionary to extract encoding from.
     542      :rtype: str
     543      """
     544  
     545      content_type = headers.get("content-type")
     546  
     547      if not content_type:
     548          return None
     549  
     550      content_type, params = _parse_content_type_header(content_type)
     551  
     552      if "charset" in params:
     553          return params["charset"].strip("'\"")
     554  
     555      if "text" in content_type:
     556          return "ISO-8859-1"
     557  
     558      if "application/json" in content_type:
     559          # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
     560          return "utf-8"
     561  
     562  
     563  def stream_decode_response_unicode(iterator, r):
     564      """Stream decodes an iterator."""
     565  
     566      if r.encoding is None:
     567          yield from iterator
     568          return
     569  
     570      decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
     571      for chunk in iterator:
     572          rv = decoder.decode(chunk)
     573          if rv:
     574              yield rv
     575      rv = decoder.decode(b"", final=True)
     576      if rv:
     577          yield rv
     578  
     579  
     580  def iter_slices(string, slice_length):
     581      """Iterate over slices of a string."""
     582      pos = 0
     583      if slice_length is None or slice_length <= 0:
     584          slice_length = len(string)
     585      while pos < len(string):
     586          yield string[pos : pos + slice_length]
     587          pos += slice_length
     588  
     589  
     590  def get_unicode_from_response(r):
     591      """Returns the requested content back in unicode.
     592  
     593      :param r: Response object to get unicode content from.
     594  
     595      Tried:
     596  
     597      1. charset from content-type
     598      2. fall back and replace all unicode characters
     599  
     600      :rtype: str
     601      """
     602      warnings.warn(
     603          (
     604              "In requests 3.0, get_unicode_from_response will be removed. For "
     605              "more information, please see the discussion on issue #2266. (This"
     606              " warning should only appear once.)"
     607          ),
     608          DeprecationWarning,
     609      )
     610  
     611      tried_encodings = []
     612  
     613      # Try charset from content-type
     614      encoding = get_encoding_from_headers(r.headers)
     615  
     616      if encoding:
     617          try:
     618              return str(r.content, encoding)
     619          except UnicodeError:
     620              tried_encodings.append(encoding)
     621  
     622      # Fall back:
     623      try:
     624          return str(r.content, encoding, errors="replace")
     625      except TypeError:
     626          return r.content
     627  
     628  
     629  # The unreserved URI characters (RFC 3986)
     630  UNRESERVED_SET = frozenset(
     631      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
     632  )
     633  
     634  
     635  def unquote_unreserved(uri):
     636      """Un-escape any percent-escape sequences in a URI that are unreserved
     637      characters. This leaves all reserved, illegal and non-ASCII bytes encoded.
     638  
     639      :rtype: str
     640      """
     641      parts = uri.split("%")
     642      for i in range(1, len(parts)):
     643          h = parts[i][0:2]
     644          if len(h) == 2 and h.isalnum():
     645              try:
     646                  c = chr(int(h, 16))
     647              except ValueError:
     648                  raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")
     649  
     650              if c in UNRESERVED_SET:
     651                  parts[i] = c + parts[i][2:]
     652              else:
     653                  parts[i] = f"%{parts[i]}"
     654          else:
     655              parts[i] = f"%{parts[i]}"
     656      return "".join(parts)
     657  
     658  
     659  def requote_uri(uri):
     660      """Re-quote the given URI.
     661  
     662      This function passes the given URI through an unquote/quote cycle to
     663      ensure that it is fully and consistently quoted.
     664  
     665      :rtype: str
     666      """
     667      safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
     668      safe_without_percent = "!#$&'()*+,/:;=?@[]~"
     669      try:
     670          # Unquote only the unreserved characters
     671          # Then quote only illegal characters (do not quote reserved,
     672          # unreserved, or '%')
     673          return quote(unquote_unreserved(uri), safe=safe_with_percent)
     674      except InvalidURL:
     675          # We couldn't unquote the given URI, so let's try quoting it, but
     676          # there may be unquoted '%'s in the URI. We need to make sure they're
     677          # properly quoted so they do not cause issues elsewhere.
     678          return quote(uri, safe=safe_without_percent)
     679  
     680  
     681  def address_in_network(ip, net):
     682      """This function allows you to check if an IP belongs to a network subnet
     683  
     684      Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
     685               returns False if ip = 192.168.1.1 and net = 192.168.100.0/24
     686  
     687      :rtype: bool
     688      """
     689      ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
     690      netaddr, bits = net.split("/")
     691      netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
     692      network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
     693      return (ipaddr & netmask) == (network & netmask)
     694  
     695  
     696  def dotted_netmask(mask):
     697      """Converts mask from /xx format to xxx.xxx.xxx.xxx
     698  
     699      Example: if mask is 24 function returns 255.255.255.0
     700  
     701      :rtype: str
     702      """
     703      bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
     704      return socket.inet_ntoa(struct.pack(">I", bits))
     705  
     706  
     707  def is_ipv4_address(string_ip):
     708      """
     709      :rtype: bool
     710      """
     711      try:
     712          socket.inet_aton(string_ip)
     713      except OSError:
     714          return False
     715      return True
     716  
     717  
     718  def is_valid_cidr(string_network):
     719      """
     720      Very simple check of the cidr format in no_proxy variable.
     721  
     722      :rtype: bool
     723      """
     724      if string_network.count("/") == 1:
     725          try:
     726              mask = int(string_network.split("/")[1])
     727          except ValueError:
     728              return False
     729  
     730          if mask < 1 or mask > 32:
     731              return False
     732  
     733          try:
     734              socket.inet_aton(string_network.split("/")[0])
     735          except OSError:
     736              return False
     737      else:
     738          return False
     739      return True
     740  
     741  
     742  @contextlib.contextmanager
     743  def set_environ(env_name, value):
     744      """Set the environment variable 'env_name' to 'value'
     745  
     746      Save previous value, yield, and then restore the previous value stored in
     747      the environment variable 'env_name'.
     748  
     749      If 'value' is None, do nothing"""
     750      value_changed = value is not None
     751      if value_changed:
     752          old_value = os.environ.get(env_name)
     753          os.environ[env_name] = value
     754      try:
     755          yield
     756      finally:
     757          if value_changed:
     758              if old_value is None:
     759                  del os.environ[env_name]
     760              else:
     761                  os.environ[env_name] = old_value
     762  
     763  
     764  def should_bypass_proxies(url, no_proxy):
     765      """
     766      Returns whether we should bypass proxies or not.
     767  
     768      :rtype: bool
     769      """
     770      # Prioritize lowercase environment variables over uppercase
     771      # to keep a consistent behaviour with other http projects (curl, wget).
     772      def get_proxy(key):
     773          return os.environ.get(key) or os.environ.get(key.upper())
     774  
     775      # First check whether no_proxy is defined. If it is, check that the URL
     776      # we're getting isn't in the no_proxy list.
     777      no_proxy_arg = no_proxy
     778      if no_proxy is None:
     779          no_proxy = get_proxy("no_proxy")
     780      parsed = urlparse(url)
     781  
     782      if parsed.hostname is None:
     783          # URLs don't always have hostnames, e.g. file:/// urls.
     784          return True
     785  
     786      if no_proxy:
     787          # We need to check whether we match here. We need to see if we match
     788          # the end of the hostname, both with and without the port.
     789          no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)
     790  
     791          if is_ipv4_address(parsed.hostname):
     792              for proxy_ip in no_proxy:
     793                  if is_valid_cidr(proxy_ip):
     794                      if address_in_network(parsed.hostname, proxy_ip):
     795                          return True
     796                  elif parsed.hostname == proxy_ip:
     797                      # If no_proxy ip was defined in plain IP notation instead of cidr notation &
     798                      # matches the IP of the index
     799                      return True
     800          else:
     801              host_with_port = parsed.hostname
     802              if parsed.port:
     803                  host_with_port += f":{parsed.port}"
     804  
     805              for host in no_proxy:
     806                  if parsed.hostname.endswith(host) or host_with_port.endswith(host):
     807                      # The URL does match something in no_proxy, so we don't want
     808                      # to apply the proxies on this URL.
     809                      return True
     810  
     811      with set_environ("no_proxy", no_proxy_arg):
     812          # parsed.hostname can be `None` in cases such as a file URI.
     813          try:
     814              bypass = proxy_bypass(parsed.hostname)
     815          except (TypeError, socket.gaierror):
     816              bypass = False
     817  
     818      if bypass:
     819          return True
     820  
     821      return False
     822  
     823  
     824  def get_environ_proxies(url, no_proxy=None):
     825      """
     826      Return a dict of environment proxies.
     827  
     828      :rtype: dict
     829      """
     830      if should_bypass_proxies(url, no_proxy=no_proxy):
     831          return {}
     832      else:
     833          return getproxies()
     834  
     835  
     836  def select_proxy(url, proxies):
     837      """Select a proxy for the url, if applicable.
     838  
     839      :param url: The url being for the request
     840      :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
     841      """
     842      proxies = proxies or {}
     843      urlparts = urlparse(url)
     844      if urlparts.hostname is None:
     845          return proxies.get(urlparts.scheme, proxies.get("all"))
     846  
     847      proxy_keys = [
     848          urlparts.scheme + "://" + urlparts.hostname,
     849          urlparts.scheme,
     850          "all://" + urlparts.hostname,
     851          "all",
     852      ]
     853      proxy = None
     854      for proxy_key in proxy_keys:
     855          if proxy_key in proxies:
     856              proxy = proxies[proxy_key]
     857              break
     858  
     859      return proxy
     860  
     861  
     862  def resolve_proxies(request, proxies, trust_env=True):
     863      """This method takes proxy information from a request and configuration
     864      input to resolve a mapping of target proxies. This will consider settings
     865      such a NO_PROXY to strip proxy configurations.
     866  
     867      :param request: Request or PreparedRequest
     868      :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
     869      :param trust_env: Boolean declaring whether to trust environment configs
     870  
     871      :rtype: dict
     872      """
     873      proxies = proxies if proxies is not None else {}
     874      url = request.url
     875      scheme = urlparse(url).scheme
     876      no_proxy = proxies.get("no_proxy")
     877      new_proxies = proxies.copy()
     878  
     879      if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
     880          environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)
     881  
     882          proxy = environ_proxies.get(scheme, environ_proxies.get("all"))
     883  
     884          if proxy:
     885              new_proxies.setdefault(scheme, proxy)
     886      return new_proxies
     887  
     888  
     889  def default_user_agent(name="python-requests"):
     890      """
     891      Return a string representing the default user agent.
     892  
     893      :rtype: str
     894      """
     895      return f"{name}/{__version__}"
     896  
     897  
     898  def default_headers():
     899      """
     900      :rtype: requests.structures.CaseInsensitiveDict
     901      """
     902      return CaseInsensitiveDict(
     903          {
     904              "User-Agent": default_user_agent(),
     905              "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
     906              "Accept": "*/*",
     907              "Connection": "keep-alive",
     908          }
     909      )
     910  
     911  
     912  def parse_header_links(value):
     913      """Return a list of parsed link headers proxies.
     914  
     915      i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"
     916  
     917      :rtype: list
     918      """
     919  
     920      links = []
     921  
     922      replace_chars = " '\""
     923  
     924      value = value.strip(replace_chars)
     925      if not value:
     926          return links
     927  
     928      for val in re.split(", *<", value):
     929          try:
     930              url, params = val.split(";", 1)
     931          except ValueError:
     932              url, params = val, ""
     933  
     934          link = {"url": url.strip("<> '\"")}
     935  
     936          for param in params.split(";"):
     937              try:
     938                  key, value = param.split("=")
     939              except ValueError:
     940                  break
     941  
     942              link[key.strip(replace_chars)] = value.strip(replace_chars)
     943  
     944          links.append(link)
     945  
     946      return links
     947  
     948  
     949  # Null bytes; no need to recreate these on each call to guess_json_utf
     950  _null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
     951  _null2 = _null * 2
     952  _null3 = _null * 3
     953  
     954  
     955  def guess_json_utf(data):
     956      """
     957      :rtype: str
     958      """
     959      # JSON always starts with two ASCII characters, so detection is as
     960      # easy as counting the nulls and from their location and count
     961      # determine the encoding. Also detect a BOM, if present.
     962      sample = data[:4]
     963      if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
     964          return "utf-32"  # BOM included
     965      if sample[:3] == codecs.BOM_UTF8:
     966          return "utf-8-sig"  # BOM included, MS style (discouraged)
     967      if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
     968          return "utf-16"  # BOM included
     969      nullcount = sample.count(_null)
     970      if nullcount == 0:
     971          return "utf-8"
     972      if nullcount == 2:
     973          if sample[::2] == _null2:  # 1st and 3rd are null
     974              return "utf-16-be"
     975          if sample[1::2] == _null2:  # 2nd and 4th are null
     976              return "utf-16-le"
     977          # Did not detect 2 valid UTF-16 ascii-range characters
     978      if nullcount == 3:
     979          if sample[:3] == _null3:
     980              return "utf-32-be"
     981          if sample[1:] == _null3:
     982              return "utf-32-le"
     983          # Did not detect a valid UTF-32 ascii-range character
     984      return None
     985  
     986  
     987  def prepend_scheme_if_needed(url, new_scheme):
     988      """Given a URL that may or may not have a scheme, prepend the given scheme.
     989      Does not replace a present scheme with the one provided as an argument.
     990  
     991      :rtype: str
     992      """
     993      parsed = parse_url(url)
     994      scheme, auth, host, port, path, query, fragment = parsed
     995  
     996      # A defect in urlparse determines that there isn't a netloc present in some
     997      # urls. We previously assumed parsing was overly cautious, and swapped the
     998      # netloc and path. Due to a lack of tests on the original defect, this is
     999      # maintained with parse_url for backwards compatibility.
    1000      netloc = parsed.netloc
    1001      if not netloc:
    1002          netloc, path = path, netloc
    1003  
    1004      if auth:
    1005          # parse_url doesn't provide the netloc with auth
    1006          # so we'll add it ourselves.
    1007          netloc = "@".join([auth, netloc])
    1008      if scheme is None:
    1009          scheme = new_scheme
    1010      if path is None:
    1011          path = ""
    1012  
    1013      return urlunparse((scheme, netloc, path, "", query, fragment))
    1014  
    1015  
    1016  def get_auth_from_url(url):
    1017      """Given a url with authentication components, extract them into a tuple of
    1018      username,password.
    1019  
    1020      :rtype: (str,str)
    1021      """
    1022      parsed = urlparse(url)
    1023  
    1024      try:
    1025          auth = (unquote(parsed.username), unquote(parsed.password))
    1026      except (AttributeError, TypeError):
    1027          auth = ("", "")
    1028  
    1029      return auth
    1030  
    1031  
    1032  def check_header_validity(header):
    1033      """Verifies that header parts don't contain leading whitespace
    1034      reserved characters, or return characters.
    1035  
    1036      :param header: tuple, in the format (name, value).
    1037      """
    1038      name, value = header
    1039      _validate_header_part(header, name, 0)
    1040      _validate_header_part(header, value, 1)
    1041  
    1042  
    1043  def _validate_header_part(header, header_part, header_validator_index):
    1044      if isinstance(header_part, str):
    1045          validator = _HEADER_VALIDATORS_STR[header_validator_index]
    1046      elif isinstance(header_part, bytes):
    1047          validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    1048      else:
    1049          raise InvalidHeader(
    1050              f"Header part ({header_part!r}) from {header} "
    1051              f"must be of type str or bytes, not {type(header_part)}"
    1052          )
    1053  
    1054      if not validator.match(header_part):
    1055          header_kind = "name" if header_validator_index == 0 else "value"
    1056          raise InvalidHeader(
    1057              f"Invalid leading whitespace, reserved character(s), or return"
    1058              f"character(s) in header {header_kind}: {header_part!r}"
    1059          )
    1060  
    1061  
    1062  def urldefragauth(url):
    1063      """
    1064      Given a url remove the fragment and the authentication part.
    1065  
    1066      :rtype: str
    1067      """
    1068      scheme, netloc, path, params, query, fragment = urlparse(url)
    1069  
    1070      # see func:`prepend_scheme_if_needed`
    1071      if not netloc:
    1072          netloc, path = path, netloc
    1073  
    1074      netloc = netloc.rsplit("@", 1)[-1]
    1075  
    1076      return urlunparse((scheme, netloc, path, params, query, ""))
    1077  
    1078  
    1079  def rewind_body(prepared_request):
    1080      """Move file pointer back to its recorded starting position
    1081      so it can be read again on redirect.
    1082      """
    1083      body_seek = getattr(prepared_request.body, "seek", None)
    1084      if body_seek is not None and isinstance(
    1085          prepared_request._body_position, integer_types
    1086      ):
    1087          try:
    1088              body_seek(prepared_request._body_position)
    1089          except OSError:
    1090              raise UnrewindableBodyError(
    1091                  "An error occurred when rewinding request body for redirect."
    1092              )
    1093      else:
    1094          raise UnrewindableBodyError("Unable to rewind request body for redirect.")