(root)/
Python-3.11.7/
Lib/
test/
test_concurrent_futures/
test_as_completed.py
       1  import itertools
       2  import time
       3  import unittest
       4  import weakref
       5  from concurrent import futures
       6  from concurrent.futures._base import (
       7      CANCELLED_AND_NOTIFIED, FINISHED, Future)
       8  
       9  from test import support
      10  
      11  from .util import (
      12      PENDING_FUTURE, RUNNING_FUTURE,
      13      CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
      14      create_future, create_executor_tests, setup_module)
      15  
      16  
      17  def mul(x, y):
      18      return x * y
      19  
      20  
      21  class ESC[4;38;5;81mAsCompletedTests:
      22      # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
      23      def test_no_timeout(self):
      24          future1 = self.executor.submit(mul, 2, 21)
      25          future2 = self.executor.submit(mul, 7, 6)
      26  
      27          completed = set(futures.as_completed(
      28                  [CANCELLED_AND_NOTIFIED_FUTURE,
      29                   EXCEPTION_FUTURE,
      30                   SUCCESSFUL_FUTURE,
      31                   future1, future2]))
      32          self.assertEqual(set(
      33                  [CANCELLED_AND_NOTIFIED_FUTURE,
      34                   EXCEPTION_FUTURE,
      35                   SUCCESSFUL_FUTURE,
      36                   future1, future2]),
      37                  completed)
      38  
      39      def test_zero_timeout(self):
      40          future1 = self.executor.submit(time.sleep, 2)
      41          completed_futures = set()
      42          try:
      43              for future in futures.as_completed(
      44                      [CANCELLED_AND_NOTIFIED_FUTURE,
      45                       EXCEPTION_FUTURE,
      46                       SUCCESSFUL_FUTURE,
      47                       future1],
      48                      timeout=0):
      49                  completed_futures.add(future)
      50          except futures.TimeoutError:
      51              pass
      52  
      53          self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
      54                                EXCEPTION_FUTURE,
      55                                SUCCESSFUL_FUTURE]),
      56                           completed_futures)
      57  
      58      def test_duplicate_futures(self):
      59          # Issue 20367. Duplicate futures should not raise exceptions or give
      60          # duplicate responses.
      61          # Issue #31641: accept arbitrary iterables.
      62          future1 = self.executor.submit(time.sleep, 2)
      63          completed = [
      64              f for f in futures.as_completed(itertools.repeat(future1, 3))
      65          ]
      66          self.assertEqual(len(completed), 1)
      67  
      68      def test_free_reference_yielded_future(self):
      69          # Issue #14406: Generator should not keep references
      70          # to finished futures.
      71          futures_list = [Future() for _ in range(8)]
      72          futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
      73          futures_list.append(create_future(state=FINISHED, result=42))
      74  
      75          with self.assertRaises(futures.TimeoutError):
      76              for future in futures.as_completed(futures_list, timeout=0):
      77                  futures_list.remove(future)
      78                  wr = weakref.ref(future)
      79                  del future
      80                  support.gc_collect()  # For PyPy or other GCs.
      81                  self.assertIsNone(wr())
      82  
      83          futures_list[0].set_result("test")
      84          for future in futures.as_completed(futures_list):
      85              futures_list.remove(future)
      86              wr = weakref.ref(future)
      87              del future
      88              support.gc_collect()  # For PyPy or other GCs.
      89              self.assertIsNone(wr())
      90              if futures_list:
      91                  futures_list[0].set_result("test")
      92  
      93      def test_correct_timeout_exception_msg(self):
      94          futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
      95                          RUNNING_FUTURE, SUCCESSFUL_FUTURE]
      96  
      97          with self.assertRaises(futures.TimeoutError) as cm:
      98              list(futures.as_completed(futures_list, timeout=0))
      99  
     100          self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
     101  
     102  
     103  create_executor_tests(globals(), AsCompletedTests)
     104  
     105  
     106  def setUpModule():
     107      setup_module()
     108  
     109  
     110  if __name__ == "__main__":
     111      unittest.main()