1 import os
2 import shutil
3 import signal
4 import sys
5 import textwrap
6 import unittest
7 import warnings
8 from unittest import mock
9
10 import asyncio
11 from asyncio import base_subprocess
12 from asyncio import subprocess
13 from test.test_asyncio import utils as test_utils
14 from test import support
15 from test.support import os_helper
16
17
18 if support.MS_WINDOWS:
19 import msvcrt
20 else:
21 from asyncio import unix_events
22
23
# Skip the whole module on AddressSanitizer builds: raising SkipTest at
# import time makes unittest report every test in this file as skipped.
if support.check_sanitizer(address=True):
    raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI")
26
# Command line of a child process that simply sleeps for an hour; used by
# tests that need a process which is still running when it gets signalled.
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']

# Command line of a child process that copies all of its stdin to its
# stdout as raw bytes.
PROGRAM_CAT = [
    sys.executable,
    '-c',
    'import sys;'
    'data = sys.stdin.buffer.read();'
    'sys.stdout.buffer.write(data)',
]
36
37
def tearDownModule():
    """Reset the asyncio event loop policy once the module's tests are done."""
    # Passing None asks asyncio to go back to its default policy.
    asyncio.set_event_loop_policy(None)
40
41
class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport whose child process is replaced by a mock."""

    def _start(self, *args, **kwargs):
        # Install a mock in place of the process object: no pipes are
        # attached and the pid is a dummy value.
        self._proc = mock.Mock(stdin=None, stdout=None, stderr=None, pid=-1)
49
50
class SubprocessTransportTests(test_utils.TestCase):
    """Exercise the base subprocess transport via TestSubprocessTransport."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.set_event_loop(self.loop)

    def create_transport(self, waiter=None):
        # Build a (transport, protocol) pair. The protocol is a Mock so the
        # tests can inspect which callbacks were invoked on it.
        mock_protocol = mock.Mock()
        transport = TestSubprocessTransport(
            self.loop, mock_protocol, ['test'], False,
            None, None, None, 0, waiter=waiter)
        return transport, mock_protocol

    def test_proc_exited(self):
        exit_waiter = self.loop.create_future()
        transport, protocol = self.create_transport(exit_waiter)
        transport._process_exited(6)
        self.loop.run_until_complete(exit_waiter)

        self.assertEqual(transport.get_returncode(), 6)

        # All three protocol callbacks must have been invoked, and the
        # connection must have been lost without an exception.
        for callback in (protocol.connection_made,
                         protocol.process_exited,
                         protocol.connection_lost):
            self.assertTrue(callback.called)
        self.assertEqual(protocol.connection_lost.call_args[0], (None,))

        self.assertFalse(transport.is_closing())
        for attr_name in ('_loop', '_proc', '_protocol'):
            self.assertIsNone(getattr(transport, attr_name))

        # methods must raise ProcessLookupError if the process exited
        with self.assertRaises(ProcessLookupError):
            transport.send_signal(signal.SIGTERM)
        with self.assertRaises(ProcessLookupError):
            transport.terminate()
        with self.assertRaises(ProcessLookupError):
            transport.kill()

        transport.close()

    def test_subprocess_repr(self):
        exit_waiter = self.loop.create_future()
        transport, protocol = self.create_transport(exit_waiter)
        transport._process_exited(6)
        self.loop.run_until_complete(exit_waiter)

        # With a return code recorded, repr() shows it.
        expected = "<TestSubprocessTransport pid=-1 returncode=6>"
        self.assertEqual(repr(transport), expected)

        # With the return code cleared, repr() reports "running".
        transport._returncode = None
        expected = "<TestSubprocessTransport pid=-1 running>"
        self.assertEqual(repr(transport), expected)

        # With the pid cleared as well, repr() reports "not started".
        transport._pid = None
        transport._returncode = None
        expected = "<TestSubprocessTransport not started>"
        self.assertEqual(repr(transport), expected)

        transport.close()
