1 __all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
2
3 import collections
4 import heapq
5 from types import GenericAlias
6
7 from . import locks
8 from . import mixins
9
10
11 class ESC[4;38;5;81mQueueEmpty(ESC[4;38;5;149mException):
12 """Raised when Queue.get_nowait() is called on an empty Queue."""
13 pass
14
15
16 class ESC[4;38;5;81mQueueFull(ESC[4;38;5;149mException):
17 """Raised when the Queue.put_nowait() method is called on a full Queue."""
18 pass
19
20
21 class ESC[4;38;5;81mQueue(ESC[4;38;5;149mmixinsESC[4;38;5;149m.ESC[4;38;5;149m_LoopBoundMixin):
22 """A queue, useful for coordinating producer and consumer coroutines.
23
24 If maxsize is less than or equal to zero, the queue size is infinite. If it
25 is an integer greater than 0, then "await put()" will block when the
26 queue reaches maxsize, until an item is removed by get().
27
28 Unlike the standard library Queue, you can reliably know this Queue's size
29 with qsize(), since your single-threaded asyncio application won't be
30 interrupted between calling qsize() and doing an operation on the Queue.
31 """
32
33 def __init__(self, maxsize=0):
34 self._maxsize = maxsize
35
36 # Futures.
37 self._getters = collections.deque()
38 # Futures.
39 self._putters = collections.deque()
40 self._unfinished_tasks = 0
41 self._finished = locks.Event()
42 self._finished.set()
43 self._init(maxsize)
44
45 # These three are overridable in subclasses.
46
47 def _init(self, maxsize):
48 self._queue = collections.deque()
49
50 def _get(self):
51 return self._queue.popleft()
52
53 def _put(self, item):
54 self._queue.append(item)
55
56 # End of the overridable methods.
57
58 def _wakeup_next(self, waiters):
59 # Wake up the next waiter (if any) that isn't cancelled.
60 while waiters:
61 waiter = waiters.popleft()
62 if not waiter.done():
63 waiter.set_result(None)
64 break
65
66 def __repr__(self):
67 return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
68
69 def __str__(self):
70 return f'<{type(self).__name__} {self._format()}>'
71
72 __class_getitem__ = classmethod(GenericAlias)
73
74 def _format(self):
75 result = f'maxsize={self._maxsize!r}'
76 if getattr(self, '_queue', None):
77 result += f' _queue={list(self._queue)!r}'
78 if self._getters:
79 result += f' _getters[{len(self._getters)}]'
80 if self._putters:
81 result += f' _putters[{len(self._putters)}]'
82 if self._unfinished_tasks:
83 result += f' tasks={self._unfinished_tasks}'
84 return result
85
86 def qsize(self):
87 """Number of items in the queue."""
88 return len(self._queue)
89
90 @property
91 def maxsize(self):
92 """Number of items allowed in the queue."""
93 return self._maxsize
94
95 def empty(self):
96 """Return True if the queue is empty, False otherwise."""
97 return not self._queue
98
99 def full(self):
100 """Return True if there are maxsize items in the queue.
101
102 Note: if the Queue was initialized with maxsize=0 (the default),
103 then full() is never True.
104 """
105 if self._maxsize <= 0:
106 return False
107 else:
108 return self.qsize() >= self._maxsize
109
110 async def put(self, item):
111 """Put an item into the queue.
112
113 Put an item into the queue. If the queue is full, wait until a free
114 slot is available before adding item.
115 """
116 while self.full():
117 putter = self._get_loop().create_future()
118 self._putters.append(putter)
119 try:
120 await putter
121 except:
122 putter.cancel() # Just in case putter is not done yet.
123 try:
124 # Clean self._putters from canceled putters.
125 self._putters.remove(putter)
126 except ValueError:
127 # The putter could be removed from self._putters by a
128 # previous get_nowait call.
129 pass
130 if not self.full() and not putter.cancelled():
131 # We were woken up by get_nowait(), but can't take
132 # the call. Wake up the next in line.
133 self._wakeup_next(self._putters)
134 raise
135 return self.put_nowait(item)
136
137 def put_nowait(self, item):
138 """Put an item into the queue without blocking.
139
140 If no free slot is immediately available, raise QueueFull.
141 """
142 if self.full():
143 raise QueueFull
144 self._put(item)
145 self._unfinished_tasks += 1
146 self._finished.clear()
147 self._wakeup_next(self._getters)
148
149 async def get(self):
150 """Remove and return an item from the queue.
151
152 If queue is empty, wait until an item is available.
153 """
154 while self.empty():
155 getter = self._get_loop().create_future()
156 self._getters.append(getter)
157 try:
158 await getter
159 except:
160 getter.cancel() # Just in case getter is not done yet.
161 try:
162 # Clean self._getters from canceled getters.
163 self._getters.remove(getter)
164 except ValueError:
165 # The getter could be removed from self._getters by a
166 # previous put_nowait call.
167 pass
168 if not self.empty() and not getter.cancelled():
169 # We were woken up by put_nowait(), but can't take
170 # the call. Wake up the next in line.
171 self._wakeup_next(self._getters)
172 raise
173 return self.get_nowait()
174
175 def get_nowait(self):
176 """Remove and return an item from the queue.
177
178 Return an item if one is immediately available, else raise QueueEmpty.
179 """
180 if self.empty():
181 raise QueueEmpty
182 item = self._get()
183 self._wakeup_next(self._putters)
184 return item
185
186 def task_done(self):
187 """Indicate that a formerly enqueued task is complete.
188
189 Used by queue consumers. For each get() used to fetch a task,
190 a subsequent call to task_done() tells the queue that the processing
191 on the task is complete.
192
193 If a join() is currently blocking, it will resume when all items have
194 been processed (meaning that a task_done() call was received for every
195 item that had been put() into the queue).
196
197 Raises ValueError if called more times than there were items placed in
198 the queue.
199 """
200 if self._unfinished_tasks <= 0:
201 raise ValueError('task_done() called too many times')
202 self._unfinished_tasks -= 1
203 if self._unfinished_tasks == 0:
204 self._finished.set()
205
206 async def join(self):
207 """Block until all items in the queue have been gotten and processed.
208
209 The count of unfinished tasks goes up whenever an item is added to the
210 queue. The count goes down whenever a consumer calls task_done() to
211 indicate that the item was retrieved and all work on it is complete.
212 When the count of unfinished tasks drops to zero, join() unblocks.
213 """
214 if self._unfinished_tasks > 0:
215 await self._finished.wait()
216
217
218 class ESC[4;38;5;81mPriorityQueue(ESC[4;38;5;149mQueue):
219 """A subclass of Queue; retrieves entries in priority order (lowest first).
220
221 Entries are typically tuples of the form: (priority number, data).
222 """
223
224 def _init(self, maxsize):
225 self._queue = []
226
227 def _put(self, item, heappush=heapq.heappush):
228 heappush(self._queue, item)
229
230 def _get(self, heappop=heapq.heappop):
231 return heappop(self._queue)
232
233
234 class ESC[4;38;5;81mLifoQueue(ESC[4;38;5;149mQueue):
235 """A subclass of Queue that retrieves most recently added entries first."""
236
237 def _init(self, maxsize):
238 self._queue = []
239
240 def _put(self, item):
241 self._queue.append(item)
242
243 def _get(self):
244 return self._queue.pop()