1 """Unittests for the various HTTPServer modules.
2
3 Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4 Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5 """
6 from collections import OrderedDict
7 from http.server import BaseHTTPRequestHandler, HTTPServer, \
8 SimpleHTTPRequestHandler, CGIHTTPRequestHandler
9 from http import server, HTTPStatus
10
11 import os
12 import socket
13 import sys
14 import re
15 import base64
16 import ntpath
17 import pathlib
18 import shutil
19 import email.message
20 import email.utils
21 import html
22 import http, http.client
23 import urllib.parse
24 import tempfile
25 import time
26 import datetime
27 import threading
28 from unittest import mock
29 import warnings
30 from io import BytesIO, StringIO
31
32 import unittest
33 from test import support
34 from test.support import os_helper
35 from test.support import threading_helper
36
# Module-level guard (module=True): the test cases below start a real HTTP
# server on a localhost socket in setUp(), so a working socket is required.
support.requires_working_socket(module=True)
38
class NoLogRequestHandler:
    """Mixin that keeps the request handlers used in these tests quiet."""

    def read(self, n=None):
        """Return an empty string, whatever *n* is."""
        return ''

    def log_message(self, *args):
        """Drop the log message instead of writing it to stderr."""
        return None
46
47
class TestServerThread(threading.Thread):
    """Background thread that runs an HTTPServer for one test case.

    The server binds to ('localhost', 0); the address actually obtained is
    stored on the test object as HOST and PORT before ``server_started`` is
    set.
    """

    def __init__(self, test_object, request_handler):
        super().__init__()
        self.test_object = test_object
        self.request_handler = request_handler

    def run(self):
        """Create the server, publish its address, then serve until stopped."""
        self.server = HTTPServer(('localhost', 0), self.request_handler)
        host, port = self.server.socket.getsockname()
        self.test_object.HOST = host
        self.test_object.PORT = port
        self.test_object.server_started.set()
        # The test object is not needed once the address has been published.
        self.test_object = None
        try:
            self.server.serve_forever(0.05)
        finally:
            self.server.server_close()

    def stop(self):
        """Ask the server loop to exit and wait for the thread to finish."""
        self.server.shutdown()
        self.join()
