python (3.12.0)
1 import base64
2 import email.mime.text
3 from email.message import EmailMessage
4 from email.base64mime import body_encode as encode_base64
5 import email.utils
6 import hashlib
7 import hmac
8 import socket
9 import smtplib
10 import io
11 import re
12 import sys
13 import time
14 import select
15 import errno
16 import textwrap
17 import threading
18
19 import unittest
20 from test import support, mock_socket
21 from test.support import hashlib_helper
22 from test.support import socket_helper
23 from test.support import threading_helper
24 from test.support import asyncore
25 from unittest.mock import Mock
26
27 from . import smtpd
28
29
30 support.requires_working_socket(module=True)
31
32 HOST = socket_helper.HOST
33
34 if sys.platform == 'darwin':
35 # select.poll returns a select.POLLHUP at the end of the tests
36 # on darwin, so just ignore it
37 def handle_expt(self):
38 pass
39 smtpd.SMTPChannel.handle_expt = handle_expt
40
41
42 def server(evt, buf, serv):
43 serv.listen()
44 evt.set()
45 try:
46 conn, addr = serv.accept()
47 except TimeoutError:
48 pass
49 else:
50 n = 500
51 while buf and n > 0:
52 r, w, e = select.select([], [conn], [])
53 if w:
54 sent = conn.send(buf)
55 buf = buf[sent:]
56
57 n -= 1
58
59 conn.close()
60 finally:
61 serv.close()
62 evt.set()
63
64 class ESC[4;38;5;81mGeneralTests:
65
66 def setUp(self):
67 smtplib.socket = mock_socket
68 self.port = 25
69
70 def tearDown(self):
71 smtplib.socket = socket
72
73 # This method is no longer used but is retained for backward compatibility,
74 # so test to make sure it still works.
75 def testQuoteData(self):
76 teststr = "abc\n.jkl\rfoo\r\n..blue"
77 expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
78 self.assertEqual(expected, smtplib.quotedata(teststr))
79
80 def testBasic1(self):
81 mock_socket.reply_with(b"220 Hola mundo")
82 # connects
83 client = self.client(HOST, self.port)
84 client.close()
85
86 def testSourceAddress(self):
87 mock_socket.reply_with(b"220 Hola mundo")
88 # connects
89 client = self.client(HOST, self.port,
90 source_address=('127.0.0.1',19876))
91 self.assertEqual(client.source_address, ('127.0.0.1', 19876))
92 client.close()
93
94 def testBasic2(self):
95 mock_socket.reply_with(b"220 Hola mundo")
96 # connects, include port in host name
97 client = self.client("%s:%s" % (HOST, self.port))
98 client.close()
99
100 def testLocalHostName(self):
101 mock_socket.reply_with(b"220 Hola mundo")
102 # check that supplied local_hostname is used
103 client = self.client(HOST, self.port, local_hostname="testhost")
104 self.assertEqual(client.local_hostname, "testhost")
105 client.close()
106
107 def testTimeoutDefault(self):
108 mock_socket.reply_with(b"220 Hola mundo")
109 self.assertIsNone(mock_socket.getdefaulttimeout())
110 mock_socket.setdefaulttimeout(30)
111 self.assertEqual(mock_socket.getdefaulttimeout(), 30)
112 try:
113 client = self.client(HOST, self.port)
114 finally:
115 mock_socket.setdefaulttimeout(None)
116 self.assertEqual(client.sock.gettimeout(), 30)
117 client.close()
118
119 def testTimeoutNone(self):
120 mock_socket.reply_with(b"220 Hola mundo")
121 self.assertIsNone(socket.getdefaulttimeout())
122 socket.setdefaulttimeout(30)
123 try:
124 client = self.client(HOST, self.port, timeout=None)
125 finally:
126 socket.setdefaulttimeout(None)
127 self.assertIsNone(client.sock.gettimeout())
128 client.close()
129
130 def testTimeoutZero(self):
131 mock_socket.reply_with(b"220 Hola mundo")
132 with self.assertRaises(ValueError):
133 self.client(HOST, self.port, timeout=0)
134
135 def testTimeoutValue(self):
136 mock_socket.reply_with(b"220 Hola mundo")
137 client = self.client(HOST, self.port, timeout=30)
138 self.assertEqual(client.sock.gettimeout(), 30)
139 client.close()
140
141 def test_debuglevel(self):
142 mock_socket.reply_with(b"220 Hello world")
143 client = self.client()
144 client.set_debuglevel(1)
145 with support.captured_stderr() as stderr:
146 client.connect(HOST, self.port)
147 client.close()
148 expected = re.compile(r"^connect:", re.MULTILINE)
149 self.assertRegex(stderr.getvalue(), expected)
150
151 def test_debuglevel_2(self):
152 mock_socket.reply_with(b"220 Hello world")
153 client = self.client()
154 client.set_debuglevel(2)
155 with support.captured_stderr() as stderr:
156 client.connect(HOST, self.port)
157 client.close()
158 expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
159 re.MULTILINE)
160 self.assertRegex(stderr.getvalue(), expected)
161
162
163 class ESC[4;38;5;81mSMTPGeneralTests(ESC[4;38;5;149mGeneralTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
164
165 client = smtplib.SMTP
166
167
168 class ESC[4;38;5;81mLMTPGeneralTests(ESC[4;38;5;149mGeneralTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
169
170 client = smtplib.LMTP
171
172 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket")
173 def testUnixDomainSocketTimeoutDefault(self):
174 local_host = '/some/local/lmtp/delivery/program'
175 mock_socket.reply_with(b"220 Hello world")
176 try:
177 client = self.client(local_host, self.port)
178 finally:
179 mock_socket.setdefaulttimeout(None)
180 self.assertIsNone(client.sock.gettimeout())
181 client.close()
182
183 def testTimeoutZero(self):
184 super().testTimeoutZero()
185 local_host = '/some/local/lmtp/delivery/program'
186 with self.assertRaises(ValueError):
187 self.client(local_host, timeout=0)
188
189 # Test server thread using the specified SMTP server class
190 def debugging_server(serv, serv_evt, client_evt):
191 serv_evt.set()
192
193 try:
194 if hasattr(select, 'poll'):
195 poll_fun = asyncore.poll2
196 else:
197 poll_fun = asyncore.poll
198
199 n = 1000
200 while asyncore.socket_map and n > 0:
201 poll_fun(0.01, asyncore.socket_map)
202
203 # when the client conversation is finished, it will
204 # set client_evt, and it's then ok to kill the server
205 if client_evt.is_set():
206 serv.close()
207 break
208
209 n -= 1
210
211 except TimeoutError:
212 pass
213 finally:
214 if not client_evt.is_set():
215 # allow some time for the client to read the result
216 time.sleep(0.5)
217 serv.close()
218 asyncore.close_all()
219 serv_evt.set()
220
221 MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
222 MSG_END = '------------ END MESSAGE ------------\n'
223
224 # NOTE: Some SMTP objects in the tests below are created with a non-default
225 # local_hostname argument to the constructor, since (on some systems) the FQDN
226 # lookup caused by the default local_hostname sometimes takes so long that the
227 # test server times out, causing the test to fail.
228
229 # Test behavior of smtpd.DebuggingServer
230 class ESC[4;38;5;81mDebuggingServerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
231
232 maxDiff = None
233
234 def setUp(self):
235 self.thread_key = threading_helper.threading_setup()
236 self.real_getfqdn = socket.getfqdn
237 socket.getfqdn = mock_socket.getfqdn
238 # temporarily replace sys.stdout to capture DebuggingServer output
239 self.old_stdout = sys.stdout
240 self.output = io.StringIO()
241 sys.stdout = self.output
242
243 self.serv_evt = threading.Event()
244 self.client_evt = threading.Event()
245 # Capture SMTPChannel debug output
246 self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
247 smtpd.DEBUGSTREAM = io.StringIO()
248 # Pick a random unused port by passing 0 for the port number
249 self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
250 decode_data=True)
251 # Keep a note of what server host and port were assigned
252 self.host, self.port = self.serv.socket.getsockname()[:2]
253 serv_args = (self.serv, self.serv_evt, self.client_evt)
254 self.thread = threading.Thread(target=debugging_server, args=serv_args)
255 self.thread.start()
256
257 # wait until server thread has assigned a port number
258 self.serv_evt.wait()
259 self.serv_evt.clear()
260
261 def tearDown(self):
262 socket.getfqdn = self.real_getfqdn
263 # indicate that the client is finished
264 self.client_evt.set()
265 # wait for the server thread to terminate
266 self.serv_evt.wait()
267 threading_helper.join_thread(self.thread)
268 # restore sys.stdout
269 sys.stdout = self.old_stdout
270 # restore DEBUGSTREAM
271 smtpd.DEBUGSTREAM.close()
272 smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
273 del self.thread
274 self.doCleanups()
275 threading_helper.threading_cleanup(*self.thread_key)
276
277 def get_output_without_xpeer(self):
278 test_output = self.output.getvalue()
279 return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
280 test_output, flags=re.MULTILINE|re.DOTALL)
281
282 def testBasic(self):
283 # connect
284 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
285 timeout=support.LOOPBACK_TIMEOUT)
286 smtp.quit()
287
288 def testSourceAddress(self):
289 # connect
290 src_port = socket_helper.find_unused_port()
291 try:
292 smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
293 timeout=support.LOOPBACK_TIMEOUT,
294 source_address=(self.host, src_port))
295 self.addCleanup(smtp.close)
296 self.assertEqual(smtp.source_address, (self.host, src_port))
297 self.assertEqual(smtp.local_hostname, 'localhost')
298 smtp.quit()
299 except OSError as e:
300 if e.errno == errno.EADDRINUSE:
301 self.skipTest("couldn't bind to source port %d" % src_port)
302 raise
303
304 def testNOOP(self):
305 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
306 timeout=support.LOOPBACK_TIMEOUT)
307 self.addCleanup(smtp.close)
308 expected = (250, b'OK')
309 self.assertEqual(smtp.noop(), expected)
310 smtp.quit()
311
312 def testRSET(self):
313 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
314 timeout=support.LOOPBACK_TIMEOUT)
315 self.addCleanup(smtp.close)
316 expected = (250, b'OK')
317 self.assertEqual(smtp.rset(), expected)
318 smtp.quit()
319
320 def testELHO(self):
321 # EHLO isn't implemented in DebuggingServer
322 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
323 timeout=support.LOOPBACK_TIMEOUT)
324 self.addCleanup(smtp.close)
325 expected = (250, b'\nSIZE 33554432\nHELP')
326 self.assertEqual(smtp.ehlo(), expected)
327 smtp.quit()
328
329 def testEXPNNotImplemented(self):
330 # EXPN isn't implemented in DebuggingServer
331 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
332 timeout=support.LOOPBACK_TIMEOUT)
333 self.addCleanup(smtp.close)
334 expected = (502, b'EXPN not implemented')
335 smtp.putcmd('EXPN')
336 self.assertEqual(smtp.getreply(), expected)
337 smtp.quit()
338
339 def test_issue43124_putcmd_escapes_newline(self):
340 # see: https://bugs.python.org/issue43124
341 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
342 timeout=support.LOOPBACK_TIMEOUT)
343 self.addCleanup(smtp.close)
344 with self.assertRaises(ValueError) as exc:
345 smtp.putcmd('helo\nX-INJECTED')
346 self.assertIn("prohibited newline characters", str(exc.exception))
347 smtp.quit()
348
349 def testVRFY(self):
350 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
351 timeout=support.LOOPBACK_TIMEOUT)
352 self.addCleanup(smtp.close)
353 expected = (252, b'Cannot VRFY user, but will accept message ' + \
354 b'and attempt delivery')
355 self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
356 self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
357 smtp.quit()
358
359 def testSecondHELO(self):
360 # check that a second HELO returns a message that it's a duplicate
361 # (this behavior is specific to smtpd.SMTPChannel)
362 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
363 timeout=support.LOOPBACK_TIMEOUT)
364 self.addCleanup(smtp.close)
365 smtp.helo()
366 expected = (503, b'Duplicate HELO/EHLO')
367 self.assertEqual(smtp.helo(), expected)
368 smtp.quit()
369
370 def testHELP(self):
371 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
372 timeout=support.LOOPBACK_TIMEOUT)
373 self.addCleanup(smtp.close)
374 self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
375 b'RCPT DATA RSET NOOP QUIT VRFY')
376 smtp.quit()
377
378 def testSend(self):
379 # connect and send mail
380 m = 'A test message'
381 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
382 timeout=support.LOOPBACK_TIMEOUT)
383 self.addCleanup(smtp.close)
384 smtp.sendmail('John', 'Sally', m)
385 # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
386 # in asyncore. This sleep might help, but should really be fixed
387 # properly by using an Event variable.
388 time.sleep(0.01)
389 smtp.quit()
390
391 self.client_evt.set()
392 self.serv_evt.wait()
393 self.output.flush()
394 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
395 self.assertEqual(self.output.getvalue(), mexpect)
396
397 def testSendBinary(self):
398 m = b'A test message'
399 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
400 timeout=support.LOOPBACK_TIMEOUT)
401 self.addCleanup(smtp.close)
402 smtp.sendmail('John', 'Sally', m)
403 # XXX (see comment in testSend)
404 time.sleep(0.01)
405 smtp.quit()
406
407 self.client_evt.set()
408 self.serv_evt.wait()
409 self.output.flush()
410 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
411 self.assertEqual(self.output.getvalue(), mexpect)
412
413 def testSendNeedingDotQuote(self):
414 # Issue 12283
415 m = '.A test\n.mes.sage.'
416 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
417 timeout=support.LOOPBACK_TIMEOUT)
418 self.addCleanup(smtp.close)
419 smtp.sendmail('John', 'Sally', m)
420 # XXX (see comment in testSend)
421 time.sleep(0.01)
422 smtp.quit()
423
424 self.client_evt.set()
425 self.serv_evt.wait()
426 self.output.flush()
427 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
428 self.assertEqual(self.output.getvalue(), mexpect)
429
430 def test_issue43124_escape_localhostname(self):
431 # see: https://bugs.python.org/issue43124
432 # connect and send mail
433 m = 'wazzuuup\nlinetwo'
434 smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED',
435 timeout=support.LOOPBACK_TIMEOUT)
436 self.addCleanup(smtp.close)
437 with self.assertRaises(ValueError) as exc:
438 smtp.sendmail("hi@me.com", "you@me.com", m)
439 self.assertIn(
440 "prohibited newline characters: ehlo hi\\nX-INJECTED",
441 str(exc.exception),
442 )
443 # XXX (see comment in testSend)
444 time.sleep(0.01)
445 smtp.quit()
446
447 debugout = smtpd.DEBUGSTREAM.getvalue()
448 self.assertNotIn("X-INJECTED", debugout)
449
450 def test_issue43124_escape_options(self):
451 # see: https://bugs.python.org/issue43124
452 # connect and send mail
453 m = 'wazzuuup\nlinetwo'
454 smtp = smtplib.SMTP(
455 HOST, self.port, local_hostname='localhost',
456 timeout=support.LOOPBACK_TIMEOUT)
457
458 self.addCleanup(smtp.close)
459 smtp.sendmail("hi@me.com", "you@me.com", m)
460 with self.assertRaises(ValueError) as exc:
461 smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
462 msg = str(exc.exception)
463 self.assertIn("prohibited newline characters", msg)
464 self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg)
465 # XXX (see comment in testSend)
466 time.sleep(0.01)
467 smtp.quit()
468
469 debugout = smtpd.DEBUGSTREAM.getvalue()
470 self.assertNotIn("X-OPTION", debugout)
471 self.assertNotIn("X-OPTION2", debugout)
472 self.assertNotIn("X-INJECTED-1", debugout)
473 self.assertNotIn("X-INJECTED-2", debugout)
474
475 def testSendNullSender(self):
476 m = 'A test message'
477 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
478 timeout=support.LOOPBACK_TIMEOUT)
479 self.addCleanup(smtp.close)
480 smtp.sendmail('<>', 'Sally', m)
481 # XXX (see comment in testSend)
482 time.sleep(0.01)
483 smtp.quit()
484
485 self.client_evt.set()
486 self.serv_evt.wait()
487 self.output.flush()
488 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
489 self.assertEqual(self.output.getvalue(), mexpect)
490 debugout = smtpd.DEBUGSTREAM.getvalue()
491 sender = re.compile("^sender: <>$", re.MULTILINE)
492 self.assertRegex(debugout, sender)
493
494 def testSendMessage(self):
495 m = email.mime.text.MIMEText('A test message')
496 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
497 timeout=support.LOOPBACK_TIMEOUT)
498 self.addCleanup(smtp.close)
499 smtp.send_message(m, from_addr='John', to_addrs='Sally')
500 # XXX (see comment in testSend)
501 time.sleep(0.01)
502 smtp.quit()
503
504 self.client_evt.set()
505 self.serv_evt.wait()
506 self.output.flush()
507 # Remove the X-Peer header that DebuggingServer adds as figuring out
508 # exactly what IP address format is put there is not easy (and
509 # irrelevant to our test). Typically 127.0.0.1 or ::1, but it is
510 # not always the same as socket.gethostbyname(HOST). :(
511 test_output = self.get_output_without_xpeer()
512 del m['X-Peer']
513 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
514 self.assertEqual(test_output, mexpect)
515
516 def testSendMessageWithAddresses(self):
517 m = email.mime.text.MIMEText('A test message')
518 m['From'] = 'foo@bar.com'
519 m['To'] = 'John'
520 m['CC'] = 'Sally, Fred'
521 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
522 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
523 timeout=support.LOOPBACK_TIMEOUT)
524 self.addCleanup(smtp.close)
525 smtp.send_message(m)
526 # XXX (see comment in testSend)
527 time.sleep(0.01)
528 smtp.quit()
529 # make sure the Bcc header is still in the message.
530 self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
531 '<warped@silly.walks.com>')
532
533 self.client_evt.set()
534 self.serv_evt.wait()
535 self.output.flush()
536 # Remove the X-Peer header that DebuggingServer adds.
537 test_output = self.get_output_without_xpeer()
538 del m['X-Peer']
539 # The Bcc header should not be transmitted.
540 del m['Bcc']
541 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
542 self.assertEqual(test_output, mexpect)
543 debugout = smtpd.DEBUGSTREAM.getvalue()
544 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
545 self.assertRegex(debugout, sender)
546 for addr in ('John', 'Sally', 'Fred', 'root@localhost',
547 'warped@silly.walks.com'):
548 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
549 re.MULTILINE)
550 self.assertRegex(debugout, to_addr)
551
552 def testSendMessageWithSomeAddresses(self):
553 # Make sure nothing breaks if not all of the three 'to' headers exist
554 m = email.mime.text.MIMEText('A test message')
555 m['From'] = 'foo@bar.com'
556 m['To'] = 'John, Dinsdale'
557 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
558 timeout=support.LOOPBACK_TIMEOUT)
559 self.addCleanup(smtp.close)
560 smtp.send_message(m)
561 # XXX (see comment in testSend)
562 time.sleep(0.01)
563 smtp.quit()
564
565 self.client_evt.set()
566 self.serv_evt.wait()
567 self.output.flush()
568 # Remove the X-Peer header that DebuggingServer adds.
569 test_output = self.get_output_without_xpeer()
570 del m['X-Peer']
571 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
572 self.assertEqual(test_output, mexpect)
573 debugout = smtpd.DEBUGSTREAM.getvalue()
574 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
575 self.assertRegex(debugout, sender)
576 for addr in ('John', 'Dinsdale'):
577 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
578 re.MULTILINE)
579 self.assertRegex(debugout, to_addr)
580
581 def testSendMessageWithSpecifiedAddresses(self):
582 # Make sure addresses specified in call override those in message.
583 m = email.mime.text.MIMEText('A test message')
584 m['From'] = 'foo@bar.com'
585 m['To'] = 'John, Dinsdale'
586 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
587 timeout=support.LOOPBACK_TIMEOUT)
588 self.addCleanup(smtp.close)
589 smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
590 # XXX (see comment in testSend)
591 time.sleep(0.01)
592 smtp.quit()
593
594 self.client_evt.set()
595 self.serv_evt.wait()
596 self.output.flush()
597 # Remove the X-Peer header that DebuggingServer adds.
598 test_output = self.get_output_without_xpeer()
599 del m['X-Peer']
600 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
601 self.assertEqual(test_output, mexpect)
602 debugout = smtpd.DEBUGSTREAM.getvalue()
603 sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
604 self.assertRegex(debugout, sender)
605 for addr in ('John', 'Dinsdale'):
606 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
607 re.MULTILINE)
608 self.assertNotRegex(debugout, to_addr)
609 recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
610 self.assertRegex(debugout, recip)
611
612 def testSendMessageWithMultipleFrom(self):
613 # Sender overrides To
614 m = email.mime.text.MIMEText('A test message')
615 m['From'] = 'Bernard, Bianca'
616 m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
617 m['To'] = 'John, Dinsdale'
618 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
619 timeout=support.LOOPBACK_TIMEOUT)
620 self.addCleanup(smtp.close)
621 smtp.send_message(m)
622 # XXX (see comment in testSend)
623 time.sleep(0.01)
624 smtp.quit()
625
626 self.client_evt.set()
627 self.serv_evt.wait()
628 self.output.flush()
629 # Remove the X-Peer header that DebuggingServer adds.
630 test_output = self.get_output_without_xpeer()
631 del m['X-Peer']
632 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
633 self.assertEqual(test_output, mexpect)
634 debugout = smtpd.DEBUGSTREAM.getvalue()
635 sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
636 self.assertRegex(debugout, sender)
637 for addr in ('John', 'Dinsdale'):
638 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
639 re.MULTILINE)
640 self.assertRegex(debugout, to_addr)
641
642 def testSendMessageResent(self):
643 m = email.mime.text.MIMEText('A test message')
644 m['From'] = 'foo@bar.com'
645 m['To'] = 'John'
646 m['CC'] = 'Sally, Fred'
647 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
648 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
649 m['Resent-From'] = 'holy@grail.net'
650 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
651 m['Resent-Bcc'] = 'doe@losthope.net'
652 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
653 timeout=support.LOOPBACK_TIMEOUT)
654 self.addCleanup(smtp.close)
655 smtp.send_message(m)
656 # XXX (see comment in testSend)
657 time.sleep(0.01)
658 smtp.quit()
659
660 self.client_evt.set()
661 self.serv_evt.wait()
662 self.output.flush()
663 # The Resent-Bcc headers are deleted before serialization.
664 del m['Bcc']
665 del m['Resent-Bcc']
666 # Remove the X-Peer header that DebuggingServer adds.
667 test_output = self.get_output_without_xpeer()
668 del m['X-Peer']
669 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
670 self.assertEqual(test_output, mexpect)
671 debugout = smtpd.DEBUGSTREAM.getvalue()
672 sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
673 self.assertRegex(debugout, sender)
674 for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
675 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
676 re.MULTILINE)
677 self.assertRegex(debugout, to_addr)
678
679 def testSendMessageMultipleResentRaises(self):
680 m = email.mime.text.MIMEText('A test message')
681 m['From'] = 'foo@bar.com'
682 m['To'] = 'John'
683 m['CC'] = 'Sally, Fred'
684 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
685 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
686 m['Resent-From'] = 'holy@grail.net'
687 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
688 m['Resent-Bcc'] = 'doe@losthope.net'
689 m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
690 m['Resent-To'] = 'holy@grail.net'
691 m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
692 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
693 timeout=support.LOOPBACK_TIMEOUT)
694 self.addCleanup(smtp.close)
695 with self.assertRaises(ValueError):
696 smtp.send_message(m)
697 smtp.close()
698
699 class ESC[4;38;5;81mNonConnectingTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
700
701 def testNotConnected(self):
702 # Test various operations on an unconnected SMTP object that
703 # should raise exceptions (at present the attempt in SMTP.send
704 # to reference the nonexistent 'sock' attribute of the SMTP object
705 # causes an AttributeError)
706 smtp = smtplib.SMTP()
707 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
708 self.assertRaises(smtplib.SMTPServerDisconnected,
709 smtp.send, 'test msg')
710
711 def testNonnumericPort(self):
712 # check that non-numeric port raises OSError
713 self.assertRaises(OSError, smtplib.SMTP,
714 "localhost", "bogus")
715 self.assertRaises(OSError, smtplib.SMTP,
716 "localhost:bogus")
717
718 def testSockAttributeExists(self):
719 # check that sock attribute is present outside of a connect() call
720 # (regression test, the previous behavior raised an
721 # AttributeError: 'SMTP' object has no attribute 'sock')
722 with smtplib.SMTP() as smtp:
723 self.assertIsNone(smtp.sock)
724
725
726 class ESC[4;38;5;81mDefaultArgumentsTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
727
728 def setUp(self):
729 self.msg = EmailMessage()
730 self.msg['From'] = 'Páolo <főo@bar.com>'
731 self.smtp = smtplib.SMTP()
732 self.smtp.ehlo = Mock(return_value=(200, 'OK'))
733 self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock()
734
735 def testSendMessage(self):
736 expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
737 self.smtp.send_message(self.msg)
738 self.smtp.send_message(self.msg)
739 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
740 expected_mail_options)
741 self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3],
742 expected_mail_options)
743
744 def testSendMessageWithMailOptions(self):
745 mail_options = ['STARTTLS']
746 expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
747 self.smtp.send_message(self.msg, None, None, mail_options)
748 self.assertEqual(mail_options, ['STARTTLS'])
749 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
750 expected_mail_options)
751
752
753 # test response of client to a non-successful HELO message
754 class ESC[4;38;5;81mBadHELOServerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
755
756 def setUp(self):
757 smtplib.socket = mock_socket
758 mock_socket.reply_with(b"199 no hello for you!")
759 self.old_stdout = sys.stdout
760 self.output = io.StringIO()
761 sys.stdout = self.output
762 self.port = 25
763
764 def tearDown(self):
765 smtplib.socket = socket
766 sys.stdout = self.old_stdout
767
768 def testFailingHELO(self):
769 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
770 HOST, self.port, 'localhost', 3)
771
772
773 class ESC[4;38;5;81mTooLongLineTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
774 respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'
775
776 def setUp(self):
777 self.thread_key = threading_helper.threading_setup()
778 self.old_stdout = sys.stdout
779 self.output = io.StringIO()
780 sys.stdout = self.output
781
782 self.evt = threading.Event()
783 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
784 self.sock.settimeout(15)
785 self.port = socket_helper.bind_port(self.sock)
786 servargs = (self.evt, self.respdata, self.sock)
787 self.thread = threading.Thread(target=server, args=servargs)
788 self.thread.start()
789 self.evt.wait()
790 self.evt.clear()
791
792 def tearDown(self):
793 self.evt.wait()
794 sys.stdout = self.old_stdout
795 threading_helper.join_thread(self.thread)
796 del self.thread
797 self.doCleanups()
798 threading_helper.threading_cleanup(*self.thread_key)
799
800 def testLineTooLong(self):
801 self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
802 HOST, self.port, 'localhost', 3)
803
804
805 sim_users = {'Mr.A@somewhere.com':'John A',
806 'Ms.B@xn--fo-fka.com':'Sally B',
807 'Mrs.C@somewhereesle.com':'Ruth C',
808 }
809
810 sim_auth = ('Mr.A@somewhere.com', 'somepassword')
811 sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
812 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
813 sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
814 'list-2':['Ms.B@xn--fo-fka.com',],
815 }
816
817 # Simulated SMTP channel & server
818 class ESC[4;38;5;81mResponseException(ESC[4;38;5;149mException): pass
819 class ESC[4;38;5;81mSimSMTPChannel(ESC[4;38;5;149msmtpdESC[4;38;5;149m.ESC[4;38;5;149mSMTPChannel):
820
821 quit_response = None
822 mail_response = None
823 rcpt_response = None
824 data_response = None
825 rcpt_count = 0
826 rset_count = 0
827 disconnect = 0
828 AUTH = 99 # Add protocol state to enable auth testing.
829 authenticated_user = None
830
831 def __init__(self, extra_features, *args, **kw):
832 self._extrafeatures = ''.join(
833 [ "250-{0}\r\n".format(x) for x in extra_features ])
834 super(SimSMTPChannel, self).__init__(*args, **kw)
835
836 # AUTH related stuff. It would be nice if support for this were in smtpd.
837 def found_terminator(self):
838 if self.smtp_state == self.AUTH:
839 line = self._emptystring.join(self.received_lines)
840 print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
841 self.received_lines = []
842 try:
843 self.auth_object(line)
844 except ResponseException as e:
845 self.smtp_state = self.COMMAND
846 self.push('%s %s' % (e.smtp_code, e.smtp_error))
847 return
848 super().found_terminator()
849
850
851 def smtp_AUTH(self, arg):
852 if not self.seen_greeting:
853 self.push('503 Error: send EHLO first')
854 return
855 if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
856 self.push('500 Error: command "AUTH" not recognized')
857 return
858 if self.authenticated_user is not None:
859 self.push(
860 '503 Bad sequence of commands: already authenticated')
861 return
862 args = arg.split()
863 if len(args) not in [1, 2]:
864 self.push('501 Syntax: AUTH <mechanism> [initial-response]')
865 return
866 auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
867 try:
868 self.auth_object = getattr(self, auth_object_name)
869 except AttributeError:
870 self.push('504 Command parameter not implemented: unsupported '
871 ' authentication mechanism {!r}'.format(auth_object_name))
872 return
873 self.smtp_state = self.AUTH
874 self.auth_object(args[1] if len(args) == 2 else None)
875
876 def _authenticated(self, user, valid):
877 if valid:
878 self.authenticated_user = user
879 self.push('235 Authentication Succeeded')
880 else:
881 self.push('535 Authentication credentials invalid')
882 self.smtp_state = self.COMMAND
883
884 def _decode_base64(self, string):
885 return base64.decodebytes(string.encode('ascii')).decode('utf-8')
886
887 def _auth_plain(self, arg=None):
888 if arg is None:
889 self.push('334 ')
890 else:
891 logpass = self._decode_base64(arg)
892 try:
893 *_, user, password = logpass.split('\0')
894 except ValueError as e:
895 self.push('535 Splitting response {!r} into user and password'
896 ' failed: {}'.format(logpass, e))
897 return
898 self._authenticated(user, password == sim_auth[1])
899
900 def _auth_login(self, arg=None):
901 if arg is None:
902 # base64 encoded 'Username:'
903 self.push('334 VXNlcm5hbWU6')
904 elif not hasattr(self, '_auth_login_user'):
905 self._auth_login_user = self._decode_base64(arg)
906 # base64 encoded 'Password:'
907 self.push('334 UGFzc3dvcmQ6')
908 else:
909 password = self._decode_base64(arg)
910 self._authenticated(self._auth_login_user, password == sim_auth[1])
911 del self._auth_login_user
912
913 def _auth_buggy(self, arg=None):
914 # This AUTH mechanism will 'trap' client in a neverending 334
915 # base64 encoded 'BuGgYbUgGy'
916 self.push('334 QnVHZ1liVWdHeQ==')
917
918 def _auth_cram_md5(self, arg=None):
919 if arg is None:
920 self.push('334 {}'.format(sim_cram_md5_challenge))
921 else:
922 logpass = self._decode_base64(arg)
923 try:
924 user, hashed_pass = logpass.split()
925 except ValueError as e:
926 self.push('535 Splitting response {!r} into user and password '
927 'failed: {}'.format(logpass, e))
928 return False
929 valid_hashed_pass = hmac.HMAC(
930 sim_auth[1].encode('ascii'),
931 self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
932 'md5').hexdigest()
933 self._authenticated(user, hashed_pass == valid_hashed_pass)
934 # end AUTH related stuff.
935
936 def smtp_EHLO(self, arg):
937 resp = ('250-testhost\r\n'
938 '250-EXPN\r\n'
939 '250-SIZE 20000000\r\n'
940 '250-STARTTLS\r\n'
941 '250-DELIVERBY\r\n')
942 resp = resp + self._extrafeatures + '250 HELP'
943 self.push(resp)
944 self.seen_greeting = arg
945 self.extended_smtp = True
946
947 def smtp_VRFY(self, arg):
948 # For max compatibility smtplib should be sending the raw address.
949 if arg in sim_users:
950 self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
951 else:
952 self.push('550 No such user: %s' % arg)
953
954 def smtp_EXPN(self, arg):
955 list_name = arg.lower()
956 if list_name in sim_lists:
957 user_list = sim_lists[list_name]
958 for n, user_email in enumerate(user_list):
959 quoted_addr = smtplib.quoteaddr(user_email)
960 if n < len(user_list) - 1:
961 self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
962 else:
963 self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
964 else:
965 self.push('550 No access for you!')
966
967 def smtp_QUIT(self, arg):
968 if self.quit_response is None:
969 super(SimSMTPChannel, self).smtp_QUIT(arg)
970 else:
971 self.push(self.quit_response)
972 self.close_when_done()
973
974 def smtp_MAIL(self, arg):
975 if self.mail_response is None:
976 super().smtp_MAIL(arg)
977 else:
978 self.push(self.mail_response)
979 if self.disconnect:
980 self.close_when_done()
981
982 def smtp_RCPT(self, arg):
983 if self.rcpt_response is None:
984 super().smtp_RCPT(arg)
985 return
986 self.rcpt_count += 1
987 self.push(self.rcpt_response[self.rcpt_count-1])
988
989 def smtp_RSET(self, arg):
990 self.rset_count += 1
991 super().smtp_RSET(arg)
992
993 def smtp_DATA(self, arg):
994 if self.data_response is None:
995 super().smtp_DATA(arg)
996 else:
997 self.push(self.data_response)
998
999 def handle_error(self):
1000 raise
1001
1002
1003 class ESC[4;38;5;81mSimSMTPServer(ESC[4;38;5;149msmtpdESC[4;38;5;149m.ESC[4;38;5;149mSMTPServer):
1004
1005 channel_class = SimSMTPChannel
1006
1007 def __init__(self, *args, **kw):
1008 self._extra_features = []
1009 self._addresses = {}
1010 smtpd.SMTPServer.__init__(self, *args, **kw)
1011
1012 def handle_accepted(self, conn, addr):
1013 self._SMTPchannel = self.channel_class(
1014 self._extra_features, self, conn, addr,
1015 decode_data=self._decode_data)
1016
1017 def process_message(self, peer, mailfrom, rcpttos, data):
1018 self._addresses['from'] = mailfrom
1019 self._addresses['tos'] = rcpttos
1020
1021 def add_feature(self, feature):
1022 self._extra_features.append(feature)
1023
1024 def handle_error(self):
1025 raise
1026
1027
1028 # Test various SMTP & ESMTP commands/behaviors that require a simulated server
1029 # (i.e., something with more features than DebuggingServer)
1030 class ESC[4;38;5;81mSMTPSimTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1031
1032 def setUp(self):
1033 self.thread_key = threading_helper.threading_setup()
1034 self.real_getfqdn = socket.getfqdn
1035 socket.getfqdn = mock_socket.getfqdn
1036 self.serv_evt = threading.Event()
1037 self.client_evt = threading.Event()
1038 # Pick a random unused port by passing 0 for the port number
1039 self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
1040 # Keep a note of what port was assigned
1041 self.port = self.serv.socket.getsockname()[1]
1042 serv_args = (self.serv, self.serv_evt, self.client_evt)
1043 self.thread = threading.Thread(target=debugging_server, args=serv_args)
1044 self.thread.start()
1045
1046 # wait until server thread has assigned a port number
1047 self.serv_evt.wait()
1048 self.serv_evt.clear()
1049
1050 def tearDown(self):
1051 socket.getfqdn = self.real_getfqdn
1052 # indicate that the client is finished
1053 self.client_evt.set()
1054 # wait for the server thread to terminate
1055 self.serv_evt.wait()
1056 threading_helper.join_thread(self.thread)
1057 del self.thread
1058 self.doCleanups()
1059 threading_helper.threading_cleanup(*self.thread_key)
1060
1061 def testBasic(self):
1062 # smoke test
1063 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1064 timeout=support.LOOPBACK_TIMEOUT)
1065 smtp.quit()
1066
1067 def testEHLO(self):
1068 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1069 timeout=support.LOOPBACK_TIMEOUT)
1070
1071 # no features should be present before the EHLO
1072 self.assertEqual(smtp.esmtp_features, {})
1073
1074 # features expected from the test server
1075 expected_features = {'expn':'',
1076 'size': '20000000',
1077 'starttls': '',
1078 'deliverby': '',
1079 'help': '',
1080 }
1081
1082 smtp.ehlo()
1083 self.assertEqual(smtp.esmtp_features, expected_features)
1084 for k in expected_features:
1085 self.assertTrue(smtp.has_extn(k))
1086 self.assertFalse(smtp.has_extn('unsupported-feature'))
1087 smtp.quit()
1088
1089 def testVRFY(self):
1090 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1091 timeout=support.LOOPBACK_TIMEOUT)
1092
1093 for addr_spec, name in sim_users.items():
1094 expected_known = (250, bytes('%s %s' %
1095 (name, smtplib.quoteaddr(addr_spec)),
1096 "ascii"))
1097 self.assertEqual(smtp.vrfy(addr_spec), expected_known)
1098
1099 u = 'nobody@nowhere.com'
1100 expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
1101 self.assertEqual(smtp.vrfy(u), expected_unknown)
1102 smtp.quit()
1103
1104 def testEXPN(self):
1105 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1106 timeout=support.LOOPBACK_TIMEOUT)
1107
1108 for listname, members in sim_lists.items():
1109 users = []
1110 for m in members:
1111 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
1112 expected_known = (250, bytes('\n'.join(users), "ascii"))
1113 self.assertEqual(smtp.expn(listname), expected_known)
1114
1115 u = 'PSU-Members-List'
1116 expected_unknown = (550, b'No access for you!')
1117 self.assertEqual(smtp.expn(u), expected_unknown)
1118 smtp.quit()
1119
1120 def testAUTH_PLAIN(self):
1121 self.serv.add_feature("AUTH PLAIN")
1122 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1123 timeout=support.LOOPBACK_TIMEOUT)
1124 resp = smtp.login(sim_auth[0], sim_auth[1])
1125 self.assertEqual(resp, (235, b'Authentication Succeeded'))
1126 smtp.close()
1127
1128 def testAUTH_LOGIN(self):
1129 self.serv.add_feature("AUTH LOGIN")
1130 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1131 timeout=support.LOOPBACK_TIMEOUT)
1132 resp = smtp.login(sim_auth[0], sim_auth[1])
1133 self.assertEqual(resp, (235, b'Authentication Succeeded'))
1134 smtp.close()
1135
1136 def testAUTH_LOGIN_initial_response_ok(self):
1137 self.serv.add_feature("AUTH LOGIN")
1138 with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1139 timeout=support.LOOPBACK_TIMEOUT) as smtp:
1140 smtp.user, smtp.password = sim_auth
1141 smtp.ehlo("test_auth_login")
1142 resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True)
1143 self.assertEqual(resp, (235, b'Authentication Succeeded'))
1144
1145 def testAUTH_LOGIN_initial_response_notok(self):
1146 self.serv.add_feature("AUTH LOGIN")
1147 with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1148 timeout=support.LOOPBACK_TIMEOUT) as smtp:
1149 smtp.user, smtp.password = sim_auth
1150 smtp.ehlo("test_auth_login")
1151 resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False)
1152 self.assertEqual(resp, (235, b'Authentication Succeeded'))
1153
1154 def testAUTH_BUGGY(self):
1155 self.serv.add_feature("AUTH BUGGY")
1156
1157 def auth_buggy(challenge=None):
1158 self.assertEqual(b"BuGgYbUgGy", challenge)
1159 return "\0"
1160
1161 smtp = smtplib.SMTP(
1162 HOST, self.port, local_hostname='localhost',
1163 timeout=support.LOOPBACK_TIMEOUT
1164 )
1165 try:
1166 smtp.user, smtp.password = sim_auth
1167 smtp.ehlo("test_auth_buggy")
1168 expect = r"^Server AUTH mechanism infinite loop.*"
1169 with self.assertRaisesRegex(smtplib.SMTPException, expect) as cm:
1170 smtp.auth("BUGGY", auth_buggy, initial_response_ok=False)
1171 finally:
1172 smtp.close()
1173
1174 @hashlib_helper.requires_hashdigest('md5', openssl=True)
1175 def testAUTH_CRAM_MD5(self):
1176 self.serv.add_feature("AUTH CRAM-MD5")
1177 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1178 timeout=support.LOOPBACK_TIMEOUT)
1179 resp = smtp.login(sim_auth[0], sim_auth[1])
1180 self.assertEqual(resp, (235, b'Authentication Succeeded'))
1181 smtp.close()
1182
1183 @hashlib_helper.requires_hashdigest('md5', openssl=True)
1184 def testAUTH_multiple(self):
1185 # Test that multiple authentication methods are tried.
1186 self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
1187 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1188 timeout=support.LOOPBACK_TIMEOUT)
1189 resp = smtp.login(sim_auth[0], sim_auth[1])
1190 self.assertEqual(resp, (235, b'Authentication Succeeded'))
1191 smtp.close()
1192
1193 def test_auth_function(self):
1194 supported = {'PLAIN', 'LOGIN'}
1195 try:
1196 hashlib.md5()
1197 except ValueError:
1198 pass
1199 else:
1200 supported.add('CRAM-MD5')
1201 for mechanism in supported:
1202 self.serv.add_feature("AUTH {}".format(mechanism))
1203 for mechanism in supported:
1204 with self.subTest(mechanism=mechanism):
1205 smtp = smtplib.SMTP(HOST, self.port,
1206 local_hostname='localhost',
1207 timeout=support.LOOPBACK_TIMEOUT)
1208 smtp.ehlo('foo')
1209 smtp.user, smtp.password = sim_auth[0], sim_auth[1]
1210 method = 'auth_' + mechanism.lower().replace('-', '_')
1211 resp = smtp.auth(mechanism, getattr(smtp, method))
1212 self.assertEqual(resp, (235, b'Authentication Succeeded'))
1213 smtp.close()
1214
1215 def test_quit_resets_greeting(self):
1216 smtp = smtplib.SMTP(HOST, self.port,
1217 local_hostname='localhost',
1218 timeout=support.LOOPBACK_TIMEOUT)
1219 code, message = smtp.ehlo()
1220 self.assertEqual(code, 250)
1221 self.assertIn('size', smtp.esmtp_features)
1222 smtp.quit()
1223 self.assertNotIn('size', smtp.esmtp_features)
1224 smtp.connect(HOST, self.port)
1225 self.assertNotIn('size', smtp.esmtp_features)
1226 smtp.ehlo_or_helo_if_needed()
1227 self.assertIn('size', smtp.esmtp_features)
1228 smtp.quit()
1229
1230 def test_with_statement(self):
1231 with smtplib.SMTP(HOST, self.port) as smtp:
1232 code, message = smtp.noop()
1233 self.assertEqual(code, 250)
1234 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
1235 with smtplib.SMTP(HOST, self.port) as smtp:
1236 smtp.close()
1237 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
1238
1239 def test_with_statement_QUIT_failure(self):
1240 with self.assertRaises(smtplib.SMTPResponseException) as error:
1241 with smtplib.SMTP(HOST, self.port) as smtp:
1242 smtp.noop()
1243 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
1244 self.assertEqual(error.exception.smtp_code, 421)
1245 self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')
1246
1247 #TODO: add tests for correct AUTH method fallback now that the
1248 #test infrastructure can support it.
1249
1250 # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
1251 def test__rest_from_mail_cmd(self):
1252 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1253 timeout=support.LOOPBACK_TIMEOUT)
1254 smtp.noop()
1255 self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
1256 self.serv._SMTPchannel.disconnect = True
1257 with self.assertRaises(smtplib.SMTPSenderRefused):
1258 smtp.sendmail('John', 'Sally', 'test message')
1259 self.assertIsNone(smtp.sock)
1260
1261 # Issue 5713: make sure close, not rset, is called if we get a 421 error
1262 def test_421_from_mail_cmd(self):
1263 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1264 timeout=support.LOOPBACK_TIMEOUT)
1265 smtp.noop()
1266 self.serv._SMTPchannel.mail_response = '421 closing connection'
1267 with self.assertRaises(smtplib.SMTPSenderRefused):
1268 smtp.sendmail('John', 'Sally', 'test message')
1269 self.assertIsNone(smtp.sock)
1270 self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
1271
1272 def test_421_from_rcpt_cmd(self):
1273 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1274 timeout=support.LOOPBACK_TIMEOUT)
1275 smtp.noop()
1276 self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
1277 with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
1278 smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
1279 self.assertIsNone(smtp.sock)
1280 self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
1281 self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})
1282
1283 def test_421_from_data_cmd(self):
1284 class ESC[4;38;5;81mMySimSMTPChannel(ESC[4;38;5;149mSimSMTPChannel):
1285 def found_terminator(self):
1286 if self.smtp_state == self.DATA:
1287 self.push('421 closing')
1288 else:
1289 super().found_terminator()
1290 self.serv.channel_class = MySimSMTPChannel
1291 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1292 timeout=support.LOOPBACK_TIMEOUT)
1293 smtp.noop()
1294 with self.assertRaises(smtplib.SMTPDataError):
1295 smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
1296 self.assertIsNone(smtp.sock)
1297 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)
1298
1299 def test_smtputf8_NotSupportedError_if_no_server_support(self):
1300 smtp = smtplib.SMTP(
1301 HOST, self.port, local_hostname='localhost',
1302 timeout=support.LOOPBACK_TIMEOUT)
1303 self.addCleanup(smtp.close)
1304 smtp.ehlo()
1305 self.assertTrue(smtp.does_esmtp)
1306 self.assertFalse(smtp.has_extn('smtputf8'))
1307 self.assertRaises(
1308 smtplib.SMTPNotSupportedError,
1309 smtp.sendmail,
1310 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
1311 self.assertRaises(
1312 smtplib.SMTPNotSupportedError,
1313 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])
1314
1315 def test_send_unicode_without_SMTPUTF8(self):
1316 smtp = smtplib.SMTP(
1317 HOST, self.port, local_hostname='localhost',
1318 timeout=support.LOOPBACK_TIMEOUT)
1319 self.addCleanup(smtp.close)
1320 self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
1321 self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')
1322
1323 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
1324 # This test is located here and not in the SMTPUTF8SimTests
1325 # class because it needs a "regular" SMTP server to work
1326 msg = EmailMessage()
1327 msg['From'] = "Páolo <főo@bar.com>"
1328 msg['To'] = 'Dinsdale'
1329 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
1330 smtp = smtplib.SMTP(
1331 HOST, self.port, local_hostname='localhost',
1332 timeout=support.LOOPBACK_TIMEOUT)
1333 self.addCleanup(smtp.close)
1334 with self.assertRaises(smtplib.SMTPNotSupportedError):
1335 smtp.send_message(msg)
1336
1337 def test_name_field_not_included_in_envelop_addresses(self):
1338 smtp = smtplib.SMTP(
1339 HOST, self.port, local_hostname='localhost',
1340 timeout=support.LOOPBACK_TIMEOUT)
1341 self.addCleanup(smtp.close)
1342
1343 message = EmailMessage()
1344 message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
1345 message['To'] = email.utils.formataddr(('René', 'rene@example.com'))
1346
1347 self.assertDictEqual(smtp.send_message(message), {})
1348
1349 self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
1350 self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
1351
1352
1353 class ESC[4;38;5;81mSimSMTPUTF8Server(ESC[4;38;5;149mSimSMTPServer):
1354
1355 def __init__(self, *args, **kw):
1356 # The base SMTP server turns these on automatically, but our test
1357 # server is set up to munge the EHLO response, so we need to provide
1358 # them as well. And yes, the call is to SMTPServer not SimSMTPServer.
1359 self._extra_features = ['SMTPUTF8', '8BITMIME']
1360 smtpd.SMTPServer.__init__(self, *args, **kw)
1361
1362 def handle_accepted(self, conn, addr):
1363 self._SMTPchannel = self.channel_class(
1364 self._extra_features, self, conn, addr,
1365 decode_data=self._decode_data,
1366 enable_SMTPUTF8=self.enable_SMTPUTF8,
1367 )
1368
1369 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
1370 rcpt_options=None):
1371 self.last_peer = peer
1372 self.last_mailfrom = mailfrom
1373 self.last_rcpttos = rcpttos
1374 self.last_message = data
1375 self.last_mail_options = mail_options
1376 self.last_rcpt_options = rcpt_options
1377
1378
1379 class ESC[4;38;5;81mSMTPUTF8SimTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1380
1381 maxDiff = None
1382
1383 def setUp(self):
1384 self.thread_key = threading_helper.threading_setup()
1385 self.real_getfqdn = socket.getfqdn
1386 socket.getfqdn = mock_socket.getfqdn
1387 self.serv_evt = threading.Event()
1388 self.client_evt = threading.Event()
1389 # Pick a random unused port by passing 0 for the port number
1390 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
1391 decode_data=False,
1392 enable_SMTPUTF8=True)
1393 # Keep a note of what port was assigned
1394 self.port = self.serv.socket.getsockname()[1]
1395 serv_args = (self.serv, self.serv_evt, self.client_evt)
1396 self.thread = threading.Thread(target=debugging_server, args=serv_args)
1397 self.thread.start()
1398
1399 # wait until server thread has assigned a port number
1400 self.serv_evt.wait()
1401 self.serv_evt.clear()
1402
1403 def tearDown(self):
1404 socket.getfqdn = self.real_getfqdn
1405 # indicate that the client is finished
1406 self.client_evt.set()
1407 # wait for the server thread to terminate
1408 self.serv_evt.wait()
1409 threading_helper.join_thread(self.thread)
1410 del self.thread
1411 self.doCleanups()
1412 threading_helper.threading_cleanup(*self.thread_key)
1413
1414 def test_test_server_supports_extensions(self):
1415 smtp = smtplib.SMTP(
1416 HOST, self.port, local_hostname='localhost',
1417 timeout=support.LOOPBACK_TIMEOUT)
1418 self.addCleanup(smtp.close)
1419 smtp.ehlo()
1420 self.assertTrue(smtp.does_esmtp)
1421 self.assertTrue(smtp.has_extn('smtputf8'))
1422
1423 def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
1424 m = '¡a test message containing unicode!'.encode('utf-8')
1425 smtp = smtplib.SMTP(
1426 HOST, self.port, local_hostname='localhost',
1427 timeout=support.LOOPBACK_TIMEOUT)
1428 self.addCleanup(smtp.close)
1429 smtp.sendmail('Jőhn', 'Sálly', m,
1430 mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
1431 self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
1432 self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
1433 self.assertEqual(self.serv.last_message, m)
1434 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1435 self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1436 self.assertEqual(self.serv.last_rcpt_options, [])
1437
1438 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
1439 m = '¡a test message containing unicode!'.encode('utf-8')
1440 smtp = smtplib.SMTP(
1441 HOST, self.port, local_hostname='localhost',
1442 timeout=support.LOOPBACK_TIMEOUT)
1443 self.addCleanup(smtp.close)
1444 smtp.ehlo()
1445 self.assertEqual(
1446 smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
1447 (250, b'OK'))
1448 self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
1449 self.assertEqual(smtp.data(m), (250, b'OK'))
1450 self.assertEqual(self.serv.last_mailfrom, 'Jő')
1451 self.assertEqual(self.serv.last_rcpttos, ['János'])
1452 self.assertEqual(self.serv.last_message, m)
1453 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1454 self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1455 self.assertEqual(self.serv.last_rcpt_options, [])
1456
1457 def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
1458 msg = EmailMessage()
1459 msg['From'] = "Páolo <főo@bar.com>"
1460 msg['To'] = 'Dinsdale'
1461 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
1462 # XXX I don't know why I need two \n's here, but this is an existing
1463 # bug (if it is one) and not a problem with the new functionality.
1464 msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
1465 # XXX smtpd converts received /r/n to /n, so we can't easily test that
1466 # we are successfully sending /r/n :(.
1467 expected = textwrap.dedent("""\
1468 From: Páolo <főo@bar.com>
1469 To: Dinsdale
1470 Subject: Nudge nudge, wink, wink \u1F609
1471 Content-Type: text/plain; charset="utf-8"
1472 Content-Transfer-Encoding: 8bit
1473 MIME-Version: 1.0
1474
1475 oh là là, know what I mean, know what I mean?
1476 """)
1477 smtp = smtplib.SMTP(
1478 HOST, self.port, local_hostname='localhost',
1479 timeout=support.LOOPBACK_TIMEOUT)
1480 self.addCleanup(smtp.close)
1481 self.assertEqual(smtp.send_message(msg), {})
1482 self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
1483 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
1484 self.assertEqual(self.serv.last_message.decode(), expected)
1485 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1486 self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1487 self.assertEqual(self.serv.last_rcpt_options, [])
1488
1489
1490 EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
1491
1492 class ESC[4;38;5;81mSimSMTPAUTHInitialResponseChannel(ESC[4;38;5;149mSimSMTPChannel):
1493 def smtp_AUTH(self, arg):
1494 # RFC 4954's AUTH command allows for an optional initial-response.
1495 # Not all AUTH methods support this; some require a challenge. AUTH
1496 # PLAIN does those, so test that here. See issue #15014.
1497 args = arg.split()
1498 if args[0].lower() == 'plain':
1499 if len(args) == 2:
1500 # AUTH PLAIN <initial-response> with the response base 64
1501 # encoded. Hard code the expected response for the test.
1502 if args[1] == EXPECTED_RESPONSE:
1503 self.push('235 Ok')
1504 return
1505 self.push('571 Bad authentication')
1506
1507 class ESC[4;38;5;81mSimSMTPAUTHInitialResponseServer(ESC[4;38;5;149mSimSMTPServer):
1508 channel_class = SimSMTPAUTHInitialResponseChannel
1509
1510
1511 class ESC[4;38;5;81mSMTPAUTHInitialResponseSimTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1512 def setUp(self):
1513 self.thread_key = threading_helper.threading_setup()
1514 self.real_getfqdn = socket.getfqdn
1515 socket.getfqdn = mock_socket.getfqdn
1516 self.serv_evt = threading.Event()
1517 self.client_evt = threading.Event()
1518 # Pick a random unused port by passing 0 for the port number
1519 self.serv = SimSMTPAUTHInitialResponseServer(
1520 (HOST, 0), ('nowhere', -1), decode_data=True)
1521 # Keep a note of what port was assigned
1522 self.port = self.serv.socket.getsockname()[1]
1523 serv_args = (self.serv, self.serv_evt, self.client_evt)
1524 self.thread = threading.Thread(target=debugging_server, args=serv_args)
1525 self.thread.start()
1526
1527 # wait until server thread has assigned a port number
1528 self.serv_evt.wait()
1529 self.serv_evt.clear()
1530
1531 def tearDown(self):
1532 socket.getfqdn = self.real_getfqdn
1533 # indicate that the client is finished
1534 self.client_evt.set()
1535 # wait for the server thread to terminate
1536 self.serv_evt.wait()
1537 threading_helper.join_thread(self.thread)
1538 del self.thread
1539 self.doCleanups()
1540 threading_helper.threading_cleanup(*self.thread_key)
1541
1542 def testAUTH_PLAIN_initial_response_login(self):
1543 self.serv.add_feature('AUTH PLAIN')
1544 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1545 timeout=support.LOOPBACK_TIMEOUT)
1546 smtp.login('psu', 'doesnotexist')
1547 smtp.close()
1548
1549 def testAUTH_PLAIN_initial_response_auth(self):
1550 self.serv.add_feature('AUTH PLAIN')
1551 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1552 timeout=support.LOOPBACK_TIMEOUT)
1553 smtp.user = 'psu'
1554 smtp.password = 'doesnotexist'
1555 code, response = smtp.auth('plain', smtp.auth_plain)
1556 smtp.close()
1557 self.assertEqual(code, 235)
1558
1559
1560 if __name__ == '__main__':
1561 unittest.main()