python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
pip/
_internal/
req/
req_file.py
       1  """
       2  Requirements file parsing
       3  """
       4  
       5  import logging
       6  import optparse
       7  import os
       8  import re
       9  import shlex
      10  import urllib.parse
      11  from optparse import Values
      12  from typing import (
      13      TYPE_CHECKING,
      14      Any,
      15      Callable,
      16      Dict,
      17      Generator,
      18      Iterable,
      19      List,
      20      Optional,
      21      Tuple,
      22  )
      23  
      24  from pip._internal.cli import cmdoptions
      25  from pip._internal.exceptions import InstallationError, RequirementsFileParseError
      26  from pip._internal.models.search_scope import SearchScope
      27  from pip._internal.network.session import PipSession
      28  from pip._internal.network.utils import raise_for_status
      29  from pip._internal.utils.encoding import auto_decode
      30  from pip._internal.utils.urls import get_url_scheme
      31  
      32  if TYPE_CHECKING:
      33      # NoReturn introduced in 3.6.2; imported only for type checking to maintain
      34      # pip compatibility with older patch versions of Python 3.6
      35      from typing import NoReturn
      36  
      37      from pip._internal.index.package_finder import PackageFinder
      38  
      39  __all__ = ["parse_requirements"]
      40  
      41  ReqFileLines = Iterable[Tuple[int, str]]
      42  
      43  LineParser = Callable[[str], Tuple[str, Values]]
      44  
      45  SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
      46  COMMENT_RE = re.compile(r"(^|\s+)#.*$")
      47  
      48  # Matches environment variable-style values in '${MY_VARIABLE_1}' with the
      49  # variable name consisting of only uppercase letters, digits or the '_'
      50  # (underscore). This follows the POSIX standard defined in IEEE Std 1003.1,
      51  # 2013 Edition.
      52  ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")
      53  
      54  SUPPORTED_OPTIONS: List[Callable[..., optparse.Option]] = [
      55      cmdoptions.index_url,
      56      cmdoptions.extra_index_url,
      57      cmdoptions.no_index,
      58      cmdoptions.constraints,
      59      cmdoptions.requirements,
      60      cmdoptions.editable,
      61      cmdoptions.find_links,
      62      cmdoptions.no_binary,
      63      cmdoptions.only_binary,
      64      cmdoptions.prefer_binary,
      65      cmdoptions.require_hashes,
      66      cmdoptions.pre,
      67      cmdoptions.trusted_host,
      68      cmdoptions.use_new_feature,
      69  ]
      70  
      71  # options to be passed to requirements
      72  SUPPORTED_OPTIONS_REQ: List[Callable[..., optparse.Option]] = [
      73      cmdoptions.global_options,
      74      cmdoptions.hash,
      75      cmdoptions.config_settings,
      76  ]
      77  
      78  # the 'dest' string values
      79  SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ]
      80  
      81  logger = logging.getLogger(__name__)
      82  
      83  
      84  class ESC[4;38;5;81mParsedRequirement:
      85      def __init__(
      86          self,
      87          requirement: str,
      88          is_editable: bool,
      89          comes_from: str,
      90          constraint: bool,
      91          options: Optional[Dict[str, Any]] = None,
      92          line_source: Optional[str] = None,
      93      ) -> None:
      94          self.requirement = requirement
      95          self.is_editable = is_editable
      96          self.comes_from = comes_from
      97          self.options = options
      98          self.constraint = constraint
      99          self.line_source = line_source
     100  
     101  
     102  class ESC[4;38;5;81mParsedLine:
     103      def __init__(
     104          self,
     105          filename: str,
     106          lineno: int,
     107          args: str,
     108          opts: Values,
     109          constraint: bool,
     110      ) -> None:
     111          self.filename = filename
     112          self.lineno = lineno
     113          self.opts = opts
     114          self.constraint = constraint
     115  
     116          if args:
     117              self.is_requirement = True
     118              self.is_editable = False
     119              self.requirement = args
     120          elif opts.editables:
     121              self.is_requirement = True
     122              self.is_editable = True
     123              # We don't support multiple -e on one line
     124              self.requirement = opts.editables[0]
     125          else:
     126              self.is_requirement = False
     127  
     128  
     129  def parse_requirements(
     130      filename: str,
     131      session: PipSession,
     132      finder: Optional["PackageFinder"] = None,
     133      options: Optional[optparse.Values] = None,
     134      constraint: bool = False,
     135  ) -> Generator[ParsedRequirement, None, None]:
     136      """Parse a requirements file and yield ParsedRequirement instances.
     137  
     138      :param filename:    Path or url of requirements file.
     139      :param session:     PipSession instance.
     140      :param finder:      Instance of pip.index.PackageFinder.
     141      :param options:     cli options.
     142      :param constraint:  If true, parsing a constraint file rather than
     143          requirements file.
     144      """
     145      line_parser = get_line_parser(finder)
     146      parser = RequirementsFileParser(session, line_parser)
     147  
     148      for parsed_line in parser.parse(filename, constraint):
     149          parsed_req = handle_line(
     150              parsed_line, options=options, finder=finder, session=session
     151          )
     152          if parsed_req is not None:
     153              yield parsed_req
     154  
     155  
     156  def preprocess(content: str) -> ReqFileLines:
     157      """Split, filter, and join lines, and return a line iterator
     158  
     159      :param content: the content of the requirements file
     160      """
     161      lines_enum: ReqFileLines = enumerate(content.splitlines(), start=1)
     162      lines_enum = join_lines(lines_enum)
     163      lines_enum = ignore_comments(lines_enum)
     164      lines_enum = expand_env_variables(lines_enum)
     165      return lines_enum
     166  
     167  
     168  def handle_requirement_line(
     169      line: ParsedLine,
     170      options: Optional[optparse.Values] = None,
     171  ) -> ParsedRequirement:
     172      # preserve for the nested code path
     173      line_comes_from = "{} {} (line {})".format(
     174          "-c" if line.constraint else "-r",
     175          line.filename,
     176          line.lineno,
     177      )
     178  
     179      assert line.is_requirement
     180  
     181      if line.is_editable:
     182          # For editable requirements, we don't support per-requirement
     183          # options, so just return the parsed requirement.
     184          return ParsedRequirement(
     185              requirement=line.requirement,
     186              is_editable=line.is_editable,
     187              comes_from=line_comes_from,
     188              constraint=line.constraint,
     189          )
     190      else:
     191          # get the options that apply to requirements
     192          req_options = {}
     193          for dest in SUPPORTED_OPTIONS_REQ_DEST:
     194              if dest in line.opts.__dict__ and line.opts.__dict__[dest]:
     195                  req_options[dest] = line.opts.__dict__[dest]
     196  
     197          line_source = f"line {line.lineno} of {line.filename}"
     198          return ParsedRequirement(
     199              requirement=line.requirement,
     200              is_editable=line.is_editable,
     201              comes_from=line_comes_from,
     202              constraint=line.constraint,
     203              options=req_options,
     204              line_source=line_source,
     205          )
     206  
     207  
     208  def handle_option_line(
     209      opts: Values,
     210      filename: str,
     211      lineno: int,
     212      finder: Optional["PackageFinder"] = None,
     213      options: Optional[optparse.Values] = None,
     214      session: Optional[PipSession] = None,
     215  ) -> None:
     216      if opts.hashes:
     217          logger.warning(
     218              "%s line %s has --hash but no requirement, and will be ignored.",
     219              filename,
     220              lineno,
     221          )
     222  
     223      if options:
     224          # percolate options upward
     225          if opts.require_hashes:
     226              options.require_hashes = opts.require_hashes
     227          if opts.features_enabled:
     228              options.features_enabled.extend(
     229                  f for f in opts.features_enabled if f not in options.features_enabled
     230              )
     231  
     232      # set finder options
     233      if finder:
     234          find_links = finder.find_links
     235          index_urls = finder.index_urls
     236          no_index = finder.search_scope.no_index
     237          if opts.no_index is True:
     238              no_index = True
     239              index_urls = []
     240          if opts.index_url and not no_index:
     241              index_urls = [opts.index_url]
     242          if opts.extra_index_urls and not no_index:
     243              index_urls.extend(opts.extra_index_urls)
     244          if opts.find_links:
     245              # FIXME: it would be nice to keep track of the source
     246              # of the find_links: support a find-links local path
     247              # relative to a requirements file.
     248              value = opts.find_links[0]
     249              req_dir = os.path.dirname(os.path.abspath(filename))
     250              relative_to_reqs_file = os.path.join(req_dir, value)
     251              if os.path.exists(relative_to_reqs_file):
     252                  value = relative_to_reqs_file
     253              find_links.append(value)
     254  
     255          if session:
     256              # We need to update the auth urls in session
     257              session.update_index_urls(index_urls)
     258  
     259          search_scope = SearchScope(
     260              find_links=find_links,
     261              index_urls=index_urls,
     262              no_index=no_index,
     263          )
     264          finder.search_scope = search_scope
     265  
     266          if opts.pre:
     267              finder.set_allow_all_prereleases()
     268  
     269          if opts.prefer_binary:
     270              finder.set_prefer_binary()
     271  
     272          if session:
     273              for host in opts.trusted_hosts or []:
     274                  source = f"line {lineno} of {filename}"
     275                  session.add_trusted_host(host, source=source)
     276  
     277  
     278  def handle_line(
     279      line: ParsedLine,
     280      options: Optional[optparse.Values] = None,
     281      finder: Optional["PackageFinder"] = None,
     282      session: Optional[PipSession] = None,
     283  ) -> Optional[ParsedRequirement]:
     284      """Handle a single parsed requirements line; This can result in
     285      creating/yielding requirements, or updating the finder.
     286  
     287      :param line:        The parsed line to be processed.
     288      :param options:     CLI options.
     289      :param finder:      The finder - updated by non-requirement lines.
     290      :param session:     The session - updated by non-requirement lines.
     291  
     292      Returns a ParsedRequirement object if the line is a requirement line,
     293      otherwise returns None.
     294  
     295      For lines that contain requirements, the only options that have an effect
     296      are from SUPPORTED_OPTIONS_REQ, and they are scoped to the
     297      requirement. Other options from SUPPORTED_OPTIONS may be present, but are
     298      ignored.
     299  
     300      For lines that do not contain requirements, the only options that have an
     301      effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may
     302      be present, but are ignored. These lines may contain multiple options
     303      (although our docs imply only one is supported), and all our parsed and
     304      affect the finder.
     305      """
     306  
     307      if line.is_requirement:
     308          parsed_req = handle_requirement_line(line, options)
     309          return parsed_req
     310      else:
     311          handle_option_line(
     312              line.opts,
     313              line.filename,
     314              line.lineno,
     315              finder,
     316              options,
     317              session,
     318          )
     319          return None
     320  
     321  
     322  class ESC[4;38;5;81mRequirementsFileParser:
     323      def __init__(
     324          self,
     325          session: PipSession,
     326          line_parser: LineParser,
     327      ) -> None:
     328          self._session = session
     329          self._line_parser = line_parser
     330  
     331      def parse(
     332          self, filename: str, constraint: bool
     333      ) -> Generator[ParsedLine, None, None]:
     334          """Parse a given file, yielding parsed lines."""
     335          yield from self._parse_and_recurse(filename, constraint)
     336  
     337      def _parse_and_recurse(
     338          self, filename: str, constraint: bool
     339      ) -> Generator[ParsedLine, None, None]:
     340          for line in self._parse_file(filename, constraint):
     341              if not line.is_requirement and (
     342                  line.opts.requirements or line.opts.constraints
     343              ):
     344                  # parse a nested requirements file
     345                  if line.opts.requirements:
     346                      req_path = line.opts.requirements[0]
     347                      nested_constraint = False
     348                  else:
     349                      req_path = line.opts.constraints[0]
     350                      nested_constraint = True
     351  
     352                  # original file is over http
     353                  if SCHEME_RE.search(filename):
     354                      # do a url join so relative paths work
     355                      req_path = urllib.parse.urljoin(filename, req_path)
     356                  # original file and nested file are paths
     357                  elif not SCHEME_RE.search(req_path):
     358                      # do a join so relative paths work
     359                      req_path = os.path.join(
     360                          os.path.dirname(filename),
     361                          req_path,
     362                      )
     363  
     364                  yield from self._parse_and_recurse(req_path, nested_constraint)
     365              else:
     366                  yield line
     367  
     368      def _parse_file(
     369          self, filename: str, constraint: bool
     370      ) -> Generator[ParsedLine, None, None]:
     371          _, content = get_file_content(filename, self._session)
     372  
     373          lines_enum = preprocess(content)
     374  
     375          for line_number, line in lines_enum:
     376              try:
     377                  args_str, opts = self._line_parser(line)
     378              except OptionParsingError as e:
     379                  # add offending line
     380                  msg = f"Invalid requirement: {line}\n{e.msg}"
     381                  raise RequirementsFileParseError(msg)
     382  
     383              yield ParsedLine(
     384                  filename,
     385                  line_number,
     386                  args_str,
     387                  opts,
     388                  constraint,
     389              )
     390  
     391  
     392  def get_line_parser(finder: Optional["PackageFinder"]) -> LineParser:
     393      def parse_line(line: str) -> Tuple[str, Values]:
     394          # Build new parser for each line since it accumulates appendable
     395          # options.
     396          parser = build_parser()
     397          defaults = parser.get_default_values()
     398          defaults.index_url = None
     399          if finder:
     400              defaults.format_control = finder.format_control
     401  
     402          args_str, options_str = break_args_options(line)
     403  
     404          try:
     405              options = shlex.split(options_str)
     406          except ValueError as e:
     407              raise OptionParsingError(f"Could not split options: {options_str}") from e
     408  
     409          opts, _ = parser.parse_args(options, defaults)
     410  
     411          return args_str, opts
     412  
     413      return parse_line
     414  
     415  
     416  def break_args_options(line: str) -> Tuple[str, str]:
     417      """Break up the line into an args and options string.  We only want to shlex
     418      (and then optparse) the options, not the args.  args can contain markers
     419      which are corrupted by shlex.
     420      """
     421      tokens = line.split(" ")
     422      args = []
     423      options = tokens[:]
     424      for token in tokens:
     425          if token.startswith("-") or token.startswith("--"):
     426              break
     427          else:
     428              args.append(token)
     429              options.pop(0)
     430      return " ".join(args), " ".join(options)
     431  
     432  
     433  class ESC[4;38;5;81mOptionParsingError(ESC[4;38;5;149mException):
     434      def __init__(self, msg: str) -> None:
     435          self.msg = msg
     436  
     437  
     438  def build_parser() -> optparse.OptionParser:
     439      """
     440      Return a parser for parsing requirement lines
     441      """
     442      parser = optparse.OptionParser(add_help_option=False)
     443  
     444      option_factories = SUPPORTED_OPTIONS + SUPPORTED_OPTIONS_REQ
     445      for option_factory in option_factories:
     446          option = option_factory()
     447          parser.add_option(option)
     448  
     449      # By default optparse sys.exits on parsing errors. We want to wrap
     450      # that in our own exception.
     451      def parser_exit(self: Any, msg: str) -> "NoReturn":
     452          raise OptionParsingError(msg)
     453  
     454      # NOTE: mypy disallows assigning to a method
     455      #       https://github.com/python/mypy/issues/2427
     456      parser.exit = parser_exit  # type: ignore
     457  
     458      return parser
     459  
     460  
     461  def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
     462      """Joins a line ending in '\' with the previous line (except when following
     463      comments).  The joined line takes on the index of the first line.
     464      """
     465      primary_line_number = None
     466      new_line: List[str] = []
     467      for line_number, line in lines_enum:
     468          if not line.endswith("\\") or COMMENT_RE.match(line):
     469              if COMMENT_RE.match(line):
     470                  # this ensures comments are always matched later
     471                  line = " " + line
     472              if new_line:
     473                  new_line.append(line)
     474                  assert primary_line_number is not None
     475                  yield primary_line_number, "".join(new_line)
     476                  new_line = []
     477              else:
     478                  yield line_number, line
     479          else:
     480              if not new_line:
     481                  primary_line_number = line_number
     482              new_line.append(line.strip("\\"))
     483  
     484      # last line contains \
     485      if new_line:
     486          assert primary_line_number is not None
     487          yield primary_line_number, "".join(new_line)
     488  
     489      # TODO: handle space after '\'.
     490  
     491  
     492  def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
     493      """
     494      Strips comments and filter empty lines.
     495      """
     496      for line_number, line in lines_enum:
     497          line = COMMENT_RE.sub("", line)
     498          line = line.strip()
     499          if line:
     500              yield line_number, line
     501  
     502  
     503  def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
     504      """Replace all environment variables that can be retrieved via `os.getenv`.
     505  
     506      The only allowed format for environment variables defined in the
     507      requirement file is `${MY_VARIABLE_1}` to ensure two things:
     508  
     509      1. Strings that contain a `$` aren't accidentally (partially) expanded.
     510      2. Ensure consistency across platforms for requirement files.
     511  
     512      These points are the result of a discussion on the `github pull
     513      request #3514 <https://github.com/pypa/pip/pull/3514>`_.
     514  
     515      Valid characters in variable names follow the `POSIX standard
     516      <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited
     517      to uppercase letter, digits and the `_` (underscore).
     518      """
     519      for line_number, line in lines_enum:
     520          for env_var, var_name in ENV_VAR_RE.findall(line):
     521              value = os.getenv(var_name)
     522              if not value:
     523                  continue
     524  
     525              line = line.replace(env_var, value)
     526  
     527          yield line_number, line
     528  
     529  
     530  def get_file_content(url: str, session: PipSession) -> Tuple[str, str]:
     531      """Gets the content of a file; it may be a filename, file: URL, or
     532      http: URL.  Returns (location, content).  Content is unicode.
     533      Respects # -*- coding: declarations on the retrieved files.
     534  
     535      :param url:         File path or url.
     536      :param session:     PipSession instance.
     537      """
     538      scheme = get_url_scheme(url)
     539  
     540      # Pip has special support for file:// URLs (LocalFSAdapter).
     541      if scheme in ["http", "https", "file"]:
     542          resp = session.get(url)
     543          raise_for_status(resp)
     544          return resp.url, resp.text
     545  
     546      # Assume this is a bare path.
     547      try:
     548          with open(url, "rb") as f:
     549              content = auto_decode(f.read())
     550      except OSError as exc:
     551          raise InstallationError(f"Could not open requirements file: {exc}")
     552      return url, content