1 """Event loop using a selector and related classes.
2
3 A selector is a "notify-when-ready" multiplexer. For a subclass which
4 also includes support for signal handling, see the unix_events sub-module.
5 """
6
7 __all__ = 'BaseSelectorEventLoop',
8
9 import collections
10 import errno
11 import functools
12 import selectors
13 import socket
14 import warnings
15 import weakref
16 try:
17 import ssl
18 except ImportError: # pragma: no cover
19 ssl = None
20
21 from . import base_events
22 from . import constants
23 from . import events
24 from . import futures
25 from . import protocols
26 from . import sslproto
27 from . import transports
28 from . import trsock
29 from .log import logger
30
31
32 def _test_selector_event(selector, fd, event):
33 # Test if the selector is monitoring 'event' events
34 # for the file descriptor 'fd'.
35 try:
36 key = selector.get_key(fd)
37 except KeyError:
38 return False
39 else:
40 return bool(key.events & event)
41
42
43 class ESC[4;38;5;81mBaseSelectorEventLoop(ESC[4;38;5;149mbase_eventsESC[4;38;5;149m.ESC[4;38;5;149mBaseEventLoop):
44 """Selector event loop.
45
46 See events.EventLoop for API specification.
47 """
48
def __init__(self, selector=None):
    """Initialise the loop around *selector*.

    When *selector* is None a ``selectors.DefaultSelector()`` is created.
    The self-pipe is set up before the fd -> transport map is created.
    """
    super().__init__()

    if selector is None:
        selector = selectors.DefaultSelector()
    selector_name = selector.__class__.__name__
    logger.debug('Using selector: %s', selector_name)
    self._selector = selector
    self._make_self_pipe()
    # Values are held weakly, so this map does not keep transports alive.
    self._transports = weakref.WeakValueDictionary()
58
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create and return a _SelectorSocketTransport for *sock* on this loop."""
    transport = _SelectorSocketTransport(
        self, sock, protocol, waiter, extra, server)
    return transport
63
def _make_ssl_transport(
        self, rawsock, protocol, sslcontext, waiter=None,
        *, server_side=False, server_hostname=None,
        extra=None, server=None,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
):
    """Wrap *rawsock* in an SSLProtocol and return its application transport.

    An ``sslproto.SSLProtocol`` is created around the user *protocol*, then
    a plain socket transport is created with that SSL protocol as its
    protocol.  The value returned is ``ssl_protocol._app_transport``.
    """
    timeouts = {
        'ssl_handshake_timeout': ssl_handshake_timeout,
        'ssl_shutdown_timeout': ssl_shutdown_timeout,
    }
    ssl_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        **timeouts)
    # The socket transport is created for its side effects only; its
    # return value is intentionally discarded.
    _SelectorSocketTransport(self, rawsock, ssl_protocol,
                             extra=extra, server=server)
    return ssl_protocol._app_transport
80
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create and return a _SelectorDatagramTransport for *sock* on this loop."""
    transport = _SelectorDatagramTransport(
        self, sock, protocol, address, waiter, extra)
    return transport
85
def close(self):
    """Close the loop, its self-pipe and its selector.

    Raises RuntimeError if the loop is running.  Closing an already
    closed loop is a no-op.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if not self.is_closed():
        self._close_self_pipe()
        super().close()
        selector = self._selector
        if selector is not None:
            selector.close()
            self._selector = None
96
97 def _close_self_pipe(self):
98 self._remove_reader(self._ssock.fileno())
99 self._ssock.close()
100 self._ssock = None
101 self._csock.close()
102 self._csock = None
103 self._internal_fds -= 1
104
105 def _make_self_pipe(self):
106 # A self-socket, really. :-)
107 self._ssock, self._csock = socket.socketpair()
108 self._ssock.setblocking(False)
109 self._csock.setblocking(False)
110 self._internal_fds += 1
111 self._add_reader(self._ssock.fileno(), self._read_from_self)
112
113 def _process_self_data(self, data):
114 pass
115
116 def _read_from_self(self):
117 while True:
118 try:
119 data = self._ssock.recv(4096)
120 if not data:
121 break
122 self._process_self_data(data)
123 except InterruptedError:
124 continue
125 except BlockingIOError:
126 break
127
128 def _write_to_self(self):
129 # This may be called from a different thread, possibly after
130 # _close_self_pipe() has been called or even while it is
131 # running. Guard for self._csock being None or closed. When
132 # a socket is closed, send() raises OSError (with errno set to
133 # EBADF, but let's not rely on the exact error code).
134 csock = self._csock
135 if csock is None:
136 return
137
138 try:
139 csock.send(b'\0')
140 except OSError:
141 if self._debug:
142 logger.debug("Fail to write a null byte into the "
143 "self-pipe socket",
144 exc_info=True)
145
def _start_serving(self, protocol_factory, sock,
                   sslcontext=None, server=None, backlog=100,
                   ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
                   ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    """Start accepting connections on the listening socket *sock*.

    Registers ``_accept_connection`` as the read callback for *sock*,
    forwarding every argument given here.
    """
    accept_args = (protocol_factory, sock, sslcontext, server, backlog,
                   ssl_handshake_timeout, ssl_shutdown_timeout)
    self._add_reader(sock.fileno(), self._accept_connection, *accept_args)
153
def _accept_connection(
        self, protocol_factory, sock,
        sslcontext=None, server=None, backlog=100,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    """Accept up to *backlog* pending connections on listening *sock*.

    This method is only called once for each event loop tick where the
    listening socket has triggered an EVENT_READ.  There may be multiple
    connections waiting for an .accept() so it is called in a loop.
    See https://bugs.python.org/issue27906 for more details.

    Each accepted connection is made non-blocking and handed to a new
    ``_accept_connection2`` task.  When accept() fails with EMFILE,
    ENFILE, ENOBUFS or ENOMEM the error is reported once, the read
    handler is removed and ``_start_serving`` is rescheduled after
    ``constants.ACCEPT_RETRY_DELAY``; any other OSError is re-raised.
    """
    for _ in range(backlog):
        try:
            conn, addr = sock.accept()
            if self._debug:
                logger.debug("%r got a new connection from %r: %r",
                             server, addr, conn)
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
            # Early exit because the socket accept buffer is empty.
            return None
        except OSError as exc:
            # There's nowhere to send the error, so just log it.
            if exc.errno not in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                raise  # The event loop will catch, log and ignore it.
            # Some platforms (e.g. Linux) keep reporting the FD as
            # ready, so we remove the read handler temporarily.
            # We'll try again in a while.
            self.call_exception_handler({
                'message': 'socket.accept() out of system resource',
                'exception': exc,
                'socket': trsock.TransportSocket(sock),
            })
            self._remove_reader(sock.fileno())
            self.call_later(constants.ACCEPT_RETRY_DELAY,
                            self._start_serving,
                            protocol_factory, sock, sslcontext, server,
                            backlog, ssl_handshake_timeout,
                            ssl_shutdown_timeout)
            # Stop here: the retry is already scheduled.  Looping on
            # would repeat the same failing accept(), reporting the
            # error and scheduling another retry on every iteration.
            return None
        else:
            extra = {'peername': addr}
            accept = self._accept_connection2(
                protocol_factory, conn, extra, sslcontext, server,
                ssl_handshake_timeout, ssl_shutdown_timeout)
            self.create_task(accept)
199
async def _accept_connection2(
        self, protocol_factory, conn, extra,
        sslcontext=None, server=None,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    """Create the protocol and transport for an accepted connection.

    Builds an SSL transport when *sslcontext* is truthy, otherwise a plain
    socket transport, then waits for the transport's waiter.  If waiting
    fails the transport is closed.  SystemExit and KeyboardInterrupt
    propagate; any other exception is passed to the loop's exception
    handler, but only when the loop is in debug mode.
    """
    protocol = None
    transport = None
    try:
        protocol = protocol_factory()
        waiter = self.create_future()
        if not sslcontext:
            transport = self._make_socket_transport(
                conn, protocol, waiter=waiter, extra=extra,
                server=server)
        else:
            transport = self._make_ssl_transport(
                conn, protocol, sslcontext, waiter=waiter,
                server_side=True, extra=extra, server=server,
                ssl_handshake_timeout=ssl_handshake_timeout,
                ssl_shutdown_timeout=ssl_shutdown_timeout)

        try:
            await waiter
        except BaseException:
            transport.close()
            raise
        # From here on the protocol owns the connection.

    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        if not self._debug:
            return
        context = {
            'message':
                'Error on transport creation for incoming connection',
            'exception': exc,
        }
        # Only include what had been created before the failure.
        if protocol is not None:
            context['protocol'] = protocol
        if transport is not None:
            context['transport'] = transport
        self.call_exception_handler(context)
242
243 def _ensure_fd_no_transport(self, fd):
244 fileno = fd
245 if not isinstance(fileno, int):
246 try:
247 fileno = int(fileno.fileno())
248 except (AttributeError, TypeError, ValueError):
249 # This code matches selectors._fileobj_to_fd function.
250 raise ValueError(f"Invalid file object: {fd!r}") from None
251 try:
252 transport = self._transports[fileno]
253 except KeyError:
254 pass
255 else:
256 if not transport.is_closing():
257 raise RuntimeError(
258 f'File descriptor {fd!r} is used by transport '
259 f'{transport!r}')
260
def _add_reader(self, fd, callback, *args):
    """Register *callback* as the read handler for *fd*; return its Handle.

    A new selector registration is made when *fd* is unknown; otherwise
    the existing one gains EVENT_READ, keeps its writer, and any previous
    reader handle is cancelled.
    """
    self._check_closed()
    handle = events.Handle(callback, args, self, None)
    selector = self._selector
    try:
        key = selector.get_key(fd)
    except KeyError:
        selector.register(fd, selectors.EVENT_READ, (handle, None))
        return handle

    previous, writer = key.data
    selector.modify(fd, key.events | selectors.EVENT_READ,
                    (handle, writer))
    if previous is not None:
        previous.cancel()
    return handle
276
277 def _remove_reader(self, fd):
278 if self.is_closed():
279 return False
280 try:
281 key = self._selector.get_key(fd)
282 except KeyError:
283 return False
284 else:
285 mask, (reader, writer) = key.events, key.data
286 mask &= ~selectors.EVENT_READ
287 if not mask:
288 self._selector.unregister(fd)
289 else:
290 self._selector.modify(fd, mask, (None, writer))
291
292 if reader is not None:
293 reader.cancel()
294 return True
295 else:
296 return False
297
def _add_writer(self, fd, callback, *args):
    """Register *callback* as the write handler for *fd*; return its Handle.

    A new selector registration is made when *fd* is unknown; otherwise
    the existing one gains EVENT_WRITE, keeps its reader, and any previous
    writer handle is cancelled.
    """
    self._check_closed()
    handle = events.Handle(callback, args, self, None)
    selector = self._selector
    try:
        key = selector.get_key(fd)
    except KeyError:
        selector.register(fd, selectors.EVENT_WRITE, (None, handle))
        return handle

    reader, previous = key.data
    selector.modify(fd, key.events | selectors.EVENT_WRITE,
                    (reader, handle))
    if previous is not None:
        previous.cancel()
    return handle
313
314 def _remove_writer(self, fd):
315 """Remove a writer callback."""
316 if self.is_closed():
317 return False
318 try:
319 key = self._selector.get_key(fd)
320 except KeyError:
321 return False
322 else:
323 mask, (reader, writer) = key.events, key.data
324 # Remove both writer and connector.
325 mask &= ~selectors.EVENT_WRITE
326 if not mask:
327 self._selector.unregister(fd)
328 else:
329 self._selector.modify(fd, mask, (reader, None))
330
331 if writer is not None:
332 writer.cancel()
333 return True
334 else:
335 return False
336
def add_reader(self, fd, callback, *args):
    """Add a reader callback for *fd*.

    Raises RuntimeError if *fd* is in use by a transport of this loop.
    """
    # Public entry point: check the fd, then delegate.
    self._ensure_fd_no_transport(fd)
    self._add_reader(fd, callback, *args)
341
def remove_reader(self, fd):
    """Remove the reader callback for *fd*.

    Returns the result of ``_remove_reader``.  Raises RuntimeError if
    *fd* is in use by a transport of this loop.
    """
    self._ensure_fd_no_transport(fd)
    removed = self._remove_reader(fd)
    return removed
346
def add_writer(self, fd, callback, *args):
    """Add a writer callback for *fd*.

    Raises RuntimeError if *fd* is in use by a transport of this loop.
    """
    # Public entry point: check the fd, then delegate.
    self._ensure_fd_no_transport(fd)
    self._add_writer(fd, callback, *args)
351
def remove_writer(self, fd):
    """Remove the writer callback for *fd*.

    Returns the result of ``_remove_writer``.  Raises RuntimeError if
    *fd* is in use by a transport of this loop.
    """
    self._ensure_fd_no_transport(fd)
    removed = self._remove_writer(fd)
    return removed
356
async def sock_recv(self, sock, n):
    """Receive data from the socket.

    The return value is a bytes object representing the data received.
    The maximum amount of data to be received at once is specified by
    *n*.

    In debug mode, raises ValueError if *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")

    # Fast path: data may already be available.
    try:
        return sock.recv(n)
    except (BlockingIOError, InterruptedError):
        pass

    # Slow path: wait until the selector reports the socket readable.
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    handle = self._add_reader(fd, self._sock_recv, waiter, sock, n)
    cleanup = functools.partial(self._sock_read_done, fd, handle=handle)
    waiter.add_done_callback(cleanup)
    return await waiter
378
379 def _sock_read_done(self, fd, fut, handle=None):
380 if handle is None or not handle.cancelled():
381 self.remove_reader(fd)
382
383 def _sock_recv(self, fut, sock, n):
384 # _sock_recv() can add itself as an I/O callback if the operation can't
385 # be done immediately. Don't use it directly, call sock_recv().
386 if fut.done():
387 return
388 try:
389 data = sock.recv(n)
390 except (BlockingIOError, InterruptedError):
391 return # try again next time
392 except (SystemExit, KeyboardInterrupt):
393 raise
394 except BaseException as exc:
395 fut.set_exception(exc)
396 else:
397 fut.set_result(data)
398
async def sock_recv_into(self, sock, buf):
    """Receive data from the socket into *buf*.

    *buf* must be a writable buffer.  The return value is the number of
    bytes written.

    In debug mode, raises ValueError if *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")

    # Fast path: data may already be available.
    try:
        return sock.recv_into(buf)
    except (BlockingIOError, InterruptedError):
        pass

    # Slow path: wait until the selector reports the socket readable.
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    handle = self._add_reader(fd, self._sock_recv_into, waiter, sock, buf)
    cleanup = functools.partial(self._sock_read_done, fd, handle=handle)
    waiter.add_done_callback(cleanup)
    return await waiter
419
420 def _sock_recv_into(self, fut, sock, buf):
421 # _sock_recv_into() can add itself as an I/O callback if the operation
422 # can't be done immediately. Don't use it directly, call
423 # sock_recv_into().
424 if fut.done():
425 return
426 try:
427 nbytes = sock.recv_into(buf)
428 except (BlockingIOError, InterruptedError):
429 return # try again next time
430 except (SystemExit, KeyboardInterrupt):
431 raise
432 except BaseException as exc:
433 fut.set_exception(exc)
434 else:
435 fut.set_result(nbytes)
436
async def sock_recvfrom(self, sock, bufsize):
    """Receive a datagram from a datagram socket.

    The return value is a tuple of (bytes, address) representing the
    datagram received and the address it came from.
    The maximum amount of data to be received at once is specified by
    *bufsize*.

    In debug mode, raises ValueError if *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")

    # Fast path: a datagram may already be queued.
    try:
        return sock.recvfrom(bufsize)
    except (BlockingIOError, InterruptedError):
        pass

    # Slow path: wait until the selector reports the socket readable.
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    handle = self._add_reader(fd, self._sock_recvfrom, waiter, sock,
                              bufsize)
    cleanup = functools.partial(self._sock_read_done, fd, handle=handle)
    waiter.add_done_callback(cleanup)
    return await waiter
459
460 def _sock_recvfrom(self, fut, sock, bufsize):
461 # _sock_recvfrom() can add itself as an I/O callback if the operation
462 # can't be done immediately. Don't use it directly, call
463 # sock_recvfrom().
464 if fut.done():
465 return
466 try:
467 result = sock.recvfrom(bufsize)
468 except (BlockingIOError, InterruptedError):
469 return # try again next time
470 except (SystemExit, KeyboardInterrupt):
471 raise
472 except BaseException as exc:
473 fut.set_exception(exc)
474 else:
475 fut.set_result(result)
476
async def sock_recvfrom_into(self, sock, buf, nbytes=0):
    """Receive a datagram from the socket into *buf*.

    *buf* must be a writable buffer.  A falsy *nbytes* is replaced by
    ``len(buf)``.  The return value is a tuple of
    (number of bytes written, address).

    In debug mode, raises ValueError if *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    nbytes = nbytes or len(buf)

    # Fast path: a datagram may already be queued.
    try:
        return sock.recvfrom_into(buf, nbytes)
    except (BlockingIOError, InterruptedError):
        pass

    # Slow path: wait until the selector reports the socket readable.
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    handle = self._add_reader(fd, self._sock_recvfrom_into, waiter, sock,
                              buf, nbytes)
    cleanup = functools.partial(self._sock_read_done, fd, handle=handle)
    waiter.add_done_callback(cleanup)
    return await waiter
501
502 def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
503 # _sock_recv_into() can add itself as an I/O callback if the operation
504 # can't be done immediately. Don't use it directly, call
505 # sock_recv_into().
506 if fut.done():
507 return
508 try:
509 result = sock.recvfrom_into(buf, bufsize)
510 except (BlockingIOError, InterruptedError):
511 return # try again next time
512 except (SystemExit, KeyboardInterrupt):
513 raise
514 except BaseException as exc:
515 fut.set_exception(exc)
516 else:
517 fut.set_result(result)
518
async def sock_sendall(self, sock, data):
    """Send data to the socket.

    The socket must be connected to a remote socket. This method continues
    to send data from data until either all data has been sent or an
    error occurs. None is returned on success. On error, an exception is
    raised, and there is no way to determine how much data, if any, was
    successfully processed by the receiving end of the connection.

    In debug mode, raises ValueError if *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")

    # Fast path: try to push everything out right away.
    try:
        sent = sock.send(data)
    except (BlockingIOError, InterruptedError):
        sent = 0
    if sent == len(data):
        return

    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    # One-element list: mutable offset shared with the writer callback,
    # which advances it after each partial send.
    progress = [sent]
    handle = self._add_writer(fd, self._sock_sendall, waiter, sock,
                              memoryview(data), progress)
    cleanup = functools.partial(self._sock_write_done, fd, handle=handle)
    waiter.add_done_callback(cleanup)
    return await waiter
549
550 def _sock_sendall(self, fut, sock, view, pos):
551 if fut.done():
552 # Future cancellation can be scheduled on previous loop iteration
553 return
554 start = pos[0]
555 try:
556 n = sock.send(view[start:])
557 except (BlockingIOError, InterruptedError):
558 return
559 except (SystemExit, KeyboardInterrupt):
560 raise
561 except BaseException as exc:
562 fut.set_exception(exc)
563 return
564
565 start += n
566
567 if start == len(view):
568 fut.set_result(None)
569 else:
570 pos[0] = start
571
async def sock_sendto(self, sock, data, address):
    """Send a datagram from *sock* to *address*.

    A single ``sock.sendto()`` call is attempted immediately.  If the
    socket would block, the send is retried once the socket becomes
    writable.  The return value is whatever ``sock.sendto()`` returns.

    In debug mode, raises ValueError if *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        return sock.sendto(data, address)
    except (BlockingIOError, InterruptedError):
        pass

    fut = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    # Retry from the writer callback when the socket becomes writable.
    handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
                              address)
    # Unregister the writer as soon as the future completes or is cancelled.
    fut.add_done_callback(
        functools.partial(self._sock_write_done, fd, handle=handle))
    return await fut
598
599 def _sock_sendto(self, fut, sock, data, address):
600 if fut.done():
601 # Future cancellation can be scheduled on previous loop iteration
602 return
603 try:
604 n = sock.sendto(data, 0, address)
605 except (BlockingIOError, InterruptedError):
606 return
607 except (SystemExit, KeyboardInterrupt):
608 raise
609 except BaseException as exc:
610 fut.set_exception(exc)
611 else:
612 fut.set_result(n)
613
async def sock_connect(self, sock, address):
    """Connect to a remote socket at address.

    This method is a coroutine.  For AF_INET sockets (and AF_INET6 when
    ``base_events._HAS_IPv6`` is true) *address* is resolved first and
    the first result is used.

    In debug mode, raises ValueError if *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")

    needs_resolving = sock.family == socket.AF_INET or (
        base_events._HAS_IPv6 and sock.family == socket.AF_INET6)
    if needs_resolving:
        resolved = await self._ensure_resolved(
            address, family=sock.family, type=sock.type, proto=sock.proto,
            loop=self,
        )
        _family, _type, _proto, _canonname, address = resolved[0]

    fut = self.create_future()
    self._sock_connect(fut, sock, address)
    try:
        return await fut
    finally:
        # Needed to break cycles when an exception occurs.
        fut = None
638
639 def _sock_connect(self, fut, sock, address):
640 fd = sock.fileno()
641 try:
642 sock.connect(address)
643 except (BlockingIOError, InterruptedError):
644 # Issue #23618: When the C function connect() fails with EINTR, the
645 # connection runs in background. We have to wait until the socket
646 # becomes writable to be notified when the connection succeed or
647 # fails.
648 self._ensure_fd_no_transport(fd)
649 handle = self._add_writer(
650 fd, self._sock_connect_cb, fut, sock, address)
651 fut.add_done_callback(
652 functools.partial(self._sock_write_done, fd, handle=handle))
653 except (SystemExit, KeyboardInterrupt):
654 raise
655 except BaseException as exc:
656 fut.set_exception(exc)
657 else:
658 fut.set_result(None)
659 finally:
660 fut = None
661
662 def _sock_write_done(self, fd, fut, handle=None):
663 if handle is None or not handle.cancelled():
664 self.remove_writer(fd)
665
666 def _sock_connect_cb(self, fut, sock, address):
667 if fut.done():
668 return
669
670 try:
671 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
672 if err != 0:
673 # Jump to any except clause below.
674 raise OSError(err, f'Connect call failed {address}')
675 except (BlockingIOError, InterruptedError):
676 # socket is still registered, the callback will be retried later
677 pass
678 except (SystemExit, KeyboardInterrupt):
679 raise
680 except BaseException as exc:
681 fut.set_exception(exc)
682 else:
683 fut.set_result(None)
684 finally:
685 fut = None
686
async def sock_accept(self, sock):
    """Accept a connection.

    The socket must be bound to an address and listening for connections.
    The return value is a pair (conn, address) where conn is a new socket
    object usable to send and receive data on the connection, and address
    is the address bound to the socket on the other end of the connection.

    In debug mode, raises ValueError if *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    waiter = self.create_future()
    # Either resolves the waiter right away or registers a reader that
    # retries when the socket becomes readable.
    self._sock_accept(waiter, sock)
    return await waiter
701
702 def _sock_accept(self, fut, sock):
703 fd = sock.fileno()
704 try:
705 conn, address = sock.accept()
706 conn.setblocking(False)
707 except (BlockingIOError, InterruptedError):
708 self._ensure_fd_no_transport(fd)
709 handle = self._add_reader(fd, self._sock_accept, fut, sock)
710 fut.add_done_callback(
711 functools.partial(self._sock_read_done, fd, handle=handle))
712 except (SystemExit, KeyboardInterrupt):
713 raise
714 except BaseException as exc:
715 fut.set_exception(exc)
716 else:
717 fut.set_result((conn, address))
718
719 async def _sendfile_native(self, transp, file, offset, count):
720 del self._transports[transp._sock_fd]
721 resume_reading = transp.is_reading()
722 transp.pause_reading()
723 await transp._make_empty_waiter()
724 try:
725 return await self.sock_sendfile(transp._sock, file, offset, count,
726 fallback=False)
727 finally:
728 transp._reset_empty_waiter()
729 if resume_reading:
730 transp.resume_reading()
731 self._transports[transp._sock_fd] = transp
732
733 def _process_events(self, event_list):
734 for key, mask in event_list:
735 fileobj, (reader, writer) = key.fileobj, key.data
736 if mask & selectors.EVENT_READ and reader is not None:
737 if reader._cancelled:
738 self._remove_reader(fileobj)
739 else:
740 self._add_callback(reader)
741 if mask & selectors.EVENT_WRITE and writer is not None:
742 if writer._cancelled:
743 self._remove_writer(fileobj)
744 else:
745 self._add_callback(writer)
746
747 def _stop_serving(self, sock):
748 self._remove_reader(sock.fileno())
749 sock.close()
750
751
752 class ESC[4;38;5;81m_SelectorTransport(ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149m_FlowControlMixin,
753 ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149mTransport):
754
755 max_size = 256 * 1024 # Buffer size passed to recv().
756
757 _buffer_factory = bytearray # Constructs initial value for self._buffer.
758
759 # Attribute used in the destructor: it must be set even if the constructor
760 # is not called (see _SelectorSslTransport which may start by raising an
761 # exception)
762 _sock = None
763
def __init__(self, loop, sock, protocol, extra=None, server=None):
    """Bind the transport to *sock* and register it with *loop*.

    Fills the ``socket``, ``sockname`` and (unless already supplied)
    ``peername`` extra-info entries, installs *protocol*, attaches to
    *server* when given, and records itself in ``loop._transports``.
    """
    super().__init__(extra, loop)
    info = self._extra
    info['socket'] = trsock.TransportSocket(sock)
    try:
        info['sockname'] = sock.getsockname()
    except OSError:
        info['sockname'] = None
    if 'peername' not in info:
        try:
            info['peername'] = sock.getpeername()
        except OSError:  # socket.error is an alias of OSError
            info['peername'] = None
    self._sock = sock
    self._sock_fd = sock.fileno()

    self._protocol_connected = False
    self.set_protocol(protocol)

    self._server = server
    self._buffer = self._buffer_factory()
    self._conn_lost = 0  # Set when call to connection_lost scheduled.
    self._closing = False  # Set when close() called.
    self._paused = False  # Set when pause_reading() called

    if server is not None:
        server._attach()
    loop._transports[self._sock_fd] = self
791
def __repr__(self):
    """Return a debug representation of the transport.

    Includes the class name, a closed/closing marker, the fd and, while
    the loop is open, whether the fd is polled for reads and writes plus
    the write buffer size.
    """
    parts = [self.__class__.__name__]
    if self._sock is None:
        parts.append('closed')
    elif self._closing:
        parts.append('closing')
    parts.append(f'fd={self._sock_fd}')

    # The selector can only be queried while the loop is still open.
    loop = self._loop
    if loop is not None and not loop.is_closed():
        reading = _test_selector_event(loop._selector,
                                       self._sock_fd, selectors.EVENT_READ)
        parts.append('read=polling' if reading else 'read=idle')

        writing = _test_selector_event(loop._selector,
                                       self._sock_fd,
                                       selectors.EVENT_WRITE)
        state = 'polling' if writing else 'idle'

        bufsize = self.get_write_buffer_size()
        parts.append(f'write=<{state}, bufsize={bufsize}>')
    return '<{}>'.format(' '.join(parts))
819
def abort(self):
    """Close the transport immediately via ``_force_close(None)``."""
    no_error = None
    self._force_close(no_error)
822
def set_protocol(self, protocol):
    """Install *protocol* and mark the protocol as connected."""
    self._protocol, self._protocol_connected = protocol, True
826
def get_protocol(self):
    """Return the protocol currently installed on this transport."""
    current = self._protocol
    return current
829
def is_closing(self):
    """Return the closing flag (set by close() and _force_close())."""
    closing = self._closing
    return closing
832
def is_reading(self):
    """Return True unless the transport is closing or reading is paused."""
    return not (self.is_closing() or self._paused)
835
def pause_reading(self):
    """Pause reading: mark the transport paused and remove its reader.

    Does nothing when the transport is not currently reading.
    """
    if self.is_reading():
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)
843
def resume_reading(self):
    """Resume reading after pause_reading().

    Does nothing when the transport is closing or was not paused.
    """
    if not self._closing and self._paused:
        self._paused = False
        self._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)
851
def close(self):
    """Close the transport.

    The reader is removed at once.  If the write buffer is empty the
    writer is removed too and ``_call_connection_lost(None)`` is
    scheduled; with buffered data only the closing flag is set here.
    Calling close() again is a no-op.
    """
    if self._closing:
        return
    self._closing = True
    loop, fd = self._loop, self._sock_fd
    loop._remove_reader(fd)
    if self._buffer:
        return
    self._conn_lost += 1
    loop._remove_writer(fd)
    loop.call_soon(self._call_connection_lost, None)
861
862 def __del__(self, _warn=warnings.warn):
863 if self._sock is not None:
864 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
865 self._sock.close()
866
867 def _fatal_error(self, exc, message='Fatal error on transport'):
868 # Should be called from exception handler only.
869 if isinstance(exc, OSError):
870 if self._loop.get_debug():
871 logger.debug("%r: %s", self, message, exc_info=True)
872 else:
873 self._loop.call_exception_handler({
874 'message': message,
875 'exception': exc,
876 'transport': self,
877 'protocol': self._protocol,
878 })
879 self._force_close(exc)
880
881 def _force_close(self, exc):
882 if self._conn_lost:
883 return
884 if self._buffer:
885 self._buffer.clear()
886 self._loop._remove_writer(self._sock_fd)
887 if not self._closing:
888 self._closing = True
889 self._loop._remove_reader(self._sock_fd)
890 self._conn_lost += 1
891 self._loop.call_soon(self._call_connection_lost, exc)
892
893 def _call_connection_lost(self, exc):
894 try:
895 if self._protocol_connected:
896 self._protocol.connection_lost(exc)
897 finally:
898 self._sock.close()
899 self._sock = None
900 self._protocol = None
901 self._loop = None
902 server = self._server
903 if server is not None:
904 server._detach()
905 self._server = None
906
def get_write_buffer_size(self):
    """Return the number of bytes currently held in the write buffer."""
    pending = self._buffer
    return len(pending)
909
910 def _add_reader(self, fd, callback, *args):
911 if not self.is_reading():
912 return
913 self._loop._add_reader(fd, callback, *args)
914
915
916 class ESC[4;38;5;81m_SelectorSocketTransport(ESC[4;38;5;149m_SelectorTransport):
917
918 _start_tls_compatible = True
919 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
920
def __init__(self, loop, sock, protocol, waiter=None,
             extra=None, server=None):
    """Set up a stream transport over *sock*.

    Three callbacks are scheduled on the loop, in this order:
    ``protocol.connection_made``, registration of the read handler, and,
    when *waiter* is given, completion of *waiter*.
    """
    # Must exist before the base initialiser runs set_protocol().
    self._read_ready_cb = None
    super().__init__(loop, sock, protocol, extra, server)
    self._eof = False
    self._empty_waiter = None

    # Disable the Nagle algorithm -- small writes will be
    # sent without waiting for the TCP ACK.  This generally
    # decreases the latency (in some cases significantly.)
    base_events._set_nodelay(self._sock)

    schedule = self._loop.call_soon
    schedule(self._protocol.connection_made, self)
    # only start reading when connection_made() has been called
    schedule(self._add_reader, self._sock_fd, self._read_ready)
    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        schedule(futures._set_result_unless_cancelled, waiter, None)
942
def set_protocol(self, protocol):
    """Install *protocol* and pick the matching read strategy.

    ``protocols.BufferedProtocol`` instances are read through
    ``_read_ready__get_buffer``; all others through
    ``_read_ready__data_received``.
    """
    buffered = isinstance(protocol, protocols.BufferedProtocol)
    self._read_ready_cb = (self._read_ready__get_buffer if buffered
                           else self._read_ready__data_received)

    super().set_protocol(protocol)
950
951 def _read_ready(self):
952 self._read_ready_cb()
953
954 def _read_ready__get_buffer(self):
955 if self._conn_lost:
956 return
957
958 try:
959 buf = self._protocol.get_buffer(-1)
960 if not len(buf):
961 raise RuntimeError('get_buffer() returned an empty buffer')
962 except (SystemExit, KeyboardInterrupt):
963 raise
964 except BaseException as exc:
965 self._fatal_error(
966 exc, 'Fatal error: protocol.get_buffer() call failed.')
967 return
968
969 try:
970 nbytes = self._sock.recv_into(buf)
971 except (BlockingIOError, InterruptedError):
972 return
973 except (SystemExit, KeyboardInterrupt):
974 raise
975 except BaseException as exc:
976 self._fatal_error(exc, 'Fatal read error on socket transport')
977 return
978
979 if not nbytes:
980 self._read_ready__on_eof()
981 return
982
983 try:
984 self._protocol.buffer_updated(nbytes)
985 except (SystemExit, KeyboardInterrupt):
986 raise
987 except BaseException as exc:
988 self._fatal_error(
989 exc, 'Fatal error: protocol.buffer_updated() call failed.')
990
991 def _read_ready__data_received(self):
992 if self._conn_lost:
993 return
994 try:
995 data = self._sock.recv(self.max_size)
996 except (BlockingIOError, InterruptedError):
997 return
998 except (SystemExit, KeyboardInterrupt):
999 raise
1000 except BaseException as exc:
1001 self._fatal_error(exc, 'Fatal read error on socket transport')
1002 return
1003
1004 if not data:
1005 self._read_ready__on_eof()
1006 return
1007
1008 try:
1009 self._protocol.data_received(data)
1010 except (SystemExit, KeyboardInterrupt):
1011 raise
1012 except BaseException as exc:
1013 self._fatal_error(
1014 exc, 'Fatal error: protocol.data_received() call failed.')
1015
1016 def _read_ready__on_eof(self):
1017 if self._loop.get_debug():
1018 logger.debug("%r received EOF", self)
1019
1020 try:
1021 keep_open = self._protocol.eof_received()
1022 except (SystemExit, KeyboardInterrupt):
1023 raise
1024 except BaseException as exc:
1025 self._fatal_error(
1026 exc, 'Fatal error: protocol.eof_received() call failed.')
1027 return
1028
1029 if keep_open:
1030 # We're keeping the connection open so the
1031 # protocol can write more, but we still can't
1032 # receive more, so remove the reader callback.
1033 self._loop._remove_reader(self._sock_fd)
1034 else:
1035 self.close()
1036
def write(self, data):
    """Write *data* to the socket, buffering whatever cannot be sent now.

    Raises:
        TypeError: *data* is not bytes, bytearray or memoryview.
        RuntimeError: called after write_eof(), or while a sendfile is
            in progress.

    Empty *data* is ignored.  After the connection is lost the data is
    dropped and only counted, with a warning logged once the count
    reaches ``constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES``.
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data argument must be a bytes-like object, '
                        f'not {type(data).__name__!r}')
    if self._eof:
        raise RuntimeError('Cannot call write() after write_eof()')
    if self._empty_waiter is not None:
        raise RuntimeError('unable to write; sendfile is in progress')
    if not data:
        return

    if self._conn_lost:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Optimization: nothing is queued, so try to send right away.
        try:
            sent = self._sock.send(data)
        except (BlockingIOError, InterruptedError):
            sent = None
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal write error on socket transport')
            return
        if sent is not None:
            data = data[sent:]
            if not data:
                return
        # Not all was written; register write handler.
        self._loop._add_writer(self._sock_fd, self._write_ready)

    # Queue the remainder behind anything already buffered.
    self._buffer.extend(data)
    self._maybe_pause_protocol()
1075
1076 def _write_ready(self):
1077 assert self._buffer, 'Data should not be empty'
1078
1079 if self._conn_lost:
1080 return
1081 try:
1082 n = self._sock.send(self._buffer)
1083 except (BlockingIOError, InterruptedError):
1084 pass
1085 except (SystemExit, KeyboardInterrupt):
1086 raise
1087 except BaseException as exc:
1088 self._loop._remove_writer(self._sock_fd)
1089 self._buffer.clear()
1090 self._fatal_error(exc, 'Fatal write error on socket transport')
1091 if self._empty_waiter is not None:
1092 self._empty_waiter.set_exception(exc)
1093 else:
1094 if n:
1095 del self._buffer[:n]
1096 self._maybe_resume_protocol() # May append to buffer.
1097 if not self._buffer:
1098 self._loop._remove_writer(self._sock_fd)
1099 if self._empty_waiter is not None:
1100 self._empty_waiter.set_result(None)
1101 if self._closing:
1102 self._call_connection_lost(None)
1103 elif self._eof:
1104 self._sock.shutdown(socket.SHUT_WR)
1105
1106 def write_eof(self):
1107 if self._closing or self._eof:
1108 return
1109 self._eof = True
1110 if not self._buffer:
1111 self._sock.shutdown(socket.SHUT_WR)
1112
1113 def can_write_eof(self):
1114 return True
1115
1116 def _call_connection_lost(self, exc):
1117 super()._call_connection_lost(exc)
1118 if self._empty_waiter is not None:
1119 self._empty_waiter.set_exception(
1120 ConnectionError("Connection is closed by peer"))
1121
1122 def _make_empty_waiter(self):
1123 if self._empty_waiter is not None:
1124 raise RuntimeError("Empty waiter is already set")
1125 self._empty_waiter = self._loop.create_future()
1126 if not self._buffer:
1127 self._empty_waiter.set_result(None)
1128 return self._empty_waiter
1129
1130 def _reset_empty_waiter(self):
1131 self._empty_waiter = None
1132
1133
class _SelectorDatagramTransport(_SelectorTransport):
    """Datagram transport for the selector event loop.

    Datagrams that cannot be sent immediately are queued, together with
    their destination address, and flushed by _sendto_ready() once it
    is invoked as the writer callback.  _buffer_size tracks the total
    number of payload bytes currently queued.
    """

    # Queue type for pending (payload, address) pairs; presumably the
    # base class instantiates it as self._buffer -- confirm there.
    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        """Set up the transport and schedule its start-up callbacks.

        *address* is the optional fixed remote address; when set,
        sendto() only accepts None or that same address.  *waiter*, if
        given, is resolved (unless cancelled) after connection_made().
        """
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        self._buffer_size = 0
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def get_write_buffer_size(self):
        """Return the number of payload bytes currently queued."""
        return self._buffer_size

    def _read_ready(self):
        """Reader callback: receive one datagram and deliver it.

        OSError is forwarded to protocol.error_received(); any other
        error except SystemExit/KeyboardInterrupt is fatal.
        """
        if self._conn_lost:
            return
        try:
            data, addr = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
        else:
            self._protocol.datagram_received(data, addr)

    def sendto(self, data, addr=None):
        """Send *data* as one datagram, queueing it if the socket is busy.

        Empty data is ignored.  With a fixed remote address, *addr*
        must be None or equal to it.  When nothing is queued an
        immediate send is attempted; if it would block, the datagram is
        queued and _sendto_ready is registered as the writer callback.
        OSError from the send is forwarded to protocol.error_received().

        Raises:
            TypeError: if *data* is not bytes, bytearray or memoryview.
            ValueError: if *addr* conflicts with the fixed address.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if not data:
            return

        if self._address:
            if addr not in (None, self._address):
                raise ValueError(
                    f'Invalid address: must be None or {self._address}')
            addr = self._address

        if self._conn_lost and self._address:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
                return
            except (BlockingIOError, InterruptedError):
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        # Ensure that what we buffer is immutable.
        payload = bytes(data)
        self._buffer.append((payload, addr))
        # Account for the stored bytes rather than len(data): for a
        # memoryview with multi-byte items len() counts items, which
        # would not match what _sendto_ready() subtracts later.
        self._buffer_size += len(payload)
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Writer callback: flush queued datagrams in FIFO order.

        Stops at the first datagram that would block (it is put back at
        the front of the queue).  A datagram whose send raises OSError
        is dropped after notifying protocol.error_received().  When the
        queue is empty the writer is unregistered and, if the transport
        is closing, the connection-lost notification is made.
        """
        while self._buffer:
            data, addr = self._buffer.popleft()
            self._buffer_size -= len(data)
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft((data, addr))  # Try again later.
                self._buffer_size += len(data)
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if not self._buffer:
            self._loop._remove_writer(self._sock_fd)
            if self._closing:
                self._call_connection_lost(None)