python (3.11.7)

(root)/
lib/
python3.11/
test/
__pycache__/
test_signal.cpython-311.pyc

e	NddlZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlZddl
mZddlmZddlmZmZddlmZ	ddlZn
#e$rdZYnwxYwGddejZeje	jdkd	Gd
dejZeje	jdkdGd
dejZGddejZeje	jdkd	GddejZeje eddGddejZ!eje	jdkd	eje eddej"eje eddGddejZ#eje	jdkd	eje edoe eddGd d!ejZ$Gd"d#ejZ%Gd$d%ejZ&Gd&d'ejZ'Gd(d)ejZ(d*Z)e*d+krej+dSdS),N)support)	os_helper)assert_python_okspawn_python)threading_helperceZdZdZdZdS)GenericTestscttD]}tt|}|dvr!||tj=|dvr!||tjb|dr6|ds!||tj|dr@||tj|tj
dtjtj
ddd	t
}tj|tjtjtj
dddt
}tj|tjttd
d}|@tjtj
d
ddt
}tj||dSdS)N>SIG_DFLSIG_IGN>	SIG_BLOCKSIG_SETMASKSIG_UNBLOCKSIGSIG_CTRL_win32Signalssignalc|r+|do|dp|dS)Nrrr)isupper
startswithnames D/BuggyBox/python/3.11.7/bootstrap/lib/python3.11/test/test_signal.py<lambda>z)GenericTests.test_enums.<locals>.<lambda>)sOLLNNQ//O8O8O4O0w//)sourceHandlersc
|dvS)N)rrrs rrz)GenericTests.test_enums.<locals>.<lambda>3sT%;;rSigmasksc
|dvS)N)r
rrr!rs rrz)GenericTests.test_enums.<locals>.<lambda><s)T!Tr)dirrgetattrassertIsInstancerr"rrassertEqualsysplatformenum
_old_convert_IntEnum_test_simple_enum)selfrsigCheckedSignalsCheckedHandlersr"CheckedSigmaskss       r
test_enumszGenericTests.test_enumssKK
	8
	8D&$''C---%%c6?;;;;DDD%%c6?;;;;''
80G0G
8%%c6>::::))
8%%c6>:::  w777+i00
	
~v~>>>,j(;;
	
@@@6:t44"0L*hTT!O

"?H=====
 rcttD]Z}tt|}tj|r/tj|s||jd[dS)Nr)r$rr%inspect	isroutine	isbuiltinr'
__module__)r.rvalues   rtest_functions_module_attrz'GenericTests.test_functions_module_attrAsqKK	=	=DFD))E ''
=0A%0H0H
=  !18<<<	=	=rN)__name__r8__qualname__r3r:r!rrr	r	s3%>%>%>N=====rr	rzNot valid on WindowsceZdZdZdZdZdZdZdZe	j
eeddd	Z
e	j
ejd
ejdZdS)
PosixTestscdSNr!r.argss  rtrivial_signal_handlerz!PosixTests.trivial_signal_handlerLsrc|ttjd|ttjd|j|ttjddS)Ni)assertRaises
ValueErrorr	getsignalrC	strsignalr.s r,test_out_of_range_signal_number_raises_errorz7PosixTests.test_out_of_range_signal_number_raises_errorOsf*f&6===*fmT5	7	7	7	
*f&6=====rch|ttjtjddSr@)rE	TypeErrorrSIGUSR1rIs r0test_setting_signal_handler_to_none_raises_errorz;PosixTests.test_setting_signal_handler_to_none_raises_errorWs1)V] .$	0	0	0	0	0rctjtj|j}||tj|tjtj|jtjtj||tjtj|dSr@)rSIGHUPrCr&rr'rG)r.hups  rtest_getsignalzPosixTests.test_getsignal[smFM4+FGGc6?333)&-884	6	6	6
fmS))))&-88#>>>>>rc2|dtjtj|dtjtj|dtjtjdS)N	Interrupt
TerminatedHangup)assertInrrHSIGINTSIGTERMrPrIs rtest_strsignalzPosixTests.test_strsignalcsk

k6#3FM#B#BCCC

lF$4V^$D$DEEE

