python (3.12.0)
1 import os
2 import signal
3 import sys
4 import unittest
5 import warnings
6 from unittest import mock
7
8 import asyncio
9 from asyncio import base_subprocess
10 from asyncio import subprocess
11 from test.test_asyncio import utils as test_utils
12 from test import support
13 from test.support import os_helper
14
15 if sys.platform != 'win32':
16 from asyncio import unix_events
17
18 if support.check_sanitizer(address=True):
19 raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI")
20
21 # Program blocking
22 PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
23
24 # Program copying input to output
25 PROGRAM_CAT = [
26 sys.executable, '-c',
27 ';'.join(('import sys',
28 'data = sys.stdin.buffer.read()',
29 'sys.stdout.buffer.write(data)'))]
30
31
32 def tearDownModule():
33 asyncio.set_event_loop_policy(None)
34
35
36 class ESC[4;38;5;81mTestSubprocessTransport(ESC[4;38;5;149mbase_subprocessESC[4;38;5;149m.ESC[4;38;5;149mBaseSubprocessTransport):
37 def _start(self, *args, **kwargs):
38 self._proc = mock.Mock()
39 self._proc.stdin = None
40 self._proc.stdout = None
41 self._proc.stderr = None
42 self._proc.pid = -1
43
44
45 class ESC[4;38;5;81mSubprocessTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
46 def setUp(self):
47 super().setUp()
48 self.loop = self.new_test_loop()
49 self.set_event_loop(self.loop)
50
51 def create_transport(self, waiter=None):
52 protocol = mock.Mock()
53 transport = TestSubprocessTransport(
54 self.loop, protocol, ['test'], False,
55 None, None, None, 0, waiter=waiter)
56 return (transport, protocol)
57
58 def test_proc_exited(self):
59 waiter = self.loop.create_future()
60 transport, protocol = self.create_transport(waiter)
61 transport._process_exited(6)
62 self.loop.run_until_complete(waiter)
63
64 self.assertEqual(transport.get_returncode(), 6)
65
66 self.assertTrue(protocol.connection_made.called)
67 self.assertTrue(protocol.process_exited.called)
68 self.assertTrue(protocol.connection_lost.called)
69 self.assertEqual(protocol.connection_lost.call_args[0], (None,))
70
71 self.assertFalse(transport.is_closing())
72 self.assertIsNone(transport._loop)
73 self.assertIsNone(transport._proc)
74 self.assertIsNone(transport._protocol)
75
76 # methods must raise ProcessLookupError if the process exited
77 self.assertRaises(ProcessLookupError,
78 transport.send_signal, signal.SIGTERM)
79 self.assertRaises(ProcessLookupError, transport.terminate)
80 self.assertRaises(ProcessLookupError, transport.kill)
81
82 transport.close()
83
84 def test_subprocess_repr(self):
85 waiter = self.loop.create_future()
86 transport, protocol = self.create_transport(waiter)
87 transport._process_exited(6)
88 self.loop.run_until_complete(waiter)
89
90 self.assertEqual(
91 repr(transport),
92 "<TestSubprocessTransport pid=-1 returncode=6>"
93 )
94 transport._returncode = None
95 self.assertEqual(
96 repr(transport),
97 "<TestSubprocessTransport pid=-1 running>"
98 )
99 transport._pid = None
100 transport._returncode = None
101 self.assertEqual(
102 repr(transport),
103 "<TestSubprocessTransport not started>"
104 )
105 transport.close()
106
107
108 class ESC[4;38;5;81mSubprocessMixin:
109
110 def test_stdin_stdout(self):
111 args = PROGRAM_CAT
112
113 async def run(data):
114 proc = await asyncio.create_subprocess_exec(
115 *args,
116 stdin=subprocess.PIPE,
117 stdout=subprocess.PIPE,
118 )
119
120 # feed data
121 proc.stdin.write(data)
122 await proc.stdin.drain()
123 proc.stdin.close()
124
125 # get output and exitcode
126 data = await proc.stdout.read()
127 exitcode = await proc.wait()
128 return (exitcode, data)
129
130 task = run(b'some data')
131 task = asyncio.wait_for(task, 60.0)
132 exitcode, stdout = self.loop.run_until_complete(task)
133 self.assertEqual(exitcode, 0)
134 self.assertEqual(stdout, b'some data')
135
136 def test_communicate(self):
137 args = PROGRAM_CAT
138
139 async def run(data):
140 proc = await asyncio.create_subprocess_exec(
141 *args,
142 stdin=subprocess.PIPE,
143 stdout=subprocess.PIPE,
144 )
145 stdout, stderr = await proc.communicate(data)
146 return proc.returncode, stdout
147
148 task = run(b'some data')
149 task = asyncio.wait_for(task, support.LONG_TIMEOUT)
150 exitcode, stdout = self.loop.run_until_complete(task)
151 self.assertEqual(exitcode, 0)
152 self.assertEqual(stdout, b'some data')
153
154 def test_communicate_none_input(self):
155 args = PROGRAM_CAT
156
157 async def run():
158 proc = await asyncio.create_subprocess_exec(
159 *args,
160 stdin=subprocess.PIPE,
161 stdout=subprocess.PIPE,
162 )
163 stdout, stderr = await proc.communicate()
164 return proc.returncode, stdout
165
166 task = run()
167 task = asyncio.wait_for(task, support.LONG_TIMEOUT)
168 exitcode, stdout = self.loop.run_until_complete(task)
169 self.assertEqual(exitcode, 0)
170 self.assertEqual(stdout, b'')
171
172 def test_shell(self):
173 proc = self.loop.run_until_complete(
174 asyncio.create_subprocess_shell('exit 7')
175 )
176 exitcode = self.loop.run_until_complete(proc.wait())
177 self.assertEqual(exitcode, 7)
178
179 def test_start_new_session(self):
180 # start the new process in a new session
181 proc = self.loop.run_until_complete(
182 asyncio.create_subprocess_shell(
183 'exit 8',
184 start_new_session=True,
185 )
186 )
187 exitcode = self.loop.run_until_complete(proc.wait())
188 self.assertEqual(exitcode, 8)
189
190 def test_kill(self):
191 args = PROGRAM_BLOCKED
192 proc = self.loop.run_until_complete(
193 asyncio.create_subprocess_exec(*args)
194 )
195 proc.kill()
196 returncode = self.loop.run_until_complete(proc.wait())
197 if sys.platform == 'win32':
198 self.assertIsInstance(returncode, int)
199 # expect 1 but sometimes get 0
200 else:
201 self.assertEqual(-signal.SIGKILL, returncode)
202
203 def test_kill_issue43884(self):
204 if sys.platform == 'win32':
205 blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(2)"'
206 else:
207 blocking_shell_command = 'sleep 1; sleep 1'
208 creationflags = 0
209 if sys.platform == 'win32':
210 from subprocess import CREATE_NEW_PROCESS_GROUP
211 # On windows create a new process group so that killing process
212 # kills the process and all its children.
213 creationflags = CREATE_NEW_PROCESS_GROUP
214 proc = self.loop.run_until_complete(
215 asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE,
216 creationflags=creationflags)
217 )
218 self.loop.run_until_complete(asyncio.sleep(1))
219 if sys.platform == 'win32':
220 proc.send_signal(signal.CTRL_BREAK_EVENT)
221 # On windows it is an alias of terminate which sets the return code
222 proc.kill()
223 returncode = self.loop.run_until_complete(proc.wait())
224 if sys.platform == 'win32':
225 self.assertIsInstance(returncode, int)
226 # expect 1 but sometimes get 0
227 else:
228 self.assertEqual(-signal.SIGKILL, returncode)
229
230 def test_terminate(self):
231 args = PROGRAM_BLOCKED
232 proc = self.loop.run_until_complete(
233 asyncio.create_subprocess_exec(*args)
234 )
235 proc.terminate()
236 returncode = self.loop.run_until_complete(proc.wait())
237 if sys.platform == 'win32':
238 self.assertIsInstance(returncode, int)
239 # expect 1 but sometimes get 0
240 else:
241 self.assertEqual(-signal.SIGTERM, returncode)
242
243 @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
244 def test_send_signal(self):
245 # bpo-31034: Make sure that we get the default signal handler (killing
246 # the process). The parent process may have decided to ignore SIGHUP,
247 # and signal handlers are inherited.
248 old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
249 try:
250 code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
251 args = [sys.executable, '-c', code]
252 proc = self.loop.run_until_complete(
253 asyncio.create_subprocess_exec(
254 *args,
255 stdout=subprocess.PIPE,
256 )
257 )
258
259 async def send_signal(proc):
260 # basic synchronization to wait until the program is sleeping
261 line = await proc.stdout.readline()
262 self.assertEqual(line, b'sleeping\n')
263
264 proc.send_signal(signal.SIGHUP)
265 returncode = await proc.wait()
266 return returncode
267
268 returncode = self.loop.run_until_complete(send_signal(proc))
269 self.assertEqual(-signal.SIGHUP, returncode)
270 finally:
271 signal.signal(signal.SIGHUP, old_handler)
272
273 def prepare_broken_pipe_test(self):
274 # buffer large enough to feed the whole pipe buffer
275 large_data = b'x' * support.PIPE_MAX_SIZE
276
277 # the program ends before the stdin can be fed
278 proc = self.loop.run_until_complete(
279 asyncio.create_subprocess_exec(
280 sys.executable, '-c', 'pass',
281 stdin=subprocess.PIPE,
282 )
283 )
284
285 return (proc, large_data)
286
287 def test_stdin_broken_pipe(self):
288 proc, large_data = self.prepare_broken_pipe_test()
289
290 async def write_stdin(proc, data):
291 await asyncio.sleep(0.5)
292 proc.stdin.write(data)
293 await proc.stdin.drain()
294
295 coro = write_stdin(proc, large_data)
296 # drain() must raise BrokenPipeError or ConnectionResetError
297 with test_utils.disable_logger():
298 self.assertRaises((BrokenPipeError, ConnectionResetError),
299 self.loop.run_until_complete, coro)
300 self.loop.run_until_complete(proc.wait())
301
302 def test_communicate_ignore_broken_pipe(self):
303 proc, large_data = self.prepare_broken_pipe_test()
304
305 # communicate() must ignore BrokenPipeError when feeding stdin
306 self.loop.set_exception_handler(lambda loop, msg: None)
307 self.loop.run_until_complete(proc.communicate(large_data))
308 self.loop.run_until_complete(proc.wait())
309
310 def test_pause_reading(self):
311 limit = 10
312 size = (limit * 2 + 1)
313
314 async def test_pause_reading():
315 code = '\n'.join((
316 'import sys',
317 'sys.stdout.write("x" * %s)' % size,
318 'sys.stdout.flush()',
319 ))
320
321 connect_read_pipe = self.loop.connect_read_pipe
322
323 async def connect_read_pipe_mock(*args, **kw):
324 transport, protocol = await connect_read_pipe(*args, **kw)
325 transport.pause_reading = mock.Mock()
326 transport.resume_reading = mock.Mock()
327 return (transport, protocol)
328
329 self.loop.connect_read_pipe = connect_read_pipe_mock
330
331 proc = await asyncio.create_subprocess_exec(
332 sys.executable, '-c', code,
333 stdin=asyncio.subprocess.PIPE,
334 stdout=asyncio.subprocess.PIPE,
335 limit=limit,
336 )
337 stdout_transport = proc._transport.get_pipe_transport(1)
338
339 stdout, stderr = await proc.communicate()
340
341 # The child process produced more than limit bytes of output,
342 # the stream reader transport should pause the protocol to not
343 # allocate too much memory.
344 return (stdout, stdout_transport)
345
346 # Issue #22685: Ensure that the stream reader pauses the protocol
347 # when the child process produces too much data
348 stdout, transport = self.loop.run_until_complete(test_pause_reading())
349
350 self.assertEqual(stdout, b'x' * size)
351 self.assertTrue(transport.pause_reading.called)
352 self.assertTrue(transport.resume_reading.called)
353
354 def test_stdin_not_inheritable(self):
355 # asyncio issue #209: stdin must not be inheritable, otherwise
356 # the Process.communicate() hangs
357 async def len_message(message):
358 code = 'import sys; data = sys.stdin.read(); print(len(data))'
359 proc = await asyncio.create_subprocess_exec(
360 sys.executable, '-c', code,
361 stdin=asyncio.subprocess.PIPE,
362 stdout=asyncio.subprocess.PIPE,
363 stderr=asyncio.subprocess.PIPE,
364 close_fds=False,
365 )
366 stdout, stderr = await proc.communicate(message)
367 exitcode = await proc.wait()
368 return (stdout, exitcode)
369
370 output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
371 self.assertEqual(output.rstrip(), b'3')
372 self.assertEqual(exitcode, 0)
373
374 def test_empty_input(self):
375
376 async def empty_input():
377 code = 'import sys; data = sys.stdin.read(); print(len(data))'
378 proc = await asyncio.create_subprocess_exec(
379 sys.executable, '-c', code,
380 stdin=asyncio.subprocess.PIPE,
381 stdout=asyncio.subprocess.PIPE,
382 stderr=asyncio.subprocess.PIPE,
383 close_fds=False,
384 )
385 stdout, stderr = await proc.communicate(b'')
386 exitcode = await proc.wait()
387 return (stdout, exitcode)
388
389 output, exitcode = self.loop.run_until_complete(empty_input())
390 self.assertEqual(output.rstrip(), b'0')
391 self.assertEqual(exitcode, 0)
392
393 def test_devnull_input(self):
394
395 async def empty_input():
396 code = 'import sys; data = sys.stdin.read(); print(len(data))'
397 proc = await asyncio.create_subprocess_exec(
398 sys.executable, '-c', code,
399 stdin=asyncio.subprocess.DEVNULL,
400 stdout=asyncio.subprocess.PIPE,
401 stderr=asyncio.subprocess.PIPE,
402 close_fds=False,
403 )
404 stdout, stderr = await proc.communicate()
405 exitcode = await proc.wait()
406 return (stdout, exitcode)
407
408 output, exitcode = self.loop.run_until_complete(empty_input())
409 self.assertEqual(output.rstrip(), b'0')
410 self.assertEqual(exitcode, 0)
411
412 def test_devnull_output(self):
413
414 async def empty_output():
415 code = 'import sys; data = sys.stdin.read(); print(len(data))'
416 proc = await asyncio.create_subprocess_exec(
417 sys.executable, '-c', code,
418 stdin=asyncio.subprocess.PIPE,
419 stdout=asyncio.subprocess.DEVNULL,
420 stderr=asyncio.subprocess.PIPE,
421 close_fds=False,
422 )
423 stdout, stderr = await proc.communicate(b"abc")
424 exitcode = await proc.wait()
425 return (stdout, exitcode)
426
427 output, exitcode = self.loop.run_until_complete(empty_output())
428 self.assertEqual(output, None)
429 self.assertEqual(exitcode, 0)
430
431 def test_devnull_error(self):
432
433 async def empty_error():
434 code = 'import sys; data = sys.stdin.read(); print(len(data))'
435 proc = await asyncio.create_subprocess_exec(
436 sys.executable, '-c', code,
437 stdin=asyncio.subprocess.PIPE,
438 stdout=asyncio.subprocess.PIPE,
439 stderr=asyncio.subprocess.DEVNULL,
440 close_fds=False,
441 )
442 stdout, stderr = await proc.communicate(b"abc")
443 exitcode = await proc.wait()
444 return (stderr, exitcode)
445
446 output, exitcode = self.loop.run_until_complete(empty_error())
447 self.assertEqual(output, None)
448 self.assertEqual(exitcode, 0)
449
450 @unittest.skipIf(sys.platform != 'linux', "Don't have /dev/stdin")
451 def test_devstdin_input(self):
452
453 async def devstdin_input(message):
454 code = 'file = open("/dev/stdin"); data = file.read(); print(len(data))'
455 proc = await asyncio.create_subprocess_exec(
456 sys.executable, '-c', code,
457 stdin=asyncio.subprocess.PIPE,
458 stdout=asyncio.subprocess.PIPE,
459 stderr=asyncio.subprocess.PIPE,
460 close_fds=False,
461 )
462 stdout, stderr = await proc.communicate(message)
463 exitcode = await proc.wait()
464 return (stdout, exitcode)
465
466 output, exitcode = self.loop.run_until_complete(devstdin_input(b'abc'))
467 self.assertEqual(output.rstrip(), b'3')
468 self.assertEqual(exitcode, 0)
469
470 def test_cancel_process_wait(self):
471 # Issue #23140: cancel Process.wait()
472
473 async def cancel_wait():
474 proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
475
476 # Create an internal future waiting on the process exit
477 task = self.loop.create_task(proc.wait())
478 self.loop.call_soon(task.cancel)
479 try:
480 await task
481 except asyncio.CancelledError:
482 pass
483
484 # Cancel the future
485 task.cancel()
486
487 # Kill the process and wait until it is done
488 proc.kill()
489 await proc.wait()
490
491 self.loop.run_until_complete(cancel_wait())
492
493 def test_cancel_make_subprocess_transport_exec(self):
494
495 async def cancel_make_transport():
496 coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
497 task = self.loop.create_task(coro)
498
499 self.loop.call_soon(task.cancel)
500 try:
501 await task
502 except asyncio.CancelledError:
503 pass
504
505 # ignore the log:
506 # "Exception during subprocess creation, kill the subprocess"
507 with test_utils.disable_logger():
508 self.loop.run_until_complete(cancel_make_transport())
509
510 def test_cancel_post_init(self):
511
512 async def cancel_make_transport():
513 coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
514 *PROGRAM_BLOCKED)
515 task = self.loop.create_task(coro)
516
517 self.loop.call_soon(task.cancel)
518 try:
519 await task
520 except asyncio.CancelledError:
521 pass
522
523 # ignore the log:
524 # "Exception during subprocess creation, kill the subprocess"
525 with test_utils.disable_logger():
526 self.loop.run_until_complete(cancel_make_transport())
527 test_utils.run_briefly(self.loop)
528
529 def test_close_kill_running(self):
530
531 async def kill_running():
532 create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
533 *PROGRAM_BLOCKED)
534 transport, protocol = await create
535
536 kill_called = False
537 def kill():
538 nonlocal kill_called
539 kill_called = True
540 orig_kill()
541
542 proc = transport.get_extra_info('subprocess')
543 orig_kill = proc.kill
544 proc.kill = kill
545 returncode = transport.get_returncode()
546 transport.close()
547 await asyncio.wait_for(transport._wait(), 5)
548 return (returncode, kill_called)
549
550 # Ignore "Close running child process: kill ..." log
551 with test_utils.disable_logger():
552 try:
553 returncode, killed = self.loop.run_until_complete(
554 kill_running()
555 )
556 except asyncio.TimeoutError:
557 self.skipTest(
558 "Timeout failure on waiting for subprocess stopping"
559 )
560 self.assertIsNone(returncode)
561
562 # transport.close() must kill the process if it is still running
563 self.assertTrue(killed)
564 test_utils.run_briefly(self.loop)
565
566 def test_close_dont_kill_finished(self):
567
568 async def kill_running():
569 create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
570 *PROGRAM_BLOCKED)
571 transport, protocol = await create
572 proc = transport.get_extra_info('subprocess')
573
574 # kill the process (but asyncio is not notified immediately)
575 proc.kill()
576 proc.wait()
577
578 proc.kill = mock.Mock()
579 proc_returncode = proc.poll()
580 transport_returncode = transport.get_returncode()
581 transport.close()
582 return (proc_returncode, transport_returncode, proc.kill.called)
583
584 # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
585 # emitted because the test already consumes the exit status:
586 # proc.wait()
587 with test_utils.disable_logger():
588 result = self.loop.run_until_complete(kill_running())
589 test_utils.run_briefly(self.loop)
590
591 proc_returncode, transport_return_code, killed = result
592
593 self.assertIsNotNone(proc_returncode)
594 self.assertIsNone(transport_return_code)
595
596 # transport.close() must not kill the process if it finished, even if
597 # the transport was not notified yet
598 self.assertFalse(killed)
599
600 # Unlike SafeChildWatcher, FastChildWatcher does not pop the
601 # callbacks if waitpid() is called elsewhere. Let's clear them
602 # manually to avoid a warning when the watcher is detached.
603 if (sys.platform != 'win32' and
604 isinstance(self, SubprocessFastWatcherTests)):
605 with warnings.catch_warnings():
606 warnings.simplefilter('ignore', DeprecationWarning)
607 asyncio.get_child_watcher()._callbacks.clear()
608
609 async def _test_popen_error(self, stdin):
610 if sys.platform == 'win32':
611 target = 'asyncio.windows_utils.Popen'
612 else:
613 target = 'subprocess.Popen'
614 with mock.patch(target) as popen:
615 exc = ZeroDivisionError
616 popen.side_effect = exc
617
618 with warnings.catch_warnings(record=True) as warns:
619 with self.assertRaises(exc):
620 await asyncio.create_subprocess_exec(
621 sys.executable,
622 '-c',
623 'pass',
624 stdin=stdin
625 )
626 self.assertEqual(warns, [])
627
628 def test_popen_error(self):
629 # Issue #24763: check that the subprocess transport is closed
630 # when BaseSubprocessTransport fails
631 self.loop.run_until_complete(self._test_popen_error(stdin=None))
632
633 def test_popen_error_with_stdin_pipe(self):
634 # Issue #35721: check that newly created socket pair is closed when
635 # Popen fails
636 self.loop.run_until_complete(
637 self._test_popen_error(stdin=subprocess.PIPE))
638
639 def test_read_stdout_after_process_exit(self):
640
641 async def execute():
642 code = '\n'.join(['import sys',
643 'for _ in range(64):',
644 ' sys.stdout.write("x" * 4096)',
645 'sys.stdout.flush()',
646 'sys.exit(1)'])
647
648 process = await asyncio.create_subprocess_exec(
649 sys.executable, '-c', code,
650 stdout=asyncio.subprocess.PIPE,
651 )
652
653 while True:
654 data = await process.stdout.read(65536)
655 if data:
656 await asyncio.sleep(0.3)
657 else:
658 break
659
660 self.loop.run_until_complete(execute())
661
662 def test_create_subprocess_exec_text_mode_fails(self):
663 async def execute():
664 with self.assertRaises(ValueError):
665 await subprocess.create_subprocess_exec(sys.executable,
666 text=True)
667
668 with self.assertRaises(ValueError):
669 await subprocess.create_subprocess_exec(sys.executable,
670 encoding="utf-8")
671
672 with self.assertRaises(ValueError):
673 await subprocess.create_subprocess_exec(sys.executable,
674 errors="strict")
675
676 self.loop.run_until_complete(execute())
677
678 def test_create_subprocess_shell_text_mode_fails(self):
679
680 async def execute():
681 with self.assertRaises(ValueError):
682 await subprocess.create_subprocess_shell(sys.executable,
683 text=True)
684
685 with self.assertRaises(ValueError):
686 await subprocess.create_subprocess_shell(sys.executable,
687 encoding="utf-8")
688
689 with self.assertRaises(ValueError):
690 await subprocess.create_subprocess_shell(sys.executable,
691 errors="strict")
692
693 self.loop.run_until_complete(execute())
694
695 def test_create_subprocess_exec_with_path(self):
696 async def execute():
697 p = await subprocess.create_subprocess_exec(
698 os_helper.FakePath(sys.executable), '-c', 'pass')
699 await p.wait()
700 p = await subprocess.create_subprocess_exec(
701 sys.executable, '-c', 'pass', os_helper.FakePath('.'))
702 await p.wait()
703
704 self.assertIsNone(self.loop.run_until_complete(execute()))
705
706 async def check_stdout_output(self, coro, output):
707 proc = await coro
708 stdout, _ = await proc.communicate()
709 self.assertEqual(stdout, output)
710 self.assertEqual(proc.returncode, 0)
711 task = asyncio.create_task(proc.wait())
712 await asyncio.sleep(0)
713 self.assertEqual(task.result(), proc.returncode)
714
715 def test_create_subprocess_env_shell(self) -> None:
716 async def main() -> None:
717 cmd = f'''{sys.executable} -c "import os, sys; sys.stdout.write(os.getenv('FOO'))"'''
718 env = os.environ.copy()
719 env["FOO"] = "bar"
720 proc = await asyncio.create_subprocess_shell(
721 cmd, env=env, stdout=subprocess.PIPE
722 )
723 return proc
724
725 self.loop.run_until_complete(self.check_stdout_output(main(), b'bar'))
726
727 def test_create_subprocess_env_exec(self) -> None:
728 async def main() -> None:
729 cmd = [sys.executable, "-c",
730 "import os, sys; sys.stdout.write(os.getenv('FOO'))"]
731 env = os.environ.copy()
732 env["FOO"] = "baz"
733 proc = await asyncio.create_subprocess_exec(
734 *cmd, env=env, stdout=subprocess.PIPE
735 )
736 return proc
737
738 self.loop.run_until_complete(self.check_stdout_output(main(), b'baz'))
739
740
741 def test_subprocess_concurrent_wait(self) -> None:
742 async def main() -> None:
743 proc = await asyncio.create_subprocess_exec(
744 *PROGRAM_CAT,
745 stdin=subprocess.PIPE,
746 stdout=subprocess.PIPE,
747 )
748 stdout, _ = await proc.communicate(b'some data')
749 self.assertEqual(stdout, b"some data")
750 self.assertEqual(proc.returncode, 0)
751 self.assertEqual(await asyncio.gather(*[proc.wait() for _ in range(10)]),
752 [proc.returncode] * 10)
753
754 self.loop.run_until_complete(main())
755
756 def test_subprocess_consistent_callbacks(self):
757 events = []
758 class ESC[4;38;5;81mMyProtocol(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mSubprocessProtocol):
759 def __init__(self, exit_future: asyncio.Future) -> None:
760 self.exit_future = exit_future
761
762 def pipe_data_received(self, fd, data) -> None:
763 events.append(('pipe_data_received', fd, data))
764
765 def pipe_connection_lost(self, fd, exc) -> None:
766 events.append('pipe_connection_lost')
767
768 def process_exited(self) -> None:
769 events.append('process_exited')
770 self.exit_future.set_result(True)
771
772 async def main() -> None:
773 loop = asyncio.get_running_loop()
774 exit_future = asyncio.Future()
775 code = 'import sys; sys.stdout.write("stdout"); sys.stderr.write("stderr")'
776 transport, _ = await loop.subprocess_exec(lambda: MyProtocol(exit_future),
777 sys.executable, '-c', code, stdin=None)
778 await exit_future
779 transport.close()
780 self.assertEqual(events, [
781 ('pipe_data_received', 1, b'stdout'),
782 ('pipe_data_received', 2, b'stderr'),
783 'pipe_connection_lost',
784 'pipe_connection_lost',
785 'process_exited',
786 ])
787
788 self.loop.run_until_complete(main())
789
790 def test_subprocess_communicate_stdout(self):
791 # See https://github.com/python/cpython/issues/100133
792 async def get_command_stdout(cmd, *args):
793 proc = await asyncio.create_subprocess_exec(
794 cmd, *args, stdout=asyncio.subprocess.PIPE,
795 )
796 stdout, _ = await proc.communicate()
797 return stdout.decode().strip()
798
799 async def main():
800 outputs = [f'foo{i}' for i in range(10)]
801 res = await asyncio.gather(*[get_command_stdout(sys.executable, '-c',
802 f'print({out!r})') for out in outputs])
803 self.assertEqual(res, outputs)
804
805 self.loop.run_until_complete(main())
806
807
808 if sys.platform != 'win32':
809 # Unix
810 class ESC[4;38;5;81mSubprocessWatcherMixin(ESC[4;38;5;149mSubprocessMixin):
811
812 Watcher = None
813
814 def setUp(self):
815 super().setUp()
816 policy = asyncio.get_event_loop_policy()
817 self.loop = policy.new_event_loop()
818 self.set_event_loop(self.loop)
819
820 watcher = self._get_watcher()
821 watcher.attach_loop(self.loop)
822 with warnings.catch_warnings():
823 warnings.simplefilter('ignore', DeprecationWarning)
824 policy.set_child_watcher(watcher)
825
826 def tearDown(self):
827 super().tearDown()
828 policy = asyncio.get_event_loop_policy()
829 with warnings.catch_warnings():
830 warnings.simplefilter('ignore', DeprecationWarning)
831 watcher = policy.get_child_watcher()
832 policy.set_child_watcher(None)
833 watcher.attach_loop(None)
834 watcher.close()
835
836 class ESC[4;38;5;81mSubprocessThreadedWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
837 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
838
839 def _get_watcher(self):
840 return unix_events.ThreadedChildWatcher()
841
842 class ESC[4;38;5;81mSubprocessSafeWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
843 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
844
845 def _get_watcher(self):
846 with self.assertWarns(DeprecationWarning):
847 return unix_events.SafeChildWatcher()
848
849 class ESC[4;38;5;81mMultiLoopChildWatcherTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
850
851 def test_warns(self):
852 with self.assertWarns(DeprecationWarning):
853 unix_events.MultiLoopChildWatcher()
854
855 class ESC[4;38;5;81mSubprocessFastWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
856 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
857
858 def _get_watcher(self):
859 with self.assertWarns(DeprecationWarning):
860 return unix_events.FastChildWatcher()
861
862 @unittest.skipUnless(
863 unix_events.can_use_pidfd(),
864 "operating system does not support pidfds",
865 )
866 class ESC[4;38;5;81mSubprocessPidfdWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
867 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
868
869 def _get_watcher(self):
870 return unix_events.PidfdChildWatcher()
871
872
873 class ESC[4;38;5;81mGenericWatcherTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
874
875 def test_create_subprocess_fails_with_inactive_watcher(self):
876 watcher = mock.create_autospec(asyncio.AbstractChildWatcher)
877 watcher.is_active.return_value = False
878
879 async def execute():
880 asyncio.set_child_watcher(watcher)
881
882 with self.assertRaises(RuntimeError):
883 await subprocess.create_subprocess_exec(
884 os_helper.FakePath(sys.executable), '-c', 'pass')
885
886 watcher.add_child_handler.assert_not_called()
887
888 with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
889 with warnings.catch_warnings():
890 warnings.simplefilter('ignore', DeprecationWarning)
891 self.assertIsNone(runner.run(execute()))
892 self.assertListEqual(watcher.mock_calls, [
893 mock.call.__enter__(),
894 mock.call.is_active(),
895 mock.call.__exit__(RuntimeError, mock.ANY, mock.ANY),
896 ], watcher.mock_calls)
897
898
899 @unittest.skipUnless(
900 unix_events.can_use_pidfd(),
901 "operating system does not support pidfds",
902 )
903 def test_create_subprocess_with_pidfd(self):
904 async def in_thread():
905 proc = await asyncio.create_subprocess_exec(
906 *PROGRAM_CAT,
907 stdin=subprocess.PIPE,
908 stdout=subprocess.PIPE,
909 )
910 stdout, stderr = await proc.communicate(b"some data")
911 return proc.returncode, stdout
912
913 async def main():
914 # asyncio.Runner did not call asyncio.set_event_loop()
915 with self.assertRaises(RuntimeError):
916 asyncio.get_event_loop_policy().get_event_loop()
917 return await asyncio.to_thread(asyncio.run, in_thread())
918 with self.assertWarns(DeprecationWarning):
919 asyncio.set_child_watcher(asyncio.PidfdChildWatcher())
920 try:
921 with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
922 returncode, stdout = runner.run(main())
923 self.assertEqual(returncode, 0)
924 self.assertEqual(stdout, b'some data')
925 finally:
926 with self.assertWarns(DeprecationWarning):
927 asyncio.set_child_watcher(None)
928 else:
929 # Windows
930 class ESC[4;38;5;81mSubprocessProactorTests(ESC[4;38;5;149mSubprocessMixin, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
931
932 def setUp(self):
933 super().setUp()
934 self.loop = asyncio.ProactorEventLoop()
935 self.set_event_loop(self.loop)
936
937
938 if __name__ == '__main__':
939 unittest.main()