python (3.11.7)
       1  import logging
       2  import shutil
       3  import sys
       4  import textwrap
       5  import xmlrpc.client
       6  from collections import OrderedDict
       7  from optparse import Values
       8  from typing import TYPE_CHECKING, Dict, List, Optional
       9  
      10  from pip._vendor.packaging.version import parse as parse_version
      11  
      12  from pip._internal.cli.base_command import Command
      13  from pip._internal.cli.req_command import SessionCommandMixin
      14  from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
      15  from pip._internal.exceptions import CommandError
      16  from pip._internal.metadata import get_default_environment
      17  from pip._internal.models.index import PyPI
      18  from pip._internal.network.xmlrpc import PipXmlrpcTransport
      19  from pip._internal.utils.logging import indent_log
      20  from pip._internal.utils.misc import write_output
      21  
      22  if TYPE_CHECKING:
      23      from typing import TypedDict
      24  
      25      class ESC[4;38;5;81mTransformedHit(ESC[4;38;5;149mTypedDict):
      26          name: str
      27          summary: str
      28          versions: List[str]
      29  
      30  
      31  logger = logging.getLogger(__name__)
      32  
      33  
      34  class ESC[4;38;5;81mSearchCommand(ESC[4;38;5;149mCommand, ESC[4;38;5;149mSessionCommandMixin):
      35      """Search for PyPI packages whose name or summary contains <query>."""
      36  
      37      usage = """
      38        %prog [options] <query>"""
      39      ignore_require_venv = True
      40  
      41      def add_options(self) -> None:
      42          self.cmd_opts.add_option(
      43              "-i",
      44              "--index",
      45              dest="index",
      46              metavar="URL",
      47              default=PyPI.pypi_url,
      48              help="Base URL of Python Package Index (default %default)",
      49          )
      50  
      51          self.parser.insert_option_group(0, self.cmd_opts)
      52  
      53      def run(self, options: Values, args: List[str]) -> int:
      54          if not args:
      55              raise CommandError("Missing required argument (search query).")
      56          query = args
      57          pypi_hits = self.search(query, options)
      58          hits = transform_hits(pypi_hits)
      59  
      60          terminal_width = None
      61          if sys.stdout.isatty():
      62              terminal_width = shutil.get_terminal_size()[0]
      63  
      64          print_results(hits, terminal_width=terminal_width)
      65          if pypi_hits:
      66              return SUCCESS
      67          return NO_MATCHES_FOUND
      68  
      69      def search(self, query: List[str], options: Values) -> List[Dict[str, str]]:
      70          index_url = options.index
      71  
      72          session = self.get_default_session(options)
      73  
      74          transport = PipXmlrpcTransport(index_url, session)
      75          pypi = xmlrpc.client.ServerProxy(index_url, transport)
      76          try:
      77              hits = pypi.search({"name": query, "summary": query}, "or")
      78          except xmlrpc.client.Fault as fault:
      79              message = "XMLRPC request failed [code: {code}]\n{string}".format(
      80                  code=fault.faultCode,
      81                  string=fault.faultString,
      82              )
      83              raise CommandError(message)
      84          assert isinstance(hits, list)
      85          return hits
      86  
      87  
      88  def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]:
      89      """
      90      The list from pypi is really a list of versions. We want a list of
      91      packages with the list of versions stored inline. This converts the
      92      list from pypi into one we can use.
      93      """
      94      packages: Dict[str, "TransformedHit"] = OrderedDict()
      95      for hit in hits:
      96          name = hit["name"]
      97          summary = hit["summary"]
      98          version = hit["version"]
      99  
     100          if name not in packages.keys():
     101              packages[name] = {
     102                  "name": name,
     103                  "summary": summary,
     104                  "versions": [version],
     105              }
     106          else:
     107              packages[name]["versions"].append(version)
     108  
     109              # if this is the highest version, replace summary and score
     110              if version == highest_version(packages[name]["versions"]):
     111                  packages[name]["summary"] = summary
     112  
     113      return list(packages.values())
     114  
     115  
     116  def print_dist_installation_info(name: str, latest: str) -> None:
     117      env = get_default_environment()
     118      dist = env.get_distribution(name)
     119      if dist is not None:
     120          with indent_log():
     121              if dist.version == latest:
     122                  write_output("INSTALLED: %s (latest)", dist.version)
     123              else:
     124                  write_output("INSTALLED: %s", dist.version)
     125                  if parse_version(latest).pre:
     126                      write_output(
     127                          "LATEST:    %s (pre-release; install"
     128                          " with `pip install --pre`)",
     129                          latest,
     130                      )
     131                  else:
     132                      write_output("LATEST:    %s", latest)
     133  
     134  
     135  def print_results(
     136      hits: List["TransformedHit"],
     137      name_column_width: Optional[int] = None,
     138      terminal_width: Optional[int] = None,
     139  ) -> None:
     140      if not hits:
     141          return
     142      if name_column_width is None:
     143          name_column_width = (
     144              max(
     145                  [
     146                      len(hit["name"]) + len(highest_version(hit.get("versions", ["-"])))
     147                      for hit in hits
     148                  ]
     149              )
     150              + 4
     151          )
     152  
     153      for hit in hits:
     154          name = hit["name"]
     155          summary = hit["summary"] or ""
     156          latest = highest_version(hit.get("versions", ["-"]))
     157          if terminal_width is not None:
     158              target_width = terminal_width - name_column_width - 5
     159              if target_width > 10:
     160                  # wrap and indent summary to fit terminal
     161                  summary_lines = textwrap.wrap(summary, target_width)
     162                  summary = ("\n" + " " * (name_column_width + 3)).join(summary_lines)
     163  
     164          name_latest = f"{name} ({latest})"
     165          line = f"{name_latest:{name_column_width}} - {summary}"
     166          try:
     167              write_output(line)
     168              print_dist_installation_info(name, latest)
     169          except UnicodeEncodeError:
     170              pass
     171  
     172  
     173  def highest_version(versions: List[str]) -> str:
     174      return max(versions, key=parse_version)