h 0 ? ?@@@@@rctjt}tj|d}t|dS)Nzsignalinterproctester.py)ospathdirname__file__joinr)r.r^scripts   rtest_interprocess_signalz#PosixTests.test_interprocess_signalis?'//(++g'ABB     r
valid_signalszrequires signal.valid_signalsctj}||t|tjj||tjj||d||tj	||
t|tj	ttD]}|
ds|dvr||5tt|}||d|
|tj	dddn#1swxYwYdS)Nrr>rrr)rrcr&setrWrrXSIGALRMassertNotInNSIG
assertLesslenr$rsubTestr%assertGreaterEqual)r.srsignums    rtest_valid_signalszPosixTests.test_valid_signalsns

 ""a%%%

fn+Q///

fn,a000Aa(((A,,,KK		5		5D??5))
---4((
5
5 ..''222444
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
		5		5s#AE;;E?	E?	sys.executable required.ctjtjddgtj}|d|j||jtj
dS)z+KeyboardInterrupt triggers exit via SIGINT.-czaimport os, signal, time
os.kill(os.getpid(), signal.SIGINT)
for _ in range(999): time.sleep(0.01)stderrKeyboardInterruptN)
subprocessrunr(
executablePIPErWrtr'
returncoderrX)r.processs  r!test_keyboard_interrupt_exit_codez,PosixTests.test_keyboard_interrupt_exit_codesn.9:"(((	


*GN;;;+fm^<<<<<rN)r;r8r<rCrJrNrRrZrbunittest
skipUnlesshasattrrror(rxrrequires_subprocessr|r!rrr>r>Js


>>>000???AAA!!!
X(('55	5,X)CDD W ""	=	=#"ED	=	=	=rr>zWindows specificceZdZdZdZejejde	j
dZdS)WindowsSignalTestsctj}||t|t|d|tjj||	d||	tj
||t|tj
dS)Nr)rrcr&rerlrjrWrrXrgrhrir.rms  rroz%WindowsSignalTests.test_valid_signalss ""a%%%A***

fn+Q///Aa(((A,,,,,rcd}t}tjtjtjtjtjtjtjfD]S}tj	|=tj|tj|||
|T|||t5tjd|dddn#1swxYwY|t5tjd|ddddS#1swxYwYdS)NcdSr@r!)xys  rrz3WindowsSignalTests.test_issue9324.<locals>.<lambda>str)rerSIGABRTSIGBREAKSIGFPESIGILLrXSIGSEGVrYrGadd
assertTruerErF)r.handlercheckedr/s    rtest_issue9324z!WindowsSignalTests.test_issue9324s##%%NFOV]M6=&.N$	!	!C
$$0
c6=g#>#>???C      


z
*
*	'	'M"g&&&	'	'	'	'	'	'	'	'	'	'	'	'	'	'	'

z
*
*	&	&M!W%%%	&	&	&	&	&	&	&	&	&	&	&	&	&	&	&	&	&	&s$$DD

D
+EEErpctjtjddgtj}|d|jd}||j|dS)z?KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.rrzraise KeyboardInterruptrsrul:N)	rvrwr(rxryrWrtr'rz)r.r{STATUS_CONTROL_C_EXITs   rr|z4WindowsSignalTests.test_keyboard_interrupt_exit_codesk.'@A!(((	


*GN;;; *+-BCCCCCrN)r;r8r<rorr}r~r(rxrrr|r!rrrrs|---&&&*X)CDD W ""DD#"EDDDDrrceZdZdZdZejejddZ	ej
ejdejee
dddZej
ejdejejdd	Zej
ejd
kdej
ejdejee
dddZd
S)
WakeupFDTestscL|t5tjtjdddn#1swxYwY|t5tjtjdddddS#1swxYwYdS)N)rnF)rErLr
set_wakeup_fdrXrIs rtest_invalid_callzWakeupFDTests.test_invalid_calls


y
)
)	7	7 
6666	7	7	7	7	7	7	7	7	7	7	7	7	7	7	7

