(root)/
Python-3.12.0/
Tools/
c-analyzer/
c_common/
scriptutil.py
       1  import argparse
       2  import contextlib
       3  import fnmatch
       4  import logging
       5  import os
       6  import os.path
       7  import shutil
       8  import sys
       9  
      10  from . import fsutil, strutil, iterutil, logging as loggingutil
      11  
      12  
      13  _NOT_SET = object()
      14  
      15  
      16  def get_prog(spec=None, *, absolute=False, allowsuffix=True):
      17      if spec is None:
      18          _, spec = _find_script()
      19          # This is more natural for prog than __file__ would be.
      20          filename = sys.argv[0]
      21      elif isinstance(spec, str):
      22          filename = os.path.normpath(spec)
      23          spec = None
      24      else:
      25          filename = spec.origin
      26      if _is_standalone(filename):
      27          # Check if "installed".
      28          if allowsuffix or not filename.endswith('.py'):
      29              basename = os.path.basename(filename)
      30              found = shutil.which(basename)
      31              if found:
      32                  script = os.path.abspath(filename)
      33                  found = os.path.abspath(found)
      34                  if os.path.normcase(script) == os.path.normcase(found):
      35                      return basename
      36          # It is only "standalone".
      37          if absolute:
      38              filename = os.path.abspath(filename)
      39          return filename
      40      elif spec is not None:
      41          module = spec.name
      42          if module.endswith('.__main__'):
      43              module = module[:-9]
      44          return f'{sys.executable} -m {module}'
      45      else:
      46          if absolute:
      47              filename = os.path.abspath(filename)
      48          return f'{sys.executable} {filename}'
      49  
      50  
      51  def _find_script():
      52      frame = sys._getframe(2)
      53      while frame.f_globals['__name__'] != '__main__':
      54          frame = frame.f_back
      55  
      56      # This should match sys.argv[0].
      57      filename = frame.f_globals['__file__']
      58      # This will be None if -m wasn't used..
      59      spec = frame.f_globals['__spec__']
      60      return filename, spec
      61  
      62  
      63  def is_installed(filename, *, allowsuffix=True):
      64      if not allowsuffix and filename.endswith('.py'):
      65          return False
      66      filename = os.path.abspath(os.path.normalize(filename))
      67      found = shutil.which(os.path.basename(filename))
      68      if not found:
      69          return False
      70      if found != filename:
      71          return False
      72      return _is_standalone(filename)
      73  
      74  
      75  def is_standalone(filename):
      76      filename = os.path.abspath(os.path.normalize(filename))
      77      return _is_standalone(filename)
      78  
      79  
      80  def _is_standalone(filename):
      81      return fsutil.is_executable(filename)
      82  
      83  
      84  ##################################
      85  # logging
      86  
      87  VERBOSITY = 3
      88  
      89  TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
      90  TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))
      91  
      92  
      93  logger = logging.getLogger(__name__)
      94  
      95  
      96  def configure_logger(verbosity, logger=None, **kwargs):
      97      if logger is None:
      98          # Configure the root logger.
      99          logger = logging.getLogger()
     100      loggingutil.configure_logger(logger, verbosity, **kwargs)
     101  
     102  
     103  ##################################
     104  # selections
     105  
     106  class ESC[4;38;5;81mUnsupportedSelectionError(ESC[4;38;5;149mException):
     107      def __init__(self, values, possible):
     108          self.values = tuple(values)
     109          self.possible = tuple(possible)
     110          super().__init__(f'unsupported selections {self.unique}')
     111  
     112      @property
     113      def unique(self):
     114          return tuple(sorted(set(self.values)))
     115  
     116  
     117  def normalize_selection(selected: str, *, possible=None):
     118      if selected in (None, True, False):
     119          return selected
     120      elif isinstance(selected, str):
     121          selected = [selected]
     122      elif not selected:
     123          return ()
     124  
     125      unsupported = []
     126      _selected = set()
     127      for item in selected:
     128          if not item:
     129              continue
     130          for value in item.strip().replace(',', ' ').split():
     131              if not value:
     132                  continue
     133              # XXX Handle subtraction (leading "-").
     134              if possible and value not in possible and value != 'all':
     135                  unsupported.append(value)
     136              _selected.add(value)
     137      if unsupported:
     138          raise UnsupportedSelectionError(unsupported, tuple(possible))
     139      if 'all' in _selected:
     140          return True
     141      return frozenset(selected)
     142  
     143  
     144  ##################################
     145  # CLI parsing helpers
     146  
     147  class ESC[4;38;5;81mCLIArgSpec(ESC[4;38;5;149mtuple):
     148      def __new__(cls, *args, **kwargs):
     149          return super().__new__(cls, (args, kwargs))
     150  
     151      def __repr__(self):
     152          args, kwargs = self
     153          args = [repr(arg) for arg in args]
     154          for name, value in kwargs.items():
     155              args.append(f'{name}={value!r}')
     156          return f'{type(self).__name__}({", ".join(args)})'
     157  
     158      def __call__(self, parser, *, _noop=(lambda a: None)):
     159          self.apply(parser)
     160          return _noop
     161  
     162      def apply(self, parser):
     163          args, kwargs = self
     164          parser.add_argument(*args, **kwargs)
     165  
     166  
     167  def apply_cli_argspecs(parser, specs):
     168      processors = []
     169      for spec in specs:
     170          if callable(spec):
     171              procs = spec(parser)
     172              _add_procs(processors, procs)
     173          else:
     174              args, kwargs = spec
     175              parser.add_argument(args, kwargs)
     176      return processors
     177  
     178  
     179  def _add_procs(flattened, procs):
     180      # XXX Fail on non-empty, non-callable procs?
     181      if not procs:
     182          return
     183      if callable(procs):
     184          flattened.append(procs)
     185      else:
     186          #processors.extend(p for p in procs if callable(p))
     187          for proc in procs:
     188              _add_procs(flattened, proc)
     189  
     190  
     191  def add_verbosity_cli(parser):
     192      parser.add_argument('-q', '--quiet', action='count', default=0)
     193      parser.add_argument('-v', '--verbose', action='count', default=0)
     194  
     195      def process_args(args, *, argv=None):
     196          ns = vars(args)
     197          key = 'verbosity'
     198          if key in ns:
     199              parser.error(f'duplicate arg {key!r}')
     200          ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet'))
     201          return key
     202      return process_args
     203  
     204  
     205  def add_traceback_cli(parser):
     206      parser.add_argument('--traceback', '--tb', action='store_true',
     207                          default=TRACEBACK)
     208      parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
     209                          action='store_const', const=False)
     210  
     211      def process_args(args, *, argv=None):
     212          ns = vars(args)
     213          key = 'traceback_cm'
     214          if key in ns:
     215              parser.error(f'duplicate arg {key!r}')
     216          showtb = ns.pop('traceback')
     217  
     218          @contextlib.contextmanager
     219          def traceback_cm():
     220              restore = loggingutil.hide_emit_errors()
     221              try:
     222                  yield
     223              except BrokenPipeError:
     224                  # It was piped to "head" or something similar.
     225                  pass
     226              except NotImplementedError:
     227                  raise  # re-raise
     228              except Exception as exc:
     229                  if not showtb:
     230                      sys.exit(f'ERROR: {exc}')
     231                  raise  # re-raise
     232              except KeyboardInterrupt:
     233                  if not showtb:
     234                      sys.exit('\nINTERRUPTED')
     235                  raise  # re-raise
     236              except BaseException as exc:
     237                  if not showtb:
     238                      sys.exit(f'{type(exc).__name__}: {exc}')
     239                  raise  # re-raise
     240              finally:
     241                  restore()
     242          ns[key] = traceback_cm()
     243          return key
     244      return process_args
     245  
     246  
     247  def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
     248  #    if opt is True:
     249  #        parser.add_argument(f'--{dest}', action='append', **kwargs)
     250  #    elif isinstance(opt, str) and opt.startswith('-'):
     251  #        parser.add_argument(opt, dest=dest, action='append', **kwargs)
     252  #    else:
     253  #        arg = dest if not opt else opt
     254  #        kwargs.setdefault('nargs', '+')
     255  #        parser.add_argument(arg, dest=dest, action='append', **kwargs)
     256      if not isinstance(opt, str):
     257          parser.error(f'opt must be a string, got {opt!r}')
     258      elif opt.startswith('-'):
     259          parser.add_argument(opt, dest=dest, action='append', **kwargs)
     260      else:
     261          kwargs.setdefault('nargs', '+')
     262          #kwargs.setdefault('metavar', opt.upper())
     263          parser.add_argument(opt, dest=dest, action='append', **kwargs)
     264  
     265      def process_args(args, *, argv=None):
     266          ns = vars(args)
     267  
     268          # XXX Use normalize_selection()?
     269          if isinstance(ns[dest], str):
     270              ns[dest] = [ns[dest]]
     271          selections = []
     272          for many in ns[dest] or ():
     273              for value in many.split(sep):
     274                  if value not in choices:
     275                      parser.error(f'unknown {dest} {value!r}')
     276                  selections.append(value)
     277          ns[dest] = selections
     278      return process_args
     279  
     280  
     281  def add_files_cli(parser, *, excluded=None, nargs=None):
     282      process_files = add_file_filtering_cli(parser, excluded=excluded)
     283      parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME')
     284      return [
     285          process_files,
     286      ]
     287  
     288  
     289  def add_file_filtering_cli(parser, *, excluded=None):
     290      parser.add_argument('--start')
     291      parser.add_argument('--include', action='append')
     292      parser.add_argument('--exclude', action='append')
     293  
     294      excluded = tuple(excluded or ())
     295  
     296      def process_args(args, *, argv=None):
     297          ns = vars(args)
     298          key = 'iter_filenames'
     299          if key in ns:
     300              parser.error(f'duplicate arg {key!r}')
     301  
     302          _include = tuple(ns.pop('include') or ())
     303          _exclude = excluded + tuple(ns.pop('exclude') or ())
     304          kwargs = dict(
     305              start=ns.pop('start'),
     306              include=tuple(_parse_files(_include)),
     307              exclude=tuple(_parse_files(_exclude)),
     308              # We use the default for "show_header"
     309          )
     310          def process_filenames(filenames, relroot=None):
     311              return fsutil.process_filenames(filenames, relroot=relroot, **kwargs)
     312          ns[key] = process_filenames
     313      return process_args
     314  
     315  
     316  def _parse_files(filenames):
     317      for filename, _ in strutil.parse_entries(filenames):
     318          yield filename.strip()
     319  
     320  
     321  def add_progress_cli(parser, *, threshold=VERBOSITY, **kwargs):
     322      parser.add_argument('--progress', dest='track_progress', action='store_const', const=True)
     323      parser.add_argument('--no-progress', dest='track_progress', action='store_false')
     324      parser.set_defaults(track_progress=True)
     325  
     326      def process_args(args, *, argv=None):
     327          if args.track_progress:
     328              ns = vars(args)
     329              verbosity = ns.get('verbosity', VERBOSITY)
     330              if verbosity <= threshold:
     331                  args.track_progress = track_progress_compact
     332              else:
     333                  args.track_progress = track_progress_flat
     334      return process_args
     335  
     336  
     337  def add_failure_filtering_cli(parser, pool, *, default=False):
     338      parser.add_argument('--fail', action='append',
     339                          metavar=f'"{{all|{"|".join(sorted(pool))}}},..."')
     340      parser.add_argument('--no-fail', dest='fail', action='store_const', const=())
     341  
     342      def process_args(args, *, argv=None):
     343          ns = vars(args)
     344  
     345          fail = ns.pop('fail')
     346          try:
     347              fail = normalize_selection(fail, possible=pool)
     348          except UnsupportedSelectionError as exc:
     349              parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
     350          else:
     351              if fail is None:
     352                  fail = default
     353  
     354              if fail is True:
     355                  def ignore_exc(_exc):
     356                      return False
     357              elif fail is False:
     358                  def ignore_exc(_exc):
     359                      return True
     360              else:
     361                  def ignore_exc(exc):
     362                      for err in fail:
     363                          if type(exc) == pool[err]:
     364                              return False
     365                      else:
     366                          return True
     367              args.ignore_exc = ignore_exc
     368      return process_args
     369  
     370  
     371  def add_kind_filtering_cli(parser, *, default=None):
     372      parser.add_argument('--kinds', action='append')
     373  
     374      def process_args(args, *, argv=None):
     375          ns = vars(args)
     376  
     377          kinds = []
     378          for kind in ns.pop('kinds') or default or ():
     379              kinds.extend(kind.strip().replace(',', ' ').split())
     380  
     381          if not kinds:
     382              match_kind = (lambda k: True)
     383          else:
     384              included = set()
     385              excluded = set()
     386              for kind in kinds:
     387                  if kind.startswith('-'):
     388                      kind = kind[1:]
     389                      excluded.add(kind)
     390                      if kind in included:
     391                          included.remove(kind)
     392                  else:
     393                      included.add(kind)
     394                      if kind in excluded:
     395                          excluded.remove(kind)
     396              if excluded:
     397                  if included:
     398                      ...  # XXX fail?
     399                  def match_kind(kind, *, _excluded=excluded):
     400                      return kind not in _excluded
     401              else:
     402                  def match_kind(kind, *, _included=included):
     403                      return kind in _included
     404          args.match_kind = match_kind
     405      return process_args
     406  
     407  
     408  COMMON_CLI = [
     409      add_verbosity_cli,
     410      add_traceback_cli,
     411      #add_dryrun_cli,
     412  ]
     413  
     414  
     415  def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
     416      arg_processors = {}
     417      if isinstance(subset, str):
     418          cmdname = subset
     419          try:
     420              _, argspecs, _ = commands[cmdname]
     421          except KeyError:
     422              raise ValueError(f'unsupported subset {subset!r}')
     423          parser.set_defaults(cmd=cmdname)
     424          arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
     425      else:
     426          if subset is None:
     427              cmdnames = subset = list(commands)
     428          elif not subset:
     429              raise NotImplementedError
     430          elif isinstance(subset, set):
     431              cmdnames = [k for k in commands if k in subset]
     432              subset = sorted(subset)
     433          else:
     434              cmdnames = [n for n in subset if n in commands]
     435          if len(cmdnames) < len(subset):
     436              bad = tuple(n for n in subset if n not in commands)
     437              raise ValueError(f'unsupported subset {bad}')
     438  
     439          common = argparse.ArgumentParser(add_help=False)
     440          common_processors = apply_cli_argspecs(common, commonspecs)
     441          subs = parser.add_subparsers(dest='cmd')
     442          for cmdname in cmdnames:
     443              description, argspecs, _ = commands[cmdname]
     444              sub = subs.add_parser(
     445                  cmdname,
     446                  description=description,
     447                  parents=[common],
     448              )
     449              cmd_processors = _add_cmd_cli(sub, (), argspecs)
     450              arg_processors[cmdname] = common_processors + cmd_processors
     451      return arg_processors
     452  
     453  
     454  def _add_cmd_cli(parser, commonspecs, argspecs):
     455      processors = []
     456      argspecs = list(commonspecs or ()) + list(argspecs or ())
     457      for argspec in argspecs:
     458          if callable(argspec):
     459              procs = argspec(parser)
     460              _add_procs(processors, procs)
     461          else:
     462              if not argspec:
     463                  raise NotImplementedError
     464              args = list(argspec)
     465              if not isinstance(args[-1], str):
     466                  kwargs = args.pop()
     467                  if not isinstance(args[0], str):
     468                      try:
     469                          args, = args
     470                      except (TypeError, ValueError):
     471                          parser.error(f'invalid cmd args {argspec!r}')
     472              else:
     473                  kwargs = {}
     474              parser.add_argument(*args, **kwargs)
     475              # There will be nothing to process.
     476      return processors
     477  
     478  
     479  def _flatten_processors(processors):
     480      for proc in processors:
     481          if proc is None:
     482              continue
     483          if callable(proc):
     484              yield proc
     485          else:
     486              yield from _flatten_processors(proc)
     487  
     488  
     489  def process_args(args, argv, processors, *, keys=None):
     490      processors = _flatten_processors(processors)
     491      ns = vars(args)
     492      extracted = {}
     493      if keys is None:
     494          for process_args in processors:
     495              for key in process_args(args, argv=argv):
     496                  extracted[key] = ns.pop(key)
     497      else:
     498          remainder = set(keys)
     499          for process_args in processors:
     500              hanging = process_args(args, argv=argv)
     501              if isinstance(hanging, str):
     502                  hanging = [hanging]
     503              for key in hanging or ():
     504                  if key not in remainder:
     505                      raise NotImplementedError(key)
     506                  extracted[key] = ns.pop(key)
     507                  remainder.remove(key)
     508          if remainder:
     509              raise NotImplementedError(sorted(remainder))
     510      return extracted
     511  
     512  
     513  def process_args_by_key(args, argv, processors, keys):
     514      extracted = process_args(args, argv, processors, keys=keys)
     515      return [extracted[key] for key in keys]
     516  
     517  
     518  ##################################
     519  # commands
     520  
     521  def set_command(name, add_cli):
     522      """A decorator factory to set CLI info."""
     523      def decorator(func):
     524          if hasattr(func, '__cli__'):
     525              raise Exception(f'already set')
     526          func.__cli__ = (name, add_cli)
     527          return func
     528      return decorator
     529  
     530  
     531  ##################################
     532  # main() helpers
     533  
     534  def filter_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
     535      # We expect each filename to be a normalized, absolute path.
     536      for filename, _, check, _ in _iter_filenames(filenames, process_filenames, relroot):
     537          if (reason := check()):
     538              logger.debug(f'{filename}: {reason}')
     539              continue
     540          yield filename
     541  
     542  
     543  def main_for_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
     544      filenames, relroot = fsutil.fix_filenames(filenames, relroot=relroot)
     545      for filename, relfile, check, show in _iter_filenames(filenames, process_filenames, relroot):
     546          if show:
     547              print()
     548              print(relfile)
     549              print('-------------------------------------------')
     550          if (reason := check()):
     551              print(reason)
     552              continue
     553          yield filename, relfile
     554  
     555  
     556  def _iter_filenames(filenames, process, relroot):
     557      if process is None:
     558          yield from fsutil.process_filenames(filenames, relroot=relroot)
     559          return
     560  
     561      onempty = Exception('no filenames provided')
     562      items = process(filenames, relroot=relroot)
     563      items, peeked = iterutil.peek_and_iter(items)
     564      if not items:
     565          raise onempty
     566      if isinstance(peeked, str):
     567          if relroot and relroot is not fsutil.USE_CWD:
     568              relroot = os.path.abspath(relroot)
     569          check = (lambda: True)
     570          for filename, ismany in iterutil.iter_many(items, onempty):
     571              relfile = fsutil.format_filename(filename, relroot, fixroot=False)
     572              yield filename, relfile, check, ismany
     573      elif len(peeked) == 4:
     574          yield from items
     575      else:
     576          raise NotImplementedError
     577  
     578  
     579  def track_progress_compact(items, *, groups=5, **mark_kwargs):
     580      last = os.linesep
     581      marks = iter_marks(groups=groups, **mark_kwargs)
     582      for item in items:
     583          last = next(marks)
     584          print(last, end='', flush=True)
     585          yield item
     586      if not last.endswith(os.linesep):
     587          print()
     588  
     589  
     590  def track_progress_flat(items, fmt='<{}>'):
     591      for item in items:
     592          print(fmt.format(item), flush=True)
     593          yield item
     594  
     595  
     596  def iter_marks(mark='.', *, group=5, groups=2, lines=_NOT_SET, sep=' '):
     597      mark = mark or ''
     598      group = group if group and group > 1 else 1
     599      groups = groups if groups and groups > 1 else 1
     600  
     601      sep = f'{mark}{sep}' if sep else mark
     602      end = f'{mark}{os.linesep}'
     603      div = os.linesep
     604      perline = group * groups
     605      if lines is _NOT_SET:
     606          # By default we try to put about 100 in each line group.
     607          perlines = 100 // perline * perline
     608      elif not lines or lines < 0:
     609          perlines = None
     610      else:
     611          perlines = perline * lines
     612  
     613      if perline == 1:
     614          yield end
     615      elif group == 1:
     616          yield sep
     617  
     618      count = 1
     619      while True:
     620          if count % perline == 0:
     621              yield end
     622              if perlines and count % perlines == 0:
     623                  yield div
     624          elif count % group == 0:
     625              yield sep
     626          else:
     627              yield mark
     628          count += 1