1 import enum
2 import errno
3 import inspect
4 import os
5 import random
6 import signal
7 import socket
8 import statistics
9 import subprocess
10 import sys
11 import threading
12 import time
13 import unittest
14 from test import support
15 from test.support import os_helper
16 from test.support.script_helper import assert_python_ok, spawn_python
17 from test.support import threading_helper
18 try:
19 import _testcapi
20 except ImportError:
21 _testcapi = None
22
23
24 class ESC[4;38;5;81mGenericTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
25
26 def test_enums(self):
27 for name in dir(signal):
28 sig = getattr(signal, name)
29 if name in {'SIG_DFL', 'SIG_IGN'}:
30 self.assertIsInstance(sig, signal.Handlers)
31 elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
32 self.assertIsInstance(sig, signal.Sigmasks)
33 elif name.startswith('SIG') and not name.startswith('SIG_'):
34 self.assertIsInstance(sig, signal.Signals)
35 elif name.startswith('CTRL_'):
36 self.assertIsInstance(sig, signal.Signals)
37 self.assertEqual(sys.platform, "win32")
38
39 CheckedSignals = enum._old_convert_(
40 enum.IntEnum, 'Signals', 'signal',
41 lambda name:
42 name.isupper()
43 and (name.startswith('SIG') and not name.startswith('SIG_'))
44 or name.startswith('CTRL_'),
45 source=signal,
46 )
47 enum._test_simple_enum(CheckedSignals, signal.Signals)
48
49 CheckedHandlers = enum._old_convert_(
50 enum.IntEnum, 'Handlers', 'signal',
51 lambda name: name in ('SIG_DFL', 'SIG_IGN'),
52 source=signal,
53 )
54 enum._test_simple_enum(CheckedHandlers, signal.Handlers)
55
56 Sigmasks = getattr(signal, 'Sigmasks', None)
57 if Sigmasks is not None:
58 CheckedSigmasks = enum._old_convert_(
59 enum.IntEnum, 'Sigmasks', 'signal',
60 lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'),
61 source=signal,
62 )
63 enum._test_simple_enum(CheckedSigmasks, Sigmasks)
64
65 def test_functions_module_attr(self):
66 # Issue #27718: If __all__ is not defined all non-builtin functions
67 # should have correct __module__ to be displayed by pydoc.
68 for name in dir(signal):
69 value = getattr(signal, name)
70 if inspect.isroutine(value) and not inspect.isbuiltin(value):
71 self.assertEqual(value.__module__, 'signal')
72
73
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class PosixTests(unittest.TestCase):
    """Signal-module checks that are skipped on Windows."""

    def trivial_signal_handler(self, *args):
        """No-op handler; accepts and ignores whatever arguments it is given."""
        pass

    def test_out_of_range_signal_number_raises_error(self):
        """getsignal(), signal() and strsignal() reject signal number 4242."""
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

        self.assertRaises(ValueError, signal.strsignal, 4242)

    def test_setting_signal_handler_to_none_raises_error(self):
        """Passing None as the handler raises TypeError."""
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        """signal() returns the old handler and getsignal() follows changes."""
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        if hup is not None:
            # Put the original handler back even when an assertion below
            # fails, so the no-op handler never outlives this test.  A
            # previous handler of None cannot be handed back to
            # signal.signal() (it raises TypeError), so skip it then.
            self.addCleanup(signal.signal, signal.SIGHUP, hup)
        self.assertIsInstance(hup, signal.Handlers)
        self.assertEqual(signal.getsignal(signal.SIGHUP),
                         self.trivial_signal_handler)
        signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)

    def test_strsignal(self):
        """strsignal() descriptions contain the expected keyword."""
        self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
        self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
        self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    def test_interprocess_signal(self):
        """Run signalinterproctester.py (next to this file) in a child."""
        dirname = os.path.dirname(__file__)
        script = os.path.join(dirname, 'signalinterproctester.py')
        assert_python_ok(script)

    @unittest.skipUnless(
        hasattr(signal, "valid_signals"),
        "requires signal.valid_signals"
    )
    def test_valid_signals(self):
        """valid_signals() is a set of real signals, excluding 0 and NSIG."""
        s = signal.valid_signals()
        self.assertIsInstance(s, set)
        self.assertIn(signal.Signals.SIGINT, s)
        self.assertIn(signal.Signals.SIGALRM, s)
        self.assertNotIn(0, s)
        self.assertNotIn(signal.NSIG, s)
        self.assertLess(len(s), signal.NSIG)

        # gh-91145: Make sure that all SIGxxx constants exposed by the Python
        # signal module have a number in the [0; signal.NSIG-1] range.
        for name in dir(signal):
            if not name.startswith("SIG"):
                continue
            if name in {"SIG_IGN", "SIG_DFL"}:
                # SIG_IGN and SIG_DFL are pointers
                continue
            with self.subTest(name=name):
                signum = getattr(signal, name)
                self.assertGreaterEqual(signum, 0)
                self.assertLess(signum, signal.NSIG)

    @unittest.skipUnless(sys.executable, "sys.executable required.")
    @support.requires_subprocess()
    def test_keyboard_interrupt_exit_code(self):
        """KeyboardInterrupt triggers exit via SIGINT."""
        process = subprocess.run(
                [sys.executable, "-c",
                 "import os, signal, time\n"
                 "os.kill(os.getpid(), signal.SIGINT)\n"
                 "for _ in range(999): time.sleep(0.01)"],
                stderr=subprocess.PIPE)
        self.assertIn(b"KeyboardInterrupt", process.stderr)
        self.assertEqual(process.returncode, -signal.SIGINT)
        # Caveat: The exit code is insufficient to guarantee we actually died
        # via a signal.  POSIX shells do more than look at the 8 bit value.
        # Writing an automation friendly test of an interactive shell
        # to confirm that our process died via a SIGINT proved too complex.