y
)
)	7	7 666	7	7	7	7	7	7	7	7	7	7	7	7	7	7	7	7	7	7s# AAA, BB Bctj}|ttft
j|dSr@)rmake_bad_fdrErFOSErrorrr)r.fds  rtest_invalid_fdzWakeupFDTests.test_invalid_fdsA

"
$
$:w/ .	4	4	4	4	4rzneeds working sockets.ctj}|}||tt
ftj|dSr@)socketfilenocloserErFrrr)r.sockrs   rtest_invalid_socketz!WakeupFDTests.test_invalid_socketsX}
[[]]

:w/ .	4	4	4	4	4rzEmscripten cannot fstat pipes.piperequires os.pipe()ctj\}}|tj||tj|tj\}}|tj||tj|t	tdr*tj|dtj|dt
j||t
j|||t
jd||t
jdddS)Nset_blockingFr)	r\r
addCleanuprrrrrr')r.r1w1r2w2s     rtest_set_wakeup_fd_resultz'WakeupFDTests.test_set_wakeup_fd_results*B"%%%"%%%B"%%%"%%%2~&&	'OB&&&OB&&&R   -b112666-b112666-b11266666rcvtj}||j|d|}tj}||j|d|}tj||tj|||tjd||tjdddS)NFr)rrrsetblockingrrrr')r.sock1fd1sock2fd2s     r test_set_wakeup_fd_socket_resultz.WakeupFDTests.test_set_wakeup_fd_socket_results
$$$
%   llnn
$$$
%   llnnS!!!-c22C888-b113777-b11266666rrztests specific to POSIXc8tj\}}|tj||tj|tj|d|t5}tj|dddn#1swxYwY|	t|jd|ztj|dtj|tjddS)NTz&the fd %s must be in non-blocking modeFr)r\rrrrrErFrrr'str	exception)r.rfdwfdcms    rtest_set_wakeup_fd_blockingz)WakeupFDTests.test_set_wakeup_fd_blockings5799S#&&&#&&&	T"""


z
*
*	&b %%%	&	&	&	&	&	&	&	&	&	&	&	&	&	&	&R\**ACG	I	I	I	U###S!!!R     sB''B+.B+N)r;r8r<rrr}r~rhas_socket_supportrskipIf
is_emscriptenrr\rrr(r)rr!rrrrs777444
X35MNN44ON4X_W*,LMMXV,,.BCC77DCNM7"X_W*,LMMX35MNN77ONNM7$X_S\W,.GHHX_W*,LMMXV,,.BCC!!DCNMIH!!!rrc eZdZejeduddddZejedudejee	dddZ
d	Zd
ZdZ
ejeedd
dZdS)WakeupSignalTestsNneed _testcapiTorderedcdttt|||}t	d|dS)Naif 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        rr)formattuplemapintr)r.	test_bodyrsignalscodes     rcheck_wakeupzWakeupSignalTests.check_wakeup#sL @
F5S'**++Wi@@A	
D	t$$$$$rrrczd}tj\}}	tj|d|dn#t$rYnwxYwtj|tj|n-#tj|tj|wxYwt
d|dS)Na&if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        xz9OS doesn't report write() error on the read end of a piperr)r\rwriteskipTestrrr)r.rrws    rtest_wakeup_write_errorz)WakeupSignalTests.test_wakeup_write_errorJs!Dwyy1	HQ
MMUVVVV			D	

