python (3.12.0)
1 """Event loop using a selector and related classes.
2
3 A selector is a "notify-when-ready" multiplexer. For a subclass which
4 also includes support for signal handling, see the unix_events sub-module.
5 """
6
7 __all__ = 'BaseSelectorEventLoop',
8
9 import collections
10 import errno
11 import functools
12 import itertools
13 import os
14 import selectors
15 import socket
16 import warnings
17 import weakref
18 try:
19 import ssl
20 except ImportError: # pragma: no cover
21 ssl = None
22
23 from . import base_events
24 from . import constants
25 from . import events
26 from . import futures
27 from . import protocols
28 from . import sslproto
29 from . import transports
30 from . import trsock
31 from .log import logger
32
# Scatter/gather writes need socket.sendmsg() *and* a usable iovec limit.
_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')

if _HAS_SENDMSG:
    try:
        SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
    except (AttributeError, ValueError, OSError):
        # os.sysconf is unavailable, the name is unknown on this platform,
        # or the query itself failed: fall back to send().
        _HAS_SENDMSG = False
    else:
        if SC_IOV_MAX <= 0:
            # sysconf() reports -1 for an indeterminate limit, and
            # itertools.islice() rejects a negative stop: fall back to send().
            _HAS_SENDMSG = False
41
42 def _test_selector_event(selector, fd, event):
43 # Test if the selector is monitoring 'event' events
44 # for the file descriptor 'fd'.
45 try:
46 key = selector.get_key(fd)
47 except KeyError:
48 return False
49 else:
50 return bool(key.events & event)
51
52
class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """
58
def __init__(self, selector=None):
    """Initialise the loop around *selector*.

    When *selector* is None a selectors.DefaultSelector() is created.
    The self-pipe is set up and the fd -> transport registry starts out
    as an empty weak-value mapping.
    """
    super().__init__()

    chosen = selectors.DefaultSelector() if selector is None else selector
    logger.debug('Using selector: %s', chosen.__class__.__name__)
    self._selector = chosen
    self._make_self_pipe()
    self._transports = weakref.WeakValueDictionary()
68
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create a stream transport for *sock* bound to *protocol*.

    _ensure_fd_no_transport() is consulted first, so a descriptor still
    owned by a non-closing transport raises RuntimeError.
    """
    self._ensure_fd_no_transport(sock)
    transport = _SelectorSocketTransport(
        self, sock, protocol, waiter, extra, server)
    return transport
74
def _make_ssl_transport(
        self, rawsock, protocol, sslcontext, waiter=None,
        *, server_side=False, server_hostname=None,
        extra=None, server=None,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
):
    """Layer TLS over *rawsock* and return the application transport.

    An sslproto.SSLProtocol wraps *protocol*; a plain socket transport is
    then created with that SSL protocol as its protocol.  The value
    returned is the SSL protocol's ``_app_transport``.
    """
    self._ensure_fd_no_transport(rawsock)
    timeouts = {
        'ssl_handshake_timeout': ssl_handshake_timeout,
        'ssl_shutdown_timeout': ssl_shutdown_timeout,
    }
    tls_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname, **timeouts)
    # Constructed for its side effects only; the result is not kept here.
    _SelectorSocketTransport(self, rawsock, tls_protocol,
                             extra=extra, server=server)
    return tls_protocol._app_transport
92
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create a datagram transport for *sock* bound to *protocol*.

    _ensure_fd_no_transport() is consulted first, so a descriptor still
    owned by a non-closing transport raises RuntimeError.
    """
    self._ensure_fd_no_transport(sock)
    transport = _SelectorDatagramTransport(
        self, sock, protocol, address, waiter, extra)
    return transport
98
def close(self):
    """Close the loop together with its self-pipe and selector.

    Raises RuntimeError while the loop is running; does nothing when the
    loop is already closed.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self.is_closed():
        return
    self._close_self_pipe()
    super().close()
    selector = self._selector
    if selector is not None:
        selector.close()
        self._selector = None
