(root)/
Python-3.11.7/
Lib/
test/
test_urllib2.py
       1  import unittest
       2  from test import support
       3  from test.support import os_helper
       4  from test.support import socket_helper
       5  from test.support import warnings_helper
       6  from test import test_urllib
       7  
       8  import os
       9  import io
      10  import socket
      11  import array
      12  import sys
      13  import tempfile
      14  import subprocess
      15  
      16  import urllib.request
      17  # The proxy bypass method imported below has logic specific to the OSX
      18  # proxy config data structure but is testable on all platforms.
      19  from urllib.request import (Request, OpenerDirector, HTTPBasicAuthHandler,
      20                              HTTPPasswordMgrWithPriorAuth, _parse_proxy,
      21                              _proxy_bypass_macosx_sysconf,
      22                              AbstractDigestAuthHandler)
      23  from urllib.parse import urlparse
      24  import urllib.error
      25  import http.client
      26  
# Module-level gate evaluated at import time, before any test class is defined.
# NOTE(review): presumably skips the whole module on platforms without a
# working socket implementation -- confirm against test.support.
support.requires_working_socket(module=True)
      28  
      29  # XXX
      30  # Request
      31  # CacheFTPHandler (hard to write)
      32  # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
      33  
      34  
      35  class ESC[4;38;5;81mTrivialTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      36  
      37      def test___all__(self):
      38          # Verify which names are exposed
      39          for module in 'request', 'response', 'parse', 'error', 'robotparser':
      40              context = {}
      41              exec('from urllib.%s import *' % module, context)
      42              del context['__builtins__']
      43              if module == 'request' and os.name == 'nt':
      44                  u, p = context.pop('url2pathname'), context.pop('pathname2url')
      45                  self.assertEqual(u.__module__, 'nturl2path')
      46                  self.assertEqual(p.__module__, 'nturl2path')
      47              for k, v in context.items():
      48                  self.assertEqual(v.__module__, 'urllib.%s' % module,
      49                      "%r is exposed in 'urllib.%s' but defined in %r" %
      50                      (k, module, v.__module__))
      51  
      52      def test_trivial(self):
      53          # A couple trivial tests
      54  
      55          # clear _opener global variable
      56          self.addCleanup(urllib.request.urlcleanup)
      57  
      58          self.assertRaises(ValueError, urllib.request.urlopen, 'bogus url')
      59  
      60          # XXX Name hacking to get this to work on Windows.
      61          fname = os.path.abspath(urllib.request.__file__).replace(os.sep, '/')
      62  
      63          if os.name == 'nt':
      64              file_url = "file:///%s" % fname
      65          else:
      66              file_url = "file://%s" % fname
      67  
      68          with urllib.request.urlopen(file_url) as f:
      69              f.read()
      70  
      71      def test_parse_http_list(self):
      72          tests = [
      73              ('a,b,c', ['a', 'b', 'c']),
      74              ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
      75              ('a, b, "c", "d", "e,f", g, h',
      76               ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
      77              ('a="b\\"c", d="e\\,f", g="h\\\\i"',
      78               ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
      79          for string, list in tests:
      80              self.assertEqual(urllib.request.parse_http_list(string), list)
      81  
      82      def test_URLError_reasonstr(self):
      83          err = urllib.error.URLError('reason')
      84          self.assertIn(err.reason, str(err))
      85  
      86  
      87  class ESC[4;38;5;81mRequestHdrsTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      88  
      89      def test_request_headers_dict(self):
      90          """
      91          The Request.headers dictionary is not a documented interface.  It
      92          should stay that way, because the complete set of headers are only
      93          accessible through the .get_header(), .has_header(), .header_items()
      94          interface.  However, .headers pre-dates those methods, and so real code
      95          will be using the dictionary.
      96  
      97          The introduction in 2.4 of those methods was a mistake for the same
      98          reason: code that previously saw all (urllib2 user)-provided headers in
      99          .headers now sees only a subset.
     100  
     101          """
     102          url = "http://example.com"
     103          self.assertEqual(Request(url,
     104                                   headers={"Spam-eggs": "blah"}
     105                                   ).headers["Spam-eggs"], "blah")
     106          self.assertEqual(Request(url,
     107                                   headers={"spam-EggS": "blah"}
     108                                   ).headers["Spam-eggs"], "blah")
     109  
     110      def test_request_headers_methods(self):
     111          """
     112          Note the case normalization of header names here, to
     113          .capitalize()-case.  This should be preserved for
     114          backwards-compatibility.  (In the HTTP case, normalization to
     115          .title()-case is done by urllib2 before sending headers to
     116          http.client).
     117  
     118          Note that e.g. r.has_header("spam-EggS") is currently False, and
     119          r.get_header("spam-EggS") returns None, but that could be changed in
     120          future.
     121  
     122          Method r.remove_header should remove items both from r.headers and
     123          r.unredirected_hdrs dictionaries
     124          """
     125          url = "http://example.com"
     126          req = Request(url, headers={"Spam-eggs": "blah"})
     127          self.assertTrue(req.has_header("Spam-eggs"))
     128          self.assertEqual(req.header_items(), [('Spam-eggs', 'blah')])
     129  
     130          req.add_header("Foo-Bar", "baz")
     131          self.assertEqual(sorted(req.header_items()),
     132                           [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')])
     133          self.assertFalse(req.has_header("Not-there"))
     134          self.assertIsNone(req.get_header("Not-there"))
     135          self.assertEqual(req.get_header("Not-there", "default"), "default")
     136  
     137          req.remove_header("Spam-eggs")
     138          self.assertFalse(req.has_header("Spam-eggs"))
     139  
     140          req.add_unredirected_header("Unredirected-spam", "Eggs")
     141          self.assertTrue(req.has_header("Unredirected-spam"))
     142  
     143          req.remove_header("Unredirected-spam")
     144          self.assertFalse(req.has_header("Unredirected-spam"))
     145  
     146      def test_password_manager(self):
     147          mgr = urllib.request.HTTPPasswordMgr()
     148          add = mgr.add_password
     149          find_user_pass = mgr.find_user_password
     150  
     151          add("Some Realm", "http://example.com/", "joe", "password")
     152          add("Some Realm", "http://example.com/ni", "ni", "ni")
     153          add("Some Realm", "http://c.example.com:3128", "3", "c")
     154          add("Some Realm", "d.example.com", "4", "d")
     155          add("Some Realm", "e.example.com:3128", "5", "e")
     156  
     157          # For the same realm, password set the highest path is the winner.
     158          self.assertEqual(find_user_pass("Some Realm", "example.com"),
     159                           ('joe', 'password'))
     160          self.assertEqual(find_user_pass("Some Realm", "http://example.com/ni"),
     161                           ('joe', 'password'))
     162          self.assertEqual(find_user_pass("Some Realm", "http://example.com"),
     163                           ('joe', 'password'))
     164          self.assertEqual(find_user_pass("Some Realm", "http://example.com/"),
     165                           ('joe', 'password'))
     166          self.assertEqual(find_user_pass("Some Realm",
     167                                          "http://example.com/spam"),
     168                           ('joe', 'password'))
     169          self.assertEqual(find_user_pass("Some Realm",
     170                                          "http://example.com/spam/spam"),
     171                           ('joe', 'password'))
     172  
     173          # You can have different passwords for different paths.
     174  
     175          add("c", "http://example.com/foo", "foo", "ni")
     176          add("c", "http://example.com/bar", "bar", "nini")
     177          add("c", "http://example.com/foo/bar", "foobar", "nibar")
     178  
     179          self.assertEqual(find_user_pass("c", "http://example.com/foo"),
     180                           ('foo', 'ni'))
     181          self.assertEqual(find_user_pass("c", "http://example.com/bar"),
     182                           ('bar', 'nini'))
     183          self.assertEqual(find_user_pass("c", "http://example.com/foo/"),
     184                           ('foo', 'ni'))
     185          self.assertEqual(find_user_pass("c", "http://example.com/foo/bar"),
     186                           ('foo', 'ni'))
     187          self.assertEqual(find_user_pass("c", "http://example.com/foo/baz"),
     188                           ('foo', 'ni'))
     189          self.assertEqual(find_user_pass("c", "http://example.com/foobar"),
     190                           (None, None))
     191  
     192          add("c", "http://example.com/baz/", "baz", "ninini")
     193  
     194          self.assertEqual(find_user_pass("c", "http://example.com/baz"),
     195                           (None, None))
     196          self.assertEqual(find_user_pass("c", "http://example.com/baz/"),
     197                           ('baz', 'ninini'))
     198          self.assertEqual(find_user_pass("c", "http://example.com/baz/bar"),
     199                           ('baz', 'ninini'))
     200  
     201          # For the same path, newer password should be considered.
     202  
     203          add("b", "http://example.com/", "first", "blah")
     204          add("b", "http://example.com/", "second", "spam")
     205  
     206          self.assertEqual(find_user_pass("b", "http://example.com/"),
     207                           ('second', 'spam'))
     208  
     209          # No special relationship between a.example.com and example.com:
     210  
     211          add("a", "http://example.com", "1", "a")
     212          self.assertEqual(find_user_pass("a", "http://example.com/"),
     213                           ('1', 'a'))
     214  
     215          self.assertEqual(find_user_pass("a", "http://a.example.com/"),
     216                           (None, None))
     217  
     218          # Ports:
     219  
     220          self.assertEqual(find_user_pass("Some Realm", "c.example.com"),
     221                           (None, None))
     222          self.assertEqual(find_user_pass("Some Realm", "c.example.com:3128"),
     223                           ('3', 'c'))
     224          self.assertEqual(
     225              find_user_pass("Some Realm", "http://c.example.com:3128"),
     226              ('3', 'c'))
     227          self.assertEqual(find_user_pass("Some Realm", "d.example.com"),
     228                           ('4', 'd'))
     229          self.assertEqual(find_user_pass("Some Realm", "e.example.com:3128"),
     230                           ('5', 'e'))
     231  
     232      def test_password_manager_default_port(self):
     233          """
     234          The point to note here is that we can't guess the default port if
     235          there's no scheme.  This applies to both add_password and
     236          find_user_password.
     237          """
     238          mgr = urllib.request.HTTPPasswordMgr()
     239          add = mgr.add_password
     240          find_user_pass = mgr.find_user_password
     241          add("f", "http://g.example.com:80", "10", "j")
     242          add("g", "http://h.example.com", "11", "k")
     243          add("h", "i.example.com:80", "12", "l")
     244          add("i", "j.example.com", "13", "m")
     245          self.assertEqual(find_user_pass("f", "g.example.com:100"),
     246                           (None, None))
     247          self.assertEqual(find_user_pass("f", "g.example.com:80"),
     248                           ('10', 'j'))
     249          self.assertEqual(find_user_pass("f", "g.example.com"),
     250                           (None, None))
     251          self.assertEqual(find_user_pass("f", "http://g.example.com:100"),
     252                           (None, None))
     253          self.assertEqual(find_user_pass("f", "http://g.example.com:80"),
     254                           ('10', 'j'))
     255          self.assertEqual(find_user_pass("f", "http://g.example.com"),
     256                           ('10', 'j'))
     257          self.assertEqual(find_user_pass("g", "h.example.com"), ('11', 'k'))
     258          self.assertEqual(find_user_pass("g", "h.example.com:80"), ('11', 'k'))
     259          self.assertEqual(find_user_pass("g", "http://h.example.com:80"),
     260                           ('11', 'k'))
     261          self.assertEqual(find_user_pass("h", "i.example.com"), (None, None))
     262          self.assertEqual(find_user_pass("h", "i.example.com:80"), ('12', 'l'))
     263          self.assertEqual(find_user_pass("h", "http://i.example.com:80"),
     264                           ('12', 'l'))
     265          self.assertEqual(find_user_pass("i", "j.example.com"), ('13', 'm'))
     266          self.assertEqual(find_user_pass("i", "j.example.com:80"),
     267                           (None, None))
     268          self.assertEqual(find_user_pass("i", "http://j.example.com"),
     269                           ('13', 'm'))
     270          self.assertEqual(find_user_pass("i", "http://j.example.com:80"),
     271                           (None, None))
     272  
     273  
     274  class ESC[4;38;5;81mMockOpener:
     275      addheaders = []
     276  
     277      def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
     278          self.req, self.data, self.timeout = req, data, timeout
     279  
     280      def error(self, proto, *args):
     281          self.proto, self.args = proto, args
     282  
     283  
     284  class ESC[4;38;5;81mMockFile:
     285      def read(self, count=None):
     286          pass
     287  
     288      def readline(self, count=None):
     289          pass
     290  
     291      def close(self):
     292          pass
     293  
     294  
     295  class ESC[4;38;5;81mMockHeaders(ESC[4;38;5;149mdict):
     296      def getheaders(self, name):
     297          return list(self.values())
     298  
     299  
     300  class ESC[4;38;5;81mMockResponse(ESC[4;38;5;149mioESC[4;38;5;149m.ESC[4;38;5;149mStringIO):
     301      def __init__(self, code, msg, headers, data, url=None):
     302          io.StringIO.__init__(self, data)
     303          self.code, self.msg, self.headers, self.url = code, msg, headers, url
     304  
     305      def info(self):
     306          return self.headers
     307  
     308      def geturl(self):
     309          return self.url
     310  
     311  
     312  class ESC[4;38;5;81mMockCookieJar:
     313      def add_cookie_header(self, request):
     314          self.ach_req = request
     315  
     316      def extract_cookies(self, response, request):
     317          self.ec_req, self.ec_r = request, response
     318  
     319  
     320  class ESC[4;38;5;81mFakeMethod:
     321      def __init__(self, meth_name, action, handle):
     322          self.meth_name = meth_name
     323          self.handle = handle
     324          self.action = action
     325  
     326      def __call__(self, *args):
     327          return self.handle(self.meth_name, self.action, *args)
     328  
     329  
     330  class ESC[4;38;5;81mMockHTTPResponse(ESC[4;38;5;149mioESC[4;38;5;149m.ESC[4;38;5;149mIOBase):
     331      def __init__(self, fp, msg, status, reason):
     332          self.fp = fp
     333          self.msg = msg
     334          self.status = status
     335          self.reason = reason
     336          self.code = 200
     337  
     338      def read(self):
     339          return ''
     340  
     341      def info(self):
     342          return {}
     343  
     344      def geturl(self):
     345          return self.url
     346  
     347  
     348  class ESC[4;38;5;81mMockHTTPClass:
     349      def __init__(self):
     350          self.level = 0
     351          self.req_headers = []
     352          self.data = None
     353          self.raise_on_endheaders = False
     354          self.sock = None
     355          self._tunnel_headers = {}
     356  
     357      def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
     358          self.host = host
     359          self.timeout = timeout
     360          return self
     361  
     362      def set_debuglevel(self, level):
     363          self.level = level
     364  
     365      def set_tunnel(self, host, port=None, headers=None):
     366          self._tunnel_host = host
     367          self._tunnel_port = port
     368          if headers:
     369              self._tunnel_headers = headers
     370          else:
     371              self._tunnel_headers.clear()
     372  
     373      def request(self, method, url, body=None, headers=None, *,
     374                  encode_chunked=False):
     375          self.method = method
     376          self.selector = url
     377          if headers is not None:
     378              self.req_headers += headers.items()
     379          self.req_headers.sort()
     380          if body:
     381              self.data = body
     382          self.encode_chunked = encode_chunked
     383          if self.raise_on_endheaders:
     384              raise OSError()
     385  
     386      def getresponse(self):
     387          return MockHTTPResponse(MockFile(), {}, 200, "OK")
     388  
     389      def close(self):
     390          pass
     391  
     392  
     393  class ESC[4;38;5;81mMockHandler:
     394      # useful for testing handler machinery
     395      # see add_ordered_mock_handlers() docstring
     396      handler_order = 500
     397  
     398      def __init__(self, methods):
     399          self._define_methods(methods)
     400  
     401      def _define_methods(self, methods):
     402          for spec in methods:
     403              if len(spec) == 2:
     404                  name, action = spec
     405              else:
     406                  name, action = spec, None
     407              meth = FakeMethod(name, action, self.handle)
     408              setattr(self.__class__, name, meth)
     409  
     410      def handle(self, fn_name, action, *args, **kwds):
     411          self.parent.calls.append((self, fn_name, args, kwds))
     412          if action is None:
     413              return None
     414          elif action == "return self":
     415              return self
     416          elif action == "return response":
     417              res = MockResponse(200, "OK", {}, "")
     418              return res
     419          elif action == "return request":
     420              return Request("http://blah/")
     421          elif action.startswith("error"):
     422              code = action[action.rfind(" ")+1:]
     423              try:
     424                  code = int(code)
     425              except ValueError:
     426                  pass
     427              res = MockResponse(200, "OK", {}, "")
     428              return self.parent.error("http", args[0], res, code, "", {})
     429          elif action == "raise":
     430              raise urllib.error.URLError("blah")
     431          assert False
     432  
     433      def close(self):
     434          pass
     435  
     436      def add_parent(self, parent):
     437          self.parent = parent
     438          self.parent.calls = []
     439  
     440      def __lt__(self, other):
     441          if not hasattr(other, "handler_order"):
     442              # No handler_order, leave in original order.  Yuck.
     443              return True
     444          return self.handler_order < other.handler_order
     445  
     446  
     447  def add_ordered_mock_handlers(opener, meth_spec):
     448      """Create MockHandlers and add them to an OpenerDirector.
     449  
     450      meth_spec: list of lists of tuples and strings defining methods to define
     451      on handlers.  eg:
     452  
     453      [["http_error", "ftp_open"], ["http_open"]]
     454  
     455      defines methods .http_error() and .ftp_open() on one handler, and
     456      .http_open() on another.  These methods just record their arguments and
     457      return None.  Using a tuple instead of a string causes the method to
     458      perform some action (see MockHandler.handle()), eg:
     459  
     460      [["http_error"], [("http_open", "return request")]]
     461  
     462      defines .http_error() on one handler (which simply returns None), and
     463      .http_open() on another handler, which returns a Request object.
     464  
     465      """
     466      handlers = []
     467      count = 0
     468      for meths in meth_spec:
     469          class ESC[4;38;5;81mMockHandlerSubclass(ESC[4;38;5;149mMockHandler):
     470              pass
     471  
     472          h = MockHandlerSubclass(meths)
     473          h.handler_order += count
     474          h.add_parent(opener)
     475          count = count + 1
     476          handlers.append(h)
     477          opener.add_handler(h)
     478      return handlers
     479  
     480  
     481  def build_test_opener(*handler_instances):
     482      opener = OpenerDirector()
     483      for h in handler_instances:
     484          opener.add_handler(h)
     485      return opener
     486  
     487  
     488  class ESC[4;38;5;81mMockHTTPHandler(ESC[4;38;5;149murllibESC[4;38;5;149m.ESC[4;38;5;149mrequestESC[4;38;5;149m.ESC[4;38;5;149mBaseHandler):
     489      # useful for testing redirections and auth
     490      # sends supplied headers and code as first response
     491      # sends 200 OK as second response
     492      def __init__(self, code, headers):
     493          self.code = code
     494          self.headers = headers
     495          self.reset()
     496  
     497      def reset(self):
     498          self._count = 0
     499          self.requests = []
     500  
     501      def http_open(self, req):
     502          import email, copy
     503          self.requests.append(copy.deepcopy(req))
     504          if self._count == 0:
     505              self._count = self._count + 1
     506              name = http.client.responses[self.code]
     507              msg = email.message_from_string(self.headers)
     508              return self.parent.error(
     509                  "http", req, MockFile(), self.code, name, msg)
     510          else:
     511              self.req = req
     512              msg = email.message_from_string("\r\n\r\n")
     513              return MockResponse(200, "OK", msg, "", req.get_full_url())
     514  
     515  
     516  class ESC[4;38;5;81mMockHTTPSHandler(ESC[4;38;5;149murllibESC[4;38;5;149m.ESC[4;38;5;149mrequestESC[4;38;5;149m.ESC[4;38;5;149mAbstractHTTPHandler):
     517      # Useful for testing the Proxy-Authorization request by verifying the
     518      # properties of httpcon
     519  
     520      def __init__(self, debuglevel=0):
     521          urllib.request.AbstractHTTPHandler.__init__(self, debuglevel=debuglevel)
     522          self.httpconn = MockHTTPClass()
     523  
     524      def https_open(self, req):
     525          return self.do_open(self.httpconn, req)
     526  
     527  
     528  class ESC[4;38;5;81mMockHTTPHandlerCheckAuth(ESC[4;38;5;149murllibESC[4;38;5;149m.ESC[4;38;5;149mrequestESC[4;38;5;149m.ESC[4;38;5;149mBaseHandler):
     529      # useful for testing auth
     530      # sends supplied code response
     531      # checks if auth header is specified in request
     532      def __init__(self, code):
     533          self.code = code
     534          self.has_auth_header = False
     535  
     536      def reset(self):
     537          self.has_auth_header = False
     538  
     539      def http_open(self, req):
     540          if req.has_header('Authorization'):
     541              self.has_auth_header = True
     542          name = http.client.responses[self.code]
     543          return MockResponse(self.code, name, MockFile(), "", req.get_full_url())
     544  
     545  
     546  
     547  class ESC[4;38;5;81mMockPasswordManager:
     548      def add_password(self, realm, uri, user, password):
     549          self.realm = realm
     550          self.url = uri
     551          self.user = user
     552          self.password = password
     553  
     554      def find_user_password(self, realm, authuri):
     555          self.target_realm = realm
     556          self.target_url = authuri
     557          return self.user, self.password
     558  
     559  
     560  class ESC[4;38;5;81mOpenerDirectorTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     561  
     562      def test_add_non_handler(self):
     563          class ESC[4;38;5;81mNonHandler(ESC[4;38;5;149mobject):
     564              pass
     565          self.assertRaises(TypeError,
     566                            OpenerDirector().add_handler, NonHandler())
     567  
     568      def test_badly_named_methods(self):
     569          # test work-around for three methods that accidentally follow the
     570          # naming conventions for handler methods
     571          # (*_open() / *_request() / *_response())
     572  
     573          # These used to call the accidentally-named methods, causing a
     574          # TypeError in real code; here, returning self from these mock
     575          # methods would either cause no exception, or AttributeError.
     576  
     577          from urllib.error import URLError
     578  
     579          o = OpenerDirector()
     580          meth_spec = [
     581              [("do_open", "return self"), ("proxy_open", "return self")],
     582              [("redirect_request", "return self")],
     583              ]
     584          add_ordered_mock_handlers(o, meth_spec)
     585          o.add_handler(urllib.request.UnknownHandler())
     586          for scheme in "do", "proxy", "redirect":
     587              self.assertRaises(URLError, o.open, scheme+"://example.com/")
     588  
     589      def test_handled(self):
     590          # handler returning non-None means no more handlers will be called
     591          o = OpenerDirector()
     592          meth_spec = [
     593              ["http_open", "ftp_open", "http_error_302"],
     594              ["ftp_open"],
     595              [("http_open", "return self")],
     596              [("http_open", "return self")],
     597              ]
     598          handlers = add_ordered_mock_handlers(o, meth_spec)
     599  
     600          req = Request("http://example.com/")
     601          r = o.open(req)
     602          # Second .http_open() gets called, third doesn't, since second returned
     603          # non-None.  Handlers without .http_open() never get any methods called
     604          # on them.
     605          # In fact, second mock handler defining .http_open() returns self
     606          # (instead of response), which becomes the OpenerDirector's return
     607          # value.
     608          self.assertEqual(r, handlers[2])
     609          calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
     610          for expected, got in zip(calls, o.calls):
     611              handler, name, args, kwds = got
     612              self.assertEqual((handler, name), expected)
     613              self.assertEqual(args, (req,))
     614  
     615      def test_handler_order(self):
     616          o = OpenerDirector()
     617          handlers = []
     618          for meths, handler_order in [([("http_open", "return self")], 500),
     619                                       (["http_open"], 0)]:
     620              class ESC[4;38;5;81mMockHandlerSubclass(ESC[4;38;5;149mMockHandler):
     621                  pass
     622  
     623              h = MockHandlerSubclass(meths)
     624              h.handler_order = handler_order
     625              handlers.append(h)
     626              o.add_handler(h)
     627  
     628          o.open("http://example.com/")
     629          # handlers called in reverse order, thanks to their sort order
     630          self.assertEqual(o.calls[0][0], handlers[1])
     631          self.assertEqual(o.calls[1][0], handlers[0])
     632  
     633      def test_raise(self):
     634          # raising URLError stops processing of request
     635          o = OpenerDirector()
     636          meth_spec = [
     637              [("http_open", "raise")],
     638              [("http_open", "return self")],
     639              ]
     640          handlers = add_ordered_mock_handlers(o, meth_spec)
     641  
     642          req = Request("http://example.com/")
     643          self.assertRaises(urllib.error.URLError, o.open, req)
     644          self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])
     645  
     646      def test_http_error(self):
     647          # XXX http_error_default
     648          # http errors are a special case
     649          o = OpenerDirector()
     650          meth_spec = [
     651              [("http_open", "error 302")],
     652              [("http_error_400", "raise"), "http_open"],
     653              [("http_error_302", "return response"), "http_error_303",
     654               "http_error"],
     655              [("http_error_302")],
     656              ]
     657          handlers = add_ordered_mock_handlers(o, meth_spec)
     658          req = Request("http://example.com/")
     659          o.open(req)
     660          assert len(o.calls) == 2
     661          calls = [(handlers[0], "http_open", (req,)),
     662                   (handlers[2], "http_error_302",
     663                    (req, support.ALWAYS_EQ, 302, "", {}))]
     664          for expected, got in zip(calls, o.calls):
     665              handler, method_name, args = expected
     666              self.assertEqual((handler, method_name), got[:2])
     667              self.assertEqual(args, got[2])
     668  
     669      def test_processors(self):
     670          # *_request / *_response methods get called appropriately
     671          o = OpenerDirector()
     672          meth_spec = [
     673              [("http_request", "return request"),
     674               ("http_response", "return response")],
     675              [("http_request", "return request"),
     676               ("http_response", "return response")],
     677              ]
     678          handlers = add_ordered_mock_handlers(o, meth_spec)
     679  
     680          req = Request("http://example.com/")
     681          o.open(req)
     682          # processor methods are called on *all* handlers that define them,
     683          # not just the first handler that handles the request
     684          calls = [
     685              (handlers[0], "http_request"), (handlers[1], "http_request"),
     686              (handlers[0], "http_response"), (handlers[1], "http_response")]
     687  
     688          for i, (handler, name, args, kwds) in enumerate(o.calls):
     689              if i < 2:
     690                  # *_request
     691                  self.assertEqual((handler, name), calls[i])
     692                  self.assertEqual(len(args), 1)
     693                  self.assertIsInstance(args[0], Request)
     694              else:
     695                  # *_response
     696                  self.assertEqual((handler, name), calls[i])
     697                  self.assertEqual(len(args), 2)
     698                  self.assertIsInstance(args[0], Request)
     699                  # response from opener.open is None, because there's no
     700                  # handler that defines http_open to handle it
     701                  if args[1] is not None:
     702                      self.assertIsInstance(args[1], MockResponse)
     703  
     704  
     705  def sanepathname2url(path):
     706      try:
     707          path.encode("utf-8")
     708      except UnicodeEncodeError:
     709          raise unittest.SkipTest("path is not encodable to utf8")
     710      urlpath = urllib.request.pathname2url(path)
     711      if os.name == "nt" and urlpath.startswith("///"):
     712          urlpath = urlpath[2:]
     713      # XXX don't ask me about the mac...
     714      return urlpath
     715  
     716  
     717  class ESC[4;38;5;81mHandlerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     718  
     719      def test_ftp(self):
     720          class ESC[4;38;5;81mMockFTPWrapper:
     721              def __init__(self, data):
     722                  self.data = data
     723  
     724              def retrfile(self, filename, filetype):
     725                  self.filename, self.filetype = filename, filetype
     726                  return io.StringIO(self.data), len(self.data)
     727  
     728              def close(self):
     729                  pass
     730  
     731          class ESC[4;38;5;81mNullFTPHandler(ESC[4;38;5;149murllibESC[4;38;5;149m.ESC[4;38;5;149mrequestESC[4;38;5;149m.ESC[4;38;5;149mFTPHandler):
     732              def __init__(self, data):
     733                  self.data = data
     734  
     735              def connect_ftp(self, user, passwd, host, port, dirs,
     736                              timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
     737                  self.user, self.passwd = user, passwd
     738                  self.host, self.port = host, port
     739                  self.dirs = dirs
     740                  self.ftpwrapper = MockFTPWrapper(self.data)
     741                  return self.ftpwrapper
     742  
     743          import ftplib
     744          data = "rheum rhaponicum"
     745          h = NullFTPHandler(data)
     746          h.parent = MockOpener()
     747  
     748          for url, host, port, user, passwd, type_, dirs, filename, mimetype in [
     749              ("ftp://localhost/foo/bar/baz.html",
     750               "localhost", ftplib.FTP_PORT, "", "", "I",
     751               ["foo", "bar"], "baz.html", "text/html"),
     752              ("ftp://parrot@localhost/foo/bar/baz.html",
     753               "localhost", ftplib.FTP_PORT, "parrot", "", "I",
     754               ["foo", "bar"], "baz.html", "text/html"),
     755              ("ftp://%25parrot@localhost/foo/bar/baz.html",
     756               "localhost", ftplib.FTP_PORT, "%parrot", "", "I",
     757               ["foo", "bar"], "baz.html", "text/html"),
     758              ("ftp://%2542parrot@localhost/foo/bar/baz.html",
     759               "localhost", ftplib.FTP_PORT, "%42parrot", "", "I",
     760               ["foo", "bar"], "baz.html", "text/html"),
     761              ("ftp://localhost:80/foo/bar/",
     762               "localhost", 80, "", "", "D",
     763               ["foo", "bar"], "", None),
     764              ("ftp://localhost/baz.gif;type=a",
     765               "localhost", ftplib.FTP_PORT, "", "", "A",
     766               [], "baz.gif", None),  # XXX really this should guess image/gif
     767              ]:
     768              req = Request(url)
     769              req.timeout = None
     770              r = h.ftp_open(req)
     771              # ftp authentication not yet implemented by FTPHandler
     772              self.assertEqual(h.user, user)
     773              self.assertEqual(h.passwd, passwd)
     774              self.assertEqual(h.host, socket.gethostbyname(host))
     775              self.assertEqual(h.port, port)
     776              self.assertEqual(h.dirs, dirs)
     777              self.assertEqual(h.ftpwrapper.filename, filename)
     778              self.assertEqual(h.ftpwrapper.filetype, type_)
     779              headers = r.info()
     780              self.assertEqual(headers.get("Content-type"), mimetype)
     781              self.assertEqual(int(headers["Content-length"]), len(data))
     782  
     783      def test_file(self):
     784          import email.utils
     785          h = urllib.request.FileHandler()
     786          o = h.parent = MockOpener()
     787  
     788          TESTFN = os_helper.TESTFN
     789          urlpath = sanepathname2url(os.path.abspath(TESTFN))
     790          towrite = b"hello, world\n"
     791          urls = [
     792              "file://localhost%s" % urlpath,
     793              "file://%s" % urlpath,
     794              "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
     795              ]
     796          try:
     797              localaddr = socket.gethostbyname(socket.gethostname())
     798          except socket.gaierror:
     799              localaddr = ''
     800          if localaddr:
     801              urls.append("file://%s%s" % (localaddr, urlpath))
     802  
     803          for url in urls:
     804              f = open(TESTFN, "wb")
     805              try:
     806                  try:
     807                      f.write(towrite)
     808                  finally:
     809                      f.close()
     810  
     811                  r = h.file_open(Request(url))
     812                  try:
     813                      data = r.read()
     814                      headers = r.info()
     815                      respurl = r.geturl()
     816                  finally:
     817                      r.close()
     818                  stats = os.stat(TESTFN)
     819                  modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
     820              finally:
     821                  os.remove(TESTFN)
     822              self.assertEqual(data, towrite)
     823              self.assertEqual(headers["Content-type"], "text/plain")
     824              self.assertEqual(headers["Content-length"], "13")
     825              self.assertEqual(headers["Last-modified"], modified)
     826              self.assertEqual(respurl, url)
     827  
     828          for url in [
     829              "file://localhost:80%s" % urlpath,
     830              "file:///file_does_not_exist.txt",
     831              "file://not-a-local-host.com//dir/file.txt",
     832              "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
     833                                     os.getcwd(), TESTFN),
     834              "file://somerandomhost.ontheinternet.com%s/%s" %
     835              (os.getcwd(), TESTFN),
     836              ]:
     837              try:
     838                  f = open(TESTFN, "wb")
     839                  try:
     840                      f.write(towrite)
     841                  finally:
     842                      f.close()
     843  
     844                  self.assertRaises(urllib.error.URLError,
     845                                    h.file_open, Request(url))
     846              finally:
     847                  os.remove(TESTFN)
     848  
     849          h = urllib.request.FileHandler()
     850          o = h.parent = MockOpener()
     851          # XXXX why does // mean ftp (and /// mean not ftp!), and where
     852          #  is file: scheme specified?  I think this is really a bug, and
     853          #  what was intended was to distinguish between URLs like:
     854          # file:/blah.txt (a file)
     855          # file://localhost/blah.txt (a file)
     856          # file:///blah.txt (a file)
     857          # file://ftp.example.com/blah.txt (an ftp URL)
     858          for url, ftp in [
     859              ("file://ftp.example.com//foo.txt", False),
     860              ("file://ftp.example.com///foo.txt", False),
     861              ("file://ftp.example.com/foo.txt", False),
     862              ("file://somehost//foo/something.txt", False),
     863              ("file://localhost//foo/something.txt", False),
     864              ]:
     865              req = Request(url)
     866              try:
     867                  h.file_open(req)
     868              except urllib.error.URLError:
     869                  self.assertFalse(ftp)
     870              else:
     871                  self.assertIs(o.req, req)
     872                  self.assertEqual(req.type, "ftp")
     873              self.assertEqual(req.type == "ftp", ftp)
     874  
     875      def test_http(self):
     876  
     877          h = urllib.request.AbstractHTTPHandler()
     878          o = h.parent = MockOpener()
     879  
     880          url = "http://example.com/"
     881          for method, data in [("GET", None), ("POST", b"blah")]:
     882              req = Request(url, data, {"Foo": "bar"})
     883              req.timeout = None
     884              req.add_unredirected_header("Spam", "eggs")
     885              http = MockHTTPClass()
     886              r = h.do_open(http, req)
     887  
     888              # result attributes
     889              r.read; r.readline  # wrapped MockFile methods
     890              r.info; r.geturl  # addinfourl methods
     891              r.code, r.msg == 200, "OK"  # added from MockHTTPClass.getreply()
     892              hdrs = r.info()
     893              hdrs.get; hdrs.__contains__  # r.info() gives dict from .getreply()
     894              self.assertEqual(r.geturl(), url)
     895  
     896              self.assertEqual(http.host, "example.com")
     897              self.assertEqual(http.level, 0)
     898              self.assertEqual(http.method, method)
     899              self.assertEqual(http.selector, "/")
     900              self.assertEqual(http.req_headers,
     901                               [("Connection", "close"),
     902                                ("Foo", "bar"), ("Spam", "eggs")])
     903              self.assertEqual(http.data, data)
     904  
     905          # check OSError converted to URLError
     906          http.raise_on_endheaders = True
     907          self.assertRaises(urllib.error.URLError, h.do_open, http, req)
     908  
     909          # Check for TypeError on POST data which is str.
     910          req = Request("http://example.com/","badpost")
     911          self.assertRaises(TypeError, h.do_request_, req)
     912  
     913          # check adding of standard headers
     914          o.addheaders = [("Spam", "eggs")]
     915          for data in b"", None:  # POST, GET
     916              req = Request("http://example.com/", data)
     917              r = MockResponse(200, "OK", {}, "")
     918              newreq = h.do_request_(req)
     919              if data is None:  # GET
     920                  self.assertNotIn("Content-length", req.unredirected_hdrs)
     921                  self.assertNotIn("Content-type", req.unredirected_hdrs)
     922              else:  # POST
     923                  self.assertEqual(req.unredirected_hdrs["Content-length"], "0")
     924                  self.assertEqual(req.unredirected_hdrs["Content-type"],
     925                               "application/x-www-form-urlencoded")
     926              # XXX the details of Host could be better tested
     927              self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
     928              self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")
     929  
     930              # don't clobber existing headers
     931              req.add_unredirected_header("Content-length", "foo")
     932              req.add_unredirected_header("Content-type", "bar")
     933              req.add_unredirected_header("Host", "baz")
     934              req.add_unredirected_header("Spam", "foo")
     935              newreq = h.do_request_(req)
     936              self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
     937              self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
     938              self.assertEqual(req.unredirected_hdrs["Host"], "baz")
     939              self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
     940  
     941      def test_http_body_file(self):
     942          # A regular file - chunked encoding is used unless Content Length is
     943          # already set.
     944  
     945          h = urllib.request.AbstractHTTPHandler()
     946          o = h.parent = MockOpener()
     947  
     948          file_obj = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
     949          file_path = file_obj.name
     950          file_obj.close()
     951          self.addCleanup(os.unlink, file_path)
     952  
     953          with open(file_path, "rb") as f:
     954              req = Request("http://example.com/", f, {})
     955              newreq = h.do_request_(req)
     956              te = newreq.get_header('Transfer-encoding')
     957              self.assertEqual(te, "chunked")
     958              self.assertFalse(newreq.has_header('Content-length'))
     959  
     960          with open(file_path, "rb") as f:
     961              req = Request("http://example.com/", f, {"Content-Length": 30})
     962              newreq = h.do_request_(req)
     963              self.assertEqual(int(newreq.get_header('Content-length')), 30)
     964              self.assertFalse(newreq.has_header("Transfer-encoding"))
     965  
     966      def test_http_body_fileobj(self):
     967          # A file object - chunked encoding is used
     968          # unless Content Length is already set.
     969          # (Note that there are some subtle differences to a regular
     970          # file, that is why we are testing both cases.)
     971  
     972          h = urllib.request.AbstractHTTPHandler()
     973          o = h.parent = MockOpener()
     974          file_obj = io.BytesIO()
     975  
     976          req = Request("http://example.com/", file_obj, {})
     977          newreq = h.do_request_(req)
     978          self.assertEqual(newreq.get_header('Transfer-encoding'), 'chunked')
     979          self.assertFalse(newreq.has_header('Content-length'))
     980  
     981          headers = {"Content-Length": 30}
     982          req = Request("http://example.com/", file_obj, headers)
     983          newreq = h.do_request_(req)
     984          self.assertEqual(int(newreq.get_header('Content-length')), 30)
     985          self.assertFalse(newreq.has_header("Transfer-encoding"))
     986  
     987          file_obj.close()
     988  
     989      def test_http_body_pipe(self):
     990          # A file reading from a pipe.
     991          # A pipe cannot be seek'ed.  There is no way to determine the
     992          # content length up front.  Thus, do_request_() should fall
     993          # back to Transfer-encoding chunked.
     994  
     995          h = urllib.request.AbstractHTTPHandler()
     996          o = h.parent = MockOpener()
     997  
     998          cmd = [sys.executable, "-c", r"pass"]
     999          for headers in {}, {"Content-Length": 30}:
    1000              with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
    1001                  req = Request("http://example.com/", proc.stdout, headers)
    1002                  newreq = h.do_request_(req)
    1003                  if not headers:
    1004                      self.assertEqual(newreq.get_header('Content-length'), None)
    1005                      self.assertEqual(newreq.get_header('Transfer-encoding'),
    1006                                       'chunked')
    1007                  else:
    1008                      self.assertEqual(int(newreq.get_header('Content-length')),
    1009                                       30)
    1010  
    def test_http_body_iterable(self):
        # Body is a generic iterable.  There is no way to determine the
        # content length up front, so do_request_() falls back to
        # Transfer-encoding chunked unless a Content-Length was supplied.

        handler = urllib.request.AbstractHTTPHandler()
        handler.parent = MockOpener()

        def one_chunk():
            yield b"one"

        for headers in {}, {"Content-Length": 11}:
            request = Request("http://example.com/", one_chunk(), headers)
            processed = handler.do_request_(request)
            if headers:
                # A caller-supplied length is left alone.
                length = int(processed.get_header('Content-length'))
                self.assertEqual(length, 11)
            else:
                self.assertEqual(processed.get_header('Content-length'), None)
                self.assertEqual(processed.get_header('Transfer-encoding'),
                                 'chunked')
    1030  
    def test_http_body_empty_seq(self):
        # Zero-length iterable body should be treated like any other
        # iterable: chunked transfer encoding and no Content-length.
        handler = urllib.request.AbstractHTTPHandler()
        handler.parent = MockOpener()
        empty_body_request = Request("http://example.com/", ())
        processed = handler.do_request_(empty_body_request)
        self.assertEqual(processed.get_header("Transfer-encoding"), "chunked")
        self.assertFalse(processed.has_header("Content-length"))
    1038  
    def test_http_body_array(self):
        # array.array Iterable - Content Length is calculated

        h = urllib.request.AbstractHTTPHandler()
        h.parent = MockOpener()

        iterable_array = array.array("I", [1, 2, 3, 4])
        # The size of an "I" item is platform dependent, so derive the
        # expected byte length from the array instead of hard-coding 16.
        expected_length = len(iterable_array) * iterable_array.itemsize

        for headers in {}, {"Content-Length": expected_length}:
            req = Request("http://example.com/", iterable_array, headers)
            newreq = h.do_request_(req)
            self.assertEqual(int(newreq.get_header('Content-length')),
                             expected_length)
    1051  
    def test_http_handler_debuglevel(self):
        # The debuglevel given to the handler's constructor must still be
        # stored on the handler after a request has gone through it.
        opener = OpenerDirector()
        https_handler = MockHTTPSHandler(debuglevel=1)
        opener.add_handler(https_handler)
        opener.open("https://www.example.com")
        self.assertEqual(https_handler._debuglevel, 1)
    1058  
    def test_http_doubleslash(self):
        # Checks the presence of any unnecessary double slash in url does not
        # break anything. Previously, a double slash directly after the host
        # could cause incorrect parsing.
        handler = urllib.request.AbstractHTTPHandler()
        handler.parent = MockOpener()

        body = b""
        candidates = (
            "http://example.com/foo/bar/baz.html",
            "http://example.com//foo/bar/baz.html",
            "http://example.com/foo//bar/baz.html",
            "http://example.com/foo/bar//baz.html",
        )

        for candidate in candidates:
            request = Request(candidate, body)

            # Check whether host is determined correctly if there is no proxy
            direct = handler.do_request_(request)
            self.assertEqual(direct.unredirected_hdrs["Host"], "example.com")

            # Check whether host is determined correctly if there is a proxy
            request.set_proxy("someproxy:3128", None)
            proxied = handler.do_request_(request)
            self.assertEqual(proxied.unredirected_hdrs["Host"], "example.com")
    1085  
    1086      def test_full_url_setter(self):
    1087          # Checks to ensure that components are set correctly after setting the
    1088          # full_url of a Request object
    1089  
    1090          urls = [
    1091              'http://example.com?foo=bar#baz',
    1092              'http://example.com?foo=bar&spam=eggs#bash',
    1093              'http://example.com',
    1094          ]
    1095  
    1096          # testing a reusable request instance, but the url parameter is
    1097          # required, so just use a dummy one to instantiate
    1098          r = Request('http://example.com')
    1099          for url in urls:
    1100              r.full_url = url
    1101              parsed = urlparse(url)
    1102  
    1103              self.assertEqual(r.get_full_url(), url)
    1104              # full_url setter uses splittag to split into components.
    1105              # splittag sets the fragment as None while urlparse sets it to ''
    1106              self.assertEqual(r.fragment or '', parsed.fragment)
    1107              self.assertEqual(urlparse(r.get_full_url()).query, parsed.query)
    1108  
    1109      def test_full_url_deleter(self):
    1110          r = Request('http://www.example.com')
    1111          del r.full_url
    1112          self.assertIsNone(r.full_url)
    1113          self.assertIsNone(r.fragment)
    1114          self.assertEqual(r.selector, '')
    1115  
    def test_fixpath_in_weirdurls(self):
        # Issue4493: urllib2 should supply '/' for URLs whose path does not
        # start with '/'.

        handler = urllib.request.AbstractHTTPHandler()
        handler.parent = MockOpener()

        # (url, selector expected after do_request_)
        expectations = (
            ('http://www.python.org?getspam', '/?getspam'),  # query, no path
            ('http://www.python.org', ''),                   # no path at all
        )
        for url, selector in expectations:
            processed = handler.do_request_(Request(url))
            self.assertEqual(processed.host, 'www.python.org')
            self.assertEqual(processed.selector, selector)
    1134  
    def test_errors(self):
        # HTTPErrorProcessor.http_response() hands 2xx responses back
        # untouched and routes everything else through the parent's error().
        processor = urllib.request.HTTPErrorProcessor()
        opener = processor.parent = MockOpener()

        url = "http://example.com/"
        request = Request(url)
        # all 2xx are passed through
        for status, reason in ((200, "OK"),
                               (202, "Accepted"),
                               (206, "Partial content")):
            response = MockResponse(status, reason, {}, "", url)
            returned = processor.http_response(request, response)
            self.assertIs(response, returned)
            self.assertFalse(hasattr(opener, "proto"))  # o.error not called
        # anything else calls o.error (and MockOpener returns None, here)
        response = MockResponse(502, "Bad gateway", {}, "", url)
        self.assertIsNone(processor.http_response(request, response))
        self.assertEqual(opener.proto, "http")  # o.error called
        self.assertEqual(opener.args,
                         (request, response, 502, "Bad gateway", {}))
    1159  
    def test_cookies(self):
        # HTTPCookieProcessor must pass the request (and, for responses,
        # the response) to its cookie jar and return the same objects.
        jar = MockCookieJar()
        processor = urllib.request.HTTPCookieProcessor(jar)
        processor.parent = MockOpener()

        request = Request("http://example.com/")
        response = MockResponse(200, "OK", {}, "")

        # Request side: the jar sees the very same request object.
        processed_request = processor.http_request(request)
        self.assertIs(jar.ach_req, request)
        self.assertIs(jar.ach_req, processed_request)
        self.assertEqual(request.origin_req_host, "example.com")
        self.assertFalse(request.unverifiable)

        # Response side: the jar sees the request/response pair.
        processed_response = processor.http_response(request, response)
        self.assertIs(jar.ec_req, request)
        self.assertIs(jar.ec_r, response)
        self.assertIs(response, processed_response)
    1176  
    def test_redirect(self):
        # HTTPRedirectHandler: ordinary redirects are followed as GET
        # requests without content headers or unredirected headers, and
        # both self-redirect loops and endless redirect chains are cut off.
        from_url = "http://example.com/a.html"
        to_url = "http://example.com/b.html"
        h = urllib.request.HTTPRedirectHandler()
        o = h.parent = MockOpener()

        # ordinary redirect behaviour
        for code in 301, 302, 303, 307, 308:
            for data in None, "blah\nblah\n":
                method = getattr(h, "http_error_%s" % code)
                req = Request(from_url, data)
                req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
                req.add_header("Nonsense", "viking=withhold")
                if data is not None:
                    req.add_header("Content-Length", str(len(data)))
                req.add_unredirected_header("Spam", "spam")
                try:
                    method(req, MockFile(), code, "Blah",
                           MockHeaders({"location": to_url}))
                except urllib.error.HTTPError:
                    # 307 and 308 in response to POST require user OK
                    self.assertIn(code, (307, 308))
                    self.assertIsNotNone(data)
                self.assertEqual(o.req.get_full_url(), to_url)
                try:
                    self.assertEqual(o.req.get_method(), "GET")
                except AttributeError:
                    self.assertFalse(o.req.data)

                # now it's a GET, there should not be headers regarding content
                # (possibly dragged from before being a POST)
                headers = [x.lower() for x in o.req.headers]
                self.assertNotIn("content-length", headers)
                self.assertNotIn("content-type", headers)

                self.assertEqual(o.req.headers["Nonsense"],
                                 "viking=withhold")
                self.assertNotIn("Spam", o.req.headers)
                self.assertNotIn("Spam", o.req.unredirected_hdrs)

        # loop detection
        def redirect(h, req, url=to_url):
            h.http_error_302(req, MockFile(), 302, "Blah",
                             MockHeaders({"location": url}))
        # Note that the *original* request shares the same record of
        # redirections with the sub-requests caused by the redirections.

        # detect infinite loop redirect of a URL to itself
        req = Request(from_url, origin_req_host="example.com")
        count = 0
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
        try:
            while True:
                redirect(h, req, "http://example.com/")
                count += 1
        except urllib.error.HTTPError:
            # don't stop until max_repeats, because cookies may introduce state
            self.assertEqual(count, urllib.request.HTTPRedirectHandler.max_repeats)

        # detect endless non-repeating chain of redirects
        req = Request(from_url, origin_req_host="example.com")
        count = 0
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
        try:
            while True:
                # A fresh target every time, so only the total redirect
                # count can stop the chain.
                redirect(h, req, "http://example.com/%d" % count)
                count += 1
        except urllib.error.HTTPError:
            self.assertEqual(count,
                             urllib.request.HTTPRedirectHandler.max_redirections)
    1250  
    def test_invalid_redirect(self):
        # Redirects to schemes other than http, https and ftp must be
        # refused with HTTPError; redirects to those three are followed.
        source_url = "http://example.com/a.html"
        allowed = ('http', 'https', 'ftp')
        refused = ('file', 'imap', 'ldap')
        target = "example.com/b.html"
        handler = urllib.request.HTTPRedirectHandler()
        opener = handler.parent = MockOpener()
        request = Request(source_url)
        request.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

        for scheme in refused:
            bad_location = "%s://%s" % (scheme, target)
            with self.assertRaises(urllib.error.HTTPError):
                handler.http_error_302(
                    request, MockFile(), 302, "Security Loophole",
                    MockHeaders({"location": bad_location}))

        for scheme in allowed:
            good_location = "%s://%s" % (scheme, target)
            handler.http_error_302(
                request, MockFile(), 302, "That's fine",
                MockHeaders({"location": good_location}))
            self.assertEqual(opener.req.get_full_url(), good_location)
    1272  
    def test_relative_redirect(self):
        # A relative Location header must be resolved against the URL of
        # the request that was redirected.
        from_url = "http://example.com/a.html"
        relative_url = "/b.html"
        h = urllib.request.HTTPRedirectHandler()
        o = h.parent = MockOpener()
        req = Request(from_url)
        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

        valid_url = urllib.parse.urljoin(from_url, relative_url)
        # Send the *relative* URL in the header.  Sending the pre-joined
        # absolute URL (as this test used to) never exercises the
        # handler's own resolution of relative redirects.
        h.http_error_302(req, MockFile(), 302, "That's fine",
            MockHeaders({"location": relative_url}))
        self.assertEqual(o.req.get_full_url(), valid_url)
    1285  
    def test_cookie_redirect(self):
        # cookies shouldn't leak into redirected requests
        from http.cookiejar import CookieJar
        from test.test_http_cookiejar import interact_netscape

        # Seed the jar with a cookie for www.example.com only.
        jar = CookieJar()
        interact_netscape(jar, "http://www.example.com/", "spam=eggs")

        # The mock HTTP handler redirects to a different host.
        http_handler = MockHTTPHandler(
            302, "Location: http://www.cracker.com/\r\n\r\n")
        opener = build_test_opener(
            http_handler,
            urllib.request.HTTPDefaultErrorHandler(),
            urllib.request.HTTPRedirectHandler(),
            urllib.request.HTTPCookieProcessor(jar),
        )
        opener.open("http://www.example.com/")
        self.assertFalse(http_handler.req.has_header("Cookie"))
    1300  
    def test_redirect_fragment(self):
        # The "#OK" fragment of the redirect target must still be present
        # in the URL reported by the final response.
        target = 'http://www.example.com/index.html#OK\r\n\r\n'
        http_handler = MockHTTPHandler(302, 'Location: ' + target)
        opener = build_test_opener(
            http_handler,
            urllib.request.HTTPDefaultErrorHandler(),
            urllib.request.HTTPRedirectHandler(),
        )
        response = opener.open('http://www.example.com')
        self.assertEqual(response.geturl(), target.strip())
    1309  
    1310      def test_redirect_no_path(self):
    1311          # Issue 14132: Relative redirect strips original path
    1312  
    1313          # clear _opener global variable
    1314          self.addCleanup(urllib.request.urlcleanup)
    1315  
    1316          real_class = http.client.HTTPConnection
    1317          response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n"
    1318          http.client.HTTPConnection = test_urllib.fakehttp(response1)
    1319          self.addCleanup(setattr, http.client, "HTTPConnection", real_class)
    1320          urls = iter(("/path", "/path?query"))
    1321          def request(conn, method, url, *pos, **kw):
    1322              self.assertEqual(url, next(urls))
    1323              real_class.request(conn, method, url, *pos, **kw)
    1324              # Change response for subsequent connection
    1325              conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!"
    1326          http.client.HTTPConnection.request = request
    1327          fp = urllib.request.urlopen("http://python.org/path")
    1328          self.assertEqual(fp.geturl(), "http://python.org/path?query")
    1329  
    1330      def test_redirect_encoding(self):
    1331          # Some characters in the redirect target may need special handling,
    1332          # but most ASCII characters should be treated as already encoded
    1333          class ESC[4;38;5;81mHandler(ESC[4;38;5;149murllibESC[4;38;5;149m.ESC[4;38;5;149mrequestESC[4;38;5;149m.ESC[4;38;5;149mHTTPHandler):
    1334              def http_open(self, req):
    1335                  result = self.do_open(self.connection, req)
    1336                  self.last_buf = self.connection.buf
    1337                  # Set up a normal response for the next request
    1338                  self.connection = test_urllib.fakehttp(
    1339                      b'HTTP/1.1 200 OK\r\n'
    1340                      b'Content-Length: 3\r\n'
    1341                      b'\r\n'
    1342                      b'123'
    1343                  )
    1344                  return result
    1345          handler = Handler()
    1346          opener = urllib.request.build_opener(handler)
    1347          tests = (
    1348              (b'/p\xC3\xA5-dansk/', b'/p%C3%A5-dansk/'),
    1349              (b'/spaced%20path/', b'/spaced%20path/'),
    1350              (b'/spaced path/', b'/spaced%20path/'),
    1351              (b'/?p\xC3\xA5-dansk', b'/?p%C3%A5-dansk'),
    1352          )
    1353          for [location, result] in tests:
    1354              with self.subTest(repr(location)):
    1355                  handler.connection = test_urllib.fakehttp(
    1356                      b'HTTP/1.1 302 Redirect\r\n'
    1357                      b'Location: ' + location + b'\r\n'
    1358                      b'\r\n'
    1359                  )
    1360                  response = opener.open('http://example.com/')
    1361                  expected = b'GET ' + result + b' '
    1362                  request = handler.last_buf
    1363                  self.assertTrue(request.startswith(expected), repr(request))
    1364  
    def test_proxy(self):
        # Run the same checks with a lower-case and an upper-case scheme
        # key in the proxies mapping.
        proxy = "proxy.example.com:3128"
        for proxies in ({"http": proxy}, {"HTTP": proxy}):
            opener = OpenerDirector()
            opener.add_handler(urllib.request.ProxyHandler(proxies))
            handlers = add_ordered_mock_handlers(
                opener, [[("http_open", "return response")]])

            req = Request("http://acme.example.com/")
            self.assertEqual(req.host, "acme.example.com")
            opener.open(req)
            # The request now points at the proxy ...
            self.assertEqual(req.host, proxy)
            # ... and http_open of the mock handler was the only call.
            recorded = [call[0:2] for call in opener.calls]
            self.assertEqual([(handlers[0], "http_open")], recorded)
    1382  
    1383      def test_proxy_no_proxy(self):
    1384          os.environ['no_proxy'] = 'python.org'
    1385          o = OpenerDirector()
    1386          ph = urllib.request.ProxyHandler(dict(http="proxy.example.com"))
    1387          o.add_handler(ph)
    1388          req = Request("http://www.perl.org/")
    1389          self.assertEqual(req.host, "www.perl.org")
    1390          o.open(req)
    1391          self.assertEqual(req.host, "proxy.example.com")
    1392          req = Request("http://www.python.org")
    1393          self.assertEqual(req.host, "www.python.org")
    1394          o.open(req)
    1395          self.assertEqual(req.host, "www.python.org")
    1396          del os.environ['no_proxy']
    1397  
    1398      def test_proxy_no_proxy_all(self):
    1399          os.environ['no_proxy'] = '*'
    1400          o = OpenerDirector()
    1401          ph = urllib.request.ProxyHandler(dict(http="proxy.example.com"))
    1402          o.add_handler(ph)
    1403          req = Request("http://www.python.org")
    1404          self.assertEqual(req.host, "www.python.org")
    1405          o.open(req)
    1406          self.assertEqual(req.host, "www.python.org")
    1407          del os.environ['no_proxy']
    1408  
    def test_proxy_https(self):
        # An https request is pointed at the configured https proxy and
        # then handed to the https_open handler.
        proxy = "proxy.example.com:3128"
        opener = OpenerDirector()
        opener.add_handler(urllib.request.ProxyHandler({"https": proxy}))
        handlers = add_ordered_mock_handlers(
            opener, [[("https_open", "return response")]])

        req = Request("https://www.example.com/")
        self.assertEqual(req.host, "www.example.com")
        opener.open(req)
        self.assertEqual(req.host, proxy)
        recorded = [call[0:2] for call in opener.calls]
        self.assertEqual([(handlers[0], "https_open")], recorded)
    1424  
    def test_proxy_https_proxy_authorization(self):
        proxy = 'proxy.example.com:3128'
        opener = OpenerDirector()
        opener.add_handler(urllib.request.ProxyHandler({'https': proxy}))
        https_handler = MockHTTPSHandler()
        opener.add_handler(https_handler)

        req = Request("https://www.example.com/")
        for name, value in (("Proxy-Authorization", "FooBar"),
                            ("User-Agent", "Grail")):
            req.add_header(name, value)
        self.assertEqual(req.host, "www.example.com")
        self.assertIsNone(req._tunnel_host)
        opener.open(req)

        # Proxy-Authorization stays on the Request object but must not be
        # among the headers handed to the HTTPS connection; ordinary
        # headers such as User-Agent are passed on.
        connection_headers = https_handler.httpconn.req_headers
        self.assertNotIn(("Proxy-Authorization", "FooBar"),
                         connection_headers)
        self.assertIn(("User-Agent", "Grail"), connection_headers)
        self.assertIsNotNone(req._tunnel_host)
        self.assertEqual(req.host, proxy)
        self.assertEqual(req.get_header("Proxy-authorization"), "FooBar")
    1447  
    1448      @unittest.skipUnless(sys.platform == 'darwin', "only relevant for OSX")
    1449      def test_osx_proxy_bypass(self):
    1450          bypass = {
    1451              'exclude_simple': False,
    1452              'exceptions': ['foo.bar', '*.bar.com', '127.0.0.1', '10.10',
    1453                             '10.0/16']
    1454          }
    1455          # Check hosts that should trigger the proxy bypass
    1456          for host in ('foo.bar', 'www.bar.com', '127.0.0.1', '10.10.0.1',
    1457                       '10.0.0.1'):
    1458              self.assertTrue(_proxy_bypass_macosx_sysconf(host, bypass),
    1459                              'expected bypass of %s to be True' % host)
    1460          # Check hosts that should not trigger the proxy bypass
    1461          for host in ('abc.foo.bar', 'bar.com', '127.0.0.2', '10.11.0.1',
    1462                  'notinbypass'):
    1463              self.assertFalse(_proxy_bypass_macosx_sysconf(host, bypass),
    1464                               'expected bypass of %s to be False' % host)
    1465  
    1466          # Check the exclude_simple flag
    1467          bypass = {'exclude_simple': True, 'exceptions': []}
    1468          self.assertTrue(_proxy_bypass_macosx_sysconf('test', bypass))
    1469  
    1470          # Check that invalid prefix lengths are ignored
    1471          bypass = {
    1472              'exclude_simple': False,
    1473              'exceptions': [ '10.0.0.0/40', '172.19.10.0/24' ]
    1474          }
    1475          host = '172.19.10.5'
    1476          self.assertTrue(_proxy_bypass_macosx_sysconf(host, bypass),
    1477                          'expected bypass of %s to be True' % host)
    1478          host = '10.0.1.5'
    1479          self.assertFalse(_proxy_bypass_macosx_sysconf(host, bypass),
    1480                          'expected bypass of %s to be False' % host)
    1481  
    def check_basic_auth(self, headers, realm):
        # Answer with a 401 carrying *headers* and run the shared Basic
        # auth checks for *realm*, reported as one subtest.
        with self.subTest(realm=realm, headers=headers):
            password_manager = MockPasswordManager()
            auth_handler = urllib.request.HTTPBasicAuthHandler(
                password_manager)
            http_handler = MockHTTPHandler(
                401, '\r\n'.join(headers) + '\r\n\r\n')
            opener = OpenerDirector()
            for handler in (auth_handler, http_handler):
                opener.add_handler(handler)
            protected = "http://acme.example.com/protected"
            self._test_basic_auth(opener, auth_handler, "Authorization",
                                  realm, http_handler, password_manager,
                                  protected, protected)
    1495  
    def test_basic_auth(self):
        # NOTE(review): realm and realm2 hold the same string, so the
        # "use the first Basic realm" cases below cannot tell the first
        # challenge from a later one -- confirm whether this is intended.
        realm = "realm2@example.com"
        realm2 = "realm2@example.com"
        basic = f'Basic realm="{realm}"'
        basic2 = f'Basic realm="{realm2}"'
        other_no_realm = 'Otherscheme xxx'
        digest = (f'Digest realm="{realm2}", '
                  'qop="auth, auth-int", '
                  'nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", '
                  'opaque="5ccc069c403ebaf9f0171e9517f40e41"')

        single_header_values = (
            # test "quote" and 'quote'
            f'Basic realm="{realm}"',
            f"Basic realm='{realm}'",

            # charset is ignored
            f'Basic realm="{realm}", charset="UTF-8"',

            # Multiple challenges per header
            f'{basic}, {basic2}',
            f'{basic}, {other_no_realm}',
            f'{other_no_realm}, {basic}',
            f'{basic}, {digest}',
            f'{digest}, {basic}',
        )
        for value in single_header_values:
            self.check_basic_auth([f'WWW-Authenticate: {value}'], realm)

        # no quote: expect a warning
        unquoted_warning = ("Basic Auth Realm was unquoted", UserWarning)
        with warnings_helper.check_warnings(unquoted_warning):
            self.check_basic_auth(
                [f'WWW-Authenticate: Basic realm={realm}'], realm)

        # Multiple headers: one challenge per header.
        # Use the first Basic realm.
        header_sets = (
            [basic,  basic2],
            [basic,  digest],
            [digest, basic],
        )
        for challenges in header_sets:
            headers = [f'WWW-Authenticate: {challenge}'
                       for challenge in challenges]
            self.check_basic_auth(headers, realm)
    1540  
    def test_proxy_basic_auth(self):
        # Basic authentication against a proxy: a 407 reply with a
        # Proxy-Authenticate challenge, answered via Proxy-authorization.
        realm = "ACME Networks"
        proxy = "proxy.example.com:3128"
        password_manager = MockPasswordManager()
        auth_handler = urllib.request.ProxyBasicAuthHandler(password_manager)
        http_handler = MockHTTPHandler(
            407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)

        opener = OpenerDirector()
        opener.add_handler(urllib.request.ProxyHandler({"http": proxy}))
        opener.add_handler(auth_handler)
        opener.add_handler(http_handler)
        # The password lookup is expected for the proxy, not the origin.
        self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
                              realm, http_handler, password_manager,
                              "http://acme.example.com:3128/protected",
                              proxy,
                              )
    1557  
    def test_basic_and_digest_auth_handlers(self):
        # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
        # response (http://python.org/sf/1479302), where it should instead
        # return None to allow another handler (especially
        # HTTPBasicAuthHandler) to handle the response.

        # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
        # try digest first (since it's the strongest auth scheme), so we record
        # order of calls here to check digest comes first:
        class RecordingOpenerDirector(OpenerDirector):
            # Opener that keeps a log of which auth handler was invoked.
            def __init__(self):
                super().__init__()
                self.recorded = []

            def record(self, info):
                self.recorded.append(info)

        class TestDigestAuthHandler(urllib.request.HTTPDigestAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("digest")
                # The base-class result is not returned.
                super().http_error_401(*args, **kwds)

        class TestBasicAuthHandler(urllib.request.HTTPBasicAuthHandler):
            def http_error_401(self, *args, **kwds):
                self.parent.record("basic")
                # The base-class result is not returned.
                super().http_error_401(*args, **kwds)

        realm = "ACME Networks"
        protected = "http://acme.example.com/protected"
        password_manager = MockPasswordManager()
        digest_handler = TestDigestAuthHandler(password_manager)
        basic_handler = TestBasicAuthHandler(password_manager)
        http_handler = MockHTTPHandler(
            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)

        opener = RecordingOpenerDirector()
        # Basic is registered first; digest must still be tried first.
        for handler in (basic_handler, digest_handler, http_handler):
            opener.add_handler(handler)

        # check basic auth isn't blocked by digest handler failing
        self._test_basic_auth(opener, basic_handler, "Authorization",
                              realm, http_handler, password_manager,
                              protected,
                              protected,
                              )
        # check digest was tried before basic (twice, because
        # _test_basic_auth called .open() twice)
        self.assertEqual(opener.recorded, ["digest", "basic"]*2)
    1607  
    def test_unsupported_auth_digest_handler(self):
        # With only a digest handler installed, a challenge for another
        # scheme (Kerberos) is expected to raise ValueError.
        digest_auth_handler = urllib.request.HTTPDigestAuthHandler(None)
        http_handler = MockHTTPHandler(
            401, 'WWW-Authenticate: Kerberos\r\n\r\n')
        opener = OpenerDirector()
        for handler in (digest_auth_handler, http_handler):
            opener.add_handler(handler)
        with self.assertRaises(ValueError):
            opener.open("http://www.example.com")
    1617  
    def test_unsupported_auth_basic_handler(self):
        # With only a basic handler installed, a challenge for another
        # scheme (NTLM) is expected to raise ValueError.
        basic_auth_handler = urllib.request.HTTPBasicAuthHandler(None)
        http_handler = MockHTTPHandler(
            401, 'WWW-Authenticate: NTLM\r\n\r\n')
        opener = OpenerDirector()
        for handler in (basic_auth_handler, http_handler):
            opener.add_handler(handler)
        with self.assertRaises(ValueError):
            opener.open("http://www.example.com")
    1627  
    def _test_basic_auth(self, opener, auth_handler, auth_header,
                         realm, http_handler, password_manager,
                         request_url, protected_url):
        # Shared Basic-auth scenario: register credentials, open
        # request_url, and check the retry carries the expected
        # auth_header; then repeat with no credentials available.
        import base64
        user, password = "wile", "coyote"

        # add_password() on the handler is passed on to the manager
        auth_handler.add_password(realm, request_url, user, password)
        for expected, stored in (
            (realm, password_manager.realm),
            (request_url, password_manager.url),
            (user, password_manager.user),
            (password, password_manager.password),
        ):
            self.assertEqual(expected, stored)

        opener.open(request_url)

        # the password manager must have been asked for the credentials
        self.assertEqual(password_manager.target_realm, realm)
        self.assertEqual(password_manager.target_url, protected_url)

        # first request goes out bare, the retry carries the credentials
        first, retry = http_handler.requests[0:1], http_handler.requests[1:2]
        self.assertEqual(len(http_handler.requests), 2)
        self.assertFalse(first[0].has_header(auth_header))
        credentials = f'{user}:{password}'.encode("ascii")
        auth_hdr_value = 'Basic ' + base64.b64encode(credentials).decode()
        self.assertEqual(retry[0].get_header(auth_header), auth_hdr_value)
        self.assertEqual(retry[0].unredirected_hdrs[auth_header],
                         auth_hdr_value)

        # without a password from the manager the handler leaves the
        # auth error alone: a single request, no credentials
        password_manager.user = password_manager.password = None
        http_handler.reset()
        opener.open(request_url)
        self.assertEqual(len(http_handler.requests), 1)
        self.assertFalse(http_handler.requests[0].has_header(auth_header))
    1664  
    def test_basic_prior_auth_auto_send(self):
        # Assume already authenticated if is_authenticated=True
        # for APIs like Github that don't return 401

        request_url = "http://acme.example.com/protected"
        user, password = "wile", "coyote"

        pwd_manager = HTTPPasswordMgrWithPriorAuth()
        auth_prior_handler = HTTPBasicAuthHandler(pwd_manager)
        auth_prior_handler.add_password(
            None, request_url, user, password, is_authenticated=True)

        # The flag covers the URL and paths nested below it, but not a
        # sibling that merely shares the prefix.
        for candidate, expected in (
            (request_url, True),
            (request_url + '/nested', True),
            (request_url + 'plain', False),
        ):
            check = self.assertTrue if expected else self.assertFalse
            check(pwd_manager.is_authenticated(candidate))

        http_handler = MockHTTPHandlerCheckAuth(200)
        opener = OpenerDirector()
        for handler in (auth_prior_handler, http_handler):
            opener.add_handler(handler)

        opener.open(request_url)

        # expect request to be sent with auth header
        self.assertTrue(http_handler.has_auth_header)
    1691  
    def test_basic_prior_auth_send_after_first_success(self):
        # Auto send auth header after authentication is successful once

        realm = 'ACME'
        request_url = 'http://acme.example.com/protected'
        user, password = 'wile', 'coyote'

        pwd_manager = HTTPPasswordMgrWithPriorAuth()
        auth_prior_handler = HTTPBasicAuthHandler(pwd_manager)
        auth_prior_handler.add_password(realm, request_url, user, password)

        # Nothing has authenticated against this URL yet.
        self.assertFalse(pwd_manager.is_authenticated(request_url))

        # First round: the server challenges with a 401.
        challenge_opener = OpenerDirector()
        challenge_opener.add_handler(auth_prior_handler)
        challenging_handler = MockHTTPHandler(
            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % None)
        challenge_opener.add_handler(challenging_handler)

        challenge_opener.open(request_url)

        self.assertTrue(pwd_manager.is_authenticated(request_url))

        # Second round: a fresh opener sharing the same auth handler.
        checking_handler = MockHTTPHandlerCheckAuth(200)
        self.assertFalse(checking_handler.has_auth_header)

        followup_opener = OpenerDirector()
        followup_opener.add_handler(auth_prior_handler)
        followup_opener.add_handler(checking_handler)

        # After getting 200 from MockHTTPHandler
        # Next request sends header in the first request
        followup_opener.open(request_url)

        # expect request to be sent with auth header
        self.assertTrue(checking_handler.has_auth_header)
    1731  
    1732      def test_http_closed(self):
    1733          """Test the connection is cleaned up when the response is closed"""
    1734          for (transfer, data) in (
    1735              ("Connection: close", b"data"),
    1736              ("Transfer-Encoding: chunked", b"4\r\ndata\r\n0\r\n\r\n"),
    1737              ("Content-Length: 4", b"data"),
    1738          ):
    1739              header = "HTTP/1.1 200 OK\r\n{}\r\n\r\n".format(transfer)
    1740              conn = test_urllib.fakehttp(header.encode() + data)
    1741              handler = urllib.request.AbstractHTTPHandler()
    1742              req = Request("http://dummy/")
    1743              req.timeout = None
    1744              with handler.do_open(conn, req) as resp:
    1745                  resp.read()
    1746              self.assertTrue(conn.fakesock.closed,
    1747                  "Connection not closed with {!r}".format(transfer))
    1748  
    1749      def test_invalid_closed(self):
    1750          """Test the connection is cleaned up after an invalid response"""
    1751          conn = test_urllib.fakehttp(b"")
    1752          handler = urllib.request.AbstractHTTPHandler()
    1753          req = Request("http://dummy/")
    1754          req.timeout = None
    1755          with self.assertRaises(http.client.BadStatusLine):
    1756              handler.do_open(conn, req)
    1757          self.assertTrue(conn.fakesock.closed, "Connection not closed")
    1758  
    1759  
    1760  class ESC[4;38;5;81mMiscTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1761  
    1762      def opener_has_handler(self, opener, handler_class):
    1763          self.assertTrue(any(h.__class__ == handler_class
    1764                              for h in opener.handlers))
    1765  
    1766      def test_build_opener(self):
    1767          class ESC[4;38;5;81mMyHTTPHandler(ESC[4;38;5;149murllibESC[4;38;5;149m.ESC[4;38;5;149mrequestESC[4;38;5;149m.ESC[4;38;5;149mHTTPHandler):
    1768              pass
    1769  
    1770          class ESC[4;38;5;81mFooHandler(ESC[4;38;5;149murllibESC[4;38;5;149m.ESC[4;38;5;149mrequestESC[4;38;5;149m.ESC[4;38;5;149mBaseHandler):
    1771              def foo_open(self):
    1772                  pass
    1773  
    1774          class ESC[4;38;5;81mBarHandler(ESC[4;38;5;149murllibESC[4;38;5;149m.ESC[4;38;5;149mrequestESC[4;38;5;149m.ESC[4;38;5;149mBaseHandler):
    1775              def bar_open(self):
    1776                  pass
    1777  
    1778          build_opener = urllib.request.build_opener
    1779  
    1780          o = build_opener(FooHandler, BarHandler)
    1781          self.opener_has_handler(o, FooHandler)
    1782          self.opener_has_handler(o, BarHandler)
    1783  
    1784          # can take a mix of classes and instances
    1785          o = build_opener(FooHandler, BarHandler())
    1786          self.opener_has_handler(o, FooHandler)
    1787          self.opener_has_handler(o, BarHandler)
    1788  
    1789          # subclasses of default handlers override default handlers
    1790          o = build_opener(MyHTTPHandler)
    1791          self.opener_has_handler(o, MyHTTPHandler)
    1792  
    1793          # a particular case of overriding: default handlers can be passed
    1794          # in explicitly
    1795          o = build_opener()
    1796          self.opener_has_handler(o, urllib.request.HTTPHandler)
    1797          o = build_opener(urllib.request.HTTPHandler)
    1798          self.opener_has_handler(o, urllib.request.HTTPHandler)
    1799          o = build_opener(urllib.request.HTTPHandler())
    1800          self.opener_has_handler(o, urllib.request.HTTPHandler)
    1801  
    1802          # Issue2670: multiple handlers sharing the same base class
    1803          class ESC[4;38;5;81mMyOtherHTTPHandler(ESC[4;38;5;149murllibESC[4;38;5;149m.ESC[4;38;5;149mrequestESC[4;38;5;149m.ESC[4;38;5;149mHTTPHandler):
    1804              pass
    1805  
    1806          o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
    1807          self.opener_has_handler(o, MyHTTPHandler)
    1808          self.opener_has_handler(o, MyOtherHTTPHandler)
    1809  
    1810      def test_HTTPError_interface(self):
    1811          """
    1812          Issue 13211 reveals that HTTPError didn't implement the URLError
    1813          interface even though HTTPError is a subclass of URLError.
    1814          """
    1815          msg = 'something bad happened'
    1816          url = code = fp = None
    1817          hdrs = 'Content-Length: 42'
    1818          err = urllib.error.HTTPError(url, code, msg, hdrs, fp)
    1819          self.assertTrue(hasattr(err, 'reason'))
    1820          self.assertEqual(err.reason, 'something bad happened')
    1821          self.assertTrue(hasattr(err, 'headers'))
    1822          self.assertEqual(err.headers, 'Content-Length: 42')
    1823          expected_errmsg = 'HTTP Error %s: %s' % (err.code, err.msg)
    1824          self.assertEqual(str(err), expected_errmsg)
    1825          expected_errmsg = '<HTTPError %s: %r>' % (err.code, err.msg)
    1826          self.assertEqual(repr(err), expected_errmsg)
    1827  
    1828      def test_gh_98778(self):
    1829          x = urllib.error.HTTPError("url", 405, "METHOD NOT ALLOWED", None, None)
    1830          self.assertEqual(getattr(x, "__notes__", ()), ())
    1831          self.assertIsInstance(x.fp.read(), bytes)
    1832  
    1833      def test_parse_proxy(self):
    1834          parse_proxy_test_cases = [
    1835              ('proxy.example.com',
    1836               (None, None, None, 'proxy.example.com')),
    1837              ('proxy.example.com:3128',
    1838               (None, None, None, 'proxy.example.com:3128')),
    1839              ('proxy.example.com', (None, None, None, 'proxy.example.com')),
    1840              ('proxy.example.com:3128',
    1841               (None, None, None, 'proxy.example.com:3128')),
    1842              # The authority component may optionally include userinfo
    1843              # (assumed to be # username:password):
    1844              ('joe:password@proxy.example.com',
    1845               (None, 'joe', 'password', 'proxy.example.com')),
    1846              ('joe:password@proxy.example.com:3128',
    1847               (None, 'joe', 'password', 'proxy.example.com:3128')),
    1848              #Examples with URLS
    1849              ('http://proxy.example.com/',
    1850               ('http', None, None, 'proxy.example.com')),
    1851              ('http://proxy.example.com:3128/',
    1852               ('http', None, None, 'proxy.example.com:3128')),
    1853              ('http://joe:password@proxy.example.com/',
    1854               ('http', 'joe', 'password', 'proxy.example.com')),
    1855              ('http://joe:password@proxy.example.com:3128',
    1856               ('http', 'joe', 'password', 'proxy.example.com:3128')),
    1857              # Everything after the authority is ignored
    1858              ('ftp://joe:password@proxy.example.com/rubbish:3128',
    1859               ('ftp', 'joe', 'password', 'proxy.example.com')),
    1860              # Test for no trailing '/' case
    1861              ('http://joe:password@proxy.example.com',
    1862               ('http', 'joe', 'password', 'proxy.example.com')),
    1863              # Testcases with '/' character in username, password
    1864              ('http://user/name:password@localhost:22',
    1865               ('http', 'user/name', 'password', 'localhost:22')),
    1866              ('http://username:pass/word@localhost:22',
    1867               ('http', 'username', 'pass/word', 'localhost:22')),
    1868              ('http://user/name:pass/word@localhost:22',
    1869               ('http', 'user/name', 'pass/word', 'localhost:22')),
    1870          ]
    1871  
    1872  
    1873          for tc, expected in parse_proxy_test_cases:
    1874              self.assertEqual(_parse_proxy(tc), expected)
    1875  
    1876          self.assertRaises(ValueError, _parse_proxy, 'file:/ftp.example.com'),
    1877  
    1878      def test_unsupported_algorithm(self):
    1879          handler = AbstractDigestAuthHandler()
    1880          with self.assertRaises(ValueError) as exc:
    1881              handler.get_algorithm_impls('invalid')
    1882          self.assertEqual(
    1883              str(exc.exception),
    1884              "Unsupported digest authentication algorithm 'invalid'"
    1885          )
    1886  
    1887  
    1888  class ESC[4;38;5;81mRequestTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1889      class ESC[4;38;5;81mPutRequest(ESC[4;38;5;149mRequest):
    1890          method = 'PUT'
    1891  
    1892      def setUp(self):
    1893          self.get = Request("http://www.python.org/~jeremy/")
    1894          self.post = Request("http://www.python.org/~jeremy/",
    1895                              "data",
    1896                              headers={"X-Test": "test"})
    1897          self.head = Request("http://www.python.org/~jeremy/", method='HEAD')
    1898          self.put = self.PutRequest("http://www.python.org/~jeremy/")
    1899          self.force_post = self.PutRequest("http://www.python.org/~jeremy/",
    1900              method="POST")
    1901  
    1902      def test_method(self):
    1903          self.assertEqual("POST", self.post.get_method())
    1904          self.assertEqual("GET", self.get.get_method())
    1905          self.assertEqual("HEAD", self.head.get_method())
    1906          self.assertEqual("PUT", self.put.get_method())
    1907          self.assertEqual("POST", self.force_post.get_method())
    1908  
    1909      def test_data(self):
    1910          self.assertFalse(self.get.data)
    1911          self.assertEqual("GET", self.get.get_method())
    1912          self.get.data = "spam"
    1913          self.assertTrue(self.get.data)
    1914          self.assertEqual("POST", self.get.get_method())
    1915  
    1916      # issue 16464
    1917      # if we change data we need to remove content-length header
    1918      # (cause it's most probably calculated for previous value)
    1919      def test_setting_data_should_remove_content_length(self):
    1920          self.assertNotIn("Content-length", self.get.unredirected_hdrs)
    1921          self.get.add_unredirected_header("Content-length", 42)
    1922          self.assertEqual(42, self.get.unredirected_hdrs["Content-length"])
    1923          self.get.data = "spam"
    1924          self.assertNotIn("Content-length", self.get.unredirected_hdrs)
    1925  
    1926      # issue 17485 same for deleting data.
    1927      def test_deleting_data_should_remove_content_length(self):
    1928          self.assertNotIn("Content-length", self.get.unredirected_hdrs)
    1929          self.get.data = 'foo'
    1930          self.get.add_unredirected_header("Content-length", 3)
    1931          self.assertEqual(3, self.get.unredirected_hdrs["Content-length"])
    1932          del self.get.data
    1933          self.assertNotIn("Content-length", self.get.unredirected_hdrs)
    1934  
    1935      def test_get_full_url(self):
    1936          self.assertEqual("http://www.python.org/~jeremy/",
    1937                           self.get.get_full_url())
    1938  
    1939      def test_selector(self):
    1940          self.assertEqual("/~jeremy/", self.get.selector)
    1941          req = Request("http://www.python.org/")
    1942          self.assertEqual("/", req.selector)
    1943  
    1944      def test_get_type(self):
    1945          self.assertEqual("http", self.get.type)
    1946  
    1947      def test_get_host(self):
    1948          self.assertEqual("www.python.org", self.get.host)
    1949  
    1950      def test_get_host_unquote(self):
    1951          req = Request("http://www.%70ython.org/")
    1952          self.assertEqual("www.python.org", req.host)
    1953  
    1954      def test_proxy(self):
    1955          self.assertFalse(self.get.has_proxy())
    1956          self.get.set_proxy("www.perl.org", "http")
    1957          self.assertTrue(self.get.has_proxy())
    1958          self.assertEqual("www.python.org", self.get.origin_req_host)
    1959          self.assertEqual("www.perl.org", self.get.host)
    1960  
    1961      def test_wrapped_url(self):
    1962          req = Request("<URL:http://www.python.org>")
    1963          self.assertEqual("www.python.org", req.host)
    1964  
    1965      def test_url_fragment(self):
    1966          req = Request("http://www.python.org/?qs=query#fragment=true")
    1967          self.assertEqual("/?qs=query", req.selector)
    1968          req = Request("http://www.python.org/#fun=true")
    1969          self.assertEqual("/", req.selector)
    1970  
    1971          # Issue 11703: geturl() omits fragment in the original URL.
    1972          url = 'http://docs.python.org/library/urllib2.html#OK'
    1973          req = Request(url)
    1974          self.assertEqual(req.get_full_url(), url)
    1975  
    1976      def test_url_fullurl_get_full_url(self):
    1977          urls = ['http://docs.python.org',
    1978                  'http://docs.python.org/library/urllib2.html#OK',
    1979                  'http://www.python.org/?qs=query#fragment=true']
    1980          for url in urls:
    1981              req = Request(url)
    1982              self.assertEqual(req.get_full_url(), req.full_url)
    1983  
    1984  
if __name__ == "__main__":
    # Executed as a script rather than imported: hand control to unittest.
    unittest.main()