HQKKKHQKKKK
HQKKKHQKKKKt$$$$$s(AA>
AA>AA>>*B(cF|dtjdS)Nadef test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        rrrfrIs rtest_wakeup_fd_earlyz&WakeupSignalTests.test_wakeup_fd_earlys0>^?					rcF|dtjdS)Na`def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        rrIs rtest_wakeup_fd_duringz'WakeupSignalTests.test_wakeup_fd_durings06^7					rc\|dtjtjdS)Nzdef test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        )rrrMrfrIs rtest_signumzWakeupSignalTests.test_signums4^V^		-	-	-	-	-rpthread_sigmaskneed signal.pthread_sigmask()c`|dtjtjddS)Nadef test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        Fr)rrrMSIGUSR2rIs rtest_pendingzWakeupSignalTests.test_pendings=	
nfne		=	=	=	=	=r)r;r8r<r}r	_testcapirr~rr\rrrrrrr!rrrr!sX_Y$&(8998<$%$%$%$%:9$%LX_Y$&(899XV,,.BCC1%1%DC:91%f   D<---X):;;8::
=
=::
=
=
=rr
socketpairzneed socket.socketpairceZdZejeduddZejeduddZejeduddZdS)WakeupSocketSignalTestsNrc*d}td|dS)Naif 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        rrrr.rs  rtest_socketz#WakeupSocketSignalTests.test_sockets!>	t$$$$$rc|tjdkrd}nd}d|}td|dS)Nntsendra+if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        actionrrr\rrrr.rrs   rtest_send_errorz'WakeupSocketSignalTests.test_send_errorsQ7d??FFF!B
F&F!!C	
D	t$$$$$rc|tjdkrd}nd}d|}td|dS)NrrraJ
if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        rrrrrs   rtest_warn_on_full_bufferz0WakeupSocketSignalTests.test_warn_on_full_buffer*sR7d??FFFgN
F&F!!O	
P	t$$$$$r)	r;r8r<r}rrrrrr!rrrrsX_Y$&(899!%!%:9!%FX_Y$&(899(%(%:9(%TX_Y$&(899n%n%:9n%n%n%rrsiginterruptzneeds signal.siginterrupt()rrcReZdZdZdZdZejddZdS)SiginterruptTestcd|d}td|5}	|j}|tj\}}||z}|}|dvrtd|d||dkcd	d	d	S#tj	$r#|
Yd	d	d	d
SwxYw#1swxYwYd	S)zPerform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.
        zif 1:
            import errno
            import os
            import signal
            import sys

            interrupt = aq
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        rr)timeout)Child error (exit code ): rNF)rstdoutreadlinecommunicater
SHORT_TIMEOUTwait	ExceptionrvTimeoutExpiredkill)r.	interruptrr{
first_linerrtexitcodes        rreadpipe_interruptedz%SiginterruptTest.readpipe_interruptedsfTyyG#H$
%
%	'
'$^4466
!(!4!4W=R!4!S!S
$f,"<<>>6))#)'/xx%9::: A
	'	'	'	'	'	'	'	',


	'	'	'	'	'	'	'	'

	'	'	'	'	'	'	'	'	'	's4C<B7C#C<C
CCCCcZ|d}||dSr@r
rr.interrupteds  rtest_without_siginterruptz*SiginterruptTest.test_without_siginterrupt///55$$$$$rcZ|d}||dSNTrr
s  rtest_siginterrupt_onz%SiginterruptTest.test_siginterrupt_onrrwalltimecZ|d}||dS)NF)r
assertFalser
s  rtest_siginterrupt_offz&SiginterruptTest.test_siginterrupt_offs1
//66%%%%%rN)	r;r8r<r
rrrrequires_resourcerr!rrrrsk:':':'x%%%%%%Wz**&&+*&&&rr	getitimer	setitimerz/needs signal.getitimer() and signal.setitimer()ceZdZdZdZdZdZdZdZdZ	e
jej
dvd	d
ZdZdZd
S)
ItimerTestcd|_d|_d|_tjtj|j|_dS)NFr)hndl_called
hndl_countitimerrrfsig_alrm	old_alarmrIs rsetUpzItimerTest.setUps3 v~t}EErctjtj|j|jtj|jddSdS)Nr)rrfr"r rrIs rtearDownzItimerTest.tearDownsC
fndn555;"T[!,,,,,#"rcd|_dSr)rrAs  rr!zItimerTest.sig_alrmsrcd|_|jdkrtjd|jdkrtjtjd|xjdz
c_dS)NTrz.setitimer didn't disable ITIMER_VIRTUAL timer.r)rrrItimerErrorrITIMER_VIRTUALrAs  r
sig_vtalrmzItimerTest.sig_vtalrmsh?Q$&

