python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
pip/
_internal/
network/
xmlrpc.py
       1  """xmlrpclib.Transport implementation
       2  """
       3  
       4  import logging
       5  import urllib.parse
       6  import xmlrpc.client
       7  from typing import TYPE_CHECKING, Tuple
       8  
       9  from pip._internal.exceptions import NetworkConnectionError
      10  from pip._internal.network.session import PipSession
      11  from pip._internal.network.utils import raise_for_status
      12  
      13  if TYPE_CHECKING:
      14      from xmlrpc.client import _HostType, _Marshallable
      15  
      16  logger = logging.getLogger(__name__)
      17  
      18  
      19  class ESC[4;38;5;81mPipXmlrpcTransport(ESC[4;38;5;149mxmlrpcESC[4;38;5;149m.ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mTransport):
      20      """Provide a `xmlrpclib.Transport` implementation via a `PipSession`
      21      object.
      22      """
      23  
      24      def __init__(
      25          self, index_url: str, session: PipSession, use_datetime: bool = False
      26      ) -> None:
      27          super().__init__(use_datetime)
      28          index_parts = urllib.parse.urlparse(index_url)
      29          self._scheme = index_parts.scheme
      30          self._session = session
      31  
      32      def request(
      33          self,
      34          host: "_HostType",
      35          handler: str,
      36          request_body: bytes,
      37          verbose: bool = False,
      38      ) -> Tuple["_Marshallable", ...]:
      39          assert isinstance(host, str)
      40          parts = (self._scheme, host, handler, None, None, None)
      41          url = urllib.parse.urlunparse(parts)
      42          try:
      43              headers = {"Content-Type": "text/xml"}
      44              response = self._session.post(
      45                  url,
      46                  data=request_body,
      47                  headers=headers,
      48                  stream=True,
      49              )
      50              raise_for_status(response)
      51              self.verbose = verbose
      52              return self.parse_response(response.raw)
      53          except NetworkConnectionError as exc:
      54              assert exc.response
      55              logger.critical(
      56                  "HTTP error %s while getting %s",
      57                  exc.response.status_code,
      58                  url,
      59              )
      60              raise