152
153
154 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
155 class ESC[4;38;5;81mWindowsSignalTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
156
157 def test_valid_signals(self):
158 s = signal.valid_signals()
159 self.assertIsInstance(s, set)
160 self.assertGreaterEqual(len(s), 6)
161 self.assertIn(signal.Signals.SIGINT, s)
162 self.assertNotIn(0, s)
163 self.assertNotIn(signal.NSIG, s)
164 self.assertLess(len(s), signal.NSIG)
165
166 def test_issue9324(self):
167 # Updated for issue #10003, adding SIGBREAK
168 handler = lambda x, y: None
169 checked = set()
170 for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
171 signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
172 signal.SIGTERM):
173 # Set and then reset a handler for signals that work on windows.
174 # Issue #18396, only for signals without a C-level handler.
175 if signal.getsignal(sig) is not None:
176 signal.signal(sig, signal.signal(sig, handler))
177 checked.add(sig)
178 # Issue #18396: Ensure the above loop at least tested *something*
179 self.assertTrue(checked)
180
181 with self.assertRaises(ValueError):
182 signal.signal(-1, handler)
183
184 with self.assertRaises(ValueError):
185 signal.signal(7, handler)
186
187 @unittest.skipUnless(sys.executable, "sys.executable required.")
188 @support.requires_subprocess()
189 def test_keyboard_interrupt_exit_code(self):
190 """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
191 # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
192 # as that requires setting up a console control handler in a child
193 # in its own process group. Doable, but quite complicated. (see
194 # @eryksun on https://github.com/python/cpython/pull/11862)
195 process = subprocess.run(
196 [sys.executable, "-c", "raise KeyboardInterrupt"],
197 stderr=subprocess.PIPE)
198 self.assertIn(b"KeyboardInterrupt", process.stderr)
199 STATUS_CONTROL_C_EXIT = 0xC000013A
200 self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)
201
202
203 class ESC[4;38;5;81mWakeupFDTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
204
205 def test_invalid_call(self):
206 # First parameter is positional-only
207 with self.assertRaises(TypeError):
208 signal.set_wakeup_fd(signum=signal.SIGINT)
209
210 # warn_on_full_buffer is a keyword-only parameter
211 with self.assertRaises(TypeError):
212 signal.set_wakeup_fd(signal.SIGINT, False)
213
214 def test_invalid_fd(self):
215 fd = os_helper.make_bad_fd()
216 self.assertRaises((ValueError, OSError),
217 signal.set_wakeup_fd, fd)
218
219 @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
220 def test_invalid_socket(self):
221 sock = socket.socket()
222 fd = sock.fileno()
223 sock.close()
224 self.assertRaises((ValueError, OSError),
225 signal.set_wakeup_fd, fd)
226
227 # Emscripten does not support fstat on pipes yet.
228 # https://github.com/emscripten-core/emscripten/issues/16414
229 @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
230 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
231 def test_set_wakeup_fd_result(self):
232 r1, w1 = os.pipe()
233 self.addCleanup(os.close, r1)
234 self.addCleanup(os.close, w1)
235 r2, w2 = os.pipe()
236 self.addCleanup(os.close, r2)
237 self.addCleanup(os.close, w2)
238
239 if hasattr(os, 'set_blocking'):
240 os.set_blocking(w1, False)
241 os.set_blocking(w2, False)
242
243 signal.set_wakeup_fd(w1)
244 self.assertEqual(signal.set_wakeup_fd(w2), w1)
245 self.assertEqual(signal.set_wakeup_fd(-1), w2)
246 self.assertEqual(signal.set_wakeup_fd(-1), -1)
247
248 @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
249 @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
250 def test_set_wakeup_fd_socket_result(self):
251 sock1 = socket.socket()
252 self.addCleanup(sock1.close)
253 sock1.setblocking(False)
254 fd1 = sock1.fileno()
255
256 sock2 = socket.socket()
257 self.addCleanup(sock2.close)
258 sock2.setblocking(False)
259 fd2 = sock2.fileno()
260
261 signal.set_wakeup_fd(fd1)
262 self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
263 self.assertEqual(signal.set_wakeup_fd(-1), fd2)
264 self.assertEqual(signal.set_wakeup_fd(-1), -1)
265
266 # On Windows, files are always blocking and Windows does not provide a
267 # function to test if a socket is in non-blocking mode.
268 @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
269 @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
270 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
271 def test_set_wakeup_fd_blocking(self):
272 rfd, wfd = os.pipe()
273 self.addCleanup(os.close, rfd)
274 self.addCleanup(os.close, wfd)
275
276 # fd must be non-blocking
277 os.set_blocking(wfd, True)
278 with self.assertRaises(ValueError) as cm:
279 signal.set_wakeup_fd(wfd)
280 self.assertEqual(str(cm.exception),
281 "the fd %s must be in non-blocking mode" % wfd)
282
283 # non-blocking is ok
284 os.set_blocking(wfd, False)
285 signal.set_wakeup_fd(wfd)
286 signal.set_wakeup_fd(-1)
287
288
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
    """Check what is written to the wakeup fd when signals are delivered.

    Every scenario runs in a child interpreter started with
    assert_python_ok().
    """

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def check_wakeup(self, test_body, *signals, ordered=True):
        """Run *test_body* in a child and verify the bytes on the wakeup pipe.

        test_body: source of a ``def test():`` function, inserted into the
            child script below.
        signals: signal numbers expected to be read from the wakeup pipe.
        ordered: when false, the child compares the numbers as sets.
        """
        expected_signums = tuple(int(signum) for signum in signals)
        # use a subprocess to have only one thread
        template = """if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        """
        child_source = template.format(expected_signums, ordered, test_body)

        assert_python_ok('-c', child_source)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_wakeup_write_error(self):
        """A failed write to the wakeup fd is reported on the child's stderr."""
        # Issue #16105: write() errors in the C signal handler should not
        # pass silently.
        # Use a subprocess to have only one thread.
        child_source = """if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        """
        # Probe in this process first: the scenario only makes sense when
        # writing to the read end of a pipe fails with OSError.
        read_end, write_end = os.pipe()
        try:
            try:
                os.write(read_end, b'x')
            except OSError:
                write_rejected = True
            else:
                write_rejected = False
        finally:
            os.close(read_end)
            os.close(write_end)
        if not write_rejected:
            self.skipTest("OS doesn't report write() error on the read end of a pipe")

        assert_python_ok('-c', child_source)

    def test_wakeup_fd_early(self):
        """The wakeup pipe is readable when the signal came before select()."""
        test_body = """def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """
        self.check_wakeup(test_body, signal.SIGALRM)

    def test_wakeup_fd_during(self):
        """A signal arriving during select() on the wakeup pipe interrupts it."""
        test_body = """def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """
        self.check_wakeup(test_body, signal.SIGALRM)

    def test_signum(self):
        """Two raised signals are written to the wakeup pipe in order."""
        test_body = """def test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        """
        self.check_wakeup(test_body, signal.SIGUSR1, signal.SIGALRM)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pending(self):
        """Two blocked signals are both written once they are unblocked."""
        test_body = """def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        """
        self.check_wakeup(test_body, signal.SIGUSR1, signal.SIGUSR2,
                          ordered=False)