112
113
class SubprocessMixin:
    """Subprocess tests shared by the event loop / child watcher variants.

    The concrete test cases in this file mix this class with
    ``test_utils.TestCase`` and create ``self.loop`` in their ``setUp()``.
    Test methods are documented with comments rather than docstrings so
    that unittest keeps printing the test names in verbose mode.
    """

    def test_stdin_stdout(self):
        # Write to the child's stdin stream by hand and read the echoed
        # bytes back from its stdout stream.
        args = PROGRAM_CAT

        async def run(data):
            proc = await asyncio.create_subprocess_exec(
                *args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
            )

            # feed data
            proc.stdin.write(data)
            await proc.stdin.drain()
            proc.stdin.close()

            # get output and exitcode
            data = await proc.stdout.read()
            exitcode = await proc.wait()
            return (exitcode, data)

        task = run(b'some data')
        # Same timeout constant as test_communicate() instead of a
        # hard-coded number of seconds.
        task = asyncio.wait_for(task, support.LONG_TIMEOUT)
        exitcode, stdout = self.loop.run_until_complete(task)
        self.assertEqual(exitcode, 0)
        self.assertEqual(stdout, b'some data')

    def test_communicate(self):
        # Same round trip as test_stdin_stdout(), through communicate().
        args = PROGRAM_CAT

        async def run(data):
            proc = await asyncio.create_subprocess_exec(
                *args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
            )
            stdout, stderr = await proc.communicate(data)
            return proc.returncode, stdout

        task = run(b'some data')
        task = asyncio.wait_for(task, support.LONG_TIMEOUT)
        exitcode, stdout = self.loop.run_until_complete(task)
        self.assertEqual(exitcode, 0)
        self.assertEqual(stdout, b'some data')

    def test_shell(self):
        # wait() must return the exit status of the shell command.
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_shell('exit 7')
        )
        exitcode = self.loop.run_until_complete(proc.wait())
        self.assertEqual(exitcode, 7)

    def test_start_new_session(self):
        # start the new process in a new session
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_shell(
                'exit 8',
                start_new_session=True,
            )
        )
        exitcode = self.loop.run_until_complete(proc.wait())
        self.assertEqual(exitcode, 8)

    def test_kill(self):
        args = PROGRAM_BLOCKED
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(*args)
        )
        proc.kill()
        returncode = self.loop.run_until_complete(proc.wait())
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGKILL, returncode)

    def test_kill_issue43884(self):
        # Kill a shell while the command it launched is still running.
        if sys.platform == 'win32':
            from subprocess import CREATE_NEW_PROCESS_GROUP
            # The interpreter path is quoted because it may contain spaces
            # (e.g. "C:\Program Files\..."), which would otherwise split
            # the shell command in the wrong place.
            blocking_shell_command = (
                f'"{sys.executable}" -c "import time; time.sleep(2)"')
            # On windows create a new process group so that killing process
            # kills the process and all its children.
            creationflags = CREATE_NEW_PROCESS_GROUP
        else:
            blocking_shell_command = 'sleep 1; sleep 1'
            creationflags = 0
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_shell(
                blocking_shell_command,
                stdout=asyncio.subprocess.PIPE,
                creationflags=creationflags)
        )
        self.loop.run_until_complete(asyncio.sleep(1))
        if sys.platform == 'win32':
            proc.send_signal(signal.CTRL_BREAK_EVENT)
        # On windows it is an alias of terminate which sets the return code
        proc.kill()
        returncode = self.loop.run_until_complete(proc.wait())
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGKILL, returncode)

    def test_terminate(self):
        args = PROGRAM_BLOCKED
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(*args)
        )
        proc.terminate()
        returncode = self.loop.run_until_complete(proc.wait())
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGTERM, returncode)

    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
    def test_send_signal(self):
        # bpo-31034: Make sure that we get the default signal handler (killing
        # the process). The parent process may have decided to ignore SIGHUP,
        # and signal handlers are inherited.
        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
        try:
            code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
            args = [sys.executable, '-c', code]
            proc = self.loop.run_until_complete(
                asyncio.create_subprocess_exec(
                    *args,
                    stdout=subprocess.PIPE,
                )
            )

            async def send_signal(proc):
                # basic synchronization to wait until the program is sleeping
                line = await proc.stdout.readline()
                self.assertEqual(line, b'sleeping\n')

                proc.send_signal(signal.SIGHUP)
                returncode = await proc.wait()
                return returncode

            returncode = self.loop.run_until_complete(send_signal(proc))
            self.assertEqual(-signal.SIGHUP, returncode)
        finally:
            # Always restore the handler that was installed before the test.
            signal.signal(signal.SIGHUP, old_handler)

    def test_stdin_broken_pipe(self):
        # buffer large enough to feed the whole pipe buffer
        large_data = b'x' * support.PIPE_MAX_SIZE

        # Extra pipe used to tell the child when it may exit: the child
        # blocks reading one byte from rfd.
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        self.addCleanup(os.close, wfd)
        if support.MS_WINDOWS:
            # On Windows the read end is handed to the child as an
            # inheritable OS handle listed in STARTUPINFO.
            handle = msvcrt.get_osfhandle(rfd)
            os.set_handle_inheritable(handle, True)
            code = textwrap.dedent(f'''
                import os, msvcrt
                handle = {handle}
                fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
                os.read(fd, 1)
            ''')
            from subprocess import STARTUPINFO
            startupinfo = STARTUPINFO()
            startupinfo.lpAttributeList = {"handle_list": [handle]}
            kwargs = dict(startupinfo=startupinfo)
        else:
            code = f'import os; fd = {rfd}; os.read(fd, 1)'
            kwargs = dict(pass_fds=(rfd,))

        # the program ends before the stdin can be fed
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=subprocess.PIPE,
                **kwargs
            )
        )

        async def write_stdin(proc, data):
            proc.stdin.write(data)
            # Only exit the child process once the write buffer is filled
            os.write(wfd, b'go')
            await proc.stdin.drain()

        coro = write_stdin(proc, large_data)
        # drain() must raise BrokenPipeError or ConnectionResetError
        with test_utils.disable_logger():
            self.assertRaises((BrokenPipeError, ConnectionResetError),
                              self.loop.run_until_complete, coro)
        self.loop.run_until_complete(proc.wait())

    def test_communicate_ignore_broken_pipe(self):
        # buffer large enough to feed the whole pipe buffer
        large_data = b'x' * support.PIPE_MAX_SIZE

        # the program ends before the stdin can be fed
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(
                sys.executable, '-c', 'pass',
                stdin=subprocess.PIPE,
            )
        )

        # communicate() must ignore BrokenPipeError when feeding stdin
        self.loop.set_exception_handler(lambda loop, msg: None)
        self.loop.run_until_complete(proc.communicate(large_data))
        self.loop.run_until_complete(proc.wait())

    def test_pause_reading(self):
        limit = 10
        size = (limit * 2 + 1)

        async def test_pause_reading():
            code = '\n'.join((
                'import sys',
                'sys.stdout.write("x" * %s)' % size,
                'sys.stdout.flush()',
            ))

            connect_read_pipe = self.loop.connect_read_pipe

            async def connect_read_pipe_mock(*args, **kw):
                # Delegate to the real implementation, then replace the
                # flow-control methods with mocks so calls can be observed.
                transport, protocol = await connect_read_pipe(*args, **kw)
                transport.pause_reading = mock.Mock()
                transport.resume_reading = mock.Mock()
                return (transport, protocol)

            self.loop.connect_read_pipe = connect_read_pipe_mock

            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                limit=limit,
            )
            stdout_transport = proc._transport.get_pipe_transport(1)

            stdout, stderr = await proc.communicate()

            # The child process produced more than limit bytes of output,
            # the stream reader transport should pause the protocol to not
            # allocate too much memory.
            return (stdout, stdout_transport)

        # Issue #22685: Ensure that the stream reader pauses the protocol
        # when the child process produces too much data
        stdout, transport = self.loop.run_until_complete(test_pause_reading())

        self.assertEqual(stdout, b'x' * size)
        self.assertTrue(transport.pause_reading.called)
        self.assertTrue(transport.resume_reading.called)

    def test_stdin_not_inheritable(self):
        # asyncio issue #209: stdin must not be inheritable, otherwise
        # the Process.communicate() hangs
        async def len_message(message):
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                close_fds=False,
            )
            stdout, stderr = await proc.communicate(message)
            exitcode = await proc.wait()
            return (stdout, exitcode)

        output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
        self.assertEqual(output.rstrip(), b'3')
        self.assertEqual(exitcode, 0)

    def test_empty_input(self):
        # communicate(b'') must leave the child with an empty stdin.

        async def empty_input():
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                close_fds=False,
            )
            stdout, stderr = await proc.communicate(b'')
            exitcode = await proc.wait()
            return (stdout, exitcode)

        output, exitcode = self.loop.run_until_complete(empty_input())
        self.assertEqual(output.rstrip(), b'0')
        self.assertEqual(exitcode, 0)

    def test_devnull_input(self):
        # With stdin=DEVNULL the child reads nothing from its stdin.

        async def empty_input():
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                close_fds=False,
            )
            stdout, stderr = await proc.communicate()
            exitcode = await proc.wait()
            return (stdout, exitcode)

        output, exitcode = self.loop.run_until_complete(empty_input())
        self.assertEqual(output.rstrip(), b'0')
        self.assertEqual(exitcode, 0)

    def test_devnull_output(self):
        # With stdout=DEVNULL communicate() returns None for stdout.

        async def empty_output():
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE,
                close_fds=False,
            )
            stdout, stderr = await proc.communicate(b"abc")
            exitcode = await proc.wait()
            return (stdout, exitcode)

        output, exitcode = self.loop.run_until_complete(empty_output())
        self.assertIsNone(output)
        self.assertEqual(exitcode, 0)

    def test_devnull_error(self):
        # With stderr=DEVNULL communicate() returns None for stderr.

        async def empty_error():
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.DEVNULL,
                close_fds=False,
            )
            stdout, stderr = await proc.communicate(b"abc")
            exitcode = await proc.wait()
            return (stderr, exitcode)

        output, exitcode = self.loop.run_until_complete(empty_error())
        self.assertIsNone(output)
        self.assertEqual(exitcode, 0)

    @unittest.skipIf(sys.platform != 'linux', "Don't have /dev/stdin")
    def test_devstdin_input(self):
        # The child reads its input by opening /dev/stdin instead of using
        # sys.stdin.

        async def devstdin_input(message):
            code = 'file = open("/dev/stdin"); data = file.read(); print(len(data))'
            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                close_fds=False,
            )
            stdout, stderr = await proc.communicate(message)
            exitcode = await proc.wait()
            return (stdout, exitcode)

        output, exitcode = self.loop.run_until_complete(devstdin_input(b'abc'))
        self.assertEqual(output.rstrip(), b'3')
        self.assertEqual(exitcode, 0)

    def test_cancel_process_wait(self):
        # Issue #23140: cancel Process.wait()

        async def cancel_wait():
            proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)

            # Create an internal future waiting on the process exit
            task = self.loop.create_task(proc.wait())
            self.loop.call_soon(task.cancel)
            try:
                await task
            except asyncio.CancelledError:
                pass

            # Cancel the future
            task.cancel()

            # Kill the process and wait until it is done
            proc.kill()
            await proc.wait()

        self.loop.run_until_complete(cancel_wait())

    def test_cancel_make_subprocess_transport_exec(self):
        # Cancel create_subprocess_exec() right after it has been scheduled.

        async def cancel_make_transport():
            coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
            task = self.loop.create_task(coro)

            self.loop.call_soon(task.cancel)
            try:
                await task
            except asyncio.CancelledError:
                pass

        # ignore the log:
        # "Exception during subprocess creation, kill the subprocess"
        with test_utils.disable_logger():
            self.loop.run_until_complete(cancel_make_transport())

    def test_cancel_post_init(self):
        # Same cancellation scenario, through loop.subprocess_exec().

        async def cancel_make_transport():
            coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
                                             *PROGRAM_BLOCKED)
            task = self.loop.create_task(coro)

            self.loop.call_soon(task.cancel)
            try:
                await task
            except asyncio.CancelledError:
                pass

        # ignore the log:
        # "Exception during subprocess creation, kill the subprocess"
        with test_utils.disable_logger():
            self.loop.run_until_complete(cancel_make_transport())
            test_utils.run_briefly(self.loop)

    def test_close_kill_running(self):

        async def kill_running():
            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
                                               *PROGRAM_BLOCKED)
            transport, protocol = await create

            # Wrap Popen.kill() so the test can tell whether close()
            # called it.
            kill_called = False

            def kill():
                nonlocal kill_called
                kill_called = True
                orig_kill()

            proc = transport.get_extra_info('subprocess')
            orig_kill = proc.kill
            proc.kill = kill
            returncode = transport.get_returncode()
            transport.close()
            await asyncio.wait_for(transport._wait(), 5)
            return (returncode, kill_called)

        # Ignore "Close running child process: kill ..." log
        with test_utils.disable_logger():
            try:
                returncode, killed = self.loop.run_until_complete(
                    kill_running()
                )
            except asyncio.TimeoutError:
                self.skipTest(
                    "Timeout failure on waiting for subprocess stopping"
                )
        self.assertIsNone(returncode)

        # transport.close() must kill the process if it is still running
        self.assertTrue(killed)
        test_utils.run_briefly(self.loop)

    def test_close_dont_kill_finished(self):

        async def kill_running():
            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
                                               *PROGRAM_BLOCKED)
            transport, protocol = await create
            proc = transport.get_extra_info('subprocess')

            # kill the process (but asyncio is not notified immediately)
            proc.kill()
            proc.wait()

            proc.kill = mock.Mock()
            proc_returncode = proc.poll()
            transport_returncode = transport.get_returncode()
            transport.close()
            return (proc_returncode, transport_returncode, proc.kill.called)

        # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
        # emitted because the test already consumes the exit status:
        # proc.wait()
        with test_utils.disable_logger():
            result = self.loop.run_until_complete(kill_running())
            test_utils.run_briefly(self.loop)

        proc_returncode, transport_return_code, killed = result

        self.assertIsNotNone(proc_returncode)
        self.assertIsNone(transport_return_code)

        # transport.close() must not kill the process if it finished, even if
        # the transport was not notified yet
        self.assertFalse(killed)

        # Unlike SafeChildWatcher, FastChildWatcher does not pop the
        # callbacks if waitpid() is called elsewhere. Let's clear them
        # manually to avoid a warning when the watcher is detached.
        if (sys.platform != 'win32' and
                isinstance(self, SubprocessFastWatcherTests)):
            asyncio.get_child_watcher()._callbacks.clear()

    async def _test_popen_error(self, stdin):
        # Make Popen itself fail and check that the error reaches the
        # caller without any warning being recorded on the way.
        if sys.platform == 'win32':
            target = 'asyncio.windows_utils.Popen'
        else:
            target = 'subprocess.Popen'
        with mock.patch(target) as popen:
            exc = ZeroDivisionError
            popen.side_effect = exc

            with warnings.catch_warnings(record=True) as warns:
                with self.assertRaises(exc):
                    await asyncio.create_subprocess_exec(
                        sys.executable,
                        '-c',
                        'pass',
                        stdin=stdin
                    )
                self.assertEqual(warns, [])

    def test_popen_error(self):
        # Issue #24763: check that the subprocess transport is closed
        # when BaseSubprocessTransport fails
        self.loop.run_until_complete(self._test_popen_error(stdin=None))

    def test_popen_error_with_stdin_pipe(self):
        # Issue #35721: check that newly created socket pair is closed when
        # Popen fails
        self.loop.run_until_complete(
            self._test_popen_error(stdin=subprocess.PIPE))

    def test_read_stdout_after_process_exit(self):
        # Keep reading stdout, with pauses between reads, until EOF; the
        # child exits with status 1 after writing its output.

        async def execute():
            code = '\n'.join(['import sys',
                              'for _ in range(64):',
                              ' sys.stdout.write("x" * 4096)',
                              'sys.stdout.flush()',
                              'sys.exit(1)'])

            process = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdout=asyncio.subprocess.PIPE,
            )

            while True:
                data = await process.stdout.read(65536)
                if data:
                    await asyncio.sleep(0.3)
                else:
                    break

        self.loop.run_until_complete(execute())

    def test_create_subprocess_exec_text_mode_fails(self):
        # Each text-mode keyword must be rejected with ValueError.
        text_mode_options = (
            {'text': True},
            {'encoding': "utf-8"},
            {'errors': "strict"},
        )

        async def execute():
            for option in text_mode_options:
                with self.assertRaises(ValueError):
                    await subprocess.create_subprocess_exec(sys.executable,
                                                            **option)

        self.loop.run_until_complete(execute())

    def test_create_subprocess_shell_text_mode_fails(self):
        # Each text-mode keyword must be rejected with ValueError.
        text_mode_options = (
            {'text': True},
            {'encoding': "utf-8"},
            {'errors': "strict"},
        )

        async def execute():
            for option in text_mode_options:
                with self.assertRaises(ValueError):
                    await subprocess.create_subprocess_shell(sys.executable,
                                                             **option)

        self.loop.run_until_complete(execute())

    def test_create_subprocess_exec_with_path(self):
        # Path-like objects are accepted both as the program and as an
        # argument.
        async def execute():
            p = await subprocess.create_subprocess_exec(
                os_helper.FakePath(sys.executable), '-c', 'pass')
            await p.wait()
            p = await subprocess.create_subprocess_exec(
                sys.executable, '-c', 'pass', os_helper.FakePath('.'))
            await p.wait()

        self.assertIsNone(self.loop.run_until_complete(execute()))

    def test_subprocess_communicate_stdout(self):
        # See https://github.com/python/cpython/issues/100133
        async def get_command_stdout(cmd, *args):
            proc = await asyncio.create_subprocess_exec(
                cmd, *args, stdout=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            return stdout.decode().strip()

        async def main():
            # Run ten children concurrently; each must get back its own
            # output.
            outputs = [f'foo{i}' for i in range(10)]
            res = await asyncio.gather(*[get_command_stdout(sys.executable, '-c',
                                        f'print({out!r})') for out in outputs])
            self.assertEqual(res, outputs)

        self.loop.run_until_complete(main())
734
735
736 if sys.platform != 'win32':
737 # Unix
class SubprocessWatcherMixin(SubprocessMixin):
    """Run the shared subprocess tests with a specific child watcher.

    Subclasses set ``Watcher`` to the child watcher class to instantiate.
    """

    Watcher = None

    def setUp(self):
        super().setUp()
        loop_policy = asyncio.get_event_loop_policy()
        self.loop = loop_policy.new_event_loop()
        self.set_event_loop(self.loop)

        # Attach a fresh watcher to the test loop and install it in the
        # policy.
        child_watcher = self.Watcher()
        child_watcher.attach_loop(self.loop)
        loop_policy.set_child_watcher(child_watcher)

    def tearDown(self):
        super().tearDown()
        loop_policy = asyncio.get_event_loop_policy()
        child_watcher = loop_policy.get_child_watcher()
        # Uninstall the watcher from the policy, then detach and close it.
        loop_policy.set_child_watcher(None)
        child_watcher.attach_loop(None)
        child_watcher.close()
759
class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
                                     test_utils.TestCase):
    """Shared subprocess tests run with ThreadedChildWatcher."""

    Watcher = unix_events.ThreadedChildWatcher