109
110 def _close_self_pipe(self):
111 self._remove_reader(self._ssock.fileno())
112 self._ssock.close()
113 self._ssock = None
114 self._csock.close()
115 self._csock = None
116 self._internal_fds -= 1
117
118 def _make_self_pipe(self):
119 # A self-socket, really. :-)
120 self._ssock, self._csock = socket.socketpair()
121 self._ssock.setblocking(False)
122 self._csock.setblocking(False)
123 self._internal_fds += 1
124 self._add_reader(self._ssock.fileno(), self._read_from_self)
125
126 def _process_self_data(self, data):
127 pass
128
129 def _read_from_self(self):
130 while True:
131 try:
132 data = self._ssock.recv(4096)
133 if not data:
134 break
135 self._process_self_data(data)
136 except InterruptedError:
137 continue
138 except BlockingIOError:
139 break
140
141 def _write_to_self(self):
142 # This may be called from a different thread, possibly after
143 # _close_self_pipe() has been called or even while it is
144 # running. Guard for self._csock being None or closed. When
145 # a socket is closed, send() raises OSError (with errno set to
146 # EBADF, but let's not rely on the exact error code).
147 csock = self._csock
148 if csock is None:
149 return
150
151 try:
152 csock.send(b'\0')
153 except OSError:
154 if self._debug:
155 logger.debug("Fail to write a null byte into the "
156 "self-pipe socket",
157 exc_info=True)
158
def _start_serving(self, protocol_factory, sock,
                   sslcontext=None, server=None, backlog=100,
                   ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
                   ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    """Watch *sock* for readability and accept connections on it.

    _accept_connection() is registered as the reader callback and is
    given every argument passed here.
    """
    accept_args = (protocol_factory, sock, sslcontext, server, backlog,
                   ssl_handshake_timeout, ssl_shutdown_timeout)
    self._add_reader(sock.fileno(), self._accept_connection, *accept_args)
166
def _accept_connection(
        self, protocol_factory, sock,
        sslcontext=None, server=None, backlog=100,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    """Accept pending connections on the listening socket *sock*.

    This method is only called once for each event loop tick where the
    listening socket has triggered an EVENT_READ.  There may be multiple
    connections waiting for an .accept(), so it is attempted up to
    *backlog* times; each accepted connection is handed to
    _accept_connection2() in a new task.
    See https://bugs.python.org/issue27906 for more details.

    Returns None.  An OSError that is not a resource-exhaustion error is
    re-raised to the caller.
    """
    for _ in range(backlog):
        try:
            conn, addr = sock.accept()
            if self._debug:
                logger.debug("%r got a new connection from %r: %r",
                             server, addr, conn)
            conn.setblocking(False)
        except ConnectionAbortedError:
            # Only this one connection was aborted before accept(); that
            # says nothing about the rest of the queue, so keep draining.
            continue
        except (BlockingIOError, InterruptedError):
            # Early exit because the socket accept buffer is empty.
            return None
        except OSError as exc:
            # There's nowhere to send the error, so just log it.
            if exc.errno in (errno.EMFILE, errno.ENFILE,
                             errno.ENOBUFS, errno.ENOMEM):
                # Some platforms (e.g. Linux) keep reporting the FD as
                # ready, so we remove the read handler temporarily.
                # We'll try again in a while.
                self.call_exception_handler({
                    'message': 'socket.accept() out of system resource',
                    'exception': exc,
                    'socket': trsock.TransportSocket(sock),
                })
                self._remove_reader(sock.fileno())
                self.call_later(constants.ACCEPT_RETRY_DELAY,
                                self._start_serving,
                                protocol_factory, sock, sslcontext, server,
                                backlog, ssl_handshake_timeout,
                                ssl_shutdown_timeout)
                # Stop now: calling accept() again right away would only
                # repeat the failure and schedule duplicate retry timers.
                return None
            else:
                raise  # The event loop will catch, log and ignore it.
        else:
            extra = {'peername': addr}
            accept = self._accept_connection2(
                protocol_factory, conn, extra, sslcontext, server,
                ssl_handshake_timeout, ssl_shutdown_timeout)
            self.create_task(accept)
212
async def _accept_connection2(
        self, protocol_factory, conn, extra,
        sslcontext=None, server=None,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    """Build the protocol and transport for an accepted socket *conn*.

    The transport is an SSL transport when *sslcontext* is truthy and a
    plain socket transport otherwise.  If waiting for the transport to
    become ready fails, the transport is closed.  SystemExit and
    KeyboardInterrupt propagate; any other failure is swallowed and, in
    debug mode only, reported through call_exception_handler().
    """
    protocol = transport = None
    try:
        protocol = protocol_factory()
        waiter = self.create_future()
        if not sslcontext:
            transport = self._make_socket_transport(
                conn, protocol, waiter=waiter, extra=extra,
                server=server)
        else:
            transport = self._make_ssl_transport(
                conn, protocol, sslcontext, waiter=waiter,
                server_side=True, extra=extra, server=server,
                ssl_handshake_timeout=ssl_handshake_timeout,
                ssl_shutdown_timeout=ssl_shutdown_timeout)

        try:
            await waiter
        except BaseException:
            transport.close()
            raise
        # It's now up to the protocol to handle the connection.

    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        if self._debug:
            context = {
                'message':
                    'Error on transport creation for incoming connection',
                'exception': exc,
            }
            optional = (('protocol', protocol), ('transport', transport))
            for key, value in optional:
                if value is not None:
                    context[key] = value
            self.call_exception_handler(context)
255
256 def _ensure_fd_no_transport(self, fd):
257 fileno = fd
258 if not isinstance(fileno, int):
259 try:
260 fileno = int(fileno.fileno())
261 except (AttributeError, TypeError, ValueError):
262 # This code matches selectors._fileobj_to_fd function.
263 raise ValueError(f"Invalid file object: {fd!r}") from None
264 try:
265 transport = self._transports[fileno]
266 except KeyError:
267 pass
268 else:
269 if not transport.is_closing():
270 raise RuntimeError(
271 f'File descriptor {fd!r} is used by transport '
272 f'{transport!r}')
273
def _add_reader(self, fd, callback, *args):
    """Install *callback* as the read handler for *fd*; return its Handle.

    A read handler already registered for *fd* is replaced and cancelled;
    a registered write handler is kept.
    """
    self._check_closed()
    handle = events.Handle(callback, args, self, None)
    selector = self._selector
    try:
        key = selector.get_key(fd)
    except KeyError:
        selector.register(fd, selectors.EVENT_READ, (handle, None))
        return handle
    previous, writer = key.data
    selector.modify(fd, key.events | selectors.EVENT_READ,
                    (handle, writer))
    if previous is not None:
        previous.cancel()
    return handle
289
290 def _remove_reader(self, fd):
291 if self.is_closed():
292 return False
293 try:
294 key = self._selector.get_key(fd)
295 except KeyError:
296 return False
297 else:
298 mask, (reader, writer) = key.events, key.data
299 mask &= ~selectors.EVENT_READ
300 if not mask:
301 self._selector.unregister(fd)
302 else:
303 self._selector.modify(fd, mask, (None, writer))
304
305 if reader is not None:
306 reader.cancel()
307 return True
308 else:
309 return False
310
def _add_writer(self, fd, callback, *args):
    """Install *callback* as the write handler for *fd*; return its Handle.

    A write handler already registered for *fd* is replaced and
    cancelled; a registered read handler is kept.
    """
    self._check_closed()
    handle = events.Handle(callback, args, self, None)
    selector = self._selector
    try:
        key = selector.get_key(fd)
    except KeyError:
        selector.register(fd, selectors.EVENT_WRITE, (None, handle))
        return handle
    reader, previous = key.data
    selector.modify(fd, key.events | selectors.EVENT_WRITE,
                    (reader, handle))
    if previous is not None:
        previous.cancel()
    return handle
326
327 def _remove_writer(self, fd):
328 """Remove a writer callback."""
329 if self.is_closed():
330 return False
331 try:
332 key = self._selector.get_key(fd)
333 except KeyError:
334 return False
335 else:
336 mask, (reader, writer) = key.events, key.data
337 # Remove both writer and connector.
338 mask &= ~selectors.EVENT_WRITE
339 if not mask:
340 self._selector.unregister(fd)
341 else:
342 self._selector.modify(fd, mask, (reader, None))
343
344 if writer is not None:
345 writer.cancel()
346 return True
347 else:
348 return False
349
def add_reader(self, fd, callback, *args):
    """Add a reader callback for *fd*.

    Refused (via _ensure_fd_no_transport) when *fd* belongs to an open
    transport.  Returns None.
    """
    self._ensure_fd_no_transport(fd)
    self._add_reader(fd, callback, *args)
    return None
354
def remove_reader(self, fd):
    """Remove the reader callback for *fd*.

    Refused (via _ensure_fd_no_transport) when *fd* belongs to an open
    transport.  Returns whatever _remove_reader() returns.
    """
    self._ensure_fd_no_transport(fd)
    removed = self._remove_reader(fd)
    return removed
359
def add_writer(self, fd, callback, *args):
    """Add a writer callback for *fd*.

    Refused (via _ensure_fd_no_transport) when *fd* belongs to an open
    transport.  Returns None.
    """
    self._ensure_fd_no_transport(fd)
    self._add_writer(fd, callback, *args)
    return None
364
def remove_writer(self, fd):
    """Remove the writer callback for *fd*.

    Refused (via _ensure_fd_no_transport) when *fd* belongs to an open
    transport.  Returns whatever _remove_writer() returns.
    """
    self._ensure_fd_no_transport(fd)
    removed = self._remove_writer(fd)
    return removed
369
async def sock_recv(self, sock, n):
    """Receive data from the socket.

    The return value is a bytes object representing the data received.
    At most *n* bytes are received at once.  In debug mode a socket
    whose timeout is not 0 raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        return sock.recv(n)
    except (BlockingIOError, InterruptedError):
        # Nothing to read yet: wait for the socket to become readable.
        pass
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    reader = self._add_reader(fd, self._sock_recv, waiter, sock, n)
    cleanup = functools.partial(self._sock_read_done, fd, handle=reader)
    waiter.add_done_callback(cleanup)
    return await waiter
391
392 def _sock_read_done(self, fd, fut, handle=None):
393 if handle is None or not handle.cancelled():
394 self.remove_reader(fd)
395
396 def _sock_recv(self, fut, sock, n):
397 # _sock_recv() can add itself as an I/O callback if the operation can't
398 # be done immediately. Don't use it directly, call sock_recv().
399 if fut.done():
400 return
401 try:
402 data = sock.recv(n)
403 except (BlockingIOError, InterruptedError):
404 return # try again next time
405 except (SystemExit, KeyboardInterrupt):
406 raise
407 except BaseException as exc:
408 fut.set_exception(exc)
409 else:
410 fut.set_result(data)
411
async def sock_recv_into(self, sock, buf):
    """Receive data from the socket into *buf*.

    *buf* must be a writable buffer.  The return value is the number of
    bytes written.  In debug mode a socket whose timeout is not 0
    raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        return sock.recv_into(buf)
    except (BlockingIOError, InterruptedError):
        # Nothing to read yet: wait for the socket to become readable.
        pass
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    reader = self._add_reader(fd, self._sock_recv_into, waiter, sock, buf)
    cleanup = functools.partial(self._sock_read_done, fd, handle=reader)
    waiter.add_done_callback(cleanup)
    return await waiter
432
433 def _sock_recv_into(self, fut, sock, buf):
434 # _sock_recv_into() can add itself as an I/O callback if the operation
435 # can't be done immediately. Don't use it directly, call
436 # sock_recv_into().
437 if fut.done():
438 return
439 try:
440 nbytes = sock.recv_into(buf)
441 except (BlockingIOError, InterruptedError):
442 return # try again next time
443 except (SystemExit, KeyboardInterrupt):
444 raise
445 except BaseException as exc:
446 fut.set_exception(exc)
447 else:
448 fut.set_result(nbytes)
449
async def sock_recvfrom(self, sock, bufsize):
    """Receive a datagram from a datagram socket.

    The return value is a tuple of (bytes, address) representing the
    datagram received and the address it came from.  At most *bufsize*
    bytes are received at once.  In debug mode a socket whose timeout
    is not 0 raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        return sock.recvfrom(bufsize)
    except (BlockingIOError, InterruptedError):
        # Nothing to read yet: wait for the socket to become readable.
        pass
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    reader = self._add_reader(
        fd, self._sock_recvfrom, waiter, sock, bufsize)
    cleanup = functools.partial(self._sock_read_done, fd, handle=reader)
    waiter.add_done_callback(cleanup)
    return await waiter
472
473 def _sock_recvfrom(self, fut, sock, bufsize):
474 # _sock_recvfrom() can add itself as an I/O callback if the operation
475 # can't be done immediately. Don't use it directly, call
476 # sock_recvfrom().
477 if fut.done():
478 return
479 try:
480 result = sock.recvfrom(bufsize)
481 except (BlockingIOError, InterruptedError):
482 return # try again next time
483 except (SystemExit, KeyboardInterrupt):
484 raise
485 except BaseException as exc:
486 fut.set_exception(exc)
487 else:
488 fut.set_result(result)
489
async def sock_recvfrom_into(self, sock, buf, nbytes=0):
    """Receive a datagram from the socket into *buf*.

    *buf* must be a writable buffer.  A falsy *nbytes* is replaced by
    len(buf).  The return value is a tuple of (number of bytes written,
    address).  In debug mode a socket whose timeout is not 0 raises
    ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    nbytes = nbytes or len(buf)

    try:
        return sock.recvfrom_into(buf, nbytes)
    except (BlockingIOError, InterruptedError):
        # Nothing to read yet: wait for the socket to become readable.
        pass
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    reader = self._add_reader(
        fd, self._sock_recvfrom_into, waiter, sock, buf, nbytes)
    cleanup = functools.partial(self._sock_read_done, fd, handle=reader)
    waiter.add_done_callback(cleanup)
    return await waiter
514
515 def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
516 # _sock_recv_into() can add itself as an I/O callback if the operation
517 # can't be done immediately. Don't use it directly, call
518 # sock_recv_into().
519 if fut.done():
520 return
521 try:
522 result = sock.recvfrom_into(buf, bufsize)
523 except (BlockingIOError, InterruptedError):
524 return # try again next time
525 except (SystemExit, KeyboardInterrupt):
526 raise
527 except BaseException as exc:
528 fut.set_exception(exc)
529 else:
530 fut.set_result(result)
531
async def sock_sendall(self, sock, data):
    """Send data to the socket.

    The socket must be connected to a remote socket.  This method keeps
    sending from *data* until either everything has been sent or an
    error occurs.  None is returned on success.  On error an exception
    is raised, and there is no way to determine how much data, if any,
    was successfully processed by the receiving end of the connection.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        sent = sock.send(data)
    except (BlockingIOError, InterruptedError):
        sent = 0

    if sent == len(data):
        # all data sent
        return

    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    # A one-element list carries the running offset between callbacks.
    progress = [sent]
    writer = self._add_writer(fd, self._sock_sendall, waiter, sock,
                              memoryview(data), progress)
    cleanup = functools.partial(self._sock_write_done, fd, handle=writer)
    waiter.add_done_callback(cleanup)
    return await waiter
562
563 def _sock_sendall(self, fut, sock, view, pos):
564 if fut.done():
565 # Future cancellation can be scheduled on previous loop iteration
566 return
567 start = pos[0]
568 try:
569 n = sock.send(view[start:])
570 except (BlockingIOError, InterruptedError):
571 return
572 except (SystemExit, KeyboardInterrupt):
573 raise
574 except BaseException as exc:
575 fut.set_exception(exc)
576 return
577
578 start += n
579
580 if start == len(view):
581 fut.set_result(None)
582 else:
583 pos[0] = start
584
async def sock_sendto(self, sock, data, address):
    """Send *data* from *sock* to *address*.

    sendto() is tried immediately; if it would block, the call waits
    until the socket is writable and tries again.  The return value is
    whatever sock.sendto() returns.  On error the exception raised by
    sendto() propagates.  In debug mode a socket whose timeout is not 0
    raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        return sock.sendto(data, address)
    except (BlockingIOError, InterruptedError):
        # Not writable yet: wait for the socket and retry from a callback.
        pass

    fut = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
                              address)
    fut.add_done_callback(
        functools.partial(self._sock_write_done, fd, handle=handle))
    return await fut
611
612 def _sock_sendto(self, fut, sock, data, address):
613 if fut.done():
614 # Future cancellation can be scheduled on previous loop iteration
615 return
616 try:
617 n = sock.sendto(data, 0, address)
618 except (BlockingIOError, InterruptedError):
619 return
620 except (SystemExit, KeyboardInterrupt):
621 raise
622 except BaseException as exc:
623 fut.set_exception(exc)
624 else:
625 fut.set_result(n)
626
async def sock_connect(self, sock, address):
    """Connect to a remote socket at address.

    This method is a coroutine.  For AF_INET sockets, and for AF_INET6
    sockets when IPv6 support is present, *address* is resolved first
    and the first result is used.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")

    family = sock.family
    needs_resolving = family == socket.AF_INET or (
        base_events._HAS_IPv6 and family == socket.AF_INET6)
    if needs_resolving:
        resolved = await self._ensure_resolved(
            address, family=family, type=sock.type, proto=sock.proto,
            loop=self,
        )
        _, _, _, _, address = resolved[0]

    fut = self.create_future()
    self._sock_connect(fut, sock, address)
    try:
        return await fut
    finally:
        # Needed to break cycles when an exception occurs.
        fut = None
651
652 def _sock_connect(self, fut, sock, address):
653 fd = sock.fileno()
654 try:
655 sock.connect(address)
656 except (BlockingIOError, InterruptedError):
657 # Issue #23618: When the C function connect() fails with EINTR, the
658 # connection runs in background. We have to wait until the socket
659 # becomes writable to be notified when the connection succeed or
660 # fails.
661 self._ensure_fd_no_transport(fd)
662 handle = self._add_writer(
663 fd, self._sock_connect_cb, fut, sock, address)
664 fut.add_done_callback(
665 functools.partial(self._sock_write_done, fd, handle=handle))
666 except (SystemExit, KeyboardInterrupt):
667 raise
668 except BaseException as exc:
669 fut.set_exception(exc)
670 else:
671 fut.set_result(None)
672 finally:
673 fut = None
674
675 def _sock_write_done(self, fd, fut, handle=None):
676 if handle is None or not handle.cancelled():
677 self.remove_writer(fd)
678
679 def _sock_connect_cb(self, fut, sock, address):
680 if fut.done():
681 return
682
683 try:
684 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
685 if err != 0:
686 # Jump to any except clause below.
687 raise OSError(err, f'Connect call failed {address}')
688 except (BlockingIOError, InterruptedError):
689 # socket is still registered, the callback will be retried later
690 pass
691 except (SystemExit, KeyboardInterrupt):
692 raise
693 except BaseException as exc:
694 fut.set_exception(exc)
695 else:
696 fut.set_result(None)
697 finally:
698 fut = None
699
async def sock_accept(self, sock):
    """Accept a connection.

    The socket must be bound to an address and listening for connections.
    The return value is a pair (conn, address) where conn is a new socket
    object usable to send and receive data on the connection, and address
    is the address bound to the socket on the other end of the connection.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    waiter = self.create_future()
    self._sock_accept(waiter, sock)
    accepted = await waiter
    return accepted
714
715 def _sock_accept(self, fut, sock):
716 fd = sock.fileno()
717 try:
718 conn, address = sock.accept()
719 conn.setblocking(False)
720 except (BlockingIOError, InterruptedError):
721 self._ensure_fd_no_transport(fd)
722 handle = self._add_reader(fd, self._sock_accept, fut, sock)
723 fut.add_done_callback(
724 functools.partial(self._sock_read_done, fd, handle=handle))
725 except (SystemExit, KeyboardInterrupt):
726 raise
727 except BaseException as exc:
728 fut.set_exception(exc)
729 else:
730 fut.set_result((conn, address))
731
async def _sendfile_native(self, transp, file, offset, count):
    """Send *file* over the transport's socket with sock_sendfile().

    The transport is taken out of the fd registry and its reading is
    paused for the duration; both are restored afterwards, whether or
    not the transfer succeeded.
    """
    del self._transports[transp._sock_fd]
    was_reading = transp.is_reading()
    transp.pause_reading()
    await transp._make_empty_waiter()
    try:
        sent = await self.sock_sendfile(transp._sock, file, offset, count,
                                        fallback=False)
        return sent
    finally:
        transp._reset_empty_waiter()
        if was_reading:
            transp.resume_reading()
        self._transports[transp._sock_fd] = transp
745
746 def _process_events(self, event_list):
747 for key, mask in event_list:
748 fileobj, (reader, writer) = key.fileobj, key.data
749 if mask & selectors.EVENT_READ and reader is not None:
750 if reader._cancelled:
751 self._remove_reader(fileobj)
752 else:
753 self._add_callback(reader)
754 if mask & selectors.EVENT_WRITE and writer is not None:
755 if writer._cancelled:
756 self._remove_writer(fileobj)
757 else:
758 self._add_callback(writer)
759
760 def _stop_serving(self, sock):
761 self._remove_reader(sock.fileno())
762 sock.close()
763
764
class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):

    max_size = 256 * 1024  # Buffer size passed to recv().

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None
774
def __init__(self, loop, sock, protocol, extra=None, server=None):
    """Bind the transport to *sock* and register it with *loop*.

    Fills the 'socket', 'sockname' and (unless already supplied)
    'peername' extra-info entries, attaches to *server* if one is given,
    and records the transport under its file descriptor in
    loop._transports.
    """
    super().__init__(extra, loop)
    info = self._extra
    info['socket'] = trsock.TransportSocket(sock)
    try:
        info['sockname'] = sock.getsockname()
    except OSError:
        info['sockname'] = None
    if 'peername' not in info:
        try:
            info['peername'] = sock.getpeername()
        except OSError:
            info['peername'] = None
    self._sock = sock
    self._sock_fd = sock.fileno()

    self._protocol_connected = False
    self.set_protocol(protocol)

    self._server = server
    self._buffer = collections.deque()
    self._conn_lost = 0  # Set when call to connection_lost scheduled.
    self._closing = False  # Set when close() called.
    self._paused = False  # Set when pause_reading() called

    if server is not None:
        server._attach()
    loop._transports[self._sock_fd] = self
802
803 def __repr__(self):
804 info = [self.__class__.__name__]
805 if self._sock is None:
806 info.append('closed')
807 elif self._closing:
808 info.append('closing')
809 info.append(f'fd={self._sock_fd}')
810 # test if the transport was closed
811 if self._loop is not None and not self._loop.is_closed():
812 polling = _test_selector_event(self._loop._selector,
813 self._sock_fd, selectors.EVENT_READ)
814 if polling:
815 info.append('read=polling')
816 else:
817 info.append('read=idle')
818
819 polling = _test_selector_event(self._loop._selector,
820 self._sock_fd,
821 selectors.EVENT_WRITE)
822 if polling:
823 state = 'polling'
824 else:
825 state = 'idle'
826
827 bufsize = self.get_write_buffer_size()
828 info.append(f'write=<{state}, bufsize={bufsize}>')
829 return '<{}>'.format(' '.join(info))
830
def abort(self):
    """Close the transport immediately via _force_close(None)."""
    no_error = None
    self._force_close(no_error)
833
def set_protocol(self, protocol):
    """Install *protocol* and mark the protocol as connected."""
    self._protocol, self._protocol_connected = protocol, True
837
def get_protocol(self):
    """Return the protocol currently attached to this transport."""
    protocol = self._protocol
    return protocol
840
def is_closing(self):
    """Return the transport's closing flag."""
    closing = self._closing
    return closing
843
def is_reading(self):
    """Return True when the transport is neither closing nor paused."""
    return not (self.is_closing() or self._paused)
846
def pause_reading(self):
    """Stop reading from the socket; a no-op when not currently reading."""
    if self.is_reading():
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)
854
def resume_reading(self):
    """Resume reading; a no-op when closing or when not paused."""
    if not self._closing and self._paused:
        self._paused = False
        self._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)
