(root)/
Python-3.12.0/
Lib/
pty.py
       1  """Pseudo terminal utilities."""
       2  
       3  # Bugs: No signal handling.  Doesn't set slave termios and window size.
       4  #       Only tested on Linux, FreeBSD, and macOS.
       5  # See:  W. Richard Stevens. 1992.  Advanced Programming in the
       6  #       UNIX Environment.  Chapter 19.
       7  # Author: Steen Lumholt -- with additions by Guido.
       8  
       9  from select import select
      10  import os
      11  import sys
      12  import tty
      13  
      14  # names imported directly for test mocking purposes
      15  from os import close, waitpid
      16  from tty import setraw, tcgetattr, tcsetattr
      17  
# Names exported by "from pty import *".
__all__ = ["openpty", "fork", "spawn"]

# File descriptor numbers used for the standard streams throughout this module.
STDIN_FILENO = 0
STDOUT_FILENO = 1
STDERR_FILENO = 2

# Value the pid returned by fork() is compared against to detect the child.
CHILD = 0
      25  
      26  def openpty():
      27      """openpty() -> (master_fd, slave_fd)
      28      Open a pty master/slave pair, using os.openpty() if possible."""
      29  
      30      try:
      31          return os.openpty()
      32      except (AttributeError, OSError):
      33          pass
      34      master_fd, slave_name = _open_terminal()
      35      slave_fd = slave_open(slave_name)
      36      return master_fd, slave_fd
      37  
      38  def master_open():
      39      """master_open() -> (master_fd, slave_name)
      40      Open a pty master and return the fd, and the filename of the slave end.
      41      Deprecated, use openpty() instead."""
      42  
      43      import warnings
      44      warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2)  # Remove API in 3.14
      45  
      46      try:
      47          master_fd, slave_fd = os.openpty()
      48      except (AttributeError, OSError):
      49          pass
      50      else:
      51          slave_name = os.ttyname(slave_fd)
      52          os.close(slave_fd)
      53          return master_fd, slave_name
      54  
      55      return _open_terminal()
      56  
      57  def _open_terminal():
      58      """Open pty master and return (master_fd, tty_name)."""
      59      for x in 'pqrstuvwxyzPQRST':
      60          for y in '0123456789abcdef':
      61              pty_name = '/dev/pty' + x + y
      62              try:
      63                  fd = os.open(pty_name, os.O_RDWR)
      64              except OSError:
      65                  continue
      66              return (fd, '/dev/tty' + x + y)
      67      raise OSError('out of pty devices')
      68  
      69  def slave_open(tty_name):
      70      """slave_open(tty_name) -> slave_fd
      71      Open the pty slave and acquire the controlling terminal, returning
      72      opened filedescriptor.
      73      Deprecated, use openpty() instead."""
      74  
      75      import warnings
      76      warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2)  # Remove API in 3.14
      77  
      78      result = os.open(tty_name, os.O_RDWR)
      79      try:
      80          from fcntl import ioctl, I_PUSH
      81      except ImportError:
      82          return result
      83      try:
      84          ioctl(result, I_PUSH, "ptem")
      85          ioctl(result, I_PUSH, "ldterm")
      86      except OSError:
      87          pass
      88      return result
      89  
      90  def fork():
      91      """fork() -> (pid, master_fd)
      92      Fork and make the child a session leader with a controlling terminal."""
      93  
      94      try:
      95          pid, fd = os.forkpty()
      96      except (AttributeError, OSError):
      97          pass
      98      else:
      99          if pid == CHILD:
     100              try:
     101                  os.setsid()
     102              except OSError:
     103                  # os.forkpty() already set us session leader
     104                  pass
     105          return pid, fd
     106  
     107      master_fd, slave_fd = openpty()
     108      pid = os.fork()
     109      if pid == CHILD:
     110          os.close(master_fd)
     111          os.login_tty(slave_fd)
     112      else:
     113          os.close(slave_fd)
     114  
     115      # Parent and child process.
     116      return pid, master_fd
     117  
     118  def _read(fd):
     119      """Default read function."""
     120      return os.read(fd, 1024)
     121  
     122  def _copy(master_fd, master_read=_read, stdin_read=_read):
     123      """Parent copy loop.
     124      Copies
     125              pty master -> standard output   (master_read)
     126              standard input -> pty master    (stdin_read)"""
     127      if os.get_blocking(master_fd):
     128          # If we write more than tty/ndisc is willing to buffer, we may block
     129          # indefinitely. So we set master_fd to non-blocking temporarily during
     130          # the copy operation.
     131          os.set_blocking(master_fd, False)
     132          try:
     133              _copy(master_fd, master_read=master_read, stdin_read=stdin_read)
     134          finally:
     135              # restore blocking mode for backwards compatibility
     136              os.set_blocking(master_fd, True)
     137          return
     138      high_waterlevel = 4096
     139      stdin_avail = master_fd != STDIN_FILENO
     140      stdout_avail = master_fd != STDOUT_FILENO
     141      i_buf = b''
     142      o_buf = b''
     143      while 1:
     144          rfds = []
     145          wfds = []
     146          if stdin_avail and len(i_buf) < high_waterlevel:
     147              rfds.append(STDIN_FILENO)
     148          if stdout_avail and len(o_buf) < high_waterlevel:
     149              rfds.append(master_fd)
     150          if stdout_avail and len(o_buf) > 0:
     151              wfds.append(STDOUT_FILENO)
     152          if len(i_buf) > 0:
     153              wfds.append(master_fd)
     154  
     155          rfds, wfds, _xfds = select(rfds, wfds, [])
     156  
     157          if STDOUT_FILENO in wfds:
     158              try:
     159                  n = os.write(STDOUT_FILENO, o_buf)
     160                  o_buf = o_buf[n:]
     161              except OSError:
     162                  stdout_avail = False
     163  
     164          if master_fd in rfds:
     165              # Some OSes signal EOF by returning an empty byte string,
     166              # some throw OSErrors.
     167              try:
     168                  data = master_read(master_fd)
     169              except OSError:
     170                  data = b""
     171              if not data:  # Reached EOF.
     172                  return    # Assume the child process has exited and is
     173                            # unreachable, so we clean up.
     174              o_buf += data
     175  
     176          if master_fd in wfds:
     177              n = os.write(master_fd, i_buf)
     178              i_buf = i_buf[n:]
     179  
     180          if stdin_avail and STDIN_FILENO in rfds:
     181              data = stdin_read(STDIN_FILENO)
     182              if not data:
     183                  stdin_avail = False
     184              else:
     185                  i_buf += data
     186  
     187  def spawn(argv, master_read=_read, stdin_read=_read):
     188      """Create a spawned process."""
     189      if isinstance(argv, str):
     190          argv = (argv,)
     191      sys.audit('pty.spawn', argv)
     192  
     193      pid, master_fd = fork()
     194      if pid == CHILD:
     195          os.execlp(argv[0], *argv)
     196  
     197      try:
     198          mode = tcgetattr(STDIN_FILENO)
     199          setraw(STDIN_FILENO)
     200          restore = True
     201      except tty.error:    # This is the same as termios.error
     202          restore = False
     203  
     204      try:
     205          _copy(master_fd, master_read, stdin_read)
     206      finally:
     207          if restore:
     208              tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
     209  
     210      close(master_fd)
     211      return waitpid(pid, 0)[1]