1 """Test script for poplib module."""
2
3 # Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
4 # a real test suite
5
6 import poplib
7 import socket
8 import os
9 import errno
10 import threading
11
12 import unittest
13 from unittest import TestCase, skipUnless
14 from test import support as test_support
15 from test.support import hashlib_helper
16 from test.support import socket_helper
17 from test.support import threading_helper
18 from test.support import asynchat
19 from test.support import asyncore
20
21
test_support.requires_working_socket(module=True)

HOST = socket_helper.HOST
# 0 is what gets passed to bind(); the dummy servers read the real port
# back from getsockname() afterwards.
PORT = 0

# The SSL fixtures below are only set up when poplib exposes POP3_SSL.
SUPPORTS_SSL = hasattr(poplib, 'POP3_SSL')
if SUPPORTS_SSL:
    import ssl

    # Certificate files are looked up next to this test file.
    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
                            "keycert3.pem")
    CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
                          "pycacert.pem")

# Decorator for tests and test classes that need the SSL fixtures.
requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
36
# Canned payloads the dummy server sends after the status line of the
# LIST and RETR commands.  Both end with the b'.\r\n' terminator line.
LIST_RESP = (
    b'1 1\r\n'
    b'2 2\r\n'
    b'3 3\r\n'
    b'4 4\r\n'
    b'5 5\r\n'
    b'.\r\n'
)
RETR_RESP = (
    b"From: postmaster@python.org\r\n"
    b"Content-Type: text/plain\r\n"
    b"MIME-Version: 1.0\r\n"
    b"Subject: Dummy\r\n"
    b"\r\n"
    b"line1\r\n"
    b"line2\r\n"
    b"line3\r\n"
    b".\r\n"
)
48
49
class DummyPOP3Handler(asynchat.async_chat):
    """Per-connection channel of the dummy POP3 server.

    Incoming bytes are collected until a CRLF terminator, decoded as
    ISO-8859-1, split into a command word and an argument string, and
    dispatched to the matching ``cmd_<command>`` method.  Replies are
    canned; only USER and PASS look at their argument.
    """

    CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
    # Tests flip this on the class to choose the reply to the UTF8 command.
    enable_UTF8 = False

    def __init__(self, conn):
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator(b"\r\n")
        self.in_buffer = []
        # Greeting sent as soon as the connection is accepted.
        self.push('+OK dummy pop3 server ready. <timestamp>')
        self.tls_active = False
        self.tls_starting = False

    def collect_incoming_data(self, data):
        """Buffer a chunk of the command line currently being received."""
        self.in_buffer.append(data)

    def found_terminator(self):
        """Parse the buffered line and dispatch it to a ``cmd_*`` method."""
        line = str(b''.join(self.in_buffer), 'ISO-8859-1')
        self.in_buffer = []
        # Everything after the first space is the argument ("" if none).
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        method = getattr(self, 'cmd_' + cmd, None)
        if method is not None:
            method(arg)
        else:
            self.push('-ERR unrecognized POP3 command "%s".' %cmd)

    def handle_error(self):
        # Re-raise the exception currently being handled instead of
        # letting it be reported and discarded.
        raise

    def push(self, data):
        """Send the str *data* as one ISO-8859-1 encoded, CRLF-ended line."""
        asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_user(self, arg):
        # Exactly one status line per command, so that the client's next
        # read is not fed a stale reply.
        if arg != "guido":
            self.push("-ERR no such user")
        else:
            self.push('+OK password required')

    def cmd_pass(self, arg):
        # Same one-reply rule as cmd_user.
        if arg != "python":
            self.push("-ERR wrong password")
        else:
            self.push('+OK 10 messages')

    def cmd_stat(self, arg):
        self.push('+OK 10 100')

    def cmd_list(self, arg):
        if arg:
            # Single-line form: the argument is echoed back twice.
            self.push('+OK %s %s' % (arg, arg))
        else:
            self.push('+OK')
            # LIST_RESP is already bytes and CRLF-terminated, so bypass
            # the encoding push() defined above.
            asynchat.async_chat.push(self, LIST_RESP)

    cmd_uidl = cmd_list

    def cmd_retr(self, arg):
        self.push('+OK %s bytes' %len(RETR_RESP))
        asynchat.async_chat.push(self, RETR_RESP)

    cmd_top = cmd_retr

    def cmd_dele(self, arg):
        self.push('+OK message marked for deletion.')

    def cmd_noop(self, arg):
        self.push('+OK done nothing.')

    def cmd_rpop(self, arg):
        self.push('+OK done nothing.')

    def cmd_apop(self, arg):
        self.push('+OK done nothing.')

    def cmd_quit(self, arg):
        self.push('+OK closing.')
        self.close_when_done()

    def _get_capas(self):
        """Return a copy of CAPAS, plus STLS while TLS can still be started."""
        _capas = dict(self.CAPAS)
        if not self.tls_active and SUPPORTS_SSL:
            _capas['STLS'] = []
        return _capas

    def cmd_capa(self, arg):
        self.push('+OK Capability list follows')
        # One line per capability: the name followed by its parameters.
        for cap, params in self._get_capas().items():
            self.push(' '.join([cap, *params]))
        self.push('.')

    def cmd_utf8(self, arg):
        self.push('+OK I know RFC6856'
                  if self.enable_UTF8
                  else '-ERR What is UTF8?!')

    if SUPPORTS_SSL:

        def cmd_stls(self, arg):
            """Answer STLS and start the server side of the TLS handshake."""
            if self.tls_active is False:
                self.push('+OK Begin TLS negotiation')
                context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                context.load_cert_chain(CERTFILE)
                tls_sock = context.wrap_socket(self.socket,
                                               server_side=True,
                                               do_handshake_on_connect=False,
                                               suppress_ragged_eofs=False)
                # Swap the plain socket of this channel for the TLS one.
                self.del_channel()
                self.set_socket(tls_sock)
                self.tls_active = True
                self.tls_starting = True
                self.in_buffer = []
                self._do_tls_handshake()
            else:
                self.push('-ERR Command not permitted when TLS active')

        def _do_tls_handshake(self):
            """Advance the handshake; clear tls_starting once it completes."""
            try:
                self.socket.do_handshake()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # Not finished yet: tls_starting stays set, so
                    # handle_read() calls this method again.
                    return
                elif err.args[0] == ssl.SSL_ERROR_EOF:
                    return self.handle_close()
                # TODO: SSLError does not expose alert information
                elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or
                      "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]):
                    return self.handle_close()
                raise
            except OSError as err:
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self.tls_active = True
                self.tls_starting = False

        def handle_read(self):
            # While the handshake is pending, readable data belongs to it
            # and not to the POP3 line parser.
            if self.tls_starting:
                self._do_tls_handshake()
            else:
                try:
                    asynchat.async_chat.handle_read(self)
                except ssl.SSLEOFError:
                    self.handle_close()
