# python (3.12.0)
1 import os
2 import signal
3 import socket
4 import sys
5 import time
6 import threading
7 import unittest
8 from unittest import mock
9
# Everything in this module exercises the Windows-specific event loops and
# relies on the _overlapped/_winapi modules imported right after this check,
# so skip the whole module on any other platform.
if sys.platform != 'win32':
    raise unittest.SkipTest('Windows only')
12
13 import _overlapped
14 import _winapi
15
16 import asyncio
17 from asyncio import windows_events
18 from test.test_asyncio import utils as test_utils
19
20
def tearDownModule():
    """Reset the global asyncio event loop policy once the module is done.

    Passing ``None`` restores the default policy, so policies installed by
    tests in this module do not leak into modules that run afterwards.
    """
    asyncio.set_event_loop_policy(None)
23
24
class UpperProto(asyncio.Protocol):
    """Protocol that answers one line of input with its upper-cased form.

    Received chunks are accumulated in ``buf``.  As soon as a chunk contains
    a newline, everything buffered so far is joined, upper-cased, written
    back to the transport, and the transport is closed.
    """

    def __init__(self):
        # Chunks received so far, in arrival order.
        self.buf = []
        # Transport of the current connection; set by connection_made().
        self.trans = None

    def connection_made(self, trans):
        # Remember the transport so data_received() can reply and close.
        self.trans = trans

    def data_received(self, data):
        self.buf.append(data)
        # A newline anywhere in the latest chunk marks the end of the
        # request: reply with everything received so far and hang up.
        if b'\n' in data:
            self.trans.write(b''.join(self.buf).upper())
            self.trans.close()
37
38
class ProactorLoopCtrlC(test_utils.TestCase):
    """Check that SIGINT interrupts a running event loop."""

    def test_ctrl_c(self):
        # A helper thread raises SIGINT shortly after the loop has started;
        # run_forever() must then be interrupted by KeyboardInterrupt
        # instead of returning normally.

        def SIGINT_after_delay():
            time.sleep(0.1)
            signal.raise_signal(signal.SIGINT)

        thread = threading.Thread(target=SIGINT_after_delay)
        loop = asyncio.new_event_loop()
        try:
            # only start the thread once the event loop is running
            loop.call_soon(thread.start)
            loop.run_forever()
            self.fail("should not fall through 'run_forever'")
        except KeyboardInterrupt:
            # Expected outcome: the signal interrupted run_forever().
            pass
        finally:
            self.close_loop(loop)
        thread.join()
59
60
class ProactorMultithreading(test_utils.TestCase):
    """Check that an event loop can be used from a non-main thread."""

    def test_run_from_nonmain_thread(self):
        # Create, run and close a fresh event loop entirely inside a worker
        # thread; the flag is only set if every step finished without
        # raising.
        finished = False

        async def coro():
            await asyncio.sleep(0)

        def func():
            nonlocal finished
            loop = asyncio.new_event_loop()
            loop.run_until_complete(coro())
            # close() must not call signal.set_wakeup_fd()
            loop.close()
            finished = True

        thread = threading.Thread(target=func)
        thread.start()
        thread.join()
        self.assertTrue(finished)
