(root)/
Python-3.11.7/
Lib/
test/
test_signal.py
       1  import enum
       2  import errno
       3  import inspect
       4  import os
       5  import random
       6  import signal
       7  import socket
       8  import statistics
       9  import subprocess
      10  import sys
      11  import threading
      12  import time
      13  import unittest
      14  from test import support
      15  from test.support import os_helper
      16  from test.support.script_helper import assert_python_ok, spawn_python
      17  from test.support import threading_helper
      18  try:
      19      import _testcapi
      20  except ImportError:
      21      _testcapi = None
      22  
      23  
      24  class ESC[4;38;5;81mGenericTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      25  
      26      def test_enums(self):
      27          for name in dir(signal):
      28              sig = getattr(signal, name)
      29              if name in {'SIG_DFL', 'SIG_IGN'}:
      30                  self.assertIsInstance(sig, signal.Handlers)
      31              elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
      32                  self.assertIsInstance(sig, signal.Sigmasks)
      33              elif name.startswith('SIG') and not name.startswith('SIG_'):
      34                  self.assertIsInstance(sig, signal.Signals)
      35              elif name.startswith('CTRL_'):
      36                  self.assertIsInstance(sig, signal.Signals)
      37                  self.assertEqual(sys.platform, "win32")
      38  
      39          CheckedSignals = enum._old_convert_(
      40                  enum.IntEnum, 'Signals', 'signal',
      41                  lambda name:
      42                      name.isupper()
      43                      and (name.startswith('SIG') and not name.startswith('SIG_'))
      44                      or name.startswith('CTRL_'),
      45                  source=signal,
      46                  )
      47          enum._test_simple_enum(CheckedSignals, signal.Signals)
      48  
      49          CheckedHandlers = enum._old_convert_(
      50                  enum.IntEnum, 'Handlers', 'signal',
      51                  lambda name: name in ('SIG_DFL', 'SIG_IGN'),
      52                  source=signal,
      53                  )
      54          enum._test_simple_enum(CheckedHandlers, signal.Handlers)
      55  
      56          Sigmasks = getattr(signal, 'Sigmasks', None)
      57          if Sigmasks is not None:
      58              CheckedSigmasks = enum._old_convert_(
      59                      enum.IntEnum, 'Sigmasks', 'signal',
      60                      lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'),
      61                      source=signal,
      62                      )
      63              enum._test_simple_enum(CheckedSigmasks, Sigmasks)
      64  
      65      def test_functions_module_attr(self):
      66          # Issue #27718: If __all__ is not defined all non-builtin functions
      67          # should have correct __module__ to be displayed by pydoc.
      68          for name in dir(signal):
      69              value = getattr(signal, name)
      70              if inspect.isroutine(value) and not inspect.isbuiltin(value):
      71                  self.assertEqual(value.__module__, 'signal')
      72  
      73  
      74  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
      75  class ESC[4;38;5;81mPosixTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      76      def trivial_signal_handler(self, *args):
      77          pass
      78  
      79      def test_out_of_range_signal_number_raises_error(self):
      80          self.assertRaises(ValueError, signal.getsignal, 4242)
      81  
      82          self.assertRaises(ValueError, signal.signal, 4242,
      83                            self.trivial_signal_handler)
      84  
      85          self.assertRaises(ValueError, signal.strsignal, 4242)
      86  
      87      def test_setting_signal_handler_to_none_raises_error(self):
      88          self.assertRaises(TypeError, signal.signal,
      89                            signal.SIGUSR1, None)
      90  
      91      def test_getsignal(self):
      92          hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
      93          self.assertIsInstance(hup, signal.Handlers)
      94          self.assertEqual(signal.getsignal(signal.SIGHUP),
      95                           self.trivial_signal_handler)
      96          signal.signal(signal.SIGHUP, hup)
      97          self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
      98  
      99      def test_strsignal(self):
     100          self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
     101          self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
     102          self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))
     103  
     104      # Issue 3864, unknown if this affects earlier versions of freebsd also
     105      def test_interprocess_signal(self):
     106          dirname = os.path.dirname(__file__)
     107          script = os.path.join(dirname, 'signalinterproctester.py')
     108          assert_python_ok(script)
     109  
     110      @unittest.skipUnless(
     111          hasattr(signal, "valid_signals"),
     112          "requires signal.valid_signals"
     113      )
     114      def test_valid_signals(self):
     115          s = signal.valid_signals()
     116          self.assertIsInstance(s, set)
     117          self.assertIn(signal.Signals.SIGINT, s)
     118          self.assertIn(signal.Signals.SIGALRM, s)
     119          self.assertNotIn(0, s)
     120          self.assertNotIn(signal.NSIG, s)
     121          self.assertLess(len(s), signal.NSIG)
     122  
     123          # gh-91145: Make sure that all SIGxxx constants exposed by the Python
     124          # signal module have a number in the [0; signal.NSIG-1] range.
     125          for name in dir(signal):
     126              if not name.startswith("SIG"):
     127                  continue
     128              if name in {"SIG_IGN", "SIG_DFL"}:
     129                  # SIG_IGN and SIG_DFL are pointers
     130                  continue
     131              with self.subTest(name=name):
     132                  signum = getattr(signal, name)
     133                  self.assertGreaterEqual(signum, 0)
     134                  self.assertLess(signum, signal.NSIG)
     135  
     136      @unittest.skipUnless(sys.executable, "sys.executable required.")
     137      @support.requires_subprocess()
     138      def test_keyboard_interrupt_exit_code(self):
     139          """KeyboardInterrupt triggers exit via SIGINT."""
     140          process = subprocess.run(
     141                  [sys.executable, "-c",
     142                   "import os, signal, time\n"
     143                   "os.kill(os.getpid(), signal.SIGINT)\n"
     144                   "for _ in range(999): time.sleep(0.01)"],
     145                  stderr=subprocess.PIPE)
     146          self.assertIn(b"KeyboardInterrupt", process.stderr)
     147          self.assertEqual(process.returncode, -signal.SIGINT)
     148          # Caveat: The exit code is insufficient to guarantee we actually died
     149          # via a signal.  POSIX shells do more than look at the 8 bit value.
     150          # Writing an automation friendly test of an interactive shell
     151          # to confirm that our process died via a SIGINT proved too complex.
     152  
     153  
     154  @unittest.skipUnless(sys.platform == "win32", "Windows specific")
     155  class ESC[4;38;5;81mWindowsSignalTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     156  
     157      def test_valid_signals(self):
     158          s = signal.valid_signals()
     159          self.assertIsInstance(s, set)
     160          self.assertGreaterEqual(len(s), 6)
     161          self.assertIn(signal.Signals.SIGINT, s)
     162          self.assertNotIn(0, s)
     163          self.assertNotIn(signal.NSIG, s)
     164          self.assertLess(len(s), signal.NSIG)
     165  
     166      def test_issue9324(self):
     167          # Updated for issue #10003, adding SIGBREAK
     168          handler = lambda x, y: None
     169          checked = set()
     170          for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
     171                      signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
     172                      signal.SIGTERM):
     173              # Set and then reset a handler for signals that work on windows.
     174              # Issue #18396, only for signals without a C-level handler.
     175              if signal.getsignal(sig) is not None:
     176                  signal.signal(sig, signal.signal(sig, handler))
     177                  checked.add(sig)
     178          # Issue #18396: Ensure the above loop at least tested *something*
     179          self.assertTrue(checked)
     180  
     181          with self.assertRaises(ValueError):
     182              signal.signal(-1, handler)
     183  
     184          with self.assertRaises(ValueError):
     185              signal.signal(7, handler)
     186  
     187      @unittest.skipUnless(sys.executable, "sys.executable required.")
     188      @support.requires_subprocess()
     189      def test_keyboard_interrupt_exit_code(self):
     190          """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
     191          # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
     192          # as that requires setting up a console control handler in a child
     193          # in its own process group.  Doable, but quite complicated.  (see
     194          # @eryksun on https://github.com/python/cpython/pull/11862)
     195          process = subprocess.run(
     196                  [sys.executable, "-c", "raise KeyboardInterrupt"],
     197                  stderr=subprocess.PIPE)
     198          self.assertIn(b"KeyboardInterrupt", process.stderr)
     199          STATUS_CONTROL_C_EXIT = 0xC000013A
     200          self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)
     201  
     202  
     203  class ESC[4;38;5;81mWakeupFDTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     204  
     205      def test_invalid_call(self):
     206          # First parameter is positional-only
     207          with self.assertRaises(TypeError):
     208              signal.set_wakeup_fd(signum=signal.SIGINT)
     209  
     210          # warn_on_full_buffer is a keyword-only parameter
     211          with self.assertRaises(TypeError):
     212              signal.set_wakeup_fd(signal.SIGINT, False)
     213  
     214      def test_invalid_fd(self):
     215          fd = os_helper.make_bad_fd()
     216          self.assertRaises((ValueError, OSError),
     217                            signal.set_wakeup_fd, fd)
     218  
     219      @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
     220      def test_invalid_socket(self):
     221          sock = socket.socket()
     222          fd = sock.fileno()
     223          sock.close()
     224          self.assertRaises((ValueError, OSError),
     225                            signal.set_wakeup_fd, fd)
     226  
     227      # Emscripten does not support fstat on pipes yet.
     228      # https://github.com/emscripten-core/emscripten/issues/16414
     229      @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
     230      @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
     231      def test_set_wakeup_fd_result(self):
     232          r1, w1 = os.pipe()
     233          self.addCleanup(os.close, r1)
     234          self.addCleanup(os.close, w1)
     235          r2, w2 = os.pipe()
     236          self.addCleanup(os.close, r2)
     237          self.addCleanup(os.close, w2)
     238  
     239          if hasattr(os, 'set_blocking'):
     240              os.set_blocking(w1, False)
     241              os.set_blocking(w2, False)
     242  
     243          signal.set_wakeup_fd(w1)
     244          self.assertEqual(signal.set_wakeup_fd(w2), w1)
     245          self.assertEqual(signal.set_wakeup_fd(-1), w2)
     246          self.assertEqual(signal.set_wakeup_fd(-1), -1)
     247  
     248      @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
     249      @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
     250      def test_set_wakeup_fd_socket_result(self):
     251          sock1 = socket.socket()
     252          self.addCleanup(sock1.close)
     253          sock1.setblocking(False)
     254          fd1 = sock1.fileno()
     255  
     256          sock2 = socket.socket()
     257          self.addCleanup(sock2.close)
     258          sock2.setblocking(False)
     259          fd2 = sock2.fileno()
     260  
     261          signal.set_wakeup_fd(fd1)
     262          self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
     263          self.assertEqual(signal.set_wakeup_fd(-1), fd2)
     264          self.assertEqual(signal.set_wakeup_fd(-1), -1)
     265  
     266      # On Windows, files are always blocking and Windows does not provide a
     267      # function to test if a socket is in non-blocking mode.
     268      @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
     269      @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
     270      @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
     271      def test_set_wakeup_fd_blocking(self):
     272          rfd, wfd = os.pipe()
     273          self.addCleanup(os.close, rfd)
     274          self.addCleanup(os.close, wfd)
     275  
     276          # fd must be non-blocking
     277          os.set_blocking(wfd, True)
     278          with self.assertRaises(ValueError) as cm:
     279              signal.set_wakeup_fd(wfd)
     280          self.assertEqual(str(cm.exception),
     281                           "the fd %s must be in non-blocking mode" % wfd)
     282  
     283          # non-blocking is ok
     284          os.set_blocking(wfd, False)
     285          signal.set_wakeup_fd(wfd)
     286          signal.set_wakeup_fd(-1)
     287  
     288  
     289  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
     290  class ESC[4;38;5;81mWakeupSignalTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     291      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     292      def check_wakeup(self, test_body, *signals, ordered=True):
     293          # use a subprocess to have only one thread
     294          code = """if 1:
     295          import _testcapi
     296          import os
     297          import signal
     298          import struct
     299  
     300          signals = {!r}
     301  
     302          def handler(signum, frame):
     303              pass
     304  
     305          def check_signum(signals):
     306              data = os.read(read, len(signals)+1)
     307              raised = struct.unpack('%uB' % len(data), data)
     308              if not {!r}:
     309                  raised = set(raised)
     310                  signals = set(signals)
     311              if raised != signals:
     312                  raise Exception("%r != %r" % (raised, signals))
     313  
     314          {}
     315  
     316          signal.signal(signal.SIGALRM, handler)
     317          read, write = os.pipe()
     318          os.set_blocking(write, False)
     319          signal.set_wakeup_fd(write)
     320  
     321          test()
     322          check_signum(signals)
     323  
     324          os.close(read)
     325          os.close(write)
     326          """.format(tuple(map(int, signals)), ordered, test_body)
     327  
     328          assert_python_ok('-c', code)
     329  
     330      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     331      @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
     332      def test_wakeup_write_error(self):
     333          # Issue #16105: write() errors in the C signal handler should not
     334          # pass silently.
     335          # Use a subprocess to have only one thread.
     336          code = """if 1:
     337          import _testcapi
     338          import errno
     339          import os
     340          import signal
     341          import sys
     342          from test.support import captured_stderr
     343  
     344          def handler(signum, frame):
     345              1/0
     346  
     347          signal.signal(signal.SIGALRM, handler)
     348          r, w = os.pipe()
     349          os.set_blocking(r, False)
     350  
     351          # Set wakeup_fd a read-only file descriptor to trigger the error
     352          signal.set_wakeup_fd(r)
     353          try:
     354              with captured_stderr() as err:
     355                  signal.raise_signal(signal.SIGALRM)
     356          except ZeroDivisionError:
     357              # An ignored exception should have been printed out on stderr
     358              err = err.getvalue()
     359              if ('Exception ignored when trying to write to the signal wakeup fd'
     360                  not in err):
     361                  raise AssertionError(err)
     362              if ('OSError: [Errno %d]' % errno.EBADF) not in err:
     363                  raise AssertionError(err)
     364          else:
     365              raise AssertionError("ZeroDivisionError not raised")
     366  
     367          os.close(r)
     368          os.close(w)
     369          """
     370          r, w = os.pipe()
     371          try:
     372              os.write(r, b'x')
     373          except OSError:
     374              pass
     375          else:
     376              self.skipTest("OS doesn't report write() error on the read end of a pipe")
     377          finally:
     378              os.close(r)
     379              os.close(w)
     380  
     381          assert_python_ok('-c', code)
     382  
     383      def test_wakeup_fd_early(self):
     384          self.check_wakeup("""def test():
     385              import select
     386              import time
     387  
     388              TIMEOUT_FULL = 10
     389              TIMEOUT_HALF = 5
     390  
     391              class InterruptSelect(Exception):
     392                  pass
     393  
     394              def handler(signum, frame):
     395                  raise InterruptSelect
     396              signal.signal(signal.SIGALRM, handler)
     397  
     398              signal.alarm(1)
     399  
     400              # We attempt to get a signal during the sleep,
     401              # before select is called
     402              try:
     403                  select.select([], [], [], TIMEOUT_FULL)
     404              except InterruptSelect:
     405                  pass
     406              else:
     407                  raise Exception("select() was not interrupted")
     408  
     409              before_time = time.monotonic()
     410              select.select([read], [], [], TIMEOUT_FULL)
     411              after_time = time.monotonic()
     412              dt = after_time - before_time
     413              if dt >= TIMEOUT_HALF:
     414                  raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
     415          """, signal.SIGALRM)
     416  
     417      def test_wakeup_fd_during(self):
     418          self.check_wakeup("""def test():
     419              import select
     420              import time
     421  
     422              TIMEOUT_FULL = 10
     423              TIMEOUT_HALF = 5
     424  
     425              class InterruptSelect(Exception):
     426                  pass
     427  
     428              def handler(signum, frame):
     429                  raise InterruptSelect
     430              signal.signal(signal.SIGALRM, handler)
     431  
     432              signal.alarm(1)
     433              before_time = time.monotonic()
     434              # We attempt to get a signal during the select call
     435              try:
     436                  select.select([read], [], [], TIMEOUT_FULL)
     437              except InterruptSelect:
     438                  pass
     439              else:
     440                  raise Exception("select() was not interrupted")
     441              after_time = time.monotonic()
     442              dt = after_time - before_time
     443              if dt >= TIMEOUT_HALF:
     444                  raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
     445          """, signal.SIGALRM)
     446  
     447      def test_signum(self):
     448          self.check_wakeup("""def test():
     449              signal.signal(signal.SIGUSR1, handler)
     450              signal.raise_signal(signal.SIGUSR1)
     451              signal.raise_signal(signal.SIGALRM)
     452          """, signal.SIGUSR1, signal.SIGALRM)
     453  
     454      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
     455                           'need signal.pthread_sigmask()')
     456      def test_pending(self):
     457          self.check_wakeup("""def test():
     458              signum1 = signal.SIGUSR1
     459              signum2 = signal.SIGUSR2
     460  
     461              signal.signal(signum1, handler)
     462              signal.signal(signum2, handler)
     463  
     464              signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
     465              signal.raise_signal(signum1)
     466              signal.raise_signal(signum2)
     467              # Unblocking the 2 signals calls the C signal handler twice
     468              signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
     469          """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False)
     470  
     471  
     472  @unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
     473  class ESC[4;38;5;81mWakeupSocketSignalTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     474  
     475      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     476      def test_socket(self):
     477          # use a subprocess to have only one thread
     478          code = """if 1:
     479          import signal
     480          import socket
     481          import struct
     482          import _testcapi
     483  
     484          signum = signal.SIGINT
     485          signals = (signum,)
     486  
     487          def handler(signum, frame):
     488              pass
     489  
     490          signal.signal(signum, handler)
     491  
     492          read, write = socket.socketpair()
     493          write.setblocking(False)
     494          signal.set_wakeup_fd(write.fileno())
     495  
     496          signal.raise_signal(signum)
     497  
     498          data = read.recv(1)
     499          if not data:
     500              raise Exception("no signum written")
     501          raised = struct.unpack('B', data)
     502          if raised != signals:
     503              raise Exception("%r != %r" % (raised, signals))
     504  
     505          read.close()
     506          write.close()
     507          """
     508  
     509          assert_python_ok('-c', code)
     510  
     511      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     512      def test_send_error(self):
     513          # Use a subprocess to have only one thread.
     514          if os.name == 'nt':
     515              action = 'send'
     516          else:
     517              action = 'write'
     518          code = """if 1:
     519          import errno
     520          import signal
     521          import socket
     522          import sys
     523          import time
     524          import _testcapi
     525          from test.support import captured_stderr
     526  
     527          signum = signal.SIGINT
     528  
     529          def handler(signum, frame):
     530              pass
     531  
     532          signal.signal(signum, handler)
     533  
     534          read, write = socket.socketpair()
     535          read.setblocking(False)
     536          write.setblocking(False)
     537  
     538          signal.set_wakeup_fd(write.fileno())
     539  
     540          # Close sockets: send() will fail
     541          read.close()
     542          write.close()
     543  
     544          with captured_stderr() as err:
     545              signal.raise_signal(signum)
     546  
     547          err = err.getvalue()
     548          if ('Exception ignored when trying to {action} to the signal wakeup fd'
     549              not in err):
     550              raise AssertionError(err)
     551          """.format(action=action)
     552          assert_python_ok('-c', code)
     553  
     554      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     555      def test_warn_on_full_buffer(self):
     556          # Use a subprocess to have only one thread.
     557          if os.name == 'nt':
     558              action = 'send'
     559          else:
     560              action = 'write'
     561          code = """if 1:
     562          import errno
     563          import signal
     564          import socket
     565          import sys
     566          import time
     567          import _testcapi
     568          from test.support import captured_stderr
     569  
     570          signum = signal.SIGINT
     571  
     572          # This handler will be called, but we intentionally won't read from
     573          # the wakeup fd.
     574          def handler(signum, frame):
     575              pass
     576  
     577          signal.signal(signum, handler)
     578  
     579          read, write = socket.socketpair()
     580  
     581          # Fill the socketpair buffer
     582          if sys.platform == 'win32':
     583              # bpo-34130: On Windows, sometimes non-blocking send fails to fill
     584              # the full socketpair buffer, so use a timeout of 50 ms instead.
     585              write.settimeout(0.050)
     586          else:
     587              write.setblocking(False)
     588  
     589          written = 0
     590          if sys.platform == "vxworks":
     591              CHUNK_SIZES = (1,)
     592          else:
     593              # Start with large chunk size to reduce the
     594              # number of send needed to fill the buffer.
     595              CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
     596          for chunk_size in CHUNK_SIZES:
     597              chunk = b"x" * chunk_size
     598              try:
     599                  while True:
     600                      write.send(chunk)
     601                      written += chunk_size
     602              except (BlockingIOError, TimeoutError):
     603                  pass
     604  
     605          print(f"%s bytes written into the socketpair" % written, flush=True)
     606  
     607          write.setblocking(False)
     608          try:
     609              write.send(b"x")
     610          except BlockingIOError:
     611              # The socketpair buffer seems full
     612              pass
     613          else:
     614              raise AssertionError("%s bytes failed to fill the socketpair "
     615                                   "buffer" % written)
     616  
     617          # By default, we get a warning when a signal arrives
     618          msg = ('Exception ignored when trying to {action} '
     619                 'to the signal wakeup fd')
     620          signal.set_wakeup_fd(write.fileno())
     621  
     622          with captured_stderr() as err:
     623              signal.raise_signal(signum)
     624  
     625          err = err.getvalue()
     626          if msg not in err:
     627              raise AssertionError("first set_wakeup_fd() test failed, "
     628                                   "stderr: %r" % err)
     629  
     630          # And also if warn_on_full_buffer=True
     631          signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)
     632  
     633          with captured_stderr() as err:
     634              signal.raise_signal(signum)
     635  
     636          err = err.getvalue()
     637          if msg not in err:
     638              raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
     639                                   "test failed, stderr: %r" % err)
     640  
     641          # But not if warn_on_full_buffer=False
     642          signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)
     643  
     644          with captured_stderr() as err:
     645              signal.raise_signal(signum)
     646  
     647          err = err.getvalue()
     648          if err != "":
     649              raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
     650                                   "test failed, stderr: %r" % err)
     651  
     652          # And then check the default again, to make sure warn_on_full_buffer
     653          # settings don't leak across calls.
     654          signal.set_wakeup_fd(write.fileno())
     655  
     656          with captured_stderr() as err:
     657              signal.raise_signal(signum)
     658  
     659          err = err.getvalue()
     660          if msg not in err:
     661              raise AssertionError("second set_wakeup_fd() test failed, "
     662                                   "stderr: %r" % err)
     663  
     664          """.format(action=action)
     665          assert_python_ok('-c', code)
     666  
     667  
     668  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
     669  @unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
     670  @support.requires_subprocess()
     671  @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
     672  class ESC[4;38;5;81mSiginterruptTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     673  
     674      def readpipe_interrupted(self, interrupt):
     675          """Perform a read during which a signal will arrive.  Return True if the
     676          read is interrupted by the signal and raises an exception.  Return False
     677          if it returns normally.
     678          """
     679          # use a subprocess to have only one thread, to have a timeout on the
     680          # blocking read and to not touch signal handling in this process
     681          code = """if 1:
     682              import errno
     683              import os
     684              import signal
     685              import sys
     686  
     687              interrupt = %r
     688              r, w = os.pipe()
     689  
     690              def handler(signum, frame):
     691                  1 / 0
     692  
     693              signal.signal(signal.SIGALRM, handler)
     694              if interrupt is not None:
     695                  signal.siginterrupt(signal.SIGALRM, interrupt)
     696  
     697              print("ready")
     698              sys.stdout.flush()
     699  
     700              # run the test twice
     701              try:
     702                  for loop in range(2):
     703                      # send a SIGALRM in a second (during the read)
     704                      signal.alarm(1)
     705                      try:
     706                          # blocking call: read from a pipe without data
     707                          os.read(r, 1)
     708                      except ZeroDivisionError:
     709                          pass
     710                      else:
     711                          sys.exit(2)
     712                  sys.exit(3)
     713              finally:
     714                  os.close(r)
     715                  os.close(w)
     716          """ % (interrupt,)
     717          with spawn_python('-c', code) as process:
     718              try:
     719                  # wait until the child process is loaded and has started
     720                  first_line = process.stdout.readline()
     721  
     722                  stdout, stderr = process.communicate(timeout=support.SHORT_TIMEOUT)
     723              except subprocess.TimeoutExpired:
     724                  process.kill()
     725                  return False
     726              else:
     727                  stdout = first_line + stdout
     728                  exitcode = process.wait()
     729                  if exitcode not in (2, 3):
     730                      raise Exception("Child error (exit code %s): %r"
     731                                      % (exitcode, stdout))
     732                  return (exitcode == 3)
     733  
     734      def test_without_siginterrupt(self):
     735          # If a signal handler is installed and siginterrupt is not called
     736          # at all, when that signal arrives, it interrupts a syscall that's in
     737          # progress.
     738          interrupted = self.readpipe_interrupted(None)
     739          self.assertTrue(interrupted)
     740  
     741      def test_siginterrupt_on(self):
     742          # If a signal handler is installed and siginterrupt is called with
     743          # a true value for the second argument, when that signal arrives, it
     744          # interrupts a syscall that's in progress.
     745          interrupted = self.readpipe_interrupted(True)
     746          self.assertTrue(interrupted)
     747  
     748      @support.requires_resource('walltime')
     749      def test_siginterrupt_off(self):
     750          # If a signal handler is installed and siginterrupt is called with
     751          # a false value for the second argument, when that signal arrives, it
     752          # does not interrupt a syscall that's in progress.
     753          interrupted = self.readpipe_interrupted(False)
     754          self.assertFalse(interrupted)
     755  
     756  
     757  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
     758  @unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
     759                           "needs signal.getitimer() and signal.setitimer()")
     760  class ESC[4;38;5;81mItimerTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     761      def setUp(self):
     762          self.hndl_called = False
     763          self.hndl_count = 0
     764          self.itimer = None
     765          self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
     766  
     767      def tearDown(self):
     768          signal.signal(signal.SIGALRM, self.old_alarm)
     769          if self.itimer is not None: # test_itimer_exc doesn't change this attr
     770              # just ensure that itimer is stopped
     771              signal.setitimer(self.itimer, 0)
     772  
     773      def sig_alrm(self, *args):
     774          self.hndl_called = True
     775  
     776      def sig_vtalrm(self, *args):
     777          self.hndl_called = True
     778  
     779          if self.hndl_count > 3:
     780              # it shouldn't be here, because it should have been disabled.
     781              raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
     782                  "timer.")
     783          elif self.hndl_count == 3:
     784              # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
     785              signal.setitimer(signal.ITIMER_VIRTUAL, 0)
     786  
     787          self.hndl_count += 1
     788  
     789      def sig_prof(self, *args):
     790          self.hndl_called = True
     791          signal.setitimer(signal.ITIMER_PROF, 0)
     792  
     793      def test_itimer_exc(self):
     794          # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
     795          # defines it ?
     796          self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
     797          # Negative times are treated as zero on some platforms.
     798          if 0:
     799              self.assertRaises(signal.ItimerError,
     800                                signal.setitimer, signal.ITIMER_REAL, -1)
     801  
     802      def test_itimer_real(self):
     803          self.itimer = signal.ITIMER_REAL
     804          signal.setitimer(self.itimer, 1.0)
     805          signal.pause()
     806          self.assertEqual(self.hndl_called, True)
     807  
     808      # Issue 3864, unknown if this affects earlier versions of freebsd also
     809      @unittest.skipIf(sys.platform in ('netbsd5',),
     810          'itimer not reliable (does not mix well with threading) on some BSDs.')
     811      def test_itimer_virtual(self):
     812          self.itimer = signal.ITIMER_VIRTUAL
     813          signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
     814          signal.setitimer(self.itimer, 0.3, 0.2)
     815  
     816          for _ in support.busy_retry(60.0, error=False):
     817              # use up some virtual time by doing real work
     818              _ = pow(12345, 67890, 10000019)
     819              if signal.getitimer(self.itimer) == (0.0, 0.0):
     820                  # sig_vtalrm handler stopped this itimer
     821                  break
     822          else:
     823              # bpo-8424
     824              self.skipTest("timeout: likely cause: machine too slow or load too "
     825                            "high")
     826  
     827          # virtual itimer should be (0.0, 0.0) now
     828          self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
     829          # and the handler should have been called
     830          self.assertEqual(self.hndl_called, True)
     831  
     832      def test_itimer_prof(self):
     833          self.itimer = signal.ITIMER_PROF
     834          signal.signal(signal.SIGPROF, self.sig_prof)
     835          signal.setitimer(self.itimer, 0.2, 0.2)
     836  
     837          for _ in support.busy_retry(60.0, error=False):
     838              # do some work
     839              _ = pow(12345, 67890, 10000019)
     840              if signal.getitimer(self.itimer) == (0.0, 0.0):
     841                  # sig_prof handler stopped this itimer
     842                  break
     843          else:
     844              # bpo-8424
     845              self.skipTest("timeout: likely cause: machine too slow or load too "
     846                            "high")
     847  
     848          # profiling itimer should be (0.0, 0.0) now
     849          self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
     850          # and the handler should have been called
     851          self.assertEqual(self.hndl_called, True)
     852  
     853      def test_setitimer_tiny(self):
     854          # bpo-30807: C setitimer() takes a microsecond-resolution interval.
     855          # Check that float -> timeval conversion doesn't round
     856          # the interval down to zero, which would disable the timer.
     857          self.itimer = signal.ITIMER_REAL
     858          signal.setitimer(self.itimer, 1e-6)
     859          time.sleep(1)
     860          self.assertEqual(self.hndl_called, True)
     861  
     862  
     863  class ESC[4;38;5;81mPendingSignalsTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     864      """
     865      Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
     866      functions.
     867      """
     868      @unittest.skipUnless(hasattr(signal, 'sigpending'),
     869                           'need signal.sigpending()')
     870      def test_sigpending_empty(self):
     871          self.assertEqual(signal.sigpending(), set())
     872  
     873      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
     874                           'need signal.pthread_sigmask()')
     875      @unittest.skipUnless(hasattr(signal, 'sigpending'),
     876                           'need signal.sigpending()')
     877      def test_sigpending(self):
     878          code = """if 1:
     879              import os
     880              import signal
     881  
     882              def handler(signum, frame):
     883                  1/0
     884  
     885              signum = signal.SIGUSR1
     886              signal.signal(signum, handler)
     887  
     888              signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
     889              os.kill(os.getpid(), signum)
     890              pending = signal.sigpending()
     891              for sig in pending:
     892                  assert isinstance(sig, signal.Signals), repr(pending)
     893              if pending != {signum}:
     894                  raise Exception('%s != {%s}' % (pending, signum))
     895              try:
     896                  signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
     897              except ZeroDivisionError:
     898                  pass
     899              else:
     900                  raise Exception("ZeroDivisionError not raised")
     901          """
     902          assert_python_ok('-c', code)
     903  
     904      @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
     905                           'need signal.pthread_kill()')
     906      @threading_helper.requires_working_threading()
     907      def test_pthread_kill(self):
     908          code = """if 1:
     909              import signal
     910              import threading
     911              import sys
     912  
     913              signum = signal.SIGUSR1
     914  
     915              def handler(signum, frame):
     916                  1/0
     917  
     918              signal.signal(signum, handler)
     919  
     920              tid = threading.get_ident()
     921              try:
     922                  signal.pthread_kill(tid, signum)
     923              except ZeroDivisionError:
     924                  pass
     925              else:
     926                  raise Exception("ZeroDivisionError not raised")
     927          """
     928          assert_python_ok('-c', code)
     929  
     930      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
     931                           'need signal.pthread_sigmask()')
     932      def wait_helper(self, blocked, test):
     933          """
     934          test: body of the "def test(signum):" function.
     935          blocked: number of the blocked signal
     936          """
     937          code = '''if 1:
     938          import signal
     939          import sys
     940          from signal import Signals
     941  
     942          def handler(signum, frame):
     943              1/0
     944  
     945          %s
     946  
     947          blocked = %s
     948          signum = signal.SIGALRM
     949  
     950          # child: block and wait the signal
     951          try:
     952              signal.signal(signum, handler)
     953              signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
     954  
     955              # Do the tests
     956              test(signum)
     957  
     958              # The handler must not be called on unblock
     959              try:
     960                  signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
     961              except ZeroDivisionError:
     962                  print("the signal handler has been called",
     963                        file=sys.stderr)
     964                  sys.exit(1)
     965          except BaseException as err:
     966              print("error: {}".format(err), file=sys.stderr)
     967              sys.stderr.flush()
     968              sys.exit(1)
     969          ''' % (test.strip(), blocked)
     970  
     971          # sig*wait* must be called with the signal blocked: since the current
     972          # process might have several threads running, use a subprocess to have
     973          # a single thread.
     974          assert_python_ok('-c', code)
     975  
     976      @unittest.skipUnless(hasattr(signal, 'sigwait'),
     977                           'need signal.sigwait()')
     978      def test_sigwait(self):
     979          self.wait_helper(signal.SIGALRM, '''
     980          def test(signum):
     981              signal.alarm(1)
     982              received = signal.sigwait([signum])
     983              assert isinstance(received, signal.Signals), received
     984              if received != signum:
     985                  raise Exception('received %s, not %s' % (received, signum))
     986          ''')
     987  
     988      @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
     989                           'need signal.sigwaitinfo()')
     990      def test_sigwaitinfo(self):
     991          self.wait_helper(signal.SIGALRM, '''
     992          def test(signum):
     993              signal.alarm(1)
     994              info = signal.sigwaitinfo([signum])
     995              if info.si_signo != signum:
     996                  raise Exception("info.si_signo != %s" % signum)
     997          ''')
     998  
     999      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
    1000                           'need signal.sigtimedwait()')
    1001      def test_sigtimedwait(self):
    1002          self.wait_helper(signal.SIGALRM, '''
    1003          def test(signum):
    1004              signal.alarm(1)
    1005              info = signal.sigtimedwait([signum], 10.1000)
    1006              if info.si_signo != signum:
    1007                  raise Exception('info.si_signo != %s' % signum)
    1008          ''')
    1009  
    1010      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
    1011                           'need signal.sigtimedwait()')
    1012      def test_sigtimedwait_poll(self):
    1013          # check that polling with sigtimedwait works
    1014          self.wait_helper(signal.SIGALRM, '''
    1015          def test(signum):
    1016              import os
    1017              os.kill(os.getpid(), signum)
    1018              info = signal.sigtimedwait([signum], 0)
    1019              if info.si_signo != signum:
    1020                  raise Exception('info.si_signo != %s' % signum)
    1021          ''')
    1022  
    1023      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
    1024                           'need signal.sigtimedwait()')
    1025      def test_sigtimedwait_timeout(self):
    1026          self.wait_helper(signal.SIGALRM, '''
    1027          def test(signum):
    1028              received = signal.sigtimedwait([signum], 1.0)
    1029              if received is not None:
    1030                  raise Exception("received=%r" % (received,))
    1031          ''')
    1032  
    1033      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
    1034                           'need signal.sigtimedwait()')
    1035      def test_sigtimedwait_negative_timeout(self):
    1036          signum = signal.SIGALRM
    1037          self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
    1038  
    1039      @unittest.skipUnless(hasattr(signal, 'sigwait'),
    1040                           'need signal.sigwait()')
    1041      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    1042                           'need signal.pthread_sigmask()')
    1043      @threading_helper.requires_working_threading()
    1044      def test_sigwait_thread(self):
    1045          # Check that calling sigwait() from a thread doesn't suspend the whole
    1046          # process. A new interpreter is spawned to avoid problems when mixing
    1047          # threads and fork(): only async-safe functions are allowed between
    1048          # fork() and exec().
    1049          assert_python_ok("-c", """if True:
    1050              import os, threading, sys, time, signal
    1051  
    1052              # the default handler terminates the process
    1053              signum = signal.SIGUSR1
    1054  
    1055              def kill_later():
    1056                  # wait until the main thread is waiting in sigwait()
    1057                  time.sleep(1)
    1058                  os.kill(os.getpid(), signum)
    1059  
    1060              # the signal must be blocked by all the threads
    1061              signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
    1062              killer = threading.Thread(target=kill_later)
    1063              killer.start()
    1064              received = signal.sigwait([signum])
    1065              if received != signum:
    1066                  print("sigwait() received %s, not %s" % (received, signum),
    1067                        file=sys.stderr)
    1068                  sys.exit(1)
    1069              killer.join()
    1070              # unblock the signal, which should have been cleared by sigwait()
    1071              signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
    1072          """)
    1073  
    1074      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    1075                           'need signal.pthread_sigmask()')
    1076      def test_pthread_sigmask_arguments(self):
    1077          self.assertRaises(TypeError, signal.pthread_sigmask)
    1078          self.assertRaises(TypeError, signal.pthread_sigmask, 1)
    1079          self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
    1080          self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
    1081          with self.assertRaises(ValueError):
    1082              signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
    1083          with self.assertRaises(ValueError):
    1084              signal.pthread_sigmask(signal.SIG_BLOCK, [0])
    1085          with self.assertRaises(ValueError):
    1086              signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])
    1087  
    1088      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    1089                           'need signal.pthread_sigmask()')
    1090      def test_pthread_sigmask_valid_signals(self):
    1091          s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
    1092          self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
    1093          # Get current blocked set
    1094          s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
    1095          self.assertLessEqual(s, signal.valid_signals())
    1096  
    1097      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    1098                           'need signal.pthread_sigmask()')
    1099      @threading_helper.requires_working_threading()
    1100      def test_pthread_sigmask(self):
    1101          code = """if 1:
    1102          import signal
    1103          import os; import threading
    1104  
    1105          def handler(signum, frame):
    1106              1/0
    1107  
    1108          def kill(signum):
    1109              os.kill(os.getpid(), signum)
    1110  
    1111          def check_mask(mask):
    1112              for sig in mask:
    1113                  assert isinstance(sig, signal.Signals), repr(sig)
    1114  
    1115          def read_sigmask():
    1116              sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
    1117              check_mask(sigmask)
    1118              return sigmask
    1119  
    1120          signum = signal.SIGUSR1
    1121  
    1122          # Install our signal handler
    1123          old_handler = signal.signal(signum, handler)
    1124  
    1125          # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
    1126          old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
    1127          check_mask(old_mask)
    1128          try:
    1129              kill(signum)
    1130          except ZeroDivisionError:
    1131              pass
    1132          else:
    1133              raise Exception("ZeroDivisionError not raised")
    1134  
    1135          # Block and then raise SIGUSR1. The signal is blocked: the signal
    1136          # handler is not called, and the signal is now pending
    1137          mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
    1138          check_mask(mask)
    1139          kill(signum)
    1140  
    1141          # Check the new mask
    1142          blocked = read_sigmask()
    1143          check_mask(blocked)
    1144          if signum not in blocked:
    1145              raise Exception("%s not in %s" % (signum, blocked))
    1146          if old_mask ^ blocked != {signum}:
    1147              raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
    1148  
    1149          # Unblock SIGUSR1
    1150          try:
    1151              # unblock the pending signal calls immediately the signal handler
    1152              signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
    1153          except ZeroDivisionError:
    1154              pass
    1155          else:
    1156              raise Exception("ZeroDivisionError not raised")
    1157          try:
    1158              kill(signum)
    1159          except ZeroDivisionError:
    1160              pass
    1161          else:
    1162              raise Exception("ZeroDivisionError not raised")
    1163  
    1164          # Check the new mask
    1165          unblocked = read_sigmask()
    1166          if signum in unblocked:
    1167              raise Exception("%s in %s" % (signum, unblocked))
    1168          if blocked ^ unblocked != {signum}:
    1169              raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
    1170          if old_mask != unblocked:
    1171              raise Exception("%s != %s" % (old_mask, unblocked))
    1172          """
    1173          assert_python_ok('-c', code)
    1174  
    1175      @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
    1176                           'need signal.pthread_kill()')
    1177      @threading_helper.requires_working_threading()
    1178      def test_pthread_kill_main_thread(self):
    1179          # Test that a signal can be sent to the main thread with pthread_kill()
    1180          # before any other thread has been created (see issue #12392).
    1181          code = """if True:
    1182              import threading
    1183              import signal
    1184              import sys
    1185  
    1186              def handler(signum, frame):
    1187                  sys.exit(3)
    1188  
    1189              signal.signal(signal.SIGUSR1, handler)
    1190              signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
    1191              sys.exit(2)
    1192          """
    1193  
    1194          with spawn_python('-c', code) as process:
    1195              stdout, stderr = process.communicate()
    1196              exitcode = process.wait()
    1197              if exitcode != 3:
    1198                  raise Exception("Child error (exit code %s): %s" %
    1199                                  (exitcode, stdout))
    1200  
    1201  
    1202  class ESC[4;38;5;81mStressTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1203      """
    1204      Stress signal delivery, especially when a signal arrives in
    1205      the middle of recomputing the signal state or executing
    1206      previously tripped signal handlers.
    1207      """
    1208  
    1209      def setsig(self, signum, handler):
    1210          old_handler = signal.signal(signum, handler)
    1211          self.addCleanup(signal.signal, signum, old_handler)
    1212  
    1213      def measure_itimer_resolution(self):
    1214          N = 20
    1215          times = []
    1216  
    1217          def handler(signum=None, frame=None):
    1218              if len(times) < N:
    1219                  times.append(time.perf_counter())
    1220                  # 1 µs is the smallest possible timer interval,
    1221                  # we want to measure what the concrete duration
    1222                  # will be on this platform
    1223                  signal.setitimer(signal.ITIMER_REAL, 1e-6)
    1224  
    1225          self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
    1226          self.setsig(signal.SIGALRM, handler)
    1227          handler()
    1228          while len(times) < N:
    1229              time.sleep(1e-3)
    1230  
    1231          durations = [times[i+1] - times[i] for i in range(len(times) - 1)]
    1232          med = statistics.median(durations)
    1233          if support.verbose:
    1234              print("detected median itimer() resolution: %.6f s." % (med,))
    1235          return med
    1236  
    1237      def decide_itimer_count(self):
    1238          # Some systems have poor setitimer() resolution (for example
    1239          # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
    1240          # number of sequential timers based on that.
    1241          reso = self.measure_itimer_resolution()
    1242          if reso <= 1e-4:
    1243              return 10000
    1244          elif reso <= 1e-2:
    1245              return 100
    1246          else:
    1247              self.skipTest("detected itimer resolution (%.3f s.) too high "
    1248                            "(> 10 ms.) on this platform (or system too busy)"
    1249                            % (reso,))
    1250  
    1251      @unittest.skipUnless(hasattr(signal, "setitimer"),
    1252                           "test needs setitimer()")
    1253      def test_stress_delivery_dependent(self):
    1254          """
    1255          This test uses dependent signal handlers.
    1256          """
    1257          N = self.decide_itimer_count()
    1258          sigs = []
    1259  
    1260          def first_handler(signum, frame):
    1261              # 1e-6 is the minimum non-zero value for `setitimer()`.
    1262              # Choose a random delay so as to improve chances of
    1263              # triggering a race condition.  Ideally the signal is received
    1264              # when inside critical signal-handling routines such as
    1265              # Py_MakePendingCalls().
    1266              signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
    1267  
    1268          def second_handler(signum=None, frame=None):
    1269              sigs.append(signum)
    1270  
    1271          # Here on Linux, SIGPROF > SIGALRM > SIGUSR1.  By using both
    1272          # ascending and descending sequences (SIGUSR1 then SIGALRM,
    1273          # SIGPROF then SIGALRM), we maximize chances of hitting a bug.
    1274          self.setsig(signal.SIGPROF, first_handler)
    1275          self.setsig(signal.SIGUSR1, first_handler)
    1276          self.setsig(signal.SIGALRM, second_handler)  # for ITIMER_REAL
    1277  
    1278          expected_sigs = 0
    1279          deadline = time.monotonic() + support.SHORT_TIMEOUT
    1280  
    1281          while expected_sigs < N:
    1282              os.kill(os.getpid(), signal.SIGPROF)
    1283              expected_sigs += 1
    1284              # Wait for handlers to run to avoid signal coalescing
    1285              while len(sigs) < expected_sigs and time.monotonic() < deadline:
    1286                  time.sleep(1e-5)
    1287  
    1288              os.kill(os.getpid(), signal.SIGUSR1)
    1289              expected_sigs += 1
    1290              while len(sigs) < expected_sigs and time.monotonic() < deadline:
    1291                  time.sleep(1e-5)
    1292  
    1293          # All ITIMER_REAL signals should have been delivered to the
    1294          # Python handler
    1295          self.assertEqual(len(sigs), N, "Some signals were lost")
    1296  
    1297      @unittest.skipUnless(hasattr(signal, "setitimer"),
    1298                           "test needs setitimer()")
    1299      def test_stress_delivery_simultaneous(self):
    1300          """
    1301          This test uses simultaneous signal handlers.
    1302          """
    1303          N = self.decide_itimer_count()
    1304          sigs = []
    1305  
    1306          def handler(signum, frame):
    1307              sigs.append(signum)
    1308  
    1309          self.setsig(signal.SIGUSR1, handler)
    1310          self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL
    1311  
    1312          expected_sigs = 0
    1313          while expected_sigs < N:
    1314              # Hopefully the SIGALRM will be received somewhere during
    1315              # initial processing of SIGUSR1.
    1316              signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
    1317              os.kill(os.getpid(), signal.SIGUSR1)
    1318  
    1319              expected_sigs += 2
    1320              # Wait for handlers to run to avoid signal coalescing
    1321              for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
    1322                  if len(sigs) >= expected_sigs:
    1323                      break
    1324  
    1325          # All ITIMER_REAL signals should have been delivered to the
    1326          # Python handler
    1327          self.assertEqual(len(sigs), N, "Some signals were lost")
    1328  
    @unittest.skipUnless(hasattr(signal, "SIGUSR1"),
                         "test needs SIGUSR1")
    @threading_helper.requires_working_threading()
    def test_stress_modifying_handlers(self):
        # bpo-43406: race condition between trip_signal() and signal.signal
        #
        # A helper thread keeps raising SIGUSR1 while this thread keeps
        # swapping the handler installed for it.  The test then checks the
        # counters and any unraisable exception captured along the way.
        signum = signal.SIGUSR1
        sent = 0
        received = 0
        stop_requested = False

        def count_signal(signo, frame):
            # Python-level handler: only counts deliveries.
            nonlocal received
            received += 1

        def keep_raising():
            # Body of the helper thread: raise the signal until told to stop.
            nonlocal sent
            while not stop_requested:
                signal.raise_signal(signum)
                sent += 1

        def keep_swapping():
            # Cycle between a Python-defined and a non-Python handler
            handlers = (count_signal, signal.SIG_IGN)
            # Keep going until at least 100 signals were raised and at
            # least one of them reached the Python handler.
            while not (sent >= 100 and received >= 1):
                for _ in range(20000):
                    for handler in handlers:
                        signal.signal(signum, handler)

        previous_handler = signal.signal(signum, count_signal)
        self.addCleanup(signal.signal, signum, previous_handler)

        sender = threading.Thread(target=keep_raising)
        try:
            race_reported = False
            with support.catch_unraisable_exception() as cm:
                sender.start()
                keep_swapping()
                stop_requested = True
                sender.join()

                # cm.unraisable is inspected inside the ``with`` block and
                # deliberately not copied into a local variable.
                if cm.unraisable is not None:
                    # A signal dropped because of the race described above
                    # may show up as an unraisable exception; validate it.
                    self.assertIsInstance(cm.unraisable.exc_value, OSError)
                    self.assertIn(
                        f"Signal {signum:d} ignored due to race condition",
                        str(cm.unraisable.exc_value))
                    race_reported = True

            # bpo-43406: Even if it is unlikely, it's technically possible
            # that all signals were ignored because of race conditions.
            if not race_reported:
                # Sanity check that some signals were received, but not all
                self.assertGreater(received, 0)
            self.assertLessEqual(received, sent)
        finally:
            # Always stop and wait for the helper thread, including when an
            # assertion above failed.
            stop_requested = True
            sender.join()
    1387  
    1388  
    1389  class ESC[4;38;5;81mRaiseSignalTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1390  
    1391      def test_sigint(self):
    1392          with self.assertRaises(KeyboardInterrupt):
    1393              signal.raise_signal(signal.SIGINT)
    1394  
    1395      @unittest.skipIf(sys.platform != "win32", "Windows specific test")
    1396      def test_invalid_argument(self):
    1397          try:
    1398              SIGHUP = 1 # not supported on win32
    1399              signal.raise_signal(SIGHUP)
    1400              self.fail("OSError (Invalid argument) expected")
    1401          except OSError as e:
    1402              if e.errno == errno.EINVAL:
    1403                  pass
    1404              else:
    1405                  raise
    1406  
    1407      def test_handler(self):
    1408          is_ok = False
    1409          def handler(a, b):
    1410              nonlocal is_ok
    1411              is_ok = True
    1412          old_signal = signal.signal(signal.SIGINT, handler)
    1413          self.addCleanup(signal.signal, signal.SIGINT, old_signal)
    1414  
    1415          signal.raise_signal(signal.SIGINT)
    1416          self.assertTrue(is_ok)
    1417  
    1418      def test__thread_interrupt_main(self):
    1419          # See https://github.com/python/cpython/issues/102397
    1420          code = """if 1:
    1421          import _thread
    1422          class Foo():
    1423              def __del__(self):
    1424                  _thread.interrupt_main()
    1425  
    1426          x = Foo()
    1427          """
    1428  
    1429          rc, out, err = assert_python_ok('-c', code)
    1430          self.assertIn(b'OSError: Signal 2 ignored due to race condition', err)
    1431  
    1432  
    1433  
    1434  class ESC[4;38;5;81mPidfdSignalTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1435  
    1436      @unittest.skipUnless(
    1437          hasattr(signal, "pidfd_send_signal"),
    1438          "pidfd support not built in",
    1439      )
    1440      def test_pidfd_send_signal(self):
    1441          with self.assertRaises(OSError) as cm:
    1442              signal.pidfd_send_signal(0, signal.SIGINT)
    1443          if cm.exception.errno == errno.ENOSYS:
    1444              self.skipTest("kernel does not support pidfds")
    1445          elif cm.exception.errno == errno.EPERM:
    1446              self.skipTest("Not enough privileges to use pidfs")
    1447          self.assertEqual(cm.exception.errno, errno.EBADF)
    1448          my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY)
    1449          self.addCleanup(os.close, my_pidfd)
    1450          with self.assertRaisesRegex(TypeError, "^siginfo must be None$"):
    1451              signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0)
    1452          with self.assertRaises(KeyboardInterrupt):
    1453              signal.pidfd_send_signal(my_pidfd, signal.SIGINT)
    1454  
    1455  def tearDownModule():
    1456      support.reap_children()
    1457  
# Allow running this test module directly as a script: unittest.main()
# loads the tests defined in this module and runs them.
if __name__ == "__main__":
    unittest.main()