(root)/
Python-3.11.7/
Lib/
test/
test_urllib2net.py
       1  import errno
       2  import unittest
       3  from test import support
       4  from test.support import os_helper
       5  from test.support import socket_helper
       6  from test.support import ResourceDenied
       7  from test.test_urllib2 import sanepathname2url
       8  
       9  import os
      10  import socket
      11  import urllib.error
      12  import urllib.request
      13  import sys
      14  
# Declare that this module needs the "network" resource before any test runs.
# NOTE(review): presumably test.support.requires() refuses to run the module
# when that resource is not enabled -- confirm against the test.support docs.
support.requires("network")
      16  
      17  
      18  def _retry_thrice(func, exc, *args, **kwargs):
      19      for i in range(3):
      20          try:
      21              return func(*args, **kwargs)
      22          except exc as e:
      23              last_exc = e
      24              continue
      25      raise last_exc
      26  
      27  def _wrap_with_retry_thrice(func, exc):
      28      def wrapped(*args, **kwargs):
      29          return _retry_thrice(func, exc, *args, **kwargs)
      30      return wrapped
      31  
# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times: urllib.request.urlopen is attempted up to
# three times, and only urllib.error.URLError triggers another attempt.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen,
                                              urllib.error.URLError)
      36  
      37  
      38  class ESC[4;38;5;81mTransientResource(ESC[4;38;5;149mobject):
      39  
      40      """Raise ResourceDenied if an exception is raised while the context manager
      41      is in effect that matches the specified exception and attributes."""
      42  
      43      def __init__(self, exc, **kwargs):
      44          self.exc = exc
      45          self.attrs = kwargs
      46  
      47      def __enter__(self):
      48          return self
      49  
      50      def __exit__(self, type_=None, value=None, traceback=None):
      51          """If type_ is a subclass of self.exc and value has attributes matching
      52          self.attrs, raise ResourceDenied.  Otherwise let the exception
      53          propagate (if any)."""
      54          if type_ is not None and issubclass(self.exc, type_):
      55              for attr, attr_value in self.attrs.items():
      56                  if not hasattr(value, attr):
      57                      break
      58                  if getattr(value, attr) != attr_value:
      59                      break
      60              else:
      61                  raise ResourceDenied("an optional resource is not available")
      62  
# Context managers that raise ResourceDenied when various issues
# with the internet connection manifest themselves as exceptions.
# XXX deprecate these and use transient_internet() instead
# time_out is configured for OSError with errno ETIMEDOUT.
time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
# The two *_peer_reset managers are configured identically
# (OSError with errno ECONNRESET).
socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
      69  
      70  
class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features.

    No test methods are currently defined here; the only candidate test is
    kept commented out below.
    """
      73  
## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
      76  #
      77  #    def test_basic_auth(self):
      78  #        import http.client
      79  #
      80  #        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
      81  #        test_hostport = "www.python.org"
      82  #        test_realm = 'Test Realm'
      83  #        test_user = 'test.test_urllib2net'
      84  #        test_password = 'blah'
      85  #
      86  #        # failure
      87  #        try:
      88  #            _urlopen_with_retry(test_url)
      89  #        except urllib2.HTTPError, exc:
      90  #            self.assertEqual(exc.code, 401)
      91  #        else:
      92  #            self.fail("urlopen() should have failed with 401")
      93  #
      94  #        # success
      95  #        auth_handler = urllib2.HTTPBasicAuthHandler()
      96  #        auth_handler.add_password(test_realm, test_hostport,
      97  #                                  test_user, test_password)
      98  #        opener = urllib2.build_opener(auth_handler)
      99  #        f = opener.open('http://localhost/')
     100  #        response = _urlopen_with_retry("http://www.python.org/")
     101  #
     102  #        # The 'userinfo' URL component is deprecated by RFC 3986 for security
     103  #        # reasons, let's not implement it!  (it's already implemented for proxy
     104  #        # specification strings (that is, URLs or authorities specifying a
     105  #        # proxy), so we must keep that)
     106  #        self.assertRaises(http.client.InvalidURL,
     107  #                          urllib2.urlopen, "http://evil:thing@example.com")
     108  
     109  
     110  class ESC[4;38;5;81mCloseSocketTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     111  
     112      def test_close(self):
     113          # clear _opener global variable
     114          self.addCleanup(urllib.request.urlcleanup)
     115  
     116          # calling .close() on urllib2's response objects should close the
     117          # underlying socket
     118          url = support.TEST_HTTP_URL
     119          with socket_helper.transient_internet(url):
     120              response = _urlopen_with_retry(url)
     121              sock = response.fp
     122              self.assertFalse(sock.closed)
     123              response.close()
     124              self.assertTrue(sock.closed)
     125  
     126  class ESC[4;38;5;81mOtherNetworkTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     127      def setUp(self):
     128          if 0:  # for debugging
     129              import logging
     130              logger = logging.getLogger("test_urllib2net")
     131              logger.addHandler(logging.StreamHandler())
     132  
     133      # XXX The rest of these tests aren't very good -- they don't check much.
     134      # They do sometimes catch some major disasters, though.
     135  
     136      @support.requires_resource('walltime')
     137      def test_ftp(self):
     138          # Testing the same URL twice exercises the caching in CacheFTPHandler
     139          urls = [
     140              'ftp://www.pythontest.net/README',
     141              'ftp://www.pythontest.net/README',
     142              ('ftp://www.pythontest.net/non-existent-file',
     143               None, urllib.error.URLError),
     144              ]
     145          self._test_urls(urls, self._extra_handlers())
     146  
     147      def test_file(self):
     148          TESTFN = os_helper.TESTFN
     149          f = open(TESTFN, 'w')
     150          try:
     151              f.write('hi there\n')
     152              f.close()
     153              urls = [
     154                  'file:' + sanepathname2url(os.path.abspath(TESTFN)),
     155                  ('file:///nonsensename/etc/passwd', None,
     156                   urllib.error.URLError),
     157                  ]
     158              self._test_urls(urls, self._extra_handlers(), retry=True)
     159          finally:
     160              os.remove(TESTFN)
     161  
     162          self.assertRaises(ValueError, urllib.request.urlopen,'./relative_path/to/file')
     163  
     164      # XXX Following test depends on machine configurations that are internal
     165      # to CNRI.  Need to set up a public server with the right authentication
     166      # configuration for test purposes.
     167  
     168  ##     def test_cnri(self):
     169  ##         if socket.gethostname() == 'bitdiddle':
     170  ##             localhost = 'bitdiddle.cnri.reston.va.us'
     171  ##         elif socket.gethostname() == 'bitdiddle.concentric.net':
     172  ##             localhost = 'localhost'
     173  ##         else:
     174  ##             localhost = None
     175  ##         if localhost is not None:
     176  ##             urls = [
     177  ##                 'file://%s/etc/passwd' % localhost,
     178  ##                 'http://%s/simple/' % localhost,
     179  ##                 'http://%s/digest/' % localhost,
     180  ##                 'http://%s/not/found.h' % localhost,
     181  ##                 ]
     182  
     183  ##             bauth = HTTPBasicAuthHandler()
     184  ##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
     185  ##                                'password')
     186  ##             dauth = HTTPDigestAuthHandler()
     187  ##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
     188  ##                                'password')
     189  
     190  ##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
     191  
     192      def test_urlwithfrag(self):
     193          urlwith_frag = "http://www.pythontest.net/index.html#frag"
     194          with socket_helper.transient_internet(urlwith_frag):
     195              req = urllib.request.Request(urlwith_frag)
     196              res = urllib.request.urlopen(req)
     197              self.assertEqual(res.geturl(),
     198                      "http://www.pythontest.net/index.html#frag")
     199  
     200      @support.requires_resource('walltime')
     201      def test_redirect_url_withfrag(self):
     202          redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
     203          with socket_helper.transient_internet(redirect_url_with_frag):
     204              req = urllib.request.Request(redirect_url_with_frag)
     205              res = urllib.request.urlopen(req)
     206              self.assertEqual(res.geturl(),
     207                      "http://www.pythontest.net/elsewhere/#frag")
     208  
     209      def test_custom_headers(self):
     210          url = support.TEST_HTTP_URL
     211          with socket_helper.transient_internet(url):
     212              opener = urllib.request.build_opener()
     213              request = urllib.request.Request(url)
     214              self.assertFalse(request.header_items())
     215              opener.open(request)
     216              self.assertTrue(request.header_items())
     217              self.assertTrue(request.has_header('User-agent'))
     218              request.add_header('User-Agent','Test-Agent')
     219              opener.open(request)
     220              self.assertEqual(request.get_header('User-agent'),'Test-Agent')
     221  
     222      @unittest.skip('XXX: http://www.imdb.com is gone')
     223      def test_sites_no_connection_close(self):
     224          # Some sites do not send Connection: close header.
     225          # Verify that those work properly. (#issue12576)
     226  
     227          URL = 'http://www.imdb.com' # mangles Connection:close
     228  
     229          with socket_helper.transient_internet(URL):
     230              try:
     231                  with urllib.request.urlopen(URL) as res:
     232                      pass
     233              except ValueError:
     234                  self.fail("urlopen failed for site not sending \
     235                             Connection:close")
     236              else:
     237                  self.assertTrue(res)
     238  
     239              req = urllib.request.urlopen(URL)
     240              res = req.read()
     241              self.assertTrue(res)
     242  
     243      def _test_urls(self, urls, handlers, retry=True):
     244          import time
     245          import logging
     246          debug = logging.getLogger("test_urllib2").debug
     247  
     248          urlopen = urllib.request.build_opener(*handlers).open
     249          if retry:
     250              urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)
     251  
     252          for url in urls:
     253              with self.subTest(url=url):
     254                  if isinstance(url, tuple):
     255                      url, req, expected_err = url
     256                  else:
     257                      req = expected_err = None
     258  
     259                  with socket_helper.transient_internet(url):
     260                      try:
     261                          f = urlopen(url, req, support.INTERNET_TIMEOUT)
     262                      # urllib.error.URLError is a subclass of OSError
     263                      except OSError as err:
     264                          if expected_err:
     265                              msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
     266                                     (expected_err, url, req, type(err), err))
     267                              self.assertIsInstance(err, expected_err, msg)
     268                          else:
     269                              raise
     270                      else:
     271                          try:
     272                              with time_out, \
     273                                   socket_peer_reset, \
     274                                   ioerror_peer_reset:
     275                                  buf = f.read()
     276                                  debug("read %d bytes" % len(buf))
     277                          except TimeoutError:
     278                              print("<timeout: %s>" % url, file=sys.stderr)
     279                          f.close()
     280                  time.sleep(0.1)
     281  
     282      def _extra_handlers(self):
     283          handlers = []
     284  
     285          cfh = urllib.request.CacheFTPHandler()
     286          self.addCleanup(cfh.clear_cache)
     287          cfh.setTimeout(1)
     288          handlers.append(cfh)
     289  
     290          return handlers
     291  
     292  
     293  class ESC[4;38;5;81mTimeoutTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     294      def setUp(self):
     295          # clear _opener global variable
     296          self.addCleanup(urllib.request.urlcleanup)
     297  
     298      def test_http_basic(self):
     299          self.assertIsNone(socket.getdefaulttimeout())
     300          url = support.TEST_HTTP_URL
     301          with socket_helper.transient_internet(url, timeout=None):
     302              u = _urlopen_with_retry(url)
     303              self.addCleanup(u.close)
     304              self.assertIsNone(u.fp.raw._sock.gettimeout())
     305  
     306      def test_http_default_timeout(self):
     307          self.assertIsNone(socket.getdefaulttimeout())
     308          url = support.TEST_HTTP_URL
     309          with socket_helper.transient_internet(url):
     310              socket.setdefaulttimeout(60)
     311              try:
     312                  u = _urlopen_with_retry(url)
     313                  self.addCleanup(u.close)
     314              finally:
     315                  socket.setdefaulttimeout(None)
     316              self.assertEqual(u.fp.raw._sock.gettimeout(), 60)
     317  
     318      def test_http_no_timeout(self):
     319          self.assertIsNone(socket.getdefaulttimeout())
     320          url = support.TEST_HTTP_URL
     321          with socket_helper.transient_internet(url):
     322              socket.setdefaulttimeout(60)
     323              try:
     324                  u = _urlopen_with_retry(url, timeout=None)
     325                  self.addCleanup(u.close)
     326              finally:
     327                  socket.setdefaulttimeout(None)
     328              self.assertIsNone(u.fp.raw._sock.gettimeout())
     329  
     330      def test_http_timeout(self):
     331          url = support.TEST_HTTP_URL
     332          with socket_helper.transient_internet(url):
     333              u = _urlopen_with_retry(url, timeout=120)
     334              self.addCleanup(u.close)
     335              self.assertEqual(u.fp.raw._sock.gettimeout(), 120)
     336  
     337      FTP_HOST = 'ftp://www.pythontest.net/'
     338  
     339      @support.requires_resource('walltime')
     340      def test_ftp_basic(self):
     341          self.assertIsNone(socket.getdefaulttimeout())
     342          with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
     343              u = _urlopen_with_retry(self.FTP_HOST)
     344              self.addCleanup(u.close)
     345              self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
     346  
     347      def test_ftp_default_timeout(self):
     348          self.assertIsNone(socket.getdefaulttimeout())
     349          with socket_helper.transient_internet(self.FTP_HOST):
     350              socket.setdefaulttimeout(60)
     351              try:
     352                  u = _urlopen_with_retry(self.FTP_HOST)
     353                  self.addCleanup(u.close)
     354              finally:
     355                  socket.setdefaulttimeout(None)
     356              self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
     357  
     358      @support.requires_resource('walltime')
     359      def test_ftp_no_timeout(self):
     360          self.assertIsNone(socket.getdefaulttimeout())
     361          with socket_helper.transient_internet(self.FTP_HOST):
     362              socket.setdefaulttimeout(60)
     363              try:
     364                  u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
     365                  self.addCleanup(u.close)
     366              finally:
     367                  socket.setdefaulttimeout(None)
     368              self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
     369  
     370      @support.requires_resource('walltime')
     371      def test_ftp_timeout(self):
     372          with socket_helper.transient_internet(self.FTP_HOST):
     373              u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
     374              self.addCleanup(u.close)
     375              self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
     376  
     377  
# Run the tests via unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()