1  #!/usr/bin/env python3
       2  """Check proposed changes for common issues."""
       3  import re
       4  import sys
       5  import shutil
       6  import os.path
       7  import subprocess
       8  import sysconfig
       9  
      10  import reindent
      11  import untabify
      12  
      13  
      14  # Excluded directories which are copies of external libraries:
      15  # don't check their coding style
      16  EXCLUDE_DIRS = [
      17      os.path.join('Modules', '_decimal', 'libmpdec'),
      18      os.path.join('Modules', 'expat'),
      19      os.path.join('Modules', 'zlib'),
      20      ]
      21  SRCDIR = sysconfig.get_config_var('srcdir')
      22  
      23  
      24  def n_files_str(count):
      25      """Return 'N file(s)' with the proper plurality on 'file'."""
      26      return "{} file{}".format(count, "s" if count != 1 else "")
      27  
      28  
      29  def status(message, modal=False, info=None):
      30      """Decorator to output status info to stdout."""
      31      def decorated_fxn(fxn):
      32          def call_fxn(*args, **kwargs):
      33              sys.stdout.write(message + ' ... ')
      34              sys.stdout.flush()
      35              result = fxn(*args, **kwargs)
      36              if not modal and not info:
      37                  print("done")
      38              elif info:
      39                  print(info(result))
      40              else:
      41                  print("yes" if result else "NO")
      42              return result
      43          return call_fxn
      44      return decorated_fxn
      45  
      46  
      47  def get_git_branch():
      48      """Get the symbolic name for the current git branch"""
      49      cmd = "git rev-parse --abbrev-ref HEAD".split()
      50      try:
      51          return subprocess.check_output(cmd,
      52                                         stderr=subprocess.DEVNULL,
      53                                         cwd=SRCDIR,
      54                                         encoding='UTF-8')
      55      except subprocess.CalledProcessError:
      56          return None
      57  
      58  
      59  def get_git_upstream_remote():
      60      """Get the remote name to use for upstream branches
      61  
      62      Uses "upstream" if it exists, "origin" otherwise
      63      """
      64      cmd = "git remote get-url upstream".split()
      65      try:
      66          subprocess.check_output(cmd,
      67                                  stderr=subprocess.DEVNULL,
      68                                  cwd=SRCDIR,
      69                                  encoding='UTF-8')
      70      except subprocess.CalledProcessError:
      71          return "origin"
      72      return "upstream"
      73  
      74  
      75  def get_git_remote_default_branch(remote_name):
      76      """Get the name of the default branch for the given remote
      77  
      78      It is typically called 'main', but may differ
      79      """
      80      cmd = "git remote show {}".format(remote_name).split()
      81      env = os.environ.copy()
      82      env['LANG'] = 'C'
      83      try:
      84          remote_info = subprocess.check_output(cmd,
      85                                                stderr=subprocess.DEVNULL,
      86                                                cwd=SRCDIR,
      87                                                encoding='UTF-8',
      88                                                env=env)
      89      except subprocess.CalledProcessError:
      90          return None
      91      for line in remote_info.splitlines():
      92          if "HEAD branch:" in line:
      93              base_branch = line.split(":")[1].strip()
      94              return base_branch
      95      return None
      96  
      97  
      98  @status("Getting base branch for PR",
      99          info=lambda x: x if x is not None else "not a PR branch")
     100  def get_base_branch():
     101      if not os.path.exists(os.path.join(SRCDIR, '.git')):
     102          # Not a git checkout, so there's no base branch
     103          return None
     104      upstream_remote = get_git_upstream_remote()
     105      version = sys.version_info
     106      if version.releaselevel == 'alpha':
     107          base_branch = get_git_remote_default_branch(upstream_remote)
     108      else:
     109          base_branch = "{0.major}.{0.minor}".format(version)
     110      this_branch = get_git_branch()
     111      if this_branch is None or this_branch == base_branch:
     112          # Not on a git PR branch, so there's no base branch
     113          return None
     114      return upstream_remote + "/" + base_branch
     115  
     116  
     117  @status("Getting the list of files that have been added/changed",
     118          info=lambda x: n_files_str(len(x)))
     119  def changed_files(base_branch=None):
     120      """Get the list of changed or added files from git."""
     121      if os.path.exists(os.path.join(SRCDIR, '.git')):
     122          # We just use an existence check here as:
     123          #  directory = normal git checkout/clone
     124          #  file = git worktree directory
     125          if base_branch:
     126              cmd = 'git diff --name-status ' + base_branch
     127          else:
     128              cmd = 'git status --porcelain'
     129          filenames = []
     130          with subprocess.Popen(cmd.split(),
     131                                stdout=subprocess.PIPE,
     132                                cwd=SRCDIR) as st:
     133              git_file_status, _ = st.communicate()
     134              if st.returncode != 0:
     135                  sys.exit(f'error running {cmd}')
     136              for line in git_file_status.splitlines():
     137                  line = line.decode().rstrip()
     138                  status_text, filename = line.split(maxsplit=1)
     139                  status = set(status_text)
     140                  # modified, added or unmerged files
     141                  if not status.intersection('MAU'):
     142                      continue
     143                  if ' -> ' in filename:
     144                      # file is renamed
     145                      filename = filename.split(' -> ', 2)[1].strip()
     146                  filenames.append(filename)
     147      else:
     148          sys.exit('need a git checkout to get modified files')
     149  
     150      filenames2 = []
     151      for filename in filenames:
     152          # Normalize the path to be able to match using .startswith()
     153          filename = os.path.normpath(filename)
     154          if any(filename.startswith(path) for path in EXCLUDE_DIRS):
     155              # Exclude the file
     156              continue
     157          filenames2.append(filename)
     158  
     159      return filenames2
     160  
     161  
     162  def report_modified_files(file_paths):
     163      count = len(file_paths)
     164      if count == 0:
     165          return n_files_str(count)
     166      else:
     167          lines = ["{}:".format(n_files_str(count))]
     168          for path in file_paths:
     169              lines.append("  {}".format(path))
     170          return "\n".join(lines)
     171  
     172  
     173  #: Python files that have tabs by design:
     174  _PYTHON_FILES_WITH_TABS = frozenset({
     175      'Tools/c-analyzer/cpython/_parser.py',
     176  })
     177  
     178  
     179  @status("Fixing Python file whitespace", info=report_modified_files)
     180  def normalize_whitespace(file_paths):
     181      """Make sure that the whitespace for .py files have been normalized."""
     182      reindent.makebackup = False  # No need to create backups.
     183      fixed = [
     184          path for path in file_paths
     185          if (
     186              path.endswith('.py')
     187              and path not in _PYTHON_FILES_WITH_TABS
     188              and reindent.check(os.path.join(SRCDIR, path))
     189          )
     190      ]
     191      return fixed
     192  
     193  
     194  @status("Fixing C file whitespace", info=report_modified_files)
     195  def normalize_c_whitespace(file_paths):
     196      """Report if any C files """
     197      fixed = []
     198      for path in file_paths:
     199          abspath = os.path.join(SRCDIR, path)
     200          with open(abspath, 'r') as f:
     201              if '\t' not in f.read():
     202                  continue
     203          untabify.process(abspath, 8, verbose=False)
     204          fixed.append(path)
     205      return fixed
     206  
     207  
     208  ws_re = re.compile(br'\s+(\r?\n)$')
     209  
     210  @status("Fixing docs whitespace", info=report_modified_files)
     211  def normalize_docs_whitespace(file_paths):
     212      fixed = []
     213      for path in file_paths:
     214          abspath = os.path.join(SRCDIR, path)
     215          try:
     216              with open(abspath, 'rb') as f:
     217                  lines = f.readlines()
     218              new_lines = [ws_re.sub(br'\1', line) for line in lines]
     219              if new_lines != lines:
     220                  shutil.copyfile(abspath, abspath + '.bak')
     221                  with open(abspath, 'wb') as f:
     222                      f.writelines(new_lines)
     223                  fixed.append(path)
     224          except Exception as err:
     225              print('Cannot fix %s: %s' % (path, err))
     226      return fixed
     227  
     228  
     229  @status("Docs modified", modal=True)
     230  def docs_modified(file_paths):
     231      """Report if any file in the Doc directory has been changed."""
     232      return bool(file_paths)
     233  
     234  
     235  @status("Misc/ACKS updated", modal=True)
     236  def credit_given(file_paths):
     237      """Check if Misc/ACKS has been changed."""
     238      return os.path.join('Misc', 'ACKS') in file_paths
     239  
     240  
     241  @status("Misc/NEWS.d updated with `blurb`", modal=True)
     242  def reported_news(file_paths):
     243      """Check if Misc/NEWS.d has been changed."""
     244      return any(p.startswith(os.path.join('Misc', 'NEWS.d', 'next'))
     245                 for p in file_paths)
     246  
     247  @status("configure regenerated", modal=True, info=str)
     248  def regenerated_configure(file_paths):
     249      """Check if configure has been regenerated."""
     250      if 'configure.ac' in file_paths:
     251          return "yes" if 'configure' in file_paths else "no"
     252      else:
     253          return "not needed"
     254  
     255  @status("pyconfig.h.in regenerated", modal=True, info=str)
     256  def regenerated_pyconfig_h_in(file_paths):
     257      """Check if pyconfig.h.in has been regenerated."""
     258      if 'configure.ac' in file_paths:
     259          return "yes" if 'pyconfig.h.in' in file_paths else "no"
     260      else:
     261          return "not needed"
     262  
     263  def ci(pull_request):
     264      if pull_request == 'false':
     265          print('Not a pull request; skipping')
     266          return
     267      base_branch = get_base_branch()
     268      file_paths = changed_files(base_branch)
     269      python_files = [fn for fn in file_paths if fn.endswith('.py')]
     270      c_files = [fn for fn in file_paths if fn.endswith(('.c', '.h'))]
     271      doc_files = [fn for fn in file_paths if fn.startswith('Doc') and
     272                   fn.endswith(('.rst', '.inc'))]
     273      fixed = []
     274      fixed.extend(normalize_whitespace(python_files))
     275      fixed.extend(normalize_c_whitespace(c_files))
     276      fixed.extend(normalize_docs_whitespace(doc_files))
     277      if not fixed:
     278          print('No whitespace issues found')
     279      else:
     280          print(f'Please fix the {len(fixed)} file(s) with whitespace issues')
     281          print('(on UNIX you can run `make patchcheck` to make the fixes)')
     282          sys.exit(1)
     283  
     284  def main():
     285      base_branch = get_base_branch()
     286      file_paths = changed_files(base_branch)
     287      python_files = [fn for fn in file_paths if fn.endswith('.py')]
     288      c_files = [fn for fn in file_paths if fn.endswith(('.c', '.h'))]
     289      doc_files = [fn for fn in file_paths if fn.startswith('Doc') and
     290                   fn.endswith(('.rst', '.inc'))]
     291      misc_files = {p for p in file_paths if p.startswith('Misc')}
     292      # PEP 8 whitespace rules enforcement.
     293      normalize_whitespace(python_files)
     294      # C rules enforcement.
     295      normalize_c_whitespace(c_files)
     296      # Doc whitespace enforcement.
     297      normalize_docs_whitespace(doc_files)
     298      # Docs updated.
     299      docs_modified(doc_files)
     300      # Misc/ACKS changed.
     301      credit_given(misc_files)
     302      # Misc/NEWS changed.
     303      reported_news(misc_files)
     304      # Regenerated configure, if necessary.
     305      regenerated_configure(file_paths)
     306      # Regenerated pyconfig.h.in, if necessary.
     307      regenerated_pyconfig_h_in(file_paths)
     308  
     309      # Test suite run and passed.
     310      if python_files or c_files:
     311          end = " and check for refleaks?" if c_files else "?"
     312          print()
     313          print("Did you run the test suite" + end)
     314  
     315  
     316  if __name__ == '__main__':
     317      import argparse
     318      parser = argparse.ArgumentParser(description=__doc__)
     319      parser.add_argument('--ci',
     320                          help='Perform pass/fail checks')
     321      args = parser.parse_args()
     322      if args.ci:
     323          ci(args.ci)
     324      else:
     325          main()