764
# The reason is built from adjacent string literals. A backslash continuation
# inside a single literal would embed the next source line's indentation in
# the message shown in test reports.
@unittest.skip("bpo-38323: MultiLoopChildWatcher has a race condition "
               "and these tests can hang the test suite")
class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
                                      test_utils.TestCase):
    """Shared subprocess tests for MultiLoopChildWatcher (currently skipped)."""

    Watcher = unix_events.MultiLoopChildWatcher
771
class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
                                 test_utils.TestCase):
    """Shared subprocess tests run with SafeChildWatcher."""

    Watcher = unix_events.SafeChildWatcher
776
class SubprocessFastWatcherTests(SubprocessWatcherMixin,
                                 test_utils.TestCase):
    """Shared subprocess tests run with FastChildWatcher."""

    Watcher = unix_events.FastChildWatcher
781
def has_pidfd_support():
    """Return True if os.pidfd_open() exists and works for this process.

    The check opens a pidfd for the current process and closes it again;
    an OSError from either step means pidfds are not usable.
    """
    if hasattr(os, 'pidfd_open'):
        try:
            pidfd = os.pidfd_open(os.getpid())
            os.close(pidfd)
        except OSError:
            pass
        else:
            return True
    return False
790
@unittest.skipUnless(has_pidfd_support(),
                     "operating system does not support pidfds")
