(root)/
Python-3.12.0/
Lib/
test/
libregrtest/
runtest_mp.py
       1  import dataclasses
       2  import faulthandler
       3  import json
       4  import os.path
       5  import queue
       6  import signal
       7  import subprocess
       8  import sys
       9  import tempfile
      10  import threading
      11  import time
      12  import traceback
      13  from typing import NamedTuple, NoReturn, Literal, Any, TextIO
      14  
      15  from test import support
      16  from test.support import os_helper
      17  from test.support import TestStats
      18  
      19  from test.libregrtest.cmdline import Namespace
      20  from test.libregrtest.main import Regrtest
      21  from test.libregrtest.runtest import (
      22      runtest, TestResult, State, PROGRESS_MIN_TIME,
      23      MatchTests, RunTests)
      24  from test.libregrtest.setup import setup_tests
      25  from test.libregrtest.utils import format_duration, print_warning
      26  
      27  if sys.platform == 'win32':
      28      import locale
      29  
      30  
# Display the running tests if nothing happened during the last N seconds.
# This is the timeout used when the main thread waits on the result queue.
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0   # seconds
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker completes: should be immediate
JOIN_TIMEOUT = 30.0   # seconds

# True when the platform provides both os.setsid() and os.killpg(): worker
# processes are then started in a new session and killed as a process group.
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
      45  
      46  
