1 """Selector and proactor event loops for Windows."""
2
3 import sys
4
# Everything below relies on Windows-only modules (_overlapped, _winapi,
# msvcrt), so refuse to import on any other platform.
if sys.platform != 'win32':  # pragma: no cover
    raise ImportError('win32 only')
7
8 import _overlapped
9 import _winapi
10 import errno
11 import math
12 import msvcrt
13 import socket
14 import struct
15 import time
16 import weakref
17
18 from . import events
19 from . import base_subprocess
20 from . import futures
21 from . import exceptions
22 from . import proactor_events
23 from . import selector_events
24 from . import tasks
25 from . import windows_utils
26 from .log import logger
27
28
# Public names of this module (what ``from ... import *`` exports).
__all__ = (
    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
    'WindowsProactorEventLoopPolicy',
)
34
35
# Short aliases for the _winapi constants used throughout this module.
NULL = _winapi.NULL
INFINITE = _winapi.INFINITE
# Numeric error codes; presumably the Windows system error codes of the same
# names -- confirm against the Windows error code reference.
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100
46
47
class _OverlappedFuture(futures.Future):
    """Future tied to an overlapped operation.

    Cancelling the future cancels the overlapped operation right away.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        # Trim the most recent entry of the recorded creation traceback
        # (presumably the frame of this constructor -- confirm).
        if self._source_traceback:
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        """Extend the base repr info with the state of the overlapped."""
        info = super()._repr_info()
        ov = self._ov
        if ov is None:
            return info
        state = 'pending' if ov.pending else 'completed'
        info.insert(1, f'overlapped=<{state}, {ov.address:#x}>')
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation, if any, and forget it.

        An OSError raised by the cancellation is reported through the loop's
        exception handler instead of being propagated.
        """
        ov = self._ov
        if ov is None:
            return
        try:
            ov.cancel()
        except OSError as exc:
            context = dict(
                message='Cancelling an overlapped future failed',
                exception=exc,
                future=self,
            )
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self._ov = None

    def cancel(self, msg=None):
        # Cancel the overlapped operation first, then the future itself.
        self._cancel_overlapped()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        # Store the exception first, then cancel the overlapped operation.
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        # The operation completed: only the reference has to be dropped.
        super().set_result(result)
        self._ov = None
94
95
class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle.

    The future stores the Overlapped object, the waited handle and the
    registered wait handle, and unregisters the wait before the future is
    cancelled or completed.
    """

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is signaled, without blocking."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Extend the base repr info with the handle and wait handle."""
        info = super()._repr_info()
        # Format (and poll) the handle only when there is one: applying the
        # '#x' format spec to None would raise TypeError.
        if self._handle is not None:
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object, so drop the reference that kept it alive.
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle; do nothing if already unregistered.

        An OSError other than ERROR_IO_PENDING is reported through the loop's
        exception handler, and _unregister_wait_cb() is not called.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self, msg=None):
        # Unregister the wait before cancelling the future.
        self._unregister_wait()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        # Unregister the wait before storing the exception.
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        # Unregister the wait before storing the result.
        self._unregister_wait()
        super().set_result(result)
168
169
class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Subclass of Future which represents a wait for the cancellation of a
    _WaitHandleFuture using an event.

    An optional ``_done_callback`` attribute is called synchronously with
    this future as soon as a result or an exception is set.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)

        # Callable invoked directly from set_result()/set_exception(), or
        # None when no callback was installed.
        self._done_callback = None

    def cancel(self, msg=None):
        # Accept the same optional 'msg' argument as the other cancel()
        # methods so that every call form fails with the intended
        # RuntimeError rather than a TypeError about the signature.
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        super().set_result(result)
        if self._done_callback is not None:
            self._done_callback(self)

    def set_exception(self, exception):
        super().set_exception(exception)
        if self._done_callback is not None:
            self._done_callback(self)
