1 import os
2 import signal
3 import socket
4 import sys
5 import time
6 import threading
7 import unittest
8 from unittest import mock
9
# Skip the whole module on non-Windows platforms.  The guard runs before the
# _overlapped and _winapi imports that follow it.
if sys.platform != 'win32':
    raise unittest.SkipTest('Windows only')
12
13 import _overlapped
14 import _winapi
15
16 import asyncio
17 from asyncio import windows_events
18 from test.test_asyncio import utils as test_utils
19
20
def tearDownModule():
    """Module-level unittest teardown hook.

    Passes None to asyncio.set_event_loop_policy(); per the asyncio
    documentation this restores the default event loop policy.
    """
    asyncio.set_event_loop_policy(None)
23
24
class UpperProto(asyncio.Protocol):
    """Protocol that buffers incoming data and replies with it upper-cased.

    Received chunks are accumulated until one of them contains a newline;
    the whole buffer is then joined, upper-cased, written back to the
    transport, and the transport is closed.
    """

    def __init__(self):
        # Chunks received so far, in arrival order.
        self.buf = []
        # Set by connection_made(); None until a connection exists.
        self.trans = None

    def connection_made(self, trans):
        """Remember the transport so data_received() can reply on it."""
        self.trans = trans

    def data_received(self, data):
        """Buffer *data*; reply and close once a chunk contains a newline."""
        self.buf.append(data)
        if b'\n' in data:
            self.trans.write(b''.join(self.buf).upper())
            self.trans.close()
37
38
class ProactorLoopCtrlC(test_utils.TestCase):
    """Check that a SIGINT raised while the loop runs surfaces as
    KeyboardInterrupt out of run_forever()."""

    def test_ctrl_c(self):
        # A helper thread sleeps briefly and then raises SIGINT; the loop's
        # run_forever() call is expected to end with KeyboardInterrupt.

        def raise_sigint_later():
            time.sleep(0.1)
            signal.raise_signal(signal.SIGINT)

        sigint_thread = threading.Thread(target=raise_sigint_later)
        event_loop = asyncio.new_event_loop()
        try:
            # only start the thread once the event loop is running
            event_loop.call_soon(sigint_thread.start)
            event_loop.run_forever()
            self.fail("should not fall through 'run_forever'")
        except KeyboardInterrupt:
            # Expected outcome: the signal interrupted run_forever().
            pass
        finally:
            self.close_loop(event_loop)
        sigint_thread.join()
59
60
class ProactorMultithreading(test_utils.TestCase):
    """Check that an event loop can be created, run and closed from a
    thread other than the one running the test."""

    def test_run_from_nonmain_thread(self):
        # Flipped to True by the worker only after its loop closed cleanly.
        completed = False

        async def yield_once():
            await asyncio.sleep(0)

        def run_loop_in_thread():
            nonlocal completed
            worker_loop = asyncio.new_event_loop()
            worker_loop.run_until_complete(yield_once())
            # close() must not call signal.set_wakeup_fd()
            worker_loop.close()
            completed = True

        worker = threading.Thread(target=run_loop_in_thread)
        worker.start()
        worker.join()
        self.assertTrue(completed)