@dataclasses.dataclass(slots=True)
class WorkerJob:
    """Description of a single test run handed to a worker process.

    Instances are serialized to JSON by _EncodeWorkerJob and rebuilt by
    _decode_worker_job.
    """
    # Name of the test to run.
    test_name: str
    # Command line options passed on to the worker.
    namespace: Namespace
    # True when the test is being re-run; the worker then enables verbose mode.
    rerun: bool = False
    # When not None, replaces namespace.match_tests in the worker.
    match_tests: MatchTests | None = None
      53  
      54  
      55  class ESC[4;38;5;81m_EncodeWorkerJob(ESC[4;38;5;149mjsonESC[4;38;5;149m.ESC[4;38;5;149mJSONEncoder):
      56      def default(self, o: Any) -> dict[str, Any]:
      57          match o:
      58              case WorkerJob():
      59                  result = dataclasses.asdict(o)
      60                  result["__worker_job__"] = True
      61                  return result
      62              case Namespace():
      63                  result = vars(o)
      64                  result["__namespace__"] = True
      65                  return result
      66              case _:
      67                  return super().default(o)
      68  
      69  
      70  def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
      71      if "__worker_job__" in d:
      72          d.pop('__worker_job__')
      73          return WorkerJob(**d)
      74      if "__namespace__" in d:
      75          d.pop('__namespace__')
      76          return Namespace(**d)
      77      else:
      78          return d
      79  
      80  
      81  def _parse_worker_args(worker_json: str) -> tuple[Namespace, str]:
      82      return json.loads(worker_json,
      83                        object_hook=_decode_worker_job)
      84  
      85  
      86  def run_test_in_subprocess(worker_job: WorkerJob,
      87                             output_file: TextIO,
      88                             tmp_dir: str | None = None) -> subprocess.Popen:
      89      ns = worker_job.namespace
      90      python = ns.python
      91      worker_args = json.dumps(worker_job, cls=_EncodeWorkerJob)
      92  
      93      if python is not None:
      94          executable = python
      95      else:
      96          executable = [sys.executable]
      97      cmd = [*executable, *support.args_from_interpreter_flags(),
      98             '-u',    # Unbuffered stdout and stderr
      99             '-m', 'test.regrtest',
     100             '--worker-args', worker_args]
     101  
     102      env = dict(os.environ)
     103      if tmp_dir is not None:
     104          env['TMPDIR'] = tmp_dir
     105          env['TEMP'] = tmp_dir
     106          env['TMP'] = tmp_dir
     107  
     108      # Running the child from the same working directory as regrtest's original
     109      # invocation ensures that TEMPDIR for the child is the same when
     110      # sysconfig.is_python_build() is true. See issue 15300.
     111      kw = dict(
     112          env=env,
     113          stdout=output_file,
     114          # bpo-45410: Write stderr into stdout to keep messages order
     115          stderr=output_file,
     116          text=True,
     117          close_fds=(os.name != 'nt'),
     118          cwd=os_helper.SAVEDCWD,
     119      )
     120      if USE_PROCESS_GROUP:
     121          kw['start_new_session'] = True
     122      return subprocess.Popen(cmd, **kw)
     123  
     124  
     125  def run_tests_worker(worker_json: str) -> NoReturn:
     126      worker_job = _parse_worker_args(worker_json)
     127      ns = worker_job.namespace
     128      test_name = worker_job.test_name
     129      rerun = worker_job.rerun
     130      match_tests = worker_job.match_tests
     131  
     132      setup_tests(ns)
     133  
     134      if rerun:
     135          if match_tests:
     136              matching = "matching: " + ", ".join(match_tests)
     137              print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
     138          else:
     139              print(f"Re-running {test_name} in verbose mode", flush=True)
     140          ns.verbose = True
     141  
     142      if match_tests is not None:
     143          ns.match_tests = match_tests
     144  
     145      result = runtest(ns, test_name)
     146      print()   # Force a newline (just in case)
     147  
     148      # Serialize TestResult as dict in JSON
     149      print(json.dumps(result, cls=EncodeTestResult), flush=True)
     150      sys.exit(0)
     151  
     152  
     153  # We do not use a generator so multiple threads can call next().
     154  class ESC[4;38;5;81mMultiprocessIterator:
     155  
     156      """A thread-safe iterator over tests for multiprocess mode."""
     157  
     158      def __init__(self, tests_iter):
     159          self.lock = threading.Lock()
     160          self.tests_iter = tests_iter
     161  
     162      def __iter__(self):
     163          return self
     164  
     165      def __next__(self):
     166          with self.lock:
     167              if self.tests_iter is None:
     168                  raise StopIteration
     169              return next(self.tests_iter)
     170  
     171      def stop(self):
     172          with self.lock:
     173              self.tests_iter = None
     174  
     175  
class MultiprocessResult(NamedTuple):
    """Outcome of one test run in a worker process, as queued for the main thread."""
    # Result of the test, decoded from the worker output or built by the
    # parent process when the worker failed.
    result: TestResult
    # bpo-45410: stderr is written into stdout to keep messages order
    worker_stdout: str | None = None
    # Explanation of a failure of the worker itself; None when there is none.
    err_msg: str | None = None
     181  
     182  
# Formatted traceback of an exception raised in a worker thread.
ExcStr = str
# Item put on the output queue by a worker thread:
# (False, MultiprocessResult) for a test which ran, or
# (True, formatted traceback) when the thread itself failed.
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
     185  
     186  
     187  class ESC[4;38;5;81mExitThread(ESC[4;38;5;149mException):
     188      pass
     189  
     190  
     191  class ESC[4;38;5;81mTestWorkerProcess(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
     192      def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
     193          super().__init__()
     194          self.worker_id = worker_id
     195          self.runtests = runner.runtests
     196          self.pending = runner.pending
     197          self.output = runner.output
     198          self.ns = runner.ns
     199          self.timeout = runner.worker_timeout
     200          self.regrtest = runner.regrtest
     201          self.rerun = runner.rerun
     202          self.current_test_name = None
     203          self.start_time = None
     204          self._popen = None
     205          self._killed = False
     206          self._stopped = False
     207  
     208      def __repr__(self) -> str:
     209          info = [f'TestWorkerProcess #{self.worker_id}']
     210          if self.is_alive():
     211              info.append("running")
     212          else:
     213              info.append('stopped')
     214          test = self.current_test_name
     215          if test:
     216              info.append(f'test={test}')
     217          popen = self._popen
     218          if popen is not None:
     219              dt = time.monotonic() - self.start_time
     220              info.extend((f'pid={self._popen.pid}',
     221                           f'time={format_duration(dt)}'))
     222          return '<%s>' % ' '.join(info)
     223  
     224      def _kill(self) -> None:
     225          popen = self._popen
     226          if popen is None:
     227              return
     228  
     229          if self._killed:
     230              return
     231          self._killed = True
     232  
     233          if USE_PROCESS_GROUP:
     234              what = f"{self} process group"
     235          else:
     236              what = f"{self}"
     237  
     238          print(f"Kill {what}", file=sys.stderr, flush=True)
     239          try:
     240              if USE_PROCESS_GROUP:
     241                  os.killpg(popen.pid, signal.SIGKILL)
     242              else:
     243                  popen.kill()
     244          except ProcessLookupError:
     245              # popen.kill(): the process completed, the TestWorkerProcess thread
     246              # read its exit status, but Popen.send_signal() read the returncode
     247              # just before Popen.wait() set returncode.
     248              pass
     249          except OSError as exc:
     250              print_warning(f"Failed to kill {what}: {exc!r}")
     251  
     252      def stop(self) -> None:
     253          # Method called from a different thread to stop this thread
     254          self._stopped = True
     255          self._kill()
     256  
     257      def mp_result_error(
     258          self,
     259          test_result: TestResult,
     260          stdout: str | None = None,
     261          err_msg=None
     262      ) -> MultiprocessResult:
     263          return MultiprocessResult(test_result, stdout, err_msg)
     264  
     265      def _run_process(self, worker_job, output_file: TextIO,
     266                       tmp_dir: str | None = None) -> int:
     267          self.current_test_name = worker_job.test_name
     268          try:
     269              popen = run_test_in_subprocess(worker_job, output_file, tmp_dir)
     270  
     271              self._killed = False
     272              self._popen = popen
     273          except:
     274              self.current_test_name = None
     275              raise
     276  
     277          try:
     278              if self._stopped:
     279                  # If kill() has been called before self._popen is set,
     280                  # self._popen is still running. Call again kill()
     281                  # to ensure that the process is killed.
     282                  self._kill()
     283                  raise ExitThread
     284  
     285              try:
     286                  # gh-94026: stdout+stderr are written to tempfile
     287                  retcode = popen.wait(timeout=self.timeout)
     288                  assert retcode is not None
     289                  return retcode
     290              except subprocess.TimeoutExpired:
     291                  if self._stopped:
     292                      # kill() has been called: communicate() fails on reading
     293                      # closed stdout
     294                      raise ExitThread
     295  
     296                  # On timeout, kill the process
     297                  self._kill()
     298  
     299                  # None means TIMEOUT for the caller
     300                  retcode = None
     301                  # bpo-38207: Don't attempt to call communicate() again: on it
     302                  # can hang until all child processes using stdout
     303                  # pipes completes.
     304              except OSError:
     305                  if self._stopped:
     306                      # kill() has been called: communicate() fails
     307                      # on reading closed stdout
     308                      raise ExitThread
     309                  raise
     310          except:
     311              self._kill()
     312              raise
     313          finally:
     314              self._wait_completed()
     315              self._popen = None
     316              self.current_test_name = None
     317  
     318      def _runtest(self, test_name: str) -> MultiprocessResult:
     319          if sys.platform == 'win32':
     320              # gh-95027: When stdout is not a TTY, Python uses the ANSI code
     321              # page for the sys.stdout encoding. If the main process runs in a
     322              # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
     323              encoding = locale.getencoding()
     324          else:
     325              encoding = sys.stdout.encoding
     326  
     327          match_tests = self.runtests.get_match_tests(test_name)
     328  
     329          # gh-94026: Write stdout+stderr to a tempfile as workaround for
     330          # non-blocking pipes on Emscripten with NodeJS.
     331          with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file:
     332              worker_job = WorkerJob(test_name,
     333                                     namespace=self.ns,
     334                                     rerun=self.rerun,
     335                                     match_tests=match_tests)
     336              # gh-93353: Check for leaked temporary files in the parent process,
     337              # since the deletion of temporary files can happen late during
     338              # Python finalization: too late for libregrtest.
     339              if not support.is_wasi:
     340                  # Don't check for leaked temporary files and directories if Python is
     341                  # run on WASI. WASI don't pass environment variables like TMPDIR to
     342                  # worker processes.
     343                  tmp_dir = tempfile.mkdtemp(prefix="test_python_")
     344                  tmp_dir = os.path.abspath(tmp_dir)
     345                  try:
     346                      retcode = self._run_process(worker_job, stdout_file, tmp_dir)
     347                  finally:
     348                      tmp_files = os.listdir(tmp_dir)
     349                      os_helper.rmtree(tmp_dir)
     350              else:
     351                  retcode = self._run_process(worker_job, stdout_file)
     352                  tmp_files = ()
     353              stdout_file.seek(0)
     354  
     355              try:
     356                  stdout = stdout_file.read().strip()
     357              except Exception as exc:
     358                  # gh-101634: Catch UnicodeDecodeError if stdout cannot be
     359                  # decoded from encoding
     360                  err_msg = f"Cannot read process stdout: {exc}"
     361                  result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
     362                  return self.mp_result_error(result, err_msg=err_msg)
     363  
     364          if retcode is None:
     365              result = TestResult(test_name, state=State.TIMEOUT)
     366              return self.mp_result_error(result, stdout)
     367  
     368          err_msg = None
     369          if retcode != 0:
     370              err_msg = "Exit code %s" % retcode
     371          else:
     372              stdout, _, worker_json = stdout.rpartition("\n")
     373              stdout = stdout.rstrip()
     374              if not worker_json:
     375                  err_msg = "Failed to parse worker stdout"
     376              else:
     377                  try:
     378                      # deserialize run_tests_worker() output
     379                      result = json.loads(worker_json,
     380                                          object_hook=decode_test_result)
     381                  except Exception as exc:
     382                      err_msg = "Failed to parse worker JSON: %s" % exc
     383  
     384          if err_msg:
     385              result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
     386              return self.mp_result_error(result, stdout, err_msg)
     387  
     388          if tmp_files:
     389              msg = (f'\n\n'
     390                     f'Warning -- {test_name} leaked temporary files '
     391                     f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
     392              stdout += msg
     393              result.set_env_changed()
     394  
     395          return MultiprocessResult(result, stdout)
     396  
     397      def run(self) -> None:
     398          fail_fast = self.ns.failfast
     399          fail_env_changed = self.ns.fail_env_changed
     400          while not self._stopped:
     401              try:
     402                  try:
     403                      test_name = next(self.pending)
     404                  except StopIteration:
     405                      break
     406  
     407                  self.start_time = time.monotonic()
     408                  mp_result = self._runtest(test_name)
     409                  mp_result.result.duration = time.monotonic() - self.start_time
     410                  self.output.put((False, mp_result))
     411  
     412                  if mp_result.result.must_stop(fail_fast, fail_env_changed):
     413                      break
     414              except ExitThread:
     415                  break
     416              except BaseException:
     417                  self.output.put((True, traceback.format_exc()))
     418                  break
     419  
     420      def _wait_completed(self) -> None:
     421          popen = self._popen
     422  
     423          try:
     424              popen.wait(JOIN_TIMEOUT)
     425          except (subprocess.TimeoutExpired, OSError) as exc:
     426              print_warning(f"Failed to wait for {self} completion "
     427                            f"(timeout={format_duration(JOIN_TIMEOUT)}): "
     428                            f"{exc!r}")
     429  
     430      def wait_stopped(self, start_time: float) -> None:
     431          # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
     432          # which killed the process. Sometimes, killing the process from the
     433          # main thread does not interrupt popen.communicate() in
     434          # TestWorkerProcess thread. This loop with a timeout is a workaround
     435          # for that.
     436          #
     437          # Moreover, if this method fails to join the thread, it is likely
     438          # that Python will hang at exit while calling threading._shutdown()
     439          # which tries again to join the blocked thread. Regrtest.main()
     440          # uses EXIT_TIMEOUT to workaround this second bug.
     441          while True:
     442              # Write a message every second
     443              self.join(1.0)
     444              if not self.is_alive():
     445                  break
     446              dt = time.monotonic() - start_time
     447              self.regrtest.log(f"Waiting for {self} thread "
     448                                f"for {format_duration(dt)}")
     449              if dt > JOIN_TIMEOUT:
     450                  print_warning(f"Failed to join {self} in {format_duration(dt)}")
     451                  break
     452  
     453  
     454  def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
     455      running = []
     456      for worker in workers:
     457          current_test_name = worker.current_test_name
     458          if not current_test_name:
     459              continue
     460          dt = time.monotonic() - worker.start_time
     461          if dt >= PROGRESS_MIN_TIME:
     462              text = '%s (%s)' % (current_test_name, format_duration(dt))
     463              running.append(text)
     464      return running
     465  
     466  
     467  class ESC[4;38;5;81mMultiprocessTestRunner:
     468      def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
     469          ns = regrtest.ns
     470          timeout = ns.timeout
     471  
     472          self.regrtest = regrtest
     473          self.runtests = runtests
     474          self.rerun = runtests.rerun
     475          self.log = self.regrtest.log
     476          self.ns = ns
     477          self.output: queue.Queue[QueueOutput] = queue.Queue()
     478          tests_iter = runtests.iter_tests()
     479          self.pending = MultiprocessIterator(tests_iter)
     480          if timeout is not None:
     481              # Rely on faulthandler to kill a worker process. This timouet is
     482              # when faulthandler fails to kill a worker process. Give a maximum
     483              # of 5 minutes to faulthandler to kill the worker.
     484              self.worker_timeout = min(timeout * 1.5, timeout + 5 * 60)
     485          else:
     486              self.worker_timeout = None
     487          self.workers = None
     488  
     489      def start_workers(self) -> None:
     490          use_mp = self.ns.use_mp
     491          timeout = self.ns.timeout
     492          self.workers = [TestWorkerProcess(index, self)
     493                          for index in range(1, use_mp + 1)]
     494          msg = f"Run tests in parallel using {len(self.workers)} child processes"
     495          if timeout:
     496              msg += (" (timeout: %s, worker timeout: %s)"
     497                      % (format_duration(timeout),
     498                         format_duration(self.worker_timeout)))
     499          self.log(msg)
     500          for worker in self.workers:
     501              worker.start()
     502  
     503      def stop_workers(self) -> None:
     504          start_time = time.monotonic()
     505          for worker in self.workers:
     506              worker.stop()
     507          for worker in self.workers:
     508              worker.wait_stopped(start_time)
     509  
     510      def _get_result(self) -> QueueOutput | None:
     511          pgo = self.ns.pgo
     512          use_faulthandler = (self.ns.timeout is not None)
     513          timeout = PROGRESS_UPDATE
     514  
     515          # bpo-46205: check the status of workers every iteration to avoid
     516          # waiting forever on an empty queue.
     517          while any(worker.is_alive() for worker in self.workers):
     518              if use_faulthandler:
     519                  faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
     520                                                    exit=True)
     521  
     522              # wait for a thread
     523              try:
     524                  return self.output.get(timeout=timeout)
     525              except queue.Empty:
     526                  pass
     527  
     528              # display progress
     529              running = get_running(self.workers)
     530              if running and not pgo:
     531                  self.log('running: %s' % ', '.join(running))
     532  
     533          # all worker threads are done: consume pending results
     534          try:
     535              return self.output.get(timeout=0)
     536          except queue.Empty:
     537              return None
     538  
     539      def display_result(self, mp_result: MultiprocessResult) -> None:
     540          result = mp_result.result
     541          pgo = self.ns.pgo
     542  
     543          text = str(result)
     544          if mp_result.err_msg:
     545              # MULTIPROCESSING_ERROR
     546              text += ' (%s)' % mp_result.err_msg
     547          elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
     548              text += ' (%s)' % format_duration(result.duration)
     549          running = get_running(self.workers)
     550          if running and not pgo:
     551              text += ' -- running: %s' % ', '.join(running)
     552          self.regrtest.display_progress(self.test_index, text)
     553  
     554      def _process_result(self, item: QueueOutput) -> bool:
     555          """Returns True if test runner must stop."""
     556          rerun = self.runtests.rerun
     557          if item[0]:
     558              # Thread got an exception
     559              format_exc = item[1]
     560              print_warning(f"regrtest worker thread failed: {format_exc}")
     561              result = TestResult("<regrtest worker>", state=State.MULTIPROCESSING_ERROR)
     562              self.regrtest.accumulate_result(result, rerun=rerun)
     563              return result
     564  
     565          self.test_index += 1
     566          mp_result = item[1]
     567          result = mp_result.result
     568          self.regrtest.accumulate_result(result, rerun=rerun)
     569          self.display_result(mp_result)
     570  
     571          if mp_result.worker_stdout:
     572              print(mp_result.worker_stdout, flush=True)
     573  
     574          return result
     575  
     576      def run_tests(self) -> None:
     577          fail_fast = self.ns.failfast
     578          fail_env_changed = self.ns.fail_env_changed
     579          timeout = self.ns.timeout
     580  
     581          self.start_workers()
     582  
     583          self.test_index = 0
     584          try:
     585              while True:
     586                  item = self._get_result()
     587                  if item is None:
     588                      break
     589  
     590                  result = self._process_result(item)
     591                  if result.must_stop(fail_fast, fail_env_changed):
     592                      break
     593          except KeyboardInterrupt:
     594              print()
     595              self.regrtest.interrupted = True
     596          finally:
     597              if timeout is not None:
     598                  faulthandler.cancel_dump_traceback_later()
     599  
     600              # Always ensure that all worker processes are no longer
     601              # worker when we exit this function
     602              self.pending.stop()
     603              self.stop_workers()
     604  
     605  
     606  def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None:
     607      MultiprocessTestRunner(regrtest, runtests).run_tests()
     608  
     609  
     610  class ESC[4;38;5;81mEncodeTestResult(ESC[4;38;5;149mjsonESC[4;38;5;149m.ESC[4;38;5;149mJSONEncoder):
     611      """Encode a TestResult (sub)class object into a JSON dict."""
     612  
     613      def default(self, o: Any) -> dict[str, Any]:
     614          if isinstance(o, TestResult):
     615              result = dataclasses.asdict(o)
     616              result["__test_result__"] = o.__class__.__name__
     617              return result
     618  
     619          return super().default(o)
     620  
     621  
     622  def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
     623      """Decode a TestResult (sub)class object from a JSON dict."""
     624  
     625      if "__test_result__" not in d:
     626          return d
     627  
     628      d.pop('__test_result__')
     629      if d['stats'] is not None:
     630          d['stats'] = TestStats(**d['stats'])
     631      return TestResult(**d)