1 """Tests for queues.py"""
2
3 import asyncio
4 import unittest
5 from types import GenericAlias
6
7
8 def tearDownModule():
9 asyncio.set_event_loop_policy(None)
10
11
12 class ESC[4;38;5;81mQueueBasicTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
13
14 async def _test_repr_or_str(self, fn, expect_id):
15 """Test Queue's repr or str.
16
17 fn is repr or str. expect_id is True if we expect the Queue's id to
18 appear in fn(Queue()).
19 """
20 q = asyncio.Queue()
21 self.assertTrue(fn(q).startswith('<Queue'), fn(q))
22 id_is_present = hex(id(q)) in fn(q)
23 self.assertEqual(expect_id, id_is_present)
24
25 # getters
26 q = asyncio.Queue()
27 async with asyncio.TaskGroup() as tg:
28 # Start a task that waits to get.
29 getter = tg.create_task(q.get())
30 # Let it start waiting.
31 await asyncio.sleep(0)
32 self.assertTrue('_getters[1]' in fn(q))
33 # resume q.get coroutine to finish generator
34 q.put_nowait(0)
35
36 self.assertEqual(0, await getter)
37
38 # putters
39 q = asyncio.Queue(maxsize=1)
40 async with asyncio.TaskGroup() as tg:
41 q.put_nowait(1)
42 # Start a task that waits to put.
43 putter = tg.create_task(q.put(2))
44 # Let it start waiting.
45 await asyncio.sleep(0)
46 self.assertTrue('_putters[1]' in fn(q))
47 # resume q.put coroutine to finish generator
48 q.get_nowait()
49
50 self.assertTrue(putter.done())
51
52 q = asyncio.Queue()
53 q.put_nowait(1)
54 self.assertTrue('_queue=[1]' in fn(q))
55
56 async def test_repr(self):
57 await self._test_repr_or_str(repr, True)
58
59 async def test_str(self):
60 await self._test_repr_or_str(str, False)
61
62 def test_generic_alias(self):
63 q = asyncio.Queue[int]
64 self.assertEqual(q.__args__, (int,))
65 self.assertIsInstance(q, GenericAlias)
66
67 async def test_empty(self):
68 q = asyncio.Queue()
69 self.assertTrue(q.empty())
70 await q.put(1)
71 self.assertFalse(q.empty())
72 self.assertEqual(1, await q.get())
73 self.assertTrue(q.empty())
74
75 async def test_full(self):
76 q = asyncio.Queue()
77 self.assertFalse(q.full())
78
79 q = asyncio.Queue(maxsize=1)
80 await q.put(1)
81 self.assertTrue(q.full())
82
83 async def test_order(self):
84 q = asyncio.Queue()
85 for i in [1, 3, 2]:
86 await q.put(i)
87
88 items = [await q.get() for _ in range(3)]
89 self.assertEqual([1, 3, 2], items)
90
91 async def test_maxsize(self):
92 q = asyncio.Queue(maxsize=2)
93 self.assertEqual(2, q.maxsize)
94 have_been_put = []
95
96 async def putter():
97 for i in range(3):
98 await q.put(i)
99 have_been_put.append(i)
100 return True
101
102 t = asyncio.create_task(putter())
103 for i in range(2):
104 await asyncio.sleep(0)
105
106 # The putter is blocked after putting two items.
107 self.assertEqual([0, 1], have_been_put)
108 self.assertEqual(0, await q.get())
109
110 # Let the putter resume and put last item.
111 await asyncio.sleep(0)
112 self.assertEqual([0, 1, 2], have_been_put)
113 self.assertEqual(1, await q.get())
114 self.assertEqual(2, await q.get())
115
116 self.assertTrue(t.done())
117 self.assertTrue(t.result())
118
119
120 class ESC[4;38;5;81mQueueGetTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
121
122 async def test_blocking_get(self):
123 q = asyncio.Queue()
124 q.put_nowait(1)
125
126 self.assertEqual(1, await q.get())
127
128 async def test_get_with_putters(self):
129 loop = asyncio.get_running_loop()
130
131 q = asyncio.Queue(1)
132 await q.put(1)
133
134 waiter = loop.create_future()
135 q._putters.append(waiter)
136
137 self.assertEqual(1, await q.get())
138 self.assertTrue(waiter.done())
139 self.assertIsNone(waiter.result())
140
141 async def test_blocking_get_wait(self):
142 loop = asyncio.get_running_loop()
143 q = asyncio.Queue()
144 started = asyncio.Event()
145 finished = False
146
147 async def queue_get():
148 nonlocal finished
149 started.set()
150 res = await q.get()
151 finished = True
152 return res
153
154 queue_get_task = asyncio.create_task(queue_get())
155 await started.wait()
156 self.assertFalse(finished)
157 loop.call_later(0.01, q.put_nowait, 1)
158 res = await queue_get_task
159 self.assertTrue(finished)
160 self.assertEqual(1, res)
161
162 def test_nonblocking_get(self):
163 q = asyncio.Queue()
164 q.put_nowait(1)
165 self.assertEqual(1, q.get_nowait())
166
167 def test_nonblocking_get_exception(self):
168 q = asyncio.Queue()
169 self.assertRaises(asyncio.QueueEmpty, q.get_nowait)
170
171 async def test_get_cancelled_race(self):
172 q = asyncio.Queue()
173
174 t1 = asyncio.create_task(q.get())
175 t2 = asyncio.create_task(q.get())
176
177 await asyncio.sleep(0)
178 t1.cancel()
179 await asyncio.sleep(0)
180 self.assertTrue(t1.done())
181 await q.put('a')
182 await asyncio.sleep(0)
183 self.assertEqual('a', await t2)
184
185 async def test_get_with_waiting_putters(self):
186 q = asyncio.Queue(maxsize=1)
187 asyncio.create_task(q.put('a'))
188 asyncio.create_task(q.put('b'))
189 self.assertEqual(await q.get(), 'a')
190 self.assertEqual(await q.get(), 'b')
191
192 async def test_why_are_getters_waiting(self):
193 async def consumer(queue, num_expected):
194 for _ in range(num_expected):
195 await queue.get()
196
197 async def producer(queue, num_items):
198 for i in range(num_items):
199 await queue.put(i)
200
201 producer_num_items = 5
202
203 q = asyncio.Queue(1)
204 async with asyncio.TaskGroup() as tg:
205 tg.create_task(producer(q, producer_num_items))
206 tg.create_task(consumer(q, producer_num_items))
207
208 async def test_cancelled_getters_not_being_held_in_self_getters(self):
209 queue = asyncio.Queue(maxsize=5)
210
211 with self.assertRaises(TimeoutError):
212 await asyncio.wait_for(queue.get(), 0.1)
213
214 self.assertEqual(len(queue._getters), 0)
215
216
217 class ESC[4;38;5;81mQueuePutTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
218
219 async def test_blocking_put(self):
220 q = asyncio.Queue()
221
222 # No maxsize, won't block.
223 await q.put(1)
224 self.assertEqual(1, await q.get())
225
226 async def test_blocking_put_wait(self):
227 q = asyncio.Queue(maxsize=1)
228 started = asyncio.Event()
229 finished = False
230
231 async def queue_put():
232 nonlocal finished
233 started.set()
234 await q.put(1)
235 await q.put(2)
236 finished = True
237
238 loop = asyncio.get_running_loop()
239 loop.call_later(0.01, q.get_nowait)
240 queue_put_task = asyncio.create_task(queue_put())
241 await started.wait()
242 self.assertFalse(finished)
243 await queue_put_task
244 self.assertTrue(finished)
245
246 def test_nonblocking_put(self):
247 q = asyncio.Queue()
248 q.put_nowait(1)
249 self.assertEqual(1, q.get_nowait())
250
251 async def test_get_cancel_drop_one_pending_reader(self):
252 q = asyncio.Queue()
253
254 reader = asyncio.create_task(q.get())
255
256 await asyncio.sleep(0)
257
258 q.put_nowait(1)
259 q.put_nowait(2)
260 reader.cancel()
261
262 try:
263 await reader
264 except asyncio.CancelledError:
265 # try again
266 reader = asyncio.create_task(q.get())
267 await reader
268
269 result = reader.result()
270 # if we get 2, it means 1 got dropped!
271 self.assertEqual(1, result)
272
273 async def test_get_cancel_drop_many_pending_readers(self):
274 q = asyncio.Queue()
275
276 async with asyncio.TaskGroup() as tg:
277 reader1 = tg.create_task(q.get())
278 reader2 = tg.create_task(q.get())
279 reader3 = tg.create_task(q.get())
280
281 await asyncio.sleep(0)
282
283 q.put_nowait(1)
284 q.put_nowait(2)
285 reader1.cancel()
286
287 with self.assertRaises(asyncio.CancelledError):
288 await reader1
289
290 await reader3
291
292 # It is undefined in which order concurrent readers receive results.
293 self.assertEqual({reader2.result(), reader3.result()}, {1, 2})
294
295 async def test_put_cancel_drop(self):
296 q = asyncio.Queue(1)
297
298 q.put_nowait(1)
299
300 # putting a second item in the queue has to block (qsize=1)
301 writer = asyncio.create_task(q.put(2))
302 await asyncio.sleep(0)
303
304 value1 = q.get_nowait()
305 self.assertEqual(value1, 1)
306
307 writer.cancel()
308 try:
309 await writer
310 except asyncio.CancelledError:
311 # try again
312 writer = asyncio.create_task(q.put(2))
313 await writer
314
315 value2 = q.get_nowait()
316 self.assertEqual(value2, 2)
317 self.assertEqual(q.qsize(), 0)
318
319 def test_nonblocking_put_exception(self):
320 q = asyncio.Queue(maxsize=1, )
321 q.put_nowait(1)
322 self.assertRaises(asyncio.QueueFull, q.put_nowait, 2)
323
324 async def test_float_maxsize(self):
325 q = asyncio.Queue(maxsize=1.3, )
326 q.put_nowait(1)
327 q.put_nowait(2)
328 self.assertTrue(q.full())
329 self.assertRaises(asyncio.QueueFull, q.put_nowait, 3)
330
331 q = asyncio.Queue(maxsize=1.3, )
332
333 await q.put(1)
334 await q.put(2)
335 self.assertTrue(q.full())
336
337 async def test_put_cancelled(self):
338 q = asyncio.Queue()
339
340 async def queue_put():
341 await q.put(1)
342 return True
343
344 t = asyncio.create_task(queue_put())
345
346 self.assertEqual(1, await q.get())
347 self.assertTrue(t.done())
348 self.assertTrue(t.result())
349
350 async def test_put_cancelled_race(self):
351 q = asyncio.Queue(maxsize=1)
352
353 put_a = asyncio.create_task(q.put('a'))
354 put_b = asyncio.create_task(q.put('b'))
355 put_c = asyncio.create_task(q.put('X'))
356
357 await asyncio.sleep(0)
358 self.assertTrue(put_a.done())
359 self.assertFalse(put_b.done())
360
361 put_c.cancel()
362 await asyncio.sleep(0)
363 self.assertTrue(put_c.done())
364 self.assertEqual(q.get_nowait(), 'a')
365 await asyncio.sleep(0)
366 self.assertEqual(q.get_nowait(), 'b')
367
368 await put_b
369
370 async def test_put_with_waiting_getters(self):
371 q = asyncio.Queue()
372 t = asyncio.create_task(q.get())
373 await asyncio.sleep(0)
374 await q.put('a')
375 self.assertEqual(await t, 'a')
376
377 async def test_why_are_putters_waiting(self):
378 queue = asyncio.Queue(2)
379
380 async def putter(item):
381 await queue.put(item)
382
383 async def getter():
384 await asyncio.sleep(0)
385 num = queue.qsize()
386 for _ in range(num):
387 queue.get_nowait()
388
389 async with asyncio.TaskGroup() as tg:
390 tg.create_task(getter())
391 tg.create_task(putter(0))
392 tg.create_task(putter(1))
393 tg.create_task(putter(2))
394 tg.create_task(putter(3))
395
396 async def test_cancelled_puts_not_being_held_in_self_putters(self):
397 # Full queue.
398 queue = asyncio.Queue(maxsize=1)
399 queue.put_nowait(1)
400
401 # Task waiting for space to put an item in the queue.
402 put_task = asyncio.create_task(queue.put(1))
403 await asyncio.sleep(0)
404
405 # Check that the putter is correctly removed from queue._putters when
406 # the task is canceled.
407 self.assertEqual(len(queue._putters), 1)
408 put_task.cancel()
409 with self.assertRaises(asyncio.CancelledError):
410 await put_task
411 self.assertEqual(len(queue._putters), 0)
412
413 async def test_cancelled_put_silence_value_error_exception(self):
414 # Full Queue.
415 queue = asyncio.Queue(1)
416 queue.put_nowait(1)
417
418 # Task waiting for space to put a item in the queue.
419 put_task = asyncio.create_task(queue.put(1))
420 await asyncio.sleep(0)
421
422 # get_nowait() remove the future of put_task from queue._putters.
423 queue.get_nowait()
424 # When canceled, queue.put is going to remove its future from
425 # self._putters but it was removed previously by queue.get_nowait().
426 put_task.cancel()
427
428 # The ValueError exception triggered by queue._putters.remove(putter)
429 # inside queue.put should be silenced.
430 # If the ValueError is silenced we should catch a CancelledError.
431 with self.assertRaises(asyncio.CancelledError):
432 await put_task
433
434
435 class ESC[4;38;5;81mLifoQueueTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
436
437 async def test_order(self):
438 q = asyncio.LifoQueue()
439 for i in [1, 3, 2]:
440 await q.put(i)
441
442 items = [await q.get() for _ in range(3)]
443 self.assertEqual([2, 3, 1], items)
444
445
446 class ESC[4;38;5;81mPriorityQueueTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
447
448 async def test_order(self):
449 q = asyncio.PriorityQueue()
450 for i in [1, 3, 2]:
451 await q.put(i)
452
453 items = [await q.get() for _ in range(3)]
454 self.assertEqual([1, 2, 3], items)
455
456
457 class ESC[4;38;5;81m_QueueJoinTestMixin:
458
459 q_class = None
460
461 def test_task_done_underflow(self):
462 q = self.q_class()
463 self.assertRaises(ValueError, q.task_done)
464
465 async def test_task_done(self):
466 q = self.q_class()
467 for i in range(100):
468 q.put_nowait(i)
469
470 accumulator = 0
471
472 # Two workers get items from the queue and call task_done after each.
473 # Join the queue and assert all items have been processed.
474 running = True
475
476 async def worker():
477 nonlocal accumulator
478
479 while running:
480 item = await q.get()
481 accumulator += item
482 q.task_done()
483
484 async with asyncio.TaskGroup() as tg:
485 tasks = [tg.create_task(worker())
486 for index in range(2)]
487
488 await q.join()
489 self.assertEqual(sum(range(100)), accumulator)
490
491 # close running generators
492 running = False
493 for i in range(len(tasks)):
494 q.put_nowait(0)
495
496 async def test_join_empty_queue(self):
497 q = self.q_class()
498
499 # Test that a queue join()s successfully, and before anything else
500 # (done twice for insurance).
501
502 await q.join()
503 await q.join()
504
505 async def test_format(self):
506 q = self.q_class()
507 self.assertEqual(q._format(), 'maxsize=0')
508
509 q._unfinished_tasks = 2
510 self.assertEqual(q._format(), 'maxsize=0 tasks=2')
511
512
513 class ESC[4;38;5;81mQueueJoinTests(ESC[4;38;5;149m_QueueJoinTestMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
514 q_class = asyncio.Queue
515
516
517 class ESC[4;38;5;81mLifoQueueJoinTests(ESC[4;38;5;149m_QueueJoinTestMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
518 q_class = asyncio.LifoQueue
519
520
521 class ESC[4;38;5;81mPriorityQueueJoinTests(ESC[4;38;5;149m_QueueJoinTestMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
522 q_class = asyncio.PriorityQueue
523
524
525 if __name__ == '__main__':
526 unittest.main()