862
def close(self):
    """Begin closing the transport; repeated calls do nothing.

    Reading stops at once.  With an empty write buffer the writer is
    removed and _call_connection_lost(None) is scheduled; otherwise the
    buffered data is left in place.
    """
    if self._closing:
        return
    self._closing = True
    loop, fd = self._loop, self._sock_fd
    loop._remove_reader(fd)
    if self._buffer:
        return
    self._conn_lost += 1
    loop._remove_writer(fd)
    loop.call_soon(self._call_connection_lost, None)
872
873 def __del__(self, _warn=warnings.warn):
874 if self._sock is not None:
875 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
876 self._sock.close()
877
878 def _fatal_error(self, exc, message='Fatal error on transport'):
879 # Should be called from exception handler only.
880 if isinstance(exc, OSError):
881 if self._loop.get_debug():
882 logger.debug("%r: %s", self, message, exc_info=True)
883 else:
884 self._loop.call_exception_handler({
885 'message': message,
886 'exception': exc,
887 'transport': self,
888 'protocol': self._protocol,
889 })
890 self._force_close(exc)
891
892 def _force_close(self, exc):
893 if self._conn_lost:
894 return
895 if self._buffer:
896 self._buffer.clear()
897 self._loop._remove_writer(self._sock_fd)
898 if not self._closing:
899 self._closing = True
900 self._loop._remove_reader(self._sock_fd)
901 self._conn_lost += 1
902 self._loop.call_soon(self._call_connection_lost, exc)
903
904 def _call_connection_lost(self, exc):
905 try:
906 if self._protocol_connected:
907 self._protocol.connection_lost(exc)
908 finally:
909 self._sock.close()
910 self._sock = None
911 self._protocol = None
912 self._loop = None
913 server = self._server
914 if server is not None:
915 server._detach()
916 self._server = None
917
def get_write_buffer_size(self):
    """Return the total length of all chunks in the write buffer."""
    return sum(len(chunk) for chunk in self._buffer)
