python (3.11.7)
       1  import collections
       2  import itertools
       3  import operator
       4  
       5  from .providers import AbstractResolver
       6  from .structs import DirectedGraph, IteratorMapping, build_iter_view
       7  
       8  RequirementInformation = collections.namedtuple(
       9      "RequirementInformation", ["requirement", "parent"]
      10  )
      11  
      12  
      13  class ESC[4;38;5;81mResolverException(ESC[4;38;5;149mException):
      14      """A base class for all exceptions raised by this module.
      15  
      16      Exceptions derived by this class should all be handled in this module. Any
      17      bubbling pass the resolver should be treated as a bug.
      18      """
      19  
      20  
      21  class ESC[4;38;5;81mRequirementsConflicted(ESC[4;38;5;149mResolverException):
      22      def __init__(self, criterion):
      23          super(RequirementsConflicted, self).__init__(criterion)
      24          self.criterion = criterion
      25  
      26      def __str__(self):
      27          return "Requirements conflict: {}".format(
      28              ", ".join(repr(r) for r in self.criterion.iter_requirement()),
      29          )
      30  
      31  
      32  class ESC[4;38;5;81mInconsistentCandidate(ESC[4;38;5;149mResolverException):
      33      def __init__(self, candidate, criterion):
      34          super(InconsistentCandidate, self).__init__(candidate, criterion)
      35          self.candidate = candidate
      36          self.criterion = criterion
      37  
      38      def __str__(self):
      39          return "Provided candidate {!r} does not satisfy {}".format(
      40              self.candidate,
      41              ", ".join(repr(r) for r in self.criterion.iter_requirement()),
      42          )
      43  
      44  
      45  class ESC[4;38;5;81mCriterion(ESC[4;38;5;149mobject):
      46      """Representation of possible resolution results of a package.
      47  
      48      This holds three attributes:
      49  
      50      * `information` is a collection of `RequirementInformation` pairs.
      51        Each pair is a requirement contributing to this criterion, and the
      52        candidate that provides the requirement.
      53      * `incompatibilities` is a collection of all known not-to-work candidates
      54        to exclude from consideration.
      55      * `candidates` is a collection containing all possible candidates deducted
      56        from the union of contributing requirements and known incompatibilities.
      57        It should never be empty, except when the criterion is an attribute of a
      58        raised `RequirementsConflicted` (in which case it is always empty).
      59  
      60      .. note::
      61          This class is intended to be externally immutable. **Do not** mutate
      62          any of its attribute containers.
      63      """
      64  
      65      def __init__(self, candidates, information, incompatibilities):
      66          self.candidates = candidates
      67          self.information = information
      68          self.incompatibilities = incompatibilities
      69  
      70      def __repr__(self):
      71          requirements = ", ".join(
      72              "({!r}, via={!r})".format(req, parent)
      73              for req, parent in self.information
      74          )
      75          return "Criterion({})".format(requirements)
      76  
      77      def iter_requirement(self):
      78          return (i.requirement for i in self.information)
      79  
      80      def iter_parent(self):
      81          return (i.parent for i in self.information)
      82  
      83  
      84  class ESC[4;38;5;81mResolutionError(ESC[4;38;5;149mResolverException):
      85      pass
      86  
      87  
      88  class ESC[4;38;5;81mResolutionImpossible(ESC[4;38;5;149mResolutionError):
      89      def __init__(self, causes):
      90          super(ResolutionImpossible, self).__init__(causes)
      91          # causes is a list of RequirementInformation objects
      92          self.causes = causes
      93  
      94  
      95  class ESC[4;38;5;81mResolutionTooDeep(ESC[4;38;5;149mResolutionError):
      96      def __init__(self, round_count):
      97          super(ResolutionTooDeep, self).__init__(round_count)
      98          self.round_count = round_count
      99  
     100  
     101  # Resolution state in a round.
     102  State = collections.namedtuple("State", "mapping criteria backtrack_causes")
     103  
     104  
     105  class ESC[4;38;5;81mResolution(ESC[4;38;5;149mobject):
     106      """Stateful resolution object.
     107  
     108      This is designed as a one-off object that holds information to kick start
     109      the resolution process, and holds the results afterwards.
     110      """
     111  
     112      def __init__(self, provider, reporter):
     113          self._p = provider
     114          self._r = reporter
     115          self._states = []
     116  
     117      @property
     118      def state(self):
     119          try:
     120              return self._states[-1]
     121          except IndexError:
     122              raise AttributeError("state")
     123  
     124      def _push_new_state(self):
     125          """Push a new state into history.
     126  
     127          This new state will be used to hold resolution results of the next
     128          coming round.
     129          """
     130          base = self._states[-1]
     131          state = State(
     132              mapping=base.mapping.copy(),
     133              criteria=base.criteria.copy(),
     134              backtrack_causes=base.backtrack_causes[:],
     135          )
     136          self._states.append(state)
     137  
     138      def _add_to_criteria(self, criteria, requirement, parent):
     139          self._r.adding_requirement(requirement=requirement, parent=parent)
     140  
     141          identifier = self._p.identify(requirement_or_candidate=requirement)
     142          criterion = criteria.get(identifier)
     143          if criterion:
     144              incompatibilities = list(criterion.incompatibilities)
     145          else:
     146              incompatibilities = []
     147  
     148          matches = self._p.find_matches(
     149              identifier=identifier,
     150              requirements=IteratorMapping(
     151                  criteria,
     152                  operator.methodcaller("iter_requirement"),
     153                  {identifier: [requirement]},
     154              ),
     155              incompatibilities=IteratorMapping(
     156                  criteria,
     157                  operator.attrgetter("incompatibilities"),
     158                  {identifier: incompatibilities},
     159              ),
     160          )
     161  
     162          if criterion:
     163              information = list(criterion.information)
     164              information.append(RequirementInformation(requirement, parent))
     165          else:
     166              information = [RequirementInformation(requirement, parent)]
     167  
     168          criterion = Criterion(
     169              candidates=build_iter_view(matches),
     170              information=information,
     171              incompatibilities=incompatibilities,
     172          )
     173          if not criterion.candidates:
     174              raise RequirementsConflicted(criterion)
     175          criteria[identifier] = criterion
     176  
     177      def _remove_information_from_criteria(self, criteria, parents):
     178          """Remove information from parents of criteria.
     179  
     180          Concretely, removes all values from each criterion's ``information``
     181          field that have one of ``parents`` as provider of the requirement.
     182  
     183          :param criteria: The criteria to update.
     184          :param parents: Identifiers for which to remove information from all criteria.
     185          """
     186          if not parents:
     187              return
     188          for key, criterion in criteria.items():
     189              criteria[key] = Criterion(
     190                  criterion.candidates,
     191                  [
     192                      information
     193                      for information in criterion.information
     194                      if (
     195                          information.parent is None
     196                          or self._p.identify(information.parent) not in parents
     197                      )
     198                  ],
     199                  criterion.incompatibilities,
     200              )
     201  
     202      def _get_preference(self, name):
     203          return self._p.get_preference(
     204              identifier=name,
     205              resolutions=self.state.mapping,
     206              candidates=IteratorMapping(
     207                  self.state.criteria,
     208                  operator.attrgetter("candidates"),
     209              ),
     210              information=IteratorMapping(
     211                  self.state.criteria,
     212                  operator.attrgetter("information"),
     213              ),
     214              backtrack_causes=self.state.backtrack_causes,
     215          )
     216  
     217      def _is_current_pin_satisfying(self, name, criterion):
     218          try:
     219              current_pin = self.state.mapping[name]
     220          except KeyError:
     221              return False
     222          return all(
     223              self._p.is_satisfied_by(requirement=r, candidate=current_pin)
     224              for r in criterion.iter_requirement()
     225          )
     226  
     227      def _get_updated_criteria(self, candidate):
     228          criteria = self.state.criteria.copy()
     229          for requirement in self._p.get_dependencies(candidate=candidate):
     230              self._add_to_criteria(criteria, requirement, parent=candidate)
     231          return criteria
     232  
     233      def _attempt_to_pin_criterion(self, name):
     234          criterion = self.state.criteria[name]
     235  
     236          causes = []
     237          for candidate in criterion.candidates:
     238              try:
     239                  criteria = self._get_updated_criteria(candidate)
     240              except RequirementsConflicted as e:
     241                  self._r.rejecting_candidate(e.criterion, candidate)
     242                  causes.append(e.criterion)
     243                  continue
     244  
     245              # Check the newly-pinned candidate actually works. This should
     246              # always pass under normal circumstances, but in the case of a
     247              # faulty provider, we will raise an error to notify the implementer
     248              # to fix find_matches() and/or is_satisfied_by().
     249              satisfied = all(
     250                  self._p.is_satisfied_by(requirement=r, candidate=candidate)
     251                  for r in criterion.iter_requirement()
     252              )
     253              if not satisfied:
     254                  raise InconsistentCandidate(candidate, criterion)
     255  
     256              self._r.pinning(candidate=candidate)
     257              self.state.criteria.update(criteria)
     258  
     259              # Put newly-pinned candidate at the end. This is essential because
     260              # backtracking looks at this mapping to get the last pin.
     261              self.state.mapping.pop(name, None)
     262              self.state.mapping[name] = candidate
     263  
     264              return []
     265  
     266          # All candidates tried, nothing works. This criterion is a dead
     267          # end, signal for backtracking.
     268          return causes
     269  
     270      def _backjump(self, causes):
     271          """Perform backjumping.
     272  
     273          When we enter here, the stack is like this::
     274  
     275              [ state Z ]
     276              [ state Y ]
     277              [ state X ]
     278              .... earlier states are irrelevant.
     279  
     280          1. No pins worked for Z, so it does not have a pin.
     281          2. We want to reset state Y to unpinned, and pin another candidate.
     282          3. State X holds what state Y was before the pin, but does not
     283             have the incompatibility information gathered in state Y.
     284  
     285          Each iteration of the loop will:
     286  
     287          1.  Identify Z. The incompatibility is not always caused by the latest
     288              state. For example, given three requirements A, B and C, with
     289              dependencies A1, B1 and C1, where A1 and B1 are incompatible: the
     290              last state might be related to C, so we want to discard the
     291              previous state.
     292          2.  Discard Z.
     293          3.  Discard Y but remember its incompatibility information gathered
     294              previously, and the failure we're dealing with right now.
     295          4.  Push a new state Y' based on X, and apply the incompatibility
     296              information from Y to Y'.
     297          5a. If this causes Y' to conflict, we need to backtrack again. Make Y'
     298              the new Z and go back to step 2.
     299          5b. If the incompatibilities apply cleanly, end backtracking.
     300          """
     301          incompatible_reqs = itertools.chain(
     302              (c.parent for c in causes if c.parent is not None),
     303              (c.requirement for c in causes),
     304          )
     305          incompatible_deps = {self._p.identify(r) for r in incompatible_reqs}
     306          while len(self._states) >= 3:
     307              # Remove the state that triggered backtracking.
     308              del self._states[-1]
     309  
     310              # Ensure to backtrack to a state that caused the incompatibility
     311              incompatible_state = False
     312              while not incompatible_state:
     313                  # Retrieve the last candidate pin and known incompatibilities.
     314                  try:
     315                      broken_state = self._states.pop()
     316                      name, candidate = broken_state.mapping.popitem()
     317                  except (IndexError, KeyError):
     318                      raise ResolutionImpossible(causes)
     319                  current_dependencies = {
     320                      self._p.identify(d)
     321                      for d in self._p.get_dependencies(candidate)
     322                  }
     323                  incompatible_state = not current_dependencies.isdisjoint(
     324                      incompatible_deps
     325                  )
     326  
     327              incompatibilities_from_broken = [
     328                  (k, list(v.incompatibilities))
     329                  for k, v in broken_state.criteria.items()
     330              ]
     331  
     332              # Also mark the newly known incompatibility.
     333              incompatibilities_from_broken.append((name, [candidate]))
     334  
     335              # Create a new state from the last known-to-work one, and apply
     336              # the previously gathered incompatibility information.
     337              def _patch_criteria():
     338                  for k, incompatibilities in incompatibilities_from_broken:
     339                      if not incompatibilities:
     340                          continue
     341                      try:
     342                          criterion = self.state.criteria[k]
     343                      except KeyError:
     344                          continue
     345                      matches = self._p.find_matches(
     346                          identifier=k,
     347                          requirements=IteratorMapping(
     348                              self.state.criteria,
     349                              operator.methodcaller("iter_requirement"),
     350                          ),
     351                          incompatibilities=IteratorMapping(
     352                              self.state.criteria,
     353                              operator.attrgetter("incompatibilities"),
     354                              {k: incompatibilities},
     355                          ),
     356                      )
     357                      candidates = build_iter_view(matches)
     358                      if not candidates:
     359                          return False
     360                      incompatibilities.extend(criterion.incompatibilities)
     361                      self.state.criteria[k] = Criterion(
     362                          candidates=candidates,
     363                          information=list(criterion.information),
     364                          incompatibilities=incompatibilities,
     365                      )
     366                  return True
     367  
     368              self._push_new_state()
     369              success = _patch_criteria()
     370  
     371              # It works! Let's work on this new state.
     372              if success:
     373                  return True
     374  
     375              # State does not work after applying known incompatibilities.
     376              # Try the still previous state.
     377  
     378          # No way to backtrack anymore.
     379          return False
     380  
     381      def resolve(self, requirements, max_rounds):
     382          if self._states:
     383              raise RuntimeError("already resolved")
     384  
     385          self._r.starting()
     386  
     387          # Initialize the root state.
     388          self._states = [
     389              State(
     390                  mapping=collections.OrderedDict(),
     391                  criteria={},
     392                  backtrack_causes=[],
     393              )
     394          ]
     395          for r in requirements:
     396              try:
     397                  self._add_to_criteria(self.state.criteria, r, parent=None)
     398              except RequirementsConflicted as e:
     399                  raise ResolutionImpossible(e.criterion.information)
     400  
     401          # The root state is saved as a sentinel so the first ever pin can have
     402          # something to backtrack to if it fails. The root state is basically
     403          # pinning the virtual "root" package in the graph.
     404          self._push_new_state()
     405  
     406          for round_index in range(max_rounds):
     407              self._r.starting_round(index=round_index)
     408  
     409              unsatisfied_names = [
     410                  key
     411                  for key, criterion in self.state.criteria.items()
     412                  if not self._is_current_pin_satisfying(key, criterion)
     413              ]
     414  
     415              # All criteria are accounted for. Nothing more to pin, we are done!
     416              if not unsatisfied_names:
     417                  self._r.ending(state=self.state)
     418                  return self.state
     419  
     420              # keep track of satisfied names to calculate diff after pinning
     421              satisfied_names = set(self.state.criteria.keys()) - set(
     422                  unsatisfied_names
     423              )
     424  
     425              # Choose the most preferred unpinned criterion to try.
     426              name = min(unsatisfied_names, key=self._get_preference)
     427              failure_causes = self._attempt_to_pin_criterion(name)
     428  
     429              if failure_causes:
     430                  causes = [i for c in failure_causes for i in c.information]
     431                  # Backjump if pinning fails. The backjump process puts us in
     432                  # an unpinned state, so we can work on it in the next round.
     433                  self._r.resolving_conflicts(causes=causes)
     434                  success = self._backjump(causes)
     435                  self.state.backtrack_causes[:] = causes
     436  
     437                  # Dead ends everywhere. Give up.
     438                  if not success:
     439                      raise ResolutionImpossible(self.state.backtrack_causes)
     440              else:
     441                  # discard as information sources any invalidated names
     442                  # (unsatisfied names that were previously satisfied)
     443                  newly_unsatisfied_names = {
     444                      key
     445                      for key, criterion in self.state.criteria.items()
     446                      if key in satisfied_names
     447                      and not self._is_current_pin_satisfying(key, criterion)
     448                  }
     449                  self._remove_information_from_criteria(
     450                      self.state.criteria, newly_unsatisfied_names
     451                  )
     452                  # Pinning was successful. Push a new state to do another pin.
     453                  self._push_new_state()
     454  
     455              self._r.ending_round(index=round_index, state=self.state)
     456  
     457          raise ResolutionTooDeep(max_rounds)
     458  
     459  
     460  def _has_route_to_root(criteria, key, all_keys, connected):
     461      if key in connected:
     462          return True
     463      if key not in criteria:
     464          return False
     465      for p in criteria[key].iter_parent():
     466          try:
     467              pkey = all_keys[id(p)]
     468          except KeyError:
     469              continue
     470          if pkey in connected:
     471              connected.add(key)
     472              return True
     473          if _has_route_to_root(criteria, pkey, all_keys, connected):
     474              connected.add(key)
     475              return True
     476      return False
     477  
     478  
     479  Result = collections.namedtuple("Result", "mapping graph criteria")
     480  
     481  
     482  def _build_result(state):
     483      mapping = state.mapping
     484      all_keys = {id(v): k for k, v in mapping.items()}
     485      all_keys[id(None)] = None
     486  
     487      graph = DirectedGraph()
     488      graph.add(None)  # Sentinel as root dependencies' parent.
     489  
     490      connected = {None}
     491      for key, criterion in state.criteria.items():
     492          if not _has_route_to_root(state.criteria, key, all_keys, connected):
     493              continue
     494          if key not in graph:
     495              graph.add(key)
     496          for p in criterion.iter_parent():
     497              try:
     498                  pkey = all_keys[id(p)]
     499              except KeyError:
     500                  continue
     501              if pkey not in graph:
     502                  graph.add(pkey)
     503              graph.connect(pkey, key)
     504  
     505      return Result(
     506          mapping={k: v for k, v in mapping.items() if k in connected},
     507          graph=graph,
     508          criteria=state.criteria,
     509      )
     510  
     511  
     512  class ESC[4;38;5;81mResolver(ESC[4;38;5;149mAbstractResolver):
     513      """The thing that performs the actual resolution work."""
     514  
     515      base_exception = ResolverException
     516  
     517      def resolve(self, requirements, max_rounds=100):
     518          """Take a collection of constraints, spit out the resolution result.
     519  
     520          The return value is a representation to the final resolution result. It
     521          is a tuple subclass with three public members:
     522  
     523          * `mapping`: A dict of resolved candidates. Each key is an identifier
     524              of a requirement (as returned by the provider's `identify` method),
     525              and the value is the resolved candidate.
     526          * `graph`: A `DirectedGraph` instance representing the dependency tree.
     527              The vertices are keys of `mapping`, and each edge represents *why*
     528              a particular package is included. A special vertex `None` is
     529              included to represent parents of user-supplied requirements.
     530          * `criteria`: A dict of "criteria" that hold detailed information on
     531              how edges in the graph are derived. Each key is an identifier of a
     532              requirement, and the value is a `Criterion` instance.
     533  
     534          The following exceptions may be raised if a resolution cannot be found:
     535  
     536          * `ResolutionImpossible`: A resolution cannot be found for the given
     537              combination of requirements. The `causes` attribute of the
     538              exception is a list of (requirement, parent), giving the
     539              requirements that could not be satisfied.
     540          * `ResolutionTooDeep`: The dependency tree is too deeply nested and
     541              the resolver gave up. This is usually caused by a circular
     542              dependency, but you can try to resolve this by increasing the
     543              `max_rounds` argument.
     544          """
     545          resolution = Resolution(self.provider, self.reporter)
     546          state = resolution.resolve(requirements, max_rounds=max_rounds)
     547          return _build_result(state)