1 import contextlib
2 import dataclasses
3 import faulthandler
4 import os.path
5 import queue
6 import signal
7 import subprocess
8 import sys
9 import tempfile
10 import threading
11 import time
12 import traceback
13 from typing import Any, Literal, TextIO
14
15 from test import support
16 from test.support import os_helper, MS_WINDOWS
17
18 from .logger import Logger
19 from .result import TestResult, State
20 from .results import TestResults
21 from .runtests import RunTests, WorkerRunTests, JsonFile, JsonFileType
22 from .single import PROGRESS_MIN_TIME
23 from .utils import (
24 StrPath, TestName,
25 format_duration, print_warning, count, plural, get_signal_name)
26 from .worker import create_worker_process, USE_PROCESS_GROUP
27
28 if MS_WINDOWS:
29 import locale
30 import msvcrt
31
32
33
# Display the running tests if no result was received during the last
# PROGRESS_UPDATE seconds (timeout used when polling the result queue).
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
# buildbot workers. The watchdog is armed with
# faulthandler.dump_traceback_later(exit=True) in RunWorkers._get_result().
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker process completes once its result is known or
# it has been killed: should be immediate (see WorkerThread._wait_completed())
WAIT_COMPLETED_TIMEOUT = 30.0   # seconds

# Maximum time to wait for the thread of a killed worker process to exit
# (in seconds), see WorkerThread.wait_stopped()
WAIT_KILLED_TIMEOUT = 60.0
49
50
# A plain generator cannot be shared here: several worker threads call
# next() concurrently, so every access goes through a lock instead.
class MultiprocessIterator:

    """Lock-protected iterator handing out tests in multiprocess mode."""

    def __init__(self, tests_iter):
        self.lock = threading.Lock()
        self.tests_iter = tests_iter

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            source = self.tests_iter
            if source is not None:
                # May itself raise StopIteration once the tests are exhausted
                return next(source)
            # stop() has been called: nothing more to hand out
            raise StopIteration

    def stop(self):
        """Drop the underlying iterator: later next() calls stop at once."""
        with self.lock:
            self.tests_iter = None
72
73
@dataclasses.dataclass(slots=True, frozen=True)
class MultiprocessResult:
    """Outcome of one test run, as queued by a WorkerThread.

    Immutable record put on the output queue and consumed by
    RunWorkers._process_result().
    """
    # Test result: parsed from the worker JSON, or built by WorkerError
    result: TestResult
    # bpo-45410: stderr is written into stdout to keep messages order
    worker_stdout: str | None = None
    # Description of a worker-level failure (for example "Exit code 1" or
    # "empty JSON"); None when the worker produced a regular result
    err_msg: str | None = None
80
81
# Formatted traceback (traceback.format_exc()) of an unexpected exception
# raised inside a worker thread
ExcStr = str
# Item put on the output queue by a worker thread: the first element tells
# whether the thread failed.
#   (False, MultiprocessResult): result of one test
#   (True, ExcStr): the thread itself crashed with an exception
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
84
85
class ExitThread(Exception):
    """Raised inside a worker thread to leave its run() loop silently.

    WorkerThread._run_process() raises it when stop() has been requested;
    WorkerThread.run() catches it and exits without queuing an error.
    """
88
89
class WorkerError(Exception):
    """Failure of a worker process, carrying a ready-made MultiprocessResult.

    The exception has no message of its own: callers read ``mp_result``,
    which wraps a TestResult built from *test_name* and *state* together
    with the captured *stdout* and the error message *err_msg*.
    """

    def __init__(self,
                 test_name: TestName,
                 err_msg: str | None,
                 stdout: str | None,
                 state: str):
        super().__init__()
        failure = TestResult(test_name, state=state)
        self.mp_result = MultiprocessResult(result=failure,
                                            worker_stdout=stdout,
                                            err_msg=err_msg)
