1 from contextlib import contextmanager
2 import datetime
3 import faulthandler
4 import os
5 import re
6 import signal
7 import subprocess
8 import sys
9 from test import support
10 from test.support import os_helper, script_helper, is_android, MS_WINDOWS
11 import tempfile
12 import unittest
13 from textwrap import dedent
14
# _testcapi is an optional extension module: fall back to None so that the
# tests which need it can be skipped instead of failing at import time.
try:
    import _testcapi
except ImportError:
    _testcapi = None

# The tests below run their snippets in child interpreters, so the whole
# module is skipped when subprocesses are not supported.
if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")

# Delay, in seconds, handed to faulthandler.dump_traceback_later() by
# check_dump_traceback_later().
TIMEOUT = 0.5
24
25
def expected_traceback(lineno1, lineno2, header, min_count=1):
    """Build a regex matching a two-frame traceback dumped by faulthandler.

    The expected dump is *header* followed by a frame in ``func`` at line
    *lineno1* and a frame in ``<module>`` at line *lineno2*, both located in
    ``<string>``.  Frame lines carry the two-space indent that faulthandler
    writes in front of every ``File`` entry.

    *header* is inserted verbatim, so it must already be a regex fragment
    (including its trailing newline).

    With ``min_count == 1`` the pattern is anchored at both ends (``^...$``).
    With a larger *min_count* the dump must be repeated at least that many
    times; the pattern is then only anchored at the start so that additional
    repetitions are tolerated.
    """
    regex = header
    regex += '  File "<string>", line %s in func\n' % lineno1
    regex += '  File "<string>", line %s in <module>' % lineno2
    if min_count > 1:
        # No trailing '$': more than min_count dumps is acceptable.
        return '^' + (regex + '\n') * (min_count - 1) + regex
    return '^' + regex + '$'
34
def skip_segfault_on_android(test):
    """Decorator: skip *test* on Android, run it unchanged elsewhere."""
    # gh-76319: Raising SIGSEGV on Android may not cause a crash.
    skip_on_android = unittest.skipIf(
        is_android, 'raising SIGSEGV on Android is unreliable')
    return skip_on_android(test)
