(root)/
Python-3.12.0/
Lib/
test/
multibytecodec_support.py
       1  #
       2  # multibytecodec_support.py
       3  #   Common Unittest Routines for CJK codecs
       4  #
       5  
       6  import codecs
       7  import os
       8  import re
       9  import sys
      10  import unittest
      11  from http.client import HTTPException
      12  from test import support
      13  from io import BytesIO
      14  
      15  class ESC[4;38;5;81mTestBase:
      16      encoding        = ''   # codec name
      17      codec           = None # codec tuple (with 4 elements)
      18      tstring         = None # must set. 2 strings to test StreamReader
      19  
      20      codectests      = None # must set. codec test tuple
      21      roundtriptest   = 1    # set if roundtrip is possible with unicode
      22      has_iso10646    = 0    # set if this encoding contains whole iso10646 map
      23      xmlcharnametest = None # string to test xmlcharrefreplace
      24      unmappedunicode = '\udeee' # a unicode code point that is not mapped.
      25  
      26      def setUp(self):
      27          if self.codec is None:
      28              self.codec = codecs.lookup(self.encoding)
      29          self.encode = self.codec.encode
      30          self.decode = self.codec.decode
      31          self.reader = self.codec.streamreader
      32          self.writer = self.codec.streamwriter
      33          self.incrementalencoder = self.codec.incrementalencoder
      34          self.incrementaldecoder = self.codec.incrementaldecoder
      35  
      36      def test_chunkcoding(self):
      37          tstring_lines = []
      38          for b in self.tstring:
      39              lines = b.split(b"\n")
      40              last = lines.pop()
      41              assert last == b""
      42              lines = [line + b"\n" for line in lines]
      43              tstring_lines.append(lines)
      44          for native, utf8 in zip(*tstring_lines):
      45              u = self.decode(native)[0]
      46              self.assertEqual(u, utf8.decode('utf-8'))
      47              if self.roundtriptest:
      48                  self.assertEqual(native, self.encode(u)[0])
      49  
      50      def test_errorhandle(self):
      51          for source, scheme, expected in self.codectests:
      52              if isinstance(source, bytes):
      53                  func = self.decode
      54              else:
      55                  func = self.encode
      56              if expected:
      57                  result = func(source, scheme)[0]
      58                  if func is self.decode:
      59                      self.assertTrue(type(result) is str, type(result))
      60                      self.assertEqual(result, expected,
      61                                       '%a.decode(%r, %r)=%a != %a'
      62                                       % (source, self.encoding, scheme, result,
      63                                          expected))
      64                  else:
      65                      self.assertTrue(type(result) is bytes, type(result))
      66                      self.assertEqual(result, expected,
      67                                       '%a.encode(%r, %r)=%a != %a'
      68                                       % (source, self.encoding, scheme, result,
      69                                          expected))
      70              else:
      71                  self.assertRaises(UnicodeError, func, source, scheme)
      72  
      73      def test_xmlcharrefreplace(self):
      74          if self.has_iso10646:
      75              self.skipTest('encoding contains full ISO 10646 map')
      76  
      77          s = "\u0b13\u0b23\u0b60 nd eggs"
      78          self.assertEqual(
      79              self.encode(s, "xmlcharrefreplace")[0],
      80              b"ଓଣୠ nd eggs"
      81          )
      82  
      83      def test_customreplace_encode(self):
      84          if self.has_iso10646:
      85              self.skipTest('encoding contains full ISO 10646 map')
      86  
      87          from html.entities import codepoint2name
      88  
      89          def xmlcharnamereplace(exc):
      90              if not isinstance(exc, UnicodeEncodeError):
      91                  raise TypeError("don't know how to handle %r" % exc)
      92              l = []
      93              for c in exc.object[exc.start:exc.end]:
      94                  if ord(c) in codepoint2name:
      95                      l.append("&%s;" % codepoint2name[ord(c)])
      96                  else:
      97                      l.append("&#%d;" % ord(c))
      98              return ("".join(l), exc.end)
      99  
     100          codecs.register_error("test.xmlcharnamereplace", xmlcharnamereplace)
     101  
     102          if self.xmlcharnametest:
     103              sin, sout = self.xmlcharnametest
     104          else:
     105              sin = "\xab\u211c\xbb = \u2329\u1234\u232a"
     106              sout = b"«ℜ» = ⟨ሴ⟩"
     107          self.assertEqual(self.encode(sin,
     108                                      "test.xmlcharnamereplace")[0], sout)
     109  
     110      def test_callback_returns_bytes(self):
     111          def myreplace(exc):
     112              return (b"1234", exc.end)
     113          codecs.register_error("test.cjktest", myreplace)
     114          enc = self.encode("abc" + self.unmappedunicode + "def", "test.cjktest")[0]
     115          self.assertEqual(enc, b"abc1234def")
     116  
     117      def test_callback_wrong_objects(self):
     118          def myreplace(exc):
     119              return (ret, exc.end)
     120          codecs.register_error("test.cjktest", myreplace)
     121  
     122          for ret in ([1, 2, 3], [], None, object()):
     123              self.assertRaises(TypeError, self.encode, self.unmappedunicode,
     124                                'test.cjktest')
     125  
     126      def test_callback_long_index(self):
     127          def myreplace(exc):
     128              return ('x', int(exc.end))
     129          codecs.register_error("test.cjktest", myreplace)
     130          self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
     131                                       'test.cjktest'), (b'abcdxefgh', 9))
     132  
     133          def myreplace(exc):
     134              return ('x', sys.maxsize + 1)
     135          codecs.register_error("test.cjktest", myreplace)
     136          self.assertRaises(IndexError, self.encode, self.unmappedunicode,
     137                            'test.cjktest')
     138  
     139      def test_callback_None_index(self):
     140          def myreplace(exc):
     141              return ('x', None)
     142          codecs.register_error("test.cjktest", myreplace)
     143          self.assertRaises(TypeError, self.encode, self.unmappedunicode,
     144                            'test.cjktest')
     145  
     146      def test_callback_backward_index(self):
     147          def myreplace(exc):
     148              if myreplace.limit > 0:
     149                  myreplace.limit -= 1
     150                  return ('REPLACED', 0)
     151              else:
     152                  return ('TERMINAL', exc.end)
     153          myreplace.limit = 3
     154          codecs.register_error("test.cjktest", myreplace)
     155          self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
     156                                       'test.cjktest'),
     157                  (b'abcdREPLACEDabcdREPLACEDabcdREPLACEDabcdTERMINALefgh', 9))
     158  
     159      def test_callback_forward_index(self):
     160          def myreplace(exc):
     161              return ('REPLACED', exc.end + 2)
     162          codecs.register_error("test.cjktest", myreplace)
     163          self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
     164                                       'test.cjktest'), (b'abcdREPLACEDgh', 9))
     165  
     166      def test_callback_index_outofbound(self):
     167          def myreplace(exc):
     168              return ('TERM', 100)
     169          codecs.register_error("test.cjktest", myreplace)
     170          self.assertRaises(IndexError, self.encode, self.unmappedunicode,
     171                            'test.cjktest')
     172  
     173      def test_incrementalencoder(self):
     174          UTF8Reader = codecs.getreader('utf-8')
     175          for sizehint in [None] + list(range(1, 33)) + \
     176                          [64, 128, 256, 512, 1024]:
     177              istream = UTF8Reader(BytesIO(self.tstring[1]))
     178              ostream = BytesIO()
     179              encoder = self.incrementalencoder()
     180              while 1:
     181                  if sizehint is not None:
     182                      data = istream.read(sizehint)
     183                  else:
     184                      data = istream.read()
     185  
     186                  if not data:
     187                      break
     188                  e = encoder.encode(data)
     189                  ostream.write(e)
     190  
     191              self.assertEqual(ostream.getvalue(), self.tstring[0])
     192  
     193      def test_incrementaldecoder(self):
     194          UTF8Writer = codecs.getwriter('utf-8')
     195          for sizehint in [None, -1] + list(range(1, 33)) + \
     196                          [64, 128, 256, 512, 1024]:
     197              istream = BytesIO(self.tstring[0])
     198              ostream = UTF8Writer(BytesIO())
     199              decoder = self.incrementaldecoder()
     200              while 1:
     201                  data = istream.read(sizehint)
     202                  if not data:
     203                      break
     204                  else:
     205                      u = decoder.decode(data)
     206                      ostream.write(u)
     207  
     208              self.assertEqual(ostream.getvalue(), self.tstring[1])
     209  
     210      def test_incrementalencoder_error_callback(self):
     211          inv = self.unmappedunicode
     212  
     213          e = self.incrementalencoder()
     214          self.assertRaises(UnicodeEncodeError, e.encode, inv, True)
     215  
     216          e.errors = 'ignore'
     217          self.assertEqual(e.encode(inv, True), b'')
     218  
     219          e.reset()
     220          def tempreplace(exc):
     221              return ('called', exc.end)
     222          codecs.register_error('test.incremental_error_callback', tempreplace)
     223          e.errors = 'test.incremental_error_callback'
     224          self.assertEqual(e.encode(inv, True), b'called')
     225  
     226          # again
     227          e.errors = 'ignore'
     228          self.assertEqual(e.encode(inv, True), b'')
     229  
     230      def test_streamreader(self):
     231          UTF8Writer = codecs.getwriter('utf-8')
     232          for name in ["read", "readline", "readlines"]:
     233              for sizehint in [None, -1] + list(range(1, 33)) + \
     234                              [64, 128, 256, 512, 1024]:
     235                  istream = self.reader(BytesIO(self.tstring[0]))
     236                  ostream = UTF8Writer(BytesIO())
     237                  func = getattr(istream, name)
     238                  while 1:
     239                      data = func(sizehint)
     240                      if not data:
     241                          break
     242                      if name == "readlines":
     243                          ostream.writelines(data)
     244                      else:
     245                          ostream.write(data)
     246  
     247                  self.assertEqual(ostream.getvalue(), self.tstring[1])
     248  
     249      def test_streamwriter(self):
     250          readfuncs = ('read', 'readline', 'readlines')
     251          UTF8Reader = codecs.getreader('utf-8')
     252          for name in readfuncs:
     253              for sizehint in [None] + list(range(1, 33)) + \
     254                              [64, 128, 256, 512, 1024]:
     255                  istream = UTF8Reader(BytesIO(self.tstring[1]))
     256                  ostream = self.writer(BytesIO())
     257                  func = getattr(istream, name)
     258                  while 1:
     259                      if sizehint is not None:
     260                          data = func(sizehint)
     261                      else:
     262                          data = func()
     263  
     264                      if not data:
     265                          break
     266                      if name == "readlines":
     267                          ostream.writelines(data)
     268                      else:
     269                          ostream.write(data)
     270  
     271                  self.assertEqual(ostream.getvalue(), self.tstring[0])
     272  
     273      def test_streamwriter_reset_no_pending(self):
     274          # Issue #23247: Calling reset() on a fresh StreamWriter instance
     275          # (without pending data) must not crash
     276          stream = BytesIO()
     277          writer = self.writer(stream)
     278          writer.reset()
     279  
     280      def test_incrementalencoder_del_segfault(self):
     281          e = self.incrementalencoder()
     282          with self.assertRaises(AttributeError):
     283              del e.errors
     284  
     285  
     286  class ESC[4;38;5;81mTestBase_Mapping(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     287      pass_enctest = []
     288      pass_dectest = []
     289      supmaps = []
     290      codectests = []
     291  
     292      def setUp(self):
     293          try:
     294              self.open_mapping_file().close() # test it to report the error early
     295          except (OSError, HTTPException):
     296              self.skipTest("Could not retrieve "+self.mapfileurl)
     297  
     298      def open_mapping_file(self):
     299          return support.open_urlresource(self.mapfileurl, encoding="utf-8")
     300  
     301      def test_mapping_file(self):
     302          if self.mapfileurl.endswith('.xml'):
     303              self._test_mapping_file_ucm()
     304          else:
     305              self._test_mapping_file_plain()
     306  
     307      def _test_mapping_file_plain(self):
     308          def unichrs(s):
     309              return ''.join(chr(int(x, 16)) for x in s.split('+'))
     310  
     311          urt_wa = {}
     312  
     313          with self.open_mapping_file() as f:
     314              for line in f:
     315                  if not line:
     316                      break
     317                  data = line.split('#')[0].split()
     318                  if len(data) != 2:
     319                      continue
     320  
     321                  if data[0][:2] != '0x':
     322                      self.fail(f"Invalid line: {line!r}")
     323                  csetch = bytes.fromhex(data[0][2:])
     324                  if len(csetch) == 1 and 0x80 <= csetch[0]:
     325                      continue
     326  
     327                  unich = unichrs(data[1])
     328                  if ord(unich) == 0xfffd or unich in urt_wa:
     329                      continue
     330                  urt_wa[unich] = csetch
     331  
     332                  self._testpoint(csetch, unich)
     333  
     334      def _test_mapping_file_ucm(self):
     335          with self.open_mapping_file() as f:
     336              ucmdata = f.read()
     337          uc = re.findall('<a u="([A-F0-9]{4})" b="([0-9A-F ]+)"/>', ucmdata)
     338          for uni, coded in uc:
     339              unich = chr(int(uni, 16))
     340              codech = bytes.fromhex(coded)
     341              self._testpoint(codech, unich)
     342  
     343      def test_mapping_supplemental(self):
     344          for mapping in self.supmaps:
     345              self._testpoint(*mapping)
     346  
     347      def _testpoint(self, csetch, unich):
     348          if (csetch, unich) not in self.pass_enctest:
     349              self.assertEqual(unich.encode(self.encoding), csetch)
     350          if (csetch, unich) not in self.pass_dectest:
     351              self.assertEqual(str(csetch, self.encoding), unich)
     352  
     353      def test_errorhandle(self):
     354          for source, scheme, expected in self.codectests:
     355              if isinstance(source, bytes):
     356                  func = source.decode
     357              else:
     358                  func = source.encode
     359              if expected:
     360                  if isinstance(source, bytes):
     361                      result = func(self.encoding, scheme)
     362                      self.assertTrue(type(result) is str, type(result))
     363                      self.assertEqual(result, expected,
     364                                       '%a.decode(%r, %r)=%a != %a'
     365                                       % (source, self.encoding, scheme, result,
     366                                          expected))
     367                  else:
     368                      result = func(self.encoding, scheme)
     369                      self.assertTrue(type(result) is bytes, type(result))
     370                      self.assertEqual(result, expected,
     371                                       '%a.encode(%r, %r)=%a != %a'
     372                                       % (source, self.encoding, scheme, result,
     373                                          expected))
     374              else:
     375                  self.assertRaises(UnicodeError, func, self.encoding, scheme)
     376  
     377  def load_teststring(name):
     378      dir = os.path.join(os.path.dirname(__file__), 'cjkencodings')
     379      with open(os.path.join(dir, name + '.txt'), 'rb') as f:
     380          encoded = f.read()
     381      with open(os.path.join(dir, name + '-utf8.txt'), 'rb') as f:
     382          utf8 = f.read()
     383      return encoded, utf8