(root)/
Python-3.12.0/
Lib/
asyncio/
base_tasks.py
       1  import linecache
       2  import reprlib
       3  import traceback
       4  
       5  from . import base_futures
       6  from . import coroutines
       7  
       8  
       9  def _task_repr_info(task):
      10      info = base_futures._future_repr_info(task)
      11  
      12      if task.cancelling() and not task.done():
      13          # replace status
      14          info[0] = 'cancelling'
      15  
      16      info.insert(1, 'name=%r' % task.get_name())
      17  
      18      if task._fut_waiter is not None:
      19          info.insert(2, f'wait_for={task._fut_waiter!r}')
      20  
      21      if task._coro:
      22          coro = coroutines._format_coroutine(task._coro)
      23          info.insert(2, f'coro=<{coro}>')
      24  
      25      return info
      26  
      27  
      28  @reprlib.recursive_repr()
      29  def _task_repr(task):
      30      info = ' '.join(_task_repr_info(task))
      31      return f'<{task.__class__.__name__} {info}>'
      32  
      33  
      34  def _task_get_stack(task, limit):
      35      frames = []
      36      if hasattr(task._coro, 'cr_frame'):
      37          # case 1: 'async def' coroutines
      38          f = task._coro.cr_frame
      39      elif hasattr(task._coro, 'gi_frame'):
      40          # case 2: legacy coroutines
      41          f = task._coro.gi_frame
      42      elif hasattr(task._coro, 'ag_frame'):
      43          # case 3: async generators
      44          f = task._coro.ag_frame
      45      else:
      46          # case 4: unknown objects
      47          f = None
      48      if f is not None:
      49          while f is not None:
      50              if limit is not None:
      51                  if limit <= 0:
      52                      break
      53                  limit -= 1
      54              frames.append(f)
      55              f = f.f_back
      56          frames.reverse()
      57      elif task._exception is not None:
      58          tb = task._exception.__traceback__
      59          while tb is not None:
      60              if limit is not None:
      61                  if limit <= 0:
      62                      break
      63                  limit -= 1
      64              frames.append(tb.tb_frame)
      65              tb = tb.tb_next
      66      return frames
      67  
      68  
      69  def _task_print_stack(task, limit, file):
      70      extracted_list = []
      71      checked = set()
      72      for f in task.get_stack(limit=limit):
      73          lineno = f.f_lineno
      74          co = f.f_code
      75          filename = co.co_filename
      76          name = co.co_name
      77          if filename not in checked:
      78              checked.add(filename)
      79              linecache.checkcache(filename)
      80          line = linecache.getline(filename, lineno, f.f_globals)
      81          extracted_list.append((filename, lineno, name, line))
      82  
      83      exc = task._exception
      84      if not extracted_list:
      85          print(f'No stack for {task!r}', file=file)
      86      elif exc is not None:
      87          print(f'Traceback for {task!r} (most recent call last):', file=file)
      88      else:
      89          print(f'Stack for {task!r} (most recent call last):', file=file)
      90  
      91      traceback.print_list(extracted_list, file=file)
      92      if exc is not None:
      93          for line in traceback.format_exception_only(exc.__class__, exc):
      94              print(line, file=file, end='')