python (3.12.0)

(root)/
lib/
python3.12/
test/
test_signal.py
       1  import enum
       2  import errno
       3  import inspect
       4  import os
       5  import random
       6  import signal
       7  import socket
       8  import statistics
       9  import subprocess
      10  import sys
      11  import threading
      12  import time
      13  import unittest
      14  from test import support
      15  from test.support import os_helper
      16  from test.support.script_helper import assert_python_ok, spawn_python
      17  from test.support import threading_helper
      18  try:
      19      import _testcapi
      20  except ImportError:
      21      _testcapi = None
      22  
      23  
      24  class ESC[4;38;5;81mGenericTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      25  
      26      def test_enums(self):
      27          for name in dir(signal):
      28              sig = getattr(signal, name)
      29              if name in {'SIG_DFL', 'SIG_IGN'}:
      30                  self.assertIsInstance(sig, signal.Handlers)
      31              elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
      32                  self.assertIsInstance(sig, signal.Sigmasks)
      33              elif name.startswith('SIG') and not name.startswith('SIG_'):
      34                  self.assertIsInstance(sig, signal.Signals)
      35              elif name.startswith('CTRL_'):
      36                  self.assertIsInstance(sig, signal.Signals)
      37                  self.assertEqual(sys.platform, "win32")
      38  
      39          CheckedSignals = enum._old_convert_(
      40                  enum.IntEnum, 'Signals', 'signal',
      41                  lambda name:
      42                      name.isupper()
      43                      and (name.startswith('SIG') and not name.startswith('SIG_'))
      44                      or name.startswith('CTRL_'),
      45                  source=signal,
      46                  )
      47          enum._test_simple_enum(CheckedSignals, signal.Signals)
      48  
      49          CheckedHandlers = enum._old_convert_(
      50                  enum.IntEnum, 'Handlers', 'signal',
      51                  lambda name: name in ('SIG_DFL', 'SIG_IGN'),
      52                  source=signal,
      53                  )
      54          enum._test_simple_enum(CheckedHandlers, signal.Handlers)
      55  
      56          Sigmasks = getattr(signal, 'Sigmasks', None)
      57          if Sigmasks is not None:
      58              CheckedSigmasks = enum._old_convert_(
      59                      enum.IntEnum, 'Sigmasks', 'signal',
      60                      lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'),
      61                      source=signal,
      62                      )
      63              enum._test_simple_enum(CheckedSigmasks, Sigmasks)
      64  
      65      def test_functions_module_attr(self):
      66          # Issue #27718: If __all__ is not defined all non-builtin functions
      67          # should have correct __module__ to be displayed by pydoc.
      68          for name in dir(signal):
      69              value = getattr(signal, name)
      70              if inspect.isroutine(value) and not inspect.isbuiltin(value):
      71                  self.assertEqual(value.__module__, 'signal')
      72  
      73  
      74  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
      75  class ESC[4;38;5;81mPosixTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      76      def trivial_signal_handler(self, *args):
      77          pass
      78  
      79      def test_out_of_range_signal_number_raises_error(self):
      80          self.assertRaises(ValueError, signal.getsignal, 4242)
      81  
      82          self.assertRaises(ValueError, signal.signal, 4242,
      83                            self.trivial_signal_handler)
      84  
      85          self.assertRaises(ValueError, signal.strsignal, 4242)
      86  
      87      def test_setting_signal_handler_to_none_raises_error(self):
      88          self.assertRaises(TypeError, signal.signal,
      89                            signal.SIGUSR1, None)
      90  
      91      def test_getsignal(self):
      92          hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
      93          self.assertIsInstance(hup, signal.Handlers)
      94          self.assertEqual(signal.getsignal(signal.SIGHUP),
      95                           self.trivial_signal_handler)
      96          signal.signal(signal.SIGHUP, hup)
      97          self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
      98  
      99      def test_strsignal(self):
     100          self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
     101          self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
     102          self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))
     103  
     104      # Issue 3864, unknown if this affects earlier versions of freebsd also
     105      def test_interprocess_signal(self):
     106          dirname = os.path.dirname(__file__)
     107          script = os.path.join(dirname, 'signalinterproctester.py')
     108          assert_python_ok(script)
     109  
     110      @unittest.skipUnless(
     111          hasattr(signal, "valid_signals"),
     112          "requires signal.valid_signals"
     113      )
     114      def test_valid_signals(self):
     115          s = signal.valid_signals()
     116          self.assertIsInstance(s, set)
     117          self.assertIn(signal.Signals.SIGINT, s)
     118          self.assertIn(signal.Signals.SIGALRM, s)
     119          self.assertNotIn(0, s)
     120          self.assertNotIn(signal.NSIG, s)
     121          self.assertLess(len(s), signal.NSIG)
     122  
     123          # gh-91145: Make sure that all SIGxxx constants exposed by the Python
     124          # signal module have a number in the [0; signal.NSIG-1] range.
     125          for name in dir(signal):
     126              if not name.startswith("SIG"):
     127                  continue
     128              if name in {"SIG_IGN", "SIG_DFL"}:
     129                  # SIG_IGN and SIG_DFL are pointers
     130                  continue
     131              with self.subTest(name=name):
     132                  signum = getattr(signal, name)
     133                  self.assertGreaterEqual(signum, 0)
     134                  self.assertLess(signum, signal.NSIG)
     135  
     136      @unittest.skipUnless(sys.executable, "sys.executable required.")
     137      @support.requires_subprocess()
     138      def test_keyboard_interrupt_exit_code(self):
     139          """KeyboardInterrupt triggers exit via SIGINT."""
     140          process = subprocess.run(
     141                  [sys.executable, "-c",
     142                   "import os, signal, time\n"
     143                   "os.kill(os.getpid(), signal.SIGINT)\n"
     144                   "for _ in range(999): time.sleep(0.01)"],
     145                  stderr=subprocess.PIPE)
     146          self.assertIn(b"KeyboardInterrupt", process.stderr)
     147          self.assertEqual(process.returncode, -signal.SIGINT)
     148          # Caveat: The exit code is insufficient to guarantee we actually died
     149          # via a signal.  POSIX shells do more than look at the 8 bit value.
     150          # Writing an automation friendly test of an interactive shell
     151          # to confirm that our process died via a SIGINT proved too complex.
     152  
     153  
     154  @unittest.skipUnless(sys.platform == "win32", "Windows specific")
     155  class ESC[4;38;5;81mWindowsSignalTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     156  
     157      def test_valid_signals(self):
     158          s = signal.valid_signals()
     159          self.assertIsInstance(s, set)
     160          self.assertGreaterEqual(len(s), 6)
     161          self.assertIn(signal.Signals.SIGINT, s)
     162          self.assertNotIn(0, s)
     163          self.assertNotIn(signal.NSIG, s)
     164          self.assertLess(len(s), signal.NSIG)
     165  
     166      def test_issue9324(self):
     167          # Updated for issue #10003, adding SIGBREAK
     168          handler = lambda x, y: None
     169          checked = set()
     170          for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
     171                      signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
     172                      signal.SIGTERM):
     173              # Set and then reset a handler for signals that work on windows.
     174              # Issue #18396, only for signals without a C-level handler.
     175              if signal.getsignal(sig) is not None:
     176                  signal.signal(sig, signal.signal(sig, handler))
     177                  checked.add(sig)
     178          # Issue #18396: Ensure the above loop at least tested *something*
     179          self.assertTrue(checked)
     180  
     181          with self.assertRaises(ValueError):
     182              signal.signal(-1, handler)
     183  
     184          with self.assertRaises(ValueError):
     185              signal.signal(7, handler)
     186  
     187      @unittest.skipUnless(sys.executable, "sys.executable required.")
     188      @support.requires_subprocess()
     189      def test_keyboard_interrupt_exit_code(self):
     190          """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
     191          # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
     192          # as that requires setting up a console control handler in a child
     193          # in its own process group.  Doable, but quite complicated.  (see
     194          # @eryksun on https://github.com/python/cpython/pull/11862)
     195          process = subprocess.run(
     196                  [sys.executable, "-c", "raise KeyboardInterrupt"],
     197                  stderr=subprocess.PIPE)
     198          self.assertIn(b"KeyboardInterrupt", process.stderr)
     199          STATUS_CONTROL_C_EXIT = 0xC000013A
     200          self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)
     201  
     202  
     203  class ESC[4;38;5;81mWakeupFDTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     204  
     205      def test_invalid_call(self):
     206          # First parameter is positional-only
     207          with self.assertRaises(TypeError):
     208              signal.set_wakeup_fd(signum=signal.SIGINT)
     209  
     210          # warn_on_full_buffer is a keyword-only parameter
     211          with self.assertRaises(TypeError):
     212              signal.set_wakeup_fd(signal.SIGINT, False)
     213  
     214      def test_invalid_fd(self):
     215          fd = os_helper.make_bad_fd()
     216          self.assertRaises((ValueError, OSError),
     217                            signal.set_wakeup_fd, fd)
     218  
     219      @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
     220      def test_invalid_socket(self):
     221          sock = socket.socket()
     222          fd = sock.fileno()
     223          sock.close()
     224          self.assertRaises((ValueError, OSError),
     225                            signal.set_wakeup_fd, fd)
     226  
     227      # Emscripten does not support fstat on pipes yet.
     228      # https://github.com/emscripten-core/emscripten/issues/16414
     229      @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
     230      @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
     231      def test_set_wakeup_fd_result(self):
     232          r1, w1 = os.pipe()
     233          self.addCleanup(os.close, r1)
     234          self.addCleanup(os.close, w1)
     235          r2, w2 = os.pipe()
     236          self.addCleanup(os.close, r2)
     237          self.addCleanup(os.close, w2)
     238  
     239          if hasattr(os, 'set_blocking'):
     240              os.set_blocking(w1, False)
     241              os.set_blocking(w2, False)
     242  
     243          signal.set_wakeup_fd(w1)
     244          self.assertEqual(signal.set_wakeup_fd(w2), w1)
     245          self.assertEqual(signal.set_wakeup_fd(-1), w2)
     246          self.assertEqual(signal.set_wakeup_fd(-1), -1)
     247  
     248      @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
     249      @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
     250      def test_set_wakeup_fd_socket_result(self):
     251          sock1 = socket.socket()
     252          self.addCleanup(sock1.close)
     253          sock1.setblocking(False)
     254          fd1 = sock1.fileno()
     255  
     256          sock2 = socket.socket()
     257          self.addCleanup(sock2.close)
     258          sock2.setblocking(False)
     259          fd2 = sock2.fileno()
     260  
     261          signal.set_wakeup_fd(fd1)
     262          self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
     263          self.assertEqual(signal.set_wakeup_fd(-1), fd2)
     264          self.assertEqual(signal.set_wakeup_fd(-1), -1)
     265  
     266      # On Windows, files are always blocking and Windows does not provide a
     267      # function to test if a socket is in non-blocking mode.
     268      @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
     269      @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
     270      @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
     271      def test_set_wakeup_fd_blocking(self):
     272          rfd, wfd = os.pipe()
     273          self.addCleanup(os.close, rfd)
     274          self.addCleanup(os.close, wfd)
     275  
     276          # fd must be non-blocking
     277          os.set_blocking(wfd, True)
     278          with self.assertRaises(ValueError) as cm:
     279              signal.set_wakeup_fd(wfd)
     280          self.assertEqual(str(cm.exception),
     281                           "the fd %s must be in non-blocking mode" % wfd)
     282  
     283          # non-blocking is ok
     284          os.set_blocking(wfd, False)
     285          signal.set_wakeup_fd(wfd)
     286          signal.set_wakeup_fd(-1)
     287  
     288  
     289  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
     290  class ESC[4;38;5;81mWakeupSignalTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     291      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     292      def check_wakeup(self, test_body, *signals, ordered=True):
     293          # use a subprocess to have only one thread
     294          code = """if 1:
     295          import _testcapi
     296          import os
     297          import signal
     298          import struct
     299  
     300          signals = {!r}
     301  
     302          def handler(signum, frame):
     303              pass
     304  
     305          def check_signum(signals):
     306              data = os.read(read, len(signals)+1)
     307              raised = struct.unpack('%uB' % len(data), data)
     308              if not {!r}:
     309                  raised = set(raised)
     310                  signals = set(signals)
     311              if raised != signals:
     312                  raise Exception("%r != %r" % (raised, signals))
     313  
     314          {}
     315  
     316          signal.signal(signal.SIGALRM, handler)
     317          read, write = os.pipe()
     318          os.set_blocking(write, False)
     319          signal.set_wakeup_fd(write)
     320  
     321          test()
     322          check_signum(signals)
     323  
     324          os.close(read)
     325          os.close(write)
     326          """.format(tuple(map(int, signals)), ordered, test_body)
     327  
     328          assert_python_ok('-c', code)
     329  
     330      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     331      @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
     332      def test_wakeup_write_error(self):
     333          # Issue #16105: write() errors in the C signal handler should not
     334          # pass silently.
     335          # Use a subprocess to have only one thread.
     336          code = """if 1:
     337          import _testcapi
     338          import errno
     339          import os
     340          import signal
     341          import sys
     342          from test.support import captured_stderr
     343  
     344          def handler(signum, frame):
     345              1/0
     346  
     347          signal.signal(signal.SIGALRM, handler)
     348          r, w = os.pipe()
     349          os.set_blocking(r, False)
     350  
     351          # Set wakeup_fd a read-only file descriptor to trigger the error
     352          signal.set_wakeup_fd(r)
     353          try:
     354              with captured_stderr() as err:
     355                  signal.raise_signal(signal.SIGALRM)
     356          except ZeroDivisionError:
     357              # An ignored exception should have been printed out on stderr
     358              err = err.getvalue()
     359              if ('Exception ignored when trying to write to the signal wakeup fd'
     360                  not in err):
     361                  raise AssertionError(err)
     362              if ('OSError: [Errno %d]' % errno.EBADF) not in err:
     363                  raise AssertionError(err)
     364          else:
     365              raise AssertionError("ZeroDivisionError not raised")
     366  
     367          os.close(r)
     368          os.close(w)
     369          """
     370          r, w = os.pipe()
     371          try:
     372              os.write(r, b'x')
     373          except OSError:
     374              pass
     375          else:
     376              self.skipTest("OS doesn't report write() error on the read end of a pipe")
     377          finally:
     378              os.close(r)
     379              os.close(w)
     380  
     381          assert_python_ok('-c', code)
     382  
     383      def test_wakeup_fd_early(self):
     384          self.check_wakeup("""def test():
     385              import select
     386              import time
     387  
     388              TIMEOUT_FULL = 10
     389              TIMEOUT_HALF = 5
     390  
     391              class InterruptSelect(Exception):
     392                  pass
     393  
     394              def handler(signum, frame):
     395                  raise InterruptSelect
     396              signal.signal(signal.SIGALRM, handler)
     397  
     398              signal.alarm(1)
     399  
     400              # We attempt to get a signal during the sleep,
     401              # before select is called
     402              try:
     403                  select.select([], [], [], TIMEOUT_FULL)
     404              except InterruptSelect:
     405                  pass
     406              else:
     407                  raise Exception("select() was not interrupted")
     408  
     409              before_time = time.monotonic()
     410              select.select([read], [], [], TIMEOUT_FULL)
     411              after_time = time.monotonic()
     412              dt = after_time - before_time
     413              if dt >= TIMEOUT_HALF:
     414                  raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
     415          """, signal.SIGALRM)
     416  
     417      def test_wakeup_fd_during(self):
     418          self.check_wakeup("""def test():
     419              import select
     420              import time
     421  
     422              TIMEOUT_FULL = 10
     423              TIMEOUT_HALF = 5
     424  
     425              class InterruptSelect(Exception):
     426                  pass
     427  
     428              def handler(signum, frame):
     429                  raise InterruptSelect
     430              signal.signal(signal.SIGALRM, handler)
     431  
     432              signal.alarm(1)
     433              before_time = time.monotonic()
     434              # We attempt to get a signal during the select call
     435              try:
     436                  select.select([read], [], [], TIMEOUT_FULL)
     437              except InterruptSelect:
     438                  pass
     439              else:
     440                  raise Exception("select() was not interrupted")
     441              after_time = time.monotonic()
     442              dt = after_time - before_time
     443              if dt >= TIMEOUT_HALF:
     444                  raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
     445          """, signal.SIGALRM)
     446  
     447      def test_signum(self):
     448          self.check_wakeup("""def test():
     449              signal.signal(signal.SIGUSR1, handler)
     450              signal.raise_signal(signal.SIGUSR1)
     451              signal.raise_signal(signal.SIGALRM)
     452          """, signal.SIGUSR1, signal.SIGALRM)
     453  
     454      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
     455                           'need signal.pthread_sigmask()')
     456      def test_pending(self):
     457          self.check_wakeup("""def test():
     458              signum1 = signal.SIGUSR1
     459              signum2 = signal.SIGUSR2
     460  
     461              signal.signal(signum1, handler)
     462              signal.signal(signum2, handler)
     463  
     464              signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
     465              signal.raise_signal(signum1)
     466              signal.raise_signal(signum2)
     467              # Unblocking the 2 signals calls the C signal handler twice
     468              signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
     469          """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False)
     470  
     471  
     472  @unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
     473  class ESC[4;38;5;81mWakeupSocketSignalTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     474  
     475      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     476      def test_socket(self):
     477          # use a subprocess to have only one thread
     478          code = """if 1:
     479          import signal
     480          import socket
     481          import struct
     482          import _testcapi
     483  
     484          signum = signal.SIGINT
     485          signals = (signum,)
     486  
     487          def handler(signum, frame):
     488              pass
     489  
     490          signal.signal(signum, handler)
     491  
     492          read, write = socket.socketpair()
     493          write.setblocking(False)
     494          signal.set_wakeup_fd(write.fileno())
     495  
     496          signal.raise_signal(signum)
     497  
     498          data = read.recv(1)
     499          if not data:
     500              raise Exception("no signum written")
     501          raised = struct.unpack('B', data)
     502          if raised != signals:
     503              raise Exception("%r != %r" % (raised, signals))
     504  
     505          read.close()
     506          write.close()
     507          """
     508  
     509          assert_python_ok('-c', code)
     510  
     511      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     512      def test_send_error(self):
     513          # Use a subprocess to have only one thread.
     514          if os.name == 'nt':
     515              action = 'send'
     516          else:
     517              action = 'write'
     518          code = """if 1:
     519          import errno
     520          import signal
     521          import socket
     522          import sys
     523          import time
     524          import _testcapi
     525          from test.support import captured_stderr
     526  
     527          signum = signal.SIGINT
     528  
     529          def handler(signum, frame):
     530              pass
     531  
     532          signal.signal(signum, handler)
     533  
     534          read, write = socket.socketpair()
     535          read.setblocking(False)
     536          write.setblocking(False)
     537  
     538          signal.set_wakeup_fd(write.fileno())
     539  
     540          # Close sockets: send() will fail
     541          read.close()
     542          write.close()
     543  
     544          with captured_stderr() as err:
     545              signal.raise_signal(signum)
     546  
     547          err = err.getvalue()
     548          if ('Exception ignored when trying to {action} to the signal wakeup fd'
     549              not in err):
     550              raise AssertionError(err)
     551          """.format(action=action)
     552          assert_python_ok('-c', code)
     553  
     554      @unittest.skipIf(_testcapi is None, 'need _testcapi')
     555      def test_warn_on_full_buffer(self):
     556          # Use a subprocess to have only one thread.
     557          if os.name == 'nt':
     558              action = 'send'
     559          else:
     560              action = 'write'
     561          code = """if 1:
     562          import errno
     563          import signal
     564          import socket
     565          import sys
     566          import time
     567          import _testcapi
     568          from test.support import captured_stderr
     569  
     570          signum = signal.SIGINT
     571  
     572          # This handler will be called, but we intentionally won't read from
     573          # the wakeup fd.
     574          def handler(signum, frame):
     575              pass
     576  
     577          signal.signal(signum, handler)
     578  
     579          read, write = socket.socketpair()
     580  
     581          # Fill the socketpair buffer
     582          if sys.platform == 'win32':
     583              # bpo-34130: On Windows, sometimes non-blocking send fails to fill
     584              # the full socketpair buffer, so use a timeout of 50 ms instead.
     585              write.settimeout(0.050)
     586          else:
     587              write.setblocking(False)
     588  
     589          written = 0
     590          if sys.platform == "vxworks":
     591              CHUNK_SIZES = (1,)
     592          else:
     593              # Start with large chunk size to reduce the
     594              # number of send needed to fill the buffer.
     595              CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
     596          for chunk_size in CHUNK_SIZES:
     597              chunk = b"x" * chunk_size
     598              try:
     599                  while True:
     600                      write.send(chunk)
     601                      written += chunk_size
     602              except (BlockingIOError, TimeoutError):
     603                  pass
     604  
     605          print(f"%s bytes written into the socketpair" % written, flush=True)
     606  
     607          write.setblocking(False)
     608          try:
     609              write.send(b"x")
     610          except BlockingIOError:
     611              # The socketpair buffer seems full
     612              pass
     613          else:
     614              raise AssertionError("%s bytes failed to fill the socketpair "
     615                                   "buffer" % written)
     616  
     617          # By default, we get a warning when a signal arrives
     618          msg = ('Exception ignored when trying to {action} '
     619                 'to the signal wakeup fd')
     620          signal.set_wakeup_fd(write.fileno())
     621  
     622          with captured_stderr() as err:
     623              signal.raise_signal(signum)
     624  
     625          err = err.getvalue()
     626          if msg not in err:
     627              raise AssertionError("first set_wakeup_fd() test failed, "
     628                                   "stderr: %r" % err)
     629  
     630          # And also if warn_on_full_buffer=True
     631          signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)
     632  
     633          with captured_stderr() as err:
     634              signal.raise_signal(signum)
     635  
     636          err = err.getvalue()
     637          if msg not in err:
     638              raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
     639                                   "test failed, stderr: %r" % err)
     640  
     641          # But not if warn_on_full_buffer=False
     642          signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)
     643  
     644          with captured_stderr() as err:
     645              signal.raise_signal(signum)
     646  
     647          err = err.getvalue()
     648          if err != "":
     649              raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
     650                                   "test failed, stderr: %r" % err)
     651  
     652          # And then check the default again, to make sure warn_on_full_buffer
     653          # settings don't leak across calls.
     654          signal.set_wakeup_fd(write.fileno())
     655  
     656          with captured_stderr() as err:
     657              signal.raise_signal(signum)
     658  
     659          err = err.getvalue()
     660          if msg not in err:
     661              raise AssertionError("second set_wakeup_fd() test failed, "
     662                                   "stderr: %r" % err)
     663  
     664          """.format(action=action)
     665          assert_python_ok('-c', code)
     666  
     667  
     668  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
     669  @unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
     670  @support.requires_subprocess()
     671  @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
     672  class ESC[4;38;5;81mSiginterruptTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     673  
     674      def readpipe_interrupted(self, interrupt):
     675          """Perform a read during which a signal will arrive.  Return True if the
     676          read is interrupted by the signal and raises an exception.  Return False
     677          if it returns normally.
     678          """
     679          # use a subprocess to have only one thread, to have a timeout on the
     680          # blocking read and to not touch signal handling in this process
     681          code = """if 1:
     682              import errno
     683              import os
     684              import signal
     685              import sys
     686  
     687              interrupt = %r
     688              r, w = os.pipe()
     689  
     690              def handler(signum, frame):
     691                  1 / 0
     692  
     693              signal.signal(signal.SIGALRM, handler)
     694              if interrupt is not None:
     695                  signal.siginterrupt(signal.SIGALRM, interrupt)
     696  
     697              print("ready")
     698              sys.stdout.flush()
     699  
     700              # run the test twice
     701              try:
     702                  for loop in range(2):
     703                      # send a SIGALRM in a second (during the read)
     704                      signal.alarm(1)
     705                      try:
     706                          # blocking call: read from a pipe without data
     707                          os.read(r, 1)
     708                      except ZeroDivisionError:
     709                          pass
     710                      else:
     711                          sys.exit(2)
     712                  sys.exit(3)
     713              finally:
     714                  os.close(r)
     715                  os.close(w)
     716          """ % (interrupt,)
     717          with spawn_python('-c', code) as process:
     718              try:
     719                  # wait until the child process is loaded and has started
     720                  first_line = process.stdout.readline()
     721  
     722                  stdout, stderr = process.communicate(timeout=support.SHORT_TIMEOUT)
     723              except subprocess.TimeoutExpired:
     724                  process.kill()
     725                  return False
     726              else:
     727                  stdout = first_line + stdout
     728                  exitcode = process.wait()
     729                  if exitcode not in (2, 3):
     730                      raise Exception("Child error (exit code %s): %r"
     731                                      % (exitcode, stdout))
     732                  return (exitcode == 3)
     733  
     734      def test_without_siginterrupt(self):
     735          # If a signal handler is installed and siginterrupt is not called
     736          # at all, when that signal arrives, it interrupts a syscall that's in
     737          # progress.
     738          interrupted = self.readpipe_interrupted(None)
     739          self.assertTrue(interrupted)
     740  
     741      def test_siginterrupt_on(self):
     742          # If a signal handler is installed and siginterrupt is called with
     743          # a true value for the second argument, when that signal arrives, it
     744          # interrupts a syscall that's in progress.
     745          interrupted = self.readpipe_interrupted(True)
     746          self.assertTrue(interrupted)
     747  
     748      @support.requires_resource('walltime')
     749      def test_siginterrupt_off(self):
     750          # If a signal handler is installed and siginterrupt is called with
     751          # a false value for the second argument, when that signal arrives, it
     752          # does not interrupt a syscall that's in progress.
     753          interrupted = self.readpipe_interrupted(False)
     754          self.assertFalse(interrupted)
     755  
     756  
     757  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
     758  @unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
     759                           "needs signal.getitimer() and signal.setitimer()")
     760  class ESC[4;38;5;81mItimerTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     761      def setUp(self):
     762          self.hndl_called = False
     763          self.hndl_count = 0
     764          self.itimer = None
     765          self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
     766  
     767      def tearDown(self):
     768          signal.signal(signal.SIGALRM, self.old_alarm)
     769          if self.itimer is not None: # test_itimer_exc doesn't change this attr
     770              # just ensure that itimer is stopped
     771              signal.setitimer(self.itimer, 0)
     772  
     773      def sig_alrm(self, *args):
     774          self.hndl_called = True
     775  
     776      def sig_vtalrm(self, *args):
     777          self.hndl_called = True
     778  
     779          if self.hndl_count > 3:
     780              # it shouldn't be here, because it should have been disabled.
     781              raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
     782                  "timer.")
     783          elif self.hndl_count == 3:
     784              # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
     785              signal.setitimer(signal.ITIMER_VIRTUAL, 0)
     786  
     787          self.hndl_count += 1
     788  
     789      def sig_prof(self, *args):
     790          self.hndl_called = True
     791          signal.setitimer(signal.ITIMER_PROF, 0)
     792  
     793      def test_itimer_exc(self):
     794          # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
     795          # defines it ?
     796          self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
     797          # Negative times are treated as zero on some platforms.
     798          if 0:
     799              self.assertRaises(signal.ItimerError,
     800                                signal.setitimer, signal.ITIMER_REAL, -1)
     801  
     802      def test_itimer_real(self):
     803          self.itimer = signal.ITIMER_REAL
     804          signal.setitimer(self.itimer, 1.0)
     805          signal.pause()
     806          self.assertEqual(self.hndl_called, True)
     807  
     808      # Issue 3864, unknown if this affects earlier versions of freebsd also
     809      @unittest.skipIf(sys.platform in ('netbsd5',),
     810          'itimer not reliable (does not mix well with threading) on some BSDs.')
     811      def test_itimer_virtual(self):
     812          self.itimer = signal.ITIMER_VIRTUAL
     813          signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
     814          signal.setitimer(self.itimer, 0.3, 0.2)
     815  
     816          for _ in support.busy_retry(support.LONG_TIMEOUT):
     817              # use up some virtual time by doing real work
     818              _ = pow(12345, 67890, 10000019)
     819              if signal.getitimer(self.itimer) == (0.0, 0.0):
     820                  # sig_vtalrm handler stopped this itimer
     821                  break
     822  
     823          # virtual itimer should be (0.0, 0.0) now
     824          self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
     825          # and the handler should have been called
     826          self.assertEqual(self.hndl_called, True)
     827  
     828      def test_itimer_prof(self):
     829          self.itimer = signal.ITIMER_PROF
     830          signal.signal(signal.SIGPROF, self.sig_prof)
     831          signal.setitimer(self.itimer, 0.2, 0.2)
     832  
     833          for _ in support.busy_retry(support.LONG_TIMEOUT):
     834              # do some work
     835              _ = pow(12345, 67890, 10000019)
     836              if signal.getitimer(self.itimer) == (0.0, 0.0):
     837                  # sig_prof handler stopped this itimer
     838                  break
     839  
     840          # profiling itimer should be (0.0, 0.0) now
     841          self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
     842          # and the handler should have been called
     843          self.assertEqual(self.hndl_called, True)
     844  
     845      def test_setitimer_tiny(self):
     846          # bpo-30807: C setitimer() takes a microsecond-resolution interval.
     847          # Check that float -> timeval conversion doesn't round
     848          # the interval down to zero, which would disable the timer.
     849          self.itimer = signal.ITIMER_REAL
     850          signal.setitimer(self.itimer, 1e-6)
     851          time.sleep(1)
     852          self.assertEqual(self.hndl_called, True)
     853  
     854  
     855  class ESC[4;38;5;81mPendingSignalsTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     856      """
     857      Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
     858      functions.
     859      """
     860      @unittest.skipUnless(hasattr(signal, 'sigpending'),
     861                           'need signal.sigpending()')
     862      def test_sigpending_empty(self):
     863          self.assertEqual(signal.sigpending(), set())
     864  
     865      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
     866                           'need signal.pthread_sigmask()')
     867      @unittest.skipUnless(hasattr(signal, 'sigpending'),
     868                           'need signal.sigpending()')
     869      def test_sigpending(self):
     870          code = """if 1:
     871              import os
     872              import signal
     873  
     874              def handler(signum, frame):
     875                  1/0
     876  
     877              signum = signal.SIGUSR1
     878              signal.signal(signum, handler)
     879  
     880              signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
     881              os.kill(os.getpid(), signum)
     882              pending = signal.sigpending()
     883              for sig in pending:
     884                  assert isinstance(sig, signal.Signals), repr(pending)
     885              if pending != {signum}:
     886                  raise Exception('%s != {%s}' % (pending, signum))
     887              try:
     888                  signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
     889              except ZeroDivisionError:
     890                  pass
     891              else:
     892                  raise Exception("ZeroDivisionError not raised")
     893          """
     894          assert_python_ok('-c', code)
     895  
     896      @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
     897                           'need signal.pthread_kill()')
     898      @threading_helper.requires_working_threading()
     899      def test_pthread_kill(self):
     900          code = """if 1:
     901              import signal
     902              import threading
     903              import sys
     904  
     905              signum = signal.SIGUSR1
     906  
     907              def handler(signum, frame):
     908                  1/0
     909  
     910              signal.signal(signum, handler)
     911  
     912              tid = threading.get_ident()
     913              try:
     914                  signal.pthread_kill(tid, signum)
     915              except ZeroDivisionError:
     916                  pass
     917              else:
     918                  raise Exception("ZeroDivisionError not raised")
     919          """
     920          assert_python_ok('-c', code)
     921  
     922      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
     923                           'need signal.pthread_sigmask()')
     924      def wait_helper(self, blocked, test):
     925          """
     926          test: body of the "def test(signum):" function.
     927          blocked: number of the blocked signal
     928          """
     929          code = '''if 1:
     930          import signal
     931          import sys
     932          from signal import Signals
     933  
     934          def handler(signum, frame):
     935              1/0
     936  
     937          %s
     938  
     939          blocked = %s
     940          signum = signal.SIGALRM
     941  
     942          # child: block and wait the signal
     943          try:
     944              signal.signal(signum, handler)
     945              signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
     946  
     947              # Do the tests
     948              test(signum)
     949  
     950              # The handler must not be called on unblock
     951              try:
     952                  signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
     953              except ZeroDivisionError:
     954                  print("the signal handler has been called",
     955                        file=sys.stderr)
     956                  sys.exit(1)
     957          except BaseException as err:
     958              print("error: {}".format(err), file=sys.stderr)
     959              sys.stderr.flush()
     960              sys.exit(1)
     961          ''' % (test.strip(), blocked)
     962  
     963          # sig*wait* must be called with the signal blocked: since the current
     964          # process might have several threads running, use a subprocess to have
     965          # a single thread.
     966          assert_python_ok('-c', code)
     967  
     968      @unittest.skipUnless(hasattr(signal, 'sigwait'),
     969                           'need signal.sigwait()')
     970      def test_sigwait(self):
     971          self.wait_helper(signal.SIGALRM, '''
     972          def test(signum):
     973              signal.alarm(1)
     974              received = signal.sigwait([signum])
     975              assert isinstance(received, signal.Signals), received
     976              if received != signum:
     977                  raise Exception('received %s, not %s' % (received, signum))
     978          ''')
     979  
     980      @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
     981                           'need signal.sigwaitinfo()')
     982      def test_sigwaitinfo(self):
     983          self.wait_helper(signal.SIGALRM, '''
     984          def test(signum):
     985              signal.alarm(1)
     986              info = signal.sigwaitinfo([signum])
     987              if info.si_signo != signum:
     988                  raise Exception("info.si_signo != %s" % signum)
     989          ''')
     990  
     991      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
     992                           'need signal.sigtimedwait()')
     993      def test_sigtimedwait(self):
     994          self.wait_helper(signal.SIGALRM, '''
     995          def test(signum):
     996              signal.alarm(1)
     997              info = signal.sigtimedwait([signum], 10.1000)
     998              if info.si_signo != signum:
     999                  raise Exception('info.si_signo != %s' % signum)
    1000          ''')
    1001  
    1002      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
    1003                           'need signal.sigtimedwait()')
    1004      def test_sigtimedwait_poll(self):
    1005          # check that polling with sigtimedwait works
    1006          self.wait_helper(signal.SIGALRM, '''
    1007          def test(signum):
    1008              import os
    1009              os.kill(os.getpid(), signum)
    1010              info = signal.sigtimedwait([signum], 0)
    1011              if info.si_signo != signum:
    1012                  raise Exception('info.si_signo != %s' % signum)
    1013          ''')
    1014  
    1015      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
    1016                           'need signal.sigtimedwait()')
    1017      def test_sigtimedwait_timeout(self):
    1018          self.wait_helper(signal.SIGALRM, '''
    1019          def test(signum):
    1020              received = signal.sigtimedwait([signum], 1.0)
    1021              if received is not None:
    1022                  raise Exception("received=%r" % (received,))
    1023          ''')
    1024  
    1025      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
    1026                           'need signal.sigtimedwait()')
    1027      def test_sigtimedwait_negative_timeout(self):
    1028          signum = signal.SIGALRM
    1029          self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
    1030  
    1031      @unittest.skipUnless(hasattr(signal, 'sigwait'),
    1032                           'need signal.sigwait()')
    1033      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    1034                           'need signal.pthread_sigmask()')
    1035      @threading_helper.requires_working_threading()
    1036      def test_sigwait_thread(self):
    1037          # Check that calling sigwait() from a thread doesn't suspend the whole
    1038          # process. A new interpreter is spawned to avoid problems when mixing
    1039          # threads and fork(): only async-safe functions are allowed between
    1040          # fork() and exec().
    1041          assert_python_ok("-c", """if True:
    1042              import os, threading, sys, time, signal
    1043  
    1044              # the default handler terminates the process
    1045              signum = signal.SIGUSR1
    1046  
    1047              def kill_later():
    1048                  # wait until the main thread is waiting in sigwait()
    1049                  time.sleep(1)
    1050                  os.kill(os.getpid(), signum)
    1051  
    1052              # the signal must be blocked by all the threads
    1053              signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
    1054              killer = threading.Thread(target=kill_later)
    1055              killer.start()
    1056              received = signal.sigwait([signum])
    1057              if received != signum:
    1058                  print("sigwait() received %s, not %s" % (received, signum),
    1059                        file=sys.stderr)
    1060                  sys.exit(1)
    1061              killer.join()
    1062              # unblock the signal, which should have been cleared by sigwait()
    1063              signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
    1064          """)
    1065  
    1066      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    1067                           'need signal.pthread_sigmask()')
    1068      def test_pthread_sigmask_arguments(self):
    1069          self.assertRaises(TypeError, signal.pthread_sigmask)
    1070          self.assertRaises(TypeError, signal.pthread_sigmask, 1)
    1071          self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
    1072          self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
    1073          with self.assertRaises(ValueError):
    1074              signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
    1075          with self.assertRaises(ValueError):
    1076              signal.pthread_sigmask(signal.SIG_BLOCK, [0])
    1077          with self.assertRaises(ValueError):
    1078              signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])
    1079  
    1080      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    1081                           'need signal.pthread_sigmask()')
    1082      def test_pthread_sigmask_valid_signals(self):
    1083          s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
    1084          self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
    1085          # Get current blocked set
    1086          s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
    1087          self.assertLessEqual(s, signal.valid_signals())
    1088  
    1089      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    1090                           'need signal.pthread_sigmask()')
    1091      @threading_helper.requires_working_threading()
    1092      def test_pthread_sigmask(self):
    1093          code = """if 1:
    1094          import signal
    1095          import os; import threading
    1096  
    1097          def handler(signum, frame):
    1098              1/0
    1099  
    1100          def kill(signum):
    1101              os.kill(os.getpid(), signum)
    1102  
    1103          def check_mask(mask):
    1104              for sig in mask:
    1105                  assert isinstance(sig, signal.Signals), repr(sig)
    1106  
    1107          def read_sigmask():
    1108              sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
    1109              check_mask(sigmask)
    1110              return sigmask
    1111  
    1112          signum = signal.SIGUSR1
    1113  
    1114          # Install our signal handler
    1115          old_handler = signal.signal(signum, handler)
    1116  
    1117          # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
    1118          old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
    1119          check_mask(old_mask)
    1120          try:
    1121              kill(signum)
    1122          except ZeroDivisionError:
    1123              pass
    1124          else:
    1125              raise Exception("ZeroDivisionError not raised")
    1126  
    1127          # Block and then raise SIGUSR1. The signal is blocked: the signal
    1128          # handler is not called, and the signal is now pending
    1129          mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
    1130          check_mask(mask)
    1131          kill(signum)
    1132  
    1133          # Check the new mask
    1134          blocked = read_sigmask()
    1135          check_mask(blocked)
    1136          if signum not in blocked:
    1137              raise Exception("%s not in %s" % (signum, blocked))
    1138          if old_mask ^ blocked != {signum}:
    1139              raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
    1140  
    1141          # Unblock SIGUSR1
    1142          try:
    1143              # unblock the pending signal calls immediately the signal handler
    1144              signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
    1145          except ZeroDivisionError:
    1146              pass
    1147          else:
    1148              raise Exception("ZeroDivisionError not raised")
    1149          try:
    1150              kill(signum)
    1151          except ZeroDivisionError:
    1152              pass
    1153          else:
    1154              raise Exception("ZeroDivisionError not raised")
    1155  
    1156          # Check the new mask
    1157          unblocked = read_sigmask()
    1158          if signum in unblocked:
    1159              raise Exception("%s in %s" % (signum, unblocked))
    1160          if blocked ^ unblocked != {signum}:
    1161              raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
    1162          if old_mask != unblocked:
    1163              raise Exception("%s != %s" % (old_mask, unblocked))
    1164          """
    1165          assert_python_ok('-c', code)
    1166  
    1167      @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
    1168                           'need signal.pthread_kill()')
    1169      @threading_helper.requires_working_threading()
    1170      def test_pthread_kill_main_thread(self):
    1171          # Test that a signal can be sent to the main thread with pthread_kill()
    1172          # before any other thread has been created (see issue #12392).
    1173          code = """if True:
    1174              import threading
    1175              import signal
    1176              import sys
    1177  
    1178              def handler(signum, frame):
    1179                  sys.exit(3)
    1180  
    1181              signal.signal(signal.SIGUSR1, handler)
    1182              signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
    1183              sys.exit(2)
    1184          """
    1185  
    1186          with spawn_python('-c', code) as process:
    1187              stdout, stderr = process.communicate()
    1188              exitcode = process.wait()
    1189              if exitcode != 3:
    1190                  raise Exception("Child error (exit code %s): %s" %
    1191                                  (exitcode, stdout))
    1192  
    1193  
    1194  class ESC[4;38;5;81mStressTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1195      """
    1196      Stress signal delivery, especially when a signal arrives in
    1197      the middle of recomputing the signal state or executing
    1198      previously tripped signal handlers.
    1199      """
    1200  
    1201      def setsig(self, signum, handler):
    1202          old_handler = signal.signal(signum, handler)
    1203          self.addCleanup(signal.signal, signum, old_handler)
    1204  
    1205      def measure_itimer_resolution(self):
    1206          N = 20
    1207          times = []
    1208  
    1209          def handler(signum=None, frame=None):
    1210              if len(times) < N:
    1211                  times.append(time.perf_counter())
    1212                  # 1 µs is the smallest possible timer interval,
    1213                  # we want to measure what the concrete duration
    1214                  # will be on this platform
    1215                  signal.setitimer(signal.ITIMER_REAL, 1e-6)
    1216  
    1217          self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
    1218          self.setsig(signal.SIGALRM, handler)
    1219          handler()
    1220          while len(times) < N:
    1221              time.sleep(1e-3)
    1222  
    1223          durations = [times[i+1] - times[i] for i in range(len(times) - 1)]
    1224          med = statistics.median(durations)
    1225          if support.verbose:
    1226              print("detected median itimer() resolution: %.6f s." % (med,))
    1227          return med
    1228  
    1229      def decide_itimer_count(self):
    1230          # Some systems have poor setitimer() resolution (for example
    1231          # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
    1232          # number of sequential timers based on that.
    1233          reso = self.measure_itimer_resolution()
    1234          if reso <= 1e-4:
    1235              return 10000
    1236          elif reso <= 1e-2:
    1237              return 100
    1238          else:
    1239              self.skipTest("detected itimer resolution (%.3f s.) too high "
    1240                            "(> 10 ms.) on this platform (or system too busy)"
    1241                            % (reso,))
    1242  
    1243      @unittest.skipUnless(hasattr(signal, "setitimer"),
    1244                           "test needs setitimer()")
    1245      def test_stress_delivery_dependent(self):
    1246          """
    1247          This test uses dependent signal handlers.
    1248          """
    1249          N = self.decide_itimer_count()
    1250          sigs = []
    1251  
    1252          def first_handler(signum, frame):
    1253              # 1e-6 is the minimum non-zero value for `setitimer()`.
    1254              # Choose a random delay so as to improve chances of
    1255              # triggering a race condition.  Ideally the signal is received
    1256              # when inside critical signal-handling routines such as
    1257              # Py_MakePendingCalls().
    1258              signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
    1259  
    1260          def second_handler(signum=None, frame=None):
    1261              sigs.append(signum)
    1262  
    1263          # Here on Linux, SIGPROF > SIGALRM > SIGUSR1.  By using both
    1264          # ascending and descending sequences (SIGUSR1 then SIGALRM,
    1265          # SIGPROF then SIGALRM), we maximize chances of hitting a bug.
    1266          self.setsig(signal.SIGPROF, first_handler)
    1267          self.setsig(signal.SIGUSR1, first_handler)
    1268          self.setsig(signal.SIGALRM, second_handler)  # for ITIMER_REAL
    1269  
    1270          expected_sigs = 0
    1271          deadline = time.monotonic() + support.SHORT_TIMEOUT
    1272  
    1273          while expected_sigs < N:
    1274              os.kill(os.getpid(), signal.SIGPROF)
    1275              expected_sigs += 1
    1276              # Wait for handlers to run to avoid signal coalescing
    1277              while len(sigs) < expected_sigs and time.monotonic() < deadline:
    1278                  time.sleep(1e-5)
    1279  
    1280              os.kill(os.getpid(), signal.SIGUSR1)
    1281              expected_sigs += 1
    1282              while len(sigs) < expected_sigs and time.monotonic() < deadline:
    1283                  time.sleep(1e-5)
    1284  
    1285          # All ITIMER_REAL signals should have been delivered to the
    1286          # Python handler
    1287          self.assertEqual(len(sigs), N, "Some signals were lost")
    1288  
    1289      @unittest.skipUnless(hasattr(signal, "setitimer"),
    1290                           "test needs setitimer()")
    1291      def test_stress_delivery_simultaneous(self):
    1292          """
    1293          This test uses simultaneous signal handlers.
    1294          """
    1295          N = self.decide_itimer_count()
    1296          sigs = []
    1297  
    1298          def handler(signum, frame):
    1299              sigs.append(signum)
    1300  
    1301          self.setsig(signal.SIGUSR1, handler)
    1302          self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL
    1303  
    1304          expected_sigs = 0
    1305          while expected_sigs < N:
    1306              # Hopefully the SIGALRM will be received somewhere during
    1307              # initial processing of SIGUSR1.
    1308              signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
    1309              os.kill(os.getpid(), signal.SIGUSR1)
    1310  
    1311              expected_sigs += 2
    1312              # Wait for handlers to run to avoid signal coalescing
    1313              for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
    1314                  if len(sigs) >= expected_sigs:
    1315                      break
    1316  
    1317          # All ITIMER_REAL signals should have been delivered to the
    1318          # Python handler
    1319          self.assertEqual(len(sigs), N, "Some signals were lost")
    1320  
    1321      @unittest.skipUnless(hasattr(signal, "SIGUSR1"),
    1322                           "test needs SIGUSR1")
    1323      @threading_helper.requires_working_threading()
    1324      def test_stress_modifying_handlers(self):
    1325          # bpo-43406: race condition between trip_signal() and signal.signal
    1326          signum = signal.SIGUSR1
    1327          num_sent_signals = 0
    1328          num_received_signals = 0
    1329          do_stop = False
    1330  
    1331          def custom_handler(signum, frame):
    1332              nonlocal num_received_signals
    1333              num_received_signals += 1
    1334  
    1335          def set_interrupts():
    1336              nonlocal num_sent_signals
    1337              while not do_stop:
    1338                  signal.raise_signal(signum)
    1339                  num_sent_signals += 1
    1340  
    1341          def cycle_handlers():
    1342              while num_sent_signals < 100:
    1343                  for i in range(20000):
    1344                      # Cycle between a Python-defined and a non-Python handler
    1345                      for handler in [custom_handler, signal.SIG_IGN]:
    1346                          signal.signal(signum, handler)
    1347  
    1348          old_handler = signal.signal(signum, custom_handler)
    1349          self.addCleanup(signal.signal, signum, old_handler)
    1350  
    1351          t = threading.Thread(target=set_interrupts)
    1352          try:
    1353              ignored = False
    1354              with support.catch_unraisable_exception() as cm:
    1355                  t.start()
    1356                  cycle_handlers()
    1357                  do_stop = True
    1358                  t.join()
    1359  
    1360                  if cm.unraisable is not None:
    1361                      # An unraisable exception may be printed out when
    1362                      # a signal is ignored due to the aforementioned
    1363                      # race condition, check it.
    1364                      self.assertIsInstance(cm.unraisable.exc_value, OSError)
    1365                      self.assertIn(
    1366                          f"Signal {signum:d} ignored due to race condition",
    1367                          str(cm.unraisable.exc_value))
    1368                      ignored = True
    1369  
    1370              # bpo-43406: Even if it is unlikely, it's technically possible that
    1371              # all signals were ignored because of race conditions.
    1372              if not ignored:
    1373                  # Sanity check that some signals were received, but not all
    1374                  self.assertGreater(num_received_signals, 0)
    1375              self.assertLess(num_received_signals, num_sent_signals)
    1376          finally:
    1377              do_stop = True
    1378              t.join()
    1379  
    1380  
    1381  class ESC[4;38;5;81mRaiseSignalTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1382  
    1383      def test_sigint(self):
    1384          with self.assertRaises(KeyboardInterrupt):
    1385              signal.raise_signal(signal.SIGINT)
    1386  
    1387      @unittest.skipIf(sys.platform != "win32", "Windows specific test")
    1388      def test_invalid_argument(self):
    1389          try:
    1390              SIGHUP = 1 # not supported on win32
    1391              signal.raise_signal(SIGHUP)
    1392              self.fail("OSError (Invalid argument) expected")
    1393          except OSError as e:
    1394              if e.errno == errno.EINVAL:
    1395                  pass
    1396              else:
    1397                  raise
    1398  
    1399      def test_handler(self):
    1400          is_ok = False
    1401          def handler(a, b):
    1402              nonlocal is_ok
    1403              is_ok = True
    1404          old_signal = signal.signal(signal.SIGINT, handler)
    1405          self.addCleanup(signal.signal, signal.SIGINT, old_signal)
    1406  
    1407          signal.raise_signal(signal.SIGINT)
    1408          self.assertTrue(is_ok)
    1409  
    1410      def test__thread_interrupt_main(self):
    1411          # See https://github.com/python/cpython/issues/102397
    1412          code = """if 1:
    1413          import _thread
    1414          class Foo():
    1415              def __del__(self):
    1416                  _thread.interrupt_main()
    1417  
    1418          x = Foo()
    1419          """
    1420  
    1421          rc, out, err = assert_python_ok('-c', code)
    1422          self.assertIn(b'OSError: Signal 2 ignored due to race condition', err)
    1423  
    1424  
    1425  
    1426  class ESC[4;38;5;81mPidfdSignalTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1427  
    1428      @unittest.skipUnless(
    1429          hasattr(signal, "pidfd_send_signal"),
    1430          "pidfd support not built in",
    1431      )
    1432      def test_pidfd_send_signal(self):
    1433          with self.assertRaises(OSError) as cm:
    1434              signal.pidfd_send_signal(0, signal.SIGINT)
    1435          if cm.exception.errno == errno.ENOSYS:
    1436              self.skipTest("kernel does not support pidfds")
    1437          elif cm.exception.errno == errno.EPERM:
    1438              self.skipTest("Not enough privileges to use pidfs")
    1439          self.assertEqual(cm.exception.errno, errno.EBADF)
    1440          my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY)
    1441          self.addCleanup(os.close, my_pidfd)
    1442          with self.assertRaisesRegex(TypeError, "^siginfo must be None$"):
    1443              signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0)
    1444          with self.assertRaises(KeyboardInterrupt):
    1445              signal.pidfd_send_signal(my_pidfd, signal.SIGINT)
    1446  
    1447  def tearDownModule():
    1448      support.reap_children()
    1449  
    1450  if __name__ == "__main__":
    1451      unittest.main()