python (3.12.0)

(root)/
lib/
python3.12/
test/
__pycache__/
test_signal.cpython-312.pyc

Αe	vddlZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlZddl
mZddlmZddlmZmZddlmZ	ddlZGddej.Zej2e	j4dk(d	Gd
dej.Zej8e	j4dk(dGd
dej.ZGddej.Zej2e	j4dk(d	Gddej.Zej8e eddGddej.Z!ej2e	j4dk(d	ej8e eddejDej8e eddGddej.Z#ej2e	j4dk(d	ej8e edxr	e eddGd d!ej.Z$Gd"d#ej.Z%Gd$d%ej.Z&Gd&d'ej.Z'Gd(d)ej.Z(d*Z)e*d+k(rejVyy#e$rdZYcwxYw),N)support)	os_helper)assert_python_okspawn_python)threading_helperceZdZdZdZy)GenericTestscttD]}tt|}|dvr!|j|tj8|dvr!|j|tj
]|j
dr2|j
ds!|j|tj|j
ds|j|tj|jtjdtjtjddd	t
}tj|tjtjtjdddt
}tj|tjttd
d}|Dtjtjd
ddt
}tj||yy)N>SIG_DFLSIG_IGN>	SIG_BLOCKSIG_SETMASKSIG_UNBLOCKSIGSIG_CTRL_win32Signalssignalc|jxr%|jdxr|jdxs|jdS)Nrrr)isupper
startswithnames D/BuggyBox/python/3.12.0/bootstrap/lib/python3.12/test/test_signal.py<lambda>z)GenericTests.test_enums.<locals>.<lambda>)sDLLNQ/O8O4O0w/)sourceHandlersc
|dvS)N)rrrs rrz)GenericTests.test_enums.<locals>.<lambda>3sT%;;rSigmasksc
|dvS)N)r
rrr!rs rrz)GenericTests.test_enums.<locals>.<lambda><s)T!Tr)dirrgetattrassertIsInstancerr"rrassertEqualsysplatformenum
_old_convert_IntEnum_test_simple_enum)selfrsigCheckedSignalsCheckedHandlersr"CheckedSigmaskss       r
test_enumszGenericTests.test_enumsszK
	8D&$'C--%%c6??;DD%%c6??;'0G%%c6>>:)%%c6>>:  w7
	8++i0
	
~v~~>,,j(;
	
@6:t4"00LL*hT!O

""?H=
 rcttD]Z}tt|}tj|s)tj
|r?|j
|jd\y)Nr)r$rr%inspect	isroutine	isbuiltinr'
__module__)r.rvalues   rtest_functions_module_attrz'GenericTests.test_functions_module_attrAsTK	=DFD)E  '0A0A%0H  !1!18<	=rN)__name__r8__qualname__r3r:r!rrr	r	s
%>N=rr	rzNot valid on WindowsceZdZdZdZdZdZdZdZe	jeeddd	Z
e	jejd
ej"dZy)
PosixTestscyNr!r.argss  rtrivial_signal_handlerz!PosixTests.trivial_signal_handlerLsrc|jttjd|jttjd|j|jttj
dy)Ni)assertRaises
ValueErrorr	getsignalrC	strsignalr.s r,test_out_of_range_signal_number_raises_errorz7PosixTests.test_out_of_range_signal_number_raises_errorOsU*f&6&6=*fmmT55	7	
*f&6&6=rcl|jttjtjdyr@)rE	TypeErrorrSIGUSR1rIs r0test_setting_signal_handler_to_none_raises_errorz;PosixTests.test_setting_signal_handler_to_none_raises_errorWs!)V]] ..$	0rctjtj|j}|j|tj|jtjtj|jtjtj||jtjtj|yr@)rSIGHUPrCr&rr'rG)r.hups  rtest_getsignalzPosixTests.test_getsignal[smmFMM4+F+FGc6??3))&--844	6

fmmS)))&--8#>rc6|jdtjtj|jdtjtj|jdtjtj
y)N	Interrupt
TerminatedHangup)assertInrrHSIGINTSIGTERMrPrIs rtest_strsignalzPosixTests.test_strsignalcsY

k6#3#3FMM#BC

lF$4$4V^^$DE