920
921 def _add_reader(self, fd, callback, *args):
922 if not self.is_reading():
923 return
924 self._loop._add_reader(fd, callback, *args)
925
926
class _SelectorSocketTransport(_SelectorTransport):

    _start_tls_compatible = True
    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
931
def __init__(self, loop, sock, protocol, waiter=None,
             extra=None, server=None):
    """Set up a stream transport and schedule its start-up callbacks.

    In order, the loop is asked to call protocol.connection_made(),
    then to start reading, then (if *waiter* is given) to resolve
    *waiter* with None unless it was cancelled.
    """
    self._read_ready_cb = None
    super().__init__(loop, sock, protocol, extra, server)
    self._eof = False
    self._empty_waiter = None
    self._write_ready = (
        self._write_sendmsg if _HAS_SENDMSG else self._write_send)
    # Disable the Nagle algorithm -- small writes will be
    # sent without waiting for the TCP ACK.  This generally
    # decreases the latency (in some cases significantly.)
    base_events._set_nodelay(self._sock)

    schedule = self._loop.call_soon
    schedule(self._protocol.connection_made, self)
    # only start reading when connection_made() has been called
    schedule(self._add_reader, self._sock_fd, self._read_ready)
    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        schedule(futures._set_result_unless_cancelled, waiter, None)
