python (3.11.7)

(root)/
lib/
python3.11/
email/
header.py
       1  # Copyright (C) 2002-2007 Python Software Foundation
       2  # Author: Ben Gertzfield, Barry Warsaw
       3  # Contact: email-sig@python.org
       4  
       5  """Header encoding and decoding functionality."""
       6  
# Public names exported by ``from email.header import *``.
__all__ = [
    'Header',
    'decode_header',
    'make_header',
    ]
      12  
      13  import re
      14  import binascii
      15  
      16  import email.quoprimime
      17  import email.base64mime
      18  
      19  from email.errors import HeaderParseError
      20  from email import charset as _charset
# Module-level alias for the Charset class from email.charset.
Charset = _charset.Charset

# Small string/bytes constants shared by the code below.
NL = '\n'
SPACE = ' '
BSPACE = b' '
SPACE8 = ' ' * 8
EMPTYSTRING = ''
# Default maximum line length used by Header when no maxlinelen is given.
MAXLINELEN = 78
# Characters treated as folding whitespace when splitting ASCII header text.
FWS = ' \t'

# Shared Charset instances: USASCII is Header's default charset and UTF8 is
# the fallback Header.append() switches to when text is not ASCII-encodable.
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')

# Match encoded-word strings in the form =?charset?q?Hello_World?=
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qQbB])  # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  ''', re.VERBOSE | re.MULTILINE)

# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark (octal
# 041) to tilde (octal 176).
# For use with .match()
fcre = re.compile(r'[\041-\176]+:$')

# Find a header embedded in a putative header value.  Used to check for
# header injection attack: a newline followed by non-whitespace characters
# and a colon looks like the start of a new header field.
_embedded_header = re.compile(r'\n[^ \t]+:')


