python (3.11.7)
1 """A Future class similar to the one in PEP 3148."""
2
# Public names exported by this module.
__all__ = ('Future', 'wrap_future', 'isfuture')
6
7 import concurrent.futures
8 import contextvars
9 import logging
10 import sys
11 from types import GenericAlias
12
13 from . import base_futures
14 from . import events
15 from . import exceptions
16 from . import format_helpers
17
18
# Expose the base_futures predicate under this module's namespace.
isfuture = base_futures.isfuture


# Local aliases for the three state markers defined in base_futures.
_PENDING, _CANCELLED, _FINISHED = (
    base_futures._PENDING,
    base_futures._CANCELLED,
    base_futures._FINISHED,
)


# Logging level one step below DEBUG, for heavy-duty debugging.
STACK_DEBUG = logging.DEBUG - 1
28
29
30 class ESC[4;38;5;81mFuture:
31 """This class is *almost* compatible with concurrent.futures.Future.
32
33 Differences:
34
35 - This class is not thread-safe.
36
37 - result() and exception() do not take a timeout argument and
38 raise an exception when the future isn't done yet.
39
40 - Callbacks registered with add_done_callback() are always called
41 via the event loop's call_soon().
42
43 - This class is not compatible with the wait() and as_completed()
44 methods in the concurrent.futures package.
45
46 (In Python 3.4 or later we may be able to unify the implementations.)
47 """
48
49 # Class variables serving as defaults for instance variables.
50 _state = _PENDING
51 _result = None
52 _exception = None
53 _loop = None
54 _source_traceback = None
55 _cancel_message = None
56 # A saved CancelledError for later chaining as an exception context.
57 _cancelled_exc = None
58
59 # This field is used for a dual purpose:
60 # - Its presence is a marker to declare that a class implements
61 # the Future protocol (i.e. is intended to be duck-type compatible).
62 # The value must also be not-None, to enable a subclass to declare
63 # that it is not compatible by setting this to None.
64 # - It is set by __iter__() below so that Task._step() can tell
65 # the difference between
66 # `await Future()` or`yield from Future()` (correct) vs.
67 # `yield Future()` (incorrect).
68 _asyncio_future_blocking = False
69
70 __log_traceback = False
71
72 def __init__(self, *, loop=None):
73 """Initialize the future.
74
75 The optional event_loop argument allows explicitly setting the event
76 loop object used by the future. If it's not provided, the future uses
77 the default event loop.
78 """
79 if loop is None:
80 self._loop = events._get_event_loop()
81 else:
82 self._loop = loop
83 self._callbacks = []
84 if self._loop.get_debug():
85 self._source_traceback = format_helpers.extract_stack(
86 sys._getframe(1))
87
88 def __repr__(self):
89 return base_futures._future_repr(self)
90
91 def __del__(self):
92 if not self.__log_traceback:
93 # set_exception() was not called, or result() or exception()
94 # has consumed the exception
95 return
96 exc = self._exception
97 context = {
98 'message':
99 f'{self.__class__.__name__} exception was never retrieved',
100 'exception': exc,
101 'future': self,
102 }
103 if self._source_traceback:
104 context['source_traceback'] = self._source_traceback
105 self._loop.call_exception_handler(context)
106
107 __class_getitem__ = classmethod(GenericAlias)
108
109 @property
110 def _log_traceback(self):
111 return self.__log_traceback
112
113 @_log_traceback.setter
114 def _log_traceback(self, val):
115 if val:
116 raise ValueError('_log_traceback can only be set to False')
117 self.__log_traceback = False
118
119 def get_loop(self):
120 """Return the event loop the Future is bound to."""
121 loop = self._loop
122 if loop is None:
123 raise RuntimeError("Future object is not initialized.")
124 return loop
125
126 def _make_cancelled_error(self):
127 """Create the CancelledError to raise if the Future is cancelled.
128
129 This should only be called once when handling a cancellation since
130 it erases the saved context exception value.
131 """
132 if self._cancelled_exc is not None:
133 exc = self._cancelled_exc
134 self._cancelled_exc = None
135 return exc
136
137 if self._cancel_message is None:
138 exc = exceptions.CancelledError()
139 else:
140 exc = exceptions.CancelledError(self._cancel_message)
141 exc.__context__ = self._cancelled_exc
142 # Remove the reference since we don't need this anymore.
143 self._cancelled_exc = None
144 return exc
145
146 def cancel(self, msg=None):
147 """Cancel the future and schedule callbacks.
148
149 If the future is already done or cancelled, return False. Otherwise,
150 change the future's state to cancelled, schedule the callbacks and
151 return True.
152 """
153 self.__log_traceback = False
154 if self._state != _PENDING:
155 return False
156 self._state = _CANCELLED
157 self._cancel_message = msg
158 self.__schedule_callbacks()
159 return True
160
161 def __schedule_callbacks(self):
162 """Internal: Ask the event loop to call all callbacks.
163
164 The callbacks are scheduled to be called as soon as possible. Also
165 clears the callback list.
166 """
167 callbacks = self._callbacks[:]
168 if not callbacks:
169 return
170
171 self._callbacks[:] = []
172 for callback, ctx in callbacks:
173 self._loop.call_soon(callback, self, context=ctx)
174
175 def cancelled(self):
176 """Return True if the future was cancelled."""
177 return self._state == _CANCELLED
178
179 # Don't implement running(); see http://bugs.python.org/issue18699
180
181 def done(self):
182 """Return True if the future is done.
183
184 Done means either that a result / exception are available, or that the
185 future was cancelled.
186 """
187 return self._state != _PENDING
188
189 def result(self):
190 """Return the result this future represents.
191
192 If the future has been cancelled, raises CancelledError. If the
193 future's result isn't yet available, raises InvalidStateError. If
194 the future is done and has an exception set, this exception is raised.
195 """
196 if self._state == _CANCELLED:
197 exc = self._make_cancelled_error()
198 raise exc
199 if self._state != _FINISHED:
200 raise exceptions.InvalidStateError('Result is not ready.')
201 self.__log_traceback = False
202 if self._exception is not None:
203 raise self._exception.with_traceback(self._exception_tb)
204 return self._result
205
206 def exception(self):
207 """Return the exception that was set on this future.
208
209 The exception (or None if no exception was set) is returned only if
210 the future is done. If the future has been cancelled, raises
211 CancelledError. If the future isn't done yet, raises
212 InvalidStateError.
213 """
214 if self._state == _CANCELLED:
215 exc = self._make_cancelled_error()
216 raise exc
217 if self._state != _FINISHED:
218 raise exceptions.InvalidStateError('Exception is not set.')
219 self.__log_traceback = False
220 return self._exception
221
222 def add_done_callback(self, fn, *, context=None):
223 """Add a callback to be run when the future becomes done.
224
225 The callback is called with a single argument - the future object. If
226 the future is already done when this is called, the callback is
227 scheduled with call_soon.
228 """
229 if self._state != _PENDING:
230 self._loop.call_soon(fn, self, context=context)
231 else:
232 if context is None:
233 context = contextvars.copy_context()
234 self._callbacks.append((fn, context))
235
236 # New method not in PEP 3148.
237
238 def remove_done_callback(self, fn):
239 """Remove all instances of a callback from the "call when done" list.
240
241 Returns the number of callbacks removed.
242 """
243 filtered_callbacks = [(f, ctx)
244 for (f, ctx) in self._callbacks
245 if f != fn]
246 removed_count = len(self._callbacks) - len(filtered_callbacks)
247 if removed_count:
248 self._callbacks[:] = filtered_callbacks
249 return removed_count
250
251 # So-called internal methods (note: no set_running_or_notify_cancel()).
252
253 def set_result(self, result):
254 """Mark the future done and set its result.
255
256 If the future is already done when this method is called, raises
257 InvalidStateError.
258 """
259 if self._state != _PENDING:
260 raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
261 self._result = result
262 self._state = _FINISHED
263 self.__schedule_callbacks()
264
265 def set_exception(self, exception):
266 """Mark the future done and set an exception.
267
268 If the future is already done when this method is called, raises
269 InvalidStateError.
270 """
271 if self._state != _PENDING:
272 raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
273 if isinstance(exception, type):
274 exception = exception()
275 if type(exception) is StopIteration:
276 raise TypeError("StopIteration interacts badly with generators "
277 "and cannot be raised into a Future")
278 self._exception = exception
279 self._exception_tb = exception.__traceback__
280 self._state = _FINISHED
281 self.__schedule_callbacks()
282 self.__log_traceback = True
283
284 def __await__(self):
285 if not self.done():
286 self._asyncio_future_blocking = True
287 yield self # This tells Task to wait for completion.
288 if not self.done():
289 raise RuntimeError("await wasn't used with future")
290 return self.result() # May raise too.
291
292 __iter__ = __await__ # make compatible with 'yield from'.
293
294
# Needed for testing purposes: keeps a handle on the pure-Python class,
# because the name `Future` is rebound to the C implementation at the end
# of this module whenever `_asyncio` can be imported.
_PyFuture = Future
297
298
def _get_loop(fut):
    """Return the event loop of *fut*.

    Calls fut.get_loop() when that attribute exists; otherwise falls back
    to reading the old '_loop' attribute.
    """
    try:
        loop_getter = fut.get_loop
    except AttributeError:
        has_getter = False
    else:
        has_getter = True
    # The fallback read happens outside the except clause on purpose, so an
    # AttributeError from `_loop` is not chained to the one caught above.
    return loop_getter() if has_getter else fut._loop
309
310
def _set_result_unless_cancelled(fut, result):
    """Store *result* on *fut*, doing nothing if *fut* was cancelled."""
    if not fut.cancelled():
        fut.set_result(result)
316
317
def _convert_future_exc(exc):
    """Translate a concurrent.futures exception into its counterpart here.

    Only an exact type match is converted (subclasses are not); the new
    exception is built from the same args.  Any other exception is
    returned unchanged.
    """
    exc_class = type(exc)
    conversions = (
        (concurrent.futures.CancelledError, exceptions.CancelledError),
        (concurrent.futures.TimeoutError, exceptions.TimeoutError),
        (concurrent.futures.InvalidStateError, exceptions.InvalidStateError),
    )
    for foreign_class, local_class in conversions:
        if exc_class is foreign_class:
            return local_class(*exc.args)
    return exc
328
329
def _set_concurrent_future_state(concurrent, source):
    """Copy the outcome of *source* onto a concurrent.futures.Future.

    *source* must already be done.  A cancelled source cancels
    *concurrent*.  Nothing more is copied when
    concurrent.set_running_or_notify_cancel() returns false.
    """
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    if concurrent.set_running_or_notify_cancel():
        error = source.exception()
        if error is None:
            concurrent.set_result(source.result())
        else:
            concurrent.set_exception(_convert_future_exc(error))
343
344
def _copy_future_state(source, dest):
    """Internal helper: transfer the outcome of *source* to *dest*.

    *source* must be done and may be a concurrent.futures.Future.  A
    *dest* that is already cancelled is left untouched.
    """
    assert source.done()
    if dest.cancelled():
        return
    assert not dest.done()
    if source.cancelled():
        dest.cancel()
        return
    error = source.exception()
    if error is None:
        dest.set_result(source.result())
    else:
        dest.set_exception(_convert_future_exc(error))
363
364
def _chain_future(source, destination):
    """Link two futures so that completing one completes the other.

    The result (or exception) of *source* is copied to *destination*.
    Cancelling *destination* cancels *source* as well.  Both arguments may
    be either futures accepted by isfuture() or concurrent.futures.Future
    instances; anything else raises TypeError.
    """
    if not (isfuture(source) or
            isinstance(source, concurrent.futures.Future)):
        raise TypeError('A future is required for source argument')
    if not (isfuture(destination) or
            isinstance(destination, concurrent.futures.Future)):
        raise TypeError('A future is required for destination argument')

    # Only futures accepted by isfuture() have an event loop to look up.
    if isfuture(source):
        source_loop = _get_loop(source)
    else:
        source_loop = None
    if isfuture(destination):
        dest_loop = _get_loop(destination)
    else:
        dest_loop = None

    def _set_state(future, other):
        # Note the argument order differs between the two helpers.
        if not isfuture(future):
            _set_concurrent_future_state(future, other)
            return
        _copy_future_state(other, future)

    def _call_check_cancel(destination):
        # Done-callback of the destination: propagate a cancellation back.
        if not destination.cancelled():
            return
        if source_loop is None or source_loop is dest_loop:
            source.cancel()
        else:
            source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(source):
        # Done-callback of the source: push its outcome to the destination.
        if (destination.cancelled() and
                dest_loop is not None and dest_loop.is_closed()):
            return
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, source)
        elif not dest_loop.is_closed():
            dest_loop.call_soon_threadsafe(_set_state, destination, source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)
407
408
def wrap_future(future, *, loop=None):
    """Wrap a concurrent.futures.Future object.

    An object accepted by isfuture() is returned as is.  Otherwise a new
    future is created on *loop* (events._get_event_loop() when *loop* is
    None), chained to *future*, and returned.
    """
    if isfuture(future):
        return future
    assert isinstance(future, concurrent.futures.Future), \
        f'concurrent.futures.Future is expected, got {future!r}'
    target_loop = events._get_event_loop() if loop is None else loop
    wrapped = target_loop.create_future()
    _chain_future(future, wrapped)
    return wrapped
420
421
try:
    import _asyncio
except ImportError:
    # The C accelerator module is unavailable; leave `Future` as it is.
    pass
else:
    # _CFuture is needed for tests.
    _CFuture = _asyncio.Future
    Future = _CFuture