python (3.11.7)
       1  import sys
       2  from threading import Event, RLock, Thread
       3  from types import TracebackType
       4  from typing import IO, Any, Callable, List, Optional, TextIO, Type, cast
       5  
       6  from . import get_console
       7  from .console import Console, ConsoleRenderable, RenderableType, RenderHook
       8  from .control import Control
       9  from .file_proxy import FileProxy
      10  from .jupyter import JupyterMixin
      11  from .live_render import LiveRender, VerticalOverflowMethod
      12  from .screen import Screen
      13  from .text import Text
      14  
      15  
      16  class ESC[4;38;5;81m_RefreshThread(ESC[4;38;5;149mThread):
      17      """A thread that calls refresh() at regular intervals."""
      18  
      19      def __init__(self, live: "Live", refresh_per_second: float) -> None:
      20          self.live = live
      21          self.refresh_per_second = refresh_per_second
      22          self.done = Event()
      23          super().__init__(daemon=True)
      24  
      25      def stop(self) -> None:
      26          self.done.set()
      27  
      28      def run(self) -> None:
      29          while not self.done.wait(1 / self.refresh_per_second):
      30              with self.live._lock:
      31                  if not self.done.is_set():
      32                      self.live.refresh()
      33  
      34  
      35  class ESC[4;38;5;81mLive(ESC[4;38;5;149mJupyterMixin, ESC[4;38;5;149mRenderHook):
      36      """Renders an auto-updating live display of any given renderable.
      37  
      38      Args:
      39          renderable (RenderableType, optional): The renderable to live display. Defaults to displaying nothing.
      40          console (Console, optional): Optional Console instance. Default will an internal Console instance writing to stdout.
      41          screen (bool, optional): Enable alternate screen mode. Defaults to False.
      42          auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()` or `update()` with refresh flag. Defaults to True
      43          refresh_per_second (float, optional): Number of times per second to refresh the live display. Defaults to 4.
      44          transient (bool, optional): Clear the renderable on exit (has no effect when screen=True). Defaults to False.
      45          redirect_stdout (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True.
      46          redirect_stderr (bool, optional): Enable redirection of stderr. Defaults to True.
      47          vertical_overflow (VerticalOverflowMethod, optional): How to handle renderable when it is too tall for the console. Defaults to "ellipsis".
      48          get_renderable (Callable[[], RenderableType], optional): Optional callable to get renderable. Defaults to None.
      49      """
      50  
      51      def __init__(
      52          self,
      53          renderable: Optional[RenderableType] = None,
      54          *,
      55          console: Optional[Console] = None,
      56          screen: bool = False,
      57          auto_refresh: bool = True,
      58          refresh_per_second: float = 4,
      59          transient: bool = False,
      60          redirect_stdout: bool = True,
      61          redirect_stderr: bool = True,
      62          vertical_overflow: VerticalOverflowMethod = "ellipsis",
      63          get_renderable: Optional[Callable[[], RenderableType]] = None,
      64      ) -> None:
      65          assert refresh_per_second > 0, "refresh_per_second must be > 0"
      66          self._renderable = renderable
      67          self.console = console if console is not None else get_console()
      68          self._screen = screen
      69          self._alt_screen = False
      70  
      71          self._redirect_stdout = redirect_stdout
      72          self._redirect_stderr = redirect_stderr
      73          self._restore_stdout: Optional[IO[str]] = None
      74          self._restore_stderr: Optional[IO[str]] = None
      75  
      76          self._lock = RLock()
      77          self.ipy_widget: Optional[Any] = None
      78          self.auto_refresh = auto_refresh
      79          self._started: bool = False
      80          self.transient = True if screen else transient
      81  
      82          self._refresh_thread: Optional[_RefreshThread] = None
      83          self.refresh_per_second = refresh_per_second
      84  
      85          self.vertical_overflow = vertical_overflow
      86          self._get_renderable = get_renderable
      87          self._live_render = LiveRender(
      88              self.get_renderable(), vertical_overflow=vertical_overflow
      89          )
      90  
      91      @property
      92      def is_started(self) -> bool:
      93          """Check if live display has been started."""
      94          return self._started
      95  
      96      def get_renderable(self) -> RenderableType:
      97          renderable = (
      98              self._get_renderable()
      99              if self._get_renderable is not None
     100              else self._renderable
     101          )
     102          return renderable or ""
     103  
     104      def start(self, refresh: bool = False) -> None:
     105          """Start live rendering display.
     106  
     107          Args:
     108              refresh (bool, optional): Also refresh. Defaults to False.
     109          """
     110          with self._lock:
     111              if self._started:
     112                  return
     113              self.console.set_live(self)
     114              self._started = True
     115              if self._screen:
     116                  self._alt_screen = self.console.set_alt_screen(True)
     117              self.console.show_cursor(False)
     118              self._enable_redirect_io()
     119              self.console.push_render_hook(self)
     120              if refresh:
     121                  try:
     122                      self.refresh()
     123                  except Exception:
     124                      # If refresh fails, we want to stop the redirection of sys.stderr,
     125                      # so the error stacktrace is properly displayed in the terminal.
     126                      # (or, if the code that calls Rich captures the exception and wants to display something,
     127                      # let this be displayed in the terminal).
     128                      self.stop()
     129                      raise
     130              if self.auto_refresh:
     131                  self._refresh_thread = _RefreshThread(self, self.refresh_per_second)
     132                  self._refresh_thread.start()
     133  
     134      def stop(self) -> None:
     135          """Stop live rendering display."""
     136          with self._lock:
     137              if not self._started:
     138                  return
     139              self.console.clear_live()
     140              self._started = False
     141  
     142              if self.auto_refresh and self._refresh_thread is not None:
     143                  self._refresh_thread.stop()
     144                  self._refresh_thread = None
     145              # allow it to fully render on the last even if overflow
     146              self.vertical_overflow = "visible"
     147              with self.console:
     148                  try:
     149                      if not self._alt_screen and not self.console.is_jupyter:
     150                          self.refresh()
     151                  finally:
     152                      self._disable_redirect_io()
     153                      self.console.pop_render_hook()
     154                      if not self._alt_screen and self.console.is_terminal:
     155                          self.console.line()
     156                      self.console.show_cursor(True)
     157                      if self._alt_screen:
     158                          self.console.set_alt_screen(False)
     159  
     160                      if self.transient and not self._alt_screen:
     161                          self.console.control(self._live_render.restore_cursor())
     162                      if self.ipy_widget is not None and self.transient:
     163                          self.ipy_widget.close()  # pragma: no cover
     164  
     165      def __enter__(self) -> "Live":
     166          self.start(refresh=self._renderable is not None)
     167          return self
     168  
     169      def __exit__(
     170          self,
     171          exc_type: Optional[Type[BaseException]],
     172          exc_val: Optional[BaseException],
     173          exc_tb: Optional[TracebackType],
     174      ) -> None:
     175          self.stop()
     176  
     177      def _enable_redirect_io(self) -> None:
     178          """Enable redirecting of stdout / stderr."""
     179          if self.console.is_terminal or self.console.is_jupyter:
     180              if self._redirect_stdout and not isinstance(sys.stdout, FileProxy):
     181                  self._restore_stdout = sys.stdout
     182                  sys.stdout = cast("TextIO", FileProxy(self.console, sys.stdout))
     183              if self._redirect_stderr and not isinstance(sys.stderr, FileProxy):
     184                  self._restore_stderr = sys.stderr
     185                  sys.stderr = cast("TextIO", FileProxy(self.console, sys.stderr))
     186  
     187      def _disable_redirect_io(self) -> None:
     188          """Disable redirecting of stdout / stderr."""
     189          if self._restore_stdout:
     190              sys.stdout = cast("TextIO", self._restore_stdout)
     191              self._restore_stdout = None
     192          if self._restore_stderr:
     193              sys.stderr = cast("TextIO", self._restore_stderr)
     194              self._restore_stderr = None
     195  
     196      @property
     197      def renderable(self) -> RenderableType:
     198          """Get the renderable that is being displayed
     199  
     200          Returns:
     201              RenderableType: Displayed renderable.
     202          """
     203          renderable = self.get_renderable()
     204          return Screen(renderable) if self._alt_screen else renderable
     205  
     206      def update(self, renderable: RenderableType, *, refresh: bool = False) -> None:
     207          """Update the renderable that is being displayed
     208  
     209          Args:
     210              renderable (RenderableType): New renderable to use.
     211              refresh (bool, optional): Refresh the display. Defaults to False.
     212          """
     213          if isinstance(renderable, str):
     214              renderable = self.console.render_str(renderable)
     215          with self._lock:
     216              self._renderable = renderable
     217              if refresh:
     218                  self.refresh()
     219  
     220      def refresh(self) -> None:
     221          """Update the display of the Live Render."""
     222          with self._lock:
     223              self._live_render.set_renderable(self.renderable)
     224              if self.console.is_jupyter:  # pragma: no cover
     225                  try:
     226                      from IPython.display import display
     227                      from ipywidgets import Output
     228                  except ImportError:
     229                      import warnings
     230  
     231                      warnings.warn('install "ipywidgets" for Jupyter support')
     232                  else:
     233                      if self.ipy_widget is None:
     234                          self.ipy_widget = Output()
     235                          display(self.ipy_widget)
     236  
     237                      with self.ipy_widget:
     238                          self.ipy_widget.clear_output(wait=True)
     239                          self.console.print(self._live_render.renderable)
     240              elif self.console.is_terminal and not self.console.is_dumb_terminal:
     241                  with self.console:
     242                      self.console.print(Control())
     243              elif (
     244                  not self._started and not self.transient
     245              ):  # if it is finished allow files or dumb-terminals to see final result
     246                  with self.console:
     247                      self.console.print(Control())
     248  
     249      def process_renderables(
     250          self, renderables: List[ConsoleRenderable]
     251      ) -> List[ConsoleRenderable]:
     252          """Process renderables to restore cursor and display progress."""
     253          self._live_render.vertical_overflow = self.vertical_overflow
     254          if self.console.is_interactive:
     255              # lock needs acquiring as user can modify live_render renderable at any time unlike in Progress.
     256              with self._lock:
     257                  reset = (
     258                      Control.home()
     259                      if self._alt_screen
     260                      else self._live_render.position_cursor()
     261                  )
     262                  renderables = [reset, *renderables, self._live_render]
     263          elif (
     264              not self._started and not self.transient
     265          ):  # if it is finished render the final output for files or dumb_terminals
     266              renderables = [*renderables, self._live_render]
     267  
     268          return renderables
     269  
     270  
     271  if __name__ == "__main__":  # pragma: no cover
     272      import random
     273      import time
     274      from itertools import cycle
     275      from typing import Dict, List, Tuple
     276  
     277      from .align import Align
     278      from .console import Console
     279      from .live import Live as Live
     280      from .panel import Panel
     281      from .rule import Rule
     282      from .syntax import Syntax
     283      from .table import Table
     284  
     285      console = Console()
     286  
     287      syntax = Syntax(
     288          '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]:
     289      """Iterate and generate a tuple with a flag for last value."""
     290      iter_values = iter(values)
     291      try:
     292          previous_value = next(iter_values)
     293      except StopIteration:
     294          return
     295      for value in iter_values:
     296          yield False, previous_value
     297          previous_value = value
     298      yield True, previous_value''',
     299          "python",
     300          line_numbers=True,
     301      )
     302  
     303      table = Table("foo", "bar", "baz")
     304      table.add_row("1", "2", "3")
     305  
     306      progress_renderables = [
     307          "You can make the terminal shorter and taller to see the live table hide"
     308          "Text may be printed while the progress bars are rendering.",
     309          Panel("In fact, [i]any[/i] renderable will work"),
     310          "Such as [magenta]tables[/]...",
     311          table,
     312          "Pretty printed structures...",
     313          {"type": "example", "text": "Pretty printed"},
     314          "Syntax...",
     315          syntax,
     316          Rule("Give it a try!"),
     317      ]
     318  
     319      examples = cycle(progress_renderables)
     320  
     321      exchanges = [
     322          "SGD",
     323          "MYR",
     324          "EUR",
     325          "USD",
     326          "AUD",
     327          "JPY",
     328          "CNH",
     329          "HKD",
     330          "CAD",
     331          "INR",
     332          "DKK",
     333          "GBP",
     334          "RUB",
     335          "NZD",
     336          "MXN",
     337          "IDR",
     338          "TWD",
     339          "THB",
     340          "VND",
     341      ]
     342      with Live(console=console) as live_table:
     343          exchange_rate_dict: Dict[Tuple[str, str], float] = {}
     344  
     345          for index in range(100):
     346              select_exchange = exchanges[index % len(exchanges)]
     347  
     348              for exchange in exchanges:
     349                  if exchange == select_exchange:
     350                      continue
     351                  time.sleep(0.4)
     352                  if random.randint(0, 10) < 1:
     353                      console.log(next(examples))
     354                  exchange_rate_dict[(select_exchange, exchange)] = 200 / (
     355                      (random.random() * 320) + 1
     356                  )
     357                  if len(exchange_rate_dict) > len(exchanges) - 1:
     358                      exchange_rate_dict.pop(list(exchange_rate_dict.keys())[0])
     359                  table = Table(title="Exchange Rates")
     360  
     361                  table.add_column("Source Currency")
     362                  table.add_column("Destination Currency")
     363                  table.add_column("Exchange Rate")
     364  
     365                  for ((source, dest), exchange_rate) in exchange_rate_dict.items():
     366                      table.add_row(
     367                          source,
     368                          dest,
     369                          Text(
     370                              f"{exchange_rate:.4f}",
     371                              style="red" if exchange_rate < 1.0 else "green",
     372                          ),
     373                      )
     374  
     375                  live_table.update(Align.center(table))