1 import base64
2 import email.mime.text
3 from email.message import EmailMessage
4 from email.base64mime import body_encode as encode_base64
5 import email.utils
6 import hashlib
7 import hmac
8 import socket
9 import smtplib
10 import io
11 import re
12 import sys
13 import time
14 import select
15 import errno
16 import textwrap
17 import threading
18
19 import unittest
20 from test import support, mock_socket
21 from test.support import hashlib_helper
22 from test.support import socket_helper
23 from test.support import threading_helper
24 from test.support import warnings_helper
25 from unittest.mock import Mock
26
27
# asyncore and smtpd are loaded through warnings_helper.import_deprecated
# (presumably so that importing these deprecated modules does not emit a
# DeprecationWarning during the test run -- confirm against test.support).
asyncore = warnings_helper.import_deprecated('asyncore')
smtpd = warnings_helper.import_deprecated('smtpd')


# NOTE(review): presumably skips the whole module when the platform has no
# usable socket support -- confirm against test.support.
support.requires_working_socket(module=True)

# Host name used by the tests below for both mock and real connections.
HOST = socket_helper.HOST

if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        pass
    smtpd.SMTPChannel.handle_expt = handle_expt
42
43
def server(evt, buf, serv):
    """Accept one connection on *serv* and write *buf* to it.

    *evt* is set once the socket is listening and again when the function
    is done.  At most 500 select/send rounds are attempted, so a client that
    never drains the data cannot keep the thread alive forever.  A timeout
    while waiting for the connection is ignored.  *serv* is always closed.
    """
    serv.listen()
    evt.set()
    try:
        try:
            conn, _peer = serv.accept()
        except TimeoutError:
            # Nobody connected in time; nothing to send.
            return
        for _ in range(500):
            if not buf:
                break
            _, writable, _ = select.select([], [conn], [])
            if writable:
                written = conn.send(buf)
                buf = buf[written:]
        conn.close()
    finally:
        serv.close()
        evt.set()
