1  import os
       2  import signal
       3  import socket
       4  import sys
       5  import time
       6  import threading
       7  import unittest
       8  from unittest import mock
       9  
      10  if sys.platform != 'win32':
      11      raise unittest.SkipTest('Windows only')
      12  
      13  import _overlapped
      14  import _winapi
      15  
      16  import asyncio
      17  from asyncio import windows_events
      18  from test.test_asyncio import utils as test_utils
      19  
      20  
      21  def tearDownModule():
      22      asyncio.set_event_loop_policy(None)
      23  
      24  
      25  class ESC[4;38;5;81mUpperProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
      26      def __init__(self):
      27          self.buf = []
      28  
      29      def connection_made(self, trans):
      30          self.trans = trans
      31  
      32      def data_received(self, data):
      33          self.buf.append(data)
      34          if b'\n' in data:
      35              self.trans.write(b''.join(self.buf).upper())
      36              self.trans.close()
      37  
      38  
      39  class ESC[4;38;5;81mProactorLoopCtrlC(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      40  
      41      def test_ctrl_c(self):
      42  
      43          def SIGINT_after_delay():
      44              time.sleep(0.1)
      45              signal.raise_signal(signal.SIGINT)
      46  
      47          thread = threading.Thread(target=SIGINT_after_delay)
      48          loop = asyncio.new_event_loop()
      49          try:
      50              # only start the loop once the event loop is running
      51              loop.call_soon(thread.start)
      52              loop.run_forever()
      53              self.fail("should not fall through 'run_forever'")
      54          except KeyboardInterrupt:
      55              pass
      56          finally:
      57              self.close_loop(loop)
      58          thread.join()
      59  
      60  
      61  class ESC[4;38;5;81mProactorMultithreading(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      62      def test_run_from_nonmain_thread(self):
      63          finished = False
      64  
      65          async def coro():
      66              await asyncio.sleep(0)
      67  
      68          def func():
      69              nonlocal finished
      70              loop = asyncio.new_event_loop()
      71              loop.run_until_complete(coro())
      72              # close() must not call signal.set_wakeup_fd()
      73              loop.close()
      74              finished = True
      75  
      76          thread = threading.Thread(target=func)
      77          thread.start()
      78          thread.join()
      79          self.assertTrue(finished)
      80  
      81  
      82  class ESC[4;38;5;81mProactorTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      83  
      84      def setUp(self):
      85          super().setUp()
      86          self.loop = asyncio.ProactorEventLoop()
      87          self.set_event_loop(self.loop)
      88  
      89      def test_close(self):
      90          a, b = socket.socketpair()
      91          trans = self.loop._make_socket_transport(a, asyncio.Protocol())
      92          f = asyncio.ensure_future(self.loop.sock_recv(b, 100), loop=self.loop)
      93          trans.close()
      94          self.loop.run_until_complete(f)
      95          self.assertEqual(f.result(), b'')
      96          b.close()
      97  
      98      def test_double_bind(self):
      99          ADDRESS = r'\\.\pipe\test_double_bind-%s' % os.getpid()
     100          server1 = windows_events.PipeServer(ADDRESS)
     101          with self.assertRaises(PermissionError):
     102              windows_events.PipeServer(ADDRESS)
     103          server1.close()
     104  
     105      def test_pipe(self):
     106          res = self.loop.run_until_complete(self._test_pipe())
     107          self.assertEqual(res, 'done')
     108  
     109      async def _test_pipe(self):
     110          ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid()
     111  
     112          with self.assertRaises(FileNotFoundError):
     113              await self.loop.create_pipe_connection(
     114                  asyncio.Protocol, ADDRESS)
     115  
     116          [server] = await self.loop.start_serving_pipe(
     117              UpperProto, ADDRESS)
     118          self.assertIsInstance(server, windows_events.PipeServer)
     119  
     120          clients = []
     121          for i in range(5):
     122              stream_reader = asyncio.StreamReader(loop=self.loop)
     123              protocol = asyncio.StreamReaderProtocol(stream_reader,
     124                                                      loop=self.loop)
     125              trans, proto = await self.loop.create_pipe_connection(
     126                  lambda: protocol, ADDRESS)
     127              self.assertIsInstance(trans, asyncio.Transport)
     128              self.assertEqual(protocol, proto)
     129              clients.append((stream_reader, trans))
     130  
     131          for i, (r, w) in enumerate(clients):
     132              w.write('lower-{}\n'.format(i).encode())
     133  
     134          for i, (r, w) in enumerate(clients):
     135              response = await r.readline()
     136              self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
     137              w.close()
     138  
     139          server.close()
     140  
     141          with self.assertRaises(FileNotFoundError):
     142              await self.loop.create_pipe_connection(
     143                  asyncio.Protocol, ADDRESS)
     144  
     145          return 'done'
     146  
     147      def test_connect_pipe_cancel(self):
     148          exc = OSError()
     149          exc.winerror = _overlapped.ERROR_PIPE_BUSY
     150          with mock.patch.object(_overlapped, 'ConnectPipe',
     151                                 side_effect=exc) as connect:
     152              coro = self.loop._proactor.connect_pipe('pipe_address')
     153              task = self.loop.create_task(coro)
     154  
     155              # check that it's possible to cancel connect_pipe()
     156              task.cancel()
     157              with self.assertRaises(asyncio.CancelledError):
     158                  self.loop.run_until_complete(task)
     159  
     160      def test_wait_for_handle(self):
     161          event = _overlapped.CreateEvent(None, True, False, None)
     162          self.addCleanup(_winapi.CloseHandle, event)
     163  
     164          # Wait for unset event with 0.5s timeout;
     165          # result should be False at timeout
     166          fut = self.loop._proactor.wait_for_handle(event, 0.5)
     167          start = self.loop.time()
     168          done = self.loop.run_until_complete(fut)
     169          elapsed = self.loop.time() - start
     170  
     171          self.assertEqual(done, False)
     172          self.assertFalse(fut.result())
     173          # bpo-31008: Tolerate only 450 ms (at least 500 ms expected),
     174          # because of bad clock resolution on Windows
     175          self.assertTrue(0.45 <= elapsed <= 0.9, elapsed)
     176  
     177          _overlapped.SetEvent(event)
     178  
     179          # Wait for set event;
     180          # result should be True immediately
     181          fut = self.loop._proactor.wait_for_handle(event, 10)
     182          start = self.loop.time()
     183          done = self.loop.run_until_complete(fut)
     184          elapsed = self.loop.time() - start
     185  
     186          self.assertEqual(done, True)
     187          self.assertTrue(fut.result())
     188          self.assertTrue(0 <= elapsed < 0.3, elapsed)
     189  
     190          # asyncio issue #195: cancelling a done _WaitHandleFuture
     191          # must not crash
     192          fut.cancel()
     193  
     194      def test_wait_for_handle_cancel(self):
     195          event = _overlapped.CreateEvent(None, True, False, None)
     196          self.addCleanup(_winapi.CloseHandle, event)
     197  
     198          # Wait for unset event with a cancelled future;
     199          # CancelledError should be raised immediately
     200          fut = self.loop._proactor.wait_for_handle(event, 10)
     201          fut.cancel()
     202          start = self.loop.time()
     203          with self.assertRaises(asyncio.CancelledError):
     204              self.loop.run_until_complete(fut)
     205          elapsed = self.loop.time() - start
     206          self.assertTrue(0 <= elapsed < 0.1, elapsed)
     207  
     208          # asyncio issue #195: cancelling a _WaitHandleFuture twice
     209          # must not crash
     210          fut = self.loop._proactor.wait_for_handle(event)
     211          fut.cancel()
     212          fut.cancel()
     213  
     214      def test_read_self_pipe_restart(self):
     215          # Regression test for https://bugs.python.org/issue39010
     216          # Previously, restarting a proactor event loop in certain states
     217          # would lead to spurious ConnectionResetErrors being logged.
     218          self.loop.call_exception_handler = mock.Mock()
     219          # Start an operation in another thread so that the self-pipe is used.
     220          # This is theoretically timing-dependent (the task in the executor
     221          # must complete before our start/stop cycles), but in practice it
     222          # seems to work every time.
     223          f = self.loop.run_in_executor(None, lambda: None)
     224          self.loop.stop()
     225          self.loop.run_forever()
     226          self.loop.stop()
     227          self.loop.run_forever()
     228  
     229          # Shut everything down cleanly. This is an important part of the
     230          # test - in issue 39010, the error occurred during loop.close(),
     231          # so we want to close the loop during the test instead of leaving
     232          # it for tearDown.
     233          #
     234          # First wait for f to complete to avoid a "future's result was never
     235          # retrieved" error.
     236          self.loop.run_until_complete(f)
     237          # Now shut down the loop itself (self.close_loop also shuts down the
     238          # loop's default executor).
     239          self.close_loop(self.loop)
     240          self.assertFalse(self.loop.call_exception_handler.called)
     241  
     242      def test_address_argument_type_error(self):
     243          # Regression test for https://github.com/python/cpython/issues/98793
     244          proactor = self.loop._proactor
     245          sock = socket.socket(type=socket.SOCK_DGRAM)
     246          bad_address = None
     247          with self.assertRaises(TypeError):
     248              proactor.connect(sock, bad_address)
     249          with self.assertRaises(TypeError):
     250              proactor.sendto(sock, b'abc', addr=bad_address)
     251          sock.close()
     252  
     253      def test_client_pipe_stat(self):
     254          res = self.loop.run_until_complete(self._test_client_pipe_stat())
     255          self.assertEqual(res, 'done')
     256  
     257      async def _test_client_pipe_stat(self):
     258          # Regression test for https://github.com/python/cpython/issues/100573
     259          ADDRESS = r'\\.\pipe\test_client_pipe_stat-%s' % os.getpid()
     260  
     261          async def probe():
     262              # See https://github.com/python/cpython/pull/100959#discussion_r1068533658
     263              h = _overlapped.ConnectPipe(ADDRESS)
     264              try:
     265                  _winapi.CloseHandle(_overlapped.ConnectPipe(ADDRESS))
     266              except OSError as e:
     267                  if e.winerror != _overlapped.ERROR_PIPE_BUSY:
     268                      raise
     269              finally:
     270                  _winapi.CloseHandle(h)
     271  
     272          with self.assertRaises(FileNotFoundError):
     273              await probe()
     274  
     275          [server] = await self.loop.start_serving_pipe(asyncio.Protocol, ADDRESS)
     276          self.assertIsInstance(server, windows_events.PipeServer)
     277  
     278          errors = []
     279          self.loop.set_exception_handler(lambda _, data: errors.append(data))
     280  
     281          for i in range(5):
     282              await self.loop.create_task(probe())
     283  
     284          self.assertEqual(len(errors), 0, errors)
     285  
     286          server.close()
     287  
     288          with self.assertRaises(FileNotFoundError):
     289              await probe()
     290  
     291          return "done"
     292  
     293  
     294  class ESC[4;38;5;81mWinPolicyTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     295  
     296      def test_selector_win_policy(self):
     297          async def main():
     298              self.assertIsInstance(
     299                  asyncio.get_running_loop(),
     300                  asyncio.SelectorEventLoop)
     301  
     302          old_policy = asyncio.get_event_loop_policy()
     303          try:
     304              asyncio.set_event_loop_policy(
     305                  asyncio.WindowsSelectorEventLoopPolicy())
     306              asyncio.run(main())
     307          finally:
     308              asyncio.set_event_loop_policy(old_policy)
     309  
     310      def test_proactor_win_policy(self):
     311          async def main():
     312              self.assertIsInstance(
     313                  asyncio.get_running_loop(),
     314                  asyncio.ProactorEventLoop)
     315  
     316          old_policy = asyncio.get_event_loop_policy()
     317          try:
     318              asyncio.set_event_loop_policy(
     319                  asyncio.WindowsProactorEventLoopPolicy())
     320              asyncio.run(main())
     321          finally:
     322              asyncio.set_event_loop_policy(old_policy)
     323  
     324  
     325  if __name__ == '__main__':
     326      unittest.main()