206
class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
    """Listening socket plus the thread that runs the asyncore loop.

    Each accepted connection is wrapped in ``handler``; the most recent
    wrapper is kept in ``handler_instance``.
    """

    handler = DummyPOP3Handler

    def __init__(self, address, af=socket.AF_INET):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.daemon = True
        self.active = False
        self.active_lock = threading.Lock()
        self.handler_instance = None
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Record the address actually bound.
        bound_host, bound_port = self.socket.getsockname()[:2]
        self.host = bound_host
        self.port = bound_port

    def start(self):
        """Start the thread and block until run() has begun."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        """Poll asyncore until stop() is called or no channels remain."""
        self.active = True
        self.__flag.set()
        try:
            while self.active and asyncore.socket_map:
                with self.active_lock:
                    asyncore.loop(timeout=0.1, count=1)
        finally:
            asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask run() to leave its loop and wait for the thread to end."""
        assert self.active
        self.active = False
        self.join()

    def handle_accepted(self, conn, addr):
        self.handler_instance = self.handler(conn)

    def handle_connect(self):
        self.close()

    # A read event on the listening socket is handled like a connect event.
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
256
257
class TestPOP3Class(TestCase):
    """Exercise poplib.POP3 against an in-process DummyPOP3Server.

    Subclasses repeat these tests over other transports by overriding
    setUp().
    """

    def assertOK(self, resp):
        # A successful status line starts with b"+OK".
        self.assertTrue(resp.startswith(b"+OK"))

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.start()
        self.client = poplib.POP3(self.server.host, self.server.port,
                                  timeout=test_support.LOOPBACK_TIMEOUT)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(),
                         b'+OK dummy pop3 server ready. <timestamp>')

    def test_exceptions(self):
        # The server echoes "-err" back, which the client must reject.
        self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')

    def test_user(self):
        self.assertOK(self.client.user('guido'))
        self.assertRaises(poplib.error_proto, self.client.user, 'invalid')

    def test_pass_(self):
        self.assertOK(self.client.pass_('python'))
        # The rejection path of PASS itself; USER is covered by test_user.
        self.assertRaises(poplib.error_proto, self.client.pass_, 'invalid')

    def test_stat(self):
        self.assertEqual(self.client.stat(), (10, 100))

    def test_list(self):
        self.assertEqual(self.client.list()[1:],
                         ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
                          25))
        self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))

    def test_retr(self):
        expected = (b'+OK 116 bytes',
                    [b'From: postmaster@python.org', b'Content-Type: text/plain',
                     b'MIME-Version: 1.0', b'Subject: Dummy',
                     b'', b'line1', b'line2', b'line3'],
                    113)
        # The dummy server ignores the message argument.
        self.assertEqual(self.client.retr('foo'), expected)

    def test_too_long_lines(self):
        self.assertRaises(poplib.error_proto, self.client._shortcmd,
                          'echo +%s' % ((poplib._MAXLINE + 10) * 'a'))

    def test_dele(self):
        self.assertOK(self.client.dele('foo'))

    def test_noop(self):
        self.assertOK(self.client.noop())

    def test_rpop(self):
        self.assertOK(self.client.rpop('foo'))

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def test_apop_normal(self):
        self.assertOK(self.client.apop('foo', 'dummypassword'))

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def test_apop_REDOS(self):
        # Replace welcome with very long evil welcome.
        # NB The upper bound on welcome length is currently 2048.
        # At this length, evil input makes each apop call take
        # on the order of milliseconds instead of microseconds.
        evil_welcome = b'+OK' + (b'<' * 1000000)
        with test_support.swap_attr(self.client, 'welcome', evil_welcome):
            # The evil welcome is invalid, so apop should throw.
            self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb')

    def test_top(self):
        expected = (b'+OK 116 bytes',
                    [b'From: postmaster@python.org', b'Content-Type: text/plain',
                     b'MIME-Version: 1.0', b'Subject: Dummy', b'',
                     b'line1', b'line2', b'line3'],
                    113)
        self.assertEqual(self.client.top(1, 1), expected)

    def test_uidl(self):
        # Only checks that both call forms complete without raising.
        self.client.uidl()
        self.client.uidl('foo')

    def test_utf8_raises_if_unsupported(self):
        self.server.handler.enable_UTF8 = False
        self.assertRaises(poplib.error_proto, self.client.utf8)

    def test_utf8(self):
        self.server.handler.enable_UTF8 = True
        expected = b'+OK I know RFC6856'
        result = self.client.utf8()
        self.assertEqual(result, expected)

    def test_capa(self):
        capa = self.client.capa()
        self.assertIn('IMPLEMENTATION', capa)

    def test_quit(self):
        resp = self.client.quit()
        self.assertTrue(resp)
        self.assertIsNone(self.client.sock)
        self.assertIsNone(self.client.file)

    @requires_ssl
    def test_stls_capa(self):
        capa = self.client.capa()
        self.assertIn('STLS', capa)

    @requires_ssl
    def test_stls(self):
        expected = b'+OK Begin TLS negotiation'
        resp = self.client.stls()
        self.assertEqual(resp, expected)

    @requires_ssl
    def test_stls_context(self):
        expected = b'+OK Begin TLS negotiation'
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.load_verify_locations(CAFILE)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.check_hostname, True)
        # The client from setUp() connected via self.server.host; with
        # hostname checking on, that connection must fail verification.
        with self.assertRaises(ssl.CertificateError):
            self.client.stls(context=ctx)
        # Connecting by the name "localhost" is expected to verify.
        self.client = poplib.POP3("localhost", self.server.port,
                                  timeout=test_support.LOOPBACK_TIMEOUT)
        resp = self.client.stls(context=ctx)
        self.assertEqual(resp, expected)