65
class GeneralTests:
    # Connection-level tests shared by the SMTP and LMTP client classes.
    # Concrete subclasses provide ``client``.  While a test runs, smtplib's
    # ``socket`` module is replaced by mock_socket, so the client talks to the
    # canned replies registered with mock_socket.reply_with().

    def setUp(self):
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr  = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

    def testBasic1(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        client = self.client(HOST, self.port)
        client.close()

    def testSourceAddress(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        client = self.client(HOST, self.port,
                             source_address=('127.0.0.1',19876))
        self.assertEqual(client.source_address, ('127.0.0.1', 19876))
        client.close()

    def testBasic2(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects, include port in host name
        client = self.client("%s:%s" % (HOST, self.port))
        client.close()

    def testLocalHostName(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # check that supplied local_hostname is used
        client = self.client(HOST, self.port, local_hostname="testhost")
        self.assertEqual(client.local_hostname, "testhost")
        client.close()

    def testTimeoutDefault(self):
        # Without an explicit timeout the client socket inherits the
        # module-wide default timeout.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            client = self.client(HOST, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def testTimeoutNone(self):
        # An explicit timeout=None must win over a non-None default timeout.
        # smtplib is wired to mock_socket in setUp(), so the default has to be
        # set on mock_socket (not on the real socket module) for this test to
        # exercise anything.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        try:
            client = self.client(HOST, self.port, timeout=None)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutZero(self):
        # A zero timeout is rejected with ValueError.
        mock_socket.reply_with(b"220 Hola mundo")
        with self.assertRaises(ValueError):
            self.client(HOST, self.port, timeout=0)

    def testTimeoutValue(self):
        mock_socket.reply_with(b"220 Hola mundo")
        client = self.client(HOST, self.port, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def test_debuglevel(self):
        # Debug level 1 writes a plain "connect:" line to stderr.
        mock_socket.reply_with(b"220 Hello world")
        client = self.client()
        client.set_debuglevel(1)
        with support.captured_stderr() as stderr:
            client.connect(HOST, self.port)
        client.close()
        expected = re.compile(r"^connect:", re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)

    def test_debuglevel_2(self):
        # Debug level 2 prefixes each debug line with a timestamp.
        mock_socket.reply_with(b"220 Hello world")
        client = self.client()
        client.set_debuglevel(2)
        with support.captured_stderr() as stderr:
            client.connect(HOST, self.port)
        client.close()
        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                              re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)
163
164
class SMTPGeneralTests(GeneralTests, unittest.TestCase):
    # Runs the shared GeneralTests cases with smtplib.SMTP as the client.

    client = smtplib.SMTP
168
169
class LMTPGeneralTests(GeneralTests, unittest.TestCase):
    # Runs the shared GeneralTests cases with smtplib.LMTP as the client and
    # adds cases where the "host" is a Unix domain socket path.

    client = smtplib.LMTP

    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket")
    def testUnixDomainSocketTimeoutDefault(self):
        # With no explicit timeout the Unix-socket connection has no timeout.
        unix_path = '/some/local/lmtp/delivery/program'
        mock_socket.reply_with(b"220 Hello world")
        try:
            lmtp = self.client(unix_path, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertIsNone(lmtp.sock.gettimeout())
        lmtp.close()

    def testTimeoutZero(self):
        # Run the inherited host/port check first, then repeat it for a
        # Unix domain socket path.
        super().testTimeoutZero()
        unix_path = '/some/local/lmtp/delivery/program'
        self.assertRaises(ValueError, self.client, unix_path, timeout=0)
190
191 # Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Drive the asyncore loop for *serv* until the client is finished.

    *serv_evt* is set on entry and again on exit.  The loop polls at most
    1000 times (0.01s each) and stops early once asyncore has no sockets
    left or *client_evt* has been set by the client side.
    """
    serv_evt.set()

    try:
        poll_once = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poll_once(0.01, asyncore.socket_map)

            # The client sets client_evt when its conversation is over;
            # from then on it is safe to shut the server down.
            if client_evt.is_set():
                serv.close()
                break
    except TimeoutError:
        pass
    finally:
        if not client_evt.is_set():
            # Give the client a moment to read the last reply.
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
222
# Marker lines the tests expect around each message in the captured stdout
# of the debugging server (see the ``mexpect`` strings built in the tests).
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'
225
226 # NOTE: Some SMTP objects in the tests below are created with a non-default
227 # local_hostname argument to the constructor, since (on some systems) the FQDN
228 # lookup caused by the default local_hostname sometimes takes so long that the
229 # test server times out, causing the test to fail.
230
231 # Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(unittest.TestCase):
    # End-to-end tests: a real smtplib.SMTP client talks to an
    # smtpd.DebuggingServer that debugging_server() drives in a helper thread.
    # The server's stdout and smtpd.DEBUGSTREAM are captured so the tests can
    # inspect what the server received.

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Keep a note of what server host and port were assigned
        self.host, self.port = self.serv.socket.getsockname()[:2]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def get_output_without_xpeer(self):
        test_output = self.output.getvalue()
        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
                      test_output, flags=re.MULTILINE|re.DOTALL)

    # -- private helpers ---------------------------------------------------

    def _connect(self, host=HOST, local_hostname='localhost', **kwargs):
        # Open a client connection to the test server and register close()
        # as a cleanup, so the socket is released even when the test fails
        # before it reaches quit().
        smtp = smtplib.SMTP(host, self.port, local_hostname=local_hostname,
                            timeout=support.LOOPBACK_TIMEOUT, **kwargs)
        self.addCleanup(smtp.close)
        return smtp

    def _settle_and_quit(self, smtp):
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

    def _wait_for_server(self):
        # Tell the server thread the client is done, wait until it has shut
        # down, then flush the captured stdout.
        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()

    @staticmethod
    def _framed(text):
        # Expected captured stdout for a message whose text is *text*.
        return '%s%s\n%s' % (MSG_BEGIN, text, MSG_END)

    def _assert_message_output(self, m):
        # Compare the captured stdout with the serialized message *m*.
        # The X-Peer header that DebuggingServer adds is removed first, as
        # figuring out exactly what IP address format is put there is not
        # easy (and irrelevant to our test).  Typically 127.0.0.1 or ::1,
        # but it is not always the same as socket.gethostbyname(HOST). :(
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        self.assertEqual(test_output, self._framed(m.as_string()))

    def _assert_debug_sender(self, sender):
        # The channel debug output must contain a "sender: <sender>" line.
        pattern = re.compile("^sender: {}$".format(sender), re.MULTILINE)
        self.assertRegex(smtpd.DEBUGSTREAM.getvalue(), pattern)

    @staticmethod
    def _recips_pattern(addr):
        # Regex matching a "recips:" debug line that lists *addr*.
        return re.compile(r"^recips: .*'{}'.*$".format(addr), re.MULTILINE)

    def _assert_debug_recips(self, *addrs):
        # Every address in *addrs* must show up in a "recips:" debug line.
        debugout = smtpd.DEBUGSTREAM.getvalue()
        for addr in addrs:
            self.assertRegex(debugout, self._recips_pattern(addr))

    # -- tests -------------------------------------------------------------

    def testBasic(self):
        # connect
        smtp = self._connect()
        smtp.quit()

    def testSourceAddress(self):
        # connect
        src_port = socket_helper.find_unused_port()
        try:
            smtp = self._connect(host=self.host,
                                 source_address=(self.host, src_port))
            self.assertEqual(smtp.source_address, (self.host, src_port))
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except OSError as e:
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to source port %d" % src_port)
            raise

    def testNOOP(self):
        smtp = self._connect()
        expected = (250, b'OK')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
        smtp = self._connect()
        expected = (250, b'OK')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    def testELHO(self):
        # DebuggingServer answers EHLO with its SIZE and HELP extensions
        smtp = self._connect()
        expected = (250, b'\nSIZE 33554432\nHELP')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

    def testEXPNNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer
        smtp = self._connect()
        expected = (502, b'EXPN not implemented')
        smtp.putcmd('EXPN')
        self.assertEqual(smtp.getreply(), expected)
        smtp.quit()

    def test_issue43124_putcmd_escapes_newline(self):
        # see: https://bugs.python.org/issue43124
        smtp = self._connect()
        with self.assertRaises(ValueError) as exc:
            smtp.putcmd('helo\nX-INJECTED')
        self.assertIn("prohibited newline characters", str(exc.exception))
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()
        expected = (252, b'Cannot VRFY user, but will accept message ' + \
                         b'and attempt delivery')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = self._connect()
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

    def testHELP(self):
        smtp = self._connect()
        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
                                      b'RCPT DATA RSET NOOP QUIT VRFY')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._settle_and_quit(smtp)

        self._wait_for_server()
        self.assertEqual(self.output.getvalue(), self._framed(m))

    def testSendBinary(self):
        m = b'A test message'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._settle_and_quit(smtp)

        self._wait_for_server()
        self.assertEqual(self.output.getvalue(),
                         self._framed(m.decode('ascii')))

    def testSendNeedingDotQuote(self):
        # Issue 12283
        m = '.A test\n.mes.sage.'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._settle_and_quit(smtp)

        self._wait_for_server()
        self.assertEqual(self.output.getvalue(), self._framed(m))

    def test_issue43124_escape_localhostname(self):
        # see: https://bugs.python.org/issue43124
        # connect and send mail
        m = 'wazzuuup\nlinetwo'
        smtp = self._connect(local_hostname='hi\nX-INJECTED')
        with self.assertRaises(ValueError) as exc:
            smtp.sendmail("hi@me.com", "you@me.com", m)
        self.assertIn(
            "prohibited newline characters: ehlo hi\\nX-INJECTED",
            str(exc.exception),
        )
        self._settle_and_quit(smtp)

        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertNotIn("X-INJECTED", debugout)

    def test_issue43124_escape_options(self):
        # see: https://bugs.python.org/issue43124
        # connect and send mail
        m = 'wazzuuup\nlinetwo'
        smtp = self._connect()
        smtp.sendmail("hi@me.com", "you@me.com", m)
        with self.assertRaises(ValueError) as exc:
            smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
        msg = str(exc.exception)
        self.assertIn("prohibited newline characters", msg)
        self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg)
        self._settle_and_quit(smtp)

        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertNotIn("X-OPTION", debugout)
        self.assertNotIn("X-OPTION2", debugout)
        self.assertNotIn("X-INJECTED-1", debugout)
        self.assertNotIn("X-INJECTED-2", debugout)

    def testSendNullSender(self):
        m = 'A test message'
        smtp = self._connect()
        smtp.sendmail('<>', 'Sally', m)
        self._settle_and_quit(smtp)

        self._wait_for_server()
        self.assertEqual(self.output.getvalue(), self._framed(m))
        self._assert_debug_sender("<>")

    def testSendMessage(self):
        m = email.mime.text.MIMEText('A test message')
        smtp = self._connect()
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        self._settle_and_quit(smtp)

        self._wait_for_server()
        self._assert_message_output(m)

    def testSendMessageWithAddresses(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        smtp = self._connect()
        smtp.send_message(m)
        self._settle_and_quit(smtp)
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<warped@silly.walks.com>')

        self._wait_for_server()
        # The Bcc header should not be transmitted.
        del m['Bcc']
        self._assert_message_output(m)
        self._assert_debug_sender("foo@bar.com")
        self._assert_debug_recips('John', 'Sally', 'Fred', 'root@localhost',
                                  'warped@silly.walks.com')

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m)
        self._settle_and_quit(smtp)

        self._wait_for_server()
        self._assert_message_output(m)
        self._assert_debug_sender("foo@bar.com")
        self._assert_debug_recips('John', 'Dinsdale')

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
        self._settle_and_quit(smtp)

        self._wait_for_server()
        self._assert_message_output(m)
        self._assert_debug_sender("joe@example.com")
        debugout = smtpd.DEBUGSTREAM.getvalue()
        for addr in ('John', 'Dinsdale'):
            self.assertNotRegex(debugout, self._recips_pattern(addr))
        self._assert_debug_recips('foo@example.net')

    def testSendMessageWithMultipleFrom(self):
        # Sender overrides From
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'Bernard, Bianca'
        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m)
        self._settle_and_quit(smtp)

        self._wait_for_server()
        self._assert_message_output(m)
        self._assert_debug_sender("the_rescuers@Rescue-Aid-Society.com")
        self._assert_debug_recips('John', 'Dinsdale')

    def testSendMessageResent(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        smtp = self._connect()
        smtp.send_message(m)
        self._settle_and_quit(smtp)

        self._wait_for_server()
        # The Bcc and Resent-Bcc headers are deleted before serialization.
        del m['Bcc']
        del m['Resent-Bcc']
        self._assert_message_output(m)
        self._assert_debug_sender("holy@grail.net")
        self._assert_debug_recips('my_mom@great.cooker.com', 'Jeff',
                                  'doe@losthope.net')

    def testSendMessageMultipleResentRaises(self):
        # Two sets of Resent-* headers are ambiguous and must be rejected.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
        m['Resent-To'] = 'holy@grail.net'
        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        smtp = self._connect()
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()
700
class NonConnectingTests(unittest.TestCase):
    # Tests for SMTP objects that never reach a server.

    def testNotConnected(self):
        # Operations on an SMTP object that was never connected must raise
        # SMTPServerDisconnected.
        smtp = smtplib.SMTP()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            smtp.ehlo()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            smtp.send('test msg')

    def testNonnumericPort(self):
        # A non-numeric port raises OSError, whether it is passed as a
        # separate argument or embedded in the host string.
        for ctor_args in (("localhost", "bogus"), ("localhost:bogus",)):
            with self.assertRaises(OSError):
                smtplib.SMTP(*ctor_args)

    def testSockAttributeExists(self):
        # Regression test: the sock attribute must exist (as None) even
        # before connect() is called; it used to be missing, which caused
        # AttributeError: 'SMTP' object has no attribute 'sock'.
        with smtplib.SMTP() as unconnected:
            self.assertIsNone(unconnected.sock)
726
727
class DefaultArgumentsTests(unittest.TestCase):
    # Checks the mail options send_message() hands to sendmail() for a
    # message with a non-ASCII sender.  ehlo, has_extn and sendmail are
    # replaced by Mock objects, so no connection is needed.

    def setUp(self):
        message = EmailMessage()
        message['From'] = 'Páolo <főo@bar.com>'
        self.msg = message

        client = smtplib.SMTP()
        client.ehlo = Mock(return_value=(200, 'OK'))
        client.has_extn = Mock()
        client.sendmail = Mock()
        self.smtp = client

    def testSendMessage(self):
        # Sending twice must yield the same options both times.
        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
        for _ in range(2):
            self.smtp.send_message(self.msg)
        recorded = self.smtp.sendmail.call_args_list
        for index in (0, 1):
            self.assertEqual(recorded[index][0][3], expected_mail_options)

    def testSendMessageWithMailOptions(self):
        # Caller-supplied options come first and the caller's list is not
        # modified.
        mail_options = ['STARTTLS']
        expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        self.assertEqual(mail_options, ['STARTTLS'])
        first_call = self.smtp.sendmail.call_args_list[0]
        self.assertEqual(first_call[0][3], expected_mail_options)
753
754
755 # test response of client to a non-successful HELO message
class BadHELOServerTests(unittest.TestCase):
    # The mock server greets with a 199 reply instead of 220; constructing
    # an SMTP client against it must fail with SMTPConnectError.

    def setUp(self):
        self.port = 25
        # Capture stdout for the duration of the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        # Route smtplib through the mock socket and queue the bad greeting.
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")

    def tearDown(self):
        sys.stdout = self.old_stdout
        smtplib.socket = socket

    def testFailingHELO(self):
        with self.assertRaises(smtplib.SMTPConnectError):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
773
774
class TooLongLineTests(unittest.TestCase):
    # A server thread (see server()) answers with a single reply line that
    # is twice as long as smtplib._MAXLINE; the client must reject it.

    respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # Capture stdout for the duration of the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.evt = threading.Event()
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock = listener
        listener.settimeout(15)
        self.port = socket_helper.bind_port(listener)
        self.thread = threading.Thread(
            target=server, args=(self.evt, self.respdata, listener))
        self.thread.start()
        # Wait until the server thread is listening, then re-arm the event
        # so tearDown() can wait for the thread to finish.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        self.evt.wait()
        sys.stdout = self.old_stdout
        threading_helper.join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def testLineTooLong(self):
        with self.assertRaises(smtplib.SMTPResponseException):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
805
806
# Simulated user directory: e-mail address -> full name.  Looked up by the
# simulated VRFY and EXPN handlers.
sim_users = {'Mr.A@somewhere.com':'John A',
             'Ms.B@xn--fo-fka.com':'Sally B',
             'Mrs.C@somewhereesle.com':'Ruth C',
            }

# The second element is the password the simulated AUTH mechanisms accept;
# the first element is presumably the matching user name (not checked by the
# handlers visible here).
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
# Base64-encoded challenge sent by the simulated CRAM-MD5 mechanism.
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
# Simulated mailing lists: list name -> member addresses (keys of sim_users).
# Looked up by the simulated EXPN handler.
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
             'list-2':['Ms.B@xn--fo-fka.com',],
            }
818
819 # Simulated SMTP channel & server
class ResponseException(Exception):
    # Raised by an AUTH handler to make SimSMTPChannel.found_terminator()
    # abort the exchange and reply with "<smtp_code> <smtp_error>".
    #
    # found_terminator() reads both attributes from the caught exception, so
    # they need to exist on every instance; the class-level defaults below
    # guarantee that.  Raisers may override them per instance.
    smtp_code = 535
    smtp_error = 'Authentication credentials invalid'
821 class ESC[4;38;5;81mSimSMTPChannel(ESC[4;38;5;149msmtpdESC[4;38;5;149m.ESC[4;38;5;149mSMTPChannel):
822
# Canned replies.  When one of these is not None the matching smtp_QUIT /
# smtp_MAIL / smtp_RCPT handler pushes it instead of delegating to
# smtpd.SMTPChannel.  (data_response is presumably used the same way by a
# DATA handler outside this excerpt -- confirm.)
quit_response = None
mail_response = None
rcpt_response = None
data_response = None
# Number of RCPT commands answered from rcpt_response so far.
rcpt_count = 0
# NOTE(review): presumably counts RSET commands; its use is not visible here.
rset_count = 0
# When true, smtp_MAIL closes the connection after sending mail_response.
disconnect = 0
AUTH = 99    # Add protocol state to enable auth testing.
# Set by _authenticated() once an AUTH exchange succeeds.
authenticated_user = None
832
def __init__(self, extra_features, *args, **kw):
    """Create the channel and remember extra EHLO features to advertise.

    Each entry of *extra_features* is pre-rendered as a "250-<feature>"
    continuation line; smtp_EHLO() appends the result to its reply.  The
    remaining arguments are passed on to smtpd.SMTPChannel.
    """
    feature_lines = ("250-{0}\r\n".format(feature)
                     for feature in extra_features)
    self._extrafeatures = ''.join(feature_lines)
    super().__init__(*args, **kw)
837
838 # AUTH related stuff. It would be nice if support for this were in smtpd.
def found_terminator(self):
    """Handle a complete input line, intercepting it during an AUTH exchange.

    Outside the AUTH state the line is processed by smtpd.SMTPChannel.
    In the AUTH state the line is handed to the active mechanism handler
    (self.auth_object); a ResponseException from the handler ends the
    exchange and its code and error text are sent to the client.
    """
    if self.smtp_state != self.AUTH:
        super().found_terminator()
        return

    response = self._emptystring.join(self.received_lines)
    print('Data:', repr(response), file=smtpd.DEBUGSTREAM)
    self.received_lines = []
    try:
        self.auth_object(response)
    except ResponseException as exc:
        # Leave the AUTH state before reporting the failure.
        self.smtp_state = self.COMMAND
        self.push('%s %s' % (exc.smtp_code, exc.smtp_error))
851
852
def smtp_AUTH(self, arg):
    """Handle "AUTH <mechanism> [initial-response]".

    Replies with an error if no greeting was seen, AUTH is not among the
    advertised extra features, the client is already authenticated, the
    argument count is wrong, or there is no _auth_<mechanism> handler.
    Otherwise the channel enters the AUTH state and the handler is called
    with the initial response (None when the client sent none).
    """
    if not self.seen_greeting:
        self.push('503 Error: send EHLO first')
        return
    auth_offered = self.extended_smtp and 'AUTH' in self._extrafeatures
    if not auth_offered:
        self.push('500 Error: command "AUTH" not recognized')
        return
    if self.authenticated_user is not None:
        self.push(
            '503 Bad sequence of commands: already authenticated')
        return

    words = arg.split()
    if not 1 <= len(words) <= 2:
        self.push('501 Syntax: AUTH <mechanism> [initial-response]')
        return

    # "CRAM-MD5" -> "_auth_cram_md5"
    auth_object_name = '_auth_%s' % words[0].lower().replace('-', '_')
    try:
        self.auth_object = getattr(self, auth_object_name)
    except AttributeError:
        self.push('504 Command parameter not implemented: unsupported '
                  ' authentication mechanism {!r}'.format(auth_object_name))
        return

    self.smtp_state = self.AUTH
    initial_response = words[1] if len(words) == 2 else None
    self.auth_object(initial_response)
877
def _authenticated(self, user, valid):
    """Finish an AUTH exchange and return the channel to the COMMAND state.

    When *valid* is true, *user* is recorded as the authenticated user and
    a 235 reply is sent; otherwise a 535 reply is sent.
    """
    if not valid:
        self.push('535 Authentication credentials invalid')
    else:
        self.authenticated_user = user
        self.push('235 Authentication Succeeded')
    self.smtp_state = self.COMMAND
885
def _decode_base64(self, string):
    """Return the text encoded by the ASCII base64 string *string*.

    The decoded bytes are interpreted as UTF-8.
    """
    encoded = string.encode('ascii')
    raw = base64.decodebytes(encoded)
    return raw.decode('utf-8')
888
def _auth_plain(self, arg=None):
    """Simulate the PLAIN mechanism.

    Without a response an empty 334 challenge is sent.  Otherwise the
    response is base64-decoded and split on NUL; the last two fields are
    the user and password, and the password is checked against sim_auth.
    """
    if arg is None:
        self.push('334 ')
        return

    logpass = self._decode_base64(arg)
    try:
        # Any leading fields (e.g. an authorization identity) are ignored.
        *_, user, password = logpass.split('\0')
    except ValueError as exc:
        self.push('535 Splitting response {!r} into user and password'
                  ' failed: {}'.format(logpass, exc))
        return
    password_ok = password == sim_auth[1]
    self._authenticated(user, password_ok)
901
def _auth_login(self, arg=None):
    """Simulate the LOGIN mechanism (user name, then password).

    The user name is kept in self._auth_login_user between the two steps
    and removed again once the password has been checked against sim_auth.
    """
    if arg is None:
        # base64 encoded 'Username:'
        self.push('334 VXNlcm5hbWU6')
        return

    if not hasattr(self, '_auth_login_user'):
        # First response: the user name.  Ask for the password next.
        self._auth_login_user = self._decode_base64(arg)
        # base64 encoded 'Password:'
        self.push('334 UGFzc3dvcmQ6')
        return

    # Second response: the password.
    password = self._decode_base64(arg)
    self._authenticated(self._auth_login_user, password == sim_auth[1])
    del self._auth_login_user
914
def _auth_buggy(self, arg=None):
    """Simulate a broken mechanism that answers every step with a 334.

    The client is never let out of the challenge loop.
    """
    # base64 encoded 'BuGgYbUgGy'
    endless_challenge = '334 QnVHZ1liVWdHeQ=='
    self.push(endless_challenge)
919
def _auth_cram_md5(self, arg=None):
    """Simulate the CRAM-MD5 mechanism.

    Without a response the fixed challenge sim_cram_md5_challenge is sent.
    Otherwise the decoded response must be "<user> <hex digest>", where the
    digest is the HMAC-MD5 of the decoded challenge keyed with the password
    from sim_auth.  Returns False if the response cannot be split.
    """
    if arg is None:
        self.push('334 {}'.format(sim_cram_md5_challenge))
        return

    logpass = self._decode_base64(arg)
    try:
        user, hashed_pass = logpass.split()
    except ValueError as exc:
        self.push('535 Splitting response {!r} into user and password '
                  'failed: {}'.format(logpass, exc))
        return False

    key = sim_auth[1].encode('ascii')
    challenge = self._decode_base64(sim_cram_md5_challenge).encode('ascii')
    valid_hashed_pass = hmac.HMAC(key, challenge, 'md5').hexdigest()
    self._authenticated(user, hashed_pass == valid_hashed_pass)
936 # end AUTH related stuff.
937
938 def smtp_EHLO(self, arg):
939 resp = ('250-testhost\r\n'
940 '250-EXPN\r\n'
941 '250-SIZE 20000000\r\n'
942 '250-STARTTLS\r\n'
943 '250-DELIVERBY\r\n')
944 resp = resp + self._extrafeatures + '250 HELP'
945 self.push(resp)
946 self.seen_greeting = arg
947 self.extended_smtp = True
948
949 def smtp_VRFY(self, arg):
950 # For max compatibility smtplib should be sending the raw address.
951 if arg in sim_users:
952 self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
953 else:
954 self.push('550 No such user: %s' % arg)
955
956 def smtp_EXPN(self, arg):
957 list_name = arg.lower()
958 if list_name in sim_lists:
959 user_list = sim_lists[list_name]
960 for n, user_email in enumerate(user_list):
961 quoted_addr = smtplib.quoteaddr(user_email)
962 if n < len(user_list) - 1:
963 self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
964 else:
965 self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
966 else:
967 self.push('550 No access for you!')
968
969 def smtp_QUIT(self, arg):
970 if self.quit_response is None:
971 super(SimSMTPChannel, self).smtp_QUIT(arg)
972 else:
973 self.push(self.quit_response)
974 self.close_when_done()
975
976 def smtp_MAIL(self, arg):
977 if self.mail_response is None:
978 super().smtp_MAIL(arg)
979 else:
980 self.push(self.mail_response)
981 if self.disconnect:
982 self.close_when_done()
983
984 def smtp_RCPT(self, arg):
985 if self.rcpt_response is None:
986 super().smtp_RCPT(arg)
987 return
988 self.rcpt_count += 1
989 self.push(self.rcpt_response[self.rcpt_count-1])
990
991 def smtp_RSET(self, arg):
992 self.rset_count += 1
993 super().smtp_RSET(arg)
994
995 def smtp_DATA(self, arg):
996 if self.data_response is None:
997 super().smtp_DATA(arg)
998 else:
999 self.push(self.data_response)
1000
    def handle_error(self):
        # Re-raise the exception that is currently being handled rather than
        # dealing with it here.
        # NOTE(review): presumably this overrides asyncore's default error
        # handling so that channel errors surface in the server thread --
        # confirm against asyncore.dispatcher.handle_error.
        raise
1003
1004
class SimSMTPServer(smtpd.SMTPServer):
    """SMTP server whose channels are SimSMTPChannel instances.

    Extra EHLO features can be registered with add_feature(); they are
    handed to each channel created afterwards.  The envelope addresses of
    the last processed message are kept in ``_addresses``.
    """

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        self._extra_features = []
        self._addresses = {}
        super().__init__(*args, **kw)

    def handle_accepted(self, conn, addr):
        # Keep a reference to the newest channel so tests can tweak it.
        channel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)
        self._SMTPchannel = channel

    def process_message(self, peer, mailfrom, rcpttos, data):
        # Only the envelope is recorded; the message body is ignored.
        self._addresses.update({'from': mailfrom, 'tos': rcpttos})

    def add_feature(self, feature):
        """Add *feature* to the list passed to channels created later."""
        self._extra_features.append(feature)

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
1028
1029
1030 # Test various SMTP & ESMTP commands/behaviors that require a simulated server
1031 # (i.e., something with more features than DebuggingServer)
class SMTPSimTests(unittest.TestCase):
    """Run smtplib clients against a SimSMTPServer served from a thread.

    Each test gets a fresh server on an ephemeral port.  Clients created
    through _connect() are closed by a registered cleanup, so a failing
    assertion does not leave a client socket open.
    """

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        # Run the registered cleanups (client closes) before checking for
        # leftover threads.
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def _connect(self):
        """Return an SMTP client connected to the simulated server.

        The client's close() is registered as a cleanup so the socket is
        released even when the test fails before its own quit()/close().
        close() is safe to call again after the test has closed the client.
        """
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        return smtp

    def testBasic(self):
        # smoke test
        smtp = self._connect()
        smtp.quit()

    def testEHLO(self):
        smtp = self._connect()

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn': '',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()

        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = self._connect()

        for listname, members in sim_lists.items():
            users = []
            for m in members:
                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN_initial_response_ok(self):
        self.serv.add_feature("AUTH LOGIN")
        with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                          timeout=support.LOOPBACK_TIMEOUT) as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True)
            self.assertEqual(resp, (235, b'Authentication Succeeded'))

    def testAUTH_LOGIN_initial_response_notok(self):
        self.serv.add_feature("AUTH LOGIN")
        with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                          timeout=support.LOOPBACK_TIMEOUT) as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False)
            self.assertEqual(resp, (235, b'Authentication Succeeded'))

    def testAUTH_BUGGY(self):
        self.serv.add_feature("AUTH BUGGY")

        def auth_buggy(challenge=None):
            self.assertEqual(b"BuGgYbUgGy", challenge)
            return "\0"

        # The cleanup registered by _connect() closes the client whether or
        # not the expected exception is raised.
        smtp = self._connect()
        smtp.user, smtp.password = sim_auth
        smtp.ehlo("test_auth_buggy")
        expect = r"^Server AUTH mechanism infinite loop.*"
        with self.assertRaisesRegex(smtplib.SMTPException, expect):
            smtp.auth("BUGGY", auth_buggy, initial_response_ok=False)

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def test_auth_function(self):
        supported = {'PLAIN', 'LOGIN'}
        try:
            hashlib.md5()
        except ValueError:
            # md5 is unavailable, so CRAM-MD5 cannot be exercised.
            pass
        else:
            supported.add('CRAM-MD5')
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                smtp = self._connect()
                smtp.ehlo('foo')
                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                method = 'auth_' + mechanism.lower().replace('-', '_')
                resp = smtp.auth(mechanism, getattr(smtp, method))
                self.assertEqual(resp, (235, b'Authentication Succeeded'))
                smtp.close()

    def test_quit_resets_greeting(self):
        smtp = self._connect()
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()

    def test_with_statement(self):
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    def test__rest_from_mail_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
        self.serv._SMTPchannel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)

    # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_421_from_rcpt_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})

    def test_421_from_data_cmd(self):
        class MySimSMTPChannel(SimSMTPChannel):
            def found_terminator(self):
                if self.smtp_state == self.DATA:
                    self.push('421 closing')
                else:
                    super().found_terminator()
        self.serv.channel_class = MySimSMTPChannel
        smtp = self._connect()
        smtp.noop()
        with self.assertRaises(smtplib.SMTPDataError):
            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
        self.assertIsNone(smtp.sock)
        # Like the other 421 tests, check that no RSET was sent.  The channel
        # only bumps rcpt_count when rcpt_response is set, so asserting on
        # rcpt_count here could never fail.
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_smtputf8_NotSupportedError_if_no_server_support(self):
        smtp = self._connect()
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertFalse(smtp.has_extn('smtputf8'))
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.sendmail,
            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])

    def test_send_unicode_without_SMTPUTF8(self):
        smtp = self._connect()
        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')

    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
        # This test is located here and not in the SMTPUTF8SimTests
        # class because it needs a "regular" SMTP server to work
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        smtp = self._connect()
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            smtp.send_message(msg)

    def test_name_field_not_included_in_envelop_addresses(self):
        smtp = self._connect()

        message = EmailMessage()
        message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
        message['To'] = email.utils.formataddr(('René', 'rene@example.com'))

        self.assertDictEqual(smtp.send_message(message), {})

        self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
        self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
1353
1354
class SimSMTPUTF8Server(SimSMTPServer):
    """SimSMTPServer variant that advertises SMTPUTF8 and 8BITMIME.

    The details of the last processed message are kept in the ``last_*``
    attributes for the tests to inspect.
    """

    def __init__(self, *args, **kw):
        # The base SMTP server turns these on automatically, but our test
        # server is set up to munge the EHLO response, so we need to provide
        # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
        self._extra_features = ['SMTPUTF8', '8BITMIME']
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        channel_options = {
            'decode_data': self._decode_data,
            'enable_SMTPUTF8': self.enable_SMTPUTF8,
        }
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr, **channel_options)

    def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
                        rcpt_options=None):
        # Envelope first, then payload and the per-command options.
        self.last_peer, self.last_mailfrom, self.last_rcpttos = (
            peer, mailfrom, rcpttos)
        self.last_message = data
        self.last_mail_options, self.last_rcpt_options = (
            mail_options, rcpt_options)
1379
1380
class SMTPUTF8SimTests(unittest.TestCase):
    """Run smtplib clients against a SimSMTPUTF8Server served from a thread."""

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Port 0 asks for any unused port; remember which one was assigned.
        self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
                                      decode_data=False,
                                      enable_SMTPUTF8=True)
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # Block until the server thread signals that it is running, then
        # re-arm the event for tearDown.
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # Tell the server thread the client is done, then wait for it to end.
        self.client_evt.set()
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def _make_client(self):
        # Connect to the test server and make sure the client gets closed.
        client = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(client.close)
        return client

    def _check_last_options(self):
        # Assertions shared by every test that delivers a message.
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_test_server_supports_extensions(self):
        client = self._make_client()
        client.ehlo()
        self.assertTrue(client.does_esmtp)
        self.assertTrue(client.has_extn('smtputf8'))

    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
        payload = '¡a test message containing unicode!'.encode('utf-8')
        client = self._make_client()
        client.sendmail('Jőhn', 'Sálly', payload,
                        mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
        self.assertEqual(self.serv.last_message, payload)
        self._check_last_options()

    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
        payload = '¡a test message containing unicode!'.encode('utf-8')
        client = self._make_client()
        client.ehlo()
        mail_reply = client.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertEqual(mail_reply, (250, b'OK'))
        self.assertEqual(client.rcpt('János'), (250, b'OK'))
        self.assertEqual(client.data(payload), (250, b'OK'))
        self.assertEqual(self.serv.last_mailfrom, 'Jő')
        self.assertEqual(self.serv.last_rcpttos, ['János'])
        self.assertEqual(self.serv.last_message, payload)
        self._check_last_options()

    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        # XXX I don't know why I need two \n's here, but this is an existing
        # bug (if it is one) and not a problem with the new functionality.
        msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
        # XXX smtpd converts received /r/n to /n, so we can't easily test that
        # we are successfully sending /r/n :(.
        expected = textwrap.dedent("""\
            From: Páolo <főo@bar.com>
            To: Dinsdale
            Subject: Nudge nudge, wink, wink \u1F609
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            oh là là, know what I mean, know what I mean?
            """)
        client = self._make_client()
        self.assertEqual(client.send_message(msg), {})
        self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
        self.assertEqual(self.serv.last_message.decode(), expected)
        self._check_last_options()
1490
1491
# Base64 form of the AUTH PLAIN initial response the tests below send when
# logging in as user 'psu' with password 'doesnotexist'.
EXPECTED_RESPONSE = encode_base64(
    b'\0psu\0doesnotexist',
    eol='',
)
1493
class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
    """Channel that accepts only AUTH PLAIN carrying EXPECTED_RESPONSE."""

    def smtp_AUTH(self, arg):
        """Reply 235 to ``AUTH PLAIN <EXPECTED_RESPONSE>`` and 571 otherwise.

        A bare ``AUTH`` with no mechanism gets a 501 syntax error.
        """
        # RFC 4954's AUTH command allows for an optional initial-response.
        # Not all AUTH methods support this; some require a challenge.  AUTH
        # PLAIN accepts one, so test that here.  See issue #15014.
        args = arg.split() if arg else []
        if not args:
            # No mechanism given: answer with a syntax error instead of
            # letting args[0] raise inside the server thread.
            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
            return
        mechanism, *rest = args
        # AUTH PLAIN <initial-response> with the response base 64 encoded.
        # The expected response is hard coded for the test.
        if (mechanism.lower() == 'plain' and len(rest) == 1
                and rest[0] == EXPECTED_RESPONSE):
            self.push('235 Ok')
            return
        self.push('571 Bad authentication')
1508
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
    """SimSMTPServer whose channels are SimSMTPAUTHInitialResponseChannel."""

    channel_class = SimSMTPAUTHInitialResponseChannel
1511
1512
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
    """Check that AUTH PLAIN is sent with an initial response.

    The server side only accepts ``AUTH PLAIN <EXPECTED_RESPONSE>``, so both
    tests succeed only when the client sends the credentials with the AUTH
    command itself.
    """

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPAUTHInitialResponseServer(
            (HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def testAUTH_PLAIN_initial_response_login(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the client even if login() raises.
        self.addCleanup(smtp.close)
        code, _ = smtp.login('psu', 'doesnotexist')
        self.assertEqual(code, 235)
        smtp.close()

    def testAUTH_PLAIN_initial_response_auth(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the client even if auth() raises.
        self.addCleanup(smtp.close)
        smtp.user = 'psu'
        smtp.password = 'doesnotexist'
        code, _ = smtp.auth('plain', smtp.auth_plain)
        smtp.close()
        self.assertEqual(code, 235)
1560
1561
# Run the test cases defined in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()