(root)/
Python-3.11.7/
Lib/
test/
test_devpoll.py
       1  # Test case for the select.devpoll() function
       2  
       3  # Initial tests are copied as is from "test_poll.py"
       4  
       5  import os
       6  import random
       7  import select
       8  import unittest
       9  from test.support import cpython_only
      10  
      11  if not hasattr(select, 'devpoll') :  # no select.devpoll on this platform: skip the entire module
      12      raise unittest.SkipTest('test works only on Solaris OS family')
      13  
      14  
      15  def find_ready_matching(ready, flag):
      16      match = []
      17      for fd, mode in ready:
      18          if mode & flag:
      19              match.append(fd)
      20      return match
      21  
      22  class ESC[4;38;5;81mDevPollTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      23  
      24      def test_devpoll1(self):
      25          # Basic functional test of poll object
      26          # Create a bunch of pipe and test that poll works with them.
      27  
      28          p = select.devpoll()
      29  
      30          NUM_PIPES = 12
      31          MSG = b" This is a test."
      32          MSG_LEN = len(MSG)
      33          readers = []
      34          writers = []
      35          r2w = {}
      36          w2r = {}
      37  
      38          for i in range(NUM_PIPES):
      39              rd, wr = os.pipe()
      40              p.register(rd)
      41              p.modify(rd, select.POLLIN)
      42              p.register(wr, select.POLLOUT)
      43              readers.append(rd)
      44              writers.append(wr)
      45              r2w[rd] = wr
      46              w2r[wr] = rd
      47  
      48          bufs = []
      49  
      50          while writers:
      51              ready = p.poll()
      52              ready_writers = find_ready_matching(ready, select.POLLOUT)
      53              if not ready_writers:
      54                  self.fail("no pipes ready for writing")
      55              wr = random.choice(ready_writers)
      56              os.write(wr, MSG)
      57  
      58              ready = p.poll()
      59              ready_readers = find_ready_matching(ready, select.POLLIN)
      60              if not ready_readers:
      61                  self.fail("no pipes ready for reading")
      62              self.assertEqual([w2r[wr]], ready_readers)
      63              rd = ready_readers[0]
      64              buf = os.read(rd, MSG_LEN)
      65              self.assertEqual(len(buf), MSG_LEN)
      66              bufs.append(buf)
      67              os.close(r2w[rd]) ; os.close(rd)
      68              p.unregister(r2w[rd])
      69              p.unregister(rd)
      70              writers.remove(r2w[rd])
      71  
      72          self.assertEqual(bufs, [MSG] * NUM_PIPES)
      73  
      74      def test_timeout_overflow(self):
      75          pollster = select.devpoll()
      76          w, r = os.pipe()
      77          pollster.register(w)
      78  
      79          pollster.poll(-1)
      80          self.assertRaises(OverflowError, pollster.poll, -2)
      81          self.assertRaises(OverflowError, pollster.poll, -1 << 31)
      82          self.assertRaises(OverflowError, pollster.poll, -1 << 64)
      83  
      84          pollster.poll(0)
      85          pollster.poll(1)
      86          pollster.poll(1 << 30)
      87          self.assertRaises(OverflowError, pollster.poll, 1 << 31)
      88          self.assertRaises(OverflowError, pollster.poll, 1 << 63)
      89          self.assertRaises(OverflowError, pollster.poll, 1 << 64)
      90  
      91      def test_close(self):
      92          open_file = open(__file__, "rb")
      93          self.addCleanup(open_file.close)
      94          fd = open_file.fileno()
      95          devpoll = select.devpoll()
      96  
      97          # test fileno() method and closed attribute
      98          self.assertIsInstance(devpoll.fileno(), int)
      99          self.assertFalse(devpoll.closed)
     100  
     101          # test close()
     102          devpoll.close()
     103          self.assertTrue(devpoll.closed)
     104          self.assertRaises(ValueError, devpoll.fileno)
     105  
     106          # close() can be called more than once
     107          devpoll.close()
     108  
     109          # operations must fail with ValueError("I/O operation on closed ...")
     110          self.assertRaises(ValueError, devpoll.modify, fd, select.POLLIN)
     111          self.assertRaises(ValueError, devpoll.poll)
     112          self.assertRaises(ValueError, devpoll.register, fd, select.POLLIN)
     113          self.assertRaises(ValueError, devpoll.unregister, fd)
     114  
     115      def test_fd_non_inheritable(self):
     116          devpoll = select.devpoll()
     117          self.addCleanup(devpoll.close)
     118          self.assertEqual(os.get_inheritable(devpoll.fileno()), False)
     119  
     120      def test_events_mask_overflow(self):
     121          pollster = select.devpoll()
     122          w, r = os.pipe()
     123          pollster.register(w)
     124          # Issue #17919
     125          self.assertRaises(ValueError, pollster.register, 0, -1)
     126          self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
     127          self.assertRaises(ValueError, pollster.modify, 1, -1)
     128          self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
     129  
     130      @cpython_only
     131      def test_events_mask_overflow_c_limits(self):
     132          from _testcapi import USHRT_MAX
     133          pollster = select.devpoll()
     134          w, r = os.pipe()
     135          pollster.register(w)
     136          # Issue #17919
     137          self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
     138          self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
     139  
     140  
     141  if __name__ == '__main__':  # executed directly as a script: run this module's tests
     142      unittest.main()