python (3.11.7)
       1  """
       2  requests.auth
       3  ~~~~~~~~~~~~~
       4  
       5  This module contains the authentication handlers for Requests.
       6  """
       7  
       8  import hashlib
       9  import os
      10  import re
      11  import threading
      12  import time
      13  import warnings
      14  from base64 import b64encode
      15  
      16  from ._internal_utils import to_native_string
      17  from .compat import basestring, str, urlparse
      18  from .cookies import extract_cookies_to_jar
      19  from .utils import parse_dict_header
      20  
      21  CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded"
      22  CONTENT_TYPE_MULTI_PART = "multipart/form-data"
      23  
      24  
      25  def _basic_auth_str(username, password):
      26      """Returns a Basic Auth string."""
      27  
      28      # "I want us to put a big-ol' comment on top of it that
      29      # says that this behaviour is dumb but we need to preserve
      30      # it because people are relying on it."
      31      #    - Lukasa
      32      #
      33      # These are here solely to maintain backwards compatibility
      34      # for things like ints. This will be removed in 3.0.0.
      35      if not isinstance(username, basestring):
      36          warnings.warn(
      37              "Non-string usernames will no longer be supported in Requests "
      38              "3.0.0. Please convert the object you've passed in ({!r}) to "
      39              "a string or bytes object in the near future to avoid "
      40              "problems.".format(username),
      41              category=DeprecationWarning,
      42          )
      43          username = str(username)
      44  
      45      if not isinstance(password, basestring):
      46          warnings.warn(
      47              "Non-string passwords will no longer be supported in Requests "
      48              "3.0.0. Please convert the object you've passed in ({!r}) to "
      49              "a string or bytes object in the near future to avoid "
      50              "problems.".format(type(password)),
      51              category=DeprecationWarning,
      52          )
      53          password = str(password)
      54      # -- End Removal --
      55  
      56      if isinstance(username, str):
      57          username = username.encode("latin1")
      58  
      59      if isinstance(password, str):
      60          password = password.encode("latin1")
      61  
      62      authstr = "Basic " + to_native_string(
      63          b64encode(b":".join((username, password))).strip()
      64      )
      65  
      66      return authstr
      67  
      68  
      69  class ESC[4;38;5;81mAuthBase:
      70      """Base class that all auth implementations derive from"""
      71  
      72      def __call__(self, r):
      73          raise NotImplementedError("Auth hooks must be callable.")
      74  
      75  
      76  class ESC[4;38;5;81mHTTPBasicAuth(ESC[4;38;5;149mAuthBase):
      77      """Attaches HTTP Basic Authentication to the given Request object."""
      78  
      79      def __init__(self, username, password):
      80          self.username = username
      81          self.password = password
      82  
      83      def __eq__(self, other):
      84          return all(
      85              [
      86                  self.username == getattr(other, "username", None),
      87                  self.password == getattr(other, "password", None),
      88              ]
      89          )
      90  
      91      def __ne__(self, other):
      92          return not self == other
      93  
      94      def __call__(self, r):
      95          r.headers["Authorization"] = _basic_auth_str(self.username, self.password)
      96          return r
      97  
      98  
      99  class ESC[4;38;5;81mHTTPProxyAuth(ESC[4;38;5;149mHTTPBasicAuth):
     100      """Attaches HTTP Proxy Authentication to a given Request object."""
     101  
     102      def __call__(self, r):
     103          r.headers["Proxy-Authorization"] = _basic_auth_str(self.username, self.password)
     104          return r
     105  
     106  
     107  class ESC[4;38;5;81mHTTPDigestAuth(ESC[4;38;5;149mAuthBase):
     108      """Attaches HTTP Digest Authentication to the given Request object."""
     109  
     110      def __init__(self, username, password):
     111          self.username = username
     112          self.password = password
     113          # Keep state in per-thread local storage
     114          self._thread_local = threading.local()
     115  
     116      def init_per_thread_state(self):
     117          # Ensure state is initialized just once per-thread
     118          if not hasattr(self._thread_local, "init"):
     119              self._thread_local.init = True
     120              self._thread_local.last_nonce = ""
     121              self._thread_local.nonce_count = 0
     122              self._thread_local.chal = {}
     123              self._thread_local.pos = None
     124              self._thread_local.num_401_calls = None
     125  
     126      def build_digest_header(self, method, url):
     127          """
     128          :rtype: str
     129          """
     130  
     131          realm = self._thread_local.chal["realm"]
     132          nonce = self._thread_local.chal["nonce"]
     133          qop = self._thread_local.chal.get("qop")
     134          algorithm = self._thread_local.chal.get("algorithm")
     135          opaque = self._thread_local.chal.get("opaque")
     136          hash_utf8 = None
     137  
     138          if algorithm is None:
     139              _algorithm = "MD5"
     140          else:
     141              _algorithm = algorithm.upper()
     142          # lambdas assume digest modules are imported at the top level
     143          if _algorithm == "MD5" or _algorithm == "MD5-SESS":
     144  
     145              def md5_utf8(x):
     146                  if isinstance(x, str):
     147                      x = x.encode("utf-8")
     148                  return hashlib.md5(x).hexdigest()
     149  
     150              hash_utf8 = md5_utf8
     151          elif _algorithm == "SHA":
     152  
     153              def sha_utf8(x):
     154                  if isinstance(x, str):
     155                      x = x.encode("utf-8")
     156                  return hashlib.sha1(x).hexdigest()
     157  
     158              hash_utf8 = sha_utf8
     159          elif _algorithm == "SHA-256":
     160  
     161              def sha256_utf8(x):
     162                  if isinstance(x, str):
     163                      x = x.encode("utf-8")
     164                  return hashlib.sha256(x).hexdigest()
     165  
     166              hash_utf8 = sha256_utf8
     167          elif _algorithm == "SHA-512":
     168  
     169              def sha512_utf8(x):
     170                  if isinstance(x, str):
     171                      x = x.encode("utf-8")
     172                  return hashlib.sha512(x).hexdigest()
     173  
     174              hash_utf8 = sha512_utf8
     175  
     176          KD = lambda s, d: hash_utf8(f"{s}:{d}")  # noqa:E731
     177  
     178          if hash_utf8 is None:
     179              return None
     180  
     181          # XXX not implemented yet
     182          entdig = None
     183          p_parsed = urlparse(url)
     184          #: path is request-uri defined in RFC 2616 which should not be empty
     185          path = p_parsed.path or "/"
     186          if p_parsed.query:
     187              path += f"?{p_parsed.query}"
     188  
     189          A1 = f"{self.username}:{realm}:{self.password}"
     190          A2 = f"{method}:{path}"
     191  
     192          HA1 = hash_utf8(A1)
     193          HA2 = hash_utf8(A2)
     194  
     195          if nonce == self._thread_local.last_nonce:
     196              self._thread_local.nonce_count += 1
     197          else:
     198              self._thread_local.nonce_count = 1
     199          ncvalue = f"{self._thread_local.nonce_count:08x}"
     200          s = str(self._thread_local.nonce_count).encode("utf-8")
     201          s += nonce.encode("utf-8")
     202          s += time.ctime().encode("utf-8")
     203          s += os.urandom(8)
     204  
     205          cnonce = hashlib.sha1(s).hexdigest()[:16]
     206          if _algorithm == "MD5-SESS":
     207              HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}")
     208  
     209          if not qop:
     210              respdig = KD(HA1, f"{nonce}:{HA2}")
     211          elif qop == "auth" or "auth" in qop.split(","):
     212              noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}"
     213              respdig = KD(HA1, noncebit)
     214          else:
     215              # XXX handle auth-int.
     216              return None
     217  
     218          self._thread_local.last_nonce = nonce
     219  
     220          # XXX should the partial digests be encoded too?
     221          base = (
     222              f'username="{self.username}", realm="{realm}", nonce="{nonce}", '
     223              f'uri="{path}", response="{respdig}"'
     224          )
     225          if opaque:
     226              base += f', opaque="{opaque}"'
     227          if algorithm:
     228              base += f', algorithm="{algorithm}"'
     229          if entdig:
     230              base += f', digest="{entdig}"'
     231          if qop:
     232              base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"'
     233  
     234          return f"Digest {base}"
     235  
     236      def handle_redirect(self, r, **kwargs):
     237          """Reset num_401_calls counter on redirects."""
     238          if r.is_redirect:
     239              self._thread_local.num_401_calls = 1
     240  
     241      def handle_401(self, r, **kwargs):
     242          """
     243          Takes the given response and tries digest-auth, if needed.
     244  
     245          :rtype: requests.Response
     246          """
     247  
     248          # If response is not 4xx, do not auth
     249          # See https://github.com/psf/requests/issues/3772
     250          if not 400 <= r.status_code < 500:
     251              self._thread_local.num_401_calls = 1
     252              return r
     253  
     254          if self._thread_local.pos is not None:
     255              # Rewind the file position indicator of the body to where
     256              # it was to resend the request.
     257              r.request.body.seek(self._thread_local.pos)
     258          s_auth = r.headers.get("www-authenticate", "")
     259  
     260          if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2:
     261  
     262              self._thread_local.num_401_calls += 1
     263              pat = re.compile(r"digest ", flags=re.IGNORECASE)
     264              self._thread_local.chal = parse_dict_header(pat.sub("", s_auth, count=1))
     265  
     266              # Consume content and release the original connection
     267              # to allow our new request to reuse the same one.
     268              r.content
     269              r.close()
     270              prep = r.request.copy()
     271              extract_cookies_to_jar(prep._cookies, r.request, r.raw)
     272              prep.prepare_cookies(prep._cookies)
     273  
     274              prep.headers["Authorization"] = self.build_digest_header(
     275                  prep.method, prep.url
     276              )
     277              _r = r.connection.send(prep, **kwargs)
     278              _r.history.append(r)
     279              _r.request = prep
     280  
     281              return _r
     282  
     283          self._thread_local.num_401_calls = 1
     284          return r
     285  
     286      def __call__(self, r):
     287          # Initialize per-thread state, if needed
     288          self.init_per_thread_state()
     289          # If we have a saved nonce, skip the 401
     290          if self._thread_local.last_nonce:
     291              r.headers["Authorization"] = self.build_digest_header(r.method, r.url)
     292          try:
     293              self._thread_local.pos = r.body.tell()
     294          except AttributeError:
     295              # In the case of HTTPDigestAuth being reused and the body of
     296              # the previous request was a file-like object, pos has the
     297              # file position of the previous body. Ensure it's set to
     298              # None.
     299              self._thread_local.pos = None
     300          r.register_hook("response", self.handle_401)
     301          r.register_hook("response", self.handle_redirect)
     302          self._thread_local.num_401_calls = 1
     303  
     304          return r
     305  
     306      def __eq__(self, other):
     307          return all(
     308              [
     309                  self.username == getattr(other, "username", None),
     310                  self.password == getattr(other, "password", None),
     311              ]
     312          )
     313  
     314      def __ne__(self, other):
     315          return not self == other