39
40 @contextmanager
41 def temporary_filename():
42 filename = tempfile.mktemp()
43 try:
44 yield filename
45 finally:
46 os_helper.unlink(filename)
47
class FaultHandlerTests(unittest.TestCase):
    """Tests for the faulthandler module.

    Most tests run a short script in a child interpreter (see get_output())
    and compare what faulthandler wrote -- to the captured output stream, to
    a file or to a file descriptor -- with the expected traceback.  The line
    numbers passed to the check_*() helpers refer to lines of the embedded
    scripts after dedent()/strip(), so the scripts must not be reflowed.
    """

    def get_output(self, code, filename=None, fd=None):
        """
        Run the specified code in Python (in a new child process) and read
        its output.

        The output is taken from the stream captured by
        script_helper.spawn_python(), unless *filename* or *fd* is given: in
        that case the captured stream must be empty and the output is read
        from the file or from the file descriptor (which is also passed to
        the child through pass_fds).

        Return a tuple (lines, exitcode) where lines is the output split
        into a list of lines, decoded as ASCII with backslashreplace.
        """
        code = dedent(code).strip()
        pass_fds = []
        if fd is not None:
            pass_fds.append(fd)
        env = dict(os.environ)

        # Sanitizers must not handle SIGSEGV (ex: for test_enable_fd())
        option = 'handle_segv=0'
        support.set_sanitizer_env_var(env, option)

        with support.SuppressCrashReport():
            process = script_helper.spawn_python('-c', code,
                                                 pass_fds=pass_fds,
                                                 env=env)
            with process:
                output, _ = process.communicate()
                exitcode = process.wait()
        output = output.decode('ascii', 'backslashreplace')
        if filename:
            self.assertEqual(output, '')
            with open(filename, "rb") as fp:
                output = fp.read()
            output = output.decode('ascii', 'backslashreplace')
        elif fd is not None:
            self.assertEqual(output, '')
            # Rewind: the child wrote through the same open file description.
            os.lseek(fd, 0, os.SEEK_SET)
            with open(fd, "rb", closefd=False) as fp:
                output = fp.read()
            output = output.decode('ascii', 'backslashreplace')
        return output.splitlines(), exitcode

    def check_error(self, code, lineno, fatal_error, *,
                    filename=None, all_threads=True, other_regex=None,
                    fd=None, know_current_thread=True,
                    py_fatal_error=False,
                    garbage_collecting=False,
                    function='<module>'):
        """
        Check that the fault handler for fatal errors is enabled and check the
        traceback from the child process output.

        code: script run in the child process.
        lineno, function: location of the first expected frame.
        fatal_error: regex for the first line of the report.
        filename, fd: where the child writes the report (see get_output()).
        all_threads, know_current_thread: select the expected header
            ("Current thread 0x...", "Thread 0x..." or "Stack").
        other_regex: alternative regex accepted instead of the report.
        py_fatal_error: expect the "Python runtime state" line.
        garbage_collecting: expect the "Garbage-collecting" marker line.

        Raise an error if the output doesn't match the expected format or if
        the child exits with status 0.
        """
        if all_threads:
            if know_current_thread:
                header = 'Current thread 0x[0-9a-f]+'
            else:
                header = 'Thread 0x[0-9a-f]+'
        else:
            header = 'Stack'
        regex = [f'^{fatal_error}']
        if py_fatal_error:
            regex.append("Python runtime state: initialized")
        regex.append('')
        regex.append(fr'{header} \(most recent call first\):')
        if garbage_collecting:
            regex.append('  Garbage-collecting')
        regex.append(fr'  File "<string>", line {lineno} in {function}')
        regex = '\n'.join(regex)

        if other_regex:
            regex = f'(?:{regex}|{other_regex})'

        # Enable MULTILINE flag
        regex = f'(?m){regex}'
        output, exitcode = self.get_output(code, filename=filename, fd=fd)
        output = '\n'.join(output)
        self.assertRegex(output, regex)
        self.assertNotEqual(exitcode, 0)

    def check_fatal_error(self, code, line_number, name_regex, func=None, **kw):
        """check_error() for a "Fatal Python error: [func: ]name" report."""
        if func:
            name_regex = '%s: %s' % (func, name_regex)
        fatal_error = 'Fatal Python error: %s' % name_regex
        self.check_error(code, line_number, fatal_error, **kw)

    def check_windows_exception(self, code, line_number, name_regex, **kw):
        """check_error() for a "Windows fatal exception: name" report."""
        fatal_error = 'Windows fatal exception: %s' % name_regex
        self.check_error(code, line_number, fatal_error, **kw)

    @unittest.skipIf(sys.platform.startswith('aix'),
                     "the first page of memory is a mapped read-only on AIX")
    def test_read_null(self):
        if not MS_WINDOWS:
            self.check_fatal_error("""
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                """,
                3,
                # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
                '(?:Segmentation fault'
                    '|Bus error'
                    '|Illegal instruction)')
        else:
            self.check_windows_exception("""
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                """,
                3,
                'access violation')

    @skip_segfault_on_android
    def test_sigsegv(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv()
            """,
            3,
            'Segmentation fault')

    @skip_segfault_on_android
    def test_gc(self):
        # bpo-44466: Detect if the GC is running
        self.check_fatal_error("""
            import faulthandler
            import gc
            import sys

            faulthandler.enable()

            class RefCycle:
                def __del__(self):
                    faulthandler._sigsegv()

            # create a reference cycle which triggers a fatal
            # error in a destructor
            a = RefCycle()
            b = RefCycle()
            a.b = b
            b.a = a

            # Delete the objects, not the cycle
            a = None
            b = None

            # Break the reference cycle: call __del__()
            gc.collect()

            # Should not reach this line
            print("exit", file=sys.stderr)
            """,
            9,
            'Segmentation fault',
            function='__del__',
            garbage_collecting=True)

    def test_fatal_error_c_thread(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._fatal_error_c_thread()
            """,
            3,
            'in new thread',
            know_current_thread=False,
            func='faulthandler_fatal_error_thread',
            py_fatal_error=True)

    def test_sigabrt(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigabrt()
            """,
            3,
            'Aborted')

    @unittest.skipIf(sys.platform == 'win32',
                     "SIGFPE cannot be caught on Windows")
    def test_sigfpe(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigfpe()
            """,
            3,
            'Floating point exception')

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
    @skip_segfault_on_android
    def test_sigbus(self):
        self.check_fatal_error("""
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGBUS)
            """,
            5,
            'Bus error')

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
    @skip_segfault_on_android
    def test_sigill(self):
        self.check_fatal_error("""
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGILL)
            """,
            5,
            'Illegal instruction')

    def check_fatal_error_func(self, release_gil):
        # Test that Py_FatalError() dumps a traceback
        if _testcapi is None:
            # The child script imports _testcapi: skip rather than fail with
            # an ImportError traceback when the module is not built.
            self.skipTest('need _testcapi')
        with support.SuppressCrashReport():
            self.check_fatal_error(f"""
                import _testcapi
                _testcapi.fatal_error(b'xyz', {release_gil})
                """,
                2,
                'xyz',
                func='test_fatal_error',
                py_fatal_error=True)

    def test_fatal_error(self):
        self.check_fatal_error_func(False)

    def test_fatal_error_without_gil(self):
        self.check_fatal_error_func(True)

    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     "Issue #12868: sigaltstack() doesn't work on "
                     "OpenBSD if Python is compiled with pthread")
    @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
                     'need faulthandler._stack_overflow()')
    def test_stack_overflow(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._stack_overflow()
            """,
            3,
            '(?:Segmentation fault|Bus error)',
            other_regex='unable to raise a stack overflow')

    @skip_segfault_on_android
    def test_gil_released(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv(True)
            """,
            3,
            'Segmentation fault')

    @skip_segfault_on_android
    def test_enable_file(self):
        with temporary_filename() as filename:
            self.check_fatal_error("""
                import faulthandler
                output = open({filename}, 'wb')
                faulthandler.enable(output)
                faulthandler._sigsegv()
                """.format(filename=repr(filename)),
                4,
                'Segmentation fault',
                filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    @skip_segfault_on_android
    def test_enable_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            fd = fp.fileno()
            self.check_fatal_error("""
                import faulthandler
                import sys
                faulthandler.enable(%s)
                faulthandler._sigsegv()
                """ % fd,
                4,
                'Segmentation fault',
                fd=fd)

    @skip_segfault_on_android
    def test_enable_single_thread(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable(all_threads=False)
            faulthandler._sigsegv()
            """,
            3,
            'Segmentation fault',
            all_threads=False)

    @skip_segfault_on_android
    def test_disable(self):
        code = """
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            faulthandler._sigsegv()
            """
        not_expected = 'Fatal Python error'
        stderr, exitcode = self.get_output(code)
        stderr = '\n'.join(stderr)
        self.assertNotIn(not_expected, stderr)
        self.assertNotEqual(exitcode, 0)

    @skip_segfault_on_android
    def test_dump_ext_modules(self):
        code = """
            import faulthandler
            import sys
            # Don't filter stdlib module names
            sys.stdlib_module_names = frozenset()
            faulthandler.enable()
            faulthandler._sigsegv()
            """
        stderr, exitcode = self.get_output(code)
        stderr = '\n'.join(stderr)
        match = re.search(r'^Extension modules:(.*) \(total: [0-9]+\)$',
                          stderr, re.MULTILINE)
        if not match:
            self.fail(f"Cannot find 'Extension modules:' in {stderr!r}")
        modules = set(match.group(1).strip().split(', '))
        for name in ('sys', 'faulthandler'):
            self.assertIn(name, modules)

    def test_is_enabled(self):
        orig_stderr = sys.stderr
        try:
            # regrtest may replace sys.stderr by io.StringIO object, but
            # faulthandler.enable() requires that sys.stderr has a fileno()
            # method
            sys.stderr = sys.__stderr__

            was_enabled = faulthandler.is_enabled()
            try:
                faulthandler.enable()
                self.assertTrue(faulthandler.is_enabled())
                faulthandler.disable()
                self.assertFalse(faulthandler.is_enabled())
            finally:
                # Restore the state found on entry.
                if was_enabled:
                    faulthandler.enable()
                else:
                    faulthandler.disable()
        finally:
            sys.stderr = orig_stderr

    @support.requires_subprocess()
    def test_disabled_by_default(self):
        # By default, the module should be disabled
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = (sys.executable, "-E", "-c", code)
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args)
        self.assertEqual(output.rstrip(), b"False")

    @support.requires_subprocess()
    def test_sys_xoptions(self):
        # Test python -X faulthandler
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = filter(None, (sys.executable,
                             "-E" if sys.flags.ignore_environment else "",
                             "-X", "faulthandler", "-c", code))
        env = os.environ.copy()
        env.pop("PYTHONFAULTHANDLER", None)
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"True")

    @support.requires_subprocess()
    def test_env_var(self):
        # empty env var
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = (sys.executable, "-c", code)
        env = dict(os.environ)
        env['PYTHONFAULTHANDLER'] = ''
        env['PYTHONDEVMODE'] = ''
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"False")

        # non-empty env var
        env = dict(os.environ)
        env['PYTHONFAULTHANDLER'] = '1'
        env['PYTHONDEVMODE'] = ''
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"True")

    def check_dump_traceback(self, *, filename=None, fd=None):
        """
        Explicitly call dump_traceback() function and check its output.

        The child dumps to *filename* if set, else to *fd* if set, else to
        the default stream.  Raise an error if the output doesn't match the
        expected format.
        """
        code = """
            import faulthandler

            filename = {filename!r}
            fd = {fd}

            def funcB():
                if filename:
                    with open(filename, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=False)
                elif fd is not None:
                    faulthandler.dump_traceback(fd,
                                                all_threads=False)
                else:
                    faulthandler.dump_traceback(all_threads=False)

            def funcA():
                funcB()

            funcA()
            """
        code = code.format(
            filename=filename,
            fd=fd,
        )
        # Line of the dump_traceback() call taken by funcB() in the script.
        if filename:
            lineno = 9
        elif fd is not None:
            lineno = 11
        else:
            lineno = 14
        expected = [
            'Stack (most recent call first):',
            '  File "<string>", line %s in funcB' % lineno,
            '  File "<string>", line 17 in funcA',
            '  File "<string>", line 19 in <module>'
        ]
        trace, exitcode = self.get_output(code, filename, fd)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback(self):
        self.check_dump_traceback()

    def test_dump_traceback_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_dump_traceback_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_dump_traceback(fd=fp.fileno())

    def test_truncate(self):
        # A function name longer than maxlen characters is expected to be
        # cut to maxlen characters followed by '...'.
        maxlen = 500
        func_name = 'x' * (maxlen + 50)
        truncated = 'x' * maxlen + '...'
        code = """
            import faulthandler

            def {func_name}():
                faulthandler.dump_traceback(all_threads=False)

            {func_name}()
            """
        code = code.format(
            func_name=func_name,
        )
        expected = [
            'Stack (most recent call first):',
            '  File "<string>", line 4 in %s' % truncated,
            '  File "<string>", line 6 in <module>'
        ]
        trace, exitcode = self.get_output(code)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    def check_dump_traceback_threads(self, filename):
        """
        Call explicitly dump_traceback(all_threads=True) and check the output.

        The child starts a second thread blocked on an Event before dumping,
        so the dump must contain both that thread and the current thread.
        Raise an error if the output doesn't match the expected format.
        """
        code = """
            import faulthandler
            from threading import Thread, Event
            import time

            def dump():
                if {filename}:
                    with open({filename}, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=True)
                else:
                    faulthandler.dump_traceback(all_threads=True)

            class Waiter(Thread):
                # avoid blocking if the main thread raises an exception.
                daemon = True

                def __init__(self):
                    Thread.__init__(self)
                    self.running = Event()
                    self.stop = Event()

                def run(self):
                    self.running.set()
                    self.stop.wait()

            waiter = Waiter()
            waiter.start()
            waiter.running.wait()
            dump()
            waiter.stop.set()
            waiter.join()
            """
        code = code.format(filename=repr(filename))
        output, exitcode = self.get_output(code, filename)
        output = '\n'.join(output)
        if filename:
            lineno = 8
        else:
            lineno = 10
        # Between one and three frames inside threading.py are accepted above
        # the Waiter.run() frame.
        regex = r"""
            ^Thread 0x[0-9a-f]+ \(most recent call first\):
            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
            ){{1,3}}  File "<string>", line 23 in run
              File ".*threading.py", line [0-9]+ in _bootstrap_inner
              File ".*threading.py", line [0-9]+ in _bootstrap

            Current thread 0x[0-9a-f]+ \(most recent call first\):
              File "<string>", line {lineno} in dump
              File "<string>", line 28 in <module>$
            """
        regex = dedent(regex.format(lineno=lineno)).strip()
        self.assertRegex(output, regex)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback_threads(self):
        self.check_dump_traceback_threads(None)

    def test_dump_traceback_threads_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback_threads(filename)

    def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
                                   *, filename=None, fd=None):
        """
        Check the tracebacks written by dump_traceback_later().

        The child arms dump_traceback_later(TIMEOUT) *loops* times and sleeps
        5 x TIMEOUT after each call before cancelling it.  Without *cancel*,
        at least *loops* dumps are expected (twice as many if *repeat* is
        true); with *cancel* the timer is cancelled immediately and no output
        is expected.

        If *filename* is set the child dumps to that file; if *fd* is set it
        dumps to its own sys.stderr file descriptor (the value of *fd* itself
        is not handed to the child).

        Raise an error if the output doesn't match the expected format.
        """
        timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
        code = """
            import faulthandler
            import time
            import sys

            timeout = {timeout}
            repeat = {repeat}
            cancel = {cancel}
            loops = {loops}
            filename = {filename!r}
            fd = {fd}

            def func(timeout, repeat, cancel, file, loops):
                for loop in range(loops):
                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
                    if cancel:
                        faulthandler.cancel_dump_traceback_later()
                    time.sleep(timeout * 5)
                    faulthandler.cancel_dump_traceback_later()

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            func(timeout, repeat, cancel, file, loops)
            if filename:
                file.close()
            """
        code = code.format(
            timeout=TIMEOUT,
            repeat=repeat,
            cancel=cancel,
            loops=loops,
            filename=filename,
            fd=fd,
        )
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)

        if not cancel:
            count = loops
            if repeat:
                count *= 2
            header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
            regex = expected_traceback(17, 26, header, min_count=count)
            self.assertRegex(trace, regex)
        else:
            self.assertEqual(trace, '')
        self.assertEqual(exitcode, 0)

    def test_dump_traceback_later(self):
        self.check_dump_traceback_later()

    def test_dump_traceback_later_repeat(self):
        self.check_dump_traceback_later(repeat=True)

    def test_dump_traceback_later_cancel(self):
        self.check_dump_traceback_later(cancel=True)

    def test_dump_traceback_later_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback_later(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_dump_traceback_later_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_dump_traceback_later(fd=fp.fileno())

    @support.requires_resource('walltime')
    def test_dump_traceback_later_twice(self):
        self.check_dump_traceback_later(loops=2)

    @unittest.skipIf(not hasattr(faulthandler, "register"),
                     "need faulthandler.register")
    def check_register(self, filename=False, all_threads=False,
                       unregister=False, chain=False, fd=None):
        """
        Register a handler displaying the traceback on a user signal. Raise the
        signal and check the written traceback.

        If chain is True, check that the previous signal handler is called.

        If unregister is True, the handler is unregistered before the signal
        is sent: no traceback and a non-zero exit code are expected.

        Raise an error if the output doesn't match the expected format.
        """
        signum = signal.SIGUSR1
        code = """
            import faulthandler
            import os
            import signal
            import sys

            all_threads = {all_threads}
            signum = {signum:d}
            unregister = {unregister}
            chain = {chain}
            filename = {filename!r}
            fd = {fd}

            def func(signum):
                os.kill(os.getpid(), signum)

            def handler(signum, frame):
                handler.called = True
            handler.called = False

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            if chain:
                signal.signal(signum, handler)
            faulthandler.register(signum, file=file,
                                  all_threads=all_threads, chain={chain})
            if unregister:
                faulthandler.unregister(signum)
            func(signum)
            if chain and not handler.called:
                if file is not None:
                    output = file
                else:
                    output = sys.stderr
                print("Error: signal handler not called!", file=output)
                exitcode = 1
            else:
                exitcode = 0
            if filename:
                file.close()
            sys.exit(exitcode)
            """
        code = code.format(
            all_threads=all_threads,
            signum=signum,
            unregister=unregister,
            chain=chain,
            filename=filename,
            fd=fd,
        )
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)
        if not unregister:
            if all_threads:
                regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
            else:
                regex = r'Stack \(most recent call first\):\n'
            regex = expected_traceback(14, 32, regex)
            self.assertRegex(trace, regex)
        else:
            self.assertEqual(trace, '')
        if unregister:
            self.assertNotEqual(exitcode, 0)
        else:
            self.assertEqual(exitcode, 0)

    def test_register(self):
        self.check_register()

    def test_unregister(self):
        self.check_register(unregister=True)

    def test_register_file(self):
        with temporary_filename() as filename:
            self.check_register(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_register_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_register(fd=fp.fileno())

    def test_register_threads(self):
        self.check_register(all_threads=True)

    def test_register_chain(self):
        self.check_register(chain=True)

    @contextmanager
    def check_stderr_none(self):
        """Run the with-block with sys.stderr set to None and require that it
        raises RuntimeError("sys.stderr is None"); sys.stderr is restored
        afterwards."""
        stderr = sys.stderr
        try:
            sys.stderr = None
            with self.assertRaises(RuntimeError) as cm:
                yield
            self.assertEqual(str(cm.exception), "sys.stderr is None")
        finally:
            sys.stderr = stderr

    def test_stderr_None(self):
        # Issue #21497: provide a helpful error if sys.stderr is None,
        # instead of just an attribute error: "None has no attribute fileno".
        with self.check_stderr_none():
            faulthandler.enable()
        with self.check_stderr_none():
            faulthandler.dump_traceback()
        with self.check_stderr_none():
            faulthandler.dump_traceback_later(1e-3)
        if hasattr(faulthandler, "register"):
            with self.check_stderr_none():
                faulthandler.register(signal.SIGUSR1)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_raise_exception(self):
        for exc, name in (
            ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
            ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
            ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
        ):
            self.check_windows_exception(f"""
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(faulthandler._{exc})
                """,
                3,
                name)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_ignore_exception(self):
        for exc_code in (
            0xE06D7363,  # MSC exception ("Emsc")
            0xE0434352,  # COM Callable Runtime exception ("ECCR")
        ):
            code = f"""
                    import faulthandler
                    faulthandler.enable()
                    faulthandler._raise_exception({exc_code})
                    """
            code = dedent(code)
            output, exitcode = self.get_output(code)
            self.assertEqual(output, [])
            self.assertEqual(exitcode, exc_code)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_raise_nonfatal_exception(self):
        # These exceptions are not strictly errors. Letting
        # faulthandler display the traceback when they are
        # raised is likely to result in noise. However, they
        # may still terminate the process if there is no
        # handler installed for them (which there typically
        # is, e.g. for debug messages).
        for exc in (
            0x00000000,
            0x34567890,
            0x40000000,
            0x40001000,
            0x70000000,
            0x7FFFFFFF,
        ):
            output, exitcode = self.get_output(f"""
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(0x{exc:x})
                """
            )
            self.assertEqual(output, [])
            # On Windows older than 7 SP1, the actual exception code has
            # bit 29 cleared.
            self.assertIn(exitcode,
                          (exc, exc & ~0x10000000))

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_disable_windows_exc_handler(self):
        code = dedent("""
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            code = faulthandler._EXCEPTION_ACCESS_VIOLATION
            faulthandler._raise_exception(code)
        """)
        output, exitcode = self.get_output(code)
        self.assertEqual(output, [])
        self.assertEqual(exitcode, 0xC0000005)

    def test_cancel_later_without_dump_traceback_later(self):
        # bpo-37933: Calling cancel_dump_traceback_later()
        # without dump_traceback_later() must not segfault.
        code = dedent("""
            import faulthandler
            faulthandler.cancel_dump_traceback_later()
        """)
        output, exitcode = self.get_output(code)
        self.assertEqual(output, [])
        self.assertEqual(exitcode, 0)
896
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()