956
def set_protocol(self, protocol):
    """Install *protocol* and pick the matching read-ready implementation.

    A protocols.BufferedProtocol is served by _read_ready__get_buffer;
    any other protocol by _read_ready__data_received.
    """
    buffered = isinstance(protocol, protocols.BufferedProtocol)
    self._read_ready_cb = (
        self._read_ready__get_buffer if buffered
        else self._read_ready__data_received)

    super().set_protocol(protocol)
964
965 def _read_ready(self):
966 self._read_ready_cb()
967
968 def _read_ready__get_buffer(self):
969 if self._conn_lost:
970 return
971
972 try:
973 buf = self._protocol.get_buffer(-1)
974 if not len(buf):
975 raise RuntimeError('get_buffer() returned an empty buffer')
976 except (SystemExit, KeyboardInterrupt):
977 raise
978 except BaseException as exc:
979 self._fatal_error(
980 exc, 'Fatal error: protocol.get_buffer() call failed.')
981 return
982
983 try:
984 nbytes = self._sock.recv_into(buf)
985 except (BlockingIOError, InterruptedError):
986 return
987 except (SystemExit, KeyboardInterrupt):
988 raise
989 except BaseException as exc:
990 self._fatal_error(exc, 'Fatal read error on socket transport')
991 return
992
993 if not nbytes:
994 self._read_ready__on_eof()
995 return
996
997 try:
998 self._protocol.buffer_updated(nbytes)
999 except (SystemExit, KeyboardInterrupt):
1000 raise
1001 except BaseException as exc:
1002 self._fatal_error(
1003 exc, 'Fatal error: protocol.buffer_updated() call failed.')
1004
1005 def _read_ready__data_received(self):
1006 if self._conn_lost:
1007 return
1008 try:
1009 data = self._sock.recv(self.max_size)
1010 except (BlockingIOError, InterruptedError):
1011 return
1012 except (SystemExit, KeyboardInterrupt):
1013 raise
1014 except BaseException as exc:
1015 self._fatal_error(exc, 'Fatal read error on socket transport')
1016 return
1017
1018 if not data:
1019 self._read_ready__on_eof()
1020 return
1021
1022 try:
1023 self._protocol.data_received(data)
1024 except (SystemExit, KeyboardInterrupt):
1025 raise
1026 except BaseException as exc:
1027 self._fatal_error(
1028 exc, 'Fatal error: protocol.data_received() call failed.')
1029
1030 def _read_ready__on_eof(self):
1031 if self._loop.get_debug():
1032 logger.debug("%r received EOF", self)
1033
1034 try:
1035 keep_open = self._protocol.eof_received()
1036 except (SystemExit, KeyboardInterrupt):
1037 raise
1038 except BaseException as exc:
1039 self._fatal_error(
1040 exc, 'Fatal error: protocol.eof_received() call failed.')
1041 return
1042
1043 if keep_open:
1044 # We're keeping the connection open so the
1045 # protocol can write more, but we still can't
1046 # receive more, so remove the reader callback.
1047 self._loop._remove_reader(self._sock_fd)
1048 else:
1049 self.close()
1050
def write(self, data):
    """Send *data*, buffering whatever the socket does not accept now.

    Raises TypeError if *data* is not bytes, bytearray or memoryview, and
    RuntimeError after write_eof() or while an empty-buffer waiter is
    installed.  Empty *data* is ignored.  Once the connection is marked
    lost the data is dropped and the lost-write counter is incremented.
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data argument must be a bytes-like object, '
                        f'not {type(data).__name__!r}')
    if self._eof:
        raise RuntimeError('Cannot call write() after write_eof()')
    if self._empty_waiter is not None:
        raise RuntimeError('unable to write; sendfile is in progress')
    if not data:
        return

    lost_count = self._conn_lost
    if lost_count:
        if lost_count >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Nothing is queued ahead of this data, so try the socket first.
        try:
            sent = self._sock.send(data)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            self._fatal_error(err, 'Fatal write error on socket transport')
            return
        else:
            data = memoryview(data)[sent:]
            if not data:
                return
        # Something is left over: ask the loop to call the write handler.
        self._loop._add_writer(self._sock_fd, self._write_ready)

    # Queue the unsent part and apply write flow control.
    self._buffer.append(data)
    self._maybe_pause_protocol()
1089
def _get_sendmsg_buffer(self):
    """Return an iterator over the leading queued chunks for ``sendmsg()``.

    At most ``SC_IOV_MAX`` chunks are yielded; the queue is not modified.
    """
    queued = self._buffer
    return itertools.islice(queued, SC_IOV_MAX)
1092
1093 def _write_sendmsg(self):
1094 assert self._buffer, 'Data should not be empty'
1095 if self._conn_lost:
1096 return
1097 try:
1098 nbytes = self._sock.sendmsg(self._get_sendmsg_buffer())
1099 self._adjust_leftover_buffer(nbytes)
1100 except (BlockingIOError, InterruptedError):
1101 pass
1102 except (SystemExit, KeyboardInterrupt):
1103 raise
1104 except BaseException as exc:
1105 self._loop._remove_writer(self._sock_fd)
1106 self._buffer.clear()
1107 self._fatal_error(exc, 'Fatal write error on socket transport')
1108 if self._empty_waiter is not None:
1109 self._empty_waiter.set_exception(exc)
1110 else:
1111 self._maybe_resume_protocol() # May append to buffer.
1112 if not self._buffer:
1113 self._loop._remove_writer(self._sock_fd)
1114 if self._empty_waiter is not None:
1115 self._empty_waiter.set_result(None)
1116 if self._closing:
1117 self._call_connection_lost(None)
1118 elif self._eof:
1119 self._sock.shutdown(socket.SHUT_WR)
1120
1121 def _adjust_leftover_buffer(self, nbytes: int) -> None:
1122 buffer = self._buffer
1123 while nbytes:
1124 b = buffer.popleft()
1125 b_len = len(b)
1126 if b_len <= nbytes:
1127 nbytes -= b_len
1128 else:
1129 buffer.appendleft(b[nbytes:])
1130 break
1131
1132 def _write_send(self):
1133 assert self._buffer, 'Data should not be empty'
1134 if self._conn_lost:
1135 return
1136 try:
1137 buffer = self._buffer.popleft()
1138 n = self._sock.send(buffer)
1139 if n != len(buffer):
1140 # Not all data was written
1141 self._buffer.appendleft(buffer[n:])
1142 except (BlockingIOError, InterruptedError):
1143 pass
1144 except (SystemExit, KeyboardInterrupt):
1145 raise
1146 except BaseException as exc:
1147 self._loop._remove_writer(self._sock_fd)
1148 self._buffer.clear()
1149 self._fatal_error(exc, 'Fatal write error on socket transport')
1150 if self._empty_waiter is not None:
1151 self._empty_waiter.set_exception(exc)
1152 else:
1153 self._maybe_resume_protocol() # May append to buffer.
1154 if not self._buffer:
1155 self._loop._remove_writer(self._sock_fd)
1156 if self._empty_waiter is not None:
1157 self._empty_waiter.set_result(None)
1158 if self._closing:
1159 self._call_connection_lost(None)
1160 elif self._eof:
1161 self._sock.shutdown(socket.SHUT_WR)
1162
def write_eof(self):
    """Mark the write side as finished and shut it down once drained.

    Does nothing if the transport is closing or EOF was already written.
    """
    already_done = self._closing or self._eof
    if already_done:
        return
    self._eof = True
    if self._buffer:
        # Data is still queued; the write-ready handler performs the
        # shutdown after the queue has been flushed.
        return
    self._sock.shutdown(socket.SHUT_WR)
1169
1170 def writelines(self, list_of_data):
1171 if self._eof:
1172 raise RuntimeError('Cannot call writelines() after write_eof()')
1173 if self._empty_waiter is not None:
1174 raise RuntimeError('unable to writelines; sendfile is in progress')
1175 if not list_of_data:
1176 return
1177 self._buffer.extend([memoryview(data) for data in list_of_data])
1178 self._write_ready()
1179 # If the entire buffer couldn't be written, register a write handler
1180 if self._buffer:
1181 self._loop._add_writer(self._sock_fd, self._write_ready)
1182
def can_write_eof(self):
    """Report that this transport supports ``write_eof()``."""
    supported = True
    return supported
1185
def _call_connection_lost(self, exc):
    """Run the base connection-lost handling and fail a pending waiter.

    If an empty-buffer waiter is installed and has not completed yet, it
    is failed with ConnectionError so that whoever awaits it is woken up.
    This happens in a ``finally`` block so the waiter is notified even if
    the base-class call raises.
    """
    try:
        super()._call_connection_lost(exc)
    finally:
        waiter = self._empty_waiter
        # The waiter may already be done: _make_empty_waiter() completes
        # it at once when nothing is queued, and the write handlers set an
        # exception on it after a fatal error.  Completing a future twice
        # raises InvalidStateError, so only pending waiters are touched.
        if waiter is not None and not waiter.done():
            waiter.set_exception(
                ConnectionError("Connection is closed by peer"))
1191
1192 def _make_empty_waiter(self):
1193 if self._empty_waiter is not None:
1194 raise RuntimeError("Empty waiter is already set")
1195 self._empty_waiter = self._loop.create_future()
1196 if not self._buffer:
1197 self._empty_waiter.set_result(None)
1198 return self._empty_waiter
1199
1200 def _reset_empty_waiter(self):
1201 self._empty_waiter = None
1202
def close(self):
    """Drop the read dispatcher and delegate to the base-class ``close()``.

    ``_write_ready`` is deliberately left in place: writelines() calls it
    directly and write() passes it to the loop as the writer callback, so
    clearing it here made a writelines() issued after close() fail with
    "TypeError: 'NoneType' object is not callable".
    """
    self._read_ready_cb = None
    super().close()
1207
1208
1209 class ESC[4;38;5;81m_SelectorDatagramTransport(ESC[4;38;5;149m_SelectorTransport, ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149mDatagramTransport):
1210
1211 _buffer_factory = collections.deque
1212
def __init__(self, loop, sock, protocol, address=None,
             waiter=None, extra=None):
    """Set up the datagram transport and schedule its start-up callbacks.

    *address* is the optional fixed remote address used by sendto().
    If *waiter* is given, it is completed (unless cancelled) by a callback
    scheduled after the ones below.
    """
    super().__init__(loop, sock, protocol, extra)
    self._address = address
    self._buffer_size = 0

    schedule = self._loop.call_soon
    schedule(self._protocol.connection_made, self)
    # Reading is scheduled after connection_made(), so the protocol sees
    # the transport before any datagram is delivered.
    schedule(self._add_reader, self._sock_fd, self._read_ready)
    if waiter is not None:
        # Likewise, the waiter is only woken after connection_made().
        schedule(futures._set_result_unless_cancelled, waiter, None)
1226
def get_write_buffer_size(self):
    """Return the number of payload bytes currently queued for sending."""
    pending_bytes = self._buffer_size
    return pending_bytes
1229
1230 def _read_ready(self):
1231 if self._conn_lost:
1232 return
1233 try:
1234 data, addr = self._sock.recvfrom(self.max_size)
1235 except (BlockingIOError, InterruptedError):
1236 pass
1237 except OSError as exc:
1238 self._protocol.error_received(exc)
1239 except (SystemExit, KeyboardInterrupt):
1240 raise
1241 except BaseException as exc:
1242 self._fatal_error(exc, 'Fatal read error on datagram transport')
1243 else:
1244 self._protocol.datagram_received(data, addr)
1245
def sendto(self, data, addr=None):
    """Send a datagram, queueing it if the socket cannot take it now.

    Raises TypeError if *data* is not bytes, bytearray or memoryview, and
    ValueError if the transport has a fixed address and *addr* is neither
    None nor that address.  Empty *data* is ignored.  An OSError from the
    socket is reported through the protocol's ``error_received()``.
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data argument must be a bytes-like object, '
                        f'not {type(data).__name__!r}')
    if not data:
        return

    fixed_address = self._address
    if fixed_address:
        if addr not in (None, fixed_address):
            raise ValueError(
                f'Invalid address: must be None or {self._address}')
        addr = fixed_address

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

    if not self._buffer:
        # Nothing is queued ahead of this datagram: try to send it now.
        try:
            if self._extra['peername']:
                self._sock.send(data)
            else:
                self._sock.sendto(data, addr)
        except (BlockingIOError, InterruptedError):
            self._loop._add_writer(self._sock_fd, self._sendto_ready)
        except OSError as err:
            self._protocol.error_received(err)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            self._fatal_error(
                err, 'Fatal write error on datagram transport')
            return
        else:
            return

    # Queue an immutable copy so later changes by the caller cannot
    # alter what is eventually sent.
    self._buffer.append((bytes(data), addr))
    self._buffer_size += len(data)
    self._maybe_pause_protocol()
1289
1290 def _sendto_ready(self):
1291 while self._buffer:
1292 data, addr = self._buffer.popleft()
1293 self._buffer_size -= len(data)
1294 try:
1295 if self._extra['peername']:
1296 self._sock.send(data)
1297 else:
1298 self._sock.sendto(data, addr)
1299 except (BlockingIOError, InterruptedError):
1300 self._buffer.appendleft((data, addr)) # Try again later.
1301 self._buffer_size += len(data)
1302 break
1303 except OSError as exc:
1304 self._protocol.error_received(exc)
1305 return
1306 except (SystemExit, KeyboardInterrupt):
1307 raise
1308 except BaseException as exc:
1309 self._fatal_error(
1310 exc, 'Fatal write error on datagram transport')
1311 return
1312
1313 self._maybe_resume_protocol() # May append to buffer.
1314 if not self._buffer:
1315 self._loop._remove_writer(self._sock_fd)
1316 if self._closing:
1317 self._call_connection_lost(None)