(root)/
Python-3.11.7/
Lib/
test/
test_faulthandler.py
       1  from contextlib import contextmanager
       2  import datetime
       3  import faulthandler
       4  import os
       5  import re
       6  import signal
       7  import subprocess
       8  import sys
       9  from test import support
      10  from test.support import os_helper, script_helper, is_android, MS_WINDOWS
      11  import tempfile
      12  import unittest
      13  from textwrap import dedent
      14  
# _testcapi is optional: fall back to None when it cannot be imported so
# that the tests needing it can be skipped instead of failing at import.
try:
    import _testcapi
except ImportError:
    _testcapi = None

# The tests in this module run code in child processes; skip the whole
# module when subprocesses are not supported.
if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")

# Timeout, in seconds, used by the dump_traceback_later() checks.
TIMEOUT = 0.5
      24  
      25  
      26  def expected_traceback(lineno1, lineno2, header, min_count=1):
      27      regex = header
      28      regex += '  File "<string>", line %s in func\n' % lineno1
      29      regex += '  File "<string>", line %s in <module>' % lineno2
      30      if 1 < min_count:
      31          return '^' + (regex + '\n') * (min_count - 1) + regex
      32      else:
      33          return '^' + regex + '$'
      34  
      35  def skip_segfault_on_android(test):
      36      # gh-76319: Raising SIGSEGV on Android may not cause a crash.
      37      return unittest.skipIf(is_android,
      38                             'raising SIGSEGV on Android is unreliable')(test)
      39  
      40  @contextmanager
      41  def temporary_filename():
      42      filename = tempfile.mktemp()
      43      try:
      44          yield filename
      45      finally:
      46          os_helper.unlink(filename)
      47  
      48  class ESC[4;38;5;81mFaultHandlerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      49  
      50      def get_output(self, code, filename=None, fd=None):
      51          """
      52          Run the specified code in Python (in a new child process) and read the
      53          output from the standard error or from a file (if filename is set).
      54          Return the output lines as a list.
      55  
      56          Strip the reference count from the standard error for Python debug
      57          build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
      58          thread XXX".
      59          """
      60          code = dedent(code).strip()
      61          pass_fds = []
      62          if fd is not None:
      63              pass_fds.append(fd)
      64          env = dict(os.environ)
      65  
      66          # Sanitizers must not handle SIGSEGV (ex: for test_enable_fd())
      67          option = 'handle_segv=0'
      68          support.set_sanitizer_env_var(env, option)
      69  
      70          with support.SuppressCrashReport():
      71              process = script_helper.spawn_python('-c', code,
      72                                                   pass_fds=pass_fds,
      73                                                   env=env)
      74              with process:
      75                  output, stderr = process.communicate()
      76                  exitcode = process.wait()
      77          output = output.decode('ascii', 'backslashreplace')
      78          if filename:
      79              self.assertEqual(output, '')
      80              with open(filename, "rb") as fp:
      81                  output = fp.read()
      82              output = output.decode('ascii', 'backslashreplace')
      83          elif fd is not None:
      84              self.assertEqual(output, '')
      85              os.lseek(fd, os.SEEK_SET, 0)
      86              with open(fd, "rb", closefd=False) as fp:
      87                  output = fp.read()
      88              output = output.decode('ascii', 'backslashreplace')
      89          return output.splitlines(), exitcode
      90  
      91      def check_error(self, code, lineno, fatal_error, *,
      92                      filename=None, all_threads=True, other_regex=None,
      93                      fd=None, know_current_thread=True,
      94                      py_fatal_error=False,
      95                      garbage_collecting=False,
      96                      function='<module>'):
      97          """
      98          Check that the fault handler for fatal errors is enabled and check the
      99          traceback from the child process output.
     100  
     101          Raise an error if the output doesn't match the expected format.
     102          """
     103          if all_threads:
     104              if know_current_thread:
     105                  header = 'Current thread 0x[0-9a-f]+'
     106              else:
     107                  header = 'Thread 0x[0-9a-f]+'
     108          else:
     109              header = 'Stack'
     110          regex = [f'^{fatal_error}']
     111          if py_fatal_error:
     112              regex.append("Python runtime state: initialized")
     113          regex.append('')
     114          regex.append(fr'{header} \(most recent call first\):')
     115          if garbage_collecting:
     116              regex.append('  Garbage-collecting')
     117          regex.append(fr'  File "<string>", line {lineno} in {function}')
     118          regex = '\n'.join(regex)
     119  
     120          if other_regex:
     121              regex = f'(?:{regex}|{other_regex})'
     122  
     123          # Enable MULTILINE flag
     124          regex = f'(?m){regex}'
     125          output, exitcode = self.get_output(code, filename=filename, fd=fd)
     126          output = '\n'.join(output)
     127          self.assertRegex(output, regex)
     128          self.assertNotEqual(exitcode, 0)
     129  
     130      def check_fatal_error(self, code, line_number, name_regex, func=None, **kw):
     131          if func:
     132              name_regex = '%s: %s' % (func, name_regex)
     133          fatal_error = 'Fatal Python error: %s' % name_regex
     134          self.check_error(code, line_number, fatal_error, **kw)
     135  
     136      def check_windows_exception(self, code, line_number, name_regex, **kw):
     137          fatal_error = 'Windows fatal exception: %s' % name_regex
     138          self.check_error(code, line_number, fatal_error, **kw)
     139  
     140      @unittest.skipIf(sys.platform.startswith('aix'),
     141                       "the first page of memory is a mapped read-only on AIX")
     142      def test_read_null(self):
     143          if not MS_WINDOWS:
     144              self.check_fatal_error("""
     145                  import faulthandler
     146                  faulthandler.enable()
     147                  faulthandler._read_null()
     148                  """,
     149                  3,
     150                  # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
     151                  '(?:Segmentation fault'
     152                      '|Bus error'
     153                      '|Illegal instruction)')
     154          else:
     155              self.check_windows_exception("""
     156                  import faulthandler
     157                  faulthandler.enable()
     158                  faulthandler._read_null()
     159                  """,
     160                  3,
     161                  'access violation')
     162  
     163      @skip_segfault_on_android
     164      def test_sigsegv(self):
     165          self.check_fatal_error("""
     166              import faulthandler
     167              faulthandler.enable()
     168              faulthandler._sigsegv()
     169              """,
     170              3,
     171              'Segmentation fault')
     172  
     173      @skip_segfault_on_android
     174      def test_gc(self):
     175          # bpo-44466: Detect if the GC is running
     176          self.check_fatal_error("""
     177              import faulthandler
     178              import gc
     179              import sys
     180  
     181              faulthandler.enable()
     182  
     183              class RefCycle:
     184                  def __del__(self):
     185                      faulthandler._sigsegv()
     186  
     187              # create a reference cycle which triggers a fatal
     188              # error in a destructor
     189              a = RefCycle()
     190              b = RefCycle()
     191              a.b = b
     192              b.a = a
     193  
     194              # Delete the objects, not the cycle
     195              a = None
     196              b = None
     197  
     198              # Break the reference cycle: call __del__()
     199              gc.collect()
     200  
     201              # Should not reach this line
     202              print("exit", file=sys.stderr)
     203              """,
     204              9,
     205              'Segmentation fault',
     206              function='__del__',
     207              garbage_collecting=True)
     208  
     209      def test_fatal_error_c_thread(self):
     210          self.check_fatal_error("""
     211              import faulthandler
     212              faulthandler.enable()
     213              faulthandler._fatal_error_c_thread()
     214              """,
     215              3,
     216              'in new thread',
     217              know_current_thread=False,
     218              func='faulthandler_fatal_error_thread',
     219              py_fatal_error=True)
     220  
     221      def test_sigabrt(self):
     222          self.check_fatal_error("""
     223              import faulthandler
     224              faulthandler.enable()
     225              faulthandler._sigabrt()
     226              """,
     227              3,
     228              'Aborted')
     229  
     230      @unittest.skipIf(sys.platform == 'win32',
     231                       "SIGFPE cannot be caught on Windows")
     232      def test_sigfpe(self):
     233          self.check_fatal_error("""
     234              import faulthandler
     235              faulthandler.enable()
     236              faulthandler._sigfpe()
     237              """,
     238              3,
     239              'Floating point exception')
     240  
     241      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     242      @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
     243      @skip_segfault_on_android
     244      def test_sigbus(self):
     245          self.check_fatal_error("""
     246              import faulthandler
     247              import signal
     248  
     249              faulthandler.enable()
     250              signal.raise_signal(signal.SIGBUS)
     251              """,
     252              5,
     253              'Bus error')
     254  
     255      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     256      @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
     257      @skip_segfault_on_android
     258      def test_sigill(self):
     259          self.check_fatal_error("""
     260              import faulthandler
     261              import signal
     262  
     263              faulthandler.enable()
     264              signal.raise_signal(signal.SIGILL)
     265              """,
     266              5,
     267              'Illegal instruction')
     268  
     269      def check_fatal_error_func(self, release_gil):
     270          # Test that Py_FatalError() dumps a traceback
     271          with support.SuppressCrashReport():
     272              self.check_fatal_error(f"""
     273                  import _testcapi
     274                  _testcapi.fatal_error(b'xyz', {release_gil})
     275                  """,
     276                  2,
     277                  'xyz',
     278                  func='test_fatal_error',
     279                  py_fatal_error=True)
     280  
     281      def test_fatal_error(self):
     282          self.check_fatal_error_func(False)
     283  
     284      def test_fatal_error_without_gil(self):
     285          self.check_fatal_error_func(True)
     286  
     287      @unittest.skipIf(sys.platform.startswith('openbsd'),
     288                       "Issue #12868: sigaltstack() doesn't work on "
     289                       "OpenBSD if Python is compiled with pthread")
     290      @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
     291                       'need faulthandler._stack_overflow()')
     292      def test_stack_overflow(self):
     293          self.check_fatal_error("""
     294              import faulthandler
     295              faulthandler.enable()
     296              faulthandler._stack_overflow()
     297              """,
     298              3,
     299              '(?:Segmentation fault|Bus error)',
     300              other_regex='unable to raise a stack overflow')
     301  
     302      @skip_segfault_on_android
     303      def test_gil_released(self):
     304          self.check_fatal_error("""
     305              import faulthandler
     306              faulthandler.enable()
     307              faulthandler._sigsegv(True)
     308              """,
     309              3,
     310              'Segmentation fault')
     311  
     312      @skip_segfault_on_android
     313      def test_enable_file(self):
     314          with temporary_filename() as filename:
     315              self.check_fatal_error("""
     316                  import faulthandler
     317                  output = open({filename}, 'wb')
     318                  faulthandler.enable(output)
     319                  faulthandler._sigsegv()
     320                  """.format(filename=repr(filename)),
     321                  4,
     322                  'Segmentation fault',
     323                  filename=filename)
     324  
     325      @unittest.skipIf(sys.platform == "win32",
     326                       "subprocess doesn't support pass_fds on Windows")
     327      @skip_segfault_on_android
     328      def test_enable_fd(self):
     329          with tempfile.TemporaryFile('wb+') as fp:
     330              fd = fp.fileno()
     331              self.check_fatal_error("""
     332                  import faulthandler
     333                  import sys
     334                  faulthandler.enable(%s)
     335                  faulthandler._sigsegv()
     336                  """ % fd,
     337                  4,
     338                  'Segmentation fault',
     339                  fd=fd)
     340  
     341      @skip_segfault_on_android
     342      def test_enable_single_thread(self):
     343          self.check_fatal_error("""
     344              import faulthandler
     345              faulthandler.enable(all_threads=False)
     346              faulthandler._sigsegv()
     347              """,
     348              3,
     349              'Segmentation fault',
     350              all_threads=False)
     351  
     352      @skip_segfault_on_android
     353      def test_disable(self):
     354          code = """
     355              import faulthandler
     356              faulthandler.enable()
     357              faulthandler.disable()
     358              faulthandler._sigsegv()
     359              """
     360          not_expected = 'Fatal Python error'
     361          stderr, exitcode = self.get_output(code)
     362          stderr = '\n'.join(stderr)
     363          self.assertTrue(not_expected not in stderr,
     364                       "%r is present in %r" % (not_expected, stderr))
     365          self.assertNotEqual(exitcode, 0)
     366  
     367      @skip_segfault_on_android
     368      def test_dump_ext_modules(self):
     369          code = """
     370              import faulthandler
     371              import sys
     372              # Don't filter stdlib module names
     373              sys.stdlib_module_names = frozenset()
     374              faulthandler.enable()
     375              faulthandler._sigsegv()
     376              """
     377          stderr, exitcode = self.get_output(code)
     378          stderr = '\n'.join(stderr)
     379          match = re.search(r'^Extension modules:(.*) \(total: [0-9]+\)$',
     380                            stderr, re.MULTILINE)
     381          if not match:
     382              self.fail(f"Cannot find 'Extension modules:' in {stderr!r}")
     383          modules = set(match.group(1).strip().split(', '))
     384          for name in ('sys', 'faulthandler'):
     385              self.assertIn(name, modules)
     386  
     387      def test_is_enabled(self):
     388          orig_stderr = sys.stderr
     389          try:
     390              # regrtest may replace sys.stderr by io.StringIO object, but
     391              # faulthandler.enable() requires that sys.stderr has a fileno()
     392              # method
     393              sys.stderr = sys.__stderr__
     394  
     395              was_enabled = faulthandler.is_enabled()
     396              try:
     397                  faulthandler.enable()
     398                  self.assertTrue(faulthandler.is_enabled())
     399                  faulthandler.disable()
     400                  self.assertFalse(faulthandler.is_enabled())
     401              finally:
     402                  if was_enabled:
     403                      faulthandler.enable()
     404                  else:
     405                      faulthandler.disable()
     406          finally:
     407              sys.stderr = orig_stderr
     408  
     409      @support.requires_subprocess()
     410      def test_disabled_by_default(self):
     411          # By default, the module should be disabled
     412          code = "import faulthandler; print(faulthandler.is_enabled())"
     413          args = (sys.executable, "-E", "-c", code)
     414          # don't use assert_python_ok() because it always enables faulthandler
     415          output = subprocess.check_output(args)
     416          self.assertEqual(output.rstrip(), b"False")
     417  
     418      @support.requires_subprocess()
     419      def test_sys_xoptions(self):
     420          # Test python -X faulthandler
     421          code = "import faulthandler; print(faulthandler.is_enabled())"
     422          args = filter(None, (sys.executable,
     423                               "-E" if sys.flags.ignore_environment else "",
     424                               "-X", "faulthandler", "-c", code))
     425          env = os.environ.copy()
     426          env.pop("PYTHONFAULTHANDLER", None)
     427          # don't use assert_python_ok() because it always enables faulthandler
     428          output = subprocess.check_output(args, env=env)
     429          self.assertEqual(output.rstrip(), b"True")
     430  
     431      @support.requires_subprocess()
     432      def test_env_var(self):
     433          # empty env var
     434          code = "import faulthandler; print(faulthandler.is_enabled())"
     435          args = (sys.executable, "-c", code)
     436          env = dict(os.environ)
     437          env['PYTHONFAULTHANDLER'] = ''
     438          env['PYTHONDEVMODE'] = ''
     439          # don't use assert_python_ok() because it always enables faulthandler
     440          output = subprocess.check_output(args, env=env)
     441          self.assertEqual(output.rstrip(), b"False")
     442  
     443          # non-empty env var
     444          env = dict(os.environ)
     445          env['PYTHONFAULTHANDLER'] = '1'
     446          env['PYTHONDEVMODE'] = ''
     447          output = subprocess.check_output(args, env=env)
     448          self.assertEqual(output.rstrip(), b"True")
     449  
     450      def check_dump_traceback(self, *, filename=None, fd=None):
     451          """
     452          Explicitly call dump_traceback() function and check its output.
     453          Raise an error if the output doesn't match the expected format.
     454          """
     455          code = """
     456              import faulthandler
     457  
     458              filename = {filename!r}
     459              fd = {fd}
     460  
     461              def funcB():
     462                  if filename:
     463                      with open(filename, "wb") as fp:
     464                          faulthandler.dump_traceback(fp, all_threads=False)
     465                  elif fd is not None:
     466                      faulthandler.dump_traceback(fd,
     467                                                  all_threads=False)
     468                  else:
     469                      faulthandler.dump_traceback(all_threads=False)
     470  
     471              def funcA():
     472                  funcB()
     473  
     474              funcA()
     475              """
     476          code = code.format(
     477              filename=filename,
     478              fd=fd,
     479          )
     480          if filename:
     481              lineno = 9
     482          elif fd is not None:
     483              lineno = 11
     484          else:
     485              lineno = 14
     486          expected = [
     487              'Stack (most recent call first):',
     488              '  File "<string>", line %s in funcB' % lineno,
     489              '  File "<string>", line 17 in funcA',
     490              '  File "<string>", line 19 in <module>'
     491          ]
     492          trace, exitcode = self.get_output(code, filename, fd)
     493          self.assertEqual(trace, expected)
     494          self.assertEqual(exitcode, 0)
     495  
     496      def test_dump_traceback(self):
     497          self.check_dump_traceback()
     498  
     499      def test_dump_traceback_file(self):
     500          with temporary_filename() as filename:
     501              self.check_dump_traceback(filename=filename)
     502  
     503      @unittest.skipIf(sys.platform == "win32",
     504                       "subprocess doesn't support pass_fds on Windows")
     505      def test_dump_traceback_fd(self):
     506          with tempfile.TemporaryFile('wb+') as fp:
     507              self.check_dump_traceback(fd=fp.fileno())
     508  
     509      def test_truncate(self):
     510          maxlen = 500
     511          func_name = 'x' * (maxlen + 50)
     512          truncated = 'x' * maxlen + '...'
     513          code = """
     514              import faulthandler
     515  
     516              def {func_name}():
     517                  faulthandler.dump_traceback(all_threads=False)
     518  
     519              {func_name}()
     520              """
     521          code = code.format(
     522              func_name=func_name,
     523          )
     524          expected = [
     525              'Stack (most recent call first):',
     526              '  File "<string>", line 4 in %s' % truncated,
     527              '  File "<string>", line 6 in <module>'
     528          ]
     529          trace, exitcode = self.get_output(code)
     530          self.assertEqual(trace, expected)
     531          self.assertEqual(exitcode, 0)
     532  
     533      def check_dump_traceback_threads(self, filename):
     534          """
     535          Call explicitly dump_traceback(all_threads=True) and check the output.
     536          Raise an error if the output doesn't match the expected format.
     537          """
     538          code = """
     539              import faulthandler
     540              from threading import Thread, Event
     541              import time
     542  
     543              def dump():
     544                  if {filename}:
     545                      with open({filename}, "wb") as fp:
     546                          faulthandler.dump_traceback(fp, all_threads=True)
     547                  else:
     548                      faulthandler.dump_traceback(all_threads=True)
     549  
     550              class Waiter(Thread):
     551                  # avoid blocking if the main thread raises an exception.
     552                  daemon = True
     553  
     554                  def __init__(self):
     555                      Thread.__init__(self)
     556                      self.running = Event()
     557                      self.stop = Event()
     558  
     559                  def run(self):
     560                      self.running.set()
     561                      self.stop.wait()
     562  
     563              waiter = Waiter()
     564              waiter.start()
     565              waiter.running.wait()
     566              dump()
     567              waiter.stop.set()
     568              waiter.join()
     569              """
     570          code = code.format(filename=repr(filename))
     571          output, exitcode = self.get_output(code, filename)
     572          output = '\n'.join(output)
     573          if filename:
     574              lineno = 8
     575          else:
     576              lineno = 10
     577          regex = r"""
     578              ^Thread 0x[0-9a-f]+ \(most recent call first\):
     579              (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
     580              ){{1,3}}  File "<string>", line 23 in run
     581                File ".*threading.py", line [0-9]+ in _bootstrap_inner
     582                File ".*threading.py", line [0-9]+ in _bootstrap
     583  
     584              Current thread 0x[0-9a-f]+ \(most recent call first\):
     585                File "<string>", line {lineno} in dump
     586                File "<string>", line 28 in <module>$
     587              """
     588          regex = dedent(regex.format(lineno=lineno)).strip()
     589          self.assertRegex(output, regex)
     590          self.assertEqual(exitcode, 0)
     591  
     592      def test_dump_traceback_threads(self):
     593          self.check_dump_traceback_threads(None)
     594  
     595      def test_dump_traceback_threads_file(self):
     596          with temporary_filename() as filename:
     597              self.check_dump_traceback_threads(filename)
     598  
     599      def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
     600                                     *, filename=None, fd=None):
     601          """
     602          Check how many times the traceback is written in timeout x 2.5 seconds,
     603          or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
     604          on repeat and cancel options.
     605  
     606          Raise an error if the output doesn't match the expect format.
     607          """
     608          timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
     609          code = """
     610              import faulthandler
     611              import time
     612              import sys
     613  
     614              timeout = {timeout}
     615              repeat = {repeat}
     616              cancel = {cancel}
     617              loops = {loops}
     618              filename = {filename!r}
     619              fd = {fd}
     620  
     621              def func(timeout, repeat, cancel, file, loops):
     622                  for loop in range(loops):
     623                      faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
     624                      if cancel:
     625                          faulthandler.cancel_dump_traceback_later()
     626                      time.sleep(timeout * 5)
     627                      faulthandler.cancel_dump_traceback_later()
     628  
     629              if filename:
     630                  file = open(filename, "wb")
     631              elif fd is not None:
     632                  file = sys.stderr.fileno()
     633              else:
     634                  file = None
     635              func(timeout, repeat, cancel, file, loops)
     636              if filename:
     637                  file.close()
     638              """
     639          code = code.format(
     640              timeout=TIMEOUT,
     641              repeat=repeat,
     642              cancel=cancel,
     643              loops=loops,
     644              filename=filename,
     645              fd=fd,
     646          )
     647          trace, exitcode = self.get_output(code, filename)
     648          trace = '\n'.join(trace)
     649  
     650          if not cancel:
     651              count = loops
     652              if repeat:
     653                  count *= 2
     654              header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
     655              regex = expected_traceback(17, 26, header, min_count=count)
     656              self.assertRegex(trace, regex)
     657          else:
     658              self.assertEqual(trace, '')
     659          self.assertEqual(exitcode, 0)
     660  
     661      def test_dump_traceback_later(self):
     662          self.check_dump_traceback_later()
     663  
     664      def test_dump_traceback_later_repeat(self):
     665          self.check_dump_traceback_later(repeat=True)
     666  
     667      def test_dump_traceback_later_cancel(self):
     668          self.check_dump_traceback_later(cancel=True)
     669  
     670      def test_dump_traceback_later_file(self):
     671          with temporary_filename() as filename:
     672              self.check_dump_traceback_later(filename=filename)
     673  
     674      @unittest.skipIf(sys.platform == "win32",
     675                       "subprocess doesn't support pass_fds on Windows")
     676      def test_dump_traceback_later_fd(self):
     677          with tempfile.TemporaryFile('wb+') as fp:
     678              self.check_dump_traceback_later(fd=fp.fileno())
     679  
     680      @support.requires_resource('walltime')
     681      def test_dump_traceback_later_twice(self):
     682          self.check_dump_traceback_later(loops=2)
     683  
     684      @unittest.skipIf(not hasattr(faulthandler, "register"),
     685                       "need faulthandler.register")
     686      def check_register(self, filename=False, all_threads=False,
     687                         unregister=False, chain=False, fd=None):
     688          """
     689          Register a handler displaying the traceback on a user signal. Raise the
     690          signal and check the written traceback.
     691  
     692          If chain is True, check that the previous signal handler is called.
     693  
     694          Raise an error if the output doesn't match the expected format.
     695          """
     696          signum = signal.SIGUSR1
     697          code = """
     698              import faulthandler
     699              import os
     700              import signal
     701              import sys
     702  
     703              all_threads = {all_threads}
     704              signum = {signum:d}
     705              unregister = {unregister}
     706              chain = {chain}
     707              filename = {filename!r}
     708              fd = {fd}
     709  
     710              def func(signum):
     711                  os.kill(os.getpid(), signum)
     712  
     713              def handler(signum, frame):
     714                  handler.called = True
     715              handler.called = False
     716  
     717              if filename:
     718                  file = open(filename, "wb")
     719              elif fd is not None:
     720                  file = sys.stderr.fileno()
     721              else:
     722                  file = None
     723              if chain:
     724                  signal.signal(signum, handler)
     725              faulthandler.register(signum, file=file,
     726                                    all_threads=all_threads, chain={chain})
     727              if unregister:
     728                  faulthandler.unregister(signum)
     729              func(signum)
     730              if chain and not handler.called:
     731                  if file is not None:
     732                      output = file
     733                  else:
     734                      output = sys.stderr
     735                  print("Error: signal handler not called!", file=output)
     736                  exitcode = 1
     737              else:
     738                  exitcode = 0
     739              if filename:
     740                  file.close()
     741              sys.exit(exitcode)
     742              """
     743          code = code.format(
     744              all_threads=all_threads,
     745              signum=signum,
     746              unregister=unregister,
     747              chain=chain,
     748              filename=filename,
     749              fd=fd,
     750          )
     751          trace, exitcode = self.get_output(code, filename)
     752          trace = '\n'.join(trace)
     753          if not unregister:
     754              if all_threads:
     755                  regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
     756              else:
     757                  regex = r'Stack \(most recent call first\):\n'
     758              regex = expected_traceback(14, 32, regex)
     759              self.assertRegex(trace, regex)
     760          else:
     761              self.assertEqual(trace, '')
     762          if unregister:
     763              self.assertNotEqual(exitcode, 0)
     764          else:
     765              self.assertEqual(exitcode, 0)
     766  
    def test_register(self):
        # Baseline scenario: every check_register() option keeps its default
        # (no file, current thread only, no unregister, no chaining).
        self.check_register()
     769  
    def test_unregister(self):
        # The child unregisters the handler before raising the signal;
        # check_register() then expects no traceback and a non-zero exit code.
        self.check_register(unregister=True)
     772  
     773      def test_register_file(self):
     774          with temporary_filename() as filename:
     775              self.check_register(filename=filename)
     776  
     777      @unittest.skipIf(sys.platform == "win32",
     778                       "subprocess doesn't support pass_fds on Windows")
     779      def test_register_fd(self):
     780          with tempfile.TemporaryFile('wb+') as fp:
     781              self.check_register(fd=fp.fileno())
     782  
    def test_register_threads(self):
        # With all_threads=True check_register() expects the
        # "Current thread 0x..." header instead of the plain "Stack" one.
        self.check_register(all_threads=True)
     785  
    def test_register_chain(self):
        # With chain=True the child installs a Python signal handler first and
        # exits with status 1 if that previous handler was not called.
        self.check_register(chain=True)
     788  
     789      @contextmanager
     790      def check_stderr_none(self):
     791          stderr = sys.stderr
     792          try:
     793              sys.stderr = None
     794              with self.assertRaises(RuntimeError) as cm:
     795                  yield
     796              self.assertEqual(str(cm.exception), "sys.stderr is None")
     797          finally:
     798              sys.stderr = stderr
     799  
     800      def test_stderr_None(self):
     801          # Issue #21497: provide a helpful error if sys.stderr is None,
     802          # instead of just an attribute error: "None has no attribute fileno".
     803          with self.check_stderr_none():
     804              faulthandler.enable()
     805          with self.check_stderr_none():
     806              faulthandler.dump_traceback()
     807          with self.check_stderr_none():
     808              faulthandler.dump_traceback_later(1e-3)
     809          if hasattr(faulthandler, "register"):
     810              with self.check_stderr_none():
     811                  faulthandler.register(signal.SIGUSR1)
     812  
     813      @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
     814      def test_raise_exception(self):
     815          for exc, name in (
     816              ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
     817              ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
     818              ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
     819          ):
     820              self.check_windows_exception(f"""
     821                  import faulthandler
     822                  faulthandler.enable()
     823                  faulthandler._raise_exception(faulthandler._{exc})
     824                  """,
     825                  3,
     826                  name)
     827  
     828      @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
     829      def test_ignore_exception(self):
     830          for exc_code in (
     831              0xE06D7363,   # MSC exception ("Emsc")
     832              0xE0434352,   # COM Callable Runtime exception ("ECCR")
     833          ):
     834              code = f"""
     835                      import faulthandler
     836                      faulthandler.enable()
     837                      faulthandler._raise_exception({exc_code})
     838                      """
     839              code = dedent(code)
     840              output, exitcode = self.get_output(code)
     841              self.assertEqual(output, [])
     842              self.assertEqual(exitcode, exc_code)
     843  
     844      @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
     845      def test_raise_nonfatal_exception(self):
     846          # These exceptions are not strictly errors. Letting
     847          # faulthandler display the traceback when they are
     848          # raised is likely to result in noise. However, they
     849          # may still terminate the process if there is no
     850          # handler installed for them (which there typically
     851          # is, e.g. for debug messages).
     852          for exc in (
     853              0x00000000,
     854              0x34567890,
     855              0x40000000,
     856              0x40001000,
     857              0x70000000,
     858              0x7FFFFFFF,
     859          ):
     860              output, exitcode = self.get_output(f"""
     861                  import faulthandler
     862                  faulthandler.enable()
     863                  faulthandler._raise_exception(0x{exc:x})
     864                  """
     865              )
     866              self.assertEqual(output, [])
     867              # On Windows older than 7 SP1, the actual exception code has
     868              # bit 29 cleared.
     869              self.assertIn(exitcode,
     870                            (exc, exc & ~0x10000000))
     871  
     872      @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
     873      def test_disable_windows_exc_handler(self):
     874          code = dedent("""
     875              import faulthandler
     876              faulthandler.enable()
     877              faulthandler.disable()
     878              code = faulthandler._EXCEPTION_ACCESS_VIOLATION
     879              faulthandler._raise_exception(code)
     880          """)
     881          output, exitcode = self.get_output(code)
     882          self.assertEqual(output, [])
     883          self.assertEqual(exitcode, 0xC0000005)
     884  
     885      def test_cancel_later_without_dump_traceback_later(self):
     886          # bpo-37933: Calling cancel_dump_traceback_later()
     887          # without dump_traceback_later() must not segfault.
     888          code = dedent("""
     889              import faulthandler
     890              faulthandler.cancel_dump_traceback_later()
     891          """)
     892          output, exitcode = self.get_output(code)
     893          self.assertEqual(output, [])
     894          self.assertEqual(exitcode, 0)
     895  
     896  
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()