python (3.12.0)
1 import unittest
2 from test.test_email import TestEmailBase, parameterize
3 import textwrap
4 from email import policy
5 from email.message import EmailMessage
6 from email.contentmanager import ContentManager, raw_data_manager
7
8
9 @parameterize
10 class ESC[4;38;5;81mTestContentManager(ESC[4;38;5;149mTestEmailBase):
11
12 policy = policy.default
13 message = EmailMessage
14
15 get_key_params = {
16 'full_type': (1, 'text/plain',),
17 'maintype_only': (2, 'text',),
18 'null_key': (3, '',),
19 }
20
21 def get_key_as_get_content_key(self, order, key):
22 def foo_getter(msg, foo=None):
23 bar = msg['X-Bar-Header']
24 return foo, bar
25 cm = ContentManager()
26 cm.add_get_handler(key, foo_getter)
27 m = self._make_message()
28 m['Content-Type'] = 'text/plain'
29 m['X-Bar-Header'] = 'foo'
30 self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo'))
31
32 def get_key_as_get_content_key_order(self, order, key):
33 def bar_getter(msg):
34 return msg['X-Bar-Header']
35 def foo_getter(msg):
36 return msg['X-Foo-Header']
37 cm = ContentManager()
38 cm.add_get_handler(key, foo_getter)
39 for precedence, key in self.get_key_params.values():
40 if precedence > order:
41 cm.add_get_handler(key, bar_getter)
42 m = self._make_message()
43 m['Content-Type'] = 'text/plain'
44 m['X-Bar-Header'] = 'bar'
45 m['X-Foo-Header'] = 'foo'
46 self.assertEqual(cm.get_content(m), ('foo'))
47
48 def test_get_content_raises_if_unknown_mimetype_and_no_default(self):
49 cm = ContentManager()
50 m = self._make_message()
51 m['Content-Type'] = 'text/plain'
52 with self.assertRaisesRegex(KeyError, 'text/plain'):
53 cm.get_content(m)
54
55 class ESC[4;38;5;81mBaseThing(ESC[4;38;5;149mstr):
56 pass
57 baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing'
58 class ESC[4;38;5;81mThing(ESC[4;38;5;149mBaseThing):
59 pass
60 testobject_full_path = __name__ + '.' + 'TestContentManager.Thing'
61
62 set_key_params = {
63 'type': (0, Thing,),
64 'full_path': (1, testobject_full_path,),
65 'qualname': (2, 'TestContentManager.Thing',),
66 'name': (3, 'Thing',),
67 'base_type': (4, BaseThing,),
68 'base_full_path': (5, baseobject_full_path,),
69 'base_qualname': (6, 'TestContentManager.BaseThing',),
70 'base_name': (7, 'BaseThing',),
71 'str_type': (8, str,),
72 'str_full_path': (9, 'builtins.str',),
73 'str_name': (10, 'str',), # str name and qualname are the same
74 'null_key': (11, None,),
75 }
76
77 def set_key_as_set_content_key(self, order, key):
78 def foo_setter(msg, obj, foo=None):
79 msg['X-Foo-Header'] = foo
80 msg.set_payload(obj)
81 cm = ContentManager()
82 cm.add_set_handler(key, foo_setter)
83 m = self._make_message()
84 msg_obj = self.Thing()
85 cm.set_content(m, msg_obj, foo='bar')
86 self.assertEqual(m['X-Foo-Header'], 'bar')
87 self.assertEqual(m.get_payload(), msg_obj)
88
89 def set_key_as_set_content_key_order(self, order, key):
90 def foo_setter(msg, obj):
91 msg['X-FooBar-Header'] = 'foo'
92 msg.set_payload(obj)
93 def bar_setter(msg, obj):
94 msg['X-FooBar-Header'] = 'bar'
95 cm = ContentManager()
96 cm.add_set_handler(key, foo_setter)
97 for precedence, key in self.get_key_params.values():
98 if precedence > order:
99 cm.add_set_handler(key, bar_setter)
100 m = self._make_message()
101 msg_obj = self.Thing()
102 cm.set_content(m, msg_obj)
103 self.assertEqual(m['X-FooBar-Header'], 'foo')
104 self.assertEqual(m.get_payload(), msg_obj)
105
106 def test_set_content_raises_if_unknown_type_and_no_default(self):
107 cm = ContentManager()
108 m = self._make_message()
109 msg_obj = self.Thing()
110 with self.assertRaisesRegex(KeyError, self.testobject_full_path):
111 cm.set_content(m, msg_obj)
112
113 def test_set_content_raises_if_called_on_multipart(self):
114 cm = ContentManager()
115 m = self._make_message()
116 m['Content-Type'] = 'multipart/foo'
117 with self.assertRaises(TypeError):
118 cm.set_content(m, 'test')
119
120 def test_set_content_calls_clear_content(self):
121 m = self._make_message()
122 m['Content-Foo'] = 'bar'
123 m['Content-Type'] = 'text/html'
124 m['To'] = 'test'
125 m.set_payload('abc')
126 cm = ContentManager()
127 cm.add_set_handler(str, lambda *args, **kw: None)
128 m.set_content('xyz', content_manager=cm)
129 self.assertIsNone(m['Content-Foo'])
130 self.assertIsNone(m['Content-Type'])
131 self.assertEqual(m['To'], 'test')
132 self.assertIsNone(m.get_payload())
133
134
135 @parameterize
136 class ESC[4;38;5;81mTestRawDataManager(ESC[4;38;5;149mTestEmailBase):
137 # Note: these tests are dependent on the order in which headers are added
138 # to the message objects by the code. There's no defined ordering in
139 # RFC5322/MIME, so this makes the tests more fragile than the standards
140 # require. However, if the header order changes it is best to understand
141 # *why*, and make sure it isn't a subtle bug in whatever change was
142 # applied.
143
144 policy = policy.default.clone(max_line_length=60,
145 content_manager=raw_data_manager)
146 message = EmailMessage
147
148 def test_get_text_plain(self):
149 m = self._str_msg(textwrap.dedent("""\
150 Content-Type: text/plain
151
152 Basic text.
153 """))
154 self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n")
155
156 def test_get_text_html(self):
157 m = self._str_msg(textwrap.dedent("""\
158 Content-Type: text/html
159
160 <p>Basic text.</p>
161 """))
162 self.assertEqual(raw_data_manager.get_content(m),
163 "<p>Basic text.</p>\n")
164
165 def test_get_text_plain_latin1(self):
166 m = self._bytes_msg(textwrap.dedent("""\
167 Content-Type: text/plain; charset=latin1
168
169 Basìc tëxt.
170 """).encode('latin1'))
171 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
172
173 def test_get_text_plain_latin1_quoted_printable(self):
174 m = self._str_msg(textwrap.dedent("""\
175 Content-Type: text/plain; charset="latin-1"
176 Content-Transfer-Encoding: quoted-printable
177
178 Bas=ECc t=EBxt.
179 """))
180 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
181
182 def test_get_text_plain_utf8_base64(self):
183 m = self._str_msg(textwrap.dedent("""\
184 Content-Type: text/plain; charset="utf8"
185 Content-Transfer-Encoding: base64
186
187 QmFzw6xjIHTDq3h0Lgo=
188 """))
189 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
190
191 def test_get_text_plain_bad_utf8_quoted_printable(self):
192 m = self._str_msg(textwrap.dedent("""\
193 Content-Type: text/plain; charset="utf8"
194 Content-Transfer-Encoding: quoted-printable
195
196 Bas=c3=acc t=c3=abxt=fd.
197 """))
198 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n")
199
200 def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self):
201 m = self._str_msg(textwrap.dedent("""\
202 Content-Type: text/plain; charset="utf8"
203 Content-Transfer-Encoding: quoted-printable
204
205 Bas=c3=acc t=c3=abxt=fd.
206 """))
207 self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
208 "Basìc tëxt.\n")
209
210 def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self):
211 m = self._str_msg(textwrap.dedent("""\
212 Content-Type: text/plain; charset="utf8"
213 Content-Transfer-Encoding: base64
214
215 QmFzw6xjIHTDq3h0Lgo\xFF=
216 """))
217 self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
218 "Basìc tëxt.\n")
219
220 def test_get_text_invalid_keyword(self):
221 m = self._str_msg(textwrap.dedent("""\
222 Content-Type: text/plain
223
224 Basic text.
225 """))
226 with self.assertRaises(TypeError):
227 raw_data_manager.get_content(m, foo='ignore')
228
229 def test_get_non_text(self):
230 template = textwrap.dedent("""\
231 Content-Type: {}
232 Content-Transfer-Encoding: base64
233
234 Ym9ndXMgZGF0YQ==
235 """)
236 for maintype in 'audio image video application'.split():
237 with self.subTest(maintype=maintype):
238 m = self._str_msg(template.format(maintype+'/foo'))
239 self.assertEqual(raw_data_manager.get_content(m), b"bogus data")
240
241 def test_get_non_text_invalid_keyword(self):
242 m = self._str_msg(textwrap.dedent("""\
243 Content-Type: image/jpg
244 Content-Transfer-Encoding: base64
245
246 Ym9ndXMgZGF0YQ==
247 """))
248 with self.assertRaises(TypeError):
249 raw_data_manager.get_content(m, errors='ignore')
250
251 def test_get_raises_on_multipart(self):
252 m = self._str_msg(textwrap.dedent("""\
253 Content-Type: multipart/mixed; boundary="==="
254
255 --===
256 --===--
257 """))
258 with self.assertRaises(KeyError):
259 raw_data_manager.get_content(m)
260
261 def test_get_message_rfc822_and_external_body(self):
262 template = textwrap.dedent("""\
263 Content-Type: message/{}
264
265 To: foo@example.com
266 From: bar@example.com
267 Subject: example
268
269 an example message
270 """)
271 for subtype in 'rfc822 external-body'.split():
272 with self.subTest(subtype=subtype):
273 m = self._str_msg(template.format(subtype))
274 sub_msg = raw_data_manager.get_content(m)
275 self.assertIsInstance(sub_msg, self.message)
276 self.assertEqual(raw_data_manager.get_content(sub_msg),
277 "an example message\n")
278 self.assertEqual(sub_msg['to'], 'foo@example.com')
279 self.assertEqual(sub_msg['from'].addresses[0].username, 'bar')
280
281 def test_get_message_non_rfc822_or_external_body_yields_bytes(self):
282 m = self._str_msg(textwrap.dedent("""\
283 Content-Type: message/partial
284
285 To: foo@example.com
286 From: bar@example.com
287 Subject: example
288
289 The real body is in another message.
290 """))
291 self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex')
292
293 def test_set_text_plain(self):
294 m = self._make_message()
295 content = "Simple message.\n"
296 raw_data_manager.set_content(m, content)
297 self.assertEqual(str(m), textwrap.dedent("""\
298 Content-Type: text/plain; charset="utf-8"
299 Content-Transfer-Encoding: 7bit
300
301 Simple message.
302 """))
303 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
304 self.assertEqual(m.get_content(), content)
305
306 def test_set_text_plain_null(self):
307 m = self._make_message()
308 content = ''
309 raw_data_manager.set_content(m, content)
310 self.assertEqual(str(m), textwrap.dedent("""\
311 Content-Type: text/plain; charset="utf-8"
312 Content-Transfer-Encoding: 7bit
313
314
315 """))
316 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), '\n')
317 self.assertEqual(m.get_content(), '\n')
318
319 def test_set_text_html(self):
320 m = self._make_message()
321 content = "<p>Simple message.</p>\n"
322 raw_data_manager.set_content(m, content, subtype='html')
323 self.assertEqual(str(m), textwrap.dedent("""\
324 Content-Type: text/html; charset="utf-8"
325 Content-Transfer-Encoding: 7bit
326
327 <p>Simple message.</p>
328 """))
329 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
330 self.assertEqual(m.get_content(), content)
331
332 def test_set_text_charset_latin_1(self):
333 m = self._make_message()
334 content = "Simple message.\n"
335 raw_data_manager.set_content(m, content, charset='latin-1')
336 self.assertEqual(str(m), textwrap.dedent("""\
337 Content-Type: text/plain; charset="iso-8859-1"
338 Content-Transfer-Encoding: 7bit
339
340 Simple message.
341 """))
342 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
343 self.assertEqual(m.get_content(), content)
344
345 def test_set_text_plain_long_line_heuristics(self):
346 m = self._make_message()
347 content = ("Simple but long message that is over 78 characters"
348 " long to force transfer encoding.\n")
349 raw_data_manager.set_content(m, content)
350 self.assertEqual(str(m), textwrap.dedent("""\
351 Content-Type: text/plain; charset="utf-8"
352 Content-Transfer-Encoding: quoted-printable
353
354 Simple but long message that is over 78 characters long to =
355 force transfer encoding.
356 """))
357 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
358 self.assertEqual(m.get_content(), content)
359
360 def test_set_text_short_line_minimal_non_ascii_heuristics(self):
361 m = self._make_message()
362 content = "et là il est monté sur moi et il commence à m'éto.\n"
363 raw_data_manager.set_content(m, content)
364 self.assertEqual(bytes(m), textwrap.dedent("""\
365 Content-Type: text/plain; charset="utf-8"
366 Content-Transfer-Encoding: 8bit
367
368 et là il est monté sur moi et il commence à m'éto.
369 """).encode('utf-8'))
370 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
371 self.assertEqual(m.get_content(), content)
372
373 def test_set_text_long_line_minimal_non_ascii_heuristics(self):
374 m = self._make_message()
375 content = ("j'ai un problème de python. il est sorti de son"
376 " vivarium. et là il est monté sur moi et il commence"
377 " à m'éto.\n")
378 raw_data_manager.set_content(m, content)
379 self.assertEqual(bytes(m), textwrap.dedent("""\
380 Content-Type: text/plain; charset="utf-8"
381 Content-Transfer-Encoding: quoted-printable
382
383 j'ai un probl=C3=A8me de python. il est sorti de son vivari=
384 um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
385 =C3=A0 m'=C3=A9to.
386 """).encode('utf-8'))
387 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
388 self.assertEqual(m.get_content(), content)
389
390 def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self):
391 m = self._make_message()
392 content = '\n'*10 + (
393 "j'ai un problème de python. il est sorti de son"
394 " vivarium. et là il est monté sur moi et il commence"
395 " à m'éto.\n")
396 raw_data_manager.set_content(m, content)
397 self.assertEqual(bytes(m), textwrap.dedent("""\
398 Content-Type: text/plain; charset="utf-8"
399 Content-Transfer-Encoding: quoted-printable
400 """ + '\n'*10 + """
401 j'ai un probl=C3=A8me de python. il est sorti de son vivari=
402 um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
403 =C3=A0 m'=C3=A9to.
404 """).encode('utf-8'))
405 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
406 self.assertEqual(m.get_content(), content)
407
408 def test_set_text_maximal_non_ascii_heuristics(self):
409 m = self._make_message()
410 content = "áàäéèęöő.\n"
411 raw_data_manager.set_content(m, content)
412 self.assertEqual(bytes(m), textwrap.dedent("""\
413 Content-Type: text/plain; charset="utf-8"
414 Content-Transfer-Encoding: 8bit
415
416 áàäéèęöő.
417 """).encode('utf-8'))
418 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
419 self.assertEqual(m.get_content(), content)
420
421 def test_set_text_11_lines_maximal_non_ascii_heuristics(self):
422 m = self._make_message()
423 content = '\n'*10 + "áàäéèęöő.\n"
424 raw_data_manager.set_content(m, content)
425 self.assertEqual(bytes(m), textwrap.dedent("""\
426 Content-Type: text/plain; charset="utf-8"
427 Content-Transfer-Encoding: 8bit
428 """ + '\n'*10 + """
429 áàäéèęöő.
430 """).encode('utf-8'))
431 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
432 self.assertEqual(m.get_content(), content)
433
434 def test_set_text_long_line_maximal_non_ascii_heuristics(self):
435 m = self._make_message()
436 content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
437 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
438 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
439 raw_data_manager.set_content(m, content)
440 self.assertEqual(bytes(m), textwrap.dedent("""\
441 Content-Type: text/plain; charset="utf-8"
442 Content-Transfer-Encoding: base64
443
444 w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD
445 tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo
446 xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD
447 qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg
448 w6TDqcOoxJnDtsWRLgo=
449 """).encode('utf-8'))
450 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
451 self.assertEqual(m.get_content(), content)
452
453 def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self):
454 # Yes, it chooses "wrong" here. It's a heuristic. So this result
455 # could change if we come up with a better heuristic.
456 m = self._make_message()
457 content = ('\n'*10 +
458 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
459 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
460 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
461 raw_data_manager.set_content(m, "\n"*10 +
462 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
463 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
464 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
465 self.assertEqual(bytes(m), textwrap.dedent("""\
466 Content-Type: text/plain; charset="utf-8"
467 Content-Transfer-Encoding: quoted-printable
468 """ + '\n'*10 + """
469 =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=
470 =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=
471 =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=
472 =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=
473 =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=
474 =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=
475 =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=
476 =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=
477 =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=
478 =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=
479 =C5=91.
480 """).encode('utf-8'))
481 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
482 self.assertEqual(m.get_content(), content)
483
484 def test_set_text_non_ascii_with_cte_7bit_raises(self):
485 m = self._make_message()
486 with self.assertRaises(UnicodeError):
487 raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit')
488
489 def test_set_text_non_ascii_with_charset_ascii_raises(self):
490 m = self._make_message()
491 with self.assertRaises(UnicodeError):
492 raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii')
493
494 def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self):
495 m = self._make_message()
496 with self.assertRaises(UnicodeError):
497 raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii')
498
499 def test_set_message(self):
500 m = self._make_message()
501 m['Subject'] = "Forwarded message"
502 content = self._make_message()
503 content['To'] = 'python@vivarium.org'
504 content['From'] = 'police@monty.org'
505 content['Subject'] = "get back in your box"
506 content.set_content("Or face the comfy chair.")
507 raw_data_manager.set_content(m, content)
508 self.assertEqual(str(m), textwrap.dedent("""\
509 Subject: Forwarded message
510 Content-Type: message/rfc822
511 Content-Transfer-Encoding: 8bit
512
513 To: python@vivarium.org
514 From: police@monty.org
515 Subject: get back in your box
516 Content-Type: text/plain; charset="utf-8"
517 Content-Transfer-Encoding: 7bit
518 MIME-Version: 1.0
519
520 Or face the comfy chair.
521 """))
522 payload = m.get_payload(0)
523 self.assertIsInstance(payload, self.message)
524 self.assertEqual(str(payload), str(content))
525 self.assertIsInstance(m.get_content(), self.message)
526 self.assertEqual(str(m.get_content()), str(content))
527
528 def test_set_message_with_non_ascii_and_coercion_to_7bit(self):
529 m = self._make_message()
530 m['Subject'] = "Escape report"
531 content = self._make_message()
532 content['To'] = 'police@monty.org'
533 content['From'] = 'victim@monty.org'
534 content['Subject'] = "Help"
535 content.set_content("j'ai un problème de python. il est sorti de son"
536 " vivarium.")
537 raw_data_manager.set_content(m, content)
538 self.assertEqual(bytes(m), textwrap.dedent("""\
539 Subject: Escape report
540 Content-Type: message/rfc822
541 Content-Transfer-Encoding: 8bit
542
543 To: police@monty.org
544 From: victim@monty.org
545 Subject: Help
546 Content-Type: text/plain; charset="utf-8"
547 Content-Transfer-Encoding: 8bit
548 MIME-Version: 1.0
549
550 j'ai un problème de python. il est sorti de son vivarium.
551 """).encode('utf-8'))
552 # The choice of base64 for the body encoding is because generator
553 # doesn't bother with heuristics and uses it unconditionally for utf-8
554 # text.
555 # XXX: the first cte should be 7bit, too...that's a generator bug.
556 # XXX: the line length in the body also looks like a generator bug.
557 self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length),
558 textwrap.dedent("""\
559 Subject: Escape report
560 Content-Type: message/rfc822
561 Content-Transfer-Encoding: 8bit
562
563 To: police@monty.org
564 From: victim@monty.org
565 Subject: Help
566 Content-Type: text/plain; charset="utf-8"
567 Content-Transfer-Encoding: base64
568 MIME-Version: 1.0
569
570 aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt
571 Lgo=
572 """))
573 self.assertIsInstance(m.get_content(), self.message)
574 self.assertEqual(str(m.get_content()), str(content))
575
576 def test_set_message_invalid_cte_raises(self):
577 m = self._make_message()
578 content = self._make_message()
579 for cte in 'quoted-printable base64'.split():
580 for subtype in 'rfc822 external-body'.split():
581 with self.subTest(cte=cte, subtype=subtype):
582 with self.assertRaises(ValueError) as ar:
583 m.set_content(content, subtype, cte=cte)
584 exc = str(ar.exception)
585 self.assertIn(cte, exc)
586 self.assertIn(subtype, exc)
587 subtype = 'external-body'
588 for cte in '8bit binary'.split():
589 with self.subTest(cte=cte, subtype=subtype):
590 with self.assertRaises(ValueError) as ar:
591 m.set_content(content, subtype, cte=cte)
592 exc = str(ar.exception)
593 self.assertIn(cte, exc)
594 self.assertIn(subtype, exc)
595
596 def test_set_image_jpg(self):
597 for content in (b"bogus content",
598 bytearray(b"bogus content"),
599 memoryview(b"bogus content")):
600 with self.subTest(content=content):
601 m = self._make_message()
602 raw_data_manager.set_content(m, content, 'image', 'jpeg')
603 self.assertEqual(str(m), textwrap.dedent("""\
604 Content-Type: image/jpeg
605 Content-Transfer-Encoding: base64
606
607 Ym9ndXMgY29udGVudA==
608 """))
609 self.assertEqual(m.get_payload(decode=True), content)
610 self.assertEqual(m.get_content(), content)
611
612 def test_set_audio_aif_with_quoted_printable_cte(self):
613 # Why you would use qp, I don't know, but it is technically supported.
614 # XXX: the incorrect line length is because binascii.b2a_qp doesn't
615 # support a line length parameter, but we must use it to get newline
616 # encoding.
617 # XXX: what about that lack of tailing newline? Do we actually handle
618 # that correctly in all cases? That is, if the *source* has an
619 # unencoded newline, do we add an extra newline to the returned payload
620 # or not? And can that actually be disambiguated based on the RFC?
621 m = self._make_message()
622 content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
623 m.set_content(content, 'audio', 'aif', cte='quoted-printable')
624 self.assertEqual(bytes(m), textwrap.dedent("""\
625 Content-Type: audio/aif
626 Content-Transfer-Encoding: quoted-printable
627 MIME-Version: 1.0
628
629 b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz=
630 zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1'))
631 self.assertEqual(m.get_payload(decode=True), content)
632 self.assertEqual(m.get_content(), content)
633
634 def test_set_video_mpeg_with_binary_cte(self):
635 m = self._make_message()
636 content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
637 m.set_content(content, 'video', 'mpeg', cte='binary')
638 self.assertEqual(bytes(m), textwrap.dedent("""\
639 Content-Type: video/mpeg
640 Content-Transfer-Encoding: binary
641 MIME-Version: 1.0
642
643 """).encode('ascii') +
644 # XXX: the second \n ought to be a \r, but generator gets it wrong.
645 # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE.
646 b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' +
647 b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz')
648 self.assertEqual(m.get_payload(decode=True), content)
649 self.assertEqual(m.get_content(), content)
650
651 def test_set_application_octet_stream_with_8bit_cte(self):
652 # In 8bit mode, universal line end logic applies. It is up to the
653 # application to make sure the lines are short enough; we don't check.
654 m = self._make_message()
655 content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n'
656 m.set_content(content, 'application', 'octet-stream', cte='8bit')
657 self.assertEqual(bytes(m), textwrap.dedent("""\
658 Content-Type: application/octet-stream
659 Content-Transfer-Encoding: 8bit
660 MIME-Version: 1.0
661
662 """).encode('ascii') +
663 b'b\xFFgus\tcon\nt\nent\n' +
664 b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n')
665 self.assertEqual(m.get_payload(decode=True), content)
666 self.assertEqual(m.get_content(), content)
667
668 def test_set_headers_from_header_objects(self):
669 m = self._make_message()
670 content = "Simple message.\n"
671 header_factory = self.policy.header_factory
672 raw_data_manager.set_content(m, content, headers=(
673 header_factory("To", "foo@example.com"),
674 header_factory("From", "foo@example.com"),
675 header_factory("Subject", "I'm talking to myself.")))
676 self.assertEqual(str(m), textwrap.dedent("""\
677 Content-Type: text/plain; charset="utf-8"
678 To: foo@example.com
679 From: foo@example.com
680 Subject: I'm talking to myself.
681 Content-Transfer-Encoding: 7bit
682
683 Simple message.
684 """))
685
686 def test_set_headers_from_strings(self):
687 m = self._make_message()
688 content = "Simple message.\n"
689 raw_data_manager.set_content(m, content, headers=(
690 "X-Foo-Header: foo",
691 "X-Bar-Header: bar",))
692 self.assertEqual(str(m), textwrap.dedent("""\
693 Content-Type: text/plain; charset="utf-8"
694 X-Foo-Header: foo
695 X-Bar-Header: bar
696 Content-Transfer-Encoding: 7bit
697
698 Simple message.
699 """))
700
701 def test_set_headers_with_invalid_duplicate_string_header_raises(self):
702 m = self._make_message()
703 content = "Simple message.\n"
704 with self.assertRaisesRegex(ValueError, 'Content-Type'):
705 raw_data_manager.set_content(m, content, headers=(
706 "Content-Type: foo/bar",)
707 )
708
709 def test_set_headers_with_invalid_duplicate_header_header_raises(self):
710 m = self._make_message()
711 content = "Simple message.\n"
712 header_factory = self.policy.header_factory
713 with self.assertRaisesRegex(ValueError, 'Content-Type'):
714 raw_data_manager.set_content(m, content, headers=(
715 header_factory("Content-Type", " foo/bar"),)
716 )
717
718 def test_set_headers_with_defective_string_header_raises(self):
719 m = self._make_message()
720 content = "Simple message.\n"
721 with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
722 raw_data_manager.set_content(m, content, headers=(
723 'To: a@fairly@@invalid@address',)
724 )
725 print(m['To'].defects)
726
727 def test_set_headers_with_defective_header_header_raises(self):
728 m = self._make_message()
729 content = "Simple message.\n"
730 header_factory = self.policy.header_factory
731 with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
732 raw_data_manager.set_content(m, content, headers=(
733 header_factory('To', 'a@fairly@@invalid@address'),)
734 )
735 print(m['To'].defects)
736
737 def test_set_disposition_inline(self):
738 m = self._make_message()
739 m.set_content('foo', disposition='inline')
740 self.assertEqual(m['Content-Disposition'], 'inline')
741
742 def test_set_disposition_attachment(self):
743 m = self._make_message()
744 m.set_content('foo', disposition='attachment')
745 self.assertEqual(m['Content-Disposition'], 'attachment')
746
747 def test_set_disposition_foo(self):
748 m = self._make_message()
749 m.set_content('foo', disposition='foo')
750 self.assertEqual(m['Content-Disposition'], 'foo')
751
752 # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that
753 # would cause 'foo' above to raise.
754
755 def test_set_filename(self):
756 m = self._make_message()
757 m.set_content('foo', filename='bar.txt')
758 self.assertEqual(m['Content-Disposition'],
759 'attachment; filename="bar.txt"')
760
761 def test_set_filename_and_disposition_inline(self):
762 m = self._make_message()
763 m.set_content('foo', disposition='inline', filename='bar.txt')
764 self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"')
765
766 def test_set_non_ascii_filename(self):
767 m = self._make_message()
768 m.set_content('foo', filename='ábárî.txt')
769 self.assertEqual(bytes(m), textwrap.dedent("""\
770 Content-Type: text/plain; charset="utf-8"
771 Content-Transfer-Encoding: 7bit
772 Content-Disposition: attachment;
773 filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt
774 MIME-Version: 1.0
775
776 foo
777 """).encode('ascii'))
778
779 def test_set_content_bytes_cte_7bit(self):
780 m = self._make_message()
781 m.set_content(b'ASCII-only message.\n',
782 maintype='application', subtype='octet-stream', cte='7bit')
783 self.assertEqual(str(m), textwrap.dedent("""\
784 Content-Type: application/octet-stream
785 Content-Transfer-Encoding: 7bit
786 MIME-Version: 1.0
787
788 ASCII-only message.
789 """))
790
791 content_object_params = {
792 'text_plain': ('content', ()),
793 'text_html': ('content', ('html',)),
794 'application_octet_stream': (b'content',
795 ('application', 'octet_stream')),
796 'image_jpeg': (b'content', ('image', 'jpeg')),
797 'message_rfc822': (message(), ()),
798 'message_external_body': (message(), ('external-body',)),
799 }
800
801 def content_object_as_header_receiver(self, obj, mimetype):
802 m = self._make_message()
803 m.set_content(obj, *mimetype, headers=(
804 'To: foo@example.com',
805 'From: bar@simple.net'))
806 self.assertEqual(m['to'], 'foo@example.com')
807 self.assertEqual(m['from'], 'bar@simple.net')
808
809 def content_object_as_disposition_inline_receiver(self, obj, mimetype):
810 m = self._make_message()
811 m.set_content(obj, *mimetype, disposition='inline')
812 self.assertEqual(m['Content-Disposition'], 'inline')
813
814 def content_object_as_non_ascii_filename_receiver(self, obj, mimetype):
815 m = self._make_message()
816 m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt')
817 self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"')
818 self.assertEqual(m.get_filename(), "bár.txt")
819 self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt")
820
821 def content_object_as_cid_receiver(self, obj, mimetype):
822 m = self._make_message()
823 m.set_content(obj, *mimetype, cid='some_random_stuff')
824 self.assertEqual(m['Content-ID'], 'some_random_stuff')
825
826 def content_object_as_params_receiver(self, obj, mimetype):
827 m = self._make_message()
828 params = {'foo': 'bár', 'abc': 'xyz'}
829 m.set_content(obj, *mimetype, params=params)
830 if isinstance(obj, str):
831 params['charset'] = 'utf-8'
832 self.assertEqual(m['Content-Type'].params, params)
833
834
835 if __name__ == '__main__':
836 unittest.main()