(root)/
Python-3.11.7/
Lib/
test/
test_selectors.py
       1  import errno
       2  import os
       3  import random
       4  import selectors
       5  import signal
       6  import socket
       7  import sys
       8  from test import support
       9  from test.support import os_helper
      10  from test.support import socket_helper
      11  from time import sleep
      12  import unittest
      13  import unittest.mock
      14  import tempfile
      15  from time import monotonic as time
      16  try:
      17      import resource
      18  except ImportError:
      19      resource = None
      20  
      21  
      22  if support.is_emscripten or support.is_wasi:
      23      raise unittest.SkipTest("Cannot create socketpair on Emscripten/WASI.")
      24  
      25  
      26  if hasattr(socket, 'socketpair'):
      27      socketpair = socket.socketpair
      28  else:
      29      def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
      30          with socket.socket(family, type, proto) as l:
      31              l.bind((socket_helper.HOST, 0))
      32              l.listen()
      33              c = socket.socket(family, type, proto)
      34              try:
      35                  c.connect(l.getsockname())
      36                  caddr = c.getsockname()
      37                  while True:
      38                      a, addr = l.accept()
      39                      # check that we've got the correct client
      40                      if addr == caddr:
      41                          return c, a
      42                      a.close()
      43              except OSError:
      44                  c.close()
      45                  raise
      46  
      47  
      48  def find_ready_matching(ready, flag):
      49      match = []
      50      for key, events in ready:
      51          if events & flag:
      52              match.append(key.fileobj)
      53      return match
      54  
      55  
      56  class ESC[4;38;5;81mBaseSelectorTestCase:
      57  
      58      def make_socketpair(self):
      59          rd, wr = socketpair()
      60          self.addCleanup(rd.close)
      61          self.addCleanup(wr.close)
      62          return rd, wr
      63  
      64      def test_register(self):
      65          s = self.SELECTOR()
      66          self.addCleanup(s.close)
      67  
      68          rd, wr = self.make_socketpair()
      69  
      70          key = s.register(rd, selectors.EVENT_READ, "data")
      71          self.assertIsInstance(key, selectors.SelectorKey)
      72          self.assertEqual(key.fileobj, rd)
      73          self.assertEqual(key.fd, rd.fileno())
      74          self.assertEqual(key.events, selectors.EVENT_READ)
      75          self.assertEqual(key.data, "data")
      76  
      77          # register an unknown event
      78          self.assertRaises(ValueError, s.register, 0, 999999)
      79  
      80          # register an invalid FD
      81          self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ)
      82  
      83          # register twice
      84          self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ)
      85  
      86          # register the same FD, but with a different object
      87          self.assertRaises(KeyError, s.register, rd.fileno(),
      88                            selectors.EVENT_READ)
      89  
      90      def test_unregister(self):
      91          s = self.SELECTOR()
      92          self.addCleanup(s.close)
      93  
      94          rd, wr = self.make_socketpair()
      95  
      96          s.register(rd, selectors.EVENT_READ)
      97          s.unregister(rd)
      98  
      99          # unregister an unknown file obj
     100          self.assertRaises(KeyError, s.unregister, 999999)
     101  
     102          # unregister twice
     103          self.assertRaises(KeyError, s.unregister, rd)
     104  
     105      def test_unregister_after_fd_close(self):
     106          s = self.SELECTOR()
     107          self.addCleanup(s.close)
     108          rd, wr = self.make_socketpair()
     109          r, w = rd.fileno(), wr.fileno()
     110          s.register(r, selectors.EVENT_READ)
     111          s.register(w, selectors.EVENT_WRITE)
     112          rd.close()
     113          wr.close()
     114          s.unregister(r)
     115          s.unregister(w)
     116  
     117      @unittest.skipUnless(os.name == 'posix', "requires posix")
     118      def test_unregister_after_fd_close_and_reuse(self):
     119          s = self.SELECTOR()
     120          self.addCleanup(s.close)
     121          rd, wr = self.make_socketpair()
     122          r, w = rd.fileno(), wr.fileno()
     123          s.register(r, selectors.EVENT_READ)
     124          s.register(w, selectors.EVENT_WRITE)
     125          rd2, wr2 = self.make_socketpair()
     126          rd.close()
     127          wr.close()
     128          os.dup2(rd2.fileno(), r)
     129          os.dup2(wr2.fileno(), w)
     130          self.addCleanup(os.close, r)
     131          self.addCleanup(os.close, w)
     132          s.unregister(r)
     133          s.unregister(w)
     134  
     135      def test_unregister_after_socket_close(self):
     136          s = self.SELECTOR()
     137          self.addCleanup(s.close)
     138          rd, wr = self.make_socketpair()
     139          s.register(rd, selectors.EVENT_READ)
     140          s.register(wr, selectors.EVENT_WRITE)
     141          rd.close()
     142          wr.close()
     143          s.unregister(rd)
     144          s.unregister(wr)
     145  
     146      def test_modify(self):
     147          s = self.SELECTOR()
     148          self.addCleanup(s.close)
     149  
     150          rd, wr = self.make_socketpair()
     151  
     152          key = s.register(rd, selectors.EVENT_READ)
     153  
     154          # modify events
     155          key2 = s.modify(rd, selectors.EVENT_WRITE)
     156          self.assertNotEqual(key.events, key2.events)
     157          self.assertEqual(key2, s.get_key(rd))
     158  
     159          s.unregister(rd)
     160  
     161          # modify data
     162          d1 = object()
     163          d2 = object()
     164  
     165          key = s.register(rd, selectors.EVENT_READ, d1)
     166          key2 = s.modify(rd, selectors.EVENT_READ, d2)
     167          self.assertEqual(key.events, key2.events)
     168          self.assertNotEqual(key.data, key2.data)
     169          self.assertEqual(key2, s.get_key(rd))
     170          self.assertEqual(key2.data, d2)
     171  
     172          # modify unknown file obj
     173          self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ)
     174  
     175          # modify use a shortcut
     176          d3 = object()
     177          s.register = unittest.mock.Mock()
     178          s.unregister = unittest.mock.Mock()
     179  
     180          s.modify(rd, selectors.EVENT_READ, d3)
     181          self.assertFalse(s.register.called)
     182          self.assertFalse(s.unregister.called)
     183  
     184      def test_modify_unregister(self):
     185          # Make sure the fd is unregister()ed in case of error on
     186          # modify(): http://bugs.python.org/issue30014
     187          if self.SELECTOR.__name__ == 'EpollSelector':
     188              patch = unittest.mock.patch(
     189                  'selectors.EpollSelector._selector_cls')
     190          elif self.SELECTOR.__name__ == 'PollSelector':
     191              patch = unittest.mock.patch(
     192                  'selectors.PollSelector._selector_cls')
     193          elif self.SELECTOR.__name__ == 'DevpollSelector':
     194              patch = unittest.mock.patch(
     195                  'selectors.DevpollSelector._selector_cls')
     196          else:
     197              raise self.skipTest("")
     198  
     199          with patch as m:
     200              m.return_value.modify = unittest.mock.Mock(
     201                  side_effect=ZeroDivisionError)
     202              s = self.SELECTOR()
     203              self.addCleanup(s.close)
     204              rd, wr = self.make_socketpair()
     205              s.register(rd, selectors.EVENT_READ)
     206              self.assertEqual(len(s._map), 1)
     207              with self.assertRaises(ZeroDivisionError):
     208                  s.modify(rd, selectors.EVENT_WRITE)
     209              self.assertEqual(len(s._map), 0)
     210  
     211      def test_close(self):
     212          s = self.SELECTOR()
     213          self.addCleanup(s.close)
     214  
     215          mapping = s.get_map()
     216          rd, wr = self.make_socketpair()
     217  
     218          s.register(rd, selectors.EVENT_READ)
     219          s.register(wr, selectors.EVENT_WRITE)
     220  
     221          s.close()
     222          self.assertRaises(RuntimeError, s.get_key, rd)
     223          self.assertRaises(RuntimeError, s.get_key, wr)
     224          self.assertRaises(KeyError, mapping.__getitem__, rd)
     225          self.assertRaises(KeyError, mapping.__getitem__, wr)
     226  
     227      def test_get_key(self):
     228          s = self.SELECTOR()
     229          self.addCleanup(s.close)
     230  
     231          rd, wr = self.make_socketpair()
     232  
     233          key = s.register(rd, selectors.EVENT_READ, "data")
     234          self.assertEqual(key, s.get_key(rd))
     235  
     236          # unknown file obj
     237          self.assertRaises(KeyError, s.get_key, 999999)
     238  
     239      def test_get_map(self):
     240          s = self.SELECTOR()
     241          self.addCleanup(s.close)
     242  
     243          rd, wr = self.make_socketpair()
     244  
     245          keys = s.get_map()
     246          self.assertFalse(keys)
     247          self.assertEqual(len(keys), 0)
     248          self.assertEqual(list(keys), [])
     249          key = s.register(rd, selectors.EVENT_READ, "data")
     250          self.assertIn(rd, keys)
     251          self.assertEqual(key, keys[rd])
     252          self.assertEqual(len(keys), 1)
     253          self.assertEqual(list(keys), [rd.fileno()])
     254          self.assertEqual(list(keys.values()), [key])
     255  
     256          # unknown file obj
     257          with self.assertRaises(KeyError):
     258              keys[999999]
     259  
     260          # Read-only mapping
     261          with self.assertRaises(TypeError):
     262              del keys[rd]
     263  
     264      def test_select(self):
     265          s = self.SELECTOR()
     266          self.addCleanup(s.close)
     267  
     268          rd, wr = self.make_socketpair()
     269  
     270          s.register(rd, selectors.EVENT_READ)
     271          wr_key = s.register(wr, selectors.EVENT_WRITE)
     272  
     273          result = s.select()
     274          for key, events in result:
     275              self.assertTrue(isinstance(key, selectors.SelectorKey))
     276              self.assertTrue(events)
     277              self.assertFalse(events & ~(selectors.EVENT_READ |
     278                                          selectors.EVENT_WRITE))
     279  
     280          self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result)
     281  
     282      def test_select_read_write(self):
     283          # gh-110038: when a file descriptor is registered for both read and
     284          # write, the two events must be seen on a single call to select().
     285          s = self.SELECTOR()
     286          self.addCleanup(s.close)
     287  
     288          sock1, sock2 = self.make_socketpair()
     289          sock2.send(b"foo")
     290          my_key = s.register(sock1, selectors.EVENT_READ | selectors.EVENT_WRITE)
     291  
     292          seen_read, seen_write = False, False
     293          result = s.select()
     294          # We get the read and write either in the same result entry or in two
     295          # distinct entries with the same key.
     296          self.assertLessEqual(len(result), 2)
     297          for key, events in result:
     298              self.assertTrue(isinstance(key, selectors.SelectorKey))
     299              self.assertEqual(key, my_key)
     300              self.assertFalse(events & ~(selectors.EVENT_READ |
     301                                          selectors.EVENT_WRITE))
     302              if events & selectors.EVENT_READ:
     303                  self.assertFalse(seen_read)
     304                  seen_read = True
     305              if events & selectors.EVENT_WRITE:
     306                  self.assertFalse(seen_write)
     307                  seen_write = True
     308          self.assertTrue(seen_read)
     309          self.assertTrue(seen_write)
     310  
     311      def test_context_manager(self):
     312          s = self.SELECTOR()
     313          self.addCleanup(s.close)
     314  
     315          rd, wr = self.make_socketpair()
     316  
     317          with s as sel:
     318              sel.register(rd, selectors.EVENT_READ)
     319              sel.register(wr, selectors.EVENT_WRITE)
     320  
     321          self.assertRaises(RuntimeError, s.get_key, rd)
     322          self.assertRaises(RuntimeError, s.get_key, wr)
     323  
     324      def test_fileno(self):
     325          s = self.SELECTOR()
     326          self.addCleanup(s.close)
     327  
     328          if hasattr(s, 'fileno'):
     329              fd = s.fileno()
     330              self.assertTrue(isinstance(fd, int))
     331              self.assertGreaterEqual(fd, 0)
     332  
     333      def test_selector(self):
     334          s = self.SELECTOR()
     335          self.addCleanup(s.close)
     336  
     337          NUM_SOCKETS = 12
     338          MSG = b" This is a test."
     339          MSG_LEN = len(MSG)
     340          readers = []
     341          writers = []
     342          r2w = {}
     343          w2r = {}
     344  
     345          for i in range(NUM_SOCKETS):
     346              rd, wr = self.make_socketpair()
     347              s.register(rd, selectors.EVENT_READ)
     348              s.register(wr, selectors.EVENT_WRITE)
     349              readers.append(rd)
     350              writers.append(wr)
     351              r2w[rd] = wr
     352              w2r[wr] = rd
     353  
     354          bufs = []
     355  
     356          while writers:
     357              ready = s.select()
     358              ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE)
     359              if not ready_writers:
     360                  self.fail("no sockets ready for writing")
     361              wr = random.choice(ready_writers)
     362              wr.send(MSG)
     363  
     364              for i in range(10):
     365                  ready = s.select()
     366                  ready_readers = find_ready_matching(ready,
     367                                                      selectors.EVENT_READ)
     368                  if ready_readers:
     369                      break
     370                  # there might be a delay between the write to the write end and
     371                  # the read end is reported ready
     372                  sleep(0.1)
     373              else:
     374                  self.fail("no sockets ready for reading")
     375              self.assertEqual([w2r[wr]], ready_readers)
     376              rd = ready_readers[0]
     377              buf = rd.recv(MSG_LEN)
     378              self.assertEqual(len(buf), MSG_LEN)
     379              bufs.append(buf)
     380              s.unregister(r2w[rd])
     381              s.unregister(rd)
     382              writers.remove(r2w[rd])
     383  
     384          self.assertEqual(bufs, [MSG] * NUM_SOCKETS)
     385  
     386      @unittest.skipIf(sys.platform == 'win32',
     387                       'select.select() cannot be used with empty fd sets')
     388      def test_empty_select(self):
     389          # Issue #23009: Make sure EpollSelector.select() works when no FD is
     390          # registered.
     391          s = self.SELECTOR()
     392          self.addCleanup(s.close)
     393          self.assertEqual(s.select(timeout=0), [])
     394  
     395      def test_timeout(self):
     396          s = self.SELECTOR()
     397          self.addCleanup(s.close)
     398  
     399          rd, wr = self.make_socketpair()
     400  
     401          s.register(wr, selectors.EVENT_WRITE)
     402          t = time()
     403          self.assertEqual(1, len(s.select(0)))
     404          self.assertEqual(1, len(s.select(-1)))
     405          self.assertLess(time() - t, 0.5)
     406  
     407          s.unregister(wr)
     408          s.register(rd, selectors.EVENT_READ)
     409          t = time()
     410          self.assertFalse(s.select(0))
     411          self.assertFalse(s.select(-1))
     412          self.assertLess(time() - t, 0.5)
     413  
     414          t0 = time()
     415          self.assertFalse(s.select(1))
     416          t1 = time()
     417          dt = t1 - t0
     418          # Tolerate 2.0 seconds for very slow buildbots
     419          self.assertTrue(0.8 <= dt <= 2.0, dt)
     420  
     421      @unittest.skipUnless(hasattr(signal, "alarm"),
     422                           "signal.alarm() required for this test")
     423      def test_select_interrupt_exc(self):
     424          s = self.SELECTOR()
     425          self.addCleanup(s.close)
     426  
     427          rd, wr = self.make_socketpair()
     428  
     429          class ESC[4;38;5;81mInterruptSelect(ESC[4;38;5;149mException):
     430              pass
     431  
     432          def handler(*args):
     433              raise InterruptSelect
     434  
     435          orig_alrm_handler = signal.signal(signal.SIGALRM, handler)
     436          self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
     437  
     438          try:
     439              signal.alarm(1)
     440  
     441              s.register(rd, selectors.EVENT_READ)
     442              t = time()
     443              # select() is interrupted by a signal which raises an exception
     444              with self.assertRaises(InterruptSelect):
     445                  s.select(30)
     446              # select() was interrupted before the timeout of 30 seconds
     447              self.assertLess(time() - t, 5.0)
     448          finally:
     449              signal.alarm(0)
     450  
     451      @unittest.skipUnless(hasattr(signal, "alarm"),
     452                           "signal.alarm() required for this test")
     453      def test_select_interrupt_noraise(self):
     454          s = self.SELECTOR()
     455          self.addCleanup(s.close)
     456  
     457          rd, wr = self.make_socketpair()
     458  
     459          orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
     460          self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
     461  
     462          try:
     463              signal.alarm(1)
     464  
     465              s.register(rd, selectors.EVENT_READ)
     466              t = time()
     467              # select() is interrupted by a signal, but the signal handler doesn't
     468              # raise an exception, so select() should by retries with a recomputed
     469              # timeout
     470              self.assertFalse(s.select(1.5))
     471              self.assertGreaterEqual(time() - t, 1.0)
     472          finally:
     473              signal.alarm(0)
     474  
     475  
     476  class ESC[4;38;5;81mScalableSelectorMixIn:
     477  
     478      # see issue #18963 for why it's skipped on older OS X versions
     479      @support.requires_mac_ver(10, 5)
     480      @unittest.skipUnless(resource, "Test needs resource module")
     481      @support.requires_resource('cpu')
     482      def test_above_fd_setsize(self):
     483          # A scalable implementation should have no problem with more than
     484          # FD_SETSIZE file descriptors. Since we don't know the value, we just
     485          # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.
     486          soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
     487          try:
     488              resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
     489              self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
     490                              (soft, hard))
     491              NUM_FDS = min(hard, 2**16)
     492          except (OSError, ValueError):
     493              NUM_FDS = soft
     494  
     495          # guard for already allocated FDs (stdin, stdout...)
     496          NUM_FDS -= 32
     497  
     498          s = self.SELECTOR()
     499          self.addCleanup(s.close)
     500  
     501          for i in range(NUM_FDS // 2):
     502              try:
     503                  rd, wr = self.make_socketpair()
     504              except OSError:
     505                  # too many FDs, skip - note that we should only catch EMFILE
     506                  # here, but apparently *BSD and Solaris can fail upon connect()
     507                  # or bind() with EADDRNOTAVAIL, so let's be safe
     508                  self.skipTest("FD limit reached")
     509  
     510              try:
     511                  s.register(rd, selectors.EVENT_READ)
     512                  s.register(wr, selectors.EVENT_WRITE)
     513              except OSError as e:
     514                  if e.errno == errno.ENOSPC:
     515                      # this can be raised by epoll if we go over
     516                      # fs.epoll.max_user_watches sysctl
     517                      self.skipTest("FD limit reached")
     518                  raise
     519  
     520          try:
     521              fds = s.select()
     522          except OSError as e:
     523              if e.errno == errno.EINVAL and sys.platform == 'darwin':
     524                  # unexplainable errors on macOS don't need to fail the test
     525                  self.skipTest("Invalid argument error calling poll()")
     526              raise
     527          self.assertEqual(NUM_FDS // 2, len(fds))
     528  
     529  
     530  class ESC[4;38;5;81mDefaultSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     531  
     532      SELECTOR = selectors.DefaultSelector
     533  
     534  
     535  class ESC[4;38;5;81mSelectSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     536  
     537      SELECTOR = selectors.SelectSelector
     538  
     539  
     540  @unittest.skipUnless(hasattr(selectors, 'PollSelector'),
     541                       "Test needs selectors.PollSelector")
     542  class ESC[4;38;5;81mPollSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149mScalableSelectorMixIn,
     543                             ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     544  
     545      SELECTOR = getattr(selectors, 'PollSelector', None)
     546  
     547  
     548  @unittest.skipUnless(hasattr(selectors, 'EpollSelector'),
     549                       "Test needs selectors.EpollSelector")
     550  class ESC[4;38;5;81mEpollSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149mScalableSelectorMixIn,
     551                              ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     552  
     553      SELECTOR = getattr(selectors, 'EpollSelector', None)
     554  
     555      def test_register_file(self):
     556          # epoll(7) returns EPERM when given a file to watch
     557          s = self.SELECTOR()
     558          with tempfile.NamedTemporaryFile() as f:
     559              with self.assertRaises(IOError):
     560                  s.register(f, selectors.EVENT_READ)
     561              # the SelectorKey has been removed
     562              with self.assertRaises(KeyError):
     563                  s.get_key(f)
     564  
     565  
     566  @unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),
     567                       "Test needs selectors.KqueueSelector)")
     568  class ESC[4;38;5;81mKqueueSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149mScalableSelectorMixIn,
     569                               ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     570  
     571      SELECTOR = getattr(selectors, 'KqueueSelector', None)
     572  
     573      def test_register_bad_fd(self):
     574          # a file descriptor that's been closed should raise an OSError
     575          # with EBADF
     576          s = self.SELECTOR()
     577          bad_f = os_helper.make_bad_fd()
     578          with self.assertRaises(OSError) as cm:
     579              s.register(bad_f, selectors.EVENT_READ)
     580          self.assertEqual(cm.exception.errno, errno.EBADF)
     581          # the SelectorKey has been removed
     582          with self.assertRaises(KeyError):
     583              s.get_key(bad_f)
     584  
     585      def test_empty_select_timeout(self):
     586          # Issues #23009, #29255: Make sure timeout is applied when no fds
     587          # are registered.
     588          s = self.SELECTOR()
     589          self.addCleanup(s.close)
     590  
     591          t0 = time()
     592          self.assertEqual(s.select(1), [])
     593          t1 = time()
     594          dt = t1 - t0
     595          # Tolerate 2.0 seconds for very slow buildbots
     596          self.assertTrue(0.8 <= dt <= 2.0, dt)
     597  
     598  
     599  @unittest.skipUnless(hasattr(selectors, 'DevpollSelector'),
     600                       "Test needs selectors.DevpollSelector")
     601  class ESC[4;38;5;81mDevpollSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149mScalableSelectorMixIn,
     602                                ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     603  
     604      SELECTOR = getattr(selectors, 'DevpollSelector', None)
     605  
     606  
     607  def tearDownModule():
     608      support.reap_children()
     609  
     610  
     611  if __name__ == "__main__":
     612      unittest.main()