python (3.11.7)
       1  # util.py
       2  import inspect
       3  import warnings
       4  import types
       5  import collections
       6  import itertools
       7  from functools import lru_cache, wraps
       8  from typing import Callable, List, Union, Iterable, TypeVar, cast
       9  
      10  _bslash = chr(92)
      11  C = TypeVar("C", bound=Callable)
      12  
      13  
      14  class ESC[4;38;5;81m__config_flags:
      15      """Internal class for defining compatibility and debugging flags"""
      16  
      17      _all_names: List[str] = []
      18      _fixed_names: List[str] = []
      19      _type_desc = "configuration"
      20  
      21      @classmethod
      22      def _set(cls, dname, value):
      23          if dname in cls._fixed_names:
      24              warnings.warn(
      25                  f"{cls.__name__}.{dname} {cls._type_desc} is {str(getattr(cls, dname)).upper()}"
      26                  f" and cannot be overridden",
      27                  stacklevel=3,
      28              )
      29              return
      30          if dname in cls._all_names:
      31              setattr(cls, dname, value)
      32          else:
      33              raise ValueError(f"no such {cls._type_desc} {dname!r}")
      34  
      35      enable = classmethod(lambda cls, name: cls._set(name, True))
      36      disable = classmethod(lambda cls, name: cls._set(name, False))
      37  
      38  
      39  @lru_cache(maxsize=128)
      40  def col(loc: int, strg: str) -> int:
      41      """
      42      Returns current column within a string, counting newlines as line separators.
      43      The first column is number 1.
      44  
      45      Note: the default parsing behavior is to expand tabs in the input string
      46      before starting the parsing process.  See
      47      :class:`ParserElement.parse_string` for more
      48      information on parsing strings containing ``<TAB>`` s, and suggested
      49      methods to maintain a consistent view of the parsed string, the parse
      50      location, and line and column positions within the parsed string.
      51      """
      52      s = strg
      53      return 1 if 0 < loc < len(s) and s[loc - 1] == "\n" else loc - s.rfind("\n", 0, loc)
      54  
      55  
      56  @lru_cache(maxsize=128)
      57  def lineno(loc: int, strg: str) -> int:
      58      """Returns current line number within a string, counting newlines as line separators.
      59      The first line is number 1.
      60  
      61      Note - the default parsing behavior is to expand tabs in the input string
      62      before starting the parsing process.  See :class:`ParserElement.parse_string`
      63      for more information on parsing strings containing ``<TAB>`` s, and
      64      suggested methods to maintain a consistent view of the parsed string, the
      65      parse location, and line and column positions within the parsed string.
      66      """
      67      return strg.count("\n", 0, loc) + 1
      68  
      69  
      70  @lru_cache(maxsize=128)
      71  def line(loc: int, strg: str) -> str:
      72      """
      73      Returns the line of text containing loc within a string, counting newlines as line separators.
      74      """
      75      last_cr = strg.rfind("\n", 0, loc)
      76      next_cr = strg.find("\n", loc)
      77      return strg[last_cr + 1 : next_cr] if next_cr >= 0 else strg[last_cr + 1 :]
      78  
      79  
      80  class ESC[4;38;5;81m_UnboundedCache:
      81      def __init__(self):
      82          cache = {}
      83          cache_get = cache.get
      84          self.not_in_cache = not_in_cache = object()
      85  
      86          def get(_, key):
      87              return cache_get(key, not_in_cache)
      88  
      89          def set_(_, key, value):
      90              cache[key] = value
      91  
      92          def clear(_):
      93              cache.clear()
      94  
      95          self.size = None
      96          self.get = types.MethodType(get, self)
      97          self.set = types.MethodType(set_, self)
      98          self.clear = types.MethodType(clear, self)
      99  
     100  
     101  class ESC[4;38;5;81m_FifoCache:
     102      def __init__(self, size):
     103          self.not_in_cache = not_in_cache = object()
     104          cache = {}
     105          keyring = [object()] * size
     106          cache_get = cache.get
     107          cache_pop = cache.pop
     108          keyiter = itertools.cycle(range(size))
     109  
     110          def get(_, key):
     111              return cache_get(key, not_in_cache)
     112  
     113          def set_(_, key, value):
     114              cache[key] = value
     115              i = next(keyiter)
     116              cache_pop(keyring[i], None)
     117              keyring[i] = key
     118  
     119          def clear(_):
     120              cache.clear()
     121              keyring[:] = [object()] * size
     122  
     123          self.size = size
     124          self.get = types.MethodType(get, self)
     125          self.set = types.MethodType(set_, self)
     126          self.clear = types.MethodType(clear, self)
     127  
     128  
     129  class ESC[4;38;5;81mLRUMemo:
     130      """
     131      A memoizing mapping that retains `capacity` deleted items
     132  
     133      The memo tracks retained items by their access order; once `capacity` items
     134      are retained, the least recently used item is discarded.
     135      """
     136  
     137      def __init__(self, capacity):
     138          self._capacity = capacity
     139          self._active = {}
     140          self._memory = collections.OrderedDict()
     141  
     142      def __getitem__(self, key):
     143          try:
     144              return self._active[key]
     145          except KeyError:
     146              self._memory.move_to_end(key)
     147              return self._memory[key]
     148  
     149      def __setitem__(self, key, value):
     150          self._memory.pop(key, None)
     151          self._active[key] = value
     152  
     153      def __delitem__(self, key):
     154          try:
     155              value = self._active.pop(key)
     156          except KeyError:
     157              pass
     158          else:
     159              while len(self._memory) >= self._capacity:
     160                  self._memory.popitem(last=False)
     161              self._memory[key] = value
     162  
     163      def clear(self):
     164          self._active.clear()
     165          self._memory.clear()
     166  
     167  
     168  class ESC[4;38;5;81mUnboundedMemo(ESC[4;38;5;149mdict):
     169      """
     170      A memoizing mapping that retains all deleted items
     171      """
     172  
     173      def __delitem__(self, key):
     174          pass
     175  
     176  
     177  def _escape_regex_range_chars(s: str) -> str:
     178      # escape these chars: ^-[]
     179      for c in r"\^-[]":
     180          s = s.replace(c, _bslash + c)
     181      s = s.replace("\n", r"\n")
     182      s = s.replace("\t", r"\t")
     183      return str(s)
     184  
     185  
     186  def _collapse_string_to_ranges(
     187      s: Union[str, Iterable[str]], re_escape: bool = True
     188  ) -> str:
     189      def is_consecutive(c):
     190          c_int = ord(c)
     191          is_consecutive.prev, prev = c_int, is_consecutive.prev
     192          if c_int - prev > 1:
     193              is_consecutive.value = next(is_consecutive.counter)
     194          return is_consecutive.value
     195  
     196      is_consecutive.prev = 0  # type: ignore [attr-defined]
     197      is_consecutive.counter = itertools.count()  # type: ignore [attr-defined]
     198      is_consecutive.value = -1  # type: ignore [attr-defined]
     199  
     200      def escape_re_range_char(c):
     201          return "\\" + c if c in r"\^-][" else c
     202  
     203      def no_escape_re_range_char(c):
     204          return c
     205  
     206      if not re_escape:
     207          escape_re_range_char = no_escape_re_range_char
     208  
     209      ret = []
     210      s = "".join(sorted(set(s)))
     211      if len(s) > 3:
     212          for _, chars in itertools.groupby(s, key=is_consecutive):
     213              first = last = next(chars)
     214              last = collections.deque(
     215                  itertools.chain(iter([last]), chars), maxlen=1
     216              ).pop()
     217              if first == last:
     218                  ret.append(escape_re_range_char(first))
     219              else:
     220                  sep = "" if ord(last) == ord(first) + 1 else "-"
     221                  ret.append(
     222                      f"{escape_re_range_char(first)}{sep}{escape_re_range_char(last)}"
     223                  )
     224      else:
     225          ret = [escape_re_range_char(c) for c in s]
     226  
     227      return "".join(ret)
     228  
     229  
     230  def _flatten(ll: list) -> list:
     231      ret = []
     232      for i in ll:
     233          if isinstance(i, list):
     234              ret.extend(_flatten(i))
     235          else:
     236              ret.append(i)
     237      return ret
     238  
     239  
     240  def _make_synonym_function(compat_name: str, fn: C) -> C:
     241      # In a future version, uncomment the code in the internal _inner() functions
     242      # to begin emitting DeprecationWarnings.
     243  
     244      # Unwrap staticmethod/classmethod
     245      fn = getattr(fn, "__func__", fn)
     246  
     247      # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take
     248      # some extra steps to add it if present in decorated function.)
     249      if "self" == list(inspect.signature(fn).parameters)[0]:
     250  
     251          @wraps(fn)
     252          def _inner(self, *args, **kwargs):
     253              # warnings.warn(
     254              #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=3
     255              # )
     256              return fn(self, *args, **kwargs)
     257  
     258      else:
     259  
     260          @wraps(fn)
     261          def _inner(*args, **kwargs):
     262              # warnings.warn(
     263              #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=3
     264              # )
     265              return fn(*args, **kwargs)
     266  
     267      _inner.__doc__ = f"""Deprecated - use :class:`{fn.__name__}`"""
     268      _inner.__name__ = compat_name
     269      _inner.__annotations__ = fn.__annotations__
     270      if isinstance(fn, types.FunctionType):
     271          _inner.__kwdefaults__ = fn.__kwdefaults__
     272      elif isinstance(fn, type) and hasattr(fn, "__init__"):
     273          _inner.__kwdefaults__ = fn.__init__.__kwdefaults__
     274      else:
     275          _inner.__kwdefaults__ = None
     276      _inner.__qualname__ = fn.__qualname__
     277      return cast(C, _inner)
     278  
     279  
     280  def replaced_by_pep8(fn: C) -> Callable[[Callable], C]:
     281      """
     282      Decorator for pre-PEP8 compatibility synonyms, to link them to the new function.
     283      """
     284      return lambda other: _make_synonym_function(other.__name__, fn)