python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
pip/
_internal/
resolution/
resolvelib/
resolver.py
       1  import functools
       2  import logging
       3  import os
       4  from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast
       5  
       6  from pip._vendor.packaging.utils import canonicalize_name
       7  from pip._vendor.resolvelib import BaseReporter, ResolutionImpossible
       8  from pip._vendor.resolvelib import Resolver as RLResolver
       9  from pip._vendor.resolvelib.structs import DirectedGraph
      10  
      11  from pip._internal.cache import WheelCache
      12  from pip._internal.index.package_finder import PackageFinder
      13  from pip._internal.operations.prepare import RequirementPreparer
      14  from pip._internal.req.req_install import InstallRequirement
      15  from pip._internal.req.req_set import RequirementSet
      16  from pip._internal.resolution.base import BaseResolver, InstallRequirementProvider
      17  from pip._internal.resolution.resolvelib.provider import PipProvider
      18  from pip._internal.resolution.resolvelib.reporter import (
      19      PipDebuggingReporter,
      20      PipReporter,
      21  )
      22  
      23  from .base import Candidate, Requirement
      24  from .factory import Factory
      25  
      26  if TYPE_CHECKING:
      27      from pip._vendor.resolvelib.resolvers import Result as RLResult
      28  
      29      Result = RLResult[Requirement, Candidate, str]
      30  
      31  
      32  logger = logging.getLogger(__name__)
      33  
      34  
      35  class ESC[4;38;5;81mResolver(ESC[4;38;5;149mBaseResolver):
      36      _allowed_strategies = {"eager", "only-if-needed", "to-satisfy-only"}
      37  
      38      def __init__(
      39          self,
      40          preparer: RequirementPreparer,
      41          finder: PackageFinder,
      42          wheel_cache: Optional[WheelCache],
      43          make_install_req: InstallRequirementProvider,
      44          use_user_site: bool,
      45          ignore_dependencies: bool,
      46          ignore_installed: bool,
      47          ignore_requires_python: bool,
      48          force_reinstall: bool,
      49          upgrade_strategy: str,
      50          py_version_info: Optional[Tuple[int, ...]] = None,
      51      ):
      52          super().__init__()
      53          assert upgrade_strategy in self._allowed_strategies
      54  
      55          self.factory = Factory(
      56              finder=finder,
      57              preparer=preparer,
      58              make_install_req=make_install_req,
      59              wheel_cache=wheel_cache,
      60              use_user_site=use_user_site,
      61              force_reinstall=force_reinstall,
      62              ignore_installed=ignore_installed,
      63              ignore_requires_python=ignore_requires_python,
      64              py_version_info=py_version_info,
      65          )
      66          self.ignore_dependencies = ignore_dependencies
      67          self.upgrade_strategy = upgrade_strategy
      68          self._result: Optional[Result] = None
      69  
      70      def resolve(
      71          self, root_reqs: List[InstallRequirement], check_supported_wheels: bool
      72      ) -> RequirementSet:
      73          collected = self.factory.collect_root_requirements(root_reqs)
      74          provider = PipProvider(
      75              factory=self.factory,
      76              constraints=collected.constraints,
      77              ignore_dependencies=self.ignore_dependencies,
      78              upgrade_strategy=self.upgrade_strategy,
      79              user_requested=collected.user_requested,
      80          )
      81          if "PIP_RESOLVER_DEBUG" in os.environ:
      82              reporter: BaseReporter = PipDebuggingReporter()
      83          else:
      84              reporter = PipReporter()
      85          resolver: RLResolver[Requirement, Candidate, str] = RLResolver(
      86              provider,
      87              reporter,
      88          )
      89  
      90          try:
      91              limit_how_complex_resolution_can_be = 200000
      92              result = self._result = resolver.resolve(
      93                  collected.requirements, max_rounds=limit_how_complex_resolution_can_be
      94              )
      95  
      96          except ResolutionImpossible as e:
      97              error = self.factory.get_installation_error(
      98                  cast("ResolutionImpossible[Requirement, Candidate]", e),
      99                  collected.constraints,
     100              )
     101              raise error from e
     102  
     103          req_set = RequirementSet(check_supported_wheels=check_supported_wheels)
     104          for candidate in result.mapping.values():
     105              ireq = candidate.get_install_requirement()
     106              if ireq is None:
     107                  continue
     108  
     109              # Check if there is already an installation under the same name,
     110              # and set a flag for later stages to uninstall it, if needed.
     111              installed_dist = self.factory.get_dist_to_uninstall(candidate)
     112              if installed_dist is None:
     113                  # There is no existing installation -- nothing to uninstall.
     114                  ireq.should_reinstall = False
     115              elif self.factory.force_reinstall:
     116                  # The --force-reinstall flag is set -- reinstall.
     117                  ireq.should_reinstall = True
     118              elif installed_dist.version != candidate.version:
     119                  # The installation is different in version -- reinstall.
     120                  ireq.should_reinstall = True
     121              elif candidate.is_editable or installed_dist.editable:
     122                  # The incoming distribution is editable, or different in
     123                  # editable-ness to installation -- reinstall.
     124                  ireq.should_reinstall = True
     125              elif candidate.source_link and candidate.source_link.is_file:
     126                  # The incoming distribution is under file://
     127                  if candidate.source_link.is_wheel:
     128                      # is a local wheel -- do nothing.
     129                      logger.info(
     130                          "%s is already installed with the same version as the "
     131                          "provided wheel. Use --force-reinstall to force an "
     132                          "installation of the wheel.",
     133                          ireq.name,
     134                      )
     135                      continue
     136  
     137                  # is a local sdist or path -- reinstall
     138                  ireq.should_reinstall = True
     139              else:
     140                  continue
     141  
     142              link = candidate.source_link
     143              if link and link.is_yanked:
     144                  # The reason can contain non-ASCII characters, Unicode
     145                  # is required for Python 2.
     146                  msg = (
     147                      "The candidate selected for download or install is a "
     148                      "yanked version: {name!r} candidate (version {version} "
     149                      "at {link})\nReason for being yanked: {reason}"
     150                  ).format(
     151                      name=candidate.name,
     152                      version=candidate.version,
     153                      link=link,
     154                      reason=link.yanked_reason or "<none given>",
     155                  )
     156                  logger.warning(msg)
     157  
     158              req_set.add_named_requirement(ireq)
     159  
     160          reqs = req_set.all_requirements
     161          self.factory.preparer.prepare_linked_requirements_more(reqs)
     162          for req in reqs:
     163              req.prepared = True
     164              req.needs_more_preparation = False
     165          return req_set
     166  
     167      def get_installation_order(
     168          self, req_set: RequirementSet
     169      ) -> List[InstallRequirement]:
     170          """Get order for installation of requirements in RequirementSet.
     171  
     172          The returned list contains a requirement before another that depends on
     173          it. This helps ensure that the environment is kept consistent as they
     174          get installed one-by-one.
     175  
     176          The current implementation creates a topological ordering of the
     177          dependency graph, giving more weight to packages with less
     178          or no dependencies, while breaking any cycles in the graph at
     179          arbitrary points. We make no guarantees about where the cycle
     180          would be broken, other than it *would* be broken.
     181          """
     182          assert self._result is not None, "must call resolve() first"
     183  
     184          if not req_set.requirements:
     185              # Nothing is left to install, so we do not need an order.
     186              return []
     187  
     188          graph = self._result.graph
     189          weights = get_topological_weights(graph, set(req_set.requirements.keys()))
     190  
     191          sorted_items = sorted(
     192              req_set.requirements.items(),
     193              key=functools.partial(_req_set_item_sorter, weights=weights),
     194              reverse=True,
     195          )
     196          return [ireq for _, ireq in sorted_items]
     197  
     198  
     199  def get_topological_weights(
     200      graph: "DirectedGraph[Optional[str]]", requirement_keys: Set[str]
     201  ) -> Dict[Optional[str], int]:
     202      """Assign weights to each node based on how "deep" they are.
     203  
     204      This implementation may change at any point in the future without prior
     205      notice.
     206  
     207      We first simplify the dependency graph by pruning any leaves and giving them
     208      the highest weight: a package without any dependencies should be installed
     209      first. This is done again and again in the same way, giving ever less weight
     210      to the newly found leaves. The loop stops when no leaves are left: all
     211      remaining packages have at least one dependency left in the graph.
     212  
     213      Then we continue with the remaining graph, by taking the length for the
     214      longest path to any node from root, ignoring any paths that contain a single
     215      node twice (i.e. cycles). This is done through a depth-first search through
     216      the graph, while keeping track of the path to the node.
     217  
     218      Cycles in the graph result would result in node being revisited while also
     219      being on its own path. In this case, take no action. This helps ensure we
     220      don't get stuck in a cycle.
     221  
     222      When assigning weight, the longer path (i.e. larger length) is preferred.
     223  
     224      We are only interested in the weights of packages that are in the
     225      requirement_keys.
     226      """
     227      path: Set[Optional[str]] = set()
     228      weights: Dict[Optional[str], int] = {}
     229  
     230      def visit(node: Optional[str]) -> None:
     231          if node in path:
     232              # We hit a cycle, so we'll break it here.
     233              return
     234  
     235          # Time to visit the children!
     236          path.add(node)
     237          for child in graph.iter_children(node):
     238              visit(child)
     239          path.remove(node)
     240  
     241          if node not in requirement_keys:
     242              return
     243  
     244          last_known_parent_count = weights.get(node, 0)
     245          weights[node] = max(last_known_parent_count, len(path))
     246  
     247      # Simplify the graph, pruning leaves that have no dependencies.
     248      # This is needed for large graphs (say over 200 packages) because the
     249      # `visit` function is exponentially slower then, taking minutes.
     250      # See https://github.com/pypa/pip/issues/10557
     251      # We will loop until we explicitly break the loop.
     252      while True:
     253          leaves = set()
     254          for key in graph:
     255              if key is None:
     256                  continue
     257              for _child in graph.iter_children(key):
     258                  # This means we have at least one child
     259                  break
     260              else:
     261                  # No child.
     262                  leaves.add(key)
     263          if not leaves:
     264              # We are done simplifying.
     265              break
     266          # Calculate the weight for the leaves.
     267          weight = len(graph) - 1
     268          for leaf in leaves:
     269              if leaf not in requirement_keys:
     270                  continue
     271              weights[leaf] = weight
     272          # Remove the leaves from the graph, making it simpler.
     273          for leaf in leaves:
     274              graph.remove(leaf)
     275  
     276      # Visit the remaining graph.
     277      # `None` is guaranteed to be the root node by resolvelib.
     278      visit(None)
     279  
     280      # Sanity check: all requirement keys should be in the weights,
     281      # and no other keys should be in the weights.
     282      difference = set(weights.keys()).difference(requirement_keys)
     283      assert not difference, difference
     284  
     285      return weights
     286  
     287  
     288  def _req_set_item_sorter(
     289      item: Tuple[str, InstallRequirement],
     290      weights: Dict[Optional[str], int],
     291  ) -> Tuple[int, str]:
     292      """Key function used to sort install requirements for installation.
     293  
     294      Based on the "weight" mapping calculated in ``get_installation_order()``.
     295      The canonical package name is returned as the second member as a tie-
     296      breaker to ensure the result is predictable, which is useful in tests.
     297      """
     298      name = canonicalize_name(item[0])
     299      return weights[name], name