(root)/
Python-3.12.0/
Lib/
test/
test_urllib2_localnet.py
       1  import base64
       2  import os
       3  import email
       4  import urllib.parse
       5  import urllib.request
       6  import http.server
       7  import threading
       8  import unittest
       9  import hashlib
      10  
      11  from test import support
      12  from test.support import hashlib_helper
      13  from test.support import threading_helper
      14  from test.support import warnings_helper
      15  
      16  try:
      17      import ssl
      18  except ImportError:
      19      ssl = None
      20  
      21  support.requires_working_socket(module=True)
      22  
      23  here = os.path.dirname(__file__)
      24  # Self-signed cert file for 'localhost'
      25  CERT_localhost = os.path.join(here, 'keycert.pem')
      26  # Self-signed cert file for 'fakehostname'
      27  CERT_fakehostname = os.path.join(here, 'keycert2.pem')
      28  
      29  
      30  # Loopback http server infrastructure
      31  
      32  class ESC[4;38;5;81mLoopbackHttpServer(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mHTTPServer):
      33      """HTTP server w/ a few modifications that make it useful for
      34      loopback testing purposes.
      35      """
      36  
      37      def __init__(self, server_address, RequestHandlerClass):
      38          http.server.HTTPServer.__init__(self,
      39                                          server_address,
      40                                          RequestHandlerClass)
      41  
      42          # Set the timeout of our listening socket really low so
      43          # that we can stop the server easily.
      44          self.socket.settimeout(0.1)
      45  
      46      def get_request(self):
      47          """HTTPServer method, overridden."""
      48  
      49          request, client_address = self.socket.accept()
      50  
      51          # It's a loopback connection, so setting the timeout
      52          # really low shouldn't affect anything, but should make
      53          # deadlocks less likely to occur.
      54          request.settimeout(10.0)
      55  
      56          return (request, client_address)
      57  
      58  class ESC[4;38;5;81mLoopbackHttpServerThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
      59      """Stoppable thread that runs a loopback http server."""
      60  
      61      def __init__(self, request_handler):
      62          threading.Thread.__init__(self)
      63          self._stop_server = False
      64          self.ready = threading.Event()
      65          request_handler.protocol_version = "HTTP/1.0"
      66          self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
      67                                          request_handler)
      68          self.port = self.httpd.server_port
      69  
      70      def stop(self):
      71          """Stops the webserver if it's currently running."""
      72  
      73          self._stop_server = True
      74  
      75          self.join()
      76          self.httpd.server_close()
      77  
      78      def run(self):
      79          self.ready.set()
      80          while not self._stop_server:
      81              self.httpd.handle_request()
      82  
      83  # Authentication infrastructure
      84  
      85  class ESC[4;38;5;81mDigestAuthHandler:
      86      """Handler for performing digest authentication."""
      87  
      88      def __init__(self):
      89          self._request_num = 0
      90          self._nonces = []
      91          self._users = {}
      92          self._realm_name = "Test Realm"
      93          self._qop = "auth"
      94  
      95      def set_qop(self, qop):
      96          self._qop = qop
      97  
      98      def set_users(self, users):
      99          assert isinstance(users, dict)
     100          self._users = users
     101  
     102      def set_realm(self, realm):
     103          self._realm_name = realm
     104  
     105      def _generate_nonce(self):
     106          self._request_num += 1
     107          nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
     108          self._nonces.append(nonce)
     109          return nonce
     110  
     111      def _create_auth_dict(self, auth_str):
     112          first_space_index = auth_str.find(" ")
     113          auth_str = auth_str[first_space_index+1:]
     114  
     115          parts = auth_str.split(",")
     116  
     117          auth_dict = {}
     118          for part in parts:
     119              name, value = part.split("=")
     120              name = name.strip()
     121              if value[0] == '"' and value[-1] == '"':
     122                  value = value[1:-1]
     123              else:
     124                  value = value.strip()
     125              auth_dict[name] = value
     126          return auth_dict
     127  
     128      def _validate_auth(self, auth_dict, password, method, uri):
     129          final_dict = {}
     130          final_dict.update(auth_dict)
     131          final_dict["password"] = password
     132          final_dict["method"] = method
     133          final_dict["uri"] = uri
     134          HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
     135          HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
     136          HA2_str = "%(method)s:%(uri)s" % final_dict
     137          HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
     138          final_dict["HA1"] = HA1
     139          final_dict["HA2"] = HA2
     140          response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
     141                         "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
     142          response = hashlib.md5(response_str.encode("ascii")).hexdigest()
     143  
     144          return response == auth_dict["response"]
     145  
     146      def _return_auth_challenge(self, request_handler):
     147          request_handler.send_response(407, "Proxy Authentication Required")
     148          request_handler.send_header("Content-Type", "text/html")
     149          request_handler.send_header(
     150              'Proxy-Authenticate', 'Digest realm="%s", '
     151              'qop="%s",'
     152              'nonce="%s", ' % \
     153              (self._realm_name, self._qop, self._generate_nonce()))
     154          # XXX: Not sure if we're supposed to add this next header or
     155          # not.
     156          #request_handler.send_header('Connection', 'close')
     157          request_handler.end_headers()
     158          request_handler.wfile.write(b"Proxy Authentication Required.")
     159          return False
     160  
     161      def handle_request(self, request_handler):
     162          """Performs digest authentication on the given HTTP request
     163          handler.  Returns True if authentication was successful, False
     164          otherwise.
     165  
     166          If no users have been set, then digest auth is effectively
     167          disabled and this method will always return True.
     168          """
     169  
     170          if len(self._users) == 0:
     171              return True
     172  
     173          if "Proxy-Authorization" not in request_handler.headers:
     174              return self._return_auth_challenge(request_handler)
     175          else:
     176              auth_dict = self._create_auth_dict(
     177                  request_handler.headers["Proxy-Authorization"]
     178                  )
     179              if auth_dict["username"] in self._users:
     180                  password = self._users[ auth_dict["username"] ]
     181              else:
     182                  return self._return_auth_challenge(request_handler)
     183              if not auth_dict.get("nonce") in self._nonces:
     184                  return self._return_auth_challenge(request_handler)
     185              else:
     186                  self._nonces.remove(auth_dict["nonce"])
     187  
     188              auth_validated = False
     189  
     190              # MSIE uses short_path in its validation, but Python's
     191              # urllib.request uses the full path, so we're going to see if
     192              # either of them works here.
     193  
     194              for path in [request_handler.path, request_handler.short_path]:
     195                  if self._validate_auth(auth_dict,
     196                                         password,
     197                                         request_handler.command,
     198                                         path):
     199                      auth_validated = True
     200  
     201              if not auth_validated:
     202                  return self._return_auth_challenge(request_handler)
     203              return True
     204  
     205  
     206  class ESC[4;38;5;81mBasicAuthHandler(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mBaseHTTPRequestHandler):
     207      """Handler for performing basic authentication."""
     208      # Server side values
     209      USER = 'testUser'
     210      PASSWD = 'testPass'
     211      REALM = 'Test'
     212      USER_PASSWD = "%s:%s" % (USER, PASSWD)
     213      ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')
     214  
     215      def __init__(self, *args, **kwargs):
     216          http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
     217  
     218      def log_message(self, format, *args):
     219          # Suppress console log message
     220          pass
     221  
     222      def do_HEAD(self):
     223          self.send_response(200)
     224          self.send_header("Content-type", "text/html")
     225          self.end_headers()
     226  
     227      def do_AUTHHEAD(self):
     228          self.send_response(401)
     229          self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
     230          self.send_header("Content-type", "text/html")
     231          self.end_headers()
     232  
     233      def do_GET(self):
     234          if not self.headers.get("Authorization", ""):
     235              self.do_AUTHHEAD()
     236              self.wfile.write(b"No Auth header received")
     237          elif self.headers.get(
     238                  "Authorization", "") == "Basic " + self.ENCODED_AUTH:
     239              self.send_response(200)
     240              self.end_headers()
     241              self.wfile.write(b"It works")
     242          else:
     243              # Request Unauthorized
     244              self.do_AUTHHEAD()
     245  
     246  
     247  
     248  # Proxy test infrastructure
     249  
     250  class ESC[4;38;5;81mFakeProxyHandler(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mBaseHTTPRequestHandler):
     251      """This is a 'fake proxy' that makes it look like the entire
     252      internet has gone down due to a sudden zombie invasion.  It main
     253      utility is in providing us with authentication support for
     254      testing.
     255      """
     256  
     257      def __init__(self, digest_auth_handler, *args, **kwargs):
     258          # This has to be set before calling our parent's __init__(), which will
     259          # try to call do_GET().
     260          self.digest_auth_handler = digest_auth_handler
     261          http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
     262  
     263      def log_message(self, format, *args):
     264          # Uncomment the next line for debugging.
     265          # sys.stderr.write(format % args)
     266          pass
     267  
     268      def do_GET(self):
     269          (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse(
     270              self.path, "http")
     271          self.short_path = path
     272          if self.digest_auth_handler.handle_request(self):
     273              self.send_response(200, "OK")
     274              self.send_header("Content-Type", "text/html")
     275              self.end_headers()
     276              self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
     277                                     "ascii"))
     278              self.wfile.write(b"Our apologies, but our server is down due to "
     279                               b"a sudden zombie invasion.")
     280  
     281  # Test cases
     282  
     283  class ESC[4;38;5;81mBasicAuthTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     284      USER = "testUser"
     285      PASSWD = "testPass"
     286      INCORRECT_PASSWD = "Incorrect"
     287      REALM = "Test"
     288  
     289      def setUp(self):
     290          super(BasicAuthTests, self).setUp()
     291          # With Basic Authentication
     292          def http_server_with_basic_auth_handler(*args, **kwargs):
     293              return BasicAuthHandler(*args, **kwargs)
     294          self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
     295          self.addCleanup(self.stop_server)
     296          self.server_url = 'http://127.0.0.1:%s' % self.server.port
     297          self.server.start()
     298          self.server.ready.wait()
     299  
     300      def stop_server(self):
     301          self.server.stop()
     302          self.server = None
     303  
     304      def tearDown(self):
     305          super(BasicAuthTests, self).tearDown()
     306  
     307      def test_basic_auth_success(self):
     308          ah = urllib.request.HTTPBasicAuthHandler()
     309          ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
     310          urllib.request.install_opener(urllib.request.build_opener(ah))
     311          try:
     312              self.assertTrue(urllib.request.urlopen(self.server_url))
     313          except urllib.error.HTTPError:
     314              self.fail("Basic auth failed for the url: %s" % self.server_url)
     315  
     316      def test_basic_auth_httperror(self):
     317          ah = urllib.request.HTTPBasicAuthHandler()
     318          ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
     319          urllib.request.install_opener(urllib.request.build_opener(ah))
     320          self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
     321  
     322  
     323  @hashlib_helper.requires_hashdigest("md5", openssl=True)
     324  class ESC[4;38;5;81mProxyAuthTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     325      URL = "http://localhost"
     326  
     327      USER = "tester"
     328      PASSWD = "test123"
     329      REALM = "TestRealm"
     330  
     331      def setUp(self):
     332          super(ProxyAuthTests, self).setUp()
     333          # Ignore proxy bypass settings in the environment.
     334          def restore_environ(old_environ):
     335              os.environ.clear()
     336              os.environ.update(old_environ)
     337          self.addCleanup(restore_environ, os.environ.copy())
     338          os.environ['NO_PROXY'] = ''
     339          os.environ['no_proxy'] = ''
     340  
     341          self.digest_auth_handler = DigestAuthHandler()
     342          self.digest_auth_handler.set_users({self.USER: self.PASSWD})
     343          self.digest_auth_handler.set_realm(self.REALM)
     344          # With Digest Authentication.
     345          def create_fake_proxy_handler(*args, **kwargs):
     346              return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
     347  
     348          self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
     349          self.addCleanup(self.stop_server)
     350          self.server.start()
     351          self.server.ready.wait()
     352          proxy_url = "http://127.0.0.1:%d" % self.server.port
     353          handler = urllib.request.ProxyHandler({"http" : proxy_url})
     354          self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
     355          self.opener = urllib.request.build_opener(
     356              handler, self.proxy_digest_handler)
     357  
     358      def stop_server(self):
     359          self.server.stop()
     360          self.server = None
     361  
     362      def test_proxy_with_bad_password_raises_httperror(self):
     363          self.proxy_digest_handler.add_password(self.REALM, self.URL,
     364                                                 self.USER, self.PASSWD+"bad")
     365          self.digest_auth_handler.set_qop("auth")
     366          self.assertRaises(urllib.error.HTTPError,
     367                            self.opener.open,
     368                            self.URL)
     369  
     370      def test_proxy_with_no_password_raises_httperror(self):
     371          self.digest_auth_handler.set_qop("auth")
     372          self.assertRaises(urllib.error.HTTPError,
     373                            self.opener.open,
     374                            self.URL)
     375  
     376      def test_proxy_qop_auth_works(self):
     377          self.proxy_digest_handler.add_password(self.REALM, self.URL,
     378                                                 self.USER, self.PASSWD)
     379          self.digest_auth_handler.set_qop("auth")
     380          with self.opener.open(self.URL) as result:
     381              while result.read():
     382                  pass
     383  
     384      def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
     385          self.proxy_digest_handler.add_password(self.REALM, self.URL,
     386                                                 self.USER, self.PASSWD)
     387          self.digest_auth_handler.set_qop("auth-int")
     388          try:
     389              result = self.opener.open(self.URL)
     390          except urllib.error.URLError:
     391              # It's okay if we don't support auth-int, but we certainly
     392              # shouldn't receive any kind of exception here other than
     393              # a URLError.
     394              pass
     395          else:
     396              with result:
     397                  while result.read():
     398                      pass
     399  
     400  
     401  def GetRequestHandler(responses):
     402  
     403      class ESC[4;38;5;81mFakeHTTPRequestHandler(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mBaseHTTPRequestHandler):
     404  
     405          server_version = "TestHTTP/"
     406          requests = []
     407          headers_received = []
     408          port = 80
     409  
     410          def do_GET(self):
     411              body = self.send_head()
     412              while body:
     413                  done = self.wfile.write(body)
     414                  body = body[done:]
     415  
     416          def do_POST(self):
     417              content_length = self.headers["Content-Length"]
     418              post_data = self.rfile.read(int(content_length))
     419              self.do_GET()
     420              self.requests.append(post_data)
     421  
     422          def send_head(self):
     423              FakeHTTPRequestHandler.headers_received = self.headers
     424              self.requests.append(self.path)
     425              response_code, headers, body = responses.pop(0)
     426  
     427              self.send_response(response_code)
     428  
     429              for (header, value) in headers:
     430                  self.send_header(header, value % {'port':self.port})
     431              if body:
     432                  self.send_header("Content-type", "text/plain")
     433                  self.end_headers()
     434                  return body
     435              self.end_headers()
     436  
     437          def log_message(self, *args):
     438              pass
     439  
     440  
     441      return FakeHTTPRequestHandler
     442  
     443  
     444  class ESC[4;38;5;81mTestUrlopen(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     445      """Tests urllib.request.urlopen using the network.
     446  
     447      These tests are not exhaustive.  Assuming that testing using files does a
     448      good job overall of some of the basic interface features.  There are no
     449      tests exercising the optional 'data' and 'proxies' arguments.  No tests
     450      for transparent redirection have been written.
     451      """
     452  
     453      def setUp(self):
     454          super(TestUrlopen, self).setUp()
     455  
     456          # clear _opener global variable
     457          self.addCleanup(urllib.request.urlcleanup)
     458  
     459          # Ignore proxies for localhost tests.
     460          def restore_environ(old_environ):
     461              os.environ.clear()
     462              os.environ.update(old_environ)
     463          self.addCleanup(restore_environ, os.environ.copy())
     464          os.environ['NO_PROXY'] = '*'
     465          os.environ['no_proxy'] = '*'
     466  
     467      def urlopen(self, url, data=None, **kwargs):
     468          l = []
     469          f = urllib.request.urlopen(url, data, **kwargs)
     470          try:
     471              # Exercise various methods
     472              l.extend(f.readlines(200))
     473              l.append(f.readline())
     474              l.append(f.read(1024))
     475              l.append(f.read())
     476          finally:
     477              f.close()
     478          return b"".join(l)
     479  
     480      def stop_server(self):
     481          self.server.stop()
     482          self.server = None
     483  
     484      def start_server(self, responses=None):
     485          if responses is None:
     486              responses = [(200, [], b"we don't care")]
     487          handler = GetRequestHandler(responses)
     488  
     489          self.server = LoopbackHttpServerThread(handler)
     490          self.addCleanup(self.stop_server)
     491          self.server.start()
     492          self.server.ready.wait()
     493          port = self.server.port
     494          handler.port = port
     495          return handler
     496  
     497      def start_https_server(self, responses=None, **kwargs):
     498          if not hasattr(urllib.request, 'HTTPSHandler'):
     499              self.skipTest('ssl support required')
     500          from test.ssl_servers import make_https_server
     501          if responses is None:
     502              responses = [(200, [], b"we care a bit")]
     503          handler = GetRequestHandler(responses)
     504          server = make_https_server(self, handler_class=handler, **kwargs)
     505          handler.port = server.port
     506          return handler
     507  
     508      def test_redirection(self):
     509          expected_response = b"We got here..."
     510          responses = [
     511              (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
     512               ""),
     513              (200, [], expected_response)
     514          ]
     515  
     516          handler = self.start_server(responses)
     517          data = self.urlopen("http://localhost:%s/" % handler.port)
     518          self.assertEqual(data, expected_response)
     519          self.assertEqual(handler.requests, ["/", "/somewhere_else"])
     520  
     521      def test_chunked(self):
     522          expected_response = b"hello world"
     523          chunked_start = (
     524                          b'a\r\n'
     525                          b'hello worl\r\n'
     526                          b'1\r\n'
     527                          b'd\r\n'
     528                          b'0\r\n'
     529                          )
     530          response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
     531          handler = self.start_server(response)
     532          data = self.urlopen("http://localhost:%s/" % handler.port)
     533          self.assertEqual(data, expected_response)
     534  
     535      def test_404(self):
     536          expected_response = b"Bad bad bad..."
     537          handler = self.start_server([(404, [], expected_response)])
     538  
     539          try:
     540              self.urlopen("http://localhost:%s/weeble" % handler.port)
     541          except urllib.error.URLError as f:
     542              data = f.read()
     543              f.close()
     544          else:
     545              self.fail("404 should raise URLError")
     546  
     547          self.assertEqual(data, expected_response)
     548          self.assertEqual(handler.requests, ["/weeble"])
     549  
     550      def test_200(self):
     551          expected_response = b"pycon 2008..."
     552          handler = self.start_server([(200, [], expected_response)])
     553          data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
     554          self.assertEqual(data, expected_response)
     555          self.assertEqual(handler.requests, ["/bizarre"])
     556  
     557      def test_200_with_parameters(self):
     558          expected_response = b"pycon 2008..."
     559          handler = self.start_server([(200, [], expected_response)])
     560          data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
     561                               b"get=with_feeling")
     562          self.assertEqual(data, expected_response)
     563          self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
     564  
     565      def test_https(self):
     566          handler = self.start_https_server()
     567          context = ssl.create_default_context(cafile=CERT_localhost)
     568          data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
     569          self.assertEqual(data, b"we care a bit")
     570  
     571      def test_https_with_cafile(self):
     572          handler = self.start_https_server(certfile=CERT_localhost)
     573          with warnings_helper.check_warnings(('', DeprecationWarning)):
     574              # Good cert
     575              data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
     576                                  cafile=CERT_localhost)
     577              self.assertEqual(data, b"we care a bit")
     578              # Bad cert
     579              with self.assertRaises(urllib.error.URLError) as cm:
     580                  self.urlopen("https://localhost:%s/bizarre" % handler.port,
     581                               cafile=CERT_fakehostname)
     582              # Good cert, but mismatching hostname
     583              handler = self.start_https_server(certfile=CERT_fakehostname)
     584              with self.assertRaises(urllib.error.URLError) as cm:
     585                  self.urlopen("https://localhost:%s/bizarre" % handler.port,
     586                               cafile=CERT_fakehostname)
     587  
     588      def test_https_with_cadefault(self):
     589          handler = self.start_https_server(certfile=CERT_localhost)
     590          # Self-signed cert should fail verification with system certificate store
     591          with warnings_helper.check_warnings(('', DeprecationWarning)):
     592              with self.assertRaises(urllib.error.URLError) as cm:
     593                  self.urlopen("https://localhost:%s/bizarre" % handler.port,
     594                               cadefault=True)
     595  
     596      def test_https_sni(self):
     597          if ssl is None:
     598              self.skipTest("ssl module required")
     599          if not ssl.HAS_SNI:
     600              self.skipTest("SNI support required in OpenSSL")
     601          sni_name = None
     602          def cb_sni(ssl_sock, server_name, initial_context):
     603              nonlocal sni_name
     604              sni_name = server_name
     605          context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
     606          context.set_servername_callback(cb_sni)
     607          handler = self.start_https_server(context=context, certfile=CERT_localhost)
     608          context = ssl.create_default_context(cafile=CERT_localhost)
     609          self.urlopen("https://localhost:%s" % handler.port, context=context)
     610          self.assertEqual(sni_name, "localhost")
     611  
     612      def test_sending_headers(self):
     613          handler = self.start_server()
     614          req = urllib.request.Request("http://localhost:%s/" % handler.port,
     615                                       headers={"Range": "bytes=20-39"})
     616          with urllib.request.urlopen(req):
     617              pass
     618          self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
     619  
     620      def test_sending_headers_camel(self):
     621          handler = self.start_server()
     622          req = urllib.request.Request("http://localhost:%s/" % handler.port,
     623                                       headers={"X-SoMe-hEader": "foobar"})
     624          with urllib.request.urlopen(req):
     625              pass
     626          self.assertIn("X-Some-Header", handler.headers_received.keys())
     627          self.assertNotIn("X-SoMe-hEader", handler.headers_received.keys())
     628  
     629      def test_basic(self):
     630          handler = self.start_server()
     631          with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url:
     632              for attr in ("read", "close", "info", "geturl"):
     633                  self.assertTrue(hasattr(open_url, attr), "object returned from "
     634                               "urlopen lacks the %s attribute" % attr)
     635              self.assertTrue(open_url.read(), "calling 'read' failed")
     636  
     637      def test_info(self):
     638          handler = self.start_server()
     639          open_url = urllib.request.urlopen(
     640              "http://localhost:%s" % handler.port)
     641          with open_url:
     642              info_obj = open_url.info()
     643          self.assertIsInstance(info_obj, email.message.Message,
     644                                "object returned by 'info' is not an "
     645                                "instance of email.message.Message")
     646          self.assertEqual(info_obj.get_content_subtype(), "plain")
     647  
     648      def test_geturl(self):
     649          # Make sure same URL as opened is returned by geturl.
     650          handler = self.start_server()
     651          open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
     652          with open_url:
     653              url = open_url.geturl()
     654          self.assertEqual(url, "http://localhost:%s" % handler.port)
     655  
     656      def test_iteration(self):
     657          expected_response = b"pycon 2008..."
     658          handler = self.start_server([(200, [], expected_response)])
     659          data = urllib.request.urlopen("http://localhost:%s" % handler.port)
     660          for line in data:
     661              self.assertEqual(line, expected_response)
     662  
     663      def test_line_iteration(self):
     664          lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
     665          expected_response = b"".join(lines)
     666          handler = self.start_server([(200, [], expected_response)])
     667          data = urllib.request.urlopen("http://localhost:%s" % handler.port)
     668          for index, line in enumerate(data):
     669              self.assertEqual(line, lines[index],
     670                               "Fetched line number %s doesn't match expected:\n"
     671                               "    Expected length was %s, got %s" %
     672                               (index, len(lines[index]), len(line)))
     673          self.assertEqual(index + 1, len(lines))
     674  
     675      def test_issue16464(self):
     676          # See https://bugs.python.org/issue16464
     677          # and https://bugs.python.org/issue46648
     678          handler = self.start_server([
     679              (200, [], b'any'),
     680              (200, [], b'any'),
     681          ])
     682          opener = urllib.request.build_opener()
     683          request = urllib.request.Request("http://localhost:%s" % handler.port)
     684          self.assertEqual(None, request.data)
     685  
     686          opener.open(request, "1".encode("us-ascii"))
     687          self.assertEqual(b"1", request.data)
     688          self.assertEqual("1", request.get_header("Content-length"))
     689  
     690          opener.open(request, "1234567890".encode("us-ascii"))
     691          self.assertEqual(b"1234567890", request.data)
     692          self.assertEqual("10", request.get_header("Content-length"))
     693  
     694  def setUpModule():
     695      thread_info = threading_helper.threading_setup()
     696      unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
     697  
     698  
     699  if __name__ == "__main__":
     700      unittest.main()