python (3.11.7)
1 import gc
2 import io
3 import os
4 import sys
5 import signal
6 import weakref
7 import unittest
8
9 from test import support
10
11
12 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
13 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
14 class ESC[4;38;5;81mTestBreak(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
15 int_handler = None
16 # This number was smart-guessed, previously tests were failing
17 # after 7th run. So, we take `x * 2 + 1` to be sure.
18 default_repeats = 15
19
20 def setUp(self):
21 self._default_handler = signal.getsignal(signal.SIGINT)
22 if self.int_handler is not None:
23 signal.signal(signal.SIGINT, self.int_handler)
24
25 def tearDown(self):
26 signal.signal(signal.SIGINT, self._default_handler)
27 unittest.signals._results = weakref.WeakKeyDictionary()
28 unittest.signals._interrupt_handler = None
29
30
31 def withRepeats(self, test_function, repeats=None):
32 if not support.check_impl_detail(cpython=True):
33 # Override repeats count on non-cpython to execute only once.
34 # Because this test only makes sense to be repeated on CPython.
35 repeats = 1
36 elif repeats is None:
37 repeats = self.default_repeats
38
39 for repeat in range(repeats):
40 with self.subTest(repeat=repeat):
41 # We don't run `setUp` for the very first repeat
42 # and we don't run `tearDown` for the very last one,
43 # because they are handled by the test class itself.
44 if repeat != 0:
45 self.setUp()
46 try:
47 test_function()
48 finally:
49 if repeat != repeats - 1:
50 self.tearDown()
51
52 def testInstallHandler(self):
53 default_handler = signal.getsignal(signal.SIGINT)
54 unittest.installHandler()
55 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
56
57 try:
58 pid = os.getpid()
59 os.kill(pid, signal.SIGINT)
60 except KeyboardInterrupt:
61 self.fail("KeyboardInterrupt not handled")
62
63 self.assertTrue(unittest.signals._interrupt_handler.called)
64
65 def testRegisterResult(self):
66 result = unittest.TestResult()
67 self.assertNotIn(result, unittest.signals._results)
68
69 unittest.registerResult(result)
70 try:
71 self.assertIn(result, unittest.signals._results)
72 finally:
73 unittest.removeResult(result)
74
75 def testInterruptCaught(self):
76 def test(result):
77 pid = os.getpid()
78 os.kill(pid, signal.SIGINT)
79 result.breakCaught = True
80 self.assertTrue(result.shouldStop)
81
82 def test_function():
83 result = unittest.TestResult()
84 unittest.installHandler()
85 unittest.registerResult(result)
86
87 self.assertNotEqual(
88 signal.getsignal(signal.SIGINT),
89 self._default_handler,
90 )
91
92 try:
93 test(result)
94 except KeyboardInterrupt:
95 self.fail("KeyboardInterrupt not handled")
96 self.assertTrue(result.breakCaught)
97 self.withRepeats(test_function)
98
99 def testSecondInterrupt(self):
100 # Can't use skipIf decorator because the signal handler may have
101 # been changed after defining this method.
102 if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
103 self.skipTest("test requires SIGINT to not be ignored")
104
105 def test(result):
106 pid = os.getpid()
107 os.kill(pid, signal.SIGINT)
108 result.breakCaught = True
109 self.assertTrue(result.shouldStop)
110 os.kill(pid, signal.SIGINT)
111 self.fail("Second KeyboardInterrupt not raised")
112
113 def test_function():
114 result = unittest.TestResult()
115 unittest.installHandler()
116 unittest.registerResult(result)
117
118 with self.assertRaises(KeyboardInterrupt):
119 test(result)
120 self.assertTrue(result.breakCaught)
121 self.withRepeats(test_function)
122
123
124 def testTwoResults(self):
125 def test_function():
126 unittest.installHandler()
127
128 result = unittest.TestResult()
129 unittest.registerResult(result)
130 new_handler = signal.getsignal(signal.SIGINT)
131
132 result2 = unittest.TestResult()
133 unittest.registerResult(result2)
134 self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
135
136 result3 = unittest.TestResult()
137
138 try:
139 os.kill(os.getpid(), signal.SIGINT)
140 except KeyboardInterrupt:
141 self.fail("KeyboardInterrupt not handled")
142
143 self.assertTrue(result.shouldStop)
144 self.assertTrue(result2.shouldStop)
145 self.assertFalse(result3.shouldStop)
146 self.withRepeats(test_function)
147
148
149 def testHandlerReplacedButCalled(self):
150 # Can't use skipIf decorator because the signal handler may have
151 # been changed after defining this method.
152 if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
153 self.skipTest("test requires SIGINT to not be ignored")
154
155 def test_function():
156 # If our handler has been replaced (is no longer installed) but is
157 # called by the *new* handler, then it isn't safe to delay the
158 # SIGINT and we should immediately delegate to the default handler
159 unittest.installHandler()
160
161 handler = signal.getsignal(signal.SIGINT)
162 def new_handler(frame, signum):
163 handler(frame, signum)
164 signal.signal(signal.SIGINT, new_handler)
165
166 try:
167 os.kill(os.getpid(), signal.SIGINT)
168 except KeyboardInterrupt:
169 pass
170 else:
171 self.fail("replaced but delegated handler doesn't raise interrupt")
172 self.withRepeats(test_function)
173
174 def testRunner(self):
175 # Creating a TextTestRunner with the appropriate argument should
176 # register the TextTestResult it creates
177 runner = unittest.TextTestRunner(stream=io.StringIO())
178
179 result = runner.run(unittest.TestSuite())
180 self.assertIn(result, unittest.signals._results)
181
182 def testWeakReferences(self):
183 # Calling registerResult on a result should not keep it alive
184 result = unittest.TestResult()
185 unittest.registerResult(result)
186
187 ref = weakref.ref(result)
188 del result
189
190 # For non-reference counting implementations
191 gc.collect();gc.collect()
192 self.assertIsNone(ref())
193
194
195 def testRemoveResult(self):
196 result = unittest.TestResult()
197 unittest.registerResult(result)
198
199 unittest.installHandler()
200 self.assertTrue(unittest.removeResult(result))
201
202 # Should this raise an error instead?
203 self.assertFalse(unittest.removeResult(unittest.TestResult()))
204
205 try:
206 pid = os.getpid()
207 os.kill(pid, signal.SIGINT)
208 except KeyboardInterrupt:
209 pass
210
211 self.assertFalse(result.shouldStop)
212
213 def testMainInstallsHandler(self):
214 failfast = object()
215 test = object()
216 verbosity = object()
217 result = object()
218 default_handler = signal.getsignal(signal.SIGINT)
219
220 class ESC[4;38;5;81mFakeRunner(ESC[4;38;5;149mobject):
221 initArgs = []
222 runArgs = []
223 def __init__(self, *args, **kwargs):
224 self.initArgs.append((args, kwargs))
225 def run(self, test):
226 self.runArgs.append(test)
227 return result
228
229 class ESC[4;38;5;81mProgram(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestProgram):
230 def __init__(self, catchbreak):
231 self.exit = False
232 self.verbosity = verbosity
233 self.failfast = failfast
234 self.catchbreak = catchbreak
235 self.tb_locals = False
236 self.testRunner = FakeRunner
237 self.test = test
238 self.result = None
239
240 p = Program(False)
241 p.runTests()
242
243 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
244 'verbosity': verbosity,
245 'failfast': failfast,
246 'tb_locals': False,
247 'warnings': None})])
248 self.assertEqual(FakeRunner.runArgs, [test])
249 self.assertEqual(p.result, result)
250
251 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
252
253 FakeRunner.initArgs = []
254 FakeRunner.runArgs = []
255 p = Program(True)
256 p.runTests()
257
258 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
259 'verbosity': verbosity,
260 'failfast': failfast,
261 'tb_locals': False,
262 'warnings': None})])
263 self.assertEqual(FakeRunner.runArgs, [test])
264 self.assertEqual(p.result, result)
265
266 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
267
268 def testRemoveHandler(self):
269 default_handler = signal.getsignal(signal.SIGINT)
270 unittest.installHandler()
271 unittest.removeHandler()
272 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
273
274 # check that calling removeHandler multiple times has no ill-effect
275 unittest.removeHandler()
276 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
277
278 def testRemoveHandlerAsDecorator(self):
279 default_handler = signal.getsignal(signal.SIGINT)
280 unittest.installHandler()
281
282 @unittest.removeHandler
283 def test():
284 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
285
286 test()
287 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
288
289 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
290 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
291 class ESC[4;38;5;81mTestBreakDefaultIntHandler(ESC[4;38;5;149mTestBreak):
292 int_handler = signal.default_int_handler
293
294 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
295 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
296 class ESC[4;38;5;81mTestBreakSignalIgnored(ESC[4;38;5;149mTestBreak):
297 int_handler = signal.SIG_IGN
298
299 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
300 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
301 class ESC[4;38;5;81mTestBreakSignalDefault(ESC[4;38;5;149mTestBreak):
302 int_handler = signal.SIG_DFL
303
304
305 if __name__ == "__main__":
306 unittest.main()