_
!
!V2A6661rcRd|_tjtjddS)NTr)rrrITIMER_PROFrAs  rsig_profzItimerTest.sig_profs&+Q/////rc^|tjtjdddS)Nrr)rErr)rITIMER_REALrIs rtest_itimer_exczItimerTest.test_itimer_excs0	
&,f.>AFFF	H	Hrctj|_tj|jdtj||jddS)Ng?T)rr0r rpauser'rrIs rtest_itimer_realzItimerTest.test_itimer_real"sK(c***)400000r)netbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.ctj|_tjtj|jtj|jddt
jddD]2}tddd}tj	|jd	krn3|
d
|tj	|jd	||jddS)Ng333333?皙?N@Ferror902	铖r?8timeout: likely cause: machine too slow or load too highT)
rr*r 	SIGVTALRMr+rr
busy_retrypowrrr'rr._s  rtest_itimer_virtualzItimerTest.test_itimer_virtual)s+
f&888c3///#D666		"		"AE5(++A,,
::;

MM!
"
"
"	
)$+66
CCC)400000rctj|_tjtj|jtj|jddt
jddD]2}tddd}tj	|jdkrn3|
d	|tj	|jd||jd
dS)Nr7r8Fr9r;r<r=r>r@T)
rr-r SIGPROFr.rrrBrCrrr'rrDs  rtest_itimer_profzItimerTest.test_itimer_prof@s(
fndm444c3///#D666		"		"AE5(++A,,
::;

MM!
"
"
"	
)$+66
CCC)400000rctj|_tj|jdt	jd||jddS)Nư>r(T)rr0r rtimesleepr'rrIs rtest_setitimer_tinyzItimerTest.test_setitimer_tinyUsO(d+++
1


)400000rN)r;r8r<r#r%r!r+r.r1r4r}rr(r)rFrIrNr!rrrrsFFF---   000HHH111X_S\\1NPP11PP1*111*11111rrcveZdZdZejeedddZejeeddejeedddZ	ejeedd	e
jd
ZejeedddZ
ejeedd
dZejeedddZejeedddZejeedddZejeedddZejeedddZejeedd
ejeedde
jdZejeedddZejeedddZejeedde
jdZejeedd	e
jdZdS)PendingSignalsTestsz[
    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
    functions.
    
sigpendingzneed signal.sigpending()cl|tjtdSr@)r'rrQrerIs rtest_sigpending_emptyz)PendingSignalsTests.test_sigpending_emptyds-	
*,,cee44444rrrc*d}td|dS)Na
if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rrrrs  rtest_sigpendingz#PendingSignalsTests.test_sigpendingis!
0	t$$$$$rpthread_killzneed signal.pthread_kill()c*d}td|dS)Naif 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rrrrs  rtest_pthread_killz%PendingSignalsTests.test_pthread_kills!(	t$$$$$rc\d|d|d}td|dS)zo
        test: body of the "def test(signum):" function.
        blocked: number of the blocked signal
        zif 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        z

        blocked = a
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        rrN)stripr)r.blockedtestrs    rwait_helperzPendingSignalsTests.wait_helpers=Nzz||||WWWA &J	t$$$$$rsigwaitzneed signal.sigwait()cF|tjddS)Na 
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        r]rrfrIs rtest_sigwaitz PendingSignalsTests.test_sigwaits/	
*	
	
	
	
	
rsigwaitinfozneed signal.sigwaitinfo()cF|tjddS)Nz
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        r`rIs rtest_sigwaitinfoz$PendingSignalsTests.test_sigwaitinfo/	
*	
	
	
	
	
rsigtimedwaitzneed signal.sigtimedwait()cF|tjddS)Nz
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        r`rIs rtest_sigtimedwaitz%PendingSignalsTests.test_sigtimedwaitrercF|tjddS)Nz
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        r`rIs rtest_sigtimedwait_pollz*PendingSignalsTests.test_sigtimedwait_polls/	
*	
	
	
	
	
rcF|tjddS)Nz
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        r`rIs rtest_sigtimedwait_timeoutz-PendingSignalsTests.test_sigtimedwait_timeouts/	
*	
	
	
	
	
rcntj}|ttj|gddS)Ng)rrfrErFrf)r.rns  r"test_sigtimedwait_negative_timeoutz6PendingSignalsTests.test_sigtimedwait_negative_timeout	s0*f&9F8TJJJJJrc&tdddS)Nrraif True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        rrIs rtest_sigwait_threadz'PendingSignalsTests.test_sigwait_threads'	 	
	
	
	
	
rcB|ttj|ttjd|ttjddd|ttjdg|t
5tjtjtjgdddn#1swxYwY|t
5tjtjdgdddn#1swxYwY|t
5tjtjddzgddddS#1swxYwYdS)Nr(rriri)rErLrrrrFr
rhrIs rtest_pthread_sigmask_argumentsz2PendingSignalsTests.test_pthread_sigmask_arguments2s1	
)V%;<<<)V%;Q???)V%;Q1EEE'6#94DDD


