(root)/
Python-3.11.7/
Lib/
test/
test_email/
test_policy.py
       1  import io
       2  import types
       3  import textwrap
       4  import unittest
       5  import email.errors
       6  import email.policy
       7  import email.parser
       8  import email.generator
       9  import email.message
      10  from email import headerregistry
      11  
      12  def make_defaults(base_defaults, differences):
      13      defaults = base_defaults.copy()
      14      defaults.update(differences)
      15      return defaults
      16  
      17  class ESC[4;38;5;81mPolicyAPITests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      18  
      19      longMessage = True
      20  
      21      # Base default values.
      22      compat32_defaults = {
      23          'max_line_length':          78,
      24          'linesep':                  '\n',
      25          'cte_type':                 '8bit',
      26          'raise_on_defect':          False,
      27          'mangle_from_':             True,
      28          'message_factory':          None,
      29          }
      30      # These default values are the ones set on email.policy.default.
      31      # If any of these defaults change, the docs must be updated.
      32      policy_defaults = compat32_defaults.copy()
      33      policy_defaults.update({
      34          'utf8':                     False,
      35          'raise_on_defect':          False,
      36          'header_factory':           email.policy.EmailPolicy.header_factory,
      37          'refold_source':            'long',
      38          'content_manager':          email.policy.EmailPolicy.content_manager,
      39          'mangle_from_':             False,
      40          'message_factory':          email.message.EmailMessage,
      41          })
      42  
      43      # For each policy under test, we give here what we expect the defaults to
      44      # be for that policy.  The second argument to make defaults is the
      45      # difference between the base defaults and that for the particular policy.
      46      new_policy = email.policy.EmailPolicy()
      47      policies = {
      48          email.policy.compat32: make_defaults(compat32_defaults, {}),
      49          email.policy.default: make_defaults(policy_defaults, {}),
      50          email.policy.SMTP: make_defaults(policy_defaults,
      51                                           {'linesep': '\r\n'}),
      52          email.policy.SMTPUTF8: make_defaults(policy_defaults,
      53                                               {'linesep': '\r\n',
      54                                                'utf8': True}),
      55          email.policy.HTTP: make_defaults(policy_defaults,
      56                                           {'linesep': '\r\n',
      57                                            'max_line_length': None}),
      58          email.policy.strict: make_defaults(policy_defaults,
      59                                             {'raise_on_defect': True}),
      60          new_policy: make_defaults(policy_defaults, {}),
      61          }
      62      # Creating a new policy creates a new header factory.  There is a test
      63      # later that proves this.
      64      policies[new_policy]['header_factory'] = new_policy.header_factory
      65  
      66      def test_defaults(self):
      67          for policy, expected in self.policies.items():
      68              for attr, value in expected.items():
      69                  with self.subTest(policy=policy, attr=attr):
      70                      self.assertEqual(getattr(policy, attr), value,
      71                                      ("change {} docs/docstrings if defaults have "
      72                                      "changed").format(policy))
      73  
      74      def test_all_attributes_covered(self):
      75          for policy, expected in self.policies.items():
      76              for attr in dir(policy):
      77                  with self.subTest(policy=policy, attr=attr):
      78                      if (attr.startswith('_') or
      79                              isinstance(getattr(email.policy.EmailPolicy, attr),
      80                                    types.FunctionType)):
      81                          continue
      82                      else:
      83                          self.assertIn(attr, expected,
      84                                        "{} is not fully tested".format(attr))
      85  
      86      def test_abc(self):
      87          with self.assertRaises(TypeError) as cm:
      88              email.policy.Policy()
      89          msg = str(cm.exception)
      90          abstract_methods = ('fold',
      91                              'fold_binary',
      92                              'header_fetch_parse',
      93                              'header_source_parse',
      94                              'header_store_parse')
      95          for method in abstract_methods:
      96              self.assertIn(method, msg)
      97  
      98      def test_policy_is_immutable(self):
      99          for policy, defaults in self.policies.items():
     100              for attr in defaults:
     101                  with self.assertRaisesRegex(AttributeError, attr+".*read-only"):
     102                      setattr(policy, attr, None)
     103              with self.assertRaisesRegex(AttributeError, 'no attribute.*foo'):
     104                  policy.foo = None
     105  
     106      def test_set_policy_attrs_when_cloned(self):
     107          # None of the attributes has a default value of None, so we set them
     108          # all to None in the clone call and check that it worked.
     109          for policyclass, defaults in self.policies.items():
     110              testattrdict = {attr: None for attr in defaults}
     111              policy = policyclass.clone(**testattrdict)
     112              for attr in defaults:
     113                  self.assertIsNone(getattr(policy, attr))
     114  
     115      def test_reject_non_policy_keyword_when_called(self):
     116          for policyclass in self.policies:
     117              with self.assertRaises(TypeError):
     118                  policyclass(this_keyword_should_not_be_valid=None)
     119              with self.assertRaises(TypeError):
     120                  policyclass(newtline=None)
     121  
     122      def test_policy_addition(self):
     123          expected = self.policy_defaults.copy()
     124          p1 = email.policy.default.clone(max_line_length=100)
     125          p2 = email.policy.default.clone(max_line_length=50)
     126          added = p1 + p2
     127          expected.update(max_line_length=50)
     128          for attr, value in expected.items():
     129              self.assertEqual(getattr(added, attr), value)
     130          added = p2 + p1
     131          expected.update(max_line_length=100)
     132          for attr, value in expected.items():
     133              self.assertEqual(getattr(added, attr), value)
     134          added = added + email.policy.default
     135          for attr, value in expected.items():
     136              self.assertEqual(getattr(added, attr), value)
     137  
     138      def test_fold_zero_max_line_length(self):
     139          expected = 'Subject: =?utf-8?q?=C3=A1?=\n'
     140  
     141          msg = email.message.EmailMessage()
     142          msg['Subject'] = 'á'
     143  
     144          p1 = email.policy.default.clone(max_line_length=0)
     145          p2 = email.policy.default.clone(max_line_length=None)
     146  
     147          self.assertEqual(p1.fold('Subject', msg['Subject']), expected)
     148          self.assertEqual(p2.fold('Subject', msg['Subject']), expected)
     149  
     150      def test_register_defect(self):
     151          class ESC[4;38;5;81mDummy:
     152              def __init__(self):
     153                  self.defects = []
     154          obj = Dummy()
     155          defect = object()
     156          policy = email.policy.EmailPolicy()
     157          policy.register_defect(obj, defect)
     158          self.assertEqual(obj.defects, [defect])
     159          defect2 = object()
     160          policy.register_defect(obj, defect2)
     161          self.assertEqual(obj.defects, [defect, defect2])
     162  
     163      class ESC[4;38;5;81mMyObj:
     164          def __init__(self):
     165              self.defects = []
     166  
     167      class ESC[4;38;5;81mMyDefect(ESC[4;38;5;149mException):
     168          pass
     169  
     170      def test_handle_defect_raises_on_strict(self):
     171          foo = self.MyObj()
     172          defect = self.MyDefect("the telly is broken")
     173          with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
     174              email.policy.strict.handle_defect(foo, defect)
     175  
     176      def test_handle_defect_registers_defect(self):
     177          foo = self.MyObj()
     178          defect1 = self.MyDefect("one")
     179          email.policy.default.handle_defect(foo, defect1)
     180          self.assertEqual(foo.defects, [defect1])
     181          defect2 = self.MyDefect("two")
     182          email.policy.default.handle_defect(foo, defect2)
     183          self.assertEqual(foo.defects, [defect1, defect2])
     184  
     185      class ESC[4;38;5;81mMyPolicy(ESC[4;38;5;149memailESC[4;38;5;149m.ESC[4;38;5;149mpolicyESC[4;38;5;149m.ESC[4;38;5;149mEmailPolicy):
     186          defects = None
     187          def __init__(self, *args, **kw):
     188              super().__init__(*args, defects=[], **kw)
     189          def register_defect(self, obj, defect):
     190              self.defects.append(defect)
     191  
     192      def test_overridden_register_defect_still_raises(self):
     193          foo = self.MyObj()
     194          defect = self.MyDefect("the telly is broken")
     195          with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
     196              self.MyPolicy(raise_on_defect=True).handle_defect(foo, defect)
     197  
     198      def test_overridden_register_defect_works(self):
     199          foo = self.MyObj()
     200          defect1 = self.MyDefect("one")
     201          my_policy = self.MyPolicy()
     202          my_policy.handle_defect(foo, defect1)
     203          self.assertEqual(my_policy.defects, [defect1])
     204          self.assertEqual(foo.defects, [])
     205          defect2 = self.MyDefect("two")
     206          my_policy.handle_defect(foo, defect2)
     207          self.assertEqual(my_policy.defects, [defect1, defect2])
     208          self.assertEqual(foo.defects, [])
     209  
     210      def test_default_header_factory(self):
     211          h = email.policy.default.header_factory('Test', 'test')
     212          self.assertEqual(h.name, 'Test')
     213          self.assertIsInstance(h, headerregistry.UnstructuredHeader)
     214          self.assertIsInstance(h, headerregistry.BaseHeader)
     215  
     216      class ESC[4;38;5;81mFoo:
     217          parse = headerregistry.UnstructuredHeader.parse
     218  
     219      def test_each_Policy_gets_unique_factory(self):
     220          policy1 = email.policy.EmailPolicy()
     221          policy2 = email.policy.EmailPolicy()
     222          policy1.header_factory.map_to_type('foo', self.Foo)
     223          h = policy1.header_factory('foo', 'test')
     224          self.assertIsInstance(h, self.Foo)
     225          self.assertNotIsInstance(h, headerregistry.UnstructuredHeader)
     226          h = policy2.header_factory('foo', 'test')
     227          self.assertNotIsInstance(h, self.Foo)
     228          self.assertIsInstance(h, headerregistry.UnstructuredHeader)
     229  
     230      def test_clone_copies_factory(self):
     231          policy1 = email.policy.EmailPolicy()
     232          policy2 = policy1.clone()
     233          policy1.header_factory.map_to_type('foo', self.Foo)
     234          h = policy1.header_factory('foo', 'test')
     235          self.assertIsInstance(h, self.Foo)
     236          h = policy2.header_factory('foo', 'test')
     237          self.assertIsInstance(h, self.Foo)
     238  
     239      def test_new_factory_overrides_default(self):
     240          mypolicy = email.policy.EmailPolicy()
     241          myfactory = mypolicy.header_factory
     242          newpolicy = mypolicy + email.policy.strict
     243          self.assertEqual(newpolicy.header_factory, myfactory)
     244          newpolicy = email.policy.strict + mypolicy
     245          self.assertEqual(newpolicy.header_factory, myfactory)
     246  
     247      def test_adding_default_policies_preserves_default_factory(self):
     248          newpolicy = email.policy.default + email.policy.strict
     249          self.assertEqual(newpolicy.header_factory,
     250                           email.policy.EmailPolicy.header_factory)
     251          self.assertEqual(newpolicy.__dict__, {'raise_on_defect': True})
     252  
     253      def test_non_ascii_chars_do_not_cause_inf_loop(self):
     254          policy = email.policy.default.clone(max_line_length=20)
     255          actual = policy.fold('Subject', 'ą' * 12)
     256          self.assertEqual(
     257              actual,
     258              'Subject: \n' +
     259              12 * ' =?utf-8?q?=C4=85?=\n')
     260  
     261      def test_short_maxlen_error(self):
     262          # RFC 2047 chrome takes up 7 characters, plus the length of the charset
     263          # name, so folding should fail if maxlen is lower than the minimum
     264          # required length for a line.
     265  
     266          # Note: This is only triggered when there is a single word longer than
     267          # max_line_length, hence the 1234567890 at the end of this whimsical
     268          # subject. This is because when we encounter a word longer than
     269          # max_line_length, it is broken down into encoded words to fit
     270          # max_line_length. If the max_line_length isn't large enough to even
     271          # contain the RFC 2047 chrome (`?=<charset>?q??=`), we fail.
     272          subject = "Melt away the pounds with this one simple trick! 1234567890"
     273  
     274          for maxlen in [3, 7, 9]:
     275              with self.subTest(maxlen=maxlen):
     276                  policy = email.policy.default.clone(max_line_length=maxlen)
     277                  with self.assertRaises(email.errors.HeaderParseError):
     278                      policy.fold("Subject", subject)
     279  
     280      # XXX: Need subclassing tests.
     281      # For adding subclassed objects, make sure the usual rules apply (subclass
     282      # wins), but that the order still works (right overrides left).
     283  
     284  
     285  class ESC[4;38;5;81mTestException(ESC[4;38;5;149mException):
     286      pass
     287  
     288  class ESC[4;38;5;81mTestPolicyPropagation(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     289  
     290      # The abstract methods are used by the parser but not by the wrapper
     291      # functions that call it, so if the exception gets raised we know that the
     292      # policy was actually propagated all the way to feedparser.
     293      class ESC[4;38;5;81mMyPolicy(ESC[4;38;5;149memailESC[4;38;5;149m.ESC[4;38;5;149mpolicyESC[4;38;5;149m.ESC[4;38;5;149mPolicy):
     294          def badmethod(self, *args, **kw):
     295              raise TestException("test")
     296          fold = fold_binary = header_fetch_parser = badmethod
     297          header_source_parse = header_store_parse = badmethod
     298  
     299      def test_message_from_string(self):
     300          with self.assertRaisesRegex(TestException, "^test$"):
     301              email.message_from_string("Subject: test\n\n",
     302                                        policy=self.MyPolicy)
     303  
     304      def test_message_from_bytes(self):
     305          with self.assertRaisesRegex(TestException, "^test$"):
     306              email.message_from_bytes(b"Subject: test\n\n",
     307                                       policy=self.MyPolicy)
     308  
     309      def test_message_from_file(self):
     310          f = io.StringIO('Subject: test\n\n')
     311          with self.assertRaisesRegex(TestException, "^test$"):
     312              email.message_from_file(f, policy=self.MyPolicy)
     313  
     314      def test_message_from_binary_file(self):
     315          f = io.BytesIO(b'Subject: test\n\n')
     316          with self.assertRaisesRegex(TestException, "^test$"):
     317              email.message_from_binary_file(f, policy=self.MyPolicy)
     318  
     319      # These are redundant, but we need them for black-box completeness.
     320  
     321      def test_parser(self):
     322          p = email.parser.Parser(policy=self.MyPolicy)
     323          with self.assertRaisesRegex(TestException, "^test$"):
     324              p.parsestr('Subject: test\n\n')
     325  
     326      def test_bytes_parser(self):
     327          p = email.parser.BytesParser(policy=self.MyPolicy)
     328          with self.assertRaisesRegex(TestException, "^test$"):
     329              p.parsebytes(b'Subject: test\n\n')
     330  
     331      # Now that we've established that all the parse methods get the
     332      # policy in to feedparser, we can use message_from_string for
     333      # the rest of the propagation tests.
     334  
     335      def _make_msg(self, source='Subject: test\n\n', policy=None):
     336          self.policy = email.policy.default.clone() if policy is None else policy
     337          return email.message_from_string(source, policy=self.policy)
     338  
     339      def test_parser_propagates_policy_to_message(self):
     340          msg = self._make_msg()
     341          self.assertIs(msg.policy, self.policy)
     342  
     343      def test_parser_propagates_policy_to_sub_messages(self):
     344          msg = self._make_msg(textwrap.dedent("""\
     345              Subject: mime test
     346              MIME-Version: 1.0
     347              Content-Type: multipart/mixed, boundary="XXX"
     348  
     349              --XXX
     350              Content-Type: text/plain
     351  
     352              test
     353              --XXX
     354              Content-Type: text/plain
     355  
     356              test2
     357              --XXX--
     358              """))
     359          for part in msg.walk():
     360              self.assertIs(part.policy, self.policy)
     361  
     362      def test_message_policy_propagates_to_generator(self):
     363          msg = self._make_msg("Subject: test\nTo: foo\n\n",
     364                               policy=email.policy.default.clone(linesep='X'))
     365          s = io.StringIO()
     366          g = email.generator.Generator(s)
     367          g.flatten(msg)
     368          self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX")
     369  
     370      def test_message_policy_used_by_as_string(self):
     371          msg = self._make_msg("Subject: test\nTo: foo\n\n",
     372                               policy=email.policy.default.clone(linesep='X'))
     373          self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX")
     374  
     375  
     376  class ESC[4;38;5;81mTestConcretePolicies(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     377  
     378      def test_header_store_parse_rejects_newlines(self):
     379          instance = email.policy.EmailPolicy()
     380          self.assertRaises(ValueError,
     381                            instance.header_store_parse,
     382                            'From', 'spam\negg@foo.py')
     383  
     384  
     385  if __name__ == '__main__':
     386      unittest.main()