1 import errno
2 import unittest
3 from test import support
4 from test.support import os_helper
5 from test.support import socket_helper
6 from test.support import ResourceDenied
7 from test.test_urllib2 import sanepathname2url
8
9 import os
10 import socket
11 import urllib.error
12 import urllib.request
13 import sys
14
# Every test in this module contacts remote hosts, so the "network" test
# resource is requested up front, before any test class is defined.
support.requires("network")
16
17
18 def _retry_thrice(func, exc, *args, **kwargs):
19 for i in range(3):
20 try:
21 return func(*args, **kwargs)
22 except exc as e:
23 last_exc = e
24 continue
25 raise last_exc
26
def _wrap_with_retry_thrice(func, exc):
    """Return a callable that runs ``func`` through ``_retry_thrice``.

    The returned function forwards its positional and keyword arguments
    unchanged; ``exc`` is the exception class (or tuple of classes) that
    triggers a retry.
    """
    def wrapped(*call_args, **call_kwargs):
        return _retry_thrice(func, exc, *call_args, **call_kwargs)

    return wrapped
31
# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times: this is urllib.request.urlopen() wrapped so
# that a call failing with urllib.error.URLError is attempted again.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen,
                                              urllib.error.URLError)
36
37
class TransientResource:
    """Context manager that turns selected exceptions into ResourceDenied.

    If an exception raised inside the ``with`` block is an instance of the
    configured exception class (subclasses included) and carries every
    configured attribute with the configured value, ResourceDenied is raised
    in its place.  Any other exception propagates unchanged.
    """

    def __init__(self, exc, **kwargs):
        """Remember the exception class and the attribute values to match.

        exc    -- exception class (or tuple of classes) to look for.
        kwargs -- attribute name/value pairs the exception instance must
                  have, e.g. ``errno=errno.ETIMEDOUT``.
        """
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must derive from the configured class.  The test
        # used to be written the other way round, so subclasses such as
        # ConnectionResetError never matched a manager configured with
        # OSError, while unrelated base classes like Exception could.
        if type_ is None or not issubclass(type_, self.exc):
            return
        for attr, attr_value in self.attrs.items():
            if not hasattr(value, attr) or getattr(value, attr) != attr_value:
                # Missing or different attribute: not the transient failure
                # this manager is looking for, so let it propagate.
                return
        raise ResourceDenied("an optional resource is not available")
