(root)/
Python-3.12.0/
Lib/
test/
test_poplib.py
       1  """Test script for poplib module."""
       2  
       3  # Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
       4  # a real test suite
       5  
       6  import poplib
       7  import socket
       8  import os
       9  import errno
      10  import threading
      11  
      12  import unittest
      13  from unittest import TestCase, skipUnless
      14  from test import support as test_support
      15  from test.support import hashlib_helper
      16  from test.support import socket_helper
      17  from test.support import threading_helper
      18  from test.support import asynchat
      19  from test.support import asyncore
      20  
      21  
      22  test_support.requires_working_socket(module=True)
      23  
      24  HOST = socket_helper.HOST
      25  PORT = 0
      26  
      27  SUPPORTS_SSL = False
      28  if hasattr(poplib, 'POP3_SSL'):
      29      import ssl
      30  
      31      SUPPORTS_SSL = True
      32      CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
      33      CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")
      34  
      35  requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
      36  
      37  # the dummy data returned by server when LIST and RETR commands are issued
      38  LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
      39  RETR_RESP = b"""From: postmaster@python.org\
      40  \r\nContent-Type: text/plain\r\n\
      41  MIME-Version: 1.0\r\n\
      42  Subject: Dummy\r\n\
      43  \r\n\
      44  line1\r\n\
      45  line2\r\n\
      46  line3\r\n\
      47  .\r\n"""
      48  
      49  
      50  class ESC[4;38;5;81mDummyPOP3Handler(ESC[4;38;5;149masynchatESC[4;38;5;149m.ESC[4;38;5;149masync_chat):
      51  
      52      CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
      53      enable_UTF8 = False
      54  
      55      def __init__(self, conn):
      56          asynchat.async_chat.__init__(self, conn)
      57          self.set_terminator(b"\r\n")
      58          self.in_buffer = []
      59          self.push('+OK dummy pop3 server ready. <timestamp>')
      60          self.tls_active = False
      61          self.tls_starting = False
      62  
      63      def collect_incoming_data(self, data):
      64          self.in_buffer.append(data)
      65  
      66      def found_terminator(self):
      67          line = b''.join(self.in_buffer)
      68          line = str(line, 'ISO-8859-1')
      69          self.in_buffer = []
      70          cmd = line.split(' ')[0].lower()
      71          space = line.find(' ')
      72          if space != -1:
      73              arg = line[space + 1:]
      74          else:
      75              arg = ""
      76          if hasattr(self, 'cmd_' + cmd):
      77              method = getattr(self, 'cmd_' + cmd)
      78              method(arg)
      79          else:
      80              self.push('-ERR unrecognized POP3 command "%s".' %cmd)
      81  
      82      def handle_error(self):
      83          raise
      84  
      85      def push(self, data):
      86          asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')
      87  
      88      def cmd_echo(self, arg):
      89          # sends back the received string (used by the test suite)
      90          self.push(arg)
      91  
      92      def cmd_user(self, arg):
      93          if arg != "guido":
      94              self.push("-ERR no such user")
      95          self.push('+OK password required')
      96  
      97      def cmd_pass(self, arg):
      98          if arg != "python":
      99              self.push("-ERR wrong password")
     100          self.push('+OK 10 messages')
     101  
     102      def cmd_stat(self, arg):
     103          self.push('+OK 10 100')
     104  
     105      def cmd_list(self, arg):
     106          if arg:
     107              self.push('+OK %s %s' % (arg, arg))
     108          else:
     109              self.push('+OK')
     110              asynchat.async_chat.push(self, LIST_RESP)
     111  
     112      cmd_uidl = cmd_list
     113  
     114      def cmd_retr(self, arg):
     115          self.push('+OK %s bytes' %len(RETR_RESP))
     116          asynchat.async_chat.push(self, RETR_RESP)
     117  
     118      cmd_top = cmd_retr
     119  
     120      def cmd_dele(self, arg):
     121          self.push('+OK message marked for deletion.')
     122  
     123      def cmd_noop(self, arg):
     124          self.push('+OK done nothing.')
     125  
     126      def cmd_rpop(self, arg):
     127          self.push('+OK done nothing.')
     128  
     129      def cmd_apop(self, arg):
     130          self.push('+OK done nothing.')
     131  
     132      def cmd_quit(self, arg):
     133          self.push('+OK closing.')
     134          self.close_when_done()
     135  
     136      def _get_capas(self):
     137          _capas = dict(self.CAPAS)
     138          if not self.tls_active and SUPPORTS_SSL:
     139              _capas['STLS'] = []
     140          return _capas
     141  
     142      def cmd_capa(self, arg):
     143          self.push('+OK Capability list follows')
     144          if self._get_capas():
     145              for cap, params in self._get_capas().items():
     146                  _ln = [cap]
     147                  if params:
     148                      _ln.extend(params)
     149                  self.push(' '.join(_ln))
     150          self.push('.')
     151  
     152      def cmd_utf8(self, arg):
     153          self.push('+OK I know RFC6856'
     154                    if self.enable_UTF8
     155                    else '-ERR What is UTF8?!')
     156  
     157      if SUPPORTS_SSL:
     158  
     159          def cmd_stls(self, arg):
     160              if self.tls_active is False:
     161                  self.push('+OK Begin TLS negotiation')
     162                  context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
     163                  context.load_cert_chain(CERTFILE)
     164                  tls_sock = context.wrap_socket(self.socket,
     165                                                 server_side=True,
     166                                                 do_handshake_on_connect=False,
     167                                                 suppress_ragged_eofs=False)
     168                  self.del_channel()
     169                  self.set_socket(tls_sock)
     170                  self.tls_active = True
     171                  self.tls_starting = True
     172                  self.in_buffer = []
     173                  self._do_tls_handshake()
     174              else:
     175                  self.push('-ERR Command not permitted when TLS active')
     176  
     177          def _do_tls_handshake(self):
     178              try:
     179                  self.socket.do_handshake()
     180              except ssl.SSLError as err:
     181                  if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
     182                                     ssl.SSL_ERROR_WANT_WRITE):
     183                      return
     184                  elif err.args[0] == ssl.SSL_ERROR_EOF:
     185                      return self.handle_close()
     186                  # TODO: SSLError does not expose alert information
     187                  elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or
     188                        "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]):
     189                      return self.handle_close()
     190                  raise
     191              except OSError as err:
     192                  if err.args[0] == errno.ECONNABORTED:
     193                      return self.handle_close()
     194              else:
     195                  self.tls_active = True
     196                  self.tls_starting = False
     197  
     198          def handle_read(self):
     199              if self.tls_starting:
     200                  self._do_tls_handshake()
     201              else:
     202                  try:
     203                      asynchat.async_chat.handle_read(self)
     204                  except ssl.SSLEOFError:
     205                      self.handle_close()
     206  
     207  class ESC[4;38;5;81mDummyPOP3Server(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher, ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
     208  
     209      handler = DummyPOP3Handler
     210  
     211      def __init__(self, address, af=socket.AF_INET):
     212          threading.Thread.__init__(self)
     213          asyncore.dispatcher.__init__(self)
     214          self.daemon = True
     215          self.create_socket(af, socket.SOCK_STREAM)
     216          self.bind(address)
     217          self.listen(5)
     218          self.active = False
     219          self.active_lock = threading.Lock()
     220          self.host, self.port = self.socket.getsockname()[:2]
     221          self.handler_instance = None
     222  
     223      def start(self):
     224          assert not self.active
     225          self.__flag = threading.Event()
     226          threading.Thread.start(self)
     227          self.__flag.wait()
     228  
     229      def run(self):
     230          self.active = True
     231          self.__flag.set()
     232          try:
     233              while self.active and asyncore.socket_map:
     234                  with self.active_lock:
     235                      asyncore.loop(timeout=0.1, count=1)
     236          finally:
     237              asyncore.close_all(ignore_all=True)
     238  
     239      def stop(self):
     240          assert self.active
     241          self.active = False
     242          self.join()
     243  
     244      def handle_accepted(self, conn, addr):
     245          self.handler_instance = self.handler(conn)
     246  
     247      def handle_connect(self):
     248          self.close()
     249      handle_read = handle_connect
     250  
     251      def writable(self):
     252          return 0
     253  
     254      def handle_error(self):
     255          raise
     256  
     257  
     258  class ESC[4;38;5;81mTestPOP3Class(ESC[4;38;5;149mTestCase):
     259      def assertOK(self, resp):
     260          self.assertTrue(resp.startswith(b"+OK"))
     261  
     262      def setUp(self):
     263          self.server = DummyPOP3Server((HOST, PORT))
     264          self.server.start()
     265          self.client = poplib.POP3(self.server.host, self.server.port,
     266                                    timeout=test_support.LOOPBACK_TIMEOUT)
     267  
     268      def tearDown(self):
     269          self.client.close()
     270          self.server.stop()
     271          # Explicitly clear the attribute to prevent dangling thread
     272          self.server = None
     273  
     274      def test_getwelcome(self):
     275          self.assertEqual(self.client.getwelcome(),
     276                           b'+OK dummy pop3 server ready. <timestamp>')
     277  
     278      def test_exceptions(self):
     279          self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')
     280  
     281      def test_user(self):
     282          self.assertOK(self.client.user('guido'))
     283          self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
     284  
     285      def test_pass_(self):
     286          self.assertOK(self.client.pass_('python'))
     287          self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
     288  
     289      def test_stat(self):
     290          self.assertEqual(self.client.stat(), (10, 100))
     291  
     292      def test_list(self):
     293          self.assertEqual(self.client.list()[1:],
     294                           ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
     295                            25))
     296          self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))
     297  
     298      def test_retr(self):
     299          expected = (b'+OK 116 bytes',
     300                      [b'From: postmaster@python.org', b'Content-Type: text/plain',
     301                       b'MIME-Version: 1.0', b'Subject: Dummy',
     302                       b'', b'line1', b'line2', b'line3'],
     303                      113)
     304          foo = self.client.retr('foo')
     305          self.assertEqual(foo, expected)
     306  
     307      def test_too_long_lines(self):
     308          self.assertRaises(poplib.error_proto, self.client._shortcmd,
     309                            'echo +%s' % ((poplib._MAXLINE + 10) * 'a'))
     310  
     311      def test_dele(self):
     312          self.assertOK(self.client.dele('foo'))
     313  
     314      def test_noop(self):
     315          self.assertOK(self.client.noop())
     316  
     317      def test_rpop(self):
     318          self.assertOK(self.client.rpop('foo'))
     319  
     320      @hashlib_helper.requires_hashdigest('md5', openssl=True)
     321      def test_apop_normal(self):
     322          self.assertOK(self.client.apop('foo', 'dummypassword'))
     323  
     324      @hashlib_helper.requires_hashdigest('md5', openssl=True)
     325      def test_apop_REDOS(self):
     326          # Replace welcome with very long evil welcome.
     327          # NB The upper bound on welcome length is currently 2048.
     328          # At this length, evil input makes each apop call take
     329          # on the order of milliseconds instead of microseconds.
     330          evil_welcome = b'+OK' + (b'<' * 1000000)
     331          with test_support.swap_attr(self.client, 'welcome', evil_welcome):
     332              # The evil welcome is invalid, so apop should throw.
     333              self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb')
     334  
     335      def test_top(self):
     336          expected =  (b'+OK 116 bytes',
     337                       [b'From: postmaster@python.org', b'Content-Type: text/plain',
     338                        b'MIME-Version: 1.0', b'Subject: Dummy', b'',
     339                        b'line1', b'line2', b'line3'],
     340                       113)
     341          self.assertEqual(self.client.top(1, 1), expected)
     342  
     343      def test_uidl(self):
     344          self.client.uidl()
     345          self.client.uidl('foo')
     346  
     347      def test_utf8_raises_if_unsupported(self):
     348          self.server.handler.enable_UTF8 = False
     349          self.assertRaises(poplib.error_proto, self.client.utf8)
     350  
     351      def test_utf8(self):
     352          self.server.handler.enable_UTF8 = True
     353          expected = b'+OK I know RFC6856'
     354          result = self.client.utf8()
     355          self.assertEqual(result, expected)
     356  
     357      def test_capa(self):
     358          capa = self.client.capa()
     359          self.assertTrue('IMPLEMENTATION' in capa.keys())
     360  
     361      def test_quit(self):
     362          resp = self.client.quit()
     363          self.assertTrue(resp)
     364          self.assertIsNone(self.client.sock)
     365          self.assertIsNone(self.client.file)
     366  
     367      @requires_ssl
     368      def test_stls_capa(self):
     369          capa = self.client.capa()
     370          self.assertTrue('STLS' in capa.keys())
     371  
     372      @requires_ssl
     373      def test_stls(self):
     374          expected = b'+OK Begin TLS negotiation'
     375          resp = self.client.stls()
     376          self.assertEqual(resp, expected)
     377  
     378      @requires_ssl
     379      def test_stls_context(self):
     380          expected = b'+OK Begin TLS negotiation'
     381          ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     382          ctx.load_verify_locations(CAFILE)
     383          self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
     384          self.assertEqual(ctx.check_hostname, True)
     385          with self.assertRaises(ssl.CertificateError):
     386              resp = self.client.stls(context=ctx)
     387          self.client = poplib.POP3("localhost", self.server.port,
     388                                    timeout=test_support.LOOPBACK_TIMEOUT)
     389          resp = self.client.stls(context=ctx)
     390          self.assertEqual(resp, expected)
     391  
     392  
     393  if SUPPORTS_SSL:
     394      from test.test_ftplib import SSLConnection
     395  
     396      class ESC[4;38;5;81mDummyPOP3_SSLHandler(ESC[4;38;5;149mSSLConnection, ESC[4;38;5;149mDummyPOP3Handler):
     397  
     398          def __init__(self, conn):
     399              asynchat.async_chat.__init__(self, conn)
     400              self.secure_connection()
     401              self.set_terminator(b"\r\n")
     402              self.in_buffer = []
     403              self.push('+OK dummy pop3 server ready. <timestamp>')
     404              self.tls_active = True
     405              self.tls_starting = False
     406  
     407  
     408  @requires_ssl
     409  class ESC[4;38;5;81mTestPOP3_SSLClass(ESC[4;38;5;149mTestPOP3Class):
     410      # repeat previous tests by using poplib.POP3_SSL
     411  
     412      def setUp(self):
     413          self.server = DummyPOP3Server((HOST, PORT))
     414          self.server.handler = DummyPOP3_SSLHandler
     415          self.server.start()
     416          self.client = poplib.POP3_SSL(self.server.host, self.server.port)
     417  
     418      def test__all__(self):
     419          self.assertIn('POP3_SSL', poplib.__all__)
     420  
     421      def test_context(self):
     422          ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     423          ctx.check_hostname = False
     424          ctx.verify_mode = ssl.CERT_NONE
     425  
     426          self.client.quit()
     427          self.client = poplib.POP3_SSL(self.server.host, self.server.port,
     428                                          context=ctx)
     429          self.assertIsInstance(self.client.sock, ssl.SSLSocket)
     430          self.assertIs(self.client.sock.context, ctx)
     431          self.assertTrue(self.client.noop().startswith(b'+OK'))
     432  
     433      def test_stls(self):
     434          self.assertRaises(poplib.error_proto, self.client.stls)
     435  
     436      test_stls_context = test_stls
     437  
     438      def test_stls_capa(self):
     439          capa = self.client.capa()
     440          self.assertFalse('STLS' in capa.keys())
     441  
     442  
     443  @requires_ssl
     444  class ESC[4;38;5;81mTestPOP3_TLSClass(ESC[4;38;5;149mTestPOP3Class):
     445      # repeat previous tests by using poplib.POP3.stls()
     446  
     447      def setUp(self):
     448          self.server = DummyPOP3Server((HOST, PORT))
     449          self.server.start()
     450          self.client = poplib.POP3(self.server.host, self.server.port,
     451                                    timeout=test_support.LOOPBACK_TIMEOUT)
     452          self.client.stls()
     453  
     454      def tearDown(self):
     455          if self.client.file is not None and self.client.sock is not None:
     456              try:
     457                  self.client.quit()
     458              except poplib.error_proto:
     459                  # happens in the test_too_long_lines case; the overlong
     460                  # response will be treated as response to QUIT and raise
     461                  # this exception
     462                  self.client.close()
     463          self.server.stop()
     464          # Explicitly clear the attribute to prevent dangling thread
     465          self.server = None
     466  
     467      def test_stls(self):
     468          self.assertRaises(poplib.error_proto, self.client.stls)
     469  
     470      test_stls_context = test_stls
     471  
     472      def test_stls_capa(self):
     473          capa = self.client.capa()
     474          self.assertFalse(b'STLS' in capa.keys())
     475  
     476  
     477  class ESC[4;38;5;81mTestTimeouts(ESC[4;38;5;149mTestCase):
     478  
     479      def setUp(self):
     480          self.evt = threading.Event()
     481          self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     482          self.sock.settimeout(60)  # Safety net. Look issue 11812
     483          self.port = socket_helper.bind_port(self.sock)
     484          self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock))
     485          self.thread.daemon = True
     486          self.thread.start()
     487          self.evt.wait()
     488  
     489      def tearDown(self):
     490          self.thread.join()
     491          # Explicitly clear the attribute to prevent dangling thread
     492          self.thread = None
     493  
     494      def server(self, evt, serv):
     495          serv.listen()
     496          evt.set()
     497          try:
     498              conn, addr = serv.accept()
     499              conn.send(b"+ Hola mundo\n")
     500              conn.close()
     501          except TimeoutError:
     502              pass
     503          finally:
     504              serv.close()
     505  
     506      def testTimeoutDefault(self):
     507          self.assertIsNone(socket.getdefaulttimeout())
     508          socket.setdefaulttimeout(test_support.LOOPBACK_TIMEOUT)
     509          try:
     510              pop = poplib.POP3(HOST, self.port)
     511          finally:
     512              socket.setdefaulttimeout(None)
     513          self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT)
     514          pop.close()
     515  
     516      def testTimeoutNone(self):
     517          self.assertIsNone(socket.getdefaulttimeout())
     518          socket.setdefaulttimeout(30)
     519          try:
     520              pop = poplib.POP3(HOST, self.port, timeout=None)
     521          finally:
     522              socket.setdefaulttimeout(None)
     523          self.assertIsNone(pop.sock.gettimeout())
     524          pop.close()
     525  
     526      def testTimeoutValue(self):
     527          pop = poplib.POP3(HOST, self.port, timeout=test_support.LOOPBACK_TIMEOUT)
     528          self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT)
     529          pop.close()
     530          with self.assertRaises(ValueError):
     531              poplib.POP3(HOST, self.port, timeout=0)
     532  
     533  
     534  def setUpModule():
     535      thread_info = threading_helper.threading_setup()
     536      unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
     537  
     538  
     539  if __name__ == '__main__':
     540      unittest.main()