1 import io
2 import types
3 import textwrap
4 import unittest
5 import email.errors
6 import email.policy
7 import email.parser
8 import email.generator
9 import email.message
10 from email import headerregistry
11
12 def make_defaults(base_defaults, differences):
13 defaults = base_defaults.copy()
14 defaults.update(differences)
15 return defaults
16
17 class ESC[4;38;5;81mPolicyAPITests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
18
19 longMessage = True
20
21 # Base default values.
22 compat32_defaults = {
23 'max_line_length': 78,
24 'linesep': '\n',
25 'cte_type': '8bit',
26 'raise_on_defect': False,
27 'mangle_from_': True,
28 'message_factory': None,
29 }
30 # These default values are the ones set on email.policy.default.
31 # If any of these defaults change, the docs must be updated.
32 policy_defaults = compat32_defaults.copy()
33 policy_defaults.update({
34 'utf8': False,
35 'raise_on_defect': False,
36 'header_factory': email.policy.EmailPolicy.header_factory,
37 'refold_source': 'long',
38 'content_manager': email.policy.EmailPolicy.content_manager,
39 'mangle_from_': False,
40 'message_factory': email.message.EmailMessage,
41 })
42
43 # For each policy under test, we give here what we expect the defaults to
44 # be for that policy. The second argument to make defaults is the
45 # difference between the base defaults and that for the particular policy.
46 new_policy = email.policy.EmailPolicy()
47 policies = {
48 email.policy.compat32: make_defaults(compat32_defaults, {}),
49 email.policy.default: make_defaults(policy_defaults, {}),
50 email.policy.SMTP: make_defaults(policy_defaults,
51 {'linesep': '\r\n'}),
52 email.policy.SMTPUTF8: make_defaults(policy_defaults,
53 {'linesep': '\r\n',
54 'utf8': True}),
55 email.policy.HTTP: make_defaults(policy_defaults,
56 {'linesep': '\r\n',
57 'max_line_length': None}),
58 email.policy.strict: make_defaults(policy_defaults,
59 {'raise_on_defect': True}),
60 new_policy: make_defaults(policy_defaults, {}),
61 }
62 # Creating a new policy creates a new header factory. There is a test
63 # later that proves this.
64 policies[new_policy]['header_factory'] = new_policy.header_factory
65
66 def test_defaults(self):
67 for policy, expected in self.policies.items():
68 for attr, value in expected.items():
69 with self.subTest(policy=policy, attr=attr):
70 self.assertEqual(getattr(policy, attr), value,
71 ("change {} docs/docstrings if defaults have "
72 "changed").format(policy))
73
74 def test_all_attributes_covered(self):
75 for policy, expected in self.policies.items():
76 for attr in dir(policy):
77 with self.subTest(policy=policy, attr=attr):
78 if (attr.startswith('_') or
79 isinstance(getattr(email.policy.EmailPolicy, attr),
80 types.FunctionType)):
81 continue
82 else:
83 self.assertIn(attr, expected,
84 "{} is not fully tested".format(attr))
85
86 def test_abc(self):
87 with self.assertRaises(TypeError) as cm:
88 email.policy.Policy()
89 msg = str(cm.exception)
90 abstract_methods = ('fold',
91 'fold_binary',
92 'header_fetch_parse',
93 'header_source_parse',
94 'header_store_parse')
95 for method in abstract_methods:
96 self.assertIn(method, msg)
97
98 def test_policy_is_immutable(self):
99 for policy, defaults in self.policies.items():
100 for attr in defaults:
101 with self.assertRaisesRegex(AttributeError, attr+".*read-only"):
102 setattr(policy, attr, None)
103 with self.assertRaisesRegex(AttributeError, 'no attribute.*foo'):
104 policy.foo = None
105
106 def test_set_policy_attrs_when_cloned(self):
107 # None of the attributes has a default value of None, so we set them
108 # all to None in the clone call and check that it worked.
109 for policyclass, defaults in self.policies.items():
110 testattrdict = {attr: None for attr in defaults}
111 policy = policyclass.clone(**testattrdict)
112 for attr in defaults:
113 self.assertIsNone(getattr(policy, attr))
114
115 def test_reject_non_policy_keyword_when_called(self):
116 for policyclass in self.policies:
117 with self.assertRaises(TypeError):
118 policyclass(this_keyword_should_not_be_valid=None)
119 with self.assertRaises(TypeError):
120 policyclass(newtline=None)
121
122 def test_policy_addition(self):
123 expected = self.policy_defaults.copy()
124 p1 = email.policy.default.clone(max_line_length=100)
125 p2 = email.policy.default.clone(max_line_length=50)
126 added = p1 + p2
127 expected.update(max_line_length=50)
128 for attr, value in expected.items():
129 self.assertEqual(getattr(added, attr), value)
130 added = p2 + p1
131 expected.update(max_line_length=100)
132 for attr, value in expected.items():
133 self.assertEqual(getattr(added, attr), value)
134 added = added + email.policy.default
135 for attr, value in expected.items():
136 self.assertEqual(getattr(added, attr), value)
137
138 def test_fold_zero_max_line_length(self):
139 expected = 'Subject: =?utf-8?q?=C3=A1?=\n'
140
141 msg = email.message.EmailMessage()
142 msg['Subject'] = 'á'
143
144 p1 = email.policy.default.clone(max_line_length=0)
145 p2 = email.policy.default.clone(max_line_length=None)
146
147 self.assertEqual(p1.fold('Subject', msg['Subject']), expected)
148 self.assertEqual(p2.fold('Subject', msg['Subject']), expected)
149
150 def test_register_defect(self):
151 class ESC[4;38;5;81mDummy:
152 def __init__(self):
153 self.defects = []
154 obj = Dummy()
155 defect = object()
156 policy = email.policy.EmailPolicy()
157 policy.register_defect(obj, defect)
158 self.assertEqual(obj.defects, [defect])
159 defect2 = object()
160 policy.register_defect(obj, defect2)
161 self.assertEqual(obj.defects, [defect, defect2])
162
163 class ESC[4;38;5;81mMyObj:
164 def __init__(self):
165 self.defects = []
166
167 class ESC[4;38;5;81mMyDefect(ESC[4;38;5;149mException):
168 pass
169
170 def test_handle_defect_raises_on_strict(self):
171 foo = self.MyObj()
172 defect = self.MyDefect("the telly is broken")
173 with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
174 email.policy.strict.handle_defect(foo, defect)
175
176 def test_handle_defect_registers_defect(self):
177 foo = self.MyObj()
178 defect1 = self.MyDefect("one")
179 email.policy.default.handle_defect(foo, defect1)
180 self.assertEqual(foo.defects, [defect1])
181 defect2 = self.MyDefect("two")
182 email.policy.default.handle_defect(foo, defect2)
183 self.assertEqual(foo.defects, [defect1, defect2])
184
185 class ESC[4;38;5;81mMyPolicy(ESC[4;38;5;149memailESC[4;38;5;149m.ESC[4;38;5;149mpolicyESC[4;38;5;149m.ESC[4;38;5;149mEmailPolicy):
186 defects = None
187 def __init__(self, *args, **kw):
188 super().__init__(*args, defects=[], **kw)
189 def register_defect(self, obj, defect):
190 self.defects.append(defect)
191
192 def test_overridden_register_defect_still_raises(self):
193 foo = self.MyObj()
194 defect = self.MyDefect("the telly is broken")
195 with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
196 self.MyPolicy(raise_on_defect=True).handle_defect(foo, defect)
197
198 def test_overridden_register_defect_works(self):
199 foo = self.MyObj()
200 defect1 = self.MyDefect("one")
201 my_policy = self.MyPolicy()
202 my_policy.handle_defect(foo, defect1)
203 self.assertEqual(my_policy.defects, [defect1])
204 self.assertEqual(foo.defects, [])
205 defect2 = self.MyDefect("two")
206 my_policy.handle_defect(foo, defect2)
207 self.assertEqual(my_policy.defects, [defect1, defect2])
208 self.assertEqual(foo.defects, [])
209
210 def test_default_header_factory(self):
211 h = email.policy.default.header_factory('Test', 'test')
212 self.assertEqual(h.name, 'Test')
213 self.assertIsInstance(h, headerregistry.UnstructuredHeader)
214 self.assertIsInstance(h, headerregistry.BaseHeader)
215
216 class ESC[4;38;5;81mFoo:
217 parse = headerregistry.UnstructuredHeader.parse
218
219 def test_each_Policy_gets_unique_factory(self):
220 policy1 = email.policy.EmailPolicy()
221 policy2 = email.policy.EmailPolicy()
222 policy1.header_factory.map_to_type('foo', self.Foo)
223 h = policy1.header_factory('foo', 'test')
224 self.assertIsInstance(h, self.Foo)
225 self.assertNotIsInstance(h, headerregistry.UnstructuredHeader)
226 h = policy2.header_factory('foo', 'test')
227 self.assertNotIsInstance(h, self.Foo)
228 self.assertIsInstance(h, headerregistry.UnstructuredHeader)
229
230 def test_clone_copies_factory(self):
231 policy1 = email.policy.EmailPolicy()
232 policy2 = policy1.clone()
233 policy1.header_factory.map_to_type('foo', self.Foo)
234 h = policy1.header_factory('foo', 'test')
235 self.assertIsInstance(h, self.Foo)
236 h = policy2.header_factory('foo', 'test')
237 self.assertIsInstance(h, self.Foo)
238
239 def test_new_factory_overrides_default(self):
240 mypolicy = email.policy.EmailPolicy()
241 myfactory = mypolicy.header_factory
242 newpolicy = mypolicy + email.policy.strict
243 self.assertEqual(newpolicy.header_factory, myfactory)
244 newpolicy = email.policy.strict + mypolicy
245 self.assertEqual(newpolicy.header_factory, myfactory)
246
247 def test_adding_default_policies_preserves_default_factory(self):
248 newpolicy = email.policy.default + email.policy.strict
249 self.assertEqual(newpolicy.header_factory,
250 email.policy.EmailPolicy.header_factory)
251 self.assertEqual(newpolicy.__dict__, {'raise_on_defect': True})
252
253 def test_non_ascii_chars_do_not_cause_inf_loop(self):
254 policy = email.policy.default.clone(max_line_length=20)
255 actual = policy.fold('Subject', 'ą' * 12)
256 self.assertEqual(
257 actual,
258 'Subject: \n' +
259 12 * ' =?utf-8?q?=C4=85?=\n')
260
261 def test_short_maxlen_error(self):
262 # RFC 2047 chrome takes up 7 characters, plus the length of the charset
263 # name, so folding should fail if maxlen is lower than the minimum
264 # required length for a line.
265
266 # Note: This is only triggered when there is a single word longer than
267 # max_line_length, hence the 1234567890 at the end of this whimsical
268 # subject. This is because when we encounter a word longer than
269 # max_line_length, it is broken down into encoded words to fit
270 # max_line_length. If the max_line_length isn't large enough to even
271 # contain the RFC 2047 chrome (`?=<charset>?q??=`), we fail.
272 subject = "Melt away the pounds with this one simple trick! 1234567890"
273
274 for maxlen in [3, 7, 9]:
275 with self.subTest(maxlen=maxlen):
276 policy = email.policy.default.clone(max_line_length=maxlen)
277 with self.assertRaises(email.errors.HeaderParseError):
278 policy.fold("Subject", subject)
279
280 # XXX: Need subclassing tests.
281 # For adding subclassed objects, make sure the usual rules apply (subclass
282 # wins), but that the order still works (right overrides left).
283
284
285 class ESC[4;38;5;81mTestException(ESC[4;38;5;149mException):
286 pass
287
288 class ESC[4;38;5;81mTestPolicyPropagation(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
289
290 # The abstract methods are used by the parser but not by the wrapper
291 # functions that call it, so if the exception gets raised we know that the
292 # policy was actually propagated all the way to feedparser.
293 class ESC[4;38;5;81mMyPolicy(ESC[4;38;5;149memailESC[4;38;5;149m.ESC[4;38;5;149mpolicyESC[4;38;5;149m.ESC[4;38;5;149mPolicy):
294 def badmethod(self, *args, **kw):
295 raise TestException("test")
296 fold = fold_binary = header_fetch_parser = badmethod
297 header_source_parse = header_store_parse = badmethod
298
299 def test_message_from_string(self):
300 with self.assertRaisesRegex(TestException, "^test$"):
301 email.message_from_string("Subject: test\n\n",
302 policy=self.MyPolicy)
303
304 def test_message_from_bytes(self):
305 with self.assertRaisesRegex(TestException, "^test$"):
306 email.message_from_bytes(b"Subject: test\n\n",
307 policy=self.MyPolicy)
308
309 def test_message_from_file(self):
310 f = io.StringIO('Subject: test\n\n')
311 with self.assertRaisesRegex(TestException, "^test$"):
312 email.message_from_file(f, policy=self.MyPolicy)
313
314 def test_message_from_binary_file(self):
315 f = io.BytesIO(b'Subject: test\n\n')
316 with self.assertRaisesRegex(TestException, "^test$"):
317 email.message_from_binary_file(f, policy=self.MyPolicy)
318
319 # These are redundant, but we need them for black-box completeness.
320
321 def test_parser(self):
322 p = email.parser.Parser(policy=self.MyPolicy)
323 with self.assertRaisesRegex(TestException, "^test$"):
324 p.parsestr('Subject: test\n\n')
325
326 def test_bytes_parser(self):
327 p = email.parser.BytesParser(policy=self.MyPolicy)
328 with self.assertRaisesRegex(TestException, "^test$"):
329 p.parsebytes(b'Subject: test\n\n')
330
331 # Now that we've established that all the parse methods get the
332 # policy in to feedparser, we can use message_from_string for
333 # the rest of the propagation tests.
334
335 def _make_msg(self, source='Subject: test\n\n', policy=None):
336 self.policy = email.policy.default.clone() if policy is None else policy
337 return email.message_from_string(source, policy=self.policy)
338
339 def test_parser_propagates_policy_to_message(self):
340 msg = self._make_msg()
341 self.assertIs(msg.policy, self.policy)
342
343 def test_parser_propagates_policy_to_sub_messages(self):
344 msg = self._make_msg(textwrap.dedent("""\
345 Subject: mime test
346 MIME-Version: 1.0
347 Content-Type: multipart/mixed, boundary="XXX"
348
349 --XXX
350 Content-Type: text/plain
351
352 test
353 --XXX
354 Content-Type: text/plain
355
356 test2
357 --XXX--
358 """))
359 for part in msg.walk():
360 self.assertIs(part.policy, self.policy)
361
362 def test_message_policy_propagates_to_generator(self):
363 msg = self._make_msg("Subject: test\nTo: foo\n\n",
364 policy=email.policy.default.clone(linesep='X'))
365 s = io.StringIO()
366 g = email.generator.Generator(s)
367 g.flatten(msg)
368 self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX")
369
370 def test_message_policy_used_by_as_string(self):
371 msg = self._make_msg("Subject: test\nTo: foo\n\n",
372 policy=email.policy.default.clone(linesep='X'))
373 self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX")
374
375
376 class ESC[4;38;5;81mTestConcretePolicies(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
377
378 def test_header_store_parse_rejects_newlines(self):
379 instance = email.policy.EmailPolicy()
380 self.assertRaises(ValueError,
381 instance.header_store_parse,
382 'From', 'spam\negg@foo.py')
383
384
385 if __name__ == '__main__':
386 unittest.main()