(root)/
Python-3.11.7/
Lib/
test/
test_httplib.py
       1  import enum
       2  import errno
       3  from http import client, HTTPStatus
       4  import io
       5  import itertools
       6  import os
       7  import array
       8  import re
       9  import socket
      10  import threading
      11  import warnings
      12  
      13  import unittest
      14  from unittest import mock
      15  TestCase = unittest.TestCase
      16  
      17  from test import support
      18  from test.support import os_helper
      19  from test.support import socket_helper
      20  from test.support import warnings_helper
      21  
      22  support.requires_working_socket(module=True)
      23  
      24  here = os.path.dirname(__file__)
      25  # Self-signed cert file for 'localhost'
      26  CERT_localhost = os.path.join(here, 'certdata', 'keycert.pem')
      27  # Self-signed cert file for 'fakehostname'
      28  CERT_fakehostname = os.path.join(here, 'certdata', 'keycert2.pem')
      29  # Self-signed cert file for self-signed.pythontest.net
      30  CERT_selfsigned_pythontestdotnet = os.path.join(
      31      here, 'certdata', 'selfsigned_pythontestdotnet.pem',
      32  )
      33  
      34  # constants for testing chunked encoding
      35  chunked_start = (
      36      'HTTP/1.1 200 OK\r\n'
      37      'Transfer-Encoding: chunked\r\n\r\n'
      38      'a\r\n'
      39      'hello worl\r\n'
      40      '3\r\n'
      41      'd! \r\n'
      42      '8\r\n'
      43      'and now \r\n'
      44      '22\r\n'
      45      'for something completely different\r\n'
      46  )
      47  chunked_expected = b'hello world! and now for something completely different'
      48  chunk_extension = ";foo=bar"
      49  last_chunk = "0\r\n"
      50  last_chunk_extended = "0" + chunk_extension + "\r\n"
      51  trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
      52  chunked_end = "\r\n"
      53  
      54  HOST = socket_helper.HOST
      55  
      56  class ESC[4;38;5;81mFakeSocket:
      57      def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
      58          if isinstance(text, str):
      59              text = text.encode("ascii")
      60          self.text = text
      61          self.fileclass = fileclass
      62          self.data = b''
      63          self.sendall_calls = 0
      64          self.file_closed = False
      65          self.host = host
      66          self.port = port
      67  
      68      def sendall(self, data):
      69          self.sendall_calls += 1
      70          self.data += data
      71  
      72      def makefile(self, mode, bufsize=None):
      73          if mode != 'r' and mode != 'rb':
      74              raise client.UnimplementedFileMode()
      75          # keep the file around so we can check how much was read from it
      76          self.file = self.fileclass(self.text)
      77          self.file.close = self.file_close #nerf close ()
      78          return self.file
      79  
      80      def file_close(self):
      81          self.file_closed = True
      82  
      83      def close(self):
      84          pass
      85  
      86      def setsockopt(self, level, optname, value):
      87          pass
      88  
      89  class ESC[4;38;5;81mEPipeSocket(ESC[4;38;5;149mFakeSocket):
      90  
      91      def __init__(self, text, pipe_trigger):
      92          # When sendall() is called with pipe_trigger, raise EPIPE.
      93          FakeSocket.__init__(self, text)
      94          self.pipe_trigger = pipe_trigger
      95  
      96      def sendall(self, data):
      97          if self.pipe_trigger in data:
      98              raise OSError(errno.EPIPE, "gotcha")
      99          self.data += data
     100  
     101      def close(self):
     102          pass
     103  
     104  class ESC[4;38;5;81mNoEOFBytesIO(ESC[4;38;5;149mioESC[4;38;5;149m.ESC[4;38;5;149mBytesIO):
     105      """Like BytesIO, but raises AssertionError on EOF.
     106  
     107      This is used below to test that http.client doesn't try to read
     108      more from the underlying file than it should.
     109      """
     110      def read(self, n=-1):
     111          data = io.BytesIO.read(self, n)
     112          if data == b'':
     113              raise AssertionError('caller tried to read past EOF')
     114          return data
     115  
     116      def readline(self, length=None):
     117          data = io.BytesIO.readline(self, length)
     118          if data == b'':
     119              raise AssertionError('caller tried to read past EOF')
     120          return data
     121  
     122  class ESC[4;38;5;81mFakeSocketHTTPConnection(ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPConnection):
     123      """HTTPConnection subclass using FakeSocket; counts connect() calls"""
     124  
     125      def __init__(self, *args):
     126          self.connections = 0
     127          super().__init__('example.com')
     128          self.fake_socket_args = args
     129          self._create_connection = self.create_connection
     130  
     131      def connect(self):
     132          """Count the number of times connect() is invoked"""
     133          self.connections += 1
     134          return super().connect()
     135  
     136      def create_connection(self, *pos, **kw):
     137          return FakeSocket(*self.fake_socket_args)
     138  
     139  class ESC[4;38;5;81mHeaderTests(ESC[4;38;5;149mTestCase):
     140      def test_auto_headers(self):
     141          # Some headers are added automatically, but should not be added by
     142          # .request() if they are explicitly set.
     143  
     144          class ESC[4;38;5;81mHeaderCountingBuffer(ESC[4;38;5;149mlist):
     145              def __init__(self):
     146                  self.count = {}
     147              def append(self, item):
     148                  kv = item.split(b':')
     149                  if len(kv) > 1:
     150                      # item is a 'Key: Value' header string
     151                      lcKey = kv[0].decode('ascii').lower()
     152                      self.count.setdefault(lcKey, 0)
     153                      self.count[lcKey] += 1
     154                  list.append(self, item)
     155  
     156          for explicit_header in True, False:
     157              for header in 'Content-length', 'Host', 'Accept-encoding':
     158                  conn = client.HTTPConnection('example.com')
     159                  conn.sock = FakeSocket('blahblahblah')
     160                  conn._buffer = HeaderCountingBuffer()
     161  
     162                  body = 'spamspamspam'
     163                  headers = {}
     164                  if explicit_header:
     165                      headers[header] = str(len(body))
     166                  conn.request('POST', '/', body, headers)
     167                  self.assertEqual(conn._buffer.count[header.lower()], 1)
     168  
     169      def test_content_length_0(self):
     170  
     171          class ESC[4;38;5;81mContentLengthChecker(ESC[4;38;5;149mlist):
     172              def __init__(self):
     173                  list.__init__(self)
     174                  self.content_length = None
     175              def append(self, item):
     176                  kv = item.split(b':', 1)
     177                  if len(kv) > 1 and kv[0].lower() == b'content-length':
     178                      self.content_length = kv[1].strip()
     179                  list.append(self, item)
     180  
     181          # Here, we're testing that methods expecting a body get a
     182          # content-length set to zero if the body is empty (either None or '')
     183          bodies = (None, '')
     184          methods_with_body = ('PUT', 'POST', 'PATCH')
     185          for method, body in itertools.product(methods_with_body, bodies):
     186              conn = client.HTTPConnection('example.com')
     187              conn.sock = FakeSocket(None)
     188              conn._buffer = ContentLengthChecker()
     189              conn.request(method, '/', body)
     190              self.assertEqual(
     191                  conn._buffer.content_length, b'0',
     192                  'Header Content-Length incorrect on {}'.format(method)
     193              )
     194  
     195          # For these methods, we make sure that content-length is not set when
     196          # the body is None because it might cause unexpected behaviour on the
     197          # server.
     198          methods_without_body = (
     199               'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
     200          )
     201          for method in methods_without_body:
     202              conn = client.HTTPConnection('example.com')
     203              conn.sock = FakeSocket(None)
     204              conn._buffer = ContentLengthChecker()
     205              conn.request(method, '/', None)
     206              self.assertEqual(
     207                  conn._buffer.content_length, None,
     208                  'Header Content-Length set for empty body on {}'.format(method)
     209              )
     210  
     211          # If the body is set to '', that's considered to be "present but
     212          # empty" rather than "missing", so content length would be set, even
     213          # for methods that don't expect a body.
     214          for method in methods_without_body:
     215              conn = client.HTTPConnection('example.com')
     216              conn.sock = FakeSocket(None)
     217              conn._buffer = ContentLengthChecker()
     218              conn.request(method, '/', '')
     219              self.assertEqual(
     220                  conn._buffer.content_length, b'0',
     221                  'Header Content-Length incorrect on {}'.format(method)
     222              )
     223  
     224          # If the body is set, make sure Content-Length is set.
     225          for method in itertools.chain(methods_without_body, methods_with_body):
     226              conn = client.HTTPConnection('example.com')
     227              conn.sock = FakeSocket(None)
     228              conn._buffer = ContentLengthChecker()
     229              conn.request(method, '/', ' ')
     230              self.assertEqual(
     231                  conn._buffer.content_length, b'1',
     232                  'Header Content-Length incorrect on {}'.format(method)
     233              )
     234  
     235      def test_putheader(self):
     236          conn = client.HTTPConnection('example.com')
     237          conn.sock = FakeSocket(None)
     238          conn.putrequest('GET','/')
     239          conn.putheader('Content-length', 42)
     240          self.assertIn(b'Content-length: 42', conn._buffer)
     241  
     242          conn.putheader('Foo', ' bar ')
     243          self.assertIn(b'Foo:  bar ', conn._buffer)
     244          conn.putheader('Bar', '\tbaz\t')
     245          self.assertIn(b'Bar: \tbaz\t', conn._buffer)
     246          conn.putheader('Authorization', 'Bearer mytoken')
     247          self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
     248          conn.putheader('IterHeader', 'IterA', 'IterB')
     249          self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
     250          conn.putheader('LatinHeader', b'\xFF')
     251          self.assertIn(b'LatinHeader: \xFF', conn._buffer)
     252          conn.putheader('Utf8Header', b'\xc3\x80')
     253          self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
     254          conn.putheader('C1-Control', b'next\x85line')
     255          self.assertIn(b'C1-Control: next\x85line', conn._buffer)
     256          conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
     257          self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
     258          conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
     259          self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
     260          conn.putheader('Key Space', 'value')
     261          self.assertIn(b'Key Space: value', conn._buffer)
     262          conn.putheader('KeySpace ', 'value')
     263          self.assertIn(b'KeySpace : value', conn._buffer)
     264          conn.putheader(b'Nonbreak\xa0Space', 'value')
     265          self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
     266          conn.putheader(b'\xa0NonbreakSpace', 'value')
     267          self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
     268  
     269      def test_ipv6host_header(self):
     270          # Default host header on IPv6 transaction should be wrapped by [] if
     271          # it is an IPv6 address
     272          expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
     273                     b'Accept-Encoding: identity\r\n\r\n'
     274          conn = client.HTTPConnection('[2001::]:81')
     275          sock = FakeSocket('')
     276          conn.sock = sock
     277          conn.request('GET', '/foo')
     278          self.assertTrue(sock.data.startswith(expected))
     279  
     280          expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
     281                     b'Accept-Encoding: identity\r\n\r\n'
     282          conn = client.HTTPConnection('[2001:102A::]')
     283          sock = FakeSocket('')
     284          conn.sock = sock
     285          conn.request('GET', '/foo')
     286          self.assertTrue(sock.data.startswith(expected))
     287  
     288          expected = b'GET /foo HTTP/1.1\r\nHost: [fe80::]\r\n' \
     289                     b'Accept-Encoding: identity\r\n\r\n'
     290          conn = client.HTTPConnection('[fe80::%2]')
     291          sock = FakeSocket('')
     292          conn.sock = sock
     293          conn.request('GET', '/foo')
     294          self.assertTrue(sock.data.startswith(expected))
     295  
     296          expected = b'GET /foo HTTP/1.1\r\nHost: [fe80::]:81\r\n' \
     297                     b'Accept-Encoding: identity\r\n\r\n'
     298          conn = client.HTTPConnection('[fe80::%2]:81')
     299          sock = FakeSocket('')
     300          conn.sock = sock
     301          conn.request('GET', '/foo')
     302          self.assertTrue(sock.data.startswith(expected))
     303  
     304      def test_malformed_headers_coped_with(self):
     305          # Issue 19996
     306          body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
     307          sock = FakeSocket(body)
     308          resp = client.HTTPResponse(sock)
     309          resp.begin()
     310  
     311          self.assertEqual(resp.getheader('First'), 'val')
     312          self.assertEqual(resp.getheader('Second'), 'val')
     313  
     314      def test_parse_all_octets(self):
     315          # Ensure no valid header field octet breaks the parser
     316          body = (
     317              b'HTTP/1.1 200 OK\r\n'
     318              b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
     319              b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n'
     320              b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n'
     321              b'obs-fold: text\r\n'
     322              b' folded with space\r\n'
     323              b'\tfolded with tab\r\n'
     324              b'Content-Length: 0\r\n'
     325              b'\r\n'
     326          )
     327          sock = FakeSocket(body)
     328          resp = client.HTTPResponse(sock)
     329          resp.begin()
     330          self.assertEqual(resp.getheader('Content-Length'), '0')
     331          self.assertEqual(resp.msg['Content-Length'], '0')
     332          self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
     333          self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value')
     334          vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
     335          self.assertEqual(resp.getheader('VCHAR'), vchar)
     336          self.assertEqual(resp.msg['VCHAR'], vchar)
     337          self.assertIsNotNone(resp.getheader('obs-text'))
     338          self.assertIn('obs-text', resp.msg)
     339          for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
     340              self.assertTrue(folded.startswith('text'))
     341              self.assertIn(' folded with space', folded)
     342              self.assertTrue(folded.endswith('folded with tab'))
     343  
     344      def test_invalid_headers(self):
     345          conn = client.HTTPConnection('example.com')
     346          conn.sock = FakeSocket('')
     347          conn.putrequest('GET', '/')
     348  
     349          # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
     350          # longer allowed in header names
     351          cases = (
     352              (b'Invalid\r\nName', b'ValidValue'),
     353              (b'Invalid\rName', b'ValidValue'),
     354              (b'Invalid\nName', b'ValidValue'),
     355              (b'\r\nInvalidName', b'ValidValue'),
     356              (b'\rInvalidName', b'ValidValue'),
     357              (b'\nInvalidName', b'ValidValue'),
     358              (b' InvalidName', b'ValidValue'),
     359              (b'\tInvalidName', b'ValidValue'),
     360              (b'Invalid:Name', b'ValidValue'),
     361              (b':InvalidName', b'ValidValue'),
     362              (b'ValidName', b'Invalid\r\nValue'),
     363              (b'ValidName', b'Invalid\rValue'),
     364              (b'ValidName', b'Invalid\nValue'),
     365              (b'ValidName', b'InvalidValue\r\n'),
     366              (b'ValidName', b'InvalidValue\r'),
     367              (b'ValidName', b'InvalidValue\n'),
     368          )
     369          for name, value in cases:
     370              with self.subTest((name, value)):
     371                  with self.assertRaisesRegex(ValueError, 'Invalid header'):
     372                      conn.putheader(name, value)
     373  
     374      def test_headers_debuglevel(self):
     375          body = (
     376              b'HTTP/1.1 200 OK\r\n'
     377              b'First: val\r\n'
     378              b'Second: val1\r\n'
     379              b'Second: val2\r\n'
     380          )
     381          sock = FakeSocket(body)
     382          resp = client.HTTPResponse(sock, debuglevel=1)
     383          with support.captured_stdout() as output:
     384              resp.begin()
     385          lines = output.getvalue().splitlines()
     386          self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'")
     387          self.assertEqual(lines[1], "header: First: val")
     388          self.assertEqual(lines[2], "header: Second: val1")
     389          self.assertEqual(lines[3], "header: Second: val2")
     390  
     391  
     392  class ESC[4;38;5;81mHttpMethodTests(ESC[4;38;5;149mTestCase):
     393      def test_invalid_method_names(self):
     394          methods = (
     395              'GET\r',
     396              'POST\n',
     397              'PUT\n\r',
     398              'POST\nValue',
     399              'POST\nHOST:abc',
     400              'GET\nrHost:abc\n',
     401              'POST\rRemainder:\r',
     402              'GET\rHOST:\n',
     403              '\nPUT'
     404          )
     405  
     406          for method in methods:
     407              with self.assertRaisesRegex(
     408                      ValueError, "method can't contain control characters"):
     409                  conn = client.HTTPConnection('example.com')
     410                  conn.sock = FakeSocket(None)
     411                  conn.request(method=method, url="/")
     412  
     413  
     414  class ESC[4;38;5;81mTransferEncodingTest(ESC[4;38;5;149mTestCase):
     415      expected_body = b"It's just a flesh wound"
     416  
     417      def test_endheaders_chunked(self):
     418          conn = client.HTTPConnection('example.com')
     419          conn.sock = FakeSocket(b'')
     420          conn.putrequest('POST', '/')
     421          conn.endheaders(self._make_body(), encode_chunked=True)
     422  
     423          _, _, body = self._parse_request(conn.sock.data)
     424          body = self._parse_chunked(body)
     425          self.assertEqual(body, self.expected_body)
     426  
     427      def test_explicit_headers(self):
     428          # explicit chunked
     429          conn = client.HTTPConnection('example.com')
     430          conn.sock = FakeSocket(b'')
     431          # this shouldn't actually be automatically chunk-encoded because the
     432          # calling code has explicitly stated that it's taking care of it
     433          conn.request(
     434              'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
     435  
     436          _, headers, body = self._parse_request(conn.sock.data)
     437          self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
     438          self.assertEqual(headers['Transfer-Encoding'], 'chunked')
     439          self.assertEqual(body, self.expected_body)
     440  
     441          # explicit chunked, string body
     442          conn = client.HTTPConnection('example.com')
     443          conn.sock = FakeSocket(b'')
     444          conn.request(
     445              'POST', '/', self.expected_body.decode('latin-1'),
     446              {'Transfer-Encoding': 'chunked'})
     447  
     448          _, headers, body = self._parse_request(conn.sock.data)
     449          self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
     450          self.assertEqual(headers['Transfer-Encoding'], 'chunked')
     451          self.assertEqual(body, self.expected_body)
     452  
     453          # User-specified TE, but request() does the chunk encoding
     454          conn = client.HTTPConnection('example.com')
     455          conn.sock = FakeSocket(b'')
     456          conn.request('POST', '/',
     457              headers={'Transfer-Encoding': 'gzip, chunked'},
     458              encode_chunked=True,
     459              body=self._make_body())
     460          _, headers, body = self._parse_request(conn.sock.data)
     461          self.assertNotIn('content-length', [k.lower() for k in headers])
     462          self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
     463          self.assertEqual(self._parse_chunked(body), self.expected_body)
     464  
     465      def test_request(self):
     466          for empty_lines in (False, True,):
     467              conn = client.HTTPConnection('example.com')
     468              conn.sock = FakeSocket(b'')
     469              conn.request(
     470                  'POST', '/', self._make_body(empty_lines=empty_lines))
     471  
     472              _, headers, body = self._parse_request(conn.sock.data)
     473              body = self._parse_chunked(body)
     474              self.assertEqual(body, self.expected_body)
     475              self.assertEqual(headers['Transfer-Encoding'], 'chunked')
     476  
     477              # Content-Length and Transfer-Encoding SHOULD not be sent in the
     478              # same request
     479              self.assertNotIn('content-length', [k.lower() for k in headers])
     480  
     481      def test_empty_body(self):
     482          # Zero-length iterable should be treated like any other iterable
     483          conn = client.HTTPConnection('example.com')
     484          conn.sock = FakeSocket(b'')
     485          conn.request('POST', '/', ())
     486          _, headers, body = self._parse_request(conn.sock.data)
     487          self.assertEqual(headers['Transfer-Encoding'], 'chunked')
     488          self.assertNotIn('content-length', [k.lower() for k in headers])
     489          self.assertEqual(body, b"0\r\n\r\n")
     490  
     491      def _make_body(self, empty_lines=False):
     492          lines = self.expected_body.split(b' ')
     493          for idx, line in enumerate(lines):
     494              # for testing handling empty lines
     495              if empty_lines and idx % 2:
     496                  yield b''
     497              if idx < len(lines) - 1:
     498                  yield line + b' '
     499              else:
     500                  yield line
     501  
     502      def _parse_request(self, data):
     503          lines = data.split(b'\r\n')
     504          request = lines[0]
     505          headers = {}
     506          n = 1
     507          while n < len(lines) and len(lines[n]) > 0:
     508              key, val = lines[n].split(b':')
     509              key = key.decode('latin-1').strip()
     510              headers[key] = val.decode('latin-1').strip()
     511              n += 1
     512  
     513          return request, headers, b'\r\n'.join(lines[n + 1:])
     514  
     515      def _parse_chunked(self, data):
     516          body = []
     517          trailers = {}
     518          n = 0
     519          lines = data.split(b'\r\n')
     520          # parse body
     521          while True:
     522              size, chunk = lines[n:n+2]
     523              size = int(size, 16)
     524  
     525              if size == 0:
     526                  n += 1
     527                  break
     528  
     529              self.assertEqual(size, len(chunk))
     530              body.append(chunk)
     531  
     532              n += 2
     533              # we /should/ hit the end chunk, but check against the size of
     534              # lines so we're not stuck in an infinite loop should we get
     535              # malformed data
     536              if n > len(lines):
     537                  break
     538  
     539          return b''.join(body)
     540  
     541  
     542  class ESC[4;38;5;81mBasicTest(ESC[4;38;5;149mTestCase):
     543      def test_dir_with_added_behavior_on_status(self):
     544          # see issue40084
     545          self.assertTrue({'description', 'name', 'phrase', 'value'} <= set(dir(HTTPStatus(404))))
     546  
     547      def test_simple_httpstatus(self):
     548          class ESC[4;38;5;81mCheckedHTTPStatus(ESC[4;38;5;149menumESC[4;38;5;149m.ESC[4;38;5;149mIntEnum):
     549              """HTTP status codes and reason phrases
     550  
     551              Status codes from the following RFCs are all observed:
     552  
     553                  * RFC 7231: Hypertext Transfer Protocol (HTTP/1.1), obsoletes 2616
     554                  * RFC 6585: Additional HTTP Status Codes
     555                  * RFC 3229: Delta encoding in HTTP
     556                  * RFC 4918: HTTP Extensions for WebDAV, obsoletes 2518
     557                  * RFC 5842: Binding Extensions to WebDAV
     558                  * RFC 7238: Permanent Redirect
     559                  * RFC 2295: Transparent Content Negotiation in HTTP
     560                  * RFC 2774: An HTTP Extension Framework
     561                  * RFC 7725: An HTTP Status Code to Report Legal Obstacles
     562                  * RFC 7540: Hypertext Transfer Protocol Version 2 (HTTP/2)
     563                  * RFC 2324: Hyper Text Coffee Pot Control Protocol (HTCPCP/1.0)
     564                  * RFC 8297: An HTTP Status Code for Indicating Hints
     565                  * RFC 8470: Using Early Data in HTTP
     566              """
     567              def __new__(cls, value, phrase, description=''):
     568                  obj = int.__new__(cls, value)
     569                  obj._value_ = value
     570  
     571                  obj.phrase = phrase
     572                  obj.description = description
     573                  return obj
     574              # informational
     575              CONTINUE = 100, 'Continue', 'Request received, please continue'
     576              SWITCHING_PROTOCOLS = (101, 'Switching Protocols',
     577                      'Switching to new protocol; obey Upgrade header')
     578              PROCESSING = 102, 'Processing'
     579              EARLY_HINTS = 103, 'Early Hints'
     580              # success
     581              OK = 200, 'OK', 'Request fulfilled, document follows'
     582              CREATED = 201, 'Created', 'Document created, URL follows'
     583              ACCEPTED = (202, 'Accepted',
     584                  'Request accepted, processing continues off-line')
     585              NON_AUTHORITATIVE_INFORMATION = (203,
     586                  'Non-Authoritative Information', 'Request fulfilled from cache')
     587              NO_CONTENT = 204, 'No Content', 'Request fulfilled, nothing follows'
     588              RESET_CONTENT = 205, 'Reset Content', 'Clear input form for further input'
     589              PARTIAL_CONTENT = 206, 'Partial Content', 'Partial content follows'
     590              MULTI_STATUS = 207, 'Multi-Status'
     591              ALREADY_REPORTED = 208, 'Already Reported'
     592              IM_USED = 226, 'IM Used'
     593              # redirection
     594              MULTIPLE_CHOICES = (300, 'Multiple Choices',
     595                  'Object has several resources -- see URI list')
     596              MOVED_PERMANENTLY = (301, 'Moved Permanently',
     597                  'Object moved permanently -- see URI list')
     598              FOUND = 302, 'Found', 'Object moved temporarily -- see URI list'
     599              SEE_OTHER = 303, 'See Other', 'Object moved -- see Method and URL list'
     600              NOT_MODIFIED = (304, 'Not Modified',
     601                  'Document has not changed since given time')
     602              USE_PROXY = (305, 'Use Proxy',
     603                  'You must use proxy specified in Location to access this resource')
     604              TEMPORARY_REDIRECT = (307, 'Temporary Redirect',
     605                  'Object moved temporarily -- see URI list')
     606              PERMANENT_REDIRECT = (308, 'Permanent Redirect',
     607                  'Object moved permanently -- see URI list')
     608              # client error
     609              BAD_REQUEST = (400, 'Bad Request',
     610                  'Bad request syntax or unsupported method')
     611              UNAUTHORIZED = (401, 'Unauthorized',
     612                  'No permission -- see authorization schemes')
     613              PAYMENT_REQUIRED = (402, 'Payment Required',
     614                  'No payment -- see charging schemes')
     615              FORBIDDEN = (403, 'Forbidden',
     616                  'Request forbidden -- authorization will not help')
     617              NOT_FOUND = (404, 'Not Found',
     618                  'Nothing matches the given URI')
     619              METHOD_NOT_ALLOWED = (405, 'Method Not Allowed',
     620                  'Specified method is invalid for this resource')
     621              NOT_ACCEPTABLE = (406, 'Not Acceptable',
     622                  'URI not available in preferred format')
     623              PROXY_AUTHENTICATION_REQUIRED = (407,
     624                  'Proxy Authentication Required',
     625                  'You must authenticate with this proxy before proceeding')
     626              REQUEST_TIMEOUT = (408, 'Request Timeout',
     627                  'Request timed out; try again later')
     628              CONFLICT = 409, 'Conflict', 'Request conflict'
     629              GONE = (410, 'Gone',
     630                  'URI no longer exists and has been permanently removed')
     631              LENGTH_REQUIRED = (411, 'Length Required',
     632                  'Client must specify Content-Length')
     633              PRECONDITION_FAILED = (412, 'Precondition Failed',
     634                  'Precondition in headers is false')
     635              REQUEST_ENTITY_TOO_LARGE = (413, 'Request Entity Too Large',
     636                  'Entity is too large')
     637              REQUEST_URI_TOO_LONG = (414, 'Request-URI Too Long',
     638                  'URI is too long')
     639              UNSUPPORTED_MEDIA_TYPE = (415, 'Unsupported Media Type',
     640                  'Entity body in unsupported format')
     641              REQUESTED_RANGE_NOT_SATISFIABLE = (416,
     642                  'Requested Range Not Satisfiable',
     643                  'Cannot satisfy request range')
     644              EXPECTATION_FAILED = (417, 'Expectation Failed',
     645                  'Expect condition could not be satisfied')
     646              IM_A_TEAPOT = (418, 'I\'m a Teapot',
     647                  'Server refuses to brew coffee because it is a teapot.')
     648              MISDIRECTED_REQUEST = (421, 'Misdirected Request',
     649                  'Server is not able to produce a response')
     650              UNPROCESSABLE_ENTITY = 422, 'Unprocessable Entity'
     651              LOCKED = 423, 'Locked'
     652              FAILED_DEPENDENCY = 424, 'Failed Dependency'
     653              TOO_EARLY = 425, 'Too Early'
     654              UPGRADE_REQUIRED = 426, 'Upgrade Required'
     655              PRECONDITION_REQUIRED = (428, 'Precondition Required',
     656                  'The origin server requires the request to be conditional')
     657              TOO_MANY_REQUESTS = (429, 'Too Many Requests',
     658                  'The user has sent too many requests in '
     659                  'a given amount of time ("rate limiting")')
     660              REQUEST_HEADER_FIELDS_TOO_LARGE = (431,
     661                  'Request Header Fields Too Large',
     662                  'The server is unwilling to process the request because its header '
     663                  'fields are too large')
     664              UNAVAILABLE_FOR_LEGAL_REASONS = (451,
     665                  'Unavailable For Legal Reasons',
     666                  'The server is denying access to the '
     667                  'resource as a consequence of a legal demand')
     668              # server errors
     669              INTERNAL_SERVER_ERROR = (500, 'Internal Server Error',
     670                  'Server got itself in trouble')
     671              NOT_IMPLEMENTED = (501, 'Not Implemented',
     672                  'Server does not support this operation')
     673              BAD_GATEWAY = (502, 'Bad Gateway',
     674                  'Invalid responses from another server/proxy')
     675              SERVICE_UNAVAILABLE = (503, 'Service Unavailable',
     676                  'The server cannot process the request due to a high load')
     677              GATEWAY_TIMEOUT = (504, 'Gateway Timeout',
     678                  'The gateway server did not receive a timely response')
     679              HTTP_VERSION_NOT_SUPPORTED = (505, 'HTTP Version Not Supported',
     680                  'Cannot fulfill request')
     681              VARIANT_ALSO_NEGOTIATES = 506, 'Variant Also Negotiates'
     682              INSUFFICIENT_STORAGE = 507, 'Insufficient Storage'
     683              LOOP_DETECTED = 508, 'Loop Detected'
     684              NOT_EXTENDED = 510, 'Not Extended'
     685              NETWORK_AUTHENTICATION_REQUIRED = (511,
     686                  'Network Authentication Required',
     687                  'The client needs to authenticate to gain network access')
     688          enum._test_simple_enum(CheckedHTTPStatus, HTTPStatus)
     689  
     690  
     691      def test_status_lines(self):
     692          # Test HTTP status lines
     693  
     694          body = "HTTP/1.1 200 Ok\r\n\r\nText"
     695          sock = FakeSocket(body)
     696          resp = client.HTTPResponse(sock)
     697          resp.begin()
     698          self.assertEqual(resp.read(0), b'')  # Issue #20007
     699          self.assertFalse(resp.isclosed())
     700          self.assertFalse(resp.closed)
     701          self.assertEqual(resp.read(), b"Text")
     702          self.assertTrue(resp.isclosed())
     703          self.assertFalse(resp.closed)
     704          resp.close()
     705          self.assertTrue(resp.closed)
     706  
     707          body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
     708          sock = FakeSocket(body)
     709          resp = client.HTTPResponse(sock)
     710          self.assertRaises(client.BadStatusLine, resp.begin)
     711  
     712      def test_bad_status_repr(self):
     713          exc = client.BadStatusLine('')
     714          self.assertEqual(repr(exc), '''BadStatusLine("''")''')
     715  
     716      def test_partial_reads(self):
     717          # if we have Content-Length, HTTPResponse knows when to close itself,
     718          # the same behaviour as when we read the whole thing with read()
     719          body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
     720          sock = FakeSocket(body)
     721          resp = client.HTTPResponse(sock)
     722          resp.begin()
     723          self.assertEqual(resp.read(2), b'Te')
     724          self.assertFalse(resp.isclosed())
     725          self.assertEqual(resp.read(2), b'xt')
     726          self.assertTrue(resp.isclosed())
     727          self.assertFalse(resp.closed)
     728          resp.close()
     729          self.assertTrue(resp.closed)
     730  
     731      def test_mixed_reads(self):
     732          # readline() should update the remaining length, so that read() knows
     733          # how much data is left and does not raise IncompleteRead
     734          body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
     735          sock = FakeSocket(body)
     736          resp = client.HTTPResponse(sock)
     737          resp.begin()
     738          self.assertEqual(resp.readline(), b'Text\r\n')
     739          self.assertFalse(resp.isclosed())
     740          self.assertEqual(resp.read(), b'Another')
     741          self.assertTrue(resp.isclosed())
     742          self.assertFalse(resp.closed)
     743          resp.close()
     744          self.assertTrue(resp.closed)
     745  
     746      def test_partial_readintos(self):
     747          # if we have Content-Length, HTTPResponse knows when to close itself,
     748          # the same behaviour as when we read the whole thing with read()
     749          body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
     750          sock = FakeSocket(body)
     751          resp = client.HTTPResponse(sock)
     752          resp.begin()
     753          b = bytearray(2)
     754          n = resp.readinto(b)
     755          self.assertEqual(n, 2)
     756          self.assertEqual(bytes(b), b'Te')
     757          self.assertFalse(resp.isclosed())
     758          n = resp.readinto(b)
     759          self.assertEqual(n, 2)
     760          self.assertEqual(bytes(b), b'xt')
     761          self.assertTrue(resp.isclosed())
     762          self.assertFalse(resp.closed)
     763          resp.close()
     764          self.assertTrue(resp.closed)
     765  
     766      def test_partial_reads_past_end(self):
     767          # if we have Content-Length, clip reads to the end
     768          body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
     769          sock = FakeSocket(body)
     770          resp = client.HTTPResponse(sock)
     771          resp.begin()
     772          self.assertEqual(resp.read(10), b'Text')
     773          self.assertTrue(resp.isclosed())
     774          self.assertFalse(resp.closed)
     775          resp.close()
     776          self.assertTrue(resp.closed)
     777  
     778      def test_partial_readintos_past_end(self):
     779          # if we have Content-Length, clip readintos to the end
     780          body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
     781          sock = FakeSocket(body)
     782          resp = client.HTTPResponse(sock)
     783          resp.begin()
     784          b = bytearray(10)
     785          n = resp.readinto(b)
     786          self.assertEqual(n, 4)
     787          self.assertEqual(bytes(b)[:4], b'Text')
     788          self.assertTrue(resp.isclosed())
     789          self.assertFalse(resp.closed)
     790          resp.close()
     791          self.assertTrue(resp.closed)
     792  
     793      def test_partial_reads_no_content_length(self):
     794          # when no length is present, the socket should be gracefully closed when
     795          # all data was read
     796          body = "HTTP/1.1 200 Ok\r\n\r\nText"
     797          sock = FakeSocket(body)
     798          resp = client.HTTPResponse(sock)
     799          resp.begin()
     800          self.assertEqual(resp.read(2), b'Te')
     801          self.assertFalse(resp.isclosed())
     802          self.assertEqual(resp.read(2), b'xt')
     803          self.assertEqual(resp.read(1), b'')
     804          self.assertTrue(resp.isclosed())
     805          self.assertFalse(resp.closed)
     806          resp.close()
     807          self.assertTrue(resp.closed)
     808  
     809      def test_partial_readintos_no_content_length(self):
     810          # when no length is present, the socket should be gracefully closed when
     811          # all data was read
     812          body = "HTTP/1.1 200 Ok\r\n\r\nText"
     813          sock = FakeSocket(body)
     814          resp = client.HTTPResponse(sock)
     815          resp.begin()
     816          b = bytearray(2)
     817          n = resp.readinto(b)
     818          self.assertEqual(n, 2)
     819          self.assertEqual(bytes(b), b'Te')
     820          self.assertFalse(resp.isclosed())
     821          n = resp.readinto(b)
     822          self.assertEqual(n, 2)
     823          self.assertEqual(bytes(b), b'xt')
     824          n = resp.readinto(b)
     825          self.assertEqual(n, 0)
     826          self.assertTrue(resp.isclosed())
     827  
     828      def test_partial_reads_incomplete_body(self):
     829          # if the server shuts down the connection before the whole
     830          # content-length is delivered, the socket is gracefully closed
     831          body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
     832          sock = FakeSocket(body)
     833          resp = client.HTTPResponse(sock)
     834          resp.begin()
     835          self.assertEqual(resp.read(2), b'Te')
     836          self.assertFalse(resp.isclosed())
     837          self.assertEqual(resp.read(2), b'xt')
     838          self.assertEqual(resp.read(1), b'')
     839          self.assertTrue(resp.isclosed())
     840  
     841      def test_partial_readintos_incomplete_body(self):
     842          # if the server shuts down the connection before the whole
     843          # content-length is delivered, the socket is gracefully closed
     844          body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
     845          sock = FakeSocket(body)
     846          resp = client.HTTPResponse(sock)
     847          resp.begin()
     848          b = bytearray(2)
     849          n = resp.readinto(b)
     850          self.assertEqual(n, 2)
     851          self.assertEqual(bytes(b), b'Te')
     852          self.assertFalse(resp.isclosed())
     853          n = resp.readinto(b)
     854          self.assertEqual(n, 2)
     855          self.assertEqual(bytes(b), b'xt')
     856          n = resp.readinto(b)
     857          self.assertEqual(n, 0)
     858          self.assertTrue(resp.isclosed())
     859          self.assertFalse(resp.closed)
     860          resp.close()
     861          self.assertTrue(resp.closed)
     862  
     863      def test_host_port(self):
     864          # Check invalid host_port
     865  
     866          for hp in ("www.python.org:abc", "user:password@www.python.org"):
     867              self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
     868  
     869          for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
     870                            "fe80::207:e9ff:fe9b", 8000),
     871                           ("www.python.org:80", "www.python.org", 80),
     872                           ("www.python.org:", "www.python.org", 80),
     873                           ("www.python.org", "www.python.org", 80),
     874                           ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
     875                           ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
     876              c = client.HTTPConnection(hp)
     877              self.assertEqual(h, c.host)
     878              self.assertEqual(p, c.port)
     879  
     880      def test_response_headers(self):
     881          # test response with multiple message headers with the same field name.
     882          text = ('HTTP/1.1 200 OK\r\n'
     883                  'Set-Cookie: Customer="WILE_E_COYOTE"; '
     884                  'Version="1"; Path="/acme"\r\n'
     885                  'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
     886                  ' Path="/acme"\r\n'
     887                  '\r\n'
     888                  'No body\r\n')
     889          hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
     890                 ', '
     891                 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
     892          s = FakeSocket(text)
     893          r = client.HTTPResponse(s)
     894          r.begin()
     895          cookies = r.getheader("Set-Cookie")
     896          self.assertEqual(cookies, hdr)
     897  
     898      def test_read_head(self):
     899          # Test that the library doesn't attempt to read any data
     900          # from a HEAD request.  (Tickles SF bug #622042.)
     901          sock = FakeSocket(
     902              'HTTP/1.1 200 OK\r\n'
     903              'Content-Length: 14432\r\n'
     904              '\r\n',
     905              NoEOFBytesIO)
     906          resp = client.HTTPResponse(sock, method="HEAD")
     907          resp.begin()
     908          if resp.read():
     909              self.fail("Did not expect response from HEAD request")
     910  
     911      def test_readinto_head(self):
     912          # Test that the library doesn't attempt to read any data
     913          # from a HEAD request.  (Tickles SF bug #622042.)
     914          sock = FakeSocket(
     915              'HTTP/1.1 200 OK\r\n'
     916              'Content-Length: 14432\r\n'
     917              '\r\n',
     918              NoEOFBytesIO)
     919          resp = client.HTTPResponse(sock, method="HEAD")
     920          resp.begin()
     921          b = bytearray(5)
     922          if resp.readinto(b) != 0:
     923              self.fail("Did not expect response from HEAD request")
     924          self.assertEqual(bytes(b), b'\x00'*5)
     925  
     926      def test_too_many_headers(self):
     927          headers = '\r\n'.join('Header%d: foo' % i
     928                                for i in range(client._MAXHEADERS + 1)) + '\r\n'
     929          text = ('HTTP/1.1 200 OK\r\n' + headers)
     930          s = FakeSocket(text)
     931          r = client.HTTPResponse(s)
     932          self.assertRaisesRegex(client.HTTPException,
     933                                 r"got more than \d+ headers", r.begin)
     934  
     935      def test_send_file(self):
     936          expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
     937                      b'Accept-Encoding: identity\r\n'
     938                      b'Transfer-Encoding: chunked\r\n'
     939                      b'\r\n')
     940  
     941          with open(__file__, 'rb') as body:
     942              conn = client.HTTPConnection('example.com')
     943              sock = FakeSocket(body)
     944              conn.sock = sock
     945              conn.request('GET', '/foo', body)
     946              self.assertTrue(sock.data.startswith(expected), '%r != %r' %
     947                      (sock.data[:len(expected)], expected))
     948  
     949      def test_send(self):
     950          expected = b'this is a test this is only a test'
     951          conn = client.HTTPConnection('example.com')
     952          sock = FakeSocket(None)
     953          conn.sock = sock
     954          conn.send(expected)
     955          self.assertEqual(expected, sock.data)
     956          sock.data = b''
     957          conn.send(array.array('b', expected))
     958          self.assertEqual(expected, sock.data)
     959          sock.data = b''
     960          conn.send(io.BytesIO(expected))
     961          self.assertEqual(expected, sock.data)
     962  
     963      def test_send_updating_file(self):
     964          def data():
     965              yield 'data'
     966              yield None
     967              yield 'data_two'
     968  
     969          class ESC[4;38;5;81mUpdatingFile(ESC[4;38;5;149mioESC[4;38;5;149m.ESC[4;38;5;149mTextIOBase):
     970              mode = 'r'
     971              d = data()
     972              def read(self, blocksize=-1):
     973                  return next(self.d)
     974  
     975          expected = b'data'
     976  
     977          conn = client.HTTPConnection('example.com')
     978          sock = FakeSocket("")
     979          conn.sock = sock
     980          conn.send(UpdatingFile())
     981          self.assertEqual(sock.data, expected)
     982  
     983  
     984      def test_send_iter(self):
     985          expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
     986                     b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
     987                     b'\r\nonetwothree'
     988  
     989          def body():
     990              yield b"one"
     991              yield b"two"
     992              yield b"three"
     993  
     994          conn = client.HTTPConnection('example.com')
     995          sock = FakeSocket("")
     996          conn.sock = sock
     997          conn.request('GET', '/foo', body(), {'Content-Length': '11'})
     998          self.assertEqual(sock.data, expected)
     999  
    1000      def test_blocksize_request(self):
    1001          """Check that request() respects the configured block size."""
    1002          blocksize = 8  # For easy debugging.
    1003          conn = client.HTTPConnection('example.com', blocksize=blocksize)
    1004          sock = FakeSocket(None)
    1005          conn.sock = sock
    1006          expected = b"a" * blocksize + b"b"
    1007          conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
    1008          self.assertEqual(sock.sendall_calls, 3)
    1009          body = sock.data.split(b"\r\n\r\n", 1)[1]
    1010          self.assertEqual(body, expected)
    1011  
    1012      def test_blocksize_send(self):
    1013          """Check that send() respects the configured block size."""
    1014          blocksize = 8  # For easy debugging.
    1015          conn = client.HTTPConnection('example.com', blocksize=blocksize)
    1016          sock = FakeSocket(None)
    1017          conn.sock = sock
    1018          expected = b"a" * blocksize + b"b"
    1019          conn.send(io.BytesIO(expected))
    1020          self.assertEqual(sock.sendall_calls, 2)
    1021          self.assertEqual(sock.data, expected)
    1022  
    1023      def test_send_type_error(self):
    1024          # See: Issue #12676
    1025          conn = client.HTTPConnection('example.com')
    1026          conn.sock = FakeSocket('')
    1027          with self.assertRaises(TypeError):
    1028              conn.request('POST', 'test', conn)
    1029  
    1030      def test_chunked(self):
    1031          expected = chunked_expected
    1032          sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1033          resp = client.HTTPResponse(sock, method="GET")
    1034          resp.begin()
    1035          self.assertEqual(resp.read(), expected)
    1036          resp.close()
    1037  
    1038          # Various read sizes
    1039          for n in range(1, 12):
    1040              sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1041              resp = client.HTTPResponse(sock, method="GET")
    1042              resp.begin()
    1043              self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
    1044              resp.close()
    1045  
    1046          for x in ('', 'foo\r\n'):
    1047              sock = FakeSocket(chunked_start + x)
    1048              resp = client.HTTPResponse(sock, method="GET")
    1049              resp.begin()
    1050              try:
    1051                  resp.read()
    1052              except client.IncompleteRead as i:
    1053                  self.assertEqual(i.partial, expected)
    1054                  expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
    1055                  self.assertEqual(repr(i), expected_message)
    1056                  self.assertEqual(str(i), expected_message)
    1057              else:
    1058                  self.fail('IncompleteRead expected')
    1059              finally:
    1060                  resp.close()
    1061  
    1062      def test_readinto_chunked(self):
    1063  
    1064          expected = chunked_expected
    1065          nexpected = len(expected)
    1066          b = bytearray(128)
    1067  
    1068          sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1069          resp = client.HTTPResponse(sock, method="GET")
    1070          resp.begin()
    1071          n = resp.readinto(b)
    1072          self.assertEqual(b[:nexpected], expected)
    1073          self.assertEqual(n, nexpected)
    1074          resp.close()
    1075  
    1076          # Various read sizes
    1077          for n in range(1, 12):
    1078              sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1079              resp = client.HTTPResponse(sock, method="GET")
    1080              resp.begin()
    1081              m = memoryview(b)
    1082              i = resp.readinto(m[0:n])
    1083              i += resp.readinto(m[i:n + i])
    1084              i += resp.readinto(m[i:])
    1085              self.assertEqual(b[:nexpected], expected)
    1086              self.assertEqual(i, nexpected)
    1087              resp.close()
    1088  
    1089          for x in ('', 'foo\r\n'):
    1090              sock = FakeSocket(chunked_start + x)
    1091              resp = client.HTTPResponse(sock, method="GET")
    1092              resp.begin()
    1093              try:
    1094                  n = resp.readinto(b)
    1095              except client.IncompleteRead as i:
    1096                  self.assertEqual(i.partial, expected)
    1097                  expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
    1098                  self.assertEqual(repr(i), expected_message)
    1099                  self.assertEqual(str(i), expected_message)
    1100              else:
    1101                  self.fail('IncompleteRead expected')
    1102              finally:
    1103                  resp.close()
    1104  
    1105      def test_chunked_head(self):
    1106          chunked_start = (
    1107              'HTTP/1.1 200 OK\r\n'
    1108              'Transfer-Encoding: chunked\r\n\r\n'
    1109              'a\r\n'
    1110              'hello world\r\n'
    1111              '1\r\n'
    1112              'd\r\n'
    1113          )
    1114          sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1115          resp = client.HTTPResponse(sock, method="HEAD")
    1116          resp.begin()
    1117          self.assertEqual(resp.read(), b'')
    1118          self.assertEqual(resp.status, 200)
    1119          self.assertEqual(resp.reason, 'OK')
    1120          self.assertTrue(resp.isclosed())
    1121          self.assertFalse(resp.closed)
    1122          resp.close()
    1123          self.assertTrue(resp.closed)
    1124  
    1125      def test_readinto_chunked_head(self):
    1126          chunked_start = (
    1127              'HTTP/1.1 200 OK\r\n'
    1128              'Transfer-Encoding: chunked\r\n\r\n'
    1129              'a\r\n'
    1130              'hello world\r\n'
    1131              '1\r\n'
    1132              'd\r\n'
    1133          )
    1134          sock = FakeSocket(chunked_start + last_chunk + chunked_end)
    1135          resp = client.HTTPResponse(sock, method="HEAD")
    1136          resp.begin()
    1137          b = bytearray(5)
    1138          n = resp.readinto(b)
    1139          self.assertEqual(n, 0)
    1140          self.assertEqual(bytes(b), b'\x00'*5)
    1141          self.assertEqual(resp.status, 200)
    1142          self.assertEqual(resp.reason, 'OK')
    1143          self.assertTrue(resp.isclosed())
    1144          self.assertFalse(resp.closed)
    1145          resp.close()
    1146          self.assertTrue(resp.closed)
    1147  
    1148      def test_negative_content_length(self):
    1149          sock = FakeSocket(
    1150              'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
    1151          resp = client.HTTPResponse(sock, method="GET")
    1152          resp.begin()
    1153          self.assertEqual(resp.read(), b'Hello\r\n')
    1154          self.assertTrue(resp.isclosed())
    1155  
    1156      def test_incomplete_read(self):
    1157          sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
    1158          resp = client.HTTPResponse(sock, method="GET")
    1159          resp.begin()
    1160          try:
    1161              resp.read()
    1162          except client.IncompleteRead as i:
    1163              self.assertEqual(i.partial, b'Hello\r\n')
    1164              self.assertEqual(repr(i),
    1165                               "IncompleteRead(7 bytes read, 3 more expected)")
    1166              self.assertEqual(str(i),
    1167                               "IncompleteRead(7 bytes read, 3 more expected)")
    1168              self.assertTrue(resp.isclosed())
    1169          else:
    1170              self.fail('IncompleteRead expected')
    1171  
    1172      def test_epipe(self):
    1173          sock = EPipeSocket(
    1174              "HTTP/1.0 401 Authorization Required\r\n"
    1175              "Content-type: text/html\r\n"
    1176              "WWW-Authenticate: Basic realm=\"example\"\r\n",
    1177              b"Content-Length")
    1178          conn = client.HTTPConnection("example.com")
    1179          conn.sock = sock
    1180          self.assertRaises(OSError,
    1181                            lambda: conn.request("PUT", "/url", "body"))
    1182          resp = conn.getresponse()
    1183          self.assertEqual(401, resp.status)
    1184          self.assertEqual("Basic realm=\"example\"",
    1185                           resp.getheader("www-authenticate"))
    1186  
    1187      # Test lines overflowing the max line size (_MAXLINE in http.client)
    1188  
    1189      def test_overflowing_status_line(self):
    1190          body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
    1191          resp = client.HTTPResponse(FakeSocket(body))
    1192          self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
    1193  
    1194      def test_overflowing_header_line(self):
    1195          body = (
    1196              'HTTP/1.1 200 OK\r\n'
    1197              'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
    1198          )
    1199          resp = client.HTTPResponse(FakeSocket(body))
    1200          self.assertRaises(client.LineTooLong, resp.begin)
    1201  
    1202      def test_overflowing_header_limit_after_100(self):
    1203          body = (
    1204              'HTTP/1.1 100 OK\r\n'
    1205              'r\n' * 32768
    1206          )
    1207          resp = client.HTTPResponse(FakeSocket(body))
    1208          with self.assertRaises(client.HTTPException) as cm:
    1209              resp.begin()
    1210          # We must assert more because other reasonable errors that we
    1211          # do not want can also be HTTPException derived.
    1212          self.assertIn('got more than ', str(cm.exception))
    1213          self.assertIn('headers', str(cm.exception))
    1214  
    1215      def test_overflowing_chunked_line(self):
    1216          body = (
    1217              'HTTP/1.1 200 OK\r\n'
    1218              'Transfer-Encoding: chunked\r\n\r\n'
    1219              + '0' * 65536 + 'a\r\n'
    1220              'hello world\r\n'
    1221              '0\r\n'
    1222              '\r\n'
    1223          )
    1224          resp = client.HTTPResponse(FakeSocket(body))
    1225          resp.begin()
    1226          self.assertRaises(client.LineTooLong, resp.read)
    1227  
    1228      def test_early_eof(self):
    1229          # Test httpresponse with no \r\n termination,
    1230          body = "HTTP/1.1 200 Ok"
    1231          sock = FakeSocket(body)
    1232          resp = client.HTTPResponse(sock)
    1233          resp.begin()
    1234          self.assertEqual(resp.read(), b'')
    1235          self.assertTrue(resp.isclosed())
    1236          self.assertFalse(resp.closed)
    1237          resp.close()
    1238          self.assertTrue(resp.closed)
    1239  
    1240      def test_error_leak(self):
    1241          # Test that the socket is not leaked if getresponse() fails
    1242          conn = client.HTTPConnection('example.com')
    1243          response = None
    1244          class ESC[4;38;5;81mResponse(ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPResponse):
    1245              def __init__(self, *pos, **kw):
    1246                  nonlocal response
    1247                  response = self  # Avoid garbage collector closing the socket
    1248                  client.HTTPResponse.__init__(self, *pos, **kw)
    1249          conn.response_class = Response
    1250          conn.sock = FakeSocket('Invalid status line')
    1251          conn.request('GET', '/')
    1252          self.assertRaises(client.BadStatusLine, conn.getresponse)
    1253          self.assertTrue(response.closed)
    1254          self.assertTrue(conn.sock.file_closed)
    1255  
    1256      def test_chunked_extension(self):
    1257          extra = '3;foo=bar\r\n' + 'abc\r\n'
    1258          expected = chunked_expected + b'abc'
    1259  
    1260          sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
    1261          resp = client.HTTPResponse(sock, method="GET")
    1262          resp.begin()
    1263          self.assertEqual(resp.read(), expected)
    1264          resp.close()
    1265  
    1266      def test_chunked_missing_end(self):
    1267          """some servers may serve up a short chunked encoding stream"""
    1268          expected = chunked_expected
    1269          sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
    1270          resp = client.HTTPResponse(sock, method="GET")
    1271          resp.begin()
    1272          self.assertEqual(resp.read(), expected)
    1273          resp.close()
    1274  
    1275      def test_chunked_trailers(self):
    1276          """See that trailers are read and ignored"""
    1277          expected = chunked_expected
    1278          sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
    1279          resp = client.HTTPResponse(sock, method="GET")
    1280          resp.begin()
    1281          self.assertEqual(resp.read(), expected)
    1282          # we should have reached the end of the file
    1283          self.assertEqual(sock.file.read(), b"") #we read to the end
    1284          resp.close()
    1285  
    1286      def test_chunked_sync(self):
    1287          """Check that we don't read past the end of the chunked-encoding stream"""
    1288          expected = chunked_expected
    1289          extradata = "extradata"
    1290          sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
    1291          resp = client.HTTPResponse(sock, method="GET")
    1292          resp.begin()
    1293          self.assertEqual(resp.read(), expected)
    1294          # the file should now have our extradata ready to be read
    1295          self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
    1296          resp.close()
    1297  
    1298      def test_content_length_sync(self):
    1299          """Check that we don't read past the end of the Content-Length stream"""
    1300          extradata = b"extradata"
    1301          expected = b"Hello123\r\n"
    1302          sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
    1303          resp = client.HTTPResponse(sock, method="GET")
    1304          resp.begin()
    1305          self.assertEqual(resp.read(), expected)
    1306          # the file should now have our extradata ready to be read
    1307          self.assertEqual(sock.file.read(), extradata) #we read to the end
    1308          resp.close()
    1309  
    1310      def test_readlines_content_length(self):
    1311          extradata = b"extradata"
    1312          expected = b"Hello123\r\n"
    1313          sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
    1314          resp = client.HTTPResponse(sock, method="GET")
    1315          resp.begin()
    1316          self.assertEqual(resp.readlines(2000), [expected])
    1317          # the file should now have our extradata ready to be read
    1318          self.assertEqual(sock.file.read(), extradata) #we read to the end
    1319          resp.close()
    1320  
    1321      def test_read1_content_length(self):
    1322          extradata = b"extradata"
    1323          expected = b"Hello123\r\n"
    1324          sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
    1325          resp = client.HTTPResponse(sock, method="GET")
    1326          resp.begin()
    1327          self.assertEqual(resp.read1(2000), expected)
    1328          # the file should now have our extradata ready to be read
    1329          self.assertEqual(sock.file.read(), extradata) #we read to the end
    1330          resp.close()
    1331  
    1332      def test_readline_bound_content_length(self):
    1333          extradata = b"extradata"
    1334          expected = b"Hello123\r\n"
    1335          sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
    1336          resp = client.HTTPResponse(sock, method="GET")
    1337          resp.begin()
    1338          self.assertEqual(resp.readline(10), expected)
    1339          self.assertEqual(resp.readline(10), b"")
    1340          # the file should now have our extradata ready to be read
    1341          self.assertEqual(sock.file.read(), extradata) #we read to the end
    1342          resp.close()
    1343  
    1344      def test_read1_bound_content_length(self):
    1345          extradata = b"extradata"
    1346          expected = b"Hello123\r\n"
    1347          sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
    1348          resp = client.HTTPResponse(sock, method="GET")
    1349          resp.begin()
    1350          self.assertEqual(resp.read1(20), expected*2)
    1351          self.assertEqual(resp.read(), expected)
    1352          # the file should now have our extradata ready to be read
    1353          self.assertEqual(sock.file.read(), extradata) #we read to the end
    1354          resp.close()
    1355  
    1356      def test_response_fileno(self):
    1357          # Make sure fd returned by fileno is valid.
    1358          serv = socket.create_server((HOST, 0))
    1359          self.addCleanup(serv.close)
    1360  
    1361          result = None
    1362          def run_server():
    1363              [conn, address] = serv.accept()
    1364              with conn, conn.makefile("rb") as reader:
    1365                  # Read the request header until a blank line
    1366                  while True:
    1367                      line = reader.readline()
    1368                      if not line.rstrip(b"\r\n"):
    1369                          break
    1370                  conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
    1371                  nonlocal result
    1372                  result = reader.read()
    1373  
    1374          thread = threading.Thread(target=run_server)
    1375          thread.start()
    1376          self.addCleanup(thread.join, float(1))
    1377          conn = client.HTTPConnection(*serv.getsockname())
    1378          conn.request("CONNECT", "dummy:1234")
    1379          response = conn.getresponse()
    1380          try:
    1381              self.assertEqual(response.status, client.OK)
    1382              s = socket.socket(fileno=response.fileno())
    1383              try:
    1384                  s.sendall(b"proxied data\n")
    1385              finally:
    1386                  s.detach()
    1387          finally:
    1388              response.close()
    1389              conn.close()
    1390          thread.join()
    1391          self.assertEqual(result, b"proxied data\n")
    1392  
    1393      def test_putrequest_override_domain_validation(self):
    1394          """
    1395          It should be possible to override the default validation
    1396          behavior in putrequest (bpo-38216).
    1397          """
    1398          class ESC[4;38;5;81mUnsafeHTTPConnection(ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPConnection):
    1399              def _validate_path(self, url):
    1400                  pass
    1401  
    1402          conn = UnsafeHTTPConnection('example.com')
    1403          conn.sock = FakeSocket('')
    1404          conn.putrequest('GET', '/\x00')
    1405  
    1406      def test_putrequest_override_host_validation(self):
    1407          class ESC[4;38;5;81mUnsafeHTTPConnection(ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPConnection):
    1408              def _validate_host(self, url):
    1409                  pass
    1410  
    1411          conn = UnsafeHTTPConnection('example.com\r\n')
    1412          conn.sock = FakeSocket('')
    1413          # set skip_host so a ValueError is not raised upon adding the
    1414          # invalid URL as the value of the "Host:" header
    1415          conn.putrequest('GET', '/', skip_host=1)
    1416  
    1417      def test_putrequest_override_encoding(self):
    1418          """
    1419          It should be possible to override the default encoding
    1420          to transmit bytes in another encoding even if invalid
    1421          (bpo-36274).
    1422          """
    1423          class ESC[4;38;5;81mUnsafeHTTPConnection(ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPConnection):
    1424              def _encode_request(self, str_url):
    1425                  return str_url.encode('utf-8')
    1426  
    1427          conn = UnsafeHTTPConnection('example.com')
    1428          conn.sock = FakeSocket('')
    1429          conn.putrequest('GET', '/☃')
    1430  
    1431  
    1432  class ESC[4;38;5;81mExtendedReadTest(ESC[4;38;5;149mTestCase):
    1433      """
    1434      Test peek(), read1(), readline()
    1435      """
    1436      lines = (
    1437          'HTTP/1.1 200 OK\r\n'
    1438          '\r\n'
    1439          'hello world!\n'
    1440          'and now \n'
    1441          'for something completely different\n'
    1442          'foo'
    1443          )
    1444      lines_expected = lines[lines.find('hello'):].encode("ascii")
    1445      lines_chunked = (
    1446          'HTTP/1.1 200 OK\r\n'
    1447          'Transfer-Encoding: chunked\r\n\r\n'
    1448          'a\r\n'
    1449          'hello worl\r\n'
    1450          '3\r\n'
    1451          'd!\n\r\n'
    1452          '9\r\n'
    1453          'and now \n\r\n'
    1454          '23\r\n'
    1455          'for something completely different\n\r\n'
    1456          '3\r\n'
    1457          'foo\r\n'
    1458          '0\r\n' # terminating chunk
    1459          '\r\n'  # end of trailers
    1460      )
    1461  
    1462      def setUp(self):
    1463          sock = FakeSocket(self.lines)
    1464          resp = client.HTTPResponse(sock, method="GET")
    1465          resp.begin()
    1466          resp.fp = io.BufferedReader(resp.fp)
    1467          self.resp = resp
    1468  
    1469  
    1470  
    1471      def test_peek(self):
    1472          resp = self.resp
    1473          # patch up the buffered peek so that it returns not too much stuff
    1474          oldpeek = resp.fp.peek
    1475          def mypeek(n=-1):
    1476              p = oldpeek(n)
    1477              if n >= 0:
    1478                  return p[:n]
    1479              return p[:10]
    1480          resp.fp.peek = mypeek
    1481  
    1482          all = []
    1483          while True:
    1484              # try a short peek
    1485              p = resp.peek(3)
    1486              if p:
    1487                  self.assertGreater(len(p), 0)
    1488                  # then unbounded peek
    1489                  p2 = resp.peek()
    1490                  self.assertGreaterEqual(len(p2), len(p))
    1491                  self.assertTrue(p2.startswith(p))
    1492                  next = resp.read(len(p2))
    1493                  self.assertEqual(next, p2)
    1494              else:
    1495                  next = resp.read()
    1496                  self.assertFalse(next)
    1497              all.append(next)
    1498              if not next:
    1499                  break
    1500          self.assertEqual(b"".join(all), self.lines_expected)
    1501  
    1502      def test_readline(self):
    1503          resp = self.resp
    1504          self._verify_readline(self.resp.readline, self.lines_expected)
    1505  
    1506      def _verify_readline(self, readline, expected):
    1507          all = []
    1508          while True:
    1509              # short readlines
    1510              line = readline(5)
    1511              if line and line != b"foo":
    1512                  if len(line) < 5:
    1513                      self.assertTrue(line.endswith(b"\n"))
    1514              all.append(line)
    1515              if not line:
    1516                  break
    1517          self.assertEqual(b"".join(all), expected)
    1518  
    1519      def test_read1(self):
    1520          resp = self.resp
    1521          def r():
    1522              res = resp.read1(4)
    1523              self.assertLessEqual(len(res), 4)
    1524              return res
    1525          readliner = Readliner(r)
    1526          self._verify_readline(readliner.readline, self.lines_expected)
    1527  
    1528      def test_read1_unbounded(self):
    1529          resp = self.resp
    1530          all = []
    1531          while True:
    1532              data = resp.read1()
    1533              if not data:
    1534                  break
    1535              all.append(data)
    1536          self.assertEqual(b"".join(all), self.lines_expected)
    1537  
    1538      def test_read1_bounded(self):
    1539          resp = self.resp
    1540          all = []
    1541          while True:
    1542              data = resp.read1(10)
    1543              if not data:
    1544                  break
    1545              self.assertLessEqual(len(data), 10)
    1546              all.append(data)
    1547          self.assertEqual(b"".join(all), self.lines_expected)
    1548  
    1549      def test_read1_0(self):
    1550          self.assertEqual(self.resp.read1(0), b"")
    1551  
    1552      def test_peek_0(self):
    1553          p = self.resp.peek(0)
    1554          self.assertLessEqual(0, len(p))
    1555  
    1556  
    1557  class ESC[4;38;5;81mExtendedReadTestChunked(ESC[4;38;5;149mExtendedReadTest):
    1558      """
    1559      Test peek(), read1(), readline() in chunked mode
    1560      """
    1561      lines = (
    1562          'HTTP/1.1 200 OK\r\n'
    1563          'Transfer-Encoding: chunked\r\n\r\n'
    1564          'a\r\n'
    1565          'hello worl\r\n'
    1566          '3\r\n'
    1567          'd!\n\r\n'
    1568          '9\r\n'
    1569          'and now \n\r\n'
    1570          '23\r\n'
    1571          'for something completely different\n\r\n'
    1572          '3\r\n'
    1573          'foo\r\n'
    1574          '0\r\n' # terminating chunk
    1575          '\r\n'  # end of trailers
    1576      )
    1577  
    1578  
    1579  class ESC[4;38;5;81mReadliner:
    1580      """
    1581      a simple readline class that uses an arbitrary read function and buffering
    1582      """
    1583      def __init__(self, readfunc):
    1584          self.readfunc = readfunc
    1585          self.remainder = b""
    1586  
    1587      def readline(self, limit):
    1588          data = []
    1589          datalen = 0
    1590          read = self.remainder
    1591          try:
    1592              while True:
    1593                  idx = read.find(b'\n')
    1594                  if idx != -1:
    1595                      break
    1596                  if datalen + len(read) >= limit:
    1597                      idx = limit - datalen - 1
    1598                  # read more data
    1599                  data.append(read)
    1600                  read = self.readfunc()
    1601                  if not read:
    1602                      idx = 0 #eof condition
    1603                      break
    1604              idx += 1
    1605              data.append(read[:idx])
    1606              self.remainder = read[idx:]
    1607              return b"".join(data)
    1608          except:
    1609              self.remainder = b"".join(data)
    1610              raise
    1611  
    1612  
    1613  class ESC[4;38;5;81mOfflineTest(ESC[4;38;5;149mTestCase):
    1614      def test_all(self):
    1615          # Documented objects defined in the module should be in __all__
    1616          expected = {"responses"}  # Allowlist documented dict() object
    1617          # HTTPMessage, parse_headers(), and the HTTP status code constants are
    1618          # intentionally omitted for simplicity
    1619          denylist = {"HTTPMessage", "parse_headers"}
    1620          for name in dir(client):
    1621              if name.startswith("_") or name in denylist:
    1622                  continue
    1623              module_object = getattr(client, name)
    1624              if getattr(module_object, "__module__", None) == "http.client":
    1625                  expected.add(name)
    1626          self.assertCountEqual(client.__all__, expected)
    1627  
    1628      def test_responses(self):
    1629          self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
    1630  
    1631      def test_client_constants(self):
    1632          # Make sure we don't break backward compatibility with 3.4
    1633          expected = [
    1634              'CONTINUE',
    1635              'SWITCHING_PROTOCOLS',
    1636              'PROCESSING',
    1637              'OK',
    1638              'CREATED',
    1639              'ACCEPTED',
    1640              'NON_AUTHORITATIVE_INFORMATION',
    1641              'NO_CONTENT',
    1642              'RESET_CONTENT',
    1643              'PARTIAL_CONTENT',
    1644              'MULTI_STATUS',
    1645              'IM_USED',
    1646              'MULTIPLE_CHOICES',
    1647              'MOVED_PERMANENTLY',
    1648              'FOUND',
    1649              'SEE_OTHER',
    1650              'NOT_MODIFIED',
    1651              'USE_PROXY',
    1652              'TEMPORARY_REDIRECT',
    1653              'BAD_REQUEST',
    1654              'UNAUTHORIZED',
    1655              'PAYMENT_REQUIRED',
    1656              'FORBIDDEN',
    1657              'NOT_FOUND',
    1658              'METHOD_NOT_ALLOWED',
    1659              'NOT_ACCEPTABLE',
    1660              'PROXY_AUTHENTICATION_REQUIRED',
    1661              'REQUEST_TIMEOUT',
    1662              'CONFLICT',
    1663              'GONE',
    1664              'LENGTH_REQUIRED',
    1665              'PRECONDITION_FAILED',
    1666              'REQUEST_ENTITY_TOO_LARGE',
    1667              'REQUEST_URI_TOO_LONG',
    1668              'UNSUPPORTED_MEDIA_TYPE',
    1669              'REQUESTED_RANGE_NOT_SATISFIABLE',
    1670              'EXPECTATION_FAILED',
    1671              'IM_A_TEAPOT',
    1672              'MISDIRECTED_REQUEST',
    1673              'UNPROCESSABLE_ENTITY',
    1674              'LOCKED',
    1675              'FAILED_DEPENDENCY',
    1676              'UPGRADE_REQUIRED',
    1677              'PRECONDITION_REQUIRED',
    1678              'TOO_MANY_REQUESTS',
    1679              'REQUEST_HEADER_FIELDS_TOO_LARGE',
    1680              'UNAVAILABLE_FOR_LEGAL_REASONS',
    1681              'INTERNAL_SERVER_ERROR',
    1682              'NOT_IMPLEMENTED',
    1683              'BAD_GATEWAY',
    1684              'SERVICE_UNAVAILABLE',
    1685              'GATEWAY_TIMEOUT',
    1686              'HTTP_VERSION_NOT_SUPPORTED',
    1687              'INSUFFICIENT_STORAGE',
    1688              'NOT_EXTENDED',
    1689              'NETWORK_AUTHENTICATION_REQUIRED',
    1690              'EARLY_HINTS',
    1691              'TOO_EARLY'
    1692          ]
    1693          for const in expected:
    1694              with self.subTest(constant=const):
    1695                  self.assertTrue(hasattr(client, const))
    1696  
    1697  
    1698  class ESC[4;38;5;81mSourceAddressTest(ESC[4;38;5;149mTestCase):
    1699      def setUp(self):
    1700          self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    1701          self.port = socket_helper.bind_port(self.serv)
    1702          self.source_port = socket_helper.find_unused_port()
    1703          self.serv.listen()
    1704          self.conn = None
    1705  
    1706      def tearDown(self):
    1707          if self.conn:
    1708              self.conn.close()
    1709              self.conn = None
    1710          self.serv.close()
    1711          self.serv = None
    1712  
    1713      def testHTTPConnectionSourceAddress(self):
    1714          self.conn = client.HTTPConnection(HOST, self.port,
    1715                  source_address=('', self.source_port))
    1716          self.conn.connect()
    1717          self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
    1718  
    1719      @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
    1720                       'http.client.HTTPSConnection not defined')
    1721      def testHTTPSConnectionSourceAddress(self):
    1722          self.conn = client.HTTPSConnection(HOST, self.port,
    1723                  source_address=('', self.source_port))
    1724          # We don't test anything here other than the constructor not barfing as
    1725          # this code doesn't deal with setting up an active running SSL server
    1726          # for an ssl_wrapped connect() to actually return from.
    1727  
    1728  
    1729  class ESC[4;38;5;81mTimeoutTest(ESC[4;38;5;149mTestCase):
    1730      PORT = None
    1731  
    1732      def setUp(self):
    1733          self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    1734          TimeoutTest.PORT = socket_helper.bind_port(self.serv)
    1735          self.serv.listen()
    1736  
    1737      def tearDown(self):
    1738          self.serv.close()
    1739          self.serv = None
    1740  
    1741      def testTimeoutAttribute(self):
    1742          # This will prove that the timeout gets through HTTPConnection
    1743          # and into the socket.
    1744  
    1745          # default -- use global socket timeout
    1746          self.assertIsNone(socket.getdefaulttimeout())
    1747          socket.setdefaulttimeout(30)
    1748          try:
    1749              httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
    1750              httpConn.connect()
    1751          finally:
    1752              socket.setdefaulttimeout(None)
    1753          self.assertEqual(httpConn.sock.gettimeout(), 30)
    1754          httpConn.close()
    1755  
    1756          # no timeout -- do not use global socket default
    1757          self.assertIsNone(socket.getdefaulttimeout())
    1758          socket.setdefaulttimeout(30)
    1759          try:
    1760              httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
    1761                                                timeout=None)
    1762              httpConn.connect()
    1763          finally:
    1764              socket.setdefaulttimeout(None)
    1765          self.assertEqual(httpConn.sock.gettimeout(), None)
    1766          httpConn.close()
    1767  
    1768          # a value
    1769          httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
    1770          httpConn.connect()
    1771          self.assertEqual(httpConn.sock.gettimeout(), 30)
    1772          httpConn.close()
    1773  
    1774  
    1775  class ESC[4;38;5;81mPersistenceTest(ESC[4;38;5;149mTestCase):
    1776  
    1777      def test_reuse_reconnect(self):
    1778          # Should reuse or reconnect depending on header from server
    1779          tests = (
    1780              ('1.0', '', False),
    1781              ('1.0', 'Connection: keep-alive\r\n', True),
    1782              ('1.1', '', True),
    1783              ('1.1', 'Connection: close\r\n', False),
    1784              ('1.0', 'Connection: keep-ALIVE\r\n', True),
    1785              ('1.1', 'Connection: cloSE\r\n', False),
    1786          )
    1787          for version, header, reuse in tests:
    1788              with self.subTest(version=version, header=header):
    1789                  msg = (
    1790                      'HTTP/{} 200 OK\r\n'
    1791                      '{}'
    1792                      'Content-Length: 12\r\n'
    1793                      '\r\n'
    1794                      'Dummy body\r\n'
    1795                  ).format(version, header)
    1796                  conn = FakeSocketHTTPConnection(msg)
    1797                  self.assertIsNone(conn.sock)
    1798                  conn.request('GET', '/open-connection')
    1799                  with conn.getresponse() as response:
    1800                      self.assertEqual(conn.sock is None, not reuse)
    1801                      response.read()
    1802                  self.assertEqual(conn.sock is None, not reuse)
    1803                  self.assertEqual(conn.connections, 1)
    1804                  conn.request('GET', '/subsequent-request')
    1805                  self.assertEqual(conn.connections, 1 if reuse else 2)
    1806  
    1807      def test_disconnected(self):
    1808  
    1809          def make_reset_reader(text):
    1810              """Return BufferedReader that raises ECONNRESET at EOF"""
    1811              stream = io.BytesIO(text)
    1812              def readinto(buffer):
    1813                  size = io.BytesIO.readinto(stream, buffer)
    1814                  if size == 0:
    1815                      raise ConnectionResetError()
    1816                  return size
    1817              stream.readinto = readinto
    1818              return io.BufferedReader(stream)
    1819  
    1820          tests = (
    1821              (io.BytesIO, client.RemoteDisconnected),
    1822              (make_reset_reader, ConnectionResetError),
    1823          )
    1824          for stream_factory, exception in tests:
    1825              with self.subTest(exception=exception):
    1826                  conn = FakeSocketHTTPConnection(b'', stream_factory)
    1827                  conn.request('GET', '/eof-response')
    1828                  self.assertRaises(exception, conn.getresponse)
    1829                  self.assertIsNone(conn.sock)
    1830                  # HTTPConnection.connect() should be automatically invoked
    1831                  conn.request('GET', '/reconnect')
    1832                  self.assertEqual(conn.connections, 2)
    1833  
    1834      def test_100_close(self):
    1835          conn = FakeSocketHTTPConnection(
    1836              b'HTTP/1.1 100 Continue\r\n'
    1837              b'\r\n'
    1838              # Missing final response
    1839          )
    1840          conn.request('GET', '/', headers={'Expect': '100-continue'})
    1841          self.assertRaises(client.RemoteDisconnected, conn.getresponse)
    1842          self.assertIsNone(conn.sock)
    1843          conn.request('GET', '/reconnect')
    1844          self.assertEqual(conn.connections, 2)
    1845  
    1846  
    1847  class ESC[4;38;5;81mHTTPSTest(ESC[4;38;5;149mTestCase):
    1848  
    1849      def setUp(self):
    1850          if not hasattr(client, 'HTTPSConnection'):
    1851              self.skipTest('ssl support required')
    1852  
    1853      def make_server(self, certfile):
    1854          from test.ssl_servers import make_https_server
    1855          return make_https_server(self, certfile=certfile)
    1856  
    1857      def test_attributes(self):
    1858          # simple test to check it's storing the timeout
    1859          h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
    1860          self.assertEqual(h.timeout, 30)
    1861  
    1862      def test_networked(self):
    1863          # Default settings: requires a valid cert from a trusted CA
    1864          import ssl
    1865          support.requires('network')
    1866          with socket_helper.transient_internet('self-signed.pythontest.net'):
    1867              h = client.HTTPSConnection('self-signed.pythontest.net', 443)
    1868              with self.assertRaises(ssl.SSLError) as exc_info:
    1869                  h.request('GET', '/')
    1870              self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
    1871  
    1872      def test_networked_noverification(self):
    1873          # Switch off cert verification
    1874          import ssl
    1875          support.requires('network')
    1876          with socket_helper.transient_internet('self-signed.pythontest.net'):
    1877              context = ssl._create_unverified_context()
    1878              h = client.HTTPSConnection('self-signed.pythontest.net', 443,
    1879                                         context=context)
    1880              h.request('GET', '/')
    1881              resp = h.getresponse()
    1882              h.close()
    1883              self.assertIn('nginx', resp.getheader('server'))
    1884              resp.close()
    1885  
    1886      @support.system_must_validate_cert
    1887      def test_networked_trusted_by_default_cert(self):
    1888          # Default settings: requires a valid cert from a trusted CA
    1889          support.requires('network')
    1890          with socket_helper.transient_internet('www.python.org'):
    1891              h = client.HTTPSConnection('www.python.org', 443)
    1892              h.request('GET', '/')
    1893              resp = h.getresponse()
    1894              content_type = resp.getheader('content-type')
    1895              resp.close()
    1896              h.close()
    1897              self.assertIn('text/html', content_type)
    1898  
    1899      def test_networked_good_cert(self):
    1900          # We feed the server's cert as a validating cert
    1901          import ssl
    1902          support.requires('network')
    1903          selfsigned_pythontestdotnet = 'self-signed.pythontest.net'
    1904          with socket_helper.transient_internet(selfsigned_pythontestdotnet):
    1905              context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1906              self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    1907              self.assertEqual(context.check_hostname, True)
    1908              context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
    1909              try:
    1910                  h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443,
    1911                                             context=context)
    1912                  h.request('GET', '/')
    1913                  resp = h.getresponse()
    1914              except ssl.SSLError as ssl_err:
    1915                  ssl_err_str = str(ssl_err)
    1916                  # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on
    1917                  # modern Linux distros (Debian Buster, etc) default OpenSSL
    1918                  # configurations it'll fail saying "key too weak" until we
    1919                  # address https://bugs.python.org/issue36816 to use a proper
    1920                  # key size on self-signed.pythontest.net.
    1921                  if re.search(r'(?i)key.too.weak', ssl_err_str):
    1922                      raise unittest.SkipTest(
    1923                          f'Got {ssl_err_str} trying to connect '
    1924                          f'to {selfsigned_pythontestdotnet}. '
    1925                          'See https://bugs.python.org/issue36816.')
    1926                  raise
    1927              server_string = resp.getheader('server')
    1928              resp.close()
    1929              h.close()
    1930              self.assertIn('nginx', server_string)
    1931  
    1932      @support.requires_resource('walltime')
    1933      def test_networked_bad_cert(self):
    1934          # We feed a "CA" cert that is unrelated to the server's cert
    1935          import ssl
    1936          support.requires('network')
    1937          with socket_helper.transient_internet('self-signed.pythontest.net'):
    1938              context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1939              context.load_verify_locations(CERT_localhost)
    1940              h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
    1941              with self.assertRaises(ssl.SSLError) as exc_info:
    1942                  h.request('GET', '/')
    1943              self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
    1944  
    1945      def test_local_unknown_cert(self):
    1946          # The custom cert isn't known to the default trust bundle
    1947          import ssl
    1948          server = self.make_server(CERT_localhost)
    1949          h = client.HTTPSConnection('localhost', server.port)
    1950          with self.assertRaises(ssl.SSLError) as exc_info:
    1951              h.request('GET', '/')
    1952          self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
    1953  
    1954      def test_local_good_hostname(self):
    1955          # The (valid) cert validates the HTTP hostname
    1956          import ssl
    1957          server = self.make_server(CERT_localhost)
    1958          context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1959          context.load_verify_locations(CERT_localhost)
    1960          h = client.HTTPSConnection('localhost', server.port, context=context)
    1961          self.addCleanup(h.close)
    1962          h.request('GET', '/nonexistent')
    1963          resp = h.getresponse()
    1964          self.addCleanup(resp.close)
    1965          self.assertEqual(resp.status, 404)
    1966  
    1967      def test_local_bad_hostname(self):
    1968          # The (valid) cert doesn't validate the HTTP hostname
    1969          import ssl
    1970          server = self.make_server(CERT_fakehostname)
    1971          context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1972          context.load_verify_locations(CERT_fakehostname)
    1973          h = client.HTTPSConnection('localhost', server.port, context=context)
    1974          with self.assertRaises(ssl.CertificateError):
    1975              h.request('GET', '/')
    1976          # Same with explicit check_hostname=True
    1977          with warnings_helper.check_warnings(('', DeprecationWarning)):
    1978              h = client.HTTPSConnection('localhost', server.port,
    1979                                         context=context, check_hostname=True)
    1980          with self.assertRaises(ssl.CertificateError):
    1981              h.request('GET', '/')
    1982          # With check_hostname=False, the mismatching is ignored
    1983          context.check_hostname = False
    1984          with warnings_helper.check_warnings(('', DeprecationWarning)):
    1985              h = client.HTTPSConnection('localhost', server.port,
    1986                                         context=context, check_hostname=False)
    1987          h.request('GET', '/nonexistent')
    1988          resp = h.getresponse()
    1989          resp.close()
    1990          h.close()
    1991          self.assertEqual(resp.status, 404)
    1992          # The context's check_hostname setting is used if one isn't passed to
    1993          # HTTPSConnection.
    1994          context.check_hostname = False
    1995          h = client.HTTPSConnection('localhost', server.port, context=context)
    1996          h.request('GET', '/nonexistent')
    1997          resp = h.getresponse()
    1998          self.assertEqual(resp.status, 404)
    1999          resp.close()
    2000          h.close()
    2001          # Passing check_hostname to HTTPSConnection should override the
    2002          # context's setting.
    2003          with warnings_helper.check_warnings(('', DeprecationWarning)):
    2004              h = client.HTTPSConnection('localhost', server.port,
    2005                                         context=context, check_hostname=True)
    2006          with self.assertRaises(ssl.CertificateError):
    2007              h.request('GET', '/')
    2008  
    2009      @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
    2010                       'http.client.HTTPSConnection not available')
    2011      def test_host_port(self):
    2012          # Check invalid host_port
    2013  
    2014          for hp in ("www.python.org:abc", "user:password@www.python.org"):
    2015              self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp)
    2016  
    2017          for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
    2018                            "fe80::207:e9ff:fe9b", 8000),
    2019                           ("www.python.org:443", "www.python.org", 443),
    2020                           ("www.python.org:", "www.python.org", 443),
    2021                           ("www.python.org", "www.python.org", 443),
    2022                           ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
    2023                           ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
    2024                               443)):
    2025              c = client.HTTPSConnection(hp)
    2026              self.assertEqual(h, c.host)
    2027              self.assertEqual(p, c.port)
    2028  
    2029      def test_tls13_pha(self):
    2030          import ssl
    2031          if not ssl.HAS_TLSv1_3:
    2032              self.skipTest('TLS 1.3 support required')
    2033          # just check status of PHA flag
    2034          h = client.HTTPSConnection('localhost', 443)
    2035          self.assertTrue(h._context.post_handshake_auth)
    2036  
    2037          context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    2038          self.assertFalse(context.post_handshake_auth)
    2039          h = client.HTTPSConnection('localhost', 443, context=context)
    2040          self.assertIs(h._context, context)
    2041          self.assertFalse(h._context.post_handshake_auth)
    2042  
    2043          with warnings.catch_warnings():
    2044              warnings.filterwarnings('ignore', 'key_file, cert_file and check_hostname are deprecated',
    2045                                      DeprecationWarning)
    2046              h = client.HTTPSConnection('localhost', 443, context=context,
    2047                                         cert_file=CERT_localhost)
    2048          self.assertTrue(h._context.post_handshake_auth)
    2049  
    2050  
    2051  class ESC[4;38;5;81mRequestBodyTest(ESC[4;38;5;149mTestCase):
    2052      """Test cases where a request includes a message body."""
    2053  
    2054      def setUp(self):
    2055          self.conn = client.HTTPConnection('example.com')
    2056          self.conn.sock = self.sock = FakeSocket("")
    2057          self.conn.sock = self.sock
    2058  
    2059      def get_headers_and_fp(self):
    2060          f = io.BytesIO(self.sock.data)
    2061          f.readline()  # read the request line
    2062          message = client.parse_headers(f)
    2063          return message, f
    2064  
    2065      def test_list_body(self):
    2066          # Note that no content-length is automatically calculated for
    2067          # an iterable.  The request will fall back to send chunked
    2068          # transfer encoding.
    2069          cases = (
    2070              ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
    2071              ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
    2072          )
    2073          for body, expected in cases:
    2074              with self.subTest(body):
    2075                  self.conn = client.HTTPConnection('example.com')
    2076                  self.conn.sock = self.sock = FakeSocket('')
    2077  
    2078                  self.conn.request('PUT', '/url', body)
    2079                  msg, f = self.get_headers_and_fp()
    2080                  self.assertNotIn('Content-Type', msg)
    2081                  self.assertNotIn('Content-Length', msg)
    2082                  self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
    2083                  self.assertEqual(expected, f.read())
    2084  
    2085      def test_manual_content_length(self):
    2086          # Set an incorrect content-length so that we can verify that
    2087          # it will not be over-ridden by the library.
    2088          self.conn.request("PUT", "/url", "body",
    2089                            {"Content-Length": "42"})
    2090          message, f = self.get_headers_and_fp()
    2091          self.assertEqual("42", message.get("content-length"))
    2092          self.assertEqual(4, len(f.read()))
    2093  
    2094      def test_ascii_body(self):
    2095          self.conn.request("PUT", "/url", "body")
    2096          message, f = self.get_headers_and_fp()
    2097          self.assertEqual("text/plain", message.get_content_type())
    2098          self.assertIsNone(message.get_charset())
    2099          self.assertEqual("4", message.get("content-length"))
    2100          self.assertEqual(b'body', f.read())
    2101  
    2102      def test_latin1_body(self):
    2103          self.conn.request("PUT", "/url", "body\xc1")
    2104          message, f = self.get_headers_and_fp()
    2105          self.assertEqual("text/plain", message.get_content_type())
    2106          self.assertIsNone(message.get_charset())
    2107          self.assertEqual("5", message.get("content-length"))
    2108          self.assertEqual(b'body\xc1', f.read())
    2109  
    2110      def test_bytes_body(self):
    2111          self.conn.request("PUT", "/url", b"body\xc1")
    2112          message, f = self.get_headers_and_fp()
    2113          self.assertEqual("text/plain", message.get_content_type())
    2114          self.assertIsNone(message.get_charset())
    2115          self.assertEqual("5", message.get("content-length"))
    2116          self.assertEqual(b'body\xc1', f.read())
    2117  
    2118      def test_text_file_body(self):
    2119          self.addCleanup(os_helper.unlink, os_helper.TESTFN)
    2120          with open(os_helper.TESTFN, "w", encoding="utf-8") as f:
    2121              f.write("body")
    2122          with open(os_helper.TESTFN, encoding="utf-8") as f:
    2123              self.conn.request("PUT", "/url", f)
    2124              message, f = self.get_headers_and_fp()
    2125              self.assertEqual("text/plain", message.get_content_type())
    2126              self.assertIsNone(message.get_charset())
    2127              # No content-length will be determined for files; the body
    2128              # will be sent using chunked transfer encoding instead.
    2129              self.assertIsNone(message.get("content-length"))
    2130              self.assertEqual("chunked", message.get("transfer-encoding"))
    2131              self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
    2132  
    2133      def test_binary_file_body(self):
    2134          self.addCleanup(os_helper.unlink, os_helper.TESTFN)
    2135          with open(os_helper.TESTFN, "wb") as f:
    2136              f.write(b"body\xc1")
    2137          with open(os_helper.TESTFN, "rb") as f:
    2138              self.conn.request("PUT", "/url", f)
    2139              message, f = self.get_headers_and_fp()
    2140              self.assertEqual("text/plain", message.get_content_type())
    2141              self.assertIsNone(message.get_charset())
    2142              self.assertEqual("chunked", message.get("Transfer-Encoding"))
    2143              self.assertNotIn("Content-Length", message)
    2144              self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read())
    2145  
    2146  
    2147  class ESC[4;38;5;81mHTTPResponseTest(ESC[4;38;5;149mTestCase):
    2148  
    2149      def setUp(self):
    2150          body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \
    2151                  second-value\r\n\r\nText"
    2152          sock = FakeSocket(body)
    2153          self.resp = client.HTTPResponse(sock)
    2154          self.resp.begin()
    2155  
    2156      def test_getting_header(self):
    2157          header = self.resp.getheader('My-Header')
    2158          self.assertEqual(header, 'first-value, second-value')
    2159  
    2160          header = self.resp.getheader('My-Header', 'some default')
    2161          self.assertEqual(header, 'first-value, second-value')
    2162  
    2163      def test_getting_nonexistent_header_with_string_default(self):
    2164          header = self.resp.getheader('No-Such-Header', 'default-value')
    2165          self.assertEqual(header, 'default-value')
    2166  
    2167      def test_getting_nonexistent_header_with_iterable_default(self):
    2168          header = self.resp.getheader('No-Such-Header', ['default', 'values'])
    2169          self.assertEqual(header, 'default, values')
    2170  
    2171          header = self.resp.getheader('No-Such-Header', ('default', 'values'))
    2172          self.assertEqual(header, 'default, values')
    2173  
    2174      def test_getting_nonexistent_header_without_default(self):
    2175          header = self.resp.getheader('No-Such-Header')
    2176          self.assertEqual(header, None)
    2177  
    2178      def test_getting_header_defaultint(self):
    2179          header = self.resp.getheader('No-Such-Header',default=42)
    2180          self.assertEqual(header, 42)
    2181  
    2182  class ESC[4;38;5;81mTunnelTests(ESC[4;38;5;149mTestCase):
    2183      def setUp(self):
    2184          response_text = (
    2185              'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
    2186              'HTTP/1.1 200 OK\r\n' # Reply to HEAD
    2187              'Content-Length: 42\r\n\r\n'
    2188          )
    2189          self.host = 'proxy.com'
    2190          self.conn = client.HTTPConnection(self.host)
    2191          self.conn._create_connection = self._create_connection(response_text)
    2192  
    2193      def tearDown(self):
    2194          self.conn.close()
    2195  
    2196      def _create_connection(self, response_text):
    2197          def create_connection(address, timeout=None, source_address=None):
    2198              return FakeSocket(response_text, host=address[0], port=address[1])
    2199          return create_connection
    2200  
    2201      def test_set_tunnel_host_port_headers(self):
    2202          tunnel_host = 'destination.com'
    2203          tunnel_port = 8888
    2204          tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
    2205          self.conn.set_tunnel(tunnel_host, port=tunnel_port,
    2206                               headers=tunnel_headers)
    2207          self.conn.request('HEAD', '/', '')
    2208          self.assertEqual(self.conn.sock.host, self.host)
    2209          self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
    2210          self.assertEqual(self.conn._tunnel_host, tunnel_host)
    2211          self.assertEqual(self.conn._tunnel_port, tunnel_port)
    2212          self.assertEqual(self.conn._tunnel_headers, tunnel_headers)
    2213  
    2214      def test_disallow_set_tunnel_after_connect(self):
    2215          # Once connected, we shouldn't be able to tunnel anymore
    2216          self.conn.connect()
    2217          self.assertRaises(RuntimeError, self.conn.set_tunnel,
    2218                            'destination.com')
    2219  
    2220      def test_connect_with_tunnel(self):
    2221          self.conn.set_tunnel('destination.com')
    2222          self.conn.request('HEAD', '/', '')
    2223          self.assertEqual(self.conn.sock.host, self.host)
    2224          self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
    2225          self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
    2226          # issue22095
    2227          self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data)
    2228          self.assertIn(b'Host: destination.com', self.conn.sock.data)
    2229  
    2230          # This test should be removed when CONNECT gets the HTTP/1.1 blessing
    2231          self.assertNotIn(b'Host: proxy.com', self.conn.sock.data)
    2232  
    2233      def test_tunnel_connect_single_send_connection_setup(self):
    2234          """Regresstion test for https://bugs.python.org/issue43332."""
    2235          with mock.patch.object(self.conn, 'send') as mock_send:
    2236              self.conn.set_tunnel('destination.com')
    2237              self.conn.connect()
    2238              self.conn.request('GET', '/')
    2239          mock_send.assert_called()
    2240          # Likely 2, but this test only cares about the first.
    2241          self.assertGreater(
    2242                  len(mock_send.mock_calls), 1,
    2243                  msg=f'unexpected number of send calls: {mock_send.mock_calls}')
    2244          proxy_setup_data_sent = mock_send.mock_calls[0][1][0]
    2245          self.assertIn(b'CONNECT destination.com', proxy_setup_data_sent)
    2246          self.assertTrue(
    2247                  proxy_setup_data_sent.endswith(b'\r\n\r\n'),
    2248                  msg=f'unexpected proxy data sent {proxy_setup_data_sent!r}')
    2249  
    2250      def test_connect_put_request(self):
    2251          self.conn.set_tunnel('destination.com')
    2252          self.conn.request('PUT', '/', '')
    2253          self.assertEqual(self.conn.sock.host, self.host)
    2254          self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
    2255          self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
    2256          self.assertIn(b'Host: destination.com', self.conn.sock.data)
    2257  
    2258      def test_tunnel_debuglog(self):
    2259          expected_header = 'X-Dummy: 1'
    2260          response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header)
    2261  
    2262          self.conn.set_debuglevel(1)
    2263          self.conn._create_connection = self._create_connection(response_text)
    2264          self.conn.set_tunnel('destination.com')
    2265  
    2266          with support.captured_stdout() as output:
    2267              self.conn.request('PUT', '/', '')
    2268          lines = output.getvalue().splitlines()
    2269          self.assertIn('header: {}'.format(expected_header), lines)
    2270  
    2271      def test_tunnel_leak(self):
    2272          sock = None
    2273  
    2274          def _create_connection(address, timeout=None, source_address=None):
    2275              nonlocal sock
    2276              sock = FakeSocket(
    2277                  'HTTP/1.1 404 NOT FOUND\r\n\r\n',
    2278                  host=address[0],
    2279                  port=address[1],
    2280              )
    2281              return sock
    2282  
    2283          self.conn._create_connection = _create_connection
    2284          self.conn.set_tunnel('destination.com')
    2285          exc = None
    2286          try:
    2287              self.conn.request('HEAD', '/', '')
    2288          except OSError as e:
    2289              # keeping a reference to exc keeps response alive in the traceback
    2290              exc = e
    2291          self.assertIsNotNone(exc)
    2292          self.assertTrue(sock.file_closed)
    2293  
    2294  
    2295  if __name__ == '__main__':
    2296      unittest.main(verbosity=2)