import dataclasses
import faulthandler
import json
import os.path
import queue
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
from typing import NamedTuple, NoReturn, Literal, Any, TextIO

from test import support
from test.support import os_helper
from test.support import TestStats

from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
    runtest, TestResult, State, PROGRESS_MIN_TIME,
    MatchTests, RunTests)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning

if sys.platform == 'win32':
    import locale


# Display the running tests if nothing happened in the last N seconds
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for the slowest Python
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker process completes: should be immediate
JOIN_TIMEOUT = 30.0   # seconds

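# When the platform provides setsid()/killpg(), each worker child is started
# in its own session (start_new_session=True in run_test_in_subprocess()) so
# that TestWorkerProcess._kill() can terminate the whole process tree with
# os.killpg(), not just the direct child.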
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))


@dataclasses.dataclass(slots=True)
class WorkerJob:
    test_name: str
    namespace: Namespace
    rerun: bool = False
    match_tests: MatchTests | None = None


class _EncodeWorkerJob(json.JSONEncoder):
    def default(self, o: Any) -> dict[str, Any]:
        match o:
            case WorkerJob():
                result = dataclasses.asdict(o)
                result["__worker_job__"] = True
                return result
            case Namespace():
                result = vars(o)
                result["__namespace__"] = True
                return result
            case _:
                return super().default(o)


def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
    if "__worker_job__" in d:
        d.pop('__worker_job__')
        return WorkerJob(**d)
    if "__namespace__" in d:
        d.pop('__namespace__')
        return Namespace(**d)
    else:
        return d


def _parse_worker_args(worker_json: str) -> WorkerJob:
    return json.loads(worker_json,
                      object_hook=_decode_worker_job)

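# Illustrative sketch of the JSON round trip between the parent process and a
# worker (the test name and the Namespace contents are hypothetical, not real
# regrtest options):
#
#     job = WorkerJob("test_os", namespace=ns)
#     blob = json.dumps(job, cls=_EncodeWorkerJob)
#     job2 = _parse_worker_args(blob)   # a WorkerJob again
#
# The "__worker_job__" and "__namespace__" markers injected by the encoder are
# what _decode_worker_job() looks for to rebuild the original objects.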

def run_test_in_subprocess(worker_job: WorkerJob,
                           output_file: TextIO,
                           tmp_dir: str | None = None) -> subprocess.Popen:
    ns = worker_job.namespace
    python = ns.python
    worker_args = json.dumps(worker_job, cls=_EncodeWorkerJob)

    if python is not None:
        executable = python
    else:
        executable = [sys.executable]
    cmd = [*executable, *support.args_from_interpreter_flags(),
           '-u',    # Unbuffered stdout and stderr
           '-m', 'test.regrtest',
           '--worker-args', worker_args]

    env = dict(os.environ)
    if tmp_dir is not None:
        env['TMPDIR'] = tmp_dir
        env['TEMP'] = tmp_dir
        env['TMP'] = tmp_dir

    # Running the child from the same working directory as regrtest's original
    # invocation ensures that TEMPDIR for the child is the same when
    # sysconfig.is_python_build() is true. See issue 15300.
    kw = dict(
        env=env,
        stdout=output_file,
        # bpo-45410: Write stderr into stdout to keep messages in order
        stderr=output_file,
        text=True,
        close_fds=(os.name != 'nt'),
        cwd=os_helper.SAVEDCWD,
    )
    if USE_PROCESS_GROUP:
        kw['start_new_session'] = True
    return subprocess.Popen(cmd, **kw)

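# The command line spawned above looks roughly like (illustrative only):
#
#     python -u -m test.regrtest --worker-args '{"test_name": "test_os", ...}'
#
# The child re-enters regrtest, which is expected to hand the --worker-args
# payload to run_tests_worker() below.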

def run_tests_worker(worker_json: str) -> NoReturn:
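    """Run a single test in a worker child process.

    Prints the JSON-encoded TestResult as the last line of stdout before
    exiting; TestWorkerProcess._runtest() in the parent decodes that line.
    """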
    worker_job = _parse_worker_args(worker_json)
    ns = worker_job.namespace
    test_name = worker_job.test_name
    rerun = worker_job.rerun
    match_tests = worker_job.match_tests

    setup_tests(ns)

    if rerun:
        if match_tests:
            matching = "matching: " + ", ".join(match_tests)
            print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
        else:
            print(f"Re-running {test_name} in verbose mode", flush=True)
        ns.verbose = True

    if match_tests is not None:
        ns.match_tests = match_tests

    result = runtest(ns, test_name)
    print()   # Force a newline (just in case)

    # Serialize TestResult as dict in JSON
    print(json.dumps(result, cls=EncodeTestResult), flush=True)
    sys.exit(0)


# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:

    """A thread-safe iterator over tests for multiprocess mode."""

    def __init__(self, tests_iter):
        self.lock = threading.Lock()
        self.tests_iter = tests_iter

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            if self.tests_iter is None:
                raise StopIteration
            return next(self.tests_iter)

    def stop(self):
        with self.lock:
            self.tests_iter = None


class MultiprocessResult(NamedTuple):
    result: TestResult
    # bpo-45410: stderr is written into stdout to keep messages in order
    worker_stdout: str | None = None
    err_msg: str | None = None


ExcStr = str
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
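# Each item put on the output queue is a 2-tuple: (False, MultiprocessResult)
# for a completed test, or (True, formatted_traceback) when the worker thread
# itself raised an unexpected exception (see TestWorkerProcess.run()).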


class ExitThread(Exception):
    pass


class TestWorkerProcess(threading.Thread):
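    """Thread that runs tests in child processes.

    Repeatedly pulls a test name from the shared MultiprocessIterator, runs it
    in a fresh child process and puts the MultiprocessResult on the runner's
    output queue, until the iterator is exhausted or stop() is called.
    """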
    def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
        super().__init__()
        self.worker_id = worker_id
        self.runtests = runner.runtests
        self.pending = runner.pending
        self.output = runner.output
        self.ns = runner.ns
        self.timeout = runner.worker_timeout
        self.regrtest = runner.regrtest
        self.rerun = runner.rerun
        self.current_test_name = None
        self.start_time = None
        self._popen = None
        self._killed = False
        self._stopped = False

    def __repr__(self) -> str:
        info = [f'TestWorkerProcess #{self.worker_id}']
        if self.is_alive():
            info.append("running")
        else:
            info.append('stopped')
        test = self.current_test_name
        if test:
            info.append(f'test={test}')
        popen = self._popen
        if popen is not None:
            dt = time.monotonic() - self.start_time
            info.extend((f'pid={self._popen.pid}',
                         f'time={format_duration(dt)}'))
        return '<%s>' % ' '.join(info)

    def _kill(self) -> None:
        popen = self._popen
        if popen is None:
            return

        if self._killed:
            return
        self._killed = True

        if USE_PROCESS_GROUP:
            what = f"{self} process group"
        else:
            what = f"{self}"

        print(f"Kill {what}", file=sys.stderr, flush=True)
        try:
            if USE_PROCESS_GROUP:
                os.killpg(popen.pid, signal.SIGKILL)
            else:
                popen.kill()
        except ProcessLookupError:
            # The process already completed: Popen.send_signal() checked the
            # returncode just before Popen.wait() set it, so the signal was
            # sent to a process that no longer exists.
            pass
        except OSError as exc:
            print_warning(f"Failed to kill {what}: {exc!r}")

    def stop(self) -> None:
        # Method called from a different thread to stop this thread
        self._stopped = True
        self._kill()

    def mp_result_error(
        self,
        test_result: TestResult,
        stdout: str | None = None,
        err_msg: str | None = None
    ) -> MultiprocessResult:
        return MultiprocessResult(test_result, stdout, err_msg)

    def _run_process(self, worker_job, output_file: TextIO,
                     tmp_dir: str | None = None) -> int | None:
        self.current_test_name = worker_job.test_name
        try:
            popen = run_test_in_subprocess(worker_job, output_file, tmp_dir)

            self._killed = False
            self._popen = popen
        except:
            self.current_test_name = None
            raise

        try:
            if self._stopped:
                # If kill() was called before self._popen was set, the child
                # process may still be running. Call _kill() again to make
                # sure the process is killed.
                self._kill()
                raise ExitThread

            try:
                # gh-94026: stdout+stderr are written to a tempfile
                retcode = popen.wait(timeout=self.timeout)
                assert retcode is not None
                return retcode
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # kill() has been called: communicate() fails on reading
                    # closed stdout
                    raise ExitThread

                # On timeout, kill the process
                self._kill()

                # None means TIMEOUT for the caller
                retcode = None
                # bpo-38207: Don't attempt to call communicate() again: it can
                # hang until all child processes using the stdout pipe have
                # completed.
            except OSError:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout
                    raise ExitThread
                raise
        except:
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None
            self.current_test_name = None

    def _runtest(self, test_name: str) -> MultiprocessResult:
        if sys.platform == 'win32':
            # gh-95027: When stdout is not a TTY, Python uses the ANSI code
            # page for the sys.stdout encoding. If the main process runs in a
            # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
            encoding = locale.getencoding()
        else:
            encoding = sys.stdout.encoding

        match_tests = self.runtests.get_match_tests(test_name)

        # gh-94026: Write stdout+stderr to a tempfile as workaround for
        # non-blocking pipes on Emscripten with NodeJS.
        with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file:
            worker_job = WorkerJob(test_name,
                                   namespace=self.ns,
                                   rerun=self.rerun,
                                   match_tests=match_tests)
            # gh-93353: Check for leaked temporary files in the parent process,
            # since the deletion of temporary files can happen late during
            # Python finalization: too late for libregrtest.
            if not support.is_wasi:
                # Don't check for leaked temporary files and directories if
                # Python is run on WASI. WASI does not pass environment
                # variables like TMPDIR to worker processes.
                tmp_dir = tempfile.mkdtemp(prefix="test_python_")
                tmp_dir = os.path.abspath(tmp_dir)
                try:
                    retcode = self._run_process(worker_job, stdout_file, tmp_dir)
                finally:
                    tmp_files = os.listdir(tmp_dir)
                    os_helper.rmtree(tmp_dir)
            else:
                retcode = self._run_process(worker_job, stdout_file)
                tmp_files = ()
            stdout_file.seek(0)

            try:
                stdout = stdout_file.read().strip()
            except Exception as exc:
                # gh-101634: Catch UnicodeDecodeError if stdout cannot be
                # decoded from encoding
                err_msg = f"Cannot read process stdout: {exc}"
                result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
                return self.mp_result_error(result, err_msg=err_msg)

        if retcode is None:
            result = TestResult(test_name, state=State.TIMEOUT)
            return self.mp_result_error(result, stdout)

        err_msg = None
        if retcode != 0:
            err_msg = "Exit code %s" % retcode
        else:
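            # run_tests_worker() prints the JSON-encoded TestResult as the
            # very last line of the worker's stdout: split it off from the
            # regular test output before decoding it.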
            stdout, _, worker_json = stdout.rpartition("\n")
            stdout = stdout.rstrip()
            if not worker_json:
                err_msg = "Failed to parse worker stdout"
            else:
                try:
                    # deserialize run_tests_worker() output
                    result = json.loads(worker_json,
                                        object_hook=decode_test_result)
                except Exception as exc:
                    err_msg = "Failed to parse worker JSON: %s" % exc

        if err_msg:
            result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
            return self.mp_result_error(result, stdout, err_msg)

        if tmp_files:
            msg = (f'\n\n'
                   f'Warning -- {test_name} leaked temporary files '
                   f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
            stdout += msg
            result.set_env_changed()

        return MultiprocessResult(result, stdout)

    def run(self) -> None:
        fail_fast = self.ns.failfast
        fail_env_changed = self.ns.fail_env_changed
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                self.start_time = time.monotonic()
                mp_result = self._runtest(test_name)
                mp_result.result.duration = time.monotonic() - self.start_time
                self.output.put((False, mp_result))

                if mp_result.result.must_stop(fail_fast, fail_env_changed):
                    break
            except ExitThread:
                break
            except BaseException:
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self) -> None:
        popen = self._popen

        try:
            popen.wait(JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time: float) -> None:
        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
        # which killed the process. Sometimes, killing the process from the
        # main thread does not interrupt popen.communicate() in
        # TestWorkerProcess thread. This loop with a timeout is a workaround
        # for that.
        #
        # Moreover, if this method fails to join the thread, it is likely
        # that Python will hang at exit while calling threading._shutdown()
        # which tries again to join the blocked thread. Regrtest.main()
        # uses EXIT_TIMEOUT to work around this second bug.
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            self.regrtest.log(f"Waiting for {self} thread "
                              f"for {format_duration(dt)}")
            if dt > JOIN_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break


def get_running(workers: list[TestWorkerProcess]) -> list[str]:
    running = []
    for worker in workers:
        current_test_name = worker.current_test_name
        if not current_test_name:
            continue
        dt = time.monotonic() - worker.start_time
        if dt >= PROGRESS_MIN_TIME:
            text = '%s (%s)' % (current_test_name, format_duration(dt))
            running.append(text)
    return running


class MultiprocessTestRunner:
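    """Run tests in parallel using TestWorkerProcess threads.

    Feeds tests to the workers through a shared MultiprocessIterator and
    consumes MultiprocessResult items from the output queue.
    """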
    def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
        ns = regrtest.ns
        timeout = ns.timeout

        self.regrtest = regrtest
        self.runtests = runtests
        self.rerun = runtests.rerun
        self.log = self.regrtest.log
        self.ns = ns
        self.output: queue.Queue[QueueOutput] = queue.Queue()
        tests_iter = runtests.iter_tests()
        self.pending = MultiprocessIterator(tests_iter)
        if timeout is not None:
            # Rely on faulthandler to kill a worker process that exceeds the
            # timeout. This worker timeout only kicks in if faulthandler fails
            # to do so: give faulthandler a maximum of 5 extra minutes to kill
            # the worker.
            self.worker_timeout = min(timeout * 1.5, timeout + 5 * 60)
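            # For example, --timeout=1200 gives a worker timeout of
            # min(1800, 1500) = 1500 seconds (25 minutes).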
        else:
            self.worker_timeout = None
        self.workers = None

    def start_workers(self) -> None:
        use_mp = self.ns.use_mp
        timeout = self.ns.timeout
        self.workers = [TestWorkerProcess(index, self)
                        for index in range(1, use_mp + 1)]
        msg = f"Run tests in parallel using {len(self.workers)} child processes"
        if timeout:
            msg += (" (timeout: %s, worker timeout: %s)"
                    % (format_duration(timeout),
                       format_duration(self.worker_timeout)))
        self.log(msg)
        for worker in self.workers:
            worker.start()

    def stop_workers(self) -> None:
        start_time = time.monotonic()
        for worker in self.workers:
            worker.stop()
        for worker in self.workers:
            worker.wait_stopped(start_time)

    def _get_result(self) -> QueueOutput | None:
        pgo = self.ns.pgo
        use_faulthandler = (self.ns.timeout is not None)
        timeout = PROGRESS_UPDATE

        # bpo-46205: check the status of workers every iteration to avoid
        # waiting forever on an empty queue.
        while any(worker.is_alive() for worker in self.workers):
            if use_faulthandler:
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=timeout)
            except queue.Empty:
                pass

            # display progress
            running = get_running(self.workers)
            if running and not pgo:
                self.log('running: %s' % ', '.join(running))

        # all worker threads are done: consume pending results
        try:
            return self.output.get(timeout=0)
        except queue.Empty:
            return None

    def display_result(self, mp_result: MultiprocessResult) -> None:
        result = mp_result.result
        pgo = self.ns.pgo

        text = str(result)
        if mp_result.err_msg:
            # MULTIPROCESSING_ERROR
            text += ' (%s)' % mp_result.err_msg
        elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
            text += ' (%s)' % format_duration(result.duration)
        running = get_running(self.workers)
        if running and not pgo:
            text += ' -- running: %s' % ', '.join(running)
        self.regrtest.display_progress(self.test_index, text)

    def _process_result(self, item: QueueOutput) -> TestResult:
        """Process a queue item and return its TestResult.

        The caller uses TestResult.must_stop() on the returned result to
        decide whether the test runner must stop.
        """
        rerun = self.runtests.rerun
        if item[0]:
            # Thread got an exception
            format_exc = item[1]
            print_warning(f"regrtest worker thread failed: {format_exc}")
            result = TestResult("<regrtest worker>", state=State.MULTIPROCESSING_ERROR)
            self.regrtest.accumulate_result(result, rerun=rerun)
            return result

        self.test_index += 1
        mp_result = item[1]
        result = mp_result.result
        self.regrtest.accumulate_result(result, rerun=rerun)
        self.display_result(mp_result)

        if mp_result.worker_stdout:
            print(mp_result.worker_stdout, flush=True)

        return result

    def run_tests(self) -> None:
        fail_fast = self.ns.failfast
        fail_env_changed = self.ns.fail_env_changed
        timeout = self.ns.timeout

        self.start_workers()

        self.test_index = 0
        try:
            while True:
                item = self._get_result()
                if item is None:
                    break

                result = self._process_result(item)
                if result.must_stop(fail_fast, fail_env_changed):
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()


def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None:
    MultiprocessTestRunner(regrtest, runtests).run_tests()


class EncodeTestResult(json.JSONEncoder):
    """Encode a TestResult (sub)class object into a JSON dict."""

    def default(self, o: Any) -> dict[str, Any]:
        if isinstance(o, TestResult):
            result = dataclasses.asdict(o)
            result["__test_result__"] = o.__class__.__name__
            return result

        return super().default(o)


def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
    """Decode a TestResult (sub)class object from a JSON dict."""

    if "__test_result__" not in d:
        return d

    d.pop('__test_result__')
    if d['stats'] is not None:
        d['stats'] = TestStats(**d['stats'])
    return TestResult(**d)
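

# Illustrative round trip between run_tests_worker() and
# TestWorkerProcess._runtest() (hypothetical test name):
#
#     blob = json.dumps(TestResult("test_os"), cls=EncodeTestResult)
#     result = json.loads(blob, object_hook=decode_test_result)
#
# decode_test_result() also rebuilds the nested TestStats, if present.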