1  import unittest
       2  from test.test_email import TestEmailBase, parameterize
       3  import textwrap
       4  from email import policy
       5  from email.message import EmailMessage
       6  from email.contentmanager import ContentManager, raw_data_manager
       7  
       8  
       9  @parameterize
      10  class ESC[4;38;5;81mTestContentManager(ESC[4;38;5;149mTestEmailBase):
      11  
      12      policy = policy.default
      13      message = EmailMessage
      14  
      15      get_key_params = {
      16          'full_type':        (1, 'text/plain',),
      17          'maintype_only':    (2, 'text',),
      18          'null_key':         (3, '',),
      19          }
      20  
      21      def get_key_as_get_content_key(self, order, key):
      22          def foo_getter(msg, foo=None):
      23              bar = msg['X-Bar-Header']
      24              return foo, bar
      25          cm = ContentManager()
      26          cm.add_get_handler(key, foo_getter)
      27          m = self._make_message()
      28          m['Content-Type'] = 'text/plain'
      29          m['X-Bar-Header'] = 'foo'
      30          self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo'))
      31  
      32      def get_key_as_get_content_key_order(self, order, key):
      33          def bar_getter(msg):
      34              return msg['X-Bar-Header']
      35          def foo_getter(msg):
      36              return msg['X-Foo-Header']
      37          cm = ContentManager()
      38          cm.add_get_handler(key, foo_getter)
      39          for precedence, key in self.get_key_params.values():
      40              if precedence > order:
      41                  cm.add_get_handler(key, bar_getter)
      42          m = self._make_message()
      43          m['Content-Type'] = 'text/plain'
      44          m['X-Bar-Header'] = 'bar'
      45          m['X-Foo-Header'] = 'foo'
      46          self.assertEqual(cm.get_content(m), ('foo'))
      47  
      48      def test_get_content_raises_if_unknown_mimetype_and_no_default(self):
      49          cm = ContentManager()
      50          m = self._make_message()
      51          m['Content-Type'] = 'text/plain'
      52          with self.assertRaisesRegex(KeyError, 'text/plain'):
      53              cm.get_content(m)
      54  
      55      class ESC[4;38;5;81mBaseThing(ESC[4;38;5;149mstr):
      56          pass
      57      baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing'
      58      class ESC[4;38;5;81mThing(ESC[4;38;5;149mBaseThing):
      59          pass
      60      testobject_full_path = __name__ + '.' + 'TestContentManager.Thing'
      61  
      62      set_key_params = {
      63          'type':             (0,  Thing,),
      64          'full_path':        (1,  testobject_full_path,),
      65          'qualname':         (2,  'TestContentManager.Thing',),
      66          'name':             (3,  'Thing',),
      67          'base_type':        (4,  BaseThing,),
      68          'base_full_path':   (5,  baseobject_full_path,),
      69          'base_qualname':    (6,  'TestContentManager.BaseThing',),
      70          'base_name':        (7,  'BaseThing',),
      71          'str_type':         (8,  str,),
      72          'str_full_path':    (9,  'builtins.str',),
      73          'str_name':         (10, 'str',),   # str name and qualname are the same
      74          'null_key':         (11, None,),
      75          }
      76  
      77      def set_key_as_set_content_key(self, order, key):
      78          def foo_setter(msg, obj, foo=None):
      79              msg['X-Foo-Header'] = foo
      80              msg.set_payload(obj)
      81          cm = ContentManager()
      82          cm.add_set_handler(key, foo_setter)
      83          m = self._make_message()
      84          msg_obj = self.Thing()
      85          cm.set_content(m, msg_obj, foo='bar')
      86          self.assertEqual(m['X-Foo-Header'], 'bar')
      87          self.assertEqual(m.get_payload(), msg_obj)
      88  
      89      def set_key_as_set_content_key_order(self, order, key):
      90          def foo_setter(msg, obj):
      91              msg['X-FooBar-Header'] = 'foo'
      92              msg.set_payload(obj)
      93          def bar_setter(msg, obj):
      94              msg['X-FooBar-Header'] = 'bar'
      95          cm = ContentManager()
      96          cm.add_set_handler(key, foo_setter)
      97          for precedence, key in self.get_key_params.values():
      98              if precedence > order:
      99                  cm.add_set_handler(key, bar_setter)
     100          m = self._make_message()
     101          msg_obj = self.Thing()
     102          cm.set_content(m, msg_obj)
     103          self.assertEqual(m['X-FooBar-Header'], 'foo')
     104          self.assertEqual(m.get_payload(), msg_obj)
     105  
     106      def test_set_content_raises_if_unknown_type_and_no_default(self):
     107          cm = ContentManager()
     108          m = self._make_message()
     109          msg_obj = self.Thing()
     110          with self.assertRaisesRegex(KeyError, self.testobject_full_path):
     111              cm.set_content(m, msg_obj)
     112  
     113      def test_set_content_raises_if_called_on_multipart(self):
     114          cm = ContentManager()
     115          m = self._make_message()
     116          m['Content-Type'] = 'multipart/foo'
     117          with self.assertRaises(TypeError):
     118              cm.set_content(m, 'test')
     119  
     120      def test_set_content_calls_clear_content(self):
     121          m = self._make_message()
     122          m['Content-Foo'] = 'bar'
     123          m['Content-Type'] = 'text/html'
     124          m['To'] = 'test'
     125          m.set_payload('abc')
     126          cm = ContentManager()
     127          cm.add_set_handler(str, lambda *args, **kw: None)
     128          m.set_content('xyz', content_manager=cm)
     129          self.assertIsNone(m['Content-Foo'])
     130          self.assertIsNone(m['Content-Type'])
     131          self.assertEqual(m['To'], 'test')
     132          self.assertIsNone(m.get_payload())
     133  
     134  
     135  @parameterize
     136  class ESC[4;38;5;81mTestRawDataManager(ESC[4;38;5;149mTestEmailBase):
     137      # Note: these tests are dependent on the order in which headers are added
     138      # to the message objects by the code.  There's no defined ordering in
     139      # RFC5322/MIME, so this makes the tests more fragile than the standards
     140      # require.  However, if the header order changes it is best to understand
     141      # *why*, and make sure it isn't a subtle bug in whatever change was
     142      # applied.
     143  
     144      policy = policy.default.clone(max_line_length=60,
     145                                    content_manager=raw_data_manager)
     146      message = EmailMessage
     147  
     148      def test_get_text_plain(self):
     149          m = self._str_msg(textwrap.dedent("""\
     150              Content-Type: text/plain
     151  
     152              Basic text.
     153              """))
     154          self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n")
     155  
     156      def test_get_text_html(self):
     157          m = self._str_msg(textwrap.dedent("""\
     158              Content-Type: text/html
     159  
     160              <p>Basic text.</p>
     161              """))
     162          self.assertEqual(raw_data_manager.get_content(m),
     163                           "<p>Basic text.</p>\n")
     164  
     165      def test_get_text_plain_latin1(self):
     166          m = self._bytes_msg(textwrap.dedent("""\
     167              Content-Type: text/plain; charset=latin1
     168  
     169              Basìc tëxt.
     170              """).encode('latin1'))
     171          self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
     172  
     173      def test_get_text_plain_latin1_quoted_printable(self):
     174          m = self._str_msg(textwrap.dedent("""\
     175              Content-Type: text/plain; charset="latin-1"
     176              Content-Transfer-Encoding: quoted-printable
     177  
     178              Bas=ECc t=EBxt.
     179              """))
     180          self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
     181  
     182      def test_get_text_plain_utf8_base64(self):
     183          m = self._str_msg(textwrap.dedent("""\
     184              Content-Type: text/plain; charset="utf8"
     185              Content-Transfer-Encoding: base64
     186  
     187              QmFzw6xjIHTDq3h0Lgo=
     188              """))
     189          self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
     190  
     191      def test_get_text_plain_bad_utf8_quoted_printable(self):
     192          m = self._str_msg(textwrap.dedent("""\
     193              Content-Type: text/plain; charset="utf8"
     194              Content-Transfer-Encoding: quoted-printable
     195  
     196              Bas=c3=acc t=c3=abxt=fd.
     197              """))
     198          self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n")
     199  
     200      def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self):
     201          m = self._str_msg(textwrap.dedent("""\
     202              Content-Type: text/plain; charset="utf8"
     203              Content-Transfer-Encoding: quoted-printable
     204  
     205              Bas=c3=acc t=c3=abxt=fd.
     206              """))
     207          self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
     208                           "Basìc tëxt.\n")
     209  
     210      def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self):
     211          m = self._str_msg(textwrap.dedent("""\
     212              Content-Type: text/plain; charset="utf8"
     213              Content-Transfer-Encoding: base64
     214  
     215              QmFzw6xjIHTDq3h0Lgo\xFF=
     216              """))
     217          self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
     218                           "Basìc tëxt.\n")
     219  
     220      def test_get_text_invalid_keyword(self):
     221          m = self._str_msg(textwrap.dedent("""\
     222              Content-Type: text/plain
     223  
     224              Basic text.
     225              """))
     226          with self.assertRaises(TypeError):
     227              raw_data_manager.get_content(m, foo='ignore')
     228  
     229      def test_get_non_text(self):
     230          template = textwrap.dedent("""\
     231              Content-Type: {}
     232              Content-Transfer-Encoding: base64
     233  
     234              Ym9ndXMgZGF0YQ==
     235              """)
     236          for maintype in 'audio image video application'.split():
     237              with self.subTest(maintype=maintype):
     238                  m = self._str_msg(template.format(maintype+'/foo'))
     239                  self.assertEqual(raw_data_manager.get_content(m), b"bogus data")
     240  
     241      def test_get_non_text_invalid_keyword(self):
     242          m = self._str_msg(textwrap.dedent("""\
     243              Content-Type: image/jpg
     244              Content-Transfer-Encoding: base64
     245  
     246              Ym9ndXMgZGF0YQ==
     247              """))
     248          with self.assertRaises(TypeError):
     249              raw_data_manager.get_content(m, errors='ignore')
     250  
     251      def test_get_raises_on_multipart(self):
     252          m = self._str_msg(textwrap.dedent("""\
     253              Content-Type: multipart/mixed; boundary="==="
     254  
     255              --===
     256              --===--
     257              """))
     258          with self.assertRaises(KeyError):
     259              raw_data_manager.get_content(m)
     260  
     261      def test_get_message_rfc822_and_external_body(self):
     262          template = textwrap.dedent("""\
     263              Content-Type: message/{}
     264  
     265              To: foo@example.com
     266              From: bar@example.com
     267              Subject: example
     268  
     269              an example message
     270              """)
     271          for subtype in 'rfc822 external-body'.split():
     272              with self.subTest(subtype=subtype):
     273                  m = self._str_msg(template.format(subtype))
     274                  sub_msg = raw_data_manager.get_content(m)
     275                  self.assertIsInstance(sub_msg, self.message)
     276                  self.assertEqual(raw_data_manager.get_content(sub_msg),
     277                                   "an example message\n")
     278                  self.assertEqual(sub_msg['to'], 'foo@example.com')
     279                  self.assertEqual(sub_msg['from'].addresses[0].username, 'bar')
     280  
     281      def test_get_message_non_rfc822_or_external_body_yields_bytes(self):
     282          m = self._str_msg(textwrap.dedent("""\
     283              Content-Type: message/partial
     284  
     285              To: foo@example.com
     286              From: bar@example.com
     287              Subject: example
     288  
     289              The real body is in another message.
     290              """))
     291          self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex')
     292  
     293      def test_set_text_plain(self):
     294          m = self._make_message()
     295          content = "Simple message.\n"
     296          raw_data_manager.set_content(m, content)
     297          self.assertEqual(str(m), textwrap.dedent("""\
     298              Content-Type: text/plain; charset="utf-8"
     299              Content-Transfer-Encoding: 7bit
     300  
     301              Simple message.
     302              """))
     303          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
     304          self.assertEqual(m.get_content(), content)
     305  
     306      def test_set_text_plain_null(self):
     307          m = self._make_message()
     308          content = ''
     309          raw_data_manager.set_content(m, content)
     310          self.assertEqual(str(m), textwrap.dedent("""\
     311              Content-Type: text/plain; charset="utf-8"
     312              Content-Transfer-Encoding: 7bit
     313  
     314  
     315              """))
     316          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), '\n')
     317          self.assertEqual(m.get_content(), '\n')
     318  
     319      def test_set_text_html(self):
     320          m = self._make_message()
     321          content = "<p>Simple message.</p>\n"
     322          raw_data_manager.set_content(m, content, subtype='html')
     323          self.assertEqual(str(m), textwrap.dedent("""\
     324              Content-Type: text/html; charset="utf-8"
     325              Content-Transfer-Encoding: 7bit
     326  
     327              <p>Simple message.</p>
     328              """))
     329          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
     330          self.assertEqual(m.get_content(), content)
     331  
     332      def test_set_text_charset_latin_1(self):
     333          m = self._make_message()
     334          content = "Simple message.\n"
     335          raw_data_manager.set_content(m, content, charset='latin-1')
     336          self.assertEqual(str(m), textwrap.dedent("""\
     337              Content-Type: text/plain; charset="iso-8859-1"
     338              Content-Transfer-Encoding: 7bit
     339  
     340              Simple message.
     341              """))
     342          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
     343          self.assertEqual(m.get_content(), content)
     344  
     345      def test_set_text_plain_long_line_heuristics(self):
     346          m = self._make_message()
     347          content = ("Simple but long message that is over 78 characters"
     348                     " long to force transfer encoding.\n")
     349          raw_data_manager.set_content(m, content)
     350          self.assertEqual(str(m), textwrap.dedent("""\
     351              Content-Type: text/plain; charset="utf-8"
     352              Content-Transfer-Encoding: quoted-printable
     353  
     354              Simple but long message that is over 78 characters long to =
     355              force transfer encoding.
     356              """))
     357          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
     358          self.assertEqual(m.get_content(), content)
     359  
     360      def test_set_text_short_line_minimal_non_ascii_heuristics(self):
     361          m = self._make_message()
     362          content = "et là il est monté sur moi et il commence à m'éto.\n"
     363          raw_data_manager.set_content(m, content)
     364          self.assertEqual(bytes(m), textwrap.dedent("""\
     365              Content-Type: text/plain; charset="utf-8"
     366              Content-Transfer-Encoding: 8bit
     367  
     368              et là il est monté sur moi et il commence à m'éto.
     369              """).encode('utf-8'))
     370          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
     371          self.assertEqual(m.get_content(), content)
     372  
     373      def test_set_text_long_line_minimal_non_ascii_heuristics(self):
     374          m = self._make_message()
     375          content = ("j'ai un problème de python. il est sorti de son"
     376                     " vivarium.  et là il est monté sur moi et il commence"
     377                     " à m'éto.\n")
     378          raw_data_manager.set_content(m, content)
     379          self.assertEqual(bytes(m), textwrap.dedent("""\
     380              Content-Type: text/plain; charset="utf-8"
     381              Content-Transfer-Encoding: quoted-printable
     382  
     383              j'ai un probl=C3=A8me de python. il est sorti de son vivari=
     384              um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
     385              =C3=A0 m'=C3=A9to.
     386              """).encode('utf-8'))
     387          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
     388          self.assertEqual(m.get_content(), content)
     389  
     390      def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self):
     391          m = self._make_message()
     392          content = '\n'*10 + (
     393                    "j'ai un problème de python. il est sorti de son"
     394                    " vivarium.  et là il est monté sur moi et il commence"
     395                    " à m'éto.\n")
     396          raw_data_manager.set_content(m, content)
     397          self.assertEqual(bytes(m), textwrap.dedent("""\
     398              Content-Type: text/plain; charset="utf-8"
     399              Content-Transfer-Encoding: quoted-printable
     400              """ + '\n'*10 + """
     401              j'ai un probl=C3=A8me de python. il est sorti de son vivari=
     402              um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
     403              =C3=A0 m'=C3=A9to.
     404              """).encode('utf-8'))
     405          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
     406          self.assertEqual(m.get_content(), content)
     407  
     408      def test_set_text_maximal_non_ascii_heuristics(self):
     409          m = self._make_message()
     410          content = "áàäéèęöő.\n"
     411          raw_data_manager.set_content(m, content)
     412          self.assertEqual(bytes(m), textwrap.dedent("""\
     413              Content-Type: text/plain; charset="utf-8"
     414              Content-Transfer-Encoding: 8bit
     415  
     416              áàäéèęöő.
     417              """).encode('utf-8'))
     418          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
     419          self.assertEqual(m.get_content(), content)
     420  
     421      def test_set_text_11_lines_maximal_non_ascii_heuristics(self):
     422          m = self._make_message()
     423          content = '\n'*10 + "áàäéèęöő.\n"
     424          raw_data_manager.set_content(m, content)
     425          self.assertEqual(bytes(m), textwrap.dedent("""\
     426              Content-Type: text/plain; charset="utf-8"
     427              Content-Transfer-Encoding: 8bit
     428              """ + '\n'*10 + """
     429              áàäéèęöő.
     430              """).encode('utf-8'))
     431          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
     432          self.assertEqual(m.get_content(), content)
     433  
     434      def test_set_text_long_line_maximal_non_ascii_heuristics(self):
     435          m = self._make_message()
     436          content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
     437                     "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
     438                     "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
     439          raw_data_manager.set_content(m, content)
     440          self.assertEqual(bytes(m), textwrap.dedent("""\
     441              Content-Type: text/plain; charset="utf-8"
     442              Content-Transfer-Encoding: base64
     443  
     444              w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD
     445              tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo
     446              xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD
     447              qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg
     448              w6TDqcOoxJnDtsWRLgo=
     449              """).encode('utf-8'))
     450          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
     451          self.assertEqual(m.get_content(), content)
     452  
     453      def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self):
     454          # Yes, it chooses "wrong" here.  It's a heuristic.  So this result
     455          # could change if we come up with a better heuristic.
     456          m = self._make_message()
     457          content = ('\n'*10 +
     458                     "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
     459                     "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
     460                     "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
     461          raw_data_manager.set_content(m, "\n"*10 +
     462                                          "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
     463                                          "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
     464                                          "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
     465          self.assertEqual(bytes(m), textwrap.dedent("""\
     466              Content-Type: text/plain; charset="utf-8"
     467              Content-Transfer-Encoding: quoted-printable
     468              """ + '\n'*10 + """
     469              =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=
     470              =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=
     471              =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=
     472              =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=
     473              =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=
     474              =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=
     475              =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=
     476              =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=
     477              =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=
     478              =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=
     479              =C5=91.
     480              """).encode('utf-8'))
     481          self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
     482          self.assertEqual(m.get_content(), content)
     483  
     484      def test_set_text_non_ascii_with_cte_7bit_raises(self):
     485          m = self._make_message()
     486          with self.assertRaises(UnicodeError):
     487              raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit')
     488  
     489      def test_set_text_non_ascii_with_charset_ascii_raises(self):
     490          m = self._make_message()
     491          with self.assertRaises(UnicodeError):
     492              raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii')
     493  
     494      def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self):
     495          m = self._make_message()
     496          with self.assertRaises(UnicodeError):
     497              raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii')
     498  
     499      def test_set_message(self):
     500          m = self._make_message()
     501          m['Subject'] = "Forwarded message"
     502          content = self._make_message()
     503          content['To'] = 'python@vivarium.org'
     504          content['From'] = 'police@monty.org'
     505          content['Subject'] = "get back in your box"
     506          content.set_content("Or face the comfy chair.")
     507          raw_data_manager.set_content(m, content)
     508          self.assertEqual(str(m), textwrap.dedent("""\
     509              Subject: Forwarded message
     510              Content-Type: message/rfc822
     511              Content-Transfer-Encoding: 8bit
     512  
     513              To: python@vivarium.org
     514              From: police@monty.org
     515              Subject: get back in your box
     516              Content-Type: text/plain; charset="utf-8"
     517              Content-Transfer-Encoding: 7bit
     518              MIME-Version: 1.0
     519  
     520              Or face the comfy chair.
     521              """))
     522          payload = m.get_payload(0)
     523          self.assertIsInstance(payload, self.message)
     524          self.assertEqual(str(payload), str(content))
     525          self.assertIsInstance(m.get_content(), self.message)
     526          self.assertEqual(str(m.get_content()), str(content))
     527  
     528      def test_set_message_with_non_ascii_and_coercion_to_7bit(self):
     529          m = self._make_message()
     530          m['Subject'] = "Escape report"
     531          content = self._make_message()
     532          content['To'] = 'police@monty.org'
     533          content['From'] = 'victim@monty.org'
     534          content['Subject'] = "Help"
     535          content.set_content("j'ai un problème de python. il est sorti de son"
     536                              " vivarium.")
     537          raw_data_manager.set_content(m, content)
     538          self.assertEqual(bytes(m), textwrap.dedent("""\
     539              Subject: Escape report
     540              Content-Type: message/rfc822
     541              Content-Transfer-Encoding: 8bit
     542  
     543              To: police@monty.org
     544              From: victim@monty.org
     545              Subject: Help
     546              Content-Type: text/plain; charset="utf-8"
     547              Content-Transfer-Encoding: 8bit
     548              MIME-Version: 1.0
     549  
     550              j'ai un problème de python. il est sorti de son vivarium.
     551              """).encode('utf-8'))
     552          # The choice of base64 for the body encoding is because generator
     553          # doesn't bother with heuristics and uses it unconditionally for utf-8
     554          # text.
     555          # XXX: the first cte should be 7bit, too...that's a generator bug.
     556          # XXX: the line length in the body also looks like a generator bug.
     557          self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length),
     558                           textwrap.dedent("""\
     559              Subject: Escape report
     560              Content-Type: message/rfc822
     561              Content-Transfer-Encoding: 8bit
     562  
     563              To: police@monty.org
     564              From: victim@monty.org
     565              Subject: Help
     566              Content-Type: text/plain; charset="utf-8"
     567              Content-Transfer-Encoding: base64
     568              MIME-Version: 1.0
     569  
     570              aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt
     571              Lgo=
     572              """))
     573          self.assertIsInstance(m.get_content(), self.message)
     574          self.assertEqual(str(m.get_content()), str(content))
     575  
     576      def test_set_message_invalid_cte_raises(self):
     577          m = self._make_message()
     578          content = self._make_message()
     579          for cte in 'quoted-printable base64'.split():
     580              for subtype in 'rfc822 external-body'.split():
     581                  with self.subTest(cte=cte, subtype=subtype):
     582                      with self.assertRaises(ValueError) as ar:
     583                          m.set_content(content, subtype, cte=cte)
     584                      exc = str(ar.exception)
     585                      self.assertIn(cte, exc)
     586                      self.assertIn(subtype, exc)
     587          subtype = 'external-body'
     588          for cte in '8bit binary'.split():
     589              with self.subTest(cte=cte, subtype=subtype):
     590                  with self.assertRaises(ValueError) as ar:
     591                      m.set_content(content, subtype, cte=cte)
     592                  exc = str(ar.exception)
     593                  self.assertIn(cte, exc)
     594                  self.assertIn(subtype, exc)
     595  
     596      def test_set_image_jpg(self):
     597          for content in (b"bogus content",
     598                          bytearray(b"bogus content"),
     599                          memoryview(b"bogus content")):
     600              with self.subTest(content=content):
     601                  m = self._make_message()
     602                  raw_data_manager.set_content(m, content, 'image', 'jpeg')
     603                  self.assertEqual(str(m), textwrap.dedent("""\
     604                      Content-Type: image/jpeg
     605                      Content-Transfer-Encoding: base64
     606  
     607                      Ym9ndXMgY29udGVudA==
     608                      """))
     609                  self.assertEqual(m.get_payload(decode=True), content)
     610                  self.assertEqual(m.get_content(), content)
     611  
     def test_set_audio_aif_with_quoted_printable_cte(self):
         """Binary audio content can be set with cte='quoted-printable'."""
         # Why you would use qp, I don't know, but it is technically supported.
         # XXX: the incorrect line length is because binascii.b2a_qp doesn't
         # support a line length parameter, but we must use it to get newline
         # encoding.
         # XXX: what about that lack of trailing newline?  Do we actually
         # handle that correctly in all cases?  That is, if the *source* has an
         # unencoded newline, do we add an extra newline to the returned payload
         # or not?  And can that actually be disambiguated based on the RFC?
         msg = self._make_message()
         content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
         msg.set_content(content, 'audio', 'aif', cte='quoted-printable')
         expected_headers = textwrap.dedent("""\
             Content-Type: audio/aif
             Content-Transfer-Encoding: quoted-printable
             MIME-Version: 1.0

             """).encode('latin-1')
         # The 100 z's are split 49/51 by a soft line break ("=" + newline);
         # the encoded body has no final newline.
         expected_body = (b'b=FFgus=09con=0At=0Dent=20' + b'z'*49 + b'=\n' +
                          b'z'*51)
         self.assertEqual(bytes(msg), expected_headers + expected_body)
         self.assertEqual(msg.get_payload(decode=True), content)
         self.assertEqual(msg.get_content(), content)
     633  
     def test_set_video_mpeg_with_binary_cte(self):
         """Content set with cte='binary' round-trips via the payload accessors.

         The serialized form is also pinned, including a known flaw: see the
         XXX note below.
         """
         msg = self._make_message()
         content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
         msg.set_content(content, 'video', 'mpeg', cte='binary')
         expected_headers = textwrap.dedent("""\
             Content-Type: video/mpeg
             Content-Transfer-Encoding: binary
             MIME-Version: 1.0

             """).encode('ascii')
         # XXX: the second \n ought to be a \r, but generator gets it wrong.
         # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE.
         expected_body = b'b\xFFgus\tcon\nt\nent ' + b'z'*100
         self.assertEqual(bytes(msg), expected_headers + expected_body)
         self.assertEqual(msg.get_payload(decode=True), content)
         self.assertEqual(msg.get_content(), content)
     650  
     def test_set_application_octet_stream_with_8bit_cte(self):
         """Content set with cte='8bit' serializes with normalized line ends."""
         # In 8bit mode, universal line end logic applies.  It is up to the
         # application to make sure the lines are short enough; we don't check.
         msg = self._make_message()
         content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n'
         msg.set_content(content, 'application', 'octet-stream', cte='8bit')
         expected_headers = textwrap.dedent("""\
             Content-Type: application/octet-stream
             Content-Transfer-Encoding: 8bit
             MIME-Version: 1.0

             """).encode('ascii')
         # Same as the input except that the lone \r is emitted as \n.
         expected_body = b'b\xFFgus\tcon\nt\nent\n' + b'z'*60 + b'\n'
         self.assertEqual(bytes(msg), expected_headers + expected_body)
         self.assertEqual(msg.get_payload(decode=True), content)
         self.assertEqual(msg.get_content(), content)
     667  
     def test_set_headers_from_header_objects(self):
         """Header objects passed via headers= are added to the message."""
         msg = self._make_message()
         make_header = self.policy.header_factory
         extra_headers = (
             make_header("To", "foo@example.com"),
             make_header("From", "foo@example.com"),
             make_header("Subject", "I'm talking to myself."),
         )
         raw_data_manager.set_content(msg, "Simple message.\n",
                                      headers=extra_headers)
         expected = textwrap.dedent("""\
             Content-Type: text/plain; charset="utf-8"
             To: foo@example.com
             From: foo@example.com
             Subject: I'm talking to myself.
             Content-Transfer-Encoding: 7bit

             Simple message.
             """)
         self.assertEqual(str(msg), expected)
     685  
     def test_set_headers_from_strings(self):
         """Headers given as 'Name: value' strings are added to the message."""
         extra_headers = ("X-Foo-Header: foo", "X-Bar-Header: bar")
         msg = self._make_message()
         raw_data_manager.set_content(msg, "Simple message.\n",
                                      headers=extra_headers)
         expected = textwrap.dedent("""\
             Content-Type: text/plain; charset="utf-8"
             X-Foo-Header: foo
             X-Bar-Header: bar
             Content-Transfer-Encoding: 7bit

             Simple message.
             """)
         self.assertEqual(str(msg), expected)
     700  
     def test_set_headers_with_invalid_duplicate_string_header_raises(self):
         """A Content-Type header string passed via headers= raises ValueError."""
         msg = self._make_message()
         duplicate = ("Content-Type: foo/bar",)
         with self.assertRaisesRegex(ValueError, 'Content-Type'):
             raw_data_manager.set_content(msg, "Simple message.\n",
                                          headers=duplicate)
     708  
     def test_set_headers_with_invalid_duplicate_header_header_raises(self):
         """A Content-Type header object passed via headers= raises ValueError."""
         make_header = self.policy.header_factory
         msg = self._make_message()
         body = "Simple message.\n"
         with self.assertRaisesRegex(ValueError, 'Content-Type'):
             # The header is built inside the context so the assertion covers
             # exactly the same statements as before.
             duplicate = (make_header("Content-Type", " foo/bar"),)
             raw_data_manager.set_content(msg, body, headers=duplicate)
     717  
     def test_set_headers_with_defective_string_header_raises(self):
         """A 'To' header string with a malformed address raises ValueError.

         The error message is expected to mention the offending address.
         """
         m = self._make_message()
         content = "Simple message.\n"
         # Nothing may follow the call inside the ``with`` block: the call is
         # expected to raise, so any later statement would never execute.
         with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
             raw_data_manager.set_content(m, content, headers=(
                 'To: a@fairly@@invalid@address',)
                 )
     726  
     def test_set_headers_with_defective_header_header_raises(self):
         """A 'To' header object with a malformed address raises ValueError.

         The header is created with the policy's header_factory; the error
         message is expected to mention the offending address.
         """
         m = self._make_message()
         content = "Simple message.\n"
         header_factory = self.policy.header_factory
         # Nothing may follow the call inside the ``with`` block: the call is
         # expected to raise, so any later statement would never execute.
         with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
             raw_data_manager.set_content(m, content, headers=(
                 header_factory('To', 'a@fairly@@invalid@address'),)
                 )
     736  
     def test_set_disposition_inline(self):
         """disposition='inline' produces a Content-Disposition of 'inline'."""
         msg = self._make_message()
         msg.set_content('foo', disposition='inline')
         disposition = msg['Content-Disposition']
         self.assertEqual(disposition, 'inline')
     741  
     def test_set_disposition_attachment(self):
         """disposition='attachment' is stored as the Content-Disposition."""
         msg = self._make_message()
         msg.set_content('foo', disposition='attachment')
         disposition = msg['Content-Disposition']
         self.assertEqual(disposition, 'attachment')
     746  
     def test_set_disposition_foo(self):
         """An arbitrary disposition value ('foo') is accepted and stored."""
         msg = self._make_message()
         msg.set_content('foo', disposition='foo')
         disposition = msg['Content-Disposition']
         self.assertEqual(disposition, 'foo')
     751  
     752      # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that
     753      # would cause 'foo' above to raise.
     754  
     def test_set_filename(self):
         """filename= alone yields an 'attachment' disposition with the name."""
         msg = self._make_message()
         msg.set_content('foo', filename='bar.txt')
         expected = 'attachment; filename="bar.txt"'
         self.assertEqual(msg['Content-Disposition'], expected)
     760  
     def test_set_filename_and_disposition_inline(self):
         """An explicit disposition is kept when a filename is also given."""
         msg = self._make_message()
         msg.set_content('foo', disposition='inline', filename='bar.txt')
         expected = 'inline; filename="bar.txt"'
         self.assertEqual(msg['Content-Disposition'], expected)
     765  
     def test_set_non_ascii_filename(self):
         """A non-ASCII filename is serialized as an encoded filename* param."""
         msg = self._make_message()
         msg.set_content('foo', filename='ábárî.txt')
         expected = textwrap.dedent("""\
             Content-Type: text/plain; charset="utf-8"
             Content-Transfer-Encoding: 7bit
             Content-Disposition: attachment;
              filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt
             MIME-Version: 1.0

             foo
             """).encode('ascii')
         self.assertEqual(bytes(msg), expected)
     778  
     def test_set_content_bytes_cte_7bit(self):
         """ASCII-only bytes set with cte='7bit' appear verbatim in the body."""
         msg = self._make_message()
         payload = b'ASCII-only message.\n'
         msg.set_content(payload, maintype='application',
                         subtype='octet-stream', cte='7bit')
         expected = textwrap.dedent("""\
             Content-Type: application/octet-stream
             Content-Transfer-Encoding: 7bit
             MIME-Version: 1.0

             ASCII-only message.
             """)
         self.assertEqual(str(msg), expected)
     790  
     # Parameter table for the content_object_as_* methods below (presumably
     # expanded into one test per key by @parameterize -- confirm against
     # test.test_email).  Each value is a pair:
     #   (content object handed to set_content,
     #    tuple of positional mimetype arguments passed after it)
     # The octet-stream subtype is spelled with a hyphen, matching the other
     # tests in this class.
     content_object_params = {
         'text_plain': ('content', ()),
         'text_html': ('content', ('html',)),
         'application_octet_stream': (b'content',
                                      ('application', 'octet-stream')),
         'image_jpeg': (b'content', ('image', 'jpeg')),
         'message_rfc822': (message(), ()),
         'message_external_body': (message(), ('external-body',)),
         }
     800  
     def content_object_as_header_receiver(self, obj, mimetype):
         """set_content(obj, *mimetype, headers=...) installs the given headers."""
         extra_headers = ('To: foo@example.com', 'From: bar@simple.net')
         msg = self._make_message()
         msg.set_content(obj, *mimetype, headers=extra_headers)
         # Lookups use lower-case names, as in the original assertions.
         self.assertEqual(msg['to'], 'foo@example.com')
         self.assertEqual(msg['from'], 'bar@simple.net')
     808  
     def content_object_as_disposition_inline_receiver(self, obj, mimetype):
         """disposition='inline' is applied for the given content object."""
         msg = self._make_message()
         msg.set_content(obj, *mimetype, disposition='inline')
         disposition = msg['Content-Disposition']
         self.assertEqual(disposition, 'inline')
     813  
     def content_object_as_non_ascii_filename_receiver(self, obj, mimetype):
         """A non-ASCII filename is readable through every accessor checked."""
         filename = 'bár.txt'
         msg = self._make_message()
         msg.set_content(obj, *mimetype, disposition='inline',
                         filename=filename)
         self.assertEqual(msg['Content-Disposition'],
                          'inline; filename="bár.txt"')
         self.assertEqual(msg.get_filename(), "bár.txt")
         disposition_params = msg['Content-Disposition'].params
         self.assertEqual(disposition_params['filename'], "bár.txt")
     820  
     def content_object_as_cid_receiver(self, obj, mimetype):
         """cid= is stored as the message's Content-ID header."""
         content_id = 'some_random_stuff'
         msg = self._make_message()
         msg.set_content(obj, *mimetype, cid=content_id)
         self.assertEqual(msg['Content-ID'], 'some_random_stuff')
     825  
     def content_object_as_params_receiver(self, obj, mimetype):
         """params= entries end up as Content-Type parameters.

         For str content the test additionally expects a charset of utf-8.
         """
         requested = {'foo': 'bár', 'abc': 'xyz'}
         msg = self._make_message()
         msg.set_content(obj, *mimetype, params=requested)
         # Copy only after the call so the expectation starts from the same
         # dict state the original assertion compared against.
         expected = dict(requested)
         if isinstance(obj, str):
             expected['charset'] = 'utf-8'
         self.assertEqual(msg['Content-Type'].params, expected)
     833  
     834  
     # Run this module's tests when the file is executed directly as a script.
     if __name__ == '__main__':
         unittest.main()