62
# Context managers that raise ResourceDenied when various issues
# with the internet connection manifest themselves as exceptions.
# Each one watches for an OSError whose errno equals the given value.
# Note that socket_peer_reset and ioerror_peer_reset are configured
# identically.
# XXX deprecate these and use transient_internet() instead
time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
69
70
class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features."""

    # This class currently defines no test methods.  The only test is kept
    # below as a comment for reference.
    #
    ## Disabled at the moment since there is no page under python.org which
    ## could be used to test HTTP authentication.
    #
    # def test_basic_auth(self):
    #     import http.client
    #
    #     test_url = "http://www.python.org/test/test_urllib2/basic_auth"
    #     test_hostport = "www.python.org"
    #     test_realm = 'Test Realm'
    #     test_user = 'test.test_urllib2net'
    #     test_password = 'blah'
    #
    #     # failure
    #     try:
    #         _urlopen_with_retry(test_url)
    #     except urllib2.HTTPError, exc:
    #         self.assertEqual(exc.code, 401)
    #     else:
    #         self.fail("urlopen() should have failed with 401")
    #
    #     # success
    #     auth_handler = urllib2.HTTPBasicAuthHandler()
    #     auth_handler.add_password(test_realm, test_hostport,
    #                               test_user, test_password)
    #     opener = urllib2.build_opener(auth_handler)
    #     f = opener.open('http://localhost/')
    #     response = _urlopen_with_retry("http://www.python.org/")
    #
    #     # The 'userinfo' URL component is deprecated by RFC 3986 for security
    #     # reasons, let's not implement it!  (it's already implemented for proxy
    #     # specification strings (that is, URLs or authorities specifying a
    #     # proxy), so we must keep that)
    #     self.assertRaises(http.client.InvalidURL,
    #                       urllib2.urlopen, "http://evil:thing@example.com")
108
109
class CloseSocketTest(unittest.TestCase):
    """Closing a urlopen() response must close the object it reads from."""

    def test_close(self):
        # Reset urllib.request's global _opener once the test is over.
        self.addCleanup(urllib.request.urlcleanup)

        # Calling .close() on the response object should close the
        # underlying socket as well.
        target = support.TEST_HTTP_URL
        with socket_helper.transient_internet(target):
            resp = _urlopen_with_retry(target)
            underlying = resp.fp
            # Open right after the request, closed once resp is closed.
            self.assertFalse(underlying.closed)
            resp.close()
            self.assertTrue(underlying.closed)
125
class OtherNetworkTests(unittest.TestCase):
    """Smoke tests that open ftp:, file: and http: URLs via urllib.request.

    Most tests only check that a URL can be opened and read, or that it
    fails with the expected exception type.  Responses are closed explicitly
    so the tests do not leave open connections behind.
    """

    def setUp(self):
        if 0:  # for debugging
            import logging
            logger = logging.getLogger("test_urllib2net")
            logger.addHandler(logging.StreamHandler())

    # XXX The rest of these tests aren't very good -- they don't check much.
    # They do sometimes catch some major disasters, though.

    @support.requires_resource('walltime')
    def test_ftp(self):
        # Testing the same URL twice exercises the caching in CacheFTPHandler
        urls = [
            'ftp://www.pythontest.net/README',
            'ftp://www.pythontest.net/README',
            ('ftp://www.pythontest.net/non-existent-file',
             None, urllib.error.URLError),
        ]
        self._test_urls(urls, self._extra_handlers())

    def test_file(self):
        TESTFN = os_helper.TESTFN
        try:
            # The with-block closes the file even if the write fails.
            with open(TESTFN, 'w', encoding='utf-8') as f:
                f.write('hi there\n')
            urls = [
                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
                ('file:///nonsensename/etc/passwd', None,
                 urllib.error.URLError),
            ]
            self._test_urls(urls, self._extra_handlers(), retry=True)
        finally:
            # unlink() tolerates a missing file, so a failure while creating
            # it is not masked by a second error from the cleanup.
            os_helper.unlink(TESTFN)

        self.assertRaises(ValueError, urllib.request.urlopen,
                          './relative_path/to/file')

    # XXX Following test depends on machine configurations that are internal
    # to CNRI.  Need to set up a public server with the right authentication
    # configuration for test purposes.

    ## def test_cnri(self):
    ##     if socket.gethostname() == 'bitdiddle':
    ##         localhost = 'bitdiddle.cnri.reston.va.us'
    ##     elif socket.gethostname() == 'bitdiddle.concentric.net':
    ##         localhost = 'localhost'
    ##     else:
    ##         localhost = None
    ##     if localhost is not None:
    ##         urls = [
    ##             'file://%s/etc/passwd' % localhost,
    ##             'http://%s/simple/' % localhost,
    ##             'http://%s/digest/' % localhost,
    ##             'http://%s/not/found.h' % localhost,
    ##             ]

    ##         bauth = HTTPBasicAuthHandler()
    ##         bauth.add_password('basic_test_realm', localhost, 'jhylton',
    ##                            'password')
    ##         dauth = HTTPDigestAuthHandler()
    ##         dauth.add_password('digest_test_realm', localhost, 'jhylton',
    ##                            'password')

    ##         self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

    def test_urlwithfrag(self):
        urlwith_frag = "http://www.pythontest.net/index.html#frag"
        with socket_helper.transient_internet(urlwith_frag):
            req = urllib.request.Request(urlwith_frag)
            # The fragment must survive in the URL reported by the response.
            with urllib.request.urlopen(req) as res:
                self.assertEqual(res.geturl(),
                                 "http://www.pythontest.net/index.html#frag")

    @support.requires_resource('walltime')
    def test_redirect_url_withfrag(self):
        redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
        with socket_helper.transient_internet(redirect_url_with_frag):
            req = urllib.request.Request(redirect_url_with_frag)
            # After the redirect the final URL must carry the fragment.
            with urllib.request.urlopen(req) as res:
                self.assertEqual(res.geturl(),
                                 "http://www.pythontest.net/elsewhere/#frag")

    def test_custom_headers(self):
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            # Only the effect on the request's headers matters here, so
            # each response is closed straight away.
            opener.open(request).close()
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent', 'Test-Agent')
            opener.open(request).close()
            self.assertEqual(request.get_header('User-agent'), 'Test-Agent')

    @unittest.skip('XXX: http://www.imdb.com is gone')
    def test_sites_no_connection_close(self):
        # Some sites do not send Connection: close header.
        # Verify that those work properly. (#issue12576)

        URL = 'http://www.imdb.com'  # mangles Connection:close

        with socket_helper.transient_internet(URL):
            try:
                with urllib.request.urlopen(URL) as res:
                    pass
            except ValueError:
                # Adjacent literals instead of a backslash continuation
                # inside the string, which embedded the next line's
                # indentation in the message.
                self.fail("urlopen failed for site not sending "
                          "Connection:close")
            else:
                self.assertTrue(res)

            with urllib.request.urlopen(URL) as req:
                res = req.read()
            self.assertTrue(res)

    def _test_urls(self, urls, handlers, retry=True):
        """Open every entry of *urls* with an opener built from *handlers*.

        Each entry is either a URL string, which must open and be readable,
        or a ``(url, data, expected_error)`` tuple.  For a tuple, an OSError
        raised while opening must be an instance of ``expected_error``.
        When *retry* is true, opening is retried on urllib.error.URLError.
        """
        import time
        import logging
        # NOTE(review): setUp()'s debugging hook attaches its handler to the
        # "test_urllib2net" logger, not to this one -- confirm which name is
        # intended.
        debug = logging.getLogger("test_urllib2").debug

        urlopen = urllib.request.build_opener(*handlers).open
        if retry:
            urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)

        for url in urls:
            with self.subTest(url=url):
                if isinstance(url, tuple):
                    url, req, expected_err = url
                else:
                    req = expected_err = None

                with socket_helper.transient_internet(url):
                    try:
                        f = urlopen(url, req, support.INTERNET_TIMEOUT)
                    # urllib.error.URLError is a subclass of OSError
                    except OSError as err:
                        if expected_err:
                            msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                                   (expected_err, url, req, type(err), err))
                            self.assertIsInstance(err, expected_err, msg)
                        else:
                            raise
                    else:
                        # NOTE(review): reaching this branch with
                        # expected_err set does not fail the test.
                        try:
                            with time_out, \
                                 socket_peer_reset, \
                                 ioerror_peer_reset:
                                buf = f.read()
                                debug("read %d bytes" % len(buf))
                        except TimeoutError:
                            print("<timeout: %s>" % url, file=sys.stderr)
                        finally:
                            # Close the response whatever read() raised.
                            f.close()
                time.sleep(0.1)

    def _extra_handlers(self):
        """Return the extra handlers used by the tests: one CacheFTPHandler."""
        handlers = []

        cfh = urllib.request.CacheFTPHandler()
        # Drop the handler's cached FTP connections when the test finishes.
        self.addCleanup(cfh.clear_cache)
        cfh.setTimeout(1)
        handlers.append(cfh)

        return handlers
291
292
class TimeoutTest(unittest.TestCase):
    """Check which timeout ends up on the socket behind a urlopen() response.

    Each scenario is run against an HTTP URL and an FTP URL: no timeout at
    all, a process-wide default timeout, an explicit ``timeout=None`` that
    overrides the default, and an explicit numeric timeout.
    """

    FTP_HOST = 'ftp://www.pythontest.net/'

    def setUp(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

    # -- helpers -----------------------------------------------------------

    def _open(self, url, **kwargs):
        """Open *url* with retries and close the response at test cleanup."""
        u = _urlopen_with_retry(url, **kwargs)
        self.addCleanup(u.close)
        return u

    def _open_with_default_timeout(self, url, **kwargs):
        """Open *url* while the socket default timeout is 60 seconds.

        The default is reset to None afterwards, whether or not the open
        succeeded.
        """
        socket.setdefaulttimeout(60)
        try:
            return self._open(url, **kwargs)
        finally:
            socket.setdefaulttimeout(None)

    @staticmethod
    def _http_sock_timeout(u):
        """Timeout of the socket reached through an HTTP response."""
        return u.fp.raw._sock.gettimeout()

    @staticmethod
    def _ftp_sock_timeout(u):
        """Timeout of the socket reached through an FTP response."""
        return u.fp.fp.raw._sock.gettimeout()

    # -- HTTP --------------------------------------------------------------

    def test_http_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url, timeout=None):
            u = self._open(url)
            self.assertIsNone(self._http_sock_timeout(u))

    def test_http_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            u = self._open_with_default_timeout(url)
            self.assertEqual(self._http_sock_timeout(u), 60)

    def test_http_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            u = self._open_with_default_timeout(url, timeout=None)
            self.assertIsNone(self._http_sock_timeout(u))

    def test_http_timeout(self):
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            u = self._open(url, timeout=120)
            self.assertEqual(self._http_sock_timeout(u), 120)

    # -- FTP ---------------------------------------------------------------

    @support.requires_resource('walltime')
    def test_ftp_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
            u = self._open(self.FTP_HOST)
            self.assertIsNone(self._ftp_sock_timeout(u))

    def test_ftp_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST):
            u = self._open_with_default_timeout(self.FTP_HOST)
            self.assertEqual(self._ftp_sock_timeout(u), 60)

    @support.requires_resource('walltime')
    def test_ftp_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST):
            u = self._open_with_default_timeout(self.FTP_HOST, timeout=None)
            self.assertIsNone(self._ftp_sock_timeout(u))

    @support.requires_resource('walltime')
    def test_ftp_timeout(self):
        with socket_helper.transient_internet(self.FTP_HOST):
            u = self._open(self.FTP_HOST, timeout=60)
            self.assertEqual(self._ftp_sock_timeout(u), 60)
376
377
# Run the tests with unittest's command-line runner when this file is
# executed directly.
if __name__ == "__main__":
    unittest.main()