python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
pip/
_vendor/
rich/
progress.py
       1  import io
       2  import sys
       3  import typing
       4  import warnings
       5  from abc import ABC, abstractmethod
       6  from collections import deque
       7  from dataclasses import dataclass, field
       8  from datetime import timedelta
       9  from io import RawIOBase, UnsupportedOperation
      10  from math import ceil
      11  from mmap import mmap
      12  from operator import length_hint
      13  from os import PathLike, stat
      14  from threading import Event, RLock, Thread
      15  from types import TracebackType
      16  from typing import (
      17      Any,
      18      BinaryIO,
      19      Callable,
      20      ContextManager,
      21      Deque,
      22      Dict,
      23      Generic,
      24      Iterable,
      25      List,
      26      NamedTuple,
      27      NewType,
      28      Optional,
      29      Sequence,
      30      TextIO,
      31      Tuple,
      32      Type,
      33      TypeVar,
      34      Union,
      35  )
      36  
      37  if sys.version_info >= (3, 8):
      38      from typing import Literal
      39  else:
      40      from pip._vendor.typing_extensions import Literal  # pragma: no cover
      41  
      42  from . import filesize, get_console
      43  from .console import Console, Group, JustifyMethod, RenderableType
      44  from .highlighter import Highlighter
      45  from .jupyter import JupyterMixin
      46  from .live import Live
      47  from .progress_bar import ProgressBar
      48  from .spinner import Spinner
      49  from .style import StyleType
      50  from .table import Column, Table
      51  from .text import Text, TextType
      52  
      53  TaskID = NewType("TaskID", int)
      54  
      55  ProgressType = TypeVar("ProgressType")
      56  
      57  GetTimeCallable = Callable[[], float]
      58  
      59  
      60  _I = typing.TypeVar("_I", TextIO, BinaryIO)
      61  
      62  
      63  class ESC[4;38;5;81m_TrackThread(ESC[4;38;5;149mThread):
      64      """A thread to periodically update progress."""
      65  
      66      def __init__(self, progress: "Progress", task_id: "TaskID", update_period: float):
      67          self.progress = progress
      68          self.task_id = task_id
      69          self.update_period = update_period
      70          self.done = Event()
      71  
      72          self.completed = 0
      73          super().__init__()
      74  
      75      def run(self) -> None:
      76          task_id = self.task_id
      77          advance = self.progress.advance
      78          update_period = self.update_period
      79          last_completed = 0
      80          wait = self.done.wait
      81          while not wait(update_period):
      82              completed = self.completed
      83              if last_completed != completed:
      84                  advance(task_id, completed - last_completed)
      85                  last_completed = completed
      86  
      87          self.progress.update(self.task_id, completed=self.completed, refresh=True)
      88  
      89      def __enter__(self) -> "_TrackThread":
      90          self.start()
      91          return self
      92  
      93      def __exit__(
      94          self,
      95          exc_type: Optional[Type[BaseException]],
      96          exc_val: Optional[BaseException],
      97          exc_tb: Optional[TracebackType],
      98      ) -> None:
      99          self.done.set()
     100          self.join()
     101  
     102  
     103  def track(
     104      sequence: Union[Sequence[ProgressType], Iterable[ProgressType]],
     105      description: str = "Working...",
     106      total: Optional[float] = None,
     107      auto_refresh: bool = True,
     108      console: Optional[Console] = None,
     109      transient: bool = False,
     110      get_time: Optional[Callable[[], float]] = None,
     111      refresh_per_second: float = 10,
     112      style: StyleType = "bar.back",
     113      complete_style: StyleType = "bar.complete",
     114      finished_style: StyleType = "bar.finished",
     115      pulse_style: StyleType = "bar.pulse",
     116      update_period: float = 0.1,
     117      disable: bool = False,
     118      show_speed: bool = True,
     119  ) -> Iterable[ProgressType]:
     120      """Track progress by iterating over a sequence.
     121  
     122      Args:
     123          sequence (Iterable[ProgressType]): A sequence (must support "len") you wish to iterate over.
     124          description (str, optional): Description of task show next to progress bar. Defaults to "Working".
     125          total: (float, optional): Total number of steps. Default is len(sequence).
     126          auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
     127          transient: (bool, optional): Clear the progress on exit. Defaults to False.
     128          console (Console, optional): Console to write to. Default creates internal Console instance.
     129          refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
     130          style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
     131          complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
     132          finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
     133          pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
     134          update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1.
     135          disable (bool, optional): Disable display of progress.
     136          show_speed (bool, optional): Show speed if total isn't known. Defaults to True.
     137      Returns:
     138          Iterable[ProgressType]: An iterable of the values in the sequence.
     139  
     140      """
     141  
     142      columns: List["ProgressColumn"] = (
     143          [TextColumn("[progress.description]{task.description}")] if description else []
     144      )
     145      columns.extend(
     146          (
     147              BarColumn(
     148                  style=style,
     149                  complete_style=complete_style,
     150                  finished_style=finished_style,
     151                  pulse_style=pulse_style,
     152              ),
     153              TaskProgressColumn(show_speed=show_speed),
     154              TimeRemainingColumn(elapsed_when_finished=True),
     155          )
     156      )
     157      progress = Progress(
     158          *columns,
     159          auto_refresh=auto_refresh,
     160          console=console,
     161          transient=transient,
     162          get_time=get_time,
     163          refresh_per_second=refresh_per_second or 10,
     164          disable=disable,
     165      )
     166  
     167      with progress:
     168          yield from progress.track(
     169              sequence, total=total, description=description, update_period=update_period
     170          )
     171  
     172  
     173  class ESC[4;38;5;81m_Reader(ESC[4;38;5;149mRawIOBase, ESC[4;38;5;149mBinaryIO):
     174      """A reader that tracks progress while it's being read from."""
     175  
     176      def __init__(
     177          self,
     178          handle: BinaryIO,
     179          progress: "Progress",
     180          task: TaskID,
     181          close_handle: bool = True,
     182      ) -> None:
     183          self.handle = handle
     184          self.progress = progress
     185          self.task = task
     186          self.close_handle = close_handle
     187          self._closed = False
     188  
     189      def __enter__(self) -> "_Reader":
     190          self.handle.__enter__()
     191          return self
     192  
     193      def __exit__(
     194          self,
     195          exc_type: Optional[Type[BaseException]],
     196          exc_val: Optional[BaseException],
     197          exc_tb: Optional[TracebackType],
     198      ) -> None:
     199          self.close()
     200  
     201      def __iter__(self) -> BinaryIO:
     202          return self
     203  
     204      def __next__(self) -> bytes:
     205          line = next(self.handle)
     206          self.progress.advance(self.task, advance=len(line))
     207          return line
     208  
     209      @property
     210      def closed(self) -> bool:
     211          return self._closed
     212  
     213      def fileno(self) -> int:
     214          return self.handle.fileno()
     215  
     216      def isatty(self) -> bool:
     217          return self.handle.isatty()
     218  
     219      @property
     220      def mode(self) -> str:
     221          return self.handle.mode
     222  
     223      @property
     224      def name(self) -> str:
     225          return self.handle.name
     226  
     227      def readable(self) -> bool:
     228          return self.handle.readable()
     229  
     230      def seekable(self) -> bool:
     231          return self.handle.seekable()
     232  
     233      def writable(self) -> bool:
     234          return False
     235  
     236      def read(self, size: int = -1) -> bytes:
     237          block = self.handle.read(size)
     238          self.progress.advance(self.task, advance=len(block))
     239          return block
     240  
     241      def readinto(self, b: Union[bytearray, memoryview, mmap]):  # type: ignore[no-untyped-def, override]
     242          n = self.handle.readinto(b)  # type: ignore[attr-defined]
     243          self.progress.advance(self.task, advance=n)
     244          return n
     245  
     246      def readline(self, size: int = -1) -> bytes:  # type: ignore[override]
     247          line = self.handle.readline(size)
     248          self.progress.advance(self.task, advance=len(line))
     249          return line
     250  
     251      def readlines(self, hint: int = -1) -> List[bytes]:
     252          lines = self.handle.readlines(hint)
     253          self.progress.advance(self.task, advance=sum(map(len, lines)))
     254          return lines
     255  
     256      def close(self) -> None:
     257          if self.close_handle:
     258              self.handle.close()
     259          self._closed = True
     260  
     261      def seek(self, offset: int, whence: int = 0) -> int:
     262          pos = self.handle.seek(offset, whence)
     263          self.progress.update(self.task, completed=pos)
     264          return pos
     265  
     266      def tell(self) -> int:
     267          return self.handle.tell()
     268  
     269      def write(self, s: Any) -> int:
     270          raise UnsupportedOperation("write")
     271  
     272  
     273  class ESC[4;38;5;81m_ReadContext(ESC[4;38;5;149mContextManager[_I], ESC[4;38;5;149mGeneric[_I]):
     274      """A utility class to handle a context for both a reader and a progress."""
     275  
     276      def __init__(self, progress: "Progress", reader: _I) -> None:
     277          self.progress = progress
     278          self.reader: _I = reader
     279  
     280      def __enter__(self) -> _I:
     281          self.progress.start()
     282          return self.reader.__enter__()
     283  
     284      def __exit__(
     285          self,
     286          exc_type: Optional[Type[BaseException]],
     287          exc_val: Optional[BaseException],
     288          exc_tb: Optional[TracebackType],
     289      ) -> None:
     290          self.progress.stop()
     291          self.reader.__exit__(exc_type, exc_val, exc_tb)
     292  
     293  
     294  def wrap_file(
     295      file: BinaryIO,
     296      total: int,
     297      *,
     298      description: str = "Reading...",
     299      auto_refresh: bool = True,
     300      console: Optional[Console] = None,
     301      transient: bool = False,
     302      get_time: Optional[Callable[[], float]] = None,
     303      refresh_per_second: float = 10,
     304      style: StyleType = "bar.back",
     305      complete_style: StyleType = "bar.complete",
     306      finished_style: StyleType = "bar.finished",
     307      pulse_style: StyleType = "bar.pulse",
     308      disable: bool = False,
     309  ) -> ContextManager[BinaryIO]:
     310      """Read bytes from a file while tracking progress.
     311  
     312      Args:
     313          file (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode.
     314          total (int): Total number of bytes to read.
     315          description (str, optional): Description of task show next to progress bar. Defaults to "Reading".
     316          auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
     317          transient: (bool, optional): Clear the progress on exit. Defaults to False.
     318          console (Console, optional): Console to write to. Default creates internal Console instance.
     319          refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
     320          style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
     321          complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
     322          finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
     323          pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
     324          disable (bool, optional): Disable display of progress.
     325      Returns:
     326          ContextManager[BinaryIO]: A context manager yielding a progress reader.
     327  
     328      """
     329  
     330      columns: List["ProgressColumn"] = (
     331          [TextColumn("[progress.description]{task.description}")] if description else []
     332      )
     333      columns.extend(
     334          (
     335              BarColumn(
     336                  style=style,
     337                  complete_style=complete_style,
     338                  finished_style=finished_style,
     339                  pulse_style=pulse_style,
     340              ),
     341              DownloadColumn(),
     342              TimeRemainingColumn(),
     343          )
     344      )
     345      progress = Progress(
     346          *columns,
     347          auto_refresh=auto_refresh,
     348          console=console,
     349          transient=transient,
     350          get_time=get_time,
     351          refresh_per_second=refresh_per_second or 10,
     352          disable=disable,
     353      )
     354  
     355      reader = progress.wrap_file(file, total=total, description=description)
     356      return _ReadContext(progress, reader)
     357  
     358  
     359  @typing.overload
     360  def open(
     361      file: Union[str, "PathLike[str]", bytes],
     362      mode: Union[Literal["rt"], Literal["r"]],
     363      buffering: int = -1,
     364      encoding: Optional[str] = None,
     365      errors: Optional[str] = None,
     366      newline: Optional[str] = None,
     367      *,
     368      total: Optional[int] = None,
     369      description: str = "Reading...",
     370      auto_refresh: bool = True,
     371      console: Optional[Console] = None,
     372      transient: bool = False,
     373      get_time: Optional[Callable[[], float]] = None,
     374      refresh_per_second: float = 10,
     375      style: StyleType = "bar.back",
     376      complete_style: StyleType = "bar.complete",
     377      finished_style: StyleType = "bar.finished",
     378      pulse_style: StyleType = "bar.pulse",
     379      disable: bool = False,
     380  ) -> ContextManager[TextIO]:
     381      pass
     382  
     383  
     384  @typing.overload
     385  def open(
     386      file: Union[str, "PathLike[str]", bytes],
     387      mode: Literal["rb"],
     388      buffering: int = -1,
     389      encoding: Optional[str] = None,
     390      errors: Optional[str] = None,
     391      newline: Optional[str] = None,
     392      *,
     393      total: Optional[int] = None,
     394      description: str = "Reading...",
     395      auto_refresh: bool = True,
     396      console: Optional[Console] = None,
     397      transient: bool = False,
     398      get_time: Optional[Callable[[], float]] = None,
     399      refresh_per_second: float = 10,
     400      style: StyleType = "bar.back",
     401      complete_style: StyleType = "bar.complete",
     402      finished_style: StyleType = "bar.finished",
     403      pulse_style: StyleType = "bar.pulse",
     404      disable: bool = False,
     405  ) -> ContextManager[BinaryIO]:
     406      pass
     407  
     408  
     409  def open(
     410      file: Union[str, "PathLike[str]", bytes],
     411      mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r",
     412      buffering: int = -1,
     413      encoding: Optional[str] = None,
     414      errors: Optional[str] = None,
     415      newline: Optional[str] = None,
     416      *,
     417      total: Optional[int] = None,
     418      description: str = "Reading...",
     419      auto_refresh: bool = True,
     420      console: Optional[Console] = None,
     421      transient: bool = False,
     422      get_time: Optional[Callable[[], float]] = None,
     423      refresh_per_second: float = 10,
     424      style: StyleType = "bar.back",
     425      complete_style: StyleType = "bar.complete",
     426      finished_style: StyleType = "bar.finished",
     427      pulse_style: StyleType = "bar.pulse",
     428      disable: bool = False,
     429  ) -> Union[ContextManager[BinaryIO], ContextManager[TextIO]]:
     430      """Read bytes from a file while tracking progress.
     431  
     432      Args:
     433          path (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode.
     434          mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt".
     435          buffering (int): The buffering strategy to use, see :func:`io.open`.
     436          encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`.
     437          errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`.
     438          newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`
     439          total: (int, optional): Total number of bytes to read. Must be provided if reading from a file handle. Default for a path is os.stat(file).st_size.
     440          description (str, optional): Description of task show next to progress bar. Defaults to "Reading".
     441          auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
     442          transient: (bool, optional): Clear the progress on exit. Defaults to False.
     443          console (Console, optional): Console to write to. Default creates internal Console instance.
     444          refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
     445          style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
     446          complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
     447          finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
     448          pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
     449          disable (bool, optional): Disable display of progress.
     450          encoding (str, optional): The encoding to use when reading in text mode.
     451  
     452      Returns:
     453          ContextManager[BinaryIO]: A context manager yielding a progress reader.
     454  
     455      """
     456  
     457      columns: List["ProgressColumn"] = (
     458          [TextColumn("[progress.description]{task.description}")] if description else []
     459      )
     460      columns.extend(
     461          (
     462              BarColumn(
     463                  style=style,
     464                  complete_style=complete_style,
     465                  finished_style=finished_style,
     466                  pulse_style=pulse_style,
     467              ),
     468              DownloadColumn(),
     469              TimeRemainingColumn(),
     470          )
     471      )
     472      progress = Progress(
     473          *columns,
     474          auto_refresh=auto_refresh,
     475          console=console,
     476          transient=transient,
     477          get_time=get_time,
     478          refresh_per_second=refresh_per_second or 10,
     479          disable=disable,
     480      )
     481  
     482      reader = progress.open(
     483          file,
     484          mode=mode,
     485          buffering=buffering,
     486          encoding=encoding,
     487          errors=errors,
     488          newline=newline,
     489          total=total,
     490          description=description,
     491      )
     492      return _ReadContext(progress, reader)  # type: ignore[return-value, type-var]
     493  
     494  
     495  class ESC[4;38;5;81mProgressColumn(ESC[4;38;5;149mABC):
     496      """Base class for a widget to use in progress display."""
     497  
     498      max_refresh: Optional[float] = None
     499  
     500      def __init__(self, table_column: Optional[Column] = None) -> None:
     501          self._table_column = table_column
     502          self._renderable_cache: Dict[TaskID, Tuple[float, RenderableType]] = {}
     503          self._update_time: Optional[float] = None
     504  
     505      def get_table_column(self) -> Column:
     506          """Get a table column, used to build tasks table."""
     507          return self._table_column or Column()
     508  
     509      def __call__(self, task: "Task") -> RenderableType:
     510          """Called by the Progress object to return a renderable for the given task.
     511  
     512          Args:
     513              task (Task): An object containing information regarding the task.
     514  
     515          Returns:
     516              RenderableType: Anything renderable (including str).
     517          """
     518          current_time = task.get_time()
     519          if self.max_refresh is not None and not task.completed:
     520              try:
     521                  timestamp, renderable = self._renderable_cache[task.id]
     522              except KeyError:
     523                  pass
     524              else:
     525                  if timestamp + self.max_refresh > current_time:
     526                      return renderable
     527  
     528          renderable = self.render(task)
     529          self._renderable_cache[task.id] = (current_time, renderable)
     530          return renderable
     531  
     532      @abstractmethod
     533      def render(self, task: "Task") -> RenderableType:
     534          """Should return a renderable object."""
     535  
     536  
     537  class ESC[4;38;5;81mRenderableColumn(ESC[4;38;5;149mProgressColumn):
     538      """A column to insert an arbitrary column.
     539  
     540      Args:
     541          renderable (RenderableType, optional): Any renderable. Defaults to empty string.
     542      """
     543  
     544      def __init__(
     545          self, renderable: RenderableType = "", *, table_column: Optional[Column] = None
     546      ):
     547          self.renderable = renderable
     548          super().__init__(table_column=table_column)
     549  
     550      def render(self, task: "Task") -> RenderableType:
     551          return self.renderable
     552  
     553  
     554  class ESC[4;38;5;81mSpinnerColumn(ESC[4;38;5;149mProgressColumn):
     555      """A column with a 'spinner' animation.
     556  
     557      Args:
     558          spinner_name (str, optional): Name of spinner animation. Defaults to "dots".
     559          style (StyleType, optional): Style of spinner. Defaults to "progress.spinner".
     560          speed (float, optional): Speed factor of spinner. Defaults to 1.0.
     561          finished_text (TextType, optional): Text used when task is finished. Defaults to " ".
     562      """
     563  
     564      def __init__(
     565          self,
     566          spinner_name: str = "dots",
     567          style: Optional[StyleType] = "progress.spinner",
     568          speed: float = 1.0,
     569          finished_text: TextType = " ",
     570          table_column: Optional[Column] = None,
     571      ):
     572          self.spinner = Spinner(spinner_name, style=style, speed=speed)
     573          self.finished_text = (
     574              Text.from_markup(finished_text)
     575              if isinstance(finished_text, str)
     576              else finished_text
     577          )
     578          super().__init__(table_column=table_column)
     579  
     580      def set_spinner(
     581          self,
     582          spinner_name: str,
     583          spinner_style: Optional[StyleType] = "progress.spinner",
     584          speed: float = 1.0,
     585      ) -> None:
     586          """Set a new spinner.
     587  
     588          Args:
     589              spinner_name (str): Spinner name, see python -m rich.spinner.
     590              spinner_style (Optional[StyleType], optional): Spinner style. Defaults to "progress.spinner".
     591              speed (float, optional): Speed factor of spinner. Defaults to 1.0.
     592          """
     593          self.spinner = Spinner(spinner_name, style=spinner_style, speed=speed)
     594  
     595      def render(self, task: "Task") -> RenderableType:
     596          text = (
     597              self.finished_text
     598              if task.finished
     599              else self.spinner.render(task.get_time())
     600          )
     601          return text
     602  
     603  
     604  class ESC[4;38;5;81mTextColumn(ESC[4;38;5;149mProgressColumn):
     605      """A column containing text."""
     606  
     607      def __init__(
     608          self,
     609          text_format: str,
     610          style: StyleType = "none",
     611          justify: JustifyMethod = "left",
     612          markup: bool = True,
     613          highlighter: Optional[Highlighter] = None,
     614          table_column: Optional[Column] = None,
     615      ) -> None:
     616          self.text_format = text_format
     617          self.justify: JustifyMethod = justify
     618          self.style = style
     619          self.markup = markup
     620          self.highlighter = highlighter
     621          super().__init__(table_column=table_column or Column(no_wrap=True))
     622  
     623      def render(self, task: "Task") -> Text:
     624          _text = self.text_format.format(task=task)
     625          if self.markup:
     626              text = Text.from_markup(_text, style=self.style, justify=self.justify)
     627          else:
     628              text = Text(_text, style=self.style, justify=self.justify)
     629          if self.highlighter:
     630              self.highlighter.highlight(text)
     631          return text
     632  
     633  
     634  class ESC[4;38;5;81mBarColumn(ESC[4;38;5;149mProgressColumn):
     635      """Renders a visual progress bar.
     636  
     637      Args:
     638          bar_width (Optional[int], optional): Width of bar or None for full width. Defaults to 40.
     639          style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
     640          complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
     641          finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
     642          pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
     643      """
     644  
     645      def __init__(
     646          self,
     647          bar_width: Optional[int] = 40,
     648          style: StyleType = "bar.back",
     649          complete_style: StyleType = "bar.complete",
     650          finished_style: StyleType = "bar.finished",
     651          pulse_style: StyleType = "bar.pulse",
     652          table_column: Optional[Column] = None,
     653      ) -> None:
     654          self.bar_width = bar_width
     655          self.style = style
     656          self.complete_style = complete_style
     657          self.finished_style = finished_style
     658          self.pulse_style = pulse_style
     659          super().__init__(table_column=table_column)
     660  
     661      def render(self, task: "Task") -> ProgressBar:
     662          """Gets a progress bar widget for a task."""
     663          return ProgressBar(
     664              total=max(0, task.total) if task.total is not None else None,
     665              completed=max(0, task.completed),
     666              width=None if self.bar_width is None else max(1, self.bar_width),
     667              pulse=not task.started,
     668              animation_time=task.get_time(),
     669              style=self.style,
     670              complete_style=self.complete_style,
     671              finished_style=self.finished_style,
     672              pulse_style=self.pulse_style,
     673          )
     674  
     675  
     676  class ESC[4;38;5;81mTimeElapsedColumn(ESC[4;38;5;149mProgressColumn):
     677      """Renders time elapsed."""
     678  
     679      def render(self, task: "Task") -> Text:
     680          """Show time elapsed."""
     681          elapsed = task.finished_time if task.finished else task.elapsed
     682          if elapsed is None:
     683              return Text("-:--:--", style="progress.elapsed")
     684          delta = timedelta(seconds=int(elapsed))
     685          return Text(str(delta), style="progress.elapsed")
     686  
     687  
     688  class ESC[4;38;5;81mTaskProgressColumn(ESC[4;38;5;149mTextColumn):
     689      """Show task progress as a percentage.
     690  
     691      Args:
     692          text_format (str, optional): Format for percentage display. Defaults to "[progress.percentage]{task.percentage:>3.0f}%".
     693          text_format_no_percentage (str, optional): Format if percentage is unknown. Defaults to "".
     694          style (StyleType, optional): Style of output. Defaults to "none".
     695          justify (JustifyMethod, optional): Text justification. Defaults to "left".
     696          markup (bool, optional): Enable markup. Defaults to True.
     697          highlighter (Optional[Highlighter], optional): Highlighter to apply to output. Defaults to None.
     698          table_column (Optional[Column], optional): Table Column to use. Defaults to None.
     699          show_speed (bool, optional): Show speed if total is unknown. Defaults to False.
     700      """
     701  
     702      def __init__(
     703          self,
     704          text_format: str = "[progress.percentage]{task.percentage:>3.0f}%",
     705          text_format_no_percentage: str = "",
     706          style: StyleType = "none",
     707          justify: JustifyMethod = "left",
     708          markup: bool = True,
     709          highlighter: Optional[Highlighter] = None,
     710          table_column: Optional[Column] = None,
     711          show_speed: bool = False,
     712      ) -> None:
     713  
     714          self.text_format_no_percentage = text_format_no_percentage
     715          self.show_speed = show_speed
     716          super().__init__(
     717              text_format=text_format,
     718              style=style,
     719              justify=justify,
     720              markup=markup,
     721              highlighter=highlighter,
     722              table_column=table_column,
     723          )
     724  
     725      @classmethod
     726      def render_speed(cls, speed: Optional[float]) -> Text:
     727          """Render the speed in iterations per second.
     728  
     729          Args:
     730              task (Task): A Task object.
     731  
     732          Returns:
     733              Text: Text object containing the task speed.
     734          """
     735          if speed is None:
     736              return Text("", style="progress.percentage")
     737          unit, suffix = filesize.pick_unit_and_suffix(
     738              int(speed),
     739              ["", "×10³", "×10⁶", "×10⁹", "×10¹²"],
     740              1000,
     741          )
     742          data_speed = speed / unit
     743          return Text(f"{data_speed:.1f}{suffix} it/s", style="progress.percentage")
     744  
     745      def render(self, task: "Task") -> Text:
     746          if task.total is None and self.show_speed:
     747              return self.render_speed(task.finished_speed or task.speed)
     748          text_format = (
     749              self.text_format_no_percentage if task.total is None else self.text_format
     750          )
     751          _text = text_format.format(task=task)
     752          if self.markup:
     753              text = Text.from_markup(_text, style=self.style, justify=self.justify)
     754          else:
     755              text = Text(_text, style=self.style, justify=self.justify)
     756          if self.highlighter:
     757              self.highlighter.highlight(text)
     758          return text
     759  
     760  
     761  class ESC[4;38;5;81mTimeRemainingColumn(ESC[4;38;5;149mProgressColumn):
     762      """Renders estimated time remaining.
     763  
     764      Args:
     765          compact (bool, optional): Render MM:SS when time remaining is less than an hour. Defaults to False.
     766          elapsed_when_finished (bool, optional): Render time elapsed when the task is finished. Defaults to False.
     767      """
     768  
     769      # Only refresh twice a second to prevent jitter
     770      max_refresh = 0.5
     771  
     772      def __init__(
     773          self,
     774          compact: bool = False,
     775          elapsed_when_finished: bool = False,
     776          table_column: Optional[Column] = None,
     777      ):
     778          self.compact = compact
     779          self.elapsed_when_finished = elapsed_when_finished
     780          super().__init__(table_column=table_column)
     781  
     782      def render(self, task: "Task") -> Text:
     783          """Show time remaining."""
     784          if self.elapsed_when_finished and task.finished:
     785              task_time = task.finished_time
     786              style = "progress.elapsed"
     787          else:
     788              task_time = task.time_remaining
     789              style = "progress.remaining"
     790  
     791          if task.total is None:
     792              return Text("", style=style)
     793  
     794          if task_time is None:
     795              return Text("--:--" if self.compact else "-:--:--", style=style)
     796  
     797          # Based on https://github.com/tqdm/tqdm/blob/master/tqdm/std.py
     798          minutes, seconds = divmod(int(task_time), 60)
     799          hours, minutes = divmod(minutes, 60)
     800  
     801          if self.compact and not hours:
     802              formatted = f"{minutes:02d}:{seconds:02d}"
     803          else:
     804              formatted = f"{hours:d}:{minutes:02d}:{seconds:02d}"
     805  
     806          return Text(formatted, style=style)
     807  
     808  
     809  class ESC[4;38;5;81mFileSizeColumn(ESC[4;38;5;149mProgressColumn):
     810      """Renders completed filesize."""
     811  
     812      def render(self, task: "Task") -> Text:
     813          """Show data completed."""
     814          data_size = filesize.decimal(int(task.completed))
     815          return Text(data_size, style="progress.filesize")
     816  
     817  
     818  class ESC[4;38;5;81mTotalFileSizeColumn(ESC[4;38;5;149mProgressColumn):
     819      """Renders total filesize."""
     820  
     821      def render(self, task: "Task") -> Text:
     822          """Show data completed."""
     823          data_size = filesize.decimal(int(task.total)) if task.total is not None else ""
     824          return Text(data_size, style="progress.filesize.total")
     825  
     826  
     827  class ESC[4;38;5;81mMofNCompleteColumn(ESC[4;38;5;149mProgressColumn):
     828      """Renders completed count/total, e.g. '  10/1000'.
     829  
     830      Best for bounded tasks with int quantities.
     831  
     832      Space pads the completed count so that progress length does not change as task progresses
     833      past powers of 10.
     834  
     835      Args:
     836          separator (str, optional): Text to separate completed and total values. Defaults to "/".
     837      """
     838  
     839      def __init__(self, separator: str = "/", table_column: Optional[Column] = None):
     840          self.separator = separator
     841          super().__init__(table_column=table_column)
     842  
     843      def render(self, task: "Task") -> Text:
     844          """Show completed/total."""
     845          completed = int(task.completed)
     846          total = int(task.total) if task.total is not None else "?"
     847          total_width = len(str(total))
     848          return Text(
     849              f"{completed:{total_width}d}{self.separator}{total}",
     850              style="progress.download",
     851          )
     852  
     853  
     854  class ESC[4;38;5;81mDownloadColumn(ESC[4;38;5;149mProgressColumn):
     855      """Renders file size downloaded and total, e.g. '0.5/2.3 GB'.
     856  
     857      Args:
     858          binary_units (bool, optional): Use binary units, KiB, MiB etc. Defaults to False.
     859      """
     860  
     861      def __init__(
     862          self, binary_units: bool = False, table_column: Optional[Column] = None
     863      ) -> None:
     864          self.binary_units = binary_units
     865          super().__init__(table_column=table_column)
     866  
     867      def render(self, task: "Task") -> Text:
     868          """Calculate common unit for completed and total."""
     869          completed = int(task.completed)
     870  
     871          unit_and_suffix_calculation_base = (
     872              int(task.total) if task.total is not None else completed
     873          )
     874          if self.binary_units:
     875              unit, suffix = filesize.pick_unit_and_suffix(
     876                  unit_and_suffix_calculation_base,
     877                  ["bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"],
     878                  1024,
     879              )
     880          else:
     881              unit, suffix = filesize.pick_unit_and_suffix(
     882                  unit_and_suffix_calculation_base,
     883                  ["bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"],
     884                  1000,
     885              )
     886          precision = 0 if unit == 1 else 1
     887  
     888          completed_ratio = completed / unit
     889          completed_str = f"{completed_ratio:,.{precision}f}"
     890  
     891          if task.total is not None:
     892              total = int(task.total)
     893              total_ratio = total / unit
     894              total_str = f"{total_ratio:,.{precision}f}"
     895          else:
     896              total_str = "?"
     897  
     898          download_status = f"{completed_str}/{total_str} {suffix}"
     899          download_text = Text(download_status, style="progress.download")
     900          return download_text
     901  
     902  
     903  class ESC[4;38;5;81mTransferSpeedColumn(ESC[4;38;5;149mProgressColumn):
     904      """Renders human readable transfer speed."""
     905  
     906      def render(self, task: "Task") -> Text:
     907          """Show data transfer speed."""
     908          speed = task.finished_speed or task.speed
     909          if speed is None:
     910              return Text("?", style="progress.data.speed")
     911          data_speed = filesize.decimal(int(speed))
     912          return Text(f"{data_speed}/s", style="progress.data.speed")
     913  
     914  
     915  class ESC[4;38;5;81mProgressSample(ESC[4;38;5;149mNamedTuple):
     916      """Sample of progress for a given time."""
     917  
     918      timestamp: float
     919      """Timestamp of sample."""
     920      completed: float
     921      """Number of steps completed."""
     922  
     923  
     924  @dataclass
     925  class ESC[4;38;5;81mTask:
     926      """Information regarding a progress task.
     927  
     928      This object should be considered read-only outside of the :class:`~Progress` class.
     929  
     930      """
     931  
     932      id: TaskID
     933      """Task ID associated with this task (used in Progress methods)."""
     934  
     935      description: str
     936      """str: Description of the task."""
     937  
     938      total: Optional[float]
     939      """Optional[float]: Total number of steps in this task."""
     940  
     941      completed: float
     942      """float: Number of steps completed"""
     943  
     944      _get_time: GetTimeCallable
     945      """Callable to get the current time."""
     946  
     947      finished_time: Optional[float] = None
     948      """float: Time task was finished."""
     949  
     950      visible: bool = True
     951      """bool: Indicates if this task is visible in the progress display."""
     952  
     953      fields: Dict[str, Any] = field(default_factory=dict)
     954      """dict: Arbitrary fields passed in via Progress.update."""
     955  
     956      start_time: Optional[float] = field(default=None, init=False, repr=False)
     957      """Optional[float]: Time this task was started, or None if not started."""
     958  
     959      stop_time: Optional[float] = field(default=None, init=False, repr=False)
     960      """Optional[float]: Time this task was stopped, or None if not stopped."""
     961  
     962      finished_speed: Optional[float] = None
     963      """Optional[float]: The last speed for a finished task."""
     964  
     965      _progress: Deque[ProgressSample] = field(
     966          default_factory=lambda: deque(maxlen=1000), init=False, repr=False
     967      )
     968  
     969      _lock: RLock = field(repr=False, default_factory=RLock)
     970      """Thread lock."""
     971  
     972      def get_time(self) -> float:
     973          """float: Get the current time, in seconds."""
     974          return self._get_time()
     975  
     976      @property
     977      def started(self) -> bool:
     978          """bool: Check if the task as started."""
     979          return self.start_time is not None
     980  
     981      @property
     982      def remaining(self) -> Optional[float]:
     983          """Optional[float]: Get the number of steps remaining, if a non-None total was set."""
     984          if self.total is None:
     985              return None
     986          return self.total - self.completed
     987  
     988      @property
     989      def elapsed(self) -> Optional[float]:
     990          """Optional[float]: Time elapsed since task was started, or ``None`` if the task hasn't started."""
     991          if self.start_time is None:
     992              return None
     993          if self.stop_time is not None:
     994              return self.stop_time - self.start_time
     995          return self.get_time() - self.start_time
     996  
     997      @property
     998      def finished(self) -> bool:
     999          """Check if the task has finished."""
    1000          return self.finished_time is not None
    1001  
    1002      @property
    1003      def percentage(self) -> float:
    1004          """float: Get progress of task as a percentage. If a None total was set, returns 0"""
    1005          if not self.total:
    1006              return 0.0
    1007          completed = (self.completed / self.total) * 100.0
    1008          completed = min(100.0, max(0.0, completed))
    1009          return completed
    1010  
    1011      @property
    1012      def speed(self) -> Optional[float]:
    1013          """Optional[float]: Get the estimated speed in steps per second."""
    1014          if self.start_time is None:
    1015              return None
    1016          with self._lock:
    1017              progress = self._progress
    1018              if not progress:
    1019                  return None
    1020              total_time = progress[-1].timestamp - progress[0].timestamp
    1021              if total_time == 0:
    1022                  return None
    1023              iter_progress = iter(progress)
    1024              next(iter_progress)
    1025              total_completed = sum(sample.completed for sample in iter_progress)
    1026              speed = total_completed / total_time
    1027              return speed
    1028  
    1029      @property
    1030      def time_remaining(self) -> Optional[float]:
    1031          """Optional[float]: Get estimated time to completion, or ``None`` if no data."""
    1032          if self.finished:
    1033              return 0.0
    1034          speed = self.speed
    1035          if not speed:
    1036              return None
    1037          remaining = self.remaining
    1038          if remaining is None:
    1039              return None
    1040          estimate = ceil(remaining / speed)
    1041          return estimate
    1042  
    1043      def _reset(self) -> None:
    1044          """Reset progress."""
    1045          self._progress.clear()
    1046          self.finished_time = None
    1047          self.finished_speed = None
    1048  
    1049  
    1050  class ESC[4;38;5;81mProgress(ESC[4;38;5;149mJupyterMixin):
    1051      """Renders an auto-updating progress bar(s).
    1052  
    1053      Args:
    1054          console (Console, optional): Optional Console instance. Default will an internal Console instance writing to stdout.
    1055          auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()`.
    1056          refresh_per_second (Optional[float], optional): Number of times per second to refresh the progress information or None to use default (10). Defaults to None.
    1057          speed_estimate_period: (float, optional): Period (in seconds) used to calculate the speed estimate. Defaults to 30.
    1058          transient: (bool, optional): Clear the progress on exit. Defaults to False.
    1059          redirect_stdout: (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True.
    1060          redirect_stderr: (bool, optional): Enable redirection of stderr. Defaults to True.
    1061          get_time: (Callable, optional): A callable that gets the current time, or None to use Console.get_time. Defaults to None.
    1062          disable (bool, optional): Disable progress display. Defaults to False
    1063          expand (bool, optional): Expand tasks table to fit width. Defaults to False.
    1064      """
    1065  
    def __init__(
        self,
        *columns: Union[str, ProgressColumn],
        console: Optional[Console] = None,
        auto_refresh: bool = True,
        refresh_per_second: float = 10,
        speed_estimate_period: float = 30.0,
        transient: bool = False,
        redirect_stdout: bool = True,
        redirect_stderr: bool = True,
        get_time: Optional[GetTimeCallable] = None,
        disable: bool = False,
        expand: bool = False,
    ) -> None:
        assert refresh_per_second > 0, "refresh_per_second must be > 0"

        # Plain configuration and bookkeeping state.
        self._lock = RLock()
        self.columns = columns or self.get_default_columns()
        self.speed_estimate_period = speed_estimate_period
        self.disable = disable
        self.expand = expand
        self._tasks: Dict[TaskID, Task] = {}
        self._task_index: TaskID = TaskID(0)

        # The live display owns the console; fall back to the global console
        # when none was supplied.
        live_console = console or get_console()
        self.live = Live(
            console=live_console,
            auto_refresh=auto_refresh,
            refresh_per_second=refresh_per_second,
            transient=transient,
            redirect_stdout=redirect_stdout,
            redirect_stderr=redirect_stderr,
            get_renderable=self.get_renderable,
        )

        # Convenience aliases resolved through the `console` property.
        active_console = self.console
        self.get_time = get_time or active_console.get_time
        self.print = active_console.print
        self.log = active_console.log
    1101  
    @classmethod
    def get_default_columns(cls) -> Tuple[ProgressColumn, ...]:
        """Get the default columns used for a new Progress instance:
           - a text column for the description (TextColumn)
           - the bar itself (BarColumn)
           - a column showing completion percentage (TaskProgressColumn)
           - an estimated-time-remaining column (TimeRemainingColumn)
        If the Progress instance is created without passing a columns argument,
        the default columns defined here will be used.

        You can also create a Progress instance using custom columns before
        and/or after the defaults, as in this example:

            progress = Progress(
                SpinnerColumn(),
                *Progress.get_default_columns(),
                "Elapsed:",
                TimeElapsedColumn(),
            )

        This code shows the creation of a Progress display, containing
        a spinner to the left, the default columns, and a labeled elapsed
        time column.
        """
        description_column = TextColumn("[progress.description]{task.description}")
        bar_column = BarColumn()
        percentage_column = TaskProgressColumn()
        remaining_column = TimeRemainingColumn()
        return (description_column, bar_column, percentage_column, remaining_column)
    1132  
    @property
    def console(self) -> Console:
        """Console: The console of the underlying live display."""
        live = self.live
        return live.console
    1136  
    @property
    def tasks(self) -> List[Task]:
        """Get a list of Task instances."""
        with self._lock:
            # Copy under the lock so callers get a stable snapshot.
            snapshot = [*self._tasks.values()]
        return snapshot
    1142  
    @property
    def task_ids(self) -> List[TaskID]:
        """A list of task IDs."""
        with self._lock:
            # Iterating the mapping yields its keys, i.e. the task IDs.
            ids = list(self._tasks)
        return ids
    1148  
    @property
    def finished(self) -> bool:
        """Check if all tasks have been completed."""
        with self._lock:
            # all() is True for an empty iterable, which covers the
            # "no tasks registered" case.
            return all(task.finished for task in self._tasks.values())
    1156  
    def start(self) -> None:
        """Start the progress display."""
        if self.disable:
            # A disabled progress never starts its live display.
            return
        self.live.start(refresh=True)
    1161  
    def stop(self) -> None:
        """Stop the progress display."""
        self.live.stop()
        console = self.console
        if console.is_interactive:
            return
        # Non-interactive consoles get a bare print() once the display stops.
        console.print()
    1167  
    def __enter__(self) -> "Progress":
        """Start the progress display and return this instance."""
        self.start()
        return self
    1171  
    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Stop the progress display; the exception details are not inspected."""
        self.stop()
    1179  
    def track(
        self,
        sequence: Union[Iterable[ProgressType], Sequence[ProgressType]],
        total: Optional[float] = None,
        task_id: Optional[TaskID] = None,
        description: str = "Working...",
        update_period: float = 0.1,
    ) -> Iterable[ProgressType]:
        """Track progress by iterating over a sequence.

        Args:
            sequence (Sequence[ProgressType]): A sequence of values you want to iterate over and track progress.
            total: (float, optional): Total number of steps. Default is len(sequence).
            task_id: (TaskID): Task to track. Default is new task.
            description: (str, optional): Description of task, if new task is created.
            update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1.

        Returns:
            Iterable[ProgressType]: An iterable of values taken from the provided sequence.
        """
        if total is None:
            # A length hint of zero is treated as "unknown" (None).
            total = float(length_hint(sequence)) or None

        if task_id is not None:
            self.update(task_id, total=total)
        else:
            task_id = self.add_task(description, total=total)

        if not self.live.auto_refresh:
            # Without auto refresh, advance and redraw after every item.
            step = self.advance
            redraw = self.refresh
            for item in sequence:
                yield item
                step(task_id, 1)
                redraw()
            return

        # With auto refresh, completed items are counted on the tracking
        # thread object, which is given the update period.
        with _TrackThread(self, task_id, update_period) as tracker:
            for item in sequence:
                yield item
                tracker.completed += 1
    1220  
    def wrap_file(
        self,
        file: BinaryIO,
        total: Optional[int] = None,
        *,
        task_id: Optional[TaskID] = None,
        description: str = "Reading...",
    ) -> BinaryIO:
        """Track progress file reading from a binary file.

        Args:
            file (BinaryIO): A file-like object opened in binary mode.
            total (int, optional): Total number of bytes to read. This must be provided unless a task with a total is also given.
            task_id (TaskID): Task to track. Default is new task.
            description (str, optional): Description of task, if new task is created.

        Returns:
            BinaryIO: A readable file-like object in binary mode.

        Raises:
            ValueError: When no total value can be extracted from the arguments or the task.
        """
        # Use the explicit total when given, else fall back to the task's total.
        total_bytes: Optional[float] = total
        if total_bytes is None and task_id is not None:
            with self._lock:
                total_bytes = self._tasks[task_id].total
        if total_bytes is None:
            raise ValueError(
                "unable to get the total number of bytes, please specify 'total'"
            )

        # Reuse the given task (updating its total) or create a new one.
        if task_id is not None:
            self.update(task_id, total=total_bytes)
        else:
            task_id = self.add_task(description, total=total_bytes)

        # The caller keeps ownership of `file`, hence close_handle=False.
        return _Reader(file, self, task_id, close_handle=False)
    1262  
    @typing.overload
    def open(
        self,
        file: Union[str, "PathLike[str]", bytes],
        mode: Literal["rb"],
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
        *,
        total: Optional[int] = None,
        task_id: Optional[TaskID] = None,
        description: str = "Reading...",
    ) -> BinaryIO:
        pass

    @typing.overload
    def open(
        self,
        file: Union[str, "PathLike[str]", bytes],
        mode: Union[Literal["r"], Literal["rt"]],
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
        *,
        total: Optional[int] = None,
        task_id: Optional[TaskID] = None,
        description: str = "Reading...",
    ) -> TextIO:
        pass

    def open(
        self,
        file: Union[str, "PathLike[str]", bytes],
        mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
        *,
        total: Optional[int] = None,
        task_id: Optional[TaskID] = None,
        description: str = "Reading...",
    ) -> Union[BinaryIO, TextIO]:
        """Track progress while reading from a file.

        Args:
            file (Union[str, PathLike[str], bytes]): The path to the file to read.
            mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt".
            buffering (int): The buffering strategy to use, see :func:`io.open`.
            encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`.
            errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`.
            newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`.
            total (int, optional): Total number of bytes to read. If none given, os.stat(file).st_size is used.
            task_id (TaskID): Task to track. Default is new task.
            description (str, optional): Description of task, if new task is created.

        Returns:
            Union[BinaryIO, TextIO]: A readable file-like object; binary for mode "rb",
                text (wrapped in :class:`io.TextIOWrapper`) for modes "r" and "rt".

        Raises:
            ValueError: When an invalid mode is given, or when ``buffering=0`` is
                requested in text mode.
        """
        # normalize the mode: sorting the characters makes permutations such as
        # "br"/"rb" and "tr"/"rt" compare equal ("br", "rt" or "r")
        _mode = "".join(sorted(mode))
        if _mode not in ("br", "rt", "r"):
            raise ValueError("invalid mode {!r}".format(mode))

        # patch buffering to provide the same behaviour as the builtin `open`;
        # the line-buffering request is remembered before `buffering` is reset
        line_buffering = buffering == 1
        if _mode == "br" and buffering == 1:
            warnings.warn(
                "line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used",
                RuntimeWarning,
            )
            buffering = -1
        elif _mode in ("rt", "r"):
            if buffering == 0:
                raise ValueError("can't have unbuffered text I/O")
            elif buffering == 1:
                buffering = -1

        # attempt to get the total with `os.stat`
        if total is None:
            total = stat(file).st_size

        # update total of task or create new task
        if task_id is None:
            task_id = self.add_task(description, total=total)
        else:
            self.update(task_id, total=total)

        # open the file in binary mode,
        handle = io.open(file, "rb", buffering=buffering)
        reader = _Reader(handle, self, task_id, close_handle=True)

        # wrap the reader in a `TextIOWrapper` if text mode. The normalized
        # mode is tested so that an accepted permutation such as "tr" yields
        # a text stream rather than falling through to the binary reader.
        if _mode in ("r", "rt"):
            try:
                return io.TextIOWrapper(
                    reader,
                    encoding=encoding,
                    errors=errors,
                    newline=newline,
                    line_buffering=line_buffering,
                )
            except Exception:
                # e.g. an unknown encoding: don't leak the file handle that
                # was just opened.
                handle.close()
                raise

        return reader
    1371  
    def start_task(self, task_id: TaskID) -> None:
        """Start a task.

        Starts a task (used when calculating elapsed time). You may need to call this manually,
        if you called ``add_task`` with ``start=False``.

        Args:
            task_id (TaskID): ID of task.
        """
        with self._lock:
            task = self._tasks[task_id]
            if task.start_time is not None:
                # Already started: the original start time is kept.
                return
            task.start_time = self.get_time()
    1385  
    def stop_task(self, task_id: TaskID) -> None:
        """Stop a task.

        This will freeze the elapsed time on the task.

        Args:
            task_id (TaskID): ID of task.
        """
        with self._lock:
            task = self._tasks[task_id]
            now = self.get_time()
            task.stop_time = now
            # A task that was never started gets the same instant as its
            # start time.
            if task.start_time is None:
                task.start_time = now
    1400  
    def update(
        self,
        task_id: TaskID,
        *,
        total: Optional[float] = None,
        completed: Optional[float] = None,
        advance: Optional[float] = None,
        description: Optional[str] = None,
        visible: Optional[bool] = None,
        refresh: bool = False,
        **fields: Any,
    ) -> None:
        """Update information associated with a task.

        Only arguments that are not None are applied. ``advance`` is applied
        before ``completed``, so when both are given the explicit ``completed``
        value wins. The first time the task reaches (or passes) its total, the
        task's elapsed time and speed are stored as ``finished_time`` and
        ``finished_speed``, the same pair that ``advance()`` records.

        Args:
            task_id (TaskID): Task id (returned by add_task).
            total (float, optional): Updates task.total if not None. A changed total
                also triggers the task's internal ``_reset()``.
            completed (float, optional): Updates task.completed if not None.
            advance (float, optional): Add a value to task.completed if not None.
            description (str, optional): Change task description if not None.
            visible (bool, optional): Set visible flag if not None.
            refresh (bool): Force a refresh of progress information. Default is False.
            **fields (Any): Additional data fields required for rendering; merged
                into the task's existing fields.
        """
        with self._lock:
            task = self._tasks[task_id]
            completed_start = task.completed

            if total is not None and total != task.total:
                task.total = total
                task._reset()
            if advance is not None:
                task.completed += advance
            if completed is not None:
                task.completed = completed
            if description is not None:
                task.description = description
            if visible is not None:
                task.visible = visible
            task.fields.update(fields)
            # Net change in completed steps caused by this call.
            update_completed = task.completed - completed_start

            current_time = self.get_time()
            old_sample_time = current_time - self.speed_estimate_period
            _progress = task._progress

            # Drop samples that fall outside the speed estimation window.
            popleft = _progress.popleft
            while _progress and _progress[0].timestamp < old_sample_time:
                popleft()
            # Only forward progress is recorded as a sample.
            if update_completed > 0:
                _progress.append(ProgressSample(current_time, update_completed))
            if (
                task.total is not None
                and task.completed >= task.total
                and task.finished_time is None
            ):
                task.finished_time = task.elapsed
                # Keep in step with advance(), which also stores the speed at
                # the moment the task finishes.
                task.finished_speed = task.speed

        if refresh:
            self.refresh()
    1461  
    def reset(
        self,
        task_id: TaskID,
        *,
        start: bool = True,
        total: Optional[float] = None,
        completed: int = 0,
        visible: Optional[bool] = None,
        description: Optional[str] = None,
        **fields: Any,
    ) -> None:
        """Reset a task so completed is 0 and the clock is reset.

        Any stop time recorded by ``stop_task()`` is cleared as well, so a task
        that had been stopped is no longer frozen after the reset. The display is
        refreshed once the task has been updated.

        Args:
            task_id (TaskID): ID of task.
            start (bool, optional): Start the task after reset. Defaults to True.
            total (float, optional): New total steps in task, or None to use current total. Defaults to None.
            completed (int, optional): Number of steps completed. Defaults to 0.
            visible (bool, optional): Set visible flag if not None. Defaults to None,
                which leaves the current visibility unchanged.
            description (str, optional): Change task description if not None. Defaults to None.
            **fields (str): Additional data fields required for rendering. When any
                are given they replace the task's existing fields.
        """
        current_time = self.get_time()
        with self._lock:
            task = self._tasks[task_id]
            task._reset()
            task.start_time = current_time if start else None
            # stop_task() freezes elapsed time by recording stop_time; drop any
            # earlier stop so the restarted clock is not left frozen.
            task.stop_time = None
            if total is not None:
                task.total = total
            task.completed = completed
            if visible is not None:
                task.visible = visible
            if fields:
                task.fields = fields
            if description is not None:
                task.description = description
            task.finished_time = None
        self.refresh()
    1500  
    def advance(self, task_id: TaskID, advance: float = 1) -> None:
        """Move a task forward by a number of steps.

        A progress sample is recorded for every call. The first time the task
        reaches (or passes) its total, its elapsed time and speed are stored as
        ``finished_time`` and ``finished_speed``.

        Args:
            task_id (TaskID): ID of task.
            advance (float): Number of steps to advance. Default is 1.
        """
        now = self.get_time()
        with self._lock:
            task = self._tasks[task_id]
            before = task.completed
            task.completed += advance
            # Measure the change on the stored value rather than reusing ``advance``.
            delta = task.completed - before
            cutoff = now - self.speed_estimate_period
            samples = task._progress

            # Discard samples older than the speed estimation window...
            while samples and samples[0].timestamp < cutoff:
                samples.popleft()
            # ...then trim from the oldest end until no more than 1000 remain.
            for _ in range(len(samples) - 1000):
                samples.popleft()
            samples.append(ProgressSample(now, delta))

            reached_total = task.total is not None and task.completed >= task.total
            if reached_total and task.finished_time is None:
                task.finished_time = task.elapsed
                task.finished_speed = task.speed
    1530  
    def refresh(self) -> None:
        """Redraw the progress display.

        Does nothing when the progress is disabled or its live display has not
        been started.
        """
        if self.disable:
            return
        live = self.live
        if live.is_started:
            live.refresh()
    1535  
    def get_renderable(self) -> RenderableType:
        """Build the single renderable used for the progress display.

        Returns:
            RenderableType: A ``Group`` holding everything produced by ``get_renderables``.
        """
        return Group(*self.get_renderables())
    1540  
    def get_renderables(self) -> Iterable[RenderableType]:
        """Yield the renderables that make up the progress display.

        Yields:
            RenderableType: The table built by ``make_tasks_table`` from ``self.tasks``.
        """
        yield self.make_tasks_table(self.tasks)
    1545  
    def make_tasks_table(self, tasks: Iterable[Task]) -> Table:
        """Build the grid table that renders the Progress display.

        Each entry in ``self.columns`` becomes one table column. Every visible
        task contributes one row; tasks whose ``visible`` flag is false are skipped.

        Args:
            tasks (Iterable[Task]): An iterable of Task instances, one per row of the table.

        Returns:
            Table: A table instance.
        """
        grid_columns = []
        for spec in self.columns:
            if isinstance(spec, str):
                # Format-string columns get a plain, non-wrapping column.
                grid_columns.append(Column(no_wrap=True))
            else:
                # Column objects supply their own table column; use a copy of it.
                grid_columns.append(spec.get_table_column().copy())
        grid = Table.grid(*grid_columns, padding=(0, 1), expand=self.expand)

        for task in tasks:
            if not task.visible:
                continue
            cells = []
            for spec in self.columns:
                if isinstance(spec, str):
                    # String columns are format templates given the task as ``task``.
                    cells.append(spec.format(task=task))
                else:
                    # Anything else is called with the task to produce the cell.
                    cells.append(spec(task))
            grid.add_row(*cells)
        return grid
    1578  
    def __rich__(self) -> RenderableType:
        """Make the Progress instance itself renderable.

        Returns:
            RenderableType: The result of ``get_renderable``, built while holding the lock.
        """
        with self._lock:
            renderable = self.get_renderable()
        return renderable
    1583  
    def add_task(
        self,
        description: str,
        start: bool = True,
        total: Optional[float] = 100.0,
        completed: int = 0,
        visible: bool = True,
        **fields: Any,
    ) -> TaskID:
        """Register a new task with the Progress display.

        The task takes the next free task index as its ID, and the display is
        refreshed after the task has been added.

        Args:
            description (str): A description of the task.
            start (bool, optional): Start the task immediately (to calculate elapsed time). If set to False,
                you will need to call ``start_task`` manually. Defaults to True.
            total (float, optional): Number of total steps in the progress if known.
                Set to None to render a pulsing animation. Defaults to 100.
            completed (int, optional): Number of steps completed so far. Defaults to 0.
            visible (bool, optional): Enable display of the task. Defaults to True.
            **fields (str): Additional data fields required for rendering.

        Returns:
            TaskID: An ID you can use when calling `update`.
        """
        with self._lock:
            new_task_index = self._task_index
            self._tasks[new_task_index] = Task(
                new_task_index,
                description,
                total,
                completed,
                visible=visible,
                fields=fields,
                _get_time=self.get_time,
                _lock=self._lock,
            )
            if start:
                self.start_task(new_task_index)
            # Reserve the next index for whichever task is added after this one.
            self._task_index = TaskID(int(new_task_index) + 1)
        self.refresh()
        return new_task_index
    1626  
    def remove_task(self, task_id: TaskID) -> None:
        """Delete a task from the progress display.

        Args:
            task_id (TaskID): A task ID.

        Raises:
            KeyError: If no task with ``task_id`` is currently registered.
        """
        with self._lock:
            # The removed task is discarded; a missing ID raises KeyError.
            self._tasks.pop(task_id)
    1636  
    1637  
    1638  if __name__ == "__main__":  # pragma: no coverage
    1639  
    1640      import random
    1641      import time
    1642  
    1643      from .panel import Panel
    1644      from .rule import Rule
    1645      from .syntax import Syntax
    1646      from .table import Table
    1647  
    1648      syntax = Syntax(
    1649          '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]:
    1650      """Iterate and generate a tuple with a flag for last value."""
    1651      iter_values = iter(values)
    1652      try:
    1653          previous_value = next(iter_values)
    1654      except StopIteration:
    1655          return
    1656      for value in iter_values:
    1657          yield False, previous_value
    1658          previous_value = value
    1659      yield True, previous_value''',
    1660          "python",
    1661          line_numbers=True,
    1662      )
    1663  
    1664      table = Table("foo", "bar", "baz")
    1665      table.add_row("1", "2", "3")
    1666  
    1667      progress_renderables = [
    1668          "Text may be printed while the progress bars are rendering.",
    1669          Panel("In fact, [i]any[/i] renderable will work"),
    1670          "Such as [magenta]tables[/]...",
    1671          table,
    1672          "Pretty printed structures...",
    1673          {"type": "example", "text": "Pretty printed"},
    1674          "Syntax...",
    1675          syntax,
    1676          Rule("Give it a try!"),
    1677      ]
    1678  
    1679      from itertools import cycle
    1680  
    1681      examples = cycle(progress_renderables)
    1682  
    1683      console = Console(record=True)
    1684  
    1685      with Progress(
    1686          SpinnerColumn(),
    1687          *Progress.get_default_columns(),
    1688          TimeElapsedColumn(),
    1689          console=console,
    1690          transient=False,
    1691      ) as progress:
    1692  
    1693          task1 = progress.add_task("[red]Downloading", total=1000)
    1694          task2 = progress.add_task("[green]Processing", total=1000)
    1695          task3 = progress.add_task("[yellow]Thinking", total=None)
    1696  
    1697          while not progress.finished:
    1698              progress.update(task1, advance=0.5)
    1699              progress.update(task2, advance=0.3)
    1700              time.sleep(0.01)
    1701              if random.randint(0, 100) < 1:
    1702                  progress.log(next(examples))