1 # Copyright (c) 2001-2006 Twisted Matrix Laboratories.
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining
4 # a copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish,
7 # distribute, sublicense, and/or sell copies of the Software, and to
8 # permit persons to whom the Software is furnished to do so, subject to
9 # the following conditions:
10 #
11 # The above copyright notice and this permission notice shall be
12 # included in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
18 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
19 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 """
22 Tests for epoll wrapper.
23 """
24 import errno
25 import os
26 import select
27 import socket
28 import time
29 import unittest
30 from test import support
31
# The whole module is skipped when the select module does not expose epoll.
if not hasattr(select, "epoll"):
    raise unittest.SkipTest("test works only on Linux 2.6")

# select.epoll can exist while the running kernel rejects the call with
# ENOSYS.  Probe once, and close the probe object right away so that its
# descriptor is released deterministically instead of waiting for garbage
# collection.
try:
    select.epoll().close()
except OSError as e:
    if e.errno == errno.ENOSYS:
        raise unittest.SkipTest("kernel doesn't support epoll()")
    # Any other failure is unexpected and must surface as an error.
    raise
41
42 class ESC[4;38;5;81mTestEPoll(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
43
44 def setUp(self):
45 self.serverSocket = socket.create_server(('127.0.0.1', 0))
46 self.connections = [self.serverSocket]
47
48 def tearDown(self):
49 for skt in self.connections:
50 skt.close()
51
52 def _connected_pair(self):
53 client = socket.socket()
54 client.setblocking(False)
55 try:
56 client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
57 except OSError as e:
58 self.assertEqual(e.args[0], errno.EINPROGRESS)
59 else:
60 raise AssertionError("Connect should have raised EINPROGRESS")
61 server, addr = self.serverSocket.accept()
62
63 self.connections.extend((client, server))
64 return client, server
65
66 def test_create(self):
67 try:
68 ep = select.epoll(16)
69 except OSError as e:
70 raise AssertionError(str(e))
71 self.assertTrue(ep.fileno() > 0, ep.fileno())
72 self.assertTrue(not ep.closed)
73 ep.close()
74 self.assertTrue(ep.closed)
75 self.assertRaises(ValueError, ep.fileno)
76
77 if hasattr(select, "EPOLL_CLOEXEC"):
78 select.epoll(-1, select.EPOLL_CLOEXEC).close()
79 select.epoll(flags=select.EPOLL_CLOEXEC).close()
80 select.epoll(flags=0).close()
81
82 def test_badcreate(self):
83 self.assertRaises(TypeError, select.epoll, 1, 2, 3)
84 self.assertRaises(TypeError, select.epoll, 'foo')
85 self.assertRaises(TypeError, select.epoll, None)
86 self.assertRaises(TypeError, select.epoll, ())
87 self.assertRaises(TypeError, select.epoll, ['foo'])
88 self.assertRaises(TypeError, select.epoll, {})
89
90 self.assertRaises(ValueError, select.epoll, 0)
91 self.assertRaises(ValueError, select.epoll, -2)
92 self.assertRaises(ValueError, select.epoll, sizehint=-2)
93
94 if hasattr(select, "EPOLL_CLOEXEC"):
95 self.assertRaises(OSError, select.epoll, flags=12356)
96
97 def test_context_manager(self):
98 with select.epoll(16) as ep:
99 self.assertGreater(ep.fileno(), 0)
100 self.assertFalse(ep.closed)
101 self.assertTrue(ep.closed)
102 self.assertRaises(ValueError, ep.fileno)
103
104 def test_add(self):
105 server, client = self._connected_pair()
106
107 ep = select.epoll(2)
108 try:
109 ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
110 ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
111 finally:
112 ep.close()
113
114 # adding by object w/ fileno works, too.
115 ep = select.epoll(2)
116 try:
117 ep.register(server, select.EPOLLIN | select.EPOLLOUT)
118 ep.register(client, select.EPOLLIN | select.EPOLLOUT)
119 finally:
120 ep.close()
121
122 ep = select.epoll(2)
123 try:
124 # TypeError: argument must be an int, or have a fileno() method.
125 self.assertRaises(TypeError, ep.register, object(),
126 select.EPOLLIN | select.EPOLLOUT)
127 self.assertRaises(TypeError, ep.register, None,
128 select.EPOLLIN | select.EPOLLOUT)
129 # ValueError: file descriptor cannot be a negative integer (-1)
130 self.assertRaises(ValueError, ep.register, -1,
131 select.EPOLLIN | select.EPOLLOUT)
132 # OSError: [Errno 9] Bad file descriptor
133 self.assertRaises(OSError, ep.register, 10000,
134 select.EPOLLIN | select.EPOLLOUT)
135 # registering twice also raises an exception
136 ep.register(server, select.EPOLLIN | select.EPOLLOUT)
137 self.assertRaises(OSError, ep.register, server,
138 select.EPOLLIN | select.EPOLLOUT)
139 finally:
140 ep.close()
141
142 def test_fromfd(self):
143 server, client = self._connected_pair()
144
145 with select.epoll(2) as ep:
146 ep2 = select.epoll.fromfd(ep.fileno())
147
148 ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
149 ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
150
151 events = ep.poll(1, 4)
152 events2 = ep2.poll(0.9, 4)
153 self.assertEqual(len(events), 2)
154 self.assertEqual(len(events2), 2)
155
156 try:
157 ep2.poll(1, 4)
158 except OSError as e:
159 self.assertEqual(e.args[0], errno.EBADF, e)
160 else:
161 self.fail("epoll on closed fd didn't raise EBADF")
162
163 def test_control_and_wait(self):
164 # create the epoll object
165 client, server = self._connected_pair()
166 ep = select.epoll(16)
167 ep.register(server.fileno(),
168 select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
169 ep.register(client.fileno(),
170 select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
171
172 # EPOLLOUT
173 now = time.monotonic()
174 events = ep.poll(1, 4)
175 then = time.monotonic()
176 self.assertFalse(then - now > 0.1, then - now)
177
178 expected = [(client.fileno(), select.EPOLLOUT),
179 (server.fileno(), select.EPOLLOUT)]
180 self.assertEqual(sorted(events), sorted(expected))
181
182 # no event
183 events = ep.poll(timeout=0.1, maxevents=4)
184 self.assertFalse(events)
185
186 # send: EPOLLIN and EPOLLOUT
187 client.sendall(b"Hello!")
188 server.sendall(b"world!!!")
189
190 # we might receive events one at a time, necessitating multiple calls to
191 # poll
192 events = []
193 for _ in support.busy_retry(support.SHORT_TIMEOUT):
194 now = time.monotonic()
195 events += ep.poll(1.0, 4)
196 then = time.monotonic()
197 self.assertFalse(then - now > 0.01)
198 if len(events) >= 2:
199 break
200
201 expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
202 (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
203 self.assertEqual(sorted(events), sorted(expected))
204
205 # unregister, modify
206 ep.unregister(client.fileno())
207 ep.modify(server.fileno(), select.EPOLLOUT)
208 now = time.monotonic()
209 events = ep.poll(1, 4)
210 then = time.monotonic()
211 self.assertFalse(then - now > 0.01)
212
213 expected = [(server.fileno(), select.EPOLLOUT)]
214 self.assertEqual(events, expected)
215
216 def test_errors(self):
217 self.assertRaises(ValueError, select.epoll, -2)
218 self.assertRaises(ValueError, select.epoll().register, -1,
219 select.EPOLLIN)
220
221 def test_unregister_closed(self):
222 server, client = self._connected_pair()
223 fd = server.fileno()
224 ep = select.epoll(16)
225 ep.register(server)
226
227 now = time.monotonic()
228 events = ep.poll(1, 4)
229 then = time.monotonic()
230 self.assertFalse(then - now > 0.01)
231
232 server.close()
233
234 with self.assertRaises(OSError) as cm:
235 ep.unregister(fd)
236 self.assertEqual(cm.exception.errno, errno.EBADF)
237
238 def test_close(self):
239 open_file = open(__file__, "rb")
240 self.addCleanup(open_file.close)
241 fd = open_file.fileno()
242 epoll = select.epoll()
243
244 # test fileno() method and closed attribute
245 self.assertIsInstance(epoll.fileno(), int)
246 self.assertFalse(epoll.closed)
247
248 # test close()
249 epoll.close()
250 self.assertTrue(epoll.closed)
251 self.assertRaises(ValueError, epoll.fileno)
252
253 # close() can be called more than once
254 epoll.close()
255
256 # operations must fail with ValueError("I/O operation on closed ...")
257 self.assertRaises(ValueError, epoll.modify, fd, select.EPOLLIN)
258 self.assertRaises(ValueError, epoll.poll, 1.0)
259 self.assertRaises(ValueError, epoll.register, fd, select.EPOLLIN)
260 self.assertRaises(ValueError, epoll.unregister, fd)
261
262 def test_fd_non_inheritable(self):
263 epoll = select.epoll()
264 self.addCleanup(epoll.close)
265 self.assertEqual(os.get_inheritable(epoll.fileno()), False)
266
267
# Script entry point: run the tests in this module when the file is executed
# directly rather than imported.
if __name__ == "__main__":
    unittest.main()