(root)/
Python-3.12.0/
Lib/
test/
_test_eintr.py
       1  """
       2  This test suite exercises some system calls subject to interruption with EINTR,
       3  to check that it is actually handled transparently.
       4  It is intended to be run by the main test suite within a child process, to
       5  ensure there is no background thread running (so that signals are delivered to
       6  the correct thread).
       7  Signals are generated in-process using setitimer(ITIMER_REAL), which allows
       8  sub-second periodicity (contrarily to signal()).
       9  """
      10  
      11  import contextlib
      12  import faulthandler
      13  import fcntl
      14  import os
      15  import platform
      16  import select
      17  import signal
      18  import socket
      19  import subprocess
      20  import sys
      21  import time
      22  import unittest
      23  
      24  from test import support
      25  from test.support import os_helper
      26  from test.support import socket_helper
      27  
      28  @contextlib.contextmanager
      29  def kill_on_error(proc):
      30      """Context manager killing the subprocess if a Python exception is raised."""
      31      with proc:
      32          try:
      33              yield proc
      34          except:
      35              proc.kill()
      36              raise
      37  
      38  
      39  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
      40  class ESC[4;38;5;81mEINTRBaseTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      41      """ Base class for EINTR tests. """
      42  
      43      # delay for initial signal delivery
      44      signal_delay = 0.1
      45      # signal delivery periodicity
      46      signal_period = 0.1
      47      # default sleep time for tests - should obviously have:
      48      # sleep_time > signal_period
      49      sleep_time = 0.2
      50  
      51      def sighandler(self, signum, frame):
      52          self.signals += 1
      53  
      54      def setUp(self):
      55          self.signals = 0
      56          self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler)
      57          signal.setitimer(signal.ITIMER_REAL, self.signal_delay,
      58                           self.signal_period)
      59  
      60          # Use faulthandler as watchdog to debug when a test hangs
      61          # (timeout of 10 minutes)
      62          faulthandler.dump_traceback_later(10 * 60, exit=True,
      63                                            file=sys.__stderr__)
      64  
      65      @staticmethod
      66      def stop_alarm():
      67          signal.setitimer(signal.ITIMER_REAL, 0, 0)
      68  
      69      def tearDown(self):
      70          self.stop_alarm()
      71          signal.signal(signal.SIGALRM, self.orig_handler)
      72          faulthandler.cancel_dump_traceback_later()
      73  
      74      def subprocess(self, *args, **kw):
      75          cmd_args = (sys.executable, '-c') + args
      76          return subprocess.Popen(cmd_args, **kw)
      77  
      78  
      79  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
      80  class ESC[4;38;5;81mOSEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
      81      """ EINTR tests for the os module. """
      82  
      83      def new_sleep_process(self):
      84          code = 'import time; time.sleep(%r)' % self.sleep_time
      85          return self.subprocess(code)
      86  
      87      def _test_wait_multiple(self, wait_func):
      88          num = 3
      89          processes = [self.new_sleep_process() for _ in range(num)]
      90          for _ in range(num):
      91              wait_func()
      92          # Call the Popen method to avoid a ResourceWarning
      93          for proc in processes:
      94              proc.wait()
      95  
      96      def test_wait(self):
      97          self._test_wait_multiple(os.wait)
      98  
      99      @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()')
     100      def test_wait3(self):
     101          self._test_wait_multiple(lambda: os.wait3(0))
     102  
     103      def _test_wait_single(self, wait_func):
     104          proc = self.new_sleep_process()
     105          wait_func(proc.pid)
     106          # Call the Popen method to avoid a ResourceWarning
     107          proc.wait()
     108  
     109      def test_waitpid(self):
     110          self._test_wait_single(lambda pid: os.waitpid(pid, 0))
     111  
     112      @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()')
     113      def test_wait4(self):
     114          self._test_wait_single(lambda pid: os.wait4(pid, 0))
     115  
     116      def test_read(self):
     117          rd, wr = os.pipe()
     118          self.addCleanup(os.close, rd)
     119          # wr closed explicitly by parent
     120  
     121          # the payload below are smaller than PIPE_BUF, hence the writes will be
     122          # atomic
     123          datas = [b"hello", b"world", b"spam"]
     124  
     125          code = '\n'.join((
     126              'import os, sys, time',
     127              '',
     128              'wr = int(sys.argv[1])',
     129              'datas = %r' % datas,
     130              'sleep_time = %r' % self.sleep_time,
     131              '',
     132              'for data in datas:',
     133              '    # let the parent block on read()',
     134              '    time.sleep(sleep_time)',
     135              '    os.write(wr, data)',
     136          ))
     137  
     138          proc = self.subprocess(code, str(wr), pass_fds=[wr])
     139          with kill_on_error(proc):
     140              os.close(wr)
     141              for data in datas:
     142                  self.assertEqual(data, os.read(rd, len(data)))
     143              self.assertEqual(proc.wait(), 0)
     144  
     145      def test_write(self):
     146          rd, wr = os.pipe()
     147          self.addCleanup(os.close, wr)
     148          # rd closed explicitly by parent
     149  
     150          # we must write enough data for the write() to block
     151          data = b"x" * support.PIPE_MAX_SIZE
     152  
     153          code = '\n'.join((
     154              'import io, os, sys, time',
     155              '',
     156              'rd = int(sys.argv[1])',
     157              'sleep_time = %r' % self.sleep_time,
     158              'data = b"x" * %s' % support.PIPE_MAX_SIZE,
     159              'data_len = len(data)',
     160              '',
     161              '# let the parent block on write()',
     162              'time.sleep(sleep_time)',
     163              '',
     164              'read_data = io.BytesIO()',
     165              'while len(read_data.getvalue()) < data_len:',
     166              '    chunk = os.read(rd, 2 * data_len)',
     167              '    read_data.write(chunk)',
     168              '',
     169              'value = read_data.getvalue()',
     170              'if value != data:',
     171              '    raise Exception("read error: %s vs %s bytes"',
     172              '                    % (len(value), data_len))',
     173          ))
     174  
     175          proc = self.subprocess(code, str(rd), pass_fds=[rd])
     176          with kill_on_error(proc):
     177              os.close(rd)
     178              written = 0
     179              while written < len(data):
     180                  written += os.write(wr, memoryview(data)[written:])
     181              self.assertEqual(proc.wait(), 0)
     182  
     183  
     184  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
     185  class ESC[4;38;5;81mSocketEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
     186      """ EINTR tests for the socket module. """
     187  
     188      @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
     189      def _test_recv(self, recv_func):
     190          rd, wr = socket.socketpair()
     191          self.addCleanup(rd.close)
     192          # wr closed explicitly by parent
     193  
     194          # single-byte payload guard us against partial recv
     195          datas = [b"x", b"y", b"z"]
     196  
     197          code = '\n'.join((
     198              'import os, socket, sys, time',
     199              '',
     200              'fd = int(sys.argv[1])',
     201              'family = %s' % int(wr.family),
     202              'sock_type = %s' % int(wr.type),
     203              'datas = %r' % datas,
     204              'sleep_time = %r' % self.sleep_time,
     205              '',
     206              'wr = socket.fromfd(fd, family, sock_type)',
     207              'os.close(fd)',
     208              '',
     209              'with wr:',
     210              '    for data in datas:',
     211              '        # let the parent block on recv()',
     212              '        time.sleep(sleep_time)',
     213              '        wr.sendall(data)',
     214          ))
     215  
     216          fd = wr.fileno()
     217          proc = self.subprocess(code, str(fd), pass_fds=[fd])
     218          with kill_on_error(proc):
     219              wr.close()
     220              for data in datas:
     221                  self.assertEqual(data, recv_func(rd, len(data)))
     222              self.assertEqual(proc.wait(), 0)
     223  
     224      def test_recv(self):
     225          self._test_recv(socket.socket.recv)
     226  
     227      @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()')
     228      def test_recvmsg(self):
     229          self._test_recv(lambda sock, data: sock.recvmsg(data)[0])
     230  
     231      def _test_send(self, send_func):
     232          rd, wr = socket.socketpair()
     233          self.addCleanup(wr.close)
     234          # rd closed explicitly by parent
     235  
     236          # we must send enough data for the send() to block
     237          data = b"xyz" * (support.SOCK_MAX_SIZE // 3)
     238  
     239          code = '\n'.join((
     240              'import os, socket, sys, time',
     241              '',
     242              'fd = int(sys.argv[1])',
     243              'family = %s' % int(rd.family),
     244              'sock_type = %s' % int(rd.type),
     245              'sleep_time = %r' % self.sleep_time,
     246              'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
     247              'data_len = len(data)',
     248              '',
     249              'rd = socket.fromfd(fd, family, sock_type)',
     250              'os.close(fd)',
     251              '',
     252              'with rd:',
     253              '    # let the parent block on send()',
     254              '    time.sleep(sleep_time)',
     255              '',
     256              '    received_data = bytearray(data_len)',
     257              '    n = 0',
     258              '    while n < data_len:',
     259              '        n += rd.recv_into(memoryview(received_data)[n:])',
     260              '',
     261              'if received_data != data:',
     262              '    raise Exception("recv error: %s vs %s bytes"',
     263              '                    % (len(received_data), data_len))',
     264          ))
     265  
     266          fd = rd.fileno()
     267          proc = self.subprocess(code, str(fd), pass_fds=[fd])
     268          with kill_on_error(proc):
     269              rd.close()
     270              written = 0
     271              while written < len(data):
     272                  sent = send_func(wr, memoryview(data)[written:])
     273                  # sendall() returns None
     274                  written += len(data) if sent is None else sent
     275              self.assertEqual(proc.wait(), 0)
     276  
     277      def test_send(self):
     278          self._test_send(socket.socket.send)
     279  
     280      def test_sendall(self):
     281          self._test_send(socket.socket.sendall)
     282  
     283      @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()')
     284      def test_sendmsg(self):
     285          self._test_send(lambda sock, data: sock.sendmsg([data]))
     286  
     287      def test_accept(self):
     288          sock = socket.create_server((socket_helper.HOST, 0))
     289          self.addCleanup(sock.close)
     290          port = sock.getsockname()[1]
     291  
     292          code = '\n'.join((
     293              'import socket, time',
     294              '',
     295              'host = %r' % socket_helper.HOST,
     296              'port = %s' % port,
     297              'sleep_time = %r' % self.sleep_time,
     298              '',
     299              '# let parent block on accept()',
     300              'time.sleep(sleep_time)',
     301              'with socket.create_connection((host, port)):',
     302              '    time.sleep(sleep_time)',
     303          ))
     304  
     305          proc = self.subprocess(code)
     306          with kill_on_error(proc):
     307              client_sock, _ = sock.accept()
     308              client_sock.close()
     309              self.assertEqual(proc.wait(), 0)
     310  
     311      # Issue #25122: There is a race condition in the FreeBSD kernel on
     312      # handling signals in the FIFO device. Skip the test until the bug is
     313      # fixed in the kernel.
     314      # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162
     315      @support.requires_freebsd_version(10, 3)
     316      @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
     317      def _test_open(self, do_open_close_reader, do_open_close_writer):
     318          filename = os_helper.TESTFN
     319  
     320          # Use a fifo: until the child opens it for reading, the parent will
     321          # block when trying to open it for writing.
     322          os_helper.unlink(filename)
     323          try:
     324              os.mkfifo(filename)
     325          except PermissionError as e:
     326              self.skipTest('os.mkfifo(): %s' % e)
     327          self.addCleanup(os_helper.unlink, filename)
     328  
     329          code = '\n'.join((
     330              'import os, time',
     331              '',
     332              'path = %a' % filename,
     333              'sleep_time = %r' % self.sleep_time,
     334              '',
     335              '# let the parent block',
     336              'time.sleep(sleep_time)',
     337              '',
     338              do_open_close_reader,
     339          ))
     340  
     341          proc = self.subprocess(code)
     342          with kill_on_error(proc):
     343              do_open_close_writer(filename)
     344              self.assertEqual(proc.wait(), 0)
     345  
     346      def python_open(self, path):
     347          fp = open(path, 'w')
     348          fp.close()
     349  
     350      @unittest.skipIf(sys.platform == "darwin",
     351                       "hangs under macOS; see bpo-25234, bpo-35363")
     352      def test_open(self):
     353          self._test_open("fp = open(path, 'r')\nfp.close()",
     354                          self.python_open)
     355  
     356      def os_open(self, path):
     357          fd = os.open(path, os.O_WRONLY)
     358          os.close(fd)
     359  
     360      @unittest.skipIf(sys.platform == "darwin",
     361                       "hangs under macOS; see bpo-25234, bpo-35363")
     362      def test_os_open(self):
     363          self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)",
     364                          self.os_open)
     365  
     366  
     367  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
     368  class ESC[4;38;5;81mTimeEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
     369      """ EINTR tests for the time module. """
     370  
     371      def test_sleep(self):
     372          t0 = time.monotonic()
     373          time.sleep(self.sleep_time)
     374          self.stop_alarm()
     375          dt = time.monotonic() - t0
     376          self.assertGreaterEqual(dt, self.sleep_time)
     377  
     378  
     379  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
     380  # bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test
     381  # is vulnerable to a race condition between the child and the parent processes.
     382  @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
     383                       'need signal.pthread_sigmask()')
     384  class ESC[4;38;5;81mSignalEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
     385      """ EINTR tests for the signal module. """
     386  
     387      def check_sigwait(self, wait_func):
     388          signum = signal.SIGUSR1
     389          pid = os.getpid()
     390  
     391          old_handler = signal.signal(signum, lambda *args: None)
     392          self.addCleanup(signal.signal, signum, old_handler)
     393  
     394          code = '\n'.join((
     395              'import os, time',
     396              'pid = %s' % os.getpid(),
     397              'signum = %s' % int(signum),
     398              'sleep_time = %r' % self.sleep_time,
     399              'time.sleep(sleep_time)',
     400              'os.kill(pid, signum)',
     401          ))
     402  
     403          old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
     404          self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum])
     405  
     406          proc = self.subprocess(code)
     407          with kill_on_error(proc):
     408              wait_func(signum)
     409  
     410          self.assertEqual(proc.wait(), 0)
     411  
     412      @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
     413                           'need signal.sigwaitinfo()')
     414      def test_sigwaitinfo(self):
     415          def wait_func(signum):
     416              signal.sigwaitinfo([signum])
     417  
     418          self.check_sigwait(wait_func)
     419  
     420      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
     421                           'need signal.sigwaitinfo()')
     422      def test_sigtimedwait(self):
     423          def wait_func(signum):
     424              signal.sigtimedwait([signum], 120.0)
     425  
     426          self.check_sigwait(wait_func)
     427  
     428  
     429  @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
     430  class ESC[4;38;5;81mSelectEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
     431      """ EINTR tests for the select module. """
     432  
     433      def test_select(self):
     434          t0 = time.monotonic()
     435          select.select([], [], [], self.sleep_time)
     436          dt = time.monotonic() - t0
     437          self.stop_alarm()
     438          self.assertGreaterEqual(dt, self.sleep_time)
     439  
     440      @unittest.skipIf(sys.platform == "darwin",
     441                       "poll may fail on macOS; see issue #28087")
     442      @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll')
     443      def test_poll(self):
     444          poller = select.poll()
     445  
     446          t0 = time.monotonic()
     447          poller.poll(self.sleep_time * 1e3)
     448          dt = time.monotonic() - t0
     449          self.stop_alarm()
     450          self.assertGreaterEqual(dt, self.sleep_time)
     451  
     452      @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll')
     453      def test_epoll(self):
     454          poller = select.epoll()
     455          self.addCleanup(poller.close)
     456  
     457          t0 = time.monotonic()
     458          poller.poll(self.sleep_time)
     459          dt = time.monotonic() - t0
     460          self.stop_alarm()
     461          self.assertGreaterEqual(dt, self.sleep_time)
     462  
     463      @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue')
     464      def test_kqueue(self):
     465          kqueue = select.kqueue()
     466          self.addCleanup(kqueue.close)
     467  
     468          t0 = time.monotonic()
     469          kqueue.control(None, 1, self.sleep_time)
     470          dt = time.monotonic() - t0
     471          self.stop_alarm()
     472          self.assertGreaterEqual(dt, self.sleep_time)
     473  
     474      @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll')
     475      def test_devpoll(self):
     476          poller = select.devpoll()
     477          self.addCleanup(poller.close)
     478  
     479          t0 = time.monotonic()
     480          poller.poll(self.sleep_time * 1e3)
     481          dt = time.monotonic() - t0
     482          self.stop_alarm()
     483          self.assertGreaterEqual(dt, self.sleep_time)
     484  
     485  
     486  class ESC[4;38;5;81mFNTLEINTRTest(ESC[4;38;5;149mEINTRBaseTest):
     487      def _lock(self, lock_func, lock_name):
     488          self.addCleanup(os_helper.unlink, os_helper.TESTFN)
     489          code = '\n'.join((
     490              "import fcntl, time",
     491              "with open('%s', 'wb') as f:" % os_helper.TESTFN,
     492              "   fcntl.%s(f, fcntl.LOCK_EX)" % lock_name,
     493              "   time.sleep(%s)" % self.sleep_time))
     494          start_time = time.monotonic()
     495          proc = self.subprocess(code)
     496          with kill_on_error(proc):
     497              with open(os_helper.TESTFN, 'wb') as f:
     498                  # synchronize the subprocess
     499                  start_time = time.monotonic()
     500                  for _ in support.sleeping_retry(support.LONG_TIMEOUT, error=False):
     501                      try:
     502                          lock_func(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
     503                          lock_func(f, fcntl.LOCK_UN)
     504                      except BlockingIOError:
     505                          break
     506                  else:
     507                      dt = time.monotonic() - start_time
     508                      raise Exception("failed to sync child in %.1f sec" % dt)
     509  
     510                  # the child locked the file just a moment ago for 'sleep_time' seconds
     511                  # that means that the lock below will block for 'sleep_time' minus some
     512                  # potential context switch delay
     513                  lock_func(f, fcntl.LOCK_EX)
     514                  dt = time.monotonic() - start_time
     515                  self.assertGreaterEqual(dt, self.sleep_time)
     516                  self.stop_alarm()
     517              proc.wait()
     518  
     519      # Issue 35633: See https://bugs.python.org/issue35633#msg333662
     520      # skip test rather than accept PermissionError from all platforms
     521      @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
     522      def test_lockf(self):
     523          self._lock(fcntl.lockf, "lockf")
     524  
     525      def test_flock(self):
     526          self._lock(fcntl.flock, "flock")
     527  
     528  
     529  if __name__ == "__main__":
     530      unittest.main()