python (3.12.0)

(root)/
lib/
python3.12/
multiprocessing/
connection.py
       1  #
       2  # A higher level module for using sockets (or Windows named pipes)
       3  #
       4  # multiprocessing/connection.py
       5  #
       6  # Copyright (c) 2006-2008, R Oudkerk
       7  # Licensed to PSF under a Contributor Agreement.
       8  #
       9  
      10  __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
      11  
      12  import io
      13  import os
      14  import sys
      15  import socket
      16  import struct
      17  import time
      18  import tempfile
      19  import itertools
      20  
      21  import _multiprocessing
      22  
      23  from . import util
      24  
      25  from . import AuthenticationError, BufferTooShort
      26  from .context import reduction
      27  _ForkingPickler = reduction.ForkingPickler
      28  
      29  try:
      30      import _winapi
      31      from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
      32  except ImportError:
      33      if sys.platform == 'win32':
      34          raise
      35      _winapi = None
      36  
      37  #
      38  #
      39  #
      40  
      41  BUFSIZE = 8192
      42  # A very generous timeout when it comes to local connections...
      43  CONNECTION_TIMEOUT = 20.
      44  
      45  _mmap_counter = itertools.count()
      46  
      47  default_family = 'AF_INET'
      48  families = ['AF_INET']
      49  
      50  if hasattr(socket, 'AF_UNIX'):
      51      default_family = 'AF_UNIX'
      52      families += ['AF_UNIX']
      53  
      54  if sys.platform == 'win32':
      55      default_family = 'AF_PIPE'
      56      families += ['AF_PIPE']
      57  
      58  
      59  def _init_timeout(timeout=CONNECTION_TIMEOUT):
      60      return time.monotonic() + timeout
      61  
      62  def _check_timeout(t):
      63      return time.monotonic() > t
      64  
      65  #
      66  #
      67  #
      68  
      69  def arbitrary_address(family):
      70      '''
      71      Return an arbitrary free address for the given family
      72      '''
      73      if family == 'AF_INET':
      74          return ('localhost', 0)
      75      elif family == 'AF_UNIX':
      76          return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
      77      elif family == 'AF_PIPE':
      78          return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
      79                                 (os.getpid(), next(_mmap_counter)), dir="")
      80      else:
      81          raise ValueError('unrecognized family')
      82  
      83  def _validate_family(family):
      84      '''
      85      Checks if the family is valid for the current environment.
      86      '''
      87      if sys.platform != 'win32' and family == 'AF_PIPE':
      88          raise ValueError('Family %s is not recognized.' % family)
      89  
      90      if sys.platform == 'win32' and family == 'AF_UNIX':
      91          # double check
      92          if not hasattr(socket, family):
      93              raise ValueError('Family %s is not recognized.' % family)
      94  
      95  def address_type(address):
      96      '''
      97      Return the types of the address
      98  
      99      This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
     100      '''
     101      if type(address) == tuple:
     102          return 'AF_INET'
     103      elif type(address) is str and address.startswith('\\\\'):
     104          return 'AF_PIPE'
     105      elif type(address) is str or util.is_abstract_socket_namespace(address):
     106          return 'AF_UNIX'
     107      else:
     108          raise ValueError('address type of %r unrecognized' % address)
     109  
     110  #
     111  # Connection classes
     112  #
     113  
     114  class ESC[4;38;5;81m_ConnectionBase:
     115      _handle = None
     116  
     117      def __init__(self, handle, readable=True, writable=True):
     118          handle = handle.__index__()
     119          if handle < 0:
     120              raise ValueError("invalid handle")
     121          if not readable and not writable:
     122              raise ValueError(
     123                  "at least one of `readable` and `writable` must be True")
     124          self._handle = handle
     125          self._readable = readable
     126          self._writable = writable
     127  
     128      # XXX should we use util.Finalize instead of a __del__?
     129  
     130      def __del__(self):
     131          if self._handle is not None:
     132              self._close()
     133  
     134      def _check_closed(self):
     135          if self._handle is None:
     136              raise OSError("handle is closed")
     137  
     138      def _check_readable(self):
     139          if not self._readable:
     140              raise OSError("connection is write-only")
     141  
     142      def _check_writable(self):
     143          if not self._writable:
     144              raise OSError("connection is read-only")
     145  
     146      def _bad_message_length(self):
     147          if self._writable:
     148              self._readable = False
     149          else:
     150              self.close()
     151          raise OSError("bad message length")
     152  
     153      @property
     154      def closed(self):
     155          """True if the connection is closed"""
     156          return self._handle is None
     157  
     158      @property
     159      def readable(self):
     160          """True if the connection is readable"""
     161          return self._readable
     162  
     163      @property
     164      def writable(self):
     165          """True if the connection is writable"""
     166          return self._writable
     167  
     168      def fileno(self):
     169          """File descriptor or handle of the connection"""
     170          self._check_closed()
     171          return self._handle
     172  
     173      def close(self):
     174          """Close the connection"""
     175          if self._handle is not None:
     176              try:
     177                  self._close()
     178              finally:
     179                  self._handle = None
     180  
     181      def send_bytes(self, buf, offset=0, size=None):
     182          """Send the bytes data from a bytes-like object"""
     183          self._check_closed()
     184          self._check_writable()
     185          m = memoryview(buf)
     186          if m.itemsize > 1:
     187              m = m.cast('B')
     188          n = m.nbytes
     189          if offset < 0:
     190              raise ValueError("offset is negative")
     191          if n < offset:
     192              raise ValueError("buffer length < offset")
     193          if size is None:
     194              size = n - offset
     195          elif size < 0:
     196              raise ValueError("size is negative")
     197          elif offset + size > n:
     198              raise ValueError("buffer length < offset + size")
     199          self._send_bytes(m[offset:offset + size])
     200  
     201      def send(self, obj):
     202          """Send a (picklable) object"""
     203          self._check_closed()
     204          self._check_writable()
     205          self._send_bytes(_ForkingPickler.dumps(obj))
     206  
     207      def recv_bytes(self, maxlength=None):
     208          """
     209          Receive bytes data as a bytes object.
     210          """
     211          self._check_closed()
     212          self._check_readable()
     213          if maxlength is not None and maxlength < 0:
     214              raise ValueError("negative maxlength")
     215          buf = self._recv_bytes(maxlength)
     216          if buf is None:
     217              self._bad_message_length()
     218          return buf.getvalue()
     219  
     220      def recv_bytes_into(self, buf, offset=0):
     221          """
     222          Receive bytes data into a writeable bytes-like object.
     223          Return the number of bytes read.
     224          """
     225          self._check_closed()
     226          self._check_readable()
     227          with memoryview(buf) as m:
     228              # Get bytesize of arbitrary buffer
     229              itemsize = m.itemsize
     230              bytesize = itemsize * len(m)
     231              if offset < 0:
     232                  raise ValueError("negative offset")
     233              elif offset > bytesize:
     234                  raise ValueError("offset too large")
     235              result = self._recv_bytes()
     236              size = result.tell()
     237              if bytesize < offset + size:
     238                  raise BufferTooShort(result.getvalue())
     239              # Message can fit in dest
     240              result.seek(0)
     241              result.readinto(m[offset // itemsize :
     242                                (offset + size) // itemsize])
     243              return size
     244  
     245      def recv(self):
     246          """Receive a (picklable) object"""
     247          self._check_closed()
     248          self._check_readable()
     249          buf = self._recv_bytes()
     250          return _ForkingPickler.loads(buf.getbuffer())
     251  
     252      def poll(self, timeout=0.0):
     253          """Whether there is any input available to be read"""
     254          self._check_closed()
     255          self._check_readable()
     256          return self._poll(timeout)
     257  
     258      def __enter__(self):
     259          return self
     260  
     261      def __exit__(self, exc_type, exc_value, exc_tb):
     262          self.close()
     263  
     264  
     265  if _winapi:
     266  
    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # While true, the next _recv_bytes() call returns an empty message
        # without reading the pipe, and _poll() reports input as available.
        # NOTE(review): nothing in this class sets it to True -- presumably
        # code outside this view does; confirm.
        _got_empty_message = False

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            """Release the pipe handle."""
            # CloseHandle is captured as a default argument when the class
            # body runs, so no module-global lookup happens at call time.
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            """Write `buf` to the pipe as one overlapped write and wait for it."""
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            try:
                # Only wait on the event if the write did not finish at once.
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except:
                # Whatever interrupted the wait, cancel the pending write
                # before letting the exception continue.
                ov.cancel()
                raise
            finally:
                # Always collect the result of the overlapped operation.
                nwritten, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            """
            Read one message and return it as an io.BytesIO positioned at
            its end.

            A broken pipe is reported as EOFError; other OSErrors propagate.
            """
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()
            else:
                # First read is at most 128 bytes (fewer if maxsize is smaller).
                bsize = 128 if maxsize is None else min(maxsize, 128)
                try:
                    ov, err = _winapi.ReadFile(self._handle, bsize,
                                                overlapped=True)
                    try:
                        if err == _winapi.ERROR_IO_PENDING:
                            waitres = _winapi.WaitForMultipleObjects(
                                [ov.event], False, INFINITE)
                            assert waitres == WAIT_OBJECT_0
                    except:
                        ov.cancel()
                        raise
                    finally:
                        # NOTE: a `return` inside `finally` discards any
                        # exception still propagating from the block above.
                        nread, err = ov.GetOverlappedResult(True)
                        if err == 0:
                            # The whole message fitted in the first read.
                            f = io.BytesIO()
                            f.write(ov.getbuffer())
                            return f
                        elif err == _winapi.ERROR_MORE_DATA:
                            # Only part of the message was read; fetch the rest.
                            return self._get_more_data(ov, maxsize)
                except OSError as e:
                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                        raise EOFError
                    else:
                        raise
            # Reached only when the `finally` block above did not return,
            # i.e. err was neither 0 nor ERROR_MORE_DATA, and no exception
            # was left propagating.
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            """Return True if a message is available within `timeout`."""
            # Fast path: a remembered empty message, or a non-zero first
            # field in the PeekNamedPipe() result.
            if (self._got_empty_message or
                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            """
            Complete a message whose first part is already in `ov`.

            Returns a BytesIO holding the first part followed by the rest.
            If `maxsize` is given and the full message would exceed it,
            _bad_message_length() is called, which raises OSError.
            """
            buf = ov.getbuffer()
            f = io.BytesIO()
            f.write(buf)
            # Second field of the PeekNamedPipe() result; used below as the
            # exact size of the follow-up read.
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(buf) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            f.write(ov.getbuffer())
            return f
     345  
     346  
     347  class ESC[4;38;5;81mConnection(ESC[4;38;5;149m_ConnectionBase):
     348      """
     349      Connection class based on an arbitrary file descriptor (Unix only), or
     350      a socket handle (Windows).
     351      """
     352  
     353      if _winapi:
     354          def _close(self, _close=_multiprocessing.closesocket):
     355              _close(self._handle)
     356          _write = _multiprocessing.send
     357          _read = _multiprocessing.recv
     358      else:
     359          def _close(self, _close=os.close):
     360              _close(self._handle)
     361          _write = os.write
     362          _read = os.read
     363  
     364      def _send(self, buf, write=_write):
     365          remaining = len(buf)
     366          while True:
     367              n = write(self._handle, buf)
     368              remaining -= n
     369              if remaining == 0:
     370                  break
     371              buf = buf[n:]
     372  
     373      def _recv(self, size, read=_read):
     374          buf = io.BytesIO()
     375          handle = self._handle
     376          remaining = size
     377          while remaining > 0:
     378              chunk = read(handle, remaining)
     379              n = len(chunk)
     380              if n == 0:
     381                  if remaining == size:
     382                      raise EOFError
     383                  else:
     384                      raise OSError("got end of file during message")
     385              buf.write(chunk)
     386              remaining -= n
     387          return buf
     388  
     389      def _send_bytes(self, buf):
     390          n = len(buf)
     391          if n > 0x7fffffff:
     392              pre_header = struct.pack("!i", -1)
     393              header = struct.pack("!Q", n)
     394              self._send(pre_header)
     395              self._send(header)
     396              self._send(buf)
     397          else:
     398              # For wire compatibility with 3.7 and lower
     399              header = struct.pack("!i", n)
     400              if n > 16384:
     401                  # The payload is large so Nagle's algorithm won't be triggered
     402                  # and we'd better avoid the cost of concatenation.
     403                  self._send(header)
     404                  self._send(buf)
     405              else:
     406                  # Issue #20540: concatenate before sending, to avoid delays due
     407                  # to Nagle's algorithm on a TCP socket.
     408                  # Also note we want to avoid sending a 0-length buffer separately,
     409                  # to avoid "broken pipe" errors if the other end closed the pipe.
     410                  self._send(header + buf)
     411  
     412      def _recv_bytes(self, maxsize=None):
     413          buf = self._recv(4)
     414          size, = struct.unpack("!i", buf.getvalue())
     415          if size == -1:
     416              buf = self._recv(8)
     417              size, = struct.unpack("!Q", buf.getvalue())
     418          if maxsize is not None and size > maxsize:
     419              return None
     420          return self._recv(size)
     421  
     422      def _poll(self, timeout):
     423          r = wait([self], timeout)
     424          return bool(r)
     425  
     426  
     427  #
     428  # Public functions
     429  #
     430  
     431  class ESC[4;38;5;81mListener(ESC[4;38;5;149mobject):
     432      '''
     433      Returns a listener object.
     434  
     435      This is a wrapper for a bound socket which is 'listening' for
     436      connections, or for a Windows named pipe.
     437      '''
     438      def __init__(self, address=None, family=None, backlog=1, authkey=None):
     439          family = family or (address and address_type(address)) \
     440                   or default_family
     441          address = address or arbitrary_address(family)
     442  
     443          _validate_family(family)
     444          if family == 'AF_PIPE':
     445              self._listener = PipeListener(address, backlog)
     446          else:
     447              self._listener = SocketListener(address, family, backlog)
     448  
     449          if authkey is not None and not isinstance(authkey, bytes):
     450              raise TypeError('authkey should be a byte string')
     451  
     452          self._authkey = authkey
     453  
     454      def accept(self):
     455          '''
     456          Accept a connection on the bound socket or named pipe of `self`.
     457  
     458          Returns a `Connection` object.
     459          '''
     460          if self._listener is None:
     461              raise OSError('listener is closed')
     462          c = self._listener.accept()
     463          if self._authkey:
     464              deliver_challenge(c, self._authkey)
     465              answer_challenge(c, self._authkey)
     466          return c
     467  
     468      def close(self):
     469          '''
     470          Close the bound socket or named pipe of `self`.
     471          '''
     472          listener = self._listener
     473          if listener is not None:
     474              self._listener = None
     475              listener.close()
     476  
     477      @property
     478      def address(self):
     479          return self._listener._address
     480  
     481      @property
     482      def last_accepted(self):
     483          return self._listener._last_accepted
     484  
     485      def __enter__(self):
     486          return self
     487  
     488      def __exit__(self, exc_type, exc_value, exc_tb):
     489          self.close()
     490  
     491  
     492  def Client(address, family=None, authkey=None):
     493      '''
     494      Returns a connection to the address of a `Listener`
     495      '''
     496      family = family or address_type(address)
     497      _validate_family(family)
     498      if family == 'AF_PIPE':
     499          c = PipeClient(address)
     500      else:
     501          c = SocketClient(address)
     502  
     503      if authkey is not None and not isinstance(authkey, bytes):
     504          raise TypeError('authkey should be a byte string')
     505  
     506      if authkey is not None:
     507          answer_challenge(c, authkey)
     508          deliver_challenge(c, authkey)
     509  
     510      return c
     511  
     512  
     513  if sys.platform != 'win32':
     514  
     515      def Pipe(duplex=True):
     516          '''
     517          Returns pair of connection objects at either end of a pipe
     518          '''
     519          if duplex:
     520              s1, s2 = socket.socketpair()
     521              s1.setblocking(True)
     522              s2.setblocking(True)
     523              c1 = Connection(s1.detach())
     524              c2 = Connection(s2.detach())
     525          else:
     526              fd1, fd2 = os.pipe()
     527              c1 = Connection(fd1, writable=False)
     528              c2 = Connection(fd2, readable=False)
     529  
     530          return c1, c2
     531  
     532  else:
     533  
     534      def Pipe(duplex=True):
     535          '''
     536          Returns pair of connection objects at either end of a pipe
     537          '''
     538          address = arbitrary_address('AF_PIPE')
     539          if duplex:
     540              openmode = _winapi.PIPE_ACCESS_DUPLEX
     541              access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
     542              obsize, ibsize = BUFSIZE, BUFSIZE
     543          else:
     544              openmode = _winapi.PIPE_ACCESS_INBOUND
     545              access = _winapi.GENERIC_WRITE
     546              obsize, ibsize = 0, BUFSIZE
     547  
     548          h1 = _winapi.CreateNamedPipe(
     549              address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
     550              _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
     551              _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
     552              _winapi.PIPE_WAIT,
     553              1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
     554              # default security descriptor: the handle cannot be inherited
     555              _winapi.NULL
     556              )
     557          h2 = _winapi.CreateFile(
     558              address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
     559              _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
     560              )
     561          _winapi.SetNamedPipeHandleState(
     562              h2, _winapi.PIPE_READMODE_MESSAGE, None, None
     563              )
     564  
     565          overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
     566          _, err = overlapped.GetOverlappedResult(True)
     567          assert err == 0
     568  
     569          c1 = PipeConnection(h1, writable=duplex)
     570          c2 = PipeConnection(h2, readable=duplex)
     571  
     572          return c1, c2
     573  
     574  #
     575  # Definitions for connections based on sockets
     576  #
     577  
     578  class ESC[4;38;5;81mSocketListener(ESC[4;38;5;149mobject):
     579      '''
     580      Representation of a socket which is bound to an address and listening
     581      '''
     582      def __init__(self, address, family, backlog=1):
     583          self._socket = socket.socket(getattr(socket, family))
     584          try:
     585              # SO_REUSEADDR has different semantics on Windows (issue #2550).
     586              if os.name == 'posix':
     587                  self._socket.setsockopt(socket.SOL_SOCKET,
     588                                          socket.SO_REUSEADDR, 1)
     589              self._socket.setblocking(True)
     590              self._socket.bind(address)
     591              self._socket.listen(backlog)
     592              self._address = self._socket.getsockname()
     593          except OSError:
     594              self._socket.close()
     595              raise
     596          self._family = family
     597          self._last_accepted = None
     598  
     599          if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
     600              # Linux abstract socket namespaces do not need to be explicitly unlinked
     601              self._unlink = util.Finalize(
     602                  self, os.unlink, args=(address,), exitpriority=0
     603                  )
     604          else:
     605              self._unlink = None
     606  
     607      def accept(self):
     608          s, self._last_accepted = self._socket.accept()
     609          s.setblocking(True)
     610          return Connection(s.detach())
     611  
     612      def close(self):
     613          try:
     614              self._socket.close()
     615          finally:
     616              unlink = self._unlink
     617              if unlink is not None:
     618                  self._unlink = None
     619                  unlink()
     620  
     621  
     622  def SocketClient(address):
     623      '''
     624      Return a connection object connected to the socket given by `address`
     625      '''
     626      family = address_type(address)
     627      with socket.socket( getattr(socket, family) ) as s:
     628          s.setblocking(True)
     629          s.connect(address)
     630          return Connection(s.detach())
     631  
     632  #
     633  # Definitions for connections based on named pipes
     634  #
     635  
     636  if sys.platform == 'win32':
     637  
    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            # NOTE: `backlog` is accepted but not used anywhere in this class.
            self._address = address
            # Holds pipe instances that have been created but not yet
            # handed out; starts with the first instance of the pipe.
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            # `close` is an instance attribute here, not a method: a
            # util.Finalize object set up with _finalize_pipe_listener and
            # the handle queue (presumably invoked by calling it -- confirm
            # against util.Finalize).
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            '''Create one more instance of the named pipe and return its handle.'''
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
            if first:
                # Only the initial instance is created with this flag.
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            return _winapi.CreateNamedPipe(
                self._address, flags,
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                _winapi.PIPE_WAIT,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            '''Wait for a client on the oldest queued handle; return a PipeConnection.'''
            # Queue a fresh instance before taking the oldest one, so the
            # queue is never left empty.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    res = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except:
                    # Interrupted while waiting: cancel the pending connect
                    # and close this handle before re-raising.
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    # Runs on both the success and the failure path.
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            '''Close every handle still waiting in `queue`.'''
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)
     693  
     694      def PipeClient(address):
     695          '''
     696          Return a connection object connected to the pipe given by `address`
     697          '''
     698          t = _init_timeout()
     699          while 1:
     700              try:
     701                  _winapi.WaitNamedPipe(address, 1000)
     702                  h = _winapi.CreateFile(
     703                      address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
     704                      0, _winapi.NULL, _winapi.OPEN_EXISTING,
     705                      _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
     706                      )
     707              except OSError as e:
     708                  if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
     709                                        _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
     710                      raise
     711              else:
     712                  break
     713          else:
     714              raise
     715  
     716          _winapi.SetNamedPipeHandleState(
     717              h, _winapi.PIPE_READMODE_MESSAGE, None, None
     718              )
     719          return PipeConnection(h)
     720  
     721  #
     722  # Authentication stuff
     723  #
     724  
# Presumably the size in bytes of the random MESSAGE in the handshake; the
# protocol description in this file treats a MESSAGE of exactly 20 bytes as
# the mark of a legacy server, which would explain the lower bound.
# NOTE(review): the code using this constant is outside this view -- confirm.
MESSAGE_LENGTH = 40  # MUST be > 20

# Prefix placed in front of MESSAGE when a challenge is sent (see the
# protocol description in this file).
_CHALLENGE = b'#CHALLENGE#'
# NOTE(review): presumably the replies sent after a digest is accepted or
# rejected; their use is outside this view -- confirm.
_WELCOME = b'#WELCOME#'
_FAILURE = b'#FAILURE#'
     730  
     731  # multiprocessing.connection Authentication Handshake Protocol Description
     732  # (as documented for reference after reading the existing code)
     733  # =============================================================================
     734  #
     735  # On Windows: native pipes with "overlapped IO" are used to send the bytes,
     736  # instead of the length prefix SIZE scheme described below. (ie: the OS deals
     737  # with message sizes for us)
     738  #
     739  # Protocol error behaviors:
     740  #
     741  # On POSIX, any failure to receive the length prefix into SIZE, for SIZE greater
     742  # than the requested maxsize to receive, or receiving fewer than SIZE bytes
     743  # results in the connection being closed and auth to fail.
     744  #
     745  # On Windows, receiving too few bytes is never a low level _recv_bytes read
     746  # error, receiving too many will trigger an error only if receive maxsize
     747  # value was larger than 128 OR if the data arrived in smaller pieces.
     748  #
     749  #      Serving side                           Client side
     750  #     ------------------------------  ---------------------------------------
     751  # 0.                                  Open a connection on the pipe.
     752  # 1.  Accept connection.
     753  # 2.  Random 20+ bytes -> MESSAGE
     754  #     Modern servers always send
     755  #     more than 20 bytes and include
     756  #     a {digest} prefix on it with
     757  #     their preferred HMAC digest.
     758  #     Legacy ones send ==20 bytes.
     759  # 3.  send 4 byte length (net order)
     760  #     prefix followed by:
     761  #       b'#CHALLENGE#' + MESSAGE
     762  # 4.                                  Receive 4 bytes, parse as network byte
     763  #                                     order integer. If it is -1, receive an
     764  #                                     additional 8 bytes, parse that as network
     765  #                                     byte order. The result is the length of
     766  #                                     the data that follows -> SIZE.
     767  # 5.                                  Receive min(SIZE, 256) bytes -> M1
     768  # 6.                                  Assert that M1 starts with:
     769  #                                       b'#CHALLENGE#'
     770  # 7.                                  Strip that prefix from M1 into -> M2
     771  # 7.1.                                Parse M2: if it is exactly 20 bytes in
     772  #                                     length this indicates a legacy server
     773  #                                     supporting only HMAC-MD5. Otherwise the
     774  # 7.2.                                preferred digest is looked up from an
     775  #                                     expected "{digest}" prefix on M2. No prefix
     776  #                                     or unsupported digest? <- AuthenticationError
     777  # 7.3.                                Put divined algorithm name in -> D_NAME
     778  # 8.                                  Compute HMAC-D_NAME of AUTHKEY, M2 -> C_DIGEST
     779  # 9.                                  Send 4 byte length prefix (net order)
     780  #                                     followed by C_DIGEST bytes.
     781  # 10. Receive 4 or 4+8 byte length
     782  #     prefix (#4 dance) -> SIZE.
     783  # 11. Receive min(SIZE, 256) -> C_D.
     784  # 11.1. Parse C_D: legacy servers
     785  #     accept it as is, "md5" -> D_NAME
     786  # 11.2. modern servers check the length
     787  #     of C_D, IF it is 16 bytes?
     788  # 11.2.1. "md5" -> D_NAME
     789  #         and skip to step 12.
     790  # 11.3. longer? expect and parse a "{digest}"
     791  #     prefix into -> D_NAME.
     792  #     Strip the prefix and store remaining
     793  #     bytes in -> C_D.
     794  # 11.4. Don't like D_NAME? <- AuthenticationError
     795  # 12. Compute HMAC-D_NAME of AUTHKEY,
     796  #     MESSAGE into -> M_DIGEST.
     797  # 13. Compare M_DIGEST == C_D:
     798  # 14a: Match? Send length prefix &
     799  #       b'#WELCOME#'
     800  #    <- RETURN
     801  # 14b: Mismatch? Send len prefix &
     802  #       b'#FAILURE#'
     803  #    <- CLOSE & AuthenticationError
     804  # 15.                                 Receive 4 or 4+8 byte length prefix (net
     805  #                                     order) again as in #4 into -> SIZE.
     806  # 16.                                 Receive min(SIZE, 256) bytes -> M3.
     807  # 17.                                 Compare M3 == b'#WELCOME#':
     808  # 17a.                                Match? <- RETURN
     809  # 17b.                                Mismatch? <- CLOSE & AuthenticationError
     810  #
     811  # If this RETURNed, the connection remains open: it has been authenticated.
     812  #
     813  # Length prefixes are used consistently. Even on the legacy protocol, this
     814  # was good fortune and allowed us to evolve the protocol by using the length
     815  # of the opening challenge or length of the returned digest as a signal as
     816  # to which protocol the other end supports.
     817  
     818  _ALLOWED_DIGESTS = frozenset(
     819          {b'md5', b'sha256', b'sha384', b'sha3_256', b'sha3_384'})
     820  _MAX_DIGEST_LEN = max(len(_) for _ in _ALLOWED_DIGESTS)
     821  
     822  # Old hmac-md5 only server versions from Python <=3.11 sent a message of this
     823  # length. It happens to not match the length of any supported digest so we can
     824  # use a message of this length to indicate that we should work in backwards
     825  # compatible md5-only mode without a {digest_name} prefix on our response.
     826  _MD5ONLY_MESSAGE_LENGTH = 20
     827  _MD5_DIGEST_LEN = 16
     828  _LEGACY_LENGTHS = (_MD5ONLY_MESSAGE_LENGTH, _MD5_DIGEST_LEN)
     829  
     830  
     831  def _get_digest_name_and_payload(message: bytes) -> (str, bytes):
     832      """Returns a digest name and the payload for a response hash.
     833  
     834      If a legacy protocol is detected based on the message length
     835      or contents the digest name returned will be empty to indicate
     836      legacy mode where MD5 and no digest prefix should be sent.
     837      """
     838      # modern message format: b"{digest}payload" longer than 20 bytes
     839      # legacy message format: 16 or 20 byte b"payload"
     840      if len(message) in _LEGACY_LENGTHS:
     841          # Either this was a legacy server challenge, or we're processing
     842          # a reply from a legacy client that sent an unprefixed 16-byte
     843          # HMAC-MD5 response. All messages using the modern protocol will
     844          # be longer than either of these lengths.
     845          return '', message
     846      if (message.startswith(b'{') and
     847          (curly := message.find(b'}', 1, _MAX_DIGEST_LEN+2)) > 0):
     848          digest = message[1:curly]
     849          if digest in _ALLOWED_DIGESTS:
     850              payload = message[curly+1:]
     851              return digest.decode('ascii'), payload
     852      raise AuthenticationError(
     853              'unsupported message length, missing digest prefix, '
     854              f'or unsupported digest: {message=}')
     855  
     856  
     857  def _create_response(authkey, message):
     858      """Create a MAC based on authkey and message
     859  
     860      The MAC algorithm defaults to HMAC-MD5, unless MD5 is not available or
     861      the message has a '{digest_name}' prefix. For legacy HMAC-MD5, the response
     862      is the raw MAC, otherwise the response is prefixed with '{digest_name}',
     863      e.g. b'{sha256}abcdefg...'
     864  
     865      Note: The MAC protects the entire message including the digest_name prefix.
     866      """
     867      import hmac
     868      digest_name = _get_digest_name_and_payload(message)[0]
     869      # The MAC protects the entire message: digest header and payload.
     870      if not digest_name:
     871          # Legacy server without a {digest} prefix on message.
     872          # Generate a legacy non-prefixed HMAC-MD5 reply.
     873          try:
     874              return hmac.new(authkey, message, 'md5').digest()
     875          except ValueError:
     876              # HMAC-MD5 is not available (FIPS mode?), fall back to
     877              # HMAC-SHA2-256 modern protocol. The legacy server probably
     878              # doesn't support it and will reject us anyways. :shrug:
     879              digest_name = 'sha256'
     880      # Modern protocol, indicate the digest used in the reply.
     881      response = hmac.new(authkey, message, digest_name).digest()
     882      return b'{%s}%s' % (digest_name.encode('ascii'), response)
     883  
     884  
     885  def _verify_challenge(authkey, message, response):
     886      """Verify MAC challenge
     887  
     888      If our message did not include a digest_name prefix, the client is allowed
     889      to select a stronger digest_name from _ALLOWED_DIGESTS.
     890  
     891      In case our message is prefixed, a client cannot downgrade to a weaker
     892      algorithm, because the MAC is calculated over the entire message
     893      including the '{digest_name}' prefix.
     894      """
     895      import hmac
     896      response_digest, response_mac = _get_digest_name_and_payload(response)
     897      response_digest = response_digest or 'md5'
     898      try:
     899          expected = hmac.new(authkey, message, response_digest).digest()
     900      except ValueError:
     901          raise AuthenticationError(f'{response_digest=} unsupported')
     902      if len(expected) != len(response_mac):
     903          raise AuthenticationError(
     904                  f'expected {response_digest!r} of length {len(expected)} '
     905                  f'got {len(response_mac)}')
     906      if not hmac.compare_digest(expected, response_mac):
     907          raise AuthenticationError('digest received was wrong')
     908  
     909  
     910  def deliver_challenge(connection, authkey: bytes, digest_name='sha256'):
     911      if not isinstance(authkey, bytes):
     912          raise ValueError(
     913              "Authkey must be bytes, not {0!s}".format(type(authkey)))
     914      assert MESSAGE_LENGTH > _MD5ONLY_MESSAGE_LENGTH, "protocol constraint"
     915      message = os.urandom(MESSAGE_LENGTH)
     916      message = b'{%s}%s' % (digest_name.encode('ascii'), message)
     917      # Even when sending a challenge to a legacy client that does not support
     918      # digest prefixes, they'll take the entire thing as a challenge and
     919      # respond to it with a raw HMAC-MD5.
     920      connection.send_bytes(_CHALLENGE + message)
     921      response = connection.recv_bytes(256)        # reject large message
     922      try:
     923          _verify_challenge(authkey, message, response)
     924      except AuthenticationError:
     925          connection.send_bytes(_FAILURE)
     926          raise
     927      else:
     928          connection.send_bytes(_WELCOME)
     929  
     930  
     931  def answer_challenge(connection, authkey: bytes):
     932      if not isinstance(authkey, bytes):
     933          raise ValueError(
     934              "Authkey must be bytes, not {0!s}".format(type(authkey)))
     935      message = connection.recv_bytes(256)         # reject large message
     936      if not message.startswith(_CHALLENGE):
     937          raise AuthenticationError(
     938                  f'Protocol error, expected challenge: {message=}')
     939      message = message[len(_CHALLENGE):]
     940      if len(message) < _MD5ONLY_MESSAGE_LENGTH:
     941          raise AuthenticationError('challenge too short: {len(message)} bytes')
     942      digest = _create_response(authkey, message)
     943      connection.send_bytes(digest)
     944      response = connection.recv_bytes(256)        # reject large message
     945      if response != _WELCOME:
     946          raise AuthenticationError('digest sent was rejected')
     947  
     948  #
     949  # Support for using xmlrpclib for serialization
     950  #
     951  
     952  class ESC[4;38;5;81mConnectionWrapper(ESC[4;38;5;149mobject):
     953      def __init__(self, conn, dumps, loads):
     954          self._conn = conn
     955          self._dumps = dumps
     956          self._loads = loads
     957          for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
     958              obj = getattr(conn, attr)
     959              setattr(self, attr, obj)
     960      def send(self, obj):
     961          s = self._dumps(obj)
     962          self._conn.send_bytes(s)
     963      def recv(self):
     964          s = self._conn.recv_bytes()
     965          return self._loads(s)
     966  
     967  def _xml_dumps(obj):
     968      return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
     969  
     970  def _xml_loads(s):
     971      (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
     972      return obj
     973  
     974  class ESC[4;38;5;81mXmlListener(ESC[4;38;5;149mListener):
     975      def accept(self):
     976          global xmlrpclib
     977          import xmlrpc.client as xmlrpclib
     978          obj = Listener.accept(self)
     979          return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
     980  
     981  def XmlClient(*args, **kwds):
     982      global xmlrpclib
     983      import xmlrpc.client as xmlrpclib
     984      return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
     985  
     986  #
     987  # Wait
     988  #
     989  
     990  if sys.platform == 'win32':
     991  
     992      def _exhaustive_wait(handles, timeout):
     993          # Return ALL handles which are currently signalled.  (Only
     994          # returning the first signalled might create starvation issues.)
     995          L = list(handles)
     996          ready = []
     997          while L:
     998              res = _winapi.WaitForMultipleObjects(L, False, timeout)
     999              if res == WAIT_TIMEOUT:
    1000                  break
    1001              elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
    1002                  res -= WAIT_OBJECT_0
    1003              elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
    1004                  res -= WAIT_ABANDONED_0
    1005              else:
    1006                  raise RuntimeError('Should not get here')
    1007              ready.append(L[res])
    1008              L = L[res+1:]
    1009              timeout = 0
    1010          return ready
    1011  
    # Error codes from an overlapped read that wait() treats as "this object
    # is ready" instead of re-raising the OSError.
    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}
    1013  
    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.

        Objects with a fileno attribute are probed with a zero-length
        overlapped read; any other object is used as a wait handle through
        its __index__() value.  A timeout of None waits indefinitely, a
        negative timeout is treated as 0, and any other value is scaled by
        1000 and rounded to an int before being handed to the wait call.
        '''
        if timeout is None:
            timeout = INFINITE
        elif timeout < 0:
            timeout = 0
        else:
            timeout = int(timeout * 1000 + 0.5)

        object_list = list(object_list)
        # Maps each handle that is waited on back to its originating object.
        waithandle_to_obj = {}
        # Overlapped reads still pending; they are cancelled in the finally.
        ov_list = []
        ready_objects = set()
        ready_handles = set()

        try:
            for o in object_list:
                try:
                    fileno = getattr(o, 'fileno')
                except AttributeError:
                    # No fileno: wait on the object's own handle value.
                    waithandle_to_obj[o.__index__()] = o
                else:
                    # start an overlapped read of length zero
                    try:
                        ov, err = _winapi.ReadFile(fileno(), 0, True)
                    except OSError as e:
                        ov, err = None, e.winerror
                        if err not in _ready_errors:
                            raise
                    if err == _winapi.ERROR_IO_PENDING:
                        # Nothing available yet: wait on the read's event.
                        ov_list.append(ov)
                        waithandle_to_obj[ov.event] = o
                    else:
                        # If o.fileno() is an overlapped pipe handle and
                        # err == 0 then there is a zero length message
                        # in the pipe, but it HAS NOT been consumed...
                        if ov and sys.getwindowsversion()[:2] >= (6, 2):
                            # ... except on Windows 8 and later, where
                            # the message HAS been consumed.
                            try:
                                _, err = ov.GetOverlappedResult(False)
                            except OSError as e:
                                err = e.winerror
                            if not err and hasattr(o, '_got_empty_message'):
                                o._got_empty_message = True
                        ready_objects.add(o)
                        # Something is already ready, so do not block below.
                        timeout = 0

            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
        finally:
            # request that overlapped reads stop
            for ov in ov_list:
                ov.cancel()

            # wait for all overlapped reads to stop
            for ov in ov_list:
                try:
                    _, err = ov.GetOverlappedResult(True)
                except OSError as e:
                    err = e.winerror
                    if err not in _ready_errors:
                        raise
                # Any result other than "aborted" means the read finished
                # before the cancel took effect, so the object is ready.
                if err != _winapi.ERROR_OPERATION_ABORTED:
                    o = waithandle_to_obj[ov.event]
                    ready_objects.add(o)
                    if err == 0:
                        # If o.fileno() is an overlapped pipe handle then
                        # a zero length message HAS been consumed.
                        if hasattr(o, '_got_empty_message'):
                            o._got_empty_message = True

        ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
        # Report the ready objects in the order they were passed in.
        return [o for o in object_list if o in ready_objects]
    1091  
    1092  else:
    1093  
    1094      import selectors
    1095  
    # poll/select have the advantage of not requiring any extra file
    # descriptor, contrarily to epoll/kqueue (also, they require a single
    # syscall).  Use PollSelector when the selectors module provides it and
    # SelectSelector otherwise.
    _WaitSelector = getattr(selectors, 'PollSelector',
                            selectors.SelectSelector)
    1103  
    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        With a timeout of None this blocks until something is ready;
        otherwise an empty list is returned once the timeout has elapsed.
        '''
        bounded = timeout is not None
        with _WaitSelector() as sel:
            for item in object_list:
                sel.register(item, selectors.EVENT_READ)

            deadline = time.monotonic() + timeout if bounded else None

            while True:
                events = sel.select(timeout)
                if events:
                    return [key.fileobj for key, _ in events]
                if not bounded:
                    continue
                # Nothing ready yet: retry with whatever time is left.
                timeout = deadline - time.monotonic()
                if timeout < 0:
                    return events
    1126  
    1127  #
    1128  # Make connection and socket objects shareable if possible
    1129  #
    1130  
    1131  if sys.platform == 'win32':
    def reduce_connection(conn):
        """Reduce a Connection for pickling via a duplicated socket.

        Returns (rebuild_connection, args) where args holds a DupSocket
        plus the readable and writable flags of conn.
        """
        fd = conn.fileno()
        with socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) as sock:
            from . import resource_sharer
            dup = resource_sharer.DupSocket(sock)
            rebuild_args = (dup, conn.readable, conn.writable)
            return rebuild_connection, rebuild_args
    def rebuild_connection(ds, readable, writable):
        """Recreate a Connection from the state made by reduce_connection()."""
        # ds.detach() yields a socket-like object; its own detach() gives
        # the raw handle that Connection wraps.
        handle = ds.detach().detach()
        return Connection(handle, readable, writable)
    # Use reduce_connection whenever a Connection is reduced for pickling.
    reduction.register(Connection, reduce_connection)
    1142  
    def reduce_pipe_connection(conn):
        """Reduce a PipeConnection for pickling via a duplicated handle.

        The duplicate is requested with read and/or write access matching
        the readable and writable flags of conn.
        """
        access = 0
        if conn.readable:
            access |= _winapi.FILE_GENERIC_READ
        if conn.writable:
            access |= _winapi.FILE_GENERIC_WRITE
        dup = reduction.DupHandle(conn.fileno(), access)
        return rebuild_pipe_connection, (dup, conn.readable, conn.writable)
    def rebuild_pipe_connection(dh, readable, writable):
        """Recreate a PipeConnection from reduce_pipe_connection() state."""
        return PipeConnection(dh.detach(), readable, writable)
    # Use reduce_pipe_connection whenever a PipeConnection is reduced.
    reduction.register(PipeConnection, reduce_pipe_connection)
    1152  
    1153  else:
    def reduce_connection(conn):
        """Reduce a Connection for pickling via a duplicated descriptor.

        Returns (rebuild_connection, args) where args holds a DupFd plus
        the readable and writable flags of conn.
        """
        dup = reduction.DupFd(conn.fileno())
        rebuild_args = (dup, conn.readable, conn.writable)
        return rebuild_connection, rebuild_args
    def rebuild_connection(df, readable, writable):
        """Recreate a Connection from the state made by reduce_connection()."""
        return Connection(df.detach(), readable, writable)
    # Use reduce_connection whenever a Connection is reduced for pickling.
    reduction.register(Connection, reduce_connection)