1 #
2 # Module implementing queues
3 #
4 # multiprocessing/queues.py
5 #
6 # Copyright (c) 2006-2008, R Oudkerk
7 # Licensed to PSF under a Contributor Agreement.
8 #
9
10 __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
11
12 import sys
13 import os
14 import threading
15 import collections
16 import time
17 import types
18 import weakref
19 import errno
20
21 from queue import Empty, Full
22
23 import _multiprocessing
24
25 from . import connection
26 from . import context
27 _ForkingPickler = context.reduction.ForkingPickler
28
29 from .util import debug, info, Finalize, register_after_fork, is_exiting
30
31 #
32 # Queue type using a pipe, buffer and thread
33 #
34
class Queue(object):
    """Queue that moves pickled objects between processes through a pipe.

    ``put()`` appends objects to an in-process buffer; a lazily started
    daemon "feeder" thread pickles them and writes them to the write end
    of a one-way pipe.  ``get()`` reads the pickled bytes from the read
    end and unpickles them.  A bounded semaphore counts the free slots.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Create the pipe, locks and semaphore backing the queue.

        maxsize -- capacity of the queue; values <= 0 are replaced by
                   SEM_VALUE_MAX.
        ctx     -- object providing Lock() and BoundedSemaphore().
        """
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No write lock is created on win32; the feeder thread then
            # sends without locking.
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False
        self._reset()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the tuple of attributes that is pickled for this queue."""
        # NOTE(review): assert_spawning() presumably rejects pickling
        # outside of process spawning -- confirm in the context module.
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the pickled attributes and rebuild the local state."""
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._reset()

    def _after_fork(self):
        """Hook registered with register_after_fork(); resets local state."""
        debug('Queue._after_fork()')
        self._reset(after_fork=True)

    def _reset(self, after_fork=False):
        """(Re)initialise the buffer, thread bookkeeping and pipe shortcuts.

        With after_fork=True the existing condition is reinitialised in
        place; otherwise a fresh condition is created.
        """
        if after_fork:
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Append obj to the buffer, starting the feeder thread if needed.

        Raises ValueError if the queue is closed and Full if no free slot
        could be acquired from the semaphore.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Read one pickled object from the pipe and return it unpickled.

        Raises ValueError if the queue is closed and Empty if the read
        lock or the data could not be obtained within the given limits.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only the time left after acquiring the lock may be
                    # spent polling.
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        """Return maxsize minus the semaphore's current value."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if polling the read end of the pipe finds no data."""
        return not self._poll()

    def full(self):
        """Return True if the free-slot semaphore is at zero."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to get(False)."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to put(obj, False)."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed and run the close finalizer, if any."""
        self._closed = True
        close = self._close
        if close:
            # Clear the attribute first so the finalizer is invoked at
            # most once through this method.
            self._close = None
            close()

    def join_thread(self):
        """Run the join finalizer for the feeder thread, if one is set.

        Asserts that close() has been called first.
        """
        debug('Queue.join_thread()')
        assert self._closed, "Queue {0!r} not closed".format(self)
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and prevent one from being created."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # _jointhread is None when no feeder thread has been started.
            pass

    def _start_thread(self):
        """Create and start the feeder thread and register its finalizers.

        If the thread cannot be started, self._thread is reset to None
        before the exception propagates, so that a later put() tries to
        start a feeder again instead of buffering objects that no thread
        would ever send.
        """
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._reader.close, self._writer.close,
                  self._ignore_epipe, self._on_queue_feeder_error,
                  self._sem),
            name='QueueFeederThread',
            daemon=True,
        )

        try:
            debug('doing self._thread.start()')
            self._thread.start()
            debug('... done self._thread.start()')
        except BaseException:
            # Thread.start() can fail (e.g. RuntimeError when no new
            # thread can be created).  Forget the unstarted thread so
            # put() does not mistake it for a running feeder.
            self._thread = None
            raise

        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
            )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
        )

    @staticmethod
    def _finalize_join(twr):
        """Join the feeder thread referenced by weak reference twr."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to buffer and wake the feeder thread."""
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, reader_close,
              writer_close, ignore_epipe, onerror, queue_sem):
        """Feeder thread body: pickle buffered objects and send them.

        Waits on notempty while buffer is empty, then drains buffer.
        Popping the sentinel closes both pipe ends and ends the thread.
        Errors are reported through onerror(e, obj) after releasing one
        slot of queue_sem, unless the process is exiting or the error is
        an ignored EPIPE, in which case the thread just returns.
        """
        debug('starting thread to feed data to pipe')
        # Bind the bound methods to locals once for the loop below.
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            reader_close()
                            writer_close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    # popleft() on an empty buffer: go back to waiting.
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()

    __class_getitem__ = classmethod(types.GenericAlias)
