python (3.12.0)
1 """Tests for unix_events.py."""
2
3 import contextlib
4 import errno
5 import io
6 import multiprocessing
7 import os
8 import pathlib
9 import signal
10 import socket
11 import stat
12 import sys
13 import threading
14 import unittest
15 from unittest import mock
16 import warnings
17 from test.support import os_helper
18 from test.support import socket_helper
19 from test.support import wait_process
20 from test.support import hashlib_helper
21
22 if sys.platform == 'win32':
23 raise unittest.SkipTest('UNIX only')
24
25
26 import asyncio
27 from asyncio import log
28 from asyncio import unix_events
29 from test.test_asyncio import utils as test_utils
30
31
32 def tearDownModule():
33 asyncio.set_event_loop_policy(None)
34
35
36 MOCK_ANY = mock.ANY
37
38
39 def EXITCODE(exitcode):
40 return 32768 + exitcode
41
42
43 def SIGNAL(signum):
44 if not 1 <= signum <= 68:
45 raise AssertionError(f'invalid signum {signum}')
46 return 32768 - signum
47
48
49 def close_pipe_transport(transport):
50 # Don't call transport.close() because the event loop and the selector
51 # are mocked
52 if transport._pipe is None:
53 return
54 transport._pipe.close()
55 transport._pipe = None
56
57
58 @unittest.skipUnless(signal, 'Signals are not supported')
59 class ESC[4;38;5;81mSelectorEventLoopSignalTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
60
61 def setUp(self):
62 super().setUp()
63 self.loop = asyncio.SelectorEventLoop()
64 self.set_event_loop(self.loop)
65
66 def test_check_signal(self):
67 self.assertRaises(
68 TypeError, self.loop._check_signal, '1')
69 self.assertRaises(
70 ValueError, self.loop._check_signal, signal.NSIG + 1)
71
72 def test_handle_signal_no_handler(self):
73 self.loop._handle_signal(signal.NSIG + 1)
74
75 def test_handle_signal_cancelled_handler(self):
76 h = asyncio.Handle(mock.Mock(), (),
77 loop=mock.Mock())
78 h.cancel()
79 self.loop._signal_handlers[signal.NSIG + 1] = h
80 self.loop.remove_signal_handler = mock.Mock()
81 self.loop._handle_signal(signal.NSIG + 1)
82 self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)
83
84 @mock.patch('asyncio.unix_events.signal')
85 def test_add_signal_handler_setup_error(self, m_signal):
86 m_signal.NSIG = signal.NSIG
87 m_signal.valid_signals = signal.valid_signals
88 m_signal.set_wakeup_fd.side_effect = ValueError
89
90 self.assertRaises(
91 RuntimeError,
92 self.loop.add_signal_handler,
93 signal.SIGINT, lambda: True)
94
95 @mock.patch('asyncio.unix_events.signal')
96 def test_add_signal_handler_coroutine_error(self, m_signal):
97 m_signal.NSIG = signal.NSIG
98
99 async def simple_coroutine():
100 pass
101
102 # callback must not be a coroutine function
103 coro_func = simple_coroutine
104 coro_obj = coro_func()
105 self.addCleanup(coro_obj.close)
106 for func in (coro_func, coro_obj):
107 self.assertRaisesRegex(
108 TypeError, 'coroutines cannot be used with add_signal_handler',
109 self.loop.add_signal_handler,
110 signal.SIGINT, func)
111
112 @mock.patch('asyncio.unix_events.signal')
113 def test_add_signal_handler(self, m_signal):
114 m_signal.NSIG = signal.NSIG
115 m_signal.valid_signals = signal.valid_signals
116
117 cb = lambda: True
118 self.loop.add_signal_handler(signal.SIGHUP, cb)
119 h = self.loop._signal_handlers.get(signal.SIGHUP)
120 self.assertIsInstance(h, asyncio.Handle)
121 self.assertEqual(h._callback, cb)
122
123 @mock.patch('asyncio.unix_events.signal')
124 def test_add_signal_handler_install_error(self, m_signal):
125 m_signal.NSIG = signal.NSIG
126 m_signal.valid_signals = signal.valid_signals
127
128 def set_wakeup_fd(fd):
129 if fd == -1:
130 raise ValueError()
131 m_signal.set_wakeup_fd = set_wakeup_fd
132
133 class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
134 errno = errno.EFAULT
135 m_signal.signal.side_effect = Err
136
137 self.assertRaises(
138 Err,
139 self.loop.add_signal_handler,
140 signal.SIGINT, lambda: True)
141
142 @mock.patch('asyncio.unix_events.signal')
143 @mock.patch('asyncio.base_events.logger')
144 def test_add_signal_handler_install_error2(self, m_logging, m_signal):
145 m_signal.NSIG = signal.NSIG
146 m_signal.valid_signals = signal.valid_signals
147
148 class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
149 errno = errno.EINVAL
150 m_signal.signal.side_effect = Err
151
152 self.loop._signal_handlers[signal.SIGHUP] = lambda: True
153 self.assertRaises(
154 RuntimeError,
155 self.loop.add_signal_handler,
156 signal.SIGINT, lambda: True)
157 self.assertFalse(m_logging.info.called)
158 self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
159
160 @mock.patch('asyncio.unix_events.signal')
161 @mock.patch('asyncio.base_events.logger')
162 def test_add_signal_handler_install_error3(self, m_logging, m_signal):
163 class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
164 errno = errno.EINVAL
165 m_signal.signal.side_effect = Err
166 m_signal.NSIG = signal.NSIG
167 m_signal.valid_signals = signal.valid_signals
168
169 self.assertRaises(
170 RuntimeError,
171 self.loop.add_signal_handler,
172 signal.SIGINT, lambda: True)
173 self.assertFalse(m_logging.info.called)
174 self.assertEqual(2, m_signal.set_wakeup_fd.call_count)
175
176 @mock.patch('asyncio.unix_events.signal')
177 def test_remove_signal_handler(self, m_signal):
178 m_signal.NSIG = signal.NSIG
179 m_signal.valid_signals = signal.valid_signals
180
181 self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
182
183 self.assertTrue(
184 self.loop.remove_signal_handler(signal.SIGHUP))
185 self.assertTrue(m_signal.set_wakeup_fd.called)
186 self.assertTrue(m_signal.signal.called)
187 self.assertEqual(
188 (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])
189
190 @mock.patch('asyncio.unix_events.signal')
191 def test_remove_signal_handler_2(self, m_signal):
192 m_signal.NSIG = signal.NSIG
193 m_signal.SIGINT = signal.SIGINT
194 m_signal.valid_signals = signal.valid_signals
195
196 self.loop.add_signal_handler(signal.SIGINT, lambda: True)
197 self.loop._signal_handlers[signal.SIGHUP] = object()
198 m_signal.set_wakeup_fd.reset_mock()
199
200 self.assertTrue(
201 self.loop.remove_signal_handler(signal.SIGINT))
202 self.assertFalse(m_signal.set_wakeup_fd.called)
203 self.assertTrue(m_signal.signal.called)
204 self.assertEqual(
205 (signal.SIGINT, m_signal.default_int_handler),
206 m_signal.signal.call_args[0])
207
208 @mock.patch('asyncio.unix_events.signal')
209 @mock.patch('asyncio.base_events.logger')
210 def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
211 m_signal.NSIG = signal.NSIG
212 m_signal.valid_signals = signal.valid_signals
213 self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
214
215 m_signal.set_wakeup_fd.side_effect = ValueError
216
217 self.loop.remove_signal_handler(signal.SIGHUP)
218 self.assertTrue(m_logging.info)
219
220 @mock.patch('asyncio.unix_events.signal')
221 def test_remove_signal_handler_error(self, m_signal):
222 m_signal.NSIG = signal.NSIG
223 m_signal.valid_signals = signal.valid_signals
224 self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
225
226 m_signal.signal.side_effect = OSError
227
228 self.assertRaises(
229 OSError, self.loop.remove_signal_handler, signal.SIGHUP)
230
231 @mock.patch('asyncio.unix_events.signal')
232 def test_remove_signal_handler_error2(self, m_signal):
233 m_signal.NSIG = signal.NSIG
234 m_signal.valid_signals = signal.valid_signals
235 self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
236
237 class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
238 errno = errno.EINVAL
239 m_signal.signal.side_effect = Err
240
241 self.assertRaises(
242 RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
243
244 @mock.patch('asyncio.unix_events.signal')
245 def test_close(self, m_signal):
246 m_signal.NSIG = signal.NSIG
247 m_signal.valid_signals = signal.valid_signals
248
249 self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
250 self.loop.add_signal_handler(signal.SIGCHLD, lambda: True)
251
252 self.assertEqual(len(self.loop._signal_handlers), 2)
253
254 m_signal.set_wakeup_fd.reset_mock()
255
256 self.loop.close()
257
258 self.assertEqual(len(self.loop._signal_handlers), 0)
259 m_signal.set_wakeup_fd.assert_called_once_with(-1)
260
261 @mock.patch('asyncio.unix_events.sys')
262 @mock.patch('asyncio.unix_events.signal')
263 def test_close_on_finalizing(self, m_signal, m_sys):
264 m_signal.NSIG = signal.NSIG
265 m_signal.valid_signals = signal.valid_signals
266 self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
267
268 self.assertEqual(len(self.loop._signal_handlers), 1)
269 m_sys.is_finalizing.return_value = True
270 m_signal.signal.reset_mock()
271
272 with self.assertWarnsRegex(ResourceWarning,
273 "skipping signal handlers removal"):
274 self.loop.close()
275
276 self.assertEqual(len(self.loop._signal_handlers), 0)
277 self.assertFalse(m_signal.signal.called)
278
279
280 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
281 'UNIX Sockets are not supported')
282 class ESC[4;38;5;81mSelectorEventLoopUnixSocketTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
283
284 def setUp(self):
285 super().setUp()
286 self.loop = asyncio.SelectorEventLoop()
287 self.set_event_loop(self.loop)
288
289 @socket_helper.skip_unless_bind_unix_socket
290 def test_create_unix_server_existing_path_sock(self):
291 with test_utils.unix_socket_path() as path:
292 sock = socket.socket(socket.AF_UNIX)
293 sock.bind(path)
294 sock.listen(1)
295 sock.close()
296
297 coro = self.loop.create_unix_server(lambda: None, path)
298 srv = self.loop.run_until_complete(coro)
299 srv.close()
300 self.loop.run_until_complete(srv.wait_closed())
301
302 @socket_helper.skip_unless_bind_unix_socket
303 def test_create_unix_server_pathlib(self):
304 with test_utils.unix_socket_path() as path:
305 path = pathlib.Path(path)
306 srv_coro = self.loop.create_unix_server(lambda: None, path)
307 srv = self.loop.run_until_complete(srv_coro)
308 srv.close()
309 self.loop.run_until_complete(srv.wait_closed())
310
311 def test_create_unix_connection_pathlib(self):
312 with test_utils.unix_socket_path() as path:
313 path = pathlib.Path(path)
314 coro = self.loop.create_unix_connection(lambda: None, path)
315 with self.assertRaises(FileNotFoundError):
316 # If pathlib.Path wasn't supported, the exception would be
317 # different.
318 self.loop.run_until_complete(coro)
319
320 def test_create_unix_server_existing_path_nonsock(self):
321 path = test_utils.gen_unix_socket_path()
322 self.addCleanup(os_helper.unlink, path)
323 # create the file
324 open(path, "wb").close()
325
326 coro = self.loop.create_unix_server(lambda: None, path)
327 with self.assertRaisesRegex(OSError,
328 'Address.*is already in use'):
329 self.loop.run_until_complete(coro)
330
331 def test_create_unix_server_ssl_bool(self):
332 coro = self.loop.create_unix_server(lambda: None, path='spam',
333 ssl=True)
334 with self.assertRaisesRegex(TypeError,
335 'ssl argument must be an SSLContext'):
336 self.loop.run_until_complete(coro)
337
338 def test_create_unix_server_nopath_nosock(self):
339 coro = self.loop.create_unix_server(lambda: None, path=None)
340 with self.assertRaisesRegex(ValueError,
341 'path was not specified, and no sock'):
342 self.loop.run_until_complete(coro)
343
344 def test_create_unix_server_path_inetsock(self):
345 sock = socket.socket()
346 with sock:
347 coro = self.loop.create_unix_server(lambda: None, path=None,
348 sock=sock)
349 with self.assertRaisesRegex(ValueError,
350 'A UNIX Domain Stream.*was expected'):
351 self.loop.run_until_complete(coro)
352
353 def test_create_unix_server_path_dgram(self):
354 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
355 with sock:
356 coro = self.loop.create_unix_server(lambda: None, path=None,
357 sock=sock)
358 with self.assertRaisesRegex(ValueError,
359 'A UNIX Domain Stream.*was expected'):
360 self.loop.run_until_complete(coro)
361
362 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
363 'no socket.SOCK_NONBLOCK (linux only)')
364 @socket_helper.skip_unless_bind_unix_socket
365 def test_create_unix_server_path_stream_bittype(self):
366 fn = test_utils.gen_unix_socket_path()
367 self.addCleanup(os_helper.unlink, fn)
368
369 sock = socket.socket(socket.AF_UNIX,
370 socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
371 with sock:
372 sock.bind(fn)
373 coro = self.loop.create_unix_server(lambda: None, path=None,
374 sock=sock)
375 srv = self.loop.run_until_complete(coro)
376 srv.close()
377 self.loop.run_until_complete(srv.wait_closed())
378
379 def test_create_unix_server_ssl_timeout_with_plain_sock(self):
380 coro = self.loop.create_unix_server(lambda: None, path='spam',
381 ssl_handshake_timeout=1)
382 with self.assertRaisesRegex(
383 ValueError,
384 'ssl_handshake_timeout is only meaningful with ssl'):
385 self.loop.run_until_complete(coro)
386
387 def test_create_unix_connection_path_inetsock(self):
388 sock = socket.socket()
389 with sock:
390 coro = self.loop.create_unix_connection(lambda: None,
391 sock=sock)
392 with self.assertRaisesRegex(ValueError,
393 'A UNIX Domain Stream.*was expected'):
394 self.loop.run_until_complete(coro)
395
396 @mock.patch('asyncio.unix_events.socket')
397 def test_create_unix_server_bind_error(self, m_socket):
398 # Ensure that the socket is closed on any bind error
399 sock = mock.Mock()
400 m_socket.socket.return_value = sock
401
402 sock.bind.side_effect = OSError
403 coro = self.loop.create_unix_server(lambda: None, path="/test")
404 with self.assertRaises(OSError):
405 self.loop.run_until_complete(coro)
406 self.assertTrue(sock.close.called)
407
408 sock.bind.side_effect = MemoryError
409 coro = self.loop.create_unix_server(lambda: None, path="/test")
410 with self.assertRaises(MemoryError):
411 self.loop.run_until_complete(coro)
412 self.assertTrue(sock.close.called)
413
414 def test_create_unix_connection_path_sock(self):
415 coro = self.loop.create_unix_connection(
416 lambda: None, os.devnull, sock=object())
417 with self.assertRaisesRegex(ValueError, 'path and sock can not be'):
418 self.loop.run_until_complete(coro)
419
420 def test_create_unix_connection_nopath_nosock(self):
421 coro = self.loop.create_unix_connection(
422 lambda: None, None)
423 with self.assertRaisesRegex(ValueError,
424 'no path and sock were specified'):
425 self.loop.run_until_complete(coro)
426
427 def test_create_unix_connection_nossl_serverhost(self):
428 coro = self.loop.create_unix_connection(
429 lambda: None, os.devnull, server_hostname='spam')
430 with self.assertRaisesRegex(ValueError,
431 'server_hostname is only meaningful'):
432 self.loop.run_until_complete(coro)
433
434 def test_create_unix_connection_ssl_noserverhost(self):
435 coro = self.loop.create_unix_connection(
436 lambda: None, os.devnull, ssl=True)
437
438 with self.assertRaisesRegex(
439 ValueError, 'you have to pass server_hostname when using ssl'):
440
441 self.loop.run_until_complete(coro)
442
443 def test_create_unix_connection_ssl_timeout_with_plain_sock(self):
444 coro = self.loop.create_unix_connection(lambda: None, path='spam',
445 ssl_handshake_timeout=1)
446 with self.assertRaisesRegex(
447 ValueError,
448 'ssl_handshake_timeout is only meaningful with ssl'):
449 self.loop.run_until_complete(coro)
450
451
452 @unittest.skipUnless(hasattr(os, 'sendfile'),
453 'sendfile is not supported')
454 class ESC[4;38;5;81mSelectorEventLoopUnixSockSendfileTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
455 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
456
457 class ESC[4;38;5;81mMyProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
458
459 def __init__(self, loop):
460 self.started = False
461 self.closed = False
462 self.data = bytearray()
463 self.fut = loop.create_future()
464 self.transport = None
465 self._ready = loop.create_future()
466
467 def connection_made(self, transport):
468 self.started = True
469 self.transport = transport
470 self._ready.set_result(None)
471
472 def data_received(self, data):
473 self.data.extend(data)
474
475 def connection_lost(self, exc):
476 self.closed = True
477 self.fut.set_result(None)
478
479 async def wait_closed(self):
480 await self.fut
481
482 @classmethod
483 def setUpClass(cls):
484 with open(os_helper.TESTFN, 'wb') as fp:
485 fp.write(cls.DATA)
486 super().setUpClass()
487
488 @classmethod
489 def tearDownClass(cls):
490 os_helper.unlink(os_helper.TESTFN)
491 super().tearDownClass()
492
493 def setUp(self):
494 self.loop = asyncio.new_event_loop()
495 self.set_event_loop(self.loop)
496 self.file = open(os_helper.TESTFN, 'rb')
497 self.addCleanup(self.file.close)
498 super().setUp()
499
500 def make_socket(self, cleanup=True):
501 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
502 sock.setblocking(False)
503 sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
504 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
505 if cleanup:
506 self.addCleanup(sock.close)
507 return sock
508
509 def run_loop(self, coro):
510 return self.loop.run_until_complete(coro)
511
512 def prepare(self):
513 sock = self.make_socket()
514 proto = self.MyProto(self.loop)
515 port = socket_helper.find_unused_port()
516 srv_sock = self.make_socket(cleanup=False)
517 srv_sock.bind((socket_helper.HOST, port))
518 server = self.run_loop(self.loop.create_server(
519 lambda: proto, sock=srv_sock))
520 self.run_loop(self.loop.sock_connect(sock, (socket_helper.HOST, port)))
521 self.run_loop(proto._ready)
522
523 def cleanup():
524 proto.transport.close()
525 self.run_loop(proto.wait_closed())
526
527 server.close()
528 self.run_loop(server.wait_closed())
529
530 self.addCleanup(cleanup)
531
532 return sock, proto
533
534 def test_sock_sendfile_not_available(self):
535 sock, proto = self.prepare()
536 with mock.patch('asyncio.unix_events.os', spec=[]):
537 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
538 "os[.]sendfile[(][)] is not available"):
539 self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
540 0, None))
541 self.assertEqual(self.file.tell(), 0)
542
543 def test_sock_sendfile_not_a_file(self):
544 sock, proto = self.prepare()
545 f = object()
546 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
547 "not a regular file"):
548 self.run_loop(self.loop._sock_sendfile_native(sock, f,
549 0, None))
550 self.assertEqual(self.file.tell(), 0)
551
552 def test_sock_sendfile_iobuffer(self):
553 sock, proto = self.prepare()
554 f = io.BytesIO()
555 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
556 "not a regular file"):
557 self.run_loop(self.loop._sock_sendfile_native(sock, f,
558 0, None))
559 self.assertEqual(self.file.tell(), 0)
560
561 def test_sock_sendfile_not_regular_file(self):
562 sock, proto = self.prepare()
563 f = mock.Mock()
564 f.fileno.return_value = -1
565 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
566 "not a regular file"):
567 self.run_loop(self.loop._sock_sendfile_native(sock, f,
568 0, None))
569 self.assertEqual(self.file.tell(), 0)
570
571 def test_sock_sendfile_cancel1(self):
572 sock, proto = self.prepare()
573
574 fut = self.loop.create_future()
575 fileno = self.file.fileno()
576 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
577 0, None, len(self.DATA), 0)
578 fut.cancel()
579 with contextlib.suppress(asyncio.CancelledError):
580 self.run_loop(fut)
581 with self.assertRaises(KeyError):
582 self.loop._selector.get_key(sock)
583
584 def test_sock_sendfile_cancel2(self):
585 sock, proto = self.prepare()
586
587 fut = self.loop.create_future()
588 fileno = self.file.fileno()
589 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
590 0, None, len(self.DATA), 0)
591 fut.cancel()
592 self.loop._sock_sendfile_native_impl(fut, sock.fileno(), sock, fileno,
593 0, None, len(self.DATA), 0)
594 with self.assertRaises(KeyError):
595 self.loop._selector.get_key(sock)
596
597 def test_sock_sendfile_blocking_error(self):
598 sock, proto = self.prepare()
599
600 fileno = self.file.fileno()
601 fut = mock.Mock()
602 fut.cancelled.return_value = False
603 with mock.patch('os.sendfile', side_effect=BlockingIOError()):
604 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
605 0, None, len(self.DATA), 0)
606 key = self.loop._selector.get_key(sock)
607 self.assertIsNotNone(key)
608 fut.add_done_callback.assert_called_once_with(mock.ANY)
609
610 def test_sock_sendfile_os_error_first_call(self):
611 sock, proto = self.prepare()
612
613 fileno = self.file.fileno()
614 fut = self.loop.create_future()
615 with mock.patch('os.sendfile', side_effect=OSError()):
616 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
617 0, None, len(self.DATA), 0)
618 with self.assertRaises(KeyError):
619 self.loop._selector.get_key(sock)
620 exc = fut.exception()
621 self.assertIsInstance(exc, asyncio.SendfileNotAvailableError)
622 self.assertEqual(0, self.file.tell())
623
624 def test_sock_sendfile_os_error_next_call(self):
625 sock, proto = self.prepare()
626
627 fileno = self.file.fileno()
628 fut = self.loop.create_future()
629 err = OSError()
630 with mock.patch('os.sendfile', side_effect=err):
631 self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
632 sock, fileno,
633 1000, None, len(self.DATA),
634 1000)
635 with self.assertRaises(KeyError):
636 self.loop._selector.get_key(sock)
637 exc = fut.exception()
638 self.assertIs(exc, err)
639 self.assertEqual(1000, self.file.tell())
640
641 def test_sock_sendfile_exception(self):
642 sock, proto = self.prepare()
643
644 fileno = self.file.fileno()
645 fut = self.loop.create_future()
646 err = asyncio.SendfileNotAvailableError()
647 with mock.patch('os.sendfile', side_effect=err):
648 self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
649 sock, fileno,
650 1000, None, len(self.DATA),
651 1000)
652 with self.assertRaises(KeyError):
653 self.loop._selector.get_key(sock)
654 exc = fut.exception()
655 self.assertIs(exc, err)
656 self.assertEqual(1000, self.file.tell())
657
658
659 class ESC[4;38;5;81mUnixReadPipeTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
660
661 def setUp(self):
662 super().setUp()
663 self.loop = self.new_test_loop()
664 self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
665 self.pipe = mock.Mock(spec_set=io.RawIOBase)
666 self.pipe.fileno.return_value = 5
667
668 blocking_patcher = mock.patch('os.set_blocking')
669 blocking_patcher.start()
670 self.addCleanup(blocking_patcher.stop)
671
672 fstat_patcher = mock.patch('os.fstat')
673 m_fstat = fstat_patcher.start()
674 st = mock.Mock()
675 st.st_mode = stat.S_IFIFO
676 m_fstat.return_value = st
677 self.addCleanup(fstat_patcher.stop)
678
679 def read_pipe_transport(self, waiter=None):
680 transport = unix_events._UnixReadPipeTransport(self.loop, self.pipe,
681 self.protocol,
682 waiter=waiter)
683 self.addCleanup(close_pipe_transport, transport)
684 return transport
685
686 def test_ctor(self):
687 waiter = self.loop.create_future()
688 tr = self.read_pipe_transport(waiter=waiter)
689 self.loop.run_until_complete(waiter)
690
691 self.protocol.connection_made.assert_called_with(tr)
692 self.loop.assert_reader(5, tr._read_ready)
693 self.assertIsNone(waiter.result())
694
695 @mock.patch('os.read')
696 def test__read_ready(self, m_read):
697 tr = self.read_pipe_transport()
698 m_read.return_value = b'data'
699 tr._read_ready()
700
701 m_read.assert_called_with(5, tr.max_size)
702 self.protocol.data_received.assert_called_with(b'data')
703
704 @mock.patch('os.read')
705 def test__read_ready_eof(self, m_read):
706 tr = self.read_pipe_transport()
707 m_read.return_value = b''
708 tr._read_ready()
709
710 m_read.assert_called_with(5, tr.max_size)
711 self.assertFalse(self.loop.readers)
712 test_utils.run_briefly(self.loop)
713 self.protocol.eof_received.assert_called_with()
714 self.protocol.connection_lost.assert_called_with(None)
715
716 @mock.patch('os.read')
717 def test__read_ready_blocked(self, m_read):
718 tr = self.read_pipe_transport()
719 m_read.side_effect = BlockingIOError
720 tr._read_ready()
721
722 m_read.assert_called_with(5, tr.max_size)
723 test_utils.run_briefly(self.loop)
724 self.assertFalse(self.protocol.data_received.called)
725
726 @mock.patch('asyncio.log.logger.error')
727 @mock.patch('os.read')
728 def test__read_ready_error(self, m_read, m_logexc):
729 tr = self.read_pipe_transport()
730 err = OSError()
731 m_read.side_effect = err
732 tr._close = mock.Mock()
733 tr._read_ready()
734
735 m_read.assert_called_with(5, tr.max_size)
736 tr._close.assert_called_with(err)
737 m_logexc.assert_called_with(
738 test_utils.MockPattern(
739 'Fatal read error on pipe transport'
740 '\nprotocol:.*\ntransport:.*'),
741 exc_info=(OSError, MOCK_ANY, MOCK_ANY))
742
743 @mock.patch('os.read')
744 def test_pause_reading(self, m_read):
745 tr = self.read_pipe_transport()
746 m = mock.Mock()
747 self.loop.add_reader(5, m)
748 tr.pause_reading()
749 self.assertFalse(self.loop.readers)
750
751 @mock.patch('os.read')
752 def test_resume_reading(self, m_read):
753 tr = self.read_pipe_transport()
754 tr.pause_reading()
755 tr.resume_reading()
756 self.loop.assert_reader(5, tr._read_ready)
757
758 @mock.patch('os.read')
759 def test_close(self, m_read):
760 tr = self.read_pipe_transport()
761 tr._close = mock.Mock()
762 tr.close()
763 tr._close.assert_called_with(None)
764
765 @mock.patch('os.read')
766 def test_close_already_closing(self, m_read):
767 tr = self.read_pipe_transport()
768 tr._closing = True
769 tr._close = mock.Mock()
770 tr.close()
771 self.assertFalse(tr._close.called)
772
773 @mock.patch('os.read')
774 def test__close(self, m_read):
775 tr = self.read_pipe_transport()
776 err = object()
777 tr._close(err)
778 self.assertTrue(tr.is_closing())
779 self.assertFalse(self.loop.readers)
780 test_utils.run_briefly(self.loop)
781 self.protocol.connection_lost.assert_called_with(err)
782
783 def test__call_connection_lost(self):
784 tr = self.read_pipe_transport()
785 self.assertIsNotNone(tr._protocol)
786 self.assertIsNotNone(tr._loop)
787
788 err = None
789 tr._call_connection_lost(err)
790 self.protocol.connection_lost.assert_called_with(err)
791 self.pipe.close.assert_called_with()
792
793 self.assertIsNone(tr._protocol)
794 self.assertIsNone(tr._loop)
795
796 def test__call_connection_lost_with_err(self):
797 tr = self.read_pipe_transport()
798 self.assertIsNotNone(tr._protocol)
799 self.assertIsNotNone(tr._loop)
800
801 err = OSError()
802 tr._call_connection_lost(err)
803 self.protocol.connection_lost.assert_called_with(err)
804 self.pipe.close.assert_called_with()
805
806 self.assertIsNone(tr._protocol)
807 self.assertIsNone(tr._loop)
808
809 def test_pause_reading_on_closed_pipe(self):
810 tr = self.read_pipe_transport()
811 tr.close()
812 test_utils.run_briefly(self.loop)
813 self.assertIsNone(tr._loop)
814 tr.pause_reading()
815
816 def test_pause_reading_on_paused_pipe(self):
817 tr = self.read_pipe_transport()
818 tr.pause_reading()
819 # the second call should do nothing
820 tr.pause_reading()
821
822 def test_resume_reading_on_closed_pipe(self):
823 tr = self.read_pipe_transport()
824 tr.close()
825 test_utils.run_briefly(self.loop)
826 self.assertIsNone(tr._loop)
827 tr.resume_reading()
828
829 def test_resume_reading_on_paused_pipe(self):
830 tr = self.read_pipe_transport()
831 # the pipe is not paused
832 # resuming should do nothing
833 tr.resume_reading()
834
835
836 class ESC[4;38;5;81mUnixWritePipeTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
837
838 def setUp(self):
839 super().setUp()
840 self.loop = self.new_test_loop()
841 self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
842 self.pipe = mock.Mock(spec_set=io.RawIOBase)
843 self.pipe.fileno.return_value = 5
844
845 blocking_patcher = mock.patch('os.set_blocking')
846 blocking_patcher.start()
847 self.addCleanup(blocking_patcher.stop)
848
849 fstat_patcher = mock.patch('os.fstat')
850 m_fstat = fstat_patcher.start()
851 st = mock.Mock()
852 st.st_mode = stat.S_IFSOCK
853 m_fstat.return_value = st
854 self.addCleanup(fstat_patcher.stop)
855
856 def write_pipe_transport(self, waiter=None):
857 transport = unix_events._UnixWritePipeTransport(self.loop, self.pipe,
858 self.protocol,
859 waiter=waiter)
860 self.addCleanup(close_pipe_transport, transport)
861 return transport
862
863 def test_ctor(self):
864 waiter = self.loop.create_future()
865 tr = self.write_pipe_transport(waiter=waiter)
866 self.loop.run_until_complete(waiter)
867
868 self.protocol.connection_made.assert_called_with(tr)
869 self.loop.assert_reader(5, tr._read_ready)
870 self.assertEqual(None, waiter.result())
871
872 def test_can_write_eof(self):
873 tr = self.write_pipe_transport()
874 self.assertTrue(tr.can_write_eof())
875
876 @mock.patch('os.write')
877 def test_write(self, m_write):
878 tr = self.write_pipe_transport()
879 m_write.return_value = 4
880 tr.write(b'data')
881 m_write.assert_called_with(5, b'data')
882 self.assertFalse(self.loop.writers)
883 self.assertEqual(bytearray(), tr._buffer)
884
885 @mock.patch('os.write')
886 def test_write_no_data(self, m_write):
887 tr = self.write_pipe_transport()
888 tr.write(b'')
889 self.assertFalse(m_write.called)
890 self.assertFalse(self.loop.writers)
891 self.assertEqual(bytearray(b''), tr._buffer)
892
893 @mock.patch('os.write')
894 def test_write_partial(self, m_write):
895 tr = self.write_pipe_transport()
896 m_write.return_value = 2
897 tr.write(b'data')
898 self.loop.assert_writer(5, tr._write_ready)
899 self.assertEqual(bytearray(b'ta'), tr._buffer)
900
901 @mock.patch('os.write')
902 def test_write_buffer(self, m_write):
903 tr = self.write_pipe_transport()
904 self.loop.add_writer(5, tr._write_ready)
905 tr._buffer = bytearray(b'previous')
906 tr.write(b'data')
907 self.assertFalse(m_write.called)
908 self.loop.assert_writer(5, tr._write_ready)
909 self.assertEqual(bytearray(b'previousdata'), tr._buffer)
910
911 @mock.patch('os.write')
912 def test_write_again(self, m_write):
913 tr = self.write_pipe_transport()
914 m_write.side_effect = BlockingIOError()
915 tr.write(b'data')
916 m_write.assert_called_with(5, bytearray(b'data'))
917 self.loop.assert_writer(5, tr._write_ready)
918 self.assertEqual(bytearray(b'data'), tr._buffer)
919
920 @mock.patch('asyncio.unix_events.logger')
921 @mock.patch('os.write')
922 def test_write_err(self, m_write, m_log):
923 tr = self.write_pipe_transport()
924 err = OSError()
925 m_write.side_effect = err
926 tr._fatal_error = mock.Mock()
927 tr.write(b'data')
928 m_write.assert_called_with(5, b'data')
929 self.assertFalse(self.loop.writers)
930 self.assertEqual(bytearray(), tr._buffer)
931 tr._fatal_error.assert_called_with(
932 err,
933 'Fatal write error on pipe transport')
934 self.assertEqual(1, tr._conn_lost)
935
936 tr.write(b'data')
937 self.assertEqual(2, tr._conn_lost)
938 tr.write(b'data')
939 tr.write(b'data')
940 tr.write(b'data')
941 tr.write(b'data')
942 # This is a bit overspecified. :-(
943 m_log.warning.assert_called_with(
944 'pipe closed by peer or os.write(pipe, data) raised exception.')
945 tr.close()
946
947 @mock.patch('os.write')
948 def test_write_close(self, m_write):
949 tr = self.write_pipe_transport()
950 tr._read_ready() # pipe was closed by peer
951
952 tr.write(b'data')
953 self.assertEqual(tr._conn_lost, 1)
954 tr.write(b'data')
955 self.assertEqual(tr._conn_lost, 2)
956
957 def test__read_ready(self):
958 tr = self.write_pipe_transport()
959 tr._read_ready()
960 self.assertFalse(self.loop.readers)
961 self.assertFalse(self.loop.writers)
962 self.assertTrue(tr.is_closing())
963 test_utils.run_briefly(self.loop)
964 self.protocol.connection_lost.assert_called_with(None)
965
966 @mock.patch('os.write')
967 def test__write_ready(self, m_write):
968 tr = self.write_pipe_transport()
969 self.loop.add_writer(5, tr._write_ready)
970 tr._buffer = bytearray(b'data')
971 m_write.return_value = 4
972 tr._write_ready()
973 self.assertFalse(self.loop.writers)
974 self.assertEqual(bytearray(), tr._buffer)
975
976 @mock.patch('os.write')
977 def test__write_ready_partial(self, m_write):
978 tr = self.write_pipe_transport()
979 self.loop.add_writer(5, tr._write_ready)
980 tr._buffer = bytearray(b'data')
981 m_write.return_value = 3
982 tr._write_ready()
983 self.loop.assert_writer(5, tr._write_ready)
984 self.assertEqual(bytearray(b'a'), tr._buffer)
985
986 @mock.patch('os.write')
987 def test__write_ready_again(self, m_write):
988 tr = self.write_pipe_transport()
989 self.loop.add_writer(5, tr._write_ready)
990 tr._buffer = bytearray(b'data')
991 m_write.side_effect = BlockingIOError()
992 tr._write_ready()
993 m_write.assert_called_with(5, bytearray(b'data'))
994 self.loop.assert_writer(5, tr._write_ready)
995 self.assertEqual(bytearray(b'data'), tr._buffer)
996
997 @mock.patch('os.write')
998 def test__write_ready_empty(self, m_write):
999 tr = self.write_pipe_transport()
1000 self.loop.add_writer(5, tr._write_ready)
1001 tr._buffer = bytearray(b'data')
1002 m_write.return_value = 0
1003 tr._write_ready()
1004 m_write.assert_called_with(5, bytearray(b'data'))
1005 self.loop.assert_writer(5, tr._write_ready)
1006 self.assertEqual(bytearray(b'data'), tr._buffer)
1007
1008 @mock.patch('asyncio.log.logger.error')
1009 @mock.patch('os.write')
1010 def test__write_ready_err(self, m_write, m_logexc):
1011 tr = self.write_pipe_transport()
1012 self.loop.add_writer(5, tr._write_ready)
1013 tr._buffer = bytearray(b'data')
1014 m_write.side_effect = err = OSError()
1015 tr._write_ready()
1016 self.assertFalse(self.loop.writers)
1017 self.assertFalse(self.loop.readers)
1018 self.assertEqual(bytearray(), tr._buffer)
1019 self.assertTrue(tr.is_closing())
1020 m_logexc.assert_not_called()
1021 self.assertEqual(1, tr._conn_lost)
1022 test_utils.run_briefly(self.loop)
1023 self.protocol.connection_lost.assert_called_with(err)
1024
1025 @mock.patch('os.write')
1026 def test__write_ready_closing(self, m_write):
1027 tr = self.write_pipe_transport()
1028 self.loop.add_writer(5, tr._write_ready)
1029 tr._closing = True
1030 tr._buffer = bytearray(b'data')
1031 m_write.return_value = 4
1032 tr._write_ready()
1033 self.assertFalse(self.loop.writers)
1034 self.assertFalse(self.loop.readers)
1035 self.assertEqual(bytearray(), tr._buffer)
1036 self.protocol.connection_lost.assert_called_with(None)
1037 self.pipe.close.assert_called_with()
1038
1039 @mock.patch('os.write')
1040 def test_abort(self, m_write):
1041 tr = self.write_pipe_transport()
1042 self.loop.add_writer(5, tr._write_ready)
1043 self.loop.add_reader(5, tr._read_ready)
1044 tr._buffer = [b'da', b'ta']
1045 tr.abort()
1046 self.assertFalse(m_write.called)
1047 self.assertFalse(self.loop.readers)
1048 self.assertFalse(self.loop.writers)
1049 self.assertEqual([], tr._buffer)
1050 self.assertTrue(tr.is_closing())
1051 test_utils.run_briefly(self.loop)
1052 self.protocol.connection_lost.assert_called_with(None)
1053
1054 def test__call_connection_lost(self):
1055 tr = self.write_pipe_transport()
1056 self.assertIsNotNone(tr._protocol)
1057 self.assertIsNotNone(tr._loop)
1058
1059 err = None
1060 tr._call_connection_lost(err)
1061 self.protocol.connection_lost.assert_called_with(err)
1062 self.pipe.close.assert_called_with()
1063
1064 self.assertIsNone(tr._protocol)
1065 self.assertIsNone(tr._loop)
1066
1067 def test__call_connection_lost_with_err(self):
1068 tr = self.write_pipe_transport()
1069 self.assertIsNotNone(tr._protocol)
1070 self.assertIsNotNone(tr._loop)
1071
1072 err = OSError()
1073 tr._call_connection_lost(err)
1074 self.protocol.connection_lost.assert_called_with(err)
1075 self.pipe.close.assert_called_with()
1076
1077 self.assertIsNone(tr._protocol)
1078 self.assertIsNone(tr._loop)
1079
1080 def test_close(self):
1081 tr = self.write_pipe_transport()
1082 tr.write_eof = mock.Mock()
1083 tr.close()
1084 tr.write_eof.assert_called_with()
1085
1086 # closing the transport twice must not fail
1087 tr.close()
1088
1089 def test_close_closing(self):
1090 tr = self.write_pipe_transport()
1091 tr.write_eof = mock.Mock()
1092 tr._closing = True
1093 tr.close()
1094 self.assertFalse(tr.write_eof.called)
1095
1096 def test_write_eof(self):
1097 tr = self.write_pipe_transport()
1098 tr.write_eof()
1099 self.assertTrue(tr.is_closing())
1100 self.assertFalse(self.loop.readers)
1101 test_utils.run_briefly(self.loop)
1102 self.protocol.connection_lost.assert_called_with(None)
1103
1104 def test_write_eof_pending(self):
1105 tr = self.write_pipe_transport()
1106 tr._buffer = [b'data']
1107 tr.write_eof()
1108 self.assertTrue(tr.is_closing())
1109 self.assertFalse(self.protocol.connection_lost.called)
1110
1111
1112 class ESC[4;38;5;81mAbstractChildWatcherTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1113
1114 def test_warns_on_subclassing(self):
1115 with self.assertWarns(DeprecationWarning):
1116 class ESC[4;38;5;81mMyWatcher(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mAbstractChildWatcher):
1117 pass
1118
1119 def test_not_implemented(self):
1120 f = mock.Mock()
1121 watcher = asyncio.AbstractChildWatcher()
1122 self.assertRaises(
1123 NotImplementedError, watcher.add_child_handler, f, f)
1124 self.assertRaises(
1125 NotImplementedError, watcher.remove_child_handler, f)
1126 self.assertRaises(
1127 NotImplementedError, watcher.attach_loop, f)
1128 self.assertRaises(
1129 NotImplementedError, watcher.close)
1130 self.assertRaises(
1131 NotImplementedError, watcher.is_active)
1132 self.assertRaises(
1133 NotImplementedError, watcher.__enter__)
1134 self.assertRaises(
1135 NotImplementedError, watcher.__exit__, f, f, f)
1136
1137
1138 class ESC[4;38;5;81mBaseChildWatcherTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1139
1140 def test_not_implemented(self):
1141 f = mock.Mock()
1142 watcher = unix_events.BaseChildWatcher()
1143 self.assertRaises(
1144 NotImplementedError, watcher._do_waitpid, f)
1145
1146
1147 class ESC[4;38;5;81mChildWatcherTestsMixin:
1148
1149 ignore_warnings = mock.patch.object(log.logger, "warning")
1150
1151 def setUp(self):
1152 super().setUp()
1153 self.loop = self.new_test_loop()
1154 self.running = False
1155 self.zombies = {}
1156
1157 with mock.patch.object(
1158 self.loop, "add_signal_handler") as self.m_add_signal_handler:
1159 self.watcher = self.create_watcher()
1160 self.watcher.attach_loop(self.loop)
1161
1162 def waitpid(self, pid, flags):
1163 if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
1164 self.assertGreater(pid, 0)
1165 try:
1166 if pid < 0:
1167 return self.zombies.popitem()
1168 else:
1169 return pid, self.zombies.pop(pid)
1170 except KeyError:
1171 pass
1172 if self.running:
1173 return 0, 0
1174 else:
1175 raise ChildProcessError()
1176
1177 def add_zombie(self, pid, status):
1178 self.zombies[pid] = status
1179
1180 def waitstatus_to_exitcode(self, status):
1181 if status > 32768:
1182 return status - 32768
1183 elif 32700 < status < 32768:
1184 return status - 32768
1185 else:
1186 return status
1187
1188 def test_create_watcher(self):
1189 self.m_add_signal_handler.assert_called_once_with(
1190 signal.SIGCHLD, self.watcher._sig_chld)
1191
1192 def waitpid_mocks(func):
1193 def wrapped_func(self):
1194 def patch(target, wrapper):
1195 return mock.patch(target, wraps=wrapper,
1196 new_callable=mock.Mock)
1197
1198 with patch('asyncio.unix_events.waitstatus_to_exitcode', self.waitstatus_to_exitcode), \
1199 patch('os.waitpid', self.waitpid) as m_waitpid:
1200 func(self, m_waitpid)
1201 return wrapped_func
1202
1203 @waitpid_mocks
1204 def test_sigchld(self, m_waitpid):
1205 # register a child
1206 callback = mock.Mock()
1207
1208 with self.watcher:
1209 self.running = True
1210 self.watcher.add_child_handler(42, callback, 9, 10, 14)
1211
1212 self.assertFalse(callback.called)
1213
1214 # child is running
1215 self.watcher._sig_chld()
1216
1217 self.assertFalse(callback.called)
1218
1219 # child terminates (returncode 12)
1220 self.running = False
1221 self.add_zombie(42, EXITCODE(12))
1222 self.watcher._sig_chld()
1223
1224 callback.assert_called_once_with(42, 12, 9, 10, 14)
1225
1226 callback.reset_mock()
1227
1228 # ensure that the child is effectively reaped
1229 self.add_zombie(42, EXITCODE(13))
1230 with self.ignore_warnings:
1231 self.watcher._sig_chld()
1232
1233 self.assertFalse(callback.called)
1234
1235 # sigchld called again
1236 self.zombies.clear()
1237 self.watcher._sig_chld()
1238
1239 self.assertFalse(callback.called)
1240
1241 @waitpid_mocks
1242 def test_sigchld_two_children(self, m_waitpid):
1243 callback1 = mock.Mock()
1244 callback2 = mock.Mock()
1245
1246 # register child 1
1247 with self.watcher:
1248 self.running = True
1249 self.watcher.add_child_handler(43, callback1, 7, 8)
1250
1251 self.assertFalse(callback1.called)
1252 self.assertFalse(callback2.called)
1253
1254 # register child 2
1255 with self.watcher:
1256 self.watcher.add_child_handler(44, callback2, 147, 18)
1257
1258 self.assertFalse(callback1.called)
1259 self.assertFalse(callback2.called)
1260
1261 # children are running
1262 self.watcher._sig_chld()
1263
1264 self.assertFalse(callback1.called)
1265 self.assertFalse(callback2.called)
1266
1267 # child 1 terminates (signal 3)
1268 self.add_zombie(43, SIGNAL(3))
1269 self.watcher._sig_chld()
1270
1271 callback1.assert_called_once_with(43, -3, 7, 8)
1272 self.assertFalse(callback2.called)
1273
1274 callback1.reset_mock()
1275
1276 # child 2 still running
1277 self.watcher._sig_chld()
1278
1279 self.assertFalse(callback1.called)
1280 self.assertFalse(callback2.called)
1281
1282 # child 2 terminates (code 108)
1283 self.add_zombie(44, EXITCODE(108))
1284 self.running = False
1285 self.watcher._sig_chld()
1286
1287 callback2.assert_called_once_with(44, 108, 147, 18)
1288 self.assertFalse(callback1.called)
1289
1290 callback2.reset_mock()
1291
1292 # ensure that the children are effectively reaped
1293 self.add_zombie(43, EXITCODE(14))
1294 self.add_zombie(44, EXITCODE(15))
1295 with self.ignore_warnings:
1296 self.watcher._sig_chld()
1297
1298 self.assertFalse(callback1.called)
1299 self.assertFalse(callback2.called)
1300
1301 # sigchld called again
1302 self.zombies.clear()
1303 self.watcher._sig_chld()
1304
1305 self.assertFalse(callback1.called)
1306 self.assertFalse(callback2.called)
1307
1308 @waitpid_mocks
1309 def test_sigchld_two_children_terminating_together(self, m_waitpid):
1310 callback1 = mock.Mock()
1311 callback2 = mock.Mock()
1312
1313 # register child 1
1314 with self.watcher:
1315 self.running = True
1316 self.watcher.add_child_handler(45, callback1, 17, 8)
1317
1318 self.assertFalse(callback1.called)
1319 self.assertFalse(callback2.called)
1320
1321 # register child 2
1322 with self.watcher:
1323 self.watcher.add_child_handler(46, callback2, 1147, 18)
1324
1325 self.assertFalse(callback1.called)
1326 self.assertFalse(callback2.called)
1327
1328 # children are running
1329 self.watcher._sig_chld()
1330
1331 self.assertFalse(callback1.called)
1332 self.assertFalse(callback2.called)
1333
1334 # child 1 terminates (code 78)
1335 # child 2 terminates (signal 5)
1336 self.add_zombie(45, EXITCODE(78))
1337 self.add_zombie(46, SIGNAL(5))
1338 self.running = False
1339 self.watcher._sig_chld()
1340
1341 callback1.assert_called_once_with(45, 78, 17, 8)
1342 callback2.assert_called_once_with(46, -5, 1147, 18)
1343
1344 callback1.reset_mock()
1345 callback2.reset_mock()
1346
1347 # ensure that the children are effectively reaped
1348 self.add_zombie(45, EXITCODE(14))
1349 self.add_zombie(46, EXITCODE(15))
1350 with self.ignore_warnings:
1351 self.watcher._sig_chld()
1352
1353 self.assertFalse(callback1.called)
1354 self.assertFalse(callback2.called)
1355
1356 @waitpid_mocks
1357 def test_sigchld_race_condition(self, m_waitpid):
1358 # register a child
1359 callback = mock.Mock()
1360
1361 with self.watcher:
1362 # child terminates before being registered
1363 self.add_zombie(50, EXITCODE(4))
1364 self.watcher._sig_chld()
1365
1366 self.watcher.add_child_handler(50, callback, 1, 12)
1367
1368 callback.assert_called_once_with(50, 4, 1, 12)
1369 callback.reset_mock()
1370
1371 # ensure that the child is effectively reaped
1372 self.add_zombie(50, SIGNAL(1))
1373 with self.ignore_warnings:
1374 self.watcher._sig_chld()
1375
1376 self.assertFalse(callback.called)
1377
1378 @waitpid_mocks
1379 def test_sigchld_replace_handler(self, m_waitpid):
1380 callback1 = mock.Mock()
1381 callback2 = mock.Mock()
1382
1383 # register a child
1384 with self.watcher:
1385 self.running = True
1386 self.watcher.add_child_handler(51, callback1, 19)
1387
1388 self.assertFalse(callback1.called)
1389 self.assertFalse(callback2.called)
1390
1391 # register the same child again
1392 with self.watcher:
1393 self.watcher.add_child_handler(51, callback2, 21)
1394
1395 self.assertFalse(callback1.called)
1396 self.assertFalse(callback2.called)
1397
1398 # child terminates (signal 8)
1399 self.running = False
1400 self.add_zombie(51, SIGNAL(8))
1401 self.watcher._sig_chld()
1402
1403 callback2.assert_called_once_with(51, -8, 21)
1404 self.assertFalse(callback1.called)
1405
1406 callback2.reset_mock()
1407
1408 # ensure that the child is effectively reaped
1409 self.add_zombie(51, EXITCODE(13))
1410 with self.ignore_warnings:
1411 self.watcher._sig_chld()
1412
1413 self.assertFalse(callback1.called)
1414 self.assertFalse(callback2.called)
1415
1416 @waitpid_mocks
1417 def test_sigchld_remove_handler(self, m_waitpid):
1418 callback = mock.Mock()
1419
1420 # register a child
1421 with self.watcher:
1422 self.running = True
1423 self.watcher.add_child_handler(52, callback, 1984)
1424
1425 self.assertFalse(callback.called)
1426
1427 # unregister the child
1428 self.watcher.remove_child_handler(52)
1429
1430 self.assertFalse(callback.called)
1431
1432 # child terminates (code 99)
1433 self.running = False
1434 self.add_zombie(52, EXITCODE(99))
1435 with self.ignore_warnings:
1436 self.watcher._sig_chld()
1437
1438 self.assertFalse(callback.called)
1439
1440 @waitpid_mocks
1441 def test_sigchld_unknown_status(self, m_waitpid):
1442 callback = mock.Mock()
1443
1444 # register a child
1445 with self.watcher:
1446 self.running = True
1447 self.watcher.add_child_handler(53, callback, -19)
1448
1449 self.assertFalse(callback.called)
1450
1451 # terminate with unknown status
1452 self.zombies[53] = 1178
1453 self.running = False
1454 self.watcher._sig_chld()
1455
1456 callback.assert_called_once_with(53, 1178, -19)
1457
1458 callback.reset_mock()
1459
1460 # ensure that the child is effectively reaped
1461 self.add_zombie(53, EXITCODE(101))
1462 with self.ignore_warnings:
1463 self.watcher._sig_chld()
1464
1465 self.assertFalse(callback.called)
1466
1467 @waitpid_mocks
1468 def test_remove_child_handler(self, m_waitpid):
1469 callback1 = mock.Mock()
1470 callback2 = mock.Mock()
1471 callback3 = mock.Mock()
1472
1473 # register children
1474 with self.watcher:
1475 self.running = True
1476 self.watcher.add_child_handler(54, callback1, 1)
1477 self.watcher.add_child_handler(55, callback2, 2)
1478 self.watcher.add_child_handler(56, callback3, 3)
1479
1480 # remove child handler 1
1481 self.assertTrue(self.watcher.remove_child_handler(54))
1482
1483 # remove child handler 2 multiple times
1484 self.assertTrue(self.watcher.remove_child_handler(55))
1485 self.assertFalse(self.watcher.remove_child_handler(55))
1486 self.assertFalse(self.watcher.remove_child_handler(55))
1487
1488 # all children terminate
1489 self.add_zombie(54, EXITCODE(0))
1490 self.add_zombie(55, EXITCODE(1))
1491 self.add_zombie(56, EXITCODE(2))
1492 self.running = False
1493 with self.ignore_warnings:
1494 self.watcher._sig_chld()
1495
1496 self.assertFalse(callback1.called)
1497 self.assertFalse(callback2.called)
1498 callback3.assert_called_once_with(56, 2, 3)
1499
1500 @waitpid_mocks
1501 def test_sigchld_unhandled_exception(self, m_waitpid):
1502 callback = mock.Mock()
1503
1504 # register a child
1505 with self.watcher:
1506 self.running = True
1507 self.watcher.add_child_handler(57, callback)
1508
1509 # raise an exception
1510 m_waitpid.side_effect = ValueError
1511
1512 with mock.patch.object(log.logger,
1513 'error') as m_error:
1514
1515 self.assertEqual(self.watcher._sig_chld(), None)
1516 self.assertTrue(m_error.called)
1517
1518 @waitpid_mocks
1519 def test_sigchld_child_reaped_elsewhere(self, m_waitpid):
1520 # register a child
1521 callback = mock.Mock()
1522
1523 with self.watcher:
1524 self.running = True
1525 self.watcher.add_child_handler(58, callback)
1526
1527 self.assertFalse(callback.called)
1528
1529 # child terminates
1530 self.running = False
1531 self.add_zombie(58, EXITCODE(4))
1532
1533 # waitpid is called elsewhere
1534 os.waitpid(58, os.WNOHANG)
1535
1536 m_waitpid.reset_mock()
1537
1538 # sigchld
1539 with self.ignore_warnings:
1540 self.watcher._sig_chld()
1541
1542 if isinstance(self.watcher, asyncio.FastChildWatcher):
1543 # here the FastChildWatcher enters a deadlock
1544 # (there is no way to prevent it)
1545 self.assertFalse(callback.called)
1546 else:
1547 callback.assert_called_once_with(58, 255)
1548
1549 @waitpid_mocks
1550 def test_sigchld_unknown_pid_during_registration(self, m_waitpid):
1551 # register two children
1552 callback1 = mock.Mock()
1553 callback2 = mock.Mock()
1554
1555 with self.ignore_warnings, self.watcher:
1556 self.running = True
1557 # child 1 terminates
1558 self.add_zombie(591, EXITCODE(7))
1559 # an unknown child terminates
1560 self.add_zombie(593, EXITCODE(17))
1561
1562 self.watcher._sig_chld()
1563
1564 self.watcher.add_child_handler(591, callback1)
1565 self.watcher.add_child_handler(592, callback2)
1566
1567 callback1.assert_called_once_with(591, 7)
1568 self.assertFalse(callback2.called)
1569
1570 @waitpid_mocks
1571 def test_set_loop(self, m_waitpid):
1572 # register a child
1573 callback = mock.Mock()
1574
1575 with self.watcher:
1576 self.running = True
1577 self.watcher.add_child_handler(60, callback)
1578
1579 # attach a new loop
1580 old_loop = self.loop
1581 self.loop = self.new_test_loop()
1582 patch = mock.patch.object
1583
1584 with patch(old_loop, "remove_signal_handler") as m_old_remove, \
1585 patch(self.loop, "add_signal_handler") as m_new_add:
1586
1587 self.watcher.attach_loop(self.loop)
1588
1589 m_old_remove.assert_called_once_with(
1590 signal.SIGCHLD)
1591 m_new_add.assert_called_once_with(
1592 signal.SIGCHLD, self.watcher._sig_chld)
1593
1594 # child terminates
1595 self.running = False
1596 self.add_zombie(60, EXITCODE(9))
1597 self.watcher._sig_chld()
1598
1599 callback.assert_called_once_with(60, 9)
1600
1601 @waitpid_mocks
1602 def test_set_loop_race_condition(self, m_waitpid):
1603 # register 3 children
1604 callback1 = mock.Mock()
1605 callback2 = mock.Mock()
1606 callback3 = mock.Mock()
1607
1608 with self.watcher:
1609 self.running = True
1610 self.watcher.add_child_handler(61, callback1)
1611 self.watcher.add_child_handler(62, callback2)
1612 self.watcher.add_child_handler(622, callback3)
1613
1614 # detach the loop
1615 old_loop = self.loop
1616 self.loop = None
1617
1618 with mock.patch.object(
1619 old_loop, "remove_signal_handler") as m_remove_signal_handler:
1620
1621 with self.assertWarnsRegex(
1622 RuntimeWarning, 'A loop is being detached'):
1623 self.watcher.attach_loop(None)
1624
1625 m_remove_signal_handler.assert_called_once_with(
1626 signal.SIGCHLD)
1627
1628 # child 1 & 2 terminate
1629 self.add_zombie(61, EXITCODE(11))
1630 self.add_zombie(62, SIGNAL(5))
1631
1632 # SIGCHLD was not caught
1633 self.assertFalse(callback1.called)
1634 self.assertFalse(callback2.called)
1635 self.assertFalse(callback3.called)
1636
1637 # attach a new loop
1638 self.loop = self.new_test_loop()
1639
1640 with mock.patch.object(
1641 self.loop, "add_signal_handler") as m_add_signal_handler:
1642
1643 self.watcher.attach_loop(self.loop)
1644
1645 m_add_signal_handler.assert_called_once_with(
1646 signal.SIGCHLD, self.watcher._sig_chld)
1647 callback1.assert_called_once_with(61, 11) # race condition!
1648 callback2.assert_called_once_with(62, -5) # race condition!
1649 self.assertFalse(callback3.called)
1650
1651 callback1.reset_mock()
1652 callback2.reset_mock()
1653
1654 # child 3 terminates
1655 self.running = False
1656 self.add_zombie(622, EXITCODE(19))
1657 self.watcher._sig_chld()
1658
1659 self.assertFalse(callback1.called)
1660 self.assertFalse(callback2.called)
1661 callback3.assert_called_once_with(622, 19)
1662
1663 @waitpid_mocks
1664 def test_close(self, m_waitpid):
1665 # register two children
1666 callback1 = mock.Mock()
1667
1668 with self.watcher:
1669 self.running = True
1670 # child 1 terminates
1671 self.add_zombie(63, EXITCODE(9))
1672 # other child terminates
1673 self.add_zombie(65, EXITCODE(18))
1674 self.watcher._sig_chld()
1675
1676 self.watcher.add_child_handler(63, callback1)
1677 self.watcher.add_child_handler(64, callback1)
1678
1679 self.assertEqual(len(self.watcher._callbacks), 1)
1680 if isinstance(self.watcher, asyncio.FastChildWatcher):
1681 self.assertEqual(len(self.watcher._zombies), 1)
1682
1683 with mock.patch.object(
1684 self.loop,
1685 "remove_signal_handler") as m_remove_signal_handler:
1686
1687 self.watcher.close()
1688
1689 m_remove_signal_handler.assert_called_once_with(
1690 signal.SIGCHLD)
1691 self.assertFalse(self.watcher._callbacks)
1692 if isinstance(self.watcher, asyncio.FastChildWatcher):
1693 self.assertFalse(self.watcher._zombies)
1694
1695
1696 class ESC[4;38;5;81mSafeChildWatcherTests (ESC[4;38;5;149mChildWatcherTestsMixin, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1697 def create_watcher(self):
1698 with warnings.catch_warnings():
1699 warnings.simplefilter("ignore", DeprecationWarning)
1700 return asyncio.SafeChildWatcher()
1701
1702
1703 class ESC[4;38;5;81mFastChildWatcherTests (ESC[4;38;5;149mChildWatcherTestsMixin, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1704 def create_watcher(self):
1705 with warnings.catch_warnings():
1706 warnings.simplefilter("ignore", DeprecationWarning)
1707 return asyncio.FastChildWatcher()
1708
1709
1710 class ESC[4;38;5;81mPolicyTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1711
1712 def create_policy(self):
1713 return asyncio.DefaultEventLoopPolicy()
1714
1715 @mock.patch('asyncio.unix_events.can_use_pidfd')
1716 def test_get_default_child_watcher(self, m_can_use_pidfd):
1717 m_can_use_pidfd.return_value = False
1718 policy = self.create_policy()
1719 self.assertIsNone(policy._watcher)
1720 with self.assertWarns(DeprecationWarning):
1721 watcher = policy.get_child_watcher()
1722 self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)
1723
1724 self.assertIs(policy._watcher, watcher)
1725 with self.assertWarns(DeprecationWarning):
1726 self.assertIs(watcher, policy.get_child_watcher())
1727
1728 m_can_use_pidfd.return_value = True
1729 policy = self.create_policy()
1730 self.assertIsNone(policy._watcher)
1731 with self.assertWarns(DeprecationWarning):
1732 watcher = policy.get_child_watcher()
1733 self.assertIsInstance(watcher, asyncio.PidfdChildWatcher)
1734
1735 self.assertIs(policy._watcher, watcher)
1736 with self.assertWarns(DeprecationWarning):
1737 self.assertIs(watcher, policy.get_child_watcher())
1738
1739 def test_get_child_watcher_after_set(self):
1740 policy = self.create_policy()
1741 with warnings.catch_warnings():
1742 warnings.simplefilter("ignore", DeprecationWarning)
1743 watcher = asyncio.FastChildWatcher()
1744 policy.set_child_watcher(watcher)
1745
1746 self.assertIs(policy._watcher, watcher)
1747 with self.assertWarns(DeprecationWarning):
1748 self.assertIs(watcher, policy.get_child_watcher())
1749
1750 def test_get_child_watcher_thread(self):
1751
1752 def f():
1753 policy.set_event_loop(policy.new_event_loop())
1754
1755 self.assertIsInstance(policy.get_event_loop(),
1756 asyncio.AbstractEventLoop)
1757 with warnings.catch_warnings():
1758 warnings.simplefilter("ignore", DeprecationWarning)
1759 watcher = policy.get_child_watcher()
1760
1761 self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
1762 self.assertIsNone(watcher._loop)
1763
1764 policy.get_event_loop().close()
1765
1766 policy = self.create_policy()
1767 with warnings.catch_warnings():
1768 warnings.simplefilter("ignore", DeprecationWarning)
1769 policy.set_child_watcher(asyncio.SafeChildWatcher())
1770
1771 th = threading.Thread(target=f)
1772 th.start()
1773 th.join()
1774
1775 def test_child_watcher_replace_mainloop_existing(self):
1776 policy = self.create_policy()
1777 loop = policy.new_event_loop()
1778 policy.set_event_loop(loop)
1779
1780 # Explicitly setup SafeChildWatcher,
1781 # default ThreadedChildWatcher has no _loop property
1782 with warnings.catch_warnings():
1783 warnings.simplefilter("ignore", DeprecationWarning)
1784 watcher = asyncio.SafeChildWatcher()
1785 policy.set_child_watcher(watcher)
1786 watcher.attach_loop(loop)
1787
1788 self.assertIs(watcher._loop, loop)
1789
1790 new_loop = policy.new_event_loop()
1791 policy.set_event_loop(new_loop)
1792
1793 self.assertIs(watcher._loop, new_loop)
1794
1795 policy.set_event_loop(None)
1796
1797 self.assertIs(watcher._loop, None)
1798
1799 loop.close()
1800 new_loop.close()
1801
1802
1803 class ESC[4;38;5;81mTestFunctional(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1804
1805 def setUp(self):
1806 self.loop = asyncio.new_event_loop()
1807 asyncio.set_event_loop(self.loop)
1808
1809 def tearDown(self):
1810 self.loop.close()
1811 asyncio.set_event_loop(None)
1812
1813 def test_add_reader_invalid_argument(self):
1814 def assert_raises():
1815 return self.assertRaisesRegex(ValueError, r'Invalid file object')
1816
1817 cb = lambda: None
1818
1819 with assert_raises():
1820 self.loop.add_reader(object(), cb)
1821 with assert_raises():
1822 self.loop.add_writer(object(), cb)
1823
1824 with assert_raises():
1825 self.loop.remove_reader(object())
1826 with assert_raises():
1827 self.loop.remove_writer(object())
1828
1829 def test_add_reader_or_writer_transport_fd(self):
1830 def assert_raises():
1831 return self.assertRaisesRegex(
1832 RuntimeError,
1833 r'File descriptor .* is used by transport')
1834
1835 async def runner():
1836 tr, pr = await self.loop.create_connection(
1837 lambda: asyncio.Protocol(), sock=rsock)
1838
1839 try:
1840 cb = lambda: None
1841
1842 with assert_raises():
1843 self.loop.add_reader(rsock, cb)
1844 with assert_raises():
1845 self.loop.add_reader(rsock.fileno(), cb)
1846
1847 with assert_raises():
1848 self.loop.remove_reader(rsock)
1849 with assert_raises():
1850 self.loop.remove_reader(rsock.fileno())
1851
1852 with assert_raises():
1853 self.loop.add_writer(rsock, cb)
1854 with assert_raises():
1855 self.loop.add_writer(rsock.fileno(), cb)
1856
1857 with assert_raises():
1858 self.loop.remove_writer(rsock)
1859 with assert_raises():
1860 self.loop.remove_writer(rsock.fileno())
1861
1862 finally:
1863 tr.close()
1864
1865 rsock, wsock = socket.socketpair()
1866 try:
1867 self.loop.run_until_complete(runner())
1868 finally:
1869 rsock.close()
1870 wsock.close()
1871
1872
1873 @unittest.skipUnless(hasattr(os, 'fork'), 'requires os.fork()')
1874 class ESC[4;38;5;81mTestFork(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
1875
1876 async def test_fork_not_share_event_loop(self):
1877 # The forked process should not share the event loop with the parent
1878 loop = asyncio.get_running_loop()
1879 r, w = os.pipe()
1880 self.addCleanup(os.close, r)
1881 self.addCleanup(os.close, w)
1882 pid = os.fork()
1883 if pid == 0:
1884 # child
1885 try:
1886 with self.assertWarns(DeprecationWarning):
1887 loop = asyncio.get_event_loop_policy().get_event_loop()
1888 os.write(w, b'LOOP:' + str(id(loop)).encode())
1889 except RuntimeError:
1890 os.write(w, b'NO LOOP')
1891 except BaseException as e:
1892 os.write(w, b'ERROR:' + ascii(e).encode())
1893 finally:
1894 os._exit(0)
1895 else:
1896 # parent
1897 result = os.read(r, 100)
1898 self.assertEqual(result[:5], b'LOOP:', result)
1899 self.assertNotEqual(int(result[5:]), id(loop))
1900 wait_process(pid, exitcode=0)
1901
1902 @hashlib_helper.requires_hashdigest('md5')
1903 def test_fork_signal_handling(self):
1904 # Sending signal to the forked process should not affect the parent
1905 # process
1906 ctx = multiprocessing.get_context('fork')
1907 manager = ctx.Manager()
1908 self.addCleanup(manager.shutdown)
1909 child_started = manager.Event()
1910 child_handled = manager.Event()
1911 parent_handled = manager.Event()
1912
1913 def child_main():
1914 signal.signal(signal.SIGTERM, lambda *args: child_handled.set())
1915 child_started.set()
1916
1917 async def main():
1918 loop = asyncio.get_running_loop()
1919 loop.add_signal_handler(signal.SIGTERM, lambda *args: parent_handled.set())
1920
1921 process = ctx.Process(target=child_main)
1922 process.start()
1923 child_started.wait()
1924 os.kill(process.pid, signal.SIGTERM)
1925 process.join()
1926
1927 async def func():
1928 await asyncio.sleep(0.1)
1929 return 42
1930
1931 # Test parent's loop is still functional
1932 self.assertEqual(await asyncio.create_task(func()), 42)
1933
1934 asyncio.run(main())
1935
1936 self.assertFalse(parent_handled.is_set())
1937 self.assertTrue(child_handled.is_set())
1938
1939 @hashlib_helper.requires_hashdigest('md5')
1940 def test_fork_asyncio_run(self):
1941 ctx = multiprocessing.get_context('fork')
1942 manager = ctx.Manager()
1943 self.addCleanup(manager.shutdown)
1944 result = manager.Value('i', 0)
1945
1946 async def child_main():
1947 await asyncio.sleep(0.1)
1948 result.value = 42
1949
1950 process = ctx.Process(target=lambda: asyncio.run(child_main()))
1951 process.start()
1952 process.join()
1953
1954 self.assertEqual(result.value, 42)
1955
1956 @hashlib_helper.requires_hashdigest('md5')
1957 def test_fork_asyncio_subprocess(self):
1958 ctx = multiprocessing.get_context('fork')
1959 manager = ctx.Manager()
1960 self.addCleanup(manager.shutdown)
1961 result = manager.Value('i', 1)
1962
1963 async def child_main():
1964 proc = await asyncio.create_subprocess_exec(sys.executable, '-c', 'pass')
1965 result.value = await proc.wait()
1966
1967 process = ctx.Process(target=lambda: asyncio.run(child_main()))
1968 process.start()
1969 process.join()
1970
1971 self.assertEqual(result.value, 0)
1972
1973 if __name__ == '__main__':
1974 unittest.main()