(root)/
Python-3.11.7/
Lib/
test/
test_smtplib.py
       1  import base64
       2  import email.mime.text
       3  from email.message import EmailMessage
       4  from email.base64mime import body_encode as encode_base64
       5  import email.utils
       6  import hashlib
       7  import hmac
       8  import socket
       9  import smtplib
      10  import io
      11  import re
      12  import sys
      13  import time
      14  import select
      15  import errno
      16  import textwrap
      17  import threading
      18  
      19  import unittest
      20  from test import support, mock_socket
      21  from test.support import hashlib_helper
      22  from test.support import socket_helper
      23  from test.support import threading_helper
      24  from test.support import warnings_helper
      25  from unittest.mock import Mock
      26  
      27  
      28  asyncore = warnings_helper.import_deprecated('asyncore')
      29  smtpd = warnings_helper.import_deprecated('smtpd')
      30  
      31  
      32  support.requires_working_socket(module=True)
      33  
      34  HOST = socket_helper.HOST
      35  
      36  if sys.platform == 'darwin':
      37      # select.poll returns a select.POLLHUP at the end of the tests
      38      # on darwin, so just ignore it
      39      def handle_expt(self):
      40          pass
      41      smtpd.SMTPChannel.handle_expt = handle_expt
      42  
      43  
      44  def server(evt, buf, serv):
      45      serv.listen()
      46      evt.set()
      47      try:
      48          conn, addr = serv.accept()
      49      except TimeoutError:
      50          pass
      51      else:
      52          n = 500
      53          while buf and n > 0:
      54              r, w, e = select.select([], [conn], [])
      55              if w:
      56                  sent = conn.send(buf)
      57                  buf = buf[sent:]
      58  
      59              n -= 1
      60  
      61          conn.close()
      62      finally:
      63          serv.close()
      64          evt.set()
      65  
      66  class ESC[4;38;5;81mGeneralTests:
      67  
      68      def setUp(self):
      69          smtplib.socket = mock_socket
      70          self.port = 25
      71  
      72      def tearDown(self):
      73          smtplib.socket = socket
      74  
      75      # This method is no longer used but is retained for backward compatibility,
      76      # so test to make sure it still works.
      77      def testQuoteData(self):
      78          teststr  = "abc\n.jkl\rfoo\r\n..blue"
      79          expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
      80          self.assertEqual(expected, smtplib.quotedata(teststr))
      81  
      82      def testBasic1(self):
      83          mock_socket.reply_with(b"220 Hola mundo")
      84          # connects
      85          client = self.client(HOST, self.port)
      86          client.close()
      87  
      88      def testSourceAddress(self):
      89          mock_socket.reply_with(b"220 Hola mundo")
      90          # connects
      91          client = self.client(HOST, self.port,
      92                               source_address=('127.0.0.1',19876))
      93          self.assertEqual(client.source_address, ('127.0.0.1', 19876))
      94          client.close()
      95  
      96      def testBasic2(self):
      97          mock_socket.reply_with(b"220 Hola mundo")
      98          # connects, include port in host name
      99          client = self.client("%s:%s" % (HOST, self.port))
     100          client.close()
     101  
     102      def testLocalHostName(self):
     103          mock_socket.reply_with(b"220 Hola mundo")
     104          # check that supplied local_hostname is used
     105          client = self.client(HOST, self.port, local_hostname="testhost")
     106          self.assertEqual(client.local_hostname, "testhost")
     107          client.close()
     108  
     109      def testTimeoutDefault(self):
     110          mock_socket.reply_with(b"220 Hola mundo")
     111          self.assertIsNone(mock_socket.getdefaulttimeout())
     112          mock_socket.setdefaulttimeout(30)
     113          self.assertEqual(mock_socket.getdefaulttimeout(), 30)
     114          try:
     115              client = self.client(HOST, self.port)
     116          finally:
     117              mock_socket.setdefaulttimeout(None)
     118          self.assertEqual(client.sock.gettimeout(), 30)
     119          client.close()
     120  
     121      def testTimeoutNone(self):
     122          mock_socket.reply_with(b"220 Hola mundo")
     123          self.assertIsNone(socket.getdefaulttimeout())
     124          socket.setdefaulttimeout(30)
     125          try:
     126              client = self.client(HOST, self.port, timeout=None)
     127          finally:
     128              socket.setdefaulttimeout(None)
     129          self.assertIsNone(client.sock.gettimeout())
     130          client.close()
     131  
     132      def testTimeoutZero(self):
     133          mock_socket.reply_with(b"220 Hola mundo")
     134          with self.assertRaises(ValueError):
     135              self.client(HOST, self.port, timeout=0)
     136  
     137      def testTimeoutValue(self):
     138          mock_socket.reply_with(b"220 Hola mundo")
     139          client = self.client(HOST, self.port, timeout=30)
     140          self.assertEqual(client.sock.gettimeout(), 30)
     141          client.close()
     142  
     143      def test_debuglevel(self):
     144          mock_socket.reply_with(b"220 Hello world")
     145          client = self.client()
     146          client.set_debuglevel(1)
     147          with support.captured_stderr() as stderr:
     148              client.connect(HOST, self.port)
     149          client.close()
     150          expected = re.compile(r"^connect:", re.MULTILINE)
     151          self.assertRegex(stderr.getvalue(), expected)
     152  
     153      def test_debuglevel_2(self):
     154          mock_socket.reply_with(b"220 Hello world")
     155          client = self.client()
     156          client.set_debuglevel(2)
     157          with support.captured_stderr() as stderr:
     158              client.connect(HOST, self.port)
     159          client.close()
     160          expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
     161                                re.MULTILINE)
     162          self.assertRegex(stderr.getvalue(), expected)
     163  
     164  
     165  class ESC[4;38;5;81mSMTPGeneralTests(ESC[4;38;5;149mGeneralTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     166  
     167      client = smtplib.SMTP
     168  
     169  
     170  class ESC[4;38;5;81mLMTPGeneralTests(ESC[4;38;5;149mGeneralTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     171  
     172      client = smtplib.LMTP
     173  
     174      @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket")
     175      def testUnixDomainSocketTimeoutDefault(self):
     176          local_host = '/some/local/lmtp/delivery/program'
     177          mock_socket.reply_with(b"220 Hello world")
     178          try:
     179              client = self.client(local_host, self.port)
     180          finally:
     181              mock_socket.setdefaulttimeout(None)
     182          self.assertIsNone(client.sock.gettimeout())
     183          client.close()
     184  
     185      def testTimeoutZero(self):
     186          super().testTimeoutZero()
     187          local_host = '/some/local/lmtp/delivery/program'
     188          with self.assertRaises(ValueError):
     189              self.client(local_host, timeout=0)
     190  
     191  # Test server thread using the specified SMTP server class
     192  def debugging_server(serv, serv_evt, client_evt):
     193      serv_evt.set()
     194  
     195      try:
     196          if hasattr(select, 'poll'):
     197              poll_fun = asyncore.poll2
     198          else:
     199              poll_fun = asyncore.poll
     200  
     201          n = 1000
     202          while asyncore.socket_map and n > 0:
     203              poll_fun(0.01, asyncore.socket_map)
     204  
     205              # when the client conversation is finished, it will
     206              # set client_evt, and it's then ok to kill the server
     207              if client_evt.is_set():
     208                  serv.close()
     209                  break
     210  
     211              n -= 1
     212  
     213      except TimeoutError:
     214          pass
     215      finally:
     216          if not client_evt.is_set():
     217              # allow some time for the client to read the result
     218              time.sleep(0.5)
     219              serv.close()
     220          asyncore.close_all()
     221          serv_evt.set()
     222  
     223  MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
     224  MSG_END = '------------ END MESSAGE ------------\n'
     225  
     226  # NOTE: Some SMTP objects in the tests below are created with a non-default
     227  # local_hostname argument to the constructor, since (on some systems) the FQDN
     228  # lookup caused by the default local_hostname sometimes takes so long that the
     229  # test server times out, causing the test to fail.
     230  
     231  # Test behavior of smtpd.DebuggingServer
     232  class ESC[4;38;5;81mDebuggingServerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     233  
     234      maxDiff = None
     235  
     236      def setUp(self):
     237          self.thread_key = threading_helper.threading_setup()
     238          self.real_getfqdn = socket.getfqdn
     239          socket.getfqdn = mock_socket.getfqdn
     240          # temporarily replace sys.stdout to capture DebuggingServer output
     241          self.old_stdout = sys.stdout
     242          self.output = io.StringIO()
     243          sys.stdout = self.output
     244  
     245          self.serv_evt = threading.Event()
     246          self.client_evt = threading.Event()
     247          # Capture SMTPChannel debug output
     248          self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
     249          smtpd.DEBUGSTREAM = io.StringIO()
     250          # Pick a random unused port by passing 0 for the port number
     251          self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
     252                                            decode_data=True)
     253          # Keep a note of what server host and port were assigned
     254          self.host, self.port = self.serv.socket.getsockname()[:2]
     255          serv_args = (self.serv, self.serv_evt, self.client_evt)
     256          self.thread = threading.Thread(target=debugging_server, args=serv_args)
     257          self.thread.start()
     258  
     259          # wait until server thread has assigned a port number
     260          self.serv_evt.wait()
     261          self.serv_evt.clear()
     262  
     263      def tearDown(self):
     264          socket.getfqdn = self.real_getfqdn
     265          # indicate that the client is finished
     266          self.client_evt.set()
     267          # wait for the server thread to terminate
     268          self.serv_evt.wait()
     269          threading_helper.join_thread(self.thread)
     270          # restore sys.stdout
     271          sys.stdout = self.old_stdout
     272          # restore DEBUGSTREAM
     273          smtpd.DEBUGSTREAM.close()
     274          smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
     275          del self.thread
     276          self.doCleanups()
     277          threading_helper.threading_cleanup(*self.thread_key)
     278  
     279      def get_output_without_xpeer(self):
     280          test_output = self.output.getvalue()
     281          return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
     282                        test_output, flags=re.MULTILINE|re.DOTALL)
     283  
     284      def testBasic(self):
     285          # connect
     286          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     287                              timeout=support.LOOPBACK_TIMEOUT)
     288          smtp.quit()
     289  
     290      def testSourceAddress(self):
     291          # connect
     292          src_port = socket_helper.find_unused_port()
     293          try:
     294              smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
     295                                  timeout=support.LOOPBACK_TIMEOUT,
     296                                  source_address=(self.host, src_port))
     297              self.addCleanup(smtp.close)
     298              self.assertEqual(smtp.source_address, (self.host, src_port))
     299              self.assertEqual(smtp.local_hostname, 'localhost')
     300              smtp.quit()
     301          except OSError as e:
     302              if e.errno == errno.EADDRINUSE:
     303                  self.skipTest("couldn't bind to source port %d" % src_port)
     304              raise
     305  
     306      def testNOOP(self):
     307          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     308                              timeout=support.LOOPBACK_TIMEOUT)
     309          self.addCleanup(smtp.close)
     310          expected = (250, b'OK')
     311          self.assertEqual(smtp.noop(), expected)
     312          smtp.quit()
     313  
     314      def testRSET(self):
     315          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     316                              timeout=support.LOOPBACK_TIMEOUT)
     317          self.addCleanup(smtp.close)
     318          expected = (250, b'OK')
     319          self.assertEqual(smtp.rset(), expected)
     320          smtp.quit()
     321  
     322      def testELHO(self):
     323          # EHLO isn't implemented in DebuggingServer
     324          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     325                              timeout=support.LOOPBACK_TIMEOUT)
     326          self.addCleanup(smtp.close)
     327          expected = (250, b'\nSIZE 33554432\nHELP')
     328          self.assertEqual(smtp.ehlo(), expected)
     329          smtp.quit()
     330  
     331      def testEXPNNotImplemented(self):
     332          # EXPN isn't implemented in DebuggingServer
     333          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     334                              timeout=support.LOOPBACK_TIMEOUT)
     335          self.addCleanup(smtp.close)
     336          expected = (502, b'EXPN not implemented')
     337          smtp.putcmd('EXPN')
     338          self.assertEqual(smtp.getreply(), expected)
     339          smtp.quit()
     340  
     341      def test_issue43124_putcmd_escapes_newline(self):
     342          # see: https://bugs.python.org/issue43124
     343          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     344                              timeout=support.LOOPBACK_TIMEOUT)
     345          self.addCleanup(smtp.close)
     346          with self.assertRaises(ValueError) as exc:
     347              smtp.putcmd('helo\nX-INJECTED')
     348          self.assertIn("prohibited newline characters", str(exc.exception))
     349          smtp.quit()
     350  
     351      def testVRFY(self):
     352          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     353                              timeout=support.LOOPBACK_TIMEOUT)
     354          self.addCleanup(smtp.close)
     355          expected = (252, b'Cannot VRFY user, but will accept message ' + \
     356                           b'and attempt delivery')
     357          self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
     358          self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
     359          smtp.quit()
     360  
     361      def testSecondHELO(self):
     362          # check that a second HELO returns a message that it's a duplicate
     363          # (this behavior is specific to smtpd.SMTPChannel)
     364          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     365                              timeout=support.LOOPBACK_TIMEOUT)
     366          self.addCleanup(smtp.close)
     367          smtp.helo()
     368          expected = (503, b'Duplicate HELO/EHLO')
     369          self.assertEqual(smtp.helo(), expected)
     370          smtp.quit()
     371  
     372      def testHELP(self):
     373          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     374                              timeout=support.LOOPBACK_TIMEOUT)
     375          self.addCleanup(smtp.close)
     376          self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
     377                                        b'RCPT DATA RSET NOOP QUIT VRFY')
     378          smtp.quit()
     379  
     380      def testSend(self):
     381          # connect and send mail
     382          m = 'A test message'
     383          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     384                              timeout=support.LOOPBACK_TIMEOUT)
     385          self.addCleanup(smtp.close)
     386          smtp.sendmail('John', 'Sally', m)
     387          # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
     388          # in asyncore.  This sleep might help, but should really be fixed
     389          # properly by using an Event variable.
     390          time.sleep(0.01)
     391          smtp.quit()
     392  
     393          self.client_evt.set()
     394          self.serv_evt.wait()
     395          self.output.flush()
     396          mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
     397          self.assertEqual(self.output.getvalue(), mexpect)
     398  
     399      def testSendBinary(self):
     400          m = b'A test message'
     401          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     402                              timeout=support.LOOPBACK_TIMEOUT)
     403          self.addCleanup(smtp.close)
     404          smtp.sendmail('John', 'Sally', m)
     405          # XXX (see comment in testSend)
     406          time.sleep(0.01)
     407          smtp.quit()
     408  
     409          self.client_evt.set()
     410          self.serv_evt.wait()
     411          self.output.flush()
     412          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
     413          self.assertEqual(self.output.getvalue(), mexpect)
     414  
     415      def testSendNeedingDotQuote(self):
     416          # Issue 12283
     417          m = '.A test\n.mes.sage.'
     418          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     419                              timeout=support.LOOPBACK_TIMEOUT)
     420          self.addCleanup(smtp.close)
     421          smtp.sendmail('John', 'Sally', m)
     422          # XXX (see comment in testSend)
     423          time.sleep(0.01)
     424          smtp.quit()
     425  
     426          self.client_evt.set()
     427          self.serv_evt.wait()
     428          self.output.flush()
     429          mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
     430          self.assertEqual(self.output.getvalue(), mexpect)
     431  
     432      def test_issue43124_escape_localhostname(self):
     433          # see: https://bugs.python.org/issue43124
     434          # connect and send mail
     435          m = 'wazzuuup\nlinetwo'
     436          smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED',
     437                              timeout=support.LOOPBACK_TIMEOUT)
     438          self.addCleanup(smtp.close)
     439          with self.assertRaises(ValueError) as exc:
     440              smtp.sendmail("hi@me.com", "you@me.com", m)
     441          self.assertIn(
     442              "prohibited newline characters: ehlo hi\\nX-INJECTED",
     443              str(exc.exception),
     444          )
     445          # XXX (see comment in testSend)
     446          time.sleep(0.01)
     447          smtp.quit()
     448  
     449          debugout = smtpd.DEBUGSTREAM.getvalue()
     450          self.assertNotIn("X-INJECTED", debugout)
     451  
     452      def test_issue43124_escape_options(self):
     453          # see: https://bugs.python.org/issue43124
     454          # connect and send mail
     455          m = 'wazzuuup\nlinetwo'
     456          smtp = smtplib.SMTP(
     457              HOST, self.port, local_hostname='localhost',
     458              timeout=support.LOOPBACK_TIMEOUT)
     459  
     460          self.addCleanup(smtp.close)
     461          smtp.sendmail("hi@me.com", "you@me.com", m)
     462          with self.assertRaises(ValueError) as exc:
     463              smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
     464          msg = str(exc.exception)
     465          self.assertIn("prohibited newline characters", msg)
     466          self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg)
     467          # XXX (see comment in testSend)
     468          time.sleep(0.01)
     469          smtp.quit()
     470  
     471          debugout = smtpd.DEBUGSTREAM.getvalue()
     472          self.assertNotIn("X-OPTION", debugout)
     473          self.assertNotIn("X-OPTION2", debugout)
     474          self.assertNotIn("X-INJECTED-1", debugout)
     475          self.assertNotIn("X-INJECTED-2", debugout)
     476  
     477      def testSendNullSender(self):
     478          m = 'A test message'
     479          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     480                              timeout=support.LOOPBACK_TIMEOUT)
     481          self.addCleanup(smtp.close)
     482          smtp.sendmail('<>', 'Sally', m)
     483          # XXX (see comment in testSend)
     484          time.sleep(0.01)
     485          smtp.quit()
     486  
     487          self.client_evt.set()
     488          self.serv_evt.wait()
     489          self.output.flush()
     490          mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
     491          self.assertEqual(self.output.getvalue(), mexpect)
     492          debugout = smtpd.DEBUGSTREAM.getvalue()
     493          sender = re.compile("^sender: <>$", re.MULTILINE)
     494          self.assertRegex(debugout, sender)
     495  
     496      def testSendMessage(self):
     497          m = email.mime.text.MIMEText('A test message')
     498          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     499                              timeout=support.LOOPBACK_TIMEOUT)
     500          self.addCleanup(smtp.close)
     501          smtp.send_message(m, from_addr='John', to_addrs='Sally')
     502          # XXX (see comment in testSend)
     503          time.sleep(0.01)
     504          smtp.quit()
     505  
     506          self.client_evt.set()
     507          self.serv_evt.wait()
     508          self.output.flush()
     509          # Remove the X-Peer header that DebuggingServer adds as figuring out
     510          # exactly what IP address format is put there is not easy (and
     511          # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
     512          # not always the same as socket.gethostbyname(HOST). :(
     513          test_output = self.get_output_without_xpeer()
     514          del m['X-Peer']
     515          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     516          self.assertEqual(test_output, mexpect)
     517  
     518      def testSendMessageWithAddresses(self):
     519          m = email.mime.text.MIMEText('A test message')
     520          m['From'] = 'foo@bar.com'
     521          m['To'] = 'John'
     522          m['CC'] = 'Sally, Fred'
     523          m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
     524          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     525                              timeout=support.LOOPBACK_TIMEOUT)
     526          self.addCleanup(smtp.close)
     527          smtp.send_message(m)
     528          # XXX (see comment in testSend)
     529          time.sleep(0.01)
     530          smtp.quit()
     531          # make sure the Bcc header is still in the message.
     532          self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
     533                                      '<warped@silly.walks.com>')
     534  
     535          self.client_evt.set()
     536          self.serv_evt.wait()
     537          self.output.flush()
     538          # Remove the X-Peer header that DebuggingServer adds.
     539          test_output = self.get_output_without_xpeer()
     540          del m['X-Peer']
     541          # The Bcc header should not be transmitted.
     542          del m['Bcc']
     543          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     544          self.assertEqual(test_output, mexpect)
     545          debugout = smtpd.DEBUGSTREAM.getvalue()
     546          sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
     547          self.assertRegex(debugout, sender)
     548          for addr in ('John', 'Sally', 'Fred', 'root@localhost',
     549                       'warped@silly.walks.com'):
     550              to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
     551                                   re.MULTILINE)
     552              self.assertRegex(debugout, to_addr)
     553  
     554      def testSendMessageWithSomeAddresses(self):
     555          # Make sure nothing breaks if not all of the three 'to' headers exist
     556          m = email.mime.text.MIMEText('A test message')
     557          m['From'] = 'foo@bar.com'
     558          m['To'] = 'John, Dinsdale'
     559          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     560                              timeout=support.LOOPBACK_TIMEOUT)
     561          self.addCleanup(smtp.close)
     562          smtp.send_message(m)
     563          # XXX (see comment in testSend)
     564          time.sleep(0.01)
     565          smtp.quit()
     566  
     567          self.client_evt.set()
     568          self.serv_evt.wait()
     569          self.output.flush()
     570          # Remove the X-Peer header that DebuggingServer adds.
     571          test_output = self.get_output_without_xpeer()
     572          del m['X-Peer']
     573          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     574          self.assertEqual(test_output, mexpect)
     575          debugout = smtpd.DEBUGSTREAM.getvalue()
     576          sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
     577          self.assertRegex(debugout, sender)
     578          for addr in ('John', 'Dinsdale'):
     579              to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
     580                                   re.MULTILINE)
     581              self.assertRegex(debugout, to_addr)
     582  
     583      def testSendMessageWithSpecifiedAddresses(self):
     584          # Make sure addresses specified in call override those in message.
     585          m = email.mime.text.MIMEText('A test message')
     586          m['From'] = 'foo@bar.com'
     587          m['To'] = 'John, Dinsdale'
     588          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     589                              timeout=support.LOOPBACK_TIMEOUT)
     590          self.addCleanup(smtp.close)
     591          smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
     592          # XXX (see comment in testSend)
     593          time.sleep(0.01)
     594          smtp.quit()
     595  
     596          self.client_evt.set()
     597          self.serv_evt.wait()
     598          self.output.flush()
     599          # Remove the X-Peer header that DebuggingServer adds.
     600          test_output = self.get_output_without_xpeer()
     601          del m['X-Peer']
     602          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     603          self.assertEqual(test_output, mexpect)
     604          debugout = smtpd.DEBUGSTREAM.getvalue()
     605          sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
     606          self.assertRegex(debugout, sender)
     607          for addr in ('John', 'Dinsdale'):
     608              to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
     609                                   re.MULTILINE)
     610              self.assertNotRegex(debugout, to_addr)
     611          recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
     612          self.assertRegex(debugout, recip)
     613  
     614      def testSendMessageWithMultipleFrom(self):
     615          # Sender overrides To
     616          m = email.mime.text.MIMEText('A test message')
     617          m['From'] = 'Bernard, Bianca'
     618          m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
     619          m['To'] = 'John, Dinsdale'
     620          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     621                              timeout=support.LOOPBACK_TIMEOUT)
     622          self.addCleanup(smtp.close)
     623          smtp.send_message(m)
     624          # XXX (see comment in testSend)
     625          time.sleep(0.01)
     626          smtp.quit()
     627  
     628          self.client_evt.set()
     629          self.serv_evt.wait()
     630          self.output.flush()
     631          # Remove the X-Peer header that DebuggingServer adds.
     632          test_output = self.get_output_without_xpeer()
     633          del m['X-Peer']
     634          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     635          self.assertEqual(test_output, mexpect)
     636          debugout = smtpd.DEBUGSTREAM.getvalue()
     637          sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
     638          self.assertRegex(debugout, sender)
     639          for addr in ('John', 'Dinsdale'):
     640              to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
     641                                   re.MULTILINE)
     642              self.assertRegex(debugout, to_addr)
     643  
     644      def testSendMessageResent(self):
     645          m = email.mime.text.MIMEText('A test message')
     646          m['From'] = 'foo@bar.com'
     647          m['To'] = 'John'
     648          m['CC'] = 'Sally, Fred'
     649          m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
     650          m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
     651          m['Resent-From'] = 'holy@grail.net'
     652          m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
     653          m['Resent-Bcc'] = 'doe@losthope.net'
     654          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     655                              timeout=support.LOOPBACK_TIMEOUT)
     656          self.addCleanup(smtp.close)
     657          smtp.send_message(m)
     658          # XXX (see comment in testSend)
     659          time.sleep(0.01)
     660          smtp.quit()
     661  
     662          self.client_evt.set()
     663          self.serv_evt.wait()
     664          self.output.flush()
     665          # The Resent-Bcc headers are deleted before serialization.
     666          del m['Bcc']
     667          del m['Resent-Bcc']
     668          # Remove the X-Peer header that DebuggingServer adds.
     669          test_output = self.get_output_without_xpeer()
     670          del m['X-Peer']
     671          mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
     672          self.assertEqual(test_output, mexpect)
     673          debugout = smtpd.DEBUGSTREAM.getvalue()
     674          sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
     675          self.assertRegex(debugout, sender)
     676          for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
     677              to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
     678                                   re.MULTILINE)
     679              self.assertRegex(debugout, to_addr)
     680  
     681      def testSendMessageMultipleResentRaises(self):
     682          m = email.mime.text.MIMEText('A test message')
     683          m['From'] = 'foo@bar.com'
     684          m['To'] = 'John'
     685          m['CC'] = 'Sally, Fred'
     686          m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
     687          m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
     688          m['Resent-From'] = 'holy@grail.net'
     689          m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
     690          m['Resent-Bcc'] = 'doe@losthope.net'
     691          m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
     692          m['Resent-To'] = 'holy@grail.net'
     693          m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
     694          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
     695                              timeout=support.LOOPBACK_TIMEOUT)
     696          self.addCleanup(smtp.close)
     697          with self.assertRaises(ValueError):
     698              smtp.send_message(m)
     699          smtp.close()
     700  
     701  class ESC[4;38;5;81mNonConnectingTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     702  
     703      def testNotConnected(self):
     704          # Test various operations on an unconnected SMTP object that
     705          # should raise exceptions (at present the attempt in SMTP.send
     706          # to reference the nonexistent 'sock' attribute of the SMTP object
     707          # causes an AttributeError)
     708          smtp = smtplib.SMTP()
     709          self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
     710          self.assertRaises(smtplib.SMTPServerDisconnected,
     711                            smtp.send, 'test msg')
     712  
     713      def testNonnumericPort(self):
     714          # check that non-numeric port raises OSError
     715          self.assertRaises(OSError, smtplib.SMTP,
     716                            "localhost", "bogus")
     717          self.assertRaises(OSError, smtplib.SMTP,
     718                            "localhost:bogus")
     719  
     720      def testSockAttributeExists(self):
     721          # check that sock attribute is present outside of a connect() call
     722          # (regression test, the previous behavior raised an
     723          #  AttributeError: 'SMTP' object has no attribute 'sock')
     724          with smtplib.SMTP() as smtp:
     725              self.assertIsNone(smtp.sock)
     726  
     727  
     728  class ESC[4;38;5;81mDefaultArgumentsTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     729  
     730      def setUp(self):
     731          self.msg = EmailMessage()
     732          self.msg['From'] = 'Páolo <főo@bar.com>'
     733          self.smtp = smtplib.SMTP()
     734          self.smtp.ehlo = Mock(return_value=(200, 'OK'))
     735          self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock()
     736  
     737      def testSendMessage(self):
     738          expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
     739          self.smtp.send_message(self.msg)
     740          self.smtp.send_message(self.msg)
     741          self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
     742                           expected_mail_options)
     743          self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3],
     744                           expected_mail_options)
     745  
     746      def testSendMessageWithMailOptions(self):
     747          mail_options = ['STARTTLS']
     748          expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
     749          self.smtp.send_message(self.msg, None, None, mail_options)
     750          self.assertEqual(mail_options, ['STARTTLS'])
     751          self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
     752                           expected_mail_options)
     753  
     754  
     755  # test response of client to a non-successful HELO message
     756  class ESC[4;38;5;81mBadHELOServerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     757  
     758      def setUp(self):
     759          smtplib.socket = mock_socket
     760          mock_socket.reply_with(b"199 no hello for you!")
     761          self.old_stdout = sys.stdout
     762          self.output = io.StringIO()
     763          sys.stdout = self.output
     764          self.port = 25
     765  
     766      def tearDown(self):
     767          smtplib.socket = socket
     768          sys.stdout = self.old_stdout
     769  
     770      def testFailingHELO(self):
     771          self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
     772                              HOST, self.port, 'localhost', 3)
     773  
     774  
     775  class ESC[4;38;5;81mTooLongLineTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     776      respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'
     777  
     778      def setUp(self):
     779          self.thread_key = threading_helper.threading_setup()
     780          self.old_stdout = sys.stdout
     781          self.output = io.StringIO()
     782          sys.stdout = self.output
     783  
     784          self.evt = threading.Event()
     785          self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     786          self.sock.settimeout(15)
     787          self.port = socket_helper.bind_port(self.sock)
     788          servargs = (self.evt, self.respdata, self.sock)
     789          self.thread = threading.Thread(target=server, args=servargs)
     790          self.thread.start()
     791          self.evt.wait()
     792          self.evt.clear()
     793  
     794      def tearDown(self):
     795          self.evt.wait()
     796          sys.stdout = self.old_stdout
     797          threading_helper.join_thread(self.thread)
     798          del self.thread
     799          self.doCleanups()
     800          threading_helper.threading_cleanup(*self.thread_key)
     801  
     802      def testLineTooLong(self):
     803          self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
     804                            HOST, self.port, 'localhost', 3)
     805  
     806  
     807  sim_users = {'Mr.A@somewhere.com':'John A',
     808               'Ms.B@xn--fo-fka.com':'Sally B',
     809               'Mrs.C@somewhereesle.com':'Ruth C',
     810              }
     811  
     812  sim_auth = ('Mr.A@somewhere.com', 'somepassword')
     813  sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
     814                            'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
     815  sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
     816               'list-2':['Ms.B@xn--fo-fka.com',],
     817              }
     818  
     819  # Simulated SMTP channel & server
     820  class ESC[4;38;5;81mResponseException(ESC[4;38;5;149mException): pass
     821  class ESC[4;38;5;81mSimSMTPChannel(ESC[4;38;5;149msmtpdESC[4;38;5;149m.ESC[4;38;5;149mSMTPChannel):
     822  
     823      quit_response = None
     824      mail_response = None
     825      rcpt_response = None
     826      data_response = None
     827      rcpt_count = 0
     828      rset_count = 0
     829      disconnect = 0
     830      AUTH = 99    # Add protocol state to enable auth testing.
     831      authenticated_user = None
     832  
     833      def __init__(self, extra_features, *args, **kw):
     834          self._extrafeatures = ''.join(
     835              [ "250-{0}\r\n".format(x) for x in extra_features ])
     836          super(SimSMTPChannel, self).__init__(*args, **kw)
     837  
     838      # AUTH related stuff.  It would be nice if support for this were in smtpd.
     839      def found_terminator(self):
     840          if self.smtp_state == self.AUTH:
     841              line = self._emptystring.join(self.received_lines)
     842              print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
     843              self.received_lines = []
     844              try:
     845                  self.auth_object(line)
     846              except ResponseException as e:
     847                  self.smtp_state = self.COMMAND
     848                  self.push('%s %s' % (e.smtp_code, e.smtp_error))
     849              return
     850          super().found_terminator()
     851  
     852  
     853      def smtp_AUTH(self, arg):
     854          if not self.seen_greeting:
     855              self.push('503 Error: send EHLO first')
     856              return
     857          if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
     858              self.push('500 Error: command "AUTH" not recognized')
     859              return
     860          if self.authenticated_user is not None:
     861              self.push(
     862                  '503 Bad sequence of commands: already authenticated')
     863              return
     864          args = arg.split()
     865          if len(args) not in [1, 2]:
     866              self.push('501 Syntax: AUTH <mechanism> [initial-response]')
     867              return
     868          auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
     869          try:
     870              self.auth_object = getattr(self, auth_object_name)
     871          except AttributeError:
     872              self.push('504 Command parameter not implemented: unsupported '
     873                        ' authentication mechanism {!r}'.format(auth_object_name))
     874              return
     875          self.smtp_state = self.AUTH
     876          self.auth_object(args[1] if len(args) == 2 else None)
     877  
     878      def _authenticated(self, user, valid):
     879          if valid:
     880              self.authenticated_user = user
     881              self.push('235 Authentication Succeeded')
     882          else:
     883              self.push('535 Authentication credentials invalid')
     884          self.smtp_state = self.COMMAND
     885  
     886      def _decode_base64(self, string):
     887          return base64.decodebytes(string.encode('ascii')).decode('utf-8')
     888  
     889      def _auth_plain(self, arg=None):
     890          if arg is None:
     891              self.push('334 ')
     892          else:
     893              logpass = self._decode_base64(arg)
     894              try:
     895                  *_, user, password = logpass.split('\0')
     896              except ValueError as e:
     897                  self.push('535 Splitting response {!r} into user and password'
     898                            ' failed: {}'.format(logpass, e))
     899                  return
     900              self._authenticated(user, password == sim_auth[1])
     901  
     902      def _auth_login(self, arg=None):
     903          if arg is None:
     904              # base64 encoded 'Username:'
     905              self.push('334 VXNlcm5hbWU6')
     906          elif not hasattr(self, '_auth_login_user'):
     907              self._auth_login_user = self._decode_base64(arg)
     908              # base64 encoded 'Password:'
     909              self.push('334 UGFzc3dvcmQ6')
     910          else:
     911              password = self._decode_base64(arg)
     912              self._authenticated(self._auth_login_user, password == sim_auth[1])
     913              del self._auth_login_user
     914  
     915      def _auth_buggy(self, arg=None):
     916          # This AUTH mechanism will 'trap' client in a neverending 334
     917          # base64 encoded 'BuGgYbUgGy'
     918          self.push('334 QnVHZ1liVWdHeQ==')
     919  
     920      def _auth_cram_md5(self, arg=None):
     921          if arg is None:
     922              self.push('334 {}'.format(sim_cram_md5_challenge))
     923          else:
     924              logpass = self._decode_base64(arg)
     925              try:
     926                  user, hashed_pass = logpass.split()
     927              except ValueError as e:
     928                  self.push('535 Splitting response {!r} into user and password '
     929                            'failed: {}'.format(logpass, e))
     930                  return False
     931              valid_hashed_pass = hmac.HMAC(
     932                  sim_auth[1].encode('ascii'),
     933                  self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
     934                  'md5').hexdigest()
     935              self._authenticated(user, hashed_pass == valid_hashed_pass)
     936      # end AUTH related stuff.
     937  
     938      def smtp_EHLO(self, arg):
     939          resp = ('250-testhost\r\n'
     940                  '250-EXPN\r\n'
     941                  '250-SIZE 20000000\r\n'
     942                  '250-STARTTLS\r\n'
     943                  '250-DELIVERBY\r\n')
     944          resp = resp + self._extrafeatures + '250 HELP'
     945          self.push(resp)
     946          self.seen_greeting = arg
     947          self.extended_smtp = True
     948  
     949      def smtp_VRFY(self, arg):
     950          # For max compatibility smtplib should be sending the raw address.
     951          if arg in sim_users:
     952              self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
     953          else:
     954              self.push('550 No such user: %s' % arg)
     955  
     956      def smtp_EXPN(self, arg):
     957          list_name = arg.lower()
     958          if list_name in sim_lists:
     959              user_list = sim_lists[list_name]
     960              for n, user_email in enumerate(user_list):
     961                  quoted_addr = smtplib.quoteaddr(user_email)
     962                  if n < len(user_list) - 1:
     963                      self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
     964                  else:
     965                      self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
     966          else:
     967              self.push('550 No access for you!')
     968  
     969      def smtp_QUIT(self, arg):
     970          if self.quit_response is None:
     971              super(SimSMTPChannel, self).smtp_QUIT(arg)
     972          else:
     973              self.push(self.quit_response)
     974              self.close_when_done()
     975  
     976      def smtp_MAIL(self, arg):
     977          if self.mail_response is None:
     978              super().smtp_MAIL(arg)
     979          else:
     980              self.push(self.mail_response)
     981              if self.disconnect:
     982                  self.close_when_done()
     983  
     984      def smtp_RCPT(self, arg):
     985          if self.rcpt_response is None:
     986              super().smtp_RCPT(arg)
     987              return
     988          self.rcpt_count += 1
     989          self.push(self.rcpt_response[self.rcpt_count-1])
     990  
     991      def smtp_RSET(self, arg):
     992          self.rset_count += 1
     993          super().smtp_RSET(arg)
     994  
     995      def smtp_DATA(self, arg):
     996          if self.data_response is None:
     997              super().smtp_DATA(arg)
     998          else:
     999              self.push(self.data_response)
    1000  
    1001      def handle_error(self):
    1002          raise
    1003  
    1004  
    1005  class ESC[4;38;5;81mSimSMTPServer(ESC[4;38;5;149msmtpdESC[4;38;5;149m.ESC[4;38;5;149mSMTPServer):
    1006  
    1007      channel_class = SimSMTPChannel
    1008  
    1009      def __init__(self, *args, **kw):
    1010          self._extra_features = []
    1011          self._addresses = {}
    1012          smtpd.SMTPServer.__init__(self, *args, **kw)
    1013  
    1014      def handle_accepted(self, conn, addr):
    1015          self._SMTPchannel = self.channel_class(
    1016              self._extra_features, self, conn, addr,
    1017              decode_data=self._decode_data)
    1018  
    1019      def process_message(self, peer, mailfrom, rcpttos, data):
    1020          self._addresses['from'] = mailfrom
    1021          self._addresses['tos'] = rcpttos
    1022  
    1023      def add_feature(self, feature):
    1024          self._extra_features.append(feature)
    1025  
    1026      def handle_error(self):
    1027          raise
    1028  
    1029  
    1030  # Test various SMTP & ESMTP commands/behaviors that require a simulated server
    1031  # (i.e., something with more features than DebuggingServer)
    1032  class ESC[4;38;5;81mSMTPSimTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1033  
    def setUp(self):
        # Start a SimSMTPServer on a free port and run debugging_server
        # for it in a helper thread.
        self.thread_key = threading_helper.threading_setup()

        # Swap in mock_socket's getfqdn; tearDown puts the real one back.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()

        # Passing 0 as the port picks a random unused one; keep a note of
        # the port that was actually assigned.
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        self.port = self.serv.socket.getsockname()[1]

        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # Block until the server thread sets serv_evt, then re-arm the
        # event so that tearDown can wait on it again.
        self.serv_evt.wait()
        self.serv_evt.clear()
    1051  
    def tearDown(self):
        # Put the real getfqdn back before anything else can fail.
        socket.getfqdn = self.real_getfqdn

        # Tell the server thread that the client is finished, then wait
        # for it to terminate.
        self.client_evt.set()
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread

        # Run the registered cleanups now, before checking for threads
        # left behind.
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)
    1062  
    def testBasic(self):
        # Smoke test: connect to the simulated server and QUIT right away.
        connection = smtplib.SMTP(
            HOST, self.port,
            local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT,
        )
        connection.quit()
    1068  
    def testEHLO(self):
        # EHLO must fill esmtp_features with exactly the features the
        # simulated server advertises.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the socket even if an assertion fails before quit().
        self.addCleanup(smtp.close)

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()
    1090  
    def testVRFY(self):
        # VRFY returns "<name> <quoted address>" for every known user and
        # a 550 reply for an unknown one.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the socket even if an assertion fails before quit().
        self.addCleanup(smtp.close)

        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()
    1105  
    def testEXPN(self):
        # EXPN returns one "<name> <quoted address>" line per member of a
        # known list and a 550 reply for an unknown list.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the socket even if an assertion fails before quit().
        self.addCleanup(smtp.close)

        for listname, members in sim_lists.items():
            users = ['%s %s' % (sim_users[m], smtplib.quoteaddr(m))
                     for m in members]
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()
    1121  
    def testAUTH_PLAIN(self):
        # login() must succeed when the server offers only AUTH PLAIN.
        self.serv.add_feature("AUTH PLAIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the socket even if login() raises or the assertion fails.
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()
    1129  
    def testAUTH_LOGIN(self):
        # login() must succeed when the server offers only AUTH LOGIN.
        self.serv.add_feature("AUTH LOGIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the socket even if login() raises or the assertion fails.
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()
    1137  
    def testAUTH_LOGIN_initial_response_ok(self):
        # Call auth() directly for LOGIN with initial_response_ok=True.
        self.serv.add_feature("AUTH LOGIN")
        client = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                              timeout=support.LOOPBACK_TIMEOUT)
        with client as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            outcome = smtp.auth("LOGIN", smtp.auth_login,
                                initial_response_ok=True)
            self.assertEqual(outcome, (235, b'Authentication Succeeded'))
    1146  
    def testAUTH_LOGIN_initial_response_notok(self):
        # Call auth() directly for LOGIN with initial_response_ok=False.
        self.serv.add_feature("AUTH LOGIN")
        client = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                              timeout=support.LOOPBACK_TIMEOUT)
        with client as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            outcome = smtp.auth("LOGIN", smtp.auth_login,
                                initial_response_ok=False)
            self.assertEqual(outcome, (235, b'Authentication Succeeded'))
    1155  
    def testAUTH_BUGGY(self):
        # The BUGGY mechanism answers every line with another 334
        # challenge; auth() must give up with an "infinite loop" error.
        self.serv.add_feature("AUTH BUGGY")

        def respond(challenge=None):
            # Every challenge must be the decoded 'BuGgYbUgGy' string.
            self.assertEqual(b"BuGgYbUgGy", challenge)
            return "\0"

        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        try:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_buggy")
            with self.assertRaisesRegex(
                    smtplib.SMTPException,
                    r"^Server AUTH mechanism infinite loop.*"):
                smtp.auth("BUGGY", respond, initial_response_ok=False)
        finally:
            smtp.close()
    1175  
    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def testAUTH_CRAM_MD5(self):
        # login() must succeed when the server offers only AUTH CRAM-MD5.
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the socket even if login() raises or the assertion fails.
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()
    1184  
    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the socket even if login() raises or the assertion fails.
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()
    1194  
    def test_auth_function(self):
        # Drive SMTP.auth() directly with each built-in auth_* method, on
        # a fresh connection per mechanism.
        supported = {'PLAIN', 'LOGIN'}
        try:
            hashlib.md5()
        except ValueError:
            # md5 is not usable here, so CRAM-MD5 is left out.
            pass
        else:
            supported.add('CRAM-MD5')
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                smtp = smtplib.SMTP(HOST, self.port,
                                    local_hostname='localhost',
                                    timeout=support.LOOPBACK_TIMEOUT)
                # A failing subtest must not leave its connection open
                # while the remaining mechanisms run.
                try:
                    smtp.ehlo('foo')
                    smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                    method = 'auth_' + mechanism.lower().replace('-', '_')
                    resp = smtp.auth(mechanism, getattr(smtp, method))
                    self.assertEqual(resp, (235, b'Authentication Succeeded'))
                finally:
                    smtp.close()
    1216  
    def test_quit_resets_greeting(self):
        # quit() must forget the EHLO results, so that a later connect()
        # on the same object negotiates its features again.
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close whichever connection is open if an assertion fails.
        self.addCleanup(smtp.close)
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
    1231  
    def test_with_statement(self):
        # After the with block exits, send() must raise
        # SMTPServerDisconnected, whether or not the body already closed
        # the connection itself.
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, _ = smtp.noop()
            self.assertEqual(code, 250)
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            smtp.send(b'foo')

        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            smtp.send(b'foo')
    1240  
    def test_with_statement_QUIT_failure(self):
        # The channel is told to answer QUIT with 421, so leaving the
        # inner with block must raise SMTPResponseException.
        with self.assertRaises(smtplib.SMTPResponseException) as error, \
                smtplib.SMTP(HOST, self.port) as smtp:
            smtp.noop()
            self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        raised = error.exception
        self.assertEqual(raised.smtp_code, 421)
        self.assertEqual(raised.smtp_error, b'QUIT FAILED')
    1248  
    1249      #TODO: add tests for correct AUTH method fallback now that the
    1250      #test infrastructure can support it.
    1251  
    1252      # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    def test__rest_from_mail_cmd(self):
        # The server refuses MAIL with 451 and then drops the connection;
        # sendmail() must still report SMTPSenderRefused and leave the
        # client closed.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the socket even if an assertion below fails.
        self.addCleanup(smtp.close)
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
        self.serv._SMTPchannel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
    1262  
    1263      # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        # A 421 reply to MAIL must close the client without sending RSET.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the socket even if an assertion below fails.
        self.addCleanup(smtp.close)
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
    1273  
    def test_421_from_rcpt_cmd(self):
        # The first RCPT is accepted and the second gets 421: the client
        # must close without RSET and report only the refused recipient.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the socket even if an assertion below fails.
        self.addCleanup(smtp.close)
        smtp.noop()
        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})
    1284  
    def test_421_from_data_cmd(self):
        # A 421 reply to the message data must close the client without
        # sending RSET.
        class MySimSMTPChannel(SimSMTPChannel):
            def found_terminator(self):
                # Answer the end of the DATA payload with 421; everything
                # else is handled normally.
                if self.smtp_state == self.DATA:
                    self.push('421 closing')
                else:
                    super().found_terminator()
        self.serv.channel_class = MySimSMTPChannel
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Close the socket even if an assertion below fails.
        self.addCleanup(smtp.close)
        smtp.noop()
        with self.assertRaises(smtplib.SMTPDataError):
            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
        self.assertIsNone(smtp.sock)
        # rset_count, as in the other 421 tests: rcpt_count only changes
        # when rcpt_response is set, so checking it here proved nothing.
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
    1300  
    def test_smtputf8_NotSupportedError_if_no_server_support(self):
        # Asking for SMTPUTF8 when the server does not advertise it must be
        # rejected with SMTPNotSupportedError, through both sendmail() and
        # the lower-level mail().
        client = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(client.close)
        client.ehlo()
        self.assertTrue(client.does_esmtp)
        self.assertFalse(client.has_extn('smtputf8'))

        with self.assertRaises(smtplib.SMTPNotSupportedError):
            client.sendmail(
                'John', 'Sally', '',
                mail_options=['BODY=8BITMIME', 'SMTPUTF8'])

        with self.assertRaises(smtplib.SMTPNotSupportedError):
            client.mail('John', options=['BODY=8BITMIME', 'SMTPUTF8'])
    1316  
    def test_send_unicode_without_SMTPUTF8(self):
        # Non-ASCII addresses passed without the SMTPUTF8 option must fail
        # with UnicodeEncodeError, for sendmail() and mail() alike.
        client = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(client.close)

        with self.assertRaises(UnicodeEncodeError):
            client.sendmail('Alice', 'Böb', '')

        with self.assertRaises(UnicodeEncodeError):
            client.mail('Älice')
    1324  
    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
        # This test is located here and not in the SMTPUTF8SimTests
        # class because it needs a "regular" SMTP server to work
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        # U+1F609 (winking face) lies outside the BMP, so it needs the
        # eight-digit \U escape; '\u1F609' would be U+1F60 followed by '9'.
        msg['Subject'] = 'Nudge nudge, wink, wink \U0001F609'
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        # The non-ASCII sender address cannot be sent without SMTPUTF8.
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            smtp.send_message(msg)
    1338  
    def test_name_field_not_included_in_envelop_addresses(self):
        # send_message() must hand the server the bare addr-spec of each
        # header address, without the (non-ASCII) display name.
        client = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(client.close)

        sender = email.utils.formataddr(('Michaël', 'michael@example.com'))
        recipient = email.utils.formataddr(('René', 'rene@example.com'))
        message = EmailMessage()
        message['From'] = sender
        message['To'] = recipient

        # An empty dict means that no recipient was refused.
        refused = client.send_message(message)
        self.assertDictEqual(refused, {})

        envelope = self.serv._addresses
        self.assertEqual(envelope['from'], 'michael@example.com')
        self.assertEqual(envelope['tos'], ['rene@example.com'])
    1353  
    1354  
    1355  class ESC[4;38;5;81mSimSMTPUTF8Server(ESC[4;38;5;149mSimSMTPServer):
    1356  
    1357      def __init__(self, *args, **kw):
    1358          # The base SMTP server turns these on automatically, but our test
    1359          # server is set up to munge the EHLO response, so we need to provide
    1360          # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
    1361          self._extra_features = ['SMTPUTF8', '8BITMIME']
    1362          smtpd.SMTPServer.__init__(self, *args, **kw)
    1363  
    1364      def handle_accepted(self, conn, addr):
    1365          self._SMTPchannel = self.channel_class(
    1366              self._extra_features, self, conn, addr,
    1367              decode_data=self._decode_data,
    1368              enable_SMTPUTF8=self.enable_SMTPUTF8,
    1369          )
    1370  
    1371      def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
    1372                                                               rcpt_options=None):
    1373          self.last_peer = peer
    1374          self.last_mailfrom = mailfrom
    1375          self.last_rcpttos = rcpttos
    1376          self.last_message = data
    1377          self.last_mail_options = mail_options
    1378          self.last_rcpt_options = rcpt_options
    1379  
    1380  
    1381  class ESC[4;38;5;81mSMTPUTF8SimTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1382  
    1383      maxDiff = None
    1384  
    1385      def setUp(self):
    1386          self.thread_key = threading_helper.threading_setup()
    1387          self.real_getfqdn = socket.getfqdn
    1388          socket.getfqdn = mock_socket.getfqdn
    1389          self.serv_evt = threading.Event()
    1390          self.client_evt = threading.Event()
    1391          # Pick a random unused port by passing 0 for the port number
    1392          self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
    1393                                        decode_data=False,
    1394                                        enable_SMTPUTF8=True)
    1395          # Keep a note of what port was assigned
    1396          self.port = self.serv.socket.getsockname()[1]
    1397          serv_args = (self.serv, self.serv_evt, self.client_evt)
    1398          self.thread = threading.Thread(target=debugging_server, args=serv_args)
    1399          self.thread.start()
    1400  
    1401          # wait until server thread has assigned a port number
    1402          self.serv_evt.wait()
    1403          self.serv_evt.clear()
    1404  
    1405      def tearDown(self):
    1406          socket.getfqdn = self.real_getfqdn
    1407          # indicate that the client is finished
    1408          self.client_evt.set()
    1409          # wait for the server thread to terminate
    1410          self.serv_evt.wait()
    1411          threading_helper.join_thread(self.thread)
    1412          del self.thread
    1413          self.doCleanups()
    1414          threading_helper.threading_cleanup(*self.thread_key)
    1415  
    1416      def test_test_server_supports_extensions(self):
    1417          smtp = smtplib.SMTP(
    1418              HOST, self.port, local_hostname='localhost',
    1419              timeout=support.LOOPBACK_TIMEOUT)
    1420          self.addCleanup(smtp.close)
    1421          smtp.ehlo()
    1422          self.assertTrue(smtp.does_esmtp)
    1423          self.assertTrue(smtp.has_extn('smtputf8'))
    1424  
    1425      def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
    1426          m = '¡a test message containing unicode!'.encode('utf-8')
    1427          smtp = smtplib.SMTP(
    1428              HOST, self.port, local_hostname='localhost',
    1429              timeout=support.LOOPBACK_TIMEOUT)
    1430          self.addCleanup(smtp.close)
    1431          smtp.sendmail('Jőhn', 'Sálly', m,
    1432                        mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
    1433          self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
    1434          self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
    1435          self.assertEqual(self.serv.last_message, m)
    1436          self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
    1437          self.assertIn('SMTPUTF8', self.serv.last_mail_options)
    1438          self.assertEqual(self.serv.last_rcpt_options, [])
    1439  
    1440      def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
    1441          m = '¡a test message containing unicode!'.encode('utf-8')
    1442          smtp = smtplib.SMTP(
    1443              HOST, self.port, local_hostname='localhost',
    1444              timeout=support.LOOPBACK_TIMEOUT)
    1445          self.addCleanup(smtp.close)
    1446          smtp.ehlo()
    1447          self.assertEqual(
    1448              smtp.mail('', options=['BODY=8BITMIME', 'SMTPUTF8']),
    1449              (250, b'OK'))
    1450          self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
    1451          self.assertEqual(smtp.data(m), (250, b'OK'))
    1452          self.assertEqual(self.serv.last_mailfrom, '')
    1453          self.assertEqual(self.serv.last_rcpttos, ['János'])
    1454          self.assertEqual(self.serv.last_message, m)
    1455          self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
    1456          self.assertIn('SMTPUTF8', self.serv.last_mail_options)
    1457          self.assertEqual(self.serv.last_rcpt_options, [])
    1458  
    1459      def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
    1460          msg = EmailMessage()
    1461          msg['From'] = "Páolo <főo@bar.com>"
    1462          msg['To'] = 'Dinsdale'
    1463          msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
    1464          # XXX I don't know why I need two \n's here, but this is an existing
    1465          # bug (if it is one) and not a problem with the new functionality.
    1466          msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
    1467          # XXX smtpd converts received /r/n to /n, so we can't easily test that
    1468          # we are successfully sending /r/n :(.
    1469          expected = textwrap.dedent("""\
    1470              From: Páolo <főo@bar.com>
    1471              To: Dinsdale
    1472              Subject: Nudge nudge, wink, wink \u1F609
    1473              Content-Type: text/plain; charset="utf-8"
    1474              Content-Transfer-Encoding: 8bit
    1475              MIME-Version: 1.0
    1476  
    1477              oh là là, know what I mean, know what I mean?
    1478              """)
    1479          smtp = smtplib.SMTP(
    1480              HOST, self.port, local_hostname='localhost',
    1481              timeout=support.LOOPBACK_TIMEOUT)
    1482          self.addCleanup(smtp.close)
    1483          self.assertEqual(smtp.send_message(msg), {})
    1484          self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
    1485          self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
    1486          self.assertEqual(self.serv.last_message.decode(), expected)
    1487          self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
    1488          self.assertIn('SMTPUTF8', self.serv.last_mail_options)
    1489          self.assertEqual(self.serv.last_rcpt_options, [])
    1490  
    1491  
# Base64 form of the AUTH PLAIN initial response for user 'psu' with password
# 'doesnotexist' (NUL-separated, with an empty leading field), which is what
# SimSMTPAUTHInitialResponseChannel.smtp_AUTH accepts.
EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
    1493  
    1494  class ESC[4;38;5;81mSimSMTPAUTHInitialResponseChannel(ESC[4;38;5;149mSimSMTPChannel):
    1495      def smtp_AUTH(self, arg):
    1496          # RFC 4954's AUTH command allows for an optional initial-response.
    1497          # Not all AUTH methods support this; some require a challenge.  AUTH
    1498          # PLAIN does those, so test that here.  See issue #15014.
    1499          args = arg.split()
    1500          if args[0].lower() == 'plain':
    1501              if len(args) == 2:
    1502                  # AUTH PLAIN <initial-response> with the response base 64
    1503                  # encoded.  Hard code the expected response for the test.
    1504                  if args[1] == EXPECTED_RESPONSE:
    1505                      self.push('235 Ok')
    1506                      return
    1507          self.push('571 Bad authentication')
    1508  
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
    """SimSMTPServer variant whose connections are handled by
    SimSMTPAUTHInitialResponseChannel."""
    channel_class = SimSMTPAUTHInitialResponseChannel
    1511  
    1512  
    1513  class ESC[4;38;5;81mSMTPAUTHInitialResponseSimTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1514      def setUp(self):
    1515          self.thread_key = threading_helper.threading_setup()
    1516          self.real_getfqdn = socket.getfqdn
    1517          socket.getfqdn = mock_socket.getfqdn
    1518          self.serv_evt = threading.Event()
    1519          self.client_evt = threading.Event()
    1520          # Pick a random unused port by passing 0 for the port number
    1521          self.serv = SimSMTPAUTHInitialResponseServer(
    1522              (HOST, 0), ('nowhere', -1), decode_data=True)
    1523          # Keep a note of what port was assigned
    1524          self.port = self.serv.socket.getsockname()[1]
    1525          serv_args = (self.serv, self.serv_evt, self.client_evt)
    1526          self.thread = threading.Thread(target=debugging_server, args=serv_args)
    1527          self.thread.start()
    1528  
    1529          # wait until server thread has assigned a port number
    1530          self.serv_evt.wait()
    1531          self.serv_evt.clear()
    1532  
    1533      def tearDown(self):
    1534          socket.getfqdn = self.real_getfqdn
    1535          # indicate that the client is finished
    1536          self.client_evt.set()
    1537          # wait for the server thread to terminate
    1538          self.serv_evt.wait()
    1539          threading_helper.join_thread(self.thread)
    1540          del self.thread
    1541          self.doCleanups()
    1542          threading_helper.threading_cleanup(*self.thread_key)
    1543  
    1544      def testAUTH_PLAIN_initial_response_login(self):
    1545          self.serv.add_feature('AUTH PLAIN')
    1546          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1547                              timeout=support.LOOPBACK_TIMEOUT)
    1548          smtp.login('psu', 'doesnotexist')
    1549          smtp.close()
    1550  
    1551      def testAUTH_PLAIN_initial_response_auth(self):
    1552          self.serv.add_feature('AUTH PLAIN')
    1553          smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
    1554                              timeout=support.LOOPBACK_TIMEOUT)
    1555          smtp.user = 'psu'
    1556          smtp.password = 'doesnotexist'
    1557          code, response = smtp.auth('plain', smtp.auth_plain)
    1558          smtp.close()
    1559          self.assertEqual(code, 235)
    1560  
    1561  
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()