(root)/
Python-3.11.7/
Lib/
test/
test_asyncio/
test_windows_events.py
       1  import os
       2  import signal
       3  import socket
       4  import sys
       5  import time
       6  import threading
       7  import unittest
       8  from unittest import mock
       9  
# Skip the whole module when not running on win32.  The guard sits ahead of
# the imports that follow it (presumably because _overlapped and _winapi are
# only importable on Windows -- confirm).
if sys.platform != 'win32':
    raise unittest.SkipTest('Windows only')
      12  
      13  import _overlapped
      14  import _winapi
      15  
      16  import asyncio
      17  from asyncio import windows_events
      18  from test.test_asyncio import utils as test_utils
      19  
      20  
      21  def tearDownModule():
      22      asyncio.set_event_loop_policy(None)
      23  
      24  
      25  class ESC[4;38;5;81mUpperProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
      26      def __init__(self):
      27          self.buf = []
      28  
      29      def connection_made(self, trans):
      30          self.trans = trans
      31  
      32      def data_received(self, data):
      33          self.buf.append(data)
      34          if b'\n' in data:
      35              self.trans.write(b''.join(self.buf).upper())
      36              self.trans.close()
      37  
      38  
      39  class ESC[4;38;5;81mProactorLoopCtrlC(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      40  
      41      def test_ctrl_c(self):
      42  
      43          def SIGINT_after_delay():
      44              time.sleep(0.1)
      45              signal.raise_signal(signal.SIGINT)
      46  
      47          thread = threading.Thread(target=SIGINT_after_delay)
      48          loop = asyncio.new_event_loop()
      49          try:
      50              # only start the loop once the event loop is running
      51              loop.call_soon(thread.start)
      52              loop.run_forever()
      53              self.fail("should not fall through 'run_forever'")
      54          except KeyboardInterrupt:
      55              pass
      56          finally:
      57              self.close_loop(loop)
      58          thread.join()
      59  
      60  
      61  class ESC[4;38;5;81mProactorMultithreading(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      62      def test_run_from_nonmain_thread(self):
      63          finished = False
      64  
      65          async def coro():
      66              await asyncio.sleep(0)
      67  
      68          def func():
      69              nonlocal finished
      70              loop = asyncio.new_event_loop()
      71              loop.run_until_complete(coro())
      72              # close() must not call signal.set_wakeup_fd()
      73              loop.close()
      74              finished = True
      75  
      76          thread = threading.Thread(target=func)
      77          thread.start()
      78          thread.join()
      79          self.assertTrue(finished)
      80  
      81  
      82  class ESC[4;38;5;81mProactorTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      83  
      84      def setUp(self):
      85          super().setUp()
      86          self.loop = asyncio.ProactorEventLoop()
      87          self.set_event_loop(self.loop)
      88  
      89      def test_close(self):
      90          a, b = socket.socketpair()
      91          trans = self.loop._make_socket_transport(a, asyncio.Protocol())
      92          f = asyncio.ensure_future(self.loop.sock_recv(b, 100), loop=self.loop)
      93          trans.close()
      94          self.loop.run_until_complete(f)
      95          self.assertEqual(f.result(), b'')
      96          b.close()
      97  
      98      def test_double_bind(self):
      99          ADDRESS = r'\\.\pipe\test_double_bind-%s' % os.getpid()
     100          server1 = windows_events.PipeServer(ADDRESS)
     101          with self.assertRaises(PermissionError):
     102              windows_events.PipeServer(ADDRESS)
     103          server1.close()
     104  
     105      def test_pipe(self):
     106          res = self.loop.run_until_complete(self._test_pipe())
     107          self.assertEqual(res, 'done')
     108  
     109      async def _test_pipe(self):
     110          ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid()
     111  
     112          with self.assertRaises(FileNotFoundError):
     113              await self.loop.create_pipe_connection(
     114                  asyncio.Protocol, ADDRESS)
     115  
     116          [server] = await self.loop.start_serving_pipe(
     117              UpperProto, ADDRESS)
     118          self.assertIsInstance(server, windows_events.PipeServer)
     119  
     120          clients = []
     121          for i in range(5):
     122              stream_reader = asyncio.StreamReader(loop=self.loop)
     123              protocol = asyncio.StreamReaderProtocol(stream_reader,
     124                                                      loop=self.loop)
     125              trans, proto = await self.loop.create_pipe_connection(
     126                  lambda: protocol, ADDRESS)
     127              self.assertIsInstance(trans, asyncio.Transport)
     128              self.assertEqual(protocol, proto)
     129              clients.append((stream_reader, trans))
     130  
     131          for i, (r, w) in enumerate(clients):
     132              w.write('lower-{}\n'.format(i).encode())
     133  
     134          for i, (r, w) in enumerate(clients):
     135              response = await r.readline()
     136              self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
     137              w.close()
     138  
     139          server.close()
     140  
     141          with self.assertRaises(FileNotFoundError):
     142              await self.loop.create_pipe_connection(
     143                  asyncio.Protocol, ADDRESS)
     144  
     145          return 'done'
     146  
     147      def test_connect_pipe_cancel(self):
     148          exc = OSError()
     149          exc.winerror = _overlapped.ERROR_PIPE_BUSY
     150          with mock.patch.object(_overlapped, 'ConnectPipe',
     151                                 side_effect=exc) as connect:
     152              coro = self.loop._proactor.connect_pipe('pipe_address')
     153              task = self.loop.create_task(coro)
     154  
     155              # check that it's possible to cancel connect_pipe()
     156              task.cancel()
     157              with self.assertRaises(asyncio.CancelledError):
     158                  self.loop.run_until_complete(task)
     159  
     160      def test_wait_for_handle(self):
     161          event = _overlapped.CreateEvent(None, True, False, None)
     162          self.addCleanup(_winapi.CloseHandle, event)
     163  
     164          # Wait for unset event with 0.5s timeout;
     165          # result should be False at timeout
     166          timeout = 0.5
     167          fut = self.loop._proactor.wait_for_handle(event, timeout)
     168          start = self.loop.time()
     169          done = self.loop.run_until_complete(fut)
     170          elapsed = self.loop.time() - start
     171  
     172          self.assertEqual(done, False)
     173          self.assertFalse(fut.result())
     174          self.assertGreaterEqual(elapsed, timeout - test_utils.CLOCK_RES)
     175  
     176          _overlapped.SetEvent(event)
     177  
     178          # Wait for set event;
     179          # result should be True immediately
     180          fut = self.loop._proactor.wait_for_handle(event, 10)
     181          done = self.loop.run_until_complete(fut)
     182  
     183          self.assertEqual(done, True)
     184          self.assertTrue(fut.result())
     185  
     186          # asyncio issue #195: cancelling a done _WaitHandleFuture
     187          # must not crash
     188          fut.cancel()
     189  
     190      def test_wait_for_handle_cancel(self):
     191          event = _overlapped.CreateEvent(None, True, False, None)
     192          self.addCleanup(_winapi.CloseHandle, event)
     193  
     194          # Wait for unset event with a cancelled future;
     195          # CancelledError should be raised immediately
     196          fut = self.loop._proactor.wait_for_handle(event, 10)
     197          fut.cancel()
     198          with self.assertRaises(asyncio.CancelledError):
     199              self.loop.run_until_complete(fut)
     200  
     201          # asyncio issue #195: cancelling a _WaitHandleFuture twice
     202          # must not crash
     203          fut = self.loop._proactor.wait_for_handle(event)
     204          fut.cancel()
     205          fut.cancel()
     206  
     207      def test_read_self_pipe_restart(self):
     208          # Regression test for https://bugs.python.org/issue39010
     209          # Previously, restarting a proactor event loop in certain states
     210          # would lead to spurious ConnectionResetErrors being logged.
     211          self.loop.call_exception_handler = mock.Mock()
     212          # Start an operation in another thread so that the self-pipe is used.
     213          # This is theoretically timing-dependent (the task in the executor
     214          # must complete before our start/stop cycles), but in practice it
     215          # seems to work every time.
     216          f = self.loop.run_in_executor(None, lambda: None)
     217          self.loop.stop()
     218          self.loop.run_forever()
     219          self.loop.stop()
     220          self.loop.run_forever()
     221  
     222          # Shut everything down cleanly. This is an important part of the
     223          # test - in issue 39010, the error occurred during loop.close(),
     224          # so we want to close the loop during the test instead of leaving
     225          # it for tearDown.
     226          #
     227          # First wait for f to complete to avoid a "future's result was never
     228          # retrieved" error.
     229          self.loop.run_until_complete(f)
     230          # Now shut down the loop itself (self.close_loop also shuts down the
     231          # loop's default executor).
     232          self.close_loop(self.loop)
     233          self.assertFalse(self.loop.call_exception_handler.called)
     234  
     235      def test_address_argument_type_error(self):
     236          # Regression test for https://github.com/python/cpython/issues/98793
     237          proactor = self.loop._proactor
     238          sock = socket.socket(type=socket.SOCK_DGRAM)
     239          bad_address = None
     240          with self.assertRaises(TypeError):
     241              proactor.connect(sock, bad_address)
     242          with self.assertRaises(TypeError):
     243              proactor.sendto(sock, b'abc', addr=bad_address)
     244          sock.close()
     245  
     246      def test_client_pipe_stat(self):
     247          res = self.loop.run_until_complete(self._test_client_pipe_stat())
     248          self.assertEqual(res, 'done')
     249  
     250      async def _test_client_pipe_stat(self):
     251          # Regression test for https://github.com/python/cpython/issues/100573
     252          ADDRESS = r'\\.\pipe\test_client_pipe_stat-%s' % os.getpid()
     253  
     254          async def probe():
     255              # See https://github.com/python/cpython/pull/100959#discussion_r1068533658
     256              h = _overlapped.ConnectPipe(ADDRESS)
     257              try:
     258                  _winapi.CloseHandle(_overlapped.ConnectPipe(ADDRESS))
     259              except OSError as e:
     260                  if e.winerror != _overlapped.ERROR_PIPE_BUSY:
     261                      raise
     262              finally:
     263                  _winapi.CloseHandle(h)
     264  
     265          with self.assertRaises(FileNotFoundError):
     266              await probe()
     267  
     268          [server] = await self.loop.start_serving_pipe(asyncio.Protocol, ADDRESS)
     269          self.assertIsInstance(server, windows_events.PipeServer)
     270  
     271          errors = []
     272          self.loop.set_exception_handler(lambda _, data: errors.append(data))
     273  
     274          for i in range(5):
     275              await self.loop.create_task(probe())
     276  
     277          self.assertEqual(len(errors), 0, errors)
     278  
     279          server.close()
     280  
     281          with self.assertRaises(FileNotFoundError):
     282              await probe()
     283  
     284          return "done"
     285  
     286  
     287  class ESC[4;38;5;81mWinPolicyTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     288  
     289      def test_selector_win_policy(self):
     290          async def main():
     291              self.assertIsInstance(
     292                  asyncio.get_running_loop(),
     293                  asyncio.SelectorEventLoop)
     294  
     295          old_policy = asyncio.get_event_loop_policy()
     296          try:
     297              asyncio.set_event_loop_policy(
     298                  asyncio.WindowsSelectorEventLoopPolicy())
     299              asyncio.run(main())
     300          finally:
     301              asyncio.set_event_loop_policy(old_policy)
     302  
     303      def test_proactor_win_policy(self):
     304          async def main():
     305              self.assertIsInstance(
     306                  asyncio.get_running_loop(),
     307                  asyncio.ProactorEventLoop)
     308  
     309          old_policy = asyncio.get_event_loop_policy()
     310          try:
     311              asyncio.set_event_loop_policy(
     312                  asyncio.WindowsProactorEventLoopPolicy())
     313              asyncio.run(main())
     314          finally:
     315              asyncio.set_event_loop_policy(old_policy)
     316  
     317  
# Run the tests with unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()