1 import base64
2 import os
3 import email
4 import urllib.parse
5 import urllib.request
6 import http.server
7 import threading
8 import unittest
9 import hashlib
10
11 from test import support
12 from test.support import hashlib_helper
13 from test.support import threading_helper
14 from test.support import warnings_helper
15
16 try:
17 import ssl
18 except ImportError:
19 ssl = None
20
21 support.requires_working_socket(module=True)
22
23 here = os.path.dirname(__file__)
24 # Self-signed cert file for 'localhost'
25 CERT_localhost = os.path.join(here, 'certdata', 'keycert.pem')
26 # Self-signed cert file for 'fakehostname'
27 CERT_fakehostname = os.path.join(here, 'certdata', 'keycert2.pem')
28
29
30 # Loopback http server infrastructure
31
32 class ESC[4;38;5;81mLoopbackHttpServer(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mHTTPServer):
33 """HTTP server w/ a few modifications that make it useful for
34 loopback testing purposes.
35 """
36
37 def __init__(self, server_address, RequestHandlerClass):
38 http.server.HTTPServer.__init__(self,
39 server_address,
40 RequestHandlerClass)
41
42 # Set the timeout of our listening socket really low so
43 # that we can stop the server easily.
44 self.socket.settimeout(0.1)
45
46 def get_request(self):
47 """HTTPServer method, overridden."""
48
49 request, client_address = self.socket.accept()
50
51 # It's a loopback connection, so setting the timeout
52 # really low shouldn't affect anything, but should make
53 # deadlocks less likely to occur.
54 request.settimeout(10.0)
55
56 return (request, client_address)
57
58 class ESC[4;38;5;81mLoopbackHttpServerThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
59 """Stoppable thread that runs a loopback http server."""
60
61 def __init__(self, request_handler):
62 threading.Thread.__init__(self)
63 self._stop_server = False
64 self.ready = threading.Event()
65 request_handler.protocol_version = "HTTP/1.0"
66 self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
67 request_handler)
68 self.port = self.httpd.server_port
69
70 def stop(self):
71 """Stops the webserver if it's currently running."""
72
73 self._stop_server = True
74
75 self.join()
76 self.httpd.server_close()
77
78 def run(self):
79 self.ready.set()
80 while not self._stop_server:
81 self.httpd.handle_request()
82
83 # Authentication infrastructure
84
85 class ESC[4;38;5;81mDigestAuthHandler:
86 """Handler for performing digest authentication."""
87
88 def __init__(self):
89 self._request_num = 0
90 self._nonces = []
91 self._users = {}
92 self._realm_name = "Test Realm"
93 self._qop = "auth"
94
95 def set_qop(self, qop):
96 self._qop = qop
97
98 def set_users(self, users):
99 assert isinstance(users, dict)
100 self._users = users
101
102 def set_realm(self, realm):
103 self._realm_name = realm
104
105 def _generate_nonce(self):
106 self._request_num += 1
107 nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
108 self._nonces.append(nonce)
109 return nonce
110
111 def _create_auth_dict(self, auth_str):
112 first_space_index = auth_str.find(" ")
113 auth_str = auth_str[first_space_index+1:]
114
115 parts = auth_str.split(",")
116
117 auth_dict = {}
118 for part in parts:
119 name, value = part.split("=")
120 name = name.strip()
121 if value[0] == '"' and value[-1] == '"':
122 value = value[1:-1]
123 else:
124 value = value.strip()
125 auth_dict[name] = value
126 return auth_dict
127
128 def _validate_auth(self, auth_dict, password, method, uri):
129 final_dict = {}
130 final_dict.update(auth_dict)
131 final_dict["password"] = password
132 final_dict["method"] = method
133 final_dict["uri"] = uri
134 HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
135 HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
136 HA2_str = "%(method)s:%(uri)s" % final_dict
137 HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
138 final_dict["HA1"] = HA1
139 final_dict["HA2"] = HA2
140 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
141 "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
142 response = hashlib.md5(response_str.encode("ascii")).hexdigest()
143
144 return response == auth_dict["response"]
145
146 def _return_auth_challenge(self, request_handler):
147 request_handler.send_response(407, "Proxy Authentication Required")
148 request_handler.send_header("Content-Type", "text/html")
149 request_handler.send_header(
150 'Proxy-Authenticate', 'Digest realm="%s", '
151 'qop="%s",'
152 'nonce="%s", ' % \
153 (self._realm_name, self._qop, self._generate_nonce()))
154 # XXX: Not sure if we're supposed to add this next header or
155 # not.
156 #request_handler.send_header('Connection', 'close')
157 request_handler.end_headers()
158 request_handler.wfile.write(b"Proxy Authentication Required.")
159 return False
160
161 def handle_request(self, request_handler):
162 """Performs digest authentication on the given HTTP request
163 handler. Returns True if authentication was successful, False
164 otherwise.
165
166 If no users have been set, then digest auth is effectively
167 disabled and this method will always return True.
168 """
169
170 if len(self._users) == 0:
171 return True
172
173 if "Proxy-Authorization" not in request_handler.headers:
174 return self._return_auth_challenge(request_handler)
175 else:
176 auth_dict = self._create_auth_dict(
177 request_handler.headers["Proxy-Authorization"]
178 )
179 if auth_dict["username"] in self._users:
180 password = self._users[ auth_dict["username"] ]
181 else:
182 return self._return_auth_challenge(request_handler)
183 if not auth_dict.get("nonce") in self._nonces:
184 return self._return_auth_challenge(request_handler)
185 else:
186 self._nonces.remove(auth_dict["nonce"])
187
188 auth_validated = False
189
190 # MSIE uses short_path in its validation, but Python's
191 # urllib.request uses the full path, so we're going to see if
192 # either of them works here.
193
194 for path in [request_handler.path, request_handler.short_path]:
195 if self._validate_auth(auth_dict,
196 password,
197 request_handler.command,
198 path):
199 auth_validated = True
200
201 if not auth_validated:
202 return self._return_auth_challenge(request_handler)
203 return True
204
205
206 class ESC[4;38;5;81mBasicAuthHandler(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mBaseHTTPRequestHandler):
207 """Handler for performing basic authentication."""
208 # Server side values
209 USER = 'testUser'
210 PASSWD = 'testPass'
211 REALM = 'Test'
212 USER_PASSWD = "%s:%s" % (USER, PASSWD)
213 ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')
214
215 def __init__(self, *args, **kwargs):
216 http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
217
218 def log_message(self, format, *args):
219 # Suppress console log message
220 pass
221
222 def do_HEAD(self):
223 self.send_response(200)
224 self.send_header("Content-type", "text/html")
225 self.end_headers()
226
227 def do_AUTHHEAD(self):
228 self.send_response(401)
229 self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
230 self.send_header("Content-type", "text/html")
231 self.end_headers()
232
233 def do_GET(self):
234 if not self.headers.get("Authorization", ""):
235 self.do_AUTHHEAD()
236 self.wfile.write(b"No Auth header received")
237 elif self.headers.get(
238 "Authorization", "") == "Basic " + self.ENCODED_AUTH:
239 self.send_response(200)
240 self.end_headers()
241 self.wfile.write(b"It works")
242 else:
243 # Request Unauthorized
244 self.do_AUTHHEAD()
245
246
247
248 # Proxy test infrastructure
249
250 class ESC[4;38;5;81mFakeProxyHandler(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mBaseHTTPRequestHandler):
251 """This is a 'fake proxy' that makes it look like the entire
252 internet has gone down due to a sudden zombie invasion. It main
253 utility is in providing us with authentication support for
254 testing.
255 """
256
257 def __init__(self, digest_auth_handler, *args, **kwargs):
258 # This has to be set before calling our parent's __init__(), which will
259 # try to call do_GET().
260 self.digest_auth_handler = digest_auth_handler
261 http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
262
263 def log_message(self, format, *args):
264 # Uncomment the next line for debugging.
265 # sys.stderr.write(format % args)
266 pass
267
268 def do_GET(self):
269 (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse(
270 self.path, "http")
271 self.short_path = path
272 if self.digest_auth_handler.handle_request(self):
273 self.send_response(200, "OK")
274 self.send_header("Content-Type", "text/html")
275 self.end_headers()
276 self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
277 "ascii"))
278 self.wfile.write(b"Our apologies, but our server is down due to "
279 b"a sudden zombie invasion.")
280
281 # Test cases
282
283 class ESC[4;38;5;81mBasicAuthTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
284 USER = "testUser"
285 PASSWD = "testPass"
286 INCORRECT_PASSWD = "Incorrect"
287 REALM = "Test"
288
289 def setUp(self):
290 super(BasicAuthTests, self).setUp()
291 # With Basic Authentication
292 def http_server_with_basic_auth_handler(*args, **kwargs):
293 return BasicAuthHandler(*args, **kwargs)
294 self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
295 self.addCleanup(self.stop_server)
296 self.server_url = 'http://127.0.0.1:%s' % self.server.port
297 self.server.start()
298 self.server.ready.wait()
299
300 def stop_server(self):
301 self.server.stop()
302 self.server = None
303
304 def tearDown(self):
305 super(BasicAuthTests, self).tearDown()
306
307 def test_basic_auth_success(self):
308 ah = urllib.request.HTTPBasicAuthHandler()
309 ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
310 urllib.request.install_opener(urllib.request.build_opener(ah))
311 try:
312 self.assertTrue(urllib.request.urlopen(self.server_url))
313 except urllib.error.HTTPError:
314 self.fail("Basic auth failed for the url: %s" % self.server_url)
315
316 def test_basic_auth_httperror(self):
317 ah = urllib.request.HTTPBasicAuthHandler()
318 ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
319 urllib.request.install_opener(urllib.request.build_opener(ah))
320 self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
321
322
323 @hashlib_helper.requires_hashdigest("md5", openssl=True)
324 class ESC[4;38;5;81mProxyAuthTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
325 URL = "http://localhost"
326
327 USER = "tester"
328 PASSWD = "test123"
329 REALM = "TestRealm"
330
331 def setUp(self):
332 super(ProxyAuthTests, self).setUp()
333 # Ignore proxy bypass settings in the environment.
334 def restore_environ(old_environ):
335 os.environ.clear()
336 os.environ.update(old_environ)
337 self.addCleanup(restore_environ, os.environ.copy())
338 os.environ['NO_PROXY'] = ''
339 os.environ['no_proxy'] = ''
340
341 self.digest_auth_handler = DigestAuthHandler()
342 self.digest_auth_handler.set_users({self.USER: self.PASSWD})
343 self.digest_auth_handler.set_realm(self.REALM)
344 # With Digest Authentication.
345 def create_fake_proxy_handler(*args, **kwargs):
346 return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
347
348 self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
349 self.addCleanup(self.stop_server)
350 self.server.start()
351 self.server.ready.wait()
352 proxy_url = "http://127.0.0.1:%d" % self.server.port
353 handler = urllib.request.ProxyHandler({"http" : proxy_url})
354 self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
355 self.opener = urllib.request.build_opener(
356 handler, self.proxy_digest_handler)
357
358 def stop_server(self):
359 self.server.stop()
360 self.server = None
361
362 def test_proxy_with_bad_password_raises_httperror(self):
363 self.proxy_digest_handler.add_password(self.REALM, self.URL,
364 self.USER, self.PASSWD+"bad")
365 self.digest_auth_handler.set_qop("auth")
366 self.assertRaises(urllib.error.HTTPError,
367 self.opener.open,
368 self.URL)
369
370 def test_proxy_with_no_password_raises_httperror(self):
371 self.digest_auth_handler.set_qop("auth")
372 self.assertRaises(urllib.error.HTTPError,
373 self.opener.open,
374 self.URL)
375
376 def test_proxy_qop_auth_works(self):
377 self.proxy_digest_handler.add_password(self.REALM, self.URL,
378 self.USER, self.PASSWD)
379 self.digest_auth_handler.set_qop("auth")
380 with self.opener.open(self.URL) as result:
381 while result.read():
382 pass
383
384 def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
385 self.proxy_digest_handler.add_password(self.REALM, self.URL,
386 self.USER, self.PASSWD)
387 self.digest_auth_handler.set_qop("auth-int")
388 try:
389 result = self.opener.open(self.URL)
390 except urllib.error.URLError:
391 # It's okay if we don't support auth-int, but we certainly
392 # shouldn't receive any kind of exception here other than
393 # a URLError.
394 pass
395 else:
396 with result:
397 while result.read():
398 pass
399
400
401 def GetRequestHandler(responses):
402
403 class ESC[4;38;5;81mFakeHTTPRequestHandler(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mBaseHTTPRequestHandler):
404
405 server_version = "TestHTTP/"
406 requests = []
407 headers_received = []
408 port = 80
409
410 def do_GET(self):
411 body = self.send_head()
412 while body:
413 done = self.wfile.write(body)
414 body = body[done:]
415
416 def do_POST(self):
417 content_length = self.headers["Content-Length"]
418 post_data = self.rfile.read(int(content_length))
419 self.do_GET()
420 self.requests.append(post_data)
421
422 def send_head(self):
423 FakeHTTPRequestHandler.headers_received = self.headers
424 self.requests.append(self.path)
425 response_code, headers, body = responses.pop(0)
426
427 self.send_response(response_code)
428
429 for (header, value) in headers:
430 self.send_header(header, value % {'port':self.port})
431 if body:
432 self.send_header("Content-type", "text/plain")
433 self.end_headers()
434 return body
435 self.end_headers()
436
437 def log_message(self, *args):
438 pass
439
440
441 return FakeHTTPRequestHandler
442
443
444 class ESC[4;38;5;81mTestUrlopen(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
445 """Tests urllib.request.urlopen using the network.
446
447 These tests are not exhaustive. Assuming that testing using files does a
448 good job overall of some of the basic interface features. There are no
449 tests exercising the optional 'data' and 'proxies' arguments. No tests
450 for transparent redirection have been written.
451 """
452
453 def setUp(self):
454 super(TestUrlopen, self).setUp()
455
456 # clear _opener global variable
457 self.addCleanup(urllib.request.urlcleanup)
458
459 # Ignore proxies for localhost tests.
460 def restore_environ(old_environ):
461 os.environ.clear()
462 os.environ.update(old_environ)
463 self.addCleanup(restore_environ, os.environ.copy())
464 os.environ['NO_PROXY'] = '*'
465 os.environ['no_proxy'] = '*'
466
467 def urlopen(self, url, data=None, **kwargs):
468 l = []
469 f = urllib.request.urlopen(url, data, **kwargs)
470 try:
471 # Exercise various methods
472 l.extend(f.readlines(200))
473 l.append(f.readline())
474 l.append(f.read(1024))
475 l.append(f.read())
476 finally:
477 f.close()
478 return b"".join(l)
479
480 def stop_server(self):
481 self.server.stop()
482 self.server = None
483
484 def start_server(self, responses=None):
485 if responses is None:
486 responses = [(200, [], b"we don't care")]
487 handler = GetRequestHandler(responses)
488
489 self.server = LoopbackHttpServerThread(handler)
490 self.addCleanup(self.stop_server)
491 self.server.start()
492 self.server.ready.wait()
493 port = self.server.port
494 handler.port = port
495 return handler
496
497 def start_https_server(self, responses=None, **kwargs):
498 if not hasattr(urllib.request, 'HTTPSHandler'):
499 self.skipTest('ssl support required')
500 from test.ssl_servers import make_https_server
501 if responses is None:
502 responses = [(200, [], b"we care a bit")]
503 handler = GetRequestHandler(responses)
504 server = make_https_server(self, handler_class=handler, **kwargs)
505 handler.port = server.port
506 return handler
507
508 def test_redirection(self):
509 expected_response = b"We got here..."
510 responses = [
511 (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
512 ""),
513 (200, [], expected_response)
514 ]
515
516 handler = self.start_server(responses)
517 data = self.urlopen("http://localhost:%s/" % handler.port)
518 self.assertEqual(data, expected_response)
519 self.assertEqual(handler.requests, ["/", "/somewhere_else"])
520
521 def test_chunked(self):
522 expected_response = b"hello world"
523 chunked_start = (
524 b'a\r\n'
525 b'hello worl\r\n'
526 b'1\r\n'
527 b'd\r\n'
528 b'0\r\n'
529 )
530 response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
531 handler = self.start_server(response)
532 data = self.urlopen("http://localhost:%s/" % handler.port)
533 self.assertEqual(data, expected_response)
534
535 def test_404(self):
536 expected_response = b"Bad bad bad..."
537 handler = self.start_server([(404, [], expected_response)])
538
539 try:
540 self.urlopen("http://localhost:%s/weeble" % handler.port)
541 except urllib.error.URLError as f:
542 data = f.read()
543 f.close()
544 else:
545 self.fail("404 should raise URLError")
546
547 self.assertEqual(data, expected_response)
548 self.assertEqual(handler.requests, ["/weeble"])
549
550 def test_200(self):
551 expected_response = b"pycon 2008..."
552 handler = self.start_server([(200, [], expected_response)])
553 data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
554 self.assertEqual(data, expected_response)
555 self.assertEqual(handler.requests, ["/bizarre"])
556
557 def test_200_with_parameters(self):
558 expected_response = b"pycon 2008..."
559 handler = self.start_server([(200, [], expected_response)])
560 data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
561 b"get=with_feeling")
562 self.assertEqual(data, expected_response)
563 self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
564
565 def test_https(self):
566 handler = self.start_https_server()
567 context = ssl.create_default_context(cafile=CERT_localhost)
568 data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
569 self.assertEqual(data, b"we care a bit")
570
571 def test_https_with_cafile(self):
572 handler = self.start_https_server(certfile=CERT_localhost)
573 with warnings_helper.check_warnings(('', DeprecationWarning)):
574 # Good cert
575 data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
576 cafile=CERT_localhost)
577 self.assertEqual(data, b"we care a bit")
578 # Bad cert
579 with self.assertRaises(urllib.error.URLError) as cm:
580 self.urlopen("https://localhost:%s/bizarre" % handler.port,
581 cafile=CERT_fakehostname)
582 # Good cert, but mismatching hostname
583 handler = self.start_https_server(certfile=CERT_fakehostname)
584 with self.assertRaises(urllib.error.URLError) as cm:
585 self.urlopen("https://localhost:%s/bizarre" % handler.port,
586 cafile=CERT_fakehostname)
587
588 def test_https_with_cadefault(self):
589 handler = self.start_https_server(certfile=CERT_localhost)
590 # Self-signed cert should fail verification with system certificate store
591 with warnings_helper.check_warnings(('', DeprecationWarning)):
592 with self.assertRaises(urllib.error.URLError) as cm:
593 self.urlopen("https://localhost:%s/bizarre" % handler.port,
594 cadefault=True)
595
596 def test_https_sni(self):
597 if ssl is None:
598 self.skipTest("ssl module required")
599 if not ssl.HAS_SNI:
600 self.skipTest("SNI support required in OpenSSL")
601 sni_name = None
602 def cb_sni(ssl_sock, server_name, initial_context):
603 nonlocal sni_name
604 sni_name = server_name
605 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
606 context.set_servername_callback(cb_sni)
607 handler = self.start_https_server(context=context, certfile=CERT_localhost)
608 context = ssl.create_default_context(cafile=CERT_localhost)
609 self.urlopen("https://localhost:%s" % handler.port, context=context)
610 self.assertEqual(sni_name, "localhost")
611
612 def test_sending_headers(self):
613 handler = self.start_server()
614 req = urllib.request.Request("http://localhost:%s/" % handler.port,
615 headers={"Range": "bytes=20-39"})
616 with urllib.request.urlopen(req):
617 pass
618 self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
619
620 def test_sending_headers_camel(self):
621 handler = self.start_server()
622 req = urllib.request.Request("http://localhost:%s/" % handler.port,
623 headers={"X-SoMe-hEader": "foobar"})
624 with urllib.request.urlopen(req):
625 pass
626 self.assertIn("X-Some-Header", handler.headers_received.keys())
627 self.assertNotIn("X-SoMe-hEader", handler.headers_received.keys())
628
629 def test_basic(self):
630 handler = self.start_server()
631 with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url:
632 for attr in ("read", "close", "info", "geturl"):
633 self.assertTrue(hasattr(open_url, attr), "object returned from "
634 "urlopen lacks the %s attribute" % attr)
635 self.assertTrue(open_url.read(), "calling 'read' failed")
636
637 def test_info(self):
638 handler = self.start_server()
639 open_url = urllib.request.urlopen(
640 "http://localhost:%s" % handler.port)
641 with open_url:
642 info_obj = open_url.info()
643 self.assertIsInstance(info_obj, email.message.Message,
644 "object returned by 'info' is not an "
645 "instance of email.message.Message")
646 self.assertEqual(info_obj.get_content_subtype(), "plain")
647
648 def test_geturl(self):
649 # Make sure same URL as opened is returned by geturl.
650 handler = self.start_server()
651 open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
652 with open_url:
653 url = open_url.geturl()
654 self.assertEqual(url, "http://localhost:%s" % handler.port)
655
656 def test_iteration(self):
657 expected_response = b"pycon 2008..."
658 handler = self.start_server([(200, [], expected_response)])
659 data = urllib.request.urlopen("http://localhost:%s" % handler.port)
660 for line in data:
661 self.assertEqual(line, expected_response)
662
663 def test_line_iteration(self):
664 lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
665 expected_response = b"".join(lines)
666 handler = self.start_server([(200, [], expected_response)])
667 data = urllib.request.urlopen("http://localhost:%s" % handler.port)
668 for index, line in enumerate(data):
669 self.assertEqual(line, lines[index],
670 "Fetched line number %s doesn't match expected:\n"
671 " Expected length was %s, got %s" %
672 (index, len(lines[index]), len(line)))
673 self.assertEqual(index + 1, len(lines))
674
675 def test_issue16464(self):
676 # See https://bugs.python.org/issue16464
677 # and https://bugs.python.org/issue46648
678 handler = self.start_server([
679 (200, [], b'any'),
680 (200, [], b'any'),
681 ])
682 opener = urllib.request.build_opener()
683 request = urllib.request.Request("http://localhost:%s" % handler.port)
684 self.assertEqual(None, request.data)
685
686 opener.open(request, "1".encode("us-ascii"))
687 self.assertEqual(b"1", request.data)
688 self.assertEqual("1", request.get_header("Content-length"))
689
690 opener.open(request, "1234567890".encode("us-ascii"))
691 self.assertEqual(b"1234567890", request.data)
692 self.assertEqual("10", request.get_header("Content-length"))
693
694 def setUpModule():
695 thread_info = threading_helper.threading_setup()
696 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
697
698
699 if __name__ == "__main__":
700 unittest.main()