python (3.11.7)

(root)/
lib/
python3.11/
multiprocessing/
pool.py
       1  #
       2  # Module providing the `Pool` class for managing a process pool
       3  #
       4  # multiprocessing/pool.py
       5  #
       6  # Copyright (c) 2006-2008, R Oudkerk
       7  # Licensed to PSF under a Contributor Agreement.
       8  #
       9  
      10  __all__ = ['Pool', 'ThreadPool']
      11  
      12  #
      13  # Imports
      14  #
      15  
      16  import collections
      17  import itertools
      18  import os
      19  import queue
      20  import threading
      21  import time
      22  import traceback
      23  import types
      24  import warnings
      25  
      26  # If threading is available then ThreadPool should be provided.  Therefore
      27  # we avoid top-level imports which are liable to fail on some systems.
      28  from . import util
      29  from . import get_context, TimeoutError
      30  from .connection import wait
      31  
      32  #
      33  # Constants representing the state of a pool
      34  #
      35  
      36  INIT = "INIT"
      37  RUN = "RUN"
      38  CLOSE = "CLOSE"
      39  TERMINATE = "TERMINATE"
      40  
      41  #
      42  # Miscellaneous
      43  #
      44  
      45  job_counter = itertools.count()
      46  
      47  def mapstar(args):
      48      return list(map(*args))
      49  
      50  def starmapstar(args):
      51      return list(itertools.starmap(args[0], args[1]))
      52  
      53  #
      54  # Hack to embed stringification of remote traceback in local traceback
      55  #
      56  
      57  class ESC[4;38;5;81mRemoteTraceback(ESC[4;38;5;149mException):
      58      def __init__(self, tb):
      59          self.tb = tb
      60      def __str__(self):
      61          return self.tb
      62  
      63  class ESC[4;38;5;81mExceptionWithTraceback:
      64      def __init__(self, exc, tb):
      65          tb = traceback.format_exception(type(exc), exc, tb)
      66          tb = ''.join(tb)
      67          self.exc = exc
      68          self.tb = '\n"""\n%s"""' % tb
      69      def __reduce__(self):
      70          return rebuild_exc, (self.exc, self.tb)
      71  
      72  def rebuild_exc(exc, tb):
      73      exc.__cause__ = RemoteTraceback(tb)
      74      return exc
      75  
      76  #
      77  # Code run by worker processes
      78  #
      79  
      80  class ESC[4;38;5;81mMaybeEncodingError(ESC[4;38;5;149mException):
      81      """Wraps possible unpickleable errors, so they can be
      82      safely sent through the socket."""
      83  
      84      def __init__(self, exc, value):
      85          self.exc = repr(exc)
      86          self.value = repr(value)
      87          super(MaybeEncodingError, self).__init__(self.exc, self.value)
      88  
      89      def __str__(self):
      90          return "Error sending result: '%s'. Reason: '%s'" % (self.value,
      91                                                               self.exc)
      92  
      93      def __repr__(self):
      94          return "<%s: %s>" % (self.__class__.__name__, self)
      95  
      96  
      97  def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
      98             wrap_exception=False):
      99      if (maxtasks is not None) and not (isinstance(maxtasks, int)
     100                                         and maxtasks >= 1):
     101          raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
     102      put = outqueue.put
     103      get = inqueue.get
     104      if hasattr(inqueue, '_writer'):
     105          inqueue._writer.close()
     106          outqueue._reader.close()
     107  
     108      if initializer is not None:
     109          initializer(*initargs)
     110  
     111      completed = 0
     112      while maxtasks is None or (maxtasks and completed < maxtasks):
     113          try:
     114              task = get()
     115          except (EOFError, OSError):
     116              util.debug('worker got EOFError or OSError -- exiting')
     117              break
     118  
     119          if task is None:
     120              util.debug('worker got sentinel -- exiting')
     121              break
     122  
     123          job, i, func, args, kwds = task
     124          try:
     125              result = (True, func(*args, **kwds))
     126          except Exception as e:
     127              if wrap_exception and func is not _helper_reraises_exception:
     128                  e = ExceptionWithTraceback(e, e.__traceback__)
     129              result = (False, e)
     130          try:
     131              put((job, i, result))
     132          except Exception as e:
     133              wrapped = MaybeEncodingError(e, result[1])
     134              util.debug("Possible encoding error while sending result: %s" % (
     135                  wrapped))
     136              put((job, i, (False, wrapped)))
     137  
     138          task = job = result = func = args = kwds = None
     139          completed += 1
     140      util.debug('worker exiting after %d tasks' % completed)
     141  
     142  def _helper_reraises_exception(ex):
     143      'Pickle-able helper function for use by _guarded_task_generation.'
     144      raise ex
     145  
     146  #
     147  # Class representing a process pool
     148  #
     149  
     150  class ESC[4;38;5;81m_PoolCache(ESC[4;38;5;149mdict):
     151      """
     152      Class that implements a cache for the Pool class that will notify
     153      the pool management threads every time the cache is emptied. The
     154      notification is done by the use of a queue that is provided when
     155      instantiating the cache.
     156      """
     157      def __init__(self, /, *args, notifier=None, **kwds):
     158          self.notifier = notifier
     159          super().__init__(*args, **kwds)
     160  
     161      def __delitem__(self, item):
     162          super().__delitem__(item)
     163  
     164          # Notify that the cache is empty. This is important because the
     165          # pool keeps maintaining workers until the cache gets drained. This
     166          # eliminates a race condition in which a task is finished after the
     167          # the pool's _handle_workers method has enter another iteration of the
     168          # loop. In this situation, the only event that can wake up the pool
     169          # is the cache to be emptied (no more tasks available).
     170          if not self:
     171              self.notifier.put(None)
     172  
     173  class ESC[4;38;5;81mPool(ESC[4;38;5;149mobject):
     174      '''
     175      Class which supports an async version of applying functions to arguments.
     176      '''
     177      _wrap_exception = True
     178  
     179      @staticmethod
     180      def Process(ctx, *args, **kwds):
     181          return ctx.Process(*args, **kwds)
     182  
     183      def __init__(self, processes=None, initializer=None, initargs=(),
     184                   maxtasksperchild=None, context=None):
     185          # Attributes initialized early to make sure that they exist in
     186          # __del__() if __init__() raises an exception
     187          self._pool = []
     188          self._state = INIT
     189  
     190          self._ctx = context or get_context()
     191          self._setup_queues()
     192          self._taskqueue = queue.SimpleQueue()
     193          # The _change_notifier queue exist to wake up self._handle_workers()
     194          # when the cache (self._cache) is empty or when there is a change in
     195          # the _state variable of the thread that runs _handle_workers.
     196          self._change_notifier = self._ctx.SimpleQueue()
     197          self._cache = _PoolCache(notifier=self._change_notifier)
     198          self._maxtasksperchild = maxtasksperchild
     199          self._initializer = initializer
     200          self._initargs = initargs
     201  
     202          if processes is None:
     203              processes = os.cpu_count() or 1
     204          if processes < 1:
     205              raise ValueError("Number of processes must be at least 1")
     206          if maxtasksperchild is not None:
     207              if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0:
     208                  raise ValueError("maxtasksperchild must be a positive int or None")
     209  
     210          if initializer is not None and not callable(initializer):
     211              raise TypeError('initializer must be a callable')
     212  
     213          self._processes = processes
     214          try:
     215              self._repopulate_pool()
     216          except Exception:
     217              for p in self._pool:
     218                  if p.exitcode is None:
     219                      p.terminate()
     220              for p in self._pool:
     221                  p.join()
     222              raise
     223  
     224          sentinels = self._get_sentinels()
     225  
     226          self._worker_handler = threading.Thread(
     227              target=Pool._handle_workers,
     228              args=(self._cache, self._taskqueue, self._ctx, self.Process,
     229                    self._processes, self._pool, self._inqueue, self._outqueue,
     230                    self._initializer, self._initargs, self._maxtasksperchild,
     231                    self._wrap_exception, sentinels, self._change_notifier)
     232              )
     233          self._worker_handler.daemon = True
     234          self._worker_handler._state = RUN
     235          self._worker_handler.start()
     236  
     237  
     238          self._task_handler = threading.Thread(
     239              target=Pool._handle_tasks,
     240              args=(self._taskqueue, self._quick_put, self._outqueue,
     241                    self._pool, self._cache)
     242              )
     243          self._task_handler.daemon = True
     244          self._task_handler._state = RUN
     245          self._task_handler.start()
     246  
     247          self._result_handler = threading.Thread(
     248              target=Pool._handle_results,
     249              args=(self._outqueue, self._quick_get, self._cache)
     250              )
     251          self._result_handler.daemon = True
     252          self._result_handler._state = RUN
     253          self._result_handler.start()
     254  
     255          self._terminate = util.Finalize(
     256              self, self._terminate_pool,
     257              args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
     258                    self._change_notifier, self._worker_handler, self._task_handler,
     259                    self._result_handler, self._cache),
     260              exitpriority=15
     261              )
     262          self._state = RUN
     263  
     264      # Copy globals as function locals to make sure that they are available
     265      # during Python shutdown when the Pool is destroyed.
     266      def __del__(self, _warn=warnings.warn, RUN=RUN):
     267          if self._state == RUN:
     268              _warn(f"unclosed running multiprocessing pool {self!r}",
     269                    ResourceWarning, source=self)
     270              if getattr(self, '_change_notifier', None) is not None:
     271                  self._change_notifier.put(None)
     272  
     273      def __repr__(self):
     274          cls = self.__class__
     275          return (f'<{cls.__module__}.{cls.__qualname__} '
     276                  f'state={self._state} '
     277                  f'pool_size={len(self._pool)}>')
     278  
     279      def _get_sentinels(self):
     280          task_queue_sentinels = [self._outqueue._reader]
     281          self_notifier_sentinels = [self._change_notifier._reader]
     282          return [*task_queue_sentinels, *self_notifier_sentinels]
     283  
     284      @staticmethod
     285      def _get_worker_sentinels(workers):
     286          return [worker.sentinel for worker in
     287                  workers if hasattr(worker, "sentinel")]
     288  
     289      @staticmethod
     290      def _join_exited_workers(pool):
     291          """Cleanup after any worker processes which have exited due to reaching
     292          their specified lifetime.  Returns True if any workers were cleaned up.
     293          """
     294          cleaned = False
     295          for i in reversed(range(len(pool))):
     296              worker = pool[i]
     297              if worker.exitcode is not None:
     298                  # worker exited
     299                  util.debug('cleaning up worker %d' % i)
     300                  worker.join()
     301                  cleaned = True
     302                  del pool[i]
     303          return cleaned
     304  
     305      def _repopulate_pool(self):
     306          return self._repopulate_pool_static(self._ctx, self.Process,
     307                                              self._processes,
     308                                              self._pool, self._inqueue,
     309                                              self._outqueue, self._initializer,
     310                                              self._initargs,
     311                                              self._maxtasksperchild,
     312                                              self._wrap_exception)
     313  
     314      @staticmethod
     315      def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
     316                                  outqueue, initializer, initargs,
     317                                  maxtasksperchild, wrap_exception):
     318          """Bring the number of pool processes up to the specified number,
     319          for use after reaping workers which have exited.
     320          """
     321          for i in range(processes - len(pool)):
     322              w = Process(ctx, target=worker,
     323                          args=(inqueue, outqueue,
     324                                initializer,
     325                                initargs, maxtasksperchild,
     326                                wrap_exception))
     327              w.name = w.name.replace('Process', 'PoolWorker')
     328              w.daemon = True
     329              w.start()
     330              pool.append(w)
     331              util.debug('added worker')
     332  
     333      @staticmethod
     334      def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
     335                         initializer, initargs, maxtasksperchild,
     336                         wrap_exception):
     337          """Clean up any exited workers and start replacements for them.
     338          """
     339          if Pool._join_exited_workers(pool):
     340              Pool._repopulate_pool_static(ctx, Process, processes, pool,
     341                                           inqueue, outqueue, initializer,
     342                                           initargs, maxtasksperchild,
     343                                           wrap_exception)
     344  
     345      def _setup_queues(self):
     346          self._inqueue = self._ctx.SimpleQueue()
     347          self._outqueue = self._ctx.SimpleQueue()
     348          self._quick_put = self._inqueue._writer.send
     349          self._quick_get = self._outqueue._reader.recv
     350  
     351      def _check_running(self):
     352          if self._state != RUN:
     353              raise ValueError("Pool not running")
     354  
     355      def apply(self, func, args=(), kwds={}):
     356          '''
     357          Equivalent of `func(*args, **kwds)`.
     358          Pool must be running.
     359          '''
     360          return self.apply_async(func, args, kwds).get()
     361  
     362      def map(self, func, iterable, chunksize=None):
     363          '''
     364          Apply `func` to each element in `iterable`, collecting the results
     365          in a list that is returned.
     366          '''
     367          return self._map_async(func, iterable, mapstar, chunksize).get()
     368  
     369      def starmap(self, func, iterable, chunksize=None):
     370          '''
     371          Like `map()` method but the elements of the `iterable` are expected to
     372          be iterables as well and will be unpacked as arguments. Hence
     373          `func` and (a, b) becomes func(a, b).
     374          '''
     375          return self._map_async(func, iterable, starmapstar, chunksize).get()
     376  
     377      def starmap_async(self, func, iterable, chunksize=None, callback=None,
     378              error_callback=None):
     379          '''
     380          Asynchronous version of `starmap()` method.
     381          '''
     382          return self._map_async(func, iterable, starmapstar, chunksize,
     383                                 callback, error_callback)
     384  
     385      def _guarded_task_generation(self, result_job, func, iterable):
     386          '''Provides a generator of tasks for imap and imap_unordered with
     387          appropriate handling for iterables which throw exceptions during
     388          iteration.'''
     389          try:
     390              i = -1
     391              for i, x in enumerate(iterable):
     392                  yield (result_job, i, func, (x,), {})
     393          except Exception as e:
     394              yield (result_job, i+1, _helper_reraises_exception, (e,), {})
     395  
     396      def imap(self, func, iterable, chunksize=1):
     397          '''
     398          Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
     399          '''
     400          self._check_running()
     401          if chunksize == 1:
     402              result = IMapIterator(self)
     403              self._taskqueue.put(
     404                  (
     405                      self._guarded_task_generation(result._job, func, iterable),
     406                      result._set_length
     407                  ))
     408              return result
     409          else:
     410              if chunksize < 1:
     411                  raise ValueError(
     412                      "Chunksize must be 1+, not {0:n}".format(
     413                          chunksize))
     414              task_batches = Pool._get_tasks(func, iterable, chunksize)
     415              result = IMapIterator(self)
     416              self._taskqueue.put(
     417                  (
     418                      self._guarded_task_generation(result._job,
     419                                                    mapstar,
     420                                                    task_batches),
     421                      result._set_length
     422                  ))
     423              return (item for chunk in result for item in chunk)
     424  
     425      def imap_unordered(self, func, iterable, chunksize=1):
     426          '''
     427          Like `imap()` method but ordering of results is arbitrary.
     428          '''
     429          self._check_running()
     430          if chunksize == 1:
     431              result = IMapUnorderedIterator(self)
     432              self._taskqueue.put(
     433                  (
     434                      self._guarded_task_generation(result._job, func, iterable),
     435                      result._set_length
     436                  ))
     437              return result
     438          else:
     439              if chunksize < 1:
     440                  raise ValueError(
     441                      "Chunksize must be 1+, not {0!r}".format(chunksize))
     442              task_batches = Pool._get_tasks(func, iterable, chunksize)
     443              result = IMapUnorderedIterator(self)
     444              self._taskqueue.put(
     445                  (
     446                      self._guarded_task_generation(result._job,
     447                                                    mapstar,
     448                                                    task_batches),
     449                      result._set_length
     450                  ))
     451              return (item for chunk in result for item in chunk)
     452  
     453      def apply_async(self, func, args=(), kwds={}, callback=None,
     454              error_callback=None):
     455          '''
     456          Asynchronous version of `apply()` method.
     457          '''
     458          self._check_running()
     459          result = ApplyResult(self, callback, error_callback)
     460          self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
     461          return result
     462  
     463      def map_async(self, func, iterable, chunksize=None, callback=None,
     464              error_callback=None):
     465          '''
     466          Asynchronous version of `map()` method.
     467          '''
     468          return self._map_async(func, iterable, mapstar, chunksize, callback,
     469              error_callback)
     470  
     471      def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
     472              error_callback=None):
     473          '''
     474          Helper function to implement map, starmap and their async counterparts.
     475          '''
     476          self._check_running()
     477          if not hasattr(iterable, '__len__'):
     478              iterable = list(iterable)
     479  
     480          if chunksize is None:
     481              chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
     482              if extra:
     483                  chunksize += 1
     484          if len(iterable) == 0:
     485              chunksize = 0
     486  
     487          task_batches = Pool._get_tasks(func, iterable, chunksize)
     488          result = MapResult(self, chunksize, len(iterable), callback,
     489                             error_callback=error_callback)
     490          self._taskqueue.put(
     491              (
     492                  self._guarded_task_generation(result._job,
     493                                                mapper,
     494                                                task_batches),
     495                  None
     496              )
     497          )
     498          return result
     499  
     500      @staticmethod
     501      def _wait_for_updates(sentinels, change_notifier, timeout=None):
     502          wait(sentinels, timeout=timeout)
     503          while not change_notifier.empty():
     504              change_notifier.get()
     505  
     506      @classmethod
     507      def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
     508                          pool, inqueue, outqueue, initializer, initargs,
     509                          maxtasksperchild, wrap_exception, sentinels,
     510                          change_notifier):
     511          thread = threading.current_thread()
     512  
     513          # Keep maintaining workers until the cache gets drained, unless the pool
     514          # is terminated.
     515          while thread._state == RUN or (cache and thread._state != TERMINATE):
     516              cls._maintain_pool(ctx, Process, processes, pool, inqueue,
     517                                 outqueue, initializer, initargs,
     518                                 maxtasksperchild, wrap_exception)
     519  
     520              current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]
     521  
     522              cls._wait_for_updates(current_sentinels, change_notifier)
     523          # send sentinel to stop workers
     524          taskqueue.put(None)
     525          util.debug('worker handler exiting')
     526  
     527      @staticmethod
     528      def _handle_tasks(taskqueue, put, outqueue, pool, cache):
     529          thread = threading.current_thread()
     530  
     531          for taskseq, set_length in iter(taskqueue.get, None):
     532              task = None
     533              try:
     534                  # iterating taskseq cannot fail
     535                  for task in taskseq:
     536                      if thread._state != RUN:
     537                          util.debug('task handler found thread._state != RUN')
     538                          break
     539                      try:
     540                          put(task)
     541                      except Exception as e:
     542                          job, idx = task[:2]
     543                          try:
     544                              cache[job]._set(idx, (False, e))
     545                          except KeyError:
     546                              pass
     547                  else:
     548                      if set_length:
     549                          util.debug('doing set_length()')
     550                          idx = task[1] if task else -1
     551                          set_length(idx + 1)
     552                      continue
     553                  break
     554              finally:
     555                  task = taskseq = job = None
     556          else:
     557              util.debug('task handler got sentinel')
     558  
     559          try:
     560              # tell result handler to finish when cache is empty
     561              util.debug('task handler sending sentinel to result handler')
     562              outqueue.put(None)
     563  
     564              # tell workers there is no more work
     565              util.debug('task handler sending sentinel to workers')
     566              for p in pool:
     567                  put(None)
     568          except OSError:
     569              util.debug('task handler got OSError when sending sentinels')
     570  
     571          util.debug('task handler exiting')
     572  
     573      @staticmethod
     574      def _handle_results(outqueue, get, cache):
     575          thread = threading.current_thread()
     576  
     577          while 1:
     578              try:
     579                  task = get()
     580              except (OSError, EOFError):
     581                  util.debug('result handler got EOFError/OSError -- exiting')
     582                  return
     583  
     584              if thread._state != RUN:
     585                  assert thread._state == TERMINATE, "Thread not in TERMINATE"
     586                  util.debug('result handler found thread._state=TERMINATE')
     587                  break
     588  
     589              if task is None:
     590                  util.debug('result handler got sentinel')
     591                  break
     592  
     593              job, i, obj = task
     594              try:
     595                  cache[job]._set(i, obj)
     596              except KeyError:
     597                  pass
     598              task = job = obj = None
     599  
     600          while cache and thread._state != TERMINATE:
     601              try:
     602                  task = get()
     603              except (OSError, EOFError):
     604                  util.debug('result handler got EOFError/OSError -- exiting')
     605                  return
     606  
     607              if task is None:
     608                  util.debug('result handler ignoring extra sentinel')
     609                  continue
     610              job, i, obj = task
     611              try:
     612                  cache[job]._set(i, obj)
     613              except KeyError:
     614                  pass
     615              task = job = obj = None
     616  
     617          if hasattr(outqueue, '_reader'):
     618              util.debug('ensuring that outqueue is not full')
     619              # If we don't make room available in outqueue then
     620              # attempts to add the sentinel (None) to outqueue may
     621              # block.  There is guaranteed to be no more than 2 sentinels.
     622              try:
     623                  for i in range(10):
     624                      if not outqueue._reader.poll():
     625                          break
     626                      get()
     627              except (OSError, EOFError):
     628                  pass
     629  
     630          util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
     631                len(cache), thread._state)
     632  
     633      @staticmethod
     634      def _get_tasks(func, it, size):
     635          it = iter(it)
     636          while 1:
     637              x = tuple(itertools.islice(it, size))
     638              if not x:
     639                  return
     640              yield (func, x)
     641  
     642      def __reduce__(self):
     643          raise NotImplementedError(
     644                'pool objects cannot be passed between processes or pickled'
     645                )
     646  
     647      def close(self):
     648          util.debug('closing pool')
     649          if self._state == RUN:
     650              self._state = CLOSE
     651              self._worker_handler._state = CLOSE
     652              self._change_notifier.put(None)
     653  
     654      def terminate(self):
     655          util.debug('terminating pool')
     656          self._state = TERMINATE
     657          self._terminate()
     658  
     659      def join(self):
     660          util.debug('joining pool')
     661          if self._state == RUN:
     662              raise ValueError("Pool is still running")
     663          elif self._state not in (CLOSE, TERMINATE):
     664              raise ValueError("In unknown state")
     665          self._worker_handler.join()
     666          self._task_handler.join()
     667          self._result_handler.join()
     668          for p in self._pool:
     669              p.join()
     670  
     671      @staticmethod
     672      def _help_stuff_finish(inqueue, task_handler, size):
     673          # task_handler may be blocked trying to put items on inqueue
     674          util.debug('removing tasks from inqueue until task handler finished')
     675          inqueue._rlock.acquire()
     676          while task_handler.is_alive() and inqueue._reader.poll():
     677              inqueue._reader.recv()
     678              time.sleep(0)
     679  
     680      @classmethod
     681      def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
     682                          worker_handler, task_handler, result_handler, cache):
     683          # this is guaranteed to only be called once
     684          util.debug('finalizing pool')
     685  
     686          # Notify that the worker_handler state has been changed so the
     687          # _handle_workers loop can be unblocked (and exited) in order to
     688          # send the finalization sentinel all the workers.
     689          worker_handler._state = TERMINATE
     690          change_notifier.put(None)
     691  
     692          task_handler._state = TERMINATE
     693  
     694          util.debug('helping task handler/workers to finish')
     695          cls._help_stuff_finish(inqueue, task_handler, len(pool))
     696  
     697          if (not result_handler.is_alive()) and (len(cache) != 0):
     698              raise AssertionError(
     699                  "Cannot have cache with result_hander not alive")
     700  
     701          result_handler._state = TERMINATE
     702          change_notifier.put(None)
     703          outqueue.put(None)                  # sentinel
     704  
     705          # We must wait for the worker handler to exit before terminating
     706          # workers because we don't want workers to be restarted behind our back.
     707          util.debug('joining worker handler')
     708          if threading.current_thread() is not worker_handler:
     709              worker_handler.join()
     710  
     711          # Terminate workers which haven't already finished.
     712          if pool and hasattr(pool[0], 'terminate'):
     713              util.debug('terminating workers')
     714              for p in pool:
     715                  if p.exitcode is None:
     716                      p.terminate()
     717  
     718          util.debug('joining task handler')
     719          if threading.current_thread() is not task_handler:
     720              task_handler.join()
     721  
     722          util.debug('joining result handler')
     723          if threading.current_thread() is not result_handler:
     724              result_handler.join()
     725  
     726          if pool and hasattr(pool[0], 'terminate'):
     727              util.debug('joining pool workers')
     728              for p in pool:
     729                  if p.is_alive():
     730                      # worker has not yet exited
     731                      util.debug('cleaning up worker %d' % p.pid)
     732                      p.join()
     733  
    def __enter__(self):
        """Enter a ``with`` block and return the pool itself.

        Calls ``self._check_running()`` first (defined outside this view;
        presumably it raises when the pool is not running -- confirm).
        """
        self._check_running()
        return self
     737  
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave a ``with`` block by calling ``self.terminate()``.

        The exception details are ignored.  The method returns ``None``, so
        an exception raised inside the ``with`` body is not suppressed.
        """
        self.terminate()
     740  
     741  #
     742  # Class whose instances are returned by `Pool.apply_async()`
     743  #
     744  
     745  class ESC[4;38;5;81mApplyResult(ESC[4;38;5;149mobject):
     746  
     747      def __init__(self, pool, callback, error_callback):
     748          self._pool = pool
     749          self._event = threading.Event()
     750          self._job = next(job_counter)
     751          self._cache = pool._cache
     752          self._callback = callback
     753          self._error_callback = error_callback
     754          self._cache[self._job] = self
     755  
     756      def ready(self):
     757          return self._event.is_set()
     758  
     759      def successful(self):
     760          if not self.ready():
     761              raise ValueError("{0!r} not ready".format(self))
     762          return self._success
     763  
     764      def wait(self, timeout=None):
     765          self._event.wait(timeout)
     766  
     767      def get(self, timeout=None):
     768          self.wait(timeout)
     769          if not self.ready():
     770              raise TimeoutError
     771          if self._success:
     772              return self._value
     773          else:
     774              raise self._value
     775  
     776      def _set(self, i, obj):
     777          self._success, self._value = obj
     778          if self._callback and self._success:
     779              self._callback(self._value)
     780          if self._error_callback and not self._success:
     781              self._error_callback(self._value)
     782          self._event.set()
     783          del self._cache[self._job]
     784          self._pool = None
     785  
     786      __class_getitem__ = classmethod(types.GenericAlias)
     787  
     788  AsyncResult = ApplyResult       # create alias -- see #17805
     789  
     790  #
     791  # Class whose instances are returned by `Pool.map_async()`
     792  #
     793  
     794  class ESC[4;38;5;81mMapResult(ESC[4;38;5;149mApplyResult):
     795  
     796      def __init__(self, pool, chunksize, length, callback, error_callback):
     797          ApplyResult.__init__(self, pool, callback,
     798                               error_callback=error_callback)
     799          self._success = True
     800          self._value = [None] * length
     801          self._chunksize = chunksize
     802          if chunksize <= 0:
     803              self._number_left = 0
     804              self._event.set()
     805              del self._cache[self._job]
     806          else:
     807              self._number_left = length//chunksize + bool(length % chunksize)
     808  
     809      def _set(self, i, success_result):
     810          self._number_left -= 1
     811          success, result = success_result
     812          if success and self._success:
     813              self._value[i*self._chunksize:(i+1)*self._chunksize] = result
     814              if self._number_left == 0:
     815                  if self._callback:
     816                      self._callback(self._value)
     817                  del self._cache[self._job]
     818                  self._event.set()
     819                  self._pool = None
     820          else:
     821              if not success and self._success:
     822                  # only store first exception
     823                  self._success = False
     824                  self._value = result
     825              if self._number_left == 0:
     826                  # only consider the result ready once all jobs are done
     827                  if self._error_callback:
     828                      self._error_callback(self._value)
     829                  del self._cache[self._job]
     830                  self._event.set()
     831                  self._pool = None
     832  
     833  #
     834  # Class whose instances are returned by `Pool.imap()`
     835  #
     836  
     837  class ESC[4;38;5;81mIMapIterator(ESC[4;38;5;149mobject):
     838  
     839      def __init__(self, pool):
     840          self._pool = pool
     841          self._cond = threading.Condition(threading.Lock())
     842          self._job = next(job_counter)
     843          self._cache = pool._cache
     844          self._items = collections.deque()
     845          self._index = 0
     846          self._length = None
     847          self._unsorted = {}
     848          self._cache[self._job] = self
     849  
     850      def __iter__(self):
     851          return self
     852  
     853      def next(self, timeout=None):
     854          with self._cond:
     855              try:
     856                  item = self._items.popleft()
     857              except IndexError:
     858                  if self._index == self._length:
     859                      self._pool = None
     860                      raise StopIteration from None
     861                  self._cond.wait(timeout)
     862                  try:
     863                      item = self._items.popleft()
     864                  except IndexError:
     865                      if self._index == self._length:
     866                          self._pool = None
     867                          raise StopIteration from None
     868                      raise TimeoutError from None
     869  
     870          success, value = item
     871          if success:
     872              return value
     873          raise value
     874  
     875      __next__ = next                    # XXX
     876  
     877      def _set(self, i, obj):
     878          with self._cond:
     879              if self._index == i:
     880                  self._items.append(obj)
     881                  self._index += 1
     882                  while self._index in self._unsorted:
     883                      obj = self._unsorted.pop(self._index)
     884                      self._items.append(obj)
     885                      self._index += 1
     886                  self._cond.notify()
     887              else:
     888                  self._unsorted[i] = obj
     889  
     890              if self._index == self._length:
     891                  del self._cache[self._job]
     892                  self._pool = None
     893  
     894      def _set_length(self, length):
     895          with self._cond:
     896              self._length = length
     897              if self._index == self._length:
     898                  self._cond.notify()
     899                  del self._cache[self._job]
     900                  self._pool = None
     901  
     902  #
     903  # Class whose instances are returned by `Pool.imap_unordered()`
     904  #
     905  
class IMapUnorderedIterator(IMapIterator):
    """IMapIterator variant that hands results out in arrival order."""

    def _set(self, i, obj):
        # The position ``i`` is ignored: every result is queued as soon as
        # it arrives and one waiting consumer is woken.
        with self._cond:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            # Once the announced number of results has arrived, remove this
            # iterator from the cache and drop its pool reference.
            if self._index == self._length:
                del self._cache[self._job]
                self._pool = None
     916  
     917  #
     918  #
     919  #
     920  
     921  class ESC[4;38;5;81mThreadPool(ESC[4;38;5;149mPool):
     922      _wrap_exception = False
     923  
     924      @staticmethod
     925      def Process(ctx, *args, **kwds):
     926          from .dummy import Process
     927          return Process(*args, **kwds)
     928  
     929      def __init__(self, processes=None, initializer=None, initargs=()):
     930          Pool.__init__(self, processes, initializer, initargs)
     931  
     932      def _setup_queues(self):
     933          self._inqueue = queue.SimpleQueue()
     934          self._outqueue = queue.SimpleQueue()
     935          self._quick_put = self._inqueue.put
     936          self._quick_get = self._outqueue.get
     937  
     938      def _get_sentinels(self):
     939          return [self._change_notifier._reader]
     940  
     941      @staticmethod
     942      def _get_worker_sentinels(workers):
     943          return []
     944  
     945      @staticmethod
     946      def _help_stuff_finish(inqueue, task_handler, size):
     947          # drain inqueue, and put sentinels at its head to make workers finish
     948          try:
     949              while True:
     950                  inqueue.get(block=False)
     951          except queue.Empty:
     952              pass
     953          for i in range(size):
     954              inqueue.put(None)
     955  
     956      def _wait_for_updates(self, sentinels, change_notifier, timeout):
     957          time.sleep(timeout)