z
*
*	D	D"6#3fk]CCC	D	D	D	D	D	D	D	D	D	D	D	D	D	D	D


z
*
*	:	:"6#3aS999	:	:	:	:	:	:	:	:	:	:	:	:	:	:	:


z
*
*	@	@"6#3agY???	@	@	@	@	@	@	@	@	@	@	@	@	@	@	@	@	@	@s65+C,,C03C0!D>>EE#$FFFcjtjtjtj}|tjtj|tjtjtj}||tjdSr@)rrr
rcrrrassertLessEqualrs  r"test_pthread_sigmask_valid_signalsz6PendingSignalsTests.test_pthread_sigmask_valid_signals@s
"6#3V5I5K5KLL.0BAFFF"6#5v7K7M7MNNQ 4 6 677777rc*d}td|dS)Na-	if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        rrrrs  rtest_pthread_sigmaskz(PendingSignalsTests.test_pthread_sigmaskIs#GP	t$$$$$rcd}td|5}|\}}|}|dkrtd|d|	ddddS#1swxYwYdS)Na7if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        rrrrr)rrrr)r.rr{rrtr	s      rtest_pthread_kill_main_threadz1PendingSignalsTests.test_pthread_kill_main_threads$
%
%	4$0022NFF||~~H1}}i!)66!3444	4	4	4	4	4	4	4	4	4	4	4	4	4	4	4	4	4	4sAA((A,/A,N)r;r8r<__doc__r}r~rrrSrUrrequires_working_threadingrXr]rardrhrjrlrnrprrrurwryr!rrrPrP_sX6635555555X):;;8::X66355%%55::%6X885770022%%3277%.X):;;8::*%*%::*%XX33022

22
X77466

66
X88577

77
X88577	
	
77	
X88577

77
X88577KK77KX33022X):;;8::0022

32::22

<X):;;8::
@
@::
@X):;;8::88::8X):;;8::0022I%I%32::I%VX885770022443277444rrPc&eZdZdZdZdZdZeje	e
dddZeje	e
dddZeje	e
d	d
e
jdZdS)
StressTestz
    Stress signal delivery, especially when a signal arrives in
    the middle of recomputing the signal state or executing
    previously tripped signal handlers.
    crtj||}|tj||dSr@)rr)r.rnrold_handlers    rsetsigzStressTest.setsigs1mFG44
v{;;;;;rcdgdfd	}|tjtjd|tj||t
kr'tjdt
k'fdtt
dz
D}tj|}tj
rtd|fz|S)	NctkrGtjt	jtjddSdS)NrK)rjappendrLperf_counterrrr0)rnframeNtimess  rrz5StressTest.measure_itimer_resolution.<locals>.handlersR5zzA~~T.00111 !3T:::::~rrgMbP?c8g|]}|dz|z
S)r(r!).0irs  r
<listcomp>z8StressTest.measure_itimer_resolution.<locals>.<listcomp>s*JJJqU1Q3Z%(*JJJrr(z,detected median itimer() resolution: %.6f s.NN)rrrr0rrfrjrLrMrange
statisticsmedianrverboseprint)r.r	durationsmedrrs    @@rmeasure_itimer_resolutionz$StressTest.measure_itimer_resolutions	;	;	;	;	;	;	;	
(&*<a@@@FNG,,,			%jj1nnJt%jj1nnKJJJE#e**q.4I4IJJJ		**?	K@C6IJJJ
rc|}|dkrdS|dkrdS|d|fzdS)Ng-C6?i'g{Gz?dz^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))rr)r.resos  rdecide_itimer_countzStressTest.decide_itimer_countsa--//4<<5
T\\3MMM!G$
%
%
%
%
%rrztest needs setitimer()c"|}gd}dfd	}|tj||tj||tj|d}t
jtj	z}||krAtjtjtj|dz
}t|krUt
j|kr>t
jdt|krt
j|k>tjtjtj|dz
}t|krUt
j|kr>t
jdt|krt
j|k>||kA|t|ddS)	z;
        This test uses dependent signal handlers.
        crtjtjdtjdzzdS)NrKh㈵>)rrr0random)rnrs  r