470
471
@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
class WakeupSocketSignalTests(unittest.TestCase):
    """set_wakeup_fd() scenarios where the wakeup fd is a socket.

    Each scenario is a script executed in a child interpreter.
    """

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_socket(self):
        """The signal number is received on the other end of the socketpair."""
        # use a subprocess to have only one thread
        child_source = """if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        """

        assert_python_ok('-c', child_source)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_send_error(self):
        """Writing to a closed wakeup socket prints an 'Exception ignored' note."""
        # Use a subprocess to have only one thread.
        # The expected message says 'send' on Windows and 'write' elsewhere.
        action = 'send' if os.name == 'nt' else 'write'
        template = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        """
        child_source = template.format(action=action)
        assert_python_ok('-c', child_source)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_warn_on_full_buffer(self):
        """warn_on_full_buffer controls the warning for a full wakeup socket."""
        # Use a subprocess to have only one thread.
        # The expected message says 'send' on Windows and 'write' elsewhere.
        action = 'send' if os.name == 'nt' else 'write'
        template = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        """
        child_source = template.format(action=action)
        assert_python_ok('-c', child_source)
666
667
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
@support.requires_subprocess()
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
class SiginterruptTest(unittest.TestCase):
    """Effect of signal.siginterrupt() on a blocking os.read()."""

    def readpipe_interrupted(self, interrupt):
        """Run a child that blocks in os.read() while SIGALRM is scheduled.

        *interrupt* is passed to signal.siginterrupt() in the child unless
        it is None.  Return True when the child exits with code 3 (both
        reads raised the handler's ZeroDivisionError) and False when it
        exits with code 2 or does not finish within support.SHORT_TIMEOUT.
        Any other exit code raises Exception.
        """
        # use a subprocess to have only one thread, to have a timeout on the
        # blocking read and to not touch signal handling in this process
        template = """if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        """
        child_source = template % (interrupt,)
        with spawn_python('-c', child_source) as child:
            try:
                # wait until the child process is loaded and has started
                ready_line = child.stdout.readline()

                remaining, _stderr = child.communicate(
                    timeout=support.SHORT_TIMEOUT)
            except subprocess.TimeoutExpired:
                # The read was never interrupted: stop the stuck child.
                child.kill()
                return False

            output = ready_line + remaining
            status = child.wait()
            if status not in (2, 3):
                raise Exception("Child error (exit code %s): %r"
                                % (status, output))
            return status == 3

    def test_without_siginterrupt(self):
        """Without siginterrupt() the pending read is interrupted."""
        # If a signal handler is installed and siginterrupt is not called
        # at all, when that signal arrives, it interrupts a syscall that's in
        # progress.
        self.assertTrue(self.readpipe_interrupted(None))

    def test_siginterrupt_on(self):
        """siginterrupt(SIGALRM, True) keeps the read interruptible."""
        # If a signal handler is installed and siginterrupt is called with
        # a true value for the second argument, when that signal arrives, it
        # interrupts a syscall that's in progress.
        self.assertTrue(self.readpipe_interrupted(True))

    @support.requires_resource('walltime')
    def test_siginterrupt_off(self):
        """siginterrupt(SIGALRM, False) stops the read from being interrupted."""
        # If a signal handler is installed and siginterrupt is called with
        # a false value for the second argument, when that signal arrives, it
        # does not interrupt a syscall that's in progress.
        self.assertFalse(self.readpipe_interrupted(False))
