1 #
2 # Module implementing queues
3 #
4 # multiprocessing/queues.py
5 #
6 # Copyright (c) 2006-2008, R Oudkerk
7 # Licensed to PSF under a Contributor Agreement.
8 #
9
10 __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
11
12 import sys
13 import os
14 import threading
15 import collections
16 import time
17 import types
18 import weakref
19 import errno
20
21 from queue import Empty, Full
22
23 import _multiprocessing
24
25 from . import connection
26 from . import context
27 _ForkingPickler = context.reduction.ForkingPickler
28
29 from .util import debug, info, Finalize, register_after_fork, is_exiting
30
31 #
32 # Queue type using a pipe, buffer and thread
33 #
34
class Queue(object):
    """FIFO queue that moves pickled objects through a one-way pipe.

    put() appends the object to an in-process deque.  A lazily started
    daemon thread named 'QueueFeederThread' pickles buffered objects and
    writes them to the pipe.  get() reads one message from the pipe and
    unpickles it.  A bounded semaphore created with ``maxsize`` permits is
    acquired by put() and released by get(); that is what enforces the
    size limit.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Create the pipe, the locks and the size-limiting semaphore.

        maxsize -- capacity of the queue; any value <= 0 is replaced by
                   SEM_VALUE_MAX imported from .synchronize.
        ctx     -- object providing Lock() and BoundedSemaphore()
                   (keyword-only).
        """
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No write lock on win32; the feeder then sends without locking.
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False
        self._reset()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the picklable state tuple.

        context.assert_spawning() is called first; the per-process members
        (buffer, feeder thread, finalizers) are not part of the state and
        are rebuilt by __setstate__() through _reset().
        """
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the tuple produced by __getstate__() and reset local state."""
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._reset()

    def _after_fork(self):
        """Callback registered with register_after_fork(); resets local state."""
        debug('Queue._after_fork()')
        self._reset(after_fork=True)

    def _reset(self, after_fork=False):
        """(Re)initialise everything that is private to the current process.

        With after_fork=True the existing condition is re-initialised in
        place via _at_fork_reinit(); otherwise a new Condition is created.
        """
        if after_fork:
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        # Cache the bound connection methods used by get(), empty() and the
        # feeder thread.
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Buffer *obj* for the feeder thread.

        Raises ValueError if the queue has been closed and Full if the
        size semaphore cannot be acquired with the given block/timeout.
        The feeder thread is started on the first successful call.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Read one message from the pipe and return the unpickled object.

        Raises ValueError if the queue has been closed.  Unless blocking
        without a timeout, raises Empty when the read lock cannot be
        acquired or the pipe has no data within the allowed time.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only the time left after waiting for the lock may be
                    # spent waiting for data.
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        """Return maxsize minus the current value of the size semaphore."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True when polling the read end of the pipe finds no data."""
        return not self._poll()

    def full(self):
        """Return True when the size semaphore has no permits left."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to get(False)."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to put(obj, False)."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed and run the close finalizer, if any.

        The finalizer only exists once the feeder thread has been started;
        it is cleared before being called so it runs at most once from here.
        """
        self._closed = True
        close = self._close
        if close:
            self._close = None
            close()

    def join_thread(self):
        """Run the join finalizer registered by _start_thread(), if any.

        The queue must already be closed (checked with an assert).
        """
        debug('Queue.join_thread()')
        assert self._closed, "Queue {0!r} not closed".format(self)
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and stop future ones being registered."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # _jointhread is None when no finalizer has been registered.
            pass

    def _start_thread(self):
        """Start the feeder thread and register its finalizers.

        If the thread cannot be started the exception is re-raised and
        self._thread is reset to None, so that a later put() tries again
        instead of buffering objects that nothing will ever send.
        """
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._reader.close, self._writer.close,
                  self._ignore_epipe, self._on_queue_feeder_error,
                  self._sem),
            name='QueueFeederThread',
            daemon=True,
        )

        try:
            debug('doing self._thread.start()')
            self._thread.start()
            debug('... done self._thread.start()')
        except BaseException:
            # Thread.start() can raise (for example RuntimeError when no
            # new thread can be created).  Forget the dead thread object:
            # put() only starts a feeder while self._thread is None.
            self._thread = None
            raise

        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        """Join the feeder thread referenced by the weak reference *twr*."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to *buffer* and wake the feeder thread."""
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, reader_close,
              writer_close, ignore_epipe, onerror, queue_sem):
        """Body of the feeder thread: drain *buffer* into the pipe.

        buffer       -- deque of pending objects, shared with put()
        notempty     -- condition notified whenever something is appended
        send_bytes   -- callable that writes one pickled message
        writelock    -- lock held around send_bytes (unused on win32)
        reader_close, writer_close
                     -- called, in that order, when the sentinel is seen
        ignore_epipe -- if true, an exception whose errno is EPIPE ends
                        the thread silently
        onerror      -- called as onerror(exc, obj) for any other failure
        queue_sem    -- size semaphore, released when an object is dropped
        """
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    # Drain until popleft() raises IndexError on an empty
                    # buffer.
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            reader_close()
                            writer_close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()

    # Allow Queue[T] subscription, matching SimpleQueue.
    __class_getitem__ = classmethod(types.GenericAlias)


