python (3.12.0)

(root)/
lib/
python3.12/
test/
test_asyncio/
test_subprocess.py
       1  import os
       2  import signal
       3  import sys
       4  import unittest
       5  import warnings
       6  from unittest import mock
       7  
       8  import asyncio
       9  from asyncio import base_subprocess
      10  from asyncio import subprocess
      11  from test.test_asyncio import utils as test_utils
      12  from test import support
      13  from test.support import os_helper
      14  
      15  if sys.platform != 'win32':
      16      from asyncio import unix_events
      17  
      18  if support.check_sanitizer(address=True):
      19      raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI")
      20  
      21  # Program blocking
      22  PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
      23  
      24  # Program copying input to output
      25  PROGRAM_CAT = [
      26      sys.executable, '-c',
      27      ';'.join(('import sys',
      28                'data = sys.stdin.buffer.read()',
      29                'sys.stdout.buffer.write(data)'))]
      30  
      31  
      32  def tearDownModule():
      33      asyncio.set_event_loop_policy(None)
      34  
      35  
      36  class ESC[4;38;5;81mTestSubprocessTransport(ESC[4;38;5;149mbase_subprocessESC[4;38;5;149m.ESC[4;38;5;149mBaseSubprocessTransport):
      37      def _start(self, *args, **kwargs):
      38          self._proc = mock.Mock()
      39          self._proc.stdin = None
      40          self._proc.stdout = None
      41          self._proc.stderr = None
      42          self._proc.pid = -1
      43  
      44  
      45  class ESC[4;38;5;81mSubprocessTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      46      def setUp(self):
      47          super().setUp()
      48          self.loop = self.new_test_loop()
      49          self.set_event_loop(self.loop)
      50  
      51      def create_transport(self, waiter=None):
      52          protocol = mock.Mock()
      53          transport = TestSubprocessTransport(
      54                          self.loop, protocol, ['test'], False,
      55                          None, None, None, 0, waiter=waiter)
      56          return (transport, protocol)
      57  
      58      def test_proc_exited(self):
      59          waiter = self.loop.create_future()
      60          transport, protocol = self.create_transport(waiter)
      61          transport._process_exited(6)
      62          self.loop.run_until_complete(waiter)
      63  
      64          self.assertEqual(transport.get_returncode(), 6)
      65  
      66          self.assertTrue(protocol.connection_made.called)
      67          self.assertTrue(protocol.process_exited.called)
      68          self.assertTrue(protocol.connection_lost.called)
      69          self.assertEqual(protocol.connection_lost.call_args[0], (None,))
      70  
      71          self.assertFalse(transport.is_closing())
      72          self.assertIsNone(transport._loop)
      73          self.assertIsNone(transport._proc)
      74          self.assertIsNone(transport._protocol)
      75  
      76          # methods must raise ProcessLookupError if the process exited
      77          self.assertRaises(ProcessLookupError,
      78                            transport.send_signal, signal.SIGTERM)
      79          self.assertRaises(ProcessLookupError, transport.terminate)
      80          self.assertRaises(ProcessLookupError, transport.kill)
      81  
      82          transport.close()
      83  
      84      def test_subprocess_repr(self):
      85          waiter = self.loop.create_future()
      86          transport, protocol = self.create_transport(waiter)
      87          transport._process_exited(6)
      88          self.loop.run_until_complete(waiter)
      89  
      90          self.assertEqual(
      91              repr(transport),
      92              "<TestSubprocessTransport pid=-1 returncode=6>"
      93          )
      94          transport._returncode = None
      95          self.assertEqual(
      96              repr(transport),
      97              "<TestSubprocessTransport pid=-1 running>"
      98          )
      99          transport._pid = None
     100          transport._returncode = None
     101          self.assertEqual(
     102              repr(transport),
     103              "<TestSubprocessTransport not started>"
     104          )
     105          transport.close()
     106  
     107  
     108  class ESC[4;38;5;81mSubprocessMixin:
     109  
     110      def test_stdin_stdout(self):
     111          args = PROGRAM_CAT
     112  
     113          async def run(data):
     114              proc = await asyncio.create_subprocess_exec(
     115                  *args,
     116                  stdin=subprocess.PIPE,
     117                  stdout=subprocess.PIPE,
     118              )
     119  
     120              # feed data
     121              proc.stdin.write(data)
     122              await proc.stdin.drain()
     123              proc.stdin.close()
     124  
     125              # get output and exitcode
     126              data = await proc.stdout.read()
     127              exitcode = await proc.wait()
     128              return (exitcode, data)
     129  
     130          task = run(b'some data')
     131          task = asyncio.wait_for(task, 60.0)
     132          exitcode, stdout = self.loop.run_until_complete(task)
     133          self.assertEqual(exitcode, 0)
     134          self.assertEqual(stdout, b'some data')
     135  
     136      def test_communicate(self):
     137          args = PROGRAM_CAT
     138  
     139          async def run(data):
     140              proc = await asyncio.create_subprocess_exec(
     141                  *args,
     142                  stdin=subprocess.PIPE,
     143                  stdout=subprocess.PIPE,
     144              )
     145              stdout, stderr = await proc.communicate(data)
     146              return proc.returncode, stdout
     147  
     148          task = run(b'some data')
     149          task = asyncio.wait_for(task, support.LONG_TIMEOUT)
     150          exitcode, stdout = self.loop.run_until_complete(task)
     151          self.assertEqual(exitcode, 0)
     152          self.assertEqual(stdout, b'some data')
     153  
     154      def test_communicate_none_input(self):
     155          args = PROGRAM_CAT
     156  
     157          async def run():
     158              proc = await asyncio.create_subprocess_exec(
     159                  *args,
     160                  stdin=subprocess.PIPE,
     161                  stdout=subprocess.PIPE,
     162              )
     163              stdout, stderr = await proc.communicate()
     164              return proc.returncode, stdout
     165  
     166          task = run()
     167          task = asyncio.wait_for(task, support.LONG_TIMEOUT)
     168          exitcode, stdout = self.loop.run_until_complete(task)
     169          self.assertEqual(exitcode, 0)
     170          self.assertEqual(stdout, b'')
     171  
     172      def test_shell(self):
     173          proc = self.loop.run_until_complete(
     174              asyncio.create_subprocess_shell('exit 7')
     175          )
     176          exitcode = self.loop.run_until_complete(proc.wait())
     177          self.assertEqual(exitcode, 7)
     178  
     179      def test_start_new_session(self):
     180          # start the new process in a new session
     181          proc = self.loop.run_until_complete(
     182              asyncio.create_subprocess_shell(
     183                  'exit 8',
     184                  start_new_session=True,
     185              )
     186          )
     187          exitcode = self.loop.run_until_complete(proc.wait())
     188          self.assertEqual(exitcode, 8)
     189  
     190      def test_kill(self):
     191          args = PROGRAM_BLOCKED
     192          proc = self.loop.run_until_complete(
     193              asyncio.create_subprocess_exec(*args)
     194          )
     195          proc.kill()
     196          returncode = self.loop.run_until_complete(proc.wait())
     197          if sys.platform == 'win32':
     198              self.assertIsInstance(returncode, int)
     199              # expect 1 but sometimes get 0
     200          else:
     201              self.assertEqual(-signal.SIGKILL, returncode)
     202  
     203      def test_kill_issue43884(self):
     204          if sys.platform == 'win32':
     205              blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(2)"'
     206          else:
     207              blocking_shell_command = 'sleep 1; sleep 1'
     208          creationflags = 0
     209          if sys.platform == 'win32':
     210              from subprocess import CREATE_NEW_PROCESS_GROUP
     211              # On windows create a new process group so that killing process
     212              # kills the process and all its children.
     213              creationflags = CREATE_NEW_PROCESS_GROUP
     214          proc = self.loop.run_until_complete(
     215              asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE,
     216              creationflags=creationflags)
     217          )
     218          self.loop.run_until_complete(asyncio.sleep(1))
     219          if sys.platform == 'win32':
     220              proc.send_signal(signal.CTRL_BREAK_EVENT)
     221          # On windows it is an alias of terminate which sets the return code
     222          proc.kill()
     223          returncode = self.loop.run_until_complete(proc.wait())
     224          if sys.platform == 'win32':
     225              self.assertIsInstance(returncode, int)
     226              # expect 1 but sometimes get 0
     227          else:
     228              self.assertEqual(-signal.SIGKILL, returncode)
     229  
     230      def test_terminate(self):
     231          args = PROGRAM_BLOCKED
     232          proc = self.loop.run_until_complete(
     233              asyncio.create_subprocess_exec(*args)
     234          )
     235          proc.terminate()
     236          returncode = self.loop.run_until_complete(proc.wait())
     237          if sys.platform == 'win32':
     238              self.assertIsInstance(returncode, int)
     239              # expect 1 but sometimes get 0
     240          else:
     241              self.assertEqual(-signal.SIGTERM, returncode)
     242  
     243      @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
     244      def test_send_signal(self):
     245          # bpo-31034: Make sure that we get the default signal handler (killing
     246          # the process). The parent process may have decided to ignore SIGHUP,
     247          # and signal handlers are inherited.
     248          old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
     249          try:
     250              code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
     251              args = [sys.executable, '-c', code]
     252              proc = self.loop.run_until_complete(
     253                  asyncio.create_subprocess_exec(
     254                      *args,
     255                      stdout=subprocess.PIPE,
     256                  )
     257              )
     258  
     259              async def send_signal(proc):
     260                  # basic synchronization to wait until the program is sleeping
     261                  line = await proc.stdout.readline()
     262                  self.assertEqual(line, b'sleeping\n')
     263  
     264                  proc.send_signal(signal.SIGHUP)
     265                  returncode = await proc.wait()
     266                  return returncode
     267  
     268              returncode = self.loop.run_until_complete(send_signal(proc))
     269              self.assertEqual(-signal.SIGHUP, returncode)
     270          finally:
     271              signal.signal(signal.SIGHUP, old_handler)
     272  
     273      def prepare_broken_pipe_test(self):
     274          # buffer large enough to feed the whole pipe buffer
     275          large_data = b'x' * support.PIPE_MAX_SIZE
     276  
     277          # the program ends before the stdin can be fed
     278          proc = self.loop.run_until_complete(
     279              asyncio.create_subprocess_exec(
     280                  sys.executable, '-c', 'pass',
     281                  stdin=subprocess.PIPE,
     282              )
     283          )
     284  
     285          return (proc, large_data)
     286  
     287      def test_stdin_broken_pipe(self):
     288          proc, large_data = self.prepare_broken_pipe_test()
     289  
     290          async def write_stdin(proc, data):
     291              await asyncio.sleep(0.5)
     292              proc.stdin.write(data)
     293              await proc.stdin.drain()
     294  
     295          coro = write_stdin(proc, large_data)
     296          # drain() must raise BrokenPipeError or ConnectionResetError
     297          with test_utils.disable_logger():
     298              self.assertRaises((BrokenPipeError, ConnectionResetError),
     299                                self.loop.run_until_complete, coro)
     300          self.loop.run_until_complete(proc.wait())
     301  
     302      def test_communicate_ignore_broken_pipe(self):
     303          proc, large_data = self.prepare_broken_pipe_test()
     304  
     305          # communicate() must ignore BrokenPipeError when feeding stdin
     306          self.loop.set_exception_handler(lambda loop, msg: None)
     307          self.loop.run_until_complete(proc.communicate(large_data))
     308          self.loop.run_until_complete(proc.wait())
     309  
     310      def test_pause_reading(self):
     311          limit = 10
     312          size = (limit * 2 + 1)
     313  
     314          async def test_pause_reading():
     315              code = '\n'.join((
     316                  'import sys',
     317                  'sys.stdout.write("x" * %s)' % size,
     318                  'sys.stdout.flush()',
     319              ))
     320  
     321              connect_read_pipe = self.loop.connect_read_pipe
     322  
     323              async def connect_read_pipe_mock(*args, **kw):
     324                  transport, protocol = await connect_read_pipe(*args, **kw)
     325                  transport.pause_reading = mock.Mock()
     326                  transport.resume_reading = mock.Mock()
     327                  return (transport, protocol)
     328  
     329              self.loop.connect_read_pipe = connect_read_pipe_mock
     330  
     331              proc = await asyncio.create_subprocess_exec(
     332                  sys.executable, '-c', code,
     333                  stdin=asyncio.subprocess.PIPE,
     334                  stdout=asyncio.subprocess.PIPE,
     335                  limit=limit,
     336              )
     337              stdout_transport = proc._transport.get_pipe_transport(1)
     338  
     339              stdout, stderr = await proc.communicate()
     340  
     341              # The child process produced more than limit bytes of output,
     342              # the stream reader transport should pause the protocol to not
     343              # allocate too much memory.
     344              return (stdout, stdout_transport)
     345  
     346          # Issue #22685: Ensure that the stream reader pauses the protocol
     347          # when the child process produces too much data
     348          stdout, transport = self.loop.run_until_complete(test_pause_reading())
     349  
     350          self.assertEqual(stdout, b'x' * size)
     351          self.assertTrue(transport.pause_reading.called)
     352          self.assertTrue(transport.resume_reading.called)
     353  
     354      def test_stdin_not_inheritable(self):
     355          # asyncio issue #209: stdin must not be inheritable, otherwise
     356          # the Process.communicate() hangs
     357          async def len_message(message):
     358              code = 'import sys; data = sys.stdin.read(); print(len(data))'
     359              proc = await asyncio.create_subprocess_exec(
     360                  sys.executable, '-c', code,
     361                  stdin=asyncio.subprocess.PIPE,
     362                  stdout=asyncio.subprocess.PIPE,
     363                  stderr=asyncio.subprocess.PIPE,
     364                  close_fds=False,
     365              )
     366              stdout, stderr = await proc.communicate(message)
     367              exitcode = await proc.wait()
     368              return (stdout, exitcode)
     369  
     370          output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
     371          self.assertEqual(output.rstrip(), b'3')
     372          self.assertEqual(exitcode, 0)
     373  
     374      def test_empty_input(self):
     375  
     376          async def empty_input():
     377              code = 'import sys; data = sys.stdin.read(); print(len(data))'
     378              proc = await asyncio.create_subprocess_exec(
     379                  sys.executable, '-c', code,
     380                  stdin=asyncio.subprocess.PIPE,
     381                  stdout=asyncio.subprocess.PIPE,
     382                  stderr=asyncio.subprocess.PIPE,
     383                  close_fds=False,
     384              )
     385              stdout, stderr = await proc.communicate(b'')
     386              exitcode = await proc.wait()
     387              return (stdout, exitcode)
     388  
     389          output, exitcode = self.loop.run_until_complete(empty_input())
     390          self.assertEqual(output.rstrip(), b'0')
     391          self.assertEqual(exitcode, 0)
     392  
     393      def test_devnull_input(self):
     394  
     395          async def empty_input():
     396              code = 'import sys; data = sys.stdin.read(); print(len(data))'
     397              proc = await asyncio.create_subprocess_exec(
     398                  sys.executable, '-c', code,
     399                  stdin=asyncio.subprocess.DEVNULL,
     400                  stdout=asyncio.subprocess.PIPE,
     401                  stderr=asyncio.subprocess.PIPE,
     402                  close_fds=False,
     403              )
     404              stdout, stderr = await proc.communicate()
     405              exitcode = await proc.wait()
     406              return (stdout, exitcode)
     407  
     408          output, exitcode = self.loop.run_until_complete(empty_input())
     409          self.assertEqual(output.rstrip(), b'0')
     410          self.assertEqual(exitcode, 0)
     411  
     412      def test_devnull_output(self):
     413  
     414          async def empty_output():
     415              code = 'import sys; data = sys.stdin.read(); print(len(data))'
     416              proc = await asyncio.create_subprocess_exec(
     417                  sys.executable, '-c', code,
     418                  stdin=asyncio.subprocess.PIPE,
     419                  stdout=asyncio.subprocess.DEVNULL,
     420                  stderr=asyncio.subprocess.PIPE,
     421                  close_fds=False,
     422              )
     423              stdout, stderr = await proc.communicate(b"abc")
     424              exitcode = await proc.wait()
     425              return (stdout, exitcode)
     426  
     427          output, exitcode = self.loop.run_until_complete(empty_output())
     428          self.assertEqual(output, None)
     429          self.assertEqual(exitcode, 0)
     430  
     431      def test_devnull_error(self):
     432  
     433          async def empty_error():
     434              code = 'import sys; data = sys.stdin.read(); print(len(data))'
     435              proc = await asyncio.create_subprocess_exec(
     436                  sys.executable, '-c', code,
     437                  stdin=asyncio.subprocess.PIPE,
     438                  stdout=asyncio.subprocess.PIPE,
     439                  stderr=asyncio.subprocess.DEVNULL,
     440                  close_fds=False,
     441              )
     442              stdout, stderr = await proc.communicate(b"abc")
     443              exitcode = await proc.wait()
     444              return (stderr, exitcode)
     445  
     446          output, exitcode = self.loop.run_until_complete(empty_error())
     447          self.assertEqual(output, None)
     448          self.assertEqual(exitcode, 0)
     449  
     450      @unittest.skipIf(sys.platform != 'linux', "Don't have /dev/stdin")
     451      def test_devstdin_input(self):
     452  
     453          async def devstdin_input(message):
     454              code = 'file = open("/dev/stdin"); data = file.read(); print(len(data))'
     455              proc = await asyncio.create_subprocess_exec(
     456                  sys.executable, '-c', code,
     457                  stdin=asyncio.subprocess.PIPE,
     458                  stdout=asyncio.subprocess.PIPE,
     459                  stderr=asyncio.subprocess.PIPE,
     460                  close_fds=False,
     461              )
     462              stdout, stderr = await proc.communicate(message)
     463              exitcode = await proc.wait()
     464              return (stdout, exitcode)
     465  
     466          output, exitcode = self.loop.run_until_complete(devstdin_input(b'abc'))
     467          self.assertEqual(output.rstrip(), b'3')
     468          self.assertEqual(exitcode, 0)
     469  
     470      def test_cancel_process_wait(self):
     471          # Issue #23140: cancel Process.wait()
     472  
     473          async def cancel_wait():
     474              proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
     475  
     476              # Create an internal future waiting on the process exit
     477              task = self.loop.create_task(proc.wait())
     478              self.loop.call_soon(task.cancel)
     479              try:
     480                  await task
     481              except asyncio.CancelledError:
     482                  pass
     483  
     484              # Cancel the future
     485              task.cancel()
     486  
     487              # Kill the process and wait until it is done
     488              proc.kill()
     489              await proc.wait()
     490  
     491          self.loop.run_until_complete(cancel_wait())
     492  
     493      def test_cancel_make_subprocess_transport_exec(self):
     494  
     495          async def cancel_make_transport():
     496              coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
     497              task = self.loop.create_task(coro)
     498  
     499              self.loop.call_soon(task.cancel)
     500              try:
     501                  await task
     502              except asyncio.CancelledError:
     503                  pass
     504  
     505          # ignore the log:
     506          # "Exception during subprocess creation, kill the subprocess"
     507          with test_utils.disable_logger():
     508              self.loop.run_until_complete(cancel_make_transport())
     509  
     510      def test_cancel_post_init(self):
     511  
     512          async def cancel_make_transport():
     513              coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
     514                                               *PROGRAM_BLOCKED)
     515              task = self.loop.create_task(coro)
     516  
     517              self.loop.call_soon(task.cancel)
     518              try:
     519                  await task
     520              except asyncio.CancelledError:
     521                  pass
     522  
     523          # ignore the log:
     524          # "Exception during subprocess creation, kill the subprocess"
     525          with test_utils.disable_logger():
     526              self.loop.run_until_complete(cancel_make_transport())
     527              test_utils.run_briefly(self.loop)
     528  
     529      def test_close_kill_running(self):
     530  
     531          async def kill_running():
     532              create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
     533                                                 *PROGRAM_BLOCKED)
     534              transport, protocol = await create
     535  
     536              kill_called = False
     537              def kill():
     538                  nonlocal kill_called
     539                  kill_called = True
     540                  orig_kill()
     541  
     542              proc = transport.get_extra_info('subprocess')
     543              orig_kill = proc.kill
     544              proc.kill = kill
     545              returncode = transport.get_returncode()
     546              transport.close()
     547              await asyncio.wait_for(transport._wait(), 5)
     548              return (returncode, kill_called)
     549  
     550          # Ignore "Close running child process: kill ..." log
     551          with test_utils.disable_logger():
     552              try:
     553                  returncode, killed = self.loop.run_until_complete(
     554                      kill_running()
     555                  )
     556              except asyncio.TimeoutError:
     557                  self.skipTest(
     558                      "Timeout failure on waiting for subprocess stopping"
     559                  )
     560          self.assertIsNone(returncode)
     561  
     562          # transport.close() must kill the process if it is still running
     563          self.assertTrue(killed)
     564          test_utils.run_briefly(self.loop)
     565  
     566      def test_close_dont_kill_finished(self):
     567  
     568          async def kill_running():
     569              create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
     570                                                 *PROGRAM_BLOCKED)
     571              transport, protocol = await create
     572              proc = transport.get_extra_info('subprocess')
     573  
     574              # kill the process (but asyncio is not notified immediately)
     575              proc.kill()
     576              proc.wait()
     577  
     578              proc.kill = mock.Mock()
     579              proc_returncode = proc.poll()
     580              transport_returncode = transport.get_returncode()
     581              transport.close()
     582              return (proc_returncode, transport_returncode, proc.kill.called)
     583  
     584          # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
     585          # emitted because the test already consumes the exit status:
     586          # proc.wait()
     587          with test_utils.disable_logger():
     588              result = self.loop.run_until_complete(kill_running())
     589              test_utils.run_briefly(self.loop)
     590  
     591          proc_returncode, transport_return_code, killed = result
     592  
     593          self.assertIsNotNone(proc_returncode)
     594          self.assertIsNone(transport_return_code)
     595  
     596          # transport.close() must not kill the process if it finished, even if
     597          # the transport was not notified yet
     598          self.assertFalse(killed)
     599  
     600          # Unlike SafeChildWatcher, FastChildWatcher does not pop the
     601          # callbacks if waitpid() is called elsewhere. Let's clear them
     602          # manually to avoid a warning when the watcher is detached.
     603          if (sys.platform != 'win32' and
     604                  isinstance(self, SubprocessFastWatcherTests)):
     605              with warnings.catch_warnings():
     606                  warnings.simplefilter('ignore', DeprecationWarning)
     607                  asyncio.get_child_watcher()._callbacks.clear()
     608  
     609      async def _test_popen_error(self, stdin):
     610          if sys.platform == 'win32':
     611              target = 'asyncio.windows_utils.Popen'
     612          else:
     613              target = 'subprocess.Popen'
     614          with mock.patch(target) as popen:
     615              exc = ZeroDivisionError
     616              popen.side_effect = exc
     617  
     618              with warnings.catch_warnings(record=True) as warns:
     619                  with self.assertRaises(exc):
     620                      await asyncio.create_subprocess_exec(
     621                          sys.executable,
     622                          '-c',
     623                          'pass',
     624                          stdin=stdin
     625                      )
     626                  self.assertEqual(warns, [])
     627  
     628      def test_popen_error(self):
     629          # Issue #24763: check that the subprocess transport is closed
     630          # when BaseSubprocessTransport fails
     631          self.loop.run_until_complete(self._test_popen_error(stdin=None))
     632  
     633      def test_popen_error_with_stdin_pipe(self):
     634          # Issue #35721: check that newly created socket pair is closed when
     635          # Popen fails
     636          self.loop.run_until_complete(
     637              self._test_popen_error(stdin=subprocess.PIPE))
     638  
     639      def test_read_stdout_after_process_exit(self):
     640  
     641          async def execute():
     642              code = '\n'.join(['import sys',
     643                                'for _ in range(64):',
     644                                '    sys.stdout.write("x" * 4096)',
     645                                'sys.stdout.flush()',
     646                                'sys.exit(1)'])
     647  
     648              process = await asyncio.create_subprocess_exec(
     649                  sys.executable, '-c', code,
     650                  stdout=asyncio.subprocess.PIPE,
     651              )
     652  
     653              while True:
     654                  data = await process.stdout.read(65536)
     655                  if data:
     656                      await asyncio.sleep(0.3)
     657                  else:
     658                      break
     659  
     660          self.loop.run_until_complete(execute())
     661  
     662      def test_create_subprocess_exec_text_mode_fails(self):
     663          async def execute():
     664              with self.assertRaises(ValueError):
     665                  await subprocess.create_subprocess_exec(sys.executable,
     666                                                          text=True)
     667  
     668              with self.assertRaises(ValueError):
     669                  await subprocess.create_subprocess_exec(sys.executable,
     670                                                          encoding="utf-8")
     671  
     672              with self.assertRaises(ValueError):
     673                  await subprocess.create_subprocess_exec(sys.executable,
     674                                                          errors="strict")
     675  
     676          self.loop.run_until_complete(execute())
     677  
     678      def test_create_subprocess_shell_text_mode_fails(self):
     679  
     680          async def execute():
     681              with self.assertRaises(ValueError):
     682                  await subprocess.create_subprocess_shell(sys.executable,
     683                                                           text=True)
     684  
     685              with self.assertRaises(ValueError):
     686                  await subprocess.create_subprocess_shell(sys.executable,
     687                                                           encoding="utf-8")
     688  
     689              with self.assertRaises(ValueError):
     690                  await subprocess.create_subprocess_shell(sys.executable,
     691                                                           errors="strict")
     692  
     693          self.loop.run_until_complete(execute())
     694  
     695      def test_create_subprocess_exec_with_path(self):
     696          async def execute():
     697              p = await subprocess.create_subprocess_exec(
     698                  os_helper.FakePath(sys.executable), '-c', 'pass')
     699              await p.wait()
     700              p = await subprocess.create_subprocess_exec(
     701                  sys.executable, '-c', 'pass', os_helper.FakePath('.'))
     702              await p.wait()
     703  
     704          self.assertIsNone(self.loop.run_until_complete(execute()))
     705  
     706      async def check_stdout_output(self, coro, output):
     707          proc = await coro
     708          stdout, _ = await proc.communicate()
     709          self.assertEqual(stdout, output)
     710          self.assertEqual(proc.returncode, 0)
     711          task = asyncio.create_task(proc.wait())
     712          await asyncio.sleep(0)
     713          self.assertEqual(task.result(), proc.returncode)
     714  
     715      def test_create_subprocess_env_shell(self) -> None:
     716          async def main() -> None:
     717              cmd = f'''{sys.executable} -c "import os, sys; sys.stdout.write(os.getenv('FOO'))"'''
     718              env = os.environ.copy()
     719              env["FOO"] = "bar"
     720              proc = await asyncio.create_subprocess_shell(
     721                  cmd, env=env, stdout=subprocess.PIPE
     722              )
     723              return proc
     724  
     725          self.loop.run_until_complete(self.check_stdout_output(main(), b'bar'))
     726  
     727      def test_create_subprocess_env_exec(self) -> None:
     728          async def main() -> None:
     729              cmd = [sys.executable, "-c",
     730                     "import os, sys; sys.stdout.write(os.getenv('FOO'))"]
     731              env = os.environ.copy()
     732              env["FOO"] = "baz"
     733              proc = await asyncio.create_subprocess_exec(
     734                  *cmd, env=env, stdout=subprocess.PIPE
     735              )
     736              return proc
     737  
     738          self.loop.run_until_complete(self.check_stdout_output(main(), b'baz'))
     739  
     740  
     741      def test_subprocess_concurrent_wait(self) -> None:
     742          async def main() -> None:
     743              proc = await asyncio.create_subprocess_exec(
     744                  *PROGRAM_CAT,
     745                  stdin=subprocess.PIPE,
     746                  stdout=subprocess.PIPE,
     747              )
     748              stdout, _ = await proc.communicate(b'some data')
     749              self.assertEqual(stdout, b"some data")
     750              self.assertEqual(proc.returncode, 0)
     751              self.assertEqual(await asyncio.gather(*[proc.wait() for _ in range(10)]),
     752                               [proc.returncode] * 10)
     753  
     754          self.loop.run_until_complete(main())
     755  
     756      def test_subprocess_consistent_callbacks(self):
     757          events = []
     758          class ESC[4;38;5;81mMyProtocol(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mSubprocessProtocol):
     759              def __init__(self, exit_future: asyncio.Future) -> None:
     760                  self.exit_future = exit_future
     761  
     762              def pipe_data_received(self, fd, data) -> None:
     763                  events.append(('pipe_data_received', fd, data))
     764  
     765              def pipe_connection_lost(self, fd, exc) -> None:
     766                  events.append('pipe_connection_lost')
     767  
     768              def process_exited(self) -> None:
     769                  events.append('process_exited')
     770                  self.exit_future.set_result(True)
     771  
     772          async def main() -> None:
     773              loop = asyncio.get_running_loop()
     774              exit_future = asyncio.Future()
     775              code = 'import sys; sys.stdout.write("stdout"); sys.stderr.write("stderr")'
     776              transport, _ = await loop.subprocess_exec(lambda: MyProtocol(exit_future),
     777                                                        sys.executable, '-c', code, stdin=None)
     778              await exit_future
     779              transport.close()
     780              self.assertEqual(events, [
     781                  ('pipe_data_received', 1, b'stdout'),
     782                  ('pipe_data_received', 2, b'stderr'),
     783                  'pipe_connection_lost',
     784                  'pipe_connection_lost',
     785                  'process_exited',
     786              ])
     787  
     788          self.loop.run_until_complete(main())
     789  
     790      def test_subprocess_communicate_stdout(self):
     791          # See https://github.com/python/cpython/issues/100133
     792          async def get_command_stdout(cmd, *args):
     793              proc = await asyncio.create_subprocess_exec(
     794                  cmd, *args, stdout=asyncio.subprocess.PIPE,
     795              )
     796              stdout, _ = await proc.communicate()
     797              return stdout.decode().strip()
     798  
     799          async def main():
     800              outputs = [f'foo{i}' for i in range(10)]
     801              res = await asyncio.gather(*[get_command_stdout(sys.executable, '-c',
     802                                          f'print({out!r})') for out in outputs])
     803              self.assertEqual(res, outputs)
     804  
     805          self.loop.run_until_complete(main())
     806  
     807  
     808  if sys.platform != 'win32':
     809      # Unix
     810      class ESC[4;38;5;81mSubprocessWatcherMixin(ESC[4;38;5;149mSubprocessMixin):
     811  
     812          Watcher = None
     813  
     814          def setUp(self):
     815              super().setUp()
     816              policy = asyncio.get_event_loop_policy()
     817              self.loop = policy.new_event_loop()
     818              self.set_event_loop(self.loop)
     819  
     820              watcher = self._get_watcher()
     821              watcher.attach_loop(self.loop)
     822              with warnings.catch_warnings():
     823                  warnings.simplefilter('ignore', DeprecationWarning)
     824                  policy.set_child_watcher(watcher)
     825  
     826          def tearDown(self):
     827              super().tearDown()
     828              policy = asyncio.get_event_loop_policy()
     829              with warnings.catch_warnings():
     830                  warnings.simplefilter('ignore', DeprecationWarning)
     831                  watcher = policy.get_child_watcher()
     832                  policy.set_child_watcher(None)
     833              watcher.attach_loop(None)
     834              watcher.close()
     835  
     836      class ESC[4;38;5;81mSubprocessThreadedWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
     837                                           ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     838  
     839          def _get_watcher(self):
     840              return unix_events.ThreadedChildWatcher()
     841  
     842      class ESC[4;38;5;81mSubprocessSafeWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
     843                                       ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     844  
     845          def _get_watcher(self):
     846              with self.assertWarns(DeprecationWarning):
     847                  return unix_events.SafeChildWatcher()
     848  
     849      class ESC[4;38;5;81mMultiLoopChildWatcherTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     850  
     851          def test_warns(self):
     852              with self.assertWarns(DeprecationWarning):
     853                  unix_events.MultiLoopChildWatcher()
     854  
     855      class ESC[4;38;5;81mSubprocessFastWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
     856                                       ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     857  
     858          def _get_watcher(self):
     859              with self.assertWarns(DeprecationWarning):
     860                  return unix_events.FastChildWatcher()
     861  
     862      @unittest.skipUnless(
     863          unix_events.can_use_pidfd(),
     864          "operating system does not support pidfds",
     865      )
     866      class ESC[4;38;5;81mSubprocessPidfdWatcherTests(ESC[4;38;5;149mSubprocessWatcherMixin,
     867                                        ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     868  
     869          def _get_watcher(self):
     870              return unix_events.PidfdChildWatcher()
     871  
     872  
     873      class ESC[4;38;5;81mGenericWatcherTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     874  
     875          def test_create_subprocess_fails_with_inactive_watcher(self):
     876              watcher = mock.create_autospec(asyncio.AbstractChildWatcher)
     877              watcher.is_active.return_value = False
     878  
     879              async def execute():
     880                  asyncio.set_child_watcher(watcher)
     881  
     882                  with self.assertRaises(RuntimeError):
     883                      await subprocess.create_subprocess_exec(
     884                          os_helper.FakePath(sys.executable), '-c', 'pass')
     885  
     886                  watcher.add_child_handler.assert_not_called()
     887  
     888              with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
     889                  with warnings.catch_warnings():
     890                      warnings.simplefilter('ignore', DeprecationWarning)
     891                      self.assertIsNone(runner.run(execute()))
     892              self.assertListEqual(watcher.mock_calls, [
     893                  mock.call.__enter__(),
     894                  mock.call.is_active(),
     895                  mock.call.__exit__(RuntimeError, mock.ANY, mock.ANY),
     896              ], watcher.mock_calls)
     897  
     898  
     899          @unittest.skipUnless(
     900              unix_events.can_use_pidfd(),
     901              "operating system does not support pidfds",
     902          )
     903          def test_create_subprocess_with_pidfd(self):
     904              async def in_thread():
     905                  proc = await asyncio.create_subprocess_exec(
     906                      *PROGRAM_CAT,
     907                      stdin=subprocess.PIPE,
     908                      stdout=subprocess.PIPE,
     909                  )
     910                  stdout, stderr = await proc.communicate(b"some data")
     911                  return proc.returncode, stdout
     912  
     913              async def main():
     914                  # asyncio.Runner did not call asyncio.set_event_loop()
     915                  with self.assertRaises(RuntimeError):
     916                      asyncio.get_event_loop_policy().get_event_loop()
     917                  return await asyncio.to_thread(asyncio.run, in_thread())
     918              with self.assertWarns(DeprecationWarning):
     919                  asyncio.set_child_watcher(asyncio.PidfdChildWatcher())
     920              try:
     921                  with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
     922                      returncode, stdout = runner.run(main())
     923                  self.assertEqual(returncode, 0)
     924                  self.assertEqual(stdout, b'some data')
     925              finally:
     926                  with self.assertWarns(DeprecationWarning):
     927                      asyncio.set_child_watcher(None)
     928  else:
     929      # Windows
     930      class ESC[4;38;5;81mSubprocessProactorTests(ESC[4;38;5;149mSubprocessMixin, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     931  
     932          def setUp(self):
     933              super().setUp()
     934              self.loop = asyncio.ProactorEventLoop()
     935              self.set_event_loop(self.loop)
     936  
     937  
     938  if __name__ == '__main__':
     939      unittest.main()