755
756
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
                     "needs signal.getitimer() and signal.setitimer()")
class ItimerTest(unittest.TestCase):
    """Tests for signal.setitimer() and signal.getitimer()."""

    def setUp(self):
        """Reset the bookkeeping flags and install the SIGALRM handler."""
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        """Disarm the timer under test, then restore the SIGALRM handler."""
        try:
            if self.itimer is not None: # test_itimer_exc doesn't change this attr
                # Stop the timer *before* the previous SIGALRM handler is
                # reinstated, so a still-armed ITIMER_REAL cannot fire into
                # the restored handler.
                signal.setitimer(self.itimer, 0)
        finally:
            signal.signal(signal.SIGALRM, self.old_alarm)

    def _install_handler(self, signum, handler):
        """Install *handler* for *signum* and schedule the old one's return.

        Cleanups run after tearDown(), i.e. once the timer is disarmed.
        """
        previous = signal.signal(signum, handler)
        if previous is not None:
            # None cannot be passed back to signal.signal(), so there is
            # nothing to restore in that case.
            self.addCleanup(signal.signal, signum, previous)

    def sig_alrm(self, *args):
        """SIGALRM handler: record that it ran."""
        self.hndl_called = True

    def sig_vtalrm(self, *args):
        """SIGVTALRM handler: disable the virtual timer on its fourth call."""
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)

        self.hndl_count += 1

    def sig_prof(self, *args):
        """SIGPROF handler: record the call and disable the profiling timer."""
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

    def test_itimer_exc(self):
        """An invalid timer id raises ItimerError."""
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms.
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)

    def test_itimer_real(self):
        """ITIMER_REAL delivers SIGALRM and wakes up signal.pause()."""
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        signal.pause()
        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform in ('netbsd5',),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        """ITIMER_VIRTUAL fires repeatedly until sig_vtalrm disables it."""
        self.itimer = signal.ITIMER_VIRTUAL
        self._install_handler(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        for _ in support.busy_retry(60.0, error=False):
            # use up some virtual time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                # sig_vtalrm handler stopped this itimer
                break
        else:
            # bpo-8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    def test_itimer_prof(self):
        """ITIMER_PROF fires and sig_prof disables it."""
        self.itimer = signal.ITIMER_PROF
        self._install_handler(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        for _ in support.busy_retry(60.0, error=False):
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                # sig_prof handler stopped this itimer
                break
        else:
            # bpo-8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    def test_setitimer_tiny(self):
        """A one-microsecond interval still arms the timer."""
        # bpo-30807: C setitimer() takes a microsecond-resolution interval.
        # Check that float -> timeval conversion doesn't round
        # the interval down to zero, which would disable the timer.
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1e-6)
        time.sleep(1)
        self.assertEqual(self.hndl_called, True)
861
862
863 class ESC[4;38;5;81mPendingSignalsTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
864 """
865 Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
866 functions.
867 """
868 @unittest.skipUnless(hasattr(signal, 'sigpending'),
869 'need signal.sigpending()')
870 def test_sigpending_empty(self):
871 self.assertEqual(signal.sigpending(), set())
872
873 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
874 'need signal.pthread_sigmask()')
875 @unittest.skipUnless(hasattr(signal, 'sigpending'),
876 'need signal.sigpending()')
877 def test_sigpending(self):
878 code = """if 1:
879 import os
880 import signal
881
882 def handler(signum, frame):
883 1/0
884
885 signum = signal.SIGUSR1
886 signal.signal(signum, handler)
887
888 signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
889 os.kill(os.getpid(), signum)
890 pending = signal.sigpending()
891 for sig in pending:
892 assert isinstance(sig, signal.Signals), repr(pending)
893 if pending != {signum}:
894 raise Exception('%s != {%s}' % (pending, signum))
895 try:
896 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
897 except ZeroDivisionError:
898 pass
899 else:
900 raise Exception("ZeroDivisionError not raised")
901 """
902 assert_python_ok('-c', code)
903
904 @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
905 'need signal.pthread_kill()')
906 @threading_helper.requires_working_threading()
907 def test_pthread_kill(self):
908 code = """if 1:
909 import signal
910 import threading
911 import sys
912
913 signum = signal.SIGUSR1
914
915 def handler(signum, frame):
916 1/0
917
918 signal.signal(signum, handler)
919
920 tid = threading.get_ident()
921 try:
922 signal.pthread_kill(tid, signum)
923 except ZeroDivisionError:
924 pass
925 else:
926 raise Exception("ZeroDivisionError not raised")
927 """
928 assert_python_ok('-c', code)
929
930 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
931 'need signal.pthread_sigmask()')
932 def wait_helper(self, blocked, test):
933 """
934 test: body of the "def test(signum):" function.
935 blocked: number of the blocked signal
936 """
937 code = '''if 1:
938 import signal
939 import sys
940 from signal import Signals
941
942 def handler(signum, frame):
943 1/0
944
945 %s
946
947 blocked = %s
948 signum = signal.SIGALRM
949
950 # child: block and wait the signal
951 try:
952 signal.signal(signum, handler)
953 signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
954
955 # Do the tests
956 test(signum)
957
958 # The handler must not be called on unblock
959 try:
960 signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
961 except ZeroDivisionError:
962 print("the signal handler has been called",
963 file=sys.stderr)
964 sys.exit(1)
965 except BaseException as err:
966 print("error: {}".format(err), file=sys.stderr)
967 sys.stderr.flush()
968 sys.exit(1)
969 ''' % (test.strip(), blocked)
970
971 # sig*wait* must be called with the signal blocked: since the current
972 # process might have several threads running, use a subprocess to have
973 # a single thread.
974 assert_python_ok('-c', code)
975
976 @unittest.skipUnless(hasattr(signal, 'sigwait'),
977 'need signal.sigwait()')
978 def test_sigwait(self):
979 self.wait_helper(signal.SIGALRM, '''
980 def test(signum):
981 signal.alarm(1)
982 received = signal.sigwait([signum])
983 assert isinstance(received, signal.Signals), received
984 if received != signum:
985 raise Exception('received %s, not %s' % (received, signum))
986 ''')
987
988 @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
989 'need signal.sigwaitinfo()')
990 def test_sigwaitinfo(self):
991 self.wait_helper(signal.SIGALRM, '''
992 def test(signum):
993 signal.alarm(1)
994 info = signal.sigwaitinfo([signum])
995 if info.si_signo != signum:
996 raise Exception("info.si_signo != %s" % signum)
997 ''')
998
999 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
1000 'need signal.sigtimedwait()')
1001 def test_sigtimedwait(self):
1002 self.wait_helper(signal.SIGALRM, '''
1003 def test(signum):
1004 signal.alarm(1)
1005 info = signal.sigtimedwait([signum], 10.1000)
1006 if info.si_signo != signum:
1007 raise Exception('info.si_signo != %s' % signum)
1008 ''')
1009
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                     'need signal.sigtimedwait()')
def test_sigtimedwait_poll(self):
    # check that polling with sigtimedwait works
    #
    # The snippet sends the signal to its own process first and then calls
    # sigtimedwait() with a zero timeout.
    poll_snippet = '''
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        '''
    self.wait_helper(signal.SIGALRM, poll_snippet)