67
68
class BaseTestCase(unittest.TestCase):
    """Common fixture: one HTTP server thread per test.

    Subclasses provide a ``request_handler`` class attribute.  setUp() starts
    a TestServerThread with it and waits until the thread has published
    ``self.HOST`` / ``self.PORT``.
    """

    def setUp(self):
        self._threads = threading_helper.threading_setup()
        # Replace os.environ with a guard object; tearDown() calls its
        # __exit__() to undo this.
        os.environ = os_helper.EnvironmentVarGuard()
        self.server_started = threading.Event()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        # Block until the server thread has set HOST and PORT.
        self.server_started.wait()

    def tearDown(self):
        self.thread.stop()
        self.thread = None
        os.environ.__exit__()
        threading_helper.threading_cleanup(*self._threads)

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request on a fresh connection and return the response.

        *headers* defaults to no extra headers.  The default is ``None``
        rather than a shared ``{}`` literal so that no mutable object is
        shared between calls.  The connection is kept on ``self.connection``.
        """
        if headers is None:
            headers = {}
        self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()
88
89
class BaseHTTPServerTestCase(BaseTestCase):
    """Tests for request parsing and responses of BaseHTTPRequestHandler.

    Every test uses ``self.con``, a client connection opened in setUp()
    against the server started by BaseTestCase.
    """

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        # The do_* methods below are reached through the custom request
        # methods (TEST, KEEP, CUSTOM, ...) that the tests send.
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def do_TEST(self):
            self.send_response(HTTPStatus.NO_CONTENT)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

        def do_KEEP(self):
            self.send_response(HTTPStatus.NO_CONTENT)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'keep-alive')
            self.end_headers()

        def do_KEYERROR(self):
            self.send_error(999)

        def do_NOTFOUND(self):
            self.send_error(HTTPStatus.NOT_FOUND)

        def do_EXPLAINERROR(self):
            self.send_error(999, "Short Message",
                            "This is a long \n explanation")

        def do_CUSTOM(self):
            self.send_response(999)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

        def do_LATINONEHEADER(self):
            self.send_response(999)
            self.send_header('X-Special', 'Dängerous Mind')
            self.send_header('Connection', 'close')
            self.end_headers()
            # Echo the incoming header value back as the UTF-8 encoded body.
            body = self.headers['x-special-incoming'].encode('utf-8')
            self.wfile.write(body)

        def do_SEND_ERROR(self):
            # The status code to send is taken from the request path.
            self.send_error(int(self.path[1:]))

        def do_HEAD(self):
            # The status code to send is taken from the request path.
            self.send_error(int(self.path[1:]))

    def setUp(self):
        BaseTestCase.setUp(self)
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        # Close the client socket explicitly once the test is over instead
        # of leaving it to garbage collection.
        self.addCleanup(self.con.close)
        self.con.connect()

    def test_command(self):
        # The handler defines no do_GET, so GET is "not implemented".
        self.con.request('GET', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_request_line_trimming(self):
        self.con._http_vsn_str = 'HTTP/1.1\n'
        self.con.putrequest('XYZBOGUS', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_version_bogus(self):
        self.con._http_vsn_str = 'FUBAR'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_digits(self):
        self.con._http_vsn_str = 'HTTP/9.9.9'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_signs_and_underscores(self):
        self.con._http_vsn_str = 'HTTP/-9_9_9.+9_9_9'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_major_version_number_too_long(self):
        self.con._http_vsn_str = 'HTTP/909876543210.0'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_minor_version_number_too_long(self):
        self.con._http_vsn_str = 'HTTP/1.909876543210'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_none_get(self):
        self.con._http_vsn_str = ''
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_version_none(self):
        # Test that a valid method is rejected when not HTTP/1.x
        self.con._http_vsn_str = ''
        self.con.putrequest('CUSTOM', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_invalid(self):
        self.con._http_vsn = 99
        self.con._http_vsn_str = 'HTTP/9.9'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED)

    def test_send_blank(self):
        self.con._http_vsn_str = ''
        self.con.putrequest('', '')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_header_close(self):
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'close')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_header_keep_alive(self):
        self.con._http_vsn_str = 'HTTP/1.1'
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'keep-alive')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_handler(self):
        self.con.request('TEST', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NO_CONTENT)

    def test_return_header_keep_alive(self):
        self.con.request('KEEP', '/')
        res = self.con.getresponse()
        self.assertEqual(res.getheader('Connection'), 'keep-alive')
        # Second request on the same connection; do_TEST answers with
        # "Connection: close".  The client side is closed by the cleanup
        # registered in setUp().
        self.con.request('TEST', '/')

    def test_internal_key_error(self):
        self.con.request('KEYERROR', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)

    def test_return_custom_status(self):
        self.con.request('CUSTOM', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)

    def test_return_explain_error(self):
        self.con.request('EXPLAINERROR', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)
        self.assertTrue(int(res.getheader('Content-Length')))

    def test_latin1_header(self):
        self.con.request('LATINONEHEADER', '/', headers={
            'X-Special-Incoming': 'Ärger mit Unicode'
        })
        res = self.con.getresponse()
        self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
        self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))

    def test_error_content_length(self):
        # Issue #16088: standard error responses should have a content-length
        self.con.request('NOTFOUND', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)

        data = res.read()
        self.assertEqual(int(res.getheader('Content-Length')), len(data))

    def test_send_error(self):
        # For these status codes send_error() must not send a body, nor
        # Content-Length / Content-Type headers.
        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
                                         HTTPStatus.RESET_CONTENT)
        for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED,
                     HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT,
                     HTTPStatus.SWITCHING_PROTOCOLS):
            self.con.request('SEND_ERROR', '/{}'.format(code))
            res = self.con.getresponse()
            self.assertEqual(code, res.status)
            self.assertEqual(None, res.getheader('Content-Length'))
            self.assertEqual(None, res.getheader('Content-Type'))
            if code not in allow_transfer_encoding_codes:
                self.assertEqual(None, res.getheader('Transfer-Encoding'))

            data = res.read()
            self.assertEqual(b'', data)

    def test_head_via_send_error(self):
        # HEAD never gets a body; only the 200 case is expected to carry
        # Content-Length and Content-Type headers.
        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
                                         HTTPStatus.RESET_CONTENT)
        for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT,
                     HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT,
                     HTTPStatus.SWITCHING_PROTOCOLS):
            self.con.request('HEAD', '/{}'.format(code))
            res = self.con.getresponse()
            self.assertEqual(code, res.status)
            if code == HTTPStatus.OK:
                self.assertTrue(int(res.getheader('Content-Length')) > 0)
                self.assertIn('text/html', res.getheader('Content-Type'))
            else:
                self.assertEqual(None, res.getheader('Content-Length'))
                self.assertEqual(None, res.getheader('Content-Type'))
            if code not in allow_transfer_encoding_codes:
                self.assertEqual(None, res.getheader('Transfer-Encoding'))

            data = res.read()
            self.assertEqual(b'', data)
316
317
class RequestHandlerLoggingTestCase(BaseTestCase):
    """Check what BaseHTTPRequestHandler logs to stderr for a request.

    The handler here does not mix in NoLogRequestHandler, so its log output
    can be captured.
    """

    class request_handler(BaseHTTPRequestHandler):
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def do_GET(self):
            self.send_response(HTTPStatus.OK)
            self.end_headers()

        def do_ERROR(self):
            self.send_error(HTTPStatus.NOT_FOUND, 'File not found')

    def test_get(self):
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        # Close the client socket explicitly once the test is over.
        self.addCleanup(self.con.close)
        self.con.connect()

        with support.captured_stderr() as err:
            self.con.request('GET', '/')
            self.con.getresponse()

        self.assertTrue(
            err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n'))

    def test_err(self):
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        # Close the client socket explicitly once the test is over.
        self.addCleanup(self.con.close)
        self.con.connect()

        with support.captured_stderr() as err:
            self.con.request('ERROR', '/')
            self.con.getresponse()

        # Two log lines are expected: the error itself, then the request.
        lines = err.getvalue().split('\n')
        self.assertTrue(lines[0].endswith('code 404, message File not found'))
        self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -'))
352
353
class SimpleHTTPServerTestCase(BaseTestCase):
    """Tests for SimpleHTTPRequestHandler serving a temporary directory.

    setUp() changes the working directory to the system temp directory and
    creates a fresh sub-directory there containing one file named 'test';
    ``self.base_url`` is the URL path of that sub-directory.
    """

    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
        pass

    def setUp(self):
        super().setUp()
        self.cwd = os.getcwd()
        basetempdir = tempfile.gettempdir()
        os.chdir(basetempdir)
        self.data = b'We are the knights who say Ni!'
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
        self.tempdir_name = os.path.basename(self.tempdir)
        self.base_url = '/' + self.tempdir_name
        tempname = os.path.join(self.tempdir, 'test')
        with open(tempname, 'wb') as temp:
            temp.write(self.data)
            temp.flush()
        mtime = os.stat(tempname).st_mtime
        # compute last modification datetime for browser cache tests
        last_modif = datetime.datetime.fromtimestamp(mtime,
            datetime.timezone.utc)
        self.last_modif_datetime = last_modif.replace(microsecond=0)
        self.last_modif_header = email.utils.formatdate(
            last_modif.timestamp(), usegmt=True)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            # Best-effort removal: a failure to delete the temporary tree
            # must not fail the test, but only filesystem errors are
            # ignored (previously a bare "except:" swallowed everything).
            shutil.rmtree(self.tempdir, ignore_errors=True)
        finally:
            super().tearDown()

    def check_status_and_reason(self, response, status, data=None):
        """Read *response* and check its status, reason and (optionally) body.

        Also checks that the response is HTTP/1.0, is not persistent, and
        that nothing is left to read after the body.  Returns the body.
        *data*, when given (even if empty), must equal the body.
        """
        def close_conn():
            """Don't close reader yet so we can check if there was leftover
            buffered input"""
            nonlocal reader
            reader = response.fp
            response.fp = None
        reader = None
        response._close_conn = close_conn

        body = response.read()
        self.assertTrue(response)
        self.assertEqual(response.status, status)
        self.assertIsNotNone(response.reason)
        # "is not None" so that an expected empty body (b'') is verified
        # too, instead of being silently skipped by a truthiness test.
        if data is not None:
            self.assertEqual(data, body)
        # Ensure the server has not set up a persistent connection, and has
        # not sent any extra data
        self.assertEqual(response.version, 10)
        self.assertEqual(response.msg.get("Connection", "close"), "close")
        self.assertEqual(reader.read(30), b'', 'Connection should be closed')

        reader.close()
        return body

    @unittest.skipIf(sys.platform == 'darwin',
                     'undecodable name cannot always be decoded on macOS')
    @unittest.skipIf(sys.platform == 'win32',
                     'undecodable name cannot be decoded on win32')
    @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE,
                         'need os_helper.TESTFN_UNDECODABLE')
    def test_undecodable_filename(self):
        # The test is skipped on darwin by the decorator above, so no
        # darwin-specific filename handling is needed in the body.
        enc = sys.getfilesystemencoding()
        filename = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.txt'
        with open(os.path.join(self.tempdir, filename), 'wb') as f:
            f.write(os_helper.TESTFN_UNDECODABLE)
        response = self.request(self.base_url + '/')
        body = self.check_status_and_reason(response, HTTPStatus.OK)
        quotedname = urllib.parse.quote(filename, errors='surrogatepass')
        self.assertIn(('href="%s"' % quotedname)
                      .encode(enc, 'surrogateescape'), body)
        self.assertIn(('>%s<' % html.escape(filename, quote=False))
                      .encode(enc, 'surrogateescape'), body)
        response = self.request(self.base_url + '/' + quotedname)
        self.check_status_and_reason(response, HTTPStatus.OK,
                                     data=os_helper.TESTFN_UNDECODABLE)

    def test_undecodable_parameter(self):
        # sanity check using a valid parameter
        response = self.request(self.base_url + '/?x=123').read()
        self.assertRegex(response, rf'listing for {self.base_url}/\?x=123'.encode('latin1'))
        # now the bogus encoding
        response = self.request(self.base_url + '/?x=%bb').read()
        self.assertRegex(response, rf'listing for {self.base_url}/\?x=\xef\xbf\xbd'.encode('latin1'))

    def test_get_dir_redirect_location_domain_injection_bug(self):
        """Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location.

        //netloc/ in a Location header is a redirect to a new host.
        https://github.com/python/cpython/issues/87389

        This checks that a path resolving to a directory on our server cannot
        resolve into a redirect to another server.
        """
        os.mkdir(os.path.join(self.tempdir, 'existing_directory'))
        url = f'/python.org/..%2f..%2f..%2f..%2f..%2f../%0a%0d/../{self.tempdir_name}/existing_directory'
        expected_location = f'{url}/'  # /python.org.../ single slash single prefix, trailing slash
        # Canonicalizes to /tmp/tempdir_name/existing_directory which does
        # exist and is a dir, triggering the 301 redirect logic.
        response = self.request(url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        location = response.getheader('Location')
        self.assertEqual(location, expected_location, msg='non-attack failed!')

        # //python.org... multi-slash prefix, no trailing slash
        attack_url = f'/{url}'
        response = self.request(attack_url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        location = response.getheader('Location')
        self.assertFalse(location.startswith('//'), msg=location)
        self.assertEqual(location, expected_location,
                msg='Expected Location header to start with a single / and '
                'end with a / as this is a directory redirect.')

        # ///python.org... triple-slash prefix, no trailing slash
        attack3_url = f'//{url}'
        response = self.request(attack3_url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader('Location'), expected_location)

        # If the second word in the http request (Request-URI for the http
        # method) is a full URI, we don't worry about it, as that'll be parsed
        # and reassembled as a full URI within BaseHTTPRequestHandler.send_head
        # so no errant scheme-less //netloc//evil.co/ domain mixup can happen.
        attack_scheme_netloc_2slash_url = f'https://pypi.org/{url}'
        expected_scheme_netloc_location = f'{attack_scheme_netloc_2slash_url}/'
        response = self.request(attack_scheme_netloc_2slash_url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        location = response.getheader('Location')
        # We're just ensuring that the scheme and domain make it through, if
        # there are or aren't multiple slashes at the start of the path that
        # follows that isn't important in this Location: header.
        self.assertTrue(location.startswith('https://pypi.org/'), msg=location)

    def test_get(self):
        #constructs the path relative to the root directory of the HTTPServer
        response = self.request(self.base_url + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        # check for trailing "/" which should return 404. See Issue17324
        response = self.request(self.base_url + '/test/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request(self.base_url + '/')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.base_url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader("Content-Length"), "0")
        response = self.request(self.base_url + '/?hi=2')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.base_url + '?hi=1')
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader("Location"),
                         self.base_url + "/?hi=1")
        response = self.request('/ThisDoesNotExist')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request('/' + 'ThisDoesNotExist' + '/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        # Note: this creates a *directory* named index.html inside spam/.
        os.makedirs(os.path.join(self.tempdir, 'spam', 'index.html'))
        response = self.request(self.base_url + '/spam/')
        self.check_status_and_reason(response, HTTPStatus.OK)

        data = b"Dummy index file\r\n"
        # Relative path: setUp() made the parent of tempdir the cwd.
        with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f:
            f.write(data)
        response = self.request(self.base_url + '/')
        self.check_status_and_reason(response, HTTPStatus.OK, data)

        # chmod() doesn't work as expected on Windows, and filesystem
        # permissions are ignored by root on Unix.
        if os.name == 'posix' and os.geteuid() != 0:
            os.chmod(self.tempdir, 0)
            try:
                response = self.request(self.base_url + '/')
                self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
            finally:
                os.chmod(self.tempdir, 0o755)

    def test_head(self):
        response = self.request(
            self.base_url + '/test', method='HEAD')
        self.check_status_and_reason(response, HTTPStatus.OK)
        self.assertEqual(response.getheader('content-length'),
                         str(len(self.data)))
        self.assertEqual(response.getheader('content-type'),
                         'application/octet-stream')

    def test_browser_cache(self):
        """Check that when a request to /test is sent with the request header
        If-Modified-Since set to date of last modification, the server returns
        status code 304, not 200
        """
        headers = email.message.Message()
        headers['If-Modified-Since'] = self.last_modif_header
        response = self.request(self.base_url + '/test', headers=headers)
        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)

        # one hour after last modification : must return 304
        new_dt = self.last_modif_datetime + datetime.timedelta(hours=1)
        headers = email.message.Message()
        headers['If-Modified-Since'] = email.utils.format_datetime(new_dt,
            usegmt=True)
        response = self.request(self.base_url + '/test', headers=headers)
        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)

    def test_browser_cache_file_changed(self):
        # with If-Modified-Since earlier than Last-Modified, must return 200
        dt = self.last_modif_datetime
        # build datetime object : 365 days before last modification
        old_dt = dt - datetime.timedelta(days=365)
        headers = email.message.Message()
        headers['If-Modified-Since'] = email.utils.format_datetime(old_dt,
            usegmt=True)
        response = self.request(self.base_url + '/test', headers=headers)
        self.check_status_and_reason(response, HTTPStatus.OK)

    def test_browser_cache_with_If_None_Match_header(self):
        # if If-None-Match header is present, ignore If-Modified-Since

        headers = email.message.Message()
        headers['If-Modified-Since'] = self.last_modif_header
        headers['If-None-Match'] = "*"
        response = self.request(self.base_url + '/test', headers=headers)
        self.check_status_and_reason(response, HTTPStatus.OK)

    def test_invalid_requests(self):
        response = self.request('/', method='FOO')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
        # requests must be case sensitive,so this should fail too
        response = self.request('/', method='custom')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
        response = self.request('/', method='GETs')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)

    def test_last_modified(self):
        """Checks that the datetime returned in Last-Modified response header
        is the actual datetime of last modification, rounded to the second
        """
        response = self.request(self.base_url + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        last_modif_header = response.headers['Last-modified']
        self.assertEqual(last_modif_header, self.last_modif_header)

    def test_path_without_leading_slash(self):
        # Same checks as in test_get, with request paths that have no
        # leading "/".
        response = self.request(self.tempdir_name + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        response = self.request(self.tempdir_name + '/test/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request(self.tempdir_name + '/')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.tempdir_name)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        response = self.request(self.tempdir_name + '/?hi=2')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.tempdir_name + '?hi=1')
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader("Location"),
                         self.tempdir_name + "/?hi=1")

    def test_html_escape_filename(self):
        filename = '<test&>.txt'
        fullpath = os.path.join(self.tempdir, filename)

        try:
            open(fullpath, 'wb').close()
        except OSError:
            raise unittest.SkipTest('Can not create file %s on current file '
                                    'system' % filename)

        try:
            response = self.request(self.base_url + '/')
            body = self.check_status_and_reason(response, HTTPStatus.OK)
            enc = response.headers.get_content_charset()
        finally:
            os.unlink(fullpath)  # avoid affecting test_undecodable_filename

        self.assertIsNotNone(enc)
        html_text = '>%s<' % html.escape(filename, quote=False)
        self.assertIn(html_text.encode(enc), body)
643
644
# Templates for the CGI scripts written to disk by
# CGIHTTPServerTestCase.setUp().  Each one is filled in with the %-operator:
# the first "%s" (the shebang line) receives the path of the Python
# executable.

# Minimal script: a Content-type header followed by "Hello World".
cgi_file1 = """\
#!%s

