1 """IMAP4 client.
2
3 Based on RFC 2060.
4
5 Public class: IMAP4
6 Public variable: Debug
7 Public functions: Internaldate2tuple
8 Int2AP
9 ParseFlags
10 Time2Internaldate
11 """
12
13 # Author: Piers Lauder <piers@cs.su.oz.au> December 1997.
14 #
15 # Authentication code contributed by Donn Cave <donn@u.washington.edu> June 1998.
16 # String method conversion by ESR, February 2001.
17 # GET/SETACL contributed by Anthony Baxter <anthony@interlink.com.au> April 2001.
18 # IMAP4_SSL contributed by Tino Lange <Tino.Lange@isg.de> March 2002.
19 # GET/SETQUOTA contributed by Andreas Zeidler <az@kreativkombinat.de> June 2002.
20 # PROXYAUTH contributed by Rick Holbert <holbert.13@osu.edu> November 2002.
21 # GET/SETANNOTATION contributed by Tomas Lindroos <skitta@abo.fi> June 2005.
22
23 __version__ = "2.58"
24
25 import binascii, errno, random, re, socket, subprocess, sys, time, calendar
26 from datetime import datetime, timezone, timedelta
27 from io import DEFAULT_BUFFER_SIZE
28
29 try:
30 import ssl
31 HAVE_SSL = True
32 except ImportError:
33 HAVE_SSL = False
34
35 __all__ = ["IMAP4", "IMAP4_stream", "Internaldate2tuple",
36 "Int2AP", "ParseFlags", "Time2Internaldate"]
37
38 # Globals
39
40 CRLF = b'\r\n'
41 Debug = 0
42 IMAP4_PORT = 143
43 IMAP4_SSL_PORT = 993
44 AllowedVersions = ('IMAP4REV1', 'IMAP4') # Most recent first
45
46 # Maximal line length when calling readline(). This is to prevent
47 # reading arbitrary length lines. RFC 3501 and 2060 (IMAP 4rev1)
48 # don't specify a line length. RFC 2683 suggests limiting client
49 # command lines to 1000 octets and that servers should be prepared
50 # to accept command lines up to 8000 octets, so we used to use 10K here.
51 # In the modern world (eg: gmail) the response to, for example, a
52 # search command can be quite large, so we now use 1M.
53 _MAXLINE = 1000000
54
55
56 # Commands
57
58 Commands = {
59 # name valid states
60 'APPEND': ('AUTH', 'SELECTED'),
61 'AUTHENTICATE': ('NONAUTH',),
62 'CAPABILITY': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
63 'CHECK': ('SELECTED',),
64 'CLOSE': ('SELECTED',),
65 'COPY': ('SELECTED',),
66 'CREATE': ('AUTH', 'SELECTED'),
67 'DELETE': ('AUTH', 'SELECTED'),
68 'DELETEACL': ('AUTH', 'SELECTED'),
69 'ENABLE': ('AUTH', ),
70 'EXAMINE': ('AUTH', 'SELECTED'),
71 'EXPUNGE': ('SELECTED',),
72 'FETCH': ('SELECTED',),
73 'GETACL': ('AUTH', 'SELECTED'),
74 'GETANNOTATION':('AUTH', 'SELECTED'),
75 'GETQUOTA': ('AUTH', 'SELECTED'),
76 'GETQUOTAROOT': ('AUTH', 'SELECTED'),
77 'MYRIGHTS': ('AUTH', 'SELECTED'),
78 'LIST': ('AUTH', 'SELECTED'),
79 'LOGIN': ('NONAUTH',),
80 'LOGOUT': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
81 'LSUB': ('AUTH', 'SELECTED'),
82 'MOVE': ('SELECTED',),
83 'NAMESPACE': ('AUTH', 'SELECTED'),
84 'NOOP': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
85 'PARTIAL': ('SELECTED',), # NB: obsolete
86 'PROXYAUTH': ('AUTH',),
87 'RENAME': ('AUTH', 'SELECTED'),
88 'SEARCH': ('SELECTED',),
89 'SELECT': ('AUTH', 'SELECTED'),
90 'SETACL': ('AUTH', 'SELECTED'),
91 'SETANNOTATION':('AUTH', 'SELECTED'),
92 'SETQUOTA': ('AUTH', 'SELECTED'),
93 'SORT': ('SELECTED',),
94 'STARTTLS': ('NONAUTH',),
95 'STATUS': ('AUTH', 'SELECTED'),
96 'STORE': ('SELECTED',),
97 'SUBSCRIBE': ('AUTH', 'SELECTED'),
98 'THREAD': ('SELECTED',),
99 'UID': ('SELECTED',),
100 'UNSUBSCRIBE': ('AUTH', 'SELECTED'),
101 'UNSELECT': ('SELECTED',),
102 }
103
104 # Patterns to match server responses
105
106 Continuation = re.compile(br'\+( (?P<data>.*))?')
107 Flags = re.compile(br'.*FLAGS \((?P<flags>[^\)]*)\)')
108 InternalDate = re.compile(br'.*INTERNALDATE "'
109 br'(?P<day>[ 0123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
110 br' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
111 br' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
112 br'"')
113 # Literal is no longer used; kept for backward compatibility.
114 Literal = re.compile(br'.*{(?P<size>\d+)}$', re.ASCII)
115 MapCRLF = re.compile(br'\r\n|\r|\n')
116 # We no longer exclude the ']' character from the data portion of the response
117 # code, even though it violates the RFC. Popular IMAP servers such as Gmail
118 # allow flags with ']', and there are programs (including imaplib!) that can
119 # produce them. The problem with this is if the 'text' portion of the response
120 # includes a ']' we'll parse the response wrong (which is the point of the RFC
121 # restriction). However, that seems less likely to be a problem in practice
122 # than being unable to correctly parse flags that include ']' chars, which
123 # was reported as a real-world problem in issue #21815.
124 Response_code = re.compile(br'\[(?P<type>[A-Z-]+)( (?P<data>.*))?\]')
125 Untagged_response = re.compile(br'\* (?P<type>[A-Z-]+)( (?P<data>.*))?')
126 # Untagged_status is no longer used; kept for backward compatibility
127 Untagged_status = re.compile(
128 br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?', re.ASCII)
129 # We compile these in _mode_xxx.
130 _Literal = br'.*{(?P<size>\d+)}$'
131 _Untagged_status = br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?'
132
133
134
135 class ESC[4;38;5;81mIMAP4:
136
137 r"""IMAP4 client class.
138
139 Instantiate with: IMAP4([host[, port[, timeout=None]]])
140
141 host - host's name (default: localhost);
142 port - port number (default: standard IMAP4 port).
143 timeout - socket timeout (default: None)
144 If timeout is not given or is None,
145 the global default socket timeout is used
146
147 All IMAP4rev1 commands are supported by methods of the same
148 name (in lowercase).
149
150 All arguments to commands are converted to strings, except for
151 AUTHENTICATE, and the last argument to APPEND which is passed as
152 an IMAP4 literal. If necessary (the string contains any
153 non-printing characters or white-space and isn't enclosed with
154 either parentheses or double quotes) each string is quoted.
155 However, the 'password' argument to the LOGIN command is always
156 quoted. If you want to avoid having an argument string quoted
157 (eg: the 'flags' argument to STORE) then enclose the string in
158 parentheses (eg: "(\Deleted)").
159
160 Each command returns a tuple: (type, [data, ...]) where 'type'
161 is usually 'OK' or 'NO', and 'data' is either the text from the
162 tagged response, or untagged results from command. Each 'data'
163 is either a string, or a tuple. If a tuple, then the first part
164 is the header of the response, and the second part contains
165 the data (ie: 'literal' value).
166
167 Errors raise the exception class <instance>.error("<reason>").
168 IMAP4 server errors raise <instance>.abort("<reason>"),
169 which is a sub-class of 'error'. Mailbox status changes
170 from READ-WRITE to READ-ONLY raise the exception class
171 <instance>.readonly("<reason>"), which is a sub-class of 'abort'.
172
173 "error" exceptions imply a program error.
174 "abort" exceptions imply the connection should be reset, and
175 the command re-tried.
176 "readonly" exceptions imply the command should be re-tried.
177
178 Note: to use this module, you must read the RFCs pertaining to the
179 IMAP4 protocol, as the semantics of the arguments to each IMAP4
180 command are left to the invoker, not to mention the results. Also,
181 most IMAP servers implement a sub-set of the commands available here.
182 """
183
184 class ESC[4;38;5;81merror(ESC[4;38;5;149mException): pass # Logical errors - debug required
185 class ESC[4;38;5;81mabort(ESC[4;38;5;149merror): pass # Service errors - close and retry
186 class ESC[4;38;5;81mreadonly(ESC[4;38;5;149mabort): pass # Mailbox status changed to READ-ONLY
187
188 def __init__(self, host='', port=IMAP4_PORT, timeout=None):
189 self.debug = Debug
190 self.state = 'LOGOUT'
191 self.literal = None # A literal argument to a command
192 self.tagged_commands = {} # Tagged commands awaiting response
193 self.untagged_responses = {} # {typ: [data, ...], ...}
194 self.continuation_response = '' # Last continuation response
195 self.is_readonly = False # READ-ONLY desired state
196 self.tagnum = 0
197 self._tls_established = False
198 self._mode_ascii()
199
200 # Open socket to server.
201
202 self.open(host, port, timeout)
203
204 try:
205 self._connect()
206 except Exception:
207 try:
208 self.shutdown()
209 except OSError:
210 pass
211 raise
212
213 def _mode_ascii(self):
214 self.utf8_enabled = False
215 self._encoding = 'ascii'
216 self.Literal = re.compile(_Literal, re.ASCII)
217 self.Untagged_status = re.compile(_Untagged_status, re.ASCII)
218
219
220 def _mode_utf8(self):
221 self.utf8_enabled = True
222 self._encoding = 'utf-8'
223 self.Literal = re.compile(_Literal)
224 self.Untagged_status = re.compile(_Untagged_status)
225
226
227 def _connect(self):
228 # Create unique tag for this session,
229 # and compile tagged response matcher.
230
231 self.tagpre = Int2AP(random.randint(4096, 65535))
232 self.tagre = re.compile(br'(?P<tag>'
233 + self.tagpre
234 + br'\d+) (?P<type>[A-Z]+) (?P<data>.*)', re.ASCII)
235
236 # Get server welcome message,
237 # request and store CAPABILITY response.
238
239 if __debug__:
240 self._cmd_log_len = 10
241 self._cmd_log_idx = 0
242 self._cmd_log = {} # Last `_cmd_log_len' interactions
243 if self.debug >= 1:
244 self._mesg('imaplib version %s' % __version__)
245 self._mesg('new IMAP4 connection, tag=%s' % self.tagpre)
246
247 self.welcome = self._get_response()
248 if 'PREAUTH' in self.untagged_responses:
249 self.state = 'AUTH'
250 elif 'OK' in self.untagged_responses:
251 self.state = 'NONAUTH'
252 else:
253 raise self.error(self.welcome)
254
255 self._get_capabilities()
256 if __debug__:
257 if self.debug >= 3:
258 self._mesg('CAPABILITIES: %r' % (self.capabilities,))
259
260 for version in AllowedVersions:
261 if not version in self.capabilities:
262 continue
263 self.PROTOCOL_VERSION = version
264 return
265
266 raise self.error('server not IMAP4 compliant')
267
268
269 def __getattr__(self, attr):
270 # Allow UPPERCASE variants of IMAP4 command methods.
271 if attr in Commands:
272 return getattr(self, attr.lower())
273 raise AttributeError("Unknown IMAP4 command: '%s'" % attr)
274
275 def __enter__(self):
276 return self
277
278 def __exit__(self, *args):
279 if self.state == "LOGOUT":
280 return
281
282 try:
283 self.logout()
284 except OSError:
285 pass
286
287
288 # Overridable methods
289
290
291 def _create_socket(self, timeout):
292 # Default value of IMAP4.host is '', but socket.getaddrinfo()
293 # (which is used by socket.create_connection()) expects None
294 # as a default value for host.
295 if timeout is not None and not timeout:
296 raise ValueError('Non-blocking socket (timeout=0) is not supported')
297 host = None if not self.host else self.host
298 sys.audit("imaplib.open", self, self.host, self.port)
299 address = (host, self.port)
300 if timeout is not None:
301 return socket.create_connection(address, timeout)
302 return socket.create_connection(address)
303
304 def open(self, host='', port=IMAP4_PORT, timeout=None):
305 """Setup connection to remote server on "host:port"
306 (default: localhost:standard IMAP4 port).
307 This connection will be used by the routines:
308 read, readline, send, shutdown.
309 """
310 self.host = host
311 self.port = port
312 self.sock = self._create_socket(timeout)
313 self.file = self.sock.makefile('rb')
314
315
316 def read(self, size):
317 """Read 'size' bytes from remote."""
318 return self.file.read(size)
319
320
321 def readline(self):
322 """Read line from remote."""
323 line = self.file.readline(_MAXLINE + 1)
324 if len(line) > _MAXLINE:
325 raise self.error("got more than %d bytes" % _MAXLINE)
326 return line
327
328
329 def send(self, data):
330 """Send data to remote."""
331 sys.audit("imaplib.send", self, data)
332 self.sock.sendall(data)
333
334
335 def shutdown(self):
336 """Close I/O established in "open"."""
337 self.file.close()
338 try:
339 self.sock.shutdown(socket.SHUT_RDWR)
340 except OSError as exc:
341 # The server might already have closed the connection.
342 # On Windows, this may result in WSAEINVAL (error 10022):
343 # An invalid operation was attempted.
344 if (exc.errno != errno.ENOTCONN
345 and getattr(exc, 'winerror', 0) != 10022):
346 raise
347 finally:
348 self.sock.close()
349
350
351 def socket(self):
352 """Return socket instance used to connect to IMAP4 server.
353
354 socket = <instance>.socket()
355 """
356 return self.sock
357
358
359
360 # Utility methods
361
362
363 def recent(self):
364 """Return most recent 'RECENT' responses if any exist,
365 else prompt server for an update using the 'NOOP' command.
366
367 (typ, [data]) = <instance>.recent()
368
369 'data' is None if no new messages,
370 else list of RECENT responses, most recent last.
371 """
372 name = 'RECENT'
373 typ, dat = self._untagged_response('OK', [None], name)
374 if dat[-1]:
375 return typ, dat
376 typ, dat = self.noop() # Prod server for response
377 return self._untagged_response(typ, dat, name)
378
379
380 def response(self, code):
381 """Return data for response 'code' if received, or None.
382
383 Old value for response 'code' is cleared.
384
385 (code, [data]) = <instance>.response(code)
386 """
387 return self._untagged_response(code, [None], code.upper())
388
389
390
391 # IMAP4 commands
392
393
394 def append(self, mailbox, flags, date_time, message):
395 """Append message to named mailbox.
396
397 (typ, [data]) = <instance>.append(mailbox, flags, date_time, message)
398
399 All args except `message' can be None.
400 """
401 name = 'APPEND'
402 if not mailbox:
403 mailbox = 'INBOX'
404 if flags:
405 if (flags[0],flags[-1]) != ('(',')'):
406 flags = '(%s)' % flags
407 else:
408 flags = None
409 if date_time:
410 date_time = Time2Internaldate(date_time)
411 else:
412 date_time = None
413 literal = MapCRLF.sub(CRLF, message)
414 if self.utf8_enabled:
415 literal = b'UTF8 (' + literal + b')'
416 self.literal = literal
417 return self._simple_command(name, mailbox, flags, date_time)
418
419
420 def authenticate(self, mechanism, authobject):
421 """Authenticate command - requires response processing.
422
423 'mechanism' specifies which authentication mechanism is to
424 be used - it must appear in <instance>.capabilities in the
425 form AUTH=<mechanism>.
426
427 'authobject' must be a callable object:
428
429 data = authobject(response)
430
431 It will be called to process server continuation responses; the
432 response argument it is passed will be a bytes. It should return bytes
433 data that will be base64 encoded and sent to the server. It should
434 return None if the client abort response '*' should be sent instead.
435 """
436 mech = mechanism.upper()
437 # XXX: shouldn't this code be removed, not commented out?
438 #cap = 'AUTH=%s' % mech
439 #if not cap in self.capabilities: # Let the server decide!
440 # raise self.error("Server doesn't allow %s authentication." % mech)
441 self.literal = _Authenticator(authobject).process
442 typ, dat = self._simple_command('AUTHENTICATE', mech)
443 if typ != 'OK':
444 raise self.error(dat[-1].decode('utf-8', 'replace'))
445 self.state = 'AUTH'
446 return typ, dat
447
448
449 def capability(self):
450 """(typ, [data]) = <instance>.capability()
451 Fetch capabilities list from server."""
452
453 name = 'CAPABILITY'
454 typ, dat = self._simple_command(name)
455 return self._untagged_response(typ, dat, name)
456
457
458 def check(self):
459 """Checkpoint mailbox on server.
460
461 (typ, [data]) = <instance>.check()
462 """
463 return self._simple_command('CHECK')
464
465
466 def close(self):
467 """Close currently selected mailbox.
468
469 Deleted messages are removed from writable mailbox.
470 This is the recommended command before 'LOGOUT'.
471
472 (typ, [data]) = <instance>.close()
473 """
474 try:
475 typ, dat = self._simple_command('CLOSE')
476 finally:
477 self.state = 'AUTH'
478 return typ, dat
479
480
481 def copy(self, message_set, new_mailbox):
482 """Copy 'message_set' messages onto end of 'new_mailbox'.
483
484 (typ, [data]) = <instance>.copy(message_set, new_mailbox)
485 """
486 return self._simple_command('COPY', message_set, new_mailbox)
487
488
489 def create(self, mailbox):
490 """Create new mailbox.
491
492 (typ, [data]) = <instance>.create(mailbox)
493 """
494 return self._simple_command('CREATE', mailbox)
495
496
497 def delete(self, mailbox):
498 """Delete old mailbox.
499
500 (typ, [data]) = <instance>.delete(mailbox)
501 """
502 return self._simple_command('DELETE', mailbox)
503
504 def deleteacl(self, mailbox, who):
505 """Delete the ACLs (remove any rights) set for who on mailbox.
506
507 (typ, [data]) = <instance>.deleteacl(mailbox, who)
508 """
509 return self._simple_command('DELETEACL', mailbox, who)
510
511 def enable(self, capability):
512 """Send an RFC5161 enable string to the server.
513
514 (typ, [data]) = <instance>.enable(capability)
515 """
516 if 'ENABLE' not in self.capabilities:
517 raise IMAP4.error("Server does not support ENABLE")
518 typ, data = self._simple_command('ENABLE', capability)
519 if typ == 'OK' and 'UTF8=ACCEPT' in capability.upper():
520 self._mode_utf8()
521 return typ, data
522
523 def expunge(self):
524 """Permanently remove deleted items from selected mailbox.
525
526 Generates 'EXPUNGE' response for each deleted message.
527
528 (typ, [data]) = <instance>.expunge()
529
530 'data' is list of 'EXPUNGE'd message numbers in order received.
531 """
532 name = 'EXPUNGE'
533 typ, dat = self._simple_command(name)
534 return self._untagged_response(typ, dat, name)
535
536
537 def fetch(self, message_set, message_parts):
538 """Fetch (parts of) messages.
539
540 (typ, [data, ...]) = <instance>.fetch(message_set, message_parts)
541
542 'message_parts' should be a string of selected parts
543 enclosed in parentheses, eg: "(UID BODY[TEXT])".
544
545 'data' are tuples of message part envelope and data.
546 """
547 name = 'FETCH'
548 typ, dat = self._simple_command(name, message_set, message_parts)
549 return self._untagged_response(typ, dat, name)
550
551
552 def getacl(self, mailbox):
553 """Get the ACLs for a mailbox.
554
555 (typ, [data]) = <instance>.getacl(mailbox)
556 """
557 typ, dat = self._simple_command('GETACL', mailbox)
558 return self._untagged_response(typ, dat, 'ACL')
559
560
561 def getannotation(self, mailbox, entry, attribute):
562 """(typ, [data]) = <instance>.getannotation(mailbox, entry, attribute)
563 Retrieve ANNOTATIONs."""
564
565 typ, dat = self._simple_command('GETANNOTATION', mailbox, entry, attribute)
566 return self._untagged_response(typ, dat, 'ANNOTATION')
567
568
569 def getquota(self, root):
570 """Get the quota root's resource usage and limits.
571
572 Part of the IMAP4 QUOTA extension defined in rfc2087.
573
574 (typ, [data]) = <instance>.getquota(root)
575 """
576 typ, dat = self._simple_command('GETQUOTA', root)
577 return self._untagged_response(typ, dat, 'QUOTA')
578
579
580 def getquotaroot(self, mailbox):
581 """Get the list of quota roots for the named mailbox.
582
583 (typ, [[QUOTAROOT responses...], [QUOTA responses]]) = <instance>.getquotaroot(mailbox)
584 """
585 typ, dat = self._simple_command('GETQUOTAROOT', mailbox)
586 typ, quota = self._untagged_response(typ, dat, 'QUOTA')
587 typ, quotaroot = self._untagged_response(typ, dat, 'QUOTAROOT')
588 return typ, [quotaroot, quota]
589
590
591 def list(self, directory='""', pattern='*'):
592 """List mailbox names in directory matching pattern.
593
594 (typ, [data]) = <instance>.list(directory='""', pattern='*')
595
596 'data' is list of LIST responses.
597 """
598 name = 'LIST'
599 typ, dat = self._simple_command(name, directory, pattern)
600 return self._untagged_response(typ, dat, name)
601
602
603 def login(self, user, password):
604 """Identify client using plaintext password.
605
606 (typ, [data]) = <instance>.login(user, password)
607
608 NB: 'password' will be quoted.
609 """
610 typ, dat = self._simple_command('LOGIN', user, self._quote(password))
611 if typ != 'OK':
612 raise self.error(dat[-1])
613 self.state = 'AUTH'
614 return typ, dat
615
616
617 def login_cram_md5(self, user, password):
618 """ Force use of CRAM-MD5 authentication.
619
620 (typ, [data]) = <instance>.login_cram_md5(user, password)
621 """
622 self.user, self.password = user, password
623 return self.authenticate('CRAM-MD5', self._CRAM_MD5_AUTH)
624
625
626 def _CRAM_MD5_AUTH(self, challenge):
627 """ Authobject to use with CRAM-MD5 authentication. """
628 import hmac
629 pwd = (self.password.encode('utf-8') if isinstance(self.password, str)
630 else self.password)
631 return self.user + " " + hmac.HMAC(pwd, challenge, 'md5').hexdigest()
632
633
634 def logout(self):
635 """Shutdown connection to server.
636
637 (typ, [data]) = <instance>.logout()
638
639 Returns server 'BYE' response.
640 """
641 self.state = 'LOGOUT'
642 typ, dat = self._simple_command('LOGOUT')
643 self.shutdown()
644 return typ, dat
645
646
647 def lsub(self, directory='""', pattern='*'):
648 """List 'subscribed' mailbox names in directory matching pattern.
649
650 (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*')
651
652 'data' are tuples of message part envelope and data.
653 """
654 name = 'LSUB'
655 typ, dat = self._simple_command(name, directory, pattern)
656 return self._untagged_response(typ, dat, name)
657
658 def myrights(self, mailbox):
659 """Show my ACLs for a mailbox (i.e. the rights that I have on mailbox).
660
661 (typ, [data]) = <instance>.myrights(mailbox)
662 """
663 typ,dat = self._simple_command('MYRIGHTS', mailbox)
664 return self._untagged_response(typ, dat, 'MYRIGHTS')
665
666 def namespace(self):
667 """ Returns IMAP namespaces ala rfc2342
668
669 (typ, [data, ...]) = <instance>.namespace()
670 """
671 name = 'NAMESPACE'
672 typ, dat = self._simple_command(name)
673 return self._untagged_response(typ, dat, name)
674
675
676 def noop(self):
677 """Send NOOP command.
678
679 (typ, [data]) = <instance>.noop()
680 """
681 if __debug__:
682 if self.debug >= 3:
683 self._dump_ur(self.untagged_responses)
684 return self._simple_command('NOOP')
685
686
687 def partial(self, message_num, message_part, start, length):
688 """Fetch truncated part of a message.
689
690 (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length)
691
692 'data' is tuple of message part envelope and data.
693 """
694 name = 'PARTIAL'
695 typ, dat = self._simple_command(name, message_num, message_part, start, length)
696 return self._untagged_response(typ, dat, 'FETCH')
697
698
699 def proxyauth(self, user):
700 """Assume authentication as "user".
701
702 Allows an authorised administrator to proxy into any user's
703 mailbox.
704
705 (typ, [data]) = <instance>.proxyauth(user)
706 """
707
708 name = 'PROXYAUTH'
709 return self._simple_command('PROXYAUTH', user)
710
711
712 def rename(self, oldmailbox, newmailbox):
713 """Rename old mailbox name to new.
714
715 (typ, [data]) = <instance>.rename(oldmailbox, newmailbox)
716 """
717 return self._simple_command('RENAME', oldmailbox, newmailbox)
718
719
720 def search(self, charset, *criteria):
721 """Search mailbox for matching messages.
722
723 (typ, [data]) = <instance>.search(charset, criterion, ...)
724
725 'data' is space separated list of matching message numbers.
726 If UTF8 is enabled, charset MUST be None.
727 """
728 name = 'SEARCH'
729 if charset:
730 if self.utf8_enabled:
731 raise IMAP4.error("Non-None charset not valid in UTF8 mode")
732 typ, dat = self._simple_command(name, 'CHARSET', charset, *criteria)
733 else:
734 typ, dat = self._simple_command(name, *criteria)
735 return self._untagged_response(typ, dat, name)
736
737
738 def select(self, mailbox='INBOX', readonly=False):
739 """Select a mailbox.
740
741 Flush all untagged responses.
742
743 (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=False)
744
745 'data' is count of messages in mailbox ('EXISTS' response).
746
747 Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY'), so
748 other responses should be obtained via <instance>.response('FLAGS') etc.
749 """
750 self.untagged_responses = {} # Flush old responses.
751 self.is_readonly = readonly
752 if readonly:
753 name = 'EXAMINE'
754 else:
755 name = 'SELECT'
756 typ, dat = self._simple_command(name, mailbox)
757 if typ != 'OK':
758 self.state = 'AUTH' # Might have been 'SELECTED'
759 return typ, dat
760 self.state = 'SELECTED'
761 if 'READ-ONLY' in self.untagged_responses \
762 and not readonly:
763 if __debug__:
764 if self.debug >= 1:
765 self._dump_ur(self.untagged_responses)
766 raise self.readonly('%s is not writable' % mailbox)
767 return typ, self.untagged_responses.get('EXISTS', [None])
768
769
770 def setacl(self, mailbox, who, what):
771 """Set a mailbox acl.
772
773 (typ, [data]) = <instance>.setacl(mailbox, who, what)
774 """
775 return self._simple_command('SETACL', mailbox, who, what)
776
777
778 def setannotation(self, *args):
779 """(typ, [data]) = <instance>.setannotation(mailbox[, entry, attribute]+)
780 Set ANNOTATIONs."""
781
782 typ, dat = self._simple_command('SETANNOTATION', *args)
783 return self._untagged_response(typ, dat, 'ANNOTATION')
784
785
786 def setquota(self, root, limits):
787 """Set the quota root's resource limits.
788
789 (typ, [data]) = <instance>.setquota(root, limits)
790 """
791 typ, dat = self._simple_command('SETQUOTA', root, limits)
792 return self._untagged_response(typ, dat, 'QUOTA')
793
794
795 def sort(self, sort_criteria, charset, *search_criteria):
796 """IMAP4rev1 extension SORT command.
797
798 (typ, [data]) = <instance>.sort(sort_criteria, charset, search_criteria, ...)
799 """
800 name = 'SORT'
801 #if not name in self.capabilities: # Let the server decide!
802 # raise self.error('unimplemented extension command: %s' % name)
803 if (sort_criteria[0],sort_criteria[-1]) != ('(',')'):
804 sort_criteria = '(%s)' % sort_criteria
805 typ, dat = self._simple_command(name, sort_criteria, charset, *search_criteria)
806 return self._untagged_response(typ, dat, name)
807
808
809 def starttls(self, ssl_context=None):
810 name = 'STARTTLS'
811 if not HAVE_SSL:
812 raise self.error('SSL support missing')
813 if self._tls_established:
814 raise self.abort('TLS session already established')
815 if name not in self.capabilities:
816 raise self.abort('TLS not supported by server')
817 # Generate a default SSL context if none was passed.
818 if ssl_context is None:
819 ssl_context = ssl._create_stdlib_context()
820 typ, dat = self._simple_command(name)
821 if typ == 'OK':
822 self.sock = ssl_context.wrap_socket(self.sock,
823 server_hostname=self.host)
824 self.file = self.sock.makefile('rb')
825 self._tls_established = True
826 self._get_capabilities()
827 else:
828 raise self.error("Couldn't establish TLS session")
829 return self._untagged_response(typ, dat, name)
830
831
832 def status(self, mailbox, names):
833 """Request named status conditions for mailbox.
834
835 (typ, [data]) = <instance>.status(mailbox, names)
836 """
837 name = 'STATUS'
838 #if self.PROTOCOL_VERSION == 'IMAP4': # Let the server decide!
839 # raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name)
840 typ, dat = self._simple_command(name, mailbox, names)
841 return self._untagged_response(typ, dat, name)
842
843
844 def store(self, message_set, command, flags):
845 """Alters flag dispositions for messages in mailbox.
846
847 (typ, [data]) = <instance>.store(message_set, command, flags)
848 """
849 if (flags[0],flags[-1]) != ('(',')'):
850 flags = '(%s)' % flags # Avoid quoting the flags
851 typ, dat = self._simple_command('STORE', message_set, command, flags)
852 return self._untagged_response(typ, dat, 'FETCH')
853
854
855 def subscribe(self, mailbox):
856 """Subscribe to new mailbox.
857
858 (typ, [data]) = <instance>.subscribe(mailbox)
859 """
860 return self._simple_command('SUBSCRIBE', mailbox)
861
862
863 def thread(self, threading_algorithm, charset, *search_criteria):
864 """IMAPrev1 extension THREAD command.
865
866 (type, [data]) = <instance>.thread(threading_algorithm, charset, search_criteria, ...)
867 """
868 name = 'THREAD'
869 typ, dat = self._simple_command(name, threading_algorithm, charset, *search_criteria)
870 return self._untagged_response(typ, dat, name)
871
872
873 def uid(self, command, *args):
874 """Execute "command arg ..." with messages identified by UID,
875 rather than message number.
876
877 (typ, [data]) = <instance>.uid(command, arg1, arg2, ...)
878
879 Returns response appropriate to 'command'.
880 """
881 command = command.upper()
882 if not command in Commands:
883 raise self.error("Unknown IMAP4 UID command: %s" % command)
884 if self.state not in Commands[command]:
885 raise self.error("command %s illegal in state %s, "
886 "only allowed in states %s" %
887 (command, self.state,
888 ', '.join(Commands[command])))
889 name = 'UID'
890 typ, dat = self._simple_command(name, command, *args)
891 if command in ('SEARCH', 'SORT', 'THREAD'):
892 name = command
893 else:
894 name = 'FETCH'
895 return self._untagged_response(typ, dat, name)
896
897
898 def unsubscribe(self, mailbox):
899 """Unsubscribe from old mailbox.
900
901 (typ, [data]) = <instance>.unsubscribe(mailbox)
902 """
903 return self._simple_command('UNSUBSCRIBE', mailbox)
904
905
906 def unselect(self):
907 """Free server's resources associated with the selected mailbox
908 and returns the server to the authenticated state.
909 This command performs the same actions as CLOSE, except
910 that no messages are permanently removed from the currently
911 selected mailbox.
912
913 (typ, [data]) = <instance>.unselect()
914 """
915 try:
916 typ, data = self._simple_command('UNSELECT')
917 finally:
918 self.state = 'AUTH'
919 return typ, data
920
921
922 def xatom(self, name, *args):
923 """Allow simple extension commands
924 notified by server in CAPABILITY response.
925
926 Assumes command is legal in current state.
927
928 (typ, [data]) = <instance>.xatom(name, arg, ...)
929
930 Returns response appropriate to extension command `name'.
931 """
932 name = name.upper()
933 #if not name in self.capabilities: # Let the server decide!
934 # raise self.error('unknown extension command: %s' % name)
935 if not name in Commands:
936 Commands[name] = (self.state,)
937 return self._simple_command(name, *args)
938
939
940
941 # Private methods
942
943
944 def _append_untagged(self, typ, dat):
945 if dat is None:
946 dat = b''
947 ur = self.untagged_responses
948 if __debug__:
949 if self.debug >= 5:
950 self._mesg('untagged_responses[%s] %s += ["%r"]' %
951 (typ, len(ur.get(typ,'')), dat))
952 if typ in ur:
953 ur[typ].append(dat)
954 else:
955 ur[typ] = [dat]
956
957
958 def _check_bye(self):
959 bye = self.untagged_responses.get('BYE')
960 if bye:
961 raise self.abort(bye[-1].decode(self._encoding, 'replace'))
962
963
964 def _command(self, name, *args):
965
966 if self.state not in Commands[name]:
967 self.literal = None
968 raise self.error("command %s illegal in state %s, "
969 "only allowed in states %s" %
970 (name, self.state,
971 ', '.join(Commands[name])))
972
973 for typ in ('OK', 'NO', 'BAD'):
974 if typ in self.untagged_responses:
975 del self.untagged_responses[typ]
976
977 if 'READ-ONLY' in self.untagged_responses \
978 and not self.is_readonly:
979 raise self.readonly('mailbox status changed to READ-ONLY')
980
981 tag = self._new_tag()
982 name = bytes(name, self._encoding)
983 data = tag + b' ' + name
984 for arg in args:
985 if arg is None: continue
986 if isinstance(arg, str):
987 arg = bytes(arg, self._encoding)
988 data = data + b' ' + arg
989
990 literal = self.literal
991 if literal is not None:
992 self.literal = None
993 if type(literal) is type(self._command):
994 literator = literal
995 else:
996 literator = None
997 data = data + bytes(' {%s}' % len(literal), self._encoding)
998
999 if __debug__:
1000 if self.debug >= 4:
1001 self._mesg('> %r' % data)
1002 else:
1003 self._log('> %r' % data)
1004
1005 try:
1006 self.send(data + CRLF)
1007 except OSError as val:
1008 raise self.abort('socket error: %s' % val)
1009
1010 if literal is None:
1011 return tag
1012
1013 while 1:
1014 # Wait for continuation response
1015
1016 while self._get_response():
1017 if self.tagged_commands[tag]: # BAD/NO?
1018 return tag
1019
1020 # Send literal
1021
1022 if literator:
1023 literal = literator(self.continuation_response)
1024
1025 if __debug__:
1026 if self.debug >= 4:
1027 self._mesg('write literal size %s' % len(literal))
1028
1029 try:
1030 self.send(literal)
1031 self.send(CRLF)
1032 except OSError as val:
1033 raise self.abort('socket error: %s' % val)
1034
1035 if not literator:
1036 break
1037
1038 return tag
1039
1040
1041 def _command_complete(self, name, tag):
1042 logout = (name == 'LOGOUT')
1043 # BYE is expected after LOGOUT
1044 if not logout:
1045 self._check_bye()
1046 try:
1047 typ, data = self._get_tagged_response(tag, expect_bye=logout)
1048 except self.abort as val:
1049 raise self.abort('command: %s => %s' % (name, val))
1050 except self.error as val:
1051 raise self.error('command: %s => %s' % (name, val))
1052 if not logout:
1053 self._check_bye()
1054 if typ == 'BAD':
1055 raise self.error('%s command error: %s %s' % (name, typ, data))
1056 return typ, data
1057
1058
1059 def _get_capabilities(self):
1060 typ, dat = self.capability()
1061 if dat == [None]:
1062 raise self.error('no CAPABILITY response from server')
1063 dat = str(dat[-1], self._encoding)
1064 dat = dat.upper()
1065 self.capabilities = tuple(dat.split())
1066
1067
1068 def _get_response(self):
1069
1070 # Read response and store.
1071 #
1072 # Returns None for continuation responses,
1073 # otherwise first response line received.
1074
1075 resp = self._get_line()
1076
1077 # Command completion response?
1078
1079 if self._match(self.tagre, resp):
1080 tag = self.mo.group('tag')
1081 if not tag in self.tagged_commands:
1082 raise self.abort('unexpected tagged response: %r' % resp)
1083
1084 typ = self.mo.group('type')
1085 typ = str(typ, self._encoding)
1086 dat = self.mo.group('data')
1087 self.tagged_commands[tag] = (typ, [dat])
1088 else:
1089 dat2 = None
1090
1091 # '*' (untagged) responses?
1092
1093 if not self._match(Untagged_response, resp):
1094 if self._match(self.Untagged_status, resp):
1095 dat2 = self.mo.group('data2')
1096
1097 if self.mo is None:
1098 # Only other possibility is '+' (continuation) response...
1099
1100 if self._match(Continuation, resp):
1101 self.continuation_response = self.mo.group('data')
1102 return None # NB: indicates continuation
1103
1104 raise self.abort("unexpected response: %r" % resp)
1105
1106 typ = self.mo.group('type')
1107 typ = str(typ, self._encoding)
1108 dat = self.mo.group('data')
1109 if dat is None: dat = b'' # Null untagged response
1110 if dat2: dat = dat + b' ' + dat2
1111
1112 # Is there a literal to come?
1113
1114 while self._match(self.Literal, dat):
1115
1116 # Read literal direct from connection.
1117
1118 size = int(self.mo.group('size'))
1119 if __debug__:
1120 if self.debug >= 4:
1121 self._mesg('read literal size %s' % size)
1122 data = self.read(size)
1123
1124 # Store response with literal as tuple
1125
1126 self._append_untagged(typ, (dat, data))
1127
1128 # Read trailer - possibly containing another literal
1129
1130 dat = self._get_line()
1131
1132 self._append_untagged(typ, dat)
1133
1134 # Bracketed response information?
1135
1136 if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat):
1137 typ = self.mo.group('type')
1138 typ = str(typ, self._encoding)
1139 self._append_untagged(typ, self.mo.group('data'))
1140
1141 if __debug__:
1142 if self.debug >= 1 and typ in ('NO', 'BAD', 'BYE'):
1143 self._mesg('%s response: %r' % (typ, dat))
1144
1145 return resp
1146
1147
1148 def _get_tagged_response(self, tag, expect_bye=False):
1149
1150 while 1:
1151 result = self.tagged_commands[tag]
1152 if result is not None:
1153 del self.tagged_commands[tag]
1154 return result
1155
1156 if expect_bye:
1157 typ = 'BYE'
1158 bye = self.untagged_responses.pop(typ, None)
1159 if bye is not None:
1160 # Server replies to the "LOGOUT" command with "BYE"
1161 return (typ, bye)
1162
1163 # If we've seen a BYE at this point, the socket will be
1164 # closed, so report the BYE now.
1165 self._check_bye()
1166
1167 # Some have reported "unexpected response" exceptions.
1168 # Note that ignoring them here causes loops.
1169 # Instead, send me details of the unexpected response and
1170 # I'll update the code in `_get_response()'.
1171
1172 try:
1173 self._get_response()
1174 except self.abort as val:
1175 if __debug__:
1176 if self.debug >= 1:
1177 self.print_log()
1178 raise
1179
1180
1181 def _get_line(self):
1182
1183 line = self.readline()
1184 if not line:
1185 raise self.abort('socket error: EOF')
1186
1187 # Protocol mandates all lines terminated by CRLF
1188 if not line.endswith(b'\r\n'):
1189 raise self.abort('socket error: unterminated line: %r' % line)
1190
1191 line = line[:-2]
1192 if __debug__:
1193 if self.debug >= 4:
1194 self._mesg('< %r' % line)
1195 else:
1196 self._log('< %r' % line)
1197 return line
1198
1199
1200 def _match(self, cre, s):
1201
1202 # Run compiled regular expression match method on 's'.
1203 # Save result, return success.
1204
1205 self.mo = cre.match(s)
1206 if __debug__:
1207 if self.mo is not None and self.debug >= 5:
1208 self._mesg("\tmatched %r => %r" % (cre.pattern, self.mo.groups()))
1209 return self.mo is not None
1210
1211
1212 def _new_tag(self):
1213
1214 tag = self.tagpre + bytes(str(self.tagnum), self._encoding)
1215 self.tagnum = self.tagnum + 1
1216 self.tagged_commands[tag] = None
1217 return tag
1218
1219
1220 def _quote(self, arg):
1221
1222 arg = arg.replace('\\', '\\\\')
1223 arg = arg.replace('"', '\\"')
1224
1225 return '"' + arg + '"'
1226
1227
1228 def _simple_command(self, name, *args):
1229
1230 return self._command_complete(name, self._command(name, *args))
1231
1232
1233 def _untagged_response(self, typ, dat, name):
1234 if typ == 'NO':
1235 return typ, dat
1236 if not name in self.untagged_responses:
1237 return typ, [None]
1238 data = self.untagged_responses.pop(name)
1239 if __debug__:
1240 if self.debug >= 5:
1241 self._mesg('untagged_responses[%s] => %s' % (name, data))
1242 return typ, data
1243
1244
1245 if __debug__:
1246
1247 def _mesg(self, s, secs=None):
1248 if secs is None:
1249 secs = time.time()
1250 tm = time.strftime('%M:%S', time.localtime(secs))
1251 sys.stderr.write(' %s.%02d %s\n' % (tm, (secs*100)%100, s))
1252 sys.stderr.flush()
1253
1254 def _dump_ur(self, untagged_resp_dict):
1255 if not untagged_resp_dict:
1256 return
1257 items = (f'{key}: {value!r}'
1258 for key, value in untagged_resp_dict.items())
1259 self._mesg('untagged responses dump:' + '\n\t\t'.join(items))
1260
1261 def _log(self, line):
1262 # Keep log of last `_cmd_log_len' interactions for debugging.
1263 self._cmd_log[self._cmd_log_idx] = (line, time.time())
1264 self._cmd_log_idx += 1
1265 if self._cmd_log_idx >= self._cmd_log_len:
1266 self._cmd_log_idx = 0
1267
1268 def print_log(self):
1269 self._mesg('last %d IMAP4 interactions:' % len(self._cmd_log))
1270 i, n = self._cmd_log_idx, self._cmd_log_len
1271 while n:
1272 try:
1273 self._mesg(*self._cmd_log[i])
1274 except:
1275 pass
1276 i += 1
1277 if i >= self._cmd_log_len:
1278 i = 0
1279 n -= 1
1280
1281
1282 if HAVE_SSL:
1283
1284 class ESC[4;38;5;81mIMAP4_SSL(ESC[4;38;5;149mIMAP4):
1285
1286 """IMAP4 client class over SSL connection
1287
1288 Instantiate with: IMAP4_SSL([host[, port[, ssl_context[, timeout=None]]]])
1289
1290 host - host's name (default: localhost);
1291 port - port number (default: standard IMAP4 SSL port);
1292 ssl_context - a SSLContext object that contains your certificate chain
1293 and private key (default: None)
1294 timeout - socket timeout (default: None) If timeout is not given or is None,
1295 the global default socket timeout is used
1296
1297 for more documentation see the docstring of the parent class IMAP4.
1298 """
1299
1300
1301 def __init__(self, host='', port=IMAP4_SSL_PORT,
1302 *, ssl_context=None, timeout=None):
1303 if ssl_context is None:
1304 ssl_context = ssl._create_stdlib_context()
1305 self.ssl_context = ssl_context
1306 IMAP4.__init__(self, host, port, timeout)
1307
1308 def _create_socket(self, timeout):
1309 sock = IMAP4._create_socket(self, timeout)
1310 return self.ssl_context.wrap_socket(sock,
1311 server_hostname=self.host)
1312
1313 def open(self, host='', port=IMAP4_SSL_PORT, timeout=None):
1314 """Setup connection to remote server on "host:port".
1315 (default: localhost:standard IMAP4 SSL port).
1316 This connection will be used by the routines:
1317 read, readline, send, shutdown.
1318 """
1319 IMAP4.open(self, host, port, timeout)
1320
1321 __all__.append("IMAP4_SSL")
1322
1323
1324 class ESC[4;38;5;81mIMAP4_stream(ESC[4;38;5;149mIMAP4):
1325
1326 """IMAP4 client class over a stream
1327
1328 Instantiate with: IMAP4_stream(command)
1329
1330 "command" - a string that can be passed to subprocess.Popen()
1331
1332 for more documentation see the docstring of the parent class IMAP4.
1333 """
1334
1335
1336 def __init__(self, command):
1337 self.command = command
1338 IMAP4.__init__(self)
1339
1340
1341 def open(self, host=None, port=None, timeout=None):
1342 """Setup a stream connection.
1343 This connection will be used by the routines:
1344 read, readline, send, shutdown.
1345 """
1346 self.host = None # For compatibility with parent class
1347 self.port = None
1348 self.sock = None
1349 self.file = None
1350 self.process = subprocess.Popen(self.command,
1351 bufsize=DEFAULT_BUFFER_SIZE,
1352 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1353 shell=True, close_fds=True)
1354 self.writefile = self.process.stdin
1355 self.readfile = self.process.stdout
1356
1357 def read(self, size):
1358 """Read 'size' bytes from remote."""
1359 return self.readfile.read(size)
1360
1361
1362 def readline(self):
1363 """Read line from remote."""
1364 return self.readfile.readline()
1365
1366
1367 def send(self, data):
1368 """Send data to remote."""
1369 self.writefile.write(data)
1370 self.writefile.flush()
1371
1372
1373 def shutdown(self):
1374 """Close I/O established in "open"."""
1375 self.readfile.close()
1376 self.writefile.close()
1377 self.process.wait()
1378
1379
1380
1381 class ESC[4;38;5;81m_Authenticator:
1382
1383 """Private class to provide en/decoding
1384 for base64-based authentication conversation.
1385 """
1386
1387 def __init__(self, mechinst):
1388 self.mech = mechinst # Callable object to provide/process data
1389
1390 def process(self, data):
1391 ret = self.mech(self.decode(data))
1392 if ret is None:
1393 return b'*' # Abort conversation
1394 return self.encode(ret)
1395
1396 def encode(self, inp):
1397 #
1398 # Invoke binascii.b2a_base64 iteratively with
1399 # short even length buffers, strip the trailing
1400 # line feed from the result and append. "Even"
1401 # means a number that factors to both 6 and 8,
1402 # so when it gets to the end of the 8-bit input
1403 # there's no partial 6-bit output.
1404 #
1405 oup = b''
1406 if isinstance(inp, str):
1407 inp = inp.encode('utf-8')
1408 while inp:
1409 if len(inp) > 48:
1410 t = inp[:48]
1411 inp = inp[48:]
1412 else:
1413 t = inp
1414 inp = b''
1415 e = binascii.b2a_base64(t)
1416 if e:
1417 oup = oup + e[:-1]
1418 return oup
1419
1420 def decode(self, inp):
1421 if not inp:
1422 return b''
1423 return binascii.a2b_base64(inp)
1424
1425 Months = ' Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ')
1426 Mon2num = {s.encode():n+1 for n, s in enumerate(Months[1:])}
1427
1428 def Internaldate2tuple(resp):
1429 """Parse an IMAP4 INTERNALDATE string.
1430
1431 Return corresponding local time. The return value is a
1432 time.struct_time tuple or None if the string has wrong format.
1433 """
1434
1435 mo = InternalDate.match(resp)
1436 if not mo:
1437 return None
1438
1439 mon = Mon2num[mo.group('mon')]
1440 zonen = mo.group('zonen')
1441
1442 day = int(mo.group('day'))
1443 year = int(mo.group('year'))
1444 hour = int(mo.group('hour'))
1445 min = int(mo.group('min'))
1446 sec = int(mo.group('sec'))
1447 zoneh = int(mo.group('zoneh'))
1448 zonem = int(mo.group('zonem'))
1449
1450 # INTERNALDATE timezone must be subtracted to get UT
1451
1452 zone = (zoneh*60 + zonem)*60
1453 if zonen == b'-':
1454 zone = -zone
1455
1456 tt = (year, mon, day, hour, min, sec, -1, -1, -1)
1457 utc = calendar.timegm(tt) - zone
1458
1459 return time.localtime(utc)
1460
1461
1462
1463 def Int2AP(num):
1464
1465 """Convert integer to A-P string representation."""
1466
1467 val = b''; AP = b'ABCDEFGHIJKLMNOP'
1468 num = int(abs(num))
1469 while num:
1470 num, mod = divmod(num, 16)
1471 val = AP[mod:mod+1] + val
1472 return val
1473
1474
1475
1476 def ParseFlags(resp):
1477
1478 """Convert IMAP4 flags response to python tuple."""
1479
1480 mo = Flags.match(resp)
1481 if not mo:
1482 return ()
1483
1484 return tuple(mo.group('flags').split())
1485
1486
1487 def Time2Internaldate(date_time):
1488
1489 """Convert date_time to IMAP4 INTERNALDATE representation.
1490
1491 Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'. The
1492 date_time argument can be a number (int or float) representing
1493 seconds since epoch (as returned by time.time()), a 9-tuple
1494 representing local time, an instance of time.struct_time (as
1495 returned by time.localtime()), an aware datetime instance or a
1496 double-quoted string. In the last case, it is assumed to already
1497 be in the correct format.
1498 """
1499 if isinstance(date_time, (int, float)):
1500 dt = datetime.fromtimestamp(date_time,
1501 timezone.utc).astimezone()
1502 elif isinstance(date_time, tuple):
1503 try:
1504 gmtoff = date_time.tm_gmtoff
1505 except AttributeError:
1506 if time.daylight:
1507 dst = date_time[8]
1508 if dst == -1:
1509 dst = time.localtime(time.mktime(date_time))[8]
1510 gmtoff = -(time.timezone, time.altzone)[dst]
1511 else:
1512 gmtoff = -time.timezone
1513 delta = timedelta(seconds=gmtoff)
1514 dt = datetime(*date_time[:6], tzinfo=timezone(delta))
1515 elif isinstance(date_time, datetime):
1516 if date_time.tzinfo is None:
1517 raise ValueError("date_time must be aware")
1518 dt = date_time
1519 elif isinstance(date_time, str) and (date_time[0],date_time[-1]) == ('"','"'):
1520 return date_time # Assume in correct format
1521 else:
1522 raise ValueError("date_time not of a known type")
1523 fmt = '"%d-{}-%Y %H:%M:%S %z"'.format(Months[dt.month])
1524 return dt.strftime(fmt)
1525
1526
1527
1528 if __name__ == '__main__':
1529
1530 # To test: invoke either as 'python imaplib.py [IMAP4_server_hostname]'
1531 # or 'python imaplib.py -s "rsh IMAP4_server_hostname exec /etc/rimapd"'
1532 # to test the IMAP4_stream class
1533
1534 import getopt, getpass
1535
1536 try:
1537 optlist, args = getopt.getopt(sys.argv[1:], 'd:s:')
1538 except getopt.error as val:
1539 optlist, args = (), ()
1540
1541 stream_command = None
1542 for opt,val in optlist:
1543 if opt == '-d':
1544 Debug = int(val)
1545 elif opt == '-s':
1546 stream_command = val
1547 if not args: args = (stream_command,)
1548
1549 if not args: args = ('',)
1550
1551 host = args[0]
1552
1553 USER = getpass.getuser()
1554 PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost"))
1555
1556 test_mesg = 'From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)sdata...%(lf)s' % {'user':USER, 'lf':'\n'}
1557 test_seq1 = (
1558 ('login', (USER, PASSWD)),
1559 ('create', ('/tmp/xxx 1',)),
1560 ('rename', ('/tmp/xxx 1', '/tmp/yyy')),
1561 ('CREATE', ('/tmp/yyz 2',)),
1562 ('append', ('/tmp/yyz 2', None, None, test_mesg)),
1563 ('list', ('/tmp', 'yy*')),
1564 ('select', ('/tmp/yyz 2',)),
1565 ('search', (None, 'SUBJECT', 'test')),
1566 ('fetch', ('1', '(FLAGS INTERNALDATE RFC822)')),
1567 ('store', ('1', 'FLAGS', r'(\Deleted)')),
1568 ('namespace', ()),
1569 ('expunge', ()),
1570 ('recent', ()),
1571 ('close', ()),
1572 )
1573
1574 test_seq2 = (
1575 ('select', ()),
1576 ('response',('UIDVALIDITY',)),
1577 ('uid', ('SEARCH', 'ALL')),
1578 ('response', ('EXISTS',)),
1579 ('append', (None, None, None, test_mesg)),
1580 ('recent', ()),
1581 ('logout', ()),
1582 )
1583
1584 def run(cmd, args):
1585 M._mesg('%s %s' % (cmd, args))
1586 typ, dat = getattr(M, cmd)(*args)
1587 M._mesg('%s => %s %s' % (cmd, typ, dat))
1588 if typ == 'NO': raise dat[0]
1589 return dat
1590
1591 try:
1592 if stream_command:
1593 M = IMAP4_stream(stream_command)
1594 else:
1595 M = IMAP4(host)
1596 if M.state == 'AUTH':
1597 test_seq1 = test_seq1[1:] # Login not needed
1598 M._mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION)
1599 M._mesg('CAPABILITIES = %r' % (M.capabilities,))
1600
1601 for cmd,args in test_seq1:
1602 run(cmd, args)
1603
1604 for ml in run('list', ('/tmp/', 'yy%')):
1605 mo = re.match(r'.*"([^"]+)"$', ml)
1606 if mo: path = mo.group(1)
1607 else: path = ml.split()[-1]
1608 run('delete', (path,))
1609
1610 for cmd,args in test_seq2:
1611 dat = run(cmd, args)
1612
1613 if (cmd,args) != ('uid', ('SEARCH', 'ALL')):
1614 continue
1615
1616 uid = dat[-1].split()
1617 if not uid: continue
1618 run('uid', ('FETCH', '%s' % uid[-1],
1619 '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)'))
1620
1621 print('\nAll tests OK.')
1622
1623 except:
1624 print('\nTests failed.')
1625
1626 if not Debug:
1627 print('''
1628 If you would like to see debugging output,
1629 try: %s -d5
1630 ''' % sys.argv[0])
1631
1632 raise