python (3.11.7)
       1  ######################## BEGIN LICENSE BLOCK ########################
       2  # The Original Code is mozilla.org code.
       3  #
       4  # The Initial Developer of the Original Code is
       5  # Netscape Communications Corporation.
       6  # Portions created by the Initial Developer are Copyright (C) 1998
       7  # the Initial Developer. All Rights Reserved.
       8  #
       9  # Contributor(s):
      10  #   Mark Pilgrim - port to Python
      11  #
      12  # This library is free software; you can redistribute it and/or
      13  # modify it under the terms of the GNU Lesser General Public
      14  # License as published by the Free Software Foundation; either
      15  # version 2.1 of the License, or (at your option) any later version.
      16  #
      17  # This library is distributed in the hope that it will be useful,
      18  # but WITHOUT ANY WARRANTY; without even the implied warranty of
      19  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      20  # Lesser General Public License for more details.
      21  #
      22  # You should have received a copy of the GNU Lesser General Public
      23  # License along with this library; if not, write to the Free Software
      24  # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
      25  # 02110-1301  USA
      26  ######################### END LICENSE BLOCK #########################
      27  
      28  from typing import Union
      29  
      30  from .chardistribution import EUCJPDistributionAnalysis
      31  from .codingstatemachine import CodingStateMachine
      32  from .enums import MachineState, ProbingState
      33  from .jpcntx import EUCJPContextAnalysis
      34  from .mbcharsetprober import MultiByteCharSetProber
      35  from .mbcssm import EUCJP_SM_MODEL
      36  
      37  
      38  class ESC[4;38;5;81mEUCJPProber(ESC[4;38;5;149mMultiByteCharSetProber):
      39      def __init__(self) -> None:
      40          super().__init__()
      41          self.coding_sm = CodingStateMachine(EUCJP_SM_MODEL)
      42          self.distribution_analyzer = EUCJPDistributionAnalysis()
      43          self.context_analyzer = EUCJPContextAnalysis()
      44          self.reset()
      45  
      46      def reset(self) -> None:
      47          super().reset()
      48          self.context_analyzer.reset()
      49  
      50      @property
      51      def charset_name(self) -> str:
      52          return "EUC-JP"
      53  
      54      @property
      55      def language(self) -> str:
      56          return "Japanese"
      57  
      58      def feed(self, byte_str: Union[bytes, bytearray]) -> ProbingState:
      59          assert self.coding_sm is not None
      60          assert self.distribution_analyzer is not None
      61  
      62          for i, byte in enumerate(byte_str):
      63              # PY3K: byte_str is a byte array, so byte is an int, not a byte
      64              coding_state = self.coding_sm.next_state(byte)
      65              if coding_state == MachineState.ERROR:
      66                  self.logger.debug(
      67                      "%s %s prober hit error at byte %s",
      68                      self.charset_name,
      69                      self.language,
      70                      i,
      71                  )
      72                  self._state = ProbingState.NOT_ME
      73                  break
      74              if coding_state == MachineState.ITS_ME:
      75                  self._state = ProbingState.FOUND_IT
      76                  break
      77              if coding_state == MachineState.START:
      78                  char_len = self.coding_sm.get_current_charlen()
      79                  if i == 0:
      80                      self._last_char[1] = byte
      81                      self.context_analyzer.feed(self._last_char, char_len)
      82                      self.distribution_analyzer.feed(self._last_char, char_len)
      83                  else:
      84                      self.context_analyzer.feed(byte_str[i - 1 : i + 1], char_len)
      85                      self.distribution_analyzer.feed(byte_str[i - 1 : i + 1], char_len)
      86  
      87          self._last_char[0] = byte_str[-1]
      88  
      89          if self.state == ProbingState.DETECTING:
      90              if self.context_analyzer.got_enough_data() and (
      91                  self.get_confidence() > self.SHORTCUT_THRESHOLD
      92              ):
      93                  self._state = ProbingState.FOUND_IT
      94  
      95          return self.state
      96  
      97      def get_confidence(self) -> float:
      98          assert self.distribution_analyzer is not None
      99  
     100          context_conf = self.context_analyzer.get_confidence()
     101          distrib_conf = self.distribution_analyzer.get_confidence()
     102          return max(context_conf, distrib_conf)