python (3.11.7)
1 # Copyright (C) 2002-2007 Python Software Foundation
2 # Author: Ben Gertzfield, Barry Warsaw
3 # Contact: email-sig@python.org
4
5 """Header encoding and decoding functionality."""
6
7 __all__ = [
8 'Header',
9 'decode_header',
10 'make_header',
11 ]
12
13 import re
14 import binascii
15
16 import email.quoprimime
17 import email.base64mime
18
19 from email.errors import HeaderParseError
20 from email import charset as _charset
# Convenience alias so the rest of the module can refer to Charset directly.
Charset = _charset.Charset

# Small string/bytes constants shared by the functions and classes below.
NL = '\n'
SPACE = ' '
BSPACE = b' '
SPACE8 = ' ' * 8
EMPTYSTRING = ''
# Default maximum line length used by Header when maxlinelen is not given.
MAXLINELEN = 78
# Characters treated as folding white space when splitting ASCII header text.
FWS = ' \t'

# Pre-built Charset instances: the default charset for Header, and the
# fallback used when text cannot be encoded as us-ascii.
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')

# Match encoded-word strings in the form =?charset?q?Hello_World?=
# The three named groups (charset, encoding, encoded) are what
# decode_header() gets back, in that order, from ecre.split().
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qQbB])  # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  ''', re.VERBOSE | re.MULTILINE)

# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark (octal 041)
# to tilde (octal 176).  For use with .match()
fcre = re.compile(r'[\041-\176]+:$')

# Find a header embedded in a putative header value.  Used to check for
# header injection attack: a newline followed by non-whitespace text and a
# colon looks like the start of a new header field.
_embedded_header = re.compile(r'\n[^ \t]+:')


# Helpers
_max_append = email.quoprimime._max_append
57
58
def decode_header(header):
    """Decode a message header value without converting charset.

    Returns a list of (string, charset) pairs containing each of the decoded
    parts of the header.  Charset is None for non-encoded parts of the header,
    otherwise a lower-case string containing the name of the character set
    specified in the encoded string.

    header may be a string that may or may not contain RFC2047 encoded words,
    or it may be a Header object.

    When header is a string containing no encoded words at all, it is
    returned unchanged as the single pair (header, None).  When at least one
    encoded word is present, every returned part is a bytes object.

    An email.errors.HeaderParseError is raised when a base64 encoded word
    cannot be decoded; the underlying binascii.Error is attached to it as
    __cause__ so the precise reason is not lost.
    """
    # If it is a Header object, we can just return the encoded chunks.
    if hasattr(header, '_chunks'):
        return [(_charset._encode(string, str(charset)), str(charset))
                for string, charset in header._chunks]
    # If no encoding, just return the header with no charset.
    if not ecre.search(header):
        return [(header, None)]
    # First step is to parse all the encoded parts into triplets of the form
    # (encoded_string, encoding, charset).  For unencoded strings, the last
    # two parts will be None.
    words = []
    for line in header.splitlines():
        # ecre has three groups, so split() yields
        # [text, charset, encoding, encoded, text, charset, ...].
        parts = ecre.split(line)
        first = True
        while parts:
            unencoded = parts.pop(0)
            if first:
                # Leading whitespace on a (continuation) line is folding
                # white space, not part of the value.
                unencoded = unencoded.lstrip()
                first = False
            if unencoded:
                words.append((unencoded, None, None))
            if parts:
                charset = parts.pop(0).lower()
                encoding = parts.pop(0).lower()
                encoded = parts.pop(0)
                words.append((encoded, encoding, charset))
    # Now loop over words and remove words that consist of whitespace
    # between two encoded strings.
    droplist = []
    for n, w in enumerate(words):
        if n > 1 and w[1] and words[n-2][1] and words[n-1][0].isspace():
            droplist.append(n-1)
    # Delete from the end so earlier indexes stay valid.
    for d in reversed(droplist):
        del words[d]

    # The next step is to decode each encoded word by applying the reverse
    # base64 or quopri transformation.  decoded_words is now a list of the
    # form (decoded_word, charset).
    decoded_words = []
    for encoded_string, encoding, charset in words:
        if encoding is None:
            # This is an unencoded word.
            decoded_words.append((encoded_string, charset))
        elif encoding == 'q':
            word = email.quoprimime.header_decode(encoded_string)
            decoded_words.append((word, charset))
        elif encoding == 'b':
            paderr = len(encoded_string) % 4   # Postel's law: add missing padding
            if paderr:
                encoded_string += '==='[:4 - paderr]
            try:
                word = email.base64mime.decode(encoded_string)
            except binascii.Error as exc:
                # Chain explicitly so callers can inspect the real cause.
                raise HeaderParseError('Base64 decoding error') from exc
            else:
                decoded_words.append((word, charset))
        else:
            # ecre only matches q/Q/b/B, so this should be unreachable.
            raise AssertionError('Unexpected encoding: ' + encoding)
    # Now convert all words to bytes and collapse consecutive runs of
    # similarly encoded words.
    collapsed = []
    last_word = last_charset = None
    for word, charset in decoded_words:
        if isinstance(word, str):
            word = bytes(word, 'raw-unicode-escape')
        if last_word is None:
            last_word = word
            last_charset = charset
        elif charset != last_charset:
            collapsed.append((last_word, last_charset))
            last_word = word
            last_charset = charset
        elif last_charset is None:
            # Adjacent unencoded words (from separate lines) are rejoined
            # with a single space.
            last_word += BSPACE + word
        else:
            last_word += word
    collapsed.append((last_word, last_charset))
    return collapsed