# Helpers
# Alias for the private email.quoprimime._max_append helper.
_max_append = email.quoprimime._max_append
      57  
      58  
      59  def decode_header(header):
      60      """Decode a message header value without converting charset.
      61  
      62      Returns a list of (string, charset) pairs containing each of the decoded
      63      parts of the header.  Charset is None for non-encoded parts of the header,
      64      otherwise a lower-case string containing the name of the character set
      65      specified in the encoded string.
      66  
      67      header may be a string that may or may not contain RFC2047 encoded words,
      68      or it may be a Header object.
      69  
      70      An email.errors.HeaderParseError may be raised when certain decoding error
      71      occurs (e.g. a base64 decoding exception).
      72      """
      73      # If it is a Header object, we can just return the encoded chunks.
      74      if hasattr(header, '_chunks'):
      75          return [(_charset._encode(string, str(charset)), str(charset))
      76                      for string, charset in header._chunks]
      77      # If no encoding, just return the header with no charset.
      78      if not ecre.search(header):
      79          return [(header, None)]
      80      # First step is to parse all the encoded parts into triplets of the form
      81      # (encoded_string, encoding, charset).  For unencoded strings, the last
      82      # two parts will be None.
      83      words = []
      84      for line in header.splitlines():
      85          parts = ecre.split(line)
      86          first = True
      87          while parts:
      88              unencoded = parts.pop(0)
      89              if first:
      90                  unencoded = unencoded.lstrip()
      91                  first = False
      92              if unencoded:
      93                  words.append((unencoded, None, None))
      94              if parts:
      95                  charset = parts.pop(0).lower()
      96                  encoding = parts.pop(0).lower()
      97                  encoded = parts.pop(0)
      98                  words.append((encoded, encoding, charset))
      99      # Now loop over words and remove words that consist of whitespace
     100      # between two encoded strings.
     101      droplist = []
     102      for n, w in enumerate(words):
     103          if n>1 and w[1] and words[n-2][1] and words[n-1][0].isspace():
     104              droplist.append(n-1)
     105      for d in reversed(droplist):
     106          del words[d]
     107  
     108      # The next step is to decode each encoded word by applying the reverse
     109      # base64 or quopri transformation.  decoded_words is now a list of the
     110      # form (decoded_word, charset).
     111      decoded_words = []
     112      for encoded_string, encoding, charset in words:
     113          if encoding is None:
     114              # This is an unencoded word.
     115              decoded_words.append((encoded_string, charset))
     116          elif encoding == 'q':
     117              word = email.quoprimime.header_decode(encoded_string)
     118              decoded_words.append((word, charset))
     119          elif encoding == 'b':
     120              paderr = len(encoded_string) % 4   # Postel's law: add missing padding
     121              if paderr:
     122                  encoded_string += '==='[:4 - paderr]
     123              try:
     124                  word = email.base64mime.decode(encoded_string)
     125              except binascii.Error:
     126                  raise HeaderParseError('Base64 decoding error')
     127              else:
     128                  decoded_words.append((word, charset))
     129          else:
     130              raise AssertionError('Unexpected encoding: ' + encoding)
     131      # Now convert all words to bytes and collapse consecutive runs of
     132      # similarly encoded words.
     133      collapsed = []
     134      last_word = last_charset = None
     135      for word, charset in decoded_words:
     136          if isinstance(word, str):
     137              word = bytes(word, 'raw-unicode-escape')
     138          if last_word is None:
     139              last_word = word
     140              last_charset = charset
     141          elif charset != last_charset:
     142              collapsed.append((last_word, last_charset))
     143              last_word = word
     144              last_charset = charset
     145          elif last_charset is None:
     146              last_word += BSPACE + word
     147          else:
     148              last_word += word
     149      collapsed.append((last_word, last_charset))
     150      return collapsed
     151  
     152  
     153  def make_header(decoded_seq, maxlinelen=None, header_name=None,
     154                  continuation_ws=' '):
     155      """Create a Header from a sequence of pairs as returned by decode_header()
     156  
     157      decode_header() takes a header value string and returns a sequence of
     158      pairs of the format (decoded_string, charset) where charset is the string
     159      name of the character set.
     160  
     161      This function takes one of those sequence of pairs and returns a Header
     162      instance.  Optional maxlinelen, header_name, and continuation_ws are as in
     163      the Header constructor.
     164      """
     165      h = Header(maxlinelen=maxlinelen, header_name=header_name,
     166                 continuation_ws=continuation_ws)
     167      for s, charset in decoded_seq:
     168          # None means us-ascii but we can simply pass it on to h.append()
     169          if charset is not None and not isinstance(charset, Charset):
     170              charset = Charset(charset)
     171          h.append(s, charset)
     172      return h
     173  
     174  
     175  class ESC[4;38;5;81mHeader:
     176      def __init__(self, s=None, charset=None,
     177                   maxlinelen=None, header_name=None,
     178                   continuation_ws=' ', errors='strict'):
     179          """Create a MIME-compliant header that can contain many character sets.
     180  
     181          Optional s is the initial header value.  If None, the initial header
     182          value is not set.  You can later append to the header with .append()
     183          method calls.  s may be a byte string or a Unicode string, but see the
     184          .append() documentation for semantics.
     185  
     186          Optional charset serves two purposes: it has the same meaning as the
     187          charset argument to the .append() method.  It also sets the default
     188          character set for all subsequent .append() calls that omit the charset
     189          argument.  If charset is not provided in the constructor, the us-ascii
     190          charset is used both as s's initial charset and as the default for
     191          subsequent .append() calls.
     192  
     193          The maximum line length can be specified explicitly via maxlinelen. For
     194          splitting the first line to a shorter value (to account for the field
     195          header which isn't included in s, e.g. `Subject') pass in the name of
     196          the field in header_name.  The default maxlinelen is 78 as recommended
     197          by RFC 2822.
     198  
     199          continuation_ws must be RFC 2822 compliant folding whitespace (usually
     200          either a space or a hard tab) which will be prepended to continuation
     201          lines.
     202  
     203          errors is passed through to the .append() call.
     204          """
     205          if charset is None:
     206              charset = USASCII
     207          elif not isinstance(charset, Charset):
     208              charset = Charset(charset)
     209          self._charset = charset
     210          self._continuation_ws = continuation_ws
     211          self._chunks = []
     212          if s is not None:
     213              self.append(s, charset, errors)
     214          if maxlinelen is None:
     215              maxlinelen = MAXLINELEN
     216          self._maxlinelen = maxlinelen
     217          if header_name is None:
     218              self._headerlen = 0
     219          else:
     220              # Take the separating colon and space into account.
     221              self._headerlen = len(header_name) + 2
     222  
     223      def __str__(self):
     224          """Return the string value of the header."""
     225          self._normalize()
     226          uchunks = []
     227          lastcs = None
     228          lastspace = None
     229          for string, charset in self._chunks:
     230              # We must preserve spaces between encoded and non-encoded word
     231              # boundaries, which means for us we need to add a space when we go
     232              # from a charset to None/us-ascii, or from None/us-ascii to a
     233              # charset.  Only do this for the second and subsequent chunks.
     234              # Don't add a space if the None/us-ascii string already has
     235              # a space (trailing or leading depending on transition)
     236              nextcs = charset
     237              if nextcs == _charset.UNKNOWN8BIT:
     238                  original_bytes = string.encode('ascii', 'surrogateescape')
     239                  string = original_bytes.decode('ascii', 'replace')
     240              if uchunks:
     241                  hasspace = string and self._nonctext(string[0])
     242                  if lastcs not in (None, 'us-ascii'):
     243                      if nextcs in (None, 'us-ascii') and not hasspace:
     244                          uchunks.append(SPACE)
     245                          nextcs = None
     246                  elif nextcs not in (None, 'us-ascii') and not lastspace:
     247                      uchunks.append(SPACE)
     248              lastspace = string and self._nonctext(string[-1])
     249              lastcs = nextcs
     250              uchunks.append(string)
     251          return EMPTYSTRING.join(uchunks)
     252  
     253      # Rich comparison operators for equality only.  BAW: does it make sense to
     254      # have or explicitly disable <, <=, >, >= operators?
     255      def __eq__(self, other):
     256          # other may be a Header or a string.  Both are fine so coerce
     257          # ourselves to a unicode (of the unencoded header value), swap the
     258          # args and do another comparison.
     259          return other == str(self)
     260  
     261      def append(self, s, charset=None, errors='strict'):
     262          """Append a string to the MIME header.
     263  
     264          Optional charset, if given, should be a Charset instance or the name
     265          of a character set (which will be converted to a Charset instance).  A
     266          value of None (the default) means that the charset given in the
     267          constructor is used.
     268  
     269          s may be a byte string or a Unicode string.  If it is a byte string
     270          (i.e. isinstance(s, str) is false), then charset is the encoding of
     271          that byte string, and a UnicodeError will be raised if the string
     272          cannot be decoded with that charset.  If s is a Unicode string, then
     273          charset is a hint specifying the character set of the characters in
     274          the string.  In either case, when producing an RFC 2822 compliant
     275          header using RFC 2047 rules, the string will be encoded using the
     276          output codec of the charset.  If the string cannot be encoded to the
     277          output codec, a UnicodeError will be raised.
     278  
     279          Optional `errors' is passed as the errors argument to the decode
     280          call if s is a byte string.
     281          """
     282          if charset is None:
     283              charset = self._charset
     284          elif not isinstance(charset, Charset):
     285              charset = Charset(charset)
     286          if not isinstance(s, str):
     287              input_charset = charset.input_codec or 'us-ascii'
     288              if input_charset == _charset.UNKNOWN8BIT:
     289                  s = s.decode('us-ascii', 'surrogateescape')
     290              else:
     291                  s = s.decode(input_charset, errors)
     292          # Ensure that the bytes we're storing can be decoded to the output
     293          # character set, otherwise an early error is raised.
     294          output_charset = charset.output_codec or 'us-ascii'
     295          if output_charset != _charset.UNKNOWN8BIT:
     296              try:
     297                  s.encode(output_charset, errors)
     298              except UnicodeEncodeError:
     299                  if output_charset!='us-ascii':
     300                      raise
     301                  charset = UTF8
     302          self._chunks.append((s, charset))
     303  
     304      def _nonctext(self, s):
     305          """True if string s is not a ctext character of RFC822.
     306          """
     307          return s.isspace() or s in ('(', ')', '\\')
     308  
     309      def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'):
     310          r"""Encode a message header into an RFC-compliant format.
     311  
     312          There are many issues involved in converting a given string for use in
     313          an email header.  Only certain character sets are readable in most
     314          email clients, and as header strings can only contain a subset of
     315          7-bit ASCII, care must be taken to properly convert and encode (with
     316          Base64 or quoted-printable) header strings.  In addition, there is a
     317          75-character length limit on any given encoded header field, so
     318          line-wrapping must be performed, even with double-byte character sets.
     319  
     320          Optional maxlinelen specifies the maximum length of each generated
     321          line, exclusive of the linesep string.  Individual lines may be longer
     322          than maxlinelen if a folding point cannot be found.  The first line
     323          will be shorter by the length of the header name plus ": " if a header
     324          name was specified at Header construction time.  The default value for
     325          maxlinelen is determined at header construction time.
     326  
     327          Optional splitchars is a string containing characters which should be
     328          given extra weight by the splitting algorithm during normal header
     329          wrapping.  This is in very rough support of RFC 2822's `higher level
     330          syntactic breaks':  split points preceded by a splitchar are preferred
     331          during line splitting, with the characters preferred in the order in
     332          which they appear in the string.  Space and tab may be included in the
     333          string to indicate whether preference should be given to one over the
     334          other as a split point when other split chars do not appear in the line
     335          being split.  Splitchars does not affect RFC 2047 encoded lines.
     336  
     337          Optional linesep is a string to be used to separate the lines of
     338          the value.  The default value is the most useful for typical
     339          Python applications, but it can be set to \r\n to produce RFC-compliant
     340          line separators when needed.
     341          """
     342          self._normalize()
     343          if maxlinelen is None:
     344              maxlinelen = self._maxlinelen
     345          # A maxlinelen of 0 means don't wrap.  For all practical purposes,
     346          # choosing a huge number here accomplishes that and makes the
     347          # _ValueFormatter algorithm much simpler.
     348          if maxlinelen == 0:
     349              maxlinelen = 1000000
     350          formatter = _ValueFormatter(self._headerlen, maxlinelen,
     351                                      self._continuation_ws, splitchars)
     352          lastcs = None
     353          hasspace = lastspace = None
     354          for string, charset in self._chunks:
     355              if hasspace is not None:
     356                  hasspace = string and self._nonctext(string[0])
     357                  if lastcs not in (None, 'us-ascii'):
     358                      if not hasspace or charset not in (None, 'us-ascii'):
     359                          formatter.add_transition()
     360                  elif charset not in (None, 'us-ascii') and not lastspace:
     361                      formatter.add_transition()
     362              lastspace = string and self._nonctext(string[-1])
     363              lastcs = charset
     364              hasspace = False
     365              lines = string.splitlines()
     366              if lines:
     367                  formatter.feed('', lines[0], charset)
     368              else:
     369                  formatter.feed('', '', charset)
     370              for line in lines[1:]:
     371                  formatter.newline()
     372                  if charset.header_encoding is not None:
     373                      formatter.feed(self._continuation_ws, ' ' + line.lstrip(),
     374                                     charset)
     375                  else:
     376                      sline = line.lstrip()
     377                      fws = line[:len(line)-len(sline)]
     378                      formatter.feed(fws, sline, charset)
     379              if len(lines) > 1:
     380                  formatter.newline()
     381          if self._chunks:
     382              formatter.add_transition()
     383          value = formatter._str(linesep)
     384          if _embedded_header.search(value):
     385              raise HeaderParseError("header value appears to contain "
     386                  "an embedded header: {!r}".format(value))
     387          return value
     388  
     389      def _normalize(self):
     390          # Step 1: Normalize the chunks so that all runs of identical charsets
     391          # get collapsed into a single unicode string.
     392          chunks = []
     393          last_charset = None
     394          last_chunk = []
     395          for string, charset in self._chunks:
     396              if charset == last_charset:
     397                  last_chunk.append(string)
     398              else:
     399                  if last_charset is not None:
     400                      chunks.append((SPACE.join(last_chunk), last_charset))
     401                  last_chunk = [string]
     402                  last_charset = charset
     403          if last_chunk:
     404              chunks.append((SPACE.join(last_chunk), last_charset))
     405          self._chunks = chunks
     406  
     407  
     408  class ESC[4;38;5;81m_ValueFormatter:
     409      def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
     410          self._maxlen = maxlen
     411          self._continuation_ws = continuation_ws
     412          self._continuation_ws_len = len(continuation_ws)
     413          self._splitchars = splitchars
     414          self._lines = []
     415          self._current_line = _Accumulator(headerlen)
     416  
     417      def _str(self, linesep):
     418          self.newline()
     419          return linesep.join(self._lines)
     420  
     421      def __str__(self):
     422          return self._str(NL)
     423  
     424      def newline(self):
     425          end_of_line = self._current_line.pop()
     426          if end_of_line != (' ', ''):
     427              self._current_line.push(*end_of_line)
     428          if len(self._current_line) > 0:
     429              if self._current_line.is_onlyws() and self._lines:
     430                  self._lines[-1] += str(self._current_line)
     431              else:
     432                  self._lines.append(str(self._current_line))
     433          self._current_line.reset()
     434  
     435      def add_transition(self):
     436          self._current_line.push(' ', '')
     437  
     438      def feed(self, fws, string, charset):
     439          # If the charset has no header encoding (i.e. it is an ASCII encoding)
     440          # then we must split the header at the "highest level syntactic break"
     441          # possible. Note that we don't have a lot of smarts about field
     442          # syntax; we just try to break on semi-colons, then commas, then
     443          # whitespace.  Eventually, this should be pluggable.
     444          if charset.header_encoding is None:
     445              self._ascii_split(fws, string, self._splitchars)
     446              return
     447          # Otherwise, we're doing either a Base64 or a quoted-printable
     448          # encoding which means we don't need to split the line on syntactic
     449          # breaks.  We can basically just find enough characters to fit on the
     450          # current line, minus the RFC 2047 chrome.  What makes this trickier
     451          # though is that we have to split at octet boundaries, not character
     452          # boundaries but it's only safe to split at character boundaries so at
     453          # best we can only get close.
     454          encoded_lines = charset.header_encode_lines(string, self._maxlengths())
     455          # The first element extends the current line, but if it's None then
     456          # nothing more fit on the current line so start a new line.
     457          try:
     458              first_line = encoded_lines.pop(0)
     459          except IndexError:
     460              # There are no encoded lines, so we're done.
     461              return
     462          if first_line is not None:
     463              self._append_chunk(fws, first_line)
     464          try:
     465              last_line = encoded_lines.pop()
     466          except IndexError:
     467              # There was only one line.
     468              return
     469          self.newline()
     470          self._current_line.push(self._continuation_ws, last_line)
     471          # Everything else are full lines in themselves.
     472          for line in encoded_lines:
     473              self._lines.append(self._continuation_ws + line)
     474  
     475      def _maxlengths(self):
     476          # The first line's length.
     477          yield self._maxlen - len(self._current_line)
     478          while True:
     479              yield self._maxlen - self._continuation_ws_len
     480  
     481      def _ascii_split(self, fws, string, splitchars):
     482          # The RFC 2822 header folding algorithm is simple in principle but
     483          # complex in practice.  Lines may be folded any place where "folding
     484          # white space" appears by inserting a linesep character in front of the
     485          # FWS.  The complication is that not all spaces or tabs qualify as FWS,
     486          # and we are also supposed to prefer to break at "higher level
     487          # syntactic breaks".  We can't do either of these without intimate
     488          # knowledge of the structure of structured headers, which we don't have
     489          # here.  So the best we can do here is prefer to break at the specified
     490          # splitchars, and hope that we don't choose any spaces or tabs that
     491          # aren't legal FWS.  (This is at least better than the old algorithm,
     492          # where we would sometimes *introduce* FWS after a splitchar, or the
     493          # algorithm before that, where we would turn all white space runs into
     494          # single spaces or tabs.)
     495          parts = re.split("(["+FWS+"]+)", fws+string)
     496          if parts[0]:
     497              parts[:0] = ['']
     498          else:
     499              parts.pop(0)
     500          for fws, part in zip(*[iter(parts)]*2):
     501              self._append_chunk(fws, part)
     502  
     503      def _append_chunk(self, fws, string):
     504          self._current_line.push(fws, string)
     505          if len(self._current_line) > self._maxlen:
     506              # Find the best split point, working backward from the end.
     507              # There might be none, on a long first line.
     508              for ch in self._splitchars:
     509                  for i in range(self._current_line.part_count()-1, 0, -1):
     510                      if ch.isspace():
     511                          fws = self._current_line[i][0]
     512                          if fws and fws[0]==ch:
     513                              break
     514                      prevpart = self._current_line[i-1][1]
     515                      if prevpart and prevpart[-1]==ch:
     516                          break
     517                  else:
     518                      continue
     519                  break
     520              else:
     521                  fws, part = self._current_line.pop()
     522                  if self._current_line._initial_size > 0:
     523                      # There will be a header, so leave it on a line by itself.
     524                      self.newline()
     525                      if not fws:
     526                          # We don't use continuation_ws here because the whitespace
     527                          # after a header should always be a space.
     528                          fws = ' '
     529                  self._current_line.push(fws, part)
     530                  return
     531              remainder = self._current_line.pop_from(i)
     532              self._lines.append(str(self._current_line))
     533              self._current_line.reset(remainder)
     534  
     535  
     536  class ESC[4;38;5;81m_Accumulator(ESC[4;38;5;149mlist):
     537  
     538      def __init__(self, initial_size=0):
     539          self._initial_size = initial_size
     540          super().__init__()
     541  
     542      def push(self, fws, string):
     543          self.append((fws, string))
     544  
     545      def pop_from(self, i=0):
     546          popped = self[i:]
     547          self[i:] = []
     548          return popped
     549  
     550      def pop(self):
     551          if self.part_count()==0:
     552              return ('', '')
     553          return super().pop()
     554  
     555      def __len__(self):
     556          return sum((len(fws)+len(part) for fws, part in self),
     557                     self._initial_size)
     558  
     559      def __str__(self):
     560          return EMPTYSTRING.join((EMPTYSTRING.join((fws, part))
     561                                  for fws, part in self))
     562  
     563      def reset(self, startval=None):
     564          if startval is None:
     565              startval = []
     566          self[:] = startval
     567          self._initial_size = 0
     568  
     569      def is_onlyws(self):
     570          return self._initial_size==0 and (not self or str(self).isspace())
     571  
     572      def part_count(self):
     573          return super().__len__()