1 import errno
2 import select
3 import subprocess
4 import sys
5 import textwrap
6 import unittest
7 from test import support
8
# Module-level prerequisite check, executed at import time, before any test
# class is defined.
# NOTE(review): presumably skips the whole module on platforms without
# working sockets -- confirm against test.support.requires_working_socket.
support.requires_working_socket(module=True)
10
11 @unittest.skipIf((sys.platform[:3]=='win'),
12 "can't easily test on this system")
13 class ESC[4;38;5;81mSelectTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
14
15 class ESC[4;38;5;81mNope:
16 pass
17
18 class ESC[4;38;5;81mAlmost:
19 def fileno(self):
20 return 'fileno'
21
22 def test_error_conditions(self):
23 self.assertRaises(TypeError, select.select, 1, 2, 3)
24 self.assertRaises(TypeError, select.select, [self.Nope()], [], [])
25 self.assertRaises(TypeError, select.select, [self.Almost()], [], [])
26 self.assertRaises(TypeError, select.select, [], [], [], "not a number")
27 self.assertRaises(ValueError, select.select, [], [], [], -1)
28
29 # Issue #12367: http://www.freebsd.org/cgi/query-pr.cgi?pr=kern/155606
30 @unittest.skipIf(sys.platform.startswith('freebsd'),
31 'skip because of a FreeBSD bug: kern/155606')
32 def test_errno(self):
33 with open(__file__, 'rb') as fp:
34 fd = fp.fileno()
35 fp.close()
36 try:
37 select.select([fd], [], [], 0)
38 except OSError as err:
39 self.assertEqual(err.errno, errno.EBADF)
40 else:
41 self.fail("exception not raised")
42
43 def test_returned_list_identity(self):
44 # See issue #8329
45 r, w, x = select.select([], [], [], 1)
46 self.assertIsNot(r, w)
47 self.assertIsNot(r, x)
48 self.assertIsNot(w, x)
49
50 @support.requires_fork()
51 def test_select(self):
52 code = textwrap.dedent('''
53 import time
54 for i in range(10):
55 print("testing...", flush=True)
56 time.sleep(0.050)
57 ''')
58 cmd = [sys.executable, '-I', '-c', code]
59 with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
60 pipe = proc.stdout
61 for timeout in (0, 1, 2, 4, 8, 16) + (None,)*10:
62 if support.verbose:
63 print(f'timeout = {timeout}')
64 rfd, wfd, xfd = select.select([pipe], [], [], timeout)
65 self.assertEqual(wfd, [])
66 self.assertEqual(xfd, [])
67 if not rfd:
68 continue
69 if rfd == [pipe]:
70 line = pipe.readline()
71 if support.verbose:
72 print(repr(line))
73 if not line:
74 if support.verbose:
75 print('EOF')
76 break
77 continue
78 self.fail('Unexpected return values from select():',
79 rfd, wfd, xfd)
80
81 # Issue 16230: Crash on select resized list
82 @unittest.skipIf(
83 support.is_emscripten, "Emscripten cannot select a fd multiple times."
84 )
85 def test_select_mutated(self):
86 a = []
87 class ESC[4;38;5;81mF:
88 def fileno(self):
89 del a[-1]
90 return sys.__stdout__.fileno()
91 a[:] = [F()] * 10
92 self.assertEqual(select.select([], a, []), ([], a[:5], []))
93
94 def test_disallow_instantiation(self):
95 support.check_disallow_instantiation(self, type(select.poll()))
96
97 if hasattr(select, 'devpoll'):
98 support.check_disallow_instantiation(self, type(select.devpoll()))
99
def tearDownModule():
    """Module-level teardown hook: call test.support.reap_children().

    test_select() starts a subprocess, so child-process cleanup is run
    once when the tests of this module are done.
    """
    support.reap_children()
102
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()