class SubprocessPidfdWatcherTests(SubprocessWatcherMixin,
                                  test_utils.TestCase):
    """Shared subprocess tests run with PidfdChildWatcher."""

    Watcher = unix_events.PidfdChildWatcher
798
799
class GenericWatcherTests(test_utils.TestCase):
    """Child watcher checks that do not depend on a concrete watcher class."""

    def test_create_subprocess_fails_with_inactive_watcher(self):
        # Autospec'd watcher whose is_active() returns False once the
        # watcher has been entered as a context manager.
        inactive_config = {
            "__enter__.return_value.is_active.return_value": False,
        }
        watcher = mock.create_autospec(asyncio.AbstractChildWatcher,
                                       **inactive_config)

        async def execute():
            asyncio.set_child_watcher(watcher)

            with self.assertRaises(RuntimeError):
                await subprocess.create_subprocess_exec(
                    os_helper.FakePath(sys.executable), '-c', 'pass')

            watcher.add_child_handler.assert_not_called()

        with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
            self.assertIsNone(runner.run(execute()))

        # The watcher must only have been entered, asked whether it is
        # active, and exited with the RuntimeError.
        expected_calls = [
            mock.call.__enter__(),
            mock.call.__enter__().is_active(),
            mock.call.__exit__(RuntimeError, mock.ANY, mock.ANY),
        ]
        self.assertListEqual(watcher.mock_calls, expected_calls)
824
825 else:
826 # Windows
class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
    """Shared subprocess tests run on a ProactorEventLoop (Windows)."""

    def setUp(self):
        super().setUp()
        proactor_loop = asyncio.ProactorEventLoop()
        self.loop = proactor_loop
        self.set_event_loop(proactor_loop)
833
834
# Run the tests with unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()