h 0 0 ?@rctjjt}tjj	|d}t|y)Nzsignalinterproctester.py)ospathdirname__file__joinr)r.r^scripts   rtest_interprocess_signalz#PosixTests.test_interprocess_signalis1''//(+g'AB r
valid_signalszrequires signal.valid_signalsctj}|j|t|j	tj
j||j	tj
j||jd||jtj||jt|tjttD]v}|jds|dvr|j|5tt|}|j!|d|j|tjdddxy#1swYxYw)Nrr>rrr)rrcr&setrWrrXSIGALRMassertNotInNSIG
assertLesslenr$rsubTestr%assertGreaterEqual)r.srsignums    rtest_valid_signalszPosixTests.test_valid_signalsns

  "a%

fnn++Q/

fnn,,a0Aa(A,K		5D??5)--4(
5 .''24
5
5
		5
5
5s
AE##E,	sys.executable required.ctjtjddgtj}|jd|j|j|jtjy)z+KeyboardInterrupt triggers exit via SIGINT.-czaimport os, signal, time
os.kill(os.getpid(), signal.SIGINT)
for _ in range(999): time.sleep(0.01)stderrKeyboardInterruptN)
subprocessrunr(
executablePIPErWrtr'
returncoderrX)r.processs  r!test_keyboard_interrupt_exit_codez,PosixTests.test_keyboard_interrupt_exit_codes_..9:"(	


*GNN;++fmm^<rN)r;r8r<rCrJrNrRrZrbunittest
skipUnlesshasattrrror(rxrrequires_subprocessr|r!rrr>r>Js
>0?A!
X('5	5,X)CD W  "	=#E	=rr>zWindows specificceZdZdZdZejejde	jdZy)WindowsSignalTestsctj}|j|t|j	t|d|j
tjj||jd||jtj||jt|tjy)Nr)rrcr&rerlrjrWrrXrgrhrir.rms  rroz%WindowsSignalTests.test_valid_signalss  "a%A*

fnn++Q/Aa(A,rcd}t}tjtjtjtj
tjtjtjfD]S}tj|tj|tj|||j|U|j||jt5tjd|ddd|jt5tjd|dddy#1swY>xYw#1swYyxYw)Ncyr@r!)xys  rrz3WindowsSignalTests.test_issue9324.<locals>.<lambda>sr)rerSIGABRTSIGBREAKSIGFPESIGILLrXSIGSEGVrYrGadd
assertTruerErF)r.handlercheckedr/s    rtest_issue9324z!WindowsSignalTests.test_issue9324s#%NNFOOV]]MM6==&..NN$	!C
$0

c6==g#>?C 	!	
 


z
*	'MM"g&	'

z
*	&MM!W%	&	&	'	'	&	&s5E	)E	EErpctjtjddgtj}|jd|jd}|j|j|y)z?KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.rrzraise KeyboardInterruptrsrul:N)	rvrwr(rxryrWrtr'rz)r.r{STATUS_CONTROL_C_EXITs   rr|z4WindowsSignalTests.test_keyboard_interrupt_exit_codesZ..'@A!(	


*GNN; *++-BCrN)r;r8r<rorr}r~r(rxrrr|r!rrrrsJ-&*X)CD W  "D#EDrrc*eZdZdZdZejejddZ	ejejdejee
dddZejejdejejdd	Zejej"d
k(dejejdejee
dddZy
)
WakeupFDTestsc<|jt5tjtjddd|jt5tjtjddddy#1swYLxYw#1swYyxYw)N)rnF)rErLr
set_wakeup_fdrXrIs rtest_invalid_callzWakeupFDTests.test_invalid_callsu


y
)	7  

6	7

y
)	7  6	7	7		7	7	7	7s%B%BBBctj}|jttft
j|yr@)rmake_bad_fdrErFOSErrorrr)r.fds  rtest_invalid_fdzWakeupFDTests.test_invalid_fds0

