(root)/
Python-3.11.7/
Lib/
email/
contentmanager.py
       1  import binascii
       2  import email.charset
       3  import email.message
       4  import email.errors
       5  from email import quoprimime
       6  
       7  class ESC[4;38;5;81mContentManager:
       8  
       9      def __init__(self):
      10          self.get_handlers = {}
      11          self.set_handlers = {}
      12  
      13      def add_get_handler(self, key, handler):
      14          self.get_handlers[key] = handler
      15  
      16      def get_content(self, msg, *args, **kw):
      17          content_type = msg.get_content_type()
      18          if content_type in self.get_handlers:
      19              return self.get_handlers[content_type](msg, *args, **kw)
      20          maintype = msg.get_content_maintype()
      21          if maintype in self.get_handlers:
      22              return self.get_handlers[maintype](msg, *args, **kw)
      23          if '' in self.get_handlers:
      24              return self.get_handlers[''](msg, *args, **kw)
      25          raise KeyError(content_type)
      26  
      27      def add_set_handler(self, typekey, handler):
      28          self.set_handlers[typekey] = handler
      29  
      30      def set_content(self, msg, obj, *args, **kw):
      31          if msg.get_content_maintype() == 'multipart':
      32              # XXX: is this error a good idea or not?  We can remove it later,
      33              # but we can't add it later, so do it for now.
      34              raise TypeError("set_content not valid on multipart")
      35          handler = self._find_set_handler(msg, obj)
      36          msg.clear_content()
      37          handler(msg, obj, *args, **kw)
      38  
      39      def _find_set_handler(self, msg, obj):
      40          full_path_for_error = None
      41          for typ in type(obj).__mro__:
      42              if typ in self.set_handlers:
      43                  return self.set_handlers[typ]
      44              qname = typ.__qualname__
      45              modname = getattr(typ, '__module__', '')
      46              full_path = '.'.join((modname, qname)) if modname else qname
      47              if full_path_for_error is None:
      48                  full_path_for_error = full_path
      49              if full_path in self.set_handlers:
      50                  return self.set_handlers[full_path]
      51              if qname in self.set_handlers:
      52                  return self.set_handlers[qname]
      53              name = typ.__name__
      54              if name in self.set_handlers:
      55                  return self.set_handlers[name]
      56          if None in self.set_handlers:
      57              return self.set_handlers[None]
      58          raise KeyError(full_path_for_error)
      59  
      60  
      61  raw_data_manager = ContentManager()
      62  
      63  
      64  def get_text_content(msg, errors='replace'):
      65      content = msg.get_payload(decode=True)
      66      charset = msg.get_param('charset', 'ASCII')
      67      return content.decode(charset, errors=errors)
      68  raw_data_manager.add_get_handler('text', get_text_content)
      69  
      70  
      71  def get_non_text_content(msg):
      72      return msg.get_payload(decode=True)
      73  for maintype in 'audio image video application'.split():
      74      raw_data_manager.add_get_handler(maintype, get_non_text_content)
      75  del maintype
      76  
      77  
      78  def get_message_content(msg):
      79      return msg.get_payload(0)
      80  for subtype in 'rfc822 external-body'.split():
      81      raw_data_manager.add_get_handler('message/'+subtype, get_message_content)
      82  del subtype
      83  
      84  
      85  def get_and_fixup_unknown_message_content(msg):
      86      # If we don't understand a message subtype, we are supposed to treat it as
      87      # if it were application/octet-stream, per
      88      # tools.ietf.org/html/rfc2046#section-5.2.4.  Feedparser doesn't do that,
      89      # so do our best to fix things up.  Note that it is *not* appropriate to
      90      # model message/partial content as Message objects, so they are handled
      91      # here as well.  (How to reassemble them is out of scope for this comment :)
      92      return bytes(msg.get_payload(0))
      93  raw_data_manager.add_get_handler('message',
      94                                   get_and_fixup_unknown_message_content)
      95  
      96  
      97  def _prepare_set(msg, maintype, subtype, headers):
      98      msg['Content-Type'] = '/'.join((maintype, subtype))
      99      if headers:
     100          if not hasattr(headers[0], 'name'):
     101              mp = msg.policy
     102              headers = [mp.header_factory(*mp.header_source_parse([header]))
     103                         for header in headers]
     104          try:
     105              for header in headers:
     106                  if header.defects:
     107                      raise header.defects[0]
     108                  msg[header.name] = header
     109          except email.errors.HeaderDefect as exc:
     110              raise ValueError("Invalid header: {}".format(
     111                                  header.fold(policy=msg.policy))) from exc
     112  
     113  
     114  def _finalize_set(msg, disposition, filename, cid, params):
     115      if disposition is None and filename is not None:
     116          disposition = 'attachment'
     117      if disposition is not None:
     118          msg['Content-Disposition'] = disposition
     119      if filename is not None:
     120          msg.set_param('filename',
     121                        filename,
     122                        header='Content-Disposition',
     123                        replace=True)
     124      if cid is not None:
     125          msg['Content-ID'] = cid
     126      if params is not None:
     127          for key, value in params.items():
     128              msg.set_param(key, value)
     129  
     130  
     131  # XXX: This is a cleaned-up version of base64mime.body_encode (including a bug
     132  # fix in the calculation of unencoded_bytes_per_line).  It would be nice to
     133  # drop both this and quoprimime.body_encode in favor of enhanced binascii
     134  # routines that accepted a max_line_length parameter.
     135  def _encode_base64(data, max_line_length):
     136      encoded_lines = []
     137      unencoded_bytes_per_line = max_line_length // 4 * 3
     138      for i in range(0, len(data), unencoded_bytes_per_line):
     139          thisline = data[i:i+unencoded_bytes_per_line]
     140          encoded_lines.append(binascii.b2a_base64(thisline).decode('ascii'))
     141      return ''.join(encoded_lines)
     142  
     143  
     144  def _encode_text(string, charset, cte, policy):
     145      lines = string.encode(charset).splitlines()
     146      linesep = policy.linesep.encode('ascii')
     147      def embedded_body(lines): return linesep.join(lines) + linesep
     148      def normal_body(lines): return b'\n'.join(lines) + b'\n'
     149      if cte is None:
     150          # Use heuristics to decide on the "best" encoding.
     151          if max((len(x) for x in lines), default=0) <= policy.max_line_length:
     152              try:
     153                  return '7bit', normal_body(lines).decode('ascii')
     154              except UnicodeDecodeError:
     155                  pass
     156              if policy.cte_type == '8bit':
     157                  return '8bit', normal_body(lines).decode('ascii', 'surrogateescape')
     158          sniff = embedded_body(lines[:10])
     159          sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'),
     160                                            policy.max_line_length)
     161          sniff_base64 = binascii.b2a_base64(sniff)
     162          # This is a little unfair to qp; it includes lineseps, base64 doesn't.
     163          if len(sniff_qp) > len(sniff_base64):
     164              cte = 'base64'
     165          else:
     166              cte = 'quoted-printable'
     167              if len(lines) <= 10:
     168                  return cte, sniff_qp
     169      if cte == '7bit':
     170          data = normal_body(lines).decode('ascii')
     171      elif cte == '8bit':
     172          data = normal_body(lines).decode('ascii', 'surrogateescape')
     173      elif cte == 'quoted-printable':
     174          data = quoprimime.body_encode(normal_body(lines).decode('latin-1'),
     175                                        policy.max_line_length)
     176      elif cte == 'base64':
     177          data = _encode_base64(embedded_body(lines), policy.max_line_length)
     178      else:
     179          raise ValueError("Unknown content transfer encoding {}".format(cte))
     180      return cte, data
     181  
     182  
     183  def set_text_content(msg, string, subtype="plain", charset='utf-8', cte=None,
     184                       disposition=None, filename=None, cid=None,
     185                       params=None, headers=None):
     186      _prepare_set(msg, 'text', subtype, headers)
     187      cte, payload = _encode_text(string, charset, cte, msg.policy)
     188      msg.set_payload(payload)
     189      msg.set_param('charset',
     190                    email.charset.ALIASES.get(charset, charset),
     191                    replace=True)
     192      msg['Content-Transfer-Encoding'] = cte
     193      _finalize_set(msg, disposition, filename, cid, params)
     194  raw_data_manager.add_set_handler(str, set_text_content)
     195  
     196  
     197  def set_message_content(msg, message, subtype="rfc822", cte=None,
     198                         disposition=None, filename=None, cid=None,
     199                         params=None, headers=None):
     200      if subtype == 'partial':
     201          raise ValueError("message/partial is not supported for Message objects")
     202      if subtype == 'rfc822':
     203          if cte not in (None, '7bit', '8bit', 'binary'):
     204              # http://tools.ietf.org/html/rfc2046#section-5.2.1 mandate.
     205              raise ValueError(
     206                  "message/rfc822 parts do not support cte={}".format(cte))
     207          # 8bit will get coerced on serialization if policy.cte_type='7bit'.  We
     208          # may end up claiming 8bit when it isn't needed, but the only negative
     209          # result of that should be a gateway that needs to coerce to 7bit
     210          # having to look through the whole embedded message to discover whether
     211          # or not it actually has to do anything.
     212          cte = '8bit' if cte is None else cte
     213      elif subtype == 'external-body':
     214          if cte not in (None, '7bit'):
     215              # http://tools.ietf.org/html/rfc2046#section-5.2.3 mandate.
     216              raise ValueError(
     217                  "message/external-body parts do not support cte={}".format(cte))
     218          cte = '7bit'
     219      elif cte is None:
     220          # http://tools.ietf.org/html/rfc2046#section-5.2.4 says all future
     221          # subtypes should be restricted to 7bit, so assume that.
     222          cte = '7bit'
     223      _prepare_set(msg, 'message', subtype, headers)
     224      msg.set_payload([message])
     225      msg['Content-Transfer-Encoding'] = cte
     226      _finalize_set(msg, disposition, filename, cid, params)
     227  raw_data_manager.add_set_handler(email.message.Message, set_message_content)
     228  
     229  
     230  def set_bytes_content(msg, data, maintype, subtype, cte='base64',
     231                       disposition=None, filename=None, cid=None,
     232                       params=None, headers=None):
     233      _prepare_set(msg, maintype, subtype, headers)
     234      if cte == 'base64':
     235          data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
     236      elif cte == 'quoted-printable':
     237          # XXX: quoprimime.body_encode won't encode newline characters in data,
     238          # so we can't use it.  This means max_line_length is ignored.  Another
     239          # bug to fix later.  (Note: encoders.quopri is broken on line ends.)
     240          data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
     241          data = data.decode('ascii')
     242      elif cte == '7bit':
     243          data = data.decode('ascii')
     244      elif cte in ('8bit', 'binary'):
     245          data = data.decode('ascii', 'surrogateescape')
     246      msg.set_payload(data)
     247      msg['Content-Transfer-Encoding'] = cte
     248      _finalize_set(msg, disposition, filename, cid, params)
     249  for typ in (bytes, bytearray, memoryview):
     250      raw_data_manager.add_set_handler(typ, set_bytes_content)
     251  del typ