1 """Selectors module.
2
3 This module allows high-level and efficient I/O multiplexing, built upon the
4 `select` module primitives.
5 """
6
7
8 from abc import ABCMeta, abstractmethod
9 from collections import namedtuple
10 from collections.abc import Mapping
11 import math
12 import select
13 import sys
14
15
# Generic, backend-independent event flags.  Each selector implementation
# translates them to and from its own native constants.
EVENT_READ = 0b01
EVENT_WRITE = 0b10
19
20
def _fileobj_to_fd(fileobj):
    """Resolve *fileobj* to the integer file descriptor behind it.

    Parameters:
    fileobj -- an integer file descriptor, or an object with a fileno() method

    Returns:
    the (non-negative) file descriptor

    Raises:
    ValueError if no descriptor can be obtained from the object, or if the
    descriptor is negative
    """
    if not isinstance(fileobj, int):
        try:
            descriptor = int(fileobj.fileno())
        except (AttributeError, TypeError, ValueError):
            # Hide the low-level error: callers only need to know that the
            # object is unusable.
            message = "Invalid file object: {!r}".format(fileobj)
            raise ValueError(message) from None
    else:
        descriptor = fileobj

    if descriptor < 0:
        raise ValueError("Invalid file descriptor: {}".format(descriptor))
    return descriptor
44
45
# Immutable record tying a registered file object to its descriptor, the
# requested event mask and the caller's attached data.
SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])

SelectorKey.__doc__ = (
    "SelectorKey(fileobj, fd, events, data)\n"
    "\n"
    "    Object used to associate a file object to its backing\n"
    "    file descriptor, selected event mask, and attached data.\n"
)
SelectorKey.fileobj.__doc__ = 'File object registered.'
SelectorKey.fd.__doc__ = 'Underlying file descriptor.'
SelectorKey.events.__doc__ = (
    'Events that must be waited for on this file object.'
)
SelectorKey.data.__doc__ = (
    'Optional opaque data associated to this file object.\n'
    'For example, this could be used to store a per-client session ID.'
)
58
59
class _SelectorMapping(Mapping):
    """Read-only mapping view over the keys registered with a selector.

    Lookups are resolved through the selector's _fileobj_lookup(); length
    and iteration come straight from its _fd_to_key table, so iterating
    yields file descriptors.
    """

    def __init__(self, selector):
        # Keep the selector itself (not a copy of its table) so the view
        # always reflects the current registrations.
        self._selector = selector

    def __len__(self):
        registry = self._selector._fd_to_key
        return len(registry)

    def __getitem__(self, fileobj):
        owner = self._selector
        try:
            fd = owner._fileobj_lookup(fileobj)
            return owner._fd_to_key[fd]
        except KeyError:
            # Report the object the caller asked for, not the internal fd.
            raise KeyError("{!r} is not registered".format(fileobj)) from None

    def __iter__(self):
        registry = self._selector._fd_to_key
        return iter(registry)
78
79
class BaseSelector(metaclass=ABCMeta):
    """Abstract interface implemented by every selector.

    A selector lets callers register file objects and then wait until some
    of them are ready for the I/O events they were registered for.

    A file object is either a file descriptor or any object with a
    `fileno()` method.  Arbitrary data can be attached to each registered
    file object, for example context information or a callback.

    Several implementations exist (select(), poll(), epoll()...), depending
    on the platform.  `DefaultSelector` is an alias for the most efficient
    one available on the current platform.
    """

    @abstractmethod
    def register(self, fileobj, events, data=None):
        """Start monitoring a file object.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        ValueError if events is invalid
        KeyError if fileobj is already registered
        OSError if fileobj is closed or otherwise is unacceptable to
                the underlying system call (if a system call is made)

        Note:
        OSError may or may not be raised
        """
        raise NotImplementedError

    @abstractmethod
    def unregister(self, fileobj):
        """Stop monitoring a file object.

        Parameters:
        fileobj -- file object or file descriptor

        Returns:
        SelectorKey instance

        Raises:
        KeyError if fileobj is not registered

        Note:
        If fileobj is registered but has since been closed this does
        *not* raise OSError (even if the wrapped syscall does)
        """
        raise NotImplementedError

    def modify(self, fileobj, events, data=None):
        """Replace the monitored events or attached data of a file object.

        This default implementation simply unregisters the file object and
        registers it again with the new values.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        Anything that unregister() or register() raises
        """
        self.unregister(fileobj)
        new_key = self.register(fileobj, events, data)
        return new_key

    @abstractmethod
    def select(self, timeout=None):
        """Wait until some monitored file objects are ready or the timeout
        expires.

        Parameters:
        timeout -- if timeout > 0, this specifies the maximum wait time, in
                   seconds
                   if timeout <= 0, the select() call won't block, and will
                   report the currently ready file objects
                   if timeout is None, select() will block until a monitored
                   file object becomes ready

        Returns:
        list of (key, events) for ready file objects
        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
        """
        raise NotImplementedError

    def close(self):
        """Close the selector.

        This must be called to make sure that any underlying resource is
        freed.  The base implementation does nothing.
        """

    def get_key(self, fileobj):
        """Look up the key of a registered file object.

        Returns:
        SelectorKey for this file object

        Raises:
        RuntimeError if get_map() returns None (the selector is closed)
        KeyError if fileobj is not registered
        """
        registry = self.get_map()
        if registry is None:
            raise RuntimeError('Selector is closed')
        try:
            found = registry[fileobj]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        return found

    @abstractmethod
    def get_map(self):
        """Return a mapping of file objects to selector keys."""
        raise NotImplementedError

    # Context-manager support: leaving the `with` block closes the selector.
    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
