#
# test_multibytecodec.py
#   Unit tests for the _multibytecodec module itself
#
5
6 import _multibytecodec
7 import codecs
8 import io
9 import sys
10 import textwrap
11 import unittest
12 from test import support
13 from test.support import os_helper
14 from test.support.os_helper import TESTFN
15
# Names of the CJK codecs that the encoding-generic tests in this file iterate
# over.  Each group is preceded by a comment naming the codec module it is
# filed under.
ALL_CJKENCODINGS = [
    # _codecs_cn
    'gb2312', 'gbk', 'gb18030', 'hz',
    # _codecs_hk
    'big5hkscs',
    # _codecs_jp
    'cp932', 'shift_jis', 'euc_jp', 'euc_jisx0213', 'shift_jisx0213',
    'euc_jis_2004', 'shift_jis_2004',
    # _codecs_kr
    'cp949', 'euc_kr', 'johab',
    # _codecs_tw
    'big5', 'cp950',
    # _codecs_iso2022
    'iso2022_jp', 'iso2022_jp_1', 'iso2022_jp_2', 'iso2022_jp_2004',
    'iso2022_jp_3', 'iso2022_jp_ext', 'iso2022_kr',
]
32
class Test_MultibyteCodec(unittest.TestCase):
    """Codec-level checks, most of them repeated for every CJK encoding."""

    def test_nullcoding(self):
        # Empty input must produce empty output in both directions.
        for enc in ALL_CJKENCODINGS:
            # subTest makes a failure report name the offending codec.
            with self.subTest(encoding=enc):
                self.assertEqual(b''.decode(enc), '')
                self.assertEqual(str(b'', enc), '')
                self.assertEqual(''.encode(enc), b'')

    def test_str_decode(self):
        # ASCII text is expected to encode to the identical ASCII bytes.
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertEqual('abcd'.encode(enc), b'abcd')

    def test_errorcallback_longindex(self):
        # An error handler that returns a resume position larger than
        # sys.maxsize must make the decoder raise IndexError.
        def myreplace(exc):
            return ('', sys.maxsize + 1)

        dec = codecs.getdecoder('euc-kr')
        codecs.register_error('test.cjktest', myreplace)
        self.assertRaises(IndexError, dec,
                          b'apple\x92ham\x93spam', 'test.cjktest')

    def test_errorcallback_custom_ignore(self):
        # Issue #23215: MemoryError with custom error handlers and multibyte codecs
        data = 100 * "\udc00"
        codecs.register_error("test.ignore", codecs.ignore_errors)
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertEqual(data.encode(enc, "test.ignore"), b'')

    def test_codingspec(self):
        # Execute a source snippet consisting only of a coding declaration
        # for each encoding.
        # NOTE(review): nothing in this method creates TESTFN; the unlink in
        # the finally clause looks like a leftover safety net -- confirm.
        try:
            for enc in ALL_CJKENCODINGS:
                with self.subTest(encoding=enc):
                    code = '# coding: {}\n'.format(enc)
                    exec(code)
        finally:
            os_helper.unlink(TESTFN)

    def test_init_segfault(self):
        # bug #3305: this used to segfault
        self.assertRaises(AttributeError,
                          _multibytecodec.MultibyteStreamReader, None)
        self.assertRaises(AttributeError,
                          _multibytecodec.MultibyteStreamWriter, None)

    def test_decode_unicode(self):
        # Trying to decode a unicode string should raise a TypeError
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertRaises(TypeError, codecs.getdecoder(enc), "")
78
class Test_IncrementalEncoder(unittest.TestCase):
    """Incremental encoders: buffering, reset() and getstate()/setstate()."""

    def test_stateless(self):
        # cp949 encoder isn't stateful at all.
        encoder = codecs.getincrementalencoder('cp949')()
        self.assertEqual(encoder.encode('\ud30c\uc774\uc36c \ub9c8\uc744'),
                         b'\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('\u2606\u223c\u2606', True),
                         b'\xa1\xd9\xa1\xad\xa1\xd9')
        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('', True), b'')
        self.assertEqual(encoder.encode('', False), b'')
        self.assertIsNone(encoder.reset())

    def test_stateful(self):
        # jisx0213 encoder is stateful for a few code points. eg)
        #   U+00E6 => A9DC
        #   U+00E6 U+0300 => ABC4
        #   U+0300 => ABDC

        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
        # U+00E6 alone is held back until the next character is known.
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
        self.assertEqual(encoder.encode('\u00e6', True), b'\xa9\xdc')

        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')

        self.assertEqual(encoder.encode('\u00e6'), b'')
        # A final flush emits the pending character; a second one is empty.
        self.assertEqual(encoder.encode('', True), b'\xa9\xdc')
        self.assertEqual(encoder.encode('', True), b'')

    def test_stateful_keep_buffer(self):
        # A failed encode() call must leave the pending character buffered.
        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
        self.assertEqual(encoder.encode('\u0300\u00e6'), b'\xab\xc4')
        self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
        self.assertEqual(encoder.encode('', True), b'\xa9\xdc')

    def test_state_methods_with_buffer_state(self):
        # euc_jis_2004 stores state as a buffer of pending bytes
        encoder = codecs.getincrementalencoder('euc_jis_2004')()

        initial_state = encoder.getstate()
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
        encoder.setstate(initial_state)
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')

        self.assertEqual(encoder.encode('\u00e6'), b'')
        partial_state = encoder.getstate()
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
        # Restoring the saved state must bring back the pending U+00E6.
        encoder.setstate(partial_state)
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')

    def test_state_methods_with_non_buffer_state(self):
        # iso2022_jp stores state without using a buffer
        encoder = codecs.getincrementalencoder('iso2022_jp')()

        self.assertEqual(encoder.encode('z'), b'z')
        en_state = encoder.getstate()

        self.assertEqual(encoder.encode('\u3042'), b'\x1b\x24\x42\x24\x22')
        jp_state = encoder.getstate()
        self.assertEqual(encoder.encode('z'), b'\x1b\x28\x42z')

        # With jp_state restored no escape sequence is emitted again.
        encoder.setstate(jp_state)
        self.assertEqual(encoder.encode('\u3042'), b'\x24\x22')

        encoder.setstate(en_state)
        self.assertEqual(encoder.encode('z'), b'z')

    def test_getstate_returns_expected_value(self):
        # Note: getstate is implemented such that these state values
        # are expected to be the same across all builds of Python,
        # regardless of x32/64 bit, endianness and compiler.

        # euc_jis_2004 stores state as a buffer of pending bytes
        buffer_state_encoder = codecs.getincrementalencoder('euc_jis_2004')()
        self.assertEqual(buffer_state_encoder.getstate(), 0)
        buffer_state_encoder.encode('\u00e6')
        self.assertEqual(buffer_state_encoder.getstate(),
                         int.from_bytes(
                             b"\x02"
                             b"\xc3\xa6"
                             b"\x00\x00\x00\x00\x00\x00\x00\x00",
                             'little'))
        buffer_state_encoder.encode('\u0300')
        self.assertEqual(buffer_state_encoder.getstate(), 0)

        # iso2022_jp stores state without using a buffer
        non_buffer_state_encoder = codecs.getincrementalencoder('iso2022_jp')()
        self.assertEqual(non_buffer_state_encoder.getstate(),
                         int.from_bytes(
                             b"\x00"
                             b"\x42\x42\x00\x00\x00\x00\x00\x00",
                             'little'))
        non_buffer_state_encoder.encode('\u3042')
        self.assertEqual(non_buffer_state_encoder.getstate(),
                         int.from_bytes(
                             b"\x00"
                             b"\xc2\x42\x00\x00\x00\x00\x00\x00",
                             'little'))

    def test_setstate_validates_input_size(self):
        # The first byte of the state value is 9 here; setstate() must
        # reject it with UnicodeError.
        encoder = codecs.getincrementalencoder('euc_jp')()
        pending_size_nine = int.from_bytes(
            b"\x09"
            b"\x00\x00\x00\x00\x00\x00\x00\x00"
            b"\x00\x00\x00\x00\x00\x00\x00\x00",
            'little')
        self.assertRaises(UnicodeError, encoder.setstate, pending_size_nine)

    def test_setstate_validates_input_bytes(self):
        # A state value carrying the byte 0xff must be rejected with
        # UnicodeDecodeError.
        encoder = codecs.getincrementalencoder('euc_jp')()
        invalid_utf8 = int.from_bytes(
            b"\x01"
            b"\xff"
            b"\x00\x00\x00\x00\x00\x00\x00\x00",
            'little')
        self.assertRaises(UnicodeDecodeError, encoder.setstate, invalid_utf8)

    def test_issue5640(self):
        # After 'backslashreplace' handled an unencodable character, the
        # next call must encode normally.
        encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
        self.assertEqual(encoder.encode('\xff'), b'\\xff')
        self.assertEqual(encoder.encode('\n'), b'\n')

    @support.cpython_only
    def test_subinterp(self):
        # bpo-42846: Test a CJK codec in a subinterpreter
        try:
            import _testcapi
        except ImportError:
            # Report a skip rather than an error when _testcapi cannot be
            # imported.
            self.skipTest('requires the _testcapi module')
        encoding = 'cp932'
        text = "Python の開発は、1990 年ごろから開始されています。"
        code = textwrap.dedent("""
            import codecs
            encoding = %r
            text = %r
            encoder = codecs.getincrementalencoder(encoding)()
            text2 = encoder.encode(text).decode(encoding)
            if text2 != text:
                raise ValueError(f"encoding issue: {text2!a} != {text!a}")
        """) % (encoding, text)
        res = _testcapi.run_in_subinterp(code)
        self.assertEqual(res, 0)
