(root)/
Python-3.11.7/
Lib/
test/
test_email/
test_defect_handling.py
       1  import textwrap
       2  import unittest
       3  import contextlib
       4  from email import policy
       5  from email import errors
       6  from test.test_email import TestEmailBase
       7  
       8  
       9  class ESC[4;38;5;81mTestDefectsBase:
      10  
      11      policy = policy.default
      12      raise_expected = False
      13  
      14      @contextlib.contextmanager
      15      def _raise_point(self, defect):
      16          yield
      17  
      18      def test_same_boundary_inner_outer(self):
      19          source = textwrap.dedent("""\
      20              Subject: XX
      21              From: xx@xx.dk
      22              To: XX
      23              Mime-version: 1.0
      24              Content-type: multipart/mixed;
      25                 boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
      26  
      27              --MS_Mac_OE_3071477847_720252_MIME_Part
      28              Content-type: multipart/alternative;
      29                 boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
      30  
      31              --MS_Mac_OE_3071477847_720252_MIME_Part
      32              Content-type: text/plain; charset="ISO-8859-1"
      33              Content-transfer-encoding: quoted-printable
      34  
      35              text
      36  
      37              --MS_Mac_OE_3071477847_720252_MIME_Part
      38              Content-type: text/html; charset="ISO-8859-1"
      39              Content-transfer-encoding: quoted-printable
      40  
      41              <HTML></HTML>
      42  
      43              --MS_Mac_OE_3071477847_720252_MIME_Part--
      44  
      45              --MS_Mac_OE_3071477847_720252_MIME_Part
      46              Content-type: image/gif; name="xx.gif";
      47              Content-disposition: attachment
      48              Content-transfer-encoding: base64
      49  
      50              Some removed base64 encoded chars.
      51  
      52              --MS_Mac_OE_3071477847_720252_MIME_Part--
      53  
      54              """)
      55          # XXX better would be to actually detect the duplicate.
      56          with self._raise_point(errors.StartBoundaryNotFoundDefect):
      57              msg = self._str_msg(source)
      58          if self.raise_expected: return
      59          inner = msg.get_payload(0)
      60          self.assertTrue(hasattr(inner, 'defects'))
      61          self.assertEqual(len(self.get_defects(inner)), 1)
      62          self.assertIsInstance(self.get_defects(inner)[0],
      63                                errors.StartBoundaryNotFoundDefect)
      64  
      65      def test_multipart_no_boundary(self):
      66          source = textwrap.dedent("""\
      67              Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
      68              From: foobar
      69              Subject: broken mail
      70              MIME-Version: 1.0
      71              Content-Type: multipart/report; report-type=delivery-status;
      72  
      73              --JAB03225.986577786/zinfandel.lacita.com
      74  
      75              One part
      76  
      77              --JAB03225.986577786/zinfandel.lacita.com
      78              Content-Type: message/delivery-status
      79  
      80              Header: Another part
      81  
      82              --JAB03225.986577786/zinfandel.lacita.com--
      83              """)
      84          with self._raise_point(errors.NoBoundaryInMultipartDefect):
      85              msg = self._str_msg(source)
      86          if self.raise_expected: return
      87          self.assertIsInstance(msg.get_payload(), str)
      88          self.assertEqual(len(self.get_defects(msg)), 2)
      89          self.assertIsInstance(self.get_defects(msg)[0],
      90                                errors.NoBoundaryInMultipartDefect)
      91          self.assertIsInstance(self.get_defects(msg)[1],
      92                                errors.MultipartInvariantViolationDefect)
      93  
      94      multipart_msg = textwrap.dedent("""\
      95          Date: Wed, 14 Nov 2007 12:56:23 GMT
      96          From: foo@bar.invalid
      97          To: foo@bar.invalid
      98          Subject: Content-Transfer-Encoding: base64 and multipart
      99          MIME-Version: 1.0
     100          Content-Type: multipart/mixed;
     101              boundary="===============3344438784458119861=="{}
     102  
     103          --===============3344438784458119861==
     104          Content-Type: text/plain
     105  
     106          Test message
     107  
     108          --===============3344438784458119861==
     109          Content-Type: application/octet-stream
     110          Content-Transfer-Encoding: base64
     111  
     112          YWJj
     113  
     114          --===============3344438784458119861==--
     115          """)
     116  
     117      def test_multipart_invalid_cte(self):
     118          with self._raise_point(
     119                  errors.InvalidMultipartContentTransferEncodingDefect):
     120              msg = self._str_msg(
     121                      self.multipart_msg.format(
     122                          "\nContent-Transfer-Encoding: base64"))
     123          if self.raise_expected: return
     124          self.assertEqual(len(self.get_defects(msg)), 1)
     125          self.assertIsInstance(self.get_defects(msg)[0],
     126              errors.InvalidMultipartContentTransferEncodingDefect)
     127  
     128      def test_multipart_no_cte_no_defect(self):
     129          if self.raise_expected: return
     130          msg = self._str_msg(self.multipart_msg.format(''))
     131          self.assertEqual(len(self.get_defects(msg)), 0)
     132  
     133      def test_multipart_valid_cte_no_defect(self):
     134          if self.raise_expected: return
     135          for cte in ('7bit', '8bit', 'BINary'):
     136              msg = self._str_msg(
     137                  self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
     138              self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
     139  
     140      def test_lying_multipart(self):
     141          source = textwrap.dedent("""\
     142              From: "Allison Dunlap" <xxx@example.com>
     143              To: yyy@example.com
     144              Subject: 64423
     145              Date: Sun, 11 Jul 2004 16:09:27 -0300
     146              MIME-Version: 1.0
     147              Content-Type: multipart/alternative;
     148  
     149              Blah blah blah
     150              """)
     151          with self._raise_point(errors.NoBoundaryInMultipartDefect):
     152              msg = self._str_msg(source)
     153          if self.raise_expected: return
     154          self.assertTrue(hasattr(msg, 'defects'))
     155          self.assertEqual(len(self.get_defects(msg)), 2)
     156          self.assertIsInstance(self.get_defects(msg)[0],
     157                                errors.NoBoundaryInMultipartDefect)
     158          self.assertIsInstance(self.get_defects(msg)[1],
     159                                errors.MultipartInvariantViolationDefect)
     160  
     161      def test_missing_start_boundary(self):
     162          source = textwrap.dedent("""\
     163              Content-Type: multipart/mixed; boundary="AAA"
     164              From: Mail Delivery Subsystem <xxx@example.com>
     165              To: yyy@example.com
     166  
     167              --AAA
     168  
     169              Stuff
     170  
     171              --AAA
     172              Content-Type: message/rfc822
     173  
     174              From: webmaster@python.org
     175              To: zzz@example.com
     176              Content-Type: multipart/mixed; boundary="BBB"
     177  
     178              --BBB--
     179  
     180              --AAA--
     181  
     182              """)
     183          # The message structure is:
     184          #
     185          # multipart/mixed
     186          #    text/plain
     187          #    message/rfc822
     188          #        multipart/mixed [*]
     189          #
     190          # [*] This message is missing its start boundary
     191          with self._raise_point(errors.StartBoundaryNotFoundDefect):
     192              outer = self._str_msg(source)
     193          if self.raise_expected: return
     194          bad = outer.get_payload(1).get_payload(0)
     195          self.assertEqual(len(self.get_defects(bad)), 1)
     196          self.assertIsInstance(self.get_defects(bad)[0],
     197                                errors.StartBoundaryNotFoundDefect)
     198  
     199      def test_first_line_is_continuation_header(self):
     200          with self._raise_point(errors.FirstHeaderLineIsContinuationDefect):
     201              msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
     202          if self.raise_expected: return
     203          self.assertEqual(msg.keys(), ['Subject'])
     204          self.assertEqual(msg.get_payload(), 'body')
     205          self.assertEqual(len(self.get_defects(msg)), 1)
     206          self.assertDefectsEqual(self.get_defects(msg),
     207                                   [errors.FirstHeaderLineIsContinuationDefect])
     208          self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
     209  
     210      def test_missing_header_body_separator(self):
     211          # Our heuristic if we see a line that doesn't look like a header (no
     212          # leading whitespace but no ':') is to assume that the blank line that
     213          # separates the header from the body is missing, and to stop parsing
     214          # headers and start parsing the body.
     215          with self._raise_point(errors.MissingHeaderBodySeparatorDefect):
     216              msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
     217          if self.raise_expected: return
     218          self.assertEqual(msg.keys(), ['Subject'])
     219          self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
     220          self.assertDefectsEqual(self.get_defects(msg),
     221                                  [errors.MissingHeaderBodySeparatorDefect])
     222  
     223      def test_bad_padding_in_base64_payload(self):
     224          source = textwrap.dedent("""\
     225              Subject: test
     226              MIME-Version: 1.0
     227              Content-Type: text/plain; charset="utf-8"
     228              Content-Transfer-Encoding: base64
     229  
     230              dmk
     231              """)
     232          msg = self._str_msg(source)
     233          with self._raise_point(errors.InvalidBase64PaddingDefect):
     234              payload = msg.get_payload(decode=True)
     235          if self.raise_expected: return
     236          self.assertEqual(payload, b'vi')
     237          self.assertDefectsEqual(self.get_defects(msg),
     238                                  [errors.InvalidBase64PaddingDefect])
     239  
     240      def test_invalid_chars_in_base64_payload(self):
     241          source = textwrap.dedent("""\
     242              Subject: test
     243              MIME-Version: 1.0
     244              Content-Type: text/plain; charset="utf-8"
     245              Content-Transfer-Encoding: base64
     246  
     247              dm\x01k===
     248              """)
     249          msg = self._str_msg(source)
     250          with self._raise_point(errors.InvalidBase64CharactersDefect):
     251              payload = msg.get_payload(decode=True)
     252          if self.raise_expected: return
     253          self.assertEqual(payload, b'vi')
     254          self.assertDefectsEqual(self.get_defects(msg),
     255                                  [errors.InvalidBase64CharactersDefect])
     256  
     257      def test_invalid_length_of_base64_payload(self):
     258          source = textwrap.dedent("""\
     259              Subject: test
     260              MIME-Version: 1.0
     261              Content-Type: text/plain; charset="utf-8"
     262              Content-Transfer-Encoding: base64
     263  
     264              abcde
     265              """)
     266          msg = self._str_msg(source)
     267          with self._raise_point(errors.InvalidBase64LengthDefect):
     268              payload = msg.get_payload(decode=True)
     269          if self.raise_expected: return
     270          self.assertEqual(payload, b'abcde')
     271          self.assertDefectsEqual(self.get_defects(msg),
     272                                  [errors.InvalidBase64LengthDefect])
     273  
     274      def test_missing_ending_boundary(self):
     275          source = textwrap.dedent("""\
     276              To: 1@harrydomain4.com
     277              Subject: Fwd: 1
     278              MIME-Version: 1.0
     279              Content-Type: multipart/alternative;
     280               boundary="------------000101020201080900040301"
     281  
     282              --------------000101020201080900040301
     283              Content-Type: text/plain; charset=ISO-8859-1
     284              Content-Transfer-Encoding: 7bit
     285  
     286              Alternative 1
     287  
     288              --------------000101020201080900040301
     289              Content-Type: text/html; charset=ISO-8859-1
     290              Content-Transfer-Encoding: 7bit
     291  
     292              Alternative 2
     293  
     294              """)
     295          with self._raise_point(errors.CloseBoundaryNotFoundDefect):
     296              msg = self._str_msg(source)
     297          if self.raise_expected: return
     298          self.assertEqual(len(msg.get_payload()), 2)
     299          self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n')
     300          self.assertDefectsEqual(self.get_defects(msg),
     301                                  [errors.CloseBoundaryNotFoundDefect])
     302  
     303  
     304  class ESC[4;38;5;81mTestDefectDetection(ESC[4;38;5;149mTestDefectsBase, ESC[4;38;5;149mTestEmailBase):
     305  
     306      def get_defects(self, obj):
     307          return obj.defects
     308  
     309  
     310  class ESC[4;38;5;81mTestDefectCapture(ESC[4;38;5;149mTestDefectsBase, ESC[4;38;5;149mTestEmailBase):
     311  
     312      class ESC[4;38;5;81mCapturePolicy(ESC[4;38;5;149mpolicyESC[4;38;5;149m.ESC[4;38;5;149mEmailPolicy):
     313          captured = None
     314          def register_defect(self, obj, defect):
     315              self.captured.append(defect)
     316  
     317      def setUp(self):
     318          self.policy = self.CapturePolicy(captured=list())
     319  
     320      def get_defects(self, obj):
     321          return self.policy.captured
     322  
     323  
     324  class ESC[4;38;5;81mTestDefectRaising(ESC[4;38;5;149mTestDefectsBase, ESC[4;38;5;149mTestEmailBase):
     325  
     326      policy = TestDefectsBase.policy
     327      policy = policy.clone(raise_on_defect=True)
     328      raise_expected = True
     329  
     330      @contextlib.contextmanager
     331      def _raise_point(self, defect):
     332          with self.assertRaises(defect):
     333              yield
     334  
     335  
     336  if __name__ == '__main__':
     337      unittest.main()