1 """Test script for poplib module."""
2
3 # Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
4 # a real test suite
5
6 import poplib
7 import socket
8 import os
9 import errno
10 import threading
11
12 import unittest
13 from unittest import TestCase, skipUnless
14 from test import support as test_support
15 from test.support import hashlib_helper
16 from test.support import socket_helper
17 from test.support import threading_helper
18 from test.support import warnings_helper
19
20
21 asynchat = warnings_helper.import_deprecated('asynchat')
22 asyncore = warnings_helper.import_deprecated('asyncore')
23
24
25 test_support.requires_working_socket(module=True)
26
27 HOST = socket_helper.HOST
28 PORT = 0
29
30 SUPPORTS_SSL = False
31 if hasattr(poplib, 'POP3_SSL'):
32 import ssl
33
34 SUPPORTS_SSL = True
35 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "keycert3.pem")
36 CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "pycacert.pem")
37
38 requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
39
40 # the dummy data returned by server when LIST and RETR commands are issued
41 LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
42 RETR_RESP = b"""From: postmaster@python.org\
43 \r\nContent-Type: text/plain\r\n\
44 MIME-Version: 1.0\r\n\
45 Subject: Dummy\r\n\
46 \r\n\
47 line1\r\n\
48 line2\r\n\
49 line3\r\n\
50 .\r\n"""
51
52
53 class ESC[4;38;5;81mDummyPOP3Handler(ESC[4;38;5;149masynchatESC[4;38;5;149m.ESC[4;38;5;149masync_chat):
54
55 CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
56 enable_UTF8 = False
57
58 def __init__(self, conn):
59 asynchat.async_chat.__init__(self, conn)
60 self.set_terminator(b"\r\n")
61 self.in_buffer = []
62 self.push('+OK dummy pop3 server ready. <timestamp>')
63 self.tls_active = False
64 self.tls_starting = False
65
66 def collect_incoming_data(self, data):
67 self.in_buffer.append(data)
68
69 def found_terminator(self):
70 line = b''.join(self.in_buffer)
71 line = str(line, 'ISO-8859-1')
72 self.in_buffer = []
73 cmd = line.split(' ')[0].lower()
74 space = line.find(' ')
75 if space != -1:
76 arg = line[space + 1:]
77 else:
78 arg = ""
79 if hasattr(self, 'cmd_' + cmd):
80 method = getattr(self, 'cmd_' + cmd)
81 method(arg)
82 else:
83 self.push('-ERR unrecognized POP3 command "%s".' %cmd)
84
85 def handle_error(self):
86 raise
87
88 def push(self, data):
89 asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')
90
91 def cmd_echo(self, arg):
92 # sends back the received string (used by the test suite)
93 self.push(arg)
94
95 def cmd_user(self, arg):
96 if arg != "guido":
97 self.push("-ERR no such user")
98 self.push('+OK password required')
99
100 def cmd_pass(self, arg):
101 if arg != "python":
102 self.push("-ERR wrong password")
103 self.push('+OK 10 messages')
104
105 def cmd_stat(self, arg):
106 self.push('+OK 10 100')
107
108 def cmd_list(self, arg):
109 if arg:
110 self.push('+OK %s %s' % (arg, arg))
111 else:
112 self.push('+OK')
113 asynchat.async_chat.push(self, LIST_RESP)
114
115 cmd_uidl = cmd_list
116
117 def cmd_retr(self, arg):
118 self.push('+OK %s bytes' %len(RETR_RESP))
119 asynchat.async_chat.push(self, RETR_RESP)
120
121 cmd_top = cmd_retr
122
123 def cmd_dele(self, arg):
124 self.push('+OK message marked for deletion.')
125
126 def cmd_noop(self, arg):
127 self.push('+OK done nothing.')
128
129 def cmd_rpop(self, arg):
130 self.push('+OK done nothing.')
131
132 def cmd_apop(self, arg):
133 self.push('+OK done nothing.')
134
135 def cmd_quit(self, arg):
136 self.push('+OK closing.')
137 self.close_when_done()
138
139 def _get_capas(self):
140 _capas = dict(self.CAPAS)
141 if not self.tls_active and SUPPORTS_SSL:
142 _capas['STLS'] = []
143 return _capas
144
145 def cmd_capa(self, arg):
146 self.push('+OK Capability list follows')
147 if self._get_capas():
148 for cap, params in self._get_capas().items():
149 _ln = [cap]
150 if params:
151 _ln.extend(params)
152 self.push(' '.join(_ln))
153 self.push('.')
154
155 def cmd_utf8(self, arg):
156 self.push('+OK I know RFC6856'
157 if self.enable_UTF8
158 else '-ERR What is UTF8?!')
159
160 if SUPPORTS_SSL:
161
162 def cmd_stls(self, arg):
163 if self.tls_active is False:
164 self.push('+OK Begin TLS negotiation')
165 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
166 context.load_cert_chain(CERTFILE)
167 tls_sock = context.wrap_socket(self.socket,
168 server_side=True,
169 do_handshake_on_connect=False,
170 suppress_ragged_eofs=False)
171 self.del_channel()
172 self.set_socket(tls_sock)
173 self.tls_active = True
174 self.tls_starting = True
175 self.in_buffer = []
176 self._do_tls_handshake()
177 else:
178 self.push('-ERR Command not permitted when TLS active')
179
180 def _do_tls_handshake(self):
181 try:
182 self.socket.do_handshake()
183 except ssl.SSLError as err:
184 if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
185 ssl.SSL_ERROR_WANT_WRITE):
186 return
187 elif err.args[0] == ssl.SSL_ERROR_EOF:
188 return self.handle_close()
189 # TODO: SSLError does not expose alert information
190 elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or
191 "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]):
192 return self.handle_close()
193 raise
194 except OSError as err:
195 if err.args[0] == errno.ECONNABORTED:
196 return self.handle_close()
197 else:
198 self.tls_active = True
199 self.tls_starting = False
200
201 def handle_read(self):
202 if self.tls_starting:
203 self._do_tls_handshake()
204 else:
205 try:
206 asynchat.async_chat.handle_read(self)
207 except ssl.SSLEOFError:
208 self.handle_close()
209
210 class ESC[4;38;5;81mDummyPOP3Server(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher, ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
211
212 handler = DummyPOP3Handler
213
214 def __init__(self, address, af=socket.AF_INET):
215 threading.Thread.__init__(self)
216 asyncore.dispatcher.__init__(self)
217 self.daemon = True
218 self.create_socket(af, socket.SOCK_STREAM)
219 self.bind(address)
220 self.listen(5)
221 self.active = False
222 self.active_lock = threading.Lock()
223 self.host, self.port = self.socket.getsockname()[:2]
224 self.handler_instance = None
225
226 def start(self):
227 assert not self.active
228 self.__flag = threading.Event()
229 threading.Thread.start(self)
230 self.__flag.wait()
231
232 def run(self):
233 self.active = True
234 self.__flag.set()
235 try:
236 while self.active and asyncore.socket_map:
237 with self.active_lock:
238 asyncore.loop(timeout=0.1, count=1)
239 finally:
240 asyncore.close_all(ignore_all=True)
241
242 def stop(self):
243 assert self.active
244 self.active = False
245 self.join()
246
247 def handle_accepted(self, conn, addr):
248 self.handler_instance = self.handler(conn)
249
250 def handle_connect(self):
251 self.close()
252 handle_read = handle_connect
253
254 def writable(self):
255 return 0
256
257 def handle_error(self):
258 raise
259
260
261 class ESC[4;38;5;81mTestPOP3Class(ESC[4;38;5;149mTestCase):
262 def assertOK(self, resp):
263 self.assertTrue(resp.startswith(b"+OK"))
264
265 def setUp(self):
266 self.server = DummyPOP3Server((HOST, PORT))
267 self.server.start()
268 self.client = poplib.POP3(self.server.host, self.server.port,
269 timeout=test_support.LOOPBACK_TIMEOUT)
270
271 def tearDown(self):
272 self.client.close()
273 self.server.stop()
274 # Explicitly clear the attribute to prevent dangling thread
275 self.server = None
276
277 def test_getwelcome(self):
278 self.assertEqual(self.client.getwelcome(),
279 b'+OK dummy pop3 server ready. <timestamp>')
280
281 def test_exceptions(self):
282 self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')
283
284 def test_user(self):
285 self.assertOK(self.client.user('guido'))
286 self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
287
288 def test_pass_(self):
289 self.assertOK(self.client.pass_('python'))
290 self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
291
292 def test_stat(self):
293 self.assertEqual(self.client.stat(), (10, 100))
294
295 def test_list(self):
296 self.assertEqual(self.client.list()[1:],
297 ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
298 25))
299 self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))
300
301 def test_retr(self):
302 expected = (b'+OK 116 bytes',
303 [b'From: postmaster@python.org', b'Content-Type: text/plain',
304 b'MIME-Version: 1.0', b'Subject: Dummy',
305 b'', b'line1', b'line2', b'line3'],
306 113)
307 foo = self.client.retr('foo')
308 self.assertEqual(foo, expected)
309
310 def test_too_long_lines(self):
311 self.assertRaises(poplib.error_proto, self.client._shortcmd,
312 'echo +%s' % ((poplib._MAXLINE + 10) * 'a'))
313
314 def test_dele(self):
315 self.assertOK(self.client.dele('foo'))
316
317 def test_noop(self):
318 self.assertOK(self.client.noop())
319
320 def test_rpop(self):
321 self.assertOK(self.client.rpop('foo'))
322
323 @hashlib_helper.requires_hashdigest('md5', openssl=True)
324 def test_apop_normal(self):
325 self.assertOK(self.client.apop('foo', 'dummypassword'))
326
327 @hashlib_helper.requires_hashdigest('md5', openssl=True)
328 def test_apop_REDOS(self):
329 # Replace welcome with very long evil welcome.
330 # NB The upper bound on welcome length is currently 2048.
331 # At this length, evil input makes each apop call take
332 # on the order of milliseconds instead of microseconds.
333 evil_welcome = b'+OK' + (b'<' * 1000000)
334 with test_support.swap_attr(self.client, 'welcome', evil_welcome):
335 # The evil welcome is invalid, so apop should throw.
336 self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb')
337
338 def test_top(self):
339 expected = (b'+OK 116 bytes',
340 [b'From: postmaster@python.org', b'Content-Type: text/plain',
341 b'MIME-Version: 1.0', b'Subject: Dummy', b'',
342 b'line1', b'line2', b'line3'],
343 113)
344 self.assertEqual(self.client.top(1, 1), expected)
345
346 def test_uidl(self):
347 self.client.uidl()
348 self.client.uidl('foo')
349
350 def test_utf8_raises_if_unsupported(self):
351 self.server.handler.enable_UTF8 = False
352 self.assertRaises(poplib.error_proto, self.client.utf8)
353
354 def test_utf8(self):
355 self.server.handler.enable_UTF8 = True
356 expected = b'+OK I know RFC6856'
357 result = self.client.utf8()
358 self.assertEqual(result, expected)
359
360 def test_capa(self):
361 capa = self.client.capa()
362 self.assertTrue('IMPLEMENTATION' in capa.keys())
363
364 def test_quit(self):
365 resp = self.client.quit()
366 self.assertTrue(resp)
367 self.assertIsNone(self.client.sock)
368 self.assertIsNone(self.client.file)
369
370 @requires_ssl
371 def test_stls_capa(self):
372 capa = self.client.capa()
373 self.assertTrue('STLS' in capa.keys())
374
375 @requires_ssl
376 def test_stls(self):
377 expected = b'+OK Begin TLS negotiation'
378 resp = self.client.stls()
379 self.assertEqual(resp, expected)
380
381 @requires_ssl
382 def test_stls_context(self):
383 expected = b'+OK Begin TLS negotiation'
384 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
385 ctx.load_verify_locations(CAFILE)
386 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
387 self.assertEqual(ctx.check_hostname, True)
388 with self.assertRaises(ssl.CertificateError):
389 resp = self.client.stls(context=ctx)
390 self.client = poplib.POP3("localhost", self.server.port,
391 timeout=test_support.LOOPBACK_TIMEOUT)
392 resp = self.client.stls(context=ctx)
393 self.assertEqual(resp, expected)
394
395
396 if SUPPORTS_SSL:
397 from test.test_ftplib import SSLConnection
398
399 class ESC[4;38;5;81mDummyPOP3_SSLHandler(ESC[4;38;5;149mSSLConnection, ESC[4;38;5;149mDummyPOP3Handler):
400
401 def __init__(self, conn):
402 asynchat.async_chat.__init__(self, conn)
403 self.secure_connection()
404 self.set_terminator(b"\r\n")
405 self.in_buffer = []
406 self.push('+OK dummy pop3 server ready. <timestamp>')
407 self.tls_active = True
408 self.tls_starting = False
409
410
411 @requires_ssl
412 class ESC[4;38;5;81mTestPOP3_SSLClass(ESC[4;38;5;149mTestPOP3Class):
413 # repeat previous tests by using poplib.POP3_SSL
414
415 def setUp(self):
416 self.server = DummyPOP3Server((HOST, PORT))
417 self.server.handler = DummyPOP3_SSLHandler
418 self.server.start()
419 self.client = poplib.POP3_SSL(self.server.host, self.server.port)
420
421 def test__all__(self):
422 self.assertIn('POP3_SSL', poplib.__all__)
423
424 def test_context(self):
425 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
426 ctx.check_hostname = False
427 ctx.verify_mode = ssl.CERT_NONE
428 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
429 self.server.port, keyfile=CERTFILE, context=ctx)
430 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
431 self.server.port, certfile=CERTFILE, context=ctx)
432 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
433 self.server.port, keyfile=CERTFILE,
434 certfile=CERTFILE, context=ctx)
435
436 self.client.quit()
437 self.client = poplib.POP3_SSL(self.server.host, self.server.port,
438 context=ctx)
439 self.assertIsInstance(self.client.sock, ssl.SSLSocket)
440 self.assertIs(self.client.sock.context, ctx)
441 self.assertTrue(self.client.noop().startswith(b'+OK'))
442
443 def test_stls(self):
444 self.assertRaises(poplib.error_proto, self.client.stls)
445
446 test_stls_context = test_stls
447
448 def test_stls_capa(self):
449 capa = self.client.capa()
450 self.assertFalse('STLS' in capa.keys())
451
452
453 @requires_ssl
454 class ESC[4;38;5;81mTestPOP3_TLSClass(ESC[4;38;5;149mTestPOP3Class):
455 # repeat previous tests by using poplib.POP3.stls()
456
457 def setUp(self):
458 self.server = DummyPOP3Server((HOST, PORT))
459 self.server.start()
460 self.client = poplib.POP3(self.server.host, self.server.port,
461 timeout=test_support.LOOPBACK_TIMEOUT)
462 self.client.stls()
463
464 def tearDown(self):
465 if self.client.file is not None and self.client.sock is not None:
466 try:
467 self.client.quit()
468 except poplib.error_proto:
469 # happens in the test_too_long_lines case; the overlong
470 # response will be treated as response to QUIT and raise
471 # this exception
472 self.client.close()
473 self.server.stop()
474 # Explicitly clear the attribute to prevent dangling thread
475 self.server = None
476
477 def test_stls(self):
478 self.assertRaises(poplib.error_proto, self.client.stls)
479
480 test_stls_context = test_stls
481
482 def test_stls_capa(self):
483 capa = self.client.capa()
484 self.assertFalse(b'STLS' in capa.keys())
485
486
487 class ESC[4;38;5;81mTestTimeouts(ESC[4;38;5;149mTestCase):
488
489 def setUp(self):
490 self.evt = threading.Event()
491 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
492 self.sock.settimeout(60) # Safety net. Look issue 11812
493 self.port = socket_helper.bind_port(self.sock)
494 self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock))
495 self.thread.daemon = True
496 self.thread.start()
497 self.evt.wait()
498
499 def tearDown(self):
500 self.thread.join()
501 # Explicitly clear the attribute to prevent dangling thread
502 self.thread = None
503
504 def server(self, evt, serv):
505 serv.listen()
506 evt.set()
507 try:
508 conn, addr = serv.accept()
509 conn.send(b"+ Hola mundo\n")
510 conn.close()
511 except TimeoutError:
512 pass
513 finally:
514 serv.close()
515
516 def testTimeoutDefault(self):
517 self.assertIsNone(socket.getdefaulttimeout())
518 socket.setdefaulttimeout(test_support.LOOPBACK_TIMEOUT)
519 try:
520 pop = poplib.POP3(HOST, self.port)
521 finally:
522 socket.setdefaulttimeout(None)
523 self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT)
524 pop.close()
525
526 def testTimeoutNone(self):
527 self.assertIsNone(socket.getdefaulttimeout())
528 socket.setdefaulttimeout(30)
529 try:
530 pop = poplib.POP3(HOST, self.port, timeout=None)
531 finally:
532 socket.setdefaulttimeout(None)
533 self.assertIsNone(pop.sock.gettimeout())
534 pop.close()
535
536 def testTimeoutValue(self):
537 pop = poplib.POP3(HOST, self.port, timeout=test_support.LOOPBACK_TIMEOUT)
538 self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT)
539 pop.close()
540 with self.assertRaises(ValueError):
541 poplib.POP3(HOST, self.port, timeout=0)
542
543
544 def setUpModule():
545 thread_info = threading_helper.threading_setup()
546 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
547
548
549 if __name__ == '__main__':
550 unittest.main()