python (3.12.0)

(root)/
lib/
python3.12/
test/
test_smtplib.py
       1  import base64
       2  import email.mime.text
       3  from email.message import EmailMessage
       4  from email.base64mime import body_encode as encode_base64
       5  import email.utils
       6  import hashlib
       7  import hmac
       8  import socket
       9  import smtplib
      10  import io
      11  import re
      12  import sys
      13  import time
      14  import select
      15  import errno
      16  import textwrap
      17  import threading
      18  
      19  import unittest
      20  from test import support, mock_socket
      21  from test.support import hashlib_helper
      22  from test.support import socket_helper
      23  from test.support import threading_helper
      24  from test.support import asyncore
      25  from unittest.mock import Mock
      26  
      27  from . import smtpd
      28  
      29  
      30  support.requires_working_socket(module=True)
      31  
      32  HOST = socket_helper.HOST
      33  
      34  if sys.platform == 'darwin':
      35      # select.poll returns a select.POLLHUP at the end of the tests
      36      # on darwin, so just ignore it
      37      def handle_expt(self):
      38          pass
      39      smtpd.SMTPChannel.handle_expt = handle_expt
      40  
      41  
      42  def server(evt, buf, serv):
      43      serv.listen()
      44      evt.set()
      45      try:
      46          conn, addr = serv.accept()
      47      except TimeoutError:
      48          pass
      49      else:
      50          n = 500
      51          while buf and n > 0:
      52              r, w, e = select.select([], [conn], [])
      53              if w:
      54                  sent = conn.send(buf)
      55                  buf = buf[sent:]
      56  
      57              n -= 1
      58  
      59          conn.close()
      60      finally:
      61          serv.close()
      62          evt.set()
      63  
      64  class ESC[4;38;5;81mGeneralTests:
      65  
      66      def setUp(self):
      67          smtplib.socket = mock_socket
      68          self.port = 25
      69  
      70      def tearDown(self):
      71          smtplib.socket = socket
      72  
      73      # This method is no longer used but is retained for backward compatibility,
      74      # so test to make sure it still works.
      75      def testQuoteData(self):
      76          teststr  = "abc\n.jkl\rfoo\r\n..blue"
      77          expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
      78          self.assertEqual(expected, smtplib.quotedata(teststr))
      79  
      80      def testBasic1(self):
      81          mock_socket.reply_with(b"220 Hola mundo")
      82          # connects
      83          client = self.client(HOST, self.port)
      84          client.close()
      85  
      86      def testSourceAddress(self):
      87          mock_socket.reply_with(b"220 Hola mundo")
      88          # connects
      89          client = self.client(HOST, self.port,
      90                               source_address=('127.0.0.1',19876))
      91          self.assertEqual(client.source_address, ('127.0.0.1', 19876))
      92          client.close()
      93  
      94      def testBasic2(self):
      95          mock_socket.reply_with(b"220 Hola mundo")
      96          # connects, include port in host name
      97          client = self.client("%s:%s" % (HOST, self.port))
      98          client.close()
      99  
     100      def testLocalHostName(self):
     101          mock_socket.reply_with(b"220 Hola mundo")
     102          # check that supplied local_hostname is used
     103          client = self.client(HOST, self.port, local_hostname="testhost")
     104          self.assertEqual(client.local_hostname, "testhost")
     105          client.close()
     106  
     107      def testTimeoutDefault(self):
     108          mock_socket.reply_with(b"220 Hola mundo")
     109          self.assertIsNone(mock_socket.getdefaulttimeout())
     110          mock_socket.setdefaulttimeout(30)
     111          self.assertEqual(mock_socket.getdefaulttimeout(), 30)
     112          try:
     113              client = self.client(HOST, self.port)
     114          finally:
     115              mock_socket.setdefaulttimeout(None)
     116          self.assertEqual(client.sock.gettimeout(), 30)
     117          client.close()
     118  
     119      def testTimeoutNone(self):
     120          mock_socket.reply_with(b"220 Hola mundo")
     121          self.assertIsNone(socket.getdefaulttimeout())
     122          socket.setdefaulttimeout(30)
     123          try:
     124              client = self.client(HOST, self.port, timeout=None)
     125          finally:
     126              socket.setdefaulttimeout(None)
     127          self.assertIsNone(client.sock.gettimeout())
     128          client.close()
     129  
     130      def testTimeoutZero(self):
     131          mock_socket.reply_with(b"220 Hola mundo")
     132          with self.assertRaises(ValueError):
     133              self.client(HOST, self.port, timeout=0)
     134  
     135      def testTimeoutValue(self):
     136          mock_socket.reply_with(b"220 Hola mundo")
     137          client = self.client(HOST, self.port, timeout=30)
     138          self.assertEqual(client.sock.gettimeout(), 30)
     139          client.close()
     140  
     141      def test_debuglevel(self):
     142          mock_socket.reply_with(b"220 Hello world")
     143          client = self.client()
     144          client.set_debuglevel(1)
     145          with support.captured_stderr() as stderr:
     146              client.connect(HOST, self.port)
     147          client.close()
     148          expected = re.compile(r"^connect:", re.MULTILINE)
     149          self.assertRegex(stderr.getvalue(), expected)
     150  
     151      def test_debuglevel_2(self):
     152          mock_socket.reply_with(b"220 Hello world")
     153          client = self.client()
     154          client.set_debuglevel(2)
     155          with support.captured_stderr() as stderr:
     156              client.connect(HOST, self.port)
     157          client.close()
     158          expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
     159                                re.MULTILINE)
     160          self.assertRegex(stderr.getvalue(), expected)
     161  
     162  
     163  class ESC[4;38;5;81mSMTPGeneralTests(ESC[4;38;5;149mGeneralTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     164  
     165      client = smtplib.SMTP
     166  
     167  
     168  class ESC[4;38;5;81mLMTPGeneralTests(ESC[4;38;5;149mGeneralTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     169  
     170      client = smtplib.LMTP
     171  
     172      @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket")
     173      def testUnixDomainSocketTimeoutDefault(self):
     174          local_host = '/some/local/lmtp/delivery/program'
     175          mock_socket.reply_with(b"220 Hello world")
     176          try:
     177              client = self.client(local_host, self.port)
     178          finally:
     179              mock_socket.setdefaulttimeout(None)
     180          self.assertIsNone(client.sock.gettimeout())
     181          client.close()
     182  
     183      def testTimeoutZero(self):
     184          super().testTimeoutZero()
     185          local_host = '/some/local/lmtp/delivery/program'
     186          with self.assertRaises(ValueError):
     187              self.client(local_host, timeout=0)
     188  
     189  # Test server thread using the specified SMTP server class
     190  def debugging_server(serv, serv_evt, client_evt):
     191      serv_evt.set()
     192  
     193      try:
     194          if hasattr(select, 'poll'):
     195              poll_fun = asyncore.poll2
     196          else:
     197              poll_fun = asyncore.poll
     198  
     199          n = 1000
     200          while asyncore.socket_map and n > 0:
     201              poll_fun(0.01, asyncore.socket_map)
     202  
     203              # when the client conversation is finished, it will
     204              # set client_evt, and it's then ok to kill the server
     205              if client_evt.is_set():
     206                  serv.close()
     207                  break
     208  
     209              n -= 1
     210  
     211      except TimeoutError:
     212          pass
     213      finally:
     214          if not client_evt.is_set():
     215              # allow some time for the client to read the result
     216              time.sleep(0.5)
     217              serv.close()
     218          asyncore.close_all()
     219          serv_evt.set()
     220  
     221  MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
     222  MSG_END = '------------ END MESSAGE ------------\n'
     223  
     224  # NOTE: Some SMTP objects in the tests below are created with a non-default
     225  # local_hostname argument to the constructor, since (on some systems) the FQDN
     226  # lookup caused by the default local_hostname sometimes takes so long that the
     227  # test server times out, causing the test to fail.
     228  
     229  # Test behavior of smtpd.DebuggingServer
     230  class ESC[4;38;5;81mDebuggingServerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     231  
     232      maxDiff = None
     233  
     234      def setUp(self):
     235          self.thread_key = threading_helper.threading_setup()
     236          self.real_getfqdn = socket.getfqdn
     237          socket.getfqdn = mock_socket.getfqdn
     238          # temporarily replace sys.stdout to capture DebuggingServer output
     239          self.old_stdout = sys.stdout
     240          self.output = io.StringIO()
     241          sys.stdout = self.output
     242  
     243          self.serv_evt = threading.Event()
     244          self.client_evt = threading.Event()
     245          # Capture SMTPChannel debug output
     246          self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
     247          smtpd.DEBUGSTREAM = io.StringIO()
     248          # Pick a random unused port by passing 0 for the port number
     249          self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
     250                                            decode_data=True)
     251          # Keep a note of what server host and port were assigned
     252          self.host, self.port = self.serv.socket.getsockname()[:2]
     253          serv_args = (self.serv, self.serv_evt, self.client_evt)
     254          self.thread = threading.Thread(target=debugging_server, args=serv_args)
     255          self.thread.start()
     256  
     257          # wait until server thread has assigned a port number
     258          self.serv_evt.wait()
     259          self.serv_evt.clear()
     260  
     261      def tearDown(self):
     262          socket.getfqdn = self.real_getfqdn
     263          # indicate that the client is finished
     264          self.client_evt.set()
     265          # wait for the server thread to terminate
     266          self.serv_evt.wait()
     267          threading_helper.join_thread(self.thread)
     268          # restore sys.stdout
     269          sys.stdout = self.old_stdout
     270          # restore DEBUGSTREAM
     271          smtpd.DEBUGSTREAM.close()
     272          smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
     273          del self.thread
     274          self.doCleanups()
     275          threading_helper.threading_cleanup(*self.thread_key)
     276  
     277      def get_output_without_xpeer(self):
     278          test_output = self.output.getvalue()
     279          return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
     280                        test_output, flags=re.MULTILINE|re.DOTALL)
     281  
     282      def testBasic(self):
     283          # connect
     284          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     285                              timeout=support.LOOPBACK_TIMEOUT)
     286          smtp.quit()
     287  
     288      def testSourceAddress(self):
     289          # connect
     290          src_port = socket_helper.find_unused_port()
     291          try:
     292              smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
     293                                  timeout=support.LOOPBACK_TIMEOUT,
     294                                  source_address=(self.host, src_port))
     295              self.addCleanup(smtp.close)
     296              self.assertEqual(smtp.source_address, (self.host, src_port))
     297              self.assertEqual(smtp.local_hostname, 'localhost')
     298              smtp.quit()
     299          except OSError as e:
     300              if e.errno == errno.EADDRINUSE:
     301                  self.skipTest("couldn't bind to source port %d" % src_port)
     302              raise
     303  
     304      def testNOOP(self):
     305          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     306                              timeout=support.LOOPBACK_TIMEOUT)
     307          self.addCleanup(smtp.close)
     308          expected = (250, b'OK')
     309          self.assertEqual(smtp.noop(), expected)
     310          smtp.quit()
     311  
     312      def testRSET(self):
     313          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     314                              timeout=support.LOOPBACK_TIMEOUT)
     315          self.addCleanup(smtp.close)
     316          expected = (250, b'OK')
     317          self.assertEqual(smtp.rset(), expected)
     318          smtp.quit()
     319  
     320      def testELHO(self):
     321          # EHLO isn't implemented in DebuggingServer
     322          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     323                              timeout=support.LOOPBACK_TIMEOUT)
     324          self.addCleanup(smtp.close)
     325          expected = (250, b'\nSIZE 33554432\nHELP')
     326          self.assertEqual(smtp.ehlo(), expected)
     327          smtp.quit()
     328  
     329      def testEXPNNotImplemented(self):
     330          # EXPN isn't implemented in DebuggingServer
     331          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     332                              timeout=support.LOOPBACK_TIMEOUT)
     333          self.addCleanup(smtp.close)
     334          expected = (502, b'EXPN not implemented')
     335          smtp.putcmd('EXPN')
     336          self.assertEqual(smtp.getreply(), expected)
     337          smtp.quit()
     338  
     339      def test_issue43124_putcmd_escapes_newline(self):
     340          # see: https://bugs.python.org/issue43124
     341          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     342                              timeout=support.LOOPBACK_TIMEOUT)
     343          self.addCleanup(smtp.close)
     344          with self.assertRaises(ValueError) as exc:
     345              smtp.putcmd('helo\nX-INJECTED')
     346          self.assertIn("prohibited newline characters", str(exc.exception))
     347          smtp.quit()
     348  
     349      def testVRFY(self):
     350          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     351                              timeout=support.LOOPBACK_TIMEOUT)
     352          self.addCleanup(smtp.close)
     353          expected = (252, b'Cannot VRFY user, but will accept message ' + \
     354                           b'and attempt delivery')
     355          self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
     356          self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
     357          smtp.quit()
     358  
     359      def testSecondHELO(self):
     360          # check that a second HELO returns a message that it's a duplicate
     361          # (this behavior is specific to smtpd.SMTPChannel)
     362          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     363                              timeout=support.LOOPBACK_TIMEOUT)
     364          self.addCleanup(smtp.close)
     365          smtp.helo()
     366          expected = (503, b'Duplicate HELO/EHLO')
     367          self.assertEqual(smtp.helo(), expected)
     368          smtp.quit()
     369  
     370      def testHELP(self):
     371          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     372                              timeout=support.LOOPBACK_TIMEOUT)
     373          self.addCleanup(smtp.close)
     374          self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
     375                                        b'RCPT DATA RSET NOOP QUIT VRFY')
     376          smtp.quit()
     377  
     378      def testSend(self):
     379          # connect and send mail
     380          m = 'A test message'
     381          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     382                              timeout=support.LOOPBACK_TIMEOUT)
     383          self.addCleanup(smtp.close)
     384          smtp.sendmail('John', 'Sally', m)
     385          # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
     386          # in asyncore.  This sleep might help, but should really be fixed
     387          # properly by using an Event variable.
     388          time.sleep(0.01)
     389          smtp.quit()
     390  
     391          self.client_evt.set()
     392          self.serv_evt.wait()
     393          self.output.flush()
     394          mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
     395          self.assertEqual(self.output.getvalue(), mexpect)
     396  
     397      def testSendBinary(self):
     398          m = b'A test message'
     399          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     400                              timeout=support.LOOPBACK_TIMEOUT)
     401          self.addCleanup(smtp.close)
     402          smtp.sendmail('John', 'Sally', m)
     403          # XXX (see comment in testSend)
     404          time.sleep(0.01)
     405          smtp.quit()
     406  
     407          self.client_evt.set()
     408          self.serv_evt.wait()
     409          self.output.flush()
     410          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
     411          self.assertEqual(self.output.getvalue(), mexpect)
     412  
     413      def testSendNeedingDotQuote(self):
     414          # Issue 12283
     415          m = '.A test\n.mes.sage.'
     416          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     417                              timeout=support.LOOPBACK_TIMEOUT)
     418          self.addCleanup(smtp.close)
     419          smtp.sendmail('John', 'Sally', m)
     420          # XXX (see comment in testSend)
     421          time.sleep(0.01)
     422          smtp.quit()
     423  
     424          self.client_evt.set()
     425          self.serv_evt.wait()
     426          self.output.flush()
     427          mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
     428          self.assertEqual(self.output.getvalue(), mexpect)
     429  
     430      def test_issue43124_escape_localhostname(self):
     431          # see: https://bugs.python.org/issue43124
     432          # connect and send mail
     433          m = 'wazzuuup\nlinetwo'
     434          smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED',
     435                              timeout=support.LOOPBACK_TIMEOUT)
     436          self.addCleanup(smtp.close)
     437          with self.assertRaises(ValueError) as exc:
     438              smtp.sendmail("hi@me.com", "you@me.com", m)
     439          self.assertIn(
     440              "prohibited newline characters: ehlo hi\\nX-INJECTED",
     441              str(exc.exception),
     442          )
     443          # XXX (see comment in testSend)
     444          time.sleep(0.01)
     445          smtp.quit()
     446  
     447          debugout = smtpd.DEBUGSTREAM.getvalue()
     448          self.assertNotIn("X-INJECTED", debugout)
     449  
     450      def test_issue43124_escape_options(self):
     451          # see: https://bugs.python.org/issue43124
     452          # connect and send mail
     453          m = 'wazzuuup\nlinetwo'
     454          smtp = smtplib.SMTP(
     455              HOST, self.port, local_hostname='localhost',
     456              timeout=support.LOOPBACK_TIMEOUT)
     457  
     458          self.addCleanup(smtp.close)
     459          smtp.sendmail("hi@me.com", "you@me.com", m)
     460          with self.assertRaises(ValueError) as exc:
     461              smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
     462          msg = str(exc.exception)
     463          self.assertIn("prohibited newline characters", msg)
     464          self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg)
     465          # XXX (see comment in testSend)
     466          time.sleep(0.01)
     467          smtp.quit()
     468  
     469          debugout = smtpd.DEBUGSTREAM.getvalue()
     470          self.assertNotIn("X-OPTION", debugout)
     471          self.assertNotIn("X-OPTION2", debugout)
     472          self.assertNotIn("X-INJECTED-1", debugout)
     473          self.assertNotIn("X-INJECTED-2", debugout)
     474  
     475      def testSendNullSender(self):
     476          m = 'A test message'
     477          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     478                              timeout=support.LOOPBACK_TIMEOUT)
     479          self.addCleanup(smtp.close)
     480          smtp.sendmail('<>', 'Sally', m)
     481          # XXX (see comment in testSend)
     482          time.sleep(0.01)
     483          smtp.quit()
     484  
     485          self.client_evt.set()
     486          self.serv_evt.wait()
     487          self.output.flush()
     488          mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
     489          self.assertEqual(self.output.getvalue(), mexpect)
     490          debugout = smtpd.DEBUGSTREAM.getvalue()
     491          sender = re.compile("^sender: <>$", re.MULTILINE)
     492          self.assertRegex(debugout, sender)
     493  
     494      def testSendMessage(self):
     495          m = email.mime.text.MIMEText('A test message')
     496          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     497                              timeout=support.LOOPBACK_TIMEOUT)
     498          self.addCleanup(smtp.close)
     499          smtp.send_message(m, from_addr='John', to_addrs='Sally')
     500          # XXX (see comment in testSend)
     501          time.sleep(0.01)
     502          smtp.quit()
     503  
     504          self.client_evt.set()
     505          self.serv_evt.wait()
     506          self.output.flush()
     507          # Remove the X-Peer header that DebuggingServer adds as figuring out
     508          # exactly what IP address format is put there is not easy (and
     509          # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
     510          # not always the same as socket.gethostbyname(HOST). :(
     511          test_output = self.get_output_without_xpeer()
     512          del m['X-Peer']
     513          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     514          self.assertEqual(test_output, mexpect)
     515  
     516      def testSendMessageWithAddresses(self):
     517          m = email.mime.text.MIMEText('A test message')
     518          m['From'] = 'foo@bar.com'
     519          m['To'] = 'John'
     520          m['CC'] = 'Sally, Fred'
     521          m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
     522          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     523                              timeout=support.LOOPBACK_TIMEOUT)
     524          self.addCleanup(smtp.close)
     525          smtp.send_message(m)
     526          # XXX (see comment in testSend)
     527          time.sleep(0.01)
     528          smtp.quit()
     529          # make sure the Bcc header is still in the message.
     530          self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
     531                                      '<warped@silly.walks.com>')
     532  
     533          self.client_evt.set()
     534          self.serv_evt.wait()
     535          self.output.flush()
     536          # Remove the X-Peer header that DebuggingServer adds.
     537          test_output = self.get_output_without_xpeer()
     538          del m['X-Peer']
     539          # The Bcc header should not be transmitted.
     540          del m['Bcc']
     541          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     542          self.assertEqual(test_output, mexpect)
     543          debugout = smtpd.DEBUGSTREAM.getvalue()
     544          sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
     545          self.assertRegex(debugout, sender)
     546          for addr in ('John', 'Sally', 'Fred', 'root@localhost',
     547                       'warped@silly.walks.com'):
     548              to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
     549                                   re.MULTILINE)
     550              self.assertRegex(debugout, to_addr)
     551  
     552      def testSendMessageWithSomeAddresses(self):
     553          # Make sure nothing breaks if not all of the three 'to' headers exist
     554          m = email.mime.text.MIMEText('A test message')
     555          m['From'] = 'foo@bar.com'
     556          m['To'] = 'John, Dinsdale'
     557          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     558                              timeout=support.LOOPBACK_TIMEOUT)
     559          self.addCleanup(smtp.close)
     560          smtp.send_message(m)
     561          # XXX (see comment in testSend)
     562          time.sleep(0.01)
     563          smtp.quit()
     564  
     565          self.client_evt.set()
     566          self.serv_evt.wait()
     567          self.output.flush()
     568          # Remove the X-Peer header that DebuggingServer adds.
     569          test_output = self.get_output_without_xpeer()
     570          del m['X-Peer']
     571          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     572          self.assertEqual(test_output, mexpect)
     573          debugout = smtpd.DEBUGSTREAM.getvalue()
     574          sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
     575          self.assertRegex(debugout, sender)
     576          for addr in ('John', 'Dinsdale'):
     577              to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
     578                                   re.MULTILINE)
     579              self.assertRegex(debugout, to_addr)
     580  
     581      def testSendMessageWithSpecifiedAddresses(self):
     582          # Make sure addresses specified in call override those in message.
     583          m = email.mime.text.MIMEText('A test message')
     584          m['From'] = 'foo@bar.com'
     585          m['To'] = 'John, Dinsdale'
     586          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     587                              timeout=support.LOOPBACK_TIMEOUT)
     588          self.addCleanup(smtp.close)
     589          smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
     590          # XXX (see comment in testSend)
     591          time.sleep(0.01)
     592          smtp.quit()
     593  
     594          self.client_evt.set()
     595          self.serv_evt.wait()
     596          self.output.flush()
     597          # Remove the X-Peer header that DebuggingServer adds.
     598          test_output = self.get_output_without_xpeer()
     599          del m['X-Peer']
     600          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     601          self.assertEqual(test_output, mexpect)
     602          debugout = smtpd.DEBUGSTREAM.getvalue()
     603          sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
     604          self.assertRegex(debugout, sender)
     605          for addr in ('John', 'Dinsdale'):
     606              to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
     607                                   re.MULTILINE)
     608              self.assertNotRegex(debugout, to_addr)
     609          recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
     610          self.assertRegex(debugout, recip)
     611  
     612      def testSendMessageWithMultipleFrom(self):
     613          # Sender overrides To
     614          m = email.mime.text.MIMEText('A test message')
     615          m['From'] = 'Bernard, Bianca'
     616          m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
     617          m['To'] = 'John, Dinsdale'
     618          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     619                              timeout=support.LOOPBACK_TIMEOUT)
     620          self.addCleanup(smtp.close)
     621          smtp.send_message(m)
     622          # XXX (see comment in testSend)
     623          time.sleep(0.01)
     624          smtp.quit()
     625  
     626          self.client_evt.set()
     627          self.serv_evt.wait()
     628          self.output.flush()
     629          # Remove the X-Peer header that DebuggingServer adds.
     630          test_output = self.get_output_without_xpeer()
     631          del m['X-Peer']
     632          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     633          self.assertEqual(test_output, mexpect)
     634          debugout = smtpd.DEBUGSTREAM.getvalue()
     635          sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
     636          self.assertRegex(debugout, sender)
     637          for addr in ('John', 'Dinsdale'):
     638              to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
     639                                   re.MULTILINE)
     640              self.assertRegex(debugout, to_addr)
     641  
     642      def testSendMessageResent(self):
     643          m = email.mime.text.MIMEText('A test message')
     644          m['From'] = 'foo@bar.com'
     645          m['To'] = 'John'
     646          m['CC'] = 'Sally, Fred'
     647          m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
     648          m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
     649          m['Resent-From'] = 'holy@grail.net'
     650          m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
     651          m['Resent-Bcc'] = 'doe@losthope.net'
     652          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     653                              timeout=support.LOOPBACK_TIMEOUT)
     654          self.addCleanup(smtp.close)
     655          smtp.send_message(m)
     656          # XXX (see comment in testSend)
     657          time.sleep(0.01)
     658          smtp.quit()
     659  
     660          self.client_evt.set()
     661          self.serv_evt.wait()
     662          self.output.flush()
     663          # The Resent-Bcc headers are deleted before serialization.
     664          del m['Bcc']
     665          del m['Resent-Bcc']
     666          # Remove the X-Peer header that DebuggingServer adds.
     667          test_output = self.get_output_without_xpeer()
     668          del m['X-Peer']
     669          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     670          self.assertEqual(test_output, mexpect)
     671          debugout = smtpd.DEBUGSTREAM.getvalue()
     672          sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
     673          self.assertRegex(debugout, sender)
     674          for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
     675              to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
     676                                   re.MULTILINE)
     677              self.assertRegex(debugout, to_addr)
     678  
     679      def testSendMessageMultipleResentRaises(self):
     680          m = email.mime.text.MIMEText('A test message')
     681          m['From'] = 'foo@bar.com'
     682          m['To'] = 'John'
     683          m['CC'] = 'Sally, Fred'
     684          m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
     685          m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
     686          m['Resent-From'] = 'holy@grail.net'
     687          m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
     688          m['Resent-Bcc'] = 'doe@losthope.net'
     689          m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
     690          m['Resent-To'] = 'holy@grail.net'
     691          m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
     692          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     693                              timeout=support.LOOPBACK_TIMEOUT)
     694          self.addCleanup(smtp.close)
     695          with self.assertRaises(ValueError):
     696              smtp.send_message(m)
     697          smtp.close()
     698  
     699  class ESC[4;38;5;81mNonConnectingTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     700  
     701      def testNotConnected(self):
     702          # Test various operations on an unconnected SMTP object that
     703          # should raise exceptions (at present the attempt in SMTP.send
     704          # to reference the nonexistent 'sock' attribute of the SMTP object
     705          # causes an AttributeError)
     706          smtp = smtplib.SMTP()
     707          self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
     708          self.assertRaises(smtplib.SMTPServerDisconnected,
     709                            smtp.send, 'test msg')
     710  
     711      def testNonnumericPort(self):
     712          # check that non-numeric port raises OSError
     713          self.assertRaises(OSError, smtplib.SMTP,
     714                            "localhost", "bogus")
     715          self.assertRaises(OSError, smtplib.SMTP,
     716                            "localhost:bogus")
     717  
     718      def testSockAttributeExists(self):
     719          # check that sock attribute is present outside of a connect() call
     720          # (regression test, the previous behavior raised an
     721          #  AttributeError: 'SMTP' object has no attribute 'sock')
     722          with smtplib.SMTP() as smtp:
     723              self.assertIsNone(smtp.sock)
     724  
     725  
     726  class ESC[4;38;5;81mDefaultArgumentsTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     727  
     728      def setUp(self):
     729          self.msg = EmailMessage()
     730          self.msg['From'] = 'Páolo <főo@bar.com>'
     731          self.smtp = smtplib.SMTP()
     732          self.smtp.ehlo = Mock(return_value=(200, 'OK'))
     733          self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock()
     734  
     735      def testSendMessage(self):
     736          expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
     737          self.smtp.send_message(self.msg)
     738          self.smtp.send_message(self.msg)
     739          self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
     740                           expected_mail_options)
     741          self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3],
     742                           expected_mail_options)
     743  
     744      def testSendMessageWithMailOptions(self):
     745          mail_options = ['STARTTLS']
     746          expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
     747          self.smtp.send_message(self.msg, None, None, mail_options)
     748          self.assertEqual(mail_options, ['STARTTLS'])
     749          self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
     750                           expected_mail_options)
     751  
     752  
     753  # test response of client to a non-successful HELO message
     754  class ESC[4;38;5;81mBadHELOServerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     755  
     756      def setUp(self):
     757          smtplib.socket = mock_socket
     758          mock_socket.reply_with(b"199 no hello for you!")
     759          self.old_stdout = sys.stdout
     760          self.output = io.StringIO()
     761          sys.stdout = self.output
     762          self.port = 25
     763  
     764      def tearDown(self):
     765          smtplib.socket = socket
     766          sys.stdout = self.old_stdout
     767  
     768      def testFailingHELO(self):
     769          self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
     770                              HOST, self.port, 'localhost', 3)
     771  
     772  
     773  class ESC[4;38;5;81mTooLongLineTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     774      respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'
     775  
     776      def setUp(self):
     777          self.thread_key = threading_helper.threading_setup()
     778          self.old_stdout = sys.stdout
     779          self.output = io.StringIO()
     780          sys.stdout = self.output
     781  
     782          self.evt = threading.Event()
     783          self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     784          self.sock.settimeout(15)
     785          self.port = socket_helper.bind_port(self.sock)
     786          servargs = (self.evt, self.respdata, self.sock)
     787          self.thread = threading.Thread(target=server, args=servargs)
     788          self.thread.start()
     789          self.evt.wait()
     790          self.evt.clear()
     791  
     792      def tearDown(self):
     793          self.evt.wait()
     794          sys.stdout = self.old_stdout
     795          threading_helper.join_thread(self.thread)
     796          del self.thread
     797          self.doCleanups()
     798          threading_helper.threading_cleanup(*self.thread_key)
     799  
     800      def testLineTooLong(self):
     801          self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
     802                            HOST, self.port, 'localhost', 3)
     803  
     804  
     805  sim_users = {'Mr.A@somewhere.com':'John A',
     806               'Ms.B@xn--fo-fka.com':'Sally B',
     807               'Mrs.C@somewhereesle.com':'Ruth C',
     808              }
     809  
     810  sim_auth = ('Mr.A@somewhere.com', 'somepassword')
     811  sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
     812                            'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
     813  sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
     814               'list-2':['Ms.B@xn--fo-fka.com',],
     815              }
     816  
     817  # Simulated SMTP channel & server
     818  class ESC[4;38;5;81mResponseException(ESC[4;38;5;149mException): pass
     819  class ESC[4;38;5;81mSimSMTPChannel(ESC[4;38;5;149msmtpdESC[4;38;5;149m.ESC[4;38;5;149mSMTPChannel):
     820  
     821      quit_response = None
     822      mail_response = None
     823      rcpt_response = None
     824      data_response = None
     825      rcpt_count = 0
     826      rset_count = 0
     827      disconnect = 0
     828      AUTH = 99    # Add protocol state to enable auth testing.
     829      authenticated_user = None
     830  
     831      def __init__(self, extra_features, *args, **kw):
     832          self._extrafeatures = ''.join(
     833              [ "250-{0}\r\n".format(x) for x in extra_features ])
     834          super(SimSMTPChannel, self).__init__(*args, **kw)
     835  
     836      # AUTH related stuff.  It would be nice if support for this were in smtpd.
     837      def found_terminator(self):
     838          if self.smtp_state == self.AUTH:
     839              line = self._emptystring.join(self.received_lines)
     840              print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
     841              self.received_lines = []
     842              try:
     843                  self.auth_object(line)
     844              except ResponseException as e:
     845                  self.smtp_state = self.COMMAND
     846                  self.push('%s %s' % (e.smtp_code, e.smtp_error))
     847              return
     848          super().found_terminator()
     849  
     850  
     851      def smtp_AUTH(self, arg):
     852          if not self.seen_greeting:
     853              self.push('503 Error: send EHLO first')
     854              return
     855          if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
     856              self.push('500 Error: command "AUTH" not recognized')
     857              return
     858          if self.authenticated_user is not None:
     859              self.push(
     860                  '503 Bad sequence of commands: already authenticated')
     861              return
     862          args = arg.split()
     863          if len(args) not in [1, 2]:
     864              self.push('501 Syntax: AUTH <mechanism> [initial-response]')
     865              return
     866          auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
     867          try:
     868              self.auth_object = getattr(self, auth_object_name)
     869          except AttributeError:
     870              self.push('504 Command parameter not implemented: unsupported '
     871                        ' authentication mechanism {!r}'.format(auth_object_name))
     872              return
     873          self.smtp_state = self.AUTH
     874          self.auth_object(args[1] if len(args) == 2 else None)
     875  
     876      def _authenticated(self, user, valid):
     877          if valid:
     878              self.authenticated_user = user
     879              self.push('235 Authentication Succeeded')
     880          else:
     881              self.push('535 Authentication credentials invalid')
     882          self.smtp_state = self.COMMAND
     883  
     884      def _decode_base64(self, string):
     885          return base64.decodebytes(string.encode('ascii')).decode('utf-8')
     886  
     887      def _auth_plain(self, arg=None):
     888          if arg is None:
     889              self.push('334 ')
     890          else:
     891              logpass = self._decode_base64(arg)
     892              try:
     893                  *_, user, password = logpass.split('\0')
     894              except ValueError as e:
     895                  self.push('535 Splitting response {!r} into user and password'
     896                            ' failed: {}'.format(logpass, e))
     897                  return
     898              self._authenticated(user, password == sim_auth[1])
     899  
     900      def _auth_login(self, arg=None):
     901          if arg is None:
     902              # base64 encoded 'Username:'
     903              self.push('334 VXNlcm5hbWU6')
     904          elif not hasattr(self, '_auth_login_user'):
     905              self._auth_login_user = self._decode_base64(arg)
     906              # base64 encoded 'Password:'
     907              self.push('334 UGFzc3dvcmQ6')
     908          else:
     909              password = self._decode_base64(arg)
     910              self._authenticated(self._auth_login_user, password == sim_auth[1])
     911              del self._auth_login_user
     912  
     913      def _auth_buggy(self, arg=None):
     914          # This AUTH mechanism will 'trap' client in a neverending 334
     915          # base64 encoded 'BuGgYbUgGy'
     916          self.push('334 QnVHZ1liVWdHeQ==')
     917  
     918      def _auth_cram_md5(self, arg=None):
     919          if arg is None:
     920              self.push('334 {}'.format(sim_cram_md5_challenge))
     921          else:
     922              logpass = self._decode_base64(arg)
     923              try:
     924                  user, hashed_pass = logpass.split()
     925              except ValueError as e:
     926                  self.push('535 Splitting response {!r} into user and password '
     927                            'failed: {}'.format(logpass, e))
     928                  return False
     929              valid_hashed_pass = hmac.HMAC(
     930                  sim_auth[1].encode('ascii'),
     931                  self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
     932                  'md5').hexdigest()
     933              self._authenticated(user, hashed_pass == valid_hashed_pass)
     934      # end AUTH related stuff.
     935  
     936      def smtp_EHLO(self, arg):
     937          resp = ('250-testhost\r\n'
     938                  '250-EXPN\r\n'
     939                  '250-SIZE 20000000\r\n'
     940                  '250-STARTTLS\r\n'
     941                  '250-DELIVERBY\r\n')
     942          resp = resp + self._extrafeatures + '250 HELP'
     943          self.push(resp)
     944          self.seen_greeting = arg
     945          self.extended_smtp = True
     946  
     947      def smtp_VRFY(self, arg):
     948          # For max compatibility smtplib should be sending the raw address.
     949          if arg in sim_users:
     950              self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
     951          else:
     952              self.push('550 No such user: %s' % arg)
     953  
     954      def smtp_EXPN(self, arg):
     955          list_name = arg.lower()
     956          if list_name in sim_lists:
     957              user_list = sim_lists[list_name]
     958              for n, user_email in enumerate(user_list):
     959                  quoted_addr = smtplib.quoteaddr(user_email)
     960                  if n < len(user_list) - 1:
     961                      self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
     962                  else:
     963                      self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
     964          else:
     965              self.push('550 No access for you!')
     966  
     967      def smtp_QUIT(self, arg):
     968          if self.quit_response is None:
     969              super(SimSMTPChannel, self).smtp_QUIT(arg)
     970          else:
     971              self.push(self.quit_response)
     972              self.close_when_done()
     973  
     974      def smtp_MAIL(self, arg):
     975          if self.mail_response is None:
     976              super().smtp_MAIL(arg)
     977          else:
     978              self.push(self.mail_response)
     979              if self.disconnect:
     980                  self.close_when_done()
     981  
     982      def smtp_RCPT(self, arg):
     983          if self.rcpt_response is None:
     984              super().smtp_RCPT(arg)
     985              return
     986          self.rcpt_count += 1
     987          self.push(self.rcpt_response[self.rcpt_count-1])
     988  
     989      def smtp_RSET(self, arg):
     990          self.rset_count += 1
     991          super().smtp_RSET(arg)
     992  
     993      def smtp_DATA(self, arg):
     994          if self.data_response is None:
     995              super().smtp_DATA(arg)
     996          else:
     997              self.push(self.data_response)
     998  
     999      def handle_error(self):
    1000          raise
    1001  
    1002  
    1003  class ESC[4;38;5;81mSimSMTPServer(ESC[4;38;5;149msmtpdESC[4;38;5;149m.ESC[4;38;5;149mSMTPServer):
    1004  
    1005      channel_class = SimSMTPChannel
    1006  
    1007      def __init__(self, *args, **kw):
    1008          self._extra_features = []
    1009          self._addresses = {}
    1010          smtpd.SMTPServer.__init__(self, *args, **kw)
    1011  
    1012      def handle_accepted(self, conn, addr):
    1013          self._SMTPchannel = self.channel_class(
    1014              self._extra_features, self, conn, addr,
    1015              decode_data=self._decode_data)
    1016  
    1017      def process_message(self, peer, mailfrom, rcpttos, data):
    1018          self._addresses['from'] = mailfrom
    1019          self._addresses['tos'] = rcpttos
    1020  
    1021      def add_feature(self, feature):
    1022          self._extra_features.append(feature)
    1023  
    1024      def handle_error(self):
    1025          raise
    1026  
    1027  
    1028  # Test various SMTP & ESMTP commands/behaviors that require a simulated server
    1029  # (i.e., something with more features than DebuggingServer)
    1030  class ESC[4;38;5;81mSMTPSimTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1031  
    1032      def setUp(self):
    1033          self.thread_key = threading_helper.threading_setup()
    1034          self.real_getfqdn = socket.getfqdn
    1035          socket.getfqdn = mock_socket.getfqdn
    1036          self.serv_evt = threading.Event()
    1037          self.client_evt = threading.Event()
    1038          # Pick a random unused port by passing 0 for the port number
    1039          self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
    1040          # Keep a note of what port was assigned
    1041          self.port = self.serv.socket.getsockname()[1]
    1042          serv_args = (self.serv, self.serv_evt, self.client_evt)
    1043          self.thread = threading.Thread(target=debugging_server, args=serv_args)
    1044          self.thread.start()
    1045  
    1046          # wait until server thread has assigned a port number
    1047          self.serv_evt.wait()
    1048          self.serv_evt.clear()
    1049  
    1050      def tearDown(self):
    1051          socket.getfqdn = self.real_getfqdn
    1052          # indicate that the client is finished
    1053          self.client_evt.set()
    1054          # wait for the server thread to terminate
    1055          self.serv_evt.wait()
    1056          threading_helper.join_thread(self.thread)
    1057          del self.thread
    1058          self.doCleanups()
    1059          threading_helper.threading_cleanup(*self.thread_key)
    1060  
    1061      def testBasic(self):
    1062          # smoke test
    1063          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1064                              timeout=support.LOOPBACK_TIMEOUT)
    1065          smtp.quit()
    1066  
    1067      def testEHLO(self):
    1068          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1069                              timeout=support.LOOPBACK_TIMEOUT)
    1070  
    1071          # no features should be present before the EHLO
    1072          self.assertEqual(smtp.esmtp_features, {})
    1073  
    1074          # features expected from the test server
    1075          expected_features = {'expn':'',
    1076                               'size': '20000000',
    1077                               'starttls': '',
    1078                               'deliverby': '',
    1079                               'help': '',
    1080                               }
    1081  
    1082          smtp.ehlo()
    1083          self.assertEqual(smtp.esmtp_features, expected_features)
    1084          for k in expected_features:
    1085              self.assertTrue(smtp.has_extn(k))
    1086          self.assertFalse(smtp.has_extn('unsupported-feature'))
    1087          smtp.quit()
    1088  
    1089      def testVRFY(self):
    1090          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1091                              timeout=support.LOOPBACK_TIMEOUT)
    1092  
    1093          for addr_spec, name in sim_users.items():
    1094              expected_known = (250, bytes('%s %s' %
    1095                                           (name, smtplib.quoteaddr(addr_spec)),
    1096                                           "ascii"))
    1097              self.assertEqual(smtp.vrfy(addr_spec), expected_known)
    1098  
    1099          u = 'nobody@nowhere.com'
    1100          expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
    1101          self.assertEqual(smtp.vrfy(u), expected_unknown)
    1102          smtp.quit()
    1103  
    1104      def testEXPN(self):
    1105          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1106                              timeout=support.LOOPBACK_TIMEOUT)
    1107  
    1108          for listname, members in sim_lists.items():
    1109              users = []
    1110              for m in members:
    1111                  users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
    1112              expected_known = (250, bytes('\n'.join(users), "ascii"))
    1113              self.assertEqual(smtp.expn(listname), expected_known)
    1114  
    1115          u = 'PSU-Members-List'
    1116          expected_unknown = (550, b'No access for you!')
    1117          self.assertEqual(smtp.expn(u), expected_unknown)
    1118          smtp.quit()
    1119  
    1120      def testAUTH_PLAIN(self):
    1121          self.serv.add_feature("AUTH PLAIN")
    1122          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1123                              timeout=support.LOOPBACK_TIMEOUT)
    1124          resp = smtp.login(sim_auth[0], sim_auth[1])
    1125          self.assertEqual(resp, (235, b'Authentication Succeeded'))
    1126          smtp.close()
    1127  
    1128      def testAUTH_LOGIN(self):
    1129          self.serv.add_feature("AUTH LOGIN")
    1130          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1131                              timeout=support.LOOPBACK_TIMEOUT)
    1132          resp = smtp.login(sim_auth[0], sim_auth[1])
    1133          self.assertEqual(resp, (235, b'Authentication Succeeded'))
    1134          smtp.close()
    1135  
    1136      def testAUTH_LOGIN_initial_response_ok(self):
    1137          self.serv.add_feature("AUTH LOGIN")
    1138          with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1139                            timeout=support.LOOPBACK_TIMEOUT) as smtp:
    1140              smtp.user, smtp.password = sim_auth
    1141              smtp.ehlo("test_auth_login")
    1142              resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True)
    1143              self.assertEqual(resp, (235, b'Authentication Succeeded'))
    1144  
    1145      def testAUTH_LOGIN_initial_response_notok(self):
    1146          self.serv.add_feature("AUTH LOGIN")
    1147          with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1148                            timeout=support.LOOPBACK_TIMEOUT) as smtp:
    1149              smtp.user, smtp.password = sim_auth
    1150              smtp.ehlo("test_auth_login")
    1151              resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False)
    1152              self.assertEqual(resp, (235, b'Authentication Succeeded'))
    1153  
    1154      def testAUTH_BUGGY(self):
    1155          self.serv.add_feature("AUTH BUGGY")
    1156  
    1157          def auth_buggy(challenge=None):
    1158              self.assertEqual(b"BuGgYbUgGy", challenge)
    1159              return "\0"
    1160  
    1161          smtp = smtplib.SMTP(
    1162              HOST, self.port, local_hostname='localhost',
    1163              timeout=support.LOOPBACK_TIMEOUT
    1164          )
    1165          try:
    1166              smtp.user, smtp.password = sim_auth
    1167              smtp.ehlo("test_auth_buggy")
    1168              expect = r"^Server AUTH mechanism infinite loop.*"
    1169              with self.assertRaisesRegex(smtplib.SMTPException, expect) as cm:
    1170                  smtp.auth("BUGGY", auth_buggy, initial_response_ok=False)
    1171          finally:
    1172              smtp.close()
    1173  
    1174      @hashlib_helper.requires_hashdigest('md5', openssl=True)
    1175      def testAUTH_CRAM_MD5(self):
    1176          self.serv.add_feature("AUTH CRAM-MD5")
    1177          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1178                              timeout=support.LOOPBACK_TIMEOUT)
    1179          resp = smtp.login(sim_auth[0], sim_auth[1])
    1180          self.assertEqual(resp, (235, b'Authentication Succeeded'))
    1181          smtp.close()
    1182  
    1183      @hashlib_helper.requires_hashdigest('md5', openssl=True)
    1184      def testAUTH_multiple(self):
    1185          # Test that multiple authentication methods are tried.
    1186          self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
    1187          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1188                              timeout=support.LOOPBACK_TIMEOUT)
    1189          resp = smtp.login(sim_auth[0], sim_auth[1])
    1190          self.assertEqual(resp, (235, b'Authentication Succeeded'))
    1191          smtp.close()
    1192  
    1193      def test_auth_function(self):
    1194          supported = {'PLAIN', 'LOGIN'}
    1195          try:
    1196              hashlib.md5()
    1197          except ValueError:
    1198              pass
    1199          else:
    1200              supported.add('CRAM-MD5')
    1201          for mechanism in supported:
    1202              self.serv.add_feature("AUTH {}".format(mechanism))
    1203          for mechanism in supported:
    1204              with self.subTest(mechanism=mechanism):
    1205                  smtp = smtplib.SMTP(HOST, self.port,
    1206                                      local_hostname='localhost',
    1207                                      timeout=support.LOOPBACK_TIMEOUT)
    1208                  smtp.ehlo('foo')
    1209                  smtp.user, smtp.password = sim_auth[0], sim_auth[1]
    1210                  method = 'auth_' + mechanism.lower().replace('-', '_')
    1211                  resp = smtp.auth(mechanism, getattr(smtp, method))
    1212                  self.assertEqual(resp, (235, b'Authentication Succeeded'))
    1213                  smtp.close()
    1214  
    1215      def test_quit_resets_greeting(self):
    1216          smtp = smtplib.SMTP(HOST, self.port,
    1217                              local_hostname='localhost',
    1218                              timeout=support.LOOPBACK_TIMEOUT)
    1219          code, message = smtp.ehlo()
    1220          self.assertEqual(code, 250)
    1221          self.assertIn('size', smtp.esmtp_features)
    1222          smtp.quit()
    1223          self.assertNotIn('size', smtp.esmtp_features)
    1224          smtp.connect(HOST, self.port)
    1225          self.assertNotIn('size', smtp.esmtp_features)
    1226          smtp.ehlo_or_helo_if_needed()
    1227          self.assertIn('size', smtp.esmtp_features)
    1228          smtp.quit()
    1229  
    1230      def test_with_statement(self):
    1231          with smtplib.SMTP(HOST, self.port) as smtp:
    1232              code, message = smtp.noop()
    1233              self.assertEqual(code, 250)
    1234          self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
    1235          with smtplib.SMTP(HOST, self.port) as smtp:
    1236              smtp.close()
    1237          self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
    1238  
    1239      def test_with_statement_QUIT_failure(self):
    1240          with self.assertRaises(smtplib.SMTPResponseException) as error:
    1241              with smtplib.SMTP(HOST, self.port) as smtp:
    1242                  smtp.noop()
    1243                  self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
    1244          self.assertEqual(error.exception.smtp_code, 421)
    1245          self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')
    1246  
    1247      #TODO: add tests for correct AUTH method fallback now that the
    1248      #test infrastructure can support it.
    1249  
    1250      # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    1251      def test__rest_from_mail_cmd(self):
    1252          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1253                              timeout=support.LOOPBACK_TIMEOUT)
    1254          smtp.noop()
    1255          self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
    1256          self.serv._SMTPchannel.disconnect = True
    1257          with self.assertRaises(smtplib.SMTPSenderRefused):
    1258              smtp.sendmail('John', 'Sally', 'test message')
    1259          self.assertIsNone(smtp.sock)
    1260  
    1261      # Issue 5713: make sure close, not rset, is called if we get a 421 error
    1262      def test_421_from_mail_cmd(self):
    1263          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1264                              timeout=support.LOOPBACK_TIMEOUT)
    1265          smtp.noop()
    1266          self.serv._SMTPchannel.mail_response = '421 closing connection'
    1267          with self.assertRaises(smtplib.SMTPSenderRefused):
    1268              smtp.sendmail('John', 'Sally', 'test message')
    1269          self.assertIsNone(smtp.sock)
    1270          self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
    1271  
    1272      def test_421_from_rcpt_cmd(self):
    1273          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1274                              timeout=support.LOOPBACK_TIMEOUT)
    1275          smtp.noop()
    1276          self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
    1277          with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
    1278              smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
    1279          self.assertIsNone(smtp.sock)
    1280          self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
    1281          self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})
    1282  
    1283      def test_421_from_data_cmd(self):
    1284          class ESC[4;38;5;81mMySimSMTPChannel(ESC[4;38;5;149mSimSMTPChannel):
    1285              def found_terminator(self):
    1286                  if self.smtp_state == self.DATA:
    1287                      self.push('421 closing')
    1288                  else:
    1289                      super().found_terminator()
    1290          self.serv.channel_class = MySimSMTPChannel
    1291          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1292                              timeout=support.LOOPBACK_TIMEOUT)
    1293          smtp.noop()
    1294          with self.assertRaises(smtplib.SMTPDataError):
    1295              smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
    1296          self.assertIsNone(smtp.sock)
    1297          self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)
    1298  
    1299      def test_smtputf8_NotSupportedError_if_no_server_support(self):
    1300          smtp = smtplib.SMTP(
    1301              HOST, self.port, local_hostname='localhost',
    1302              timeout=support.LOOPBACK_TIMEOUT)
    1303          self.addCleanup(smtp.close)
    1304          smtp.ehlo()
    1305          self.assertTrue(smtp.does_esmtp)
    1306          self.assertFalse(smtp.has_extn('smtputf8'))
    1307          self.assertRaises(
    1308              smtplib.SMTPNotSupportedError,
    1309              smtp.sendmail,
    1310              'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
    1311          self.assertRaises(
    1312              smtplib.SMTPNotSupportedError,
    1313              smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])
    1314  
    1315      def test_send_unicode_without_SMTPUTF8(self):
    1316          smtp = smtplib.SMTP(
    1317              HOST, self.port, local_hostname='localhost',
    1318              timeout=support.LOOPBACK_TIMEOUT)
    1319          self.addCleanup(smtp.close)
    1320          self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
    1321          self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')
    1322  
    1323      def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
    1324          # This test is located here and not in the SMTPUTF8SimTests
    1325          # class because it needs a "regular" SMTP server to work
    1326          msg = EmailMessage()
    1327          msg['From'] = "Páolo <főo@bar.com>"
    1328          msg['To'] = 'Dinsdale'
    1329          msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
    1330          smtp = smtplib.SMTP(
    1331              HOST, self.port, local_hostname='localhost',
    1332              timeout=support.LOOPBACK_TIMEOUT)
    1333          self.addCleanup(smtp.close)
    1334          with self.assertRaises(smtplib.SMTPNotSupportedError):
    1335              smtp.send_message(msg)
    1336  
    1337      def test_name_field_not_included_in_envelop_addresses(self):
    1338          smtp = smtplib.SMTP(
    1339              HOST, self.port, local_hostname='localhost',
    1340              timeout=support.LOOPBACK_TIMEOUT)
    1341          self.addCleanup(smtp.close)
    1342  
    1343          message = EmailMessage()
    1344          message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
    1345          message['To'] = email.utils.formataddr(('René', 'rene@example.com'))
    1346  
    1347          self.assertDictEqual(smtp.send_message(message), {})
    1348  
    1349          self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
    1350          self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
    1351  
    1352  
    1353  class ESC[4;38;5;81mSimSMTPUTF8Server(ESC[4;38;5;149mSimSMTPServer):
    1354  
    1355      def __init__(self, *args, **kw):
    1356          # The base SMTP server turns these on automatically, but our test
    1357          # server is set up to munge the EHLO response, so we need to provide
    1358          # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
    1359          self._extra_features = ['SMTPUTF8', '8BITMIME']
    1360          smtpd.SMTPServer.__init__(self, *args, **kw)
    1361  
    1362      def handle_accepted(self, conn, addr):
    1363          self._SMTPchannel = self.channel_class(
    1364              self._extra_features, self, conn, addr,
    1365              decode_data=self._decode_data,
    1366              enable_SMTPUTF8=self.enable_SMTPUTF8,
    1367          )
    1368  
    1369      def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
    1370                                                               rcpt_options=None):
    1371          self.last_peer = peer
    1372          self.last_mailfrom = mailfrom
    1373          self.last_rcpttos = rcpttos
    1374          self.last_message = data
    1375          self.last_mail_options = mail_options
    1376          self.last_rcpt_options = rcpt_options
    1377  
    1378  
    1379  class ESC[4;38;5;81mSMTPUTF8SimTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1380  
    1381      maxDiff = None
    1382  
    1383      def setUp(self):
    1384          self.thread_key = threading_helper.threading_setup()
    1385          self.real_getfqdn = socket.getfqdn
    1386          socket.getfqdn = mock_socket.getfqdn
    1387          self.serv_evt = threading.Event()
    1388          self.client_evt = threading.Event()
    1389          # Pick a random unused port by passing 0 for the port number
    1390          self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
    1391                                        decode_data=False,
    1392                                        enable_SMTPUTF8=True)
    1393          # Keep a note of what port was assigned
    1394          self.port = self.serv.socket.getsockname()[1]
    1395          serv_args = (self.serv, self.serv_evt, self.client_evt)
    1396          self.thread = threading.Thread(target=debugging_server, args=serv_args)
    1397          self.thread.start()
    1398  
    1399          # wait until server thread has assigned a port number
    1400          self.serv_evt.wait()
    1401          self.serv_evt.clear()
    1402  
    1403      def tearDown(self):
    1404          socket.getfqdn = self.real_getfqdn
    1405          # indicate that the client is finished
    1406          self.client_evt.set()
    1407          # wait for the server thread to terminate
    1408          self.serv_evt.wait()
    1409          threading_helper.join_thread(self.thread)
    1410          del self.thread
    1411          self.doCleanups()
    1412          threading_helper.threading_cleanup(*self.thread_key)
    1413  
    1414      def test_test_server_supports_extensions(self):
    1415          smtp = smtplib.SMTP(
    1416              HOST, self.port, local_hostname='localhost',
    1417              timeout=support.LOOPBACK_TIMEOUT)
    1418          self.addCleanup(smtp.close)
    1419          smtp.ehlo()
    1420          self.assertTrue(smtp.does_esmtp)
    1421          self.assertTrue(smtp.has_extn('smtputf8'))
    1422  
    1423      def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
    1424          m = '¡a test message containing unicode!'.encode('utf-8')
    1425          smtp = smtplib.SMTP(
    1426              HOST, self.port, local_hostname='localhost',
    1427              timeout=support.LOOPBACK_TIMEOUT)
    1428          self.addCleanup(smtp.close)
    1429          smtp.sendmail('Jőhn', 'Sálly', m,
    1430                        mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
    1431          self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
    1432          self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
    1433          self.assertEqual(self.serv.last_message, m)
    1434          self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
    1435          self.assertIn('SMTPUTF8', self.serv.last_mail_options)
    1436          self.assertEqual(self.serv.last_rcpt_options, [])
    1437  
    1438      def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
    1439          m = '¡a test message containing unicode!'.encode('utf-8')
    1440          smtp = smtplib.SMTP(
    1441              HOST, self.port, local_hostname='localhost',
    1442              timeout=support.LOOPBACK_TIMEOUT)
    1443          self.addCleanup(smtp.close)
    1444          smtp.ehlo()
    1445          self.assertEqual(
    1446              smtp.mail('', options=['BODY=8BITMIME', 'SMTPUTF8']),
    1447              (250, b'OK'))
    1448          self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
    1449          self.assertEqual(smtp.data(m), (250, b'OK'))
    1450          self.assertEqual(self.serv.last_mailfrom, '')
    1451          self.assertEqual(self.serv.last_rcpttos, ['János'])
    1452          self.assertEqual(self.serv.last_message, m)
    1453          self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
    1454          self.assertIn('SMTPUTF8', self.serv.last_mail_options)
    1455          self.assertEqual(self.serv.last_rcpt_options, [])
    1456  
    1457      def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
    1458          msg = EmailMessage()
    1459          msg['From'] = "Páolo <főo@bar.com>"
    1460          msg['To'] = 'Dinsdale'
    1461          msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
    1462          # XXX I don't know why I need two \n's here, but this is an existing
    1463          # bug (if it is one) and not a problem with the new functionality.
    1464          msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
    1465          # XXX smtpd converts received /r/n to /n, so we can't easily test that
    1466          # we are successfully sending /r/n :(.
    1467          expected = textwrap.dedent("""\
    1468              From: Páolo <főo@bar.com>
    1469              To: Dinsdale
    1470              Subject: Nudge nudge, wink, wink \u1F609
    1471              Content-Type: text/plain; charset="utf-8"
    1472              Content-Transfer-Encoding: 8bit
    1473              MIME-Version: 1.0
    1474  
    1475              oh là là, know what I mean, know what I mean?
    1476              """)
    1477          smtp = smtplib.SMTP(
    1478              HOST, self.port, local_hostname='localhost',
    1479              timeout=support.LOOPBACK_TIMEOUT)
    1480          self.addCleanup(smtp.close)
    1481          self.assertEqual(smtp.send_message(msg), {})
    1482          self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
    1483          self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
    1484          self.assertEqual(self.serv.last_message.decode(), expected)
    1485          self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
    1486          self.assertIn('SMTPUTF8', self.serv.last_mail_options)
    1487          self.assertEqual(self.serv.last_rcpt_options, [])
    1488  
    1489  
    1490  EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
    1491  
    1492  class ESC[4;38;5;81mSimSMTPAUTHInitialResponseChannel(ESC[4;38;5;149mSimSMTPChannel):
    1493      def smtp_AUTH(self, arg):
    1494          # RFC 4954's AUTH command allows for an optional initial-response.
    1495          # Not all AUTH methods support this; some require a challenge.  AUTH
    1496          # PLAIN does those, so test that here.  See issue #15014.
    1497          args = arg.split()
    1498          if args[0].lower() == 'plain':
    1499              if len(args) == 2:
    1500                  # AUTH PLAIN <initial-response> with the response base 64
    1501                  # encoded.  Hard code the expected response for the test.
    1502                  if args[1] == EXPECTED_RESPONSE:
    1503                      self.push('235 Ok')
    1504                      return
    1505          self.push('571 Bad authentication')
    1506  
    1507  class ESC[4;38;5;81mSimSMTPAUTHInitialResponseServer(ESC[4;38;5;149mSimSMTPServer):
    1508      channel_class = SimSMTPAUTHInitialResponseChannel
    1509  
    1510  
    1511  class ESC[4;38;5;81mSMTPAUTHInitialResponseSimTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1512      def setUp(self):
    1513          self.thread_key = threading_helper.threading_setup()
    1514          self.real_getfqdn = socket.getfqdn
    1515          socket.getfqdn = mock_socket.getfqdn
    1516          self.serv_evt = threading.Event()
    1517          self.client_evt = threading.Event()
    1518          # Pick a random unused port by passing 0 for the port number
    1519          self.serv = SimSMTPAUTHInitialResponseServer(
    1520              (HOST, 0), ('nowhere', -1), decode_data=True)
    1521          # Keep a note of what port was assigned
    1522          self.port = self.serv.socket.getsockname()[1]
    1523          serv_args = (self.serv, self.serv_evt, self.client_evt)
    1524          self.thread = threading.Thread(target=debugging_server, args=serv_args)
    1525          self.thread.start()
    1526  
    1527          # wait until server thread has assigned a port number
    1528          self.serv_evt.wait()
    1529          self.serv_evt.clear()
    1530  
    1531      def tearDown(self):
    1532          socket.getfqdn = self.real_getfqdn
    1533          # indicate that the client is finished
    1534          self.client_evt.set()
    1535          # wait for the server thread to terminate
    1536          self.serv_evt.wait()
    1537          threading_helper.join_thread(self.thread)
    1538          del self.thread
    1539          self.doCleanups()
    1540          threading_helper.threading_cleanup(*self.thread_key)
    1541  
    1542      def testAUTH_PLAIN_initial_response_login(self):
    1543          self.serv.add_feature('AUTH PLAIN')
    1544          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1545                              timeout=support.LOOPBACK_TIMEOUT)
    1546          smtp.login('psu', 'doesnotexist')
    1547          smtp.close()
    1548  
    1549      def testAUTH_PLAIN_initial_response_auth(self):
    1550          self.serv.add_feature('AUTH PLAIN')
    1551          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1552                              timeout=support.LOOPBACK_TIMEOUT)
    1553          smtp.user = 'psu'
    1554          smtp.password = 'doesnotexist'
    1555          code, response = smtp.auth('plain', smtp.auth_plain)
    1556          smtp.close()
    1557          self.assertEqual(code, 235)
    1558  
    1559  
    1560  if __name__ == '__main__':
    1561      unittest.main()