python (3.11.7)
1 """Dependency Resolution
2
3 The dependency resolution in pip is performed as follows:
4
5 for top-level requirements:
6 a. only one spec allowed per project, regardless of conflicts or not.
7 otherwise a "double requirement" exception is raised
8 b. they override sub-dependency requirements.
9 for sub-dependencies
10 a. "first found, wins" (where the order is breadth first)
11 """
12
13 # The following comment should be removed at some point in the future.
14 # mypy: strict-optional=False
15
16 import logging
17 import sys
18 from collections import defaultdict
19 from itertools import chain
20 from typing import DefaultDict, Iterable, List, Optional, Set, Tuple
21
22 from pip._vendor.packaging import specifiers
23 from pip._vendor.packaging.requirements import Requirement
24
25 from pip._internal.cache import WheelCache
26 from pip._internal.exceptions import (
27 BestVersionAlreadyInstalled,
28 DistributionNotFound,
29 HashError,
30 HashErrors,
31 InstallationError,
32 NoneMetadataError,
33 UnsupportedPythonVersion,
34 )
35 from pip._internal.index.package_finder import PackageFinder
36 from pip._internal.metadata import BaseDistribution
37 from pip._internal.models.link import Link
38 from pip._internal.models.wheel import Wheel
39 from pip._internal.operations.prepare import RequirementPreparer
40 from pip._internal.req.req_install import (
41 InstallRequirement,
42 check_invalid_constraint_type,
43 )
44 from pip._internal.req.req_set import RequirementSet
45 from pip._internal.resolution.base import BaseResolver, InstallRequirementProvider
46 from pip._internal.utils import compatibility_tags
47 from pip._internal.utils.compatibility_tags import get_supported
48 from pip._internal.utils.direct_url_helpers import direct_url_from_link
49 from pip._internal.utils.logging import indent_log
50 from pip._internal.utils.misc import normalize_version_info
51 from pip._internal.utils.packaging import check_requires_python
52
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maps a requirement name to the list of requirements recorded as its
# dependencies; entries are appended by Resolver._resolve_one and read back
# by Resolver.get_installation_order.
DiscoveredDependencies = DefaultDict[str, List[InstallRequirement]]
56
57
def _check_dist_requires_python(
    dist: BaseDistribution,
    version_info: Tuple[int, int, int],
    ignore_requires_python: bool = False,
) -> None:
    """
    Check whether the given Python version is compatible with a distribution's
    "Requires-Python" value.

    An invalid "Requires-Python" specifier is logged as a warning and then
    treated as if the check had passed.

    :param dist: The distribution whose "Requires-Python" value is checked.
    :param version_info: A 3-tuple of ints representing the Python
        major-minor-micro version to check.
    :param ignore_requires_python: Whether to ignore the "Requires-Python"
        value if the given Python version isn't compatible. When true, the
        failure is only logged at debug level.

    :raises NoneMetadataError: When reading ``dist.requires_python`` fails
        with ``FileNotFoundError``.
    :raises UnsupportedPythonVersion: When the given Python version isn't
        compatible and ``ignore_requires_python`` is false.
    """
    # This idiosyncratically converts the SpecifierSet to str and lets
    # check_requires_python then parse it again into SpecifierSet. But this
    # is the legacy resolver so I'm just not going to bother refactoring.
    try:
        requires_python = str(dist.requires_python)
    except FileNotFoundError as e:
        # Chain explicitly so the traceback shows the missing-file error as
        # the direct cause of the metadata error.
        raise NoneMetadataError(dist, str(e)) from e

    try:
        is_compatible = check_requires_python(
            requires_python,
            version_info=version_info,
        )
    except specifiers.InvalidSpecifier as exc:
        # A malformed specifier cannot be evaluated; warn and carry on
        # rather than failing the whole resolution.
        logger.warning(
            "Package %r has an invalid Requires-Python: %s", dist.raw_name, exc
        )
        return

    if is_compatible:
        return

    version = ".".join(map(str, version_info))
    if ignore_requires_python:
        logger.debug(
            "Ignoring failed Requires-Python check for package %r: %s not in %r",
            dist.raw_name,
            version,
            requires_python,
        )
        return

    raise UnsupportedPythonVersion(
        "Package {!r} requires a different Python: {} not in {!r}".format(
            dist.raw_name, version, requires_python
        )
    )
