(root)/
Python-3.11.7/
Lib/
test/
libregrtest/
worker.py
       1  import subprocess
       2  import sys
       3  import os
       4  from typing import Any, NoReturn
       5  
       6  from test import support
       7  from test.support import os_helper
       8  
       9  from .setup import setup_process, setup_test_dir
      10  from .runtests import WorkerRunTests, JsonFile, JsonFileType
      11  from .single import run_single_test
      12  from .utils import (
      13      StrPath, StrJSON, TestFilter,
      14      get_temp_dir, get_work_dir, exit_timeout)
      15  
      16  
      17  USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
      18  
      19  
      20  def create_worker_process(runtests: WorkerRunTests, output_fd: int,
      21                            tmp_dir: StrPath | None = None) -> subprocess.Popen:
      22      python_cmd = runtests.python_cmd
      23      worker_json = runtests.as_json()
      24  
      25      python_opts = support.args_from_interpreter_flags()
      26      if python_cmd is not None:
      27          executable = python_cmd
      28          # Remove -E option, since --python=COMMAND can set PYTHON environment
      29          # variables, such as PYTHONPATH, in the worker process.
      30          python_opts = [opt for opt in python_opts if opt != "-E"]
      31      else:
      32          executable = (sys.executable,)
      33      cmd = [*executable, *python_opts,
      34             '-u',    # Unbuffered stdout and stderr
      35             '-m', 'test.libregrtest.worker',
      36             worker_json]
      37  
      38      env = dict(os.environ)
      39      if tmp_dir is not None:
      40          env['TMPDIR'] = tmp_dir
      41          env['TEMP'] = tmp_dir
      42          env['TMP'] = tmp_dir
      43  
      44      # Running the child from the same working directory as regrtest's original
      45      # invocation ensures that TEMPDIR for the child is the same when
      46      # sysconfig.is_python_build() is true. See issue 15300.
      47      #
      48      # Emscripten and WASI Python must start in the Python source code directory
      49      # to get 'python.js' or 'python.wasm' file. Then worker_process() changes
      50      # to a temporary directory created to run tests.
      51      work_dir = os_helper.SAVEDCWD
      52  
      53      kwargs: dict[str, Any] = dict(
      54          env=env,
      55          stdout=output_fd,
      56          # bpo-45410: Write stderr into stdout to keep messages order
      57          stderr=output_fd,
      58          text=True,
      59          close_fds=True,
      60          cwd=work_dir,
      61      )
      62      if USE_PROCESS_GROUP:
      63          kwargs['start_new_session'] = True
      64  
      65      # Pass json_file to the worker process
      66      json_file = runtests.json_file
      67      json_file.configure_subprocess(kwargs)
      68  
      69      with json_file.inherit_subprocess():
      70          return subprocess.Popen(cmd, **kwargs)
      71  
      72  
      73  def worker_process(worker_json: StrJSON) -> NoReturn:
      74      runtests = WorkerRunTests.from_json(worker_json)
      75      test_name = runtests.tests[0]
      76      match_tests: TestFilter = runtests.match_tests
      77      json_file: JsonFile = runtests.json_file
      78  
      79      setup_test_dir(runtests.test_dir)
      80      setup_process()
      81  
      82      if runtests.rerun:
      83          if match_tests:
      84              matching = "matching: " + ", ".join(pattern for pattern, result in match_tests if result)
      85              print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
      86          else:
      87              print(f"Re-running {test_name} in verbose mode", flush=True)
      88  
      89      result = run_single_test(test_name, runtests)
      90  
      91      if json_file.file_type == JsonFileType.STDOUT:
      92          print()
      93          result.write_json_into(sys.stdout)
      94      else:
      95          with json_file.open('w', encoding='utf-8') as json_fp:
      96              result.write_json_into(json_fp)
      97  
      98      sys.exit(0)
      99  
     100  
     101  def main():
     102      if len(sys.argv) != 2:
     103          print("usage: python -m test.libregrtest.worker JSON")
     104          sys.exit(1)
     105      worker_json = sys.argv[1]
     106  
     107      tmp_dir = get_temp_dir()
     108      work_dir = get_work_dir(tmp_dir, worker=True)
     109  
     110      with exit_timeout():
     111          with os_helper.temp_cwd(work_dir, quiet=True):
     112              worker_process(worker_json)
     113  
     114  
     115  if __name__ == "__main__":
     116      main()