(root)/
Python-3.11.7/
Lib/
test/
test_imaplib.py
       1  from test import support
       2  from test.support import socket_helper
       3  
       4  from contextlib import contextmanager
       5  import imaplib
       6  import os.path
       7  import socketserver
       8  import time
       9  import calendar
      10  import threading
      11  import socket
      12  
      13  from test.support import (verbose,
      14                            run_with_tz, run_with_locale, cpython_only, requires_resource,
      15                            requires_working_socket)
      16  from test.support import hashlib_helper
      17  from test.support import threading_helper
      18  from test.support import warnings_helper
      19  import unittest
      20  from unittest import mock
      21  from datetime import datetime, timezone, timedelta
      22  try:
      23      import ssl
      24  except ImportError:
      25      ssl = None
      26  
      27  support.requires_working_socket(module=True)
      28  
      29  CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "keycert3.pem")
      30  CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "pycacert.pem")
      31  
      32  
      33  class ESC[4;38;5;81mTestImaplib(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      34  
      35      def test_Internaldate2tuple(self):
      36          t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
      37          tt = imaplib.Internaldate2tuple(
      38              b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
      39          self.assertEqual(time.mktime(tt), t0)
      40          tt = imaplib.Internaldate2tuple(
      41              b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")')
      42          self.assertEqual(time.mktime(tt), t0)
      43          tt = imaplib.Internaldate2tuple(
      44              b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")')
      45          self.assertEqual(time.mktime(tt), t0)
      46  
      47      @run_with_tz('MST+07MDT,M4.1.0,M10.5.0')
      48      def test_Internaldate2tuple_issue10941(self):
      49          self.assertNotEqual(imaplib.Internaldate2tuple(
      50              b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'),
      51              imaplib.Internaldate2tuple(
      52                  b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")'))
      53  
      54      def timevalues(self):
      55          return [2000000000, 2000000000.0, time.localtime(2000000000),
      56                  (2033, 5, 18, 5, 33, 20, -1, -1, -1),
      57                  (2033, 5, 18, 5, 33, 20, -1, -1, 1),
      58                  datetime.fromtimestamp(2000000000,
      59                                         timezone(timedelta(0, 2 * 60 * 60))),
      60                  '"18-May-2033 05:33:20 +0200"']
      61  
      62      @run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
      63      # DST rules included to work around quirk where the Gnu C library may not
      64      # otherwise restore the previous time zone
      65      @run_with_tz('STD-1DST,M3.2.0,M11.1.0')
      66      def test_Time2Internaldate(self):
      67          expected = '"18-May-2033 05:33:20 +0200"'
      68  
      69          for t in self.timevalues():
      70              internal = imaplib.Time2Internaldate(t)
      71              self.assertEqual(internal, expected)
      72  
      73      def test_that_Time2Internaldate_returns_a_result(self):
      74          # Without tzset, we can check only that it successfully
      75          # produces a result, not the correctness of the result itself,
      76          # since the result depends on the timezone the machine is in.
      77          for t in self.timevalues():
      78              imaplib.Time2Internaldate(t)
      79  
      80      @socket_helper.skip_if_tcp_blackhole
      81      def test_imap4_host_default_value(self):
      82          # Check whether the IMAP4_PORT is truly unavailable.
      83          with socket.socket() as s:
      84              try:
      85                  s.connect(('', imaplib.IMAP4_PORT))
      86                  self.skipTest(
      87                      "Cannot run the test with local IMAP server running.")
      88              except socket.error:
      89                  pass
      90  
      91          # This is the exception that should be raised.
      92          expected_errnos = socket_helper.get_socket_conn_refused_errs()
      93          with self.assertRaises(OSError) as cm:
      94              imaplib.IMAP4()
      95          self.assertIn(cm.exception.errno, expected_errnos)
      96  
      97  
      98  if ssl:
      99      class ESC[4;38;5;81mSecureTCPServer(ESC[4;38;5;149msocketserverESC[4;38;5;149m.ESC[4;38;5;149mTCPServer):
     100  
     101          def get_request(self):
     102              newsocket, fromaddr = self.socket.accept()
     103              context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
     104              context.load_cert_chain(CERTFILE)
     105              connstream = context.wrap_socket(newsocket, server_side=True)
     106              return connstream, fromaddr
     107  
     108      IMAP4_SSL = imaplib.IMAP4_SSL
     109  
     110  else:
     111  
     112      class ESC[4;38;5;81mSecureTCPServer:
     113          pass
     114  
     115      IMAP4_SSL = None
     116  
     117  
     118  class ESC[4;38;5;81mSimpleIMAPHandler(ESC[4;38;5;149msocketserverESC[4;38;5;149m.ESC[4;38;5;149mStreamRequestHandler):
     119      timeout = support.LOOPBACK_TIMEOUT
     120      continuation = None
     121      capabilities = ''
     122  
     123      def setup(self):
     124          super().setup()
     125          self.server.is_selected = False
     126          self.server.logged = None
     127  
     128      def _send(self, message):
     129          if verbose:
     130              print("SENT: %r" % message.strip())
     131          self.wfile.write(message)
     132  
     133      def _send_line(self, message):
     134          self._send(message + b'\r\n')
     135  
     136      def _send_textline(self, message):
     137          self._send_line(message.encode('ASCII'))
     138  
     139      def _send_tagged(self, tag, code, message):
     140          self._send_textline(' '.join((tag, code, message)))
     141  
     142      def handle(self):
     143          # Send a welcome message.
     144          self._send_textline('* OK IMAP4rev1')
     145          while 1:
     146              # Gather up input until we receive a line terminator or we timeout.
     147              # Accumulate read(1) because it's simpler to handle the differences
     148              # between naked sockets and SSL sockets.
     149              line = b''
     150              while 1:
     151                  try:
     152                      part = self.rfile.read(1)
     153                      if part == b'':
     154                          # Naked sockets return empty strings..
     155                          return
     156                      line += part
     157                  except OSError:
     158                      # ..but SSLSockets raise exceptions.
     159                      return
     160                  if line.endswith(b'\r\n'):
     161                      break
     162  
     163              if verbose:
     164                  print('GOT: %r' % line.strip())
     165              if self.continuation:
     166                  try:
     167                      self.continuation.send(line)
     168                  except StopIteration:
     169                      self.continuation = None
     170                  continue
     171              splitline = line.decode('ASCII').split()
     172              tag = splitline[0]
     173              cmd = splitline[1]
     174              args = splitline[2:]
     175  
     176              if hasattr(self, 'cmd_' + cmd):
     177                  continuation = getattr(self, 'cmd_' + cmd)(tag, args)
     178                  if continuation:
     179                      self.continuation = continuation
     180                      next(continuation)
     181              else:
     182                  self._send_tagged(tag, 'BAD', cmd + ' unknown')
     183  
     184      def cmd_CAPABILITY(self, tag, args):
     185          caps = ('IMAP4rev1 ' + self.capabilities
     186                  if self.capabilities
     187                  else 'IMAP4rev1')
     188          self._send_textline('* CAPABILITY ' + caps)
     189          self._send_tagged(tag, 'OK', 'CAPABILITY completed')
     190  
     191      def cmd_LOGOUT(self, tag, args):
     192          self.server.logged = None
     193          self._send_textline('* BYE IMAP4ref1 Server logging out')
     194          self._send_tagged(tag, 'OK', 'LOGOUT completed')
     195  
     196      def cmd_LOGIN(self, tag, args):
     197          self.server.logged = args[0]
     198          self._send_tagged(tag, 'OK', 'LOGIN completed')
     199  
     200      def cmd_SELECT(self, tag, args):
     201          self.server.is_selected = True
     202          self._send_line(b'* 2 EXISTS')
     203          self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')
     204  
     205      def cmd_UNSELECT(self, tag, args):
     206          if self.server.is_selected:
     207              self.server.is_selected = False
     208              self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)')
     209          else:
     210              self._send_tagged(tag, 'BAD', 'No mailbox selected')
     211  
     212  
     213  class ESC[4;38;5;81mNewIMAPTestsMixin():
     214      client = None
     215  
     216      def _setup(self, imap_handler, connect=True):
     217          """
     218          Sets up imap_handler for tests. imap_handler should inherit from either:
     219          - SimpleIMAPHandler - for testing IMAP commands,
     220          - socketserver.StreamRequestHandler - if raw access to stream is needed.
     221          Returns (client, server).
     222          """
     223          class ESC[4;38;5;81mTestTCPServer(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mserver_class):
     224              def handle_error(self, request, client_address):
     225                  """
     226                  End request and raise the error if one occurs.
     227                  """
     228                  self.close_request(request)
     229                  self.server_close()
     230                  raise
     231  
     232          self.addCleanup(self._cleanup)
     233          self.server = self.server_class((socket_helper.HOST, 0), imap_handler)
     234          self.thread = threading.Thread(
     235              name=self._testMethodName+'-server',
     236              target=self.server.serve_forever,
     237              # Short poll interval to make the test finish quickly.
     238              # Time between requests is short enough that we won't wake
     239              # up spuriously too many times.
     240              kwargs={'poll_interval': 0.01})
     241          self.thread.daemon = True  # In case this function raises.
     242          self.thread.start()
     243  
     244          if connect:
     245              self.client = self.imap_class(*self.server.server_address)
     246  
     247          return self.client, self.server
     248  
     249      def _cleanup(self):
     250          """
     251          Cleans up the test server. This method should not be called manually,
     252          it is added to the cleanup queue in the _setup method already.
     253          """
     254          # if logout was called already we'd raise an exception trying to
     255          # shutdown the client once again
     256          if self.client is not None and self.client.state != 'LOGOUT':
     257              self.client.shutdown()
     258          # cleanup the server
     259          self.server.shutdown()
     260          self.server.server_close()
     261          threading_helper.join_thread(self.thread)
     262          # Explicitly clear the attribute to prevent dangling thread
     263          self.thread = None
     264  
     265      def test_EOF_without_complete_welcome_message(self):
     266          # http://bugs.python.org/issue5949
     267          class ESC[4;38;5;81mEOFHandler(ESC[4;38;5;149msocketserverESC[4;38;5;149m.ESC[4;38;5;149mStreamRequestHandler):
     268              def handle(self):
     269                  self.wfile.write(b'* OK')
     270          _, server = self._setup(EOFHandler, connect=False)
     271          self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
     272                            *server.server_address)
     273  
     274      def test_line_termination(self):
     275          class ESC[4;38;5;81mBadNewlineHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     276              def cmd_CAPABILITY(self, tag, args):
     277                  self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
     278                  self._send_tagged(tag, 'OK', 'CAPABILITY completed')
     279          _, server = self._setup(BadNewlineHandler, connect=False)
     280          self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
     281                            *server.server_address)
     282  
     283      def test_enable_raises_error_if_not_AUTH(self):
     284          class ESC[4;38;5;81mEnableHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     285              capabilities = 'AUTH ENABLE UTF8=ACCEPT'
     286          client, _ = self._setup(EnableHandler)
     287          self.assertFalse(client.utf8_enabled)
     288          with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'):
     289              client.enable('foo')
     290          self.assertFalse(client.utf8_enabled)
     291  
     292      def test_enable_raises_error_if_no_capability(self):
     293          client, _ = self._setup(SimpleIMAPHandler)
     294          with self.assertRaisesRegex(imaplib.IMAP4.error,
     295                  'does not support ENABLE'):
     296              client.enable('foo')
     297  
     298      def test_enable_UTF8_raises_error_if_not_supported(self):
     299          client, _ = self._setup(SimpleIMAPHandler)
     300          typ, data = client.login('user', 'pass')
     301          self.assertEqual(typ, 'OK')
     302          with self.assertRaisesRegex(imaplib.IMAP4.error,
     303                  'does not support ENABLE'):
     304              client.enable('UTF8=ACCEPT')
     305  
     306      def test_enable_UTF8_True_append(self):
     307          class ESC[4;38;5;81mUTF8AppendServer(ESC[4;38;5;149mSimpleIMAPHandler):
     308              capabilities = 'ENABLE UTF8=ACCEPT'
     309              def cmd_ENABLE(self, tag, args):
     310                  self._send_tagged(tag, 'OK', 'ENABLE successful')
     311              def cmd_AUTHENTICATE(self, tag, args):
     312                  self._send_textline('+')
     313                  self.server.response = yield
     314                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     315              def cmd_APPEND(self, tag, args):
     316                  self._send_textline('+')
     317                  self.server.response = yield
     318                  self._send_tagged(tag, 'OK', 'okay')
     319          client, server = self._setup(UTF8AppendServer)
     320          self.assertEqual(client._encoding, 'ascii')
     321          code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
     322          self.assertEqual(code, 'OK')
     323          self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     324          code, _ = client.enable('UTF8=ACCEPT')
     325          self.assertEqual(code, 'OK')
     326          self.assertEqual(client._encoding, 'utf-8')
     327          msg_string = 'Subject: üñí©öðé'
     328          typ, data = client.append(None, None, None, msg_string.encode('utf-8'))
     329          self.assertEqual(typ, 'OK')
     330          self.assertEqual(server.response,
     331              ('UTF8 (%s)\r\n' % msg_string).encode('utf-8'))
     332  
     333      def test_search_disallows_charset_in_utf8_mode(self):
     334          class ESC[4;38;5;81mUTF8Server(ESC[4;38;5;149mSimpleIMAPHandler):
     335              capabilities = 'AUTH ENABLE UTF8=ACCEPT'
     336              def cmd_ENABLE(self, tag, args):
     337                  self._send_tagged(tag, 'OK', 'ENABLE successful')
     338              def cmd_AUTHENTICATE(self, tag, args):
     339                  self._send_textline('+')
     340                  self.server.response = yield
     341                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     342          client, _ = self._setup(UTF8Server)
     343          typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
     344          self.assertEqual(typ, 'OK')
     345          typ, _ = client.enable('UTF8=ACCEPT')
     346          self.assertEqual(typ, 'OK')
     347          self.assertTrue(client.utf8_enabled)
     348          with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'):
     349              client.search('foo', 'bar')
     350  
     351      def test_bad_auth_name(self):
     352          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     353              def cmd_AUTHENTICATE(self, tag, args):
     354                  self._send_tagged(tag, 'NO',
     355                      'unrecognized authentication type {}'.format(args[0]))
     356          client, _ = self._setup(MyServer)
     357          with self.assertRaisesRegex(imaplib.IMAP4.error,
     358                  'unrecognized authentication type METHOD'):
     359              client.authenticate('METHOD', lambda: 1)
     360  
     361      def test_invalid_authentication(self):
     362          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     363              def cmd_AUTHENTICATE(self, tag, args):
     364                  self._send_textline('+')
     365                  self.response = yield
     366                  self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
     367          client, _ = self._setup(MyServer)
     368          with self.assertRaisesRegex(imaplib.IMAP4.error,
     369                  r'\[AUTHENTICATIONFAILED\] invalid'):
     370              client.authenticate('MYAUTH', lambda x: b'fake')
     371  
     372      def test_valid_authentication_bytes(self):
     373          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     374              def cmd_AUTHENTICATE(self, tag, args):
     375                  self._send_textline('+')
     376                  self.server.response = yield
     377                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     378          client, server = self._setup(MyServer)
     379          code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
     380          self.assertEqual(code, 'OK')
     381          self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     382  
     383      def test_valid_authentication_plain_text(self):
     384          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     385              def cmd_AUTHENTICATE(self, tag, args):
     386                  self._send_textline('+')
     387                  self.server.response = yield
     388                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     389          client, server = self._setup(MyServer)
     390          code, _ = client.authenticate('MYAUTH', lambda x: 'fake')
     391          self.assertEqual(code, 'OK')
     392          self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     393  
     394      @hashlib_helper.requires_hashdigest('md5', openssl=True)
     395      def test_login_cram_md5_bytes(self):
     396          class ESC[4;38;5;81mAuthHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     397              capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
     398              def cmd_AUTHENTICATE(self, tag, args):
     399                  self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
     400                                      'VzdG9uLm1jaS5uZXQ=')
     401                  r = yield
     402                  if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
     403                           b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
     404                      self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
     405                  else:
     406                      self._send_tagged(tag, 'NO', 'No access')
     407          client, _ = self._setup(AuthHandler)
     408          self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
     409          ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf")
     410          self.assertEqual(ret, "OK")
     411  
     412      @hashlib_helper.requires_hashdigest('md5', openssl=True)
     413      def test_login_cram_md5_plain_text(self):
     414          class ESC[4;38;5;81mAuthHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     415              capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
     416              def cmd_AUTHENTICATE(self, tag, args):
     417                  self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
     418                                      'VzdG9uLm1jaS5uZXQ=')
     419                  r = yield
     420                  if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
     421                           b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
     422                      self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
     423                  else:
     424                      self._send_tagged(tag, 'NO', 'No access')
     425          client, _ = self._setup(AuthHandler)
     426          self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
     427          ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf")
     428          self.assertEqual(ret, "OK")
     429  
     430      def test_aborted_authentication(self):
     431          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     432              def cmd_AUTHENTICATE(self, tag, args):
     433                  self._send_textline('+')
     434                  self.response = yield
     435                  if self.response == b'*\r\n':
     436                      self._send_tagged(
     437                          tag,
     438                          'NO',
     439                          '[AUTHENTICATIONFAILED] aborted')
     440                  else:
     441                      self._send_tagged(tag, 'OK', 'MYAUTH successful')
     442          client, _ = self._setup(MyServer)
     443          with self.assertRaisesRegex(imaplib.IMAP4.error,
     444                  r'\[AUTHENTICATIONFAILED\] aborted'):
     445              client.authenticate('MYAUTH', lambda x: None)
     446  
     447      @mock.patch('imaplib._MAXLINE', 10)
     448      def test_linetoolong(self):
     449          class ESC[4;38;5;81mTooLongHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     450              def handle(self):
     451                  # send response line longer than the limit set in the next line
     452                  self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n')
     453          _, server = self._setup(TooLongHandler, connect=False)
     454          with self.assertRaisesRegex(imaplib.IMAP4.error,
     455                  'got more than 10 bytes'):
     456              self.imap_class(*server.server_address)
     457  
     458      def test_simple_with_statement(self):
     459          _, server = self._setup(SimpleIMAPHandler, connect=False)
     460          with self.imap_class(*server.server_address):
     461              pass
     462  
     463      @requires_resource('walltime')
     464      def test_imaplib_timeout_test(self):
     465          _, server = self._setup(SimpleIMAPHandler)
     466          addr = server.server_address[1]
     467          client = self.imap_class("localhost", addr, timeout=None)
     468          self.assertEqual(client.sock.timeout, None)
     469          client.shutdown()
     470          client = self.imap_class("localhost", addr, timeout=support.LOOPBACK_TIMEOUT)
     471          self.assertEqual(client.sock.timeout, support.LOOPBACK_TIMEOUT)
     472          client.shutdown()
     473          with self.assertRaises(ValueError):
     474              client = self.imap_class("localhost", addr, timeout=0)
     475  
     476      def test_imaplib_timeout_functionality_test(self):
     477          class ESC[4;38;5;81mTimeoutHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     478              def handle(self):
     479                  time.sleep(1)
     480                  SimpleIMAPHandler.handle(self)
     481  
     482          _, server = self._setup(TimeoutHandler)
     483          addr = server.server_address[1]
     484          with self.assertRaises(TimeoutError):
     485              client = self.imap_class("localhost", addr, timeout=0.001)
     486  
     487      def test_with_statement(self):
     488          _, server = self._setup(SimpleIMAPHandler, connect=False)
     489          with self.imap_class(*server.server_address) as imap:
     490              imap.login('user', 'pass')
     491              self.assertEqual(server.logged, 'user')
     492          self.assertIsNone(server.logged)
     493  
     494      def test_with_statement_logout(self):
     495          # It is legal to log out explicitly inside the with block
     496          _, server = self._setup(SimpleIMAPHandler, connect=False)
     497          with self.imap_class(*server.server_address) as imap:
     498              imap.login('user', 'pass')
     499              self.assertEqual(server.logged, 'user')
     500              imap.logout()
     501              self.assertIsNone(server.logged)
     502          self.assertIsNone(server.logged)
     503  
     504      # command tests
     505  
     506      def test_login(self):
     507          client, _ = self._setup(SimpleIMAPHandler)
     508          typ, data = client.login('user', 'pass')
     509          self.assertEqual(typ, 'OK')
     510          self.assertEqual(data[0], b'LOGIN completed')
     511          self.assertEqual(client.state, 'AUTH')
     512  
     513      def test_logout(self):
     514          client, _ = self._setup(SimpleIMAPHandler)
     515          typ, data = client.login('user', 'pass')
     516          self.assertEqual(typ, 'OK')
     517          self.assertEqual(data[0], b'LOGIN completed')
     518          typ, data = client.logout()
     519          self.assertEqual(typ, 'BYE', (typ, data))
     520          self.assertEqual(data[0], b'IMAP4ref1 Server logging out', (typ, data))
     521          self.assertEqual(client.state, 'LOGOUT')
     522  
     523      def test_lsub(self):
     524          class ESC[4;38;5;81mLsubCmd(ESC[4;38;5;149mSimpleIMAPHandler):
     525              def cmd_LSUB(self, tag, args):
     526                  self._send_textline('* LSUB () "." directoryA')
     527                  return self._send_tagged(tag, 'OK', 'LSUB completed')
     528          client, _ = self._setup(LsubCmd)
     529          client.login('user', 'pass')
     530          typ, data = client.lsub()
     531          self.assertEqual(typ, 'OK')
     532          self.assertEqual(data[0], b'() "." directoryA')
     533  
     534      def test_unselect(self):
     535          client, _ = self._setup(SimpleIMAPHandler)
     536          client.login('user', 'pass')
     537          typ, data = client.select()
     538          self.assertEqual(typ, 'OK')
     539          self.assertEqual(data[0], b'2')
     540  
     541          typ, data = client.unselect()
     542          self.assertEqual(typ, 'OK')
     543          self.assertEqual(data[0], b'Returned to authenticated state. (Success)')
     544          self.assertEqual(client.state, 'AUTH')
     545  
     546  
     547  class ESC[4;38;5;81mNewIMAPTests(ESC[4;38;5;149mNewIMAPTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     548      imap_class = imaplib.IMAP4
     549      server_class = socketserver.TCPServer
     550  
     551  
     552  @unittest.skipUnless(ssl, "SSL not available")
     553  class ESC[4;38;5;81mNewIMAPSSLTests(ESC[4;38;5;149mNewIMAPTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     554      imap_class = IMAP4_SSL
     555      server_class = SecureTCPServer
     556  
     557      @requires_resource('walltime')
     558      def test_ssl_raises(self):
     559          ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     560          self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED)
     561          self.assertEqual(ssl_context.check_hostname, True)
     562          ssl_context.load_verify_locations(CAFILE)
     563  
     564          with self.assertRaisesRegex(ssl.CertificateError,
     565                  "IP address mismatch, certificate is not valid for "
     566                  "'127.0.0.1'"):
     567              _, server = self._setup(SimpleIMAPHandler)
     568              client = self.imap_class(*server.server_address,
     569                                       ssl_context=ssl_context)
     570              client.shutdown()
     571  
     572      @requires_resource('walltime')
     573      def test_ssl_verified(self):
     574          ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     575          ssl_context.load_verify_locations(CAFILE)
     576  
     577          _, server = self._setup(SimpleIMAPHandler)
     578          client = self.imap_class("localhost", server.server_address[1],
     579                                   ssl_context=ssl_context)
     580          client.shutdown()
     581  
     582      # Mock the private method _connect(), so mark the test as specific
     583      # to CPython stdlib
     584      @cpython_only
     585      def test_certfile_arg_warn(self):
     586          with warnings_helper.check_warnings(('', DeprecationWarning)):
     587              with mock.patch.object(self.imap_class, 'open'):
     588                  with mock.patch.object(self.imap_class, '_connect'):
     589                      self.imap_class('localhost', 143, certfile=CERTFILE)
     590  
     591  class ESC[4;38;5;81mThreadedNetworkedTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     592      server_class = socketserver.TCPServer
     593      imap_class = imaplib.IMAP4
     594  
     595      def make_server(self, addr, hdlr):
     596  
     597          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mserver_class):
     598              def handle_error(self, request, client_address):
     599                  self.close_request(request)
     600                  self.server_close()
     601                  raise
     602  
     603          if verbose:
     604              print("creating server")
     605          server = MyServer(addr, hdlr)
     606          self.assertEqual(server.server_address, server.socket.getsockname())
     607  
     608          if verbose:
     609              print("server created")
     610              print("ADDR =", addr)
     611              print("CLASS =", self.server_class)
     612              print("HDLR =", server.RequestHandlerClass)
     613  
     614          t = threading.Thread(
     615              name='%s serving' % self.server_class,
     616              target=server.serve_forever,
     617              # Short poll interval to make the test finish quickly.
     618              # Time between requests is short enough that we won't wake
     619              # up spuriously too many times.
     620              kwargs={'poll_interval': 0.01})
     621          t.daemon = True  # In case this function raises.
     622          t.start()
     623          if verbose:
     624              print("server running")
     625          return server, t
     626  
     627      def reap_server(self, server, thread):
     628          if verbose:
     629              print("waiting for server")
     630          server.shutdown()
     631          server.server_close()
     632          thread.join()
     633          if verbose:
     634              print("done")
     635  
     636      @contextmanager
     637      def reaped_server(self, hdlr):
     638          server, thread = self.make_server((socket_helper.HOST, 0), hdlr)
     639          try:
     640              yield server
     641          finally:
     642              self.reap_server(server, thread)
     643  
     644      @contextmanager
     645      def reaped_pair(self, hdlr):
     646          with self.reaped_server(hdlr) as server:
     647              client = self.imap_class(*server.server_address)
     648              try:
     649                  yield server, client
     650              finally:
     651                  client.logout()
     652  
     653      @threading_helper.reap_threads
     654      def test_connect(self):
     655          with self.reaped_server(SimpleIMAPHandler) as server:
     656              client = self.imap_class(*server.server_address)
     657              client.shutdown()
     658  
     659      @threading_helper.reap_threads
     660      def test_bracket_flags(self):
     661  
     662          # This violates RFC 3501, which disallows ']' characters in tag names,
     663          # but imaplib has allowed producing such tags forever, other programs
     664          # also produce them (eg: OtherInbox's Organizer app as of 20140716),
     665          # and Gmail, for example, accepts them and produces them.  So we
     666          # support them.  See issue #21815.
     667  
     668          class ESC[4;38;5;81mBracketFlagHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     669  
     670              def handle(self):
     671                  self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']
     672                  super().handle()
     673  
     674              def cmd_AUTHENTICATE(self, tag, args):
     675                  self._send_textline('+')
     676                  self.server.response = yield
     677                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     678  
     679              def cmd_SELECT(self, tag, args):
     680                  flag_msg = ' \\'.join(self.flags)
     681                  self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii'))
     682                  self._send_line(b'* 2 EXISTS')
     683                  self._send_line(b'* 0 RECENT')
     684                  msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.'
     685                          % flag_msg)
     686                  self._send_line(msg.encode('ascii'))
     687                  self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')
     688  
     689              def cmd_STORE(self, tag, args):
     690                  new_flags = args[2].strip('(').strip(')').split()
     691                  self.flags.extend(new_flags)
     692                  flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags)
     693                  msg = '* %s FETCH %s' % (args[0], flags_msg)
     694                  self._send_line(msg.encode('ascii'))
     695                  self._send_tagged(tag, 'OK', 'STORE completed.')
     696  
     697          with self.reaped_pair(BracketFlagHandler) as (server, client):
     698              code, data = client.authenticate('MYAUTH', lambda x: b'fake')
     699              self.assertEqual(code, 'OK')
     700              self.assertEqual(server.response, b'ZmFrZQ==\r\n')
     701              client.select('test')
     702              typ, [data] = client.store(b'1', "+FLAGS", "[test]")
     703              self.assertIn(b'[test]', data)
     704              client.select('test')
     705              typ, [data] = client.response('PERMANENTFLAGS')
     706              self.assertIn(b'[test]', data)
     707  
     708      @threading_helper.reap_threads
     709      def test_issue5949(self):
     710  
     711          class ESC[4;38;5;81mEOFHandler(ESC[4;38;5;149msocketserverESC[4;38;5;149m.ESC[4;38;5;149mStreamRequestHandler):
     712              def handle(self):
     713                  # EOF without sending a complete welcome message.
     714                  self.wfile.write(b'* OK')
     715  
     716          with self.reaped_server(EOFHandler) as server:
     717              self.assertRaises(imaplib.IMAP4.abort,
     718                                self.imap_class, *server.server_address)
     719  
     720      @threading_helper.reap_threads
     721      def test_line_termination(self):
     722  
     723          class ESC[4;38;5;81mBadNewlineHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     724  
     725              def cmd_CAPABILITY(self, tag, args):
     726                  self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
     727                  self._send_tagged(tag, 'OK', 'CAPABILITY completed')
     728  
     729          with self.reaped_server(BadNewlineHandler) as server:
     730              self.assertRaises(imaplib.IMAP4.abort,
     731                                self.imap_class, *server.server_address)
     732  
     733      class ESC[4;38;5;81mUTF8Server(ESC[4;38;5;149mSimpleIMAPHandler):
     734          capabilities = 'AUTH ENABLE UTF8=ACCEPT'
     735  
     736          def cmd_ENABLE(self, tag, args):
     737              self._send_tagged(tag, 'OK', 'ENABLE successful')
     738  
     739          def cmd_AUTHENTICATE(self, tag, args):
     740              self._send_textline('+')
     741              self.server.response = yield
     742              self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     743  
     744      @threading_helper.reap_threads
     745      def test_enable_raises_error_if_not_AUTH(self):
     746          with self.reaped_pair(self.UTF8Server) as (server, client):
     747              self.assertFalse(client.utf8_enabled)
     748              self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
     749              self.assertFalse(client.utf8_enabled)
     750  
    # XXX Also need a test that calling enable() after SELECT raises an error.
     752  
     753      @threading_helper.reap_threads
     754      def test_enable_raises_error_if_no_capability(self):
     755          class ESC[4;38;5;81mNoEnableServer(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mUTF8Server):
     756              capabilities = 'AUTH'
     757          with self.reaped_pair(NoEnableServer) as (server, client):
     758              self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
     759  
     760      @threading_helper.reap_threads
     761      def test_enable_UTF8_raises_error_if_not_supported(self):
     762          class ESC[4;38;5;81mNonUTF8Server(ESC[4;38;5;149mSimpleIMAPHandler):
     763              pass
     764          with self.assertRaises(imaplib.IMAP4.error):
     765              with self.reaped_pair(NonUTF8Server) as (server, client):
     766                  typ, data = client.login('user', 'pass')
     767                  self.assertEqual(typ, 'OK')
     768                  client.enable('UTF8=ACCEPT')
     769                  pass
     770  
     771      @threading_helper.reap_threads
     772      def test_enable_UTF8_True_append(self):
     773  
     774          class ESC[4;38;5;81mUTF8AppendServer(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mUTF8Server):
     775              def cmd_APPEND(self, tag, args):
     776                  self._send_textline('+')
     777                  self.server.response = yield
     778                  self._send_tagged(tag, 'OK', 'okay')
     779  
     780          with self.reaped_pair(UTF8AppendServer) as (server, client):
     781              self.assertEqual(client._encoding, 'ascii')
     782              code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
     783              self.assertEqual(code, 'OK')
     784              self.assertEqual(server.response,
     785                               b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     786              code, _ = client.enable('UTF8=ACCEPT')
     787              self.assertEqual(code, 'OK')
     788              self.assertEqual(client._encoding, 'utf-8')
     789              msg_string = 'Subject: üñí©öðé'
     790              typ, data = client.append(
     791                  None, None, None, msg_string.encode('utf-8'))
     792              self.assertEqual(typ, 'OK')
     793              self.assertEqual(
     794                  server.response,
     795                  ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')
     796              )
     797  
    # XXX Also need a test that makes sure that the Literal and Untagged_status
    # regexes use unicode in UTF8 mode instead of the default ASCII.
     800  
     801      @threading_helper.reap_threads
     802      def test_search_disallows_charset_in_utf8_mode(self):
     803          with self.reaped_pair(self.UTF8Server) as (server, client):
     804              typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
     805              self.assertEqual(typ, 'OK')
     806              typ, _ = client.enable('UTF8=ACCEPT')
     807              self.assertEqual(typ, 'OK')
     808              self.assertTrue(client.utf8_enabled)
     809              self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar')
     810  
     811      @threading_helper.reap_threads
     812      def test_bad_auth_name(self):
     813  
     814          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     815  
     816              def cmd_AUTHENTICATE(self, tag, args):
     817                  self._send_tagged(tag, 'NO', 'unrecognized authentication '
     818                                    'type {}'.format(args[0]))
     819  
     820          with self.reaped_pair(MyServer) as (server, client):
     821              with self.assertRaises(imaplib.IMAP4.error):
     822                  client.authenticate('METHOD', lambda: 1)
     823  
     824      @threading_helper.reap_threads
     825      def test_invalid_authentication(self):
     826  
     827          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     828  
     829              def cmd_AUTHENTICATE(self, tag, args):
     830                  self._send_textline('+')
     831                  self.response = yield
     832                  self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
     833  
     834          with self.reaped_pair(MyServer) as (server, client):
     835              with self.assertRaises(imaplib.IMAP4.error):
     836                  code, data = client.authenticate('MYAUTH', lambda x: b'fake')
     837  
     838      @threading_helper.reap_threads
     839      def test_valid_authentication(self):
     840  
     841          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     842  
     843              def cmd_AUTHENTICATE(self, tag, args):
     844                  self._send_textline('+')
     845                  self.server.response = yield
     846                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     847  
     848          with self.reaped_pair(MyServer) as (server, client):
     849              code, data = client.authenticate('MYAUTH', lambda x: b'fake')
     850              self.assertEqual(code, 'OK')
     851              self.assertEqual(server.response,
     852                               b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     853  
     854          with self.reaped_pair(MyServer) as (server, client):
     855              code, data = client.authenticate('MYAUTH', lambda x: 'fake')
     856              self.assertEqual(code, 'OK')
     857              self.assertEqual(server.response,
     858                               b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     859  
     860      @threading_helper.reap_threads
     861      @hashlib_helper.requires_hashdigest('md5', openssl=True)
     862      def test_login_cram_md5(self):
     863  
     864          class ESC[4;38;5;81mAuthHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     865  
     866              capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
     867  
     868              def cmd_AUTHENTICATE(self, tag, args):
     869                  self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
     870                                      'VzdG9uLm1jaS5uZXQ=')
     871                  r = yield
     872                  if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
     873                           b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
     874                      self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
     875                  else:
     876                      self._send_tagged(tag, 'NO', 'No access')
     877  
     878          with self.reaped_pair(AuthHandler) as (server, client):
     879              self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
     880              ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
     881              self.assertEqual(ret, "OK")
     882  
     883          with self.reaped_pair(AuthHandler) as (server, client):
     884              self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
     885              ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
     886              self.assertEqual(ret, "OK")
     887  
     888  
     889      @threading_helper.reap_threads
     890      def test_aborted_authentication(self):
     891  
     892          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     893  
     894              def cmd_AUTHENTICATE(self, tag, args):
     895                  self._send_textline('+')
     896                  self.response = yield
     897  
     898                  if self.response == b'*\r\n':
     899                      self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted')
     900                  else:
     901                      self._send_tagged(tag, 'OK', 'MYAUTH successful')
     902  
     903          with self.reaped_pair(MyServer) as (server, client):
     904              with self.assertRaises(imaplib.IMAP4.error):
     905                  code, data = client.authenticate('MYAUTH', lambda x: None)
     906  
     907  
     908      def test_linetoolong(self):
     909          class ESC[4;38;5;81mTooLongHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     910              def handle(self):
     911                  # Send a very long response line
     912                  self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n')
     913  
     914          with self.reaped_server(TooLongHandler) as server:
     915              self.assertRaises(imaplib.IMAP4.error,
     916                                self.imap_class, *server.server_address)
     917  
     918      @threading_helper.reap_threads
     919      def test_simple_with_statement(self):
     920          # simplest call
     921          with self.reaped_server(SimpleIMAPHandler) as server:
     922              with self.imap_class(*server.server_address):
     923                  pass
     924  
     925      @threading_helper.reap_threads
     926      def test_with_statement(self):
     927          with self.reaped_server(SimpleIMAPHandler) as server:
     928              with self.imap_class(*server.server_address) as imap:
     929                  imap.login('user', 'pass')
     930                  self.assertEqual(server.logged, 'user')
     931              self.assertIsNone(server.logged)
     932  
     933      @threading_helper.reap_threads
     934      def test_with_statement_logout(self):
     935          # what happens if already logout in the block?
     936          with self.reaped_server(SimpleIMAPHandler) as server:
     937              with self.imap_class(*server.server_address) as imap:
     938                  imap.login('user', 'pass')
     939                  self.assertEqual(server.logged, 'user')
     940                  imap.logout()
     941                  self.assertIsNone(server.logged)
     942              self.assertIsNone(server.logged)
     943  
     944      @threading_helper.reap_threads
     945      @cpython_only
     946      @unittest.skipUnless(__debug__, "Won't work if __debug__ is False")
     947      def test_dump_ur(self):
     948          # See: http://bugs.python.org/issue26543
     949          untagged_resp_dict = {'READ-WRITE': [b'']}
     950  
     951          with self.reaped_server(SimpleIMAPHandler) as server:
     952              with self.imap_class(*server.server_address) as imap:
     953                  with mock.patch.object(imap, '_mesg') as mock_mesg:
     954                      imap._dump_ur(untagged_resp_dict)
     955                      mock_mesg.assert_called_with(
     956                          "untagged responses dump:READ-WRITE: [b'']"
     957                      )
     958  
     959  
     960  @unittest.skipUnless(ssl, "SSL not available")
     961  class ESC[4;38;5;81mThreadedNetworkedTestsSSL(ESC[4;38;5;149mThreadedNetworkedTests):
     962      server_class = SecureTCPServer
     963      imap_class = IMAP4_SSL
     964  
     965      @threading_helper.reap_threads
     966      def test_ssl_verified(self):
     967          ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     968          ssl_context.load_verify_locations(CAFILE)
     969  
     970          with self.assertRaisesRegex(
     971                  ssl.CertificateError,
     972                  "IP address mismatch, certificate is not valid for "
     973                  "'127.0.0.1'"):
     974              with self.reaped_server(SimpleIMAPHandler) as server:
     975                  client = self.imap_class(*server.server_address,
     976                                           ssl_context=ssl_context)
     977                  client.shutdown()
     978  
     979          with self.reaped_server(SimpleIMAPHandler) as server:
     980              client = self.imap_class("localhost", server.server_address[1],
     981                                       ssl_context=ssl_context)
     982              client.shutdown()
     983  
     984  
     985  @unittest.skipUnless(
     986      support.is_resource_enabled('network'), 'network resource disabled')
     987  @unittest.skip('cyrus.andrew.cmu.edu blocks connections')
     988  class ESC[4;38;5;81mRemoteIMAPTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     989      host = 'cyrus.andrew.cmu.edu'
     990      port = 143
     991      username = 'anonymous'
     992      password = 'pass'
     993      imap_class = imaplib.IMAP4
     994  
     995      def setUp(self):
     996          with socket_helper.transient_internet(self.host):
     997              self.server = self.imap_class(self.host, self.port)
     998  
     999      def tearDown(self):
    1000          if self.server is not None:
    1001              with socket_helper.transient_internet(self.host):
    1002                  self.server.logout()
    1003  
    1004      def test_logincapa(self):
    1005          with socket_helper.transient_internet(self.host):
    1006              for cap in self.server.capabilities:
    1007                  self.assertIsInstance(cap, str)
    1008              self.assertIn('LOGINDISABLED', self.server.capabilities)
    1009              self.assertIn('AUTH=ANONYMOUS', self.server.capabilities)
    1010              rs = self.server.login(self.username, self.password)
    1011              self.assertEqual(rs[0], 'OK')
    1012  
    1013      def test_logout(self):
    1014          with socket_helper.transient_internet(self.host):
    1015              rs = self.server.logout()
    1016              self.server = None
    1017              self.assertEqual(rs[0], 'BYE', rs)
    1018  
    1019  
    1020  @unittest.skipUnless(ssl, "SSL not available")
    1021  @unittest.skipUnless(
    1022      support.is_resource_enabled('network'), 'network resource disabled')
    1023  @unittest.skip('cyrus.andrew.cmu.edu blocks connections')
    1024  class ESC[4;38;5;81mRemoteIMAP_STARTTLSTest(ESC[4;38;5;149mRemoteIMAPTest):
    1025  
    1026      def setUp(self):
    1027          super().setUp()
    1028          with socket_helper.transient_internet(self.host):
    1029              rs = self.server.starttls()
    1030              self.assertEqual(rs[0], 'OK')
    1031  
    1032      def test_logincapa(self):
    1033          for cap in self.server.capabilities:
    1034              self.assertIsInstance(cap, str)
    1035          self.assertNotIn('LOGINDISABLED', self.server.capabilities)
    1036  
    1037  
    1038  @unittest.skipUnless(ssl, "SSL not available")
    1039  @unittest.skip('cyrus.andrew.cmu.edu blocks connections')
    1040  class ESC[4;38;5;81mRemoteIMAP_SSLTest(ESC[4;38;5;149mRemoteIMAPTest):
    1041      port = 993
    1042      imap_class = IMAP4_SSL
    1043  
    1044      def setUp(self):
    1045          pass
    1046  
    1047      def tearDown(self):
    1048          pass
    1049  
    1050      def create_ssl_context(self):
    1051          ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1052          ssl_context.check_hostname = False
    1053          ssl_context.verify_mode = ssl.CERT_NONE
    1054          ssl_context.load_cert_chain(CERTFILE)
    1055          return ssl_context
    1056  
    1057      def check_logincapa(self, server):
    1058          try:
    1059              for cap in server.capabilities:
    1060                  self.assertIsInstance(cap, str)
    1061              self.assertNotIn('LOGINDISABLED', server.capabilities)
    1062              self.assertIn('AUTH=PLAIN', server.capabilities)
    1063              rs = server.login(self.username, self.password)
    1064              self.assertEqual(rs[0], 'OK')
    1065          finally:
    1066              server.logout()
    1067  
    1068      def test_logincapa(self):
    1069          with socket_helper.transient_internet(self.host):
    1070              _server = self.imap_class(self.host, self.port)
    1071              self.check_logincapa(_server)
    1072  
    1073      def test_logout(self):
    1074          with socket_helper.transient_internet(self.host):
    1075              _server = self.imap_class(self.host, self.port)
    1076              rs = _server.logout()
    1077              self.assertEqual(rs[0], 'BYE', rs)
    1078  
    1079      def test_ssl_context_certfile_exclusive(self):
    1080          with socket_helper.transient_internet(self.host):
    1081              self.assertRaises(
    1082                  ValueError, self.imap_class, self.host, self.port,
    1083                  certfile=CERTFILE, ssl_context=self.create_ssl_context())
    1084  
    1085      def test_ssl_context_keyfile_exclusive(self):
    1086          with socket_helper.transient_internet(self.host):
    1087              self.assertRaises(
    1088                  ValueError, self.imap_class, self.host, self.port,
    1089                  keyfile=CERTFILE, ssl_context=self.create_ssl_context())
    1090  
    1091  
    1092  if __name__ == "__main__":
    1093      unittest.main()