python (3.11.7)

(root)/
lib/
python3.11/
unittest/
test/
test_break.py
       1  import gc
       2  import io
       3  import os
       4  import sys
       5  import signal
       6  import weakref
       7  import unittest
       8  
       9  from test import support
      10  
      11  
      12  @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
      13  @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
      14  class ESC[4;38;5;81mTestBreak(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      15      int_handler = None
      16      # This number was smart-guessed, previously tests were failing
      17      # after 7th run. So, we take `x * 2 + 1` to be sure.
      18      default_repeats = 15
      19  
      20      def setUp(self):
      21          self._default_handler = signal.getsignal(signal.SIGINT)
      22          if self.int_handler is not None:
      23              signal.signal(signal.SIGINT, self.int_handler)
      24  
      25      def tearDown(self):
      26          signal.signal(signal.SIGINT, self._default_handler)
      27          unittest.signals._results = weakref.WeakKeyDictionary()
      28          unittest.signals._interrupt_handler = None
      29  
      30  
      31      def withRepeats(self, test_function, repeats=None):
      32          if not support.check_impl_detail(cpython=True):
      33              # Override repeats count on non-cpython to execute only once.
      34              # Because this test only makes sense to be repeated on CPython.
      35              repeats = 1
      36          elif repeats is None:
      37              repeats = self.default_repeats
      38  
      39          for repeat in range(repeats):
      40              with self.subTest(repeat=repeat):
      41                  # We don't run `setUp` for the very first repeat
      42                  # and we don't run `tearDown` for the very last one,
      43                  # because they are handled by the test class itself.
      44                  if repeat != 0:
      45                      self.setUp()
      46                  try:
      47                      test_function()
      48                  finally:
      49                      if repeat != repeats - 1:
      50                          self.tearDown()
      51  
      52      def testInstallHandler(self):
      53          default_handler = signal.getsignal(signal.SIGINT)
      54          unittest.installHandler()
      55          self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
      56  
      57          try:
      58              pid = os.getpid()
      59              os.kill(pid, signal.SIGINT)
      60          except KeyboardInterrupt:
      61              self.fail("KeyboardInterrupt not handled")
      62  
      63          self.assertTrue(unittest.signals._interrupt_handler.called)
      64  
      65      def testRegisterResult(self):
      66          result = unittest.TestResult()
      67          self.assertNotIn(result, unittest.signals._results)
      68  
      69          unittest.registerResult(result)
      70          try:
      71              self.assertIn(result, unittest.signals._results)
      72          finally:
      73              unittest.removeResult(result)
      74  
      75      def testInterruptCaught(self):
      76          def test(result):
      77              pid = os.getpid()
      78              os.kill(pid, signal.SIGINT)
      79              result.breakCaught = True
      80              self.assertTrue(result.shouldStop)
      81  
      82          def test_function():
      83              result = unittest.TestResult()
      84              unittest.installHandler()
      85              unittest.registerResult(result)
      86  
      87              self.assertNotEqual(
      88                  signal.getsignal(signal.SIGINT),
      89                  self._default_handler,
      90              )
      91  
      92              try:
      93                  test(result)
      94              except KeyboardInterrupt:
      95                  self.fail("KeyboardInterrupt not handled")
      96              self.assertTrue(result.breakCaught)
      97          self.withRepeats(test_function)
      98  
      99      def testSecondInterrupt(self):
     100          # Can't use skipIf decorator because the signal handler may have
     101          # been changed after defining this method.
     102          if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
     103              self.skipTest("test requires SIGINT to not be ignored")
     104  
     105          def test(result):
     106              pid = os.getpid()
     107              os.kill(pid, signal.SIGINT)
     108              result.breakCaught = True
     109              self.assertTrue(result.shouldStop)
     110              os.kill(pid, signal.SIGINT)
     111              self.fail("Second KeyboardInterrupt not raised")
     112  
     113          def test_function():
     114              result = unittest.TestResult()
     115              unittest.installHandler()
     116              unittest.registerResult(result)
     117  
     118              with self.assertRaises(KeyboardInterrupt):
     119                  test(result)
     120              self.assertTrue(result.breakCaught)
     121          self.withRepeats(test_function)
     122  
     123  
     124      def testTwoResults(self):
     125          def test_function():
     126              unittest.installHandler()
     127  
     128              result = unittest.TestResult()
     129              unittest.registerResult(result)
     130              new_handler = signal.getsignal(signal.SIGINT)
     131  
     132              result2 = unittest.TestResult()
     133              unittest.registerResult(result2)
     134              self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
     135  
     136              result3 = unittest.TestResult()
     137  
     138              try:
     139                  os.kill(os.getpid(), signal.SIGINT)
     140              except KeyboardInterrupt:
     141                  self.fail("KeyboardInterrupt not handled")
     142  
     143              self.assertTrue(result.shouldStop)
     144              self.assertTrue(result2.shouldStop)
     145              self.assertFalse(result3.shouldStop)
     146          self.withRepeats(test_function)
     147  
     148  
     149      def testHandlerReplacedButCalled(self):
     150          # Can't use skipIf decorator because the signal handler may have
     151          # been changed after defining this method.
     152          if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
     153              self.skipTest("test requires SIGINT to not be ignored")
     154  
     155          def test_function():
     156              # If our handler has been replaced (is no longer installed) but is
     157              # called by the *new* handler, then it isn't safe to delay the
     158              # SIGINT and we should immediately delegate to the default handler
     159              unittest.installHandler()
     160  
     161              handler = signal.getsignal(signal.SIGINT)
     162              def new_handler(frame, signum):
     163                  handler(frame, signum)
     164              signal.signal(signal.SIGINT, new_handler)
     165  
     166              try:
     167                  os.kill(os.getpid(), signal.SIGINT)
     168              except KeyboardInterrupt:
     169                  pass
     170              else:
     171                  self.fail("replaced but delegated handler doesn't raise interrupt")
     172          self.withRepeats(test_function)
     173  
     174      def testRunner(self):
     175          # Creating a TextTestRunner with the appropriate argument should
     176          # register the TextTestResult it creates
     177          runner = unittest.TextTestRunner(stream=io.StringIO())
     178  
     179          result = runner.run(unittest.TestSuite())
     180          self.assertIn(result, unittest.signals._results)
     181  
     182      def testWeakReferences(self):
     183          # Calling registerResult on a result should not keep it alive
     184          result = unittest.TestResult()
     185          unittest.registerResult(result)
     186  
     187          ref = weakref.ref(result)
     188          del result
     189  
     190          # For non-reference counting implementations
     191          gc.collect();gc.collect()
     192          self.assertIsNone(ref())
     193  
     194  
     195      def testRemoveResult(self):
     196          result = unittest.TestResult()
     197          unittest.registerResult(result)
     198  
     199          unittest.installHandler()
     200          self.assertTrue(unittest.removeResult(result))
     201  
     202          # Should this raise an error instead?
     203          self.assertFalse(unittest.removeResult(unittest.TestResult()))
     204  
     205          try:
     206              pid = os.getpid()
     207              os.kill(pid, signal.SIGINT)
     208          except KeyboardInterrupt:
     209              pass
     210  
     211          self.assertFalse(result.shouldStop)
     212  
     213      def testMainInstallsHandler(self):
     214          failfast = object()
     215          test = object()
     216          verbosity = object()
     217          result = object()
     218          default_handler = signal.getsignal(signal.SIGINT)
     219  
     220          class ESC[4;38;5;81mFakeRunner(ESC[4;38;5;149mobject):
     221              initArgs = []
     222              runArgs = []
     223              def __init__(self, *args, **kwargs):
     224                  self.initArgs.append((args, kwargs))
     225              def run(self, test):
     226                  self.runArgs.append(test)
     227                  return result
     228  
     229          class ESC[4;38;5;81mProgram(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestProgram):
     230              def __init__(self, catchbreak):
     231                  self.exit = False
     232                  self.verbosity = verbosity
     233                  self.failfast = failfast
     234                  self.catchbreak = catchbreak
     235                  self.tb_locals = False
     236                  self.testRunner = FakeRunner
     237                  self.test = test
     238                  self.result = None
     239  
     240          p = Program(False)
     241          p.runTests()
     242  
     243          self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
     244                                                       'verbosity': verbosity,
     245                                                       'failfast': failfast,
     246                                                       'tb_locals': False,
     247                                                       'warnings': None})])
     248          self.assertEqual(FakeRunner.runArgs, [test])
     249          self.assertEqual(p.result, result)
     250  
     251          self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
     252  
     253          FakeRunner.initArgs = []
     254          FakeRunner.runArgs = []
     255          p = Program(True)
     256          p.runTests()
     257  
     258          self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
     259                                                       'verbosity': verbosity,
     260                                                       'failfast': failfast,
     261                                                       'tb_locals': False,
     262                                                       'warnings': None})])
     263          self.assertEqual(FakeRunner.runArgs, [test])
     264          self.assertEqual(p.result, result)
     265  
     266          self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
     267  
     268      def testRemoveHandler(self):
     269          default_handler = signal.getsignal(signal.SIGINT)
     270          unittest.installHandler()
     271          unittest.removeHandler()
     272          self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
     273  
     274          # check that calling removeHandler multiple times has no ill-effect
     275          unittest.removeHandler()
     276          self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
     277  
     278      def testRemoveHandlerAsDecorator(self):
     279          default_handler = signal.getsignal(signal.SIGINT)
     280          unittest.installHandler()
     281  
     282          @unittest.removeHandler
     283          def test():
     284              self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
     285  
     286          test()
     287          self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
     288  
     289  @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
     290  @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
     291  class ESC[4;38;5;81mTestBreakDefaultIntHandler(ESC[4;38;5;149mTestBreak):
     292      int_handler = signal.default_int_handler
     293  
     294  @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
     295  @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
     296  class ESC[4;38;5;81mTestBreakSignalIgnored(ESC[4;38;5;149mTestBreak):
     297      int_handler = signal.SIG_IGN
     298  
     299  @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
     300  @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
     301  class ESC[4;38;5;81mTestBreakSignalDefault(ESC[4;38;5;149mTestBreak):
     302      int_handler = signal.SIG_DFL
     303  
     304  
     305  if __name__ == "__main__":
     306      unittest.main()