(root)/
Python-3.11.7/
Lib/
test/
libregrtest/
run_workers.py
       1  import contextlib
       2  import dataclasses
       3  import faulthandler
       4  import os.path
       5  import queue
       6  import signal
       7  import subprocess
       8  import sys
       9  import tempfile
      10  import threading
      11  import time
      12  import traceback
      13  from typing import Any, Literal, TextIO
      14  
      15  from test import support
      16  from test.support import os_helper, MS_WINDOWS
      17  
      18  from .logger import Logger
      19  from .result import TestResult, State
      20  from .results import TestResults
      21  from .runtests import RunTests, WorkerRunTests, JsonFile, JsonFileType
      22  from .single import PROGRESS_MIN_TIME
      23  from .utils import (
      24      StrPath, TestName,
      25      format_duration, print_warning, count, plural, get_signal_name)
      26  from .worker import create_worker_process, USE_PROCESS_GROUP
      27  
      28  if MS_WINDOWS:
      29      import locale
      30      import msvcrt
      31  
      32  
      33  
# Display the running tests if no worker result arrived during the last
# PROGRESS_UPDATE seconds (used as the timeout when waiting on the result
# queue).
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker completes: should be immediate
WAIT_COMPLETED_TIMEOUT = 30.0   # seconds

# Time to wait for a killed process (in seconds)
WAIT_KILLED_TIMEOUT = 60.0
      49  
      50  
      51  # We do not use a generator so multiple threads can call next().
      52  class ESC[4;38;5;81mMultiprocessIterator:
      53  
      54      """A thread-safe iterator over tests for multiprocess mode."""
      55  
      56      def __init__(self, tests_iter):
      57          self.lock = threading.Lock()
      58          self.tests_iter = tests_iter
      59  
      60      def __iter__(self):
      61          return self
      62  
      63      def __next__(self):
      64          with self.lock:
      65              if self.tests_iter is None:
      66                  raise StopIteration
      67              return next(self.tests_iter)
      68  
      69      def stop(self):
      70          with self.lock:
      71              self.tests_iter = None
      72  
      73  
      74  @dataclasses.dataclass(slots=True, frozen=True)
      75  class ESC[4;38;5;81mMultiprocessResult:
      76      result: TestResult
      77      # bpo-45410: stderr is written into stdout to keep messages order
      78      worker_stdout: str | None = None
      79      err_msg: str | None = None
      80  
      81  
# Formatted traceback of an exception raised in a worker thread
ExcStr = str
# Item put on the output queue by a worker thread: (False, result) for a
# test result, or (True, traceback) when the thread itself failed.
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
      84  
      85  
      86  class ESC[4;38;5;81mExitThread(ESC[4;38;5;149mException):
      87      pass
      88  
      89  
      90  class ESC[4;38;5;81mWorkerError(ESC[4;38;5;149mException):
      91      def __init__(self,
      92                   test_name: TestName,
      93                   err_msg: str | None,
      94                   stdout: str | None,
      95                   state: str):
      96          result = TestResult(test_name, state=state)
      97          self.mp_result = MultiprocessResult(result, stdout, err_msg)
      98          super().__init__()
      99  
     100  
     101  class ESC[4;38;5;81mWorkerThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
     102      def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
     103          super().__init__()
     104          self.worker_id = worker_id
     105          self.runtests = runner.runtests
     106          self.pending = runner.pending
     107          self.output = runner.output
     108          self.timeout = runner.worker_timeout
     109          self.log = runner.log
     110          self.test_name: TestName | None = None
     111          self.start_time: float | None = None
     112          self._popen: subprocess.Popen[str] | None = None
     113          self._killed = False
     114          self._stopped = False
     115  
     116      def __repr__(self) -> str:
     117          info = [f'WorkerThread #{self.worker_id}']
     118          if self.is_alive():
     119              info.append("running")
     120          else:
     121              info.append('stopped')
     122          test = self.test_name
     123          if test:
     124              info.append(f'test={test}')
     125          popen = self._popen
     126          if popen is not None:
     127              dt = time.monotonic() - self.start_time
     128              info.extend((f'pid={self._popen.pid}',
     129                           f'time={format_duration(dt)}'))
     130          return '<%s>' % ' '.join(info)
     131  
     132      def _kill(self) -> None:
     133          popen = self._popen
     134          if popen is None:
     135              return
     136  
     137          if self._killed:
     138              return
     139          self._killed = True
     140  
     141          if USE_PROCESS_GROUP:
     142              what = f"{self} process group"
     143          else:
     144              what = f"{self} process"
     145  
     146          print(f"Kill {what}", file=sys.stderr, flush=True)
     147          try:
     148              if USE_PROCESS_GROUP:
     149                  os.killpg(popen.pid, signal.SIGKILL)
     150              else:
     151                  popen.kill()
     152          except ProcessLookupError:
     153              # popen.kill(): the process completed, the WorkerThread thread
     154              # read its exit status, but Popen.send_signal() read the returncode
     155              # just before Popen.wait() set returncode.
     156              pass
     157          except OSError as exc:
     158              print_warning(f"Failed to kill {what}: {exc!r}")
     159  
    def stop(self) -> None:
        """Ask the thread to exit and kill its worker process, if any."""
        # Method called from a different thread to stop this thread.
        # The flag is set before killing: the worker thread checks it to
        # tell a requested stop apart from a process failure.
        self._stopped = True
        self._kill()
     164  
     165      def _run_process(self, runtests: WorkerRunTests, output_fd: int,
     166                       tmp_dir: StrPath | None = None) -> int | None:
     167          popen = create_worker_process(runtests, output_fd, tmp_dir)
     168          self._popen = popen
     169          self._killed = False
     170  
     171          try:
     172              if self._stopped:
     173                  # If kill() has been called before self._popen is set,
     174                  # self._popen is still running. Call again kill()
     175                  # to ensure that the process is killed.
     176                  self._kill()
     177                  raise ExitThread
     178  
     179              try:
     180                  # gh-94026: stdout+stderr are written to tempfile
     181                  retcode = popen.wait(timeout=self.timeout)
     182                  assert retcode is not None
     183                  return retcode
     184              except subprocess.TimeoutExpired:
     185                  if self._stopped:
     186                      # kill() has been called: communicate() fails on reading
     187                      # closed stdout
     188                      raise ExitThread
     189  
     190                  # On timeout, kill the process
     191                  self._kill()
     192  
     193                  # None means TIMEOUT for the caller
     194                  retcode = None
     195                  # bpo-38207: Don't attempt to call communicate() again: on it
     196                  # can hang until all child processes using stdout
     197                  # pipes completes.
     198              except OSError:
     199                  if self._stopped:
     200                      # kill() has been called: communicate() fails
     201                      # on reading closed stdout
     202                      raise ExitThread
     203                  raise
     204          except:
     205              self._kill()
     206              raise
     207          finally:
     208              self._wait_completed()
     209              self._popen = None
     210  
     211      def create_stdout(self, stack: contextlib.ExitStack) -> TextIO:
     212          """Create stdout temporay file (file descriptor)."""
     213  
     214          if MS_WINDOWS:
     215              # gh-95027: When stdout is not a TTY, Python uses the ANSI code
     216              # page for the sys.stdout encoding. If the main process runs in a
     217              # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
     218              encoding = locale.getencoding()
     219          else:
     220              encoding = sys.stdout.encoding
     221  
     222          # gh-94026: Write stdout+stderr to a tempfile as workaround for
     223          # non-blocking pipes on Emscripten with NodeJS.
     224          # gh-109425: Use "backslashreplace" error handler: log corrupted
     225          # stdout+stderr, instead of failing with a UnicodeDecodeError and not
     226          # logging stdout+stderr at all.
     227          stdout_file = tempfile.TemporaryFile('w+',
     228                                               encoding=encoding,
     229                                               errors='backslashreplace')
     230          stack.enter_context(stdout_file)
     231          return stdout_file
     232  
     233      def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextIO | None]:
     234          """Create JSON file."""
     235  
     236          json_file_use_stdout = self.runtests.json_file_use_stdout()
     237          if json_file_use_stdout:
     238              json_file = JsonFile(None, JsonFileType.STDOUT)
     239              json_tmpfile = None
     240          else:
     241              json_tmpfile = tempfile.TemporaryFile('w+', encoding='utf8')
     242              stack.enter_context(json_tmpfile)
     243  
     244              json_fd = json_tmpfile.fileno()
     245              if MS_WINDOWS:
     246                  # The msvcrt module is only available on Windows;
     247                  # we run mypy with `--platform=linux` in CI
     248                  json_handle: int = msvcrt.get_osfhandle(json_fd)  # type: ignore[attr-defined]
     249                  json_file = JsonFile(json_handle,
     250                                       JsonFileType.WINDOWS_HANDLE)
     251              else:
     252                  json_file = JsonFile(json_fd, JsonFileType.UNIX_FD)
     253          return (json_file, json_tmpfile)
     254  
     255      def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> WorkerRunTests:
     256          tests = (test_name,)
     257          if self.runtests.rerun:
     258              match_tests = self.runtests.get_match_tests(test_name)
     259          else:
     260              match_tests = None
     261  
     262          kwargs: dict[str, Any] = {}
     263          if match_tests:
     264              kwargs['match_tests'] = [(test, True) for test in match_tests]
     265          if self.runtests.output_on_failure:
     266              kwargs['verbose'] = True
     267              kwargs['output_on_failure'] = False
     268          return self.runtests.create_worker_runtests(
     269              tests=tests,
     270              json_file=json_file,
     271              **kwargs)
     272  
     273      def run_tmp_files(self, worker_runtests: WorkerRunTests,
     274                        stdout_fd: int) -> tuple[int | None, list[StrPath]]:
     275          # gh-93353: Check for leaked temporary files in the parent process,
     276          # since the deletion of temporary files can happen late during
     277          # Python finalization: too late for libregrtest.
     278          if not support.is_wasi:
     279              # Don't check for leaked temporary files and directories if Python is
     280              # run on WASI. WASI don't pass environment variables like TMPDIR to
     281              # worker processes.
     282              tmp_dir = tempfile.mkdtemp(prefix="test_python_")
     283              tmp_dir = os.path.abspath(tmp_dir)
     284              try:
     285                  retcode = self._run_process(worker_runtests,
     286                                              stdout_fd, tmp_dir)
     287              finally:
     288                  tmp_files = os.listdir(tmp_dir)
     289                  os_helper.rmtree(tmp_dir)
     290          else:
     291              retcode = self._run_process(worker_runtests, stdout_fd)
     292              tmp_files = []
     293  
     294          return (retcode, tmp_files)
     295  
     296      def read_stdout(self, stdout_file: TextIO) -> str:
     297          stdout_file.seek(0)
     298          try:
     299              return stdout_file.read().strip()
     300          except Exception as exc:
     301              # gh-101634: Catch UnicodeDecodeError if stdout cannot be
     302              # decoded from encoding
     303              raise WorkerError(self.test_name,
     304                                f"Cannot read process stdout: {exc}",
     305                                stdout=None,
     306                                state=State.WORKER_BUG)
     307  
     308      def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
     309                    stdout: str) -> tuple[TestResult, str]:
     310          try:
     311              if json_tmpfile is not None:
     312                  json_tmpfile.seek(0)
     313                  worker_json = json_tmpfile.read()
     314              elif json_file.file_type == JsonFileType.STDOUT:
     315                  stdout, _, worker_json = stdout.rpartition("\n")
     316                  stdout = stdout.rstrip()
     317              else:
     318                  with json_file.open(encoding='utf8') as json_fp:
     319                      worker_json = json_fp.read()
     320          except Exception as exc:
     321              # gh-101634: Catch UnicodeDecodeError if stdout cannot be
     322              # decoded from encoding
     323              err_msg = f"Failed to read worker process JSON: {exc}"
     324              raise WorkerError(self.test_name, err_msg, stdout,
     325                                state=State.WORKER_BUG)
     326  
     327          if not worker_json:
     328              raise WorkerError(self.test_name, "empty JSON", stdout,
     329                                state=State.WORKER_BUG)
     330  
     331          try:
     332              result = TestResult.from_json(worker_json)
     333          except Exception as exc:
     334              # gh-101634: Catch UnicodeDecodeError if stdout cannot be
     335              # decoded from encoding
     336              err_msg = f"Failed to parse worker process JSON: {exc}"
     337              raise WorkerError(self.test_name, err_msg, stdout,
     338                                state=State.WORKER_BUG)
     339  
     340          return (result, stdout)
     341  
     342      def _runtest(self, test_name: TestName) -> MultiprocessResult:
     343          with contextlib.ExitStack() as stack:
     344              stdout_file = self.create_stdout(stack)
     345              json_file, json_tmpfile = self.create_json_file(stack)
     346              worker_runtests = self.create_worker_runtests(test_name, json_file)
     347  
     348              retcode: str | int | None
     349              retcode, tmp_files = self.run_tmp_files(worker_runtests,
     350                                                      stdout_file.fileno())
     351  
     352              stdout = self.read_stdout(stdout_file)
     353  
     354              if retcode is None:
     355                  raise WorkerError(self.test_name, stdout=stdout,
     356                                    err_msg=None,
     357                                    state=State.TIMEOUT)
     358              if retcode != 0:
     359                  name = get_signal_name(retcode)
     360                  if name:
     361                      retcode = f"{retcode} ({name})"
     362                  raise WorkerError(self.test_name, f"Exit code {retcode}", stdout,
     363                                    state=State.WORKER_FAILED)
     364  
     365              result, stdout = self.read_json(json_file, json_tmpfile, stdout)
     366  
     367          if tmp_files:
     368              msg = (f'\n\n'
     369                     f'Warning -- {test_name} leaked temporary files '
     370                     f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
     371              stdout += msg
     372              result.set_env_changed()
     373  
     374          return MultiprocessResult(result, stdout)
     375  
     376      def run(self) -> None:
     377          fail_fast = self.runtests.fail_fast
     378          fail_env_changed = self.runtests.fail_env_changed
     379          while not self._stopped:
     380              try:
     381                  try:
     382                      test_name = next(self.pending)
     383                  except StopIteration:
     384                      break
     385  
     386                  self.start_time = time.monotonic()
     387                  self.test_name = test_name
     388                  try:
     389                      mp_result = self._runtest(test_name)
     390                  except WorkerError as exc:
     391                      mp_result = exc.mp_result
     392                  finally:
     393                      self.test_name = None
     394                  mp_result.result.duration = time.monotonic() - self.start_time
     395                  self.output.put((False, mp_result))
     396  
     397                  if mp_result.result.must_stop(fail_fast, fail_env_changed):
     398                      break
     399              except ExitThread:
     400                  break
     401              except BaseException:
     402                  self.output.put((True, traceback.format_exc()))
     403                  break
     404  
     405      def _wait_completed(self) -> None:
     406          popen = self._popen
     407  
     408          try:
     409              popen.wait(WAIT_COMPLETED_TIMEOUT)
     410          except (subprocess.TimeoutExpired, OSError) as exc:
     411              print_warning(f"Failed to wait for {self} completion "
     412                            f"(timeout={format_duration(WAIT_COMPLETED_TIMEOUT)}): "
     413                            f"{exc!r}")
     414  
     415      def wait_stopped(self, start_time: float) -> None:
     416          # bpo-38207: RunWorkers.stop_workers() called self.stop()
     417          # which killed the process. Sometimes, killing the process from the
     418          # main thread does not interrupt popen.communicate() in
     419          # WorkerThread thread. This loop with a timeout is a workaround
     420          # for that.
     421          #
     422          # Moreover, if this method fails to join the thread, it is likely
     423          # that Python will hang at exit while calling threading._shutdown()
     424          # which tries again to join the blocked thread. Regrtest.main()
     425          # uses EXIT_TIMEOUT to workaround this second bug.
     426          while True:
     427              # Write a message every second
     428              self.join(1.0)
     429              if not self.is_alive():
     430                  break
     431              dt = time.monotonic() - start_time
     432              self.log(f"Waiting for {self} thread for {format_duration(dt)}")
     433              if dt > WAIT_KILLED_TIMEOUT:
     434                  print_warning(f"Failed to join {self} in {format_duration(dt)}")
     435                  break
     436  
     437  
     438  def get_running(workers: list[WorkerThread]) -> str | None:
     439      running: list[str] = []
     440      for worker in workers:
     441          test_name = worker.test_name
     442          if not test_name:
     443              continue
     444          dt = time.monotonic() - worker.start_time
     445          if dt >= PROGRESS_MIN_TIME:
     446              text = f'{test_name} ({format_duration(dt)})'
     447              running.append(text)
     448      if not running:
     449          return None
     450      return f"running ({len(running)}): {', '.join(running)}"
     451  
     452  
     453  class ESC[4;38;5;81mRunWorkers:
     454      def __init__(self, num_workers: int, runtests: RunTests,
     455                   logger: Logger, results: TestResults) -> None:
     456          self.num_workers = num_workers
     457          self.runtests = runtests
     458          self.log = logger.log
     459          self.display_progress = logger.display_progress
     460          self.results: TestResults = results
     461  
     462          self.output: queue.Queue[QueueOutput] = queue.Queue()
     463          tests_iter = runtests.iter_tests()
     464          self.pending = MultiprocessIterator(tests_iter)
     465          self.timeout = runtests.timeout
     466          if self.timeout is not None:
     467              # Rely on faulthandler to kill a worker process. This timouet is
     468              # when faulthandler fails to kill a worker process. Give a maximum
     469              # of 5 minutes to faulthandler to kill the worker.
     470              self.worker_timeout: float | None = min(self.timeout * 1.5, self.timeout + 5 * 60)
     471          else:
     472              self.worker_timeout = None
     473          self.workers: list[WorkerThread] | None = None
     474  
     475          jobs = self.runtests.get_jobs()
     476          if jobs is not None:
     477              # Don't spawn more threads than the number of jobs:
     478              # these worker threads would never get anything to do.
     479              self.num_workers = min(self.num_workers, jobs)
     480  
     481      def start_workers(self) -> None:
     482          self.workers = [WorkerThread(index, self)
     483                          for index in range(1, self.num_workers + 1)]
     484          jobs = self.runtests.get_jobs()
     485          if jobs is not None:
     486              tests = count(jobs, 'test')
     487          else:
     488              tests = 'tests'
     489          nworkers = len(self.workers)
     490          processes = plural(nworkers, "process", "processes")
     491          msg = (f"Run {tests} in parallel using "
     492                 f"{nworkers} worker {processes}")
     493          if self.timeout:
     494              msg += (" (timeout: %s, worker timeout: %s)"
     495                      % (format_duration(self.timeout),
     496                         format_duration(self.worker_timeout)))
     497          self.log(msg)
     498          for worker in self.workers:
     499              worker.start()
     500  
     501      def stop_workers(self) -> None:
     502          start_time = time.monotonic()
     503          for worker in self.workers:
     504              worker.stop()
     505          for worker in self.workers:
     506              worker.wait_stopped(start_time)
     507  
     508      def _get_result(self) -> QueueOutput | None:
     509          pgo = self.runtests.pgo
     510          use_faulthandler = (self.timeout is not None)
     511  
     512          # bpo-46205: check the status of workers every iteration to avoid
     513          # waiting forever on an empty queue.
     514          while any(worker.is_alive() for worker in self.workers):
     515              if use_faulthandler:
     516                  faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
     517                                                    exit=True)
     518  
     519              # wait for a thread
     520              try:
     521                  return self.output.get(timeout=PROGRESS_UPDATE)
     522              except queue.Empty:
     523                  pass
     524  
     525              if not pgo:
     526                  # display progress
     527                  running = get_running(self.workers)
     528                  if running:
     529                      self.log(running)
     530  
     531          # all worker threads are done: consume pending results
     532          try:
     533              return self.output.get(timeout=0)
     534          except queue.Empty:
     535              return None
     536  
     537      def display_result(self, mp_result: MultiprocessResult) -> None:
     538          result = mp_result.result
     539          pgo = self.runtests.pgo
     540  
     541          text = str(result)
     542          if mp_result.err_msg:
     543              # WORKER_BUG
     544              text += ' (%s)' % mp_result.err_msg
     545          elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
     546              text += ' (%s)' % format_duration(result.duration)
     547          if not pgo:
     548              running = get_running(self.workers)
     549              if running:
     550                  text += f' -- {running}'
     551          self.display_progress(self.test_index, text)
     552  
     553      def _process_result(self, item: QueueOutput) -> TestResult:
     554          """Returns True if test runner must stop."""
     555          if item[0]:
     556              # Thread got an exception
     557              format_exc = item[1]
     558              print_warning(f"regrtest worker thread failed: {format_exc}")
     559              result = TestResult("<regrtest worker>", state=State.WORKER_BUG)
     560              self.results.accumulate_result(result, self.runtests)
     561              return result
     562  
     563          self.test_index += 1
     564          mp_result = item[1]
     565          result = mp_result.result
     566          self.results.accumulate_result(result, self.runtests)
     567          self.display_result(mp_result)
     568  
     569          # Display worker stdout
     570          if not self.runtests.output_on_failure:
     571              show_stdout = True
     572          else:
     573              # --verbose3 ignores stdout on success
     574              show_stdout = (result.state != State.PASSED)
     575          if show_stdout:
     576              stdout = mp_result.worker_stdout
     577              if stdout:
     578                  print(stdout, flush=True)
     579  
     580          return result
     581  
     582      def run(self) -> None:
     583          fail_fast = self.runtests.fail_fast
     584          fail_env_changed = self.runtests.fail_env_changed
     585  
     586          self.start_workers()
     587  
     588          self.test_index = 0
     589          try:
     590              while True:
     591                  item = self._get_result()
     592                  if item is None:
     593                      break
     594  
     595                  result = self._process_result(item)
     596                  if result.must_stop(fail_fast, fail_env_changed):
     597                      break
     598          except KeyboardInterrupt:
     599              print()
     600              self.results.interrupted = True
     601          finally:
     602              if self.timeout is not None:
     603                  faulthandler.cancel_dump_traceback_later()
     604  
     605              # Always ensure that all worker processes are no longer
     606              # worker when we exit this function
     607              self.pending.stop()
     608              self.stop_workers()