python (3.11.7)
1 import unittest
2 from test import support
3 from test.support import os_helper
4 from test.support import socket_helper
5
6 import contextlib
7 import socket
8 import urllib.parse
9 import urllib.request
10 import os
11 import email.message
12 import time
13
14
15 support.requires('network')
16
17
18 class ESC[4;38;5;81mURLTimeoutTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
19 # XXX this test doesn't seem to test anything useful.
20
21 def setUp(self):
22 socket.setdefaulttimeout(support.INTERNET_TIMEOUT)
23
24 def tearDown(self):
25 socket.setdefaulttimeout(None)
26
27 def testURLread(self):
28 # clear _opener global variable
29 self.addCleanup(urllib.request.urlcleanup)
30
31 domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc
32 with socket_helper.transient_internet(domain):
33 f = urllib.request.urlopen(support.TEST_HTTP_URL)
34 f.read()
35
36
37 class ESC[4;38;5;81murlopenNetworkTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
38 """Tests urllib.request.urlopen using the network.
39
40 These tests are not exhaustive. Assuming that testing using files does a
41 good job overall of some of the basic interface features. There are no
42 tests exercising the optional 'data' and 'proxies' arguments. No tests
43 for transparent redirection have been written.
44
45 setUp is not used for always constructing a connection to
46 http://www.pythontest.net/ since there a few tests that don't use that address
47 and making a connection is expensive enough to warrant minimizing unneeded
48 connections.
49
50 """
51
52 url = 'http://www.pythontest.net/'
53
54 def setUp(self):
55 # clear _opener global variable
56 self.addCleanup(urllib.request.urlcleanup)
57
58 @contextlib.contextmanager
59 def urlopen(self, *args, **kwargs):
60 resource = args[0]
61 with socket_helper.transient_internet(resource):
62 r = urllib.request.urlopen(*args, **kwargs)
63 try:
64 yield r
65 finally:
66 r.close()
67
68 def test_basic(self):
69 # Simple test expected to pass.
70 with self.urlopen(self.url) as open_url:
71 for attr in ("read", "readline", "readlines", "fileno", "close",
72 "info", "geturl"):
73 self.assertTrue(hasattr(open_url, attr), "object returned from "
74 "urlopen lacks the %s attribute" % attr)
75 self.assertTrue(open_url.read(), "calling 'read' failed")
76
77 def test_readlines(self):
78 # Test both readline and readlines.
79 with self.urlopen(self.url) as open_url:
80 self.assertIsInstance(open_url.readline(), bytes,
81 "readline did not return a string")
82 self.assertIsInstance(open_url.readlines(), list,
83 "readlines did not return a list")
84
85 def test_info(self):
86 # Test 'info'.
87 with self.urlopen(self.url) as open_url:
88 info_obj = open_url.info()
89 self.assertIsInstance(info_obj, email.message.Message,
90 "object returned by 'info' is not an "
91 "instance of email.message.Message")
92 self.assertEqual(info_obj.get_content_subtype(), "html")
93
94 def test_geturl(self):
95 # Make sure same URL as opened is returned by geturl.
96 with self.urlopen(self.url) as open_url:
97 gotten_url = open_url.geturl()
98 self.assertEqual(gotten_url, self.url)
99
100 def test_getcode(self):
101 # test getcode() with the fancy opener to get 404 error codes
102 URL = self.url + "XXXinvalidXXX"
103 with socket_helper.transient_internet(URL):
104 with self.assertWarns(DeprecationWarning):
105 open_url = urllib.request.FancyURLopener().open(URL)
106 try:
107 code = open_url.getcode()
108 finally:
109 open_url.close()
110 self.assertEqual(code, 404)
111
112 @support.requires_resource('walltime')
113 def test_bad_address(self):
114 # Make sure proper exception is raised when connecting to a bogus
115 # address.
116
117 # Given that both VeriSign and various ISPs have in
118 # the past or are presently hijacking various invalid
119 # domain name requests in an attempt to boost traffic
120 # to their own sites, finding a domain name to use
121 # for this test is difficult. RFC2606 leads one to
122 # believe that '.invalid' should work, but experience
123 # seemed to indicate otherwise. Single character
124 # TLDs are likely to remain invalid, so this seems to
125 # be the best choice. The trailing '.' prevents a
126 # related problem: The normal DNS resolver appends
127 # the domain names from the search path if there is
128 # no '.' the end and, and if one of those domains
129 # implements a '*' rule a result is returned.
130 # However, none of this will prevent the test from
131 # failing if the ISP hijacks all invalid domain
132 # requests. The real solution would be to be able to
133 # parameterize the framework with a mock resolver.
134 bogus_domain = "sadflkjsasf.i.nvali.d."
135 try:
136 socket.gethostbyname(bogus_domain)
137 except OSError:
138 # socket.gaierror is too narrow, since getaddrinfo() may also
139 # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
140 # i.e. Python's TimeoutError.
141 pass
142 else:
143 # This happens with some overzealous DNS providers such as OpenDNS
144 self.skipTest("%r should not resolve for test to work" % bogus_domain)
145 failure_explanation = ('opening an invalid URL did not raise OSError; '
146 'can be caused by a broken DNS server '
147 '(e.g. returns 404 or hijacks page)')
148 with self.assertRaises(OSError, msg=failure_explanation):
149 urllib.request.urlopen("http://{}/".format(bogus_domain))
150
151
152 class ESC[4;38;5;81murlretrieveNetworkTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
153 """Tests urllib.request.urlretrieve using the network."""
154
155 def setUp(self):
156 # remove temporary files created by urlretrieve()
157 self.addCleanup(urllib.request.urlcleanup)
158
159 @contextlib.contextmanager
160 def urlretrieve(self, *args, **kwargs):
161 resource = args[0]
162 with socket_helper.transient_internet(resource):
163 file_location, info = urllib.request.urlretrieve(*args, **kwargs)
164 try:
165 yield file_location, info
166 finally:
167 os_helper.unlink(file_location)
168
169 def test_basic(self):
170 # Test basic functionality.
171 with self.urlretrieve(self.logo) as (file_location, info):
172 self.assertTrue(os.path.exists(file_location), "file location returned by"
173 " urlretrieve is not a valid path")
174 with open(file_location, 'rb') as f:
175 self.assertTrue(f.read(), "reading from the file location returned"
176 " by urlretrieve failed")
177
178 def test_specified_path(self):
179 # Make sure that specifying the location of the file to write to works.
180 with self.urlretrieve(self.logo,
181 os_helper.TESTFN) as (file_location, info):
182 self.assertEqual(file_location, os_helper.TESTFN)
183 self.assertTrue(os.path.exists(file_location))
184 with open(file_location, 'rb') as f:
185 self.assertTrue(f.read(), "reading from temporary file failed")
186
187 def test_header(self):
188 # Make sure header returned as 2nd value from urlretrieve is good.
189 with self.urlretrieve(self.logo) as (file_location, info):
190 self.assertIsInstance(info, email.message.Message,
191 "info is not an instance of email.message.Message")
192
193 logo = "http://www.pythontest.net/"
194
195 @support.requires_resource('walltime')
196 def test_data_header(self):
197 with self.urlretrieve(self.logo) as (file_location, fileheaders):
198 datevalue = fileheaders.get('Date')
199 dateformat = '%a, %d %b %Y %H:%M:%S GMT'
200 try:
201 time.strptime(datevalue, dateformat)
202 except ValueError:
203 self.fail('Date value not in %r format' % dateformat)
204
205 def test_reporthook(self):
206 records = []
207
208 def recording_reporthook(blocks, block_size, total_size):
209 records.append((blocks, block_size, total_size))
210
211 with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
212 file_location, fileheaders):
213 expected_size = int(fileheaders['Content-Length'])
214
215 records_repr = repr(records) # For use in error messages.
216 self.assertGreater(len(records), 1, msg="There should always be two "
217 "calls; the first one before the transfer starts.")
218 self.assertEqual(records[0][0], 0)
219 self.assertGreater(records[0][1], 0,
220 msg="block size can't be 0 in %s" % records_repr)
221 self.assertEqual(records[0][2], expected_size)
222 self.assertEqual(records[-1][2], expected_size)
223
224 block_sizes = {block_size for _, block_size, _ in records}
225 self.assertEqual({records[0][1]}, block_sizes,
226 msg="block sizes in %s must be equal" % records_repr)
227 self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
228 msg="number of blocks * block size must be"
229 " >= total size in %s" % records_repr)
230
231
232 if __name__ == "__main__":
233 unittest.main()