1 import textwrap
2 import unittest
3 import contextlib
4 from email import policy
5 from email import errors
6 from test.test_email import TestEmailBase
7
8
class TestDefectsBase:
    """Defect-handling test bodies shared by the concrete test classes.

    This mixin only holds the tests.  A concrete class must provide
    ``get_defects(obj)`` and is expected to inherit ``_str_msg()``,
    ``assertDefectsEqual()`` and the standard ``TestCase`` assertions from
    its other base class (presumably ``TestEmailBase`` -- confirm there).

    Class attributes:
        policy: the email policy for the test class; presumably consumed by
            ``_str_msg()`` when parsing -- confirm in the base class.
        raise_expected: when true, every test returns immediately after its
            ``_raise_point`` block instead of inspecting recorded defects.

    Test methods are described with ``#`` comments rather than docstrings,
    because unittest shows a test's docstring in place of its name in
    verbose output.
    """

    # NOTE(review): folded header lines (the ``boundary=...`` continuations
    # in the fixtures below) must start with whitespace to be parsed as
    # continuations.  The reviewed copy had its indentation collapsed, so
    # the exact fold width used here is reconstructed -- confirm against
    # version control.

    policy = policy.default
    raise_expected = False

    @contextlib.contextmanager
    def _raise_point(self, defect):
        """Wrap the statement expected to trigger *defect*.

        This default does nothing and ignores *defect*; subclasses may
        override it to assert that the wrapped code raises.
        """
        yield

    def test_same_boundary_inner_outer(self):
        # The nested multipart/alternative part reuses the outer part's
        # boundary string.
        source = textwrap.dedent("""\
            Subject: XX
            From: xx@xx.dk
            To: XX
            Mime-version: 1.0
            Content-type: multipart/mixed;
               boundary="MS_Mac_OE_3071477847_720252_MIME_Part"

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: multipart/alternative;
               boundary="MS_Mac_OE_3071477847_720252_MIME_Part"

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: text/plain; charset="ISO-8859-1"
            Content-transfer-encoding: quoted-printable

            text

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: text/html; charset="ISO-8859-1"
            Content-transfer-encoding: quoted-printable

            <HTML></HTML>

            --MS_Mac_OE_3071477847_720252_MIME_Part--

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: image/gif; name="xx.gif";
            Content-disposition: attachment
            Content-transfer-encoding: base64

            Some removed base64 encoded chars.

            --MS_Mac_OE_3071477847_720252_MIME_Part--

            """)
        # XXX better would be to actually detect the duplicate.
        with self._raise_point(errors.StartBoundaryNotFoundDefect):
            msg = self._str_msg(source)
        if self.raise_expected:
            return
        inner = msg.get_payload(0)
        self.assertTrue(hasattr(inner, 'defects'))
        self.assertEqual(len(self.get_defects(inner)), 1)
        self.assertIsInstance(self.get_defects(inner)[0],
                              errors.StartBoundaryNotFoundDefect)

    def test_multipart_no_boundary(self):
        # The Content-Type header declares a multipart type but carries no
        # boundary parameter; the payload is expected to stay a string.
        source = textwrap.dedent("""\
            Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
            From: foobar
            Subject: broken mail
            MIME-Version: 1.0
            Content-Type: multipart/report; report-type=delivery-status;

            --JAB03225.986577786/zinfandel.lacita.com

            One part

            --JAB03225.986577786/zinfandel.lacita.com
            Content-Type: message/delivery-status

            Header: Another part

            --JAB03225.986577786/zinfandel.lacita.com--
            """)
        with self._raise_point(errors.NoBoundaryInMultipartDefect):
            msg = self._str_msg(source)
        if self.raise_expected:
            return
        self.assertIsInstance(msg.get_payload(), str)
        self.assertEqual(len(self.get_defects(msg)), 2)
        self.assertIsInstance(self.get_defects(msg)[0],
                              errors.NoBoundaryInMultipartDefect)
        self.assertIsInstance(self.get_defects(msg)[1],
                              errors.MultipartInvariantViolationDefect)

    # Template for the three Content-Transfer-Encoding tests below.  The
    # ``{}`` placeholder sits at the end of the header block so a test can
    # splice in an extra header line (or nothing) with str.format().
    multipart_msg = textwrap.dedent("""\
        Date: Wed, 14 Nov 2007 12:56:23 GMT
        From: foo@bar.invalid
        To: foo@bar.invalid
        Subject: Content-Transfer-Encoding: base64 and multipart
        MIME-Version: 1.0
        Content-Type: multipart/mixed;
            boundary="===============3344438784458119861=="{}

        --===============3344438784458119861==
        Content-Type: text/plain

        Test message

        --===============3344438784458119861==
        Content-Type: application/octet-stream
        Content-Transfer-Encoding: base64

        YWJj

        --===============3344438784458119861==--
        """)

    def test_multipart_invalid_cte(self):
        # A base64 Content-Transfer-Encoding on the multipart container
        # itself is expected to be reported as a defect.
        with self._raise_point(
                errors.InvalidMultipartContentTransferEncodingDefect):
            msg = self._str_msg(
                self.multipart_msg.format(
                    "\nContent-Transfer-Encoding: base64"))
        if self.raise_expected:
            return
        self.assertEqual(len(self.get_defects(msg)), 1)
        self.assertIsInstance(
            self.get_defects(msg)[0],
            errors.InvalidMultipartContentTransferEncodingDefect)

    def test_multipart_no_cte_no_defect(self):
        # Nothing is expected to raise here, so there is nothing to check
        # in raising mode.
        if self.raise_expected:
            return
        msg = self._str_msg(self.multipart_msg.format(''))
        self.assertEqual(len(self.get_defects(msg)), 0)

    def test_multipart_valid_cte_no_defect(self):
        if self.raise_expected:
            return
        for cte in ('7bit', '8bit', 'BINary'):
            # One sub-test per encoding: a failure names the offending
            # value and the remaining encodings are still exercised.
            with self.subTest(cte=cte):
                msg = self._str_msg(
                    self.multipart_msg.format(
                        "\nContent-Transfer-Encoding: " + cte))
                self.assertEqual(len(self.get_defects(msg)), 0, "cte=" + cte)

    def test_lying_multipart(self):
        # Declared multipart/alternative, but the body is plain text and
        # the header has no boundary parameter.
        source = textwrap.dedent("""\
            From: "Allison Dunlap" <xxx@example.com>
            To: yyy@example.com
            Subject: 64423
            Date: Sun, 11 Jul 2004 16:09:27 -0300
            MIME-Version: 1.0
            Content-Type: multipart/alternative;

            Blah blah blah
            """)
        with self._raise_point(errors.NoBoundaryInMultipartDefect):
            msg = self._str_msg(source)
        if self.raise_expected:
            return
        self.assertTrue(hasattr(msg, 'defects'))
        self.assertEqual(len(self.get_defects(msg)), 2)
        self.assertIsInstance(self.get_defects(msg)[0],
                              errors.NoBoundaryInMultipartDefect)
        self.assertIsInstance(self.get_defects(msg)[1],
                              errors.MultipartInvariantViolationDefect)

    def test_missing_start_boundary(self):
        source = textwrap.dedent("""\
            Content-Type: multipart/mixed; boundary="AAA"
            From: Mail Delivery Subsystem <xxx@example.com>
            To: yyy@example.com

            --AAA

            Stuff

            --AAA
            Content-Type: message/rfc822

            From: webmaster@python.org
            To: zzz@example.com
            Content-Type: multipart/mixed; boundary="BBB"

            --BBB--

            --AAA--

            """)
        # The message structure is:
        #
        # multipart/mixed
        #    text/plain
        #    message/rfc822
        #        multipart/mixed [*]
        #
        # [*] This message is missing its start boundary
        with self._raise_point(errors.StartBoundaryNotFoundDefect):
            outer = self._str_msg(source)
        if self.raise_expected:
            return
        bad = outer.get_payload(1).get_payload(0)
        self.assertEqual(len(self.get_defects(bad)), 1)
        self.assertIsInstance(self.get_defects(bad)[0],
                              errors.StartBoundaryNotFoundDefect)

    def test_first_line_is_continuation_header(self):
        # The very first line starts with whitespace, so it looks like a
        # continuation of a header that does not exist.
        with self._raise_point(errors.FirstHeaderLineIsContinuationDefect):
            msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
        if self.raise_expected:
            return
        self.assertEqual(msg.keys(), ['Subject'])
        self.assertEqual(msg.get_payload(), 'body')
        self.assertEqual(len(self.get_defects(msg)), 1)
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.FirstHeaderLineIsContinuationDefect])
        self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')

    def test_missing_header_body_separator(self):
        # Our heuristic if we see a line that doesn't look like a header (no
        # leading whitespace but no ':') is to assume that the blank line that
        # separates the header from the body is missing, and to stop parsing
        # headers and start parsing the body.
        with self._raise_point(errors.MissingHeaderBodySeparatorDefect):
            msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
        if self.raise_expected:
            return
        self.assertEqual(msg.keys(), ['Subject'])
        self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.MissingHeaderBodySeparatorDefect])

    def test_bad_padding_in_base64_payload(self):
        source = textwrap.dedent("""\
            Subject: test
            MIME-Version: 1.0
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            dmk
            """)
        msg = self._str_msg(source)
        # The defect is expected at decode time, not at parse time, so only
        # the get_payload() call sits inside the raise point.
        with self._raise_point(errors.InvalidBase64PaddingDefect):
            payload = msg.get_payload(decode=True)
        if self.raise_expected:
            return
        self.assertEqual(payload, b'vi')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.InvalidBase64PaddingDefect])

    def test_invalid_chars_in_base64_payload(self):
        # ``\x01`` in the literal below is a real control character inside
        # the base64 body.
        source = textwrap.dedent("""\
            Subject: test
            MIME-Version: 1.0
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            dm\x01k===
            """)
        msg = self._str_msg(source)
        with self._raise_point(errors.InvalidBase64CharactersDefect):
            payload = msg.get_payload(decode=True)
        if self.raise_expected:
            return
        self.assertEqual(payload, b'vi')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.InvalidBase64CharactersDefect])

    def test_invalid_length_of_base64_payload(self):
        source = textwrap.dedent("""\
            Subject: test
            MIME-Version: 1.0
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            abcde
            """)
        msg = self._str_msg(source)
        with self._raise_point(errors.InvalidBase64LengthDefect):
            payload = msg.get_payload(decode=True)
        if self.raise_expected:
            return
        # The expected payload is the undecoded input bytes.
        self.assertEqual(payload, b'abcde')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.InvalidBase64LengthDefect])

    def test_missing_ending_boundary(self):
        # The message stops after the second part with no closing
        # ``--boundary--`` line; both parts are still expected to be parsed.
        source = textwrap.dedent("""\
            To: 1@harrydomain4.com
            Subject: Fwd: 1
            MIME-Version: 1.0
            Content-Type: multipart/alternative;
             boundary="------------000101020201080900040301"

            --------------000101020201080900040301
            Content-Type: text/plain; charset=ISO-8859-1
            Content-Transfer-Encoding: 7bit

            Alternative 1

            --------------000101020201080900040301
            Content-Type: text/html; charset=ISO-8859-1
            Content-Transfer-Encoding: 7bit

            Alternative 2

            """)
        with self._raise_point(errors.CloseBoundaryNotFoundDefect):
            msg = self._str_msg(source)
        if self.raise_expected:
            return
        self.assertEqual(len(msg.get_payload()), 2)
        self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.CloseBoundaryNotFoundDefect])
