(root)/
Python-3.11.7/
Lib/
asyncio/
base_subprocess.py
       1  import collections
       2  import subprocess
       3  import warnings
       4  
       5  from . import protocols
       6  from . import transports
       7  from .log import logger
       8  
       9  
      10  class ESC[4;38;5;81mBaseSubprocessTransport(ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149mSubprocessTransport):
      11  
      12      def __init__(self, loop, protocol, args, shell,
      13                   stdin, stdout, stderr, bufsize,
      14                   waiter=None, extra=None, **kwargs):
      15          super().__init__(extra)
      16          self._closed = False
      17          self._protocol = protocol
      18          self._loop = loop
      19          self._proc = None
      20          self._pid = None
      21          self._returncode = None
      22          self._exit_waiters = []
      23          self._pending_calls = collections.deque()
      24          self._pipes = {}
      25          self._finished = False
      26  
      27          if stdin == subprocess.PIPE:
      28              self._pipes[0] = None
      29          if stdout == subprocess.PIPE:
      30              self._pipes[1] = None
      31          if stderr == subprocess.PIPE:
      32              self._pipes[2] = None
      33  
      34          # Create the child process: set the _proc attribute
      35          try:
      36              self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
      37                          stderr=stderr, bufsize=bufsize, **kwargs)
      38          except:
      39              self.close()
      40              raise
      41  
      42          self._pid = self._proc.pid
      43          self._extra['subprocess'] = self._proc
      44  
      45          if self._loop.get_debug():
      46              if isinstance(args, (bytes, str)):
      47                  program = args
      48              else:
      49                  program = args[0]
      50              logger.debug('process %r created: pid %s',
      51                           program, self._pid)
      52  
      53          self._loop.create_task(self._connect_pipes(waiter))
      54  
      55      def __repr__(self):
      56          info = [self.__class__.__name__]
      57          if self._closed:
      58              info.append('closed')
      59          if self._pid is not None:
      60              info.append(f'pid={self._pid}')
      61          if self._returncode is not None:
      62              info.append(f'returncode={self._returncode}')
      63          elif self._pid is not None:
      64              info.append('running')
      65          else:
      66              info.append('not started')
      67  
      68          stdin = self._pipes.get(0)
      69          if stdin is not None:
      70              info.append(f'stdin={stdin.pipe}')
      71  
      72          stdout = self._pipes.get(1)
      73          stderr = self._pipes.get(2)
      74          if stdout is not None and stderr is stdout:
      75              info.append(f'stdout=stderr={stdout.pipe}')
      76          else:
      77              if stdout is not None:
      78                  info.append(f'stdout={stdout.pipe}')
      79              if stderr is not None:
      80                  info.append(f'stderr={stderr.pipe}')
      81  
      82          return '<{}>'.format(' '.join(info))
      83  
      84      def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
      85          raise NotImplementedError
      86  
      87      def set_protocol(self, protocol):
      88          self._protocol = protocol
      89  
      90      def get_protocol(self):
      91          return self._protocol
      92  
      93      def is_closing(self):
      94          return self._closed
      95  
      96      def close(self):
      97          if self._closed:
      98              return
      99          self._closed = True
     100  
     101          for proto in self._pipes.values():
     102              if proto is None:
     103                  continue
     104              proto.pipe.close()
     105  
     106          if (self._proc is not None and
     107                  # has the child process finished?
     108                  self._returncode is None and
     109                  # the child process has finished, but the
     110                  # transport hasn't been notified yet?
     111                  self._proc.poll() is None):
     112  
     113              if self._loop.get_debug():
     114                  logger.warning('Close running child process: kill %r', self)
     115  
     116              try:
     117                  self._proc.kill()
     118              except ProcessLookupError:
     119                  pass
     120  
     121              # Don't clear the _proc reference yet: _post_init() may still run
     122  
     123      def __del__(self, _warn=warnings.warn):
     124          if not self._closed:
     125              _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
     126              self.close()
     127  
     128      def get_pid(self):
     129          return self._pid
     130  
     131      def get_returncode(self):
     132          return self._returncode
     133  
     134      def get_pipe_transport(self, fd):
     135          if fd in self._pipes:
     136              return self._pipes[fd].pipe
     137          else:
     138              return None
     139  
     140      def _check_proc(self):
     141          if self._proc is None:
     142              raise ProcessLookupError()
     143  
     144      def send_signal(self, signal):
     145          self._check_proc()
     146          self._proc.send_signal(signal)
     147  
     148      def terminate(self):
     149          self._check_proc()
     150          self._proc.terminate()
     151  
     152      def kill(self):
     153          self._check_proc()
     154          self._proc.kill()
     155  
     156      async def _connect_pipes(self, waiter):
     157          try:
     158              proc = self._proc
     159              loop = self._loop
     160  
     161              if proc.stdin is not None:
     162                  _, pipe = await loop.connect_write_pipe(
     163                      lambda: WriteSubprocessPipeProto(self, 0),
     164                      proc.stdin)
     165                  self._pipes[0] = pipe
     166  
     167              if proc.stdout is not None:
     168                  _, pipe = await loop.connect_read_pipe(
     169                      lambda: ReadSubprocessPipeProto(self, 1),
     170                      proc.stdout)
     171                  self._pipes[1] = pipe
     172  
     173              if proc.stderr is not None:
     174                  _, pipe = await loop.connect_read_pipe(
     175                      lambda: ReadSubprocessPipeProto(self, 2),
     176                      proc.stderr)
     177                  self._pipes[2] = pipe
     178  
     179              assert self._pending_calls is not None
     180  
     181              loop.call_soon(self._protocol.connection_made, self)
     182              for callback, data in self._pending_calls:
     183                  loop.call_soon(callback, *data)
     184              self._pending_calls = None
     185          except (SystemExit, KeyboardInterrupt):
     186              raise
     187          except BaseException as exc:
     188              if waiter is not None and not waiter.cancelled():
     189                  waiter.set_exception(exc)
     190          else:
     191              if waiter is not None and not waiter.cancelled():
     192                  waiter.set_result(None)
     193  
     194      def _call(self, cb, *data):
     195          if self._pending_calls is not None:
     196              self._pending_calls.append((cb, data))
     197          else:
     198              self._loop.call_soon(cb, *data)
     199  
     200      def _pipe_connection_lost(self, fd, exc):
     201          self._call(self._protocol.pipe_connection_lost, fd, exc)
     202          self._try_finish()
     203  
     204      def _pipe_data_received(self, fd, data):
     205          self._call(self._protocol.pipe_data_received, fd, data)
     206  
     207      def _process_exited(self, returncode):
     208          assert returncode is not None, returncode
     209          assert self._returncode is None, self._returncode
     210          if self._loop.get_debug():
     211              logger.info('%r exited with return code %r', self, returncode)
     212          self._returncode = returncode
     213          if self._proc.returncode is None:
     214              # asyncio uses a child watcher: copy the status into the Popen
     215              # object. On Python 3.6, it is required to avoid a ResourceWarning.
     216              self._proc.returncode = returncode
     217          self._call(self._protocol.process_exited)
     218  
     219          self._try_finish()
     220  
     221      async def _wait(self):
     222          """Wait until the process exit and return the process return code.
     223  
     224          This method is a coroutine."""
     225          if self._returncode is not None:
     226              return self._returncode
     227  
     228          waiter = self._loop.create_future()
     229          self._exit_waiters.append(waiter)
     230          return await waiter
     231  
     232      def _try_finish(self):
     233          assert not self._finished
     234          if self._returncode is None:
     235              return
     236          if all(p is not None and p.disconnected
     237                 for p in self._pipes.values()):
     238              self._finished = True
     239              self._call(self._call_connection_lost, None)
     240  
     241      def _call_connection_lost(self, exc):
     242          try:
     243              self._protocol.connection_lost(exc)
     244          finally:
     245              # wake up futures waiting for wait()
     246              for waiter in self._exit_waiters:
     247                  if not waiter.cancelled():
     248                      waiter.set_result(self._returncode)
     249              self._exit_waiters = None
     250              self._loop = None
     251              self._proc = None
     252              self._protocol = None
     253  
     254  
     255  class ESC[4;38;5;81mWriteSubprocessPipeProto(ESC[4;38;5;149mprotocolsESC[4;38;5;149m.ESC[4;38;5;149mBaseProtocol):
     256  
     257      def __init__(self, proc, fd):
     258          self.proc = proc
     259          self.fd = fd
     260          self.pipe = None
     261          self.disconnected = False
     262  
     263      def connection_made(self, transport):
     264          self.pipe = transport
     265  
     266      def __repr__(self):
     267          return f'<{self.__class__.__name__} fd={self.fd} pipe={self.pipe!r}>'
     268  
     269      def connection_lost(self, exc):
     270          self.disconnected = True
     271          self.proc._pipe_connection_lost(self.fd, exc)
     272          self.proc = None
     273  
     274      def pause_writing(self):
     275          self.proc._protocol.pause_writing()
     276  
     277      def resume_writing(self):
     278          self.proc._protocol.resume_writing()
     279  
     280  
     281  class ESC[4;38;5;81mReadSubprocessPipeProto(ESC[4;38;5;149mWriteSubprocessPipeProto,
     282                                ESC[4;38;5;149mprotocolsESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
     283  
     284      def data_received(self, data):
     285          self.proc._pipe_data_received(self.fd, data)