(root)/
Python-3.12.0/
Lib/
test/
test_epoll.py
       1  # Copyright (c) 2001-2006 Twisted Matrix Laboratories.
       2  #
       3  # Permission is hereby granted, free of charge, to any person obtaining
       4  # a copy of this software and associated documentation files (the
       5  # "Software"), to deal in the Software without restriction, including
       6  # without limitation the rights to use, copy, modify, merge, publish,
       7  # distribute, sublicense, and/or sell copies of the Software, and to
       8  # permit persons to whom the Software is furnished to do so, subject to
       9  # the following conditions:
      10  #
      11  # The above copyright notice and this permission notice shall be
      12  # included in all copies or substantial portions of the Software.
      13  #
      14  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
      15  # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
      16  # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
      17  # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
      18  # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
      19  # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
      20  # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
      21  """
      22  Tests for epoll wrapper.
      23  """
      24  import errno
      25  import os
      26  import select
      27  import socket
      28  import time
      29  import unittest
      30  from test import support
      31  
      32  if not hasattr(select, "epoll"):
      33      raise unittest.SkipTest("test works only on Linux 2.6")
      34  
      35  try:
      36      select.epoll()
      37  except OSError as e:
      38      if e.errno == errno.ENOSYS:
      39          raise unittest.SkipTest("kernel doesn't support epoll()")
      40      raise
      41  
      42  class ESC[4;38;5;81mTestEPoll(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      43  
      44      def setUp(self):
      45          self.serverSocket = socket.create_server(('127.0.0.1', 0))
      46          self.connections = [self.serverSocket]
      47  
      48      def tearDown(self):
      49          for skt in self.connections:
      50              skt.close()
      51  
      52      def _connected_pair(self):
      53          client = socket.socket()
      54          client.setblocking(False)
      55          try:
      56              client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
      57          except OSError as e:
      58              self.assertEqual(e.args[0], errno.EINPROGRESS)
      59          else:
      60              raise AssertionError("Connect should have raised EINPROGRESS")
      61          server, addr = self.serverSocket.accept()
      62  
      63          self.connections.extend((client, server))
      64          return client, server
      65  
      66      def test_create(self):
      67          try:
      68              ep = select.epoll(16)
      69          except OSError as e:
      70              raise AssertionError(str(e))
      71          self.assertTrue(ep.fileno() > 0, ep.fileno())
      72          self.assertTrue(not ep.closed)
      73          ep.close()
      74          self.assertTrue(ep.closed)
      75          self.assertRaises(ValueError, ep.fileno)
      76  
      77          if hasattr(select, "EPOLL_CLOEXEC"):
      78              select.epoll(-1, select.EPOLL_CLOEXEC).close()
      79              select.epoll(flags=select.EPOLL_CLOEXEC).close()
      80              select.epoll(flags=0).close()
      81  
      82      def test_badcreate(self):
      83          self.assertRaises(TypeError, select.epoll, 1, 2, 3)
      84          self.assertRaises(TypeError, select.epoll, 'foo')
      85          self.assertRaises(TypeError, select.epoll, None)
      86          self.assertRaises(TypeError, select.epoll, ())
      87          self.assertRaises(TypeError, select.epoll, ['foo'])
      88          self.assertRaises(TypeError, select.epoll, {})
      89  
      90          self.assertRaises(ValueError, select.epoll, 0)
      91          self.assertRaises(ValueError, select.epoll, -2)
      92          self.assertRaises(ValueError, select.epoll, sizehint=-2)
      93  
      94          if hasattr(select, "EPOLL_CLOEXEC"):
      95              self.assertRaises(OSError, select.epoll, flags=12356)
      96  
      97      def test_context_manager(self):
      98          with select.epoll(16) as ep:
      99              self.assertGreater(ep.fileno(), 0)
     100              self.assertFalse(ep.closed)
     101          self.assertTrue(ep.closed)
     102          self.assertRaises(ValueError, ep.fileno)
     103  
     104      def test_add(self):
     105          server, client = self._connected_pair()
     106  
     107          ep = select.epoll(2)
     108          try:
     109              ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
     110              ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
     111          finally:
     112              ep.close()
     113  
     114          # adding by object w/ fileno works, too.
     115          ep = select.epoll(2)
     116          try:
     117              ep.register(server, select.EPOLLIN | select.EPOLLOUT)
     118              ep.register(client, select.EPOLLIN | select.EPOLLOUT)
     119          finally:
     120              ep.close()
     121  
     122          ep = select.epoll(2)
     123          try:
     124              # TypeError: argument must be an int, or have a fileno() method.
     125              self.assertRaises(TypeError, ep.register, object(),
     126                                select.EPOLLIN | select.EPOLLOUT)
     127              self.assertRaises(TypeError, ep.register, None,
     128                                select.EPOLLIN | select.EPOLLOUT)
     129              # ValueError: file descriptor cannot be a negative integer (-1)
     130              self.assertRaises(ValueError, ep.register, -1,
     131                                select.EPOLLIN | select.EPOLLOUT)
     132              # OSError: [Errno 9] Bad file descriptor
     133              self.assertRaises(OSError, ep.register, 10000,
     134                                select.EPOLLIN | select.EPOLLOUT)
     135              # registering twice also raises an exception
     136              ep.register(server, select.EPOLLIN | select.EPOLLOUT)
     137              self.assertRaises(OSError, ep.register, server,
     138                                select.EPOLLIN | select.EPOLLOUT)
     139          finally:
     140              ep.close()
     141  
     142      def test_fromfd(self):
     143          server, client = self._connected_pair()
     144  
     145          with select.epoll(2) as ep:
     146              ep2 = select.epoll.fromfd(ep.fileno())
     147  
     148              ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
     149              ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
     150  
     151              events = ep.poll(1, 4)
     152              events2 = ep2.poll(0.9, 4)
     153              self.assertEqual(len(events), 2)
     154              self.assertEqual(len(events2), 2)
     155  
     156          try:
     157              ep2.poll(1, 4)
     158          except OSError as e:
     159              self.assertEqual(e.args[0], errno.EBADF, e)
     160          else:
     161              self.fail("epoll on closed fd didn't raise EBADF")
     162  
     163      def test_control_and_wait(self):
     164          # create the epoll object
     165          client, server = self._connected_pair()
     166          ep = select.epoll(16)
     167          ep.register(server.fileno(),
     168                      select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
     169          ep.register(client.fileno(),
     170                      select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
     171  
     172          # EPOLLOUT
     173          now = time.monotonic()
     174          events = ep.poll(1, 4)
     175          then = time.monotonic()
     176          self.assertFalse(then - now > 0.1, then - now)
     177  
     178          expected = [(client.fileno(), select.EPOLLOUT),
     179                      (server.fileno(), select.EPOLLOUT)]
     180          self.assertEqual(sorted(events), sorted(expected))
     181  
     182          # no event
     183          events = ep.poll(timeout=0.1, maxevents=4)
     184          self.assertFalse(events)
     185  
     186          # send: EPOLLIN and EPOLLOUT
     187          client.sendall(b"Hello!")
     188          server.sendall(b"world!!!")
     189  
     190          # we might receive events one at a time, necessitating multiple calls to
     191          # poll
     192          events = []
     193          for _ in support.busy_retry(support.SHORT_TIMEOUT):
     194              now = time.monotonic()
     195              events += ep.poll(1.0, 4)
     196              then = time.monotonic()
     197              self.assertFalse(then - now > 0.01)
     198              if len(events) >= 2:
     199                  break
     200  
     201          expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
     202                      (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
     203          self.assertEqual(sorted(events), sorted(expected))
     204  
     205          # unregister, modify
     206          ep.unregister(client.fileno())
     207          ep.modify(server.fileno(), select.EPOLLOUT)
     208          now = time.monotonic()
     209          events = ep.poll(1, 4)
     210          then = time.monotonic()
     211          self.assertFalse(then - now > 0.01)
     212  
     213          expected = [(server.fileno(), select.EPOLLOUT)]
     214          self.assertEqual(events, expected)
     215  
     216      def test_errors(self):
     217          self.assertRaises(ValueError, select.epoll, -2)
     218          self.assertRaises(ValueError, select.epoll().register, -1,
     219                            select.EPOLLIN)
     220  
     221      def test_unregister_closed(self):
     222          server, client = self._connected_pair()
     223          fd = server.fileno()
     224          ep = select.epoll(16)
     225          ep.register(server)
     226  
     227          now = time.monotonic()
     228          events = ep.poll(1, 4)
     229          then = time.monotonic()
     230          self.assertFalse(then - now > 0.01)
     231  
     232          server.close()
     233  
     234          with self.assertRaises(OSError) as cm:
     235              ep.unregister(fd)
     236          self.assertEqual(cm.exception.errno, errno.EBADF)
     237  
     238      def test_close(self):
     239          open_file = open(__file__, "rb")
     240          self.addCleanup(open_file.close)
     241          fd = open_file.fileno()
     242          epoll = select.epoll()
     243  
     244          # test fileno() method and closed attribute
     245          self.assertIsInstance(epoll.fileno(), int)
     246          self.assertFalse(epoll.closed)
     247  
     248          # test close()
     249          epoll.close()
     250          self.assertTrue(epoll.closed)
     251          self.assertRaises(ValueError, epoll.fileno)
     252  
     253          # close() can be called more than once
     254          epoll.close()
     255  
     256          # operations must fail with ValueError("I/O operation on closed ...")
     257          self.assertRaises(ValueError, epoll.modify, fd, select.EPOLLIN)
     258          self.assertRaises(ValueError, epoll.poll, 1.0)
     259          self.assertRaises(ValueError, epoll.register, fd, select.EPOLLIN)
     260          self.assertRaises(ValueError, epoll.unregister, fd)
     261  
     262      def test_fd_non_inheritable(self):
     263          epoll = select.epoll()
     264          self.addCleanup(epoll.close)
     265          self.assertEqual(os.get_inheritable(epoll.fileno()), False)
     266  
     267  
     268  if __name__ == "__main__":
     269      unittest.main()