"
"
$:w/ ..	4rzneeds working sockets.ctj}|j}|j|jtt
ftj|yr@)socketfilenocloserErFrrr)r.sockrs   rtest_invalid_socketz!WakeupFDTests.test_invalid_socketsA}}
[[]

:w/ ..	4rzEmscripten cannot fstat pipes.piperequires os.pipe()ctj\}}|jtj||jtj|tj\}}|jtj||jtj|t	tdr,tj
|dtj
|dt
j||jt
j|||jt
jd||jt
jddy)Nset_blockingFr)	r\r
addCleanuprrrrrr')r.r1w1r2w2s     rtest_set_wakeup_fd_resultz'WakeupFDTests.test_set_wakeup_fd_resultsB"%"%B"%"%2~&OOB&OOB&R --b126--b126--b126rcLtj}|j|j|jd|j	}tj}|j|j|jd|j	}tj||jtj|||jtjd||jtjddy)NFr)rrrsetblockingrrrr')r.sock1fd1sock2fd2s     r test_set_wakeup_fd_socket_resultz.WakeupFDTests.test_set_wakeup_fd_socket_results

$
% lln

$
% llnS!--c2C8--b137--b126rrztests specific to POSIXc,tj\}}|jtj||jtj|tj|d|jt5}tj|ddd|jtjd|ztj|dtj|tjdy#1swYrxYw)NTz&the fd %s must be in non-blocking modeFr)r\rrrrrErFrrr'str	exception)r.rfdwfdcms    rtest_set_wakeup_fd_blockingz)WakeupFDTests.test_set_wakeup_fd_blockings779S#&#&	T"


z
*	&b  %	&R\\*ACG	I	U#S!R 	&	&sD

DN)r;r8r<rrr}r~rhas_socket_supportrskipIf
is_emscriptenrr\rrr(r)rr!rrrrs174
X335MN4O4X__W**,LMXV,.BC7DN7"X__W**,LMX335MN7ON7$X__S\\W,.GHX__W**,LMXV,.BC!DNI!rrc
eZdZejeduddddZejedudejee	dddZ
d	Zd
ZdZ
ejeedd
dZy)WakeupSignalTestsNneed _testcapiTorderedcpdjttt|||}t	d|y)Naif 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        rr)formattuplemapintr)r.	test_bodyrsignalscodes     rcheck_wakeupzWakeupSignalTests.check_wakeup#s7 @
F5S'*+Wi@A	
D	t$rrrcnd}tj\}}	tj|d|jdtj
|tj
|t
d|y#t$rYBwxYw#tj
|tj
|wxYw)Na&if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        xz9OS doesn't report write() error on the read end of a piperr)r\rwriteskipTestrrr)r.rrws    rtest_wakeup_write_errorz)WakeupSignalTests.test_wakeup_write_errorJs!Dwwy1	HHQ
MMUVHHQKHHQKt$		