229
class Test_IncrementalDecoder(unittest.TestCase):
    """Incremental decoders: partial input, reset() and state handling."""

    def test_dbcs(self):
        # cp949 decoder is simple with only 1 or 2 bytes sequences.
        decoder = codecs.getincrementaldecoder('cp949')()
        # The trailing 0xbd is incomplete and is carried into the next call.
        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0\xcc\xbd'),
                         '\ud30c\uc774')
        self.assertEqual(decoder.decode(b'\xe3 \xb8\xb6\xc0\xbb'),
                         '\uc36c \ub9c8\uc744')
        self.assertEqual(decoder.decode(b''), '')

    def test_dbcs_keep_buffer(self):
        # A failed final decode() must not discard the pending byte.
        decoder = codecs.getincrementaldecoder('cp949')()
        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True)
        self.assertEqual(decoder.decode(b'\xcc'), '\uc774')

        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
        self.assertRaises(UnicodeDecodeError, decoder.decode,
                          b'\xcc\xbd', True)
        self.assertEqual(decoder.decode(b'\xcc'), '\uc774')

    def test_iso2022(self):
        # Escape sequences and two-byte characters may be split across
        # decode() calls at any point.
        decoder = codecs.getincrementaldecoder('iso2022-jp')()
        ESC = b'\x1b'
        self.assertEqual(decoder.decode(ESC + b'('), '')
        self.assertEqual(decoder.decode(b'B', True), '')
        self.assertEqual(decoder.decode(ESC + b'$'), '')
        self.assertEqual(decoder.decode(b'B@$'), '\u4e16')
        self.assertEqual(decoder.decode(b'@$@'), '\u4e16')
        self.assertEqual(decoder.decode(b'$', True), '\u4e16')
        self.assertIsNone(decoder.reset())
        # After reset() the same bytes decode as plain ASCII.
        self.assertEqual(decoder.decode(b'@$'), '@$')
        self.assertEqual(decoder.decode(ESC + b'$'), '')
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True)
        self.assertEqual(decoder.decode(b'B@$'), '\u4e16')

    def test_decode_unicode(self):
        # Trying to decode a unicode string should raise a TypeError
        for enc in ALL_CJKENCODINGS:
            # subTest makes a failure report name the offending codec.
            with self.subTest(encoding=enc):
                decoder = codecs.getincrementaldecoder(enc)()
                self.assertRaises(TypeError, decoder.decode, "")

    def test_state_methods(self):
        decoder = codecs.getincrementaldecoder('euc_jp')()

        # Decode a complete input sequence
        self.assertEqual(decoder.decode(b'\xa4\xa6'), '\u3046')
        pending1, _ = decoder.getstate()
        self.assertEqual(pending1, b'')

        # Decode first half of a partial input sequence
        self.assertEqual(decoder.decode(b'\xa4'), '')
        pending2, flags2 = decoder.getstate()
        self.assertEqual(pending2, b'\xa4')

        # Decode second half of a partial input sequence
        self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
        pending3, _ = decoder.getstate()
        self.assertEqual(pending3, b'')

        # Jump back and decode second half of partial input sequence again
        decoder.setstate((pending2, flags2))
        self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
        pending4, _ = decoder.getstate()
        self.assertEqual(pending4, b'')

        # Ensure state values are preserved correctly
        decoder.setstate((b'abc', 123456789))
        self.assertEqual(decoder.getstate(), (b'abc', 123456789))

    def test_setstate_validates_input(self):
        # setstate() takes a (bytes, int) tuple; other shapes raise
        # TypeError, and the nine-byte pending buffer raises UnicodeError.
        decoder = codecs.getincrementaldecoder('euc_jp')()
        self.assertRaises(TypeError, decoder.setstate, 123)
        self.assertRaises(TypeError, decoder.setstate, ("invalid", 0))
        self.assertRaises(TypeError, decoder.setstate, (b"1234", "invalid"))
        self.assertRaises(UnicodeError, decoder.setstate, (b"123456789", 0))