204
205
class _BaseSelectorImpl(BaseSelector):
    """Bookkeeping shared by the concrete selectors.

    Maintains the table of registered keys, indexed by file descriptor, and
    implements the parts of the BaseSelector interface that only touch that
    table.
    """

    def __init__(self):
        # Registered keys, indexed by file descriptor.
        self._fd_to_key = {}
        # Read-only view returned by get_map(); reset to None by close().
        self._map = _SelectorMapping(self)

    def _fileobj_lookup(self, fileobj):
        """Return the file descriptor for a file object.

        Tries _fileobj_to_fd() first.  If that rejects the object, the
        registered keys are scanned for this exact object, so that an object
        registered earlier can still be found (and unregistered) after it
        has been closed.  Also used by _SelectorMapping.

        Raises:
        ValueError if the object is invalid and was never registered
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:
            # Fall back to an identity search through everything registered.
            for known in self._fd_to_key.values():
                if known.fileobj is fileobj:
                    return known.fd
            # Not one of ours: let the original ValueError propagate.
            raise

    def register(self, fileobj, events, data=None):
        valid_mask = EVENT_READ | EVENT_WRITE
        if not events or events & ~valid_mask:
            raise ValueError("Invalid events: {!r}".format(events))

        fd = self._fileobj_lookup(fileobj)
        if fd in self._fd_to_key:
            raise KeyError("{!r} (FD {}) is already registered"
                           .format(fileobj, fd))

        key = SelectorKey(fileobj, fd, events, data)
        self._fd_to_key[fd] = key
        return key

    def unregister(self, fileobj):
        try:
            return self._fd_to_key.pop(self._fileobj_lookup(fileobj))
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

    def modify(self, fileobj, events, data=None):
        try:
            current = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

        if events != current.events:
            # A different event mask needs a full re-registration through
            # unregister()/register().
            self.unregister(fileobj)
            return self.register(fileobj, events, data)

        if data != current.data:
            # Only the attached data changed: swap the stored key in place.
            current = current._replace(data=data)
            self._fd_to_key[current.fd] = current
        return current

    def close(self):
        self._fd_to_key.clear()
        # get_map() returns None from now on.
        self._map = None

    def get_map(self):
        return self._map

    def _key_from_fd(self, fd):
        """Return the key registered for a file descriptor.

        Parameters:
        fd -- file descriptor

        Returns:
        corresponding key, or None if not found
        """
        return self._fd_to_key.get(fd)
288
289
class SelectSelector(_BaseSelectorImpl):
    """Selector built on select.select()."""

    def __init__(self):
        super().__init__()
        # Descriptors currently watched for readability / writability.
        self._readers = set()
        self._writers = set()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        for flag, watched in ((EVENT_READ, self._readers),
                              (EVENT_WRITE, self._writers)):
            if events & flag:
                watched.add(key.fd)
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        for watched in (self._readers, self._writers):
            watched.discard(key.fd)
        return key

    if sys.platform == 'win32':
        def _select(self, r, w, _, timeout=None):
            # On win32 the writers are also passed as the "exceptional" set,
            # and whatever comes back there is folded into the writable list.
            readable, writable, exceptional = select.select(r, w, w, timeout)
            return readable, writable + exceptional, []
    else:
        _select = select.select

    def select(self, timeout=None):
        # A negative timeout means "do not block".
        if timeout is not None:
            timeout = max(timeout, 0)
        try:
            r, w, _ = self._select(self._readers, self._writers, [], timeout)
        except InterruptedError:
            return []

        readable = set(r)
        writable = set(w)
        ready = []
        for fd in readable | writable:
            key = self._key_from_fd(fd)
            if not key:
                continue
            events = 0
            if fd in readable:
                events |= EVENT_READ
            if fd in writable:
                events |= EVENT_WRITE
            # Only report events the caller actually registered for.
            ready.append((key, events & key.events))
        return ready
339
340
class _PollLikeSelector(_BaseSelectorImpl):
    """Common code for the poll, epoll and devpoll selectors.

    Subclasses set _selector_cls to the factory of the underlying polling
    object and _EVENT_READ / _EVENT_WRITE to its native event flags.
    """
    _selector_cls = None
    _EVENT_READ = None
    _EVENT_WRITE = None

    def __init__(self):
        super().__init__()
        self._selector = self._selector_cls()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        native_mask = 0
        if events & EVENT_READ:
            native_mask |= self._EVENT_READ
        if events & EVENT_WRITE:
            native_mask |= self._EVENT_WRITE
        try:
            self._selector.register(key.fd, native_mask)
        except BaseException:
            # Undo the bookkeeping entry before re-raising the error.
            super().unregister(fileobj)
            raise
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        try:
            self._selector.unregister(key.fd)
        except OSError:
            # The FD may have been closed since it was registered.
            pass
        return key

    def modify(self, fileobj, events, data=None):
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None

        changed = False
        if events != key.events:
            native_mask = 0
            if events & EVENT_READ:
                native_mask |= self._EVENT_READ
            if events & EVENT_WRITE:
                native_mask |= self._EVENT_WRITE
            try:
                self._selector.modify(key.fd, native_mask)
            except BaseException:
                # On failure the file object is dropped from the key table.
                super().unregister(fileobj)
                raise
            changed = True
        if data != key.data:
            changed = True

        if changed:
            key = key._replace(events=events, data=data)
            self._fd_to_key[key.fd] = key
        return key

    def select(self, timeout=None):
        # Shared by poll() and devpoll(); epoll() overrides this because its
        # poll() has a different signature and timeout handling.
        if timeout is not None:
            if timeout <= 0:
                timeout = 0
            else:
                # poll() has a resolution of 1 millisecond, round away from
                # zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3)
        try:
            fd_event_list = self._selector.poll(timeout)
        except InterruptedError:
            return []

        ready = []
        for fd, event in fd_event_list:
            events = 0
            # Any flag other than the read flag counts as "writable" and any
            # flag other than the write flag counts as "readable", so flags
            # outside those two are reported as both.
            if event & ~self._EVENT_READ:
                events |= EVENT_WRITE
            if event & ~self._EVENT_WRITE:
                events |= EVENT_READ

            key = self._key_from_fd(fd)
            if key:
                ready.append((key, events & key.events))
        return ready
429
430
# Only defined when the select module provides poll().
if hasattr(select, 'poll'):

    class PollSelector(_PollLikeSelector):
        """Selector built on select.poll()."""
        _selector_cls = select.poll
        _EVENT_READ, _EVENT_WRITE = select.POLLIN, select.POLLOUT
438
439
# Only defined when the select module provides epoll().
if hasattr(select, 'epoll'):

    class EpollSelector(_PollLikeSelector):
        """Selector built on select.epoll()."""
        _selector_cls = select.epoll
        _EVENT_READ, _EVENT_WRITE = select.EPOLLIN, select.EPOLLOUT

        def fileno(self):
            """Return the file descriptor of the underlying epoll object."""
            return self._selector.fileno()

        def select(self, timeout=None):
            if timeout is None:
                # epoll's poll() is given -1 to block indefinitely.
                timeout = -1
            elif timeout <= 0:
                timeout = 0
            else:
                # epoll_wait() has a resolution of 1 millisecond, round away
                # from zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3) * 1e-3

            # epoll_wait() expects `maxevents` to be greater than zero;
            # we want to make sure that `select()` can be called when no
            # FD is registered.
            max_ev = max(len(self._fd_to_key), 1)

            try:
                fd_event_list = self._selector.poll(timeout, max_ev)
            except InterruptedError:
                return []

            ready = []
            for fd, event in fd_event_list:
                events = 0
                # Any flag other than EPOLLIN counts as "writable" and any
                # flag other than EPOLLOUT counts as "readable".
                if event & ~select.EPOLLIN:
                    events |= EVENT_WRITE
                if event & ~select.EPOLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            # Release the epoll object first, then clear the bookkeeping.
            self._selector.close()
            super().close()
486
487
# Only defined when the select module provides devpoll().
if hasattr(select, 'devpoll'):

    class DevpollSelector(_PollLikeSelector):
        """Selector built on Solaris /dev/poll (select.devpoll())."""
        _selector_cls = select.devpoll
        _EVENT_READ, _EVENT_WRITE = select.POLLIN, select.POLLOUT

        def fileno(self):
            """Return the file descriptor of the underlying devpoll object."""
            return self._selector.fileno()

        def close(self):
            # Release the devpoll object first, then clear the bookkeeping.
            self._selector.close()
            super().close()
502
503
# Only defined when the select module provides kqueue().
if hasattr(select, 'kqueue'):

    class KqueueSelector(_BaseSelectorImpl):
        """Kqueue-based selector.

        A file object registered for both reading and writing is backed by
        two kqueue filters (KQ_FILTER_READ and KQ_FILTER_WRITE).
        """

        def __init__(self):
            super().__init__()
            self._selector = select.kqueue()

        def fileno(self):
            """Return the file descriptor of the underlying kqueue object."""
            return self._selector.fileno()

        def register(self, fileobj, events, data=None):
            key = super().register(fileobj, events, data)
            try:
                # One kevent filter is added per requested direction.
                if events & EVENT_READ:
                    kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                        select.KQ_EV_ADD)
                    self._selector.control([kev], 0, 0)
                if events & EVENT_WRITE:
                    kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                        select.KQ_EV_ADD)
                    self._selector.control([kev], 0, 0)
            except BaseException:
                # Undo the bookkeeping entry before re-raising the error.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super().unregister(fileobj)
            if key.events & EVENT_READ:
                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                    select.KQ_EV_DELETE)
                try:
                    self._selector.control([kev], 0, 0)
                except OSError:
                    # This can happen if the FD was closed since it
                    # was registered.
                    pass
            if key.events & EVENT_WRITE:
                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                    select.KQ_EV_DELETE)
                try:
                    self._selector.control([kev], 0, 0)
                except OSError:
                    # Same as above: the FD may already be closed.
                    pass
            return key

        def select(self, timeout=None):
            """Wait for events and return a list of (key, events) pairs.

            Because read and write readiness arrive as separate kevents, the
            same key can appear twice in the returned list, once per
            direction.
            """
            timeout = None if timeout is None else max(timeout, 0)
            # Each registered descriptor can own up to two filters (read and
            # write), and every ready filter is delivered as its own kevent,
            # so the result buffer is sized for two events per descriptor.
            # Sizing it by the number of descriptors alone would silently
            # drop events when many descriptors are ready in both directions.
            #
            # If max_ev is 0, kqueue will ignore the timeout. For consistent
            # behavior with the other selector classes, we prevent that here
            # (using max). See https://bugs.python.org/issue29255
            max_ev = max(2 * len(self._fd_to_key), 1)
            ready = []
            try:
                kev_list = self._selector.control(None, max_ev, timeout)
            except InterruptedError:
                return ready
            for kev in kev_list:
                fd = kev.ident
                flag = kev.filter
                events = 0
                if flag == select.KQ_FILTER_READ:
                    events |= EVENT_READ
                if flag == select.KQ_FILTER_WRITE:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            # Release the kqueue object first, then clear the bookkeeping.
            self._selector.close()
            super().close()
581
582
def _can_use(method):
    """Report whether select.<method> exists and works on this system.

    Parameters:
    method -- name of the select module attribute to probe ('poll',
              'epoll', 'kqueue' or 'devpoll')

    Returns:
    True if the attribute exists and a probe object could be created (and,
    for 'poll', polled once); False if it is missing or the probe raised
    OSError
    """
    # Implementation based upon https://github.com/sethmlarson/selectors2/blob/master/selectors2.py
    factory = getattr(select, method, None)
    if factory is None:
        # The select module does not implement this method at all.
        return False

    # The attribute can exist while the OS / kernel does not actually support
    # it; the call may then fail with
    # OSError: [Errno 38] Function not implemented
    try:
        probe = factory()
        if method != 'poll':
            # epoll, kqueue and devpoll objects hold an fd: release it.
            probe.close()
        else:
            # Check that poll actually works.
            probe.poll(0)
    except OSError:
        return False
    return True
604
605
# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
#
# The candidates are probed in order and only the class for the first usable
# method is looked up, so classes that were not defined on this platform are
# never referenced.
DefaultSelector = (
    KqueueSelector if _can_use('kqueue') else
    EpollSelector if _can_use('epoll') else
    DevpollSelector if _can_use('devpoll') else
    PollSelector if _can_use('poll') else
    SelectSelector
)