print("Content-type: text/html")
print()
print("Hello World")
"""

# Reads CONTENT_LENGTH bytes of form data from stdin and prints the "spam",
# "eggs" and "bacon" fields.  "%%" is a literal "%" that survives the
# %-formatting of the template.
cgi_file2 = """\
#!%s
import os
import sys
import urllib.parse

print("Content-type: text/html")
print()

content_length = int(os.environ["CONTENT_LENGTH"])
query_string = sys.stdin.buffer.read(content_length)
params = {key.decode("utf-8"): val.decode("utf-8")
            for key, val in urllib.parse.parse_qsl(query_string)}

print("%%s, %%s, %%s" %% (params["spam"], params["eggs"], params["bacon"]))
"""

# Prints the value of one environment variable; the second "%s" receives the
# name of that variable.
cgi_file4 = """\
#!%s
import os

print("Content-type: text/html")
print()

print(os.environ["%s"])
"""

# Sends an extra "X-ambv" header and dumps every environment variable whose
# name and value are both ASCII-encodable.
cgi_file6 = """\
#!%s
import os

print("X-ambv: was here")
print("Content-type: text/html")
print()
print("<pre>")
for k, v in os.environ.items():
    try:
        k.encode('ascii')
        v.encode('ascii')
    except UnicodeEncodeError:
        continue  # see: BPO-44647
    print(f"{k}={v}")