111
112
class Resolver(BaseResolver):
    """Resolve which packages need to be installed/uninstalled to perform
    the requested operation without breaking the requirements of any package.

    Requirements are added to a ``RequirementSet`` and prepared one at a
    time; the dependencies discovered while preparing a requirement are
    queued and processed in turn.
    """

    # The only values accepted for ``upgrade_strategy``.
    _allowed_strategies = {"eager", "only-if-needed", "to-satisfy-only"}

    def __init__(
        self,
        preparer: RequirementPreparer,
        finder: PackageFinder,
        wheel_cache: Optional[WheelCache],
        make_install_req: InstallRequirementProvider,
        use_user_site: bool,
        ignore_dependencies: bool,
        ignore_installed: bool,
        ignore_requires_python: bool,
        force_reinstall: bool,
        upgrade_strategy: str,
        py_version_info: Optional[Tuple[int, ...]] = None,
    ) -> None:
        """Store the collaborators and options used during resolution.

        :param preparer: Prepares editable, installed and linked requirements
            into distributions.
        :param finder: Used to look up the best candidate for a requirement.
        :param wheel_cache: Optional cache consulted when populating links.
        :param make_install_req: Callable invoked with a requirement string
            and the parent requirement to build a dependency's
            InstallRequirement.
        :param use_user_site: Passed to ``check_if_exists`` and consulted when
            deciding whether an installed distribution may be reinstalled.
        :param ignore_dependencies: When true, dependencies of a requirement
            are not scanned.
        :param ignore_installed: When true, installed distributions are not
            checked.
        :param ignore_requires_python: When true, a failed "Requires-Python"
            check is logged instead of raised.
        :param force_reinstall: When true, satisfied requirements are marked
            for reinstallation.
        :param upgrade_strategy: One of ``_allowed_strategies``.
        :param py_version_info: Python version to check "Requires-Python"
            against. Defaults to the first three fields of
            ``sys.version_info``; any other value is passed through
            ``normalize_version_info``.
        """
        super().__init__()
        assert upgrade_strategy in self._allowed_strategies

        if py_version_info is None:
            py_version_info = sys.version_info[:3]
        else:
            py_version_info = normalize_version_info(py_version_info)

        self._py_version_info = py_version_info

        self.preparer = preparer
        self.finder = finder
        self.wheel_cache = wheel_cache

        self.upgrade_strategy = upgrade_strategy
        self.force_reinstall = force_reinstall
        self.ignore_dependencies = ignore_dependencies
        self.ignore_installed = ignore_installed
        self.ignore_requires_python = ignore_requires_python
        self.use_user_site = use_user_site
        self._make_install_req = make_install_req

        # Parent requirement name -> requirements added on its behalf.
        self._discovered_dependencies: DiscoveredDependencies = defaultdict(list)

    def resolve(
        self, root_reqs: List[InstallRequirement], check_supported_wheels: bool
    ) -> RequirementSet:
        """Resolve what operations need to be done

        As a side-effect of this method, the packages (and their dependencies)
        are downloaded, unpacked and prepared for installation. This
        preparation is done by ``pip.operations.prepare``.

        Once PyPI has static dependency metadata available, it would be
        possible to move the preparation to become a step separated from
        dependency resolution.

        :param root_reqs: The requirements to start resolution from.
        :param check_supported_wheels: Forwarded to the RequirementSet;
            enables the supported-wheel check when requirements are added.
        :return: The populated RequirementSet.
        :raises HashErrors: When one or more requirements raised a HashError
            while being prepared; all of them are collected first.
        """
        requirement_set = RequirementSet(check_supported_wheels=check_supported_wheels)
        for req in root_reqs:
            if req.constraint:
                check_invalid_constraint_type(req)
            self._add_requirement_to_set(requirement_set, req)

        # Actually prepare the files, and collect any exceptions. Most hash
        # exceptions cannot be checked ahead of time, because
        # _populate_link() needs to be called before we can make decisions
        # based on link type.
        #
        # discovered_reqs is extended while the chain below is being
        # iterated, so requirements found along the way are resolved too.
        discovered_reqs: List[InstallRequirement] = []
        hash_errors = HashErrors()
        for req in chain(requirement_set.all_requirements, discovered_reqs):
            try:
                discovered_reqs.extend(self._resolve_one(requirement_set, req))
            except HashError as exc:
                exc.req = req
                hash_errors.append(exc)

        if hash_errors:
            raise hash_errors

        return requirement_set

    def _add_requirement_to_set(
        self,
        requirement_set: RequirementSet,
        install_req: InstallRequirement,
        parent_req_name: Optional[str] = None,
        extras_requested: Optional[Iterable[str]] = None,
    ) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]]:
        """Add install_req as a requirement to install.

        :param requirement_set: The set the requirement is added to.
        :param install_req: The requirement to add.
        :param parent_req_name: The name of the requirement that needed this
            added. The name is used because when multiple unnamed requirements
            resolve to the same name, we could otherwise end up with dependency
            links that point outside the Requirements set. parent_req must
            already be added. Note that None implies that this is a user
            supplied requirement, vs an inferred one.
        :param extras_requested: an iterable of extras used to evaluate the
            environment markers.
        :return: A ``(to_scan, to_add_to_parent)`` tuple. ``to_scan`` holds
            the additional requirements to scan: ``[]`` if the requirement is
            not applicable or was already seen, otherwise a one-element list
            with the requirement that now needs scanning. ``to_add_to_parent``
            is the requirement to record as a dependency of the parent, or
            None when there is nothing to record.
        :raises InstallationError: For an unsupported wheel, a conflicting
            "double requirement", or a link that cannot satisfy a constraint.
        """
        # If the markers do not match, ignore this requirement.
        if not install_req.match_markers(extras_requested):
            logger.info(
                "Ignoring %s: markers '%s' don't match your environment",
                install_req.name,
                install_req.markers,
            )
            return [], None

        # If the wheel is not supported, raise an error.
        # Should check this after filtering out based on environment markers to
        # allow specifying different wheels based on the environment/OS, in a
        # single requirements file.
        if install_req.link and install_req.link.is_wheel:
            wheel = Wheel(install_req.link.filename)
            # Only look up the supported tags when the check is enabled.
            if requirement_set.check_supported_wheels and not wheel.supported(
                compatibility_tags.get_supported()
            ):
                raise InstallationError(
                    "{} is not a supported wheel on this platform.".format(
                        wheel.filename
                    )
                )

        # This next bit is really a sanity check.
        assert (
            not install_req.user_supplied or parent_req_name is None
        ), "a user supplied req shouldn't have a parent"

        # Unnamed requirements are scanned again and the requirement won't be
        # added as a dependency until after scanning.
        if not install_req.name:
            requirement_set.add_unnamed_requirement(install_req)
            return [install_req], None

        try:
            existing_req: Optional[
                InstallRequirement
            ] = requirement_set.get_requirement(install_req.name)
        except KeyError:
            existing_req = None

        # Two top-level (parent-less) requirements for the same project with
        # the same extras but different specifiers are rejected outright.
        has_conflicting_requirement = (
            parent_req_name is None
            and existing_req
            and not existing_req.constraint
            and existing_req.extras == install_req.extras
            and existing_req.req
            and install_req.req
            and existing_req.req.specifier != install_req.req.specifier
        )
        if has_conflicting_requirement:
            raise InstallationError(
                "Double requirement given: {} (already in {}, name={!r})".format(
                    install_req, existing_req, install_req.name
                )
            )

        # When no existing requirement exists, add the requirement as a
        # dependency and it will be scanned again after.
        if not existing_req:
            requirement_set.add_named_requirement(install_req)
            # We'd want to rescan this requirement later
            return [install_req], install_req

        # Assume there's no need to scan, and that we've already
        # encountered this for scanning.
        if install_req.constraint or not existing_req.constraint:
            return [], existing_req

        # From here on, existing_req is a constraint and install_req is a
        # real requirement that has to be reconciled with it.
        does_not_satisfy_constraint = install_req.link and not (
            existing_req.link and install_req.link.path == existing_req.link.path
        )
        if does_not_satisfy_constraint:
            raise InstallationError(
                "Could not satisfy constraints for '{}': "
                "installation from path or url cannot be "
                "constrained to a version".format(install_req.name)
            )
        # If we're now installing a constraint, mark the existing
        # object for real installation.
        existing_req.constraint = False
        # If we're now installing a user supplied requirement,
        # mark the existing object as such.
        if install_req.user_supplied:
            existing_req.user_supplied = True
        existing_req.extras = tuple(
            sorted(set(existing_req.extras) | set(install_req.extras))
        )
        logger.debug(
            "Setting %s extras to: %s",
            existing_req,
            existing_req.extras,
        )
        # Return the existing requirement for addition to the parent and
        # scanning again.
        return [existing_req], existing_req

    def _is_upgrade_allowed(self, req: InstallRequirement) -> bool:
        """Return whether ``req`` may be upgraded under the upgrade strategy.

        "to-satisfy-only" never allows it, "eager" always does, and
        "only-if-needed" allows it for user-supplied requirements and
        constraints.
        """
        if self.upgrade_strategy == "to-satisfy-only":
            return False
        elif self.upgrade_strategy == "eager":
            return True
        else:
            assert self.upgrade_strategy == "only-if-needed"
            return req.user_supplied or req.constraint

    def _set_req_to_reinstall(self, req: InstallRequirement) -> None:
        """
        Set a requirement to be installed.

        ``req.satisfied_by`` is always cleared; ``req.should_reinstall`` is
        only set when the installed distribution may be replaced.
        """
        # Don't uninstall the conflict if doing a user install and the
        # conflict is not a user install.
        if not self.use_user_site or req.satisfied_by.in_usersite:
            req.should_reinstall = True
        req.satisfied_by = None

    def _check_skip_installed(
        self, req_to_install: InstallRequirement
    ) -> Optional[str]:
        """Check if req_to_install should be skipped.

        This will check if the req is installed, and whether we should upgrade
        or reinstall it, taking into account all the relevant user options.

        After calling this req_to_install will only have satisfied_by set to
        None if the req_to_install is to be upgraded/reinstalled etc. Any
        other value will be a dist recording the current thing installed that
        satisfies the requirement.

        Note that for vcs urls and the like we can't assess skipping in this
        routine - we simply identify that we need to pull the thing down,
        then later on it is pulled down and introspected to assess upgrade/
        reinstalls etc.

        :return: A text reason for why it was skipped, or None.
        """
        if self.ignore_installed:
            return None

        req_to_install.check_if_exists(self.use_user_site)
        if not req_to_install.satisfied_by:
            return None

        if self.force_reinstall:
            self._set_req_to_reinstall(req_to_install)
            return None

        if not self._is_upgrade_allowed(req_to_install):
            if self.upgrade_strategy == "only-if-needed":
                return "already satisfied, skipping upgrade"
            return "already satisfied"

        # Check for the possibility of an upgrade. For link-based
        # requirements we have to pull the tree down and inspect to assess
        # the version #, so it's handled way down.
        if not req_to_install.link:
            try:
                self.finder.find_requirement(req_to_install, upgrade=True)
            except BestVersionAlreadyInstalled:
                # Then the best version is installed.
                return "already up-to-date"
            except DistributionNotFound:
                # No distribution found, so we squash the error. It will
                # be raised later when we re-try later to do the install.
                # Why don't we just raise here?
                pass

        self._set_req_to_reinstall(req_to_install)
        return None

    def _find_requirement_link(self, req: InstallRequirement) -> Optional[Link]:
        """Return the link of the best candidate for ``req``, if any.

        Logs a warning when the selected candidate's link is yanked.

        :return: The candidate's link, or None when the finder returns no
            candidate.
        """
        upgrade = self._is_upgrade_allowed(req)
        best_candidate = self.finder.find_requirement(req, upgrade)
        if not best_candidate:
            return None

        # Log a warning per PEP 592 if necessary before returning.
        link = best_candidate.link
        if link.is_yanked:
            reason = link.yanked_reason or "<none given>"
            msg = (
                "The candidate selected for download or install is a "
                "yanked version: {candidate}\n"
                "Reason for being yanked: {reason}"
            ).format(candidate=best_candidate, reason=reason)
            logger.warning(msg)

        return link

    def _populate_link(self, req: InstallRequirement) -> None:
        """Ensure that if a link can be found for this, that it is found.

        Note that req.link may still be None - if the requirement is already
        installed and not needed to be upgraded based on the return value of
        _is_upgrade_allowed().

        If preparer.require_hashes is True, don't use the wheel cache, because
        cached wheels, always built locally, have different hashes than the
        files downloaded from the index server and thus throw false hash
        mismatches. Furthermore, cached wheels at present have
        non-deterministic contents due to file modification times.
        """
        if req.link is None:
            req.link = self._find_requirement_link(req)

        if self.wheel_cache is None or self.preparer.require_hashes:
            return
        cache_entry = self.wheel_cache.get_cache_entry(
            link=req.link,
            package_name=req.name,
            supported_tags=get_supported(),
        )
        if cache_entry is not None:
            logger.debug("Using cached wheel link: %s", cache_entry.link)
            if req.link is req.original_link and cache_entry.persistent:
                req.cached_wheel_source_link = req.link
            if cache_entry.origin is not None:
                req.download_info = cache_entry.origin
            else:
                # Legacy cache entry that does not have origin.json.
                # download_info may miss the archive_info.hashes field.
                req.download_info = direct_url_from_link(
                    req.link, link_is_in_wheel_cache=cache_entry.persistent
                )
            # Swap in the cached wheel last: the lines above still need the
            # original link.
            req.link = cache_entry.link

    def _get_dist_for(self, req: InstallRequirement) -> BaseDistribution:
        """Take an InstallRequirement and return a single distribution
        representing a prepared variant of the same.

        Editable requirements and already-satisfied requirements are handed
        to the matching preparer method; everything else has its link
        populated and is prepared as a linked requirement.
        """
        if req.editable:
            return self.preparer.prepare_editable_requirement(req)

        # satisfied_by is only evaluated by calling _check_skip_installed,
        # so it must be None here.
        assert req.satisfied_by is None
        skip_reason = self._check_skip_installed(req)

        if req.satisfied_by:
            return self.preparer.prepare_installed_requirement(req, skip_reason)

        # We eagerly populate the link, since that's our "legacy" behavior.
        self._populate_link(req)
        dist = self.preparer.prepare_linked_requirement(req)

        # NOTE
        # The following portion is for determining if a certain package is
        # going to be re-installed/upgraded or not and reporting to the user.
        # This should probably get cleaned up in a future refactor.

        # req.req is only avail after unpack for URL
        # pkgs repeat check_if_exists to uninstall-on-upgrade
        # (#14)
        if not self.ignore_installed:
            req.check_if_exists(self.use_user_site)

        if req.satisfied_by:
            should_modify = (
                self.upgrade_strategy != "to-satisfy-only"
                or self.force_reinstall
                or self.ignore_installed
                or req.link.scheme == "file"
            )
            if should_modify:
                self._set_req_to_reinstall(req)
            else:
                logger.info(
                    "Requirement already satisfied (use --upgrade to upgrade): %s",
                    req,
                )
        return dist

    def _resolve_one(
        self,
        requirement_set: RequirementSet,
        req_to_install: InstallRequirement,
    ) -> List[InstallRequirement]:
        """Prepare a single requirement and collect its dependencies.

        Constraints and already-prepared requirements are skipped.

        :return: A list of additional InstallRequirements to also install.
        :raises UnsupportedPythonVersion: If the prepared distribution's
            "Requires-Python" does not match and the check is not ignored.
        """
        # Tell user what we are doing for this requirement:
        # obtain (editable), skipping, processing (local url), collecting
        # (remote url or package name)
        if req_to_install.constraint or req_to_install.prepared:
            return []

        req_to_install.prepared = True

        # Parse and return dependencies
        dist = self._get_dist_for(req_to_install)
        # This will raise UnsupportedPythonVersion if the given Python
        # version isn't compatible with the distribution's Requires-Python.
        _check_dist_requires_python(
            dist,
            version_info=self._py_version_info,
            ignore_requires_python=self.ignore_requires_python,
        )

        more_reqs: List[InstallRequirement] = []

        def add_req(subreq: Requirement, extras_requested: Iterable[str]) -> None:
            # This idiosyncratically converts the Requirement to str and let
            # make_install_req then parse it again into Requirement. But this is
            # the legacy resolver so I'm just not going to bother refactoring.
            sub_install_req = self._make_install_req(str(subreq), req_to_install)
            parent_req_name = req_to_install.name
            to_scan_again, add_to_parent = self._add_requirement_to_set(
                requirement_set,
                sub_install_req,
                parent_req_name=parent_req_name,
                extras_requested=extras_requested,
            )
            if parent_req_name and add_to_parent:
                self._discovered_dependencies[parent_req_name].append(add_to_parent)
            more_reqs.extend(to_scan_again)

        with indent_log():
            # We add req_to_install before its dependencies, so that we
            # can refer to it when adding dependencies.
            if not requirement_set.has_requirement(req_to_install.name):
                # 'unnamed' requirements will get added here
                # 'unnamed' requirements can only come from being directly
                # provided by the user.
                assert req_to_install.user_supplied
                self._add_requirement_to_set(
                    requirement_set, req_to_install, parent_req_name=None
                )

            if not self.ignore_dependencies:
                if req_to_install.extras:
                    logger.debug(
                        "Installing extra requirements: %r",
                        ",".join(req_to_install.extras),
                    )
                # Build both sets once; they feed the "missing" warning and
                # the "available" list below.
                requested_extras = set(req_to_install.extras)
                provided_extras = set(dist.iter_provided_extras())

                for missing in sorted(requested_extras - provided_extras):
                    logger.warning(
                        "%s %s does not provide the extra '%s'",
                        dist.raw_name,
                        dist.version,
                        missing,
                    )

                available_requested = sorted(provided_extras & requested_extras)
                for subreq in dist.iter_dependencies(available_requested):
                    add_req(subreq, extras_requested=available_requested)

        return more_reqs

    def get_installation_order(
        self, req_set: RequirementSet
    ) -> List[InstallRequirement]:
        """Create the installation order.

        The installation order is topological - requirements are installed
        before the requiring thing. We break cycles at an arbitrary point,
        and make no other guarantees.

        Requirements that are already satisfied and constraints are left out
        of the result.
        """
        # The current implementation, which we may change at any point
        # installs the user specified things in the order given, except when
        # dependencies must come earlier to achieve topological order.
        order: List[InstallRequirement] = []
        ordered_reqs: Set[InstallRequirement] = set()

        def schedule(req: InstallRequirement) -> None:
            if req.satisfied_by or req in ordered_reqs:
                return
            if req.constraint:
                return
            # Mark as seen before recursing so a dependency cycle terminates.
            ordered_reqs.add(req)
            # Use .get() so that merely computing the order does not insert
            # empty entries into the defaultdict.
            for dep in self._discovered_dependencies.get(req.name, ()):
                schedule(dep)
            order.append(req)

        for install_req in req_set.requirements.values():
            schedule(install_req)
        return order