(root)/
Python-3.11.7/
Lib/
test/
test_asyncio/
test_queues.py
       1  """Tests for queues.py"""
       2  
       3  import asyncio
       4  import unittest
       5  from types import GenericAlias
       6  
       7  
def tearDownModule():
    """Reset the asyncio event loop policy once this module's tests finish."""
    asyncio.set_event_loop_policy(None)
      10  
      11  
      12  class ESC[4;38;5;81mQueueBasicTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
      13  
      14      async def _test_repr_or_str(self, fn, expect_id):
      15          """Test Queue's repr or str.
      16  
      17          fn is repr or str. expect_id is True if we expect the Queue's id to
      18          appear in fn(Queue()).
      19          """
      20          q = asyncio.Queue()
      21          self.assertTrue(fn(q).startswith('<Queue'), fn(q))
      22          id_is_present = hex(id(q)) in fn(q)
      23          self.assertEqual(expect_id, id_is_present)
      24  
      25          # getters
      26          q = asyncio.Queue()
      27          async with asyncio.TaskGroup() as tg:
      28              # Start a task that waits to get.
      29              getter = tg.create_task(q.get())
      30              # Let it start waiting.
      31              await asyncio.sleep(0)
      32              self.assertTrue('_getters[1]' in fn(q))
      33              # resume q.get coroutine to finish generator
      34              q.put_nowait(0)
      35  
      36          self.assertEqual(0, await getter)
      37  
      38          # putters
      39          q = asyncio.Queue(maxsize=1)
      40          async with asyncio.TaskGroup() as tg:
      41              q.put_nowait(1)
      42              # Start a task that waits to put.
      43              putter = tg.create_task(q.put(2))
      44              # Let it start waiting.
      45              await asyncio.sleep(0)
      46              self.assertTrue('_putters[1]' in fn(q))
      47              # resume q.put coroutine to finish generator
      48              q.get_nowait()
      49  
      50          self.assertTrue(putter.done())
      51  
      52          q = asyncio.Queue()
      53          q.put_nowait(1)
      54          self.assertTrue('_queue=[1]' in fn(q))
      55  
      56      async def test_repr(self):
      57          await self._test_repr_or_str(repr, True)
      58  
      59      async def test_str(self):
      60          await self._test_repr_or_str(str, False)
      61  
      62      def test_generic_alias(self):
      63          q = asyncio.Queue[int]
      64          self.assertEqual(q.__args__, (int,))
      65          self.assertIsInstance(q, GenericAlias)
      66  
      67      async def test_empty(self):
      68          q = asyncio.Queue()
      69          self.assertTrue(q.empty())
      70          await q.put(1)
      71          self.assertFalse(q.empty())
      72          self.assertEqual(1, await q.get())
      73          self.assertTrue(q.empty())
      74  
      75      async def test_full(self):
      76          q = asyncio.Queue()
      77          self.assertFalse(q.full())
      78  
      79          q = asyncio.Queue(maxsize=1)
      80          await q.put(1)
      81          self.assertTrue(q.full())
      82  
      83      async def test_order(self):
      84          q = asyncio.Queue()
      85          for i in [1, 3, 2]:
      86              await q.put(i)
      87  
      88          items = [await q.get() for _ in range(3)]
      89          self.assertEqual([1, 3, 2], items)
      90  
      91      async def test_maxsize(self):
      92          q = asyncio.Queue(maxsize=2)
      93          self.assertEqual(2, q.maxsize)
      94          have_been_put = []
      95  
      96          async def putter():
      97              for i in range(3):
      98                  await q.put(i)
      99                  have_been_put.append(i)
     100              return True
     101  
     102          t = asyncio.create_task(putter())
     103          for i in range(2):
     104              await asyncio.sleep(0)
     105  
     106          # The putter is blocked after putting two items.
     107          self.assertEqual([0, 1], have_been_put)
     108          self.assertEqual(0, await q.get())
     109  
     110          # Let the putter resume and put last item.
     111          await asyncio.sleep(0)
     112          self.assertEqual([0, 1, 2], have_been_put)
     113          self.assertEqual(1, await q.get())
     114          self.assertEqual(2, await q.get())
     115  
     116          self.assertTrue(t.done())
     117          self.assertTrue(t.result())
     118  
     119  
     120  class ESC[4;38;5;81mQueueGetTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     121  
     122      async def test_blocking_get(self):
     123          q = asyncio.Queue()
     124          q.put_nowait(1)
     125  
     126          self.assertEqual(1, await q.get())
     127  
     128      async def test_get_with_putters(self):
     129          loop = asyncio.get_running_loop()
     130  
     131          q = asyncio.Queue(1)
     132          await q.put(1)
     133  
     134          waiter = loop.create_future()
     135          q._putters.append(waiter)
     136  
     137          self.assertEqual(1, await q.get())
     138          self.assertTrue(waiter.done())
     139          self.assertIsNone(waiter.result())
     140  
     141      async def test_blocking_get_wait(self):
     142          loop = asyncio.get_running_loop()
     143          q = asyncio.Queue()
     144          started = asyncio.Event()
     145          finished = False
     146  
     147          async def queue_get():
     148              nonlocal finished
     149              started.set()
     150              res = await q.get()
     151              finished = True
     152              return res
     153  
     154          queue_get_task = asyncio.create_task(queue_get())
     155          await started.wait()
     156          self.assertFalse(finished)
     157          loop.call_later(0.01, q.put_nowait, 1)
     158          res = await queue_get_task
     159          self.assertTrue(finished)
     160          self.assertEqual(1, res)
     161  
     162      def test_nonblocking_get(self):
     163          q = asyncio.Queue()
     164          q.put_nowait(1)
     165          self.assertEqual(1, q.get_nowait())
     166  
     167      def test_nonblocking_get_exception(self):
     168          q = asyncio.Queue()
     169          self.assertRaises(asyncio.QueueEmpty, q.get_nowait)
     170  
     171      async def test_get_cancelled_race(self):
     172          q = asyncio.Queue()
     173  
     174          t1 = asyncio.create_task(q.get())
     175          t2 = asyncio.create_task(q.get())
     176  
     177          await asyncio.sleep(0)
     178          t1.cancel()
     179          await asyncio.sleep(0)
     180          self.assertTrue(t1.done())
     181          await q.put('a')
     182          await asyncio.sleep(0)
     183          self.assertEqual('a', await t2)
     184  
     185      async def test_get_with_waiting_putters(self):
     186          q = asyncio.Queue(maxsize=1)
     187          asyncio.create_task(q.put('a'))
     188          asyncio.create_task(q.put('b'))
     189          self.assertEqual(await q.get(), 'a')
     190          self.assertEqual(await q.get(), 'b')
     191  
     192      async def test_why_are_getters_waiting(self):
     193          async def consumer(queue, num_expected):
     194              for _ in range(num_expected):
     195                  await queue.get()
     196  
     197          async def producer(queue, num_items):
     198              for i in range(num_items):
     199                  await queue.put(i)
     200  
     201          producer_num_items = 5
     202  
     203          q = asyncio.Queue(1)
     204          async with asyncio.TaskGroup() as tg:
     205              tg.create_task(producer(q, producer_num_items))
     206              tg.create_task(consumer(q, producer_num_items))
     207  
     208      async def test_cancelled_getters_not_being_held_in_self_getters(self):
     209          queue = asyncio.Queue(maxsize=5)
     210  
     211          with self.assertRaises(TimeoutError):
     212              await asyncio.wait_for(queue.get(), 0.1)
     213  
     214          self.assertEqual(len(queue._getters), 0)
     215  
     216  
     217  class ESC[4;38;5;81mQueuePutTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     218  
     219      async def test_blocking_put(self):
     220          q = asyncio.Queue()
     221  
     222          # No maxsize, won't block.
     223          await q.put(1)
     224          self.assertEqual(1, await q.get())
     225  
     226      async def test_blocking_put_wait(self):
     227          q = asyncio.Queue(maxsize=1)
     228          started = asyncio.Event()
     229          finished = False
     230  
     231          async def queue_put():
     232              nonlocal finished
     233              started.set()
     234              await q.put(1)
     235              await q.put(2)
     236              finished = True
     237  
     238          loop = asyncio.get_running_loop()
     239          loop.call_later(0.01, q.get_nowait)
     240          queue_put_task = asyncio.create_task(queue_put())
     241          await started.wait()
     242          self.assertFalse(finished)
     243          await queue_put_task
     244          self.assertTrue(finished)
     245  
     246      def test_nonblocking_put(self):
     247          q = asyncio.Queue()
     248          q.put_nowait(1)
     249          self.assertEqual(1, q.get_nowait())
     250  
     251      async def test_get_cancel_drop_one_pending_reader(self):
     252          q = asyncio.Queue()
     253  
     254          reader = asyncio.create_task(q.get())
     255  
     256          await asyncio.sleep(0)
     257  
     258          q.put_nowait(1)
     259          q.put_nowait(2)
     260          reader.cancel()
     261  
     262          try:
     263              await reader
     264          except asyncio.CancelledError:
     265              # try again
     266              reader = asyncio.create_task(q.get())
     267              await reader
     268  
     269          result = reader.result()
     270          # if we get 2, it means 1 got dropped!
     271          self.assertEqual(1, result)
     272  
     273      async def test_get_cancel_drop_many_pending_readers(self):
     274          q = asyncio.Queue()
     275  
     276          async with asyncio.TaskGroup() as tg:
     277              reader1 = tg.create_task(q.get())
     278              reader2 = tg.create_task(q.get())
     279              reader3 = tg.create_task(q.get())
     280  
     281              await asyncio.sleep(0)
     282  
     283              q.put_nowait(1)
     284              q.put_nowait(2)
     285              reader1.cancel()
     286  
     287              with self.assertRaises(asyncio.CancelledError):
     288                  await reader1
     289  
     290              await reader3
     291  
     292          # It is undefined in which order concurrent readers receive results.
     293          self.assertEqual({reader2.result(), reader3.result()}, {1, 2})
     294  
     295      async def test_put_cancel_drop(self):
     296          q = asyncio.Queue(1)
     297  
     298          q.put_nowait(1)
     299  
     300          # putting a second item in the queue has to block (qsize=1)
     301          writer = asyncio.create_task(q.put(2))
     302          await asyncio.sleep(0)
     303  
     304          value1 = q.get_nowait()
     305          self.assertEqual(value1, 1)
     306  
     307          writer.cancel()
     308          try:
     309              await writer
     310          except asyncio.CancelledError:
     311              # try again
     312              writer = asyncio.create_task(q.put(2))
     313              await writer
     314  
     315          value2 = q.get_nowait()
     316          self.assertEqual(value2, 2)
     317          self.assertEqual(q.qsize(), 0)
     318  
     319      def test_nonblocking_put_exception(self):
     320          q = asyncio.Queue(maxsize=1, )
     321          q.put_nowait(1)
     322          self.assertRaises(asyncio.QueueFull, q.put_nowait, 2)
     323  
     324      async def test_float_maxsize(self):
     325          q = asyncio.Queue(maxsize=1.3, )
     326          q.put_nowait(1)
     327          q.put_nowait(2)
     328          self.assertTrue(q.full())
     329          self.assertRaises(asyncio.QueueFull, q.put_nowait, 3)
     330  
     331          q = asyncio.Queue(maxsize=1.3, )
     332  
     333          await q.put(1)
     334          await q.put(2)
     335          self.assertTrue(q.full())
     336  
     337      async def test_put_cancelled(self):
     338          q = asyncio.Queue()
     339  
     340          async def queue_put():
     341              await q.put(1)
     342              return True
     343  
     344          t = asyncio.create_task(queue_put())
     345  
     346          self.assertEqual(1, await q.get())
     347          self.assertTrue(t.done())
     348          self.assertTrue(t.result())
     349  
     350      async def test_put_cancelled_race(self):
     351          q = asyncio.Queue(maxsize=1)
     352  
     353          put_a = asyncio.create_task(q.put('a'))
     354          put_b = asyncio.create_task(q.put('b'))
     355          put_c = asyncio.create_task(q.put('X'))
     356  
     357          await asyncio.sleep(0)
     358          self.assertTrue(put_a.done())
     359          self.assertFalse(put_b.done())
     360  
     361          put_c.cancel()
     362          await asyncio.sleep(0)
     363          self.assertTrue(put_c.done())
     364          self.assertEqual(q.get_nowait(), 'a')
     365          await asyncio.sleep(0)
     366          self.assertEqual(q.get_nowait(), 'b')
     367  
     368          await put_b
     369  
     370      async def test_put_with_waiting_getters(self):
     371          q = asyncio.Queue()
     372          t = asyncio.create_task(q.get())
     373          await asyncio.sleep(0)
     374          await q.put('a')
     375          self.assertEqual(await t, 'a')
     376  
     377      async def test_why_are_putters_waiting(self):
     378          queue = asyncio.Queue(2)
     379  
     380          async def putter(item):
     381              await queue.put(item)
     382  
     383          async def getter():
     384              await asyncio.sleep(0)
     385              num = queue.qsize()
     386              for _ in range(num):
     387                  queue.get_nowait()
     388  
     389          async with asyncio.TaskGroup() as tg:
     390              tg.create_task(getter())
     391              tg.create_task(putter(0))
     392              tg.create_task(putter(1))
     393              tg.create_task(putter(2))
     394              tg.create_task(putter(3))
     395  
     396      async def test_cancelled_puts_not_being_held_in_self_putters(self):
     397          # Full queue.
     398          queue = asyncio.Queue(maxsize=1)
     399          queue.put_nowait(1)
     400  
     401          # Task waiting for space to put an item in the queue.
     402          put_task = asyncio.create_task(queue.put(1))
     403          await asyncio.sleep(0)
     404  
     405          # Check that the putter is correctly removed from queue._putters when
     406          # the task is canceled.
     407          self.assertEqual(len(queue._putters), 1)
     408          put_task.cancel()
     409          with self.assertRaises(asyncio.CancelledError):
     410              await put_task
     411          self.assertEqual(len(queue._putters), 0)
     412  
     413      async def test_cancelled_put_silence_value_error_exception(self):
     414          # Full Queue.
     415          queue = asyncio.Queue(1)
     416          queue.put_nowait(1)
     417  
     418          # Task waiting for space to put a item in the queue.
     419          put_task = asyncio.create_task(queue.put(1))
     420          await asyncio.sleep(0)
     421  
     422          # get_nowait() remove the future of put_task from queue._putters.
     423          queue.get_nowait()
     424          # When canceled, queue.put is going to remove its future from
     425          # self._putters but it was removed previously by queue.get_nowait().
     426          put_task.cancel()
     427  
     428          # The ValueError exception triggered by queue._putters.remove(putter)
     429          # inside queue.put should be silenced.
     430          # If the ValueError is silenced we should catch a CancelledError.
     431          with self.assertRaises(asyncio.CancelledError):
     432              await put_task
     433  
     434  
     435  class ESC[4;38;5;81mLifoQueueTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     436  
     437      async def test_order(self):
     438          q = asyncio.LifoQueue()
     439          for i in [1, 3, 2]:
     440              await q.put(i)
     441  
     442          items = [await q.get() for _ in range(3)]
     443          self.assertEqual([2, 3, 1], items)
     444  
     445  
     446  class ESC[4;38;5;81mPriorityQueueTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     447  
     448      async def test_order(self):
     449          q = asyncio.PriorityQueue()
     450          for i in [1, 3, 2]:
     451              await q.put(i)
     452  
     453          items = [await q.get() for _ in range(3)]
     454          self.assertEqual([1, 2, 3], items)
     455  
     456  
     457  class ESC[4;38;5;81m_QueueJoinTestMixin:
     458  
     459      q_class = None
     460  
     461      def test_task_done_underflow(self):
     462          q = self.q_class()
     463          self.assertRaises(ValueError, q.task_done)
     464  
     465      async def test_task_done(self):
     466          q = self.q_class()
     467          for i in range(100):
     468              q.put_nowait(i)
     469  
     470          accumulator = 0
     471  
     472          # Two workers get items from the queue and call task_done after each.
     473          # Join the queue and assert all items have been processed.
     474          running = True
     475  
     476          async def worker():
     477              nonlocal accumulator
     478  
     479              while running:
     480                  item = await q.get()
     481                  accumulator += item
     482                  q.task_done()
     483  
     484          async with asyncio.TaskGroup() as tg:
     485              tasks = [tg.create_task(worker())
     486                       for index in range(2)]
     487  
     488              await q.join()
     489              self.assertEqual(sum(range(100)), accumulator)
     490  
     491              # close running generators
     492              running = False
     493              for i in range(len(tasks)):
     494                  q.put_nowait(0)
     495  
     496      async def test_join_empty_queue(self):
     497          q = self.q_class()
     498  
     499          # Test that a queue join()s successfully, and before anything else
     500          # (done twice for insurance).
     501  
     502          await q.join()
     503          await q.join()
     504  
     505      async def test_format(self):
     506          q = self.q_class()
     507          self.assertEqual(q._format(), 'maxsize=0')
     508  
     509          q._unfinished_tasks = 2
     510          self.assertEqual(q._format(), 'maxsize=0 tasks=2')
     511  
     512  
     513  class ESC[4;38;5;81mQueueJoinTests(ESC[4;38;5;149m_QueueJoinTestMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     514      q_class = asyncio.Queue
     515  
     516  
     517  class ESC[4;38;5;81mLifoQueueJoinTests(ESC[4;38;5;149m_QueueJoinTestMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     518      q_class = asyncio.LifoQueue
     519  
     520  
     521  class ESC[4;38;5;81mPriorityQueueJoinTests(ESC[4;38;5;149m_QueueJoinTestMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     522      q_class = asyncio.PriorityQueue
     523  
     524  
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()