python (3.11.7)
1 # Test case for the os.poll() function
2
3 import os
4 import subprocess
5 import random
6 import select
7 import threading
8 import time
9 import unittest
10 from test.support import (
11 cpython_only, requires_subprocess, requires_working_socket, requires_resource
12 )
13 from test.support import threading_helper
14 from test.support.os_helper import TESTFN
15
16
17 try:
18 select.poll
19 except AttributeError:
20 raise unittest.SkipTest("select.poll not defined")
21
22 requires_working_socket(module=True)
23
24 def find_ready_matching(ready, flag):
25 match = []
26 for fd, mode in ready:
27 if mode & flag:
28 match.append(fd)
29 return match
30
31 class ESC[4;38;5;81mPollTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
32
33 def test_poll1(self):
34 # Basic functional test of poll object
35 # Create a bunch of pipe and test that poll works with them.
36
37 p = select.poll()
38
39 NUM_PIPES = 12
40 MSG = b" This is a test."
41 MSG_LEN = len(MSG)
42 readers = []
43 writers = []
44 r2w = {}
45 w2r = {}
46
47 for i in range(NUM_PIPES):
48 rd, wr = os.pipe()
49 p.register(rd)
50 p.modify(rd, select.POLLIN)
51 p.register(wr, select.POLLOUT)
52 readers.append(rd)
53 writers.append(wr)
54 r2w[rd] = wr
55 w2r[wr] = rd
56
57 bufs = []
58
59 while writers:
60 ready = p.poll()
61 ready_writers = find_ready_matching(ready, select.POLLOUT)
62 if not ready_writers:
63 raise RuntimeError("no pipes ready for writing")
64 wr = random.choice(ready_writers)
65 os.write(wr, MSG)
66
67 ready = p.poll()
68 ready_readers = find_ready_matching(ready, select.POLLIN)
69 if not ready_readers:
70 raise RuntimeError("no pipes ready for reading")
71 rd = random.choice(ready_readers)
72 buf = os.read(rd, MSG_LEN)
73 self.assertEqual(len(buf), MSG_LEN)
74 bufs.append(buf)
75 os.close(r2w[rd]) ; os.close( rd )
76 p.unregister( r2w[rd] )
77 p.unregister( rd )
78 writers.remove(r2w[rd])
79
80 self.assertEqual(bufs, [MSG] * NUM_PIPES)
81
82 def test_poll_unit_tests(self):
83 # returns NVAL for invalid file descriptor
84 FD, w = os.pipe()
85 os.close(FD)
86 os.close(w)
87 p = select.poll()
88 p.register(FD)
89 r = p.poll()
90 self.assertEqual(r[0], (FD, select.POLLNVAL))
91
92 with open(TESTFN, 'w') as f:
93 fd = f.fileno()
94 p = select.poll()
95 p.register(f)
96 r = p.poll()
97 self.assertEqual(r[0][0], fd)
98 r = p.poll()
99 self.assertEqual(r[0], (fd, select.POLLNVAL))
100 os.unlink(TESTFN)
101
102 # type error for invalid arguments
103 p = select.poll()
104 self.assertRaises(TypeError, p.register, p)
105 self.assertRaises(TypeError, p.unregister, p)
106
107 # can't unregister non-existent object
108 p = select.poll()
109 self.assertRaises(KeyError, p.unregister, 3)
110
111 # Test error cases
112 pollster = select.poll()
113 class ESC[4;38;5;81mNope:
114 pass
115
116 class ESC[4;38;5;81mAlmost:
117 def fileno(self):
118 return 'fileno'
119
120 self.assertRaises(TypeError, pollster.register, Nope(), 0)
121 self.assertRaises(TypeError, pollster.register, Almost(), 0)
122
123 # Another test case for poll(). This is copied from the test case for
124 # select(), modified to use poll() instead.
125
126 @requires_subprocess()
127 @requires_resource('walltime')
128 def test_poll2(self):
129 cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
130 proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
131 bufsize=0)
132 self.enterContext(proc)
133 p = proc.stdout
134 pollster = select.poll()
135 pollster.register( p, select.POLLIN )
136 for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
137 fdlist = pollster.poll(tout)
138 if (fdlist == []):
139 continue
140 fd, flags = fdlist[0]
141 if flags & select.POLLHUP:
142 line = p.readline()
143 if line != b"":
144 self.fail('error: pipe seems to be closed, but still returns data')
145 continue
146
147 elif flags & select.POLLIN:
148 line = p.readline()
149 if not line:
150 break
151 self.assertEqual(line, b'testing...\n')
152 continue
153 else:
154 self.fail('Unexpected return value from select.poll: %s' % fdlist)
155
156 def test_poll3(self):
157 # test int overflow
158 pollster = select.poll()
159 pollster.register(1)
160
161 self.assertRaises(OverflowError, pollster.poll, 1 << 64)
162
163 x = 2 + 3
164 if x != 5:
165 self.fail('Overflow must have occurred')
166
167 # Issues #15989, #17919
168 self.assertRaises(ValueError, pollster.register, 0, -1)
169 self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
170 self.assertRaises(ValueError, pollster.modify, 1, -1)
171 self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
172
173 @cpython_only
174 def test_poll_c_limits(self):
175 from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
176 pollster = select.poll()
177 pollster.register(1)
178
179 # Issues #15989, #17919
180 self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
181 self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
182 self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
183 self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)
184
185 @threading_helper.reap_threads
186 def test_threaded_poll(self):
187 r, w = os.pipe()
188 self.addCleanup(os.close, r)
189 self.addCleanup(os.close, w)
190 rfds = []
191 for i in range(10):
192 fd = os.dup(r)
193 self.addCleanup(os.close, fd)
194 rfds.append(fd)
195 pollster = select.poll()
196 for fd in rfds:
197 pollster.register(fd, select.POLLIN)
198
199 t = threading.Thread(target=pollster.poll)
200 t.start()
201 try:
202 time.sleep(0.5)
203 # trigger ufds array reallocation
204 for fd in rfds:
205 pollster.unregister(fd)
206 pollster.register(w, select.POLLOUT)
207 self.assertRaises(RuntimeError, pollster.poll)
208 finally:
209 # and make the call to poll() from the thread return
210 os.write(w, b'spam')
211 t.join()
212
213 @unittest.skipUnless(threading, 'Threading required for this test.')
214 @threading_helper.reap_threads
215 def test_poll_blocks_with_negative_ms(self):
216 for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]:
217 # Create two file descriptors. This will be used to unlock
218 # the blocking call to poll.poll inside the thread
219 r, w = os.pipe()
220 pollster = select.poll()
221 pollster.register(r, select.POLLIN)
222
223 poll_thread = threading.Thread(target=pollster.poll, args=(timeout_ms,))
224 poll_thread.start()
225 poll_thread.join(timeout=0.1)
226 self.assertTrue(poll_thread.is_alive())
227
228 # Write to the pipe so pollster.poll unblocks and the thread ends.
229 os.write(w, b'spam')
230 poll_thread.join()
231 self.assertFalse(poll_thread.is_alive())
232 os.close(r)
233 os.close(w)
234
235
236 if __name__ == '__main__':
237 unittest.main()