print("</pre>")
"""
697
698
699 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
700 "This test can't be run reliably as root (issue #13308).")
701 class ESC[4;38;5;81mCGIHTTPServerTestCase(ESC[4;38;5;149mBaseTestCase):
class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
    """Quiet CGI handler that hides the DeprecationWarning of run_cgi()."""

    def run_cgi(self):
        # Silence the threading + fork DeprecationWarning this causes.
        # gh-109096: This is deprecated in 3.13 to go away in 3.15.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=DeprecationWarning)
            return super().run_cgi()
708
# Platform line separator as bytes; the tests append it to the expected
# output of the CGI scripts (e.g. b'Hello World' + self.linesep).
linesep = os.linesep.encode('ascii')
710
def setUp(self):
    """Build a temporary tree of CGI scripts and make it the cwd.

    Layout created under a fresh temporary ``parent_dir``:

        nocgi.py
        cgi-bin/file1.py, file2.py, file4.py, file6.py
        cgi-bin/child-dir/file3.py
        sub/dir/cgi-bin/file5.py

    The test is skipped when the path of the Python executable cannot be
    encoded to UTF-8, because it is written into the scripts' shebang line.
    """
    BaseTestCase.setUp(self)
    self.cwd = os.getcwd()
    self.parent_dir = tempfile.mkdtemp()
    self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
    self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
    self.sub_dir_1 = os.path.join(self.parent_dir, 'sub')
    self.sub_dir_2 = os.path.join(self.sub_dir_1, 'dir')
    self.cgi_dir_in_sub_dir = os.path.join(self.sub_dir_2, 'cgi-bin')
    os.mkdir(self.cgi_dir)
    os.mkdir(self.cgi_child_dir)
    os.mkdir(self.sub_dir_1)
    os.mkdir(self.sub_dir_2)
    os.mkdir(self.cgi_dir_in_sub_dir)
    # Every script path starts as None so that tearDown() can be called
    # before the scripts exist (see the skip path below).
    self.nocgi_path = None
    self.file1_path = None
    self.file2_path = None
    self.file3_path = None
    self.file4_path = None
    self.file5_path = None
    # file6_path was previously left unset here, so the tearDown() call in
    # the skip path below failed with AttributeError.
    self.file6_path = None

    # The shebang line should be pure ASCII: use symlink if possible.
    # See issue #7668.
    self._pythonexe_symlink = None
    if os_helper.can_symlink():
        self.pythonexe = os.path.join(self.parent_dir, 'python')
        self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__()
    else:
        self.pythonexe = sys.executable

    try:
        # The python executable path is written as the first line of the
        # CGI Python script. The encoding cookie cannot be used, and so the
        # path should be encodable to the default script encoding (utf-8)
        self.pythonexe.encode('utf-8')
    except UnicodeEncodeError:
        # unittest does not call tearDown() when setUp() raises, so clean
        # up by hand before skipping.
        self.tearDown()
        self.skipTest("Python executable path is not encodable to utf-8")

    self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
    with open(self.nocgi_path, 'w', encoding='utf-8') as fp:
        fp.write(cgi_file1 % self.pythonexe)
    os.chmod(self.nocgi_path, 0o777)

    self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
    with open(self.file1_path, 'w', encoding='utf-8') as file1:
        file1.write(cgi_file1 % self.pythonexe)
    os.chmod(self.file1_path, 0o777)

    self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
    with open(self.file2_path, 'w', encoding='utf-8') as file2:
        file2.write(cgi_file2 % self.pythonexe)
    os.chmod(self.file2_path, 0o777)

    self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
    with open(self.file3_path, 'w', encoding='utf-8') as file3:
        file3.write(cgi_file1 % self.pythonexe)
    os.chmod(self.file3_path, 0o777)

    self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
    with open(self.file4_path, 'w', encoding='utf-8') as file4:
        file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING'))
    os.chmod(self.file4_path, 0o777)

    self.file5_path = os.path.join(self.cgi_dir_in_sub_dir, 'file5.py')
    with open(self.file5_path, 'w', encoding='utf-8') as file5:
        file5.write(cgi_file1 % self.pythonexe)
    os.chmod(self.file5_path, 0o777)

    self.file6_path = os.path.join(self.cgi_dir, 'file6.py')
    with open(self.file6_path, 'w', encoding='utf-8') as file6:
        file6.write(cgi_file6 % self.pythonexe)
    os.chmod(self.file6_path, 0o777)

    os.chdir(self.parent_dir)
786
def tearDown(self):
    """Remove the scripts and directories created by setUp()."""
    # Attribute names are looked up one at a time, in this order, right
    # before each removal.
    script_attrs = ('nocgi_path', 'file1_path', 'file2_path', 'file3_path',
                    'file4_path', 'file5_path', 'file6_path')
    # Children come before their parents so each directory is empty when
    # it is removed.
    dir_attrs = ('cgi_child_dir', 'cgi_dir', 'cgi_dir_in_sub_dir',
                 'sub_dir_2', 'sub_dir_1', 'parent_dir')
    try:
        os.chdir(self.cwd)
        if self._pythonexe_symlink:
            self._pythonexe_symlink.__exit__(None, None, None)
        for attr in script_attrs:
            script = getattr(self, attr)
            # A falsy path means the script was never written.
            if script:
                os.remove(script)
        for attr in dir_attrs:
            os.rmdir(getattr(self, attr))
    finally:
        BaseTestCase.tearDown(self)
814
def test_url_collapse_path(self):
    # verify tail is the last portion and head is the rest on proper urls
    # Each entry maps an input path to either the expected collapsed
    # result or the exception class that must be raised.
    test_vectors = {
        '': '//',
        '..': IndexError,
        '/.//..': IndexError,
        '/': '//',
        '//': '//',
        '/\\': '//\\',
        '/.//': '//',
        'cgi-bin/file1.py': '/cgi-bin/file1.py',
        '/cgi-bin/file1.py': '/cgi-bin/file1.py',
        'a': '//a',
        '/a': '//a',
        '//a': '//a',
        './a': '//a',
        './C:/': '/C:/',
        '/a/b': '/a/b',
        '/a/b/': '/a/b/',
        '/a/b/.': '/a/b/',
        '/a/b/c/..': '/a/b/',
        '/a/b/c/../d': '/a/b/d',
        '/a/b/c/../d/e/../f': '/a/b/d/f',
        '/a/b/c/../d/e/../../f': '/a/b/f',
        '/a/b/c/../d/e/.././././..//f': '/a/b/f',
        '../a/b/c/../d/e/.././././..//f': IndexError,
        '/a/b/c/../d/e/../../../f': '/a/f',
        '/a/b/c/../d/e/../../../../f': '//f',
        '/a/b/c/../d/e/../../../../../f': IndexError,
        '/a/b/c/../d/e/../../../../f/..': '//',
        '/a/b/c/../d/e/../../../../f/../.': '//',
    }
    for path, expected in test_vectors.items():
        expects_error = (isinstance(expected, type)
                         and issubclass(expected, Exception))
        if expects_error:
            with self.assertRaises(expected):
                server._url_collapse_path(path)
            continue
        actual = server._url_collapse_path(path)
        failure_msg = ('path = %r\nGot: %r\nWanted: %r' %
                       (path, actual, expected))
        self.assertEqual(expected, actual, msg=failure_msg)
856
def test_headers_and_content(self):
    # file1.py in cgi-bin prints a text/html header and "Hello World".
    response = self.request('/cgi-bin/file1.py')
    observed = (response.read(), response.getheader('Content-type'),
                response.status)
    wanted = (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)
    self.assertEqual(observed, wanted)
862
def test_issue19435(self):
    # nothere.sh does not exist in cgi-bin; the request path reaches it
    # through "nocgi.py/..".
    url = '///////////nocgi.py/../cgi-bin/nothere.sh'
    response = self.request(url)
    self.assertEqual(response.status, HTTPStatus.NOT_FOUND)
866
def test_post(self):
    # file2.py echoes the three form fields it reads from the POST body.
    form = {'spam': 1, 'eggs': 'python', 'bacon': 123456}
    payload = urllib.parse.urlencode(form)
    extra_headers = {'Content-type': 'application/x-www-form-urlencoded'}
    response = self.request('/cgi-bin/file2.py', method='POST',
                            body=payload, headers=extra_headers)

    self.assertEqual(response.read(), b'1, python, 123456' + self.linesep)
874
def test_invaliduri(self):
    # Requesting /cgi-bin/invalid is expected to produce a 404.
    response = self.request('/cgi-bin/invalid')
    # Consume the body first; its content is not checked.
    response.read()
    self.assertEqual(response.status, HTTPStatus.NOT_FOUND)
879
def test_authorization(self):
    # Send a Basic Authorization header (bytes key and bytes value) and
    # expect the usual script output with a 200 status.
    credentials = base64.b64encode(b'username:pass')
    request_headers = {b'Authorization' : b'Basic ' + credentials}
    response = self.request('/cgi-bin/file1.py', 'GET',
                            headers=request_headers)
    wanted = (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)
    observed = (
        response.read(),
        response.getheader('Content-type'),
        response.status,
    )
    self.assertEqual(wanted, observed)
887
def test_no_leading_slash(self):
    # http://bugs.python.org/issue2254
    # The request path has no leading "/" and must still reach the script.
    response = self.request('cgi-bin/file1.py')
    wanted = (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)
    observed = (
        response.read(),
        response.getheader('Content-type'),
        response.status,
    )
    self.assertEqual(wanted, observed)
894
def test_os_environ_is_not_altered(self):
    # Put a marker into SERVER_SOFTWARE, serve a CGI request, and check
    # that the marker is still the value stored in os.environ afterwards.
    marker = "Test CGI Server"
    os.environ['SERVER_SOFTWARE'] = marker
    response = self.request('/cgi-bin/file1.py')
    wanted = (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)
    observed = (
        response.read(),
        response.getheader('Content-type'),
        response.status,
    )
    self.assertEqual(wanted, observed)
    self.assertEqual(os.environ['SERVER_SOFTWARE'], marker)
903
def test_urlquote_decoding_in_cgi_check(self):
    # The slash after "cgi-bin" is sent percent-encoded (%2f); the
    # request must still be answered with the script's output.
    response = self.request('/cgi-bin%2ffile1.py')
    wanted = (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)
    observed = (
        response.read(),
        response.getheader('Content-type'),
        response.status,
    )
    self.assertEqual(wanted, observed)
909
def test_nested_cgi_path_issue21323(self):
    # A script in a directory nested below cgi-bin must be served too.
    response = self.request('/cgi-bin/child-dir/file3.py')
    wanted = (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)
    observed = (
        response.read(),
        response.getheader('Content-type'),
        response.status,
    )
    self.assertEqual(wanted, observed)
915
def test_query_with_multiple_question_mark(self):
    # The expected body is everything after the first "?" of the request,
    # including the second "?".
    response = self.request('/cgi-bin/file4.py?a=b?c=d')
    wanted = (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK)
    observed = (
        response.read(),
        response.getheader('Content-type'),
        response.status,
    )
    self.assertEqual(wanted, observed)
921
def test_query_with_continuous_slashes(self):
    # The expected body is the query string exactly as sent: repeated
    # slashes and %2F escapes must come back unchanged.
    response = self.request(
        '/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
    wanted = (
        b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep,
        'text/html',
        HTTPStatus.OK,
    )
    observed = (
        response.read(),
        response.getheader('Content-type'),
        response.status,
    )
    self.assertEqual(wanted, observed)
928
def test_cgi_path_in_sub_directories(self):
    # Register an extra CGI directory on the handler class for the
    # duration of the request, then always unregister it again.
    extra_dir = '/sub/dir/cgi-bin'
    try:
        CGIHTTPRequestHandler.cgi_directories.append(extra_dir)
        response = self.request('/sub/dir/cgi-bin/file5.py')
        wanted = (b'Hello World' + self.linesep, 'text/html',
                  HTTPStatus.OK)
        observed = (
            response.read(),
            response.getheader('Content-type'),
            response.status,
        )
        self.assertEqual(wanted, observed)
    finally:
        CGIHTTPRequestHandler.cgi_directories.remove(extra_dir)
938
def test_accept(self):
    # For each set of request headers, check the HTTP_ACCEPT value that
    # appears in the response of /cgi-bin/file6.py.
    browser_accept = \
        'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
    cases = (
        ((('Accept', browser_accept),), browser_accept),
        ((), ''),
        # Hack case to get two values for the one header
        ((('Accept', 'text/html'), ('ACCEPT', 'text/plain')),
         'text/html,text/plain'),
    )
    for header_items, accept_value in cases:
        request_headers = OrderedDict(header_items)
        with self.subTest(request_headers):
            response = self.request('/cgi-bin/file6.py', 'GET',
                                    headers=request_headers)
            self.assertEqual(http.HTTPStatus.OK, response.status)
            needle = f"HTTP_ACCEPT={accept_value}".encode('ascii')
            self.assertIn(needle, response.read())
956
957
class SocketlessRequestHandler(SimpleHTTPRequestHandler):
    """SimpleHTTPRequestHandler that is built without a real socket.

    The constructor passes the base class a mock request whose
    makefile() returns an empty BytesIO.  do_GET() records that it was
    called and writes a fixed 200 text/html response; log_message()
    discards everything.
    """
    def __init__(self, directory=None):
        # Stand-in for the socket: makefile() hands back an empty stream.
        fake_request = mock.Mock(**{'makefile.return_value': BytesIO()})
        super().__init__(fake_request, None, None, directory=directory)

        # Set on the instance only after the base constructor has run.
        self.get_called = False
        self.protocol_version = "HTTP/1.1"

    def do_GET(self):
        # Flag inspected by tests to learn whether the GET handler ran.
        self.get_called = True
        payload = b'<html><body>Data</body></html>\r\n'
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-Type', 'text/html')
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format, *args):
        # Logging is deliberately suppressed.
        pass
976
class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
    """Socketless handler that rejects every Expect: 100-continue."""
    def handle_expect_100(self):
        # Reply with 417 and return False to report the rejection.
        rejection = HTTPStatus.EXPECTATION_FAILED
        self.send_error(rejection)
        return False
981
982
class AuditableBytesIO:
    """Write-only sink that keeps every chunk passed to write().

    Lets a test inspect both the bytes written and how many separate
    write() calls produced them.
    """

    def __init__(self):
        # One entry per write() call, in call order.
        self.datas = []

    def write(self, data):
        """Record *data* as one more write."""
        self.datas.append(data)

    def getData(self):
        """Return all recorded chunks joined into one bytes object."""
        chunks = self.datas
        return b''.join(chunks)

    @property
    def numWrites(self):
        """Number of write() calls recorded so far."""
        chunks = self.datas
        return len(chunks)
997
998
class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
    """Test the functionality of the BaseHTTPServer.

       Test the support for the Expect 100-continue header.
    """

    # Status line of a 200 response.  The dot is escaped so that it only
    # matches a literal "." between the major and minor version digits.
    HTTPResponseMatch = re.compile(rb'HTTP/1\.[0-9]+ 200 OK')

    def setUp(self):
        # setUp runs before every test, so each test gets a new handler.
        self.handler = SocketlessRequestHandler()

    def send_typical_request(self, message):
        """Feed *message* to the handler and return the response lines.

        *message* is the raw request as bytes.  The handler's rfile and
        wfile are replaced by in-memory streams, one request is handled,
        and everything written is returned as a list of byte lines.
        """
        request_stream = BytesIO(message)
        response_stream = BytesIO()
        self.handler.rfile = request_stream
        self.handler.wfile = response_stream
        self.handler.handle_one_request()
        response_stream.seek(0)
        return response_stream.readlines()

    def verify_get_called(self):
        """Assert that the handler's get_called flag is set."""
        self.assertTrue(self.handler.get_called)

    def verify_expected_headers(self, headers):
        """Assert Server, Date and Content-Type each appear exactly once."""
        for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
            self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)

    def verify_http_server_response(self, response):
        """Assert that *response* contains an HTTP/1.x 200 OK status line."""
        match = self.HTTPResponseMatch.search(response)
        self.assertIsNotNone(match)

    def test_unprintable_not_logged(self):
        # We call the method from the class directly as our Socketless
        # Handler subclass overrode it... nice for everything BUT this test.
        self.handler.client_address = ('127.0.0.1', 1337)
        log_message = BaseHTTPRequestHandler.log_message
        with mock.patch.object(sys, 'stderr', StringIO()) as fake_stderr:
            log_message(self.handler, '/foo')
            log_message(self.handler, '/\033bar\000\033')
            log_message(self.handler, '/spam %s.', 'a')
            log_message(self.handler, '/spam %s.', '\033\x7f\x9f\xa0beans')
            log_message(self.handler, '"GET /foo\\b"ar\007 HTTP/1.0"')
        stderr = fake_stderr.getvalue()
        self.assertNotIn('\033', stderr)  # non-printable chars are caught.
        self.assertNotIn('\000', stderr)  # non-printable chars are caught.
        lines = stderr.splitlines()
        self.assertIn('/foo', lines[0])
        self.assertIn(r'/\x1bbar\x00\x1b', lines[1])
        self.assertIn('/spam a.', lines[2])
        self.assertIn('/spam \\x1b\\x7f\\x9f\xa0beans.', lines[3])
        self.assertIn(r'"GET /foo\\b"ar\x07 HTTP/1.0"', lines[4])

    def test_http_1_1(self):
        result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
        self.assertEqual(self.handler.command, 'GET')
        self.assertEqual(self.handler.path, '/')
        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
        self.assertSequenceEqual(self.handler.headers.items(), ())

    def test_http_1_0(self):
        result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
        self.assertEqual(self.handler.command, 'GET')
        self.assertEqual(self.handler.path, '/')
        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
        self.assertSequenceEqual(self.handler.headers.items(), ())

    def test_http_0_9(self):
        # An HTTP/0.9 request is expected to yield the body only, with no
        # status line and no headers.
        result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
        self.verify_get_called()

    def test_extra_space(self):
        # A request line whose path contains a space must be rejected
        # with a 400 before do_GET() is reached.
        result = self.send_typical_request(
            b'GET /spaced out HTTP/1.1\r\n'
            b'Host: dummy\r\n'
            b'\r\n'
        )
        self.assertTrue(result[0].startswith(b'HTTP/1.1 400 '))
        # Headers run from the line after the status line up to the first
        # blank line.
        self.verify_expected_headers(result[1:result.index(b'\r\n')])
        self.assertFalse(self.handler.get_called)

    def test_with_continue_1_0(self):
        # With an HTTP/1.0 request no "100 Continue" line is expected:
        # the first response line must already be the 200 status.
        result = self.send_typical_request(
            b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
        self.assertEqual(self.handler.command, 'GET')
        self.assertEqual(self.handler.path, '/')
        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
        headers = (("Expect", "100-continue"),)
        self.assertSequenceEqual(self.handler.headers.items(), headers)

    def test_with_continue_1_1(self):
        # With HTTP/1.1 the interim "100 Continue" response and its blank
        # line must come before the real 200 response.
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n')
        self.assertEqual(result[1], b'\r\n')
        self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n')
        self.verify_expected_headers(result[2:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
        self.assertEqual(self.handler.command, 'GET')
        self.assertEqual(self.handler.path, '/')
        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
        headers = (("Expect", "100-continue"),)
        self.assertSequenceEqual(self.handler.headers.items(), headers)

    def test_header_buffering_of_send_error(self):
        # send_error() is expected to reach the output in exactly two
        # write() calls.

        request_stream = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
        output = AuditableBytesIO()
        handler = SocketlessRequestHandler()
        handler.rfile = request_stream
        handler.wfile = output
        handler.request_version = 'HTTP/1.1'
        handler.requestline = ''
        handler.command = None

        handler.send_error(418)
        self.assertEqual(output.numWrites, 2)

    def test_header_buffering_of_send_response_only(self):
        # Nothing may be written until end_headers() is called; it then
        # emits everything in a single write().

        request_stream = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
        output = AuditableBytesIO()
        handler = SocketlessRequestHandler()
        handler.rfile = request_stream
        handler.wfile = output
        handler.request_version = 'HTTP/1.1'

        handler.send_response_only(418)
        self.assertEqual(output.numWrites, 0)
        handler.end_headers()
        self.assertEqual(output.numWrites, 1)

    def test_header_buffering_of_send_header(self):
        # Headers are held back until end_headers(), which writes them
        # together with the terminating blank line in one write().

        request_stream = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
        output = AuditableBytesIO()
        handler = SocketlessRequestHandler()
        handler.rfile = request_stream
        handler.wfile = output
        handler.request_version = 'HTTP/1.1'

        handler.send_header('Foo', 'foo')
        handler.send_header('bar', 'bar')
        self.assertEqual(output.numWrites, 0)
        handler.end_headers()
        self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
        self.assertEqual(output.numWrites, 1)

    def test_header_unbuffered_when_continue(self):

        def _readAndReseek(f):
            # Read the whole stream, then put the position back.
            pos = f.tell()
            f.seek(0)
            data = f.read()
            f.seek(pos)
            return data

        request_stream = BytesIO(
            b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        output = BytesIO()
        self.handler.rfile = request_stream
        self.handler.wfile = output
        self.handler.request_version = 'HTTP/1.1'

        self.handler.handle_one_request()
        self.assertNotEqual(_readAndReseek(output), b'')
        result = _readAndReseek(output).split(b'\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 100 Continue')
        self.assertEqual(result[1], b'')
        self.assertEqual(result[2], b'HTTP/1.1 200 OK')

    def test_with_continue_rejected(self):
        usual_handler = self.handler
        self.handler = RejectingSocketlessRequestHandler()
        try:
            result = self.send_typical_request(
                b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
            self.assertEqual(result[0],
                             b'HTTP/1.1 417 Expectation Failed\r\n')
            self.verify_expected_headers(result[1:-1])
            # The expect handler should short circuit the usual get method
            # by returning false here, so get_called should be false
            self.assertFalse(self.handler.get_called)
            self.assertEqual(
                sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1)
        finally:
            # Put the original handler back even when an assertion fails.
            self.handler = usual_handler

    def test_request_length(self):
        # Issue #10714: huge request lines are discarded, to avoid Denial
        # of Service attacks.
        result = self.send_typical_request(b'GET ' + b'x' * 65537)
        self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertIsInstance(self.handler.requestline, str)

    def test_header_length(self):
        # Issue #6791: same for headers
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')

    def test_too_many_headers(self):
        # 101 header lines are expected to be rejected with a 431.
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')

    def test_html_escape_on_error(self):
        # Markup sent in place of the method must appear HTML-escaped in
        # the response.
        result = self.send_typical_request(
            b'<script>alert("hello")</script> / HTTP/1.1')
        result = b''.join(result)
        text = '<script>alert("hello")</script>'
        self.assertIn(html.escape(text, quote=False).encode('ascii'), result)

    def test_close_connection(self):
        # handle_one_request() should be repeatedly called until
        # it sets close_connection
        def handle_one_request():
            # close_values is looked up at call time, so each rebinding
            # below supplies a new sequence.
            self.handler.close_connection = next(close_values)
        self.handler.handle_one_request = handle_one_request

        close_values = iter((True,))
        self.handler.handle()
        self.assertRaises(StopIteration, next, close_values)

        close_values = iter((False, False, True))
        self.handler.handle()
        self.assertRaises(StopIteration, next, close_values)

    def test_date_time_string(self):
        now = time.time()
        # this is the old code that formats the timestamp
        year, month, day, hh, mm, ss, wd, *_ = time.gmtime(now)
        expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
            self.handler.weekdayname[wd],
            day,
            self.handler.monthname[month],
            year, hh, mm, ss
        )
        self.assertEqual(self.handler.date_time_string(timestamp=now), expected)
1253
1254
class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
    """ Test url parsing """
    def setUp(self):
        # Expected translation of "/filename" for each handler below:
        # default directory (cwd), directory='foo', and a PurePath 'bar'.
        self.translated_1 = os.path.join(os.getcwd(), 'filename')
        self.translated_2 = os.path.join('foo', 'filename')
        self.translated_3 = os.path.join('bar', 'filename')
        self.handler_1 = SocketlessRequestHandler()
        self.handler_2 = SocketlessRequestHandler(directory='foo')
        self.handler_3 = SocketlessRequestHandler(directory=pathlib.PurePath('bar'))

    def _check_translations(self, urls, *, to_native_sep=False):
        """Assert translate_path() of every URL on every handler.

        *urls* is an iterable of request paths.  Each one is translated
        by handler_1, handler_2 and handler_3 in turn and compared with
        the matching translated_N value.  When *to_native_sep* is true,
        ntpath separators in the result are replaced by os.sep before
        the comparison.
        """
        cases = (
            (self.handler_1, self.translated_1),
            (self.handler_2, self.translated_2),
            (self.handler_3, self.translated_3),
        )
        for url in urls:
            for handler, expected in cases:
                # A subtest per combination names the failing URL and
                # lets the remaining combinations run.
                with self.subTest(url=url, expected=expected):
                    path = handler.translate_path(url)
                    if to_native_sep:
                        path = path.replace(ntpath.sep, os.sep)
                    self.assertEqual(path, expected)

    def test_query_arguments(self):
        # A query string or fragment must not affect the translated path.
        self._check_translations((
            '/filename',
            '/filename?foo=bar',
            '/filename?a=b&spam=eggs#zot',
        ))

    def test_start_with_double_slash(self):
        # A leading "//" must translate like a single leading slash.
        self._check_translations((
            '//filename',
            '//filename?foo=bar',
        ))

    def test_windows_colon(self):
        # With ntpath swapped in for the server's os.path, components
        # carrying drive prefixes or ".." must be dropped from the result.
        urls = (
            'c:c:c:foo/filename',
            '\\c:../filename',
            'c:\\c:..\\foo/filename',
            'c:c:foo\\c:c:bar/filename',
        )
        with support.swap_attr(server.os, 'path', ntpath):
            self._check_translations(urls, to_native_sep=True)
1343
1344
class MiscTestCase(unittest.TestCase):
    def test_all(self):
        # server.__all__ must contain exactly the public names whose
        # __module__ is 'http.server', apart from the denylisted ones.
        denylist = {'executable', 'nobody_uid', 'test'}
        candidates = (
            name for name in dir(server)
            if not (name.startswith('_') or name in denylist)
        )
        expected = [
            name for name in candidates
            if getattr(getattr(server, name), '__module__', None)
            == 'http.server'
        ]
        self.assertCountEqual(server.__all__, expected)
1356
1357
class ScriptTestCase(unittest.TestCase):

    def mock_server_class(self):
        """Return a MagicMock that stands in for a server class.

        Calling it yields a mock server instance.  Used as a context
        manager, that instance provides an object whose
        socket.getsockname() returns ('', 0).
        """
        fake_socket = mock.MagicMock(getsockname=lambda: ('', 0))
        entered_server = mock.MagicMock(socket=fake_socket)
        server_instance = mock.MagicMock(
            __enter__=mock.MagicMock(return_value=entered_server),
        )
        return mock.MagicMock(return_value=server_instance)

    @mock.patch('builtins.print')
    def test_server_test_unspec(self, _):
        # With no bind address either address family is acceptable.
        mock_server = self.mock_server_class()
        server.test(ServerClass=mock_server, bind=None)
        self.assertIn(
            mock_server.address_family,
            (socket.AF_INET6, socket.AF_INET),
        )

    @mock.patch('builtins.print')
    def test_server_test_localhost(self, _):
        # "localhost" is accepted with either address family.
        mock_server = self.mock_server_class()
        server.test(ServerClass=mock_server, bind="localhost")
        self.assertIn(
            mock_server.address_family,
            (socket.AF_INET6, socket.AF_INET),
        )

    ipv6_addrs = (
        "::",
        "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
        "::1",
    )

    ipv4_addrs = (
        "0.0.0.0",
        "8.8.8.8",
        "127.0.0.1",
    )

    @mock.patch('builtins.print')
    def test_server_test_ipv6(self, _):
        for bind in self.ipv6_addrs:
            # Subtest per address: a failure names the address and the
            # remaining addresses are still checked.
            with self.subTest(bind=bind):
                mock_server = self.mock_server_class()
                server.test(ServerClass=mock_server, bind=bind)
                self.assertEqual(mock_server.address_family,
                                 socket.AF_INET6)

    @mock.patch('builtins.print')
    def test_server_test_ipv4(self, _):
        for bind in self.ipv4_addrs:
            # Subtest per address: a failure names the address and the
            # remaining addresses are still checked.
            with self.subTest(bind=bind):
                mock_server = self.mock_server_class()
                server.test(ServerClass=mock_server, bind=bind)
                self.assertEqual(mock_server.address_family,
                                 socket.AF_INET)
1416
1417
1418 def setUpModule():
1419 unittest.addModuleCleanup(os.chdir, os.getcwd())
1420
1421
# Run this module's tests when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()