(root)/
Python-3.11.7/
Lib/
asyncio/
queues.py
# Public names of this module: the three queue classes and the two
# exceptions raised by their non-blocking put/get methods.
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
       2  
       3  import collections
       4  import heapq
       5  from types import GenericAlias
       6  
       7  from . import locks
       8  from . import mixins
       9  
      10  
      11  class ESC[4;38;5;81mQueueEmpty(ESC[4;38;5;149mException):
      12      """Raised when Queue.get_nowait() is called on an empty Queue."""
      13      pass
      14  
      15  
      16  class ESC[4;38;5;81mQueueFull(ESC[4;38;5;149mException):
      17      """Raised when the Queue.put_nowait() method is called on a full Queue."""
      18      pass
      19  
      20  
      21  class ESC[4;38;5;81mQueue(ESC[4;38;5;149mmixinsESC[4;38;5;149m.ESC[4;38;5;149m_LoopBoundMixin):
      22      """A queue, useful for coordinating producer and consumer coroutines.
      23  
      24      If maxsize is less than or equal to zero, the queue size is infinite. If it
      25      is an integer greater than 0, then "await put()" will block when the
      26      queue reaches maxsize, until an item is removed by get().
      27  
      28      Unlike the standard library Queue, you can reliably know this Queue's size
      29      with qsize(), since your single-threaded asyncio application won't be
      30      interrupted between calling qsize() and doing an operation on the Queue.
      31      """
      32  
      33      def __init__(self, maxsize=0):
      34          self._maxsize = maxsize
      35  
      36          # Futures.
      37          self._getters = collections.deque()
      38          # Futures.
      39          self._putters = collections.deque()
      40          self._unfinished_tasks = 0
      41          self._finished = locks.Event()
      42          self._finished.set()
      43          self._init(maxsize)
      44  
      45      # These three are overridable in subclasses.
      46  
      47      def _init(self, maxsize):
      48          self._queue = collections.deque()
      49  
      50      def _get(self):
      51          return self._queue.popleft()
      52  
      53      def _put(self, item):
      54          self._queue.append(item)
      55  
      56      # End of the overridable methods.
      57  
      58      def _wakeup_next(self, waiters):
      59          # Wake up the next waiter (if any) that isn't cancelled.
      60          while waiters:
      61              waiter = waiters.popleft()
      62              if not waiter.done():
      63                  waiter.set_result(None)
      64                  break
      65  
      66      def __repr__(self):
      67          return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
      68  
      69      def __str__(self):
      70          return f'<{type(self).__name__} {self._format()}>'
      71  
      72      __class_getitem__ = classmethod(GenericAlias)
      73  
      74      def _format(self):
      75          result = f'maxsize={self._maxsize!r}'
      76          if getattr(self, '_queue', None):
      77              result += f' _queue={list(self._queue)!r}'
      78          if self._getters:
      79              result += f' _getters[{len(self._getters)}]'
      80          if self._putters:
      81              result += f' _putters[{len(self._putters)}]'
      82          if self._unfinished_tasks:
      83              result += f' tasks={self._unfinished_tasks}'
      84          return result
      85  
      86      def qsize(self):
      87          """Number of items in the queue."""
      88          return len(self._queue)
      89  
      90      @property
      91      def maxsize(self):
      92          """Number of items allowed in the queue."""
      93          return self._maxsize
      94  
      95      def empty(self):
      96          """Return True if the queue is empty, False otherwise."""
      97          return not self._queue
      98  
      99      def full(self):
     100          """Return True if there are maxsize items in the queue.
     101  
     102          Note: if the Queue was initialized with maxsize=0 (the default),
     103          then full() is never True.
     104          """
     105          if self._maxsize <= 0:
     106              return False
     107          else:
     108              return self.qsize() >= self._maxsize
     109  
     110      async def put(self, item):
     111          """Put an item into the queue.
     112  
     113          Put an item into the queue. If the queue is full, wait until a free
     114          slot is available before adding item.
     115          """
     116          while self.full():
     117              putter = self._get_loop().create_future()
     118              self._putters.append(putter)
     119              try:
     120                  await putter
     121              except:
     122                  putter.cancel()  # Just in case putter is not done yet.
     123                  try:
     124                      # Clean self._putters from canceled putters.
     125                      self._putters.remove(putter)
     126                  except ValueError:
     127                      # The putter could be removed from self._putters by a
     128                      # previous get_nowait call.
     129                      pass
     130                  if not self.full() and not putter.cancelled():
     131                      # We were woken up by get_nowait(), but can't take
     132                      # the call.  Wake up the next in line.
     133                      self._wakeup_next(self._putters)
     134                  raise
     135          return self.put_nowait(item)
     136  
     137      def put_nowait(self, item):
     138          """Put an item into the queue without blocking.
     139  
     140          If no free slot is immediately available, raise QueueFull.
     141          """
     142          if self.full():
     143              raise QueueFull
     144          self._put(item)
     145          self._unfinished_tasks += 1
     146          self._finished.clear()
     147          self._wakeup_next(self._getters)
     148  
     149      async def get(self):
     150          """Remove and return an item from the queue.
     151  
     152          If queue is empty, wait until an item is available.
     153          """
     154          while self.empty():
     155              getter = self._get_loop().create_future()
     156              self._getters.append(getter)
     157              try:
     158                  await getter
     159              except:
     160                  getter.cancel()  # Just in case getter is not done yet.
     161                  try:
     162                      # Clean self._getters from canceled getters.
     163                      self._getters.remove(getter)
     164                  except ValueError:
     165                      # The getter could be removed from self._getters by a
     166                      # previous put_nowait call.
     167                      pass
     168                  if not self.empty() and not getter.cancelled():
     169                      # We were woken up by put_nowait(), but can't take
     170                      # the call.  Wake up the next in line.
     171                      self._wakeup_next(self._getters)
     172                  raise
     173          return self.get_nowait()
     174  
     175      def get_nowait(self):
     176          """Remove and return an item from the queue.
     177  
     178          Return an item if one is immediately available, else raise QueueEmpty.
     179          """
     180          if self.empty():
     181              raise QueueEmpty
     182          item = self._get()
     183          self._wakeup_next(self._putters)
     184          return item
     185  
     186      def task_done(self):
     187          """Indicate that a formerly enqueued task is complete.
     188  
     189          Used by queue consumers. For each get() used to fetch a task,
     190          a subsequent call to task_done() tells the queue that the processing
     191          on the task is complete.
     192  
     193          If a join() is currently blocking, it will resume when all items have
     194          been processed (meaning that a task_done() call was received for every
     195          item that had been put() into the queue).
     196  
     197          Raises ValueError if called more times than there were items placed in
     198          the queue.
     199          """
     200          if self._unfinished_tasks <= 0:
     201              raise ValueError('task_done() called too many times')
     202          self._unfinished_tasks -= 1
     203          if self._unfinished_tasks == 0:
     204              self._finished.set()
     205  
     206      async def join(self):
     207          """Block until all items in the queue have been gotten and processed.
     208  
     209          The count of unfinished tasks goes up whenever an item is added to the
     210          queue. The count goes down whenever a consumer calls task_done() to
     211          indicate that the item was retrieved and all work on it is complete.
     212          When the count of unfinished tasks drops to zero, join() unblocks.
     213          """
     214          if self._unfinished_tasks > 0:
     215              await self._finished.wait()
     216  
     217  
     218  class ESC[4;38;5;81mPriorityQueue(ESC[4;38;5;149mQueue):
     219      """A subclass of Queue; retrieves entries in priority order (lowest first).
     220  
     221      Entries are typically tuples of the form: (priority number, data).
     222      """
     223  
     224      def _init(self, maxsize):
     225          self._queue = []
     226  
     227      def _put(self, item, heappush=heapq.heappush):
     228          heappush(self._queue, item)
     229  
     230      def _get(self, heappop=heapq.heappop):
     231          return heappop(self._queue)
     232  
     233  
     234  class ESC[4;38;5;81mLifoQueue(ESC[4;38;5;149mQueue):
     235      """A subclass of Queue that retrieves most recently added entries first."""
     236  
     237      def _init(self, maxsize):
     238          self._queue = []
     239  
     240      def _put(self, item):
     241          self._queue.append(item)
     242  
     243      def _get(self):
     244          return self._queue.pop()