151
152
def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Build a Header from (decoded_string, charset) pairs.

    decoded_seq is a sequence of pairs in the format produced by
    decode_header(): each pair holds a decoded string and either a charset
    name, a Charset instance, or None.

    maxlinelen, header_name and continuation_ws are handed to the Header
    constructor unchanged.  Returns the populated Header instance.
    """
    header = Header(maxlinelen=maxlinelen, header_name=header_name,
                    continuation_ws=continuation_ws)
    for text, cs in decoded_seq:
        # None is passed straight through; Header.append() substitutes the
        # header's default charset for it.  Anything else that is not yet a
        # Charset gets wrapped in one.
        needs_wrapping = not (cs is None or isinstance(cs, Charset))
        header.append(text, Charset(cs) if needs_wrapping else cs)
    return header
173
174
class Header:
    """A header value assembled from (string, Charset) chunks."""

    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Create a MIME-compliant header that can contain many character sets.

        s, when given, is the initial header value; it is added through
        .append(), so it may be bytes or str with the semantics documented
        there.  When s is None the header starts out empty.

        charset is used for s and also becomes the default for later
        .append() calls that omit a charset.  It may be a Charset instance
        or a charset name; when omitted, us-ascii is used.

        maxlinelen is the maximum line length used when encoding; it
        defaults to MAXLINELEN (78).  header_name, when given, is the field
        name (e.g. 'Subject'); its length plus two (for the colon and the
        space) is reserved on the first line.

        continuation_ws is the whitespace prepended to continuation lines.

        errors is passed through to the .append() call for s.
        """
        if charset is None:
            charset = USASCII
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        self._maxlinelen = MAXLINELEN if maxlinelen is None else maxlinelen
        # Take the separating colon and space into account.
        self._headerlen = 0 if header_name is None else len(header_name) + 2

    def __str__(self):
        """Return the string value of the header."""
        self._normalize()
        ascii_like = (None, 'us-ascii')
        pieces = []
        prev_charset = None
        prev_ends_spaced = None
        for text, cs in self._chunks:
            # A space has to be kept at every boundary between an encoded
            # chunk and a None/us-ascii chunk (in either direction), but only
            # from the second chunk on, and only when the us-ascii side does
            # not already provide one at the boundary.
            current = cs
            if current == _charset.UNKNOWN8BIT:
                # Undo the surrogateescape decoding and show undecodable
                # bytes as replacement characters instead.
                raw = text.encode('ascii', 'surrogateescape')
                text = raw.decode('ascii', 'replace')
            if pieces:
                starts_spaced = text and self._nonctext(text[0])
                if prev_charset not in ascii_like:
                    if current in ascii_like and not starts_spaced:
                        pieces.append(SPACE)
                        current = None
                elif current not in ascii_like and not prev_ends_spaced:
                    pieces.append(SPACE)
            prev_ends_spaced = text and self._nonctext(text[-1])
            prev_charset = current
            pieces.append(text)
        return EMPTYSTRING.join(pieces)

    # Rich comparison operators for equality only.  BAW: does it make sense to
    # have or explicitly disable <, <=, >, >= operators?
    def __eq__(self, other):
        # other may be a Header or a string.  Either works: turn ourselves
        # into the unencoded str value and let other do the comparison with
        # the operands swapped.
        return other == str(self)

    def append(self, s, charset=None, errors='strict'):
        """Append a string to the MIME header.

        charset may be a Charset instance or a charset name (converted to a
        Charset).  None, the default, selects the charset given to the
        constructor.

        s may be bytes or str.  Bytes are decoded with the charset's input
        codec (us-ascii when it has none), passing errors to the decode
        call; a UnicodeError is raised if that fails.  For the unknown-8bit
        charset the bytes are decoded as us-ascii with surrogateescape.

        The resulting text is then trial-encoded with the charset's output
        codec so that problems surface early.  If that fails the
        UnicodeEncodeError propagates, except when the output codec is
        us-ascii: then the chunk is stored with the utf-8 charset instead.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        if not isinstance(s, str):
            source_codec = charset.input_codec or 'us-ascii'
            if source_codec == _charset.UNKNOWN8BIT:
                codec, handler = 'us-ascii', 'surrogateescape'
            else:
                codec, handler = source_codec, errors
            s = s.decode(codec, handler)
        # Make sure what we store can be encoded to the output character
        # set, so that an error shows up now rather than at encode() time.
        target_codec = charset.output_codec or 'us-ascii'
        if target_codec != _charset.UNKNOWN8BIT:
            try:
                s.encode(target_codec, errors)
            except UnicodeEncodeError:
                if target_codec != 'us-ascii':
                    raise
                charset = UTF8
        self._chunks.append((s, charset))

    def _nonctext(self, s):
        """True if string s is not a ctext character of RFC822.
        """
        return s in ('(', ')', '\\') or s.isspace()

    def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'):
        r"""Encode a message header into an RFC-compliant format.

        Each chunk is fed to a _ValueFormatter, which wraps plain ASCII
        chunks at whitespace (preferring split points next to the characters
        in splitchars) and emits RFC 2047 encoded words (Base64 or
        quoted-printable, as chosen by the chunk's charset) for everything
        else.

        maxlinelen is the maximum length of each generated line, not
        counting linesep; it defaults to the value fixed at construction
        time, and 0 means "do not wrap".  Lines may still exceed it when no
        folding point can be found.  The first line is shortened by the
        length of the header name plus ": " if a header name was given to
        the constructor.

        splitchars lists characters that get extra weight as split points,
        most preferred first.  Space and tab may be included to rank one
        above the other.  It has no effect on RFC 2047 encoded lines.

        linesep separates the lines of the result; pass \r\n to get
        RFC-compliant line separators.

        Raises HeaderParseError if the encoded value looks like it contains
        an embedded header (a newline followed by "name:").
        """
        self._normalize()
        if maxlinelen is None:
            maxlinelen = self._maxlinelen
        # A maxlinelen of 0 means don't wrap.  A huge limit achieves that
        # without special cases in _ValueFormatter.
        if maxlinelen == 0:
            maxlinelen = 1000000
        formatter = _ValueFormatter(self._headerlen, maxlinelen,
                                    self._continuation_ws, splitchars)
        ascii_like = (None, 'us-ascii')
        prev_charset = None
        prev_ends_spaced = None
        for position, (text, cs) in enumerate(self._chunks):
            if position:
                # Decide whether a separating space is needed between the
                # previous chunk and this one.
                starts_spaced = text and self._nonctext(text[0])
                if prev_charset not in ascii_like:
                    if not starts_spaced or cs not in ascii_like:
                        formatter.add_transition()
                elif cs not in ascii_like and not prev_ends_spaced:
                    formatter.add_transition()
            prev_ends_spaced = text and self._nonctext(text[-1])
            prev_charset = cs
            lines = text.splitlines()
            if lines:
                head, rest = lines[0], lines[1:]
            else:
                head, rest = '', []
            formatter.feed('', head, cs)
            for line in rest:
                formatter.newline()
                if cs.header_encoding is not None:
                    formatter.feed(self._continuation_ws, ' ' + line.lstrip(),
                                   cs)
                else:
                    # Keep the line's own leading whitespace as its folding
                    # white space.
                    stripped = line.lstrip()
                    indent = line[:len(line) - len(stripped)]
                    formatter.feed(indent, stripped, cs)
            if len(lines) > 1:
                formatter.newline()
        if self._chunks:
            formatter.add_transition()
        value = formatter._str(linesep)
        if _embedded_header.search(value):
            raise HeaderParseError("header value appears to contain "
                "an embedded header: {!r}".format(value))
        return value

    def _normalize(self):
        # Collapse every run of consecutive chunks that share a charset into
        # one chunk whose text is the run's strings joined by single spaces.
        merged = []
        run_charset = None
        run_strings = []
        for text, cs in self._chunks:
            if cs == run_charset:
                run_strings.append(text)
                continue
            if run_charset is not None:
                merged.append((SPACE.join(run_strings), run_charset))
            run_strings = [text]
            run_charset = cs
        if run_strings:
            merged.append((SPACE.join(run_strings), run_charset))
        self._chunks = merged
406
407
class _ValueFormatter:
    """Accumulate header text into lines no longer than a maximum length."""

    def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
        self._maxlen = maxlen
        self._continuation_ws = continuation_ws
        self._continuation_ws_len = len(continuation_ws)
        self._splitchars = splitchars
        # Completed lines, and the line currently being built.  headerlen is
        # counted against the length of the first line only.
        self._lines = []
        self._current_line = _Accumulator(headerlen)

    def _str(self, linesep):
        # Flush whatever is pending, then join the finished lines.
        self.newline()
        return linesep.join(self._lines)

    def __str__(self):
        return self._str(NL)

    def newline(self):
        """Finish the current line and start an empty one."""
        current = self._current_line
        # Drop a trailing transition marker (a lone space); put anything
        # else straight back.
        tail = current.pop()
        if tail != (' ', ''):
            current.push(*tail)
        if len(current) > 0:
            text = str(current)
            if current.is_onlyws() and self._lines:
                # A whitespace-only line is glued onto the previous line
                # rather than emitted on its own.
                self._lines[-1] += text
            else:
                self._lines.append(text)
        current.reset()

    def add_transition(self):
        """Add a single space marking a boundary between chunks."""
        self._current_line.push(' ', '')

    def feed(self, fws, string, charset):
        """Add string (preceded by the whitespace fws) in the given charset."""
        # A charset without a header encoding (i.e. an ASCII encoding) is
        # split at the "highest level syntactic break" possible.  We have no
        # knowledge of field syntax, so that just means the split characters
        # and whitespace.
        if charset.header_encoding is None:
            self._ascii_split(fws, string, self._splitchars)
            return
        # Otherwise the text becomes Base64 or quoted-printable encoded
        # words, and the charset decides how much fits on each line given
        # the room reported by _maxlengths().
        encoded = charset.header_encode_lines(string, self._maxlengths())
        if not encoded:
            # There are no encoded lines, so we're done.
            return
        # The first element extends the current line; None means nothing
        # more fit on the current line.
        head = encoded.pop(0)
        if head is not None:
            self._append_chunk(fws, head)
        if not encoded:
            # There was only one line.
            return
        tail = encoded.pop()
        self.newline()
        self._current_line.push(self._continuation_ws, tail)
        # Everything in between is a full line in itself.
        self._lines.extend(self._continuation_ws + line for line in encoded)

    def _maxlengths(self):
        # Room left on the current line first ...
        yield self._maxlen - len(self._current_line)
        # ... then the room on every continuation line, forever.
        continuation_room = self._maxlen - self._continuation_ws_len
        while True:
            yield continuation_room

    def _ascii_split(self, fws, string, splitchars):
        # RFC 2822 folding may happen wherever folding white space appears,
        # by putting the line separator in front of it.  Not every space or
        # tab is legal FWS and "higher level syntactic breaks" should be
        # preferred, but neither can be honoured without knowing the
        # structure of the header.  So the text is simply cut into
        # (whitespace, word) pairs; _append_chunk() then prefers the
        # requested split characters when a line overflows.
        tokens = re.split("(["+FWS+"]+)", fws+string)
        # Arrange for the list to start with a whitespace token.
        if tokens[0]:
            tokens.insert(0, '')
        else:
            del tokens[0]
        for ws, word in zip(tokens[0::2], tokens[1::2]):
            self._append_chunk(ws, word)

    def _best_split_index(self):
        # Return the index of the part that should start the next line, or
        # None when there is no usable split point.  Split characters are
        # tried in order of preference, each scanning from the end of the
        # line backward; index 0 is never a candidate.
        line = self._current_line
        for ch in self._splitchars:
            for i in range(line.part_count()-1, 0, -1):
                if ch.isspace():
                    ws = line[i][0]
                    if ws and ws[0] == ch:
                        return i
                prevpart = line[i-1][1]
                if prevpart and prevpart[-1] == ch:
                    return i
        return None

    def _append_chunk(self, fws, string):
        line = self._current_line
        line.push(fws, string)
        if len(line) <= self._maxlen:
            return
        # Too long: find the best split point.  There might be none, on a
        # long first line.
        split_at = self._best_split_index()
        if split_at is None:
            fws, part = line.pop()
            if line._initial_size > 0:
                # There will be a header, so leave it on a line by itself.
                self.newline()
                if not fws:
                    # We don't use continuation_ws here because the
                    # whitespace after a header should always be a space.
                    fws = ' '
            line.push(fws, part)
            return
        remainder = line.pop_from(split_at)
        self._lines.append(str(line))
        line.reset(remainder)
534
535
536 class ESC[4;38;5;81m_Accumulator(ESC[4;38;5;149mlist):
537
538 def __init__(self, initial_size=0):
539 self._initial_size = initial_size
540 super().__init__()
541
542 def push(self, fws, string):
543 self.append((fws, string))
544
545 def pop_from(self, i=0):
546 popped = self[i:]
547 self[i:] = []
548 return popped
549
550 def pop(self):
551 if self.part_count()==0:
552 return ('', '')
553 return super().pop()
554
555 def __len__(self):
556 return sum((len(fws)+len(part) for fws, part in self),
557 self._initial_size)
558
559 def __str__(self):
560 return EMPTYSTRING.join((EMPTYSTRING.join((fws, part))
561 for fws, part in self))
562
563 def reset(self, startval=None):
564 if startval is None:
565 startval = []
566 self[:] = startval
567 self._initial_size = 0
568
569 def is_onlyws(self):
570 return self._initial_size==0 and (not self or str(self).isspace())
571
572 def part_count(self):
573 return super().__len__()