1022
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                     'need signal.sigtimedwait()')
def test_sigtimedwait_timeout(self):
    # The snippet never sends or schedules the signal, so the one second
    # sigtimedwait() call is required to come back with None.
    timeout_snippet = '''
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        '''
    self.wait_helper(signal.SIGALRM, timeout_snippet)
1032
1033 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
1034 'need signal.sigtimedwait()')
1035 def test_sigtimedwait_negative_timeout(self):
1036 signum = signal.SIGALRM
1037 self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
1038
@unittest.skipUnless(hasattr(signal, 'sigwait'),
                     'need signal.sigwait()')
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                     'need signal.pthread_sigmask()')
@threading_helper.requires_working_threading()
def test_sigwait_thread(self):
    # Check that calling sigwait() from a thread doesn't suspend the whole
    # process. A new interpreter is spawned to avoid problems when mixing
    # threads and fork(): only async-safe functions are allowed between
    # fork() and exec().
    #
    # In the child, a helper thread sleeps for a second and then signals
    # the process while the main thread is blocked in sigwait().
    child_script = """if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        """
    assert_python_ok("-c", child_script)
1073
1074 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1075 'need signal.pthread_sigmask()')
1076 def test_pthread_sigmask_arguments(self):
1077 self.assertRaises(TypeError, signal.pthread_sigmask)
1078 self.assertRaises(TypeError, signal.pthread_sigmask, 1)
1079 self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
1080 self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
1081 with self.assertRaises(ValueError):
1082 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
1083 with self.assertRaises(ValueError):
1084 signal.pthread_sigmask(signal.SIG_BLOCK, [0])
1085 with self.assertRaises(ValueError):
1086 signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])
1087
1088 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1089 'need signal.pthread_sigmask()')
1090 def test_pthread_sigmask_valid_signals(self):
1091 s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
1092 self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
1093 # Get current blocked set
1094 s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
1095 self.assertLessEqual(s, signal.valid_signals())
1096
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                     'need signal.pthread_sigmask()')
@threading_helper.requires_working_threading()
def test_pthread_sigmask(self):
    # The whole scenario runs in a child interpreter started through
    # assert_python_ok().  The script below:
    #   1. installs a SIGUSR1 handler that raises ZeroDivisionError;
    #   2. unblocks SIGUSR1 and checks that sending it triggers the handler;
    #   3. blocks SIGUSR1, sends it, and checks that only SIGUSR1 was added
    #      to the mask;
    #   4. unblocks SIGUSR1 and expects the handler to fire from within
    #      pthread_sigmask(), then once more on a fresh kill();
    #   5. checks that the final mask equals the mask saved in step 2.
    # Every mask it reads must consist of signal.Signals members.
    code = """if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
    """
    assert_python_ok('-c', code)