99
100
class WorkerThread(threading.Thread):
    """Thread running tests one by one, each in its own worker process.

    The thread pulls test names from the shared ``pending`` iterator, spawns
    a worker process for each of them and puts the outcome on the shared
    ``output`` queue as a QueueOutput item.
    """

    def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
        """Copy the shared state of *runner* needed by this thread."""
        super().__init__()
        self.worker_id = worker_id
        self.runtests = runner.runtests
        self.pending = runner.pending
        self.output = runner.output
        self.timeout = runner.worker_timeout
        self.log = runner.log
        # Name and start time of the test currently running: read by other
        # threads (get_running(), __repr__())
        self.test_name: TestName | None = None
        self.start_time: float | None = None
        # Worker process currently running, None between two tests
        self._popen: subprocess.Popen[str] | None = None
        # True once the current process has been killed by _kill()
        self._killed = False
        # True once stop() has been called
        self._stopped = False

    def __repr__(self) -> str:
        info = [f'WorkerThread #{self.worker_id}']
        if self.is_alive():
            info.append("running")
        else:
            info.append('stopped')
        test = self.test_name
        if test:
            info.append(f'test={test}')
        # repr() is also evaluated from other threads (stop(), wait_stopped())
        # while this thread may reset self._popen to None: only use the
        # local snapshots below, never re-read the attributes.
        popen = self._popen
        if popen is not None:
            info.append(f'pid={popen.pid}')
            start_time = self.start_time
            if start_time is not None:
                dt = time.monotonic() - start_time
                info.append(f'time={format_duration(dt)}')
        return '<%s>' % ' '.join(info)

    def _kill(self) -> None:
        """Kill the current worker process (or its process group), once."""
        popen = self._popen
        if popen is None:
            return

        if self._killed:
            return
        self._killed = True

        if USE_PROCESS_GROUP:
            what = f"{self} process group"
        else:
            what = f"{self} process"

        print(f"Kill {what}", file=sys.stderr, flush=True)
        try:
            if USE_PROCESS_GROUP:
                os.killpg(popen.pid, signal.SIGKILL)
            else:
                popen.kill()
        except ProcessLookupError:
            # popen.kill(): the process completed, the WorkerThread thread
            # read its exit status, but Popen.send_signal() read the returncode
            # just before Popen.wait() set returncode.
            pass
        except OSError as exc:
            print_warning(f"Failed to kill {what}: {exc!r}")

    def stop(self) -> None:
        """Ask the thread to exit and kill its worker process, if any."""
        # Method called from a different thread to stop this thread
        self._stopped = True
        self._kill()

    def _run_process(self, runtests: WorkerRunTests, output_fd: int,
                     tmp_dir: StrPath | None = None) -> int | None:
        """Spawn a worker process and wait until it completes.

        Return the process exit code, or None if the process did not
        complete within self.timeout seconds and was killed.

        Raise ExitThread if stop() has been called.
        """
        popen = create_worker_process(runtests, output_fd, tmp_dir)
        self._popen = popen
        self._killed = False

        try:
            if self._stopped:
                # If kill() has been called before self._popen is set,
                # self._popen is still running. Call again kill()
                # to ensure that the process is killed.
                self._kill()
                raise ExitThread

            try:
                # gh-94026: stdout+stderr are written to tempfile
                retcode = popen.wait(timeout=self.timeout)
                assert retcode is not None
                return retcode
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # stop() has been called while waiting: exit the thread
                    # instead of reporting a timeout
                    raise ExitThread

                # On timeout, kill the process
                self._kill()

                # None means TIMEOUT for the caller
                retcode = None
                # bpo-38207: Don't attempt to call communicate() again: it
                # can hang until all child processes using stdout
                # pipes complete.
            except OSError:
                if self._stopped:
                    # stop() has been called: the error is a consequence of
                    # the process being killed, exit the thread
                    raise ExitThread
                raise
        except:
            # Deliberately catch everything (including KeyboardInterrupt and
            # ExitThread): never leave the process running, then re-raise.
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None

    def create_stdout(self, stack: contextlib.ExitStack) -> TextIO:
        """Create the temporary file receiving the worker stdout and stderr.

        The file is registered in *stack*, which closes it on exit.
        """

        if MS_WINDOWS:
            # gh-95027: When stdout is not a TTY, Python uses the ANSI code
            # page for the sys.stdout encoding. If the main process runs in a
            # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
            encoding = locale.getencoding()
        else:
            encoding = sys.stdout.encoding

        # gh-94026: Write stdout+stderr to a tempfile as workaround for
        # non-blocking pipes on Emscripten with NodeJS.
        # gh-109425: Use "backslashreplace" error handler: log corrupted
        # stdout+stderr, instead of failing with a UnicodeDecodeError and not
        # logging stdout+stderr at all.
        stdout_file = tempfile.TemporaryFile('w+',
                                             encoding=encoding,
                                             errors='backslashreplace')
        stack.enter_context(stdout_file)
        return stdout_file

    def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextIO | None]:
        """Create the channel used by the worker to send its JSON result.

        Return (json_file, json_tmpfile). json_tmpfile is None when the JSON
        is written to stdout; otherwise it is a temporary file registered in
        *stack* and json_file refers to its file descriptor (or to its
        Windows handle).
        """

        json_file_use_stdout = self.runtests.json_file_use_stdout()
        if json_file_use_stdout:
            json_file = JsonFile(None, JsonFileType.STDOUT)
            json_tmpfile = None
        else:
            json_tmpfile = tempfile.TemporaryFile('w+', encoding='utf8')
            stack.enter_context(json_tmpfile)

            json_fd = json_tmpfile.fileno()
            if MS_WINDOWS:
                # The msvcrt module is only available on Windows;
                # we run mypy with `--platform=linux` in CI
                json_handle: int = msvcrt.get_osfhandle(json_fd)  # type: ignore[attr-defined]
                json_file = JsonFile(json_handle,
                                     JsonFileType.WINDOWS_HANDLE)
            else:
                json_file = JsonFile(json_fd, JsonFileType.UNIX_FD)
        return (json_file, json_tmpfile)

    def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> WorkerRunTests:
        """Build the WorkerRunTests configuration running only *test_name*."""
        tests = (test_name,)
        if self.runtests.rerun:
            match_tests = self.runtests.get_match_tests(test_name)
        else:
            match_tests = None

        kwargs: dict[str, Any] = {}
        if match_tests:
            kwargs['match_tests'] = [(test, True) for test in match_tests]
        if self.runtests.output_on_failure:
            # The worker always runs in verbose mode; whether its output is
            # displayed is decided later by RunWorkers._process_result()
            kwargs['verbose'] = True
            kwargs['output_on_failure'] = False
        return self.runtests.create_worker_runtests(
            tests=tests,
            json_file=json_file,
            **kwargs)

    def run_tmp_files(self, worker_runtests: WorkerRunTests,
                      stdout_fd: int) -> tuple[int | None, list[StrPath]]:
        """Run the worker process and collect leaked temporary files.

        Return (retcode, tmp_files): retcode is the value returned by
        _run_process(), tmp_files lists the entries left in the temporary
        directory given to the worker (always empty on WASI).
        """
        # gh-93353: Check for leaked temporary files in the parent process,
        # since the deletion of temporary files can happen late during
        # Python finalization: too late for libregrtest.
        if not support.is_wasi:
            # Don't check for leaked temporary files and directories if Python is
            # run on WASI. WASI don't pass environment variables like TMPDIR to
            # worker processes.
            tmp_dir = tempfile.mkdtemp(prefix="test_python_")
            tmp_dir = os.path.abspath(tmp_dir)
            try:
                retcode = self._run_process(worker_runtests,
                                            stdout_fd, tmp_dir)
            finally:
                # Always remove the directory, even if the process failed
                tmp_files = os.listdir(tmp_dir)
                os_helper.rmtree(tmp_dir)
        else:
            retcode = self._run_process(worker_runtests, stdout_fd)
            tmp_files = []

        return (retcode, tmp_files)

    def read_stdout(self, stdout_file: TextIO) -> str:
        """Return the stripped content of the worker stdout file.

        Raise WorkerError (state WORKER_BUG) if the file cannot be read.
        """
        stdout_file.seek(0)
        try:
            return stdout_file.read().strip()
        except Exception as exc:
            # gh-101634: Catch UnicodeDecodeError if stdout cannot be
            # decoded from encoding
            raise WorkerError(self.test_name,
                              f"Cannot read process stdout: {exc}",
                              stdout=None,
                              state=State.WORKER_BUG)

    def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
                  stdout: str) -> tuple[TestResult, str]:
        """Read and parse the JSON result written by the worker process.

        Return (result, stdout). When the JSON was written to stdout, it is
        the last line of *stdout* and the returned stdout no longer
        contains it.

        Raise WorkerError (state WORKER_BUG) if the JSON cannot be read, is
        empty or cannot be parsed.
        """
        try:
            if json_tmpfile is not None:
                json_tmpfile.seek(0)
                worker_json = json_tmpfile.read()
            elif json_file.file_type == JsonFileType.STDOUT:
                # The JSON is the last line of stdout
                stdout, _, worker_json = stdout.rpartition("\n")
                stdout = stdout.rstrip()
            else:
                with json_file.open(encoding='utf8') as json_fp:
                    worker_json = json_fp.read()
        except Exception as exc:
            # gh-101634: Catch UnicodeDecodeError if the JSON cannot be
            # decoded from encoding
            err_msg = f"Failed to read worker process JSON: {exc}"
            raise WorkerError(self.test_name, err_msg, stdout,
                              state=State.WORKER_BUG)

        if not worker_json:
            raise WorkerError(self.test_name, "empty JSON", stdout,
                              state=State.WORKER_BUG)

        try:
            result = TestResult.from_json(worker_json)
        except Exception as exc:
            # Any parsing failure is reported as a worker bug
            err_msg = f"Failed to parse worker process JSON: {exc}"
            raise WorkerError(self.test_name, err_msg, stdout,
                              state=State.WORKER_BUG)

        return (result, stdout)

    def _runtest(self, test_name: TestName) -> MultiprocessResult:
        """Run *test_name* in a worker process and return its result.

        Raise WorkerError if the process timed out (state TIMEOUT), exited
        with a non-zero code (state WORKER_FAILED) or if its output cannot
        be read (state WORKER_BUG).
        """
        with contextlib.ExitStack() as stack:
            stdout_file = self.create_stdout(stack)
            json_file, json_tmpfile = self.create_json_file(stack)
            worker_runtests = self.create_worker_runtests(test_name, json_file)

            retcode: str | int | None
            retcode, tmp_files = self.run_tmp_files(worker_runtests,
                                                    stdout_file.fileno())

            stdout = self.read_stdout(stdout_file)

            if retcode is None:
                raise WorkerError(test_name, stdout=stdout,
                                  err_msg=None,
                                  state=State.TIMEOUT)
            if retcode != 0:
                # Add the signal name to the exit code when there is one
                name = get_signal_name(retcode)
                if name:
                    retcode = f"{retcode} ({name})"
                raise WorkerError(test_name, f"Exit code {retcode}", stdout,
                                  state=State.WORKER_FAILED)

            result, stdout = self.read_json(json_file, json_tmpfile, stdout)

        if tmp_files:
            msg = (f'\n\n'
                   f'Warning -- {test_name} leaked temporary files '
                   f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
            stdout += msg
            result.set_env_changed()

        return MultiprocessResult(result, stdout)

    def run(self) -> None:
        """Thread main loop: run pending tests until stopped or exhausted.

        Each result is put on the output queue as (False, MultiprocessResult).
        An unexpected exception is put as (True, formatted traceback) and
        ends the thread.
        """
        fail_fast = self.runtests.fail_fast
        fail_env_changed = self.runtests.fail_env_changed
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                # Set start_time before test_name: other threads only read
                # start_time once they have seen a test name
                self.start_time = time.monotonic()
                self.test_name = test_name
                try:
                    mp_result = self._runtest(test_name)
                except WorkerError as exc:
                    mp_result = exc.mp_result
                finally:
                    self.test_name = None
                mp_result.result.duration = time.monotonic() - self.start_time
                self.output.put((False, mp_result))

                if mp_result.result.must_stop(fail_fast, fail_env_changed):
                    break
            except ExitThread:
                break
            except BaseException:
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self) -> None:
        """Wait until the current process completes, warn on failure."""
        popen = self._popen

        try:
            popen.wait(WAIT_COMPLETED_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(WAIT_COMPLETED_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time: float) -> None:
        """Join the thread, logging progress, for at most WAIT_KILLED_TIMEOUT.

        *start_time* is the time.monotonic() value at which the workers
        started to be stopped.
        """
        # bpo-38207: RunWorkers.stop_workers() called self.stop()
        # which killed the process. Sometimes, killing the process from the
        # main thread does not interrupt popen.communicate() in
        # WorkerThread thread. This loop with a timeout is a workaround
        # for that.
        #
        # Moreover, if this method fails to join the thread, it is likely
        # that Python will hang at exit while calling threading._shutdown()
        # which tries again to join the blocked thread. Regrtest.main()
        # uses EXIT_TIMEOUT to workaround this second bug.
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            self.log(f"Waiting for {self} thread for {format_duration(dt)}")
            if dt > WAIT_KILLED_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break
436
437
def get_running(workers: list[WorkerThread]) -> str | None:
    """Describe the tests running for at least PROGRESS_MIN_TIME seconds.

    Return a string such as "running (2): test_a (...), test_b (...)",
    or None if no worker qualifies.
    """
    active: list[str] = []
    for thread in workers:
        name = thread.test_name
        if name:
            elapsed = time.monotonic() - thread.start_time
            if elapsed >= PROGRESS_MIN_TIME:
                active.append(f'{name} ({format_duration(elapsed)})')
    if active:
        return f"running ({len(active)}): {', '.join(active)}"
    return None
451
452
class RunWorkers:
    """Run tests in parallel using a pool of WorkerThread threads.

    The main thread consumes the output queue filled by the worker threads,
    accumulates the results and displays the progress.
    """

    def __init__(self, num_workers: int, runtests: RunTests,
                 logger: Logger, results: TestResults) -> None:
        self.num_workers = num_workers
        self.runtests = runtests
        self.log = logger.log
        self.display_progress = logger.display_progress
        self.results: TestResults = results
        # Number of test results processed so far. Reset by run(), but
        # defined here so that display_result() and _process_result() never
        # hit a missing attribute.
        self.test_index = 0

        self.output: queue.Queue[QueueOutput] = queue.Queue()
        tests_iter = runtests.iter_tests()
        self.pending = MultiprocessIterator(tests_iter)
        self.timeout = runtests.timeout
        if self.timeout is not None:
            # Rely on faulthandler to kill a worker process. This timeout is
            # used when faulthandler fails to kill a worker process. Give a
            # maximum of 5 minutes to faulthandler to kill the worker.
            self.worker_timeout: float | None = min(self.timeout * 1.5, self.timeout + 5 * 60)
        else:
            self.worker_timeout = None
        # Created by start_workers()
        self.workers: list[WorkerThread] | None = None

        jobs = self.runtests.get_jobs()
        if jobs is not None:
            # Don't spawn more threads than the number of jobs:
            # these worker threads would never get anything to do.
            self.num_workers = min(self.num_workers, jobs)

    def start_workers(self) -> None:
        """Create and start the worker threads, and log a summary line."""
        self.workers = [WorkerThread(index, self)
                        for index in range(1, self.num_workers + 1)]
        jobs = self.runtests.get_jobs()
        if jobs is not None:
            tests = count(jobs, 'test')
        else:
            tests = 'tests'
        nworkers = len(self.workers)
        processes = plural(nworkers, "process", "processes")
        msg = (f"Run {tests} in parallel using "
               f"{nworkers} worker {processes}")
        if self.timeout:
            msg += (" (timeout: %s, worker timeout: %s)"
                    % (format_duration(self.timeout),
                       format_duration(self.worker_timeout)))
        self.log(msg)
        for worker in self.workers:
            worker.start()

    def stop_workers(self) -> None:
        """Stop all worker threads, then wait until they exit."""
        workers = self.workers
        if not workers:
            # start_workers() has not been called: nothing to stop
            return
        start_time = time.monotonic()
        # First ask every worker to stop, then wait: the workers are killed
        # in parallel rather than one after the other.
        for worker in workers:
            worker.stop()
        for worker in workers:
            worker.wait_stopped(start_time)

    def _get_result(self) -> QueueOutput | None:
        """Return the next item of the output queue.

        Return None once all worker threads exited and the queue is empty.
        """
        pgo = self.runtests.pgo
        use_faulthandler = (self.timeout is not None)

        # bpo-46205: check the status of workers every iteration to avoid
        # waiting forever on an empty queue.
        while any(worker.is_alive() for worker in self.workers):
            if use_faulthandler:
                # (Re)arm the watchdog killing the main process if it is
                # stuck for MAIN_PROCESS_TIMEOUT seconds
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=PROGRESS_UPDATE)
            except queue.Empty:
                pass

            if not pgo:
                # display progress
                running = get_running(self.workers)
                if running:
                    self.log(running)

        # all worker threads are done: consume pending results
        try:
            return self.output.get(timeout=0)
        except queue.Empty:
            return None

    def display_result(self, mp_result: MultiprocessResult) -> None:
        """Display the progress line of one test result."""
        result = mp_result.result
        pgo = self.runtests.pgo

        text = str(result)
        if mp_result.err_msg:
            # Worker-level failure (for example a non-zero exit code or an
            # unreadable JSON result): show the error instead of the duration
            text += ' (%s)' % mp_result.err_msg
        elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
            text += ' (%s)' % format_duration(result.duration)
        if not pgo:
            running = get_running(self.workers)
            if running:
                text += f' -- {running}'
        self.display_progress(self.test_index, text)

    def _process_result(self, item: QueueOutput) -> TestResult:
        """Record one queue item and return the corresponding TestResult.

        A worker thread failure is logged and recorded as a WORKER_BUG
        result named "<regrtest worker>". A regular result is accumulated
        and displayed, followed by the worker stdout when it must be shown.

        The caller decides whether to stop using result.must_stop().
        """
        if item[0]:
            # Thread got an exception
            format_exc = item[1]
            print_warning(f"regrtest worker thread failed: {format_exc}")
            result = TestResult("<regrtest worker>", state=State.WORKER_BUG)
            self.results.accumulate_result(result, self.runtests)
            return result

        self.test_index += 1
        mp_result = item[1]
        result = mp_result.result
        self.results.accumulate_result(result, self.runtests)
        self.display_result(mp_result)

        # Display worker stdout
        if not self.runtests.output_on_failure:
            show_stdout = True
        else:
            # --verbose3 ignores stdout on success
            show_stdout = (result.state != State.PASSED)
        if show_stdout:
            stdout = mp_result.worker_stdout
            if stdout:
                print(stdout, flush=True)

        return result

    def run(self) -> None:
        """Run all tests, then stop the workers.

        Stop early when a result requires it (result.must_stop()) or on
        KeyboardInterrupt, in which case results.interrupted is set.
        """
        fail_fast = self.runtests.fail_fast
        fail_env_changed = self.runtests.fail_env_changed

        self.start_workers()

        self.test_index = 0
        try:
            while True:
                item = self._get_result()
                if item is None:
                    break

                result = self._process_result(item)
                if result.must_stop(fail_fast, fail_env_changed):
                    break
        except KeyboardInterrupt:
            print()
            self.results.interrupted = True
        finally:
            if self.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()