(root)/
Python-3.11.7/
Lib/
pty.py
       1  """Pseudo terminal utilities."""
       2  
       3  # Bugs: No signal handling.  Doesn't set slave termios and window size.
       4  #       Only tested on Linux, FreeBSD, and macOS.
       5  # See:  W. Richard Stevens. 1992.  Advanced Programming in the
       6  #       UNIX Environment.  Chapter 19.
       7  # Author: Steen Lumholt -- with additions by Guido.
       8  
       9  from select import select
      10  import os
      11  import sys
      12  import tty
      13  
      14  # names imported directly for test mocking purposes
      15  from os import close, waitpid
      16  from tty import setraw, tcgetattr, tcsetattr
      17  
      18  __all__ = ["openpty", "fork", "spawn"]
      19  
      20  STDIN_FILENO = 0
      21  STDOUT_FILENO = 1
      22  STDERR_FILENO = 2
      23  
      24  CHILD = 0
      25  
      26  def openpty():
      27      """openpty() -> (master_fd, slave_fd)
      28      Open a pty master/slave pair, using os.openpty() if possible."""
      29  
      30      try:
      31          return os.openpty()
      32      except (AttributeError, OSError):
      33          pass
      34      master_fd, slave_name = _open_terminal()
      35      slave_fd = slave_open(slave_name)
      36      return master_fd, slave_fd
      37  
      38  def master_open():
      39      """master_open() -> (master_fd, slave_name)
      40      Open a pty master and return the fd, and the filename of the slave end.
      41      Deprecated, use openpty() instead."""
      42  
      43      try:
      44          master_fd, slave_fd = os.openpty()
      45      except (AttributeError, OSError):
      46          pass
      47      else:
      48          slave_name = os.ttyname(slave_fd)
      49          os.close(slave_fd)
      50          return master_fd, slave_name
      51  
      52      return _open_terminal()
      53  
      54  def _open_terminal():
      55      """Open pty master and return (master_fd, tty_name)."""
      56      for x in 'pqrstuvwxyzPQRST':
      57          for y in '0123456789abcdef':
      58              pty_name = '/dev/pty' + x + y
      59              try:
      60                  fd = os.open(pty_name, os.O_RDWR)
      61              except OSError:
      62                  continue
      63              return (fd, '/dev/tty' + x + y)
      64      raise OSError('out of pty devices')
      65  
      66  def slave_open(tty_name):
      67      """slave_open(tty_name) -> slave_fd
      68      Open the pty slave and acquire the controlling terminal, returning
      69      opened filedescriptor.
      70      Deprecated, use openpty() instead."""
      71  
      72      result = os.open(tty_name, os.O_RDWR)
      73      try:
      74          from fcntl import ioctl, I_PUSH
      75      except ImportError:
      76          return result
      77      try:
      78          ioctl(result, I_PUSH, "ptem")
      79          ioctl(result, I_PUSH, "ldterm")
      80      except OSError:
      81          pass
      82      return result
      83  
      84  def fork():
      85      """fork() -> (pid, master_fd)
      86      Fork and make the child a session leader with a controlling terminal."""
      87  
      88      try:
      89          pid, fd = os.forkpty()
      90      except (AttributeError, OSError):
      91          pass
      92      else:
      93          if pid == CHILD:
      94              try:
      95                  os.setsid()
      96              except OSError:
      97                  # os.forkpty() already set us session leader
      98                  pass
      99          return pid, fd
     100  
     101      master_fd, slave_fd = openpty()
     102      pid = os.fork()
     103      if pid == CHILD:
     104          # Establish a new session.
     105          os.setsid()
     106          os.close(master_fd)
     107  
     108          # Slave becomes stdin/stdout/stderr of child.
     109          os.dup2(slave_fd, STDIN_FILENO)
     110          os.dup2(slave_fd, STDOUT_FILENO)
     111          os.dup2(slave_fd, STDERR_FILENO)
     112          if slave_fd > STDERR_FILENO:
     113              os.close(slave_fd)
     114  
     115          # Explicitly open the tty to make it become a controlling tty.
     116          tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR)
     117          os.close(tmp_fd)
     118      else:
     119          os.close(slave_fd)
     120  
     121      # Parent and child process.
     122      return pid, master_fd
     123  
     124  def _read(fd):
     125      """Default read function."""
     126      return os.read(fd, 1024)
     127  
     128  def _copy(master_fd, master_read=_read, stdin_read=_read):
     129      """Parent copy loop.
     130      Copies
     131              pty master -> standard output   (master_read)
     132              standard input -> pty master    (stdin_read)"""
     133      if os.get_blocking(master_fd):
     134          # If we write more than tty/ndisc is willing to buffer, we may block
     135          # indefinitely. So we set master_fd to non-blocking temporarily during
     136          # the copy operation.
     137          os.set_blocking(master_fd, False)
     138          try:
     139              _copy(master_fd, master_read=master_read, stdin_read=stdin_read)
     140          finally:
     141              # restore blocking mode for backwards compatibility
     142              os.set_blocking(master_fd, True)
     143          return
     144      high_waterlevel = 4096
     145      stdin_avail = master_fd != STDIN_FILENO
     146      stdout_avail = master_fd != STDOUT_FILENO
     147      i_buf = b''
     148      o_buf = b''
     149      while 1:
     150          rfds = []
     151          wfds = []
     152          if stdin_avail and len(i_buf) < high_waterlevel:
     153              rfds.append(STDIN_FILENO)
     154          if stdout_avail and len(o_buf) < high_waterlevel:
     155              rfds.append(master_fd)
     156          if stdout_avail and len(o_buf) > 0:
     157              wfds.append(STDOUT_FILENO)
     158          if len(i_buf) > 0:
     159              wfds.append(master_fd)
     160  
     161          rfds, wfds, _xfds = select(rfds, wfds, [])
     162  
     163          if STDOUT_FILENO in wfds:
     164              try:
     165                  n = os.write(STDOUT_FILENO, o_buf)
     166                  o_buf = o_buf[n:]
     167              except OSError:
     168                  stdout_avail = False
     169  
     170          if master_fd in rfds:
     171              # Some OSes signal EOF by returning an empty byte string,
     172              # some throw OSErrors.
     173              try:
     174                  data = master_read(master_fd)
     175              except OSError:
     176                  data = b""
     177              if not data:  # Reached EOF.
     178                  return    # Assume the child process has exited and is
     179                            # unreachable, so we clean up.
     180              o_buf += data
     181  
     182          if master_fd in wfds:
     183              n = os.write(master_fd, i_buf)
     184              i_buf = i_buf[n:]
     185  
     186          if stdin_avail and STDIN_FILENO in rfds:
     187              data = stdin_read(STDIN_FILENO)
     188              if not data:
     189                  stdin_avail = False
     190              else:
     191                  i_buf += data
     192  
     193  def spawn(argv, master_read=_read, stdin_read=_read):
     194      """Create a spawned process."""
     195      if type(argv) == type(''):
     196          argv = (argv,)
     197      sys.audit('pty.spawn', argv)
     198  
     199      pid, master_fd = fork()
     200      if pid == CHILD:
     201          os.execlp(argv[0], *argv)
     202  
     203      try:
     204          mode = tcgetattr(STDIN_FILENO)
     205          setraw(STDIN_FILENO)
     206          restore = True
     207      except tty.error:    # This is the same as termios.error
     208          restore = False
     209  
     210      try:
     211          _copy(master_fd, master_read, stdin_read)
     212      finally:
     213          if restore:
     214              tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
     215  
     216      close(master_fd)
     217      return waitpid(pid, 0)[1]