(root)/
Python-3.12.0/
Lib/
test/
test_urllibnet.py
       1  import unittest
       2  from test import support
       3  from test.support import os_helper
       4  from test.support import socket_helper
       5  
       6  import contextlib
       7  import socket
       8  import urllib.parse
       9  import urllib.request
      10  import os
      11  import email.message
      12  import time
      13  
      14  
# Declare that the tests in this module need the 'network' resource.
# NOTE(review): presumably this stops the module from running when the
# resource is not enabled for the test run -- confirm against
# test.support.requires.
support.requires('network')
      16  
      17  
      18  class ESC[4;38;5;81mURLTimeoutTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      19      # XXX this test doesn't seem to test anything useful.
      20  
      21      def setUp(self):
      22          socket.setdefaulttimeout(support.INTERNET_TIMEOUT)
      23  
      24      def tearDown(self):
      25          socket.setdefaulttimeout(None)
      26  
      27      def testURLread(self):
      28          # clear _opener global variable
      29          self.addCleanup(urllib.request.urlcleanup)
      30  
      31          domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc
      32          with socket_helper.transient_internet(domain):
      33              f = urllib.request.urlopen(support.TEST_HTTP_URL)
      34              f.read()
      35  
      36  
      37  class ESC[4;38;5;81murlopenNetworkTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      38      """Tests urllib.request.urlopen using the network.
      39  
      40      These tests are not exhaustive.  Assuming that testing using files does a
      41      good job overall of some of the basic interface features.  There are no
      42      tests exercising the optional 'data' and 'proxies' arguments.  No tests
      43      for transparent redirection have been written.
      44  
      45      setUp is not used for always constructing a connection to
      46      http://www.pythontest.net/ since there a few tests that don't use that address
      47      and making a connection is expensive enough to warrant minimizing unneeded
      48      connections.
      49  
      50      """
      51  
      52      url = 'http://www.pythontest.net/'
      53  
      54      def setUp(self):
      55          # clear _opener global variable
      56          self.addCleanup(urllib.request.urlcleanup)
      57  
      58      @contextlib.contextmanager
      59      def urlopen(self, *args, **kwargs):
      60          resource = args[0]
      61          with socket_helper.transient_internet(resource):
      62              r = urllib.request.urlopen(*args, **kwargs)
      63              try:
      64                  yield r
      65              finally:
      66                  r.close()
      67  
      68      def test_basic(self):
      69          # Simple test expected to pass.
      70          with self.urlopen(self.url) as open_url:
      71              for attr in ("read", "readline", "readlines", "fileno", "close",
      72                           "info", "geturl"):
      73                  self.assertTrue(hasattr(open_url, attr), "object returned from "
      74                                  "urlopen lacks the %s attribute" % attr)
      75              self.assertTrue(open_url.read(), "calling 'read' failed")
      76  
      77      def test_readlines(self):
      78          # Test both readline and readlines.
      79          with self.urlopen(self.url) as open_url:
      80              self.assertIsInstance(open_url.readline(), bytes,
      81                                    "readline did not return a string")
      82              self.assertIsInstance(open_url.readlines(), list,
      83                                    "readlines did not return a list")
      84  
      85      def test_info(self):
      86          # Test 'info'.
      87          with self.urlopen(self.url) as open_url:
      88              info_obj = open_url.info()
      89              self.assertIsInstance(info_obj, email.message.Message,
      90                                    "object returned by 'info' is not an "
      91                                    "instance of email.message.Message")
      92              self.assertEqual(info_obj.get_content_subtype(), "html")
      93  
      94      def test_geturl(self):
      95          # Make sure same URL as opened is returned by geturl.
      96          with self.urlopen(self.url) as open_url:
      97              gotten_url = open_url.geturl()
      98              self.assertEqual(gotten_url, self.url)
      99  
     100      def test_getcode(self):
     101          # test getcode() with the fancy opener to get 404 error codes
     102          URL = self.url + "XXXinvalidXXX"
     103          with socket_helper.transient_internet(URL):
     104              with self.assertWarns(DeprecationWarning):
     105                  open_url = urllib.request.FancyURLopener().open(URL)
     106              try:
     107                  code = open_url.getcode()
     108              finally:
     109                  open_url.close()
     110              self.assertEqual(code, 404)
     111  
     112      @support.requires_resource('walltime')
     113      def test_bad_address(self):
     114          # Make sure proper exception is raised when connecting to a bogus
     115          # address.
     116  
     117          # Given that both VeriSign and various ISPs have in
     118          # the past or are presently hijacking various invalid
     119          # domain name requests in an attempt to boost traffic
     120          # to their own sites, finding a domain name to use
     121          # for this test is difficult.  RFC2606 leads one to
     122          # believe that '.invalid' should work, but experience
     123          # seemed to indicate otherwise.  Single character
     124          # TLDs are likely to remain invalid, so this seems to
     125          # be the best choice. The trailing '.' prevents a
     126          # related problem: The normal DNS resolver appends
     127          # the domain names from the search path if there is
     128          # no '.' the end and, and if one of those domains
     129          # implements a '*' rule a result is returned.
     130          # However, none of this will prevent the test from
     131          # failing if the ISP hijacks all invalid domain
     132          # requests.  The real solution would be to be able to
     133          # parameterize the framework with a mock resolver.
     134          bogus_domain = "sadflkjsasf.i.nvali.d."
     135          try:
     136              socket.gethostbyname(bogus_domain)
     137          except OSError:
     138              # socket.gaierror is too narrow, since getaddrinfo() may also
     139              # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
     140              # i.e. Python's TimeoutError.
     141              pass
     142          else:
     143              # This happens with some overzealous DNS providers such as OpenDNS
     144              self.skipTest("%r should not resolve for test to work" % bogus_domain)
     145          failure_explanation = ('opening an invalid URL did not raise OSError; '
     146                                 'can be caused by a broken DNS server '
     147                                 '(e.g. returns 404 or hijacks page)')
     148          with self.assertRaises(OSError, msg=failure_explanation):
     149              urllib.request.urlopen("http://{}/".format(bogus_domain))
     150  
     151  
     152  class ESC[4;38;5;81murlretrieveNetworkTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     153      """Tests urllib.request.urlretrieve using the network."""
     154  
     155      def setUp(self):
     156          # remove temporary files created by urlretrieve()
     157          self.addCleanup(urllib.request.urlcleanup)
     158  
     159      @contextlib.contextmanager
     160      def urlretrieve(self, *args, **kwargs):
     161          resource = args[0]
     162          with socket_helper.transient_internet(resource):
     163              file_location, info = urllib.request.urlretrieve(*args, **kwargs)
     164              try:
     165                  yield file_location, info
     166              finally:
     167                  os_helper.unlink(file_location)
     168  
     169      def test_basic(self):
     170          # Test basic functionality.
     171          with self.urlretrieve(self.logo) as (file_location, info):
     172              self.assertTrue(os.path.exists(file_location), "file location returned by"
     173                              " urlretrieve is not a valid path")
     174              with open(file_location, 'rb') as f:
     175                  self.assertTrue(f.read(), "reading from the file location returned"
     176                                  " by urlretrieve failed")
     177  
     178      def test_specified_path(self):
     179          # Make sure that specifying the location of the file to write to works.
     180          with self.urlretrieve(self.logo,
     181                                os_helper.TESTFN) as (file_location, info):
     182              self.assertEqual(file_location, os_helper.TESTFN)
     183              self.assertTrue(os.path.exists(file_location))
     184              with open(file_location, 'rb') as f:
     185                  self.assertTrue(f.read(), "reading from temporary file failed")
     186  
     187      def test_header(self):
     188          # Make sure header returned as 2nd value from urlretrieve is good.
     189          with self.urlretrieve(self.logo) as (file_location, info):
     190              self.assertIsInstance(info, email.message.Message,
     191                                    "info is not an instance of email.message.Message")
     192  
     193      logo = "http://www.pythontest.net/"
     194  
     195      @support.requires_resource('walltime')
     196      def test_data_header(self):
     197          with self.urlretrieve(self.logo) as (file_location, fileheaders):
     198              datevalue = fileheaders.get('Date')
     199              dateformat = '%a, %d %b %Y %H:%M:%S GMT'
     200              try:
     201                  time.strptime(datevalue, dateformat)
     202              except ValueError:
     203                  self.fail('Date value not in %r format' % dateformat)
     204  
     205      def test_reporthook(self):
     206          records = []
     207  
     208          def recording_reporthook(blocks, block_size, total_size):
     209              records.append((blocks, block_size, total_size))
     210  
     211          with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
     212                  file_location, fileheaders):
     213              expected_size = int(fileheaders['Content-Length'])
     214  
     215          records_repr = repr(records)  # For use in error messages.
     216          self.assertGreater(len(records), 1, msg="There should always be two "
     217                             "calls; the first one before the transfer starts.")
     218          self.assertEqual(records[0][0], 0)
     219          self.assertGreater(records[0][1], 0,
     220                             msg="block size can't be 0 in %s" % records_repr)
     221          self.assertEqual(records[0][2], expected_size)
     222          self.assertEqual(records[-1][2], expected_size)
     223  
     224          block_sizes = {block_size for _, block_size, _ in records}
     225          self.assertEqual({records[0][1]}, block_sizes,
     226                           msg="block sizes in %s must be equal" % records_repr)
     227          self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
     228                                  msg="number of blocks * block size must be"
     229                                  " >= total size in %s" % records_repr)
     230  
     231  
# Run this module's tests via unittest when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()