python (3.12.0)

(root)/
lib/
python3.12/
test/
test_imaplib.py
       1  from test import support
       2  from test.support import socket_helper
       3  
       4  from contextlib import contextmanager
       5  import imaplib
       6  import os.path
       7  import socketserver
       8  import time
       9  import calendar
      10  import threading
      11  import socket
      12  
      13  from test.support import verbose, run_with_tz, run_with_locale, cpython_only, requires_resource
      14  from test.support import hashlib_helper
      15  from test.support import threading_helper
      16  import unittest
      17  from unittest import mock
      18  from datetime import datetime, timezone, timedelta
      19  try:
      20      import ssl
      21  except ImportError:
      22      ssl = None
      23  
      24  support.requires_working_socket(module=True)
      25  
      26  CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
      27  CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")
      28  
      29  
      30  class ESC[4;38;5;81mTestImaplib(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      31  
      32      def test_Internaldate2tuple(self):
      33          t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
      34          tt = imaplib.Internaldate2tuple(
      35              b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
      36          self.assertEqual(time.mktime(tt), t0)
      37          tt = imaplib.Internaldate2tuple(
      38              b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")')
      39          self.assertEqual(time.mktime(tt), t0)
      40          tt = imaplib.Internaldate2tuple(
      41              b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")')
      42          self.assertEqual(time.mktime(tt), t0)
      43  
      44      @run_with_tz('MST+07MDT,M4.1.0,M10.5.0')
      45      def test_Internaldate2tuple_issue10941(self):
      46          self.assertNotEqual(imaplib.Internaldate2tuple(
      47              b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'),
      48              imaplib.Internaldate2tuple(
      49                  b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")'))
      50  
      51      def timevalues(self):
      52          return [2000000000, 2000000000.0, time.localtime(2000000000),
      53                  (2033, 5, 18, 5, 33, 20, -1, -1, -1),
      54                  (2033, 5, 18, 5, 33, 20, -1, -1, 1),
      55                  datetime.fromtimestamp(2000000000,
      56                                         timezone(timedelta(0, 2 * 60 * 60))),
      57                  '"18-May-2033 05:33:20 +0200"']
      58  
      59      @run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
      60      # DST rules included to work around quirk where the Gnu C library may not
      61      # otherwise restore the previous time zone
      62      @run_with_tz('STD-1DST,M3.2.0,M11.1.0')
      63      def test_Time2Internaldate(self):
      64          expected = '"18-May-2033 05:33:20 +0200"'
      65  
      66          for t in self.timevalues():
      67              internal = imaplib.Time2Internaldate(t)
      68              self.assertEqual(internal, expected)
      69  
      70      def test_that_Time2Internaldate_returns_a_result(self):
      71          # Without tzset, we can check only that it successfully
      72          # produces a result, not the correctness of the result itself,
      73          # since the result depends on the timezone the machine is in.
      74          for t in self.timevalues():
      75              imaplib.Time2Internaldate(t)
      76  
      77      @socket_helper.skip_if_tcp_blackhole
      78      def test_imap4_host_default_value(self):
      79          # Check whether the IMAP4_PORT is truly unavailable.
      80          with socket.socket() as s:
      81              try:
      82                  s.connect(('', imaplib.IMAP4_PORT))
      83                  self.skipTest(
      84                      "Cannot run the test with local IMAP server running.")
      85              except socket.error:
      86                  pass
      87  
      88          # This is the exception that should be raised.
      89          expected_errnos = socket_helper.get_socket_conn_refused_errs()
      90          with self.assertRaises(OSError) as cm:
      91              imaplib.IMAP4()
      92          self.assertIn(cm.exception.errno, expected_errnos)
      93  
      94  
      95  if ssl:
      96      class ESC[4;38;5;81mSecureTCPServer(ESC[4;38;5;149msocketserverESC[4;38;5;149m.ESC[4;38;5;149mTCPServer):
      97  
      98          def get_request(self):
      99              newsocket, fromaddr = self.socket.accept()
     100              context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
     101              context.load_cert_chain(CERTFILE)
     102              connstream = context.wrap_socket(newsocket, server_side=True)
     103              return connstream, fromaddr
     104  
     105      IMAP4_SSL = imaplib.IMAP4_SSL
     106  
     107  else:
     108  
     109      class ESC[4;38;5;81mSecureTCPServer:
     110          pass
     111  
     112      IMAP4_SSL = None
     113  
     114  
     115  class ESC[4;38;5;81mSimpleIMAPHandler(ESC[4;38;5;149msocketserverESC[4;38;5;149m.ESC[4;38;5;149mStreamRequestHandler):
     116      timeout = support.LOOPBACK_TIMEOUT
     117      continuation = None
     118      capabilities = ''
     119  
     120      def setup(self):
     121          super().setup()
     122          self.server.is_selected = False
     123          self.server.logged = None
     124  
     125      def _send(self, message):
     126          if verbose:
     127              print("SENT: %r" % message.strip())
     128          self.wfile.write(message)
     129  
     130      def _send_line(self, message):
     131          self._send(message + b'\r\n')
     132  
     133      def _send_textline(self, message):
     134          self._send_line(message.encode('ASCII'))
     135  
     136      def _send_tagged(self, tag, code, message):
     137          self._send_textline(' '.join((tag, code, message)))
     138  
     139      def handle(self):
     140          # Send a welcome message.
     141          self._send_textline('* OK IMAP4rev1')
     142          while 1:
     143              # Gather up input until we receive a line terminator or we timeout.
     144              # Accumulate read(1) because it's simpler to handle the differences
     145              # between naked sockets and SSL sockets.
     146              line = b''
     147              while 1:
     148                  try:
     149                      part = self.rfile.read(1)
     150                      if part == b'':
     151                          # Naked sockets return empty strings..
     152                          return
     153                      line += part
     154                  except OSError:
     155                      # ..but SSLSockets raise exceptions.
     156                      return
     157                  if line.endswith(b'\r\n'):
     158                      break
     159  
     160              if verbose:
     161                  print('GOT: %r' % line.strip())
     162              if self.continuation:
     163                  try:
     164                      self.continuation.send(line)
     165                  except StopIteration:
     166                      self.continuation = None
     167                  continue
     168              splitline = line.decode('ASCII').split()
     169              tag = splitline[0]
     170              cmd = splitline[1]
     171              args = splitline[2:]
     172  
     173              if hasattr(self, 'cmd_' + cmd):
     174                  continuation = getattr(self, 'cmd_' + cmd)(tag, args)
     175                  if continuation:
     176                      self.continuation = continuation
     177                      next(continuation)
     178              else:
     179                  self._send_tagged(tag, 'BAD', cmd + ' unknown')
     180  
     181      def cmd_CAPABILITY(self, tag, args):
     182          caps = ('IMAP4rev1 ' + self.capabilities
     183                  if self.capabilities
     184                  else 'IMAP4rev1')
     185          self._send_textline('* CAPABILITY ' + caps)
     186          self._send_tagged(tag, 'OK', 'CAPABILITY completed')
     187  
     188      def cmd_LOGOUT(self, tag, args):
     189          self.server.logged = None
     190          self._send_textline('* BYE IMAP4ref1 Server logging out')
     191          self._send_tagged(tag, 'OK', 'LOGOUT completed')
     192  
     193      def cmd_LOGIN(self, tag, args):
     194          self.server.logged = args[0]
     195          self._send_tagged(tag, 'OK', 'LOGIN completed')
     196  
     197      def cmd_SELECT(self, tag, args):
     198          self.server.is_selected = True
     199          self._send_line(b'* 2 EXISTS')
     200          self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')
     201  
     202      def cmd_UNSELECT(self, tag, args):
     203          if self.server.is_selected:
     204              self.server.is_selected = False
     205              self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)')
     206          else:
     207              self._send_tagged(tag, 'BAD', 'No mailbox selected')
     208  
     209  
     210  class ESC[4;38;5;81mNewIMAPTestsMixin():
     211      client = None
     212  
     213      def _setup(self, imap_handler, connect=True):
     214          """
     215          Sets up imap_handler for tests. imap_handler should inherit from either:
     216          - SimpleIMAPHandler - for testing IMAP commands,
     217          - socketserver.StreamRequestHandler - if raw access to stream is needed.
     218          Returns (client, server).
     219          """
     220          class ESC[4;38;5;81mTestTCPServer(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mserver_class):
     221              def handle_error(self, request, client_address):
     222                  """
     223                  End request and raise the error if one occurs.
     224                  """
     225                  self.close_request(request)
     226                  self.server_close()
     227                  raise
     228  
     229          self.addCleanup(self._cleanup)
     230          self.server = self.server_class((socket_helper.HOST, 0), imap_handler)
     231          self.thread = threading.Thread(
     232              name=self._testMethodName+'-server',
     233              target=self.server.serve_forever,
     234              # Short poll interval to make the test finish quickly.
     235              # Time between requests is short enough that we won't wake
     236              # up spuriously too many times.
     237              kwargs={'poll_interval': 0.01})
     238          self.thread.daemon = True  # In case this function raises.
     239          self.thread.start()
     240  
     241          if connect:
     242              self.client = self.imap_class(*self.server.server_address)
     243  
     244          return self.client, self.server
     245  
     246      def _cleanup(self):
     247          """
     248          Cleans up the test server. This method should not be called manually,
     249          it is added to the cleanup queue in the _setup method already.
     250          """
     251          # if logout was called already we'd raise an exception trying to
     252          # shutdown the client once again
     253          if self.client is not None and self.client.state != 'LOGOUT':
     254              self.client.shutdown()
     255          # cleanup the server
     256          self.server.shutdown()
     257          self.server.server_close()
     258          threading_helper.join_thread(self.thread)
     259          # Explicitly clear the attribute to prevent dangling thread
     260          self.thread = None
     261  
     262      def test_EOF_without_complete_welcome_message(self):
     263          # http://bugs.python.org/issue5949
     264          class ESC[4;38;5;81mEOFHandler(ESC[4;38;5;149msocketserverESC[4;38;5;149m.ESC[4;38;5;149mStreamRequestHandler):
     265              def handle(self):
     266                  self.wfile.write(b'* OK')
     267          _, server = self._setup(EOFHandler, connect=False)
     268          self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
     269                            *server.server_address)
     270  
     271      def test_line_termination(self):
     272          class ESC[4;38;5;81mBadNewlineHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     273              def cmd_CAPABILITY(self, tag, args):
     274                  self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
     275                  self._send_tagged(tag, 'OK', 'CAPABILITY completed')
     276          _, server = self._setup(BadNewlineHandler, connect=False)
     277          self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
     278                            *server.server_address)
     279  
     280      def test_enable_raises_error_if_not_AUTH(self):
     281          class ESC[4;38;5;81mEnableHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     282              capabilities = 'AUTH ENABLE UTF8=ACCEPT'
     283          client, _ = self._setup(EnableHandler)
     284          self.assertFalse(client.utf8_enabled)
     285          with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'):
     286              client.enable('foo')
     287          self.assertFalse(client.utf8_enabled)
     288  
     289      def test_enable_raises_error_if_no_capability(self):
     290          client, _ = self._setup(SimpleIMAPHandler)
     291          with self.assertRaisesRegex(imaplib.IMAP4.error,
     292                  'does not support ENABLE'):
     293              client.enable('foo')
     294  
     295      def test_enable_UTF8_raises_error_if_not_supported(self):
     296          client, _ = self._setup(SimpleIMAPHandler)
     297          typ, data = client.login('user', 'pass')
     298          self.assertEqual(typ, 'OK')
     299          with self.assertRaisesRegex(imaplib.IMAP4.error,
     300                  'does not support ENABLE'):
     301              client.enable('UTF8=ACCEPT')
     302  
     303      def test_enable_UTF8_True_append(self):
     304          class ESC[4;38;5;81mUTF8AppendServer(ESC[4;38;5;149mSimpleIMAPHandler):
     305              capabilities = 'ENABLE UTF8=ACCEPT'
     306              def cmd_ENABLE(self, tag, args):
     307                  self._send_tagged(tag, 'OK', 'ENABLE successful')
     308              def cmd_AUTHENTICATE(self, tag, args):
     309                  self._send_textline('+')
     310                  self.server.response = yield
     311                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     312              def cmd_APPEND(self, tag, args):
     313                  self._send_textline('+')
     314                  self.server.response = yield
     315                  self._send_tagged(tag, 'OK', 'okay')
     316          client, server = self._setup(UTF8AppendServer)
     317          self.assertEqual(client._encoding, 'ascii')
     318          code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
     319          self.assertEqual(code, 'OK')
     320          self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     321          code, _ = client.enable('UTF8=ACCEPT')
     322          self.assertEqual(code, 'OK')
     323          self.assertEqual(client._encoding, 'utf-8')
     324          msg_string = 'Subject: üñí©öðé'
     325          typ, data = client.append(None, None, None, msg_string.encode('utf-8'))
     326          self.assertEqual(typ, 'OK')
     327          self.assertEqual(server.response,
     328              ('UTF8 (%s)\r\n' % msg_string).encode('utf-8'))
     329  
     330      def test_search_disallows_charset_in_utf8_mode(self):
     331          class ESC[4;38;5;81mUTF8Server(ESC[4;38;5;149mSimpleIMAPHandler):
     332              capabilities = 'AUTH ENABLE UTF8=ACCEPT'
     333              def cmd_ENABLE(self, tag, args):
     334                  self._send_tagged(tag, 'OK', 'ENABLE successful')
     335              def cmd_AUTHENTICATE(self, tag, args):
     336                  self._send_textline('+')
     337                  self.server.response = yield
     338                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     339          client, _ = self._setup(UTF8Server)
     340          typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
     341          self.assertEqual(typ, 'OK')
     342          typ, _ = client.enable('UTF8=ACCEPT')
     343          self.assertEqual(typ, 'OK')
     344          self.assertTrue(client.utf8_enabled)
     345          with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'):
     346              client.search('foo', 'bar')
     347  
     348      def test_bad_auth_name(self):
     349          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     350              def cmd_AUTHENTICATE(self, tag, args):
     351                  self._send_tagged(tag, 'NO',
     352                      'unrecognized authentication type {}'.format(args[0]))
     353          client, _ = self._setup(MyServer)
     354          with self.assertRaisesRegex(imaplib.IMAP4.error,
     355                  'unrecognized authentication type METHOD'):
     356              client.authenticate('METHOD', lambda: 1)
     357  
     358      def test_invalid_authentication(self):
     359          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     360              def cmd_AUTHENTICATE(self, tag, args):
     361                  self._send_textline('+')
     362                  self.response = yield
     363                  self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
     364          client, _ = self._setup(MyServer)
     365          with self.assertRaisesRegex(imaplib.IMAP4.error,
     366                  r'\[AUTHENTICATIONFAILED\] invalid'):
     367              client.authenticate('MYAUTH', lambda x: b'fake')
     368  
     369      def test_valid_authentication_bytes(self):
     370          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     371              def cmd_AUTHENTICATE(self, tag, args):
     372                  self._send_textline('+')
     373                  self.server.response = yield
     374                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     375          client, server = self._setup(MyServer)
     376          code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
     377          self.assertEqual(code, 'OK')
     378          self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     379  
     380      def test_valid_authentication_plain_text(self):
     381          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     382              def cmd_AUTHENTICATE(self, tag, args):
     383                  self._send_textline('+')
     384                  self.server.response = yield
     385                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     386          client, server = self._setup(MyServer)
     387          code, _ = client.authenticate('MYAUTH', lambda x: 'fake')
     388          self.assertEqual(code, 'OK')
     389          self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     390  
     391      @hashlib_helper.requires_hashdigest('md5', openssl=True)
     392      def test_login_cram_md5_bytes(self):
     393          class ESC[4;38;5;81mAuthHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     394              capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
     395              def cmd_AUTHENTICATE(self, tag, args):
     396                  self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
     397                                      'VzdG9uLm1jaS5uZXQ=')
     398                  r = yield
     399                  if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
     400                           b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
     401                      self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
     402                  else:
     403                      self._send_tagged(tag, 'NO', 'No access')
     404          client, _ = self._setup(AuthHandler)
     405          self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
     406          ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf")
     407          self.assertEqual(ret, "OK")
     408  
     409      @hashlib_helper.requires_hashdigest('md5', openssl=True)
     410      def test_login_cram_md5_plain_text(self):
     411          class ESC[4;38;5;81mAuthHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     412              capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
     413              def cmd_AUTHENTICATE(self, tag, args):
     414                  self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
     415                                      'VzdG9uLm1jaS5uZXQ=')
     416                  r = yield
     417                  if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
     418                           b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
     419                      self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
     420                  else:
     421                      self._send_tagged(tag, 'NO', 'No access')
     422          client, _ = self._setup(AuthHandler)
     423          self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
     424          ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf")
     425          self.assertEqual(ret, "OK")
     426  
     427      def test_aborted_authentication(self):
     428          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     429              def cmd_AUTHENTICATE(self, tag, args):
     430                  self._send_textline('+')
     431                  self.response = yield
     432                  if self.response == b'*\r\n':
     433                      self._send_tagged(
     434                          tag,
     435                          'NO',
     436                          '[AUTHENTICATIONFAILED] aborted')
     437                  else:
     438                      self._send_tagged(tag, 'OK', 'MYAUTH successful')
     439          client, _ = self._setup(MyServer)
     440          with self.assertRaisesRegex(imaplib.IMAP4.error,
     441                  r'\[AUTHENTICATIONFAILED\] aborted'):
     442              client.authenticate('MYAUTH', lambda x: None)
     443  
     444      @mock.patch('imaplib._MAXLINE', 10)
     445      def test_linetoolong(self):
     446          class ESC[4;38;5;81mTooLongHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     447              def handle(self):
     448                  # send response line longer than the limit set in the next line
     449                  self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n')
     450          _, server = self._setup(TooLongHandler, connect=False)
     451          with self.assertRaisesRegex(imaplib.IMAP4.error,
     452                  'got more than 10 bytes'):
     453              self.imap_class(*server.server_address)
     454  
     455      def test_simple_with_statement(self):
     456          _, server = self._setup(SimpleIMAPHandler, connect=False)
     457          with self.imap_class(*server.server_address):
     458              pass
     459  
     460      @requires_resource('walltime')
     461      def test_imaplib_timeout_test(self):
     462          _, server = self._setup(SimpleIMAPHandler)
     463          addr = server.server_address[1]
     464          client = self.imap_class("localhost", addr, timeout=None)
     465          self.assertEqual(client.sock.timeout, None)
     466          client.shutdown()
     467          client = self.imap_class("localhost", addr, timeout=support.LOOPBACK_TIMEOUT)
     468          self.assertEqual(client.sock.timeout, support.LOOPBACK_TIMEOUT)
     469          client.shutdown()
     470          with self.assertRaises(ValueError):
     471              client = self.imap_class("localhost", addr, timeout=0)
     472  
     473      def test_imaplib_timeout_functionality_test(self):
     474          class ESC[4;38;5;81mTimeoutHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     475              def handle(self):
     476                  time.sleep(1)
     477                  SimpleIMAPHandler.handle(self)
     478  
     479          _, server = self._setup(TimeoutHandler)
     480          addr = server.server_address[1]
     481          with self.assertRaises(TimeoutError):
     482              client = self.imap_class("localhost", addr, timeout=0.001)
     483  
     484      def test_with_statement(self):
     485          _, server = self._setup(SimpleIMAPHandler, connect=False)
     486          with self.imap_class(*server.server_address) as imap:
     487              imap.login('user', 'pass')
     488              self.assertEqual(server.logged, 'user')
     489          self.assertIsNone(server.logged)
     490  
     491      def test_with_statement_logout(self):
     492          # It is legal to log out explicitly inside the with block
     493          _, server = self._setup(SimpleIMAPHandler, connect=False)
     494          with self.imap_class(*server.server_address) as imap:
     495              imap.login('user', 'pass')
     496              self.assertEqual(server.logged, 'user')
     497              imap.logout()
     498              self.assertIsNone(server.logged)
     499          self.assertIsNone(server.logged)
     500  
     501      # command tests
     502  
     503      def test_login(self):
     504          client, _ = self._setup(SimpleIMAPHandler)
     505          typ, data = client.login('user', 'pass')
     506          self.assertEqual(typ, 'OK')
     507          self.assertEqual(data[0], b'LOGIN completed')
     508          self.assertEqual(client.state, 'AUTH')
     509  
     510      def test_logout(self):
     511          client, _ = self._setup(SimpleIMAPHandler)
     512          typ, data = client.login('user', 'pass')
     513          self.assertEqual(typ, 'OK')
     514          self.assertEqual(data[0], b'LOGIN completed')
     515          typ, data = client.logout()
     516          self.assertEqual(typ, 'BYE', (typ, data))
     517          self.assertEqual(data[0], b'IMAP4ref1 Server logging out', (typ, data))
     518          self.assertEqual(client.state, 'LOGOUT')
     519  
     520      def test_lsub(self):
     521          class ESC[4;38;5;81mLsubCmd(ESC[4;38;5;149mSimpleIMAPHandler):
     522              def cmd_LSUB(self, tag, args):
     523                  self._send_textline('* LSUB () "." directoryA')
     524                  return self._send_tagged(tag, 'OK', 'LSUB completed')
     525          client, _ = self._setup(LsubCmd)
     526          client.login('user', 'pass')
     527          typ, data = client.lsub()
     528          self.assertEqual(typ, 'OK')
     529          self.assertEqual(data[0], b'() "." directoryA')
     530  
     531      def test_unselect(self):
     532          client, _ = self._setup(SimpleIMAPHandler)
     533          client.login('user', 'pass')
     534          typ, data = client.select()
     535          self.assertEqual(typ, 'OK')
     536          self.assertEqual(data[0], b'2')
     537  
     538          typ, data = client.unselect()
     539          self.assertEqual(typ, 'OK')
     540          self.assertEqual(data[0], b'Returned to authenticated state. (Success)')
     541          self.assertEqual(client.state, 'AUTH')
     542  
     543  
     544  class ESC[4;38;5;81mNewIMAPTests(ESC[4;38;5;149mNewIMAPTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     545      imap_class = imaplib.IMAP4
     546      server_class = socketserver.TCPServer
     547  
     548  
     549  @unittest.skipUnless(ssl, "SSL not available")
     550  class ESC[4;38;5;81mNewIMAPSSLTests(ESC[4;38;5;149mNewIMAPTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     551      imap_class = IMAP4_SSL
     552      server_class = SecureTCPServer
     553  
     554      @requires_resource('walltime')
     555      def test_ssl_raises(self):
     556          ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     557          self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED)
     558          self.assertEqual(ssl_context.check_hostname, True)
     559          ssl_context.load_verify_locations(CAFILE)
     560  
     561          with self.assertRaisesRegex(ssl.CertificateError,
     562                  "IP address mismatch, certificate is not valid for "
     563                  "'127.0.0.1'"):
     564              _, server = self._setup(SimpleIMAPHandler)
     565              client = self.imap_class(*server.server_address,
     566                                       ssl_context=ssl_context)
     567              client.shutdown()
     568  
     569      @requires_resource('walltime')
     570      def test_ssl_verified(self):
     571          ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     572          ssl_context.load_verify_locations(CAFILE)
     573  
     574          _, server = self._setup(SimpleIMAPHandler)
     575          client = self.imap_class("localhost", server.server_address[1],
     576                                   ssl_context=ssl_context)
     577          client.shutdown()
     578  
     579  class ESC[4;38;5;81mThreadedNetworkedTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     580      server_class = socketserver.TCPServer
     581      imap_class = imaplib.IMAP4
     582  
     583      def make_server(self, addr, hdlr):
     584  
     585          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mserver_class):
     586              def handle_error(self, request, client_address):
     587                  self.close_request(request)
     588                  self.server_close()
     589                  raise
     590  
     591          if verbose:
     592              print("creating server")
     593          server = MyServer(addr, hdlr)
     594          self.assertEqual(server.server_address, server.socket.getsockname())
     595  
     596          if verbose:
     597              print("server created")
     598              print("ADDR =", addr)
     599              print("CLASS =", self.server_class)
     600              print("HDLR =", server.RequestHandlerClass)
     601  
     602          t = threading.Thread(
     603              name='%s serving' % self.server_class,
     604              target=server.serve_forever,
     605              # Short poll interval to make the test finish quickly.
     606              # Time between requests is short enough that we won't wake
     607              # up spuriously too many times.
     608              kwargs={'poll_interval': 0.01})
     609          t.daemon = True  # In case this function raises.
     610          t.start()
     611          if verbose:
     612              print("server running")
     613          return server, t
     614  
     615      def reap_server(self, server, thread):
     616          if verbose:
     617              print("waiting for server")
     618          server.shutdown()
     619          server.server_close()
     620          thread.join()
     621          if verbose:
     622              print("done")
     623  
     624      @contextmanager
     625      def reaped_server(self, hdlr):
     626          server, thread = self.make_server((socket_helper.HOST, 0), hdlr)
     627          try:
     628              yield server
     629          finally:
     630              self.reap_server(server, thread)
     631  
     632      @contextmanager
     633      def reaped_pair(self, hdlr):
     634          with self.reaped_server(hdlr) as server:
     635              client = self.imap_class(*server.server_address)
     636              try:
     637                  yield server, client
     638              finally:
     639                  client.logout()
     640  
     641      @threading_helper.reap_threads
     642      def test_connect(self):
     643          with self.reaped_server(SimpleIMAPHandler) as server:
     644              client = self.imap_class(*server.server_address)
     645              client.shutdown()
     646  
     647      @threading_helper.reap_threads
     648      def test_bracket_flags(self):
     649  
     650          # This violates RFC 3501, which disallows ']' characters in tag names,
     651          # but imaplib has allowed producing such tags forever, other programs
     652          # also produce them (eg: OtherInbox's Organizer app as of 20140716),
     653          # and Gmail, for example, accepts them and produces them.  So we
     654          # support them.  See issue #21815.
     655  
     656          class ESC[4;38;5;81mBracketFlagHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     657  
     658              def handle(self):
     659                  self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']
     660                  super().handle()
     661  
     662              def cmd_AUTHENTICATE(self, tag, args):
     663                  self._send_textline('+')
     664                  self.server.response = yield
     665                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     666  
     667              def cmd_SELECT(self, tag, args):
     668                  flag_msg = ' \\'.join(self.flags)
     669                  self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii'))
     670                  self._send_line(b'* 2 EXISTS')
     671                  self._send_line(b'* 0 RECENT')
     672                  msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.'
     673                          % flag_msg)
     674                  self._send_line(msg.encode('ascii'))
     675                  self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')
     676  
     677              def cmd_STORE(self, tag, args):
     678                  new_flags = args[2].strip('(').strip(')').split()
     679                  self.flags.extend(new_flags)
     680                  flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags)
     681                  msg = '* %s FETCH %s' % (args[0], flags_msg)
     682                  self._send_line(msg.encode('ascii'))
     683                  self._send_tagged(tag, 'OK', 'STORE completed.')
     684  
     685          with self.reaped_pair(BracketFlagHandler) as (server, client):
     686              code, data = client.authenticate('MYAUTH', lambda x: b'fake')
     687              self.assertEqual(code, 'OK')
     688              self.assertEqual(server.response, b'ZmFrZQ==\r\n')
     689              client.select('test')
     690              typ, [data] = client.store(b'1', "+FLAGS", "[test]")
     691              self.assertIn(b'[test]', data)
     692              client.select('test')
     693              typ, [data] = client.response('PERMANENTFLAGS')
     694              self.assertIn(b'[test]', data)
     695  
     696      @threading_helper.reap_threads
     697      def test_issue5949(self):
     698  
     699          class ESC[4;38;5;81mEOFHandler(ESC[4;38;5;149msocketserverESC[4;38;5;149m.ESC[4;38;5;149mStreamRequestHandler):
     700              def handle(self):
     701                  # EOF without sending a complete welcome message.
     702                  self.wfile.write(b'* OK')
     703  
     704          with self.reaped_server(EOFHandler) as server:
     705              self.assertRaises(imaplib.IMAP4.abort,
     706                                self.imap_class, *server.server_address)
     707  
     708      @threading_helper.reap_threads
     709      def test_line_termination(self):
     710  
     711          class ESC[4;38;5;81mBadNewlineHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     712  
     713              def cmd_CAPABILITY(self, tag, args):
     714                  self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
     715                  self._send_tagged(tag, 'OK', 'CAPABILITY completed')
     716  
     717          with self.reaped_server(BadNewlineHandler) as server:
     718              self.assertRaises(imaplib.IMAP4.abort,
     719                                self.imap_class, *server.server_address)
     720  
     721      class ESC[4;38;5;81mUTF8Server(ESC[4;38;5;149mSimpleIMAPHandler):
     722          capabilities = 'AUTH ENABLE UTF8=ACCEPT'
     723  
     724          def cmd_ENABLE(self, tag, args):
     725              self._send_tagged(tag, 'OK', 'ENABLE successful')
     726  
     727          def cmd_AUTHENTICATE(self, tag, args):
     728              self._send_textline('+')
     729              self.server.response = yield
     730              self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     731  
     732      @threading_helper.reap_threads
     733      def test_enable_raises_error_if_not_AUTH(self):
     734          with self.reaped_pair(self.UTF8Server) as (server, client):
     735              self.assertFalse(client.utf8_enabled)
     736              self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
     737              self.assertFalse(client.utf8_enabled)
     738  
     739      # XXX Also need a test that enable after SELECT raises an error.
     740  
     741      @threading_helper.reap_threads
     742      def test_enable_raises_error_if_no_capability(self):
     743          class ESC[4;38;5;81mNoEnableServer(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mUTF8Server):
     744              capabilities = 'AUTH'
     745          with self.reaped_pair(NoEnableServer) as (server, client):
     746              self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
     747  
     748      @threading_helper.reap_threads
     749      def test_enable_UTF8_raises_error_if_not_supported(self):
     750          class ESC[4;38;5;81mNonUTF8Server(ESC[4;38;5;149mSimpleIMAPHandler):
     751              pass
     752          with self.assertRaises(imaplib.IMAP4.error):
     753              with self.reaped_pair(NonUTF8Server) as (server, client):
     754                  typ, data = client.login('user', 'pass')
     755                  self.assertEqual(typ, 'OK')
     756                  client.enable('UTF8=ACCEPT')
     757  
     758      @threading_helper.reap_threads
     759      def test_enable_UTF8_True_append(self):
     760  
     761          class ESC[4;38;5;81mUTF8AppendServer(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mUTF8Server):
     762              def cmd_APPEND(self, tag, args):
     763                  self._send_textline('+')
     764                  self.server.response = yield
     765                  self._send_tagged(tag, 'OK', 'okay')
     766  
     767          with self.reaped_pair(UTF8AppendServer) as (server, client):
     768              self.assertEqual(client._encoding, 'ascii')
     769              code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
     770              self.assertEqual(code, 'OK')
     771              self.assertEqual(server.response,
     772                               b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     773              code, _ = client.enable('UTF8=ACCEPT')
     774              self.assertEqual(code, 'OK')
     775              self.assertEqual(client._encoding, 'utf-8')
     776              msg_string = 'Subject: üñí©öðé'
     777              typ, data = client.append(
     778                  None, None, None, msg_string.encode('utf-8'))
     779              self.assertEqual(typ, 'OK')
     780              self.assertEqual(
     781                  server.response,
     782                  ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')
     783              )
     784  
     785      # XXX also need a test that makes sure that the Literal and Untagged_status
     786      # regexes uses unicode in UTF8 mode instead of the default ASCII.
     787  
     788      @threading_helper.reap_threads
     789      def test_search_disallows_charset_in_utf8_mode(self):
     790          with self.reaped_pair(self.UTF8Server) as (server, client):
     791              typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
     792              self.assertEqual(typ, 'OK')
     793              typ, _ = client.enable('UTF8=ACCEPT')
     794              self.assertEqual(typ, 'OK')
     795              self.assertTrue(client.utf8_enabled)
     796              self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar')
     797  
     798      @threading_helper.reap_threads
     799      def test_bad_auth_name(self):
     800  
     801          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     802  
     803              def cmd_AUTHENTICATE(self, tag, args):
     804                  self._send_tagged(tag, 'NO', 'unrecognized authentication '
     805                                    'type {}'.format(args[0]))
     806  
     807          with self.reaped_pair(MyServer) as (server, client):
     808              with self.assertRaises(imaplib.IMAP4.error):
     809                  client.authenticate('METHOD', lambda: 1)
     810  
     811      @threading_helper.reap_threads
     812      def test_invalid_authentication(self):
     813  
     814          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     815  
     816              def cmd_AUTHENTICATE(self, tag, args):
     817                  self._send_textline('+')
     818                  self.response = yield
     819                  self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
     820  
     821          with self.reaped_pair(MyServer) as (server, client):
     822              with self.assertRaises(imaplib.IMAP4.error):
     823                  code, data = client.authenticate('MYAUTH', lambda x: b'fake')
     824  
     825      @threading_helper.reap_threads
     826      def test_valid_authentication(self):
     827  
     828          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     829  
     830              def cmd_AUTHENTICATE(self, tag, args):
     831                  self._send_textline('+')
     832                  self.server.response = yield
     833                  self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
     834  
     835          with self.reaped_pair(MyServer) as (server, client):
     836              code, data = client.authenticate('MYAUTH', lambda x: b'fake')
     837              self.assertEqual(code, 'OK')
     838              self.assertEqual(server.response,
     839                               b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     840  
     841          with self.reaped_pair(MyServer) as (server, client):
     842              code, data = client.authenticate('MYAUTH', lambda x: 'fake')
     843              self.assertEqual(code, 'OK')
     844              self.assertEqual(server.response,
     845                               b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
     846  
     847      @threading_helper.reap_threads
     848      @hashlib_helper.requires_hashdigest('md5', openssl=True)
     849      def test_login_cram_md5(self):
     850  
     851          class ESC[4;38;5;81mAuthHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     852  
     853              capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
     854  
     855              def cmd_AUTHENTICATE(self, tag, args):
     856                  self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
     857                                      'VzdG9uLm1jaS5uZXQ=')
     858                  r = yield
     859                  if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
     860                           b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
     861                      self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
     862                  else:
     863                      self._send_tagged(tag, 'NO', 'No access')
     864  
     865          with self.reaped_pair(AuthHandler) as (server, client):
     866              self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
     867              ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
     868              self.assertEqual(ret, "OK")
     869  
     870          with self.reaped_pair(AuthHandler) as (server, client):
     871              self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
     872              ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
     873              self.assertEqual(ret, "OK")
     874  
     875  
     876      @threading_helper.reap_threads
     877      def test_aborted_authentication(self):
     878  
     879          class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
     880  
     881              def cmd_AUTHENTICATE(self, tag, args):
     882                  self._send_textline('+')
     883                  self.response = yield
     884  
     885                  if self.response == b'*\r\n':
     886                      self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted')
     887                  else:
     888                      self._send_tagged(tag, 'OK', 'MYAUTH successful')
     889  
     890          with self.reaped_pair(MyServer) as (server, client):
     891              with self.assertRaises(imaplib.IMAP4.error):
     892                  code, data = client.authenticate('MYAUTH', lambda x: None)
     893  
     894  
     895      def test_linetoolong(self):
     896          class ESC[4;38;5;81mTooLongHandler(ESC[4;38;5;149mSimpleIMAPHandler):
     897              def handle(self):
     898                  # Send a very long response line
     899                  self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n')
     900  
     901          with self.reaped_server(TooLongHandler) as server:
     902              self.assertRaises(imaplib.IMAP4.error,
     903                                self.imap_class, *server.server_address)
     904  
     905      @threading_helper.reap_threads
     906      def test_simple_with_statement(self):
     907          # simplest call
     908          with self.reaped_server(SimpleIMAPHandler) as server:
     909              with self.imap_class(*server.server_address):
     910                  pass
     911  
     912      @threading_helper.reap_threads
     913      def test_with_statement(self):
     914          with self.reaped_server(SimpleIMAPHandler) as server:
     915              with self.imap_class(*server.server_address) as imap:
     916                  imap.login('user', 'pass')
     917                  self.assertEqual(server.logged, 'user')
     918              self.assertIsNone(server.logged)
     919  
     920      @threading_helper.reap_threads
     921      def test_with_statement_logout(self):
     922          # what happens if already logout in the block?
     923          with self.reaped_server(SimpleIMAPHandler) as server:
     924              with self.imap_class(*server.server_address) as imap:
     925                  imap.login('user', 'pass')
     926                  self.assertEqual(server.logged, 'user')
     927                  imap.logout()
     928                  self.assertIsNone(server.logged)
     929              self.assertIsNone(server.logged)
     930  
     931      @threading_helper.reap_threads
     932      @cpython_only
     933      @unittest.skipUnless(__debug__, "Won't work if __debug__ is False")
     934      def test_dump_ur(self):
     935          # See: http://bugs.python.org/issue26543
     936          untagged_resp_dict = {'READ-WRITE': [b'']}
     937  
     938          with self.reaped_server(SimpleIMAPHandler) as server:
     939              with self.imap_class(*server.server_address) as imap:
     940                  with mock.patch.object(imap, '_mesg') as mock_mesg:
     941                      imap._dump_ur(untagged_resp_dict)
     942                      mock_mesg.assert_called_with(
     943                          "untagged responses dump:READ-WRITE: [b'']"
     944                      )
     945  
     946  
     947  @unittest.skipUnless(ssl, "SSL not available")
     948  class ESC[4;38;5;81mThreadedNetworkedTestsSSL(ESC[4;38;5;149mThreadedNetworkedTests):
     949      server_class = SecureTCPServer
     950      imap_class = IMAP4_SSL
     951  
     952      @threading_helper.reap_threads
     953      def test_ssl_verified(self):
     954          ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     955          ssl_context.load_verify_locations(CAFILE)
     956  
     957          with self.assertRaisesRegex(
     958                  ssl.CertificateError,
     959                  "IP address mismatch, certificate is not valid for "
     960                  "'127.0.0.1'"):
     961              with self.reaped_server(SimpleIMAPHandler) as server:
     962                  client = self.imap_class(*server.server_address,
     963                                           ssl_context=ssl_context)
     964                  client.shutdown()
     965  
     966          with self.reaped_server(SimpleIMAPHandler) as server:
     967              client = self.imap_class("localhost", server.server_address[1],
     968                                       ssl_context=ssl_context)
     969              client.shutdown()
     970  
     971  
     972  @unittest.skipUnless(
     973      support.is_resource_enabled('network'), 'network resource disabled')
     974  @unittest.skip('cyrus.andrew.cmu.edu blocks connections')
     975  class ESC[4;38;5;81mRemoteIMAPTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     976      host = 'cyrus.andrew.cmu.edu'
     977      port = 143
     978      username = 'anonymous'
     979      password = 'pass'
     980      imap_class = imaplib.IMAP4
     981  
     982      def setUp(self):
     983          with socket_helper.transient_internet(self.host):
     984              self.server = self.imap_class(self.host, self.port)
     985  
     986      def tearDown(self):
     987          if self.server is not None:
     988              with socket_helper.transient_internet(self.host):
     989                  self.server.logout()
     990  
     991      def test_logincapa(self):
     992          with socket_helper.transient_internet(self.host):
     993              for cap in self.server.capabilities:
     994                  self.assertIsInstance(cap, str)
     995              self.assertIn('LOGINDISABLED', self.server.capabilities)
     996              self.assertIn('AUTH=ANONYMOUS', self.server.capabilities)
     997              rs = self.server.login(self.username, self.password)
     998              self.assertEqual(rs[0], 'OK')
     999  
    1000      def test_logout(self):
    1001          with socket_helper.transient_internet(self.host):
    1002              rs = self.server.logout()
    1003              self.server = None
    1004              self.assertEqual(rs[0], 'BYE', rs)
    1005  
    1006  
    1007  @unittest.skipUnless(ssl, "SSL not available")
    1008  @unittest.skipUnless(
    1009      support.is_resource_enabled('network'), 'network resource disabled')
    1010  @unittest.skip('cyrus.andrew.cmu.edu blocks connections')
    1011  class ESC[4;38;5;81mRemoteIMAP_STARTTLSTest(ESC[4;38;5;149mRemoteIMAPTest):
    1012  
    1013      def setUp(self):
    1014          super().setUp()
    1015          with socket_helper.transient_internet(self.host):
    1016              rs = self.server.starttls()
    1017              self.assertEqual(rs[0], 'OK')
    1018  
    1019      def test_logincapa(self):
    1020          for cap in self.server.capabilities:
    1021              self.assertIsInstance(cap, str)
    1022          self.assertNotIn('LOGINDISABLED', self.server.capabilities)
    1023  
    1024  
    1025  @unittest.skipUnless(ssl, "SSL not available")
    1026  @unittest.skip('cyrus.andrew.cmu.edu blocks connections')
    1027  class ESC[4;38;5;81mRemoteIMAP_SSLTest(ESC[4;38;5;149mRemoteIMAPTest):
    1028      port = 993
    1029      imap_class = IMAP4_SSL
    1030  
    1031      def setUp(self):
    1032          pass
    1033  
    1034      def tearDown(self):
    1035          pass
    1036  
    1037      def create_ssl_context(self):
    1038          ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1039          ssl_context.check_hostname = False
    1040          ssl_context.verify_mode = ssl.CERT_NONE
    1041          ssl_context.load_cert_chain(CERTFILE)
    1042          return ssl_context
    1043  
    1044      def check_logincapa(self, server):
    1045          try:
    1046              for cap in server.capabilities:
    1047                  self.assertIsInstance(cap, str)
    1048              self.assertNotIn('LOGINDISABLED', server.capabilities)
    1049              self.assertIn('AUTH=PLAIN', server.capabilities)
    1050              rs = server.login(self.username, self.password)
    1051              self.assertEqual(rs[0], 'OK')
    1052          finally:
    1053              server.logout()
    1054  
    1055      def test_logincapa(self):
    1056          with socket_helper.transient_internet(self.host):
    1057              _server = self.imap_class(self.host, self.port)
    1058              self.check_logincapa(_server)
    1059  
    1060      def test_logout(self):
    1061          with socket_helper.transient_internet(self.host):
    1062              _server = self.imap_class(self.host, self.port)
    1063              rs = _server.logout()
    1064              self.assertEqual(rs[0], 'BYE', rs)
    1065  
    1066  
    1067  if __name__ == "__main__":
    1068      unittest.main()