(root)/
Python-3.11.7/
Lib/
test/
test_poll.py
       1  # Test case for the os.poll() function
       2  
       3  import os
       4  import subprocess
       5  import random
       6  import select
       7  import threading
       8  import time
       9  import unittest
      10  from test.support import (
      11      cpython_only, requires_subprocess, requires_working_socket, requires_resource
      12  )
      13  from test.support import threading_helper
      14  from test.support.os_helper import TESTFN
      15  
      16  
      17  try:
      18      select.poll
      19  except AttributeError:
      20      raise unittest.SkipTest("select.poll not defined")
      21  
      22  requires_working_socket(module=True)
      23  
      24  def find_ready_matching(ready, flag):
      25      match = []
      26      for fd, mode in ready:
      27          if mode & flag:
      28              match.append(fd)
      29      return match
      30  
      31  class ESC[4;38;5;81mPollTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      32  
      33      def test_poll1(self):
      34          # Basic functional test of poll object
      35          # Create a bunch of pipe and test that poll works with them.
      36  
      37          p = select.poll()
      38  
      39          NUM_PIPES = 12
      40          MSG = b" This is a test."
      41          MSG_LEN = len(MSG)
      42          readers = []
      43          writers = []
      44          r2w = {}
      45          w2r = {}
      46  
      47          for i in range(NUM_PIPES):
      48              rd, wr = os.pipe()
      49              p.register(rd)
      50              p.modify(rd, select.POLLIN)
      51              p.register(wr, select.POLLOUT)
      52              readers.append(rd)
      53              writers.append(wr)
      54              r2w[rd] = wr
      55              w2r[wr] = rd
      56  
      57          bufs = []
      58  
      59          while writers:
      60              ready = p.poll()
      61              ready_writers = find_ready_matching(ready, select.POLLOUT)
      62              if not ready_writers:
      63                  raise RuntimeError("no pipes ready for writing")
      64              wr = random.choice(ready_writers)
      65              os.write(wr, MSG)
      66  
      67              ready = p.poll()
      68              ready_readers = find_ready_matching(ready, select.POLLIN)
      69              if not ready_readers:
      70                  raise RuntimeError("no pipes ready for reading")
      71              rd = random.choice(ready_readers)
      72              buf = os.read(rd, MSG_LEN)
      73              self.assertEqual(len(buf), MSG_LEN)
      74              bufs.append(buf)
      75              os.close(r2w[rd]) ; os.close( rd )
      76              p.unregister( r2w[rd] )
      77              p.unregister( rd )
      78              writers.remove(r2w[rd])
      79  
      80          self.assertEqual(bufs, [MSG] * NUM_PIPES)
      81  
      82      def test_poll_unit_tests(self):
      83          # returns NVAL for invalid file descriptor
      84          FD, w = os.pipe()
      85          os.close(FD)
      86          os.close(w)
      87          p = select.poll()
      88          p.register(FD)
      89          r = p.poll()
      90          self.assertEqual(r[0], (FD, select.POLLNVAL))
      91  
      92          with open(TESTFN, 'w') as f:
      93              fd = f.fileno()
      94              p = select.poll()
      95              p.register(f)
      96              r = p.poll()
      97              self.assertEqual(r[0][0], fd)
      98          r = p.poll()
      99          self.assertEqual(r[0], (fd, select.POLLNVAL))
     100          os.unlink(TESTFN)
     101  
     102          # type error for invalid arguments
     103          p = select.poll()
     104          self.assertRaises(TypeError, p.register, p)
     105          self.assertRaises(TypeError, p.unregister, p)
     106  
     107          # can't unregister non-existent object
     108          p = select.poll()
     109          self.assertRaises(KeyError, p.unregister, 3)
     110  
     111          # Test error cases
     112          pollster = select.poll()
     113          class ESC[4;38;5;81mNope:
     114              pass
     115  
     116          class ESC[4;38;5;81mAlmost:
     117              def fileno(self):
     118                  return 'fileno'
     119  
     120          self.assertRaises(TypeError, pollster.register, Nope(), 0)
     121          self.assertRaises(TypeError, pollster.register, Almost(), 0)
     122  
     123      # Another test case for poll().  This is copied from the test case for
     124      # select(), modified to use poll() instead.
     125  
     126      @requires_subprocess()
     127      @requires_resource('walltime')
     128      def test_poll2(self):
     129          cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
     130          proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
     131                                  bufsize=0)
     132          self.enterContext(proc)
     133          p = proc.stdout
     134          pollster = select.poll()
     135          pollster.register( p, select.POLLIN )
     136          for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
     137              fdlist = pollster.poll(tout)
     138              if (fdlist == []):
     139                  continue
     140              fd, flags = fdlist[0]
     141              if flags & select.POLLHUP:
     142                  line = p.readline()
     143                  if line != b"":
     144                      self.fail('error: pipe seems to be closed, but still returns data')
     145                  continue
     146  
     147              elif flags & select.POLLIN:
     148                  line = p.readline()
     149                  if not line:
     150                      break
     151                  self.assertEqual(line, b'testing...\n')
     152                  continue
     153              else:
     154                  self.fail('Unexpected return value from select.poll: %s' % fdlist)
     155  
     156      def test_poll3(self):
     157          # test int overflow
     158          pollster = select.poll()
     159          pollster.register(1)
     160  
     161          self.assertRaises(OverflowError, pollster.poll, 1 << 64)
     162  
     163          x = 2 + 3
     164          if x != 5:
     165              self.fail('Overflow must have occurred')
     166  
     167          # Issues #15989, #17919
     168          self.assertRaises(ValueError, pollster.register, 0, -1)
     169          self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
     170          self.assertRaises(ValueError, pollster.modify, 1, -1)
     171          self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
     172  
     173      @cpython_only
     174      def test_poll_c_limits(self):
     175          from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
     176          pollster = select.poll()
     177          pollster.register(1)
     178  
     179          # Issues #15989, #17919
     180          self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
     181          self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
     182          self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
     183          self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)
     184  
     185      @threading_helper.reap_threads
     186      def test_threaded_poll(self):
     187          r, w = os.pipe()
     188          self.addCleanup(os.close, r)
     189          self.addCleanup(os.close, w)
     190          rfds = []
     191          for i in range(10):
     192              fd = os.dup(r)
     193              self.addCleanup(os.close, fd)
     194              rfds.append(fd)
     195          pollster = select.poll()
     196          for fd in rfds:
     197              pollster.register(fd, select.POLLIN)
     198  
     199          t = threading.Thread(target=pollster.poll)
     200          t.start()
     201          try:
     202              time.sleep(0.5)
     203              # trigger ufds array reallocation
     204              for fd in rfds:
     205                  pollster.unregister(fd)
     206              pollster.register(w, select.POLLOUT)
     207              self.assertRaises(RuntimeError, pollster.poll)
     208          finally:
     209              # and make the call to poll() from the thread return
     210              os.write(w, b'spam')
     211              t.join()
     212  
     213      @unittest.skipUnless(threading, 'Threading required for this test.')
     214      @threading_helper.reap_threads
     215      def test_poll_blocks_with_negative_ms(self):
     216          for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]:
     217              # Create two file descriptors. This will be used to unlock
     218              # the blocking call to poll.poll inside the thread
     219              r, w = os.pipe()
     220              pollster = select.poll()
     221              pollster.register(r, select.POLLIN)
     222  
     223              poll_thread = threading.Thread(target=pollster.poll, args=(timeout_ms,))
     224              poll_thread.start()
     225              poll_thread.join(timeout=0.1)
     226              self.assertTrue(poll_thread.is_alive())
     227  
     228              # Write to the pipe so pollster.poll unblocks and the thread ends.
     229              os.write(w, b'spam')
     230              poll_thread.join()
     231              self.assertFalse(poll_thread.is_alive())
     232              os.close(r)
     233              os.close(w)
     234  
     235  
     236  if __name__ == '__main__':
     237      unittest.main()