(root)/
Python-3.11.7/
Lib/
test/
_test_eintr.py
       1  """
       2  This test suite exercises some system calls subject to interruption with EINTR,
       3  to check that it is actually handled transparently.
       4  It is intended to be run by the main test suite within a child process, to
       5  ensure there is no background thread running (so that signals are delivered to
       6  the correct thread).
       7  Signals are generated in-process using setitimer(ITIMER_REAL), which allows
       8  sub-second periodicity (contrarily to signal()).
       9  """
      10  
      11  import contextlib
      12  import faulthandler
      13  import fcntl
      14  import os
      15  import platform
      16  import select
      17  import signal
      18  import socket
      19  import subprocess
      20  import sys
      21  import time
      22  import unittest
      23  
      24  from test import support
      25  from test.support import os_helper
      26  from test.support import socket_helper
      27  
      28  
      29  # gh-109592: Tolerate a difference of 20 ms when comparing timings
      30  # (clock resolution)
      31  CLOCK_RES = 0.020
      32  
      33  
      34  @contextlib.contextmanager
      35  def kill_on_error(proc):
      36      """Context manager killing the subprocess if a Python exception is raised."""
      37      with proc:
      38          try:
      39              yield proc
      40          except:
      41              proc.kill()
      42              raise
      43  
      44  
      45  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
      46  class ESC[4;38;5;81mEINTRBaseTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      47      """ Base class for EINTR tests. """
      48  
      49      # delay for initial signal delivery
      50      signal_delay = 0.1
      51      # signal delivery periodicity
      52      signal_period = 0.1
      53      # default sleep time for tests - should obviously have:
      54      # sleep_time > signal_period
      55      sleep_time = 0.2
      56  
      57      def sighandler(self, signum, frame):
      58          self.signals += 1
      59  
      60      def setUp(self):
      61          self.signals = 0
      62          self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler)
      63          signal.setitimer(signal.ITIMER_REAL, self.signal_delay,
      64                           self.signal_period)
      65  
      66          # Use faulthandler as watchdog to debug when a test hangs
      67          # (timeout of 10 minutes)
      68          faulthandler.dump_traceback_later(10 * 60, exit=True,
      69                                            file=sys.__stderr__)
      70  
      71      @staticmethod
      72      def stop_alarm():
      73          signal.setitimer(signal.ITIMER_REAL, 0, 0)
      74  
      75      def tearDown(self):
      76          self.stop_alarm()
      77          signal.signal(signal.SIGALRM, self.orig_handler)
      78          faulthandler.cancel_dump_traceback_later()
      79  
      80      def subprocess(self, *args, **kw):
      81          cmd_args = (sys.executable, '-c') + args
      82          return subprocess.Popen(cmd_args, **kw)
      83  
      84      def check_elapsed_time(self, elapsed):
      85          self.assertGreaterEqual(elapsed, self.sleep_time - CLOCK_RES)
      86  
      87  
      88  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
      89  class ESC[4;38;5;81mOSEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
      90      """ EINTR tests for the os module. """
      91  
      92      def new_sleep_process(self):
      93          code = 'import time; time.sleep(%r)' % self.sleep_time
      94          return self.subprocess(code)
      95  
      96      def _test_wait_multiple(self, wait_func):
      97          num = 3
      98          processes = [self.new_sleep_process() for _ in range(num)]
      99          for _ in range(num):
     100              wait_func()
     101          # Call the Popen method to avoid a ResourceWarning
     102          for proc in processes:
     103              proc.wait()
     104  
     105      def test_wait(self):
     106          self._test_wait_multiple(os.wait)
     107  
     108      @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()')
     109      def test_wait3(self):
     110          self._test_wait_multiple(lambda: os.wait3(0))
     111  
     112      def _test_wait_single(self, wait_func):
     113          proc = self.new_sleep_process()
     114          wait_func(proc.pid)
     115          # Call the Popen method to avoid a ResourceWarning
     116          proc.wait()
     117  
     118      def test_waitpid(self):
     119          self._test_wait_single(lambda pid: os.waitpid(pid, 0))
     120  
     121      @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()')
     122      def test_wait4(self):
     123          self._test_wait_single(lambda pid: os.wait4(pid, 0))
     124  
     125      def test_read(self):
     126          rd, wr = os.pipe()
     127          self.addCleanup(os.close, rd)
     128          # wr closed explicitly by parent
     129  
     130          # the payload below are smaller than PIPE_BUF, hence the writes will be
     131          # atomic
     132          datas = [b"hello", b"world", b"spam"]
     133  
     134          code = '\n'.join((
     135              'import os, sys, time',
     136              '',
     137              'wr = int(sys.argv[1])',
     138              'datas = %r' % datas,
     139              'sleep_time = %r' % self.sleep_time,
     140              '',
     141              'for data in datas:',
     142              '    # let the parent block on read()',
     143              '    time.sleep(sleep_time)',
     144              '    os.write(wr, data)',
     145          ))
     146  
     147          proc = self.subprocess(code, str(wr), pass_fds=[wr])
     148          with kill_on_error(proc):
     149              os.close(wr)
     150              for data in datas:
     151                  self.assertEqual(data, os.read(rd, len(data)))
     152              self.assertEqual(proc.wait(), 0)
     153  
     154      def test_write(self):
     155          rd, wr = os.pipe()
     156          self.addCleanup(os.close, wr)
     157          # rd closed explicitly by parent
     158  
     159          # we must write enough data for the write() to block
     160          data = b"x" * support.PIPE_MAX_SIZE
     161  
     162          code = '\n'.join((
     163              'import io, os, sys, time',
     164              '',
     165              'rd = int(sys.argv[1])',
     166              'sleep_time = %r' % self.sleep_time,
     167              'data = b"x" * %s' % support.PIPE_MAX_SIZE,
     168              'data_len = len(data)',
     169              '',
     170              '# let the parent block on write()',
     171              'time.sleep(sleep_time)',
     172              '',
     173              'read_data = io.BytesIO()',
     174              'while len(read_data.getvalue()) < data_len:',
     175              '    chunk = os.read(rd, 2 * data_len)',
     176              '    read_data.write(chunk)',
     177              '',
     178              'value = read_data.getvalue()',
     179              'if value != data:',
     180              '    raise Exception("read error: %s vs %s bytes"',
     181              '                    % (len(value), data_len))',
     182          ))
     183  
     184          proc = self.subprocess(code, str(rd), pass_fds=[rd])
     185          with kill_on_error(proc):
     186              os.close(rd)
     187              written = 0
     188              while written < len(data):
     189                  written += os.write(wr, memoryview(data)[written:])
     190              self.assertEqual(proc.wait(), 0)
     191  
     192  
     193  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
     194  class ESC[4;38;5;81mSocketEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
     195      """ EINTR tests for the socket module. """
     196  
     197      @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
     198      def _test_recv(self, recv_func):
     199          rd, wr = socket.socketpair()
     200          self.addCleanup(rd.close)
     201          # wr closed explicitly by parent
     202  
     203          # single-byte payload guard us against partial recv
     204          datas = [b"x", b"y", b"z"]
     205  
     206          code = '\n'.join((
     207              'import os, socket, sys, time',
     208              '',
     209              'fd = int(sys.argv[1])',
     210              'family = %s' % int(wr.family),
     211              'sock_type = %s' % int(wr.type),
     212              'datas = %r' % datas,
     213              'sleep_time = %r' % self.sleep_time,
     214              '',
     215              'wr = socket.fromfd(fd, family, sock_type)',
     216              'os.close(fd)',
     217              '',
     218              'with wr:',
     219              '    for data in datas:',
     220              '        # let the parent block on recv()',
     221              '        time.sleep(sleep_time)',
     222              '        wr.sendall(data)',
     223          ))
     224  
     225          fd = wr.fileno()
     226          proc = self.subprocess(code, str(fd), pass_fds=[fd])
     227          with kill_on_error(proc):
     228              wr.close()
     229              for data in datas:
     230                  self.assertEqual(data, recv_func(rd, len(data)))
     231              self.assertEqual(proc.wait(), 0)
     232  
     233      def test_recv(self):
     234          self._test_recv(socket.socket.recv)
     235  
     236      @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()')
     237      def test_recvmsg(self):
     238          self._test_recv(lambda sock, data: sock.recvmsg(data)[0])
     239  
     240      def _test_send(self, send_func):
     241          rd, wr = socket.socketpair()
     242          self.addCleanup(wr.close)
     243          # rd closed explicitly by parent
     244  
     245          # we must send enough data for the send() to block
     246          data = b"xyz" * (support.SOCK_MAX_SIZE // 3)
     247  
     248          code = '\n'.join((
     249              'import os, socket, sys, time',
     250              '',
     251              'fd = int(sys.argv[1])',
     252              'family = %s' % int(rd.family),
     253              'sock_type = %s' % int(rd.type),
     254              'sleep_time = %r' % self.sleep_time,
     255              'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
     256              'data_len = len(data)',
     257              '',
     258              'rd = socket.fromfd(fd, family, sock_type)',
     259              'os.close(fd)',
     260              '',
     261              'with rd:',
     262              '    # let the parent block on send()',
     263              '    time.sleep(sleep_time)',
     264              '',
     265              '    received_data = bytearray(data_len)',
     266              '    n = 0',
     267              '    while n < data_len:',
     268              '        n += rd.recv_into(memoryview(received_data)[n:])',
     269              '',
     270              'if received_data != data:',
     271              '    raise Exception("recv error: %s vs %s bytes"',
     272              '                    % (len(received_data), data_len))',
     273          ))
     274  
     275          fd = rd.fileno()
     276          proc = self.subprocess(code, str(fd), pass_fds=[fd])
     277          with kill_on_error(proc):
     278              rd.close()
     279              written = 0
     280              while written < len(data):
     281                  sent = send_func(wr, memoryview(data)[written:])
     282                  # sendall() returns None
     283                  written += len(data) if sent is None else sent
     284              self.assertEqual(proc.wait(), 0)
     285  
     286      def test_send(self):
     287          self._test_send(socket.socket.send)
     288  
     289      def test_sendall(self):
     290          self._test_send(socket.socket.sendall)
     291  
     292      @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()')
     293      def test_sendmsg(self):
     294          self._test_send(lambda sock, data: sock.sendmsg([data]))
     295  
     296      def test_accept(self):
     297          sock = socket.create_server((socket_helper.HOST, 0))
     298          self.addCleanup(sock.close)
     299          port = sock.getsockname()[1]
     300  
     301          code = '\n'.join((
     302              'import socket, time',
     303              '',
     304              'host = %r' % socket_helper.HOST,
     305              'port = %s' % port,
     306              'sleep_time = %r' % self.sleep_time,
     307              '',
     308              '# let parent block on accept()',
     309              'time.sleep(sleep_time)',
     310              'with socket.create_connection((host, port)):',
     311              '    time.sleep(sleep_time)',
     312          ))
     313  
     314          proc = self.subprocess(code)
     315          with kill_on_error(proc):
     316              client_sock, _ = sock.accept()
     317              client_sock.close()
     318              self.assertEqual(proc.wait(), 0)
     319  
     320      # Issue #25122: There is a race condition in the FreeBSD kernel on
     321      # handling signals in the FIFO device. Skip the test until the bug is
     322      # fixed in the kernel.
     323      # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162
     324      @support.requires_freebsd_version(10, 3)
     325      @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
     326      def _test_open(self, do_open_close_reader, do_open_close_writer):
     327          filename = os_helper.TESTFN
     328  
     329          # Use a fifo: until the child opens it for reading, the parent will
     330          # block when trying to open it for writing.
     331          os_helper.unlink(filename)
     332          try:
     333              os.mkfifo(filename)
     334          except PermissionError as e:
     335              self.skipTest('os.mkfifo(): %s' % e)
     336          self.addCleanup(os_helper.unlink, filename)
     337  
     338          code = '\n'.join((
     339              'import os, time',
     340              '',
     341              'path = %a' % filename,
     342              'sleep_time = %r' % self.sleep_time,
     343              '',
     344              '# let the parent block',
     345              'time.sleep(sleep_time)',
     346              '',
     347              do_open_close_reader,
     348          ))
     349  
     350          proc = self.subprocess(code)
     351          with kill_on_error(proc):
     352              do_open_close_writer(filename)
     353              self.assertEqual(proc.wait(), 0)
     354  
     355      def python_open(self, path):
     356          fp = open(path, 'w')
     357          fp.close()
     358  
     359      @unittest.skipIf(sys.platform == "darwin",
     360                       "hangs under macOS; see bpo-25234, bpo-35363")
     361      def test_open(self):
     362          self._test_open("fp = open(path, 'r')\nfp.close()",
     363                          self.python_open)
     364  
     365      def os_open(self, path):
     366          fd = os.open(path, os.O_WRONLY)
     367          os.close(fd)
     368  
     369      @unittest.skipIf(sys.platform == "darwin",
     370                       "hangs under macOS; see bpo-25234, bpo-35363")
     371      def test_os_open(self):
     372          self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)",
     373                          self.os_open)
     374  
     375  
     376  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
     377  class ESC[4;38;5;81mTimeEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
     378      """ EINTR tests for the time module. """
     379  
     380      def test_sleep(self):
     381          t0 = time.monotonic()
     382          time.sleep(self.sleep_time)
     383          self.stop_alarm()
     384          dt = time.monotonic() - t0
     385          self.check_elapsed_time(dt)
     386  
     387  
     388  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
     389  # bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test
     390  # is vulnerable to a race condition between the child and the parent processes.
     391  @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
     392                       'need signal.pthread_sigmask()')
     393  class ESC[4;38;5;81mSignalEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
     394      """ EINTR tests for the signal module. """
     395  
     396      def check_sigwait(self, wait_func):
     397          signum = signal.SIGUSR1
     398          pid = os.getpid()
     399  
     400          old_handler = signal.signal(signum, lambda *args: None)
     401          self.addCleanup(signal.signal, signum, old_handler)
     402  
     403          code = '\n'.join((
     404              'import os, time',
     405              'pid = %s' % os.getpid(),
     406              'signum = %s' % int(signum),
     407              'sleep_time = %r' % self.sleep_time,
     408              'time.sleep(sleep_time)',
     409              'os.kill(pid, signum)',
     410          ))
     411  
     412          old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
     413          self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum])
     414  
     415          t0 = time.monotonic()
     416          proc = self.subprocess(code)
     417          with kill_on_error(proc):
     418              wait_func(signum)
     419              dt = time.monotonic() - t0
     420  
     421          self.assertEqual(proc.wait(), 0)
     422  
     423      @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
     424                           'need signal.sigwaitinfo()')
     425      def test_sigwaitinfo(self):
     426          def wait_func(signum):
     427              signal.sigwaitinfo([signum])
     428  
     429          self.check_sigwait(wait_func)
     430  
     431      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
     432                           'need signal.sigwaitinfo()')
     433      def test_sigtimedwait(self):
     434          def wait_func(signum):
     435              signal.sigtimedwait([signum], 120.0)
     436  
     437          self.check_sigwait(wait_func)
     438  
     439  
     440  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
     441  class ESC[4;38;5;81mSelectEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
     442      """ EINTR tests for the select module. """
     443  
     444      def test_select(self):
     445          t0 = time.monotonic()
     446          select.select([], [], [], self.sleep_time)
     447          dt = time.monotonic() - t0
     448          self.stop_alarm()
     449          self.check_elapsed_time(dt)
     450  
     451      @unittest.skipIf(sys.platform == "darwin",
     452                       "poll may fail on macOS; see issue #28087")
     453      @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll')
     454      def test_poll(self):
     455          poller = select.poll()
     456  
     457          t0 = time.monotonic()
     458          poller.poll(self.sleep_time * 1e3)
     459          dt = time.monotonic() - t0
     460          self.stop_alarm()
     461          self.check_elapsed_time(dt)
     462  
     463      @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll')
     464      def test_epoll(self):
     465          poller = select.epoll()
     466          self.addCleanup(poller.close)
     467  
     468          t0 = time.monotonic()
     469          poller.poll(self.sleep_time)
     470          dt = time.monotonic() - t0
     471          self.stop_alarm()
     472          self.check_elapsed_time(dt)
     473  
     474      @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue')
     475      def test_kqueue(self):
     476          kqueue = select.kqueue()
     477          self.addCleanup(kqueue.close)
     478  
     479          t0 = time.monotonic()
     480          kqueue.control(None, 1, self.sleep_time)
     481          dt = time.monotonic() - t0
     482          self.stop_alarm()
     483          self.check_elapsed_time(dt)
     484  
     485      @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll')
     486      def test_devpoll(self):
     487          poller = select.devpoll()
     488          self.addCleanup(poller.close)
     489  
     490          t0 = time.monotonic()
     491          poller.poll(self.sleep_time * 1e3)
     492          dt = time.monotonic() - t0
     493          self.stop_alarm()
     494          self.check_elapsed_time(dt)
     495  
     496  
     497  class ESC[4;38;5;81mFNTLEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
     498      def _lock(self, lock_func, lock_name):
     499          self.addCleanup(os_helper.unlink, os_helper.TESTFN)
     500          code = '\n'.join((
     501              "import fcntl, time",
     502              "with open('%s', 'wb') as f:" % os_helper.TESTFN,
     503              "   fcntl.%s(f, fcntl.LOCK_EX)" % lock_name,
     504              "   time.sleep(%s)" % self.sleep_time))
     505          start_time = time.monotonic()
     506          proc = self.subprocess(code)
     507          with kill_on_error(proc):
     508              with open(os_helper.TESTFN, 'wb') as f:
     509                  while True:  # synchronize the subprocess
     510                      dt = time.monotonic() - start_time
     511                      if dt > 60.0:
     512                          raise Exception("failed to sync child in %.1f sec" % dt)
     513                      try:
     514                          lock_func(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
     515                          lock_func(f, fcntl.LOCK_UN)
     516                          time.sleep(0.01)
     517                      except BlockingIOError:
     518                          break
     519                  # the child locked the file just a moment ago for 'sleep_time' seconds
     520                  # that means that the lock below will block for 'sleep_time' minus some
     521                  # potential context switch delay
     522                  lock_func(f, fcntl.LOCK_EX)
     523                  dt = time.monotonic() - start_time
     524                  self.stop_alarm()
     525                  self.check_elapsed_time(dt)
     526              proc.wait()
     527  
     528      # Issue 35633: See https://bugs.python.org/issue35633#msg333662
     529      # skip test rather than accept PermissionError from all platforms
     530      @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
     531      def test_lockf(self):
     532          self._lock(fcntl.lockf, "lockf")
     533  
     534      def test_flock(self):
     535          self._lock(fcntl.flock, "flock")
     536  
     537  
     538  if __name__ == "__main__":
     539      unittest.main()