(root)/
Python-3.11.7/
Lib/
test/
test_threadsignals.py
       1  """PyUnit testing that threads honor our signal semantics"""
       2  
       3  import unittest
       4  import signal
       5  import os
       6  import sys
       7  from test.support import threading_helper
       8  import _thread as thread
       9  import time
      10  
      11  if (sys.platform[:3] == 'win'):
      12      raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
      13  
      14  process_pid = os.getpid()
      15  signalled_all=thread.allocate_lock()
      16  
      17  USING_PTHREAD_COND = (sys.thread_info.name == 'pthread'
      18                        and sys.thread_info.lock == 'mutex+cond')
      19  
      20  def registerSignals(for_usr1, for_usr2, for_alrm):
      21      usr1 = signal.signal(signal.SIGUSR1, for_usr1)
      22      usr2 = signal.signal(signal.SIGUSR2, for_usr2)
      23      alrm = signal.signal(signal.SIGALRM, for_alrm)
      24      return usr1, usr2, alrm
      25  
      26  
      27  # The signal handler. Just note that the signal occurred and
      28  # from who.
      29  def handle_signals(sig,frame):
      30      signal_blackboard[sig]['tripped'] += 1
      31      signal_blackboard[sig]['tripped_by'] = thread.get_ident()
      32  
      33  # a function that will be spawned as a separate thread.
      34  def send_signals():
      35      os.kill(process_pid, signal.SIGUSR1)
      36      os.kill(process_pid, signal.SIGUSR2)
      37      signalled_all.release()
      38  
      39  
      40  @threading_helper.requires_working_threading()
      41  @unittest.skipUnless(hasattr(signal, "alarm"), "test requires signal.alarm")
      42  class ESC[4;38;5;81mThreadSignals(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      43  
      44      def test_signals(self):
      45          with threading_helper.wait_threads_exit():
      46              # Test signal handling semantics of threads.
      47              # We spawn a thread, have the thread send two signals, and
      48              # wait for it to finish. Check that we got both signals
      49              # and that they were run by the main thread.
      50              signalled_all.acquire()
      51              self.spawnSignallingThread()
      52              signalled_all.acquire()
      53  
      54          # the signals that we asked the kernel to send
      55          # will come back, but we don't know when.
      56          # (it might even be after the thread exits
      57          # and might be out of order.)  If we haven't seen
      58          # the signals yet, send yet another signal and
      59          # wait for it return.
      60          if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
      61             or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
      62              try:
      63                  signal.alarm(1)
      64                  signal.pause()
      65              finally:
      66                  signal.alarm(0)
      67  
      68          self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
      69          self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
      70                             thread.get_ident())
      71          self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
      72          self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
      73                             thread.get_ident())
      74          signalled_all.release()
      75  
      76      def spawnSignallingThread(self):
      77          thread.start_new_thread(send_signals, ())
      78  
      79      def alarm_interrupt(self, sig, frame):
      80          raise KeyboardInterrupt
      81  
      82      @unittest.skipIf(USING_PTHREAD_COND,
      83                       'POSIX condition variables cannot be interrupted')
      84      @unittest.skipIf(sys.platform.startswith('linux') and
      85                       not sys.thread_info.version,
      86                       'Issue 34004: musl does not allow interruption of locks '
      87                       'by signals.')
      88      # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
      89      @unittest.skipIf(sys.platform.startswith('openbsd'),
      90                       'lock cannot be interrupted on OpenBSD')
      91      def test_lock_acquire_interruption(self):
      92          # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
      93          # in a deadlock.
      94          # XXX this test can fail when the legacy (non-semaphore) implementation
      95          # of locks is used in thread_pthread.h, see issue #11223.
      96          oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
      97          try:
      98              lock = thread.allocate_lock()
      99              lock.acquire()
     100              signal.alarm(1)
     101              t1 = time.monotonic()
     102              self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5)
     103              dt = time.monotonic() - t1
     104              # Checking that KeyboardInterrupt was raised is not sufficient.
     105              # We want to assert that lock.acquire() was interrupted because
     106              # of the signal, not that the signal handler was called immediately
     107              # after timeout return of lock.acquire() (which can fool assertRaises).
     108              self.assertLess(dt, 3.0)
     109          finally:
     110              signal.alarm(0)
     111              signal.signal(signal.SIGALRM, oldalrm)
     112  
     113      @unittest.skipIf(USING_PTHREAD_COND,
     114                       'POSIX condition variables cannot be interrupted')
     115      @unittest.skipIf(sys.platform.startswith('linux') and
     116                       not sys.thread_info.version,
     117                       'Issue 34004: musl does not allow interruption of locks '
     118                       'by signals.')
     119      # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
     120      @unittest.skipIf(sys.platform.startswith('openbsd'),
     121                       'lock cannot be interrupted on OpenBSD')
     122      def test_rlock_acquire_interruption(self):
     123          # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
     124          # in a deadlock.
     125          # XXX this test can fail when the legacy (non-semaphore) implementation
     126          # of locks is used in thread_pthread.h, see issue #11223.
     127          oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
     128          try:
     129              rlock = thread.RLock()
     130              # For reentrant locks, the initial acquisition must be in another
     131              # thread.
     132              def other_thread():
     133                  rlock.acquire()
     134  
     135              with threading_helper.wait_threads_exit():
     136                  thread.start_new_thread(other_thread, ())
     137                  # Wait until we can't acquire it without blocking...
     138                  while rlock.acquire(blocking=False):
     139                      rlock.release()
     140                      time.sleep(0.01)
     141                  signal.alarm(1)
     142                  t1 = time.monotonic()
     143                  self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
     144                  dt = time.monotonic() - t1
     145                  # See rationale above in test_lock_acquire_interruption
     146                  self.assertLess(dt, 3.0)
     147          finally:
     148              signal.alarm(0)
     149              signal.signal(signal.SIGALRM, oldalrm)
     150  
     151      def acquire_retries_on_intr(self, lock):
     152          self.sig_recvd = False
     153          def my_handler(signal, frame):
     154              self.sig_recvd = True
     155  
     156          old_handler = signal.signal(signal.SIGUSR1, my_handler)
     157          try:
     158              def other_thread():
     159                  # Acquire the lock in a non-main thread, so this test works for
     160                  # RLocks.
     161                  lock.acquire()
     162                  # Wait until the main thread is blocked in the lock acquire, and
     163                  # then wake it up with this.
     164                  time.sleep(0.5)
     165                  os.kill(process_pid, signal.SIGUSR1)
     166                  # Let the main thread take the interrupt, handle it, and retry
     167                  # the lock acquisition.  Then we'll let it run.
     168                  time.sleep(0.5)
     169                  lock.release()
     170  
     171              with threading_helper.wait_threads_exit():
     172                  thread.start_new_thread(other_thread, ())
     173                  # Wait until we can't acquire it without blocking...
     174                  while lock.acquire(blocking=False):
     175                      lock.release()
     176                      time.sleep(0.01)
     177                  result = lock.acquire()  # Block while we receive a signal.
     178                  self.assertTrue(self.sig_recvd)
     179                  self.assertTrue(result)
     180          finally:
     181              signal.signal(signal.SIGUSR1, old_handler)
     182  
     183      def test_lock_acquire_retries_on_intr(self):
     184          self.acquire_retries_on_intr(thread.allocate_lock())
     185  
     186      def test_rlock_acquire_retries_on_intr(self):
     187          self.acquire_retries_on_intr(thread.RLock())
     188  
     189      def test_interrupted_timed_acquire(self):
     190          # Test to make sure we recompute lock acquisition timeouts when we
     191          # receive a signal.  Check this by repeatedly interrupting a lock
     192          # acquire in the main thread, and make sure that the lock acquire times
     193          # out after the right amount of time.
     194          # NOTE: this test only behaves as expected if C signals get delivered
     195          # to the main thread.  Otherwise lock.acquire() itself doesn't get
     196          # interrupted and the test trivially succeeds.
     197          self.start = None
     198          self.end = None
     199          self.sigs_recvd = 0
     200          done = thread.allocate_lock()
     201          done.acquire()
     202          lock = thread.allocate_lock()
     203          lock.acquire()
     204          def my_handler(signum, frame):
     205              self.sigs_recvd += 1
     206          old_handler = signal.signal(signal.SIGUSR1, my_handler)
     207          try:
     208              def timed_acquire():
     209                  self.start = time.monotonic()
     210                  lock.acquire(timeout=0.5)
     211                  self.end = time.monotonic()
     212              def send_signals():
     213                  for _ in range(40):
     214                      time.sleep(0.02)
     215                      os.kill(process_pid, signal.SIGUSR1)
     216                  done.release()
     217  
     218              with threading_helper.wait_threads_exit():
     219                  # Send the signals from the non-main thread, since the main thread
     220                  # is the only one that can process signals.
     221                  thread.start_new_thread(send_signals, ())
     222                  timed_acquire()
     223                  # Wait for thread to finish
     224                  done.acquire()
     225                  # This allows for some timing and scheduling imprecision
     226                  self.assertLess(self.end - self.start, 2.0)
     227                  self.assertGreater(self.end - self.start, 0.3)
     228                  # If the signal is received several times before PyErr_CheckSignals()
     229                  # is called, the handler will get called less than 40 times. Just
     230                  # check it's been called at least once.
     231                  self.assertGreater(self.sigs_recvd, 0)
     232          finally:
     233              signal.signal(signal.SIGUSR1, old_handler)
     234  
     235  
     236  def setUpModule():
     237      global signal_blackboard
     238  
     239      signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
     240                            signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
     241                            signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }
     242  
     243      oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
     244      unittest.addModuleCleanup(registerSignals, *oldsigs)
     245  
     246  
     247  if __name__ == '__main__':
     248      unittest.main()