1 import errno
2 import os
3 import select
4 import subprocess
5 import sys
6 import textwrap
7 import unittest
8 from test import support
9
# Module-level precondition checked at import time.
# NOTE(review): presumably skips this whole test module (module=True) on
# platforms without usable sockets -- confirm against test.support.
support.requires_working_socket(module=True)
11
12 @unittest.skipIf((sys.platform[:3]=='win'),
13 "can't easily test on this system")
14 class ESC[4;38;5;81mSelectTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
15
16 class ESC[4;38;5;81mNope:
17 pass
18
19 class ESC[4;38;5;81mAlmost:
20 def fileno(self):
21 return 'fileno'
22
23 def test_error_conditions(self):
24 self.assertRaises(TypeError, select.select, 1, 2, 3)
25 self.assertRaises(TypeError, select.select, [self.Nope()], [], [])
26 self.assertRaises(TypeError, select.select, [self.Almost()], [], [])
27 self.assertRaises(TypeError, select.select, [], [], [], "not a number")
28 self.assertRaises(ValueError, select.select, [], [], [], -1)
29
30 # Issue #12367: http://www.freebsd.org/cgi/query-pr.cgi?pr=kern/155606
31 @unittest.skipIf(sys.platform.startswith('freebsd'),
32 'skip because of a FreeBSD bug: kern/155606')
33 def test_errno(self):
34 with open(__file__, 'rb') as fp:
35 fd = fp.fileno()
36 fp.close()
37 try:
38 select.select([fd], [], [], 0)
39 except OSError as err:
40 self.assertEqual(err.errno, errno.EBADF)
41 else:
42 self.fail("exception not raised")
43
44 def test_returned_list_identity(self):
45 # See issue #8329
46 r, w, x = select.select([], [], [], 1)
47 self.assertIsNot(r, w)
48 self.assertIsNot(r, x)
49 self.assertIsNot(w, x)
50
51 @support.requires_fork()
52 def test_select(self):
53 code = textwrap.dedent('''
54 import time
55 for i in range(10):
56 print("testing...", flush=True)
57 time.sleep(0.050)
58 ''')
59 cmd = [sys.executable, '-I', '-c', code]
60 with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
61 pipe = proc.stdout
62 for timeout in (0, 1, 2, 4, 8, 16) + (None,)*10:
63 if support.verbose:
64 print(f'timeout = {timeout}')
65 rfd, wfd, xfd = select.select([pipe], [], [], timeout)
66 self.assertEqual(wfd, [])
67 self.assertEqual(xfd, [])
68 if not rfd:
69 continue
70 if rfd == [pipe]:
71 line = pipe.readline()
72 if support.verbose:
73 print(repr(line))
74 if not line:
75 if support.verbose:
76 print('EOF')
77 break
78 continue
79 self.fail('Unexpected return values from select():',
80 rfd, wfd, xfd)
81
82 # Issue 16230: Crash on select resized list
83 @unittest.skipIf(
84 support.is_emscripten, "Emscripten cannot select a fd multiple times."
85 )
86 def test_select_mutated(self):
87 a = []
88 class ESC[4;38;5;81mF:
89 def fileno(self):
90 del a[-1]
91 return sys.__stdout__.fileno()
92 a[:] = [F()] * 10
93 self.assertEqual(select.select([], a, []), ([], a[:5], []))
94
95 def test_disallow_instantiation(self):
96 support.check_disallow_instantiation(self, type(select.poll()))
97
98 if hasattr(select, 'devpoll'):
99 support.check_disallow_instantiation(self, type(select.devpoll()))
100
def tearDownModule():
    """Module-level unittest fixture, run once after all tests in this file.

    Delegates to ``support.reap_children()``.
    """
    # NOTE(review): presumably collects child processes left behind by the
    # subprocess-based tests -- confirm against test.support.
    support.reap_children()
103
# Run the tests in this file when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()