(root)/
Python-3.12.0/
Lib/
test/
test_selectors.py
       1  import errno
       2  import os
       3  import random
       4  import selectors
       5  import signal
       6  import socket
       7  import sys
       8  from test import support
       9  from test.support import os_helper
      10  from test.support import socket_helper
      11  from time import sleep
      12  import unittest
      13  import unittest.mock
      14  import tempfile
      15  from time import monotonic as time
      16  try:
      17      import resource
      18  except ImportError:
      19      resource = None
      20  
      21  
# Every test below needs a connected socket pair, so skip the whole module
# on platforms where one cannot be created.
if support.is_emscripten or support.is_wasi:
    raise unittest.SkipTest("Cannot create socketpair on Emscripten/WASI.")
      24  
      25  
      26  if hasattr(socket, 'socketpair'):
      27      socketpair = socket.socketpair
      28  else:
      29      def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
      30          with socket.socket(family, type, proto) as l:
      31              l.bind((socket_helper.HOST, 0))
      32              l.listen()
      33              c = socket.socket(family, type, proto)
      34              try:
      35                  c.connect(l.getsockname())
      36                  caddr = c.getsockname()
      37                  while True:
      38                      a, addr = l.accept()
      39                      # check that we've got the correct client
      40                      if addr == caddr:
      41                          return c, a
      42                      a.close()
      43              except OSError:
      44                  c.close()
      45                  raise
      46  
      47  
      48  def find_ready_matching(ready, flag):
      49      match = []
      50      for key, events in ready:
      51          if events & flag:
      52              match.append(key.fileobj)
      53      return match
      54  
      55  
      56  class ESC[4;38;5;81mBaseSelectorTestCase:
      57  
      58      def make_socketpair(self):
      59          rd, wr = socketpair()
      60          self.addCleanup(rd.close)
      61          self.addCleanup(wr.close)
      62          return rd, wr
      63  
      64      def test_register(self):
      65          s = self.SELECTOR()
      66          self.addCleanup(s.close)
      67  
      68          rd, wr = self.make_socketpair()
      69  
      70          key = s.register(rd, selectors.EVENT_READ, "data")
      71          self.assertIsInstance(key, selectors.SelectorKey)
      72          self.assertEqual(key.fileobj, rd)
      73          self.assertEqual(key.fd, rd.fileno())
      74          self.assertEqual(key.events, selectors.EVENT_READ)
      75          self.assertEqual(key.data, "data")
      76  
      77          # register an unknown event
      78          self.assertRaises(ValueError, s.register, 0, 999999)
      79  
      80          # register an invalid FD
      81          self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ)
      82  
      83          # register twice
      84          self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ)
      85  
      86          # register the same FD, but with a different object
      87          self.assertRaises(KeyError, s.register, rd.fileno(),
      88                            selectors.EVENT_READ)
      89  
      90      def test_unregister(self):
      91          s = self.SELECTOR()
      92          self.addCleanup(s.close)
      93  
      94          rd, wr = self.make_socketpair()
      95  
      96          s.register(rd, selectors.EVENT_READ)
      97          s.unregister(rd)
      98  
      99          # unregister an unknown file obj
     100          self.assertRaises(KeyError, s.unregister, 999999)
     101  
     102          # unregister twice
     103          self.assertRaises(KeyError, s.unregister, rd)
     104  
     105      def test_unregister_after_fd_close(self):
     106          s = self.SELECTOR()
     107          self.addCleanup(s.close)
     108          rd, wr = self.make_socketpair()
     109          r, w = rd.fileno(), wr.fileno()
     110          s.register(r, selectors.EVENT_READ)
     111          s.register(w, selectors.EVENT_WRITE)
     112          rd.close()
     113          wr.close()
     114          s.unregister(r)
     115          s.unregister(w)
     116  
     117      @unittest.skipUnless(os.name == 'posix', "requires posix")
     118      def test_unregister_after_fd_close_and_reuse(self):
     119          s = self.SELECTOR()
     120          self.addCleanup(s.close)
     121          rd, wr = self.make_socketpair()
     122          r, w = rd.fileno(), wr.fileno()
     123          s.register(r, selectors.EVENT_READ)
     124          s.register(w, selectors.EVENT_WRITE)
     125          rd2, wr2 = self.make_socketpair()
     126          rd.close()
     127          wr.close()
     128          os.dup2(rd2.fileno(), r)
     129          os.dup2(wr2.fileno(), w)
     130          self.addCleanup(os.close, r)
     131          self.addCleanup(os.close, w)
     132          s.unregister(r)
     133          s.unregister(w)
     134  
     135      def test_unregister_after_socket_close(self):
     136          s = self.SELECTOR()
     137          self.addCleanup(s.close)
     138          rd, wr = self.make_socketpair()
     139          s.register(rd, selectors.EVENT_READ)
     140          s.register(wr, selectors.EVENT_WRITE)
     141          rd.close()
     142          wr.close()
     143          s.unregister(rd)
     144          s.unregister(wr)
     145  
     146      def test_modify(self):
     147          s = self.SELECTOR()
     148          self.addCleanup(s.close)
     149  
     150          rd, wr = self.make_socketpair()
     151  
     152          key = s.register(rd, selectors.EVENT_READ)
     153  
     154          # modify events
     155          key2 = s.modify(rd, selectors.EVENT_WRITE)
     156          self.assertNotEqual(key.events, key2.events)
     157          self.assertEqual(key2, s.get_key(rd))
     158  
     159          s.unregister(rd)
     160  
     161          # modify data
     162          d1 = object()
     163          d2 = object()
     164  
     165          key = s.register(rd, selectors.EVENT_READ, d1)
     166          key2 = s.modify(rd, selectors.EVENT_READ, d2)
     167          self.assertEqual(key.events, key2.events)
     168          self.assertNotEqual(key.data, key2.data)
     169          self.assertEqual(key2, s.get_key(rd))
     170          self.assertEqual(key2.data, d2)
     171  
     172          # modify unknown file obj
     173          self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ)
     174  
     175          # modify use a shortcut
     176          d3 = object()
     177          s.register = unittest.mock.Mock()
     178          s.unregister = unittest.mock.Mock()
     179  
     180          s.modify(rd, selectors.EVENT_READ, d3)
     181          self.assertFalse(s.register.called)
     182          self.assertFalse(s.unregister.called)
     183  
     184      def test_modify_unregister(self):
     185          # Make sure the fd is unregister()ed in case of error on
     186          # modify(): http://bugs.python.org/issue30014
     187          if self.SELECTOR.__name__ == 'EpollSelector':
     188              patch = unittest.mock.patch(
     189                  'selectors.EpollSelector._selector_cls')
     190          elif self.SELECTOR.__name__ == 'PollSelector':
     191              patch = unittest.mock.patch(
     192                  'selectors.PollSelector._selector_cls')
     193          elif self.SELECTOR.__name__ == 'DevpollSelector':
     194              patch = unittest.mock.patch(
     195                  'selectors.DevpollSelector._selector_cls')
     196          else:
     197              raise self.skipTest("")
     198  
     199          with patch as m:
     200              m.return_value.modify = unittest.mock.Mock(
     201                  side_effect=ZeroDivisionError)
     202              s = self.SELECTOR()
     203              self.addCleanup(s.close)
     204              rd, wr = self.make_socketpair()
     205              s.register(rd, selectors.EVENT_READ)
     206              self.assertEqual(len(s._map), 1)
     207              with self.assertRaises(ZeroDivisionError):
     208                  s.modify(rd, selectors.EVENT_WRITE)
     209              self.assertEqual(len(s._map), 0)
     210  
     211      def test_close(self):
     212          s = self.SELECTOR()
     213          self.addCleanup(s.close)
     214  
     215          mapping = s.get_map()
     216          rd, wr = self.make_socketpair()
     217  
     218          s.register(rd, selectors.EVENT_READ)
     219          s.register(wr, selectors.EVENT_WRITE)
     220  
     221          s.close()
     222          self.assertRaises(RuntimeError, s.get_key, rd)
     223          self.assertRaises(RuntimeError, s.get_key, wr)
     224          self.assertRaises(KeyError, mapping.__getitem__, rd)
     225          self.assertRaises(KeyError, mapping.__getitem__, wr)
     226  
     227      def test_get_key(self):
     228          s = self.SELECTOR()
     229          self.addCleanup(s.close)
     230  
     231          rd, wr = self.make_socketpair()
     232  
     233          key = s.register(rd, selectors.EVENT_READ, "data")
     234          self.assertEqual(key, s.get_key(rd))
     235  
     236          # unknown file obj
     237          self.assertRaises(KeyError, s.get_key, 999999)
     238  
     239      def test_get_map(self):
     240          s = self.SELECTOR()
     241          self.addCleanup(s.close)
     242  
     243          rd, wr = self.make_socketpair()
     244  
     245          keys = s.get_map()
     246          self.assertFalse(keys)
     247          self.assertEqual(len(keys), 0)
     248          self.assertEqual(list(keys), [])
     249          key = s.register(rd, selectors.EVENT_READ, "data")
     250          self.assertIn(rd, keys)
     251          self.assertEqual(key, keys[rd])
     252          self.assertEqual(len(keys), 1)
     253          self.assertEqual(list(keys), [rd.fileno()])
     254          self.assertEqual(list(keys.values()), [key])
     255  
     256          # unknown file obj
     257          with self.assertRaises(KeyError):
     258              keys[999999]
     259  
     260          # Read-only mapping
     261          with self.assertRaises(TypeError):
     262              del keys[rd]
     263  
     264      def test_select(self):
     265          s = self.SELECTOR()
     266          self.addCleanup(s.close)
     267  
     268          rd, wr = self.make_socketpair()
     269  
     270          s.register(rd, selectors.EVENT_READ)
     271          wr_key = s.register(wr, selectors.EVENT_WRITE)
     272  
     273          result = s.select()
     274          for key, events in result:
     275              self.assertTrue(isinstance(key, selectors.SelectorKey))
     276              self.assertTrue(events)
     277              self.assertFalse(events & ~(selectors.EVENT_READ |
     278                                          selectors.EVENT_WRITE))
     279  
     280          self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result)
     281  
     282      def test_context_manager(self):
     283          s = self.SELECTOR()
     284          self.addCleanup(s.close)
     285  
     286          rd, wr = self.make_socketpair()
     287  
     288          with s as sel:
     289              sel.register(rd, selectors.EVENT_READ)
     290              sel.register(wr, selectors.EVENT_WRITE)
     291  
     292          self.assertRaises(RuntimeError, s.get_key, rd)
     293          self.assertRaises(RuntimeError, s.get_key, wr)
     294  
     295      def test_fileno(self):
     296          s = self.SELECTOR()
     297          self.addCleanup(s.close)
     298  
     299          if hasattr(s, 'fileno'):
     300              fd = s.fileno()
     301              self.assertTrue(isinstance(fd, int))
     302              self.assertGreaterEqual(fd, 0)
     303  
     304      def test_selector(self):
     305          s = self.SELECTOR()
     306          self.addCleanup(s.close)
     307  
     308          NUM_SOCKETS = 12
     309          MSG = b" This is a test."
     310          MSG_LEN = len(MSG)
     311          readers = []
     312          writers = []
     313          r2w = {}
     314          w2r = {}
     315  
     316          for i in range(NUM_SOCKETS):
     317              rd, wr = self.make_socketpair()
     318              s.register(rd, selectors.EVENT_READ)
     319              s.register(wr, selectors.EVENT_WRITE)
     320              readers.append(rd)
     321              writers.append(wr)
     322              r2w[rd] = wr
     323              w2r[wr] = rd
     324  
     325          bufs = []
     326  
     327          while writers:
     328              ready = s.select()
     329              ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE)
     330              if not ready_writers:
     331                  self.fail("no sockets ready for writing")
     332              wr = random.choice(ready_writers)
     333              wr.send(MSG)
     334  
     335              for i in range(10):
     336                  ready = s.select()
     337                  ready_readers = find_ready_matching(ready,
     338                                                      selectors.EVENT_READ)
     339                  if ready_readers:
     340                      break
     341                  # there might be a delay between the write to the write end and
     342                  # the read end is reported ready
     343                  sleep(0.1)
     344              else:
     345                  self.fail("no sockets ready for reading")
     346              self.assertEqual([w2r[wr]], ready_readers)
     347              rd = ready_readers[0]
     348              buf = rd.recv(MSG_LEN)
     349              self.assertEqual(len(buf), MSG_LEN)
     350              bufs.append(buf)
     351              s.unregister(r2w[rd])
     352              s.unregister(rd)
     353              writers.remove(r2w[rd])
     354  
     355          self.assertEqual(bufs, [MSG] * NUM_SOCKETS)
     356  
     357      @unittest.skipIf(sys.platform == 'win32',
     358                       'select.select() cannot be used with empty fd sets')
     359      def test_empty_select(self):
     360          # Issue #23009: Make sure EpollSelector.select() works when no FD is
     361          # registered.
     362          s = self.SELECTOR()
     363          self.addCleanup(s.close)
     364          self.assertEqual(s.select(timeout=0), [])
     365  
     366      def test_timeout(self):
     367          s = self.SELECTOR()
     368          self.addCleanup(s.close)
     369  
     370          rd, wr = self.make_socketpair()
     371  
     372          s.register(wr, selectors.EVENT_WRITE)
     373          t = time()
     374          self.assertEqual(1, len(s.select(0)))
     375          self.assertEqual(1, len(s.select(-1)))
     376          self.assertLess(time() - t, 0.5)
     377  
     378          s.unregister(wr)
     379          s.register(rd, selectors.EVENT_READ)
     380          t = time()
     381          self.assertFalse(s.select(0))
     382          self.assertFalse(s.select(-1))
     383          self.assertLess(time() - t, 0.5)
     384  
     385          t0 = time()
     386          self.assertFalse(s.select(1))
     387          t1 = time()
     388          dt = t1 - t0
     389          # Tolerate 2.0 seconds for very slow buildbots
     390          self.assertTrue(0.8 <= dt <= 2.0, dt)
     391  
     392      @unittest.skipUnless(hasattr(signal, "alarm"),
     393                           "signal.alarm() required for this test")
     394      def test_select_interrupt_exc(self):
     395          s = self.SELECTOR()
     396          self.addCleanup(s.close)
     397  
     398          rd, wr = self.make_socketpair()
     399  
     400          class ESC[4;38;5;81mInterruptSelect(ESC[4;38;5;149mException):
     401              pass
     402  
     403          def handler(*args):
     404              raise InterruptSelect
     405  
     406          orig_alrm_handler = signal.signal(signal.SIGALRM, handler)
     407          self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
     408  
     409          try:
     410              signal.alarm(1)
     411  
     412              s.register(rd, selectors.EVENT_READ)
     413              t = time()
     414              # select() is interrupted by a signal which raises an exception
     415              with self.assertRaises(InterruptSelect):
     416                  s.select(30)
     417              # select() was interrupted before the timeout of 30 seconds
     418              self.assertLess(time() - t, 5.0)
     419          finally:
     420              signal.alarm(0)
     421  
     422      @unittest.skipUnless(hasattr(signal, "alarm"),
     423                           "signal.alarm() required for this test")
     424      def test_select_interrupt_noraise(self):
     425          s = self.SELECTOR()
     426          self.addCleanup(s.close)
     427  
     428          rd, wr = self.make_socketpair()
     429  
     430          orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
     431          self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
     432  
     433          try:
     434              signal.alarm(1)
     435  
     436              s.register(rd, selectors.EVENT_READ)
     437              t = time()
     438              # select() is interrupted by a signal, but the signal handler doesn't
     439              # raise an exception, so select() should by retries with a recomputed
     440              # timeout
     441              self.assertFalse(s.select(1.5))
     442              self.assertGreaterEqual(time() - t, 1.0)
     443          finally:
     444              signal.alarm(0)
     445  
     446  
class ScalableSelectorMixIn:
    """Extra test for selector implementations expected to handle a large
    number of file descriptors (more than FD_SETSIZE).

    Meant to be mixed into a test case that also provides SELECTOR and
    make_socketpair().
    """

    # see issue #18963 for why it's skipped on older OS X versions
    @support.requires_mac_ver(10, 5)
    @unittest.skipUnless(resource, "Test needs resource module")
    @support.requires_resource('cpu')
    def test_above_fd_setsize(self):
        # A scalable implementation should have no problem with more than
        # FD_SETSIZE file descriptors. Since we don't know the value, we just
        # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
            # restore the original soft limit once the test is over
            self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
                            (soft, hard))
            # never try to open more than 2**16 descriptors
            NUM_FDS = min(hard, 2**16)
        except (OSError, ValueError):
            # the limit could not be raised: stay within the current one
            NUM_FDS = soft

        # guard for already allocated FDs (stdin, stdout...)
        NUM_FDS -= 32

        s = self.SELECTOR()
        self.addCleanup(s.close)

        # each socket pair consumes two descriptors
        for i in range(NUM_FDS // 2):
            try:
                rd, wr = self.make_socketpair()
            except OSError:
                # too many FDs, skip - note that we should only catch EMFILE
                # here, but apparently *BSD and Solaris can fail upon connect()
                # or bind() with EADDRNOTAVAIL, so let's be safe
                self.skipTest("FD limit reached")

            try:
                s.register(rd, selectors.EVENT_READ)
                s.register(wr, selectors.EVENT_WRITE)
            except OSError as e:
                if e.errno == errno.ENOSPC:
                    # this can be raised by epoll if we go over
                    # fs.epoll.max_user_watches sysctl
                    self.skipTest("FD limit reached")
                raise

        try:
            fds = s.select()
        except OSError as e:
            if e.errno == errno.EINVAL and sys.platform == 'darwin':
                # unexplainable errors on macOS don't need to fail the test
                self.skipTest("Invalid argument error calling poll()")
            raise
        # one ready entry is expected per socket pair (presumably the write
        # end, since nothing was sent on any of them)
        self.assertEqual(NUM_FDS // 2, len(fds))
     499  
     500  
class DefaultSelectorTestCase(BaseSelectorTestCase, unittest.TestCase):
    """Run the shared selector tests against selectors.DefaultSelector."""

    # Selector class instantiated by the inherited tests.
    SELECTOR = selectors.DefaultSelector
     504  
     505  
class SelectSelectorTestCase(BaseSelectorTestCase, unittest.TestCase):
    """Run the shared selector tests against selectors.SelectSelector."""

    # Selector class instantiated by the inherited tests.
    SELECTOR = selectors.SelectSelector
     509  
     510  
@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
                     "Test needs selectors.PollSelector")
class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn,
                           unittest.TestCase):
    """Run the shared and scalability tests against selectors.PollSelector."""

    # getattr() with a default keeps the class body importable on platforms
    # without PollSelector; the decorator above skips the tests there.
    SELECTOR = getattr(selectors, 'PollSelector', None)
     517  
     518  
     519  @unittest.skipUnless(hasattr(selectors, 'EpollSelector'),
     520                       "Test needs selectors.EpollSelector")
     521  class ESC[4;38;5;81mEpollSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149mScalableSelectorMixIn,
     522                              ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     523  
     524      SELECTOR = getattr(selectors, 'EpollSelector', None)
     525  
     526      def test_register_file(self):
     527          # epoll(7) returns EPERM when given a file to watch
     528          s = self.SELECTOR()
     529          with tempfile.NamedTemporaryFile() as f:
     530              with self.assertRaises(IOError):
     531                  s.register(f, selectors.EVENT_READ)
     532              # the SelectorKey has been removed
     533              with self.assertRaises(KeyError):
     534                  s.get_key(f)
     535  
     536  
     537  @unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),
     538                       "Test needs selectors.KqueueSelector)")
     539  class ESC[4;38;5;81mKqueueSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149mScalableSelectorMixIn,
     540                               ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     541  
     542      SELECTOR = getattr(selectors, 'KqueueSelector', None)
     543  
     544      def test_register_bad_fd(self):
     545          # a file descriptor that's been closed should raise an OSError
     546          # with EBADF
     547          s = self.SELECTOR()
     548          bad_f = os_helper.make_bad_fd()
     549          with self.assertRaises(OSError) as cm:
     550              s.register(bad_f, selectors.EVENT_READ)
     551          self.assertEqual(cm.exception.errno, errno.EBADF)
     552          # the SelectorKey has been removed
     553          with self.assertRaises(KeyError):
     554              s.get_key(bad_f)
     555  
     556      def test_empty_select_timeout(self):
     557          # Issues #23009, #29255: Make sure timeout is applied when no fds
     558          # are registered.
     559          s = self.SELECTOR()
     560          self.addCleanup(s.close)
     561  
     562          t0 = time()
     563          self.assertEqual(s.select(1), [])
     564          t1 = time()
     565          dt = t1 - t0
     566          # Tolerate 2.0 seconds for very slow buildbots
     567          self.assertTrue(0.8 <= dt <= 2.0, dt)
     568  
     569  
@unittest.skipUnless(hasattr(selectors, 'DevpollSelector'),
                     "Test needs selectors.DevpollSelector")
class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn,
                              unittest.TestCase):
    """Run the shared and scalability tests against selectors.DevpollSelector."""

    # getattr() with a default keeps the class body importable on platforms
    # without DevpollSelector; the decorator above skips the tests there.
    SELECTOR = getattr(selectors, 'DevpollSelector', None)
     576  
     577  
def tearDownModule():
    # unittest module-level teardown hook: runs once after all tests in
    # this module and delegates to support.reap_children().
    support.reap_children()
     580  
     581  
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()