391
392
if SUPPORTS_SSL:
    # SSLConnection is borrowed from the ftplib test module.
    from test.test_ftplib import SSLConnection

    class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler):
        """DummyPOP3Handler variant installed by the POP3_SSL tests."""

        def __init__(self, conn):
            # DummyPOP3Handler.__init__ is not called; its steps are
            # repeated here with secure_connection() run before the
            # greeting is pushed.
            asynchat.async_chat.__init__(self, conn)
            # NOTE(review): presumably provided by SSLConnection and wraps
            # the socket in TLS -- confirm against test_ftplib.
            self.secure_connection()
            self.set_terminator(b"\r\n")
            self.in_buffer = []
            self.push('+OK dummy pop3 server ready. <timestamp>')
            # With tls_active set from the start, _get_capas() leaves out
            # STLS and cmd_stls() answers -ERR.
            self.tls_active = True
            self.tls_starting = False
406
407
@requires_ssl
class TestPOP3_SSLClass(TestPOP3Class):
    """Run the TestPOP3Class tests through poplib.POP3_SSL."""

    def setUp(self):
        # Install the SSL handler on this server instance before it
        # starts accepting connections.
        server = DummyPOP3Server((HOST, PORT))
        server.handler = DummyPOP3_SSLHandler
        server.start()
        self.server = server
        self.client = poplib.POP3_SSL(server.host, server.port)

    def test__all__(self):
        self.assertIn('POP3_SSL', poplib.__all__)

    def test_context(self):
        # Replace the client from setUp() with one built on an explicit,
        # non-verifying context.
        self.client.quit()

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.client = poplib.POP3_SSL(self.server.host, self.server.port,
                                      context=ctx)

        sock = self.client.sock
        self.assertIsInstance(sock, ssl.SSLSocket)
        self.assertIs(sock.context, ctx)
        self.assertOK(self.client.noop())

    def test_stls(self):
        # The connection is already encrypted, so STLS has to be refused.
        self.assertRaises(poplib.error_proto, self.client.stls)

    test_stls_context = test_stls

    def test_stls_capa(self):
        self.assertNotIn('STLS', self.client.capa())