1174
@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                     'need signal.pthread_kill()')
@threading_helper.requires_working_threading()
def test_pthread_kill_main_thread(self):
    # Test that a signal can be sent to the main thread with pthread_kill()
    # before any other thread has been created (see issue #12392).
    #
    # The child installs a SIGUSR1 handler that exits with status 3 and
    # then signals its own thread.  Status 2 means the script reached its
    # final sys.exit(2) without the handler having ended the process.
    code = """if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        """

    with spawn_python('-c', code) as process:
        # Only the first captured stream is used in the report below.
        stdout, _ = process.communicate()
        exitcode = process.wait()

    # Use a unittest assertion so that an unexpected exit status is reported
    # as a test failure (with the child's output) instead of a generic
    # Exception, matching the other tests in this file.
    self.assertEqual(exitcode, 3,
                     "Child error (exit code %s): %s" % (exitcode, stdout))
1200
1201
class StressTest(unittest.TestCase):
    """
    Stress signal delivery, especially when a signal arrives in
    the middle of recomputing the signal state or executing
    previously tripped signal handlers.
    """

    def setsig(self, signum, handler):
        """Install *handler* for *signum* and restore the old one on cleanup."""
        old_handler = signal.signal(signum, handler)
        self.addCleanup(signal.signal, signum, old_handler)

    def measure_itimer_resolution(self):
        """Return the median interval, in seconds, between SIGALRM deliveries.

        The SIGALRM handler records a perf_counter() timestamp and re-arms
        ITIMER_REAL with a 1e-6 second delay until N timestamps have been
        collected.  The median of the consecutive differences is returned.
        """
        N = 20
        times = []

        def handler(signum=None, frame=None):
            if len(times) < N:
                times.append(time.perf_counter())
                # 1 µs is the smallest possible timer interval,
                # we want to measure what the concrete duration
                # will be on this platform
                signal.setitimer(signal.ITIMER_REAL, 1e-6)

        # Registered first so that the timer is disarmed even if the
        # measurement below is interrupted.
        self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
        self.setsig(signal.SIGALRM, handler)
        # The first call is made directly: it records the initial timestamp
        # and arms the timer; later calls come from SIGALRM.
        handler()
        # NOTE(review): this loop has no deadline of its own.
        while len(times) < N:
            time.sleep(1e-3)

        durations = [times[i+1] - times[i] for i in range(len(times) - 1)]
        med = statistics.median(durations)
        if support.verbose:
            print("detected median itimer() resolution: %.6f s." % (med,))
        return med

    def decide_itimer_count(self):
        """Return how many signals a stress test should send.

        10000 when the measured resolution is at most 1e-4 s, 100 when it is
        at most 1e-2 s; otherwise the calling test is skipped.
        """
        # Some systems have poor setitimer() resolution (for example
        # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
        # number of sequential timers based on that.
        reso = self.measure_itimer_resolution()
        if reso <= 1e-4:
            return 10000
        elif reso <= 1e-2:
            return 100
        else:
            self.skipTest("detected itimer resolution (%.3f s.) too high "
                          "(> 10 ms.) on this platform (or system too busy)"
                          % (reso,))

    @unittest.skipUnless(hasattr(signal, "setitimer"),
                         "test needs setitimer()")
    def test_stress_delivery_dependent(self):
        """
        This test uses dependent signal handlers.
        """
        N = self.decide_itimer_count()
        sigs = []

        def first_handler(signum, frame):
            # 1e-6 is the minimum non-zero value for `setitimer()`.
            # Choose a random delay so as to improve chances of
            # triggering a race condition.  Ideally the signal is received
            # when inside critical signal-handling routines such as
            # Py_MakePendingCalls().
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)

        def second_handler(signum=None, frame=None):
            sigs.append(signum)

        # Here on Linux, SIGPROF > SIGALRM > SIGUSR1.  By using both
        # ascending and descending sequences (SIGUSR1 then SIGALRM,
        # SIGPROF then SIGALRM), we maximize chances of hitting a bug.
        self.setsig(signal.SIGPROF, first_handler)
        self.setsig(signal.SIGUSR1, first_handler)
        self.setsig(signal.SIGALRM, second_handler)  # for ITIMER_REAL

        expected_sigs = 0
        # A single deadline is shared by every wait loop below.
        deadline = time.monotonic() + support.SHORT_TIMEOUT

        while expected_sigs < N:
            # Each kill() runs first_handler, which arms the timer; the
            # resulting SIGALRM is what second_handler counts.
            os.kill(os.getpid(), signal.SIGPROF)
            expected_sigs += 1
            # Wait for handlers to run to avoid signal coalescing
            while len(sigs) < expected_sigs and time.monotonic() < deadline:
                time.sleep(1e-5)

            os.kill(os.getpid(), signal.SIGUSR1)
            expected_sigs += 1
            while len(sigs) < expected_sigs and time.monotonic() < deadline:
                time.sleep(1e-5)

        # All ITIMER_REAL signals should have been delivered to the
        # Python handler
        self.assertEqual(len(sigs), N, "Some signals were lost")

    @unittest.skipUnless(hasattr(signal, "setitimer"),
                         "test needs setitimer()")
    def test_stress_delivery_simultaneous(self):
        """
        This test uses simultaneous signal handlers.
        """
        N = self.decide_itimer_count()
        sigs = []

        def handler(signum, frame):
            sigs.append(signum)

        self.setsig(signal.SIGUSR1, handler)
        self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL

        expected_sigs = 0
        while expected_sigs < N:
            # Hopefully the SIGALRM will be received somewhere during
            # initial processing of SIGUSR1.
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
            os.kill(os.getpid(), signal.SIGUSR1)

            # Two deliveries are expected per iteration: SIGUSR1 and SIGALRM.
            expected_sigs += 2
            # Wait for handlers to run to avoid signal coalescing
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
                if len(sigs) >= expected_sigs:
                    break

        # All ITIMER_REAL signals should have been delivered to the
        # Python handler
        self.assertEqual(len(sigs), N, "Some signals were lost")

    @unittest.skipUnless(hasattr(signal, "SIGUSR1"),
                         "test needs SIGUSR1")
    @threading_helper.requires_working_threading()
    def test_stress_modifying_handlers(self):
        # bpo-43406: race condition between trip_signal() and signal.signal
        #
        # A helper thread keeps raising SIGUSR1 while this thread keeps
        # switching the handler between a Python function and SIG_IGN.
        signum = signal.SIGUSR1
        num_sent_signals = 0
        num_received_signals = 0
        do_stop = False

        def custom_handler(signum, frame):
            nonlocal num_received_signals
            num_received_signals += 1

        def set_interrupts():
            nonlocal num_sent_signals
            while not do_stop:
                signal.raise_signal(signum)
                num_sent_signals += 1

        def cycle_handlers():
            # Keep going until at least 100 signals were sent and at least
            # one of them reached custom_handler.
            while num_sent_signals < 100 or num_received_signals < 1:
                for i in range(20000):
                    # Cycle between a Python-defined and a non-Python handler
                    for handler in [custom_handler, signal.SIG_IGN]:
                        signal.signal(signum, handler)

        old_handler = signal.signal(signum, custom_handler)
        self.addCleanup(signal.signal, signum, old_handler)

        t = threading.Thread(target=set_interrupts)
        try:
            ignored = False
            with support.catch_unraisable_exception() as cm:
                t.start()
                cycle_handlers()
                do_stop = True
                t.join()

                if cm.unraisable is not None:
                    # An unraisable exception may be printed out when
                    # a signal is ignored due to the aforementioned
                    # race condition, check it.
                    self.assertIsInstance(cm.unraisable.exc_value, OSError)
                    self.assertIn(
                        f"Signal {signum:d} ignored due to race condition",
                        str(cm.unraisable.exc_value))
                    ignored = True

            # bpo-43406: Even if it is unlikely, it's technically possible that
            # all signals were ignored because of race conditions.
            if not ignored:
                # Sanity check that some signals were received, but not all
                self.assertGreater(num_received_signals, 0)
            self.assertLessEqual(num_received_signals, num_sent_signals)
        finally:
            # Make sure the sender thread stops even when an assertion above
            # failed before do_stop was set.
            do_stop = True
            t.join()
