(root)/
Python-3.11.7/
Lib/
test/
test_asyncio/
test_subprocess.py
       1  import os
       2  import shutil
       3  import signal
       4  import sys
       5  import textwrap
       6  import unittest
       7  import warnings
       8  from unittest import mock
       9  
      10  import asyncio
      11  from asyncio import base_subprocess
      12  from asyncio import subprocess
      13  from test.test_asyncio import utils as test_utils
      14  from test import support
      15  from test.support import os_helper
      16  
      17  
      18  if support.MS_WINDOWS:
      19      import msvcrt
      20  else:
      21      from asyncio import unix_events
      22  
      23  
      24  if support.check_sanitizer(address=True):
      25      raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI")
      26  
      27  # Program blocking
      28  PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
      29  
      30  # Program copying input to output
      31  PROGRAM_CAT = [
      32      sys.executable, '-c',
      33      ';'.join(('import sys',
      34                'data = sys.stdin.buffer.read()',
      35                'sys.stdout.buffer.write(data)'))]
      36  
      37  
      38  def tearDownModule():
      39      asyncio.set_event_loop_policy(None)
      40  
      41  
      42  class ESC[4;38;5;81mTestSubprocessTransport(ESC[4;38;5;149mbase_subprocessESC[4;38;5;149m.ESC[4;38;5;149mBaseSubprocessTransport):
      43      def _start(self, *args, **kwargs):
      44          self._proc = mock.Mock()
      45          self._proc.stdin = None
      46          self._proc.stdout = None
      47          self._proc.stderr = None
      48          self._proc.pid = -1
      49  
      50  
      51  class ESC[4;38;5;81mSubprocessTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      52      def setUp(self):
      53          super().setUp()
      54          self.loop = self.new_test_loop()
      55          self.set_event_loop(self.loop)
      56  
      57      def create_transport(self, waiter=None):
      58          protocol = mock.Mock()
      59          transport = TestSubprocessTransport(
      60                          self.loop, protocol, ['test'], False,
      61                          None, None, None, 0, waiter=waiter)
      62          return (transport, protocol)
      63  
      64      def test_proc_exited(self):
      65          waiter = self.loop.create_future()
      66          transport, protocol = self.create_transport(waiter)
      67          transport._process_exited(6)
      68          self.loop.run_until_complete(waiter)
      69  
      70          self.assertEqual(transport.get_returncode(), 6)
      71  
      72          self.assertTrue(protocol.connection_made.called)
      73          self.assertTrue(protocol.process_exited.called)
      74          self.assertTrue(protocol.connection_lost.called)
      75          self.assertEqual(protocol.connection_lost.call_args[0], (None,))
      76  
      77          self.assertFalse(transport.is_closing())
      78          self.assertIsNone(transport._loop)
      79          self.assertIsNone(transport._proc)
      80          self.assertIsNone(transport._protocol)
      81  
      82          # methods must raise ProcessLookupError if the process exited
      83          self.assertRaises(ProcessLookupError,
      84                            transport.send_signal, signal.SIGTERM)
      85          self.assertRaises(ProcessLookupError, transport.terminate)
      86          self.assertRaises(ProcessLookupError, transport.kill)
      87  
      88          transport.close()
      89  
      90      def test_subprocess_repr(self):
      91          waiter = self.loop.create_future()
      92          transport, protocol = self.create_transport(waiter)
      93          transport._process_exited(6)
      94          self.loop.run_until_complete(waiter)
      95  
      96          self.assertEqual(
      97              repr(transport),
      98              "<TestSubprocessTransport pid=-1 returncode=6>"
      99          )
     100          transport._returncode = None
     101          self.assertEqual(
     102              repr(transport),
     103              "<TestSubprocessTransport pid=-1 running>"
     104          )
     105          transport._pid = None
     106          transport._returncode = None
     107          self.assertEqual(
     108              repr(transport),
     109              "<TestSubprocessTransport not started>"
     110          )
     111          transport.close()
     112  
     113  
     114  class ESC[4;38;5;81mSubprocessMixin:
     115  
     116      def test_stdin_stdout(self):
     117          args = PROGRAM_CAT
     118  
     119          async def run(data):
     120              proc = await asyncio.create_subprocess_exec(
     121                  *args,
     122                  stdin=subprocess.PIPE,
     123                  stdout=subprocess.PIPE,
     124              )
     125  
     126              # feed data
     127              proc.stdin.write(data)
     128              await proc.stdin.drain()
     129              proc.stdin.close()
     130  
     131              # get output and exitcode
     132              data = await proc.stdout.read()
     133              exitcode = await proc.wait()
     134              return (exitcode, data)
     135  
     136          task = run(b'some data')
     137          task = asyncio.wait_for(task, 60.0)
     138          exitcode, stdout = self.loop.run_until_complete(task)
     139          self.assertEqual(exitcode, 0)
     140          self.assertEqual(stdout, b'some data')
     141  
     142      def test_communicate(self):
     143          args = PROGRAM_CAT
     144  
     145          async def run(data):
     146              proc = await asyncio.create_subprocess_exec(
     147                  *args,
     148                  stdin=subprocess.PIPE,
     149                  stdout=subprocess.PIPE,
     150              )
     151              stdout, stderr = await proc.communicate(data)
     152              return proc.returncode, stdout
     153  
     154          task = run(b'some data')
     155          task = asyncio.wait_for(task, support.LONG_TIMEOUT)
     156          exitcode, stdout = self.loop.run_until_complete(task)
     157          self.assertEqual(exitcode, 0)
     158          self.assertEqual(stdout, b'some data')
     159  
     160      def test_shell(self):
     161          proc = self.loop.run_until_complete(
     162              asyncio.create_subprocess_shell('exit 7')
     163          )
     164          exitcode = self.loop.run_until_complete(proc.wait())
     165          self.assertEqual(exitcode, 7)
     166  
     167      def test_start_new_session(self):
     168          # start the new process in a new session
     169          proc = self.loop.run_until_complete(
     170              asyncio.create_subprocess_shell(
     171                  'exit 8',
     172                  start_new_session=True,
     173              )
     174          )
     175          exitcode = self.loop.run_until_complete(proc.wait())
     176          self.assertEqual(exitcode, 8)
     177  
     178      def test_kill(self):
     179          args = PROGRAM_BLOCKED
     180          proc = self.loop.run_until_complete(
     181              asyncio.create_subprocess_exec(*args)
     182          )
     183          proc.kill()
     184          returncode = self.loop.run_until_complete(proc.wait())
     185          if sys.platform == 'win32':
     186              self.assertIsInstance(returncode, int)
     187              # expect 1 but sometimes get 0
     188          else:
     189              self.assertEqual(-signal.SIGKILL, returncode)
     190  
     191      def test_kill_issue43884(self):
     192          if sys.platform == 'win32':
     193              blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(2)"'
     194          else:
     195              blocking_shell_command = 'sleep 1; sleep 1'
     196          creationflags = 0
     197          if sys.platform == 'win32':
     198              from subprocess import CREATE_NEW_PROCESS_GROUP
     199              # On windows create a new process group so that killing process
     200              # kills the process and all its children.
     201              creationflags = CREATE_NEW_PROCESS_GROUP
     202          proc = self.loop.run_until_complete(
     203              asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE,
     204              creationflags=creationflags)
     205          )
     206          self.loop.run_until_complete(asyncio.sleep(1))
     207          if sys.platform == 'win32':
     208              proc.send_signal(signal.CTRL_BREAK_EVENT)
     209          # On windows it is an alias of terminate which sets the return code
     210          proc.kill()
     211          returncode = self.loop.run_until_complete(proc.wait())
     212          if sys.platform == 'win32':
     213              self.assertIsInstance(returncode, int)
     214              # expect 1 but sometimes get 0
     215          else:
     216              self.assertEqual(-signal.SIGKILL, returncode)
     217  
     218      def test_terminate(self):
     219          args = PROGRAM_BLOCKED
     220          proc = self.loop.run_until_complete(
     221              asyncio.create_subprocess_exec(*args)
     222          )
     223          proc.terminate()
     224          returncode = self.loop.run_until_complete(proc.wait())
     225          if sys.platform == 'win32':
     226              self.assertIsInstance(returncode, int)
     227              # expect 1 but sometimes get 0
     228          else:
     229              self.assertEqual(-signal.SIGTERM, returncode)
     230  
     231      @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
     232      def test_send_signal(self):
     233          # bpo-31034: Make sure that we get the default signal handler (killing
     234          # the process). The parent process may have decided to ignore SIGHUP,
     235          # and signal handlers are inherited.
     236          old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
     237          try:
     238              code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
     239              args = [sys.executable, '-c', code]
     240              proc = self.loop.run_until_complete(
     241                  asyncio.create_subprocess_exec(
     242                      *args,
     243                      stdout=subprocess.PIPE,
     244                  )
     245              )
     246  
     247              async def send_signal(proc):
     248                  # basic synchronization to wait until the program is sleeping
     249                  line = await proc.stdout.readline()
     250                  self.assertEqual(line, b'sleeping\n')
     251  
     252                  proc.send_signal(signal.SIGHUP)
     253                  returncode = await proc.wait()
     254                  return returncode
     255  
     256              returncode = self.loop.run_until_complete(send_signal(proc))
     257              self.assertEqual(-signal.SIGHUP, returncode)
     258          finally:
     259              signal.signal(signal.SIGHUP, old_handler)
     260  
     261      def test_stdin_broken_pipe(self):
     262          # buffer large enough to feed the whole pipe buffer
     263          large_data = b'x' * support.PIPE_MAX_SIZE
     264  
     265          rfd, wfd = os.pipe()
     266          self.addCleanup(os.close, rfd)
     267          self.addCleanup(os.close, wfd)
     268          if support.MS_WINDOWS:
     269              handle = msvcrt.get_osfhandle(rfd)
     270              os.set_handle_inheritable(handle, True)
     271              code = textwrap.dedent(f'''
     272                  import os, msvcrt
     273                  handle = {handle}
     274                  fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
     275                  os.read(fd, 1)
     276              ''')
     277              from subprocess import STARTUPINFO
     278              startupinfo = STARTUPINFO()
     279              startupinfo.lpAttributeList = {"handle_list": [handle]}
     280              kwargs = dict(startupinfo=startupinfo)
     281          else:
     282              code = f'import os; fd = {rfd}; os.read(fd, 1)'
     283              kwargs = dict(pass_fds=(rfd,))
     284  
     285          # the program ends before the stdin can be fed
     286          proc = self.loop.run_until_complete(
     287              asyncio.create_subprocess_exec(
     288                  sys.executable, '-c', code,
     289                  stdin=subprocess.PIPE,
     290                  **kwargs
     291              )
     292          )
     293  
     294          async def write_stdin(proc, data):
     295              proc.stdin.write(data)
     296              # Only exit the child process once the write buffer is filled
     297              os.write(wfd, b'go')
     298              await proc.stdin.drain()
     299  
     300          coro = write_stdin(proc, large_data)
     301          # drain() must raise BrokenPipeError or ConnectionResetError
     302          with test_utils.disable_logger():
     303              self.assertRaises((BrokenPipeError, ConnectionResetError),
     304                                self.loop.run_until_complete, coro)
     305          self.loop.run_until_complete(proc.wait())
     306  
     307      def test_communicate_ignore_broken_pipe(self):
     308          # buffer large enough to feed the whole pipe buffer
     309          large_data = b'x' * support.PIPE_MAX_SIZE
     310  
     311          # the program ends before the stdin can be fed
     312          proc = self.loop.run_until_complete(
     313              asyncio.create_subprocess_exec(
     314                  sys.executable, '-c', 'pass',
     315                  stdin=subprocess.PIPE,
     316              )
     317          )
     318  
     319          # communicate() must ignore BrokenPipeError when feeding stdin
     320          self.loop.set_exception_handler(lambda loop, msg: None)
     321          self.loop.run_until_complete(proc.communicate(large_data))
     322          self.loop.run_until_complete(proc.wait())
     323  
     324      def test_pause_reading(self):
     325          limit = 10
     326          size = (limit * 2 + 1)
     327  
     328          async def test_pause_reading():
     329              code = '\n'.join((
     330                  'import sys',
     331                  'sys.stdout.write("x" * %s)' % size,
     332                  'sys.stdout.flush()',
     333              ))
     334  
     335              connect_read_pipe = self.loop.connect_read_pipe
     336  
     337              async def connect_read_pipe_mock(*args, **kw):
     338                  transport, protocol = await connect_read_pipe(*args, **kw)
     339                  transport.pause_reading = mock.Mock()
     340                  transport.resume_reading = mock.Mock()
     341                  return (transport, protocol)
     342  
     343              self.loop.connect_read_pipe = connect_read_pipe_mock
     344  
     345              proc = await asyncio.create_subprocess_exec(
     346                  sys.executable, '-c', code,
     347                  stdin=asyncio.subprocess.PIPE,
     348                  stdout=asyncio.subprocess.PIPE,
     349                  limit=limit,
     350              )
     351              stdout_transport = proc._transport.get_pipe_transport(1)
     352  
     353              stdout, stderr = await proc.communicate()
     354  
     355              # The child process produced more than limit bytes of output,
     356              # the stream reader transport should pause the protocol to not
     357              # allocate too much memory.
     358              return (stdout, stdout_transport)
     359  
     360          # Issue #22685: Ensure that the stream reader pauses the protocol
     361          # when the child process produces too much data
     362          stdout, transport = self.loop.run_until_complete(test_pause_reading())
     363  
     364          self.assertEqual(stdout, b'x' * size)
     365          self.assertTrue(transport.pause_reading.called)
     366          self.assertTrue(transport.resume_reading.called)
     367  
     368      def test_stdin_not_inheritable(self):
     369          # asyncio issue #209: stdin must not be inheritable, otherwise
     370          # the Process.communicate() hangs
     371          async def len_message(message):
     372              code = 'import sys; data = sys.stdin.read(); print(len(data))'
     373              proc = await asyncio.create_subprocess_exec(
     374                  sys.executable, '-c', code,
     375                  stdin=asyncio.subprocess.PIPE,
     376                  stdout=asyncio.subprocess.PIPE,
     377                  stderr=asyncio.subprocess.PIPE,
     378                  close_fds=False,
     379              )
     380              stdout, stderr = await proc.communicate(message)
     381              exitcode = await proc.wait()
     382              return (stdout, exitcode)
     383  
     384          output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
     385          self.assertEqual(output.rstrip(), b'3')
     386          self.assertEqual(exitcode, 0)
     387  
     388      def test_empty_input(self):
     389  
     390          async def empty_input():
     391              code = 'import sys; data = sys.stdin.read(); print(len(data))'
     392              proc = await asyncio.create_subprocess_exec(
     393                  sys.executable, '-c', code,
     394                  stdin=asyncio.subprocess.PIPE,
     395                  stdout=asyncio.subprocess.PIPE,
     396                  stderr=asyncio.subprocess.PIPE,
     397                  close_fds=False,
     398              )
     399              stdout, stderr = await proc.communicate(b'')
     400              exitcode = await proc.wait()
     401              return (stdout, exitcode)
     402  
     403          output, exitcode = self.loop.run_until_complete(empty_input())
     404          self.assertEqual(output.rstrip(), b'0')
     405          self.assertEqual(exitcode, 0)
     406  
     407      def test_devnull_input(self):
     408  
     409          async def empty_input():
     410              code = 'import sys; data = sys.stdin.read(); print(len(data))'
     411              proc = await asyncio.create_subprocess_exec(
     412                  sys.executable, '-c', code,
     413                  stdin=asyncio.subprocess.DEVNULL,
     414                  stdout=asyncio.subprocess.PIPE,
     415                  stderr=asyncio.subprocess.PIPE,
     416                  close_fds=False,
     417              )
     418              stdout, stderr = await proc.communicate()
     419              exitcode = await proc.wait()
     420              return (stdout, exitcode)
     421  
     422          output, exitcode = self.loop.run_until_complete(empty_input())
     423          self.assertEqual(output.rstrip(), b'0')
     424          self.assertEqual(exitcode, 0)
     425  
     426      def test_devnull_output(self):
     427  
     428          async def empty_output():
     429              code = 'import sys; data = sys.stdin.read(); print(len(data))'
     430              proc = await asyncio.create_subprocess_exec(
     431                  sys.executable, '-c', code,
     432                  stdin=asyncio.subprocess.PIPE,
     433                  stdout=asyncio.subprocess.DEVNULL,
     434                  stderr=asyncio.subprocess.PIPE,
     435                  close_fds=False,
     436              )
     437              stdout, stderr = await proc.communicate(b"abc")
     438              exitcode = await proc.wait()
     439              return (stdout, exitcode)
     440  
     441          output, exitcode = self.loop.run_until_complete(empty_output())
     442          self.assertEqual(output, None)
     443          self.assertEqual(exitcode, 0)
     444  
     445      def test_devnull_error(self):
     446  
     447          async def empty_error():
     448              code = 'import sys; data = sys.stdin.read(); print(len(data))'
     449              proc = await asyncio.create_subprocess_exec(
     450                  sys.executable, '-c', code,
     451                  stdin=asyncio.subprocess.PIPE,
     452                  stdout=asyncio.subprocess.PIPE,
     453                  stderr=asyncio.subprocess.DEVNULL,
     454                  close_fds=False,
     455              )
     456              stdout, stderr = await proc.communicate(b"abc")
     457              exitcode = await proc.wait()
     458              return (stderr, exitcode)
     459  
     460          output, exitcode = self.loop.run_until_complete(empty_error())
     461          self.assertEqual(output, None)
     462          self.assertEqual(exitcode, 0)
     463  
     464      @unittest.skipIf(sys.platform != 'linux', "Don't have /dev/stdin")
     465      def test_devstdin_input(self):
     466  
     467          async def devstdin_input(message):
     468              code = 'file = open("/dev/stdin"); data = file.read(); print(len(data))'
     469              proc = await asyncio.create_subprocess_exec(
     470                  sys.executable, '-c', code,
     471                  stdin=asyncio.subprocess.PIPE,
     472                  stdout=asyncio.subprocess.PIPE,
     473                  stderr=asyncio.subprocess.PIPE,
     474                  close_fds=False,
     475              )
     476              stdout, stderr = await proc.communicate(message)
     477              exitcode = await proc.wait()
     478              return (stdout, exitcode)
     479  
     480          output, exitcode = self.loop.run_until_complete(devstdin_input(b'abc'))
     481          self.assertEqual(output.rstrip(), b'3')
     482          self.assertEqual(exitcode, 0)
     483  
     484      def test_cancel_process_wait(self):
     485          # Issue #23140: cancel Process.wait()
     486  
     487          async def cancel_wait():
     488              proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
     489  
     490              # Create an internal future waiting on the process exit
     491              task = self.loop.create_task(proc.wait())
     492              self.loop.call_soon(task.cancel)
     493              try:
     494                  await task
     495              except asyncio.CancelledError:
     496                  pass
     497  
     498              # Cancel the future
     499              task.cancel()
     500  
     501              # Kill the process and wait until it is done
     502              proc.kill()
     503              await proc.wait()
     504  
     505          self.loop.run_until_complete(cancel_wait())
     506  
     507      def test_cancel_make_subprocess_transport_exec(self):
     508  
     509          async def cancel_make_transport():
     510              coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
     511              task = self.loop.create_task(coro)
     512  
     513              self.loop.call_soon(task.cancel)
     514              try:
     515                  await task
     516              except asyncio.CancelledError:
     517                  pass
     518  
     519          # ignore the log:
     520          # "Exception during subprocess creation, kill the subprocess"
     521          with test_utils.disable_logger():
     522              self.loop.run_until_complete(cancel_make_transport())
     523  
     524      def test_cancel_post_init(self):
     525  
     526          async def cancel_make_transport():
     527              coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
     528                                               *PROGRAM_BLOCKED)
     529              task = self.loop.create_task(coro)
     530  
     531              self.loop.call_soon(task.cancel)
     532              try:
     533                  await task
     534              except asyncio.CancelledError:
     535                  pass
     536  
     537          # ignore the log:
     538          # "Exception during subprocess creation, kill the subprocess"
     539          with test_utils.disable_logger():
     540              self.loop.run_until_complete(cancel_make_transport())
     541              test_utils.run_briefly(self.loop)
     542  
     543      def test_close_kill_running(self):
     544  
     545          async def kill_running():
     546              create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
     547                                                 *PROGRAM_BLOCKED)
     548              transport, protocol = await create
     549  
     550              kill_called = False
     551              def kill():
     552                  nonlocal kill_called
     553                  kill_called = True
     554                  orig_kill()
     555  
     556              proc = transport.get_extra_info('subprocess')
     557              orig_kill = proc.kill
     558              proc.kill = kill
     559              returncode = transport.get_returncode()
     560              transport.close()
     561              await asyncio.wait_for(transport._wait(), 5)
     562              return (returncode, kill_called)
     563  
     564          # Ignore "Close running child process: kill ..." log
     565          with test_utils.disable_logger():
     566              try:
     567                  returncode, killed = self.loop.run_until_complete(
     568                      kill_running()
     569                  )
     570              except asyncio.TimeoutError:
     571                  self.skipTest(
     572                      "Timeout failure on waiting for subprocess stopping"
     573                  )
     574          self.assertIsNone(returncode)
     575  
     576          # transport.close() must kill the process if it is still running
     577          self.assertTrue(killed)
     578          test_utils.run_briefly(self.loop)
     579  
     580      def test_close_dont_kill_finished(self):
     581  
     582          async def kill_running():
     583              create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
     584                                                 *PROGRAM_BLOCKED)
     585              transport, protocol = await create
     586              proc = transport.get_extra_info('subprocess')
     587  
     588              # kill the process (but asyncio is not notified immediately)
     589              proc.kill()
     590              proc.wait()
     591  
     592              proc.kill = mock.Mock()
     593              proc_returncode = proc.poll()
     594              transport_returncode = transport.get_returncode()
     595              transport.close()
     596              return (proc_returncode, transport_returncode, proc.kill.called)
     597  
     598          # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
     599          # emitted because the test already consumes the exit status:
     600          # proc.wait()
     601          with test_utils.disable_logger():
     602              result = self.loop.run_until_complete(kill_running())
     603              test_utils.run_briefly(self.loop)
     604  
     605          proc_returncode, transport_return_code, killed = result
     606  
     607          self.assertIsNotNone(proc_returncode)
     608          self.assertIsNone(transport_return_code)
     609  
     610          # transport.close() must not kill the process if it finished, even if
     611          # the transport was not notified yet
     612          self.assertFalse(killed)
     613  
     614          # Unlike SafeChildWatcher, FastChildWatcher does not pop the
     615          # callbacks if waitpid() is called elsewhere. Let's clear them
     616          # manually to avoid a warning when the watcher is detached.
     617          if (sys.platform != 'win32' and
     618                  isinstance(self, SubprocessFastWatcherTests)):
     619              asyncio.get_child_watcher()._callbacks.clear()
     620  
     621      async def _test_popen_error(self, stdin):
     622          if sys.platform == 'win32':
     623              target = 'asyncio.windows_utils.Popen'
     624          else:
     625              target = 'subprocess.Popen'
     626          with mock.patch(target) as popen:
     627              exc = ZeroDivisionError
     628              popen.side_effect = exc
     629  
     630              with warnings.catch_warnings(record=True) as warns:
     631                  with self.assertRaises(exc):
     632                      await asyncio.create_subprocess_exec(
     633                          sys.executable,
     634                          '-c',
     635                          'pass',
     636                          stdin=stdin
     637                      )
     638                  self.assertEqual(warns, [])
     639  
     640      def test_popen_error(self):
     641          # Issue #24763: check that the subprocess transport is closed
     642          # when BaseSubprocessTransport fails
     643          self.loop.run_until_complete(self._test_popen_error(stdin=None))
     644  
     645      def test_popen_error_with_stdin_pipe(self):
     646          # Issue #35721: check that newly created socket pair is closed when
     647          # Popen fails
     648          self.loop.run_until_complete(
     649              self._test_popen_error(stdin=subprocess.PIPE))
     650  
     651      def test_read_stdout_after_process_exit(self):
     652  
     653          async def execute():
     654              code = '\n'.join(['import sys',
     655                                'for _ in range(64):',
     656                                '    sys.stdout.write("x" * 4096)',
     657                                'sys.stdout.flush()',
     658                                'sys.exit(1)'])
     659  
     660              process = await asyncio.create_subprocess_exec(
     661                  sys.executable, '-c', code,
     662                  stdout=asyncio.subprocess.PIPE,
     663              )
     664  
     665              while True:
     666                  data = await process.stdout.read(65536)
     667                  if data:
     668                      await asyncio.sleep(0.3)
     669                  else:
     670                      break
     671  
     672          self.loop.run_until_complete(execute())
     673  
     674      def test_create_subprocess_exec_text_mode_fails(self):
     675          async def execute():
     676              with self.assertRaises(ValueError):
     677                  await subprocess.create_subprocess_exec(sys.executable,
     678                                                          text=True)
     679  
     680              with self.assertRaises(ValueError):
     681                  await subprocess.create_subprocess_exec(sys.executable,
     682                                                          encoding="utf-8")
     683  
     684              with self.assertRaises(ValueError):
     685                  await subprocess.create_subprocess_exec(sys.executable,
     686                                                          errors="strict")
     687  
     688          self.loop.run_until_complete(execute())
     689  
     690      def test_create_subprocess_shell_text_mode_fails(self):
     691  
     692          async def execute():
     693              with self.assertRaises(ValueError):
     694                  await subprocess.create_subprocess_shell(sys.executable,
     695                                                           text=True)
     696  
     697              with self.assertRaises(ValueError):
     698                  await subprocess.create_subprocess_shell(sys.executable,
     699                                                           encoding="utf-8")
     700  
     701              with self.assertRaises(ValueError):
     702                  await subprocess.create_subprocess_shell(sys.executable,
     703                                                           errors="strict")
     704  
     705          self.loop.run_until_complete(execute())
     706  
     707      def test_create_subprocess_exec_with_path(self):
     708          async def execute():
     709              p = await subprocess.create_subprocess_exec(
     710                  os_helper.FakePath(sys.executable), '-c', 'pass')
     711              await p.wait()
     712              p = await subprocess.create_subprocess_exec(
     713                  sys.executable, '-c', 'pass', os_helper.FakePath('.'))
     714              await p.wait()
     715  
     716          self.assertIsNone(self.loop.run_until_complete(execute()))
     717  
     718      def test_subprocess_communicate_stdout(self):
     719          # See https://github.com/python/cpython/issues/100133
     720          async def get_command_stdout(cmd, *args):
     721              proc = await asyncio.create_subprocess_exec(
     722                  cmd, *args, stdout=asyncio.subprocess.PIPE,
     723              )
     724              stdout, _ = await proc.communicate()
     725              return stdout.decode().strip()
     726  
     727          async def main():
     728              outputs = [f'foo{i}' for i in range(10)]
     729              res = await asyncio.gather(*[get_command_stdout(sys.executable, '-c',
     730                                          f'print({out!r})') for out in outputs])
     731              self.assertEqual(res, outputs)
     732  
     733          self.loop.run_until_complete(main())
     734  
     735  
     736  if sys.platform != 'win32':
     737      # Unix
     738      class ESC[4;38;5;81mSubprocessWatcherMixin(ESC[4;38;5;149mSubprocessMixin):
     739  
     740          Watcher = None
     741  
     742          def setUp(self):
     743              super().setUp()
     744              policy = asyncio.get_event_loop_policy()
     745              self.loop = policy.new_event_loop()
     746              self.set_event_loop(self.loop)
     747  
     748              watcher = self.Watcher()
     749              watcher.attach_loop(self.loop)
     750              policy.set_child_watcher(watcher)
     751  
     752          def tearDown(self):
     753              super().tearDown()
     754              policy = asyncio.get_event_loop_policy()
     755              watcher = policy.get_child_watcher()
     756              policy.set_child_watcher(None)
     757              watcher.attach_loop(None)
     758              watcher.close()
     759  
     760      class ESC[4;38;5;81mSubprocessThreadedWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
     761                                           ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     762  
     763          Watcher = unix_events.ThreadedChildWatcher
     764  
     765      @unittest.skip("bpo-38323: MultiLoopChildWatcher has a race condition \
     766                      and these tests can hang the test suite")
     767      class ESC[4;38;5;81mSubprocessMultiLoopWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
     768                                            ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     769  
     770          Watcher = unix_events.MultiLoopChildWatcher
     771  
     772      class ESC[4;38;5;81mSubprocessSafeWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
     773                                       ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     774  
     775          Watcher = unix_events.SafeChildWatcher
     776  
     777      class ESC[4;38;5;81mSubprocessFastWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
     778                                       ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     779  
     780          Watcher = unix_events.FastChildWatcher
     781  
     782      def has_pidfd_support():
     783          if not hasattr(os, 'pidfd_open'):
     784              return False
     785          try:
     786              os.close(os.pidfd_open(os.getpid()))
     787          except OSError:
     788              return False
     789          return True
     790  
     791      @unittest.skipUnless(
     792          has_pidfd_support(),
     793          "operating system does not support pidfds",
     794      )
     795      class ESC[4;38;5;81mSubprocessPidfdWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
     796                                        ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     797          Watcher = unix_events.PidfdChildWatcher
     798  
     799  
     800      class ESC[4;38;5;81mGenericWatcherTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     801  
     802          def test_create_subprocess_fails_with_inactive_watcher(self):
     803              watcher = mock.create_autospec(
     804                  asyncio.AbstractChildWatcher,
     805                  **{"__enter__.return_value.is_active.return_value": False}
     806              )
     807  
     808              async def execute():
     809                  asyncio.set_child_watcher(watcher)
     810  
     811                  with self.assertRaises(RuntimeError):
     812                      await subprocess.create_subprocess_exec(
     813                          os_helper.FakePath(sys.executable), '-c', 'pass')
     814  
     815                  watcher.add_child_handler.assert_not_called()
     816  
     817              with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
     818                  self.assertIsNone(runner.run(execute()))
     819              self.assertListEqual(watcher.mock_calls, [
     820                  mock.call.__enter__(),
     821                  mock.call.__enter__().is_active(),
     822                  mock.call.__exit__(RuntimeError, mock.ANY, mock.ANY),
     823              ])
     824  
     825  else:
     826      # Windows
     827      class ESC[4;38;5;81mSubprocessProactorTests(ESC[4;38;5;149mSubprocessMixin, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     828  
     829          def setUp(self):
     830              super().setUp()
     831              self.loop = asyncio.ProactorEventLoop()
     832              self.set_event_loop(self.loop)
     833  
     834  
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()