python (3.12.0)
1 import logging
2 import shutil
3 import sys
4 import textwrap
5 import xmlrpc.client
6 from collections import OrderedDict
7 from optparse import Values
8 from typing import TYPE_CHECKING, Dict, List, Optional
9
10 from pip._vendor.packaging.version import parse as parse_version
11
12 from pip._internal.cli.base_command import Command
13 from pip._internal.cli.req_command import SessionCommandMixin
14 from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
15 from pip._internal.exceptions import CommandError
16 from pip._internal.metadata import get_default_environment
17 from pip._internal.models.index import PyPI
18 from pip._internal.network.xmlrpc import PipXmlrpcTransport
19 from pip._internal.utils.logging import indent_log
20 from pip._internal.utils.misc import write_output
21
if TYPE_CHECKING:
    # Evaluated by static type checkers only; at runtime TYPE_CHECKING is
    # False, so neither the import nor the class below is executed.
    from typing import TypedDict

    class TransformedHit(TypedDict):
        # Shape of one per-package entry built by transform_hits().
        name: str  # package name, copied from the raw hit
        summary: str  # summary text shown next to the name
        versions: List[str]  # every version string collected for the package
29
30
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
32
33
class SearchCommand(Command, SessionCommandMixin):
    """Search for PyPI packages whose name or summary contains <query>."""

    usage = """
      %prog [options] <query>"""
    ignore_require_venv = True

    def add_options(self) -> None:
        """Register the ``-i/--index`` option and attach the option group."""
        self.cmd_opts.add_option(
            "-i",
            "--index",
            dest="index",
            metavar="URL",
            default=PyPI.pypi_url,
            help="Base URL of Python Package Index (default %default)",
        )

        self.parser.insert_option_group(0, self.cmd_opts)

    def run(self, options: Values, args: List[str]) -> int:
        """Run the search and print the results.

        Returns SUCCESS when the index returned at least one hit and
        NO_MATCHES_FOUND otherwise.  Raises CommandError when no query
        terms were given.
        """
        if not args:
            raise CommandError("Missing required argument (search query).")

        pypi_hits = self.search(args, options)
        hits = transform_hits(pypi_hits)

        # Summaries are only wrapped when stdout is an interactive terminal.
        terminal_width = (
            shutil.get_terminal_size()[0] if sys.stdout.isatty() else None
        )
        print_results(hits, terminal_width=terminal_width)

        return SUCCESS if pypi_hits else NO_MATCHES_FOUND

    def search(self, query: List[str], options: Values) -> List[Dict[str, str]]:
        """Send an XML-RPC ``search`` request to ``options.index``.

        The query terms are matched against both the ``name`` and the
        ``summary`` fields, combined with the "or" operator.  An XML-RPC
        fault is re-raised as CommandError.
        """
        index_url = options.index
        transport = PipXmlrpcTransport(index_url, self.get_default_session(options))
        pypi = xmlrpc.client.ServerProxy(index_url, transport)

        criteria = {"name": query, "summary": query}
        try:
            hits = pypi.search(criteria, "or")
        except xmlrpc.client.Fault as fault:
            template = "XMLRPC request failed [code: {code}]\n{string}"
            raise CommandError(
                template.format(code=fault.faultCode, string=fault.faultString)
            )
        assert isinstance(hits, list)
        return hits
86
87
def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]:
    """
    Collapse the per-version hits returned by the index into one entry per
    package, preserving the order in which package names first appear.

    Each resulting entry carries the package name, a summary, and the list
    of every version seen for that package.
    """
    grouped: Dict[str, "TransformedHit"] = OrderedDict()
    for record in hits:
        name, summary, version = (
            record["name"],
            record["summary"],
            record["version"],
        )

        entry = grouped.get(name)
        if entry is None:
            # First time this package shows up: start a fresh entry.
            grouped[name] = {
                "name": name,
                "summary": summary,
                "versions": [version],
            }
            continue

        entry["versions"].append(version)

        # Keep the summary that belongs to the highest version seen so far.
        if version == highest_version(entry["versions"]):
            entry["summary"] = summary

    return list(grouped.values())
114
115
def print_dist_installation_info(name: str, latest: str) -> None:
    """Report the installed version of *name* next to the *latest* one.

    Nothing is written when the default environment has no distribution
    called *name*.  Otherwise the installed version is written and, when it
    differs from *latest*, the latest version as well (flagged as a
    pre-release when the parsed version has a ``pre`` segment).

    :param name: Name of the distribution to look up.
    :param latest: Highest version string known for that distribution.
    """
    env = get_default_environment()
    dist = env.get_distribution(name)
    if dist is None:
        return

    # NOTE(review): ``dist.version`` appears to be a parsed version object
    # rather than a ``str`` -- confirm against pip._internal.metadata.  A
    # parsed version never compares equal to a plain string, so comparing it
    # directly with ``latest`` would never report "(latest)".  Compare the
    # string forms first, then fall back to comparing parsed versions so
    # that equivalent spellings (e.g. "1.0" and "1.0.0") also match.
    installed = str(dist.version)
    is_latest = installed == latest
    if not is_latest:
        is_latest = parse_version(installed) == parse_version(latest)

    with indent_log():
        if is_latest:
            write_output("INSTALLED: %s (latest)", dist.version)
            return

        write_output("INSTALLED: %s", dist.version)
        if parse_version(latest).pre:
            write_output(
                "LATEST:    %s (pre-release; install"
                " with `pip install --pre`)",
                latest,
            )
        else:
            write_output("LATEST:    %s", latest)
133
134
def print_results(
    hits: List["TransformedHit"],
    name_column_width: Optional[int] = None,
    terminal_width: Optional[int] = None,
) -> None:
    """Write one ``name (latest) - summary`` line per hit.

    When *name_column_width* is omitted it is derived from the longest
    name/latest-version pair.  When *terminal_width* is given and leaves
    more than 10 columns for the summary, the summary is wrapped and its
    continuation lines are indented to line up under the first one.
    After each line the installation status of the package is reported.
    """
    if not hits:
        return

    if name_column_width is None:
        widest = max(
            len(entry["name"]) + len(highest_version(entry.get("versions", ["-"])))
            for entry in hits
        )
        name_column_width = widest + 4

    for entry in hits:
        name = entry["name"]
        summary = entry["summary"] or ""
        latest = highest_version(entry.get("versions", ["-"]))

        if terminal_width is not None:
            target_width = terminal_width - name_column_width - 5
            if target_width > 10:
                # wrap and indent summary to fit terminal
                continuation = "\n" + " " * (name_column_width + 3)
                summary = continuation.join(textwrap.wrap(summary, target_width))

        label = f"{name} ({latest})"
        line = f"{label:{name_column_width}} - {summary}"
        try:
            write_output(line)
            print_dist_installation_info(name, latest)
        except UnicodeEncodeError:
            # Deliberate best-effort: skip entries the output stream cannot
            # encode.
            pass
171
172
def highest_version(versions: List[str]) -> str:
    """Return the entry of *versions* that ranks highest under parse_version.

    The original string is returned, not the parsed version.  Like ``max``,
    this raises ValueError when *versions* is empty.
    """
    return max(versions, key=parse_version)