302
303
class TestDefectDetection(TestDefectsBase, TestEmailBase):
    """Run the shared defect tests, reading defects off the parsed objects."""

    def get_defects(self, obj):
        """Return the ``defects`` attribute of *obj*."""
        recorded = obj.defects
        return recorded
308
309
class TestDefectCapture(TestDefectsBase, TestEmailBase):
    """Run the shared defect tests with a policy that collects defects.

    Each test gets its own ``CapturePolicy`` whose ``register_defect`` hook
    appends to a single list; ``get_defects`` returns that list regardless
    of which object it is asked about.
    """

    class CapturePolicy(policy.EmailPolicy):
        """``EmailPolicy`` variant that records every registered defect."""

        # Placeholder default; a real list is passed to the constructor as
        # the ``captured`` keyword in setUp().
        captured = None

        def register_defect(self, obj, defect):
            # *obj* is not used: every defect goes into the one list held
            # by this policy instance.
            self.captured.append(defect)

    def setUp(self):
        # Chain up first so fixture set-up in the base classes is not
        # skipped by this override.
        super().setUp()
        # A fresh policy with a fresh list per test, so defects captured by
        # one test are not visible to the next.
        self.policy = self.CapturePolicy(captured=[])

    def get_defects(self, obj):
        """Return all defects captured by this test's policy.

        *obj* is ignored; the list is shared by every object handled under
        the policy.
        """
        return self.policy.captured
322
323
class TestDefectRaising(TestDefectsBase, TestEmailBase):
    """Run the shared defect tests with a policy that raises on defects."""

    # Reach the base policy through TestDefectsBase explicitly: inside this
    # class body the bare name ``policy`` would otherwise resolve to the
    # module imported at the top of the file, not the inherited attribute.
    policy = TestDefectsBase.policy.clone(raise_on_defect=True)
    raise_expected = True

    @contextlib.contextmanager
    def _raise_point(self, defect):
        """Require the wrapped statement to raise *defect*."""
        expectation = self.assertRaises(defect)
        with expectation:
            yield
334
335
# Run the tests through unittest when this module is executed as a script.
if __name__ == '__main__':
    unittest.main()