307
class Test_StreamReader(unittest.TestCase):
    """Regression tests for the codec stream reader."""

    def test_bug1728403(self):
        # Reading a file that holds only the single byte 0xa1 through a
        # cp949 stream reader must raise UnicodeDecodeError.
        try:
            with open(TESTFN, 'wb') as f:
                f.write(b'\xa1')
            with codecs.open(TESTFN, encoding='cp949') as f:
                self.assertRaises(UnicodeDecodeError, f.read, 2)
        finally:
            # Remove the scratch file whether or not the checks passed.
            os_helper.unlink(TESTFN)
323
class Test_StreamWriter(unittest.TestCase):
    """Stream writers must emit the expected bytes for each write() call."""

    def test_gb18030(self):
        # Each write() appends to the same BytesIO, so every check compares
        # against the cumulative output so far.
        s = io.BytesIO()
        c = codecs.getwriter('gb18030')(s)
        c.write('123')
        self.assertEqual(s.getvalue(), b'123')
        c.write('\U00012345')
        self.assertEqual(s.getvalue(), b'123\x907\x959')
        c.write('\uac00\u00ac')
        self.assertEqual(s.getvalue(),
                         b'123\x907\x959\x827\xcf5\x810\x851')

    def test_utf_8(self):
        # Same write sequence as test_gb18030, through the utf-8 writer.
        s = io.BytesIO()
        c = codecs.getwriter('utf-8')(s)
        c.write('123')
        self.assertEqual(s.getvalue(), b'123')
        c.write('\U00012345')
        self.assertEqual(s.getvalue(), b'123\xf0\x92\x8d\x85')
        c.write('\uac00\u00ac')
        self.assertEqual(s.getvalue(),
                         b'123\xf0\x92\x8d\x85'
                         b'\xea\xb0\x80\xc2\xac')

    def test_streamwriter_strwrite(self):
        # Writing a plain ASCII str yields the same ASCII bytes.
        s = io.BytesIO()
        wr = codecs.getwriter('gb18030')(s)
        wr.write('abcd')
        self.assertEqual(s.getvalue(), b'abcd')
