1  import sys
       2  import threading
       3  import time
       4  import unittest
       5  from concurrent import futures
       6  from test import support
       7  
       8  from .util import (
       9      CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
      10      SUCCESSFUL_FUTURE,
      11      create_executor_tests, setup_module,
      12      BaseTestCase, ThreadPoolMixin,
      13      ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
      14  
      15  
      16  def mul(x, y):
      17      return x * y
      18  
      19  def sleep_and_raise(t):
      20      time.sleep(t)
      21      raise Exception('this is an exception')
      22  
      23  
      24  class ESC[4;38;5;81mWaitTests:
      25      def test_20369(self):
      26          # See https://bugs.python.org/issue20369
      27          future = self.executor.submit(time.sleep, 1.5)
      28          done, not_done = futures.wait([future, future],
      29                              return_when=futures.ALL_COMPLETED)
      30          self.assertEqual({future}, done)
      31          self.assertEqual(set(), not_done)
      32  
      33  
      34      def test_first_completed(self):
      35          future1 = self.executor.submit(mul, 21, 2)
      36          future2 = self.executor.submit(time.sleep, 1.5)
      37  
      38          done, not_done = futures.wait(
      39                  [CANCELLED_FUTURE, future1, future2],
      40                   return_when=futures.FIRST_COMPLETED)
      41  
      42          self.assertEqual(set([future1]), done)
      43          self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
      44  
      45      def test_first_completed_some_already_completed(self):
      46          future1 = self.executor.submit(time.sleep, 1.5)
      47  
      48          finished, pending = futures.wait(
      49                   [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
      50                   return_when=futures.FIRST_COMPLETED)
      51  
      52          self.assertEqual(
      53                  set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
      54                  finished)
      55          self.assertEqual(set([future1]), pending)
      56  
      57      @support.requires_resource('walltime')
      58      def test_first_exception(self):
      59          future1 = self.executor.submit(mul, 2, 21)
      60          future2 = self.executor.submit(sleep_and_raise, 1.5)
      61          future3 = self.executor.submit(time.sleep, 3)
      62  
      63          finished, pending = futures.wait(
      64                  [future1, future2, future3],
      65                  return_when=futures.FIRST_EXCEPTION)
      66  
      67          self.assertEqual(set([future1, future2]), finished)
      68          self.assertEqual(set([future3]), pending)
      69  
      70      def test_first_exception_some_already_complete(self):
      71          future1 = self.executor.submit(divmod, 21, 0)
      72          future2 = self.executor.submit(time.sleep, 1.5)
      73  
      74          finished, pending = futures.wait(
      75                  [SUCCESSFUL_FUTURE,
      76                   CANCELLED_FUTURE,
      77                   CANCELLED_AND_NOTIFIED_FUTURE,
      78                   future1, future2],
      79                  return_when=futures.FIRST_EXCEPTION)
      80  
      81          self.assertEqual(set([SUCCESSFUL_FUTURE,
      82                                CANCELLED_AND_NOTIFIED_FUTURE,
      83                                future1]), finished)
      84          self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
      85  
      86      def test_first_exception_one_already_failed(self):
      87          future1 = self.executor.submit(time.sleep, 2)
      88  
      89          finished, pending = futures.wait(
      90                   [EXCEPTION_FUTURE, future1],
      91                   return_when=futures.FIRST_EXCEPTION)
      92  
      93          self.assertEqual(set([EXCEPTION_FUTURE]), finished)
      94          self.assertEqual(set([future1]), pending)
      95  
      96      def test_all_completed(self):
      97          future1 = self.executor.submit(divmod, 2, 0)
      98          future2 = self.executor.submit(mul, 2, 21)
      99  
     100          finished, pending = futures.wait(
     101                  [SUCCESSFUL_FUTURE,
     102                   CANCELLED_AND_NOTIFIED_FUTURE,
     103                   EXCEPTION_FUTURE,
     104                   future1,
     105                   future2],
     106                  return_when=futures.ALL_COMPLETED)
     107  
     108          self.assertEqual(set([SUCCESSFUL_FUTURE,
     109                                CANCELLED_AND_NOTIFIED_FUTURE,
     110                                EXCEPTION_FUTURE,
     111                                future1,
     112                                future2]), finished)
     113          self.assertEqual(set(), pending)
     114  
     115      @support.requires_resource('walltime')
     116      def test_timeout(self):
     117          future1 = self.executor.submit(mul, 6, 7)
     118          future2 = self.executor.submit(time.sleep, 6)
     119  
     120          finished, pending = futures.wait(
     121                  [CANCELLED_AND_NOTIFIED_FUTURE,
     122                   EXCEPTION_FUTURE,
     123                   SUCCESSFUL_FUTURE,
     124                   future1, future2],
     125                  timeout=5,
     126                  return_when=futures.ALL_COMPLETED)
     127  
     128          self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
     129                                EXCEPTION_FUTURE,
     130                                SUCCESSFUL_FUTURE,
     131                                future1]), finished)
     132          self.assertEqual(set([future2]), pending)
     133  
     134  
     135  class ESC[4;38;5;81mThreadPoolWaitTests(ESC[4;38;5;149mThreadPoolMixin, ESC[4;38;5;149mWaitTests, ESC[4;38;5;149mBaseTestCase):
     136  
     137      def test_pending_calls_race(self):
     138          # Issue #14406: multi-threaded race condition when waiting on all
     139          # futures.
     140          event = threading.Event()
     141          def future_func():
     142              event.wait()
     143          oldswitchinterval = sys.getswitchinterval()
     144          sys.setswitchinterval(1e-6)
     145          try:
     146              fs = {self.executor.submit(future_func) for i in range(100)}
     147              event.set()
     148              futures.wait(fs, return_when=futures.ALL_COMPLETED)
     149          finally:
     150              sys.setswitchinterval(oldswitchinterval)
     151  
     152  
     153  create_executor_tests(globals(), WaitTests,
     154                        executor_mixins=(ProcessPoolForkMixin,
     155                                         ProcessPoolForkserverMixin,
     156                                         ProcessPoolSpawnMixin))
     157  
     158  
     159  def setUpModule():
     160      setup_module()
     161  
     162  
     163  if __name__ == "__main__":
     164      unittest.main()