1387
1388
class RaiseSignalTest(unittest.TestCase):
    # Tests for signal.raise_signal() and for _thread.interrupt_main().

    def test_sigint(self):
        # With the handler currently installed for SIGINT, raising the
        # signal must surface as KeyboardInterrupt.
        with self.assertRaises(KeyboardInterrupt):
            signal.raise_signal(signal.SIGINT)

    @unittest.skipIf(sys.platform != "win32", "Windows specific test")
    def test_invalid_argument(self):
        SIGHUP = 1  # not supported on win32
        # Same assertRaises()/cm.exception.errno idiom as the other tests in
        # this file, replacing a hand-written try/fail/except/re-raise.
        with self.assertRaises(OSError,
                               msg="OSError (Invalid argument) expected") as cm:
            signal.raise_signal(SIGHUP)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_handler(self):
        # A Python-level SIGINT handler must have run by the time the
        # assertion after raise_signal() is evaluated.
        is_ok = False

        def handler(a, b):
            nonlocal is_ok
            is_ok = True

        old_signal = signal.signal(signal.SIGINT, handler)
        self.addCleanup(signal.signal, signal.SIGINT, old_signal)

        signal.raise_signal(signal.SIGINT)
        self.assertTrue(is_ok)

    def test__thread_interrupt_main(self):
        # See https://github.com/python/cpython/issues/102397
        #
        # The child calls _thread.interrupt_main() from a __del__ method of
        # an object that is still alive when the script ends.  The child must
        # exit successfully and report the ignored signal on stderr.
        code = """if 1:
        import _thread
        class Foo():
            def __del__(self):
                _thread.interrupt_main()

        x = Foo()
        """

        _, _, err = assert_python_ok('-c', code)
        self.assertIn(b'OSError: Signal 2 ignored due to race condition', err)
