python (3.12.0)
1 from test import support
2 from test.support import socket_helper
3
4 from contextlib import contextmanager
5 import imaplib
6 import os.path
7 import socketserver
8 import time
9 import calendar
10 import threading
11 import socket
12
13 from test.support import verbose, run_with_tz, run_with_locale, cpython_only, requires_resource
14 from test.support import hashlib_helper
15 from test.support import threading_helper
16 import unittest
17 from unittest import mock
18 from datetime import datetime, timezone, timedelta
19 try:
20 import ssl
21 except ImportError:
22 ssl = None
23
24 support.requires_working_socket(module=True)
25
26 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
27 CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")
28
29
30 class ESC[4;38;5;81mTestImaplib(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
31
32 def test_Internaldate2tuple(self):
33 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
34 tt = imaplib.Internaldate2tuple(
35 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
36 self.assertEqual(time.mktime(tt), t0)
37 tt = imaplib.Internaldate2tuple(
38 b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")')
39 self.assertEqual(time.mktime(tt), t0)
40 tt = imaplib.Internaldate2tuple(
41 b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")')
42 self.assertEqual(time.mktime(tt), t0)
43
44 @run_with_tz('MST+07MDT,M4.1.0,M10.5.0')
45 def test_Internaldate2tuple_issue10941(self):
46 self.assertNotEqual(imaplib.Internaldate2tuple(
47 b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'),
48 imaplib.Internaldate2tuple(
49 b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")'))
50
51 def timevalues(self):
52 return [2000000000, 2000000000.0, time.localtime(2000000000),
53 (2033, 5, 18, 5, 33, 20, -1, -1, -1),
54 (2033, 5, 18, 5, 33, 20, -1, -1, 1),
55 datetime.fromtimestamp(2000000000,
56 timezone(timedelta(0, 2 * 60 * 60))),
57 '"18-May-2033 05:33:20 +0200"']
58
59 @run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
60 # DST rules included to work around quirk where the Gnu C library may not
61 # otherwise restore the previous time zone
62 @run_with_tz('STD-1DST,M3.2.0,M11.1.0')
63 def test_Time2Internaldate(self):
64 expected = '"18-May-2033 05:33:20 +0200"'
65
66 for t in self.timevalues():
67 internal = imaplib.Time2Internaldate(t)
68 self.assertEqual(internal, expected)
69
70 def test_that_Time2Internaldate_returns_a_result(self):
71 # Without tzset, we can check only that it successfully
72 # produces a result, not the correctness of the result itself,
73 # since the result depends on the timezone the machine is in.
74 for t in self.timevalues():
75 imaplib.Time2Internaldate(t)
76
77 @socket_helper.skip_if_tcp_blackhole
78 def test_imap4_host_default_value(self):
79 # Check whether the IMAP4_PORT is truly unavailable.
80 with socket.socket() as s:
81 try:
82 s.connect(('', imaplib.IMAP4_PORT))
83 self.skipTest(
84 "Cannot run the test with local IMAP server running.")
85 except socket.error:
86 pass
87
88 # This is the exception that should be raised.
89 expected_errnos = socket_helper.get_socket_conn_refused_errs()
90 with self.assertRaises(OSError) as cm:
91 imaplib.IMAP4()
92 self.assertIn(cm.exception.errno, expected_errnos)
93
94
if not ssl:

    # No ssl module: provide placeholders so later class bodies that name
    # SecureTCPServer / IMAP4_SSL can still be defined.
    class SecureTCPServer:
        pass

    IMAP4_SSL = None

else:

    class SecureTCPServer(socketserver.TCPServer):
        """TCPServer whose accepted connections are wrapped in TLS."""

        def get_request(self):
            raw_sock, peer_addr = self.socket.accept()
            tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            tls_context.load_cert_chain(CERTFILE)
            tls_sock = tls_context.wrap_socket(raw_sock, server_side=True)
            return tls_sock, peer_addr

    IMAP4_SSL = imaplib.IMAP4_SSL
113
114
class SimpleIMAPHandler(socketserver.StreamRequestHandler):
    """Scripted IMAP request handler used as the server side of the tests.

    Each received command ``<tag> <CMD> <args...>`` is dispatched to a
    ``cmd_<CMD>`` method.  A ``cmd_*`` method may be a generator: it is
    advanced once, and the next received line is sent into it instead of
    being parsed as a command.
    """
    timeout = support.LOOPBACK_TIMEOUT
    continuation = None
    capabilities = ''

    def setup(self):
        super().setup()
        # Per-connection state is stored on the server object so the test
        # body can inspect it.
        self.server.is_selected = False
        self.server.logged = None

    def _send(self, message):
        if verbose:
            print("SENT: %r" % message.strip())
        self.wfile.write(message)

    def _send_line(self, message):
        self._send(message + b'\r\n')

    def _send_textline(self, message):
        self._send_line(message.encode('ASCII'))

    def _send_tagged(self, tag, code, message):
        self._send_textline(' '.join((tag, code, message)))

    def _read_line(self):
        """Return the next CRLF-terminated line, or None when input ends."""
        # Accumulate read(1) because it's simpler to handle the differences
        # between naked sockets and SSL sockets.
        received = b''
        while not received.endswith(b'\r\n'):
            try:
                chunk = self.rfile.read(1)
            except OSError:
                # SSLSockets raise exceptions at end of input...
                return None
            if chunk == b'':
                # ...while naked sockets return empty strings.
                return None
            received += chunk
        return received

    def handle(self):
        # Send a welcome message.
        self._send_textline('* OK IMAP4rev1')
        while True:
            line = self._read_line()
            if line is None:
                return

            if verbose:
                print('GOT: %r' % line.strip())

            if self.continuation:
                # A generator command is waiting for this line.
                try:
                    self.continuation.send(line)
                except StopIteration:
                    self.continuation = None
                continue

            words = line.decode('ASCII').split()
            tag, cmd, args = words[0], words[1], words[2:]

            method_name = 'cmd_' + cmd
            if not hasattr(self, method_name):
                self._send_tagged(tag, 'BAD', cmd + ' unknown')
                continue

            pending = getattr(self, method_name)(tag, args)
            if pending:
                self.continuation = pending
                next(pending)

    def cmd_CAPABILITY(self, tag, args):
        caps = 'IMAP4rev1'
        if self.capabilities:
            caps = caps + ' ' + self.capabilities
        self._send_textline('* CAPABILITY ' + caps)
        self._send_tagged(tag, 'OK', 'CAPABILITY completed')

    def cmd_LOGOUT(self, tag, args):
        self.server.logged = None
        self._send_textline('* BYE IMAP4ref1 Server logging out')
        self._send_tagged(tag, 'OK', 'LOGOUT completed')

    def cmd_LOGIN(self, tag, args):
        self.server.logged = args[0]
        self._send_tagged(tag, 'OK', 'LOGIN completed')

    def cmd_SELECT(self, tag, args):
        self.server.is_selected = True
        self._send_line(b'* 2 EXISTS')
        self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')

    def cmd_UNSELECT(self, tag, args):
        if not self.server.is_selected:
            self._send_tagged(tag, 'BAD', 'No mailbox selected')
            return
        self.server.is_selected = False
        self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)')
208
209
210 class ESC[4;38;5;81mNewIMAPTestsMixin():
211 client = None
212
213 def _setup(self, imap_handler, connect=True):
214 """
215 Sets up imap_handler for tests. imap_handler should inherit from either:
216 - SimpleIMAPHandler - for testing IMAP commands,
217 - socketserver.StreamRequestHandler - if raw access to stream is needed.
218 Returns (client, server).
219 """
220 class ESC[4;38;5;81mTestTCPServer(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mserver_class):
221 def handle_error(self, request, client_address):
222 """
223 End request and raise the error if one occurs.
224 """
225 self.close_request(request)
226 self.server_close()
227 raise
228
229 self.addCleanup(self._cleanup)
230 self.server = self.server_class((socket_helper.HOST, 0), imap_handler)
231 self.thread = threading.Thread(
232 name=self._testMethodName+'-server',
233 target=self.server.serve_forever,
234 # Short poll interval to make the test finish quickly.
235 # Time between requests is short enough that we won't wake
236 # up spuriously too many times.
237 kwargs={'poll_interval': 0.01})
238 self.thread.daemon = True # In case this function raises.
239 self.thread.start()
240
241 if connect:
242 self.client = self.imap_class(*self.server.server_address)
243
244 return self.client, self.server
245
246 def _cleanup(self):
247 """
248 Cleans up the test server. This method should not be called manually,
249 it is added to the cleanup queue in the _setup method already.
250 """
251 # if logout was called already we'd raise an exception trying to
252 # shutdown the client once again
253 if self.client is not None and self.client.state != 'LOGOUT':
254 self.client.shutdown()
255 # cleanup the server
256 self.server.shutdown()
257 self.server.server_close()
258 threading_helper.join_thread(self.thread)
259 # Explicitly clear the attribute to prevent dangling thread
260 self.thread = None
261
262 def test_EOF_without_complete_welcome_message(self):
263 # http://bugs.python.org/issue5949
264 class ESC[4;38;5;81mEOFHandler(ESC[4;38;5;149msocketserverESC[4;38;5;149m.ESC[4;38;5;149mStreamRequestHandler):
265 def handle(self):
266 self.wfile.write(b'* OK')
267 _, server = self._setup(EOFHandler, connect=False)
268 self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
269 *server.server_address)
270
271 def test_line_termination(self):
272 class ESC[4;38;5;81mBadNewlineHandler(ESC[4;38;5;149mSimpleIMAPHandler):
273 def cmd_CAPABILITY(self, tag, args):
274 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
275 self._send_tagged(tag, 'OK', 'CAPABILITY completed')
276 _, server = self._setup(BadNewlineHandler, connect=False)
277 self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
278 *server.server_address)
279
280 def test_enable_raises_error_if_not_AUTH(self):
281 class ESC[4;38;5;81mEnableHandler(ESC[4;38;5;149mSimpleIMAPHandler):
282 capabilities = 'AUTH ENABLE UTF8=ACCEPT'
283 client, _ = self._setup(EnableHandler)
284 self.assertFalse(client.utf8_enabled)
285 with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'):
286 client.enable('foo')
287 self.assertFalse(client.utf8_enabled)
288
289 def test_enable_raises_error_if_no_capability(self):
290 client, _ = self._setup(SimpleIMAPHandler)
291 with self.assertRaisesRegex(imaplib.IMAP4.error,
292 'does not support ENABLE'):
293 client.enable('foo')
294
295 def test_enable_UTF8_raises_error_if_not_supported(self):
296 client, _ = self._setup(SimpleIMAPHandler)
297 typ, data = client.login('user', 'pass')
298 self.assertEqual(typ, 'OK')
299 with self.assertRaisesRegex(imaplib.IMAP4.error,
300 'does not support ENABLE'):
301 client.enable('UTF8=ACCEPT')
302
303 def test_enable_UTF8_True_append(self):
304 class ESC[4;38;5;81mUTF8AppendServer(ESC[4;38;5;149mSimpleIMAPHandler):
305 capabilities = 'ENABLE UTF8=ACCEPT'
306 def cmd_ENABLE(self, tag, args):
307 self._send_tagged(tag, 'OK', 'ENABLE successful')
308 def cmd_AUTHENTICATE(self, tag, args):
309 self._send_textline('+')
310 self.server.response = yield
311 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
312 def cmd_APPEND(self, tag, args):
313 self._send_textline('+')
314 self.server.response = yield
315 self._send_tagged(tag, 'OK', 'okay')
316 client, server = self._setup(UTF8AppendServer)
317 self.assertEqual(client._encoding, 'ascii')
318 code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
319 self.assertEqual(code, 'OK')
320 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake'
321 code, _ = client.enable('UTF8=ACCEPT')
322 self.assertEqual(code, 'OK')
323 self.assertEqual(client._encoding, 'utf-8')
324 msg_string = 'Subject: üñéöðé'
325 typ, data = client.append(None, None, None, msg_string.encode('utf-8'))
326 self.assertEqual(typ, 'OK')
327 self.assertEqual(server.response,
328 ('UTF8 (%s)\r\n' % msg_string).encode('utf-8'))
329
330 def test_search_disallows_charset_in_utf8_mode(self):
331 class ESC[4;38;5;81mUTF8Server(ESC[4;38;5;149mSimpleIMAPHandler):
332 capabilities = 'AUTH ENABLE UTF8=ACCEPT'
333 def cmd_ENABLE(self, tag, args):
334 self._send_tagged(tag, 'OK', 'ENABLE successful')
335 def cmd_AUTHENTICATE(self, tag, args):
336 self._send_textline('+')
337 self.server.response = yield
338 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
339 client, _ = self._setup(UTF8Server)
340 typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
341 self.assertEqual(typ, 'OK')
342 typ, _ = client.enable('UTF8=ACCEPT')
343 self.assertEqual(typ, 'OK')
344 self.assertTrue(client.utf8_enabled)
345 with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'):
346 client.search('foo', 'bar')
347
348 def test_bad_auth_name(self):
349 class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
350 def cmd_AUTHENTICATE(self, tag, args):
351 self._send_tagged(tag, 'NO',
352 'unrecognized authentication type {}'.format(args[0]))
353 client, _ = self._setup(MyServer)
354 with self.assertRaisesRegex(imaplib.IMAP4.error,
355 'unrecognized authentication type METHOD'):
356 client.authenticate('METHOD', lambda: 1)
357
358 def test_invalid_authentication(self):
359 class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
360 def cmd_AUTHENTICATE(self, tag, args):
361 self._send_textline('+')
362 self.response = yield
363 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
364 client, _ = self._setup(MyServer)
365 with self.assertRaisesRegex(imaplib.IMAP4.error,
366 r'\[AUTHENTICATIONFAILED\] invalid'):
367 client.authenticate('MYAUTH', lambda x: b'fake')
368
369 def test_valid_authentication_bytes(self):
370 class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
371 def cmd_AUTHENTICATE(self, tag, args):
372 self._send_textline('+')
373 self.server.response = yield
374 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
375 client, server = self._setup(MyServer)
376 code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
377 self.assertEqual(code, 'OK')
378 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake'
379
380 def test_valid_authentication_plain_text(self):
381 class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
382 def cmd_AUTHENTICATE(self, tag, args):
383 self._send_textline('+')
384 self.server.response = yield
385 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
386 client, server = self._setup(MyServer)
387 code, _ = client.authenticate('MYAUTH', lambda x: 'fake')
388 self.assertEqual(code, 'OK')
389 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake'
390
391 @hashlib_helper.requires_hashdigest('md5', openssl=True)
392 def test_login_cram_md5_bytes(self):
393 class ESC[4;38;5;81mAuthHandler(ESC[4;38;5;149mSimpleIMAPHandler):
394 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
395 def cmd_AUTHENTICATE(self, tag, args):
396 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
397 'VzdG9uLm1jaS5uZXQ=')
398 r = yield
399 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
400 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
401 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
402 else:
403 self._send_tagged(tag, 'NO', 'No access')
404 client, _ = self._setup(AuthHandler)
405 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
406 ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf")
407 self.assertEqual(ret, "OK")
408
409 @hashlib_helper.requires_hashdigest('md5', openssl=True)
410 def test_login_cram_md5_plain_text(self):
411 class ESC[4;38;5;81mAuthHandler(ESC[4;38;5;149mSimpleIMAPHandler):
412 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
413 def cmd_AUTHENTICATE(self, tag, args):
414 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
415 'VzdG9uLm1jaS5uZXQ=')
416 r = yield
417 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
418 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
419 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
420 else:
421 self._send_tagged(tag, 'NO', 'No access')
422 client, _ = self._setup(AuthHandler)
423 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
424 ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf")
425 self.assertEqual(ret, "OK")
426
427 def test_aborted_authentication(self):
428 class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
429 def cmd_AUTHENTICATE(self, tag, args):
430 self._send_textline('+')
431 self.response = yield
432 if self.response == b'*\r\n':
433 self._send_tagged(
434 tag,
435 'NO',
436 '[AUTHENTICATIONFAILED] aborted')
437 else:
438 self._send_tagged(tag, 'OK', 'MYAUTH successful')
439 client, _ = self._setup(MyServer)
440 with self.assertRaisesRegex(imaplib.IMAP4.error,
441 r'\[AUTHENTICATIONFAILED\] aborted'):
442 client.authenticate('MYAUTH', lambda x: None)
443
444 @mock.patch('imaplib._MAXLINE', 10)
445 def test_linetoolong(self):
446 class ESC[4;38;5;81mTooLongHandler(ESC[4;38;5;149mSimpleIMAPHandler):
447 def handle(self):
448 # send response line longer than the limit set in the next line
449 self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n')
450 _, server = self._setup(TooLongHandler, connect=False)
451 with self.assertRaisesRegex(imaplib.IMAP4.error,
452 'got more than 10 bytes'):
453 self.imap_class(*server.server_address)
454
455 def test_simple_with_statement(self):
456 _, server = self._setup(SimpleIMAPHandler, connect=False)
457 with self.imap_class(*server.server_address):
458 pass
459
460 @requires_resource('walltime')
461 def test_imaplib_timeout_test(self):
462 _, server = self._setup(SimpleIMAPHandler)
463 addr = server.server_address[1]
464 client = self.imap_class("localhost", addr, timeout=None)
465 self.assertEqual(client.sock.timeout, None)
466 client.shutdown()
467 client = self.imap_class("localhost", addr, timeout=support.LOOPBACK_TIMEOUT)
468 self.assertEqual(client.sock.timeout, support.LOOPBACK_TIMEOUT)
469 client.shutdown()
470 with self.assertRaises(ValueError):
471 client = self.imap_class("localhost", addr, timeout=0)
472
473 def test_imaplib_timeout_functionality_test(self):
474 class ESC[4;38;5;81mTimeoutHandler(ESC[4;38;5;149mSimpleIMAPHandler):
475 def handle(self):
476 time.sleep(1)
477 SimpleIMAPHandler.handle(self)
478
479 _, server = self._setup(TimeoutHandler)
480 addr = server.server_address[1]
481 with self.assertRaises(TimeoutError):
482 client = self.imap_class("localhost", addr, timeout=0.001)
483
484 def test_with_statement(self):
485 _, server = self._setup(SimpleIMAPHandler, connect=False)
486 with self.imap_class(*server.server_address) as imap:
487 imap.login('user', 'pass')
488 self.assertEqual(server.logged, 'user')
489 self.assertIsNone(server.logged)
490
491 def test_with_statement_logout(self):
492 # It is legal to log out explicitly inside the with block
493 _, server = self._setup(SimpleIMAPHandler, connect=False)
494 with self.imap_class(*server.server_address) as imap:
495 imap.login('user', 'pass')
496 self.assertEqual(server.logged, 'user')
497 imap.logout()
498 self.assertIsNone(server.logged)
499 self.assertIsNone(server.logged)
500
501 # command tests
502
503 def test_login(self):
504 client, _ = self._setup(SimpleIMAPHandler)
505 typ, data = client.login('user', 'pass')
506 self.assertEqual(typ, 'OK')
507 self.assertEqual(data[0], b'LOGIN completed')
508 self.assertEqual(client.state, 'AUTH')
509
510 def test_logout(self):
511 client, _ = self._setup(SimpleIMAPHandler)
512 typ, data = client.login('user', 'pass')
513 self.assertEqual(typ, 'OK')
514 self.assertEqual(data[0], b'LOGIN completed')
515 typ, data = client.logout()
516 self.assertEqual(typ, 'BYE', (typ, data))
517 self.assertEqual(data[0], b'IMAP4ref1 Server logging out', (typ, data))
518 self.assertEqual(client.state, 'LOGOUT')
519
520 def test_lsub(self):
521 class ESC[4;38;5;81mLsubCmd(ESC[4;38;5;149mSimpleIMAPHandler):
522 def cmd_LSUB(self, tag, args):
523 self._send_textline('* LSUB () "." directoryA')
524 return self._send_tagged(tag, 'OK', 'LSUB completed')
525 client, _ = self._setup(LsubCmd)
526 client.login('user', 'pass')
527 typ, data = client.lsub()
528 self.assertEqual(typ, 'OK')
529 self.assertEqual(data[0], b'() "." directoryA')
530
531 def test_unselect(self):
532 client, _ = self._setup(SimpleIMAPHandler)
533 client.login('user', 'pass')
534 typ, data = client.select()
535 self.assertEqual(typ, 'OK')
536 self.assertEqual(data[0], b'2')
537
538 typ, data = client.unselect()
539 self.assertEqual(typ, 'OK')
540 self.assertEqual(data[0], b'Returned to authenticated state. (Success)')
541 self.assertEqual(client.state, 'AUTH')
542
543
class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase):
    """Run the shared mixin tests with imaplib.IMAP4 over a plain TCPServer."""
    server_class = socketserver.TCPServer
    imap_class = imaplib.IMAP4
547
548
@unittest.skipUnless(ssl, "SSL not available")
class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase):
    """Run the shared mixin tests over TLS, plus certificate checks."""
    server_class = SecureTCPServer
    imap_class = IMAP4_SSL

    @requires_resource('walltime')
    def test_ssl_raises(self):
        # Connecting by IP address with a verifying context must fail the
        # hostname check.
        verifying_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(verifying_ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(verifying_ctx.check_hostname, True)
        verifying_ctx.load_verify_locations(CAFILE)

        expected_message = ("IP address mismatch, certificate is not valid for "
                            "'127.0.0.1'")
        with self.assertRaisesRegex(ssl.CertificateError, expected_message):
            _, server = self._setup(SimpleIMAPHandler)
            client = self.imap_class(*server.server_address,
                                     ssl_context=verifying_ctx)
            client.shutdown()

    @requires_resource('walltime')
    def test_ssl_verified(self):
        # Connecting as "localhost" with the CA loaded must succeed.
        verifying_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        verifying_ctx.load_verify_locations(CAFILE)

        _, server = self._setup(SimpleIMAPHandler)
        port = server.server_address[1]
        client = self.imap_class("localhost", port, ssl_context=verifying_ctx)
        client.shutdown()
578
579 class ESC[4;38;5;81mThreadedNetworkedTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
580 server_class = socketserver.TCPServer
581 imap_class = imaplib.IMAP4
582
def make_server(self, addr, hdlr):
    """Create a server for *hdlr* bound to *addr* and serve it in a thread.

    Returns the ``(server, thread)`` pair; the thread is a daemon thread
    running ``serve_forever``.
    """

    class MyServer(self.server_class):
        def handle_error(self, request, client_address):
            # Close everything down and re-raise the active exception.
            self.close_request(request)
            self.server_close()
            raise

    if verbose:
        print("creating server")
    server = MyServer(addr, hdlr)
    self.assertEqual(server.server_address, server.socket.getsockname())

    if verbose:
        print("server created")
        details = (
            ("ADDR =", addr),
            ("CLASS =", self.server_class),
            ("HDLR =", server.RequestHandlerClass),
        )
        for label, value in details:
            print(label, value)

    # Short poll interval to make the test finish quickly.
    # Time between requests is short enough that we won't wake
    # up spuriously too many times.
    serve_kwargs = {'poll_interval': 0.01}
    worker = threading.Thread(
        name='%s serving' % self.server_class,
        target=server.serve_forever,
        kwargs=serve_kwargs,
        daemon=True)  # In case this function raises.
    worker.start()
    if verbose:
        print("server running")
    return server, worker
614
615 def reap_server(self, server, thread):
616 if verbose:
617 print("waiting for server")
618 server.shutdown()
619 server.server_close()
620 thread.join()
621 if verbose:
622 print("done")
623
@contextmanager
def reaped_server(self, hdlr):
    """Context manager yielding a running server for *hdlr*.

    The server is bound to an ephemeral port on socket_helper.HOST and is
    reaped when the block exits, whether or not it raised.
    """
    bind_address = (socket_helper.HOST, 0)
    running = self.make_server(bind_address, hdlr)
    try:
        yield running[0]
    finally:
        self.reap_server(*running)
631
@contextmanager
def reaped_pair(self, hdlr):
    """Context manager yielding ``(server, client)`` for *hdlr*.

    The client is connected to the server on entry and logged out on exit,
    before the server itself is reaped.
    """
    with self.reaped_server(hdlr) as srv:
        imap = self.imap_class(*srv.server_address)
        try:
            yield srv, imap
        finally:
            imap.logout()
640
@threading_helper.reap_threads
def test_connect(self):
    """A client can connect to the test server and shut down again."""
    with self.reaped_server(SimpleIMAPHandler) as srv:
        connection = self.imap_class(*srv.server_address)
        connection.shutdown()
646
@threading_helper.reap_threads
def test_bracket_flags(self):

    # This violates RFC 3501, which disallows ']' characters in tag names,
    # but imaplib has allowed producing such tags forever, other programs
    # also produce them (eg: OtherInbox's Organizer app as of 20140716),
    # and Gmail, for example, accepts them and produces them.  So we
    # support them.  See issue #21815.

    class BracketFlagHandler(SimpleIMAPHandler):
        """Handler that remembers stored flags and echoes them on SELECT."""

        def handle(self):
            self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']
            super().handle()

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            self.server.response = yield
            self._send_tagged(tag, 'OK', 'FAKEAUTH successful')

        def cmd_SELECT(self, tag, args):
            joined = ' \\'.join(self.flags)
            flags_line = ('* FLAGS (%s)' % joined).encode('ascii')
            permanent_line = (
                '* OK [PERMANENTFLAGS %s \\*)] Flags permitted.' % joined
            ).encode('ascii')
            for raw in (flags_line, b'* 2 EXISTS', b'* 0 RECENT',
                        permanent_line):
                self._send_line(raw)
            self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')

        def cmd_STORE(self, tag, args):
            added = args[2].strip('(').strip(')').split()
            self.flags.extend(added)
            flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags)
            fetch_line = '* %s FETCH %s' % (args[0], flags_msg)
            self._send_line(fetch_line.encode('ascii'))
            self._send_tagged(tag, 'OK', 'STORE completed.')

    with self.reaped_pair(BracketFlagHandler) as (server, client):
        status, _ = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(status, 'OK')
        self.assertEqual(server.response, b'ZmFrZQ==\r\n')
        client.select('test')
        _, [stored] = client.store(b'1', "+FLAGS", "[test]")
        self.assertIn(b'[test]', stored)
        client.select('test')
        _, [permanent] = client.response('PERMANENTFLAGS')
        self.assertIn(b'[test]', permanent)
695
@threading_helper.reap_threads
def test_issue5949(self):
    """The constructor aborts when the greeting ends without a CRLF."""

    class EOFHandler(socketserver.StreamRequestHandler):
        def handle(self):
            # EOF without sending a complete welcome message.
            self.wfile.write(b'* OK')

    with self.reaped_server(EOFHandler) as srv:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*srv.server_address)
707
@threading_helper.reap_threads
def test_line_termination(self):
    """The constructor aborts on a CAPABILITY line ending in a bare LF."""

    class BadNewlineHandler(SimpleIMAPHandler):

        def cmd_CAPABILITY(self, tag, args):
            self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
            self._send_tagged(tag, 'OK', 'CAPABILITY completed')

    with self.reaped_server(BadNewlineHandler) as srv:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*srv.server_address)
720
class UTF8Server(SimpleIMAPHandler):
    """Handler advertising ENABLE and UTF8=ACCEPT, with a fake AUTHENTICATE."""
    capabilities = 'AUTH ENABLE UTF8=ACCEPT'

    def cmd_ENABLE(self, tag, args):
        self._send_tagged(tag, 'OK', 'ENABLE successful')

    def cmd_AUTHENTICATE(self, tag, args):
        # Ask for a continuation line and record it on the server for the
        # test to inspect.
        self._send_textline('+')
        self.server.response = yield
        self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
731
@threading_helper.reap_threads
def test_enable_raises_error_if_not_AUTH(self):
    """enable() fails, and leaves utf8_enabled false, before authenticating."""
    with self.reaped_pair(self.UTF8Server) as (server, client):
        self.assertFalse(client.utf8_enabled)
        with self.assertRaises(imaplib.IMAP4.error):
            client.enable('foo')
        self.assertFalse(client.utf8_enabled)
738
739 # XXX Also need a test that enable after SELECT raises an error.
740
@threading_helper.reap_threads
def test_enable_raises_error_if_no_capability(self):
    """enable() fails when the server does not advertise ENABLE."""

    class NoEnableServer(self.UTF8Server):
        capabilities = 'AUTH'

    with self.reaped_pair(NoEnableServer) as (server, client):
        with self.assertRaises(imaplib.IMAP4.error):
            client.enable('foo')
747
@threading_helper.reap_threads
def test_enable_UTF8_raises_error_if_not_supported(self):
    """enable('UTF8=ACCEPT') fails against a server with no extra capabilities."""

    class NonUTF8Server(SimpleIMAPHandler):
        pass

    # assertRaises stays outermost so the client is logged out and the
    # server reaped before the exception is checked.
    with self.assertRaises(imaplib.IMAP4.error), \
            self.reaped_pair(NonUTF8Server) as (server, client):
        status, _ = client.login('user', 'pass')
        self.assertEqual(status, 'OK')
        client.enable('UTF8=ACCEPT')
757
@threading_helper.reap_threads
def test_enable_UTF8_True_append(self):
    """After ENABLE UTF8=ACCEPT, append() sends a UTF8 (...) literal."""

    class UTF8AppendServer(self.UTF8Server):
        def cmd_APPEND(self, tag, args):
            self._send_textline('+')
            self.server.response = yield
            self._send_tagged(tag, 'OK', 'okay')

    msg_string = 'Subject: üñéöðé'
    expected_literal = ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')

    with self.reaped_pair(UTF8AppendServer) as (server, client):
        self.assertEqual(client._encoding, 'ascii')

        status, _ = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(status, 'OK')
        # b64 encoded 'fake'
        self.assertEqual(server.response, b'ZmFrZQ==\r\n')

        status, _ = client.enable('UTF8=ACCEPT')
        self.assertEqual(status, 'OK')
        self.assertEqual(client._encoding, 'utf-8')

        payload = msg_string.encode('utf-8')
        status, _ = client.append(None, None, None, payload)
        self.assertEqual(status, 'OK')
        self.assertEqual(server.response, expected_literal)
784
785 # XXX also need a test that makes sure that the Literal and Untagged_status
786 # regexes use unicode in UTF8 mode instead of the default ASCII.
787
@threading_helper.reap_threads
def test_search_disallows_charset_in_utf8_mode(self):
    """search() with a charset argument fails once UTF8 mode is enabled."""
    with self.reaped_pair(self.UTF8Server) as (server, client):
        status, _ = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(status, 'OK')
        status, _ = client.enable('UTF8=ACCEPT')
        self.assertEqual(status, 'OK')
        self.assertTrue(client.utf8_enabled)
        with self.assertRaises(imaplib.IMAP4.error):
            client.search('foo', 'bar')
797
@threading_helper.reap_threads
def test_bad_auth_name(self):
    """authenticate() raises when the server answers NO to the mechanism."""

    class MyServer(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            reason = 'unrecognized authentication ' 'type {}'.format(args[0])
            self._send_tagged(tag, 'NO', reason)

    with self.reaped_pair(MyServer) as (server, client):
        self.assertRaises(imaplib.IMAP4.error,
                          client.authenticate, 'METHOD', lambda: 1)
810
@threading_helper.reap_threads
def test_invalid_authentication(self):
    """authenticate() raises when the server rejects the response."""

    class MyServer(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            self.response = yield
            self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')

    with self.reaped_pair(MyServer) as (server, client):
        self.assertRaises(imaplib.IMAP4.error,
                          client.authenticate, 'MYAUTH', lambda x: b'fake')
824
@threading_helper.reap_threads
def test_valid_authentication(self):
    """authenticate() base64-encodes both bytes and str auth responses."""

    class MyServer(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            self.server.response = yield
            self._send_tagged(tag, 'OK', 'FAKEAUTH successful')

    # Bytes first, then str; each gets a fresh server/client pair.
    for reply in (b'fake', 'fake'):
        with self.reaped_pair(MyServer) as (server, client):
            status, _ = client.authenticate(
                'MYAUTH', lambda x, reply=reply: reply)
            self.assertEqual(status, 'OK')
            # b64 encoded 'fake'
            self.assertEqual(server.response, b'ZmFrZQ==\r\n')
846
@threading_helper.reap_threads
@hashlib_helper.requires_hashdigest('md5', openssl=True)
def test_login_cram_md5(self):
    """login_cram_md5() succeeds with the password given as str or bytes."""

    class AuthHandler(SimpleIMAPHandler):

        capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'

        def cmd_AUTHENTICATE(self, tag, args):
            # Fixed challenge; only the one hard-coded response is accepted.
            self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
                                'VzdG9uLm1jaS5uZXQ=')
            r = yield
            if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
                     b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
                self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
            else:
                self._send_tagged(tag, 'NO', 'No access')

    # subTest lets the str and bytes variants pass or fail independently,
    # each against its own server/client pair.
    for password in ("tanstaaftanstaaf", b"tanstaaftanstaaf"):
        with self.subTest(password=password):
            with self.reaped_pair(AuthHandler) as (server, client):
                # assertIn reports both operands on failure.
                self.assertIn('AUTH=CRAM-MD5', client.capabilities)
                ret, data = client.login_cram_md5("tim", password)
                self.assertEqual(ret, "OK")
874
875
@threading_helper.reap_threads
def test_aborted_authentication(self):
    # An authobject returning None must end in IMAP4.error.  The handler
    # answers NO only when the line it receives is exactly b'*\r\n'.

    class AbortAwareHandler(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            self.response = yield

            aborted = self.response == b'*\r\n'
            if aborted:
                verdict = ('NO', '[AUTHENTICATIONFAILED] aborted')
            else:
                verdict = ('OK', 'MYAUTH successful')
            self._send_tagged(tag, *verdict)

    with self.reaped_pair(AbortAwareHandler) as (server, client), \
            self.assertRaises(imaplib.IMAP4.error):
        client.authenticate('MYAUTH', lambda _challenge: None)
893
894
def test_linetoolong(self):
    # A greeting line longer than imaplib._MAXLINE must make the client
    # constructor raise IMAP4.error.

    class TooLongHandler(SimpleIMAPHandler):
        def handle(self):
            # Send a very long response line
            padding = b'x' * imaplib._MAXLINE
            self.wfile.write(b'* OK ' + padding + b'\r\n')

    with self.reaped_server(TooLongHandler) as server:
        with self.assertRaises(imaplib.IMAP4.error):
            self.imap_class(*server.server_address)
904
@threading_helper.reap_threads
def test_simple_with_statement(self):
    # Simplest use: enter and leave the client's with-block without
    # issuing any command.
    with self.reaped_server(SimpleIMAPHandler) as server, \
            self.imap_class(*server.server_address):
        pass
911
@threading_helper.reap_threads
def test_with_statement(self):
    # server.logged must name the user while inside the client's
    # with-block and be None again once that block has been left.
    with self.reaped_server(SimpleIMAPHandler) as server:
        address = server.server_address
        with self.imap_class(*address) as imap:
            imap.login('user', 'pass')
            self.assertEqual(server.logged, 'user')
        # The client context manager has exited at this point.
        self.assertIsNone(server.logged)
919
@threading_helper.reap_threads
def test_with_statement_logout(self):
    # What happens if logout() was already called inside the block?
    # server.logged must be None right after the explicit logout and
    # still None after the with-block is left.
    with self.reaped_server(SimpleIMAPHandler) as server:
        address = server.server_address
        with self.imap_class(*address) as imap:
            imap.login('user', 'pass')
            self.assertEqual(server.logged, 'user')
            imap.logout()
            self.assertIsNone(server.logged)
        # Leaving the block after an explicit logout must not fail.
        self.assertIsNone(server.logged)
930
@threading_helper.reap_threads
@cpython_only
@unittest.skipUnless(__debug__, "Won't work if __debug__ is False")
def test_dump_ur(self):
    # See: http://bugs.python.org/issue26543
    # _dump_ur() must hand the formatted dump to _mesg(); _mesg is
    # replaced by a mock so the exact argument can be checked.
    expected_dump = "untagged responses dump:READ-WRITE: [b'']"
    responses = {'READ-WRITE': [b'']}

    with self.reaped_server(SimpleIMAPHandler) as server:
        with self.imap_class(*server.server_address) as imap, \
                mock.patch.object(imap, '_mesg') as mesg_stub:
            imap._dump_ur(responses)
            mesg_stub.assert_called_with(expected_dump)
945
946
@unittest.skipUnless(ssl, "SSL not available")
class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests):
    # Runs the inherited tests with the SSL server/client classes and adds
    # a certificate verification test.
    server_class = SecureTCPServer
    imap_class = IMAP4_SSL

    @threading_helper.reap_threads
    def test_ssl_verified(self):
        # Use a verifying client context that trusts only CAFILE.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(CAFILE)
        mismatch = ("IP address mismatch, certificate is not valid for "
                    "'127.0.0.1'")

        # Connecting via the server's own address tuple is expected to
        # fail verification with the message above.
        with self.assertRaisesRegex(ssl.CertificateError, mismatch):
            with self.reaped_server(SimpleIMAPHandler) as server:
                client = self.imap_class(*server.server_address,
                                         ssl_context=context)
                client.shutdown()

        # Connecting by the name "localhost" is expected to succeed.
        with self.reaped_server(SimpleIMAPHandler) as server:
            port = server.server_address[1]
            client = self.imap_class("localhost", port, ssl_context=context)
            client.shutdown()
970
971
@unittest.skipUnless(
    support.is_resource_enabled('network'), 'network resource disabled')
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAPTest(unittest.TestCase):
    # Tests that talk to a remote IMAP server.  The class carries an
    # unconditional skip decorator, so these do not currently run.
    host = 'cyrus.andrew.cmu.edu'
    port = 143
    username = 'anonymous'
    password = 'pass'
    imap_class = imaplib.IMAP4

    def setUp(self):
        # Open a fresh connection for each test.
        with socket_helper.transient_internet(self.host):
            self.server = self.imap_class(self.host, self.port)

    def tearDown(self):
        # test_logout() logs out itself and resets self.server to None;
        # in that case there is nothing left to close.
        connection = self.server
        if connection is None:
            return
        with socket_helper.transient_internet(self.host):
            connection.logout()

    def test_logincapa(self):
        with socket_helper.transient_internet(self.host):
            imap = self.server
            for capability in imap.capabilities:
                self.assertIsInstance(capability, str)
            for required in ('LOGINDISABLED', 'AUTH=ANONYMOUS'):
                self.assertIn(required, imap.capabilities)
            status = imap.login(self.username, self.password)[0]
            self.assertEqual(status, 'OK')

    def test_logout(self):
        with socket_helper.transient_internet(self.host):
            reply = self.server.logout()
            # Tell tearDown() that the connection is already closed.
            self.server = None
            self.assertEqual(reply[0], 'BYE', reply)
1005
1006
@unittest.skipUnless(ssl, "SSL not available")
@unittest.skipUnless(
    support.is_resource_enabled('network'), 'network resource disabled')
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAP_STARTTLSTest(RemoteIMAPTest):
    # Same remote tests, but the connection is upgraded with STARTTLS
    # before each test runs.

    def setUp(self):
        super().setUp()
        with socket_helper.transient_internet(self.host):
            status = self.server.starttls()[0]
            self.assertEqual(status, 'OK')

    def test_logincapa(self):
        imap = self.server
        for capability in imap.capabilities:
            self.assertIsInstance(capability, str)
        self.assertNotIn('LOGINDISABLED', imap.capabilities)
1023
1024
@unittest.skipUnless(ssl, "SSL not available")
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAP_SSLTest(RemoteIMAPTest):
    # Remote tests over implicit SSL.  Each test opens and closes its own
    # connection, so the inherited setUp/tearDown are disabled.
    port = 993
    imap_class = IMAP4_SSL

    def setUp(self):
        # Deliberately empty: overrides the connecting setUp of the base.
        pass

    def tearDown(self):
        # Deliberately empty: there is no shared connection to close.
        pass

    def create_ssl_context(self):
        # Build a non-verifying client context that presents CERTFILE.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # Order matters: check_hostname has to be switched off before
        # verify_mode may be set to CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        context.load_cert_chain(CERTFILE)
        return context

    def check_logincapa(self, server):
        # Check capabilities and log in; always log out afterwards, even
        # when one of the assertions fails.
        try:
            for capability in server.capabilities:
                self.assertIsInstance(capability, str)
            self.assertNotIn('LOGINDISABLED', server.capabilities)
            self.assertIn('AUTH=PLAIN', server.capabilities)
            status = server.login(self.username, self.password)[0]
            self.assertEqual(status, 'OK')
        finally:
            server.logout()

    def test_logincapa(self):
        with socket_helper.transient_internet(self.host):
            connection = self.imap_class(self.host, self.port)
            self.check_logincapa(connection)

    def test_logout(self):
        with socket_helper.transient_internet(self.host):
            connection = self.imap_class(self.host, self.port)
            reply = connection.logout()
            self.assertEqual(reply[0], 'BYE', reply)
1065
1066
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()