353
class Test_ISO2022(unittest.TestCase):
    """Checks specific to the ISO 2022 family of codecs."""

    def test_g2(self):
        # Decode an iso2022-jp-2 byte string containing the ESC . A and
        # ESC N sequences; the result must contain U+00E9.
        iso2022jp2 = b'\x1b(B:hu4:unit\x1b.A\x1bNi de famille'
        uni = ':hu4:unit\xe9 de famille'
        self.assertEqual(iso2022jp2.decode('iso2022-jp-2'), uni)

    def test_iso2022_jp_g0(self):
        # The encoded output must not contain the byte 0x0e.
        self.assertNotIn(b'\x0e', '\N{SOFT HYPHEN}'.encode('iso-2022-jp-2'))
        for encoding in ('iso-2022-jp-2004', 'iso-2022-jp-3'):
            # subTest makes a failure report name the offending codec.
            with self.subTest(encoding=encoding):
                e = '\u3406'.encode(encoding)
                self.assertFalse(any(x > 0x80 for x in e))

    @support.requires_resource('cpu')
    def test_bug1572832(self):
        # Encode every code point from U+10000 to U+10FFFF; the test passes
        # as long as no call crashes the interpreter.
        for x in range(0x10000, 0x110000):
            # Any ISO 2022 codec will cause the segfault
            chr(x).encode('iso_2022_jp', 'ignore')