HHQKHHQKs(A9B9	BBBB,B4cD|jdtjy)Nadef test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        rrrfrIs rtest_wakeup_fd_earlyz&WakeupSignalTests.test_wakeup_fd_earlys>^^?	rcD|jdtjy)Na`def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        rrIs rtest_wakeup_fd_duringz'WakeupSignalTests.test_wakeup_fd_durings6^^7	rcb|jdtjtjy)Nzdef test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        )rrrMrfrIs rtest_signumzWakeupSignalTests.test_signums$^^V^^		-rpthread_sigmaskneed signal.pthread_sigmask()cf|jdtjtjdy)Nadef test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        Fr)rrrMSIGUSR2rIs rtest_pendingzWakeupSignalTests.test_pendings-	
nnfnne		=r)r;r8r<r}r	_testcapirr~rr\rrrrrrr!rrrr!sX__Y$&(898<$%:$%LX__Y$&(89XV,.BC1%D:1%f D<-X):;8:
=:
=rr
socketpairzneed socket.socketpairceZdZejeduddZejeduddZejeduddZy)WakeupSocketSignalTestsNrc d}td|y)Naif 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        rrrr.rs  rtest_socketz#WakeupSocketSignalTests.test_sockets>	t$rcptjdk(rd}nd}dj|}td|y)Nntsendra+if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        actionrrr\rrrr.rrs   rtest_send_errorz'WakeupSocketSignalTests.test_send_errors@77d?FF!B
F&F!C	
D	t$rcptjdk(rd}nd}dj|}td|y)NrrraJ
if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        rrrrrs   rtest_warn_on_full_bufferz0WakeupSocketSignalTests.test_warn_on_full_buffer*sA77d?FFgN
F&F!O	
P	t$r)	r;r8r<r}rrrrrr!rrrrs{X__Y$&(89!%:!%FX__Y$&(89(%:(%TX__Y$&(89n%:n%rrsiginterruptzneeds signal.siginterrupt()rrcNeZdZdZdZdZejddZy)SiginterruptTestcd|d}td|5}	|jj}|jtj
\}}||z}|j
}|dvrtd|d||dk(cd	d	d	S#tj$r|jYd	d	d	y
wxYw#1swYy	xYw)zPerform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.
        zif 1:
            import errno
            import os
            import signal
            import sys

            interrupt = aq
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        rr)timeout)Child error (exit code ): rNF)rstdoutreadlinecommunicater
SHORT_TIMEOUTwait	ExceptionrvTimeoutExpiredkill)r.	interruptrr{
first_linerrtexitcodes        rreadpipe_interruptedz%SiginterruptTest.readpipe_interruptedsTG#H$
%	'
'$^^446
!(!4!4W=R=R!4!S
$f,"<<>6)#'/%9:: A
	'	',,
	'	'

	'	's.B;=B
.B;
#B8-B;7B88B;;CcH|jd}|j|yr@r
rr.interrupteds  rtest_without_siginterruptz*SiginterruptTest.test_without_siginterrupt //5$rcH|jd}|j|yNTrr
s  rtest_siginterrupt_onz%SiginterruptTest.test_siginterrupt_onrrwalltimecH|jd}|j|y)NF)r
assertFalser
s  rtest_siginterrupt_offz&SiginterruptTest.test_siginterrupt_offs"
//6%rN)	r;r8r<r
rrrrequires_resourcerr!rrrrs3:'x%%Wz*&+&rr	getitimer	setitimerz/needs signal.getitimer() and signal.setitimer()ceZdZdZdZdZdZdZdZdZ	e
jejdvd	d
ZdZdZy
)
ItimerTestcd|_d|_d|_tjtj|j
|_y)NFr)hndl_called
hndl_countitimerrrfsig_alrm	old_alarmrIs rsetUpzItimerTest.setUps2 v~~t}}Erctjtj|j|j!tj|jdyy)Nr)rrfr"r rrIs rtearDownzItimerTest.tearDowns;

fnndnn5;;"T[[!,#rcd|_yr)rrAs  rr!zItimerTest.sig_alrms
rcd|_|jdkDrtjd|jdk(r$tjtj
d|xjdz
c_y)NTrz.setitimer didn't disable ITIMER_VIRTUAL timer.r)rrrItimerErrorrITIMER_VIRTUALrAs  r
sig_vtalrmzItimerTest.sig_vtalrms^??Q$$&

__
!V22A61rcZd|_tjtjdy)NTr)rrrITIMER_PROFrAs  rsig_profzItimerTest.sig_profs ++Q/rcd|jtjtjddy)Nrr)rErr)rITIMER_REALrIs rtest_itimer_exczItimerTest.test_itimer_excs'	
&,,f.>.>AFrctj|_tj|jdtj|j|jdy)Ng?T)rr0r rpauser'rrIs rtest_itimer_realzItimerTest.test_itimer_real"sB((c*))40r)netbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.ctj|_tjtj|jtj
|jddt
jtjD]3}tddd}tj|jdk(s3n|jtj|jd|j|jdy)Ng333333?皙?902	铖r<T)
rr*r 	SIGVTALRMr+rr
busy_retryLONG_TIMEOUTpowrr'rr._s  rtest_itimer_virtualzItimerTest.test_itimer_virtual)s++

f&&8c3/##G$8$89	AE5(+A,
:		
))$++6
C))40rctj|_tjtj|jtj
|jddt
jtjD]3}tddd}tj|jdk(s3n|jtj|jd|j|jdy)Nr7r8r9r:r;T)
rr-r SIGPROFr.rrr>r?r@rr'rrAs  rtest_itimer_profzItimerTest.test_itimer_prof<s((

fnndmm4c3/##G$8$89	AE5(+A,
:		
))$++6
C))40rctj|_tj|jdt	j
d|j
|jdy)Nư>r(T)rr0r rtimesleepr'rrIs rtest_setitimer_tinyzItimerTest.test_setitimer_tinyMsF((d+

1
))40rN)r;r8r<r#r%r!r+r.r1r4r}rr(r)rCrFrKr!rrrrs`F- 0H1X__S\\\1NP1P1"1"1rrceZdZdZej
eedddZej
eeddej
eedddZ	ej
eedd	e
jd
Zej
eedddZ
ej
eedd
dZej
eedddZej
eedddZej
eedddZej
eedddZej
eedddZej
eedd
ej
eedde
jdZej
eedddZej
eedddZej
eedde
jdZej
eedd	e
jdZy)PendingSignalsTestsz[
    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
    functions.
    
sigpendingzneed signal.sigpending()c\|jtjtyr@)r'rrNrerIs rtest_sigpending_emptyz)PendingSignalsTests.test_sigpending_empty\s	
**,ce4rrrc d}td|y)Na
if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rrrrs  rtest_sigpendingz#PendingSignalsTests.test_sigpendingas
0	t$rpthread_killzneed signal.pthread_kill()c d}td|y)Naif 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rrrrs  rtest_pthread_killz%PendingSignalsTests.test_pthread_kills(	t$rcJd|jd|d}td|y)zo
        test: body of the "def test(signum):" function.
        blocked: number of the blocked signal
        zif 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        z

        blocked = a
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        rrN)stripr)r.blockedtestrs    rwait_helperzPendingSignalsTests.wait_helpers%Nzz|WA &J	t$rsigwaitzneed signal.sigwait()cD|jtjdy)Na 
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        rZrrfrIs rtest_sigwaitz PendingSignalsTests.test_sigwaits	
*	
rsigwaitinfozneed signal.sigwaitinfo()cD|jtjdy)Nz
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        r]rIs rtest_sigwaitinfoz$PendingSignalsTests.test_sigwaitinfo	
*	
rsigtimedwaitzneed signal.sigtimedwait()cD|jtjdy)Nz
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        r]rIs rtest_sigtimedwaitz%PendingSignalsTests.test_sigtimedwaitrbrcD|jtjdy)Nz
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        r]rIs rtest_sigtimedwait_pollz*PendingSignalsTests.test_sigtimedwait_polls	
*	
rcD|jtjdy)Nz
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        r]rIs rtest_sigtimedwait_timeoutz-PendingSignalsTests.test_sigtimedwait_timeouts	
*	
rcrtj}|jttj|gdy)Ng)rrfrErFrc)r.rns  r"test_sigtimedwait_negative_timeoutz6PendingSignalsTests.test_sigtimedwait_negative_timeouts)*f&9&9F8TJrctddy)Nrraif True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        rrIs rtest_sigwait_threadz'PendingSignalsTests.test_sigwait_threads	 	
rc,|jttj|jttjd|jttjddd|jttjdg|jt
5tjtjtjgddd|jt
5tjtjdgddd|jt
5tjtjddzgdddy#1swYxYw#1swY\xYw#1swYyxYw)Nr(rriri)rErLrrrrFr
rhrIs rtest_pthread_sigmask_argumentsz2PendingSignalsTests.test_pthread_sigmask_arguments*s/	
)V%;%;<)V%;%;Q?)V%;%;Q1E'6#9#94D