1431
1432
1433
class PidfdSignalTest(unittest.TestCase):
    # Tests for signal.pidfd_send_signal().

    @unittest.skipUnless(
        hasattr(signal, "pidfd_send_signal"),
        "pidfd support not built in",
    )
    def test_pidfd_send_signal(self):
        # Probe with file descriptor 0: the call has to raise OSError.  The
        # errno decides whether the test is skipped (ENOSYS, EPERM) or goes
        # on, in which case it must be EBADF.
        with self.assertRaises(OSError) as cm:
            signal.pidfd_send_signal(0, signal.SIGINT)
        if cm.exception.errno == errno.ENOSYS:
            self.skipTest("kernel does not support pidfds")
        elif cm.exception.errno == errno.EPERM:
            self.skipTest("Not enough privileges to use pidfds")
        self.assertEqual(cm.exception.errno, errno.EBADF)

        # A directory descriptor for this process's /proc entry is used as
        # the pidfd for the remaining checks; it is closed on cleanup.
        my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY)
        self.addCleanup(os.close, my_pidfd)

        # Anything other than None for siginfo is rejected.
        with self.assertRaisesRegex(TypeError, "^siginfo must be None$"):
            signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0)

        # Sending SIGINT to ourselves must surface as KeyboardInterrupt.
        with self.assertRaises(KeyboardInterrupt):
            signal.pidfd_send_signal(my_pidfd, signal.SIGINT)
1454
def tearDownModule():
    """Module-level teardown hook: delegate to support.reap_children().

    NOTE(review): presumably this collects child processes left behind by
    the tests above; confirm against test.support.
    """
    support.reap_children()
1457
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()