python (3.12.0)
1 """Download files with progress indicators.
2 """
3 import email.message
4 import logging
5 import mimetypes
6 import os
7 from typing import Iterable, Optional, Tuple
8
9 from pip._vendor.requests.models import CONTENT_CHUNK_SIZE, Response
10
11 from pip._internal.cli.progress_bars import get_download_progress_renderer
12 from pip._internal.exceptions import NetworkConnectionError
13 from pip._internal.models.index import PyPI
14 from pip._internal.models.link import Link
15 from pip._internal.network.cache import is_from_cache
16 from pip._internal.network.session import PipSession
17 from pip._internal.network.utils import HEADERS, raise_for_status, response_chunks
18 from pip._internal.utils.misc import format_size, redact_auth_from_url, splitext
19
20 logger = logging.getLogger(__name__)
21
22
def _get_http_response_size(resp: Response) -> Optional[int]:
    """Return the size announced by the response's Content-Length header.

    Returns ``None`` when the header is missing or its value cannot be
    converted to an integer.
    """
    try:
        declared_length = resp.headers["content-length"]
        size = int(declared_length)
    except (KeyError, TypeError, ValueError):
        return None
    return size
28
29
def _prepare_download(
    resp: Response,
    link: Link,
    progress_bar: str,
) -> Iterable[bytes]:
    """Log the download and return the response body as an iterable of chunks.

    The chunks are wrapped in a progress renderer unless the logger is
    quieter than INFO, the response came from the cache, or the announced
    size is known and does not exceed 40 * 1000.
    """
    total_length = _get_http_response_size(resp)

    # Links on PyPI's file storage domain are logged via their show_url;
    # everything else is logged via the URL without its fragment.
    hosted_on_pypi = link.netloc == PyPI.file_storage_domain
    url = link.show_url if hosted_on_pypi else link.url_without_fragment

    logged_url = redact_auth_from_url(url)
    if total_length:
        logged_url = "{} ({})".format(logged_url, format_size(total_length))

    message = "Using cached %s" if is_from_cache(resp) else "Downloading %s"
    logger.info(message, logged_url)

    # An unknown size (None or 0) still gets a progress indicator; a known
    # size only gets one when it is larger than the threshold.
    show_progress = (
        logger.getEffectiveLevel() <= logging.INFO
        and not is_from_cache(resp)
        and (not total_length or total_length > (40 * 1000))
    )

    chunks = response_chunks(resp, CONTENT_CHUNK_SIZE)
    if show_progress:
        renderer = get_download_progress_renderer(
            bar_type=progress_bar, size=total_length
        )
        return renderer(chunks)
    return chunks
70
71
def sanitize_content_filename(filename: str) -> str:
    """
    Sanitize the "filename" value from a Content-Disposition header.

    Only the final path component is kept, so any directory parts supplied
    by the server (including ".." segments) are discarded.
    """
    return os.path.basename(filename)


def parse_content_disposition(content_disposition: str, default_filename: str) -> str:
    """
    Parse the "filename" value from a Content-Disposition header, and
    return the default filename if the result is empty.

    Both the plain ``filename="..."`` form and the RFC 2231 extended form
    (``filename*=charset'lang'value``) are supported. The result is always
    reduced to its final path component before being returned.
    """
    # Imported here (rather than at module level) so this function does not
    # depend on any change to the file's import block.
    from email.utils import collapse_rfc2231_value

    # Message.get_param() reads the Content-Type header by default, so the
    # Content-Disposition value is stored under that name to reuse its
    # parameter parser.
    m = email.message.Message()
    m["content-type"] = content_disposition
    filename = m.get_param("filename")
    if isinstance(filename, tuple):
        # RFC 2231 encoded parameters come back as a
        # (charset, language, value) tuple; calling str() on it would yield
        # the tuple's repr instead of the file name.
        filename = collapse_rfc2231_value(filename)
    if filename:
        # We need to sanitize the filename to prevent directory traversal
        # in case the filename contains ".." path parts.
        filename = sanitize_content_filename(str(filename))
    return filename or default_filename
