(root)/
Python-3.12.0/
Lib/
test/
test_concurrent_futures/
executor.py
       1  import threading
       2  import time
       3  import weakref
       4  from concurrent import futures
       5  from test import support
       6  
       7  
       8  def mul(x, y):
       9      return x * y
      10  
      11  def capture(*args, **kwargs):
      12      return args, kwargs
      13  
      14  
      15  class ESC[4;38;5;81mMyObject(ESC[4;38;5;149mobject):
      16      def my_method(self):
      17          pass
      18  
      19  
      20  def make_dummy_object(_):
      21      return MyObject()
      22  
      23  
      24  class ESC[4;38;5;81mExecutorTest:
      25      # Executor.shutdown() and context manager usage is tested by
      26      # ExecutorShutdownTest.
      27      def test_submit(self):
      28          future = self.executor.submit(pow, 2, 8)
      29          self.assertEqual(256, future.result())
      30  
      31      def test_submit_keyword(self):
      32          future = self.executor.submit(mul, 2, y=8)
      33          self.assertEqual(16, future.result())
      34          future = self.executor.submit(capture, 1, self=2, fn=3)
      35          self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
      36          with self.assertRaises(TypeError):
      37              self.executor.submit(fn=capture, arg=1)
      38          with self.assertRaises(TypeError):
      39              self.executor.submit(arg=1)
      40  
      41      def test_map(self):
      42          self.assertEqual(
      43                  list(self.executor.map(pow, range(10), range(10))),
      44                  list(map(pow, range(10), range(10))))
      45  
      46          self.assertEqual(
      47                  list(self.executor.map(pow, range(10), range(10), chunksize=3)),
      48                  list(map(pow, range(10), range(10))))
      49  
      50      def test_map_exception(self):
      51          i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
      52          self.assertEqual(i.__next__(), (0, 1))
      53          self.assertEqual(i.__next__(), (0, 1))
      54          self.assertRaises(ZeroDivisionError, i.__next__)
      55  
      56      @support.requires_resource('walltime')
      57      def test_map_timeout(self):
      58          results = []
      59          try:
      60              for i in self.executor.map(time.sleep,
      61                                         [0, 0, 6],
      62                                         timeout=5):
      63                  results.append(i)
      64          except futures.TimeoutError:
      65              pass
      66          else:
      67              self.fail('expected TimeoutError')
      68  
      69          self.assertEqual([None, None], results)
      70  
      71      def test_shutdown_race_issue12456(self):
      72          # Issue #12456: race condition at shutdown where trying to post a
      73          # sentinel in the call queue blocks (the queue is full while processes
      74          # have exited).
      75          self.executor.map(str, [2] * (self.worker_count + 1))
      76          self.executor.shutdown()
      77  
      78      @support.cpython_only
      79      def test_no_stale_references(self):
      80          # Issue #16284: check that the executors don't unnecessarily hang onto
      81          # references.
      82          my_object = MyObject()
      83          my_object_collected = threading.Event()
      84          my_object_callback = weakref.ref(
      85              my_object, lambda obj: my_object_collected.set())
      86          # Deliberately discarding the future.
      87          self.executor.submit(my_object.my_method)
      88          del my_object
      89  
      90          collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
      91          self.assertTrue(collected,
      92                          "Stale reference not collected within timeout.")
      93  
      94      def test_max_workers_negative(self):
      95          for number in (0, -1):
      96              with self.assertRaisesRegex(ValueError,
      97                                          "max_workers must be greater "
      98                                          "than 0"):
      99                  self.executor_type(max_workers=number)
     100  
     101      def test_free_reference(self):
     102          # Issue #14406: Result iterator should not keep an internal
     103          # reference to result objects.
     104          for obj in self.executor.map(make_dummy_object, range(10)):
     105              wr = weakref.ref(obj)
     106              del obj
     107              support.gc_collect()  # For PyPy or other GCs.
     108              self.assertIsNone(wr())