(root)/
Python-3.11.7/
Lib/
asyncio/
windows_events.py
       1  """Selector and proactor event loops for Windows."""
       2  
       3  import sys
       4  
       5  if sys.platform != 'win32':  # pragma: no cover
       6      raise ImportError('win32 only')
       7  
       8  import _overlapped
       9  import _winapi
      10  import errno
      11  import math
      12  import msvcrt
      13  import socket
      14  import struct
      15  import time
      16  import weakref
      17  
      18  from . import events
      19  from . import base_subprocess
      20  from . import futures
      21  from . import exceptions
      22  from . import proactor_events
      23  from . import selector_events
      24  from . import tasks
      25  from . import windows_utils
      26  from .log import logger
      27  
      28  
# Names exported by ``from asyncio.windows_events import *``.
__all__ = (
    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
    'WindowsProactorEventLoopPolicy',
)


# Short aliases for the _winapi constants used throughout this module.
NULL = _winapi.NULL
INFINITE = _winapi.INFINITE
# Numeric Windows system error codes (compared against OSError.winerror).
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100
      46  
      47  
      48  class ESC[4;38;5;81m_OverlappedFuture(ESC[4;38;5;149mfuturesESC[4;38;5;149m.ESC[4;38;5;149mFuture):
      49      """Subclass of Future which represents an overlapped operation.
      50  
      51      Cancelling it will immediately cancel the overlapped operation.
      52      """
      53  
      54      def __init__(self, ov, *, loop=None):
      55          super().__init__(loop=loop)
      56          if self._source_traceback:
      57              del self._source_traceback[-1]
      58          self._ov = ov
      59  
      60      def _repr_info(self):
      61          info = super()._repr_info()
      62          if self._ov is not None:
      63              state = 'pending' if self._ov.pending else 'completed'
      64              info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
      65          return info
      66  
      67      def _cancel_overlapped(self):
      68          if self._ov is None:
      69              return
      70          try:
      71              self._ov.cancel()
      72          except OSError as exc:
      73              context = {
      74                  'message': 'Cancelling an overlapped future failed',
      75                  'exception': exc,
      76                  'future': self,
      77              }
      78              if self._source_traceback:
      79                  context['source_traceback'] = self._source_traceback
      80              self._loop.call_exception_handler(context)
      81          self._ov = None
      82  
      83      def cancel(self, msg=None):
      84          self._cancel_overlapped()
      85          return super().cancel(msg=msg)
      86  
      87      def set_exception(self, exception):
      88          super().set_exception(exception)
      89          self._cancel_overlapped()
      90  
      91      def set_result(self, result):
      92          super().set_result(result)
      93          self._ov = None
      94  
      95  
      96  class ESC[4;38;5;81m_BaseWaitHandleFuture(ESC[4;38;5;149mfuturesESC[4;38;5;149m.ESC[4;38;5;149mFuture):
      97      """Subclass of Future which represents a wait handle."""
      98  
      99      def __init__(self, ov, handle, wait_handle, *, loop=None):
     100          super().__init__(loop=loop)
     101          if self._source_traceback:
     102              del self._source_traceback[-1]
     103          # Keep a reference to the Overlapped object to keep it alive until the
     104          # wait is unregistered
     105          self._ov = ov
     106          self._handle = handle
     107          self._wait_handle = wait_handle
     108  
     109          # Should we call UnregisterWaitEx() if the wait completes
     110          # or is cancelled?
     111          self._registered = True
     112  
     113      def _poll(self):
     114          # non-blocking wait: use a timeout of 0 millisecond
     115          return (_winapi.WaitForSingleObject(self._handle, 0) ==
     116                  _winapi.WAIT_OBJECT_0)
     117  
     118      def _repr_info(self):
     119          info = super()._repr_info()
     120          info.append(f'handle={self._handle:#x}')
     121          if self._handle is not None:
     122              state = 'signaled' if self._poll() else 'waiting'
     123              info.append(state)
     124          if self._wait_handle is not None:
     125              info.append(f'wait_handle={self._wait_handle:#x}')
     126          return info
     127  
     128      def _unregister_wait_cb(self, fut):
     129          # The wait was unregistered: it's not safe to destroy the Overlapped
     130          # object
     131          self._ov = None
     132  
     133      def _unregister_wait(self):
     134          if not self._registered:
     135              return
     136          self._registered = False
     137  
     138          wait_handle = self._wait_handle
     139          self._wait_handle = None
     140          try:
     141              _overlapped.UnregisterWait(wait_handle)
     142          except OSError as exc:
     143              if exc.winerror != _overlapped.ERROR_IO_PENDING:
     144                  context = {
     145                      'message': 'Failed to unregister the wait handle',
     146                      'exception': exc,
     147                      'future': self,
     148                  }
     149                  if self._source_traceback:
     150                      context['source_traceback'] = self._source_traceback
     151                  self._loop.call_exception_handler(context)
     152                  return
     153              # ERROR_IO_PENDING means that the unregister is pending
     154  
     155          self._unregister_wait_cb(None)
     156  
     157      def cancel(self, msg=None):
     158          self._unregister_wait()
     159          return super().cancel(msg=msg)
     160  
     161      def set_exception(self, exception):
     162          self._unregister_wait()
     163          super().set_exception(exception)
     164  
     165      def set_result(self, result):
     166          self._unregister_wait()
     167          super().set_result(result)
     168  
     169  
     170  class ESC[4;38;5;81m_WaitCancelFuture(ESC[4;38;5;149m_BaseWaitHandleFuture):
     171      """Subclass of Future which represents a wait for the cancellation of a
     172      _WaitHandleFuture using an event.
     173      """
     174  
     175      def __init__(self, ov, event, wait_handle, *, loop=None):
     176          super().__init__(ov, event, wait_handle, loop=loop)
     177  
     178          self._done_callback = None
     179  
     180      def cancel(self):
     181          raise RuntimeError("_WaitCancelFuture must not be cancelled")
     182  
     183      def set_result(self, result):
     184          super().set_result(result)
     185          if self._done_callback is not None:
     186              self._done_callback(self)
     187  
     188      def set_exception(self, exception):
     189          super().set_exception(exception)
     190          if self._done_callback is not None:
     191              self._done_callback(self)
     192  
     193  
     194  class ESC[4;38;5;81m_WaitHandleFuture(ESC[4;38;5;149m_BaseWaitHandleFuture):
     195      def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
     196          super().__init__(ov, handle, wait_handle, loop=loop)
     197          self._proactor = proactor
     198          self._unregister_proactor = True
     199          self._event = _overlapped.CreateEvent(None, True, False, None)
     200          self._event_fut = None
     201  
     202      def _unregister_wait_cb(self, fut):
     203          if self._event is not None:
     204              _winapi.CloseHandle(self._event)
     205              self._event = None
     206              self._event_fut = None
     207  
     208          # If the wait was cancelled, the wait may never be signalled, so
     209          # it's required to unregister it. Otherwise, IocpProactor.close() will
     210          # wait forever for an event which will never come.
     211          #
     212          # If the IocpProactor already received the event, it's safe to call
     213          # _unregister() because we kept a reference to the Overlapped object
     214          # which is used as a unique key.
     215          self._proactor._unregister(self._ov)
     216          self._proactor = None
     217  
     218          super()._unregister_wait_cb(fut)
     219  
     220      def _unregister_wait(self):
     221          if not self._registered:
     222              return
     223          self._registered = False
     224  
     225          wait_handle = self._wait_handle
     226          self._wait_handle = None
     227          try:
     228              _overlapped.UnregisterWaitEx(wait_handle, self._event)
     229          except OSError as exc:
     230              if exc.winerror != _overlapped.ERROR_IO_PENDING:
     231                  context = {
     232                      'message': 'Failed to unregister the wait handle',
     233                      'exception': exc,
     234                      'future': self,
     235                  }
     236                  if self._source_traceback:
     237                      context['source_traceback'] = self._source_traceback
     238                  self._loop.call_exception_handler(context)
     239                  return
     240              # ERROR_IO_PENDING is not an error, the wait was unregistered
     241  
     242          self._event_fut = self._proactor._wait_cancel(self._event,
     243                                                        self._unregister_wait_cb)
     244  
     245  
     246  class ESC[4;38;5;81mPipeServer(ESC[4;38;5;149mobject):
     247      """Class representing a pipe server.
     248  
     249      This is much like a bound, listening socket.
     250      """
     251      def __init__(self, address):
     252          self._address = address
     253          self._free_instances = weakref.WeakSet()
     254          # initialize the pipe attribute before calling _server_pipe_handle()
     255          # because this function can raise an exception and the destructor calls
     256          # the close() method
     257          self._pipe = None
     258          self._accept_pipe_future = None
     259          self._pipe = self._server_pipe_handle(True)
     260  
     261      def _get_unconnected_pipe(self):
     262          # Create new instance and return previous one.  This ensures
     263          # that (until the server is closed) there is always at least
     264          # one pipe handle for address.  Therefore if a client attempt
     265          # to connect it will not fail with FileNotFoundError.
     266          tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
     267          return tmp
     268  
     269      def _server_pipe_handle(self, first):
     270          # Return a wrapper for a new pipe handle.
     271          if self.closed():
     272              return None
     273          flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
     274          if first:
     275              flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
     276          h = _winapi.CreateNamedPipe(
     277              self._address, flags,
     278              _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
     279              _winapi.PIPE_WAIT,
     280              _winapi.PIPE_UNLIMITED_INSTANCES,
     281              windows_utils.BUFSIZE, windows_utils.BUFSIZE,
     282              _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
     283          pipe = windows_utils.PipeHandle(h)
     284          self._free_instances.add(pipe)
     285          return pipe
     286  
     287      def closed(self):
     288          return (self._address is None)
     289  
     290      def close(self):
     291          if self._accept_pipe_future is not None:
     292              self._accept_pipe_future.cancel()
     293              self._accept_pipe_future = None
     294          # Close all instances which have not been connected to by a client.
     295          if self._address is not None:
     296              for pipe in self._free_instances:
     297                  pipe.close()
     298              self._pipe = None
     299              self._address = None
     300              self._free_instances.clear()
     301  
     302      __del__ = close
     303  
     304  
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop.

    The class body adds nothing to BaseSelectorEventLoop; it only provides
    a separate, Windows-specific class.
    """
     307  
     308  
     309  class ESC[4;38;5;81mProactorEventLoop(ESC[4;38;5;149mproactor_eventsESC[4;38;5;149m.ESC[4;38;5;149mBaseProactorEventLoop):
     310      """Windows version of proactor event loop using IOCP."""
     311  
     312      def __init__(self, proactor=None):
     313          if proactor is None:
     314              proactor = IocpProactor()
     315          super().__init__(proactor)
     316  
     317      def run_forever(self):
     318          try:
     319              assert self._self_reading_future is None
     320              self.call_soon(self._loop_self_reading)
     321              super().run_forever()
     322          finally:
     323              if self._self_reading_future is not None:
     324                  ov = self._self_reading_future._ov
     325                  self._self_reading_future.cancel()
     326                  # self_reading_future was just cancelled so if it hasn't been
     327                  # finished yet, it never will be (it's possible that it has
     328                  # already finished and its callback is waiting in the queue,
     329                  # where it could still happen if the event loop is restarted).
     330                  # Unregister it otherwise IocpProactor.close will wait for it
     331                  # forever
     332                  if ov is not None:
     333                      self._proactor._unregister(ov)
     334                  self._self_reading_future = None
     335  
     336      async def create_pipe_connection(self, protocol_factory, address):
     337          f = self._proactor.connect_pipe(address)
     338          pipe = await f
     339          protocol = protocol_factory()
     340          trans = self._make_duplex_pipe_transport(pipe, protocol,
     341                                                   extra={'addr': address})
     342          return trans, protocol
     343  
     344      async def start_serving_pipe(self, protocol_factory, address):
     345          server = PipeServer(address)
     346  
     347          def loop_accept_pipe(f=None):
     348              pipe = None
     349              try:
     350                  if f:
     351                      pipe = f.result()
     352                      server._free_instances.discard(pipe)
     353  
     354                      if server.closed():
     355                          # A client connected before the server was closed:
     356                          # drop the client (close the pipe) and exit
     357                          pipe.close()
     358                          return
     359  
     360                      protocol = protocol_factory()
     361                      self._make_duplex_pipe_transport(
     362                          pipe, protocol, extra={'addr': address})
     363  
     364                  pipe = server._get_unconnected_pipe()
     365                  if pipe is None:
     366                      return
     367  
     368                  f = self._proactor.accept_pipe(pipe)
     369              except BrokenPipeError:
     370                  if pipe and pipe.fileno() != -1:
     371                      pipe.close()
     372                  self.call_soon(loop_accept_pipe)
     373              except OSError as exc:
     374                  if pipe and pipe.fileno() != -1:
     375                      self.call_exception_handler({
     376                          'message': 'Pipe accept failed',
     377                          'exception': exc,
     378                          'pipe': pipe,
     379                      })
     380                      pipe.close()
     381                  elif self._debug:
     382                      logger.warning("Accept pipe failed on pipe %r",
     383                                     pipe, exc_info=True)
     384                  self.call_soon(loop_accept_pipe)
     385              except exceptions.CancelledError:
     386                  if pipe:
     387                      pipe.close()
     388              else:
     389                  server._accept_pipe_future = f
     390                  f.add_done_callback(loop_accept_pipe)
     391  
     392          self.call_soon(loop_accept_pipe)
     393          return [server]
     394  
     395      async def _make_subprocess_transport(self, protocol, args, shell,
     396                                           stdin, stdout, stderr, bufsize,
     397                                           extra=None, **kwargs):
     398          waiter = self.create_future()
     399          transp = _WindowsSubprocessTransport(self, protocol, args, shell,
     400                                               stdin, stdout, stderr, bufsize,
     401                                               waiter=waiter, extra=extra,
     402                                               **kwargs)
     403          try:
     404              await waiter
     405          except (SystemExit, KeyboardInterrupt):
     406              raise
     407          except BaseException:
     408              transp.close()
     409              await transp._wait()
     410              raise
     411  
     412          return transp
     413  
     414  
     415  class ESC[4;38;5;81mIocpProactor:
     416      """Proactor implementation using IOCP."""
     417  
     418      def __init__(self, concurrency=INFINITE):
     419          self._loop = None
     420          self._results = []
     421          self._iocp = _overlapped.CreateIoCompletionPort(
     422              _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
     423          self._cache = {}
     424          self._registered = weakref.WeakSet()
     425          self._unregistered = []
     426          self._stopped_serving = weakref.WeakSet()
     427  
     428      def _check_closed(self):
     429          if self._iocp is None:
     430              raise RuntimeError('IocpProactor is closed')
     431  
     432      def __repr__(self):
     433          info = ['overlapped#=%s' % len(self._cache),
     434                  'result#=%s' % len(self._results)]
     435          if self._iocp is None:
     436              info.append('closed')
     437          return '<%s %s>' % (self.__class__.__name__, " ".join(info))
     438  
    def set_loop(self, loop):
        """Store *loop*; it is later used to create this proactor's futures."""
        self._loop = loop
     441  
     442      def select(self, timeout=None):
     443          if not self._results:
     444              self._poll(timeout)
     445          tmp = self._results
     446          self._results = []
     447          try:
     448              return tmp
     449          finally:
     450              # Needed to break cycles when an exception occurs.
     451              tmp = None
     452  
     453      def _result(self, value):
     454          fut = self._loop.create_future()
     455          fut.set_result(value)
     456          return fut
     457  
     458      def recv(self, conn, nbytes, flags=0):
     459          self._register_with_iocp(conn)
     460          ov = _overlapped.Overlapped(NULL)
     461          try:
     462              if isinstance(conn, socket.socket):
     463                  ov.WSARecv(conn.fileno(), nbytes, flags)
     464              else:
     465                  ov.ReadFile(conn.fileno(), nbytes)
     466          except BrokenPipeError:
     467              return self._result(b'')
     468  
     469          def finish_recv(trans, key, ov):
     470              try:
     471                  return ov.getresult()
     472              except OSError as exc:
     473                  if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
     474                                      _overlapped.ERROR_OPERATION_ABORTED):
     475                      raise ConnectionResetError(*exc.args)
     476                  else:
     477                      raise
     478  
     479          return self._register(ov, conn, finish_recv)
     480  
     481      def recv_into(self, conn, buf, flags=0):
     482          self._register_with_iocp(conn)
     483          ov = _overlapped.Overlapped(NULL)
     484          try:
     485              if isinstance(conn, socket.socket):
     486                  ov.WSARecvInto(conn.fileno(), buf, flags)
     487              else:
     488                  ov.ReadFileInto(conn.fileno(), buf)
     489          except BrokenPipeError:
     490              return self._result(0)
     491  
     492          def finish_recv(trans, key, ov):
     493              try:
     494                  return ov.getresult()
     495              except OSError as exc:
     496                  if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
     497                                      _overlapped.ERROR_OPERATION_ABORTED):
     498                      raise ConnectionResetError(*exc.args)
     499                  else:
     500                      raise
     501  
     502          return self._register(ov, conn, finish_recv)
     503  
     504      def recvfrom(self, conn, nbytes, flags=0):
     505          self._register_with_iocp(conn)
     506          ov = _overlapped.Overlapped(NULL)
     507          try:
     508              ov.WSARecvFrom(conn.fileno(), nbytes, flags)
     509          except BrokenPipeError:
     510              return self._result((b'', None))
     511  
     512          def finish_recv(trans, key, ov):
     513              try:
     514                  return ov.getresult()
     515              except OSError as exc:
     516                  if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
     517                                      _overlapped.ERROR_OPERATION_ABORTED):
     518                      raise ConnectionResetError(*exc.args)
     519                  else:
     520                      raise
     521  
     522          return self._register(ov, conn, finish_recv)
     523  
     524      def recvfrom_into(self, conn, buf, flags=0):
     525          self._register_with_iocp(conn)
     526          ov = _overlapped.Overlapped(NULL)
     527          try:
     528              ov.WSARecvFromInto(conn.fileno(), buf, flags)
     529          except BrokenPipeError:
     530              return self._result((0, None))
     531  
     532          def finish_recv(trans, key, ov):
     533              try:
     534                  return ov.getresult()
     535              except OSError as exc:
     536                  if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
     537                                      _overlapped.ERROR_OPERATION_ABORTED):
     538                      raise ConnectionResetError(*exc.args)
     539                  else:
     540                      raise
     541  
     542          return self._register(ov, conn, finish_recv)
     543  
     544      def sendto(self, conn, buf, flags=0, addr=None):
     545          self._register_with_iocp(conn)
     546          ov = _overlapped.Overlapped(NULL)
     547  
     548          ov.WSASendTo(conn.fileno(), buf, flags, addr)
     549  
     550          def finish_send(trans, key, ov):
     551              try:
     552                  return ov.getresult()
     553              except OSError as exc:
     554                  if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
     555                                      _overlapped.ERROR_OPERATION_ABORTED):
     556                      raise ConnectionResetError(*exc.args)
     557                  else:
     558                      raise
     559  
     560          return self._register(ov, conn, finish_send)
     561  
     562      def send(self, conn, buf, flags=0):
     563          self._register_with_iocp(conn)
     564          ov = _overlapped.Overlapped(NULL)
     565          if isinstance(conn, socket.socket):
     566              ov.WSASend(conn.fileno(), buf, flags)
     567          else:
     568              ov.WriteFile(conn.fileno(), buf)
     569  
     570          def finish_send(trans, key, ov):
     571              try:
     572                  return ov.getresult()
     573              except OSError as exc:
     574                  if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
     575                                      _overlapped.ERROR_OPERATION_ABORTED):
     576                      raise ConnectionResetError(*exc.args)
     577                  else:
     578                      raise
     579  
     580          return self._register(ov, conn, finish_send)
     581  
     582      def accept(self, listener):
     583          self._register_with_iocp(listener)
     584          conn = self._get_accept_socket(listener.family)
     585          ov = _overlapped.Overlapped(NULL)
     586          ov.AcceptEx(listener.fileno(), conn.fileno())
     587  
     588          def finish_accept(trans, key, ov):
     589              ov.getresult()
     590              # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
     591              buf = struct.pack('@P', listener.fileno())
     592              conn.setsockopt(socket.SOL_SOCKET,
     593                              _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
     594              conn.settimeout(listener.gettimeout())
     595              return conn, conn.getpeername()
     596  
     597          async def accept_coro(future, conn):
     598              # Coroutine closing the accept socket if the future is cancelled
     599              try:
     600                  await future
     601              except exceptions.CancelledError:
     602                  conn.close()
     603                  raise
     604  
     605          future = self._register(ov, listener, finish_accept)
     606          coro = accept_coro(future, conn)
     607          tasks.ensure_future(coro, loop=self._loop)
     608          return future
     609  
     610      def connect(self, conn, address):
     611          if conn.type == socket.SOCK_DGRAM:
     612              # WSAConnect will complete immediately for UDP sockets so we don't
     613              # need to register any IOCP operation
     614              _overlapped.WSAConnect(conn.fileno(), address)
     615              fut = self._loop.create_future()
     616              fut.set_result(None)
     617              return fut
     618  
     619          self._register_with_iocp(conn)
     620          # The socket needs to be locally bound before we call ConnectEx().
     621          try:
     622              _overlapped.BindLocal(conn.fileno(), conn.family)
     623          except OSError as e:
     624              if e.winerror != errno.WSAEINVAL:
     625                  raise
     626              # Probably already locally bound; check using getsockname().
     627              if conn.getsockname()[1] == 0:
     628                  raise
     629          ov = _overlapped.Overlapped(NULL)
     630          ov.ConnectEx(conn.fileno(), address)
     631  
     632          def finish_connect(trans, key, ov):
     633              ov.getresult()
     634              # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
     635              conn.setsockopt(socket.SOL_SOCKET,
     636                              _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
     637              return conn
     638  
     639          return self._register(ov, conn, finish_connect)
     640  
     641      def sendfile(self, sock, file, offset, count):
     642          self._register_with_iocp(sock)
     643          ov = _overlapped.Overlapped(NULL)
     644          offset_low = offset & 0xffff_ffff
     645          offset_high = (offset >> 32) & 0xffff_ffff
     646          ov.TransmitFile(sock.fileno(),
     647                          msvcrt.get_osfhandle(file.fileno()),
     648                          offset_low, offset_high,
     649                          count, 0, 0)
     650  
     651          def finish_sendfile(trans, key, ov):
     652              try:
     653                  return ov.getresult()
     654              except OSError as exc:
     655                  if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
     656                                      _overlapped.ERROR_OPERATION_ABORTED):
     657                      raise ConnectionResetError(*exc.args)
     658                  else:
     659                      raise
     660          return self._register(ov, sock, finish_sendfile)
     661  
     662      def accept_pipe(self, pipe):
     663          self._register_with_iocp(pipe)
     664          ov = _overlapped.Overlapped(NULL)
     665          connected = ov.ConnectNamedPipe(pipe.fileno())
     666  
     667          if connected:
     668              # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
     669              # that the pipe is connected. There is no need to wait for the
     670              # completion of the connection.
     671              return self._result(pipe)
     672  
     673          def finish_accept_pipe(trans, key, ov):
     674              ov.getresult()
     675              return pipe
     676  
     677          return self._register(ov, pipe, finish_accept_pipe)
     678  
     679      async def connect_pipe(self, address):
     680          delay = CONNECT_PIPE_INIT_DELAY
     681          while True:
     682              # Unfortunately there is no way to do an overlapped connect to
     683              # a pipe.  Call CreateFile() in a loop until it doesn't fail with
     684              # ERROR_PIPE_BUSY.
     685              try:
     686                  handle = _overlapped.ConnectPipe(address)
     687                  break
     688              except OSError as exc:
     689                  if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
     690                      raise
     691  
     692              # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
     693              delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
     694              await tasks.sleep(delay)
     695  
     696          return windows_utils.PipeHandle(handle)
     697  
    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).

        *timeout* is in seconds; None means wait without a time limit.
        """
        # False selects the ordinary (non-cancellation) wait future.
        return self._wait_for_handle(handle, timeout, False)
     705  
     706      def _wait_cancel(self, event, done_callback):
     707          fut = self._wait_for_handle(event, None, True)
     708          # add_done_callback() cannot be used because the wait may only complete
     709          # in IocpProactor.close(), while the event loop is not running.
     710          fut._done_callback = done_callback
     711          return fut
     712  
     def _wait_for_handle(self, handle, timeout, _is_cancel):
         """Register a wait on *handle* and return the future tracking it.

         *timeout* is in seconds, or None for an unbounded wait.  When
         *_is_cancel* is true the future is a _WaitCancelFuture, otherwise a
         _WaitHandleFuture.  The future is stored in self._cache under the
         address of a fresh Overlapped object.
         """
         self._check_closed()

         # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
         # so round away from zero to wait *at least* timeout seconds.
         ms = _winapi.INFINITE if timeout is None else math.ceil(timeout * 1e3)

         # The Overlapped object exists only so that its address can be used
         # as the key for the cache.
         ov = _overlapped.Overlapped(NULL)
         wait_handle = _overlapped.RegisterWaitWithQueue(
             handle, self._iocp, ov.address, ms)

         if _is_cancel:
             fut = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
         else:
             fut = _WaitHandleFuture(ov, handle, wait_handle, self,
                                     loop=self._loop)
         if fut._source_traceback:
             # Drop the innermost recorded frame from the source traceback.
             del fut._source_traceback[-1]

         def finish_wait_for_handle(trans, key, ov):
             # This performs a second wait, so the mechanism should only be
             # used with handle types for which a successful wait has no
             # effect: events or processes are all right, locks or
             # semaphores are not.  If the handle is signalled and then
             # quickly reset, False may be returned even though the wait
             # has not timed out.
             return fut._poll()

         self._cache[ov.address] = (fut, ov, 0, finish_wait_for_handle)
         return fut
     746  
     747      def _register_with_iocp(self, obj):
     748          # To get notifications of finished ops on this objects sent to the
     749          # completion port, were must register the handle.
     750          if obj not in self._registered:
     751              self._registered.add(obj)
     752              _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
     753              # XXX We could also use SetFileCompletionNotificationModes()
     754              # to avoid sending notifications to completion port of ops
     755              # that succeed immediately.
     756  
     def _register(self, ov, obj, callback):
         """Return a future for the overlapped operation *ov*.

         The future's value is whatever callback() returns; an OSError
         raised by callback() becomes the future's exception.  *obj* is kept
         in the cache entry only so that it is not garbage collected too
         early.
         """
         self._check_closed()

         fut = _OverlappedFuture(ov, loop=self._loop)
         if fut._source_traceback:
             # Drop the innermost recorded frame from the source traceback.
             del fut._source_traceback[-1]

         if not ov.pending:
             # The operation has already completed, so resolve the future
             # right away.  The callback gets None for the transferred and
             # key arguments here, so this short cut is unsuitable for
             # callbacks that need those values from the completion packet.
             try:
                 value = callback(None, None, ov)
             except OSError as exc:
                 fut.set_exception(exc)
             else:
                 fut.set_result(value)

         # Cache the entry in every case, including immediate completion:
         # even if GetOverlappedResult() was called, the completion
         # notification still arrives through GetQueuedCompletionStatus().
         # Keeping a reference to the OVERLAPPED object until then prevents
         # its memory from being freed while Windows may still read it.
         self._cache[ov.address] = (fut, ov, obj, callback)
         return fut
     788  
     789      def _unregister(self, ov):
     790          """Unregister an overlapped object.
     791  
     792          Call this method when its future has been cancelled. The event can
     793          already be signalled (pending in the proactor event queue). It is also
     794          safe if the event is never signalled (because it was cancelled).
     795          """
     796          self._check_closed()
     797          self._unregistered.append(ov)
     798  
     799      def _get_accept_socket(self, family):
     800          s = socket.socket(family)
     801          s.settimeout(0)
     802          return s
     803  
     804      def _poll(self, timeout=None):
     805          if timeout is None:
     806              ms = INFINITE
     807          elif timeout < 0:
     808              raise ValueError("negative timeout")
     809          else:
     810              # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
     811              # round away from zero to wait *at least* timeout seconds.
     812              ms = math.ceil(timeout * 1e3)
     813              if ms >= INFINITE:
     814                  raise ValueError("timeout too big")
     815  
     816          while True:
     817              status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
     818              if status is None:
     819                  break
     820              ms = 0
     821  
     822              err, transferred, key, address = status
     823              try:
     824                  f, ov, obj, callback = self._cache.pop(address)
     825              except KeyError:
     826                  if self._loop.get_debug():
     827                      self._loop.call_exception_handler({
     828                          'message': ('GetQueuedCompletionStatus() returned an '
     829                                      'unexpected event'),
     830                          'status': ('err=%s transferred=%s key=%#x address=%#x'
     831                                     % (err, transferred, key, address)),
     832                      })
     833  
     834                  # key is either zero, or it is used to return a pipe
     835                  # handle which should be closed to avoid a leak.
     836                  if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
     837                      _winapi.CloseHandle(key)
     838                  continue
     839  
     840              if obj in self._stopped_serving:
     841                  f.cancel()
     842              # Don't call the callback if _register() already read the result or
     843              # if the overlapped has been cancelled
     844              elif not f.done():
     845                  try:
     846                      value = callback(transferred, key, ov)
     847                  except OSError as e:
     848                      f.set_exception(e)
     849                      self._results.append(f)
     850                  else:
     851                      f.set_result(value)
     852                      self._results.append(f)
     853                  finally:
     854                      f = None
     855  
     856          # Remove unregistered futures
     857          for ov in self._unregistered:
     858              self._cache.pop(ov.address, None)
     859          self._unregistered.clear()
     860  
     861      def _stop_serving(self, obj):
     862          # obj is a socket or pipe handle.  It will be closed in
     863          # BaseProactorEventLoop._stop_serving() which will make any
     864          # pending operations fail quickly.
     865          self._stopped_serving.add(obj)
     866  
     def close(self):
         """Cancel outstanding operations and close the completion port.

         Does nothing if the proactor is already closed.  Otherwise every
         cached future is cancelled (except already-cancelled ones and
         _WaitCancelFuture instances), the cache is polled until it is
         empty, and the completion port handle is closed.
         """
         if self._iocp is None:
             # already closed
             return

         # Cancel remaining registered operations.
         for fut, _ov, _obj, _callback in list(self._cache.values()):
             # Cancelled futures need nothing more, and a _WaitCancelFuture
             # must not be cancelled.
             if fut.cancelled() or isinstance(fut, _WaitCancelFuture):
                 continue
             try:
                 fut.cancel()
             except OSError as exc:
                 if self._loop is None:
                     continue
                 context = {
                     'message': 'Cancelling a future failed',
                     'exception': exc,
                     'future': fut,
                 }
                 if fut._source_traceback:
                     context['source_traceback'] = fut._source_traceback
                 self._loop.call_exception_handler(context)

         # Wait until all cancelled overlapped complete: don't exit with
         # running overlapped to prevent a crash.  Log progress about once
         # per second while entries remain in the cache.
         report_interval = 1.0
         closing_started = time.monotonic()
         next_report = closing_started + report_interval
         while self._cache:
             if next_report <= time.monotonic():
                 logger.debug('%r is running after closing for %.1f seconds',
                              self, time.monotonic() - closing_started)
                 next_report = time.monotonic() + report_interval

             # handle a few events, or timeout
             self._poll(report_interval)

         self._results = []

         _winapi.CloseHandle(self._iocp)
         self._iocp = None
     913  
     def __del__(self):
         # Finalizer: run close() when the object is being destroyed.
         # close() returns immediately when self._iocp is already None, so
         # an explicit close() followed by finalization is harmless.
         self.close()
     916  
     917  
     class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
         """Subprocess transport that starts its child via windows_utils.Popen."""

         def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
             """Spawn the child process and watch its handle for exit.

             The arguments are forwarded to windows_utils.Popen and the
             resulting object is stored in self._proc.  A wait on the process
             handle is then registered with the loop's proactor so that
             _process_exited() is called with the return code once the wait
             future is done.
             """
             self._proc = windows_utils.Popen(
                 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                 bufsize=bufsize, **kwargs)

             def callback(f):
                 # The future itself is not inspected; the return code is
                 # read from the process object.
                 returncode = self._proc.poll()
                 self._process_exited(returncode)

             f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
             f.add_done_callback(callback)
     931  
     932  
     # Public name for the selector-based loop on Windows.
     SelectorEventLoop = _WindowsSelectorEventLoop


     class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
         """Event loop policy whose loop factory is SelectorEventLoop."""

         _loop_factory = SelectorEventLoop
     938  
     939  
     class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
         """Event loop policy whose loop factory is ProactorEventLoop."""

         _loop_factory = ProactorEventLoop


     # The proactor-based policy is the default one on Windows.
     DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy