(root)/
Python-3.11.7/
Lib/
queue.py
       1  '''A multi-producer, multi-consumer queue.'''
       2  
       3  import threading
       4  import types
       5  from collections import deque
       6  from heapq import heappush, heappop
       7  from time import monotonic as time
       8  try:
       9      from _queue import SimpleQueue
      10  except ImportError:
      11      SimpleQueue = None
      12  
      13  __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']
      14  
      15  
      16  try:
      17      from _queue import Empty
      18  except ImportError:
      19      class ESC[4;38;5;81mEmpty(ESC[4;38;5;149mException):
      20          'Exception raised by Queue.get(block=0)/get_nowait().'
      21          pass
      22  
      23  class ESC[4;38;5;81mFull(ESC[4;38;5;149mException):
      24      'Exception raised by Queue.put(block=0)/put_nowait().'
      25      pass
      26  
      27  
      28  class ESC[4;38;5;81mQueue:
      29      '''Create a queue object with a given maximum size.
      30  
      31      If maxsize is <= 0, the queue size is infinite.
      32      '''
      33  
      34      def __init__(self, maxsize=0):
      35          self.maxsize = maxsize
      36          self._init(maxsize)
      37  
      38          # mutex must be held whenever the queue is mutating.  All methods
      39          # that acquire mutex must release it before returning.  mutex
      40          # is shared between the three conditions, so acquiring and
      41          # releasing the conditions also acquires and releases mutex.
      42          self.mutex = threading.Lock()
      43  
      44          # Notify not_empty whenever an item is added to the queue; a
      45          # thread waiting to get is notified then.
      46          self.not_empty = threading.Condition(self.mutex)
      47  
      48          # Notify not_full whenever an item is removed from the queue;
      49          # a thread waiting to put is notified then.
      50          self.not_full = threading.Condition(self.mutex)
      51  
      52          # Notify all_tasks_done whenever the number of unfinished tasks
      53          # drops to zero; thread waiting to join() is notified to resume
      54          self.all_tasks_done = threading.Condition(self.mutex)
      55          self.unfinished_tasks = 0
      56  
      57      def task_done(self):
      58          '''Indicate that a formerly enqueued task is complete.
      59  
      60          Used by Queue consumer threads.  For each get() used to fetch a task,
      61          a subsequent call to task_done() tells the queue that the processing
      62          on the task is complete.
      63  
      64          If a join() is currently blocking, it will resume when all items
      65          have been processed (meaning that a task_done() call was received
      66          for every item that had been put() into the queue).
      67  
      68          Raises a ValueError if called more times than there were items
      69          placed in the queue.
      70          '''
      71          with self.all_tasks_done:
      72              unfinished = self.unfinished_tasks - 1
      73              if unfinished <= 0:
      74                  if unfinished < 0:
      75                      raise ValueError('task_done() called too many times')
      76                  self.all_tasks_done.notify_all()
      77              self.unfinished_tasks = unfinished
      78  
      79      def join(self):
      80          '''Blocks until all items in the Queue have been gotten and processed.
      81  
      82          The count of unfinished tasks goes up whenever an item is added to the
      83          queue. The count goes down whenever a consumer thread calls task_done()
      84          to indicate the item was retrieved and all work on it is complete.
      85  
      86          When the count of unfinished tasks drops to zero, join() unblocks.
      87          '''
      88          with self.all_tasks_done:
      89              while self.unfinished_tasks:
      90                  self.all_tasks_done.wait()
      91  
      92      def qsize(self):
      93          '''Return the approximate size of the queue (not reliable!).'''
      94          with self.mutex:
      95              return self._qsize()
      96  
      97      def empty(self):
      98          '''Return True if the queue is empty, False otherwise (not reliable!).
      99  
     100          This method is likely to be removed at some point.  Use qsize() == 0
     101          as a direct substitute, but be aware that either approach risks a race
     102          condition where a queue can grow before the result of empty() or
     103          qsize() can be used.
     104  
     105          To create code that needs to wait for all queued tasks to be
     106          completed, the preferred technique is to use the join() method.
     107          '''
     108          with self.mutex:
     109              return not self._qsize()
     110  
     111      def full(self):
     112          '''Return True if the queue is full, False otherwise (not reliable!).
     113  
     114          This method is likely to be removed at some point.  Use qsize() >= n
     115          as a direct substitute, but be aware that either approach risks a race
     116          condition where a queue can shrink before the result of full() or
     117          qsize() can be used.
     118          '''
     119          with self.mutex:
     120              return 0 < self.maxsize <= self._qsize()
     121  
     122      def put(self, item, block=True, timeout=None):
     123          '''Put an item into the queue.
     124  
     125          If optional args 'block' is true and 'timeout' is None (the default),
     126          block if necessary until a free slot is available. If 'timeout' is
     127          a non-negative number, it blocks at most 'timeout' seconds and raises
     128          the Full exception if no free slot was available within that time.
     129          Otherwise ('block' is false), put an item on the queue if a free slot
     130          is immediately available, else raise the Full exception ('timeout'
     131          is ignored in that case).
     132          '''
     133          with self.not_full:
     134              if self.maxsize > 0:
     135                  if not block:
     136                      if self._qsize() >= self.maxsize:
     137                          raise Full
     138                  elif timeout is None:
     139                      while self._qsize() >= self.maxsize:
     140                          self.not_full.wait()
     141                  elif timeout < 0:
     142                      raise ValueError("'timeout' must be a non-negative number")
     143                  else:
     144                      endtime = time() + timeout
     145                      while self._qsize() >= self.maxsize:
     146                          remaining = endtime - time()
     147                          if remaining <= 0.0:
     148                              raise Full
     149                          self.not_full.wait(remaining)
     150              self._put(item)
     151              self.unfinished_tasks += 1
     152              self.not_empty.notify()
     153  
     154      def get(self, block=True, timeout=None):
     155          '''Remove and return an item from the queue.
     156  
     157          If optional args 'block' is true and 'timeout' is None (the default),
     158          block if necessary until an item is available. If 'timeout' is
     159          a non-negative number, it blocks at most 'timeout' seconds and raises
     160          the Empty exception if no item was available within that time.
     161          Otherwise ('block' is false), return an item if one is immediately
     162          available, else raise the Empty exception ('timeout' is ignored
     163          in that case).
     164          '''
     165          with self.not_empty:
     166              if not block:
     167                  if not self._qsize():
     168                      raise Empty
     169              elif timeout is None:
     170                  while not self._qsize():
     171                      self.not_empty.wait()
     172              elif timeout < 0:
     173                  raise ValueError("'timeout' must be a non-negative number")
     174              else:
     175                  endtime = time() + timeout
     176                  while not self._qsize():
     177                      remaining = endtime - time()
     178                      if remaining <= 0.0:
     179                          raise Empty
     180                      self.not_empty.wait(remaining)
     181              item = self._get()
     182              self.not_full.notify()
     183              return item
     184  
     185      def put_nowait(self, item):
     186          '''Put an item into the queue without blocking.
     187  
     188          Only enqueue the item if a free slot is immediately available.
     189          Otherwise raise the Full exception.
     190          '''
     191          return self.put(item, block=False)
     192  
     193      def get_nowait(self):
     194          '''Remove and return an item from the queue without blocking.
     195  
     196          Only get an item if one is immediately available. Otherwise
     197          raise the Empty exception.
     198          '''
     199          return self.get(block=False)
     200  
     201      # Override these methods to implement other queue organizations
     202      # (e.g. stack or priority queue).
     203      # These will only be called with appropriate locks held
     204  
     205      # Initialize the queue representation
     206      def _init(self, maxsize):
     207          self.queue = deque()
     208  
     209      def _qsize(self):
     210          return len(self.queue)
     211  
     212      # Put a new item in the queue
     213      def _put(self, item):
     214          self.queue.append(item)
     215  
     216      # Get an item from the queue
     217      def _get(self):
     218          return self.queue.popleft()
     219  
     220      __class_getitem__ = classmethod(types.GenericAlias)
     221  
     222  
     223  class ESC[4;38;5;81mPriorityQueue(ESC[4;38;5;149mQueue):
     224      '''Variant of Queue that retrieves open entries in priority order (lowest first).
     225  
     226      Entries are typically tuples of the form:  (priority number, data).
     227      '''
     228  
     229      def _init(self, maxsize):
     230          self.queue = []
     231  
     232      def _qsize(self):
     233          return len(self.queue)
     234  
     235      def _put(self, item):
     236          heappush(self.queue, item)
     237  
     238      def _get(self):
     239          return heappop(self.queue)
     240  
     241  
     242  class ESC[4;38;5;81mLifoQueue(ESC[4;38;5;149mQueue):
     243      '''Variant of Queue that retrieves most recently added entries first.'''
     244  
     245      def _init(self, maxsize):
     246          self.queue = []
     247  
     248      def _qsize(self):
     249          return len(self.queue)
     250  
     251      def _put(self, item):
     252          self.queue.append(item)
     253  
     254      def _get(self):
     255          return self.queue.pop()
     256  
     257  
     258  class ESC[4;38;5;81m_PySimpleQueue:
     259      '''Simple, unbounded FIFO queue.
     260  
     261      This pure Python implementation is not reentrant.
     262      '''
     263      # Note: while this pure Python version provides fairness
     264      # (by using a threading.Semaphore which is itself fair, being based
     265      #  on threading.Condition), fairness is not part of the API contract.
     266      # This allows the C version to use a different implementation.
     267  
     268      def __init__(self):
     269          self._queue = deque()
     270          self._count = threading.Semaphore(0)
     271  
     272      def put(self, item, block=True, timeout=None):
     273          '''Put the item on the queue.
     274  
     275          The optional 'block' and 'timeout' arguments are ignored, as this method
     276          never blocks.  They are provided for compatibility with the Queue class.
     277          '''
     278          self._queue.append(item)
     279          self._count.release()
     280  
     281      def get(self, block=True, timeout=None):
     282          '''Remove and return an item from the queue.
     283  
     284          If optional args 'block' is true and 'timeout' is None (the default),
     285          block if necessary until an item is available. If 'timeout' is
     286          a non-negative number, it blocks at most 'timeout' seconds and raises
     287          the Empty exception if no item was available within that time.
     288          Otherwise ('block' is false), return an item if one is immediately
     289          available, else raise the Empty exception ('timeout' is ignored
     290          in that case).
     291          '''
     292          if timeout is not None and timeout < 0:
     293              raise ValueError("'timeout' must be a non-negative number")
     294          if not self._count.acquire(block, timeout):
     295              raise Empty
     296          return self._queue.popleft()
     297  
     298      def put_nowait(self, item):
     299          '''Put an item into the queue without blocking.
     300  
     301          This is exactly equivalent to `put(item, block=False)` and is only provided
     302          for compatibility with the Queue class.
     303          '''
     304          return self.put(item, block=False)
     305  
     306      def get_nowait(self):
     307          '''Remove and return an item from the queue without blocking.
     308  
     309          Only get an item if one is immediately available. Otherwise
     310          raise the Empty exception.
     311          '''
     312          return self.get(block=False)
     313  
     314      def empty(self):
     315          '''Return True if the queue is empty, False otherwise (not reliable!).'''
     316          return len(self._queue) == 0
     317  
     318      def qsize(self):
     319          '''Return the approximate size of the queue (not reliable!).'''
     320          return len(self._queue)
     321  
     322      __class_getitem__ = classmethod(types.GenericAlias)
     323  
     324  
     325  if SimpleQueue is None:
     326      SimpleQueue = _PySimpleQueue