441
442
@requires_ssl
class TestPOP3_TLSClass(TestPOP3Class):
    """Run the TestPOP3Class tests over a connection upgraded with stls()."""

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.start()
        self.client = poplib.POP3(self.server.host, self.server.port,
                                  timeout=test_support.LOOPBACK_TIMEOUT)
        # Upgrade to TLS before any test body runs.
        self.client.stls()

    def tearDown(self):
        # Skip QUIT when the test already closed the connection.
        if self.client.file is not None and self.client.sock is not None:
            try:
                self.client.quit()
            except poplib.error_proto:
                # happens in the test_too_long_lines case; the overlong
                # response will be treated as response to QUIT and raise
                # this exception
                self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None

    def test_stls(self):
        # TLS is already active, so a second STLS has to be refused.
        self.assertRaises(poplib.error_proto, self.client.stls)

    test_stls_context = test_stls

    def test_stls_capa(self):
        capa = self.client.capa()
        # The capability names checked elsewhere in this file are str;
        # looking for the bytes b'STLS' could never find one.
        self.assertNotIn('STLS', capa)
475
476
class TestTimeouts(TestCase):
    """Check how the *timeout* argument of poplib.POP3 reaches the socket."""

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(60)  # Safety net. Look issue 11812
        self.port = socket_helper.bind_port(self.sock)
        self.thread = threading.Thread(target=self.server,
                                       args=(self.evt, self.sock),
                                       daemon=True)
        self.thread.start()
        # Do not continue until the server thread is listening.
        self.evt.wait()

    def tearDown(self):
        self.thread.join()
        # Explicitly clear the attribute to prevent dangling thread
        self.thread = None

    def server(self, evt, serv):
        """Thread body: accept one connection, send a greeting, close."""
        serv.listen()
        evt.set()
        try:
            conn, addr = serv.accept()
            conn.send(b"+ Hola mundo\n")
            conn.close()
        except TimeoutError:
            # No client connected before the safety-net timeout expired.
            pass
        finally:
            serv.close()

    def testTimeoutDefault(self):
        # Without a timeout argument the global default applies.
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(test_support.LOOPBACK_TIMEOUT)
        try:
            client = poplib.POP3(HOST, self.port)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(),
                         test_support.LOOPBACK_TIMEOUT)
        client.close()

    def testTimeoutNone(self):
        # An explicit None overrides the global default.
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = poplib.POP3(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutValue(self):
        client = poplib.POP3(HOST, self.port,
                             timeout=test_support.LOOPBACK_TIMEOUT)
        self.assertEqual(client.sock.gettimeout(),
                         test_support.LOOPBACK_TIMEOUT)
        client.close()
        # A zero timeout is rejected.
        with self.assertRaises(ValueError):
            poplib.POP3(HOST, self.port, timeout=0)
532
533
def setUpModule():
    """Take the threading_setup() snapshot and register its cleanup."""
    snapshot = threading_helper.threading_setup()
    unittest.addModuleCleanup(threading_helper.threading_cleanup, *snapshot)
537
538
# Run the tests when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()