(root)/
Python-3.12.0/
Lib/
email/
headerregistry.py
       1  """Representing and manipulating email headers via custom objects.
       2  
       3  This module provides an implementation of the HeaderRegistry API.
       4  The implementation is designed to flexibly follow RFC5322 rules.
       5  """
       6  from types import MappingProxyType
       7  
       8  from email import utils
       9  from email import errors
      10  from email import _header_value_parser as parser
      11  
      12  class ESC[4;38;5;81mAddress:
      13  
      14      def __init__(self, display_name='', username='', domain='', addr_spec=None):
      15          """Create an object representing a full email address.
      16  
      17          An address can have a 'display_name', a 'username', and a 'domain'.  In
      18          addition to specifying the username and domain separately, they may be
      19          specified together by using the addr_spec keyword *instead of* the
      20          username and domain keywords.  If an addr_spec string is specified it
      21          must be properly quoted according to RFC 5322 rules; an error will be
      22          raised if it is not.
      23  
      24          An Address object has display_name, username, domain, and addr_spec
      25          attributes, all of which are read-only.  The addr_spec and the string
      26          value of the object are both quoted according to RFC5322 rules, but
      27          without any Content Transfer Encoding.
      28  
      29          """
      30  
      31          inputs = ''.join(filter(None, (display_name, username, domain, addr_spec)))
      32          if '\r' in inputs or '\n' in inputs:
      33              raise ValueError("invalid arguments; address parts cannot contain CR or LF")
      34  
      35          # This clause with its potential 'raise' may only happen when an
      36          # application program creates an Address object using an addr_spec
      37          # keyword.  The email library code itself must always supply username
      38          # and domain.
      39          if addr_spec is not None:
      40              if username or domain:
      41                  raise TypeError("addrspec specified when username and/or "
      42                                  "domain also specified")
      43              a_s, rest = parser.get_addr_spec(addr_spec)
      44              if rest:
      45                  raise ValueError("Invalid addr_spec; only '{}' "
      46                                   "could be parsed from '{}'".format(
      47                                      a_s, addr_spec))
      48              if a_s.all_defects:
      49                  raise a_s.all_defects[0]
      50              username = a_s.local_part
      51              domain = a_s.domain
      52          self._display_name = display_name
      53          self._username = username
      54          self._domain = domain
      55  
      56      @property
      57      def display_name(self):
      58          return self._display_name
      59  
      60      @property
      61      def username(self):
      62          return self._username
      63  
      64      @property
      65      def domain(self):
      66          return self._domain
      67  
      68      @property
      69      def addr_spec(self):
      70          """The addr_spec (username@domain) portion of the address, quoted
      71          according to RFC 5322 rules, but with no Content Transfer Encoding.
      72          """
      73          lp = self.username
      74          if not parser.DOT_ATOM_ENDS.isdisjoint(lp):
      75              lp = parser.quote_string(lp)
      76          if self.domain:
      77              return lp + '@' + self.domain
      78          if not lp:
      79              return '<>'
      80          return lp
      81  
      82      def __repr__(self):
      83          return "{}(display_name={!r}, username={!r}, domain={!r})".format(
      84                          self.__class__.__name__,
      85                          self.display_name, self.username, self.domain)
      86  
      87      def __str__(self):
      88          disp = self.display_name
      89          if not parser.SPECIALS.isdisjoint(disp):
      90              disp = parser.quote_string(disp)
      91          if disp:
      92              addr_spec = '' if self.addr_spec=='<>' else self.addr_spec
      93              return "{} <{}>".format(disp, addr_spec)
      94          return self.addr_spec
      95  
      96      def __eq__(self, other):
      97          if not isinstance(other, Address):
      98              return NotImplemented
      99          return (self.display_name == other.display_name and
     100                  self.username == other.username and
     101                  self.domain == other.domain)
     102  
     103  
     104  class ESC[4;38;5;81mGroup:
     105  
     106      def __init__(self, display_name=None, addresses=None):
     107          """Create an object representing an address group.
     108  
     109          An address group consists of a display_name followed by colon and a
     110          list of addresses (see Address) terminated by a semi-colon.  The Group
     111          is created by specifying a display_name and a possibly empty list of
     112          Address objects.  A Group can also be used to represent a single
     113          address that is not in a group, which is convenient when manipulating
     114          lists that are a combination of Groups and individual Addresses.  In
     115          this case the display_name should be set to None.  In particular, the
     116          string representation of a Group whose display_name is None is the same
     117          as the Address object, if there is one and only one Address object in
     118          the addresses list.
     119  
     120          """
     121          self._display_name = display_name
     122          self._addresses = tuple(addresses) if addresses else tuple()
     123  
     124      @property
     125      def display_name(self):
     126          return self._display_name
     127  
     128      @property
     129      def addresses(self):
     130          return self._addresses
     131  
     132      def __repr__(self):
     133          return "{}(display_name={!r}, addresses={!r}".format(
     134                   self.__class__.__name__,
     135                   self.display_name, self.addresses)
     136  
     137      def __str__(self):
     138          if self.display_name is None and len(self.addresses)==1:
     139              return str(self.addresses[0])
     140          disp = self.display_name
     141          if disp is not None and not parser.SPECIALS.isdisjoint(disp):
     142              disp = parser.quote_string(disp)
     143          adrstr = ", ".join(str(x) for x in self.addresses)
     144          adrstr = ' ' + adrstr if adrstr else adrstr
     145          return "{}:{};".format(disp, adrstr)
     146  
     147      def __eq__(self, other):
     148          if not isinstance(other, Group):
     149              return NotImplemented
     150          return (self.display_name == other.display_name and
     151                  self.addresses == other.addresses)
     152  
     153  
     154  # Header Classes #
     155  
     156  class ESC[4;38;5;81mBaseHeader(ESC[4;38;5;149mstr):
     157  
     158      """Base class for message headers.
     159  
     160      Implements generic behavior and provides tools for subclasses.
     161  
     162      A subclass must define a classmethod named 'parse' that takes an unfolded
     163      value string and a dictionary as its arguments.  The dictionary will
     164      contain one key, 'defects', initialized to an empty list.  After the call
     165      the dictionary must contain two additional keys: parse_tree, set to the
     166      parse tree obtained from parsing the header, and 'decoded', set to the
     167      string value of the idealized representation of the data from the value.
     168      (That is, encoded words are decoded, and values that have canonical
     169      representations are so represented.)
     170  
     171      The defects key is intended to collect parsing defects, which the message
     172      parser will subsequently dispose of as appropriate.  The parser should not,
     173      insofar as practical, raise any errors.  Defects should be added to the
     174      list instead.  The standard header parsers register defects for RFC
     175      compliance issues, for obsolete RFC syntax, and for unrecoverable parsing
     176      errors.
     177  
     178      The parse method may add additional keys to the dictionary.  In this case
     179      the subclass must define an 'init' method, which will be passed the
     180      dictionary as its keyword arguments.  The method should use (usually by
     181      setting them as the value of similarly named attributes) and remove all the
     182      extra keys added by its parse method, and then use super to call its parent
     183      class with the remaining arguments and keywords.
     184  
     185      The subclass should also make sure that a 'max_count' attribute is defined
     186      that is either None or 1. XXX: need to better define this API.
     187  
     188      """
     189  
     190      def __new__(cls, name, value):
     191          kwds = {'defects': []}
     192          cls.parse(value, kwds)
     193          if utils._has_surrogates(kwds['decoded']):
     194              kwds['decoded'] = utils._sanitize(kwds['decoded'])
     195          self = str.__new__(cls, kwds['decoded'])
     196          del kwds['decoded']
     197          self.init(name, **kwds)
     198          return self
     199  
     200      def init(self, name, *, parse_tree, defects):
     201          self._name = name
     202          self._parse_tree = parse_tree
     203          self._defects = defects
     204  
     205      @property
     206      def name(self):
     207          return self._name
     208  
     209      @property
     210      def defects(self):
     211          return tuple(self._defects)
     212  
     213      def __reduce__(self):
     214          return (
     215              _reconstruct_header,
     216              (
     217                  self.__class__.__name__,
     218                  self.__class__.__bases__,
     219                  str(self),
     220              ),
     221              self.__getstate__())
     222  
     223      @classmethod
     224      def _reconstruct(cls, value):
     225          return str.__new__(cls, value)
     226  
     227      def fold(self, *, policy):
     228          """Fold header according to policy.
     229  
     230          The parsed representation of the header is folded according to
     231          RFC5322 rules, as modified by the policy.  If the parse tree
     232          contains surrogateescaped bytes, the bytes are CTE encoded using
     233          the charset 'unknown-8bit".
     234  
     235          Any non-ASCII characters in the parse tree are CTE encoded using
     236          charset utf-8. XXX: make this a policy setting.
     237  
     238          The returned value is an ASCII-only string possibly containing linesep
     239          characters, and ending with a linesep character.  The string includes
     240          the header name and the ': ' separator.
     241  
     242          """
     243          # At some point we need to put fws here if it was in the source.
     244          header = parser.Header([
     245              parser.HeaderLabel([
     246                  parser.ValueTerminal(self.name, 'header-name'),
     247                  parser.ValueTerminal(':', 'header-sep')]),
     248              ])
     249          if self._parse_tree:
     250              header.append(
     251                  parser.CFWSList([parser.WhiteSpaceTerminal(' ', 'fws')]))
     252          header.append(self._parse_tree)
     253          return header.fold(policy=policy)
     254  
     255  
     256  def _reconstruct_header(cls_name, bases, value):
     257      return type(cls_name, bases, {})._reconstruct(value)
     258  
     259  
     260  class ESC[4;38;5;81mUnstructuredHeader:
     261  
     262      max_count = None
     263      value_parser = staticmethod(parser.get_unstructured)
     264  
     265      @classmethod
     266      def parse(cls, value, kwds):
     267          kwds['parse_tree'] = cls.value_parser(value)
     268          kwds['decoded'] = str(kwds['parse_tree'])
     269  
     270  
class UniqueUnstructuredHeader(UnstructuredHeader):

    # Identical to UnstructuredHeader except for max_count.
    # NOTE(review): presumably 1 means the header may appear at most once per
    # message -- confirm against the code that reads max_count.
    max_count = 1
     274  
     275  
     276  class ESC[4;38;5;81mDateHeader:
     277  
     278      """Header whose value consists of a single timestamp.
     279  
     280      Provides an additional attribute, datetime, which is either an aware
     281      datetime using a timezone, or a naive datetime if the timezone
     282      in the input string is -0000.  Also accepts a datetime as input.
     283      The 'value' attribute is the normalized form of the timestamp,
     284      which means it is the output of format_datetime on the datetime.
     285      """
     286  
     287      max_count = None
     288  
     289      # This is used only for folding, not for creating 'decoded'.
     290      value_parser = staticmethod(parser.get_unstructured)
     291  
     292      @classmethod
     293      def parse(cls, value, kwds):
     294          if not value:
     295              kwds['defects'].append(errors.HeaderMissingRequiredValue())
     296              kwds['datetime'] = None
     297              kwds['decoded'] = ''
     298              kwds['parse_tree'] = parser.TokenList()
     299              return
     300          if isinstance(value, str):
     301              kwds['decoded'] = value
     302              try:
     303                  value = utils.parsedate_to_datetime(value)
     304              except ValueError:
     305                  kwds['defects'].append(errors.InvalidDateDefect('Invalid date value or format'))
     306                  kwds['datetime'] = None
     307                  kwds['parse_tree'] = parser.TokenList()
     308                  return
     309          kwds['datetime'] = value
     310          kwds['decoded'] = utils.format_datetime(kwds['datetime'])
     311          kwds['parse_tree'] = cls.value_parser(kwds['decoded'])
     312  
     313      def init(self, *args, **kw):
     314          self._datetime = kw.pop('datetime')
     315          super().init(*args, **kw)
     316  
     317      @property
     318      def datetime(self):
     319          return self._datetime
     320  
     321  
class UniqueDateHeader(DateHeader):

    # Identical to DateHeader except for max_count.
    # NOTE(review): presumably 1 means the header may appear at most once per
    # message -- confirm against the code that reads max_count.
    max_count = 1
     325  
     326  
     327  class ESC[4;38;5;81mAddressHeader:
     328  
     329      max_count = None
     330  
     331      @staticmethod
     332      def value_parser(value):
     333          address_list, value = parser.get_address_list(value)
     334          assert not value, 'this should not happen'
     335          return address_list
     336  
     337      @classmethod
     338      def parse(cls, value, kwds):
     339          if isinstance(value, str):
     340              # We are translating here from the RFC language (address/mailbox)
     341              # to our API language (group/address).
     342              kwds['parse_tree'] = address_list = cls.value_parser(value)
     343              groups = []
     344              for addr in address_list.addresses:
     345                  groups.append(Group(addr.display_name,
     346                                      [Address(mb.display_name or '',
     347                                               mb.local_part or '',
     348                                               mb.domain or '')
     349                                       for mb in addr.all_mailboxes]))
     350              defects = list(address_list.all_defects)
     351          else:
     352              # Assume it is Address/Group stuff
     353              if not hasattr(value, '__iter__'):
     354                  value = [value]
     355              groups = [Group(None, [item]) if not hasattr(item, 'addresses')
     356                                            else item
     357                                      for item in value]
     358              defects = []
     359          kwds['groups'] = groups
     360          kwds['defects'] = defects
     361          kwds['decoded'] = ', '.join([str(item) for item in groups])
     362          if 'parse_tree' not in kwds:
     363              kwds['parse_tree'] = cls.value_parser(kwds['decoded'])
     364  
     365      def init(self, *args, **kw):
     366          self._groups = tuple(kw.pop('groups'))
     367          self._addresses = None
     368          super().init(*args, **kw)
     369  
     370      @property
     371      def groups(self):
     372          return self._groups
     373  
     374      @property
     375      def addresses(self):
     376          if self._addresses is None:
     377              self._addresses = tuple(address for group in self._groups
     378                                              for address in group.addresses)
     379          return self._addresses
     380  
     381  
class UniqueAddressHeader(AddressHeader):

    # Identical to AddressHeader except for max_count.
    # NOTE(review): presumably 1 means the header may appear at most once per
    # message -- confirm against the code that reads max_count.
    max_count = 1
     385  
     386  
     387  class ESC[4;38;5;81mSingleAddressHeader(ESC[4;38;5;149mAddressHeader):
     388  
     389      @property
     390      def address(self):
     391          if len(self.addresses)!=1:
     392              raise ValueError(("value of single address header {} is not "
     393                  "a single address").format(self.name))
     394          return self.addresses[0]
     395  
     396  
class UniqueSingleAddressHeader(SingleAddressHeader):

    # Identical to SingleAddressHeader except that max_count is set to 1.
    # NOTE(review): presumably 1 means the header may appear at most once per
    # message -- confirm against the code that reads max_count.
    max_count = 1
     400  
     401  
     402  class ESC[4;38;5;81mMIMEVersionHeader:
     403  
     404      max_count = 1
     405  
     406      value_parser = staticmethod(parser.parse_mime_version)
     407  
     408      @classmethod
     409      def parse(cls, value, kwds):
     410          kwds['parse_tree'] = parse_tree = cls.value_parser(value)
     411          kwds['decoded'] = str(parse_tree)
     412          kwds['defects'].extend(parse_tree.all_defects)
     413          kwds['major'] = None if parse_tree.minor is None else parse_tree.major
     414          kwds['minor'] = parse_tree.minor
     415          if parse_tree.minor is not None:
     416              kwds['version'] = '{}.{}'.format(kwds['major'], kwds['minor'])
     417          else:
     418              kwds['version'] = None
     419  
     420      def init(self, *args, **kw):
     421          self._version = kw.pop('version')
     422          self._major = kw.pop('major')
     423          self._minor = kw.pop('minor')
     424          super().init(*args, **kw)
     425  
     426      @property
     427      def major(self):
     428          return self._major
     429  
     430      @property
     431      def minor(self):
     432          return self._minor
     433  
     434      @property
     435      def version(self):
     436          return self._version
     437  
     438  
     439  class ESC[4;38;5;81mParameterizedMIMEHeader:
     440  
     441      # Mixin that handles the params dict.  Must be subclassed and
     442      # a property value_parser for the specific header provided.
     443  
     444      max_count = 1
     445  
     446      @classmethod
     447      def parse(cls, value, kwds):
     448          kwds['parse_tree'] = parse_tree = cls.value_parser(value)
     449          kwds['decoded'] = str(parse_tree)
     450          kwds['defects'].extend(parse_tree.all_defects)
     451          if parse_tree.params is None:
     452              kwds['params'] = {}
     453          else:
     454              # The MIME RFCs specify that parameter ordering is arbitrary.
     455              kwds['params'] = {utils._sanitize(name).lower():
     456                                      utils._sanitize(value)
     457                                 for name, value in parse_tree.params}
     458  
     459      def init(self, *args, **kw):
     460          self._params = kw.pop('params')
     461          super().init(*args, **kw)
     462  
     463      @property
     464      def params(self):
     465          return MappingProxyType(self._params)
     466  
     467  
     468  class ESC[4;38;5;81mContentTypeHeader(ESC[4;38;5;149mParameterizedMIMEHeader):
     469  
     470      value_parser = staticmethod(parser.parse_content_type_header)
     471  
     472      def init(self, *args, **kw):
     473          super().init(*args, **kw)
     474          self._maintype = utils._sanitize(self._parse_tree.maintype)
     475          self._subtype = utils._sanitize(self._parse_tree.subtype)
     476  
     477      @property
     478      def maintype(self):
     479          return self._maintype
     480  
     481      @property
     482      def subtype(self):
     483          return self._subtype
     484  
     485      @property
     486      def content_type(self):
     487          return self.maintype + '/' + self.subtype
     488  
     489  
     490  class ESC[4;38;5;81mContentDispositionHeader(ESC[4;38;5;149mParameterizedMIMEHeader):
     491  
     492      value_parser = staticmethod(parser.parse_content_disposition_header)
     493  
     494      def init(self, *args, **kw):
     495          super().init(*args, **kw)
     496          cd = self._parse_tree.content_disposition
     497          self._content_disposition = cd if cd is None else utils._sanitize(cd)
     498  
     499      @property
     500      def content_disposition(self):
     501          return self._content_disposition
     502  
     503  
     504  class ESC[4;38;5;81mContentTransferEncodingHeader:
     505  
     506      max_count = 1
     507  
     508      value_parser = staticmethod(parser.parse_content_transfer_encoding_header)
     509  
     510      @classmethod
     511      def parse(cls, value, kwds):
     512          kwds['parse_tree'] = parse_tree = cls.value_parser(value)
     513          kwds['decoded'] = str(parse_tree)
     514          kwds['defects'].extend(parse_tree.all_defects)
     515  
     516      def init(self, *args, **kw):
     517          super().init(*args, **kw)
     518          self._cte = utils._sanitize(self._parse_tree.cte)
     519  
     520      @property
     521      def cte(self):
     522          return self._cte
     523  
     524  
     525  class ESC[4;38;5;81mMessageIDHeader:
     526  
     527      max_count = 1
     528      value_parser = staticmethod(parser.parse_message_id)
     529  
     530      @classmethod
     531      def parse(cls, value, kwds):
     532          kwds['parse_tree'] = parse_tree = cls.value_parser(value)
     533          kwds['decoded'] = str(parse_tree)
     534          kwds['defects'].extend(parse_tree.all_defects)
     535  
     536  
     537  # The header factory #
     538  
# Maps lower-cased header names to the specialized class handling them.
# HeaderRegistry copies this into its own registry when use_default_map is
# true; names not listed fall back to the registry's default_class.
_default_header_map = {
    'subject':                      UniqueUnstructuredHeader,
    'date':                         UniqueDateHeader,
    'resent-date':                  DateHeader,
    'orig-date':                    UniqueDateHeader,
    'sender':                       UniqueSingleAddressHeader,
    'resent-sender':                SingleAddressHeader,
    'to':                           UniqueAddressHeader,
    'resent-to':                    AddressHeader,
    'cc':                           UniqueAddressHeader,
    'resent-cc':                    AddressHeader,
    'bcc':                          UniqueAddressHeader,
    'resent-bcc':                   AddressHeader,
    'from':                         UniqueAddressHeader,
    'resent-from':                  AddressHeader,
    'reply-to':                     UniqueAddressHeader,
    'mime-version':                 MIMEVersionHeader,
    'content-type':                 ContentTypeHeader,
    'content-disposition':          ContentDispositionHeader,
    'content-transfer-encoding':    ContentTransferEncodingHeader,
    'message-id':                   MessageIDHeader,
    }
     561  
     562  class ESC[4;38;5;81mHeaderRegistry:
     563  
     564      """A header_factory and header registry."""
     565  
     566      def __init__(self, base_class=BaseHeader, default_class=UnstructuredHeader,
     567                         use_default_map=True):
     568          """Create a header_factory that works with the Policy API.
     569  
     570          base_class is the class that will be the last class in the created
     571          header class's __bases__ list.  default_class is the class that will be
     572          used if "name" (see __call__) does not appear in the registry.
     573          use_default_map controls whether or not the default mapping of names to
     574          specialized classes is copied in to the registry when the factory is
     575          created.  The default is True.
     576  
     577          """
     578          self.registry = {}
     579          self.base_class = base_class
     580          self.default_class = default_class
     581          if use_default_map:
     582              self.registry.update(_default_header_map)
     583  
     584      def map_to_type(self, name, cls):
     585          """Register cls as the specialized class for handling "name" headers.
     586  
     587          """
     588          self.registry[name.lower()] = cls
     589  
     590      def __getitem__(self, name):
     591          cls = self.registry.get(name.lower(), self.default_class)
     592          return type('_'+cls.__name__, (cls, self.base_class), {})
     593  
     594      def __call__(self, name, value):
     595          """Create a header instance for header 'name' from 'value'.
     596  
     597          Creates a header instance by creating a specialized class for parsing
     598          and representing the specified header by combining the factory
     599          base_class with a specialized class from the registry or the
     600          default_class, and passing the name and value to the constructed
     601          class's constructor.
     602  
     603          """
     604          return self[name](name, value)