80
81
class ProactorTests(test_utils.TestCase):
    """Tests that each run against a fresh asyncio.ProactorEventLoop.

    setUp() creates the loop and registers it through set_event_loop();
    individual tests reach the proactor via ``self.loop._proactor``.
    """

    def setUp(self):
        super().setUp()
        self.loop = asyncio.ProactorEventLoop()
        self.set_event_loop(self.loop)

    def test_close(self):
        # After the transport wrapping one end of a socket pair is closed,
        # a sock_recv() on the other end must complete with b''.
        a, b = socket.socketpair()
        # ``b`` is closed by the ``with`` block even when an assertion fails;
        # ``a`` is handed to the transport, which is closed below.
        with b:
            trans = self.loop._make_socket_transport(a, asyncio.Protocol())
            f = asyncio.ensure_future(self.loop.sock_recv(b, 100),
                                      loop=self.loop)
            trans.close()
            self.loop.run_until_complete(f)
            self.assertEqual(f.result(), b'')

    def test_double_bind(self):
        # Creating a second PipeServer on an address that already has one
        # must raise PermissionError.
        ADDRESS = r'\\.\pipe\test_double_bind-%s' % os.getpid()
        server1 = windows_events.PipeServer(ADDRESS)
        try:
            with self.assertRaises(PermissionError):
                windows_events.PipeServer(ADDRESS)
        finally:
            # Close the first server even if the assertion above fails.
            server1.close()

    def test_pipe(self):
        res = self.loop.run_until_complete(self._test_pipe())
        self.assertEqual(res, 'done')

    async def _test_pipe(self):
        # Round-trip five clients through a pipe server that uses UpperProto
        # and check each one gets its own line back upper-cased.
        ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid()

        # Nothing is serving the address yet, so connecting must fail.
        with self.assertRaises(FileNotFoundError):
            await self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        [server] = await self.loop.start_serving_pipe(
            UpperProto, ADDRESS)
        self.assertIsInstance(server, windows_events.PipeServer)

        clients = []
        for _ in range(5):
            stream_reader = asyncio.StreamReader(loop=self.loop)
            protocol = asyncio.StreamReaderProtocol(stream_reader,
                                                    loop=self.loop)
            trans, proto = await self.loop.create_pipe_connection(
                lambda: protocol, ADDRESS)
            self.assertIsInstance(trans, asyncio.Transport)
            self.assertEqual(protocol, proto)
            clients.append((stream_reader, trans))

        # Send every request before reading any response.
        for i, (_, w) in enumerate(clients):
            w.write('lower-{}\n'.format(i).encode())

        for i, (r, w) in enumerate(clients):
            response = await r.readline()
            self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
            w.close()

        server.close()

        # Once the server is closed the address must be gone again.
        with self.assertRaises(FileNotFoundError):
            await self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        return 'done'

    def test_connect_pipe_cancel(self):
        # _overlapped.ConnectPipe is patched to raise an OSError whose
        # winerror is ERROR_PIPE_BUSY on every call.
        exc = OSError()
        exc.winerror = _overlapped.ERROR_PIPE_BUSY
        with mock.patch.object(_overlapped, 'ConnectPipe',
                               side_effect=exc):
            coro = self.loop._proactor.connect_pipe('pipe_address')
            task = self.loop.create_task(coro)

            # check that it's possible to cancel connect_pipe()
            task.cancel()
            with self.assertRaises(asyncio.CancelledError):
                self.loop.run_until_complete(task)

    def test_wait_for_handle(self):
        event = _overlapped.CreateEvent(None, True, False, None)
        self.addCleanup(_winapi.CloseHandle, event)

        # Wait for unset event with 0.5s timeout;
        # result should be False at timeout
        timeout = 0.5
        fut = self.loop._proactor.wait_for_handle(event, timeout)
        start = self.loop.time()
        done = self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start

        self.assertEqual(done, False)
        self.assertFalse(fut.result())
        # Allow for clock resolution when checking the wait was not cut short.
        self.assertGreaterEqual(elapsed, timeout - test_utils.CLOCK_RES)

        _overlapped.SetEvent(event)

        # Wait for set event;
        # result should be True immediately
        fut = self.loop._proactor.wait_for_handle(event, 10)
        done = self.loop.run_until_complete(fut)

        self.assertEqual(done, True)
        self.assertTrue(fut.result())

        # asyncio issue #195: cancelling a done _WaitHandleFuture
        # must not crash
        fut.cancel()

    def test_wait_for_handle_cancel(self):
        event = _overlapped.CreateEvent(None, True, False, None)
        self.addCleanup(_winapi.CloseHandle, event)

        # Wait for unset event with a cancelled future;
        # CancelledError should be raised immediately
        fut = self.loop._proactor.wait_for_handle(event, 10)
        fut.cancel()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(fut)

        # asyncio issue #195: cancelling a _WaitHandleFuture twice
        # must not crash
        fut = self.loop._proactor.wait_for_handle(event)
        fut.cancel()
        fut.cancel()

    def test_read_self_pipe_restart(self):
        # Regression test for https://bugs.python.org/issue39010
        # Previously, restarting a proactor event loop in certain states
        # would lead to spurious ConnectionResetErrors being logged.
        self.loop.call_exception_handler = mock.Mock()
        # Start an operation in another thread so that the self-pipe is used.
        # This is theoretically timing-dependent (the task in the executor
        # must complete before our start/stop cycles), but in practice it
        # seems to work every time.
        f = self.loop.run_in_executor(None, lambda: None)
        self.loop.stop()
        self.loop.run_forever()
        self.loop.stop()
        self.loop.run_forever()

        # Shut everything down cleanly.  This is an important part of the
        # test - in issue 39010, the error occurred during loop.close(),
        # so we want to close the loop during the test instead of leaving
        # it for tearDown.
        #
        # First wait for f to complete to avoid a "future's result was never
        # retrieved" error.
        self.loop.run_until_complete(f)
        # Now shut down the loop itself (self.close_loop also shuts down the
        # loop's default executor).
        self.close_loop(self.loop)
        self.assertFalse(self.loop.call_exception_handler.called)

    def test_address_argument_type_error(self):
        # Regression test for https://github.com/python/cpython/issues/98793
        proactor = self.loop._proactor
        # The ``with`` block closes the socket even when an assertion fails.
        with socket.socket(type=socket.SOCK_DGRAM) as sock:
            bad_address = None
            with self.assertRaises(TypeError):
                proactor.connect(sock, bad_address)
            with self.assertRaises(TypeError):
                proactor.sendto(sock, b'abc', addr=bad_address)

    def test_client_pipe_stat(self):
        res = self.loop.run_until_complete(self._test_client_pipe_stat())
        self.assertEqual(res, 'done')

    async def _test_client_pipe_stat(self):
        # Regression test for https://github.com/python/cpython/issues/100573
        ADDRESS = r'\\.\pipe\test_client_pipe_stat-%s' % os.getpid()

        async def probe():
            # See https://github.com/python/cpython/pull/100959#discussion_r1068533658
            # Open one client handle, then attempt a second connect while the
            # first is still open; ERROR_PIPE_BUSY from the second attempt is
            # tolerated, any other OSError propagates.
            h = _overlapped.ConnectPipe(ADDRESS)
            try:
                _winapi.CloseHandle(_overlapped.ConnectPipe(ADDRESS))
            except OSError as e:
                if e.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise
            finally:
                _winapi.CloseHandle(h)

        # Nothing is serving the address yet, so the probe must fail.
        with self.assertRaises(FileNotFoundError):
            await probe()

        [server] = await self.loop.start_serving_pipe(asyncio.Protocol, ADDRESS)
        self.assertIsInstance(server, windows_events.PipeServer)

        # Collect everything reported to the loop's exception handler.
        errors = []
        self.loop.set_exception_handler(lambda _, data: errors.append(data))

        for _ in range(5):
            await self.loop.create_task(probe())

        self.assertEqual(len(errors), 0, errors)

        server.close()

        with self.assertRaises(FileNotFoundError):
            await probe()

        return "done"
285
286
class WinPolicyTests(test_utils.TestCase):
    """Check which event loop class asyncio.run() uses under each of the
    two Windows event loop policies."""

    def _check_policy_loop(self, policy_factory, expected_loop_type):
        # Install policy_factory() as the event loop policy, run a coroutine
        # that asserts the running loop is an expected_loop_type instance,
        # and always reinstall the policy that was active beforehand.
        async def check_running_loop():
            self.assertIsInstance(
                asyncio.get_running_loop(),
                expected_loop_type)

        saved_policy = asyncio.get_event_loop_policy()
        try:
            asyncio.set_event_loop_policy(policy_factory())
            asyncio.run(check_running_loop())
        finally:
            asyncio.set_event_loop_policy(saved_policy)

    def test_selector_win_policy(self):
        self._check_policy_loop(
            asyncio.WindowsSelectorEventLoopPolicy,
            asyncio.SelectorEventLoop)

    def test_proactor_win_policy(self):
        self._check_policy_loop(
            asyncio.WindowsProactorEventLoopPolicy,
            asyncio.ProactorEventLoop)
316
317
# Run this module's tests when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()