1 from contextlib import contextmanager
2 import datetime
3 import faulthandler
4 import os
5 import re
6 import signal
7 import subprocess
8 import sys
9 from test import support
10 from test.support import os_helper
11 from test.support import script_helper, is_android
12 from test.support import skip_if_sanitizer
13 import tempfile
14 import unittest
15 from textwrap import dedent
16
17 try:
18 import _testcapi
19 except ImportError:
20 _testcapi = None
21
22 if not support.has_subprocess_support:
23 raise unittest.SkipTest("test module requires subprocess")
24
25 TIMEOUT = 0.5
26 MS_WINDOWS = (os.name == 'nt')
27
28
29 def expected_traceback(lineno1, lineno2, header, min_count=1):
30 regex = header
31 regex += ' File "<string>", line %s in func\n' % lineno1
32 regex += ' File "<string>", line %s in <module>' % lineno2
33 if 1 < min_count:
34 return '^' + (regex + '\n') * (min_count - 1) + regex
35 else:
36 return '^' + regex + '$'
37
38 def skip_segfault_on_android(test):
39 # Issue #32138: Raising SIGSEGV on Android may not cause a crash.
40 return unittest.skipIf(is_android,
41 'raising SIGSEGV on Android is unreliable')(test)
42
43 @contextmanager
44 def temporary_filename():
45 filename = tempfile.mktemp()
46 try:
47 yield filename
48 finally:
49 os_helper.unlink(filename)
50
51 class ESC[4;38;5;81mFaultHandlerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
52
53 def get_output(self, code, filename=None, fd=None):
54 """
55 Run the specified code in Python (in a new child process) and read the
56 output from the standard error or from a file (if filename is set).
57 Return the output lines as a list.
58
59 Strip the reference count from the standard error for Python debug
60 build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
61 thread XXX".
62 """
63 code = dedent(code).strip()
64 pass_fds = []
65 if fd is not None:
66 pass_fds.append(fd)
67 with support.SuppressCrashReport():
68 process = script_helper.spawn_python('-c', code, pass_fds=pass_fds)
69 with process:
70 output, stderr = process.communicate()
71 exitcode = process.wait()
72 output = output.decode('ascii', 'backslashreplace')
73 if filename:
74 self.assertEqual(output, '')
75 with open(filename, "rb") as fp:
76 output = fp.read()
77 output = output.decode('ascii', 'backslashreplace')
78 elif fd is not None:
79 self.assertEqual(output, '')
80 os.lseek(fd, os.SEEK_SET, 0)
81 with open(fd, "rb", closefd=False) as fp:
82 output = fp.read()
83 output = output.decode('ascii', 'backslashreplace')
84 return output.splitlines(), exitcode
85
86 def check_error(self, code, lineno, fatal_error, *,
87 filename=None, all_threads=True, other_regex=None,
88 fd=None, know_current_thread=True,
89 py_fatal_error=False,
90 garbage_collecting=False,
91 function='<module>'):
92 """
93 Check that the fault handler for fatal errors is enabled and check the
94 traceback from the child process output.
95
96 Raise an error if the output doesn't match the expected format.
97 """
98 if all_threads:
99 if know_current_thread:
100 header = 'Current thread 0x[0-9a-f]+'
101 else:
102 header = 'Thread 0x[0-9a-f]+'
103 else:
104 header = 'Stack'
105 regex = [f'^{fatal_error}']
106 if py_fatal_error:
107 regex.append("Python runtime state: initialized")
108 regex.append('')
109 regex.append(fr'{header} \(most recent call first\):')
110 if garbage_collecting:
111 regex.append(' Garbage-collecting')
112 regex.append(fr' File "<string>", line {lineno} in {function}')
113 regex = '\n'.join(regex)
114
115 if other_regex:
116 regex = f'(?:{regex}|{other_regex})'
117
118 # Enable MULTILINE flag
119 regex = f'(?m){regex}'
120 output, exitcode = self.get_output(code, filename=filename, fd=fd)
121 output = '\n'.join(output)
122 self.assertRegex(output, regex)
123 self.assertNotEqual(exitcode, 0)
124
125 def check_fatal_error(self, code, line_number, name_regex, func=None, **kw):
126 if func:
127 name_regex = '%s: %s' % (func, name_regex)
128 fatal_error = 'Fatal Python error: %s' % name_regex
129 self.check_error(code, line_number, fatal_error, **kw)
130
131 def check_windows_exception(self, code, line_number, name_regex, **kw):
132 fatal_error = 'Windows fatal exception: %s' % name_regex
133 self.check_error(code, line_number, fatal_error, **kw)
134
135 @unittest.skipIf(sys.platform.startswith('aix'),
136 "the first page of memory is a mapped read-only on AIX")
137 def test_read_null(self):
138 if not MS_WINDOWS:
139 self.check_fatal_error("""
140 import faulthandler
141 faulthandler.enable()
142 faulthandler._read_null()
143 """,
144 3,
145 # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
146 '(?:Segmentation fault'
147 '|Bus error'
148 '|Illegal instruction)')
149 else:
150 self.check_windows_exception("""
151 import faulthandler
152 faulthandler.enable()
153 faulthandler._read_null()
154 """,
155 3,
156 'access violation')
157
158 @skip_segfault_on_android
159 def test_sigsegv(self):
160 self.check_fatal_error("""
161 import faulthandler
162 faulthandler.enable()
163 faulthandler._sigsegv()
164 """,
165 3,
166 'Segmentation fault')
167
168 @skip_segfault_on_android
169 def test_gc(self):
170 # bpo-44466: Detect if the GC is running
171 self.check_fatal_error("""
172 import faulthandler
173 import gc
174 import sys
175
176 faulthandler.enable()
177
178 class RefCycle:
179 def __del__(self):
180 faulthandler._sigsegv()
181
182 # create a reference cycle which triggers a fatal
183 # error in a destructor
184 a = RefCycle()
185 b = RefCycle()
186 a.b = b
187 b.a = a
188
189 # Delete the objects, not the cycle
190 a = None
191 b = None
192
193 # Break the reference cycle: call __del__()
194 gc.collect()
195
196 # Should not reach this line
197 print("exit", file=sys.stderr)
198 """,
199 9,
200 'Segmentation fault',
201 function='__del__',
202 garbage_collecting=True)
203
204 def test_fatal_error_c_thread(self):
205 self.check_fatal_error("""
206 import faulthandler
207 faulthandler.enable()
208 faulthandler._fatal_error_c_thread()
209 """,
210 3,
211 'in new thread',
212 know_current_thread=False,
213 func='faulthandler_fatal_error_thread',
214 py_fatal_error=True)
215
216 def test_sigabrt(self):
217 self.check_fatal_error("""
218 import faulthandler
219 faulthandler.enable()
220 faulthandler._sigabrt()
221 """,
222 3,
223 'Aborted')
224
225 @unittest.skipIf(sys.platform == 'win32',
226 "SIGFPE cannot be caught on Windows")
227 def test_sigfpe(self):
228 self.check_fatal_error("""
229 import faulthandler
230 faulthandler.enable()
231 faulthandler._sigfpe()
232 """,
233 3,
234 'Floating point exception')
235
236 @unittest.skipIf(_testcapi is None, 'need _testcapi')
237 @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
238 @skip_segfault_on_android
239 def test_sigbus(self):
240 self.check_fatal_error("""
241 import faulthandler
242 import signal
243
244 faulthandler.enable()
245 signal.raise_signal(signal.SIGBUS)
246 """,
247 5,
248 'Bus error')
249
250 @unittest.skipIf(_testcapi is None, 'need _testcapi')
251 @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
252 @skip_segfault_on_android
253 def test_sigill(self):
254 self.check_fatal_error("""
255 import faulthandler
256 import signal
257
258 faulthandler.enable()
259 signal.raise_signal(signal.SIGILL)
260 """,
261 5,
262 'Illegal instruction')
263
264 def check_fatal_error_func(self, release_gil):
265 # Test that Py_FatalError() dumps a traceback
266 with support.SuppressCrashReport():
267 self.check_fatal_error(f"""
268 import _testcapi
269 _testcapi.fatal_error(b'xyz', {release_gil})
270 """,
271 2,
272 'xyz',
273 func='_testcapi_fatal_error_impl',
274 py_fatal_error=True)
275
276 def test_fatal_error(self):
277 self.check_fatal_error_func(False)
278
279 def test_fatal_error_without_gil(self):
280 self.check_fatal_error_func(True)
281
282 @unittest.skipIf(sys.platform.startswith('openbsd'),
283 "Issue #12868: sigaltstack() doesn't work on "
284 "OpenBSD if Python is compiled with pthread")
285 @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
286 'need faulthandler._stack_overflow()')
287 def test_stack_overflow(self):
288 self.check_fatal_error("""
289 import faulthandler
290 faulthandler.enable()
291 faulthandler._stack_overflow()
292 """,
293 3,
294 '(?:Segmentation fault|Bus error)',
295 other_regex='unable to raise a stack overflow')
296
297 @skip_segfault_on_android
298 def test_gil_released(self):
299 self.check_fatal_error("""
300 import faulthandler
301 faulthandler.enable()
302 faulthandler._sigsegv(True)
303 """,
304 3,
305 'Segmentation fault')
306
307 @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer "
308 "builds change crashing process output.")
309 @skip_segfault_on_android
310 def test_enable_file(self):
311 with temporary_filename() as filename:
312 self.check_fatal_error("""
313 import faulthandler
314 output = open({filename}, 'wb')
315 faulthandler.enable(output)
316 faulthandler._sigsegv()
317 """.format(filename=repr(filename)),
318 4,
319 'Segmentation fault',
320 filename=filename)
321
322 @unittest.skipIf(sys.platform == "win32",
323 "subprocess doesn't support pass_fds on Windows")
324 @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer "
325 "builds change crashing process output.")
326 @skip_segfault_on_android
327 def test_enable_fd(self):
328 with tempfile.TemporaryFile('wb+') as fp:
329 fd = fp.fileno()
330 self.check_fatal_error("""
331 import faulthandler
332 import sys
333 faulthandler.enable(%s)
334 faulthandler._sigsegv()
335 """ % fd,
336 4,
337 'Segmentation fault',
338 fd=fd)
339
340 @skip_segfault_on_android
341 def test_enable_single_thread(self):
342 self.check_fatal_error("""
343 import faulthandler
344 faulthandler.enable(all_threads=False)
345 faulthandler._sigsegv()
346 """,
347 3,
348 'Segmentation fault',
349 all_threads=False)
350
351 @skip_segfault_on_android
352 def test_disable(self):
353 code = """
354 import faulthandler
355 faulthandler.enable()
356 faulthandler.disable()
357 faulthandler._sigsegv()
358 """
359 not_expected = 'Fatal Python error'
360 stderr, exitcode = self.get_output(code)
361 stderr = '\n'.join(stderr)
362 self.assertTrue(not_expected not in stderr,
363 "%r is present in %r" % (not_expected, stderr))
364 self.assertNotEqual(exitcode, 0)
365
366 @skip_segfault_on_android
367 def test_dump_ext_modules(self):
368 code = """
369 import faulthandler
370 import sys
371 # Don't filter stdlib module names
372 sys.stdlib_module_names = frozenset()
373 faulthandler.enable()
374 faulthandler._sigsegv()
375 """
376 stderr, exitcode = self.get_output(code)
377 stderr = '\n'.join(stderr)
378 match = re.search(r'^Extension modules:(.*) \(total: [0-9]+\)$',
379 stderr, re.MULTILINE)
380 if not match:
381 self.fail(f"Cannot find 'Extension modules:' in {stderr!r}")
382 modules = set(match.group(1).strip().split(', '))
383 for name in ('sys', 'faulthandler'):
384 self.assertIn(name, modules)
385
386 def test_is_enabled(self):
387 orig_stderr = sys.stderr
388 try:
389 # regrtest may replace sys.stderr by io.StringIO object, but
390 # faulthandler.enable() requires that sys.stderr has a fileno()
391 # method
392 sys.stderr = sys.__stderr__
393
394 was_enabled = faulthandler.is_enabled()
395 try:
396 faulthandler.enable()
397 self.assertTrue(faulthandler.is_enabled())
398 faulthandler.disable()
399 self.assertFalse(faulthandler.is_enabled())
400 finally:
401 if was_enabled:
402 faulthandler.enable()
403 else:
404 faulthandler.disable()
405 finally:
406 sys.stderr = orig_stderr
407
408 @support.requires_subprocess()
409 def test_disabled_by_default(self):
410 # By default, the module should be disabled
411 code = "import faulthandler; print(faulthandler.is_enabled())"
412 args = (sys.executable, "-E", "-c", code)
413 # don't use assert_python_ok() because it always enables faulthandler
414 output = subprocess.check_output(args)
415 self.assertEqual(output.rstrip(), b"False")
416
417 @support.requires_subprocess()
418 def test_sys_xoptions(self):
419 # Test python -X faulthandler
420 code = "import faulthandler; print(faulthandler.is_enabled())"
421 args = filter(None, (sys.executable,
422 "-E" if sys.flags.ignore_environment else "",
423 "-X", "faulthandler", "-c", code))
424 env = os.environ.copy()
425 env.pop("PYTHONFAULTHANDLER", None)
426 # don't use assert_python_ok() because it always enables faulthandler
427 output = subprocess.check_output(args, env=env)
428 self.assertEqual(output.rstrip(), b"True")
429
430 @support.requires_subprocess()
431 def test_env_var(self):
432 # empty env var
433 code = "import faulthandler; print(faulthandler.is_enabled())"
434 args = (sys.executable, "-c", code)
435 env = dict(os.environ)
436 env['PYTHONFAULTHANDLER'] = ''
437 env['PYTHONDEVMODE'] = ''
438 # don't use assert_python_ok() because it always enables faulthandler
439 output = subprocess.check_output(args, env=env)
440 self.assertEqual(output.rstrip(), b"False")
441
442 # non-empty env var
443 env = dict(os.environ)
444 env['PYTHONFAULTHANDLER'] = '1'
445 env['PYTHONDEVMODE'] = ''
446 output = subprocess.check_output(args, env=env)
447 self.assertEqual(output.rstrip(), b"True")
448
449 def check_dump_traceback(self, *, filename=None, fd=None):
450 """
451 Explicitly call dump_traceback() function and check its output.
452 Raise an error if the output doesn't match the expected format.
453 """
454 code = """
455 import faulthandler
456
457 filename = {filename!r}
458 fd = {fd}
459
460 def funcB():
461 if filename:
462 with open(filename, "wb") as fp:
463 faulthandler.dump_traceback(fp, all_threads=False)
464 elif fd is not None:
465 faulthandler.dump_traceback(fd,
466 all_threads=False)
467 else:
468 faulthandler.dump_traceback(all_threads=False)
469
470 def funcA():
471 funcB()
472
473 funcA()
474 """
475 code = code.format(
476 filename=filename,
477 fd=fd,
478 )
479 if filename:
480 lineno = 9
481 elif fd is not None:
482 lineno = 11
483 else:
484 lineno = 14
485 expected = [
486 'Stack (most recent call first):',
487 ' File "<string>", line %s in funcB' % lineno,
488 ' File "<string>", line 17 in funcA',
489 ' File "<string>", line 19 in <module>'
490 ]
491 trace, exitcode = self.get_output(code, filename, fd)
492 self.assertEqual(trace, expected)
493 self.assertEqual(exitcode, 0)
494
495 def test_dump_traceback(self):
496 self.check_dump_traceback()
497
498 def test_dump_traceback_file(self):
499 with temporary_filename() as filename:
500 self.check_dump_traceback(filename=filename)
501
502 @unittest.skipIf(sys.platform == "win32",
503 "subprocess doesn't support pass_fds on Windows")
504 def test_dump_traceback_fd(self):
505 with tempfile.TemporaryFile('wb+') as fp:
506 self.check_dump_traceback(fd=fp.fileno())
507
508 def test_truncate(self):
509 maxlen = 500
510 func_name = 'x' * (maxlen + 50)
511 truncated = 'x' * maxlen + '...'
512 code = """
513 import faulthandler
514
515 def {func_name}():
516 faulthandler.dump_traceback(all_threads=False)
517
518 {func_name}()
519 """
520 code = code.format(
521 func_name=func_name,
522 )
523 expected = [
524 'Stack (most recent call first):',
525 ' File "<string>", line 4 in %s' % truncated,
526 ' File "<string>", line 6 in <module>'
527 ]
528 trace, exitcode = self.get_output(code)
529 self.assertEqual(trace, expected)
530 self.assertEqual(exitcode, 0)
531
532 def check_dump_traceback_threads(self, filename):
533 """
534 Call explicitly dump_traceback(all_threads=True) and check the output.
535 Raise an error if the output doesn't match the expected format.
536 """
537 code = """
538 import faulthandler
539 from threading import Thread, Event
540 import time
541
542 def dump():
543 if {filename}:
544 with open({filename}, "wb") as fp:
545 faulthandler.dump_traceback(fp, all_threads=True)
546 else:
547 faulthandler.dump_traceback(all_threads=True)
548
549 class Waiter(Thread):
550 # avoid blocking if the main thread raises an exception.
551 daemon = True
552
553 def __init__(self):
554 Thread.__init__(self)
555 self.running = Event()
556 self.stop = Event()
557
558 def run(self):
559 self.running.set()
560 self.stop.wait()
561
562 waiter = Waiter()
563 waiter.start()
564 waiter.running.wait()
565 dump()
566 waiter.stop.set()
567 waiter.join()
568 """
569 code = code.format(filename=repr(filename))
570 output, exitcode = self.get_output(code, filename)
571 output = '\n'.join(output)
572 if filename:
573 lineno = 8
574 else:
575 lineno = 10
576 regex = r"""
577 ^Thread 0x[0-9a-f]+ \(most recent call first\):
578 (?: File ".*threading.py", line [0-9]+ in [_a-z]+
579 ){{1,3}} File "<string>", line 23 in run
580 File ".*threading.py", line [0-9]+ in _bootstrap_inner
581 File ".*threading.py", line [0-9]+ in _bootstrap
582
583 Current thread 0x[0-9a-f]+ \(most recent call first\):
584 File "<string>", line {lineno} in dump
585 File "<string>", line 28 in <module>$
586 """
587 regex = dedent(regex.format(lineno=lineno)).strip()
588 self.assertRegex(output, regex)
589 self.assertEqual(exitcode, 0)
590
591 def test_dump_traceback_threads(self):
592 self.check_dump_traceback_threads(None)
593
594 def test_dump_traceback_threads_file(self):
595 with temporary_filename() as filename:
596 self.check_dump_traceback_threads(filename)
597
598 def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
599 *, filename=None, fd=None):
600 """
601 Check how many times the traceback is written in timeout x 2.5 seconds,
602 or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
603 on repeat and cancel options.
604
605 Raise an error if the output doesn't match the expect format.
606 """
607 timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
608 code = """
609 import faulthandler
610 import time
611 import sys
612
613 timeout = {timeout}
614 repeat = {repeat}
615 cancel = {cancel}
616 loops = {loops}
617 filename = {filename!r}
618 fd = {fd}
619
620 def func(timeout, repeat, cancel, file, loops):
621 for loop in range(loops):
622 faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
623 if cancel:
624 faulthandler.cancel_dump_traceback_later()
625 time.sleep(timeout * 5)
626 faulthandler.cancel_dump_traceback_later()
627
628 if filename:
629 file = open(filename, "wb")
630 elif fd is not None:
631 file = sys.stderr.fileno()
632 else:
633 file = None
634 func(timeout, repeat, cancel, file, loops)
635 if filename:
636 file.close()
637 """
638 code = code.format(
639 timeout=TIMEOUT,
640 repeat=repeat,
641 cancel=cancel,
642 loops=loops,
643 filename=filename,
644 fd=fd,
645 )
646 trace, exitcode = self.get_output(code, filename)
647 trace = '\n'.join(trace)
648
649 if not cancel:
650 count = loops
651 if repeat:
652 count *= 2
653 header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
654 regex = expected_traceback(17, 26, header, min_count=count)
655 self.assertRegex(trace, regex)
656 else:
657 self.assertEqual(trace, '')
658 self.assertEqual(exitcode, 0)
659
660 def test_dump_traceback_later(self):
661 self.check_dump_traceback_later()
662
663 def test_dump_traceback_later_repeat(self):
664 self.check_dump_traceback_later(repeat=True)
665
666 def test_dump_traceback_later_cancel(self):
667 self.check_dump_traceback_later(cancel=True)
668
669 def test_dump_traceback_later_file(self):
670 with temporary_filename() as filename:
671 self.check_dump_traceback_later(filename=filename)
672
673 @unittest.skipIf(sys.platform == "win32",
674 "subprocess doesn't support pass_fds on Windows")
675 def test_dump_traceback_later_fd(self):
676 with tempfile.TemporaryFile('wb+') as fp:
677 self.check_dump_traceback_later(fd=fp.fileno())
678
679 @support.requires_resource('walltime')
680 def test_dump_traceback_later_twice(self):
681 self.check_dump_traceback_later(loops=2)
682
683 @unittest.skipIf(not hasattr(faulthandler, "register"),
684 "need faulthandler.register")
685 def check_register(self, filename=False, all_threads=False,
686 unregister=False, chain=False, fd=None):
687 """
688 Register a handler displaying the traceback on a user signal. Raise the
689 signal and check the written traceback.
690
691 If chain is True, check that the previous signal handler is called.
692
693 Raise an error if the output doesn't match the expected format.
694 """
695 signum = signal.SIGUSR1
696 code = """
697 import faulthandler
698 import os
699 import signal
700 import sys
701
702 all_threads = {all_threads}
703 signum = {signum:d}
704 unregister = {unregister}
705 chain = {chain}
706 filename = {filename!r}
707 fd = {fd}
708
709 def func(signum):
710 os.kill(os.getpid(), signum)
711
712 def handler(signum, frame):
713 handler.called = True
714 handler.called = False
715
716 if filename:
717 file = open(filename, "wb")
718 elif fd is not None:
719 file = sys.stderr.fileno()
720 else:
721 file = None
722 if chain:
723 signal.signal(signum, handler)
724 faulthandler.register(signum, file=file,
725 all_threads=all_threads, chain={chain})
726 if unregister:
727 faulthandler.unregister(signum)
728 func(signum)
729 if chain and not handler.called:
730 if file is not None:
731 output = file
732 else:
733 output = sys.stderr
734 print("Error: signal handler not called!", file=output)
735 exitcode = 1
736 else:
737 exitcode = 0
738 if filename:
739 file.close()
740 sys.exit(exitcode)
741 """
742 code = code.format(
743 all_threads=all_threads,
744 signum=signum,
745 unregister=unregister,
746 chain=chain,
747 filename=filename,
748 fd=fd,
749 )
750 trace, exitcode = self.get_output(code, filename)
751 trace = '\n'.join(trace)
752 if not unregister:
753 if all_threads:
754 regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
755 else:
756 regex = r'Stack \(most recent call first\):\n'
757 regex = expected_traceback(14, 32, regex)
758 self.assertRegex(trace, regex)
759 else:
760 self.assertEqual(trace, '')
761 if unregister:
762 self.assertNotEqual(exitcode, 0)
763 else:
764 self.assertEqual(exitcode, 0)
765
766 def test_register(self):
767 self.check_register()
768
769 def test_unregister(self):
770 self.check_register(unregister=True)
771
772 def test_register_file(self):
773 with temporary_filename() as filename:
774 self.check_register(filename=filename)
775
776 @unittest.skipIf(sys.platform == "win32",
777 "subprocess doesn't support pass_fds on Windows")
778 def test_register_fd(self):
779 with tempfile.TemporaryFile('wb+') as fp:
780 self.check_register(fd=fp.fileno())
781
782 def test_register_threads(self):
783 self.check_register(all_threads=True)
784
785 def test_register_chain(self):
786 self.check_register(chain=True)
787
788 @contextmanager
789 def check_stderr_none(self):
790 stderr = sys.stderr
791 try:
792 sys.stderr = None
793 with self.assertRaises(RuntimeError) as cm:
794 yield
795 self.assertEqual(str(cm.exception), "sys.stderr is None")
796 finally:
797 sys.stderr = stderr
798
799 def test_stderr_None(self):
800 # Issue #21497: provide a helpful error if sys.stderr is None,
801 # instead of just an attribute error: "None has no attribute fileno".
802 with self.check_stderr_none():
803 faulthandler.enable()
804 with self.check_stderr_none():
805 faulthandler.dump_traceback()
806 with self.check_stderr_none():
807 faulthandler.dump_traceback_later(1e-3)
808 if hasattr(faulthandler, "register"):
809 with self.check_stderr_none():
810 faulthandler.register(signal.SIGUSR1)
811
812 @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
813 def test_raise_exception(self):
814 for exc, name in (
815 ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
816 ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
817 ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
818 ):
819 self.check_windows_exception(f"""
820 import faulthandler
821 faulthandler.enable()
822 faulthandler._raise_exception(faulthandler._{exc})
823 """,
824 3,
825 name)
826
827 @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
828 def test_ignore_exception(self):
829 for exc_code in (
830 0xE06D7363, # MSC exception ("Emsc")
831 0xE0434352, # COM Callable Runtime exception ("ECCR")
832 ):
833 code = f"""
834 import faulthandler
835 faulthandler.enable()
836 faulthandler._raise_exception({exc_code})
837 """
838 code = dedent(code)
839 output, exitcode = self.get_output(code)
840 self.assertEqual(output, [])
841 self.assertEqual(exitcode, exc_code)
842
843 @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
844 def test_raise_nonfatal_exception(self):
845 # These exceptions are not strictly errors. Letting
846 # faulthandler display the traceback when they are
847 # raised is likely to result in noise. However, they
848 # may still terminate the process if there is no
849 # handler installed for them (which there typically
850 # is, e.g. for debug messages).
851 for exc in (
852 0x00000000,
853 0x34567890,
854 0x40000000,
855 0x40001000,
856 0x70000000,
857 0x7FFFFFFF,
858 ):
859 output, exitcode = self.get_output(f"""
860 import faulthandler
861 faulthandler.enable()
862 faulthandler._raise_exception(0x{exc:x})
863 """
864 )
865 self.assertEqual(output, [])
866 # On Windows older than 7 SP1, the actual exception code has
867 # bit 29 cleared.
868 self.assertIn(exitcode,
869 (exc, exc & ~0x10000000))
870
871 @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
872 def test_disable_windows_exc_handler(self):
873 code = dedent("""
874 import faulthandler
875 faulthandler.enable()
876 faulthandler.disable()
877 code = faulthandler._EXCEPTION_ACCESS_VIOLATION
878 faulthandler._raise_exception(code)
879 """)
880 output, exitcode = self.get_output(code)
881 self.assertEqual(output, [])
882 self.assertEqual(exitcode, 0xC0000005)
883
884 def test_cancel_later_without_dump_traceback_later(self):
885 # bpo-37933: Calling cancel_dump_traceback_later()
886 # without dump_traceback_later() must not segfault.
887 code = dedent("""
888 import faulthandler
889 faulthandler.cancel_dump_traceback_later()
890 """)
891 output, exitcode = self.get_output(code)
892 self.assertEqual(output, [])
893 self.assertEqual(exitcode, 0)
894
895
896 if __name__ == "__main__":
897 unittest.main()