python (3.11.7)

(root)/
lib/
python3.11/
test/
signalinterproctester.py
       1  import gc
       2  import os
       3  import signal
       4  import subprocess
       5  import sys
       6  import time
       7  import unittest
       8  from test import support
       9  
      10  
      11  class ESC[4;38;5;81mSIGUSR1Exception(ESC[4;38;5;149mException):
      12      pass
      13  
      14  
      15  class ESC[4;38;5;81mInterProcessSignalTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      16      def setUp(self):
      17          self.got_signals = {'SIGHUP': 0, 'SIGUSR1': 0, 'SIGALRM': 0}
      18  
      19      def sighup_handler(self, signum, frame):
      20          self.got_signals['SIGHUP'] += 1
      21  
      22      def sigusr1_handler(self, signum, frame):
      23          self.got_signals['SIGUSR1'] += 1
      24          raise SIGUSR1Exception
      25  
      26      def wait_signal(self, child, signame):
      27          if child is not None:
      28              # This wait should be interrupted by exc_class
      29              # (if set)
      30              child.wait()
      31  
      32          timeout = support.SHORT_TIMEOUT
      33          deadline = time.monotonic() + timeout
      34  
      35          while time.monotonic() < deadline:
      36              if self.got_signals[signame]:
      37                  return
      38              signal.pause()
      39  
      40          self.fail('signal %s not received after %s seconds'
      41                    % (signame, timeout))
      42  
      43      def subprocess_send_signal(self, pid, signame):
      44          code = 'import os, signal; os.kill(%s, signal.%s)' % (pid, signame)
      45          args = [sys.executable, '-I', '-c', code]
      46          return subprocess.Popen(args)
      47  
      48      def test_interprocess_signal(self):
      49          # Install handlers. This function runs in a sub-process, so we
      50          # don't worry about re-setting the default handlers.
      51          signal.signal(signal.SIGHUP, self.sighup_handler)
      52          signal.signal(signal.SIGUSR1, self.sigusr1_handler)
      53          signal.signal(signal.SIGUSR2, signal.SIG_IGN)
      54          signal.signal(signal.SIGALRM, signal.default_int_handler)
      55  
      56          # Let the sub-processes know who to send signals to.
      57          pid = str(os.getpid())
      58  
      59          with self.subprocess_send_signal(pid, "SIGHUP") as child:
      60              self.wait_signal(child, 'SIGHUP')
      61          self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 0,
      62                                              'SIGALRM': 0})
      63  
      64          # gh-110033: Make sure that the subprocess.Popen is deleted before
      65          # the next test which raises an exception. Otherwise, the exception
      66          # may be raised when Popen.__del__() is executed and so be logged
      67          # as "Exception ignored in: <function Popen.__del__ at ...>".
      68          child = None
      69          gc.collect()
      70  
      71          with self.assertRaises(SIGUSR1Exception):
      72              with self.subprocess_send_signal(pid, "SIGUSR1") as child:
      73                  self.wait_signal(child, 'SIGUSR1')
      74          self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 1,
      75                                              'SIGALRM': 0})
      76  
      77          with self.subprocess_send_signal(pid, "SIGUSR2") as child:
      78              # Nothing should happen: SIGUSR2 is ignored
      79              child.wait()
      80  
      81          try:
      82              with self.assertRaises(KeyboardInterrupt):
      83                  signal.alarm(1)
      84                  self.wait_signal(None, 'SIGALRM')
      85              self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 1,
      86                                                  'SIGALRM': 0})
      87          finally:
      88              signal.alarm(0)
      89  
      90  
# Run the tests through unittest's command-line entry point when this
# module is executed as a script.
if __name__ == "__main__":
    unittest.main()