python (3.12.0)
1 ######################## BEGIN LICENSE BLOCK ########################
2 # The Original Code is Mozilla Universal charset detector code.
3 #
4 # The Initial Developer of the Original Code is
5 # Netscape Communications Corporation.
6 # Portions created by the Initial Developer are Copyright (C) 2001
7 # the Initial Developer. All Rights Reserved.
8 #
9 # Contributor(s):
10 # Mark Pilgrim - port to Python
11 # Shy Shalom - original C code
12 # Proofpoint, Inc.
13 #
14 # This library is free software; you can redistribute it and/or
15 # modify it under the terms of the GNU Lesser General Public
16 # License as published by the Free Software Foundation; either
17 # version 2.1 of the License, or (at your option) any later version.
18 #
19 # This library is distributed in the hope that it will be useful,
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22 # Lesser General Public License for more details.
23 #
24 # You should have received a copy of the GNU Lesser General Public
25 # License along with this library; if not, write to the Free Software
26 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
27 # 02110-1301 USA
28 ######################### END LICENSE BLOCK #########################
29
30 from typing import Optional, Union
31
32 from .chardistribution import CharDistributionAnalysis
33 from .charsetprober import CharSetProber
34 from .codingstatemachine import CodingStateMachine
35 from .enums import LanguageFilter, MachineState, ProbingState
36
37
class MultiByteCharSetProber(CharSetProber):
    """
    MultiByteCharSetProber

    Prober that runs every input byte through a coding state machine and
    hands each completed character to a character distribution analyzer.

    ``coding_sm`` and ``distribution_analyzer`` are initialised to ``None``
    here; they must be assigned before ``feed`` is called (``feed`` asserts
    both, ``get_confidence`` asserts the analyzer).
    """

    def __init__(self, lang_filter: LanguageFilter = LanguageFilter.NONE) -> None:
        """Initialise the prober with no state machine or analyzer attached.

        :param lang_filter: forwarded unchanged to the parent constructor.
        """
        super().__init__(lang_filter=lang_filter)
        self.distribution_analyzer: Optional[CharDistributionAnalysis] = None
        self.coding_sm: Optional[CodingStateMachine] = None
        # Two-byte carry-over buffer. Slot 0 holds the final byte of the
        # previous feed() call; slot 1 receives the first byte of the current
        # call when a character completes on that very first byte.
        self._last_char = bytearray(b"\0\0")

    def reset(self) -> None:
        """Reset the parent state, any attached helpers and the carry-over buffer."""
        super().reset()
        if self.coding_sm:
            self.coding_sm.reset()
        if self.distribution_analyzer:
            self.distribution_analyzer.reset()
        self._last_char = bytearray(b"\0\0")

    def feed(self, byte_str: Union[bytes, bytearray]) -> ProbingState:
        """Feed a chunk of bytes to the prober and return its probing state.

        Each byte is passed to the coding state machine:

        * ``MachineState.ERROR`` marks the prober ``NOT_ME`` and stops.
        * ``MachineState.ITS_ME`` marks the prober ``FOUND_IT`` and stops.
        * ``MachineState.START`` means a character just completed; its last
          two bytes are passed to the distribution analyzer together with the
          character length reported by the state machine.

        An empty ``byte_str`` is accepted: nothing is fed, the carry-over
        buffer is left untouched and the current state is returned.

        :param byte_str: the next chunk of raw input.
        :returns: the prober state after consuming the chunk.
        """
        assert self.coding_sm is not None
        assert self.distribution_analyzer is not None

        for i, byte in enumerate(byte_str):
            coding_state = self.coding_sm.next_state(byte)
            if coding_state == MachineState.ERROR:
                self.logger.debug(
                    "%s %s prober hit error at byte %s",
                    self.charset_name,
                    self.language,
                    i,
                )
                self._state = ProbingState.NOT_ME
                break
            if coding_state == MachineState.ITS_ME:
                self._state = ProbingState.FOUND_IT
                break
            if coding_state == MachineState.START:
                char_len = self.coding_sm.get_current_charlen()
                if i == 0:
                    # The character ends on the first byte of this chunk, so
                    # pair it with the byte remembered from the previous call.
                    self._last_char[1] = byte
                    self.distribution_analyzer.feed(self._last_char, char_len)
                else:
                    self.distribution_analyzer.feed(byte_str[i - 1 : i + 1], char_len)

        # Remember the final byte for the next call. Guarded because indexing
        # an empty chunk with [-1] would raise IndexError.
        if byte_str:
            self._last_char[0] = byte_str[-1]

        if self.state == ProbingState.DETECTING:
            # Stop early once the analyzer reports enough data and the
            # confidence exceeds the shortcut threshold.
            if self.distribution_analyzer.got_enough_data() and (
                self.get_confidence() > self.SHORTCUT_THRESHOLD
            ):
                self._state = ProbingState.FOUND_IT

        return self.state

    def get_confidence(self) -> float:
        """Return the confidence reported by the distribution analyzer."""
        assert self.distribution_analyzer is not None
        return self.distribution_analyzer.get_confidence()