92
93
def _get_http_response_filename(resp: Response, link: Link) -> str:
    """Choose a filename for the downloaded response.

    A filename from the Content-Disposition header is preferred over the
    link's own filename. If the chosen name has no extension, one is
    appended from the Content-Type header or, failing that, from the
    response URL when it differs from the link URL.
    """
    filename = link.filename  # fallback
    # Have a look at the Content-Disposition header for a better guess
    content_disposition = resp.headers.get("content-disposition")
    if content_disposition:
        filename = parse_content_disposition(content_disposition, filename)

    # Nothing more to do when the name already carries an extension.
    if splitext(filename)[1]:
        return filename

    guessed_ext = mimetypes.guess_extension(resp.headers.get("content-type", ""))
    if guessed_ext:
        return filename + guessed_ext

    # The response URL differing from the link URL presumably means the
    # request was redirected; its extension is used as the last resort.
    if link.url != resp.url:
        url_ext = os.path.splitext(resp.url)[1]
        if url_ext:
            return filename + url_ext
    return filename
113
114
def _http_get_download(session: PipSession, link: Link) -> Response:
    """Issue a streaming GET for the link and return the response.

    The URL fragment (everything from the first "#") is dropped before the
    request is made, and raise_for_status() is applied to the response
    before it is returned.
    """
    target_url, _, _ = link.url.partition("#")
    response = session.get(target_url, headers=HEADERS, stream=True)
    raise_for_status(response)
    return response
120
121
class Downloader:
    """Callable that downloads a single link into a directory."""

    def __init__(
        self,
        session: PipSession,
        progress_bar: str,
    ) -> None:
        self._session = session
        self._progress_bar = progress_bar

    def __call__(self, link: Link, location: str) -> Tuple[str, str]:
        """Download the file given by link into location.

        Returns the path of the written file together with the response's
        Content-Type header value (an empty string when it is absent).
        """
        try:
            resp = _http_get_download(self._session, link)
        except NetworkConnectionError as exc:
            assert exc.response is not None
            status_code = exc.response.status_code
            logger.critical("HTTP error %s while getting %s", status_code, link)
            raise

        filepath = os.path.join(location, _get_http_response_filename(resp, link))

        # Prepare the chunk iterable (which also logs the download) before
        # the destination file is opened.
        chunks = _prepare_download(resp, link, self._progress_bar)
        with open(filepath, "wb") as content_file:
            content_file.writelines(chunks)

        return filepath, resp.headers.get("Content-Type", "")
151
152
class BatchDownloader:
    """Callable that downloads several links into a directory, lazily."""

    def __init__(
        self,
        session: PipSession,
        progress_bar: str,
    ) -> None:
        self._session = session
        self._progress_bar = progress_bar

    def __call__(
        self, links: Iterable[Link], location: str
    ) -> Iterable[Tuple[Link, Tuple[str, str]]]:
        """Download the files given by links into location.

        This is a generator: each link is fetched only when the next item
        is requested. Every item pairs the link with the written file path
        and the response's Content-Type header value (an empty string when
        it is absent).
        """
        for link in links:
            try:
                resp = _http_get_download(self._session, link)
            except NetworkConnectionError as exc:
                failed_response = exc.response
                assert failed_response is not None
                logger.critical(
                    "HTTP error %s while getting %s",
                    failed_response.status_code,
                    link,
                )
                raise

            filename = _get_http_response_filename(resp, link)
            filepath = os.path.join(location, filename)

            # Prepare the chunk iterable (which also logs the download)
            # before the destination file is opened.
            body = _prepare_download(resp, link, self._progress_bar)
            with open(filepath, "wb") as content_file:
                for piece in body:
                    content_file.write(piece)

            result = (filepath, resp.headers.get("Content-Type", ""))
            yield link, result