python (3.12.0)

(root)/
lib/
python3.12/
test/
test_httplib.py
       1  import enum
       2  import errno
       3  from http import client, HTTPStatus
       4  import io
       5  import itertools
       6  import os
       7  import array
       8  import re
       9  import socket
      10  import threading
      11  
      12  import unittest
      13  from unittest import mock
      14  TestCase = unittest.TestCase
      15  
      16  from test import support
      17  from test.support import os_helper
      18  from test.support import socket_helper
      19  
      20  support.requires_working_socket(module=True)
      21  
      22  here = os.path.dirname(__file__)
      23  # Self-signed cert file for 'localhost'
      24  CERT_localhost = os.path.join(here, 'keycert.pem')
      25  # Self-signed cert file for 'fakehostname'
      26  CERT_fakehostname = os.path.join(here, 'keycert2.pem')
      27  # Self-signed cert file for self-signed.pythontest.net
      28  CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')
      29  
      30  # constants for testing chunked encoding
      31  chunked_start = (
      32      'HTTP/1.1 200 OK\r\n'
      33      'Transfer-Encoding: chunked\r\n\r\n'
      34      'a\r\n'
      35      'hello worl\r\n'
      36      '3\r\n'
      37      'd! \r\n'
      38      '8\r\n'
      39      'and now \r\n'
      40      '22\r\n'
      41      'for something completely different\r\n'
      42  )
      43  chunked_expected = b'hello world! and now for something completely different'
      44  chunk_extension = ";foo=bar"
      45  last_chunk = "0\r\n"
      46  last_chunk_extended = "0" + chunk_extension + "\r\n"
      47  trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
      48  chunked_end = "\r\n"
      49  
      50  HOST = socket_helper.HOST
      51  
      52  class ESC[4;38;5;81mFakeSocket:
      53      def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
      54          if isinstance(text, str):
      55              text = text.encode("ascii")
      56          self.text = text
      57          self.fileclass = fileclass
      58          self.data = b''
      59          self.sendall_calls = 0
      60          self.file_closed = False
      61          self.host = host
      62          self.port = port
      63  
      64      def sendall(self, data):
      65          self.sendall_calls += 1
      66          self.data += data
      67  
      68      def makefile(self, mode, bufsize=None):
      69          if mode != 'r' and mode != 'rb':
      70              raise client.UnimplementedFileMode()
      71          # keep the file around so we can check how much was read from it
      72          self.file = self.fileclass(self.text)
      73          self.file.close = self.file_close #nerf close ()
      74          return self.file
      75  
      76      def file_close(self):
      77          self.file_closed = True
      78  
      79      def close(self):
      80          pass
      81  
      82      def setsockopt(self, level, optname, value):
      83          pass
      84  
      85  class ESC[4;38;5;81mEPipeSocket(ESC[4;38;5;149mFakeSocket):
      86  
      87      def __init__(self, text, pipe_trigger):
      88          # When sendall() is called with pipe_trigger, raise EPIPE.
      89          FakeSocket.__init__(self, text)
      90          self.pipe_trigger = pipe_trigger
      91  
      92      def sendall(self, data):
      93          if self.pipe_trigger in data:
      94              raise OSError(errno.EPIPE, "gotcha")
      95          self.data += data
      96  
      97      def close(self):
      98          pass
      99  
     100  class ESC[4;38;5;81mNoEOFBytesIO(ESC[4;38;5;149mioESC[4;38;5;149m.ESC[4;38;5;149mBytesIO):
     101      """Like BytesIO, but raises AssertionError on EOF.
     102  
     103      This is used below to test that http.client doesn't try to read
     104      more from the underlying file than it should.
     105      """
     106      def read(self, n=-1):
     107          data = io.BytesIO.read(self, n)
     108          if data == b'':
     109              raise AssertionError('caller tried to read past EOF')
     110          return data
     111  
     112      def readline(self, length=None):
     113          data = io.BytesIO.readline(self, length)
     114          if data == b'':
     115              raise AssertionError('caller tried to read past EOF')
     116          return data
     117  
     118  class ESC[4;38;5;81mFakeSocketHTTPConnection(ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPConnection):
     119      """HTTPConnection subclass using FakeSocket; counts connect() calls"""
     120  
     121      def __init__(self, *args):
     122          self.connections = 0
     123          super().__init__('example.com')
     124          self.fake_socket_args = args
     125          self._create_connection = self.create_connection
     126  
     127      def connect(self):
     128          """Count the number of times connect() is invoked"""
     129          self.connections += 1
     130          return super().connect()
     131  
     132      def create_connection(self, *pos, **kw):
     133          return FakeSocket(*self.fake_socket_args)
     134  
     135  class ESC[4;38;5;81mHeaderTests(ESC[4;38;5;149mTestCase):
     136      def test_auto_headers(self):
     137          # Some headers are added automatically, but should not be added by
     138          # .request() if they are explicitly set.
     139  
     140          class ESC[4;38;5;81mHeaderCountingBuffer(ESC[4;38;5;149mlist):
     141              def __init__(self):
     142                  self.count = {}
     143              def append(self, item):
     144                  kv = item.split(b':')
     145                  if len(kv) > 1:
     146                      # item is a 'Key: Value' header string
     147                      lcKey = kv[0].decode('ascii').lower()
     148                      self.count.setdefault(lcKey, 0)
     149                      self.count[lcKey] += 1
     150                  list.append(self, item)
     151  
     152          for explicit_header in True, False:
     153              for header in 'Content-length', 'Host', 'Accept-encoding':
     154                  conn = client.HTTPConnection('example.com')
     155                  conn.sock = FakeSocket('blahblahblah')
     156                  conn._buffer = HeaderCountingBuffer()
     157  
     158                  body = 'spamspamspam'
     159                  headers = {}
     160                  if explicit_header:
     161                      headers[header] = str(len(body))
     162                  conn.request('POST', '/', body, headers)
     163                  self.assertEqual(conn._buffer.count[header.lower()], 1)
     164  
     165      def test_content_length_0(self):
     166  
     167          class ESC[4;38;5;81mContentLengthChecker(ESC[4;38;5;149mlist):
     168              def __init__(self):
     169                  list.__init__(self)
     170                  self.content_length = None
     171              def append(self, item):
     172                  kv = item.split(b':', 1)
     173                  if len(kv) > 1 and kv[0].lower() == b'content-length':
     174                      self.content_length = kv[1].strip()
     175                  list.append(self, item)
     176  
     177          # Here, we're testing that methods expecting a body get a
     178          # content-length set to zero if the body is empty (either None or '')
     179          bodies = (None, '')
     180          methods_with_body = ('PUT', 'POST', 'PATCH')
     181          for method, body in itertools.product(methods_with_body, bodies):
     182              conn = client.HTTPConnection('example.com')
     183              conn.sock = FakeSocket(None)
     184              conn._buffer = ContentLengthChecker()
     185              conn.request(method, '/', body)
     186              self.assertEqual(
     187                  conn._buffer.content_length, b'0',
     188                  'Header Content-Length incorrect on {}'.format(method)
     189              )
     190  
     191          # For these methods, we make sure that content-length is not set when
     192          # the body is None because it might cause unexpected behaviour on the
     193          # server.
     194          methods_without_body = (
     195               'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
     196          )
     197          for method in methods_without_body:
     198              conn = client.HTTPConnection('example.com')
     199              conn.sock = FakeSocket(None)
     200              conn._buffer = ContentLengthChecker()
     201              conn.request(method, '/', None)
     202              self.assertEqual(
     203                  conn._buffer.content_length, None,
     204                  'Header Content-Length set for empty body on {}'.format(method)
     205              )
     206  
     207          # If the body is set to '', that's considered to be "present but
     208          # empty" rather than "missing", so content length would be set, even
     209          # for methods that don't expect a body.
     210          for method in methods_without_body:
     211              conn = client.HTTPConnection('example.com')
     212              conn.sock = FakeSocket(None)
     213              conn._buffer = ContentLengthChecker()
     214              conn.request(method, '/', '')
     215              self.assertEqual(
     216                  conn._buffer.content_length, b'0',
     217                  'Header Content-Length incorrect on {}'.format(method)
     218              )
     219  
     220          # If the body is set, make sure Content-Length is set.
     221          for method in itertools.chain(methods_without_body, methods_with_body):
     222              conn = client.HTTPConnection('example.com')
     223              conn.sock = FakeSocket(None)
     224              conn._buffer = ContentLengthChecker()
     225              conn.request(method, '/', ' ')
     226              self.assertEqual(
     227                  conn._buffer.content_length, b'1',
     228                  'Header Content-Length incorrect on {}'.format(method)
     229              )
     230  
     231      def test_putheader(self):
     232          conn = client.HTTPConnection('example.com')
     233          conn.sock = FakeSocket(None)
     234          conn.putrequest('GET','/')
     235          conn.putheader('Content-length', 42)
     236          self.assertIn(b'Content-length: 42', conn._buffer)
     237  
     238          conn.putheader('Foo', ' bar ')
     239          self.assertIn(b'Foo:  bar ', conn._buffer)
     240          conn.putheader('Bar', '\tbaz\t')
     241          self.assertIn(b'Bar: \tbaz\t', conn._buffer)
     242          conn.putheader('Authorization', 'Bearer mytoken')
     243          self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
     244          conn.putheader('IterHeader', 'IterA', 'IterB')
     245          self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
     246          conn.putheader('LatinHeader', b'\xFF')
     247          self.assertIn(b'LatinHeader: \xFF', conn._buffer)
     248          conn.putheader('Utf8Header', b'\xc3\x80')
     249          self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
     250          conn.putheader('C1-Control', b'next\x85line')
     251          self.assertIn(b'C1-Control: next\x85line', conn._buffer)
     252          conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
     253          self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
     254          conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
     255          self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
     256          conn.putheader('Key Space', 'value')
     257          self.assertIn(b'Key Space: value', conn._buffer)
     258          conn.putheader('KeySpace ', 'value')
     259          self.assertIn(b'KeySpace : value', conn._buffer)
     260          conn.putheader(b'Nonbreak\xa0Space', 'value')
     261          self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
     262          conn.putheader(b'\xa0NonbreakSpace', 'value')
     263          self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
     264  
     265      def test_ipv6host_header(self):
     266          # Default host header on IPv6 transaction should be wrapped by [] if
     267          # it is an IPv6 address
     268          expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
     269                     b'Accept-Encoding: identity\r\n\r\n'
     270          conn = client.HTTPConnection('[2001::]:81')
     271          sock = FakeSocket('')
     272          conn.sock = sock
     273          conn.request('GET', '/foo')
     274          self.assertTrue(sock.data.startswith(expected))
     275  
     276          expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
     277                     b'Accept-Encoding: identity\r\n\r\n'
     278          conn = client.HTTPConnection('[2001:102A::]')
     279          sock = FakeSocket('')
     280          conn.sock = sock
     281          conn.request('GET', '/foo')
     282          self.assertTrue(sock.data.startswith(expected))
     283  
     284      def test_malformed_headers_coped_with(self):
     285          # Issue 19996
     286          body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
     287          sock = FakeSocket(body)
     288          resp = client.HTTPResponse(sock)
     289          resp.begin()
     290  
     291          self.assertEqual(resp.getheader('First'), 'val')
     292          self.assertEqual(resp.getheader('Second'), 'val')
     293  
     294      def test_parse_all_octets(self):
     295          # Ensure no valid header field octet breaks the parser
     296          body = (
     297              b'HTTP/1.1 200 OK\r\n'
     298              b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
     299              b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n'
     300              b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n'
     301              b'obs-fold: text\r\n'
     302              b' folded with space\r\n'
     303              b'\tfolded with tab\r\n'
     304              b'Content-Length: 0\r\n'
     305              b'\r\n'
     306          )
     307          sock = FakeSocket(body)
     308          resp = client.HTTPResponse(sock)
     309          resp.begin()
     310          self.assertEqual(resp.getheader('Content-Length'), '0')
     311          self.assertEqual(resp.msg['Content-Length'], '0')
     312          self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
     313          self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value')
     314          vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
     315          self.assertEqual(resp.getheader('VCHAR'), vchar)
     316          self.assertEqual(resp.msg['VCHAR'], vchar)
     317          self.assertIsNotNone(resp.getheader('obs-text'))
     318          self.assertIn('obs-text', resp.msg)
     319          for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
     320              self.assertTrue(folded.startswith('text'))
     321              self.assertIn(' folded with space', folded)
     322              self.assertTrue(folded.endswith('folded with tab'))
     323  
     324      def test_invalid_headers(self):
     325          conn = client.HTTPConnection('example.com')
     326          conn.sock = FakeSocket('')
     327          conn.putrequest('GET', '/')
     328  
     329          # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
     330          # longer allowed in header names
     331          cases = (
     332              (b'Invalid\r\nName', b'ValidValue'),
     333              (b'Invalid\rName', b'ValidValue'),
     334              (b'Invalid\nName', b'ValidValue'),
     335              (b'\r\nInvalidName', b'ValidValue'),
     336              (b'\rInvalidName', b'ValidValue'),
     337              (b'\nInvalidName', b'ValidValue'),
     338              (b' InvalidName', b'ValidValue'),
     339              (b'\tInvalidName', b'ValidValue'),
     340              (b'Invalid:Name', b'ValidValue'),
     341              (b':InvalidName', b'ValidValue'),
     342              (b'ValidName', b'Invalid\r\nValue'),
     343              (b'ValidName', b'Invalid\rValue'),
     344              (b'ValidName', b'Invalid\nValue'),
     345              (b'ValidName', b'InvalidValue\r\n'),
     346              (b'ValidName', b'InvalidValue\r'),
     347              (b'ValidName', b'InvalidValue\n'),
     348          )
     349          for name, value in cases:
     350              with self.subTest((name, value)):
     351                  with self.assertRaisesRegex(ValueError, 'Invalid header'):
     352                      conn.putheader(name, value)
     353  
     354      def test_headers_debuglevel(self):
     355          body = (
     356              b'HTTP/1.1 200 OK\r\n'
     357              b'First: val\r\n'
     358              b'Second: val1\r\n'
     359              b'Second: val2\r\n'
     360          )
     361          sock = FakeSocket(body)
     362          resp = client.HTTPResponse(sock, debuglevel=1)
     363          with support.captured_stdout() as output:
     364              resp.begin()
     365          lines = output.getvalue().splitlines()
     366          self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'")
     367          self.assertEqual(lines[1], "header: First: val")
     368          self.assertEqual(lines[2], "header: Second: val1")
     369          self.assertEqual(lines[3], "header: Second: val2")
     370  
     371  
     372  class ESC[4;38;5;81mHttpMethodTests(ESC[4;38;5;149mTestCase):
     373      def test_invalid_method_names(self):
     374          methods = (
     375              'GET\r',
     376              'POST\n',
     377              'PUT\n\r',
     378              'POST\nValue',
     379              'POST\nHOST:abc',
     380              'GET\nrHost:abc\n',
     381              'POST\rRemainder:\r',
     382              'GET\rHOST:\n',
     383              '\nPUT'
     384          )
     385  
     386          for method in methods:
     387              with self.assertRaisesRegex(
     388                      ValueError, "method can't contain control characters"):
     389                  conn = client.HTTPConnection('example.com')
     390                  conn.sock = FakeSocket(None)
     391                  conn.request(method=method, url="/")
     392  
     393  
     394  class ESC[4;38;5;81mTransferEncodingTest(ESC[4;38;5;149mTestCase):
     395      expected_body = b"It's just a flesh wound"
     396  
     397      def test_endheaders_chunked(self):
     398          conn = client.HTTPConnection('example.com')
     399          conn.sock = FakeSocket(b'')
     400          conn.putrequest('POST', '/')
     401          conn.endheaders(self._make_body(), encode_chunked=True)
     402  
     403          _, _, body = self._parse_request(conn.sock.data)
     404          body = self._parse_chunked(body)
     405          self.assertEqual(body, self.expected_body)
     406  
     407      def test_explicit_headers(self):
     408          # explicit chunked
     409          conn = client.HTTPConnection('example.com')
     410          conn.sock = FakeSocket(b'')
     411          # this shouldn't actually be automatically chunk-encoded because the
     412          # calling code has explicitly stated that it's taking care of it
     413          conn.request(
     414              'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
     415  
     416          _, headers, body = self._parse_request(conn.sock.data)
     417          self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
     418          self.assertEqual(headers['Transfer-Encoding'], 'chunked')
     419          self.assertEqual(body, self.expected_body)
     420  
     421          # explicit chunked, string body
     422          conn = client.HTTPConnection('example.com')
     423          conn.sock = FakeSocket(b'')
     424          conn.request(
     425              'POST', '/', self.expected_body.decode('latin-1'),
     426              {'Transfer-Encoding': 'chunked'})
     427  
     428          _, headers, body = self._parse_request(conn.sock.data)
     429          self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
     430          self.assertEqual(headers['Transfer-Encoding'], 'chunked')
     431          self.assertEqual(body, self.expected_body)
     432  
     433          # User-specified TE, but request() does the chunk encoding
     434          conn = client.HTTPConnection('example.com')
     435          conn.sock = FakeSocket(b'')
     436          conn.request('POST', '/',
     437              headers={'Transfer-Encoding': 'gzip, chunked'},
     438              encode_chunked=True,
     439              body=self._make_body())
     440          _, headers, body = self._parse_request(conn.sock.data)
     441          self.assertNotIn('content-length', [k.lower() for k in headers])
     442          self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
     443          self.assertEqual(self._parse_chunked(body), self.expected_body)
     444  
     445      def test_request(self):
     446          for empty_lines in (False, True,):
     447              conn = client.HTTPConnection('example.com')
     448              conn.sock = FakeSocket(b'')
     449              conn.request(
     450                  'POST', '/', self._make_body(empty_lines=empty_lines))
     451  
     452              _, headers, body = self._parse_request(conn.sock.data)
     453              body = self._parse_chunked(body)
     454              self.assertEqual(body, self.expected_body)
     455              self.assertEqual(headers['Transfer-Encoding'], 'chunked')
     456  
     457              # Content-Length and Transfer-Encoding SHOULD not be sent in the
     458              # same request
     459              self.assertNotIn('content-length', [k.lower() for k in headers])
     460  
     461      def test_empty_body(self):
     462          # Zero-length iterable should be treated like any other iterable
     463          conn = client.HTTPConnection('example.com')
     464          conn.sock = FakeSocket(b'')
     465          conn.request('POST', '/', ())
     466          _, headers, body = self._parse_request(conn.sock.data)
     467          self.assertEqual(headers['Transfer-Encoding'], 'chunked')
     468          self.assertNotIn('content-length', [k.lower() for k in headers])
     469          self.assertEqual(body, b"0\r\n\r\n")
     470  
     471      def _make_body(self, empty_lines=False):
     472          lines = self.expected_body.split(b' ')
     473          for idx, line in enumerate(lines):
     474              # for testing handling empty lines
     475              if empty_lines and idx % 2:
     476                  yield b''
     477              if idx < len(lines) - 1:
     478                  yield line + b' '
     479              else:
     480                  yield line
     481  
     482      def _parse_request(self, data):
     483          lines = data.split(b'\r\n')
     484          request = lines[0]
     485          headers = {}
     486          n = 1
     487          while n < len(lines) and len(lines[n]) > 0:
     488              key, val = lines[n].split(b':')
     489              key = key.decode('latin-1').strip()
     490              headers[key] = val.decode('latin-1').strip()
     491              n += 1
     492  
     493          return request, headers, b'\r\n'.join(lines[n + 1:])
     494  
     495      def _parse_chunked(self, data):
     496          body = []
     497          trailers = {}
     498          n = 0
     499          lines = data.split(b'\r\n')
     500          # parse body
     501          while True:
     502              size, chunk = lines[n:n+2]
     503              size = int(size, 16)
     504  
     505              if size == 0:
     506                  n += 1
     507                  break
     508  
     509              self.assertEqual(size, len(chunk))
     510              body.append(chunk)
     511  
     512              n += 2
     513              # we /should/ hit the end chunk, but check against the size of
     514              # lines so we're not stuck in an infinite loop should we get
     515              # malformed data
     516              if n > len(lines):
     517                  break
     518  
     519          return b''.join(body)
     520  
     521  
     522  class ESC[4;38;5;81mBasicTest(ESC[4;38;5;149mTestCase):
     523      def test_dir_with_added_behavior_on_status(self):
     524          # see issue40084
     525          self.assertTrue({'description', 'name', 'phrase', 'value'} <= set(dir(HTTPStatus(404))))
     526  
     527      def test_simple_httpstatus(self):
     528          class ESC[4;38;5;81mCheckedHTTPStatus(ESC[4;38;5;149menumESC[4;38;5;149m.ESC[4;38;5;149mIntEnum):
     529              """HTTP status codes and reason phrases
     530  
     531              Status codes from the following RFCs are all observed:
     532  
     533                  * RFC 7231: Hypertext Transfer Protocol (HTTP/1.1), obsoletes 2616
     534                  * RFC 6585: Additional HTTP Status Codes
     535                  * RFC 3229: Delta encoding in HTTP
     536                  * RFC 4918: HTTP Extensions for WebDAV, obsoletes 2518
     537                  * RFC 5842: Binding Extensions to WebDAV
     538                  * RFC 7238: Permanent Redirect
     539                  * RFC 2295: Transparent Content Negotiation in HTTP
     540                  * RFC 2774: An HTTP Extension Framework
     541                  * RFC 7725: An HTTP Status Code to Report Legal Obstacles
     542                  * RFC 7540: Hypertext Transfer Protocol Version 2 (HTTP/2)
     543                  * RFC 2324: Hyper Text Coffee Pot Control Protocol (HTCPCP/1.0)
     544                  * RFC 8297: An HTTP Status Code for Indicating Hints
     545                  * RFC 8470: Using Early Data in HTTP
     546              """
     547              def __new__(cls, value, phrase, description=''):
     548                  obj = int.__new__(cls, value)
     549                  obj._value_ = value
     550  
     551                  obj.phrase = phrase
     552                  obj.description = description
     553                  return obj
     554  
     555              @property
     556              def is_informational(self):
     557                  return 100 <= self <= 199
     558  
     559              @property
     560              def is_success(self):
     561                  return 200 <= self <= 299
     562  
     563              @property
     564              def is_redirection(self):
     565                  return 300 <= self <= 399
     566  
     567              @property
     568              def is_client_error(self):
     569                  return 400 <= self <= 499
     570  
     571              @property
     572              def is_server_error(self):
     573                  return 500 <= self <= 599
     574  
     575              # informational
     576              CONTINUE = 100, 'Continue', 'Request received, please continue'
     577              SWITCHING_PROTOCOLS = (101, 'Switching Protocols',
     578                      'Switching to new protocol; obey Upgrade header')
     579              PROCESSING = 102, 'Processing'
     580              EARLY_HINTS = 103, 'Early Hints'
     581              # success
     582              OK = 200, 'OK', 'Request fulfilled, document follows'
     583              CREATED = 201, 'Created', 'Document created, URL follows'
     584              ACCEPTED = (202, 'Accepted',
     585                  'Request accepted, processing continues off-line')
     586              NON_AUTHORITATIVE_INFORMATION = (203,
     587                  'Non-Authoritative Information', 'Request fulfilled from cache')
     588              NO_CONTENT = 204, 'No Content', 'Request fulfilled, nothing follows'
     589              RESET_CONTENT = 205, 'Reset Content', 'Clear input form for further input'
     590              PARTIAL_CONTENT = 206, 'Partial Content', 'Partial content follows'
     591              MULTI_STATUS = 207, 'Multi-Status'
     592              ALREADY_REPORTED = 208, 'Already Reported'
     593              IM_USED = 226, 'IM Used'
     594              # redirection
     595              MULTIPLE_CHOICES = (300, 'Multiple Choices',
     596                  'Object has several resources -- see URI list')
     597              MOVED_PERMANENTLY = (301, 'Moved Permanently',
     598                  'Object moved permanently -- see URI list')
     599              FOUND = 302, 'Found', 'Object moved temporarily -- see URI list'
     600              SEE_OTHER = 303, 'See Other', 'Object moved -- see Method and URL list'
     601              NOT_MODIFIED = (304, 'Not Modified',
     602                  'Document has not changed since given time')
     603              USE_PROXY = (305, 'Use Proxy',
     604                  'You must use proxy specified in Location to access this resource')
     605              TEMPORARY_REDIRECT = (307, 'Temporary Redirect',
     606                  'Object moved temporarily -- see URI list')
     607              PERMANENT_REDIRECT = (308, 'Permanent Redirect',
     608                  'Object moved permanently -- see URI list')
     609              # client error
     610              BAD_REQUEST = (400, 'Bad Request',
     611                  'Bad request syntax or unsupported method')
     612              UNAUTHORIZED = (401, 'Unauthorized',
     613                  'No permission -- see authorization schemes')
     614              PAYMENT_REQUIRED = (402, 'Payment Required',
     615                  'No payment -- see charging schemes')
     616              FORBIDDEN = (403, 'Forbidden',
     617                  'Request forbidden -- authorization will not help')
     618              NOT_FOUND = (404, 'Not Found',
     619                  'Nothing matches the given URI')
     620              METHOD_NOT_ALLOWED = (405, 'Method Not Allowed',
     621                  'Specified method is invalid for this resource')
     622              NOT_ACCEPTABLE = (406, 'Not Acceptable',
     623                  'URI not available in preferred format')
     624              PROXY_AUTHENTICATION_REQUIRED = (407,
     625                  'Proxy Authentication Required',
     626                  'You must authenticate with this proxy before proceeding')
     627              REQUEST_TIMEOUT = (408, 'Request Timeout',
     628                  'Request timed out; try again later')
     629              CONFLICT = 409, 'Conflict', 'Request conflict'
     630              GONE = (410, 'Gone',
     631                  'URI no longer exists and has been permanently removed')
     632              LENGTH_REQUIRED = (411, 'Length Required',
     633                  'Client must specify Content-Length')
     634              PRECONDITION_FAILED = (412, 'Precondition Failed',
     635                  'Precondition in headers is false')
     636              REQUEST_ENTITY_TOO_LARGE = (413, 'Request Entity Too Large',
     637                  'Entity is too large')
     638              REQUEST_URI_TOO_LONG = (414, 'Request-URI Too Long',
     639                  'URI is too long')
     640              UNSUPPORTED_MEDIA_TYPE = (415, 'Unsupported Media Type',
     641                  'Entity body in unsupported format')
     642              REQUESTED_RANGE_NOT_SATISFIABLE = (416,
     643                  'Requested Range Not Satisfiable',
     644                  'Cannot satisfy request range')
     645              EXPECTATION_FAILED = (417, 'Expectation Failed',
     646                  'Expect condition could not be satisfied')
     647              IM_A_TEAPOT = (418, 'I\'m a Teapot',
     648                  'Server refuses to brew coffee because it is a teapot.')
     649              MISDIRECTED_REQUEST = (421, 'Misdirected Request',
     650                  'Server is not able to produce a response')
     651              UNPROCESSABLE_ENTITY = 422, 'Unprocessable Entity'
     652              LOCKED = 423, 'Locked'
     653              FAILED_DEPENDENCY = 424, 'Failed Dependency'
     654              TOO_EARLY = 425, 'Too Early'
     655              UPGRADE_REQUIRED = 426, 'Upgrade Required'
     656              PRECONDITION_REQUIRED = (428, 'Precondition Required',
     657                  'The origin server requires the request to be conditional')
     658              TOO_MANY_REQUESTS = (429, 'Too Many Requests',
     659                  'The user has sent too many requests in '
     660                  'a given amount of time ("rate limiting")')
     661              REQUEST_HEADER_FIELDS_TOO_LARGE = (431,
     662                  'Request Header Fields Too Large',
     663                  'The server is unwilling to process the request because its header '
     664                  'fields are too large')
     665              UNAVAILABLE_FOR_LEGAL_REASONS = (451,
     666                  'Unavailable For Legal Reasons',
     667                  'The server is denying access to the '
     668                  'resource as a consequence of a legal demand')
     669              # server errors
     670              INTERNAL_SERVER_ERROR = (500, 'Internal Server Error',
     671                  'Server got itself in trouble')
     672              NOT_IMPLEMENTED = (501, 'Not Implemented',
     673                  'Server does not support this operation')
     674              BAD_GATEWAY = (502, 'Bad Gateway',
     675                  'Invalid responses from another server/proxy')
     676              SERVICE_UNAVAILABLE = (503, 'Service Unavailable',
     677                  'The server cannot process the request due to a high load')
     678              GATEWAY_TIMEOUT = (504, 'Gateway Timeout',
     679                  'The gateway server did not receive a timely response')
     680              HTTP_VERSION_NOT_SUPPORTED = (505, 'HTTP Version Not Supported',
     681                  'Cannot fulfill request')
     682              VARIANT_ALSO_NEGOTIATES = 506, 'Variant Also Negotiates'
     683              INSUFFICIENT_STORAGE = 507, 'Insufficient Storage'
     684              LOOP_DETECTED = 508, 'Loop Detected'
     685              NOT_EXTENDED = 510, 'Not Extended'
     686              NETWORK_AUTHENTICATION_REQUIRED = (511,
     687                  'Network Authentication Required',
     688                  'The client needs to authenticate to gain network access')
     689          enum._test_simple_enum(CheckedHTTPStatus, HTTPStatus)
     690  
     691      def test_httpstatus_range(self):
     692          """Checks that the statuses are in the 100-599 range"""
     693  
     694          for member in HTTPStatus.__members__.values():
     695              self.assertGreaterEqual(member, 100)
     696              self.assertLessEqual(member, 599)
     697  
     698      def test_httpstatus_category(self):
     699          """Checks that the statuses belong to the standard categories"""
     700  
     701          categories = (
     702              ((100, 199), "is_informational"),
     703              ((200, 299), "is_success"),
     704              ((300, 399), "is_redirection"),
     705              ((400, 499), "is_client_error"),
     706              ((500, 599), "is_server_error"),
     707          )
     708          for member in HTTPStatus.__members__.values():
     709              for (lower, upper), category in categories:
     710                  category_indicator = getattr(member, category)
     711                  if lower <= member <= upper:
     712                      self.assertTrue(category_indicator)
     713                  else:
     714                      self.assertFalse(category_indicator)
     715  
     716      def test_status_lines(self):
     717          # Test HTTP status lines
     718  
     719          body = "HTTP/1.1 200 Ok\r\n\r\nText"
     720          sock = FakeSocket(body)
     721          resp = client.HTTPResponse(sock)
     722          resp.begin()
     723          self.assertEqual(resp.read(0), b'')  # Issue #20007
     724          self.assertFalse(resp.isclosed())
     725          self.assertFalse(resp.closed)
     726          self.assertEqual(resp.read(), b"Text")
     727          self.assertTrue(resp.isclosed())
     728          self.assertFalse(resp.closed)
     729          resp.close()
     730          self.assertTrue(resp.closed)
     731  
     732          body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
     733          sock = FakeSocket(body)
     734          resp = client.HTTPResponse(sock)
     735          self.assertRaises(client.BadStatusLine, resp.begin)
     736  
     737      def test_bad_status_repr(self):
     738          exc = client.BadStatusLine('')
     739          self.assertEqual(repr(exc), '''BadStatusLine("''")''')
     740  
     741      def test_partial_reads(self):
     742          # if we have Content-Length, HTTPResponse knows when to close itself,
     743          # the same behaviour as when we read the whole thing with read()
     744          body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
     745          sock = FakeSocket(body)
     746          resp = client.HTTPResponse(sock)
     747          resp.begin()
     748          self.assertEqual(resp.read(2), b'Te')
     749          self.assertFalse(resp.isclosed())
     750          self.assertEqual(resp.read(2), b'xt')
     751          self.assertTrue(resp.isclosed())
     752          self.assertFalse(resp.closed)
     753          resp.close()
     754          self.assertTrue(resp.closed)
     755  
     756      def test_mixed_reads(self):
     757          # readline() should update the remaining length, so that read() knows
     758          # how much data is left and does not raise IncompleteRead
     759          body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
     760          sock = FakeSocket(body)
     761          resp = client.HTTPResponse(sock)
     762          resp.begin()
     763          self.assertEqual(resp.readline(), b'Text\r\n')
     764          self.assertFalse(resp.isclosed())
     765          self.assertEqual(resp.read(), b'Another')
     766          self.assertTrue(resp.isclosed())
     767          self.assertFalse(resp.closed)
     768          resp.close()
     769          self.assertTrue(resp.closed)
     770  
     771      def test_partial_readintos(self):
     772          # if we have Content-Length, HTTPResponse knows when to close itself,
     773          # the same behaviour as when we read the whole thing with read()
     774          body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
     775          sock = FakeSocket(body)
     776          resp = client.HTTPResponse(sock)
     777          resp.begin()
     778          b = bytearray(2)
     779          n = resp.readinto(b)
     780          self.assertEqual(n, 2)
     781          self.assertEqual(bytes(b), b'Te')
     782          self.assertFalse(resp.isclosed())
     783          n = resp.readinto(b)
     784          self.assertEqual(n, 2)
     785          self.assertEqual(bytes(b), b'xt')
     786          self.assertTrue(resp.isclosed())
     787          self.assertFalse(resp.closed)
     788          resp.close()
     789          self.assertTrue(resp.closed)
     790  
     791      def test_partial_reads_past_end(self):
     792          # if we have Content-Length, clip reads to the end
     793          body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
     794          sock = FakeSocket(body)
     795          resp = client.HTTPResponse(sock)
     796          resp.begin()
     797          self.assertEqual(resp.read(10), b'Text')
     798          self.assertTrue(resp.isclosed())
     799          self.assertFalse(resp.closed)
     800          resp.close()
     801          self.assertTrue(resp.closed)
     802  
     803      def test_partial_readintos_past_end(self):
     804          # if we have Content-Length, clip readintos to the end
     805          body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
     806          sock = FakeSocket(body)
     807          resp = client.HTTPResponse(sock)
     808          resp.begin()
     809          b = bytearray(10)
     810          n = resp.readinto(b)
     811          self.assertEqual(n, 4)
     812          self.assertEqual(bytes(b)[:4], b'Text')
     813          self.assertTrue(resp.isclosed())
     814          self.assertFalse(resp.closed)
     815          resp.close()
     816          self.assertTrue(resp.closed)
     817  
     818      def test_partial_reads_no_content_length(self):
     819          # when no length is present, the socket should be gracefully closed when
     820          # all data was read
     821          body = "HTTP/1.1 200 Ok\r\n\r\nText"
     822          sock = FakeSocket(body)
     823          resp = client.HTTPResponse(sock)
     824          resp.begin()
     825          self.assertEqual(resp.read(2), b'Te')
     826          self.assertFalse(resp.isclosed())
     827          self.assertEqual(resp.read(2), b'xt')
     828          self.assertEqual(resp.read(1), b'')
     829          self.assertTrue(resp.isclosed())
     830          self.assertFalse(resp.closed)
     831          resp.close()
     832          self.assertTrue(resp.closed)
     833  
     834      def test_partial_readintos_no_content_length(self):
     835          # when no length is present, the socket should be gracefully closed when
     836          # all data was read
     837          body = "HTTP/1.1 200 Ok\r\n\r\nText"
     838          sock = FakeSocket(body)
     839          resp = client.HTTPResponse(sock)
     840          resp.begin()
     841          b = bytearray(2)
     842          n = resp.readinto(b)
     843          self.assertEqual(n, 2)
     844          self.assertEqual(bytes(b), b'Te')
     845          self.assertFalse(resp.isclosed())
     846          n = resp.readinto(b)
     847          self.assertEqual(n, 2)
     848          self.assertEqual(bytes(b), b'xt')
     849          n = resp.readinto(b)
     850          self.assertEqual(n, 0)
     851          self.assertTrue(resp.isclosed())
     852  
     853      def test_partial_reads_incomplete_body(self):
     854          # if the server shuts down the connection before the whole
     855          # content-length is delivered, the socket is gracefully closed
     856          body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
     857          sock = FakeSocket(body)
     858          resp = client.HTTPResponse(sock)
     859          resp.begin()
     860          self.assertEqual(resp.read(2), b'Te')
     861          self.assertFalse(resp.isclosed())
     862          self.assertEqual(resp.read(2), b'xt')
     863          self.assertEqual(resp.read(1), b'')
     864          self.assertTrue(resp.isclosed())
     865  
     866      def test_partial_readintos_incomplete_body(self):
     867          # if the server shuts down the connection before the whole
     868          # content-length is delivered, the socket is gracefully closed
     869          body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
     870          sock = FakeSocket(body)
     871          resp = client.HTTPResponse(sock)
     872          resp.begin()
     873          b = bytearray(2)
     874          n = resp.readinto(b)
     875          self.assertEqual(n, 2)
     876          self.assertEqual(bytes(b), b'Te')
     877          self.assertFalse(resp.isclosed())
     878          n = resp.readinto(b)
     879          self.assertEqual(n, 2)
     880          self.assertEqual(bytes(b), b'xt')
     881          n = resp.readinto(b)
     882          self.assertEqual(n, 0)
     883          self.assertTrue(resp.isclosed())
     884          self.assertFalse(resp.closed)
     885          resp.close()
     886          self.assertTrue(resp.closed)
     887  
     888      def test_host_port(self):
     889          # Check invalid host_port
     890  
     891          for hp in ("www.python.org:abc", "user:password@www.python.org"):
     892              self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
     893  
     894          for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
     895                            "fe80::207:e9ff:fe9b", 8000),
     896                           ("www.python.org:80", "www.python.org", 80),
     897                           ("www.python.org:", "www.python.org", 80),
     898                           ("www.python.org", "www.python.org", 80),
     899                           ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
     900                           ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
     901              c = client.HTTPConnection(hp)
     902              self.assertEqual(h, c.host)
     903              self.assertEqual(p, c.port)
     904  
     905      def test_response_headers(self):
     906          # test response with multiple message headers with the same field name.
     907          text = ('HTTP/1.1 200 OK\r\n'
     908                  'Set-Cookie: Customer="WILE_E_COYOTE"; '
     909                  'Version="1"; Path="/acme"\r\n'
     910                  'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
     911                  ' Path="/acme"\r\n'
     912                  '\r\n'
     913                  'No body\r\n')
     914          hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
     915                 ', '
     916                 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
     917          s = FakeSocket(text)
     918          r = client.HTTPResponse(s)
     919          r.begin()
     920          cookies = r.getheader("Set-Cookie")
     921          self.assertEqual(cookies, hdr)
     922  
     923      def test_read_head(self):
     924          # Test that the library doesn't attempt to read any data
     925          # from a HEAD request.  (Tickles SF bug #622042.)
     926          sock = FakeSocket(
     927              'HTTP/1.1 200 OK\r\n'
     928              'Content-Length: 14432\r\n'
     929              '\r\n',
     930              NoEOFBytesIO)
     931          resp = client.HTTPResponse(sock, method="HEAD")
     932          resp.begin()
     933          if resp.read():
     934              self.fail("Did not expect response from HEAD request")
     935  
     936      def test_readinto_head(self):
     937          # Test that the library doesn't attempt to read any data
     938          # from a HEAD request.  (Tickles SF bug #622042.)
     939          sock = FakeSocket(
     940              'HTTP/1.1 200 OK\r\n'
     941              'Content-Length: 14432\r\n'
     942              '\r\n',
     943              NoEOFBytesIO)
     944          resp = client.HTTPResponse(sock, method="HEAD")
     945          resp.begin()
     946          b = bytearray(5)
     947          if resp.readinto(b) != 0:
     948              self.fail("Did not expect response from HEAD request")
     949          self.assertEqual(bytes(b), b'\x00'*5)
     950  
     951      def test_too_many_headers(self):
     952          headers = '\r\n'.join('Header%d: foo' % i
     953                                for i in range(client._MAXHEADERS + 1)) + '\r\n'
     954          text = ('HTTP/1.1 200 OK\r\n' + headers)
     955          s = FakeSocket(text)
     956          r = client.HTTPResponse(s)
     957          self.assertRaisesRegex(client.HTTPException,
     958                                 r"got more than \d+ headers", r.begin)
     959  
     960      def test_send_file(self):
     961          expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
     962                      b'Accept-Encoding: identity\r\n'
     963                      b'Transfer-Encoding: chunked\r\n'
     964                      b'\r\n')
     965  
     966          with open(__file__, 'rb') as body:
     967              conn = client.HTTPConnection('example.com')
     968              sock = FakeSocket(body)
     969              conn.sock = sock
     970              conn.request('GET', '/foo', body)
     971              self.assertTrue(sock.data.startswith(expected), '%r != %r' %
     972                      (sock.data[:len(expected)], expected))
     973  
     974      def test_send(self):
     975          expected = b'this is a test this is only a test'
     976          conn = client.HTTPConnection('example.com')
     977          sock = FakeSocket(None)
     978          conn.sock = sock
     979          conn.send(expected)
     980          self.assertEqual(expected, sock.data)
     981          sock.data = b''
     982          conn.send(array.array('b', expected))
     983          self.assertEqual(expected, sock.data)
     984          sock.data = b''
     985          conn.send(io.BytesIO(expected))
     986          self.assertEqual(expected, sock.data)
     987  
     988      def test_send_updating_file(self):
     989          def data():
     990              yield 'data'
     991              yield None
     992              yield 'data_two'
     993  
     994          class ESC[4;38;5;81mUpdatingFile(ESC[4;38;5;149mioESC[4;38;5;149m.ESC[4;38;5;149mTextIOBase):
     995              mode = 'r'
     996              d = data()
     997              def read(self, blocksize=-1):
     998                  return next(self.d)
     999  
    1000          expected = b'data'
    1001  
    1002          conn = client.HTTPConnection('example.com')
    1003          sock = FakeSocket("")
    1004          conn.sock = sock
    1005          conn.send(UpdatingFile())
    1006          self.assertEqual(sock.data, expected)
    1007  
    1008  
    1009      def test_send_iter(self):
    1010          expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
    1011                     b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
    1012                     b'\r\nonetwothree'
    1013  
    1014          def body():
    1015              yield b"one"
    1016              yield b"two"
    1017              yield b"three"
    1018  
    1019          conn = client.HTTPConnection('example.com')
    1020          sock = FakeSocket("")
    1021          conn.sock = sock
    1022          conn.request('GET', '/foo', body(), {'Content-Length': '11'})
    1023          self.assertEqual(sock.data, expected)
    1024  
    1025      def test_blocksize_request(self):
    1026          """Check that request() respects the configured block size."""
    1027          blocksize = 8  # For easy debugging.
    1028          conn = client.HTTPConnection('example.com', blocksize=blocksize)
    1029          sock = FakeSocket(None)
    1030          conn.sock = sock
    1031          expected = b"a" * blocksize + b"b"
    1032          conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
    1033          self.assertEqual(sock.sendall_calls, 3)
    1034          body = sock.data.split(b"\r\n\r\n", 1)[1]
    1035          self.assertEqual(body, expected)
    1036  
    1037      def test_blocksize_send(self):
    1038          """Check that send() respects the configured block size."""
    1039          blocksize = 8  # For easy debugging.
    1040          conn = client.HTTPConnection('example.com', blocksize=blocksize)
    1041          sock = FakeSocket(None)
    1042          conn.sock = sock
    1043          expected = b"a" * blocksize + b"b"
    1044          conn.send(io.BytesIO(expected))
    1045          self.assertEqual(sock.sendall_calls, 2)
    1046          self.assertEqual(sock.data, expected)
    1047  
    1048      def test_send_type_error(self):
    1049          # See: Issue #12676
    1050          conn = client.HTTPConnection('example.com')
    1051          conn.sock = FakeSocket('')
    1052          with self.assertRaises(TypeError):
    1053              conn.request('POST', 'test', conn)
    1054  
    1055      def test_chunked(self):
    1056          expected = chunked_expected
    1057          sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1058          resp = client.HTTPResponse(sock, method="GET")
    1059          resp.begin()
    1060          self.assertEqual(resp.read(), expected)
    1061          resp.close()
    1062  
    1063          # Various read sizes
    1064          for n in range(1, 12):
    1065              sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1066              resp = client.HTTPResponse(sock, method="GET")
    1067              resp.begin()
    1068              self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
    1069              resp.close()
    1070  
    1071          for x in ('', 'foo\r\n'):
    1072              sock = FakeSocket(chunked_start + x)
    1073              resp = client.HTTPResponse(sock, method="GET")
    1074              resp.begin()
    1075              try:
    1076                  resp.read()
    1077              except client.IncompleteRead as i:
    1078                  self.assertEqual(i.partial, expected)
    1079                  expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
    1080                  self.assertEqual(repr(i), expected_message)
    1081                  self.assertEqual(str(i), expected_message)
    1082              else:
    1083                  self.fail('IncompleteRead expected')
    1084              finally:
    1085                  resp.close()
    1086  
    1087      def test_readinto_chunked(self):
    1088  
    1089          expected = chunked_expected
    1090          nexpected = len(expected)
    1091          b = bytearray(128)
    1092  
    1093          sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1094          resp = client.HTTPResponse(sock, method="GET")
    1095          resp.begin()
    1096          n = resp.readinto(b)
    1097          self.assertEqual(b[:nexpected], expected)
    1098          self.assertEqual(n, nexpected)
    1099          resp.close()
    1100  
    1101          # Various read sizes
    1102          for n in range(1, 12):
    1103              sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1104              resp = client.HTTPResponse(sock, method="GET")
    1105              resp.begin()
    1106              m = memoryview(b)
    1107              i = resp.readinto(m[0:n])
    1108              i += resp.readinto(m[i:n + i])
    1109              i += resp.readinto(m[i:])
    1110              self.assertEqual(b[:nexpected], expected)
    1111              self.assertEqual(i, nexpected)
    1112              resp.close()
    1113  
    1114          for x in ('', 'foo\r\n'):
    1115              sock = FakeSocket(chunked_start + x)
    1116              resp = client.HTTPResponse(sock, method="GET")
    1117              resp.begin()
    1118              try:
    1119                  n = resp.readinto(b)
    1120              except client.IncompleteRead as i:
    1121                  self.assertEqual(i.partial, expected)
    1122                  expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
    1123                  self.assertEqual(repr(i), expected_message)
    1124                  self.assertEqual(str(i), expected_message)
    1125              else:
    1126                  self.fail('IncompleteRead expected')
    1127              finally:
    1128                  resp.close()
    1129  
    1130      def test_chunked_head(self):
    1131          chunked_start = (
    1132              'HTTP/1.1 200 OK\r\n'
    1133              'Transfer-Encoding: chunked\r\n\r\n'
    1134              'a\r\n'
    1135              'hello world\r\n'
    1136              '1\r\n'
    1137              'd\r\n'
    1138          )
    1139          sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1140          resp = client.HTTPResponse(sock, method="HEAD")
    1141          resp.begin()
    1142          self.assertEqual(resp.read(), b'')
    1143          self.assertEqual(resp.status, 200)
    1144          self.assertEqual(resp.reason, 'OK')
    1145          self.assertTrue(resp.isclosed())
    1146          self.assertFalse(resp.closed)
    1147          resp.close()
    1148          self.assertTrue(resp.closed)
    1149  
    1150      def test_readinto_chunked_head(self):
    1151          chunked_start = (
    1152              'HTTP/1.1 200 OK\r\n'
    1153              'Transfer-Encoding: chunked\r\n\r\n'
    1154              'a\r\n'
    1155              'hello world\r\n'
    1156              '1\r\n'
    1157              'd\r\n'
    1158          )
    1159          sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1160          resp = client.HTTPResponse(sock, method="HEAD")
    1161          resp.begin()
    1162          b = bytearray(5)
    1163          n = resp.readinto(b)
    1164          self.assertEqual(n, 0)
    1165          self.assertEqual(bytes(b), b'\x00'*5)
    1166          self.assertEqual(resp.status, 200)
    1167          self.assertEqual(resp.reason, 'OK')
    1168          self.assertTrue(resp.isclosed())
    1169          self.assertFalse(resp.closed)
    1170          resp.close()
    1171          self.assertTrue(resp.closed)
    1172  
    1173      def test_negative_content_length(self):
    1174          sock = FakeSocket(
    1175              'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
    1176          resp = client.HTTPResponse(sock, method="GET")
    1177          resp.begin()
    1178          self.assertEqual(resp.read(), b'Hello\r\n')
    1179          self.assertTrue(resp.isclosed())
    1180  
    1181      def test_incomplete_read(self):
    1182          sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
    1183          resp = client.HTTPResponse(sock, method="GET")
    1184          resp.begin()
    1185          try:
    1186              resp.read()
    1187          except client.IncompleteRead as i:
    1188              self.assertEqual(i.partial, b'Hello\r\n')
    1189              self.assertEqual(repr(i),
    1190                               "IncompleteRead(7 bytes read, 3 more expected)")
    1191              self.assertEqual(str(i),
    1192                               "IncompleteRead(7 bytes read, 3 more expected)")
    1193              self.assertTrue(resp.isclosed())
    1194          else:
    1195              self.fail('IncompleteRead expected')
    1196  
    1197      def test_epipe(self):
    1198          sock = EPipeSocket(
    1199              "HTTP/1.0 401 Authorization Required\r\n"
    1200              "Content-type: text/html\r\n"
    1201              "WWW-Authenticate: Basic realm=\"example\"\r\n",
    1202              b"Content-Length")
    1203          conn = client.HTTPConnection("example.com")
    1204          conn.sock = sock
    1205          self.assertRaises(OSError,
    1206                            lambda: conn.request("PUT", "/url", "body"))
    1207          resp = conn.getresponse()
    1208          self.assertEqual(401, resp.status)
    1209          self.assertEqual("Basic realm=\"example\"",
    1210                           resp.getheader("www-authenticate"))
    1211  
    1212      # Test lines overflowing the max line size (_MAXLINE in http.client)
    1213  
    1214      def test_overflowing_status_line(self):
    1215          body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
    1216          resp = client.HTTPResponse(FakeSocket(body))
    1217          self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
    1218  
    1219      def test_overflowing_header_line(self):
    1220          body = (
    1221              'HTTP/1.1 200 OK\r\n'
    1222              'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
    1223          )
    1224          resp = client.HTTPResponse(FakeSocket(body))
    1225          self.assertRaises(client.LineTooLong, resp.begin)
    1226  
    1227      def test_overflowing_header_limit_after_100(self):
    1228          body = (
    1229              'HTTP/1.1 100 OK\r\n'
    1230              'r\n' * 32768
    1231          )
    1232          resp = client.HTTPResponse(FakeSocket(body))
    1233          with self.assertRaises(client.HTTPException) as cm:
    1234              resp.begin()
    1235          # We must assert more because other reasonable errors that we
    1236          # do not want can also be HTTPException derived.
    1237          self.assertIn('got more than ', str(cm.exception))
    1238          self.assertIn('headers', str(cm.exception))
    1239  
    1240      def test_overflowing_chunked_line(self):
    1241          body = (
    1242              'HTTP/1.1 200 OK\r\n'
    1243              'Transfer-Encoding: chunked\r\n\r\n'
    1244              + '0' * 65536 + 'a\r\n'
    1245              'hello world\r\n'
    1246              '0\r\n'
    1247              '\r\n'
    1248          )
    1249          resp = client.HTTPResponse(FakeSocket(body))
    1250          resp.begin()
    1251          self.assertRaises(client.LineTooLong, resp.read)
    1252  
    1253      def test_early_eof(self):
    1254          # Test httpresponse with no \r\n termination,
    1255          body = "HTTP/1.1 200 Ok"
    1256          sock = FakeSocket(body)
    1257          resp = client.HTTPResponse(sock)
    1258          resp.begin()
    1259          self.assertEqual(resp.read(), b'')
    1260          self.assertTrue(resp.isclosed())
    1261          self.assertFalse(resp.closed)
    1262          resp.close()
    1263          self.assertTrue(resp.closed)
    1264  
    1265      def test_error_leak(self):
    1266          # Test that the socket is not leaked if getresponse() fails
    1267          conn = client.HTTPConnection('example.com')
    1268          response = None
    1269          class ESC[4;38;5;81mResponse(ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPResponse):
    1270              def __init__(self, *pos, **kw):
    1271                  nonlocal response
    1272                  response = self  # Avoid garbage collector closing the socket
    1273                  client.HTTPResponse.__init__(self, *pos, **kw)
    1274          conn.response_class = Response
    1275          conn.sock = FakeSocket('Invalid status line')
    1276          conn.request('GET', '/')
    1277          self.assertRaises(client.BadStatusLine, conn.getresponse)
    1278          self.assertTrue(response.closed)
    1279          self.assertTrue(conn.sock.file_closed)
    1280  
    1281      def test_chunked_extension(self):
    1282          extra = '3;foo=bar\r\n' + 'abc\r\n'
    1283          expected = chunked_expected + b'abc'
    1284  
    1285          sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
    1286          resp = client.HTTPResponse(sock, method="GET")
    1287          resp.begin()
    1288          self.assertEqual(resp.read(), expected)
    1289          resp.close()
    1290  
    1291      def test_chunked_missing_end(self):
    1292          """some servers may serve up a short chunked encoding stream"""
    1293          expected = chunked_expected
    1294          sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
    1295          resp = client.HTTPResponse(sock, method="GET")
    1296          resp.begin()
    1297          self.assertEqual(resp.read(), expected)
    1298          resp.close()
    1299  
    1300      def test_chunked_trailers(self):
    1301          """See that trailers are read and ignored"""
    1302          expected = chunked_expected
    1303          sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
    1304          resp = client.HTTPResponse(sock, method="GET")
    1305          resp.begin()
    1306          self.assertEqual(resp.read(), expected)
    1307          # we should have reached the end of the file
    1308          self.assertEqual(sock.file.read(), b"") #we read to the end
    1309          resp.close()
    1310  
    1311      def test_chunked_sync(self):
    1312          """Check that we don't read past the end of the chunked-encoding stream"""
    1313          expected = chunked_expected
    1314          extradata = "extradata"
    1315          sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
    1316          resp = client.HTTPResponse(sock, method="GET")
    1317          resp.begin()
    1318          self.assertEqual(resp.read(), expected)
    1319          # the file should now have our extradata ready to be read
    1320          self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
    1321          resp.close()
    1322  
    1323      def test_content_length_sync(self):
    1324          """Check that we don't read past the end of the Content-Length stream"""
    1325          extradata = b"extradata"
    1326          expected = b"Hello123\r\n"
    1327          sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
    1328          resp = client.HTTPResponse(sock, method="GET")
    1329          resp.begin()
    1330          self.assertEqual(resp.read(), expected)
    1331          # the file should now have our extradata ready to be read
    1332          self.assertEqual(sock.file.read(), extradata) #we read to the end
    1333          resp.close()
    1334  
    1335      def test_readlines_content_length(self):
    1336          extradata = b"extradata"
    1337          expected = b"Hello123\r\n"
    1338          sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
    1339          resp = client.HTTPResponse(sock, method="GET")
    1340          resp.begin()
    1341          self.assertEqual(resp.readlines(2000), [expected])
    1342          # the file should now have our extradata ready to be read
    1343          self.assertEqual(sock.file.read(), extradata) #we read to the end
    1344          resp.close()
    1345  
    1346      def test_read1_content_length(self):
    1347          extradata = b"extradata"
    1348          expected = b"Hello123\r\n"
    1349          sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
    1350          resp = client.HTTPResponse(sock, method="GET")
    1351          resp.begin()
    1352          self.assertEqual(resp.read1(2000), expected)
    1353          # the file should now have our extradata ready to be read
    1354          self.assertEqual(sock.file.read(), extradata) #we read to the end
    1355          resp.close()
    1356  
    1357      def test_readline_bound_content_length(self):
    1358          extradata = b"extradata"
    1359          expected = b"Hello123\r\n"
    1360          sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
    1361          resp = client.HTTPResponse(sock, method="GET")
    1362          resp.begin()
    1363          self.assertEqual(resp.readline(10), expected)
    1364          self.assertEqual(resp.readline(10), b"")
    1365          # the file should now have our extradata ready to be read
    1366          self.assertEqual(sock.file.read(), extradata) #we read to the end
    1367          resp.close()
    1368  
    1369      def test_read1_bound_content_length(self):
    1370          extradata = b"extradata"
    1371          expected = b"Hello123\r\n"
    1372          sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
    1373          resp = client.HTTPResponse(sock, method="GET")
    1374          resp.begin()
    1375          self.assertEqual(resp.read1(20), expected*2)
    1376          self.assertEqual(resp.read(), expected)
    1377          # the file should now have our extradata ready to be read
    1378          self.assertEqual(sock.file.read(), extradata) #we read to the end
    1379          resp.close()
    1380  
    1381      def test_response_fileno(self):
    1382          # Make sure fd returned by fileno is valid.
    1383          serv = socket.create_server((HOST, 0))
    1384          self.addCleanup(serv.close)
    1385  
    1386          result = None
    1387          def run_server():
    1388              [conn, address] = serv.accept()
    1389              with conn, conn.makefile("rb") as reader:
    1390                  # Read the request header until a blank line
    1391                  while True:
    1392                      line = reader.readline()
    1393                      if not line.rstrip(b"\r\n"):
    1394                          break
    1395                  conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
    1396                  nonlocal result
    1397                  result = reader.read()
    1398  
    1399          thread = threading.Thread(target=run_server)
    1400          thread.start()
    1401          self.addCleanup(thread.join, float(1))
    1402          conn = client.HTTPConnection(*serv.getsockname())
    1403          conn.request("CONNECT", "dummy:1234")
    1404          response = conn.getresponse()
    1405          try:
    1406              self.assertEqual(response.status, client.OK)
    1407              s = socket.socket(fileno=response.fileno())
    1408              try:
    1409                  s.sendall(b"proxied data\n")
    1410              finally:
    1411                  s.detach()
    1412          finally:
    1413              response.close()
    1414              conn.close()
    1415          thread.join()
    1416          self.assertEqual(result, b"proxied data\n")
    1417  
    1418      def test_putrequest_override_domain_validation(self):
    1419          """
    1420          It should be possible to override the default validation
    1421          behavior in putrequest (bpo-38216).
    1422          """
    1423          class ESC[4;38;5;81mUnsafeHTTPConnection(ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPConnection):
    1424              def _validate_path(self, url):
    1425                  pass
    1426  
    1427          conn = UnsafeHTTPConnection('example.com')
    1428          conn.sock = FakeSocket('')
    1429          conn.putrequest('GET', '/\x00')
    1430  
    1431      def test_putrequest_override_host_validation(self):
    1432          class ESC[4;38;5;81mUnsafeHTTPConnection(ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPConnection):
    1433              def _validate_host(self, url):
    1434                  pass
    1435  
    1436          conn = UnsafeHTTPConnection('example.com\r\n')
    1437          conn.sock = FakeSocket('')
    1438          # set skip_host so a ValueError is not raised upon adding the
    1439          # invalid URL as the value of the "Host:" header
    1440          conn.putrequest('GET', '/', skip_host=1)
    1441  
    1442      def test_putrequest_override_encoding(self):
    1443          """
    1444          It should be possible to override the default encoding
    1445          to transmit bytes in another encoding even if invalid
    1446          (bpo-36274).
    1447          """
    1448          class ESC[4;38;5;81mUnsafeHTTPConnection(ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPConnection):
    1449              def _encode_request(self, str_url):
    1450                  return str_url.encode('utf-8')
    1451  
    1452          conn = UnsafeHTTPConnection('example.com')
    1453          conn.sock = FakeSocket('')
    1454          conn.putrequest('GET', '/☃')
    1455  
    1456  
    1457  class ESC[4;38;5;81mExtendedReadTest(ESC[4;38;5;149mTestCase):
    1458      """
    1459      Test peek(), read1(), readline()
    1460      """
    1461      lines = (
    1462          'HTTP/1.1 200 OK\r\n'
    1463          '\r\n'
    1464          'hello world!\n'
    1465          'and now \n'
    1466          'for something completely different\n'
    1467          'foo'
    1468          )
    1469      lines_expected = lines[lines.find('hello'):].encode("ascii")
    1470      lines_chunked = (
    1471          'HTTP/1.1 200 OK\r\n'
    1472          'Transfer-Encoding: chunked\r\n\r\n'
    1473          'a\r\n'
    1474          'hello worl\r\n'
    1475          '3\r\n'
    1476          'd!\n\r\n'
    1477          '9\r\n'
    1478          'and now \n\r\n'
    1479          '23\r\n'
    1480          'for something completely different\n\r\n'
    1481          '3\r\n'
    1482          'foo\r\n'
    1483          '0\r\n' # terminating chunk
    1484          '\r\n'  # end of trailers
    1485      )
    1486  
    1487      def setUp(self):
    1488          sock = FakeSocket(self.lines)
    1489          resp = client.HTTPResponse(sock, method="GET")
    1490          resp.begin()
    1491          resp.fp = io.BufferedReader(resp.fp)
    1492          self.resp = resp
    1493  
    1494  
    1495  
    1496      def test_peek(self):
    1497          resp = self.resp
    1498          # patch up the buffered peek so that it returns not too much stuff
    1499          oldpeek = resp.fp.peek
    1500          def mypeek(n=-1):
    1501              p = oldpeek(n)
    1502              if n >= 0:
    1503                  return p[:n]
    1504              return p[:10]
    1505          resp.fp.peek = mypeek
    1506  
    1507          all = []
    1508          while True:
    1509              # try a short peek
    1510              p = resp.peek(3)
    1511              if p:
    1512                  self.assertGreater(len(p), 0)
    1513                  # then unbounded peek
    1514                  p2 = resp.peek()
    1515                  self.assertGreaterEqual(len(p2), len(p))
    1516                  self.assertTrue(p2.startswith(p))
    1517                  next = resp.read(len(p2))
    1518                  self.assertEqual(next, p2)
    1519              else:
    1520                  next = resp.read()
    1521                  self.assertFalse(next)
    1522              all.append(next)
    1523              if not next:
    1524                  break
    1525          self.assertEqual(b"".join(all), self.lines_expected)
    1526  
    1527      def test_readline(self):
    1528          resp = self.resp
    1529          self._verify_readline(self.resp.readline, self.lines_expected)
    1530  
    1531      def _verify_readline(self, readline, expected):
    1532          all = []
    1533          while True:
    1534              # short readlines
    1535              line = readline(5)
    1536              if line and line != b"foo":
    1537                  if len(line) < 5:
    1538                      self.assertTrue(line.endswith(b"\n"))
    1539              all.append(line)
    1540              if not line:
    1541                  break
    1542          self.assertEqual(b"".join(all), expected)
    1543  
    1544      def test_read1(self):
    1545          resp = self.resp
    1546          def r():
    1547              res = resp.read1(4)
    1548              self.assertLessEqual(len(res), 4)
    1549              return res
    1550          readliner = Readliner(r)
    1551          self._verify_readline(readliner.readline, self.lines_expected)
    1552  
    1553      def test_read1_unbounded(self):
    1554          resp = self.resp
    1555          all = []
    1556          while True:
    1557              data = resp.read1()
    1558              if not data:
    1559                  break
    1560              all.append(data)
    1561          self.assertEqual(b"".join(all), self.lines_expected)
    1562  
    1563      def test_read1_bounded(self):
    1564          resp = self.resp
    1565          all = []
    1566          while True:
    1567              data = resp.read1(10)
    1568              if not data:
    1569                  break
    1570              self.assertLessEqual(len(data), 10)
    1571              all.append(data)
    1572          self.assertEqual(b"".join(all), self.lines_expected)
    1573  
    1574      def test_read1_0(self):
    1575          self.assertEqual(self.resp.read1(0), b"")
    1576  
    1577      def test_peek_0(self):
    1578          p = self.resp.peek(0)
    1579          self.assertLessEqual(0, len(p))
    1580  
    1581  
    1582  class ESC[4;38;5;81mExtendedReadTestChunked(ESC[4;38;5;149mExtendedReadTest):
    1583      """
    1584      Test peek(), read1(), readline() in chunked mode
    1585      """
    1586      lines = (
    1587          'HTTP/1.1 200 OK\r\n'
    1588          'Transfer-Encoding: chunked\r\n\r\n'
    1589          'a\r\n'
    1590          'hello worl\r\n'
    1591          '3\r\n'
    1592          'd!\n\r\n'
    1593          '9\r\n'
    1594          'and now \n\r\n'
    1595          '23\r\n'
    1596          'for something completely different\n\r\n'
    1597          '3\r\n'
    1598          'foo\r\n'
    1599          '0\r\n' # terminating chunk
    1600          '\r\n'  # end of trailers
    1601      )
    1602  
    1603  
    1604  class ESC[4;38;5;81mReadliner:
    1605      """
    1606      a simple readline class that uses an arbitrary read function and buffering
    1607      """
    1608      def __init__(self, readfunc):
    1609          self.readfunc = readfunc
    1610          self.remainder = b""
    1611  
    1612      def readline(self, limit):
    1613          data = []
    1614          datalen = 0
    1615          read = self.remainder
    1616          try:
    1617              while True:
    1618                  idx = read.find(b'\n')
    1619                  if idx != -1:
    1620                      break
    1621                  if datalen + len(read) >= limit:
    1622                      idx = limit - datalen - 1
    1623                  # read more data
    1624                  data.append(read)
    1625                  read = self.readfunc()
    1626                  if not read:
    1627                      idx = 0 #eof condition
    1628                      break
    1629              idx += 1
    1630              data.append(read[:idx])
    1631              self.remainder = read[idx:]
    1632              return b"".join(data)
    1633          except:
    1634              self.remainder = b"".join(data)
    1635              raise
    1636  
    1637  
    1638  class ESC[4;38;5;81mOfflineTest(ESC[4;38;5;149mTestCase):
    1639      def test_all(self):
    1640          # Documented objects defined in the module should be in __all__
    1641          expected = {"responses"}  # Allowlist documented dict() object
    1642          # HTTPMessage, parse_headers(), and the HTTP status code constants are
    1643          # intentionally omitted for simplicity
    1644          denylist = {"HTTPMessage", "parse_headers"}
    1645          for name in dir(client):
    1646              if name.startswith("_") or name in denylist:
    1647                  continue
    1648              module_object = getattr(client, name)
    1649              if getattr(module_object, "__module__", None) == "http.client":
    1650                  expected.add(name)
    1651          self.assertCountEqual(client.__all__, expected)
    1652  
    1653      def test_responses(self):
    1654          self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
    1655  
    1656      def test_client_constants(self):
    1657          # Make sure we don't break backward compatibility with 3.4
    1658          expected = [
    1659              'CONTINUE',
    1660              'SWITCHING_PROTOCOLS',
    1661              'PROCESSING',
    1662              'OK',
    1663              'CREATED',
    1664              'ACCEPTED',
    1665              'NON_AUTHORITATIVE_INFORMATION',
    1666              'NO_CONTENT',
    1667              'RESET_CONTENT',
    1668              'PARTIAL_CONTENT',
    1669              'MULTI_STATUS',
    1670              'IM_USED',
    1671              'MULTIPLE_CHOICES',
    1672              'MOVED_PERMANENTLY',
    1673              'FOUND',
    1674              'SEE_OTHER',
    1675              'NOT_MODIFIED',
    1676              'USE_PROXY',
    1677              'TEMPORARY_REDIRECT',
    1678              'BAD_REQUEST',
    1679              'UNAUTHORIZED',
    1680              'PAYMENT_REQUIRED',
    1681              'FORBIDDEN',
    1682              'NOT_FOUND',
    1683              'METHOD_NOT_ALLOWED',
    1684              'NOT_ACCEPTABLE',
    1685              'PROXY_AUTHENTICATION_REQUIRED',
    1686              'REQUEST_TIMEOUT',
    1687              'CONFLICT',
    1688              'GONE',
    1689              'LENGTH_REQUIRED',
    1690              'PRECONDITION_FAILED',
    1691              'REQUEST_ENTITY_TOO_LARGE',
    1692              'REQUEST_URI_TOO_LONG',
    1693              'UNSUPPORTED_MEDIA_TYPE',
    1694              'REQUESTED_RANGE_NOT_SATISFIABLE',
    1695              'EXPECTATION_FAILED',
    1696              'IM_A_TEAPOT',
    1697              'MISDIRECTED_REQUEST',
    1698              'UNPROCESSABLE_ENTITY',
    1699              'LOCKED',
    1700              'FAILED_DEPENDENCY',
    1701              'UPGRADE_REQUIRED',
    1702              'PRECONDITION_REQUIRED',
    1703              'TOO_MANY_REQUESTS',
    1704              'REQUEST_HEADER_FIELDS_TOO_LARGE',
    1705              'UNAVAILABLE_FOR_LEGAL_REASONS',
    1706              'INTERNAL_SERVER_ERROR',
    1707              'NOT_IMPLEMENTED',
    1708              'BAD_GATEWAY',
    1709              'SERVICE_UNAVAILABLE',
    1710              'GATEWAY_TIMEOUT',
    1711              'HTTP_VERSION_NOT_SUPPORTED',
    1712              'INSUFFICIENT_STORAGE',
    1713              'NOT_EXTENDED',
    1714              'NETWORK_AUTHENTICATION_REQUIRED',
    1715              'EARLY_HINTS',
    1716              'TOO_EARLY'
    1717          ]
    1718          for const in expected:
    1719              with self.subTest(constant=const):
    1720                  self.assertTrue(hasattr(client, const))
    1721  
    1722  
    1723  class ESC[4;38;5;81mSourceAddressTest(ESC[4;38;5;149mTestCase):
    1724      def setUp(self):
    1725          self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    1726          self.port = socket_helper.bind_port(self.serv)
    1727          self.source_port = socket_helper.find_unused_port()
    1728          self.serv.listen()
    1729          self.conn = None
    1730  
    1731      def tearDown(self):
    1732          if self.conn:
    1733              self.conn.close()
    1734              self.conn = None
    1735          self.serv.close()
    1736          self.serv = None
    1737  
    1738      def testHTTPConnectionSourceAddress(self):
    1739          self.conn = client.HTTPConnection(HOST, self.port,
    1740                  source_address=('', self.source_port))
    1741          self.conn.connect()
    1742          self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
    1743  
    1744      @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
    1745                       'http.client.HTTPSConnection not defined')
    1746      def testHTTPSConnectionSourceAddress(self):
    1747          self.conn = client.HTTPSConnection(HOST, self.port,
    1748                  source_address=('', self.source_port))
    1749          # We don't test anything here other than the constructor not barfing as
    1750          # this code doesn't deal with setting up an active running SSL server
    1751          # for an ssl_wrapped connect() to actually return from.
    1752  
    1753  
    1754  class ESC[4;38;5;81mTimeoutTest(ESC[4;38;5;149mTestCase):
    1755      PORT = None
    1756  
    1757      def setUp(self):
    1758          self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    1759          TimeoutTest.PORT = socket_helper.bind_port(self.serv)
    1760          self.serv.listen()
    1761  
    1762      def tearDown(self):
    1763          self.serv.close()
    1764          self.serv = None
    1765  
    1766      def testTimeoutAttribute(self):
    1767          # This will prove that the timeout gets through HTTPConnection
    1768          # and into the socket.
    1769  
    1770          # default -- use global socket timeout
    1771          self.assertIsNone(socket.getdefaulttimeout())
    1772          socket.setdefaulttimeout(30)
    1773          try:
    1774              httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
    1775              httpConn.connect()
    1776          finally:
    1777              socket.setdefaulttimeout(None)
    1778          self.assertEqual(httpConn.sock.gettimeout(), 30)
    1779          httpConn.close()
    1780  
    1781          # no timeout -- do not use global socket default
    1782          self.assertIsNone(socket.getdefaulttimeout())
    1783          socket.setdefaulttimeout(30)
    1784          try:
    1785              httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
    1786                                                timeout=None)
    1787              httpConn.connect()
    1788          finally:
    1789              socket.setdefaulttimeout(None)
    1790          self.assertEqual(httpConn.sock.gettimeout(), None)
    1791          httpConn.close()
    1792  
    1793          # a value
    1794          httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
    1795          httpConn.connect()
    1796          self.assertEqual(httpConn.sock.gettimeout(), 30)
    1797          httpConn.close()
    1798  
    1799  
    1800  class ESC[4;38;5;81mPersistenceTest(ESC[4;38;5;149mTestCase):
    1801  
    1802      def test_reuse_reconnect(self):
    1803          # Should reuse or reconnect depending on header from server
    1804          tests = (
    1805              ('1.0', '', False),
    1806              ('1.0', 'Connection: keep-alive\r\n', True),
    1807              ('1.1', '', True),
    1808              ('1.1', 'Connection: close\r\n', False),
    1809              ('1.0', 'Connection: keep-ALIVE\r\n', True),
    1810              ('1.1', 'Connection: cloSE\r\n', False),
    1811          )
    1812          for version, header, reuse in tests:
    1813              with self.subTest(version=version, header=header):
    1814                  msg = (
    1815                      'HTTP/{} 200 OK\r\n'
    1816                      '{}'
    1817                      'Content-Length: 12\r\n'
    1818                      '\r\n'
    1819                      'Dummy body\r\n'
    1820                  ).format(version, header)
    1821                  conn = FakeSocketHTTPConnection(msg)
    1822                  self.assertIsNone(conn.sock)
    1823                  conn.request('GET', '/open-connection')
    1824                  with conn.getresponse() as response:
    1825                      self.assertEqual(conn.sock is None, not reuse)
    1826                      response.read()
    1827                  self.assertEqual(conn.sock is None, not reuse)
    1828                  self.assertEqual(conn.connections, 1)
    1829                  conn.request('GET', '/subsequent-request')
    1830                  self.assertEqual(conn.connections, 1 if reuse else 2)
    1831  
    1832      def test_disconnected(self):
    1833  
    1834          def make_reset_reader(text):
    1835              """Return BufferedReader that raises ECONNRESET at EOF"""
    1836              stream = io.BytesIO(text)
    1837              def readinto(buffer):
    1838                  size = io.BytesIO.readinto(stream, buffer)
    1839                  if size == 0:
    1840                      raise ConnectionResetError()
    1841                  return size
    1842              stream.readinto = readinto
    1843              return io.BufferedReader(stream)
    1844  
    1845          tests = (
    1846              (io.BytesIO, client.RemoteDisconnected),
    1847              (make_reset_reader, ConnectionResetError),
    1848          )
    1849          for stream_factory, exception in tests:
    1850              with self.subTest(exception=exception):
    1851                  conn = FakeSocketHTTPConnection(b'', stream_factory)
    1852                  conn.request('GET', '/eof-response')
    1853                  self.assertRaises(exception, conn.getresponse)
    1854                  self.assertIsNone(conn.sock)
    1855                  # HTTPConnection.connect() should be automatically invoked
    1856                  conn.request('GET', '/reconnect')
    1857                  self.assertEqual(conn.connections, 2)
    1858  
    1859      def test_100_close(self):
    1860          conn = FakeSocketHTTPConnection(
    1861              b'HTTP/1.1 100 Continue\r\n'
    1862              b'\r\n'
    1863              # Missing final response
    1864          )
    1865          conn.request('GET', '/', headers={'Expect': '100-continue'})
    1866          self.assertRaises(client.RemoteDisconnected, conn.getresponse)
    1867          self.assertIsNone(conn.sock)
    1868          conn.request('GET', '/reconnect')
    1869          self.assertEqual(conn.connections, 2)
    1870  
    1871  
    1872  class ESC[4;38;5;81mHTTPSTest(ESC[4;38;5;149mTestCase):
    1873  
    1874      def setUp(self):
    1875          if not hasattr(client, 'HTTPSConnection'):
    1876              self.skipTest('ssl support required')
    1877  
    1878      def make_server(self, certfile):
    1879          from test.ssl_servers import make_https_server
    1880          return make_https_server(self, certfile=certfile)
    1881  
    1882      def test_attributes(self):
    1883          # simple test to check it's storing the timeout
    1884          h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
    1885          self.assertEqual(h.timeout, 30)
    1886  
    1887      def test_networked(self):
    1888          # Default settings: requires a valid cert from a trusted CA
    1889          import ssl
    1890          support.requires('network')
    1891          with socket_helper.transient_internet('self-signed.pythontest.net'):
    1892              h = client.HTTPSConnection('self-signed.pythontest.net', 443)
    1893              with self.assertRaises(ssl.SSLError) as exc_info:
    1894                  h.request('GET', '/')
    1895              self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
    1896  
    1897      def test_networked_noverification(self):
    1898          # Switch off cert verification
    1899          import ssl
    1900          support.requires('network')
    1901          with socket_helper.transient_internet('self-signed.pythontest.net'):
    1902              context = ssl._create_unverified_context()
    1903              h = client.HTTPSConnection('self-signed.pythontest.net', 443,
    1904                                         context=context)
    1905              h.request('GET', '/')
    1906              resp = h.getresponse()
    1907              h.close()
    1908              self.assertIn('nginx', resp.getheader('server'))
    1909              resp.close()
    1910  
    1911      @support.system_must_validate_cert
    1912      def test_networked_trusted_by_default_cert(self):
    1913          # Default settings: requires a valid cert from a trusted CA
    1914          support.requires('network')
    1915          with socket_helper.transient_internet('www.python.org'):
    1916              h = client.HTTPSConnection('www.python.org', 443)
    1917              h.request('GET', '/')
    1918              resp = h.getresponse()
    1919              content_type = resp.getheader('content-type')
    1920              resp.close()
    1921              h.close()
    1922              self.assertIn('text/html', content_type)
    1923  
    1924      def test_networked_good_cert(self):
    1925          # We feed the server's cert as a validating cert
    1926          import ssl
    1927          support.requires('network')
    1928          selfsigned_pythontestdotnet = 'self-signed.pythontest.net'
    1929          with socket_helper.transient_internet(selfsigned_pythontestdotnet):
    1930              context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1931              self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    1932              self.assertEqual(context.check_hostname, True)
    1933              context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
    1934              try:
    1935                  h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443,
    1936                                             context=context)
    1937                  h.request('GET', '/')
    1938                  resp = h.getresponse()
    1939              except ssl.SSLError as ssl_err:
    1940                  ssl_err_str = str(ssl_err)
    1941                  # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on
    1942                  # modern Linux distros (Debian Buster, etc) default OpenSSL
    1943                  # configurations it'll fail saying "key too weak" until we
    1944                  # address https://bugs.python.org/issue36816 to use a proper
    1945                  # key size on self-signed.pythontest.net.
    1946                  if re.search(r'(?i)key.too.weak', ssl_err_str):
    1947                      raise unittest.SkipTest(
    1948                          f'Got {ssl_err_str} trying to connect '
    1949                          f'to {selfsigned_pythontestdotnet}. '
    1950                          'See https://bugs.python.org/issue36816.')
    1951                  raise
    1952              server_string = resp.getheader('server')
    1953              resp.close()
    1954              h.close()
    1955              self.assertIn('nginx', server_string)
    1956  
    1957      @support.requires_resource('walltime')
    1958      def test_networked_bad_cert(self):
    1959          # We feed a "CA" cert that is unrelated to the server's cert
    1960          import ssl
    1961          support.requires('network')
    1962          with socket_helper.transient_internet('self-signed.pythontest.net'):
    1963              context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1964              context.load_verify_locations(CERT_localhost)
    1965              h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
    1966              with self.assertRaises(ssl.SSLError) as exc_info:
    1967                  h.request('GET', '/')
    1968              self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
    1969  
    1970      def test_local_unknown_cert(self):
    1971          # The custom cert isn't known to the default trust bundle
    1972          import ssl
    1973          server = self.make_server(CERT_localhost)
    1974          h = client.HTTPSConnection('localhost', server.port)
    1975          with self.assertRaises(ssl.SSLError) as exc_info:
    1976              h.request('GET', '/')
    1977          self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
    1978  
    1979      def test_local_good_hostname(self):
    1980          # The (valid) cert validates the HTTPS hostname
    1981          import ssl
    1982          server = self.make_server(CERT_localhost)
    1983          context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1984          context.load_verify_locations(CERT_localhost)
    1985          h = client.HTTPSConnection('localhost', server.port, context=context)
    1986          self.addCleanup(h.close)
    1987          h.request('GET', '/nonexistent')
    1988          resp = h.getresponse()
    1989          self.addCleanup(resp.close)
    1990          self.assertEqual(resp.status, 404)
    1991  
    1992      def test_local_bad_hostname(self):
    1993          # The (valid) cert doesn't validate the HTTPS hostname
    1994          import ssl
    1995          server = self.make_server(CERT_fakehostname)
    1996          context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1997          context.load_verify_locations(CERT_fakehostname)
    1998          h = client.HTTPSConnection('localhost', server.port, context=context)
    1999          with self.assertRaises(ssl.CertificateError):
    2000              h.request('GET', '/')
    2001  
    2002          # Same with explicit context.check_hostname=True
    2003          context.check_hostname = True
    2004          h = client.HTTPSConnection('localhost', server.port, context=context)
    2005          with self.assertRaises(ssl.CertificateError):
    2006              h.request('GET', '/')
    2007  
    2008          # With context.check_hostname=False, the mismatching is ignored
    2009          context.check_hostname = False
    2010          h = client.HTTPSConnection('localhost', server.port, context=context)
    2011          h.request('GET', '/nonexistent')
    2012          resp = h.getresponse()
    2013          resp.close()
    2014          h.close()
    2015          self.assertEqual(resp.status, 404)
    2016  
    2017      @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
    2018                       'http.client.HTTPSConnection not available')
    2019      def test_host_port(self):
    2020          # Check invalid host_port
    2021  
    2022          for hp in ("www.python.org:abc", "user:password@www.python.org"):
    2023              self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp)
    2024  
    2025          for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
    2026                            "fe80::207:e9ff:fe9b", 8000),
    2027                           ("www.python.org:443", "www.python.org", 443),
    2028                           ("www.python.org:", "www.python.org", 443),
    2029                           ("www.python.org", "www.python.org", 443),
    2030                           ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
    2031                           ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
    2032                               443)):
    2033              c = client.HTTPSConnection(hp)
    2034              self.assertEqual(h, c.host)
    2035              self.assertEqual(p, c.port)
    2036  
    2037      def test_tls13_pha(self):
    2038          import ssl
    2039          if not ssl.HAS_TLSv1_3:
    2040              self.skipTest('TLS 1.3 support required')
    2041          # just check status of PHA flag
    2042          h = client.HTTPSConnection('localhost', 443)
    2043          self.assertTrue(h._context.post_handshake_auth)
    2044  
    2045          context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    2046          self.assertFalse(context.post_handshake_auth)
    2047          h = client.HTTPSConnection('localhost', 443, context=context)
    2048          self.assertIs(h._context, context)
    2049          self.assertFalse(h._context.post_handshake_auth)
    2050  
    2051          context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT, cert_file=CERT_localhost)
    2052          context.post_handshake_auth = True
    2053          h = client.HTTPSConnection('localhost', 443, context=context)
    2054          self.assertTrue(h._context.post_handshake_auth)
    2055  
    2056  
    2057  class ESC[4;38;5;81mRequestBodyTest(ESC[4;38;5;149mTestCase):
    2058      """Test cases where a request includes a message body."""
    2059  
    2060      def setUp(self):
    2061          self.conn = client.HTTPConnection('example.com')
    2062          self.conn.sock = self.sock = FakeSocket("")
    2063          self.conn.sock = self.sock
    2064  
    2065      def get_headers_and_fp(self):
    2066          f = io.BytesIO(self.sock.data)
    2067          f.readline()  # read the request line
    2068          message = client.parse_headers(f)
    2069          return message, f
    2070  
    2071      def test_list_body(self):
    2072          # Note that no content-length is automatically calculated for
    2073          # an iterable.  The request will fall back to send chunked
    2074          # transfer encoding.
    2075          cases = (
    2076              ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
    2077              ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
    2078          )
    2079          for body, expected in cases:
    2080              with self.subTest(body):
    2081                  self.conn = client.HTTPConnection('example.com')
    2082                  self.conn.sock = self.sock = FakeSocket('')
    2083  
    2084                  self.conn.request('PUT', '/url', body)
    2085                  msg, f = self.get_headers_and_fp()
    2086                  self.assertNotIn('Content-Type', msg)
    2087                  self.assertNotIn('Content-Length', msg)
    2088                  self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
    2089                  self.assertEqual(expected, f.read())
    2090  
    2091      def test_manual_content_length(self):
    2092          # Set an incorrect content-length so that we can verify that
    2093          # it will not be over-ridden by the library.
    2094          self.conn.request("PUT", "/url", "body",
    2095                            {"Content-Length": "42"})
    2096          message, f = self.get_headers_and_fp()
    2097          self.assertEqual("42", message.get("content-length"))
    2098          self.assertEqual(4, len(f.read()))
    2099  
    2100      def test_ascii_body(self):
    2101          self.conn.request("PUT", "/url", "body")
    2102          message, f = self.get_headers_and_fp()
    2103          self.assertEqual("text/plain", message.get_content_type())
    2104          self.assertIsNone(message.get_charset())
    2105          self.assertEqual("4", message.get("content-length"))
    2106          self.assertEqual(b'body', f.read())
    2107  
    2108      def test_latin1_body(self):
    2109          self.conn.request("PUT", "/url", "body\xc1")
    2110          message, f = self.get_headers_and_fp()
    2111          self.assertEqual("text/plain", message.get_content_type())
    2112          self.assertIsNone(message.get_charset())
    2113          self.assertEqual("5", message.get("content-length"))
    2114          self.assertEqual(b'body\xc1', f.read())
    2115  
    2116      def test_bytes_body(self):
    2117          self.conn.request("PUT", "/url", b"body\xc1")
    2118          message, f = self.get_headers_and_fp()
    2119          self.assertEqual("text/plain", message.get_content_type())
    2120          self.assertIsNone(message.get_charset())
    2121          self.assertEqual("5", message.get("content-length"))
    2122          self.assertEqual(b'body\xc1', f.read())
    2123  
    2124      def test_text_file_body(self):
    2125          self.addCleanup(os_helper.unlink, os_helper.TESTFN)
    2126          with open(os_helper.TESTFN, "w", encoding="utf-8") as f:
    2127              f.write("body")
    2128          with open(os_helper.TESTFN, encoding="utf-8") as f:
    2129              self.conn.request("PUT", "/url", f)
    2130              message, f = self.get_headers_and_fp()
    2131              self.assertEqual("text/plain", message.get_content_type())
    2132              self.assertIsNone(message.get_charset())
    2133              # No content-length will be determined for files; the body
    2134              # will be sent using chunked transfer encoding instead.
    2135              self.assertIsNone(message.get("content-length"))
    2136              self.assertEqual("chunked", message.get("transfer-encoding"))
    2137              self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
    2138  
    2139      def test_binary_file_body(self):
    2140          self.addCleanup(os_helper.unlink, os_helper.TESTFN)
    2141          with open(os_helper.TESTFN, "wb") as f:
    2142              f.write(b"body\xc1")
    2143          with open(os_helper.TESTFN, "rb") as f:
    2144              self.conn.request("PUT", "/url", f)
    2145              message, f = self.get_headers_and_fp()
    2146              self.assertEqual("text/plain", message.get_content_type())
    2147              self.assertIsNone(message.get_charset())
    2148              self.assertEqual("chunked", message.get("Transfer-Encoding"))
    2149              self.assertNotIn("Content-Length", message)
    2150              self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read())
    2151  
    2152  
    2153  class ESC[4;38;5;81mHTTPResponseTest(ESC[4;38;5;149mTestCase):
    2154  
    2155      def setUp(self):
    2156          body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \
    2157                  second-value\r\n\r\nText"
    2158          sock = FakeSocket(body)
    2159          self.resp = client.HTTPResponse(sock)
    2160          self.resp.begin()
    2161  
    2162      def test_getting_header(self):
    2163          header = self.resp.getheader('My-Header')
    2164          self.assertEqual(header, 'first-value, second-value')
    2165  
    2166          header = self.resp.getheader('My-Header', 'some default')
    2167          self.assertEqual(header, 'first-value, second-value')
    2168  
    2169      def test_getting_nonexistent_header_with_string_default(self):
    2170          header = self.resp.getheader('No-Such-Header', 'default-value')
    2171          self.assertEqual(header, 'default-value')
    2172  
    2173      def test_getting_nonexistent_header_with_iterable_default(self):
    2174          header = self.resp.getheader('No-Such-Header', ['default', 'values'])
    2175          self.assertEqual(header, 'default, values')
    2176  
    2177          header = self.resp.getheader('No-Such-Header', ('default', 'values'))
    2178          self.assertEqual(header, 'default, values')
    2179  
    2180      def test_getting_nonexistent_header_without_default(self):
    2181          header = self.resp.getheader('No-Such-Header')
    2182          self.assertEqual(header, None)
    2183  
    2184      def test_getting_header_defaultint(self):
    2185          header = self.resp.getheader('No-Such-Header',default=42)
    2186          self.assertEqual(header, 42)
    2187  
    2188  class ESC[4;38;5;81mTunnelTests(ESC[4;38;5;149mTestCase):
    2189      def setUp(self):
    2190          response_text = (
    2191              'HTTP/1.1 200 OK\r\n\r\n' # Reply to CONNECT
    2192              'HTTP/1.1 200 OK\r\n' # Reply to HEAD
    2193              'Content-Length: 42\r\n\r\n'
    2194          )
    2195          self.host = 'proxy.com'
    2196          self.port = client.HTTP_PORT
    2197          self.conn = client.HTTPConnection(self.host)
    2198          self.conn._create_connection = self._create_connection(response_text)
    2199  
    2200      def tearDown(self):
    2201          self.conn.close()
    2202  
    2203      def _create_connection(self, response_text):
    2204          def create_connection(address, timeout=None, source_address=None):
    2205              return FakeSocket(response_text, host=address[0], port=address[1])
    2206          return create_connection
    2207  
    2208      def test_set_tunnel_host_port_headers_add_host_missing(self):
    2209          tunnel_host = 'destination.com'
    2210          tunnel_port = 8888
    2211          tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
    2212          tunnel_headers_after = tunnel_headers.copy()
    2213          tunnel_headers_after['Host'] = '%s:%d' % (tunnel_host, tunnel_port)
    2214          self.conn.set_tunnel(tunnel_host, port=tunnel_port,
    2215                               headers=tunnel_headers)
    2216          self.conn.request('HEAD', '/', '')
    2217          self.assertEqual(self.conn.sock.host, self.host)
    2218          self.assertEqual(self.conn.sock.port, self.port)
    2219          self.assertEqual(self.conn._tunnel_host, tunnel_host)
    2220          self.assertEqual(self.conn._tunnel_port, tunnel_port)
    2221          self.assertEqual(self.conn._tunnel_headers, tunnel_headers_after)
    2222  
    2223      def test_set_tunnel_host_port_headers_set_host_identical(self):
    2224          tunnel_host = 'destination.com'
    2225          tunnel_port = 8888
    2226          tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)',
    2227                            'Host': '%s:%d' % (tunnel_host, tunnel_port)}
    2228          self.conn.set_tunnel(tunnel_host, port=tunnel_port,
    2229                               headers=tunnel_headers)
    2230          self.conn.request('HEAD', '/', '')
    2231          self.assertEqual(self.conn.sock.host, self.host)
    2232          self.assertEqual(self.conn.sock.port, self.port)
    2233          self.assertEqual(self.conn._tunnel_host, tunnel_host)
    2234          self.assertEqual(self.conn._tunnel_port, tunnel_port)
    2235          self.assertEqual(self.conn._tunnel_headers, tunnel_headers)
    2236  
    2237      def test_set_tunnel_host_port_headers_set_host_different(self):
    2238          tunnel_host = 'destination.com'
    2239          tunnel_port = 8888
    2240          tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)',
    2241                            'Host': '%s:%d' % ('example.com', 4200)}
    2242          self.conn.set_tunnel(tunnel_host, port=tunnel_port,
    2243                               headers=tunnel_headers)
    2244          self.conn.request('HEAD', '/', '')
    2245          self.assertEqual(self.conn.sock.host, self.host)
    2246          self.assertEqual(self.conn.sock.port, self.port)
    2247          self.assertEqual(self.conn._tunnel_host, tunnel_host)
    2248          self.assertEqual(self.conn._tunnel_port, tunnel_port)
    2249          self.assertEqual(self.conn._tunnel_headers, tunnel_headers)
    2250  
    2251      def test_disallow_set_tunnel_after_connect(self):
    2252          # Once connected, we shouldn't be able to tunnel anymore
    2253          self.conn.connect()
    2254          self.assertRaises(RuntimeError, self.conn.set_tunnel,
    2255                            'destination.com')
    2256  
    2257      def test_connect_with_tunnel(self):
    2258          d = {
    2259              b'host': b'destination.com',
    2260              b'port': client.HTTP_PORT,
    2261          }
    2262          self.conn.set_tunnel(d[b'host'].decode('ascii'))
    2263          self.conn.request('HEAD', '/', '')
    2264          self.assertEqual(self.conn.sock.host, self.host)
    2265          self.assertEqual(self.conn.sock.port, self.port)
    2266          self.assertIn(b'CONNECT %(host)s:%(port)d HTTP/1.1\r\n'
    2267                        b'Host: %(host)s:%(port)d\r\n\r\n' % d,
    2268                        self.conn.sock.data)
    2269          self.assertIn(b'HEAD / HTTP/1.1\r\nHost: %(host)s\r\n' % d,
    2270                        self.conn.sock.data)
    2271  
    2272      def test_connect_with_tunnel_with_default_port(self):
    2273          d = {
    2274              b'host': b'destination.com',
    2275              b'port': client.HTTP_PORT,
    2276          }
    2277          self.conn.set_tunnel(d[b'host'].decode('ascii'), port=d[b'port'])
    2278          self.conn.request('HEAD', '/', '')
    2279          self.assertEqual(self.conn.sock.host, self.host)
    2280          self.assertEqual(self.conn.sock.port, self.port)
    2281          self.assertIn(b'CONNECT %(host)s:%(port)d HTTP/1.1\r\n'
    2282                        b'Host: %(host)s:%(port)d\r\n\r\n' % d,
    2283                        self.conn.sock.data)
    2284          self.assertIn(b'HEAD / HTTP/1.1\r\nHost: %(host)s\r\n' % d,
    2285                        self.conn.sock.data)
    2286  
    2287      def test_connect_with_tunnel_with_nonstandard_port(self):
    2288          d = {
    2289              b'host': b'destination.com',
    2290              b'port': 8888,
    2291          }
    2292          self.conn.set_tunnel(d[b'host'].decode('ascii'), port=d[b'port'])
    2293          self.conn.request('HEAD', '/', '')
    2294          self.assertEqual(self.conn.sock.host, self.host)
    2295          self.assertEqual(self.conn.sock.port, self.port)
    2296          self.assertIn(b'CONNECT %(host)s:%(port)d HTTP/1.1\r\n'
    2297                        b'Host: %(host)s:%(port)d\r\n\r\n' % d,
    2298                        self.conn.sock.data)
    2299          self.assertIn(b'HEAD / HTTP/1.1\r\nHost: %(host)s:%(port)d\r\n' % d,
    2300                        self.conn.sock.data)
    2301  
    2302      # This request is not RFC-valid, but it's been possible with the library
    2303      # for years, so don't break it unexpectedly... This also tests
    2304      # case-insensitivity when injecting Host: headers if they're missing.
    2305      def test_connect_with_tunnel_with_different_host_header(self):
    2306          d = {
    2307              b'host': b'destination.com',
    2308              b'tunnel_host_header': b'example.com:9876',
    2309              b'port': client.HTTP_PORT,
    2310          }
    2311          self.conn.set_tunnel(
    2312              d[b'host'].decode('ascii'),
    2313              headers={'HOST': d[b'tunnel_host_header'].decode('ascii')})
    2314          self.conn.request('HEAD', '/', '')
    2315          self.assertEqual(self.conn.sock.host, self.host)
    2316          self.assertEqual(self.conn.sock.port, self.port)
    2317          self.assertIn(b'CONNECT %(host)s:%(port)d HTTP/1.1\r\n'
    2318                        b'HOST: %(tunnel_host_header)s\r\n\r\n' % d,
    2319                        self.conn.sock.data)
    2320          self.assertIn(b'HEAD / HTTP/1.1\r\nHost: %(host)s\r\n' % d,
    2321                        self.conn.sock.data)
    2322  
    2323      def test_connect_with_tunnel_different_host(self):
    2324          d = {
    2325              b'host': b'destination.com',
    2326              b'port': client.HTTP_PORT,
    2327          }
    2328          self.conn.set_tunnel(d[b'host'].decode('ascii'))
    2329          self.conn.request('HEAD', '/', '')
    2330          self.assertEqual(self.conn.sock.host, self.host)
    2331          self.assertEqual(self.conn.sock.port, self.port)
    2332          self.assertIn(b'CONNECT %(host)s:%(port)d HTTP/1.1\r\n'
    2333                        b'Host: %(host)s:%(port)d\r\n\r\n' % d,
    2334                        self.conn.sock.data)
    2335          self.assertIn(b'HEAD / HTTP/1.1\r\nHost: %(host)s\r\n' % d,
    2336                        self.conn.sock.data)
    2337  
    2338      def test_connect_with_tunnel_idna(self):
    2339          dest = '\u03b4\u03c0\u03b8.gr'
    2340          dest_port = b'%s:%d' % (dest.encode('idna'), client.HTTP_PORT)
    2341          expected = b'CONNECT %s HTTP/1.1\r\nHost: %s\r\n\r\n' % (
    2342              dest_port, dest_port)
    2343          self.conn.set_tunnel(dest)
    2344          self.conn.request('HEAD', '/', '')
    2345          self.assertEqual(self.conn.sock.host, self.host)
    2346          self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
    2347          self.assertIn(expected, self.conn.sock.data)
    2348  
    2349      def test_tunnel_connect_single_send_connection_setup(self):
    2350          """Regresstion test for https://bugs.python.org/issue43332."""
    2351          with mock.patch.object(self.conn, 'send') as mock_send:
    2352              self.conn.set_tunnel('destination.com')
    2353              self.conn.connect()
    2354              self.conn.request('GET', '/')
    2355          mock_send.assert_called()
    2356          # Likely 2, but this test only cares about the first.
    2357          self.assertGreater(
    2358                  len(mock_send.mock_calls), 1,
    2359                  msg=f'unexpected number of send calls: {mock_send.mock_calls}')
    2360          proxy_setup_data_sent = mock_send.mock_calls[0][1][0]
    2361          self.assertIn(b'CONNECT destination.com', proxy_setup_data_sent)
    2362          self.assertTrue(
    2363                  proxy_setup_data_sent.endswith(b'\r\n\r\n'),
    2364                  msg=f'unexpected proxy data sent {proxy_setup_data_sent!r}')
    2365  
    2366      def test_connect_put_request(self):
    2367          d = {
    2368              b'host': b'destination.com',
    2369              b'port': client.HTTP_PORT,
    2370          }
    2371          self.conn.set_tunnel(d[b'host'].decode('ascii'))
    2372          self.conn.request('PUT', '/', '')
    2373          self.assertEqual(self.conn.sock.host, self.host)
    2374          self.assertEqual(self.conn.sock.port, self.port)
    2375          self.assertIn(b'CONNECT %(host)s:%(port)d HTTP/1.1\r\n'
    2376                        b'Host: %(host)s:%(port)d\r\n\r\n' % d,
    2377                        self.conn.sock.data)
    2378          self.assertIn(b'PUT / HTTP/1.1\r\nHost: %(host)s\r\n' % d,
    2379                        self.conn.sock.data)
    2380  
    2381      def test_tunnel_debuglog(self):
    2382          expected_header = 'X-Dummy: 1'
    2383          response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header)
    2384  
    2385          self.conn.set_debuglevel(1)
    2386          self.conn._create_connection = self._create_connection(response_text)
    2387          self.conn.set_tunnel('destination.com')
    2388  
    2389          with support.captured_stdout() as output:
    2390              self.conn.request('PUT', '/', '')
    2391          lines = output.getvalue().splitlines()
    2392          self.assertIn('header: {}'.format(expected_header), lines)
    2393  
    2394      def test_proxy_response_headers(self):
    2395          expected_header = ('X-Dummy', '1')
    2396          response_text = (
    2397              'HTTP/1.0 200 OK\r\n'
    2398              '{0}\r\n\r\n'.format(':'.join(expected_header))
    2399          )
    2400  
    2401          self.conn._create_connection = self._create_connection(response_text)
    2402          self.conn.set_tunnel('destination.com')
    2403  
    2404          self.conn.request('PUT', '/', '')
    2405          headers = self.conn.get_proxy_response_headers()
    2406          self.assertIn(expected_header, headers.items())
    2407  
    2408      def test_no_proxy_response_headers(self):
    2409          expected_header = ('X-Dummy', '1')
    2410          response_text = (
    2411              'HTTP/1.0 200 OK\r\n'
    2412              '{0}\r\n\r\n'.format(':'.join(expected_header))
    2413          )
    2414  
    2415          self.conn._create_connection = self._create_connection(response_text)
    2416  
    2417          self.conn.request('PUT', '/', '')
    2418          headers = self.conn.get_proxy_response_headers()
    2419          self.assertIsNone(headers)
    2420  
    2421      def test_tunnel_leak(self):
    2422          sock = None
    2423  
    2424          def _create_connection(address, timeout=None, source_address=None):
    2425              nonlocal sock
    2426              sock = FakeSocket(
    2427                  'HTTP/1.1 404 NOT FOUND\r\n\r\n',
    2428                  host=address[0],
    2429                  port=address[1],
    2430              )
    2431              return sock
    2432  
    2433          self.conn._create_connection = _create_connection
    2434          self.conn.set_tunnel('destination.com')
    2435          exc = None
    2436          try:
    2437              self.conn.request('HEAD', '/', '')
    2438          except OSError as e:
    2439              # keeping a reference to exc keeps response alive in the traceback
    2440              exc = e
    2441          self.assertIsNotNone(exc)
    2442          self.assertTrue(sock.file_closed)
    2443  
    2444  
    2445  if __name__ == '__main__':
    2446      unittest.main(verbosity=2)