z
*	D""6#3#3fkk]C	D


z
*	:""6#3#3aS9	:


z
*	@""6#3#3agY?	@	@		D	D	:	:	@	@s$,4E2=&E>)F
2E;>F
Fctjtjtj}|j	tjtj
|tjtjtj}|j|tjyr@)rrr
rcrrrassertLessEqualrs  r"test_pthread_sigmask_valid_signalsz6PendingSignalsTests.test_pthread_sigmask_valid_signals8s{
""6#3#3V5I5I5KL..0B0BAF""6#5#5v7K7K7MNQ 4 4 67rc d}td|y)Na-	if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        rrrrs  rtest_pthread_sigmaskz(PendingSignalsTests.test_pthread_sigmaskAsGP	t$rcd}td|5}|j\}}|j}|dk7rtd|d|	dddy#1swYyxYw)Na7if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        rrrrr)rrrr)r.rr{rrtr	s      rtest_pthread_kill_main_threadz1PendingSignalsTests.test_pthread_kill_main_threadsl$
%	4$002NFF||~H1}!)6!344	4	4	4s:AAN)r;r8r<__doc__r}r~rrrPrRrrequires_working_threadingrUrZr^rarergrirkrmrorrrtrvr!rrrMrMWsJX635555X):;8:X635%5:%6X8570002%37%.X):;8:*%:*%XX302
2
X746
6
X857
7
X857	
7	
X857
7
X857K7KX302X):;8:0002
3:2

