1 # Copyright (c) 2001-2006 Twisted Matrix Laboratories.
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining
4 # a copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish,
7 # distribute, sublicense, and/or sell copies of the Software, and to
8 # permit persons to whom the Software is furnished to do so, subject to
9 # the following conditions:
10 #
11 # The above copyright notice and this permission notice shall be
12 # included in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
18 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
19 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 """
22 Tests for epoll wrapper.
23 """
24 import errno
25 import os
26 import select
27 import socket
28 import time
29 import unittest
30
# select.epoll is only defined when Python was built on a platform that
# provides epoll; skip the whole module otherwise.
if not hasattr(select, "epoll"):
    raise unittest.SkipTest("test works only on Linux 2.6")

# The attribute can exist while the running kernel still lacks the syscall:
# probe once, skip the module on ENOSYS and re-raise any other failure.
try:
    _probe = select.epoll()
except OSError as e:
    if e.errno == errno.ENOSYS:
        raise unittest.SkipTest("kernel doesn't support epoll()")
    raise
else:
    # Release the probe's descriptor right away instead of leaving it to
    # garbage collection.
    _probe.close()
    del _probe
40
class TestEPoll(unittest.TestCase):
    """Tests for select.epoll using connected loopback TCP sockets."""

    def setUp(self):
        # Listening socket on an ephemeral loopback port.  Every socket a
        # test creates is tracked in self.connections and closed in tearDown.
        self.serverSocket = socket.create_server(('127.0.0.1', 0))
        self.connections = [self.serverSocket]

    def tearDown(self):
        for skt in self.connections:
            skt.close()

    def _connected_pair(self):
        """Return a connected ``(client, server)`` pair of sockets.

        The client socket is non-blocking and its connect() is expected to
        fail with EINPROGRESS; the server socket is the one accepted from
        ``self.serverSocket``.  Both are registered for closing in tearDown.
        """
        client = socket.socket()
        # Track the client immediately so tearDown closes it even when the
        # connect check or accept() below fails.
        self.connections.append(client)
        client.setblocking(False)
        port = self.serverSocket.getsockname()[1]
        try:
            client.connect(('127.0.0.1', port))
        except OSError as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        else:
            raise AssertionError("Connect should have raised EINPROGRESS")
        server, _ = self.serverSocket.accept()
        self.connections.append(server)
        return client, server

    def _poll_promptly(self, ep, timeout, maxevents, limit):
        """Poll *ep* and assert that the call took at most *limit* seconds.

        *timeout* and *maxevents* are passed positionally to ``ep.poll``.
        Returns the list of events that poll() reported.
        """
        start = time.monotonic()
        events = ep.poll(timeout, maxevents)
        elapsed = time.monotonic() - start
        self.assertLessEqual(elapsed, limit)
        return events

    def test_create(self):
        try:
            ep = select.epoll(16)
        except OSError as e:
            raise AssertionError(str(e)) from e
        # close() may be called repeatedly, so this cleanup only matters
        # when one of the assertions below fails before the explicit close.
        self.addCleanup(ep.close)
        self.assertGreater(ep.fileno(), 0)
        self.assertFalse(ep.closed)
        ep.close()
        self.assertTrue(ep.closed)
        self.assertRaises(ValueError, ep.fileno)

        if hasattr(select, "EPOLL_CLOEXEC"):
            select.epoll(-1, select.EPOLL_CLOEXEC).close()
            select.epoll(flags=select.EPOLL_CLOEXEC).close()
            select.epoll(flags=0).close()

    def test_badcreate(self):
        self.assertRaises(TypeError, select.epoll, 1, 2, 3)
        for bad_sizehint in ('foo', None, (), ['foo'], {}):
            with self.subTest(sizehint=bad_sizehint):
                self.assertRaises(TypeError, select.epoll, bad_sizehint)

        self.assertRaises(ValueError, select.epoll, 0)
        self.assertRaises(ValueError, select.epoll, -2)
        self.assertRaises(ValueError, select.epoll, sizehint=-2)

        if hasattr(select, "EPOLL_CLOEXEC"):
            self.assertRaises(OSError, select.epoll, flags=12356)

    def test_context_manager(self):
        with select.epoll(16) as ep:
            self.assertGreater(ep.fileno(), 0)
            self.assertFalse(ep.closed)
        self.assertTrue(ep.closed)
        self.assertRaises(ValueError, ep.fileno)

    def test_add(self):
        client, server = self._connected_pair()
        mask = select.EPOLLIN | select.EPOLLOUT

        # adding by file descriptor number
        with select.epoll(2) as ep:
            ep.register(client.fileno(), mask)
            ep.register(server.fileno(), mask)

        # adding by object w/ fileno works, too.
        with select.epoll(2) as ep:
            ep.register(client, mask)
            ep.register(server, mask)

        with select.epoll(2) as ep:
            # TypeError: argument must be an int, or have a fileno() method.
            self.assertRaises(TypeError, ep.register, object(), mask)
            self.assertRaises(TypeError, ep.register, None, mask)
            # ValueError: file descriptor cannot be a negative integer (-1)
            self.assertRaises(ValueError, ep.register, -1, mask)
            # OSError: [Errno 9] Bad file descriptor
            self.assertRaises(OSError, ep.register, 10000, mask)
            # registering twice also raises an exception
            ep.register(client, mask)
            self.assertRaises(OSError, ep.register, client, mask)

    def test_fromfd(self):
        client, server = self._connected_pair()
        mask = select.EPOLLIN | select.EPOLLOUT

        with select.epoll(2) as ep:
            # ep2 is built from ep's descriptor number, so registrations
            # made through ep2 are expected to be visible through ep too.
            ep2 = select.epoll.fromfd(ep.fileno())

            ep2.register(client.fileno(), mask)
            ep2.register(server.fileno(), mask)

            events = ep.poll(1, 4)
            events2 = ep2.poll(0.9, 4)
            self.assertEqual(len(events), 2)
            self.assertEqual(len(events2), 2)

        # Leaving the with block closed ep, so polling through ep2 must now
        # fail with EBADF.
        with self.assertRaises(
                OSError, msg="epoll on closed fd didn't raise EBADF") as cm:
            ep2.poll(1, 4)
        self.assertEqual(cm.exception.errno, errno.EBADF, cm.exception)

    def test_control_and_wait(self):
        # create the epoll object
        client, server = self._connected_pair()
        ep = select.epoll(16)
        self.addCleanup(ep.close)
        edge_mask = select.EPOLLIN | select.EPOLLOUT | select.EPOLLET
        ep.register(server.fileno(), edge_mask)
        ep.register(client.fileno(), edge_mask)

        # EPOLLOUT
        events = self._poll_promptly(ep, 1, 4, limit=0.1)
        expected = [(client.fileno(), select.EPOLLOUT),
                    (server.fileno(), select.EPOLLOUT)]
        self.assertEqual(sorted(events), sorted(expected))

        # no event
        events = ep.poll(timeout=0.1, maxevents=4)
        self.assertFalse(events)

        # send: EPOLLIN and EPOLLOUT
        client.sendall(b"Hello!")
        server.sendall(b"world!!!")

        events = self._poll_promptly(ep, 1.0, 4, limit=0.01)
        expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
                    (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
        self.assertEqual(sorted(events), sorted(expected))

        # unregister, modify
        ep.unregister(client.fileno())
        ep.modify(server.fileno(), select.EPOLLOUT)
        events = self._poll_promptly(ep, 1, 4, limit=0.01)
        expected = [(server.fileno(), select.EPOLLOUT)]
        self.assertEqual(events, expected)

    def test_errors(self):
        self.assertRaises(ValueError, select.epoll, -2)
        with select.epoll() as ep:
            self.assertRaises(ValueError, ep.register, -1, select.EPOLLIN)

    def test_unregister_closed(self):
        client, _ = self._connected_pair()
        fd = client.fileno()
        ep = select.epoll(16)
        self.addCleanup(ep.close)
        ep.register(client)

        self._poll_promptly(ep, 1, 4, limit=0.01)

        client.close()

        # fd now refers to a closed socket, so unregistering it must fail.
        with self.assertRaises(OSError) as cm:
            ep.unregister(fd)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_close(self):
        open_file = open(__file__, "rb")
        self.addCleanup(open_file.close)
        fd = open_file.fileno()
        epoll = select.epoll()

        # test fileno() method and closed attribute
        self.assertIsInstance(epoll.fileno(), int)
        self.assertFalse(epoll.closed)

        # test close()
        epoll.close()
        self.assertTrue(epoll.closed)
        self.assertRaises(ValueError, epoll.fileno)

        # close() can be called more than once
        epoll.close()

        # operations must fail with ValueError("I/O operation on closed ...")
        self.assertRaises(ValueError, epoll.modify, fd, select.EPOLLIN)
        self.assertRaises(ValueError, epoll.poll, 1.0)
        self.assertRaises(ValueError, epoll.register, fd, select.EPOLLIN)
        self.assertRaises(ValueError, epoll.unregister, fd)

    def test_fd_non_inheritable(self):
        epoll = select.epoll()
        self.addCleanup(epoll.close)
        self.assertEqual(os.get_inheritable(epoll.fileno()), False)
260
# Run the tests with unittest's command-line runner when this module is
# executed directly as a script.
if __name__ == "__main__":
    unittest.main()