python (3.11.7)

(root)/
lib/
python3.11/
test/
test_kqueue.py
       1  """
       2  Tests for kqueue wrapper.
       3  """
       4  import errno
       5  import os
       6  import select
       7  import socket
       8  import time
       9  import unittest
      10  
# Every test below needs select.kqueue; when the select module does not
# provide it, skip the whole module at import time instead of failing.
if not hasattr(select, "kqueue"):
    raise unittest.SkipTest("test works only on BSD")
      13  
      14  class ESC[4;38;5;81mTestKQueue(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      15      def test_create_queue(self):
      16          kq = select.kqueue()
      17          self.assertTrue(kq.fileno() > 0, kq.fileno())
      18          self.assertTrue(not kq.closed)
      19          kq.close()
      20          self.assertTrue(kq.closed)
      21          self.assertRaises(ValueError, kq.fileno)
      22  
      23      def test_create_event(self):
      24          from operator import lt, le, gt, ge
      25  
      26          fd = os.open(os.devnull, os.O_WRONLY)
      27          self.addCleanup(os.close, fd)
      28  
      29          ev = select.kevent(fd)
      30          other = select.kevent(1000)
      31          self.assertEqual(ev.ident, fd)
      32          self.assertEqual(ev.filter, select.KQ_FILTER_READ)
      33          self.assertEqual(ev.flags, select.KQ_EV_ADD)
      34          self.assertEqual(ev.fflags, 0)
      35          self.assertEqual(ev.data, 0)
      36          self.assertEqual(ev.udata, 0)
      37          self.assertEqual(ev, ev)
      38          self.assertNotEqual(ev, other)
      39          self.assertTrue(ev < other)
      40          self.assertTrue(other >= ev)
      41          for op in lt, le, gt, ge:
      42              self.assertRaises(TypeError, op, ev, None)
      43              self.assertRaises(TypeError, op, ev, 1)
      44              self.assertRaises(TypeError, op, ev, "ev")
      45  
      46          ev = select.kevent(fd, select.KQ_FILTER_WRITE)
      47          self.assertEqual(ev.ident, fd)
      48          self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
      49          self.assertEqual(ev.flags, select.KQ_EV_ADD)
      50          self.assertEqual(ev.fflags, 0)
      51          self.assertEqual(ev.data, 0)
      52          self.assertEqual(ev.udata, 0)
      53          self.assertEqual(ev, ev)
      54          self.assertNotEqual(ev, other)
      55  
      56          ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
      57          self.assertEqual(ev.ident, fd)
      58          self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
      59          self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
      60          self.assertEqual(ev.fflags, 0)
      61          self.assertEqual(ev.data, 0)
      62          self.assertEqual(ev.udata, 0)
      63          self.assertEqual(ev, ev)
      64          self.assertNotEqual(ev, other)
      65  
      66          ev = select.kevent(1, 2, 3, 4, 5, 6)
      67          self.assertEqual(ev.ident, 1)
      68          self.assertEqual(ev.filter, 2)
      69          self.assertEqual(ev.flags, 3)
      70          self.assertEqual(ev.fflags, 4)
      71          self.assertEqual(ev.data, 5)
      72          self.assertEqual(ev.udata, 6)
      73          self.assertEqual(ev, ev)
      74          self.assertNotEqual(ev, other)
      75  
      76          bignum = 0x7fff
      77          ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
      78          self.assertEqual(ev.ident, bignum)
      79          self.assertEqual(ev.filter, 1)
      80          self.assertEqual(ev.flags, 2)
      81          self.assertEqual(ev.fflags, 3)
      82          self.assertEqual(ev.data, bignum - 1)
      83          self.assertEqual(ev.udata, bignum)
      84          self.assertEqual(ev, ev)
      85          self.assertNotEqual(ev, other)
      86  
      87          # Issue 11973
      88          bignum = 0xffff
      89          ev = select.kevent(0, 1, bignum)
      90          self.assertEqual(ev.ident, 0)
      91          self.assertEqual(ev.filter, 1)
      92          self.assertEqual(ev.flags, bignum)
      93          self.assertEqual(ev.fflags, 0)
      94          self.assertEqual(ev.data, 0)
      95          self.assertEqual(ev.udata, 0)
      96          self.assertEqual(ev, ev)
      97          self.assertNotEqual(ev, other)
      98  
      99          # Issue 11973
     100          bignum = 0xffffffff
     101          ev = select.kevent(0, 1, 2, bignum)
     102          self.assertEqual(ev.ident, 0)
     103          self.assertEqual(ev.filter, 1)
     104          self.assertEqual(ev.flags, 2)
     105          self.assertEqual(ev.fflags, bignum)
     106          self.assertEqual(ev.data, 0)
     107          self.assertEqual(ev.udata, 0)
     108          self.assertEqual(ev, ev)
     109          self.assertNotEqual(ev, other)
     110  
     111  
     112      def test_queue_event(self):
     113          serverSocket = socket.create_server(('127.0.0.1', 0))
     114          client = socket.socket()
     115          client.setblocking(False)
     116          try:
     117              client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
     118          except OSError as e:
     119              self.assertEqual(e.args[0], errno.EINPROGRESS)
     120          else:
     121              #raise AssertionError("Connect should have raised EINPROGRESS")
     122              pass # FreeBSD doesn't raise an exception here
     123          server, addr = serverSocket.accept()
     124  
     125          kq = select.kqueue()
     126          kq2 = select.kqueue.fromfd(kq.fileno())
     127  
     128          ev = select.kevent(server.fileno(),
     129                             select.KQ_FILTER_WRITE,
     130                             select.KQ_EV_ADD | select.KQ_EV_ENABLE)
     131          kq.control([ev], 0)
     132          ev = select.kevent(server.fileno(),
     133                             select.KQ_FILTER_READ,
     134                             select.KQ_EV_ADD | select.KQ_EV_ENABLE)
     135          kq.control([ev], 0)
     136          ev = select.kevent(client.fileno(),
     137                             select.KQ_FILTER_WRITE,
     138                             select.KQ_EV_ADD | select.KQ_EV_ENABLE)
     139          kq2.control([ev], 0)
     140          ev = select.kevent(client.fileno(),
     141                             select.KQ_FILTER_READ,
     142                             select.KQ_EV_ADD | select.KQ_EV_ENABLE)
     143          kq2.control([ev], 0)
     144  
     145          events = kq.control(None, 4, 1)
     146          events = set((e.ident, e.filter) for e in events)
     147          self.assertEqual(events, set([
     148              (client.fileno(), select.KQ_FILTER_WRITE),
     149              (server.fileno(), select.KQ_FILTER_WRITE)]))
     150  
     151          client.send(b"Hello!")
     152          server.send(b"world!!!")
     153  
     154          # We may need to call it several times
     155          for i in range(10):
     156              events = kq.control(None, 4, 1)
     157              if len(events) == 4:
     158                  break
     159              time.sleep(1.0)
     160          else:
     161              self.fail('timeout waiting for event notifications')
     162  
     163          events = set((e.ident, e.filter) for e in events)
     164          self.assertEqual(events, set([
     165              (client.fileno(), select.KQ_FILTER_WRITE),
     166              (client.fileno(), select.KQ_FILTER_READ),
     167              (server.fileno(), select.KQ_FILTER_WRITE),
     168              (server.fileno(), select.KQ_FILTER_READ)]))
     169  
     170          # Remove completely client, and server read part
     171          ev = select.kevent(client.fileno(),
     172                             select.KQ_FILTER_WRITE,
     173                             select.KQ_EV_DELETE)
     174          kq.control([ev], 0)
     175          ev = select.kevent(client.fileno(),
     176                             select.KQ_FILTER_READ,
     177                             select.KQ_EV_DELETE)
     178          kq.control([ev], 0)
     179          ev = select.kevent(server.fileno(),
     180                             select.KQ_FILTER_READ,
     181                             select.KQ_EV_DELETE)
     182          kq.control([ev], 0, 0)
     183  
     184          events = kq.control([], 4, 0.99)
     185          events = set((e.ident, e.filter) for e in events)
     186          self.assertEqual(events, set([
     187              (server.fileno(), select.KQ_FILTER_WRITE)]))
     188  
     189          client.close()
     190          server.close()
     191          serverSocket.close()
     192  
     193      def testPair(self):
     194          kq = select.kqueue()
     195          a, b = socket.socketpair()
     196  
     197          a.send(b'foo')
     198          event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
     199          event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
     200          r = kq.control([event1, event2], 1, 1)
     201          self.assertTrue(r)
     202          self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
     203          self.assertEqual(b.recv(r[0].data), b'foo')
     204  
     205          a.close()
     206          b.close()
     207          kq.close()
     208  
     209      def test_issue30058(self):
     210          # changelist must be an iterable
     211          kq = select.kqueue()
     212          a, b = socket.socketpair()
     213          ev = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
     214  
     215          kq.control([ev], 0)
     216          # not a list
     217          kq.control((ev,), 0)
     218          # __len__ is not consistent with __iter__
     219          class ESC[4;38;5;81mBadList:
     220              def __len__(self):
     221                  return 0
     222              def __iter__(self):
     223                  for i in range(100):
     224                      yield ev
     225          kq.control(BadList(), 0)
     226          # doesn't have __len__
     227          kq.control(iter([ev]), 0)
     228  
     229          a.close()
     230          b.close()
     231          kq.close()
     232  
     233      def test_close(self):
     234          open_file = open(__file__, "rb")
     235          self.addCleanup(open_file.close)
     236          fd = open_file.fileno()
     237          kqueue = select.kqueue()
     238  
     239          # test fileno() method and closed attribute
     240          self.assertIsInstance(kqueue.fileno(), int)
     241          self.assertFalse(kqueue.closed)
     242  
     243          # test close()
     244          kqueue.close()
     245          self.assertTrue(kqueue.closed)
     246          self.assertRaises(ValueError, kqueue.fileno)
     247  
     248          # close() can be called more than once
     249          kqueue.close()
     250  
     251          # operations must fail with ValueError("I/O operation on closed ...")
     252          self.assertRaises(ValueError, kqueue.control, None, 4)
     253  
     254      def test_fd_non_inheritable(self):
     255          kqueue = select.kqueue()
     256          self.addCleanup(kqueue.close)
     257          self.assertEqual(os.get_inheritable(kqueue.fileno()), False)
     258  
     259  
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()