# Unique marker appended to a queue's buffer to tell its feeder thread to exit.
_sentinel = object()
285
286 #
287 # A queue type which also supports join() and task_done() methods
288 #
289 # Note that if you do not call task_done() for each finished task then
290 # eventually the counter's semaphore may overflow causing Bad Things
291 # to happen.
292 #
293
class JoinableQueue(Queue):
    """Queue that also tracks how many put() items are still unfinished.

    Each put() releases the ``_unfinished_tasks`` semaphore once and each
    task_done() acquires it once.  join() waits on ``_cond``, which
    task_done() notifies when it finds the semaphore at zero.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Initialise the base queue, then the task counter and condition."""
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        """Return the base queue state followed by the condition and counter."""
        base_state = Queue.__getstate__(self)
        own_state = (self._cond, self._unfinished_tasks)
        return base_state + own_state

    def __setstate__(self, state):
        """Restore state: the last two entries are ours, the rest Queue's."""
        queue_part, own_part = state[:-2], state[-2:]
        Queue.__setstate__(self, queue_part)
        self._cond, self._unfinished_tasks = own_part

    def put(self, obj, block=True, timeout=None):
        """Buffer *obj* and count it as one unfinished task.

        Raises ValueError if the queue has been closed and Full if the
        size semaphore cannot be acquired with the given block/timeout.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        got_slot = self._sem.acquire(block, timeout)
        if not got_slot:
            raise Full

        # Both locks are held while the item is buffered and counted.
        with self._notempty:
            with self._cond:
                if self._thread is None:
                    self._start_thread()
                self._buffer.append(obj)
                self._unfinished_tasks.release()
                self._notempty.notify()

    def task_done(self):
        """Mark one task finished; wake join() waiters when none remain.

        Raises ValueError when there is no unfinished task to account for.
        """
        with self._cond:
            counter = self._unfinished_tasks
            took_one = counter.acquire(False)
            if not took_one:
                raise ValueError('task_done() called too many times')
            if counter._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        """Wait on the condition unless the unfinished-task count is zero."""
        with self._cond:
            if self._unfinished_tasks._semlock._is_zero():
                return
            self._cond.wait()
332
333 #
334 # Simplified Queue type -- really just a locked pipe
335 #
336
class SimpleQueue(object):
    """One-way pipe guarded by a read lock and, off win32, a write lock.

    put() pickles and writes in the calling thread; get() reads one
    message and unpickles it.  There is no buffer and no feeder thread.
    """

    def __init__(self, *, ctx):
        """Create the pipe and the locks; *ctx* must provide Lock()."""
        reader, writer = connection.Pipe(duplex=False)
        self._reader = reader
        self._writer = writer
        self._poll = reader.poll
        self._rlock = ctx.Lock()
        if sys.platform != 'win32':
            self._wlock = ctx.Lock()
        else:
            self._wlock = None

    def close(self):
        """Close the read end, then the write end, of the pipe."""
        for end in (self._reader, self._writer):
            end.close()

    def empty(self):
        """Return True when polling the read end finds no data."""
        has_data = self._poll()
        return not has_data

    def __getstate__(self):
        """Return (reader, writer, rlock, wlock) after assert_spawning()."""
        context.assert_spawning(self)
        state = (self._reader, self._writer, self._rlock, self._wlock)
        return state

    def __setstate__(self, state):
        """Restore the tuple from __getstate__() and re-bind the poll method."""
        reader, writer, rlock, wlock = state
        self._reader = reader
        self._writer = writer
        self._rlock = rlock
        self._wlock = wlock
        self._poll = reader.poll

    def get(self):
        """Read one message under the read lock and return the unpickled object."""
        with self._rlock:
            payload = self._reader.recv_bytes()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(payload)

    def put(self, obj):
        """Pickle *obj* and write it to the pipe."""
        # serialize the data before acquiring the lock
        payload = _ForkingPickler.dumps(obj)
        lock = self._wlock
        if lock is not None:
            with lock:
                self._writer.send_bytes(payload)
        else:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(payload)

    __class_getitem__ = classmethod(types.GenericAlias)