(root)/
Python-3.12.0/
Lib/
test/
test_httpservers.py
       1  """Unittests for the various HTTPServer modules.
       2  
       3  Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
       4  Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
       5  """
       6  from collections import OrderedDict
       7  from http.server import BaseHTTPRequestHandler, HTTPServer, \
       8       SimpleHTTPRequestHandler, CGIHTTPRequestHandler
       9  from http import server, HTTPStatus
      10  
      11  import os
      12  import socket
      13  import sys
      14  import re
      15  import base64
      16  import ntpath
      17  import pathlib
      18  import shutil
      19  import email.message
      20  import email.utils
      21  import html
      22  import http, http.client
      23  import urllib.parse
      24  import tempfile
      25  import time
      26  import datetime
      27  import threading
      28  from unittest import mock
      29  import warnings
      30  from io import BytesIO, StringIO
      31  
      32  import unittest
      33  from test import support
      34  from test.support import os_helper
      35  from test.support import threading_helper
      36  
# NOTE(review): presumably makes the whole module skip when the platform has
# no usable socket support -- confirm against test.support.
support.requires_working_socket(module=True)
      38  
      39  class ESC[4;38;5;81mNoLogRequestHandler:
      40      def log_message(self, *args):
      41          # don't write log messages to stderr
      42          pass
      43  
      44      def read(self, n=None):
      45          return ''
      46  
      47  
      48  class ESC[4;38;5;81mTestServerThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
      49      def __init__(self, test_object, request_handler):
      50          threading.Thread.__init__(self)
      51          self.request_handler = request_handler
      52          self.test_object = test_object
      53  
      54      def run(self):
      55          self.server = HTTPServer(('localhost', 0), self.request_handler)
      56          self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname()
      57          self.test_object.server_started.set()
      58          self.test_object = None
      59          try:
      60              self.server.serve_forever(0.05)
      61          finally:
      62              self.server.server_close()
      63  
      64      def stop(self):
      65          self.server.shutdown()
      66          self.join()
      67  
      68  
      69  class ESC[4;38;5;81mBaseTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      70      def setUp(self):
      71          self._threads = threading_helper.threading_setup()
      72          os.environ = os_helper.EnvironmentVarGuard()
      73          self.server_started = threading.Event()
      74          self.thread = TestServerThread(self, self.request_handler)
      75          self.thread.start()
      76          self.server_started.wait()
      77  
      78      def tearDown(self):
      79          self.thread.stop()
      80          self.thread = None
      81          os.environ.__exit__()
      82          threading_helper.threading_cleanup(*self._threads)
      83  
      84      def request(self, uri, method='GET', body=None, headers={}):
      85          self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
      86          self.connection.request(method, uri, body, headers)
      87          return self.connection.getresponse()
      88  
      89  
      90  class ESC[4;38;5;81mBaseHTTPServerTestCase(ESC[4;38;5;149mBaseTestCase):
      91      class ESC[4;38;5;81mrequest_handler(ESC[4;38;5;149mNoLogRequestHandler, ESC[4;38;5;149mBaseHTTPRequestHandler):
      92          protocol_version = 'HTTP/1.1'
      93          default_request_version = 'HTTP/1.1'
      94  
      95          def do_TEST(self):
      96              self.send_response(HTTPStatus.NO_CONTENT)
      97              self.send_header('Content-Type', 'text/html')
      98              self.send_header('Connection', 'close')
      99              self.end_headers()
     100  
     101          def do_KEEP(self):
     102              self.send_response(HTTPStatus.NO_CONTENT)
     103              self.send_header('Content-Type', 'text/html')
     104              self.send_header('Connection', 'keep-alive')
     105              self.end_headers()
     106  
     107          def do_KEYERROR(self):
     108              self.send_error(999)
     109  
     110          def do_NOTFOUND(self):
     111              self.send_error(HTTPStatus.NOT_FOUND)
     112  
     113          def do_EXPLAINERROR(self):
     114              self.send_error(999, "Short Message",
     115                              "This is a long \n explanation")
     116  
     117          def do_CUSTOM(self):
     118              self.send_response(999)
     119              self.send_header('Content-Type', 'text/html')
     120              self.send_header('Connection', 'close')
     121              self.end_headers()
     122  
     123          def do_LATINONEHEADER(self):
     124              self.send_response(999)
     125              self.send_header('X-Special', 'Dängerous Mind')
     126              self.send_header('Connection', 'close')
     127              self.end_headers()
     128              body = self.headers['x-special-incoming'].encode('utf-8')
     129              self.wfile.write(body)
     130  
     131          def do_SEND_ERROR(self):
     132              self.send_error(int(self.path[1:]))
     133  
     134          def do_HEAD(self):
     135              self.send_error(int(self.path[1:]))
     136  
     137      def setUp(self):
     138          BaseTestCase.setUp(self)
     139          self.con = http.client.HTTPConnection(self.HOST, self.PORT)
     140          self.con.connect()
     141  
     142      def test_command(self):
     143          self.con.request('GET', '/')
     144          res = self.con.getresponse()
     145          self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
     146  
     147      def test_request_line_trimming(self):
     148          self.con._http_vsn_str = 'HTTP/1.1\n'
     149          self.con.putrequest('XYZBOGUS', '/')
     150          self.con.endheaders()
     151          res = self.con.getresponse()
     152          self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
     153  
     154      def test_version_bogus(self):
     155          self.con._http_vsn_str = 'FUBAR'
     156          self.con.putrequest('GET', '/')
     157          self.con.endheaders()
     158          res = self.con.getresponse()
     159          self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
     160  
     161      def test_version_digits(self):
     162          self.con._http_vsn_str = 'HTTP/9.9.9'
     163          self.con.putrequest('GET', '/')
     164          self.con.endheaders()
     165          res = self.con.getresponse()
     166          self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
     167  
     168      def test_version_signs_and_underscores(self):
     169          self.con._http_vsn_str = 'HTTP/-9_9_9.+9_9_9'
     170          self.con.putrequest('GET', '/')
     171          self.con.endheaders()
     172          res = self.con.getresponse()
     173          self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
     174  
     175      def test_major_version_number_too_long(self):
     176          self.con._http_vsn_str = 'HTTP/909876543210.0'
     177          self.con.putrequest('GET', '/')
     178          self.con.endheaders()
     179          res = self.con.getresponse()
     180          self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
     181  
     182      def test_minor_version_number_too_long(self):
     183          self.con._http_vsn_str = 'HTTP/1.909876543210'
     184          self.con.putrequest('GET', '/')
     185          self.con.endheaders()
     186          res = self.con.getresponse()
     187          self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
     188  
     189      def test_version_none_get(self):
     190          self.con._http_vsn_str = ''
     191          self.con.putrequest('GET', '/')
     192          self.con.endheaders()
     193          res = self.con.getresponse()
     194          self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
     195  
     196      def test_version_none(self):
     197          # Test that a valid method is rejected when not HTTP/1.x
     198          self.con._http_vsn_str = ''
     199          self.con.putrequest('CUSTOM', '/')
     200          self.con.endheaders()
     201          res = self.con.getresponse()
     202          self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
     203  
     204      def test_version_invalid(self):
     205          self.con._http_vsn = 99
     206          self.con._http_vsn_str = 'HTTP/9.9'
     207          self.con.putrequest('GET', '/')
     208          self.con.endheaders()
     209          res = self.con.getresponse()
     210          self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED)
     211  
     212      def test_send_blank(self):
     213          self.con._http_vsn_str = ''
     214          self.con.putrequest('', '')
     215          self.con.endheaders()
     216          res = self.con.getresponse()
     217          self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
     218  
     219      def test_header_close(self):
     220          self.con.putrequest('GET', '/')
     221          self.con.putheader('Connection', 'close')
     222          self.con.endheaders()
     223          res = self.con.getresponse()
     224          self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
     225  
     226      def test_header_keep_alive(self):
     227          self.con._http_vsn_str = 'HTTP/1.1'
     228          self.con.putrequest('GET', '/')
     229          self.con.putheader('Connection', 'keep-alive')
     230          self.con.endheaders()
     231          res = self.con.getresponse()
     232          self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
     233  
     234      def test_handler(self):
     235          self.con.request('TEST', '/')
     236          res = self.con.getresponse()
     237          self.assertEqual(res.status, HTTPStatus.NO_CONTENT)
     238  
     239      def test_return_header_keep_alive(self):
     240          self.con.request('KEEP', '/')
     241          res = self.con.getresponse()
     242          self.assertEqual(res.getheader('Connection'), 'keep-alive')
     243          self.con.request('TEST', '/')
     244          self.addCleanup(self.con.close)
     245  
     246      def test_internal_key_error(self):
     247          self.con.request('KEYERROR', '/')
     248          res = self.con.getresponse()
     249          self.assertEqual(res.status, 999)
     250  
     251      def test_return_custom_status(self):
     252          self.con.request('CUSTOM', '/')
     253          res = self.con.getresponse()
     254          self.assertEqual(res.status, 999)
     255  
     256      def test_return_explain_error(self):
     257          self.con.request('EXPLAINERROR', '/')
     258          res = self.con.getresponse()
     259          self.assertEqual(res.status, 999)
     260          self.assertTrue(int(res.getheader('Content-Length')))
     261  
     262      def test_latin1_header(self):
     263          self.con.request('LATINONEHEADER', '/', headers={
     264              'X-Special-Incoming':       'Ärger mit Unicode'
     265          })
     266          res = self.con.getresponse()
     267          self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
     268          self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))
     269  
     270      def test_error_content_length(self):
     271          # Issue #16088: standard error responses should have a content-length
     272          self.con.request('NOTFOUND', '/')
     273          res = self.con.getresponse()
     274          self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
     275  
     276          data = res.read()
     277          self.assertEqual(int(res.getheader('Content-Length')), len(data))
     278  
     279      def test_send_error(self):
     280          allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
     281                                           HTTPStatus.RESET_CONTENT)
     282          for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED,
     283                       HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT,
     284                       HTTPStatus.SWITCHING_PROTOCOLS):
     285              self.con.request('SEND_ERROR', '/{}'.format(code))
     286              res = self.con.getresponse()
     287              self.assertEqual(code, res.status)
     288              self.assertEqual(None, res.getheader('Content-Length'))
     289              self.assertEqual(None, res.getheader('Content-Type'))
     290              if code not in allow_transfer_encoding_codes:
     291                  self.assertEqual(None, res.getheader('Transfer-Encoding'))
     292  
     293              data = res.read()
     294              self.assertEqual(b'', data)
     295  
     296      def test_head_via_send_error(self):
     297          allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
     298                                           HTTPStatus.RESET_CONTENT)
     299          for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT,
     300                       HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT,
     301                       HTTPStatus.SWITCHING_PROTOCOLS):
     302              self.con.request('HEAD', '/{}'.format(code))
     303              res = self.con.getresponse()
     304              self.assertEqual(code, res.status)
     305              if code == HTTPStatus.OK:
     306                  self.assertTrue(int(res.getheader('Content-Length')) > 0)
     307                  self.assertIn('text/html', res.getheader('Content-Type'))
     308              else:
     309                  self.assertEqual(None, res.getheader('Content-Length'))
     310                  self.assertEqual(None, res.getheader('Content-Type'))
     311              if code not in allow_transfer_encoding_codes:
     312                  self.assertEqual(None, res.getheader('Transfer-Encoding'))
     313  
     314              data = res.read()
     315              self.assertEqual(b'', data)
     316  
     317  
     318  class ESC[4;38;5;81mRequestHandlerLoggingTestCase(ESC[4;38;5;149mBaseTestCase):
     319      class ESC[4;38;5;81mrequest_handler(ESC[4;38;5;149mBaseHTTPRequestHandler):
     320          protocol_version = 'HTTP/1.1'
     321          default_request_version = 'HTTP/1.1'
     322  
     323          def do_GET(self):
     324              self.send_response(HTTPStatus.OK)
     325              self.end_headers()
     326  
     327          def do_ERROR(self):
     328              self.send_error(HTTPStatus.NOT_FOUND, 'File not found')
     329  
     330      def test_get(self):
     331          self.con = http.client.HTTPConnection(self.HOST, self.PORT)
     332          self.con.connect()
     333  
     334          with support.captured_stderr() as err:
     335              self.con.request('GET', '/')
     336              self.con.getresponse()
     337  
     338          self.assertTrue(
     339              err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n'))
     340  
     341      def test_err(self):
     342          self.con = http.client.HTTPConnection(self.HOST, self.PORT)
     343          self.con.connect()
     344  
     345          with support.captured_stderr() as err:
     346              self.con.request('ERROR', '/')
     347              self.con.getresponse()
     348  
     349          lines = err.getvalue().split('\n')
     350          self.assertTrue(lines[0].endswith('code 404, message File not found'))
     351          self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -'))
     352  
     353  
     354  class ESC[4;38;5;81mSimpleHTTPServerTestCase(ESC[4;38;5;149mBaseTestCase):
     355      class ESC[4;38;5;81mrequest_handler(ESC[4;38;5;149mNoLogRequestHandler, ESC[4;38;5;149mSimpleHTTPRequestHandler):
     356          pass
     357  
     358      def setUp(self):
     359          super().setUp()
     360          self.cwd = os.getcwd()
     361          basetempdir = tempfile.gettempdir()
     362          os.chdir(basetempdir)
     363          self.data = b'We are the knights who say Ni!'
     364          self.tempdir = tempfile.mkdtemp(dir=basetempdir)
     365          self.tempdir_name = os.path.basename(self.tempdir)
     366          self.base_url = '/' + self.tempdir_name
     367          tempname = os.path.join(self.tempdir, 'test')
     368          with open(tempname, 'wb') as temp:
     369              temp.write(self.data)
     370              temp.flush()
     371          mtime = os.stat(tempname).st_mtime
     372          # compute last modification datetime for browser cache tests
     373          last_modif = datetime.datetime.fromtimestamp(mtime,
     374              datetime.timezone.utc)
     375          self.last_modif_datetime = last_modif.replace(microsecond=0)
     376          self.last_modif_header = email.utils.formatdate(
     377              last_modif.timestamp(), usegmt=True)
     378  
     379      def tearDown(self):
     380          try:
     381              os.chdir(self.cwd)
     382              try:
     383                  shutil.rmtree(self.tempdir)
     384              except:
     385                  pass
     386          finally:
     387              super().tearDown()
     388  
     389      def check_status_and_reason(self, response, status, data=None):
     390          def close_conn():
     391              """Don't close reader yet so we can check if there was leftover
     392              buffered input"""
     393              nonlocal reader
     394              reader = response.fp
     395              response.fp = None
     396          reader = None
     397          response._close_conn = close_conn
     398  
     399          body = response.read()
     400          self.assertTrue(response)
     401          self.assertEqual(response.status, status)
     402          self.assertIsNotNone(response.reason)
     403          if data:
     404              self.assertEqual(data, body)
     405          # Ensure the server has not set up a persistent connection, and has
     406          # not sent any extra data
     407          self.assertEqual(response.version, 10)
     408          self.assertEqual(response.msg.get("Connection", "close"), "close")
     409          self.assertEqual(reader.read(30), b'', 'Connection should be closed')
     410  
     411          reader.close()
     412          return body
     413  
     414      @unittest.skipIf(sys.platform == 'darwin',
     415                       'undecodable name cannot always be decoded on macOS')
     416      @unittest.skipIf(sys.platform == 'win32',
     417                       'undecodable name cannot be decoded on win32')
     418      @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE,
     419                           'need os_helper.TESTFN_UNDECODABLE')
     420      def test_undecodable_filename(self):
     421          enc = sys.getfilesystemencoding()
     422          filename = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.txt'
     423          with open(os.path.join(self.tempdir, filename), 'wb') as f:
     424              f.write(os_helper.TESTFN_UNDECODABLE)
     425          response = self.request(self.base_url + '/')
     426          if sys.platform == 'darwin':
     427              # On Mac OS the HFS+ filesystem replaces bytes that aren't valid
     428              # UTF-8 into a percent-encoded value.
     429              for name in os.listdir(self.tempdir):
     430                  if name != 'test': # Ignore a filename created in setUp().
     431                      filename = name
     432                      break
     433          body = self.check_status_and_reason(response, HTTPStatus.OK)
     434          quotedname = urllib.parse.quote(filename, errors='surrogatepass')
     435          self.assertIn(('href="%s"' % quotedname)
     436                        .encode(enc, 'surrogateescape'), body)
     437          self.assertIn(('>%s<' % html.escape(filename, quote=False))
     438                        .encode(enc, 'surrogateescape'), body)
     439          response = self.request(self.base_url + '/' + quotedname)
     440          self.check_status_and_reason(response, HTTPStatus.OK,
     441                                       data=os_helper.TESTFN_UNDECODABLE)
     442  
     443      def test_undecodable_parameter(self):
     444          # sanity check using a valid parameter
     445          response = self.request(self.base_url + '/?x=123').read()
     446          self.assertRegex(response, rf'listing for {self.base_url}/\?x=123'.encode('latin1'))
     447          # now the bogus encoding
     448          response = self.request(self.base_url + '/?x=%bb').read()
     449          self.assertRegex(response, rf'listing for {self.base_url}/\?x=\xef\xbf\xbd'.encode('latin1'))
     450  
     451      def test_get_dir_redirect_location_domain_injection_bug(self):
     452          """Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location.
     453  
     454          //netloc/ in a Location header is a redirect to a new host.
     455          https://github.com/python/cpython/issues/87389
     456  
     457          This checks that a path resolving to a directory on our server cannot
     458          resolve into a redirect to another server.
     459          """
     460          os.mkdir(os.path.join(self.tempdir, 'existing_directory'))
     461          url = f'/python.org/..%2f..%2f..%2f..%2f..%2f../%0a%0d/../{self.tempdir_name}/existing_directory'
     462          expected_location = f'{url}/'  # /python.org.../ single slash single prefix, trailing slash
     463          # Canonicalizes to /tmp/tempdir_name/existing_directory which does
     464          # exist and is a dir, triggering the 301 redirect logic.
     465          response = self.request(url)
     466          self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
     467          location = response.getheader('Location')
     468          self.assertEqual(location, expected_location, msg='non-attack failed!')
     469  
     470          # //python.org... multi-slash prefix, no trailing slash
     471          attack_url = f'/{url}'
     472          response = self.request(attack_url)
     473          self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
     474          location = response.getheader('Location')
     475          self.assertFalse(location.startswith('//'), msg=location)
     476          self.assertEqual(location, expected_location,
     477                  msg='Expected Location header to start with a single / and '
     478                  'end with a / as this is a directory redirect.')
     479  
     480          # ///python.org... triple-slash prefix, no trailing slash
     481          attack3_url = f'//{url}'
     482          response = self.request(attack3_url)
     483          self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
     484          self.assertEqual(response.getheader('Location'), expected_location)
     485  
     486          # If the second word in the http request (Request-URI for the http
     487          # method) is a full URI, we don't worry about it, as that'll be parsed
     488          # and reassembled as a full URI within BaseHTTPRequestHandler.send_head
     489          # so no errant scheme-less //netloc//evil.co/ domain mixup can happen.
     490          attack_scheme_netloc_2slash_url = f'https://pypi.org/{url}'
     491          expected_scheme_netloc_location = f'{attack_scheme_netloc_2slash_url}/'
     492          response = self.request(attack_scheme_netloc_2slash_url)
     493          self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
     494          location = response.getheader('Location')
     495          # We're just ensuring that the scheme and domain make it through, if
     496          # there are or aren't multiple slashes at the start of the path that
     497          # follows that isn't important in this Location: header.
     498          self.assertTrue(location.startswith('https://pypi.org/'), msg=location)
     499  
     500      def test_get(self):
     501          #constructs the path relative to the root directory of the HTTPServer
     502          response = self.request(self.base_url + '/test')
     503          self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
     504          # check for trailing "/" which should return 404. See Issue17324
     505          response = self.request(self.base_url + '/test/')
     506          self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
     507          response = self.request(self.base_url + '/')
     508          self.check_status_and_reason(response, HTTPStatus.OK)
     509          response = self.request(self.base_url)
     510          self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
     511          self.assertEqual(response.getheader("Content-Length"), "0")
     512          response = self.request(self.base_url + '/?hi=2')
     513          self.check_status_and_reason(response, HTTPStatus.OK)
     514          response = self.request(self.base_url + '?hi=1')
     515          self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
     516          self.assertEqual(response.getheader("Location"),
     517                           self.base_url + "/?hi=1")
     518          response = self.request('/ThisDoesNotExist')
     519          self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
     520          response = self.request('/' + 'ThisDoesNotExist' + '/')
     521          self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
     522          os.makedirs(os.path.join(self.tempdir, 'spam', 'index.html'))
     523          response = self.request(self.base_url + '/spam/')
     524          self.check_status_and_reason(response, HTTPStatus.OK)
     525  
     526          data = b"Dummy index file\r\n"
     527          with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f:
     528              f.write(data)
     529          response = self.request(self.base_url + '/')
     530          self.check_status_and_reason(response, HTTPStatus.OK, data)
     531  
     532          # chmod() doesn't work as expected on Windows, and filesystem
     533          # permissions are ignored by root on Unix.
     534          if os.name == 'posix' and os.geteuid() != 0:
     535              os.chmod(self.tempdir, 0)
     536              try:
     537                  response = self.request(self.base_url + '/')
     538                  self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
     539              finally:
     540                  os.chmod(self.tempdir, 0o755)
     541  
     542      def test_head(self):
     543          response = self.request(
     544              self.base_url + '/test', method='HEAD')
     545          self.check_status_and_reason(response, HTTPStatus.OK)
     546          self.assertEqual(response.getheader('content-length'),
     547                           str(len(self.data)))
     548          self.assertEqual(response.getheader('content-type'),
     549                           'application/octet-stream')
     550  
     551      def test_browser_cache(self):
     552          """Check that when a request to /test is sent with the request header
     553          If-Modified-Since set to date of last modification, the server returns
     554          status code 304, not 200
     555          """
     556          headers = email.message.Message()
     557          headers['If-Modified-Since'] = self.last_modif_header
     558          response = self.request(self.base_url + '/test', headers=headers)
     559          self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)
     560  
     561          # one hour after last modification : must return 304
     562          new_dt = self.last_modif_datetime + datetime.timedelta(hours=1)
     563          headers = email.message.Message()
     564          headers['If-Modified-Since'] = email.utils.format_datetime(new_dt,
     565              usegmt=True)
     566          response = self.request(self.base_url + '/test', headers=headers)
     567          self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)
     568  
     569      def test_browser_cache_file_changed(self):
     570          # with If-Modified-Since earlier than Last-Modified, must return 200
     571          dt = self.last_modif_datetime
     572          # build datetime object : 365 days before last modification
     573          old_dt = dt - datetime.timedelta(days=365)
     574          headers = email.message.Message()
     575          headers['If-Modified-Since'] = email.utils.format_datetime(old_dt,
     576              usegmt=True)
     577          response = self.request(self.base_url + '/test', headers=headers)
     578          self.check_status_and_reason(response, HTTPStatus.OK)
     579  
     580      def test_browser_cache_with_If_None_Match_header(self):
     581          # if If-None-Match header is present, ignore If-Modified-Since
     582  
     583          headers = email.message.Message()
     584          headers['If-Modified-Since'] = self.last_modif_header
     585          headers['If-None-Match'] = "*"
     586          response = self.request(self.base_url + '/test', headers=headers)
     587          self.check_status_and_reason(response, HTTPStatus.OK)
     588  
     589      def test_invalid_requests(self):
     590          response = self.request('/', method='FOO')
     591          self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
     592          # requests must be case sensitive,so this should fail too
     593          response = self.request('/', method='custom')
     594          self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
     595          response = self.request('/', method='GETs')
     596          self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
     597  
     598      def test_last_modified(self):
     599          """Checks that the datetime returned in Last-Modified response header
     600          is the actual datetime of last modification, rounded to the second
     601          """
     602          response = self.request(self.base_url + '/test')
     603          self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
     604          last_modif_header = response.headers['Last-modified']
     605          self.assertEqual(last_modif_header, self.last_modif_header)
     606  
     607      def test_path_without_leading_slash(self):
     608          response = self.request(self.tempdir_name + '/test')
     609          self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
     610          response = self.request(self.tempdir_name + '/test/')
     611          self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
     612          response = self.request(self.tempdir_name + '/')
     613          self.check_status_and_reason(response, HTTPStatus.OK)
     614          response = self.request(self.tempdir_name)
     615          self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
     616          response = self.request(self.tempdir_name + '/?hi=2')
     617          self.check_status_and_reason(response, HTTPStatus.OK)
     618          response = self.request(self.tempdir_name + '?hi=1')
     619          self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
     620          self.assertEqual(response.getheader("Location"),
     621                           self.tempdir_name + "/?hi=1")
     622  
     623      def test_html_escape_filename(self):
     624          filename = '<test&>.txt'
     625          fullpath = os.path.join(self.tempdir, filename)
     626  
     627          try:
     628              open(fullpath, 'wb').close()
     629          except OSError:
     630              raise unittest.SkipTest('Can not create file %s on current file '
     631                                      'system' % filename)
     632  
     633          try:
     634              response = self.request(self.base_url + '/')
     635              body = self.check_status_and_reason(response, HTTPStatus.OK)
     636              enc = response.headers.get_content_charset()
     637          finally:
     638              os.unlink(fullpath)  # avoid affecting test_undecodable_filename
     639  
     640          self.assertIsNotNone(enc)
     641          html_text = '>%s<' % html.escape(filename, quote=False)
     642          self.assertIn(html_text.encode(enc), body)
     643  
     644  
# Minimal CGI script template.  The single %s placeholder receives the
# interpreter path for the shebang line; the script emits a Content-type
# header, the blank separator line and "Hello World".
cgi_file1 = """\
#!%s

print("Content-type: text/html")
print()
print("Hello World")
"""
     652  
# CGI script template for POST handling.  %s receives the interpreter path.
# The script reads CONTENT_LENGTH bytes from stdin, parses them as a
# URL-encoded query and prints the "spam", "eggs" and "bacon" values.
# The doubled %% are literal percent signs: the template itself is
# %-formatted once before it is written to disk.
cgi_file2 = """\
#!%s
import os
import sys
import urllib.parse

print("Content-type: text/html")
print()

content_length = int(os.environ["CONTENT_LENGTH"])
query_string = sys.stdin.buffer.read(content_length)
params = {key.decode("utf-8"): val.decode("utf-8")
            for key, val in urllib.parse.parse_qsl(query_string)}

print("%%s, %%s, %%s" %% (params["spam"], params["eggs"], params["bacon"]))
"""
     669  
# CGI script template with two %s placeholders: the interpreter path for
# the shebang line and the name of the environment variable whose value
# the script prints as the response body.
cgi_file4 = """\
#!%s
import os

print("Content-type: text/html")
print()

print(os.environ["%s"])
"""
     679  
# CGI script template that dumps its environment.  %s receives the
# interpreter path.  The script sends an extra "X-ambv" header and then
# prints every KEY=VALUE pair of os.environ inside a <pre> element,
# skipping entries that are not pure ASCII (see BPO-44647).
cgi_file6 = """\
#!%s
import os

print("X-ambv: was here")
print("Content-type: text/html")
print()
print("<pre>")
for k, v in os.environ.items():
    try:
        k.encode('ascii')
        v.encode('ascii')
    except UnicodeEncodeError:
        continue  # see: BPO-44647
    print(f"{k}={v}")
print("</pre>")
"""
     697  
     698  
     699  @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
     700          "This test can't be run reliably as root (issue #13308).")
     701  class ESC[4;38;5;81mCGIHTTPServerTestCase(ESC[4;38;5;149mBaseTestCase):
     702      class ESC[4;38;5;81mrequest_handler(ESC[4;38;5;149mNoLogRequestHandler, ESC[4;38;5;149mCGIHTTPRequestHandler):
     703          def run_cgi(self):
     704              # Silence the threading + fork DeprecationWarning this causes.
     705              # gh-109096: This is deprecated in 3.13 to go away in 3.15.
     706              with warnings.catch_warnings(action='ignore', category=DeprecationWarning):
     707                  return super().run_cgi()
     708  
     709      linesep = os.linesep.encode('ascii')
     710  
     711      def setUp(self):
     712          BaseTestCase.setUp(self)
     713          self.cwd = os.getcwd()
     714          self.parent_dir = tempfile.mkdtemp()
     715          self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
     716          self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
     717          self.sub_dir_1 = os.path.join(self.parent_dir, 'sub')
     718          self.sub_dir_2 = os.path.join(self.sub_dir_1, 'dir')
     719          self.cgi_dir_in_sub_dir = os.path.join(self.sub_dir_2, 'cgi-bin')
     720          os.mkdir(self.cgi_dir)
     721          os.mkdir(self.cgi_child_dir)
     722          os.mkdir(self.sub_dir_1)
     723          os.mkdir(self.sub_dir_2)
     724          os.mkdir(self.cgi_dir_in_sub_dir)
     725          self.nocgi_path = None
     726          self.file1_path = None
     727          self.file2_path = None
     728          self.file3_path = None
     729          self.file4_path = None
     730          self.file5_path = None
     731  
     732          # The shebang line should be pure ASCII: use symlink if possible.
     733          # See issue #7668.
     734          self._pythonexe_symlink = None
     735          if os_helper.can_symlink():
     736              self.pythonexe = os.path.join(self.parent_dir, 'python')
     737              self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__()
     738          else:
     739              self.pythonexe = sys.executable
     740  
     741          try:
     742              # The python executable path is written as the first line of the
     743              # CGI Python script. The encoding cookie cannot be used, and so the
     744              # path should be encodable to the default script encoding (utf-8)
     745              self.pythonexe.encode('utf-8')
     746          except UnicodeEncodeError:
     747              self.tearDown()
     748              self.skipTest("Python executable path is not encodable to utf-8")
     749  
     750          self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
     751          with open(self.nocgi_path, 'w', encoding='utf-8') as fp:
     752              fp.write(cgi_file1 % self.pythonexe)
     753          os.chmod(self.nocgi_path, 0o777)
     754  
     755          self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
     756          with open(self.file1_path, 'w', encoding='utf-8') as file1:
     757              file1.write(cgi_file1 % self.pythonexe)
     758          os.chmod(self.file1_path, 0o777)
     759  
     760          self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
     761          with open(self.file2_path, 'w', encoding='utf-8') as file2:
     762              file2.write(cgi_file2 % self.pythonexe)
     763          os.chmod(self.file2_path, 0o777)
     764  
     765          self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
     766          with open(self.file3_path, 'w', encoding='utf-8') as file3:
     767              file3.write(cgi_file1 % self.pythonexe)
     768          os.chmod(self.file3_path, 0o777)
     769  
     770          self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
     771          with open(self.file4_path, 'w', encoding='utf-8') as file4:
     772              file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING'))
     773          os.chmod(self.file4_path, 0o777)
     774  
     775          self.file5_path = os.path.join(self.cgi_dir_in_sub_dir, 'file5.py')
     776          with open(self.file5_path, 'w', encoding='utf-8') as file5:
     777              file5.write(cgi_file1 % self.pythonexe)
     778          os.chmod(self.file5_path, 0o777)
     779  
     780          self.file6_path = os.path.join(self.cgi_dir, 'file6.py')
     781          with open(self.file6_path, 'w', encoding='utf-8') as file6:
     782              file6.write(cgi_file6 % self.pythonexe)
     783          os.chmod(self.file6_path, 0o777)
     784  
     785          os.chdir(self.parent_dir)
     786  
     787      def tearDown(self):
     788          try:
     789              os.chdir(self.cwd)
     790              if self._pythonexe_symlink:
     791                  self._pythonexe_symlink.__exit__(None, None, None)
     792              if self.nocgi_path:
     793                  os.remove(self.nocgi_path)
     794              if self.file1_path:
     795                  os.remove(self.file1_path)
     796              if self.file2_path:
     797                  os.remove(self.file2_path)
     798              if self.file3_path:
     799                  os.remove(self.file3_path)
     800              if self.file4_path:
     801                  os.remove(self.file4_path)
     802              if self.file5_path:
     803                  os.remove(self.file5_path)
     804              if self.file6_path:
     805                  os.remove(self.file6_path)
     806              os.rmdir(self.cgi_child_dir)
     807              os.rmdir(self.cgi_dir)
     808              os.rmdir(self.cgi_dir_in_sub_dir)
     809              os.rmdir(self.sub_dir_2)
     810              os.rmdir(self.sub_dir_1)
     811              os.rmdir(self.parent_dir)
     812          finally:
     813              BaseTestCase.tearDown(self)
     814  
     815      def test_url_collapse_path(self):
     816          # verify tail is the last portion and head is the rest on proper urls
     817          test_vectors = {
     818              '': '//',
     819              '..': IndexError,
     820              '/.//..': IndexError,
     821              '/': '//',
     822              '//': '//',
     823              '/\\': '//\\',
     824              '/.//': '//',
     825              'cgi-bin/file1.py': '/cgi-bin/file1.py',
     826              '/cgi-bin/file1.py': '/cgi-bin/file1.py',
     827              'a': '//a',
     828              '/a': '//a',
     829              '//a': '//a',
     830              './a': '//a',
     831              './C:/': '/C:/',
     832              '/a/b': '/a/b',
     833              '/a/b/': '/a/b/',
     834              '/a/b/.': '/a/b/',
     835              '/a/b/c/..': '/a/b/',
     836              '/a/b/c/../d': '/a/b/d',
     837              '/a/b/c/../d/e/../f': '/a/b/d/f',
     838              '/a/b/c/../d/e/../../f': '/a/b/f',
     839              '/a/b/c/../d/e/.././././..//f': '/a/b/f',
     840              '../a/b/c/../d/e/.././././..//f': IndexError,
     841              '/a/b/c/../d/e/../../../f': '/a/f',
     842              '/a/b/c/../d/e/../../../../f': '//f',
     843              '/a/b/c/../d/e/../../../../../f': IndexError,
     844              '/a/b/c/../d/e/../../../../f/..': '//',
     845              '/a/b/c/../d/e/../../../../f/../.': '//',
     846          }
     847          for path, expected in test_vectors.items():
     848              if isinstance(expected, type) and issubclass(expected, Exception):
     849                  self.assertRaises(expected,
     850                                    server._url_collapse_path, path)
     851              else:
     852                  actual = server._url_collapse_path(path)
     853                  self.assertEqual(expected, actual,
     854                                   msg='path = %r\nGot:    %r\nWanted: %r' %
     855                                   (path, actual, expected))
     856  
     857      def test_headers_and_content(self):
     858          res = self.request('/cgi-bin/file1.py')
     859          self.assertEqual(
     860              (res.read(), res.getheader('Content-type'), res.status),
     861              (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK))
     862  
     863      def test_issue19435(self):
     864          res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
     865          self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
     866  
     867      def test_post(self):
     868          params = urllib.parse.urlencode(
     869              {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
     870          headers = {'Content-type' : 'application/x-www-form-urlencoded'}
     871          res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
     872  
     873          self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)
     874  
     875      def test_invaliduri(self):
     876          res = self.request('/cgi-bin/invalid')
     877          res.read()
     878          self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
     879  
     880      def test_authorization(self):
     881          headers = {b'Authorization' : b'Basic ' +
     882                     base64.b64encode(b'username:pass')}
     883          res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
     884          self.assertEqual(
     885              (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
     886              (res.read(), res.getheader('Content-type'), res.status))
     887  
     888      def test_no_leading_slash(self):
     889          # http://bugs.python.org/issue2254
     890          res = self.request('cgi-bin/file1.py')
     891          self.assertEqual(
     892              (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
     893              (res.read(), res.getheader('Content-type'), res.status))
     894  
     895      def test_os_environ_is_not_altered(self):
     896          signature = "Test CGI Server"
     897          os.environ['SERVER_SOFTWARE'] = signature
     898          res = self.request('/cgi-bin/file1.py')
     899          self.assertEqual(
     900              (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
     901              (res.read(), res.getheader('Content-type'), res.status))
     902          self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
     903  
     904      def test_urlquote_decoding_in_cgi_check(self):
     905          res = self.request('/cgi-bin%2ffile1.py')
     906          self.assertEqual(
     907              (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
     908              (res.read(), res.getheader('Content-type'), res.status))
     909  
     910      def test_nested_cgi_path_issue21323(self):
     911          res = self.request('/cgi-bin/child-dir/file3.py')
     912          self.assertEqual(
     913              (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
     914              (res.read(), res.getheader('Content-type'), res.status))
     915  
     916      def test_query_with_multiple_question_mark(self):
     917          res = self.request('/cgi-bin/file4.py?a=b?c=d')
     918          self.assertEqual(
     919              (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK),
     920              (res.read(), res.getheader('Content-type'), res.status))
     921  
     922      def test_query_with_continuous_slashes(self):
     923          res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
     924          self.assertEqual(
     925              (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep,
     926               'text/html', HTTPStatus.OK),
     927              (res.read(), res.getheader('Content-type'), res.status))
     928  
     929      def test_cgi_path_in_sub_directories(self):
     930          try:
     931              CGIHTTPRequestHandler.cgi_directories.append('/sub/dir/cgi-bin')
     932              res = self.request('/sub/dir/cgi-bin/file5.py')
     933              self.assertEqual(
     934                  (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
     935                  (res.read(), res.getheader('Content-type'), res.status))
     936          finally:
     937              CGIHTTPRequestHandler.cgi_directories.remove('/sub/dir/cgi-bin')
     938  
     939      def test_accept(self):
     940          browser_accept = \
     941                      'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
     942          tests = (
     943              ((('Accept', browser_accept),), browser_accept),
     944              ((), ''),
     945              # Hack case to get two values for the one header
     946              ((('Accept', 'text/html'), ('ACCEPT', 'text/plain')),
     947                 'text/html,text/plain'),
     948          )
     949          for headers, expected in tests:
     950              headers = OrderedDict(headers)
     951              with self.subTest(headers):
     952                  res = self.request('/cgi-bin/file6.py', 'GET', headers=headers)
     953                  self.assertEqual(http.HTTPStatus.OK, res.status)
     954                  expected = f"HTTP_ACCEPT={expected}".encode('ascii')
     955                  self.assertIn(expected, res.read())
     956  
     957  
     958  class ESC[4;38;5;81mSocketlessRequestHandler(ESC[4;38;5;149mSimpleHTTPRequestHandler):
     959      def __init__(self, directory=None):
     960          request = mock.Mock()
     961          request.makefile.return_value = BytesIO()
     962          super().__init__(request, None, None, directory=directory)
     963  
     964          self.get_called = False
     965          self.protocol_version = "HTTP/1.1"
     966  
     967      def do_GET(self):
     968          self.get_called = True
     969          self.send_response(HTTPStatus.OK)
     970          self.send_header('Content-Type', 'text/html')
     971          self.end_headers()
     972          self.wfile.write(b'<html><body>Data</body></html>\r\n')
     973  
     974      def log_message(self, format, *args):
     975          pass
     976  
     977  class ESC[4;38;5;81mRejectingSocketlessRequestHandler(ESC[4;38;5;149mSocketlessRequestHandler):
     978      def handle_expect_100(self):
     979          self.send_error(HTTPStatus.EXPECTATION_FAILED)
     980          return False
     981  
     982  
     983  class ESC[4;38;5;81mAuditableBytesIO:
     984  
     985      def __init__(self):
     986          self.datas = []
     987  
     988      def write(self, data):
     989          self.datas.append(data)
     990  
     991      def getData(self):
     992          return b''.join(self.datas)
     993  
     994      @property
     995      def numWrites(self):
     996          return len(self.datas)
     997  
     998  
     999  class ESC[4;38;5;81mBaseHTTPRequestHandlerTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1000      """Test the functionality of the BaseHTTPServer.
    1001  
    1002         Test the support for the Expect 100-continue header.
    1003         """
    1004  
    1005      HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK')
    1006  
    1007      def setUp (self):
    1008          self.handler = SocketlessRequestHandler()
    1009  
    1010      def send_typical_request(self, message):
    1011          input = BytesIO(message)
    1012          output = BytesIO()
    1013          self.handler.rfile = input
    1014          self.handler.wfile = output
    1015          self.handler.handle_one_request()
    1016          output.seek(0)
    1017          return output.readlines()
    1018  
    1019      def verify_get_called(self):
    1020          self.assertTrue(self.handler.get_called)
    1021  
    1022      def verify_expected_headers(self, headers):
    1023          for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
    1024              self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)
    1025  
    1026      def verify_http_server_response(self, response):
    1027          match = self.HTTPResponseMatch.search(response)
    1028          self.assertIsNotNone(match)
    1029  
    1030      def test_unprintable_not_logged(self):
    1031          # We call the method from the class directly as our Socketless
    1032          # Handler subclass overrode it... nice for everything BUT this test.
    1033          self.handler.client_address = ('127.0.0.1', 1337)
    1034          log_message = BaseHTTPRequestHandler.log_message
    1035          with mock.patch.object(sys, 'stderr', StringIO()) as fake_stderr:
    1036              log_message(self.handler, '/foo')
    1037              log_message(self.handler, '/\033bar\000\033')
    1038              log_message(self.handler, '/spam %s.', 'a')
    1039              log_message(self.handler, '/spam %s.', '\033\x7f\x9f\xa0beans')
    1040              log_message(self.handler, '"GET /foo\\b"ar\007 HTTP/1.0"')
    1041          stderr = fake_stderr.getvalue()
    1042          self.assertNotIn('\033', stderr)  # non-printable chars are caught.
    1043          self.assertNotIn('\000', stderr)  # non-printable chars are caught.
    1044          lines = stderr.splitlines()
    1045          self.assertIn('/foo', lines[0])
    1046          self.assertIn(r'/\x1bbar\x00\x1b', lines[1])
    1047          self.assertIn('/spam a.', lines[2])
    1048          self.assertIn('/spam \\x1b\\x7f\\x9f\xa0beans.', lines[3])
    1049          self.assertIn(r'"GET /foo\\b"ar\x07 HTTP/1.0"', lines[4])
    1050  
    1051      def test_http_1_1(self):
    1052          result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
    1053          self.verify_http_server_response(result[0])
    1054          self.verify_expected_headers(result[1:-1])
    1055          self.verify_get_called()
    1056          self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
    1057          self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
    1058          self.assertEqual(self.handler.command, 'GET')
    1059          self.assertEqual(self.handler.path, '/')
    1060          self.assertEqual(self.handler.request_version, 'HTTP/1.1')
    1061          self.assertSequenceEqual(self.handler.headers.items(), ())
    1062  
    1063      def test_http_1_0(self):
    1064          result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
    1065          self.verify_http_server_response(result[0])
    1066          self.verify_expected_headers(result[1:-1])
    1067          self.verify_get_called()
    1068          self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
    1069          self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
    1070          self.assertEqual(self.handler.command, 'GET')
    1071          self.assertEqual(self.handler.path, '/')
    1072          self.assertEqual(self.handler.request_version, 'HTTP/1.0')
    1073          self.assertSequenceEqual(self.handler.headers.items(), ())
    1074  
    1075      def test_http_0_9(self):
    1076          result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
    1077          self.assertEqual(len(result), 1)
    1078          self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
    1079          self.verify_get_called()
    1080  
    1081      def test_extra_space(self):
    1082          result = self.send_typical_request(
    1083              b'GET /spaced out HTTP/1.1\r\n'
    1084              b'Host: dummy\r\n'
    1085              b'\r\n'
    1086          )
    1087          self.assertTrue(result[0].startswith(b'HTTP/1.1 400 '))
    1088          self.verify_expected_headers(result[1:result.index(b'\r\n')])
    1089          self.assertFalse(self.handler.get_called)
    1090  
    1091      def test_with_continue_1_0(self):
    1092          result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
    1093          self.verify_http_server_response(result[0])
    1094          self.verify_expected_headers(result[1:-1])
    1095          self.verify_get_called()
    1096          self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
    1097          self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
    1098          self.assertEqual(self.handler.command, 'GET')
    1099          self.assertEqual(self.handler.path, '/')
    1100          self.assertEqual(self.handler.request_version, 'HTTP/1.0')
    1101          headers = (("Expect", "100-continue"),)
    1102          self.assertSequenceEqual(self.handler.headers.items(), headers)
    1103  
    1104      def test_with_continue_1_1(self):
    1105          result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
    1106          self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n')
    1107          self.assertEqual(result[1], b'\r\n')
    1108          self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n')
    1109          self.verify_expected_headers(result[2:-1])
    1110          self.verify_get_called()
    1111          self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
    1112          self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
    1113          self.assertEqual(self.handler.command, 'GET')
    1114          self.assertEqual(self.handler.path, '/')
    1115          self.assertEqual(self.handler.request_version, 'HTTP/1.1')
    1116          headers = (("Expect", "100-continue"),)
    1117          self.assertSequenceEqual(self.handler.headers.items(), headers)
    1118  
    1119      def test_header_buffering_of_send_error(self):
    1120  
    1121          input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
    1122          output = AuditableBytesIO()
    1123          handler = SocketlessRequestHandler()
    1124          handler.rfile = input
    1125          handler.wfile = output
    1126          handler.request_version = 'HTTP/1.1'
    1127          handler.requestline = ''
    1128          handler.command = None
    1129  
    1130          handler.send_error(418)
    1131          self.assertEqual(output.numWrites, 2)
    1132  
    1133      def test_header_buffering_of_send_response_only(self):
    1134  
    1135          input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
    1136          output = AuditableBytesIO()
    1137          handler = SocketlessRequestHandler()
    1138          handler.rfile = input
    1139          handler.wfile = output
    1140          handler.request_version = 'HTTP/1.1'
    1141  
    1142          handler.send_response_only(418)
    1143          self.assertEqual(output.numWrites, 0)
    1144          handler.end_headers()
    1145          self.assertEqual(output.numWrites, 1)
    1146  
    1147      def test_header_buffering_of_send_header(self):
    1148  
    1149          input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
    1150          output = AuditableBytesIO()
    1151          handler = SocketlessRequestHandler()
    1152          handler.rfile = input
    1153          handler.wfile = output
    1154          handler.request_version = 'HTTP/1.1'
    1155  
    1156          handler.send_header('Foo', 'foo')
    1157          handler.send_header('bar', 'bar')
    1158          self.assertEqual(output.numWrites, 0)
    1159          handler.end_headers()
    1160          self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
    1161          self.assertEqual(output.numWrites, 1)
    1162  
    1163      def test_header_unbuffered_when_continue(self):
    1164  
    1165          def _readAndReseek(f):
    1166              pos = f.tell()
    1167              f.seek(0)
    1168              data = f.read()
    1169              f.seek(pos)
    1170              return data
    1171  
    1172          input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
    1173          output = BytesIO()
    1174          self.handler.rfile = input
    1175          self.handler.wfile = output
    1176          self.handler.request_version = 'HTTP/1.1'
    1177  
    1178          self.handler.handle_one_request()
    1179          self.assertNotEqual(_readAndReseek(output), b'')
    1180          result = _readAndReseek(output).split(b'\r\n')
    1181          self.assertEqual(result[0], b'HTTP/1.1 100 Continue')
    1182          self.assertEqual(result[1], b'')
    1183          self.assertEqual(result[2], b'HTTP/1.1 200 OK')
    1184  
    1185      def test_with_continue_rejected(self):
    1186          usual_handler = self.handler        # Save to avoid breaking any subsequent tests.
    1187          self.handler = RejectingSocketlessRequestHandler()
    1188          result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
    1189          self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n')
    1190          self.verify_expected_headers(result[1:-1])
    1191          # The expect handler should short circuit the usual get method by
    1192          # returning false here, so get_called should be false
    1193          self.assertFalse(self.handler.get_called)
    1194          self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1)
    1195          self.handler = usual_handler        # Restore to avoid breaking any subsequent tests.
    1196  
    1197      def test_request_length(self):
    1198          # Issue #10714: huge request lines are discarded, to avoid Denial
    1199          # of Service attacks.
    1200          result = self.send_typical_request(b'GET ' + b'x' * 65537)
    1201          self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
    1202          self.assertFalse(self.handler.get_called)
    1203          self.assertIsInstance(self.handler.requestline, str)
    1204  
    1205      def test_header_length(self):
    1206          # Issue #6791: same for headers
    1207          result = self.send_typical_request(
    1208              b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n')
    1209          self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n')
    1210          self.assertFalse(self.handler.get_called)
    1211          self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
    1212  
    1213      def test_too_many_headers(self):
    1214          result = self.send_typical_request(
    1215              b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n')
    1216          self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n')
    1217          self.assertFalse(self.handler.get_called)
    1218          self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
    1219  
    1220      def test_html_escape_on_error(self):
    1221          result = self.send_typical_request(
    1222              b'<script>alert("hello")</script> / HTTP/1.1')
    1223          result = b''.join(result)
    1224          text = '<script>alert("hello")</script>'
    1225          self.assertIn(html.escape(text, quote=False).encode('ascii'), result)
    1226  
    1227      def test_close_connection(self):
    1228          # handle_one_request() should be repeatedly called until
    1229          # it sets close_connection
    1230          def handle_one_request():
    1231              self.handler.close_connection = next(close_values)
    1232          self.handler.handle_one_request = handle_one_request
    1233  
    1234          close_values = iter((True,))
    1235          self.handler.handle()
    1236          self.assertRaises(StopIteration, next, close_values)
    1237  
    1238          close_values = iter((False, False, True))
    1239          self.handler.handle()
    1240          self.assertRaises(StopIteration, next, close_values)
    1241  
    1242      def test_date_time_string(self):
    1243          now = time.time()
    1244          # this is the old code that formats the timestamp
    1245          year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now)
    1246          expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
    1247              self.handler.weekdayname[wd],
    1248              day,
    1249              self.handler.monthname[month],
    1250              year, hh, mm, ss
    1251          )
    1252          self.assertEqual(self.handler.date_time_string(timestamp=now), expected)
    1253  
    1254  
    1255  class ESC[4;38;5;81mSimpleHTTPRequestHandlerTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1256      """ Test url parsing """
    1257      def setUp(self):
    1258          self.translated_1 = os.path.join(os.getcwd(), 'filename')
    1259          self.translated_2 = os.path.join('foo', 'filename')
    1260          self.translated_3 = os.path.join('bar', 'filename')
    1261          self.handler_1 = SocketlessRequestHandler()
    1262          self.handler_2 = SocketlessRequestHandler(directory='foo')
    1263          self.handler_3 = SocketlessRequestHandler(directory=pathlib.PurePath('bar'))
    1264  
    1265      def test_query_arguments(self):
    1266          path = self.handler_1.translate_path('/filename')
    1267          self.assertEqual(path, self.translated_1)
    1268          path = self.handler_2.translate_path('/filename')
    1269          self.assertEqual(path, self.translated_2)
    1270          path = self.handler_3.translate_path('/filename')
    1271          self.assertEqual(path, self.translated_3)
    1272  
    1273          path = self.handler_1.translate_path('/filename?foo=bar')
    1274          self.assertEqual(path, self.translated_1)
    1275          path = self.handler_2.translate_path('/filename?foo=bar')
    1276          self.assertEqual(path, self.translated_2)
    1277          path = self.handler_3.translate_path('/filename?foo=bar')
    1278          self.assertEqual(path, self.translated_3)
    1279  
    1280          path = self.handler_1.translate_path('/filename?a=b&spam=eggs#zot')
    1281          self.assertEqual(path, self.translated_1)
    1282          path = self.handler_2.translate_path('/filename?a=b&spam=eggs#zot')
    1283          self.assertEqual(path, self.translated_2)
    1284          path = self.handler_3.translate_path('/filename?a=b&spam=eggs#zot')
    1285          self.assertEqual(path, self.translated_3)
    1286  
    1287      def test_start_with_double_slash(self):
    1288          path = self.handler_1.translate_path('//filename')
    1289          self.assertEqual(path, self.translated_1)
    1290          path = self.handler_2.translate_path('//filename')
    1291          self.assertEqual(path, self.translated_2)
    1292          path = self.handler_3.translate_path('//filename')
    1293          self.assertEqual(path, self.translated_3)
    1294  
    1295          path = self.handler_1.translate_path('//filename?foo=bar')
    1296          self.assertEqual(path, self.translated_1)
    1297          path = self.handler_2.translate_path('//filename?foo=bar')
    1298          self.assertEqual(path, self.translated_2)
    1299          path = self.handler_3.translate_path('//filename?foo=bar')
    1300          self.assertEqual(path, self.translated_3)
    1301  
    1302      def test_windows_colon(self):
    1303          with support.swap_attr(server.os, 'path', ntpath):
    1304              path = self.handler_1.translate_path('c:c:c:foo/filename')
    1305              path = path.replace(ntpath.sep, os.sep)
    1306              self.assertEqual(path, self.translated_1)
    1307              path = self.handler_2.translate_path('c:c:c:foo/filename')
    1308              path = path.replace(ntpath.sep, os.sep)
    1309              self.assertEqual(path, self.translated_2)
    1310              path = self.handler_3.translate_path('c:c:c:foo/filename')
    1311              path = path.replace(ntpath.sep, os.sep)
    1312              self.assertEqual(path, self.translated_3)
    1313  
    1314              path = self.handler_1.translate_path('\\c:../filename')
    1315              path = path.replace(ntpath.sep, os.sep)
    1316              self.assertEqual(path, self.translated_1)
    1317              path = self.handler_2.translate_path('\\c:../filename')
    1318              path = path.replace(ntpath.sep, os.sep)
    1319              self.assertEqual(path, self.translated_2)
    1320              path = self.handler_3.translate_path('\\c:../filename')
    1321              path = path.replace(ntpath.sep, os.sep)
    1322              self.assertEqual(path, self.translated_3)
    1323  
    1324              path = self.handler_1.translate_path('c:\\c:..\\foo/filename')
    1325              path = path.replace(ntpath.sep, os.sep)
    1326              self.assertEqual(path, self.translated_1)
    1327              path = self.handler_2.translate_path('c:\\c:..\\foo/filename')
    1328              path = path.replace(ntpath.sep, os.sep)
    1329              self.assertEqual(path, self.translated_2)
    1330              path = self.handler_3.translate_path('c:\\c:..\\foo/filename')
    1331              path = path.replace(ntpath.sep, os.sep)
    1332              self.assertEqual(path, self.translated_3)
    1333  
    1334              path = self.handler_1.translate_path('c:c:foo\\c:c:bar/filename')
    1335              path = path.replace(ntpath.sep, os.sep)
    1336              self.assertEqual(path, self.translated_1)
    1337              path = self.handler_2.translate_path('c:c:foo\\c:c:bar/filename')
    1338              path = path.replace(ntpath.sep, os.sep)
    1339              self.assertEqual(path, self.translated_2)
    1340              path = self.handler_3.translate_path('c:c:foo\\c:c:bar/filename')
    1341              path = path.replace(ntpath.sep, os.sep)
    1342              self.assertEqual(path, self.translated_3)
    1343  
    1344  
    1345  class ESC[4;38;5;81mMiscTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1346      def test_all(self):
    1347          expected = []
    1348          denylist = {'executable', 'nobody_uid', 'test'}
    1349          for name in dir(server):
    1350              if name.startswith('_') or name in denylist:
    1351                  continue
    1352              module_object = getattr(server, name)
    1353              if getattr(module_object, '__module__', None) == 'http.server':
    1354                  expected.append(name)
    1355          self.assertCountEqual(server.__all__, expected)
    1356  
    1357  
    1358  class ESC[4;38;5;81mScriptTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1359  
    1360      def mock_server_class(self):
    1361          return mock.MagicMock(
    1362              return_value=mock.MagicMock(
    1363                  __enter__=mock.MagicMock(
    1364                      return_value=mock.MagicMock(
    1365                          socket=mock.MagicMock(
    1366                              getsockname=lambda: ('', 0),
    1367                          ),
    1368                      ),
    1369                  ),
    1370              ),
    1371          )
    1372  
    1373      @mock.patch('builtins.print')
    1374      def test_server_test_unspec(self, _):
    1375          mock_server = self.mock_server_class()
    1376          server.test(ServerClass=mock_server, bind=None)
    1377          self.assertIn(
    1378              mock_server.address_family,
    1379              (socket.AF_INET6, socket.AF_INET),
    1380          )
    1381  
    1382      @mock.patch('builtins.print')
    1383      def test_server_test_localhost(self, _):
    1384          mock_server = self.mock_server_class()
    1385          server.test(ServerClass=mock_server, bind="localhost")
    1386          self.assertIn(
    1387              mock_server.address_family,
    1388              (socket.AF_INET6, socket.AF_INET),
    1389          )
    1390  
    1391      ipv6_addrs = (
    1392          "::",
    1393          "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
    1394          "::1",
    1395      )
    1396  
    1397      ipv4_addrs = (
    1398          "0.0.0.0",
    1399          "8.8.8.8",
    1400          "127.0.0.1",
    1401      )
    1402  
    1403      @mock.patch('builtins.print')
    1404      def test_server_test_ipv6(self, _):
    1405          for bind in self.ipv6_addrs:
    1406              mock_server = self.mock_server_class()
    1407              server.test(ServerClass=mock_server, bind=bind)
    1408              self.assertEqual(mock_server.address_family, socket.AF_INET6)
    1409  
    1410      @mock.patch('builtins.print')
    1411      def test_server_test_ipv4(self, _):
    1412          for bind in self.ipv4_addrs:
    1413              mock_server = self.mock_server_class()
    1414              server.test(ServerClass=mock_server, bind=bind)
    1415              self.assertEqual(mock_server.address_family, socket.AF_INET)
    1416  
    1417  
    1418  def setUpModule():
    1419      unittest.addModuleCleanup(os.chdir, os.getcwd())
    1420  
    1421  
# Run this module's tests when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()