80
81
class ProactorTests(test_utils.TestCase):
    """Tests for ProactorEventLoop: sockets, named pipes and handle waits."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.ProactorEventLoop()
        self.set_event_loop(self.loop)

    def test_close(self):
        # Closing the transport that wraps one end of a socket pair must
        # make a pending sock_recv() on the other end complete with b''.
        a, b = socket.socketpair()
        # 'a' is handed to the transport below; 'b' is closed here even if
        # an assertion fails.
        with b:
            trans = self.loop._make_socket_transport(a, asyncio.Protocol())
            f = asyncio.ensure_future(self.loop.sock_recv(b, 100),
                                      loop=self.loop)
            trans.close()
            self.loop.run_until_complete(f)
            self.assertEqual(f.result(), b'')

    def test_double_bind(self):
        # A second PipeServer on an address that is already being served
        # must fail with PermissionError.
        ADDRESS = r'\\.\pipe\test_double_bind-%s' % os.getpid()
        server1 = windows_events.PipeServer(ADDRESS)
        try:
            with self.assertRaises(PermissionError):
                windows_events.PipeServer(ADDRESS)
        finally:
            # Release the pipe even when the expectation above fails.
            server1.close()

    def test_pipe(self):
        res = self.loop.run_until_complete(self._test_pipe())
        self.assertEqual(res, 'done')

    async def _test_pipe(self):
        # Round trip over a named pipe: connecting fails while nothing is
        # serving, five clients each get their line back upper-cased by
        # UpperProto, and connecting fails again once the server is closed.
        ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid()

        with self.assertRaises(FileNotFoundError):
            await self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        [server] = await self.loop.start_serving_pipe(
            UpperProto, ADDRESS)
        self.assertIsInstance(server, windows_events.PipeServer)

        clients = []
        for i in range(5):
            stream_reader = asyncio.StreamReader(loop=self.loop)
            protocol = asyncio.StreamReaderProtocol(stream_reader,
                                                    loop=self.loop)
            trans, proto = await self.loop.create_pipe_connection(
                lambda: protocol, ADDRESS)
            self.assertIsInstance(trans, asyncio.Transport)
            self.assertEqual(protocol, proto)
            clients.append((stream_reader, trans))

        # Send every request first, then collect the replies.
        for i, (r, w) in enumerate(clients):
            w.write(f'lower-{i}\n'.encode())

        for i, (r, w) in enumerate(clients):
            response = await r.readline()
            self.assertEqual(response, f'LOWER-{i}\n'.encode())
            w.close()

        server.close()

        with self.assertRaises(FileNotFoundError):
            await self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        return 'done'

    def test_connect_pipe_cancel(self):
        # ConnectPipe is patched to always fail with ERROR_PIPE_BUSY; the
        # task wrapping connect_pipe() must still be cancellable.
        exc = OSError()
        exc.winerror = _overlapped.ERROR_PIPE_BUSY
        with mock.patch.object(_overlapped, 'ConnectPipe',
                               side_effect=exc):
            coro = self.loop._proactor.connect_pipe('pipe_address')
            task = self.loop.create_task(coro)

            # check that it's possible to cancel connect_pipe()
            task.cancel()
            with self.assertRaises(asyncio.CancelledError):
                self.loop.run_until_complete(task)

    def test_wait_for_handle(self):
        event = _overlapped.CreateEvent(None, True, False, None)
        self.addCleanup(_winapi.CloseHandle, event)

        # Wait for unset event with 0.5s timeout;
        # result should be False at timeout
        fut = self.loop._proactor.wait_for_handle(event, 0.5)
        start = self.loop.time()
        done = self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start

        self.assertEqual(done, False)
        self.assertFalse(fut.result())
        # bpo-31008: Tolerate only 450 ms (at least 500 ms expected),
        # because of bad clock resolution on Windows
        self.assertTrue(0.45 <= elapsed <= 0.9, elapsed)

        _overlapped.SetEvent(event)

        # Wait for set event;
        # result should be True immediately
        fut = self.loop._proactor.wait_for_handle(event, 10)
        start = self.loop.time()
        done = self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start

        self.assertEqual(done, True)
        self.assertTrue(fut.result())
        self.assertTrue(0 <= elapsed < 0.3, elapsed)

        # asyncio issue #195: cancelling a done _WaitHandleFuture
        # must not crash
        fut.cancel()

    def test_wait_for_handle_cancel(self):
        event = _overlapped.CreateEvent(None, True, False, None)
        self.addCleanup(_winapi.CloseHandle, event)

        # Wait for unset event with a cancelled future;
        # CancelledError should be raised immediately
        fut = self.loop._proactor.wait_for_handle(event, 10)
        fut.cancel()
        start = self.loop.time()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start
        self.assertTrue(0 <= elapsed < 0.1, elapsed)

        # asyncio issue #195: cancelling a _WaitHandleFuture twice
        # must not crash
        fut = self.loop._proactor.wait_for_handle(event)
        fut.cancel()
        fut.cancel()

    def test_read_self_pipe_restart(self):
        # Regression test for https://bugs.python.org/issue39010
        # Previously, restarting a proactor event loop in certain states
        # would lead to spurious ConnectionResetErrors being logged.
        self.loop.call_exception_handler = mock.Mock()
        # Start an operation in another thread so that the self-pipe is used.
        # This is theoretically timing-dependent (the task in the executor
        # must complete before our start/stop cycles), but in practice it
        # seems to work every time.
        f = self.loop.run_in_executor(None, lambda: None)
        self.loop.stop()
        self.loop.run_forever()
        self.loop.stop()
        self.loop.run_forever()

        # Shut everything down cleanly. This is an important part of the
        # test - in issue 39010, the error occurred during loop.close(),
        # so we want to close the loop during the test instead of leaving
        # it for tearDown.
        #
        # First wait for f to complete to avoid a "future's result was never
        # retrieved" error.
        self.loop.run_until_complete(f)
        # Now shut down the loop itself (self.close_loop also shuts down the
        # loop's default executor).
        self.close_loop(self.loop)
        self.assertFalse(self.loop.call_exception_handler.called)

    def test_address_argument_type_error(self):
        # Regression test for https://github.com/python/cpython/issues/98793
        proactor = self.loop._proactor
        bad_address = None
        # The context manager closes the socket even if an assertion fails.
        with socket.socket(type=socket.SOCK_DGRAM) as sock:
            with self.assertRaises(TypeError):
                proactor.connect(sock, bad_address)
            with self.assertRaises(TypeError):
                proactor.sendto(sock, b'abc', addr=bad_address)

    def test_client_pipe_stat(self):
        res = self.loop.run_until_complete(self._test_client_pipe_stat())
        self.assertEqual(res, 'done')

    async def _test_client_pipe_stat(self):
        # Regression test for https://github.com/python/cpython/issues/100573
        ADDRESS = r'\\.\pipe\test_client_pipe_stat-%s' % os.getpid()

        async def probe():
            # See https://github.com/python/cpython/pull/100959#discussion_r1068533658
            h = _overlapped.ConnectPipe(ADDRESS)
            try:
                # A second connection attempt while the first handle is
                # still open; ERROR_PIPE_BUSY is tolerated, anything else
                # propagates.
                _winapi.CloseHandle(_overlapped.ConnectPipe(ADDRESS))
            except OSError as e:
                if e.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise
            finally:
                _winapi.CloseHandle(h)

        with self.assertRaises(FileNotFoundError):
            await probe()

        [server] = await self.loop.start_serving_pipe(
            asyncio.Protocol, ADDRESS)
        self.assertIsInstance(server, windows_events.PipeServer)

        # Collect whatever reaches the loop's exception handler; probing
        # the served pipe must not produce any such report.
        errors = []
        self.loop.set_exception_handler(lambda _, data: errors.append(data))

        for _ in range(5):
            await self.loop.create_task(probe())

        self.assertEqual(len(errors), 0, errors)

        server.close()

        with self.assertRaises(FileNotFoundError):
            await probe()

        return "done"
292
293
class WinPolicyTests(test_utils.TestCase):
    """Check which loop class each Windows event loop policy produces."""

    def _check_policy_loop(self, policy_class, loop_class):
        # Install a policy_class() instance, verify that asyncio.run()
        # executes on an instance of loop_class, and always put the
        # previously active policy back afterwards.
        async def main():
            self.assertIsInstance(
                asyncio.get_running_loop(),
                loop_class)

        old_policy = asyncio.get_event_loop_policy()
        try:
            asyncio.set_event_loop_policy(policy_class())
            asyncio.run(main())
        finally:
            asyncio.set_event_loop_policy(old_policy)

    def test_selector_win_policy(self):
        self._check_policy_loop(
            asyncio.WindowsSelectorEventLoopPolicy,
            asyncio.SelectorEventLoop)

    def test_proactor_win_policy(self):
        self._check_policy_loop(
            asyncio.WindowsProactorEventLoopPolicy,
            asyncio.ProactorEventLoop)
323
324
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()