python (3.11.7)
1 from test import support
2 from test.support import socket_helper
3
4 from contextlib import contextmanager
5 import imaplib
6 import os.path
7 import socketserver
8 import time
9 import calendar
10 import threading
11 import socket
12
13 from test.support import (verbose,
14 run_with_tz, run_with_locale, cpython_only, requires_resource,
15 requires_working_socket)
16 from test.support import hashlib_helper
17 from test.support import threading_helper
18 from test.support import warnings_helper
19 import unittest
20 from unittest import mock
21 from datetime import datetime, timezone, timedelta
22 try:
23 import ssl
24 except ImportError:
25 ssl = None
26
27 support.requires_working_socket(module=True)
28
29 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "keycert3.pem")
30 CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "pycacert.pem")
31
32
33 class ESC[4;38;5;81mTestImaplib(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
34
35 def test_Internaldate2tuple(self):
36 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
37 tt = imaplib.Internaldate2tuple(
38 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
39 self.assertEqual(time.mktime(tt), t0)
40 tt = imaplib.Internaldate2tuple(
41 b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")')
42 self.assertEqual(time.mktime(tt), t0)
43 tt = imaplib.Internaldate2tuple(
44 b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")')
45 self.assertEqual(time.mktime(tt), t0)
46
47 @run_with_tz('MST+07MDT,M4.1.0,M10.5.0')
48 def test_Internaldate2tuple_issue10941(self):
49 self.assertNotEqual(imaplib.Internaldate2tuple(
50 b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'),
51 imaplib.Internaldate2tuple(
52 b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")'))
53
54 def timevalues(self):
55 return [2000000000, 2000000000.0, time.localtime(2000000000),
56 (2033, 5, 18, 5, 33, 20, -1, -1, -1),
57 (2033, 5, 18, 5, 33, 20, -1, -1, 1),
58 datetime.fromtimestamp(2000000000,
59 timezone(timedelta(0, 2 * 60 * 60))),
60 '"18-May-2033 05:33:20 +0200"']
61
62 @run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
63 # DST rules included to work around quirk where the Gnu C library may not
64 # otherwise restore the previous time zone
65 @run_with_tz('STD-1DST,M3.2.0,M11.1.0')
66 def test_Time2Internaldate(self):
67 expected = '"18-May-2033 05:33:20 +0200"'
68
69 for t in self.timevalues():
70 internal = imaplib.Time2Internaldate(t)
71 self.assertEqual(internal, expected)
72
73 def test_that_Time2Internaldate_returns_a_result(self):
74 # Without tzset, we can check only that it successfully
75 # produces a result, not the correctness of the result itself,
76 # since the result depends on the timezone the machine is in.
77 for t in self.timevalues():
78 imaplib.Time2Internaldate(t)
79
80 @socket_helper.skip_if_tcp_blackhole
81 def test_imap4_host_default_value(self):
82 # Check whether the IMAP4_PORT is truly unavailable.
83 with socket.socket() as s:
84 try:
85 s.connect(('', imaplib.IMAP4_PORT))
86 self.skipTest(
87 "Cannot run the test with local IMAP server running.")
88 except socket.error:
89 pass
90
91 # This is the exception that should be raised.
92 expected_errnos = socket_helper.get_socket_conn_refused_errs()
93 with self.assertRaises(OSError) as cm:
94 imaplib.IMAP4()
95 self.assertIn(cm.exception.errno, expected_errnos)
96
97
if ssl is None:

    class SecureTCPServer:
        """Placeholder so the name is defined when the ssl module is missing."""

    IMAP4_SSL = None

else:

    class SecureTCPServer(socketserver.TCPServer):
        """TCP server that wraps every accepted connection in server-side TLS."""

        def get_request(self):
            """Accept one connection and return ``(tls_socket, peer_address)``."""
            plain_sock, peer = self.socket.accept()
            # A fresh context is built per connection, loaded with CERTFILE.
            tls = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            tls.load_cert_chain(CERTFILE)
            return tls.wrap_socket(plain_sock, server_side=True), peer

    IMAP4_SSL = imaplib.IMAP4_SSL
116
117
class SimpleIMAPHandler(socketserver.StreamRequestHandler):
    """Small scriptable IMAP server side used as a test fixture.

    Each received line ``<tag> <CMD> [args...]`` is dispatched to a method
    named ``cmd_<CMD>(tag, args)``.  A ``cmd_`` method may be a generator:
    ``handle()`` primes it with ``next()`` and then sends every following
    line into it until it finishes.  Subclasses add or override ``cmd_``
    methods and may set ``capabilities`` to advertise extra capabilities.
    """
    timeout = support.LOOPBACK_TIMEOUT
    # Generator of a command that is waiting for more client input, if any.
    continuation = None
    # Extra space-separated capabilities reported after 'IMAP4rev1'.
    capabilities = ''

    def setup(self):
        """Reset the per-connection state that tests read from the server."""
        super().setup()
        self.server.is_selected = False
        self.server.logged = None

    def _send(self, message):
        """Write raw bytes to the client, echoing them when verbose."""
        if verbose:
            print("SENT: %r" % message.strip())
        self.wfile.write(message)

    def _send_line(self, message):
        """Send bytes followed by CRLF."""
        self._send(message + b'\r\n')

    def _send_textline(self, message):
        """Send an ASCII text line followed by CRLF."""
        self._send_line(message.encode('ASCII'))

    def _send_tagged(self, tag, code, message):
        """Send the ``<tag> <code> <message>`` completion line."""
        self._send_textline(' '.join((tag, code, message)))

    def _read_line(self):
        """Return the next CRLF-terminated line, or None when the peer is gone.

        Bytes are accumulated one read(1) at a time because it's simpler to
        handle the differences between naked sockets and SSL sockets.
        """
        line = b''
        while not line.endswith(b'\r\n'):
            try:
                part = self.rfile.read(1)
            except OSError:
                # ..but SSLSockets raise exceptions.
                return None
            if part == b'':
                # Naked sockets return empty strings..
                return None
            line += part
        return line

    def handle(self):
        """Greet the client, then serve commands until the connection ends.

        A line with fewer than two tokens cannot be dispatched; it is
        answered with ``BAD`` (tagged with the client's tag when one was
        sent, ``*`` otherwise) instead of raising IndexError.
        """
        # Send a welcome message.
        self._send_textline('* OK IMAP4rev1')
        while True:
            line = self._read_line()
            if line is None:
                return

            if verbose:
                print('GOT: %r' % line.strip())
            if self.continuation:
                # A command is mid-conversation: the line belongs to it.
                try:
                    self.continuation.send(line)
                except StopIteration:
                    self.continuation = None
                continue

            splitline = line.decode('ASCII').split()
            if len(splitline) < 2:
                tag = splitline[0] if splitline else '*'
                self._send_tagged(tag, 'BAD', 'Missing command')
                continue
            tag, cmd, *args = splitline

            if hasattr(self, 'cmd_' + cmd):
                continuation = getattr(self, 'cmd_' + cmd)(tag, args)
                if continuation:
                    self.continuation = continuation
                    # Run the generator up to its first yield.
                    next(continuation)
            else:
                self._send_tagged(tag, 'BAD', cmd + ' unknown')

    def cmd_CAPABILITY(self, tag, args):
        """Report IMAP4rev1 plus whatever ``capabilities`` lists."""
        caps = 'IMAP4rev1'
        if self.capabilities:
            caps += ' ' + self.capabilities
        self._send_textline('* CAPABILITY ' + caps)
        self._send_tagged(tag, 'OK', 'CAPABILITY completed')

    def cmd_LOGOUT(self, tag, args):
        """Forget the logged-in user and say goodbye."""
        self.server.logged = None
        self._send_textline('* BYE IMAP4ref1 Server logging out')
        self._send_tagged(tag, 'OK', 'LOGOUT completed')

    def cmd_LOGIN(self, tag, args):
        """Record the user name on the server; any password is accepted."""
        self.server.logged = args[0]
        self._send_tagged(tag, 'OK', 'LOGIN completed')

    def cmd_SELECT(self, tag, args):
        """Mark a mailbox as selected and report two messages."""
        self.server.is_selected = True
        self._send_line(b'* 2 EXISTS')
        self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')

    def cmd_UNSELECT(self, tag, args):
        """Leave the selected state; BAD if no mailbox was selected."""
        if self.server.is_selected:
            self.server.is_selected = False
            self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)')
        else:
            self._send_tagged(tag, 'BAD', 'No mailbox selected')
211
212
213 class ESC[4;38;5;81mNewIMAPTestsMixin():
214 client = None
215
216 def _setup(self, imap_handler, connect=True):
217 """
218 Sets up imap_handler for tests. imap_handler should inherit from either:
219 - SimpleIMAPHandler - for testing IMAP commands,
220 - socketserver.StreamRequestHandler - if raw access to stream is needed.
221 Returns (client, server).
222 """
223 class ESC[4;38;5;81mTestTCPServer(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mserver_class):
224 def handle_error(self, request, client_address):
225 """
226 End request and raise the error if one occurs.
227 """
228 self.close_request(request)
229 self.server_close()
230 raise
231
232 self.addCleanup(self._cleanup)
233 self.server = self.server_class((socket_helper.HOST, 0), imap_handler)
234 self.thread = threading.Thread(
235 name=self._testMethodName+'-server',
236 target=self.server.serve_forever,
237 # Short poll interval to make the test finish quickly.
238 # Time between requests is short enough that we won't wake
239 # up spuriously too many times.
240 kwargs={'poll_interval': 0.01})
241 self.thread.daemon = True # In case this function raises.
242 self.thread.start()
243
244 if connect:
245 self.client = self.imap_class(*self.server.server_address)
246
247 return self.client, self.server
248
249 def _cleanup(self):
250 """
251 Cleans up the test server. This method should not be called manually,
252 it is added to the cleanup queue in the _setup method already.
253 """
254 # if logout was called already we'd raise an exception trying to
255 # shutdown the client once again
256 if self.client is not None and self.client.state != 'LOGOUT':
257 self.client.shutdown()
258 # cleanup the server
259 self.server.shutdown()
260 self.server.server_close()
261 threading_helper.join_thread(self.thread)
262 # Explicitly clear the attribute to prevent dangling thread
263 self.thread = None
264
265 def test_EOF_without_complete_welcome_message(self):
266 # http://bugs.python.org/issue5949
267 class ESC[4;38;5;81mEOFHandler(ESC[4;38;5;149msocketserverESC[4;38;5;149m.ESC[4;38;5;149mStreamRequestHandler):
268 def handle(self):
269 self.wfile.write(b'* OK')
270 _, server = self._setup(EOFHandler, connect=False)
271 self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
272 *server.server_address)
273
274 def test_line_termination(self):
275 class ESC[4;38;5;81mBadNewlineHandler(ESC[4;38;5;149mSimpleIMAPHandler):
276 def cmd_CAPABILITY(self, tag, args):
277 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
278 self._send_tagged(tag, 'OK', 'CAPABILITY completed')
279 _, server = self._setup(BadNewlineHandler, connect=False)
280 self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
281 *server.server_address)
282
283 def test_enable_raises_error_if_not_AUTH(self):
284 class ESC[4;38;5;81mEnableHandler(ESC[4;38;5;149mSimpleIMAPHandler):
285 capabilities = 'AUTH ENABLE UTF8=ACCEPT'
286 client, _ = self._setup(EnableHandler)
287 self.assertFalse(client.utf8_enabled)
288 with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'):
289 client.enable('foo')
290 self.assertFalse(client.utf8_enabled)
291
292 def test_enable_raises_error_if_no_capability(self):
293 client, _ = self._setup(SimpleIMAPHandler)
294 with self.assertRaisesRegex(imaplib.IMAP4.error,
295 'does not support ENABLE'):
296 client.enable('foo')
297
298 def test_enable_UTF8_raises_error_if_not_supported(self):
299 client, _ = self._setup(SimpleIMAPHandler)
300 typ, data = client.login('user', 'pass')
301 self.assertEqual(typ, 'OK')
302 with self.assertRaisesRegex(imaplib.IMAP4.error,
303 'does not support ENABLE'):
304 client.enable('UTF8=ACCEPT')
305
306 def test_enable_UTF8_True_append(self):
307 class ESC[4;38;5;81mUTF8AppendServer(ESC[4;38;5;149mSimpleIMAPHandler):
308 capabilities = 'ENABLE UTF8=ACCEPT'
309 def cmd_ENABLE(self, tag, args):
310 self._send_tagged(tag, 'OK', 'ENABLE successful')
311 def cmd_AUTHENTICATE(self, tag, args):
312 self._send_textline('+')
313 self.server.response = yield
314 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
315 def cmd_APPEND(self, tag, args):
316 self._send_textline('+')
317 self.server.response = yield
318 self._send_tagged(tag, 'OK', 'okay')
319 client, server = self._setup(UTF8AppendServer)
320 self.assertEqual(client._encoding, 'ascii')
321 code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
322 self.assertEqual(code, 'OK')
323 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake'
324 code, _ = client.enable('UTF8=ACCEPT')
325 self.assertEqual(code, 'OK')
326 self.assertEqual(client._encoding, 'utf-8')
327 msg_string = 'Subject: üñéöðé'
328 typ, data = client.append(None, None, None, msg_string.encode('utf-8'))
329 self.assertEqual(typ, 'OK')
330 self.assertEqual(server.response,
331 ('UTF8 (%s)\r\n' % msg_string).encode('utf-8'))
332
333 def test_search_disallows_charset_in_utf8_mode(self):
334 class ESC[4;38;5;81mUTF8Server(ESC[4;38;5;149mSimpleIMAPHandler):
335 capabilities = 'AUTH ENABLE UTF8=ACCEPT'
336 def cmd_ENABLE(self, tag, args):
337 self._send_tagged(tag, 'OK', 'ENABLE successful')
338 def cmd_AUTHENTICATE(self, tag, args):
339 self._send_textline('+')
340 self.server.response = yield
341 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
342 client, _ = self._setup(UTF8Server)
343 typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
344 self.assertEqual(typ, 'OK')
345 typ, _ = client.enable('UTF8=ACCEPT')
346 self.assertEqual(typ, 'OK')
347 self.assertTrue(client.utf8_enabled)
348 with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'):
349 client.search('foo', 'bar')
350
351 def test_bad_auth_name(self):
352 class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
353 def cmd_AUTHENTICATE(self, tag, args):
354 self._send_tagged(tag, 'NO',
355 'unrecognized authentication type {}'.format(args[0]))
356 client, _ = self._setup(MyServer)
357 with self.assertRaisesRegex(imaplib.IMAP4.error,
358 'unrecognized authentication type METHOD'):
359 client.authenticate('METHOD', lambda: 1)
360
361 def test_invalid_authentication(self):
362 class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
363 def cmd_AUTHENTICATE(self, tag, args):
364 self._send_textline('+')
365 self.response = yield
366 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
367 client, _ = self._setup(MyServer)
368 with self.assertRaisesRegex(imaplib.IMAP4.error,
369 r'\[AUTHENTICATIONFAILED\] invalid'):
370 client.authenticate('MYAUTH', lambda x: b'fake')
371
372 def test_valid_authentication_bytes(self):
373 class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
374 def cmd_AUTHENTICATE(self, tag, args):
375 self._send_textline('+')
376 self.server.response = yield
377 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
378 client, server = self._setup(MyServer)
379 code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
380 self.assertEqual(code, 'OK')
381 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake'
382
383 def test_valid_authentication_plain_text(self):
384 class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
385 def cmd_AUTHENTICATE(self, tag, args):
386 self._send_textline('+')
387 self.server.response = yield
388 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
389 client, server = self._setup(MyServer)
390 code, _ = client.authenticate('MYAUTH', lambda x: 'fake')
391 self.assertEqual(code, 'OK')
392 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake'
393
394 @hashlib_helper.requires_hashdigest('md5', openssl=True)
395 def test_login_cram_md5_bytes(self):
396 class ESC[4;38;5;81mAuthHandler(ESC[4;38;5;149mSimpleIMAPHandler):
397 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
398 def cmd_AUTHENTICATE(self, tag, args):
399 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
400 'VzdG9uLm1jaS5uZXQ=')
401 r = yield
402 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
403 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
404 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
405 else:
406 self._send_tagged(tag, 'NO', 'No access')
407 client, _ = self._setup(AuthHandler)
408 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
409 ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf")
410 self.assertEqual(ret, "OK")
411
412 @hashlib_helper.requires_hashdigest('md5', openssl=True)
413 def test_login_cram_md5_plain_text(self):
414 class ESC[4;38;5;81mAuthHandler(ESC[4;38;5;149mSimpleIMAPHandler):
415 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
416 def cmd_AUTHENTICATE(self, tag, args):
417 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
418 'VzdG9uLm1jaS5uZXQ=')
419 r = yield
420 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
421 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
422 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
423 else:
424 self._send_tagged(tag, 'NO', 'No access')
425 client, _ = self._setup(AuthHandler)
426 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
427 ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf")
428 self.assertEqual(ret, "OK")
429
430 def test_aborted_authentication(self):
431 class ESC[4;38;5;81mMyServer(ESC[4;38;5;149mSimpleIMAPHandler):
432 def cmd_AUTHENTICATE(self, tag, args):
433 self._send_textline('+')
434 self.response = yield
435 if self.response == b'*\r\n':
436 self._send_tagged(
437 tag,
438 'NO',
439 '[AUTHENTICATIONFAILED] aborted')
440 else:
441 self._send_tagged(tag, 'OK', 'MYAUTH successful')
442 client, _ = self._setup(MyServer)
443 with self.assertRaisesRegex(imaplib.IMAP4.error,
444 r'\[AUTHENTICATIONFAILED\] aborted'):
445 client.authenticate('MYAUTH', lambda x: None)
446
447 @mock.patch('imaplib._MAXLINE', 10)
448 def test_linetoolong(self):
449 class ESC[4;38;5;81mTooLongHandler(ESC[4;38;5;149mSimpleIMAPHandler):
450 def handle(self):
451 # send response line longer than the limit set in the next line
452 self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n')
453 _, server = self._setup(TooLongHandler, connect=False)
454 with self.assertRaisesRegex(imaplib.IMAP4.error,
455 'got more than 10 bytes'):
456 self.imap_class(*server.server_address)
457
458 def test_simple_with_statement(self):
459 _, server = self._setup(SimpleIMAPHandler, connect=False)
460 with self.imap_class(*server.server_address):
461 pass
462
463 @requires_resource('walltime')
464 def test_imaplib_timeout_test(self):
465 _, server = self._setup(SimpleIMAPHandler)
466 addr = server.server_address[1]
467 client = self.imap_class("localhost", addr, timeout=None)
468 self.assertEqual(client.sock.timeout, None)
469 client.shutdown()
470 client = self.imap_class("localhost", addr, timeout=support.LOOPBACK_TIMEOUT)
471 self.assertEqual(client.sock.timeout, support.LOOPBACK_TIMEOUT)
472 client.shutdown()
473 with self.assertRaises(ValueError):
474 client = self.imap_class("localhost", addr, timeout=0)
475
476 def test_imaplib_timeout_functionality_test(self):
477 class ESC[4;38;5;81mTimeoutHandler(ESC[4;38;5;149mSimpleIMAPHandler):
478 def handle(self):
479 time.sleep(1)
480 SimpleIMAPHandler.handle(self)
481
482 _, server = self._setup(TimeoutHandler)
483 addr = server.server_address[1]
484 with self.assertRaises(TimeoutError):
485 client = self.imap_class("localhost", addr, timeout=0.001)
486
487 def test_with_statement(self):
488 _, server = self._setup(SimpleIMAPHandler, connect=False)
489 with self.imap_class(*server.server_address) as imap:
490 imap.login('user', 'pass')
491 self.assertEqual(server.logged, 'user')
492 self.assertIsNone(server.logged)
493
494 def test_with_statement_logout(self):
495 # It is legal to log out explicitly inside the with block
496 _, server = self._setup(SimpleIMAPHandler, connect=False)
497 with self.imap_class(*server.server_address) as imap:
498 imap.login('user', 'pass')
499 self.assertEqual(server.logged, 'user')
500 imap.logout()
501 self.assertIsNone(server.logged)
502 self.assertIsNone(server.logged)
503
504 # command tests
505
506 def test_login(self):
507 client, _ = self._setup(SimpleIMAPHandler)
508 typ, data = client.login('user', 'pass')
509 self.assertEqual(typ, 'OK')
510 self.assertEqual(data[0], b'LOGIN completed')
511 self.assertEqual(client.state, 'AUTH')
512
513 def test_logout(self):
514 client, _ = self._setup(SimpleIMAPHandler)
515 typ, data = client.login('user', 'pass')
516 self.assertEqual(typ, 'OK')
517 self.assertEqual(data[0], b'LOGIN completed')
518 typ, data = client.logout()
519 self.assertEqual(typ, 'BYE', (typ, data))
520 self.assertEqual(data[0], b'IMAP4ref1 Server logging out', (typ, data))
521 self.assertEqual(client.state, 'LOGOUT')
522
523 def test_lsub(self):
524 class ESC[4;38;5;81mLsubCmd(ESC[4;38;5;149mSimpleIMAPHandler):
525 def cmd_LSUB(self, tag, args):
526 self._send_textline('* LSUB () "." directoryA')
527 return self._send_tagged(tag, 'OK', 'LSUB completed')
528 client, _ = self._setup(LsubCmd)
529 client.login('user', 'pass')
530 typ, data = client.lsub()
531 self.assertEqual(typ, 'OK')
532 self.assertEqual(data[0], b'() "." directoryA')
533
534 def test_unselect(self):
535 client, _ = self._setup(SimpleIMAPHandler)
536 client.login('user', 'pass')
537 typ, data = client.select()
538 self.assertEqual(typ, 'OK')
539 self.assertEqual(data[0], b'2')
540
541 typ, data = client.unselect()
542 self.assertEqual(typ, 'OK')
543 self.assertEqual(data[0], b'Returned to authenticated state. (Success)')
544 self.assertEqual(client.state, 'AUTH')
545
546
class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase):
    """Runs the shared mixin tests with imaplib.IMAP4 over a plain TCPServer."""
    server_class = socketserver.TCPServer
    imap_class = imaplib.IMAP4
550
551
@unittest.skipUnless(ssl, "SSL not available")
class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase):
    """Runs the shared mixin tests over TLS, plus SSL-specific checks."""
    imap_class = IMAP4_SSL
    server_class = SecureTCPServer

    @requires_resource('walltime')
    def test_ssl_raises(self):
        """Connecting by IP address with a verifying context is rejected."""
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED)
        self.assertIs(ssl_context.check_hostname, True)
        ssl_context.load_verify_locations(CAFILE)

        # Server start-up stays outside the assertion so that only the
        # verified connection attempt can satisfy it.
        _, server = self._setup(SimpleIMAPHandler)
        with self.assertRaisesRegex(ssl.CertificateError,
                "IP address mismatch, certificate is not valid for "
                "'127.0.0.1'"):
            client = self.imap_class(*server.server_address,
                                     ssl_context=ssl_context)
            # Only reached if the connection unexpectedly succeeds; release
            # the socket before the assertion reports the failure.
            client.shutdown()

    @requires_resource('walltime')
    def test_ssl_verified(self):
        """Connecting as "localhost" with a verifying context succeeds."""
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.load_verify_locations(CAFILE)

        _, server = self._setup(SimpleIMAPHandler)
        client = self.imap_class("localhost", server.server_address[1],
                                 ssl_context=ssl_context)
        client.shutdown()

    # Mock the private method _connect(), so mark the test as specific
    # to CPython stdlib
    @cpython_only
    def test_certfile_arg_warn(self):
        """Passing certfile= emits a DeprecationWarning."""
        with warnings_helper.check_warnings(('', DeprecationWarning)):
            with mock.patch.object(self.imap_class, 'open'):
                with mock.patch.object(self.imap_class, '_connect'):
                    self.imap_class('localhost', 143, certfile=CERTFILE)
590
591 class ESC[4;38;5;81mThreadedNetworkedTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
592 server_class = socketserver.TCPServer
593 imap_class = imaplib.IMAP4
594
def make_server(self, addr, hdlr):
    """Start ``self.server_class`` on *addr* in a daemon thread.

    Returns ``(server, thread)``; the thread is already running
    ``server.serve_forever``.
    """

    class MyServer(self.server_class):
        def handle_error(self, request, client_address):
            # Tear the server down, then re-raise the exception that is
            # currently being handled.
            self.close_request(request)
            self.server_close()
            raise

    def note(*words):
        # Progress output, shown only in verbose runs.
        if verbose:
            print(*words)

    note("creating server")
    server = MyServer(addr, hdlr)
    self.assertEqual(server.server_address, server.socket.getsockname())

    note("server created")
    note("ADDR =", addr)
    note("CLASS =", self.server_class)
    note("HDLR =", server.RequestHandlerClass)

    serving = threading.Thread(
        name='%s serving' % self.server_class,
        target=server.serve_forever,
        # Short poll interval to make the test finish quickly.
        # Time between requests is short enough that we won't wake
        # up spuriously too many times.
        kwargs={'poll_interval': 0.01},
        daemon=True)  # In case this function raises.
    serving.start()
    note("server running")
    return server, serving
626
def reap_server(self, server, thread):
    """Stop *server*, close its socket and wait for *thread* to finish."""
    noisy = bool(verbose)
    if noisy:
        print("waiting for server")
    # Order matters: stop the loop, release the socket, then join.
    for step in (server.shutdown, server.server_close, thread.join):
        step()
    if noisy:
        print("done")
635
@contextmanager
def reaped_server(self, hdlr):
    """Context manager yielding a running server that is reaped on exit."""
    listen_on = (socket_helper.HOST, 0)
    running = self.make_server(listen_on, hdlr)
    try:
        yield running[0]
    finally:
        # running is the (server, thread) pair from make_server.
        self.reap_server(*running)
643
@contextmanager
def reaped_pair(self, hdlr):
    """Context manager yielding ``(server, client)``.

    The client is logged out, and then the server reaped, on exit.
    """
    with self.reaped_server(hdlr) as srv:
        connected = self.imap_class(*srv.server_address)
        try:
            yield srv, connected
        finally:
            connected.logout()
652
@threading_helper.reap_threads
def test_connect(self):
    """A client can connect to the basic handler and shut down again."""
    with self.reaped_server(SimpleIMAPHandler) as server:
        connection = self.imap_class(*server.server_address)
        connection.shutdown()
658
@threading_helper.reap_threads
def test_bracket_flags(self):
    """A flag containing brackets survives STORE and PERMANENTFLAGS."""

    # RFC 3501 disallows ']' characters in flag names, but imaplib has
    # allowed producing them forever, other programs also produce them
    # (eg: OtherInbox's Organizer app as of 20140716), and Gmail, for
    # example, accepts them and produces them.  So we support them.
    # See issue #21815.

    class BracketFlagHandler(SimpleIMAPHandler):

        def handle(self):
            # Per-connection flag list, extended by every STORE.
            self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']
            super().handle()

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            self.server.response = yield
            self._send_tagged(tag, 'OK', 'FAKEAUTH successful')

        def cmd_SELECT(self, tag, args):
            flag_msg = ' \\'.join(self.flags)
            untagged = (
                '* FLAGS (%s)' % flag_msg,
                '* 2 EXISTS',
                '* 0 RECENT',
                '* OK [PERMANENTFLAGS %s \\*)] Flags permitted.' % flag_msg,
            )
            for text in untagged:
                self._send_line(text.encode('ascii'))
            self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')

        def cmd_STORE(self, tag, args):
            # args[2] is the parenthesised flag list sent by the client.
            added = args[2].strip('(').strip(')').split()
            self.flags.extend(added)
            fetch = '* %s FETCH %s' % (
                args[0], '(FLAGS (%s))' % ' \\'.join(self.flags))
            self._send_line(fetch.encode('ascii'))
            self._send_tagged(tag, 'OK', 'STORE completed.')

    with self.reaped_pair(BracketFlagHandler) as (server, client):
        code, data = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(code, 'OK')
        self.assertEqual(server.response, b'ZmFrZQ==\r\n')
        client.select('test')
        typ, [data] = client.store(b'1', "+FLAGS", "[test]")
        self.assertIn(b'[test]', data)
        # Selecting again makes the server report the extended flag list.
        client.select('test')
        typ, [data] = client.response('PERMANENTFLAGS')
        self.assertIn(b'[test]', data)
707
@threading_helper.reap_threads
def test_issue5949(self):
    """A greeting cut off before its line ending makes the client abort."""

    class EOFHandler(socketserver.StreamRequestHandler):
        def handle(self):
            # EOF without sending a complete welcome message.
            self.wfile.write(b'* OK')

    with self.reaped_server(EOFHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
719
@threading_helper.reap_threads
def test_line_termination(self):
    """A CAPABILITY line ending in bare LF makes the client abort."""

    class BadNewlineHandler(SimpleIMAPHandler):

        def cmd_CAPABILITY(self, tag, args):
            # Deliberately '\n' rather than '\r\n' on the untagged line.
            self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
            self._send_tagged(tag, 'OK', 'CAPABILITY completed')

    with self.reaped_server(BadNewlineHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
732
class UTF8Server(SimpleIMAPHandler):
    """Handler advertising AUTH, ENABLE and UTF8=ACCEPT.

    ENABLE always succeeds; AUTHENTICATE accepts whatever the client sends
    and stores that line on ``server.response``.
    """
    capabilities = 'AUTH ENABLE UTF8=ACCEPT'

    def cmd_ENABLE(self, tag, args):
        self._send_tagged(tag, 'OK', 'ENABLE successful')

    def cmd_AUTHENTICATE(self, tag, args):
        # Ask for the client's response, then capture it at the yield.
        self._send_textline('+')
        self.server.response = yield
        self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
743
@threading_helper.reap_threads
def test_enable_raises_error_if_not_AUTH(self):
    """enable() before authenticating raises and leaves UTF-8 mode off."""
    with self.reaped_pair(self.UTF8Server) as (server, client):
        self.assertFalse(client.utf8_enabled)
        with self.assertRaises(imaplib.IMAP4.error):
            client.enable('foo')
        self.assertFalse(client.utf8_enabled)
750
751 # XXX Also need a test that enable after SELECT raises an error.
752
@threading_helper.reap_threads
def test_enable_raises_error_if_no_capability(self):
    """enable() raises when the server does not advertise ENABLE."""
    class NoEnableServer(self.UTF8Server):
        capabilities = 'AUTH'

    with self.reaped_pair(NoEnableServer) as (server, client):
        with self.assertRaises(imaplib.IMAP4.error):
            client.enable('foo')
759
@threading_helper.reap_threads
def test_enable_UTF8_raises_error_if_not_supported(self):
    """enable('UTF8=ACCEPT') raises against a server without extras."""
    class NonUTF8Server(SimpleIMAPHandler):
        """Handler that inherits the empty extra-capabilities string."""

    # assertRaises is outermost, so the error propagates out through
    # reaped_pair's cleanup before it is checked.
    with self.assertRaises(imaplib.IMAP4.error), \
            self.reaped_pair(NonUTF8Server) as (server, client):
        typ, data = client.login('user', 'pass')
        self.assertEqual(typ, 'OK')
        client.enable('UTF8=ACCEPT')
770
@threading_helper.reap_threads
def test_enable_UTF8_True_append(self):
    """After enabling UTF8=ACCEPT, append() sends a UTF8 (...) literal."""

    class UTF8AppendServer(self.UTF8Server):
        def cmd_APPEND(self, tag, args):
            # Request the message data and capture the line that follows.
            self._send_textline('+')
            self.server.response = yield
            self._send_tagged(tag, 'OK', 'okay')

    with self.reaped_pair(UTF8AppendServer) as (server, client):
        self.assertEqual(client._encoding, 'ascii')

        code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(code, 'OK')
        # b64 encoded 'fake'
        self.assertEqual(server.response, b'ZmFrZQ==\r\n')

        code, _ = client.enable('UTF8=ACCEPT')
        self.assertEqual(code, 'OK')
        self.assertEqual(client._encoding, 'utf-8')

        msg_string = 'Subject: üñéöðé'
        payload = msg_string.encode('utf-8')
        typ, data = client.append(None, None, None, payload)
        self.assertEqual(typ, 'OK')
        expected = ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')
        self.assertEqual(server.response, expected)
797
798 # XXX also need a test that makes sure that the Literal and Untagged_status
799 # regexes use unicode in UTF8 mode instead of the default ASCII.
800
@threading_helper.reap_threads
def test_search_disallows_charset_in_utf8_mode(self):
    """search() with a charset argument raises once UTF-8 mode is on."""
    with self.reaped_pair(self.UTF8Server) as (server, client):
        status, _ = client.authenticate('MYAUTH', lambda x: b'fake')
        self.assertEqual(status, 'OK')
        status, _ = client.enable('UTF8=ACCEPT')
        self.assertEqual(status, 'OK')
        self.assertTrue(client.utf8_enabled)
        with self.assertRaises(imaplib.IMAP4.error):
            client.search('foo', 'bar')
810
@threading_helper.reap_threads
def test_bad_auth_name(self):
    """A NO reply to AUTHENTICATE surfaces as IMAP4.error."""

    class MyServer(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            # Reject immediately, echoing the requested mechanism name.
            reason = 'unrecognized authentication type {}'.format(args[0])
            self._send_tagged(tag, 'NO', reason)

    with self.reaped_pair(MyServer) as (server, client):
        self.assertRaises(imaplib.IMAP4.error,
                          client.authenticate, 'METHOD', lambda: 1)
823
@threading_helper.reap_threads
def test_invalid_authentication(self):
    """A NO reply after the client's response surfaces as IMAP4.error."""

    class MyServer(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            # Whatever the client answers is rejected.
            self.response = yield
            self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')

    with self.reaped_pair(MyServer) as (server, client):
        self.assertRaises(imaplib.IMAP4.error,
                          client.authenticate, 'MYAUTH', lambda x: b'fake')
837
@threading_helper.reap_threads
def test_valid_authentication(self):
    """authenticate() base64-encodes both bytes and str responses."""

    class MyServer(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            self.server.response = yield
            self._send_tagged(tag, 'OK', 'FAKEAUTH successful')

    # Same secret, once as bytes and once as text; each run gets its own
    # server and client.
    for secret in (b'fake', 'fake'):
        with self.reaped_pair(MyServer) as (server, client):
            code, data = client.authenticate(
                'MYAUTH', lambda x, secret=secret: secret)
            self.assertEqual(code, 'OK')
            # b64 encoded 'fake'
            self.assertEqual(server.response, b'ZmFrZQ==\r\n')
859
@threading_helper.reap_threads
@hashlib_helper.requires_hashdigest('md5', openssl=True)
def test_login_cram_md5(self):
    """login_cram_md5() succeeds with a str password and a bytes password."""

    class AuthHandler(SimpleIMAPHandler):

        capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'

        def cmd_AUTHENTICATE(self, tag, args):
            # Fixed challenge; only the one matching response is accepted.
            self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
                                'VzdG9uLm1jaS5uZXQ=')
            r = yield
            if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
                     b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
                self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
            else:
                self._send_tagged(tag, 'NO', 'No access')

    # Each password form runs against a fresh server and client.
    for password in ("tanstaaftanstaaf", b"tanstaaftanstaaf"):
        with self.reaped_pair(AuthHandler) as (server, client):
            # assertIn reports the actual capabilities on failure.
            self.assertIn('AUTH=CRAM-MD5', client.capabilities)
            ret, data = client.login_cram_md5("tim", password)
            self.assertEqual(ret, "OK")
887
888
@threading_helper.reap_threads
def test_aborted_authentication(self):
    """An authenticator that returns None makes authenticate() fail.

    The handler answers NO only when the line it receives is ``*``, and
    OK for anything else, so the expected IMAP4.error depends on that
    line being what the client sends.
    """

    class AbortAwareHandler(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            self.response = yield

            if self.response != b'*\r\n':
                self._send_tagged(tag, 'OK', 'MYAUTH successful')
            else:
                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted')

    def authenticator(challenge):
        return None

    with self.reaped_pair(AbortAwareHandler) as (server, client):
        with self.assertRaises(imaplib.IMAP4.error):
            client.authenticate('MYAUTH', authenticator)
906
907
def test_linetoolong(self):
    """Connecting raises IMAP4.error when the greeting line is oversized."""

    class OversizedGreetingHandler(SimpleIMAPHandler):
        def handle(self):
            # Send a very long response line: the filler alone is
            # imaplib._MAXLINE bytes.
            filler = b'x' * imaplib._MAXLINE
            self.wfile.write(b'* OK ' + filler + b'\r\n')

    with self.reaped_server(OversizedGreetingHandler) as server:
        # Read the address before entering assertRaises so only the
        # client constructor is guarded.
        address = server.server_address
        with self.assertRaises(imaplib.IMAP4.error):
            self.imap_class(*address)
917
@threading_helper.reap_threads
def test_simple_with_statement(self):
    """The client can be entered and left as a context manager."""
    with self.reaped_server(SimpleIMAPHandler) as server:
        # Simplest call: connect, enter the block, do nothing, leave.
        connection = self.imap_class(*server.server_address)
        with connection:
            pass
924
@threading_helper.reap_threads
def test_with_statement(self):
    """server.logged is 'user' inside the with-block and None after it."""
    with self.reaped_server(SimpleIMAPHandler) as server:
        address = server.server_address
        with self.imap_class(*address) as imap:
            imap.login('user', 'pass')
            self.assertEqual(server.logged, 'user')
        # Checked only once the client's with-block has been left.
        self.assertIsNone(server.logged)
932
@threading_helper.reap_threads
def test_with_statement_logout(self):
    """An explicit logout() inside the with-block is tolerated.

    server.logged must already be None right after logout() and must
    still be None once the block has been left.
    """
    with self.reaped_server(SimpleIMAPHandler) as server:
        address = server.server_address
        with self.imap_class(*address) as imap:
            imap.login('user', 'pass')
            self.assertEqual(server.logged, 'user')
            # Log out by hand before the context manager exits.
            imap.logout()
            self.assertIsNone(server.logged)
        self.assertIsNone(server.logged)
943
@threading_helper.reap_threads
@cpython_only
@unittest.skipUnless(__debug__, "Won't work if __debug__ is False")
def test_dump_ur(self):
    """_dump_ur() hands the formatted untagged-response dump to _mesg().

    See: http://bugs.python.org/issue26543
    """
    untagged = {'READ-WRITE': [b'']}
    expected_message = "untagged responses dump:READ-WRITE: [b'']"

    with self.reaped_server(SimpleIMAPHandler) as server:
        with self.imap_class(*server.server_address) as imap:
            # _mesg is replaced by a mock on this instance so its
            # argument can be inspected.
            with mock.patch.object(imap, '_mesg') as mesg_spy:
                imap._dump_ur(untagged)
                mesg_spy.assert_called_with(expected_message)
958
959
@unittest.skipUnless(ssl, "SSL not available")
class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests):
    """ThreadedNetworkedTests with SecureTCPServer and IMAP4_SSL substituted."""

    server_class = SecureTCPServer
    imap_class = IMAP4_SSL

    @threading_helper.reap_threads
    def test_ssl_verified(self):
        """Certificate verification fails for the server's address but
        succeeds when connecting by the name "localhost"."""
        verifying_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        verifying_context.load_verify_locations(CAFILE)

        mismatch_message = ("IP address mismatch, certificate is not valid "
                            "for '127.0.0.1'")
        with self.assertRaisesRegex(ssl.CertificateError, mismatch_message):
            with self.reaped_server(SimpleIMAPHandler) as server:
                client = self.imap_class(*server.server_address,
                                         ssl_context=verifying_context)
                client.shutdown()

        with self.reaped_server(SimpleIMAPHandler) as server:
            # Same port, but addressed by host name this time.
            port = server.server_address[1]
            client = self.imap_class("localhost", port,
                                     ssl_context=verifying_context)
            client.shutdown()
983
984
@unittest.skipUnless(
    support.is_resource_enabled('network'), 'network resource disabled')
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAPTest(unittest.TestCase):
    """Tests that talk to the remote server named by ``host`` and ``port``.

    Skipped unconditionally by the decorator above, with the reason
    'cyrus.andrew.cmu.edu blocks connections'.
    """

    host = 'cyrus.andrew.cmu.edu'
    port = 143
    username = 'anonymous'
    password = 'pass'
    imap_class = imaplib.IMAP4

    def setUp(self):
        """Open the connection that each test uses as ``self.server``."""
        with socket_helper.transient_internet(self.host):
            self.server = self.imap_class(self.host, self.port)

    def tearDown(self):
        """Log out, unless the test already cleared ``self.server``."""
        if self.server is None:
            return
        with socket_helper.transient_internet(self.host):
            self.server.logout()

    def test_logincapa(self):
        """Capabilities are str and include the expected entries; login works."""
        with socket_helper.transient_internet(self.host):
            capabilities = self.server.capabilities
            for capability in capabilities:
                self.assertIsInstance(capability, str)
            for expected in ('LOGINDISABLED', 'AUTH=ANONYMOUS'):
                self.assertIn(expected, capabilities)
            response = self.server.login(self.username, self.password)
            self.assertEqual(response[0], 'OK')

    def test_logout(self):
        """logout() answers with 'BYE'."""
        with socket_helper.transient_internet(self.host):
            response = self.server.logout()
            # Tell tearDown() that the connection is already logged out.
            self.server = None
            self.assertEqual(response[0], 'BYE', response)
1018
1019
@unittest.skipUnless(ssl, "SSL not available")
@unittest.skipUnless(
    support.is_resource_enabled('network'), 'network resource disabled')
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAP_STARTTLSTest(RemoteIMAPTest):
    """RemoteIMAPTest variant that issues STARTTLS on the connection first."""

    def setUp(self):
        """Connect via the base class, then require STARTTLS to return 'OK'."""
        super().setUp()
        try:
            with socket_helper.transient_internet(self.host):
                rs = self.server.starttls()
                self.assertEqual(rs[0], 'OK')
        except BaseException:
            # unittest does not call tearDown() when setUp() raises, so
            # close the connection opened by super().setUp() here.
            self.server.shutdown()
            raise

    def test_logincapa(self):
        """After STARTTLS, capabilities are str and omit LOGINDISABLED."""
        for cap in self.server.capabilities:
            self.assertIsInstance(cap, str)
        self.assertNotIn('LOGINDISABLED', self.server.capabilities)
1036
1037
@unittest.skipUnless(ssl, "SSL not available")
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAP_SSLTest(RemoteIMAPTest):
    """RemoteIMAPTest variant using IMAP4_SSL on port 993.

    setUp() and tearDown() are no-ops here; each test opens its own
    connection.
    """

    port = 993
    imap_class = IMAP4_SSL

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def create_ssl_context(self):
        """Return a client context with verification off and CERTFILE loaded."""
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        context.load_cert_chain(CERTFILE)
        return context

    def check_logincapa(self, server):
        """Check capabilities and login on *server*; always log out after."""
        try:
            capabilities = server.capabilities
            for capability in capabilities:
                self.assertIsInstance(capability, str)
            self.assertNotIn('LOGINDISABLED', capabilities)
            self.assertIn('AUTH=PLAIN', capabilities)
            response = server.login(self.username, self.password)
            self.assertEqual(response[0], 'OK')
        finally:
            server.logout()

    def test_logincapa(self):
        with socket_helper.transient_internet(self.host):
            connection = self.imap_class(self.host, self.port)
            self.check_logincapa(connection)

    def test_logout(self):
        with socket_helper.transient_internet(self.host):
            connection = self.imap_class(self.host, self.port)
            response = connection.logout()
            self.assertEqual(response[0], 'BYE', response)

    def test_ssl_context_certfile_exclusive(self):
        """Passing both certfile and ssl_context raises ValueError."""
        with socket_helper.transient_internet(self.host):
            # Built outside the guard so only the constructor is checked.
            context = self.create_ssl_context()
            with self.assertRaises(ValueError):
                self.imap_class(self.host, self.port,
                                certfile=CERTFILE, ssl_context=context)

    def test_ssl_context_keyfile_exclusive(self):
        """Passing both keyfile and ssl_context raises ValueError."""
        with socket_helper.transient_internet(self.host):
            # Built outside the guard so only the constructor is checked.
            context = self.create_ssl_context()
            with self.assertRaises(ValueError):
                self.imap_class(self.host, self.port,
                                keyfile=CERTFILE, ssl_context=context)
1090
1091
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()