1 # Some simple queue module tests, plus some failure conditions
2 # to ensure the Queue locks remain stable.
3 import itertools
4 import random
5 import threading
6 import time
7 import unittest
8 import weakref
9 from test.support import gc_collect
10 from test.support import import_helper
11 from test.support import threading_helper
12
13 # queue module depends on threading primitives
14 threading_helper.requires_working_threading(module=True)
15
16 py_queue = import_helper.import_fresh_module('queue', blocked=['_queue'])
17 c_queue = import_helper.import_fresh_module('queue', fresh=['_queue'])
18 need_c_queue = unittest.skipUnless(c_queue, "No _queue module found")
19
20 QUEUE_SIZE = 5
21
22 def qfull(q):
23 return q.maxsize > 0 and q.qsize() == q.maxsize
24
25 # A thread to run a function that unclogs a blocked Queue.
26 class ESC[4;38;5;81m_TriggerThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
27 def __init__(self, fn, args):
28 self.fn = fn
29 self.args = args
30 self.startedEvent = threading.Event()
31 threading.Thread.__init__(self)
32
33 def run(self):
34 # The sleep isn't necessary, but is intended to give the blocking
35 # function in the main thread a chance at actually blocking before
36 # we unclog it. But if the sleep is longer than the timeout-based
37 # tests wait in their blocking functions, those tests will fail.
38 # So we give them much longer timeout values compared to the
39 # sleep here (I aimed at 10 seconds for blocking functions --
40 # they should never actually wait that long - they should make
41 # progress as soon as we call self.fn()).
42 time.sleep(0.1)
43 self.startedEvent.set()
44 self.fn(*self.args)
45
46
47 # Execute a function that blocks, and in a separate thread, a function that
48 # triggers the release. Returns the result of the blocking function. Caution:
49 # block_func must guarantee to block until trigger_func is called, and
50 # trigger_func must guarantee to change queue state so that block_func can make
51 # enough progress to return. In particular, a block_func that just raises an
52 # exception regardless of whether trigger_func is called will lead to
53 # timing-dependent sporadic failures, and one of those went rarely seen but
54 # undiagnosed for years. Now block_func must be unexceptional. If block_func
55 # is supposed to raise an exception, call do_exceptional_blocking_test()
56 # instead.
57
58 class ESC[4;38;5;81mBlockingTestMixin:
59
60 def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
61 thread = _TriggerThread(trigger_func, trigger_args)
62 thread.start()
63 try:
64 self.result = block_func(*block_args)
65 # If block_func returned before our thread made the call, we failed!
66 if not thread.startedEvent.is_set():
67 self.fail("blocking function %r appeared not to block" %
68 block_func)
69 return self.result
70 finally:
71 threading_helper.join_thread(thread) # make sure the thread terminates
72
73 # Call this instead if block_func is supposed to raise an exception.
74 def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
75 trigger_args, expected_exception_class):
76 thread = _TriggerThread(trigger_func, trigger_args)
77 thread.start()
78 try:
79 try:
80 block_func(*block_args)
81 except expected_exception_class:
82 raise
83 else:
84 self.fail("expected exception of kind %r" %
85 expected_exception_class)
86 finally:
87 threading_helper.join_thread(thread) # make sure the thread terminates
88 if not thread.startedEvent.is_set():
89 self.fail("trigger thread ended but event never set")
90
91
92 class ESC[4;38;5;81mBaseQueueTestMixin(ESC[4;38;5;149mBlockingTestMixin):
93 def setUp(self):
94 self.cum = 0
95 self.cumlock = threading.Lock()
96
97 def basic_queue_test(self, q):
98 if q.qsize():
99 raise RuntimeError("Call this function with an empty queue")
100 self.assertTrue(q.empty())
101 self.assertFalse(q.full())
102 # I guess we better check things actually queue correctly a little :)
103 q.put(111)
104 q.put(333)
105 q.put(222)
106 target_order = dict(Queue = [111, 333, 222],
107 LifoQueue = [222, 333, 111],
108 PriorityQueue = [111, 222, 333])
109 actual_order = [q.get(), q.get(), q.get()]
110 self.assertEqual(actual_order, target_order[q.__class__.__name__],
111 "Didn't seem to queue the correct data!")
112 for i in range(QUEUE_SIZE-1):
113 q.put(i)
114 self.assertTrue(q.qsize(), "Queue should not be empty")
115 self.assertTrue(not qfull(q), "Queue should not be full")
116 last = 2 * QUEUE_SIZE
117 full = 3 * 2 * QUEUE_SIZE
118 q.put(last)
119 self.assertTrue(qfull(q), "Queue should be full")
120 self.assertFalse(q.empty())
121 self.assertTrue(q.full())
122 try:
123 q.put(full, block=0)
124 self.fail("Didn't appear to block with a full queue")
125 except self.queue.Full:
126 pass
127 try:
128 q.put(full, timeout=0.01)
129 self.fail("Didn't appear to time-out with a full queue")
130 except self.queue.Full:
131 pass
132 # Test a blocking put
133 self.do_blocking_test(q.put, (full,), q.get, ())
134 self.do_blocking_test(q.put, (full, True, 10), q.get, ())
135 # Empty it
136 for i in range(QUEUE_SIZE):
137 q.get()
138 self.assertTrue(not q.qsize(), "Queue should be empty")
139 try:
140 q.get(block=0)
141 self.fail("Didn't appear to block with an empty queue")
142 except self.queue.Empty:
143 pass
144 try:
145 q.get(timeout=0.01)
146 self.fail("Didn't appear to time-out with an empty queue")
147 except self.queue.Empty:
148 pass
149 # Test a blocking get
150 self.do_blocking_test(q.get, (), q.put, ('empty',))
151 self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
152
153
154 def worker(self, q):
155 while True:
156 x = q.get()
157 if x < 0:
158 q.task_done()
159 return
160 with self.cumlock:
161 self.cum += x
162 q.task_done()
163
164 def queue_join_test(self, q):
165 self.cum = 0
166 threads = []
167 for i in (0,1):
168 thread = threading.Thread(target=self.worker, args=(q,))
169 thread.start()
170 threads.append(thread)
171 for i in range(100):
172 q.put(i)
173 q.join()
174 self.assertEqual(self.cum, sum(range(100)),
175 "q.join() did not block until all tasks were done")
176 for i in (0,1):
177 q.put(-1) # instruct the threads to close
178 q.join() # verify that you can join twice
179 for thread in threads:
180 thread.join()
181
182 def test_queue_task_done(self):
183 # Test to make sure a queue task completed successfully.
184 q = self.type2test()
185 try:
186 q.task_done()
187 except ValueError:
188 pass
189 else:
190 self.fail("Did not detect task count going negative")
191
192 def test_queue_join(self):
193 # Test that a queue join()s successfully, and before anything else
194 # (done twice for insurance).
195 q = self.type2test()
196 self.queue_join_test(q)
197 self.queue_join_test(q)
198 try:
199 q.task_done()
200 except ValueError:
201 pass
202 else:
203 self.fail("Did not detect task count going negative")
204
205 def test_basic(self):
206 # Do it a couple of times on the same queue.
207 # Done twice to make sure works with same instance reused.
208 q = self.type2test(QUEUE_SIZE)
209 self.basic_queue_test(q)
210 self.basic_queue_test(q)
211
212 def test_negative_timeout_raises_exception(self):
213 q = self.type2test(QUEUE_SIZE)
214 with self.assertRaises(ValueError):
215 q.put(1, timeout=-1)
216 with self.assertRaises(ValueError):
217 q.get(1, timeout=-1)
218
219 def test_nowait(self):
220 q = self.type2test(QUEUE_SIZE)
221 for i in range(QUEUE_SIZE):
222 q.put_nowait(1)
223 with self.assertRaises(self.queue.Full):
224 q.put_nowait(1)
225
226 for i in range(QUEUE_SIZE):
227 q.get_nowait()
228 with self.assertRaises(self.queue.Empty):
229 q.get_nowait()
230
231 def test_shrinking_queue(self):
232 # issue 10110
233 q = self.type2test(3)
234 q.put(1)
235 q.put(2)
236 q.put(3)
237 with self.assertRaises(self.queue.Full):
238 q.put_nowait(4)
239 self.assertEqual(q.qsize(), 3)
240 q.maxsize = 2 # shrink the queue
241 with self.assertRaises(self.queue.Full):
242 q.put_nowait(4)
243
244 class ESC[4;38;5;81mQueueTest(ESC[4;38;5;149mBaseQueueTestMixin):
245
246 def setUp(self):
247 self.type2test = self.queue.Queue
248 super().setUp()
249
250 class ESC[4;38;5;81mPyQueueTest(ESC[4;38;5;149mQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
251 queue = py_queue
252
253
254 @need_c_queue
255 class ESC[4;38;5;81mCQueueTest(ESC[4;38;5;149mQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
256 queue = c_queue
257
258
259 class ESC[4;38;5;81mLifoQueueTest(ESC[4;38;5;149mBaseQueueTestMixin):
260
261 def setUp(self):
262 self.type2test = self.queue.LifoQueue
263 super().setUp()
264
265
266 class ESC[4;38;5;81mPyLifoQueueTest(ESC[4;38;5;149mLifoQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
267 queue = py_queue
268
269
270 @need_c_queue
271 class ESC[4;38;5;81mCLifoQueueTest(ESC[4;38;5;149mLifoQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
272 queue = c_queue
273
274
275 class ESC[4;38;5;81mPriorityQueueTest(ESC[4;38;5;149mBaseQueueTestMixin):
276
277 def setUp(self):
278 self.type2test = self.queue.PriorityQueue
279 super().setUp()
280
281
282 class ESC[4;38;5;81mPyPriorityQueueTest(ESC[4;38;5;149mPriorityQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
283 queue = py_queue
284
285
286 @need_c_queue
287 class ESC[4;38;5;81mCPriorityQueueTest(ESC[4;38;5;149mPriorityQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
288 queue = c_queue
289
290
291 # A Queue subclass that can provoke failure at a moment's notice :)
292 class ESC[4;38;5;81mFailingQueueException(ESC[4;38;5;149mException): pass
293
294
295 class ESC[4;38;5;81mFailingQueueTest(ESC[4;38;5;149mBlockingTestMixin):
296
297 def setUp(self):
298
299 Queue = self.queue.Queue
300
301 class ESC[4;38;5;81mFailingQueue(ESC[4;38;5;149mQueue):
302 def __init__(self, *args):
303 self.fail_next_put = False
304 self.fail_next_get = False
305 Queue.__init__(self, *args)
306 def _put(self, item):
307 if self.fail_next_put:
308 self.fail_next_put = False
309 raise FailingQueueException("You Lose")
310 return Queue._put(self, item)
311 def _get(self):
312 if self.fail_next_get:
313 self.fail_next_get = False
314 raise FailingQueueException("You Lose")
315 return Queue._get(self)
316
317 self.FailingQueue = FailingQueue
318
319 super().setUp()
320
321 def failing_queue_test(self, q):
322 if q.qsize():
323 raise RuntimeError("Call this function with an empty queue")
324 for i in range(QUEUE_SIZE-1):
325 q.put(i)
326 # Test a failing non-blocking put.
327 q.fail_next_put = True
328 try:
329 q.put("oops", block=0)
330 self.fail("The queue didn't fail when it should have")
331 except FailingQueueException:
332 pass
333 q.fail_next_put = True
334 try:
335 q.put("oops", timeout=0.1)
336 self.fail("The queue didn't fail when it should have")
337 except FailingQueueException:
338 pass
339 q.put("last")
340 self.assertTrue(qfull(q), "Queue should be full")
341 # Test a failing blocking put
342 q.fail_next_put = True
343 try:
344 self.do_blocking_test(q.put, ("full",), q.get, ())
345 self.fail("The queue didn't fail when it should have")
346 except FailingQueueException:
347 pass
348 # Check the Queue isn't damaged.
349 # put failed, but get succeeded - re-add
350 q.put("last")
351 # Test a failing timeout put
352 q.fail_next_put = True
353 try:
354 self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
355 FailingQueueException)
356 self.fail("The queue didn't fail when it should have")
357 except FailingQueueException:
358 pass
359 # Check the Queue isn't damaged.
360 # put failed, but get succeeded - re-add
361 q.put("last")
362 self.assertTrue(qfull(q), "Queue should be full")
363 q.get()
364 self.assertTrue(not qfull(q), "Queue should not be full")
365 q.put("last")
366 self.assertTrue(qfull(q), "Queue should be full")
367 # Test a blocking put
368 self.do_blocking_test(q.put, ("full",), q.get, ())
369 # Empty it
370 for i in range(QUEUE_SIZE):
371 q.get()
372 self.assertTrue(not q.qsize(), "Queue should be empty")
373 q.put("first")
374 q.fail_next_get = True
375 try:
376 q.get()
377 self.fail("The queue didn't fail when it should have")
378 except FailingQueueException:
379 pass
380 self.assertTrue(q.qsize(), "Queue should not be empty")
381 q.fail_next_get = True
382 try:
383 q.get(timeout=0.1)
384 self.fail("The queue didn't fail when it should have")
385 except FailingQueueException:
386 pass
387 self.assertTrue(q.qsize(), "Queue should not be empty")
388 q.get()
389 self.assertTrue(not q.qsize(), "Queue should be empty")
390 q.fail_next_get = True
391 try:
392 self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
393 FailingQueueException)
394 self.fail("The queue didn't fail when it should have")
395 except FailingQueueException:
396 pass
397 # put succeeded, but get failed.
398 self.assertTrue(q.qsize(), "Queue should not be empty")
399 q.get()
400 self.assertTrue(not q.qsize(), "Queue should be empty")
401
402 def test_failing_queue(self):
403
404 # Test to make sure a queue is functioning correctly.
405 # Done twice to the same instance.
406 q = self.FailingQueue(QUEUE_SIZE)
407 self.failing_queue_test(q)
408 self.failing_queue_test(q)
409
410
411
412 class ESC[4;38;5;81mPyFailingQueueTest(ESC[4;38;5;149mFailingQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
413 queue = py_queue
414
415
416 @need_c_queue
417 class ESC[4;38;5;81mCFailingQueueTest(ESC[4;38;5;149mFailingQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
418 queue = c_queue
419
420
421 class ESC[4;38;5;81mBaseSimpleQueueTest:
422
423 def setUp(self):
424 self.q = self.type2test()
425
426 def feed(self, q, seq, rnd, sentinel):
427 while True:
428 try:
429 val = seq.pop()
430 except IndexError:
431 q.put(sentinel)
432 return
433 q.put(val)
434 if rnd.random() > 0.5:
435 time.sleep(rnd.random() * 1e-3)
436
437 def consume(self, q, results, sentinel):
438 while True:
439 val = q.get()
440 if val == sentinel:
441 return
442 results.append(val)
443
444 def consume_nonblock(self, q, results, sentinel):
445 while True:
446 while True:
447 try:
448 val = q.get(block=False)
449 except self.queue.Empty:
450 time.sleep(1e-5)
451 else:
452 break
453 if val == sentinel:
454 return
455 results.append(val)
456
457 def consume_timeout(self, q, results, sentinel):
458 while True:
459 while True:
460 try:
461 val = q.get(timeout=1e-5)
462 except self.queue.Empty:
463 pass
464 else:
465 break
466 if val == sentinel:
467 return
468 results.append(val)
469
470 def run_threads(self, n_threads, q, inputs, feed_func, consume_func):
471 results = []
472 sentinel = None
473 seq = inputs.copy()
474 seq.reverse()
475 rnd = random.Random(42)
476
477 exceptions = []
478 def log_exceptions(f):
479 def wrapper(*args, **kwargs):
480 try:
481 f(*args, **kwargs)
482 except BaseException as e:
483 exceptions.append(e)
484 return wrapper
485
486 feeders = [threading.Thread(target=log_exceptions(feed_func),
487 args=(q, seq, rnd, sentinel))
488 for i in range(n_threads)]
489 consumers = [threading.Thread(target=log_exceptions(consume_func),
490 args=(q, results, sentinel))
491 for i in range(n_threads)]
492
493 with threading_helper.start_threads(feeders + consumers):
494 pass
495
496 self.assertFalse(exceptions)
497 self.assertTrue(q.empty())
498 self.assertEqual(q.qsize(), 0)
499
500 return results
501
502 def test_basic(self):
503 # Basic tests for get(), put() etc.
504 q = self.q
505 self.assertTrue(q.empty())
506 self.assertEqual(q.qsize(), 0)
507 q.put(1)
508 self.assertFalse(q.empty())
509 self.assertEqual(q.qsize(), 1)
510 q.put(2)
511 q.put_nowait(3)
512 q.put(4)
513 self.assertFalse(q.empty())
514 self.assertEqual(q.qsize(), 4)
515
516 self.assertEqual(q.get(), 1)
517 self.assertEqual(q.qsize(), 3)
518
519 self.assertEqual(q.get_nowait(), 2)
520 self.assertEqual(q.qsize(), 2)
521
522 self.assertEqual(q.get(block=False), 3)
523 self.assertFalse(q.empty())
524 self.assertEqual(q.qsize(), 1)
525
526 self.assertEqual(q.get(timeout=0.1), 4)
527 self.assertTrue(q.empty())
528 self.assertEqual(q.qsize(), 0)
529
530 with self.assertRaises(self.queue.Empty):
531 q.get(block=False)
532 with self.assertRaises(self.queue.Empty):
533 q.get(timeout=1e-3)
534 with self.assertRaises(self.queue.Empty):
535 q.get_nowait()
536 self.assertTrue(q.empty())
537 self.assertEqual(q.qsize(), 0)
538
539 def test_negative_timeout_raises_exception(self):
540 q = self.q
541 q.put(1)
542 with self.assertRaises(ValueError):
543 q.get(timeout=-1)
544
545 def test_order(self):
546 # Test a pair of concurrent put() and get()
547 q = self.q
548 inputs = list(range(100))
549 results = self.run_threads(1, q, inputs, self.feed, self.consume)
550
551 # One producer, one consumer => results appended in well-defined order
552 self.assertEqual(results, inputs)
553
554 def test_many_threads(self):
555 # Test multiple concurrent put() and get()
556 N = 50
557 q = self.q
558 inputs = list(range(10000))
559 results = self.run_threads(N, q, inputs, self.feed, self.consume)
560
561 # Multiple consumers without synchronization append the
562 # results in random order
563 self.assertEqual(sorted(results), inputs)
564
565 def test_many_threads_nonblock(self):
566 # Test multiple concurrent put() and get(block=False)
567 N = 50
568 q = self.q
569 inputs = list(range(10000))
570 results = self.run_threads(N, q, inputs,
571 self.feed, self.consume_nonblock)
572
573 self.assertEqual(sorted(results), inputs)
574
575 def test_many_threads_timeout(self):
576 # Test multiple concurrent put() and get(timeout=...)
577 N = 50
578 q = self.q
579 inputs = list(range(1000))
580 results = self.run_threads(N, q, inputs,
581 self.feed, self.consume_timeout)
582
583 self.assertEqual(sorted(results), inputs)
584
585 def test_references(self):
586 # The queue should lose references to each item as soon as
587 # it leaves the queue.
588 class ESC[4;38;5;81mC:
589 pass
590
591 N = 20
592 q = self.q
593 for i in range(N):
594 q.put(C())
595 for i in range(N):
596 wr = weakref.ref(q.get())
597 gc_collect() # For PyPy or other GCs.
598 self.assertIsNone(wr())
599
600
601 class ESC[4;38;5;81mPySimpleQueueTest(ESC[4;38;5;149mBaseSimpleQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
602
603 queue = py_queue
604 def setUp(self):
605 self.type2test = self.queue._PySimpleQueue
606 super().setUp()
607
608
609 @need_c_queue
610 class ESC[4;38;5;81mCSimpleQueueTest(ESC[4;38;5;149mBaseSimpleQueueTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
611
612 queue = c_queue
613
614 def setUp(self):
615 self.type2test = self.queue.SimpleQueue
616 super().setUp()
617
618 def test_is_default(self):
619 self.assertIs(self.type2test, self.queue.SimpleQueue)
620 self.assertIs(self.type2test, self.queue.SimpleQueue)
621
622 def test_reentrancy(self):
623 # bpo-14976: put() may be called reentrantly in an asynchronous
624 # callback.
625 q = self.q
626 gen = itertools.count()
627 N = 10000
628 results = []
629
630 # This test exploits the fact that __del__ in a reference cycle
631 # can be called any time the GC may run.
632
633 class ESC[4;38;5;81mCircular(ESC[4;38;5;149mobject):
634 def __init__(self):
635 self.circular = self
636
637 def __del__(self):
638 q.put(next(gen))
639
640 while True:
641 o = Circular()
642 q.put(next(gen))
643 del o
644 results.append(q.get())
645 if results[-1] >= N:
646 break
647
648 self.assertEqual(results, list(range(N + 1)))
649
650
651 if __name__ == "__main__":
652 unittest.main()