284
285
# Unique marker object: Queue._finalize_close() appends it to a queue's
# buffer, and the feeder thread (Queue._feed) exits when it pops it.
_sentinel = object()
287
288 #
289 # A queue type which also supports join() and task_done() methods
290 #
291 # Note that if you do not call task_done() for each finished task then
292 # eventually the counter's semaphore may overflow causing Bad Things
293 # to happen.
294 #
295
class JoinableQueue(Queue):
    """Queue that additionally counts tasks not yet marked as done.

    Every put() releases the ``_unfinished_tasks`` semaphore once and
    every task_done() acquires it once; join() waits on ``_cond`` while
    that semaphore is non-zero.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Set up the base queue plus the task counter and its condition."""
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        """Return the base queue state extended by condition and counter."""
        inherited = Queue.__getstate__(self)
        return inherited + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore the base queue state, then the two trailing entries."""
        inherited, own = state[:-2], state[-2:]
        Queue.__setstate__(self, inherited)
        self._cond, self._unfinished_tasks = own

    def put(self, obj, block=True, timeout=None):
        """Buffer obj for sending and count it as an unfinished task.

        Raises ValueError if the queue is closed and Full if no free slot
        could be acquired from the semaphore.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        got_slot = self._sem.acquire(block, timeout)
        if not got_slot:
            raise Full

        # Hold both the buffer condition and the task condition while
        # the object is queued and counted.
        with self._notempty:
            with self._cond:
                if self._thread is None:
                    self._start_thread()
                self._buffer.append(obj)
                self._unfinished_tasks.release()
                self._notempty.notify()

    def task_done(self):
        """Decrement the unfinished-task counter; notify waiters at zero.

        Raises ValueError if the counter cannot be decremented.
        """
        with self._cond:
            decremented = self._unfinished_tasks.acquire(False)
            if not decremented:
                raise ValueError('task_done() called too many times')
            counter = self._unfinished_tasks._semlock
            if counter._is_zero():
                self._cond.notify_all()

    def join(self):
        """Wait on the condition if the unfinished-task counter is non-zero."""
        with self._cond:
            all_done = self._unfinished_tasks._semlock._is_zero()
            if not all_done:
                self._cond.wait()
334
335 #
336 # Simplified Queue type -- really just a locked pipe
337 #
338
class SimpleQueue(object):
    """Minimal queue: a one-way pipe guarded by a read and a write lock."""

    def __init__(self, *, ctx):
        """Create the pipe and the locks; ctx must provide Lock()."""
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        # No write lock is created on win32.
        self._wlock = None if sys.platform == 'win32' else ctx.Lock()

    def close(self):
        """Close the read end and then the write end of the pipe."""
        self._reader.close()
        self._writer.close()

    def empty(self):
        """Return True if polling the read end finds no data."""
        has_data = self._poll()
        return not has_data

    def __getstate__(self):
        """Return the pipe ends and locks that are pickled for this queue."""
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        """Restore pipe ends and locks and rebind the poll shortcut."""
        reader, writer, rlock, wlock = state
        self._reader = reader
        self._writer = writer
        self._rlock = rlock
        self._wlock = wlock
        self._poll = self._reader.poll

    def get(self):
        """Receive one message under the read lock and return it unpickled."""
        with self._rlock:
            payload = self._reader.recv_bytes()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(payload)

    def put(self, obj):
        """Pickle obj and send it, holding the write lock if there is one."""
        # serialize the data before acquiring the lock
        payload = _ForkingPickler.dumps(obj)
        write_lock = self._wlock
        if write_lock is not None:
            with write_lock:
                self._writer.send_bytes(payload)
            return
        # writes to a message oriented win32 pipe are atomic
        self._writer.send_bytes(payload)

    __class_getitem__ = classmethod(types.GenericAlias)