first_handlerz@StressTest.test_stress_delivery_dependent.<locals>.first_handlers0
V/
$8N1NOOOOOrNc2|dSr@rrnrsigss  rsecond_handlerzAStressTest.test_stress_delivery_dependent.<locals>.second_handlerKKrrr(rSome signals were lostr)rrrrHrMrfrL	monotonicrrr\rgetpidrjrMr')r.rrr
expected_sigsdeadliners      @rtest_stress_delivery_dependentz)StressTest.test_stress_delivery_dependents
$$&&	P	P	P	 	 	 	 	 	 	
FNM222FNM222FNN333
>##g&;;aGBIKK000QMd))m++0@0@80K0K
4   d))m++0@0@80K0K
GBIKK000QMd))m++0@0@80K0K
4   d))m++0@0@80K0Ka	
TA'?@@@@@rch|}gfd}|tj||tj|d}||krtjtjdtjdzztj	tj
tj|dz
}tjtj
dD]}t|krn||k|t|dd	S)
z>
        This test uses simultaneous signal handlers.
        c2|dSr@rrs  rrz=StressTest.test_stress_delivery_simultaneous.<locals>.handlerrrrrKrrFr9rN)rrrrMrfrr0rr\rrrsleeping_retryrrjr')r.rrrrErs     @r!test_stress_delivery_simultaneousz,StressTest.test_stress_delivery_simultaneouss/
$$&&	 	 	 	 	 	
FNG,,,FNG,,,
a
V/
$8N1NOOOGBIKK000QM+G,AOOO

t99
--E.a	
TA'?@@@@@rrMztest needs SIGUSR1c4	
tjd
d	d	fd
fd}	
fd}tj}|tj|tj|}	d}tj5}||d||j	Y|
|j	jt|
dd	d
t|j	jd}dddn#1swxYwY|s|	d|	
d|dS#d|wxYw)NrFcdz
dSNr(r!)rnrnum_received_signalss  rcustom_handlerzAStressTest.test_stress_modifying_handlers.<locals>.custom_handler;s A%   rcFstjdz
dSdSr)rraise_signal)do_stopnum_sent_signalsrnsrset_interruptszAStressTest.test_stress_modifying_handlers.<locals>.set_interrupts?sE
&#F+++ A% 
&
&
&
&
&rcdksdkrFtdD](}tjfD]}tj|)dk>dkDdSdS)Nrr(i N)rrr)rrrrrrns  rcycle_handlerszAStressTest.test_stress_modifying_handlers.<locals>.cycle_handlersEs"S((,@1,D,Du77A$2FN#C77
fg66667#S((,@1,D,D,D,D,D,Dr)targetTzSignal dz ignored due to race condition)rrMr	threadingThreadrcatch_unraisable_exceptionstartr`
unraisabler&	exc_valuerrWr
assertGreaterrt)r.rrrtignoredrrrrrrns       @@@@@rtest_stress_modifying_handlersz)StressTest.test_stress_modifying_handlers1s'
 	&	&	&	&	&	&	&	&	&	&	&	&	7	7	7	7	7	7	7	7mFN;;
v{;;;N333	G355
#			   =,))"-*A7KKKMMJ&JJJJBM344666#G
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#$
<""#7;;;  !57GHHHG
FFHHHHHG
FFHHHHs18E?
BD."E?.D22E?5D261E??FN)r;r8r<rzrrrr}r~rrrrrr{rr!rrr}r}s#<<<0%%%X55133*A*A33*AXX55133AA33A<X33-//00226632//666rr}cfeZdZdZejejdkddZdZ	dZ
dS)RaiseSignalTestc|t5tjtjddddS#1swxYwYdSr@)rEKeyboardInterruptrrrXrIs rtest_sigintzRaiseSignalTest.test_sigintos


