(root)/
Python-3.11.7/
Lib/
asyncio/
timeouts.py
       1  import enum
       2  
       3  from types import TracebackType
       4  from typing import final, Optional, Type
       5  
       6  from . import events
       7  from . import exceptions
       8  from . import tasks
       9  
      10  
# Names exported by a star-import of this module: the context-manager class
# and the two factory functions defined in this file.
__all__ = (
    "Timeout",
    "timeout",
    "timeout_at",
)
      16  
      17  
class _State(enum.Enum):
    """Lifecycle states of a `Timeout` context manager.

    The string values are user-visible: they appear in `Timeout.__repr__()`
    and in the RuntimeError message raised by `Timeout.reschedule()`.
    """

    # Constructed, but `__aenter__()` has not run yet.
    CREATED = "created"
    # Inside the `async with` block; the deadline callback has not fired.
    ENTERED = "active"
    # The deadline callback cancelled the task; `__aexit__()` has not run yet.
    EXPIRING = "expiring"
    # `__aexit__()` ran after the deadline callback had fired.
    EXPIRED = "expired"
    # `__aexit__()` ran without the deadline callback having fired.
    EXITED = "finished"
      24  
      25  
      26  @final
      27  class ESC[4;38;5;81mTimeout:
      28      """Asynchronous context manager for cancelling overdue coroutines.
      29  
      30      Use `timeout()` or `timeout_at()` rather than instantiating this class directly.
      31      """
      32  
      33      def __init__(self, when: Optional[float]) -> None:
      34          """Schedule a timeout that will trigger at a given loop time.
      35  
      36          - If `when` is `None`, the timeout will never trigger.
      37          - If `when < loop.time()`, the timeout will trigger on the next
      38            iteration of the event loop.
      39          """
      40          self._state = _State.CREATED
      41  
      42          self._timeout_handler: Optional[events.TimerHandle] = None
      43          self._task: Optional[tasks.Task] = None
      44          self._when = when
      45  
      46      def when(self) -> Optional[float]:
      47          """Return the current deadline."""
      48          return self._when
      49  
      50      def reschedule(self, when: Optional[float]) -> None:
      51          """Reschedule the timeout."""
      52          if self._state is not _State.ENTERED:
      53              if self._state is _State.CREATED:
      54                  raise RuntimeError("Timeout has not been entered")
      55              raise RuntimeError(
      56                  f"Cannot change state of {self._state.value} Timeout",
      57              )
      58  
      59          self._when = when
      60  
      61          if self._timeout_handler is not None:
      62              self._timeout_handler.cancel()
      63  
      64          if when is None:
      65              self._timeout_handler = None
      66          else:
      67              loop = events.get_running_loop()
      68              if when <= loop.time():
      69                  self._timeout_handler = loop.call_soon(self._on_timeout)
      70              else:
      71                  self._timeout_handler = loop.call_at(when, self._on_timeout)
      72  
      73      def expired(self) -> bool:
      74          """Is timeout expired during execution?"""
      75          return self._state in (_State.EXPIRING, _State.EXPIRED)
      76  
      77      def __repr__(self) -> str:
      78          info = ['']
      79          if self._state is _State.ENTERED:
      80              when = round(self._when, 3) if self._when is not None else None
      81              info.append(f"when={when}")
      82          info_str = ' '.join(info)
      83          return f"<Timeout [{self._state.value}]{info_str}>"
      84  
      85      async def __aenter__(self) -> "Timeout":
      86          if self._state is not _State.CREATED:
      87              raise RuntimeError("Timeout has already been entered")
      88          task = tasks.current_task()
      89          if task is None:
      90              raise RuntimeError("Timeout should be used inside a task")
      91          self._state = _State.ENTERED
      92          self._task = task
      93          self._cancelling = self._task.cancelling()
      94          self.reschedule(self._when)
      95          return self
      96  
      97      async def __aexit__(
      98          self,
      99          exc_type: Optional[Type[BaseException]],
     100          exc_val: Optional[BaseException],
     101          exc_tb: Optional[TracebackType],
     102      ) -> Optional[bool]:
     103          assert self._state in (_State.ENTERED, _State.EXPIRING)
     104  
     105          if self._timeout_handler is not None:
     106              self._timeout_handler.cancel()
     107              self._timeout_handler = None
     108  
     109          if self._state is _State.EXPIRING:
     110              self._state = _State.EXPIRED
     111  
     112              if self._task.uncancel() <= self._cancelling and exc_type is exceptions.CancelledError:
     113                  # Since there are no new cancel requests, we're
     114                  # handling this.
     115                  raise TimeoutError from exc_val
     116          elif self._state is _State.ENTERED:
     117              self._state = _State.EXITED
     118  
     119          return None
     120  
     121      def _on_timeout(self) -> None:
     122          assert self._state is _State.ENTERED
     123          self._task.cancel()
     124          self._state = _State.EXPIRING
     125          # drop the reference early
     126          self._timeout_handler = None
     127  
     128  
     129  def timeout(delay: Optional[float]) -> Timeout:
     130      """Timeout async context manager.
     131  
     132      Useful in cases when you want to apply timeout logic around block
     133      of code or in cases when asyncio.wait_for is not suitable. For example:
     134  
     135      >>> async with asyncio.timeout(10):  # 10 seconds timeout
     136      ...     await long_running_task()
     137  
     138  
     139      delay - value in seconds or None to disable timeout logic
     140  
     141      long_running_task() is interrupted by raising asyncio.CancelledError,
     142      the top-most affected timeout() context manager converts CancelledError
     143      into TimeoutError.
     144      """
     145      loop = events.get_running_loop()
     146      return Timeout(loop.time() + delay if delay is not None else None)
     147  
     148  
     149  def timeout_at(when: Optional[float]) -> Timeout:
     150      """Schedule the timeout at absolute time.
     151  
     152      Like timeout() but argument gives absolute time in the same clock system
     153      as loop.time().
     154  
     155      Please note: it is not POSIX time but a time with
     156      undefined starting base, e.g. the time of the system power on.
     157  
     158      >>> async with asyncio.timeout_at(loop.time() + 10):
     159      ...     await long_running_task()
     160  
     161  
     162      when - a deadline when timeout occurs or None to disable timeout logic
     163  
     164      long_running_task() is interrupted by raising asyncio.CancelledError,
     165      the top-most affected timeout() context manager converts CancelledError
     166      into TimeoutError.
     167      """
     168      return Timeout(when)