(root)/
Python-3.12.0/
Lib/
test/
test_queue.py
       1  # Some simple queue module tests, plus some failure conditions
       2  # to ensure the Queue locks remain stable.
       3  import itertools
       4  import random
       5  import threading
       6  import time
       7  import unittest
       8  import weakref
       9  from test.support import gc_collect
      10  from test.support import import_helper
      11  from test.support import threading_helper
      12  
      13  # queue module depends on threading primitives
      14  threading_helper.requires_working_threading(module=True)
      15  
      16  py_queue = import_helper.import_fresh_module('queue', blocked=['_queue'])
      17  c_queue = import_helper.import_fresh_module('queue', fresh=['_queue'])
      18  need_c_queue = unittest.skipUnless(c_queue, "No _queue module found")
      19  
      20  QUEUE_SIZE = 5
      21  
      22  def qfull(q):
      23      return q.maxsize > 0 and q.qsize() == q.maxsize
      24  
      25  # A thread to run a function that unclogs a blocked Queue.
      26  class ESC[4;38;5;81m_TriggerThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
      27      def __init__(self, fn, args):
      28          self.fn = fn
      29          self.args = args
      30          self.startedEvent = threading.Event()
      31          threading.Thread.__init__(self)
      32  
      33      def run(self):
      34          # The sleep isn't necessary, but is intended to give the blocking
      35          # function in the main thread a chance at actually blocking before
      36          # we unclog it.  But if the sleep is longer than the timeout-based
      37          # tests wait in their blocking functions, those tests will fail.
      38          # So we give them much longer timeout values compared to the
      39          # sleep here (I aimed at 10 seconds for blocking functions --
      40          # they should never actually wait that long - they should make
      41          # progress as soon as we call self.fn()).
      42          time.sleep(0.1)
      43          self.startedEvent.set()
      44          self.fn(*self.args)
      45  
      46  
      47  # Execute a function that blocks, and in a separate thread, a function that
      48  # triggers the release.  Returns the result of the blocking function.  Caution:
      49  # block_func must guarantee to block until trigger_func is called, and
      50  # trigger_func must guarantee to change queue state so that block_func can make
      51  # enough progress to return.  In particular, a block_func that just raises an
      52  # exception regardless of whether trigger_func is called will lead to
      53  # timing-dependent sporadic failures, and one of those went rarely seen but
      54  # undiagnosed for years.  Now block_func must be unexceptional.  If block_func
      55  # is supposed to raise an exception, call do_exceptional_blocking_test()
      56  # instead.
      57  
      58  class ESC[4;38;5;81mBlockingTestMixin:
      59  
      60      def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
      61          thread = _TriggerThread(trigger_func, trigger_args)
      62          thread.start()
      63          try:
      64              self.result = block_func(*block_args)
      65              # If block_func returned before our thread made the call, we failed!
      66              if not thread.startedEvent.is_set():
      67                  self.fail("blocking function %r appeared not to block" %
      68                            block_func)
      69              return self.result
      70          finally:
      71              threading_helper.join_thread(thread) # make sure the thread terminates
      72  
      73      # Call this instead if block_func is supposed to raise an exception.
      74      def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
      75                                     trigger_args, expected_exception_class):
      76          thread = _TriggerThread(trigger_func, trigger_args)
      77          thread.start()
      78          try:
      79              try:
      80                  block_func(*block_args)
      81              except expected_exception_class:
      82                  raise
      83              else:
      84                  self.fail("expected exception of kind %r" %
      85                                   expected_exception_class)
      86          finally:
      87              threading_helper.join_thread(thread) # make sure the thread terminates
      88              if not thread.startedEvent.is_set():
      89                  self.fail("trigger thread ended but event never set")
      90  
      91  
      92  class ESC[4;38;5;81mBaseQueueTestMixin(ESC[4;38;5;149mBlockingTestMixin):
      93      def setUp(self):
      94          self.cum = 0
      95          self.cumlock = threading.Lock()
      96  
      97      def basic_queue_test(self, q):
      98          if q.qsize():
      99              raise RuntimeError("Call this function with an empty queue")
     100          self.assertTrue(q.empty())
     101          self.assertFalse(q.full())
     102          # I guess we better check things actually queue correctly a little :)
     103          q.put(111)
     104          q.put(333)
     105          q.put(222)
     106          target_order = dict(Queue = [111, 333, 222],
     107                              LifoQueue = [222, 333, 111],
     108                              PriorityQueue = [111, 222, 333])
     109          actual_order = [q.get(), q.get(), q.get()]
     110          self.assertEqual(actual_order, target_order[q.__class__.__name__],
     111                           "Didn't seem to queue the correct data!")
     112          for i in range(QUEUE_SIZE-1):
     113              q.put(i)
     114              self.assertTrue(q.qsize(), "Queue should not be empty")
     115          self.assertTrue(not qfull(q), "Queue should not be full")
     116          last = 2 * QUEUE_SIZE
     117          full = 3 * 2 * QUEUE_SIZE
     118          q.put(last)
     119          self.assertTrue(qfull(q), "Queue should be full")
     120          self.assertFalse(q.empty())
     121          self.assertTrue(q.full())
     122          try:
     123              q.put(full, block=0)
     124              self.fail("Didn't appear to block with a full queue")
     125          except self.queue.Full:
     126              pass
     127          try:
     128              q.put(full, timeout=0.01)
     129              self.fail("Didn't appear to time-out with a full queue")
     130          except self.queue.Full:
     131              pass
     132          # Test a blocking put
     133          self.do_blocking_test(q.put, (full,), q.get, ())
     134          self.do_blocking_test(q.put, (full, True, 10), q.get, ())
     135          # Empty it
     136          for i in range(QUEUE_SIZE):
     137              q.get()
     138          self.assertTrue(not q.qsize(), "Queue should be empty")
     139          try:
     140              q.get(block=0)
     141              self.fail("Didn't appear to block with an empty queue")
     142          except self.queue.Empty:
     143              pass
     144          try:
     145              q.get(timeout=0.01)
     146              self.fail("Didn't appear to time-out with an empty queue")
     147          except self.queue.Empty:
     148              pass
     149          # Test a blocking get
     150          self.do_blocking_test(q.get, (), q.put, ('empty',))
     151          self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
     152  
     153  
     154      def worker(self, q):
     155          while True:
     156              x = q.get()
     157              if x < 0:
     158                  q.task_done()
     159                  return
     160              with self.cumlock:
     161                  self.cum += x
     162              q.task_done()
     163  
     164      def queue_join_test(self, q):
     165          self.cum = 0
     166          threads = []
     167          for i in (0,1):
     168              thread = threading.Thread(target=self.worker, args=(q,))
     169              thread.start()
     170              threads.append(thread)
     171          for i in range(100):
     172              q.put(i)
     173          q.join()
     174          self.assertEqual(self.cum, sum(range(100)),
     175                           "q.join() did not block until all tasks were done")
     176          for i in (0,1):
     177              q.put(-1)         # instruct the threads to close
     178          q.join()                # verify that you can join twice
     179          for thread in threads:
     180              thread.join()
     181  
     182      def test_queue_task_done(self):
     183          # Test to make sure a queue task completed successfully.
     184          q = self.type2test()
     185          try:
     186              q.task_done()
     187          except ValueError:
     188              pass
     189          else:
     190              self.fail("Did not detect task count going negative")
     191  
     192      def test_queue_join(self):
     193          # Test that a queue join()s successfully, and before anything else
     194          # (done twice for insurance).
     195          q = self.type2test()
     196          self.queue_join_test(q)
     197          self.queue_join_test(q)
     198          try:
     199              q.task_done()
     200          except ValueError:
     201              pass
     202          else:
     203              self.fail("Did not detect task count going negative")
     204  
     205      def test_basic(self):
     206          # Do it a couple of times on the same queue.
     207          # Done twice to make sure works with same instance reused.
     208          q = self.type2test(QUEUE_SIZE)
     209          self.basic_queue_test(q)
     210          self.basic_queue_test(q)
     211  
     212      def test_negative_timeout_raises_exception(self):
     213          q = self.type2test(QUEUE_SIZE)
     214          with self.assertRaises(ValueError):
     215              q.put(1, timeout=-1)
     216          with self.assertRaises(ValueError):
     217              q.get(1, timeout=-1)
     218  
     219      def test_nowait(self):
     220          q = self.type2test(QUEUE_SIZE)
     221          for i in range(QUEUE_SIZE):
     222              q.put_nowait(1)
     223          with self.assertRaises(self.queue.Full):
     224              q.put_nowait(1)
     225  
     226          for i in range(QUEUE_SIZE):
     227              q.get_nowait()
     228          with self.assertRaises(self.queue.Empty):
     229              q.get_nowait()
     230  
     231      def test_shrinking_queue(self):
     232          # issue 10110
     233          q = self.type2test(3)
     234          q.put(1)
     235          q.put(2)
     236          q.put(3)
     237          with self.assertRaises(self.queue.Full):
     238              q.put_nowait(4)
     239          self.assertEqual(q.qsize(), 3)
     240          q.maxsize = 2                       # shrink the queue
     241          with self.assertRaises(self.queue.Full):
     242              q.put_nowait(4)
     243  
     244  class ESC[4;38;5;81mQueueTest(ESC[4;38;5;149mBaseQueueTestMixin):
     245  
     246      def setUp(self):
     247          self.type2test = self.queue.Queue
     248          super().setUp()
     249  
     250  class ESC[4;38;5;81mPyQueueTest(ESC[4;38;5;149mQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     251      queue = py_queue
     252  
     253  
     254  @need_c_queue
     255  class ESC[4;38;5;81mCQueueTest(ESC[4;38;5;149mQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     256      queue = c_queue
     257  
     258  
     259  class ESC[4;38;5;81mLifoQueueTest(ESC[4;38;5;149mBaseQueueTestMixin):
     260  
     261      def setUp(self):
     262          self.type2test = self.queue.LifoQueue
     263          super().setUp()
     264  
     265  
     266  class ESC[4;38;5;81mPyLifoQueueTest(ESC[4;38;5;149mLifoQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     267      queue = py_queue
     268  
     269  
     270  @need_c_queue
     271  class ESC[4;38;5;81mCLifoQueueTest(ESC[4;38;5;149mLifoQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     272      queue = c_queue
     273  
     274  
     275  class ESC[4;38;5;81mPriorityQueueTest(ESC[4;38;5;149mBaseQueueTestMixin):
     276  
     277      def setUp(self):
     278          self.type2test = self.queue.PriorityQueue
     279          super().setUp()
     280  
     281  
     282  class ESC[4;38;5;81mPyPriorityQueueTest(ESC[4;38;5;149mPriorityQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     283      queue = py_queue
     284  
     285  
     286  @need_c_queue
     287  class ESC[4;38;5;81mCPriorityQueueTest(ESC[4;38;5;149mPriorityQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     288      queue = c_queue
     289  
     290  
     291  # A Queue subclass that can provoke failure at a moment's notice :)
     292  class ESC[4;38;5;81mFailingQueueException(ESC[4;38;5;149mException): pass
     293  
     294  
     295  class ESC[4;38;5;81mFailingQueueTest(ESC[4;38;5;149mBlockingTestMixin):
     296  
     297      def setUp(self):
     298  
     299          Queue = self.queue.Queue
     300  
     301          class ESC[4;38;5;81mFailingQueue(ESC[4;38;5;149mQueue):
     302              def __init__(self, *args):
     303                  self.fail_next_put = False
     304                  self.fail_next_get = False
     305                  Queue.__init__(self, *args)
     306              def _put(self, item):
     307                  if self.fail_next_put:
     308                      self.fail_next_put = False
     309                      raise FailingQueueException("You Lose")
     310                  return Queue._put(self, item)
     311              def _get(self):
     312                  if self.fail_next_get:
     313                      self.fail_next_get = False
     314                      raise FailingQueueException("You Lose")
     315                  return Queue._get(self)
     316  
     317          self.FailingQueue = FailingQueue
     318  
     319          super().setUp()
     320  
     321      def failing_queue_test(self, q):
     322          if q.qsize():
     323              raise RuntimeError("Call this function with an empty queue")
     324          for i in range(QUEUE_SIZE-1):
     325              q.put(i)
     326          # Test a failing non-blocking put.
     327          q.fail_next_put = True
     328          try:
     329              q.put("oops", block=0)
     330              self.fail("The queue didn't fail when it should have")
     331          except FailingQueueException:
     332              pass
     333          q.fail_next_put = True
     334          try:
     335              q.put("oops", timeout=0.1)
     336              self.fail("The queue didn't fail when it should have")
     337          except FailingQueueException:
     338              pass
     339          q.put("last")
     340          self.assertTrue(qfull(q), "Queue should be full")
     341          # Test a failing blocking put
     342          q.fail_next_put = True
     343          try:
     344              self.do_blocking_test(q.put, ("full",), q.get, ())
     345              self.fail("The queue didn't fail when it should have")
     346          except FailingQueueException:
     347              pass
     348          # Check the Queue isn't damaged.
     349          # put failed, but get succeeded - re-add
     350          q.put("last")
     351          # Test a failing timeout put
     352          q.fail_next_put = True
     353          try:
     354              self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
     355                                                FailingQueueException)
     356              self.fail("The queue didn't fail when it should have")
     357          except FailingQueueException:
     358              pass
     359          # Check the Queue isn't damaged.
     360          # put failed, but get succeeded - re-add
     361          q.put("last")
     362          self.assertTrue(qfull(q), "Queue should be full")
     363          q.get()
     364          self.assertTrue(not qfull(q), "Queue should not be full")
     365          q.put("last")
     366          self.assertTrue(qfull(q), "Queue should be full")
     367          # Test a blocking put
     368          self.do_blocking_test(q.put, ("full",), q.get, ())
     369          # Empty it
     370          for i in range(QUEUE_SIZE):
     371              q.get()
     372          self.assertTrue(not q.qsize(), "Queue should be empty")
     373          q.put("first")
     374          q.fail_next_get = True
     375          try:
     376              q.get()
     377              self.fail("The queue didn't fail when it should have")
     378          except FailingQueueException:
     379              pass
     380          self.assertTrue(q.qsize(), "Queue should not be empty")
     381          q.fail_next_get = True
     382          try:
     383              q.get(timeout=0.1)
     384              self.fail("The queue didn't fail when it should have")
     385          except FailingQueueException:
     386              pass
     387          self.assertTrue(q.qsize(), "Queue should not be empty")
     388          q.get()
     389          self.assertTrue(not q.qsize(), "Queue should be empty")
     390          q.fail_next_get = True
     391          try:
     392              self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
     393                                                FailingQueueException)
     394              self.fail("The queue didn't fail when it should have")
     395          except FailingQueueException:
     396              pass
     397          # put succeeded, but get failed.
     398          self.assertTrue(q.qsize(), "Queue should not be empty")
     399          q.get()
     400          self.assertTrue(not q.qsize(), "Queue should be empty")
     401  
     402      def test_failing_queue(self):
     403  
     404          # Test to make sure a queue is functioning correctly.
     405          # Done twice to the same instance.
     406          q = self.FailingQueue(QUEUE_SIZE)
     407          self.failing_queue_test(q)
     408          self.failing_queue_test(q)
     409  
     410  
     411  
     412  class ESC[4;38;5;81mPyFailingQueueTest(ESC[4;38;5;149mFailingQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     413      queue = py_queue
     414  
     415  
     416  @need_c_queue
     417  class ESC[4;38;5;81mCFailingQueueTest(ESC[4;38;5;149mFailingQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     418      queue = c_queue
     419  
     420  
     421  class ESC[4;38;5;81mBaseSimpleQueueTest:
     422  
     423      def setUp(self):
     424          self.q = self.type2test()
     425  
     426      def feed(self, q, seq, rnd, sentinel):
     427          while True:
     428              try:
     429                  val = seq.pop()
     430              except IndexError:
     431                  q.put(sentinel)
     432                  return
     433              q.put(val)
     434              if rnd.random() > 0.5:
     435                  time.sleep(rnd.random() * 1e-3)
     436  
     437      def consume(self, q, results, sentinel):
     438          while True:
     439              val = q.get()
     440              if val == sentinel:
     441                  return
     442              results.append(val)
     443  
     444      def consume_nonblock(self, q, results, sentinel):
     445          while True:
     446              while True:
     447                  try:
     448                      val = q.get(block=False)
     449                  except self.queue.Empty:
     450                      time.sleep(1e-5)
     451                  else:
     452                      break
     453              if val == sentinel:
     454                  return
     455              results.append(val)
     456  
     457      def consume_timeout(self, q, results, sentinel):
     458          while True:
     459              while True:
     460                  try:
     461                      val = q.get(timeout=1e-5)
     462                  except self.queue.Empty:
     463                      pass
     464                  else:
     465                      break
     466              if val == sentinel:
     467                  return
     468              results.append(val)
     469  
     470      def run_threads(self, n_threads, q, inputs, feed_func, consume_func):
     471          results = []
     472          sentinel = None
     473          seq = inputs.copy()
     474          seq.reverse()
     475          rnd = random.Random(42)
     476  
     477          exceptions = []
     478          def log_exceptions(f):
     479              def wrapper(*args, **kwargs):
     480                  try:
     481                      f(*args, **kwargs)
     482                  except BaseException as e:
     483                      exceptions.append(e)
     484              return wrapper
     485  
     486          feeders = [threading.Thread(target=log_exceptions(feed_func),
     487                                      args=(q, seq, rnd, sentinel))
     488                     for i in range(n_threads)]
     489          consumers = [threading.Thread(target=log_exceptions(consume_func),
     490                                        args=(q, results, sentinel))
     491                       for i in range(n_threads)]
     492  
     493          with threading_helper.start_threads(feeders + consumers):
     494              pass
     495  
     496          self.assertFalse(exceptions)
     497          self.assertTrue(q.empty())
     498          self.assertEqual(q.qsize(), 0)
     499  
     500          return results
     501  
     502      def test_basic(self):
     503          # Basic tests for get(), put() etc.
     504          q = self.q
     505          self.assertTrue(q.empty())
     506          self.assertEqual(q.qsize(), 0)
     507          q.put(1)
     508          self.assertFalse(q.empty())
     509          self.assertEqual(q.qsize(), 1)
     510          q.put(2)
     511          q.put_nowait(3)
     512          q.put(4)
     513          self.assertFalse(q.empty())
     514          self.assertEqual(q.qsize(), 4)
     515  
     516          self.assertEqual(q.get(), 1)
     517          self.assertEqual(q.qsize(), 3)
     518  
     519          self.assertEqual(q.get_nowait(), 2)
     520          self.assertEqual(q.qsize(), 2)
     521  
     522          self.assertEqual(q.get(block=False), 3)
     523          self.assertFalse(q.empty())
     524          self.assertEqual(q.qsize(), 1)
     525  
     526          self.assertEqual(q.get(timeout=0.1), 4)
     527          self.assertTrue(q.empty())
     528          self.assertEqual(q.qsize(), 0)
     529  
     530          with self.assertRaises(self.queue.Empty):
     531              q.get(block=False)
     532          with self.assertRaises(self.queue.Empty):
     533              q.get(timeout=1e-3)
     534          with self.assertRaises(self.queue.Empty):
     535              q.get_nowait()
     536          self.assertTrue(q.empty())
     537          self.assertEqual(q.qsize(), 0)
     538  
     539      def test_negative_timeout_raises_exception(self):
     540          q = self.q
     541          q.put(1)
     542          with self.assertRaises(ValueError):
     543              q.get(timeout=-1)
     544  
     545      def test_order(self):
     546          # Test a pair of concurrent put() and get()
     547          q = self.q
     548          inputs = list(range(100))
     549          results = self.run_threads(1, q, inputs, self.feed, self.consume)
     550  
     551          # One producer, one consumer => results appended in well-defined order
     552          self.assertEqual(results, inputs)
     553  
     554      def test_many_threads(self):
     555          # Test multiple concurrent put() and get()
     556          N = 50
     557          q = self.q
     558          inputs = list(range(10000))
     559          results = self.run_threads(N, q, inputs, self.feed, self.consume)
     560  
     561          # Multiple consumers without synchronization append the
     562          # results in random order
     563          self.assertEqual(sorted(results), inputs)
     564  
     565      def test_many_threads_nonblock(self):
     566          # Test multiple concurrent put() and get(block=False)
     567          N = 50
     568          q = self.q
     569          inputs = list(range(10000))
     570          results = self.run_threads(N, q, inputs,
     571                                     self.feed, self.consume_nonblock)
     572  
     573          self.assertEqual(sorted(results), inputs)
     574  
     575      def test_many_threads_timeout(self):
     576          # Test multiple concurrent put() and get(timeout=...)
     577          N = 50
     578          q = self.q
     579          inputs = list(range(1000))
     580          results = self.run_threads(N, q, inputs,
     581                                     self.feed, self.consume_timeout)
     582  
     583          self.assertEqual(sorted(results), inputs)
     584  
     585      def test_references(self):
     586          # The queue should lose references to each item as soon as
     587          # it leaves the queue.
     588          class ESC[4;38;5;81mC:
     589              pass
     590  
     591          N = 20
     592          q = self.q
     593          for i in range(N):
     594              q.put(C())
     595          for i in range(N):
     596              wr = weakref.ref(q.get())
     597              gc_collect()  # For PyPy or other GCs.
     598              self.assertIsNone(wr())
     599  
     600  
     601  class ESC[4;38;5;81mPySimpleQueueTest(ESC[4;38;5;149mBaseSimpleQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     602  
     603      queue = py_queue
     604      def setUp(self):
     605          self.type2test = self.queue._PySimpleQueue
     606          super().setUp()
     607  
     608  
     609  @need_c_queue
     610  class ESC[4;38;5;81mCSimpleQueueTest(ESC[4;38;5;149mBaseSimpleQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     611  
     612      queue = c_queue
     613  
     614      def setUp(self):
     615          self.type2test = self.queue.SimpleQueue
     616          super().setUp()
     617  
     618      def test_is_default(self):
     619          self.assertIs(self.type2test, self.queue.SimpleQueue)
     620          self.assertIs(self.type2test, self.queue.SimpleQueue)
     621  
     622      def test_reentrancy(self):
     623          # bpo-14976: put() may be called reentrantly in an asynchronous
     624          # callback.
     625          q = self.q
     626          gen = itertools.count()
     627          N = 10000
     628          results = []
     629  
     630          # This test exploits the fact that __del__ in a reference cycle
     631          # can be called any time the GC may run.
     632  
     633          class ESC[4;38;5;81mCircular(ESC[4;38;5;149mobject):
     634              def __init__(self):
     635                  self.circular = self
     636  
     637              def __del__(self):
     638                  q.put(next(gen))
     639  
     640          while True:
     641              o = Circular()
     642              q.put(next(gen))
     643              del o
     644              results.append(q.get())
     645              if results[-1] >= N:
     646                  break
     647  
     648          self.assertEqual(results, list(range(N + 1)))
     649  
     650  
     651  if __name__ == "__main__":
     652      unittest.main()