0
1
1	/	/
...	/	/	/	/	/	/	/	/	/	/	/	/	/	/	/	/	/	/sAAArzWindows specific testc	d}tj||ddS#t$r"}|jtjkrnYd}~dSd}~wwxYw)Nr(z#OSError (Invalid argument) expected)rrfailrerrnoEINVAL)r.rPes   rtest_invalid_argumentz%RaiseSignalTest.test_invalid_argumentssy	F'''II;<<<<<			w%,&&	s+/
AAAcdfd}tjtj|}|tjtj|tjtj|dS)NFcddSrr!)abis_oks  rrz-RaiseSignalTest.test_handler.<locals>.handlersEEEr)rrXrrr)r.r
old_signalrs   @rtest_handlerzRaiseSignalTest.test_handlers{					]6='::

v}jAAAFM***rc^d}td|\}}}|d|dS)Nzif 1:
        import _thread
        class Foo():
            def __del__(self):
                _thread.interrupt_main()

        x = Foo()
        rrs/OSError: Signal 2 ignored due to race condition)rrW)r.rrcouterrs     rtest__thread_interrupt_mainz+RaiseSignalTest.test__thread_interrupt_mains;(d33C

H#NNNNNrN)r;r8r<rr}rr(r)rrrr!rrrrmsy///X_S\W,.EFF		GF				OOOOOrrcVeZdZejeedddZdS)PidfdSignalTestpidfd_send_signalzpidfd support not built inc|t5}tjdtjdddn#1swxYwY|jjtjkr|dn/|jjtj	kr|d|
|jjtjtj
dtjtj}|tj||t&d5tj|tjt)ddddn#1swxYwY|t*5tj|tjddddS#1swxYwYdS)Nrzkernel does not support pidfdsz"Not enough privileges to use pidfsz/proc/z^siginfo must be None$)rErrrrXrrENOSYSrEPERMr'EBADFr\openrO_DIRECTORYrrassertRaisesRegexrLobjectr)r.rmy_pidfds   rtest_pidfd_send_signalz&PidfdSignalTest.test_pidfd_send_signalsI


w
'
'	72$Q
666	7	7	7	7	7	7	7	7	7	7	7	7	7	7	7
<--MM:;;;;
\
5;
.
.MM>???+U[99971BIKK112>BB(+++

#
#I/G
H
H	K	K$Xv}fhhJJJ	K	K	K	K	K	K	K	K	K	K	K	K	K	K	K


0
1
1	>	>$Xv}===	>	>	>	>	>	>	>	>	>	>	>	>	>	>	>	>	>	>s5 AAA	.FF
F( GGGN)r;r8r<r}r~rrrr!rrrrsRX+,,$
>
>	
>
>
>rrc,tjdSr@)r
reap_childrenr!rrtearDownModulersr__main__),r*rr5r\rrrrrvr(rrLr}r\rtest.supportrtest.support.script_helperrrrrImportErrorTestCaser	rr)r>r~rrrrrrrrrPr}rrrr;mainr!rr<module>rs<				















""""""EEEEEEEE))))))III/=/=/=/=/=8$/=/=/=d(*@AAH=H=H=H=H="H=H=BAH=^S\W,.@AA-D-D-D-D-D*-D-DBA-D`S!S!S!S!S!H%S!S!S!l(*@AAs=s=s=s=s=)s=s=BAs=lWWV\224LMM@%@%@%@%@%h/@%@%NM@%F(*@AAWWV^446STTWWR((*>??R&R&R&R&R&x(R&R&@?UTBAR&j(*@AAWWV[11Rggfk6R6RJLLd1d1d1d1d1"d1d1LLBAd1NP4P4P4P4P4(+P4P4P4f
xxxxx"xxxv)O)O)O)O)Oh')O)O)OZ>>>>>h'>>>*zHMOOOOOsAAA