python (3.11.7)
       1  """Download files with progress indicators.
       2  """
       3  import email.message
       4  import logging
       5  import mimetypes
       6  import os
       7  from typing import Iterable, Optional, Tuple
       8  
       9  from pip._vendor.requests.models import CONTENT_CHUNK_SIZE, Response
      10  
      11  from pip._internal.cli.progress_bars import get_download_progress_renderer
      12  from pip._internal.exceptions import NetworkConnectionError
      13  from pip._internal.models.index import PyPI
      14  from pip._internal.models.link import Link
      15  from pip._internal.network.cache import is_from_cache
      16  from pip._internal.network.session import PipSession
      17  from pip._internal.network.utils import HEADERS, raise_for_status, response_chunks
      18  from pip._internal.utils.misc import format_size, redact_auth_from_url, splitext
      19  
      20  logger = logging.getLogger(__name__)
      21  
      22  
      23  def _get_http_response_size(resp: Response) -> Optional[int]:
      24      try:
      25          return int(resp.headers["content-length"])
      26      except (ValueError, KeyError, TypeError):
      27          return None
      28  
      29  
      30  def _prepare_download(
      31      resp: Response,
      32      link: Link,
      33      progress_bar: str,
      34  ) -> Iterable[bytes]:
      35      total_length = _get_http_response_size(resp)
      36  
      37      if link.netloc == PyPI.file_storage_domain:
      38          url = link.show_url
      39      else:
      40          url = link.url_without_fragment
      41  
      42      logged_url = redact_auth_from_url(url)
      43  
      44      if total_length:
      45          logged_url = "{} ({})".format(logged_url, format_size(total_length))
      46  
      47      if is_from_cache(resp):
      48          logger.info("Using cached %s", logged_url)
      49      else:
      50          logger.info("Downloading %s", logged_url)
      51  
      52      if logger.getEffectiveLevel() > logging.INFO:
      53          show_progress = False
      54      elif is_from_cache(resp):
      55          show_progress = False
      56      elif not total_length:
      57          show_progress = True
      58      elif total_length > (40 * 1000):
      59          show_progress = True
      60      else:
      61          show_progress = False
      62  
      63      chunks = response_chunks(resp, CONTENT_CHUNK_SIZE)
      64  
      65      if not show_progress:
      66          return chunks
      67  
      68      renderer = get_download_progress_renderer(bar_type=progress_bar, size=total_length)
      69      return renderer(chunks)
      70  
      71  
      72  def sanitize_content_filename(filename: str) -> str:
      73      """
      74      Sanitize the "filename" value from a Content-Disposition header.
      75      """
      76      return os.path.basename(filename)
      77  
      78  
      79  def parse_content_disposition(content_disposition: str, default_filename: str) -> str:
      80      """
      81      Parse the "filename" value from a Content-Disposition header, and
      82      return the default filename if the result is empty.
      83      """
      84      m = email.message.Message()
      85      m["content-type"] = content_disposition
      86      filename = m.get_param("filename")
      87      if filename:
      88          # We need to sanitize the filename to prevent directory traversal
      89          # in case the filename contains ".." path parts.
      90          filename = sanitize_content_filename(str(filename))
      91      return filename or default_filename
      92  
      93  
      94  def _get_http_response_filename(resp: Response, link: Link) -> str:
      95      """Get an ideal filename from the given HTTP response, falling back to
      96      the link filename if not provided.
      97      """
      98      filename = link.filename  # fallback
      99      # Have a look at the Content-Disposition header for a better guess
     100      content_disposition = resp.headers.get("content-disposition")
     101      if content_disposition:
     102          filename = parse_content_disposition(content_disposition, filename)
     103      ext: Optional[str] = splitext(filename)[1]
     104      if not ext:
     105          ext = mimetypes.guess_extension(resp.headers.get("content-type", ""))
     106          if ext:
     107              filename += ext
     108      if not ext and link.url != resp.url:
     109          ext = os.path.splitext(resp.url)[1]
     110          if ext:
     111              filename += ext
     112      return filename
     113  
     114  
     115  def _http_get_download(session: PipSession, link: Link) -> Response:
     116      target_url = link.url.split("#", 1)[0]
     117      resp = session.get(target_url, headers=HEADERS, stream=True)
     118      raise_for_status(resp)
     119      return resp
     120  
     121  
     122  class ESC[4;38;5;81mDownloader:
     123      def __init__(
     124          self,
     125          session: PipSession,
     126          progress_bar: str,
     127      ) -> None:
     128          self._session = session
     129          self._progress_bar = progress_bar
     130  
     131      def __call__(self, link: Link, location: str) -> Tuple[str, str]:
     132          """Download the file given by link into location."""
     133          try:
     134              resp = _http_get_download(self._session, link)
     135          except NetworkConnectionError as e:
     136              assert e.response is not None
     137              logger.critical(
     138                  "HTTP error %s while getting %s", e.response.status_code, link
     139              )
     140              raise
     141  
     142          filename = _get_http_response_filename(resp, link)
     143          filepath = os.path.join(location, filename)
     144  
     145          chunks = _prepare_download(resp, link, self._progress_bar)
     146          with open(filepath, "wb") as content_file:
     147              for chunk in chunks:
     148                  content_file.write(chunk)
     149          content_type = resp.headers.get("Content-Type", "")
     150          return filepath, content_type
     151  
     152  
     153  class ESC[4;38;5;81mBatchDownloader:
     154      def __init__(
     155          self,
     156          session: PipSession,
     157          progress_bar: str,
     158      ) -> None:
     159          self._session = session
     160          self._progress_bar = progress_bar
     161  
     162      def __call__(
     163          self, links: Iterable[Link], location: str
     164      ) -> Iterable[Tuple[Link, Tuple[str, str]]]:
     165          """Download the files given by links into location."""
     166          for link in links:
     167              try:
     168                  resp = _http_get_download(self._session, link)
     169              except NetworkConnectionError as e:
     170                  assert e.response is not None
     171                  logger.critical(
     172                      "HTTP error %s while getting %s",
     173                      e.response.status_code,
     174                      link,
     175                  )
     176                  raise
     177  
     178              filename = _get_http_response_filename(resp, link)
     179              filepath = os.path.join(location, filename)
     180  
     181              chunks = _prepare_download(resp, link, self._progress_bar)
     182              with open(filepath, "wb") as content_file:
     183                  for chunk in chunks:
     184                      content_file.write(chunk)
     185              content_type = resp.headers.get("Content-Type", "")
     186              yield link, (filepath, content_type)