(root)/
Python-3.11.7/
Lib/
test/
test_poplib.py
       1  """Test script for poplib module."""
       2  
       3  # Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
       4  # a real test suite
       5  
       6  import poplib
       7  import socket
       8  import os
       9  import errno
      10  import threading
      11  
      12  import unittest
      13  from unittest import TestCase, skipUnless
      14  from test import support as test_support
      15  from test.support import hashlib_helper
      16  from test.support import socket_helper
      17  from test.support import threading_helper
      18  from test.support import warnings_helper
      19  
      20  
      21  asynchat = warnings_helper.import_deprecated('asynchat')
      22  asyncore = warnings_helper.import_deprecated('asyncore')
      23  
      24  
      25  test_support.requires_working_socket(module=True)
      26  
      27  HOST = socket_helper.HOST
      28  PORT = 0
      29  
      30  SUPPORTS_SSL = False
      31  if hasattr(poplib, 'POP3_SSL'):
      32      import ssl
      33  
      34      SUPPORTS_SSL = True
      35      CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "keycert3.pem")
      36      CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "pycacert.pem")
      37  
      38  requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
      39  
      40  # the dummy data returned by server when LIST and RETR commands are issued
      41  LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
      42  RETR_RESP = b"""From: postmaster@python.org\
      43  \r\nContent-Type: text/plain\r\n\
      44  MIME-Version: 1.0\r\n\
      45  Subject: Dummy\r\n\
      46  \r\n\
      47  line1\r\n\
      48  line2\r\n\
      49  line3\r\n\
      50  .\r\n"""
      51  
      52  
      53  class ESC[4;38;5;81mDummyPOP3Handler(ESC[4;38;5;149masynchatESC[4;38;5;149m.ESC[4;38;5;149masync_chat):
      54  
      55      CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
      56      enable_UTF8 = False
      57  
      58      def __init__(self, conn):
      59          asynchat.async_chat.__init__(self, conn)
      60          self.set_terminator(b"\r\n")
      61          self.in_buffer = []
      62          self.push('+OK dummy pop3 server ready. <timestamp>')
      63          self.tls_active = False
      64          self.tls_starting = False
      65  
      66      def collect_incoming_data(self, data):
      67          self.in_buffer.append(data)
      68  
      69      def found_terminator(self):
      70          line = b''.join(self.in_buffer)
      71          line = str(line, 'ISO-8859-1')
      72          self.in_buffer = []
      73          cmd = line.split(' ')[0].lower()
      74          space = line.find(' ')
      75          if space != -1:
      76              arg = line[space + 1:]
      77          else:
      78              arg = ""
      79          if hasattr(self, 'cmd_' + cmd):
      80              method = getattr(self, 'cmd_' + cmd)
      81              method(arg)
      82          else:
      83              self.push('-ERR unrecognized POP3 command "%s".' %cmd)
      84  
      85      def handle_error(self):
      86          raise
      87  
      88      def push(self, data):
      89          asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')
      90  
      91      def cmd_echo(self, arg):
      92          # sends back the received string (used by the test suite)
      93          self.push(arg)
      94  
      95      def cmd_user(self, arg):
      96          if arg != "guido":
      97              self.push("-ERR no such user")
      98          self.push('+OK password required')
      99  
     100      def cmd_pass(self, arg):
     101          if arg != "python":
     102              self.push("-ERR wrong password")
     103          self.push('+OK 10 messages')
     104  
     105      def cmd_stat(self, arg):
     106          self.push('+OK 10 100')
     107  
     108      def cmd_list(self, arg):
     109          if arg:
     110              self.push('+OK %s %s' % (arg, arg))
     111          else:
     112              self.push('+OK')
     113              asynchat.async_chat.push(self, LIST_RESP)
     114  
     115      cmd_uidl = cmd_list
     116  
     117      def cmd_retr(self, arg):
     118          self.push('+OK %s bytes' %len(RETR_RESP))
     119          asynchat.async_chat.push(self, RETR_RESP)
     120  
     121      cmd_top = cmd_retr
     122  
     123      def cmd_dele(self, arg):
     124          self.push('+OK message marked for deletion.')
     125  
     126      def cmd_noop(self, arg):
     127          self.push('+OK done nothing.')
     128  
     129      def cmd_rpop(self, arg):
     130          self.push('+OK done nothing.')
     131  
     132      def cmd_apop(self, arg):
     133          self.push('+OK done nothing.')
     134  
     135      def cmd_quit(self, arg):
     136          self.push('+OK closing.')
     137          self.close_when_done()
     138  
     139      def _get_capas(self):
     140          _capas = dict(self.CAPAS)
     141          if not self.tls_active and SUPPORTS_SSL:
     142              _capas['STLS'] = []
     143          return _capas
     144  
     145      def cmd_capa(self, arg):
     146          self.push('+OK Capability list follows')
     147          if self._get_capas():
     148              for cap, params in self._get_capas().items():
     149                  _ln = [cap]
     150                  if params:
     151                      _ln.extend(params)
     152                  self.push(' '.join(_ln))
     153          self.push('.')
     154  
     155      def cmd_utf8(self, arg):
     156          self.push('+OK I know RFC6856'
     157                    if self.enable_UTF8
     158                    else '-ERR What is UTF8?!')
     159  
     160      if SUPPORTS_SSL:
     161  
     162          def cmd_stls(self, arg):
     163              if self.tls_active is False:
     164                  self.push('+OK Begin TLS negotiation')
     165                  context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
     166                  context.load_cert_chain(CERTFILE)
     167                  tls_sock = context.wrap_socket(self.socket,
     168                                                 server_side=True,
     169                                                 do_handshake_on_connect=False,
     170                                                 suppress_ragged_eofs=False)
     171                  self.del_channel()
     172                  self.set_socket(tls_sock)
     173                  self.tls_active = True
     174                  self.tls_starting = True
     175                  self.in_buffer = []
     176                  self._do_tls_handshake()
     177              else:
     178                  self.push('-ERR Command not permitted when TLS active')
     179  
     180          def _do_tls_handshake(self):
     181              try:
     182                  self.socket.do_handshake()
     183              except ssl.SSLError as err:
     184                  if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
     185                                     ssl.SSL_ERROR_WANT_WRITE):
     186                      return
     187                  elif err.args[0] == ssl.SSL_ERROR_EOF:
     188                      return self.handle_close()
     189                  # TODO: SSLError does not expose alert information
     190                  elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or
     191                        "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]):
     192                      return self.handle_close()
     193                  raise
     194              except OSError as err:
     195                  if err.args[0] == errno.ECONNABORTED:
     196                      return self.handle_close()
     197              else:
     198                  self.tls_active = True
     199                  self.tls_starting = False
     200  
     201          def handle_read(self):
     202              if self.tls_starting:
     203                  self._do_tls_handshake()
     204              else:
     205                  try:
     206                      asynchat.async_chat.handle_read(self)
     207                  except ssl.SSLEOFError:
     208                      self.handle_close()
     209  
     210  class ESC[4;38;5;81mDummyPOP3Server(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher, ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
     211  
     212      handler = DummyPOP3Handler
     213  
     214      def __init__(self, address, af=socket.AF_INET):
     215          threading.Thread.__init__(self)
     216          asyncore.dispatcher.__init__(self)
     217          self.daemon = True
     218          self.create_socket(af, socket.SOCK_STREAM)
     219          self.bind(address)
     220          self.listen(5)
     221          self.active = False
     222          self.active_lock = threading.Lock()
     223          self.host, self.port = self.socket.getsockname()[:2]
     224          self.handler_instance = None
     225  
     226      def start(self):
     227          assert not self.active
     228          self.__flag = threading.Event()
     229          threading.Thread.start(self)
     230          self.__flag.wait()
     231  
     232      def run(self):
     233          self.active = True
     234          self.__flag.set()
     235          try:
     236              while self.active and asyncore.socket_map:
     237                  with self.active_lock:
     238                      asyncore.loop(timeout=0.1, count=1)
     239          finally:
     240              asyncore.close_all(ignore_all=True)
     241  
     242      def stop(self):
     243          assert self.active
     244          self.active = False
     245          self.join()
     246  
     247      def handle_accepted(self, conn, addr):
     248          self.handler_instance = self.handler(conn)
     249  
     250      def handle_connect(self):
     251          self.close()
     252      handle_read = handle_connect
     253  
     254      def writable(self):
     255          return 0
     256  
     257      def handle_error(self):
     258          raise
     259  
     260  
     261  class ESC[4;38;5;81mTestPOP3Class(ESC[4;38;5;149mTestCase):
     262      def assertOK(self, resp):
     263          self.assertTrue(resp.startswith(b"+OK"))
     264  
     265      def setUp(self):
     266          self.server = DummyPOP3Server((HOST, PORT))
     267          self.server.start()
     268          self.client = poplib.POP3(self.server.host, self.server.port,
     269                                    timeout=test_support.LOOPBACK_TIMEOUT)
     270  
     271      def tearDown(self):
     272          self.client.close()
     273          self.server.stop()
     274          # Explicitly clear the attribute to prevent dangling thread
     275          self.server = None
     276  
     277      def test_getwelcome(self):
     278          self.assertEqual(self.client.getwelcome(),
     279                           b'+OK dummy pop3 server ready. <timestamp>')
     280  
     281      def test_exceptions(self):
     282          self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')
     283  
     284      def test_user(self):
     285          self.assertOK(self.client.user('guido'))
     286          self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
     287  
     288      def test_pass_(self):
     289          self.assertOK(self.client.pass_('python'))
     290          self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
     291  
     292      def test_stat(self):
     293          self.assertEqual(self.client.stat(), (10, 100))
     294  
     295      def test_list(self):
     296          self.assertEqual(self.client.list()[1:],
     297                           ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
     298                            25))
     299          self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))
     300  
     301      def test_retr(self):
     302          expected = (b'+OK 116 bytes',
     303                      [b'From: postmaster@python.org', b'Content-Type: text/plain',
     304                       b'MIME-Version: 1.0', b'Subject: Dummy',
     305                       b'', b'line1', b'line2', b'line3'],
     306                      113)
     307          foo = self.client.retr('foo')
     308          self.assertEqual(foo, expected)
     309  
     310      def test_too_long_lines(self):
     311          self.assertRaises(poplib.error_proto, self.client._shortcmd,
     312                            'echo +%s' % ((poplib._MAXLINE + 10) * 'a'))
     313  
     314      def test_dele(self):
     315          self.assertOK(self.client.dele('foo'))
     316  
     317      def test_noop(self):
     318          self.assertOK(self.client.noop())
     319  
     320      def test_rpop(self):
     321          self.assertOK(self.client.rpop('foo'))
     322  
     323      @hashlib_helper.requires_hashdigest('md5', openssl=True)
     324      def test_apop_normal(self):
     325          self.assertOK(self.client.apop('foo', 'dummypassword'))
     326  
     327      @hashlib_helper.requires_hashdigest('md5', openssl=True)
     328      def test_apop_REDOS(self):
     329          # Replace welcome with very long evil welcome.
     330          # NB The upper bound on welcome length is currently 2048.
     331          # At this length, evil input makes each apop call take
     332          # on the order of milliseconds instead of microseconds.
     333          evil_welcome = b'+OK' + (b'<' * 1000000)
     334          with test_support.swap_attr(self.client, 'welcome', evil_welcome):
     335              # The evil welcome is invalid, so apop should throw.
     336              self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb')
     337  
     338      def test_top(self):
     339          expected =  (b'+OK 116 bytes',
     340                       [b'From: postmaster@python.org', b'Content-Type: text/plain',
     341                        b'MIME-Version: 1.0', b'Subject: Dummy', b'',
     342                        b'line1', b'line2', b'line3'],
     343                       113)
     344          self.assertEqual(self.client.top(1, 1), expected)
     345  
     346      def test_uidl(self):
     347          self.client.uidl()
     348          self.client.uidl('foo')
     349  
     350      def test_utf8_raises_if_unsupported(self):
     351          self.server.handler.enable_UTF8 = False
     352          self.assertRaises(poplib.error_proto, self.client.utf8)
     353  
     354      def test_utf8(self):
     355          self.server.handler.enable_UTF8 = True
     356          expected = b'+OK I know RFC6856'
     357          result = self.client.utf8()
     358          self.assertEqual(result, expected)
     359  
     360      def test_capa(self):
     361          capa = self.client.capa()
     362          self.assertTrue('IMPLEMENTATION' in capa.keys())
     363  
     364      def test_quit(self):
     365          resp = self.client.quit()
     366          self.assertTrue(resp)
     367          self.assertIsNone(self.client.sock)
     368          self.assertIsNone(self.client.file)
     369  
     370      @requires_ssl
     371      def test_stls_capa(self):
     372          capa = self.client.capa()
     373          self.assertTrue('STLS' in capa.keys())
     374  
     375      @requires_ssl
     376      def test_stls(self):
     377          expected = b'+OK Begin TLS negotiation'
     378          resp = self.client.stls()
     379          self.assertEqual(resp, expected)
     380  
     381      @requires_ssl
     382      def test_stls_context(self):
     383          expected = b'+OK Begin TLS negotiation'
     384          ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     385          ctx.load_verify_locations(CAFILE)
     386          self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
     387          self.assertEqual(ctx.check_hostname, True)
     388          with self.assertRaises(ssl.CertificateError):
     389              resp = self.client.stls(context=ctx)
     390          self.client = poplib.POP3("localhost", self.server.port,
     391                                    timeout=test_support.LOOPBACK_TIMEOUT)
     392          resp = self.client.stls(context=ctx)
     393          self.assertEqual(resp, expected)
     394  
     395  
     396  if SUPPORTS_SSL:
     397      from test.test_ftplib import SSLConnection
     398  
     399      class ESC[4;38;5;81mDummyPOP3_SSLHandler(ESC[4;38;5;149mSSLConnection, ESC[4;38;5;149mDummyPOP3Handler):
     400  
     401          def __init__(self, conn):
     402              asynchat.async_chat.__init__(self, conn)
     403              self.secure_connection()
     404              self.set_terminator(b"\r\n")
     405              self.in_buffer = []
     406              self.push('+OK dummy pop3 server ready. <timestamp>')
     407              self.tls_active = True
     408              self.tls_starting = False
     409  
     410  
     411  @requires_ssl
     412  class ESC[4;38;5;81mTestPOP3_SSLClass(ESC[4;38;5;149mTestPOP3Class):
     413      # repeat previous tests by using poplib.POP3_SSL
     414  
     415      def setUp(self):
     416          self.server = DummyPOP3Server((HOST, PORT))
     417          self.server.handler = DummyPOP3_SSLHandler
     418          self.server.start()
     419          self.client = poplib.POP3_SSL(self.server.host, self.server.port)
     420  
     421      def test__all__(self):
     422          self.assertIn('POP3_SSL', poplib.__all__)
     423  
     424      def test_context(self):
     425          ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     426          ctx.check_hostname = False
     427          ctx.verify_mode = ssl.CERT_NONE
     428          self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
     429                              self.server.port, keyfile=CERTFILE, context=ctx)
     430          self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
     431                              self.server.port, certfile=CERTFILE, context=ctx)
     432          self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
     433                              self.server.port, keyfile=CERTFILE,
     434                              certfile=CERTFILE, context=ctx)
     435  
     436          self.client.quit()
     437          self.client = poplib.POP3_SSL(self.server.host, self.server.port,
     438                                          context=ctx)
     439          self.assertIsInstance(self.client.sock, ssl.SSLSocket)
     440          self.assertIs(self.client.sock.context, ctx)
     441          self.assertTrue(self.client.noop().startswith(b'+OK'))
     442  
     443      def test_stls(self):
     444          self.assertRaises(poplib.error_proto, self.client.stls)
     445  
     446      test_stls_context = test_stls
     447  
     448      def test_stls_capa(self):
     449          capa = self.client.capa()
     450          self.assertFalse('STLS' in capa.keys())
     451  
     452  
     453  @requires_ssl
     454  class ESC[4;38;5;81mTestPOP3_TLSClass(ESC[4;38;5;149mTestPOP3Class):
     455      # repeat previous tests by using poplib.POP3.stls()
     456  
     457      def setUp(self):
     458          self.server = DummyPOP3Server((HOST, PORT))
     459          self.server.start()
     460          self.client = poplib.POP3(self.server.host, self.server.port,
     461                                    timeout=test_support.LOOPBACK_TIMEOUT)
     462          self.client.stls()
     463  
     464      def tearDown(self):
     465          if self.client.file is not None and self.client.sock is not None:
     466              try:
     467                  self.client.quit()
     468              except poplib.error_proto:
     469                  # happens in the test_too_long_lines case; the overlong
     470                  # response will be treated as response to QUIT and raise
     471                  # this exception
     472                  self.client.close()
     473          self.server.stop()
     474          # Explicitly clear the attribute to prevent dangling thread
     475          self.server = None
     476  
     477      def test_stls(self):
     478          self.assertRaises(poplib.error_proto, self.client.stls)
     479  
     480      test_stls_context = test_stls
     481  
     482      def test_stls_capa(self):
     483          capa = self.client.capa()
     484          self.assertFalse(b'STLS' in capa.keys())
     485  
     486  
     487  class ESC[4;38;5;81mTestTimeouts(ESC[4;38;5;149mTestCase):
     488  
     489      def setUp(self):
     490          self.evt = threading.Event()
     491          self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     492          self.sock.settimeout(60)  # Safety net. Look issue 11812
     493          self.port = socket_helper.bind_port(self.sock)
     494          self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock))
     495          self.thread.daemon = True
     496          self.thread.start()
     497          self.evt.wait()
     498  
     499      def tearDown(self):
     500          self.thread.join()
     501          # Explicitly clear the attribute to prevent dangling thread
     502          self.thread = None
     503  
     504      def server(self, evt, serv):
     505          serv.listen()
     506          evt.set()
     507          try:
     508              conn, addr = serv.accept()
     509              conn.send(b"+ Hola mundo\n")
     510              conn.close()
     511          except TimeoutError:
     512              pass
     513          finally:
     514              serv.close()
     515  
     516      def testTimeoutDefault(self):
     517          self.assertIsNone(socket.getdefaulttimeout())
     518          socket.setdefaulttimeout(test_support.LOOPBACK_TIMEOUT)
     519          try:
     520              pop = poplib.POP3(HOST, self.port)
     521          finally:
     522              socket.setdefaulttimeout(None)
     523          self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT)
     524          pop.close()
     525  
     526      def testTimeoutNone(self):
     527          self.assertIsNone(socket.getdefaulttimeout())
     528          socket.setdefaulttimeout(30)
     529          try:
     530              pop = poplib.POP3(HOST, self.port, timeout=None)
     531          finally:
     532              socket.setdefaulttimeout(None)
     533          self.assertIsNone(pop.sock.gettimeout())
     534          pop.close()
     535  
     536      def testTimeoutValue(self):
     537          pop = poplib.POP3(HOST, self.port, timeout=test_support.LOOPBACK_TIMEOUT)
     538          self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT)
     539          pop.close()
     540          with self.assertRaises(ValueError):
     541              poplib.POP3(HOST, self.port, timeout=0)
     542  
     543  
     544  def setUpModule():
     545      thread_info = threading_helper.threading_setup()
     546      unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
     547  
     548  
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()