(root)/
Python-3.11.7/
Lib/
test/
test_epoll.py
       1  # Copyright (c) 2001-2006 Twisted Matrix Laboratories.
       2  #
       3  # Permission is hereby granted, free of charge, to any person obtaining
       4  # a copy of this software and associated documentation files (the
       5  # "Software"), to deal in the Software without restriction, including
       6  # without limitation the rights to use, copy, modify, merge, publish,
       7  # distribute, sublicense, and/or sell copies of the Software, and to
       8  # permit persons to whom the Software is furnished to do so, subject to
       9  # the following conditions:
      10  #
      11  # The above copyright notice and this permission notice shall be
      12  # included in all copies or substantial portions of the Software.
      13  #
      14  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
      15  # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
      16  # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
      17  # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
      18  # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
      19  # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
      20  # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
      21  """
      22  Tests for epoll wrapper.
      23  """
      24  import errno
      25  import os
      26  import select
      27  import socket
      28  import time
      29  import unittest
      30  
      31  if not hasattr(select, "epoll"):
      32      raise unittest.SkipTest("test works only on Linux 2.6")
      33  
      34  try:
      35      select.epoll()
      36  except OSError as e:
      37      if e.errno == errno.ENOSYS:
      38          raise unittest.SkipTest("kernel doesn't support epoll()")
      39      raise
      40  
      41  class ESC[4;38;5;81mTestEPoll(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      42  
      43      def setUp(self):
      44          self.serverSocket = socket.create_server(('127.0.0.1', 0))
      45          self.connections = [self.serverSocket]
      46  
      47      def tearDown(self):
      48          for skt in self.connections:
      49              skt.close()
      50  
      51      def _connected_pair(self):
      52          client = socket.socket()
      53          client.setblocking(False)
      54          try:
      55              client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
      56          except OSError as e:
      57              self.assertEqual(e.args[0], errno.EINPROGRESS)
      58          else:
      59              raise AssertionError("Connect should have raised EINPROGRESS")
      60          server, addr = self.serverSocket.accept()
      61  
      62          self.connections.extend((client, server))
      63          return client, server
      64  
      65      def test_create(self):
      66          try:
      67              ep = select.epoll(16)
      68          except OSError as e:
      69              raise AssertionError(str(e))
      70          self.assertTrue(ep.fileno() > 0, ep.fileno())
      71          self.assertTrue(not ep.closed)
      72          ep.close()
      73          self.assertTrue(ep.closed)
      74          self.assertRaises(ValueError, ep.fileno)
      75  
      76          if hasattr(select, "EPOLL_CLOEXEC"):
      77              select.epoll(-1, select.EPOLL_CLOEXEC).close()
      78              select.epoll(flags=select.EPOLL_CLOEXEC).close()
      79              select.epoll(flags=0).close()
      80  
      81      def test_badcreate(self):
      82          self.assertRaises(TypeError, select.epoll, 1, 2, 3)
      83          self.assertRaises(TypeError, select.epoll, 'foo')
      84          self.assertRaises(TypeError, select.epoll, None)
      85          self.assertRaises(TypeError, select.epoll, ())
      86          self.assertRaises(TypeError, select.epoll, ['foo'])
      87          self.assertRaises(TypeError, select.epoll, {})
      88  
      89          self.assertRaises(ValueError, select.epoll, 0)
      90          self.assertRaises(ValueError, select.epoll, -2)
      91          self.assertRaises(ValueError, select.epoll, sizehint=-2)
      92  
      93          if hasattr(select, "EPOLL_CLOEXEC"):
      94              self.assertRaises(OSError, select.epoll, flags=12356)
      95  
      96      def test_context_manager(self):
      97          with select.epoll(16) as ep:
      98              self.assertGreater(ep.fileno(), 0)
      99              self.assertFalse(ep.closed)
     100          self.assertTrue(ep.closed)
     101          self.assertRaises(ValueError, ep.fileno)
     102  
     103      def test_add(self):
     104          server, client = self._connected_pair()
     105  
     106          ep = select.epoll(2)
     107          try:
     108              ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
     109              ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
     110          finally:
     111              ep.close()
     112  
     113          # adding by object w/ fileno works, too.
     114          ep = select.epoll(2)
     115          try:
     116              ep.register(server, select.EPOLLIN | select.EPOLLOUT)
     117              ep.register(client, select.EPOLLIN | select.EPOLLOUT)
     118          finally:
     119              ep.close()
     120  
     121          ep = select.epoll(2)
     122          try:
     123              # TypeError: argument must be an int, or have a fileno() method.
     124              self.assertRaises(TypeError, ep.register, object(),
     125                                select.EPOLLIN | select.EPOLLOUT)
     126              self.assertRaises(TypeError, ep.register, None,
     127                                select.EPOLLIN | select.EPOLLOUT)
     128              # ValueError: file descriptor cannot be a negative integer (-1)
     129              self.assertRaises(ValueError, ep.register, -1,
     130                                select.EPOLLIN | select.EPOLLOUT)
     131              # OSError: [Errno 9] Bad file descriptor
     132              self.assertRaises(OSError, ep.register, 10000,
     133                                select.EPOLLIN | select.EPOLLOUT)
     134              # registering twice also raises an exception
     135              ep.register(server, select.EPOLLIN | select.EPOLLOUT)
     136              self.assertRaises(OSError, ep.register, server,
     137                                select.EPOLLIN | select.EPOLLOUT)
     138          finally:
     139              ep.close()
     140  
     141      def test_fromfd(self):
     142          server, client = self._connected_pair()
     143  
     144          with select.epoll(2) as ep:
     145              ep2 = select.epoll.fromfd(ep.fileno())
     146  
     147              ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
     148              ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
     149  
     150              events = ep.poll(1, 4)
     151              events2 = ep2.poll(0.9, 4)
     152              self.assertEqual(len(events), 2)
     153              self.assertEqual(len(events2), 2)
     154  
     155          try:
     156              ep2.poll(1, 4)
     157          except OSError as e:
     158              self.assertEqual(e.args[0], errno.EBADF, e)
     159          else:
     160              self.fail("epoll on closed fd didn't raise EBADF")
     161  
     162      def test_control_and_wait(self):
     163          # create the epoll object
     164          client, server = self._connected_pair()
     165          ep = select.epoll(16)
     166          ep.register(server.fileno(),
     167                      select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
     168          ep.register(client.fileno(),
     169                      select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
     170  
     171          # EPOLLOUT
     172          now = time.monotonic()
     173          events = ep.poll(1, 4)
     174          then = time.monotonic()
     175          self.assertFalse(then - now > 0.1, then - now)
     176  
     177          expected = [(client.fileno(), select.EPOLLOUT),
     178                      (server.fileno(), select.EPOLLOUT)]
     179          self.assertEqual(sorted(events), sorted(expected))
     180  
     181          # no event
     182          events = ep.poll(timeout=0.1, maxevents=4)
     183          self.assertFalse(events)
     184  
     185          # send: EPOLLIN and EPOLLOUT
     186          client.sendall(b"Hello!")
     187          server.sendall(b"world!!!")
     188  
     189          now = time.monotonic()
     190          events = ep.poll(1.0, 4)
     191          then = time.monotonic()
     192          self.assertFalse(then - now > 0.01)
     193  
     194          expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
     195                      (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
     196          self.assertEqual(sorted(events), sorted(expected))
     197  
     198          # unregister, modify
     199          ep.unregister(client.fileno())
     200          ep.modify(server.fileno(), select.EPOLLOUT)
     201          now = time.monotonic()
     202          events = ep.poll(1, 4)
     203          then = time.monotonic()
     204          self.assertFalse(then - now > 0.01)
     205  
     206          expected = [(server.fileno(), select.EPOLLOUT)]
     207          self.assertEqual(events, expected)
     208  
     209      def test_errors(self):
     210          self.assertRaises(ValueError, select.epoll, -2)
     211          self.assertRaises(ValueError, select.epoll().register, -1,
     212                            select.EPOLLIN)
     213  
     214      def test_unregister_closed(self):
     215          server, client = self._connected_pair()
     216          fd = server.fileno()
     217          ep = select.epoll(16)
     218          ep.register(server)
     219  
     220          now = time.monotonic()
     221          events = ep.poll(1, 4)
     222          then = time.monotonic()
     223          self.assertFalse(then - now > 0.01)
     224  
     225          server.close()
     226  
     227          with self.assertRaises(OSError) as cm:
     228              ep.unregister(fd)
     229          self.assertEqual(cm.exception.errno, errno.EBADF)
     230  
     231      def test_close(self):
     232          open_file = open(__file__, "rb")
     233          self.addCleanup(open_file.close)
     234          fd = open_file.fileno()
     235          epoll = select.epoll()
     236  
     237          # test fileno() method and closed attribute
     238          self.assertIsInstance(epoll.fileno(), int)
     239          self.assertFalse(epoll.closed)
     240  
     241          # test close()
     242          epoll.close()
     243          self.assertTrue(epoll.closed)
     244          self.assertRaises(ValueError, epoll.fileno)
     245  
     246          # close() can be called more than once
     247          epoll.close()
     248  
     249          # operations must fail with ValueError("I/O operation on closed ...")
     250          self.assertRaises(ValueError, epoll.modify, fd, select.EPOLLIN)
     251          self.assertRaises(ValueError, epoll.poll, 1.0)
     252          self.assertRaises(ValueError, epoll.register, fd, select.EPOLLIN)
     253          self.assertRaises(ValueError, epoll.unregister, fd)
     254  
     255      def test_fd_non_inheritable(self):
     256          epoll = select.epoll()
     257          self.addCleanup(epoll.close)
     258          self.assertEqual(os.get_inheritable(epoll.fileno()), False)
     259  
     260  
     261  if __name__ == "__main__":
     262      unittest.main()