192
193
class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Wait-handle future whose unregistration is tracked by a proactor.

    Unregistering the wait goes through UnregisterWaitEx() with an event;
    the proactor is asked to wait for that event and then calls
    _unregister_wait_cb() to release the remaining resources.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        # Event handed to UnregisterWaitEx() by _unregister_wait().
        self._event = _overlapped.CreateEvent(None, True, False, None)
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        event = self._event
        if event is not None:
            _winapi.CloseHandle(event)
            self._event = None
            self._event_fut = None

        # If the wait was cancelled, the wait may never be signalled, so
        # it's required to unregister it. Otherwise, IocpProactor.close() will
        # wait forever for an event which will never come.
        #
        # If the IocpProactor already received the event, it's safe to call
        # _unregister() because we kept a reference to the Overlapped object
        # which is used as a unique key.
        proactor = self._proactor
        proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        if not self._registered:
            return
        self._registered = False

        wait_handle, self._wait_handle = self._wait_handle, None
        try:
            _overlapped.UnregisterWaitEx(wait_handle, self._event)
        except OSError as exc:
            # ERROR_IO_PENDING is not an error, the wait was unregistered
            unregister_pending = (
                exc.winerror == _overlapped.ERROR_IO_PENDING)
            if not unregister_pending:
                context = dict(
                    message='Failed to unregister the wait handle',
                    exception=exc,
                    future=self,
                )
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return

        # Let the proactor wait for the event, then finish the cleanup.
        self._event_fut = self._proactor._wait_cancel(
            self._event, self._unregister_wait_cb)
244
245
class PipeServer:
    """Server side of a named pipe.

    It plays the role a bound, listening socket plays for sockets.
    """

    def __init__(self, address):
        self._address = address
        self._free_instances = weakref.WeakSet()
        # _pipe must exist before _server_pipe_handle() is called: that call
        # can raise, and the destructor runs close(), which reads the
        # attribute.
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Return the current pipe instance and create its replacement."""
        # A new instance is created before the previous one is handed out, so
        # (until the server is closed) at least one pipe handle always exists
        # for the address and a connecting client does not fail with
        # FileNotFoundError.
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        """Return a wrapper for a new pipe handle, or None once closed."""
        if self.closed():
            return None
        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        handle = _winapi.CreateNamedPipe(
            self._address, open_mode, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        pipe = windows_utils.PipeHandle(handle)
        self._free_instances.add(pipe)
        return pipe

    def closed(self):
        """Return True once close() has cleared the address."""
        return self._address is None

    def close(self):
        """Cancel the pending accept and close the unconnected instances."""
        pending_accept = self._accept_pipe_future
        if pending_accept is not None:
            pending_accept.cancel()
            self._accept_pipe_future = None
        if self._address is None:
            return
        # Close all instances which have not been connected to by a client.
        for pipe in self._free_instances:
            pipe.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
303
304
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop.

    Adds nothing to BaseSelectorEventLoop; it only gives the selector loop
    its own class in this module.
    """
307
308
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, built on an IOCP proactor."""

    def __init__(self, proactor=None):
        # Fall back to a fresh IocpProactor when none is supplied.
        super().__init__(IocpProactor() if proactor is None else proactor)

    def run_forever(self):
        try:
            assert self._self_reading_future is None
            self.call_soon(self._loop_self_reading)
            super().run_forever()
        finally:
            reading_future = self._self_reading_future
            if reading_future is not None:
                ov = reading_future._ov
                reading_future.cancel()
                # The future was just cancelled, so if it has not finished
                # yet it never will (it may already have finished with its
                # callback still queued, in which case the callback can
                # still run if the event loop is restarted).  Unregister it,
                # otherwise IocpProactor.close() would wait for it forever.
                if ov is not None:
                    self._proactor._unregister(ov)
                self._self_reading_future = None

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at *address*; return (transport, protocol)."""
        pipe = await self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        return transport, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Serve the pipe at *address*; return a list holding the server."""
        server = PipeServer(address)

        def loop_accept_pipe(fut=None):
            # Called once directly to start accepting, then again as the
            # done callback of every accept future.
            pipe = None
            try:
                if fut:
                    pipe = fut.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                fut = self._proactor.accept_pipe(pipe)
            except BrokenPipeError:
                if pipe and pipe.fileno() != -1:
                    pipe.close()
                self.call_soon(loop_accept_pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
                self.call_soon(loop_accept_pipe)
            except exceptions.CancelledError:
                if pipe:
                    pipe.close()
            else:
                server._accept_pipe_future = fut
                fut.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until its waiter is done."""
        waiter = self.create_future()
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell, stdin, stdout, stderr, bufsize,
            waiter=waiter, extra=extra, **kwargs)
        try:
            await waiter
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Any other failure: close the transport and wait for it before
            # re-raising.
            transport.close()
            await transport._wait()
            raise

        return transport
