(root)/
Python-3.11.7/
Lib/
multiprocessing/
queues.py
       1  #
       2  # Module implementing queues
       3  #
       4  # multiprocessing/queues.py
       5  #
       6  # Copyright (c) 2006-2008, R Oudkerk
       7  # Licensed to PSF under a Contributor Agreement.
       8  #
       9  
      10  __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
      11  
      12  import sys
      13  import os
      14  import threading
      15  import collections
      16  import time
      17  import types
      18  import weakref
      19  import errno
      20  
      21  from queue import Empty, Full
      22  
      23  import _multiprocessing
      24  
      25  from . import connection
      26  from . import context
      27  _ForkingPickler = context.reduction.ForkingPickler
      28  
      29  from .util import debug, info, Finalize, register_after_fork, is_exiting
      30  
      31  #
      32  # Queue type using a pipe, buffer and thread
      33  #
      34  
      35  class ESC[4;38;5;81mQueue(ESC[4;38;5;149mobject):
      36  
      37      def __init__(self, maxsize=0, *, ctx):
      38          if maxsize <= 0:
      39              # Can raise ImportError (see issues #3770 and #23400)
      40              from .synchronize import SEM_VALUE_MAX as maxsize
      41          self._maxsize = maxsize
      42          self._reader, self._writer = connection.Pipe(duplex=False)
      43          self._rlock = ctx.Lock()
      44          self._opid = os.getpid()
      45          if sys.platform == 'win32':
      46              self._wlock = None
      47          else:
      48              self._wlock = ctx.Lock()
      49          self._sem = ctx.BoundedSemaphore(maxsize)
      50          # For use by concurrent.futures
      51          self._ignore_epipe = False
      52          self._reset()
      53  
      54          if sys.platform != 'win32':
      55              register_after_fork(self, Queue._after_fork)
      56  
      57      def __getstate__(self):
      58          context.assert_spawning(self)
      59          return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
      60                  self._rlock, self._wlock, self._sem, self._opid)
      61  
      62      def __setstate__(self, state):
      63          (self._ignore_epipe, self._maxsize, self._reader, self._writer,
      64           self._rlock, self._wlock, self._sem, self._opid) = state
      65          self._reset()
      66  
      67      def _after_fork(self):
      68          debug('Queue._after_fork()')
      69          self._reset(after_fork=True)
      70  
      71      def _reset(self, after_fork=False):
      72          if after_fork:
      73              self._notempty._at_fork_reinit()
      74          else:
      75              self._notempty = threading.Condition(threading.Lock())
      76          self._buffer = collections.deque()
      77          self._thread = None
      78          self._jointhread = None
      79          self._joincancelled = False
      80          self._closed = False
      81          self._close = None
      82          self._send_bytes = self._writer.send_bytes
      83          self._recv_bytes = self._reader.recv_bytes
      84          self._poll = self._reader.poll
      85  
      86      def put(self, obj, block=True, timeout=None):
      87          if self._closed:
      88              raise ValueError(f"Queue {self!r} is closed")
      89          if not self._sem.acquire(block, timeout):
      90              raise Full
      91  
      92          with self._notempty:
      93              if self._thread is None:
      94                  self._start_thread()
      95              self._buffer.append(obj)
      96              self._notempty.notify()
      97  
      98      def get(self, block=True, timeout=None):
      99          if self._closed:
     100              raise ValueError(f"Queue {self!r} is closed")
     101          if block and timeout is None:
     102              with self._rlock:
     103                  res = self._recv_bytes()
     104              self._sem.release()
     105          else:
     106              if block:
     107                  deadline = time.monotonic() + timeout
     108              if not self._rlock.acquire(block, timeout):
     109                  raise Empty
     110              try:
     111                  if block:
     112                      timeout = deadline - time.monotonic()
     113                      if not self._poll(timeout):
     114                          raise Empty
     115                  elif not self._poll():
     116                      raise Empty
     117                  res = self._recv_bytes()
     118                  self._sem.release()
     119              finally:
     120                  self._rlock.release()
     121          # unserialize the data after having released the lock
     122          return _ForkingPickler.loads(res)
     123  
     124      def qsize(self):
     125          # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
     126          return self._maxsize - self._sem._semlock._get_value()
     127  
     128      def empty(self):
     129          return not self._poll()
     130  
     131      def full(self):
     132          return self._sem._semlock._is_zero()
     133  
     134      def get_nowait(self):
     135          return self.get(False)
     136  
     137      def put_nowait(self, obj):
     138          return self.put(obj, False)
     139  
     140      def close(self):
     141          self._closed = True
     142          close = self._close
     143          if close:
     144              self._close = None
     145              close()
     146  
     147      def join_thread(self):
     148          debug('Queue.join_thread()')
     149          assert self._closed, "Queue {0!r} not closed".format(self)
     150          if self._jointhread:
     151              self._jointhread()
     152  
     153      def cancel_join_thread(self):
     154          debug('Queue.cancel_join_thread()')
     155          self._joincancelled = True
     156          try:
     157              self._jointhread.cancel()
     158          except AttributeError:
     159              pass
     160  
     161      def _start_thread(self):
     162          debug('Queue._start_thread()')
     163  
     164          # Start thread which transfers data from buffer to pipe
     165          self._buffer.clear()
     166          self._thread = threading.Thread(
     167              target=Queue._feed,
     168              args=(self._buffer, self._notempty, self._send_bytes,
     169                    self._wlock, self._reader.close, self._writer.close,
     170                    self._ignore_epipe, self._on_queue_feeder_error,
     171                    self._sem),
     172              name='QueueFeederThread'
     173          )
     174          self._thread.daemon = True
     175  
     176          debug('doing self._thread.start()')
     177          self._thread.start()
     178          debug('... done self._thread.start()')
     179  
     180          if not self._joincancelled:
     181              self._jointhread = Finalize(
     182                  self._thread, Queue._finalize_join,
     183                  [weakref.ref(self._thread)],
     184                  exitpriority=-5
     185                  )
     186  
     187          # Send sentinel to the thread queue object when garbage collected
     188          self._close = Finalize(
     189              self, Queue._finalize_close,
     190              [self._buffer, self._notempty],
     191              exitpriority=10
     192              )
     193  
     194      @staticmethod
     195      def _finalize_join(twr):
     196          debug('joining queue thread')
     197          thread = twr()
     198          if thread is not None:
     199              thread.join()
     200              debug('... queue thread joined')
     201          else:
     202              debug('... queue thread already dead')
     203  
     204      @staticmethod
     205      def _finalize_close(buffer, notempty):
     206          debug('telling queue thread to quit')
     207          with notempty:
     208              buffer.append(_sentinel)
     209              notempty.notify()
     210  
     211      @staticmethod
     212      def _feed(buffer, notempty, send_bytes, writelock, reader_close,
     213                writer_close, ignore_epipe, onerror, queue_sem):
     214          debug('starting thread to feed data to pipe')
     215          nacquire = notempty.acquire
     216          nrelease = notempty.release
     217          nwait = notempty.wait
     218          bpopleft = buffer.popleft
     219          sentinel = _sentinel
     220          if sys.platform != 'win32':
     221              wacquire = writelock.acquire
     222              wrelease = writelock.release
     223          else:
     224              wacquire = None
     225  
     226          while 1:
     227              try:
     228                  nacquire()
     229                  try:
     230                      if not buffer:
     231                          nwait()
     232                  finally:
     233                      nrelease()
     234                  try:
     235                      while 1:
     236                          obj = bpopleft()
     237                          if obj is sentinel:
     238                              debug('feeder thread got sentinel -- exiting')
     239                              reader_close()
     240                              writer_close()
     241                              return
     242  
     243                          # serialize the data before acquiring the lock
     244                          obj = _ForkingPickler.dumps(obj)
     245                          if wacquire is None:
     246                              send_bytes(obj)
     247                          else:
     248                              wacquire()
     249                              try:
     250                                  send_bytes(obj)
     251                              finally:
     252                                  wrelease()
     253                  except IndexError:
     254                      pass
     255              except Exception as e:
     256                  if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
     257                      return
     258                  # Since this runs in a daemon thread the resources it uses
     259                  # may be become unusable while the process is cleaning up.
     260                  # We ignore errors which happen after the process has
     261                  # started to cleanup.
     262                  if is_exiting():
     263                      info('error in queue thread: %s', e)
     264                      return
     265                  else:
     266                      # Since the object has not been sent in the queue, we need
     267                      # to decrease the size of the queue. The error acts as
     268                      # if the object had been silently removed from the queue
     269                      # and this step is necessary to have a properly working
     270                      # queue.
     271                      queue_sem.release()
     272                      onerror(e, obj)
     273  
     274      @staticmethod
     275      def _on_queue_feeder_error(e, obj):
     276          """
     277          Private API hook called when feeding data in the background thread
     278          raises an exception.  For overriding by concurrent.futures.
     279          """
     280          import traceback
     281          traceback.print_exc()
     282  
     283  
# Unique marker object: Queue._finalize_close() appends it to a queue's
# buffer, and Queue._feed() closes the pipe and exits when it pops it.
_sentinel = object()
     285  
     286  #
     287  # A queue type which also supports join() and task_done() methods
     288  #
     289  # Note that if you do not call task_done() for each finished task then
     290  # eventually the counter's semaphore may overflow causing Bad Things
     291  # to happen.
     292  #
     293  
     294  class ESC[4;38;5;81mJoinableQueue(ESC[4;38;5;149mQueue):
     295  
     296      def __init__(self, maxsize=0, *, ctx):
     297          Queue.__init__(self, maxsize, ctx=ctx)
     298          self._unfinished_tasks = ctx.Semaphore(0)
     299          self._cond = ctx.Condition()
     300  
     301      def __getstate__(self):
     302          return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
     303  
     304      def __setstate__(self, state):
     305          Queue.__setstate__(self, state[:-2])
     306          self._cond, self._unfinished_tasks = state[-2:]
     307  
     308      def put(self, obj, block=True, timeout=None):
     309          if self._closed:
     310              raise ValueError(f"Queue {self!r} is closed")
     311          if not self._sem.acquire(block, timeout):
     312              raise Full
     313  
     314          with self._notempty, self._cond:
     315              if self._thread is None:
     316                  self._start_thread()
     317              self._buffer.append(obj)
     318              self._unfinished_tasks.release()
     319              self._notempty.notify()
     320  
     321      def task_done(self):
     322          with self._cond:
     323              if not self._unfinished_tasks.acquire(False):
     324                  raise ValueError('task_done() called too many times')
     325              if self._unfinished_tasks._semlock._is_zero():
     326                  self._cond.notify_all()
     327  
     328      def join(self):
     329          with self._cond:
     330              if not self._unfinished_tasks._semlock._is_zero():
     331                  self._cond.wait()
     332  
     333  #
     334  # Simplified Queue type -- really just a locked pipe
     335  #
     336  
     337  class ESC[4;38;5;81mSimpleQueue(ESC[4;38;5;149mobject):
     338  
     339      def __init__(self, *, ctx):
     340          self._reader, self._writer = connection.Pipe(duplex=False)
     341          self._rlock = ctx.Lock()
     342          self._poll = self._reader.poll
     343          if sys.platform == 'win32':
     344              self._wlock = None
     345          else:
     346              self._wlock = ctx.Lock()
     347  
     348      def close(self):
     349          self._reader.close()
     350          self._writer.close()
     351  
     352      def empty(self):
     353          return not self._poll()
     354  
     355      def __getstate__(self):
     356          context.assert_spawning(self)
     357          return (self._reader, self._writer, self._rlock, self._wlock)
     358  
     359      def __setstate__(self, state):
     360          (self._reader, self._writer, self._rlock, self._wlock) = state
     361          self._poll = self._reader.poll
     362  
     363      def get(self):
     364          with self._rlock:
     365              res = self._reader.recv_bytes()
     366          # unserialize the data after having released the lock
     367          return _ForkingPickler.loads(res)
     368  
     369      def put(self, obj):
     370          # serialize the data before acquiring the lock
     371          obj = _ForkingPickler.dumps(obj)
     372          if self._wlock is None:
     373              # writes to a message oriented win32 pipe are atomic
     374              self._writer.send_bytes(obj)
     375          else:
     376              with self._wlock:
     377                  self._writer.send_bytes(obj)
     378  
     379      __class_getitem__ = classmethod(types.GenericAlias)