(root)/
Python-3.11.7/
Lib/
test/
test_concurrent_futures/
test_wait.py
       1  import sys
       2  import threading
       3  import time
       4  import unittest
       5  from concurrent import futures
       6  
       7  from .util import (
       8      CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
       9      SUCCESSFUL_FUTURE,
      10      create_executor_tests, setup_module,
      11      BaseTestCase, ThreadPoolMixin,
      12      ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
      13  
      14  
      15  def mul(x, y):
      16      return x * y
      17  
      18  def sleep_and_raise(t):
      19      time.sleep(t)
      20      raise Exception('this is an exception')
      21  
      22  
      23  class ESC[4;38;5;81mWaitTests:
      24      def test_20369(self):
      25          # See https://bugs.python.org/issue20369
      26          future = self.executor.submit(time.sleep, 1.5)
      27          done, not_done = futures.wait([future, future],
      28                              return_when=futures.ALL_COMPLETED)
      29          self.assertEqual({future}, done)
      30          self.assertEqual(set(), not_done)
      31  
      32  
      33      def test_first_completed(self):
      34          future1 = self.executor.submit(mul, 21, 2)
      35          future2 = self.executor.submit(time.sleep, 1.5)
      36  
      37          done, not_done = futures.wait(
      38                  [CANCELLED_FUTURE, future1, future2],
      39                   return_when=futures.FIRST_COMPLETED)
      40  
      41          self.assertEqual(set([future1]), done)
      42          self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
      43  
      44      def test_first_completed_some_already_completed(self):
      45          future1 = self.executor.submit(time.sleep, 1.5)
      46  
      47          finished, pending = futures.wait(
      48                   [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
      49                   return_when=futures.FIRST_COMPLETED)
      50  
      51          self.assertEqual(
      52                  set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
      53                  finished)
      54          self.assertEqual(set([future1]), pending)
      55  
      56      def test_first_exception(self):
      57          future1 = self.executor.submit(mul, 2, 21)
      58          future2 = self.executor.submit(sleep_and_raise, 1.5)
      59          future3 = self.executor.submit(time.sleep, 3)
      60  
      61          finished, pending = futures.wait(
      62                  [future1, future2, future3],
      63                  return_when=futures.FIRST_EXCEPTION)
      64  
      65          self.assertEqual(set([future1, future2]), finished)
      66          self.assertEqual(set([future3]), pending)
      67  
      68      def test_first_exception_some_already_complete(self):
      69          future1 = self.executor.submit(divmod, 21, 0)
      70          future2 = self.executor.submit(time.sleep, 1.5)
      71  
      72          finished, pending = futures.wait(
      73                  [SUCCESSFUL_FUTURE,
      74                   CANCELLED_FUTURE,
      75                   CANCELLED_AND_NOTIFIED_FUTURE,
      76                   future1, future2],
      77                  return_when=futures.FIRST_EXCEPTION)
      78  
      79          self.assertEqual(set([SUCCESSFUL_FUTURE,
      80                                CANCELLED_AND_NOTIFIED_FUTURE,
      81                                future1]), finished)
      82          self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
      83  
      84      def test_first_exception_one_already_failed(self):
      85          future1 = self.executor.submit(time.sleep, 2)
      86  
      87          finished, pending = futures.wait(
      88                   [EXCEPTION_FUTURE, future1],
      89                   return_when=futures.FIRST_EXCEPTION)
      90  
      91          self.assertEqual(set([EXCEPTION_FUTURE]), finished)
      92          self.assertEqual(set([future1]), pending)
      93  
      94      def test_all_completed(self):
      95          future1 = self.executor.submit(divmod, 2, 0)
      96          future2 = self.executor.submit(mul, 2, 21)
      97  
      98          finished, pending = futures.wait(
      99                  [SUCCESSFUL_FUTURE,
     100                   CANCELLED_AND_NOTIFIED_FUTURE,
     101                   EXCEPTION_FUTURE,
     102                   future1,
     103                   future2],
     104                  return_when=futures.ALL_COMPLETED)
     105  
     106          self.assertEqual(set([SUCCESSFUL_FUTURE,
     107                                CANCELLED_AND_NOTIFIED_FUTURE,
     108                                EXCEPTION_FUTURE,
     109                                future1,
     110                                future2]), finished)
     111          self.assertEqual(set(), pending)
     112  
     113      def test_timeout(self):
     114          short_timeout = 0.050
     115          long_timeout = short_timeout * 10
     116  
     117          future = self.executor.submit(time.sleep, long_timeout)
     118  
     119          finished, pending = futures.wait(
     120                  [CANCELLED_AND_NOTIFIED_FUTURE,
     121                   EXCEPTION_FUTURE,
     122                   SUCCESSFUL_FUTURE,
     123                   future],
     124                  timeout=short_timeout,
     125                  return_when=futures.ALL_COMPLETED)
     126  
     127          self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
     128                                EXCEPTION_FUTURE,
     129                                SUCCESSFUL_FUTURE]),
     130                           finished)
     131          self.assertEqual(set([future]), pending)
     132  
     133  
     134  class ESC[4;38;5;81mThreadPoolWaitTests(ESC[4;38;5;149mThreadPoolMixin, ESC[4;38;5;149mWaitTests, ESC[4;38;5;149mBaseTestCase):
     135  
     136      def test_pending_calls_race(self):
     137          # Issue #14406: multi-threaded race condition when waiting on all
     138          # futures.
     139          event = threading.Event()
     140          def future_func():
     141              event.wait()
     142          oldswitchinterval = sys.getswitchinterval()
     143          sys.setswitchinterval(1e-6)
     144          try:
     145              fs = {self.executor.submit(future_func) for i in range(100)}
     146              event.set()
     147              futures.wait(fs, return_when=futures.ALL_COMPLETED)
     148          finally:
     149              sys.setswitchinterval(oldswitchinterval)
     150  
     151  
     152  create_executor_tests(globals(), WaitTests,
     153                        executor_mixins=(ProcessPoolForkMixin,
     154                                         ProcessPoolForkserverMixin,
     155                                         ProcessPoolSpawnMixin))
     156  
     157  
     158  def setUpModule():
     159      setup_module()
     160  
     161  
     162  if __name__ == "__main__":
     163      unittest.main()