413
414
class IocpProactor:
    """Proactor implementation using IOCP.

    The public methods (recv(), send(), accept(), ...) start an overlapped
    operation and return a future for its result.  Pending operations are
    kept in ``self._cache``, keyed by the address of their Overlapped object,
    and are completed by _poll() when the completion port reports them.
    """

    def __init__(self, concurrency=INFINITE):
        self._loop = None
        # Futures completed by _poll() and not yet returned by select()
        self._results = []
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
        # ov.address -> (future, ov, obj, callback)
        self._cache = {}
        # Objects already associated with the completion port
        self._registered = weakref.WeakSet()
        # Overlapped objects to drop from the cache at the end of _poll()
        self._unregistered = []
        self._stopped_serving = weakref.WeakSet()

    def _check_closed(self):
        """Raise RuntimeError if the proactor has been closed."""
        if self._iocp is None:
            raise RuntimeError('IocpProactor is closed')

    def __repr__(self):
        info = [f'overlapped#={len(self._cache)}',
                f'result#={len(self._results)}']
        if self._iocp is None:
            info.append('closed')
        return f'<{self.__class__.__name__} {" ".join(info)}>'

    def set_loop(self, loop):
        """Set the event loop used to create futures and report errors."""
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of completed futures and reset it.

        The completion port is polled (with *timeout*) only when no result
        is already available.
        """
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        try:
            return tmp
        finally:
            # Needed to break cycles when an exception occurs.
            tmp = None

    def _result(self, value):
        """Return a new future of the loop already completed with *value*."""
        fut = self._loop.create_future()
        fut.set_result(value)
        return fut

    @staticmethod
    def _finish_socket_func(trans, key, ov):
        """Completion callback shared by the receive/send style operations.

        Return ov.getresult().  An OSError whose winerror is
        ERROR_NETNAME_DELETED or ERROR_OPERATION_ABORTED is converted to
        ConnectionResetError; any other OSError propagates unchanged.
        """
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                                _overlapped.ERROR_OPERATION_ABORTED):
                raise ConnectionResetError(*exc.args)
            else:
                raise

    def recv(self, conn, nbytes, flags=0):
        """Start an overlapped receive on *conn* and return its future.

        Sockets use WSARecv(), other objects use ReadFile().  If starting
        the operation raises BrokenPipeError, the returned future is already
        completed with b''.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecv(conn.fileno(), nbytes, flags)
            else:
                ov.ReadFile(conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        return self._register(ov, conn, self._finish_socket_func)

    def recv_into(self, conn, buf, flags=0):
        """Start an overlapped receive into *buf* and return its future.

        Sockets use WSARecvInto(), other objects use ReadFileInto().  If
        starting the operation raises BrokenPipeError, the returned future is
        already completed with 0.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecvInto(conn.fileno(), buf, flags)
            else:
                ov.ReadFileInto(conn.fileno(), buf)
        except BrokenPipeError:
            return self._result(0)

        return self._register(ov, conn, self._finish_socket_func)

    def recvfrom(self, conn, nbytes, flags=0):
        """Start an overlapped WSARecvFrom() on *conn*; return its future.

        If starting the operation raises BrokenPipeError, the returned future
        is already completed with (b'', None).
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            ov.WSARecvFrom(conn.fileno(), nbytes, flags)
        except BrokenPipeError:
            return self._result((b'', None))

        return self._register(ov, conn, self._finish_socket_func)

    def recvfrom_into(self, conn, buf, flags=0):
        """Start an overlapped WSARecvFromInto() on *conn*; return its future.

        If starting the operation raises BrokenPipeError, the returned future
        is already completed with (0, None).
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            ov.WSARecvFromInto(conn.fileno(), buf, flags)
        except BrokenPipeError:
            return self._result((0, None))

        return self._register(ov, conn, self._finish_socket_func)

    def sendto(self, conn, buf, flags=0, addr=None):
        """Start an overlapped WSASendTo() on *conn*; return its future."""
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)

        ov.WSASendTo(conn.fileno(), buf, flags, addr)

        return self._register(ov, conn, self._finish_socket_func)

    def send(self, conn, buf, flags=0):
        """Start an overlapped send of *buf* on *conn*; return its future.

        Sockets use WSASend(), other objects use WriteFile().
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        return self._register(ov, conn, self._finish_socket_func)

    def accept(self, listener):
        """Start an overlapped AcceptEx() on *listener*; return its future.

        The result of the future is ``(conn, conn.getpeername())``.  A task
        is also scheduled to close the accept socket if the future is
        cancelled.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        async def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                await future
            except exceptions.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Connect *conn* to *address* and return a future.

        For SOCK_DGRAM sockets the future is already completed with None;
        otherwise it completes with *conn* once ConnectEx() has finished.
        """
        if conn.type == socket.SOCK_DGRAM:
            # WSAConnect will complete immediately for UDP sockets so we don't
            # need to register any IOCP operation
            _overlapped.WSAConnect(conn.fileno(), address)
            return self._result(None)

        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def sendfile(self, sock, file, offset, count):
        """Start an overlapped TransmitFile() of *file* on *sock*.

        *offset* is split into its low and high 32-bit halves for the call.
        Return a future for the result of the operation.
        """
        self._register_with_iocp(sock)
        ov = _overlapped.Overlapped(NULL)
        offset_low = offset & 0xffff_ffff
        offset_high = (offset >> 32) & 0xffff_ffff
        ov.TransmitFile(sock.fileno(),
                        msvcrt.get_osfhandle(file.fileno()),
                        offset_low, offset_high,
                        count, 0, 0)

        return self._register(ov, sock, self._finish_socket_func)

    def accept_pipe(self, pipe):
        """Wait for a client to connect to *pipe*; return a future.

        The result of the future is *pipe* itself.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    async def connect_pipe(self, address):
        """Connect to the pipe at *address* and return a PipeHandle.

        While the connection fails with ERROR_PIPE_BUSY, retry after a sleep
        whose delay doubles each time, capped at CONNECT_PIPE_MAX_DELAY.  Any
        other OSError is propagated.
        """
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to
            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY.
            try:
                handle = _overlapped.ConnectPipe(address)
                break
            except OSError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
            await tasks.sleep(delay)

        return windows_utils.PipeHandle(handle)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait for *event* and call *done_callback* when the wait is done."""
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        """Register a wait on *handle* and return the matching future.

        A _WaitCancelFuture is created when *_is_cancel* is true, otherwise a
        _WaitHandleFuture.  *timeout* is in seconds; None waits forever.
        """
        self._check_closed()

        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        # To get notifications of finished ops on this object sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        """Track the overlapped operation *ov* and return its future."""
        self._check_closed()

        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later.  Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        self._check_closed()
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        """Return a new socket of *family* with its timeout set to 0."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Process completion events and complete the matching futures.

        *timeout* is in seconds (None waits forever) and applies to the
        first event only; later events are fetched without waiting.  Raise
        ValueError for a negative or too large timeout.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            # Only the first call may block: drain the rest without waiting.
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)
                finally:
                    f = None

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

    def _stop_serving(self, obj):
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel pending operations, wait for them, and close the port.

        Calling close() again after the proactor is closed does nothing.
        """
        if self._iocp is None:
            # already closed
            return

        # Cancel remaining registered operations.
        for fut, ov, obj, callback in list(self._cache.values()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                pass
            elif isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                pass
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        # Wait until all cancelled overlapped complete: don't exit with running
        # overlapped to prevent a crash. Display progress every second if the
        # loop is still running.
        msg_update = 1.0
        start_time = time.monotonic()
        next_msg = start_time + msg_update
        while self._cache:
            if next_msg <= time.monotonic():
                logger.debug('%r is running after closing for %.1f seconds',
                             self, time.monotonic() - start_time)
                next_msg = time.monotonic() + msg_update

            # handle a few events, or timeout
            self._poll(msg_update)

        self._results = []

        _winapi.CloseHandle(self._iocp)
        self._iocp = None

    def __del__(self):
        self.close()
916
917
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport notified of process exit through the proactor."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the process and arrange to be told when its handle fires."""
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def on_process_exit(fut):
            # Read the return code and hand it to the transport.
            self._process_exited(self._proc.poll())

        process_handle = int(self._proc._handle)
        exit_future = self._loop._proactor.wait_for_handle(process_handle)
        exit_future.add_done_callback(on_process_exit)
931
932
# Public name of the selector-based event loop class of this module.
SelectorEventLoop = _WindowsSelectorEventLoop
934
935
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    # Loop class associated with this policy; it is consumed by the base
    # policy class (presumably to create new event loops -- confirm in the
    # events module).
    _loop_factory = SelectorEventLoop
938
939
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    # Loop class associated with this policy; it is consumed by the base
    # policy class (presumably to create new event loops -- confirm in the
    # events module).
    _loop_factory = ProactorEventLoop
942
943
# The proactor-based policy is the one exported as the default policy.
DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy