1 import errno
2 import os
3 import random
4 import selectors
5 import signal
6 import socket
7 import sys
8 from test import support
9 from test.support import os_helper
10 from test.support import socket_helper
11 from time import sleep
12 import unittest
13 import unittest.mock
14 import tempfile
15 from time import monotonic as time
16 try:
17 import resource
18 except ImportError:
19 resource = None
20
21
22 if support.is_emscripten or support.is_wasi:
23 raise unittest.SkipTest("Cannot create socketpair on Emscripten/WASI.")
24
25
26 if hasattr(socket, 'socketpair'):
27 socketpair = socket.socketpair
28 else:
29 def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
30 with socket.socket(family, type, proto) as l:
31 l.bind((socket_helper.HOST, 0))
32 l.listen()
33 c = socket.socket(family, type, proto)
34 try:
35 c.connect(l.getsockname())
36 caddr = c.getsockname()
37 while True:
38 a, addr = l.accept()
39 # check that we've got the correct client
40 if addr == caddr:
41 return c, a
42 a.close()
43 except OSError:
44 c.close()
45 raise
46
47
48 def find_ready_matching(ready, flag):
49 match = []
50 for key, events in ready:
51 if events & flag:
52 match.append(key.fileobj)
53 return match
54
55
56 class ESC[4;38;5;81mBaseSelectorTestCase:
57
58 def make_socketpair(self):
59 rd, wr = socketpair()
60 self.addCleanup(rd.close)
61 self.addCleanup(wr.close)
62 return rd, wr
63
64 def test_register(self):
65 s = self.SELECTOR()
66 self.addCleanup(s.close)
67
68 rd, wr = self.make_socketpair()
69
70 key = s.register(rd, selectors.EVENT_READ, "data")
71 self.assertIsInstance(key, selectors.SelectorKey)
72 self.assertEqual(key.fileobj, rd)
73 self.assertEqual(key.fd, rd.fileno())
74 self.assertEqual(key.events, selectors.EVENT_READ)
75 self.assertEqual(key.data, "data")
76
77 # register an unknown event
78 self.assertRaises(ValueError, s.register, 0, 999999)
79
80 # register an invalid FD
81 self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ)
82
83 # register twice
84 self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ)
85
86 # register the same FD, but with a different object
87 self.assertRaises(KeyError, s.register, rd.fileno(),
88 selectors.EVENT_READ)
89
90 def test_unregister(self):
91 s = self.SELECTOR()
92 self.addCleanup(s.close)
93
94 rd, wr = self.make_socketpair()
95
96 s.register(rd, selectors.EVENT_READ)
97 s.unregister(rd)
98
99 # unregister an unknown file obj
100 self.assertRaises(KeyError, s.unregister, 999999)
101
102 # unregister twice
103 self.assertRaises(KeyError, s.unregister, rd)
104
105 def test_unregister_after_fd_close(self):
106 s = self.SELECTOR()
107 self.addCleanup(s.close)
108 rd, wr = self.make_socketpair()
109 r, w = rd.fileno(), wr.fileno()
110 s.register(r, selectors.EVENT_READ)
111 s.register(w, selectors.EVENT_WRITE)
112 rd.close()
113 wr.close()
114 s.unregister(r)
115 s.unregister(w)
116
117 @unittest.skipUnless(os.name == 'posix', "requires posix")
118 def test_unregister_after_fd_close_and_reuse(self):
119 s = self.SELECTOR()
120 self.addCleanup(s.close)
121 rd, wr = self.make_socketpair()
122 r, w = rd.fileno(), wr.fileno()
123 s.register(r, selectors.EVENT_READ)
124 s.register(w, selectors.EVENT_WRITE)
125 rd2, wr2 = self.make_socketpair()
126 rd.close()
127 wr.close()
128 os.dup2(rd2.fileno(), r)
129 os.dup2(wr2.fileno(), w)
130 self.addCleanup(os.close, r)
131 self.addCleanup(os.close, w)
132 s.unregister(r)
133 s.unregister(w)
134
135 def test_unregister_after_socket_close(self):
136 s = self.SELECTOR()
137 self.addCleanup(s.close)
138 rd, wr = self.make_socketpair()
139 s.register(rd, selectors.EVENT_READ)
140 s.register(wr, selectors.EVENT_WRITE)
141 rd.close()
142 wr.close()
143 s.unregister(rd)
144 s.unregister(wr)
145
146 def test_modify(self):
147 s = self.SELECTOR()
148 self.addCleanup(s.close)
149
150 rd, wr = self.make_socketpair()
151
152 key = s.register(rd, selectors.EVENT_READ)
153
154 # modify events
155 key2 = s.modify(rd, selectors.EVENT_WRITE)
156 self.assertNotEqual(key.events, key2.events)
157 self.assertEqual(key2, s.get_key(rd))
158
159 s.unregister(rd)
160
161 # modify data
162 d1 = object()
163 d2 = object()
164
165 key = s.register(rd, selectors.EVENT_READ, d1)
166 key2 = s.modify(rd, selectors.EVENT_READ, d2)
167 self.assertEqual(key.events, key2.events)
168 self.assertNotEqual(key.data, key2.data)
169 self.assertEqual(key2, s.get_key(rd))
170 self.assertEqual(key2.data, d2)
171
172 # modify unknown file obj
173 self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ)
174
175 # modify use a shortcut
176 d3 = object()
177 s.register = unittest.mock.Mock()
178 s.unregister = unittest.mock.Mock()
179
180 s.modify(rd, selectors.EVENT_READ, d3)
181 self.assertFalse(s.register.called)
182 self.assertFalse(s.unregister.called)
183
184 def test_modify_unregister(self):
185 # Make sure the fd is unregister()ed in case of error on
186 # modify(): http://bugs.python.org/issue30014
187 if self.SELECTOR.__name__ == 'EpollSelector':
188 patch = unittest.mock.patch(
189 'selectors.EpollSelector._selector_cls')
190 elif self.SELECTOR.__name__ == 'PollSelector':
191 patch = unittest.mock.patch(
192 'selectors.PollSelector._selector_cls')
193 elif self.SELECTOR.__name__ == 'DevpollSelector':
194 patch = unittest.mock.patch(
195 'selectors.DevpollSelector._selector_cls')
196 else:
197 raise self.skipTest("")
198
199 with patch as m:
200 m.return_value.modify = unittest.mock.Mock(
201 side_effect=ZeroDivisionError)
202 s = self.SELECTOR()
203 self.addCleanup(s.close)
204 rd, wr = self.make_socketpair()
205 s.register(rd, selectors.EVENT_READ)
206 self.assertEqual(len(s._map), 1)
207 with self.assertRaises(ZeroDivisionError):
208 s.modify(rd, selectors.EVENT_WRITE)
209 self.assertEqual(len(s._map), 0)
210
211 def test_close(self):
212 s = self.SELECTOR()
213 self.addCleanup(s.close)
214
215 mapping = s.get_map()
216 rd, wr = self.make_socketpair()
217
218 s.register(rd, selectors.EVENT_READ)
219 s.register(wr, selectors.EVENT_WRITE)
220
221 s.close()
222 self.assertRaises(RuntimeError, s.get_key, rd)
223 self.assertRaises(RuntimeError, s.get_key, wr)
224 self.assertRaises(KeyError, mapping.__getitem__, rd)
225 self.assertRaises(KeyError, mapping.__getitem__, wr)
226
227 def test_get_key(self):
228 s = self.SELECTOR()
229 self.addCleanup(s.close)
230
231 rd, wr = self.make_socketpair()
232
233 key = s.register(rd, selectors.EVENT_READ, "data")
234 self.assertEqual(key, s.get_key(rd))
235
236 # unknown file obj
237 self.assertRaises(KeyError, s.get_key, 999999)
238
239 def test_get_map(self):
240 s = self.SELECTOR()
241 self.addCleanup(s.close)
242
243 rd, wr = self.make_socketpair()
244
245 keys = s.get_map()
246 self.assertFalse(keys)
247 self.assertEqual(len(keys), 0)
248 self.assertEqual(list(keys), [])
249 key = s.register(rd, selectors.EVENT_READ, "data")
250 self.assertIn(rd, keys)
251 self.assertEqual(key, keys[rd])
252 self.assertEqual(len(keys), 1)
253 self.assertEqual(list(keys), [rd.fileno()])
254 self.assertEqual(list(keys.values()), [key])
255
256 # unknown file obj
257 with self.assertRaises(KeyError):
258 keys[999999]
259
260 # Read-only mapping
261 with self.assertRaises(TypeError):
262 del keys[rd]
263
264 def test_select(self):
265 s = self.SELECTOR()
266 self.addCleanup(s.close)
267
268 rd, wr = self.make_socketpair()
269
270 s.register(rd, selectors.EVENT_READ)
271 wr_key = s.register(wr, selectors.EVENT_WRITE)
272
273 result = s.select()
274 for key, events in result:
275 self.assertTrue(isinstance(key, selectors.SelectorKey))
276 self.assertTrue(events)
277 self.assertFalse(events & ~(selectors.EVENT_READ |
278 selectors.EVENT_WRITE))
279
280 self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result)
281
282 def test_select_read_write(self):
283 # gh-110038: when a file descriptor is registered for both read and
284 # write, the two events must be seen on a single call to select().
285 s = self.SELECTOR()
286 self.addCleanup(s.close)
287
288 sock1, sock2 = self.make_socketpair()
289 sock2.send(b"foo")
290 my_key = s.register(sock1, selectors.EVENT_READ | selectors.EVENT_WRITE)
291
292 seen_read, seen_write = False, False
293 result = s.select()
294 # We get the read and write either in the same result entry or in two
295 # distinct entries with the same key.
296 self.assertLessEqual(len(result), 2)
297 for key, events in result:
298 self.assertTrue(isinstance(key, selectors.SelectorKey))
299 self.assertEqual(key, my_key)
300 self.assertFalse(events & ~(selectors.EVENT_READ |
301 selectors.EVENT_WRITE))
302 if events & selectors.EVENT_READ:
303 self.assertFalse(seen_read)
304 seen_read = True
305 if events & selectors.EVENT_WRITE:
306 self.assertFalse(seen_write)
307 seen_write = True
308 self.assertTrue(seen_read)
309 self.assertTrue(seen_write)
310
311 def test_context_manager(self):
312 s = self.SELECTOR()
313 self.addCleanup(s.close)
314
315 rd, wr = self.make_socketpair()
316
317 with s as sel:
318 sel.register(rd, selectors.EVENT_READ)
319 sel.register(wr, selectors.EVENT_WRITE)
320
321 self.assertRaises(RuntimeError, s.get_key, rd)
322 self.assertRaises(RuntimeError, s.get_key, wr)
323
324 def test_fileno(self):
325 s = self.SELECTOR()
326 self.addCleanup(s.close)
327
328 if hasattr(s, 'fileno'):
329 fd = s.fileno()
330 self.assertTrue(isinstance(fd, int))
331 self.assertGreaterEqual(fd, 0)
332
333 def test_selector(self):
334 s = self.SELECTOR()
335 self.addCleanup(s.close)
336
337 NUM_SOCKETS = 12
338 MSG = b" This is a test."
339 MSG_LEN = len(MSG)
340 readers = []
341 writers = []
342 r2w = {}
343 w2r = {}
344
345 for i in range(NUM_SOCKETS):
346 rd, wr = self.make_socketpair()
347 s.register(rd, selectors.EVENT_READ)
348 s.register(wr, selectors.EVENT_WRITE)
349 readers.append(rd)
350 writers.append(wr)
351 r2w[rd] = wr
352 w2r[wr] = rd
353
354 bufs = []
355
356 while writers:
357 ready = s.select()
358 ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE)
359 if not ready_writers:
360 self.fail("no sockets ready for writing")
361 wr = random.choice(ready_writers)
362 wr.send(MSG)
363
364 for i in range(10):
365 ready = s.select()
366 ready_readers = find_ready_matching(ready,
367 selectors.EVENT_READ)
368 if ready_readers:
369 break
370 # there might be a delay between the write to the write end and
371 # the read end is reported ready
372 sleep(0.1)
373 else:
374 self.fail("no sockets ready for reading")
375 self.assertEqual([w2r[wr]], ready_readers)
376 rd = ready_readers[0]
377 buf = rd.recv(MSG_LEN)
378 self.assertEqual(len(buf), MSG_LEN)
379 bufs.append(buf)
380 s.unregister(r2w[rd])
381 s.unregister(rd)
382 writers.remove(r2w[rd])
383
384 self.assertEqual(bufs, [MSG] * NUM_SOCKETS)
385
386 @unittest.skipIf(sys.platform == 'win32',
387 'select.select() cannot be used with empty fd sets')
388 def test_empty_select(self):
389 # Issue #23009: Make sure EpollSelector.select() works when no FD is
390 # registered.
391 s = self.SELECTOR()
392 self.addCleanup(s.close)
393 self.assertEqual(s.select(timeout=0), [])
394
395 def test_timeout(self):
396 s = self.SELECTOR()
397 self.addCleanup(s.close)
398
399 rd, wr = self.make_socketpair()
400
401 s.register(wr, selectors.EVENT_WRITE)
402 t = time()
403 self.assertEqual(1, len(s.select(0)))
404 self.assertEqual(1, len(s.select(-1)))
405 self.assertLess(time() - t, 0.5)
406
407 s.unregister(wr)
408 s.register(rd, selectors.EVENT_READ)
409 t = time()
410 self.assertFalse(s.select(0))
411 self.assertFalse(s.select(-1))
412 self.assertLess(time() - t, 0.5)
413
414 t0 = time()
415 self.assertFalse(s.select(1))
416 t1 = time()
417 dt = t1 - t0
418 # Tolerate 2.0 seconds for very slow buildbots
419 self.assertTrue(0.8 <= dt <= 2.0, dt)
420
421 @unittest.skipUnless(hasattr(signal, "alarm"),
422 "signal.alarm() required for this test")
423 def test_select_interrupt_exc(self):
424 s = self.SELECTOR()
425 self.addCleanup(s.close)
426
427 rd, wr = self.make_socketpair()
428
429 class ESC[4;38;5;81mInterruptSelect(ESC[4;38;5;149mException):
430 pass
431
432 def handler(*args):
433 raise InterruptSelect
434
435 orig_alrm_handler = signal.signal(signal.SIGALRM, handler)
436 self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
437
438 try:
439 signal.alarm(1)
440
441 s.register(rd, selectors.EVENT_READ)
442 t = time()
443 # select() is interrupted by a signal which raises an exception
444 with self.assertRaises(InterruptSelect):
445 s.select(30)
446 # select() was interrupted before the timeout of 30 seconds
447 self.assertLess(time() - t, 5.0)
448 finally:
449 signal.alarm(0)
450
451 @unittest.skipUnless(hasattr(signal, "alarm"),
452 "signal.alarm() required for this test")
453 def test_select_interrupt_noraise(self):
454 s = self.SELECTOR()
455 self.addCleanup(s.close)
456
457 rd, wr = self.make_socketpair()
458
459 orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
460 self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
461
462 try:
463 signal.alarm(1)
464
465 s.register(rd, selectors.EVENT_READ)
466 t = time()
467 # select() is interrupted by a signal, but the signal handler doesn't
468 # raise an exception, so select() should by retries with a recomputed
469 # timeout
470 self.assertFalse(s.select(1.5))
471 self.assertGreaterEqual(time() - t, 1.0)
472 finally:
473 signal.alarm(0)
474
475
476 class ESC[4;38;5;81mScalableSelectorMixIn:
477
478 # see issue #18963 for why it's skipped on older OS X versions
479 @support.requires_mac_ver(10, 5)
480 @unittest.skipUnless(resource, "Test needs resource module")
481 @support.requires_resource('cpu')
482 def test_above_fd_setsize(self):
483 # A scalable implementation should have no problem with more than
484 # FD_SETSIZE file descriptors. Since we don't know the value, we just
485 # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.
486 soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
487 try:
488 resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
489 self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
490 (soft, hard))
491 NUM_FDS = min(hard, 2**16)
492 except (OSError, ValueError):
493 NUM_FDS = soft
494
495 # guard for already allocated FDs (stdin, stdout...)
496 NUM_FDS -= 32
497
498 s = self.SELECTOR()
499 self.addCleanup(s.close)
500
501 for i in range(NUM_FDS // 2):
502 try:
503 rd, wr = self.make_socketpair()
504 except OSError:
505 # too many FDs, skip - note that we should only catch EMFILE
506 # here, but apparently *BSD and Solaris can fail upon connect()
507 # or bind() with EADDRNOTAVAIL, so let's be safe
508 self.skipTest("FD limit reached")
509
510 try:
511 s.register(rd, selectors.EVENT_READ)
512 s.register(wr, selectors.EVENT_WRITE)
513 except OSError as e:
514 if e.errno == errno.ENOSPC:
515 # this can be raised by epoll if we go over
516 # fs.epoll.max_user_watches sysctl
517 self.skipTest("FD limit reached")
518 raise
519
520 try:
521 fds = s.select()
522 except OSError as e:
523 if e.errno == errno.EINVAL and sys.platform == 'darwin':
524 # unexplainable errors on macOS don't need to fail the test
525 self.skipTest("Invalid argument error calling poll()")
526 raise
527 self.assertEqual(NUM_FDS // 2, len(fds))
528
529
530 class ESC[4;38;5;81mDefaultSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
531
532 SELECTOR = selectors.DefaultSelector
533
534
535 class ESC[4;38;5;81mSelectSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
536
537 SELECTOR = selectors.SelectSelector
538
539
540 @unittest.skipUnless(hasattr(selectors, 'PollSelector'),
541 "Test needs selectors.PollSelector")
542 class ESC[4;38;5;81mPollSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149mScalableSelectorMixIn,
543 ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
544
545 SELECTOR = getattr(selectors, 'PollSelector', None)
546
547
548 @unittest.skipUnless(hasattr(selectors, 'EpollSelector'),
549 "Test needs selectors.EpollSelector")
550 class ESC[4;38;5;81mEpollSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149mScalableSelectorMixIn,
551 ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
552
553 SELECTOR = getattr(selectors, 'EpollSelector', None)
554
555 def test_register_file(self):
556 # epoll(7) returns EPERM when given a file to watch
557 s = self.SELECTOR()
558 with tempfile.NamedTemporaryFile() as f:
559 with self.assertRaises(IOError):
560 s.register(f, selectors.EVENT_READ)
561 # the SelectorKey has been removed
562 with self.assertRaises(KeyError):
563 s.get_key(f)
564
565
566 @unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),
567 "Test needs selectors.KqueueSelector)")
568 class ESC[4;38;5;81mKqueueSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149mScalableSelectorMixIn,
569 ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
570
571 SELECTOR = getattr(selectors, 'KqueueSelector', None)
572
573 def test_register_bad_fd(self):
574 # a file descriptor that's been closed should raise an OSError
575 # with EBADF
576 s = self.SELECTOR()
577 bad_f = os_helper.make_bad_fd()
578 with self.assertRaises(OSError) as cm:
579 s.register(bad_f, selectors.EVENT_READ)
580 self.assertEqual(cm.exception.errno, errno.EBADF)
581 # the SelectorKey has been removed
582 with self.assertRaises(KeyError):
583 s.get_key(bad_f)
584
585 def test_empty_select_timeout(self):
586 # Issues #23009, #29255: Make sure timeout is applied when no fds
587 # are registered.
588 s = self.SELECTOR()
589 self.addCleanup(s.close)
590
591 t0 = time()
592 self.assertEqual(s.select(1), [])
593 t1 = time()
594 dt = t1 - t0
595 # Tolerate 2.0 seconds for very slow buildbots
596 self.assertTrue(0.8 <= dt <= 2.0, dt)
597
598
599 @unittest.skipUnless(hasattr(selectors, 'DevpollSelector'),
600 "Test needs selectors.DevpollSelector")
601 class ESC[4;38;5;81mDevpollSelectorTestCase(ESC[4;38;5;149mBaseSelectorTestCase, ESC[4;38;5;149mScalableSelectorMixIn,
602 ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
603
604 SELECTOR = getattr(selectors, 'DevpollSelector', None)
605
606
607 def tearDownModule():
608 support.reap_children()
609
610
611 if __name__ == "__main__":
612 unittest.main()