1 """PyUnit testing that threads honor our signal semantics"""
2
3 import unittest
4 import signal
5 import os
6 import sys
7 from test.support import threading_helper
8 import _thread as thread
9 import time
10
11 if (sys.platform[:3] == 'win'):
12 raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
13
14 process_pid = os.getpid()
15 signalled_all=thread.allocate_lock()
16
17 USING_PTHREAD_COND = (sys.thread_info.name == 'pthread'
18 and sys.thread_info.lock == 'mutex+cond')
19
20 def registerSignals(for_usr1, for_usr2, for_alrm):
21 usr1 = signal.signal(signal.SIGUSR1, for_usr1)
22 usr2 = signal.signal(signal.SIGUSR2, for_usr2)
23 alrm = signal.signal(signal.SIGALRM, for_alrm)
24 return usr1, usr2, alrm
25
26
27 # The signal handler. Just note that the signal occurred and
28 # from who.
29 def handle_signals(sig,frame):
30 signal_blackboard[sig]['tripped'] += 1
31 signal_blackboard[sig]['tripped_by'] = thread.get_ident()
32
33 # a function that will be spawned as a separate thread.
34 def send_signals():
35 os.kill(process_pid, signal.SIGUSR1)
36 os.kill(process_pid, signal.SIGUSR2)
37 signalled_all.release()
38
39
40 @threading_helper.requires_working_threading()
41 @unittest.skipUnless(hasattr(signal, "alarm"), "test requires signal.alarm")
42 class ESC[4;38;5;81mThreadSignals(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
43
44 def test_signals(self):
45 with threading_helper.wait_threads_exit():
46 # Test signal handling semantics of threads.
47 # We spawn a thread, have the thread send two signals, and
48 # wait for it to finish. Check that we got both signals
49 # and that they were run by the main thread.
50 signalled_all.acquire()
51 self.spawnSignallingThread()
52 signalled_all.acquire()
53
54 # the signals that we asked the kernel to send
55 # will come back, but we don't know when.
56 # (it might even be after the thread exits
57 # and might be out of order.) If we haven't seen
58 # the signals yet, send yet another signal and
59 # wait for it return.
60 if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
61 or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
62 try:
63 signal.alarm(1)
64 signal.pause()
65 finally:
66 signal.alarm(0)
67
68 self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
69 self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
70 thread.get_ident())
71 self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
72 self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
73 thread.get_ident())
74 signalled_all.release()
75
76 def spawnSignallingThread(self):
77 thread.start_new_thread(send_signals, ())
78
79 def alarm_interrupt(self, sig, frame):
80 raise KeyboardInterrupt
81
82 @unittest.skipIf(USING_PTHREAD_COND,
83 'POSIX condition variables cannot be interrupted')
84 @unittest.skipIf(sys.platform.startswith('linux') and
85 not sys.thread_info.version,
86 'Issue 34004: musl does not allow interruption of locks '
87 'by signals.')
88 # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
89 @unittest.skipIf(sys.platform.startswith('openbsd'),
90 'lock cannot be interrupted on OpenBSD')
91 def test_lock_acquire_interruption(self):
92 # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
93 # in a deadlock.
94 # XXX this test can fail when the legacy (non-semaphore) implementation
95 # of locks is used in thread_pthread.h, see issue #11223.
96 oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
97 try:
98 lock = thread.allocate_lock()
99 lock.acquire()
100 signal.alarm(1)
101 t1 = time.monotonic()
102 self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5)
103 dt = time.monotonic() - t1
104 # Checking that KeyboardInterrupt was raised is not sufficient.
105 # We want to assert that lock.acquire() was interrupted because
106 # of the signal, not that the signal handler was called immediately
107 # after timeout return of lock.acquire() (which can fool assertRaises).
108 self.assertLess(dt, 3.0)
109 finally:
110 signal.alarm(0)
111 signal.signal(signal.SIGALRM, oldalrm)
112
113 @unittest.skipIf(USING_PTHREAD_COND,
114 'POSIX condition variables cannot be interrupted')
115 @unittest.skipIf(sys.platform.startswith('linux') and
116 not sys.thread_info.version,
117 'Issue 34004: musl does not allow interruption of locks '
118 'by signals.')
119 # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
120 @unittest.skipIf(sys.platform.startswith('openbsd'),
121 'lock cannot be interrupted on OpenBSD')
122 def test_rlock_acquire_interruption(self):
123 # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
124 # in a deadlock.
125 # XXX this test can fail when the legacy (non-semaphore) implementation
126 # of locks is used in thread_pthread.h, see issue #11223.
127 oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
128 try:
129 rlock = thread.RLock()
130 # For reentrant locks, the initial acquisition must be in another
131 # thread.
132 def other_thread():
133 rlock.acquire()
134
135 with threading_helper.wait_threads_exit():
136 thread.start_new_thread(other_thread, ())
137 # Wait until we can't acquire it without blocking...
138 while rlock.acquire(blocking=False):
139 rlock.release()
140 time.sleep(0.01)
141 signal.alarm(1)
142 t1 = time.monotonic()
143 self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
144 dt = time.monotonic() - t1
145 # See rationale above in test_lock_acquire_interruption
146 self.assertLess(dt, 3.0)
147 finally:
148 signal.alarm(0)
149 signal.signal(signal.SIGALRM, oldalrm)
150
151 def acquire_retries_on_intr(self, lock):
152 self.sig_recvd = False
153 def my_handler(signal, frame):
154 self.sig_recvd = True
155
156 old_handler = signal.signal(signal.SIGUSR1, my_handler)
157 try:
158 def other_thread():
159 # Acquire the lock in a non-main thread, so this test works for
160 # RLocks.
161 lock.acquire()
162 # Wait until the main thread is blocked in the lock acquire, and
163 # then wake it up with this.
164 time.sleep(0.5)
165 os.kill(process_pid, signal.SIGUSR1)
166 # Let the main thread take the interrupt, handle it, and retry
167 # the lock acquisition. Then we'll let it run.
168 time.sleep(0.5)
169 lock.release()
170
171 with threading_helper.wait_threads_exit():
172 thread.start_new_thread(other_thread, ())
173 # Wait until we can't acquire it without blocking...
174 while lock.acquire(blocking=False):
175 lock.release()
176 time.sleep(0.01)
177 result = lock.acquire() # Block while we receive a signal.
178 self.assertTrue(self.sig_recvd)
179 self.assertTrue(result)
180 finally:
181 signal.signal(signal.SIGUSR1, old_handler)
182
183 def test_lock_acquire_retries_on_intr(self):
184 self.acquire_retries_on_intr(thread.allocate_lock())
185
186 def test_rlock_acquire_retries_on_intr(self):
187 self.acquire_retries_on_intr(thread.RLock())
188
189 def test_interrupted_timed_acquire(self):
190 # Test to make sure we recompute lock acquisition timeouts when we
191 # receive a signal. Check this by repeatedly interrupting a lock
192 # acquire in the main thread, and make sure that the lock acquire times
193 # out after the right amount of time.
194 # NOTE: this test only behaves as expected if C signals get delivered
195 # to the main thread. Otherwise lock.acquire() itself doesn't get
196 # interrupted and the test trivially succeeds.
197 self.start = None
198 self.end = None
199 self.sigs_recvd = 0
200 done = thread.allocate_lock()
201 done.acquire()
202 lock = thread.allocate_lock()
203 lock.acquire()
204 def my_handler(signum, frame):
205 self.sigs_recvd += 1
206 old_handler = signal.signal(signal.SIGUSR1, my_handler)
207 try:
208 def timed_acquire():
209 self.start = time.monotonic()
210 lock.acquire(timeout=0.5)
211 self.end = time.monotonic()
212 def send_signals():
213 for _ in range(40):
214 time.sleep(0.02)
215 os.kill(process_pid, signal.SIGUSR1)
216 done.release()
217
218 with threading_helper.wait_threads_exit():
219 # Send the signals from the non-main thread, since the main thread
220 # is the only one that can process signals.
221 thread.start_new_thread(send_signals, ())
222 timed_acquire()
223 # Wait for thread to finish
224 done.acquire()
225 # This allows for some timing and scheduling imprecision
226 self.assertLess(self.end - self.start, 2.0)
227 self.assertGreater(self.end - self.start, 0.3)
228 # If the signal is received several times before PyErr_CheckSignals()
229 # is called, the handler will get called less than 40 times. Just
230 # check it's been called at least once.
231 self.assertGreater(self.sigs_recvd, 0)
232 finally:
233 signal.signal(signal.SIGUSR1, old_handler)
234
235
236 def setUpModule():
237 global signal_blackboard
238
239 signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
240 signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
241 signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }
242
243 oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
244 unittest.addModuleCleanup(registerSignals, *oldsigs)
245
246
247 if __name__ == '__main__':
248 unittest.main()