<X):;8:
@:
@X):;8:8:8X):;8:0002I%3:I%VX85700024374rrMc
eZdZdZdZdZdZeje	e
dddZeje	e
dddZeje	e
d	d
e
jdZy)
StressTestz
    Stress signal delivery, especially when a signal arrives in
    the middle of recomputing the signal state or executing
    previously tripped signal handlers.
    crtj||}|jtj||yr@)rr)r.rnrold_handlers    rsetsigzStressTest.setsigs&mmFG4

v{;rcdgdfd	}|jtjtjd|j	tj
||t
kr$tjdt
kr$tt
dz
Dcgc]}|dz|z
}}tj|}tjrtd|fz|Scc}w)NctkrHjtjt	j
tjdyy)NrH)rjappendrIperf_counterrrr0)rnframeNtimess  rrz5StressTest.measure_itimer_resolution.<locals>.handlers@5zA~T..01  !3!3T:rrgMbP?r(z,detected median itimer() resolution: %.6f s.NN)rrrr0r}rfrjrIrJrange
statisticsmedianrverboseprint)r.ri	durationsmedrrs     @@rmeasure_itimer_resolutionz$StressTest.measure_itimer_resolutions	;	
((&*<*<a@FNNG,	%j1nJJt%j1n5:#e*q.4IJqU1Q3Z%(*J	J	*??@C6IJ
	Ks/C<cf|j}|dkry|dkry|jd|fzy)Ng-C6?i'g{Gz?dz^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))rr)r.resos  rdecide_itimer_countzStressTest.decide_itimer_countsB--/4<
T\MMM!G$
%rrztest needs setitimer()c|j}gd}dfd	}|jtj||jtj||jtj
|d}t
jtjz}||kr=tjtjtj|dz
}t|krRt
j|kr;t
jdt|krt
j|kr;tjtjtj|dz
}t|krRt
j|kr;t
jdt|krt
j|kr;||kr=|jt|dy)	z;
        This test uses dependent signal handlers.
        c|tjtjdtjdzzy)NrHh㈵>)rrr0random)rnrs  r
first_handlerz@StressTest.test_stress_delivery_dependent.<locals>.first_handlers*
V//

$8N1NOrNc(j|yr@rrnrsigss  rsecond_handlerzAStressTest.test_stress_delivery_dependent.<locals>.second_handlerKKrrr(rSome signals were lostr)rr}rrErMrfrI	monotonicrrr\rgetpidrjrJr')r.rrr
expected_sigsdeadliners      @rtest_stress_delivery_dependentz)StressTest.test_stress_delivery_dependentsl
$$&	P	 	
FNNM2FNNM2FNNN3
>>#g&;&;;aGGBIIK0QMd)m+0@80K

4 d)m+0@80K
GGBIIK0QMd)m+0@80K

4 d)m+0@80Ka	
TA'?@rcd|j}gfd}|jtj||jtj|d}||krtj
tjdtjdzztjtjtj|dz
}tjtjD]}t|k\sn||kr|jt|dy)z>
        This test uses simultaneous signal handlers.
        c(j|yr@rrs  rrz=StressTest.test_stress_delivery_simultaneous.<locals>.handlerrrrrHrrrN)rr}rrMrfrr0rr\rrrsleeping_retryrrjr')r.rrrrBrs     @r!test_stress_delivery_simultaneousz,StressTest.test_stress_delivery_simultaneous	s
$$&	 	
FNNG,FNNG,
a
V//

$8N1NOGGBIIK0QM++G,A,AB
t9
-
a	
TA'?@rrMztest needs SIGUSR1c	
tjd
d	d	fd
fd}
fd}tj}|jtj|tj|}	d}tj5}|j|d|j|j`|j|jjt|jdd	d
t|jjd}ddd|s|j	d|j!	
d|jy#1swYBxYw#d|jwxYw)NrFcdz
yNr(r!)rnrnum_received_signalss  rcustom_handlerzAStressTest.test_stress_modifying_handlers.<locals>.custom_handler3s A% rcFstjdz
syyr)rraise_signal)do_stopnum_sent_signalsrnsrset_interruptszAStressTest.test_stress_modifying_handlers.<locals>.set_interrupts7s$##F+ A% rcdkrDtdD]/}tjfD]}tj|1dkrCyy)Nri N)rrr)rrrrrns  rcycle_handlerszAStressTest.test_stress_modifying_handlers.<locals>.cycle_handlers=sN"S(u7A$2FNN#C7

fg677#S(r)targetTzSignal dz ignored due to race condition)rrMr	threadingThreadrcatch_unraisable_exceptionstartr`
unraisabler&	exc_valuerrWr
assertGreaterri)r.rrr|tignoredrrrrrrns       @@@@@rtest_stress_modifying_handlersz)StressTest.test_stress_modifying_handlers)sM
 	&	&	7mmFN;

