python (3.12.0)
1 import gc
2 import io
3 import os
4 import sys
5 import signal
6 import weakref
7 import unittest
8
9 from test import support
10
11
12 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
13 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
14 class ESC[4;38;5;81mTestBreak(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
15 int_handler = None
16 # This number was smart-guessed, previously tests were failing
17 # after 7th run. So, we take `x * 2 + 1` to be sure.
18 default_repeats = 15
19
20 def setUp(self):
21 self._default_handler = signal.getsignal(signal.SIGINT)
22 if self.int_handler is not None:
23 signal.signal(signal.SIGINT, self.int_handler)
24
25 def tearDown(self):
26 signal.signal(signal.SIGINT, self._default_handler)
27 unittest.signals._results = weakref.WeakKeyDictionary()
28 unittest.signals._interrupt_handler = None
29
30
31 def withRepeats(self, test_function, repeats=None):
32 if not support.check_impl_detail(cpython=True):
33 # Override repeats count on non-cpython to execute only once.
34 # Because this test only makes sense to be repeated on CPython.
35 repeats = 1
36 elif repeats is None:
37 repeats = self.default_repeats
38
39 for repeat in range(repeats):
40 with self.subTest(repeat=repeat):
41 # We don't run `setUp` for the very first repeat
42 # and we don't run `tearDown` for the very last one,
43 # because they are handled by the test class itself.
44 if repeat != 0:
45 self.setUp()
46 try:
47 test_function()
48 finally:
49 if repeat != repeats - 1:
50 self.tearDown()
51
52 def testInstallHandler(self):
53 default_handler = signal.getsignal(signal.SIGINT)
54 unittest.installHandler()
55 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
56
57 try:
58 pid = os.getpid()
59 os.kill(pid, signal.SIGINT)
60 except KeyboardInterrupt:
61 self.fail("KeyboardInterrupt not handled")
62
63 self.assertTrue(unittest.signals._interrupt_handler.called)
64
65 def testRegisterResult(self):
66 result = unittest.TestResult()
67 self.assertNotIn(result, unittest.signals._results)
68
69 unittest.registerResult(result)
70 try:
71 self.assertIn(result, unittest.signals._results)
72 finally:
73 unittest.removeResult(result)
74
75 def testInterruptCaught(self):
76 def test(result):
77 pid = os.getpid()
78 os.kill(pid, signal.SIGINT)
79 result.breakCaught = True
80 self.assertTrue(result.shouldStop)
81
82 def test_function():
83 result = unittest.TestResult()
84 unittest.installHandler()
85 unittest.registerResult(result)
86
87 self.assertNotEqual(
88 signal.getsignal(signal.SIGINT),
89 self._default_handler,
90 )
91
92 try:
93 test(result)
94 except KeyboardInterrupt:
95 self.fail("KeyboardInterrupt not handled")
96 self.assertTrue(result.breakCaught)
97 self.withRepeats(test_function)
98
99 def testSecondInterrupt(self):
100 # Can't use skipIf decorator because the signal handler may have
101 # been changed after defining this method.
102 if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
103 self.skipTest("test requires SIGINT to not be ignored")
104
105 def test(result):
106 pid = os.getpid()
107 os.kill(pid, signal.SIGINT)
108 result.breakCaught = True
109 self.assertTrue(result.shouldStop)
110 os.kill(pid, signal.SIGINT)
111 self.fail("Second KeyboardInterrupt not raised")
112
113 def test_function():
114 result = unittest.TestResult()
115 unittest.installHandler()
116 unittest.registerResult(result)
117
118 with self.assertRaises(KeyboardInterrupt):
119 test(result)
120 self.assertTrue(result.breakCaught)
121 self.withRepeats(test_function)
122
123
124 def testTwoResults(self):
125 def test_function():
126 unittest.installHandler()
127
128 result = unittest.TestResult()
129 unittest.registerResult(result)
130 new_handler = signal.getsignal(signal.SIGINT)
131
132 result2 = unittest.TestResult()
133 unittest.registerResult(result2)
134 self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
135
136 result3 = unittest.TestResult()
137
138 try:
139 os.kill(os.getpid(), signal.SIGINT)
140 except KeyboardInterrupt:
141 self.fail("KeyboardInterrupt not handled")
142
143 self.assertTrue(result.shouldStop)
144 self.assertTrue(result2.shouldStop)
145 self.assertFalse(result3.shouldStop)
146 self.withRepeats(test_function)
147
148
149 def testHandlerReplacedButCalled(self):
150 # Can't use skipIf decorator because the signal handler may have
151 # been changed after defining this method.
152 if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
153 self.skipTest("test requires SIGINT to not be ignored")
154
155 def test_function():
156 # If our handler has been replaced (is no longer installed) but is
157 # called by the *new* handler, then it isn't safe to delay the
158 # SIGINT and we should immediately delegate to the default handler
159 unittest.installHandler()
160
161 handler = signal.getsignal(signal.SIGINT)
162 def new_handler(frame, signum):
163 handler(frame, signum)
164 signal.signal(signal.SIGINT, new_handler)
165
166 try:
167 os.kill(os.getpid(), signal.SIGINT)
168 except KeyboardInterrupt:
169 pass
170 else:
171 self.fail("replaced but delegated handler doesn't raise interrupt")
172 self.withRepeats(test_function)
173
174 def testRunner(self):
175 # Creating a TextTestRunner with the appropriate argument should
176 # register the TextTestResult it creates
177 runner = unittest.TextTestRunner(stream=io.StringIO())
178
179 result = runner.run(unittest.TestSuite())
180 self.assertIn(result, unittest.signals._results)
181
182 def testWeakReferences(self):
183 # Calling registerResult on a result should not keep it alive
184 result = unittest.TestResult()
185 unittest.registerResult(result)
186
187 ref = weakref.ref(result)
188 del result
189
190 # For non-reference counting implementations
191 gc.collect();gc.collect()
192 self.assertIsNone(ref())
193
194
195 def testRemoveResult(self):
196 result = unittest.TestResult()
197 unittest.registerResult(result)
198
199 unittest.installHandler()
200 self.assertTrue(unittest.removeResult(result))
201
202 # Should this raise an error instead?
203 self.assertFalse(unittest.removeResult(unittest.TestResult()))
204
205 try:
206 pid = os.getpid()
207 os.kill(pid, signal.SIGINT)
208 except KeyboardInterrupt:
209 pass
210
211 self.assertFalse(result.shouldStop)
212
213 def testMainInstallsHandler(self):
214 failfast = object()
215 test = object()
216 verbosity = object()
217 result = object()
218 default_handler = signal.getsignal(signal.SIGINT)
219
220 class ESC[4;38;5;81mFakeRunner(ESC[4;38;5;149mobject):
221 initArgs = []
222 runArgs = []
223 def __init__(self, *args, **kwargs):
224 self.initArgs.append((args, kwargs))
225 def run(self, test):
226 self.runArgs.append(test)
227 return result
228
229 class ESC[4;38;5;81mProgram(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestProgram):
230 def __init__(self, catchbreak):
231 self.exit = False
232 self.verbosity = verbosity
233 self.failfast = failfast
234 self.catchbreak = catchbreak
235 self.tb_locals = False
236 self.testRunner = FakeRunner
237 self.test = test
238 self.result = None
239 self.durations = None
240
241 p = Program(False)
242 p.runTests()
243
244 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
245 'verbosity': verbosity,
246 'failfast': failfast,
247 'tb_locals': False,
248 'warnings': None,
249 'durations': None})])
250 self.assertEqual(FakeRunner.runArgs, [test])
251 self.assertEqual(p.result, result)
252
253 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
254
255 FakeRunner.initArgs = []
256 FakeRunner.runArgs = []
257 p = Program(True)
258 p.runTests()
259
260 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
261 'verbosity': verbosity,
262 'failfast': failfast,
263 'tb_locals': False,
264 'warnings': None,
265 'durations': None})])
266 self.assertEqual(FakeRunner.runArgs, [test])
267 self.assertEqual(p.result, result)
268
269 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
270
271 def testRemoveHandler(self):
272 default_handler = signal.getsignal(signal.SIGINT)
273 unittest.installHandler()
274 unittest.removeHandler()
275 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
276
277 # check that calling removeHandler multiple times has no ill-effect
278 unittest.removeHandler()
279 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
280
281 def testRemoveHandlerAsDecorator(self):
282 default_handler = signal.getsignal(signal.SIGINT)
283 unittest.installHandler()
284
285 @unittest.removeHandler
286 def test():
287 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
288
289 test()
290 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
291
292 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
293 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
294 class ESC[4;38;5;81mTestBreakDefaultIntHandler(ESC[4;38;5;149mTestBreak):
295 int_handler = signal.default_int_handler
296
297 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
298 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
299 class ESC[4;38;5;81mTestBreakSignalIgnored(ESC[4;38;5;149mTestBreak):
300 int_handler = signal.SIG_IGN
301
302 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
303 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
304 class ESC[4;38;5;81mTestBreakSignalDefault(ESC[4;38;5;149mTestBreak):
305 int_handler = signal.SIG_DFL
306
307
308 if __name__ == "__main__":
309 unittest.main()