371
class TestStateful(unittest.TestCase):
    """One-shot and incremental encoding of a stateful codec must agree.

    The class attributes are the fixture; the test methods read them through
    ``self`` so that a subclass can supply another encoding.
    """

    # Text to encode.
    text = '\u4E16\u4E16'
    # Codec under test.
    encoding = 'iso-2022-jp'
    # Bytes produced for `text` before the encoder is flushed.
    expected = b'\x1b$B@$@$'
    # Bytes the encoder emits when flushed with final=True.
    reset = b'\x1b(B'
    # Complete output of a one-shot encode.
    expected_reset = expected + reset

    def test_encode(self):
        # str.encode() produces the text plus the trailing reset bytes.
        self.assertEqual(self.text.encode(self.encoding), self.expected_reset)

    def test_incrementalencoder(self):
        # Feed one character at a time without ever passing final=True.
        encoder = codecs.getincrementalencoder(self.encoding)()
        output = b''.join(
            encoder.encode(char)
            for char in self.text)
        self.assertEqual(output, self.expected)
        # The first final flush emits the reset bytes, the second nothing.
        self.assertEqual(encoder.encode('', final=True), self.reset)
        self.assertEqual(encoder.encode('', final=True), b'')

    def test_incrementalencoder_final(self):
        # Pass final=True together with the last character only.
        encoder = codecs.getincrementalencoder(self.encoding)()
        last_index = len(self.text) - 1
        output = b''.join(
            encoder.encode(char, index == last_index)
            for index, char in enumerate(self.text))
        self.assertEqual(output, self.expected_reset)
        self.assertEqual(encoder.encode('', final=True), b'')
399
class TestHZStateful(TestStateful):
    """Run the inherited TestStateful tests against the 'hz' codec."""

    # Text to encode.
    text = '\u804a\u804a'
    # Codec under test.
    encoding = 'hz'
    # Bytes produced for `text` before the encoder is flushed.
    expected = b'~{ADAD'
    # Bytes the encoder emits when flushed with final=True.
    reset = b'~}'
    # Recomputed here because the base class evaluated its own value from
    # its own `expected` and `reset` when that class body ran.
    expected_reset = expected + reset
406
407
if __name__ == "__main__":
    # Run the tests in this module when the file is executed as a script.
    unittest.main()