1  import itertools
       2  import time
       3  import unittest
       4  import weakref
       5  from concurrent import futures
       6  from concurrent.futures._base import (
       7      CANCELLED_AND_NOTIFIED, FINISHED, Future)
       8  
       9  from test import support
      10  
      11  from .util import (
      12      PENDING_FUTURE, RUNNING_FUTURE,
      13      CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
      14      create_future, create_executor_tests, setup_module)
      15  
      16  
      17  def mul(x, y):
      18      return x * y
      19  
      20  
      21  class ESC[4;38;5;81mAsCompletedTests:
      22      def test_no_timeout(self):
      23          future1 = self.executor.submit(mul, 2, 21)
      24          future2 = self.executor.submit(mul, 7, 6)
      25  
      26          completed = set(futures.as_completed(
      27                  [CANCELLED_AND_NOTIFIED_FUTURE,
      28                   EXCEPTION_FUTURE,
      29                   SUCCESSFUL_FUTURE,
      30                   future1, future2]))
      31          self.assertEqual(set(
      32                  [CANCELLED_AND_NOTIFIED_FUTURE,
      33                   EXCEPTION_FUTURE,
      34                   SUCCESSFUL_FUTURE,
      35                   future1, future2]),
      36                  completed)
      37  
      38      def test_future_times_out(self):
      39          """Test ``futures.as_completed`` timing out before
      40          completing it's final future."""
      41          already_completed = {CANCELLED_AND_NOTIFIED_FUTURE,
      42                               EXCEPTION_FUTURE,
      43                               SUCCESSFUL_FUTURE}
      44  
      45          for timeout in (0, 0.01):
      46              with self.subTest(timeout):
      47  
      48                  future = self.executor.submit(time.sleep, 0.1)
      49                  completed_futures = set()
      50                  try:
      51                      for f in futures.as_completed(
      52                          already_completed | {future},
      53                          timeout
      54                      ):
      55                          completed_futures.add(f)
      56                  except futures.TimeoutError:
      57                      pass
      58  
      59                  # Check that ``future`` wasn't completed.
      60                  self.assertEqual(completed_futures, already_completed)
      61  
      62      def test_duplicate_futures(self):
      63          # Issue 20367. Duplicate futures should not raise exceptions or give
      64          # duplicate responses.
      65          # Issue #31641: accept arbitrary iterables.
      66          future1 = self.executor.submit(time.sleep, 2)
      67          completed = [
      68              f for f in futures.as_completed(itertools.repeat(future1, 3))
      69          ]
      70          self.assertEqual(len(completed), 1)
      71  
      72      def test_free_reference_yielded_future(self):
      73          # Issue #14406: Generator should not keep references
      74          # to finished futures.
      75          futures_list = [Future() for _ in range(8)]
      76          futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
      77          futures_list.append(create_future(state=FINISHED, result=42))
      78  
      79          with self.assertRaises(futures.TimeoutError):
      80              for future in futures.as_completed(futures_list, timeout=0):
      81                  futures_list.remove(future)
      82                  wr = weakref.ref(future)
      83                  del future
      84                  support.gc_collect()  # For PyPy or other GCs.
      85                  self.assertIsNone(wr())
      86  
      87          futures_list[0].set_result("test")
      88          for future in futures.as_completed(futures_list):
      89              futures_list.remove(future)
      90              wr = weakref.ref(future)
      91              del future
      92              support.gc_collect()  # For PyPy or other GCs.
      93              self.assertIsNone(wr())
      94              if futures_list:
      95                  futures_list[0].set_result("test")
      96  
      97      def test_correct_timeout_exception_msg(self):
      98          futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
      99                          RUNNING_FUTURE, SUCCESSFUL_FUTURE]
     100  
     101          with self.assertRaises(futures.TimeoutError) as cm:
     102              list(futures.as_completed(futures_list, timeout=0))
     103  
     104          self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
     105  
     106  
     107  create_executor_tests(globals(), AsCompletedTests)
     108  
     109  
     110  def setUpModule():
     111      setup_module()
     112  
     113  
     114  if __name__ == "__main__":
     115      unittest.main()