(root)/
Python-3.12.0/
Lib/
test/
test_faulthandler.py
       1  from contextlib import contextmanager
       2  import datetime
       3  import faulthandler
       4  import os
       5  import re
       6  import signal
       7  import subprocess
       8  import sys
       9  from test import support
      10  from test.support import os_helper
      11  from test.support import script_helper, is_android
      12  from test.support import skip_if_sanitizer
      13  import tempfile
      14  import unittest
      15  from textwrap import dedent
      16  
      17  try:
      18      import _testcapi
      19  except ImportError:
      20      _testcapi = None
      21  
      22  if not support.has_subprocess_support:
      23      raise unittest.SkipTest("test module requires subprocess")
      24  
      25  TIMEOUT = 0.5
      26  MS_WINDOWS = (os.name == 'nt')
      27  
      28  
      29  def expected_traceback(lineno1, lineno2, header, min_count=1):
      30      regex = header
      31      regex += '  File "<string>", line %s in func\n' % lineno1
      32      regex += '  File "<string>", line %s in <module>' % lineno2
      33      if 1 < min_count:
      34          return '^' + (regex + '\n') * (min_count - 1) + regex
      35      else:
      36          return '^' + regex + '$'
      37  
      38  def skip_segfault_on_android(test):
      39      # Issue #32138: Raising SIGSEGV on Android may not cause a crash.
      40      return unittest.skipIf(is_android,
      41                             'raising SIGSEGV on Android is unreliable')(test)
      42  
      43  @contextmanager
      44  def temporary_filename():
      45      filename = tempfile.mktemp()
      46      try:
      47          yield filename
      48      finally:
      49          os_helper.unlink(filename)
      50  
      51  class ESC[4;38;5;81mFaultHandlerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      52  
      53      def get_output(self, code, filename=None, fd=None):
      54          """
      55          Run the specified code in Python (in a new child process) and read the
      56          output from the standard error or from a file (if filename is set).
      57          Return the output lines as a list.
      58  
      59          Strip the reference count from the standard error for Python debug
      60          build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
      61          thread XXX".
      62          """
      63          code = dedent(code).strip()
      64          pass_fds = []
      65          if fd is not None:
      66              pass_fds.append(fd)
      67          with support.SuppressCrashReport():
      68              process = script_helper.spawn_python('-c', code, pass_fds=pass_fds)
      69              with process:
      70                  output, stderr = process.communicate()
      71                  exitcode = process.wait()
      72          output = output.decode('ascii', 'backslashreplace')
      73          if filename:
      74              self.assertEqual(output, '')
      75              with open(filename, "rb") as fp:
      76                  output = fp.read()
      77              output = output.decode('ascii', 'backslashreplace')
      78          elif fd is not None:
      79              self.assertEqual(output, '')
      80              os.lseek(fd, os.SEEK_SET, 0)
      81              with open(fd, "rb", closefd=False) as fp:
      82                  output = fp.read()
      83              output = output.decode('ascii', 'backslashreplace')
      84          return output.splitlines(), exitcode
      85  
      86      def check_error(self, code, lineno, fatal_error, *,
      87                      filename=None, all_threads=True, other_regex=None,
      88                      fd=None, know_current_thread=True,
      89                      py_fatal_error=False,
      90                      garbage_collecting=False,
      91                      function='<module>'):
      92          """
      93          Check that the fault handler for fatal errors is enabled and check the
      94          traceback from the child process output.
      95  
      96          Raise an error if the output doesn't match the expected format.
      97          """
      98          if all_threads:
      99              if know_current_thread:
     100                  header = 'Current thread 0x[0-9a-f]+'
     101              else:
     102                  header = 'Thread 0x[0-9a-f]+'
     103          else:
     104              header = 'Stack'
     105          regex = [f'^{fatal_error}']
     106          if py_fatal_error:
     107              regex.append("Python runtime state: initialized")
     108          regex.append('')
     109          regex.append(fr'{header} \(most recent call first\):')
     110          if garbage_collecting:
     111              regex.append('  Garbage-collecting')
     112          regex.append(fr'  File "<string>", line {lineno} in {function}')
     113          regex = '\n'.join(regex)
     114  
     115          if other_regex:
     116              regex = f'(?:{regex}|{other_regex})'
     117  
     118          # Enable MULTILINE flag
     119          regex = f'(?m){regex}'
     120          output, exitcode = self.get_output(code, filename=filename, fd=fd)
     121          output = '\n'.join(output)
     122          self.assertRegex(output, regex)
     123          self.assertNotEqual(exitcode, 0)
     124  
     125      def check_fatal_error(self, code, line_number, name_regex, func=None, **kw):
     126          if func:
     127              name_regex = '%s: %s' % (func, name_regex)
     128          fatal_error = 'Fatal Python error: %s' % name_regex
     129          self.check_error(code, line_number, fatal_error, **kw)
     130  
     131      def check_windows_exception(self, code, line_number, name_regex, **kw):
     132          fatal_error = 'Windows fatal exception: %s' % name_regex
     133          self.check_error(code, line_number, fatal_error, **kw)
     134  
     135      @unittest.skipIf(sys.platform.startswith('aix'),
     136                       "the first page of memory is a mapped read-only on AIX")
     137      def test_read_null(self):
     138          if not MS_WINDOWS:
     139              self.check_fatal_error("""
     140                  import faulthandler
     141                  faulthandler.enable()
     142                  faulthandler._read_null()
     143                  """,
     144                  3,
     145                  # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
     146                  '(?:Segmentation fault'
     147                      '|Bus error'
     148                      '|Illegal instruction)')
     149          else:
     150              self.check_windows_exception("""
     151                  import faulthandler
     152                  faulthandler.enable()
     153                  faulthandler._read_null()
     154                  """,
     155                  3,
     156                  'access violation')
     157  
     158      @skip_segfault_on_android
     159      def test_sigsegv(self):
     160          self.check_fatal_error("""
     161              import faulthandler
     162              faulthandler.enable()
     163              faulthandler._sigsegv()
     164              """,
     165              3,
     166              'Segmentation fault')
     167  
     168      @skip_segfault_on_android
     169      def test_gc(self):
     170          # bpo-44466: Detect if the GC is running
     171          self.check_fatal_error("""
     172              import faulthandler
     173              import gc
     174              import sys
     175  
     176              faulthandler.enable()
     177  
     178              class RefCycle:
     179                  def __del__(self):
     180                      faulthandler._sigsegv()
     181  
     182              # create a reference cycle which triggers a fatal
     183              # error in a destructor
     184              a = RefCycle()
     185              b = RefCycle()
     186              a.b = b
     187              b.a = a
     188  
     189              # Delete the objects, not the cycle
     190              a = None
     191              b = None
     192  
     193              # Break the reference cycle: call __del__()
     194              gc.collect()
     195  
     196              # Should not reach this line
     197              print("exit", file=sys.stderr)
     198              """,
     199              9,
     200              'Segmentation fault',
     201              function='__del__',
     202              garbage_collecting=True)
     203  
     204      def test_fatal_error_c_thread(self):
     205          self.check_fatal_error("""
     206              import faulthandler
     207              faulthandler.enable()
     208              faulthandler._fatal_error_c_thread()
     209              """,
     210              3,
     211              'in new thread',
     212              know_current_thread=False,
     213              func='faulthandler_fatal_error_thread',
     214              py_fatal_error=True)
     215  
     216      def test_sigabrt(self):
     217          self.check_fatal_error("""
     218              import faulthandler
     219              faulthandler.enable()
     220              faulthandler._sigabrt()
     221              """,
     222              3,
     223              'Aborted')
     224  
     225      @unittest.skipIf(sys.platform == 'win32',
     226                       "SIGFPE cannot be caught on Windows")
     227      def test_sigfpe(self):
     228          self.check_fatal_error("""
     229              import faulthandler
     230              faulthandler.enable()
     231              faulthandler._sigfpe()
     232              """,
     233              3,
     234              'Floating point exception')
     235  
     236      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     237      @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
     238      @skip_segfault_on_android
     239      def test_sigbus(self):
     240          self.check_fatal_error("""
     241              import faulthandler
     242              import signal
     243  
     244              faulthandler.enable()
     245              signal.raise_signal(signal.SIGBUS)
     246              """,
     247              5,
     248              'Bus error')
     249  
     250      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     251      @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
     252      @skip_segfault_on_android
     253      def test_sigill(self):
     254          self.check_fatal_error("""
     255              import faulthandler
     256              import signal
     257  
     258              faulthandler.enable()
     259              signal.raise_signal(signal.SIGILL)
     260              """,
     261              5,
     262              'Illegal instruction')
     263  
     264      def check_fatal_error_func(self, release_gil):
     265          # Test that Py_FatalError() dumps a traceback
     266          with support.SuppressCrashReport():
     267              self.check_fatal_error(f"""
     268                  import _testcapi
     269                  _testcapi.fatal_error(b'xyz', {release_gil})
     270                  """,
     271                  2,
     272                  'xyz',
     273                  func='_testcapi_fatal_error_impl',
     274                  py_fatal_error=True)
     275  
     276      def test_fatal_error(self):
     277          self.check_fatal_error_func(False)
     278  
     279      def test_fatal_error_without_gil(self):
     280          self.check_fatal_error_func(True)
     281  
     282      @unittest.skipIf(sys.platform.startswith('openbsd'),
     283                       "Issue #12868: sigaltstack() doesn't work on "
     284                       "OpenBSD if Python is compiled with pthread")
     285      @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
     286                       'need faulthandler._stack_overflow()')
     287      def test_stack_overflow(self):
     288          self.check_fatal_error("""
     289              import faulthandler
     290              faulthandler.enable()
     291              faulthandler._stack_overflow()
     292              """,
     293              3,
     294              '(?:Segmentation fault|Bus error)',
     295              other_regex='unable to raise a stack overflow')
     296  
     297      @skip_segfault_on_android
     298      def test_gil_released(self):
     299          self.check_fatal_error("""
     300              import faulthandler
     301              faulthandler.enable()
     302              faulthandler._sigsegv(True)
     303              """,
     304              3,
     305              'Segmentation fault')
     306  
     307      @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer "
     308                         "builds change crashing process output.")
     309      @skip_segfault_on_android
     310      def test_enable_file(self):
     311          with temporary_filename() as filename:
     312              self.check_fatal_error("""
     313                  import faulthandler
     314                  output = open({filename}, 'wb')
     315                  faulthandler.enable(output)
     316                  faulthandler._sigsegv()
     317                  """.format(filename=repr(filename)),
     318                  4,
     319                  'Segmentation fault',
     320                  filename=filename)
     321  
     322      @unittest.skipIf(sys.platform == "win32",
     323                       "subprocess doesn't support pass_fds on Windows")
     324      @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer "
     325                         "builds change crashing process output.")
     326      @skip_segfault_on_android
     327      def test_enable_fd(self):
     328          with tempfile.TemporaryFile('wb+') as fp:
     329              fd = fp.fileno()
     330              self.check_fatal_error("""
     331                  import faulthandler
     332                  import sys
     333                  faulthandler.enable(%s)
     334                  faulthandler._sigsegv()
     335                  """ % fd,
     336                  4,
     337                  'Segmentation fault',
     338                  fd=fd)
     339  
     340      @skip_segfault_on_android
     341      def test_enable_single_thread(self):
     342          self.check_fatal_error("""
     343              import faulthandler
     344              faulthandler.enable(all_threads=False)
     345              faulthandler._sigsegv()
     346              """,
     347              3,
     348              'Segmentation fault',
     349              all_threads=False)
     350  
     351      @skip_segfault_on_android
     352      def test_disable(self):
     353          code = """
     354              import faulthandler
     355              faulthandler.enable()
     356              faulthandler.disable()
     357              faulthandler._sigsegv()
     358              """
     359          not_expected = 'Fatal Python error'
     360          stderr, exitcode = self.get_output(code)
     361          stderr = '\n'.join(stderr)
     362          self.assertTrue(not_expected not in stderr,
     363                       "%r is present in %r" % (not_expected, stderr))
     364          self.assertNotEqual(exitcode, 0)
     365  
     366      @skip_segfault_on_android
     367      def test_dump_ext_modules(self):
     368          code = """
     369              import faulthandler
     370              import sys
     371              # Don't filter stdlib module names
     372              sys.stdlib_module_names = frozenset()
     373              faulthandler.enable()
     374              faulthandler._sigsegv()
     375              """
     376          stderr, exitcode = self.get_output(code)
     377          stderr = '\n'.join(stderr)
     378          match = re.search(r'^Extension modules:(.*) \(total: [0-9]+\)$',
     379                            stderr, re.MULTILINE)
     380          if not match:
     381              self.fail(f"Cannot find 'Extension modules:' in {stderr!r}")
     382          modules = set(match.group(1).strip().split(', '))
     383          for name in ('sys', 'faulthandler'):
     384              self.assertIn(name, modules)
     385  
     386      def test_is_enabled(self):
     387          orig_stderr = sys.stderr
     388          try:
     389              # regrtest may replace sys.stderr by io.StringIO object, but
     390              # faulthandler.enable() requires that sys.stderr has a fileno()
     391              # method
     392              sys.stderr = sys.__stderr__
     393  
     394              was_enabled = faulthandler.is_enabled()
     395              try:
     396                  faulthandler.enable()
     397                  self.assertTrue(faulthandler.is_enabled())
     398                  faulthandler.disable()
     399                  self.assertFalse(faulthandler.is_enabled())
     400              finally:
     401                  if was_enabled:
     402                      faulthandler.enable()
     403                  else:
     404                      faulthandler.disable()
     405          finally:
     406              sys.stderr = orig_stderr
     407  
     408      @support.requires_subprocess()
     409      def test_disabled_by_default(self):
     410          # By default, the module should be disabled
     411          code = "import faulthandler; print(faulthandler.is_enabled())"
     412          args = (sys.executable, "-E", "-c", code)
     413          # don't use assert_python_ok() because it always enables faulthandler
     414          output = subprocess.check_output(args)
     415          self.assertEqual(output.rstrip(), b"False")
     416  
     417      @support.requires_subprocess()
     418      def test_sys_xoptions(self):
     419          # Test python -X faulthandler
     420          code = "import faulthandler; print(faulthandler.is_enabled())"
     421          args = filter(None, (sys.executable,
     422                               "-E" if sys.flags.ignore_environment else "",
     423                               "-X", "faulthandler", "-c", code))
     424          env = os.environ.copy()
     425          env.pop("PYTHONFAULTHANDLER", None)
     426          # don't use assert_python_ok() because it always enables faulthandler
     427          output = subprocess.check_output(args, env=env)
     428          self.assertEqual(output.rstrip(), b"True")
     429  
     430      @support.requires_subprocess()
     431      def test_env_var(self):
     432          # empty env var
     433          code = "import faulthandler; print(faulthandler.is_enabled())"
     434          args = (sys.executable, "-c", code)
     435          env = dict(os.environ)
     436          env['PYTHONFAULTHANDLER'] = ''
     437          env['PYTHONDEVMODE'] = ''
     438          # don't use assert_python_ok() because it always enables faulthandler
     439          output = subprocess.check_output(args, env=env)
     440          self.assertEqual(output.rstrip(), b"False")
     441  
     442          # non-empty env var
     443          env = dict(os.environ)
     444          env['PYTHONFAULTHANDLER'] = '1'
     445          env['PYTHONDEVMODE'] = ''
     446          output = subprocess.check_output(args, env=env)
     447          self.assertEqual(output.rstrip(), b"True")
     448  
     449      def check_dump_traceback(self, *, filename=None, fd=None):
     450          """
     451          Explicitly call dump_traceback() function and check its output.
     452          Raise an error if the output doesn't match the expected format.
     453          """
     454          code = """
     455              import faulthandler
     456  
     457              filename = {filename!r}
     458              fd = {fd}
     459  
     460              def funcB():
     461                  if filename:
     462                      with open(filename, "wb") as fp:
     463                          faulthandler.dump_traceback(fp, all_threads=False)
     464                  elif fd is not None:
     465                      faulthandler.dump_traceback(fd,
     466                                                  all_threads=False)
     467                  else:
     468                      faulthandler.dump_traceback(all_threads=False)
     469  
     470              def funcA():
     471                  funcB()
     472  
     473              funcA()
     474              """
     475          code = code.format(
     476              filename=filename,
     477              fd=fd,
     478          )
     479          if filename:
     480              lineno = 9
     481          elif fd is not None:
     482              lineno = 11
     483          else:
     484              lineno = 14
     485          expected = [
     486              'Stack (most recent call first):',
     487              '  File "<string>", line %s in funcB' % lineno,
     488              '  File "<string>", line 17 in funcA',
     489              '  File "<string>", line 19 in <module>'
     490          ]
     491          trace, exitcode = self.get_output(code, filename, fd)
     492          self.assertEqual(trace, expected)
     493          self.assertEqual(exitcode, 0)
     494  
     495      def test_dump_traceback(self):
     496          self.check_dump_traceback()
     497  
     498      def test_dump_traceback_file(self):
     499          with temporary_filename() as filename:
     500              self.check_dump_traceback(filename=filename)
     501  
     502      @unittest.skipIf(sys.platform == "win32",
     503                       "subprocess doesn't support pass_fds on Windows")
     504      def test_dump_traceback_fd(self):
     505          with tempfile.TemporaryFile('wb+') as fp:
     506              self.check_dump_traceback(fd=fp.fileno())
     507  
     508      def test_truncate(self):
     509          maxlen = 500
     510          func_name = 'x' * (maxlen + 50)
     511          truncated = 'x' * maxlen + '...'
     512          code = """
     513              import faulthandler
     514  
     515              def {func_name}():
     516                  faulthandler.dump_traceback(all_threads=False)
     517  
     518              {func_name}()
     519              """
     520          code = code.format(
     521              func_name=func_name,
     522          )
     523          expected = [
     524              'Stack (most recent call first):',
     525              '  File "<string>", line 4 in %s' % truncated,
     526              '  File "<string>", line 6 in <module>'
     527          ]
     528          trace, exitcode = self.get_output(code)
     529          self.assertEqual(trace, expected)
     530          self.assertEqual(exitcode, 0)
     531  
     532      def check_dump_traceback_threads(self, filename):
     533          """
     534          Call explicitly dump_traceback(all_threads=True) and check the output.
     535          Raise an error if the output doesn't match the expected format.
     536          """
     537          code = """
     538              import faulthandler
     539              from threading import Thread, Event
     540              import time
     541  
     542              def dump():
     543                  if {filename}:
     544                      with open({filename}, "wb") as fp:
     545                          faulthandler.dump_traceback(fp, all_threads=True)
     546                  else:
     547                      faulthandler.dump_traceback(all_threads=True)
     548  
     549              class Waiter(Thread):
     550                  # avoid blocking if the main thread raises an exception.
     551                  daemon = True
     552  
     553                  def __init__(self):
     554                      Thread.__init__(self)
     555                      self.running = Event()
     556                      self.stop = Event()
     557  
     558                  def run(self):
     559                      self.running.set()
     560                      self.stop.wait()
     561  
     562              waiter = Waiter()
     563              waiter.start()
     564              waiter.running.wait()
     565              dump()
     566              waiter.stop.set()
     567              waiter.join()
     568              """
     569          code = code.format(filename=repr(filename))
     570          output, exitcode = self.get_output(code, filename)
     571          output = '\n'.join(output)
     572          if filename:
     573              lineno = 8
     574          else:
     575              lineno = 10
     576          regex = r"""
     577              ^Thread 0x[0-9a-f]+ \(most recent call first\):
     578              (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
     579              ){{1,3}}  File "<string>", line 23 in run
     580                File ".*threading.py", line [0-9]+ in _bootstrap_inner
     581                File ".*threading.py", line [0-9]+ in _bootstrap
     582  
     583              Current thread 0x[0-9a-f]+ \(most recent call first\):
     584                File "<string>", line {lineno} in dump
     585                File "<string>", line 28 in <module>$
     586              """
     587          regex = dedent(regex.format(lineno=lineno)).strip()
     588          self.assertRegex(output, regex)
     589          self.assertEqual(exitcode, 0)
     590  
     591      def test_dump_traceback_threads(self):
     592          self.check_dump_traceback_threads(None)
     593  
     594      def test_dump_traceback_threads_file(self):
     595          with temporary_filename() as filename:
     596              self.check_dump_traceback_threads(filename)
     597  
     598      def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
     599                                     *, filename=None, fd=None):
     600          """
     601          Check how many times the traceback is written in timeout x 2.5 seconds,
     602          or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
     603          on repeat and cancel options.
     604  
     605          Raise an error if the output doesn't match the expect format.
     606          """
     607          timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
     608          code = """
     609              import faulthandler
     610              import time
     611              import sys
     612  
     613              timeout = {timeout}
     614              repeat = {repeat}
     615              cancel = {cancel}
     616              loops = {loops}
     617              filename = {filename!r}
     618              fd = {fd}
     619  
     620              def func(timeout, repeat, cancel, file, loops):
     621                  for loop in range(loops):
     622                      faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
     623                      if cancel:
     624                          faulthandler.cancel_dump_traceback_later()
     625                      time.sleep(timeout * 5)
     626                      faulthandler.cancel_dump_traceback_later()
     627  
     628              if filename:
     629                  file = open(filename, "wb")
     630              elif fd is not None:
     631                  file = sys.stderr.fileno()
     632              else:
     633                  file = None
     634              func(timeout, repeat, cancel, file, loops)
     635              if filename:
     636                  file.close()
     637              """
     638          code = code.format(
     639              timeout=TIMEOUT,
     640              repeat=repeat,
     641              cancel=cancel,
     642              loops=loops,
     643              filename=filename,
     644              fd=fd,
     645          )
     646          trace, exitcode = self.get_output(code, filename)
     647          trace = '\n'.join(trace)
     648  
     649          if not cancel:
     650              count = loops
     651              if repeat:
     652                  count *= 2
     653              header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
     654              regex = expected_traceback(17, 26, header, min_count=count)
     655              self.assertRegex(trace, regex)
     656          else:
     657              self.assertEqual(trace, '')
     658          self.assertEqual(exitcode, 0)
     659  
     660      def test_dump_traceback_later(self):
     661          self.check_dump_traceback_later()
     662  
     663      def test_dump_traceback_later_repeat(self):
     664          self.check_dump_traceback_later(repeat=True)
     665  
     666      def test_dump_traceback_later_cancel(self):
     667          self.check_dump_traceback_later(cancel=True)
     668  
     669      def test_dump_traceback_later_file(self):
     670          with temporary_filename() as filename:
     671              self.check_dump_traceback_later(filename=filename)
     672  
     673      @unittest.skipIf(sys.platform == "win32",
     674                       "subprocess doesn't support pass_fds on Windows")
     675      def test_dump_traceback_later_fd(self):
     676          with tempfile.TemporaryFile('wb+') as fp:
     677              self.check_dump_traceback_later(fd=fp.fileno())
     678  
     679      @support.requires_resource('walltime')
     680      def test_dump_traceback_later_twice(self):
     681          self.check_dump_traceback_later(loops=2)
     682  
     683      @unittest.skipIf(not hasattr(faulthandler, "register"),
     684                       "need faulthandler.register")
     685      def check_register(self, filename=False, all_threads=False,
     686                         unregister=False, chain=False, fd=None):
     687          """
     688          Register a handler displaying the traceback on a user signal. Raise the
     689          signal and check the written traceback.
     690  
     691          If chain is True, check that the previous signal handler is called.
     692  
     693          Raise an error if the output doesn't match the expected format.
     694          """
     695          signum = signal.SIGUSR1
     696          code = """
     697              import faulthandler
     698              import os
     699              import signal
     700              import sys
     701  
     702              all_threads = {all_threads}
     703              signum = {signum:d}
     704              unregister = {unregister}
     705              chain = {chain}
     706              filename = {filename!r}
     707              fd = {fd}
     708  
     709              def func(signum):
     710                  os.kill(os.getpid(), signum)
     711  
     712              def handler(signum, frame):
     713                  handler.called = True
     714              handler.called = False
     715  
     716              if filename:
     717                  file = open(filename, "wb")
     718              elif fd is not None:
     719                  file = sys.stderr.fileno()
     720              else:
     721                  file = None
     722              if chain:
     723                  signal.signal(signum, handler)
     724              faulthandler.register(signum, file=file,
     725                                    all_threads=all_threads, chain={chain})
     726              if unregister:
     727                  faulthandler.unregister(signum)
     728              func(signum)
     729              if chain and not handler.called:
     730                  if file is not None:
     731                      output = file
     732                  else:
     733                      output = sys.stderr
     734                  print("Error: signal handler not called!", file=output)
     735                  exitcode = 1
     736              else:
     737                  exitcode = 0
     738              if filename:
     739                  file.close()
     740              sys.exit(exitcode)
     741              """
     742          code = code.format(
     743              all_threads=all_threads,
     744              signum=signum,
     745              unregister=unregister,
     746              chain=chain,
     747              filename=filename,
     748              fd=fd,
     749          )
     750          trace, exitcode = self.get_output(code, filename)
     751          trace = '\n'.join(trace)
     752          if not unregister:
     753              if all_threads:
     754                  regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
     755              else:
     756                  regex = r'Stack \(most recent call first\):\n'
     757              regex = expected_traceback(14, 32, regex)
     758              self.assertRegex(trace, regex)
     759          else:
     760              self.assertEqual(trace, '')
     761          if unregister:
     762              self.assertNotEqual(exitcode, 0)
     763          else:
     764              self.assertEqual(exitcode, 0)
     765  
     766      def test_register(self):
     767          self.check_register()
     768  
     769      def test_unregister(self):
     770          self.check_register(unregister=True)
     771  
     772      def test_register_file(self):
     773          with temporary_filename() as filename:
     774              self.check_register(filename=filename)
     775  
     776      @unittest.skipIf(sys.platform == "win32",
     777                       "subprocess doesn't support pass_fds on Windows")
     778      def test_register_fd(self):
     779          with tempfile.TemporaryFile('wb+') as fp:
     780              self.check_register(fd=fp.fileno())
     781  
     782      def test_register_threads(self):
     783          self.check_register(all_threads=True)
     784  
     785      def test_register_chain(self):
     786          self.check_register(chain=True)
     787  
     788      @contextmanager
     789      def check_stderr_none(self):
     790          stderr = sys.stderr
     791          try:
     792              sys.stderr = None
     793              with self.assertRaises(RuntimeError) as cm:
     794                  yield
     795              self.assertEqual(str(cm.exception), "sys.stderr is None")
     796          finally:
     797              sys.stderr = stderr
     798  
     799      def test_stderr_None(self):
     800          # Issue #21497: provide a helpful error if sys.stderr is None,
     801          # instead of just an attribute error: "None has no attribute fileno".
     802          with self.check_stderr_none():
     803              faulthandler.enable()
     804          with self.check_stderr_none():
     805              faulthandler.dump_traceback()
     806          with self.check_stderr_none():
     807              faulthandler.dump_traceback_later(1e-3)
     808          if hasattr(faulthandler, "register"):
     809              with self.check_stderr_none():
     810                  faulthandler.register(signal.SIGUSR1)
     811  
     812      @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
     813      def test_raise_exception(self):
     814          for exc, name in (
     815              ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
     816              ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
     817              ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
     818          ):
     819              self.check_windows_exception(f"""
     820                  import faulthandler
     821                  faulthandler.enable()
     822                  faulthandler._raise_exception(faulthandler._{exc})
     823                  """,
     824                  3,
     825                  name)
     826  
     827      @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
     828      def test_ignore_exception(self):
     829          for exc_code in (
     830              0xE06D7363,   # MSC exception ("Emsc")
     831              0xE0434352,   # COM Callable Runtime exception ("ECCR")
     832          ):
     833              code = f"""
     834                      import faulthandler
     835                      faulthandler.enable()
     836                      faulthandler._raise_exception({exc_code})
     837                      """
     838              code = dedent(code)
     839              output, exitcode = self.get_output(code)
     840              self.assertEqual(output, [])
     841              self.assertEqual(exitcode, exc_code)
     842  
     843      @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
     844      def test_raise_nonfatal_exception(self):
     845          # These exceptions are not strictly errors. Letting
     846          # faulthandler display the traceback when they are
     847          # raised is likely to result in noise. However, they
     848          # may still terminate the process if there is no
     849          # handler installed for them (which there typically
     850          # is, e.g. for debug messages).
     851          for exc in (
     852              0x00000000,
     853              0x34567890,
     854              0x40000000,
     855              0x40001000,
     856              0x70000000,
     857              0x7FFFFFFF,
     858          ):
     859              output, exitcode = self.get_output(f"""
     860                  import faulthandler
     861                  faulthandler.enable()
     862                  faulthandler._raise_exception(0x{exc:x})
     863                  """
     864              )
     865              self.assertEqual(output, [])
     866              # On Windows older than 7 SP1, the actual exception code has
     867              # bit 29 cleared.
     868              self.assertIn(exitcode,
     869                            (exc, exc & ~0x10000000))
     870  
     871      @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
     872      def test_disable_windows_exc_handler(self):
     873          code = dedent("""
     874              import faulthandler
     875              faulthandler.enable()
     876              faulthandler.disable()
     877              code = faulthandler._EXCEPTION_ACCESS_VIOLATION
     878              faulthandler._raise_exception(code)
     879          """)
     880          output, exitcode = self.get_output(code)
     881          self.assertEqual(output, [])
     882          self.assertEqual(exitcode, 0xC0000005)
     883  
     884      def test_cancel_later_without_dump_traceback_later(self):
     885          # bpo-37933: Calling cancel_dump_traceback_later()
     886          # without dump_traceback_later() must not segfault.
     887          code = dedent("""
     888              import faulthandler
     889              faulthandler.cancel_dump_traceback_later()
     890          """)
     891          output, exitcode = self.get_output(code)
     892          self.assertEqual(output, [])
     893          self.assertEqual(exitcode, 0)
     894  
     895  
     896  if __name__ == "__main__":
     897      unittest.main()