v{;N3	G335
#	 ==,))"--*A*A7KMM!&+IJBMM3346#G
#$""#7;OO02BCG
FFH1
#
#.G
FFHs%=E6BE*).E6*E3/E66F
N)r;r8r<rwr}rrr}r~rrrrrrxrr!rrrzrzs<0%X513*A3*AXX513A3A<X3-/000263/6rrzcjeZdZdZej
ejdk7ddZdZ	dZ
y)RaiseSignalTestc|jt5tjtjdddy#1swYyxYwr@)rEKeyboardInterruptrrrXrIs rtest_sigintzRaiseSignalTest.test_sigintgs8


0
1	/

.	/	/	/s$AArzWindows specific testc	d}tj||jdy#t$r)}|jtj
k(rnYd}~yd}~wwxYw)Nr(z#OSError (Invalid argument) expected)rrfailrerrnoEINVAL)r.rPes   rtest_invalid_argumentz%RaiseSignalTest.test_invalid_argumentksR	F'II;<	ww%,,&	s(+	AAAc"dfd}tjtj|}|jtjtj|tjtj|j	y)NFc
dyrr!)abis_oks  rrz-RaiseSignalTest.test_handler.<locals>.handlerysEr)rrXrrr)r.r
old_signalrs   @rtest_handlerzRaiseSignalTest.test_handlerwsY	]]6==':


v}}jAFMM*rcLd}td|\}}}|jd|y)Nzif 1:
        import _thread
        class Foo():
            def __del__(self):
                _thread.interrupt_main()

        x = Foo()
        rrs/OSError: Signal 2 ignored due to race condition)rrW)r.rrcouterrs     rtest__thread_interrupt_mainz+RaiseSignalTest.test__thread_interrupt_mains,(d3C

H#NrN)r;r8r<rr}rr(r)rrrr!rrrres>/X__S\\W,.EF	G		OrrcLeZdZejeedddZy)PidfdSignalTestpidfd_send_signalzpidfd support not built inc|jt5}tjdtjdddj
jtjk(r|jdn8|j
jtjk(r|jd|j|j
jtjtjdtjtj}|j!tj"||j%t&d5tj|tjt)dddd|jt*5tj|tjdddy#1swYxYw#1swYYxYw#1swYyxYw)Nrzkernel does not support pidfdsz"Not enough privileges to use pidfsz/proc/z^siginfo must be None$)rErrrrXrrENOSYSrEPERMr'EBADFr\openrO_DIRECTORYrrassertRaisesRegexrLobjectr)r.rmy_pidfds   rtest_pidfd_send_signalz&PidfdSignalTest.test_pidfd_send_signalsU


w
'	72$$Q

6	7
<<-MM:;
\\

5;;
.MM>?++U[[977VBIIK=12>>B(+

#
#I/G
H	K$$Xv}}fhJ	K


0
1	>$$Xv}}=	>	>	7	7	K	K	>	>s#%G/G#%G*GG'*G3N)r;r8r<r}r~rrrr!rrrrs/X+,$
>	
>rrc,tjyr@)r
reap_childrenr!rrtearDownModulersr__main__),r*rr5r\rrrrrvr(rrIr}rYrtest.supportrtest.support.script_helperrrrrImportErrorTestCaser	rr)r>r~rrrrrrrrrMrzrrrr;mainr!rr<module>rs	



"E)
/=8$$/=d(*@AH=""H=BH=^S\\W,.@A-D**-DB-D`S!H%%S!l(*@As=))s=Bs=lWV\24LM@%h//@%N@%F(*@AWV^46STWR(*>?R&x((R&@UBR&j(*@AWV[1Rgfk6RJL\1""\1LB\1~P4(++P4f
x""xv)Oh'')OZ>h''>*zHMMOm,IsJ--J87J8